Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

<properties>
<ydb-auth-api.version>1.0.0</ydb-auth-api.version>
<ydb-proto-api.version>1.7.3</ydb-proto-api.version>
<ydb-proto-api.version>1.7.4</ydb-proto-api.version>
<yc-auth.version>2.2.0</yc-auth.version>
</properties>

Expand Down
4 changes: 2 additions & 2 deletions topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,8 @@ public void startAndInitialize() {
start(this::processMessage).whenComplete(this::closeDueToError);

YdbTopic.StreamReadMessage.InitRequest.Builder initRequestBuilder = YdbTopic.StreamReadMessage.InitRequest
.newBuilder();
.newBuilder()
.setPartitionMaxInFlightBytes(settings.getPartitionMaxInFlightBytes());
if (settings.getConsumerName() != null) {
initRequestBuilder.setConsumer(settings.getConsumerName());
}
Expand All @@ -254,7 +255,6 @@ public void startAndInitialize() {
if (readerName != null && !readerName.isEmpty()) {
initRequestBuilder.setReaderName(readerName);
}

send(YdbTopic.StreamReadMessage.FromClient.newBuilder()
.setInitRequest(initRequestBuilder)
.build());
Expand Down
28 changes: 28 additions & 0 deletions topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.google.common.collect.ImmutableList;

import tech.ydb.core.Status;
import tech.ydb.topic.read.events.DataReceivedEvent;

/**
* @author Nikolay Perfilov
Expand All @@ -23,6 +24,7 @@ public class ReaderSettings {
private final List<TopicReadSettings> topics;
private final long maxMemoryUsageBytes;
private final int maxBatchSize;
private final long partitionMaxInFlightBytes;
private final Executor decompressionExecutor;
private final BiConsumer<Status, Throwable> errorsHandler;

Expand All @@ -33,6 +35,7 @@ private ReaderSettings(Builder builder) {
this.topics = ImmutableList.copyOf(builder.topics);
this.maxMemoryUsageBytes = builder.maxMemoryUsageBytes;
this.maxBatchSize = builder.maxBatchSize;
this.partitionMaxInFlightBytes = builder.partitionMaxInFlightBytes;
this.decompressionExecutor = builder.decompressionExecutor;
this.errorsHandler = builder.errorsHandler;
}
Expand Down Expand Up @@ -62,6 +65,10 @@ public long getMaxMemoryUsageBytes() {
return maxMemoryUsageBytes;
}

public long getPartitionMaxInFlightBytes() {
return partitionMaxInFlightBytes;
}

public int getMaxBatchSize() {
return maxBatchSize;
}
Expand All @@ -84,6 +91,7 @@ public static class Builder {
private String readerName = null;
private List<TopicReadSettings> topics = new ArrayList<>();
private long maxMemoryUsageBytes = MAX_MEMORY_USAGE_BYTES_DEFAULT;
private long partitionMaxInFlightBytes = 0;
private int maxBatchSize = 0;
private Executor decompressionExecutor = null;
private BiConsumer<Status, Throwable> errorsHandler = null;
Expand Down Expand Up @@ -118,6 +126,7 @@ public Builder withoutConsumer() {

/**
* Set reader name for debug purposes
* @param readerName custom reader name
* @return settings builder
*/
public Builder setReaderName(String readerName) {
Expand All @@ -140,11 +149,30 @@ public Builder setMaxMemoryUsageBytes(long maxMemoryUsageBytes) {
return this;
}

/**
* Set the maximum count of messages for the one {@link DataReceivedEvent }. Default value is {@code 0} that
* means no-limit mode, every next {@link DataReceivedEvent } will have all messages available to read
*
* @param maxBatchSize maximum count of messages in one event
* @return settings builder
*/
public Builder setMaxBatchSize(int maxBatchSize) {
this.maxBatchSize = maxBatchSize;
return this;
}

/**
* Configure limit for messages allowed to read to internal buffer from one partition.
* Default value is {@code 0} that turns off this limit.
*
* @param maxInFlightBytes maximum size of messages in bytes
* @return settings builder
*/
public Builder setPartitionMaxInFlightBytes(long maxInFlightBytes) {
this.partitionMaxInFlightBytes = maxInFlightBytes;
return this;
}

public Builder setErrorsHandler(BiConsumer<Status, Throwable> handler) {
this.errorsHandler = handler;
return this;
Expand Down
Loading