Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,18 @@
import com.google.common.collect.Lists;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.stats.OpStatsListener;
Expand Down Expand Up @@ -65,6 +68,39 @@ public static <T> T result(CompletableFuture<T> future, long timeout, TimeUnit t
return FutureUtils.result(future, DEFAULT_EXCEPTION_HANDLER, timeout, timeUnit);
}

@ThreadSafe
public static class Sequencer<T> {
private CompletableFuture<T> sequencerFuture = CompletableFuture.completedFuture(null);
private final boolean allowExceptionBreakChain;

public Sequencer(boolean allowExceptionBreakChain) {
this.allowExceptionBreakChain = allowExceptionBreakChain;
}

public static <T> Sequencer<T> create(boolean allowExceptionBreakChain) {
return new Sequencer<>(allowExceptionBreakChain);
}
public static <T> Sequencer<T> create() {
return new Sequencer<>(false);
}

/**
* @throws NullPointerException NPE when param is null
*/
public synchronized CompletableFuture<T> sequential(Supplier<CompletableFuture<T>> newTask) {
Objects.requireNonNull(newTask);
if (sequencerFuture.isDone()) {
if (sequencerFuture.isCompletedExceptionally() && allowExceptionBreakChain) {
return sequencerFuture;
}
return sequencerFuture = newTask.get();
}
return sequencerFuture = allowExceptionBreakChain
? sequencerFuture.thenCompose(__ -> newTask.get())
: sequencerFuture.exceptionally(ex -> null).thenCompose(__ -> newTask.get());
}
}

