-
Notifications
You must be signed in to change notification settings - Fork 605
HDDS-12864. All commit semantics in replication writes #10365
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -44,6 +44,7 @@ | |
| import java.util.concurrent.CompletionException; | ||
| import java.util.concurrent.ConcurrentHashMap; | ||
| import java.util.concurrent.ConcurrentSkipListMap; | ||
| import java.util.concurrent.ExecutionException; | ||
| import java.util.concurrent.ExecutorService; | ||
| import java.util.concurrent.Executors; | ||
| import java.util.concurrent.Future; | ||
|
|
@@ -455,12 +456,17 @@ public TransactionContext startTransaction(LogEntryProto entry, RaftPeerRole rol | |
| } | ||
|
|
||
| final ContainerCommandRequestProto requestProto; | ||
| if (logProto.getCmdType() == Type.WriteChunk) { | ||
| // combine state machine data | ||
| requestProto = ContainerCommandRequestProto.newBuilder(logProto) | ||
| .setWriteChunk(WriteChunkRequestProto.newBuilder(logProto.getWriteChunk()) | ||
| .setData(stateMachineLogEntry.getStateMachineEntry().getStateMachineData())) | ||
| .build(); | ||
| if (logProto.getCmdType() == Type.WriteChunk || logProto.getCmdType() == Type.PutBlock) { | ||
| // stateMachineData carries a complete ContainerCommandRequestProto; | ||
| // parse it directly — no manual reconstruction needed. | ||
| // For WriteChunk it includes chunk data; for PutBlock it is the same as logData. | ||
| try { | ||
| requestProto = ContainerCommandRequestProto.parseFrom( | ||
| stateMachineLogEntry.getStateMachineEntry().getStateMachineData()); | ||
| } catch (InvalidProtocolBufferException e) { | ||
| trx.setException(e); | ||
| return trx; | ||
| } | ||
| } else { | ||
| // request and log are the same when there is no state machine data, | ||
| requestProto = logProto; | ||
|
|
@@ -500,6 +506,8 @@ public TransactionContext startTransaction(RaftClientRequest request) | |
| final ContainerCommandRequestProto.Builder protoBuilder = ContainerCommandRequestProto.newBuilder(proto) | ||
| .clearEncodedToken(); | ||
| boolean blockAlreadyFinalized = false; | ||
| // For WriteChunk, holds the full proto (with data) that travels in stateMachineData. | ||
| ContainerCommandRequestProto fullProtoWithData = null; | ||
| if (proto.getCmdType() == Type.PutBlock) { | ||
| blockAlreadyFinalized = shouldRejectRequest(proto.getPutBlock().getBlockData().getBlockID()); | ||
| } else if (proto.getCmdType() == Type.WriteChunk) { | ||
|
|
@@ -508,13 +516,15 @@ public TransactionContext startTransaction(RaftClientRequest request) | |
| if (!blockAlreadyFinalized) { | ||
| Preconditions.checkArgument(write.hasData()); | ||
| Preconditions.checkArgument(!write.getData().isEmpty()); | ||
| final WriteChunkRequestProto.Builder commitWriteChunkProto = WriteChunkRequestProto.newBuilder(write) | ||
| .clearData(); | ||
| protoBuilder.setWriteChunk(commitWriteChunkProto) | ||
| .setPipelineID(getGroupId().getUuid().toString()) | ||
| // Set metadata fields while the write chunk still contains data, then snapshot. | ||
| protoBuilder.setPipelineID(getGroupId().getUuid().toString()) | ||
| .setTraceID(proto.getTraceID()); | ||
|
|
||
| builder.setStateMachineData(write.getData()); | ||
| // fullProtoWithData: token cleared, pipelineID set, chunk data present — travels in AppendEntries. | ||
| fullProtoWithData = protoBuilder.build(); | ||
| // Strip data for the WAL entry (logProto). | ||
| protoBuilder.setWriteChunk(WriteChunkRequestProto.newBuilder(write).clearData()); | ||
| // stateMachineData carries the full proto so followers can parse it directly. | ||
| builder.setStateMachineData(fullProtoWithData.toByteString()); | ||
| } | ||
| } else if (proto.getCmdType() == Type.FinalizeBlock) { | ||
| containerController.addFinalizedBlock(proto.getContainerID(), | ||
|
|
@@ -528,7 +538,17 @@ public TransactionContext startTransaction(RaftClientRequest request) | |
| return transactionContext; | ||
| } else { | ||
| final ContainerCommandRequestProto containerCommandRequestProto = protoBuilder.build(); | ||
| TransactionContext txnContext = builder.setStateMachineContext(new Context(proto, containerCommandRequestProto)) | ||
| // For WriteChunk: fullProtoWithData carries chunk data so write() can dispatch WRITE_DATA. | ||
| // For PutBlock: carry the proto as stateMachineData so Ratis calls write() on all nodes, | ||
| // enabling the RocksDB write to happen pre-quorum on every node (no metadata gap on follower crash). | ||
| // For all other commands: no state machine data. | ||
| final ContainerCommandRequestProto requestProtoForContext = | ||
| fullProtoWithData != null ? fullProtoWithData : proto; | ||
| if (proto.getCmdType() == Type.PutBlock && !blockAlreadyFinalized) { | ||
| builder.setStateMachineData(containerCommandRequestProto.toByteString()); | ||
| } | ||
| TransactionContext txnContext = builder | ||
| .setStateMachineContext(new Context(requestProtoForContext, containerCommandRequestProto)) | ||
| .setLogData(containerCommandRequestProto.toByteString()) | ||
| .build(); | ||
| metrics.recordStartTransactionCompleteNs(Time.monotonicNowNanos() - startTime); | ||
|
|
@@ -589,7 +609,9 @@ private CompletableFuture<Message> writeStateMachineData( | |
| Preconditions.checkArgument(!write.getData().isEmpty()); | ||
| try { | ||
| if (server.getDivision(getGroupId()).getInfo().isLeader()) { | ||
| stateMachineDataCache.put(entryIndex, write.getData()); | ||
| // Cache the full proto bytes so read() can return them for slow followers | ||
| // without having to reconstruct the proto from raw bytes. | ||
| stateMachineDataCache.put(entryIndex, requestProto.toByteString()); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. How would the cache be cleaned up if the write to disk fails? How long would data be stored in the cache, and how would it be evicted over time? |
||
| } | ||
| } catch (InterruptedException ioe) { | ||
| Thread.currentThread().interrupt(); | ||
|
|
@@ -685,6 +707,115 @@ private void handleCommandResult(ContainerCommandRequestProto requestProto, long | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Writes PutBlock metadata to RocksDB inside {@link StateMachine.DataApi#write()}, before Raft quorum. | ||
| * This ensures every node that ACKs the log entry has already persisted the block metadata, | ||
| * eliminating the gap where a follower crashes after the leader commits but before the follower | ||
| * runs {@link #applyTransaction(TransactionContext)}. | ||
| * | ||
| * <p>Ordering: waits for all preceding WriteChunk {@code raftFuture}s to complete before | ||
| * dispatching so that {@code finishWriteChunks()} inside {@link KeyValueHandler#handlePutBlock} | ||
| * always sees fully written chunk files. | ||
| * | ||
| * <p>{@link #applyTransaction(TransactionContext)} becomes a no-op for PutBlock because | ||
| * {@code persistPutBlock()} detects the already-written BCSID and returns early. | ||
| */ | ||
| private CompletableFuture<Message> writePutBlockData( | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This method should be refactored into smaller functions to improve readability. |
||
| ContainerCommandRequestProto requestProto, long entryIndex, long term, | ||
| long startTime) { | ||
| final WriteFutures previous = writeChunkFutureMap.get(entryIndex); | ||
| if (previous != null) { | ||
| return previous.getRaftFuture(); | ||
| } | ||
| try { | ||
| if (ratisServer.getServer().getDivision(getGroupId()).getInfo().isLeader()) { | ||
| stateMachineDataCache.put(entryIndex, requestProto.toByteString()); | ||
| } | ||
| } catch (InterruptedException ioe) { | ||
| Thread.currentThread().interrupt(); | ||
| return completeExceptionally(ioe); | ||
| } catch (IOException ioe) { | ||
| return completeExceptionally(ioe); | ||
| } | ||
|
|
||
| // Snapshot preceding WriteChunk futures so finishWriteChunks() only runs after chunk bytes are on disk. | ||
| final SortedMap<Long, WriteFutures> preceding = writeChunkFutureMap.headMap(entryIndex, false); | ||
| final CompletableFuture<Void> precedingChunksDone = preceding.isEmpty() | ||
| ? CompletableFuture.completedFuture(null) | ||
| : CompletableFuture.allOf(preceding.values().stream() | ||
| .map(WriteFutures::getRaftFuture) | ||
| .toArray(CompletableFuture[]::new)); | ||
|
|
||
| final DispatcherContext context = DispatcherContext | ||
| .newBuilder(DispatcherContext.Op.WRITE_STATE_MACHINE_DATA) | ||
| .setTerm(term) | ||
| .setLogIndex(entryIndex) | ||
| .setContainer2BCSIDMap(container2BCSIDMap) | ||
| .build(); | ||
|
|
||
| final CompletableFuture<Message> raftFuture = new CompletableFuture<>(); | ||
|
|
||
| final CompletableFuture<ContainerCommandResponseProto> future = containerTaskQueues.submit( | ||
| requestProto.getContainerID(), | ||
| () -> { | ||
| // Single try-finally ensures writeChunkFutureMap is always cleaned up and | ||
| // raftFuture is always completed regardless of which code path exits. | ||
| try { | ||
| try { | ||
| precedingChunksDone.get(); | ||
| } catch (InterruptedException e) { | ||
| Thread.currentThread().interrupt(); | ||
| StorageContainerException sce = new StorageContainerException( | ||
| "Interrupted waiting for preceding chunk writes at logIndex " + entryIndex, | ||
| ContainerProtos.Result.CONTAINER_INTERNAL_ERROR); | ||
| raftFuture.completeExceptionally(sce); | ||
| throw sce; | ||
| } catch (ExecutionException e) { | ||
| // A preceding write (possibly for a different container) failed; the pipeline | ||
| // is already closing. Fail this entry so Ratis does not hang waiting for the ACK. | ||
| StorageContainerException sce = new StorageContainerException( | ||
| "Preceding write failed at logIndex " + entryIndex + ": " + e.getMessage(), | ||
| ContainerProtos.Result.CONTAINER_INTERNAL_ERROR); | ||
| raftFuture.completeExceptionally(sce); | ||
| throw sce; | ||
| } | ||
| ContainerCommandResponseProto result = dispatchCommand(requestProto, context); | ||
| if (result.getResult() != ContainerProtos.Result.SUCCESS | ||
| && result.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN | ||
| && result.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO) { | ||
| StorageContainerException sce = | ||
| new StorageContainerException(result.getMessage(), result.getResult()); | ||
| LOG.error("{}: writePutBlockData failed at logIndex {} containerID={}: {} {}", | ||
| getGroupId(), entryIndex, requestProto.getContainerID(), | ||
| result.getMessage(), result.getResult()); | ||
| stateMachineHealthy.set(false); | ||
| raftFuture.completeExceptionally(sce); | ||
| } else { | ||
| raftFuture.complete(result::toByteString); | ||
| } | ||
| return result; | ||
| } catch (Exception e) { | ||
| if (!raftFuture.isDone()) { | ||
| LOG.error("{}: writePutBlockData failed at logIndex {} containerID={}", | ||
| getGroupId(), entryIndex, requestProto.getContainerID(), e); | ||
| stateMachineHealthy.set(false); | ||
| raftFuture.completeExceptionally(e); | ||
| } | ||
| throw e; | ||
| } finally { | ||
| writeChunkFutureMap.remove(entryIndex); | ||
| } | ||
| }, | ||
| executor); | ||
|
|
||
| writeChunkFutureMap.put(entryIndex, new WriteFutures(future, raftFuture, startTime)); | ||
| if (LOG.isDebugEnabled()) { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. There are no expensive calls or string formatting in the arguments of this LOG.debug() call, so the explicit level check is not needed: LOG.debug() checks the log level internally and short-circuits the call. |
||
| LOG.debug("{}: writePutBlockData logIndex={} containerID={}", | ||
| getGroupId(), entryIndex, requestProto.getContainerID()); | ||
| } | ||
| return raftFuture; | ||
| } | ||
|
|
||
| private void validateLongRunningWrite() throws StorageContainerException { | ||
| // get min valid write chunk operation's future context | ||
| Map.Entry<Long, WriteFutures> writeFutureContextEntry = null; | ||
|
|
@@ -803,6 +934,9 @@ public CompletableFuture<Message> write(LogEntryProto entry, TransactionContext | |
| case WriteChunk: | ||
| return writeStateMachineData(requestProto, entry.getIndex(), | ||
| entry.getTerm(), writeStateMachineStartTime); | ||
| case PutBlock: | ||
| return writePutBlockData(requestProto, entry.getIndex(), | ||
| entry.getTerm(), writeStateMachineStartTime); | ||
|
Comment on lines
+937
to
+939
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The change here is to write PutBlock to RocksDb when writing the RaftLog entry (via DataApi.write(..)). If the RaftLog entry is truncated, what will happen to the RocksDb record? |
||
| default: | ||
| throw new IllegalStateException("Cmd Type:" + cmdType | ||
| + " should not have state machine data"); | ||
|
|
@@ -881,7 +1015,13 @@ private ByteString readStateMachineData( | |
| "read chunk len=%s does not match chunk expected len=%s for chunk:%s", | ||
| data.size(), chunkInfo.getLen(), chunkInfo); | ||
|
|
||
| return data; | ||
| // Return the full proto bytes (not raw chunk bytes) so that followers can | ||
| // parse stateMachineData directly via parseFrom() without reconstruction. | ||
| final ContainerCommandRequestProto fullProto = ContainerCommandRequestProto.newBuilder(requestProto) | ||
| .setWriteChunk(WriteChunkRequestProto.newBuilder(requestProto.getWriteChunk()) | ||
| .setData(data)) | ||
| .build(); | ||
| return fullProto.toByteString(); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -939,6 +1079,10 @@ public CompletableFuture<ByteString> read(LogEntryProto entry, TransactionContex | |
| final ContainerCommandRequestProto requestProto = context != null ? context.getLogProto() | ||
| : getContainerCommandRequestProto(getGroupId(), entry.getStateMachineLogEntry().getLogData()); | ||
|
|
||
| if (requestProto.getCmdType() == Type.PutBlock) { | ||
| // PutBlock proto is self-contained (no external chunk bytes); return it directly. | ||
| return CompletableFuture.completedFuture(requestProto.toByteString()); | ||
| } | ||
| if (requestProto.getCmdType() != Type.WriteChunk) { | ||
| throw new IllegalStateException("Cmd type:" + requestProto.getCmdType() | ||
| + " cannot have state machine data"); | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems to be an incompatible change, since it changes the format of the stateMachineData. As a result, an old datanode will not work with a new datanode.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean a case like a rolling upgrade, where different versions of datanodes are present at the same time? Let me think about it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we must support rolling upgrades for datanodes.