Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
* Query: Added support of ApacheArrow for query execution
* Tests: Updated testcontainers to 2.0.3

## 2.3.32 ##
* Topic: Added support of availabilityPeriod to AlterConsumerSettings
* Topic: Added partitionMaxInFlightBytes option to ReaderSettings
* Coordination: Fixed NPE on session stoping before connect

## 2.3.31 ##
* Topic: Added support of availabilityPeriod to topic consumer
* Topic: Added preferReady option to topic's control plane methods
Expand Down
2 changes: 1 addition & 1 deletion bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

<properties>
<ydb-auth-api.version>1.0.0</ydb-auth-api.version>
<ydb-proto-api.version>1.9.3</ydb-proto-api.version>
<ydb-proto-api.version>1.9.4</ydb-proto-api.version>
<yc-auth.version>2.3.1</yc-auth.version>
</properties>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ public CompletableFuture<Status> connect() {
final Stream stream = new Stream(rpc);
if (!updateState(local, makeConnectionState(local, stream))) {
logger.warn("{} cannot be connected with state {}", this, local.getState());
stream.cancelStream();
return CompletableFuture.completedFuture(Status.of(StatusCode.BAD_REQUEST));
}

Expand All @@ -120,8 +121,8 @@ public CompletableFuture<Status> connect() {
}

private CompletableFuture<Result<Long>> connectToSession(Stream stream, long sessionID) {
// start new stream
stream.startStream().whenCompleteAsync((status, th) -> {
// attach completion handler to the stream
stream.getFinishedFuture().whenCompleteAsync((status, th) -> {
// this handler is executed when stream finishes
// we have some action to do here

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
*
* @author Aleksandr Gorshenin
*/
class Stream implements GrpcReadWriteStream.Observer<SessionResponse> {
class Stream {
private static final int SHUTDOWN_TIMEOUT_MS = 1000;
private static final Logger logger = LoggerFactory.getLogger(Stream.class);

Expand All @@ -43,10 +43,7 @@ class Stream implements GrpcReadWriteStream.Observer<SessionResponse> {
this.stream = rpc.createSession(GrpcRequestSettings.newBuilder()
.disableDeadline()
.build());
}

public CompletableFuture<Status> startStream() {
stream.start(this).whenComplete((status, th) -> {
stream.start(this::onNext).whenComplete((status, th) -> {
if (th != null) {
startFuture.completeExceptionally(th);
stopFuture.completeExceptionally(th);
Expand All @@ -56,7 +53,9 @@ public CompletableFuture<Status> startStream() {
stopFuture.complete(status);
}
});
}

public CompletableFuture<Status> getFinishedFuture() {
return stopFuture;
}

Expand Down Expand Up @@ -93,7 +92,6 @@ public CompletableFuture<Status> stop() {
SessionRequest.SessionStop.newBuilder().build()
).build();


logger.trace("stream {} send session stop msg", hashCode());
stream.sendNext(stopMsg);

Expand Down Expand Up @@ -121,7 +119,6 @@ public void sendMsg(long requestId, StreamMsg<?> msg) {
}
}

@Override
public void onNext(SessionResponse resp) {
if (resp.hasFailure()) {
onFail(resp.getFailure());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,19 @@ public CompletableFuture<Status> start(Observer<SessionResponse> observer) {

@Override
public void sendNext(SessionRequest message) {
// Emulate GrpcReadWriteStream behaviour
if (observer == null) {
throw new NullPointerException("send message before start");
}
requests.offer(message);
}

@Override
public void close() {
// Emulate GrpcReadWriteStream behaviour
if (observer == null) {
throw new NullPointerException("close stream before start");
}
isClosed = true;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package tech.ydb.coordination.impl;

import java.util.concurrent.TimeUnit;

import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;

import tech.ydb.core.Issue;
import tech.ydb.core.Status;
import tech.ydb.core.StatusCode;
import tech.ydb.test.junit4.GrpcTransportRule;

/**
*
* @author Aleksandr Gorshenin
*/
public class StreamIntegrationTest {
@ClassRule
public static final GrpcTransportRule YDB_TRANSPORT = new GrpcTransportRule();

private static final Rpc RPC = new RpcImpl(YDB_TRANSPORT);

@Rule
public final Timeout testTimeoutRule = new Timeout(10, TimeUnit.SECONDS);

@Test
public void stopBeforeStartTest() {
Stream stream = new Stream(RPC);
Status stopped = stream.stop().join();

Assert.assertEquals(StatusCode.CLIENT_GRPC_ERROR, stopped.getCode());
Assert.assertEquals(1, stopped.getIssues().length);
Issue issue = stopped.getIssues()[0];
Assert.assertTrue(issue.getMessage().startsWith("gRPC error: (INVALID_ARGUMENT) on"));
Assert.assertTrue(issue.getMessage().endsWith("First message must be a SessionStart"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public void baseConnectTest() {
Assert.assertFalse(grpc.isCanceled());
Assert.assertFalse(grpc.hasNextRequest());

CompletableFuture<Status> finished = stream.startStream();
CompletableFuture<Status> finished = stream.getFinishedFuture();
CompletableFuture<Result<Long>> connected = stream.sendSessionStart(0, "demo", Duration.ZERO, ByteString.EMPTY);

Assert.assertFalse(grpc.isClosed());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ private void nextRequest(int count) {

@Override
public void onMessage(RespT message) {
try (Scope ignored = callSpan.makeCurrent()) {
try (@SuppressWarnings("unused") Scope ignored = callSpan.makeCurrent()) {
try {
if (logger.isTraceEnabled()) {
logger.trace("ReadStreamCall[{}] <-- {}", traceId, TextFormat.shortDebugString((Message) message));
Expand Down
2 changes: 2 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@
<arg>-Xlint:-processing</arg>
<!-- Silence warnning "source value 8 is obsolete and will be removed in a future release" -->
<arg>-Xlint:-options</arg>
<!-- Silence warnning "auto-closeable resource ignored is never referenced in body of corresponding try statement" -->
<arg>-Xlint:-try</arg>
</compilerArgs>
</configuration>
</plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ public void run() {
}

public void requestSession() {
try (Scope ignored = executeSpan.makeCurrent()) {
try (@SuppressWarnings("unused") Scope ignored = executeSpan.makeCurrent()) {
retrySpan = tracer.startSpan(EXECUTE_WITH_RETRY_SPAN_NAME, SpanKind.INTERNAL);
}
CompletableFuture<Result<QuerySession>> sessionFuture = createSessionWithRetrySpanParent();
Expand All @@ -201,10 +201,10 @@ private void acceptSession(@Nonnull Result<QuerySession> sessionResult) {

final QuerySession session = sessionResult.getValue();
try {
try (Scope ignored = retrySpan.makeCurrent()) {
try (@SuppressWarnings("unused") Scope ignored = retrySpan.makeCurrent()) {
fn.apply(session).whenComplete((fnResult, fnException) -> {
try {
try (Scope ignored1 = retrySpan.makeCurrent()) {
try (@SuppressWarnings("unused") Scope ignored1 = retrySpan.makeCurrent()) {
session.close();

if (fnException != null) {
Expand Down Expand Up @@ -282,7 +282,7 @@ private void handleException(@Nonnull Throwable ex) {
}

private CompletableFuture<Result<QuerySession>> createSessionWithRetrySpanParent() {
try (Scope ignored = retrySpan.makeCurrent()) {
try (@SuppressWarnings("unused") Scope ignored = retrySpan.makeCurrent()) {
return queryClient.createSession(sessionCreationTimeout);
}
}
Expand Down
8 changes: 4 additions & 4 deletions table/src/main/java/tech/ydb/table/SessionRetryContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ public void run() {
}

public void requestSession() {
try (Scope ignored = executeSpan.makeCurrent()) {
try (@SuppressWarnings("unused") Scope ignored = executeSpan.makeCurrent()) {
retrySpan = tracer.startSpan(EXECUTE_WITH_RETRY_SPAN_NAME, SpanKind.INTERNAL);
}
CompletableFuture<Result<Session>> sessionFuture = createSessionWithRetrySpanParent();
Expand All @@ -218,10 +218,10 @@ private void acceptSession(@Nonnull Result<Session> sessionResult) {

final Session session = sessionResult.getValue();
try {
try (Scope ignored = retrySpan.makeCurrent()) {
try (@SuppressWarnings("unused") Scope ignored = retrySpan.makeCurrent()) {
fn.apply(session).whenComplete((fnResult, fnException) -> {
try {
try (Scope ignored1 = retrySpan.makeCurrent()) {
try (@SuppressWarnings("unused") Scope ignored1 = retrySpan.makeCurrent()) {
session.close();

if (fnException != null) {
Expand Down Expand Up @@ -307,7 +307,7 @@ private void handleException(@Nonnull Throwable ex) {
}

private CompletableFuture<Result<Session>> createSessionWithRetrySpanParent() {
try (Scope ignored = retrySpan.makeCurrent()) {
try (@SuppressWarnings("unused") Scope ignored = retrySpan.makeCurrent()) {
return sessionSupplier.createSession(sessionCreationTimeout);
}
}
Expand Down
5 changes: 5 additions & 0 deletions topic/src/main/java/tech/ydb/topic/impl/TopicClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,11 @@ public CompletableFuture<Status> alterTopic(String path, AlterTopicSettings sett
alterConsumerBuilder.setSetSupportedCodecs(toProto(consumerSupportedCodecs));
}

Duration availabilityPeriod = alterConsumer.getAvailabilityPeriod();
if (availabilityPeriod != null) {
alterConsumerBuilder.setSetAvailabilityPeriod(ProtobufUtils.durationToProto(availabilityPeriod));
}

Map<String, String> consumerAttributes = alterConsumer.getAlterAttributes();
if (!consumerAttributes.isEmpty()) {
alterConsumerBuilder.putAllAlterAttributes(consumerAttributes);
Expand Down
2 changes: 2 additions & 0 deletions topic/src/main/java/tech/ydb/topic/read/impl/ReadSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,8 @@ private static YdbTopic.StreamReadMessage.InitRequest buildInitRequest(ReaderSet
List<TopicReadSettings> topics = settings.getTopics();

YdbTopic.StreamReadMessage.InitRequest.Builder builder = YdbTopic.StreamReadMessage.InitRequest.newBuilder();

builder.setPartitionMaxInFlightBytes(settings.getPartitionMaxInFlightBytes());
if (consumerName != null && !consumerName.isEmpty()) {
builder.setConsumer(consumerName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ private class ReadSessionFactory {
this.settings = settings;
this.decompressor = decompressor;
this.codecRegistry = codecRegistry;

}

public ReadSession createNextSession() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package tech.ydb.topic.settings;

import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -9,6 +10,7 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import tech.ydb.topic.description.Consumer;
import tech.ydb.topic.description.SupportedCodecs;

/**
Expand All @@ -22,6 +24,9 @@ public class AlterConsumerSettings {
private final Instant readFrom;
@Nullable
private final SupportedCodecs supportedCodecs;
@Nullable
private final Duration availabilityPeriod;

private final Map<String, String> alterAttributes;
private final Set<String> dropAttributes;

Expand All @@ -32,6 +37,7 @@ private AlterConsumerSettings(Builder builder) {
this.supportedCodecs = builder.supportedCodecs;
this.alterAttributes = builder.alterAttributes;
this.dropAttributes = builder.dropAttributes;
this.availabilityPeriod = builder.availabilityPeriod;
}

public static Builder newBuilder() {
Expand All @@ -57,6 +63,11 @@ public SupportedCodecs getSupportedCodecs() {
return supportedCodecs;
}

@Nullable
public Duration getAvailabilityPeriod() {
return availabilityPeriod;
}

public Map<String, String> getAlterAttributes() {
return alterAttributes;
}
Expand All @@ -73,6 +84,7 @@ public static class Builder {
private Boolean important = null;
private Instant readFrom = null;
private SupportedCodecs supportedCodecs = null;
private Duration availabilityPeriod = null;
private Map<String, String> alterAttributes = new HashMap<>();
private Set<String> dropAttributes = new HashSet<>();

Expand All @@ -96,6 +108,20 @@ public Builder setImportant(boolean important) {
return this;
}

/**
* Configure <code>availabilityPeriod</code> for this consumer.
* <br>
* Option <code>availabilityPeriod</code> is not compatible with <code>important</code> option
*
* @see Consumer#getAvailabilityPeriod()
* @param period - availability period value
* @return settings builder
*/
public Builder setAvailabilityPeriod(Duration period) {
this.availabilityPeriod = period;
return this;
}

/**
* @param readFrom Time to read from. All messages with smaller server written_at timestamp will be skipped.
* @return settings builder
Expand Down
Loading
Loading