From e79ac90cbb454f0a81664bb035e0bb759143157d Mon Sep 17 00:00:00 2001 From: "wenchao.wu" Date: Wed, 1 Apr 2026 16:43:01 +0800 Subject: [PATCH] [core] chain table support special partition expire. --- docs/content/primary-key-table/chain-table.md | 64 ++ .../org/apache/paimon/AbstractFileStore.java | 50 +- .../ChainTableCommitPreCallback.java | 98 ++- .../operation/ChainTablePartitionExpire.java | 441 ++++++++++++ .../operation/NormalPartitionExpire.java | 228 ++++++ .../paimon/operation/PartitionExpire.java | 222 +----- .../paimon/schema/SchemaValidation.java | 8 + .../ChainTablePartitionExpireTest.java | 655 ++++++++++++++++++ .../paimon/operation/PartitionExpireTest.java | 18 +- .../flink/action/ExpirePartitionsAction.java | 6 +- .../procedure/ExpirePartitionsProcedure.java | 2 - 11 files changed, 1558 insertions(+), 234 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/operation/ChainTablePartitionExpire.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/operation/NormalPartitionExpire.java create mode 100644 paimon-core/src/test/java/org/apache/paimon/operation/ChainTablePartitionExpireTest.java diff --git a/docs/content/primary-key-table/chain-table.md b/docs/content/primary-key-table/chain-table.md index 839eb12fc954..aa3afd9f69b4 100644 --- a/docs/content/primary-key-table/chain-table.md +++ b/docs/content/primary-key-table/chain-table.md @@ -206,3 +206,67 @@ partition keys: This treats `(dt, hour)` as the composite chain dimension and everything before it (e.g., `region`) as the group dimension. + +## Partition Expiration + +Chain tables support automatic partition expiration via the standard `partition.expiration-time` option. +However, the expiration algorithm differs from normal tables to preserve chain integrity. + +### How It Works + +In a normal table, every partition older than the cutoff (`now - partition.expiration-time`) is dropped +independently. Chain tables cannot do this because a delta partition depends on its nearest earlier +snapshot partition as an anchor for merge-on-read. Dropping the anchor would break the chain. + +Chain table expiration works in **segments**. A segment consists of one snapshot partition and all the +delta partitions whose time falls between that snapshot and the next snapshot in sorted order. The +segment is the atomic unit of expiration: either the entire segment is expired, or nothing in it is. + +The algorithm per group: +1. List all snapshot branch partitions sorted by chain partition time. +2. Filter to those before the cutoff (`now - partition.expiration-time`). +3. If fewer than 2 snapshots are before the cutoff, nothing can be expired — the only one must be kept + as the anchor. +4. The most recent snapshot before the cutoff is the **anchor** (kept). All earlier snapshots and their + associated delta partitions form expirable segments. +5. Delta partitions are dropped before snapshot partitions so that the commit pre-check always passes. + +For tables with group partitions, each group is processed independently. A group with many expired +snapshots can have segments expired while another group with only one snapshot before the cutoff retains +all of its data. + +### Example + +```sql +ALTER TABLE default.t SET TBLPROPERTIES ( + 'partition.expiration-time' = '30 d', + 'partition.expiration-check-interval' = '1 d' +); +ALTER TABLE `default`.`t$branch_snapshot` SET TBLPROPERTIES ( + 'partition.expiration-time' = '30 d', + 'partition.expiration-check-interval' = '1 d' +); +ALTER TABLE `default`.`t$branch_delta` SET TBLPROPERTIES ( + 'partition.expiration-time' = '30 d', + 'partition.expiration-check-interval' = '1 d' +); +``` + +Suppose the snapshot branch has partitions `S(0101)`, `S(0201)`, `S(0301)` and the delta branch has +`D(0110)`, `D(0210)`, `D(0315)`. On `2025-03-31` with a 30-day retention the cutoff is `2025-03-01`: + +- Snapshots before cutoff: `S(0101)`, `S(0201)`. Anchor = `S(0201)` (kept). +- Segment 1 expired: `S(0101)` + `D(0110)` (delta between `S(0101)` and `S(0201)`). +- Remaining: `S(0201)`, `S(0301)`, `D(0210)`, `D(0315)`. + +### Important Notes + +- **Delta-only groups are not expired.** If a group has delta partitions but no snapshot partition, its + deltas are the only copy of that group's data. Partition expiration will not touch them. They will + start to be expired once at least two snapshot partitions exist for the group and fall before the + cutoff. +- **Conflict detection is anchor-aware.** When `partition.expiration-strategy` is `values-time`, the + conflict detection during writes correctly recognizes that anchor partitions are retained and does not + reject writes to them. +- The `partition.expiration-time` and `partition.expiration-check-interval` options should be set + consistently across the main table and both branches. diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java index 9aad0259cdf8..2f123cb250c7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java @@ -38,10 +38,12 @@ import org.apache.paimon.metastore.ChainTableOverwriteCommitCallback; import org.apache.paimon.metastore.TagPreviewCommitCallback; import org.apache.paimon.metastore.VisibilityWaitCallback; +import org.apache.paimon.operation.ChainTablePartitionExpire; import org.apache.paimon.operation.ChangelogDeletion; import org.apache.paimon.operation.FileStoreCommitImpl; import org.apache.paimon.operation.Lock; import org.apache.paimon.operation.ManifestsReader; +import org.apache.paimon.operation.NormalPartitionExpire; import org.apache.paimon.operation.PartitionExpire; import org.apache.paimon.operation.SnapshotDeletion; import org.apache.paimon.operation.TagDeletion; @@ -65,6 +67,7 @@ import org.apache.paimon.tag.TagAutoManager; import org.apache.paimon.tag.TagPreview; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.ChainTableUtils; import org.apache.paimon.utils.ChangelogManager; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.IndexFilePathFactories; @@ -440,6 +443,10 @@ public PartitionExpire newPartitionExpire(String commitUser, FileStoreTable tabl return null; } + if (options.isChainTable()) { + return newChainTablePartitionExpire(table); + } + return newPartitionExpire( commitUser, table, @@ -459,12 +466,16 @@ public PartitionExpire newPartitionExpire( Duration expirationTime, Duration checkInterval, PartitionExpireStrategy expireStrategy) { + if (options.isChainTable()) { + return newChainTablePartitionExpire(table, expirationTime, checkInterval); + } + PartitionModification partitionModification = null; if (options.partitionedTableInMetastore()) { partitionModification = catalogEnvironment.partitionModification(); } - return new PartitionExpire( + return new NormalPartitionExpire( expirationTime, checkInterval, expireStrategy, @@ -476,6 +487,43 @@ public PartitionExpire newPartitionExpire( options.partitionExpireBatchSize()); } + @Nullable + private ChainTablePartitionExpire newChainTablePartitionExpire(FileStoreTable table) { + Duration partitionExpireTime = options.partitionExpireTime(); + if (partitionExpireTime == null) { + return null; + } + return newChainTablePartitionExpire( + table, partitionExpireTime, options.partitionExpireCheckInterval()); + } + + @Nullable + private ChainTablePartitionExpire newChainTablePartitionExpire( + FileStoreTable table, Duration expirationTime, Duration checkInterval) { + if (partitionType().getFieldCount() == 0) { + return null; + } + FileStoreTable primaryTable = ChainTableUtils.resolveChainPrimaryTable(table); + FileStoreTable snapshotTable = + primaryTable.switchToBranch(options.scanFallbackSnapshotBranch()); + FileStoreTable deltaTable = primaryTable.switchToBranch(options.scanFallbackDeltaBranch()); + PartitionModification partitionModification = null; + if (options.partitionedTableInMetastore()) { + partitionModification = catalogEnvironment.partitionModification(); + } + return new ChainTablePartitionExpire( + expirationTime, + checkInterval, + snapshotTable, + deltaTable, + options, + partitionType(), + options.endInputCheckPartitionExpire(), + options.partitionExpireMaxNum(), + options.partitionExpireBatchSize(), + partitionModification); + } + @Override public TagAutoManager newTagAutoManager(FileStoreTable table) { return TagAutoManager.create( diff --git a/paimon-core/src/main/java/org/apache/paimon/metastore/ChainTableCommitPreCallback.java b/paimon-core/src/main/java/org/apache/paimon/metastore/ChainTableCommitPreCallback.java index 0b26cb637540..03399a4fde77 100644 --- a/paimon-core/src/main/java/org/apache/paimon/metastore/ChainTableCommitPreCallback.java +++ b/paimon-core/src/main/java/org/apache/paimon/metastore/ChainTableCommitPreCallback.java @@ -36,6 +36,7 @@ import org.apache.paimon.table.sink.CommitPreCallback; import org.apache.paimon.table.source.snapshot.SnapshotReader; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.ChainPartitionProjector; import org.apache.paimon.utils.ChainTableUtils; import org.apache.paimon.utils.InternalRowPartitionComputer; import org.apache.paimon.utils.RowDataToObjectArrayConverter; @@ -43,6 +44,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; @@ -111,33 +113,59 @@ public void call( partitionType, table.schema().partitionKeys().toArray(new String[0]), coreOptions.legacyPartitionName()); - RecordComparator partitionComparator = - CodeGenUtils.newRecordComparator(partitionType.getFieldTypes()); + + List chainKeys = + ChainTableUtils.chainPartitionKeys(coreOptions, table.schema().partitionKeys()); + int chainFieldCount = chainKeys.size(); + ChainPartitionProjector projector = + new ChainPartitionProjector(partitionType, chainFieldCount); + int groupFieldCount = projector.groupFieldCount(); + RecordComparator chainComparator = + CodeGenUtils.newRecordComparator(projector.chainPartitionType().getFieldTypes()); + List snapshotPartitions = table.newSnapshotReader().partitionEntries().stream() .map(PartitionEntry::partition) - .sorted(partitionComparator) .collect(Collectors.toList()); SnapshotReader deltaSnapshotReader = deltaTable.newSnapshotReader(); PredicateBuilder builder = new PredicateBuilder(partitionType); for (BinaryRow partition : changedPartitions) { + BinaryRow partitionGroup = projector.extractGroupPartition(partition); + BinaryRow partitionChain = projector.extractChainPartition(partition); + + List sameGroupSnapshots = + filterSameGroup(snapshotPartitions, partitionGroup, projector); + sameGroupSnapshots.sort( + (a, b) -> + chainComparator.compare( + projector.extractChainPartition(a), + projector.extractChainPartition(b))); + Optional preSnapshotPartition = - findPreSnapshotPartition(snapshotPartitions, partition, partitionComparator); + findPreSnapshotInGroup( + sameGroupSnapshots, partitionChain, chainComparator, projector); Optional nextSnapshotPartition = - findNextSnapshotPartition(snapshotPartitions, partition, partitionComparator); + findNextSnapshotInGroup( + sameGroupSnapshots, partitionChain, chainComparator, projector); + Predicate deltaFollowingPredicate = - ChainTableUtils.createTriangularPredicate( - partition, partitionConverter, builder::equal, builder::greaterThan); + ChainTableUtils.createGroupChainPredicate( + partition, + partitionConverter, + groupFieldCount, + builder::equal, + builder::greaterThan); List deltaFollowingPartitions = deltaSnapshotReader.withPartitionFilter(deltaFollowingPredicate) .partitionEntries().stream() .map(PartitionEntry::partition) .filter( deltaPartition -> - isBeforeNextSnapshotPartition( + isBeforeNextSnapshotInGroup( deltaPartition, nextSnapshotPartition, - partitionComparator)) + chainComparator, + projector)) .collect(Collectors.toList()); boolean canDrop = deltaFollowingPartitions.isEmpty() || preSnapshotPartition.isPresent(); @@ -159,13 +187,26 @@ private boolean isPureDeleteCommit( && indexFiles.stream().allMatch(f -> f.kind() == FileKind.DELETE); } - private Optional findPreSnapshotPartition( - List snapshotPartitions, - BinaryRow partition, - RecordComparator partitionComparator) { + private List filterSameGroup( + List partitions, BinaryRow groupKey, ChainPartitionProjector projector) { + List result = new ArrayList<>(); + for (BinaryRow partition : partitions) { + if (projector.extractGroupPartition(partition).equals(groupKey)) { + result.add(partition); + } + } + return result; + } + + private Optional findPreSnapshotInGroup( + List sortedSameGroupPartitions, + BinaryRow targetChain, + RecordComparator chainComparator, + ChainPartitionProjector projector) { BinaryRow pre = null; - for (BinaryRow snapshotPartition : snapshotPartitions) { - if (partitionComparator.compare(snapshotPartition, partition) < 0) { + for (BinaryRow snapshotPartition : sortedSameGroupPartitions) { + BinaryRow chain = projector.extractChainPartition(snapshotPartition); + if (chainComparator.compare(chain, targetChain) < 0) { pre = snapshotPartition; } else { break; @@ -174,24 +215,31 @@ private Optional findPreSnapshotPartition( return Optional.ofNullable(pre); } - private Optional findNextSnapshotPartition( - List snapshotPartitions, - BinaryRow partition, - RecordComparator partitionComparator) { - for (BinaryRow snapshotPartition : snapshotPartitions) { - if (partitionComparator.compare(snapshotPartition, partition) > 0) { + private Optional findNextSnapshotInGroup( + List sortedSameGroupPartitions, + BinaryRow targetChain, + RecordComparator chainComparator, + ChainPartitionProjector projector) { + for (BinaryRow snapshotPartition : sortedSameGroupPartitions) { + BinaryRow chain = projector.extractChainPartition(snapshotPartition); + if (chainComparator.compare(chain, targetChain) > 0) { return Optional.of(snapshotPartition); } } return Optional.empty(); } - private boolean isBeforeNextSnapshotPartition( + private boolean isBeforeNextSnapshotInGroup( BinaryRow partition, Optional nextSnapshotPartition, - RecordComparator partitionComparator) { - return !nextSnapshotPartition.isPresent() - || partitionComparator.compare(partition, nextSnapshotPartition.get()) < 0; + RecordComparator chainComparator, + ChainPartitionProjector projector) { + if (!nextSnapshotPartition.isPresent()) { + return true; + } + BinaryRow partitionChain = projector.extractChainPartition(partition); + BinaryRow nextChain = projector.extractChainPartition(nextSnapshotPartition.get()); + return chainComparator.compare(partitionChain, nextChain) < 0; } private String generatePartitionValues( diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ChainTablePartitionExpire.java b/paimon-core/src/main/java/org/apache/paimon/operation/ChainTablePartitionExpire.java new file mode 100644 index 000000000000..d18eeee54346 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/operation/ChainTablePartitionExpire.java @@ -0,0 +1,441 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.codegen.CodeGenUtils; +import org.apache.paimon.codegen.RecordComparator; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.manifest.PartitionEntry; +import org.apache.paimon.partition.PartitionTimeExtractor; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.PartitionModification; +import org.apache.paimon.table.sink.BatchTableCommit; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.ChainPartitionProjector; +import org.apache.paimon.utils.ChainTableUtils; +import org.apache.paimon.utils.InternalRowPartitionComputer; + +import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; + +/** + * Partition expiration for chain tables. + * + *

