Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 147 additions & 42 deletions datafusion/physical-plan/src/joins/hash_join/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2275,6 +2275,96 @@ mod tests {
)
}

fn empty_build_with_probe_error_inputs()
-> (Arc<dyn ExecutionPlan>, Arc<dyn ExecutionPlan>, JoinOn) {
let left_batch =
build_table_i32(("a1", &vec![]), ("b1", &vec![]), ("c1", &vec![]));
let left_schema = left_batch.schema();
let left: Arc<dyn ExecutionPlan> = TestMemoryExec::try_new_exec(
&[vec![left_batch]],
Arc::clone(&left_schema),
None,
)
.unwrap();

let err = exec_err!("bad data error");
let right_batch =
build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![]));
let right_schema = right_batch.schema();
let on = vec![(
Arc::new(Column::new_with_schema("b1", &left_schema).unwrap()) as _,
Arc::new(Column::new_with_schema("b1", &right_schema).unwrap()) as _,
)];
let right: Arc<dyn ExecutionPlan> = Arc::new(
MockExec::new(vec![Ok(right_batch), err], right_schema).with_use_task(false),
);

(left, right, on)
}

/// Runs the empty-build-side join scenario for each of `join_types`.
///
/// When `expect_probe_error` is true, the join must poll the probe side and
/// therefore surface the injected "bad data error"; otherwise it must finish
/// without ever reaching the probe error and produce no output batches.
async fn assert_empty_build_probe_behavior(
    join_types: &[JoinType],
    expect_probe_error: bool,
) {
    let (left, right, on) = empty_build_with_probe_error_inputs();

    for join_type in join_types {
        let join = join(
            Arc::clone(&left),
            Arc::clone(&right),
            on.clone(),
            join_type,
            NullEquality::NullEqualsNothing,
        )
        .unwrap();

        let stream = join.execute(0, Arc::new(TaskContext::default())).unwrap();
        let result = common::collect(stream).await;

        if !expect_probe_error {
            // Short-circuit path: the probe error was never reached.
            let batches = result.unwrap();
            assert!(
                batches.is_empty(),
                "expected no output batches for {join_type}, got {batches:?}"
            );
        } else {
            // Probe-consuming path: the injected error must propagate.
            let msg = result.unwrap_err().to_string();
            assert!(msg.contains("bad data error"), "actual: {msg}");
        }
    }
}

/// Builds a `CollectLeft` [`HashJoinExec`] with a manually attached dynamic
/// filter, returning the join together with a handle to the filter so tests
/// can await its completion independently of the join itself.
fn hash_join_with_dynamic_filter(
    left: Arc<dyn ExecutionPlan>,
    right: Arc<dyn ExecutionPlan>,
    on: JoinOn,
    join_type: JoinType,
) -> Result<(HashJoinExec, Arc<DynamicFilterPhysicalExpr>)> {
    // The filter must be derived from `on` before `on` is moved into try_new.
    let filter = HashJoinExec::create_dynamic_filter(&on);

    let mut join = HashJoinExec::try_new(
        left,
        right,
        on,
        None,
        &join_type,
        None,
        PartitionMode::CollectLeft,
        NullEquality::NullEqualsNothing,
        false,
    )?;

    // build_accumulator starts unset; execution populates it later.
    join.dynamic_filter = Some(HashJoinExecDynamicFilter {
        filter: Arc::clone(&filter),
        build_accumulator: OnceLock::new(),
    });

    Ok((join, filter))
}

async fn join_collect(
left: Arc<dyn ExecutionPlan>,
right: Arc<dyn ExecutionPlan>,
Expand Down Expand Up @@ -4983,6 +5073,36 @@ mod tests {
}
}

/// Join types for which an empty build side provably yields an empty result:
/// the probe side must never be polled, so the injected probe error is never hit.
#[tokio::test]
async fn join_does_not_consume_probe_when_empty_build_fixes_output() {
    let short_circuit_types = [
        JoinType::Inner,
        JoinType::Left,
        JoinType::LeftSemi,
        JoinType::LeftAnti,
        JoinType::LeftMark,
        JoinType::RightSemi,
    ];
    assert_empty_build_probe_behavior(&short_circuit_types, false).await;
}

/// Join types that emit probe-side rows even when the build side is empty:
/// the probe side must be consumed, so the injected error must surface.
#[tokio::test]
async fn join_still_consumes_probe_when_empty_build_needs_probe_rows() {
    let probe_dependent_types = [
        JoinType::Right,
        JoinType::Full,
        JoinType::RightAnti,
        JoinType::RightMark,
    ];
    assert_empty_build_probe_behavior(&probe_dependent_types, true).await;
}

