Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public void testTimePartition() throws Exception {
}
timestatmps.forEach(
t -> {
long timePartitionId = TimePartitionUtils.getTimePartitionId(t);
long timePartitionId = TimePartitionUtils.getTimePartitionId(t, "root.sg1");
assertTrue(timePartitions.contains(timePartitionId));
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ databaseAttributeClause
databaseAttributeKey
: TTL
| TIME_PARTITION_INTERVAL
| TIME_PARTITION_ORIGIN
| SCHEMA_REGION_GROUP_NUM
| DATA_REGION_GROUP_NUM
;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1215,6 +1215,10 @@ TIME_PARTITION_INTERVAL
: T I M E '_' P A R T I T I O N '_' I N T E R V A L
;

TIME_PARTITION_ORIGIN
: T I M E '_' P A R T I T I O N '_' O R I G I N
;

SCHEMA_REGION_GROUP_NUM
: S C H E M A '_' R E G I O N '_' G R O U P '_' N U M
;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.iotdb.confignode.consensus.request.write.database.SetSchemaReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionOriginPlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RegisterDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan;
Expand Down Expand Up @@ -232,6 +233,9 @@ public static ConfigPhysicalPlan create(final ByteBuffer buffer) throws IOExcept
case SetTimePartitionInterval:
plan = new SetTimePartitionIntervalPlan();
break;
case SetTimePartitionOrigin:
plan = new SetTimePartitionOriginPlan();
break;
case AdjustMaxRegionGroupNum:
plan = new AdjustMaxRegionGroupNumPlan();
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public enum ConfigPhysicalPlanType {
SetSchemaReplicationFactor((short) 202),
SetDataReplicationFactor((short) 203),
SetTimePartitionInterval((short) 204),
SetTimePartitionOrigin((short) 212),
AdjustMaxRegionGroupNum((short) 205),
DeleteDatabase((short) 206),
PreDeleteDatabase((short) 207),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.confignode.consensus.request.write.database;

import org.apache.iotdb.commons.utils.BasicStructureSerDeUtil;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;

public class SetTimePartitionOriginPlan extends ConfigPhysicalPlan {

private String storageGroup;

private long timePartitionOrigin;

public SetTimePartitionOriginPlan() {
super(ConfigPhysicalPlanType.SetTimePartitionOrigin);
}

public SetTimePartitionOriginPlan(String storageGroup, long timePartitionOrigin) {
this();
this.storageGroup = storageGroup;
this.timePartitionOrigin = timePartitionOrigin;
}

public String getDatabase() {
return storageGroup;
}

public long getTimePartitionOrigin() {
return timePartitionOrigin;
}

@Override
protected void serializeImpl(DataOutputStream stream) throws IOException {
stream.writeShort(getType().getPlanType());

BasicStructureSerDeUtil.write(storageGroup, stream);
stream.writeLong(timePartitionOrigin);
}

@Override
protected void deserializeImpl(ByteBuffer buffer) throws IOException {
storageGroup = BasicStructureSerDeUtil.readString(buffer);
timePartitionOrigin = buffer.getLong();
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SetTimePartitionOriginPlan that = (SetTimePartitionOriginPlan) o;
return timePartitionOrigin == that.timePartitionOrigin
&& storageGroup.equals(that.storageGroup);
}

@Override
public int hashCode() {
return Objects.hash(storageGroup, timePartitionOrigin);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import org.apache.iotdb.confignode.consensus.request.write.database.SetSchemaReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionOriginPlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.response.ainode.AINodeRegisterResp;
Expand Down Expand Up @@ -720,6 +721,16 @@ public TSStatus setTimePartitionInterval(
}
}

@Override
public TSStatus setTimePartitionOrigin(SetTimePartitionOriginPlan setTimePartitionOriginPlan) {
TSStatus status = confirmLeader();
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return clusterSchemaManager.setTimePartitionOrigin(setTimePartitionOriginPlan);
} else {
return status;
}
}

@Override
public DataSet countMatchedDatabases(CountDatabasePlan countDatabasePlan) {
TSStatus status = confirmLeader();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.iotdb.confignode.consensus.request.write.database.SetSchemaReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionOriginPlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
import org.apache.iotdb.confignode.manager.cq.CQManager;
Expand Down Expand Up @@ -386,6 +387,8 @@ public interface IManager {

TSStatus setTimePartitionInterval(SetTimePartitionIntervalPlan configPhysicalPlan);

TSStatus setTimePartitionOrigin(SetTimePartitionOriginPlan configPhysicalPlan);

/**
* Count Databases.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.iotdb.confignode.consensus.request.write.database.SetDataReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetSchemaReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionOriginPlan;
import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeEnrichedPlan;
import org.apache.iotdb.confignode.consensus.request.write.table.SetTableColumnCommentPlan;
import org.apache.iotdb.confignode.consensus.request.write.table.SetTableCommentPlan;
Expand Down Expand Up @@ -470,6 +471,17 @@ public TSStatus setTimePartitionInterval(
}
}

public TSStatus setTimePartitionOrigin(SetTimePartitionOriginPlan setTimePartitionOriginPlan) {
try {
return getConsensusManager().write(setTimePartitionOriginPlan);
} catch (ConsensusException e) {
LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
TSStatus result = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
result.setMessage(e.getMessage());
return result;
}
}

/**
* Only leader use this interface. Adjust the maxSchemaRegionGroupNum and maxDataRegionGroupNum of
* each Database based on existing cluster resources
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.apache.iotdb.confignode.consensus.request.write.database.SetSchemaReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionOriginPlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RegisterDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan;
Expand Down Expand Up @@ -435,6 +436,8 @@ public TSStatus executeNonQueryPlan(ConfigPhysicalPlan physicalPlan)
case SetTimePartitionInterval:
return clusterSchemaInfo.setTimePartitionInterval(
(SetTimePartitionIntervalPlan) physicalPlan);
case SetTimePartitionOrigin:
return clusterSchemaInfo.setTimePartitionOrigin((SetTimePartitionOriginPlan) physicalPlan);
case CreateRegionGroups:
return partitionInfo.createRegionGroups((CreateRegionGroupsPlan) physicalPlan);
case OfferRegionMaintainTasks:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ public Map<TSeriesPartitionSlot, TConsensusGroupId> getLastDataAllotTable() {
*/
public void autoCleanPartitionTable(long TTL, TTimePartitionSlot currentTimeSlot) {
long[] removedTimePartitionSlots =
dataPartitionTable.autoCleanPartitionTable(TTL, currentTimeSlot).stream()
dataPartitionTable.autoCleanPartitionTable(TTL, currentTimeSlot, databaseName).stream()
.map(TTimePartitionSlot::getStartTime)
.collect(Collectors.toList())
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
import org.apache.iotdb.confignode.consensus.request.read.database.CountDatabasePlan;
import org.apache.iotdb.confignode.consensus.request.read.database.GetDatabasePlan;
import org.apache.iotdb.confignode.consensus.request.read.table.DescTablePlan;
Expand All @@ -53,6 +54,7 @@
import org.apache.iotdb.confignode.consensus.request.write.database.SetDataReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetSchemaReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionOriginPlan;
import org.apache.iotdb.confignode.consensus.request.write.table.AddTableColumnPlan;
import org.apache.iotdb.confignode.consensus.request.write.table.AlterColumnDataTypePlan;
import org.apache.iotdb.confignode.consensus.request.write.table.CommitCreateTablePlan;
Expand Down Expand Up @@ -196,6 +198,10 @@ public TSStatus createDatabase(final DatabaseSchemaPlan plan) {
final TDatabaseSchema databaseSchema = plan.getSchema();
final PartialPath partialPathName = getQualifiedDatabasePartialPath(databaseSchema.getName());

// Update TimePartitionUtils cache with database-specific time partition settings
TimePartitionUtils.updateDatabaseTimePartitionConfig(
databaseSchema.getName(), databaseSchema);

final ConfigMTree mTree = databaseSchema.isIsTableModel() ? tableModelMTree : treeModelMTree;
mTree.setStorageGroup(partialPathName);

Expand Down Expand Up @@ -305,6 +311,9 @@ public TSStatus deleteDatabase(final DeleteDatabasePlan plan) {
(isTableModel ? tableModelMTree : treeModelMTree)
.deleteDatabase(getQualifiedDatabasePartialPath(plan.getName()));

// Remove database-specific time partition configuration from cache
TimePartitionUtils.removeDatabaseTimePartitionConfig(plan.getName());

result.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} catch (final MetadataException e) {
LOGGER.warn("Database not exist", e);
Expand Down Expand Up @@ -482,6 +491,32 @@ public TSStatus setTimePartitionInterval(final SetTimePartitionIntervalPlan plan
return result;
}

public TSStatus setTimePartitionOrigin(final SetTimePartitionOriginPlan plan) {
final TSStatus result = new TSStatus();
databaseReadWriteLock.writeLock().lock();
try {
final ConfigMTree mTree =
PathUtils.isTableModelDatabase(plan.getDatabase()) ? tableModelMTree : treeModelMTree;
final PartialPath path = getQualifiedDatabasePartialPath(plan.getDatabase());
if (mTree.isDatabaseAlreadySet(path)) {
mTree
.getDatabaseNodeByDatabasePath(path)
.getAsMNode()
.getDatabaseSchema()
.setTimePartitionOrigin(plan.getTimePartitionOrigin());
result.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} else {
result.setCode(TSStatusCode.DATABASE_NOT_EXIST.getStatusCode());
}
} catch (final MetadataException e) {
LOGGER.error(ERROR_NAME, e);
result.setCode(TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()).setMessage(ERROR_NAME);
} finally {
databaseReadWriteLock.writeLock().unlock();
}
return result;
}

/**
* Adjust the maximum RegionGroup count of each Database.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ private Flow analyzeMissingPartitions(final ConfigNodeProcedureEnv env) {
}

if (localEarliestSlotStartTime
> TimePartitionUtils.getStartTimeByPartitionId(earliestTimeslot)) {
> TimePartitionUtils.getStartTimeByPartitionId(earliestTimeslot, database)) {
databasesWithLostDataPartition.add(database);
LOG.warn(
"[DataPartitionIntegrity] Database {} has lost timeslot {} in its data table partition, and this issue needs to be repaired",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.apache.iotdb.confignode.consensus.request.write.database.SetSchemaReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionOriginPlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
import org.apache.iotdb.confignode.consensus.response.ainode.AINodeConfigurationResp;
import org.apache.iotdb.confignode.consensus.response.ainode.AINodeRegisterResp;
Expand Down Expand Up @@ -194,6 +195,7 @@
import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaReplicationFactorReq;
import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaTemplateReq;
import org.apache.iotdb.confignode.rpc.thrift.TSetTimePartitionIntervalReq;
import org.apache.iotdb.confignode.rpc.thrift.TSetTimePartitionOriginReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowAINodesResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowCQResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
Expand Down Expand Up @@ -449,15 +451,13 @@ public TSStatus alterDatabase(final TDatabaseSchema databaseSchema) {
if (databaseSchema.isSetTimePartitionOrigin()) {
errorResp =
new TSStatus(TSStatusCode.DATABASE_CONFIG_ERROR.getStatusCode())
.setMessage(
"Failed to alter database. Doesn't support ALTER TimePartitionOrigin yet.");
.setMessage("Failed to alter database. Doesn't support ALTER TimePartitionOrigin.");
}

if (databaseSchema.isSetTimePartitionInterval()) {
errorResp =
new TSStatus(TSStatusCode.DATABASE_CONFIG_ERROR.getStatusCode())
.setMessage(
"Failed to alter database. Doesn't support ALTER TimePartitionInterval yet.");
.setMessage("Failed to alter database. Doesn't support ALTER TimePartitionInterval.");
}

if (errorResp != null) {
Expand Down Expand Up @@ -513,6 +513,12 @@ public TSStatus setTimePartitionInterval(final TSetTimePartitionIntervalReq req)
new SetTimePartitionIntervalPlan(req.getDatabase(), req.getTimePartitionInterval()));
}

@Override
public TSStatus setTimePartitionOrigin(final TSetTimePartitionOriginReq req) throws TException {
return configManager.setTimePartitionOrigin(
new SetTimePartitionOriginPlan(req.getDatabase(), req.getTimePartitionOrigin()));
}

@Override
public TCountDatabaseResp countMatchedDatabases(final TGetDatabaseReq req) {
final PathPatternTree scope =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,8 @@ public void AutoCleanPartitionTablePlan() throws IOException {
databaseTTLMap.put("root.db2", 3600L * 1000 * 24); // 1d_TTL
databaseTTLMap.put("root.db3", 3600L * 1000 * 24 * 30); // 1m_TTL
TTimePartitionSlot currentTimeSlot =
new TTimePartitionSlot(TimePartitionUtils.getTimePartitionSlot(System.currentTimeMillis()));
new TTimePartitionSlot(
TimePartitionUtils.getTimePartitionSlot(System.currentTimeMillis(), ""));
AutoCleanPartitionTablePlan req0 =
new AutoCleanPartitionTablePlan(databaseTTLMap, currentTimeSlot);
AutoCleanPartitionTablePlan req1 =
Expand Down
Loading