Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1088,6 +1088,12 @@ config_namespace! {
/// process to reorder the join keys
pub top_down_join_key_reordering: bool, default = true

/// When set to true, the physical plan optimizer may swap join inputs
/// based on statistics. When set to false, statistics-driven join
/// input reordering is disabled and the original join order in the
/// query is used.
pub join_reordering: bool, default = true

/// When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin.
/// HashJoin can work more efficiently than SortMergeJoin but consumes more memory
pub prefer_hash_join: bool, default = true
Expand Down
95 changes: 57 additions & 38 deletions datafusion/physical-optimizer/src/join_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,27 @@ impl JoinSelection {

// TODO: We need some performance test for Right Semi/Right Join swap to Left Semi/Left Join in case that the right side is smaller but not much smaller.
// TODO: In PrestoSQL, the optimizer flips join sides only if one side is much smaller than the other by more than SIZE_DIFFERENCE_THRESHOLD times, by default is 8 times.
/// Checks statistics for join swap.
/// Checks whether join inputs should be swapped using available statistics.
///
/// It follows these steps:
/// 1. Compare the in-memory sizes of both sides, and place the smaller side on
/// the left (build) side.
/// 2. If in-memory byte sizes are unavailable, fall back to row counts.
/// 3. Do not reorder the join if neither statistic is available, or if
/// `datafusion.optimizer.join_reordering` is disabled.
///
/// Configuration options read from `config`:
/// - `config.optimizer.join_reordering`: allows or forbids statistics-driven join swapping
pub(crate) fn should_swap_join_order(
left: &dyn ExecutionPlan,
right: &dyn ExecutionPlan,
config: &ConfigOptions,
) -> Result<bool> {
if !config.optimizer.join_reordering {
return Ok(false);
}

// Get the left and right table's total bytes
// If both the left and right tables contain total_byte_size statistics,
// use `total_byte_size` to determine `should_swap_join_order`, else use `num_rows`
Expand Down Expand Up @@ -133,17 +149,8 @@ impl PhysicalOptimizerRule for JoinSelection {
// do not modify join sides.
// - We will also swap left and right sides for cross joins so that the left
// side is the small side.
let config = &config.optimizer;
let collect_threshold_byte_size = config.hash_join_single_partition_threshold;
let collect_threshold_num_rows = config.hash_join_single_partition_threshold_rows;
new_plan
.transform_up(|plan| {
statistical_join_selection_subrule(
plan,
collect_threshold_byte_size,
collect_threshold_num_rows,
)
})
.transform_up(|plan| statistical_join_selection_subrule(plan, config))
.data()
}

Expand All @@ -162,34 +169,39 @@ impl PhysicalOptimizerRule for JoinSelection {
/// `CollectLeft` mode is applicable. Otherwise, it will try to swap the join sides.
/// When the `ignore_threshold` is false, this function will also check left
/// and right sizes in bytes or rows.
///
/// Configuration options read from `config`:
/// - `config.optimizer.hash_join_single_partition_threshold`: byte threshold for `CollectLeft`
/// - `config.optimizer.hash_join_single_partition_threshold_rows`: row threshold for `CollectLeft`
/// - `config.optimizer.join_reordering`: allows or forbids input swapping
pub(crate) fn try_collect_left(
hash_join: &HashJoinExec,
ignore_threshold: bool,
threshold_byte_size: usize,
threshold_num_rows: usize,
config: &ConfigOptions,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
let left = hash_join.left();
let right = hash_join.right();
let optimizer_config = &config.optimizer;

let left_can_collect = ignore_threshold
|| supports_collect_by_thresholds(
&**left,
threshold_byte_size,
threshold_num_rows,
optimizer_config.hash_join_single_partition_threshold,
optimizer_config.hash_join_single_partition_threshold_rows,
);
let right_can_collect = ignore_threshold
|| supports_collect_by_thresholds(
&**right,
threshold_byte_size,
threshold_num_rows,
optimizer_config.hash_join_single_partition_threshold,
optimizer_config.hash_join_single_partition_threshold_rows,
);

match (left_can_collect, right_can_collect) {
(true, true) => {
// Don't swap null-aware anti joins as they have specific side requirements
if hash_join.join_type().supports_swap()
&& !hash_join.null_aware
&& should_swap_join_order(&**left, &**right)?
&& should_swap_join_order(&**left, &**right, config)?
{
Ok(Some(hash_join.swap_inputs(PartitionMode::CollectLeft)?))
} else {
Expand All @@ -209,7 +221,10 @@ pub(crate) fn try_collect_left(
))),
(false, true) => {
// Don't swap null-aware anti joins as they have specific side requirements
if hash_join.join_type().supports_swap() && !hash_join.null_aware {
if optimizer_config.join_reordering
&& hash_join.join_type().supports_swap()
&& !hash_join.null_aware
{
hash_join.swap_inputs(PartitionMode::CollectLeft).map(Some)
} else {
Ok(None)
Expand All @@ -224,15 +239,19 @@ pub(crate) fn try_collect_left(
/// Checks if the join order should be swapped based on the join type and input statistics.
/// If swapping is optimal and supported, creates a swapped partitioned hash join; otherwise,
/// creates a standard partitioned hash join.
///
/// Configuration options read from `config`:
/// - `config.optimizer.join_reordering`: allows or forbids statistics-driven join swapping
pub(crate) fn partitioned_hash_join(
hash_join: &HashJoinExec,
config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
let left = hash_join.left();
let right = hash_join.right();
// Don't swap null-aware anti joins as they have specific side requirements
if hash_join.join_type().supports_swap()
&& !hash_join.null_aware
&& should_swap_join_order(&**left, &**right)?
&& should_swap_join_order(&**left, &**right, config)?
{
hash_join.swap_inputs(PartitionMode::Partitioned)
} else {
Expand All @@ -256,28 +275,28 @@ pub(crate) fn partitioned_hash_join(
}

/// This subrule tries to modify a given plan so that it can
/// optimize hash and cross joins in the plan according to available statistical information.
/// optimize hash and cross joins in the plan according to available statistical
/// information.
///
/// Configuration options read from `config`:
/// - `config.optimizer.hash_join_single_partition_threshold`: byte threshold for `CollectLeft`
/// - `config.optimizer.hash_join_single_partition_threshold_rows`: row threshold for `CollectLeft`
/// - `config.optimizer.join_reordering`: allows or forbids input swapping
fn statistical_join_selection_subrule(
plan: Arc<dyn ExecutionPlan>,
collect_threshold_byte_size: usize,
collect_threshold_num_rows: usize,
config: &ConfigOptions,
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
let transformed =
if let Some(hash_join) = plan.as_any().downcast_ref::<HashJoinExec>() {
match hash_join.partition_mode() {
PartitionMode::Auto => try_collect_left(
hash_join,
false,
collect_threshold_byte_size,
collect_threshold_num_rows,
)?
.map_or_else(
|| partitioned_hash_join(hash_join).map(Some),
|v| Ok(Some(v)),
)?,
PartitionMode::CollectLeft => try_collect_left(hash_join, true, 0, 0)?
PartitionMode::Auto => try_collect_left(hash_join, false, config)?
.map_or_else(
|| partitioned_hash_join(hash_join, config).map(Some),
|v| Ok(Some(v)),
)?,
PartitionMode::CollectLeft => try_collect_left(hash_join, true, config)?
.map_or_else(
|| partitioned_hash_join(hash_join).map(Some),
|| partitioned_hash_join(hash_join, config).map(Some),
|v| Ok(Some(v)),
)?,
PartitionMode::Partitioned => {
Expand All @@ -286,7 +305,7 @@ fn statistical_join_selection_subrule(
// Don't swap null-aware anti joins as they have specific side requirements
if hash_join.join_type().supports_swap()
&& !hash_join.null_aware
&& should_swap_join_order(&**left, &**right)?
&& should_swap_join_order(&**left, &**right, config)?
{
hash_join
.swap_inputs(PartitionMode::Partitioned)
Expand All @@ -299,7 +318,7 @@ fn statistical_join_selection_subrule(
} else if let Some(cross_join) = plan.as_any().downcast_ref::<CrossJoinExec>() {
let left = cross_join.left();
let right = cross_join.right();
if should_swap_join_order(&**left, &**right)? {
if should_swap_join_order(&**left, &**right, config)? {
cross_join.swap_inputs().map(Some)?
} else {
None
Expand All @@ -308,7 +327,7 @@ fn statistical_join_selection_subrule(
let left = nl_join.left();
let right = nl_join.right();
if nl_join.join_type().supports_swap()
&& should_swap_join_order(&**left, &**right)?
&& should_swap_join_order(&**left, &**right, config)?
{
nl_join.swap_inputs().map(Some)?
} else {
Expand Down
2 changes: 2 additions & 0 deletions datafusion/sqllogictest/test_files/information_schema.slt
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ datafusion.optimizer.hash_join_inlist_pushdown_max_distinct_values 150
datafusion.optimizer.hash_join_inlist_pushdown_max_size 131072
datafusion.optimizer.hash_join_single_partition_threshold 1048576
datafusion.optimizer.hash_join_single_partition_threshold_rows 131072
datafusion.optimizer.join_reordering true
datafusion.optimizer.max_passes 3
datafusion.optimizer.prefer_existing_sort false
datafusion.optimizer.prefer_existing_union false
Expand Down Expand Up @@ -452,6 +453,7 @@ datafusion.optimizer.hash_join_inlist_pushdown_max_distinct_values 150 Maximum n
datafusion.optimizer.hash_join_inlist_pushdown_max_size 131072 Maximum size in bytes for the build side of a hash join to be pushed down as an InList expression for dynamic filtering. Build sides larger than this will use hash table lookups instead. Set to 0 to always use hash table lookups. InList pushdown can be more efficient for small build sides because it can result in better statistics pruning as well as use any bloom filters present on the scan side. InList expressions are also more transparent and easier to serialize over the network in distributed uses of DataFusion. On the other hand InList pushdown requires making a copy of the data and thus adds some overhead to the build side and uses more memory. This setting is per-partition, so we may end up using `hash_join_inlist_pushdown_max_size` * `target_partitions` memory. The default is 128kB per partition. This should allow point lookup joins (e.g. joining on a unique primary key) to use InList pushdown in most cases but avoids excessive memory usage or overhead for larger joins.
datafusion.optimizer.hash_join_single_partition_threshold 1048576 The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition
datafusion.optimizer.hash_join_single_partition_threshold_rows 131072 The maximum estimated size in rows for one input side of a HashJoin will be collected into a single partition
datafusion.optimizer.join_reordering true When set to true, the physical plan optimizer may swap join inputs based on statistics. When set to false, statistics-driven join input reordering is disabled and the original join order in the query is used.
datafusion.optimizer.max_passes 3 Number of times that the optimizer will attempt to optimize the plan
datafusion.optimizer.prefer_existing_sort false When true, DataFusion will opportunistically remove sorts when the data is already sorted, (i.e. setting `preserve_order` to true on `RepartitionExec` and using `SortPreservingMergeExec`) When false, DataFusion will maximize plan parallelism using `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`.
datafusion.optimizer.prefer_existing_union false When set to true, the optimizer will not attempt to convert Union to Interleave
Expand Down
62 changes: 62 additions & 0 deletions datafusion/sqllogictest/test_files/joins.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1596,6 +1596,37 @@ physical_plan
05)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
06)--------DataSourceExec: partitions=1, partition_sizes=[1]

# don't reorder join based on stats, and use the order in the query

statement ok
set datafusion.optimizer.join_reordering = false;

statement ok
set datafusion.explain.physical_plan_only = true;

query TT
EXPLAIN
SELECT join_t1.t1_id, join_t2.t2_id, join_t1.t1_name
FROM join_t1
INNER JOIN join_t2
ON join_t1.t1_id + cast(11 as INT UNSIGNED) = join_t2.t2_id
----
physical_plan
01)ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]
02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(join_t1.t1_id + UInt32(11)@2, t2_id@0)], projection=[t1_id@0, t1_name@1, t2_id@3]
03)----CoalescePartitionsExec
04)------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 11 as join_t1.t1_id + UInt32(11)]
05)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
06)----------DataSourceExec: partitions=1, partition_sizes=[1]
07)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
08)------DataSourceExec: partitions=1, partition_sizes=[1]

statement ok
set datafusion.optimizer.join_reordering = true;

statement ok
set datafusion.explain.physical_plan_only = false;

statement ok
set datafusion.optimizer.repartition_joins = true;

Expand Down Expand Up @@ -2053,6 +2084,37 @@ ORDER BY 1
33 11
44 11

# don't reorder join based on stats, and use the order in the query

statement ok
set datafusion.optimizer.join_reordering = false;

statement ok
set datafusion.explain.physical_plan_only = true;

query TT
EXPLAIN
SELECT join_t1.t1_id, join_t2.t2_id
FROM join_t1
INNER JOIN join_t2 ON join_t1.t1_id > join_t2.t2_id
WHERE join_t1.t1_id > 10 AND join_t2.t2_int > 1
----
physical_plan
01)NestedLoopJoinExec: join_type=Inner, filter=t1_id@0 > t2_id@1
02)--CoalescePartitionsExec
03)----FilterExec: t1_id@0 > 10
04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
05)--------DataSourceExec: partitions=1, partition_sizes=[1]
06)--FilterExec: t2_int@1 > 1, projection=[t2_id@0]
07)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
08)------DataSourceExec: partitions=1, partition_sizes=[1]

statement ok
set datafusion.optimizer.join_reordering = true;

statement ok
set datafusion.explain.physical_plan_only = false;

# Left as inner table nested loop join

query TT
Expand Down
Loading
Loading