diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeTsFileDecompositionWithModsIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeTsFileDecompositionWithModsIT.java index 22e042e285382..6dfede5a8ecec 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeTsFileDecompositionWithModsIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeTsFileDecompositionWithModsIT.java @@ -129,17 +129,31 @@ public void testTsFileDecompositionWithMods() { executeNonQueryWithRetry(senderEnv, "FLUSH"); + HashSet expectedResults = new HashSet<>(); + expectedResults.add( + "t1,t1,t1,t1,1,1.0,1,1970-01-01T00:00:00.001Z,1,1.0,1970-01-01,1,1970-01-01T00:00:00.001Z,"); + + TestUtils.assertDataEventuallyOnEnv( + senderEnv, + TableModelUtils.getQuerySql("table1"), + TableModelUtils.generateHeaderResults(), + expectedResults, + "sg1"); + + TestUtils.assertDataEventuallyOnEnv( + senderEnv, + "SELECT s4 FROM table1 WHERE time >= 2 AND time <= 4", + "s4,", + Collections.emptySet(), + "sg1"); + executeNonQueryWithRetry( senderEnv, String.format( - "CREATE PIPE test_pipe WITH SOURCE ('mods.enable'='true', 'capture.table'='true') WITH CONNECTOR('ip'='%s', 'port'='%s', 'username'='root', 'format'='tablet')", + "CREATE PIPE test_pipe WITH SOURCE ('mods.enable'='true', 'capture.table'='true', 'inclusion'='data.insert,data.delete') WITH CONNECTOR('ip'='%s', 'port'='%s', 'username'='root', 'format'='tablet')", receiverEnv.getDataNodeWrapperList().get(0).getIp(), receiverEnv.getDataNodeWrapperList().get(0).getPort())); - HashSet expectedResults = new HashSet<>(); - expectedResults.add( - "t1,t1,t1,t1,1,1.0,1,1970-01-01T00:00:00.001Z,1,1.0,1970-01-01,1,1970-01-01T00:00:00.001Z,"); - TestUtils.assertDataEventuallyOnEnv( receiverEnv, TableModelUtils.getQuerySql("table1"), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java index d00ee54428bf0..6d619914c09f8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java @@ -19,7 +19,6 @@ package org.apache.iotdb.db.queryengine.plan.analyze; -import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.exception.IoTDBException; @@ -39,7 +38,8 @@ import org.apache.iotdb.commons.schema.table.TsTable; import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema; import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics; -import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp; +import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeReq; +import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeResp; import org.apache.iotdb.db.i18n.DataNodeQueryMessages; import org.apache.iotdb.db.protocol.client.ConfigNodeClient; import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager; @@ -320,18 +320,34 @@ public static void analyzeDelete(final Delete node, final MPPQueryContext queryC try (final ConfigNodeClient configNodeClient = ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { - // TODO: may use time and db/table to filter - final TRegionRouteMapResp latestRegionRouteMap = configNodeClient.getLatestRegionRouteMap(); - final Set replicaSets = new HashSet<>(); - latestRegionRouteMap.getRegionRouteMap().entrySet().stream() - .filter(e -> e.getKey().getType() == TConsensusGroupType.DataRegion) - .forEach(e -> replicaSets.add(e.getValue())); - node.setReplicaSets(replicaSets); + node.setReplicaSets(fetchDeleteReplicaSets(configNodeClient, node)); + } catch (final IoTDBRuntimeException e) { + throw e; } catch (final Exception e) { throw new IoTDBRuntimeException(e, TSStatusCode.CAN_NOT_CONNECT_CONFIGNODE.getStatusCode()); } } + static Set fetchDeleteReplicaSets( + final ConfigNodeClient configNodeClient, final Delete node) throws Exception { + final Set replicaSets = new HashSet<>(); + for (final TableDeletionEntry tableDeletionEntry : node.getTableDeletionEntries()) { + final TGetRegionGroupsByTimeResp resp = + configNodeClient.getRegionGroupsByTime( + new TGetRegionGroupsByTimeReq( + node.getDatabaseName(), + tableDeletionEntry.getStartTime(), + tableDeletionEntry.getEndTime())); + if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new IoTDBRuntimeException(resp.getStatus()); + } + if (resp.isSetRegionReplicaSets()) { + replicaSets.addAll(resp.getRegionReplicaSets()); + } + } + return replicaSets; + } + @SuppressWarnings("java:S3655") // optional is checked public static String getDatabaseName(final Delete node, final MPPQueryContext queryContext) { final String databaseName; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtilsTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtilsTest.java index 5d0ccd74f4598..bbeaa629d4f72 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtilsTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtilsTest.java @@ -19,20 +19,40 @@ package org.apache.iotdb.db.queryengine.plan.analyze; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.ComparisonExpression; import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Expression; import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Identifier; import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.LongLiteral; +import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.QualifiedName; +import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Table; import org.apache.iotdb.commons.schema.table.TsTable; import org.apache.iotdb.commons.schema.table.column.TimeColumnSchema; +import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeReq; +import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeResp; +import org.apache.iotdb.db.protocol.client.ConfigNodeClient; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Delete; +import org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate; import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry; +import org.apache.iotdb.rpc.TSStatusCode; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.TimeRange; import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class AnalyzeUtilsTest { @@ -52,4 +72,58 @@ public void testParseDeletePredicateWithRenamedTimeColumn() { assertEquals(Long.MIN_VALUE, entries.get(0).getStartTime()); assertEquals(100, entries.get(0).getEndTime()); } + + @Test + public void testFetchDeleteReplicaSetsOnlyQueriesTargetDatabaseRegions() throws Exception { + final Delete delete = new Delete(new Table(QualifiedName.of("table1"))); + delete.setDatabaseName("root.db1"); + delete.setTableDeletionEntries( + Arrays.asList( + new TableDeletionEntry(new DeletionPredicate("table1"), new TimeRange(10, 20)), + new TableDeletionEntry(new DeletionPredicate("table1"), new TimeRange(30, 40)))); + + final TRegionReplicaSet regionReplicaSet1 = dataRegionReplicaSet(1); + final TRegionReplicaSet regionReplicaSet2 = dataRegionReplicaSet(2); + final TGetRegionGroupsByTimeResp resp1 = + successRegionGroupsResp(Collections.singleton(regionReplicaSet1)); + final TGetRegionGroupsByTimeResp resp2 = + successRegionGroupsResp(new HashSet<>(Arrays.asList(regionReplicaSet1, regionReplicaSet2))); + final ConfigNodeClient configNodeClient = Mockito.mock(ConfigNodeClient.class); + Mockito.when( + configNodeClient.getRegionGroupsByTime(Mockito.any(TGetRegionGroupsByTimeReq.class))) + .thenReturn(resp1, resp2); + + final Set result = + AnalyzeUtils.fetchDeleteReplicaSets(configNodeClient, delete); + + assertEquals(2, result.size()); + assertTrue(result.contains(regionReplicaSet1)); + assertTrue(result.contains(regionReplicaSet2)); + + final ArgumentCaptor reqCaptor = + ArgumentCaptor.forClass(TGetRegionGroupsByTimeReq.class); + Mockito.verify(configNodeClient, Mockito.times(2)).getRegionGroupsByTime(reqCaptor.capture()); + Mockito.verify(configNodeClient, Mockito.never()).getLatestRegionRouteMap(); + + final List requests = reqCaptor.getAllValues(); + assertEquals("root.db1", requests.get(0).getDatabase()); + assertEquals(10, requests.get(0).getStartTime()); + assertEquals(20, requests.get(0).getEndTime()); + assertEquals("root.db1", requests.get(1).getDatabase()); + assertEquals(30, requests.get(1).getStartTime()); + assertEquals(40, requests.get(1).getEndTime()); + } + + private static TGetRegionGroupsByTimeResp successRegionGroupsResp( + final Set replicaSets) { + final TGetRegionGroupsByTimeResp resp = + new TGetRegionGroupsByTimeResp(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())); + resp.setRegionReplicaSets(replicaSets); + return resp; + } + + private static TRegionReplicaSet dataRegionReplicaSet(final int regionId) { + return new TRegionReplicaSet( + new TConsensusGroupId(TConsensusGroupType.DataRegion, regionId), Collections.emptyList()); + } }