diff --git a/api/src/main/java/org/apache/iceberg/actions/MigrateTable.java b/api/src/main/java/org/apache/iceberg/actions/MigrateTable.java index 8f9e8d69c90a..7a82ddabb104 100644 --- a/api/src/main/java/org/apache/iceberg/actions/MigrateTable.java +++ b/api/src/main/java/org/apache/iceberg/actions/MigrateTable.java @@ -72,6 +72,18 @@ default MigrateTable executeWith(ExecutorService service) { throw new UnsupportedOperationException("Setting executor service is not supported"); } + /** + * Enables ignoring {@link java.io.FileNotFoundException} when listing source data files. When + * enabled, source data files that have disappeared (for example, because a partition directory + * was removed by concurrent cleanup) are skipped with a warning instead of failing the migration. + * The default is to fail. + * + * @return this for method chaining + */ + default MigrateTable ignoreMissingFiles() { + throw new UnsupportedOperationException("Ignoring missing files is not supported"); + } + /** The action result that contains a summary of the execution. */ interface Result { /** Returns the number of migrated data files. */ diff --git a/docs/docs/spark-procedures.md b/docs/docs/spark-procedures.md index 8e594caa12d4..e413babbdcb2 100644 --- a/docs/docs/spark-procedures.md +++ b/docs/docs/spark-procedures.md @@ -653,6 +653,7 @@ By default, the original table is retained with the name `table_BACKUP_`. | `drop_backup` | | boolean | When true, the original table will not be retained as backup (defaults to false) | | `backup_table_name` | | string | Name of the table that will be retained as backup (defaults to `table_BACKUP_`) | | `parallelism` | | int | Number of threads to use for file reading (defaults to 1) | +| `ignore_missing_files` | | boolean | When true, skip source data files that cannot be found instead of failing (defaults to false) | #### Output diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java index 60645e1c35c2..5fd0fe966517 100644 --- a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java +++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java @@ -22,10 +22,13 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assumptions.assumeThat; +import java.io.File; +import java.io.FileNotFoundException; import java.io.IOException; import java.nio.file.Files; import java.util.List; import java.util.Map; +import org.apache.commons.io.FileUtils; import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.Table; import org.apache.iceberg.exceptions.ValidationException; @@ -305,4 +308,42 @@ public void testMigrateBucketedTable() throws IOException { "Cannot create an Iceberg table from a bucketed source table: " + "4 buckets, bucket columns: [id]"); } + + @TestTemplate + public void testMigrateMissingFilesFailByDefault() throws IOException { + createPartitionedTableWithMissingFiles(); + + assertThatThrownBy(() -> sql("CALL %s.system.migrate('%s')", catalogName, tableName)) + .as("Migrate should fail by default when source files are missing") + .hasRootCauseInstanceOf(FileNotFoundException.class); + } + + @TestTemplate + public void testMigrateIgnoreMissingFiles() throws IOException { + createPartitionedTableWithMissingFiles(); + + Object result = + scalarSql( + "CALL %s.system.migrate(table => '%s', ignore_missing_files => true)", + catalogName, tableName); + assertThat(result).as("Should have imported only the surviving partition").isEqualTo(1L); + + assertEquals( + "Migrated table should only contain rows from the surviving partition", + ImmutableList.of(row("b", 2L)), + sql("SELECT * FROM %s", tableName)); + } + + private void createPartitionedTableWithMissingFiles() throws IOException { + assumeThat(catalogName).isEqualToIgnoringCase("spark_catalog"); + String location = Files.createTempDirectory(temp, "junit").toFile().toString(); + sql( + "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet PARTITIONED BY (id) LOCATION '%s'", + tableName, location); + sql("INSERT INTO TABLE %s (id, data) VALUES (1, 'a'), (2, 'b')", tableName); + + // Remove one partition's directory while leaving the catalog entry intact to simulate a + // concurrent deletion racing with the migration. + FileUtils.deleteDirectory(new File(location, "id=1")); + } } diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java index bdffeb465405..9e5649a71f49 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg.spark.actions; +import java.util.Collections; import java.util.Map; import java.util.concurrent.ExecutorService; import org.apache.iceberg.Snapshot; @@ -61,6 +62,7 @@ public class MigrateTableSparkAction extends BaseTableCreationSparkAction call(InternalRow args) { migrateTableSparkAction.executeWith(SparkTableUtil.migrationService(parallelism)); } + boolean ignoreMissingFiles = input.asBoolean(IGNORE_MISSING_FILES_PARAM, false); + if (ignoreMissingFiles) { + migrateTableSparkAction = migrateTableSparkAction.ignoreMissingFiles(); + } + MigrateTable.Result result = migrateTableSparkAction.execute(); return asScanIterator(OUTPUT_TYPE, newInternalRow(result.migratedDataFilesCount())); }