diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java index e98027ec4a0a..32fc5bcf1d26 100644 --- a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java +++ b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java @@ -83,6 +83,12 @@ protected RewriteManifests self() { return this; } + @Override + public RewriteManifests toBranch(String branch) { + targetBranch(branch); + return this; + } + @Override protected String operation() { return DataOperations.REPLACE; @@ -168,10 +174,11 @@ private ManifestFile copyManifest(ManifestFile manifest) { @Override public List apply(TableMetadata base, Snapshot snapshot) { - List currentManifests = base.currentSnapshot().allManifests(ops().io()); + List currentManifests = + snapshot != null ? snapshot.allManifests(ops().io()) : Collections.emptyList(); Set currentManifestSet = ImmutableSet.copyOf(currentManifests); - validateDeletedManifests(currentManifestSet, base.currentSnapshot().snapshotId()); + validateDeletedManifests(currentManifestSet, snapshot != null ? snapshot.snapshotId() : -1L); if (requiresRewrite(currentManifestSet)) { performRewrite(currentManifests); diff --git a/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java b/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java index dab323743bb1..75f311cb4988 100644 --- a/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java +++ b/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java @@ -1078,16 +1078,106 @@ public void testManifestReplacementFailureWithSnapshotIdInheritance() throws IOE } @TestTemplate - public void testRewriteManifestsOnBranchUnsupported() { + public void testRewriteManifestsOnBranch() throws IOException { + String branch = "branch"; - table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit(); + table.newFastAppend().appendFile(FILE_A).commit(); + long mainSnapshotId = table.currentSnapshot().snapshotId(); + table.manageSnapshots().createBranch(branch, mainSnapshotId).commit(); + + // append a second file only on the branch so the branch diverges from main + table.newFastAppend().appendFile(FILE_B).toBranch(branch).commit(); + long branchAppendId = table.snapshot(branch).snapshotId(); + + assertThat(table.snapshot(branch).allManifests(table.io())).hasSize(2); + + // cluster by a constant combines the branch's manifests into one + table.rewriteManifests().clusterBy(file -> "").toBranch(branch).commit(); + + // main must be untouched: same snapshot, same single manifest with only FILE_A + assertThat(table.currentSnapshot().snapshotId()).isEqualTo(mainSnapshotId); + List mainManifests = table.currentSnapshot().allManifests(table.io()); + assertThat(mainManifests).hasSize(1); + validateManifestEntries( + mainManifests.get(0), + ids(mainSnapshotId), + files(FILE_A), + statuses(ManifestEntry.Status.ADDED)); + + // the branch reflects the rewrite over its own (diverged) manifests, including FILE_B + List branchManifests = table.snapshot(branch).allManifests(table.io()); + assertThat(branchManifests).hasSize(1); + + // get the file order correct + List files; + List ids; + try (ManifestReader reader = + ManifestFiles.read(branchManifests.get(0), table.io(), table.specs())) { + if (reader.iterator().next().location().equals(FILE_A.location())) { + files = Arrays.asList(FILE_A, FILE_B); + ids = Arrays.asList(mainSnapshotId, branchAppendId); + } else { + files = Arrays.asList(FILE_B, FILE_A); + ids = Arrays.asList(branchAppendId, mainSnapshotId); + } + } + validateManifestEntries( + branchManifests.get(0), + ids.iterator(), + files.iterator(), + statuses(ManifestEntry.Status.EXISTING, ManifestEntry.Status.EXISTING)); + } + + @TestTemplate + public void testRewriteManifestsCreatesBranchIfNeeded() { + String branch = "newBranch"; + + table.newFastAppend().appendFile(FILE_A).commit(); + long mainSnapshotId = table.currentSnapshot().snapshotId(); + + // the branch does not exist yet; rewriting to it forks the branch from main + table.rewriteManifests().clusterBy(file -> "").toBranch(branch).commit(); + + // main must be untouched + assertThat(table.currentSnapshot().snapshotId()).isEqualTo(mainSnapshotId); assertThat(table.currentSnapshot().allManifests(table.io())).hasSize(1); - assertThatThrownBy(() -> table.rewriteManifests().toBranch("someBranch").commit()) - .isInstanceOf(UnsupportedOperationException.class) - .hasMessage( - "Cannot commit to branch someBranch: org.apache.iceberg.BaseRewriteManifests does not support branch commits"); + // the branch was created, forked from main, with main's manifests rewritten onto it + SnapshotRef branchRef = table.ops().current().ref(branch); + assertThat(branchRef).isNotNull(); + assertThat(branchRef.isBranch()).isTrue(); + + Snapshot branchSnapshot = table.snapshot(branch); + assertThat(branchSnapshot.snapshotId()).isNotEqualTo(mainSnapshotId); + assertThat(branchSnapshot.parentId()).isEqualTo(mainSnapshotId); + assertThat(branchSnapshot.operation()).isEqualTo(DataOperations.REPLACE); + + List branchManifests = branchSnapshot.allManifests(table.io()); + assertThat(branchManifests).hasSize(1); + validateManifestEntries( + branchManifests.get(0), + ids(mainSnapshotId), + files(FILE_A), + statuses(ManifestEntry.Status.EXISTING)); + } + + @TestTemplate + public void testRewriteManifestsToBranchRejectsNullBranch() { + assertThatThrownBy(() -> table.rewriteManifests().toBranch(null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid branch name: null"); + } + + @TestTemplate + public void testRewriteManifestsToBranchRejectsTag() { + table.newFastAppend().appendFile(FILE_A).commit(); + long snapshotId = table.currentSnapshot().snapshotId(); + table.manageSnapshots().createTag("tag1", snapshotId).commit(); + + assertThatThrownBy(() -> table.rewriteManifests().toBranch("tag1")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("tag1 is a tag, not a branch. Tags cannot be targets for producing snapshots"); } @TestTemplate