Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
import org.apache.iotdb.db.storageengine.dataregion.memtable.PrimitiveMemTable;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.AbstractResultListener.Status;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALFlushListener;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.utils.constant.TestConstant;

Expand All @@ -40,7 +42,6 @@
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.BitMap;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -55,6 +56,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -124,12 +126,11 @@ public void testWaitForNextReadyNotWokenByFlushWithoutRoll() throws Exception {
// write a small amount of data (not enough to trigger roll)
InsertTabletNode insertTabletNode = getInsertTabletNode(devicePath, new long[] {1});
insertTabletNode.setSearchIndex(1);
walNode.log(
memTable.getMemTableId(),
insertTabletNode,
Collections.singletonList(new int[] {0, insertTabletNode.getRowCount()}));

Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> walNode.isAllWALEntriesConsumed());
waitForFlush(
walNode.log(
memTable.getMemTableId(),
insertTabletNode,
Collections.singletonList(new int[] {0, insertTabletNode.getRowCount()})));

// data is flushed to buffer but no WAL file roll happened yet, iterator at search index 1
// should not find data (because the current-writing WAL file is not readable by the iterator)
Expand Down Expand Up @@ -160,17 +161,15 @@ public void testWaitForNextReadySucceedsAfterRollFile() throws Exception {
for (int i = 1; i <= 5; i++) {
InsertTabletNode insertTabletNode = getInsertTabletNode(devicePath, new long[] {i});
insertTabletNode.setSearchIndex(i);
walNode.log(
memTable.getMemTableId(),
insertTabletNode,
Collections.singletonList(new int[] {0, insertTabletNode.getRowCount()}));
waitForFlush(
walNode.log(
memTable.getMemTableId(),
insertTabletNode,
Collections.singletonList(new int[] {0, insertTabletNode.getRowCount()})));
}

Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> walNode.isAllWALEntriesConsumed());

// roll the WAL file so the data is in a closed file readable by the iterator
walNode.rollWALFile();
Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> walNode.isAllWALEntriesConsumed());

// iterator at search index 1 should find the data after roll
ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1);
Expand All @@ -185,16 +184,15 @@ public void testLegacySeparatorStillWorksAfterRollFile() throws Exception {

InsertTabletNode insertTabletNode = getInsertTabletNode(devicePath, new long[] {1});
insertTabletNode.setSearchIndex(1);
walNode.log(
memTable.getMemTableId(),
insertTabletNode,
Collections.singletonList(new int[] {0, insertTabletNode.getRowCount()}));
walNode.log(memTable.getMemTableId(), new ContinuousSameSearchIndexSeparatorNode());

Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> walNode.isAllWALEntriesConsumed());
waitForFlush(
walNode.log(
memTable.getMemTableId(),
insertTabletNode,
Collections.singletonList(new int[] {0, insertTabletNode.getRowCount()})));
waitForFlush(
walNode.log(memTable.getMemTableId(), new ContinuousSameSearchIndexSeparatorNode()));

walNode.rollWALFile();
Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> walNode.isAllWALEntriesConsumed());

ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1);
assertTrue(iterator.hasNext());
Expand All @@ -214,12 +212,11 @@ public void testWaitForNextReadyWakesUpOnConcurrentRoll() throws Exception {
InsertTabletNode insertTabletNode = getInsertTabletNode(devicePath, new long[] {1});
insertTabletNode.setSearchIndex(1);
insertTabletNode.setLastFragment(true);
walNode.log(
memTable.getMemTableId(),
insertTabletNode,
Collections.singletonList(new int[] {0, insertTabletNode.getRowCount()}));

Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> walNode.isAllWALEntriesConsumed());
waitForFlush(
walNode.log(
memTable.getMemTableId(),
insertTabletNode,
Collections.singletonList(new int[] {0, insertTabletNode.getRowCount()})));

ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1);

Expand All @@ -246,7 +243,6 @@ public void testWaitForNextReadyWakesUpOnConcurrentRoll() throws Exception {

// trigger WAL file roll — this should signal rollLogWriterCondition and wake up the iterator
walNode.rollWALFile();
Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> walNode.isAllWALEntriesConsumed());

waitFuture.get(20, TimeUnit.SECONDS);
executor.shutdown();
Expand Down Expand Up @@ -274,14 +270,11 @@ public void testWaitForNextReadyWithAutoRollOnSizeThreshold() throws Exception {
// write initial data with search index
InsertTabletNode first = getInsertTabletNode(devicePath, new long[] {1});
first.setSearchIndex(1);
walNode.log(
memTable.getMemTableId(),
first,
Collections.singletonList(new int[] {0, first.getRowCount()}));

Awaitility.await()
.atMost(10, TimeUnit.SECONDS)
.until(() -> walNode.isAllWALEntriesConsumed());
waitForFlush(
walNode.log(
memTable.getMemTableId(),
first,
Collections.singletonList(new int[] {0, first.getRowCount()})));

ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1);

Expand All @@ -305,14 +298,17 @@ public void testWaitForNextReadyWithAutoRollOnSizeThreshold() throws Exception {
Thread.sleep(500);

// write more data to exceed the small threshold and trigger auto-roll
WALFlushListener lastFlushListener = null;
for (int i = 2; i <= 50; i++) {
InsertTabletNode node = getInsertTabletNode(devicePath, new long[] {i});
node.setSearchIndex(i);
walNode.log(
memTable.getMemTableId(),
node,
Collections.singletonList(new int[] {0, node.getRowCount()}));
lastFlushListener =
walNode.log(
memTable.getMemTableId(),
node,
Collections.singletonList(new int[] {0, node.getRowCount()}));
}
waitForFlush(lastFlushListener);

waitFuture.get(40, TimeUnit.SECONDS);
executor.shutdown();
Expand Down Expand Up @@ -341,12 +337,11 @@ public void testWaitForNextReadyAutoTriggersRollOnTimeout() throws Exception {
InsertTabletNode insertTabletNode = getInsertTabletNode(devicePath, new long[] {1});
insertTabletNode.setSearchIndex(1);
insertTabletNode.setLastFragment(true);
walNode.log(
memTable.getMemTableId(),
insertTabletNode,
Collections.singletonList(new int[] {0, insertTabletNode.getRowCount()}));

Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> walNode.isAllWALEntriesConsumed());
waitForFlush(
walNode.log(
memTable.getMemTableId(),
insertTabletNode,
Collections.singletonList(new int[] {0, insertTabletNode.getRowCount()})));

// iterator cannot read the active WAL file, so hasNext() should be false
ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1);
Expand Down Expand Up @@ -443,4 +438,14 @@ private InsertTabletNode getInsertTabletNode(String devicePath, long[] times)
columns,
times.length);
}

private void waitForFlush(WALFlushListener flushListener) throws Exception {
ExecutorService executor = Executors.newSingleThreadExecutor();
try {
Future<Status> waitFuture = executor.submit(flushListener::waitForResult);
assertEquals(Status.SUCCESS, waitFuture.get(10, TimeUnit.SECONDS));
} finally {
executor.shutdownNow();
}
}
}
Loading