From 39f64b7c70b71fa74924d5f48e14705c16d8de8c Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Tue, 31 Mar 2026 14:54:39 +0100 Subject: [PATCH] Added support of partition_max_in_flight_bytes --- bom/pom.xml | 2 +- .../tech/ydb/topic/read/impl/ReaderImpl.java | 4 +-- .../ydb/topic/settings/ReaderSettings.java | 28 +++++++++++++++++++ 3 files changed, 31 insertions(+), 3 deletions(-) diff --git a/bom/pom.xml b/bom/pom.xml index bf81760c1..68eba6944 100644 --- a/bom/pom.xml +++ b/bom/pom.xml @@ -15,7 +15,7 @@ 1.0.0 - 1.7.3 + 1.7.4 2.2.0 diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java index 88e9f2cda..adaed55ed 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java @@ -230,7 +230,8 @@ public void startAndInitialize() { start(this::processMessage).whenComplete(this::closeDueToError); YdbTopic.StreamReadMessage.InitRequest.Builder initRequestBuilder = YdbTopic.StreamReadMessage.InitRequest - .newBuilder(); + .newBuilder() + .setPartitionMaxInFlightBytes(settings.getPartitionMaxInFlightBytes()); if (settings.getConsumerName() != null) { initRequestBuilder.setConsumer(settings.getConsumerName()); } @@ -254,7 +255,6 @@ public void startAndInitialize() { if (readerName != null && !readerName.isEmpty()) { initRequestBuilder.setReaderName(readerName); } - send(YdbTopic.StreamReadMessage.FromClient.newBuilder() .setInitRequest(initRequestBuilder) .build()); diff --git a/topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java b/topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java index 772d83702..175ef1667 100644 --- a/topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java +++ b/topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java @@ -10,6 +10,7 @@ import com.google.common.collect.ImmutableList; import tech.ydb.core.Status; +import tech.ydb.topic.read.events.DataReceivedEvent; /** * @author Nikolay Perfilov @@ -23,6 +24,7 @@ public class ReaderSettings { private final List topics; private final long maxMemoryUsageBytes; private final int maxBatchSize; + private final long partitionMaxInFlightBytes; private final Executor decompressionExecutor; private final BiConsumer errorsHandler; @@ -33,6 +35,7 @@ private ReaderSettings(Builder builder) { this.topics = ImmutableList.copyOf(builder.topics); this.maxMemoryUsageBytes = builder.maxMemoryUsageBytes; this.maxBatchSize = builder.maxBatchSize; + this.partitionMaxInFlightBytes = builder.partitionMaxInFlightBytes; this.decompressionExecutor = builder.decompressionExecutor; this.errorsHandler = builder.errorsHandler; } @@ -62,6 +65,10 @@ public long getMaxMemoryUsageBytes() { return maxMemoryUsageBytes; } + public long getPartitionMaxInFlightBytes() { + return partitionMaxInFlightBytes; + } + public int getMaxBatchSize() { return maxBatchSize; } @@ -84,6 +91,7 @@ public static class Builder { private String readerName = null; private List topics = new ArrayList<>(); private long maxMemoryUsageBytes = MAX_MEMORY_USAGE_BYTES_DEFAULT; + private long partitionMaxInFlightBytes = 0; private int maxBatchSize = 0; private Executor decompressionExecutor = null; private BiConsumer errorsHandler = null; @@ -118,6 +126,7 @@ public Builder withoutConsumer() { /** * Set reader name for debug purposes + * @param readerName custom reader name * @return settings builder */ public Builder setReaderName(String readerName) { @@ -140,11 +149,30 @@ public Builder setMaxMemoryUsageBytes(long maxMemoryUsageBytes) { return this; } + /** + * Set the maximum count of messages for the one {@link DataReceivedEvent }. Default value is {@code 0} that + * means no-limit mode, every next {@link DataReceivedEvent } will have all messages available to read + * + * @param maxBatchSize maximum count of messages in one event + * @return settings builder + */ public Builder setMaxBatchSize(int maxBatchSize) { this.maxBatchSize = maxBatchSize; return this; } + /** + * Configure limit for messages allowed to read to internal buffer from one partition. + * Default value is {@code 0} that turns off this limit. + * + * @param maxInFlightBytes maximum size of messages in bytes + * @return settings builder + */ + public Builder setPartitionMaxInFlightBytes(long maxInFlightBytes) { + this.partitionMaxInFlightBytes = maxInFlightBytes; + return this; + } + public Builder setErrorsHandler(BiConsumer handler) { this.errorsHandler = handler; return this;