#[tokio::test]
async fn join_split_batch() {
let left = build_table(
Expand Down Expand Up @@ -5426,34 +5546,16 @@ mod tests {
Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
)];

// Create a dynamic filter manually
let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
let dynamic_filter_clone = Arc::clone(&dynamic_filter);

// Create HashJoinExec with the dynamic filter
let mut join = HashJoinExec::try_new(
left,
right,
on,
None,
&JoinType::Inner,
None,
PartitionMode::CollectLeft,
NullEquality::NullEqualsNothing,
false,
)?;
join.dynamic_filter = Some(HashJoinExecDynamicFilter {
filter: dynamic_filter,
build_accumulator: OnceLock::new(),
});
let (join, dynamic_filter) =
hash_join_with_dynamic_filter(left, right, on, JoinType::Inner)?;

// Execute the join
let stream = join.execute(0, task_ctx)?;
let _batches = common::collect(stream).await?;

// After the join completes, the dynamic filter should be marked as complete
// wait_complete() should return immediately
dynamic_filter_clone.wait_complete().await;
dynamic_filter.wait_complete().await;

Ok(())
}
Expand All @@ -5475,34 +5577,37 @@ mod tests {
Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
)];

// Create a dynamic filter manually
let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
let dynamic_filter_clone = Arc::clone(&dynamic_filter);

// Create HashJoinExec with the dynamic filter
let mut join = HashJoinExec::try_new(
left,
right,
on,
None,
&JoinType::Inner,
None,
PartitionMode::CollectLeft,
NullEquality::NullEqualsNothing,
false,
)?;
join.dynamic_filter = Some(HashJoinExecDynamicFilter {
filter: dynamic_filter,
build_accumulator: OnceLock::new(),
});
let (join, dynamic_filter) =
hash_join_with_dynamic_filter(left, right, on, JoinType::Inner)?;

// Execute the join
let stream = join.execute(0, task_ctx)?;
let _batches = common::collect(stream).await?;

// Even with empty build side, the dynamic filter should be marked as complete
// wait_complete() should return immediately
dynamic_filter_clone.wait_complete().await;
dynamic_filter.wait_complete().await;

Ok(())
}

#[tokio::test]
async fn test_hash_join_skips_probe_on_empty_build_after_partition_bounds_report()
-> Result<()> {
    let (left, right, on) = empty_build_with_probe_error_inputs();

    // Keeping an extra handle to the dynamic filter enables filter pushdown,
    // so execute() goes through the WaitPartitionBoundsReport path before it
    // decides whether the probe side needs to be polled at all.
    let (join, dynamic_filter) =
        hash_join_with_dynamic_filter(left, right, on, JoinType::Inner)?;

    let task_ctx = Arc::new(TaskContext::default());
    let output = common::collect(join.execute(0, task_ctx)?).await?;
    assert!(output.is_empty(), "empty build side must yield no rows");

    // The filter must still be finalized even though the probe was skipped.
    dynamic_filter.wait_complete().await;

    Ok(())
}
Expand Down
32 changes: 29 additions & 3 deletions datafusion/physical-plan/src/joins/hash_join/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use crate::{
BuildProbeJoinMetrics, ColumnIndex, JoinFilter, JoinHashMapType,
StatefulStreamResult, adjust_indices_by_join_type, apply_join_filter_to_indices,
build_batch_empty_build_side, build_batch_from_indices,
need_produce_result_in_final,
empty_build_side_produces_empty_result, need_produce_result_in_final,
},
};

Expand Down Expand Up @@ -406,6 +406,23 @@ impl HashJoinStream {
}
}

/// Decides which state the stream enters once the build side is fully
/// collected and any required build-side coordination has finished.
///
/// With no join filter and an empty hash table, join types whose output is
/// provably empty can complete without ever polling the probe side.
fn state_after_build_ready(
    has_filter: bool,
    join_type: JoinType,
    left_data: &JoinLeftData,
) -> HashJoinStreamState {
    let can_short_circuit = !has_filter
        && left_data.map().is_empty()
        && empty_build_side_produces_empty_result(join_type);

    if can_short_circuit {
        HashJoinStreamState::Completed
    } else {
        HashJoinStreamState::FetchProbeBatch
    }
}

