Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1277,15 +1277,15 @@ protected TSStatus loadFileV2(
PipeConfigRegionSnapshotEvent.getConfigPhysicalPlanTypeSet(
parameters.get(ColumnHeaderConstant.TYPE));
final boolean isTreeModelDataAllowedToBeCaptured =
parameters.containsKey(PipeTransferFileSealReqV2.TREE);
PipeTransferFileSealReqV2.isTreeModelDataAllowedToBeCaptured(parameters);
final TreePattern treePattern =
TreePattern.parsePatternFromString(
parameters.get(ColumnHeaderConstant.PATH_PATTERN),
isTreeModelDataAllowedToBeCaptured,
p -> new IoTDBTreePattern(isTreeModelDataAllowedToBeCaptured, p));
final TablePattern tablePattern =
new TablePattern(
parameters.containsKey(PipeTransferFileSealReqV2.TABLE),
PipeTransferFileSealReqV2.isTableModelDataAllowedToBeCaptured(parameters),
parameters.get(PipeTransferFileSealReqV2.DATABASE_PATTERN),
parameters.get(ColumnHeaderConstant.TABLE_NAME));
final List<TSStatus> results = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -614,15 +614,15 @@ private TSStatus loadSchemaSnapShot(
PipeSchemaRegionSnapshotEvent.getStatementTypeSet(
parameters.get(ColumnHeaderConstant.TYPE));
final boolean isTreeModelDataAllowedToBeCaptured =
parameters.containsKey(PipeTransferFileSealReqV2.TREE);
PipeTransferFileSealReqV2.isTreeModelDataAllowedToBeCaptured(parameters);
final TreePattern treePattern =
TreePattern.parsePatternFromString(
parameters.get(ColumnHeaderConstant.PATH_PATTERN),
isTreeModelDataAllowedToBeCaptured,
p -> new IoTDBTreePattern(isTreeModelDataAllowedToBeCaptured, p));
final TablePattern tablePattern =
new TablePattern(
parameters.containsKey(PipeTransferFileSealReqV2.TABLE),
PipeTransferFileSealReqV2.isTableModelDataAllowedToBeCaptured(parameters),
parameters.get(PipeTransferFileSealReqV2.DATABASE_PATTERN),
parameters.get(ColumnHeaderConstant.TABLE_NAME));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,19 +118,7 @@ public static PipeTransferTabletRawReq toTPipeTransferRawReq(
final ByteBuffer buffer, final TabletStringInternPool tabletStringInternPool) {
final PipeTransferTabletRawReq tabletReq = new PipeTransferTabletRawReq();

final int startPosition = buffer.position();
try {
final InsertTabletStatement insertTabletStatement =
TabletStatementConverter.deserializeStatementFromTabletFormat(
buffer, false, tabletStringInternPool);
tabletReq.isAligned = insertTabletStatement.isAligned();
tabletReq.statement = insertTabletStatement;
} catch (final Exception e) {
buffer.position(startPosition);
tabletReq.tablet =
PipeTabletUtils.internTablet(Tablet.deserialize(buffer), tabletStringInternPool);
tabletReq.isAligned = ReadWriteIOUtils.readBool(buffer);
}
tabletReq.deserializeTPipeTransferRawReq(buffer, tabletStringInternPool);

return tabletReq;
}
Expand Down Expand Up @@ -167,6 +155,33 @@ public static PipeTransferTabletRawReq fromTPipeTransferReq(final TPipeTransferR
return tabletReq;
}

private void deserializeTPipeTransferRawReq(
final ByteBuffer buffer, final TabletStringInternPool tabletStringInternPool) {
final int startPosition = buffer.position();
try {
final InsertTabletStatement insertTabletStatement =
TabletStatementConverter.deserializeLegacyStatementFromTabletFormat(
buffer, tabletStringInternPool);
isAligned = insertTabletStatement.isAligned();
statement = insertTabletStatement;
return;
} catch (final Exception e) {
buffer.position(startPosition);
}

try {
final InsertTabletStatement insertTabletStatement =
TabletStatementConverter.deserializeStatementFromTabletFormat(
buffer, false, tabletStringInternPool);
isAligned = insertTabletStatement.isAligned();
statement = insertTabletStatement;
} catch (final Exception e) {
buffer.position(startPosition);
tablet = PipeTabletUtils.internTablet(Tablet.deserialize(buffer), tabletStringInternPool);
isAligned = ReadWriteIOUtils.readBool(buffer);
}
}

/////////////////////////////// Air Gap ///////////////////////////////

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,27 @@ public static InsertTabletStatement deserializeStatementFromTabletFormat(
final boolean readDatabaseName,
final TabletStringInternPool tabletStringInternPool)
throws IllegalPathException {
return deserializeStatementFromTabletFormat(
byteBuffer, readDatabaseName, tabletStringInternPool, true);
}

public static InsertTabletStatement deserializeLegacyStatementFromTabletFormat(
final ByteBuffer byteBuffer) throws IllegalPathException {
return deserializeLegacyStatementFromTabletFormat(byteBuffer, null);
}

public static InsertTabletStatement deserializeLegacyStatementFromTabletFormat(
final ByteBuffer byteBuffer, final TabletStringInternPool tabletStringInternPool)
throws IllegalPathException {
return deserializeStatementFromTabletFormat(byteBuffer, false, tabletStringInternPool, false);
}

private static InsertTabletStatement deserializeStatementFromTabletFormat(
final ByteBuffer byteBuffer,
final boolean readDatabaseName,
final TabletStringInternPool tabletStringInternPool,
final boolean readColumnCategory)
throws IllegalPathException {
final InsertTabletStatement statement = new InsertTabletStatement();

// Calculate memory size during deserialization, use INSTANCE_SIZE constant
Expand Down Expand Up @@ -132,9 +153,11 @@ public static InsertTabletStatement deserializeStatementFromTabletFormat(
final Pair<String, TSDataType> pair = readMeasurement(byteBuffer, tabletStringInternPool);
measurement[i] = pair.getLeft();
dataTypes[i] = pair.getRight();
columnCategories[i] =
TsTableColumnCategory.fromTsFileColumnCategory(
ColumnCategory.values()[byteBuffer.get()]);
if (readColumnCategory) {
columnCategories[i] =
TsTableColumnCategory.fromTsFileColumnCategory(
ColumnCategory.values()[byteBuffer.get()]);
}

// Calculate memory for each measurement string
if (measurement[i] != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package org.apache.iotdb.db.pipe.sink;

import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
import org.apache.iotdb.commons.pipe.sink.payload.thrift.response.PipeTransferFilePieceResp;
import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.commons.schema.SchemaConstant;
Expand Down Expand Up @@ -518,6 +520,42 @@ public void testPipeTransferTabletBatchReqInternsRepeatedMeasurementNames() thro
insertTabletStatements.get(1).getMeasurements()[0]);
}

@Test
public void testPipeTransferTabletBatchReqWithLegacyTabletFormat() throws IOException {
final List<ByteBuffer> tabletBuffers = new ArrayList<>();
tabletBuffers.add(serializeLegacyTabletRawBuffer(false));
tabletBuffers.add(serializeLegacyTabletRawBuffer(true));

final PipeTransferTabletBatchReq req =
PipeTransferTabletBatchReq.toTPipeTransferReq(Collections.emptyList(), tabletBuffers);

final PipeTransferTabletBatchReq deserializedReq =
PipeTransferTabletBatchReq.fromTPipeTransferReq(req);

Assert.assertEquals(2, deserializedReq.getTabletReqs().size());
Assert.assertFalse(deserializedReq.getTabletReqs().get(0).getIsAligned());
Assert.assertTrue(deserializedReq.getTabletReqs().get(1).getIsAligned());

assertLegacyTabletStatement(deserializedReq.getTabletReqs().get(0).constructStatement());
assertLegacyTabletStatement(deserializedReq.getTabletReqs().get(1).constructStatement());
}

@Test
public void testPipeTransferTabletRawReqWithLegacyTabletFormat() throws IOException {
final TPipeTransferReq req = new TPipeTransferReq();
req.version = IoTDBSinkRequestVersion.VERSION_1.getVersion();
req.type = PipeRequestType.TRANSFER_TABLET_RAW.getType();
req.body = serializeLegacyTabletRawBuffer(true);

final PipeTransferTabletRawReq deserializedReq =
PipeTransferTabletRawReq.fromTPipeTransferReq(req);

Assert.assertEquals(req.getVersion(), deserializedReq.getVersion());
Assert.assertEquals(req.getType(), deserializedReq.getType());
Assert.assertTrue(deserializedReq.getIsAligned());
assertLegacyTabletStatement(deserializedReq.constructStatement());
}

@Test
public void testPipeTransferTabletBatchReqV2() throws IOException {
final List<ByteBuffer> insertNodeBuffers = new ArrayList<>();
Expand Down Expand Up @@ -814,4 +852,56 @@ private static ByteBuffer serializeTablet(final Tablet tablet, final boolean isA
return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
}
}

private static ByteBuffer serializeLegacyTabletRawBuffer(final boolean isAligned)
throws IOException {
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
ReadWriteIOUtils.write("root.sg.d", outputStream);
ReadWriteIOUtils.write(2, outputStream);

ReadWriteIOUtils.write((byte) 1, outputStream);
ReadWriteIOUtils.write(2, outputStream);
writeLegacyMeasurementSchema(outputStream, "s1", TSDataType.INT32);
writeLegacyMeasurementSchema(outputStream, "s2", TSDataType.TEXT);

ReadWriteIOUtils.write((byte) 1, outputStream);
ReadWriteIOUtils.write(2L, outputStream);
ReadWriteIOUtils.write(1L, outputStream);

ReadWriteIOUtils.write((byte) 0, outputStream);

ReadWriteIOUtils.write((byte) 1, outputStream);
ReadWriteIOUtils.write((byte) 1, outputStream);
ReadWriteIOUtils.write(2, outputStream);
ReadWriteIOUtils.write(1, outputStream);
ReadWriteIOUtils.write((byte) 1, outputStream);
ReadWriteIOUtils.write((byte) 1, outputStream);
ReadWriteIOUtils.write(new Binary("2", TSFileConfig.STRING_CHARSET), outputStream);
ReadWriteIOUtils.write((byte) 1, outputStream);
ReadWriteIOUtils.write(new Binary("1", TSFileConfig.STRING_CHARSET), outputStream);

ReadWriteIOUtils.write(isAligned, outputStream);
return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
}
}

private static void writeLegacyMeasurementSchema(
final DataOutputStream outputStream, final String measurement, final TSDataType dataType)
throws IOException {
ReadWriteIOUtils.write((byte) 1, outputStream);
ReadWriteIOUtils.write(measurement, outputStream);
ReadWriteIOUtils.write(dataType.serialize(), outputStream);
ReadWriteIOUtils.write(TSEncoding.PLAIN.serialize(), outputStream);
ReadWriteIOUtils.write(CompressionType.UNCOMPRESSED.serialize(), outputStream);
ReadWriteIOUtils.write(0, outputStream);
}

private static void assertLegacyTabletStatement(final InsertTabletStatement statement) {
Assert.assertEquals("root.sg.d", statement.getDevicePath().getFullPath());
Assert.assertArrayEquals(new String[] {"s1", "s2"}, statement.getMeasurements());
Assert.assertArrayEquals(
new TSDataType[] {TSDataType.INT32, TSDataType.TEXT}, statement.getDataTypes());
Assert.assertEquals(2, statement.getRowCount());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ public final Map<String, String> getParameters() {
return parameters;
}

public static boolean isTreeModelDataAllowedToBeCaptured(final Map<String, String> parameters) {
return parameters.containsKey(TREE) || !parameters.containsKey(TABLE);
}

public static boolean isTableModelDataAllowedToBeCaptured(final Map<String, String> parameters) {
return parameters.containsKey(TABLE);
}

protected abstract PipeRequestType getPlanType();

/////////////////////////////// Thrift ///////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.commons.pipe.sink.payload.thrift.request;

import org.junit.Assert;
import org.junit.Test;

import java.util.HashMap;
import java.util.Map;

public class PipeTransferFileSealReqV2Test {

@Test
public void testLegacyV13SnapshotSealCapturesTreeOnly() {
final Map<String, String> parameters = new HashMap<>();

Assert.assertTrue(PipeTransferFileSealReqV2.isTreeModelDataAllowedToBeCaptured(parameters));
Assert.assertFalse(PipeTransferFileSealReqV2.isTableModelDataAllowedToBeCaptured(parameters));
}

@Test
public void testExplicitTreeOnlySnapshotSealCapturesTreeOnly() {
final Map<String, String> parameters = new HashMap<>();
parameters.put(PipeTransferFileSealReqV2.TREE, "");

Assert.assertTrue(PipeTransferFileSealReqV2.isTreeModelDataAllowedToBeCaptured(parameters));
Assert.assertFalse(PipeTransferFileSealReqV2.isTableModelDataAllowedToBeCaptured(parameters));
}

@Test
public void testExplicitTableOnlySnapshotSealCapturesTableOnly() {
final Map<String, String> parameters = new HashMap<>();
parameters.put(PipeTransferFileSealReqV2.TABLE, "");

Assert.assertFalse(PipeTransferFileSealReqV2.isTreeModelDataAllowedToBeCaptured(parameters));
Assert.assertTrue(PipeTransferFileSealReqV2.isTableModelDataAllowedToBeCaptured(parameters));
}

@Test
public void testExplicitTreeAndTableSnapshotSealCapturesBoth() {
final Map<String, String> parameters = new HashMap<>();
parameters.put(PipeTransferFileSealReqV2.TREE, "");
parameters.put(PipeTransferFileSealReqV2.TABLE, "");

Assert.assertTrue(PipeTransferFileSealReqV2.isTreeModelDataAllowedToBeCaptured(parameters));
Assert.assertTrue(PipeTransferFileSealReqV2.isTableModelDataAllowedToBeCaptured(parameters));
}
}
Loading