From 26c0a1372116ceee2ea21a5248e9ce2df7d11345 Mon Sep 17 00:00:00 2001
From: Jan Kaul
Date: Mon, 4 May 2026 21:23:44 -0700
Subject: [PATCH] sub memory pool

---
 .../concurrent_sort_unfairness.rs             | 511 +++++++++++++++++
 datafusion/core/tests/memory_limit/mod.rs     |   1 +
 datafusion/execution/src/memory_pool/mod.rs   |  10 +
 datafusion/execution/src/memory_pool/pool.rs  | 523 ++++++++++++++++++
 datafusion/physical-plan/src/sorts/sort.rs    |  95 +++-
 5 files changed, 1135 insertions(+), 5 deletions(-)
 create mode 100644 datafusion/core/tests/memory_limit/concurrent_sort_unfairness.rs

diff --git a/datafusion/core/tests/memory_limit/concurrent_sort_unfairness.rs b/datafusion/core/tests/memory_limit/concurrent_sort_unfairness.rs
new file mode 100644
index 0000000000000..3de84b01e7ef6
--- /dev/null
+++ b/datafusion/core/tests/memory_limit/concurrent_sort_unfairness.rs
@@ -0,0 +1,511 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! End-to-end reproducers + regression tests for `ExternalSorter`'s
+//! per-operator `SubPool` wrapping (the sub-pool is built in
+//! `SortExec::execute` and passed into `ExternalSorter::new`, sort.rs).
+//!
+//! `SubPool` (`pool.rs`) wraps the runtime's pool and aggregates a single
+//! operator's reservations into one outer slot, applying fair-share
+//! semantics internally. The behaviour the user sees depends on the
+//! parent pool the runtime is configured with:
+//!
+//! * **Greedy parent**: each sub-pool's outer is bounded only by the
+//!   parent's hard limit and what other sub-pools currently hold. In a
+//!   chain of N blocking sorts, the bottom one runs alone in the pool
+//!   with cap == full pool, and never spills —
+//!   `test_chain_of_5_sortexecs_no_longer_caps_bottom_with_greedy_parent`
+//!   asserts this. SMJ-style two-sorter setups behave the same way —
+//!   `test_smj_style_both_sorts_now_run_alone_in_parent_pool` asserts it.
+//! * **FairSpillPool parent**: each sub-pool counts as one spillable
+//!   consumer at the parent. The eager outer registration means
+//!   `num_spill == N` at the parent for a chain of N sub-pools, so each
+//!   operator's outer is capped at `parent_pool / N`. Sub-pools alone do
+//!   not fix this — `test_chain_of_5_sortexecs_still_capped_with_fairspill_parent`
+//!   documents the residual gap (closed by a planned Design A follow-up
+//!   that adds active-set tracking to `FairSpillPool`).
+//! * **Cross-query / multi-tenant late arriver**: out of scope of the
+//!   sub-pool change — `test_late_arriving_sort_oversubscribes_pool`
+//!   documents that the multi-tenant oversubscription is not addressed
+//!   here.
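+//!
+//! A minimal sketch of the wiring these tests exercise (the operator and
+//! consumer names are illustrative; `sub_pool` and the consumer APIs are
+//! the ones this patch adds/uses in `pool.rs`):
+//!
+//! ```rust,ignore
+//! use std::sync::Arc;
+//! use datafusion_execution::memory_pool::{
+//!     GreedyMemoryPool, MemoryConsumer, MemoryPool, sub_pool,
+//! };
+//!
+//! // One sub-pool per operator: however many consumers the operator
+//! // registers internally, the parent pool sees a single outer slot.
+//! let parent: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1_000));
+//! let sub = sub_pool(&parent, "MyOperator:subpool");
+//! let buf = MemoryConsumer::new("buffer").with_can_spill(true).register(&sub);
+//! buf.try_grow(100).unwrap(); // parent.reserved() == 100, via the outer slot
+//! ```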
+
+use std::sync::Arc;
+
+use arrow::array::{Int64Array, RecordBatch};
+use arrow::compute::SortOptions;
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use datafusion::datasource::memory::MemorySourceConfig;
+use datafusion_common_runtime::SpawnedTask;
+use datafusion_execution::TaskContext;
+use datafusion_execution::config::SessionConfig;
+use datafusion_execution::memory_pool::{FairSpillPool, GreedyMemoryPool, MemoryPool};
+use datafusion_execution::runtime_env::RuntimeEnvBuilder;
+use datafusion_physical_expr::expressions::col;
+use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
+use datafusion_physical_plan::ExecutionPlan;
+use datafusion_physical_plan::sorts::sort::SortExec;
+use futures::StreamExt;
+
+const NUM_BATCHES: usize = 100;
+const ROWS_PER_BATCH: usize = 256;
+
+fn schema() -> SchemaRef {
+    Arc::new(Schema::new(vec![
+        Field::new("key", DataType::Int64, false),
+        Field::new("val", DataType::Int64, false),
+    ]))
+}
+
+fn batches(seed: i64) -> Vec<RecordBatch> {
+    // Reverse-sorted keys so the sort is non-trivial; values keep batches
+    // wide enough that the per-batch reservation is non-negligible.
+    (0..NUM_BATCHES)
+        .map(|i| {
+            let base = seed * (NUM_BATCHES * ROWS_PER_BATCH) as i64
+                + (i * ROWS_PER_BATCH) as i64;
+            let keys: Vec<i64> =
+                (0..ROWS_PER_BATCH as i64).rev().map(|k| base + k).collect();
+            let vals: Vec<i64> = (0..ROWS_PER_BATCH as i64).collect();
+            RecordBatch::try_new(
+                schema(),
+                vec![
+                    Arc::new(Int64Array::from(keys)),
+                    Arc::new(Int64Array::from(vals)),
+                ],
+            )
+            .unwrap()
+        })
+        .collect()
+}
+
+fn build_sort_exec() -> Arc<SortExec> {
+    let input = MemorySourceConfig::try_new_exec(&[batches(0)], schema(), None).unwrap();
+    let sort_expr = LexOrdering::new(vec![PhysicalSortExpr {
+        expr: col("key", &schema()).unwrap(),
+        options: SortOptions::default(),
+    }])
+    .unwrap();
+    Arc::new(SortExec::new(sort_expr, input))
+}
+
+fn build_task_ctx(pool: Arc<dyn MemoryPool>) -> Arc<TaskContext> {
+    let session_config = SessionConfig::new()
+        .with_batch_size(ROWS_PER_BATCH)
+        // Zero out the merge reservation so the only thing competing for the
+        // pool is each sorter's spillable buffered-batches reservation; this
+        // makes the asymmetry attributable to the fair-share check alone.
+        .with_sort_spill_reservation_bytes(0);
+    let runtime = RuntimeEnvBuilder::new()
+        .with_memory_pool(pool)
+        .build_arc()
+        .unwrap();
+    Arc::new(
+        TaskContext::default()
+            .with_session_config(session_config)
+            .with_runtime(runtime),
+    )
+}
+
+/// **Cross-query / multi-tenant scenario.** Two `SortExec`s share one
+/// `FairSpillPool`, but their `execute()` calls happen at different
+/// times — modelling two unrelated queries that happen to share a pool.
+/// Sort #0's `execute()` is called first and we drive its input phase to
+/// completion before sort #1 even registers, so sort #0 ramps up under
+/// `num_spill == 1` (cap == full pool). When sort #1 finally registers,
+/// `num_spill == 2`, the cap collapses to `pool_size / 2`, and sort #1 is
+/// forced to spill while sort #0 is grandfathered in.
+///
+/// Note: this scenario is *not* fixed by `SubPool` — sub-pools eagerly
+/// register their outer with the parent at `execute()` time, but in this
+/// test sort #1's `execute()` is not called until phase 2, so its sub-pool
+/// (and therefore its outer registration) doesn't exist when sort #0
+/// grows. Closing this gap requires `FairSpillPool` itself to gain
+/// active-set tracking + a hard pool-cap check (planned Design A
+/// follow-up).
+/// The test is kept here as a regression marker for that remaining work.
+#[tokio::test]
+async fn test_late_arriving_sort_oversubscribes_pool() {
+    // ~100 batches × 256 rows × 16 bytes/row = ~410 KB raw, ~820 KB reserved
+    // (the sorter reserves ~2× the raw batch size — see
+    // `get_reserved_bytes_for_record_batch_size`). The pool is sized so one
+    // sort's working set fits while alone, but the late arriver — capped at
+    // `pool_size / 2` — is forced to spill.
+    let pool_size = 1_000_000;
+    let pool: Arc<dyn MemoryPool> = Arc::new(FairSpillPool::new(pool_size));
+    let ctx = build_task_ctx(Arc::clone(&pool));
+
+    let sort0 = build_sort_exec();
+    let sort1 = build_sort_exec();
+
+    // --- Phase 1: drive sort #0 alone in the pool ----------------------------
+    // SortExec is "blocking": the first poll on its output stream consumes the
+    // entire input and starts emitting sorted batches. While we have not yet
+    // touched sort #1, sort #0 sees `num_spill == 1` and is allowed to grow
+    // up to the full pool size.
+    let mut stream0 = sort0.execute(0, Arc::clone(&ctx)).unwrap();
+    let _first0 = stream0
+        .next()
+        .await
+        .expect("sort0 should yield a first batch")
+        .expect("sort0 first batch ok");
+
+    // sort #0 is now mid-emit and still holds its spillable `MemoryConsumer`
+    // (registered with `can_spill = true`). sort_spill_reservation_bytes == 0,
+    // so the only thing competing for the pool is each sorter's
+    // buffered-batches reservation.
+    let sort0_spills_alone = sort0
+        .metrics()
+        .unwrap()
+        .spill_count()
+        .expect("spill metric present");
+
+    // --- Phase 2: drive sort #1 to completion while sort #0 is still alive ---
+    // This is the moment the FairSpillPool re-divides: as soon as sort #1's
+    // `ExternalSorter` registers, `num_spill` jumps to 2 and the per-reservation
+    // cap drops to `pool_size / 2`.
+    let mut stream1 = sort1.execute(0, Arc::clone(&ctx)).unwrap();
+    while let Some(batch) = stream1.next().await {
+        batch.expect("sort1 batch ok");
+    }
+
+    let sort1_spills = sort1
+        .metrics()
+        .unwrap()
+        .spill_count()
+        .expect("spill metric present");
+
+    // Drain sort #0 so it cleanly releases its reservations.
+    while let Some(batch) = stream0.next().await {
+        batch.expect("sort0 trailing batch ok");
+    }
+    let sort0_spills_total = sort0
+        .metrics()
+        .unwrap()
+        .spill_count()
+        .expect("spill metric present");
+
+    // The unfairness, made visible:
+    //
+    // sort #0 entered the pool first, was allowed the whole pool while
+    // alone, and never had to spill. sort #1 entered later, was capped at
+    // half the pool, and was forced to spill. They process equally-shaped
+    // inputs.
+    assert_eq!(
+        sort0_spills_alone, 0,
+        "sort #0, alone in the pool, should not spill"
+    );
+    assert_eq!(
+        sort0_spills_total, 0,
+        "sort #0 keeps its surplus throughout — never asked to give back"
+    );
+    assert!(
+        sort1_spills > 0,
+        "sort #1, capped at pool_size/2 by FairSpillPool, is forced to spill \
+         even though equally-shaped sort #0 was not (sort1_spills={sort1_spills})"
+    );
+}
+
+/// **SMJ-style with the new `SubPool` wrapping + Greedy parent.**
+///
+/// `ExternalSorter::new` (sort.rs) registers its consumers against a
+/// per-operator `SubPool` (pool.rs) that wraps the runtime's pool. With a
+/// `GreedyMemoryPool` parent there is no `num_spill` cap, so each sub-pool's
+/// outer is bounded only by the parent's hard limit and by what other
+/// sub-pools currently hold.
+/// SMJ-style two-sorter setups therefore become fair: whichever side is
+/// polled first uses the full parent pool for its input phase, then
+/// releases everything before the second side starts.
+///
+/// Before the change, this configuration produced asymmetric spills
+/// (first-pulled sort capped at `pool/2`, forced to spill; second sort got
+/// `pool/1` because the first's spillable consumer unregistered at the
+/// merge transition — see `ExternalSorter::sort()` at sort.rs:345-378).
+/// With sub-pools + Greedy parent, both sides spill 0 times.
+#[tokio::test]
+async fn test_smj_style_both_sorts_now_run_alone_in_parent_pool() {
+    // Pool is sized to comfortably fit both sides concurrently
+    // (~820 KB each for the 100-batch input, so ~1.7 MB combined). With
+    // sub-pools + Greedy parent there is no artificial pool/2 cap, so
+    // both sides can grow in parallel without spilling.
+    let pool_size = 2_000_000;
+    let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(pool_size));
+    let ctx = build_task_ctx(Arc::clone(&pool));
+
+    let sort0 = build_sort_exec();
+    let sort1 = build_sort_exec();
+
+    // Mirror SortMergeJoinExec::execute: call both children's execute()
+    // back-to-back. Each `SortExec` builds its sub-pool and registers it
+    // with the parent eagerly at `execute()` time, but the sub-pools'
+    // outer reservations start empty, so neither side holds memory until
+    // polled.
+    let mut stream0 = sort0.execute(0, Arc::clone(&ctx)).unwrap();
+    let mut stream1 = sort1.execute(0, Arc::clone(&ctx)).unwrap();
+
+    // Drain both streams interleaved, keeping both alive throughout.
+    let mut s0_done = false;
+    let mut s1_done = false;
+    while !(s0_done && s1_done) {
+        if !s0_done {
+            match stream0.next().await {
+                Some(batch) => {
+                    batch.expect("sort0 batch ok");
+                }
+                None => s0_done = true,
+            }
+        }
+        if !s1_done {
+            match stream1.next().await {
+                Some(batch) => {
+                    batch.expect("sort1 batch ok");
+                }
+                None => s1_done = true,
+            }
+        }
+    }
+
+    let sort0_spills = sort0
+        .metrics()
+        .unwrap()
+        .spill_count()
+        .expect("spill metric present");
+    let sort1_spills = sort1
+        .metrics()
+        .unwrap()
+        .spill_count()
+        .expect("spill metric present");
+
+    // Symmetric: with a Greedy parent the first sort's sub-pool can grow
+    // to the full pool while it works, releases on completion, then the
+    // second sort's sub-pool can do the same. Neither needs to spill.
+    assert_eq!(
+        sort0_spills, 0,
+        "sort0's sub-pool can claim full parent pool — should not spill \
+         (sort0_spills={sort0_spills})"
+    );
+    assert_eq!(
+        sort1_spills, 0,
+        "sort1's sub-pool can claim full parent pool — should not spill \
+         (sort1_spills={sort1_spills})"
+    );
+}
+
+/// Build a chain of `n` `SortExec`s on top of a fresh in-memory source.
+/// Returns `(top_plan, sorts)` where `sorts[0]` is the bottom-most and
+/// `sorts[n-1]` is the top-most.
+fn build_sort_chain(n: usize) -> (Arc<dyn ExecutionPlan>, Vec<Arc<SortExec>>) {
+    let source = MemorySourceConfig::try_new_exec(&[batches(0)], schema(), None).unwrap();
+    let mut current: Arc<dyn ExecutionPlan> = source;
+    let mut sorts: Vec<Arc<SortExec>> = Vec::with_capacity(n);
+    for level in 0..n {
+        // Alternate sort column so each level does real work even though
+        // the data shape is preserved.
+        let sort_col = if level % 2 == 0 { "key" } else { "val" };
+        let sort_expr = LexOrdering::new(vec![PhysicalSortExpr {
+            expr: col(sort_col, &schema()).unwrap(),
+            options: SortOptions::default(),
+        }])
+        .unwrap();
+        let sort = Arc::new(SortExec::new(sort_expr, current));
+        sorts.push(Arc::clone(&sort));
+        current = sort;
+    }
+    (current, sorts)
+}
+
+/// **Chain of N blocking sorts with the new `SubPool` wrapping + Greedy
+/// parent.** The recursive `execute()` chain still registers all N
+/// `ExternalSorter`s up-front, but each registers against its own
+/// per-operator `SubPool` (built in `SortExec::execute`, sort.rs). With a
+/// `GreedyMemoryPool` parent, the parent has no `num_spill` cap; each
+/// sub-pool's outer competes only on the parent's hard limit. `SortExec`
+/// is blocking, so only one level is actually using memory at a time —
+/// when the bottom level holds memory, the other N-1 sub-pools have
+/// `outer.size() == 0`, and the bottom can grow up to the full pool.
+///
+/// Before the change (pre-`SubPool`), this configuration produced spill
+/// counts `[3, 3, 2, 2, 0]` from bottom to top: each level's spillable
+/// consumer was capped at `pool/k` and had to spill. With sub-pools +
+/// Greedy parent, every level runs alone in the parent pool and the
+/// counts collapse to `[0, 0, 0, 0, 0]`.
+#[tokio::test]
+async fn test_chain_of_5_sortexecs_no_longer_caps_bottom_with_greedy_parent() {
+    const N: usize = 5;
+
+    // Pool sized so a single sort's working set (~820 KB reserved) easily
+    // fits when alone. With a Greedy parent each sub-pool effectively gets
+    // the whole pool while it's the only one growing.
+    let pool_size = 1_400_000;
+    let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(pool_size));
+    let ctx = build_task_ctx(Arc::clone(&pool));
+
+    let (top_plan, sorts) = build_sort_chain(N);
+
+    let mut top_stream = top_plan.execute(0, Arc::clone(&ctx)).unwrap();
+    while let Some(batch) = top_stream.next().await {
+        batch.expect("top sort batch ok");
+    }
+
+    let spills: Vec<usize> = sorts
+        .iter()
+        .map(|s| s.metrics().unwrap().spill_count().expect("spill metric"))
+        .collect();
+
+    assert_eq!(
+        spills,
+        vec![0_usize; N],
+        "with sub-pools + Greedy parent, every level runs alone in the \
+         parent pool and none should spill: spills={spills:?}"
+    );
+}
+
+/// **Chain of N blocking sorts with the new `SubPool` wrapping +
+/// `FairSpillPool` parent — regression marker for the residual gap.**
+///
+/// Each `ExternalSorter`'s sub-pool registers an outer reservation against
+/// the parent eagerly with `can_spill = true`. With `FairSpillPool` as
+/// parent, that means `num_spill == N` at the parent for the whole life
+/// of the chain, so each sub-pool's outer is capped at `pool/N`. This is
+/// not fixed by sub-pools alone — closing it requires `FairSpillPool` to
+/// gain active-set tracking (count only sub-pools whose outer currently
+/// holds bytes), which is a separate planned PR (Design A).
+///
+/// The spill counts here are uniform (every level hits the same
+/// `pool/N` cap) rather than the pre-change `[3, 3, 2, 2, 0]` ratchet,
+/// because sub-pools no longer unregister at the input→merge transition
+/// of a spilled sort — the sub-pool's outer stays alive for the
+/// operator's full lifetime.
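+///
+/// Back-of-envelope for the numbers used below (illustrative, not an exact
+/// assertion): `pool_size = 1_400_000` and `N = 5` cap each sub-pool's
+/// outer at 1_400_000 / 5 = 280_000 bytes, well under the ~820 KB a single
+/// sort's input phase reserves, so every level is pushed into spilling.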
+#[tokio::test]
+async fn test_chain_of_5_sortexecs_still_capped_with_fairspill_parent() {
+    const N: usize = 5;
+
+    let pool_size = 1_400_000;
+    let pool: Arc<dyn MemoryPool> = Arc::new(FairSpillPool::new(pool_size));
+    let ctx = build_task_ctx(Arc::clone(&pool));
+
+    let (top_plan, sorts) = build_sort_chain(N);
+
+    let mut top_stream = top_plan.execute(0, Arc::clone(&ctx)).unwrap();
+    while let Some(batch) = top_stream.next().await {
+        batch.expect("top sort batch ok");
+    }
+
+    let spills: Vec<usize> = sorts
+        .iter()
+        .map(|s| s.metrics().unwrap().spill_count().expect("spill metric"))
+        .collect();
+
+    // Every level runs under the same pool/N cap, so every level spills.
+    // The bottom-most ratchet seen pre-change is gone (sub-pools don't
+    // unregister on spill), but the absolute cap is still pool/N.
+    assert!(
+        spills.iter().all(|&s| s > 0),
+        "with FairSpillPool parent, every sub-pool is capped at pool/N \
+         and every level should spill: spills={spills:?}"
+    );
+}
+
+/// **All partitions of one `SortExec` share a single sub-pool.** This is
+/// the headline case: a single `SortExec` node with N partitions running
+/// concurrently registers *one* sub-pool with the parent, and that
+/// sub-pool's internal `FairSpillPool`-style fair-share divides its
+/// capacity evenly across the N partitions.
+///
+/// Before this change, each partition's `ExternalSorter` built its own
+/// sub-pool, so a SortExec with 16 parallel partitions presented as 16
+/// sub-pool slots at the parent (`num_spill == 16` even with a single
+/// SortExec). With a Greedy parent that meant first-come-first-served
+/// among 16 hungry consumers — early arrivers hogged, late arrivers
+/// OOM'd even after spilling.
+///
+/// After this change, the parent sees one outer slot for the whole
+/// SortExec; the sub-pool's internal check enforces `cap / num_spill`
+/// per partition, so all partitions get equal headroom and progress
+/// together.
+#[tokio::test]
+async fn test_sortexec_partitions_share_one_subpool() {
+    const N_PARTITIONS: usize = 4;
+
+    // Build N independent partitions of input.
+    let partition_inputs: Vec<Vec<RecordBatch>> =
+        (0..N_PARTITIONS).map(|p| batches(p as i64)).collect();
+    let input =
+        MemorySourceConfig::try_new_exec(&partition_inputs, schema(), None).unwrap();
+
+    // SortExec preserves input partitioning, so each partition is sorted
+    // independently in its own ExternalSorter.
+    let sort_expr = LexOrdering::new(vec![PhysicalSortExpr {
+        expr: col("key", &schema()).unwrap(),
+        options: SortOptions::default(),
+    }])
+    .unwrap();
+    let sort: Arc<dyn ExecutionPlan> =
+        Arc::new(SortExec::new(sort_expr, input).with_preserve_partitioning(true));
+
+    // Pool sized so the four partitions' combined working set (~820 KB
+    // reserved each, ~3.3 MB total) cannot fit at once; each partition's
+    // fair share is `pool / N_PARTITIONS == 250 KB`, well under its
+    // working set. The point of the test is that sub-pool sharing makes
+    // the budget consistent across partitions: the operator spills under
+    // pressure, yet every partition still completes without an OOM error.
+    let pool_size = 1_000_000;
+    let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(pool_size));
+    let ctx = build_task_ctx(Arc::clone(&pool));
+
+    // Eagerly call `execute()` for *every* partition before draining any
+    // stream — this is what the coordinating parent operator (e.g.
+    // `CoalescePartitionsExec`) does, and it is what makes all
+    // partitions' `ExternalSorter`s register their `MemoryConsumer`s
+    // against the shared sub-pool before any data flows.
+    // Without this step the partitions would register lazily, each
+    // appearing alone in the sub-pool and each getting the full capacity
+    // in turn.
+    let streams: Vec<_> = (0..N_PARTITIONS)
+        .map(|p| sort.execute(p, Arc::clone(&ctx)).unwrap())
+        .collect();
+
+    let mut handles = Vec::with_capacity(N_PARTITIONS);
+    for mut stream in streams {
+        handles.push(SpawnedTask::spawn(async move {
+            let mut count = 0;
+            while let Some(batch) = stream.next().await {
+                count += batch.expect("batch ok").num_rows();
+            }
+            count
+        }));
+    }
+    for h in handles {
+        let n = h.join().await.expect("partition task ok");
+        assert_eq!(n, NUM_BATCHES * ROWS_PER_BATCH, "all rows accounted for");
+    }
+
+    // Spill counts per partition should be roughly equal, because all
+    // partitions register against the same sub-pool, and the sub-pool's
+    // internal fair-share divides capacity by num_spill (== N_PARTITIONS
+    // while they all hold memory). Without sub-pool sharing, only one
+    // partition would have spilled wildly while others could complete
+    // without spilling — that asymmetry is what the previous design
+    // exhibited.
+    let metrics = sort.metrics().unwrap();
+    let total_spills = metrics.spill_count().unwrap_or(0);
+
+    // Every partition contributed to total_spills via the same operator
+    // metric. We can't easily get per-partition counts here, but the
+    // total should be > 0 (the pool is much smaller than the combined
+    // input across N_PARTITIONS) and the SortExec should have completed
+    // without any OOM error — that *is* the regression: before sub-pool
+    // sharing, a partition could fail with "Resources exhausted" even
+    // after spilling, because the other partitions kept their memory.
+    assert!(
+        total_spills > 0,
+        "tight pool should force spilling: total_spills={total_spills}"
+    );
+}
diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs
index 90459960c5561..25f616c4fac85 100644
--- a/datafusion/core/tests/memory_limit/mod.rs
+++ b/datafusion/core/tests/memory_limit/mod.rs
@@ -20,6 +20,7 @@
 use std::num::NonZeroUsize;
 use std::sync::{Arc, LazyLock};
 
+mod concurrent_sort_unfairness;
 #[cfg(feature = "extended_tests")]
 mod memory_limit_validation;
 mod repartition_mem_limit;
diff --git a/datafusion/execution/src/memory_pool/mod.rs b/datafusion/execution/src/memory_pool/mod.rs
index 829e313d2381e..8b729ed3cef70 100644
--- a/datafusion/execution/src/memory_pool/mod.rs
+++ b/datafusion/execution/src/memory_pool/mod.rs
@@ -182,6 +182,16 @@ pub use pool::*;
 ///
 /// * [`TrackConsumersPool`]: Wraps another [`MemoryPool`] and tracks consumers,
 ///   providing better error messages on the largest memory users.
+///
+/// * [`SubPool`]: Wraps another [`MemoryPool`] and aggregates a single
+///   blocking operator's reservations into one outer slot on the parent,
+///   while applying [`FairSpillPool`]-style fair-share semantics among
+///   that operator's own reservations. Used by `ExternalSorter` so the
+///   parent pool sees one consumer per operator instead of one per
+///   internal reservation. The recommended parent for chains of blocking
+///   spillable operators is [`GreedyMemoryPool`], which lets each
+///   sub-pool grow into the full parent pool when it is the only one
+///   currently using memory.
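+///
+/// A sketch of the fair-share arithmetic `SubPool` applies internally
+/// (illustrative numbers, mirroring the `SubPool` tests in `pool.rs`):
+///
+/// ```text
+/// parent limit             = 1000 B
+/// unspillable reservations =  200 B
+/// spillable reservations   =    2
+/// per-reservation cap      = (1000 - 200) / 2 = 400 B
+/// ```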
 pub trait MemoryPool: Send + Sync + std::fmt::Debug + Display {
     /// Return pool name
     fn name(&self) -> &str;
diff --git a/datafusion/execution/src/memory_pool/pool.rs b/datafusion/execution/src/memory_pool/pool.rs
index aac95b9d6a81f..be6bd11c8c1a8 100644
--- a/datafusion/execution/src/memory_pool/pool.rs
+++ b/datafusion/execution/src/memory_pool/pool.rs
@@ -25,6 +25,7 @@
 use parking_lot::Mutex;
 use std::fmt::{Display, Formatter};
 use std::{
     num::NonZeroUsize,
+    sync::Arc,
     sync::atomic::{AtomicUsize, Ordering},
 };
@@ -197,6 +198,12 @@ impl FairSpillPool {
             }),
         }
     }
+
+    /// Number of currently registered spillable consumers. Test-only.
+    #[cfg(test)]
+    pub fn num_spill(&self) -> usize {
+        self.state.lock().num_spill
+    }
 }
 
 impl MemoryPool for FairSpillPool {
@@ -296,6 +303,232 @@ impl Display for FairSpillPool {
     }
 }
 
+/// A [`MemoryPool`] that aggregates a single operator's allocations into one
+/// outer reservation against a parent pool, while applying
+/// [`FairSpillPool`]-equivalent fair-share semantics among the operator's
+/// own reservations.
+///
+/// `SubPool` is intended to be created per blocking operator (e.g. once per
+/// `SortExec` node, shared by its partitions) inside the operator's
+/// `execute()` path. The operator then registers all of its
+/// [`MemoryConsumer`]s against the sub-pool instead of against the
+/// runtime's pool. Two things follow:
+///
+/// 1. The parent pool sees one consumer per operator (the sub-pool's
+///    eagerly-registered outer reservation), regardless of how many
+///    internal reservations the operator splits its accounting into.
+/// 2. Within the sub-pool, multiple reservations are divided fairly using
+///    the same rule as [`FairSpillPool`], with the sub-pool's effective
+///    "pool size" computed dynamically from what the parent pool has
+///    available right now (`parent.memory_limit() - parent.reserved() +
+///    self.outer.size()`).
+///
+/// # Composition with the parent pool
+///
+/// `SubPool`'s outer reservation is registered with `can_spill = true`, so
+/// from the parent's point of view a sub-pool is one ordinary spillable
+/// participant. The parent's own policy still applies to the outer slot.
+/// In particular:
+///
+/// * **[`GreedyMemoryPool`] parent**: no `num_spill` cap at the parent.
+///   Each sub-pool's outer is bounded only by the parent's hard limit and
+///   what other sub-pools currently hold. Recommended for plans that chain
+///   multiple blocking spillable operators back-to-back, where only one is
+///   actually using memory at a time.
+/// * **[`FairSpillPool`] parent**: each sub-pool counts as one spillable
+///   consumer at the parent, so M coexisting sub-pools each cap their
+///   outer at `parent_pool / M`. The sub-pool's *internal* fair-share is
+///   the only new benefit over registering directly with the parent.
+#[derive(Debug)]
+pub struct SubPool {
+    parent: Arc<dyn MemoryPool>,
+    /// Outer reservation against `parent`. Created eagerly in [`Self::new`]
+    /// and held for the sub-pool's lifetime; it aggregates every internal
+    /// reservation's bytes into one parent slot. Dropped (and the parent
+    /// consumer unregistered) when the sub-pool itself is dropped.
+    outer: MemoryReservation,
+    state: Mutex<SubPoolState>,
+}
+
+#[derive(Debug)]
+struct SubPoolState {
+    /// Number of spillable consumers registered against this sub-pool.
+    num_spill: usize,
+    /// Total bytes reserved by spillable consumers of this sub-pool.
+    spillable: usize,
+    /// Total bytes reserved by unspillable consumers of this sub-pool.
+    unspillable: usize,
+}
+
+impl SubPool {
+    /// Create a new `SubPool` over `parent`. Eagerly registers an outer
+    /// [`MemoryConsumer`] (`can_spill = true`) named `outer_name` against
+    /// the parent pool; that consumer is unregistered when the returned
+    /// `SubPool` is dropped.
+    ///
+    /// `outer_name` should identify the operator (e.g.
+    /// `format!("ExternalSorter[{partition_id}]:subpool")`).
+    pub fn new(parent: Arc<dyn MemoryPool>, outer_name: impl Into<String>) -> Self {
+        let outer = MemoryConsumer::new(outer_name)
+            .with_can_spill(true)
+            .register(&parent);
+        Self {
+            parent,
+            outer,
+            state: Mutex::new(SubPoolState {
+                num_spill: 0,
+                spillable: 0,
+                unspillable: 0,
+            }),
+        }
+    }
+
+    /// Maximum bytes this sub-pool could currently reserve from the parent
+    /// — the bound used by the internal fair-share calculation. Computed
+    /// as `parent.memory_limit() - parent.reserved() + self.outer.size()`
+    /// (the `outer.size()` term re-includes what we already hold so we
+    /// don't punish ourselves twice). Falls back to [`usize::MAX`] when
+    /// the parent reports `Infinite` or `Unknown`.
+    fn capacity(&self) -> usize {
+        match self.parent.memory_limit() {
+            MemoryLimit::Finite(limit) => limit
+                .saturating_sub(self.parent.reserved())
+                .saturating_add(self.outer.size()),
+            MemoryLimit::Infinite | MemoryLimit::Unknown => usize::MAX,
+        }
+    }
+}
+
+/// Convenience: build a [`SubPool`] over `parent` and return it as
+/// `Arc<dyn MemoryPool>`, ready to pass to [`MemoryConsumer::register`].
+pub fn sub_pool(
+    parent: &Arc<dyn MemoryPool>,
+    outer_name: impl Into<String>,
+) -> Arc<dyn MemoryPool> {
+    Arc::new(SubPool::new(Arc::clone(parent), outer_name))
+}
+
+impl MemoryPool for SubPool {
+    fn name(&self) -> &str {
+        "sub_pool"
+    }
+
+    fn register(&self, consumer: &MemoryConsumer) {
+        if consumer.can_spill {
+            self.state.lock().num_spill += 1;
+        }
+    }
+
+    fn unregister(&self, consumer: &MemoryConsumer) {
+        if consumer.can_spill {
+            let mut state = self.state.lock();
+            state.num_spill = state.num_spill.checked_sub(1).unwrap();
+        }
+    }
+
+    fn grow(&self, reservation: &MemoryReservation, additional: usize) {
+        let mut state = self.state.lock();
+        self.outer.grow(additional);
+        match reservation.registration.consumer.can_spill {
+            true => state.spillable += additional,
+            false => state.unspillable += additional,
+        }
+    }
+
+    fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
+        let mut state = self.state.lock();
+        self.outer.shrink(shrink);
+        match reservation.registration.consumer.can_spill {
+            true => state.spillable -= shrink,
+            false => state.unspillable -= shrink,
+        }
+    }
+
+    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
+        let mut state = self.state.lock();
+
+        // Wrap any error with the inner reservation's name and the bytes
+        // requested, so the parent's diagnostic (e.g. `TrackConsumersPool`'s
+        // top-consumers list) doesn't lose track of which inner consumer
+        // triggered the failure. The parent only sees the sub-pool's
+        // outer name.
+        let inner_context = || {
+            format!(
+                "Failed to allocate additional {} for {} via sub-pool",
+                human_readable_size(additional),
+                reservation.consumer().name(),
+            )
+        };
+
+        match reservation.registration.consumer.can_spill {
+            true => {
+                // Apply FairSpillPool-style per-reservation cap *inside*
+                // the sub-pool, so that if the operator has multiple
+                // spillable reservations they share the sub-pool's
+                // headroom equally. With only one spillable reservation
+                // the check reduces to the parent's.
+                let cap = self.capacity();
+                let spill_available = cap.saturating_sub(state.unspillable);
+                let available = spill_available
+                    .checked_div(state.num_spill)
+                    .unwrap_or(spill_available);
+
+                if reservation.size() + additional > available {
+                    return Err(insufficient_capacity_err(
+                        reservation,
+                        additional,
+                        available,
+                        self,
+                    ));
+                }
+                self.outer
+                    .try_grow(additional)
+                    .map_err(|e| e.context(inner_context()))?;
+                state.spillable += additional;
+            }
+            false => {
+                // Unspillable allocations are first-come-first-served and,
+                // because the sub-pool aggregates into a single outer slot,
+                // the SubPool's local check would be identical in outcome
+                // to the parent's. Defer to the parent so any wrapping
+                // pool (e.g. `TrackConsumersPool`) gets to emit its
+                // top-consumers diagnostic on failure.
+                self.outer
+                    .try_grow(additional)
+                    .map_err(|e| e.context(inner_context()))?;
+                state.unspillable += additional;
+            }
+        }
+        Ok(())
+    }
+
+    fn reserved(&self) -> usize {
+        let state = self.state.lock();
+        state.spillable + state.unspillable
+    }
+
+    fn memory_limit(&self) -> MemoryLimit {
+        self.parent.memory_limit()
+    }
+}
+
+impl Display for SubPool {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        // NB: do not lock `self.state` here. `Display::fmt` is called by
+        // `insufficient_capacity_err` from inside `SubPool::try_grow`,
+        // which is already holding `self.state` — re-locking would
+        // deadlock. `outer.size()` is atomic and lock-free, which is
+        // sufficient for an error message.
+        write!(
+            f,
+            "{}(outer: {}, reserved: {}, parent: {})",
+            &self.name(),
+            self.outer.consumer().name(),
+            human_readable_size(self.outer.size()),
+            self.parent,
+        )
+    }
+}
+
 /// Constructs a resources error based upon the individual [`MemoryReservation`].
 ///
 /// The error references the `bytes already allocated` for the reservation,
@@ -694,6 +927,296 @@ mod tests {
         assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 30.0 B for s4 with 0.0 B already allocated for this reservation - 20.0 B remain available for the total memory pool: fair(pool_size: 100.0 B)");
     }
 
+    /// Reproducer for unfair allocation in `FairSpillPool` when two spillable
+    /// operators join the pool at different times. The consumer names mirror
+    /// what `ExternalSorter` (used by `SortExec`, including the sorts feeding a
+    /// `SortMergeJoinExec`) actually registers in `sort.rs:284-290`:
+    /// * `ExternalSorter[N]` — `can_spill = true` (the buffered batches)
+    /// * `ExternalSorterMerge[N]` — `can_spill = false` (the merge buffer)
+    ///
+    /// The "fair share" check in `FairSpillPool::try_grow` is:
+    ///   `if reservation.size() + additional > pool_size / num_spill { err }`
+    ///
+    /// It is purely a per-reservation cap: it never asks "is the *pool* full?".
+    /// As a result, a consumer that arrives late can be granted its fair
+    /// share even when an earlier consumer is already holding the whole pool,
+    /// and the pool ends up oversubscribed.
+    #[test]
+    fn test_fair_spill_pool_late_arriver_oversubscribes_pool() {
+        let pool: Arc<dyn MemoryPool> = Arc::new(FairSpillPool::new(100));
+
+        // --- ExternalSorter #0 starts first and is alone in the pool ----
+        let sort0_buf = MemoryConsumer::new("ExternalSorter[0]")
+            .with_can_spill(true)
+            .register(&pool);
+        let _sort0_merge = MemoryConsumer::new("ExternalSorterMerge[0]")
+            // Unspillable, exactly as ExternalSorter::new builds it.
+            .register(&pool);
+
+        // While alone, num_spill == 1 so the per-reservation cap == pool_size.
+        // The buffered-batches reservation happily grows to fill the pool.
+        sort0_buf.try_grow(100).expect("alone, can take whole pool");
+        assert_eq!(pool.reserved(), 100, "sort #0 holds the whole pool");
+
+        // --- ExternalSorter #1 registers later ---------------------------
+        // Now there are two spillable consumers, so each reservation's
+        // "fair share" is pool_size / 2 == 50.
+        let sort1_buf = MemoryConsumer::new("ExternalSorter[1]")
+            .with_can_spill(true)
+            .register(&pool);
+        let _sort1_merge = MemoryConsumer::new("ExternalSorterMerge[1]").register(&pool);
+
+        // BUG #1 — Oversubscription:
+        // sort #1's buffered-batches reservation is allowed to grow to its
+        // 50-byte "fair share" even though sort #0 is already holding 100
+        // bytes. The per-reservation check
+        // (`reservation.size() + additional > available`) succeeds because
+        // sort #1's *own* reservation is still 0; the pool's actual
+        // reservation total is never consulted.
+        sort1_buf.try_grow(50).expect(
+            "FairSpillPool grants sort #1 its 'fair share' even though the pool is full",
+        );
+
+        // The pool now reports 150 bytes reserved against a 100-byte limit.
+        assert_eq!(
+            pool.reserved(),
+            150,
+            "FairSpillPool oversubscribed: 150 > pool_size 100"
+        );
+
+        // BUG #2 — Effective starvation of the late arriver:
+        // sort #0 is sitting on 2x its fair share, but FairSpillPool will
+        // not ask it to give any back. Meanwhile sort #1's per-reservation
+        // cap is 50, so any single grow above its fair share fails
+        // immediately — even though sort #0, not the pool, is the actual
+        // reason there is no memory left. In a real query, this is what
+        // forces the late `ExternalSorter` to spill aggressively while the
+        // early one keeps everything in memory.
+        let err = sort1_buf.try_grow(1).unwrap_err().strip_backtrace();
+        assert_snapshot!(
+            err,
+            @"Resources exhausted: Failed to allocate additional 1.0 B for ExternalSorter[1] with 50.0 B already allocated for this reservation - 50.0 B remain available for the total memory pool: fair(pool_size: 100.0 B)"
+        );
+
+        // The error message is also misleading: it claims "50.0 B remain
+        // available for the total memory pool", but the pool is *already*
+        // 50 bytes over its limit. The "available" figure is the
+        // per-reservation cap (pool_size / num_spill), not real headroom.
+
+        // sort #0, the hogger, is similarly stuck — its own reservation
+        // (100) already exceeds the 50-byte fair share, so it cannot grow
+        // further either. But it is never forced to release the surplus
+        // it accumulated while it was alone in the pool.
+        let err = sort0_buf.try_grow(1).unwrap_err().strip_backtrace();
+        assert_snapshot!(
+            err,
+            @"Resources exhausted: Failed to allocate additional 1.0 B for ExternalSorter[0] with 100.0 B already allocated for this reservation - 50.0 B remain available for the total memory pool: fair(pool_size: 100.0 B)"
+        );
+
+        // Net effect for two `ExternalSorter`s (e.g. the two child sorts of
+        // a `SortMergeJoinExec`) sharing a memory-constrained
+        // `FairSpillPool`: whichever sort starts buffering first can legally
+        // take the whole pool, and the second sort is capped at
+        // `pool_size / 2`, while real memory usage silently exceeds the
+        // configured limit.
+    }
+
+    /// Companion test: even an `ExternalSorter` that arrives *during* the
+    /// first sorter's growth — well before sort #0 is at the pool limit —
+    /// still ends up with less effective room than sort #0 holds, because
+    /// sort #0's earlier-grown reservation is grandfathered in past the
+    /// new fair share.
+    #[test]
+    fn test_fair_spill_pool_early_grower_keeps_oversize_share() {
+        let pool: Arc<dyn MemoryPool> = Arc::new(FairSpillPool::new(120));
+
+        let sort0_buf = MemoryConsumer::new("ExternalSorter[0]")
+            .with_can_spill(true)
+            .register(&pool);
+
+        // sort #0 grows to 90 while alone (cap is 120). 90 > 60 == pool/2.
+        sort0_buf.try_grow(90).unwrap();
+
+        // sort #1 joins. Fair share is now 120 / 2 = 60.
+        let sort1_buf = MemoryConsumer::new("ExternalSorter[1]")
+            .with_can_spill(true)
+            .register(&pool);
+
+        // sort #1 is capped at 60, even though sort #0 is sitting on 90.
+        sort1_buf.try_grow(60).unwrap();
+        // Pool is now 90 + 60 = 150 reserved against a 120-byte limit.
+        assert_eq!(pool.reserved(), 150);
+
+        // The asymmetry: sort #0 keeps 90, sort #1 capped at 60. Not "fair".
+        assert_eq!(sort0_buf.size(), 90);
+        assert_eq!(sort1_buf.size(), 60);
+    }
+
+    #[test]
+    fn test_subpool_eager_outer_registration() {
+        let parent: Arc<FairSpillPool> = Arc::new(FairSpillPool::new(100));
+        assert_eq!(parent.num_spill(), 0);
+
+        // Constructing the SubPool registers the outer consumer with the
+        // parent immediately, even though the sub-pool holds no inner
+        // reservations yet.
+        let sub =
+            SubPool::new(Arc::clone(&parent) as Arc<dyn MemoryPool>, "op:subpool");
+        assert_eq!(parent.num_spill(), 1);
+        assert_eq!(parent.reserved(), 0);
+
+        // Dropping the sub-pool unregisters its outer consumer.
+        drop(sub);
+        assert_eq!(parent.num_spill(), 0);
+    }
+
+    #[test]
+    fn test_subpool_aggregates_two_internal_consumers() {
+        let parent: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1000));
+        let sub: Arc<dyn MemoryPool> = sub_pool(&parent, "op:subpool");
+
+        let r_spill = MemoryConsumer::new("inner_spill")
+            .with_can_spill(true)
+            .register(&sub);
+        let r_unspill = MemoryConsumer::new("inner_unspill").register(&sub);
+
+        r_spill.try_grow(40).unwrap();
+        r_unspill.try_grow(30).unwrap();
+
+        assert_eq!(sub.reserved(), 70);
+        assert_eq!(parent.reserved(), 70, "parent sees the aggregated bytes");
+
+        r_spill.shrink(40);
+        assert_eq!(parent.reserved(), 30);
+
+        r_unspill.shrink(30);
+        assert_eq!(parent.reserved(), 0);
+    }
+
+    /// Within a sub-pool, two spillable reservations divide the sub-pool's
+    /// effective capacity fairly even when the parent has plenty of room.
+    #[test]
+    fn test_subpool_internal_fair_share_caps_spillable_reservations() {
+        let parent: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1000));
+        let sub: Arc<dyn MemoryPool> = sub_pool(&parent, "op:subpool");
+
+        // Two spillable consumers: each capped at sub-pool capacity / 2.
+        let a = MemoryConsumer::new("a").with_can_spill(true).register(&sub);
+        let b = MemoryConsumer::new("b").with_can_spill(true).register(&sub);
+
+        // capacity == 1000 (parent has full pool free), num_spill == 2,
+        // so each spillable reservation is capped at 500.
+        a.try_grow(500).unwrap();
+        let err = b.try_grow(501).unwrap_err().strip_backtrace();
+        assert!(
+            err.contains("500.0 B remain available"),
+            "expected per-reservation cap of 500 in error, got: {err}"
+        );
+
+        // 500 still fits.
+        b.try_grow(500).unwrap();
+        assert_eq!(parent.reserved(), 1000);
+    }
+
+    #[test]
+    fn test_subpool_unspillable_takes_first_then_spillable_caps_drop() {
+        let parent: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1000));
+        let sub: Arc<dyn MemoryPool> = sub_pool(&parent, "op:subpool");
+
+        let merge = MemoryConsumer::new("merge").register(&sub);
+        let a = MemoryConsumer::new("a").with_can_spill(true).register(&sub);
+        let b = MemoryConsumer::new("b").with_can_spill(true).register(&sub);
+
+        // Unspillable takes 200 first.
+        merge.try_grow(200).unwrap();
+
+        // Now each spillable's available is (1000 - 200) / 2 == 400.
+        a.try_grow(400).unwrap();
+        b.try_grow(400).unwrap();
+        let err = a.try_grow(1).unwrap_err().strip_backtrace();
+        assert!(
+            err.contains("400.0 B remain available"),
+            "expected per-reservation cap of 400 in error, got: {err}"
+        );
+    }
+
+    #[test]
+    fn test_subpool_capacity_reflects_other_subpools() {
+        let parent: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(200));
+
+        let sub_a: Arc<dyn MemoryPool> = sub_pool(&parent, "a:subpool");
+        let sub_b: Arc<dyn MemoryPool> = sub_pool(&parent, "b:subpool");
+
+        let a = MemoryConsumer::new("a")
+            .with_can_spill(true)
+            .register(&sub_a);
+        let b = MemoryConsumer::new("b")
+            .with_can_spill(true)
+            .register(&sub_b);
+
+        // sub_b alone (no growth in sub_a yet) sees the full 200 as its cap.
+        b.try_grow(200).unwrap();
+        b.shrink(200);
+
+        // After sub_a grows to 50, sub_b's effective capacity drops to
+        // 200 - 50 == 150.
+        a.try_grow(50).unwrap();
+        b.try_grow(150).unwrap();
+        let err = b.try_grow(1).unwrap_err().strip_backtrace();
+        assert!(
+            err.contains("150.0 B remain available")
+                || err.contains("0.0 B remain available"),
+            "expected capacity to reflect sub_a's 50 bytes, got: {err}"
+        );
+    }
+
+    #[test]
+    fn test_subpool_drop_releases_parent() {
+        let parent: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(100));
+        let sub: Arc<dyn MemoryPool> = sub_pool(&parent, "op:subpool");
+        let r = MemoryConsumer::new("r").with_can_spill(true).register(&sub);
+
+        r.try_grow(50).unwrap();
+        assert_eq!(parent.reserved(), 50);
+
+        drop(r);
+        drop(sub);
+        assert_eq!(parent.reserved(), 0);
+    }
+
+    #[test]
+    fn test_subpool_with_track_consumers_pool_parent() {
+        let parent: Arc<TrackConsumersPool<FairSpillPool>> =
+            Arc::new(TrackConsumersPool::new(
+                FairSpillPool::new(1000),
+                NonZeroUsize::new(3).unwrap(),
+            ));
+        let parent_dyn: Arc<dyn MemoryPool> = Arc::clone(&parent) as _;
+        let sub: Arc<dyn MemoryPool> =
+            sub_pool(&parent_dyn, "ExternalSorter[7]:subpool");
+
+        let r = MemoryConsumer::new("inner")
+            .with_can_spill(true)
+            .register(&sub);
+        r.try_grow(100).unwrap();
+
+        let report = parent.report_top(5);
+        assert!(
+            report.contains("ExternalSorter[7]:subpool"),
+            "expected outer consumer name in report, got: {report}"
+        );
+    }
+
+    #[test]
+    fn test_subpool_with_unbounded_parent() {
+        let parent: Arc<dyn MemoryPool> = Arc::new(UnboundedMemoryPool::default());
+        let sub: Arc<dyn MemoryPool> = sub_pool(&parent, "op:subpool");
+        let r = MemoryConsumer::new("r").with_can_spill(true).register(&sub);
+
+        // capacity() falls back to usize::MAX for Infinite/Unknown parents,
+        // so the internal fair-share check is trivially satisfied.
+        r.try_grow(1_000_000).unwrap();
+        assert_eq!(parent.reserved(), 1_000_000);
+    }
+
     #[test]
     fn test_tracked_consumers_pool() {
         let setting = make_settings();
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index 6c02af8dec6d3..60c470199fef7 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -21,7 +21,8 @@
 use std::fmt;
 use std::fmt::{Debug, Formatter};
-use std::sync::Arc;
+use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
+use std::sync::{Arc, OnceLock};
 
 use parking_lot::RwLock;
@@ -65,7 +66,9 @@ use datafusion_common::{
     unwrap_or_internal_err,
 };
 use datafusion_execution::TaskContext;
-use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
+use datafusion_execution::memory_pool::{
+    MemoryConsumer, MemoryPool, MemoryReservation, sub_pool,
+};
 use datafusion_execution::runtime_env::RuntimeEnv;
 use datafusion_physical_expr::LexOrdering;
 use datafusion_physical_expr::PhysicalExpr;
@@ -262,6 +265,16 @@ struct ExternalSorter {
     /// How much memory to reserve for performing in-memory sort/merges
     /// prior to spilling.
     sort_spill_reservation_bytes: usize,
+
+    /// Per-operator sub-pool. Both `reservation` and `merge_reservation`
+    /// are registered against this; it aggregates them into a single outer
+    /// registration on `runtime.memory_pool` and applies fair-share within
+    /// the operator. Held here so the sub-pool's lifetime is obviously
+    /// tied to the sorter; functionally the reservations already keep it
+    /// alive via `Arc<dyn MemoryPool>`. Listed last so that `Drop`
+    /// frees the reservations before the sub-pool is released.
+    #[expect(dead_code)]
+    pool: Arc<dyn MemoryPool>,
 }
 
 impl ExternalSorter {
@@ -279,15 +292,23 @@ impl ExternalSorter {
         spill_compression: SpillCompression,
         metrics: &ExecutionPlanMetricsSet,
         runtime: Arc<RuntimeEnv>,
+        // Per-`SortExec` sub-pool, shared across all of that node's
+        // partitions. Built once by `SortExec::execute` and passed in
+        // here so the sub-pool's internal fair-share divides the
+        // operator's headroom evenly across partitions, while the parent
+        // pool sees one outer registration per `SortExec` node rather
+        // than one per partition.
+        pool: Arc<dyn MemoryPool>,
     ) -> Result<Self> {
         let metrics = ExternalSorterMetrics::new(metrics, partition_id);
+
         let reservation = MemoryConsumer::new(format!("ExternalSorter[{partition_id}]"))
             .with_can_spill(true)
-            .register(&runtime.memory_pool);
+            .register(&pool);
 
         let merge_reservation =
             MemoryConsumer::new(format!("ExternalSorterMerge[{partition_id}]"))
-                .register(&runtime.memory_pool);
+                .register(&pool);
 
         let spill_manager = SpillManager::new(
             Arc::clone(&runtime),
@@ -310,6 +331,7 @@ impl ExternalSorter {
             batch_size,
             sort_spill_reservation_bytes,
             sort_in_place_threshold_bytes,
+            pool,
         })
     }
@@ -842,11 +864,16 @@ pub fn sort_batch_chunked(
     IncrementalSortIterator::new(batch.clone(), expressions.clone(), batch_size).collect()
 }
 
+/// Process-wide counter giving each `SortExec` instance a stable id used to
+/// name its shared sub-pool's outer consumer (helps disambiguate plans that
+/// contain multiple `SortExec` nodes in `TrackConsumersPool` reports).
+static SORT_EXEC_INSTANCE_ID: AtomicUsize = AtomicUsize::new(0);
+
 /// Sort execution plan.
 ///
 /// Support sorting datasets that are larger than the memory allotted
 /// by the memory manager, by spilling to disk.
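+///
+/// Memory accounting note: each `SortExec` node lazily builds one sub-pool
+/// shared by all of its partitions (see `shared_pool` below), so a
+/// `TrackConsumersPool` report shows a single `SortExec[#id]:subpool`
+/// outer entry per node rather than one consumer per partition.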
-#[derive(Debug, Clone)]
+#[derive(Debug)]
 pub struct SortExec {
     /// Input schema
     pub(crate) input: Arc<dyn ExecutionPlan>,
@@ -867,6 +894,37 @@ pub struct SortExec {
     /// If `fetch` is `Some`, this will also be set and a TopK operator may be used.
     /// If `fetch` is `None`, this will be `None`.
     filter: Option<Arc<RwLock<DynamicFilterPhysicalExpr>>>,
+    /// Per-`SortExec`-node sub-pool, shared across all of this node's
+    /// partitions. Lazily initialised on the first `execute()` call (the
+    /// `RuntimeEnv` to wrap is only known then), and reused by every
+    /// subsequent partition's `ExternalSorter`. So all partitions of this
+    /// `SortExec` register their `MemoryConsumer`s against the *same*
+    /// sub-pool, and the sub-pool's `FairSpillPool`-style internal
+    /// fair-share divides its capacity evenly among them.
+    ///
+    /// Cloning a `SortExec` produces a fresh `OnceLock` (custom `Clone`
+    /// impl below) — the clone is treated as a separate operator
+    /// instance and gets its own sub-pool on first execute.
+    shared_pool: OnceLock<Arc<dyn MemoryPool>>,
+}
+
+impl Clone for SortExec {
+    fn clone(&self) -> Self {
+        Self {
+            input: Arc::clone(&self.input),
+            expr: self.expr.clone(),
+            metrics_set: self.metrics_set.clone(),
+            preserve_partitioning: self.preserve_partitioning,
+            fetch: self.fetch,
+            common_sort_prefix: self.common_sort_prefix.clone(),
+            cache: Arc::clone(&self.cache),
+            filter: self.filter.clone(),
+            // Fresh sub-pool slot for the clone — it is a separate
+            // operator instance and shouldn't share the original's
+            // sub-pool registration with the parent pool.
+            shared_pool: OnceLock::new(),
+        }
+    }
 }
 
 impl SortExec {
@@ -886,6 +944,7 @@ impl SortExec {
             common_sort_prefix: sort_prefix,
             cache: Arc::new(cache),
             filter: None,
+            shared_pool: OnceLock::new(),
         }
     }
@@ -930,6 +989,11 @@ impl SortExec {
             fetch: self.fetch,
             cache: Arc::clone(&self.cache),
             filter: self.filter.clone(),
+            // Fresh sub-pool slot: `cloned` is used to spin off a
+            // logically distinct `SortExec` (e.g. with a different
+            // `fetch`), which should not share the original's sub-pool
+            // registration with the parent pool.
+            shared_pool: OnceLock::new(),
         }
     }
@@ -1246,6 +1310,21 @@ impl ExecutionPlan for SortExec {
                 )))
             }
             (false, None) => {
+                // Lazily build the per-`SortExec`-node sub-pool the first
+                // time any partition reaches this branch, then reuse it
+                // for every subsequent partition's `ExternalSorter`. The
+                // sub-pool's internal `FairSpillPool`-style state then
+                // counts each partition's spillable consumer toward
+                // `num_spill`, dividing the sub-pool's capacity evenly
+                // across partitions.
+                let pool = Arc::clone(self.shared_pool.get_or_init(|| {
+                    let id = SORT_EXEC_INSTANCE_ID.fetch_add(1, AtomicOrdering::Relaxed);
+                    sub_pool(
+                        &context.runtime_env().memory_pool,
+                        format!("SortExec[#{id}]:subpool"),
+                    )
+                }));
+
                 let mut sorter = ExternalSorter::new(
                     partition,
                     input.schema(),
@@ -1256,6 +1335,7 @@
                     context.session_config().spill_compression(),
                     &self.metrics_set,
                     context.runtime_env(),
+                    pool,
                 )?;
                 Ok(Box::pin(RecordBatchStreamAdapter::new(
                     self.schema(),
@@ -2756,6 +2836,10 @@ mod tests {
         let metrics_set = ExecutionPlanMetricsSet::new();
         let schema =
            Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
+        // For test purposes, build a sub-pool right here, mirroring what
+        // `SortExec::execute` would do.
+        let sorter_pool: Arc<dyn MemoryPool> =
+            sub_pool(&runtime.memory_pool, "test_sorter:subpool".to_string());
         let mut sorter = ExternalSorter::new(
             0,
             Arc::clone(&schema),
@@ -2766,6 +2850,7 @@
             SpillCompression::Uncompressed,
             &metrics_set,
             Arc::clone(&runtime),
+            sorter_pool,
         )?;
 
         // Insert enough data to force spilling.