From 5cb5b8a623386efca91322808197e78349fdaef8 Mon Sep 17 00:00:00 2001 From: Abhishek Pal Date: Thu, 12 Mar 2026 00:31:37 +0530 Subject: [PATCH 1/6] RATIS-2433. Cancel transaction in case of failure to append --- .../ratis/server/impl/RaftServerImpl.java | 126 ++++++++++++------ .../impl/TransactionContextImpl.java | 2 - .../impl/RaftStateMachineExceptionTests.java | 35 +++++ 3 files changed, 122 insertions(+), 41 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index d9dd09d966..a9e7c2b175 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -816,11 +816,29 @@ void assertLifeCycleState(Set expected) throws ServerNotReadyEx getMemberId() + " is not in " + expected + ": current state is " + c), expected); } - private CompletableFuture getResourceUnavailableReply(RaftClientRequest request, CacheEntry entry) { - return entry.failWithException(new ResourceUnavailableException( - getMemberId() + ": Failed to acquire a pending write request for " + request)); + /** + * Cancels a given transaction. + * For a provided exception set the exception for the transaction context + * and then cancel the transaction to notify the state machine that the transaction cannot be appended. + * @param context + * @param exception + */ + private void cancelTransaction(TransactionContextImpl context, Exception exception) { + if (context == null) { + return; } + if (exception != null) { + context.setException(exception); + } + + try { + context.cancelTransaction(); + } catch (IOException ioe) { + LOG.warn("{}: Failed to cancel transaction {}", getMemberId(), context, ioe); + } + } + /** * Handle a normal update request from client. */ @@ -833,52 +851,73 @@ private CompletableFuture appendTransaction( final LeaderStateImpl unsyncedLeaderState = role.getLeaderState().orElse(null); if (unsyncedLeaderState == null) { - final RaftClientReply reply = newExceptionReply(request, generateNotLeaderException()); + final NotLeaderException nle = generateNotLeaderException(); + final RaftClientReply reply = newExceptionReply(request, nle); + cancelTransaction(context, nle); return RetryCacheImpl.failWithReply(reply, cacheEntry); } final PendingRequests.Permit unsyncedPermit = unsyncedLeaderState.tryAcquirePendingRequest(request.getMessage()); if (unsyncedPermit == null) { - return getResourceUnavailableReply(request, cacheEntry); + final ResourceUnavailableException e = new ResourceUnavailableException( + getMemberId() + ": Failed to acquire a pending write request for " + request); + cancelTransaction(context, e); + return cacheEntry.failWithException(e); } - final LeaderStateImpl leaderState; - final PendingRequest pending; + LeaderStateImpl leaderState = null; + PendingRequest pending = null; + CompletableFuture failure = null; + Exception cancelException = null; synchronized (this) { final CompletableFuture reply = checkLeaderState(request, cacheEntry); if (reply != null) { - return reply; + failure = reply; } - leaderState = role.getLeaderStateNonNull(); - final PendingRequests.Permit permit = leaderState == unsyncedLeaderState? unsyncedPermit - : leaderState.tryAcquirePendingRequest(request.getMessage()); - if (permit == null) { - return getResourceUnavailableReply(request, cacheEntry); - } + if (failure == null) { + leaderState = role.getLeaderStateNonNull(); + final PendingRequests.Permit permit = leaderState == unsyncedLeaderState? unsyncedPermit + : leaderState.tryAcquirePendingRequest(request.getMessage()); + if (permit == null) { + final ResourceUnavailableException e = new ResourceUnavailableException( + getMemberId() + ": Failed to acquire a pending write request for " + request); + failure = cacheEntry.failWithException(e); + cancelException = e; + } else { + // append the message to its local log + writeIndexCache.add(request.getClientId(), context.getLogIndexFuture()); + try { + state.appendLog(context); + } catch (StateMachineException e) { + // the StateMachineException is thrown by the SM in the preAppend stage. + // Return the exception in a RaftClientReply. + final RaftClientReply exceptionReply = newExceptionReply(request, e); + cacheEntry.failWithReply(exceptionReply); + failure = CompletableFuture.completedFuture(exceptionReply); + cancelException = e; + // leader will step down here + if (e.leaderShouldStepDown() && getInfo().isLeader()) { + leaderState.submitStepDownEvent(StepDownReason.STATE_MACHINE_EXCEPTION); + } + } - // append the message to its local log - writeIndexCache.add(request.getClientId(), context.getLogIndexFuture()); - try { - state.appendLog(context); - } catch (StateMachineException e) { - // the StateMachineException is thrown by the SM in the preAppend stage. - // Return the exception in a RaftClientReply. - RaftClientReply exceptionReply = newExceptionReply(request, e); - cacheEntry.failWithReply(exceptionReply); - // leader will step down here - if (e.leaderShouldStepDown() && getInfo().isLeader()) { - leaderState.submitStepDownEvent(StepDownReason.STATE_MACHINE_EXCEPTION); + if (failure == null) { + // put the request into the pending queue + pending = leaderState.addPendingRequest(permit, request, context); + if (pending == null) { + final ResourceUnavailableException e = new ResourceUnavailableException( + getMemberId() + ": Failed to add a pending write request for " + request); + failure = cacheEntry.failWithException(e); + cancelException = e; + } + } } - return CompletableFuture.completedFuture(exceptionReply); - } - - // put the request into the pending queue - pending = leaderState.addPendingRequest(permit, request, context); - if (pending == null) { - return cacheEntry.failWithException(new ResourceUnavailableException( - getMemberId() + ": Failed to add a pending write request for " + request)); } } + if (failure != null) { + cancelTransaction(context, cancelException); + return failure; + } leaderState.notifySenders(); return pending.getFuture(); } @@ -1001,19 +1040,28 @@ private CompletableFuture writeAsyncImpl(RaftClientRequest requ // return the cached future. return cacheEntry.getReplyFuture(); } - // TODO: this client request will not be added to pending requests until - // later which means that any failure in between will leave partial state in - // the state machine. We should call cancelTransaction() for failed requests + // This request will be added to pending requests later in appendTransaction. + // Any failure in between must invoke cancelTransaction. final TransactionContextImpl context = (TransactionContextImpl) stateMachine.startTransaction( filterDataStreamRaftClientRequest(request)); if (context.getException() != null) { - final StateMachineException e = new StateMachineException(getMemberId(), context.getException()); + final Exception exception = context.getException(); + final StateMachineException e = new StateMachineException(getMemberId(), exception); + cancelTransaction(context, exception); final RaftClientReply exceptionReply = newExceptionReply(request, e); cacheEntry.failWithReply(exceptionReply); return CompletableFuture.completedFuture(exceptionReply); } - return appendTransaction(request, context, cacheEntry); + try { + return appendTransaction(request, context, cacheEntry); + } catch (IOException ioe) { + cancelTransaction(context, ioe); + throw ioe; + } catch (RuntimeException re) { + cancelTransaction(context, re); + throw re; + } } private CompletableFuture watchAsync(RaftClientRequest request) { diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java index d92f3a1c82..8497b12f4d 100644 --- a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java @@ -191,8 +191,6 @@ public TransactionContext preAppendTransaction() throws IOException { @Override public TransactionContext cancelTransaction() throws IOException { - // TODO: This is not called from Raft server / log yet. When an IOException happens, we should - // call this to let the SM know that Transaction cannot be synced return stateMachine.cancelTransaction(this); } } diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java index 3a58f4e7c6..43b6af97d5 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java @@ -40,6 +40,7 @@ import java.io.IOException; import java.util.Objects; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; import static org.junit.jupiter.api.Assertions.fail; @@ -52,6 +53,7 @@ public abstract class RaftStateMachineExceptionTests { + Assertions.assertTrue(numCancelTransaction.get() > 0, + () -> "Expected cancelTransaction() to be called but got " + numCancelTransaction.get()); + return null; + }, 10, ONE_SECOND, "wait for cancelTransaction", LOG); + } finally { + failPreAppend = false; + } + } } From 5b53c35ef68e9484d2d66ef7f26912b0e5ae8303 Mon Sep 17 00:00:00 2001 From: Abhishek Pal Date: Sat, 21 Mar 2026 13:16:09 +0530 Subject: [PATCH 2/6] Fix checkstyle error --- .../java/org/apache/ratis/server/impl/RaftServerImpl.java | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index a9e7c2b175..f146a60767 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -816,13 +816,7 @@ void assertLifeCycleState(Set expected) throws ServerNotReadyEx getMemberId() + " is not in " + expected + ": current state is " + c), expected); } - /** - * Cancels a given transaction. - * For a provided exception set the exception for the transaction context - * and then cancel the transaction to notify the state machine that the transaction cannot be appended. - * @param context - * @param exception - */ + /** Cancel a transaction and notify the state machine. Set exception if provided to the transaction context.*/ private void cancelTransaction(TransactionContextImpl context, Exception exception) { if (context == null) { return; From ff18bf6542890968c8a2f4e900f2a32b750f0876 Mon Sep 17 00:00:00 2001 From: Abhishek Pal Date: Mon, 6 Apr 2026 19:35:21 +0530 Subject: [PATCH 3/6] Address review comments --- .../ratis/server/impl/RaftServerImpl.java | 117 +++++++++--------- .../ratis/server/impl/RetryCacheImpl.java | 10 -- .../impl/RaftStateMachineExceptionTests.java | 4 +- 3 files changed, 58 insertions(+), 73 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index f146a60767..d96a515745 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -765,17 +765,18 @@ private CompletableFuture checkLeaderState(RaftClientRequest re } catch (GroupMismatchException e) { return JavaUtils.completeExceptionally(e); } - return checkLeaderState(request, null); + return checkLeaderState(request, null, null); } /** * @return null if the server is in leader state. */ - private CompletableFuture checkLeaderState(RaftClientRequest request, CacheEntry entry) { + private CompletableFuture checkLeaderState( + RaftClientRequest request, CacheEntry entry, TransactionContextImpl context) { if (!getInfo().isLeader()) { NotLeaderException exception = generateNotLeaderException(); final RaftClientReply reply = newExceptionReply(request, exception); - return RetryCacheImpl.failWithReply(reply, entry); + return failWithReply(reply, entry, context); } if (!getInfo().isLeaderReady()) { final CacheEntry cacheEntry = retryCache.getIfPresent(ClientInvocationId.valueOf(request)); @@ -784,13 +785,13 @@ private CompletableFuture checkLeaderState(RaftClientRequest re } final LeaderNotReadyException lnre = new LeaderNotReadyException(getMemberId()); final RaftClientReply reply = newExceptionReply(request, lnre); - return RetryCacheImpl.failWithReply(reply, entry); + return failWithReply(reply, entry, context); } if (!request.isReadOnly() && isSteppingDown()) { final LeaderSteppingDownException lsde = new LeaderSteppingDownException(getMemberId() + " is stepping down"); final RaftClientReply reply = newExceptionReply(request, lsde); - return RetryCacheImpl.failWithReply(reply, entry); + return failWithReply(reply, entry, context); } return null; @@ -816,7 +817,25 @@ void assertLifeCycleState(Set expected) throws ServerNotReadyEx getMemberId() + " is not in " + expected + ": current state is " + c), expected); } - /** Cancel a transaction and notify the state machine. Set exception if provided to the transaction context.*/ + private CompletableFuture getResourceUnavailableReply(String op, + RaftClientRequest request, CacheEntry entry, TransactionContextImpl context) { + final ResourceUnavailableException e = new ResourceUnavailableException(getMemberId() + + ": Failed to " + op + " for " + request); + cancelTransaction(context, e); + return entry.failWithException(e); + } + + private CompletableFuture failWithReply( + RaftClientReply reply, CacheEntry entry, TransactionContextImpl context) { + cancelTransaction(context, reply.getException()); + if (entry == null) { + return CompletableFuture.completedFuture(reply); + } + entry.failWithReply(reply); + return entry.getReplyFuture(); + } + + /** Cancel a transaction and notify the state machine. Set exception if provided to the transaction context. */ private void cancelTransaction(TransactionContextImpl context, Exception exception) { if (context == null) { return; @@ -847,70 +866,48 @@ private CompletableFuture appendTransaction( if (unsyncedLeaderState == null) { final NotLeaderException nle = generateNotLeaderException(); final RaftClientReply reply = newExceptionReply(request, nle); - cancelTransaction(context, nle); - return RetryCacheImpl.failWithReply(reply, cacheEntry); + return failWithReply(reply, cacheEntry, context); } final PendingRequests.Permit unsyncedPermit = unsyncedLeaderState.tryAcquirePendingRequest(request.getMessage()); if (unsyncedPermit == null) { - final ResourceUnavailableException e = new ResourceUnavailableException( - getMemberId() + ": Failed to acquire a pending write request for " + request); - cancelTransaction(context, e); - return cacheEntry.failWithException(e); + return getResourceUnavailableReply("acquire a pending write request", request, cacheEntry, context); } - LeaderStateImpl leaderState = null; - PendingRequest pending = null; - CompletableFuture failure = null; - Exception cancelException = null; + final LeaderStateImpl leaderState; + final PendingRequest pending; synchronized (this) { - final CompletableFuture reply = checkLeaderState(request, cacheEntry); + final CompletableFuture reply = checkLeaderState(request, cacheEntry, context); if (reply != null) { - failure = reply; + return reply; } - if (failure == null) { - leaderState = role.getLeaderStateNonNull(); - final PendingRequests.Permit permit = leaderState == unsyncedLeaderState? unsyncedPermit - : leaderState.tryAcquirePendingRequest(request.getMessage()); - if (permit == null) { - final ResourceUnavailableException e = new ResourceUnavailableException( - getMemberId() + ": Failed to acquire a pending write request for " + request); - failure = cacheEntry.failWithException(e); - cancelException = e; - } else { - // append the message to its local log - writeIndexCache.add(request.getClientId(), context.getLogIndexFuture()); - try { - state.appendLog(context); - } catch (StateMachineException e) { - // the StateMachineException is thrown by the SM in the preAppend stage. - // Return the exception in a RaftClientReply. - final RaftClientReply exceptionReply = newExceptionReply(request, e); - cacheEntry.failWithReply(exceptionReply); - failure = CompletableFuture.completedFuture(exceptionReply); - cancelException = e; - // leader will step down here - if (e.leaderShouldStepDown() && getInfo().isLeader()) { - leaderState.submitStepDownEvent(StepDownReason.STATE_MACHINE_EXCEPTION); - } - } + leaderState = role.getLeaderStateNonNull(); + final PendingRequests.Permit permit = leaderState == unsyncedLeaderState ? unsyncedPermit + : leaderState.tryAcquirePendingRequest(request.getMessage()); + if (permit == null) { + return getResourceUnavailableReply("acquire a pending write request", request, cacheEntry, context); + } - if (failure == null) { - // put the request into the pending queue - pending = leaderState.addPendingRequest(permit, request, context); - if (pending == null) { - final ResourceUnavailableException e = new ResourceUnavailableException( - getMemberId() + ": Failed to add a pending write request for " + request); - failure = cacheEntry.failWithException(e); - cancelException = e; - } - } + // append the message to its local log + writeIndexCache.add(request.getClientId(), context.getLogIndexFuture()); + try { + state.appendLog(context); + } catch (StateMachineException e) { + // the StateMachineException is thrown by the SM in the preAppend stage. + // Return the exception in a RaftClientReply. + final RaftClientReply exceptionReply = newExceptionReply(request, e); + // leader will step down here + if (e.leaderShouldStepDown() && getInfo().isLeader()) { + leaderState.submitStepDownEvent(StepDownReason.STATE_MACHINE_EXCEPTION); } + return failWithReply(exceptionReply, cacheEntry, context); + } + + // put the request into the pending queue + pending = leaderState.addPendingRequest(permit, request, context); + if (pending == null) { + return getResourceUnavailableReply("add a pending write request", request, cacheEntry, context); } - } - if (failure != null) { - cancelTransaction(context, cancelException); - return failure; } leaderState.notifySenders(); return pending.getFuture(); @@ -1041,10 +1038,8 @@ private CompletableFuture writeAsyncImpl(RaftClientRequest requ if (context.getException() != null) { final Exception exception = context.getException(); final StateMachineException e = new StateMachineException(getMemberId(), exception); - cancelTransaction(context, exception); final RaftClientReply exceptionReply = newExceptionReply(request, e); - cacheEntry.failWithReply(exceptionReply); - return CompletableFuture.completedFuture(exceptionReply); + return failWithReply(exceptionReply, cacheEntry, context); } try { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java index 4da459ae9e..96ad62a531 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java @@ -257,14 +257,4 @@ public synchronized void close() { cache.invalidateAll(); statistics.set(null); } - - static CompletableFuture failWithReply( - RaftClientReply reply, CacheEntry entry) { - if (entry != null) { - entry.failWithReply(reply); - return entry.getReplyFuture(); - } else { - return CompletableFuture.completedFuture(reply); - } - } } diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java index 43b6af97d5..2ca788c792 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java @@ -206,8 +206,8 @@ private void runTestCancelTransactionOnPreAppendFailure(CLUSTER cluster) throws } JavaUtils.attemptRepeatedly(() -> { - Assertions.assertTrue(numCancelTransaction.get() > 0, - () -> "Expected cancelTransaction() to be called but got " + numCancelTransaction.get()); + Assertions.assertEquals(1, numCancelTransaction.get(), + () -> "Expected cancelTransaction() to be called exactly once but got " + numCancelTransaction.get()); return null; }, 10, ONE_SECOND, "wait for cancelTransaction", LOG); } finally { From 03cbd6896def8370fc65c7e815f14064e802a2f7 Mon Sep 17 00:00:00 2001 From: Abhishek Pal Date: Wed, 8 Apr 2026 00:06:15 +0530 Subject: [PATCH 4/6] Address review comments --- .../ratis/server/impl/RaftServerImpl.java | 23 ++++++++++++------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index d96a515745..e4bbf4cccd 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -825,9 +825,19 @@ private CompletableFuture getResourceUnavailableReply(String op return entry.failWithException(e); } + /** + * Fail with a reply and cancel the transaction if provided. + * @param reply the reply to fail with + * @param entry the cache entry to fail with, if null return the completed reply + * @param context the transaction context to cancel, if null no transaction is cancelled + * @return the future of the reply + */ private CompletableFuture failWithReply( RaftClientReply reply, CacheEntry entry, TransactionContextImpl context) { - cancelTransaction(context, reply.getException()); + if (context != null) { + cancelTransaction(context, reply.getException()); + } + if (entry == null) { return CompletableFuture.completedFuture(reply); } @@ -900,7 +910,7 @@ private CompletableFuture appendTransaction( if (e.leaderShouldStepDown() && getInfo().isLeader()) { leaderState.submitStepDownEvent(StepDownReason.STATE_MACHINE_EXCEPTION); } - return failWithReply(exceptionReply, cacheEntry, context); + return failWithReply(exceptionReply, cacheEntry, null); } // put the request into the pending queue @@ -1044,12 +1054,9 @@ private CompletableFuture writeAsyncImpl(RaftClientRequest requ try { return appendTransaction(request, context, cacheEntry); - } catch (IOException ioe) { - cancelTransaction(context, ioe); - throw ioe; - } catch (RuntimeException re) { - cancelTransaction(context, re); - throw re; + } catch (Exception e) { + cancelTransaction(context, e); + throw e; } } From cea7ddc3f9d278f110525524f414bcc216bf7a7e Mon Sep 17 00:00:00 2001 From: Abhishek Pal Date: Wed, 8 Apr 2026 00:14:54 +0530 Subject: [PATCH 5/6] Fix checkstyle --- .../java/org/apache/ratis/server/impl/RaftServerImpl.java | 7 ------- 1 file changed, 7 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index e4bbf4cccd..b21f95bc1c 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -825,13 +825,6 @@ private CompletableFuture getResourceUnavailableReply(String op return entry.failWithException(e); } - /** - * Fail with a reply and cancel the transaction if provided. - * @param reply the reply to fail with - * @param entry the cache entry to fail with, if null return the completed reply - * @param context the transaction context to cancel, if null no transaction is cancelled - * @return the future of the reply - */ private CompletableFuture failWithReply( RaftClientReply reply, CacheEntry entry, TransactionContextImpl context) { if (context != null) { From 2c332bd540db0dec22ae6ce1aa4b70b72553e458 Mon Sep 17 00:00:00 2001 From: Abhishek Pal Date: Wed, 8 Apr 2026 23:30:34 +0530 Subject: [PATCH 6/6] Do not cancel transaction on state machine exception --- .../server/impl/RaftStateMachineExceptionTests.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java index 2ca788c792..1e46907d10 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java @@ -189,11 +189,11 @@ private void runTestRetryOnExceptionDuringReplication(CLUSTER cluster) throws Ex } @Test - public void testCancelTransactionOnPreAppendFailure() throws Exception { - runWithNewCluster(3, this::runTestCancelTransactionOnPreAppendFailure); + public void testNoCancelTransactionOnPreAppendFailure() throws Exception { + runWithNewCluster(3, this::runTestNoCancelTransactionOnPreAppendFailure); } - private void runTestCancelTransactionOnPreAppendFailure(CLUSTER cluster) throws Exception { + private void runTestNoCancelTransactionOnPreAppendFailure(CLUSTER cluster) throws Exception { final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId(); failPreAppend = true; numCancelTransaction.set(0); @@ -206,8 +206,8 @@ private void runTestCancelTransactionOnPreAppendFailure(CLUSTER cluster) throws } JavaUtils.attemptRepeatedly(() -> { - Assertions.assertEquals(1, numCancelTransaction.get(), - () -> "Expected cancelTransaction() to be called exactly once but got " + numCancelTransaction.get()); + Assertions.assertEquals(0, numCancelTransaction.get(), + () -> "Expected cancelTransaction() not to be called but got " + numCancelTransaction.get()); return null; }, 10, ONE_SECOND, "wait for cancelTransaction", LOG); } finally {