@SneakyThrows(InterruptedException.class)
public static <T, ExceptionT extends Throwable> T result(
CompletableFuture<T> future, Function<Throwable, ExceptionT> exceptionHandler) throws ExceptionT {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.bookkeeper.discover;

import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
import static org.apache.bookkeeper.util.BookKeeperConstants.COOKIE_NODE;
import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY;
Expand Down Expand Up @@ -179,21 +180,25 @@ public void close() {
private WatchTask watchWritableBookiesTask = null;
@Getter(AccessLevel.PACKAGE)
private WatchTask watchReadOnlyBookiesTask = null;
private final ConcurrentHashMap<BookieId, Versioned<BookieServiceInfo>> bookieServiceInfoCache =
new ConcurrentHashMap<>();
private final ConcurrentHashMap<BookieId, Versioned<BookieServiceInfo>> writableBookieInfo =
new ConcurrentHashMap<>();
private final ConcurrentHashMap<BookieId, Versioned<BookieServiceInfo>> readOnlyBookieInfo =
new ConcurrentHashMap<>();
private final Watcher bookieServiceInfoCacheInvalidation;
private final boolean bookieAddressTracking;
// registration paths
private final String bookieRegistrationPath;
private final String bookieAllRegistrationPath;
private final String bookieReadonlyRegistrationPath;
private final FutureUtils.Sequencer<Void> sequencer;

public ZKRegistrationClient(ZooKeeper zk,
String ledgersRootPath,
ScheduledExecutorService scheduler,
boolean bookieAddressTracking) {
this.zk = zk;
this.scheduler = scheduler;
this.sequencer = FutureUtils.Sequencer.create();
// Following Bookie Network Address Changes is an expensive operation
// as it requires additional ZooKeeper watches
// we can disable this feature, in case the BK cluster has only
Expand Down Expand Up @@ -239,7 +244,10 @@ public CompletableFuture<Versioned<BookieServiceInfo>> getBookieServiceInfo(Book
// we can only serve data from cache here,
// because it can happen than this method is called inside the main
// zookeeper client event loop thread
Versioned<BookieServiceInfo> resultFromCache = bookieServiceInfoCache.get(bookieId);
Versioned<BookieServiceInfo> resultFromCache = writableBookieInfo.get(bookieId);
if (resultFromCache == null) {
resultFromCache = readOnlyBookieInfo.get(bookieId);
}
if (log.isDebugEnabled()) {
log.debug("getBookieServiceInfo {} -> {}", bookieId, resultFromCache);
}
Expand All @@ -250,6 +258,21 @@ public CompletableFuture<Versioned<BookieServiceInfo>> getBookieServiceInfo(Book
}
}

private Versioned<BookieServiceInfo> updateBookieInfo(BookieId bookieId, boolean isReadonly,
byte[] bytes, Stat stat)
throws IOException {
BookieServiceInfo bookieServiceInfo = deserializeBookieServiceInfo(bookieId, bytes);
Versioned<BookieServiceInfo> result = new Versioned<>(bookieServiceInfo,
new LongVersion(stat.getCversion()));
log.info("Update BookieInfoCache (writable bookie) {} -> {}", bookieId, result.getValue());
if (isReadonly) {
readOnlyBookieInfo.put(bookieId, result);
} else {
writableBookieInfo.put(bookieId, result);
}
return result;
}

/**
* Read BookieServiceInfo from ZooKeeper and updates the local cache.
*
Expand All @@ -263,35 +286,64 @@ private CompletableFuture<Versioned<BookieServiceInfo>> readBookieServiceInfoAsy
CompletableFuture<Versioned<BookieServiceInfo>> promise = new CompletableFuture<>();
zk.getData(pathAsWritable, bookieServiceInfoCacheInvalidation,
(int rc, String path, Object o, byte[] bytes, Stat stat) -> {
if (KeeperException.Code.OK.intValue() == rc) {
try {
BookieServiceInfo bookieServiceInfo = deserializeBookieServiceInfo(bookieId, bytes);
Versioned<BookieServiceInfo> result = new Versioned<>(bookieServiceInfo,
new LongVersion(stat.getCversion()));
log.info("Update BookieInfoCache (writable bookie) {} -> {}", bookieId, result.getValue());
bookieServiceInfoCache.put(bookieId, result);
promise.complete(result);
} catch (IOException ex) {
log.error("Cannot update BookieInfo for ", ex);
promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc), path)
.initCause(ex));
return;
}
} else if (KeeperException.Code.NONODE.intValue() == rc) {
// not found, looking for a readonly bookie
zk.getData(pathAsReadonly, bookieServiceInfoCacheInvalidation,
(int rc2, String path2, Object o2, byte[] bytes2, Stat stat2) -> {
if (KeeperException.Code.OK.intValue() == rc2) {
if (KeeperException.Code.OK.intValue() == rc) {
try {
BookieServiceInfo bookieServiceInfo = deserializeBookieServiceInfo(bookieId, bytes2);
Versioned<BookieServiceInfo> result =
new Versioned<>(bookieServiceInfo, new LongVersion(stat2.getCversion()));
log.info("Update BookieInfoCache (readonly bookie) {} -> {}", bookieId, result.getValue());
bookieServiceInfoCache.put(bookieId, result);
updateBookieInfo(bookieId, false, bytes, stat);
promise.complete(result);
} catch (IOException ex) {
log.error("Cannot update BookieInfo for ", ex);
promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc2), path2)
promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc), path)
.initCause(ex));
return;
}
} else if (KeeperException.Code.NONODE.intValue() == rc) {
// not found, looking for a readonly bookie
zk.getData(pathAsReadonly, bookieServiceInfoCacheInvalidation,
(int rc2, String path2, Object o2, byte[] bytes2, Stat stat2) -> {
if (KeeperException.Code.OK.intValue() == rc2) {
try {
Versioned<BookieServiceInfo> result =
updateBookieInfo(bookieId, true, bytes, stat);
promise.complete(result);
} catch (IOException ex) {
log.error("Cannot update BookieInfo for ", ex);
promise.completeExceptionally(
KeeperException.create(KeeperException.Code.get(rc2), path2)
.initCause(ex)
);
return;
}
} else {
// not found as writable and readonly, the bookie is offline
promise.completeExceptionally(
BKException.create(BKException.Code.NoBookieAvailableException)
);
}
}, null);
} else {
promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc), path));
}
}, null);
return promise;
}


private CompletableFuture<Versioned<BookieServiceInfo>> readBookieInfoAsReadonlyBookie(BookieId bookieId) {
String pathAsReadonly = bookieReadonlyRegistrationPath + "/" + bookieId;

CompletableFuture<Versioned<BookieServiceInfo>> promise = new CompletableFuture<>();
// not found, looking for a readonly bookie
zk.getData(pathAsReadonly, bookieServiceInfoCacheInvalidation,
(int rc, String path, Object o, byte[] bytes, Stat stat) -> {
if (KeeperException.Code.OK.intValue() == rc) {
try {
Versioned<BookieServiceInfo> result =
updateBookieInfo(bookieId, true, bytes, stat);
promise.complete(result);
} catch (IOException ex) {
log.error("Cannot update BookieInfo for ", ex);
promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc), path)
.initCause(ex));
return;
}
Expand All @@ -300,10 +352,30 @@ private CompletableFuture<Versioned<BookieServiceInfo>> readBookieServiceInfoAsy
promise.completeExceptionally(BKException.create(BKException.Code.NoBookieAvailableException));
}
}, null);
} else {
promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc), path));
}
}, null);
return promise;
}

private CompletableFuture<Versioned<BookieServiceInfo>> readBookieInfoAsWritableBookie(BookieId bookieId) {
String pathAsWritable = bookieRegistrationPath + "/" + bookieId;

CompletableFuture<Versioned<BookieServiceInfo>> promise = new CompletableFuture<>();
zk.getData(pathAsWritable, bookieServiceInfoCacheInvalidation,
(int rc, String path, Object o, byte[] bytes, Stat stat) -> {
if (KeeperException.Code.OK.intValue() == rc) {
try {
Versioned<BookieServiceInfo> result =
updateBookieInfo(bookieId, false, bytes, stat);
promise.complete(result);
} catch (IOException ex) {
log.error("Cannot update BookieInfo for ", ex);
promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc), path)
.initCause(ex));
return;
}
} else {
promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc), path));
}
}, null);
return promise;
}

Expand Down Expand Up @@ -358,7 +430,24 @@ private CompletableFuture<Versioned<Set<BookieId>>> getChildren(String regPath,
List<CompletableFuture<Versioned<BookieServiceInfo>>> bookieInfoUpdated = new ArrayList<>(bookies.size());
for (BookieId id : bookies) {
// update the cache for new bookies
if (!bookieServiceInfoCache.containsKey(id)) {
if (path.equals(bookieReadonlyRegistrationPath)) {
if (readOnlyBookieInfo.get(id) == null) {
bookieInfoUpdated.add(readBookieInfoAsReadonlyBookie(id));
continue;
}
}
if (path.equals(bookieRegistrationPath)) {
if (writableBookieInfo.get(id) == null) {
bookieInfoUpdated.add(readBookieInfoAsWritableBookie(id));
continue;
}

}
if (path.equals(bookieAllRegistrationPath)) {
if (writableBookieInfo.get(id) != null || readOnlyBookieInfo.get(id) != null) {
// jump to next bookie id
continue;
}
bookieInfoUpdated.add(readBookieServiceInfoAsync(id));
}
}
Expand Down Expand Up @@ -484,30 +573,57 @@ public void process(WatchedEvent we) {
if (log.isDebugEnabled()) {
log.debug("zk event {} for {} state {}", we.getType(), we.getPath(), we.getState());
}
if (we.getState() == KeeperState.Expired) {
log.info("zk session expired, invalidating cache");
bookieServiceInfoCache.clear();
return;
}

BookieId bookieId = stripBookieIdFromPath(we.getPath());
if (bookieId == null) {
return;
}
switch (we.getType()) {
case NodeDeleted:
log.info("Invalidate cache for {}", bookieId);
bookieServiceInfoCache.remove(bookieId);
break;
case NodeDataChanged:
log.info("refresh cache for {}", bookieId);
readBookieServiceInfoAsync(bookieId);
break;
default:
if (log.isDebugEnabled()) {
log.debug("ignore cache event {} for {}", we.getType(), bookieId);
}
break;
// make the notification callback run sequential in background.
final String path = we.getPath();
if (!path.startsWith(bookieReadonlyRegistrationPath) && !path.startsWith(bookieRegistrationPath)) {
// ignore unknown path
return;
}
if (path.equals(bookieReadonlyRegistrationPath) || path.equals(bookieRegistrationPath)) {
// ignore root path
return;
}
sequencer.sequential(() -> {
if (we.getState() == KeeperState.Expired) {
log.info("zk session expired, invalidating cache");
readOnlyBookieInfo.clear();
writableBookieInfo.clear();
}
switch (we.getType()) {
case NodeDeleted:
if (path.startsWith(bookieReadonlyRegistrationPath)) {
log.info("Invalidate readonly cache for {}", bookieId);
readOnlyBookieInfo.remove(bookieId);
}
if (path.startsWith(bookieRegistrationPath)) {
log.info("Invalidate writable cache for {}", bookieId);
writableBookieInfo.remove(bookieId);
}
break;
case NodeDataChanged:
if (path.startsWith(bookieReadonlyRegistrationPath)) {
log.info("refresh readonly cache for {}. path: {}", bookieId, path);
readBookieInfoAsReadonlyBookie(bookieId);
}
if (path.startsWith(bookieRegistrationPath)) {
log.info("refresh writable cache for {}. path: {}", bookieId, path);
readBookieInfoAsWritableBookie(bookieId);
}
break;
default:
if (log.isDebugEnabled()) {
log.debug("ignore cache event {} for {}", we.getType(), bookieId);
}
break;

}
return completedFuture(null);
});
}
}

Expand Down
Loading