perf(upsert): prune destination scan via df partition-column ranges a…#3387
Open
paultmathew wants to merge 1 commit into
Open
perf(upsert): prune destination scan via df partition-column ranges a…#3387paultmathew wants to merge 1 commit into
paultmathew wants to merge 1 commit into
Conversation
…nd project join_cols only Two complementary optimizations to ``Transaction.upsert`` for tables whose partition spec sources from columns NOT in ``join_cols`` (a common pattern for append-only event logs partitioned by time but keyed by composite IDs): 1. Partition-range augmentation: ``upsert_util.augment_filter_with_partition_ranges`` derives ``[min, max]`` predicates from ``df`` for every partition source column present in the frame and ANDs them into the row filter built by ``create_match_filter``. ``inclusive_projection`` then projects each range through the partition transform at scan plan time, enabling manifest- and file-level pruning that the key-only filter can't trigger. 2. Column-projection for the insert-only path: when ``when_matched_update_all=False`` the consumer loop only reads ``join_cols`` off each destination batch. Passing ``selected_fields=tuple(join_cols)`` to ``DataScan`` lets the parquet reader prune wide non-key columns. The existing ``_projected_field_ids`` auto-union with row-filter columns keeps the partition-range predicate's data accessible. Correctness guards skip the augmentation per-column when the source column is absent from df, entirely null, or partially null (a non-null range predicate would exclude NULL-partition destination rows whose keys may collide with the null-partition source rows). Related to apache#2138, apache#2159, apache#3129. Complementary to (closed-stale) apache#2943's "coarse match filter" approach: that PR shrinks the row predicate itself; this one adds partition pruning the row predicate can't trigger on its own. Co-authored-by: Cursor <cursoragent@cursor.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Rationale for this change
Transaction.upsertbuilds its scanrow_filterfromjoin_colsalone via
create_match_filter. When the partition spec sources fromcolumns NOT in
join_cols— a common pattern for append-only eventlogs partitioned by time but keyed by composite IDs — two amplifying
problems fall out at scan plan time:
inclusive_projectioncollapses the predicate toAlwaysTrueagainst the partition spec, so partition pruning never fires and
every file in the table is listed (related: Upsertion memory usage grows exponentially as table size grows #2138, Upserting large table extremely slow #2159, Upsert with 1M rows extremely slow due to
create_match_filterandtxn.delete()performance #3129).Or(And(EqualTo, EqualTo), …)predicate on UUID-shaped key columns can't prune either —
per-file
lower_bound/upper_boundstats span essentially thefull UUID space, so the metrics evaluator passes every file.
The result is a full-table scan at every upsert. For tables with 10k+
partitions this is multi-minute / multi-gigabyte work per call.
What this PR does
Two complementary optimisations to
Transaction.upsert:Partition-range augmentation. New helper
upsert_util.augment_filter_with_partition_rangesderives[min, max]predicates fromdffor every partition source columnpresent in the frame and ANDs them into the row filter.
inclusive_projectionthen projects each range through itspartition transform (
hours,days,months,years,identity,truncate) at scan-plan time, enabling manifest- and file-levelpruning.
Column-projection for the insert-only path. When
when_matched_update_all=Falsethe consumer loop only readsjoin_colsoff each destination batch (to build the per-batchsource-side match filter). Passing
selected_fields=tuple(join_cols)to
DataScanlets the parquet reader prune wide non-key columns.The existing
_projected_field_ids.union(extract_field_ids(...))keeps the partition-range predicate's columns readable.
Correctness guards
The augmentation skips per-column in three cases:
df(no bound to derive).df(nomin/max).df— a non-nullGreaterThanOrEqualpredicate would exclude NULL-partition destination rows whose
(join_cols)may collide with null-partition source rows. Skippruning over emitting an unsafe predicate.
When
min == max, anEqualTois emitted instead of the range pair.Multiple partition fields sourcing from the same column emit one
source-column range;
inclusive_projectionprojects through eachpartition field independently at scan time. Bucket and other
non-monotonic transforms return
Nonefrom theirprojectmethod oninequalities — the projection contributes
AlwaysTruefor them, noharm.
Are these changes tested?
Yes:
tests/table/test_upsert.py:augment_filter_with_partition_ranges(unpartitioned, missing column, all-null, partial-null,
single-value, range, multi-field-sharing-source).
join_cols, INjoin_cols, and unpartitioned.DataScan.plan_files()count assertion against a deterministicallyseeded table that defeats per-file metrics pruning — confirms the
augmentation prunes vs the original predicate.
selected_fieldsprojection assertions for bothwhen_matched_update_all=True(legacy('*',)) and=False(narrow
join_cols-only).when_matched_update_all=Trueagainst awide table to confirm column projection doesn't break the update
path.
Smoke test — real Iceberg-on-S3 + Glue table
Run against a real Iceberg table representative of the workload this
optimisation targets.
Stack
pyiceberg.catalog.glue.GlueCatalogTarget table
unique_keys:[conversation_id, id](composite UUID/string key)partition_spec:hours(created_at)conversation_id(string, UUID v4) — keyid(string, UUID v4) — keyevent(string, short tag, ~10 B/row)log(string, JSON payload, ~400 B/row median)created_at(timestamp[us, UTC]) — partition sourceversion(int32)Source synthesis (two modes for the comparison):
synthetic: random UUIDs;conversation_iddrawn from a pool sizedrows/30so leading-key cardinality matches realistic parent-childdistribution;
created_atuniformly in[now − hours, now]. Keysdon't overlap destination → metrics evaluator rejects every file at
scan-plan time, so both projections return 0 files. Isolates the
planning cost.
from-destination: reads N recent rows from the destination via acreated_atrange scan, used as the source. Keys DO overlap →exercises the file-count reduction and column-read effect.
Results (read-only via
DataScan.plan_files()andto_arrow_batch_reader()):The 146× plan-time win is on a 1 000-disjunct predicate against a
~10k-file table; the original cost scales linearly with table file
count and predicate disjunct count, the augmented cost scales with the
source's
created_atspan instead.The 86% byte reduction is dominated by skipping the
log(JSONpayload) column at the parquet reader — that one column carries ~80%
of the row width on this table.
For a representative larger flush — 1.16M source rows spanning ~24 h
— extrapolating both wins reduces the destination-scan working set
from multiple GiB (which is OOM-territory on 8 GiB worker pods) down
to tens of MiB.
Are there any user-facing changes?
No API change. The optimisation is purely internal to
Transaction.upsert:pyiceberg.table.upsert_utilfortestability but isn't part of the public API.
selected_fields=tuple(join_cols)is passed conditionally inside themethod — no signature change to
Table.upsertorTransaction.upsert.Why a range augmentation rather than reusing
_build_partition_predicate?Transaction._build_partition_predicateplus_determine_partitionstogether could express the same intent — apply each partition
transform to
df, take the distinct partition tuples, and emitOr(And(EqualTo, …), …). It would prune marginally harder forgappy sources (where
[min, max]over-fetches the gap). I pickedthe range approach over that combination for three reasons:
Predicate size. Exact partition match emits one disjunct per
distinct partition value present in
df. For a daily-partitionedtable with a multi-month source the
Orreaches hundreds of nodes— exactly the predicate-bloat shape that motivated Optimize upsert performance for large datasets #2943. The range
approach is 2 nodes per partition column regardless of cardinality
and downstream metrics-evaluator cost scales with that.
Reuse boundary.
_determine_partitionsis bound to the writepath: it filters +
combine_chunksper partition for the writerto consume. Reusing it for read-side planning either wastes that
work or requires lifting the partition-key extraction into a shared
helper — a separable, slightly larger refactor.
Idiomatic projection.
GreaterThanOrEqual(source_col, min)feeds
inclusive_projectionsource-side bounds and lets theexisting
transform.project(...)machinery rewrite them intopartition-column predicates at scan time. That's the same
projection path the rest of pyiceberg uses.
For temporally-dense sources (the dominant upsert pattern: a recent
batch of activity, no internal gaps) the two approaches prune the same
files. For gappy sources the exact-match approach prunes strictly
harder at the cost of a larger predicate. Happy to switch to the
_determine_partitions+_build_partition_predicatecombo ifreviewers prefer that direction — or to factor a thin
partition_records_from_arrow_tablehelper out of_determine_partitionsso both sides can share it.Related
@koenvo), Upserting large table extremely slow #2159(umbrella slow-upsert tracker), Upsert with 1M rows extremely slow due to
create_match_filterandtxn.delete()performance #3129 (recent:create_match_filterapproach — that PR shrinks the row predicate itself; this one adds
partition pruning the row predicate can't trigger on its own. Both
can coexist.
Was generative AI tooling used to co-author this PR?