From 39c22f5acd0afb8276c9611a7a2510be78a0df3e Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 19 Mar 2026 10:27:32 +0800 Subject: [PATCH 1/9] Optimize HashJoinStream for empty build side Implement a staged mini-plan for HashJoinStream to immediately exit when the build side is empty and the join type's result is fully determined. This change avoids unnecessary entry into FetchProbeBatch for Inner, Left, LeftSemi, LeftAnti, LeftMark, and RightSemi joins without filters. Add tests to verify join behavior with empty build: - join_does_not_consume_probe_when_empty_build_fixes_output - join_still_consumes_probe_when_empty_build_needs_probe_rows These use MockExec to distinguish between short-circuiting and necessary probe row consumption. --- .../physical-plan/src/joins/hash_join/exec.rs | 103 ++++++++++++++++++ .../src/joins/hash_join/stream.rs | 32 +++++- 2 files changed, 133 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 038eb96b7b45..9eb70949c37b 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -4983,6 +4983,109 @@ mod tests { } } + #[tokio::test] + async fn join_does_not_consume_probe_when_empty_build_fixes_output() { + let left_batch = + build_table_i32(("a1", &vec![]), ("b1", &vec![]), ("c1", &vec![])); + let left_schema = left_batch.schema(); + + let err = exec_err!("bad data error"); + let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![])); + + let on = vec![( + Arc::new(Column::new_with_schema("b1", &left_schema).unwrap()) as _, + Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _, + )]; + let schema = right.schema(); + let right_input = + Arc::new(MockExec::new(vec![Ok(right), err], schema).with_use_task(false)); + let left: Arc = TestMemoryExec::try_new_exec( + &[vec![left_batch]], + Arc::clone(&left_schema), + None, + ) + .unwrap(); + + let join_types = vec![ + JoinType::Inner, + JoinType::Left, + JoinType::LeftSemi, + JoinType::LeftAnti, + JoinType::LeftMark, + JoinType::RightSemi, + ]; + + for join_type in join_types { + let join = join( + Arc::clone(&left), + Arc::clone(&right_input) as Arc, + on.clone(), + &join_type, + NullEquality::NullEqualsNothing, + ) + .unwrap(); + let task_ctx = Arc::new(TaskContext::default()); + + let stream = join.execute(0, task_ctx).unwrap(); + let batches = common::collect(stream).await.unwrap(); + + assert!( + batches.is_empty(), + "expected no output batches for {join_type}, got {batches:?}" + ); + } + } + + #[tokio::test] + async fn join_still_consumes_probe_when_empty_build_needs_probe_rows() { + let left_batch = + build_table_i32(("a1", &vec![]), ("b1", &vec![]), ("c1", &vec![])); + let left_schema = left_batch.schema(); + + let err = exec_err!("bad data error"); + let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![])); + + let on = vec![( + Arc::new(Column::new_with_schema("b1", &left_schema).unwrap()) as _, + Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _, + )]; + let schema = right.schema(); + let right_input = + Arc::new(MockExec::new(vec![Ok(right), err], schema).with_use_task(false)); + let left: Arc = TestMemoryExec::try_new_exec( + &[vec![left_batch]], + Arc::clone(&left_schema), + None, + ) + .unwrap(); + + let join_types = vec![ + JoinType::Right, + JoinType::Full, + JoinType::RightAnti, + JoinType::RightMark, + ]; + + for join_type in join_types { + let join = join( + Arc::clone(&left), + Arc::clone(&right_input) as Arc, + on.clone(), + &join_type, + NullEquality::NullEqualsNothing, + ) + .unwrap(); + let task_ctx = Arc::new(TaskContext::default()); + + let stream = join.execute(0, task_ctx).unwrap(); + let result_string = common::collect(stream).await.unwrap_err().to_string(); + assert!( + result_string.contains("bad data error"), + "actual: {result_string}" + ); + } + } + #[tokio::test] async fn join_split_batch() { let left = build_table( diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index ab630920184d..c8c88b519fc8 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -406,6 +406,21 @@ impl HashJoinStream { } } + /// Returns true when an empty build side fully determines the join result, + /// so the probe side does not need to be consumed. + fn can_skip_probe_on_empty_build_side(&self) -> bool { + self.filter.is_none() + && matches!( + self.join_type, + JoinType::Inner + | JoinType::Left + | JoinType::LeftSemi + | JoinType::LeftAnti + | JoinType::LeftMark + | JoinType::RightSemi + ) + } + /// Separate implementation function that unpins the [`HashJoinStream`] so /// that partial borrows work correctly fn poll_next_impl( @@ -469,7 +484,14 @@ impl HashJoinStream { if let Some(ref mut fut) = self.build_waiter { ready!(fut.get_shared(cx))?; } - self.state = HashJoinStreamState::FetchProbeBatch; + let build_side = self.build_side.try_as_ready()?; + self.state = if build_side.left_data.map().is_empty() + && self.can_skip_probe_on_empty_build_side() + { + HashJoinStreamState::Completed + } else { + HashJoinStreamState::FetchProbeBatch + }; Poll::Ready(Ok(StatefulStreamResult::Continue)) } @@ -540,7 +562,13 @@ impl HashJoinStream { })); self.state = HashJoinStreamState::WaitPartitionBoundsReport; } else { - self.state = HashJoinStreamState::FetchProbeBatch; + self.state = if left_data.map().is_empty() + && self.can_skip_probe_on_empty_build_side() + { + HashJoinStreamState::Completed + } else { + HashJoinStreamState::FetchProbeBatch + }; } self.build_side = BuildSide::Ready(BuildSideReadyState { left_data }); From 8e4c6eb9dc503cf4fa51664525935440e5e6a2be Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 19 Mar 2026 10:32:04 +0800 Subject: [PATCH 2/9] Refactor post-build transition handling Extract duplicate post-build transition logic into next_state_after_build_ready in stream.rs. This centralizes the decision between Completed and FetchProbeBatch in one location and streamlines both collect_build_side and wait_for_partition_bounds_report to use the new helper function. --- .../src/joins/hash_join/stream.rs | 26 +++++++++---------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index c8c88b519fc8..650aae5cb473 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -421,6 +421,16 @@ impl HashJoinStream { ) } + /// Returns the next state after the build side has been fully collected + /// and any required build-side coordination has completed. + fn next_state_after_build_ready(&self, left_data: &JoinLeftData) -> HashJoinStreamState { + if left_data.map().is_empty() && self.can_skip_probe_on_empty_build_side() { + HashJoinStreamState::Completed + } else { + HashJoinStreamState::FetchProbeBatch + } + } + /// Separate implementation function that unpins the [`HashJoinStream`] so /// that partial borrows work correctly fn poll_next_impl( @@ -485,13 +495,7 @@ impl HashJoinStream { ready!(fut.get_shared(cx))?; } let build_side = self.build_side.try_as_ready()?; - self.state = if build_side.left_data.map().is_empty() - && self.can_skip_probe_on_empty_build_side() - { - HashJoinStreamState::Completed - } else { - HashJoinStreamState::FetchProbeBatch - }; + self.state = self.next_state_after_build_ready(build_side.left_data.as_ref()); Poll::Ready(Ok(StatefulStreamResult::Continue)) } @@ -562,13 +566,7 @@ impl HashJoinStream { })); self.state = HashJoinStreamState::WaitPartitionBoundsReport; } else { - self.state = if left_data.map().is_empty() - && self.can_skip_probe_on_empty_build_side() - { - HashJoinStreamState::Completed - } else { - HashJoinStreamState::FetchProbeBatch - }; + self.state = self.next_state_after_build_ready(left_data.as_ref()); } self.build_side = BuildSide::Ready(BuildSideReadyState { left_data }); From dcb4cd920d84adb38912a4d62cb66c6cac06287b Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 19 Mar 2026 10:33:50 +0800 Subject: [PATCH 3/9] Refactor join type logic into utils.rs Move the pure JoinType semantic rule to utils.rs, placing it alongside the existing join behavior helpers. Update HashJoinStream in stream.rs to focus solely on its stream-specific execution concern by removing unnecessary logic related to filtering. --- .../src/joins/hash_join/stream.rs | 22 +++++-------------- datafusion/physical-plan/src/joins/utils.rs | 14 ++++++++++++ 2 files changed, 19 insertions(+), 17 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index 650aae5cb473..523db10c8355 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -42,7 +42,7 @@ use crate::{ BuildProbeJoinMetrics, ColumnIndex, JoinFilter, JoinHashMapType, StatefulStreamResult, adjust_indices_by_join_type, apply_join_filter_to_indices, build_batch_empty_build_side, build_batch_from_indices, - need_produce_result_in_final, + can_skip_probe_on_empty_build_side, need_produce_result_in_final, }, }; @@ -406,25 +406,13 @@ impl HashJoinStream { } } - /// Returns true when an empty build side fully determines the join result, - /// so the probe side does not need to be consumed. - fn can_skip_probe_on_empty_build_side(&self) -> bool { - self.filter.is_none() - && matches!( - self.join_type, - JoinType::Inner - | JoinType::Left - | JoinType::LeftSemi - | JoinType::LeftAnti - | JoinType::LeftMark - | JoinType::RightSemi - ) - } - /// Returns the next state after the build side has been fully collected /// and any required build-side coordination has completed. fn next_state_after_build_ready(&self, left_data: &JoinLeftData) -> HashJoinStreamState { - if left_data.map().is_empty() && self.can_skip_probe_on_empty_build_side() { + if left_data.map().is_empty() + && self.filter.is_none() + && can_skip_probe_on_empty_build_side(self.join_type) + { HashJoinStreamState::Completed } else { HashJoinStreamState::FetchProbeBatch diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 3130134e253d..ccb09e17f9b7 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -855,6 +855,20 @@ pub(crate) fn need_produce_result_in_final(join_type: JoinType) -> bool { ) } +/// Returns true when an empty build side fully determines the join result, +/// so the probe side does not need to be consumed. +pub(crate) fn can_skip_probe_on_empty_build_side(join_type: JoinType) -> bool { + matches!( + join_type, + JoinType::Inner + | JoinType::Left + | JoinType::LeftSemi + | JoinType::LeftAnti + | JoinType::LeftMark + | JoinType::RightSemi + ) +} + pub(crate) fn get_final_indices_from_shared_bitmap( shared_bitmap: &SharedBitmapBuilder, join_type: JoinType, From 072cbea6cb4acf7c18d83d1195cc0c87bb727b36 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 19 Mar 2026 10:36:35 +0800 Subject: [PATCH 4/9] Refactor test setup for shared empty build fixtures Extract shared empty-build/probe-error test setup into a new function, empty_build_with_probe_error_inputs(), in exec.rs. Both regression tests now reuse this setup, allowing each test to focus more on the join-type behavior it asserts rather than rebuilding the same fixture. --- .../physical-plan/src/joins/hash_join/exec.rs | 68 +++++++------------ 1 file changed, 26 insertions(+), 42 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 9eb70949c37b..752678ca050a 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -2275,6 +2275,28 @@ mod tests { ) } + fn empty_build_with_probe_error_inputs() -> (Arc, Arc, JoinOn) { + let left_batch = build_table_i32(("a1", &vec![]), ("b1", &vec![]), ("c1", &vec![])); + let left_schema = left_batch.schema(); + let left: Arc = + TestMemoryExec::try_new_exec(&[vec![left_batch]], left_schema.clone(), None) + .unwrap(); + + let err = exec_err!("bad data error"); + let right_batch = + build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![])); + let right_schema = right_batch.schema(); + let on = vec![( + Arc::new(Column::new_with_schema("b1", &left_schema).unwrap()) as _, + Arc::new(Column::new_with_schema("b1", &right_schema).unwrap()) as _, + )]; + let right: Arc = Arc::new( + MockExec::new(vec![Ok(right_batch), err], right_schema).with_use_task(false), + ); + + (left, right, on) + } + async fn join_collect( left: Arc, right: Arc, @@ -4985,26 +5007,7 @@ mod tests { #[tokio::test] async fn join_does_not_consume_probe_when_empty_build_fixes_output() { - let left_batch = - build_table_i32(("a1", &vec![]), ("b1", &vec![]), ("c1", &vec![])); - let left_schema = left_batch.schema(); - - let err = exec_err!("bad data error"); - let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![])); - - let on = vec![( - Arc::new(Column::new_with_schema("b1", &left_schema).unwrap()) as _, - Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _, - )]; - let schema = right.schema(); - let right_input = - Arc::new(MockExec::new(vec![Ok(right), err], schema).with_use_task(false)); - let left: Arc = TestMemoryExec::try_new_exec( - &[vec![left_batch]], - Arc::clone(&left_schema), - None, - ) - .unwrap(); + let (left, right_input, on) = empty_build_with_probe_error_inputs(); let join_types = vec![ JoinType::Inner, @@ -5018,7 +5021,7 @@ mod tests { for join_type in join_types { let join = join( Arc::clone(&left), - Arc::clone(&right_input) as Arc, + Arc::clone(&right_input), on.clone(), &join_type, NullEquality::NullEqualsNothing, @@ -5038,26 +5041,7 @@ mod tests { #[tokio::test] async fn join_still_consumes_probe_when_empty_build_needs_probe_rows() { - let left_batch = - build_table_i32(("a1", &vec![]), ("b1", &vec![]), ("c1", &vec![])); - let left_schema = left_batch.schema(); - - let err = exec_err!("bad data error"); - let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![])); - - let on = vec![( - Arc::new(Column::new_with_schema("b1", &left_schema).unwrap()) as _, - Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _, - )]; - let schema = right.schema(); - let right_input = - Arc::new(MockExec::new(vec![Ok(right), err], schema).with_use_task(false)); - let left: Arc = TestMemoryExec::try_new_exec( - &[vec![left_batch]], - Arc::clone(&left_schema), - None, - ) - .unwrap(); + let (left, right_input, on) = empty_build_with_probe_error_inputs(); let join_types = vec![ JoinType::Right, @@ -5069,7 +5053,7 @@ mod tests { for join_type in join_types { let join = join( Arc::clone(&left), - Arc::clone(&right_input) as Arc, + Arc::clone(&right_input), on.clone(), &join_type, NullEquality::NullEqualsNothing, From 30f2a3e30829d2b8855fe3235359248c7aed594f Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 19 Mar 2026 10:41:29 +0800 Subject: [PATCH 5/9] Add regression test for empty build side in hash join Implement test_hash_join_skips_probe_on_empty_build_after_partition_bounds_report in exec.rs. Ensure that dynamic filtering is enabled by keeping a consumer reference alive. Verify that an Inner join with an empty build side correctly skips probe consumption, even when passing through the WaitPartitionBoundsReport path. --- .../physical-plan/src/joins/hash_join/exec.rs | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 752678ca050a..1e3d221bff7b 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -5594,6 +5594,43 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_hash_join_skips_probe_on_empty_build_after_partition_bounds_report( + ) -> Result<()> { + let task_ctx = Arc::new(TaskContext::default()); + let (left, right, on) = empty_build_with_probe_error_inputs(); + + // Keep an extra consumer reference so execute() enables dynamic filter pushdown + // and enters the WaitPartitionBoundsReport path before deciding whether to poll + // the probe side. + let dynamic_filter = HashJoinExec::create_dynamic_filter(&on); + let dynamic_filter_clone = Arc::clone(&dynamic_filter); + + let mut join = HashJoinExec::try_new( + left, + right, + on, + None, + &JoinType::Inner, + None, + PartitionMode::CollectLeft, + NullEquality::NullEqualsNothing, + false, + )?; + join.dynamic_filter = Some(HashJoinExecDynamicFilter { + filter: dynamic_filter, + build_accumulator: OnceLock::new(), + }); + + let stream = join.execute(0, task_ctx)?; + let batches = common::collect(stream).await?; + assert!(batches.is_empty()); + + dynamic_filter_clone.wait_complete().await; + + Ok(()) + } + #[tokio::test] async fn test_perfect_hash_join_with_negative_numbers() -> Result<()> { let task_ctx = prepare_task_ctx(8192, true); From 626f7710683dc13e9e96bc1c59e3e74826a43156 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 19 Mar 2026 10:59:35 +0800 Subject: [PATCH 6/9] Consolidate empty-build probe tests and join setup Refactor exec.rs by consolidating empty-build probe-behavior tests into `assert_empty_build_probe_behavior(...)` and repeated dynamic filter join setup into `hash_join_with_dynamic_filter(...)`. Maintain existing runtime logic while reducing duplicate test boilerplate and redundant local setup for improved clarity and maintainability. --- .../physical-plan/src/joins/hash_join/exec.rs | 222 ++++++++---------- .../src/joins/hash_join/stream.rs | 5 +- 2 files changed, 104 insertions(+), 123 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 1e3d221bff7b..74989fd7bd64 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -2275,8 +2275,10 @@ mod tests { ) } - fn empty_build_with_probe_error_inputs() -> (Arc, Arc, JoinOn) { - let left_batch = build_table_i32(("a1", &vec![]), ("b1", &vec![]), ("c1", &vec![])); + fn empty_build_with_probe_error_inputs() + -> (Arc, Arc, JoinOn) { + let left_batch = + build_table_i32(("a1", &vec![]), ("b1", &vec![]), ("c1", &vec![])); let left_schema = left_batch.schema(); let left: Arc = TestMemoryExec::try_new_exec(&[vec![left_batch]], left_schema.clone(), None) @@ -2297,6 +2299,69 @@ mod tests { (left, right, on) } + async fn assert_empty_build_probe_behavior( + join_types: &[JoinType], + expect_probe_error: bool, + ) { + let (left, right, on) = empty_build_with_probe_error_inputs(); + + for join_type in join_types { + let join = join( + Arc::clone(&left), + Arc::clone(&right), + on.clone(), + join_type, + NullEquality::NullEqualsNothing, + ) + .unwrap(); + + let result = common::collect( + join.execute(0, Arc::new(TaskContext::default())).unwrap(), + ) + .await; + + if expect_probe_error { + let result_string = result.unwrap_err().to_string(); + assert!( + result_string.contains("bad data error"), + "actual: {result_string}" + ); + } else { + let batches = result.unwrap(); + assert!( + batches.is_empty(), + "expected no output batches for {join_type}, got {batches:?}" + ); + } + } + } + + fn hash_join_with_dynamic_filter( + left: Arc, + right: Arc, + on: JoinOn, + join_type: JoinType, + ) -> Result<(HashJoinExec, Arc)> { + let dynamic_filter = HashJoinExec::create_dynamic_filter(&on); + let mut join = HashJoinExec::try_new( + left, + right, + on, + None, + &join_type, + None, + PartitionMode::CollectLeft, + NullEquality::NullEqualsNothing, + false, + )?; + join.dynamic_filter = Some(HashJoinExecDynamicFilter { + filter: Arc::clone(&dynamic_filter), + build_accumulator: OnceLock::new(), + }); + + Ok((join, dynamic_filter)) + } + async fn join_collect( left: Arc, right: Arc, @@ -5007,67 +5072,32 @@ mod tests { #[tokio::test] async fn join_does_not_consume_probe_when_empty_build_fixes_output() { - let (left, right_input, on) = empty_build_with_probe_error_inputs(); - - let join_types = vec![ - JoinType::Inner, - JoinType::Left, - JoinType::LeftSemi, - JoinType::LeftAnti, - JoinType::LeftMark, - JoinType::RightSemi, - ]; - - for join_type in join_types { - let join = join( - Arc::clone(&left), - Arc::clone(&right_input), - on.clone(), - &join_type, - NullEquality::NullEqualsNothing, - ) - .unwrap(); - let task_ctx = Arc::new(TaskContext::default()); - - let stream = join.execute(0, task_ctx).unwrap(); - let batches = common::collect(stream).await.unwrap(); - - assert!( - batches.is_empty(), - "expected no output batches for {join_type}, got {batches:?}" - ); - } + assert_empty_build_probe_behavior( + &[ + JoinType::Inner, + JoinType::Left, + JoinType::LeftSemi, + JoinType::LeftAnti, + JoinType::LeftMark, + JoinType::RightSemi, + ], + false, + ) + .await; } #[tokio::test] async fn join_still_consumes_probe_when_empty_build_needs_probe_rows() { - let (left, right_input, on) = empty_build_with_probe_error_inputs(); - - let join_types = vec![ - JoinType::Right, - JoinType::Full, - JoinType::RightAnti, - JoinType::RightMark, - ]; - - for join_type in join_types { - let join = join( - Arc::clone(&left), - Arc::clone(&right_input), - on.clone(), - &join_type, - NullEquality::NullEqualsNothing, - ) - .unwrap(); - let task_ctx = Arc::new(TaskContext::default()); - - let stream = join.execute(0, task_ctx).unwrap(); - let result_string = common::collect(stream).await.unwrap_err().to_string(); - assert!( - result_string.contains("bad data error"), - "actual: {result_string}" - ); - } + assert_empty_build_probe_behavior( + &[ + JoinType::Right, + JoinType::Full, + JoinType::RightAnti, + JoinType::RightMark, + ], + true, + ) + .await; } #[tokio::test] @@ -5513,26 +5543,8 @@ mod tests { Arc::new(Column::new_with_schema("b1", &right.schema())?) as _, )]; - // Create a dynamic filter manually - let dynamic_filter = HashJoinExec::create_dynamic_filter(&on); - let dynamic_filter_clone = Arc::clone(&dynamic_filter); - - // Create HashJoinExec with the dynamic filter - let mut join = HashJoinExec::try_new( - left, - right, - on, - None, - &JoinType::Inner, - None, - PartitionMode::CollectLeft, - NullEquality::NullEqualsNothing, - false, - )?; - join.dynamic_filter = Some(HashJoinExecDynamicFilter { - filter: dynamic_filter, - build_accumulator: OnceLock::new(), - }); + let (join, dynamic_filter) = + hash_join_with_dynamic_filter(left, right, on, JoinType::Inner)?; // Execute the join let stream = join.execute(0, task_ctx)?; @@ -5540,7 +5552,7 @@ mod tests { // After the join completes, the dynamic filter should be marked as complete // wait_complete() should return immediately - dynamic_filter_clone.wait_complete().await; + dynamic_filter.wait_complete().await; Ok(()) } @@ -5562,26 +5574,8 @@ mod tests { Arc::new(Column::new_with_schema("b1", &right.schema())?) as _, )]; - // Create a dynamic filter manually - let dynamic_filter = HashJoinExec::create_dynamic_filter(&on); - let dynamic_filter_clone = Arc::clone(&dynamic_filter); - - // Create HashJoinExec with the dynamic filter - let mut join = HashJoinExec::try_new( - left, - right, - on, - None, - &JoinType::Inner, - None, - PartitionMode::CollectLeft, - NullEquality::NullEqualsNothing, - false, - )?; - join.dynamic_filter = Some(HashJoinExecDynamicFilter { - filter: dynamic_filter, - build_accumulator: OnceLock::new(), - }); + let (join, dynamic_filter) = + hash_join_with_dynamic_filter(left, right, on, JoinType::Inner)?; // Execute the join let stream = join.execute(0, task_ctx)?; @@ -5589,44 +5583,28 @@ mod tests { // Even with empty build side, the dynamic filter should be marked as complete // wait_complete() should return immediately - dynamic_filter_clone.wait_complete().await; + dynamic_filter.wait_complete().await; Ok(()) } #[tokio::test] - async fn test_hash_join_skips_probe_on_empty_build_after_partition_bounds_report( - ) -> Result<()> { + async fn test_hash_join_skips_probe_on_empty_build_after_partition_bounds_report() + -> Result<()> { let task_ctx = Arc::new(TaskContext::default()); let (left, right, on) = empty_build_with_probe_error_inputs(); // Keep an extra consumer reference so execute() enables dynamic filter pushdown // and enters the WaitPartitionBoundsReport path before deciding whether to poll // the probe side. - let dynamic_filter = HashJoinExec::create_dynamic_filter(&on); - let dynamic_filter_clone = Arc::clone(&dynamic_filter); - - let mut join = HashJoinExec::try_new( - left, - right, - on, - None, - &JoinType::Inner, - None, - PartitionMode::CollectLeft, - NullEquality::NullEqualsNothing, - false, - )?; - join.dynamic_filter = Some(HashJoinExecDynamicFilter { - filter: dynamic_filter, - build_accumulator: OnceLock::new(), - }); + let (join, dynamic_filter) = + hash_join_with_dynamic_filter(left, right, on, JoinType::Inner)?; let stream = join.execute(0, task_ctx)?; let batches = common::collect(stream).await?; assert!(batches.is_empty()); - dynamic_filter_clone.wait_complete().await; + dynamic_filter.wait_complete().await; Ok(()) } diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index 523db10c8355..4609e198666e 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -408,7 +408,10 @@ impl HashJoinStream { /// Returns the next state after the build side has been fully collected /// and any required build-side coordination has completed. - fn next_state_after_build_ready(&self, left_data: &JoinLeftData) -> HashJoinStreamState { + fn next_state_after_build_ready( + &self, + left_data: &JoinLeftData, + ) -> HashJoinStreamState { if left_data.map().is_empty() && self.filter.is_none() && can_skip_probe_on_empty_build_side(self.join_type) From 45ee6d17929fb614a49839322a4f0697faaaa0cb Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 19 Mar 2026 22:40:11 +0800 Subject: [PATCH 7/9] Refactor empty build side handling Remove duplication by introducing a shared helper, empty_build_side_produces_empty_result, in utils.rs. Update build_batch_empty_build_side to use this helper directly, ensuring alignment in the short-circuit and batch-construction logic within the hash join state transition in stream.rs. --- .../src/joins/hash_join/stream.rs | 4 +- datafusion/physical-plan/src/joins/utils.rs | 76 +++++++++---------- 2 files changed, 37 insertions(+), 43 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index 4609e198666e..1d11d4d0601d 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -42,7 +42,7 @@ use crate::{ BuildProbeJoinMetrics, ColumnIndex, JoinFilter, JoinHashMapType, StatefulStreamResult, adjust_indices_by_join_type, apply_join_filter_to_indices, build_batch_empty_build_side, build_batch_from_indices, - can_skip_probe_on_empty_build_side, need_produce_result_in_final, + empty_build_side_produces_empty_result, need_produce_result_in_final, }, }; @@ -414,7 +414,7 @@ impl HashJoinStream { ) -> HashJoinStreamState { if left_data.map().is_empty() && self.filter.is_none() - && can_skip_probe_on_empty_build_side(self.join_type) + && empty_build_side_produces_empty_result(self.join_type) { HashJoinStreamState::Completed } else { diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index ccb09e17f9b7..aadb9715787e 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -855,9 +855,11 @@ pub(crate) fn need_produce_result_in_final(join_type: JoinType) -> bool { ) } -/// Returns true when an empty build side fully determines the join result, -/// so the probe side does not need to be consumed. -pub(crate) fn can_skip_probe_on_empty_build_side(join_type: JoinType) -> bool { +/// Returns true when an empty build side necessarily produces an empty result. +/// +/// This is the shared source of truth for both state-machine short-circuiting +/// and `build_batch_empty_build_side`. +pub(crate) fn empty_build_side_produces_empty_result(join_type: JoinType) -> bool { matches!( join_type, JoinType::Inner @@ -1074,46 +1076,38 @@ pub(crate) fn build_batch_empty_build_side( column_indices: &[ColumnIndex], join_type: JoinType, ) -> Result { - match join_type { - // these join types only return data if the left side is not empty, so we return an - // empty RecordBatch - JoinType::Inner - | JoinType::Left - | JoinType::LeftSemi - | JoinType::RightSemi - | JoinType::LeftAnti - | JoinType::LeftMark => Ok(RecordBatch::new_empty(Arc::new(schema.clone()))), - - // the remaining joins will return data for the right columns and null for the left ones - JoinType::Right | JoinType::Full | JoinType::RightAnti | JoinType::RightMark => { - let num_rows = probe_batch.num_rows(); - if schema.fields().is_empty() { - return new_empty_schema_batch(schema, num_rows); - } - let mut columns: Vec> = - Vec::with_capacity(schema.fields().len()); - - for column_index in column_indices { - let array = match column_index.side { - // left -> null array - JoinSide::Left => new_null_array( - build_batch.column(column_index.index).data_type(), - num_rows, - ), - // right -> respective right array - JoinSide::Right => Arc::clone(probe_batch.column(column_index.index)), - // right mark -> unset boolean array as there are no matches on the left side - JoinSide::None => Arc::new(BooleanArray::new( - BooleanBuffer::new_unset(num_rows), - None, - )), - }; - - columns.push(array); - } + if empty_build_side_produces_empty_result(join_type) { + // These join types only return data if the left side is not empty. + Ok(RecordBatch::new_empty(Arc::new(schema.clone()))) + } else { + // The remaining joins return right-side rows and nulls for the left side. + let num_rows = probe_batch.num_rows(); + if schema.fields().is_empty() { + return new_empty_schema_batch(schema, num_rows); + } + let mut columns: Vec> = + Vec::with_capacity(schema.fields().len()); + + for column_index in column_indices { + let array = match column_index.side { + // left -> null array + JoinSide::Left => new_null_array( + build_batch.column(column_index.index).data_type(), + num_rows, + ), + // right -> respective right array + JoinSide::Right => Arc::clone(probe_batch.column(column_index.index)), + // right mark -> unset boolean array as there are no matches on the left side + JoinSide::None => Arc::new(BooleanArray::new( + BooleanBuffer::new_unset(num_rows), + None, + )), + }; - Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?) + columns.push(array); } + + Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?) } } From 12a52ef0d65cb43c58b61bbcd5f97d335f2a7f2b Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 20 Mar 2026 11:19:40 +0800 Subject: [PATCH 8/9] Simplify hash-join state machine and batch construction Refactor stream.rs and utils.rs to streamline the hash-join state machine. Compute the post-build state directly from inputs, eliminating unnecessary indirection. Update the empty-build-side batch construction to utilize early returns and iterator-based collection for columns, replacing manual Vec setup and push logic. --- .../src/joins/hash_join/stream.rs | 23 +++++--- datafusion/physical-plan/src/joins/utils.rs | 54 +++++++++---------- 2 files changed, 41 insertions(+), 36 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index 1d11d4d0601d..7f0274e619c2 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -408,13 +408,14 @@ impl HashJoinStream { /// Returns the next state after the build side has been fully collected /// and any required build-side coordination has completed. - fn next_state_after_build_ready( - &self, + fn state_after_build_ready( + has_filter: bool, + join_type: JoinType, left_data: &JoinLeftData, ) -> HashJoinStreamState { - if left_data.map().is_empty() - && self.filter.is_none() - && empty_build_side_produces_empty_result(self.join_type) + if !has_filter + && left_data.map().is_empty() + && empty_build_side_produces_empty_result(join_type) { HashJoinStreamState::Completed } else { @@ -486,7 +487,11 @@ impl HashJoinStream { ready!(fut.get_shared(cx))?; } let build_side = self.build_side.try_as_ready()?; - self.state = self.next_state_after_build_ready(build_side.left_data.as_ref()); + self.state = Self::state_after_build_ready( + self.filter.is_some(), + self.join_type, + build_side.left_data.as_ref(), + ); Poll::Ready(Ok(StatefulStreamResult::Continue)) } @@ -557,7 +562,11 @@ impl HashJoinStream { })); self.state = HashJoinStreamState::WaitPartitionBoundsReport; } else { - self.state = self.next_state_after_build_ready(left_data.as_ref()); + self.state = Self::state_after_build_ready( + self.filter.is_some(), + self.join_type, + left_data.as_ref(), + ); } self.build_side = BuildSide::Ready(BuildSideReadyState { left_data }); diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index aadb9715787e..540fa3c373aa 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -1078,37 +1078,33 @@ pub(crate) fn build_batch_empty_build_side( ) -> Result { if empty_build_side_produces_empty_result(join_type) { // These join types only return data if the left side is not empty. - Ok(RecordBatch::new_empty(Arc::new(schema.clone()))) - } else { - // The remaining joins return right-side rows and nulls for the left side. - let num_rows = probe_batch.num_rows(); - if schema.fields().is_empty() { - return new_empty_schema_batch(schema, num_rows); - } - let mut columns: Vec> = - Vec::with_capacity(schema.fields().len()); - - for column_index in column_indices { - let array = match column_index.side { - // left -> null array - JoinSide::Left => new_null_array( - build_batch.column(column_index.index).data_type(), - num_rows, - ), - // right -> respective right array - JoinSide::Right => Arc::clone(probe_batch.column(column_index.index)), - // right mark -> unset boolean array as there are no matches on the left side - JoinSide::None => Arc::new(BooleanArray::new( - BooleanBuffer::new_unset(num_rows), - None, - )), - }; - - columns.push(array); - } + return Ok(RecordBatch::new_empty(Arc::new(schema.clone()))); + } - Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?) + // The remaining joins return right-side rows and nulls for the left side. + let num_rows = probe_batch.num_rows(); + if schema.fields().is_empty() { + return new_empty_schema_batch(schema, num_rows); } + + let columns = column_indices + .iter() + .map(|column_index| match column_index.side { + // left -> null array + JoinSide::Left => new_null_array( + build_batch.column(column_index.index).data_type(), + num_rows, + ), + // right -> respective right array + JoinSide::Right => Arc::clone(probe_batch.column(column_index.index)), + // right mark -> unset boolean array as there are no matches on the left side + JoinSide::None => { + Arc::new(BooleanArray::new(BooleanBuffer::new_unset(num_rows), None)) + } + }) + .collect(); + + Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?) } /// The input is the matched indices for left and right and From 48cdea9f1eef91dbbc20be214e8df584214ff659 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 20 Mar 2026 13:06:02 +0800 Subject: [PATCH 9/9] clippy fix --- datafusion/physical-plan/src/joins/hash_join/exec.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 74989fd7bd64..d75d40511f34 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -2280,9 +2280,12 @@ mod tests { let left_batch = build_table_i32(("a1", &vec![]), ("b1", &vec![]), ("c1", &vec![])); let left_schema = left_batch.schema(); - let left: Arc = - TestMemoryExec::try_new_exec(&[vec![left_batch]], left_schema.clone(), None) - .unwrap(); + let left: Arc = TestMemoryExec::try_new_exec( + &[vec![left_batch]], + Arc::clone(&left_schema), + None, + ) + .unwrap(); let err = exec_err!("bad data error"); let right_batch =