diff --git a/CHANGELOG.md b/CHANGELOG.md index a923d54fe..4b00b34eb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,11 @@ * Query: Added support of ApacheArrow for query execution * Tests: Updated testcontainers to 2.0.3 +## 2.3.32 ## +* Topic: Added support of availabilityPeriod to AlterConsumerSettings +* Topic: Added partitionMaxInFlightBytes option to ReaderSettings +* Coordination: Fixed NPE on session stoping before connect + ## 2.3.31 ## * Topic: Added support of availabilityPeriod to topic consumer * Topic: Added preferReady option to topic's control plane methods diff --git a/bom/pom.xml b/bom/pom.xml index 2a362a192..4086f4a13 100644 --- a/bom/pom.xml +++ b/bom/pom.xml @@ -15,7 +15,7 @@ 1.0.0 - 1.9.3 + 1.9.4 2.3.1 diff --git a/coordination/src/main/java/tech/ydb/coordination/impl/SessionImpl.java b/coordination/src/main/java/tech/ydb/coordination/impl/SessionImpl.java index d38f670ad..c952fc3ed 100644 --- a/coordination/src/main/java/tech/ydb/coordination/impl/SessionImpl.java +++ b/coordination/src/main/java/tech/ydb/coordination/impl/SessionImpl.java @@ -111,6 +111,7 @@ public CompletableFuture connect() { final Stream stream = new Stream(rpc); if (!updateState(local, makeConnectionState(local, stream))) { logger.warn("{} cannot be connected with state {}", this, local.getState()); + stream.cancelStream(); return CompletableFuture.completedFuture(Status.of(StatusCode.BAD_REQUEST)); } @@ -120,8 +121,8 @@ public CompletableFuture connect() { } private CompletableFuture> connectToSession(Stream stream, long sessionID) { - // start new stream - stream.startStream().whenCompleteAsync((status, th) -> { + // attach completion handler to the stream + stream.getFinishedFuture().whenCompleteAsync((status, th) -> { // this handler is executed when stream finishes // we have some action to do here diff --git a/coordination/src/main/java/tech/ydb/coordination/impl/Stream.java b/coordination/src/main/java/tech/ydb/coordination/impl/Stream.java index 38018f0ef..a927913ae 100644 --- a/coordination/src/main/java/tech/ydb/coordination/impl/Stream.java +++ b/coordination/src/main/java/tech/ydb/coordination/impl/Stream.java @@ -27,7 +27,7 @@ * * @author Aleksandr Gorshenin */ -class Stream implements GrpcReadWriteStream.Observer { +class Stream { private static final int SHUTDOWN_TIMEOUT_MS = 1000; private static final Logger logger = LoggerFactory.getLogger(Stream.class); @@ -43,10 +43,7 @@ class Stream implements GrpcReadWriteStream.Observer { this.stream = rpc.createSession(GrpcRequestSettings.newBuilder() .disableDeadline() .build()); - } - - public CompletableFuture startStream() { - stream.start(this).whenComplete((status, th) -> { + stream.start(this::onNext).whenComplete((status, th) -> { if (th != null) { startFuture.completeExceptionally(th); stopFuture.completeExceptionally(th); @@ -56,7 +53,9 @@ public CompletableFuture startStream() { stopFuture.complete(status); } }); + } + public CompletableFuture getFinishedFuture() { return stopFuture; } @@ -93,7 +92,6 @@ public CompletableFuture stop() { SessionRequest.SessionStop.newBuilder().build() ).build(); - logger.trace("stream {} send session stop msg", hashCode()); stream.sendNext(stopMsg); @@ -121,7 +119,6 @@ public void sendMsg(long requestId, StreamMsg msg) { } } - @Override public void onNext(SessionResponse resp) { if (resp.hasFailure()) { onFail(resp.getFailure()); diff --git a/coordination/src/test/java/tech/ydb/coordination/impl/GrpcStreamMock.java b/coordination/src/test/java/tech/ydb/coordination/impl/GrpcStreamMock.java index e00c066c0..3b57c1246 100644 --- a/coordination/src/test/java/tech/ydb/coordination/impl/GrpcStreamMock.java +++ b/coordination/src/test/java/tech/ydb/coordination/impl/GrpcStreamMock.java @@ -52,11 +52,19 @@ public CompletableFuture start(Observer observer) { @Override public void sendNext(SessionRequest message) { + // Emulate GrpcReadWriteStream behaviour + if (observer == null) { + throw new NullPointerException("send message before start"); + } requests.offer(message); } @Override public void close() { + // Emulate GrpcReadWriteStream behaviour + if (observer == null) { + throw new NullPointerException("close stream before start"); + } isClosed = true; } diff --git a/coordination/src/test/java/tech/ydb/coordination/impl/StreamIntegrationTest.java b/coordination/src/test/java/tech/ydb/coordination/impl/StreamIntegrationTest.java new file mode 100644 index 000000000..eda1ce169 --- /dev/null +++ b/coordination/src/test/java/tech/ydb/coordination/impl/StreamIntegrationTest.java @@ -0,0 +1,40 @@ +package tech.ydb.coordination.impl; + +import java.util.concurrent.TimeUnit; + +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; + +import tech.ydb.core.Issue; +import tech.ydb.core.Status; +import tech.ydb.core.StatusCode; +import tech.ydb.test.junit4.GrpcTransportRule; + +/** + * + * @author Aleksandr Gorshenin + */ +public class StreamIntegrationTest { + @ClassRule + public static final GrpcTransportRule YDB_TRANSPORT = new GrpcTransportRule(); + + private static final Rpc RPC = new RpcImpl(YDB_TRANSPORT); + + @Rule + public final Timeout testTimeoutRule = new Timeout(10, TimeUnit.SECONDS); + + @Test + public void stopBeforeStartTest() { + Stream stream = new Stream(RPC); + Status stopped = stream.stop().join(); + + Assert.assertEquals(StatusCode.CLIENT_GRPC_ERROR, stopped.getCode()); + Assert.assertEquals(1, stopped.getIssues().length); + Issue issue = stopped.getIssues()[0]; + Assert.assertTrue(issue.getMessage().startsWith("gRPC error: (INVALID_ARGUMENT) on")); + Assert.assertTrue(issue.getMessage().endsWith("First message must be a SessionStart")); + } +} diff --git a/coordination/src/test/java/tech/ydb/coordination/impl/StreamTest.java b/coordination/src/test/java/tech/ydb/coordination/impl/StreamTest.java index 73a0a9247..65206c30f 100644 --- a/coordination/src/test/java/tech/ydb/coordination/impl/StreamTest.java +++ b/coordination/src/test/java/tech/ydb/coordination/impl/StreamTest.java @@ -53,7 +53,7 @@ public void baseConnectTest() { Assert.assertFalse(grpc.isCanceled()); Assert.assertFalse(grpc.hasNextRequest()); - CompletableFuture finished = stream.startStream(); + CompletableFuture finished = stream.getFinishedFuture(); CompletableFuture> connected = stream.sendSessionStart(0, "demo", Duration.ZERO, ByteString.EMPTY); Assert.assertFalse(grpc.isClosed()); diff --git a/core/src/main/java/tech/ydb/core/impl/call/ReadStreamCall.java b/core/src/main/java/tech/ydb/core/impl/call/ReadStreamCall.java index 2cdcba969..e818e1144 100644 --- a/core/src/main/java/tech/ydb/core/impl/call/ReadStreamCall.java +++ b/core/src/main/java/tech/ydb/core/impl/call/ReadStreamCall.java @@ -116,7 +116,7 @@ private void nextRequest(int count) { @Override public void onMessage(RespT message) { - try (Scope ignored = callSpan.makeCurrent()) { + try (@SuppressWarnings("unused") Scope ignored = callSpan.makeCurrent()) { try { if (logger.isTraceEnabled()) { logger.trace("ReadStreamCall[{}] <-- {}", traceId, TextFormat.shortDebugString((Message) message)); diff --git a/pom.xml b/pom.xml index 3ef8c961b..904fb7077 100644 --- a/pom.xml +++ b/pom.xml @@ -168,6 +168,8 @@ -Xlint:-processing -Xlint:-options + + -Xlint:-try diff --git a/query/src/main/java/tech/ydb/query/tools/SessionRetryContext.java b/query/src/main/java/tech/ydb/query/tools/SessionRetryContext.java index 1b5197567..520926d4a 100644 --- a/query/src/main/java/tech/ydb/query/tools/SessionRetryContext.java +++ b/query/src/main/java/tech/ydb/query/tools/SessionRetryContext.java @@ -174,7 +174,7 @@ public void run() { } public void requestSession() { - try (Scope ignored = executeSpan.makeCurrent()) { + try (@SuppressWarnings("unused") Scope ignored = executeSpan.makeCurrent()) { retrySpan = tracer.startSpan(EXECUTE_WITH_RETRY_SPAN_NAME, SpanKind.INTERNAL); } CompletableFuture> sessionFuture = createSessionWithRetrySpanParent(); @@ -201,10 +201,10 @@ private void acceptSession(@Nonnull Result sessionResult) { final QuerySession session = sessionResult.getValue(); try { - try (Scope ignored = retrySpan.makeCurrent()) { + try (@SuppressWarnings("unused") Scope ignored = retrySpan.makeCurrent()) { fn.apply(session).whenComplete((fnResult, fnException) -> { try { - try (Scope ignored1 = retrySpan.makeCurrent()) { + try (@SuppressWarnings("unused") Scope ignored1 = retrySpan.makeCurrent()) { session.close(); if (fnException != null) { @@ -282,7 +282,7 @@ private void handleException(@Nonnull Throwable ex) { } private CompletableFuture> createSessionWithRetrySpanParent() { - try (Scope ignored = retrySpan.makeCurrent()) { + try (@SuppressWarnings("unused") Scope ignored = retrySpan.makeCurrent()) { return queryClient.createSession(sessionCreationTimeout); } } diff --git a/table/src/main/java/tech/ydb/table/SessionRetryContext.java b/table/src/main/java/tech/ydb/table/SessionRetryContext.java index a2139c8c8..dc82601c9 100644 --- a/table/src/main/java/tech/ydb/table/SessionRetryContext.java +++ b/table/src/main/java/tech/ydb/table/SessionRetryContext.java @@ -191,7 +191,7 @@ public void run() { } public void requestSession() { - try (Scope ignored = executeSpan.makeCurrent()) { + try (@SuppressWarnings("unused") Scope ignored = executeSpan.makeCurrent()) { retrySpan = tracer.startSpan(EXECUTE_WITH_RETRY_SPAN_NAME, SpanKind.INTERNAL); } CompletableFuture> sessionFuture = createSessionWithRetrySpanParent(); @@ -218,10 +218,10 @@ private void acceptSession(@Nonnull Result sessionResult) { final Session session = sessionResult.getValue(); try { - try (Scope ignored = retrySpan.makeCurrent()) { + try (@SuppressWarnings("unused") Scope ignored = retrySpan.makeCurrent()) { fn.apply(session).whenComplete((fnResult, fnException) -> { try { - try (Scope ignored1 = retrySpan.makeCurrent()) { + try (@SuppressWarnings("unused") Scope ignored1 = retrySpan.makeCurrent()) { session.close(); if (fnException != null) { @@ -307,7 +307,7 @@ private void handleException(@Nonnull Throwable ex) { } private CompletableFuture> createSessionWithRetrySpanParent() { - try (Scope ignored = retrySpan.makeCurrent()) { + try (@SuppressWarnings("unused") Scope ignored = retrySpan.makeCurrent()) { return sessionSupplier.createSession(sessionCreationTimeout); } } diff --git a/topic/src/main/java/tech/ydb/topic/impl/TopicClientImpl.java b/topic/src/main/java/tech/ydb/topic/impl/TopicClientImpl.java index fdb88314c..6c84a610c 100644 --- a/topic/src/main/java/tech/ydb/topic/impl/TopicClientImpl.java +++ b/topic/src/main/java/tech/ydb/topic/impl/TopicClientImpl.java @@ -250,6 +250,11 @@ public CompletableFuture alterTopic(String path, AlterTopicSettings sett alterConsumerBuilder.setSetSupportedCodecs(toProto(consumerSupportedCodecs)); } + Duration availabilityPeriod = alterConsumer.getAvailabilityPeriod(); + if (availabilityPeriod != null) { + alterConsumerBuilder.setSetAvailabilityPeriod(ProtobufUtils.durationToProto(availabilityPeriod)); + } + Map consumerAttributes = alterConsumer.getAlterAttributes(); if (!consumerAttributes.isEmpty()) { alterConsumerBuilder.putAllAlterAttributes(consumerAttributes); diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/ReadSession.java b/topic/src/main/java/tech/ydb/topic/read/impl/ReadSession.java index 27dd5f855..cdae4193f 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/ReadSession.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/ReadSession.java @@ -514,6 +514,8 @@ private static YdbTopic.StreamReadMessage.InitRequest buildInitRequest(ReaderSet List topics = settings.getTopics(); YdbTopic.StreamReadMessage.InitRequest.Builder builder = YdbTopic.StreamReadMessage.InitRequest.newBuilder(); + + builder.setPartitionMaxInFlightBytes(settings.getPartitionMaxInFlightBytes()); if (consumerName != null && !consumerName.isEmpty()) { builder.setConsumer(consumerName); } diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java index de89b1724..9645d2b7a 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java @@ -136,7 +136,6 @@ private class ReadSessionFactory { this.settings = settings; this.decompressor = decompressor; this.codecRegistry = codecRegistry; - } public ReadSession createNextSession() { diff --git a/topic/src/main/java/tech/ydb/topic/settings/AlterConsumerSettings.java b/topic/src/main/java/tech/ydb/topic/settings/AlterConsumerSettings.java index 3688844b1..81435e926 100644 --- a/topic/src/main/java/tech/ydb/topic/settings/AlterConsumerSettings.java +++ b/topic/src/main/java/tech/ydb/topic/settings/AlterConsumerSettings.java @@ -1,5 +1,6 @@ package tech.ydb.topic.settings; +import java.time.Duration; import java.time.Instant; import java.util.HashMap; import java.util.HashSet; @@ -9,6 +10,7 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; +import tech.ydb.topic.description.Consumer; import tech.ydb.topic.description.SupportedCodecs; /** @@ -22,6 +24,9 @@ public class AlterConsumerSettings { private final Instant readFrom; @Nullable private final SupportedCodecs supportedCodecs; + @Nullable + private final Duration availabilityPeriod; + private final Map alterAttributes; private final Set dropAttributes; @@ -32,6 +37,7 @@ private AlterConsumerSettings(Builder builder) { this.supportedCodecs = builder.supportedCodecs; this.alterAttributes = builder.alterAttributes; this.dropAttributes = builder.dropAttributes; + this.availabilityPeriod = builder.availabilityPeriod; } public static Builder newBuilder() { @@ -57,6 +63,11 @@ public SupportedCodecs getSupportedCodecs() { return supportedCodecs; } + @Nullable + public Duration getAvailabilityPeriod() { + return availabilityPeriod; + } + public Map getAlterAttributes() { return alterAttributes; } @@ -73,6 +84,7 @@ public static class Builder { private Boolean important = null; private Instant readFrom = null; private SupportedCodecs supportedCodecs = null; + private Duration availabilityPeriod = null; private Map alterAttributes = new HashMap<>(); private Set dropAttributes = new HashSet<>(); @@ -96,6 +108,20 @@ public Builder setImportant(boolean important) { return this; } + /** + * Configure availabilityPeriod for this consumer. + *
+ * Option availabilityPeriod is not compatible with important option + * + * @see Consumer#getAvailabilityPeriod() + * @param period - availability period value + * @return settings builder + */ + public Builder setAvailabilityPeriod(Duration period) { + this.availabilityPeriod = period; + return this; + } + /** * @param readFrom Time to read from. All messages with smaller server written_at timestamp will be skipped. * @return settings builder diff --git a/topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java b/topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java index 772d83702..175ef1667 100644 --- a/topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java +++ b/topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java @@ -10,6 +10,7 @@ import com.google.common.collect.ImmutableList; import tech.ydb.core.Status; +import tech.ydb.topic.read.events.DataReceivedEvent; /** * @author Nikolay Perfilov @@ -23,6 +24,7 @@ public class ReaderSettings { private final List topics; private final long maxMemoryUsageBytes; private final int maxBatchSize; + private final long partitionMaxInFlightBytes; private final Executor decompressionExecutor; private final BiConsumer errorsHandler; @@ -33,6 +35,7 @@ private ReaderSettings(Builder builder) { this.topics = ImmutableList.copyOf(builder.topics); this.maxMemoryUsageBytes = builder.maxMemoryUsageBytes; this.maxBatchSize = builder.maxBatchSize; + this.partitionMaxInFlightBytes = builder.partitionMaxInFlightBytes; this.decompressionExecutor = builder.decompressionExecutor; this.errorsHandler = builder.errorsHandler; } @@ -62,6 +65,10 @@ public long getMaxMemoryUsageBytes() { return maxMemoryUsageBytes; } + public long getPartitionMaxInFlightBytes() { + return partitionMaxInFlightBytes; + } + public int getMaxBatchSize() { return maxBatchSize; } @@ -84,6 +91,7 @@ public static class Builder { private String readerName = null; private List topics = new ArrayList<>(); private long maxMemoryUsageBytes = MAX_MEMORY_USAGE_BYTES_DEFAULT; + private long partitionMaxInFlightBytes = 0; private int maxBatchSize = 0; private Executor decompressionExecutor = null; private BiConsumer errorsHandler = null; @@ -118,6 +126,7 @@ public Builder withoutConsumer() { /** * Set reader name for debug purposes + * @param readerName custom reader name * @return settings builder */ public Builder setReaderName(String readerName) { @@ -140,11 +149,30 @@ public Builder setMaxMemoryUsageBytes(long maxMemoryUsageBytes) { return this; } + /** + * Set the maximum count of messages for the one {@link DataReceivedEvent }. Default value is {@code 0} that + * means no-limit mode, every next {@link DataReceivedEvent } will have all messages available to read + * + * @param maxBatchSize maximum count of messages in one event + * @return settings builder + */ public Builder setMaxBatchSize(int maxBatchSize) { this.maxBatchSize = maxBatchSize; return this; } + /** + * Configure limit for messages allowed to read to internal buffer from one partition. + * Default value is {@code 0} that turns off this limit. + * + * @param maxInFlightBytes maximum size of messages in bytes + * @return settings builder + */ + public Builder setPartitionMaxInFlightBytes(long maxInFlightBytes) { + this.partitionMaxInFlightBytes = maxInFlightBytes; + return this; + } + public Builder setErrorsHandler(BiConsumer handler) { this.errorsHandler = handler; return this; diff --git a/topic/src/test/java/tech/ydb/topic/impl/YdbTopicsIntegrationTest.java b/topic/src/test/java/tech/ydb/topic/impl/YdbTopicsIntegrationTest.java index 7f9daadba..3ff0583bf 100644 --- a/topic/src/test/java/tech/ydb/topic/impl/YdbTopicsIntegrationTest.java +++ b/topic/src/test/java/tech/ydb/topic/impl/YdbTopicsIntegrationTest.java @@ -1,6 +1,7 @@ package tech.ydb.topic.impl; import java.time.Duration; +import java.time.Instant; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -25,6 +26,7 @@ import tech.ydb.test.junit4.GrpcTransportRule; import tech.ydb.topic.TopicClient; import tech.ydb.topic.description.Consumer; +import tech.ydb.topic.description.ConsumerDescription; import tech.ydb.topic.description.PartitionInfo; import tech.ydb.topic.description.TopicDescription; import tech.ydb.topic.read.AsyncReader; @@ -33,6 +35,7 @@ import tech.ydb.topic.read.events.AbstractReadEventHandler; import tech.ydb.topic.read.events.DataReceivedEvent; import tech.ydb.topic.settings.AlterAutoPartitioningWriteStrategySettings; +import tech.ydb.topic.settings.AlterConsumerSettings; import tech.ydb.topic.settings.AlterPartitioningSettings; import tech.ydb.topic.settings.AlterTopicSettings; import tech.ydb.topic.settings.AutoPartitioningStrategy; @@ -334,10 +337,10 @@ public void step09_describeTopicStats() { for (PartitionInfo partition: withStats.getPartitions()) { Assert.assertNotNull(partition.getPartitionStats()); } - } + } @Test - public void step10_invalidConsumerTest() { + public void step10_invalidAddConsumerTest() { AlterTopicSettings settings = AlterTopicSettings.newBuilder() .addAddConsumer(Consumer.newBuilder() .setName("WRONG_CONSUMER") @@ -350,5 +353,41 @@ public void step10_invalidConsumerTest() { Status status = client.alterTopic(TEST_TOPIC, settings).join(); Assert.assertFalse("Alter must fail, but get status " + status, status.isSuccess()); Assert.assertEquals("Alter must fail, but get status " + status, StatusCode.BAD_REQUEST, status.getCode()); + } + + @Test + public void step11_invalidAlterConsumerTest() { + AlterTopicSettings settings = AlterTopicSettings.newBuilder() + .addAlterConsumer(AlterConsumerSettings.newBuilder() + .setName(TEST_CONSUMER2) + // important and availability_period are incompatible + .setImportant(true) + .setAvailabilityPeriod(Duration.ofMinutes(5)) + .build() + ).build(); + + Status status = client.alterTopic(TEST_TOPIC, settings).join(); + Assert.assertFalse("Alter must fail, but get status " + status, status.isSuccess()); + Assert.assertEquals("Alter must fail, but get status " + status, StatusCode.BAD_REQUEST, status.getCode()); + } + + @Test + public void step12_alterConsumerTest() { + AlterTopicSettings settings = AlterTopicSettings.newBuilder() + .addAlterConsumer(AlterConsumerSettings.newBuilder() + .setName(TEST_CONSUMER2) + .setReadFrom(Instant.EPOCH.plusSeconds(10)) + .setAvailabilityPeriod(Duration.ofMinutes(5)) + .build() + ).build(); + + Status status = client.alterTopic(TEST_TOPIC, settings).join(); + Assert.assertTrue("Alter must be OK, but got status " + status, status.isSuccess()); + + ConsumerDescription description = client.describeConsumer(TEST_TOPIC, TEST_CONSUMER2).join().getValue(); + + Assert.assertEquals(TEST_CONSUMER2, description.getConsumer().getName()); + Assert.assertEquals(Instant.EPOCH.plusSeconds(10), description.getConsumer().getReadFrom()); + Assert.assertEquals(Duration.ofMinutes(5), description.getConsumer().getAvailabilityPeriod()); } }