-
Notifications
You must be signed in to change notification settings - Fork 494
Support replace_table_transaction for RTAS workflows
#3220
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -42,20 +42,25 @@ | |
| ) | ||
| from pyiceberg.io import FileIO, load_file_io | ||
| from pyiceberg.manifest import ManifestFile | ||
| from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec | ||
| from pyiceberg.schema import Schema | ||
| from pyiceberg.partitioning import ( | ||
| UNPARTITIONED_PARTITION_SPEC, | ||
| PartitionSpec, | ||
| assign_fresh_partition_spec_ids_for_replace, | ||
| ) | ||
| from pyiceberg.schema import Schema, assign_fresh_schema_ids_for_replace | ||
| from pyiceberg.serializers import ToOutputFile | ||
| from pyiceberg.table import ( | ||
| DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, | ||
| CommitTableResponse, | ||
| CreateTableTransaction, | ||
| ReplaceTableTransaction, | ||
| StagedTable, | ||
| Table, | ||
| TableProperties, | ||
| ) | ||
| from pyiceberg.table.locations import load_location_provider | ||
| from pyiceberg.table.metadata import TableMetadata, TableMetadataV1, new_table_metadata | ||
| from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder | ||
| from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder, assign_fresh_sort_order_ids | ||
| from pyiceberg.table.update import ( | ||
| TableRequirement, | ||
| TableUpdate, | ||
|
|
@@ -444,6 +449,90 @@ def create_table_if_not_exists( | |
| except TableAlreadyExistsError: | ||
| return self.load_table(identifier) | ||
|
|
||
def replace_table_transaction(
    self,
    identifier: str | Identifier,
    schema: Schema | pa.Schema,
    location: str | None = None,
    partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
    sort_order: SortOrder = UNSORTED_SORT_ORDER,
    properties: Properties = EMPTY_DICT,
) -> ReplaceTableTransaction:
    """Create a ReplaceTableTransaction.

    The transaction can be used to stage additional changes (schema evolution,
    partition evolution, etc.) before committing.

    Args:
        identifier (str | Identifier): Table identifier.
        schema (Schema | pa.Schema): New table schema.
        location (str | None): New table location. Defaults to the existing location.
            Trailing slashes are stripped from an explicitly supplied location.
        partition_spec (PartitionSpec): New partition spec.
        sort_order (SortOrder): New sort order.
        properties (Properties): Properties to apply. Merged on top of the existing
            table properties: keys present here override existing values; existing keys
            not present here are preserved. To remove a property, follow up with a
            transaction that removes it explicitly.

    Returns:
        ReplaceTableTransaction: A transaction for the replace operation.

    Raises:
        NoSuchTableError: If the table does not exist.
        ValueError: If the format-version property is not an integer, if it is lower
            than the existing table's format version, or if the resolved table
            location is empty.
    """
    # NOTE(review): this method is concrete (not decorated with @abstractmethod);
    # the PR author flagged that choice for discussion.
    existing_table = self.load_table(identifier)
    existing_metadata = existing_table.metadata

    # Resolve the format version: an explicit property wins, but it may never be
    # lower than the version of the table being replaced.
    raw_format_version = properties.get(TableProperties.FORMAT_VERSION)
    if raw_format_version is not None:
        try:
            requested_format_version = int(raw_format_version)
        except (TypeError, ValueError) as exc:
            raise ValueError(f"Invalid format-version property: {raw_format_version!r}") from exc
        if requested_format_version < existing_metadata.format_version:
            raise ValueError(
                f"Cannot downgrade format-version from {existing_metadata.format_version} to {requested_format_version}"
            )
        resolved_format_version = requested_format_version
    else:
        resolved_format_version = existing_metadata.format_version

    iceberg_schema = self._convert_schema_if_needed(schema, cast(TableVersion, resolved_format_version))
    iceberg_schema.check_format_version_compatibility(cast(TableVersion, resolved_format_version))

    # Re-assign IDs against the existing metadata. The second element of each returned
    # tuple (the new "last assigned" ID) is not needed here and is discarded.
    fresh_schema, _ = assign_fresh_schema_ids_for_replace(
        iceberg_schema, existing_metadata.schema(), existing_metadata.last_column_id
    )
    fresh_partition_spec, _ = assign_fresh_partition_spec_ids_for_replace(
        partition_spec,
        iceberg_schema,
        fresh_schema,
        existing_metadata.partition_specs,
        existing_metadata.last_partition_id,
        format_version=existing_metadata.format_version,
        current_spec=existing_metadata.spec(),
    )
    fresh_sort_order = assign_fresh_sort_order_ids(sort_order, iceberg_schema, fresh_schema)

    resolved_location = location.rstrip("/") if location else existing_metadata.location
    if not resolved_location:
        raise ValueError("Resolved table location must not be empty")

    # The staged table wraps the *existing* metadata; the replacement pieces are
    # handed to the transaction separately.
    staged_table = StagedTable(
        identifier=existing_table.name(),
        metadata=existing_metadata,
        metadata_location=existing_table.metadata_location,
        io=existing_table.io,
        catalog=self,
    )
    return ReplaceTableTransaction(
        table=staged_table,
        new_schema=fresh_schema,
        new_spec=fresh_partition_spec,
        new_sort_order=fresh_sort_order,
        new_location=resolved_location,
        new_properties=properties,
    )
|
|
||
| @abstractmethod | ||
| def load_table(self, identifier: str | Identifier) -> Table: | ||
| """Load the table's metadata and returns the table instance. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -68,13 +68,18 @@ | |
| FileIO, | ||
| load_file_io, | ||
| ) | ||
| from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec, assign_fresh_partition_spec_ids | ||
| from pyiceberg.partitioning import ( | ||
| UNPARTITIONED_PARTITION_SPEC, | ||
| PartitionSpec, | ||
| assign_fresh_partition_spec_ids, | ||
| ) | ||
| from pyiceberg.schema import Schema, assign_fresh_schema_ids | ||
| from pyiceberg.table import ( | ||
| CommitTableRequest, | ||
| CommitTableResponse, | ||
| CreateTableTransaction, | ||
| FileScanTask, | ||
| ReplaceTableTransaction, | ||
| StagedTable, | ||
| Table, | ||
| TableIdentifier, | ||
|
|
@@ -957,6 +962,19 @@ def create_table_transaction( | |
| staged_table = self._response_to_staged_table(self.identifier_to_tuple(identifier), table_response) | ||
| return CreateTableTransaction(staged_table) | ||
|
|
||
@override
@retry(**_RETRY_ARGS)
def replace_table_transaction(
    self,
    identifier: str | Identifier,
    schema: Schema | pa.Schema,
    location: str | None = None,
    partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
    sort_order: SortOrder = UNSORTED_SORT_ORDER,
    properties: Properties = EMPTY_DICT,
) -> ReplaceTableTransaction:
    """Create a ReplaceTableTransaction by delegating to the parent implementation.

    The override adds no logic of its own; it only wraps the inherited method in
    the `retry` decorator configured with `_RETRY_ARGS`.

    Args:
        identifier (str | Identifier): Table identifier.
        schema (Schema | pa.Schema): New table schema.
        location (str | None): New table location.
        partition_spec (PartitionSpec): New partition spec.
        sort_order (SortOrder): New sort order.
        properties (Properties): Properties to apply.

    Returns:
        ReplaceTableTransaction: The transaction built by the parent implementation.
    """
    # Forward by keyword so the call stays correct if the parent's parameter order changes.
    return super().replace_table_transaction(
        identifier=identifier,
        schema=schema,
        location=location,
        partition_spec=partition_spec,
        sort_order=sort_order,
        properties=properties,
    )
|
|
||
| @override | ||
| @retry(**_RETRY_ARGS) | ||
| def create_view( | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -335,6 +335,175 @@ def assign_fresh_partition_spec_ids(spec: PartitionSpec, old_schema: Schema, fre | |
| return PartitionSpec(*partition_fields, spec_id=INITIAL_PARTITION_SPEC_ID) | ||
|
|
||
|
|
||
def assign_fresh_partition_spec_ids_for_replace(
    spec: PartitionSpec,
    old_schema: Schema,
    fresh_schema: Schema,
    existing_specs: list[PartitionSpec],
    last_partition_id: int | None,
    format_version: int = 2,
    current_spec: PartitionSpec | None = None,
) -> tuple[PartitionSpec, int]:
    """Assign partition field IDs for a replace operation, reusing IDs from existing specs.

    - For v2+, reuse partition field IDs by `(source_id, transform)` across all existing specs.
      New fields get IDs starting from `last_partition_id + 1`.
    - For v1, the current spec's fields must be preserved (v1 specs are append-only). Fields
      absent from the new spec are carried forward with a `VoidTransform`. Matching new fields
      reuse the existing partition field ID; remaining new fields are appended with fresh IDs.

    Args:
        spec: The new partition spec to assign IDs to. Its `source_id`s reference `old_schema`.
        old_schema: The schema that the new spec's `source_id`s reference.
        fresh_schema: The schema with freshly assigned field IDs.
        existing_specs: All partition specs from the existing table metadata.
        last_partition_id: The current table's `last_partition_id`. When `None`,
            `PARTITION_FIELD_ID_START - 1` is used so the first new field gets
            `PARTITION_FIELD_ID_START`.
        format_version: Table format version. Required to be set to 1 for v1 carry-forward.
        current_spec: The current default partition spec. Required when `format_version <= 1`.

    Returns:
        A tuple of `(fresh_spec, new_last_partition_id)`.

    Raises:
        ValueError: If `format_version <= 1` and `current_spec` is `None`, or if a
            partition field's source column cannot be resolved in `old_schema` or
            `fresh_schema`.
    """
    # NOTE(review): the PR author describes this as mirroring the v2 path of the Java
    # implementation - confirm against the Java source when changing behavior.
    effective_last_partition_id = last_partition_id if last_partition_id is not None else PARTITION_FIELD_ID_START - 1

    if format_version <= 1:
        if current_spec is None:
            raise ValueError("current_spec is required for v1 replace_table")
        return _assign_fresh_partition_spec_ids_for_replace_v1(
            spec, old_schema, fresh_schema, current_spec, effective_last_partition_id
        )

    # v2+: reuse field IDs by (source_id, transform) across all specs. When the same
    # (source_id, transform) appears in multiple specs, prefer the highest field_id.
    transform_to_field_id: dict[tuple[int, str], int] = {}
    for existing_spec in existing_specs:
        for field in existing_spec.fields:
            key = (field.source_id, str(field.transform))
            if key not in transform_to_field_id or field.field_id > transform_to_field_id[key]:
                transform_to_field_id[key] = field.field_id

    next_id = effective_last_partition_id
    partition_fields = []
    for field in spec.fields:
        # Translate the field's source column from old_schema to fresh_schema by name.
        original_column_name = old_schema.find_column_name(field.source_id)
        if original_column_name is None:
            raise ValueError(f"Could not find in old schema: {field}")
        fresh_field = fresh_schema.find_field(original_column_name)
        if fresh_field is None:
            raise ValueError(f"Could not find field in fresh schema: {original_column_name}")

        validate_partition_name(field.name, field.transform, fresh_field.field_id, fresh_schema, set())

        key = (fresh_field.field_id, str(field.transform))
        if key in transform_to_field_id:
            partition_field_id = transform_to_field_id[key]
        else:
            next_id += 1
            partition_field_id = next_id
            # Remember the new ID so a repeated (source, transform) pair reuses it.
            transform_to_field_id[key] = partition_field_id

        partition_fields.append(
            PartitionField(
                name=field.name,
                source_id=fresh_field.field_id,
                field_id=partition_field_id,
                transform=field.transform,
            )
        )

    # `next_id` starts at `effective_last_partition_id` and only increments, so it is the
    # new last partition id.
    return PartitionSpec(*partition_fields, spec_id=INITIAL_PARTITION_SPEC_ID), next_id
|
|
||
|
|
||
def _assign_fresh_partition_spec_ids_for_replace_v1(
    spec: PartitionSpec,
    old_schema: Schema,
    fresh_schema: Schema,
    current_spec: PartitionSpec,
    effective_last_partition_id: int,
) -> tuple[PartitionSpec, int]:
    """Assign partition field IDs for a replace operation on a v1 table.

    Every field of `current_spec` is kept, in order. A current field whose
    `(source_id, transform)` also appears in `spec` takes the new field's name and
    transform while keeping its existing partition field ID; a current field with no
    match is carried forward with a `VoidTransform`. New fields that matched nothing
    are appended afterwards with IDs counting up from `effective_last_partition_id + 1`.

    Args:
        spec: The new partition spec. Its `source_id`s reference `old_schema`.
        old_schema: The schema that the new spec's `source_id`s reference.
        fresh_schema: The schema with freshly assigned field IDs.
        current_spec: The table's current partition spec.
        effective_last_partition_id: The highest partition field ID already in use.

    Returns:
        A tuple of `(fresh_spec, new_last_partition_id)`.

    Raises:
        ValueError: If a new field's source column cannot be resolved in `old_schema`
            or `fresh_schema`.
    """
    # NOTE(review): the PR author describes this as mirroring the v1 branch of the Java
    # implementation - confirm against the Java source when changing behavior.

    # Build (fresh_source_id, transform) -> (new_field, fresh_source_id) for the new spec,
    # in insertion order so leftover fields keep their declared order on append.
    new_field_by_key: dict[tuple[int, str], tuple[PartitionField, int]] = {}
    new_field_names: list[str] = []
    for new_field in spec.fields:
        col_name = old_schema.find_column_name(new_field.source_id)
        if col_name is None:
            raise ValueError(f"Could not find in old schema: {new_field}")
        fresh_field = fresh_schema.find_field(col_name)
        if fresh_field is None:
            raise ValueError(f"Could not find field in fresh schema: {col_name}")
        validate_partition_name(new_field.name, new_field.transform, fresh_field.field_id, fresh_schema, set())
        key = (fresh_field.field_id, str(new_field.transform))
        new_field_by_key[key] = (new_field, fresh_field.field_id)
        new_field_names.append(new_field.name)

    # Walk current spec, carrying forward each field. Matching new fields consume their key;
    # missing fields become void transforms.
    # Seeding with all new names keeps void-field names from colliding with them.
    used_names: set[str] = set(new_field_names)
    partition_fields = []
    for cur_field in current_spec.fields:
        key = (cur_field.source_id, str(cur_field.transform))
        match = new_field_by_key.pop(key, None)
        if match is not None:
            new_field, fresh_source_id = match
            partition_fields.append(
                PartitionField(
                    name=new_field.name,
                    source_id=fresh_source_id,
                    field_id=cur_field.field_id,
                    transform=new_field.transform,
                )
            )
            used_names.add(new_field.name)
        else:
            void_name = _unique_void_name(cur_field.name, cur_field.field_id, used_names)
            used_names.add(void_name)
            partition_fields.append(
                PartitionField(
                    name=void_name,
                    source_id=cur_field.source_id,
                    field_id=cur_field.field_id,
                    transform=VoidTransform(),
                )
            )

    # Append remaining new fields at the end with fresh partition IDs.
    next_id = effective_last_partition_id
    for new_field, fresh_source_id in new_field_by_key.values():
        next_id += 1
        partition_fields.append(
            PartitionField(
                name=new_field.name,
                source_id=fresh_source_id,
                field_id=next_id,
                transform=new_field.transform,
            )
        )

    # `next_id` starts at `effective_last_partition_id` and only increments, so it is the
    # new last partition id.
    return PartitionSpec(*partition_fields, spec_id=INITIAL_PARTITION_SPEC_ID), next_id
|
|
||
|
|
||
| def _unique_void_name(base_name: str, field_id: int, used_names: set[str]) -> str: | ||
| """Pick a void-transform name that does not collide with already-used names. | ||
|
|
||
| First tries `base_name`; if taken, tries `base_name_{field_id}`; if still taken, | ||
| appends `_2`, `_3`, ... until unique. | ||
| """ | ||
| if base_name not in used_names: | ||
| return base_name | ||
| candidate = f"{base_name}_{field_id}" | ||
| suffix = 2 | ||
| while candidate in used_names: | ||
| candidate = f"{base_name}_{field_id}_{suffix}" | ||
| suffix += 1 | ||
| return candidate | ||
|
|
||
|
|
||
| T = TypeVar("T") | ||
|
|
||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I went back and forth on adding a top-level `Catalog.replace_table` in this PR.
Iceberg Java doesn't support this catalog API — its `Catalog.newReplaceTableTransaction(...)` and `TableBuilder.replaceTransaction()` both return a `Transaction` you have to `.commitTransaction()` yourself.
I decided to drop support for this, because adding it to the catalog API felt contentious - and we can always add it as a follow-up if folks want to (it's also a bit tricky, as we ideally don't want an unnecessary `load_table` call after the commit).
This keeps this PR smaller, simpler and isolated too.