From 0e3e263435a3ce53430ee321dd8f1b86d9633ed4 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 5 May 2026 17:38:53 -0700 Subject: [PATCH 1/4] test: Add ignored multi-partition NLJ memory-limited correctness tests Adds five tests reproducing the cross-partition coordination bug in NestedLoopJoinExec's memory-limited fallback path: each output partition independently constructs a per-chunk JoinLeftData with AtomicUsize::new(1), so the left visited bitmap and probe-thread counter are not shared across right partitions. For LEFT, FULL, LEFT SEMI, LEFT ANTI, and LEFT MARK, this causes wrong results when target_partitions > 1 (duplicate unmatched rows; for LEFT SEMI, duplicate matched rows when multiple right partitions match the same left row). The tests are #[ignore]'d for now (with reason) and re-enabled in a follow-up commit that implements the shared-state fix. Discussed in https://github.com/apache/datafusion/pull/21833#discussion_r3172430872 and https://github.com/apache/datafusion/pull/21833#discussion_r3172496633 Co-authored-by: Claude Code --- .../src/joins/nested_loop_join.rs | 250 ++++++++++++++++++ 1 file changed, 250 insertions(+) diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index db8c75b4a578b..e08a85eaa43bb 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -3935,4 +3935,254 @@ pub(crate) mod tests { ")); Ok(()) } + + // ======================================================================== + // Multi-partition memory-limited correctness tests + // + // These tests reproduce the cross-partition coordination bug in the + // memory-limited fallback path: each output partition independently + // constructs a per-chunk `JoinLeftData` with `AtomicUsize::new(1)`, + // so left-side visited state is not shared across right partitions. + // For join types that emit unmatched left rows in the final output + // (LEFT, LEFT SEMI, LEFT ANTI, LEFT MARK, FULL), this leads to a + // left row being emitted as unmatched by partitions whose right + // input did not match it — even when another partition did match. + // ======================================================================== + + /// Build the right table as one batch per row, so RepartitionExec can + /// distribute rows across multiple output partitions. + fn build_right_table_one_batch_per_row() -> Arc { + build_table( + ("a2", &vec![12, 2, 10]), + ("b2", &vec![10, 2, 10]), + ("c2", &vec![40, 80, 100]), + Some(1), + Vec::new(), + ) + } + + /// Run a NLJ across 4 right partitions under a tight memory limit, so + /// every output partition takes the memory-limited fallback path. The + /// right side is shuffled via `RepartitionExec(RoundRobinBatch(4))`. + async fn multi_partition_memory_limited_join_collect( + left: Arc, + right: Arc, + join_type: &JoinType, + join_filter: Option, + context: Arc, + ) -> Result<(Vec, Vec, MetricsSet)> { + let partition_count = 4; + let right = Arc::new(RepartitionExec::try_new( + right, + Partitioning::RoundRobinBatch(partition_count), + )?) as Arc; + + let nested_loop_join = + NestedLoopJoinExec::try_new(left, right, join_filter, join_type, None)?; + let columns = columns(&nested_loop_join.schema()); + + let mut batches = vec![]; + for i in 0..partition_count { + let stream = nested_loop_join.execute(i, Arc::clone(&context))?; + let more = common::collect(stream).await?; + batches.extend(more.into_iter().filter(|b| b.num_rows() > 0)); + } + + let metrics = nested_loop_join.metrics().unwrap(); + Ok((columns, batches, metrics)) + } + + #[tokio::test] + #[ignore = "fails until cross-partition shared left-visited state is implemented"] + async fn test_nlj_memory_limited_multi_partition_left_join() -> Result<()> { + let task_ctx = task_ctx_with_memory_limit(50, 16)?; + let left = build_left_table(); + let right = build_right_table_one_batch_per_row(); + let filter = prepare_join_filter(); + + let (columns, batches, metrics) = multi_partition_memory_limited_join_collect( + left, + right, + &JoinType::Left, + Some(filter), + task_ctx, + ) + .await?; + + assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]); + assert!( + metrics.spill_count().unwrap_or(0) > 0, + "Expected spilling under tight memory limit" + ); + + // Expected output is identical to the single-partition spill path + // and the multi-partition non-spill path. Each left row appears + // exactly once: the matched (5,5,50)+(2,2,80) row, plus the two + // left rows filtered out by `b1 != 8` as unmatched. + allow_duplicates!(assert_snapshot!(batches_to_sort_string(&batches), @r" + +----+----+-----+----+----+----+ + | a1 | b1 | c1 | a2 | b2 | c2 | + +----+----+-----+----+----+----+ + | 11 | 8 | 110 | | | | + | 5 | 5 | 50 | 2 | 2 | 80 | + | 9 | 8 | 90 | | | | + +----+----+-----+----+----+----+ + ")); + Ok(()) + } + + #[tokio::test] + #[ignore = "fails until cross-partition shared left-visited state is implemented"] + async fn test_nlj_memory_limited_multi_partition_full_join() -> Result<()> { + let task_ctx = task_ctx_with_memory_limit(50, 16)?; + let left = build_left_table(); + let right = build_right_table_one_batch_per_row(); + let filter = prepare_join_filter(); + + let (columns, batches, metrics) = multi_partition_memory_limited_join_collect( + left, + right, + &JoinType::Full, + Some(filter), + task_ctx, + ) + .await?; + + assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]); + assert!( + metrics.spill_count().unwrap_or(0) > 0, + "Expected spilling under tight memory limit" + ); + + // Expected: 1 matched + 2 left-unmatched + 2 right-unmatched. + allow_duplicates!(assert_snapshot!(batches_to_sort_string(&batches), @r" + +----+----+-----+----+----+-----+ + | a1 | b1 | c1 | a2 | b2 | c2 | + +----+----+-----+----+----+-----+ + | | | | 10 | 10 | 100 | + | | | | 12 | 10 | 40 | + | 11 | 8 | 110 | | | | + | 5 | 5 | 50 | 2 | 2 | 80 | + | 9 | 8 | 90 | | | | + +----+----+-----+----+----+-----+ + ")); + Ok(()) + } + + #[tokio::test] + #[ignore = "fails until cross-partition shared left-visited state is implemented"] + async fn test_nlj_memory_limited_multi_partition_left_semi_join() -> Result<()> { + let task_ctx = task_ctx_with_memory_limit(50, 16)?; + let left = build_left_table(); + // Two right rows that both match the same left row (5,5,50). Without + // shared left-visited state across partitions, each matching partition + // emits the left row once, producing duplicates. + let right = build_table( + ("a2", &vec![2, 3, 10]), + ("b2", &vec![2, 2, 10]), + ("c2", &vec![80, 70, 100]), + Some(1), + Vec::new(), + ); + let filter = prepare_join_filter(); + + let (columns, batches, metrics) = multi_partition_memory_limited_join_collect( + left, + right, + &JoinType::LeftSemi, + Some(filter), + task_ctx, + ) + .await?; + + assert_eq!(columns, vec!["a1", "b1", "c1"]); + assert!( + metrics.spill_count().unwrap_or(0) > 0, + "Expected spilling under tight memory limit" + ); + + // Left semi: each left row appears at most once, even if it matches + // multiple right rows distributed across partitions. + allow_duplicates!(assert_snapshot!(batches_to_sort_string(&batches), @r" + +----+----+----+ + | a1 | b1 | c1 | + +----+----+----+ + | 5 | 5 | 50 | + +----+----+----+ + ")); + Ok(()) + } + + #[tokio::test] + #[ignore = "fails until cross-partition shared left-visited state is implemented"] + async fn test_nlj_memory_limited_multi_partition_left_anti_join() -> Result<()> { + let task_ctx = task_ctx_with_memory_limit(50, 16)?; + let left = build_left_table(); + let right = build_right_table_one_batch_per_row(); + let filter = prepare_join_filter(); + + let (columns, batches, metrics) = multi_partition_memory_limited_join_collect( + left, + right, + &JoinType::LeftAnti, + Some(filter), + task_ctx, + ) + .await?; + + assert_eq!(columns, vec!["a1", "b1", "c1"]); + assert!( + metrics.spill_count().unwrap_or(0) > 0, + "Expected spilling under tight memory limit" + ); + + // Left anti: only left rows with no matching right row. + // (5,5,50) matches (2,2,80) under the filter, so it must NOT appear. + allow_duplicates!(assert_snapshot!(batches_to_sort_string(&batches), @r" + +----+----+-----+ + | a1 | b1 | c1 | + +----+----+-----+ + | 11 | 8 | 110 | + | 9 | 8 | 90 | + +----+----+-----+ + ")); + Ok(()) + } + + #[tokio::test] + #[ignore = "fails until cross-partition shared left-visited state is implemented"] + async fn test_nlj_memory_limited_multi_partition_left_mark_join() -> Result<()> { + let task_ctx = task_ctx_with_memory_limit(50, 16)?; + let left = build_left_table(); + let right = build_right_table_one_batch_per_row(); + let filter = prepare_join_filter(); + + let (columns, batches, metrics) = multi_partition_memory_limited_join_collect( + left, + right, + &JoinType::LeftMark, + Some(filter), + task_ctx, + ) + .await?; + + assert_eq!(columns, vec!["a1", "b1", "c1", "mark"]); + assert!( + metrics.spill_count().unwrap_or(0) > 0, + "Expected spilling under tight memory limit" + ); + + // Left mark: every left row appears exactly once with a bool + // indicating whether it matched at least one right row. + allow_duplicates!(assert_snapshot!(batches_to_sort_string(&batches), @r" + +----+----+-----+-------+ + | a1 | b1 | c1 | mark | + +----+----+-----+-------+ + | 11 | 8 | 110 | false | + | 5 | 5 | 50 | true | + | 9 | 8 | 90 | false | + +----+----+-----+-------+ + ")); + Ok(()) + } } From efbd6d0b2eb36fda1a5b8bbbf077d242252c5941 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 5 May 2026 19:44:06 -0700 Subject: [PATCH 2/4] fix(physical-plan): Share per-chunk JoinLeftData across right partitions in NLJ memory-limited fallback When `target_partitions > 1`, the memory-limited fallback path was building a per-output-partition `JoinLeftData` with `AtomicUsize::new(1)` for each left chunk, so each partition emitted unmatched left rows based only on its own right-side matches. For LEFT, FULL, LEFT SEMI, LEFT ANTI, and LEFT MARK, this produced wrong results (duplicate unmatched rows; for LEFT SEMI, duplicate matched rows when multiple right partitions matched the same left row). This change introduces a plan-level `FallbackCoordinator` that: - Owns the left spill stream and a single chunk-sized `MemoryReservation`, - Has the first partition reaching a chunk become its "leader": it loads the chunk and publishes an `Arc` (with `probe_threads_counter == right_partition_count`) into a shared slot, - Lets every other right partition take an `Arc` clone of the same `JoinLeftData`, so the visited bitmap and probe-thread counter are shared exactly as in the single-pass `collect_left_input` path, - Releases the slot only after the partition that brings the counter to zero finishes emitting unmatched left rows for the chunk, then notifies waiters so the next chunk can be loaded. The per-chunk in-flight fetch and release are driven through `BoxFuture` fields on `SpillStateActive`, polled across `poll_next` iterations. The FULL-join multi-partition guard added in #21833 is removed; FULL joins now use the shared coordination path. The five `#[ignore]`-d multi-partition correctness tests added in the previous commit are unignored and now pass. `test_overallocation` is updated to expect FULL multi-partition to spill (not OOM). Discussed in https://github.com/apache/datafusion/pull/21833#discussion_r3172430872 and https://github.com/apache/datafusion/pull/21833#discussion_r3172496633 Co-authored-by: Claude Code --- .../src/joins/nested_loop_join.rs | 804 +++++++++++++----- 1 file changed, 580 insertions(+), 224 deletions(-) diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index e08a85eaa43bb..cf8d98d45bc88 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -74,7 +74,8 @@ use datafusion_physical_expr::equivalence::{ }; use datafusion_physical_expr::projection::{ProjectionRef, combine_projections}; -use futures::{Stream, StreamExt, TryStreamExt}; +use futures::future::BoxFuture; +use futures::{FutureExt, Stream, StreamExt, TryStreamExt}; use log::debug; use parking_lot::Mutex; @@ -216,6 +217,13 @@ pub struct NestedLoopJoinExec { /// partitions share the same spill file via this `OnceAsync`, /// avoiding redundant re-execution of the left child. left_spill_data: Arc>, + /// Coordinator that, in the memory-limited fallback path, shares + /// per-chunk `JoinLeftData` (visited bitmap + probe-thread counter) + /// across all right-side output partitions. This makes the fallback + /// path's left-side tracking consistent with the single-pass path + /// (where `collect_left_input(..., probe_threads_count)` initializes + /// the counter to `right_partition_count`). + fallback_coordinator: Arc, /// Information of index and left / right placement of columns column_indices: Vec, /// Projection to apply to the output of the join @@ -292,6 +300,8 @@ impl NestedLoopJoinExecBuilder { join_type, projection.as_deref(), )?; + let right_partition_count = right.output_partitioning().partition_count().max(1); + let with_visited_bitmap = need_produce_result_in_final(join_type); Ok(NestedLoopJoinExec { left, right, @@ -300,6 +310,10 @@ impl NestedLoopJoinExecBuilder { join_schema, build_side_data: Default::default(), left_spill_data: Arc::new(OnceAsync::default()), + fallback_coordinator: Arc::new(FallbackCoordinator::new( + right_partition_count, + with_visited_bitmap, + )), column_indices, projection, metrics: Default::default(), @@ -496,6 +510,8 @@ impl NestedLoopJoinExec { ) -> Self { let left = children.swap_remove(0); let right = children.swap_remove(0); + let right_partition_count = right.output_partitioning().partition_count().max(1); + let with_visited_bitmap = need_produce_result_in_final(self.join_type); Self { left, @@ -503,6 +519,10 @@ impl NestedLoopJoinExec { metrics: ExecutionPlanMetricsSet::new(), build_side_data: Default::default(), left_spill_data: Arc::new(OnceAsync::default()), + fallback_coordinator: Arc::new(FallbackCoordinator::new( + right_partition_count, + with_visited_bitmap, + )), cache: Arc::clone(&self.cache), filter: self.filter.clone(), join_type: self.join_type, @@ -656,29 +676,20 @@ impl ExecutionPlan for NestedLoopJoinExec { let probe_side_data = self.right.execute(partition, Arc::clone(&context))?; // Determine if OOM fallback to memory-limited mode is possible. - // Conditions: - // 1. Disk manager supports temp files (needed for spilling). - // 2. FULL join with multiple right partitions is not yet supported - // in the fallback path. FULL join needs to track BOTH left-side - // matches (for unmatched left rows) AND right-side matches (for - // unmatched right rows). The fallback path builds a per-partition - // `JoinLeftData` with `probe_threads_counter == 1`, so each - // partition emits unmatched left rows based only on its own - // right-side matches, producing incorrect duplicate output for - // left rows that match in another partition. Other join types - // that need only one-sided final emission (LEFT, LEFT SEMI, - // LEFT ANTI, LEFT MARK) have a similar latent issue in the - // fallback path which predates this change; tracking is out of - // scope for this PR. - let full_join_multi_partition = - matches!(self.join_type, JoinType::Full) && right_partition_count > 1; - let spill_state = if context.runtime_env().disk_manager.tmp_files_enabled() - && !full_join_multi_partition - { + // Condition: disk manager supports temp files (needed for spilling). + // + // For join types that emit unmatched left rows in the final output + // (LEFT, LEFT SEMI, LEFT ANTI, LEFT MARK, FULL), the fallback path + // shares per-chunk `JoinLeftData` (visited bitmap + probe-thread + // counter) across all right-side partitions via + // [`FallbackCoordinator`], so left-side tracking is coordinated + // exactly as in the single-pass path. + let spill_state = if context.runtime_env().disk_manager.tmp_files_enabled() { SpillState::Pending { left_plan: Arc::clone(&self.left), task_context: Arc::clone(&context), left_spill_data: Arc::clone(&self.left_spill_data), + fallback_coordinator: Arc::clone(&self.fallback_coordinator), } } else { SpillState::Disabled @@ -906,6 +917,365 @@ pub(crate) struct LeftSpillData { schema: SchemaRef, } +/// Per-chunk shared state in the memory-limited fallback path. +/// +/// Each chunk's `JoinLeftData` is loaded once by a "leader" partition and +/// shared (via `Arc`) with every right-side output partition. The +/// `probe_threads_counter` inside the `JoinLeftData` is initialized to +/// `right_partition_count`, so `report_probe_completed` returns `true` +/// only when the *last* partition has finished probing the chunk. That +/// last partition is then responsible for emitting unmatched left rows +/// for the chunk, mirroring the single-pass path's coordination via +/// `collect_left_input(..., probe_threads_count)`. +struct CurrentChunk { + /// 0-based monotonically increasing chunk index. + chunk_index: usize, + /// Shared per-chunk left data. Cloned by every partition that probes + /// this chunk; the last to call `report_probe_completed` emits + /// unmatched left rows. + data: Arc, + /// True if the left stream was exhausted while loading this chunk — + /// no further chunks will be produced after it. + is_last: bool, +} + +/// Inner state of [`FallbackCoordinator`], guarded by an async mutex. +struct FallbackCoordinatorInner { + /// Reservation owned by the coordinator. Holds the memory for the + /// currently-loaded chunk. Reset (`resize(0)`) between chunks. + /// Lazily registered by the first leader, after the runtime context + /// becomes available via `initiate_fallback`. + reservation: Option, + /// The shared left spill stream from which chunks are read. Owned by + /// the coordinator so only one partition reads it at a time. + left_stream: Option, + /// Left schema. Set after the first leader resolves the spill future. + left_schema: Option, + /// One batch carried over from the previous chunk's load: when + /// reservation `try_grow` failed for chunk N, the offending batch is + /// recorded here and becomes the first batch of chunk N+1. + carryover: Option, + /// True once the left spill stream has produced `None`. + left_exhausted: bool, + /// Index of the next chunk to be loaded. + next_chunk_index: usize, + /// The currently-loaded chunk, or `None` if no chunk is currently + /// loaded (initial state, or the last partition has just released + /// chunk `next_chunk_index - 1` and the next leader hasn't taken + /// over yet). + current: Option, + /// True while a partition has claimed leader role for the next + /// chunk and is loading it; prevents two partitions from racing. + loader_in_flight: bool, +} + +/// Plan-level shared coordinator for the memory-limited fallback path. +/// +/// All right-side output partitions share one of these. It serializes +/// access to the left spill stream (so each chunk is read exactly once), +/// publishes the loaded chunk as an `Arc` for every +/// partition to clone, and uses a `Notify` so partitions waiting for the +/// next chunk can sleep without busy-looping. +pub(crate) struct FallbackCoordinator { + /// Number of right-side partitions; equals the + /// `probe_threads_counter` initial value for each chunk. + right_partition_count: usize, + /// Whether `JoinLeftData` should carry a left visited bitmap (for + /// join types that emit unmatched left rows in the final output). + with_visited_bitmap: bool, + inner: tokio::sync::Mutex, + /// Notified when a new chunk becomes available, when the left stream + /// is exhausted, or when a chunk is released. + notify: tokio::sync::Notify, +} + +impl FallbackCoordinator { + fn new(right_partition_count: usize, with_visited_bitmap: bool) -> Self { + Self { + right_partition_count, + with_visited_bitmap, + inner: tokio::sync::Mutex::new(FallbackCoordinatorInner { + reservation: None, + left_stream: None, + left_schema: None, + carryover: None, + left_exhausted: false, + next_chunk_index: 0, + current: None, + loader_in_flight: false, + }), + notify: tokio::sync::Notify::new(), + } + } + + /// After the last partition finishes processing chunk + /// `released_chunk_index`, drop the slot so the next leader can + /// load chunk `released_chunk_index + 1`. + async fn release_chunk(self: &Arc, released_chunk_index: usize) { + let mut inner = self.inner.lock().await; + if let Some(cur) = &inner.current + && cur.chunk_index == released_chunk_index + { + inner.current = None; + inner.next_chunk_index = released_chunk_index + 1; + } + // Always notify: waiters may be blocked because they couldn't + // become leader while a previous chunk was current. + drop(inner); + self.notify.notify_waiters(); + } + + /// Fetch `expected_chunk_index`, becoming leader to load it from the + /// left spill stream if no other partition has done so. Returns + /// `Ok(None)` when the left stream is exhausted and no chunk with + /// the requested index exists. + async fn next_chunk( + self: Arc, + expected_chunk_index: usize, + left_spill_fut: OnceFut, + task_context: Arc, + ) -> Result, bool)>> { + // Resolve the left spill future once. All partitions share the + // same OnceFut so this only does real work the first time. + let spill_data = left_spill_fut_get_shared(left_spill_fut).await?; + + loop { + let mut inner = self.inner.lock().await; + + // Lazily initialize the shared left stream and schema from + // the spill file. + if inner.left_stream.is_none() && !inner.left_exhausted { + let stream = spill_data + .spill_manager + .read_spill_as_stream(spill_data.spill_file.clone(), None)?; + inner.left_stream = Some(stream); + inner.left_schema = Some(Arc::clone(&spill_data.schema)); + } + // Lazily register the coordinator's chunk reservation. + if inner.reservation.is_none() { + inner.reservation = Some( + MemoryConsumer::new("NestedLoopJoinFallbackChunk".to_string()) + .with_can_spill(true) + .register(task_context.memory_pool()), + ); + } + + // Case 1: requested chunk is already loaded. + if let Some(cur) = &inner.current + && cur.chunk_index == expected_chunk_index + { + return Ok(Some((Arc::clone(&cur.data), cur.is_last))); + } + + // Case 2: left stream exhausted and no current chunk to + // deliver — caller is past the last chunk. + if inner.left_exhausted + && inner.current.is_none() + && inner.carryover.is_none() + { + return Ok(None); + } + + // Case 3: no chunk loaded and no leader yet — claim leader. + if inner.current.is_none() && !inner.loader_in_flight { + inner.loader_in_flight = true; + // Drop the lock while reading the stream + concatenating + // batches (this can take significant time and memory). + let mut left_stream = inner + .left_stream + .take() + .expect("left_stream installed above"); + let mut reservation = inner + .reservation + .take() + .expect("reservation installed above"); + let left_schema = Arc::clone( + inner + .left_schema + .as_ref() + .expect("left_schema installed above"), + ); + let carryover = inner.carryover.take(); + let chunk_index_to_load = inner.next_chunk_index; + debug_assert_eq!(chunk_index_to_load, expected_chunk_index); + drop(inner); + + let load_result = Arc::clone(&self) + .load_one_chunk( + chunk_index_to_load, + &mut left_stream, + &mut reservation, + carryover, + Arc::clone(&left_schema), + ) + .await; + + // Re-acquire lock and publish the result. + let mut inner = self.inner.lock().await; + inner.left_stream = Some(left_stream); + inner.reservation = Some(reservation); + inner.loader_in_flight = false; + + match load_result { + Ok(LoadOutcome::Chunk { + data, + is_last, + carryover, + }) => { + inner.carryover = carryover; + if is_last { + inner.left_exhausted = true; + } + let arc_data = Arc::new(data); + inner.current = Some(CurrentChunk { + chunk_index: chunk_index_to_load, + data: Arc::clone(&arc_data), + is_last, + }); + drop(inner); + self.notify.notify_waiters(); + return Ok(Some((arc_data, is_last))); + } + Ok(LoadOutcome::Empty) => { + // No data at all. Mark exhausted; let other + // partitions observe and exit. + inner.left_exhausted = true; + drop(inner); + self.notify.notify_waiters(); + return Ok(None); + } + Err(e) => { + drop(inner); + self.notify.notify_waiters(); + return Err(e); + } + } + } + + // Case 4: another partition is loading the next chunk, or + // the current chunk is for a previous index we've already + // moved past — wait to be notified. + let notified = self.notify.notified(); + drop(inner); + notified.await; + } + } + + /// Read one chunk worth of left batches into a `JoinLeftData`, + /// honoring the coordinator's reservation as the memory budget. + async fn load_one_chunk( + self: Arc, + _chunk_index: usize, + left_stream: &mut SendableRecordBatchStream, + reservation: &mut MemoryReservation, + carryover: Option, + left_schema: SchemaRef, + ) -> Result { + // Reset the per-chunk reservation budget. The previous chunk's + // memory has been released when its `JoinLeftData` was dropped. + reservation.resize(0); + + let mut pending_batches: Vec = Vec::new(); + let mut left_stream_exhausted = false; + let mut next_carryover: Option = None; + + // First, account for any carryover batch from the previous + // chunk's load attempt. Its memory is already in-flight, so we + // grow the reservation infallibly. + if let Some(batch) = carryover { + let bytes = batch.get_array_memory_size(); + reservation.grow(bytes); + pending_batches.push(batch); + } + + loop { + match left_stream.next().await { + Some(Ok(batch)) => { + if batch.num_rows() == 0 { + continue; + } + let bytes = batch.get_array_memory_size(); + let can_grow = reservation.try_grow(bytes).is_ok(); + if !can_grow && !pending_batches.is_empty() { + // Defer this batch to the next chunk. + next_carryover = Some(batch); + break; + } else if !can_grow { + // No pending batches — accept the batch even + // over budget so we make progress. + reservation.grow(bytes); + } + pending_batches.push(batch); + } + Some(Err(e)) => return Err(e), + None => { + left_stream_exhausted = true; + break; + } + } + } + + if pending_batches.is_empty() { + debug_assert!(left_stream_exhausted); + return Ok(LoadOutcome::Empty); + } + + let merged_batch = concat_batches(&left_schema, &pending_batches)?; + let n_rows = merged_batch.num_rows(); + let visited_left_side = if self.with_visited_bitmap { + let buffer_size = n_rows.div_ceil(8); + reservation.grow(buffer_size); + let mut buffer = BooleanBufferBuilder::new(n_rows); + buffer.append_n(n_rows, false); + buffer + } else { + BooleanBufferBuilder::new(0) + }; + + // The chunk's reservation is owned by the coordinator (so it + // can be reset between chunks). `JoinLeftData` carries an empty + // RAII placeholder. + let dummy_reservation = reservation.new_empty(); + + let data = JoinLeftData::new( + merged_batch, + Mutex::new(visited_left_side), + AtomicUsize::new(self.right_partition_count), + dummy_reservation, + ); + + Ok(LoadOutcome::Chunk { + data, + is_last: left_stream_exhausted, + carryover: next_carryover, + }) + } +} + +enum LoadOutcome { + Chunk { + data: JoinLeftData, + is_last: bool, + carryover: Option, + }, + Empty, +} + +/// Helper to await `OnceFut::get_shared` outside of `poll_next` context. +async fn left_spill_fut_get_shared( + mut fut: OnceFut, +) -> Result> { + futures::future::poll_fn(move |cx| fut.get_shared(cx)).await +} + +impl std::fmt::Debug for FallbackCoordinator { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("FallbackCoordinator") + .field("right_partition_count", &self.right_partition_count) + .finish() + } +} + /// Tracks the state of the memory-limited spill fallback for NLJ. /// /// The NLJ always starts with the standard OnceFut path. If the in-memory @@ -928,6 +1298,9 @@ pub(crate) enum SpillState { /// Shared OnceAsync for left-side spill data. The first partition /// to initiate fallback spills the left side; others share the file. left_spill_data: Arc>, + /// Shared coordinator that publishes per-chunk `JoinLeftData` to + /// every right-side partition. + fallback_coordinator: Arc, }, /// Fallback has been triggered. Left data is being loaded in chunks @@ -935,21 +1308,30 @@ pub(crate) enum SpillState { Active(Box), } +/// Result of a single chunk fetch from the [`FallbackCoordinator`]: +/// either the chunk itself with a flag indicating whether it is the +/// final chunk, or `None` if the left input is fully consumed. +type ChunkFetchOutput = Option<(Arc, bool)>; +/// In-flight future for a chunk fetch. +type ChunkFetchFuture = BoxFuture<'static, Result>; + /// State for active memory-limited spill execution. /// Boxed inside [`SpillState::Active`] to reduce enum size. pub(crate) struct SpillStateActive { /// Shared future for left-side spill data. All partitions wait on /// the same future — the first to poll triggers the actual spill. left_spill_fut: OnceFut, - /// Left input stream for incremental chunk reading (from spill file). - /// None until `left_spill_fut` resolves. - left_stream: Option, - /// Left-side schema (set once `left_spill_fut` resolves) - left_schema: Option, - /// Memory reservation for left-side buffering - reservation: MemoryReservation, - /// Accumulated left batches for the current chunk - pending_batches: Vec, + /// Plan-level coordinator that publishes per-chunk `JoinLeftData` + /// shared across all right-side partitions. + coordinator: Arc, + /// Index of the next chunk this partition expects from the + /// coordinator. Increments after the partition finishes processing + /// a chunk (regardless of whether it was the one that emitted + /// unmatched left rows). + next_chunk_index: usize, + /// Captured `TaskContext` so that the first leader can register the + /// coordinator's reservation against the runtime's memory pool. + task_context: Arc, /// Right input that spills on the first pass and replays from spill later. right_input: ReplayableStreamSource, /// Per-batch accumulated right bitmaps across all left chunks. @@ -957,12 +1339,22 @@ pub(crate) struct SpillStateActive { /// Only populated when `should_track_unmatched_right` is true. global_right_bitmaps: Vec, /// Separate reservation for `global_right_bitmaps`. These buffers live - /// for the full operator lifetime (not per-chunk), so they must be - /// tracked separately from `reservation`, which gets `resize(0)`-ed - /// between chunks. + /// for the full operator lifetime (not per-chunk). global_right_bitmaps_reservation: MemoryReservation, /// Current right batch sequence index within the current pass. right_batch_index: usize, + /// In-flight chunk fetch future. Created by `BufferingLeft` when a + /// new chunk is needed; polled across iterations of `poll_next` + /// until it resolves to either the next chunk or `None` (left side + /// exhausted with no chunk to deliver). + chunk_fetch_in_flight: Option, + /// In-flight chunk release future. Created by `EmitLeftUnmatched` + /// when the last partition for a chunk has finished its work. + chunk_release_in_flight: Option>, + /// Cached left schema, set after the first chunk is fetched. + /// Used by `EmitGlobalRightUnmatched` to build NULL-padded left + /// columns for unmatched right rows. + left_schema: Option, } impl SpillStateActive { @@ -1063,6 +1455,16 @@ pub(crate) struct NestedLoopJoinStream { left_exhausted: bool, /// If we can buffer all left data in one pass (false means memory-limited multi-pass) left_buffered_in_one_pass: bool, + /// True iff this partition is the one that brought the current + /// chunk's `probe_threads_counter` to zero, i.e. it is responsible + /// for emitting unmatched left rows for the chunk and (in + /// memory-limited mode) for releasing the chunk via + /// `FallbackCoordinator::release_chunk`. + is_chunk_emitter: bool, + /// True once we have decided emitter status for the current chunk + /// (i.e. `report_probe_completed` has been called once). Reset to + /// `false` when transitioning to a new chunk's `BufferingLeft`. + chunk_emitter_decided: bool, // Probe(right) side // ----------------- @@ -1262,7 +1664,7 @@ impl Stream for NestedLoopJoinStream { let join_metric = self.metrics.join_metrics.join_time.clone(); let _join_timer = join_metric.timer(); - match self.handle_emit_left_unmatched() { + match self.handle_emit_left_unmatched(cx) { ControlFlow::Continue(()) => continue, ControlFlow::Break(poll) => { return self.metrics.join_metrics.baseline.record_poll(poll); @@ -1344,6 +1746,8 @@ impl NestedLoopJoinStream { left_emit_idx: 0, left_exhausted: false, left_buffered_in_one_pass: true, + is_chunk_emitter: false, + chunk_emitter_decided: false, handled_empty_output: false, should_track_unmatched_right: need_produce_right_in_final(join_type), spill_state, @@ -1371,13 +1775,19 @@ impl NestedLoopJoinStream { /// it to disk. Other partitions share the same spill file. fn initiate_fallback(&mut self) -> Result<()> { // Take ownership of Pending state - let (left_plan, context, left_spill_data) = + let (left_plan, context, left_spill_data, fallback_coordinator) = match std::mem::replace(&mut self.spill_state, SpillState::Disabled) { SpillState::Pending { left_plan, task_context, left_spill_data, - } => (left_plan, task_context, left_spill_data), + fallback_coordinator, + } => ( + left_plan, + task_context, + left_spill_data, + fallback_coordinator, + ), _ => { return internal_err!( "initiate_fallback called in non-Pending spill state" @@ -1422,14 +1832,9 @@ impl NestedLoopJoinStream { }) })?; - // Create reservation with can_spill for fair memory allocation - let reservation = MemoryConsumer::new("NestedLoopJoinLoad[fallback]".to_string()) - .with_can_spill(true) - .register(context.memory_pool()); - // Separate reservation for the global right bitmaps. These buffers - // persist across all left chunks, whereas `reservation` is reset - // between chunks via `resize(0)`. + // are per-partition (each partition tracks matches against its own + // right input) and persist across all left chunks. let global_right_bitmaps_reservation = MemoryConsumer::new("NestedLoopJoinGlobalRightBitmaps".to_string()) .register(context.memory_pool()); @@ -1453,10 +1858,9 @@ impl NestedLoopJoinStream { self.spill_state = SpillState::Active(Box::new(SpillStateActive { left_spill_fut, - left_stream: None, - left_schema: None, - reservation, - pending_batches: Vec::new(), + coordinator: fallback_coordinator, + next_chunk_index: 0, + task_context: Arc::clone(&context), right_input: ReplayableStreamSource::new( right_data, right_spill_manager, @@ -1465,6 +1869,9 @@ impl NestedLoopJoinStream { global_right_bitmaps: Vec::new(), global_right_bitmaps_reservation, right_batch_index: 0, + chunk_fetch_in_flight: None, + chunk_release_in_flight: None, + left_schema: None, })); // State stays BufferingLeft — next poll will enter @@ -1519,9 +1926,8 @@ impl NestedLoopJoinStream { /// Memory-limited path for handle_buffering_left. /// - /// Incrementally polls the left stream and accumulates batches until: - /// - Memory reservation fails (chunk is full, more data remains) - /// - Left stream is exhausted (this is the last/only chunk) + /// Drives an in-flight `next_chunk` future on the coordinator, which + /// loads (or re-uses) the next per-chunk shared `JoinLeftData`. fn handle_buffering_left_memory_limited( &mut self, cx: &mut std::task::Context<'_>, @@ -1532,147 +1938,79 @@ impl NestedLoopJoinStream { ); }; - // On first entry (or after re-entry for a new chunk pass when - // left_stream was consumed), wait for the shared left spill - // future to resolve and then open a stream from the spill file. - if active.left_stream.is_none() { - match active.left_spill_fut.get_shared(cx) { - Poll::Ready(Ok(spill_data)) => { - match spill_data - .spill_manager - .read_spill_as_stream(spill_data.spill_file.clone(), None) - { - Ok(stream) => { - active.left_schema = Some(Arc::clone(&spill_data.schema)); - active.left_stream = Some(stream); - } - Err(e) => { - return ControlFlow::Break(Poll::Ready(Some(Err(e)))); - } - } - } - Poll::Ready(Err(e)) => { - return ControlFlow::Break(Poll::Ready(Some(Err(e)))); - } - Poll::Pending => { - return ControlFlow::Break(Poll::Pending); + // Drain any pending chunk-release future before fetching the next + // chunk. The coordinator slot must be released so the next leader + // can load the following chunk. + if let Some(fut) = active.chunk_release_in_flight.as_mut() { + match fut.poll_unpin(cx) { + Poll::Ready(()) => { + active.chunk_release_in_flight = None; } + Poll::Pending => return ControlFlow::Break(Poll::Pending), } } - let left_stream = active - .left_stream - .as_mut() - .expect("left_stream must be set after spill future resolves"); + // Lazily start a chunk-fetch future for `active.next_chunk_index`. + if active.chunk_fetch_in_flight.is_none() { + let coordinator = Arc::clone(&active.coordinator); + let left_spill_fut = active.left_spill_fut.clone(); + let task_context = Arc::clone(&active.task_context); + let expected = active.next_chunk_index; + active.chunk_fetch_in_flight = Some( + coordinator + .next_chunk(expected, left_spill_fut, task_context) + .boxed(), + ); + } - // Poll left stream for more batches. - // Note: pending_batches may already contain a batch from the - // previous chunk iteration (the batch that triggered the memory limit). - loop { - match left_stream.poll_next_unpin(cx) { - Poll::Ready(Some(Ok(batch))) => { - if batch.num_rows() == 0 { - continue; - } - let batch_rows = batch.num_rows(); - let batch_size = batch.get_array_memory_size(); - let can_grow = active.reservation.try_grow(batch_size).is_ok(); - - if !can_grow && !active.pending_batches.is_empty() { - // Memory limit reached and we already have data. - // Push this batch into pending (it's already in memory) - // and stop buffering for this chunk. - active.pending_batches.push(batch); - self.left_exhausted = false; - self.left_buffered_in_one_pass = false; - break; - } else if !can_grow { - // No pending batches yet — we must accept this batch - // to make progress, even if it exceeds the budget. - active.reservation.grow(batch_size); - } + let fut = active + .chunk_fetch_in_flight + .as_mut() + .expect("chunk_fetch_in_flight installed above"); + let result = match fut.poll_unpin(cx) { + Poll::Ready(r) => r, + Poll::Pending => return ControlFlow::Break(Poll::Pending), + }; + active.chunk_fetch_in_flight = None; - self.metrics.join_metrics.build_mem_used.add(batch_size); - self.metrics.join_metrics.build_input_batches.add(1); - self.metrics.join_metrics.build_input_rows.add(batch_rows); - active.pending_batches.push(batch); - } - Poll::Ready(Some(Err(e))) => { - return ControlFlow::Break(Poll::Ready(Some(Err(e)))); + match result { + Err(e) => ControlFlow::Break(Poll::Ready(Some(Err(e)))), + Ok(None) => { + // No chunk to deliver: left side fully consumed. + self.left_exhausted = true; + if self.is_memory_limited() && self.should_track_unmatched_right { + self.right_data = None; + self.state = NLJState::EmitGlobalRightUnmatched; + } else { + self.state = NLJState::Done; } - Poll::Ready(None) => { - // Left stream exhausted - self.left_exhausted = true; - break; + ControlFlow::Continue(()) + } + Ok(Some((data, is_last))) => { + let n_rows = data.batch().num_rows(); + self.metrics.join_metrics.build_input_batches.add(1); + self.metrics.join_metrics.build_input_rows.add(n_rows); + if active.left_schema.is_none() { + active.left_schema = Some(data.batch().schema()); } - Poll::Pending => { - return ControlFlow::Break(Poll::Pending); + self.buffered_left_data = Some(data); + self.left_exhausted = is_last; + self.left_buffered_in_one_pass = is_last && active.next_chunk_index == 0; + + active.right_batch_index = 0; + match active.right_input.open_pass() { + Ok(stream) => { + self.right_data = Some(stream); + } + Err(e) => { + return ControlFlow::Break(Poll::Ready(Some(Err(e)))); + } } - } - } - - if active.pending_batches.is_empty() { - // No data at all — go directly to Done - self.left_exhausted = true; - self.state = NLJState::Done; - return ControlFlow::Continue(()); - } - - let merged_batch = match concat_batches( - active - .left_schema - .as_ref() - .expect("left_schema must be set"), - &active.pending_batches, - ) { - Ok(batch) => batch, - Err(e) => { - return ControlFlow::Break(Poll::Ready(Some(Err(e.into())))); - } - }; - active.pending_batches.clear(); - // Build visited bitmap if needed for this join type - let with_visited = need_produce_result_in_final(self.join_type); - let n_rows = merged_batch.num_rows(); - let visited_left_side = if with_visited { - let buffer_size = n_rows.div_ceil(8); - // Use infallible grow for bitmap — it's small - active.reservation.grow(buffer_size); - self.metrics.join_metrics.build_mem_used.add(buffer_size); - let mut buffer = BooleanBufferBuilder::new(n_rows); - buffer.append_n(n_rows, false); - buffer - } else { - BooleanBufferBuilder::new(0) - }; - - // Create an empty reservation for JoinLeftData's RAII field. - // The actual memory tracking is managed by the Active state's reservation. - let dummy_reservation = active.reservation.new_empty(); - - let left_data = JoinLeftData::new( - merged_batch, - Mutex::new(visited_left_side), - // In memory-limited mode, only 1 probe thread per chunk - AtomicUsize::new(1), - dummy_reservation, - ); - - self.buffered_left_data = Some(Arc::new(left_data)); - - active.right_batch_index = 0; - match active.right_input.open_pass() { - Ok(stream) => { - self.right_data = Some(stream); - } - Err(e) => { - return ControlFlow::Break(Poll::Ready(Some(Err(e)))); + self.state = NLJState::FetchingRight; + ControlFlow::Continue(()) } } - - self.state = NLJState::FetchingRight; - ControlFlow::Continue(()) } /// Handle FetchingRight state - fetch next right batch and prepare for processing. @@ -1836,13 +2174,27 @@ impl NestedLoopJoinStream { /// next chunk (if the left stream is not yet exhausted). fn handle_emit_left_unmatched( &mut self, + cx: &mut std::task::Context<'_>, ) -> ControlFlow>>> { // Return any completed batches first if let Some(poll) = self.maybe_flush_ready_batch() { return ControlFlow::Break(poll); } - // Process current unmatched state + // First, drive any pending chunk-release future to completion so + // we don't transition to the next state while another partition + // is waiting for the slot to be freed. + if let SpillState::Active(active) = &mut self.spill_state + && let Some(fut) = active.chunk_release_in_flight.as_mut() + { + match fut.poll_unpin(cx) { + Poll::Ready(()) => { + active.chunk_release_in_flight = None; + } + Poll::Pending => return ControlFlow::Break(Poll::Pending), + } + } + match self.process_left_unmatched() { // State unchanged (EmitLeftUnmatched) // Continue processing until we have processed all unmatched rows @@ -1858,13 +2210,35 @@ impl NestedLoopJoinStream { return ControlFlow::Break(poll); } - if !self.left_exhausted && self.is_memory_limited() { - // More left data to process — free current chunk and - // go back to BufferingLeft for the next chunk - if let SpillState::Active(ref active) = self.spill_state { - active.reservation.resize(0); + // Drop our reference to the current chunk's + // `JoinLeftData` before releasing the slot. Once the + // last partition does this, the `Arc` reaches zero + // refcount and the per-chunk reservation is freed. + self.buffered_left_data = None; + + if self.is_memory_limited() { + if let SpillState::Active(active) = &mut self.spill_state { + // The last partition for this chunk releases + // the coordinator slot so the next leader can + // load the following chunk. + if self.is_chunk_emitter { + let coordinator = Arc::clone(&active.coordinator); + let released_index = active.next_chunk_index; + active.chunk_release_in_flight = Some( + async move { + coordinator.release_chunk(released_index).await + } + .boxed(), + ); + } + active.next_chunk_index += 1; } - self.buffered_left_data = None; + // Reset emitter state for the next chunk. + self.is_chunk_emitter = false; + self.chunk_emitter_decided = false; + } + + if !self.left_exhausted && self.is_memory_limited() { self.left_probe_idx = 0; self.left_emit_idx = 0; self.state = NLJState::BufferingLeft; @@ -1874,9 +2248,7 @@ impl NestedLoopJoinStream { // All left chunks done — emit global right unmatched. // Drop the exhausted right stream so that // EmitGlobalRightUnmatched opens a fresh replay pass - // from the spill file. (process_left_unmatched_range - // already ran with right_data still set, so its - // schema access is not affected.) + // from the spill file. self.right_data = None; self.state = NLJState::EmitGlobalRightUnmatched; } else { @@ -2345,18 +2717,24 @@ impl NestedLoopJoinStream { /// true -> continue in the same EmitLeftUnmatched state /// false -> next state (Done) fn process_left_unmatched(&mut self) -> Result { - let left_data = self.get_left_data()?; - let left_batch = left_data.batch(); - - // ======== - // Check early return conditions - // ======== - // Early return if join type can't have unmatched rows let join_type_no_produce_left = !need_produce_result_in_final(self.join_type); - // Early return if another thread is already processing unmatched rows - let handled_by_other_partition = - self.left_emit_idx == 0 && !left_data.report_probe_completed(); + + // Decide emitter status exactly once per chunk: the partition + // whose `report_probe_completed` brings the counter to zero is + // the chunk emitter (and, in memory-limited mode, releases the + // chunk slot so the next leader can load the following chunk). + if !self.chunk_emitter_decided { + let left_data = self.get_left_data()?; + self.is_chunk_emitter = left_data.report_probe_completed(); + self.chunk_emitter_decided = true; + } + // Early return if another partition is the chunk emitter — we + // are not allowed to emit unmatched left rows. + let handled_by_other_partition = !self.is_chunk_emitter; + + let left_data = self.get_left_data()?; + let left_batch = left_data.batch(); // Stop processing unmatched rows, the caller will go to the next state let finished = self.left_emit_idx >= left_batch.num_rows(); @@ -3520,8 +3898,9 @@ pub(crate) mod tests { ); let filter = prepare_join_filter(); - // Join types that support memory-limited fallback should succeed - // even under tight memory limits (they spill to disk instead of OOM). + // All join types support memory-limited fallback under + // multi-partition right inputs (left visited state is shared + // across partitions via `FallbackCoordinator`). let fallback_join_types = vec![ JoinType::Inner, JoinType::Left, @@ -3532,6 +3911,7 @@ pub(crate) mod tests { JoinType::RightSemi, JoinType::RightAnti, JoinType::RightMark, + JoinType::Full, ]; for join_type in &fallback_join_types { @@ -3552,25 +3932,6 @@ pub(crate) mod tests { .await?; } - // FULL JOIN with multiple right partitions is intentionally not - // supported in the fallback path yet (cross-partition left-bitmap - // coordination is missing). It should still OOM under tight memory. - let runtime = RuntimeEnvBuilder::new() - .with_memory_limit(100, 1.0) - .build_arc()?; - let task_ctx = TaskContext::default().with_runtime(runtime); - let task_ctx = Arc::new(task_ctx); - let err = multi_partitioned_join_collect( - Arc::clone(&left), - Arc::clone(&right), - &JoinType::Full, - Some(filter.clone()), - task_ctx, - ) - .await - .unwrap_err(); - assert_contains!(err.to_string(), "Resources exhausted"); - Ok(()) } @@ -3993,7 +4354,6 @@ pub(crate) mod tests { } #[tokio::test] - #[ignore = "fails until cross-partition shared left-visited state is implemented"] async fn test_nlj_memory_limited_multi_partition_left_join() -> Result<()> { let task_ctx = task_ctx_with_memory_limit(50, 16)?; let left = build_left_table(); @@ -4032,7 +4392,6 @@ pub(crate) mod tests { } #[tokio::test] - #[ignore = "fails until cross-partition shared left-visited state is implemented"] async fn test_nlj_memory_limited_multi_partition_full_join() -> Result<()> { let task_ctx = task_ctx_with_memory_limit(50, 16)?; let left = build_left_table(); @@ -4070,7 +4429,6 @@ pub(crate) mod tests { } #[tokio::test] - #[ignore = "fails until cross-partition shared left-visited state is implemented"] async fn test_nlj_memory_limited_multi_partition_left_semi_join() -> Result<()> { let task_ctx = task_ctx_with_memory_limit(50, 16)?; let left = build_left_table(); @@ -4114,7 +4472,6 @@ pub(crate) mod tests { } #[tokio::test] - #[ignore = "fails until cross-partition shared left-visited state is implemented"] async fn test_nlj_memory_limited_multi_partition_left_anti_join() -> Result<()> { let task_ctx = task_ctx_with_memory_limit(50, 16)?; let left = build_left_table(); @@ -4150,7 +4507,6 @@ pub(crate) mod tests { } #[tokio::test] - #[ignore = "fails until cross-partition shared left-visited state is implemented"] async fn test_nlj_memory_limited_multi_partition_left_mark_join() -> Result<()> { let task_ctx = task_ctx_with_memory_limit(50, 16)?; let left = build_left_table(); From f1de46ec20d7c9a32b72c6d926d6038df9b59160 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 5 May 2026 22:32:40 -0700 Subject: [PATCH 3/4] test: Add multi-partition NLJ spill SLT cases Adds end-to-end SLT cases under target_partitions=4 + tight memory_limit, covering LEFT, FULL, LEFT SEMI, and LEFT ANTI joins. Each query uses a non-equi predicate that forces NLJ and verifies that left-row counts (matched/unmatched) match the single-partition expectation, exercising the cross-partition shared-state path introduced in the previous commit. Co-authored-by: Claude Code --- .../test_files/nested_loop_join_spill.slt | 90 ++++++++++++++++++- 1 file changed, 87 insertions(+), 3 deletions(-) diff --git a/datafusion/sqllogictest/test_files/nested_loop_join_spill.slt b/datafusion/sqllogictest/test_files/nested_loop_join_spill.slt index b47fc5ac877c1..4fc7543429a71 100644 --- a/datafusion/sqllogictest/test_files/nested_loop_join_spill.slt +++ b/datafusion/sqllogictest/test_files/nested_loop_join_spill.slt @@ -128,12 +128,96 @@ FULL JOIN generate_series(1, 200) AS t2(v2) ---- 100199 199 99999 -# Restore settings to slt runner defaults -statement ok -RESET datafusion.runtime.memory_limit +# ============================================================================= +# Multi-partition memory-limited correctness tests +# +# These exercise the cross-partition shared-state path of NLJ's spill +# fallback: every right partition must observe the same per-chunk +# `JoinLeftData` (visited bitmap + probe-thread counter) so that join +# types which emit unmatched left rows (LEFT, LEFT SEMI, LEFT ANTI, +# LEFT MARK, FULL) produce the same results as the single-pass and +# single-partition paths. Without the coordinator, each partition would +# independently emit unmatched left rows from its own bitmap, producing +# duplicates. +# ============================================================================= statement ok SET datafusion.execution.target_partitions = 4 +# Memory budget tight enough to force NLJ left-side OOM and trigger the +# memory-limited fallback. Shared between all queries in this section. +statement ok +SET datafusion.runtime.memory_limit = '50K' + +# --- LEFT JOIN --- +# v1 in [1,5000], v2 in [1,100] with predicate (v1+v2)=101 AND v2<=50. +# Matches: v2 in [1..50] each pairs with v1 = 101 - v2 in [51..100] → 50 pairs +# Left only: v1 in [1..5000] \ [51..100] → 4950 unmatched +# Total LEFT JOIN output rows = 50 + 4950 = 5000 (each left row exactly once). +query I nosort +SELECT count(*) as cnt +FROM generate_series(1, 5000) AS t1(v1) +LEFT JOIN generate_series(1, 100) AS t2(v2) + ON (t1.v1 + t2.v2) = 101 AND t2.v2 <= 50 +---- +5000 + +# Same predicate, also asserts the unmatched-left count is correct +# (would be 4950 * N where N is the partition count without coordination). +query II nosort +SELECT count(*) as cnt, + sum(case when t2.v2 IS NULL then 1 else 0 end) as unmatched_left +FROM generate_series(1, 5000) AS t1(v1) +LEFT JOIN generate_series(1, 100) AS t2(v2) + ON (t1.v1 + t2.v2) = 101 AND t2.v2 <= 50 +---- +5000 4950 + +# --- FULL JOIN --- +# Same predicate. Output: +# - 50 matched pairs +# - 4950 unmatched left rows (NULL-padded right) +# - 50 unmatched right rows (v2 in [51..100], rejected by `v2 <= 50`) +# Total = 5050. +query III nosort +SELECT count(*) as cnt, + sum(case when t2.v2 IS NULL then 1 else 0 end) as unmatched_left, + sum(case when t1.v1 IS NULL then 1 else 0 end) as unmatched_right +FROM generate_series(1, 5000) AS t1(v1) +FULL JOIN generate_series(1, 100) AS t2(v2) + ON (t1.v1 + t2.v2) = 101 AND t2.v2 <= 50 +---- +5050 4950 50 + +# --- LEFT SEMI JOIN --- +# Each left row appears at most once, even though multiple right rows +# (potentially across multiple right partitions) could match it. +# The 50 left rows in [51..100] each match exactly one right row. +query I nosort +SELECT count(*) as cnt +FROM generate_series(1, 5000) AS t1(v1) +WHERE EXISTS ( + SELECT 1 FROM generate_series(1, 100) AS t2(v2) + WHERE (t1.v1 + t2.v2) = 101 AND t2.v2 <= 50 +) +---- +50 + +# --- LEFT ANTI JOIN --- +# Left rows with NO matching right row: 5000 - 50 = 4950. +query I nosort +SELECT count(*) as cnt +FROM generate_series(1, 5000) AS t1(v1) +WHERE NOT EXISTS ( + SELECT 1 FROM generate_series(1, 100) AS t2(v2) + WHERE (t1.v1 + t2.v2) = 101 AND t2.v2 <= 50 +) +---- +4950 + +# Restore settings to slt runner defaults +statement ok +RESET datafusion.runtime.memory_limit + statement ok RESET datafusion.catalog.create_default_catalog_and_schema From a1a8bbedfa5dba5ee85e3932b92bca9548c55498 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 5 May 2026 22:38:50 -0700 Subject: [PATCH 4/4] test: Assert spill_count in multi-partition NLJ SLT cases The previous SLT cases verified output correctness under target_partitions=4 + tight memory_limit, but did not assert that the memory-limited fallback path was actually taken. Add an EXPLAIN ANALYZE assertion that the NestedLoopJoinExec line shows spill_count=2, confirming both the left-side and right-side spills fired. Co-authored-by: Claude Code --- .../test_files/nested_loop_join_spill.slt | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/datafusion/sqllogictest/test_files/nested_loop_join_spill.slt b/datafusion/sqllogictest/test_files/nested_loop_join_spill.slt index 4fc7543429a71..6eaf2b38a5bcf 100644 --- a/datafusion/sqllogictest/test_files/nested_loop_join_spill.slt +++ b/datafusion/sqllogictest/test_files/nested_loop_join_spill.slt @@ -162,6 +162,30 @@ LEFT JOIN generate_series(1, 100) AS t2(v2) ---- 5000 +# Verify the memory-limited fallback was actually taken (spill_count > 0) +# under target_partitions=4. The fallback shares per-chunk `JoinLeftData` +# across right partitions via `FallbackCoordinator`; without that path +# the test would simply OOM rather than spill. +query TT +EXPLAIN ANALYZE +SELECT count(*) as cnt +FROM generate_series(1, 5000) AS t1(v1) +LEFT JOIN generate_series(1, 100) AS t2(v2) + ON (t1.v1 + t2.v2) = 101 AND t2.v2 <= 50 +---- +Plan with Metrics +01)ProjectionExec: expr=[count(Int64(1))@0 as cnt], metrics=[] +02)--AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))], metrics=[] +03)----CoalescePartitionsExec, metrics=[] +04)------AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))], metrics=[] +05)--------NestedLoopJoinExec: join_type=Left, filter=v1@0 + v2@1 = 101, projection=[], metrics=[output_rows=5.00 K, spill_count=2, ] +06)----------ProjectionExec: expr=[value@0 as v1], metrics=[] +07)------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=5000, batch_size=8192], metrics=[] +08)----------ProjectionExec: expr=[value@0 as v2], metrics=[] +09)------------FilterExec: value@0 <= 50, metrics=[] +10)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, metrics=[] +11)----------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100, batch_size=8192], metrics=[] + # Same predicate, also asserts the unmatched-left count is correct # (would be 4950 * N where N is the partition count without coordination). query II nosort