From 8807f45ca1625008b74925512b9ba692ab3ea07b Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Fri, 8 May 2026 17:12:11 +0800 Subject: [PATCH 01/12] RATIS-2524. Add ReadIndex batching --- .../src/site/markdown/configurations.md | 25 ++++ .../ratis/server/RaftServerConfigKeys.java | 32 ++++ .../ratis/server/impl/RaftServerImpl.java | 16 ++ .../ratis/server/impl/ReadIndexBatching.java | 140 ++++++++++++++++++ .../apache/ratis/LinearizableReadTests.java | 79 +++++++++- 5 files changed, 291 insertions(+), 1 deletion(-) create mode 100644 ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexBatching.java diff --git a/ratis-docs/src/site/markdown/configurations.md b/ratis-docs/src/site/markdown/configurations.md index f5189ed862..1904ee04ce 100644 --- a/ratis-docs/src/site/markdown/configurations.md +++ b/ratis-docs/src/site/markdown/configurations.md @@ -253,6 +253,31 @@ but there are tradeoffs (e.g. Write and Read performance) between different type | **Type** | TimeDuration | | **Default** | 10ms | +| **Property** | `raft.server.read.read-index.batch.enabled` | +|:----------------|:----------------------------------------------------------------------------------| +| **Description** | whether to batch follower-to-leader ReadIndex RPCs for plain linearizable reads | +| **Type** | boolean | +| **Default** | false | + +| **Property** | `raft.server.read.read-index.batch.interval` | +|:----------------|:----------------------------------------------------| +| **Description** | maximum time to collect reads into one ReadIndex batch | +| **Type** | TimeDuration | +| **Default** | 500us | + +| **Property** | `raft.server.read.read-index.batch.size` | +|:----------------|:----------------------------------------------------| +| **Description** | maximum number of reads in one ReadIndex batch | +| **Type** | int | +| **Default** | 64 | + +When ReadIndex batching is enabled, a follower batches plain linearizable read +requests and sends a single ReadIndex request for each sealed batch. A batch is +sealed when either `batch.interval` expires or `batch.size` is reached. New +reads after sealing are assigned to a later batch. Read-after-write requests +bypass batching so that the leader can evaluate each request's client-specific +write index. + | **Property** | `raft.server.read.leader.heartbeat-check.enabled` | |:----------------|:--------------------------------------------------| | **Description** | whether to check heartbeat for read index. | diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java index 2d55594782..bb10f08237 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java @@ -309,6 +309,38 @@ static TimeDuration repliedIndexBatchInterval(RaftProperties properties) { static void setRepliedIndexBatchInterval(RaftProperties properties, TimeDuration interval) { setTimeDuration(properties::setTimeDuration, REPLIED_INDEX_BATCH_INTERVAL_KEY, interval); } + + interface Batch { + String PREFIX = ReadIndex.PREFIX + ".batch"; + + String ENABLED_KEY = PREFIX + ".enabled"; + boolean ENABLED_DEFAULT = false; + static boolean enabled(RaftProperties properties) { + return getBoolean(properties::getBoolean, ENABLED_KEY, ENABLED_DEFAULT, getDefaultLog()); + } + static void setEnabled(RaftProperties properties, boolean enabled) { + setBoolean(properties::setBoolean, ENABLED_KEY, enabled); + } + + String BATCH_INTERVAL_KEY = PREFIX + ".interval"; + TimeDuration BATCH_INTERVAL_DEFAULT = TimeDuration.valueOf(500, TimeUnit.MICROSECONDS); + static TimeDuration batchInterval(RaftProperties properties) { + return getTimeDuration(properties.getTimeDuration(BATCH_INTERVAL_DEFAULT.getUnit()), + BATCH_INTERVAL_KEY, BATCH_INTERVAL_DEFAULT, getDefaultLog()); + } + static void setBatchInterval(RaftProperties properties, TimeDuration interval) { + setTimeDuration(properties::setTimeDuration, BATCH_INTERVAL_KEY, interval); + } + + String BATCH_SIZE_KEY = PREFIX + ".size"; + int BATCH_SIZE_DEFAULT = 64; + static int batchSize(RaftProperties properties) { + return getInt(properties::getInt, BATCH_SIZE_KEY, BATCH_SIZE_DEFAULT, getDefaultLog(), requireMin(1)); + } + static void setBatchSize(RaftProperties properties, int batchSize) { + setInt(properties::setInt, BATCH_SIZE_KEY, batchSize, requireMin(1)); + } + } } } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 1c9cd3f658..984f8218f8 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -163,6 +163,7 @@ class RaftServerImpl implements RaftServer.Division, static final String APPEND_ENTRIES = CLASS_NAME + ".appendEntries"; static final String INSTALL_SNAPSHOT = CLASS_NAME + ".installSnapshot"; static final String APPEND_TRANSACTION = CLASS_NAME + ".appendTransaction"; + static final String READ_INDEX = CLASS_NAME + ".readIndexAsync"; static final String LOG_SYNC = APPEND_ENTRIES + ".logComplete"; static final String START_LEADER_ELECTION = CLASS_NAME + ".startLeaderElection"; static final String START_COMPLETE = CLASS_NAME + ".startComplete"; @@ -241,6 +242,7 @@ public long[] getFollowerMatchIndices() { private final CommitInfoCache commitInfoCache = new CommitInfoCache(); private final WriteIndexCache writeIndexCache; private final NavigableIndices appendLogTermIndices; + private final ReadIndexBatching readIndexBatching; private final RaftServerJmxAdapter jmxAdapter = new RaftServerJmxAdapter(this); private final LeaderElectionMetrics leaderElectionMetrics; @@ -279,6 +281,11 @@ public long[] getFollowerMatchIndices() { this.dataStreamMap = new DataStreamMapImpl(id); this.readOption = RaftServerConfigKeys.Read.option(properties); this.writeIndexCache = new WriteIndexCache(properties); + this.readIndexBatching = RaftServerConfigKeys.Read.ReadIndex.Batch.enabled(properties) ? + new ReadIndexBatching( + RaftServerConfigKeys.Read.ReadIndex.Batch.batchInterval(properties), + RaftServerConfigKeys.Read.ReadIndex.Batch.batchSize(properties), + this::sendReadIndexAsyncUnbatched) : null; this.transactionManager = new TransactionManager(id); TraceUtils.setTracerWhenEnabled(properties); @@ -1091,6 +1098,14 @@ ReadRequests getReadRequests() { } private CompletableFuture sendReadIndexAsync(RaftClientRequest clientRequest) { + if (readIndexBatching != null + && !clientRequest.getType().getRead().getReadAfterWriteConsistent()) { + return readIndexBatching.submit(clientRequest); + } + return sendReadIndexAsyncUnbatched(clientRequest); + } + + private CompletableFuture sendReadIndexAsyncUnbatched(RaftClientRequest clientRequest) { final RaftPeerId leaderId = getInfo().getLeaderId(); if (leaderId == null) { return JavaUtils.completeExceptionally(new ReadIndexException(getMemberId() + ": Leader is unknown.")); @@ -1569,6 +1584,7 @@ public CompletableFuture readIndexAsync(ReadIndexRequestPro assertLifeCycleState(LifeCycle.States.RUNNING); final RaftPeerId peerId = RaftPeerId.valueOf(request.getServerRequest().getRequestorId()); + CodeInjectionForTesting.execute(READ_INDEX, getId(), peerId, request); final LeaderStateImpl leader = role.getLeaderState().orElse(null); if (leader == null) { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexBatching.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexBatching.java new file mode 100644 index 0000000000..53ce60ae2b --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexBatching.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.impl; + +import org.apache.ratis.proto.RaftProtos.ReadIndexReplyProto; +import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.util.JavaUtils; +import org.apache.ratis.util.TimeDuration; +import org.apache.ratis.util.TimeoutExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; + +/** Batch follower-to-leader ReadIndex requests. */ +class ReadIndexBatching { + private static final Logger LOG = LoggerFactory.getLogger(ReadIndexBatching.class); + + private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance(); + private final TimeDuration batchInterval; + private final int batchSize; + private final Function> sender; + + private Batch open; + + ReadIndexBatching(TimeDuration batchInterval, int batchSize, + Function> sender) { + this.batchInterval = batchInterval; + this.batchSize = batchSize; + this.sender = sender; + } + + CompletableFuture submit(RaftClientRequest request) { + final CompletableFuture future = new CompletableFuture<>(); + final Batch batch; + final boolean schedule; + final boolean seal; + synchronized (this) { + schedule = open == null; + if (schedule) { + open = new Batch(); + } + batch = open; + batch.add(request, future); + seal = batch.size() >= batchSize; + if (seal) { + open = null; + } + } + + if (schedule) { + scheduler.onTimeout(batchInterval, () -> seal(batch), LOG, + () -> "Failed to seal ReadIndex batch"); + } + if (seal) { + batch.seal(sender); + } + return future; + } + + private void seal(Batch batch) { + synchronized (this) { + if (open == batch) { + open = null; + } + } + batch.seal(sender); + } + + private static class Pending { + private final RaftClientRequest request; + private final CompletableFuture future; + + Pending(RaftClientRequest request, CompletableFuture future) { + this.request = request; + this.future = future; + } + } + + private static class Batch { + private final AtomicBoolean sealed = new AtomicBoolean(); + private final List pending = new ArrayList<>(); + + void add(RaftClientRequest request, CompletableFuture future) { + pending.add(new Pending(request, future)); + } + + int size() { + return pending.size(); + } + + void seal(Function> sender) { + if (!sealed.compareAndSet(false, true)) { + return; + } + if (pending.isEmpty()) { + return; + } + + final CompletableFuture replyFuture; + try { + replyFuture = sender.apply(pending.get(0).request); + } catch (Throwable t) { + completeExceptionally(t); + return; + } + + replyFuture.whenComplete((reply, throwable) -> { + if (throwable != null) { + completeExceptionally(JavaUtils.unwrapCompletionException(throwable)); + } else { + pending.forEach(p -> p.future.complete(reply)); + } + }); + } + + private void completeExceptionally(Throwable throwable) { + pending.forEach(p -> p.future.completeExceptionally(throwable)); + } + } +} diff --git a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java index 09781b546e..3ebdfbfae3 100644 --- a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java @@ -30,6 +30,7 @@ import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type; import org.apache.ratis.server.impl.MiniRaftCluster; +import org.apache.ratis.util.CodeInjectionForTesting; import org.apache.ratis.util.Slf4jUtils; import org.apache.ratis.util.TimeDuration; import org.apache.ratis.util.function.CheckedConsumer; @@ -42,6 +43,7 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import static org.apache.ratis.ReadOnlyRequestTests.CounterStateMachine; import static org.apache.ratis.ReadOnlyRequestTests.INCREMENT; @@ -88,6 +90,7 @@ public void setup() { RaftServerConfigKeys.Read.setOption(p, LINEARIZABLE); RaftServerConfigKeys.Read.setLeaderLeaseEnabled(p, isLeaderLeaseEnabled()); RaftServerConfigKeys.Read.ReadIndex.setType(p, readIndexType()); + RaftServerConfigKeys.Read.ReadIndex.Batch.setEnabled(p, false); // Enable dummy request so linearizable-read tests exercise the default ordered-async bootstrap path. RaftClientConfigKeys.Async.Experimental.setSendDummyRequest(p, true); @@ -209,6 +212,80 @@ static void runTestFollowerReadOnlyParallel(C cluste } } + @Test + public void testFollowerReadIndexBatching() throws Exception { + RaftServerConfigKeys.Read.ReadIndex.Batch.setEnabled(getProperties(), true); + RaftServerConfigKeys.Read.ReadIndex.Batch.setBatchInterval( + getProperties(), TimeDuration.valueOf(1, TimeUnit.SECONDS)); + RaftServerConfigKeys.Read.ReadIndex.Batch.setBatchSize(getProperties(), 64); + + runWithReadIndexCounting(cluster -> { + final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId(); + final RaftPeerId followerId = cluster.getFollowers().get(0).getId(); + + try (RaftClient leaderClient = cluster.createClient(leaderId); + RaftClient followerClient = cluster.createClient(followerId)) { + assertReplyExact(1, leaderClient.io().send(INCREMENT)); + + final int n = 8; + final List replies = new ArrayList<>(n); + for (int i = 0; i < n; i++) { + replies.add(new Reply(1, followerClient.async().sendReadOnly(QUERY, followerId))); + } + + for (Reply reply : replies) { + reply.assertAtLeast(); + } + } + }, count -> { + Assertions.assertTrue(count.get() > 0, () -> "ReadIndex count = " + count); + Assertions.assertTrue(count.get() < 8, () -> "ReadIndex count = " + count); + }); + } + + @Test + public void testFollowerReadAfterWriteBypassesReadIndexBatching() throws Exception { + RaftServerConfigKeys.Read.ReadIndex.Batch.setEnabled(getProperties(), true); + RaftServerConfigKeys.Read.ReadIndex.Batch.setBatchInterval( + getProperties(), TimeDuration.valueOf(1, TimeUnit.SECONDS)); + RaftServerConfigKeys.Read.ReadIndex.Batch.setBatchSize(getProperties(), 64); + + final int n = 5; + runWithReadIndexCounting(cluster -> { + final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId(); + final RaftPeerId followerId = cluster.getFollowers().get(0).getId(); + + try (RaftClient leaderClient = cluster.createClient(leaderId); + RaftClient followerClient = cluster.createClient(followerId)) { + assertReplyExact(1, leaderClient.io().send(INCREMENT)); + + final List replies = new ArrayList<>(n); + for (int i = 0; i < n; i++) { + replies.add(new Reply(1, followerClient.async().sendReadAfterWrite(QUERY))); + } + + for (Reply reply : replies) { + reply.assertAtLeast(); + } + } + }, count -> Assertions.assertEquals(n, count.get())); + } + + private void runWithReadIndexCounting(CheckedConsumer testCase, + CheckedConsumer assertion) throws Exception { + final AtomicInteger count = new AtomicInteger(); + CodeInjectionForTesting.put("RaftServerImpl.readIndexAsync", (localId, remoteId, args) -> { + count.incrementAndGet(); + return true; + }); + try { + runWithNewCluster(testCase); + assertion.accept(count); + } finally { + CodeInjectionForTesting.remove("RaftServerImpl.readIndexAsync"); + } + } + @Test public void testLinearizableReadFailWhenLeaderDown() throws Exception { runWithNewCluster(LinearizableReadTests::runTestLinearizableReadFailWhenLeaderDown); @@ -285,4 +362,4 @@ static void runTestReadAfterWrite(C cluster) throws assertReplyAtLeast(2, asyncReply.join()); } } -} \ No newline at end of file +} From 8aa91bef7e5e2b40f0cc909084ea02cf5680447f Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Fri, 8 May 2026 17:17:07 +0800 Subject: [PATCH 02/12] RATIS-2524. Rename ReadIndex fallback helper --- .../java/org/apache/ratis/server/impl/RaftServerImpl.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 984f8218f8..a115220eb3 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -285,7 +285,7 @@ public long[] getFollowerMatchIndices() { new ReadIndexBatching( RaftServerConfigKeys.Read.ReadIndex.Batch.batchInterval(properties), RaftServerConfigKeys.Read.ReadIndex.Batch.batchSize(properties), - this::sendReadIndexAsyncUnbatched) : null; + this::sendReadIndexAsyncImpl) : null; this.transactionManager = new TransactionManager(id); TraceUtils.setTracerWhenEnabled(properties); @@ -1102,10 +1102,10 @@ private CompletableFuture sendReadIndexAsync(RaftClientRequ && !clientRequest.getType().getRead().getReadAfterWriteConsistent()) { return readIndexBatching.submit(clientRequest); } - return sendReadIndexAsyncUnbatched(clientRequest); + return sendReadIndexAsyncImpl(clientRequest); } - private CompletableFuture sendReadIndexAsyncUnbatched(RaftClientRequest clientRequest) { + private CompletableFuture sendReadIndexAsyncImpl(RaftClientRequest clientRequest) { final RaftPeerId leaderId = getInfo().getLeaderId(); if (leaderId == null) { return JavaUtils.completeExceptionally(new ReadIndexException(getMemberId() + ": Leader is unknown.")); From bdbb7829a2744cbe6e094a1eef9104f730d4a78f Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Fri, 8 May 2026 17:19:13 +0800 Subject: [PATCH 03/12] RATIS-2524. Rename ReadIndex batch callback --- .../ratis/server/impl/ReadIndexBatching.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexBatching.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexBatching.java index 53ce60ae2b..09f2d494bc 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexBatching.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexBatching.java @@ -38,15 +38,15 @@ class ReadIndexBatching { private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance(); private final TimeDuration batchInterval; private final int batchSize; - private final Function> sender; + private final Function> readIndexAsyncImpl; private Batch open; ReadIndexBatching(TimeDuration batchInterval, int batchSize, - Function> sender) { + Function> readIndexAsyncImpl) { this.batchInterval = batchInterval; this.batchSize = batchSize; - this.sender = sender; + this.readIndexAsyncImpl = readIndexAsyncImpl; } CompletableFuture submit(RaftClientRequest request) { @@ -72,7 +72,7 @@ CompletableFuture submit(RaftClientRequest request) { () -> "Failed to seal ReadIndex batch"); } if (seal) { - batch.seal(sender); + batch.seal(readIndexAsyncImpl); } return future; } @@ -83,7 +83,7 @@ private void seal(Batch batch) { open = null; } } - batch.seal(sender); + batch.seal(readIndexAsyncImpl); } private static class Pending { @@ -108,7 +108,7 @@ int size() { return pending.size(); } - void seal(Function> sender) { + void seal(Function> readIndexAsyncImpl) { if (!sealed.compareAndSet(false, true)) { return; } @@ -118,7 +118,7 @@ void seal(Function> se final CompletableFuture replyFuture; try { - replyFuture = sender.apply(pending.get(0).request); + replyFuture = readIndexAsyncImpl.apply(pending.get(0).request); } catch (Throwable t) { completeExceptionally(t); return; From 27533a778fe9978cb83ddc4aa9c2c963aac45f3c Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Fri, 8 May 2026 18:02:02 +0800 Subject: [PATCH 04/12] RATIS-2524. Clarify ReadIndex batch synchronization --- .../java/org/apache/ratis/server/impl/ReadIndexBatching.java | 1 + 1 file changed, 1 insertion(+) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexBatching.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexBatching.java index 09f2d494bc..dd6f98f4c7 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexBatching.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexBatching.java @@ -40,6 +40,7 @@ class ReadIndexBatching { private final int batchSize; private final Function> readIndexAsyncImpl; + /** Guarded by {@code this}; the monitor provides visibility, so volatile is not needed. */ private Batch open; ReadIndexBatching(TimeDuration batchInterval, int batchSize, From 9b68c609d76805ac2c6094553d8991167764663b Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Fri, 8 May 2026 18:09:40 +0800 Subject: [PATCH 05/12] RATIS-2524. Clarify ReadIndex batch anchor request --- .../java/org/apache/ratis/server/impl/ReadIndexBatching.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexBatching.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexBatching.java index dd6f98f4c7..70b2de0ab9 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexBatching.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexBatching.java @@ -119,6 +119,9 @@ void seal(Function> re final CompletableFuture replyFuture; try { + // Plain reads only need one ReadIndex RPC for the batch. Read-after-write requests + // bypass batching before reaching this class, since their client request carries + // per-client write-index state. replyFuture = readIndexAsyncImpl.apply(pending.get(0).request); } catch (Throwable t) { completeExceptionally(t); From fe750f1b7f59fbf36224e052086477d695a790eb Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Fri, 8 May 2026 18:15:15 +0800 Subject: [PATCH 06/12] RATIS-2524. Handle batched ReadIndex after promotion --- .../ratis/server/impl/RaftServerImpl.java | 6 +++++ .../apache/ratis/LinearizableReadTests.java | 25 +++++++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index a115220eb3..9d851a690b 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -1106,6 +1106,12 @@ private CompletableFuture sendReadIndexAsync(RaftClientRequ } private CompletableFuture sendReadIndexAsyncImpl(RaftClientRequest clientRequest) { + final LeaderStateImpl leader = role.getLeaderState().orElse(null); + if (leader != null) { + return getReadIndex(clientRequest, leader) + .thenApply(index -> toReadIndexReplyProto(getId(), getMemberId(), true, index)); + } + final RaftPeerId leaderId = getInfo().getLeaderId(); if (leaderId == null) { return JavaUtils.completeExceptionally(new ReadIndexException(getMemberId() + ": Leader is unknown.")); diff --git a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java index 3ebdfbfae3..4a5712c52d 100644 --- a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java @@ -271,6 +271,31 @@ public void testFollowerReadAfterWriteBypassesReadIndexBatching() throws Excepti }, count -> Assertions.assertEquals(n, count.get())); } + @Test + public void testFollowerReadIndexBatchingBecomesLeader() throws Exception { + RaftServerConfigKeys.Read.ReadIndex.Batch.setEnabled(getProperties(), true); + RaftServerConfigKeys.Read.ReadIndex.Batch.setBatchInterval( + getProperties(), TimeDuration.valueOf(1, TimeUnit.SECONDS)); + RaftServerConfigKeys.Read.ReadIndex.Batch.setBatchSize(getProperties(), 64); + + runWithReadIndexCounting(cluster -> { + final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId(); + final RaftPeerId followerId = cluster.getFollowers().get(0).getId(); + + try (RaftClient leaderClient = cluster.createClient(leaderId); + RaftClient followerClient = cluster.createClient(followerId)) { + assertReplyExact(1, leaderClient.io().send(INCREMENT)); + + final Reply reply = new Reply(1, followerClient.async().sendReadOnly(QUERY, followerId)); + final RaftClientReply transferReply = leaderClient.admin().transferLeadership(followerId, 20000); + Assertions.assertTrue(transferReply.isSuccess()); + Assertions.assertEquals(followerId, RaftTestUtil.waitForLeader(cluster).getId()); + + reply.assertAtLeast(); + } + }, count -> Assertions.assertEquals(0, count.get())); + } + private void runWithReadIndexCounting(CheckedConsumer testCase, CheckedConsumer assertion) throws Exception { final AtomicInteger count = new AtomicInteger(); From 32350ea999705bc086d292a8d2fd0ad44d2d9001 Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Fri, 8 May 2026 18:20:29 +0800 Subject: [PATCH 07/12] RATIS-2524. Close ReadIndex batches on shutdown --- .../ratis/server/impl/RaftServerImpl.java | 5 ++ .../ratis/server/impl/ReadIndexBatching.java | 36 +++++++- .../server/impl/TestReadIndexBatching.java | 88 +++++++++++++++++++ 3 files changed, 128 insertions(+), 1 deletion(-) create mode 100644 ratis-server/src/test/java/org/apache/ratis/server/impl/TestReadIndexBatching.java diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 9d851a690b..edfe2e58bc 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -529,6 +529,11 @@ void groupRemove(boolean deleteDirectory, boolean renameDirectory) { public void close() { lifeCycle.checkStateAndClose(() -> { LOG.info("{}: shutdown", getMemberId()); + try { + Optional.ofNullable(readIndexBatching).ifPresent(ReadIndexBatching::close); + } catch (Exception e) { + LOG.warn("{}: Failed to close ReadIndexBatching", getMemberId(), e); + } try { jmxAdapter.unregister(); } catch (Exception e) { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexBatching.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexBatching.java index 70b2de0ab9..7663fcad57 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexBatching.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexBatching.java @@ -19,6 +19,7 @@ import org.apache.ratis.proto.RaftProtos.ReadIndexReplyProto; import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.protocol.exceptions.ReadIndexException; import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.TimeDuration; import org.apache.ratis.util.TimeoutExecutor; @@ -35,16 +36,24 @@ class ReadIndexBatching { private static final Logger LOG = LoggerFactory.getLogger(ReadIndexBatching.class); - private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance(); + private final TimeoutExecutor scheduler; private final TimeDuration batchInterval; private final int batchSize; private final Function> readIndexAsyncImpl; /** Guarded by {@code this}; the monitor provides visibility, so volatile is not needed. */ private Batch open; + /** Guarded by {@code this}. */ + private boolean closed; ReadIndexBatching(TimeDuration batchInterval, int batchSize, Function> readIndexAsyncImpl) { + this(TimeoutExecutor.getInstance(), batchInterval, batchSize, readIndexAsyncImpl); + } + + ReadIndexBatching(TimeoutExecutor scheduler, TimeDuration batchInterval, int batchSize, + Function> readIndexAsyncImpl) { + this.scheduler = scheduler; this.batchInterval = batchInterval; this.batchSize = batchSize; this.readIndexAsyncImpl = readIndexAsyncImpl; @@ -56,6 +65,9 @@ CompletableFuture submit(RaftClientRequest request) { final boolean schedule; final boolean seal; synchronized (this) { + if (closed) { + return JavaUtils.completeExceptionally(newClosedException()); + } schedule = open == null; if (schedule) { open = new Batch(); @@ -78,6 +90,22 @@ CompletableFuture submit(RaftClientRequest request) { return future; } + void close() { + final Batch batch; + synchronized (this) { + closed = true; + batch = open; + open = null; + } + if (batch != null) { + batch.cancel(newClosedException()); + } + } + + private static ReadIndexException newClosedException() { + return new ReadIndexException("ReadIndex batching is closed."); + } + private void seal(Batch batch) { synchronized (this) { if (open == batch) { @@ -137,6 +165,12 @@ void seal(Function> re }); } + private void cancel(Throwable throwable) { + if (sealed.compareAndSet(false, true)) { + completeExceptionally(throwable); + } + } + private void completeExceptionally(Throwable throwable) { pending.forEach(p -> p.future.completeExceptionally(throwable)); } diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/TestReadIndexBatching.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/TestReadIndexBatching.java new file mode 100644 index 0000000000..9fd39c5b12 --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/TestReadIndexBatching.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.impl; + +import org.apache.ratis.proto.RaftProtos.ReadIndexReplyProto; +import org.apache.ratis.protocol.exceptions.ReadIndexException; +import org.apache.ratis.util.TimeDuration; +import org.apache.ratis.util.TimeoutExecutor; +import org.apache.ratis.util.function.CheckedRunnable; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import java.util.function.Supplier; + +class TestReadIndexBatching { + @Test + void testCloseCompletesOpenBatchExceptionally() throws Exception { + final AtomicInteger readIndexCount = new AtomicInteger(); + final ReadIndexBatching batching = new ReadIndexBatching( + new NoOpTimeoutExecutor(), TimeDuration.valueOf(1, TimeUnit.DAYS), 64, request -> { + readIndexCount.incrementAndGet(); + return new CompletableFuture(); + }); + + final CompletableFuture reply = batching.submit(null); + Assertions.assertFalse(reply.isDone()); + + batching.close(); + + final ExecutionException e = Assertions.assertThrows(ExecutionException.class, reply::get); + Assertions.assertTrue(e.getCause() instanceof ReadIndexException); + Assertions.assertEquals(0, readIndexCount.get()); + } + + @Test + void testSubmitAfterCloseCompletesExceptionally() { + final AtomicInteger readIndexCount = new AtomicInteger(); + final ReadIndexBatching batching = new ReadIndexBatching( + new NoOpTimeoutExecutor(), TimeDuration.valueOf(1, TimeUnit.DAYS), 64, request -> { + readIndexCount.incrementAndGet(); + return new CompletableFuture(); + }); + + batching.close(); + + final CompletableFuture reply = batching.submit(null); + final ExecutionException e = Assertions.assertThrows(ExecutionException.class, reply::get); + Assertions.assertTrue(e.getCause() instanceof ReadIndexException); + Assertions.assertEquals(0, readIndexCount.get()); + } + + private static class NoOpTimeoutExecutor implements TimeoutExecutor { + @Override + public int getTaskCount() { + return 0; + } + + @Override + public void onTimeout( + TimeDuration timeout, CheckedRunnable task, Consumer errorHandler) { + } + + @Override + public void onTimeout(TimeDuration timeout, CheckedRunnable task, Logger log, Supplier errorMessage) { + } + } +} From 081d38da511edc237fde0689e3d27a006ee17bcd Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Fri, 8 May 2026 18:36:56 +0800 Subject: [PATCH 08/12] RATIS-2524. Harden ReadIndex batching --- .../src/site/markdown/configurations.md | 2 +- .../ratis/server/RaftServerConfigKeys.java | 4 +- .../ratis/server/impl/ReadIndexBatching.java | 1 + .../server/impl/TestReadIndexBatching.java | 109 +++++++++++++++++- 4 files changed, 107 insertions(+), 9 deletions(-) diff --git a/ratis-docs/src/site/markdown/configurations.md b/ratis-docs/src/site/markdown/configurations.md index 1904ee04ce..7453d73825 100644 --- a/ratis-docs/src/site/markdown/configurations.md +++ b/ratis-docs/src/site/markdown/configurations.md @@ -261,7 +261,7 @@ but there are tradeoffs (e.g. Write and Read performance) between different type | **Property** | `raft.server.read.read-index.batch.interval` | |:----------------|:----------------------------------------------------| -| **Description** | maximum time to collect reads into one ReadIndex batch | +| **Description** | positive maximum time to collect reads into one ReadIndex batch | | **Type** | TimeDuration | | **Default** | 500us | diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java index bb10f08237..e100f3779d 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java @@ -326,10 +326,10 @@ static void setEnabled(RaftProperties properties, boolean enabled) { TimeDuration BATCH_INTERVAL_DEFAULT = TimeDuration.valueOf(500, TimeUnit.MICROSECONDS); static TimeDuration batchInterval(RaftProperties properties) { return getTimeDuration(properties.getTimeDuration(BATCH_INTERVAL_DEFAULT.getUnit()), - BATCH_INTERVAL_KEY, BATCH_INTERVAL_DEFAULT, getDefaultLog()); + BATCH_INTERVAL_KEY, BATCH_INTERVAL_DEFAULT, getDefaultLog(), requirePositive()); } static void setBatchInterval(RaftProperties properties, TimeDuration interval) { - setTimeDuration(properties::setTimeDuration, BATCH_INTERVAL_KEY, interval); + setTimeDuration(properties::setTimeDuration, BATCH_INTERVAL_KEY, interval, requirePositive()); } String BATCH_SIZE_KEY = PREFIX + ".size"; diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexBatching.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexBatching.java index 7663fcad57..4e47f011d6 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexBatching.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexBatching.java @@ -127,6 +127,7 @@ private static class Pending { private static class Batch { private final AtomicBoolean sealed = new AtomicBoolean(); + /** Appended only while this batch is {@code open}; sealing first removes it from {@code open}. */ private final List pending = new ArrayList<>(); void add(RaftClientRequest request, CompletableFuture future) { diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/TestReadIndexBatching.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/TestReadIndexBatching.java index 9fd39c5b12..fe4b095530 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/TestReadIndexBatching.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/TestReadIndexBatching.java @@ -17,23 +17,99 @@ */ package org.apache.ratis.server.impl; +import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.proto.RaftProtos.ReadIndexReplyProto; import org.apache.ratis.protocol.exceptions.ReadIndexException; +import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.util.TimeDuration; import org.apache.ratis.util.TimeoutExecutor; import org.apache.ratis.util.function.CheckedRunnable; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import org.slf4j.Logger; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; -import java.util.function.Supplier; class TestReadIndexBatching { + @Test + void testBatchIntervalMustBePositive() { + final RaftProperties properties = new RaftProperties(); + final TimeDuration zero = TimeDuration.valueOf(0, TimeUnit.MICROSECONDS); + + Assertions.assertThrows(IllegalArgumentException.class, + () -> RaftServerConfigKeys.Read.ReadIndex.Batch.setBatchInterval(properties, zero)); + + properties.setTimeDuration(RaftServerConfigKeys.Read.ReadIndex.Batch.BATCH_INTERVAL_KEY, zero); + Assertions.assertThrows(IllegalArgumentException.class, + () -> RaftServerConfigKeys.Read.ReadIndex.Batch.batchInterval(properties)); + } + + @Test + void testBatchSizeSealsImmediately() throws Exception { + final CapturingTimeoutExecutor scheduler = new CapturingTimeoutExecutor(); + final AtomicInteger readIndexCount = new AtomicInteger(); + final ReadIndexReplyProto readIndexReply = ReadIndexReplyProto.getDefaultInstance(); + final ReadIndexBatching batching = new ReadIndexBatching( + scheduler, TimeDuration.valueOf(1, TimeUnit.DAYS), 2, request -> { + readIndexCount.incrementAndGet(); + return CompletableFuture.completedFuture(readIndexReply); + }); + + final CompletableFuture first = batching.submit(null); + Assertions.assertFalse(first.isDone()); + Assertions.assertEquals(1, scheduler.getTaskCount()); + + final CompletableFuture second = batching.submit(null); + Assertions.assertEquals(1, readIndexCount.get()); + Assertions.assertSame(readIndexReply, first.get()); + Assertions.assertSame(readIndexReply, second.get()); + + scheduler.runNext(); + Assertions.assertEquals(1, readIndexCount.get()); + } + + @Test + void testBatchIntervalSealsOpenBatch() throws Exception { + final CapturingTimeoutExecutor scheduler = new CapturingTimeoutExecutor(); + final AtomicInteger readIndexCount = new AtomicInteger(); + final ReadIndexReplyProto readIndexReply = ReadIndexReplyProto.getDefaultInstance(); + final ReadIndexBatching batching = new ReadIndexBatching( + scheduler, TimeDuration.valueOf(1, TimeUnit.DAYS), 64, request -> { + readIndexCount.incrementAndGet(); + return CompletableFuture.completedFuture(readIndexReply); + }); + + final CompletableFuture reply = batching.submit(null); + Assertions.assertFalse(reply.isDone()); + Assertions.assertEquals(1, scheduler.getTaskCount()); + + scheduler.runNext(); + Assertions.assertEquals(1, readIndexCount.get()); + Assertions.assertSame(readIndexReply, reply.get()); + } + + @Test + void testReadIndexFailureCompletesAllBatchFuturesExceptionally() throws Exception { + final RuntimeException failure = new RuntimeException("read index failed"); + final CompletableFuture failed = new CompletableFuture<>(); + failed.completeExceptionally(failure); + final ReadIndexBatching batching = new ReadIndexBatching( + new NoOpTimeoutExecutor(), TimeDuration.valueOf(1, TimeUnit.DAYS), 2, request -> failed); + + final CompletableFuture first = batching.submit(null); + final CompletableFuture second = batching.submit(null); + + Assertions.assertSame(failure, + Assertions.assertThrows(ExecutionException.class, first::get).getCause()); + Assertions.assertSame(failure, + Assertions.assertThrows(ExecutionException.class, second::get).getCause()); + } + @Test void testCloseCompletesOpenBatchExceptionally() throws Exception { final AtomicInteger readIndexCount = new AtomicInteger(); @@ -70,19 +146,40 @@ void testSubmitAfterCloseCompletesExceptionally() { Assertions.assertEquals(0, readIndexCount.get()); } - private static class NoOpTimeoutExecutor implements TimeoutExecutor { + private static class NoOpTimeoutExecutor extends CapturingTimeoutExecutor { + @Override + public void onTimeout( + TimeDuration timeout, CheckedRunnable task, Consumer errorHandler) { + } + + @Override + void runNext() { + } + } + + private static class CapturingTimeoutExecutor implements TimeoutExecutor { + private final List> tasks = new ArrayList<>(); + @Override public int getTaskCount() { - return 0; + return tasks.size(); } @Override public void onTimeout( TimeDuration timeout, CheckedRunnable task, Consumer errorHandler) { + tasks.add(task); } - @Override - public void onTimeout(TimeDuration timeout, CheckedRunnable task, Logger log, Supplier errorMessage) { + void runNext() throws Exception { + final CheckedRunnable task = tasks.remove(0); + try { + task.run(); + } catch (RuntimeException | Error e) { + throw e; + } catch (Throwable t) { + throw new AssertionError(t); + } } } } From fa462cffbcd1215683c83886b2e0c320edd4b6d0 Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Wed, 13 May 2026 09:57:43 +0800 Subject: [PATCH 09/12] RATIS-2524. Preserve ReadIndex batch interval precision --- .../apache/ratis/server/impl/ReadIndexBatching.java | 3 ++- .../ratis/server/impl/TestReadIndexBatching.java | 12 ++++++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexBatching.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexBatching.java index 4e47f011d6..66e3edc96b 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexBatching.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexBatching.java @@ -23,6 +23,7 @@ import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.TimeDuration; import org.apache.ratis.util.TimeoutExecutor; +import org.apache.ratis.util.TimeoutScheduler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,7 +49,7 @@ class ReadIndexBatching { ReadIndexBatching(TimeDuration batchInterval, int batchSize, Function> readIndexAsyncImpl) { - this(TimeoutExecutor.getInstance(), batchInterval, batchSize, readIndexAsyncImpl); + this(TimeoutScheduler.getInstance(), batchInterval, batchSize, readIndexAsyncImpl); } ReadIndexBatching(TimeoutExecutor scheduler, TimeDuration batchInterval, int batchSize, diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/TestReadIndexBatching.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/TestReadIndexBatching.java index fe4b095530..d138387d74 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/TestReadIndexBatching.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/TestReadIndexBatching.java @@ -23,10 +23,12 @@ import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.util.TimeDuration; import org.apache.ratis.util.TimeoutExecutor; +import org.apache.ratis.util.TimeoutScheduler; import org.apache.ratis.util.function.CheckedRunnable; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import java.lang.reflect.Field; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -36,6 +38,16 @@ import java.util.function.Consumer; class TestReadIndexBatching { + @Test + void testDefaultSchedulerPreservesSubMillisecondBatchInterval() throws Exception { + final ReadIndexBatching batching = new ReadIndexBatching( + TimeDuration.valueOf(500, TimeUnit.MICROSECONDS), 64, request -> new CompletableFuture<>()); + + final Field scheduler = ReadIndexBatching.class.getDeclaredField("scheduler"); + scheduler.setAccessible(true); + Assertions.assertTrue(scheduler.get(batching) instanceof TimeoutScheduler); + } + @Test void testBatchIntervalMustBePositive() { final RaftProperties properties = new RaftProperties(); From 0cbbcb064429f4bdab197d6aabfdc0de7fc1cb09 Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Wed, 13 May 2026 17:44:04 +0800 Subject: [PATCH 10/12] RATIS-2524. Use subclass coverage for ReadIndex batching --- .../apache/ratis/LinearizableReadTests.java | 108 +----------------- ...estLinearizableReadIndexBatchWithGrpc.java | 26 +++++ 2 files changed, 32 insertions(+), 102 deletions(-) create mode 100644 ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadIndexBatchWithGrpc.java diff --git a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java index 4a5712c52d..4afa75fa8b 100644 --- a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java @@ -30,7 +30,6 @@ import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type; import org.apache.ratis.server.impl.MiniRaftCluster; -import org.apache.ratis.util.CodeInjectionForTesting; import org.apache.ratis.util.Slf4jUtils; import org.apache.ratis.util.TimeDuration; import org.apache.ratis.util.function.CheckedConsumer; @@ -43,7 +42,6 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import static org.apache.ratis.ReadOnlyRequestTests.CounterStateMachine; import static org.apache.ratis.ReadOnlyRequestTests.INCREMENT; @@ -70,10 +68,15 @@ public abstract class LinearizableReadTests public abstract Type readIndexType(); + public boolean readIndexBatchEnabled() { + return false; + } + public final void assertRaftProperties(RaftProperties p) { assertOption(LINEARIZABLE, p); assertEquals(isLeaderLeaseEnabled(), RaftServerConfigKeys.Read.leaderLeaseEnabled(p)); assertSame(readIndexType(), RaftServerConfigKeys.Read.ReadIndex.type(p)); + assertEquals(readIndexBatchEnabled(), RaftServerConfigKeys.Read.ReadIndex.Batch.enabled(p)); } protected void runWithNewCluster(CheckedConsumer testCase) throws Exception { @@ -90,7 +93,7 @@ public void setup() { RaftServerConfigKeys.Read.setOption(p, LINEARIZABLE); RaftServerConfigKeys.Read.setLeaderLeaseEnabled(p, isLeaderLeaseEnabled()); RaftServerConfigKeys.Read.ReadIndex.setType(p, readIndexType()); - RaftServerConfigKeys.Read.ReadIndex.Batch.setEnabled(p, false); + RaftServerConfigKeys.Read.ReadIndex.Batch.setEnabled(p, readIndexBatchEnabled()); // Enable dummy request so linearizable-read tests exercise the default ordered-async bootstrap path. RaftClientConfigKeys.Async.Experimental.setSendDummyRequest(p, true); @@ -212,105 +215,6 @@ static void runTestFollowerReadOnlyParallel(C cluste } } - @Test - public void testFollowerReadIndexBatching() throws Exception { - RaftServerConfigKeys.Read.ReadIndex.Batch.setEnabled(getProperties(), true); - RaftServerConfigKeys.Read.ReadIndex.Batch.setBatchInterval( - getProperties(), TimeDuration.valueOf(1, TimeUnit.SECONDS)); - RaftServerConfigKeys.Read.ReadIndex.Batch.setBatchSize(getProperties(), 64); - - runWithReadIndexCounting(cluster -> { - final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId(); - final RaftPeerId followerId = cluster.getFollowers().get(0).getId(); - - try (RaftClient leaderClient = cluster.createClient(leaderId); - RaftClient followerClient = cluster.createClient(followerId)) { - assertReplyExact(1, leaderClient.io().send(INCREMENT)); - - final int n = 8; - final List replies = new ArrayList<>(n); - for (int i = 0; i < n; i++) { - replies.add(new Reply(1, followerClient.async().sendReadOnly(QUERY, followerId))); - } - - for (Reply reply : replies) { - reply.assertAtLeast(); - } - } - }, count -> { - Assertions.assertTrue(count.get() > 0, () -> "ReadIndex count = " + count); - Assertions.assertTrue(count.get() < 8, () -> "ReadIndex count = " + count); - }); - } - - @Test - public void testFollowerReadAfterWriteBypassesReadIndexBatching() throws Exception { - RaftServerConfigKeys.Read.ReadIndex.Batch.setEnabled(getProperties(), true); - RaftServerConfigKeys.Read.ReadIndex.Batch.setBatchInterval( - getProperties(), TimeDuration.valueOf(1, TimeUnit.SECONDS)); - RaftServerConfigKeys.Read.ReadIndex.Batch.setBatchSize(getProperties(), 64); - - final int n = 5; - runWithReadIndexCounting(cluster -> { - final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId(); - final RaftPeerId followerId = cluster.getFollowers().get(0).getId(); - - try (RaftClient leaderClient = cluster.createClient(leaderId); - RaftClient followerClient = cluster.createClient(followerId)) { - assertReplyExact(1, leaderClient.io().send(INCREMENT)); - - final List replies = new ArrayList<>(n); - for (int i = 0; i < n; i++) { - replies.add(new Reply(1, followerClient.async().sendReadAfterWrite(QUERY))); - } - - for (Reply reply : replies) { - reply.assertAtLeast(); - } - } - }, count -> Assertions.assertEquals(n, count.get())); - } - - @Test - public void testFollowerReadIndexBatchingBecomesLeader() throws Exception { - RaftServerConfigKeys.Read.ReadIndex.Batch.setEnabled(getProperties(), true); - RaftServerConfigKeys.Read.ReadIndex.Batch.setBatchInterval( - getProperties(), TimeDuration.valueOf(1, TimeUnit.SECONDS)); - RaftServerConfigKeys.Read.ReadIndex.Batch.setBatchSize(getProperties(), 64); - - runWithReadIndexCounting(cluster -> { - final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId(); - final RaftPeerId followerId = cluster.getFollowers().get(0).getId(); - - try (RaftClient leaderClient = cluster.createClient(leaderId); - RaftClient followerClient = cluster.createClient(followerId)) { - assertReplyExact(1, leaderClient.io().send(INCREMENT)); - - final Reply reply = new Reply(1, followerClient.async().sendReadOnly(QUERY, followerId)); - final RaftClientReply transferReply = leaderClient.admin().transferLeadership(followerId, 20000); - Assertions.assertTrue(transferReply.isSuccess()); - Assertions.assertEquals(followerId, RaftTestUtil.waitForLeader(cluster).getId()); - - reply.assertAtLeast(); - } - }, count -> Assertions.assertEquals(0, count.get())); - } - - private void runWithReadIndexCounting(CheckedConsumer testCase, - CheckedConsumer assertion) throws Exception { - final AtomicInteger count = new AtomicInteger(); - CodeInjectionForTesting.put("RaftServerImpl.readIndexAsync", (localId, remoteId, args) -> { - count.incrementAndGet(); - return true; - }); - try { - runWithNewCluster(testCase); - assertion.accept(count); - } finally { - CodeInjectionForTesting.remove("RaftServerImpl.readIndexAsync"); - } - } - @Test public void testLinearizableReadFailWhenLeaderDown() throws Exception { runWithNewCluster(LinearizableReadTests::runTestLinearizableReadFailWhenLeaderDown); diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadIndexBatchWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadIndexBatchWithGrpc.java new file mode 100644 index 0000000000..a19ac2fb3b --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadIndexBatchWithGrpc.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc; + +public class TestLinearizableReadIndexBatchWithGrpc + extends TestLinearizableReadWithGrpc { + @Override + public boolean readIndexBatchEnabled() { + return true; + } +} From 194d16b2c2f0bb4c42af3cd084e8ab9fa1049cf2 Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Wed, 13 May 2026 17:48:04 +0800 Subject: [PATCH 11/12] RATIS-2524. Cover ReadIndex batching variants --- ...xBatchAppliedIndexLeaderLeaseWithGrpc.java | 26 +++++++++++++++++++ ...bleReadIndexBatchAppliedIndexWithGrpc.java | 26 +++++++++++++++++++ ...ableReadIndexBatchLeaderLeaseWithGrpc.java | 26 +++++++++++++++++++ 3 files changed, 78 insertions(+) create mode 100644 ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadIndexBatchAppliedIndexLeaderLeaseWithGrpc.java create mode 100644 ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadIndexBatchAppliedIndexWithGrpc.java create mode 100644 ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadIndexBatchLeaderLeaseWithGrpc.java diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadIndexBatchAppliedIndexLeaderLeaseWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadIndexBatchAppliedIndexLeaderLeaseWithGrpc.java new file mode 100644 index 0000000000..663dec30bb --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadIndexBatchAppliedIndexLeaderLeaseWithGrpc.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc; + +public class TestLinearizableReadIndexBatchAppliedIndexLeaderLeaseWithGrpc + extends TestLinearizableReadAppliedIndexLeaderLeaseReadWithGrpc { + @Override + public boolean readIndexBatchEnabled() { + return true; + } +} diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadIndexBatchAppliedIndexWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadIndexBatchAppliedIndexWithGrpc.java new file mode 100644 index 0000000000..8c2fe1c8f0 --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadIndexBatchAppliedIndexWithGrpc.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc; + +public class TestLinearizableReadIndexBatchAppliedIndexWithGrpc + extends TestLinearizableReadAppliedIndexWithGrpc { + @Override + public boolean readIndexBatchEnabled() { + return true; + } +} diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadIndexBatchLeaderLeaseWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadIndexBatchLeaderLeaseWithGrpc.java new file mode 100644 index 0000000000..5b90839cd5 --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadIndexBatchLeaderLeaseWithGrpc.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc; + +public class TestLinearizableReadIndexBatchLeaderLeaseWithGrpc + extends TestLinearizableLeaderLeaseReadWithGrpc { + @Override + public boolean readIndexBatchEnabled() { + return true; + } +} From 368015dfa71b5c14d1929da0cfe59f8c0bd91dda Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Fri, 15 May 2026 13:32:49 +0800 Subject: [PATCH 12/12] RATIS-2524. Use opportunistic ReadIndex batching --- .../src/site/markdown/configurations.md | 18 +- .../ratis/server/RaftServerConfigKeys.java | 10 - .../ratis/server/impl/RaftServerImpl.java | 11 +- .../ratis/server/impl/ReadIndexBatching.java | 176 +++++++++++------- .../server/impl/TestReadIndexBatching.java | 147 +++++++-------- 5 files changed, 198 insertions(+), 164 deletions(-) diff --git a/ratis-docs/src/site/markdown/configurations.md b/ratis-docs/src/site/markdown/configurations.md index 7453d73825..5272d2914b 100644 --- a/ratis-docs/src/site/markdown/configurations.md +++ b/ratis-docs/src/site/markdown/configurations.md @@ -259,24 +259,18 @@ but there are tradeoffs (e.g. Write and Read performance) between different type | **Type** | boolean | | **Default** | false | -| **Property** | `raft.server.read.read-index.batch.interval` | -|:----------------|:----------------------------------------------------| -| **Description** | positive maximum time to collect reads into one ReadIndex batch | -| **Type** | TimeDuration | -| **Default** | 500us | - | **Property** | `raft.server.read.read-index.batch.size` | |:----------------|:----------------------------------------------------| -| **Description** | maximum number of reads in one ReadIndex batch | +| **Description** | maximum number of reads in one opportunistic ReadIndex batch | | **Type** | int | | **Default** | 64 | When ReadIndex batching is enabled, a follower batches plain linearizable read -requests and sends a single ReadIndex request for each sealed batch. A batch is -sealed when either `batch.interval` expires or `batch.size` is reached. New -reads after sealing are assigned to a later batch. Read-after-write requests -bypass batching so that the leader can evaluate each request's client-specific -write index. +requests opportunistically and sends a single ReadIndex request for the reads +already queued when the batch is drained. `batch.size` is a maximum cap, not a +target size; the follower does not wait to fill a batch. Read-after-write +requests bypass batching so that the leader can evaluate each request's +client-specific write index. | **Property** | `raft.server.read.leader.heartbeat-check.enabled` | |:----------------|:--------------------------------------------------| diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java index e100f3779d..15043bfa76 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java @@ -322,16 +322,6 @@ static void setEnabled(RaftProperties properties, boolean enabled) { setBoolean(properties::setBoolean, ENABLED_KEY, enabled); } - String BATCH_INTERVAL_KEY = PREFIX + ".interval"; - TimeDuration BATCH_INTERVAL_DEFAULT = TimeDuration.valueOf(500, TimeUnit.MICROSECONDS); - static TimeDuration batchInterval(RaftProperties properties) { - return getTimeDuration(properties.getTimeDuration(BATCH_INTERVAL_DEFAULT.getUnit()), - BATCH_INTERVAL_KEY, BATCH_INTERVAL_DEFAULT, getDefaultLog(), requirePositive()); - } - static void setBatchInterval(RaftProperties properties, TimeDuration interval) { - setTimeDuration(properties::setTimeDuration, BATCH_INTERVAL_KEY, interval, requirePositive()); - } - String BATCH_SIZE_KEY = PREFIX + ".size"; int BATCH_SIZE_DEFAULT = 64; static int batchSize(RaftProperties properties) { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index edfe2e58bc..a1da0bfcd8 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -281,11 +281,6 @@ public long[] getFollowerMatchIndices() { this.dataStreamMap = new DataStreamMapImpl(id); this.readOption = RaftServerConfigKeys.Read.option(properties); this.writeIndexCache = new WriteIndexCache(properties); - this.readIndexBatching = RaftServerConfigKeys.Read.ReadIndex.Batch.enabled(properties) ? - new ReadIndexBatching( - RaftServerConfigKeys.Read.ReadIndex.Batch.batchInterval(properties), - RaftServerConfigKeys.Read.ReadIndex.Batch.batchSize(properties), - this::sendReadIndexAsyncImpl) : null; this.transactionManager = new TransactionManager(id); TraceUtils.setTracerWhenEnabled(properties); @@ -308,6 +303,11 @@ public long[] getFollowerMatchIndices() { RaftServerConfigKeys.ThreadPool.clientCached(properties), RaftServerConfigKeys.ThreadPool.clientSize(properties), id + "-client"); + this.readIndexBatching = RaftServerConfigKeys.Read.ReadIndex.Batch.enabled(properties) ? + new ReadIndexBatching( + serverExecutor, + RaftServerConfigKeys.Read.ReadIndex.Batch.batchSize(properties), + this::sendReadIndexAsyncImpl) : null; this.threadGroup = new ThreadGroup(proxy.getThreadGroup(), getMemberId().toString()); } @@ -1104,6 +1104,7 @@ ReadRequests getReadRequests() { private CompletableFuture sendReadIndexAsync(RaftClientRequest clientRequest) { if (readIndexBatching != null + && !role.getLeaderState().isPresent() && !clientRequest.getType().getRead().getReadAfterWriteConsistent()) { return readIndexBatching.submit(clientRequest); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexBatching.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexBatching.java index 66e3edc96b..7167c3ce5b 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexBatching.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexBatching.java @@ -21,85 +21,92 @@ import org.apache.ratis.protocol.RaftClientRequest; import org.apache.ratis.protocol.exceptions.ReadIndexException; import org.apache.ratis.util.JavaUtils; -import org.apache.ratis.util.TimeDuration; -import org.apache.ratis.util.TimeoutExecutor; -import org.apache.ratis.util.TimeoutScheduler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Queue; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; -/** Batch follower-to-leader ReadIndex requests. */ +/** + * Opportunistically batch follower-to-leader ReadIndex requests. + * + *

