diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java index 4852b9d116e25..64fa6aa760335 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java @@ -502,6 +502,13 @@ public CommonConfig setTagAttributeTotalSize(int tagAttributeTotalSize) { return this; } + @Override + public CommonConfig setSingleMeasurementCheckCacheSize(int singleMeasurementCheckCacheSize) { + setProperty( + "single_measurement_check_cache_size", String.valueOf(singleMeasurementCheckCacheSize)); + return this; + } + @Override public CommonConfig setDnConnectionTimeoutMs(int connectionTimeoutMs) { setProperty("dn_connection_timeout_ms", String.valueOf(connectionTimeoutMs)); diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java index 582c9a049e492..522c09e119935 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java @@ -521,6 +521,13 @@ public CommonConfig setTagAttributeTotalSize(int tagAttributeTotalSize) { return this; } + @Override + public CommonConfig setSingleMeasurementCheckCacheSize(int singleMeasurementCheckCacheSize) { + dnConfig.setSingleMeasurementCheckCacheSize(singleMeasurementCheckCacheSize); + cnConfig.setSingleMeasurementCheckCacheSize(singleMeasurementCheckCacheSize); + return this; + } + @Override public CommonConfig setDnConnectionTimeoutMs(int connectionTimeoutMs) { dnConfig.setDnConnectionTimeoutMs(connectionTimeoutMs); diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java index 48c157e957be8..1420420f6adbf 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java @@ -368,6 +368,11 @@ public CommonConfig setTagAttributeTotalSize(int tagAttributeTotalSize) { return this; } + @Override + public CommonConfig setSingleMeasurementCheckCacheSize(int singleMeasurementCheckCacheSize) { + return this; + } + @Override public CommonConfig setDnConnectionTimeoutMs(int connectionTimeoutMs) { return this; diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java index dc21234e2bad2..51a2ccd12f75f 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java @@ -162,6 +162,8 @@ CommonConfig setEnableAutoLeaderBalanceForIoTConsensus( CommonConfig setTagAttributeTotalSize(int tagAttributeTotalSize); + CommonConfig setSingleMeasurementCheckCacheSize(int singleMeasurementCheckCacheSize); + CommonConfig setDnConnectionTimeoutMs(int connectionTimeoutMs); CommonConfig setPipeHeartbeatIntervalSecondsForCollectingPipeMeta( diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/performance/IoTDBSingleMeasurementCheckCachePerformanceIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/performance/IoTDBSingleMeasurementCheckCachePerformanceIT.java new file mode 100644 index 0000000000000..5606bf6c7738e --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/performance/IoTDBSingleMeasurementCheckCachePerformanceIT.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.it.performance; + +import org.apache.iotdb.isession.ISession; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.LocalStandaloneIT; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; + +import org.apache.tsfile.enums.TSDataType; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +@Ignore +@RunWith(IoTDBTestRunner.class) +@Category({LocalStandaloneIT.class}) +public class IoTDBSingleMeasurementCheckCachePerformanceIT { + + private static final Logger LOGGER = + LoggerFactory.getLogger(IoTDBSingleMeasurementCheckCachePerformanceIT.class); + + private static final ExperimentGroup[] EXPERIMENT_GROUPS = { + new ExperimentGroup(1_000, 500), + new ExperimentGroup(2_000, 2_000), + new ExperimentGroup(4_000, 8_000) + }; + private static final int CACHE_DISABLED_SIZE = 0; + private static final int REPEAT_COUNT = 3; + private static final int BATCH_COUNT = 800; + private static final int ROWS_PER_BATCH = 100; + private static final String DEVICE = "root.sg_cache_perf.d1"; + + @Test + public void testEndToEndWritePerformanceWithDifferentSingleMeasurementCheckCacheSizes() + throws Exception { + for (ExperimentGroup experimentGroup : EXPERIMENT_GROUPS) { + long totalDisabledCost = 0; + long totalEnabledCost = 0; + for (int repeatIndex = 0; repeatIndex < REPEAT_COUNT; repeatIndex++) { + long disabledCost = + runWritePerformanceExperiment( + new Experiment(experimentGroup.measurementCount, CACHE_DISABLED_SIZE)); + long enabledCost = + runWritePerformanceExperiment( + new Experiment(experimentGroup.measurementCount, experimentGroup.cacheSize)); + totalDisabledCost += disabledCost; + totalEnabledCost += enabledCost; + LOGGER.info( + "End-to-end write cost repeat {}/{} with measurementCount={}, cache disabled: {} ms, " + + "cacheSize={} (cacheSize {} measurementCount): {} ms, enabled/disabled ratio: {}", + repeatIndex + 1, + REPEAT_COUNT, + experimentGroup.measurementCount, + disabledCost / 1_000_000, + experimentGroup.cacheSize, + experimentGroup.cacheSizeRelation(), + enabledCost / 1_000_000, + String.format("%.3f", (double) enabledCost / disabledCost)); + } + long averageDisabledCost = totalDisabledCost / REPEAT_COUNT; + long averageEnabledCost = totalEnabledCost / REPEAT_COUNT; + LOGGER.info( + "Average end-to-end write cost after {} repeats with measurementCount={}, cache disabled: " + + "{} ms, cacheSize={} (cacheSize {} measurementCount): {} ms, enabled/disabled " + + "ratio: {}", + REPEAT_COUNT, + experimentGroup.measurementCount, + averageDisabledCost / 1_000_000, + experimentGroup.cacheSize, + experimentGroup.cacheSizeRelation(), + averageEnabledCost / 1_000_000, + String.format("%.3f", (double) averageEnabledCost / averageDisabledCost)); + Assert.assertTrue(totalDisabledCost > 0); + Assert.assertTrue(totalEnabledCost > 0); + } + } + + private long runWritePerformanceExperiment(Experiment experiment) throws Exception { + EnvFactory.getEnv() + .getConfig() + .getCommonConfig() + .setSingleMeasurementCheckCacheSize(experiment.cacheSize) + .setAutoCreateSchemaEnabled(false); + EnvFactory.getEnv().initClusterEnvironment(); + try { + try (ISession session = EnvFactory.getEnv().getSessionConnection()) { + createTimeseries(session, experiment.measurementCount); + long cost = executeWriteWorkload(session, experiment.measurementCount); + assertRowCount(session); + return cost; + } + } finally { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + } + + private void createTimeseries(ISession session, int measurementCount) + throws IoTDBConnectionException, StatementExecutionException { + session.executeNonQueryStatement("CREATE DATABASE root.sg_cache_perf"); + for (int i = 0; i < measurementCount; i++) { + session.executeNonQueryStatement( + "CREATE TIMESERIES " + + DEVICE + + ".`sensor+" + + i + + "` WITH DATATYPE=INT64, ENCODING=PLAIN"); + } + } + + private long executeWriteWorkload(ISession session, int measurementCount) + throws IoTDBConnectionException, StatementExecutionException { + List measurements = new ArrayList<>(measurementCount); + List types = new ArrayList<>(measurementCount); + List values = new ArrayList<>(measurementCount); + for (int i = 0; i < measurementCount; i++) { + measurements.add("`sensor+" + i + "`"); + types.add(TSDataType.INT64); + values.add((long) i); + } + + long startTime = System.nanoTime(); + for (int batchIndex = 0; batchIndex < BATCH_COUNT; batchIndex++) { + List timestamps = new ArrayList<>(ROWS_PER_BATCH); + List> measurementsList = new ArrayList<>(ROWS_PER_BATCH); + List> typesList = new ArrayList<>(ROWS_PER_BATCH); + List> valuesList = new ArrayList<>(ROWS_PER_BATCH); + for (int rowIndex = 0; rowIndex < ROWS_PER_BATCH; rowIndex++) { + timestamps.add((long) batchIndex * ROWS_PER_BATCH + rowIndex); + measurementsList.add(measurements); + typesList.add(types); + valuesList.add(values); + } + session.insertRecordsOfOneDevice(DEVICE, timestamps, measurementsList, typesList, valuesList); + } + return System.nanoTime() - startTime; + } + + private void assertRowCount(ISession session) + throws IoTDBConnectionException, StatementExecutionException { + try (org.apache.iotdb.isession.SessionDataSet dataSet = + session.executeQueryStatement("SELECT COUNT(`sensor+0`) FROM " + DEVICE)) { + Assert.assertTrue(dataSet.hasNext()); + Assert.assertEquals( + (long) BATCH_COUNT * ROWS_PER_BATCH, dataSet.next().getFields().get(0).getLongV()); + Assert.assertFalse(dataSet.hasNext()); + } + } + + private static class ExperimentGroup { + + private final int measurementCount; + private final int cacheSize; + + private ExperimentGroup(int measurementCount, int cacheSize) { + this.measurementCount = measurementCount; + this.cacheSize = cacheSize; + } + + private String cacheSizeRelation() { + if (cacheSize < measurementCount) { + return "<"; + } + if (cacheSize == measurementCount) { + return "="; + } + return ">"; + } + } + + private static class Experiment { + + private final int measurementCount; + private final int cacheSize; + + private Experiment(int measurementCount, int cacheSize) { + this.measurementCount = measurementCount; + this.cacheSize = cacheSize; + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLoader.java index c32f2301289db..64eb85a175391 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLoader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLoader.java @@ -28,7 +28,9 @@ import org.apache.iotdb.db.storageengine.StorageEngine; import org.apache.iotdb.db.storageengine.dataregion.DataRegion; import org.apache.iotdb.db.storageengine.dataregion.flush.CompressionRatio; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.tsfile.common.constant.TsFileConstant; import org.apache.tsfile.external.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -355,6 +357,8 @@ private void createLinksFromSnapshotToSourceDir( String targetSuffix, File[] files, FolderManager folderManager) throws IOException { Map fileTarget = new HashMap<>(); for (File file : files) { + checkTsFileResourceExists(file); + String fileKey = file.getName().split("\\.")[0]; String dataDir = fileTarget.get(fileKey); @@ -388,6 +392,17 @@ private void createLinksFromSnapshotToSourceDir( } } + private void checkTsFileResourceExists(File file) { + if (!file.getName().endsWith(TsFileConstant.TSFILE_SUFFIX)) { + return; + } + + String resourceFileName = file.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX; + if (!new File(resourceFileName).exists()) { + LOGGER.warn("The associated resource file of {} is not found in the snapshot", file); + } + } + private void createLinksFromSnapshotDirToDataDirWithLog() throws IOException { String snapshotId = logAnalyzer.getSnapshotId(); int loggedFileNum = logAnalyzer.getTotalFileCountInSnapshot(); diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template index 531cbac91a800..9910dace7d882 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template @@ -909,6 +909,12 @@ tag_attribute_flush_interval=1000 # Datatype: int tag_attribute_total_size=700 +# The maximum number of single measurement check results cached by PathUtils. +# Set to 0 to disable this cache. +# effectiveMode: restart +# Datatype: int +single_measurement_check_cache_size=10000 + # max measurement num of internal request # When creating timeseries with Session.createMultiTimeseries, the user input plan, the timeseries num of # which exceeds this num, will be split to several plans with timeseries no more than this num. diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 6a8956e423b48..830659f7e17fc 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -407,6 +407,8 @@ public class CommonConfig { // Max size for tag and attribute of one time series private int tagAttributeTotalSize = 700; + private int singleMeasurementCheckCacheSize = 10_000; + // maximum number of Cluster Databases allowed private int databaseLimitThreshold = -1; @@ -2611,6 +2613,14 @@ public void setTagAttributeTotalSize(int tagAttributeTotalSize) { this.tagAttributeTotalSize = tagAttributeTotalSize; } + public int getSingleMeasurementCheckCacheSize() { + return singleMeasurementCheckCacheSize; + } + + public void setSingleMeasurementCheckCacheSize(int singleMeasurementCheckCacheSize) { + this.singleMeasurementCheckCacheSize = singleMeasurementCheckCacheSize; + } + public int getDatabaseLimitThreshold() { return databaseLimitThreshold; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java index 5cd954a09f7b8..3215d29843f15 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java @@ -276,6 +276,15 @@ public void loadCommonProps(TrimProperties properties) throws IOException { properties.getProperty( "tag_attribute_total_size", String.valueOf(config.getTagAttributeTotalSize())))); + int singleMeasurementCheckCacheSize = + Integer.parseInt( + properties.getProperty( + "single_measurement_check_cache_size", + String.valueOf(config.getSingleMeasurementCheckCacheSize()))); + if (singleMeasurementCheckCacheSize >= 0) { + config.setSingleMeasurementCheckCacheSize(singleMeasurementCheckCacheSize); + } + config.setTimePartitionOrigin( Long.parseLong( properties.getProperty( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/PathUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/PathUtils.java index 615bd62453fbc..1803ebb68cae7 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/PathUtils.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/PathUtils.java @@ -18,10 +18,13 @@ */ package org.apache.iotdb.commons.utils; +import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.MetadataException; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; import org.apache.tsfile.common.constant.TsFileConstant; import org.apache.tsfile.exception.PathParseException; import org.apache.tsfile.file.metadata.IDeviceID; @@ -35,6 +38,12 @@ public class PathUtils { + private static final Cache SINGLE_MEASUREMENT_CHECK_CACHE = + Caffeine.newBuilder() + .maximumSize( + CommonDescriptor.getInstance().getConfig().getSingleMeasurementCheckCacheSize()) + .build(); + /** * @param path the path will split. ex, root.ln. * @return string array. ex, [root, ln] @@ -162,20 +171,29 @@ public static String checkAndReturnSingleMeasurement(String measurement) if (measurement == null) { return null; } + SingleMeasurementCheckResult result = + SINGLE_MEASUREMENT_CHECK_CACHE.get(measurement, PathUtils::checkSingleMeasurement); + if (result.isLegal()) { + return result.getMeasurement(); + } + throw new IllegalPathException(measurement); + } + + private static SingleMeasurementCheckResult checkSingleMeasurement(String measurement) { if (measurement.startsWith(TsFileConstant.BACK_QUOTE_STRING) && measurement.endsWith(TsFileConstant.BACK_QUOTE_STRING)) { if (checkBackQuotes(measurement.substring(1, measurement.length() - 1))) { - return removeBackQuotesIfNecessary(measurement); + return SingleMeasurementCheckResult.legal(removeBackQuotesIfNecessary(measurement)); } else { - throw new IllegalPathException(measurement); + return SingleMeasurementCheckResult.illegal(); } } if (IoTDBConstant.reservedWords.contains(measurement.toUpperCase()) || isRealNumber(measurement) || !TsFileConstant.NODE_NAME_PATTERN.matcher(measurement).matches()) { - throw new IllegalPathException(measurement); + return SingleMeasurementCheckResult.illegal(); } - return measurement; + return SingleMeasurementCheckResult.legal(measurement); } /** Return true if the str is a real number. Examples: 1.0; +1.0; -1.0; 0011; 011e3; +23e-3 */ @@ -225,4 +243,34 @@ public static String unQualifyDatabaseName(String databaseName) { public static boolean isTableModelDatabase(final String databaseName) { return !databaseName.startsWith("root."); } + + private static class SingleMeasurementCheckResult { + + private static final SingleMeasurementCheckResult ILLEGAL = + new SingleMeasurementCheckResult(false, null); + + private final boolean legal; + private final String measurement; + + private SingleMeasurementCheckResult(boolean legal, String measurement) { + this.legal = legal; + this.measurement = measurement; + } + + private static SingleMeasurementCheckResult legal(String measurement) { + return new SingleMeasurementCheckResult(true, measurement); + } + + private static SingleMeasurementCheckResult illegal() { + return ILLEGAL; + } + + private boolean isLegal() { + return legal; + } + + private String getMeasurement() { + return measurement; + } + } }