diff --git a/bom/pom.xml b/bom/pom.xml
index bf81760c1..68eba6944 100644
--- a/bom/pom.xml
+++ b/bom/pom.xml
@@ -15,7 +15,7 @@
1.0.0
- 1.7.3
+ 1.7.4
2.2.0
diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java
index 88e9f2cda..adaed55ed 100644
--- a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java
+++ b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java
@@ -230,7 +230,8 @@ public void startAndInitialize() {
start(this::processMessage).whenComplete(this::closeDueToError);
YdbTopic.StreamReadMessage.InitRequest.Builder initRequestBuilder = YdbTopic.StreamReadMessage.InitRequest
- .newBuilder();
+ .newBuilder()
+ .setPartitionMaxInFlightBytes(settings.getPartitionMaxInFlightBytes());
if (settings.getConsumerName() != null) {
initRequestBuilder.setConsumer(settings.getConsumerName());
}
@@ -254,7 +255,6 @@ public void startAndInitialize() {
if (readerName != null && !readerName.isEmpty()) {
initRequestBuilder.setReaderName(readerName);
}
-
send(YdbTopic.StreamReadMessage.FromClient.newBuilder()
.setInitRequest(initRequestBuilder)
.build());
diff --git a/topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java b/topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java
index 772d83702..175ef1667 100644
--- a/topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java
+++ b/topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java
@@ -10,6 +10,7 @@
import com.google.common.collect.ImmutableList;
import tech.ydb.core.Status;
+import tech.ydb.topic.read.events.DataReceivedEvent;
/**
* @author Nikolay Perfilov
@@ -23,6 +24,7 @@ public class ReaderSettings {
private final List topics;
private final long maxMemoryUsageBytes;
private final int maxBatchSize;
+ private final long partitionMaxInFlightBytes;
private final Executor decompressionExecutor;
private final BiConsumer errorsHandler;
@@ -33,6 +35,7 @@ private ReaderSettings(Builder builder) {
this.topics = ImmutableList.copyOf(builder.topics);
this.maxMemoryUsageBytes = builder.maxMemoryUsageBytes;
this.maxBatchSize = builder.maxBatchSize;
+ this.partitionMaxInFlightBytes = builder.partitionMaxInFlightBytes;
this.decompressionExecutor = builder.decompressionExecutor;
this.errorsHandler = builder.errorsHandler;
}
@@ -62,6 +65,10 @@ public long getMaxMemoryUsageBytes() {
return maxMemoryUsageBytes;
}
+ public long getPartitionMaxInFlightBytes() {
+ return partitionMaxInFlightBytes;
+ }
+
public int getMaxBatchSize() {
return maxBatchSize;
}
@@ -84,6 +91,7 @@ public static class Builder {
private String readerName = null;
private List topics = new ArrayList<>();
private long maxMemoryUsageBytes = MAX_MEMORY_USAGE_BYTES_DEFAULT;
+ private long partitionMaxInFlightBytes = 0;
private int maxBatchSize = 0;
private Executor decompressionExecutor = null;
private BiConsumer errorsHandler = null;
@@ -118,6 +126,7 @@ public Builder withoutConsumer() {
/**
* Set reader name for debug purposes
+ * @param readerName custom reader name
* @return settings builder
*/
public Builder setReaderName(String readerName) {
@@ -140,11 +149,30 @@ public Builder setMaxMemoryUsageBytes(long maxMemoryUsageBytes) {
return this;
}
+ /**
+ * Set the maximum count of messages for the one {@link DataReceivedEvent }. Default value is {@code 0} that
+ * means no-limit mode, every next {@link DataReceivedEvent } will have all messages available to read
+ *
+ * @param maxBatchSize maximum count of messages in one event
+ * @return settings builder
+ */
public Builder setMaxBatchSize(int maxBatchSize) {
this.maxBatchSize = maxBatchSize;
return this;
}
+ /**
+ * Configure limit for messages allowed to read to internal buffer from one partition.
+ * Default value is {@code 0} that turns off this limit.
+ *
+ * @param maxInFlightBytes maximum size of messages in bytes
+ * @return settings builder
+ */
+ public Builder setPartitionMaxInFlightBytes(long maxInFlightBytes) {
+ this.partitionMaxInFlightBytes = maxInFlightBytes;
+ return this;
+ }
+
public Builder setErrorsHandler(BiConsumer handler) {
this.errorsHandler = handler;
return this;