diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index d9dd09d966..b21f95bc1c 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -765,17 +765,18 @@ private CompletableFuture checkLeaderState(RaftClientRequest re } catch (GroupMismatchException e) { return JavaUtils.completeExceptionally(e); } - return checkLeaderState(request, null); + return checkLeaderState(request, null, null); } /** * @return null if the server is in leader state. */ - private CompletableFuture checkLeaderState(RaftClientRequest request, CacheEntry entry) { + private CompletableFuture checkLeaderState( + RaftClientRequest request, CacheEntry entry, TransactionContextImpl context) { if (!getInfo().isLeader()) { NotLeaderException exception = generateNotLeaderException(); final RaftClientReply reply = newExceptionReply(request, exception); - return RetryCacheImpl.failWithReply(reply, entry); + return failWithReply(reply, entry, context); } if (!getInfo().isLeaderReady()) { final CacheEntry cacheEntry = retryCache.getIfPresent(ClientInvocationId.valueOf(request)); @@ -784,13 +785,13 @@ private CompletableFuture checkLeaderState(RaftClientRequest re } final LeaderNotReadyException lnre = new LeaderNotReadyException(getMemberId()); final RaftClientReply reply = newExceptionReply(request, lnre); - return RetryCacheImpl.failWithReply(reply, entry); + return failWithReply(reply, entry, context); } if (!request.isReadOnly() && isSteppingDown()) { final LeaderSteppingDownException lsde = new LeaderSteppingDownException(getMemberId() + " is stepping down"); final RaftClientReply reply = newExceptionReply(request, lsde); - return RetryCacheImpl.failWithReply(reply, entry); + return failWithReply(reply, entry, context); } return null; @@ -816,11 +817,44 @@ void assertLifeCycleState(Set expected) throws ServerNotReadyEx getMemberId() + " is not in " + expected + ": current state is " + c), expected); } - private CompletableFuture getResourceUnavailableReply(RaftClientRequest request, CacheEntry entry) { - return entry.failWithException(new ResourceUnavailableException( - getMemberId() + ": Failed to acquire a pending write request for " + request)); + private CompletableFuture getResourceUnavailableReply(String op, + RaftClientRequest request, CacheEntry entry, TransactionContextImpl context) { + final ResourceUnavailableException e = new ResourceUnavailableException(getMemberId() + + ": Failed to " + op + " for " + request); + cancelTransaction(context, e); + return entry.failWithException(e); + } + + private CompletableFuture failWithReply( + RaftClientReply reply, CacheEntry entry, TransactionContextImpl context) { + if (context != null) { + cancelTransaction(context, reply.getException()); + } + + if (entry == null) { + return CompletableFuture.completedFuture(reply); + } + entry.failWithReply(reply); + return entry.getReplyFuture(); + } + + /** Cancel a transaction and notify the state machine. Set exception if provided to the transaction context. */ + private void cancelTransaction(TransactionContextImpl context, Exception exception) { + if (context == null) { + return; } + if (exception != null) { + context.setException(exception); + } + + try { + context.cancelTransaction(); + } catch (IOException ioe) { + LOG.warn("{}: Failed to cancel transaction {}", getMemberId(), context, ioe); + } + } + /** * Handle a normal update request from client. */ @@ -833,27 +867,28 @@ private CompletableFuture appendTransaction( final LeaderStateImpl unsyncedLeaderState = role.getLeaderState().orElse(null); if (unsyncedLeaderState == null) { - final RaftClientReply reply = newExceptionReply(request, generateNotLeaderException()); - return RetryCacheImpl.failWithReply(reply, cacheEntry); + final NotLeaderException nle = generateNotLeaderException(); + final RaftClientReply reply = newExceptionReply(request, nle); + return failWithReply(reply, cacheEntry, context); } final PendingRequests.Permit unsyncedPermit = unsyncedLeaderState.tryAcquirePendingRequest(request.getMessage()); if (unsyncedPermit == null) { - return getResourceUnavailableReply(request, cacheEntry); + return getResourceUnavailableReply("acquire a pending write request", request, cacheEntry, context); } final LeaderStateImpl leaderState; final PendingRequest pending; synchronized (this) { - final CompletableFuture reply = checkLeaderState(request, cacheEntry); + final CompletableFuture reply = checkLeaderState(request, cacheEntry, context); if (reply != null) { return reply; } leaderState = role.getLeaderStateNonNull(); - final PendingRequests.Permit permit = leaderState == unsyncedLeaderState? unsyncedPermit + final PendingRequests.Permit permit = leaderState == unsyncedLeaderState ? unsyncedPermit : leaderState.tryAcquirePendingRequest(request.getMessage()); if (permit == null) { - return getResourceUnavailableReply(request, cacheEntry); + return getResourceUnavailableReply("acquire a pending write request", request, cacheEntry, context); } // append the message to its local log @@ -863,20 +898,18 @@ private CompletableFuture appendTransaction( } catch (StateMachineException e) { // the StateMachineException is thrown by the SM in the preAppend stage. // Return the exception in a RaftClientReply. - RaftClientReply exceptionReply = newExceptionReply(request, e); - cacheEntry.failWithReply(exceptionReply); + final RaftClientReply exceptionReply = newExceptionReply(request, e); // leader will step down here if (e.leaderShouldStepDown() && getInfo().isLeader()) { leaderState.submitStepDownEvent(StepDownReason.STATE_MACHINE_EXCEPTION); } - return CompletableFuture.completedFuture(exceptionReply); + return failWithReply(exceptionReply, cacheEntry, null); } // put the request into the pending queue pending = leaderState.addPendingRequest(permit, request, context); if (pending == null) { - return cacheEntry.failWithException(new ResourceUnavailableException( - getMemberId() + ": Failed to add a pending write request for " + request)); + return getResourceUnavailableReply("add a pending write request", request, cacheEntry, context); } } leaderState.notifySenders(); @@ -1001,19 +1034,23 @@ private CompletableFuture writeAsyncImpl(RaftClientRequest requ // return the cached future. return cacheEntry.getReplyFuture(); } - // TODO: this client request will not be added to pending requests until - // later which means that any failure in between will leave partial state in - // the state machine. We should call cancelTransaction() for failed requests + // This request will be added to pending requests later in appendTransaction. + // Any failure in between must invoke cancelTransaction. final TransactionContextImpl context = (TransactionContextImpl) stateMachine.startTransaction( filterDataStreamRaftClientRequest(request)); if (context.getException() != null) { - final StateMachineException e = new StateMachineException(getMemberId(), context.getException()); + final Exception exception = context.getException(); + final StateMachineException e = new StateMachineException(getMemberId(), exception); final RaftClientReply exceptionReply = newExceptionReply(request, e); - cacheEntry.failWithReply(exceptionReply); - return CompletableFuture.completedFuture(exceptionReply); + return failWithReply(exceptionReply, cacheEntry, context); } - return appendTransaction(request, context, cacheEntry); + try { + return appendTransaction(request, context, cacheEntry); + } catch (Exception e) { + cancelTransaction(context, e); + throw e; + } } private CompletableFuture watchAsync(RaftClientRequest request) { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java index 4da459ae9e..96ad62a531 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java @@ -257,14 +257,4 @@ public synchronized void close() { cache.invalidateAll(); statistics.set(null); } - - static CompletableFuture failWithReply( - RaftClientReply reply, CacheEntry entry) { - if (entry != null) { - entry.failWithReply(reply); - return entry.getReplyFuture(); - } else { - return CompletableFuture.completedFuture(reply); - } - } } diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java index d92f3a1c82..8497b12f4d 100644 --- a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java @@ -191,8 +191,6 @@ public TransactionContext preAppendTransaction() throws IOException { @Override public TransactionContext cancelTransaction() throws IOException { - // TODO: This is not called from Raft server / log yet. When an IOException happens, we should - // call this to let the SM know that Transaction cannot be synced return stateMachine.cancelTransaction(this); } } diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java index 3a58f4e7c6..1e46907d10 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java @@ -40,6 +40,7 @@ import java.io.IOException; import java.util.Objects; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; import static org.junit.jupiter.api.Assertions.fail; @@ -52,6 +53,7 @@ public abstract class RaftStateMachineExceptionTests { + Assertions.assertEquals(0, numCancelTransaction.get(), + () -> "Expected cancelTransaction() not to be called but got " + numCancelTransaction.get()); + return null; + }, 10, ONE_SECOND, "wait for cancelTransaction", LOG); + } finally { + failPreAppend = false; + } + } }