Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions api/src/main/java/org/apache/iceberg/actions/MigrateTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,18 @@ default MigrateTable executeWith(ExecutorService service) {
throw new UnsupportedOperationException("Setting executor service is not supported");
}

/**
* Enables ignoring {@link java.io.FileNotFoundException} when listing source data files. When
* enabled, source data files that have disappeared (for example, because a partition directory
* was removed by concurrent cleanup) are skipped with a warning instead of failing the migration.
* The default is to fail.
*
* @return this for method chaining
*/
default MigrateTable ignoreMissingFiles() {
throw new UnsupportedOperationException("Ignoring missing files is not supported");
}

/** The action result that contains a summary of the execution. */
interface Result {
/** Returns the number of migrated data files. */
Expand Down
1 change: 1 addition & 0 deletions docs/docs/spark-procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,7 @@ By default, the original table is retained with the name `table_BACKUP_`.
| `drop_backup` | | boolean | When true, the original table will not be retained as backup (defaults to false) |
| `backup_table_name` | | string | Name of the table that will be retained as backup (defaults to `table_BACKUP_`) |
| `parallelism` | | int | Number of threads to use for file reading (defaults to 1) |
| `ignore_missing_files` | | boolean | When true, skip source data files that cannot be found instead of failing (defaults to false) |

#### Output

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assumptions.assumeThat;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.ValidationException;
Expand Down Expand Up @@ -305,4 +308,42 @@ public void testMigrateBucketedTable() throws IOException {
"Cannot create an Iceberg table from a bucketed source table: "
+ "4 buckets, bucket columns: [id]");
}

@TestTemplate
public void testMigrateMissingFilesFailByDefault() throws IOException {
createPartitionedTableWithMissingFiles();

assertThatThrownBy(() -> sql("CALL %s.system.migrate('%s')", catalogName, tableName))
.as("Migrate should fail by default when source files are missing")
.hasRootCauseInstanceOf(FileNotFoundException.class);
}

@TestTemplate
public void testMigrateIgnoreMissingFiles() throws IOException {
createPartitionedTableWithMissingFiles();

Object result =
scalarSql(
"CALL %s.system.migrate(table => '%s', ignore_missing_files => true)",
catalogName, tableName);
assertThat(result).as("Should have imported only the surviving partition").isEqualTo(1L);

assertEquals(
"Migrated table should only contain rows from the surviving partition",
ImmutableList.of(row("b", 2L)),
sql("SELECT * FROM %s", tableName));
}

private void createPartitionedTableWithMissingFiles() throws IOException {
assumeThat(catalogName).isEqualToIgnoringCase("spark_catalog");
String location = Files.createTempDirectory(temp, "junit").toFile().toString();
sql(
"CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet PARTITIONED BY (id) LOCATION '%s'",
tableName, location);
sql("INSERT INTO TABLE %s (id, data) VALUES (1, 'a'), (2, 'b')", tableName);

// Remove one partition's directory while leaving the catalog entry intact to simulate a
// concurrent deletion racing with the migration.
FileUtils.deleteDirectory(new File(location, "id=1"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iceberg.spark.actions;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.apache.iceberg.Snapshot;
Expand Down Expand Up @@ -61,6 +62,7 @@ public class MigrateTableSparkAction extends BaseTableCreationSparkAction<Migrat
private Identifier backupIdent;
private boolean dropBackup = false;
private ExecutorService executorService;
private boolean ignoreMissingFiles = false;

MigrateTableSparkAction(
SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableIdent) {
Expand Down Expand Up @@ -116,6 +118,12 @@ public MigrateTableSparkAction executeWith(ExecutorService service) {
return this;
}

@Override
public MigrateTableSparkAction ignoreMissingFiles() {
this.ignoreMissingFiles = true;
return this;
}

@Override
public MigrateTable.Result execute() {
String desc = String.format("Migrating table %s", destTableIdent().toString());
Expand Down Expand Up @@ -146,7 +154,14 @@ private MigrateTable.Result doExecute() {
String stagingLocation = getMetadataLocation(icebergTable);
LOG.info("Generating Iceberg metadata for {} in {}", destTableIdent(), stagingLocation);
SparkTableUtil.importSparkTable(
spark(), v1BackupIdent, icebergTable, stagingLocation, executorService);
spark(),
v1BackupIdent,
icebergTable,
stagingLocation,
Collections.emptyMap(),
false,
ignoreMissingFiles,
executorService);

LOG.info("Committing staged changes to {}", destTableIdent());
stagedTable.commitStagedChanges();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,17 @@ class MigrateTableProcedure extends BaseProcedure {
optionalInParameter("backup_table_name", DataTypes.StringType);
private static final ProcedureParameter PARALLELISM_PARAM =
optionalInParameter("parallelism", DataTypes.IntegerType);
private static final ProcedureParameter IGNORE_MISSING_FILES_PARAM =
optionalInParameter("ignore_missing_files", DataTypes.BooleanType);
Comment thread
huaxingao marked this conversation as resolved.

private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
TABLE_PARAM, PROPERTIES_PARAM, DROP_BACKUP_PARAM, BACKUP_TABLE_NAME_PARAM, PARALLELISM_PARAM
TABLE_PARAM,
PROPERTIES_PARAM,
DROP_BACKUP_PARAM,
BACKUP_TABLE_NAME_PARAM,
PARALLELISM_PARAM,
IGNORE_MISSING_FILES_PARAM
};

private static final StructType OUTPUT_TYPE =
Expand Down Expand Up @@ -117,6 +124,11 @@ public Iterator<Scan> call(InternalRow args) {
migrateTableSparkAction.executeWith(SparkTableUtil.migrationService(parallelism));
}

boolean ignoreMissingFiles = input.asBoolean(IGNORE_MISSING_FILES_PARAM, false);
if (ignoreMissingFiles) {
migrateTableSparkAction = migrateTableSparkAction.ignoreMissingFiles();
}

MigrateTable.Result result = migrateTableSparkAction.execute();
return asScanIterator(OUTPUT_TYPE, newInternalRow(result.migratedDataFilesCount()));
}
Expand Down