Skip to content

perf(upsert): prune destination scan via df partition-column ranges a…#3387

Open
paultmathew wants to merge 1 commit into
apache:mainfrom
paultmathew:perf/upsert-partition-pruning
Open

perf(upsert): prune destination scan via df partition-column ranges a…#3387
paultmathew wants to merge 1 commit into
apache:mainfrom
paultmathew:perf/upsert-partition-pruning

Conversation

@paultmathew
Copy link
Copy Markdown
Contributor

@paultmathew paultmathew commented May 20, 2026

Rationale for this change

Transaction.upsert builds its scan row_filter from join_cols
alone via create_match_filter. When the partition spec sources from
columns NOT in join_cols — a common pattern for append-only event
logs partitioned by time but keyed by composite IDs — two amplifying
problems fall out at scan plan time:

The result is a full-table scan at every upsert. For tables with 10k+
partitions this is multi-minute / multi-gigabyte work per call.

What this PR does

Two complementary optimisations to Transaction.upsert:

  1. Partition-range augmentation. New helper
    upsert_util.augment_filter_with_partition_ranges derives
    [min, max] predicates from df for every partition source column
    present in the frame and ANDs them into the row filter.
    inclusive_projection then projects each range through its
    partition transform (hours, days, months, years, identity,
    truncate) at scan-plan time, enabling manifest- and file-level
    pruning.

  2. Column-projection for the insert-only path. When
    when_matched_update_all=False the consumer loop only reads
    join_cols off each destination batch (to build the per-batch
    source-side match filter). Passing selected_fields=tuple(join_cols)
    to DataScan lets the parquet reader prune wide non-key columns.
    The existing _projected_field_ids.union(extract_field_ids(...))
    keeps the partition-range predicate's columns readable.

Correctness guards

The augmentation skips per-column in three cases:

  • The partition source column isn't present in df (no bound to derive).
  • The column is entirely null in df (no min/max).
  • The column has any null in df — a non-null GreaterThanOrEqual
    predicate would exclude NULL-partition destination rows whose
    (join_cols) may collide with null-partition source rows. Skip
    pruning over emitting an unsafe predicate.

When min == max, an EqualTo is emitted instead of the range pair.
Multiple partition fields sourcing from the same column emit one
source-column range; inclusive_projection projects through each
partition field independently at scan time. Bucket and other
non-monotonic transforms return None from their project method on
inequalities — the projection contributes AlwaysTrue for them, no
harm.

Are these changes tested?

Yes:

  • 13 new unit tests in tests/table/test_upsert.py:
    • Pure-function tests for augment_filter_with_partition_ranges
      (unpartitioned, missing column, all-null, partial-null,
      single-value, range, multi-field-sharing-source).
    • End-to-end upsert semantics with partition spec NOT in
      join_cols, IN join_cols, and unpartitioned.
    • DataScan.plan_files() count assertion against a deterministically
      seeded table that defeats per-file metrics pruning — confirms the
      augmentation prunes vs the original predicate.
    • selected_fields projection assertions for both
      when_matched_update_all=True (legacy ('*',)) and =False
      (narrow join_cols-only).
    • End-to-end upsert with when_matched_update_all=True against a
      wide table to confirm column projection doesn't break the update
      path.
  • 23 existing upsert tests still pass.

Smoke test — real Iceberg-on-S3 + Glue table

Run against a real Iceberg table representative of the workload this
optimisation targets.

Stack

  • pyiceberg.catalog.glue.GlueCatalog
  • AWS S3 warehouse, parquet data files
  • Iceberg format v2

Target table

  • Write mode: append-only event log
  • unique_keys: [conversation_id, id] (composite UUID/string key)
  • partition_spec: hours(created_at)
  • Size at the test snapshot: ~10,450 data files, ~3.2 GiB total
  • Hourly partitions over ~15 months of history
  • Avg file size: ~0.32 MiB (post hourly OPTIMIZE compaction)
  • Schema (6 columns):
    • conversation_id (string, UUID v4) — key
    • id (string, UUID v4) — key
    • event (string, short tag, ~10 B/row)
    • log (string, JSON payload, ~400 B/row median)
    • created_at (timestamp[us, UTC]) — partition source
    • version (int32)

Source synthesis (two modes for the comparison):

  • synthetic: random UUIDs; conversation_id drawn from a pool sized
    rows/30 so leading-key cardinality matches realistic parent-child
    distribution; created_at uniformly in [now − hours, now]. Keys
    don't overlap destination → metrics evaluator rejects every file at
    scan-plan time, so both projections return 0 files. Isolates the
    planning cost.
  • from-destination: reads N recent rows from the destination via a
    created_at range scan, used as the source. Keys DO overlap →
    exercises the file-count reduction and column-read effect.

Results (read-only via DataScan.plan_files() and
to_arrow_batch_reader()):

