From 4b45ca43cc81aafb748d247cbfadcfc0c01a9fef Mon Sep 17 00:00:00 2001 From: Vova Kolmakov Date: Mon, 1 Jun 2026 16:20:23 +0700 Subject: [PATCH] Core: Add branch support for RewriteManifests operation RewriteManifests was the only SnapshotUpdate that could not target a named branch: BaseRewriteManifests inherited the default SnapshotUpdate.toBranch, which throws UnsupportedOperationException. Even if that were unblocked, apply() read manifests from base.currentSnapshot() (always the main tip) instead of the snapshot parameter SnapshotProducer passes in (the tip of the target branch), so a branch commit would have rewritten the wrong manifests. This overrides toBranch to delegate to the inherited targetBranch validation (which rejects null and tag refs with IllegalArgumentException) and reads the current manifests from the snapshot parameter, mirroring FastAppend and MergingSnapshotProducer. Rewriting an existing branch now compacts that branch's manifests without affecting main; targeting a branch that does not exist yet forks it from main, consistent with other SnapshotUpdate operations. This enables per-branch manifest clustering and Write-Audit-Publish workflows. The implementation revives the approach from the stale PR #15982 by Joaquin van Loon, with additional tests covering branch/main divergence, branch creation, and null/tag validation. Closes #15981 Co-Authored-By: Joaquin van Loon Co-Authored-By: Claude Opus 4.8 (1M context) --- .../apache/iceberg/BaseRewriteManifests.java | 11 +- .../apache/iceberg/TestRewriteManifests.java | 102 ++++++++++++++++-- 2 files changed, 105 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java index e98027ec4a0a..32fc5bcf1d26 100644 --- a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java +++ b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java @@ -83,6 +83,12 @@ protected RewriteManifests self() { return this; } + @Override + public RewriteManifests toBranch(String branch) { + targetBranch(branch); + return this; + } + @Override protected String operation() { return DataOperations.REPLACE; @@ -168,10 +174,11 @@ private ManifestFile copyManifest(ManifestFile manifest) { @Override public List apply(TableMetadata base, Snapshot snapshot) { - List currentManifests = base.currentSnapshot().allManifests(ops().io()); + List currentManifests = + snapshot != null ? snapshot.allManifests(ops().io()) : Collections.emptyList(); Set currentManifestSet = ImmutableSet.copyOf(currentManifests); - validateDeletedManifests(currentManifestSet, base.currentSnapshot().snapshotId()); + validateDeletedManifests(currentManifestSet, snapshot != null ? snapshot.snapshotId() : -1L); if (requiresRewrite(currentManifestSet)) { performRewrite(currentManifests); diff --git a/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java b/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java index dab323743bb1..75f311cb4988 100644 --- a/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java +++ b/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java @@ -1078,16 +1078,106 @@ public void testManifestReplacementFailureWithSnapshotIdInheritance() throws IOE } @TestTemplate - public void testRewriteManifestsOnBranchUnsupported() { + public void testRewriteManifestsOnBranch() throws IOException { + String branch = "branch"; - table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit(); + table.newFastAppend().appendFile(FILE_A).commit(); + long mainSnapshotId = table.currentSnapshot().snapshotId(); + table.manageSnapshots().createBranch(branch, mainSnapshotId).commit(); + + // append a second file only on the branch so the branch diverges from main + table.newFastAppend().appendFile(FILE_B).toBranch(branch).commit(); + long branchAppendId = table.snapshot(branch).snapshotId(); + + assertThat(table.snapshot(branch).allManifests(table.io())).hasSize(2); + + // cluster by a constant combines the branch's manifests into one + table.rewriteManifests().clusterBy(file -> "").toBranch(branch).commit(); + + // main must be untouched: same snapshot, same single manifest with only FILE_A + assertThat(table.currentSnapshot().snapshotId()).isEqualTo(mainSnapshotId); + List mainManifests = table.currentSnapshot().allManifests(table.io()); + assertThat(mainManifests).hasSize(1); + validateManifestEntries( + mainManifests.get(0), + ids(mainSnapshotId), + files(FILE_A), + statuses(ManifestEntry.Status.ADDED)); + + // the branch reflects the rewrite over its own (diverged) manifests, including FILE_B + List branchManifests = table.snapshot(branch).allManifests(table.io()); + assertThat(branchManifests).hasSize(1); + + // get the file order correct + List files; + List ids; + try (ManifestReader reader = + ManifestFiles.read(branchManifests.get(0), table.io(), table.specs())) { + if (reader.iterator().next().location().equals(FILE_A.location())) { + files = Arrays.asList(FILE_A, FILE_B); + ids = Arrays.asList(mainSnapshotId, branchAppendId); + } else { + files = Arrays.asList(FILE_B, FILE_A); + ids = Arrays.asList(branchAppendId, mainSnapshotId); + } + } + validateManifestEntries( + branchManifests.get(0), + ids.iterator(), + files.iterator(), + statuses(ManifestEntry.Status.EXISTING, ManifestEntry.Status.EXISTING)); + } + + @TestTemplate + public void testRewriteManifestsCreatesBranchIfNeeded() { + String branch = "newBranch"; + + table.newFastAppend().appendFile(FILE_A).commit(); + long mainSnapshotId = table.currentSnapshot().snapshotId(); + + // the branch does not exist yet; rewriting to it forks the branch from main + table.rewriteManifests().clusterBy(file -> "").toBranch(branch).commit(); + + // main must be untouched + assertThat(table.currentSnapshot().snapshotId()).isEqualTo(mainSnapshotId); assertThat(table.currentSnapshot().allManifests(table.io())).hasSize(1); - assertThatThrownBy(() -> table.rewriteManifests().toBranch("someBranch").commit()) - .isInstanceOf(UnsupportedOperationException.class) - .hasMessage( - "Cannot commit to branch someBranch: org.apache.iceberg.BaseRewriteManifests does not support branch commits"); + // the branch was created, forked from main, with main's manifests rewritten onto it + SnapshotRef branchRef = table.ops().current().ref(branch); + assertThat(branchRef).isNotNull(); + assertThat(branchRef.isBranch()).isTrue(); + + Snapshot branchSnapshot = table.snapshot(branch); + assertThat(branchSnapshot.snapshotId()).isNotEqualTo(mainSnapshotId); + assertThat(branchSnapshot.parentId()).isEqualTo(mainSnapshotId); + assertThat(branchSnapshot.operation()).isEqualTo(DataOperations.REPLACE); + + List branchManifests = branchSnapshot.allManifests(table.io()); + assertThat(branchManifests).hasSize(1); + validateManifestEntries( + branchManifests.get(0), + ids(mainSnapshotId), + files(FILE_A), + statuses(ManifestEntry.Status.EXISTING)); + } + + @TestTemplate + public void testRewriteManifestsToBranchRejectsNullBranch() { + assertThatThrownBy(() -> table.rewriteManifests().toBranch(null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid branch name: null"); + } + + @TestTemplate + public void testRewriteManifestsToBranchRejectsTag() { + table.newFastAppend().appendFile(FILE_A).commit(); + long snapshotId = table.currentSnapshot().snapshotId(); + table.manageSnapshots().createTag("tag1", snapshotId).commit(); + + assertThatThrownBy(() -> table.rewriteManifests().toBranch("tag1")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("tag1 is a tag, not a branch. Tags cannot be targets for producing snapshots"); } @TestTemplate