From 7bbfaa45e47fde1258ef8cada9ea0784e5258b25 Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Wed, 1 Apr 2026 11:42:34 +0100 Subject: [PATCH] Fixed commit hangling if commitOffset < readOffset --- .../topic/read/impl/ReadPartitionSession.java | 7 +++---- .../tech/ydb/topic/read/impl/ReadSession.java | 16 ++++++++-------- .../topic/impl/TopicReadersIntegrationTest.java | 2 -- 3 files changed, 11 insertions(+), 14 deletions(-) diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/ReadPartitionSession.java b/topic/src/main/java/tech/ydb/topic/read/impl/ReadPartitionSession.java index ab49983e5..76e7f94b6 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/ReadPartitionSession.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/ReadPartitionSession.java @@ -38,15 +38,14 @@ public abstract class ReadPartitionSession { private final Queue readingQueue = new ConcurrentLinkedQueue<>(); private final AtomicBoolean isReadingNow = new AtomicBoolean(); - ReadPartitionSession(String traceID, ReadSession session, PartitionSession partition, long readOffset, - long committedOffset) { + ReadPartitionSession(String traceID, ReadSession session, PartitionSession partition, long lastCommittedOffset) { this.traceID = traceID; this.session = session; this.partition = partition; this.maxBatchSize = session.getMaxBatchSize(); this.decoder = session.getMessageDecoder(); - this.committer = new MessageCommitterImpl(this, committedOffset); - this.lastReadOffset = readOffset; + this.committer = new MessageCommitterImpl(this, lastCommittedOffset); + this.lastReadOffset = lastCommittedOffset; } @Override diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/ReadSession.java b/topic/src/main/java/tech/ydb/topic/read/impl/ReadSession.java index cdae4193f..ce60f60b1 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/ReadSession.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/ReadSession.java @@ -204,18 +204,18 @@ public void confirm(StartPartitionSessionSettings options) { return; } - long read = committed; - long commit = committed; + long readFrom = committed; + long commitTo = committed; if (options != null) { if (options.getReadOffset() != null) { - read = options.getReadOffset(); + readFrom = options.getReadOffset(); } if (options.getCommitOffset() != null) { - commit = options.getCommitOffset(); + commitTo = options.getCommitOffset(); } } - partSessions.put(psid, new ReadPartitionSession(traceID, ReadSession.this, partition, read, commit) { + partSessions.put(psid, new ReadPartitionSession(traceID, ReadSession.this, partition, commitTo) { @Override public CompletableFuture handleDataReceivedEvent(DataReceivedEvent event) { return reader.handleDataReceivedEvent(event); @@ -223,13 +223,13 @@ public CompletableFuture handleDataReceivedEvent(DataReceivedEvent event) }); logger.info("[{}] Sending StartPartitionSessionResponse for {} and consumer \"{}\" with readOffset " - + "{} and commitOffset {}", traceID, partition, consumerName, read, commit); + + "{} and commitOffset {}", traceID, partition, consumerName, readFrom, commitTo); send(YdbTopic.StreamReadMessage.FromClient.newBuilder() .setStartPartitionSessionResponse(YdbTopic.StreamReadMessage.StartPartitionSessionResponse .newBuilder() .setPartitionSessionId(psid) - .setReadOffset(read) - .setCommitOffset(commit) + .setReadOffset(readFrom) + .setCommitOffset(commitTo) .build() ).build()); } diff --git a/topic/src/test/java/tech/ydb/topic/impl/TopicReadersIntegrationTest.java b/topic/src/test/java/tech/ydb/topic/impl/TopicReadersIntegrationTest.java index 564e7c69b..b0a6d3eff 100644 --- a/topic/src/test/java/tech/ydb/topic/impl/TopicReadersIntegrationTest.java +++ b/topic/src/test/java/tech/ydb/topic/impl/TopicReadersIntegrationTest.java @@ -17,7 +17,6 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.ClassRule; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.Timeout; @@ -251,7 +250,6 @@ public void onMessages(DataReceivedEvent event) { } @Test - @Ignore("Disabled: readRetentionedTopicTest is currently freezing when emulating topic retention; ") public void readRetentionedTopicTest() throws Exception { ReaderSettings readerSettings = ReaderSettings.newBuilder() .addTopic(TopicReadSettings.newBuilder().setPath(TEST_TOPIC).build())