From 1b25454a2db979cd10a8e52ffb3092899687a281 Mon Sep 17 00:00:00 2001 From: Evgeniy Kuvardin Date: Mon, 2 Mar 2026 21:21:56 +0300 Subject: [PATCH 1/8] Add test --- .../impl/TopicReaderEventOrderingTest.java | 493 ++++++++++++++++++ 1 file changed, 493 insertions(+) create mode 100644 topic/src/test/java/tech/ydb/topic/impl/TopicReaderEventOrderingTest.java diff --git a/topic/src/test/java/tech/ydb/topic/impl/TopicReaderEventOrderingTest.java b/topic/src/test/java/tech/ydb/topic/impl/TopicReaderEventOrderingTest.java new file mode 100644 index 000000000..46711f008 --- /dev/null +++ b/topic/src/test/java/tech/ydb/topic/impl/TopicReaderEventOrderingTest.java @@ -0,0 +1,493 @@ +package tech.ydb.topic.impl; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import tech.ydb.core.Status; +import tech.ydb.test.junit4.GrpcTransportRule; +import tech.ydb.topic.TopicClient; +import tech.ydb.topic.description.Consumer; +import tech.ydb.topic.read.AsyncReader; +import tech.ydb.topic.read.PartitionSession; +import tech.ydb.topic.read.events.ReadEventHandler; +import tech.ydb.topic.settings.CreateTopicSettings; +import tech.ydb.topic.settings.ReadEventHandlersSettings; +import tech.ydb.topic.settings.ReaderSettings; +import tech.ydb.topic.settings.TopicReadSettings; +import tech.ydb.topic.settings.WriterSettings; +import tech.ydb.topic.write.Message; +import tech.ydb.topic.write.SyncWriter; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Test to verify event ordering guarantees and session close race conditions in Topic API. + * + * This test checks for two related problems: + * 1. Event Ordering: StartPartitionSessionEvent and StopPartitionSessionEvent must be delivered in order, + * ensuring stop events are not processed before their corresponding start events. + * + * 2. Session Close Race Condition: Server reader sessions should not be closed before onPartitionSessionClosed + * and onReaderClosed callbacks complete execution. This prevents partitions from being reassigned to other + * readers before the original reader has finished cleaning up its resources. + * + * @author Generated Test + */ +public class TopicReaderEventOrderingTest { + private static final Logger logger = LoggerFactory.getLogger(TopicReaderEventOrderingTest.class); + + @ClassRule + public static final GrpcTransportRule ydbTransport = new GrpcTransportRule(); + + private static final String TEST_CONSUMER = "test-consumer"; + + private TopicClient client; + private String testTopic; + + @Before + public void setup() { + testTopic = "test-topic-" + UUID.randomUUID(); + logger.info("Creating test topic: {}", testTopic); + + client = TopicClient.newClient(ydbTransport).build(); + client.createTopic(testTopic, CreateTopicSettings.newBuilder() + .addConsumer(Consumer.newBuilder().setName(TEST_CONSUMER).build()) + .build() + ).join().expectSuccess("Failed to create test topic"); + } + + @After + public void tearDown() { + if (testTopic != null && client != null) { + logger.info("Dropping test topic: {}", testTopic); + Status dropStatus = client.dropTopic(testTopic).join(); + dropStatus.expectSuccess("Failed to drop test topic"); + } + if (client != null) { + client.close(); + } + } + + private void sendMessage(String data) throws Exception { + WriterSettings settings = WriterSettings.newBuilder() + .setTopicPath(testTopic) + .setProducerId("test-producer") + .build(); + + SyncWriter writer = client.createSyncWriter(settings); + writer.initAndWait(); + writer.send(Message.of(data.getBytes())); + writer.flush(); + } + + /** + * Test for event ordering: verifies that StopPartitionSessionEvent is never processed + * before its corresponding StartPartitionSessionEvent. + */ + @Test + public void testEventOrderingGuarantees() throws Exception { + logger.info("Starting testEventOrderingGuarantees"); + + // Track events for each partition session + List eventLog = Collections.synchronizedList(new ArrayList<>()); + CountDownLatch startReceived = new CountDownLatch(1); + CountDownLatch stopReceived = new CountDownLatch(1); + CountDownLatch closeReceived = new CountDownLatch(1); + AtomicBoolean orderingViolation = new AtomicBoolean(false); + + ExecutorService executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "test-event-executor")); + + ReaderSettings readerSettings = ReaderSettings.newBuilder() + .addTopic(TopicReadSettings.newBuilder() + .setPath(testTopic) + .build()) + .setConsumerName(TEST_CONSUMER) + .build(); + + AsyncReader reader = client.createAsyncReader(readerSettings, ReadEventHandlersSettings.newBuilder() + .setExecutor(executor) + .setEventHandler(new ReadEventHandler() { + private final AtomicReference activeSession = new AtomicReference<>(); + + @Override + public void onMessages(tech.ydb.topic.read.events.DataReceivedEvent event) { + eventLog.add("onMessages[session=" + event.getPartitionSession().getId() + "]"); + } + + @Override + public void onStartPartitionSession(tech.ydb.topic.read.events.StartPartitionSessionEvent event) { + long sessionId = event.getPartitionSession().getId(); + eventLog.add("onStartPartitionSession[session=" + sessionId + "]"); + logger.info("onStartPartitionSession: session={}", sessionId); + + if (activeSession.get() != null) { + logger.error("START event received while session {} is still active", activeSession.get()); + orderingViolation.set(true); + } + activeSession.set(sessionId); + event.confirm(); + startReceived.countDown(); + } + + @Override + public void onStopPartitionSession(tech.ydb.topic.read.events.StopPartitionSessionEvent event) { + long sessionId = event.getPartitionSessionId(); + eventLog.add("onStopPartitionSession[session=" + sessionId + "]"); + logger.info("onStopPartitionSession: session={}", sessionId); + + if (activeSession.get() == null) { + logger.error("STOP event received without corresponding START event"); + orderingViolation.set(true); + } else if (!activeSession.get().equals(sessionId)) { + logger.error("STOP event for session {} but active session is {}", + sessionId, activeSession.get()); + orderingViolation.set(true); + } + + event.confirm(); + stopReceived.countDown(); + } + + @Override + public void onPartitionSessionClosed(tech.ydb.topic.read.events.PartitionSessionClosedEvent event) { + long sessionId = event.getPartitionSession().getId(); + eventLog.add("onPartitionSessionClosed[session=" + sessionId + "]"); + logger.info("onPartitionSessionClosed: session={}", sessionId); + activeSession.set(null); + closeReceived.countDown(); + } + }) + .build() + ); + + reader.init().join(); + + // Send a message to trigger partition assignment + sendMessage("test-message"); + + // Wait for start event + assertTrue("Start event not received", startReceived.await(10, TimeUnit.SECONDS)); + + // Shutdown reader to trigger stop event + logger.info("Shutting down reader"); + reader.shutdown().get(10, TimeUnit.SECONDS); + + // assertTrue("Stop event not received", stopReceived.await(10, TimeUnit.SECONDS)); + assertTrue("Close event not received", closeReceived.await(10, TimeUnit.SECONDS)); + + executor.shutdownNow(); + + logger.info("Event log: {}", eventLog); + assertFalse("Event ordering violation detected", orderingViolation.get()); + + // Verify event sequence + int startIndex = -1; + int stopIndex = -1; + for (int i = 0; i < eventLog.size(); i++) { + if (eventLog.get(i).startsWith("onStartPartitionSession")) { + startIndex = i; + } + if (eventLog.get(i).startsWith("onPartitionSessionClosed")) { + stopIndex = i; + } + } + + assertTrue("Start event should be present", startIndex >= 0); + assertTrue("Close event should be present", stopIndex >= 0); + assertTrue("Start event must come before Stop event", startIndex < stopIndex); + } + + /** + * Test for session close race condition: verifies that partitions are not reassigned to other readers + * before the original reader completes its cleanup in onPartitionSessionClosed and onReaderClosed callbacks. + */ + @Test + public void testSessionCloseRaceCondition() throws Exception { + logger.info("Starting testSessionCloseRaceCondition"); + + // Shared state to track the race condition + AtomicReference reader1PartitionSession = new AtomicReference<>(); + AtomicBoolean reader1CleanupInProgress = new AtomicBoolean(false); + AtomicBoolean reader1CleanupCompleted = new AtomicBoolean(false); + AtomicBoolean raceConditionDetected = new AtomicBoolean(false); + CountDownLatch reader1Started = new CountDownLatch(1); + CountDownLatch reader1CleanupStarted = new CountDownLatch(1); + CountDownLatch reader2Started = new CountDownLatch(1); + CountDownLatch allowReader1ToFinish = new CountDownLatch(1); + + // Create two single-threaded executors to simulate the scenario + ExecutorService reader1Executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "reader-1-executor")); + ExecutorService reader2Executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "reader-2-executor")); + + ReaderSettings readerSettings = ReaderSettings.newBuilder() + .addTopic(TopicReadSettings.newBuilder() + .setPath(testTopic) + .build()) + .setConsumerName(TEST_CONSUMER) + .build(); + + // Create Reader-1 + AsyncReader reader1 = client.createAsyncReader(readerSettings, ReadEventHandlersSettings.newBuilder() + .setExecutor(reader1Executor) + .setEventHandler(new ReadEventHandler() { + @Override + public void onMessages(tech.ydb.topic.read.events.DataReceivedEvent event) { + // No-op + } + + @Override + public void onStartPartitionSession(tech.ydb.topic.read.events.StartPartitionSessionEvent event) { + PartitionSession session = event.getPartitionSession(); + logger.info("Reader-1: onStartPartitionSession - partition={}, session={}", + session.getPartitionId(), session.getId()); + reader1PartitionSession.set(session); + event.confirm(); + reader1Started.countDown(); + } + + @Override + public void onPartitionSessionClosed(tech.ydb.topic.read.events.PartitionSessionClosedEvent event) { + PartitionSession session = event.getPartitionSession(); + logger.info("Reader-1: onPartitionSessionClosed - partition={}, session={}", + session.getPartitionId(), session.getId()); + logger.info("Reader-1: before closing resources"); + + reader1CleanupInProgress.set(true); + reader1CleanupStarted.countDown(); + + // Simulate slow cleanup (e.g., closing database connections, flushing buffers) + try { + boolean finished = allowReader1ToFinish.await(5, TimeUnit.SECONDS); + if (!finished) { + logger.error("Reader-1: cleanup timeout"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error("Reader-1: cleanup interrupted", e); + } + + logger.info("Reader-1: after closing resources"); + reader1CleanupInProgress.set(false); + reader1CleanupCompleted.set(true); + } + + @Override + public void onReaderClosed(tech.ydb.topic.read.events.ReaderClosedEvent event) { + logger.info("Reader-1: onReaderClosed"); + } + }) + .build() + ); + + reader1.init().join(); + + // Send a message to trigger partition assignment to Reader-1 + sendMessage("test-message-1"); + + // Wait for Reader-1 to receive partition + assertTrue("Reader-1 did not receive partition", reader1Started.await(10, TimeUnit.SECONDS)); + assertNotNull("Reader-1 partition session is null", reader1PartitionSession.get()); + + Long assignedPartitionId = reader1PartitionSession.get().getPartitionId(); + logger.info("Reader-1 received partition: {}", assignedPartitionId); + + // Start shutdown of Reader-1 + logger.info("Before reader-1 shutdown"); + CompletableFuture reader1ShutdownFuture = reader1.shutdown(); + + // Wait for Reader-1 cleanup to start + assertTrue("Reader-1 cleanup did not start", reader1CleanupStarted.await(10, TimeUnit.SECONDS)); + logger.info("Reader-1 cleanup started"); + + // Create Reader-2 while Reader-1 is still cleaning up + AsyncReader reader2 = client.createAsyncReader(readerSettings, ReadEventHandlersSettings.newBuilder() + .setExecutor(reader2Executor) + .setEventHandler(new ReadEventHandler() { + @Override + public void onMessages(tech.ydb.topic.read.events.DataReceivedEvent event) { + // No-op + } + + @Override + public void onStartPartitionSession(tech.ydb.topic.read.events.StartPartitionSessionEvent event) { + PartitionSession session = event.getPartitionSession(); + logger.info("Reader-2: onStartPartitionSession - partition={}, session={}", + session.getPartitionId(), session.getId()); + + // Check if Reader-1 is still cleaning up + if (reader1CleanupInProgress.get()) { + logger.error("RACE CONDITION DETECTED: Reader-2 received partition {} while Reader-1 is still cleaning up", + session.getPartitionId()); + raceConditionDetected.set(true); + } + + if (!reader1CleanupCompleted.get()) { + logger.warn("Reader-2 received partition {} before Reader-1 completed cleanup", + session.getPartitionId()); + } + + event.confirm(); + reader2Started.countDown(); + } + }) + .build() + ); + + reader2.init().join(); + + // Give some time for Reader-2 to potentially receive the partition during Reader-1's cleanup + Thread.sleep(500); + + // Allow Reader-1 to finish cleanup + allowReader1ToFinish.countDown(); + + // Wait for Reader-1 shutdown to complete + reader1ShutdownFuture.get(10, TimeUnit.SECONDS); + logger.info("After reader-1 shutdown"); + + // Wait a bit more for partition reassignment to Reader-2 + boolean reader2GotPartition = reader2Started.await(15, TimeUnit.SECONDS); + + // Cleanup + reader2.shutdown().get(10, TimeUnit.SECONDS); + reader1Executor.shutdownNow(); + reader2Executor.shutdownNow(); + + // Assertions + assertFalse("Race condition detected: Reader-2 received partition while Reader-1 was still cleaning up", + raceConditionDetected.get()); + + if (reader2GotPartition) { + assertTrue("Reader-1 cleanup should be completed before Reader-2 receives the partition", + reader1CleanupCompleted.get()); + logger.info("Test passed: Reader-2 received partition only after Reader-1 completed cleanup"); + } else { + logger.warn("Reader-2 did not receive partition within timeout - test inconclusive"); + } + } + + /** + * Test for multiple rapid reader switches: verifies proper event ordering and cleanup + * when partitions are rapidly reassigned between multiple readers. + */ + @Test + public void testMultipleReaderRapidSwitching() throws Exception { + logger.info("Starting testMultipleReaderRapidSwitching"); + + AtomicInteger activeReaders = new AtomicInteger(0); + AtomicInteger maxConcurrentOwners = new AtomicInteger(0); + List eventSequence = Collections.synchronizedList(new ArrayList<>()); + AtomicBoolean concurrentOwnershipDetected = new AtomicBoolean(false); + + // Create 3 readers that will be started and stopped rapidly + for (int readerNum = 1; readerNum <= 3; readerNum++) { + final int readerIndex = readerNum; + ExecutorService executor = Executors.newSingleThreadExecutor( + r -> new Thread(r, "reader-" + readerIndex + "-executor")); + + ReaderSettings readerSettings = ReaderSettings.newBuilder() + .addTopic(TopicReadSettings.newBuilder() + .setPath(testTopic) + .build()) + .setConsumerName(TEST_CONSUMER) + .build(); + + AsyncReader reader = client.createAsyncReader(readerSettings, ReadEventHandlersSettings.newBuilder() + .setExecutor(executor) + .setEventHandler(new ReadEventHandler() { + @Override + public void onMessages(tech.ydb.topic.read.events.DataReceivedEvent event) { + // No-op + } + + @Override + public void onStartPartitionSession(tech.ydb.topic.read.events.StartPartitionSessionEvent event) { + String logMsg = String.format("Reader-%d: START partition %d, session %d", + readerIndex, event.getPartitionSession().getPartitionId(), + event.getPartitionSession().getId()); + eventSequence.add(logMsg); + logger.info(logMsg); + + int current = activeReaders.incrementAndGet(); + maxConcurrentOwners.updateAndGet(max -> Math.max(max, current)); + + if (current > 1) { + logger.error("Multiple readers simultaneously own partitions: {}", current); + concurrentOwnershipDetected.set(true); + } + + event.confirm(); + } + + @Override + public void onStopPartitionSession(tech.ydb.topic.read.events.StopPartitionSessionEvent event) { + String logMsg = String.format("Reader-%d: STOP partition %d, session %d", + readerIndex, event.getPartitionId(), event.getPartitionSessionId()); + eventSequence.add(logMsg); + logger.info(logMsg); + event.confirm(); + } + + @Override + public void onPartitionSessionClosed(tech.ydb.topic.read.events.PartitionSessionClosedEvent event) { + String logMsg = String.format("Reader-%d: CLOSED partition %d, session %d", + readerIndex, event.getPartitionSession().getPartitionId(), + event.getPartitionSession().getId()); + eventSequence.add(logMsg); + logger.info(logMsg); + + activeReaders.decrementAndGet(); + } + }) + .build() + ); + + reader.init().join(); + + // Send a message + if (readerIndex == 1) { + sendMessage("test-message-" + readerIndex); + } + + // Wait a bit + Thread.sleep(200); + + // Shutdown reader + logger.info("Shutting down reader-{}", readerIndex); + reader.shutdown().get(10, TimeUnit.SECONDS); + + executor.shutdownNow(); + + // Small delay between readers + Thread.sleep(100); + } + + logger.info("Event sequence: {}", eventSequence); + logger.info("Max concurrent owners: {}", maxConcurrentOwners.get()); + + assertFalse("Concurrent partition ownership detected", concurrentOwnershipDetected.get()); + assertEquals("Multiple readers should not own partitions concurrently", 1, maxConcurrentOwners.get()); + } +} From e046fd5554d483177de68f9a3056bb0898666ba4 Mon Sep 17 00:00:00 2001 From: Evgeniy Kuvardin Date: Mon, 2 Mar 2026 21:22:35 +0300 Subject: [PATCH 2/8] Change event ordering --- .../ydb/topic/read/impl/AsyncReaderImpl.java | 14 +++--- .../tech/ydb/topic/read/impl/ReaderImpl.java | 44 +++++++++++++------ .../ydb/topic/read/impl/SyncReaderImpl.java | 8 ++-- 3 files changed, 43 insertions(+), 23 deletions(-) diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java index cf048948a..2173b79ef 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java @@ -137,32 +137,32 @@ protected void handleStartPartitionSessionRequest(YdbTopic.StreamReadMessage.Sta } @Override - protected void handleStopPartitionSession(YdbTopic.StreamReadMessage.StopPartitionSessionRequest request, - PartitionSession partitionSession, Runnable confirmCallback) { + protected CompletableFuture handleStopPartitionSession(YdbTopic.StreamReadMessage.StopPartitionSessionRequest request, + PartitionSession partitionSession, Runnable confirmCallback) { final long committedOffset = request.getCommittedOffset(); final StopPartitionSessionEvent event = new StopPartitionSessionEventImpl(partitionSession, committedOffset, confirmCallback); - handlerExecutor.execute(() -> { + return CompletableFuture.runAsync(() -> { try { eventHandler.onStopPartitionSession(event); } catch (Throwable th) { logUserThrowableAndStopWorking(th, "onStopPartitionSession"); throw th; } - }); + }, handlerExecutor); } @Override - protected void handleClosePartitionSession(tech.ydb.topic.read.PartitionSession partitionSession) { + protected CompletableFuture handleClosePartitionSession(tech.ydb.topic.read.PartitionSession partitionSession) { final PartitionSessionClosedEvent event = new PartitionSessionClosedEventImpl(partitionSession); - handlerExecutor.execute(() -> { + return CompletableFuture.runAsync(() -> { try { eventHandler.onPartitionSessionClosed(event); } catch (Throwable th) { logUserThrowableAndStopWorking(th, "onPartitionSessionClosed"); throw th; } - }); + }, handlerExecutor); } protected CompletableFuture handleReaderClosed() { diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java index 88e9f2cda..ea86cea26 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java @@ -109,17 +109,22 @@ protected CompletableFuture initImpl() { } protected abstract CompletableFuture handleDataReceivedEvent(DataReceivedEvent event); + protected abstract void handleSessionStarted(String sessionId); + protected abstract void handleCommitResponse(long committedOffset, PartitionSession partitionSession); + protected abstract void handleStartPartitionSessionRequest( YdbTopic.StreamReadMessage.StartPartitionSessionRequest request, PartitionSession partitionSession, Consumer confirmCallback); - protected abstract void handleStopPartitionSession( + + protected abstract CompletableFuture handleStopPartitionSession( YdbTopic.StreamReadMessage.StopPartitionSessionRequest request, PartitionSession partitionSession, Runnable confirmCallback); - protected abstract void handleClosePartitionSession(PartitionSession partitionSession); + + protected abstract CompletableFuture handleClosePartitionSession(PartitionSession partitionSession); @Override protected void onStreamReconnect() { @@ -193,7 +198,7 @@ protected CompletableFuture sendUpdateOffsetsInTransaction(YdbTransactio topicOffsets.forEach(partitionOffsets -> { YdbTopic.UpdateOffsetsInTransactionRequest.TopicOffsets.PartitionOffsets.Builder partitionOffsetsBuilder = YdbTopic.UpdateOffsetsInTransactionRequest.TopicOffsets.PartitionOffsets.newBuilder() - .setPartitionId(partitionOffsets.getPartitionSession().getPartitionId()); + .setPartitionId(partitionOffsets.getPartitionSession().getPartitionId()); partitionOffsets.getOffsets().forEach(offsetsRange -> partitionOffsetsBuilder.addPartitionOffsets( YdbTopic.OffsetsRange.newBuilder() .setStart(offsetsRange.getStart()) @@ -219,6 +224,7 @@ class ReadSessionImpl extends ReadSession { private final AtomicLong sizeBytesToRequest = new AtomicLong(0); private final MessageDecoder decoder; private final Map partitionSessions = new ConcurrentHashMap<>(); + private ReadSessionImpl(Executor decompressionExecutor) { super(topicRpc, id + '.' + seqNumberCounter.incrementAndGet()); this.decoder = new MessageDecoder(settings.getMaxMemoryUsageBytes(), decompressionExecutor); @@ -277,7 +283,7 @@ private void sendReadRequest() { } private void sendStartPartitionSessionResponse(PartitionSessionImpl partitionSession, - StartPartitionSessionSettings startSettings) { + StartPartitionSessionSettings startSettings) { if (!isWorking.get()) { logger.info("[{}] Need to send StartPartitionSessionResponse, but reading session is already closed", partitionSession.getFullId()); @@ -335,7 +341,7 @@ private void sendStopPartitionSessionResponse(long partitionSessionId) { } private void sendCommitOffsetRequest(long partitionSessionId, long partitionId, - List rangesToCommit) { + List rangesToCommit) { if (!isWorking.get()) { if (logger.isInfoEnabled()) { StringBuilder message = new StringBuilder("[").append(streamId) @@ -355,8 +361,8 @@ private void sendCommitOffsetRequest(long partitionSessionId, long partitionId, } YdbTopic.StreamReadMessage.CommitOffsetRequest.PartitionCommitOffset.Builder builder = - YdbTopic.StreamReadMessage.CommitOffsetRequest.PartitionCommitOffset.newBuilder() - .setPartitionSessionId(partitionSessionId); + YdbTopic.StreamReadMessage.CommitOffsetRequest.PartitionCommitOffset.newBuilder() + .setPartitionSessionId(partitionSessionId); rangesToCommit.forEach(range -> { builder.addOffsets(YdbTopic.OffsetsRange.newBuilder() .setStart(range.getStart()) @@ -370,13 +376,25 @@ private void sendCommitOffsetRequest(long partitionSessionId, long partitionId, } private void closePartitionSessions() { - partitionSessions.values().forEach(this::closePartitionSession); + List> closeFutures = new ArrayList<>(); + partitionSessions.values().forEach(partitionSession -> + closeFutures.add(closePartitionSession(partitionSession)) + ); + // Wait for all partition session close callbacks to complete before proceeding + if (!closeFutures.isEmpty()) { + for (int i = 0; i < closeFutures.size(); i++) { + CompletableFuture z = closeFutures.get(i); + z.join(); + } + // CompletableFuture.allOf(closeFutures.toArray(new CompletableFuture[0])).join(); + } partitionSessions.clear(); } - private void closePartitionSession(PartitionSessionImpl partitionSession) { + + private CompletableFuture closePartitionSession(PartitionSessionImpl partitionSession) { partitionSession.shutdown(); - handleClosePartitionSession(partitionSession.getSessionId()); + return handleClosePartitionSession(partitionSession.getSessionId()); } private void onInitResponse(YdbTopic.StreamReadMessage.InitResponse response) { @@ -427,7 +445,7 @@ protected void onStopPartitionSessionRequest(YdbTopic.StreamReadMessage.StopPart if (partitionSession != null) { logger.info("[{}] Received graceful StopPartitionSessionRequest", partitionSession.getFullId()); handleStopPartitionSession(request, partitionSession.getSessionId(), - () -> sendStopPartitionSessionResponse(request.getPartitionSessionId())); + () -> sendStopPartitionSessionResponse(request.getPartitionSessionId())).join(); } else { logger.error("[{}] Received graceful StopPartitionSessionRequest for partition session {}, " + "but have no such partition session active", streamId, request.getPartitionSessionId()); @@ -440,7 +458,7 @@ protected void onStopPartitionSessionRequest(YdbTopic.StreamReadMessage.StopPart PartitionSessionImpl partitionSession = partitionSessions.remove(request.getPartitionSessionId()); if (partitionSession != null) { logger.info("[{}] Received force StopPartitionSessionRequest", partitionSession.getFullId()); - closePartitionSession(partitionSession); + closePartitionSession(partitionSession).join(); } else { logger.info("[{}] Received force StopPartitionSessionRequest for partition session {}, " + "but have no such partition session running", streamId, request.getPartitionSessionId()); @@ -462,7 +480,7 @@ private void onReadResponse(YdbTopic.StreamReadMessage.ReadResponse readResponse batchReadFutures.add(readFuture); } else { logger.info("[{}] Received PartitionData for unknown(most likely already closed) " + - "PartitionSessionId={}", streamId, partitionId); + "PartitionSessionId={}", streamId, partitionId); } }); CompletableFuture.allOf(batchReadFutures.toArray(new CompletableFuture[0])) diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/SyncReaderImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/SyncReaderImpl.java index 7c9a43ccf..f3faf2db1 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/SyncReaderImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/SyncReaderImpl.java @@ -187,14 +187,16 @@ protected void handleStartPartitionSessionRequest(YdbTopic.StreamReadMessage.Sta } @Override - protected void handleStopPartitionSession(YdbTopic.StreamReadMessage.StopPartitionSessionRequest request, - PartitionSession partitionSession, Runnable confirmCallback) { + protected CompletableFuture handleStopPartitionSession(YdbTopic.StreamReadMessage.StopPartitionSessionRequest request, + PartitionSession partitionSession, Runnable confirmCallback) { confirmCallback.run(); + return null; } @Override - protected void handleClosePartitionSession(PartitionSession partitionSession) { + protected CompletableFuture handleClosePartitionSession(PartitionSession partitionSession) { logger.debug("ClosePartitionSession event received. Ignoring."); + return null; } @Override From 5e7a9a73df853ef870c461abf5e583a04f6bcf01 Mon Sep 17 00:00:00 2001 From: Evgeniy Kuvardin Date: Tue, 3 Mar 2026 23:18:30 +0300 Subject: [PATCH 3/8] Add tests --- .../impl/TopicReaderEventOrderingTest.java | 325 ++++++++++-------- 1 file changed, 182 insertions(+), 143 deletions(-) diff --git a/topic/src/test/java/tech/ydb/topic/impl/TopicReaderEventOrderingTest.java b/topic/src/test/java/tech/ydb/topic/impl/TopicReaderEventOrderingTest.java index 46711f008..dfbf6c573 100644 --- a/topic/src/test/java/tech/ydb/topic/impl/TopicReaderEventOrderingTest.java +++ b/topic/src/test/java/tech/ydb/topic/impl/TopicReaderEventOrderingTest.java @@ -4,8 +4,10 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -29,6 +31,7 @@ import tech.ydb.topic.read.PartitionSession; import tech.ydb.topic.read.events.ReadEventHandler; import tech.ydb.topic.settings.CreateTopicSettings; +import tech.ydb.topic.settings.PartitioningSettings; import tech.ydb.topic.settings.ReadEventHandlersSettings; import tech.ydb.topic.settings.ReaderSettings; import tech.ydb.topic.settings.TopicReadSettings; @@ -63,6 +66,8 @@ public class TopicReaderEventOrderingTest { private static final String TEST_CONSUMER = "test-consumer"; + private static final int partitionCount = 2; + private TopicClient client; private String testTopic; @@ -74,6 +79,10 @@ public void setup() { client = TopicClient.newClient(ydbTransport).build(); client.createTopic(testTopic, CreateTopicSettings.newBuilder() .addConsumer(Consumer.newBuilder().setName(TEST_CONSUMER).build()) + .setPartitioningSettings(PartitioningSettings. + newBuilder() + .setMinActivePartitions(partitionCount) + .build()) .build() ).join().expectSuccess("Failed to create test topic"); } @@ -110,12 +119,24 @@ private void sendMessage(String data) throws Exception { public void testEventOrderingGuarantees() throws Exception { logger.info("Starting testEventOrderingGuarantees"); + // Per-partition tracking: partitionId -> whether Reader-1 has confirmed its stop. + // Using partitionId (not sessionId) as the key because Reader-2's new session + // for the same partition will have a different sessionId. + ConcurrentHashMap partitionStopConfirmed = new ConcurrentHashMap<>(); + // Track events for each partition session - List eventLog = Collections.synchronizedList(new ArrayList<>()); + Map> eventLog = new ConcurrentHashMap<>(); + for (long i = 0; i < partitionCount; i++) { + eventLog.put(i, Collections.synchronizedList(new ArrayList<>())); + } + + Map activeSessions = new ConcurrentHashMap<>(); + CountDownLatch startReceived = new CountDownLatch(1); CountDownLatch stopReceived = new CountDownLatch(1); CountDownLatch closeReceived = new CountDownLatch(1); AtomicBoolean orderingViolation = new AtomicBoolean(false); + AtomicBoolean raceConditionDetected = new AtomicBoolean(false); ExecutorService executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "test-event-executor")); @@ -129,53 +150,39 @@ public void testEventOrderingGuarantees() throws Exception { AsyncReader reader = client.createAsyncReader(readerSettings, ReadEventHandlersSettings.newBuilder() .setExecutor(executor) .setEventHandler(new ReadEventHandler() { - private final AtomicReference activeSession = new AtomicReference<>(); + @Override public void onMessages(tech.ydb.topic.read.events.DataReceivedEvent event) { - eventLog.add("onMessages[session=" + event.getPartitionSession().getId() + "]"); + long partitionId = event.getPartitionSession().getPartitionId(); + eventLog.get(partitionId).add("onMessages[session=" + event.getPartitionSession().getId() + "]"); } @Override public void onStartPartitionSession(tech.ydb.topic.read.events.StartPartitionSessionEvent event) { long sessionId = event.getPartitionSession().getId(); - eventLog.add("onStartPartitionSession[session=" + sessionId + "]"); + long partitionId = event.getPartitionSession().getPartitionId(); + eventLog.get(partitionId).add("onStartPartitionSession[partitionId = " + partitionId + ",session=" + sessionId + "]"); logger.info("onStartPartitionSession: session={}", sessionId); - if (activeSession.get() != null) { - logger.error("START event received while session {} is still active", activeSession.get()); + if (activeSessions.get(partitionId) != null) { + logger.error("START event received while session {} is still active", activeSessions.get(partitionId)); orderingViolation.set(true); } - activeSession.set(sessionId); + // Register this partition as "not yet stopped" before confirming. + partitionStopConfirmed.put(partitionId, new AtomicBoolean(false)); + activeSessions.put(partitionId, sessionId); event.confirm(); startReceived.countDown(); } - @Override - public void onStopPartitionSession(tech.ydb.topic.read.events.StopPartitionSessionEvent event) { - long sessionId = event.getPartitionSessionId(); - eventLog.add("onStopPartitionSession[session=" + sessionId + "]"); - logger.info("onStopPartitionSession: session={}", sessionId); - - if (activeSession.get() == null) { - logger.error("STOP event received without corresponding START event"); - orderingViolation.set(true); - } else if (!activeSession.get().equals(sessionId)) { - logger.error("STOP event for session {} but active session is {}", - sessionId, activeSession.get()); - orderingViolation.set(true); - } - - event.confirm(); - stopReceived.countDown(); - } - @Override public void onPartitionSessionClosed(tech.ydb.topic.read.events.PartitionSessionClosedEvent event) { long sessionId = event.getPartitionSession().getId(); - eventLog.add("onPartitionSessionClosed[session=" + sessionId + "]"); + long partitionId = event.getPartitionSession().getPartitionId(); + eventLog.get(partitionId).add("onPartitionSessionClosed[partitionId =" + partitionId + ",session=" + sessionId + "]"); logger.info("onPartitionSessionClosed: session={}", sessionId); - activeSession.set(null); + activeSessions.remove(partitionId); closeReceived.countDown(); } }) @@ -202,21 +209,156 @@ public void onPartitionSessionClosed(tech.ydb.topic.read.events.PartitionSession logger.info("Event log: {}", eventLog); assertFalse("Event ordering violation detected", orderingViolation.get()); - // Verify event sequence - int startIndex = -1; - int stopIndex = -1; - for (int i = 0; i < eventLog.size(); i++) { - if (eventLog.get(i).startsWith("onStartPartitionSession")) { - startIndex = i; - } - if (eventLog.get(i).startsWith("onPartitionSessionClosed")) { - stopIndex = i; + for (long partitionId = 0; partitionId < partitionCount; partitionId++) { + // Verify event sequence + int startIndex = -1; + int stopIndex = -1; + for (int i = 0; i < eventLog.size(); i++) { + if (eventLog.get(partitionId).get(i).startsWith("onStartPartitionSession")) { + startIndex = i; + } + if (eventLog.get(partitionId).get(i).startsWith("onPartitionSessionClosed")) { + stopIndex = i; + } } + + assertTrue("Start event should be present", startIndex >= 0); + assertTrue("Close event should be present", stopIndex >= 0); + assertTrue("Start event must come before Stop event", startIndex < stopIndex); } + } + + /** + * Test that verifies onStopPartitionSession is called when the server gracefully + * reassigns a partition to a second reader joining the same consumer group. + * + * A graceful StopPartitionSessionRequest is sent by the server when it needs to + * rebalance partition ownership between active readers. This is distinct from a + * force stop, which happens on client-side shutdown (reader.shutdown()) and only + * triggers onPartitionSessionClosed. + */ + @Test + public void testOnStopPartitionSessionCalledOnGracefulReassignment() throws Exception { + logger.info("Starting testOnStopPartitionSessionCalledOnGracefulReassignment"); + + // Per-partition tracking: partitionId -> whether Reader-1 has confirmed its stop. + // Using partitionId (not sessionId) as the key because Reader-2's new session + // for the same partition will have a different sessionId. + ConcurrentHashMap partitionStopConfirmed = new ConcurrentHashMap<>(); + + // With setMinActivePartitions(2) the topic has 2 partitions; Reader-1 should get both. + CountDownLatch reader1AllPartitionsAssigned = new CountDownLatch(2); + CountDownLatch reader1StopReceived = new CountDownLatch(1); + AtomicBoolean stopPartitionSessionCalled = new AtomicBoolean(false); + AtomicBoolean raceConditionDetected = new AtomicBoolean(false); + CountDownLatch reader2StartReceived = new CountDownLatch(1); + + ExecutorService executor1 = Executors.newSingleThreadExecutor(r -> new Thread(r, "reader-1-executor")); + ExecutorService executor2 = Executors.newSingleThreadExecutor(r -> new Thread(r, "reader-2-executor")); + + ReaderSettings readerSettings = ReaderSettings.newBuilder() + .addTopic(TopicReadSettings.newBuilder() + .setPath(testTopic) + .build()) + .setConsumerName(TEST_CONSUMER) + .build(); + + AsyncReader reader1 = client.createAsyncReader(readerSettings, ReadEventHandlersSettings.newBuilder() + .setExecutor(executor1) + .setEventHandler(new ReadEventHandler() { + @Override + public void onMessages(tech.ydb.topic.read.events.DataReceivedEvent event) { + } + + @Override + public void onStartPartitionSession(tech.ydb.topic.read.events.StartPartitionSessionEvent event) { + long partitionId = event.getPartitionSession().getPartitionId(); + logger.info("Reader-1: onStartPartitionSession partitionId={}", partitionId); + // Register this partition as "not yet stopped" before confirming. + partitionStopConfirmed.put(partitionId, new AtomicBoolean(false)); + event.confirm(); + reader1AllPartitionsAssigned.countDown(); + } + + @Override + public void onStopPartitionSession(tech.ydb.topic.read.events.StopPartitionSessionEvent event) { + long partitionId = event.getPartitionSession().getPartitionId(); + logger.info("Reader-1: onStopPartitionSession partitionId={}", partitionId); + stopPartitionSessionCalled.set(true); + // Mark stop confirmed BEFORE event.confirm(). The server only sends + // StartPartitionSession to Reader-2 after receiving this confirm, so + // Reader-2 is guaranteed to see the flag as true when it starts. + partitionStopConfirmed.get(partitionId).set(true); + event.confirm(); + reader1StopReceived.countDown(); + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + }) + .build() + ); + + reader1.init().join(); + + // Wait for Reader-1 to hold all partitions before Reader-2 joins, so that the + // server must perform a graceful reassignment rather than a direct assignment. + assertTrue("Reader-1 did not receive all partitions", + reader1AllPartitionsAssigned.await(15, TimeUnit.SECONDS)); + logger.info("Reader-1 holds {} partitions. Starting Reader-2 to trigger graceful rebalancing...", + partitionStopConfirmed.size()); + + // Reader-2 joins the same consumer group. The server rebalances by sending a + // graceful StopPartitionSessionRequest to Reader-1 and, after Reader-1 confirms, + // a StartPartitionSessionRequest to Reader-2. + AsyncReader reader2 = client.createAsyncReader(readerSettings, ReadEventHandlersSettings.newBuilder() + .setExecutor(executor2) + .setEventHandler(new ReadEventHandler() { + @Override + public void onMessages(tech.ydb.topic.read.events.DataReceivedEvent event) { + } + + @Override + public void onStartPartitionSession(tech.ydb.topic.read.events.StartPartitionSessionEvent event) { + long partitionId = event.getPartitionSession().getPartitionId(); + logger.info("Reader-2: onStartPartitionSession partitionId={}", partitionId); + + // If Reader-1 ever held this partition, its stop MUST have been + // confirmed before Reader-2 can receive a start. The flag was set + // before event.confirm() in Reader-1, which is what triggers this + // START message from the server, so any false here is a real race. + AtomicBoolean stopFlag = partitionStopConfirmed.get(partitionId); + if (stopFlag != null && !stopFlag.get()) { + logger.error("RACE CONDITION: Reader-2 started on partition {} before " + + "Reader-1 confirmed stop", partitionId); + raceConditionDetected.set(true); + } + + event.confirm(); + reader2StartReceived.countDown(); + } + }) + .build() + ); - assertTrue("Start event should be present", startIndex >= 0); - assertTrue("Close event should be present", stopIndex >= 0); - assertTrue("Start event must come before Stop event", startIndex < stopIndex); + reader2.init().join(); + + assertTrue("onStopPartitionSession was not called on Reader-1", + reader1StopReceived.await(15, TimeUnit.SECONDS)); + assertTrue("stopPartitionSessionCalled should be true", stopPartitionSessionCalled.get()); + + assertTrue("Reader-2 did not receive StartPartitionSession after graceful reassignment", + reader2StartReceived.await(10, TimeUnit.SECONDS)); + + assertFalse("Race condition: Reader-2 started on partition before Reader-1 confirmed stop", + raceConditionDetected.get()); + + reader1.shutdown().get(10, TimeUnit.SECONDS); + reader2.shutdown().get(10, TimeUnit.SECONDS); + executor1.shutdownNow(); + executor2.shutdownNow(); } /** @@ -387,107 +529,4 @@ public void onStartPartitionSession(tech.ydb.topic.read.events.StartPartitionSes logger.warn("Reader-2 did not receive partition within timeout - test inconclusive"); } } - - /** - * Test for multiple rapid reader switches: verifies proper event ordering and cleanup - * when partitions are rapidly reassigned between multiple readers. - */ - @Test - public void testMultipleReaderRapidSwitching() throws Exception { - logger.info("Starting testMultipleReaderRapidSwitching"); - - AtomicInteger activeReaders = new AtomicInteger(0); - AtomicInteger maxConcurrentOwners = new AtomicInteger(0); - List eventSequence = Collections.synchronizedList(new ArrayList<>()); - AtomicBoolean concurrentOwnershipDetected = new AtomicBoolean(false); - - // Create 3 readers that will be started and stopped rapidly - for (int readerNum = 1; readerNum <= 3; readerNum++) { - final int readerIndex = readerNum; - ExecutorService executor = Executors.newSingleThreadExecutor( - r -> new Thread(r, "reader-" + readerIndex + "-executor")); - - ReaderSettings readerSettings = ReaderSettings.newBuilder() - .addTopic(TopicReadSettings.newBuilder() - .setPath(testTopic) - .build()) - .setConsumerName(TEST_CONSUMER) - .build(); - - AsyncReader reader = client.createAsyncReader(readerSettings, ReadEventHandlersSettings.newBuilder() - .setExecutor(executor) - .setEventHandler(new ReadEventHandler() { - @Override - public void onMessages(tech.ydb.topic.read.events.DataReceivedEvent event) { - // No-op - } - - @Override - public void onStartPartitionSession(tech.ydb.topic.read.events.StartPartitionSessionEvent event) { - String logMsg = String.format("Reader-%d: START partition %d, session %d", - readerIndex, event.getPartitionSession().getPartitionId(), - event.getPartitionSession().getId()); - eventSequence.add(logMsg); - logger.info(logMsg); - - int current = activeReaders.incrementAndGet(); - maxConcurrentOwners.updateAndGet(max -> Math.max(max, current)); - - if (current > 1) { - logger.error("Multiple readers simultaneously own partitions: {}", current); - concurrentOwnershipDetected.set(true); - } - - event.confirm(); - } - - @Override - public void onStopPartitionSession(tech.ydb.topic.read.events.StopPartitionSessionEvent event) { - String logMsg = String.format("Reader-%d: STOP partition %d, session %d", - readerIndex, event.getPartitionId(), event.getPartitionSessionId()); - eventSequence.add(logMsg); - logger.info(logMsg); - event.confirm(); - } - - @Override - public void onPartitionSessionClosed(tech.ydb.topic.read.events.PartitionSessionClosedEvent event) { - String logMsg = String.format("Reader-%d: CLOSED partition %d, session %d", - readerIndex, event.getPartitionSession().getPartitionId(), - event.getPartitionSession().getId()); - eventSequence.add(logMsg); - logger.info(logMsg); - - activeReaders.decrementAndGet(); - } - }) - .build() - ); - - reader.init().join(); - - // Send a message - if (readerIndex == 1) { - sendMessage("test-message-" + readerIndex); - } - - // Wait a bit - Thread.sleep(200); - - // Shutdown reader - logger.info("Shutting down reader-{}", readerIndex); - reader.shutdown().get(10, TimeUnit.SECONDS); - - executor.shutdownNow(); - - // Small delay between readers - Thread.sleep(100); - } - - logger.info("Event sequence: {}", eventSequence); - logger.info("Max concurrent owners: {}", maxConcurrentOwners.get()); - - assertFalse("Concurrent partition ownership detected", concurrentOwnershipDetected.get()); - assertEquals("Multiple readers should not own partitions concurrently", 1, maxConcurrentOwners.get()); - } } From 742c441918be23556534936888b6d8a03386f0ab Mon Sep 17 00:00:00 2001 From: Evgeniy Kuvardin Date: Thu, 5 Mar 2026 21:51:39 +0300 Subject: [PATCH 4/8] Change test --- .../impl/TopicReaderEventOrderingTest.java | 226 +++++------------- 1 file changed, 54 insertions(+), 172 deletions(-) diff --git a/topic/src/test/java/tech/ydb/topic/impl/TopicReaderEventOrderingTest.java b/topic/src/test/java/tech/ydb/topic/impl/TopicReaderEventOrderingTest.java index dfbf6c573..50b64b0f1 100644 --- a/topic/src/test/java/tech/ydb/topic/impl/TopicReaderEventOrderingTest.java +++ b/topic/src/test/java/tech/ydb/topic/impl/TopicReaderEventOrderingTest.java @@ -3,6 +3,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; @@ -47,11 +48,11 @@ /** * Test to verify event ordering guarantees and session close race conditions in Topic API. - * + *

* This test checks for two related problems: * 1. Event Ordering: StartPartitionSessionEvent and StopPartitionSessionEvent must be delivered in order, * ensuring stop events are not processed before their corresponding start events. - * + *

* 2. Session Close Race Condition: Server reader sessions should not be closed before onPartitionSessionClosed * and onReaderClosed callbacks complete execution. This prevents partitions from being reassigned to other * readers before the original reader has finished cleaning up its resources. @@ -66,7 +67,7 @@ public class TopicReaderEventOrderingTest { private static final String TEST_CONSUMER = "test-consumer"; - private static final int partitionCount = 2; + private static final int partitionCount = 1; private TopicClient client; private String testTopic; @@ -119,11 +120,6 @@ private void sendMessage(String data) throws Exception { public void testEventOrderingGuarantees() throws Exception { logger.info("Starting testEventOrderingGuarantees"); - // Per-partition tracking: partitionId -> whether Reader-1 has confirmed its stop. - // Using partitionId (not sessionId) as the key because Reader-2's new session - // for the same partition will have a different sessionId. - ConcurrentHashMap partitionStopConfirmed = new ConcurrentHashMap<>(); - // Track events for each partition session Map> eventLog = new ConcurrentHashMap<>(); for (long i = 0; i < partitionCount; i++) { @@ -132,11 +128,9 @@ public void testEventOrderingGuarantees() throws Exception { Map activeSessions = new ConcurrentHashMap<>(); - CountDownLatch startReceived = new CountDownLatch(1); - CountDownLatch stopReceived = new CountDownLatch(1); - CountDownLatch closeReceived = new CountDownLatch(1); + CountDownLatch startReceived = new CountDownLatch(partitionCount); + CountDownLatch closeReceived = new CountDownLatch(partitionCount); AtomicBoolean orderingViolation = new AtomicBoolean(false); - AtomicBoolean raceConditionDetected = new AtomicBoolean(false); ExecutorService executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "test-event-executor")); @@ -151,7 +145,6 @@ public void testEventOrderingGuarantees() throws Exception { .setExecutor(executor) .setEventHandler(new ReadEventHandler() { - @Override public void onMessages(tech.ydb.topic.read.events.DataReceivedEvent event) { long partitionId = event.getPartitionSession().getPartitionId(); @@ -169,8 +162,7 @@ public void onStartPartitionSession(tech.ydb.topic.read.events.StartPartitionSes logger.error("START event received while session {} is still active", activeSessions.get(partitionId)); orderingViolation.set(true); } - // Register this partition as "not yet stopped" before confirming. - partitionStopConfirmed.put(partitionId, new AtomicBoolean(false)); + activeSessions.put(partitionId, sessionId); event.confirm(); startReceived.countDown(); @@ -213,11 +205,11 @@ public void onPartitionSessionClosed(tech.ydb.topic.read.events.PartitionSession // Verify event sequence int startIndex = -1; int stopIndex = -1; - for (int i = 0; i < eventLog.size(); i++) { - if (eventLog.get(partitionId).get(i).startsWith("onStartPartitionSession")) { + for (int i = 0; i < eventLog.get(partitionId).size(); i++) { + if (startIndex == - 1 && eventLog.get(partitionId).get(i).startsWith("onStartPartitionSession")) { startIndex = i; } - if (eventLog.get(partitionId).get(i).startsWith("onPartitionSessionClosed")) { + if (stopIndex == -1 && eventLog.get(partitionId).get(i).startsWith("onPartitionSessionClosed")) { stopIndex = i; } } @@ -228,139 +220,6 @@ public void onPartitionSessionClosed(tech.ydb.topic.read.events.PartitionSession } } - /** - * Test that verifies onStopPartitionSession is called when the server gracefully - * reassigns a partition to a second reader joining the same consumer group. - * - * A graceful StopPartitionSessionRequest is sent by the server when it needs to - * rebalance partition ownership between active readers. This is distinct from a - * force stop, which happens on client-side shutdown (reader.shutdown()) and only - * triggers onPartitionSessionClosed. - */ - @Test - public void testOnStopPartitionSessionCalledOnGracefulReassignment() throws Exception { - logger.info("Starting testOnStopPartitionSessionCalledOnGracefulReassignment"); - - // Per-partition tracking: partitionId -> whether Reader-1 has confirmed its stop. - // Using partitionId (not sessionId) as the key because Reader-2's new session - // for the same partition will have a different sessionId. - ConcurrentHashMap partitionStopConfirmed = new ConcurrentHashMap<>(); - - // With setMinActivePartitions(2) the topic has 2 partitions; Reader-1 should get both. - CountDownLatch reader1AllPartitionsAssigned = new CountDownLatch(2); - CountDownLatch reader1StopReceived = new CountDownLatch(1); - AtomicBoolean stopPartitionSessionCalled = new AtomicBoolean(false); - AtomicBoolean raceConditionDetected = new AtomicBoolean(false); - CountDownLatch reader2StartReceived = new CountDownLatch(1); - - ExecutorService executor1 = Executors.newSingleThreadExecutor(r -> new Thread(r, "reader-1-executor")); - ExecutorService executor2 = Executors.newSingleThreadExecutor(r -> new Thread(r, "reader-2-executor")); - - ReaderSettings readerSettings = ReaderSettings.newBuilder() - .addTopic(TopicReadSettings.newBuilder() - .setPath(testTopic) - .build()) - .setConsumerName(TEST_CONSUMER) - .build(); - - AsyncReader reader1 = client.createAsyncReader(readerSettings, ReadEventHandlersSettings.newBuilder() - .setExecutor(executor1) - .setEventHandler(new ReadEventHandler() { - @Override - public void onMessages(tech.ydb.topic.read.events.DataReceivedEvent event) { - } - - @Override - public void onStartPartitionSession(tech.ydb.topic.read.events.StartPartitionSessionEvent event) { - long partitionId = event.getPartitionSession().getPartitionId(); - logger.info("Reader-1: onStartPartitionSession partitionId={}", partitionId); - // Register this partition as "not yet stopped" before confirming. - partitionStopConfirmed.put(partitionId, new AtomicBoolean(false)); - event.confirm(); - reader1AllPartitionsAssigned.countDown(); - } - - @Override - public void onStopPartitionSession(tech.ydb.topic.read.events.StopPartitionSessionEvent event) { - long partitionId = event.getPartitionSession().getPartitionId(); - logger.info("Reader-1: onStopPartitionSession partitionId={}", partitionId); - stopPartitionSessionCalled.set(true); - // Mark stop confirmed BEFORE event.confirm(). The server only sends - // StartPartitionSession to Reader-2 after receiving this confirm, so - // Reader-2 is guaranteed to see the flag as true when it starts. - partitionStopConfirmed.get(partitionId).set(true); - event.confirm(); - reader1StopReceived.countDown(); - try { - Thread.sleep(3000); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - }) - .build() - ); - - reader1.init().join(); - - // Wait for Reader-1 to hold all partitions before Reader-2 joins, so that the - // server must perform a graceful reassignment rather than a direct assignment. - assertTrue("Reader-1 did not receive all partitions", - reader1AllPartitionsAssigned.await(15, TimeUnit.SECONDS)); - logger.info("Reader-1 holds {} partitions. Starting Reader-2 to trigger graceful rebalancing...", - partitionStopConfirmed.size()); - - // Reader-2 joins the same consumer group. The server rebalances by sending a - // graceful StopPartitionSessionRequest to Reader-1 and, after Reader-1 confirms, - // a StartPartitionSessionRequest to Reader-2. - AsyncReader reader2 = client.createAsyncReader(readerSettings, ReadEventHandlersSettings.newBuilder() - .setExecutor(executor2) - .setEventHandler(new ReadEventHandler() { - @Override - public void onMessages(tech.ydb.topic.read.events.DataReceivedEvent event) { - } - - @Override - public void onStartPartitionSession(tech.ydb.topic.read.events.StartPartitionSessionEvent event) { - long partitionId = event.getPartitionSession().getPartitionId(); - logger.info("Reader-2: onStartPartitionSession partitionId={}", partitionId); - - // If Reader-1 ever held this partition, its stop MUST have been - // confirmed before Reader-2 can receive a start. The flag was set - // before event.confirm() in Reader-1, which is what triggers this - // START message from the server, so any false here is a real race. - AtomicBoolean stopFlag = partitionStopConfirmed.get(partitionId); - if (stopFlag != null && !stopFlag.get()) { - logger.error("RACE CONDITION: Reader-2 started on partition {} before " + - "Reader-1 confirmed stop", partitionId); - raceConditionDetected.set(true); - } - - event.confirm(); - reader2StartReceived.countDown(); - } - }) - .build() - ); - - reader2.init().join(); - - assertTrue("onStopPartitionSession was not called on Reader-1", - reader1StopReceived.await(15, TimeUnit.SECONDS)); - assertTrue("stopPartitionSessionCalled should be true", stopPartitionSessionCalled.get()); - - assertTrue("Reader-2 did not receive StartPartitionSession after graceful reassignment", - reader2StartReceived.await(10, TimeUnit.SECONDS)); - - assertFalse("Race condition: Reader-2 started on partition before Reader-1 confirmed stop", - raceConditionDetected.get()); - - reader1.shutdown().get(10, TimeUnit.SECONDS); - reader2.shutdown().get(10, TimeUnit.SECONDS); - executor1.shutdownNow(); - executor2.shutdownNow(); - } - /** * Test for session close race condition: verifies that partitions are not reassigned to other readers * before the original reader completes its cleanup in onPartitionSessionClosed and onReaderClosed callbacks. @@ -368,15 +227,17 @@ public void onStartPartitionSession(tech.ydb.topic.read.events.StartPartitionSes @Test public void testSessionCloseRaceCondition() throws Exception { logger.info("Starting testSessionCloseRaceCondition"); - // Shared state to track the race condition - AtomicReference reader1PartitionSession = new AtomicReference<>(); - AtomicBoolean reader1CleanupInProgress = new AtomicBoolean(false); - AtomicBoolean reader1CleanupCompleted = new AtomicBoolean(false); + ConcurrentHashMap> reader1PartitionSession = new ConcurrentHashMap<>(); + ConcurrentHashMap reader1CleanupInProgress = new ConcurrentHashMap<>(); + ConcurrentHashMap reader1CleanupCompleted = new ConcurrentHashMap<>(); + for (long i = 0; i < partitionCount; i++) { + reader1CleanupCompleted.put(i, new AtomicBoolean(false)); + } AtomicBoolean raceConditionDetected = new AtomicBoolean(false); - CountDownLatch reader1Started = new CountDownLatch(1); - CountDownLatch reader1CleanupStarted = new CountDownLatch(1); - CountDownLatch reader2Started = new CountDownLatch(1); + CountDownLatch reader1Started = new CountDownLatch(partitionCount); + CountDownLatch reader1CleanupStarted = new CountDownLatch(partitionCount); + CountDownLatch reader2Started = new CountDownLatch(partitionCount); CountDownLatch allowReader1ToFinish = new CountDownLatch(1); // Create two single-threaded executors to simulate the scenario @@ -401,22 +262,38 @@ public void onMessages(tech.ydb.topic.read.events.DataReceivedEvent event) { @Override public void onStartPartitionSession(tech.ydb.topic.read.events.StartPartitionSessionEvent event) { + long partitionId = event.getPartitionSession().getPartitionId(); PartitionSession session = event.getPartitionSession(); logger.info("Reader-1: onStartPartitionSession - partition={}, session={}", session.getPartitionId(), session.getId()); - reader1PartitionSession.set(session); + reader1PartitionSession.compute(partitionId, (k, ref) -> { + if (ref == null) { + ref = new AtomicReference<>(); + } + ref.set(session); + return ref; + }); + event.confirm(); reader1Started.countDown(); } @Override public void onPartitionSessionClosed(tech.ydb.topic.read.events.PartitionSessionClosedEvent event) { + long partitionId = event.getPartitionSession().getPartitionId(); PartitionSession session = event.getPartitionSession(); logger.info("Reader-1: onPartitionSessionClosed - partition={}, session={}", session.getPartitionId(), session.getId()); logger.info("Reader-1: before closing resources"); - reader1CleanupInProgress.set(true); + reader1CleanupInProgress.compute(partitionId, (k, ref) -> { + if (ref == null) { + ref = new AtomicBoolean(); + } + ref.set(true); + return ref; + }); + reader1CleanupStarted.countDown(); // Simulate slow cleanup (e.g., closing database connections, flushing buffers) @@ -431,8 +308,10 @@ public void onPartitionSessionClosed(tech.ydb.topic.read.events.PartitionSession } logger.info("Reader-1: after closing resources"); - reader1CleanupInProgress.set(false); - reader1CleanupCompleted.set(true); + + reader1CleanupInProgress.get(partitionId).set(false); + reader1CleanupCompleted.get(partitionId).set(true); + } @Override @@ -450,17 +329,17 @@ public void onReaderClosed(tech.ydb.topic.read.events.ReaderClosedEvent event) { // Wait for Reader-1 to receive partition assertTrue("Reader-1 did not receive partition", reader1Started.await(10, TimeUnit.SECONDS)); - assertNotNull("Reader-1 partition session is null", reader1PartitionSession.get()); - - Long assignedPartitionId = reader1PartitionSession.get().getPartitionId(); - logger.info("Reader-1 received partition: {}", assignedPartitionId); + for (Map.Entry> v : reader1PartitionSession.entrySet()) { + assertNotNull("Reader-1 partition session is null", v.getValue().get()); + logger.info("Reader-1 received partition: {}", v.getKey()); + } // Start shutdown of Reader-1 logger.info("Before reader-1 shutdown"); CompletableFuture reader1ShutdownFuture = reader1.shutdown(); // Wait for Reader-1 cleanup to start - assertTrue("Reader-1 cleanup did not start", reader1CleanupStarted.await(10, TimeUnit.SECONDS)); + assertTrue("Reader-1 cleanup did not start", reader1CleanupStarted.await(60, TimeUnit.SECONDS)); logger.info("Reader-1 cleanup started"); // Create Reader-2 while Reader-1 is still cleaning up @@ -474,18 +353,19 @@ public void onMessages(tech.ydb.topic.read.events.DataReceivedEvent event) { @Override public void onStartPartitionSession(tech.ydb.topic.read.events.StartPartitionSessionEvent event) { + long partitionId = event.getPartitionSession().getPartitionId(); PartitionSession session = event.getPartitionSession(); logger.info("Reader-2: onStartPartitionSession - partition={}, session={}", session.getPartitionId(), session.getId()); // Check if Reader-1 is still cleaning up - if (reader1CleanupInProgress.get()) { + if (reader1CleanupInProgress.get(partitionId).get()) { logger.error("RACE CONDITION DETECTED: Reader-2 received partition {} while Reader-1 is still cleaning up", session.getPartitionId()); raceConditionDetected.set(true); } - if (!reader1CleanupCompleted.get()) { + if (!reader1CleanupCompleted.get(partitionId).get()) { logger.warn("Reader-2 received partition {} before Reader-1 completed cleanup", session.getPartitionId()); } @@ -522,9 +402,11 @@ public void onStartPartitionSession(tech.ydb.topic.read.events.StartPartitionSes raceConditionDetected.get()); if (reader2GotPartition) { - assertTrue("Reader-1 cleanup should be completed before Reader-2 receives the partition", - reader1CleanupCompleted.get()); - logger.info("Test passed: Reader-2 received partition only after Reader-1 completed cleanup"); + for (Map.Entry v : reader1CleanupCompleted.entrySet()) { + assertTrue("Reader-1 cleanup should be completed before Reader-2 receives the partition : " + v.getKey(), + v.getValue().get()); + logger.info("Test passed: Reader-2 received partition only after Reader-1 completed cleanup, partition {}", v.getKey()); + } } else { logger.warn("Reader-2 did not receive partition within timeout - test inconclusive"); } From 62e4150a04e3ee6d9bf76d728ab646f9fd425d3d Mon Sep 17 00:00:00 2001 From: Evgeniy Kuvardin Date: Thu, 5 Mar 2026 22:10:18 +0300 Subject: [PATCH 5/8] Remove future in partition stop --- .../ydb/topic/read/impl/AsyncReaderImpl.java | 8 ++++---- .../tech/ydb/topic/read/impl/ReaderImpl.java | 17 +++++------------ .../ydb/topic/read/impl/SyncReaderImpl.java | 7 +++---- .../impl/TopicReaderEventOrderingTest.java | 14 +++----------- 4 files changed, 15 insertions(+), 31 deletions(-) diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java index 2173b79ef..66705ffdd 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java @@ -137,19 +137,19 @@ protected void handleStartPartitionSessionRequest(YdbTopic.StreamReadMessage.Sta } @Override - protected CompletableFuture handleStopPartitionSession(YdbTopic.StreamReadMessage.StopPartitionSessionRequest request, - PartitionSession partitionSession, Runnable confirmCallback) { + protected void handleStopPartitionSession(YdbTopic.StreamReadMessage.StopPartitionSessionRequest request, + PartitionSession partitionSession, Runnable confirmCallback) { final long committedOffset = request.getCommittedOffset(); final StopPartitionSessionEvent event = new StopPartitionSessionEventImpl(partitionSession, committedOffset, confirmCallback); - return CompletableFuture.runAsync(() -> { + handlerExecutor.execute(() -> { try { eventHandler.onStopPartitionSession(event); } catch (Throwable th) { logUserThrowableAndStopWorking(th, "onStopPartitionSession"); throw th; } - }, handlerExecutor); + }); } @Override diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java index ea86cea26..7c671f538 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java @@ -109,17 +109,13 @@ protected CompletableFuture initImpl() { } protected abstract CompletableFuture handleDataReceivedEvent(DataReceivedEvent event); - protected abstract void handleSessionStarted(String sessionId); - protected abstract void handleCommitResponse(long committedOffset, PartitionSession partitionSession); - protected abstract void handleStartPartitionSessionRequest( YdbTopic.StreamReadMessage.StartPartitionSessionRequest request, PartitionSession partitionSession, Consumer confirmCallback); - - protected abstract CompletableFuture handleStopPartitionSession( + protected abstract void handleStopPartitionSession( YdbTopic.StreamReadMessage.StopPartitionSessionRequest request, PartitionSession partitionSession, Runnable confirmCallback); @@ -224,7 +220,6 @@ class ReadSessionImpl extends ReadSession { private final AtomicLong sizeBytesToRequest = new AtomicLong(0); private final MessageDecoder decoder; private final Map partitionSessions = new ConcurrentHashMap<>(); - private ReadSessionImpl(Executor decompressionExecutor) { super(topicRpc, id + '.' + seqNumberCounter.incrementAndGet()); this.decoder = new MessageDecoder(settings.getMaxMemoryUsageBytes(), decompressionExecutor); @@ -380,13 +375,11 @@ private void closePartitionSessions() { partitionSessions.values().forEach(partitionSession -> closeFutures.add(closePartitionSession(partitionSession)) ); - // Wait for all partition session close callbacks to complete before proceeding + if (!closeFutures.isEmpty()) { - for (int i = 0; i < closeFutures.size(); i++) { - CompletableFuture z = closeFutures.get(i); - z.join(); + for (CompletableFuture closeFuture : closeFutures) { + closeFuture.join(); } - // CompletableFuture.allOf(closeFutures.toArray(new CompletableFuture[0])).join(); } partitionSessions.clear(); } @@ -445,7 +438,7 @@ protected void onStopPartitionSessionRequest(YdbTopic.StreamReadMessage.StopPart if (partitionSession != null) { logger.info("[{}] Received graceful StopPartitionSessionRequest", partitionSession.getFullId()); handleStopPartitionSession(request, partitionSession.getSessionId(), - () -> sendStopPartitionSessionResponse(request.getPartitionSessionId())).join(); + () -> sendStopPartitionSessionResponse(request.getPartitionSessionId())); } else { logger.error("[{}] Received graceful StopPartitionSessionRequest for partition session {}, " + "but have no such partition session active", streamId, request.getPartitionSessionId()); diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/SyncReaderImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/SyncReaderImpl.java index f3faf2db1..0a5df08c4 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/SyncReaderImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/SyncReaderImpl.java @@ -187,16 +187,15 @@ protected void handleStartPartitionSessionRequest(YdbTopic.StreamReadMessage.Sta } @Override - protected CompletableFuture handleStopPartitionSession(YdbTopic.StreamReadMessage.StopPartitionSessionRequest request, - PartitionSession partitionSession, Runnable confirmCallback) { + protected void handleStopPartitionSession(YdbTopic.StreamReadMessage.StopPartitionSessionRequest request, + PartitionSession partitionSession, Runnable confirmCallback) { confirmCallback.run(); - return null; } @Override protected CompletableFuture handleClosePartitionSession(PartitionSession partitionSession) { logger.debug("ClosePartitionSession event received. Ignoring."); - return null; + return CompletableFuture.completedFuture(null); } @Override diff --git a/topic/src/test/java/tech/ydb/topic/impl/TopicReaderEventOrderingTest.java b/topic/src/test/java/tech/ydb/topic/impl/TopicReaderEventOrderingTest.java index 50b64b0f1..e18db4de7 100644 --- a/topic/src/test/java/tech/ydb/topic/impl/TopicReaderEventOrderingTest.java +++ b/topic/src/test/java/tech/ydb/topic/impl/TopicReaderEventOrderingTest.java @@ -1,9 +1,7 @@ package tech.ydb.topic.impl; -import java.time.Duration; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; @@ -14,7 +12,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.junit.After; @@ -40,11 +37,9 @@ import tech.ydb.topic.write.Message; import tech.ydb.topic.write.SyncWriter; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; /** * Test to verify event ordering guarantees and session close race conditions in Topic API. @@ -56,8 +51,6 @@ * 2. Session Close Race Condition: Server reader sessions should not be closed before onPartitionSessionClosed * and onReaderClosed callbacks complete execution. This prevents partitions from being reassigned to other * readers before the original reader has finished cleaning up its resources. - * - * @author Generated Test */ public class TopicReaderEventOrderingTest { private static final Logger logger = LoggerFactory.getLogger(TopicReaderEventOrderingTest.class); @@ -67,7 +60,7 @@ public class TopicReaderEventOrderingTest { private static final String TEST_CONSUMER = "test-consumer"; - private static final int partitionCount = 1; + private static final int partitionCount = 2; private TopicClient client; private String testTopic; @@ -100,7 +93,7 @@ public void tearDown() { } } - private void sendMessage(String data) throws Exception { + private void sendMessage(String data) { WriterSettings settings = WriterSettings.newBuilder() .setTopicPath(testTopic) .setProducerId("test-producer") @@ -206,7 +199,7 @@ public void onPartitionSessionClosed(tech.ydb.topic.read.events.PartitionSession int startIndex = -1; int stopIndex = -1; for (int i = 0; i < eventLog.get(partitionId).size(); i++) { - if (startIndex == - 1 && eventLog.get(partitionId).get(i).startsWith("onStartPartitionSession")) { + if (startIndex == -1 && eventLog.get(partitionId).get(i).startsWith("onStartPartitionSession")) { startIndex = i; } if (stopIndex == -1 && eventLog.get(partitionId).get(i).startsWith("onPartitionSessionClosed")) { @@ -257,7 +250,6 @@ public void testSessionCloseRaceCondition() throws Exception { .setEventHandler(new ReadEventHandler() { @Override public void onMessages(tech.ydb.topic.read.events.DataReceivedEvent event) { - // No-op } @Override From bc1d9884dc8616fa4a02d40532d55895bfb8c188 Mon Sep 17 00:00:00 2001 From: Evgeniy Kuvardin Date: Thu, 5 Mar 2026 22:24:08 +0300 Subject: [PATCH 6/8] checkstyle fix --- .../java/tech/ydb/topic/read/impl/ReaderImpl.java | 12 ++++++------ .../ydb/topic/impl/TopicReaderEventOrderingTest.java | 2 ++ 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java index 7c671f538..faaaaa15c 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java @@ -194,7 +194,7 @@ protected CompletableFuture sendUpdateOffsetsInTransaction(YdbTransactio topicOffsets.forEach(partitionOffsets -> { YdbTopic.UpdateOffsetsInTransactionRequest.TopicOffsets.PartitionOffsets.Builder partitionOffsetsBuilder = YdbTopic.UpdateOffsetsInTransactionRequest.TopicOffsets.PartitionOffsets.newBuilder() - .setPartitionId(partitionOffsets.getPartitionSession().getPartitionId()); + .setPartitionId(partitionOffsets.getPartitionSession().getPartitionId()); partitionOffsets.getOffsets().forEach(offsetsRange -> partitionOffsetsBuilder.addPartitionOffsets( YdbTopic.OffsetsRange.newBuilder() .setStart(offsetsRange.getStart()) @@ -278,7 +278,7 @@ private void sendReadRequest() { } private void sendStartPartitionSessionResponse(PartitionSessionImpl partitionSession, - StartPartitionSessionSettings startSettings) { + StartPartitionSessionSettings startSettings) { if (!isWorking.get()) { logger.info("[{}] Need to send StartPartitionSessionResponse, but reading session is already closed", partitionSession.getFullId()); @@ -336,7 +336,7 @@ private void sendStopPartitionSessionResponse(long partitionSessionId) { } private void sendCommitOffsetRequest(long partitionSessionId, long partitionId, - List rangesToCommit) { + List rangesToCommit) { if (!isWorking.get()) { if (logger.isInfoEnabled()) { StringBuilder message = new StringBuilder("[").append(streamId) @@ -356,8 +356,8 @@ private void sendCommitOffsetRequest(long partitionSessionId, long partitionId, } YdbTopic.StreamReadMessage.CommitOffsetRequest.PartitionCommitOffset.Builder builder = - YdbTopic.StreamReadMessage.CommitOffsetRequest.PartitionCommitOffset.newBuilder() - .setPartitionSessionId(partitionSessionId); + YdbTopic.StreamReadMessage.CommitOffsetRequest.PartitionCommitOffset.newBuilder() + .setPartitionSessionId(partitionSessionId); rangesToCommit.forEach(range -> { builder.addOffsets(YdbTopic.OffsetsRange.newBuilder() .setStart(range.getStart()) @@ -473,7 +473,7 @@ private void onReadResponse(YdbTopic.StreamReadMessage.ReadResponse readResponse batchReadFutures.add(readFuture); } else { logger.info("[{}] Received PartitionData for unknown(most likely already closed) " + - "PartitionSessionId={}", streamId, partitionId); + "PartitionSessionId={}", streamId, partitionId); } }); CompletableFuture.allOf(batchReadFutures.toArray(new CompletableFuture[0])) diff --git a/topic/src/test/java/tech/ydb/topic/impl/TopicReaderEventOrderingTest.java b/topic/src/test/java/tech/ydb/topic/impl/TopicReaderEventOrderingTest.java index e18db4de7..c4322bd58 100644 --- a/topic/src/test/java/tech/ydb/topic/impl/TopicReaderEventOrderingTest.java +++ b/topic/src/test/java/tech/ydb/topic/impl/TopicReaderEventOrderingTest.java @@ -51,6 +51,8 @@ * 2. Session Close Race Condition: Server reader sessions should not be closed before onPartitionSessionClosed * and onReaderClosed callbacks complete execution. This prevents partitions from being reassigned to other * readers before the original reader has finished cleaning up its resources. + * + * @author Evgeny Kuvardin */ public class TopicReaderEventOrderingTest { private static final Logger logger = LoggerFactory.getLogger(TopicReaderEventOrderingTest.class); From cf579b941ae6ccd0cd45e185b1b4a3a1c131f514 Mon Sep 17 00:00:00 2001 From: Evgeniy Kuvardin Date: Fri, 6 Mar 2026 21:50:52 +0300 Subject: [PATCH 7/8] Make test more elegant --- .../ydb/topic/read/impl/AsyncReaderImpl.java | 2 +- .../impl/TopicReaderEventOrderingTest.java | 449 +++++++++++------- 2 files changed, 290 insertions(+), 161 deletions(-) diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java index 66705ffdd..ae2fc8ff5 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java @@ -153,7 +153,7 @@ protected void handleStopPartitionSession(YdbTopic.StreamReadMessage.StopPartiti } @Override - protected CompletableFuture handleClosePartitionSession(tech.ydb.topic.read.PartitionSession partitionSession) { + protected CompletableFuture handleClosePartitionSession(PartitionSession partitionSession) { final PartitionSessionClosedEvent event = new PartitionSessionClosedEventImpl(partitionSession); return CompletableFuture.runAsync(() -> { try { diff --git a/topic/src/test/java/tech/ydb/topic/impl/TopicReaderEventOrderingTest.java b/topic/src/test/java/tech/ydb/topic/impl/TopicReaderEventOrderingTest.java index c4322bd58..d6e8a5054 100644 --- a/topic/src/test/java/tech/ydb/topic/impl/TopicReaderEventOrderingTest.java +++ b/topic/src/test/java/tech/ydb/topic/impl/TopicReaderEventOrderingTest.java @@ -14,6 +14,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import org.jetbrains.annotations.NotNull; import org.junit.After; import org.junit.Before; import org.junit.ClassRule; @@ -62,6 +63,9 @@ public class TopicReaderEventOrderingTest { private static final String TEST_CONSUMER = "test-consumer"; + // Be careful to increment partition count! + // All single threads are stuck for 5 seconds + // Also should increase value to wait reader2GotPartition private static final int partitionCount = 2; private TopicClient client; @@ -108,14 +112,233 @@ private void sendMessage(String data) { } /** - * Test for event ordering: verifies that StopPartitionSessionEvent is never processed - * before its corresponding StartPartitionSessionEvent. + * Scenario: + * Verify that StartPartitionSessionEvent is always delivered before + * PartitionSessionClosedEvent for the same partition session. + *

+ * The test ensures that event ordering is preserved and that the client + * never receives a "close session" event before the corresponding "start session". + *

+ * Test steps: + *

+ * 1. Create structures to track events per partition. + * 2. Create AsyncReader with event handlers that log start/close events. + * 3. Initialize reader. + * 4. Send a message to trigger partition assignment. + * 5. Wait until start events are received + * 6. Shutdown reader to trigger close events + * 7. Wait until close events are received. + * 8. Verify that no ordering violation occurred. + * 9. Verify Start event occurs before Close event */ @Test public void testEventOrderingGuarantees() throws Exception { logger.info("Starting testEventOrderingGuarantees"); - // Track events for each partition session + // Step 1: Create structures to track events per partition. + StructureTest1 structureTest = getStructureForOrderGarantees(); + + // Step 2: Create AsyncReader with event handlers that log start/close events. + AsyncReader reader = getAsyncReaderForOrderGaran(structureTest.readerSettings, structureTest.executor, structureTest.eventLog, structureTest.activeSessions, structureTest.orderingViolation, structureTest.startReceived, structureTest.closeReceived); + + // Step 3: Initialize reader + reader.init().join(); + + // Step 4: Send message to trigger partition assignment + sendMessage("test-message"); + + // Step 5: Wait until start events are received + assertTrue("Start event not received", structureTest.startReceived.await(10, TimeUnit.SECONDS)); + + // Step 6: Shutdown reader to trigger close events + logger.info("Shutting down reader"); + reader.shutdown().get(10, TimeUnit.SECONDS); + + // Step 7: Wait for close events + assertTrue("Close event not received", structureTest.closeReceived.await(10, TimeUnit.SECONDS)); + + structureTest.executor.shutdownNow(); + + logger.info("Event log: {}", structureTest.eventLog); + // Step 8: Verify no ordering violations occurred + assertFalse("Event ordering violation detected", structureTest.orderingViolation.get()); + + // Step 9: Verify Start event occurs before Close event + for (long partitionId = 0; partitionId < partitionCount; partitionId++) { + // Verify event sequence + int startIndex = -1; + int stopIndex = -1; + for (int i = 0; i < structureTest.eventLog.get(partitionId).size(); i++) { + if (startIndex == -1 && structureTest.eventLog.get(partitionId).get(i).startsWith("onStartPartitionSession")) { + startIndex = i; + } + if (stopIndex == -1 && structureTest.eventLog.get(partitionId).get(i).startsWith("onPartitionSessionClosed")) { + stopIndex = i; + } + } + + assertTrue("Start event should be present", startIndex >= 0); + assertTrue("Close event should be present", stopIndex >= 0); + assertTrue("Start event must come before Stop event", startIndex < stopIndex); + } + } + + /** + * Scenario: + * Verify that partition reassignment does not happen while the previous reader + * is still executing cleanup logic inside onPartitionSessionClosed. + *

+ * This test simulates a slow cleanup in Reader-1 and starts Reader-2 while + * Reader-1 is still closing the session. + *

+ * Steps: + *

+ * 1. Start Reader-1 and wait until it receives partitions. + * 2. Send a message to trigger partition assignment. + * 3. Shutdown Reader-1 to trigger session close. + * 4. Block Reader-1 cleanup to simulate slow resource release. + * 5. Start Reader-2 while Reader-1 cleanup is still in progress. + * 6. Allow Reader-1 cleanup to finish. + * 7. Wait for partition reassignment to Reader-2. + * 8. Verify that reassignment only happened after Reader-1 cleanup finished. + */ + @Test + public void testSessionCloseRaceCondition() throws Exception { + logger.info("Starting testSessionCloseRaceCondition"); + + StructureTest2 structureTest = getStructureForRaceCondition(); + + // Create Reader-1 + AsyncReader reader1 = getAsyncReader1ForRaceCondition(structureTest.readerSettings, structureTest.reader1Executor, structureTest.reader1PartitionSession, structureTest.reader1Started, structureTest.reader1CleanupInProgress, structureTest.reader1CleanupStarted, structureTest.allowReader1ToFinish, structureTest.reader1CleanupCompleted); + + // Step 1. Start Reader-1 and wait until it receives partitions. + reader1.init().join(); + + // Step 2. Send a message to trigger partition assignment. + sendMessage("test-message-1"); + + // Wait for Reader-1 to receive partition + assertTrue("Reader-1 did not receive partition", structureTest.reader1Started.await(10, TimeUnit.SECONDS)); + for (Map.Entry> v : structureTest.reader1PartitionSession.entrySet()) { + assertNotNull("Reader-1 partition session is null", v.getValue().get()); + logger.info("Reader-1 received partition: {}", v.getKey()); + } + + // Step 3.Reader-1 to trigger session close. + logger.info("Before reader-1 shutdown"); + CompletableFuture reader1ShutdownFuture = reader1.shutdown(); + + // Wait for Reader-1 cleanup to start + assertTrue("Reader-1 cleanup did not start", structureTest.reader1CleanupStarted.await(15, TimeUnit.SECONDS)); + logger.info("Reader-1 cleanup started"); + + // Create Reader-2 while Reader-1 is still cleaning up + AsyncReader reader2 = getAsyncReader2ForRaceCondition(structureTest.readerSettings, structureTest.reader2Executor, structureTest.reader1CleanupInProgress, structureTest.raceConditionDetected, structureTest.reader1CleanupCompleted, structureTest.reader2Started); + + // Step 5. Start Reader-2 while Reader-1 cleanup is still in progress. + reader2.init().join(); + + // Give some time for Reader-2 to potentially receive the partition during Reader-1's cleanup + Thread.sleep(500); + + // Step 6. Allow Reader-1 cleanup to finish. + structureTest.allowReader1ToFinish.countDown(); + + // Step 7. Wait for partition reassignment to Reader-2. + reader1ShutdownFuture.get(10, TimeUnit.SECONDS); + logger.info("After reader-1 shutdown"); + + // Wait a bit more for partition reassignment to Reader-2 + boolean reader2GotPartition = structureTest.reader2Started.await(15, TimeUnit.SECONDS); + + // Cleanup + reader2.shutdown().get(10, TimeUnit.SECONDS); + structureTest.reader1Executor.shutdownNow(); + structureTest.reader2Executor.shutdownNow(); + + // Step 8. Verify that reassignment only happened after Reader-1 cleanup finished. + assertFalse("Race condition detected: Reader-2 received partition while Reader-1 was still cleaning up", + structureTest.raceConditionDetected.get()); + + if (reader2GotPartition) { + for (Map.Entry v : structureTest.reader1CleanupCompleted.entrySet()) { + assertTrue("Reader-1 cleanup should be completed before Reader-2 receives the partition : " + v.getKey(), + v.getValue().get()); + logger.info("Test passed: Reader-2 received partition only after Reader-1 completed cleanup, partition {}", v.getKey()); + } + } else { + logger.warn("Reader-2 did not receive partition within timeout - test inconclusive"); + } + } + + private @NotNull TopicReaderEventOrderingTest.StructureTest2 getStructureForRaceCondition() { + // Map for tracking partition and attached sessions + ConcurrentHashMap> reader1PartitionSession = new ConcurrentHashMap<>(); + + // Map for tracking partition and is reader1 in cleanup. false -> reader 1 read partition is in progress + // true -> reader 1 read partition is detached from partition + ConcurrentHashMap reader1CleanupInProgress = new ConcurrentHashMap<>(); + + // Map for tracking partition and is reader1 in cleanup. false -> reader1 not started read partition or cleanUp wasn't completed + // true -> reader1 completed cleanup + ConcurrentHashMap reader1CleanupCompleted = new ConcurrentHashMap<>(); + for (long i = 0; i < partitionCount; i++) { + reader1CleanupCompleted.put(i, new AtomicBoolean(false)); + } + + // Simple value to detect race condition + AtomicBoolean raceConditionDetected = new AtomicBoolean(false); + CountDownLatch reader1Started = new CountDownLatch(partitionCount); + CountDownLatch reader1CleanupStarted = new CountDownLatch(partitionCount); + CountDownLatch reader2Started = new CountDownLatch(partitionCount); + + // Some latch in which reader1 stuck for 1 minute. Be careful to increment partition count! + // All single threads are stuck for 5 seconds + CountDownLatch allowReader1ToFinish = new CountDownLatch(1); + + // Create two single-threaded executors to simulate the scenario + ExecutorService reader1Executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "reader-1-executor")); + ExecutorService reader2Executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "reader-2-executor")); + + ReaderSettings readerSettings = ReaderSettings.newBuilder() + .addTopic(TopicReadSettings.newBuilder() + .setPath(testTopic) + .build()) + .setConsumerName(TEST_CONSUMER) + .build(); + StructureTest2 structureTest2 = new StructureTest2(reader1PartitionSession, reader1CleanupInProgress, reader1CleanupCompleted, raceConditionDetected, reader1Started, reader1CleanupStarted, reader2Started, allowReader1ToFinish, reader1Executor, reader2Executor, readerSettings); + return structureTest2; + } + + private static class StructureTest2 { + public final ConcurrentHashMap> reader1PartitionSession; + public final ConcurrentHashMap reader1CleanupInProgress; + public final ConcurrentHashMap reader1CleanupCompleted; + public final AtomicBoolean raceConditionDetected; + public final CountDownLatch reader1Started; + public final CountDownLatch reader1CleanupStarted; + public final CountDownLatch reader2Started; + public final CountDownLatch allowReader1ToFinish; + public final ExecutorService reader1Executor; + public final ExecutorService reader2Executor; + public final ReaderSettings readerSettings; + + public StructureTest2(ConcurrentHashMap> reader1PartitionSession, ConcurrentHashMap reader1CleanupInProgress, ConcurrentHashMap reader1CleanupCompleted, AtomicBoolean raceConditionDetected, CountDownLatch reader1Started, CountDownLatch reader1CleanupStarted, CountDownLatch reader2Started, CountDownLatch allowReader1ToFinish, ExecutorService reader1Executor, ExecutorService reader2Executor, ReaderSettings readerSettings) { + this.reader1PartitionSession = reader1PartitionSession; + this.reader1CleanupInProgress = reader1CleanupInProgress; + this.reader1CleanupCompleted = reader1CleanupCompleted; + this.raceConditionDetected = raceConditionDetected; + this.reader1Started = reader1Started; + this.reader1CleanupStarted = reader1CleanupStarted; + this.reader2Started = reader2Started; + this.allowReader1ToFinish = allowReader1ToFinish; + this.reader1Executor = reader1Executor; + this.reader2Executor = reader2Executor; + this.readerSettings = readerSettings; + } + } + + private @NotNull TopicReaderEventOrderingTest.StructureTest1 getStructureForOrderGarantees() { Map> eventLog = new ConcurrentHashMap<>(); for (long i = 0; i < partitionCount; i++) { eventLog.put(i, Collections.synchronizedList(new ArrayList<>())); @@ -135,8 +358,31 @@ public void testEventOrderingGuarantees() throws Exception { .build()) .setConsumerName(TEST_CONSUMER) .build(); + return new StructureTest1(eventLog, activeSessions, startReceived, closeReceived, orderingViolation, executor, readerSettings); + } + + private static class StructureTest1 { + public final Map> eventLog; + public final Map activeSessions; + public final CountDownLatch startReceived; + public final CountDownLatch closeReceived; + public final AtomicBoolean orderingViolation; + public final ExecutorService executor; + public final ReaderSettings readerSettings; + + public StructureTest1(Map> eventLog, Map activeSessions, CountDownLatch startReceived, CountDownLatch closeReceived, AtomicBoolean orderingViolation, ExecutorService executor, ReaderSettings readerSettings) { + this.eventLog = eventLog; + this.activeSessions = activeSessions; + this.startReceived = startReceived; + this.closeReceived = closeReceived; + this.orderingViolation = orderingViolation; + this.executor = executor; + this.readerSettings = readerSettings; + } + } - AsyncReader reader = client.createAsyncReader(readerSettings, ReadEventHandlersSettings.newBuilder() + private AsyncReader getAsyncReaderForOrderGaran(ReaderSettings readerSettings, ExecutorService executor, Map> eventLog, Map activeSessions, AtomicBoolean orderingViolation, CountDownLatch startReceived, CountDownLatch closeReceived) { + return client.createAsyncReader(readerSettings, ReadEventHandlersSettings.newBuilder() .setExecutor(executor) .setEventHandler(new ReadEventHandler() { @@ -150,6 +396,8 @@ public void onMessages(tech.ydb.topic.read.events.DataReceivedEvent event) { public void onStartPartitionSession(tech.ydb.topic.read.events.StartPartitionSessionEvent event) { long sessionId = event.getPartitionSession().getId(); long partitionId = event.getPartitionSession().getPartitionId(); + + // Record start event eventLog.get(partitionId).add("onStartPartitionSession[partitionId = " + partitionId + ",session=" + sessionId + "]"); logger.info("onStartPartitionSession: session={}", sessionId); @@ -167,7 +415,10 @@ public void onStartPartitionSession(tech.ydb.topic.read.events.StartPartitionSes public void onPartitionSessionClosed(tech.ydb.topic.read.events.PartitionSessionClosedEvent event) { long sessionId = event.getPartitionSession().getId(); long partitionId = event.getPartitionSession().getPartitionId(); + + // Record close event eventLog.get(partitionId).add("onPartitionSessionClosed[partitionId =" + partitionId + ",session=" + sessionId + "]"); + logger.info("onPartitionSessionClosed: session={}", sessionId); activeSessions.remove(partitionId); closeReceived.countDown(); @@ -175,79 +426,46 @@ public void onPartitionSessionClosed(tech.ydb.topic.read.events.PartitionSession }) .build() ); + } - reader.init().join(); - - // Send a message to trigger partition assignment - sendMessage("test-message"); - - // Wait for start event - assertTrue("Start event not received", startReceived.await(10, TimeUnit.SECONDS)); - - // Shutdown reader to trigger stop event - logger.info("Shutting down reader"); - reader.shutdown().get(10, TimeUnit.SECONDS); - - // assertTrue("Stop event not received", stopReceived.await(10, TimeUnit.SECONDS)); - assertTrue("Close event not received", closeReceived.await(10, TimeUnit.SECONDS)); + private AsyncReader getAsyncReader2ForRaceCondition(ReaderSettings readerSettings, ExecutorService reader2Executor, ConcurrentHashMap reader1CleanupInProgress, AtomicBoolean raceConditionDetected, ConcurrentHashMap reader1CleanupCompleted, CountDownLatch reader2Started) { + return client.createAsyncReader(readerSettings, ReadEventHandlersSettings.newBuilder() + .setExecutor(reader2Executor) + .setEventHandler(new ReadEventHandler() { + @Override + public void onMessages(tech.ydb.topic.read.events.DataReceivedEvent event) { + // No-op + } - executor.shutdownNow(); + @Override + public void onStartPartitionSession(tech.ydb.topic.read.events.StartPartitionSessionEvent event) { + long partitionId = event.getPartitionSession().getPartitionId(); + PartitionSession session = event.getPartitionSession(); + logger.info("Reader-2: onStartPartitionSession - partition={}, session={}", + session.getPartitionId(), session.getId()); - logger.info("Event log: {}", eventLog); - assertFalse("Event ordering violation detected", orderingViolation.get()); + // Check if Reader-1 is still cleaning up + if (reader1CleanupInProgress.get(partitionId).get()) { + logger.error("RACE CONDITION DETECTED: Reader-2 received partition {} while Reader-1 is still cleaning up", + session.getPartitionId()); + raceConditionDetected.set(true); + } - for (long partitionId = 0; partitionId < partitionCount; partitionId++) { - // Verify event sequence - int startIndex = -1; - int stopIndex = -1; - for (int i = 0; i < eventLog.get(partitionId).size(); i++) { - if (startIndex == -1 && eventLog.get(partitionId).get(i).startsWith("onStartPartitionSession")) { - startIndex = i; - } - if (stopIndex == -1 && eventLog.get(partitionId).get(i).startsWith("onPartitionSessionClosed")) { - stopIndex = i; - } - } + if (!reader1CleanupCompleted.get(partitionId).get()) { + logger.warn("Reader-2 received partition {} before Reader-1 completed cleanup", + session.getPartitionId()); + } - assertTrue("Start event should be present", startIndex >= 0); - assertTrue("Close event should be present", stopIndex >= 0); - assertTrue("Start event must come before Stop event", startIndex < stopIndex); - } + event.confirm(); + reader2Started.countDown(); + } + }) + .build() + ); } - /** - * Test for session close race condition: verifies that partitions are not reassigned to other readers - * before the original reader completes its cleanup in onPartitionSessionClosed and onReaderClosed callbacks. - */ - @Test - public void testSessionCloseRaceCondition() throws Exception { - logger.info("Starting testSessionCloseRaceCondition"); - // Shared state to track the race condition - ConcurrentHashMap> reader1PartitionSession = new ConcurrentHashMap<>(); - ConcurrentHashMap reader1CleanupInProgress = new ConcurrentHashMap<>(); - ConcurrentHashMap reader1CleanupCompleted = new ConcurrentHashMap<>(); - for (long i = 0; i < partitionCount; i++) { - reader1CleanupCompleted.put(i, new AtomicBoolean(false)); - } - AtomicBoolean raceConditionDetected = new AtomicBoolean(false); - CountDownLatch reader1Started = new CountDownLatch(partitionCount); - CountDownLatch reader1CleanupStarted = new CountDownLatch(partitionCount); - CountDownLatch reader2Started = new CountDownLatch(partitionCount); - CountDownLatch allowReader1ToFinish = new CountDownLatch(1); - - // Create two single-threaded executors to simulate the scenario - ExecutorService reader1Executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "reader-1-executor")); - ExecutorService reader2Executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "reader-2-executor")); - - ReaderSettings readerSettings = ReaderSettings.newBuilder() - .addTopic(TopicReadSettings.newBuilder() - .setPath(testTopic) - .build()) - .setConsumerName(TEST_CONSUMER) - .build(); - - // Create Reader-1 - AsyncReader reader1 = client.createAsyncReader(readerSettings, ReadEventHandlersSettings.newBuilder() + private AsyncReader getAsyncReader1ForRaceCondition(ReaderSettings readerSettings, ExecutorService reader1Executor, ConcurrentHashMap> reader1PartitionSession, CountDownLatch reader1Started, ConcurrentHashMap reader1CleanupInProgress, CountDownLatch reader1CleanupStarted, CountDownLatch allowReader1ToFinish, ConcurrentHashMap reader1CleanupCompleted) { + return client.createAsyncReader(readerSettings, ReadEventHandlersSettings.newBuilder() .setExecutor(reader1Executor) .setEventHandler(new ReadEventHandler() { @Override @@ -290,7 +508,7 @@ public void onPartitionSessionClosed(tech.ydb.topic.read.events.PartitionSession reader1CleanupStarted.countDown(); - // Simulate slow cleanup (e.g., closing database connections, flushing buffers) + // Step 4. Block Reader-1 cleanup to simulate slow resource release. (e.g., closing database connections, flushing buffers) try { boolean finished = allowReader1ToFinish.await(5, TimeUnit.SECONDS); if (!finished) { @@ -315,94 +533,5 @@ public void onReaderClosed(tech.ydb.topic.read.events.ReaderClosedEvent event) { }) .build() ); - - reader1.init().join(); - - // Send a message to trigger partition assignment to Reader-1 - sendMessage("test-message-1"); - - // Wait for Reader-1 to receive partition - assertTrue("Reader-1 did not receive partition", reader1Started.await(10, TimeUnit.SECONDS)); - for (Map.Entry> v : reader1PartitionSession.entrySet()) { - assertNotNull("Reader-1 partition session is null", v.getValue().get()); - logger.info("Reader-1 received partition: {}", v.getKey()); - } - - // Start shutdown of Reader-1 - logger.info("Before reader-1 shutdown"); - CompletableFuture reader1ShutdownFuture = reader1.shutdown(); - - // Wait for Reader-1 cleanup to start - assertTrue("Reader-1 cleanup did not start", reader1CleanupStarted.await(60, TimeUnit.SECONDS)); - logger.info("Reader-1 cleanup started"); - - // Create Reader-2 while Reader-1 is still cleaning up - AsyncReader reader2 = client.createAsyncReader(readerSettings, ReadEventHandlersSettings.newBuilder() - .setExecutor(reader2Executor) - .setEventHandler(new ReadEventHandler() { - @Override - public void onMessages(tech.ydb.topic.read.events.DataReceivedEvent event) { - // No-op - } - - @Override - public void onStartPartitionSession(tech.ydb.topic.read.events.StartPartitionSessionEvent event) { - long partitionId = event.getPartitionSession().getPartitionId(); - PartitionSession session = event.getPartitionSession(); - logger.info("Reader-2: onStartPartitionSession - partition={}, session={}", - session.getPartitionId(), session.getId()); - - // Check if Reader-1 is still cleaning up - if (reader1CleanupInProgress.get(partitionId).get()) { - logger.error("RACE CONDITION DETECTED: Reader-2 received partition {} while Reader-1 is still cleaning up", - session.getPartitionId()); - raceConditionDetected.set(true); - } - - if (!reader1CleanupCompleted.get(partitionId).get()) { - logger.warn("Reader-2 received partition {} before Reader-1 completed cleanup", - session.getPartitionId()); - } - - event.confirm(); - reader2Started.countDown(); - } - }) - .build() - ); - - reader2.init().join(); - - // Give some time for Reader-2 to potentially receive the partition during Reader-1's cleanup - Thread.sleep(500); - - // Allow Reader-1 to finish cleanup - allowReader1ToFinish.countDown(); - - // Wait for Reader-1 shutdown to complete - reader1ShutdownFuture.get(10, TimeUnit.SECONDS); - logger.info("After reader-1 shutdown"); - - // Wait a bit more for partition reassignment to Reader-2 - boolean reader2GotPartition = reader2Started.await(15, TimeUnit.SECONDS); - - // Cleanup - reader2.shutdown().get(10, TimeUnit.SECONDS); - reader1Executor.shutdownNow(); - reader2Executor.shutdownNow(); - - // Assertions - assertFalse("Race condition detected: Reader-2 received partition while Reader-1 was still cleaning up", - raceConditionDetected.get()); - - if (reader2GotPartition) { - for (Map.Entry v : reader1CleanupCompleted.entrySet()) { - assertTrue("Reader-1 cleanup should be completed before Reader-2 receives the partition : " + v.getKey(), - v.getValue().get()); - logger.info("Test passed: Reader-2 received partition only after Reader-1 completed cleanup, partition {}", v.getKey()); - } - } else { - logger.warn("Reader-2 did not receive partition within timeout - test inconclusive"); - } } } From 8f8b679fb4ade2063321152a4c1a92d7c4b69e2e Mon Sep 17 00:00:00 2001 From: Evgeniy Kuvardin Date: Sun, 15 Mar 2026 21:49:16 +0300 Subject: [PATCH 8/8] Fix race condition on closePartitionSession before StartPartition session --- .../main/java/tech/ydb/topic/read/impl/ReadSession.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/ReadSession.java b/topic/src/main/java/tech/ydb/topic/read/impl/ReadSession.java index b334b2d1b..a14198d66 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/ReadSession.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/ReadSession.java @@ -102,7 +102,14 @@ protected void onStop() { sessions.values().forEach(ReadPartitionSession::shutdown); sessions.clear(); - partitions.values().forEach(reader::handleClosePartitionSession); + List> closeFutures = new ArrayList<>(); + + partitions.values().forEach(partitionSession -> + closeFutures.add(reader.handleClosePartitionSession(partitionSession)) + ); + + CompletableFuture.allOf(closeFutures.toArray(new CompletableFuture[0])).join(); + partitions.clear(); }