Skip to content

Commit 3e4661b

Browse files
authored
Pipe: Optimized the tablet size by memory estimation (#17452)
* fix * push * ger-limit * fix * fix * fix * sptls * Update CommonConfig.java
1 parent ab6289a commit 3e4661b

File tree

8 files changed

+68
-30
lines changed

8 files changed

+68
-30
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.iotdb.commons.conf.CommonDescriptor;
2626
import org.apache.iotdb.commons.conf.IoTDBConstant;
2727
import org.apache.iotdb.commons.enums.ReadConsistencyLevel;
28+
import org.apache.iotdb.commons.pipe.config.PipeConfig;
2829
import org.apache.iotdb.commons.utils.FileUtils;
2930
import org.apache.iotdb.consensus.ConsensusFactory;
3031
import org.apache.iotdb.db.exception.LoadConfigurationException;
@@ -3287,6 +3288,15 @@ public void setPartitionCacheSize(int partitionCacheSize) {
32873288
this.partitionCacheSize = partitionCacheSize;
32883289
}
32893290

3291+
public int getPipeDataStructureTabletSizeInBytes() {
3292+
int size = PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes();
3293+
if (size > thriftMaxFrameSize) {
3294+
size = (int) (thriftMaxFrameSize * 0.8);
3295+
CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletSizeInBytes(size);
3296+
}
3297+
return size;
3298+
}
3299+
32903300
public int getAuthorCacheSize() {
32913301
return authorCacheSize;
32923302
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@
2222
import org.apache.iotdb.commons.audit.IAuditEntity;
2323
import org.apache.iotdb.commons.path.PatternTreeMap;
2424
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
25-
import org.apache.iotdb.commons.pipe.config.PipeConfig;
2625
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
2726
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
27+
import org.apache.iotdb.db.conf.IoTDBDescriptor;
2828
import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
2929
import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
3030
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
@@ -106,7 +106,7 @@ protected TsFileInsertionEventParser(
106106
this.allocatedMemoryBlockForTablet =
107107
PipeDataNodeResourceManager.memory()
108108
.forceAllocateForTabletWithRetry(
109-
PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes());
109+
IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes());
110110
}
111111

112112
/**

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.iotdb.commons.pipe.config.PipeConfig;
3030
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
3131
import org.apache.iotdb.db.auth.AuthorityChecker;
32+
import org.apache.iotdb.db.conf.IoTDBDescriptor;
3233
import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
3334
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
3435
import org.apache.iotdb.db.pipe.event.common.tsfile.parser.TsFileInsertionEventParser;
@@ -135,7 +136,7 @@ public TsFileInsertionEventScanParser(
135136
this.allocatedMemoryBlockForBatchData =
136137
PipeDataNodeResourceManager.memory()
137138
.forceAllocateForTabletWithRetry(
138-
PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes());
139+
IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes());
139140
this.allocatedMemoryBlockForChunk =
140141
PipeDataNodeResourceManager.memory()
141142
.forceAllocateForTabletWithRetry(PipeConfig.getInstance().getPipeMaxReaderChunkSize());

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ public TsFileInsertionEventTableParser(
9393
.forceAllocateForTabletWithRetry(currentModifications.ramBytesUsed());
9494
long tableSize =
9595
Math.min(
96-
PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes(),
96+
IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes(),
9797
IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize());
9898

9999
this.allocatedMemoryBlockForChunk =
@@ -107,7 +107,9 @@ public TsFileInsertionEventTableParser(
107107
this.allocatedMemoryBlockForTableSchemas =
108108
PipeDataNodeResourceManager.memory()
109109
.forceAllocateForTabletWithRetry(
110-
PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes());
110+
IoTDBDescriptor.getInstance()
111+
.getConfig()
112+
.getPipeDataStructureTabletSizeInBytes());
111113

112114
this.startTime = startTime;
113115
this.endTime = endTime;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.apache.iotdb.db.pipe.resource.memory;
2121

2222
import org.apache.iotdb.commons.pipe.config.PipeConfig;
23+
import org.apache.iotdb.db.conf.IoTDBDescriptor;
2324
import org.apache.iotdb.db.pipe.event.common.row.PipeRow;
2425
import org.apache.iotdb.db.utils.MemUtils;
2526

@@ -118,7 +119,10 @@ public static Pair<Integer, Integer> calculateTabletRowCountAndMemory(RowRecord
118119
}
119120
}
120121

121-
return calculateTabletRowCountAndMemoryBySize(totalSizeInBytes, schemaCount);
122+
return calculateTabletRowCountAndMemoryBySize(
123+
totalSizeInBytes,
124+
schemaCount,
125+
PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
122126
}
123127

124128
/**
@@ -163,7 +167,8 @@ public static Pair<Integer, Integer> calculateTabletRowCountAndMemory(BatchData
163167
}
164168
}
165169

166-
return calculateTabletRowCountAndMemoryBySize(totalSizeInBytes, schemaCount);
170+
return calculateTabletRowCountAndMemoryBySize(
171+
totalSizeInBytes, schemaCount, batchData.length());
167172
}
168173

169174
/**
@@ -173,33 +178,38 @@ public static Pair<Integer, Integer> calculateTabletRowCountAndMemory(BatchData
173178
* @return left is the row count of tablet, right is the memory cost of tablet in bytes
174179
*/
175180
public static Pair<Integer, Integer> calculateTabletRowCountAndMemory(PipeRow row) {
176-
return calculateTabletRowCountAndMemoryBySize(row.getCurrentRowSize(), row.size());
181+
return calculateTabletRowCountAndMemoryBySize(
182+
row.getCurrentRowSize(),
183+
row.size(),
184+
PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
177185
}
178186

179187
private static Pair<Integer, Integer> calculateTabletRowCountAndMemoryBySize(
180-
int rowSize, int schemaCount) {
181-
if (rowSize <= 0) {
188+
int rowBytesUsed, int schemaCount, int inputNum) {
189+
if (rowBytesUsed <= 0) {
182190
return new Pair<>(1, 0);
183191
}
184192

185193
// Calculate row number according to the max size of a pipe tablet.
186194
// "-100" is the estimated size of other data structures in a pipe tablet.
187195
// "*8" converts bytes to bits, because the bitmap size is 1 bit per schema.
188-
int rowNumber =
189-
8
190-
* (PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes() - 100)
191-
/ (8 * rowSize + schemaCount);
196+
// Here we estimate the max use of
197+
int sizeLimit =
198+
Math.min(
199+
IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes(),
200+
(int) (inputNum * rowBytesUsed * 1.2));
201+
202+
int rowNumber = 8 * (sizeLimit - 100) / (8 * rowBytesUsed + schemaCount);
192203
rowNumber = Math.max(1, rowNumber);
193204

194205
if ( // This means the row number is larger than the max row count of a pipe tablet
195206
rowNumber > PipeConfig.getInstance().getPipeDataStructureTabletRowSize()) {
196207
// Bound the row number, the memory cost is rowSize * rowNumber
197208
return new Pair<>(
198209
PipeConfig.getInstance().getPipeDataStructureTabletRowSize(),
199-
rowSize * PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
210+
rowBytesUsed * PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
200211
} else {
201-
return new Pair<>(
202-
rowNumber, PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes());
212+
return new Pair<>(rowNumber, sizeLimit);
203213
}
204214
}
205215

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919

2020
package org.apache.iotdb.db.pipe.event;
2121

22+
import org.apache.iotdb.commons.conf.CommonDescriptor;
2223
import org.apache.iotdb.commons.path.PartialPath;
24+
import org.apache.iotdb.commons.pipe.config.PipeConfig;
2325
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
2426
import org.apache.iotdb.commons.pipe.datastructure.pattern.PrefixTreePattern;
2527
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
@@ -48,6 +50,7 @@
4850
import org.apache.tsfile.write.schema.MeasurementSchema;
4951
import org.junit.After;
5052
import org.junit.Assert;
53+
import org.junit.Before;
5154
import org.junit.Test;
5255
import org.slf4j.Logger;
5356
import org.slf4j.LoggerFactory;
@@ -79,9 +82,19 @@ public class TsFileInsertionEventParserTest {
7982
private File alignedTsFile;
8083
private File nonalignedTsFile;
8184
private TsFileResource resource;
85+
private boolean isPipeMemoryManagementEnabled;
86+
87+
@Before
88+
public void setUp() throws Exception {
89+
isPipeMemoryManagementEnabled = PipeConfig.getInstance().getPipeMemoryManagementEnabled();
90+
CommonDescriptor.getInstance().getConfig().setPipeMemoryManagementEnabled(false);
91+
}
8292

8393
@After
8494
public void tearDown() throws Exception {
95+
CommonDescriptor.getInstance()
96+
.getConfig()
97+
.setPipeMemoryManagementEnabled(isPipeMemoryManagementEnabled);
8598
if (alignedTsFile != null) {
8699
alignedTsFile.delete();
87100
}

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,9 @@ public class CommonConfig {
218218
private boolean pipeRetryLocallyForParallelOrUserConflict = true;
219219

220220
private int pipeDataStructureTabletRowSize = 2048;
221-
private int pipeDataStructureTabletSizeInBytes = 2097152;
221+
222+
// 60MB
223+
private int pipeDataStructureTabletSizeInBytes = 60 * 1024 * 1024;
222224
private double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold = 0.3;
223225
private double pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold = 0.3;
224226
private volatile double pipeTotalFloatingMemoryProportion = 0.5;

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -146,11 +146,11 @@ protected TPipeTransferResp handleTransferHandshakeV1(final PipeTransferHandshak
146146
} catch (Exception e) {
147147
PipeLogger.log(
148148
LOGGER::warn,
149+
e,
149150
"Receiver id = %s: Failed to delete original receiver file dir %s, because %s.",
150151
receiverId.get(),
151152
receiverFileDirWithIdSuffix.get().getPath(),
152-
e.getMessage(),
153-
e);
153+
e.getMessage());
154154
}
155155
} else {
156156
if (LOGGER.isDebugEnabled()) {
@@ -184,9 +184,9 @@ protected TPipeTransferResp handleTransferHandshakeV1(final PipeTransferHandshak
184184
} catch (Exception e) {
185185
PipeLogger.log(
186186
LOGGER::warn,
187+
e,
187188
"Receiver id = %s: Failed to create pipe receiver file folder because all disks of folders are full.",
188-
receiverId.get(),
189-
e);
189+
receiverId.get());
190190
return new TPipeTransferResp(StatusUtils.getStatus(TSStatusCode.DISK_SPACE_INSUFFICIENT));
191191
}
192192

@@ -535,11 +535,11 @@ private void closeCurrentWritingFileWriter(final boolean fsyncBeforeClose) {
535535
} catch (final Exception e) {
536536
PipeLogger.log(
537537
LOGGER::warn,
538+
e,
538539
"Receiver id = %s: Failed to close current writing file writer %s, because %s.",
539540
receiverId.get(),
540541
writingFile == null ? "null" : writingFile.getPath(),
541-
e.getMessage(),
542-
e);
542+
e.getMessage());
543543
}
544544
writingFileWriter = null;
545545
} else {
@@ -574,11 +574,11 @@ private void deleteFile(final File file) {
574574
} catch (final Exception e) {
575575
PipeLogger.log(
576576
LOGGER::warn,
577+
e,
577578
"Receiver id = %s: Failed to delete original writing file %s, because %s.",
578579
receiverId.get(),
579580
file.getPath(),
580-
e.getMessage(),
581-
e);
581+
e.getMessage());
582582
}
583583
} else {
584584
if (LOGGER.isDebugEnabled()) {
@@ -659,11 +659,11 @@ protected final TPipeTransferResp handleTransferFileSealV1(final PipeTransferFil
659659
} catch (final Exception e) {
660660
PipeLogger.log(
661661
LOGGER::warn,
662+
e,
662663
"Receiver id = %s: Failed to seal file %s from req %s.",
663664
receiverId.get(),
664665
writingFile,
665-
req,
666-
e);
666+
req);
667667
return new TPipeTransferResp(
668668
RpcUtils.getStatus(
669669
TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
@@ -756,11 +756,11 @@ protected final TPipeTransferResp handleTransferFileSealV2(final PipeTransferFil
756756
} catch (final Exception e) {
757757
PipeLogger.log(
758758
LOGGER::warn,
759+
e,
759760
"Receiver id = %s: Failed to seal file %s from req %s.",
760761
receiverId.get(),
761762
files,
762-
req,
763-
e);
763+
req);
764764
return new TPipeTransferResp(
765765
RpcUtils.getStatus(
766766
TSStatusCode.PIPE_TRANSFER_FILE_ERROR,

0 commit comments

Comments
 (0)