Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,14 @@ public abstract class ReadPartitionSession {
private final Queue<Batch> readingQueue = new ConcurrentLinkedQueue<>();
private final AtomicBoolean isReadingNow = new AtomicBoolean();

ReadPartitionSession(String traceID, ReadSession session, PartitionSession partition, long readOffset,
long committedOffset) {
ReadPartitionSession(String traceID, ReadSession session, PartitionSession partition, long lastCommittedOffset) {
this.traceID = traceID;
this.session = session;
this.partition = partition;
this.maxBatchSize = session.getMaxBatchSize();
this.decoder = session.getMessageDecoder();
this.committer = new MessageCommitterImpl(this, committedOffset);
this.lastReadOffset = readOffset;
this.committer = new MessageCommitterImpl(this, lastCommittedOffset);
this.lastReadOffset = lastCommittedOffset;
}

@Override
Expand Down
16 changes: 8 additions & 8 deletions topic/src/main/java/tech/ydb/topic/read/impl/ReadSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -204,32 +204,32 @@ public void confirm(StartPartitionSessionSettings options) {
return;
}

long read = committed;
long commit = committed;
long readFrom = committed;
long commitTo = committed;
if (options != null) {
if (options.getReadOffset() != null) {
read = options.getReadOffset();
readFrom = options.getReadOffset();
}
if (options.getCommitOffset() != null) {
commit = options.getCommitOffset();
commitTo = options.getCommitOffset();
}
}

partSessions.put(psid, new ReadPartitionSession(traceID, ReadSession.this, partition, read, commit) {
partSessions.put(psid, new ReadPartitionSession(traceID, ReadSession.this, partition, commitTo) {
@Override
public CompletableFuture<Void> handleDataReceivedEvent(DataReceivedEvent event) {
return reader.handleDataReceivedEvent(event);
}
});

logger.info("[{}] Sending StartPartitionSessionResponse for {} and consumer \"{}\" with readOffset "
+ "{} and commitOffset {}", traceID, partition, consumerName, read, commit);
+ "{} and commitOffset {}", traceID, partition, consumerName, readFrom, commitTo);
send(YdbTopic.StreamReadMessage.FromClient.newBuilder()
.setStartPartitionSessionResponse(YdbTopic.StreamReadMessage.StartPartitionSessionResponse
.newBuilder()
.setPartitionSessionId(psid)
.setReadOffset(read)
.setCommitOffset(commit)
.setReadOffset(readFrom)
.setCommitOffset(commitTo)
.build()
).build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
Expand Down Expand Up @@ -251,7 +250,6 @@ public void onMessages(DataReceivedEvent event) {
}

@Test
@Ignore("Disabled: readRetentionedTopicTest is currently freezing when emulating topic retention; ")
public void readRetentionedTopicTest() throws Exception {
ReaderSettings readerSettings = ReaderSettings.newBuilder()
.addTopic(TopicReadSettings.newBuilder().setPath(TEST_TOPIC).build())
Expand Down
Loading