diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 68089beb54..4c624ff4e3 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -542,12 +542,69 @@ def dynamic_partition_overwrite( ) ) - partitions_to_overwrite = {data_file.partition for data_file in data_files} - delete_filter = self._build_partition_predicate( - partition_records=partitions_to_overwrite, spec=self.table_metadata.spec(), schema=self.table_metadata.schema() - ) - self.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties, branch=branch) + # partitions_to_overwrite = {data_file.partition for data_file in data_files} + # delete_filter = self._build_partition_predicate( + # partition_records=partitions_to_overwrite, spec=self.table_metadata.spec(), schema=self.table_metadata.schema() + # ) + # self.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties, branch=branch) + + # Build the partition predicate per-spec to handle tables that have + # undergone partition spec evolution. Manifests in the snapshot may be + # written under different (older) specs. We need to project the overwrite + # partitions into each historical spec's coordinate space so that the + # manifest evaluator correctly identifies which old manifests to delete. + current_spec = self.table_metadata.spec() + current_schema = self.table_metadata.schema() + # Collect the source column names (e.g. "category") that are being + # overwritten — these are stable across spec evolution (only field IDs matter). + overwrite_source_ids = {field.source_id for field in current_spec.fields} + + delete_filter: BooleanExpression = AlwaysFalse() + + # For each historical spec in the snapshot, build a predicate using + # only the fields that spec knows about, matched against the + # corresponding positions in the new data files' partition records. + snapshot = self.table_metadata.snapshot_by_name(branch or MAIN_BRANCH) + if snapshot is not None: + spec_ids_in_snapshot = {m.partition_spec_id for m in snapshot.manifests(io=self._table.io)} + else: + spec_ids_in_snapshot = {current_spec.spec_id} + + for spec_id in spec_ids_in_snapshot: + historical_spec = self.table_metadata.specs()[spec_id] + # Find which fields this historical spec shares with the current spec + shared_source_ids = {f.source_id for f in historical_spec.fields} & overwrite_source_ids + if not shared_source_ids: + # No overlap — this spec's manifests cannot contain our partitions + continue + + # Project the new data files' partitions into this historical spec's space: + # for each new data file, build a partition record using only the + # fields this historical spec knows about. + historical_partitions: set[Record] = set() + for data_file in data_files: + # data_file.partition is under current_spec — extract shared field values + record_values = [] + for field in historical_spec.fields: + if field.source_id in overwrite_source_ids: + # find position of this source_id in current spec + current_pos = next(i for i, f in enumerate(current_spec.fields) if f.source_id == field.source_id) + record_values.append(data_file.partition[current_pos]) + else: + record_values.append(None) + historical_partitions.add(Record(*record_values)) + + delete_filter = Or( + delete_filter, + self._build_partition_predicate( + partition_records=historical_partitions, + spec=historical_spec, + schema=current_schema, + ), + ) + + self.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties, branch=branch) with self._append_snapshot_producer(snapshot_properties, branch=branch) as append_files: append_files.commit_uuid = append_snapshot_commit_uuid for data_file in data_files: diff --git a/tests/integration/test_manifest_pruning_spec_evolution.py b/tests/integration/test_manifest_pruning_spec_evolution.py new file mode 100644 index 0000000000..d836e5be4a --- /dev/null +++ b/tests/integration/test_manifest_pruning_spec_evolution.py @@ -0,0 +1,283 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Regression / investigation test for manifest pruning correctness under partition spec evolution. + +Context +------- +PR #3011 (merged Feb 20 2026) added manifest pruning to _OverwriteFiles and _DeleteFiles +in pyiceberg/table/update/snapshot.py. The pruning builds a partition predicate from the +*current* partition spec and evaluates it against every manifest in the snapshot via a +KeyDefaultDict of per-spec evaluators. + +The question this test file investigates: + When a table has been through partition spec evolution, its snapshot may contain manifests + written under *different* partition_spec_ids. Does the manifest evaluator correctly resolve + each manifest's own spec before deciding whether to include or skip it? + +If the answer is "no", the overwrite will silently skip manifests from the old spec, leaving +stale data files that should have been deleted -- a silent correctness bug. + +How to run +---------- + pytest tests/integration/test_manifest_pruning_spec_evolution.py -v +""" + +import tempfile +from typing import Any + +import pyarrow as pa + +from pyiceberg.catalog import Catalog, load_catalog +from pyiceberg.partitioning import PartitionField, PartitionSpec +from pyiceberg.schema import Schema +from pyiceberg.transforms import IdentityTransform +from pyiceberg.types import LongType, NestedField, StringType + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +SCHEMA = Schema( + NestedField(field_id=1, name="category", field_type=StringType(), required=False), + NestedField(field_id=2, name="region", field_type=StringType(), required=False), + NestedField(field_id=3, name="value", field_type=LongType(), required=False), +) + +# Spec 0: partitioned only by category +SPEC_V0 = PartitionSpec(PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="category")) + + +def make_catalog(warehouse: str) -> Catalog: + """Spin up a local SQLite-backed catalog -- no services needed.""" + return load_catalog( + "test", + type="sql", + uri=f"sqlite:///{warehouse}/catalog.db", + warehouse=f"file://{warehouse}", + ) + + +def arrow_table(rows: list[dict[str, Any]]) -> pa.Table: + return pa.Table.from_pylist( + rows, + schema=pa.schema( + [ + pa.field("category", pa.string()), + pa.field("region", pa.string()), + pa.field("value", pa.int64()), + ] + ), + ) + + +# --------------------------------------------------------------------------- +# Test 1: Mixed spec snapshot -- overwrite partition present in both specs +# --------------------------------------------------------------------------- + + +def test_overwrite_after_partition_spec_evolution_correctness() -> None: + """ + Verifies that dynamic_partition_overwrite correctly replaces ALL data files + for the target partition, including those written under a previous partition spec. + + Setup: + - Spec 0: partition by identity(category) + - Write A(1,2,3) and B(10,11) under spec 0 + - Evolve to spec 1: add identity(region) + - Write A(100,101) and B(200) under spec 1 + - Overwrite category=A with new rows (999, 888) + + Expected after overwrite: + - Only new A rows: values [888, 999] + - All B rows untouched: values [10, 11, 200] + - Total: 5 rows + + Bug (pre-fix): spec-0 A manifests are skipped by the evaluator, + leaving stale A rows (1, 2, 3) in the table -> 8 rows total. + """ + with tempfile.TemporaryDirectory() as warehouse: + catalog = make_catalog(warehouse) + catalog.create_namespace("default") + + # --- Step 1: create table with spec 0 --- + table = catalog.create_table( + "default.test_spec_evolution_overwrite", + schema=SCHEMA, + partition_spec=SPEC_V0, + ) + + # --- Step 2: write data under spec 0 --- + table.append( + arrow_table( + [ + {"category": "A", "region": None, "value": 1}, + {"category": "A", "region": None, "value": 2}, + {"category": "A", "region": None, "value": 3}, + {"category": "B", "region": None, "value": 10}, + {"category": "B", "region": None, "value": 11}, + ] + ) + ) + assert table.scan().to_arrow().num_rows == 5 + + # --- Step 3: evolve partition spec -- add identity(region) --- + with table.update_spec() as update: + update.add_field( + source_column_name="region", + transform=IdentityTransform(), + partition_field_name="region", + ) + table = catalog.load_table("default.test_spec_evolution_overwrite") + assert table.spec().spec_id == 1, f"Expected spec_id=1, got {table.spec().spec_id}" + + # --- Step 4: write data under spec 1 --- + table.append( + arrow_table( + [ + {"category": "A", "region": "us", "value": 100}, + {"category": "A", "region": "eu", "value": 101}, + {"category": "B", "region": "us", "value": 200}, + ] + ) + ) + assert table.scan().to_arrow().num_rows == 8 + + # Confirm mixed-spec snapshot is actually set up + current_snapshot = table.current_snapshot() + assert current_snapshot is not None + manifests = current_snapshot.manifests(table.io) + spec_ids_in_snapshot = {m.partition_spec_id for m in manifests} + assert len(spec_ids_in_snapshot) > 1, f"Test setup failed: expected manifests from >1 spec, got {spec_ids_in_snapshot}" + + # --- Step 5: dynamic_partition_overwrite for category=A only --- + table.dynamic_partition_overwrite( + arrow_table( + [ + {"category": "A", "region": "us", "value": 999}, + {"category": "A", "region": "eu", "value": 888}, + ] + ) + ) + + table = catalog.load_table("default.test_spec_evolution_overwrite") + result = table.scan().to_arrow().to_pydict() + + categories = result["category"] + values = result["value"] + + a_values = [v for c, v in zip(categories, values, strict=True) if c == "A"] + b_values = [v for c, v in zip(categories, values, strict=True) if c == "B"] + + # Total rows: 2 new A + 3 B = 5 + assert len(a_values) + len(b_values) == 5, ( + f"Row count mismatch: expected 5, got {len(a_values) + len(b_values)}.\n" + f"A values: {sorted(a_values)} -- stale values would be any of [1, 2, 3, 100, 101]\n" + f"B values: {sorted(b_values)}" + ) + + # A rows must be only the new ones + stale = [v for v in a_values if v in (1, 2, 3, 100, 101)] + assert not stale, ( + f"Stale A rows found (should have been deleted): {stale}\n" + f"spec-0 manifests were incorrectly skipped during manifest pruning." + ) + assert sorted(a_values) == [888, 999], f"Expected A=[888,999], got {sorted(a_values)}" + + # B rows completely untouched + assert sorted(b_values) == [10, 11, 200], f"Expected B=[10,11,200], got {sorted(b_values)}" + + +# --------------------------------------------------------------------------- +# Test 2: Overwrite partition that ONLY exists in spec-0 manifests +# This is the most dangerous case -- silent data duplication, no exception raised +# --------------------------------------------------------------------------- + + +def test_overwrite_partition_only_in_old_spec() -> None: + """ + Sharpest form of the bug: the overwrite target (category=B) has data + ONLY under spec-0. After spec evolution to spec-1, overwriting B should + delete the old spec-0 B files and write new ones. + + Bug (pre-fix): the manifest evaluator, built against spec-1's predicate, + finds zero matching manifests for B (because B only exists in spec-0 + manifests) -> UserWarning "did not match any records" -> old B rows survive + -> silent data duplication: [999, 10, 11] instead of [999]. + """ + with tempfile.TemporaryDirectory() as warehouse: + catalog = make_catalog(warehouse) + catalog.create_namespace("default") + + table = catalog.create_table( + "default.test_old_spec_only_overwrite", + schema=SCHEMA, + partition_spec=SPEC_V0, + ) + + # Write ONLY category=B under spec 0 + table.append( + arrow_table( + [ + {"category": "B", "region": None, "value": 10}, + {"category": "B", "region": None, "value": 11}, + ] + ) + ) + + # Evolve spec -- add identity(region) + with table.update_spec() as update: + update.add_field( + source_column_name="region", + transform=IdentityTransform(), + partition_field_name="region", + ) + table = catalog.load_table("default.test_old_spec_only_overwrite") + + # Write ONLY category=A under spec 1 (B has no spec-1 data) + table.append( + arrow_table( + [ + {"category": "A", "region": "us", "value": 100}, + ] + ) + ) + + # Overwrite category=B -- it only exists in spec-0 manifests + table.dynamic_partition_overwrite( + arrow_table( + [ + {"category": "B", "region": "us", "value": 999}, + ] + ) + ) + + table = catalog.load_table("default.test_old_spec_only_overwrite") + result = table.scan().to_arrow().to_pydict() + + categories = result["category"] + values = result["value"] + + b_values = [v for c, v in zip(categories, values, strict=True) if c == "B"] + a_values = [v for c, v in zip(categories, values, strict=True) if c == "A"] + + assert b_values == [999], ( + f"Expected B=[999] only, got {b_values}.\n" + f"Stale rows {[v for v in b_values if v != 999]} were not deleted -- " + f"spec-0 manifests were incorrectly skipped." + ) + assert a_values == [100], f"A data unexpectedly modified: {a_values}"