From e5437a2c6a32b374cdae1e77357577edfe75cbbb Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 10 Jun 2026 16:41:54 +0800 Subject: [PATCH 1/3] Support legacy pipe snapshot seal parameters --- .../protocol/IoTDBConfigNodeReceiver.java | 4 +- .../thrift/IoTDBDataNodeReceiver.java | 4 +- .../request/PipeTransferFileSealReqV2.java | 8 +++ .../PipeTransferFileSealReqV2Test.java | 65 +++++++++++++++++++ 4 files changed, 77 insertions(+), 4 deletions(-) create mode 100644 iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferFileSealReqV2Test.java diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java index 7de8e8bf78d9d..5f414f2cd69fd 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java @@ -1277,7 +1277,7 @@ protected TSStatus loadFileV2( PipeConfigRegionSnapshotEvent.getConfigPhysicalPlanTypeSet( parameters.get(ColumnHeaderConstant.TYPE)); final boolean isTreeModelDataAllowedToBeCaptured = - parameters.containsKey(PipeTransferFileSealReqV2.TREE); + PipeTransferFileSealReqV2.isTreeModelDataAllowedToBeCaptured(parameters); final TreePattern treePattern = TreePattern.parsePatternFromString( parameters.get(ColumnHeaderConstant.PATH_PATTERN), @@ -1285,7 +1285,7 @@ protected TSStatus loadFileV2( p -> new IoTDBTreePattern(isTreeModelDataAllowedToBeCaptured, p)); final TablePattern tablePattern = new TablePattern( - parameters.containsKey(PipeTransferFileSealReqV2.TABLE), + PipeTransferFileSealReqV2.isTableModelDataAllowedToBeCaptured(parameters), parameters.get(PipeTransferFileSealReqV2.DATABASE_PATTERN), parameters.get(ColumnHeaderConstant.TABLE_NAME)); final List results = new ArrayList<>(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index 55f46be8f99d5..60ea19246f0f6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java @@ -614,7 +614,7 @@ private TSStatus loadSchemaSnapShot( PipeSchemaRegionSnapshotEvent.getStatementTypeSet( parameters.get(ColumnHeaderConstant.TYPE)); final boolean isTreeModelDataAllowedToBeCaptured = - parameters.containsKey(PipeTransferFileSealReqV2.TREE); + PipeTransferFileSealReqV2.isTreeModelDataAllowedToBeCaptured(parameters); final TreePattern treePattern = TreePattern.parsePatternFromString( parameters.get(ColumnHeaderConstant.PATH_PATTERN), @@ -622,7 +622,7 @@ private TSStatus loadSchemaSnapShot( p -> new IoTDBTreePattern(isTreeModelDataAllowedToBeCaptured, p)); final TablePattern tablePattern = new TablePattern( - parameters.containsKey(PipeTransferFileSealReqV2.TABLE), + PipeTransferFileSealReqV2.isTableModelDataAllowedToBeCaptured(parameters), parameters.get(PipeTransferFileSealReqV2.DATABASE_PATTERN), parameters.get(ColumnHeaderConstant.TABLE_NAME)); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferFileSealReqV2.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferFileSealReqV2.java index cf73948c3f14d..227d5871d8b5c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferFileSealReqV2.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferFileSealReqV2.java @@ -54,6 +54,14 @@ public final Map getParameters() { return parameters; } + public static boolean isTreeModelDataAllowedToBeCaptured(final Map parameters) { + return parameters.containsKey(TREE) || !parameters.containsKey(TABLE); + } + + public static boolean isTableModelDataAllowedToBeCaptured(final Map parameters) { + return parameters.containsKey(TABLE); + } + protected abstract PipeRequestType getPlanType(); /////////////////////////////// Thrift /////////////////////////////// diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferFileSealReqV2Test.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferFileSealReqV2Test.java new file mode 100644 index 0000000000000..4ac0e5cb7c333 --- /dev/null +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferFileSealReqV2Test.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.sink.payload.thrift.request; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +public class PipeTransferFileSealReqV2Test { + + @Test + public void testLegacyV13SnapshotSealCapturesTreeOnly() { + final Map parameters = new HashMap<>(); + + Assert.assertTrue(PipeTransferFileSealReqV2.isTreeModelDataAllowedToBeCaptured(parameters)); + Assert.assertFalse(PipeTransferFileSealReqV2.isTableModelDataAllowedToBeCaptured(parameters)); + } + + @Test + public void testExplicitTreeOnlySnapshotSealCapturesTreeOnly() { + final Map parameters = new HashMap<>(); + parameters.put(PipeTransferFileSealReqV2.TREE, ""); + + Assert.assertTrue(PipeTransferFileSealReqV2.isTreeModelDataAllowedToBeCaptured(parameters)); + Assert.assertFalse(PipeTransferFileSealReqV2.isTableModelDataAllowedToBeCaptured(parameters)); + } + + @Test + public void testExplicitTableOnlySnapshotSealCapturesTableOnly() { + final Map parameters = new HashMap<>(); + parameters.put(PipeTransferFileSealReqV2.TABLE, ""); + + Assert.assertFalse(PipeTransferFileSealReqV2.isTreeModelDataAllowedToBeCaptured(parameters)); + Assert.assertTrue(PipeTransferFileSealReqV2.isTableModelDataAllowedToBeCaptured(parameters)); + } + + @Test + public void testExplicitTreeAndTableSnapshotSealCapturesBoth() { + final Map parameters = new HashMap<>(); + parameters.put(PipeTransferFileSealReqV2.TREE, ""); + parameters.put(PipeTransferFileSealReqV2.TABLE, ""); + + Assert.assertTrue(PipeTransferFileSealReqV2.isTreeModelDataAllowedToBeCaptured(parameters)); + Assert.assertTrue(PipeTransferFileSealReqV2.isTableModelDataAllowedToBeCaptured(parameters)); + } +} From deee07b7997d13df756fb58d0c961abacd34a997 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 10 Jun 2026 17:49:16 +0800 Subject: [PATCH 2/3] Support legacy pipe tablet requests --- .../request/PipeTransferTabletBatchReq.java | 5 +- .../request/PipeTransferTabletRawReq.java | 43 ++++++--- .../sink/util/TabletStatementConverter.java | 19 +++- .../sink/PipeDataNodeThriftRequestTest.java | 91 +++++++++++++++++++ 4 files changed, 138 insertions(+), 20 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java index 8d75e9864bb52..3dcef945d6d3e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java @@ -34,7 +34,6 @@ import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.PublicBAOS; import org.apache.tsfile.utils.ReadWriteIOUtils; -import org.apache.tsfile.write.record.Tablet; import java.io.DataOutputStream; import java.io.IOException; @@ -142,9 +141,7 @@ public static PipeTransferTabletBatchReq fromTPipeTransferReq( size = ReadWriteIOUtils.readInt(transferReq.body); for (int i = 0; i < size; ++i) { - batchReq.tabletReqs.add( - PipeTransferTabletRawReq.toTPipeTransferRawReq( - Tablet.deserialize(transferReq.body), ReadWriteIOUtils.readBool(transferReq.body))); + batchReq.tabletReqs.add(PipeTransferTabletRawReq.toTPipeTransferRawReq(transferReq.body)); } batchReq.version = transferReq.version; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java index 1504b3eadb9b9..271ab1f223b88 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java @@ -112,6 +112,14 @@ public static PipeTransferTabletRawReq toTPipeTransferRawReq( return tabletReq; } + public static PipeTransferTabletRawReq toTPipeTransferRawReq(final ByteBuffer buffer) { + final PipeTransferTabletRawReq tabletReq = new PipeTransferTabletRawReq(); + + tabletReq.deserializeTPipeTransferRawReq(buffer); + + return tabletReq; + } + /////////////////////////////// Thrift /////////////////////////////// public static PipeTransferTabletRawReq toTPipeTransferReq( @@ -135,27 +143,36 @@ public static PipeTransferTabletRawReq toTPipeTransferReq( } public static PipeTransferTabletRawReq fromTPipeTransferReq(final TPipeTransferReq transferReq) { - final PipeTransferTabletRawReq tabletReq = new PipeTransferTabletRawReq(); + final PipeTransferTabletRawReq tabletReq = toTPipeTransferRawReq(transferReq.body); + + tabletReq.version = transferReq.version; + tabletReq.type = transferReq.type; + + return tabletReq; + } - final ByteBuffer buffer = transferReq.body; + private void deserializeTPipeTransferRawReq(final ByteBuffer buffer) { final int startPosition = buffer.position(); try { - // V1: no databaseName, readDatabaseName = false final InsertTabletStatement insertTabletStatement = - TabletStatementConverter.deserializeStatementFromTabletFormat(buffer, false); - tabletReq.isAligned = insertTabletStatement.isAligned(); - // devicePath is already set in deserializeStatementFromTabletFormat for V1 format - tabletReq.statement = insertTabletStatement; + TabletStatementConverter.deserializeLegacyStatementFromTabletFormat(buffer); + isAligned = insertTabletStatement.isAligned(); + statement = insertTabletStatement; + return; } catch (final Exception e) { buffer.position(startPosition); - tabletReq.tablet = Tablet.deserialize(buffer); - tabletReq.isAligned = ReadWriteIOUtils.readBool(buffer); } - tabletReq.version = transferReq.version; - tabletReq.type = transferReq.type; - - return tabletReq; + try { + final InsertTabletStatement insertTabletStatement = + TabletStatementConverter.deserializeStatementFromTabletFormat(buffer, false); + isAligned = insertTabletStatement.isAligned(); + statement = insertTabletStatement; + } catch (final Exception e) { + buffer.position(startPosition); + tablet = Tablet.deserialize(buffer); + isAligned = ReadWriteIOUtils.readBool(buffer); + } } /////////////////////////////// Air Gap /////////////////////////////// diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java index e8b8e36cb49c9..2e863ffaf010f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java @@ -76,6 +76,17 @@ private TabletStatementConverter() { */ public static InsertTabletStatement deserializeStatementFromTabletFormat( final ByteBuffer byteBuffer, final boolean readDatabaseName) throws IllegalPathException { + return deserializeStatementFromTabletFormat(byteBuffer, readDatabaseName, true); + } + + public static InsertTabletStatement deserializeLegacyStatementFromTabletFormat( + final ByteBuffer byteBuffer) throws IllegalPathException { + return deserializeStatementFromTabletFormat(byteBuffer, false, false); + } + + private static InsertTabletStatement deserializeStatementFromTabletFormat( + final ByteBuffer byteBuffer, final boolean readDatabaseName, final boolean readColumnCategory) + throws IllegalPathException { final InsertTabletStatement statement = new InsertTabletStatement(); // Calculate memory size during deserialization, use INSTANCE_SIZE constant @@ -121,9 +132,11 @@ public static InsertTabletStatement deserializeStatementFromTabletFormat( final Pair pair = readMeasurement(byteBuffer); measurement[i] = pair.getLeft(); dataTypes[i] = pair.getRight(); - columnCategories[i] = - TsTableColumnCategory.fromTsFileColumnCategory( - ColumnCategory.values()[byteBuffer.get()]); + if (readColumnCategory) { + columnCategories[i] = + TsTableColumnCategory.fromTsFileColumnCategory( + ColumnCategory.values()[byteBuffer.get()]); + } // Calculate memory for each measurement string if (measurement[i] != null) { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java index 38704ec7fea18..bf860da540f76 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java @@ -20,6 +20,8 @@ package org.apache.iotdb.db.pipe.sink; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion; +import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType; import org.apache.iotdb.commons.pipe.sink.payload.thrift.response.PipeTransferFilePieceResp; import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.commons.schema.SchemaConstant; @@ -46,6 +48,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; @@ -494,6 +497,42 @@ public void testPipeTransferTabletBatchReq() throws IOException { Assert.assertFalse(deserializedReq.getTabletReqs().get(0).getIsAligned()); } + @Test + public void testPipeTransferTabletBatchReqWithLegacyTabletFormat() throws IOException { + final List tabletBuffers = new ArrayList<>(); + tabletBuffers.add(serializeLegacyTabletRawBuffer(false)); + tabletBuffers.add(serializeLegacyTabletRawBuffer(true)); + + final PipeTransferTabletBatchReq req = + PipeTransferTabletBatchReq.toTPipeTransferReq(Collections.emptyList(), tabletBuffers); + + final PipeTransferTabletBatchReq deserializedReq = + PipeTransferTabletBatchReq.fromTPipeTransferReq(req); + + Assert.assertEquals(2, deserializedReq.getTabletReqs().size()); + Assert.assertFalse(deserializedReq.getTabletReqs().get(0).getIsAligned()); + Assert.assertTrue(deserializedReq.getTabletReqs().get(1).getIsAligned()); + + assertLegacyTabletStatement(deserializedReq.getTabletReqs().get(0).constructStatement()); + assertLegacyTabletStatement(deserializedReq.getTabletReqs().get(1).constructStatement()); + } + + @Test + public void testPipeTransferTabletRawReqWithLegacyTabletFormat() throws IOException { + final TPipeTransferReq req = new TPipeTransferReq(); + req.version = IoTDBSinkRequestVersion.VERSION_1.getVersion(); + req.type = PipeRequestType.TRANSFER_TABLET_RAW.getType(); + req.body = serializeLegacyTabletRawBuffer(true); + + final PipeTransferTabletRawReq deserializedReq = + PipeTransferTabletRawReq.fromTPipeTransferReq(req); + + Assert.assertEquals(req.getVersion(), deserializedReq.getVersion()); + Assert.assertEquals(req.getType(), deserializedReq.getType()); + Assert.assertTrue(deserializedReq.getIsAligned()); + assertLegacyTabletStatement(deserializedReq.constructStatement()); + } + @Test public void testPipeTransferTabletBatchReqV2() throws IOException { final List insertNodeBuffers = new ArrayList<>(); @@ -770,4 +809,56 @@ public void testPipeTransferFilePieceResp() throws IOException { Assert.assertEquals(resp.getStatus(), deserializeResp.getStatus()); Assert.assertEquals(resp.getEndWritingOffset(), deserializeResp.getEndWritingOffset()); } + + private static ByteBuffer serializeLegacyTabletRawBuffer(final boolean isAligned) + throws IOException { + try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); + final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { + ReadWriteIOUtils.write("root.sg.d", outputStream); + ReadWriteIOUtils.write(2, outputStream); + + ReadWriteIOUtils.write((byte) 1, outputStream); + ReadWriteIOUtils.write(2, outputStream); + writeLegacyMeasurementSchema(outputStream, "s1", TSDataType.INT32); + writeLegacyMeasurementSchema(outputStream, "s2", TSDataType.TEXT); + + ReadWriteIOUtils.write((byte) 1, outputStream); + ReadWriteIOUtils.write(2L, outputStream); + ReadWriteIOUtils.write(1L, outputStream); + + ReadWriteIOUtils.write((byte) 0, outputStream); + + ReadWriteIOUtils.write((byte) 1, outputStream); + ReadWriteIOUtils.write((byte) 1, outputStream); + ReadWriteIOUtils.write(2, outputStream); + ReadWriteIOUtils.write(1, outputStream); + ReadWriteIOUtils.write((byte) 1, outputStream); + ReadWriteIOUtils.write((byte) 1, outputStream); + ReadWriteIOUtils.write(new Binary("2", TSFileConfig.STRING_CHARSET), outputStream); + ReadWriteIOUtils.write((byte) 1, outputStream); + ReadWriteIOUtils.write(new Binary("1", TSFileConfig.STRING_CHARSET), outputStream); + + ReadWriteIOUtils.write(isAligned, outputStream); + return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); + } + } + + private static void writeLegacyMeasurementSchema( + final DataOutputStream outputStream, final String measurement, final TSDataType dataType) + throws IOException { + ReadWriteIOUtils.write((byte) 1, outputStream); + ReadWriteIOUtils.write(measurement, outputStream); + ReadWriteIOUtils.write(dataType.serialize(), outputStream); + ReadWriteIOUtils.write(TSEncoding.PLAIN.serialize(), outputStream); + ReadWriteIOUtils.write(CompressionType.UNCOMPRESSED.serialize(), outputStream); + ReadWriteIOUtils.write(0, outputStream); + } + + private static void assertLegacyTabletStatement(final InsertTabletStatement statement) { + Assert.assertEquals("root.sg.d", statement.getDevicePath().getFullPath()); + Assert.assertArrayEquals(new String[] {"s1", "s2"}, statement.getMeasurements()); + Assert.assertArrayEquals( + new TSDataType[] {TSDataType.INT32, TSDataType.TEXT}, statement.getDataTypes()); + Assert.assertEquals(2, statement.getRowCount()); + } } From 26d8cb9713083edbb2ca58a072d907d6bc61451b Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 10 Jun 2026 18:09:36 +0800 Subject: [PATCH 3/3] Resolve legacy pipe tablet request conflicts --- .../request/PipeTransferTabletBatchReq.java | 5 +- .../request/PipeTransferTabletRawReq.java | 21 +++-- .../sink/util/TabletStatementConverter.java | 85 ++++++++++++++----- .../sink/PipeDataNodeThriftRequestTest.java | 43 ++++++++++ 4 files changed, 123 insertions(+), 31 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java index 3dcef945d6d3e..ede3370f5b0d1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java @@ -22,6 +22,7 @@ import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType; import org.apache.iotdb.commons.utils.TestOnly; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils.TabletStringInternPool; import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement; @@ -129,6 +130,7 @@ public static PipeTransferTabletBatchReq toTPipeTransferReq( public static PipeTransferTabletBatchReq fromTPipeTransferReq( final TPipeTransferReq transferReq) { final PipeTransferTabletBatchReq batchReq = new PipeTransferTabletBatchReq(); + final TabletStringInternPool tabletStringInternPool = new TabletStringInternPool(); // Binary size, for rolling upgrade ReadWriteIOUtils.readInt(transferReq.body); @@ -141,7 +143,8 @@ public static PipeTransferTabletBatchReq fromTPipeTransferReq( size = ReadWriteIOUtils.readInt(transferReq.body); for (int i = 0; i < size; ++i) { - batchReq.tabletReqs.add(PipeTransferTabletRawReq.toTPipeTransferRawReq(transferReq.body)); + batchReq.tabletReqs.add( + PipeTransferTabletRawReq.toTPipeTransferRawReq(transferReq.body, tabletStringInternPool)); } batchReq.version = transferReq.version; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java index 271ab1f223b88..1d2c1dd71cd17 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java @@ -23,6 +23,8 @@ import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType; import org.apache.iotdb.db.i18n.DataNodePipeMessages; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils.TabletStringInternPool; import org.apache.iotdb.db.pipe.sink.util.TabletStatementConverter; import org.apache.iotdb.db.pipe.sink.util.sorter.PipeTreeModelTabletEventSorter; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; @@ -112,10 +114,11 @@ public static PipeTransferTabletRawReq toTPipeTransferRawReq( return tabletReq; } - public static PipeTransferTabletRawReq toTPipeTransferRawReq(final ByteBuffer buffer) { + public static PipeTransferTabletRawReq toTPipeTransferRawReq( + final ByteBuffer buffer, final TabletStringInternPool tabletStringInternPool) { final PipeTransferTabletRawReq tabletReq = new PipeTransferTabletRawReq(); - tabletReq.deserializeTPipeTransferRawReq(buffer); + tabletReq.deserializeTPipeTransferRawReq(buffer, tabletStringInternPool); return tabletReq; } @@ -143,7 +146,8 @@ public static PipeTransferTabletRawReq toTPipeTransferReq( } public static PipeTransferTabletRawReq fromTPipeTransferReq(final TPipeTransferReq transferReq) { - final PipeTransferTabletRawReq tabletReq = toTPipeTransferRawReq(transferReq.body); + final PipeTransferTabletRawReq tabletReq = + toTPipeTransferRawReq(transferReq.body, new TabletStringInternPool()); tabletReq.version = transferReq.version; tabletReq.type = transferReq.type; @@ -151,11 +155,13 @@ public static PipeTransferTabletRawReq fromTPipeTransferReq(final TPipeTransferR return tabletReq; } - private void deserializeTPipeTransferRawReq(final ByteBuffer buffer) { + private void deserializeTPipeTransferRawReq( + final ByteBuffer buffer, final TabletStringInternPool tabletStringInternPool) { final int startPosition = buffer.position(); try { final InsertTabletStatement insertTabletStatement = - TabletStatementConverter.deserializeLegacyStatementFromTabletFormat(buffer); + TabletStatementConverter.deserializeLegacyStatementFromTabletFormat( + buffer, tabletStringInternPool); isAligned = insertTabletStatement.isAligned(); statement = insertTabletStatement; return; @@ -165,12 +171,13 @@ private void deserializeTPipeTransferRawReq(final ByteBuffer buffer) { try { final InsertTabletStatement insertTabletStatement = - TabletStatementConverter.deserializeStatementFromTabletFormat(buffer, false); + TabletStatementConverter.deserializeStatementFromTabletFormat( + buffer, false, tabletStringInternPool); isAligned = insertTabletStatement.isAligned(); statement = insertTabletStatement; } catch (final Exception e) { buffer.position(startPosition); - tablet = Tablet.deserialize(buffer); + tablet = PipeTabletUtils.internTablet(Tablet.deserialize(buffer), tabletStringInternPool); isAligned = ReadWriteIOUtils.readBool(buffer); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java index 2e863ffaf010f..6656110f2c0bc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.commons.utils.PathUtils; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils.TabletStringInternPool; import org.apache.iotdb.db.pipe.resource.memory.InsertNodeMemoryEstimator; import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; @@ -40,6 +41,7 @@ import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.Objects; /** * Utility class for converting between InsertTabletStatement and Tablet format ByteBuffer. This @@ -76,23 +78,42 @@ private TabletStatementConverter() { */ public static InsertTabletStatement deserializeStatementFromTabletFormat( final ByteBuffer byteBuffer, final boolean readDatabaseName) throws IllegalPathException { - return deserializeStatementFromTabletFormat(byteBuffer, readDatabaseName, true); + return deserializeStatementFromTabletFormat(byteBuffer, readDatabaseName, null); + } + + public static InsertTabletStatement deserializeStatementFromTabletFormat( + final ByteBuffer byteBuffer, + final boolean readDatabaseName, + final TabletStringInternPool tabletStringInternPool) + throws IllegalPathException { + return deserializeStatementFromTabletFormat( + byteBuffer, readDatabaseName, tabletStringInternPool, true); } public static InsertTabletStatement deserializeLegacyStatementFromTabletFormat( final ByteBuffer byteBuffer) throws IllegalPathException { - return deserializeStatementFromTabletFormat(byteBuffer, false, false); + return deserializeLegacyStatementFromTabletFormat(byteBuffer, null); + } + + public static InsertTabletStatement deserializeLegacyStatementFromTabletFormat( + final ByteBuffer byteBuffer, final TabletStringInternPool tabletStringInternPool) + throws IllegalPathException { + return deserializeStatementFromTabletFormat(byteBuffer, false, tabletStringInternPool, false); } private static InsertTabletStatement deserializeStatementFromTabletFormat( - final ByteBuffer byteBuffer, final boolean readDatabaseName, final boolean readColumnCategory) + final ByteBuffer byteBuffer, + final boolean readDatabaseName, + final TabletStringInternPool tabletStringInternPool, + final boolean readColumnCategory) throws IllegalPathException { final InsertTabletStatement statement = new InsertTabletStatement(); // Calculate memory size during deserialization, use INSTANCE_SIZE constant long memorySize = InsertTabletStatement.getInstanceSize(); - final String insertTargetName = ReadWriteIOUtils.readString(byteBuffer); + final String insertTargetName = + intern(ReadWriteIOUtils.readString(byteBuffer), tabletStringInternPool); final int rowSize = ReadWriteIOUtils.readInt(byteBuffer); @@ -129,7 +150,7 @@ private static InsertTabletStatement deserializeStatementFromTabletFormat( for (int i = 0; i < schemaSize; i++) { final boolean hasSchema = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer)); if (hasSchema) { - final Pair pair = readMeasurement(byteBuffer); + final Pair pair = readMeasurement(byteBuffer, tabletStringInternPool); measurement[i] = pair.getLeft(); dataTypes[i] = pair.getRight(); if (readColumnCategory) { @@ -182,15 +203,12 @@ private static InsertTabletStatement deserializeStatementFromTabletFormat( if (isBitMapsNotNull) { // Use the method that returns both BitMap array and memory size final Pair bitMapsAndMemory = - readBitMapsFromBufferWithMemory(byteBuffer, schemaSize); + readBitMapsFromBufferWithMemory(byteBuffer, schemaSize, rowSize); bitMaps = bitMapsAndMemory.getLeft(); bitMapsMemorySize = bitMapsAndMemory.getRight(); } else { - // Calculate memory for empty BitMap array: array header + references - bitMaps = new BitMap[schemaSize]; - bitMapsMemorySize = - org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize( - NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF * schemaSize); + bitMaps = null; + bitMapsMemorySize = 0; } // Add bitMaps memory to total @@ -230,7 +248,8 @@ private static InsertTabletStatement deserializeStatementFromTabletFormat( // Read databaseName if requested (V2 format) if (readDatabaseName) { - final String databaseName = ReadWriteIOUtils.readString(byteBuffer); + final String databaseName = + intern(ReadWriteIOUtils.readString(byteBuffer), tabletStringInternPool); if (databaseName != null) { statement.setDatabaseName(databaseName); // Calculate memory for databaseName @@ -239,7 +258,9 @@ private static InsertTabletStatement deserializeStatementFromTabletFormat( if (PathUtils.isTableModelDatabase(databaseName)) { statement.setWriteToTable(true); // For table model, insertTargetName is table name, convert to lowercase - statement.setDevicePath(new PartialPath(insertTargetName.toLowerCase(), false)); + statement.setDevicePath( + new PartialPath( + intern(insertTargetName.toLowerCase(), tabletStringInternPool), false)); statement.setColumnCategories(columnCategories); memorySize += columnCategoriesMemorySize; @@ -282,6 +303,11 @@ public static InsertTabletStatement deserializeStatementFromTabletFormat( return deserializeStatementFromTabletFormat(byteBuffer, false); } + private static String intern( + final String value, final TabletStringInternPool tabletStringInternPool) { + return Objects.nonNull(tabletStringInternPool) ? tabletStringInternPool.intern(value) : value; + } + /** * Skip a string in ByteBuffer without reading it. This is more efficient than reading and * discarding the string. @@ -302,10 +328,13 @@ private static void skipString(final ByteBuffer buffer) { * @param buffer ByteBuffer containing serialized measurement schema * @return Pair of measurement name and data type */ - private static Pair readMeasurement(final ByteBuffer buffer) { + private static Pair readMeasurement( + final ByteBuffer buffer, final TabletStringInternPool tabletStringInternPool) { // Read measurement name and data type final Pair pair = - new Pair<>(ReadWriteIOUtils.readString(buffer), TSDataType.deserializeFrom(buffer)); + new Pair<>( + intern(ReadWriteIOUtils.readString(buffer), tabletStringInternPool), + TSDataType.deserializeFrom(buffer)); // Skip encoding type (byte) and compression type (byte) - 2 bytes total buffer.position(buffer.position() + 2); @@ -328,13 +357,11 @@ private static Pair readMeasurement(final ByteBuffer buffer) * array and the calculated memory size. */ private static Pair readBitMapsFromBufferWithMemory( - final ByteBuffer byteBuffer, final int columns) { + final ByteBuffer byteBuffer, final int columns, final int rowSize) { final BitMap[] bitMaps = new BitMap[columns]; - // Calculate memory: array header + object references - long memorySize = - org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize( - NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF * columns); + long bitMapsMemorySize = 0; + boolean hasMarkedBitMap = false; for (int i = 0; i < columns; i++) { final boolean hasBitMap = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer)); @@ -342,18 +369,30 @@ private static Pair readBitMapsFromBufferWithMemory( final int size = ReadWriteIOUtils.readInt(byteBuffer); final Binary valueBinary = ReadWriteIOUtils.readBinary(byteBuffer); final byte[] byteArray = valueBinary.getValues(); - bitMaps[i] = new BitMap(size, byteArray); + final BitMap bitMap = new BitMap(size, byteArray); + if (bitMap.isAllUnmarked(Math.min(rowSize, bitMap.getSize()))) { + continue; + } + bitMaps[i] = bitMap; + hasMarkedBitMap = true; // Calculate memory for this BitMap: BitMap object + byte array // BitMap shallow size + byte array (array header + array length) - memorySize += + bitMapsMemorySize += SIZE_OF_BITMAP + org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize( NUM_BYTES_ARRAY_HEADER + byteArray.length); } } - return new Pair<>(bitMaps, memorySize); + if (!hasMarkedBitMap) { + return new Pair<>(null, 0L); + } + return new Pair<>( + bitMaps, + bitMapsMemorySize + + org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize( + NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF * columns)); } /** diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java index bf860da540f76..db7c2b409fd54 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java @@ -57,6 +57,7 @@ import org.apache.tsfile.file.metadata.enums.CompressionType; import org.apache.tsfile.file.metadata.enums.TSEncoding; import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.PublicBAOS; import org.apache.tsfile.utils.ReadWriteIOUtils; import org.apache.tsfile.write.record.Tablet; @@ -497,6 +498,28 @@ public void testPipeTransferTabletBatchReq() throws IOException { Assert.assertFalse(deserializedReq.getTabletReqs().get(0).getIsAligned()); } + @Test + public void testPipeTransferTabletBatchReqInternsRepeatedMeasurementNames() throws IOException { + final List tabletBuffers = new ArrayList<>(); + tabletBuffers.add( + serializeTablet(createSingleValueTablet(new String("root.sg.d"), new String("s1")), false)); + tabletBuffers.add( + serializeTablet(createSingleValueTablet(new String("root.sg.d"), new String("s1")), false)); + + final PipeTransferTabletBatchReq deserializedReq = + PipeTransferTabletBatchReq.fromTPipeTransferReq( + PipeTransferTabletBatchReq.toTPipeTransferReq(Collections.emptyList(), tabletBuffers)); + final Pair statements = + deserializedReq.constructStatements(); + final List insertTabletStatements = + statements.getRight().getInsertTabletStatementList(); + + Assert.assertEquals(2, insertTabletStatements.size()); + Assert.assertSame( + insertTabletStatements.get(0).getMeasurements()[0], + insertTabletStatements.get(1).getMeasurements()[0]); + } + @Test public void testPipeTransferTabletBatchReqWithLegacyTabletFormat() throws IOException { final List tabletBuffers = new ArrayList<>(); @@ -810,6 +833,26 @@ public void testPipeTransferFilePieceResp() throws IOException { Assert.assertEquals(resp.getEndWritingOffset(), deserializeResp.getEndWritingOffset()); } + private static Tablet createSingleValueTablet(final String deviceId, final String measurement) { + final List schemaList = new ArrayList<>(); + schemaList.add(new MeasurementSchema(measurement, TSDataType.INT32)); + + final Tablet tablet = new Tablet(deviceId, schemaList, 8); + tablet.addTimestamp(0, 1); + tablet.addValue(measurement, 0, 1); + return tablet; + } + + private static ByteBuffer serializeTablet(final Tablet tablet, final boolean isAligned) + throws IOException { + try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); + final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { + tablet.serialize(outputStream); + ReadWriteIOUtils.write(isAligned, outputStream); + return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); + } + } + private static ByteBuffer serializeLegacyTabletRawBuffer(final boolean isAligned) throws IOException { try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();