The batch is drained on the server executor without waiting for a timer. {@code batchSize} + * is only a maximum drain cap, not a target size. + */ class ReadIndexBatching { - private static final Logger LOG = LoggerFactory.getLogger(ReadIndexBatching.class); - - private final TimeoutExecutor scheduler; - private final TimeDuration batchInterval; + private final Executor executor; private final int batchSize; private final Function> readIndexAsyncImpl; - /** Guarded by {@code this}; the monitor provides visibility, so volatile is not needed. */ - private Batch open; + /** Guarded by {@code this}. */ + private final Queue pending = new ArrayDeque<>(); + /** Guarded by {@code this}. */ + private final HashSet inFlight = new HashSet<>(); + /** Guarded by {@code this}; at most one drain task is scheduled or running. */ + private boolean drainScheduled; /** Guarded by {@code this}. */ private boolean closed; - ReadIndexBatching(TimeDuration batchInterval, int batchSize, + ReadIndexBatching(Executor executor, int batchSize, Function> readIndexAsyncImpl) { - this(TimeoutScheduler.getInstance(), batchInterval, batchSize, readIndexAsyncImpl); - } - - ReadIndexBatching(TimeoutExecutor scheduler, TimeDuration batchInterval, int batchSize, - Function> readIndexAsyncImpl) { - this.scheduler = scheduler; - this.batchInterval = batchInterval; + this.executor = executor; this.batchSize = batchSize; this.readIndexAsyncImpl = readIndexAsyncImpl; } CompletableFuture submit(RaftClientRequest request) { final CompletableFuture future = new CompletableFuture<>(); - final Batch batch; final boolean schedule; - final boolean seal; synchronized (this) { if (closed) { return JavaUtils.completeExceptionally(newClosedException()); } - schedule = open == null; + pending.add(new Pending(request, future)); + schedule = !drainScheduled; if (schedule) { - open = new Batch(); - } - batch = open; - batch.add(request, future); - seal = batch.size() >= batchSize; - if (seal) { - open = null; + drainScheduled = true; } } if (schedule) { - scheduler.onTimeout(batchInterval, () -> seal(batch), LOG, - () -> "Failed to seal ReadIndex batch"); - } - if (seal) { - batch.seal(readIndexAsyncImpl); + scheduleDrain(); } return future; } void close() { - final Batch batch; + close(newClosedException()); + } + + private void close(Throwable throwable) { + final List queued; + final List running; synchronized (this) { + if (closed) { + return; + } closed = true; - batch = open; - open = null; + drainScheduled = false; + queued = new ArrayList<>(pending); + pending.clear(); + running = new ArrayList<>(inFlight); + inFlight.clear(); } - if (batch != null) { - batch.cancel(newClosedException()); + queued.forEach(p -> p.future.completeExceptionally(throwable)); + running.forEach(batch -> batch.completeExceptionally(throwable)); + } + + private void scheduleDrain() { + try { + executor.execute(this::drain); + } catch (RejectedExecutionException e) { + close(new ReadIndexException("Failed to schedule ReadIndex batch drain.", e)); } } @@ -107,13 +114,51 @@ private static ReadIndexException newClosedException() { return new ReadIndexException("ReadIndex batching is closed."); } - private void seal(Batch batch) { + private void drain() { + final Batch batch; synchronized (this) { - if (open == batch) { - open = null; + if (closed || pending.isEmpty()) { + drainScheduled = false; + return; } + batch = pollBatch(); + inFlight.add(batch); + } + + batch.send(readIndexAsyncImpl, () -> onBatchDone(batch)); + + final boolean scheduleNext; + synchronized (this) { + if (closed) { + scheduleNext = false; + } else if (pending.isEmpty()) { + drainScheduled = false; + scheduleNext = false; + } else { + scheduleNext = true; + } + } + if (scheduleNext) { + scheduleDrain(); + } + } + + private Batch pollBatch() { + final List batch = new ArrayList<>(Math.min(batchSize, pending.size())); + for (int i = 0; i < batchSize; i++) { + final Pending next = pending.poll(); + if (next == null) { + break; + } + batch.add(next); + } + return new Batch(batch); + } + + private void onBatchDone(Batch batch) { + synchronized (this) { + inFlight.remove(batch); } - batch.seal(readIndexAsyncImpl); } private static class Pending { @@ -127,23 +172,19 @@ private static class Pending { } private static class Batch { - private final AtomicBoolean sealed = new AtomicBoolean(); - /** Appended only while this batch is {@code open}; sealing first removes it from {@code open}. */ - private final List pending = new ArrayList<>(); - - void add(RaftClientRequest request, CompletableFuture future) { - pending.add(new Pending(request, future)); - } + private final AtomicBoolean completed = new AtomicBoolean(); + private final List pending; - int size() { - return pending.size(); + Batch(List pending) { + this.pending = pending; } - void seal(Function> readIndexAsyncImpl) { - if (!sealed.compareAndSet(false, true)) { + void send(Function> readIndexAsyncImpl, + Runnable onComplete) { + if (pending.isEmpty()) { return; } - if (pending.isEmpty()) { + if (completed.get()) { return; } @@ -155,26 +196,33 @@ void seal(Function> re replyFuture = readIndexAsyncImpl.apply(pending.get(0).request); } catch (Throwable t) { completeExceptionally(t); + onComplete.run(); return; } replyFuture.whenComplete((reply, throwable) -> { - if (throwable != null) { - completeExceptionally(JavaUtils.unwrapCompletionException(throwable)); - } else { - pending.forEach(p -> p.future.complete(reply)); + try { + if (throwable != null) { + completeExceptionally(JavaUtils.unwrapCompletionException(throwable)); + } else { + complete(reply); + } + } finally { + onComplete.run(); } }); } - private void cancel(Throwable throwable) { - if (sealed.compareAndSet(false, true)) { - completeExceptionally(throwable); + private void complete(ReadIndexReplyProto reply) { + if (completed.compareAndSet(false, true)) { + pending.forEach(p -> p.future.complete(reply)); } } private void completeExceptionally(Throwable throwable) { - pending.forEach(p -> p.future.completeExceptionally(throwable)); + if (completed.compareAndSet(false, true)) { + pending.forEach(p -> p.future.completeExceptionally(throwable)); + } } } } diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/TestReadIndexBatching.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/TestReadIndexBatching.java index d138387d74..8ccdf4e92c 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/TestReadIndexBatching.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/TestReadIndexBatching.java @@ -21,101 +21,94 @@ import org.apache.ratis.proto.RaftProtos.ReadIndexReplyProto; import org.apache.ratis.protocol.exceptions.ReadIndexException; import org.apache.ratis.server.RaftServerConfigKeys; -import org.apache.ratis.util.TimeDuration; -import org.apache.ratis.util.TimeoutExecutor; -import org.apache.ratis.util.TimeoutScheduler; -import org.apache.ratis.util.function.CheckedRunnable; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import java.lang.reflect.Field; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Consumer; class TestReadIndexBatching { @Test - void testDefaultSchedulerPreservesSubMillisecondBatchInterval() throws Exception { - final ReadIndexBatching batching = new ReadIndexBatching( - TimeDuration.valueOf(500, TimeUnit.MICROSECONDS), 64, request -> new CompletableFuture<>()); - - final Field scheduler = ReadIndexBatching.class.getDeclaredField("scheduler"); - scheduler.setAccessible(true); - Assertions.assertTrue(scheduler.get(batching) instanceof TimeoutScheduler); - } - - @Test - void testBatchIntervalMustBePositive() { + void testBatchSizeMustBePositive() { final RaftProperties properties = new RaftProperties(); - final TimeDuration zero = TimeDuration.valueOf(0, TimeUnit.MICROSECONDS); Assertions.assertThrows(IllegalArgumentException.class, - () -> RaftServerConfigKeys.Read.ReadIndex.Batch.setBatchInterval(properties, zero)); + () -> RaftServerConfigKeys.Read.ReadIndex.Batch.setBatchSize(properties, 0)); - properties.setTimeDuration(RaftServerConfigKeys.Read.ReadIndex.Batch.BATCH_INTERVAL_KEY, zero); + properties.setInt(RaftServerConfigKeys.Read.ReadIndex.Batch.BATCH_SIZE_KEY, 0); Assertions.assertThrows(IllegalArgumentException.class, - () -> RaftServerConfigKeys.Read.ReadIndex.Batch.batchInterval(properties)); + () -> RaftServerConfigKeys.Read.ReadIndex.Batch.batchSize(properties)); } @Test - void testBatchSizeSealsImmediately() throws Exception { - final CapturingTimeoutExecutor scheduler = new CapturingTimeoutExecutor(); + void testSubmitSchedulesOneOpportunisticDrain() throws Exception { + final CapturingExecutor executor = new CapturingExecutor(); final AtomicInteger readIndexCount = new AtomicInteger(); final ReadIndexReplyProto readIndexReply = ReadIndexReplyProto.getDefaultInstance(); final ReadIndexBatching batching = new ReadIndexBatching( - scheduler, TimeDuration.valueOf(1, TimeUnit.DAYS), 2, request -> { + executor, 64, request -> { readIndexCount.incrementAndGet(); return CompletableFuture.completedFuture(readIndexReply); }); final CompletableFuture first = batching.submit(null); Assertions.assertFalse(first.isDone()); - Assertions.assertEquals(1, scheduler.getTaskCount()); + Assertions.assertEquals(1, executor.getTaskCount()); final CompletableFuture second = batching.submit(null); + Assertions.assertFalse(second.isDone()); + Assertions.assertEquals(1, executor.getTaskCount()); + + executor.runNext(); Assertions.assertEquals(1, readIndexCount.get()); Assertions.assertSame(readIndexReply, first.get()); Assertions.assertSame(readIndexReply, second.get()); - - scheduler.runNext(); - Assertions.assertEquals(1, readIndexCount.get()); } @Test - void testBatchIntervalSealsOpenBatch() throws Exception { - final CapturingTimeoutExecutor scheduler = new CapturingTimeoutExecutor(); + void testBatchSizeCapsEachDrain() throws Exception { + final CapturingExecutor executor = new CapturingExecutor(); final AtomicInteger readIndexCount = new AtomicInteger(); final ReadIndexReplyProto readIndexReply = ReadIndexReplyProto.getDefaultInstance(); final ReadIndexBatching batching = new ReadIndexBatching( - scheduler, TimeDuration.valueOf(1, TimeUnit.DAYS), 64, request -> { + executor, 2, request -> { readIndexCount.incrementAndGet(); return CompletableFuture.completedFuture(readIndexReply); }); - final CompletableFuture reply = batching.submit(null); - Assertions.assertFalse(reply.isDone()); - Assertions.assertEquals(1, scheduler.getTaskCount()); + final CompletableFuture first = batching.submit(null); + final CompletableFuture second = batching.submit(null); + final CompletableFuture third = batching.submit(null); - scheduler.runNext(); + executor.runNext(); Assertions.assertEquals(1, readIndexCount.get()); - Assertions.assertSame(readIndexReply, reply.get()); + Assertions.assertSame(readIndexReply, first.get()); + Assertions.assertSame(readIndexReply, second.get()); + Assertions.assertFalse(third.isDone()); + Assertions.assertEquals(1, executor.getTaskCount()); + + executor.runNext(); + Assertions.assertEquals(2, readIndexCount.get()); + Assertions.assertSame(readIndexReply, third.get()); } @Test - void testReadIndexFailureCompletesAllBatchFuturesExceptionally() throws Exception { + void testReadIndexFailureCompletesBatchFuturesExceptionally() throws Exception { + final CapturingExecutor executor = new CapturingExecutor(); final RuntimeException failure = new RuntimeException("read index failed"); final CompletableFuture failed = new CompletableFuture<>(); failed.completeExceptionally(failure); - final ReadIndexBatching batching = new ReadIndexBatching( - new NoOpTimeoutExecutor(), TimeDuration.valueOf(1, TimeUnit.DAYS), 2, request -> failed); + final ReadIndexBatching batching = new ReadIndexBatching(executor, 64, request -> failed); final CompletableFuture first = batching.submit(null); final CompletableFuture second = batching.submit(null); + executor.runNext(); Assertions.assertSame(failure, Assertions.assertThrows(ExecutionException.class, first::get).getCause()); Assertions.assertSame(failure, @@ -123,29 +116,52 @@ void testReadIndexFailureCompletesAllBatchFuturesExceptionally() throws Exceptio } @Test - void testCloseCompletesOpenBatchExceptionally() throws Exception { + void testCloseCompletesQueuedAndInFlightBatchesExceptionally() throws Exception { + final CapturingExecutor executor = new CapturingExecutor(); final AtomicInteger readIndexCount = new AtomicInteger(); + final CompletableFuture readIndexFuture = new CompletableFuture<>(); final ReadIndexBatching batching = new ReadIndexBatching( - new NoOpTimeoutExecutor(), TimeDuration.valueOf(1, TimeUnit.DAYS), 64, request -> { + executor, 1, request -> { readIndexCount.incrementAndGet(); - return new CompletableFuture(); + return readIndexFuture; }); - final CompletableFuture reply = batching.submit(null); - Assertions.assertFalse(reply.isDone()); + final CompletableFuture inFlight = batching.submit(null); + final CompletableFuture queued = batching.submit(null); + executor.runNext(); + + Assertions.assertEquals(1, readIndexCount.get()); + Assertions.assertFalse(inFlight.isDone()); + Assertions.assertFalse(queued.isDone()); batching.close(); - final ExecutionException e = Assertions.assertThrows(ExecutionException.class, reply::get); - Assertions.assertTrue(e.getCause() instanceof ReadIndexException); - Assertions.assertEquals(0, readIndexCount.get()); + assertReadIndexException(inFlight); + assertReadIndexException(queued); + + readIndexFuture.complete(ReadIndexReplyProto.getDefaultInstance()); + assertReadIndexException(inFlight); + } + + @Test + void testScheduleFailureClosesBatching() throws Exception { + final ReadIndexBatching batching = new ReadIndexBatching( + command -> { + throw new RejectedExecutionException("closed"); + }, 64, request -> CompletableFuture.completedFuture(ReadIndexReplyProto.getDefaultInstance())); + + final CompletableFuture rejected = batching.submit(null); + assertReadIndexException(rejected); + + final CompletableFuture afterClose = batching.submit(null); + assertReadIndexException(afterClose); } @Test void testSubmitAfterCloseCompletesExceptionally() { final AtomicInteger readIndexCount = new AtomicInteger(); final ReadIndexBatching batching = new ReadIndexBatching( - new NoOpTimeoutExecutor(), TimeDuration.valueOf(1, TimeUnit.DAYS), 64, request -> { + Runnable::run, 64, request -> { readIndexCount.incrementAndGet(); return new CompletableFuture(); }); @@ -158,40 +174,25 @@ void testSubmitAfterCloseCompletesExceptionally() { Assertions.assertEquals(0, readIndexCount.get()); } - private static class NoOpTimeoutExecutor extends CapturingTimeoutExecutor { - @Override - public void onTimeout( - TimeDuration timeout, CheckedRunnable task, Consumer errorHandler) { - } - - @Override - void runNext() { - } + private static void assertReadIndexException(CompletableFuture future) throws Exception { + final ExecutionException e = Assertions.assertThrows(ExecutionException.class, future::get); + Assertions.assertTrue(e.getCause() instanceof ReadIndexException); } - private static class CapturingTimeoutExecutor implements TimeoutExecutor { - private final List> tasks = new ArrayList<>(); + private static class CapturingExecutor implements Executor { + private final List tasks = new ArrayList<>(); - @Override public int getTaskCount() { return tasks.size(); } @Override - public void onTimeout( - TimeDuration timeout, CheckedRunnable task, Consumer errorHandler) { - tasks.add(task); + public void execute(Runnable command) { + tasks.add(command); } - void runNext() throws Exception { - final CheckedRunnable task = tasks.remove(0); - try { - task.run(); - } catch (RuntimeException | Error e) { - throw e; - } catch (Throwable t) { - throw new AssertionError(t); - } + void runNext() { + tasks.remove(0).run(); } } }