Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -765,17 +765,18 @@ private CompletableFuture<RaftClientReply> checkLeaderState(RaftClientRequest re
} catch (GroupMismatchException e) {
return JavaUtils.completeExceptionally(e);
}
return checkLeaderState(request, null);
return checkLeaderState(request, null, null);
}

/**
* @return null if the server is in leader state.
*/
private CompletableFuture<RaftClientReply> checkLeaderState(RaftClientRequest request, CacheEntry entry) {
private CompletableFuture<RaftClientReply> checkLeaderState(
RaftClientRequest request, CacheEntry entry, TransactionContextImpl context) {
if (!getInfo().isLeader()) {
NotLeaderException exception = generateNotLeaderException();
final RaftClientReply reply = newExceptionReply(request, exception);
return RetryCacheImpl.failWithReply(reply, entry);
return failWithReply(reply, entry, context);
}
if (!getInfo().isLeaderReady()) {
final CacheEntry cacheEntry = retryCache.getIfPresent(ClientInvocationId.valueOf(request));
Expand All @@ -784,13 +785,13 @@ private CompletableFuture<RaftClientReply> checkLeaderState(RaftClientRequest re
}
final LeaderNotReadyException lnre = new LeaderNotReadyException(getMemberId());
final RaftClientReply reply = newExceptionReply(request, lnre);
return RetryCacheImpl.failWithReply(reply, entry);
return failWithReply(reply, entry, context);
}

if (!request.isReadOnly() && isSteppingDown()) {
final LeaderSteppingDownException lsde = new LeaderSteppingDownException(getMemberId() + " is stepping down");
final RaftClientReply reply = newExceptionReply(request, lsde);
return RetryCacheImpl.failWithReply(reply, entry);
return failWithReply(reply, entry, context);
}

return null;
Expand All @@ -816,11 +817,44 @@ void assertLifeCycleState(Set<LifeCycle.State> expected) throws ServerNotReadyEx
getMemberId() + " is not in " + expected + ": current state is " + c), expected);
}

