Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions mkdocs/docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,21 @@ with catalog.create_table_transaction(identifier="docs_example.bids", schema=sch
txn.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c")
```

## Replace a table

Atomically replace an existing table's schema, partition spec, sort order, location, and properties via `replace_table_transaction`. The table UUID and history (snapshots, schemas, specs, sort orders, metadata log) are preserved; the current snapshot is cleared (the `main` branch ref is removed). Open the transaction with the new definition, stage any additional changes (writes, property updates, schema evolution), and commit — for example, an RTAS (replace-table-as-select) that swaps the schema and writes the new data atomically:

```python
with catalog.replace_table_transaction(identifier="docs_example.bids", schema=df.schema) as txn:
    txn.append(df)
Comment on lines +193 to +194
Copy link
Copy Markdown
Contributor Author

@smaheshwar-pltr smaheshwar-pltr May 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went back and forth on adding a top-level Catalog.replace_table in this PR.

Iceberg Java doesn't support this catalog API — its Catalog.newReplaceTableTransaction(...) and TableBuilder.replaceTransaction() both return a Transaction that you have to commit yourself via .commitTransaction().

I decided not to add it in this PR: extending the catalog API felt contentious, and we can always add it as a follow-up if folks want it. (It's also a bit tricky, since ideally we'd avoid an unnecessary load_table call after the commit.) Leaving it out also keeps this PR smaller, simpler, and more isolated.

```

Field IDs are reused by name from the previous schema; new columns get fresh IDs above `last-column-id`.

Table properties are *merged* on replace: properties you don't pass are preserved on the table. To remove a property, drop it explicitly within the transaction.

Pass `format-version` in `properties` to upgrade the table's format version as part of the replace.

## Register a table

To register a table using existing metadata:
Expand Down
95 changes: 92 additions & 3 deletions pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,25 @@
)
from pyiceberg.io import FileIO, load_file_io
from pyiceberg.manifest import ManifestFile
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.partitioning import (
UNPARTITIONED_PARTITION_SPEC,
PartitionSpec,
assign_fresh_partition_spec_ids_for_replace,
)
from pyiceberg.schema import Schema, assign_fresh_schema_ids_for_replace
from pyiceberg.serializers import ToOutputFile
from pyiceberg.table import (
DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE,
CommitTableResponse,
CreateTableTransaction,
ReplaceTableTransaction,
StagedTable,
Table,
TableProperties,
)
from pyiceberg.table.locations import load_location_provider
from pyiceberg.table.metadata import TableMetadata, TableMetadataV1, new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder, assign_fresh_sort_order_ids
from pyiceberg.table.update import (
TableRequirement,
TableUpdate,
Expand Down Expand Up @@ -444,6 +449,90 @@ def create_table_if_not_exists(
except TableAlreadyExistsError:
return self.load_table(identifier)

def replace_table_transaction(
    self,
    identifier: str | Identifier,
    schema: Schema | pa.Schema,
    location: str | None = None,
    partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
    sort_order: SortOrder = UNSORTED_SORT_ORDER,
    properties: Properties = EMPTY_DICT,
) -> ReplaceTableTransaction:
    """Open a transaction that replaces the definition of an existing table.

    The existing table is loaded, the new schema / partition spec / sort order are
    re-keyed against the existing metadata, and a `ReplaceTableTransaction` is returned.
    Further changes (schema evolution, partition evolution, etc.) can be staged on the
    transaction before it is committed.

    Note (from the PR author's review comment): this is a concrete method rather than an
    abstract one because the REST and metastore implementations turned out to be
    identical; unlike `create_table_transaction` there is no stage-create flag to send
    to a REST server.

    Args:
        identifier (str | Identifier): Table identifier.
        schema (Schema | pa.Schema): New table schema.
        location (str | None): New table location; trailing slashes are stripped.
            Falls back to the existing table location when not given.
        partition_spec (PartitionSpec): New partition spec.
        sort_order (SortOrder): New sort order.
        properties (Properties): Properties to apply. Merged on top of the existing
            table properties: keys present here override existing values, existing keys
            not present here are preserved. Removing a property requires an explicit
            removal. A `format-version` entry requests a format-version upgrade.

    Returns:
        ReplaceTableTransaction: A transaction for the replace operation.

    Raises:
        NoSuchTableError: If the table does not exist.
        ValueError: If `format-version` is not an integer, is lower than the table's
            current format version, or if the resolved location is empty.
    """
    current_table = self.load_table(identifier)
    current_metadata = current_table.metadata
    current_version = current_metadata.format_version

    # Start from the table's own version and only move away from it when the caller
    # explicitly asks for a different one through the properties.
    target_version = current_version
    version_override = properties.get(TableProperties.FORMAT_VERSION)
    if version_override is not None:
        try:
            target_version = int(version_override)
        except (TypeError, ValueError) as exc:
            raise ValueError(f"Invalid format-version property: {version_override!r}") from exc
        # Downgrades are rejected explicitly. Per the author's review note, the schema
        # conversion below would otherwise run with the lower version's semantics while
        # the version change itself is silently dropped.
        if target_version < current_version:
            raise ValueError(f"Cannot downgrade format-version from {current_version} to {target_version}")

    table_version = cast(TableVersion, target_version)
    iceberg_schema = self._convert_schema_if_needed(schema, table_version)
    # Fail early on types the target version cannot represent, instead of later while
    # the schema update is being applied (rationale taken from the author's review note).
    iceberg_schema.check_format_version_compatibility(table_version)

    fresh_schema, _ = assign_fresh_schema_ids_for_replace(
        iceberg_schema,
        current_metadata.schema(),
        current_metadata.last_column_id,
    )
    # NOTE(review): partition field IDs are assigned under the table's *existing* format
    # version, not the requested one — confirm this is intended.
    fresh_partition_spec, _ = assign_fresh_partition_spec_ids_for_replace(
        partition_spec,
        iceberg_schema,
        fresh_schema,
        current_metadata.partition_specs,
        current_metadata.last_partition_id,
        format_version=current_version,
        current_spec=current_metadata.spec(),
    )
    fresh_sort_order = assign_fresh_sort_order_ids(sort_order, iceberg_schema, fresh_schema)

    if location:
        target_location = location.rstrip("/")
    else:
        target_location = current_metadata.location
    if not target_location:
        raise ValueError("Resolved table location must not be empty")

    # The transaction is built around the *existing* metadata; the replacement pieces
    # are handed over separately.
    return ReplaceTableTransaction(
        table=StagedTable(
            identifier=current_table.name(),
            metadata=current_metadata,
            metadata_location=current_table.metadata_location,
            io=current_table.io,
            catalog=self,
        ),
        new_schema=fresh_schema,
        new_spec=fresh_partition_spec,
        new_sort_order=fresh_sort_order,
        new_location=target_location,
        new_properties=properties,
    )

@abstractmethod
def load_table(self, identifier: str | Identifier) -> Table:
"""Load the table's metadata and returns the table instance.
Expand Down
13 changes: 13 additions & 0 deletions pyiceberg/catalog/noop.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from pyiceberg.table import (
CommitTableResponse,
CreateTableTransaction,
ReplaceTableTransaction,
Table,
)
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
Expand Down Expand Up @@ -68,6 +69,18 @@ def create_table_transaction(
) -> CreateTableTransaction:
raise NotImplementedError

@override
def replace_table_transaction(
    self,
    identifier: str | Identifier,
    schema: Schema | pa.Schema,
    location: str | None = None,
    partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
    sort_order: SortOrder = UNSORTED_SORT_ORDER,
    properties: Properties = EMPTY_DICT,
) -> ReplaceTableTransaction:
    """Not supported by this catalog; always raises `NotImplementedError`."""
    raise NotImplementedError

@override
def load_table(self, identifier: str | Identifier) -> Table:
raise NotImplementedError
Expand Down
20 changes: 19 additions & 1 deletion pyiceberg/catalog/rest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,18 @@
FileIO,
load_file_io,
)
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec, assign_fresh_partition_spec_ids
from pyiceberg.partitioning import (
UNPARTITIONED_PARTITION_SPEC,
PartitionSpec,
assign_fresh_partition_spec_ids,
)
from pyiceberg.schema import Schema, assign_fresh_schema_ids
from pyiceberg.table import (
CommitTableRequest,
CommitTableResponse,
CreateTableTransaction,
FileScanTask,
ReplaceTableTransaction,
StagedTable,
Table,
TableIdentifier,
Expand Down Expand Up @@ -957,6 +962,19 @@ def create_table_transaction(
staged_table = self._response_to_staged_table(self.identifier_to_tuple(identifier), table_response)
return CreateTableTransaction(staged_table)

@override
@retry(**_RETRY_ARGS)
def replace_table_transaction(
    self,
    identifier: str | Identifier,
    schema: Schema | pa.Schema,
    location: str | None = None,
    partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
    sort_order: SortOrder = UNSORTED_SORT_ORDER,
    properties: Properties = EMPTY_DICT,
) -> ReplaceTableTransaction:
    """Create a ReplaceTableTransaction by delegating to the base implementation.

    The override only wraps the inherited method with the retry decorator; all
    arguments are forwarded unchanged.
    """
    # TODO: per the author's review note, Java's RESTSessionCatalog.replaceTransaction
    # performs a view-existence check before replacing (apache/iceberg#9012); that check
    # and its tests are deferred to a follow-up.
    return super().replace_table_transaction(
        identifier=identifier,
        schema=schema,
        location=location,
        partition_spec=partition_spec,
        sort_order=sort_order,
        properties=properties,
    )

@override
@retry(**_RETRY_ARGS)
def create_view(
Expand Down
169 changes: 169 additions & 0 deletions pyiceberg/partitioning.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,175 @@ def assign_fresh_partition_spec_ids(spec: PartitionSpec, old_schema: Schema, fre
return PartitionSpec(*partition_fields, spec_id=INITIAL_PARTITION_SPEC_ID)


def assign_fresh_partition_spec_ids_for_replace(
    spec: PartitionSpec,
    old_schema: Schema,
    fresh_schema: Schema,
    existing_specs: list[PartitionSpec],
    last_partition_id: int | None,
    format_version: int = 2,
    current_spec: PartitionSpec | None = None,
) -> tuple[PartitionSpec, int]:
    """Re-key a partition spec for a table replace, reusing existing partition field IDs.

    - `format_version <= 1`: delegates to the v1 helper, which keeps every field of
      `current_spec` (dropped ones become `VoidTransform` fields) and appends new fields.
    - Otherwise: a field whose `(source_id, transform)` already occurs in any of
      `existing_specs` reuses that partition field ID (the highest one when it occurs
      several times); every other field gets the next ID after `last_partition_id`.

    Per the PR author's note this mirrors the v2 path of Java's
    `TableMetadata.reassignPartitionIds`.

    Args:
        spec: The new partition spec. Its `source_id`s refer to `old_schema`.
        old_schema: The schema that the new spec's `source_id`s reference.
        fresh_schema: The schema with freshly assigned field IDs.
        existing_specs: All partition specs from the existing table metadata.
        last_partition_id: The table's current `last_partition_id`; when `None`,
            numbering starts at `PARTITION_FIELD_ID_START`.
        format_version: Table format version; values `<= 1` select the v1 behaviour.
        current_spec: The current default partition spec. Required when
            `format_version <= 1`.

    Returns:
        A tuple of `(fresh_spec, new_last_partition_id)`.

    Raises:
        ValueError: If `current_spec` is missing for a v1 table, or if a source column
            of `spec` cannot be resolved in `old_schema` / `fresh_schema`.
    """
    if last_partition_id is None:
        # Nothing assigned yet: the first increment must land on PARTITION_FIELD_ID_START.
        effective_last_partition_id = PARTITION_FIELD_ID_START - 1
    else:
        effective_last_partition_id = last_partition_id

    if format_version <= 1:
        if current_spec is None:
            raise ValueError("current_spec is required for v1 replace_table")
        return _assign_fresh_partition_spec_ids_for_replace_v1(
            spec, old_schema, fresh_schema, current_spec, effective_last_partition_id
        )

    # v2+: index every known partition field by (source_id, transform), keeping the
    # highest field ID when the same pair shows up in more than one spec.
    known_ids: dict[tuple[int, str], int] = {}
    for known_spec in existing_specs:
        for known_field in known_spec.fields:
            lookup = (known_field.source_id, str(known_field.transform))
            known_ids[lookup] = max(known_ids.get(lookup, known_field.field_id), known_field.field_id)

    last_assigned = effective_last_partition_id
    rebuilt_fields = []
    for requested in spec.fields:
        # Translate the source column from the old schema to the fresh one by name.
        column_name = old_schema.find_column_name(requested.source_id)
        if column_name is None:
            raise ValueError(f"Could not find in old schema: {requested}")
        fresh_column = fresh_schema.find_field(column_name)
        if fresh_column is None:
            raise ValueError(f"Could not find field in fresh schema: {column_name}")

        # NOTE(review): an empty set is passed as the already-seen partition names, so
        # this call presumably cannot flag duplicate names within `spec` — confirm.
        validate_partition_name(requested.name, requested.transform, fresh_column.field_id, fresh_schema, set())

        lookup = (fresh_column.field_id, str(requested.transform))
        assigned_id = known_ids.get(lookup)
        if assigned_id is None:
            # Unknown pair: hand out the next ID and remember it for later fields.
            last_assigned += 1
            assigned_id = last_assigned
            known_ids[lookup] = assigned_id

        rebuilt_fields.append(
            PartitionField(
                name=requested.name,
                source_id=fresh_column.field_id,
                field_id=assigned_id,
                transform=requested.transform,
            )
        )

    # `last_assigned` started at the effective last ID and only ever grew, so it is the
    # new last partition ID.
    return PartitionSpec(*rebuilt_fields, spec_id=INITIAL_PARTITION_SPEC_ID), last_assigned


def _assign_fresh_partition_spec_ids_for_replace_v1(
    spec: PartitionSpec,
    old_schema: Schema,
    fresh_schema: Schema,
    current_spec: PartitionSpec,
    effective_last_partition_id: int,
) -> tuple[PartitionSpec, int]:
    """Build the replacement partition spec for a v1 table.

    Every field of `current_spec` is kept, in order, with its partition field ID: a field
    that also appears in `spec` (same fresh source ID and transform) takes the new
    field's name and transform, any other field is carried forward as a `VoidTransform`
    under a non-colliding name. Fields of `spec` that matched nothing are appended
    afterwards with IDs counting up from `effective_last_partition_id + 1`.

    Per the PR author's note this mirrors the v1 branch of Java's `reassignPartitionIds`;
    the carry-forward exists because v1 partition specs are append-only.

    Returns:
        A tuple of `(fresh_spec, new_last_partition_id)`.

    Raises:
        ValueError: If a source column of `spec` cannot be resolved in `old_schema` or
            `fresh_schema`.
    """
    # Requested fields keyed by (fresh_source_id, transform). Dict insertion order is
    # what keeps unmatched fields in their declared order when they are appended.
    pending: dict[tuple[int, str], tuple[PartitionField, int]] = {}
    taken_names: set[str] = set()
    for requested in spec.fields:
        column_name = old_schema.find_column_name(requested.source_id)
        if column_name is None:
            raise ValueError(f"Could not find in old schema: {requested}")
        fresh_column = fresh_schema.find_field(column_name)
        if fresh_column is None:
            raise ValueError(f"Could not find field in fresh schema: {column_name}")
        validate_partition_name(requested.name, requested.transform, fresh_column.field_id, fresh_schema, set())
        pending[(fresh_column.field_id, str(requested.transform))] = (requested, fresh_column.field_id)
        taken_names.add(requested.name)

    rebuilt_fields = []
    for carried in current_spec.fields:
        hit = pending.pop((carried.source_id, str(carried.transform)), None)
        if hit is None:
            # Not part of the new spec: keep the slot, but neutralise it.
            void_name = _unique_void_name(carried.name, carried.field_id, taken_names)
            taken_names.add(void_name)
            rebuilt_fields.append(
                PartitionField(
                    name=void_name,
                    source_id=carried.source_id,
                    field_id=carried.field_id,
                    transform=VoidTransform(),
                )
            )
            continue

        # Matched: the existing partition field ID survives. The new name is already in
        # `taken_names` because every requested name was recorded above.
        requested, fresh_source_id = hit
        rebuilt_fields.append(
            PartitionField(
                name=requested.name,
                source_id=fresh_source_id,
                field_id=carried.field_id,
                transform=requested.transform,
            )
        )

    # Whatever is still pending is genuinely new: append with consecutive fresh IDs.
    leftovers = list(pending.values())
    for fresh_id, (requested, fresh_source_id) in enumerate(leftovers, start=effective_last_partition_id + 1):
        rebuilt_fields.append(
            PartitionField(
                name=requested.name,
                source_id=fresh_source_id,
                field_id=fresh_id,
                transform=requested.transform,
            )
        )

    # One ID was consumed per leftover field, so this is the new last partition ID.
    new_last_partition_id = effective_last_partition_id + len(leftovers)
    return PartitionSpec(*rebuilt_fields, spec_id=INITIAL_PARTITION_SPEC_ID), new_last_partition_id


def _unique_void_name(base_name: str, field_id: int, used_names: set[str]) -> str:
"""Pick a void-transform name that does not collide with already-used names.

First tries `base_name`; if taken, tries `base_name_{field_id}`; if still taken,
appends `_2`, `_3`, ... until unique.
"""
if base_name not in used_names:
return base_name
candidate = f"{base_name}_{field_id}"
suffix = 2
while candidate in used_names:
candidate = f"{base_name}_{field_id}_{suffix}"
suffix += 1
return candidate


T = TypeVar("T")


Expand Down
Loading
Loading