Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,8 @@ public void testTransferMods() {
Collections.singleton("5,"));

TestUtils.executeNonQueries(
senderEnv, Arrays.asList("drop pipe test_history", "drop pipe test_realtime"));
senderEnv,
Arrays.asList("drop pipe if exists test_history", "drop pipe if exists test_realtime"));

TestUtils.executeNonQuery(receiverEnv, "drop database root.**");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.enums.ReadConsistencyLevel;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.exception.LoadConfigurationException;
Expand Down Expand Up @@ -3287,6 +3288,15 @@ public void setPartitionCacheSize(int partitionCacheSize) {
this.partitionCacheSize = partitionCacheSize;
}

public int getPipeDataStructureTabletSizeInBytes() {
int size = PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes();
if (size > thriftMaxFrameSize) {
size = (int) (thriftMaxFrameSize * 0.8);
CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletSizeInBytes(size);
}
return size;
}

public int getAuthorCacheSize() {
return authorCacheSize;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import org.apache.iotdb.commons.audit.IAuditEntity;
import org.apache.iotdb.commons.path.PatternTreeMap;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
Expand Down Expand Up @@ -106,7 +106,7 @@ protected TsFileInsertionEventParser(
this.allocatedMemoryBlockForTablet =
PipeDataNodeResourceManager.memory()
.forceAllocateForTabletWithRetry(
PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes());
IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.parser.TsFileInsertionEventParser;
Expand Down Expand Up @@ -135,7 +136,7 @@ public TsFileInsertionEventScanParser(
this.allocatedMemoryBlockForBatchData =
PipeDataNodeResourceManager.memory()
.forceAllocateForTabletWithRetry(
PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes());
IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes());
this.allocatedMemoryBlockForChunk =
PipeDataNodeResourceManager.memory()
.forceAllocateForTabletWithRetry(PipeConfig.getInstance().getPipeMaxReaderChunkSize());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public TsFileInsertionEventTableParser(
.forceAllocateForTabletWithRetry(currentModifications.ramBytesUsed());
long tableSize =
Math.min(
PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes(),
IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes(),
IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize());

this.allocatedMemoryBlockForChunk =
Expand All @@ -107,7 +107,9 @@ public TsFileInsertionEventTableParser(
this.allocatedMemoryBlockForTableSchemas =
PipeDataNodeResourceManager.memory()
.forceAllocateForTabletWithRetry(
PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes());
IoTDBDescriptor.getInstance()
.getConfig()
.getPipeDataStructureTabletSizeInBytes());

this.startTime = startTime;
this.endTime = endTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.resource.memory;

import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.event.common.row.PipeRow;
import org.apache.iotdb.db.utils.MemUtils;

Expand Down Expand Up @@ -118,7 +119,10 @@ public static Pair<Integer, Integer> calculateTabletRowCountAndMemory(RowRecord
}
}

return calculateTabletRowCountAndMemoryBySize(totalSizeInBytes, schemaCount);
return calculateTabletRowCountAndMemoryBySize(
totalSizeInBytes,
schemaCount,
PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
}

/**
Expand Down Expand Up @@ -163,7 +167,8 @@ public static Pair<Integer, Integer> calculateTabletRowCountAndMemory(BatchData
}
}

return calculateTabletRowCountAndMemoryBySize(totalSizeInBytes, schemaCount);
return calculateTabletRowCountAndMemoryBySize(
totalSizeInBytes, schemaCount, batchData.length());
}

/**
Expand All @@ -173,33 +178,38 @@ public static Pair<Integer, Integer> calculateTabletRowCountAndMemory(BatchData
* @return left is the row count of tablet, right is the memory cost of tablet in bytes
*/
public static Pair<Integer, Integer> calculateTabletRowCountAndMemory(PipeRow row) {
return calculateTabletRowCountAndMemoryBySize(row.getCurrentRowSize(), row.size());
return calculateTabletRowCountAndMemoryBySize(
row.getCurrentRowSize(),
row.size(),
PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
}

private static Pair<Integer, Integer> calculateTabletRowCountAndMemoryBySize(
int rowSize, int schemaCount) {
if (rowSize <= 0) {
int rowBytesUsed, int schemaCount, int inputNum) {
if (rowBytesUsed <= 0) {
return new Pair<>(1, 0);
}

// Calculate row number according to the max size of a pipe tablet.
// "-100" is the estimated size of other data structures in a pipe tablet.
// "*8" converts bytes to bits, because the bitmap size is 1 bit per schema.
int rowNumber =
8
* (PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes() - 100)
/ (8 * rowSize + schemaCount);
// Here we estimate the max use of
int sizeLimit =
Math.min(
IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes(),
(int) (inputNum * rowBytesUsed * 1.2));

int rowNumber = 8 * (sizeLimit - 100) / (8 * rowBytesUsed + schemaCount);
rowNumber = Math.max(1, rowNumber);

if ( // This means the row number is larger than the max row count of a pipe tablet
rowNumber > PipeConfig.getInstance().getPipeDataStructureTabletRowSize()) {
// Bound the row number, the memory cost is rowSize * rowNumber
return new Pair<>(
PipeConfig.getInstance().getPipeDataStructureTabletRowSize(),
rowSize * PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
rowBytesUsed * PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
} else {
return new Pair<>(
rowNumber, PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes());
return new Pair<>(rowNumber, sizeLimit);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

package org.apache.iotdb.db.pipe.event;

import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.PrefixTreePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
Expand Down Expand Up @@ -48,6 +50,7 @@
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -79,9 +82,19 @@ public class TsFileInsertionEventParserTest {
private File alignedTsFile;
private File nonalignedTsFile;
private TsFileResource resource;
private boolean isPipeMemoryManagementEnabled;

@Before
public void setUp() throws Exception {
isPipeMemoryManagementEnabled = PipeConfig.getInstance().getPipeMemoryManagementEnabled();
CommonDescriptor.getInstance().getConfig().setPipeMemoryManagementEnabled(false);
}

@After
public void tearDown() throws Exception {
CommonDescriptor.getInstance()
.getConfig()
.setPipeMemoryManagementEnabled(isPipeMemoryManagementEnabled);
if (alignedTsFile != null) {
alignedTsFile.delete();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,9 @@
private boolean pipeRetryLocallyForParallelOrUserConflict = true;

private int pipeDataStructureTabletRowSize = 2048;
private int pipeDataStructureTabletSizeInBytes = 2097152;

// 128MB
private int pipeDataStructureTabletSizeInBytes = 60 * 1024 * 1024;
private double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold = 0.3;
private double pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold = 0.3;
private volatile double pipeTotalFloatingMemoryProportion = 0.5;
Expand Down Expand Up @@ -1711,7 +1713,7 @@
return;
}
this.pipeReceiverLoadConversionEnabled = pipeReceiverLoadConversionEnabled;
logger.info("pipeReceiverConversionEnabled is set to {}.", pipeReceiverLoadConversionEnabled);

Check warning on line 1716 in iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Line is longer than 100 characters (found 106).

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ1xx07BjeKRFfAU6_i7&open=AZ1xx07BjeKRFfAU6_i7&pullRequest=17452
}

public long getPipePeriodicalLogMinIntervalSeconds() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,11 @@ protected TPipeTransferResp handleTransferHandshakeV1(final PipeTransferHandshak
} catch (Exception e) {
PipeLogger.log(
LOGGER::warn,
e,
"Receiver id = %s: Failed to delete original receiver file dir %s, because %s.",
receiverId.get(),
receiverFileDirWithIdSuffix.get().getPath(),
e.getMessage(),
e);
e.getMessage());
}
} else {
if (LOGGER.isDebugEnabled()) {
Expand Down Expand Up @@ -183,9 +183,9 @@ protected TPipeTransferResp handleTransferHandshakeV1(final PipeTransferHandshak
} catch (Exception e) {
PipeLogger.log(
LOGGER::warn,
e,
"Receiver id = %s: Failed to create pipe receiver file folder because all disks of folders are full.",
receiverId.get(),
e);
receiverId.get());
return new TPipeTransferResp(StatusUtils.getStatus(TSStatusCode.DISK_SPACE_INSUFFICIENT));
}

Expand Down Expand Up @@ -524,11 +524,11 @@ private void closeCurrentWritingFileWriter(final boolean fsyncBeforeClose) {
} catch (final Exception e) {
PipeLogger.log(
LOGGER::warn,
e,
"Receiver id = %s: Failed to close current writing file writer %s, because %s.",
receiverId.get(),
writingFile == null ? "null" : writingFile.getPath(),
e.getMessage(),
e);
e.getMessage());
}
writingFileWriter = null;
} else {
Expand Down Expand Up @@ -563,11 +563,11 @@ private void deleteFile(final File file) {
} catch (final Exception e) {
PipeLogger.log(
LOGGER::warn,
e,
"Receiver id = %s: Failed to delete original writing file %s, because %s.",
receiverId.get(),
file.getPath(),
e.getMessage(),
e);
e.getMessage());
}
} else {
if (LOGGER.isDebugEnabled()) {
Expand Down Expand Up @@ -648,11 +648,11 @@ protected final TPipeTransferResp handleTransferFileSealV1(final PipeTransferFil
} catch (final Exception e) {
PipeLogger.log(
LOGGER::warn,
e,
"Receiver id = %s: Failed to seal file %s from req %s.",
receiverId.get(),
writingFile,
req,
e);
req);
return new TPipeTransferResp(
RpcUtils.getStatus(
TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
Expand Down Expand Up @@ -745,11 +745,11 @@ protected final TPipeTransferResp handleTransferFileSealV2(final PipeTransferFil
} catch (final Exception e) {
PipeLogger.log(
LOGGER::warn,
e,
"Receiver id = %s: Failed to seal file %s from req %s.",
receiverId.get(),
files,
req,
e);
req);
return new TPipeTransferResp(
RpcUtils.getStatus(
TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
Expand Down
Loading