diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java index 7de8e8bf78d9d..5f414f2cd69fd 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java @@ -1277,7 +1277,7 @@ protected TSStatus loadFileV2( PipeConfigRegionSnapshotEvent.getConfigPhysicalPlanTypeSet( parameters.get(ColumnHeaderConstant.TYPE)); final boolean isTreeModelDataAllowedToBeCaptured = - parameters.containsKey(PipeTransferFileSealReqV2.TREE); + PipeTransferFileSealReqV2.isTreeModelDataAllowedToBeCaptured(parameters); final TreePattern treePattern = TreePattern.parsePatternFromString( parameters.get(ColumnHeaderConstant.PATH_PATTERN), @@ -1285,7 +1285,7 @@ protected TSStatus loadFileV2( p -> new IoTDBTreePattern(isTreeModelDataAllowedToBeCaptured, p)); final TablePattern tablePattern = new TablePattern( - parameters.containsKey(PipeTransferFileSealReqV2.TABLE), + PipeTransferFileSealReqV2.isTableModelDataAllowedToBeCaptured(parameters), parameters.get(PipeTransferFileSealReqV2.DATABASE_PATTERN), parameters.get(ColumnHeaderConstant.TABLE_NAME)); final List results = new ArrayList<>(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index 55f46be8f99d5..60ea19246f0f6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java @@ -614,7 +614,7 @@ private TSStatus loadSchemaSnapShot( PipeSchemaRegionSnapshotEvent.getStatementTypeSet( parameters.get(ColumnHeaderConstant.TYPE)); final boolean isTreeModelDataAllowedToBeCaptured = - parameters.containsKey(PipeTransferFileSealReqV2.TREE); + PipeTransferFileSealReqV2.isTreeModelDataAllowedToBeCaptured(parameters); final TreePattern treePattern = TreePattern.parsePatternFromString( parameters.get(ColumnHeaderConstant.PATH_PATTERN), @@ -622,7 +622,7 @@ private TSStatus loadSchemaSnapShot( p -> new IoTDBTreePattern(isTreeModelDataAllowedToBeCaptured, p)); final TablePattern tablePattern = new TablePattern( - parameters.containsKey(PipeTransferFileSealReqV2.TABLE), + PipeTransferFileSealReqV2.isTableModelDataAllowedToBeCaptured(parameters), parameters.get(PipeTransferFileSealReqV2.DATABASE_PATTERN), parameters.get(ColumnHeaderConstant.TABLE_NAME)); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java index 98ea83b6d7631..1d2c1dd71cd17 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java @@ -118,19 +118,7 @@ public static PipeTransferTabletRawReq toTPipeTransferRawReq( final ByteBuffer buffer, final TabletStringInternPool tabletStringInternPool) { final PipeTransferTabletRawReq tabletReq = new PipeTransferTabletRawReq(); - final int startPosition = buffer.position(); - try { - final InsertTabletStatement insertTabletStatement = - TabletStatementConverter.deserializeStatementFromTabletFormat( - buffer, false, tabletStringInternPool); - tabletReq.isAligned = insertTabletStatement.isAligned(); - tabletReq.statement = insertTabletStatement; - } catch (final Exception e) { - buffer.position(startPosition); - tabletReq.tablet = - PipeTabletUtils.internTablet(Tablet.deserialize(buffer), tabletStringInternPool); - tabletReq.isAligned = ReadWriteIOUtils.readBool(buffer); - } + tabletReq.deserializeTPipeTransferRawReq(buffer, tabletStringInternPool); return tabletReq; } @@ -167,6 +155,33 @@ public static PipeTransferTabletRawReq fromTPipeTransferReq(final TPipeTransferR return tabletReq; } + private void deserializeTPipeTransferRawReq( + final ByteBuffer buffer, final TabletStringInternPool tabletStringInternPool) { + final int startPosition = buffer.position(); + try { + final InsertTabletStatement insertTabletStatement = + TabletStatementConverter.deserializeLegacyStatementFromTabletFormat( + buffer, tabletStringInternPool); + isAligned = insertTabletStatement.isAligned(); + statement = insertTabletStatement; + return; + } catch (final Exception e) { + buffer.position(startPosition); + } + + try { + final InsertTabletStatement insertTabletStatement = + TabletStatementConverter.deserializeStatementFromTabletFormat( + buffer, false, tabletStringInternPool); + isAligned = insertTabletStatement.isAligned(); + statement = insertTabletStatement; + } catch (final Exception e) { + buffer.position(startPosition); + tablet = PipeTabletUtils.internTablet(Tablet.deserialize(buffer), tabletStringInternPool); + isAligned = ReadWriteIOUtils.readBool(buffer); + } + } + /////////////////////////////// Air Gap /////////////////////////////// /** diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java index 773d40e99d1f7..6656110f2c0bc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java @@ -86,6 +86,27 @@ public static InsertTabletStatement deserializeStatementFromTabletFormat( final boolean readDatabaseName, final TabletStringInternPool tabletStringInternPool) throws IllegalPathException { + return deserializeStatementFromTabletFormat( + byteBuffer, readDatabaseName, tabletStringInternPool, true); + } + + public static InsertTabletStatement deserializeLegacyStatementFromTabletFormat( + final ByteBuffer byteBuffer) throws IllegalPathException { + return deserializeLegacyStatementFromTabletFormat(byteBuffer, null); + } + + public static InsertTabletStatement deserializeLegacyStatementFromTabletFormat( + final ByteBuffer byteBuffer, final TabletStringInternPool tabletStringInternPool) + throws IllegalPathException { + return deserializeStatementFromTabletFormat(byteBuffer, false, tabletStringInternPool, false); + } + + private static InsertTabletStatement deserializeStatementFromTabletFormat( + final ByteBuffer byteBuffer, + final boolean readDatabaseName, + final TabletStringInternPool tabletStringInternPool, + final boolean readColumnCategory) + throws IllegalPathException { final InsertTabletStatement statement = new InsertTabletStatement(); // Calculate memory size during deserialization, use INSTANCE_SIZE constant @@ -132,9 +153,11 @@ public static InsertTabletStatement deserializeStatementFromTabletFormat( final Pair pair = readMeasurement(byteBuffer, tabletStringInternPool); measurement[i] = pair.getLeft(); dataTypes[i] = pair.getRight(); - columnCategories[i] = - TsTableColumnCategory.fromTsFileColumnCategory( - ColumnCategory.values()[byteBuffer.get()]); + if (readColumnCategory) { + columnCategories[i] = + TsTableColumnCategory.fromTsFileColumnCategory( + ColumnCategory.values()[byteBuffer.get()]); + } // Calculate memory for each measurement string if (measurement[i] != null) { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java index 10573e5609dba..db7c2b409fd54 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java @@ -20,6 +20,8 @@ package org.apache.iotdb.db.pipe.sink; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion; +import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType; import org.apache.iotdb.commons.pipe.sink.payload.thrift.response.PipeTransferFilePieceResp; import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.commons.schema.SchemaConstant; @@ -518,6 +520,42 @@ public void testPipeTransferTabletBatchReqInternsRepeatedMeasurementNames() thro insertTabletStatements.get(1).getMeasurements()[0]); } + @Test + public void testPipeTransferTabletBatchReqWithLegacyTabletFormat() throws IOException { + final List tabletBuffers = new ArrayList<>(); + tabletBuffers.add(serializeLegacyTabletRawBuffer(false)); + tabletBuffers.add(serializeLegacyTabletRawBuffer(true)); + + final PipeTransferTabletBatchReq req = + PipeTransferTabletBatchReq.toTPipeTransferReq(Collections.emptyList(), tabletBuffers); + + final PipeTransferTabletBatchReq deserializedReq = + PipeTransferTabletBatchReq.fromTPipeTransferReq(req); + + Assert.assertEquals(2, deserializedReq.getTabletReqs().size()); + Assert.assertFalse(deserializedReq.getTabletReqs().get(0).getIsAligned()); + Assert.assertTrue(deserializedReq.getTabletReqs().get(1).getIsAligned()); + + assertLegacyTabletStatement(deserializedReq.getTabletReqs().get(0).constructStatement()); + assertLegacyTabletStatement(deserializedReq.getTabletReqs().get(1).constructStatement()); + } + + @Test + public void testPipeTransferTabletRawReqWithLegacyTabletFormat() throws IOException { + final TPipeTransferReq req = new TPipeTransferReq(); + req.version = IoTDBSinkRequestVersion.VERSION_1.getVersion(); + req.type = PipeRequestType.TRANSFER_TABLET_RAW.getType(); + req.body = serializeLegacyTabletRawBuffer(true); + + final PipeTransferTabletRawReq deserializedReq = + PipeTransferTabletRawReq.fromTPipeTransferReq(req); + + Assert.assertEquals(req.getVersion(), deserializedReq.getVersion()); + Assert.assertEquals(req.getType(), deserializedReq.getType()); + Assert.assertTrue(deserializedReq.getIsAligned()); + assertLegacyTabletStatement(deserializedReq.constructStatement()); + } + @Test public void testPipeTransferTabletBatchReqV2() throws IOException { final List insertNodeBuffers = new ArrayList<>(); @@ -814,4 +852,56 @@ private static ByteBuffer serializeTablet(final Tablet tablet, final boolean isA return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); } } + + private static ByteBuffer serializeLegacyTabletRawBuffer(final boolean isAligned) + throws IOException { + try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); + final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { + ReadWriteIOUtils.write("root.sg.d", outputStream); + ReadWriteIOUtils.write(2, outputStream); + + ReadWriteIOUtils.write((byte) 1, outputStream); + ReadWriteIOUtils.write(2, outputStream); + writeLegacyMeasurementSchema(outputStream, "s1", TSDataType.INT32); + writeLegacyMeasurementSchema(outputStream, "s2", TSDataType.TEXT); + + ReadWriteIOUtils.write((byte) 1, outputStream); + ReadWriteIOUtils.write(2L, outputStream); + ReadWriteIOUtils.write(1L, outputStream); + + ReadWriteIOUtils.write((byte) 0, outputStream); + + ReadWriteIOUtils.write((byte) 1, outputStream); + ReadWriteIOUtils.write((byte) 1, outputStream); + ReadWriteIOUtils.write(2, outputStream); + ReadWriteIOUtils.write(1, outputStream); + ReadWriteIOUtils.write((byte) 1, outputStream); + ReadWriteIOUtils.write((byte) 1, outputStream); + ReadWriteIOUtils.write(new Binary("2", TSFileConfig.STRING_CHARSET), outputStream); + ReadWriteIOUtils.write((byte) 1, outputStream); + ReadWriteIOUtils.write(new Binary("1", TSFileConfig.STRING_CHARSET), outputStream); + + ReadWriteIOUtils.write(isAligned, outputStream); + return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); + } + } + + private static void writeLegacyMeasurementSchema( + final DataOutputStream outputStream, final String measurement, final TSDataType dataType) + throws IOException { + ReadWriteIOUtils.write((byte) 1, outputStream); + ReadWriteIOUtils.write(measurement, outputStream); + ReadWriteIOUtils.write(dataType.serialize(), outputStream); + ReadWriteIOUtils.write(TSEncoding.PLAIN.serialize(), outputStream); + ReadWriteIOUtils.write(CompressionType.UNCOMPRESSED.serialize(), outputStream); + ReadWriteIOUtils.write(0, outputStream); + } + + private static void assertLegacyTabletStatement(final InsertTabletStatement statement) { + Assert.assertEquals("root.sg.d", statement.getDevicePath().getFullPath()); + Assert.assertArrayEquals(new String[] {"s1", "s2"}, statement.getMeasurements()); + Assert.assertArrayEquals( + new TSDataType[] {TSDataType.INT32, TSDataType.TEXT}, statement.getDataTypes()); + Assert.assertEquals(2, statement.getRowCount()); + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferFileSealReqV2.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferFileSealReqV2.java index cf73948c3f14d..227d5871d8b5c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferFileSealReqV2.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferFileSealReqV2.java @@ -54,6 +54,14 @@ public final Map getParameters() { return parameters; } + public static boolean isTreeModelDataAllowedToBeCaptured(final Map parameters) { + return parameters.containsKey(TREE) || !parameters.containsKey(TABLE); + } + + public static boolean isTableModelDataAllowedToBeCaptured(final Map parameters) { + return parameters.containsKey(TABLE); + } + protected abstract PipeRequestType getPlanType(); /////////////////////////////// Thrift /////////////////////////////// diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferFileSealReqV2Test.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferFileSealReqV2Test.java new file mode 100644 index 0000000000000..4ac0e5cb7c333 --- /dev/null +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferFileSealReqV2Test.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.sink.payload.thrift.request; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +public class PipeTransferFileSealReqV2Test { + + @Test + public void testLegacyV13SnapshotSealCapturesTreeOnly() { + final Map parameters = new HashMap<>(); + + Assert.assertTrue(PipeTransferFileSealReqV2.isTreeModelDataAllowedToBeCaptured(parameters)); + Assert.assertFalse(PipeTransferFileSealReqV2.isTableModelDataAllowedToBeCaptured(parameters)); + } + + @Test + public void testExplicitTreeOnlySnapshotSealCapturesTreeOnly() { + final Map parameters = new HashMap<>(); + parameters.put(PipeTransferFileSealReqV2.TREE, ""); + + Assert.assertTrue(PipeTransferFileSealReqV2.isTreeModelDataAllowedToBeCaptured(parameters)); + Assert.assertFalse(PipeTransferFileSealReqV2.isTableModelDataAllowedToBeCaptured(parameters)); + } + + @Test + public void testExplicitTableOnlySnapshotSealCapturesTableOnly() { + final Map parameters = new HashMap<>(); + parameters.put(PipeTransferFileSealReqV2.TABLE, ""); + + Assert.assertFalse(PipeTransferFileSealReqV2.isTreeModelDataAllowedToBeCaptured(parameters)); + Assert.assertTrue(PipeTransferFileSealReqV2.isTableModelDataAllowedToBeCaptured(parameters)); + } + + @Test + public void testExplicitTreeAndTableSnapshotSealCapturesBoth() { + final Map parameters = new HashMap<>(); + parameters.put(PipeTransferFileSealReqV2.TREE, ""); + parameters.put(PipeTransferFileSealReqV2.TABLE, ""); + + Assert.assertTrue(PipeTransferFileSealReqV2.isTreeModelDataAllowedToBeCaptured(parameters)); + Assert.assertTrue(PipeTransferFileSealReqV2.isTableModelDataAllowedToBeCaptured(parameters)); + } +}