From dda235c6b0219b8f9d855ac29bf5540ff58f6d1d Mon Sep 17 00:00:00 2001 From: Evan Wu Date: Sun, 31 May 2026 23:30:40 -0700 Subject: [PATCH 1/2] API, SPark 4.1: Add ignore_missing_files to migrate procedure --- .../apache/iceberg/actions/MigrateTable.java | 12 +++++ .../extensions/TestMigrateTableProcedure.java | 44 +++++++++++++++++++ .../actions/MigrateTableSparkAction.java | 17 ++++++- .../procedures/MigrateTableProcedure.java | 15 ++++++- 4 files changed, 86 insertions(+), 2 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/actions/MigrateTable.java b/api/src/main/java/org/apache/iceberg/actions/MigrateTable.java index 8f9e8d69c90a..08387bde0fba 100644 --- a/api/src/main/java/org/apache/iceberg/actions/MigrateTable.java +++ b/api/src/main/java/org/apache/iceberg/actions/MigrateTable.java @@ -72,6 +72,18 @@ default MigrateTable executeWith(ExecutorService service) { throw new UnsupportedOperationException("Setting executor service is not supported"); } + /** + * Sets whether to ignore {@link java.io.FileNotFoundException} when listing source data files. + * When set to {@code true}, partitions whose files have been deleted concurrently are skipped + * with a warning instead of failing the migration. The default is {@code false}. + * + * @param ignore whether to ignore missing source files + * @return this for method chaining + */ + default MigrateTable ignoreMissingFiles(boolean ignore) { + throw new UnsupportedOperationException("Ignoring missing files is not supported"); + } + /** The action result that contains a summary of the execution. */ interface Result { /** Returns the number of migrated data files. */ diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java index 60645e1c35c2..523da4950f09 100644 --- a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java +++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java @@ -23,9 +23,14 @@ import static org.assertj.core.api.Assumptions.assumeThat; import java.io.IOException; +import java.io.UncheckedIOException; import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.stream.Stream; import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.Table; import org.apache.iceberg.exceptions.ValidationException; @@ -305,4 +310,43 @@ public void testMigrateBucketedTable() throws IOException { "Cannot create an Iceberg table from a bucketed source table: " + "4 buckets, bucket columns: [id]"); } + + @TestTemplate + public void testMigrateIgnoreMissingFiles() throws IOException { + assumeThat(catalogName).isEqualToIgnoringCase("spark_catalog"); + String location = Files.createTempDirectory(temp, "junit").toFile().toString(); + sql( + "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet PARTITIONED BY (id) LOCATION '%s'", + tableName, location); + sql("INSERT INTO TABLE %s (id, data) VALUES (1, 'a'), (2, 'b')", tableName); + + // Remove one partition's directory while leaving the catalog entry intact to simulate a + // concurrent deletion racing with the migration. + deleteDirectory(Paths.get(location, "id=1")); + + Object result = + scalarSql( + "CALL %s.system.migrate(table => '%s', ignore_missing_files => true)", + catalogName, tableName); + assertThat(result).as("Should have imported only the surviving partition").isEqualTo(1L); + + assertEquals( + "Migrated table should only contain rows from the surviving partition", + ImmutableList.of(row("b", 2L)), + sql("SELECT * FROM %s", tableName)); + } + + private static void deleteDirectory(Path dir) throws IOException { + try (Stream walk = Files.walk(dir)) { + walk.sorted(Comparator.reverseOrder()) + .forEach( + p -> { + try { + Files.delete(p); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + } + } } diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java index bdffeb465405..88b617287e43 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg.spark.actions; +import java.util.Collections; import java.util.Map; import java.util.concurrent.ExecutorService; import org.apache.iceberg.Snapshot; @@ -61,6 +62,7 @@ public class MigrateTableSparkAction extends BaseTableCreationSparkAction call(InternalRow args) { migrateTableSparkAction.executeWith(SparkTableUtil.migrationService(parallelism)); } + if (input.isProvided(IGNORE_MISSING_FILES_PARAM)) { + migrateTableSparkAction = + migrateTableSparkAction.ignoreMissingFiles( + input.asBoolean(IGNORE_MISSING_FILES_PARAM, false)); + } + MigrateTable.Result result = migrateTableSparkAction.execute(); return asScanIterator(OUTPUT_TYPE, newInternalRow(result.migratedDataFilesCount())); } From 17a41fc20c8bd4b624cc405b1e0597449e70b78b Mon Sep 17 00:00:00 2001 From: Evan Wu Date: Tue, 2 Jun 2026 19:07:50 -0700 Subject: [PATCH 2/2] address comments --- .../apache/iceberg/actions/MigrateTable.java | 10 ++-- docs/docs/spark-procedures.md | 1 + .../extensions/TestMigrateTableProcedure.java | 51 +++++++++---------- .../actions/MigrateTableSparkAction.java | 4 +- .../procedures/MigrateTableProcedure.java | 7 ++- 5 files changed, 35 insertions(+), 38 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/actions/MigrateTable.java b/api/src/main/java/org/apache/iceberg/actions/MigrateTable.java index 08387bde0fba..7a82ddabb104 100644 --- a/api/src/main/java/org/apache/iceberg/actions/MigrateTable.java +++ b/api/src/main/java/org/apache/iceberg/actions/MigrateTable.java @@ -73,14 +73,14 @@ default MigrateTable executeWith(ExecutorService service) { } /** - * Sets whether to ignore {@link java.io.FileNotFoundException} when listing source data files. - * When set to {@code true}, partitions whose files have been deleted concurrently are skipped - * with a warning instead of failing the migration. The default is {@code false}. + * Enables ignoring {@link java.io.FileNotFoundException} when listing source data files. When + * enabled, source data files that have disappeared (for example, because a partition directory + * was removed by concurrent cleanup) are skipped with a warning instead of failing the migration. + * The default is to fail. * - * @param ignore whether to ignore missing source files * @return this for method chaining */ - default MigrateTable ignoreMissingFiles(boolean ignore) { + default MigrateTable ignoreMissingFiles() { throw new UnsupportedOperationException("Ignoring missing files is not supported"); } diff --git a/docs/docs/spark-procedures.md b/docs/docs/spark-procedures.md index 8e594caa12d4..e413babbdcb2 100644 --- a/docs/docs/spark-procedures.md +++ b/docs/docs/spark-procedures.md @@ -653,6 +653,7 @@ By default, the original table is retained with the name `table_BACKUP_`. | `drop_backup` | | boolean | When true, the original table will not be retained as backup (defaults to false) | | `backup_table_name` | | string | Name of the table that will be retained as backup (defaults to `table_BACKUP_`) | | `parallelism` | | int | Number of threads to use for file reading (defaults to 1) | +| `ignore_missing_files` | | boolean | When true, skip source data files that cannot be found instead of failing (defaults to false) | #### Output diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java index 523da4950f09..5fd0fe966517 100644 --- a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java +++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java @@ -22,15 +22,13 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assumptions.assumeThat; +import java.io.File; +import java.io.FileNotFoundException; import java.io.IOException; -import java.io.UncheckedIOException; import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.Comparator; import java.util.List; import java.util.Map; -import java.util.stream.Stream; +import org.apache.commons.io.FileUtils; import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.Table; import org.apache.iceberg.exceptions.ValidationException; @@ -312,17 +310,17 @@ public void testMigrateBucketedTable() throws IOException { } @TestTemplate - public void testMigrateIgnoreMissingFiles() throws IOException { - assumeThat(catalogName).isEqualToIgnoringCase("spark_catalog"); - String location = Files.createTempDirectory(temp, "junit").toFile().toString(); - sql( - "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet PARTITIONED BY (id) LOCATION '%s'", - tableName, location); - sql("INSERT INTO TABLE %s (id, data) VALUES (1, 'a'), (2, 'b')", tableName); + public void testMigrateMissingFilesFailByDefault() throws IOException { + createPartitionedTableWithMissingFiles(); - // Remove one partition's directory while leaving the catalog entry intact to simulate a - // concurrent deletion racing with the migration. - deleteDirectory(Paths.get(location, "id=1")); + assertThatThrownBy(() -> sql("CALL %s.system.migrate('%s')", catalogName, tableName)) + .as("Migrate should fail by default when source files are missing") + .hasRootCauseInstanceOf(FileNotFoundException.class); + } + + @TestTemplate + public void testMigrateIgnoreMissingFiles() throws IOException { + createPartitionedTableWithMissingFiles(); Object result = scalarSql( @@ -336,17 +334,16 @@ public void testMigrateIgnoreMissingFiles() throws IOException { sql("SELECT * FROM %s", tableName)); } - private static void deleteDirectory(Path dir) throws IOException { - try (Stream walk = Files.walk(dir)) { - walk.sorted(Comparator.reverseOrder()) - .forEach( - p -> { - try { - Files.delete(p); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - }); - } + private void createPartitionedTableWithMissingFiles() throws IOException { + assumeThat(catalogName).isEqualToIgnoringCase("spark_catalog"); + String location = Files.createTempDirectory(temp, "junit").toFile().toString(); + sql( + "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet PARTITIONED BY (id) LOCATION '%s'", + tableName, location); + sql("INSERT INTO TABLE %s (id, data) VALUES (1, 'a'), (2, 'b')", tableName); + + // Remove one partition's directory while leaving the catalog entry intact to simulate a + // concurrent deletion racing with the migration. + FileUtils.deleteDirectory(new File(location, "id=1")); } } diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java index 88b617287e43..9e5649a71f49 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java @@ -119,8 +119,8 @@ public MigrateTableSparkAction executeWith(ExecutorService service) { } @Override - public MigrateTableSparkAction ignoreMissingFiles(boolean ignore) { - this.ignoreMissingFiles = ignore; + public MigrateTableSparkAction ignoreMissingFiles() { + this.ignoreMissingFiles = true; return this; } diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java index 3d64125c97b3..16e958bf21bc 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java @@ -124,10 +124,9 @@ public Iterator call(InternalRow args) { migrateTableSparkAction.executeWith(SparkTableUtil.migrationService(parallelism)); } - if (input.isProvided(IGNORE_MISSING_FILES_PARAM)) { - migrateTableSparkAction = - migrateTableSparkAction.ignoreMissingFiles( - input.asBoolean(IGNORE_MISSING_FILES_PARAM, false)); + boolean ignoreMissingFiles = input.asBoolean(IGNORE_MISSING_FILES_PARAM, false); + if (ignoreMissingFiles) { + migrateTableSparkAction = migrateTableSparkAction.ignoreMissingFiles(); } MigrateTable.Result result = migrateTableSparkAction.execute();