64 changes: 64 additions & 0 deletions docs/content/primary-key-table/chain-table.md
@@ -206,3 +206,67 @@ partition keys:

This treats `(dt, hour)` as the composite chain dimension and everything before it (e.g., `region`) as
the group dimension.

## Partition Expiration

Chain tables support automatic partition expiration via the standard `partition.expiration-time` option.
However, the expiration algorithm differs from normal tables to preserve chain integrity.

### How It Works

In a normal table, every partition older than the cutoff (`now - partition.expiration-time`) is dropped
independently. Chain tables cannot do this because a delta partition depends on its nearest earlier
snapshot partition as an anchor for merge-on-read. Dropping the anchor would break the chain.

Chain table expiration works in **segments**. A segment consists of one snapshot partition and all the
delta partitions whose time falls between that snapshot and the next snapshot in sorted order. The
segment is the atomic unit of expiration: either the entire segment is expired, or nothing in it is.

The algorithm per group:
1. List all snapshot branch partitions sorted by chain partition time.
2. Filter to those before the cutoff (`now - partition.expiration-time`).
3. If fewer than two snapshots fall before the cutoff, nothing can be expired: a lone snapshot must be
kept as the anchor.
4. The most recent snapshot before the cutoff is the **anchor** (kept). All earlier snapshots and their
associated delta partitions form expirable segments.
5. Delta partitions are dropped before snapshot partitions so that the commit pre-check always passes.

For tables with group partitions, each group is processed independently. A group with many expired
snapshots can have segments expired while another group with only one snapshot before the cutoff retains
all of its data.
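
The per-group steps above can be sketched in Java as follows. This is an illustrative sketch only, not
the actual `ChainTablePartitionExpire` code: the class `ChainExpireSketch`, the method `selectExpired`,
and the use of `LocalDate` for chain partition time are hypothetical. It also assumes that "before the
cutoff" is a strict comparison, that a delta dated the same as a snapshot belongs to that snapshot's
segment, and that deltas older than the earliest snapshot belong to no segment and are left alone.

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical sketch of segment-based expiration for a single group. */
class ChainExpireSketch {

    /** Returns the partitions to drop for one group, deltas first, then snapshots. */
    static List<String> selectExpired(
            List<LocalDate> snapshots, List<LocalDate> deltas, LocalDate cutoff) {
        List<String> toDrop = new ArrayList<>();

        // Steps 1-2: sort the snapshots and keep those strictly before the cutoff.
        TreeSet<LocalDate> beforeCutoff = new TreeSet<>();
        for (LocalDate snapshot : snapshots) {
            if (snapshot.isBefore(cutoff)) {
                beforeCutoff.add(snapshot);
            }
        }

        // Step 3: with fewer than two candidates, the only one must stay as the anchor.
        if (beforeCutoff.size() < 2) {
            return toDrop;
        }

        // Step 4: the most recent snapshot before the cutoff is the anchor and is kept.
        LocalDate anchor = beforeCutoff.last();
        LocalDate earliest = beforeCutoff.first();

        // Step 5: deltas of the expirable segments go first (assumption: a delta
        // belongs to a segment when earliest <= delta < anchor) ...
        for (LocalDate delta : new TreeSet<>(deltas)) {
            if (!delta.isBefore(earliest) && delta.isBefore(anchor)) {
                toDrop.add("D(" + delta + ")");
            }
        }
        // ... followed by every snapshot strictly earlier than the anchor.
        for (LocalDate snapshot : beforeCutoff.headSet(anchor)) {
            toDrop.add("S(" + snapshot + ")");
        }
        return toDrop;
    }

    public static void main(String[] args) {
        List<LocalDate> snapshots = Arrays.asList(
                LocalDate.of(2025, 1, 1), LocalDate.of(2025, 2, 1), LocalDate.of(2025, 3, 1));
        List<LocalDate> deltas = Arrays.asList(
                LocalDate.of(2025, 1, 10), LocalDate.of(2025, 2, 10), LocalDate.of(2025, 3, 15));
        // prints [D(2025-01-10), S(2025-01-01)]
        System.out.println(selectExpired(snapshots, deltas, LocalDate.of(2025, 3, 1)));
    }
}
```

For a table with group partitions, a caller would invoke such a method once per group, which is why one
group can lose segments while another keeps everything.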

### Example

```sql
ALTER TABLE default.t SET TBLPROPERTIES (
    'partition.expiration-time' = '30 d',
    'partition.expiration-check-interval' = '1 d'
);
ALTER TABLE `default`.`t$branch_snapshot` SET TBLPROPERTIES (
    'partition.expiration-time' = '30 d',
    'partition.expiration-check-interval' = '1 d'
);
ALTER TABLE `default`.`t$branch_delta` SET TBLPROPERTIES (
    'partition.expiration-time' = '30 d',
    'partition.expiration-check-interval' = '1 d'
);
```

Suppose the snapshot branch has partitions `S(0101)`, `S(0201)`, `S(0301)` and the delta branch has
`D(0110)`, `D(0210)`, `D(0315)`. On `2025-03-31`, with a 30-day retention, the cutoff is `2025-03-01`:

- Snapshots before cutoff: `S(0101)`, `S(0201)`; `S(0301)` is not earlier than the cutoff. Anchor =
`S(0201)` (kept).
- Segment 1 expired: `S(0101)` + `D(0110)` (delta between `S(0101)` and `S(0201)`).
- Remaining: `S(0201)`, `S(0301)`, `D(0210)`, `D(0315)`.
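
The cutoff arithmetic in this example can be checked with a few lines of `java.time`. This is a sketch
under stated assumptions, not the expiration code itself: the class `CutoffSketch` and its methods are
hypothetical, partition times are taken as midnight timestamps, and "before the cutoff" is treated as a
strict comparison, which matches `S(0301)` being excluded above.

```java
import java.time.Duration;
import java.time.LocalDateTime;

/** Hypothetical helper showing the cutoff arithmetic used in the example. */
class CutoffSketch {

    /** cutoff = now - partition.expiration-time */
    static LocalDateTime cutoff(LocalDateTime now, Duration retention) {
        return now.minus(retention);
    }

    /** Assumption: a partition is a candidate only when strictly before the cutoff. */
    static boolean isExpirationCandidate(LocalDateTime partitionTime, LocalDateTime cutoff) {
        return partitionTime.isBefore(cutoff);
    }

    public static void main(String[] args) {
        LocalDateTime now = LocalDateTime.of(2025, 3, 31, 0, 0);
        LocalDateTime cutoff = cutoff(now, Duration.ofDays(30));
        System.out.println(cutoff); // prints 2025-03-01T00:00
        // S(0201) is a candidate; S(0301) sits on the cutoff and is not.
        System.out.println(isExpirationCandidate(LocalDateTime.of(2025, 2, 1, 0, 0), cutoff));
        System.out.println(isExpirationCandidate(LocalDateTime.of(2025, 3, 1, 0, 0), cutoff));
    }
}
```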

### Important Notes

- **Delta-only groups are not expired.** If a group has delta partitions but no snapshot partition, its
deltas are the only copy of that group's data, so partition expiration will not touch them. They become
eligible for expiration only once at least two of the group's snapshot partitions fall before the
cutoff.
- **Conflict detection is anchor-aware.** When `partition.expiration-strategy` is `values-time`, the
conflict detection during writes correctly recognizes that anchor partitions are retained and does not
reject writes to them.
- The `partition.expiration-time` and `partition.expiration-check-interval` options should be set
consistently across the main table and both branches.
@@ -38,10 +38,12 @@
import org.apache.paimon.metastore.ChainTableOverwriteCommitCallback;
import org.apache.paimon.metastore.TagPreviewCommitCallback;
import org.apache.paimon.metastore.VisibilityWaitCallback;
import org.apache.paimon.operation.ChainTablePartitionExpire;
import org.apache.paimon.operation.ChangelogDeletion;
import org.apache.paimon.operation.FileStoreCommitImpl;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.operation.ManifestsReader;
import org.apache.paimon.operation.NormalPartitionExpire;
import org.apache.paimon.operation.PartitionExpire;
import org.apache.paimon.operation.SnapshotDeletion;
import org.apache.paimon.operation.TagDeletion;
@@ -65,6 +67,7 @@
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.tag.TagPreview;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ChainTableUtils;
import org.apache.paimon.utils.ChangelogManager;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.IndexFilePathFactories;
@@ -440,6 +443,10 @@ public PartitionExpire newPartitionExpire(String commitUser, FileStoreTable tabl
return null;
}

if (options.isChainTable()) {
return newChainTablePartitionExpire(table);
}

return newPartitionExpire(
commitUser,
table,
@@ -459,12 +466,16 @@ public PartitionExpire newPartitionExpire(
Duration expirationTime,
Duration checkInterval,
PartitionExpireStrategy expireStrategy) {
if (options.isChainTable()) {
return newChainTablePartitionExpire(table, expirationTime, checkInterval);
}

PartitionModification partitionModification = null;
if (options.partitionedTableInMetastore()) {
partitionModification = catalogEnvironment.partitionModification();
}

return new PartitionExpire(
return new NormalPartitionExpire(
expirationTime,
checkInterval,
expireStrategy,
@@ -476,6 +487,43 @@ public PartitionExpire newPartitionExpire(
options.partitionExpireBatchSize());
}

@Nullable
private ChainTablePartitionExpire newChainTablePartitionExpire(FileStoreTable table) {
Duration partitionExpireTime = options.partitionExpireTime();
if (partitionExpireTime == null) {
return null;
}
return newChainTablePartitionExpire(
table, partitionExpireTime, options.partitionExpireCheckInterval());
}

@Nullable
private ChainTablePartitionExpire newChainTablePartitionExpire(
FileStoreTable table, Duration expirationTime, Duration checkInterval) {
if (partitionType().getFieldCount() == 0) {
return null;
}
FileStoreTable primaryTable = ChainTableUtils.resolveChainPrimaryTable(table);
FileStoreTable snapshotTable =
primaryTable.switchToBranch(options.scanFallbackSnapshotBranch());
FileStoreTable deltaTable = primaryTable.switchToBranch(options.scanFallbackDeltaBranch());
PartitionModification partitionModification = null;
if (options.partitionedTableInMetastore()) {
partitionModification = catalogEnvironment.partitionModification();
}
return new ChainTablePartitionExpire(
expirationTime,
checkInterval,
snapshotTable,
deltaTable,
options,
partitionType(),
options.endInputCheckPartitionExpire(),
options.partitionExpireMaxNum(),
options.partitionExpireBatchSize(),
partitionModification);
}

@Override
public TagAutoManager newTagAutoManager(FileStoreTable table) {
return TagAutoManager.create(
@@ -36,13 +36,15 @@
import org.apache.paimon.table.sink.CommitPreCallback;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ChainPartitionProjector;
import org.apache.paimon.utils.ChainTableUtils;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -111,33 +113,59 @@ public void call(
partitionType,
table.schema().partitionKeys().toArray(new String[0]),
coreOptions.legacyPartitionName());
RecordComparator partitionComparator =
CodeGenUtils.newRecordComparator(partitionType.getFieldTypes());

List<String> chainKeys =
ChainTableUtils.chainPartitionKeys(coreOptions, table.schema().partitionKeys());
int chainFieldCount = chainKeys.size();
ChainPartitionProjector projector =
new ChainPartitionProjector(partitionType, chainFieldCount);
int groupFieldCount = projector.groupFieldCount();
RecordComparator chainComparator =
CodeGenUtils.newRecordComparator(projector.chainPartitionType().getFieldTypes());

List<BinaryRow> snapshotPartitions =
table.newSnapshotReader().partitionEntries().stream()
.map(PartitionEntry::partition)
.sorted(partitionComparator)
.collect(Collectors.toList());
SnapshotReader deltaSnapshotReader = deltaTable.newSnapshotReader();
PredicateBuilder builder = new PredicateBuilder(partitionType);
for (BinaryRow partition : changedPartitions) {
BinaryRow partitionGroup = projector.extractGroupPartition(partition);
BinaryRow partitionChain = projector.extractChainPartition(partition);

List<BinaryRow> sameGroupSnapshots =
filterSameGroup(snapshotPartitions, partitionGroup, projector);
sameGroupSnapshots.sort(
(a, b) ->
chainComparator.compare(
projector.extractChainPartition(a),
projector.extractChainPartition(b)));

Optional<BinaryRow> preSnapshotPartition =
findPreSnapshotPartition(snapshotPartitions, partition, partitionComparator);
findPreSnapshotInGroup(
sameGroupSnapshots, partitionChain, chainComparator, projector);
Optional<BinaryRow> nextSnapshotPartition =
findNextSnapshotPartition(snapshotPartitions, partition, partitionComparator);
findNextSnapshotInGroup(
sameGroupSnapshots, partitionChain, chainComparator, projector);

Predicate deltaFollowingPredicate =
ChainTableUtils.createTriangularPredicate(
partition, partitionConverter, builder::equal, builder::greaterThan);
ChainTableUtils.createGroupChainPredicate(
partition,
partitionConverter,
groupFieldCount,
builder::equal,
builder::greaterThan);
List<BinaryRow> deltaFollowingPartitions =
deltaSnapshotReader.withPartitionFilter(deltaFollowingPredicate)
.partitionEntries().stream()
.map(PartitionEntry::partition)
.filter(
deltaPartition ->
isBeforeNextSnapshotPartition(
isBeforeNextSnapshotInGroup(
deltaPartition,
nextSnapshotPartition,
partitionComparator))
chainComparator,
projector))
.collect(Collectors.toList());
boolean canDrop =
deltaFollowingPartitions.isEmpty() || preSnapshotPartition.isPresent();
@@ -159,13 +187,26 @@ private boolean isPureDeleteCommit(
&& indexFiles.stream().allMatch(f -> f.kind() == FileKind.DELETE);
}

private Optional<BinaryRow> findPreSnapshotPartition(
List<BinaryRow> snapshotPartitions,
BinaryRow partition,
RecordComparator partitionComparator) {
private List<BinaryRow> filterSameGroup(
List<BinaryRow> partitions, BinaryRow groupKey, ChainPartitionProjector projector) {
List<BinaryRow> result = new ArrayList<>();
for (BinaryRow partition : partitions) {
if (projector.extractGroupPartition(partition).equals(groupKey)) {
result.add(partition);
}
}
return result;
}

private Optional<BinaryRow> findPreSnapshotInGroup(
List<BinaryRow> sortedSameGroupPartitions,
BinaryRow targetChain,
RecordComparator chainComparator,
ChainPartitionProjector projector) {
BinaryRow pre = null;
for (BinaryRow snapshotPartition : snapshotPartitions) {
if (partitionComparator.compare(snapshotPartition, partition) < 0) {
for (BinaryRow snapshotPartition : sortedSameGroupPartitions) {
BinaryRow chain = projector.extractChainPartition(snapshotPartition);
if (chainComparator.compare(chain, targetChain) < 0) {
pre = snapshotPartition;
} else {
break;
@@ -174,24 +215,31 @@ private Optional<BinaryRow> findPreSnapshotPartition(
return Optional.ofNullable(pre);
}

private Optional<BinaryRow> findNextSnapshotPartition(
List<BinaryRow> snapshotPartitions,
BinaryRow partition,
RecordComparator partitionComparator) {
for (BinaryRow snapshotPartition : snapshotPartitions) {
if (partitionComparator.compare(snapshotPartition, partition) > 0) {
private Optional<BinaryRow> findNextSnapshotInGroup(
List<BinaryRow> sortedSameGroupPartitions,
BinaryRow targetChain,
RecordComparator chainComparator,
ChainPartitionProjector projector) {
for (BinaryRow snapshotPartition : sortedSameGroupPartitions) {
BinaryRow chain = projector.extractChainPartition(snapshotPartition);
if (chainComparator.compare(chain, targetChain) > 0) {
return Optional.of(snapshotPartition);
}
}
return Optional.empty();
}

private boolean isBeforeNextSnapshotPartition(
private boolean isBeforeNextSnapshotInGroup(
BinaryRow partition,
Optional<BinaryRow> nextSnapshotPartition,
RecordComparator partitionComparator) {
return !nextSnapshotPartition.isPresent()
|| partitionComparator.compare(partition, nextSnapshotPartition.get()) < 0;
RecordComparator chainComparator,
ChainPartitionProjector projector) {
if (!nextSnapshotPartition.isPresent()) {
return true;
}
BinaryRow partitionChain = projector.extractChainPartition(partition);
BinaryRow nextChain = projector.extractChainPartition(nextSnapshotPartition.get());
return chainComparator.compare(partitionChain, nextChain) < 0;
}

private String generatePartitionValues(