Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ static DataStreamReplyByteBuffer newDataStreamReplyByteBuffer(DataStreamRequestB
.setDataStreamPacket(request)
.setBuffer(buffer)
.setSuccess(reply.isSuccess())
.setCommitInfos(reply.getCommitInfos())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ void close() {
private final ChannelFuture channelFuture;

private final DataStreamManagement requests;
private final ReadStreamManagement reads;
private final ProxiesPool proxies;

private final NettyServerStreamRpcMetrics metrics;
Expand All @@ -162,6 +163,7 @@ public NettyServerStreamRpc(RaftServer server, Parameters parameters) {
this.name = server.getId() + "-" + JavaUtils.getClassSimpleName(getClass());
this.metrics = new NettyServerStreamRpcMetrics(this.name);
this.requests = new DataStreamManagement(server, metrics);
this.reads = new ReadStreamManagement(server);

final RaftProperties properties = server.getProperties();

Expand Down Expand Up @@ -235,6 +237,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {

final DataStreamRequestByteBuf request = (DataStreamRequestByteBuf)msg;
try(UncheckedAutoCloseable autoReset = requestRef.set(request)) {
if (reads.process(request, ctx)) {
return;
}
requests.read(request, ctx, proxies.get(request)::getDataStreamOutput);
}
}
Expand All @@ -248,6 +253,7 @@ public void channelInactive(ChannelHandlerContext ctx) {
public void exceptionCaught(ChannelHandlerContext ctx, Throwable throwable) {
Optional.ofNullable(requestRef.getAndSetNull())
.ifPresent(request -> requests.replyDataStreamException(throwable, request, ctx));
ctx.close();
}
};
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.netty.server;

import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
import org.apache.ratis.datastream.impl.DataStreamRequestByteBuf;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelFuture;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.ratis.util.JavaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.concurrent.CompletableFuture;

import static org.apache.ratis.client.impl.ClientProtoUtils.toRaftClientRequest;
import static org.apache.ratis.netty.server.DataStreamManagement.replyDataStreamException;

public class ReadStreamManagement {
public static final Logger LOG = LoggerFactory.getLogger(ReadStreamManagement.class);

static class ReadStream implements WritableByteChannel {
private final ClientId clientId;
private final long streamId;
private final ChannelHandlerContext ctx;
private final CompletableFuture<Void> closed = new CompletableFuture<>();
private long streamOffset;

ReadStream(DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
this.clientId = request.getClientId();
this.streamId = request.getStreamId();
this.ctx = ctx;
}

@Override
public boolean isOpen() {
return !closed.isDone();
}

@Override
public void close() {
closed.complete(null);
}

@Override
public synchronized int write(ByteBuffer buffer) throws IOException {
if (!isOpen()) {
throw new AlreadyClosedException("Channel closed at offset " + streamOffset);
}
buffer = buffer.asReadOnlyBuffer();
final int length = buffer.remaining();
final DataStreamReplyByteBuffer reply = newReply(buffer);
final ChannelFuture future = ctx.writeAndFlush(reply);
try {
future.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new InterruptedIOException(
"Interrupted while writing " + length + " bytes at offset " + streamOffset);
}
if (!future.isSuccess()) {
throw new IOException("Failed to write " + length + " bytes at offset " + streamOffset, future.cause());
}
streamOffset += length;
return length;
}

private synchronized DataStreamReplyByteBuffer newReply(ByteBuffer buffer) {
return DataStreamReplyByteBuffer.newBuilder()
.setClientId(clientId)
.setType(Type.STREAM_DATA)
.setStreamId(streamId)
.setStreamOffset(streamOffset)
.setBuffer(buffer)
.setSuccess(true)
.setBytesWritten(buffer.remaining())
.build();
}
}

private final RaftServer server;
private final String name;

ReadStreamManagement(RaftServer server) {
this.server = server;
this.name = server.getId() + "-" + JavaUtils.getClassSimpleName(getClass());
}

boolean process(DataStreamRequestByteBuf requestBuf, ChannelHandlerContext ctx) {
boolean processed = false;
try {
processed = processImpl(requestBuf, ctx);
} catch (Throwable e) {
LOG.error("Failed to process {}", requestBuf, e);
processed = true;
} finally {
if (processed) {
requestBuf.release();
}
}
return processed;
}

private boolean processImpl(DataStreamRequestByteBuf requestBuf, ChannelHandlerContext ctx)
throws InvalidProtocolBufferException {
if (requestBuf.getType() != Type.STREAM_HEADER) {
return false;
}
final RaftClientRequest request = toRaftClientRequest(
RaftClientRequestProto.parseFrom(requestBuf.slice().nioBuffer()));
if (!request.is(TypeCase.READ)) {
return false;
}

final RaftServer.Division division;
try {
division = server.getDivision(request.getRaftGroupId());
} catch (IOException e) {
replyDataStreamException(server, e, request, requestBuf, ctx);
return true;
}

final ReadStream stream = new ReadStream(requestBuf, ctx);
division.getStateMachine().data().query(request.getMessage(), stream);
return true;
}

@Override
public String toString() {
return name;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,17 @@ default CompletableFuture<DataStream> stream(RaftClientRequest request) {
return CompletableFuture.completedFuture(null);
}

/**
* Similar to {@link #query(Message)} except that
* {@link #query(Message)} returns the result in a future
* while this method sends the result using the given stream.
*
* @param request the client request
* @param stream the output stream to send the results
*/
default void query(Message request, WritableByteChannel stream) {
}

/**
* Link asynchronously the given stream with the given log entry.
* The given stream can be null if it is unavailable due to errors.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,11 @@
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -147,8 +149,14 @@ static RoutingTable getRoutingTableChainTopology(Iterable<RaftPeerId> peers, Raf
}

class MultiDataStreamStateMachine extends BaseStateMachine {
static final int READ_ONLY_STREAM_CHUNKS = 3;

private final ConcurrentMap<ClientInvocationId, SingleDataStream> streams = new ConcurrentHashMap<>();

static ByteString getReadOnlyStreamChunk(ByteString query, int index) {
return query.concat(ByteString.copyFromUtf8("-chunk-" + index));
}

@Override
public CompletableFuture<DataStream> stream(RaftClientRequest request) {
final SingleDataStream s = new SingleDataStream(request);
Expand Down Expand Up @@ -176,6 +184,34 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
return CompletableFuture.completedFuture(() -> bytesWritten);
}

@Override
public CompletableFuture<Message> query(Message request) {
return CompletableFuture.completedFuture(request);
}

@Override
public void query(Message request, WritableByteChannel stream) {
CompletableFuture.supplyAsync(() -> {
try {
streamReadOnlyImpl(request, stream);
} catch (IOException e) {
throw new CompletionException("Failed to streamReadOnly for " + request, e);
}
return null;
});
}

private void streamReadOnlyImpl(Message request, WritableByteChannel stream) throws IOException {
try {
for (int i = 0; i < READ_ONLY_STREAM_CHUNKS; i++) {
final ByteString chunk = getReadOnlyStreamChunk(request.getContent(), i);
stream.write(chunk.asReadOnlyByteBuffer());
}
} finally {
stream.close();
}
}

SingleDataStream getSingleDataStream(RaftClientRequest request) {
return getSingleDataStream(ClientInvocationId.valueOf(request));
}
Expand Down
Loading
Loading