/// Separate implementation function that unpins the [`HashJoinStream`] so
/// that partial borrows work correctly
fn poll_next_impl(
Expand Down Expand Up @@ -469,7 +486,12 @@ impl HashJoinStream {
if let Some(ref mut fut) = self.build_waiter {
ready!(fut.get_shared(cx))?;
}
self.state = HashJoinStreamState::FetchProbeBatch;
let build_side = self.build_side.try_as_ready()?;
self.state = Self::state_after_build_ready(
self.filter.is_some(),
self.join_type,
build_side.left_data.as_ref(),
);
Poll::Ready(Ok(StatefulStreamResult::Continue))
}

Expand Down Expand Up @@ -540,7 +562,11 @@ impl HashJoinStream {
}));
self.state = HashJoinStreamState::WaitPartitionBoundsReport;
} else {
self.state = HashJoinStreamState::FetchProbeBatch;
self.state = Self::state_after_build_ready(
self.filter.is_some(),
self.join_type,
left_data.as_ref(),
);
}

self.build_side = BuildSide::Ready(BuildSideReadyState { left_data });
Expand Down
78 changes: 41 additions & 37 deletions datafusion/physical-plan/src/joins/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -855,6 +855,22 @@ pub(crate) fn need_produce_result_in_final(join_type: JoinType) -> bool {
)
}

/// Returns true when an empty build side necessarily produces an empty result.
///
/// Shared source of truth for the stream state machine's short-circuit and
/// for `build_batch_empty_build_side`, keeping the two in agreement. The
/// match is deliberately exhaustive so adding a `JoinType` variant forces an
/// explicit decision here.
pub(crate) fn empty_build_side_produces_empty_result(join_type: JoinType) -> bool {
    match join_type {
        JoinType::Inner
        | JoinType::Left
        | JoinType::LeftSemi
        | JoinType::LeftAnti
        | JoinType::LeftMark
        | JoinType::RightSemi => true,
        // Right-preserving joins still emit probe rows padded with nulls.
        JoinType::Right
        | JoinType::Full
        | JoinType::RightAnti
        | JoinType::RightMark => false,
    }
}

pub(crate) fn get_final_indices_from_shared_bitmap(
shared_bitmap: &SharedBitmapBuilder,
join_type: JoinType,
Expand Down Expand Up @@ -1060,47 +1076,35 @@ pub(crate) fn build_batch_empty_build_side(
column_indices: &[ColumnIndex],
join_type: JoinType,
) -> Result<RecordBatch> {
match join_type {
// these join types only return data if the left side is not empty, so we return an
// empty RecordBatch
JoinType::Inner
| JoinType::Left
| JoinType::LeftSemi
| JoinType::RightSemi
| JoinType::LeftAnti
| JoinType::LeftMark => Ok(RecordBatch::new_empty(Arc::new(schema.clone()))),
if empty_build_side_produces_empty_result(join_type) {
// These join types only return data if the left side is not empty.
return Ok(RecordBatch::new_empty(Arc::new(schema.clone())));
}

// the remaining joins will return data for the right columns and null for the left ones
JoinType::Right | JoinType::Full | JoinType::RightAnti | JoinType::RightMark => {
let num_rows = probe_batch.num_rows();
if schema.fields().is_empty() {
return new_empty_schema_batch(schema, num_rows);
}
let mut columns: Vec<Arc<dyn Array>> =
Vec::with_capacity(schema.fields().len());

for column_index in column_indices {
let array = match column_index.side {
// left -> null array
JoinSide::Left => new_null_array(
build_batch.column(column_index.index).data_type(),
num_rows,
),
// right -> respective right array
JoinSide::Right => Arc::clone(probe_batch.column(column_index.index)),
// right mark -> unset boolean array as there are no matches on the left side
JoinSide::None => Arc::new(BooleanArray::new(
BooleanBuffer::new_unset(num_rows),
None,
)),
};
// The remaining joins return right-side rows and nulls for the left side.
let num_rows = probe_batch.num_rows();
if schema.fields().is_empty() {
return new_empty_schema_batch(schema, num_rows);
}

columns.push(array);
let columns = column_indices
.iter()
.map(|column_index| match column_index.side {
// left -> null array
JoinSide::Left => new_null_array(
build_batch.column(column_index.index).data_type(),
num_rows,
),
// right -> respective right array
JoinSide::Right => Arc::clone(probe_batch.column(column_index.index)),
// right mark -> unset boolean array as there are no matches on the left side
JoinSide::None => {
Arc::new(BooleanArray::new(BooleanBuffer::new_unset(num_rows), None))
}
})
.collect();

Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?)
}
}
Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?)
}

/// The input is the matched indices for left and right and
Expand Down
Loading