diff --git a/CHANGELOG.md b/CHANGELOG.md
index a923d54fe..4b00b34eb 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -15,6 +15,11 @@
* Query: Added support of ApacheArrow for query execution
* Tests: Updated testcontainers to 2.0.3
+## 2.3.32 ##
+* Topic: Added support of availabilityPeriod to AlterConsumerSettings
+* Topic: Added partitionMaxInFlightBytes option to ReaderSettings
+* Coordination: Fixed NPE on session stoping before connect
+
## 2.3.31 ##
* Topic: Added support of availabilityPeriod to topic consumer
* Topic: Added preferReady option to topic's control plane methods
diff --git a/bom/pom.xml b/bom/pom.xml
index 2a362a192..4086f4a13 100644
--- a/bom/pom.xml
+++ b/bom/pom.xml
@@ -15,7 +15,7 @@
1.0.0
- 1.9.3
+ 1.9.4
2.3.1
diff --git a/coordination/src/main/java/tech/ydb/coordination/impl/SessionImpl.java b/coordination/src/main/java/tech/ydb/coordination/impl/SessionImpl.java
index d38f670ad..c952fc3ed 100644
--- a/coordination/src/main/java/tech/ydb/coordination/impl/SessionImpl.java
+++ b/coordination/src/main/java/tech/ydb/coordination/impl/SessionImpl.java
@@ -111,6 +111,7 @@ public CompletableFuture connect() {
final Stream stream = new Stream(rpc);
if (!updateState(local, makeConnectionState(local, stream))) {
logger.warn("{} cannot be connected with state {}", this, local.getState());
+ stream.cancelStream();
return CompletableFuture.completedFuture(Status.of(StatusCode.BAD_REQUEST));
}
@@ -120,8 +121,8 @@ public CompletableFuture connect() {
}
private CompletableFuture> connectToSession(Stream stream, long sessionID) {
- // start new stream
- stream.startStream().whenCompleteAsync((status, th) -> {
+ // attach completion handler to the stream
+ stream.getFinishedFuture().whenCompleteAsync((status, th) -> {
// this handler is executed when stream finishes
// we have some action to do here
diff --git a/coordination/src/main/java/tech/ydb/coordination/impl/Stream.java b/coordination/src/main/java/tech/ydb/coordination/impl/Stream.java
index 38018f0ef..a927913ae 100644
--- a/coordination/src/main/java/tech/ydb/coordination/impl/Stream.java
+++ b/coordination/src/main/java/tech/ydb/coordination/impl/Stream.java
@@ -27,7 +27,7 @@
*
* @author Aleksandr Gorshenin
*/
-class Stream implements GrpcReadWriteStream.Observer {
+class Stream {
private static final int SHUTDOWN_TIMEOUT_MS = 1000;
private static final Logger logger = LoggerFactory.getLogger(Stream.class);
@@ -43,10 +43,7 @@ class Stream implements GrpcReadWriteStream.Observer {
this.stream = rpc.createSession(GrpcRequestSettings.newBuilder()
.disableDeadline()
.build());
- }
-
- public CompletableFuture startStream() {
- stream.start(this).whenComplete((status, th) -> {
+ stream.start(this::onNext).whenComplete((status, th) -> {
if (th != null) {
startFuture.completeExceptionally(th);
stopFuture.completeExceptionally(th);
@@ -56,7 +53,9 @@ public CompletableFuture startStream() {
stopFuture.complete(status);
}
});
+ }
+ public CompletableFuture getFinishedFuture() {
return stopFuture;
}
@@ -93,7 +92,6 @@ public CompletableFuture stop() {
SessionRequest.SessionStop.newBuilder().build()
).build();
-
logger.trace("stream {} send session stop msg", hashCode());
stream.sendNext(stopMsg);
@@ -121,7 +119,6 @@ public void sendMsg(long requestId, StreamMsg> msg) {
}
}
- @Override
public void onNext(SessionResponse resp) {
if (resp.hasFailure()) {
onFail(resp.getFailure());
diff --git a/coordination/src/test/java/tech/ydb/coordination/impl/GrpcStreamMock.java b/coordination/src/test/java/tech/ydb/coordination/impl/GrpcStreamMock.java
index e00c066c0..3b57c1246 100644
--- a/coordination/src/test/java/tech/ydb/coordination/impl/GrpcStreamMock.java
+++ b/coordination/src/test/java/tech/ydb/coordination/impl/GrpcStreamMock.java
@@ -52,11 +52,19 @@ public CompletableFuture start(Observer observer) {
@Override
public void sendNext(SessionRequest message) {
+ // Emulate GrpcReadWriteStream behaviour
+ if (observer == null) {
+ throw new NullPointerException("send message before start");
+ }
requests.offer(message);
}
@Override
public void close() {
+ // Emulate GrpcReadWriteStream behaviour
+ if (observer == null) {
+ throw new NullPointerException("close stream before start");
+ }
isClosed = true;
}
diff --git a/coordination/src/test/java/tech/ydb/coordination/impl/StreamIntegrationTest.java b/coordination/src/test/java/tech/ydb/coordination/impl/StreamIntegrationTest.java
new file mode 100644
index 000000000..eda1ce169
--- /dev/null
+++ b/coordination/src/test/java/tech/ydb/coordination/impl/StreamIntegrationTest.java
@@ -0,0 +1,40 @@
+package tech.ydb.coordination.impl;
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import tech.ydb.core.Issue;
+import tech.ydb.core.Status;
+import tech.ydb.core.StatusCode;
+import tech.ydb.test.junit4.GrpcTransportRule;
+
+/**
+ *
+ * @author Aleksandr Gorshenin
+ */
+public class StreamIntegrationTest {
+ @ClassRule
+ public static final GrpcTransportRule YDB_TRANSPORT = new GrpcTransportRule();
+
+ private static final Rpc RPC = new RpcImpl(YDB_TRANSPORT);
+
+ @Rule
+ public final Timeout testTimeoutRule = new Timeout(10, TimeUnit.SECONDS);
+
+ @Test
+ public void stopBeforeStartTest() {
+ Stream stream = new Stream(RPC);
+ Status stopped = stream.stop().join();
+
+ Assert.assertEquals(StatusCode.CLIENT_GRPC_ERROR, stopped.getCode());
+ Assert.assertEquals(1, stopped.getIssues().length);
+ Issue issue = stopped.getIssues()[0];
+ Assert.assertTrue(issue.getMessage().startsWith("gRPC error: (INVALID_ARGUMENT) on"));
+ Assert.assertTrue(issue.getMessage().endsWith("First message must be a SessionStart"));
+ }
+}
diff --git a/coordination/src/test/java/tech/ydb/coordination/impl/StreamTest.java b/coordination/src/test/java/tech/ydb/coordination/impl/StreamTest.java
index 73a0a9247..65206c30f 100644
--- a/coordination/src/test/java/tech/ydb/coordination/impl/StreamTest.java
+++ b/coordination/src/test/java/tech/ydb/coordination/impl/StreamTest.java
@@ -53,7 +53,7 @@ public void baseConnectTest() {
Assert.assertFalse(grpc.isCanceled());
Assert.assertFalse(grpc.hasNextRequest());
- CompletableFuture finished = stream.startStream();
+ CompletableFuture finished = stream.getFinishedFuture();
CompletableFuture> connected = stream.sendSessionStart(0, "demo", Duration.ZERO, ByteString.EMPTY);
Assert.assertFalse(grpc.isClosed());
diff --git a/core/src/main/java/tech/ydb/core/impl/call/ReadStreamCall.java b/core/src/main/java/tech/ydb/core/impl/call/ReadStreamCall.java
index 2cdcba969..e818e1144 100644
--- a/core/src/main/java/tech/ydb/core/impl/call/ReadStreamCall.java
+++ b/core/src/main/java/tech/ydb/core/impl/call/ReadStreamCall.java
@@ -116,7 +116,7 @@ private void nextRequest(int count) {
@Override
public void onMessage(RespT message) {
- try (Scope ignored = callSpan.makeCurrent()) {
+ try (@SuppressWarnings("unused") Scope ignored = callSpan.makeCurrent()) {
try {
if (logger.isTraceEnabled()) {
logger.trace("ReadStreamCall[{}] <-- {}", traceId, TextFormat.shortDebugString((Message) message));
diff --git a/pom.xml b/pom.xml
index 3ef8c961b..904fb7077 100644
--- a/pom.xml
+++ b/pom.xml
@@ -168,6 +168,8 @@
-Xlint:-processing
-Xlint:-options
+
+ -Xlint:-try
diff --git a/query/src/main/java/tech/ydb/query/tools/SessionRetryContext.java b/query/src/main/java/tech/ydb/query/tools/SessionRetryContext.java
index 1b5197567..520926d4a 100644
--- a/query/src/main/java/tech/ydb/query/tools/SessionRetryContext.java
+++ b/query/src/main/java/tech/ydb/query/tools/SessionRetryContext.java
@@ -174,7 +174,7 @@ public void run() {
}
public void requestSession() {
- try (Scope ignored = executeSpan.makeCurrent()) {
+ try (@SuppressWarnings("unused") Scope ignored = executeSpan.makeCurrent()) {
retrySpan = tracer.startSpan(EXECUTE_WITH_RETRY_SPAN_NAME, SpanKind.INTERNAL);
}
CompletableFuture> sessionFuture = createSessionWithRetrySpanParent();
@@ -201,10 +201,10 @@ private void acceptSession(@Nonnull Result sessionResult) {
final QuerySession session = sessionResult.getValue();
try {
- try (Scope ignored = retrySpan.makeCurrent()) {
+ try (@SuppressWarnings("unused") Scope ignored = retrySpan.makeCurrent()) {
fn.apply(session).whenComplete((fnResult, fnException) -> {
try {
- try (Scope ignored1 = retrySpan.makeCurrent()) {
+ try (@SuppressWarnings("unused") Scope ignored1 = retrySpan.makeCurrent()) {
session.close();
if (fnException != null) {
@@ -282,7 +282,7 @@ private void handleException(@Nonnull Throwable ex) {
}
private CompletableFuture> createSessionWithRetrySpanParent() {
- try (Scope ignored = retrySpan.makeCurrent()) {
+ try (@SuppressWarnings("unused") Scope ignored = retrySpan.makeCurrent()) {
return queryClient.createSession(sessionCreationTimeout);
}
}
diff --git a/table/src/main/java/tech/ydb/table/SessionRetryContext.java b/table/src/main/java/tech/ydb/table/SessionRetryContext.java
index a2139c8c8..dc82601c9 100644
--- a/table/src/main/java/tech/ydb/table/SessionRetryContext.java
+++ b/table/src/main/java/tech/ydb/table/SessionRetryContext.java
@@ -191,7 +191,7 @@ public void run() {
}
public void requestSession() {
- try (Scope ignored = executeSpan.makeCurrent()) {
+ try (@SuppressWarnings("unused") Scope ignored = executeSpan.makeCurrent()) {
retrySpan = tracer.startSpan(EXECUTE_WITH_RETRY_SPAN_NAME, SpanKind.INTERNAL);
}
CompletableFuture> sessionFuture = createSessionWithRetrySpanParent();
@@ -218,10 +218,10 @@ private void acceptSession(@Nonnull Result sessionResult) {
final Session session = sessionResult.getValue();
try {
- try (Scope ignored = retrySpan.makeCurrent()) {
+ try (@SuppressWarnings("unused") Scope ignored = retrySpan.makeCurrent()) {
fn.apply(session).whenComplete((fnResult, fnException) -> {
try {
- try (Scope ignored1 = retrySpan.makeCurrent()) {
+ try (@SuppressWarnings("unused") Scope ignored1 = retrySpan.makeCurrent()) {
session.close();
if (fnException != null) {
@@ -307,7 +307,7 @@ private void handleException(@Nonnull Throwable ex) {
}
private CompletableFuture> createSessionWithRetrySpanParent() {
- try (Scope ignored = retrySpan.makeCurrent()) {
+ try (@SuppressWarnings("unused") Scope ignored = retrySpan.makeCurrent()) {
return sessionSupplier.createSession(sessionCreationTimeout);
}
}
diff --git a/topic/src/main/java/tech/ydb/topic/impl/TopicClientImpl.java b/topic/src/main/java/tech/ydb/topic/impl/TopicClientImpl.java
index fdb88314c..6c84a610c 100644
--- a/topic/src/main/java/tech/ydb/topic/impl/TopicClientImpl.java
+++ b/topic/src/main/java/tech/ydb/topic/impl/TopicClientImpl.java
@@ -250,6 +250,11 @@ public CompletableFuture alterTopic(String path, AlterTopicSettings sett
alterConsumerBuilder.setSetSupportedCodecs(toProto(consumerSupportedCodecs));
}
+ Duration availabilityPeriod = alterConsumer.getAvailabilityPeriod();
+ if (availabilityPeriod != null) {
+ alterConsumerBuilder.setSetAvailabilityPeriod(ProtobufUtils.durationToProto(availabilityPeriod));
+ }
+
Map consumerAttributes = alterConsumer.getAlterAttributes();
if (!consumerAttributes.isEmpty()) {
alterConsumerBuilder.putAllAlterAttributes(consumerAttributes);
diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/ReadSession.java b/topic/src/main/java/tech/ydb/topic/read/impl/ReadSession.java
index 27dd5f855..cdae4193f 100644
--- a/topic/src/main/java/tech/ydb/topic/read/impl/ReadSession.java
+++ b/topic/src/main/java/tech/ydb/topic/read/impl/ReadSession.java
@@ -514,6 +514,8 @@ private static YdbTopic.StreamReadMessage.InitRequest buildInitRequest(ReaderSet
List topics = settings.getTopics();
YdbTopic.StreamReadMessage.InitRequest.Builder builder = YdbTopic.StreamReadMessage.InitRequest.newBuilder();
+
+ builder.setPartitionMaxInFlightBytes(settings.getPartitionMaxInFlightBytes());
if (consumerName != null && !consumerName.isEmpty()) {
builder.setConsumer(consumerName);
}
diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java
index de89b1724..9645d2b7a 100644
--- a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java
+++ b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java
@@ -136,7 +136,6 @@ private class ReadSessionFactory {
this.settings = settings;
this.decompressor = decompressor;
this.codecRegistry = codecRegistry;
-
}
public ReadSession createNextSession() {
diff --git a/topic/src/main/java/tech/ydb/topic/settings/AlterConsumerSettings.java b/topic/src/main/java/tech/ydb/topic/settings/AlterConsumerSettings.java
index 3688844b1..81435e926 100644
--- a/topic/src/main/java/tech/ydb/topic/settings/AlterConsumerSettings.java
+++ b/topic/src/main/java/tech/ydb/topic/settings/AlterConsumerSettings.java
@@ -1,5 +1,6 @@
package tech.ydb.topic.settings;
+import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.HashSet;
@@ -9,6 +10,7 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import tech.ydb.topic.description.Consumer;
import tech.ydb.topic.description.SupportedCodecs;
/**
@@ -22,6 +24,9 @@ public class AlterConsumerSettings {
private final Instant readFrom;
@Nullable
private final SupportedCodecs supportedCodecs;
+ @Nullable
+ private final Duration availabilityPeriod;
+
private final Map alterAttributes;
private final Set dropAttributes;
@@ -32,6 +37,7 @@ private AlterConsumerSettings(Builder builder) {
this.supportedCodecs = builder.supportedCodecs;
this.alterAttributes = builder.alterAttributes;
this.dropAttributes = builder.dropAttributes;
+ this.availabilityPeriod = builder.availabilityPeriod;
}
public static Builder newBuilder() {
@@ -57,6 +63,11 @@ public SupportedCodecs getSupportedCodecs() {
return supportedCodecs;
}
+ @Nullable
+ public Duration getAvailabilityPeriod() {
+ return availabilityPeriod;
+ }
+
public Map getAlterAttributes() {
return alterAttributes;
}
@@ -73,6 +84,7 @@ public static class Builder {
private Boolean important = null;
private Instant readFrom = null;
private SupportedCodecs supportedCodecs = null;
+ private Duration availabilityPeriod = null;
private Map alterAttributes = new HashMap<>();
private Set dropAttributes = new HashSet<>();
@@ -96,6 +108,20 @@ public Builder setImportant(boolean important) {
return this;
}
+ /**
+ * Configure availabilityPeriod for this consumer.
+ *
+ * Option availabilityPeriod is not compatible with important option
+ *
+ * @see Consumer#getAvailabilityPeriod()
+ * @param period - availability period value
+ * @return settings builder
+ */
+ public Builder setAvailabilityPeriod(Duration period) {
+ this.availabilityPeriod = period;
+ return this;
+ }
+
/**
* @param readFrom Time to read from. All messages with smaller server written_at timestamp will be skipped.
* @return settings builder
diff --git a/topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java b/topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java
index 772d83702..175ef1667 100644
--- a/topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java
+++ b/topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java
@@ -10,6 +10,7 @@
import com.google.common.collect.ImmutableList;
import tech.ydb.core.Status;
+import tech.ydb.topic.read.events.DataReceivedEvent;
/**
* @author Nikolay Perfilov
@@ -23,6 +24,7 @@ public class ReaderSettings {
private final List topics;
private final long maxMemoryUsageBytes;
private final int maxBatchSize;
+ private final long partitionMaxInFlightBytes;
private final Executor decompressionExecutor;
private final BiConsumer errorsHandler;
@@ -33,6 +35,7 @@ private ReaderSettings(Builder builder) {
this.topics = ImmutableList.copyOf(builder.topics);
this.maxMemoryUsageBytes = builder.maxMemoryUsageBytes;
this.maxBatchSize = builder.maxBatchSize;
+ this.partitionMaxInFlightBytes = builder.partitionMaxInFlightBytes;
this.decompressionExecutor = builder.decompressionExecutor;
this.errorsHandler = builder.errorsHandler;
}
@@ -62,6 +65,10 @@ public long getMaxMemoryUsageBytes() {
return maxMemoryUsageBytes;
}
+ public long getPartitionMaxInFlightBytes() {
+ return partitionMaxInFlightBytes;
+ }
+
public int getMaxBatchSize() {
return maxBatchSize;
}
@@ -84,6 +91,7 @@ public static class Builder {
private String readerName = null;
private List topics = new ArrayList<>();
private long maxMemoryUsageBytes = MAX_MEMORY_USAGE_BYTES_DEFAULT;
+ private long partitionMaxInFlightBytes = 0;
private int maxBatchSize = 0;
private Executor decompressionExecutor = null;
private BiConsumer errorsHandler = null;
@@ -118,6 +126,7 @@ public Builder withoutConsumer() {
/**
* Set reader name for debug purposes
+ * @param readerName custom reader name
* @return settings builder
*/
public Builder setReaderName(String readerName) {
@@ -140,11 +149,30 @@ public Builder setMaxMemoryUsageBytes(long maxMemoryUsageBytes) {
return this;
}
+ /**
+ * Set the maximum count of messages for the one {@link DataReceivedEvent }. Default value is {@code 0} that
+ * means no-limit mode, every next {@link DataReceivedEvent } will have all messages available to read
+ *
+ * @param maxBatchSize maximum count of messages in one event
+ * @return settings builder
+ */
public Builder setMaxBatchSize(int maxBatchSize) {
this.maxBatchSize = maxBatchSize;
return this;
}
+ /**
+ * Configure limit for messages allowed to read to internal buffer from one partition.
+ * Default value is {@code 0} that turns off this limit.
+ *
+ * @param maxInFlightBytes maximum size of messages in bytes
+ * @return settings builder
+ */
+ public Builder setPartitionMaxInFlightBytes(long maxInFlightBytes) {
+ this.partitionMaxInFlightBytes = maxInFlightBytes;
+ return this;
+ }
+
public Builder setErrorsHandler(BiConsumer handler) {
this.errorsHandler = handler;
return this;
diff --git a/topic/src/test/java/tech/ydb/topic/impl/YdbTopicsIntegrationTest.java b/topic/src/test/java/tech/ydb/topic/impl/YdbTopicsIntegrationTest.java
index 7f9daadba..3ff0583bf 100644
--- a/topic/src/test/java/tech/ydb/topic/impl/YdbTopicsIntegrationTest.java
+++ b/topic/src/test/java/tech/ydb/topic/impl/YdbTopicsIntegrationTest.java
@@ -1,6 +1,7 @@
package tech.ydb.topic.impl;
import java.time.Duration;
+import java.time.Instant;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -25,6 +26,7 @@
import tech.ydb.test.junit4.GrpcTransportRule;
import tech.ydb.topic.TopicClient;
import tech.ydb.topic.description.Consumer;
+import tech.ydb.topic.description.ConsumerDescription;
import tech.ydb.topic.description.PartitionInfo;
import tech.ydb.topic.description.TopicDescription;
import tech.ydb.topic.read.AsyncReader;
@@ -33,6 +35,7 @@
import tech.ydb.topic.read.events.AbstractReadEventHandler;
import tech.ydb.topic.read.events.DataReceivedEvent;
import tech.ydb.topic.settings.AlterAutoPartitioningWriteStrategySettings;
+import tech.ydb.topic.settings.AlterConsumerSettings;
import tech.ydb.topic.settings.AlterPartitioningSettings;
import tech.ydb.topic.settings.AlterTopicSettings;
import tech.ydb.topic.settings.AutoPartitioningStrategy;
@@ -334,10 +337,10 @@ public void step09_describeTopicStats() {
for (PartitionInfo partition: withStats.getPartitions()) {
Assert.assertNotNull(partition.getPartitionStats());
}
- }
+ }
@Test
- public void step10_invalidConsumerTest() {
+ public void step10_invalidAddConsumerTest() {
AlterTopicSettings settings = AlterTopicSettings.newBuilder()
.addAddConsumer(Consumer.newBuilder()
.setName("WRONG_CONSUMER")
@@ -350,5 +353,41 @@ public void step10_invalidConsumerTest() {
Status status = client.alterTopic(TEST_TOPIC, settings).join();
Assert.assertFalse("Alter must fail, but get status " + status, status.isSuccess());
Assert.assertEquals("Alter must fail, but get status " + status, StatusCode.BAD_REQUEST, status.getCode());
+ }
+
+ @Test
+ public void step11_invalidAlterConsumerTest() {
+ AlterTopicSettings settings = AlterTopicSettings.newBuilder()
+ .addAlterConsumer(AlterConsumerSettings.newBuilder()
+ .setName(TEST_CONSUMER2)
+ // important and availability_period are incompatible
+ .setImportant(true)
+ .setAvailabilityPeriod(Duration.ofMinutes(5))
+ .build()
+ ).build();
+
+ Status status = client.alterTopic(TEST_TOPIC, settings).join();
+ Assert.assertFalse("Alter must fail, but get status " + status, status.isSuccess());
+ Assert.assertEquals("Alter must fail, but get status " + status, StatusCode.BAD_REQUEST, status.getCode());
+ }
+
+ @Test
+ public void step12_alterConsumerTest() {
+ AlterTopicSettings settings = AlterTopicSettings.newBuilder()
+ .addAlterConsumer(AlterConsumerSettings.newBuilder()
+ .setName(TEST_CONSUMER2)
+ .setReadFrom(Instant.EPOCH.plusSeconds(10))
+ .setAvailabilityPeriod(Duration.ofMinutes(5))
+ .build()
+ ).build();
+
+ Status status = client.alterTopic(TEST_TOPIC, settings).join();
+ Assert.assertTrue("Alter must be OK, but got status " + status, status.isSuccess());
+
+ ConsumerDescription description = client.describeConsumer(TEST_TOPIC, TEST_CONSUMER2).join().getValue();
+
+ Assert.assertEquals(TEST_CONSUMER2, description.getConsumer().getName());
+ Assert.assertEquals(Instant.EPOCH.plusSeconds(10), description.getConsumer().getReadFrom());
+ Assert.assertEquals(Duration.ofMinutes(5), description.getConsumer().getAvailabilityPeriod());
}
}