Scenario Original plan Augmented plan Plan-time win Wide read Narrow read
1 000 synthetic rows / 24 h 0 / 10 452 (454.04 s) 0 / 10 452 (3.11 s) 146×
1 000 dest-sampled rows / 24 h (skipped, 7-min baseline) 7 / 10 452 (99.93%) 493 KiB / 1 000 rows 68 KiB / 1 000 rows (86% smaller)
10 000 dest-sampled rows / 168 h (catch-up flush) (skipped) 58 / 10 492 (99.4%)

The 146× plan-time win is on a 1 000-disjunct predicate against a
~10k-file table; the original cost scales linearly with table file
count and predicate disjunct count, the augmented cost scales with the
source's created_at span instead.

The 86% byte reduction is dominated by skipping the log (JSON
payload) column at the parquet reader — that one column carries ~80%
of the row width on this table.

For a representative larger flush — 1.16M source rows spanning ~24 h
— extrapolating both wins reduces the destination-scan working set
from multiple GiB (which is OOM-territory on 8 GiB worker pods) down
to tens of MiB.

Are there any user-facing changes?

No API change. The optimisation is purely internal to
Transaction.upsert:

  • The new helper is exported from pyiceberg.table.upsert_util for
    testability but isn't part of the public API.
  • selected_fields=tuple(join_cols) is passed conditionally inside the
    method — no signature change to Table.upsert or
    Transaction.upsert.

Why a range augmentation rather than reusing _build_partition_predicate?

Transaction._build_partition_predicate plus _determine_partitions
together could express the same intent — apply each partition
transform to df, take the distinct partition tuples, and emit
Or(And(EqualTo, …), …). It would prune marginally harder for
gappy sources (where [min, max] over-fetches the gap). I picked
the range approach over that combination for three reasons:

  1. Predicate size. Exact partition match emits one disjunct per
    distinct partition value present in df. For a daily-partitioned
    table with a multi-month source the Or reaches hundreds of nodes
    — exactly the predicate-bloat shape that motivated Optimize upsert performance for large datasets #2943. The range
    approach is 2 nodes per partition column regardless of cardinality
    and downstream metrics-evaluator cost scales with that.

  2. Reuse boundary. _determine_partitions is bound to the write
    path: it filters + combine_chunks per partition for the writer
    to consume. Reusing it for read-side planning either wastes that
    work or requires lifting the partition-key extraction into a shared
    helper — a separable, slightly larger refactor.

  3. Idiomatic projection. GreaterThanOrEqual(source_col, min)
    feeds inclusive_projection source-side bounds and lets the
    existing transform.project(...) machinery rewrite them into
    partition-column predicates at scan time. That's the same
    projection path the rest of pyiceberg uses.

For temporally-dense sources (the dominant upsert pattern: a recent
batch of activity, no internal gaps) the two approaches prune the same
files. For gappy sources the exact-match approach prunes strictly
harder at the cost of a larger predicate. Happy to switch to the
_determine_partitions + _build_partition_predicate combo if
reviewers prefer that direction — or to factor a thin
partition_records_from_arrow_table helper out of
_determine_partitions so both sides can share it.

Related

Was generative AI tooling used to co-author this PR?
  • Yes — Claude (Cursor agent)

…nd project join_cols only

Two complementary optimizations to ``Transaction.upsert`` for tables
whose partition spec sources from columns NOT in ``join_cols`` (a
common pattern for append-only event logs partitioned by time but
keyed by composite IDs):

1. Partition-range augmentation: ``upsert_util.augment_filter_with_partition_ranges``
   derives ``[min, max]`` predicates from ``df`` for every partition
   source column present in the frame and ANDs them into the row
   filter built by ``create_match_filter``. ``inclusive_projection``
   then projects each range through the partition transform at scan
   plan time, enabling manifest- and file-level pruning that the
   key-only filter can't trigger.

2. Column-projection for the insert-only path: when
   ``when_matched_update_all=False`` the consumer loop only reads
   ``join_cols`` off each destination batch. Passing
   ``selected_fields=tuple(join_cols)`` to ``DataScan`` lets the
   parquet reader prune wide non-key columns. The existing
   ``_projected_field_ids`` auto-union with row-filter columns keeps
   the partition-range predicate's data accessible.

Correctness guards skip the augmentation per-column when the source
column is absent from df, entirely null, or partially null (a non-null
range predicate would exclude NULL-partition destination rows whose
keys may collide with the null-partition source rows).

Related to apache#2138, apache#2159, apache#3129. Complementary to (closed-stale) apache#2943's
"coarse match filter" approach: that PR shrinks the row predicate
itself; this one adds partition pruning the row predicate can't
trigger on its own.

Co-authored-by: Cursor <cursoragent@cursor.com>
@paultmathew paultmathew marked this pull request as ready for review May 20, 2026 20:30
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant