Bug: dynamic_partition_overwrite silently skips spec-0 manifests after partition spec evolution #3148

@tusharchou

Description

Summary

dynamic_partition_overwrite produces incorrect results when a table has undergone
partition spec evolution. Manifests written under older specs are silently skipped
by the manifest pruning logic introduced in #3011, leaving stale data files that
should have been deleted.

Root cause

In Table.dynamic_partition_overwrite (table/__init__.py), the delete predicate
is built using only the current partition spec:

delete_filter = self._build_partition_predicate(
    partition_records=partitions_to_overwrite,
    spec=self.table_metadata.spec(),       # always current spec
    schema=self.table_metadata.schema()
)

A snapshot with mixed partition_spec_ids (spec-0 and spec-1 manifests) passes
this single predicate to _DeleteFiles. The manifest evaluator in _build_partition_projection
uses inclusive_projection(schema, spec): when projecting a spec-1 predicate
(e.g. category=A AND region=us) through spec-0 (which partitions only by category),
the region reference has no corresponding partition field. A correct inclusive
projection would reduce that term to always-true, leaving category=A to prune
against; instead the evaluator skips spec-0 manifests entirely.
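To make the failure mode concrete, here is a minimal, pyiceberg-free sketch of the pruning decision. The function names and the dict/set representation are illustrative only (this is not pyiceberg's actual code); it contrasts the buggy "skip on unbound field" behavior with a correct inclusive projection that drops the unbound term:

```python
# Simplified model: a predicate is {field: value} terms ANDed together;
# a spec is the set of partition field names; a manifest carries its
# partition values as {field: value}.

def buggy_manifest_matches(predicate, spec_fields, manifest_partition):
    # Buggy behavior: a predicate field missing from the manifest's spec
    # makes the whole manifest evaluate as non-matching, so it is skipped.
    for field, value in predicate.items():
        if field not in spec_fields:
            return False          # spec-0 has no "region" -> manifest skipped
        if manifest_partition.get(field) != value:
            return False
    return True

def inclusive_manifest_matches(predicate, spec_fields, manifest_partition):
    # Correct inclusive projection: a term on a field the spec does not
    # partition by cannot prune anything, so it projects to always-true.
    for field, value in predicate.items():
        if field not in spec_fields:
            continue              # drop the unbound term instead of failing
        if manifest_partition.get(field) != value:
            return False
    return True

predicate = {"category": "A", "region": "us"}   # built from current spec-1
spec0_fields = {"category"}                     # historical spec-0
manifest = {"category": "A"}                    # spec-0 manifest for category=A

print(buggy_manifest_matches(predicate, spec0_fields, manifest))      # False
print(inclusive_manifest_matches(predicate, spec0_fields, manifest))  # True
```

With the buggy logic the spec-0 manifest for category=A is never rewritten, which is exactly how the stale data files survive the overwrite.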

Reproduction

import tempfile, pyarrow as pa
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType, LongType
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import IdentityTransform

schema = Schema(
    NestedField(1, "category", StringType(), required=False),
    NestedField(2, "region",   StringType(), required=False),
    NestedField(3, "value",    LongType(),   required=False),
)
spec_v0 = PartitionSpec(
    PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="category")
)

with tempfile.TemporaryDirectory() as warehouse:
    catalog = load_catalog("test", **{"type": "sql", "uri": f"sqlite:///{warehouse}/catalog.db", "warehouse": f"file://{warehouse}"})
    catalog.create_namespace("default")
    table = catalog.create_table("default.test", schema=schema, partition_spec=spec_v0)

    # Write under spec 0
    table.append(pa.table({"category": ["A","A","B"], "region": [None,None,None], "value": [1,2,10]}))

    # Evolve spec
    with table.update_spec() as u:
        u.add_field("region", IdentityTransform(), "region")
    table = catalog.load_table("default.test")

    # Write under spec 1
    table.append(pa.table({"category": ["A","B"], "region": ["us","us"], "value": [100,200]}))

    # Overwrite category=A — should delete ALL A rows (both specs)
    table.dynamic_partition_overwrite(
        pa.table({"category": ["A"], "region": ["us"], "value": [999]})
    )

    result = table.scan().to_arrow().to_pydict()
    a_values = [v for c,v in zip(result["category"], result["value"]) if c == "A"]
    print(a_values)  # BUG: prints [1, 2, 999]; stale spec-0 rows for category=A survive
                     # EXPECTED: [999]

Fix

Build the delete predicate per historical spec present in the snapshot, projecting
the new data files' partition values into each spec's coordinate space before evaluating.
PR with fix and regression tests to follow.
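The per-spec projection step can be sketched as follows. This is a hypothetical outline, not the pending PR's code: field names stand in for pyiceberg's PartitionSpec/PartitionField objects, and each spec is modeled as the set of source fields it partitions by.

```python
def build_per_spec_predicates(partition_records, specs_by_id):
    """For each historical spec present in the snapshot, project the new
    data's partition values into that spec's coordinate space and build a
    deduplicated delete predicate for it."""
    predicates = {}
    for spec_id, spec_fields in specs_by_id.items():
        # Drop fields the historical spec does not partition by, then
        # dedupe: several current-spec partitions may collapse into one
        # coarser partition under an older spec.
        projected = {
            tuple(sorted((f, r[f]) for f in spec_fields if f in r))
            for r in partition_records
        }
        # Each entry is an AND of field=value terms; the per-spec delete
        # predicate is the OR over these entries.
        predicates[spec_id] = [dict(terms) for terms in projected]
    return predicates

records = [{"category": "A", "region": "us"}]
specs = {0: {"category"}, 1: {"category", "region"}}
print(build_per_spec_predicates(records, specs))
# spec 0 prunes on category == 'A' only;
# spec 1 prunes on category == 'A' AND region == 'us'
```

Manifests would then be evaluated against the predicate matching their own partition_spec_id, so spec-0 manifests are pruned with a predicate they can actually bind.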
