Skip to content

RATIS-1240. Add input stream to DataStreamApi for read operations in Server#1469

Open
peterxcli wants to merge 5 commits into
apache:masterfrom
peterxcli:RATIS-1240-Add-input-stream-to-DataStreamApi-for-read-operations
Open

RATIS-1240. Add input stream to DataStreamApi for read operations in Server#1469
peterxcli wants to merge 5 commits into
apache:masterfrom
peterxcli:RATIS-1240-Add-input-stream-to-DataStreamApi-for-read-operations

Conversation

@peterxcli
Copy link
Copy Markdown
Member

@peterxcli peterxcli commented May 25, 2026

What changes were proposed in this pull request?

New Server and StateMachine API:

  • Added streamReadOnlyAsync(RaftClientRequest, StateMachine.DataChannel) to the RaftServer interface, allowing clients to submit read-only requests whose responses are streamed via the data stream RPC.
  • Added streamReadOnly(RaftClientRequest, DataChannel) to the StateMachine.Data interface, enabling state machines to implement custom streaming logic for read-only queries.

Server Implementation:

  • Implemented server-side handling for read-only streaming requests in DataStreamManagement, including request parsing, channel management, and response streaming.
  • Added executeStreamReadOnlyAsync to RaftServerImpl to coordinate read-only stream execution and integrate with the read index logic.

What is the link to the Apache JIRA

https://issues.apache.org/jira/browse/RATIS-1086

How was this patch tested?

(Please explain how this patch was tested. Ex: unit tests, manual tests)
(If this patch involves UI changes, please attach a screen-shot; otherwise, remove this)

Signed-off-by: peterxcli <peterxcli@gmail.com>
Copy link
Copy Markdown
Contributor

@szetszwo szetszwo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@peterxcli , thanks a lot working on stream read! It is going to be very useful.

This change is very big and quite involved. In particular, it defines several new public APIs. We need to careful design the APIs.

Could you split this into two or three subtasks? One usual way is to change the server first and then the client.

