Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -455,12 +456,17 @@ public TransactionContext startTransaction(LogEntryProto entry, RaftPeerRole rol
}

final ContainerCommandRequestProto requestProto;
if (logProto.getCmdType() == Type.WriteChunk) {
// combine state machine data
requestProto = ContainerCommandRequestProto.newBuilder(logProto)
.setWriteChunk(WriteChunkRequestProto.newBuilder(logProto.getWriteChunk())
.setData(stateMachineLogEntry.getStateMachineEntry().getStateMachineData()))
.build();
if (logProto.getCmdType() == Type.WriteChunk || logProto.getCmdType() == Type.PutBlock) {
// stateMachineData carries a complete ContainerCommandRequestProto;
// parse it directly — no manual reconstruction needed.
// For WriteChunk it includes chunk data; for PutBlock it is the same as logData.
try {
requestProto = ContainerCommandRequestProto.parseFrom(
stateMachineLogEntry.getStateMachineEntry().getStateMachineData());
} catch (InvalidProtocolBufferException e) {
trx.setException(e);
return trx;
}
} else {
// request and log are the same when there is no state machine data,
requestProto = logProto;
Expand Down Expand Up @@ -500,6 +506,8 @@ public TransactionContext startTransaction(RaftClientRequest request)
final ContainerCommandRequestProto.Builder protoBuilder = ContainerCommandRequestProto.newBuilder(proto)
.clearEncodedToken();
boolean blockAlreadyFinalized = false;
// For WriteChunk, holds the full proto (with data) that travels in stateMachineData.
ContainerCommandRequestProto fullProtoWithData = null;
if (proto.getCmdType() == Type.PutBlock) {
blockAlreadyFinalized = shouldRejectRequest(proto.getPutBlock().getBlockData().getBlockID());
} else if (proto.getCmdType() == Type.WriteChunk) {
Expand All @@ -508,13 +516,15 @@ public TransactionContext startTransaction(RaftClientRequest request)
if (!blockAlreadyFinalized) {
Preconditions.checkArgument(write.hasData());
Preconditions.checkArgument(!write.getData().isEmpty());
final WriteChunkRequestProto.Builder commitWriteChunkProto = WriteChunkRequestProto.newBuilder(write)
.clearData();
protoBuilder.setWriteChunk(commitWriteChunkProto)
.setPipelineID(getGroupId().getUuid().toString())
// Set metadata fields while the write chunk still contains data, then snapshot.
protoBuilder.setPipelineID(getGroupId().getUuid().toString())
.setTraceID(proto.getTraceID());

builder.setStateMachineData(write.getData());
// fullProtoWithData: token cleared, pipelineID set, chunk data present — travels in AppendEntries.
fullProtoWithData = protoBuilder.build();
// Strip data for the WAL entry (logProto).
protoBuilder.setWriteChunk(WriteChunkRequestProto.newBuilder(write).clearData());
// stateMachineData carries the full proto so followers can parse it directly.
builder.setStateMachineData(fullProtoWithData.toByteString());
Copy link
Copy Markdown
Contributor

@szetszwo szetszwo May 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems an incompatible change since it changes the format of the stateMachineData. Then, an old datanode will not work with a new datanode.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean a case like rolling upgrade when different versions of datanodes would be present at the same time? Let me think about it.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we must support rolling upgrades for datanodes.

}
} else if (proto.getCmdType() == Type.FinalizeBlock) {
containerController.addFinalizedBlock(proto.getContainerID(),
Expand All @@ -528,7 +538,17 @@ public TransactionContext startTransaction(RaftClientRequest request)
return transactionContext;
} else {
final ContainerCommandRequestProto containerCommandRequestProto = protoBuilder.build();
TransactionContext txnContext = builder.setStateMachineContext(new Context(proto, containerCommandRequestProto))
// For WriteChunk: fullProtoWithData carries chunk data so write() can dispatch WRITE_DATA.
// For PutBlock: carry the proto as stateMachineData so Ratis calls write() on all nodes,
// enabling the RocksDB write to happen pre-quorum on every node (no metadata gap on follower crash).
// For all other commands: no state machine data.
final ContainerCommandRequestProto requestProtoForContext =
fullProtoWithData != null ? fullProtoWithData : proto;
if (proto.getCmdType() == Type.PutBlock && !blockAlreadyFinalized) {
builder.setStateMachineData(containerCommandRequestProto.toByteString());
}
TransactionContext txnContext = builder
.setStateMachineContext(new Context(requestProtoForContext, containerCommandRequestProto))
.setLogData(containerCommandRequestProto.toByteString())
.build();
metrics.recordStartTransactionCompleteNs(Time.monotonicNowNanos() - startTime);
Expand Down Expand Up @@ -589,7 +609,9 @@ private CompletableFuture<Message> writeStateMachineData(
Preconditions.checkArgument(!write.getData().isEmpty());
try {
if (server.getDivision(getGroupId()).getInfo().isLeader()) {
stateMachineDataCache.put(entryIndex, write.getData());
// Cache the full proto bytes so read() can return them for slow followers
// without having to reconstruct the proto from raw bytes.
stateMachineDataCache.put(entryIndex, requestProto.toByteString());
Copy link
Copy Markdown

@yandrey321 yandrey321 May 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How would the cache be cleaned up if the write to disk fails?

How long would the data be stored in the cache, and how would it be evicted over time?

}
} catch (InterruptedException ioe) {
Thread.currentThread().interrupt();
Expand Down Expand Up @@ -685,6 +707,115 @@ private void handleCommandResult(ContainerCommandRequestProto requestProto, long
}
}

/**
* Writes PutBlock metadata to RocksDB inside {@link StateMachine.DataApi#write}, before Raft quorum.
* This ensures every node that ACKs the log entry has already persisted the block metadata,
* eliminating the gap where a follower crashes after the leader commits but before the follower
* runs {@link #applyTransaction(TransactionContext)}.
*
* <p>Ordering: waits for all preceding WriteChunk {@code raftFuture}s to complete before
* dispatching so that {@code finishWriteChunks()} inside {@link KeyValueHandler#handlePutBlock}
* always sees fully written chunk files.
*
* <p>{@link #applyTransaction(TransactionContext)} becomes a no-op for PutBlock because
* {@code persistPutBlock()} detects the already-written BCSID and returns early.
*/
private CompletableFuture<Message> writePutBlockData(
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be refactored into smaller functions to improve readability.

ContainerCommandRequestProto requestProto, long entryIndex, long term,
long startTime) {
final WriteFutures previous = writeChunkFutureMap.get(entryIndex);
if (previous != null) {
return previous.getRaftFuture();
}
try {
if (ratisServer.getServer().getDivision(getGroupId()).getInfo().isLeader()) {
stateMachineDataCache.put(entryIndex, requestProto.toByteString());
}
} catch (InterruptedException ioe) {
Thread.currentThread().interrupt();
return completeExceptionally(ioe);
} catch (IOException ioe) {
return completeExceptionally(ioe);
}

// Snapshot preceding WriteChunk futures so finishWriteChunks() only runs after chunk bytes are on disk.
final SortedMap<Long, WriteFutures> preceding = writeChunkFutureMap.headMap(entryIndex, false);
final CompletableFuture<Void> precedingChunksDone = preceding.isEmpty()
? CompletableFuture.completedFuture(null)
: CompletableFuture.allOf(preceding.values().stream()
.map(WriteFutures::getRaftFuture)
.toArray(CompletableFuture[]::new));

final DispatcherContext context = DispatcherContext
.newBuilder(DispatcherContext.Op.WRITE_STATE_MACHINE_DATA)
.setTerm(term)
.setLogIndex(entryIndex)
.setContainer2BCSIDMap(container2BCSIDMap)
.build();

final CompletableFuture<Message> raftFuture = new CompletableFuture<>();

final CompletableFuture<ContainerCommandResponseProto> future = containerTaskQueues.submit(
requestProto.getContainerID(),
() -> {
// Single try-finally ensures writeChunkFutureMap is always cleaned up and
// raftFuture is always completed regardless of which code path exits.
try {
try {
precedingChunksDone.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
StorageContainerException sce = new StorageContainerException(
"Interrupted waiting for preceding chunk writes at logIndex " + entryIndex,
ContainerProtos.Result.CONTAINER_INTERNAL_ERROR);
raftFuture.completeExceptionally(sce);
throw sce;
} catch (ExecutionException e) {
// A preceding write (possibly for a different container) failed; the pipeline
// is already closing. Fail this entry so Ratis does not hang waiting for the ACK.
StorageContainerException sce = new StorageContainerException(
"Preceding write failed at logIndex " + entryIndex + ": " + e.getMessage(),
ContainerProtos.Result.CONTAINER_INTERNAL_ERROR);
raftFuture.completeExceptionally(sce);
throw sce;
}
ContainerCommandResponseProto result = dispatchCommand(requestProto, context);
if (result.getResult() != ContainerProtos.Result.SUCCESS
&& result.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN
&& result.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO) {
StorageContainerException sce =
new StorageContainerException(result.getMessage(), result.getResult());
LOG.error("{}: writePutBlockData failed at logIndex {} containerID={}: {} {}",
getGroupId(), entryIndex, requestProto.getContainerID(),
result.getMessage(), result.getResult());
stateMachineHealthy.set(false);
raftFuture.completeExceptionally(sce);
} else {
raftFuture.complete(result::toByteString);
}
return result;
} catch (Exception e) {
if (!raftFuture.isDone()) {
LOG.error("{}: writePutBlockData failed at logIndex {} containerID={}",
getGroupId(), entryIndex, requestProto.getContainerID(), e);
stateMachineHealthy.set(false);
raftFuture.completeExceptionally(e);
}
throw e;
} finally {
writeChunkFutureMap.remove(entryIndex);
}
},
executor);

writeChunkFutureMap.put(entryIndex, new WriteFutures(future, raftFuture, startTime));
if (LOG.isDebugEnabled()) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are no expensive calls or string formatting in the arguments of this LOG.debug() call, so the isDebugEnabled() guard is unnecessary: LOG.debug() checks the log level internally and short-circuits the call.

LOG.debug("{}: writePutBlockData logIndex={} containerID={}",
getGroupId(), entryIndex, requestProto.getContainerID());
}
return raftFuture;
}

private void validateLongRunningWrite() throws StorageContainerException {
// get min valid write chunk operation's future context
Map.Entry<Long, WriteFutures> writeFutureContextEntry = null;
Expand Down Expand Up @@ -803,6 +934,9 @@ public CompletableFuture<Message> write(LogEntryProto entry, TransactionContext
case WriteChunk:
return writeStateMachineData(requestProto, entry.getIndex(),
entry.getTerm(), writeStateMachineStartTime);
case PutBlock:
return writePutBlockData(requestProto, entry.getIndex(),
entry.getTerm(), writeStateMachineStartTime);
Comment on lines +937 to +939
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The change here is to write PutBlock to RocksDb when writing the RaftLog entry (via DataApi.write(..)). If the RaftLog entry is truncated, what will happen to the RocksDb record?

default:
throw new IllegalStateException("Cmd Type:" + cmdType
+ " should not have state machine data");
Expand Down Expand Up @@ -881,7 +1015,13 @@ private ByteString readStateMachineData(
"read chunk len=%s does not match chunk expected len=%s for chunk:%s",
data.size(), chunkInfo.getLen(), chunkInfo);

return data;
// Return the full proto bytes (not raw chunk bytes) so that followers can
// parse stateMachineData directly via parseFrom() without reconstruction.
final ContainerCommandRequestProto fullProto = ContainerCommandRequestProto.newBuilder(requestProto)
.setWriteChunk(WriteChunkRequestProto.newBuilder(requestProto.getWriteChunk())
.setData(data))
.build();
return fullProto.toByteString();
}

/**
Expand Down Expand Up @@ -939,6 +1079,10 @@ public CompletableFuture<ByteString> read(LogEntryProto entry, TransactionContex
final ContainerCommandRequestProto requestProto = context != null ? context.getLogProto()
: getContainerCommandRequestProto(getGroupId(), entry.getStateMachineLogEntry().getLogData());

if (requestProto.getCmdType() == Type.PutBlock) {
// PutBlock proto is self-contained (no external chunk bytes); return it directly.
return CompletableFuture.completedFuture(requestProto.toByteString());
}
if (requestProto.getCmdType() != Type.WriteChunk) {
throw new IllegalStateException("Cmd type:" + requestProto.getCmdType()
+ " cannot have state machine data");
Expand Down
Loading