
Support replace_table_transaction for RTAS workflows #3220

Draft
smaheshwar-pltr wants to merge 1 commit into apache:main from smaheshwar-pltr:sm/replace-table

Contributor

@smaheshwar-pltr smaheshwar-pltr commented Apr 7, 2026

Closes #281.

Rationale for this change

As documented in this PR, the following RTAS (replace table as select) workflow is now supported:

with catalog.replace_table_transaction(identifier="docs_example.bids", schema=df.schema) as txn:
    txn.append(df)

See #281 (comment) in particular for design motivation.

Note: This is largely modelled on the reference (Java) implementation throughout. I had an LLM add review comments that point to the relevant places in the Iceberg Java codebase.

Field IDs are reused by name from the current schema. Schema, partition-spec, and sort-order IDs are reused from history when the new entry is identical to a previous one; see #433 (comment).

Are these changes tested?

Yes, with both unit and integration tests in:

  • tests/catalog/test_catalog_behaviors.py
  • tests/catalog/test_rest.py
  • tests/integration/test_catalog.py
  • tests/test_schema.py
  • tests/table/test_partitioning.py

Are there any user-facing changes?

  • New public method on Catalog
  • Docs!

@smaheshwar-pltr force-pushed the sm/replace-table branch 4 times, most recently from 4361d29 to 93d77d3 (April 19, 2026 08:12)
@smaheshwar-pltr changed the title from "Support replace_table and replace_table_transaction for REST catalog" to "Support replace_table and replace_table_transaction" (May 18, 2026)
return self._table


class ReplaceTableTransaction(Transaction):
Contributor Author

@smaheshwar-pltr smaheshwar-pltr May 18, 2026

[AI Reviewer Aid] Same role as Java's Transactions.replaceTableTransaction — collects the metadata updates that transform the existing table into the replacement and commits them with the replace-specific requirements set.

self._updates += (UpgradeFormatVersionUpdate(format_version=requested_format_version),)

# Remove the main branch ref to clear the current snapshot.
self._updates += (RemoveSnapshotRefUpdate(ref_name=MAIN_BRANCH),)
Contributor Author

@smaheshwar-pltr smaheshwar-pltr May 18, 2026

[AI Reviewer Aid] Only main is cleared, matching Java's buildReplacement which calls removeRef(SnapshotRef.MAIN_BRANCH). Other branches / tags survive replace.
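
A minimal sketch of the ref handling described in this comment, using a plain dict as a stand-in for the table's refs. The function name `refs_after_replace` and the sample refs are illustrative assumptions, not PyIceberg API:

```python
MAIN_BRANCH = "main"


def refs_after_replace(refs: dict[str, int]) -> dict[str, int]:
    """Drop only the main branch ref; every other branch or tag is kept."""
    return {name: snapshot_id for name, snapshot_id in refs.items() if name != MAIN_BRANCH}


# Hypothetical refs: ref name -> snapshot id.
refs = {"main": 101, "audit": 99, "v1-tag": 42}
remaining = refs_after_replace(refs)
print(remaining)
```

With no main ref left, the table has no current snapshot until the transaction writes a new one, which is the DDL-only behaviour the tests check.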

Comment thread pyiceberg/partitioning.py
return PartitionSpec(*partition_fields, spec_id=INITIAL_PARTITION_SPEC_ID)


def assign_fresh_partition_spec_ids_for_replace(
Contributor Author

@smaheshwar-pltr smaheshwar-pltr May 18, 2026

[AI Reviewer Aid] Mirrors the v2 path in Java's TableMetadata.reassignPartitionIds — collect (source_id, transform) -> field_id across all existing specs, reuse on match, fresh ids for the rest.
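
The reuse rule above can be sketched roughly as follows. This is a simplified illustration with tuples standing in for partition fields; `assign_partition_field_ids` and the sample data are hypothetical, and the real helper in this PR works on PartitionSpec objects:

```python
def assign_partition_field_ids(new_fields, existing_specs, last_partition_id):
    """Reuse field ids by (source_id, transform); hand out fresh ids otherwise.

    new_fields: list of (source_id, transform, name)
    existing_specs: list of specs, each a list of (source_id, transform, field_id)
    """
    # Index every (source_id, transform) pair seen in any historical spec.
    known = {}
    for spec in existing_specs:
        for source_id, transform, field_id in spec:
            known.setdefault((source_id, transform), field_id)

    assigned = []
    next_id = last_partition_id
    for source_id, transform, name in new_fields:
        key = (source_id, transform)
        if key in known:
            field_id = known[key]  # reuse on match
        else:
            next_id += 1  # fresh id for anything unseen
            field_id = next_id
        assigned.append((source_id, transform, name, field_id))
    return assigned, next_id


existing_specs = [[(1, "day", 1000)], [(2, "bucket[16]", 1001)]]
new_fields = [(2, "bucket[16]", "bid_bucket"), (3, "identity", "region")]
assigned, next_id = assign_partition_field_ids(new_fields, existing_specs, last_partition_id=1001)
print(assigned, next_id)
```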

Comment thread pyiceberg/partitioning.py
return PartitionSpec(*partition_fields, spec_id=INITIAL_PARTITION_SPEC_ID), next_id


def _assign_fresh_partition_spec_ids_for_replace_v1(
Contributor Author

@smaheshwar-pltr smaheshwar-pltr May 18, 2026

[AI Reviewer Aid] Mirrors the v1 branch of Java's reassignPartitionIds. V1 partition specs are append-only by spec rule, so a replace that drops a partition field would produce an invalid v1 spec without carrying it forward as VoidTransform. The _unique_void_name suffix loop matches Java's collision-renaming pattern, generalized to loop further if both name and name_<field_id> are taken. Covered by test_replace_table_partition_field_carry_forward[v1-carries-forward] and the helper-level v1 tests in tests/table/test_partitioning.py.
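
For readers unfamiliar with the collision-renaming pattern, here is a rough sketch of what such a suffix loop could look like. The function below is an assumption written for illustration: the first fallback `name_<field_id>` comes from the comment above, but the numeric suffix used for further collisions is invented here and may not match the PR's `_unique_void_name`:

```python
def unique_void_name(name: str, field_id: int, taken: set[str]) -> str:
    """Pick a name for a carried-forward void field that does not clash with `taken`."""
    if name not in taken:
        return name
    candidate = f"{name}_{field_id}"
    suffix = 2
    # Keep looping if name_<field_id> is taken as well (hypothetical suffix scheme).
    while candidate in taken:
        candidate = f"{name}_{field_id}_{suffix}"
        suffix += 1
    return candidate


print(unique_void_name("ts_day", 1000, set()))
print(unique_void_name("ts_day", 1000, {"ts_day"}))
print(unique_void_name("ts_day", 1000, {"ts_day", "ts_day_1000"}))
```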

Comment thread pyiceberg/schema.py
return new_id


def assign_fresh_schema_ids_for_replace(schema: Schema, base_schema: Schema, last_column_id: int) -> tuple[Schema, int]:
Contributor Author

@smaheshwar-pltr smaheshwar-pltr May 18, 2026

[AI Reviewer Aid] Maps to Java's TypeUtil.assignFreshIds(Schema schema, Schema baseSchema, NextID nextId) and its AssignFreshIds visitor. Name-based reuse from the current schema only (not the full history) — matches Java's behaviour. Type compatibility is the caller's responsibility.
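
As a simplified illustration of name-based reuse, the sketch below handles a flat list of column names only. `assign_ids_by_name` is a hypothetical helper; the real function operates on nested Schema objects through a visitor:

```python
def assign_ids_by_name(new_columns, current_ids, last_column_id):
    """Return ({name: field_id}, new_last_column_id).

    Columns whose name exists in the current schema keep their id;
    all others get the next unused id after last_column_id.
    """
    assigned = {}
    next_id = last_column_id
    for name in new_columns:
        if name in current_ids:
            assigned[name] = current_ids[name]
        else:
            next_id += 1
            assigned[name] = next_id
    return assigned, next_id


current = {"id": 1, "bid": 2}  # current schema only, not the full history
ids, last_id = assign_ids_by_name(["id", "ask", "bid"], current, last_column_id=2)
print(ids, last_id)
```

Note that the sketch, like the described behaviour, does not check that a reused column keeps a compatible type.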

return existing.order_id
return None

def commit_transaction(self) -> Table:
Contributor Author

@smaheshwar-pltr smaheshwar-pltr May 18, 2026

[AI Reviewer Aid] Mirrors Java's UpdateRequirements.forReplaceTable plus the per-update additions for AddSchema / AddPartitionSpec. AssertCurrentSchemaID / AssertDefaultSpecID / AssertDefaultSortOrderID are intentionally not emitted — Java suppresses them via the !isReplace guards.

Deliberate stricter divergence: Python emits AssertLastAssignedFieldId / AssertLastAssignedPartitionId unconditionally; Java emits them only when the changeset contains a corresponding Add*. They differ only when the new schema/spec structurally matches a historical entry, so Python emits SetCurrent* rather than Add* and Java would skip the assertion. Keeping it stricter guards against silently dropping a concurrent column-add: a concurrent last_column_id bump fails fast with an assert-last-assigned-field-id mismatch rather than producing a current schema missing the new column. Covered by test_concurrent_replace_transaction_schema_conflict / ..._partition_spec_conflict.
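
To make the fail-fast behaviour concrete, here is a toy model of the check. The exception class and function are stand-ins defined for this sketch, not the catalog's actual requirement-validation code:

```python
class CommitFailedError(Exception):
    """Stand-in for the catalog's commit-failure exception."""


def assert_last_assigned_field_id(expected: int, current: int) -> None:
    """Reject the commit if the table's last_column_id moved since the transaction began."""
    if expected != current:
        raise CommitFailedError(f"last assigned field id changed: expected {expected}, found {current}")


# The replace transaction is built against metadata whose last_column_id is 2.
seen_by_replace = 2
# A concurrent writer adds a column and bumps last_column_id before the replace commits.
current_last_column_id = 3

try:
    assert_last_assigned_field_id(seen_by_replace, current_last_column_id)
    outcome = "committed"
except CommitFailedError as exc:
    outcome = f"rejected: {exc}"
print(outcome)
```

Because the assertion is always emitted, the replace is rejected even when it only sets an existing schema as current.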

self._updates += (SetPropertiesUpdate(updates=new_properties),)

@staticmethod
def _find_matching_schema_id(table_metadata: TableMetadata, schema: Schema) -> int | None:
Contributor Author

@smaheshwar-pltr smaheshwar-pltr May 18, 2026

[AI Reviewer Aid] Mirrors Java's reuseOrCreateNewSchemaId (and the spec / sort-order siblings) — walk all historical entries, return the existing id on structural match, otherwise generate a fresh one. Covers the case Fokko walked through in #433 (comment): CREATE OR REPLACE back to a previously-seen schema reuses its schema_id and does not append a duplicate.
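
The reuse-or-create lookup can be sketched as below, with tuples of (name, type) standing in for schemas. The function name and the equality-based "structural match" are assumptions made for the example:

```python
def reuse_or_create_schema_id(history, new_fields):
    """Return (schema_id, reused).

    history: {schema_id: tuple of (name, type) pairs} for every schema the table has had.
    """
    for schema_id, fields in history.items():
        if fields == new_fields:
            return schema_id, True  # identical to a historical schema: reuse its id
    return max(history, default=-1) + 1, False  # otherwise allocate max(id) + 1


history = {
    0: (("id", "long"),),
    1: (("id", "long"), ("bid", "double")),
}
print(reuse_or_create_schema_id(history, (("id", "long"),)))
print(reuse_or_create_schema_id(history, (("id", "string"),)))
```

In the first call the replace goes back to a previously seen schema, so no duplicate entry is appended.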

)


def test_replace_transaction(catalog: Catalog, test_table_identifier: Identifier) -> None:
Contributor Author

@smaheshwar-pltr smaheshwar-pltr May 19, 2026

[AI Reviewer Aid] Mirrors Java's testReplaceTransaction (UUID + schema swap + time-travel-readable old snapshot) and folds in the snapshot-log invariant from testReplaceTableKeepsSnapshotLog (pre-replace snapshot_log entries must survive).

assert time_travel.column("id").to_pylist() == original_data.column("id").to_pylist()


def test_replace_transaction_requires_table_exists(catalog: Catalog, test_table_identifier: Identifier) -> None:
Contributor Author

@smaheshwar-pltr smaheshwar-pltr May 19, 2026

[AI Reviewer Aid] Mirrors Java's testReplaceTransactionRequiresTableExists.

@smaheshwar-pltr changed the title from "Support replace_table and replace_table_transaction" to "Support replace_table_transaction for RTAS workflows" (May 19, 2026)
int(requested_format_version) if requested_format_version is not None else existing_metadata.format_version
)
iceberg_schema = self._convert_schema_if_needed(schema, cast(TableVersion, resolved_format_version))
iceberg_schema.check_format_version_compatibility(cast(TableVersion, resolved_format_version))
Contributor Author

@smaheshwar-pltr smaheshwar-pltr May 19, 2026

[AI Reviewer Aid] Same call new_table_metadata makes (metadata.py:597), and the same check Java's Builder runs inside addSchemaInternal. Catches v1-incompatible types up front rather than failing later inside AddSchemaUpdate's apply path.

assert len(after.schemas) == len(before.schemas)


def test_concurrent_replace_transaction_schema_conflict(catalog: Catalog, test_table_identifier: Identifier) -> None:
Contributor Author

@smaheshwar-pltr smaheshwar-pltr May 19, 2026

[AI Reviewer Aid] Mirrors Java's testConcurrentReplaceTransactionSchemaConflict. The non-conflict variants (testConcurrentReplaceTransactions, testConcurrentReplaceTransactionSchema) are deliberately not ported: PyIceberg emits AssertLastAssignedFieldId unconditionally (see the divergence note on commit_transaction), so those happy-path concurrent flows would fail fast here rather than succeed.

txn_b.commit_transaction()


def test_concurrent_replace_transaction_partition_spec_conflict(catalog: Catalog, test_table_identifier: Identifier) -> None:
Contributor Author

@smaheshwar-pltr smaheshwar-pltr May 19, 2026

[AI Reviewer Aid] Mirrors Java's testConcurrentReplaceTransactionPartitionSpecConflict. Same deliberate-not-ported reasoning for the non-conflict spec variants as for the schema case above.


@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_replace_table_transaction(test_catalog: Catalog, database_name: str, table_name: str) -> None:
Contributor Author

@smaheshwar-pltr smaheshwar-pltr May 19, 2026

[AI Reviewer Aid] This test exercises only the RTAS path. The DDL-only invariant — that after a replace with no follow-up write, current_snapshot_id is None because the main branch ref was removed — is covered against InMemoryCatalog + SqlCatalog by test_replace_transaction in the catalog-behavior suite.

Comment thread mkdocs/docs/api.md
Comment on lines +193 to +194
with catalog.replace_table_transaction(identifier="docs_example.bids", schema=df.schema) as txn:
    txn.append(df)
Contributor Author

@smaheshwar-pltr smaheshwar-pltr May 19, 2026

I went back and forth on adding a top-level Catalog.replace_table in this PR.

Iceberg Java doesn't support this catalog API: its Catalog.newReplaceTableTransaction(...) and TableBuilder.replaceTransaction() both return a Transaction that you have to commit yourself with .commitTransaction().

I decided to drop it because adding it to the catalog API felt contentious, and we can always add it in a follow-up if folks want it. (It's also a bit tricky, since ideally we'd avoid an unnecessary load_table call after the commit.) This also keeps the PR smaller, simpler, and more isolated.

# with a fresh schema_id (max + 1, matching UpdateSchema's convention).
existing_schema_id = self._find_matching_schema_id(table_metadata, new_schema)
if existing_schema_id is not None:
self._updates += (SetCurrentSchemaUpdate(schema_id=existing_schema_id),)
Contributor Author

@smaheshwar-pltr smaheshwar-pltr May 19, 2026

[AI Reviewer Aid] Per Java's reuseOrCreateNewSchemaId — walks all historical schemas, reuses the id if structurally identical, otherwise max(id) + 1. SetCurrentSchemaUpdate is emitted even on the reuse branch (this line), mirroring Java's RESTSessionCatalog.replaceTransaction which always ensures a SetCurrentSchema change is in the request.

effective_spec = UNPARTITIONED_PARTITION_SPEC if new_spec.is_unpartitioned() else new_spec
existing_spec_id = self._find_matching_spec_id(table_metadata, effective_spec)
if existing_spec_id is not None:
self._updates += (SetDefaultSpecUpdate(spec_id=existing_spec_id),)
Contributor Author

@smaheshwar-pltr smaheshwar-pltr May 19, 2026

[AI Reviewer Aid] Per Java's reuseOrCreateNewSpecId. SetDefaultSpecUpdate is emitted even on the reuse branch (this line), also per the RESTSessionCatalog.replaceTransaction block.

)

# Sort order: same reuse-or-add pattern with fresh order_id on add.
effective_sort_order = UNSORTED_SORT_ORDER if new_sort_order.is_unsorted else new_sort_order
Contributor Author

@smaheshwar-pltr smaheshwar-pltr May 19, 2026

[AI Reviewer Aid] Per Java's reuseOrCreateNewSortOrderId. Unsorted reuses id 0 in Java; the effective_sort_order substitution on this line achieves the same — a caller-supplied unsorted order folds back to the canonical UNSORTED_SORT_ORDER so the matching lookup below hits the always-present order_id 0.


@override
@retry(**_RETRY_ARGS)
def replace_table_transaction(
Contributor Author

@smaheshwar-pltr smaheshwar-pltr May 19, 2026

Java's RESTSessionCatalog.replaceTransaction does a view-existence check before replacing (see apache/iceberg#9012). Leaving that implementation + tests for a follow-up PR (if folks are fine with that).

raise ValueError(f"Invalid format-version property: {raw_format_version!r}") from exc
if requested_format_version < existing_metadata.format_version:
raise ValueError(
f"Cannot downgrade format-version from {existing_metadata.format_version} to {requested_format_version}"
Contributor Author

[AI Reviewer Aid] Java's buildReplacement reads format-version from properties and only ever upgrades. A downgrade is rejected explicitly here; otherwise _convert_schema_if_needed would run with v1 semantics while the upgrade itself is silently dropped, producing a confusing mismatch.
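
A standalone sketch of the resolution logic, pieced together from the diff fragments visible in this thread. `resolve_format_version` is a hypothetical wrapper, and reading the value from a "format-version" key in a plain dict is an assumption:

```python
def resolve_format_version(properties: dict[str, str], existing_version: int) -> int:
    """Use the requested format-version if given, never going below the existing one."""
    raw_format_version = properties.get("format-version")
    if raw_format_version is None:
        return existing_version  # nothing requested: keep the table's current version
    try:
        requested = int(raw_format_version)
    except ValueError as exc:
        raise ValueError(f"Invalid format-version property: {raw_format_version!r}") from exc
    if requested < existing_version:
        raise ValueError(f"Cannot downgrade format-version from {existing_version} to {requested}")
    return requested


print(resolve_format_version({}, 2))
print(resolve_format_version({"format-version": "3"}, 2))
```

Resolving the version before converting the schema means the schema check and the eventual upgrade update agree on the same target version.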

except TableAlreadyExistsError:
return self.load_table(identifier)

def replace_table_transaction(
Contributor Author

Pointing out that I've made this a concrete method rather than an abstract one; curious for thoughts!

Originally, I had this as an abstract method overridden in RestCatalog and MetastoreCatalog. I then realised that, given how PyIceberg is wired up, the two implementations were identical, so I collapsed them into a single concrete method here.

(replace_table_transaction differs from create_table_transaction in that the latter requires a stage-create flag to be sent to the REST server; there's no such flow for replace_table_transaction.)

Comment on lines +94 to +95
"s3.access-key-id": "admin",
"s3.secret-access-key": "password",
Contributor Author

(Introducing credentials for the new test below.)

old_snapshot_id: int


def _run_complete_replace(catalog: Catalog, identifier: Identifier, tmp_path: Path) -> _ReplaceFixture:
Contributor Author

[AI Reviewer Aid] The three test_complete_replace_transaction_* tests below share this setup, which together mirror Java's testCompleteReplaceTransaction — exercises all six replace_table_transaction args (schema + spec + sort + location + properties) with an RTAS append, and asserts history accumulates, the new snapshot has no parent, and property-merge semantics (keep / override / add — which Java covers under testReplaceTransactionProperties* in the same file).
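
The keep / override / add property semantics mentioned above amount to a dict merge in which the replacement's values win. A small illustration with made-up property names (the helper `merge_properties` is hypothetical):

```python
def merge_properties(existing: dict[str, str], requested: dict[str, str]) -> dict[str, str]:
    """Keep existing properties, override those re-specified, add new ones."""
    merged = dict(existing)
    merged.update(requested)
    return merged


existing = {"owner": "alice", "write.format.default": "parquet"}
requested = {"write.format.default": "avro", "comment": "replaced"}
merged = merge_properties(existing, requested)
print(merged)
```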


Development

Successfully merging this pull request may close these issues.

REPLACE TABLE Support