private CompletableFuture<RaftClientReply> getResourceUnavailableReply(RaftClientRequest request, CacheEntry entry) {
return entry.failWithException(new ResourceUnavailableException(
getMemberId() + ": Failed to acquire a pending write request for " + request));
private CompletableFuture<RaftClientReply> getResourceUnavailableReply(String op,
RaftClientRequest request, CacheEntry entry, TransactionContextImpl context) {
final ResourceUnavailableException e = new ResourceUnavailableException(getMemberId()
+ ": Failed to " + op + " for " + request);
cancelTransaction(context, e);
return entry.failWithException(e);
}

/**
 * Fail a request with the given reply.
 * If a transaction context is supplied, the transaction is cancelled first
 * with the exception carried by the reply.
 *
 * @param reply the reply to be returned to the client.
 * @param entry the retry cache entry of the request; may be null.
 * @param context the transaction context of the request; may be null.
 * @return the reply future of the entry if the entry is non-null;
 *         otherwise, a future already completed with the reply.
 */
private CompletableFuture<RaftClientReply> failWithReply(
    RaftClientReply reply, CacheEntry entry, TransactionContextImpl context) {
  final boolean hasTransaction = context != null;
  if (hasTransaction) {
    cancelTransaction(context, reply.getException());
  }

  if (entry != null) {
    // Record the failure in the retry cache and hand back the cached future.
    entry.failWithReply(reply);
    return entry.getReplyFuture();
  }
  // No cache entry: reply directly.
  return CompletableFuture.completedFuture(reply);
}

/**
 * Cancel the transaction of the given context.
 * The cancellation is forwarded through {@code context.cancelTransaction()}.
 * This method is a no-op when the context is null.
 *
 * @param context the transaction to cancel; may be null.
 * @param exception if non-null, it is set on the context before cancelling.
 */
private void cancelTransaction(TransactionContextImpl context, Exception exception) {
  if (context != null) {
    if (exception != null) {
      context.setException(exception);
    }

    try {
      context.cancelTransaction();
    } catch (IOException cancelFailure) {
      // Cancellation is best effort: log the failure and carry on.
      LOG.warn("{}: Failed to cancel transaction {}", getMemberId(), context, cancelFailure);
    }
  }
}

/**
* Handle a normal update request from client.
*/
Expand All @@ -833,27 +867,28 @@ private CompletableFuture<RaftClientReply> appendTransaction(

final LeaderStateImpl unsyncedLeaderState = role.getLeaderState().orElse(null);
if (unsyncedLeaderState == null) {
final RaftClientReply reply = newExceptionReply(request, generateNotLeaderException());
return RetryCacheImpl.failWithReply(reply, cacheEntry);
final NotLeaderException nle = generateNotLeaderException();
final RaftClientReply reply = newExceptionReply(request, nle);
return failWithReply(reply, cacheEntry, context);
}
final PendingRequests.Permit unsyncedPermit = unsyncedLeaderState.tryAcquirePendingRequest(request.getMessage());
if (unsyncedPermit == null) {
return getResourceUnavailableReply(request, cacheEntry);
return getResourceUnavailableReply("acquire a pending write request", request, cacheEntry, context);
}

final LeaderStateImpl leaderState;
final PendingRequest pending;
synchronized (this) {
final CompletableFuture<RaftClientReply> reply = checkLeaderState(request, cacheEntry);
final CompletableFuture<RaftClientReply> reply = checkLeaderState(request, cacheEntry, context);
if (reply != null) {
return reply;
}

leaderState = role.getLeaderStateNonNull();
final PendingRequests.Permit permit = leaderState == unsyncedLeaderState? unsyncedPermit
final PendingRequests.Permit permit = leaderState == unsyncedLeaderState ? unsyncedPermit
: leaderState.tryAcquirePendingRequest(request.getMessage());
if (permit == null) {
return getResourceUnavailableReply(request, cacheEntry);
return getResourceUnavailableReply("acquire a pending write request", request, cacheEntry, context);
}

// append the message to its local log
Expand All @@ -863,20 +898,18 @@ private CompletableFuture<RaftClientReply> appendTransaction(
} catch (StateMachineException e) {
// the StateMachineException is thrown by the SM in the preAppend stage.
// Return the exception in a RaftClientReply.
RaftClientReply exceptionReply = newExceptionReply(request, e);
cacheEntry.failWithReply(exceptionReply);
final RaftClientReply exceptionReply = newExceptionReply(request, e);
// leader will step down here
if (e.leaderShouldStepDown() && getInfo().isLeader()) {
leaderState.submitStepDownEvent(StepDownReason.STATE_MACHINE_EXCEPTION);
}
return CompletableFuture.completedFuture(exceptionReply);
return failWithReply(exceptionReply, cacheEntry, null);
}

// put the request into the pending queue
pending = leaderState.addPendingRequest(permit, request, context);
if (pending == null) {
return cacheEntry.failWithException(new ResourceUnavailableException(
getMemberId() + ": Failed to add a pending write request for " + request));
return getResourceUnavailableReply("add a pending write request", request, cacheEntry, context);
}
}
leaderState.notifySenders();
Expand Down Expand Up @@ -1001,19 +1034,23 @@ private CompletableFuture<RaftClientReply> writeAsyncImpl(RaftClientRequest requ
// return the cached future.
return cacheEntry.getReplyFuture();
}
// TODO: this client request will not be added to pending requests until
// later which means that any failure in between will leave partial state in
// the state machine. We should call cancelTransaction() for failed requests
// This request will be added to pending requests later in appendTransaction.
// Any failure in between must invoke cancelTransaction.
final TransactionContextImpl context = (TransactionContextImpl) stateMachine.startTransaction(
filterDataStreamRaftClientRequest(request));
if (context.getException() != null) {
final StateMachineException e = new StateMachineException(getMemberId(), context.getException());
final Exception exception = context.getException();
final StateMachineException e = new StateMachineException(getMemberId(), exception);
final RaftClientReply exceptionReply = newExceptionReply(request, e);
cacheEntry.failWithReply(exceptionReply);
return CompletableFuture.completedFuture(exceptionReply);
return failWithReply(exceptionReply, cacheEntry, context);
}

return appendTransaction(request, context, cacheEntry);
try {
return appendTransaction(request, context, cacheEntry);
} catch (Exception e) {
cancelTransaction(context, e);
throw e;
}
}

private CompletableFuture<RaftClientReply> watchAsync(RaftClientRequest request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,14 +257,4 @@ public synchronized void close() {
cache.invalidateAll();
statistics.set(null);
}

/**
 * Fail the given cache entry with the given reply.
 *
 * @return the reply future of the entry if the entry is non-null;
 *         otherwise, a future already completed with the reply.
 */
static CompletableFuture<RaftClientReply> failWithReply(
    RaftClientReply reply, CacheEntry entry) {
  if (entry == null) {
    // Nothing is cached for this request: reply directly.
    return CompletableFuture.completedFuture(reply);
  }
  entry.failWithReply(reply);
  return entry.getReplyFuture();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,6 @@ public TransactionContext preAppendTransaction() throws IOException {

@Override
public TransactionContext cancelTransaction() throws IOException {
// Forward the cancellation of this transaction to the state machine,
// passing this context so the state machine knows which transaction is cancelled.
return stateMachine.cancelTransaction(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.jupiter.api.Assertions.fail;

Expand All @@ -52,6 +53,7 @@ public abstract class RaftStateMachineExceptionTests<CLUSTER extends MiniRaftClu
}

private static volatile boolean failPreAppend = false;
private static final AtomicInteger numCancelTransaction = new AtomicInteger();

protected static class StateMachineWithException extends
SimpleStateMachine4Testing {
Expand All @@ -72,6 +74,12 @@ public TransactionContext preAppendTransaction(TransactionContext trx)
return trx;
}
}

/** Count each invocation before delegating to the super class. */
@Override
public TransactionContext cancelTransaction(TransactionContext trx) throws IOException {
  // Increment first so that the call is counted even if the super call throws.
  numCancelTransaction.incrementAndGet();
  final TransactionContext cancelled = super.cancelTransaction(trx);
  return cancelled;
}
}

{
Expand Down Expand Up @@ -179,4 +187,31 @@ private void runTestRetryOnExceptionDuringReplication(CLUSTER cluster) throws Ex
failPreAppend = false;
}
}

/** Run the no-cancel-on-preAppend-failure scenario with a new 3-server cluster. */
@Test
public void testNoCancelTransactionOnPreAppendFailure() throws Exception {
  final int numServers = 3;
  runWithNewCluster(numServers, this::runTestNoCancelTransactionOnPreAppendFailure);
}

/**
 * With preAppend failures turned on, send a message to the leader and expect
 * a {@link StateMachineException}; then assert that the cancelTransaction
 * counter is still zero.
 * The failPreAppend flag is always reset at the end.
 */
private void runTestNoCancelTransactionOnPreAppendFailure(CLUSTER cluster) throws Exception {
  final RaftPeerId leader = RaftTestUtil.waitForLeader(cluster).getId();
  failPreAppend = true;
  numCancelTransaction.set(0);
  try (final RaftClient raftClient = cluster.createClient(leader)) {
    try {
      raftClient.io().send(new SimpleMessage("cancel-transaction"));
      fail("Exception expected");
    } catch (StateMachineException sme) {
      final String causeMessage = sme.getCause().getMessage();
      Assertions.assertTrue(causeMessage.contains("Fake Exception in preAppend"));
    }

    // The counter must still be zero, i.e. cancelTransaction() was not invoked.
    JavaUtils.attemptRepeatedly(() -> {
      final int expectedCancels = 0;
      Assertions.assertEquals(expectedCancels, numCancelTransaction.get(),
          () -> "Expected cancelTransaction() not to be called but got " + numCancelTransaction.get());
      return null;
    }, 10, ONE_SECOND, "wait for cancelTransaction", LOG);
  } finally {
    // Always restore the flag so that other tests are not affected.
    failPreAppend = false;
  }
}
}
Loading