private CompletableFuture<Void> submitReadOnlyRequest(DataStreamRequestByteBuf request,
RaftClientRequest raftClientRequest, ChannelHandlerContext ctx) {
try {
final StateMachine.ReadOnlyDataStream readOnlyDataStream = new StateMachine.ReadOnlyDataStream() {
Copy link
Copy Markdown
Contributor

@szetszwo szetszwo May 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should reuse StateMachine.DataChannel which is a Java WritableByteChannel. Then, we can use other Java API such as FileChannel.transferTo (which is a highly efficient, zero-copy data transfer operation).

      final StateMachine.DataChannel readOnlyDataStream = new StateMachine.DataChannel() {
        private long streamOffset;
        private boolean closed = false;

        @Override
        public synchronized boolean isOpen() {
          return !closed;
        }

        @Override
        public synchronized void close() {
          closed = true;
        }

        @Override
        public synchronized void force(boolean metadata) throws AlreadyClosedException{
          if (!isOpen()) {
            throw new AlreadyClosedException("Channel closed at offset " + streamOffset);
          }
          ctx.flush();
        }

        @Override
        public synchronized int write(ByteBuffer buffer) throws IOException {
          if (!isOpen()) {
            throw new AlreadyClosedException("Channel closed at offset " + streamOffset);
          }

          final DataStreamReplyByteBuffer reply = newDataStreamReadOnlyReplyByteBuffer(request, streamOffset, buffer);
          final int length = buffer.remaining();
          final ChannelFuture future = ctx.write(reply);
          try {
            future.await();
          } catch (InterruptedException e) {
            throw new InterruptedIOException("Interrupted while writing " + length
                + " bytes at offset " + streamOffset);
          }
          streamOffset += length;
          return length;
        }
      };

@peterxcli
Copy link
Copy Markdown
Member Author

Could you split this into two or three subtasks? One usual way is to change the server first and then the client.

Thanks for the review! Sure, I will split this into server and client patches!

peterxcli and others added 2 commits May 26, 2026 14:14
Co-authored-by: Tsz-Wo Nicholas Sze <szetszwo@apache.org>
Signed-off-by: peterxcli <peterxcli@gmail.com>
@peterxcli peterxcli changed the title RATIS-1240. Add input stream to DataStreamApi for read operations RATIS-1240. Add input stream to DataStreamApi for read operations in Server May 26, 2026
peterxcli added 2 commits May 26, 2026 16:36
Signed-off-by: peterxcli <peterxcli@gmail.com>
Signed-off-by: peterxcli <peterxcli@gmail.com>
@peterxcli peterxcli marked this pull request as ready for review May 26, 2026 09:53
@peterxcli peterxcli requested a review from szetszwo May 26, 2026 09:53
@peterxcli
Copy link
Copy Markdown
Member Author

@szetszwo thanks for the review, I address the data channel reuse suggestion and only keep server side code in this PR. Please take another look, thanks!

@peterxcli
Copy link
Copy Markdown
Member Author

peterxcli commented May 26, 2026

@szetszwo I integrated the Ratis stream read API into Ozone as a POC, and the results are promising.

The no-MD5 sequential reads comparison:

Key size Buffer ReadBlock stream Ratis data stream Ratis / ReadBlock bandwidth
256 MiB 32 MiB 922 MB/s 1918 MB/s 2.08x bandwidth
256 MiB 8 MiB 1292 MB/s 2146 MB/s 1.66x bandwidth
256 MiB 1 MiB 955 MB/s 2279 MB/s 2.39x bandwidth
256 MiB 4 KiB 730 MB/s 2574 MB/s 3.52x bandwidth
500 MiB 32 MiB 989 MB/s 2632 MB/s 2.66x bandwidth
500 MiB 8 MiB 1896 MB/s 2774 MB/s 1.46x bandwidth
500 MiB 1 MiB 1299 MB/s 2969 MB/s 2.29x bandwidth
500 MiB 4 KiB 1073 MB/s 3232 MB/s 3.01x bandwidth
1 GiB 32 MiB 1091 MB/s 2466 MB/s 2.26x bandwidth
1 GiB 8 MiB 1473 MB/s 3182 MB/s 2.16x bandwidth
1 GiB 1 MiB 1387 MB/s 3331 MB/s 2.40x bandwidth
1 GiB 4 KiB 1127 MB/s 3362 MB/s 2.98x bandwidth

The no-MD5 random reads comparison:

Key size Random read size Direct stream Ratis data stream Bandwidth ratio IOPS ratio Direct elapsed Ratis elapsed
256 MiB 1 MiB 41.94 MB/s 110.81 MB/s 2.64x bandwidth 2.64x IOPS 0.763 s 0.289 s
256 MiB 4 KiB 0.16 MB/s 0.73 MB/s 4.56x bandwidth 4.65x IOPS 0.799 s 0.172 s
500 MiB 1 MiB 93.90 MB/s 297.39 MB/s 3.17x bandwidth 3.16x IOPS 0.341 s 0.108 s
500 MiB 4 KiB 0.41 MB/s 1.33 MB/s 3.24x bandwidth 3.22x IOPS 0.303 s 0.094 s
1 GiB 1 MiB 69.44 MB/s 284.96 MB/s 4.10x bandwidth 4.12x IOPS 0.461 s 0.112 s
1 GiB 4 KiB 0.12 MB/s 0.94 MB/s 7.83x bandwidth 7.77x IOPS 1.034 s 0.133 s

ozone poc: peterxcli/ozone#16
ratis stream read client poc: peterxcli#1

Copy link
Copy Markdown
Contributor

@szetszwo szetszwo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@peterxcli , thanks for working on this! Please see the comments inlined and also linearizable.

Comment on lines +610 to +613
final Throwable cause = future.cause();
if (cause instanceof IOException) {
throw (IOException) cause;
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's create a new IOException in both cases since it can add the offset to the message.

Comment on lines +163 to +166
default CompletableFuture<RaftClientReply> streamReadOnlyAsync(
RaftClientRequest request, StateMachine.DataChannel stream) throws IOException {
throw new UnsupportedOperationException("This method is NOT supported.");
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This new method seems not needed since we may:

  • Phase 1: Directly call DataApi.streamReadOnly(..) and ignore all linearizable checks.
  • Phase 2: Reuse RaftClientAsynchronousProtocol.submitClientRequestAsync(..) to submit a dummy read request for linearizable checks and then call DataApi.streamReadOnly(..).

Of course, we should start with Phase 1 for simpilcity.

* @param stream the output stream for response data chunks
* @return a future for the terminal reply message
*/
default CompletableFuture<Message> streamReadOnly(RaftClientRequest request, DataChannel stream) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method should be similar to query(Message):

    /**
     * Similar to {@link #query(Message)} except that
     * {@link #query(Message)} returns the result in a future
     * while this method sends the result using the given stream.
     *
     * @param request the client request
     * @param stream the output stream to send the results
     */
    default void query(Message request, DataChannel stream) {
    }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants