fix: stop double-counting child CPU in node-with-children Exec plans#6799
fix: stop double-counting child CPU in node-with-children Exec plans#6799brendanclement wants to merge 4 commits into
Conversation
|
@claude review |
1 similar comment
|
@claude review |
|
@claude review once |
| let result = do_work(); | ||
| async move { result } | ||
| }, | ||
| 1, |
There was a problem hiding this comment.
🔴 FlatMatchFilterExec's transform closure runs all the filter work synchronously before returning an already-ready future, so tokenization / find_matches / filter_record_batch all execute during the untimed input-pulling phase of InstrumentedChildInputStream::poll_next and never contribute to elapsed_compute. The other three converted nodes correctly put their work inside the async block. Fix: change let result = do_work(); async move { result } to async move { do_work() } so the work is driven during the timed in_flight.poll_next_unpin call.
Extended reasoning...
What the bug is
In rust/lance/src/io/exec/fts.rs (around lines 740-749), the transform closure passed to InstrumentedChildInputStream::new has this shape:
move |batch| {
let mut do_work = || -> DataFusionResult<RecordBatch> {
// tokenize, find_matches, filter_record_batch ...
};
let result = do_work(); // <-- runs synchronously HERE
async move { result } // <-- returns an already-ready future
}do_work() is invoked eagerly when the closure is called, and the returned async move { result } block does nothing but yield an immediately-Ready future.
Why this defeats the metric
InstrumentedChildInputStream::poll_next (utils.rs:393-422) does two phases per loop iteration:
// Phase 1 — fill in-flight queue. NOT timed.
while !this.input_done && this.in_flight.len() < this.concurrency {
match this.input.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(batch))) => {
this.in_flight.push((this.transform)(batch)); // closure called here
}
...
}
}
// Phase 2 — drive in-flight futures. TIMED.
let timer = this.baseline_metrics.elapsed_compute().timer();
let poll = this.in_flight.poll_next_unpin(cx);
timer.done();The closure call (this.transform)(batch) happens in phase 1, before the timer starts. Because the FlatMatchFilterExec closure executes do_work() synchronously at that call site, all the heavy work (tokenization, find_matches, arrow::compute::filter_record_batch) runs in the untimed phase. The future pushed into in_flight is already Ready, so when phase 2 polls it, the timer captures only the nanoseconds of poll overhead.
Cross-check against the other three converted nodes
All three correctly defer work into the async block, so it executes during phase 2:
KNNVectorDistanceExec(knn.rs:271-291):async move { ... compute_distance(...).await ... }AddRowAddrExec(rowids.rs:194-213):async move { index_prereq.wait_ready().await?; ... compute_row_addrs(...) }MapIndexExec(scalar_index.rs:316-326): closure returnsSelf::map_batch(...), which is anasync fn— its future captures the work and only runs when polled.
FlatMatchFilterExec is the odd one out.
Step-by-step proof on a concrete batch
Suppose the child yields one 10k-row batch and the FTS tokenizer takes 50µs:
- Outer caller calls
poll_nexton theInstrumentedChildInputStream. - Phase 1:
this.input.poll_next_unpin(cx)returnsReady(Some(Ok(batch))). (this.transform)(batch)is invoked. Inside the closure:do_work()runsfind_matches::<i32>over 10k rows → ~50µs of CPU.arrow::compute::filter_record_batchruns.let result = do_work()completes,async move { result }is constructed.
- The ready-future is pushed into
in_flight. Phase 1 ends. The 50µs is NOT timed. - Phase 2:
timer.start();in_flight.poll_next_unpin(cx)polls the future, which is alreadyReady(Ok(batch))→ returns immediately.timer.done(). ~10-100ns gets attributed toelapsed_compute.
This matches exactly the PRs benchmark table: FlatMatchFilter (0 rows out) goes from 98µs to 251ns — a ~400x drop is implausible as a real speedup of tokenization + scoring; it is consistent with the work moving out of the timer.
Impact
Functional correctness is unaffected (filter results are still correct). However the entire point of this PR is to make EXPLAIN ANALYZE elapsed_compute numbers attribute CPU correctly per node. For FlatMatchFilterExec, the post-PR metric will report ~0 CPU time even when tokenization is expensive — the opposite of useful for the PRs stated goal, and now misleading compared to before.
Fix
Move the work inside the async block so it is deferred until phase-2 polling:
move |batch| {
async move {
// tokenize, find_matches, filter_record_batch ...
}
}tokenizer is captured by &mut from the outer closure, so the simplest fix is to inline the body of do_work directly into the async move { ... } (taking ownership of tokenizer into the closure state and re-binding &mut tokenizer inside the future), mirroring the pattern used by the other three converted nodes.
There was a problem hiding this comment.
The closure now does the work inside async move with a per-batch tokenizer.box_clone() so it runs during the helper's timed in-flight phase
| partition: usize, | ||
| context: Arc<datafusion::execution::TaskContext>, | ||
| ) -> datafusion::error::Result<datafusion::physical_plan::SendableRecordBatchStream> { | ||
| let index_vals = self.input.execute(partition, context)?; | ||
| let metrics = Arc::new(IndexMetrics::new(&self.metrics, partition)); | ||
| let stream_fut = Self::do_execute( | ||
| index_vals, | ||
| let input = self.input.execute(partition, context)?; | ||
| let stream_fut = Self::build_stream( | ||
| input, | ||
| partition, | ||
| self.dataset.clone(), | ||
| self.column_name.clone(), | ||
| self.index_name.clone(), | ||
| metrics, | ||
| Arc::new(IndexMetrics::new(&self.metrics, partition)), | ||
| self.metrics.clone(), | ||
| ); | ||
| let stream = futures::stream::iter(vec![stream_fut]) | ||
| .then(|stream_fut| stream_fut) | ||
| .try_flatten() | ||
| .boxed(); | ||
| Ok(Box::pin(InstrumentedRecordBatchStreamAdapter::new( | ||
| let stream = futures::stream::once(stream_fut).try_flatten(); | ||
| Ok(Box::pin(RecordBatchStreamAdapter::new( | ||
| INDEX_LOOKUP_SCHEMA.clone(), | ||
| stream, | ||
| partition, | ||
| &self.metrics, | ||
| ))) | ||
| } | ||
|
|
There was a problem hiding this comment.
🟡 Minor metric-coverage regression in MapIndexExec::execute (scalar_index.rs:421-440) and FlatMatchFilterExec::execute (fts.rs:809-815): the new outer wrapper is a plain RecordBatchStreamAdapter, so the setup awaits inside build_stream / build_filter_stream — scalar_index_fragment_bitmap + DatasetPreFilter::create_restricted_deletion_mask for MapIndex, and load_tokenizer/load_tokenizer_from_preset_segments for FlatMatchFilter — run before InstrumentedChildInputStream is constructed and are now untimed (previously they were polled inside an InstrumentedRecordBatchStreamAdapter). Practical impact is small (the work is one-shot per execute() and largely I/O), but it would be nice to either document the new semantics in the PR description or wrap setup with a small add_duration so EXPLAIN ANALYZE reflects cold-start cost.
Extended reasoning...
Trace
Before this PR — MapIndexExec::execute wrapped its stream in InstrumentedRecordBatchStreamAdapter. That adapter's poll_next runs a timer around polling the inner stream, so the first poll — which drove do_execute's setup awaits (scalar_index_fragment_bitmap + DatasetPreFilter::create_restricted_deletion_mask) — contributed those awaits' synchronous poll work to elapsed_compute. (That same wrapping is what caused the descendant-double-counting bug being fixed here.)
After this PR (scalar_index.rs:421-440) — the outer wrapper is plain RecordBatchStreamAdapter::new (no timer). build_stream is the async future inside stream::once(...).try_flatten(); its body awaits scalar_index_fragment_bitmap(...).await? (line 302) and deletion_mask_fut.await (line 312) before constructing InstrumentedChildInputStream (line 317). Once the helper exists, only in_flight.poll_next_unpin is timed (utils.rs:386-388). The setup poll work is never timed.
The same pattern appears in FlatMatchFilterExec::build_filter_stream (fts.rs:707-720): load_tokenizer / load_tokenizer_from_preset_segments is awaited before the helper is built, and the outer wrapper at fts.rs:809-815 is a plain RecordBatchStreamAdapter.
Step-by-step proof (MapIndexExec, cold cache)
- Caller does
exec.execute(0, ctx)— returns immediately with the boxedRecordBatchStreamAdapter(no work yet). - Caller polls the returned stream.
stream::once(stream_fut).try_flatten()pollsstream_fut=build_stream(...), which awaitsscalar_index_fragment_bitmap(...)— this drives object-storage reads of the scalar index manifest. Synchronous poll-time deserialization happens inside this future, while we sit in plainRecordBatchStreamAdapter, which has no timer.build_streamthen awaitsDatasetPreFilter::create_restricted_deletion_mask(...)— more reads, more poll-time work, still untimed.build_streamconstructsInstrumentedChildInputStreamand returns it; only now does the timer start.- Steps 3-4 are real wall-clock CPU time on the calling thread, but
metrics().elapsed_compute()will not reflect them.
Why existing code doesn't prevent it
InstrumentedChildInputStream::poll_next only starts its timer around in_flight.poll_next_unpin (utils.rs:386-388). Anything polled before the helper is constructed is invisible to that timer. The helper's docstring is explicit about this contract: 'elapsed_compute measures only the time spent driving the transform futures — never the time spent polling the child input.' That is consistent and intentional for the child-input axis, but it has the secondary effect of also excluding the setup awaits that happen earlier in the same outer future.
Impact
Bounded:
- Setup is one-shot per
execute(), not per-batch, so it amortizes away on long scans. BaselineMetricsonly captures synchronous-poll wall-clock anyway, not async-await wait time. Most setup is I/O —Pendingreturns were never counted, even before this PR. The lost time is just the CPU portion of poll calls during setup (deserialization, hash construction).- The PR's own measurements (FlatMatchFilter 98µs → 251ns) show that almost all of the previously-counted time was double-counted child compute, not setup. So the net change to
elapsed_computefor these nodes is overwhelmingly positive. - No correctness impact.
But it is a real, unannounced change in metric coverage for nodes the PR explicitly touches. On cold object storage where scalar-index manifest reads and tokenizer opens dominate first-batch latency, EXPLAIN ANALYZE will now show a node consuming wall-clock with near-zero elapsed_compute for the cold-start phase.
Addressing the refutations
The refutations correctly point out that:
- The helper's doc comment defines the new semantics.
- The author demonstrated awareness by adding a custom
elapsed_compute.add_duration(start.elapsed())for KNN'sspawn_blockingdistance kernel — they had a clear pattern available and chose not to apply it to setup. - The bug finder itself admits 'Whether this is intentional is unclear.'
- Impact is small.
All true, and that's why this is nit, not normal. But the helper's doc speaks only to the child-input axis — it does not say setup is intentionally excluded. The PR description focuses entirely on removing double-counting of descendant compute and doesn't mention that parent-node setup is also no longer counted. A user reading the PR would not learn this. Either a sentence in the PR description, a note in the helper's doc clarifying that setup-before-helper is also excluded, or wrapping setup with the same add_duration pattern used for KNN's spawn_blocking would resolve it.
Fix options (any one, in increasing order of effort)
- Document: add a sentence to the PR description and/or helper docstring noting that setup awaits before helper construction are also excluded.
- Time setup explicitly: wrap
scalar_index_fragment_bitmap+create_restricted_deletion_mask(andload_tokenizer) inlet start = Instant::now(); ... ; elapsed_compute.add_duration(start.elapsed());— same pattern used atknn.rs:283-289for thespawn_blockingdistance kernel. - Restructure: move setup into
MapIndexExec::new/FlatMatchFilterExec::new(or a lazyOnceCelllikeAddRowAddrExecdoes forrow_id_index) so it happens once per node, not per partition execute — sidesteps the timing question entirely.
There was a problem hiding this comment.
deferred as a Followup in the PR
Codecov Report❌ Patch coverage is 📢 Thoughts on this report? Let us know! |
a6c8ec7 to
868ace9
Compare
wjones127
left a comment
There was a problem hiding this comment.
This looks good! No blocking feedback, just a minor question. I will merge soon.
| /// `concurrency` caps how many transform futures may be in flight at once. | ||
| /// Use `1` for sequential transforms; larger values parallelize per-batch | ||
| /// work (e.g., KNN distance computation). |
There was a problem hiding this comment.
note: This is pre-existing, but the fact that our streams do multiple CPU-bound things concurrency is generally an anti-pattern. Ideally, our DataFusion nodes would achieve concurrency through partitioning: creating N independent streams. We haven't migrated to that, but I suspect we will in the future.
| #[test] | ||
| fn test_flat_match_filter_find_matches_large_utf8() { |
There was a problem hiding this comment.
question: this a drive-by change fixing coverage? Or was there a bug found?
There was a problem hiding this comment.
it was a coverage gap in the codecov report that got generated. It flagged the LargeUtf8 path as untested and it was working correctly before and now works correctly after we made the PR
21f8b71 to
cd4078b
Compare
Summary
Fixes #5155.
InstrumentedRecordBatchStreamAdaptermeasures a node'selapsed_computeby timing its outerpoll_next. For anExecutionPlannode with child inputs, that poll transitively polls every child — soEXPLAIN ANALYZEshows the parent's CPU asparent + child + grandchild + ..., and each ancestor double-counts its descendants.This PR fixes five nodes that have child inputs and were using the broken wrapper. The fix takes two shapes depending on the node:
Four nodes with a clean "child input -> per-batch transform -> output" shape are converted to use a new helper,
InstrumentedChildInputStream<F, Fut>(inrust/lance/src/io/exec/utils.rs), modeled on DataFusion'sFilterExecStream. The helper pulls from a child input without the timer running, then drives a per-batch async transform with the timer running.FlatMatchQueryExecdoesn't fit that shape — its work all happens insideflat_bm25_search_stream, which consumes the child input andspawn_cpus the per-batch tokenize/count work internally. For this node, the fix instrumentsflat_bm25_search_streamdirectly so it can report CPU time on a metric handle supplied by the caller.Nodes fixed
AddRowAddrExecrust/lance/src/io/exec/rowids.rsMapIndexExecrust/lance/src/io/exec/scalar_index.rsFlatMatchFilterExecrust/lance/src/io/exec/fts.rsKNNVectorDistanceExecrust/lance/src/io/exec/knn.rsInstant::now()aroundcompute_distance(...).awaitto capturespawn_blockingworkFlatMatchQueryExecrust/lance/src/io/exec/fts.rs+rust/lance-index/src/scalar/inverted/index.rsflat_bm25_search_stream_with_metricsthat records CPU on a suppliedTimehandleFor
KNNVectorDistanceExec, this also addresses the spawned-CPU undercount from #5155: the helper's timer doesn't measure work happening onspawn_blockingworker threads. KNN's transform closure wrapscompute_distance(...).awaitwithInstant::now()and adds the elapsed duration toelapsed_computefrom the caller side.compute_distance's public signature is unchanged.For
FlatMatchQueryExec, the analogous mechanism lives inlance-index:flat_bm25_search_stream_with_metricsaccepts anOption<Time>and records CPU around both thespawn_cpubody intokenize_and_count(phase 1) and the synchronousinitialize_scorer+flat_bm25_score(phase 2). The originalflat_bm25_search_streamis preserved as a#[deprecated]thin wrapper that delegates withNone— no breaking API change.Test plan
Unit tests
utils.rs::instrumented_child_input_stream_excludes_child_poll_time: a child stream sleeps 60ms perpoll_next; the transform sleeps 30ms. The helper'selapsed_computeshould be~3 × 30ms, not~3 × 90ms. Verified to fail on the pre-fix implementation.utils.rs::instrumented_child_input_stream_propagates_child_error: an error mid-stream from the child propagates through the helper without losing preceding OK batches.fts.rs::test_flat_match_filter_find_matches_large_utf8: exercises thei64-offset specialization offind_matches(production path for long text columns).lance-index/scalar/inverted/index.rs::flat_bm25_search_stream_with_metrics_records_elapsed_compute: runs_with_metricswithSome(time), assertstime.value() > 0after drain.Existing tests
All pass.
cargo fmt --all --checkandcargo clippy -p lance --no-depsare clean for changed files.End-to-end EXPLAIN ANALYZE before/after
1M-row synthetic dataset with BTREE + INVERTED + IVF_PQ indexes, comparing
elapsed_computebetween this branch's parent (pre-fix) and the branch tip:KNNVectorDistance(493 rows out)FlatMatchFilter(0 rows out)FlatMatchQuery(2.45K rows out)MatchQueryThe first two rows mirror the issue's described shape: heavy synchronous upstream inflating a downstream node's metric. The third row demonstrates the spawned-CPU half of the fix —
FlatMatchQuery's tokenize/score work was nearly invisible pre-fix and is now correctly attributed.