Skip to content

fix: stop double-counting child CPU in node-with-children Exec plans#6799

Open
brendanclement wants to merge 4 commits into
lance-format:mainfrom
brendanclement:fix/exec-double-count-child-cpu
Open

fix: stop double-counting child CPU in node-with-children Exec plans#6799
brendanclement wants to merge 4 commits into
lance-format:mainfrom
brendanclement:fix/exec-double-count-child-cpu

Conversation

@brendanclement
Copy link
Copy Markdown

@brendanclement brendanclement commented May 15, 2026

Summary

Fixes #5155.

InstrumentedRecordBatchStreamAdapter measures a node's elapsed_compute by timing its outer poll_next. For an ExecutionPlan node with child inputs, that poll transitively polls every child — so EXPLAIN ANALYZE shows the parent's CPU as parent + child + grandchild + ..., and each ancestor double-counts its descendants.

This PR fixes five nodes that have child inputs and were using the broken wrapper. The fix takes two shapes depending on the node:

  1. Four nodes with a clean "child input -> per-batch transform -> output" shape are converted to use a new helper, InstrumentedChildInputStream<F, Fut> (in rust/lance/src/io/exec/utils.rs), modeled on DataFusion's FilterExecStream. The helper pulls from a child input without the timer running, then drives a per-batch async transform with the timer running.

  2. FlatMatchQueryExec doesn't fit that shape — its work all happens inside flat_bm25_search_stream, which consumes the child input and spawn_cpus the per-batch tokenize/count work internally. For this node, the fix instruments flat_bm25_search_stream directly so it can report CPU time on a metric handle supplied by the caller.

Nodes fixed

Node File Mechanism
AddRowAddrExec rust/lance/src/io/exec/rowids.rs helper
MapIndexExec rust/lance/src/io/exec/scalar_index.rs helper
FlatMatchFilterExec rust/lance/src/io/exec/fts.rs helper
KNNVectorDistanceExec rust/lance/src/io/exec/knn.rs helper + caller-side Instant::now() around compute_distance(...).await to capture spawn_blocking work
FlatMatchQueryExec rust/lance/src/io/exec/fts.rs + rust/lance-index/src/scalar/inverted/index.rs new flat_bm25_search_stream_with_metrics that records CPU on a supplied Time handle

For KNNVectorDistanceExec, this also addresses the spawned-CPU undercount from #5155: the helper's timer doesn't measure work happening on spawn_blocking worker threads. KNN's transform closure wraps compute_distance(...).await with Instant::now() and adds the elapsed duration to elapsed_compute from the caller side. compute_distance's public signature is unchanged.

For FlatMatchQueryExec, the analogous mechanism lives in lance-index: flat_bm25_search_stream_with_metrics accepts an Option<Time> and records CPU around both the spawn_cpu body in tokenize_and_count (phase 1) and the synchronous initialize_scorer + flat_bm25_score (phase 2). The original flat_bm25_search_stream is preserved as a #[deprecated] thin wrapper that delegates with None — no breaking API change.

Test plan

Unit tests

  • utils.rs::instrumented_child_input_stream_excludes_child_poll_time: a child stream sleeps 60ms per poll_next; the transform sleeps 30ms. The helper's elapsed_compute should be ~3 × 30ms, not ~3 × 90ms. Verified to fail on the pre-fix implementation.
  • utils.rs::instrumented_child_input_stream_propagates_child_error: an error mid-stream from the child propagates through the helper without losing preceding OK batches.
  • fts.rs::test_flat_match_filter_find_matches_large_utf8: exercises the i64-offset specialization of find_matches (production path for long text columns).
  • lance-index/scalar/inverted/index.rs::flat_bm25_search_stream_with_metrics_records_elapsed_compute: runs _with_metrics with Some(time), asserts time.value() > 0 after drain.

Existing tests

cargo test -p lance --lib io::exec::utils io::exec::knn io::exec::rowids io::exec::scalar_index io::exec::fts
cargo test -p lance-index --lib scalar::inverted::index

All pass. cargo fmt --all --check and cargo clippy -p lance --no-deps are clean for changed files.

End-to-end EXPLAIN ANALYZE before/after

1M-row synthetic dataset with BTREE + INVERTED + IVF_PQ indexes, comparing elapsed_compute between this branch's parent (pre-fix) and the branch tip:

Scenario Node Before After Change
KNN + BTREE scalar prefilter KNNVectorDistance (493 rows out) 623µs 32µs ~20× smaller
KNN + FTS post-filter FlatMatchFilter (0 rows out) 102µs 38µs ~2.7× smaller
KNN + FTS prefilter FlatMatchQuery (2.45K rows out) 932µs 116ms was severely under-counted (spawn_cpu worker CPU was invisible to the poll-based timer the new value reflects real cross-thread CPU and naturally exceeds wall-clock when work is parallel)
KNN + FTS prefilter (control) MatchQuery unchanged unchanged noise only

