From 40e60d5686a831fe492b8abe19513822576b7b8b Mon Sep 17 00:00:00 2001 From: gaozhangmin Date: Thu, 7 Aug 2025 20:42:44 +0800 Subject: [PATCH] fix race condition of zk watch event --- .../common/concurrent/FutureUtils.java | 36 + .../discover/ZKRegistrationClient.java | 218 ++++-- .../FaultInjectableZKRegistrationManager.java | 619 ++++++++++++++++++ .../discover/ZKRegistrationClientTest.java | 174 +++++ 4 files changed, 996 insertions(+), 51 deletions(-) create mode 100644 bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/FaultInjectableZKRegistrationManager.java create mode 100644 bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/ZKRegistrationClientTest.java diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/concurrent/FutureUtils.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/concurrent/FutureUtils.java index daba9c41ce5..415f1d799e8 100644 --- a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/concurrent/FutureUtils.java +++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/concurrent/FutureUtils.java @@ -22,6 +22,7 @@ import com.google.common.collect.Lists; import java.util.Iterator; import java.util.List; +import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -29,8 +30,10 @@ import java.util.concurrent.TimeoutException; import java.util.function.BiConsumer; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.common.stats.OpStatsListener; @@ -65,6 +68,39 @@ public static T result(CompletableFuture future, long timeout, TimeUnit t return FutureUtils.result(future, DEFAULT_EXCEPTION_HANDLER, timeout, timeUnit); } + @ThreadSafe + 
public static class Sequencer { + private CompletableFuture sequencerFuture = CompletableFuture.completedFuture(null); + private final boolean allowExceptionBreakChain; + + public Sequencer(boolean allowExceptionBreakChain) { + this.allowExceptionBreakChain = allowExceptionBreakChain; + } + + public static Sequencer create(boolean allowExceptionBreakChain) { + return new Sequencer<>(allowExceptionBreakChain); + } + public static Sequencer create() { + return new Sequencer<>(false); + } + + /** + * @throws NullPointerException NPE when param is null + */ + public synchronized CompletableFuture sequential(Supplier> newTask) { + Objects.requireNonNull(newTask); + if (sequencerFuture.isDone()) { + if (sequencerFuture.isCompletedExceptionally() && allowExceptionBreakChain) { + return sequencerFuture; + } + return sequencerFuture = newTask.get(); + } + return sequencerFuture = allowExceptionBreakChain + ? sequencerFuture.thenCompose(__ -> newTask.get()) + : sequencerFuture.exceptionally(ex -> null).thenCompose(__ -> newTask.get()); + } + } + @SneakyThrows(InterruptedException.class) public static T result( CompletableFuture future, Function exceptionHandler) throws ExceptionT { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java index cca631086b0..e3e8ab4bd78 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java @@ -18,6 +18,7 @@ package org.apache.bookkeeper.discover; +import static java.util.concurrent.CompletableFuture.completedFuture; import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE; import static org.apache.bookkeeper.util.BookKeeperConstants.COOKIE_NODE; import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY; @@ -179,14 +180,17 @@ public void 
close() { private WatchTask watchWritableBookiesTask = null; @Getter(AccessLevel.PACKAGE) private WatchTask watchReadOnlyBookiesTask = null; - private final ConcurrentHashMap> bookieServiceInfoCache = - new ConcurrentHashMap<>(); + private final ConcurrentHashMap> writableBookieInfo = + new ConcurrentHashMap<>(); + private final ConcurrentHashMap> readOnlyBookieInfo = + new ConcurrentHashMap<>(); private final Watcher bookieServiceInfoCacheInvalidation; private final boolean bookieAddressTracking; // registration paths private final String bookieRegistrationPath; private final String bookieAllRegistrationPath; private final String bookieReadonlyRegistrationPath; + private final FutureUtils.Sequencer sequencer; public ZKRegistrationClient(ZooKeeper zk, String ledgersRootPath, @@ -194,6 +198,7 @@ public ZKRegistrationClient(ZooKeeper zk, boolean bookieAddressTracking) { this.zk = zk; this.scheduler = scheduler; + this.sequencer = FutureUtils.Sequencer.create(); // Following Bookie Network Address Changes is an expensive operation // as it requires additional ZooKeeper watches // we can disable this feature, in case the BK cluster has only @@ -239,7 +244,10 @@ public CompletableFuture> getBookieServiceInfo(Book // we can only serve data from cache here, // because it can happen than this method is called inside the main // zookeeper client event loop thread - Versioned resultFromCache = bookieServiceInfoCache.get(bookieId); + Versioned resultFromCache = writableBookieInfo.get(bookieId); + if (resultFromCache == null) { + resultFromCache = readOnlyBookieInfo.get(bookieId); + } if (log.isDebugEnabled()) { log.debug("getBookieServiceInfo {} -> {}", bookieId, resultFromCache); } @@ -250,6 +258,21 @@ public CompletableFuture> getBookieServiceInfo(Book } } + private Versioned updateBookieInfo(BookieId bookieId, boolean isReadonly, + byte[] bytes, Stat stat) + throws IOException { + BookieServiceInfo bookieServiceInfo = deserializeBookieServiceInfo(bookieId, bytes); + 
Versioned result = new Versioned<>(bookieServiceInfo, + new LongVersion(stat.getCversion())); + /* + * Shared by the writable and readonly lookups: label and update the matching cache. + */ + log.info("Update BookieInfoCache ({} bookie) {} -> {}", + isReadonly ? "readonly" : "writable", bookieId, result.getValue()); + (isReadonly ? readOnlyBookieInfo : writableBookieInfo).put(bookieId, result); + return result; + } + /** * Read BookieServiceInfo from ZooKeeper and updates the local cache. * @@ -263,35 +286,64 @@ private CompletableFuture> readBookieServiceInfoAsy CompletableFuture> promise = new CompletableFuture<>(); zk.getData(pathAsWritable, bookieServiceInfoCacheInvalidation, (int rc, String path, Object o, byte[] bytes, Stat stat) -> { - if (KeeperException.Code.OK.intValue() == rc) { - try { - BookieServiceInfo bookieServiceInfo = deserializeBookieServiceInfo(bookieId, bytes); - Versioned result = new Versioned<>(bookieServiceInfo, - new LongVersion(stat.getCversion())); - log.info("Update BookieInfoCache (writable bookie) {} -> {}", bookieId, result.getValue()); - bookieServiceInfoCache.put(bookieId, result); - promise.complete(result); - } catch (IOException ex) { - log.error("Cannot update BookieInfo for ", ex); - promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc), path) - .initCause(ex)); - return; - } - } else if (KeeperException.Code.NONODE.intValue() == rc) { - // not found, looking for a readonly bookie - zk.getData(pathAsReadonly, bookieServiceInfoCacheInvalidation, - (int rc2, String path2, Object o2, byte[] bytes2, Stat stat2) -> { - if (KeeperException.Code.OK.intValue() == rc2) { + if (KeeperException.Code.OK.intValue() == rc) { try { - BookieServiceInfo bookieServiceInfo = deserializeBookieServiceInfo(bookieId, bytes2); Versioned result = - new Versioned<>(bookieServiceInfo, new LongVersion(stat2.getCversion())); - log.info("Update BookieInfoCache (readonly bookie) {} -> {}", bookieId, result.getValue()); - bookieServiceInfoCache.put(bookieId, result); + updateBookieInfo(bookieId, false, bytes, 
stat); promise.complete(result); } catch (IOException ex) { log.error("Cannot update BookieInfo for ", ex); - promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc2), path2) + promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc), path) + .initCause(ex)); + return; + } + } else if (KeeperException.Code.NONODE.intValue() == rc) { + // not found as writable: fall back to the readonly znode, whose reply arrives as bytes2/stat2 + zk.getData(pathAsReadonly, bookieServiceInfoCacheInvalidation, + (int rc2, String path2, Object o2, byte[] bytes2, Stat stat2) -> { + if (KeeperException.Code.OK.intValue() == rc2) { + try { + Versioned result = + updateBookieInfo(bookieId, true, bytes2, stat2); + promise.complete(result); + } catch (IOException ex) { + log.error("Cannot update BookieInfo for ", ex); + promise.completeExceptionally( + KeeperException.create(KeeperException.Code.get(rc2), path2) + .initCause(ex) + ); + return; + } + } else { + // not found as writable and readonly, the bookie is offline + promise.completeExceptionally( + BKException.create(BKException.Code.NoBookieAvailableException) + ); + } + }, null); + } else { + promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc), path)); + } + }, null); + return promise; + } + + + private CompletableFuture> readBookieInfoAsReadonlyBookie(BookieId bookieId) { + String pathAsReadonly = bookieReadonlyRegistrationPath + "/" + bookieId; + + CompletableFuture> promise = new CompletableFuture<>(); + // read the readonly registration znode, passing the cache-invalidation watcher + zk.getData(pathAsReadonly, bookieServiceInfoCacheInvalidation, + (int rc, String path, Object o, byte[] bytes, Stat stat) -> { + if (KeeperException.Code.OK.intValue() == rc) { + try { + Versioned result = + updateBookieInfo(bookieId, true, bytes, stat); + promise.complete(result); + } catch (IOException ex) { + log.error("Cannot update BookieInfo for ", ex); + promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc), path) 
.initCause(ex)); return; } @@ -300,10 +352,30 @@ private CompletableFuture> readBookieServiceInfoAsy promise.completeExceptionally(BKException.create(BKException.Code.NoBookieAvailableException)); } }, null); - } else { - promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc), path)); - } - }, null); + return promise; + } + + private CompletableFuture> readBookieInfoAsWritableBookie(BookieId bookieId) { + String pathAsWritable = bookieRegistrationPath + "/" + bookieId; + + CompletableFuture> promise = new CompletableFuture<>(); + zk.getData(pathAsWritable, bookieServiceInfoCacheInvalidation, + (int rc, String path, Object o, byte[] bytes, Stat stat) -> { + if (KeeperException.Code.OK.intValue() == rc) { + try { + Versioned result = + updateBookieInfo(bookieId, false, bytes, stat); + promise.complete(result); + } catch (IOException ex) { + log.error("Cannot update BookieInfo for ", ex); + promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc), path) + .initCause(ex)); + return; + } + } else { + promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc), path)); + } + }, null); return promise; } @@ -358,7 +430,24 @@ private CompletableFuture>> getChildren(String regPath, List>> bookieInfoUpdated = new ArrayList<>(bookies.size()); for (BookieId id : bookies) { // update the cache for new bookies - if (!bookieServiceInfoCache.containsKey(id)) { + if (path.equals(bookieReadonlyRegistrationPath)) { + if (readOnlyBookieInfo.get(id) == null) { + bookieInfoUpdated.add(readBookieInfoAsReadonlyBookie(id)); + continue; + } + } + if (path.equals(bookieRegistrationPath)) { + if (writableBookieInfo.get(id) == null) { + bookieInfoUpdated.add(readBookieInfoAsWritableBookie(id)); + continue; + } + + } + if (path.equals(bookieAllRegistrationPath)) { + if (writableBookieInfo.get(id) != null || readOnlyBookieInfo.get(id) != null) { + // jump to next bookie id + continue; + } 
bookieInfoUpdated.add(readBookieServiceInfoAsync(id)); } } @@ -484,30 +573,57 @@ public void process(WatchedEvent we) { if (log.isDebugEnabled()) { log.debug("zk event {} for {} state {}", we.getType(), we.getPath(), we.getState()); } - if (we.getState() == KeeperState.Expired) { - log.info("zk session expired, invalidating cache"); - bookieServiceInfoCache.clear(); - return; - } + if (we.getState() == KeeperState.Expired) { + /* session events are expected to carry a null path, so handle them before the path guards */ + log.info("zk session expired, invalidating cache"); + readOnlyBookieInfo.clear(); + writableBookieInfo.clear(); + return; + } + BookieId bookieId = stripBookieIdFromPath(we.getPath()); if (bookieId == null) { return; } - switch (we.getType()) { - case NodeDeleted: - log.info("Invalidate cache for {}", bookieId); - bookieServiceInfoCache.remove(bookieId); - break; - case NodeDataChanged: - log.info("refresh cache for {}", bookieId); - readBookieServiceInfoAsync(bookieId); - break; - default: - if (log.isDebugEnabled()) { - log.debug("ignore cache event {} for {}", we.getType(), bookieId); - } - break; + // make the notification callback run sequential in background. + final String path = we.getPath(); + if (!path.startsWith(bookieReadonlyRegistrationPath) && !path.startsWith(bookieRegistrationPath)) { + // ignore unknown path + return; + } + if (path.equals(bookieReadonlyRegistrationPath) || path.equals(bookieRegistrationPath)) { + // ignore root path + return; + } + sequencer.sequential(() -> { + /* NOTE: readonly paths also start with the writable root (confirm), so branches must be exclusive */ + switch (we.getType()) { + case NodeDeleted: + if (path.startsWith(bookieReadonlyRegistrationPath)) { + log.info("Invalidate readonly cache for {}", bookieId); + readOnlyBookieInfo.remove(bookieId); + } else if (path.startsWith(bookieRegistrationPath)) { + log.info("Invalidate writable cache for {}", bookieId); + writableBookieInfo.remove(bookieId); + } + break; + case NodeDataChanged: + if (path.startsWith(bookieReadonlyRegistrationPath)) { + log.info("refresh readonly cache for {}. 
path: {}", bookieId, path); + readBookieInfoAsReadonlyBookie(bookieId); + } else if (path.startsWith(bookieRegistrationPath)) { + log.info("refresh writable cache for {}. path: {}", bookieId, path); + readBookieInfoAsWritableBookie(bookieId); + } + break; + default: + if (log.isDebugEnabled()) { + log.debug("ignore cache event {} for {}", we.getType(), bookieId); + } + break; + } + return completedFuture(null); + }); } } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/FaultInjectableZKRegistrationManager.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/FaultInjectableZKRegistrationManager.java new file mode 100644 index 00000000000..33bf783f0fa --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/FaultInjectableZKRegistrationManager.java @@ -0,0 +1,619 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.bookkeeper.discover; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE; +import static org.apache.bookkeeper.util.BookKeeperConstants.COOKIE_NODE; +import static org.apache.bookkeeper.util.BookKeeperConstants.EMPTY_BYTE_ARRAY; +import static org.apache.bookkeeper.util.BookKeeperConstants.INSTANCEID; +import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.bookie.BookieException; +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.common.concurrent.FutureUtils; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.meta.AbstractZkLedgerManagerFactory; +import org.apache.bookkeeper.meta.LayoutManager; +import org.apache.bookkeeper.meta.LedgerManagerFactory; +import org.apache.bookkeeper.meta.ZkLayoutManager; +import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager; +import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase; +import org.apache.bookkeeper.net.BookieId; +import org.apache.bookkeeper.proto.DataFormats; +import org.apache.bookkeeper.util.BookKeeperConstants; +import org.apache.bookkeeper.util.ZkUtils; +import org.apache.bookkeeper.versioning.LongVersion; +import org.apache.bookkeeper.versioning.Version; +import org.apache.bookkeeper.versioning.Versioned; +import org.apache.zookeeper.CreateMode; +import 
org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Op; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZKUtil; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; + +/** + * Fault injectable ZK registration manager. + * Copy from #{@link org.apache.bookkeeper.discover.ZKRegistrationManager}. + */ +@Slf4j +public class FaultInjectableZKRegistrationManager implements RegistrationManager { + + private static final Function EXCEPTION_FUNC = cause -> { + if (cause instanceof BKException) { + log.error("Failed to get bookie list : ", cause); + return (BKException) cause; + } else if (cause instanceof InterruptedException) { + log.error("Interrupted reading bookie list : ", cause); + return new BKException.BKInterruptedException(); + } else { + return new BKException.MetaStoreException(); + } + }; + + private final ServerConfiguration conf; + private final ZooKeeper zk; + private final List zkAcls; + private final LayoutManager layoutManager; + + private volatile boolean zkRegManagerInitialized = false; + + // ledgers root path + private final String ledgersRootPath; + // cookie path + private final String cookiePath; + // registration paths + protected final String bookieRegistrationPath; + protected final String bookieReadonlyRegistrationPath; + // session timeout in milliseconds + private final int zkTimeoutMs; + private final List listeners = new ArrayList<>(); + private Function hookOnRegisterReadOnly; + + public FaultInjectableZKRegistrationManager(ServerConfiguration conf, + ZooKeeper zk) { + this(conf, zk, ZKMetadataDriverBase.resolveZkLedgersRootPath(conf)); + } + + public FaultInjectableZKRegistrationManager(ServerConfiguration conf, + ZooKeeper zk, + String ledgersRootPath) { + this.conf = conf; + this.zk = zk; + this.zkAcls = ZkUtils.getACLs(conf); + this.ledgersRootPath = ledgersRootPath; + this.cookiePath = 
ledgersRootPath + "/" + COOKIE_NODE; + this.bookieRegistrationPath = ledgersRootPath + "/" + AVAILABLE_NODE; + this.bookieReadonlyRegistrationPath = this.bookieRegistrationPath + "/" + READONLY; + this.zkTimeoutMs = conf.getZkTimeout(); + + this.layoutManager = new ZkLayoutManager( + zk, + ledgersRootPath, + zkAcls); + + this.zk.register(event -> { + if (!zkRegManagerInitialized) { + // do nothing until first registration + return; + } + // Check for expired connection. + if (event.getType().equals(Watcher.Event.EventType.None) + && event.getState().equals(Watcher.Event.KeeperState.Expired)) { + listeners.forEach(RegistrationListener::onRegistrationExpired); + } + }); + } + + @Override + public void close() { + // no-op + } + + /** + * Returns the CookiePath of the bookie in the ZooKeeper. + * + * @param bookieId bookie id + * @return + */ + public String getCookiePath(BookieId bookieId) { + return this.cookiePath + "/" + bookieId; + } + + // + // Registration Management + // + + /** + * Check existence of regPath and wait it expired if possible. + * + * @param regPath reg node path. + * @return true if regPath exists, otherwise return false + * @throws IOException if can't create reg path + */ + protected boolean checkRegNodeAndWaitExpired(String regPath) throws IOException { + final CountDownLatch prevNodeLatch = new CountDownLatch(1); + Watcher zkPrevRegNodewatcher = new Watcher() { + @Override + public void process(WatchedEvent event) { + // Check for prev znode deletion. Connection expiration is + // not handling, since bookie has logic to shutdown. + if (Event.EventType.NodeDeleted == event.getType()) { + prevNodeLatch.countDown(); + } + } + }; + try { + Stat stat = zk.exists(regPath, zkPrevRegNodewatcher); + if (null != stat) { + // if the ephemeral owner isn't current zookeeper client + // wait for it to be expired. 
+ if (stat.getEphemeralOwner() != zk.getSessionId()) { + log.info("Previous bookie registration znode: {} exists, so waiting zk sessiontimeout:" + + " {} ms for znode deletion", regPath, zkTimeoutMs); + // waiting for the previous bookie reg znode deletion + if (!prevNodeLatch.await(zkTimeoutMs, TimeUnit.MILLISECONDS)) { + throw new KeeperException.NodeExistsException(regPath); + } else { + return false; + } + } + return true; + } else { + return false; + } + } catch (KeeperException ke) { + log.error("ZK exception checking and wait ephemeral znode {} expired : ", regPath, ke); + throw new IOException("ZK exception checking and wait ephemeral znode " + + regPath + " expired", ke); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + log.error("Interrupted checking and wait ephemeral znode {} expired : ", regPath, ie); + throw new IOException("Interrupted checking and wait ephemeral znode " + + regPath + " expired", ie); + } + } + + @Override + public void registerBookie(BookieId bookieId, boolean readOnly, + BookieServiceInfo bookieServiceInfo) throws BookieException { + if (!readOnly) { + String regPath = bookieRegistrationPath + "/" + bookieId; + doRegisterBookie(regPath, bookieServiceInfo); + } else { + doRegisterReadOnlyBookie(bookieId, bookieServiceInfo); + } + } + + @VisibleForTesting + static byte[] serializeBookieServiceInfo(BookieServiceInfo bookieServiceInfo) { + if (log.isDebugEnabled()) { + log.debug("serialize BookieServiceInfo {}", bookieServiceInfo); + } + try (ByteArrayOutputStream os = new ByteArrayOutputStream()) { + DataFormats.BookieServiceInfoFormat.Builder builder = DataFormats.BookieServiceInfoFormat.newBuilder(); + List bsiEndpoints = bookieServiceInfo.getEndpoints().stream() + .map(e -> { + return DataFormats.BookieServiceInfoFormat.Endpoint.newBuilder() + .setId(e.getId()) + .setPort(e.getPort()) + .setHost(e.getHost()) + .setProtocol(e.getProtocol()) + .addAllAuth(e.getAuth()) + 
.addAllExtensions(e.getExtensions()) + .build(); + }) + .collect(Collectors.toList()); + + builder.addAllEndpoints(bsiEndpoints); + builder.putAllProperties(bookieServiceInfo.getProperties()); + + builder.build().writeTo(os); + return os.toByteArray(); + } catch (IOException err) { + log.error("Cannot serialize bookieServiceInfo from " + bookieServiceInfo); + throw new RuntimeException(err); + } + } + + private void doRegisterBookie(String regPath, BookieServiceInfo bookieServiceInfo) throws BookieException { + // ZK ephemeral node for this Bookie. + try { + if (!checkRegNodeAndWaitExpired(regPath)) { + // Create the ZK ephemeral node for this Bookie. + zk.create(regPath, serializeBookieServiceInfo(bookieServiceInfo), zkAcls, CreateMode.EPHEMERAL); + zkRegManagerInitialized = true; + } + } catch (KeeperException ke) { + log.error("ZK exception registering ephemeral Znode for Bookie!", ke); + // Throw an IOException back up. This will cause the Bookie + // constructor to error out. Alternatively, we could do a System + // exit here as this is a fatal error. + throw new BookieException.MetadataStoreException(ke); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + log.error("Interrupted exception registering ephemeral Znode for Bookie!", ie); + // Throw an IOException back up. This will cause the Bookie + // constructor to error out. Alternatively, we could do a System + // exit here as this is a fatal error. 
+ throw new BookieException.MetadataStoreException(ie); + } catch (IOException e) { + throw new BookieException.MetadataStoreException(e); + } + } + + private void doRegisterReadOnlyBookie(BookieId bookieId, BookieServiceInfo bookieServiceInfo) + throws BookieException { + try { + if (null == zk.exists(this.bookieReadonlyRegistrationPath, false)) { + try { + zk.create(this.bookieReadonlyRegistrationPath, serializeBookieServiceInfo(bookieServiceInfo), + zkAcls, CreateMode.PERSISTENT); + } catch (KeeperException.NodeExistsException e) { + // this node is just now created by someone. + } + } + String regPath = bookieReadonlyRegistrationPath + "/" + bookieId; + doRegisterBookie(regPath, bookieServiceInfo); + log.info("transitioned to readonly mode, bookieId: {}", bookieId); + // clear the write state + regPath = bookieRegistrationPath + "/" + bookieId; + try { + if (hookOnRegisterReadOnly != null) { + hookOnRegisterReadOnly.apply(null); + } + // Clear the current registered node + + zk.delete(regPath, -1); + log.info("delete the writable bookie registration node {}", regPath); + } catch (KeeperException.NoNodeException nne) { + log.warn("No writable bookie registered node {} when transitioning to readonly", + regPath, nne); + } + } catch (KeeperException | InterruptedException e) { + throw new BookieException.MetadataStoreException(e); + } + } + + @Override + public void unregisterBookie(BookieId bookieId, boolean readOnly) throws BookieException { + String regPath; + if (!readOnly) { + regPath = bookieRegistrationPath + "/" + bookieId; + } else { + regPath = bookieReadonlyRegistrationPath + "/" + bookieId; + } + doUnregisterBookie(regPath); + } + + private void doUnregisterBookie(String regPath) throws BookieException { + try { + zk.delete(regPath, -1); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new BookieException.MetadataStoreException(ie); + } catch (KeeperException e) { + throw new BookieException.MetadataStoreException(e); 
+ } + } + + // + // Cookie Management + // + + @Override + public void writeCookie(BookieId bookieId, + Versioned cookieData) throws BookieException { + String zkPath = getCookiePath(bookieId); + try { + if (org.apache.bookkeeper.versioning.Version.NEW == cookieData.getVersion()) { + if (zk.exists(cookiePath, false) == null) { + try { + zk.create(cookiePath, new byte[0], zkAcls, CreateMode.PERSISTENT); + } catch (KeeperException.NodeExistsException nne) { + log.info("More than one bookie tried to create {} at once. Safe to ignore.", + cookiePath); + } + } + zk.create(zkPath, cookieData.getValue(), zkAcls, CreateMode.PERSISTENT); + } else { + if (!(cookieData.getVersion() instanceof LongVersion)) { + throw new BookieException.BookieIllegalOpException( + "Invalid version type, expected it to be LongVersion"); + } + zk.setData( + zkPath, + cookieData.getValue(), + (int) ((LongVersion) cookieData.getVersion()).getLongVersion()); + } + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new BookieException.MetadataStoreException("Interrupted writing cookie for bookie " + bookieId, ie); + } catch (KeeperException.NoNodeException nne) { + throw new BookieException.CookieNotFoundException(bookieId.toString()); + } catch (KeeperException.NodeExistsException nee) { + throw new BookieException.CookieExistException(bookieId.toString()); + } catch (KeeperException e) { + throw new BookieException.MetadataStoreException("Failed to write cookie for bookie " + bookieId); + } + } + + @Override + public Versioned readCookie(BookieId bookieId) throws BookieException { + String zkPath = getCookiePath(bookieId); + try { + Stat stat = zk.exists(zkPath, false); + byte[] data = zk.getData(zkPath, false, stat); + // sets stat version from ZooKeeper + LongVersion version = new LongVersion(stat.getVersion()); + return new Versioned<>(data, version); + } catch (KeeperException.NoNodeException nne) { + throw new 
BookieException.CookieNotFoundException(bookieId.toString());
+        } catch (KeeperException | InterruptedException e) {
+            throw new BookieException.MetadataStoreException("Failed to read cookie for bookie " + bookieId);
+        }
+    }
+
+    @Override
+    public void removeCookie(BookieId bookieId, Version version) throws BookieException {
+        String zkPath = getCookiePath(bookieId);
+        try {
+            zk.delete(zkPath, (int) ((LongVersion) version).getLongVersion());
+        } catch (KeeperException.NoNodeException e) {
+            throw new BookieException.CookieNotFoundException(bookieId.toString());
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new BookieException.MetadataStoreException("Interrupted deleting cookie for bookie " + bookieId, e);
+        } catch (KeeperException e) {
+            throw new BookieException.MetadataStoreException("Failed to delete cookie for bookie " + bookieId, e);
+        }
+
+        log.info("Removed cookie from {} for bookie {}.", cookiePath, bookieId);
+    }
+
+    // Returns the INSTANCEID znode content, or null when that znode does not exist under the ledgers root.
+    @Override
+    public String getClusterInstanceId() throws BookieException {
+        String instanceId = null;
+        try {
+            if (zk.exists(ledgersRootPath, null) == null) {
+                log.error("BookKeeper metadata doesn't exist in zookeeper. "
+                        + "Has the cluster been initialized? "
+                        + "Try running bin/bookkeeper shell metaformat");
+                throw new KeeperException.NoNodeException("BookKeeper metadata");
+            }
+            try {
+                byte[] data = zk.getData(ledgersRootPath + "/"
+                        + INSTANCEID, false, null);
+                instanceId = new String(data, UTF_8);
+            } catch (KeeperException.NoNodeException e) {
+                log.info("INSTANCEID not exists in zookeeper. Not considering it for data verification");
+            }
+        } catch (KeeperException | InterruptedException e) {
+            throw new BookieException.MetadataStoreException("Failed to get cluster instance id", e);
+        }
+        return instanceId;
+    }
+
+    @Override
+    public boolean prepareFormat() throws Exception {
+        boolean ledgerRootExists = null != zk.exists(ledgersRootPath, false);
+        boolean availableNodeExists = null != zk.exists(bookieRegistrationPath, false);
+        // Create ledgers root node if not exists
+        if (!ledgerRootExists) {
+            ZkUtils.createFullPathOptimistic(zk, ledgersRootPath, "".getBytes(StandardCharsets.UTF_8), zkAcls,
+                    CreateMode.PERSISTENT);
+        }
+        // create available bookies node if not exists
+        if (!availableNodeExists) {
+            zk.create(bookieRegistrationPath, "".getBytes(StandardCharsets.UTF_8), zkAcls, CreateMode.PERSISTENT);
+        }
+
+        // create readonly bookies node if not exists
+        if (null == zk.exists(bookieReadonlyRegistrationPath, false)) {
+            zk.create(bookieReadonlyRegistrationPath, new byte[0], zkAcls, CreateMode.PERSISTENT);
+        }
+
+        return ledgerRootExists;
+    }
+
+    @Override
+    public boolean initNewCluster() throws Exception {
+        String zkServers = ZKMetadataDriverBase.resolveZkServers(conf);
+        String instanceIdPath = ledgersRootPath + "/" + INSTANCEID;
+        log.info("Initializing ZooKeeper metadata for new cluster, ZKServers: {} ledger root path: {}", zkServers,
+                ledgersRootPath);
+
+        boolean ledgerRootExists = null != zk.exists(ledgersRootPath, false);
+
+        if (ledgerRootExists) {
+            log.error("Ledger root path: {} already exists", ledgersRootPath);
+            return false;
+        }
+
+        List<Op> multiOps = Lists.newArrayListWithExpectedSize(4);
+
+        // Create ledgers root node
+        multiOps.add(Op.create(ledgersRootPath, EMPTY_BYTE_ARRAY, zkAcls, CreateMode.PERSISTENT));
+
+        // create available bookies node
+        multiOps.add(Op.create(bookieRegistrationPath, EMPTY_BYTE_ARRAY, zkAcls, CreateMode.PERSISTENT));
+
+        // create readonly bookies node
+        multiOps.add(Op.create(
+                bookieReadonlyRegistrationPath,
+                EMPTY_BYTE_ARRAY,
+                zkAcls,
+                CreateMode.PERSISTENT));
+
+        // create INSTANCEID
+        String instanceId = UUID.randomUUID().toString();
+        multiOps.add(Op.create(instanceIdPath, instanceId.getBytes(UTF_8),
+                zkAcls, CreateMode.PERSISTENT));
+
+        // execute the multi ops
+        zk.multi(multiOps);
+
+        // creates the new layout and stores in zookeeper
+        AbstractZkLedgerManagerFactory.newLedgerManagerFactory(conf, layoutManager);
+
+        log.info("Successfully initiated cluster. ZKServers: {} ledger root path: {} instanceId: {}", zkServers,
+                ledgersRootPath, instanceId);
+        return true;
+    }
+
+    @Override
+    public boolean nukeExistingCluster() throws Exception {
+        String zkServers = ZKMetadataDriverBase.resolveZkServers(conf);
+        log.info("Nuking ZooKeeper metadata of existing cluster, ZKServers: {} ledger root path: {}",
+                zkServers, ledgersRootPath);
+
+        boolean ledgerRootExists = null != zk.exists(ledgersRootPath, false);
+        if (!ledgerRootExists) {
+            log.info("There is no existing cluster with ledgersRootPath: {} in ZKServers: {}, "
+                    + "so exiting nuke operation", ledgersRootPath, zkServers);
+            return true;
+        }
+
+        boolean availableNodeExists = null != zk.exists(bookieRegistrationPath, false);
+        try (RegistrationClient regClient = new ZKRegistrationClient(
+                zk,
+                ledgersRootPath,
+                null,
+                false
+        )) {
+            if (availableNodeExists) {
+                Collection<BookieId> rwBookies = FutureUtils
+                        .result(regClient.getWritableBookies(), EXCEPTION_FUNC).getValue();
+                if (rwBookies != null && !rwBookies.isEmpty()) {
+                    log.error("Bookies are still up and connected to this cluster, "
+                            + "stop all bookies before nuking the cluster");
+                    return false;
+                }
+
+                boolean readonlyNodeExists = null != zk.exists(bookieReadonlyRegistrationPath, false);
+                if (readonlyNodeExists) {
+                    Collection<BookieId> roBookies = FutureUtils
+                            .result(regClient.getReadOnlyBookies(), EXCEPTION_FUNC).getValue();
+                    if (roBookies != null && !roBookies.isEmpty()) {
+                        log.error("Readonly Bookies are still up and connected to this cluster, "
+                                + "stop all bookies before nuking the cluster");
+                        return false;
+                    }
+                }
+            }
+        }
+
+        LedgerManagerFactory ledgerManagerFactory =
+                AbstractZkLedgerManagerFactory.newLedgerManagerFactory(conf, layoutManager);
+        return ledgerManagerFactory.validateAndNukeExistingCluster(conf, layoutManager);
+    }
+
+    @Override
+    public boolean format() throws Exception {
+        // Clear underreplicated ledgers
+        try {
+            ZKUtil.deleteRecursive(zk, ZkLedgerUnderreplicationManager.getBasePath(ledgersRootPath)
+                    + BookKeeperConstants.DEFAULT_ZK_LEDGERS_ROOT_PATH);
+        } catch (KeeperException.NoNodeException e) {
+            if (log.isDebugEnabled()) {
+                log.debug("underreplicated ledgers root path node not exists in zookeeper to delete");
+            }
+        }
+
+        // Clear underreplicated ledger locks
+        try {
+            ZKUtil.deleteRecursive(zk, ZkLedgerUnderreplicationManager.getBasePath(ledgersRootPath) + '/'
+                    + BookKeeperConstants.UNDER_REPLICATION_LOCK);
+        } catch (KeeperException.NoNodeException e) {
+            if (log.isDebugEnabled()) {
+                log.debug("underreplicatedledger locks node not exists in zookeeper to delete");
+            }
+        }
+
+        // Clear the cookies
+        try {
+            ZKUtil.deleteRecursive(zk, cookiePath);
+        } catch (KeeperException.NoNodeException e) {
+            if (log.isDebugEnabled()) {
+                log.debug("cookies node not exists in zookeeper to delete");
+            }
+        }
+
+        // Clear the INSTANCEID
+        try {
+            zk.delete(ledgersRootPath + "/" + BookKeeperConstants.INSTANCEID, -1);
+        } catch (KeeperException.NoNodeException e) {
+            if (log.isDebugEnabled()) {
+                log.debug("INSTANCEID not exists in zookeeper to delete");
+            }
+        }
+
+        // create INSTANCEID
+        String instanceId = UUID.randomUUID().toString();
+        zk.create(ledgersRootPath + "/" + BookKeeperConstants.INSTANCEID,
+                instanceId.getBytes(StandardCharsets.UTF_8), zkAcls, CreateMode.PERSISTENT);
+
+        log.info("Successfully formatted BookKeeper metadata");
+        return true;
+    }
+
+    @Override
+    public boolean isBookieRegistered(BookieId bookieId) throws BookieException {
+        String regPath = bookieRegistrationPath + "/" + bookieId;
+        String readonlyRegPath = bookieReadonlyRegistrationPath + "/" + bookieId;
+        try {
+            return ((null != zk.exists(regPath, false)) || (null != zk.exists(readonlyRegPath, false)));
+        } catch (KeeperException e) {
+            log.error("ZK exception while checking registration ephemeral znodes for BookieId: {}", bookieId, e);
+            throw new BookieException.MetadataStoreException(e);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            log.error("InterruptedException while checking registration ephemeral znodes for BookieId: {}", bookieId,
+                    e);
+            throw new BookieException.MetadataStoreException(e);
+        }
+    }
+
+    @Override
+    public void addRegistrationListener(RegistrationListener listener) {
+        listeners.add(listener);
+    }
+
+    public void betweenRegisterReadOnlyBookie(Function fn) {
+        hookOnRegisterReadOnly = fn;
+    }
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/ZKRegistrationClientTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/ZKRegistrationClientTest.java
new file mode 100644
index 00000000000..c2fa6fc0eea
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/ZKRegistrationClientTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.discover;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import lombok.Cleanup;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.versioning.Versioned;
+import org.apache.zookeeper.ZooKeeper;
+import org.awaitility.Awaitility;
+import org.junit.Test;
+
+/**
+ * Checks that {@link ZKRegistrationClient} still resolves every bookie's service info after read-only switches.
+ */
+public class ZKRegistrationClientTest extends BookKeeperClusterTestCase {
+
+    public ZKRegistrationClientTest() {
+        super(0);
+    }
+
+    /**
+     * Registers ten bookies, re-registers half as read-only behind a delaying hook, and re-checks both clients.
+     */
+    @Test
+    public void testNetworkDelayWithBkZkManager() throws Throwable {
+        final String zksConnectionString = zkUtil.getZooKeeperConnectString();
+        final String ledgersRoot = "/test/ledgers-" + UUID.randomUUID();
+        // prepare registration manager
+        @Cleanup
+        ZooKeeper zk = new ZooKeeper(zksConnectionString, 5000, null);
+        final ServerConfiguration serverConfiguration = new ServerConfiguration();
+        serverConfiguration.setZkLedgersRootPath(ledgersRoot);
+        final FaultInjectableZKRegistrationManager rm =
+                new FaultInjectableZKRegistrationManager(serverConfiguration, zk);
+        rm.prepareFormat();
+
+        // declared before the clients so it is shut down only after both of them have been closed
+        @Cleanup("shutdownNow")
+        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
+        @Cleanup
+        RegistrationClient rc1 = new ZKRegistrationClient(zk, ledgersRoot, scheduledExecutorService, true);
+        rc1.watchWritableBookies(bookies -> System.out.println("Writable bookies changed" + bookies));
+        rc1.watchReadOnlyBookies(bookies -> System.out.println("ReadOnly bookies changed" + bookies));
+        @Cleanup
+        RegistrationClient rc2 = new ZKRegistrationClient(zk, ledgersRoot, scheduledExecutorService, true);
+        rc2.watchWritableBookies(bookies -> System.out.println("Writable bookies changed"));
+        rc2.watchReadOnlyBookies(bookies -> System.out.println("ReadOnly bookies changed"));
+
+        final List<BookieId> addresses = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            addresses.add(BookieId.parse("BOOKIE-" + i));
+        }
+        final Map<BookieId, BookieServiceInfo> bookieServiceInfos = new HashMap<>();
+
+        int port = 223;
+        for (BookieId address : addresses) {
+            BookieServiceInfo info = new BookieServiceInfo();
+            BookieServiceInfo.Endpoint endpoint = new BookieServiceInfo.Endpoint();
+            endpoint.setAuth(Collections.emptyList());
+            endpoint.setExtensions(Collections.emptyList());
+            endpoint.setId("id");
+            endpoint.setHost("localhost");
+            endpoint.setPort(port++);
+            endpoint.setProtocol("bookie-rpc");
+            info.setEndpoints(Arrays.asList(endpoint));
+            bookieServiceInfos.put(address, info);
+            rm.registerBookie(address, false, info);
+            // write the cookie
+            rm.writeCookie(address, new Versioned<>(new byte[0], Version.NEW));
+        }
+
+        // trigger loading the BookieServiceInfo in the local cache
+        getAndVerifyAllBookies(rc1, addresses);
+        getAndVerifyAllBookies(rc2, addresses);
+
+        Awaitility.await().untilAsserted(() -> {
+            for (BookieId address : addresses) {
+                compareBookieServiceInfo(rc1.getBookieServiceInfo(address).get().getValue(),
+                        bookieServiceInfos.get(address));
+                compareBookieServiceInfo(rc2.getBookieServiceInfo(address).get().getValue(),
+                        bookieServiceInfos.get(address));
+            }
+        });
+
+        // initial state verified: both clients resolve the service info of every registered bookie
+
+        // mock network delay
+        rm.betweenRegisterReadOnlyBookie(__ -> {
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            }
+            return null;
+        });
+
+        for (int i = 0; i < addresses.size() / 2; i++) {
+            final BookieId bkId = addresses.get(i);
+            // turn some bookies to be read only.
+            rm.registerBookie(bkId, true, bookieServiceInfos.get(bkId));
+        }
+
+        Awaitility.await().untilAsserted(() -> {
+            for (BookieId address : addresses) {
+                compareBookieServiceInfo(rc1.getBookieServiceInfo(address).get().getValue(),
+                        bookieServiceInfos.get(address));
+                compareBookieServiceInfo(rc2.getBookieServiceInfo(address).get().getValue(),
+                        bookieServiceInfos.get(address));
+            }
+        });
+    }
+
+    /** Asserts that {@code rc} reports exactly the bookies listed in {@code addresses}. */
+    private static void getAndVerifyAllBookies(RegistrationClient rc, List<BookieId> addresses)
+            throws InterruptedException, ExecutionException {
+        Set<BookieId> all = rc.getAllBookies().get().getValue();
+        assertEquals(addresses.size(), all.size());
+        for (BookieId id : all) {
+            assertTrue(addresses.contains(id));
+        }
+        for (BookieId id : addresses) {
+            assertTrue(all.contains(id));
+        }
+    }
+
+    /** Asserts that both infos carry equal properties and pairwise-equal endpoints. */
+    private void compareBookieServiceInfo(BookieServiceInfo actual, BookieServiceInfo expected) {
+        assertEquals(expected.getProperties(), actual.getProperties());
+        assertEquals(expected.getEndpoints().size(), actual.getEndpoints().size());
+        for (int i = 0; i < actual.getEndpoints().size(); i++) {
+            BookieServiceInfo.Endpoint e1 = actual.getEndpoints().get(i);
+            BookieServiceInfo.Endpoint e2 = expected.getEndpoints().get(i);
+            assertEquals(e2.getHost(), e1.getHost());
+            assertEquals(e2.getPort(), e1.getPort());
+            assertEquals(e2.getId(), e1.getId());
+            assertEquals(e2.getProtocol(), e1.getProtocol());
+            assertEquals(e2.getExtensions(), e1.getExtensions());
+            assertEquals(e2.getAuth(), e1.getAuth());
+        }
+    }
+}