From 0f58e8cf6a66c3a6c71401b9536b4ba605bfddb9 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 10 Jun 2026 19:26:44 +0800 Subject: [PATCH] Fix WAL roll file test flush wait --- .../wal/node/WALNodeWaitForRollFileTest.java | 99 ++++++++++--------- 1 file changed, 52 insertions(+), 47 deletions(-) diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeWaitForRollFileTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeWaitForRollFileTest.java index b24a8cd29cf50..e50d8cdeb25df 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeWaitForRollFileTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeWaitForRollFileTest.java @@ -31,6 +31,8 @@ import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable; import org.apache.iotdb.db.storageengine.dataregion.memtable.PrimitiveMemTable; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode; +import org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.AbstractResultListener.Status; +import org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALFlushListener; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.db.utils.constant.TestConstant; @@ -40,7 +42,6 @@ import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.BitMap; import org.apache.tsfile.write.schema.MeasurementSchema; -import org.awaitility.Awaitility; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -55,6 +56,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -124,12 +126,11 @@ public void testWaitForNextReadyNotWokenByFlushWithoutRoll() throws Exception { // write a small amount of data (not enough to trigger roll) InsertTabletNode insertTabletNode = getInsertTabletNode(devicePath, new long[] {1}); insertTabletNode.setSearchIndex(1); - walNode.log( - memTable.getMemTableId(), - insertTabletNode, - Collections.singletonList(new int[] {0, insertTabletNode.getRowCount()})); - - Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> walNode.isAllWALEntriesConsumed()); + waitForFlush( + walNode.log( + memTable.getMemTableId(), + insertTabletNode, + Collections.singletonList(new int[] {0, insertTabletNode.getRowCount()}))); // data is flushed to buffer but no WAL file roll happened yet, iterator at search index 1 // should not find data (because the current-writing WAL file is not readable by the iterator) @@ -160,17 +161,15 @@ public void testWaitForNextReadySucceedsAfterRollFile() throws Exception { for (int i = 1; i <= 5; i++) { InsertTabletNode insertTabletNode = getInsertTabletNode(devicePath, new long[] {i}); insertTabletNode.setSearchIndex(i); - walNode.log( - memTable.getMemTableId(), - insertTabletNode, - Collections.singletonList(new int[] {0, insertTabletNode.getRowCount()})); + waitForFlush( + walNode.log( + memTable.getMemTableId(), + insertTabletNode, + Collections.singletonList(new int[] {0, insertTabletNode.getRowCount()}))); } - Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> walNode.isAllWALEntriesConsumed()); - // roll the WAL file so the data is in a closed file readable by the iterator walNode.rollWALFile(); - Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> walNode.isAllWALEntriesConsumed()); // iterator at search index 1 should find the data after roll ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1); @@ -185,16 +184,15 @@ public void testLegacySeparatorStillWorksAfterRollFile() throws Exception { InsertTabletNode insertTabletNode = getInsertTabletNode(devicePath, new long[] {1}); insertTabletNode.setSearchIndex(1); - walNode.log( - memTable.getMemTableId(), - insertTabletNode, - Collections.singletonList(new int[] {0, insertTabletNode.getRowCount()})); - walNode.log(memTable.getMemTableId(), new ContinuousSameSearchIndexSeparatorNode()); - - Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> walNode.isAllWALEntriesConsumed()); + waitForFlush( + walNode.log( + memTable.getMemTableId(), + insertTabletNode, + Collections.singletonList(new int[] {0, insertTabletNode.getRowCount()}))); + waitForFlush( + walNode.log(memTable.getMemTableId(), new ContinuousSameSearchIndexSeparatorNode())); walNode.rollWALFile(); - Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> walNode.isAllWALEntriesConsumed()); ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1); assertTrue(iterator.hasNext()); @@ -214,12 +212,11 @@ public void testWaitForNextReadyWakesUpOnConcurrentRoll() throws Exception { InsertTabletNode insertTabletNode = getInsertTabletNode(devicePath, new long[] {1}); insertTabletNode.setSearchIndex(1); insertTabletNode.setLastFragment(true); - walNode.log( - memTable.getMemTableId(), - insertTabletNode, - Collections.singletonList(new int[] {0, insertTabletNode.getRowCount()})); - - Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> walNode.isAllWALEntriesConsumed()); + waitForFlush( + walNode.log( + memTable.getMemTableId(), + insertTabletNode, + Collections.singletonList(new int[] {0, insertTabletNode.getRowCount()}))); ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1); @@ -246,7 +243,6 @@ public void testWaitForNextReadyWakesUpOnConcurrentRoll() throws Exception { // trigger WAL file roll — this should signal rollLogWriterCondition and wake up the iterator walNode.rollWALFile(); - Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> walNode.isAllWALEntriesConsumed()); waitFuture.get(20, TimeUnit.SECONDS); executor.shutdown(); @@ -274,14 +270,11 @@ public void testWaitForNextReadyWithAutoRollOnSizeThreshold() throws Exception { // write initial data with search index InsertTabletNode first = getInsertTabletNode(devicePath, new long[] {1}); first.setSearchIndex(1); - walNode.log( - memTable.getMemTableId(), - first, - Collections.singletonList(new int[] {0, first.getRowCount()})); - - Awaitility.await() - .atMost(10, TimeUnit.SECONDS) - .until(() -> walNode.isAllWALEntriesConsumed()); + waitForFlush( + walNode.log( + memTable.getMemTableId(), + first, + Collections.singletonList(new int[] {0, first.getRowCount()}))); ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1); @@ -305,14 +298,17 @@ public void testWaitForNextReadyWithAutoRollOnSizeThreshold() throws Exception { Thread.sleep(500); // write more data to exceed the small threshold and trigger auto-roll + WALFlushListener lastFlushListener = null; for (int i = 2; i <= 50; i++) { InsertTabletNode node = getInsertTabletNode(devicePath, new long[] {i}); node.setSearchIndex(i); - walNode.log( - memTable.getMemTableId(), - node, - Collections.singletonList(new int[] {0, node.getRowCount()})); + lastFlushListener = + walNode.log( + memTable.getMemTableId(), + node, + Collections.singletonList(new int[] {0, node.getRowCount()})); } + waitForFlush(lastFlushListener); waitFuture.get(40, TimeUnit.SECONDS); executor.shutdown(); @@ -341,12 +337,11 @@ public void testWaitForNextReadyAutoTriggersRollOnTimeout() throws Exception { InsertTabletNode insertTabletNode = getInsertTabletNode(devicePath, new long[] {1}); insertTabletNode.setSearchIndex(1); insertTabletNode.setLastFragment(true); - walNode.log( - memTable.getMemTableId(), - insertTabletNode, - Collections.singletonList(new int[] {0, insertTabletNode.getRowCount()})); - - Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> walNode.isAllWALEntriesConsumed()); + waitForFlush( + walNode.log( + memTable.getMemTableId(), + insertTabletNode, + Collections.singletonList(new int[] {0, insertTabletNode.getRowCount()}))); // iterator cannot read the active WAL file, so hasNext() should be false ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1); @@ -443,4 +438,14 @@ private InsertTabletNode getInsertTabletNode(String devicePath, long[] times) columns, times.length); } + + private void waitForFlush(WALFlushListener flushListener) throws Exception { + ExecutorService executor = Executors.newSingleThreadExecutor(); + try { + Future waitFuture = executor.submit(flushListener::waitForResult); + assertEquals(Status.SUCCESS, waitFuture.get(10, TimeUnit.SECONDS)); + } finally { + executor.shutdownNow(); + } + } }