From 1e4db62e02ae39d750d459f2ab154b37f5815c6a Mon Sep 17 00:00:00 2001 From: Sergey Soldatov Date: Mon, 18 May 2026 15:59:50 -0700 Subject: [PATCH] HDDS-12864. All commit semantics in replication writes --- .../server/ratis/ContainerStateMachine.java | 174 ++++++++++++++++-- 1 file changed, 159 insertions(+), 15 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index c99b33f8c682..201fed885178 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -44,6 +44,7 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -455,12 +456,17 @@ public TransactionContext startTransaction(LogEntryProto entry, RaftPeerRole rol } final ContainerCommandRequestProto requestProto; - if (logProto.getCmdType() == Type.WriteChunk) { - // combine state machine data - requestProto = ContainerCommandRequestProto.newBuilder(logProto) - .setWriteChunk(WriteChunkRequestProto.newBuilder(logProto.getWriteChunk()) - .setData(stateMachineLogEntry.getStateMachineEntry().getStateMachineData())) - .build(); + if (logProto.getCmdType() == Type.WriteChunk || logProto.getCmdType() == Type.PutBlock) { + // stateMachineData carries a complete ContainerCommandRequestProto; + // parse it directly — no manual reconstruction needed. + // For WriteChunk it includes chunk data; for PutBlock it is the same as logData. + try { + requestProto = ContainerCommandRequestProto.parseFrom( + stateMachineLogEntry.getStateMachineEntry().getStateMachineData()); + } catch (InvalidProtocolBufferException e) { + trx.setException(e); + return trx; + } } else { // request and log are the same when there is no state machine data, requestProto = logProto; @@ -500,6 +506,8 @@ public TransactionContext startTransaction(RaftClientRequest request) final ContainerCommandRequestProto.Builder protoBuilder = ContainerCommandRequestProto.newBuilder(proto) .clearEncodedToken(); boolean blockAlreadyFinalized = false; + // For WriteChunk, holds the full proto (with data) that travels in stateMachineData. + ContainerCommandRequestProto fullProtoWithData = null; if (proto.getCmdType() == Type.PutBlock) { blockAlreadyFinalized = shouldRejectRequest(proto.getPutBlock().getBlockData().getBlockID()); } else if (proto.getCmdType() == Type.WriteChunk) { @@ -508,13 +516,15 @@ public TransactionContext startTransaction(RaftClientRequest request) if (!blockAlreadyFinalized) { Preconditions.checkArgument(write.hasData()); Preconditions.checkArgument(!write.getData().isEmpty()); - final WriteChunkRequestProto.Builder commitWriteChunkProto = WriteChunkRequestProto.newBuilder(write) - .clearData(); - protoBuilder.setWriteChunk(commitWriteChunkProto) - .setPipelineID(getGroupId().getUuid().toString()) + // Set metadata fields while the write chunk still contains data, then snapshot. + protoBuilder.setPipelineID(getGroupId().getUuid().toString()) .setTraceID(proto.getTraceID()); - - builder.setStateMachineData(write.getData()); + // fullProtoWithData: token cleared, pipelineID set, chunk data present — travels in AppendEntries. + fullProtoWithData = protoBuilder.build(); + // Strip data for the WAL entry (logProto). + protoBuilder.setWriteChunk(WriteChunkRequestProto.newBuilder(write).clearData()); + // stateMachineData carries the full proto so followers can parse it directly. + builder.setStateMachineData(fullProtoWithData.toByteString()); } } else if (proto.getCmdType() == Type.FinalizeBlock) { containerController.addFinalizedBlock(proto.getContainerID(), @@ -528,7 +538,17 @@ public TransactionContext startTransaction(RaftClientRequest request) return transactionContext; } else { final ContainerCommandRequestProto containerCommandRequestProto = protoBuilder.build(); - TransactionContext txnContext = builder.setStateMachineContext(new Context(proto, containerCommandRequestProto)) + // For WriteChunk: fullProtoWithData carries chunk data so write() can dispatch WRITE_DATA. + // For PutBlock: carry the proto as stateMachineData so Ratis calls write() on all nodes, + // enabling the RocksDB write to happen pre-quorum on every node (no metadata gap on follower crash). + // For all other commands: no state machine data. + final ContainerCommandRequestProto requestProtoForContext = + fullProtoWithData != null ? fullProtoWithData : proto; + if (proto.getCmdType() == Type.PutBlock && !blockAlreadyFinalized) { + builder.setStateMachineData(containerCommandRequestProto.toByteString()); + } + TransactionContext txnContext = builder + .setStateMachineContext(new Context(requestProtoForContext, containerCommandRequestProto)) .setLogData(containerCommandRequestProto.toByteString()) .build(); metrics.recordStartTransactionCompleteNs(Time.monotonicNowNanos() - startTime); @@ -589,7 +609,9 @@ private CompletableFuture writeStateMachineData( Preconditions.checkArgument(!write.getData().isEmpty()); try { if (server.getDivision(getGroupId()).getInfo().isLeader()) { - stateMachineDataCache.put(entryIndex, write.getData()); + // Cache the full proto bytes so read() can return them for slow followers + // without having to reconstruct the proto from raw bytes. + stateMachineDataCache.put(entryIndex, requestProto.toByteString()); } } catch (InterruptedException ioe) { Thread.currentThread().interrupt(); @@ -685,6 +707,115 @@ private void handleCommandResult(ContainerCommandRequestProto requestProto, long } } + /** + * Writes PutBlock metadata to RocksDB inside {@link StateMachine.DataApi#write()}, before Raft quorum. + * This ensures every node that ACKs the log entry has already persisted the block metadata, + * eliminating the gap where a follower crashes after the leader commits but before the follower + * runs {@link #applyTransaction(TransactionContext)}. + * + *

Ordering: waits for all preceding WriteChunk {@code raftFuture}s to complete before + * dispatching so that {@code finishWriteChunks()} inside {@link KeyValueHandler#handlePutBlock} + * always sees fully written chunk files. + * + *

{@link #applyTransaction(TransactionContext)} becomes a no-op for PutBlock because + * {@code persistPutBlock()} detects the already-written BCSID and returns early. + */ + private CompletableFuture writePutBlockData( + ContainerCommandRequestProto requestProto, long entryIndex, long term, + long startTime) { + final WriteFutures previous = writeChunkFutureMap.get(entryIndex); + if (previous != null) { + return previous.getRaftFuture(); + } + try { + if (ratisServer.getServer().getDivision(getGroupId()).getInfo().isLeader()) { + stateMachineDataCache.put(entryIndex, requestProto.toByteString()); + } + } catch (InterruptedException ioe) { + Thread.currentThread().interrupt(); + return completeExceptionally(ioe); + } catch (IOException ioe) { + return completeExceptionally(ioe); + } + + // Snapshot preceding WriteChunk futures so finishWriteChunks() only runs after chunk bytes are on disk. + final SortedMap preceding = writeChunkFutureMap.headMap(entryIndex, false); + final CompletableFuture precedingChunksDone = preceding.isEmpty() + ? CompletableFuture.completedFuture(null) + : CompletableFuture.allOf(preceding.values().stream() + .map(WriteFutures::getRaftFuture) + .toArray(CompletableFuture[]::new)); + + final DispatcherContext context = DispatcherContext + .newBuilder(DispatcherContext.Op.WRITE_STATE_MACHINE_DATA) + .setTerm(term) + .setLogIndex(entryIndex) + .setContainer2BCSIDMap(container2BCSIDMap) + .build(); + + final CompletableFuture raftFuture = new CompletableFuture<>(); + + final CompletableFuture future = containerTaskQueues.submit( + requestProto.getContainerID(), + () -> { + // Single try-finally ensures writeChunkFutureMap is always cleaned up and + // raftFuture is always completed regardless of which code path exits. + try { + try { + precedingChunksDone.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + StorageContainerException sce = new StorageContainerException( + "Interrupted waiting for preceding chunk writes at logIndex " + entryIndex, + ContainerProtos.Result.CONTAINER_INTERNAL_ERROR); + raftFuture.completeExceptionally(sce); + throw sce; + } catch (ExecutionException e) { + // A preceding write (possibly for a different container) failed; the pipeline + // is already closing. Fail this entry so Ratis does not hang waiting for the ACK. + StorageContainerException sce = new StorageContainerException( + "Preceding write failed at logIndex " + entryIndex + ": " + e.getMessage(), + ContainerProtos.Result.CONTAINER_INTERNAL_ERROR); + raftFuture.completeExceptionally(sce); + throw sce; + } + ContainerCommandResponseProto result = dispatchCommand(requestProto, context); + if (result.getResult() != ContainerProtos.Result.SUCCESS + && result.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN + && result.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO) { + StorageContainerException sce = + new StorageContainerException(result.getMessage(), result.getResult()); + LOG.error("{}: writePutBlockData failed at logIndex {} containerID={}: {} {}", + getGroupId(), entryIndex, requestProto.getContainerID(), + result.getMessage(), result.getResult()); + stateMachineHealthy.set(false); + raftFuture.completeExceptionally(sce); + } else { + raftFuture.complete(result::toByteString); + } + return result; + } catch (Exception e) { + if (!raftFuture.isDone()) { + LOG.error("{}: writePutBlockData failed at logIndex {} containerID={}", + getGroupId(), entryIndex, requestProto.getContainerID(), e); + stateMachineHealthy.set(false); + raftFuture.completeExceptionally(e); + } + throw e; + } finally { + writeChunkFutureMap.remove(entryIndex); + } + }, + executor); + + writeChunkFutureMap.put(entryIndex, new WriteFutures(future, raftFuture, startTime)); + if (LOG.isDebugEnabled()) { + LOG.debug("{}: writePutBlockData logIndex={} containerID={}", + getGroupId(), entryIndex, requestProto.getContainerID()); + } + return raftFuture; + } + private void validateLongRunningWrite() throws StorageContainerException { // get min valid write chunk operation's future context Map.Entry writeFutureContextEntry = null; @@ -803,6 +934,9 @@ public CompletableFuture write(LogEntryProto entry, TransactionContext case WriteChunk: return writeStateMachineData(requestProto, entry.getIndex(), entry.getTerm(), writeStateMachineStartTime); + case PutBlock: + return writePutBlockData(requestProto, entry.getIndex(), + entry.getTerm(), writeStateMachineStartTime); default: throw new IllegalStateException("Cmd Type:" + cmdType + " should not have state machine data"); @@ -881,7 +1015,13 @@ private ByteString readStateMachineData( "read chunk len=%s does not match chunk expected len=%s for chunk:%s", data.size(), chunkInfo.getLen(), chunkInfo); - return data; + // Return the full proto bytes (not raw chunk bytes) so that followers can + // parse stateMachineData directly via parseFrom() without reconstruction. + final ContainerCommandRequestProto fullProto = ContainerCommandRequestProto.newBuilder(requestProto) + .setWriteChunk(WriteChunkRequestProto.newBuilder(requestProto.getWriteChunk()) + .setData(data)) + .build(); + return fullProto.toByteString(); } /** @@ -939,6 +1079,10 @@ public CompletableFuture read(LogEntryProto entry, TransactionContex final ContainerCommandRequestProto requestProto = context != null ? context.getLogProto() : getContainerCommandRequestProto(getGroupId(), entry.getStateMachineLogEntry().getLogData()); + if (requestProto.getCmdType() == Type.PutBlock) { + // PutBlock proto is self-contained (no external chunk bytes); return it directly. + return CompletableFuture.completedFuture(requestProto.toByteString()); + } if (requestProto.getCmdType() != Type.WriteChunk) { throw new IllegalStateException("Cmd type:" + requestProto.getCmdType() + " cannot have state machine data");