Chain tables store data across snapshot and delta branches. A delta partition depends on its + * nearest earlier snapshot partition as an anchor for merge-on-read. This class expires partitions + * in "segments" defined by consecutive snapshot partitions to maintain chain integrity. + * + *

A segment consists of one snapshot partition and all delta partitions whose time falls between + * that snapshot and the next snapshot in sorted order. The segment is the atomic unit of + * expiration: either the entire segment (snapshot + deltas) is expired, or nothing in it is. + * + *

Algorithm per group: + * + *

    + *
  1. List all snapshot branch partitions sorted by chain partition time. + *
  2. Filter to those before the cutoff ({@code now - expirationTime}). + *
  3. If fewer than 2 snapshots are before the cutoff, nothing can be expired (the last one must + * be kept as anchor). + *
  4. The most recent snapshot before the cutoff is the anchor (kept). All earlier snapshots form + * expirable segments together with their associated delta partitions. + *
  5. The number of segments expired is limited by {@code maxExpireNum}. + *
  6. Delta partitions are dropped first, then snapshot partitions, so that {@code + * ChainTableCommitPreCallback} validation passes. + *
+ */ +public class ChainTablePartitionExpire implements PartitionExpire { + + private static final Logger LOG = LoggerFactory.getLogger(ChainTablePartitionExpire.class); + + private final Duration expirationTime; + private final Duration checkInterval; + private final FileStoreTable snapshotTable; + private final FileStoreTable deltaTable; + private final PartitionTimeExtractor timeExtractor; + private final ChainPartitionProjector projector; + private final RecordComparator chainPartitionComparator; + private final InternalRowPartitionComputer partitionComputer; + private final List partitionKeys; + private final List chainPartitionKeys; + private final boolean endInputCheckPartitionExpire; + private final int maxExpireNum; + private final int expireBatchSize; + @Nullable private final PartitionModification partitionModification; + private LocalDateTime lastCheck; + + public ChainTablePartitionExpire( + Duration expirationTime, + Duration checkInterval, + FileStoreTable snapshotTable, + FileStoreTable deltaTable, + CoreOptions options, + RowType partitionType, + boolean endInputCheckPartitionExpire, + int maxExpireNum, + int expireBatchSize, + @Nullable PartitionModification partitionModification) { + this.expirationTime = expirationTime; + this.checkInterval = checkInterval; + this.snapshotTable = snapshotTable; + this.deltaTable = deltaTable; + this.partitionKeys = partitionType.getFieldNames(); + this.maxExpireNum = maxExpireNum; + this.expireBatchSize = expireBatchSize; + this.partitionModification = partitionModification; + + List allPartitionKeys = partitionType.getFieldNames(); + this.chainPartitionKeys = ChainTableUtils.chainPartitionKeys(options, allPartitionKeys); + int chainFieldCount = chainPartitionKeys.size(); + this.projector = new ChainPartitionProjector(partitionType, chainFieldCount); + this.chainPartitionComparator = + CodeGenUtils.newRecordComparator(projector.chainPartitionType().getFieldTypes()); + this.timeExtractor = + new PartitionTimeExtractor( + options.partitionTimestampPattern(), options.partitionTimestampFormatter()); + this.partitionComputer = + new InternalRowPartitionComputer( + options.partitionDefaultName(), + partitionType, + allPartitionKeys.toArray(new String[0]), + options.legacyPartitionName()); + this.endInputCheckPartitionExpire = endInputCheckPartitionExpire; + + long rndSeconds = 0; + long checkIntervalSeconds = checkInterval.toMillis() / 1000; + if (checkIntervalSeconds > 0) { + rndSeconds = ThreadLocalRandom.current().nextLong(checkIntervalSeconds); + } + this.lastCheck = LocalDateTime.now().minusSeconds(rndSeconds); + } + + @Override + public List> expire(long commitIdentifier) { + return expire(LocalDateTime.now(), commitIdentifier); + } + + @Override + public boolean isValueExpiration() { + return true; + } + + @Override + public boolean isValueAllExpired(Collection partitions) { + return isValueAllExpired(partitions, LocalDateTime.now()); + } + + @VisibleForTesting + boolean isValueAllExpired(Collection partitions, LocalDateTime now) { + LocalDateTime expireDateTime = now.minus(expirationTime); + for (BinaryRow partition : partitions) { + LocalDateTime partTime = extractPartitionTime(partition); + if (partTime == null || !expireDateTime.isAfter(partTime)) { + return false; + } + } + + // All partitions are time-wise before cutoff, but chain table retains anchors + // (the most recent snapshot before cutoff per group) and their segment's deltas. + // Compute per-group retain boundary: partitions at or after the boundary are retained. + Map retainBoundary = computeGroupRetainBoundary(expireDateTime); + for (BinaryRow partition : partitions) { + BinaryRow groupKey = projector.extractGroupPartition(partition); + LocalDateTime boundary = retainBoundary.get(groupKey); + if (boundary == null) { + continue; + } + LocalDateTime partTime = extractPartitionTime(partition); + if (partTime != null && !partTime.isBefore(boundary)) { + return false; + } + } + return true; + } + + /** + * For each group that has snapshot partitions, compute the time boundary at or above which + * partitions are retained (not expired). Returns {@link LocalDateTime#MIN} for groups where + * fewer than 2 snapshots fall before the cutoff (nothing can be expired). Groups with no + * snapshot partitions at all (delta-only) are not included in the result — their orphan deltas + * before the cutoff are unconditionally expired. + */ + private Map computeGroupRetainBoundary(LocalDateTime cutoffTime) { + List snapshotEntries = snapshotTable.newSnapshotReader().partitionEntries(); + Map> groupedSnapshots = groupByGroupKey(snapshotEntries); + + Map boundaries = new HashMap<>(); + for (Map.Entry> entry : groupedSnapshots.entrySet()) { + BinaryRow groupKey = entry.getKey(); + int countBeforeCutoff = 0; + LocalDateTime latestBeforeCutoff = null; + for (BinaryRow snapshot : entry.getValue()) { + LocalDateTime time = extractPartitionTime(snapshot); + if (time != null && cutoffTime.isAfter(time)) { + countBeforeCutoff++; + if (latestBeforeCutoff == null || time.isAfter(latestBeforeCutoff)) { + latestBeforeCutoff = time; + } + } + } + if (countBeforeCutoff < 2) { + boundaries.put(groupKey, LocalDateTime.MIN); + } else { + boundaries.put(groupKey, latestBeforeCutoff); + } + } + return boundaries; + } + + @VisibleForTesting + void setLastCheck(LocalDateTime time) { + lastCheck = time; + } + + @VisibleForTesting + List> expire(LocalDateTime now, long commitIdentifier) { + if (checkInterval.isZero() + || now.isAfter(lastCheck.plus(checkInterval)) + || (endInputCheckPartitionExpire && Long.MAX_VALUE == commitIdentifier)) { + List> expired = doExpire(now.minus(expirationTime)); + lastCheck = now; + return expired; + } + return null; + } + + private List> doExpire(LocalDateTime cutoffTime) { + List snapshotPartitions = + snapshotTable.newSnapshotReader().partitionEntries(); + List deltaPartitions = deltaTable.newSnapshotReader().partitionEntries(); + + Map> groupedSnapshots = groupByGroupKey(snapshotPartitions); + Map> groupedDeltas = groupByGroupKey(deltaPartitions); + + List snapshotPartitionsToExpire = new ArrayList<>(); + List deltaPartitionsToExpire = new ArrayList<>(); + + for (Map.Entry> entry : groupedSnapshots.entrySet()) { + BinaryRow groupKey = entry.getKey(); + List groupSnapshots = entry.getValue(); + + groupSnapshots.sort( + (a, b) -> + chainPartitionComparator.compare( + projector.extractChainPartition(a), + projector.extractChainPartition(b))); + + List snapshotsBeforeCutoff = new ArrayList<>(); + for (BinaryRow partition : groupSnapshots) { + LocalDateTime partTime = extractPartitionTime(partition); + if (partTime != null && cutoffTime.isAfter(partTime)) { + snapshotsBeforeCutoff.add(partition); + } + } + + if (snapshotsBeforeCutoff.size() < 2) { + continue; + } + + // Anchor = most recent snapshot before cutoff, kept as merge base + int anchorIndex = snapshotsBeforeCutoff.size() - 1; + + // Expirable snapshots: all before anchor, oldest first. + // Each forms a segment with its associated deltas. + int segmentsToExpire = Math.min(anchorIndex, maxExpireNum); + + List groupDeltas = groupedDeltas.get(groupKey); + + for (int i = 0; i < segmentsToExpire; i++) { + BinaryRow segmentSnapshot = snapshotsBeforeCutoff.get(i); + snapshotPartitionsToExpire.add(segmentSnapshot); + + if (groupDeltas != null) { + // Segment boundary: from this snapshot's time up to the next snapshot's time + LocalDateTime segmentStart = extractPartitionTime(segmentSnapshot); + BinaryRow nextSnapshot = snapshotsBeforeCutoff.get(i + 1); + LocalDateTime segmentEnd = extractPartitionTime(nextSnapshot); + + if (segmentStart != null && segmentEnd != null) { + for (BinaryRow deltaPartition : groupDeltas) { + LocalDateTime deltaTime = extractPartitionTime(deltaPartition); + if (deltaTime != null + && !deltaTime.isBefore(segmentStart) + && deltaTime.isBefore(segmentEnd)) { + deltaPartitionsToExpire.add(deltaPartition); + } + } + } + } + } + + // Also collect orphan deltas before the earliest expired snapshot + if (segmentsToExpire > 0 && groupDeltas != null) { + LocalDateTime firstSnapshotTime = + extractPartitionTime(snapshotsBeforeCutoff.get(0)); + if (firstSnapshotTime != null) { + for (BinaryRow deltaPartition : groupDeltas) { + LocalDateTime deltaTime = extractPartitionTime(deltaPartition); + if (deltaTime != null && deltaTime.isBefore(firstSnapshotTime)) { + deltaPartitionsToExpire.add(deltaPartition); + } + } + } + } + } + + if (snapshotPartitionsToExpire.isEmpty() && deltaPartitionsToExpire.isEmpty()) { + return new ArrayList<>(); + } + + List> deltaSpecs = toPartitionSpecs(deltaPartitionsToExpire); + List> snapshotSpecs = toPartitionSpecs(snapshotPartitionsToExpire); + List> allExpired = new ArrayList<>(); + + if (!deltaSpecs.isEmpty()) { + LOG.info("Chain table expire delta partitions: {}", deltaSpecs); + batchDropPartitions(deltaTable, deltaSpecs); + allExpired.addAll(deltaSpecs); + } + + if (!snapshotSpecs.isEmpty()) { + LOG.info("Chain table expire snapshot partitions: {}", snapshotSpecs); + batchDropPartitions(snapshotTable, snapshotSpecs); + allExpired.addAll(snapshotSpecs); + } + + return allExpired; + } + + private void batchDropPartitions( + FileStoreTable table, List> partitionSpecs) { + if (partitionModification != null) { + try { + if (expireBatchSize > 0 && expireBatchSize < partitionSpecs.size()) { + for (List> batch : + Lists.partition(partitionSpecs, expireBatchSize)) { + partitionModification.dropPartitions(batch); + partitionModification.dropPartitions(toDonePartitions(batch)); + } + } else { + partitionModification.dropPartitions(partitionSpecs); + partitionModification.dropPartitions(toDonePartitions(partitionSpecs)); + } + } catch (Catalog.TableNotExistException e) { + throw new RuntimeException(e); + } + } else { + if (expireBatchSize > 0 && expireBatchSize < partitionSpecs.size()) { + for (List> batch : + Lists.partition(partitionSpecs, expireBatchSize)) { + dropPartitions(table, batch); + } + } else { + dropPartitions(table, partitionSpecs); + } + } + } + + private List> toDonePartitions( + List> expiredPartitions) { + List> donePartitions = new ArrayList<>(expiredPartitions.size()); + for (Map partition : expiredPartitions) { + LinkedHashMap donePartition = new LinkedHashMap<>(partition); + Map.Entry lastEntry = null; + for (Map.Entry entry : donePartition.entrySet()) { + lastEntry = entry; + } + if (lastEntry != null) { + donePartition.put(lastEntry.getKey(), lastEntry.getValue() + ".done"); + donePartitions.add(donePartition); + } + } + return donePartitions; + } + + private Map> groupByGroupKey(List partitionEntries) { + Map> grouped = new LinkedHashMap<>(); + for (PartitionEntry entry : partitionEntries) { + BinaryRow fullPartition = entry.partition(); + BinaryRow groupKey = projector.extractGroupPartition(fullPartition); + grouped.computeIfAbsent(groupKey, k -> new ArrayList<>()).add(fullPartition); + } + return grouped; + } + + private LocalDateTime extractPartitionTime(BinaryRow partition) { + try { + LinkedHashMap partValues = + partitionComputer.generatePartValues(partition); + List chainValues = new ArrayList<>(); + for (String key : chainPartitionKeys) { + chainValues.add(partValues.get(key)); + } + return timeExtractor.extract(chainPartitionKeys, chainValues); + } catch (Exception e) { + LOG.warn("Failed to extract partition time from {}", partition, e); + return null; + } + } + + private List> toPartitionSpecs(List partitions) { + return partitions.stream() + .map( + p -> { + LinkedHashMap values = + partitionComputer.generatePartValues(p); + Map spec = new LinkedHashMap<>(); + for (String key : partitionKeys) { + String value = values.get(key); + if (value != null) { + spec.put(key, value); + } + } + return spec; + }) + .collect(Collectors.toList()); + } + + private void dropPartitions(FileStoreTable table, List> partitionSpecs) { + try (BatchTableCommit commit = table.newBatchWriteBuilder().newCommit()) { + commit.truncatePartitions(partitionSpecs); + } catch (Exception e) { + throw new RuntimeException( + String.format( + "Failed to drop partitions from %s: %s.", table.name(), partitionSpecs), + e); + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/NormalPartitionExpire.java b/paimon-core/src/main/java/org/apache/paimon/operation/NormalPartitionExpire.java new file mode 100644 index 000000000000..5a23f538a9f3 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/operation/NormalPartitionExpire.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation; + +import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.manifest.PartitionEntry; +import org.apache.paimon.partition.PartitionExpireStrategy; +import org.apache.paimon.partition.PartitionValuesTimeExpireStrategy; +import org.apache.paimon.table.PartitionModification; + +import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; + +/** Expire partitions. */ +public class NormalPartitionExpire implements PartitionExpire { + + private static final Logger LOG = LoggerFactory.getLogger(NormalPartitionExpire.class); + + private static final String DELIMITER = ","; + + private final Duration expirationTime; + private final Duration checkInterval; + private final FileStoreScan scan; + private final FileStoreCommit commit; + @Nullable private final PartitionModification partitionModification; + private LocalDateTime lastCheck; + private final PartitionExpireStrategy strategy; + private final boolean endInputCheckPartitionExpire; + private final int maxExpireNum; + private final int expireBatchSize; + + public NormalPartitionExpire( + Duration expirationTime, + Duration checkInterval, + PartitionExpireStrategy strategy, + FileStoreScan scan, + FileStoreCommit commit, + @Nullable PartitionModification partitionModification, + boolean endInputCheckPartitionExpire, + int maxExpireNum, + int expireBatchSize) { + this.expirationTime = expirationTime; + this.checkInterval = checkInterval; + this.strategy = strategy; + this.scan = scan; + this.commit = commit; + this.partitionModification = partitionModification; + // Avoid the execution time of stream jobs from being too short and preventing partition + // expiration + long rndSeconds = 0; + long checkIntervalSeconds = checkInterval.toMillis() / 1000; + if (checkIntervalSeconds > 0) { + rndSeconds = ThreadLocalRandom.current().nextLong(checkIntervalSeconds); + } + this.lastCheck = LocalDateTime.now().minusSeconds(rndSeconds); + this.endInputCheckPartitionExpire = endInputCheckPartitionExpire; + this.maxExpireNum = maxExpireNum; + this.expireBatchSize = expireBatchSize; + } + + public NormalPartitionExpire( + Duration expirationTime, + Duration checkInterval, + PartitionExpireStrategy strategy, + FileStoreScan scan, + FileStoreCommit commit, + @Nullable PartitionModification partitionModification, + int maxExpireNum, + int expireBatchSize) { + this( + expirationTime, + checkInterval, + strategy, + scan, + commit, + partitionModification, + false, + maxExpireNum, + expireBatchSize); + } + + @Override + public List> expire(long commitIdentifier) { + return expire(LocalDateTime.now(), commitIdentifier); + } + + @Override + public boolean isValueExpiration() { + return strategy instanceof PartitionValuesTimeExpireStrategy; + } + + @Override + public boolean isValueAllExpired(Collection partitions) { + PartitionValuesTimeExpireStrategy valuesStrategy = + (PartitionValuesTimeExpireStrategy) strategy; + LocalDateTime expireDateTime = LocalDateTime.now().minus(expirationTime); + for (BinaryRow partition : partitions) { + if (!valuesStrategy.isExpired(expireDateTime, partition)) { + return false; + } + } + return true; + } + + @VisibleForTesting + void setLastCheck(LocalDateTime time) { + lastCheck = time; + } + + @VisibleForTesting + List> expire(LocalDateTime now, long commitIdentifier) { + if (checkInterval.isZero() + || now.isAfter(lastCheck.plus(checkInterval)) + || (endInputCheckPartitionExpire && Long.MAX_VALUE == commitIdentifier)) { + List> expired = + doExpire(now.minus(expirationTime), commitIdentifier); + lastCheck = now; + return expired; + } + return null; + } + + private List> doExpire( + LocalDateTime expireDateTime, long commitIdentifier) { + List partitionEntries = + strategy.selectExpiredPartitions(scan, expireDateTime); + List> expiredPartValues = new ArrayList<>(partitionEntries.size()); + for (PartitionEntry partition : partitionEntries) { + Object[] array = strategy.convertPartition(partition.partition()); + expiredPartValues.add(strategy.toPartitionValue(array)); + } + + List> expired = new ArrayList<>(); + if (!expiredPartValues.isEmpty()) { + // convert partition value to partition string, and limit the partition num + expired = convertToPartitionString(expiredPartValues); + LOG.info("Expire Partitions: {}", expired); + if (expireBatchSize > 0 && expireBatchSize < expired.size()) { + Lists.partition(expired, expireBatchSize) + .forEach( + expiredBatchPartitions -> + doBatchExpire(expiredBatchPartitions, commitIdentifier)); + } else { + doBatchExpire(expired, commitIdentifier); + } + } + return expired; + } + + private void doBatchExpire( + List> expiredBatchPartitions, long commitIdentifier) { + if (partitionModification != null) { + try { + partitionModification.dropPartitions(expiredBatchPartitions); + // also drop corresponding .done partitions + partitionModification.dropPartitions(toDonePartitions(expiredBatchPartitions)); + } catch (Catalog.TableNotExistException e) { + throw new RuntimeException(e); + } + } else { + // .done partitions only exist when partitionModification != null + // (metastore.partitioned-table = true), so no need to handle them here + commit.dropPartitions(expiredBatchPartitions, commitIdentifier); + } + } + + private List> toDonePartitions( + List> expiredPartitions) { + List> donePartitions = new ArrayList<>(expiredPartitions.size()); + for (Map partition : expiredPartitions) { + LinkedHashMap donePartition = new LinkedHashMap<>(partition); + // append .done suffix to the last partition field value + Map.Entry lastEntry = null; + for (Map.Entry entry : donePartition.entrySet()) { + lastEntry = entry; + } + if (lastEntry != null) { + donePartition.put(lastEntry.getKey(), lastEntry.getValue() + ".done"); + donePartitions.add(donePartition); + } + } + return donePartitions; + } + + private List> convertToPartitionString( + List> expiredPartValues) { + return expiredPartValues.stream() + .map(values -> String.join(DELIMITER, values)) + .sorted() + // Use split(DELIMITER, -1) to preserve trailing empty strings + .map(s -> s.split(DELIMITER, -1)) + .map(strategy::toPartitionString) + .limit(Math.min(expiredPartValues.size(), maxExpireNum)) + .collect(Collectors.toList()); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java index 74b7850ed73d..bb1487735b27 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java @@ -18,208 +18,38 @@ package org.apache.paimon.operation; -import org.apache.paimon.annotation.VisibleForTesting; -import org.apache.paimon.catalog.Catalog; import org.apache.paimon.data.BinaryRow; -import org.apache.paimon.manifest.PartitionEntry; -import org.apache.paimon.partition.PartitionExpireStrategy; -import org.apache.paimon.partition.PartitionValuesTimeExpireStrategy; -import org.apache.paimon.table.PartitionModification; - -import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import javax.annotation.Nullable; -import java.time.Duration; -import java.time.LocalDateTime; -import java.util.ArrayList; import java.util.Collection; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.ThreadLocalRandom; -import java.util.stream.Collectors; - -/** Expire partitions. */ -public class PartitionExpire { - - private static final Logger LOG = LoggerFactory.getLogger(PartitionExpire.class); - - private static final String DELIMITER = ","; - - private final Duration expirationTime; - private final Duration checkInterval; - private final FileStoreScan scan; - private final FileStoreCommit commit; - @Nullable private final PartitionModification partitionModification; - private LocalDateTime lastCheck; - private final PartitionExpireStrategy strategy; - private final boolean endInputCheckPartitionExpire; - private final int maxExpireNum; - private final int expireBatchSize; - - public PartitionExpire( - Duration expirationTime, - Duration checkInterval, - PartitionExpireStrategy strategy, - FileStoreScan scan, - FileStoreCommit commit, - @Nullable PartitionModification partitionModification, - boolean endInputCheckPartitionExpire, - int maxExpireNum, - int expireBatchSize) { - this.expirationTime = expirationTime; - this.checkInterval = checkInterval; - this.strategy = strategy; - this.scan = scan; - this.commit = commit; - this.partitionModification = partitionModification; - // Avoid the execution time of stream jobs from being too short and preventing partition - // expiration - long rndSeconds = 0; - long checkIntervalSeconds = checkInterval.toMillis() / 1000; - if (checkIntervalSeconds > 0) { - rndSeconds = ThreadLocalRandom.current().nextLong(checkIntervalSeconds); - } - this.lastCheck = LocalDateTime.now().minusSeconds(rndSeconds); - this.endInputCheckPartitionExpire = endInputCheckPartitionExpire; - this.maxExpireNum = maxExpireNum; - this.expireBatchSize = expireBatchSize; - } - - public PartitionExpire( - Duration expirationTime, - Duration checkInterval, - PartitionExpireStrategy strategy, - FileStoreScan scan, - FileStoreCommit commit, - @Nullable PartitionModification partitionModification, - int maxExpireNum, - int expireBatchSize) { - this( - expirationTime, - checkInterval, - strategy, - scan, - commit, - partitionModification, - false, - maxExpireNum, - expireBatchSize); - } - - public List> expire(long commitIdentifier) { - return expire(LocalDateTime.now(), commitIdentifier); - } - - public boolean isValueExpiration() { - return strategy instanceof PartitionValuesTimeExpireStrategy; - } - public boolean isValueAllExpired(Collection partitions) { - PartitionValuesTimeExpireStrategy valuesStrategy = - (PartitionValuesTimeExpireStrategy) strategy; - LocalDateTime expireDateTime = LocalDateTime.now().minus(expirationTime); - for (BinaryRow partition : partitions) { - if (!valuesStrategy.isExpired(expireDateTime, partition)) { - return false; - } - } - return true; - } - - @VisibleForTesting - void setLastCheck(LocalDateTime time) { - lastCheck = time; - } - - @VisibleForTesting - List> expire(LocalDateTime now, long commitIdentifier) { - if (checkInterval.isZero() - || now.isAfter(lastCheck.plus(checkInterval)) - || (endInputCheckPartitionExpire && Long.MAX_VALUE == commitIdentifier)) { - List> expired = - doExpire(now.minus(expirationTime), commitIdentifier); - lastCheck = now; - return expired; - } - return null; - } - - private List> doExpire( - LocalDateTime expireDateTime, long commitIdentifier) { - List partitionEntries = - strategy.selectExpiredPartitions(scan, expireDateTime); - List> expiredPartValues = new ArrayList<>(partitionEntries.size()); - for (PartitionEntry partition : partitionEntries) { - Object[] array = strategy.convertPartition(partition.partition()); - expiredPartValues.add(strategy.toPartitionValue(array)); - } - - List> expired = new ArrayList<>(); - if (!expiredPartValues.isEmpty()) { - // convert partition value to partition string, and limit the partition num - expired = convertToPartitionString(expiredPartValues); - LOG.info("Expire Partitions: {}", expired); - if (expireBatchSize > 0 && expireBatchSize < expired.size()) { - Lists.partition(expired, expireBatchSize) - .forEach( - expiredBatchPartitions -> - doBatchExpire(expiredBatchPartitions, commitIdentifier)); - } else { - doBatchExpire(expired, commitIdentifier); - } - } - return expired; - } - - private void doBatchExpire( - List> expiredBatchPartitions, long commitIdentifier) { - if (partitionModification != null) { - try { - partitionModification.dropPartitions(expiredBatchPartitions); - // also drop corresponding .done partitions - partitionModification.dropPartitions(toDonePartitions(expiredBatchPartitions)); - } catch (Catalog.TableNotExistException e) { - throw new RuntimeException(e); - } - } else { - // .done partitions only exist when partitionModification != null - // (metastore.partitioned-table = true), so no need to handle them here - commit.dropPartitions(expiredBatchPartitions, commitIdentifier); - } - } - - private List> toDonePartitions( - List> expiredPartitions) { - List> donePartitions = new ArrayList<>(expiredPartitions.size()); - for (Map partition : expiredPartitions) { - LinkedHashMap donePartition = new LinkedHashMap<>(partition); - // append .done suffix to the last partition field value - Map.Entry lastEntry = null; - for (Map.Entry entry : donePartition.entrySet()) { - lastEntry = entry; - } - if (lastEntry != null) { - donePartition.put(lastEntry.getKey(), lastEntry.getValue() + ".done"); - donePartitions.add(donePartition); - } - } - return donePartitions; - } - - private List> convertToPartitionString( - List> expiredPartValues) { - return expiredPartValues.stream() - .map(values -> String.join(DELIMITER, values)) - .sorted() - // Use split(DELIMITER, -1) to preserve trailing empty strings - .map(s -> s.split(DELIMITER, -1)) - .map(strategy::toPartitionString) - .limit(Math.min(expiredPartValues.size(), maxExpireNum)) - .collect(Collectors.toList()); - } +/** + * Common interface for partition expiration strategies. + * + *

Implementations include {@link NormalPartitionExpire} for normal tables and {@link + * ChainTablePartitionExpire} for chain tables that require segment-based expiration across snapshot + * and delta branches. + */ +public interface PartitionExpire { + + /** + * Expire partitions that are older than the configured expiration time. + * + * @return the list of expired partition specs, or null if the check interval has not elapsed + */ + @Nullable + List> expire(long commitIdentifier); + + /** Whether this expiration uses values-time strategy. */ + boolean isValueExpiration(); + + /** + * Check whether all given partitions are expired according to the values-time strategy. + * + *

Only valid when {@link #isValueExpiration()} returns true. + */ + boolean isValueAllExpired(Collection partitions); } diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index 271709c47ef5..8dbd35acbf56 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -805,6 +805,14 @@ public static void validateChainTable(TableSchema schema, CoreOptions options) { options.partitionTimestampFormatter() != null, "Partition timestamp formatter is required for chain table."); + if (options.partitionExpireTime() != null) { + Preconditions.checkArgument( + "values-time".equals(options.partitionExpireStrategy()), + "Chain table only supports 'values-time' partition expiration strategy, " + + "but found '%s'.", + options.partitionExpireStrategy()); + } + // validate chain-table.chain-partition-keys List chainPartKeys = options.chainTableChainPartitionKeys(); if (chainPartKeys != null) { diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/ChainTablePartitionExpireTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/ChainTablePartitionExpireTest.java new file mode 100644 index 000000000000..4ece950551fd --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/operation/ChainTablePartitionExpireTest.java @@ -0,0 +1,655 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.manifest.PartitionEntry; +import org.apache.paimon.options.Options; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.SchemaChange; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.table.CatalogEnvironment; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.FileStoreTableFactory; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.StreamTableWrite; +import org.apache.paimon.table.sink.TableCommitImpl; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.time.Duration; +import java.time.LocalDateTime; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link ChainTablePartitionExpire}. */ +public class ChainTablePartitionExpireTest { + + @TempDir java.nio.file.Path tempDir; + + private String commitUser; + + @BeforeEach + public void before() { + commitUser = UUID.randomUUID().toString(); + } + + @Test + public void testExpireWithSinglePartitionKey() throws Exception { + Path tablePath = tablePath("simple_expire"); + createChainTable(tablePath, false); + FileStoreTable mainTable = loadTable(tablePath); + FileStoreTable snapshotTable = mainTable.switchToBranch("snapshot"); + FileStoreTable deltaTable = mainTable.switchToBranch("delta"); + + write(snapshotTable, "20250101", "v1"); + write(snapshotTable, "20250201", "v2"); + write(snapshotTable, "20250301", "v3"); + + write(deltaTable, "20250110", "v4"); + write(deltaTable, "20250115", "v5"); + write(deltaTable, "20250210", "v6"); + write(deltaTable, "20250215", "v7"); + write(deltaTable, "20250315", "v8"); + + snapshotTable = loadTable(tablePath).switchToBranch("snapshot"); + deltaTable = loadTable(tablePath).switchToBranch("delta"); + assertThat(listPartitions(snapshotTable)) + .containsExactlyInAnyOrder("20250101", "20250201", "20250301"); + assertThat(listPartitions(deltaTable)) + .containsExactlyInAnyOrder( + "20250110", "20250115", "20250210", "20250215", "20250315"); + + // cutoff = 2025-03-31 - 20d = 2025-03-11 + // Snapshots before cutoff: 20250101, 20250201, 20250301 (3 snapshots) + // Anchor = 20250301 (kept), expire 2 segments: + // Segment0: S(20250101), d(20250110), d(20250115) + // Segment1: S(20250201), d(20250210), d(20250215) + ChainTablePartitionExpire expire = + newChainExpire(snapshotTable, deltaTable, Duration.ofDays(20), false); + expire.setLastCheck(LocalDateTime.of(2025, 1, 1, 0, 0)); + List> expired = + expire.expire(LocalDateTime.of(2025, 3, 31, 0, 0), Long.MAX_VALUE); + + assertThat(expired).isNotNull(); + assertThat(expired).isNotEmpty(); + + snapshotTable = loadTable(tablePath).switchToBranch("snapshot"); + deltaTable = loadTable(tablePath).switchToBranch("delta"); + assertThat(listPartitions(snapshotTable)).containsExactlyInAnyOrder("20250301"); + assertThat(listPartitions(deltaTable)).containsExactlyInAnyOrder("20250315"); + } + + @Test + public void testNoExpireWhenOnlyOneSnapshotBeforeCutoff() throws Exception { + Path tablePath = tablePath("no_expire_one_snapshot"); + createChainTable(tablePath, false); + FileStoreTable mainTable = loadTable(tablePath); + FileStoreTable snapshotTable = mainTable.switchToBranch("snapshot"); + FileStoreTable deltaTable = mainTable.switchToBranch("delta"); + + write(snapshotTable, "20250201", "v1"); + write(snapshotTable, "20250315", "v2"); + write(deltaTable, "20250205", "v3"); + + snapshotTable = loadTable(tablePath).switchToBranch("snapshot"); + deltaTable = loadTable(tablePath).switchToBranch("delta"); + + ChainTablePartitionExpire expire = + newChainExpire(snapshotTable, deltaTable, Duration.ofDays(30), false); + expire.setLastCheck(LocalDateTime.of(2025, 1, 1, 0, 0)); + List> expired = + expire.expire(LocalDateTime.of(2025, 3, 31, 0, 0), Long.MAX_VALUE); + + assertThat(expired).isEmpty(); + + snapshotTable = loadTable(tablePath).switchToBranch("snapshot"); + deltaTable = loadTable(tablePath).switchToBranch("delta"); + assertThat(listPartitions(snapshotTable)).containsExactlyInAnyOrder("20250201", "20250315"); + assertThat(listPartitions(deltaTable)).containsExactlyInAnyOrder("20250205"); + } + + @Test + public void testExpireMultipleSegments() throws Exception { + Path tablePath = tablePath("multi_segments"); + createChainTable(tablePath, false); + FileStoreTable mainTable = loadTable(tablePath); + FileStoreTable snapshotTable = mainTable.switchToBranch("snapshot"); + FileStoreTable deltaTable = mainTable.switchToBranch("delta"); + + write(snapshotTable, "20250101", "v1"); + write(snapshotTable, "20250115", "v2"); + write(snapshotTable, "20250201", "v3"); + write(snapshotTable, "20250315", "v4"); + + write(deltaTable, "20250105", "v5"); + write(deltaTable, "20250120", "v6"); + write(deltaTable, "20250210", "v7"); + + snapshotTable = loadTable(tablePath).switchToBranch("snapshot"); + deltaTable = loadTable(tablePath).switchToBranch("delta"); + + // cutoff = 2025-03-31 - 30d = 2025-03-01 + // Snapshots before cutoff: 20250101, 20250115, 20250201 (3 snapshots) + // Anchor = 20250201 (kept), expire S(20250101), S(20250115) + // Delta before anchor: d(20250105), d(20250120) + ChainTablePartitionExpire expire = + newChainExpire(snapshotTable, deltaTable, Duration.ofDays(30), false); + expire.setLastCheck(LocalDateTime.of(2025, 1, 1, 0, 0)); + expire.expire(LocalDateTime.of(2025, 3, 31, 0, 0), Long.MAX_VALUE); + + snapshotTable = loadTable(tablePath).switchToBranch("snapshot"); + deltaTable = loadTable(tablePath).switchToBranch("delta"); + assertThat(listPartitions(snapshotTable)).containsExactlyInAnyOrder("20250201", "20250315"); + assertThat(listPartitions(deltaTable)).containsExactlyInAnyOrder("20250210"); + } + + @Test + public void testNoExpireWhenNoSnapshotsBeforeCutoff() throws Exception { + Path tablePath = tablePath("no_expire_no_snapshot"); + createChainTable(tablePath, false); + FileStoreTable mainTable = loadTable(tablePath); + FileStoreTable snapshotTable = mainTable.switchToBranch("snapshot"); + FileStoreTable deltaTable = mainTable.switchToBranch("delta"); + + write(snapshotTable, "20250315", "v1"); + write(snapshotTable, "20250320", "v2"); + write(deltaTable, "20250316", "v3"); + + snapshotTable = loadTable(tablePath).switchToBranch("snapshot"); + deltaTable = loadTable(tablePath).switchToBranch("delta"); + + // cutoff = 2025-03-31 - 30d = 2025-03-01 + // No snapshots before cutoff + ChainTablePartitionExpire expire = + newChainExpire(snapshotTable, deltaTable, Duration.ofDays(30), false); + expire.setLastCheck(LocalDateTime.of(2025, 1, 1, 0, 0)); + List> expired = + expire.expire(LocalDateTime.of(2025, 3, 31, 0, 0), Long.MAX_VALUE); + + assertThat(expired).isEmpty(); + } + + @Test + public void testCheckIntervalPreventsExpire() throws Exception { + Path tablePath = tablePath("check_interval"); + createChainTable(tablePath, false); + FileStoreTable mainTable = loadTable(tablePath); + FileStoreTable snapshotTable = mainTable.switchToBranch("snapshot"); + FileStoreTable deltaTable = mainTable.switchToBranch("delta"); + + write(snapshotTable, "20250101", "v1"); + write(snapshotTable, "20250201", "v2"); + + snapshotTable = loadTable(tablePath).switchToBranch("snapshot"); + deltaTable = loadTable(tablePath).switchToBranch("delta"); + + ChainTablePartitionExpire expire = + new ChainTablePartitionExpire( + Duration.ofDays(30), + Duration.ofDays(1), + snapshotTable, + deltaTable, + CoreOptions.fromMap(buildOptions(Duration.ofDays(30), false)), + snapshotTable.schema().logicalPartitionType(), + false, + Integer.MAX_VALUE, + 0, + null); + expire.setLastCheck(LocalDateTime.of(2025, 3, 31, 0, 0)); + + List> expired = + expire.expire(LocalDateTime.of(2025, 3, 31, 0, 0), Long.MAX_VALUE); + + assertThat(expired).isNull(); + } + + @Test + public void testMaxExpireNumLimitsSegments() throws Exception { + Path tablePath = tablePath("max_expire_num"); + createChainTable(tablePath, false); + FileStoreTable mainTable = loadTable(tablePath); + FileStoreTable snapshotTable = mainTable.switchToBranch("snapshot"); + FileStoreTable deltaTable = mainTable.switchToBranch("delta"); + + // Snapshots: S(0101), S(0115), S(0201), S(0315) + // cutoff = 03-01, anchor = S(0201) + // Segments to expire: Segment1={S(0101), d(0105)}, Segment2={S(0115), d(0120)} + write(snapshotTable, "20250101", "v1"); + write(snapshotTable, "20250115", "v2"); + write(snapshotTable, "20250201", "v3"); + write(snapshotTable, "20250315", "v4"); + + write(deltaTable, "20250105", "v5"); + write(deltaTable, "20250120", "v6"); + write(deltaTable, "20250210", "v7"); + + snapshotTable = loadTable(tablePath).switchToBranch("snapshot"); + deltaTable = loadTable(tablePath).switchToBranch("delta"); + + // maxExpireNum=1 means only 1 segment: Segment1={S(0101), d(0105)} + ChainTablePartitionExpire expire = + new ChainTablePartitionExpire( + Duration.ofDays(30), + Duration.ZERO, + snapshotTable, + deltaTable, + CoreOptions.fromMap(buildOptions(Duration.ofDays(30), false)), + snapshotTable.schema().logicalPartitionType(), + false, + 1, + 0, + null); + expire.setLastCheck(LocalDateTime.of(2025, 1, 1, 0, 0)); + List> expired = + expire.expire(LocalDateTime.of(2025, 3, 31, 0, 0), Long.MAX_VALUE); + + assertThat(expired).isNotNull(); + // 1 segment = S(0101) + d(0105) = 2 partitions + assertThat(expired).hasSize(2); + + // Verify: S(0101) expired, S(0115) still exists (not expired, was in segment 2) + snapshotTable = loadTable(tablePath).switchToBranch("snapshot"); + deltaTable = loadTable(tablePath).switchToBranch("delta"); + assertThat(listPartitions(snapshotTable)) + .containsExactlyInAnyOrder("20250115", "20250201", "20250315"); + // d(0105) expired, d(0120) and d(0210) kept + assertThat(listPartitions(deltaTable)).containsExactlyInAnyOrder("20250120", "20250210"); + } + + @Test + public void testExpireWithGroupPartition() throws Exception { + Path tablePath = tablePath("group_partition"); + createChainTable(tablePath, true); + FileStoreTable mainTable = loadTable(tablePath); + FileStoreTable snapshotTable = mainTable.switchToBranch("snapshot"); + FileStoreTable deltaTable = mainTable.switchToBranch("delta"); + + // Group "US": snapshots 0101, 0201, 0301 + writeGrouped(snapshotTable, "US", "20250101", "v1"); + writeGrouped(snapshotTable, "US", "20250201", "v2"); + writeGrouped(snapshotTable, "US", "20250301", "v3"); + // Group "US": deltas 0110, 0210 + writeGrouped(deltaTable, "US", "20250110", "d1"); + writeGrouped(deltaTable, "US", "20250210", "d2"); + + // Group "EU": only one snapshot before cutoff, so nothing should expire + writeGrouped(snapshotTable, "EU", "20250215", "v4"); + writeGrouped(snapshotTable, "EU", "20250320", "v5"); + writeGrouped(deltaTable, "EU", "20250220", "d3"); + + snapshotTable = loadTable(tablePath).switchToBranch("snapshot"); + deltaTable = loadTable(tablePath).switchToBranch("delta"); + + // cutoff = 2025-03-31 - 30d = 2025-03-01 + // Group "US": snapshots before cutoff = [0101, 0201]. Anchor = 0201 (kept). + // Expire: S(0101), delta(0110) (before anchor 0201) + // Keep: S(0201), S(0301), delta(0210) + // Group "EU": snapshots before cutoff = [0215] (only 1). Nothing expired. + ChainTablePartitionExpire expire = + newChainExpire(snapshotTable, deltaTable, Duration.ofDays(30), true); + expire.setLastCheck(LocalDateTime.of(2025, 1, 1, 0, 0)); + List> expired = + expire.expire(LocalDateTime.of(2025, 3, 31, 0, 0), Long.MAX_VALUE); + + assertThat(expired).isNotNull(); + assertThat(expired).hasSize(2); + + snapshotTable = loadTable(tablePath).switchToBranch("snapshot"); + deltaTable = loadTable(tablePath).switchToBranch("delta"); + + List snapshotParts = listGroupedPartitions(snapshotTable); + List deltaParts = listGroupedPartitions(deltaTable); + + assertThat(snapshotParts).contains("US|20250201", "US|20250301"); + assertThat(snapshotParts).doesNotContain("US|20250101"); + assertThat(snapshotParts).contains("EU|20250215", "EU|20250320"); + + assertThat(deltaParts).contains("US|20250210"); + assertThat(deltaParts).doesNotContain("US|20250110"); + assertThat(deltaParts).contains("EU|20250220"); + } + + @Test + public void testIsValueAllExpiredReturnsFalseForAnchor() throws Exception { + Path tablePath = tablePath("value_expired_anchor"); + createChainTable(tablePath, false); + FileStoreTable mainTable = loadTable(tablePath); + FileStoreTable snapshotTable = mainTable.switchToBranch("snapshot"); + FileStoreTable deltaTable = mainTable.switchToBranch("delta"); + + // S(0101), S(0201), S(0301) + write(snapshotTable, "20250101", "v1"); + write(snapshotTable, "20250201", "v2"); + write(snapshotTable, "20250301", "v3"); + + snapshotTable = loadTable(tablePath).switchToBranch("snapshot"); + deltaTable = loadTable(tablePath).switchToBranch("delta"); + + // expirationTime = 30d, "now" = 2025-03-31 → cutoff = 2025-03-01 + // Snapshots before cutoff: S(0101), S(0201). Anchor = S(0201) (kept). + ChainTablePartitionExpire expire = + newChainExpire(snapshotTable, deltaTable, Duration.ofDays(30), false); + LocalDateTime now = LocalDateTime.of(2025, 3, 31, 0, 0); + + BinaryRow anchor0201 = findPartition(snapshotTable, "20250201"); + BinaryRow expired0101 = findPartition(snapshotTable, "20250101"); + + // Anchor partition alone → not all expired (anchor is retained) + assertThat(expire.isValueAllExpired(Collections.singletonList(anchor0201), now)).isFalse(); + + // Truly expired partition alone → all expired + assertThat(expire.isValueAllExpired(Collections.singletonList(expired0101), now)).isTrue(); + + // Mix of anchor + expired → not all expired + assertThat(expire.isValueAllExpired(Arrays.asList(expired0101, anchor0201), now)).isFalse(); + } + + @Test + public void testIsValueAllExpiredReturnsFalseWhenTooFewSnapshots() throws Exception { + Path tablePath = tablePath("value_expired_few_snapshots"); + createChainTable(tablePath, false); + FileStoreTable mainTable = loadTable(tablePath); + FileStoreTable snapshotTable = mainTable.switchToBranch("snapshot"); + FileStoreTable deltaTable = mainTable.switchToBranch("delta"); + + // Only 1 snapshot before cutoff + write(snapshotTable, "20250201", "v1"); + write(snapshotTable, "20250315", "v2"); + + snapshotTable = loadTable(tablePath).switchToBranch("snapshot"); + deltaTable = loadTable(tablePath).switchToBranch("delta"); + + // cutoff = 2025-03-31 - 30d = 2025-03-01 + // Only S(0201) before cutoff → < 2, nothing can expire + ChainTablePartitionExpire expire = + newChainExpire(snapshotTable, deltaTable, Duration.ofDays(30), false); + LocalDateTime now = LocalDateTime.of(2025, 3, 31, 0, 0); + + BinaryRow partition0201 = findPartition(snapshotTable, "20250201"); + assertThat(expire.isValueAllExpired(Collections.singletonList(partition0201), now)) + .isFalse(); + } + + @Test + public void testIsValueAllExpiredWithGroupPartitions() throws Exception { + Path tablePath = tablePath("value_expired_groups"); + createChainTable(tablePath, true); + FileStoreTable mainTable = loadTable(tablePath); + FileStoreTable snapshotTable = mainTable.switchToBranch("snapshot"); + FileStoreTable deltaTable = mainTable.switchToBranch("delta"); + + // Group "US": 3 snapshots, anchor = S(US,0201) + writeGrouped(snapshotTable, "US", "20250101", "v1"); + writeGrouped(snapshotTable, "US", "20250201", "v2"); + writeGrouped(snapshotTable, "US", "20250301", "v3"); + + // Group "EU": only 1 snapshot before cutoff → nothing expires + writeGrouped(snapshotTable, "EU", "20250215", "v4"); + writeGrouped(snapshotTable, "EU", "20250320", "v5"); + + snapshotTable = loadTable(tablePath).switchToBranch("snapshot"); + deltaTable = loadTable(tablePath).switchToBranch("delta"); + + // cutoff = 2025-03-31 - 30d = 2025-03-01 + ChainTablePartitionExpire expire = + newChainExpire(snapshotTable, deltaTable, Duration.ofDays(30), true); + LocalDateTime now = LocalDateTime.of(2025, 3, 31, 0, 0); + + BinaryRow usExpired = findGroupedPartition(snapshotTable, "US", "20250101"); + BinaryRow usAnchor = findGroupedPartition(snapshotTable, "US", "20250201"); + BinaryRow euRetained = findGroupedPartition(snapshotTable, "EU", "20250215"); + + // US expired partition → truly expired + assertThat(expire.isValueAllExpired(Collections.singletonList(usExpired), now)).isTrue(); + + // US anchor → retained + assertThat(expire.isValueAllExpired(Collections.singletonList(usAnchor), now)).isFalse(); + + // EU partition (< 2 snapshots before cutoff) → retained + assertThat(expire.isValueAllExpired(Collections.singletonList(euRetained), now)).isFalse(); + + // Mix across groups: US expired + EU retained + assertThat(expire.isValueAllExpired(Arrays.asList(usExpired, euRetained), now)).isFalse(); + } + + @Test + public void testIsValueAllExpiredReturnsFalseForPartitionsAfterCutoff() throws Exception { + Path tablePath = tablePath("value_expired_after_cutoff"); + createChainTable(tablePath, false); + FileStoreTable mainTable = loadTable(tablePath); + FileStoreTable snapshotTable = mainTable.switchToBranch("snapshot"); + FileStoreTable deltaTable = mainTable.switchToBranch("delta"); + + write(snapshotTable, "20250101", "v1"); + write(snapshotTable, "20250315", "v2"); + + snapshotTable = loadTable(tablePath).switchToBranch("snapshot"); + deltaTable = loadTable(tablePath).switchToBranch("delta"); + + // cutoff = 2025-03-31 - 30d = 2025-03-01 + // S(0315) is after cutoff → not expired at all + ChainTablePartitionExpire expire = + newChainExpire(snapshotTable, deltaTable, Duration.ofDays(30), false); + LocalDateTime now = LocalDateTime.of(2025, 3, 31, 0, 0); + + BinaryRow afterCutoff = findPartition(snapshotTable, "20250315"); + assertThat(expire.isValueAllExpired(Collections.singletonList(afterCutoff), now)).isFalse(); + } + + // ========== Helper methods ========== + + private BinaryRow findPartition(FileStoreTable table, String dtValue) { + return table.newSnapshotReader().partitionEntries().stream() + .map(PartitionEntry::partition) + .filter(p -> p.getString(0).toString().equals(dtValue)) + .findFirst() + .orElseThrow(() -> new RuntimeException("Partition " + dtValue + " not found")); + } + + private BinaryRow findGroupedPartition(FileStoreTable table, String region, String dt) { + return table.newSnapshotReader().partitionEntries().stream() + .map(PartitionEntry::partition) + .filter( + p -> + p.getString(0).toString().equals(region) + && p.getString(1).toString().equals(dt)) + .findFirst() + .orElseThrow( + () -> + new RuntimeException( + "Partition " + region + "|" + dt + " not found")); + } + + private Path tablePath(String tableName) { + return new Path(tempDir.toUri().toString(), tableName); + } + + private void createChainTable(Path tablePath, boolean withGroupPartition) throws Exception { + LocalFileIO fileIO = LocalFileIO.create(); + SchemaManager schemaManager = new SchemaManager(fileIO, tablePath); + + Map options = new HashMap<>(); + options.put(CoreOptions.BUCKET.key(), "1"); + options.put("merge-engine", "deduplicate"); + options.put("sequence.field", "v"); + + Schema schema; + if (withGroupPartition) { + schema = + new Schema( + RowType.of( + new org.apache.paimon.types.DataType[] { + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING() + }, + new String[] {"region", "dt", "pk", "v"}) + .getFields(), + Arrays.asList("region", "dt"), + Arrays.asList("pk", "region", "dt"), + options, + ""); + } else { + schema = + new Schema( + RowType.of( + new org.apache.paimon.types.DataType[] { + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING() + }, + new String[] {"dt", "pk", "v"}) + .getFields(), + Collections.singletonList("dt"), + Arrays.asList("pk", "dt"), + options, + ""); + } + schemaManager.createTable(schema); + + FileStoreTable mainTable = loadTable(tablePath); + mainTable.createBranch("snapshot"); + mainTable.createBranch("delta"); + + List chainTableOptions = + Arrays.asList( + SchemaChange.setOption("chain-table.enabled", "true"), + SchemaChange.setOption("scan.fallback-snapshot-branch", "snapshot"), + SchemaChange.setOption("scan.fallback-delta-branch", "delta"), + SchemaChange.setOption("partition.timestamp-pattern", "$dt"), + SchemaChange.setOption("partition.timestamp-formatter", "yyyyMMdd")); + if (withGroupPartition) { + chainTableOptions = new java.util.ArrayList<>(chainTableOptions); + chainTableOptions.add(SchemaChange.setOption("chain-table.chain-partition-keys", "dt")); + } + schemaManager.commitChanges(chainTableOptions); + new SchemaManager(fileIO, tablePath, "snapshot").commitChanges(chainTableOptions); + new SchemaManager(fileIO, tablePath, "delta").commitChanges(chainTableOptions); + } + + private FileStoreTable loadTable(Path tablePath) { + LocalFileIO fileIO = LocalFileIO.create(); + Options options = new Options(); + options.set(CoreOptions.PATH, tablePath.toString()); + String branchName = CoreOptions.branch(options.toMap()); + TableSchema tableSchema = new SchemaManager(fileIO, tablePath, branchName).latest().get(); + return FileStoreTableFactory.create( + fileIO, tablePath, tableSchema, CatalogEnvironment.empty()); + } + + private void write(FileStoreTable table, String dt, String v) throws Exception { + StreamTableWrite write = + table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "true")) + .newWrite(commitUser); + write.write( + GenericRow.of( + BinaryString.fromString(dt), + BinaryString.fromString(v), + BinaryString.fromString(v))); + TableCommitImpl commit = table.newCommit(commitUser); + List commitMessages = write.prepareCommit(true, 0); + commit.commit(0, commitMessages); + write.close(); + commit.close(); + } + + private void writeGrouped(FileStoreTable table, String region, String dt, String v) + throws Exception { + StreamTableWrite write = + table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "true")) + .newWrite(commitUser); + write.write( + GenericRow.of( + BinaryString.fromString(region), + BinaryString.fromString(dt), + BinaryString.fromString(v), + BinaryString.fromString(v))); + TableCommitImpl commit = table.newCommit(commitUser); + List commitMessages = write.prepareCommit(true, 0); + commit.commit(0, commitMessages); + write.close(); + commit.close(); + } + + private List listPartitions(FileStoreTable table) { + return table.newSnapshotReader().partitionEntries().stream() + .map(PartitionEntry::partition) + .map(p -> p.getString(0).toString()) + .sorted() + .collect(Collectors.toList()); + } + + private List listGroupedPartitions(FileStoreTable table) { + return table.newSnapshotReader().partitionEntries().stream() + .map(PartitionEntry::partition) + .map(p -> p.getString(0).toString() + "|" + p.getString(1).toString()) + .sorted() + .collect(Collectors.toList()); + } + + private Map buildOptions(Duration expirationTime, boolean withGroupPartition) { + Map opts = new HashMap<>(); + opts.put("partition.timestamp-pattern", "$dt"); + opts.put("partition.timestamp-formatter", "yyyyMMdd"); + opts.put("scan.fallback-snapshot-branch", "snapshot"); + opts.put("scan.fallback-delta-branch", "delta"); + opts.put(CoreOptions.PARTITION_EXPIRATION_TIME.key(), expirationTime.toDays() + " d"); + if (withGroupPartition) { + opts.put("chain-table.chain-partition-keys", "dt"); + } + return opts; + } + + private ChainTablePartitionExpire newChainExpire( + FileStoreTable snapshotTable, + FileStoreTable deltaTable, + Duration expirationTime, + boolean withGroupPartition) { + return new ChainTablePartitionExpire( + expirationTime, + Duration.ZERO, + snapshotTable, + deltaTable, + CoreOptions.fromMap(buildOptions(expirationTime, withGroupPartition)), + snapshotTable.schema().logicalPartitionType(), + false, + Integer.MAX_VALUE, + 0, + null); + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java index fd406dfad373..9887d3c28cb9 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java @@ -85,7 +85,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -/** Test for {@link PartitionExpire}. */ +/** Test for {@link NormalPartitionExpire}. */ public class PartitionExpireTest { @TempDir java.nio.file.Path tempDir; @@ -189,7 +189,7 @@ public void testIllegalPartition() throws Exception { write("20230103", "31"); write("20230103", "32"); write("20230105", "51"); - PartitionExpire expire = newExpire(); + NormalPartitionExpire expire = newExpire(); expire.setLastCheck(date(1)); Assertions.assertDoesNotThrow(() -> expire.expire(date(8), Long.MAX_VALUE)); assertThat(read()).containsExactlyInAnyOrder("abcd:12"); @@ -215,7 +215,7 @@ public void testBatchExpire() throws Exception { write("20230103", "31"); write("20230103", "32"); write("20230105", "51"); - PartitionExpire expire = newExpire(); + NormalPartitionExpire expire = newExpire(); expire.setLastCheck(date(1)); Assertions.assertDoesNotThrow(() -> expire.expire(date(8), Long.MAX_VALUE)); @@ -246,7 +246,7 @@ public void testExpireWithNullOrEmptyPartition() throws Exception { write("20230103", "32"); write("20230105", "51"); - PartitionExpire expire = newExpire(); + NormalPartitionExpire expire = newExpire(); expire.setLastCheck(date(1)); Assertions.assertDoesNotThrow(() -> expire.expire(date(6), Long.MAX_VALUE)); @@ -272,7 +272,7 @@ public void test() throws Exception { write("20230103", "32"); write("20230105", "51"); - PartitionExpire expire = newExpire(); + NormalPartitionExpire expire = newExpire(); expire.setLastCheck(date(1)); expire.expire(date(3), Long.MAX_VALUE); @@ -319,7 +319,7 @@ public void testDonePartitionExpire() throws Exception { doneAction.markDone("f0=20230103"); doneAction.markDone("f0=20230108"); - PartitionExpire expire = newExpire(); + NormalPartitionExpire expire = newExpire(); expire.setLastCheck(date(1)); expire.expire(date(8), Long.MAX_VALUE); @@ -423,7 +423,7 @@ public void testDeleteExpiredPartition() throws Exception { List commitMessages = write("20230101", "11"); write("20230105", "51"); - PartitionExpire expire = newExpire(); + NormalPartitionExpire expire = newExpire(); expire.setLastCheck(date(1)); expire.expire(date(5), Long.MAX_VALUE); assertThat(read()).containsExactlyInAnyOrder("20230105:51"); @@ -471,9 +471,9 @@ private List write(String f0, String f1) throws Exception { return commitMessages; } - private PartitionExpire newExpire() { + private NormalPartitionExpire newExpire() { FileStoreTable table = newExpireTable(); - return table.store().newPartitionExpire("", table); + return (NormalPartitionExpire) table.store().newPartitionExpire("", table); } private FileStoreTable newExpireTable() { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java index ff4cb93426f7..7efb65d2d61d 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java @@ -23,6 +23,7 @@ import org.apache.paimon.catalog.Identifier; import org.apache.paimon.operation.PartitionExpire; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.TimeUtils; import java.time.Duration; @@ -64,6 +65,7 @@ public ExpirePartitionsAction( public void executeLocally() throws Exception { FileStoreTable fileStoreTable = (FileStoreTable) table; FileStore fileStore = fileStoreTable.store(); + PartitionExpire partitionExpire = fileStore.newPartitionExpire( "", @@ -76,7 +78,9 @@ public void executeLocally() throws Exception { catalogLoader(), new Identifier( identifier.getDatabaseName(), identifier.getTableName()))); - + Preconditions.checkNotNull( + partitionExpire, + "Both the partition expiration time and partition field can not be null."); partitionExpire.expire(Long.MAX_VALUE); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java index 8a6528bd0229..ecddcf11e16e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java @@ -90,11 +90,9 @@ public String identifier() { FileStore fileStore = fileStoreTable.store(); PartitionExpire partitionExpire = fileStore.newPartitionExpire("", fileStoreTable); - Preconditions.checkNotNull( partitionExpire, "Both the partition expiration time and partition field can not be null."); - List> expired = partitionExpire.expire(Long.MAX_VALUE); return expired == null || expired.isEmpty() ? new Row[] {Row.of("No expired partitions.")}