The first two rows mirror the issue's described shape: heavy synchronous upstream inflating a downstream node's metric. The third row demonstrates the spawned-CPU half of the fix — FlatMatchQuery's tokenize/score work was nearly invisible pre-fix and is now correctly attributed.

Copy link
Copy Markdown

@claude claude Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Claude Code Review

This pull request is from a fork — automated review is disabled. A repository maintainer can comment @claude review to run a one-time review.

@github-actions github-actions Bot added the bug Something isn't working label May 15, 2026
@brendanclement
Copy link
Copy Markdown
Author

@claude review

1 similar comment
@brendanclement
Copy link
Copy Markdown
Author

@claude review

@wjones127
Copy link
Copy Markdown
Contributor

@claude review once

Comment thread rust/lance/src/io/exec/fts.rs Outdated
Comment on lines +746 to +749
let result = do_work();
async move { result }
},
1,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 FlatMatchFilterExec's transform closure runs all the filter work synchronously before returning an already-ready future, so tokenization / find_matches / filter_record_batch all execute during the untimed input-pulling phase of InstrumentedChildInputStream::poll_next and never contribute to elapsed_compute. The other three converted nodes correctly put their work inside the async block. Fix: change let result = do_work(); async move { result } to async move { do_work() } so the work is driven during the timed in_flight.poll_next_unpin call.

Extended reasoning...

What the bug is

In rust/lance/src/io/exec/fts.rs (around lines 740-749), the transform closure passed to InstrumentedChildInputStream::new has this shape:

move |batch| {
    let mut do_work = || -> DataFusionResult<RecordBatch> {
        // tokenize, find_matches, filter_record_batch ...
    };
    let result = do_work();      // <-- runs synchronously HERE
    async move { result }        // <-- returns an already-ready future
}

do_work() is invoked eagerly when the closure is called, and the returned async move { result } block does nothing but yield an immediately-Ready future.

Why this defeats the metric

InstrumentedChildInputStream::poll_next (utils.rs:393-422) does two phases per loop iteration:

// Phase 1 — fill in-flight queue. NOT timed.
while !this.input_done && this.in_flight.len() < this.concurrency {
    match this.input.poll_next_unpin(cx) {
        Poll::Ready(Some(Ok(batch))) => {
            this.in_flight.push((this.transform)(batch));   // closure called here
        }
        ...
    }
}

// Phase 2 — drive in-flight futures. TIMED.
let timer = this.baseline_metrics.elapsed_compute().timer();
let poll = this.in_flight.poll_next_unpin(cx);
timer.done();

The closure call (this.transform)(batch) happens in phase 1, before the timer starts. Because the FlatMatchFilterExec closure executes do_work() synchronously at that call site, all the heavy work (tokenization, find_matches, arrow::compute::filter_record_batch) runs in the untimed phase. The future pushed into in_flight is already Ready, so when phase 2 polls it, the timer captures only the nanoseconds of poll overhead.

Cross-check against the other three converted nodes

All three correctly defer work into the async block, so it executes during phase 2:

  • KNNVectorDistanceExec (knn.rs:271-291): async move { ... compute_distance(...).await ... }
  • AddRowAddrExec (rowids.rs:194-213): async move { index_prereq.wait_ready().await?; ... compute_row_addrs(...) }
  • MapIndexExec (scalar_index.rs:316-326): closure returns Self::map_batch(...), which is an async fn — its future captures the work and only runs when polled.

FlatMatchFilterExec is the odd one out.

Step-by-step proof on a concrete batch

Suppose the child yields one 10k-row batch and the FTS tokenizer takes 50µs:

  1. Outer caller calls poll_next on the InstrumentedChildInputStream.
  2. Phase 1: this.input.poll_next_unpin(cx) returns Ready(Some(Ok(batch))).
  3. (this.transform)(batch) is invoked. Inside the closure:
    • do_work() runs find_matches::<i32> over 10k rows → ~50µs of CPU.
    • arrow::compute::filter_record_batch runs.
    • let result = do_work() completes, async move { result } is constructed.
  4. The ready-future is pushed into in_flight. Phase 1 ends. The 50µs is NOT timed.
  5. Phase 2: timer.start(); in_flight.poll_next_unpin(cx) polls the future, which is already Ready(Ok(batch)) → returns immediately. timer.done(). ~10-100ns gets attributed to elapsed_compute.

This matches exactly the PRs benchmark table: FlatMatchFilter (0 rows out) goes from 98µs to 251ns — a ~400x drop is implausible as a real speedup of tokenization + scoring; it is consistent with the work moving out of the timer.

Impact

