Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ public interface Code {
= new ConcurrentHashMap<>();

/** Put an injection point. */
public static void put(String injectionPoint, Code code) {
public static Code put(String injectionPoint, Code code) {
LOG.debug("put: {}, {}", injectionPoint, code);
INJECTION_POINTS.put(injectionPoint, code);
return INJECTION_POINTS.put(injectionPoint, code);
}

/** Execute the injected code, if there is any. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.util.ServerStringUtils;
import org.apache.ratis.thirdparty.io.grpc.Status;
import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException;
import org.apache.ratis.thirdparty.io.grpc.stub.CallStreamObserver;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
Expand Down Expand Up @@ -215,7 +216,7 @@ private void resetClient(AppendEntriesRequest request, Event event) {
try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) {
getClient().resetConnectBackoff();
if (appendLogRequestObserver != null) {
appendLogRequestObserver.stop();
appendLogRequestObserver.stop(event);
appendLogRequestObserver = null;
}
final int errorCount = replyState.process(event);
Expand Down Expand Up @@ -266,16 +267,23 @@ private boolean installSnapshot() {

@Override
public void run() throws IOException {
for(; isRunning(); mayWait()) {
//HB period is expired OR we have messages OR follower is behind with commit index
if (shouldSendAppendEntries() || isFollowerCommitBehindLastCommitIndex()) {
final boolean installingSnapshot = installSnapshot();
appendLog(installingSnapshot || haveTooManyPendingRequests());
try {
for (; isRunning(); mayWait()) {
//HB period is expired OR we have messages OR follower is behind with commit index
if (shouldSendAppendEntries() || isFollowerCommitBehindLastCommitIndex()) {
final boolean installingSnapshot = installSnapshot();
appendLog(installingSnapshot || haveTooManyPendingRequests());
}
getLeaderState().checkHealth(getFollower());
}
} finally {
try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) {
if (appendLogRequestObserver != null) {
appendLogRequestObserver.onCompleted();
appendLogRequestObserver = null;
}
}
getLeaderState().checkHealth(getFollower());
}

Optional.ofNullable(appendLogRequestObserver).ifPresent(StreamObservers::onCompleted);
}

public long getWaitTimeMs() {
Expand Down Expand Up @@ -366,16 +374,46 @@ void onNext(AppendEntriesRequestProto proto)
while (!stream.isReady() && running) {
sleep(waitForReady, isHeartBeat);
}
if (!running) {
return;
}
stream.onNext(proto);
}

void stop() {
void stop(Event event) {
running = false;
if (event == Event.COMPLETE) {
onCompleted();
} else {
cancelStream("stop due to " + event);
}
}

void onCompleted() {
appendLog.onCompleted();
Optional.ofNullable(heartbeat).ifPresent(StreamObserver::onCompleted);
try {
appendLog.onCompleted();
} catch (Exception e) {
LOG.debug("Failed to complete appendLog stream", e);
}
try {
Optional.ofNullable(heartbeat).ifPresent(StreamObserver::onCompleted);
} catch (Exception e) {
LOG.debug("Failed to complete heartbeat stream", e);
}
}

void cancelStream(String reason) {
try {
appendLog.onError(new StatusRuntimeException(Status.CANCELLED.withDescription(reason)));
} catch (Exception e) {
LOG.debug("Failed to cancel appendLog stream", e);
}
try {
Optional.ofNullable(heartbeat).ifPresent((hb) ->
hb.onError(new StatusRuntimeException(Status.CANCELLED.withDescription(reason))));
} catch (Exception e) {
LOG.debug("Failed to cancel heartbeat stream", e);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.apache.ratis.proto.RaftProtos.*;
import org.apache.ratis.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceImplBase;
import org.apache.ratis.util.BatchLogger;
import org.apache.ratis.util.CodeInjectionForTesting;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.ProtoUtils;
import org.slf4j.Logger;
Expand All @@ -46,6 +48,9 @@
class GrpcServerProtocolService extends RaftServerProtocolServiceImplBase {
public static final Logger LOG = LoggerFactory.getLogger(GrpcServerProtocolService.class);

public static final String GRPC_SERVER_HANDLE_ERROR =
JavaUtils.getClassSimpleName(GrpcServerProtocolService.class) + ".handleError";

private enum BatchLogKey implements BatchLogger.Key {
COMPLETED_REQUEST,
COMPLETED_REPLY
Expand Down Expand Up @@ -114,7 +119,9 @@ StatusRuntimeException wrapException(Throwable e, REQUEST request) {
private void handleError(Throwable e, REQUEST request) {
GrpcUtil.warn(LOG, () -> getId() + ": Failed " + op + " request " + requestToString(request), e);
if (isClosed.compareAndSet(false, true)) {
previousOnNext.set(null);
responseObserver.onError(wrapException(e, request));
CodeInjectionForTesting.execute(GRPC_SERVER_HANDLE_ERROR, getId(), null, previousOnNext.get());
}
}

Expand Down Expand Up @@ -172,18 +179,22 @@ public void onCompleted() {
BatchLogger.print(BatchLogKey.COMPLETED_REQUEST, getName(),
suffix -> LOG.info("{}: Completed {}, lastRequest: {} {}",
getId(), op, getPreviousRequestString(), suffix));
previousOnNext.set(null);
requestFuture.get().thenAccept(reply -> {
BatchLogger.print(BatchLogKey.COMPLETED_REPLY, getName(),
suffix -> LOG.info("{}: Completed {}, lastReply: {} {}",
getId(), op, ProtoUtils.shortDebugString(reply), suffix));
responseObserver.onCompleted();
});
requestFuture.set(null);
}
}
@Override
public void onError(Throwable t) {
GrpcUtil.warn(LOG, () -> getId() + ": "+ op + " onError, lastRequest: " + getPreviousRequestString(), t);
if (isClosed.compareAndSet(false, true)) {
previousOnNext.set(null);
requestFuture.set(null);
Status status = Status.fromThrowable(t);
if (status != null && status.getCode() != Status.Code.CANCELLED) {
responseObserver.onCompleted();
Expand Down
Loading
Loading