feat(index): wire aggregate pushdown into the physical optimizer #6831
westonpace wants to merge 3 commits
Lands the plan-time and execute-time halves of aggregate pushdown. Not yet wired into the scanner.

Plan side (rust/lance-index/src/expression/):
- Moves scalar/expression.rs to expression/scalar.rs, paralleling the new expression/aggregate.rs.
- AnyAggregateQuery and AggregateQueryParser traits.
- AggregateIndexSearch leaf with optional index_name, parsed query, optional per-aggregate filter, and the original SELECT expression.
- CountQuery (basic / approx / distinct / approx_distinct) and CountQueryParser.

Scalar index trait (rust/lance-index/src/scalar.rs):
- ScalarIndex::calculate_aggregate returning a partial-state ArrowScalar; default-error stubs added to btree, bitmap, bloomfilter, inverted, json, label_list, ngram, rtree, zonemap, and the LogicalScalarIndex wrapper.
- Re-exports lance_arrow_scalar::ArrowScalar through scalar::.

Execute side (rust/lance/src/io/exec/aggregate_index.rs):
- AggregateIndexSearchExec emits one partial-state RecordBatch whose schema is the concatenation of state_fields() for each paired AggregateFunctionExpr, so a downstream AggregateExec(Final) consumes it unchanged.
- One optional child input, a ScalarIndexExec, supplies a prefilter RowAddrMask. The prefilter load and per-aggregate index loads run in parallel.
- Intersects fragment bitmaps across indexed aggregates, materializes the allow list as concrete [0..physical_rows) ranges (avoids the RoaringBitmap::full() inflation in RowAddrTreeMap::Sub), then composes prefilter ∩ fragments_allow − deletion_mask into a single AllowList.
- Calls calculate_aggregate per indexed aggregate; falls back to counting the combined mask directly when an aggregate is a non-distinct COUNT without an associated index.
- Unit tests cover try_new validation, the Full+Partial count helper, and end-to-end execution with no prefilter, an AllowList prefilter, a BlockList prefilter, and deletions.

Also includes aggregate-pushdown-research.md surveying how mature query engines structure aggregate pushdown.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
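The mask composition described on the execute side (prefilter ∩ fragments_allow − deletion_mask) can be illustrated with a minimal sketch. It uses plain `BTreeSet<u64>` values rather than Lance's RowAddrTreeMap / RowAddrMask, and the names `Prefilter`, `row_addr`, `fragments_allow`, and `count_rows` are hypothetical stand-ins, not the PR's API.

```rust
use std::collections::BTreeSet;

/// Toy row-address encoding: fragment id in the high 32 bits, offset in the low 32.
fn row_addr(frag_id: u32, offset: u32) -> u64 {
    ((frag_id as u64) << 32) | offset as u64
}

/// Hypothetical stand-in for the prefilter mask a scalar index would supply.
enum Prefilter {
    Allow(BTreeSet<u64>),
    Block(BTreeSet<u64>),
}

/// Materialize the concrete range [0..physical_rows) for every covered fragment.
/// Each entry of `physical_rows` is (fragment id, physical row count).
fn fragments_allow(physical_rows: &[(u32, u32)]) -> BTreeSet<u64> {
    physical_rows
        .iter()
        .flat_map(|&(frag, rows)| (0..rows).map(move |off| row_addr(frag, off)))
        .collect()
}

/// count = | prefilter ∩ fragments_allow − deleted |
fn count_rows(
    prefilter: Option<&Prefilter>,
    physical_rows: &[(u32, u32)],
    deleted: &BTreeSet<u64>,
) -> usize {
    fragments_allow(physical_rows)
        .into_iter()
        // Apply the optional prefilter (allow list or block list).
        .filter(|addr| match prefilter {
            None => true,
            Some(Prefilter::Allow(set)) => set.contains(addr),
            Some(Prefilter::Block(set)) => !set.contains(addr),
        })
        // Subtract deleted rows last.
        .filter(|addr| !deleted.contains(addr))
        .count()
}
```

With two fragments of 10 rows each and one deleted row, the unfiltered count is 19; an allow-list prefilter that names a deleted row or a row outside the covered fragments contributes nothing for those entries.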
Adds AggregateIndexPushdown — a PhysicalOptimizerRule that walks the plan
top-down and rewrites COUNT-shaped aggregates into AggregateIndexSearchExec
so they're answered from index metadata + the deletion mask + an optional
scalar-index prefilter, without scanning column data.
Recognized shape:
AggregateExec(Single, aggs=[COUNT(*)], group_by=[])
└── FilteredReadExec { no refine_filter, full_filter only when
index_input is present, no scan range, no
with_deleted_rows, no fragment subset }
Rewritten to:
AggregateExec(Final, aggs=[COUNT(*)], group_by=[])
└── AggregateIndexSearchExec { prefilter_input = index_input }
The outer AggregateExec is dropped to AggregateMode::Final because
AggregateIndexSearchExec emits one row of partial state.
is_count_star is intentionally conservative: function name == "count",
not distinct, single non-null Literal argument. Anything else (COUNT(col)
with a column ref, DISTINCT, FILTER (WHERE), GROUP BY, residual filter,
scan range, with_deleted_rows, fragment subset) leaves the existing scan
path untouched.
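A minimal sketch of the conservative check described above, using a toy expression model rather than DataFusion's real AggregateFunctionExpr. The types `Arg` and `AggCall` and the helper `count_call` are hypothetical and exist only for illustration.

```rust
/// Toy model of an aggregate argument; not DataFusion's expression types.
#[derive(Debug, Clone, PartialEq)]
enum Arg {
    /// A literal; `None` models a NULL literal.
    Literal(Option<i64>),
    /// A column reference by name.
    Column(String),
}

/// Toy model of one aggregate call in the plan.
struct AggCall {
    name: String,
    distinct: bool,
    args: Vec<Arg>,
    has_filter_clause: bool,
}

/// Accept only count(<non-null literal>): not DISTINCT, no FILTER (WHERE),
/// exactly one argument, and that argument is a non-null literal.
fn is_count_star(agg: &AggCall) -> bool {
    agg.name == "count"
        && !agg.distinct
        && !agg.has_filter_clause
        && matches!(agg.args.as_slice(), [Arg::Literal(Some(_))])
}

/// Convenience constructor for a plain COUNT call (illustration only).
fn count_call(args: Vec<Arg>) -> AggCall {
    AggCall {
        name: "count".to_string(),
        distinct: false,
        args,
        has_filter_clause: false,
    }
}
```

Under this model `count_call(vec![Arg::Literal(Some(1))])` is accepted, while a column argument, a NULL literal, or a DISTINCT call is rejected and would stay on the scan path.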
Registered first in get_physical_optimizer so generic rules don't see
the rewritten subtree.
Tests (4, driving the rule end-to-end through Scanner::create_plan):
- rule_fires_on_unfiltered_count_star
- rule_fires_when_filter_fully_indexed (BTree filter pushdown)
- rule_skips_when_filter_needs_refine (unindexed column residual)
- rule_skips_count_with_group_by
Existing count_rows tests (3) and aggregate_index exec tests (7) all
continue to pass.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@claude review once
    let mut fragments_allow = RowAddrTreeMap::new();
    for frag_id in fragments_covered.iter() {
        let frag = frag_map.get(&frag_id).ok_or_else(|| {
            Error::internal(format!(
                "AggregateIndexSearchExec: fragment {} not in manifest",
                frag_id
            ))
        })?;
        let physical = frag.physical_rows.ok_or_else(|| {
            Error::internal(format!(
                "AggregateIndexSearchExec: physical_rows missing for fragment {}",
                frag_id
            ))
        })?;
        let mut bitmap = RoaringBitmap::new();
        bitmap.insert_range(0u32..(physical as u32));
        fragments_allow.insert_bitmap(frag_id, bitmap);
🔴 When the dataset has stable row IDs enabled and at least one deletion (or missing fragment), this rule produces silently wrong COUNT(*) results. fragments_allow is built in row-address space ((frag_id<<32)|offset, lines 358-374), but DatasetPreFilter::create_deletion_mask dispatches to do_create_deletion_mask_row_id for stable-row-id datasets (prefilter.rs:270), which builds an AllowList of the raw stable u64 row IDs. combine_masks AND-s the two AllowLists across mismatched ID spaces, so most fragments drop out of the intersection and the count is dramatically undercounted. Suggest gating the rule on !dataset.manifest.uses_stable_row_ids() until the two masks can be reconciled, since CI does not exercise stable row IDs and this regression would otherwise be invisible.
Extended reasoning...
The bug. AggregateIndexSearchExec::do_execute builds fragments_allow (aggregate_index.rs:358-374) by iterating each covered fragment and inserting bitmap = 0..physical_rows via fragments_allow.insert_bitmap(frag_id, bitmap). RowAddrTreeMap keys entries by the fragment id in the high 32 bits — so this is firmly in row-address space ((frag_id<<32) | offset).
The other side of the AND. DatasetPreFilter::create_deletion_mask (prefilter.rs:226-298) dispatches to do_create_deletion_mask_row_id whenever dataset.manifest.uses_stable_row_ids() is true and any fragment has a deletion file (or is missing). That function (prefilter.rs:130-184) walks each fragment's RowIdSequence, applies its deletion vector, then does allow_list |= RowAddrTreeMap::from(seq.as_ref()). The From<&RowIdSequence> for RowAddrTreeMap impl in rowids.rs:514-545 inserts the raw stable u64 row-ID values via insert_range / insert — these are arbitrary opaque u64s (typically allocated as a monotonic counter starting at 0), not (frag_id<<32) | offset addresses. The OldIndexDataFilter docs in lance-index/src/scalar.rs explicitly call this out: 'stable row IDs ... are opaque and should not be interpreted as encoded row addresses.'
The fatal AND. combine_masks (aggregate_index.rs:284-302) does after_prefilter & (*deletion_mask).clone(), which is AllowList & AllowList = AllowList(a & b) (see RowAddrMask BitAnd in mask.rs). RowAddrTreeMap intersection is keyed by the high-32-bit 'fragment id' bucket. Stable row IDs allocated sequentially from 0 all live in bucket high=0 — collapsing into 'fragment 0' from the treemap's perspective. fragments_allow has buckets for every real fragment id 0..N. Result: only fragment 0's entries can possibly match, and every row in fragments 1..N is silently dropped from the count.
Concrete walkthrough. 4 fragments × 10 rows, stable row IDs allocated 0..39, then row 5 is deleted (creating a deletion file on fragment 0). fragments_covered = {0,1,2,3}. do_create_deletion_mask_row_id produces AllowList with bucket high=0 = {0..40} \ {5} = 39 IDs. fragments_allow has high=0:{0..10}, high=1:{0..10}, high=2:{0..10}, high=3:{0..10}. Intersection: only bucket high=0 matches, giving {0..10} \ {5} = 9 entries. count_from_mask returns 9 instead of the correct 39.
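The walkthrough can be reproduced with a small simulation. This is a sketch under stated assumptions: `TreeMap` is a toy stand-in for RowAddrTreeMap (bucketed by the high 32 bits), and `fragments_allow` / `stable_id_allow` are hypothetical helpers that mimic the two ID spaces, not the functions in prefilter.rs or aggregate_index.rs.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Toy stand-in for RowAddrTreeMap: the high 32 bits of a value select a
/// bucket, the low 32 bits are stored inside that bucket.
#[derive(Default)]
struct TreeMap {
    buckets: BTreeMap<u32, BTreeSet<u32>>,
}

impl TreeMap {
    fn insert(&mut self, value: u64) {
        self.buckets
            .entry((value >> 32) as u32)
            .or_default()
            .insert(value as u32); // truncation keeps the low 32 bits
    }

    fn len(&self) -> usize {
        self.buckets.values().map(|b| b.len()).sum()
    }

    /// Size of the bucket-wise intersection, as an AllowList & AllowList would give.
    fn intersection_len(&self, other: &TreeMap) -> usize {
        self.buckets
            .iter()
            .filter_map(|(hi, lows)| other.buckets.get(hi).map(|o| lows.intersection(o).count()))
            .sum()
    }
}

/// Row-address allow list: [0..rows_per_fragment) in every fragment.
fn fragments_allow(fragments: u32, rows_per_fragment: u32) -> TreeMap {
    let mut map = TreeMap::default();
    for frag in 0..fragments {
        for offset in 0..rows_per_fragment {
            map.insert(((frag as u64) << 32) | offset as u64);
        }
    }
    map
}

/// Stable-row-id allow list: sequential ids 0..total, minus the deleted ids.
fn stable_id_allow(total: u64, deleted: &[u64]) -> TreeMap {
    let mut map = TreeMap::default();
    for id in (0..total).filter(|id| !deleted.contains(id)) {
        map.insert(id);
    }
    map
}
```

For the 4 × 10 case, `stable_id_allow(40, &[5]).len()` is 39 (the correct count), while `fragments_allow(4, 10).intersection_len(&stable_id_allow(40, &[5]))` is 9, because every stable ID lands in bucket 0.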
Why CI doesn't catch this. All four tests in this PR (rule_fires_on_unfiltered_count_star, rule_fires_when_filter_fully_indexed, rule_skips_when_filter_needs_refine, rule_skips_count_with_group_by) build their datasets via lance_datagen::gen_batch().into_dataset(...) with no enable_stable_row_ids toggle — the default WriteParams have address-style row IDs. No test enables stable row IDs and exercises a deletion, so the bad intersection never fires. The optimizer rule itself in aggregate_index_pushdown.rs is unconditional — there is no uses_stable_row_ids() guard anywhere in either aggregate_index.rs or aggregate_index_pushdown.rs.
Impact. Silent wrong COUNT(*) answers on any stable-row-IDs dataset that has accumulated at least one deletion (or has missing fragments). Both stable row IDs and deletions are well-supported features that users combine. Stable row IDs being arbitrary opaque u64s means the magnitude of the undercount is unpredictable: typically near-zero when IDs are <2^32 and fragments >0 exist.
Fix. Simplest: short-circuit try_rewrite in aggregate_index_pushdown.rs when filtered_read.dataset().manifest().uses_stable_row_ids() is true, falling back to the existing scan path. Proper fix: detect stable row IDs and either build fragments_allow in stable-id space (by iterating each fragment's RowIdSequence) or translate the deletion mask's stable IDs back to row addresses before AND-ing. A test that toggles WriteParams { enable_stable_row_ids: true, .. } plus a dataset.delete("...") step would catch the regression.
    if options.refine_filter.is_some() {
        return Ok(None);
    }
    // A full_filter without an index_input means the filter is evaluated by
    // re-reading every row; not pushdownable.
    if options.full_filter.is_some() && filtered_read.index_input().is_none() {
        return Ok(None);
    }
🔴 The pushdown rule fires whenever the filter is fully evaluable by some index (refine_filter is None, index_input is Some) but does not verify the index covers every dataset fragment. When the index has partial coverage (e.g. fragments were appended after the index was built), the rewritten plan silently drops matching rows in unindexed fragments — SELECT COUNT(*) WHERE indexed_col < N then undercounts. The original FilteredReadExec path handles this case correctly by falling back to a full filter scan on unindexed fragments; the rewrite needs to either require full index coverage or merge in a scan path for unindexed fragments.
Extended reasoning...
What the bug is
AggregateIndexPushdown::try_rewrite at rust/lance/src/io/exec/aggregate_index_pushdown.rs:117-124 gates pushdown on options.refine_filter.is_none() plus the implication full_filter.is_some() ⇒ index_input.is_some(). Those checks tell us only that the filter expression is fully evaluable by some scalar index on the column. They say nothing about whether the index actually covers every fragment in the dataset.
FilterPlan is built column-level, not fragment-level: IndexInformationProvider::get_index (lance-index/src/scalar/expression.rs) sets index_query=Some whenever a column has any usable index. A common, realistic scenario where this diverges from full coverage: an index is created over the dataset, then new fragments are appended without re-indexing. The new fragments are unindexed but the planner still attaches an index_input for the filter.
Why the original plan is correct
FilteredReadExec handles partial coverage at runtime. In filtered_read.rs:599-614, when evaluated_index.applicable_fragments does not contain a fragment, the code falls back to options.full_filter for that fragment, and apply_index_to_fragment (filtered_read.rs:712-748) adds the entire fragment to fragments_to_read so it gets scanned with the filter applied. Matching rows in unindexed fragments are counted correctly.
Why the rewrite is wrong
The rewrite drops the fallback. It feeds the same ScalarIndexExec as prefilter_input. The ScalarIndexExec row-address mask, by construction, only contains addresses from fragments the index covers — the underlying ScalarIndex::search can't produce row addresses for fragments it doesn't index. Meanwhile, AggregateIndexSearchExec::do_execute (aggregate_index.rs:330-355) sees that every AggregateIndexSearch has index_name: None (the rewrite hardcodes this for COUNT) and falls back to fragments_covered = dataset.fragments().map(|f| f.id).collect::<RoaringBitmap>() — i.e. all dataset fragments. fragments_allow is built over all those fragments, and combine_masks does AllowList(all-fragments) & prefilter_mask. The prefilter has no rows from unindexed fragments, so those rows are silently dropped from the final count.
Step-by-step proof
- Create a dataset with 5 fragments × 10 rows, column x containing values 0..50, and build a BTree index on x. At this point the index's fragment bitmap covers fragments 0-4.
- Append a new fragment 5 containing 10 more rows (values 50..60) without re-indexing. The index still covers only fragments 0-4. scalar_index_fragment_bitmap returns a bitmap over fragments 0-4.
- Run SELECT COUNT(*) FROM t WHERE x < 100. Correct answer: 60.
- Planner builds a FilterPlan with index_query = Some(...) (a BTree exists on the column) and refine_expr = None (the BTree fully evaluates <). FilteredReadExec is constructed with full_filter = Some(x<100) and index_input = Some(ScalarIndexExec).
- try_rewrite checks pass: refine_filter is None, full_filter is Some and index_input is Some, no scan range, no with_deleted_rows, no fragment subset, count-shaped agg, no group by. Rule fires.
- At execute time, ScalarIndexExec returns a row-address AllowList covering matching rows in fragments 0-4 only (50 rows).
- AggregateIndexSearchExec::do_execute sees index_name: None, falls back to fragments_covered = all 6 fragments. fragments_allow is built as AllowList over fragments 0-5 (60 row addresses).
- combine_masks ANDs the two: AllowList(fragments 0-5) & AllowList(fragments 0-4) = AllowList(fragments 0-4) (50 rows).
- Final count returned: 50 (wrong; the correct answer is 60).
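The steps above can be sketched as a toy simulation that contrasts the rewritten plan with the original scan path. Everything here is an assumption made for illustration: rows are modelled as `((fragment, offset), value)` pairs, and `dataset`, `index_matches`, `pushdown_count`, and `scan_count` are hypothetical names, not Lance functions.

```rust
use std::collections::BTreeSet;

/// Toy row address: (fragment id, row offset).
type RowAddr = (u32, u32);

/// Build `fragments` × `rows` rows whose value is the global row number.
fn dataset(fragments: u32, rows: u32) -> Vec<(RowAddr, i64)> {
    (0..fragments)
        .flat_map(|f| (0..rows).map(move |o| ((f, o), (f * rows + o) as i64)))
        .collect()
}

/// What an index search can return: matches only inside fragments it covers.
fn index_matches(
    values: &[(RowAddr, i64)],
    indexed_fragments: &BTreeSet<u32>,
    upper_bound: i64,
) -> BTreeSet<RowAddr> {
    values
        .iter()
        .filter(|(addr, v)| indexed_fragments.contains(&addr.0) && *v < upper_bound)
        .map(|(addr, _)| *addr)
        .collect()
}

/// Rewritten plan: intersect "all dataset rows" with the index result.
fn pushdown_count(
    values: &[(RowAddr, i64)],
    indexed_fragments: &BTreeSet<u32>,
    upper_bound: i64,
) -> usize {
    let all_rows: BTreeSet<RowAddr> = values.iter().map(|(addr, _)| *addr).collect();
    let matched = index_matches(values, indexed_fragments, upper_bound);
    let count = all_rows.intersection(&matched).count();
    count
}

/// Original plan: index result for covered fragments, plus a filtered scan
/// of every fragment the index does not cover.
fn scan_count(
    values: &[(RowAddr, i64)],
    indexed_fragments: &BTreeSet<u32>,
    upper_bound: i64,
) -> usize {
    let from_index = index_matches(values, indexed_fragments, upper_bound).len();
    let from_scan = values
        .iter()
        .filter(|(addr, v)| !indexed_fragments.contains(&addr.0) && *v < upper_bound)
        .count();
    from_index + from_scan
}
```

With `dataset(6, 10)`, an index over fragments 0-4, and the predicate `x < 100`, `pushdown_count` gives 50 and `scan_count` gives 60, matching the review's numbers.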
In the original scan path, fragment 5 would have been read in full with full_filter applied during the scan and counted correctly.
How to fix
Two viable options:
- Require the index bitmap to equal the dataset fragment bitmap before firing. FilteredReadExec already exposes the dataset and we can compare against scalar_index_fragment_bitmap for the relevant index. Conservative (it refuses pushdown in partial-coverage cases), but matches the spirit of the PR's safety envelope.
- Plumb a scan path for unindexed fragments through AggregateIndexSearchExec. Keep pushdown, but for fragments not in the index bitmap, fall back to a per-fragment count + filter (the existing scan path) and add it to the partial count. More work but doesn't lose the win.
Either way, the test rule_fires_when_filter_fully_indexed does not catch this because create_index runs after all 4 fragments are committed, so the bitmap covers everything. Adding a regression test that creates the index, appends a fragment, and asserts the count includes the new fragment would have caught this.
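The first fix option boils down to a set comparison. A minimal sketch, assuming fragment ids are available as plain `BTreeSet<u32>` values (the real code would work with fragment bitmaps); `pushdown_allowed` and `uncovered_fragments` are hypothetical names.

```rust
use std::collections::BTreeSet;

/// Option 1: fire the rule only when the index's fragment set equals the
/// dataset's fragment set.
fn pushdown_allowed(index_fragments: &BTreeSet<u32>, dataset_fragments: &BTreeSet<u32>) -> bool {
    index_fragments == dataset_fragments
}

/// Fragments present in the dataset but missing from the index. Option 2
/// would count these through the scan path and add them to the partial count.
fn uncovered_fragments(
    index_fragments: &BTreeSet<u32>,
    dataset_fragments: &BTreeSet<u32>,
) -> BTreeSet<u32> {
    dataset_fragments.difference(index_fragments).copied().collect()
}
```

In the append-after-index scenario (index over fragments 0-4, dataset with fragments 0-5), `pushdown_allowed` is false and `uncovered_fragments` contains only fragment 5.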
    @@ -0,0 +1,225 @@
    # Aggregate Pushdown in Mature Query Engines
🟡 The PR adds a 225-line research/design document, aggregate-pushdown-research.md, at the repository root. It is a one-off survey of aggregate-pushdown techniques in other engines (Postgres, DuckDB, SQL Server, Iceberg, etc.), not referenced by any code or other docs, and contains TODO-style notes ("Things I Couldn't Pin Down Authoritatively") plus links to external PRs that will rot over time. Consider dropping it from the PR — or, if it should live in-tree as design rationale, moving it under docs/ or rfcs/ rather than the repo root, where it shows up in directory listings and IDE indexes.
Extended reasoning...
What the file is
The PR includes a new file, aggregate-pushdown-research.md, added at the repo root with 225 lines (~33 KB). It is a survey/taxonomy of how other query engines (Postgres MinMaxAggPath, DuckDB zonemaps, SQL Server columnstore aggregate pushdown, ClickHouse projections, Druid bitmap indexes, Pinot star-trees, Snowflake micro-partitions, Iceberg PR #6622, Spark V2 SupportsPushDownAggregates, DataFusion AggregateStatistics) handle aggregate pushdown. It includes an executive summary, taxonomy diagrams, per-engine sections, an index→aggregate matrix, planner-integration patterns, and a list of correctness gotchas.
Why this is a concern
The file is not referenced anywhere: no source file, no other doc, no build script. It is a standalone planning artifact. The internal Claude/repo guidance (which the assistant-generated PR cites in its own commit trailer) is explicit: "Do not create planning, decision, or analysis documents unless the user asks — work from conversation context, not intermediate files." This kind of design rationale typically belongs in the PR description, an internal wiki, or an rfcs/ or docs/ subdirectory, not at the repo root, which is reserved for top-level navigational files (README.md, CLAUDE.md, Cargo.toml, etc.).
A few concrete downsides of leaving it at the root:
- Discoverability noise. It will appear in directory listings, IDE file pickers, and GitHub's repo home page right next to README.md, where new contributors may mistake it for an authoritative reference document.
- Maintenance debt. The file links to specific PR numbers (Iceberg #6622, DataFusion #19938) and contains a section literally titled "Things I Couldn't Pin Down Authoritatively" with TODO-style notes. These will rot as the surveyed engines evolve, and nobody owns updating them.
- Indexing surface. It gets indexed by search/IDE tooling, generating false hits for terms like MIN/MAX, COUNT(DISTINCT), etc., when contributors are searching for actual Lance code.
Step-by-step proof
- Check git log --oneline -- aggregate-pushdown-research.md on this branch: the file is introduced by this PR and has no prior history.
- grep -r "aggregate-pushdown-research" rust/ docs/ python/ returns no hits; nothing in the codebase or docs references it.
- The file's own opening line reads: "Background research for the Lance feat-aggregate-pushdown work.", confirming it is design background, not user/contributor-facing documentation.
- Section 7 ("Open Questions / Things I Couldn't Pin Down Authoritatively") explicitly contains unresolved notes ("Worth confirming on pgsql-hackers", "Need to read src/optimizer/ in the DuckDB tree directly"), which is the hallmark of an engineer's working document rather than a polished artifact.
Suggested fix
Drop the file from the PR. If the content is genuinely valuable as design rationale, the PR description is the natural home (and is already extensive). Alternatively, move it under a structured location like docs/design/aggregate-pushdown.md or rfcs/0001-aggregate-pushdown.md and trim out the TODO/“couldn't pin down” sections, so future maintainers know who owns it.
Severity
This is a repo-hygiene concern, not a correctness bug. It does not affect any runtime behavior or test outcome and is trivially fixable. Filed as nit so the PR author can decide whether to drop/move the file or leave it in place as intentional design rationale.
CI fixes:
- Add SPDX license headers to expression.rs and scalar/expression.rs.
- cargo fmt the rule file (3 spots).
- Update test_count_star_single_fragment and test_scanner_count_rows in dataset_aggregate.rs to expect the new AggregateExec(Final) → AggregateIndexSearchExec shape now that the rule fires by default.

Correctness fixes (both pointed out by automated review on lance-format#6831):
- Stable row IDs: DatasetPreFilter::create_deletion_mask returns an AllowList in stable-id space when the dataset uses stable row IDs, but AggregateIndexSearchExec builds its fragments-allow list in row-address space. ANDing across mismatched id spaces undercounts silently. Gate the rule on !manifest.uses_stable_row_ids() until the exec can reconcile the two id spaces.
- Partial index coverage: when an index is built and then a fragment is appended, the index's fragment bitmap no longer covers the whole dataset. The original rule fired anyway and silently dropped rows in the unindexed fragments. The proper fix needs an async coverage check that's not expressible in a sync PhysicalOptimizerRule; until we plumb that through, narrow the rule to only fire when there is no filter at all (no full_filter, no refine_filter, no index_input). Unfiltered counts remain correct and still benefit from the rewrite.

Both narrowings are documented in the module-level doc and the inline `try_rewrite` comments so a follow-up can lift them once the underlying machinery is in place.

Repo hygiene:
- Drop aggregate-pushdown-research.md from the repo root. It was a one-off survey not referenced by any code or doc.

New regression tests in aggregate_index_pushdown.rs:
- rule_skips_with_stable_row_ids: toggles enable_stable_row_ids + delete, asserts the rule does not fire and the count is correct.
- rule_skips_partial_index_coverage: builds index over 4 fragments, appends a 5th, runs COUNT(*) WHERE indexed_col < N, asserts the rule does not fire and the count includes the appended fragment.
- rule_skips_when_filter_present_even_if_indexed: replaces the old rule_fires_when_filter_fully_indexed; documents that the indexed-filter case is deferred.

All 13 aggregate_index* tests pass; cargo check --workspace and cargo clippy -p lance -p lance-index --tests --benches -- -D warnings are clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
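The narrowed gate described in this commit can be summarized as a single predicate. This is a sketch only: `ScanShape` is a hypothetical flattened view of what the rule inspects (the field names echo the commit text), not the real FilteredReadExec options or manifest types.

```rust
/// Hypothetical, simplified view of the scan the rule is looking at.
#[derive(Default)]
struct ScanShape {
    has_refine_filter: bool,
    has_full_filter: bool,
    has_index_input: bool,
    has_scan_range: bool,
    with_deleted_rows: bool,
    has_fragment_subset: bool,
    uses_stable_row_ids: bool,
}

/// After the narrowing: fire only for unfiltered scans on datasets without
/// stable row IDs, and only when nothing else changes which rows are counted.
fn rule_may_fire(scan: &ScanShape) -> bool {
    let unfiltered =
        !scan.has_refine_filter && !scan.has_full_filter && !scan.has_index_input;
    unfiltered
        && !scan.has_scan_range
        && !scan.with_deleted_rows
        && !scan.has_fragment_subset
        && !scan.uses_stable_row_ids
}
```

A default `ScanShape` (no filter, no ranges, address-style row IDs) passes; setting `uses_stable_row_ids`, or supplying a fully indexed filter, keeps the plan on the scan path.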
Summary

Turns on the aggregate-pushdown scaffolding from the previous PR (AggregateIndexSearchExec + the plan-time parser) by adding the physical optimizer rule that actually invokes them. After this PR, a SELECT COUNT(*) FROM t (with or without an indexed filter) is answered from index metadata + the deletion mask + the optional scalar-index prefilter, with no column scan.

Conservative on purpose: every condition that's not provably safe leaves the existing scan path untouched.

Depends on: the foundation PR (feat-aggregate-pushdown). This PR is a thin layer on top.

What this PR adds

AggregateIndexPushdown (rust/lance/src/io/exec/aggregate_index_pushdown.rs)

A PhysicalOptimizerRule that walks the plan top-down. When it sees:

    AggregateExec(Single, aggs=[COUNT(*)], group_by=[])
    └── FilteredReadExec { no refine_filter, full_filter only when
                           index_input is present, no scan range, no
                           with_deleted_rows, no fragment subset }

it rewrites to:

    AggregateExec(Final, aggs=[COUNT(*)], group_by=[])
    └── AggregateIndexSearchExec { prefilter_input = index_input }

The outer AggregateExec is dropped to AggregateMode::Final because AggregateIndexSearchExec emits one row of partial state.

Safety envelope

The rule fires only when every condition below holds. Anything else leaves the scan path alone.

AggregateExec side:
- AggregateMode: Single
- group_by: empty
- AggregateFunctionExpr: count, not distinct, single non-null Literal argument (i.e. COUNT(*) / COUNT(1))
- FILTER (WHERE …): absent

FilteredReadExec side:
- refine_filter: None (any residual would need to be applied to data)
- full_filter: None, or Some with index_input also Some (i.e. the index fully evaluates the filter)
- scan_range_before_filter / scan_range_after_filter: None (a LIMIT/OFFSET would change the count)
- with_deleted_rows: false
- fragments (subset): None

is_count_star matches count(<literal>) with a non-null literal, covering COUNT(*) and COUNT(1) as DataFusion lowers them, without accidentally accepting COUNT(col) (which has different null semantics) or COUNT(NULL) (which is always 0).

Optimizer registration

Added at the front of get_physical_optimizer() so the rewritten subtree is in place before generic rules (CoalesceTake, SimplifyProjection, LimitPushdown) see the plan.

Test plan

All four tests drive the rule end-to-end through Scanner::create_plan, so the registered optimizer chain actually fires (no direct construction shortcuts).
- rule_fires_on_unfiltered_count_star: SELECT COUNT(*) FROM t on a 4×10 dataset → 40, plan contains AggregateIndexSearchExec.
- rule_fires_when_filter_fully_indexed: SELECT COUNT(*) FROM t WHERE ordered < 25 (BTree-indexed) → 25, plan contains AggregateIndexSearchExec.
- rule_skips_when_filter_needs_refine: SELECT COUNT(*) FROM t WHERE unindexed > 5 (no index on unindexed) → 34 via the scan path, no AggregateIndexSearchExec.
- rule_skips_count_with_group_by: GROUP BY ordered → rule does not fire.

Non-regression:
- cargo test -p lance --lib io::exec::aggregate_index: 7/7 (foundation tests unchanged).
- cargo test -p lance --lib test_count: 3/3 (existing count tests unchanged).
- cargo clippy -p lance -p lance-index --tests --benches -- -D warnings: clean.

Out of scope (follow-ups)
- FilteredReadExec (e.g. a LanceScanExec direct child).
- Dataset::count_rows(None) (the public no-filter fast path) through the optimizer so it also hits AggregateIndexSearchExec rather than the per-fragment count helper.
- ScalarIndex::calculate_aggregate on a real index (e.g. exact COUNT(DISTINCT) from a bitmap-index dictionary) so the rule can drive distinct counts.
- is_count_star to handle COUNT(col) when the column's null count is known per fragment (MIN/MAX-style metadata answer).

🤖 Generated with Claude Code