Functional correctness is unaffected (filter results are still correct). However the entire point of this PR is to make EXPLAIN ANALYZE elapsed_compute numbers attribute CPU correctly per node. For FlatMatchFilterExec, the post-PR metric will report ~0 CPU time even when tokenization is expensive — the opposite of useful for the PRs stated goal, and now misleading compared to before.

Fix

Move the work inside the async block so it is deferred until phase-2 polling:

move |batch| {
    async move {
        // tokenize, find_matches, filter_record_batch ...
    }
}

tokenizer is captured by &mut from the outer closure, so the simplest fix is to inline the body of do_work directly into the async move { ... } (taking ownership of tokenizer into the closure state and re-binding &mut tokenizer inside the future), mirroring the pattern used by the other three converted nodes.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The closure now does the work inside async move with a per-batch tokenizer.box_clone() so it runs during the helper's timed in-flight phase

Comment on lines 421 to 440
partition: usize,
context: Arc<datafusion::execution::TaskContext>,
) -> datafusion::error::Result<datafusion::physical_plan::SendableRecordBatchStream> {
let index_vals = self.input.execute(partition, context)?;
let metrics = Arc::new(IndexMetrics::new(&self.metrics, partition));
let stream_fut = Self::do_execute(
index_vals,
let input = self.input.execute(partition, context)?;
let stream_fut = Self::build_stream(
input,
partition,
self.dataset.clone(),
self.column_name.clone(),
self.index_name.clone(),
metrics,
Arc::new(IndexMetrics::new(&self.metrics, partition)),
self.metrics.clone(),
);
let stream = futures::stream::iter(vec![stream_fut])
.then(|stream_fut| stream_fut)
.try_flatten()
.boxed();
Ok(Box::pin(InstrumentedRecordBatchStreamAdapter::new(
let stream = futures::stream::once(stream_fut).try_flatten();
Ok(Box::pin(RecordBatchStreamAdapter::new(
INDEX_LOOKUP_SCHEMA.clone(),
stream,
partition,
&self.metrics,
)))
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Minor metric-coverage regression in MapIndexExec::execute (scalar_index.rs:421-440) and FlatMatchFilterExec::execute (fts.rs:809-815): the new outer wrapper is a plain RecordBatchStreamAdapter, so the setup awaits inside build_stream / build_filter_streamscalar_index_fragment_bitmap + DatasetPreFilter::create_restricted_deletion_mask for MapIndex, and load_tokenizer/load_tokenizer_from_preset_segments for FlatMatchFilter — run before InstrumentedChildInputStream is constructed and are now untimed (previously they were polled inside an InstrumentedRecordBatchStreamAdapter). Practical impact is small (the work is one-shot per execute() and largely I/O), but it would be nice to either document the new semantics in the PR description or wrap setup with a small add_duration so EXPLAIN ANALYZE reflects cold-start cost.

Extended reasoning...

Trace

Before this PRMapIndexExec::execute wrapped its stream in InstrumentedRecordBatchStreamAdapter. That adapter's poll_next runs a timer around polling the inner stream, so the first poll — which drove do_execute's setup awaits (scalar_index_fragment_bitmap + DatasetPreFilter::create_restricted_deletion_mask) — contributed those awaits' synchronous poll work to elapsed_compute. (That same wrapping is what caused the descendant-double-counting bug being fixed here.)

After this PR (scalar_index.rs:421-440) — the outer wrapper is plain RecordBatchStreamAdapter::new (no timer). build_stream is the async future inside stream::once(...).try_flatten(); its body awaits scalar_index_fragment_bitmap(...).await? (line 302) and deletion_mask_fut.await (line 312) before constructing InstrumentedChildInputStream (line 317). Once the helper exists, only in_flight.poll_next_unpin is timed (utils.rs:386-388). The setup poll work is never timed.

The same pattern appears in FlatMatchFilterExec::build_filter_stream (fts.rs:707-720): load_tokenizer / load_tokenizer_from_preset_segments is awaited before the helper is built, and the outer wrapper at fts.rs:809-815 is a plain RecordBatchStreamAdapter.

Step-by-step proof (MapIndexExec, cold cache)

  1. Caller does exec.execute(0, ctx) — returns immediately with the boxed RecordBatchStreamAdapter (no work yet).
  2. Caller polls the returned stream.
  3. stream::once(stream_fut).try_flatten() polls stream_fut = build_stream(...), which awaits scalar_index_fragment_bitmap(...) — this drives object-storage reads of the scalar index manifest. Synchronous poll-time deserialization happens inside this future, while we sit in plain RecordBatchStreamAdapter, which has no timer.
  4. build_stream then awaits DatasetPreFilter::create_restricted_deletion_mask(...) — more reads, more poll-time work, still untimed.
  5. build_stream constructs InstrumentedChildInputStream and returns it; only now does the timer start.
  6. Steps 3-4 are real wall-clock CPU time on the calling thread, but metrics().elapsed_compute() will not reflect them.

Why existing code doesn't prevent it

InstrumentedChildInputStream::poll_next only starts its timer around in_flight.poll_next_unpin (utils.rs:386-388). Anything polled before the helper is constructed is invisible to that timer. The helper's docstring is explicit about this contract: 'elapsed_compute measures only the time spent driving the transform futures — never the time spent polling the child input.' That is consistent and intentional for the child-input axis, but it has the secondary effect of also excluding the setup awaits that happen earlier in the same outer future.

Impact

Bounded:

  • Setup is one-shot per execute(), not per-batch, so it amortizes away on long scans.
  • BaselineMetrics only captures synchronous-poll wall-clock anyway, not async-await wait time. Most setup is I/O — Pending returns were never counted, even before this PR. The lost time is just the CPU portion of poll calls during setup (deserialization, hash construction).
  • The PR's own measurements (FlatMatchFilter 98µs → 251ns) show that almost all of the previously-counted time was double-counted child compute, not setup. So the net change to elapsed_compute for these nodes is overwhelmingly positive.
  • No correctness impact.

But it is a real, unannounced change in metric coverage for nodes the PR explicitly touches. On cold object storage where scalar-index manifest reads and tokenizer opens dominate first-batch latency, EXPLAIN ANALYZE will now show a node consuming wall-clock with near-zero elapsed_compute for the cold-start phase.

Addressing the refutations

The refutations correctly point out that:

  1. The helper's doc comment defines the new semantics.
  2. The author demonstrated awareness by adding a custom elapsed_compute.add_duration(start.elapsed()) for KNN's spawn_blocking distance kernel — they had a clear pattern available and chose not to apply it to setup.
  3. The bug finder itself admits 'Whether this is intentional is unclear.'
  4. Impact is small.

All true, and that's why this is nit, not normal. But the helper's doc speaks only to the child-input axis — it does not say setup is intentionally excluded. The PR description focuses entirely on removing double-counting of descendant compute and doesn't mention that parent-node setup is also no longer counted. A user reading the PR would not learn this. Either a sentence in the PR description, a note in the helper's doc clarifying that setup-before-helper is also excluded, or wrapping setup with the same add_duration pattern used for KNN's spawn_blocking would resolve it.

Fix options (any one, in increasing order of effort)

  1. Document: add a sentence to the PR description and/or helper docstring noting that setup awaits before helper construction are also excluded.
  2. Time setup explicitly: wrap scalar_index_fragment_bitmap + create_restricted_deletion_mask (and load_tokenizer) in let start = Instant::now(); ... ; elapsed_compute.add_duration(start.elapsed()); — same pattern used at knn.rs:283-289 for the spawn_blocking distance kernel.
  3. Restructure: move setup into MapIndexExec::new/FlatMatchFilterExec::new (or a lazy OnceCell like AddRowAddrExec does for row_id_index) so it happens once per node, not per partition execute — sidesteps the timing question entirely.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

deferred as a Followup in the PR

@codecov
Copy link
Copy Markdown

codecov Bot commented May 15, 2026

@brendanclement brendanclement force-pushed the fix/exec-double-count-child-cpu branch from a6c8ec7 to 868ace9 Compare May 15, 2026 22:25
@brendanclement brendanclement changed the title fix: don't double-count child CPU in node-with-children Exec plans fix: stop double-counting child CPU in node-with-children Exec plans May 18, 2026
Copy link
Copy Markdown
Contributor

@wjones127 wjones127 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks good! No blocking feedback, just a minor question. I will merge soon.

Comment on lines +309 to +311
/// `concurrency` caps how many transform futures may be in flight at once.
/// Use `1` for sequential transforms; larger values parallelize per-batch
/// work (e.g., KNN distance computation).
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: This is pre-existing, but the fact that our streams do multiple CPU-bound things concurrency is generally an anti-pattern. Ideally, our DataFusion nodes would achieve concurrency through partitioning: creating N independent streams. We haven't migrated to that, but I suspect we will in the future.

Comment on lines +2084 to +2085
#[test]
fn test_flat_match_filter_find_matches_large_utf8() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: this a drive-by change fixing coverage? Or was there a bug found?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it was a coverage gap in the codecov report that got generated. It flagged the LargeUtf8 path as untested and it was working correctly before and now works correctly after we made the PR

@brendanclement brendanclement force-pushed the fix/exec-double-count-child-cpu branch from 21f8b71 to cd4078b Compare May 19, 2026 22:21
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

bug Something isn't working

Projects

None yet

Development

Successfully merging this pull request may close these issues.

The InstrumentedRecordBatchStreamAdapter is often misused and double counts CPU time

2 participants