diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java index ce6f7f3369..c9a83fcbc0 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java @@ -59,6 +59,7 @@ import org.apache.fluss.rpc.messages.RebalanceResponse; import org.apache.fluss.rpc.messages.RemoveServerTagResponse; import org.apache.fluss.rpc.protocol.ApiError; +import org.apache.fluss.rpc.protocol.Errors; import org.apache.fluss.server.coordinator.event.AccessContextEvent; import org.apache.fluss.server.coordinator.event.AddServerTagEvent; import org.apache.fluss.server.coordinator.event.AdjustIsrReceivedEvent; @@ -342,8 +343,14 @@ private void initCoordinatorContext() throws Exception { long start4loadTabletServer = System.currentTimeMillis(); Map tabletServerRegistrations = zooKeeperClient.getTabletServers(currentServers); + List skippedNullRegistration = new ArrayList<>(); + List skippedNoEndpoint = new ArrayList<>(); for (int server : currentServers) { TabletServerRegistration registration = tabletServerRegistrations.get(server); + if (registration == null) { + skippedNullRegistration.add(server); + continue; + } ServerInfo serverInfo = new ServerInfo( server, @@ -357,6 +364,7 @@ private void initCoordinatorContext() throws Exception { "Can not find endpoint for listener name {} for tablet server {}", internalListenerName, serverInfo); + skippedNoEndpoint.add(server); continue; } tabletServerInfos.add(serverInfo); @@ -370,8 +378,30 @@ private void initCoordinatorContext() throws Exception { coordinatorContext.setLiveTabletServers(tabletServerInfos); LOG.info( - "Load tablet servers success in {}ms when initializing coordinator context.", - System.currentTimeMillis() - start4loadTabletServer); + "Load tablet 
servers success in {}ms when initializing coordinator context. " + + "ZK returned {} servers, loaded {} into liveSet, " + + "skipped {} (null registration), skipped {} (no endpoint). " + + "Live server IDs: {}", + System.currentTimeMillis() - start4loadTabletServer, + currentServers.length, + tabletServerInfos.size(), + skippedNullRegistration.size(), + skippedNoEndpoint.size(), + tabletServerInfos.stream() + .map(s -> String.valueOf(s.id())) + .collect(Collectors.joining(","))); + if (!skippedNullRegistration.isEmpty()) { + LOG.warn( + "Skipped {} servers with null ZK registration: {}", + skippedNullRegistration.size(), + skippedNullRegistration); + } + if (!skippedNoEndpoint.isEmpty()) { + LOG.warn( + "Skipped {} servers with no internal endpoint: {}", + skippedNoEndpoint.size(), + skippedNoEndpoint); + } // init tablet server channels coordinatorChannelManager.startup(internalServerNodes); @@ -938,11 +968,25 @@ private void processNotifyLeaderAndIsrResponseReceivedEvent( notifyLeaderAndIsrResponseReceivedEvent.getNotifyLeaderAndIsrResultForBuckets(); for (NotifyLeaderAndIsrResultForBucket notifyLeaderAndIsrResultForBucket : notifyLeaderAndIsrResultForBuckets) { - // if the error code is not none, we will consider it as offline if (notifyLeaderAndIsrResultForBucket.failed()) { - offlineReplicas.add( - new TableBucketReplica( - notifyLeaderAndIsrResultForBucket.getTableBucket(), serverId)); + Errors error = notifyLeaderAndIsrResultForBucket.getError().error(); + TableBucket tableBucket = notifyLeaderAndIsrResultForBucket.getTableBucket(); + if (isFatalReplicaError(error)) { + LOG.warn( + "Fatal NotifyLeaderAndIsr error for bucket {} on server {}: {}. " + + "Marking replica offline.", + tableBucket, + serverId, + notifyLeaderAndIsrResultForBucket.getError()); + offlineReplicas.add(new TableBucketReplica(tableBucket, serverId)); + } else { + LOG.warn( + "Transient NotifyLeaderAndIsr error for bucket {} on server {}: {}. 
" + + "Replica remains online.", + tableBucket, + serverId, + notifyLeaderAndIsrResultForBucket.getError()); + } } } if (!offlineReplicas.isEmpty()) { @@ -951,6 +995,18 @@ private void processNotifyLeaderAndIsrResponseReceivedEvent( } } + /** + * Returns true if the error indicates a fatal replica failure (storage corruption, unknown + * internal error) that warrants excluding this replica from future leader elections. All other + * errors are considered transient and should NOT mark the replica offline. + */ + private static boolean isFatalReplicaError(Errors error) { + return error == Errors.STORAGE_EXCEPTION + || error == Errors.LOG_STORAGE_EXCEPTION + || error == Errors.KV_STORAGE_EXCEPTION + || error == Errors.UNKNOWN_SERVER_ERROR; + } + private void onReplicaBecomeOffline(Set offlineReplicas) { LOG.info("The replica {} become offline.", offlineReplicas); for (TableBucketReplica offlineReplica : offlineReplicas) { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java index c57f1c6fb7..6f54147379 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java @@ -27,6 +27,7 @@ import org.apache.fluss.exception.InvalidColumnProjectionException; import org.apache.fluss.exception.InvalidCoordinatorException; import org.apache.fluss.exception.InvalidRequiredAcksException; +import org.apache.fluss.exception.LeaderNotAvailableException; import org.apache.fluss.exception.LogOffsetOutOfRangeException; import org.apache.fluss.exception.LogStorageException; import org.apache.fluss.exception.NotLeaderOrFollowerException; @@ -1188,17 +1189,16 @@ private void addFetcherForReplicas( Integer leaderId = replica.getLeaderId(); TableBucket tb = replica.getTableBucket(); LogTablet logTablet = replica.getLogTablet(); - if (leaderId == null) { + if 
(leaderId == null || leaderId < 0) { result.put( tb, new NotifyLeaderAndIsrResultForBucket( tb, ApiError.fromThrowable( - new StorageException( + new LeaderNotAvailableException( String.format( - "Could not find leader for follower replica %s while make " - + "follower for %s.", - serverId, tb))))); + "No leader available for follower replica %s on server %s.", + tb, serverId))))); } else { bucketAndStatus.put( tb, diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java index 0d304df1e4..3082d0f542 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java @@ -42,18 +42,23 @@ import org.apache.fluss.rpc.messages.NotifyKvSnapshotOffsetRequest; import org.apache.fluss.rpc.messages.NotifyRemoteLogOffsetsRequest; import org.apache.fluss.rpc.messages.UpdateMetadataRequest; +import org.apache.fluss.rpc.protocol.ApiError; import org.apache.fluss.rpc.protocol.ApiKeys; +import org.apache.fluss.rpc.protocol.Errors; import org.apache.fluss.server.coordinator.event.AccessContextEvent; import org.apache.fluss.server.coordinator.event.AdjustIsrReceivedEvent; import org.apache.fluss.server.coordinator.event.CommitKvSnapshotEvent; import org.apache.fluss.server.coordinator.event.CommitRemoteLogManifestEvent; import org.apache.fluss.server.coordinator.event.CoordinatorEventManager; +import org.apache.fluss.server.coordinator.event.DeadTabletServerEvent; +import org.apache.fluss.server.coordinator.event.NotifyLeaderAndIsrResponseReceivedEvent; import org.apache.fluss.server.coordinator.lease.KvSnapshotLeaseManager; import org.apache.fluss.server.coordinator.statemachine.BucketState; import org.apache.fluss.server.coordinator.statemachine.ReplicaState; import 
org.apache.fluss.server.entity.AdjustIsrResultForBucket; import org.apache.fluss.server.entity.CommitKvSnapshotData; import org.apache.fluss.server.entity.CommitRemoteLogManifestData; +import org.apache.fluss.server.entity.NotifyLeaderAndIsrResultForBucket; import org.apache.fluss.server.entity.TablePropertyChanges; import org.apache.fluss.server.kv.snapshot.CompletedSnapshot; import org.apache.fluss.server.kv.snapshot.ZooKeeperCompletedSnapshotHandleStore; @@ -1039,6 +1044,358 @@ void testDoBucketReassignment() throws Exception { ZOO_KEEPER_EXTENSION_WRAPPER.getCustomExtension().cleanupPath(ZkData.ServerIdZNode.path(3)); } + /** + * Verifies that fatal errors (KvStorageException, UNKNOWN_SERVER_ERROR) in NotifyLeaderAndIsr + * responses correctly mark the replica offline, preventing it from being elected as leader. + */ + @Test + void testFatalErrorMarksReplicaOffline() throws Exception { + initCoordinatorChannel(); + TablePath tablePath = TablePath.of(defaultDatabase, "fatal_error_test"); + long tableId = + createTable( + tablePath, + new TabletServerInfo[] { + new TabletServerInfo(0, "rack0"), + new TabletServerInfo(1, "rack1"), + new TabletServerInfo(2, "rack2") + }); + TableBucket tb = new TableBucket(tableId, 0); + + LeaderAndIsr leaderAndIsr = + waitValue( + () -> fromCtx(ctx -> ctx.getBucketLeaderAndIsr(tb)), + Duration.ofMinutes(1), + "leader not elected"); + int leader = leaderAndIsr.leader(); + + List followers = + leaderAndIsr.isr().stream().filter(id -> id != leader).collect(Collectors.toList()); + int follower1 = followers.get(0); + int follower2 = followers.get(1); + + // follower1 returns KvStorageException — fatal, should be marked offline. 
+ eventProcessor + .getCoordinatorEventManager() + .put( + new NotifyLeaderAndIsrResponseReceivedEvent( + Collections.singletonList( + new NotifyLeaderAndIsrResultForBucket( + tb, + ApiError.fromThrowable( + new org.apache.fluss.exception + .KvStorageException( + "kv storage error")))), + follower1)); + + // follower2 returns UNKNOWN_SERVER_ERROR — fatal, should be marked offline. + eventProcessor + .getCoordinatorEventManager() + .put( + new NotifyLeaderAndIsrResponseReceivedEvent( + Collections.singletonList( + new NotifyLeaderAndIsrResultForBucket( + tb, + new ApiError( + Errors.UNKNOWN_SERVER_ERROR, + "unexpected NPE"))), + follower2)); + + // Both followers should be offline. + retryVerifyContext( + ctx -> { + assertThat(ctx.isReplicaOnline(follower1, tb)) + .as("follower1 should be offline after KvStorageException") + .isFalse(); + assertThat(ctx.isReplicaOnline(follower2, tb)) + .as("follower2 should be offline after UNKNOWN_SERVER_ERROR") + .isFalse(); + }); + + // Leader dies — no viable candidates (both followers offline). + eventProcessor.getCoordinatorEventManager().put(new DeadTabletServerEvent(leader)); + + // Bucket should remain without a leader (or leader == -1). + retryVerifyContext( + ctx -> { + Optional lai = ctx.getBucketLeaderAndIsr(tb); + assertThat(lai).isPresent(); + assertThat(lai.get().leader()) + .as("No viable candidate — leader should be -1") + .isEqualTo(-1); + }); + } + + /** + * Verifies that transient errors (FencedLeaderEpochException, LeaderNotAvailableException) in + * NotifyLeaderAndIsr responses do NOT mark the replica offline. The replica remains eligible + * for leader election. + * + *

<p>LeaderNotAvailableException specifically validates the ReplicaManager fix: when no leader + * is available for a follower replica, the error is now correctly classified as transient + * (LeaderNotAvailableException) instead of fatal (StorageException). + * + *

<p>Scenario: ISR = {leader, follower1, follower2}. + * + * <ol>
+ * <li>follower1 returns FencedLeaderEpochException (transient)
+ * <li>follower2 returns LeaderNotAvailableException (transient)
+ * <li>leader dies
+ * <li>Either follower1 or follower2 should be elected as new leader
+ * </ol>
+ */ + @Test + void testTransientErrorDoesNotMarkReplicaOffline() throws Exception { + initCoordinatorChannel(); + TablePath tablePath = TablePath.of(defaultDatabase, "transient_error_test"); + long tableId = + createTable( + tablePath, + new TabletServerInfo[] { + new TabletServerInfo(0, "rack0"), + new TabletServerInfo(1, "rack1"), + new TabletServerInfo(2, "rack2") + }); + TableBucket tb = new TableBucket(tableId, 0); + + LeaderAndIsr leaderAndIsr = + waitValue( + () -> fromCtx(ctx -> ctx.getBucketLeaderAndIsr(tb)), + Duration.ofMinutes(1), + "leader not elected"); + int leader = leaderAndIsr.leader(); + + List followers = + leaderAndIsr.isr().stream().filter(id -> id != leader).collect(Collectors.toList()); + int follower1 = followers.get(0); + int follower2 = followers.get(1); + + // follower1 returns FencedLeaderEpochException — transient, should NOT be offline. + eventProcessor + .getCoordinatorEventManager() + .put( + new NotifyLeaderAndIsrResponseReceivedEvent( + Collections.singletonList( + new NotifyLeaderAndIsrResultForBucket( + tb, + ApiError.fromThrowable( + new FencedLeaderEpochException( + "stale epoch")))), + follower1)); + + // follower2 returns LeaderNotAvailableException — transient, should NOT be offline. + // This also validates the ReplicaManager fix: "no leader" now produces + // LeaderNotAvailableException (transient) instead of StorageException (fatal). + eventProcessor + .getCoordinatorEventManager() + .put( + new NotifyLeaderAndIsrResponseReceivedEvent( + Collections.singletonList( + new NotifyLeaderAndIsrResultForBucket( + tb, + ApiError.fromThrowable( + new org.apache.fluss.exception + .LeaderNotAvailableException( + "no leader yet")))), + follower2)); + + // Verify both followers remain online after transient errors. 
+ retryVerifyContext( + ctx -> { + assertThat(ctx.isReplicaOnline(follower1, tb)) + .as("follower1 should remain online after FencedLeaderEpochException") + .isTrue(); + assertThat(ctx.isReplicaOnline(follower2, tb)) + .as("follower2 should remain online after LeaderNotAvailableException") + .isTrue(); + }); + + // Leader dies — both followers are viable candidates. + eventProcessor.getCoordinatorEventManager().put(new DeadTabletServerEvent(leader)); + + // A new leader should be elected from the followers. + retryVerifyContext( + ctx -> { + Optional lai = ctx.getBucketLeaderAndIsr(tb); + assertThat(lai).isPresent(); + assertThat(lai.get().leader()) + .as("One of the followers should be elected as new leader") + .isIn(follower1, follower2); + }); + } + + /** + * Verifies the combined behavior: a follower with a transient error remains eligible for + * election, while a follower with a fatal error is excluded. + * + *

<p>Scenario: ISR = {leader, follower1, follower2}. + * + * <ol>
+ * <li>follower1 returns FencedLeaderEpochException (transient — remains online)
+ * <li>follower2 returns StorageException (fatal — marked offline)
+ * <li>leader dies
+ * <li>follower1 is the ONLY viable candidate → elected as new leader
+ * </ol>
+ */ + @Test + void testMixedErrorsOnlyFatalMarksOffline() throws Exception { + initCoordinatorChannel(); + TablePath tablePath = TablePath.of(defaultDatabase, "mixed_error_test"); + long tableId = + createTable( + tablePath, + new TabletServerInfo[] { + new TabletServerInfo(0, "rack0"), + new TabletServerInfo(1, "rack1"), + new TabletServerInfo(2, "rack2") + }); + TableBucket tb = new TableBucket(tableId, 0); + + LeaderAndIsr leaderAndIsr = + waitValue( + () -> fromCtx(ctx -> ctx.getBucketLeaderAndIsr(tb)), + Duration.ofMinutes(1), + "leader not elected"); + int leader = leaderAndIsr.leader(); + + List followers = + leaderAndIsr.isr().stream().filter(id -> id != leader).collect(Collectors.toList()); + int follower1 = followers.get(0); + int follower2 = followers.get(1); + + // follower1: transient error — remains online. + eventProcessor + .getCoordinatorEventManager() + .put( + new NotifyLeaderAndIsrResponseReceivedEvent( + Collections.singletonList( + new NotifyLeaderAndIsrResultForBucket( + tb, + ApiError.fromThrowable( + new FencedLeaderEpochException( + "stale epoch")))), + follower1)); + + // follower2: fatal error — marked offline. + eventProcessor + .getCoordinatorEventManager() + .put( + new NotifyLeaderAndIsrResponseReceivedEvent( + Collections.singletonList( + new NotifyLeaderAndIsrResultForBucket( + tb, + ApiError.fromThrowable( + new org.apache.fluss.exception + .StorageException("disk error")))), + follower2)); + + // Wait for follower2 to be offline. + retryVerifyContext( + ctx -> + assertThat(ctx.isReplicaOnline(follower2, tb)) + .as("follower2 should be offline after StorageException") + .isFalse()); + + // Verify follower1 is still online. + retryVerifyContext( + ctx -> + assertThat(ctx.isReplicaOnline(follower1, tb)) + .as("follower1 should remain online after transient error") + .isTrue()); + + // Leader dies — follower1 is the ONLY viable candidate. 
+ eventProcessor.getCoordinatorEventManager().put(new DeadTabletServerEvent(leader)); + + // follower1 should be elected as new leader. + retryVerifyContext( + ctx -> { + Optional lai = ctx.getBucketLeaderAndIsr(tb); + assertThat(lai).isPresent(); + assertThat(lai.get().leader()) + .as( + "follower1 (server %d) should be elected as new leader " + + "after leader (server %d) died", + follower1, leader) + .isEqualTo(follower1); + }); + } + + /** + * Verifies that when a server with offline-marked replicas dies, the offline markers are + * cleaned up (via removeOfflineBucketInServer in processDeadTabletServer) and the server is + * removed from the live set. + * + *

Note: This test covers the "death" half of the restart cycle. The full restart cycle + * (death → re-registration → replica back online) depends on ZK watcher integration which is + * covered by testRestartTriggerReplicaToOffline. + */ + @Test + void testDeadServerClearsOfflineMarker() throws Exception { + initCoordinatorChannel(); + TablePath tablePath = TablePath.of(defaultDatabase, "server_restart_test"); + long tableId = + createTable( + tablePath, + new TabletServerInfo[] { + new TabletServerInfo(0, "rack0"), + new TabletServerInfo(1, "rack1"), + new TabletServerInfo(2, "rack2") + }); + TableBucket tb = new TableBucket(tableId, 0); + + LeaderAndIsr leaderAndIsr = + waitValue( + () -> fromCtx(ctx -> ctx.getBucketLeaderAndIsr(tb)), + Duration.ofMinutes(1), + "leader not elected"); + int leader = leaderAndIsr.leader(); + + List followers = + leaderAndIsr.isr().stream().filter(id -> id != leader).collect(Collectors.toList()); + int follower1 = followers.get(0); + + // follower1 gets a fatal error — marked offline. + eventProcessor + .getCoordinatorEventManager() + .put( + new NotifyLeaderAndIsrResponseReceivedEvent( + Collections.singletonList( + new NotifyLeaderAndIsrResultForBucket( + tb, + ApiError.fromThrowable( + new org.apache.fluss.exception + .StorageException("disk error")))), + follower1)); + + // Verify follower1 is offline. + retryVerifyContext( + ctx -> + assertThat(ctx.isReplicaOnline(follower1, tb)) + .as("follower1 should be offline after StorageException") + .isFalse()); + + // Simulate server death: this should clear offline markers for follower1. + eventProcessor.getCoordinatorEventManager().put(new DeadTabletServerEvent(follower1)); + + // After dead event, follower1 should be removed from live set + // and its offline markers should be cleared. + // Note: we check getOfflineBucketCount() instead of isReplicaOnline() because + // isReplicaOnline() requires the server to be in the live set, which is false + // after death. 
getOfflineBucketCount() directly verifies removeOfflineBucketInServer + // was called. + retryVerifyContext( + ctx -> { + assertThat(ctx.liveTabletServerSet()) + .as("follower1 should be removed from live set after death") + .doesNotContain(follower1); + assertThat(ctx.getOfflineBucketCount()) + .as( + "offline bucket count should be 0 after server death " + + "clears offline markers") + .isEqualTo(0); + }); + } + private void verifyIsr(TableBucket tb, int expectedLeader, List expectedIsr) throws Exception { LeaderAndIsr leaderAndIsr = diff --git a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java index 317dee0a2e..5a85853f54 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java @@ -90,6 +90,7 @@ import org.apache.fluss.rpc.messages.UpdateMetadataRequest; import org.apache.fluss.rpc.messages.UpdateMetadataResponse; import org.apache.fluss.rpc.protocol.ApiKeys; +import org.apache.fluss.rpc.protocol.Errors; import org.apache.fluss.server.entity.FetchReqInfo; import org.apache.fluss.utils.types.Tuple2; @@ -275,7 +276,7 @@ public CompletableFuture notifyLeaderAndIsr( .setTableBucket() .setTableId(pbNotifyLeaderForBucket.getTableBucket().getTableId()) .setBucketId(pbNotifyLeaderForBucket.getTableBucket().getBucketId()); - pbNotifyLeaderRespForBucket.setErrorCode(1); + pbNotifyLeaderRespForBucket.setErrorCode(Errors.STORAGE_EXCEPTION.code()); pbNotifyLeaderRespForBucket.setErrorMessage( "mock notifyLeaderAndIsr fail for test purpose."); bucketsResps.add(pbNotifyLeaderRespForBucket);