From 3ecdea7dc311c35658938ecce04450647084f319 Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Fri, 20 Mar 2026 14:45:37 +0800 Subject: [PATCH 1/2] add configuration to disable join reordering --- datafusion/common/src/config.rs | 6 ++ .../physical-optimizer/src/join_selection.rs | 95 +++++++++++-------- .../test_files/information_schema.slt | 2 + datafusion/sqllogictest/test_files/joins.slt | 62 ++++++++++++ 4 files changed, 127 insertions(+), 38 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 9b6e6aa5dac37..e6d5fbfc50a21 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -1088,6 +1088,12 @@ config_namespace! { /// process to reorder the join keys pub top_down_join_key_reordering: bool, default = true + /// When set to true, the physical plan optimizer may swap join inputs + /// based on statistics. When set to false, statistics-driven join + /// input reordering is disabled and the original join order in the + /// query is used. + pub join_reordering: bool, default = true + /// When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. /// HashJoin can work more efficiently than SortMergeJoin but consumes more memory pub prefer_hash_join: bool, default = true diff --git a/datafusion/physical-optimizer/src/join_selection.rs b/datafusion/physical-optimizer/src/join_selection.rs index 88115b12f7820..2dcc362aa761d 100644 --- a/datafusion/physical-optimizer/src/join_selection.rs +++ b/datafusion/physical-optimizer/src/join_selection.rs @@ -55,11 +55,27 @@ impl JoinSelection { // TODO: We need some performance test for Right Semi/Right Join swap to Left Semi/Left Join in case that the right side is smaller but not much smaller. // TODO: In PrestoSQL, the optimizer flips join sides only if one side is much smaller than the other by more than SIZE_DIFFERENCE_THRESHOLD times, by default is 8 times. -/// Checks statistics for join swap. 
+/// Checks whether join inputs should be swapped using available statistics. +/// +/// It follows these steps: +/// 1. Compare the in-memory sizes of both sides, and place the smaller side on +/// the left (build) side. +/// 2. If in-memory byte sizes are unavailable, fall back to row counts. +/// 3. Do not reorder the join if neither statistic is available, or if +/// `datafusion.optimizer.join_reordering` is disabled. +/// +/// +/// Configuration options read from `config`: +/// - `config.optimizer.join_reordering`: allows or forbids statistics-driven join swapping pub(crate) fn should_swap_join_order( left: &dyn ExecutionPlan, right: &dyn ExecutionPlan, + config: &ConfigOptions, ) -> Result<bool> { + if !config.optimizer.join_reordering { + return Ok(false); + } + // Get the left and right table's total bytes // If both the left and right tables contain total_byte_size statistics, // use `total_byte_size` to determine `should_swap_join_order`, else use `num_rows` @@ -133,17 +149,8 @@ impl PhysicalOptimizerRule for JoinSelection { // do not modify join sides. // - We will also swap left and right sides for cross joins so that the left // side is the small side. - let config = &config.optimizer; - let collect_threshold_byte_size = config.hash_join_single_partition_threshold; - let collect_threshold_num_rows = config.hash_join_single_partition_threshold_rows; new_plan - .transform_up(|plan| { - statistical_join_selection_subrule( - plan, - collect_threshold_byte_size, - collect_threshold_num_rows, - ) - }) + .transform_up(|plan| statistical_join_selection_subrule(plan, config)) .data() } @@ -162,26 +169,31 @@ impl PhysicalOptimizerRule for JoinSelection { /// `CollectLeft` mode is applicable. Otherwise, it will try to swap the join sides. /// When the `ignore_threshold` is false, this function will also check left /// and right sizes in bytes or rows. 
+/// +/// Configuration options read from `config`: +/// - `config.optimizer.hash_join_single_partition_threshold`: byte threshold for `CollectLeft` +/// - `config.optimizer.hash_join_single_partition_threshold_rows`: row threshold for `CollectLeft` +/// - `config.optimizer.join_reordering`: allows or forbids input swapping pub(crate) fn try_collect_left( hash_join: &HashJoinExec, ignore_threshold: bool, - threshold_byte_size: usize, - threshold_num_rows: usize, + config: &ConfigOptions, ) -> Result<Option<Arc<dyn ExecutionPlan>>> { let left = hash_join.left(); let right = hash_join.right(); + let optimizer_config = &config.optimizer; let left_can_collect = ignore_threshold || supports_collect_by_thresholds( &**left, - threshold_byte_size, - threshold_num_rows, + optimizer_config.hash_join_single_partition_threshold, + optimizer_config.hash_join_single_partition_threshold_rows, ); let right_can_collect = ignore_threshold || supports_collect_by_thresholds( &**right, - threshold_byte_size, - threshold_num_rows, + optimizer_config.hash_join_single_partition_threshold, + optimizer_config.hash_join_single_partition_threshold_rows, ); match (left_can_collect, right_can_collect) { @@ -189,7 +201,7 @@ pub(crate) fn try_collect_left( // Don't swap null-aware anti joins as they have specific side requirements if hash_join.join_type().supports_swap() && !hash_join.null_aware - && should_swap_join_order(&**left, &**right)? + && should_swap_join_order(&**left, &**right, config)? 
{ Ok(Some(hash_join.swap_inputs(PartitionMode::CollectLeft)?)) } else { @@ -209,7 +221,10 @@ pub(crate) fn try_collect_left( ))), (false, true) => { // Don't swap null-aware anti joins as they have specific side requirements - if hash_join.join_type().supports_swap() && !hash_join.null_aware { + if optimizer_config.join_reordering + && hash_join.join_type().supports_swap() + && !hash_join.null_aware + { hash_join.swap_inputs(PartitionMode::CollectLeft).map(Some) } else { Ok(None) @@ -224,15 +239,19 @@ pub(crate) fn try_collect_left( /// Checks if the join order should be swapped based on the join type and input statistics. /// If swapping is optimal and supported, creates a swapped partitioned hash join; otherwise, /// creates a standard partitioned hash join. +/// +/// Configuration options read from `config`: +/// - `config.optimizer.join_reordering`: allows or forbids statistics-driven join swapping pub(crate) fn partitioned_hash_join( hash_join: &HashJoinExec, + config: &ConfigOptions, ) -> Result<Arc<dyn ExecutionPlan>> { let left = hash_join.left(); let right = hash_join.right(); // Don't swap null-aware anti joins as they have specific side requirements if hash_join.join_type().supports_swap() && !hash_join.null_aware - && should_swap_join_order(&**left, &**right)? + && should_swap_join_order(&**left, &**right, config)? { hash_join.swap_inputs(PartitionMode::Partitioned) } else { @@ -256,28 +275,28 @@ pub(crate) fn partitioned_hash_join( } /// This subrule tries to modify a given plan so that it can -/// optimize hash and cross joins in the plan according to available statistical information. +/// optimize hash and cross joins in the plan according to available statistical +/// information. 
+/// +/// Configuration options read from `config`: +/// - `config.optimizer.hash_join_single_partition_threshold`: byte threshold for `CollectLeft` +/// - `config.optimizer.hash_join_single_partition_threshold_rows`: row threshold for `CollectLeft` +/// - `config.optimizer.join_reordering`: allows or forbids input swapping fn statistical_join_selection_subrule( plan: Arc<dyn ExecutionPlan>, - collect_threshold_byte_size: usize, - collect_threshold_num_rows: usize, + config: &ConfigOptions, ) -> Result<Transformed<Arc<dyn ExecutionPlan>>> { let transformed = if let Some(hash_join) = plan.as_any().downcast_ref::<HashJoinExec>() { match hash_join.partition_mode() { - PartitionMode::Auto => try_collect_left( - hash_join, - false, - collect_threshold_byte_size, - collect_threshold_num_rows, - )? - .map_or_else( - || partitioned_hash_join(hash_join).map(Some), - |v| Ok(Some(v)), - )?, - PartitionMode::CollectLeft => try_collect_left(hash_join, true, 0, 0)? + PartitionMode::Auto => try_collect_left(hash_join, false, config)? + .map_or_else( + || partitioned_hash_join(hash_join, config).map(Some), + |v| Ok(Some(v)), + )?, + PartitionMode::CollectLeft => try_collect_left(hash_join, true, config)? .map_or_else( - || partitioned_hash_join(hash_join).map(Some), + || partitioned_hash_join(hash_join, config).map(Some), |v| Ok(Some(v)), )?, PartitionMode::Partitioned => { @@ -286,7 +305,7 @@ fn statistical_join_selection_subrule( // Don't swap null-aware anti joins as they have specific side requirements if hash_join.join_type().supports_swap() && !hash_join.null_aware - && should_swap_join_order(&**left, &**right)? + && should_swap_join_order(&**left, &**right, config)? { hash_join .swap_inputs(PartitionMode::Partitioned) @@ -299,7 +318,7 @@ fn statistical_join_selection_subrule( } else if let Some(cross_join) = plan.as_any().downcast_ref::<CrossJoinExec>() { let left = cross_join.left(); let right = cross_join.right(); - if should_swap_join_order(&**left, &**right)? { + if should_swap_join_order(&**left, &**right, config)? { cross_join.swap_inputs().map(Some)? 
} else { None @@ -308,7 +327,7 @@ fn statistical_join_selection_subrule( let left = nl_join.left(); let right = nl_join.right(); if nl_join.join_type().supports_swap() - && should_swap_join_order(&**left, &**right)? + && should_swap_join_order(&**left, &**right, config)? { nl_join.swap_inputs().map(Some)? } else { diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index aeeb3481c76b9..f843b2e63ccba 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -312,6 +312,7 @@ datafusion.optimizer.hash_join_inlist_pushdown_max_distinct_values 150 datafusion.optimizer.hash_join_inlist_pushdown_max_size 131072 datafusion.optimizer.hash_join_single_partition_threshold 1048576 datafusion.optimizer.hash_join_single_partition_threshold_rows 131072 +datafusion.optimizer.join_reordering true datafusion.optimizer.max_passes 3 datafusion.optimizer.prefer_existing_sort false datafusion.optimizer.prefer_existing_union false @@ -452,6 +453,7 @@ datafusion.optimizer.hash_join_inlist_pushdown_max_distinct_values 150 Maximum n datafusion.optimizer.hash_join_inlist_pushdown_max_size 131072 Maximum size in bytes for the build side of a hash join to be pushed down as an InList expression for dynamic filtering. Build sides larger than this will use hash table lookups instead. Set to 0 to always use hash table lookups. InList pushdown can be more efficient for small build sides because it can result in better statistics pruning as well as use any bloom filters present on the scan side. InList expressions are also more transparent and easier to serialize over the network in distributed uses of DataFusion. On the other hand InList pushdown requires making a copy of the data and thus adds some overhead to the build side and uses more memory. 
This setting is per-partition, so we may end up using `hash_join_inlist_pushdown_max_size` * `target_partitions` memory. The default is 128kB per partition. This should allow point lookup joins (e.g. joining on a unique primary key) to use InList pushdown in most cases but avoids excessive memory usage or overhead for larger joins. datafusion.optimizer.hash_join_single_partition_threshold 1048576 The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition datafusion.optimizer.hash_join_single_partition_threshold_rows 131072 The maximum estimated size in rows for one input side of a HashJoin will be collected into a single partition +datafusion.optimizer.join_reordering true When set to true, the physical plan optimizer may swap join inputs based on statistics. When set to false, statistics-driven join input reordering is disabled and the original join order in the query is used. datafusion.optimizer.max_passes 3 Number of times that the optimizer will attempt to optimize the plan datafusion.optimizer.prefer_existing_sort false When true, DataFusion will opportunistically remove sorts when the data is already sorted, (i.e. setting `preserve_order` to true on `RepartitionExec` and using `SortPreservingMergeExec`) When false, DataFusion will maximize plan parallelism using `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`. 
datafusion.optimizer.prefer_existing_union false When set to true, the optimizer will not attempt to convert Union to Interleave diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 136a68573562a..e30343ddd985f 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -1596,6 +1596,37 @@ physical_plan 05)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 06)--------DataSourceExec: partitions=1, partition_sizes=[1] +# don't reorder join based on stats, and use the order in the query + +statement ok +set datafusion.optimizer.join_reordering = false; + +statement ok +set datafusion.explain.physical_plan_only = true; + +query TT +EXPLAIN +SELECT join_t1.t1_id, join_t2.t2_id, join_t1.t1_name +FROM join_t1 +INNER JOIN join_t2 +ON join_t1.t1_id + cast(11 as INT UNSIGNED) = join_t2.t2_id +---- +physical_plan +01)ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name] +02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(join_t1.t1_id + UInt32(11)@2, t2_id@0)], projection=[t1_id@0, t1_name@1, t2_id@3] +03)----CoalescePartitionsExec +04)------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 11 as join_t1.t1_id + UInt32(11)] +05)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +06)----------DataSourceExec: partitions=1, partition_sizes=[1] +07)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +08)------DataSourceExec: partitions=1, partition_sizes=[1] + +statement ok +set datafusion.optimizer.join_reordering = true; + +statement ok +set datafusion.explain.physical_plan_only = false; + statement ok set datafusion.optimizer.repartition_joins = true; @@ -2053,6 +2084,37 @@ ORDER BY 1 33 11 44 11 +# don't reorder join based on stats, and use the order in the query + +statement ok +set datafusion.optimizer.join_reordering = false; + +statement 
ok +set datafusion.explain.physical_plan_only = true; + +query TT +EXPLAIN +SELECT join_t1.t1_id, join_t2.t2_id +FROM join_t1 +INNER JOIN join_t2 ON join_t1.t1_id > join_t2.t2_id +WHERE join_t1.t1_id > 10 AND join_t2.t2_int > 1 +---- +physical_plan +01)NestedLoopJoinExec: join_type=Inner, filter=t1_id@0 > t2_id@1 +02)--CoalescePartitionsExec +03)----FilterExec: t1_id@0 > 10 +04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +05)--------DataSourceExec: partitions=1, partition_sizes=[1] +06)--FilterExec: t2_int@1 > 1, projection=[t2_id@0] +07)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +08)------DataSourceExec: partitions=1, partition_sizes=[1] + +statement ok +set datafusion.optimizer.join_reordering = true; + +statement ok +set datafusion.explain.physical_plan_only = false; + # Left as inner table nested loop join query TT From 54c20cb2064855a3f8cb4cbb5329bd49c683bf5c Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Fri, 20 Mar 2026 15:42:12 +0800 Subject: [PATCH 2/2] update config docs --- docs/source/user-guide/configs.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 6f6d5b205877f..56ab4d1539f92 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -157,6 +157,7 @@ The following configuration settings are available: | datafusion.optimizer.skip_failed_rules | false | When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. 
When set to false, any rules that produce errors will cause the query to fail | | datafusion.optimizer.max_passes | 3 | Number of times that the optimizer will attempt to optimize the plan | | datafusion.optimizer.top_down_join_key_reordering | true | When set to true, the physical plan optimizer will run a top down process to reorder the join keys | +| datafusion.optimizer.join_reordering | true | When set to true, the physical plan optimizer may swap join inputs based on statistics. When set to false, statistics-driven join input reordering is disabled and the original join order in the query is used. | | datafusion.optimizer.prefer_hash_join | true | When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory | | datafusion.optimizer.enable_piecewise_merge_join | false | When set to true, piecewise merge join is enabled. PiecewiseMergeJoin is currently experimental. Physical planner will opt for PiecewiseMergeJoin when there is only one range filter. | | datafusion.optimizer.hash_join_single_partition_threshold | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition |