Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,11 @@ public void start() {
/**
 * Reports whether this appender should be treated as running.
 *
 * <p>The result is {@code true} only when the daemon reports that it is working and
 * {@link #isLeaderAlive()} holds. The server-alive check is performed inside
 * {@code isLeaderAlive()}, so it is not repeated here.
 *
 * @return {@code true} if the daemon is working and the leader-side checks pass
 */
@Override
public boolean isRunning() {
  return daemon.isWorking()
      && isLeaderAlive();
}

/**
 * Checks the leader-side preconditions, in order: the server info reports alive,
 * the server info reports leader, and the Raft log reports opened.
 *
 * @return {@code true} only when all three checks pass
 */
private boolean isLeaderAlive() {
  if (!server.getInfo().isAlive()) {
    return false;
  }
  if (!server.getInfo().isLeader()) {
    return false;
  }
  return getRaftLog().isOpened();
}
Expand All @@ -136,8 +140,12 @@ public CompletableFuture<LifeCycle.State> stopAsync() {
}

void restart() {
if (!isRunning()) {
LOG.warn("{} is not running: skipping restart", this);
if (daemon.isClosingOrClosed()) {
LOG.warn("{}: daemon is closing or closed, skipping restart", this);
return;
}
if (!isLeaderAlive()) {
LOG.warn("{}: leader is not ready, skipping restart", this);
return;
}
getLeaderState().restart(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ public boolean isWorking() {
return !LifeCycle.States.CLOSING_OR_CLOSED_OR_EXCEPTION.contains(lifeCycle.getCurrentState());
}

/**
 * Tells whether the life cycle's current state is a member of
 * {@code LifeCycle.States.CLOSING_OR_CLOSED}.
 *
 * @return {@code true} if the current state is contained in that set
 */
public boolean isClosingOrClosed() {
  final LifeCycle.State current = lifeCycle.getCurrentState();
  return LifeCycle.States.CLOSING_OR_CLOSED.contains(current);
}

public void tryToStart() {
if (lifeCycle.compareAndTransition(NEW, STARTING)) {
daemon.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ public void testFollowerHeartbeatMetric() throws IOException, InterruptedExcepti
assertTrue(t.getTimer().getCount() > 0L);
}
}
cluster.shutdown();
}

void runTest(CLUSTER cluster) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
package org.apache.ratis.grpc;

import org.apache.ratis.LogAppenderTests;
import org.apache.ratis.grpc.server.GrpcServicesImpl;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.client.RaftClient;
Expand All @@ -29,11 +32,14 @@
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.leader.FollowerInfo;
import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.leader.LogAppender;
import org.apache.ratis.statemachine.impl.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.util.CodeInjectionForTesting;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Slf4jUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.event.Level;
Expand All @@ -42,7 +48,10 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static org.apache.ratis.RaftTestUtil.waitForLeader;

Expand Down Expand Up @@ -148,4 +157,58 @@ private void runTestRestartLogAppender(MiniRaftClusterWithGrpc cluster) throws E
Assertions.assertTrue(newleaderMetrics.getRegistry().counter(counter).getCount() >= 1L);
}
}

/**
 * Runs {@link #runTestAutoRestartOnException} against a freshly created 3-server cluster.
 *
 * @throws Exception if the cluster run or the test body fails
 */
@Test
public void testLogAppenderAutoRestartOnException() throws Exception {
  runWithNewCluster(3, cluster -> runTestAutoRestartOnException(cluster));
}

/**
 * Verifies that the leader replaces a LogAppender after an injected send failure.
 *
 * <p>Steps: write a few messages, snapshot the leader's LogAppender instances, install a
 * hook that throws once on the leader's AppendEntries send path, wait until at least one
 * appender instance differs from the snapshot, then confirm that writes still succeed.
 *
 * @param cluster the running cluster under test
 * @throws Exception if any step fails or the wait times out
 */
private void runTestAutoRestartOnException(MiniRaftClusterWithGrpc cluster) throws Exception {
  final RaftServer.Division leader = waitForLeader(cluster);
  final RaftPeerId leaderId = leader.getId();

  // Baseline traffic before the failure is injected.
  try (RaftClient client = cluster.createClient(leaderId)) {
    int sent = 0;
    while (sent < 5) {
      final RaftTestUtil.SimpleMessage message = new RaftTestUtil.SimpleMessage("init-" + sent);
      Assertions.assertTrue(client.io().send(message).isSuccess());
      sent++;
    }
  }

  // Snapshot of the appender instances that exist before the failure.
  final Set<LogAppender> originalAppenders = RaftServerTestUtil.getLogAppenders(leader)
      .collect(Collectors.toSet());
  Assertions.assertEquals(2, originalAppenders.size());

  // Inject a one-time IllegalStateException into the leader's AppendEntries send path.
  // This causes the LogAppenderDaemon to enter EXCEPTION state and call restart().
  final AtomicInteger injectedFailures = new AtomicInteger(0);
  final RaftGroupId groupId = cluster.getGroupId();
  try {
    CodeInjectionForTesting.put(GrpcServicesImpl.GRPC_SEND_SERVER_REQUEST, (localId, remoteId, args) -> {
      // Only AppendEntries requests sent by the leader are candidates for the failure.
      if (!leaderId.equals(localId)
          || args.length == 0
          || !(args[0] instanceof RaftProtos.AppendEntriesRequestProto)) {
        return false;
      }
      final RaftProtos.AppendEntriesRequestProto request = (RaftProtos.AppendEntriesRequestProto) args[0];
      final RaftGroupId requestGroupId =
          RaftGroupId.valueOf(request.getServerRequest().getRaftGroupId().getId());
      // The counter is only touched for this group, and only its first hit throws.
      if (requestGroupId.equals(groupId) && injectedFailures.getAndIncrement() < 1) {
        throw new IllegalStateException("Injected failure for restart test");
      }
      return false;
    });

    JavaUtils.attempt(() -> {
      final Set<LogAppender> current = RaftServerTestUtil.getLogAppenders(leader)
          .collect(Collectors.toSet());
      Assertions.assertEquals(2, current.size());
      // True as soon as some current appender was not part of the original snapshot.
      Assertions.assertTrue(!originalAppenders.containsAll(current),
          "Expected at least one new LogAppender instance after daemon exception restart");
    }, 30, ONE_SECOND, "LogAppender auto-restart after exception", LOG);
  } finally {
    // Always remove the hook so it does not leak out of this test.
    CodeInjectionForTesting.remove(GrpcServicesImpl.GRPC_SEND_SERVER_REQUEST);
  }

  // Writes must still succeed once the appender has been replaced.
  try (RaftClient client = cluster.createClient(leaderId)) {
    int sent = 0;
    while (sent < 5) {
      final RaftTestUtil.SimpleMessage message =
          new RaftTestUtil.SimpleMessage("after-restart-" + sent);
      Assertions.assertTrue(client.io().send(message).isSuccess());
      sent++;
    }
  }
}
}
Loading