From 3b11b5de017fbf658cd86a25d601f0b2dff5970c Mon Sep 17 00:00:00 2001
From: xiedeyantu
Date: Fri, 20 Mar 2026 21:43:45 +0800
Subject: [PATCH] Add configurable UNION DISTINCT to FILTER rewrite optimization

---
 datafusion/common/src/config.rs               |   7 +
 datafusion/optimizer/src/lib.rs               |   1 +
 datafusion/optimizer/src/optimizer.rs         |   2 +
 datafusion/optimizer/src/unions_to_filter.rs  | 540 ++++++++++++++++++
 .../sqllogictest/test_files/explain.slt       |   4 +
 .../test_files/information_schema.slt         |   2 +
 datafusion/sqllogictest/test_files/union.slt  |  43 ++
 docs/source/user-guide/configs.md             |   1 +
 8 files changed, 600 insertions(+)
 create mode 100644 datafusion/optimizer/src/unions_to_filter.rs

diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 9b6e6aa5dac37..0dc9a13550e1e 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -1168,6 +1168,13 @@ config_namespace! {
         /// closer to the leaf table scans, and push those projections down
         /// towards the leaf nodes.
         pub enable_leaf_expression_pushdown: bool, default = true
+
+        /// When set to true, the logical optimizer will rewrite `UNION DISTINCT` branches that
+        /// read from the same source and differ only by filter predicates into a single branch
+        /// with a combined filter. This optimization is conservative and only applies when the
+        /// branches share the same source and compatible wrapper nodes such as identical
+        /// projections or aliases.
+        pub enable_unions_to_filter: bool, default = false
     }
 }
 
diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/lib.rs
index e610091824092..47adc99ff21b4 100644
--- a/datafusion/optimizer/src/lib.rs
+++ b/datafusion/optimizer/src/lib.rs
@@ -70,6 +70,7 @@ pub mod rewrite_set_comparison;
 pub mod scalar_subquery_to_join;
 pub mod simplify_expressions;
 pub mod single_distinct_to_groupby;
+pub mod unions_to_filter;
 pub mod utils;
 
 #[cfg(test)]
diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs
index bdea6a83072cd..22775a4136ded 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -56,6 +56,7 @@ use crate::rewrite_set_comparison::RewriteSetComparison;
 use crate::scalar_subquery_to_join::ScalarSubqueryToJoin;
 use crate::simplify_expressions::SimplifyExpressions;
 use crate::single_distinct_to_groupby::SingleDistinctToGroupBy;
+use crate::unions_to_filter::UnionsToFilter;
 use crate::utils::log_plan;
 
 /// Transforms one [`LogicalPlan`] into another which computes the same results,
@@ -280,6 +281,7 @@ impl Optimizer {
         let rules: Vec<Arc<dyn OptimizerRule + Sync + Send>> = vec![
             Arc::new(RewriteSetComparison::new()),
             Arc::new(OptimizeUnions::new()),
+            Arc::new(UnionsToFilter::new()),
             Arc::new(SimplifyExpressions::new()),
             Arc::new(ReplaceDistinctWithAggregate::new()),
             Arc::new(EliminateJoin::new()),
diff --git a/datafusion/optimizer/src/unions_to_filter.rs b/datafusion/optimizer/src/unions_to_filter.rs
new file mode 100644
index 0000000000000..a22e514e40948
--- /dev/null
+++ b/datafusion/optimizer/src/unions_to_filter.rs
@@ -0,0 +1,540 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Rewrites `UNION DISTINCT` branches that differ only by filter predicates
+//! into a single filtered branch plus `DISTINCT`.
+
+use crate::optimizer::ApplyOrder;
+use crate::{OptimizerConfig, OptimizerRule};
+use datafusion_common::Result;
+use datafusion_common::tree_node::{Transformed, TreeNode};
+use datafusion_expr::expr_rewriter::coerce_plan_expr_for_schema;
+use datafusion_expr::logical_plan::builder::LogicalPlanBuilder;
+use datafusion_expr::utils::disjunction;
+use datafusion_expr::{
+    Distinct, Expr, Filter, LogicalPlan, Projection, SubqueryAlias, Union,
+};
+use std::collections::HashMap;
+use std::collections::hash_map::Entry;
+use std::sync::Arc;
+
+/// Optimizer rule that merges `UNION DISTINCT` inputs that read from the same
+/// source and differ only by their filter predicate into a single input whose
+/// filter is the disjunction (`OR`) of the original predicates.
+///
+/// The rule does nothing unless `optimizer.enable_unions_to_filter` is enabled.
+#[derive(Default, Debug)]
+pub struct UnionsToFilter;
+
+impl UnionsToFilter {
+    /// Creates a new [`UnionsToFilter`] rule.
+    pub fn new() -> Self {
+        Self
+    }
+}
+
+impl OptimizerRule for UnionsToFilter {
+    fn name(&self) -> &str {
+        "unions_to_filter"
+    }
+
+    fn apply_order(&self) -> Option<ApplyOrder> {
+        Some(ApplyOrder::BottomUp)
+    }
+
+    fn supports_rewrite(&self) -> bool {
+        true
+    }
+
+    fn rewrite(
+        &self,
+        plan: LogicalPlan,
+        config: &dyn OptimizerConfig,
+    ) -> Result<Transformed<LogicalPlan>> {
+        if !config.options().optimizer.enable_unions_to_filter {
+            return Ok(Transformed::no(plan));
+        }
+
+        match plan {
+            LogicalPlan::Distinct(Distinct::All(input)) => {
+                // Inspect the input by reference so that the untouched plan can
+                // be handed back without cloning when no rewrite applies.
+                let rewritten = try_rewrite_distinct_union(input.as_ref())?;
+                match rewritten {
+                    Some(rewritten) => Ok(Transformed::yes(rewritten)),
+                    None => Ok(Transformed::no(LogicalPlan::Distinct(
+                        Distinct::All(input),
+                    ))),
+                }
+            }
+            _ => Ok(Transformed::no(plan)),
+        }
+    }
+}
+
+/// Tries to merge the inputs of a `Union` found directly below `Distinct::All`.
+///
+/// Returns `Ok(None)` when `plan` is not a `Union`, when any input cannot be
+/// decomposed by [`extract_branch`], or when no two inputs share the same
+/// source and wrappers. Otherwise returns the rewritten `Distinct` plan.
+fn try_rewrite_distinct_union(plan: &LogicalPlan) -> Result<Option<LogicalPlan>> {
+    let LogicalPlan::Union(Union { inputs, schema }) = plan else {
+        return Ok(None);
+    };
+
+    if inputs.len() < 2 {
+        return Ok(None);
+    }
+
+    // Predicates are grouped by (source, wrappers). `input_order` records the
+    // order in which groups were first seen so the output order is stable.
+    let mut grouped: HashMap<GroupKey, Vec<Expr>> = HashMap::new();
+    let mut input_order: Vec<GroupKey> = Vec::new();
+    let mut transformed = false;
+
+    for input in inputs {
+        let Some(branch) = extract_branch(input.as_ref().clone()) else {
+            return Ok(None);
+        };
+
+        let key = GroupKey {
+            source: branch.source,
+            wrappers: branch.wrappers,
+        };
+        match grouped.entry(key) {
+            Entry::Occupied(mut entry) => {
+                entry.get_mut().push(branch.predicate);
+                transformed = true;
+            }
+            Entry::Vacant(entry) => {
+                input_order.push(entry.key().clone());
+                entry.insert(vec![branch.predicate]);
+            }
+        }
+    }
+
+    if !transformed {
+        return Ok(None);
+    }
+
+    let mut builder: Option<LogicalPlanBuilder> = None;
+    for key in input_order {
+        let predicates = grouped
+            .remove(&key)
+            .expect("grouped predicates should exist for every source");
+        let combined =
+            disjunction(predicates).expect("union branches always provide predicates");
+        let branch = LogicalPlanBuilder::from(key.source)
+            .filter(combined)?
+            .build()?;
+        let branch = wrap_branch(branch, &key.wrappers)?;
+        let branch = coerce_plan_expr_for_schema(branch, schema)?;
+        let branch = align_plan_to_schema(branch, Arc::clone(schema))?;
+        builder = Some(match builder {
+            None => LogicalPlanBuilder::from(branch),
+            Some(builder) => builder.union(branch)?,
+        });
+    }
+
+    let union = builder
+        .expect("at least one branch after rewrite")
+        .build()?;
+    Ok(Some(LogicalPlan::Distinct(Distinct::All(Arc::new(union)))))
+}
+
+/// One input of the union, decomposed into the parts the rewrite needs.
+struct Branch {
+    /// Plan below the branch's filter (or the whole plan when there is none).
+    source: LogicalPlan,
+    /// Filter predicate of the branch; literal `true` when there is no filter.
+    predicate: Expr,
+    /// Projection / alias nodes found above the filter, outermost first.
+    wrappers: Vec<Wrapper>,
+}
+
+/// Splits a union input into wrappers, an optional filter, and a source plan.
+///
+/// Returns `None` when merging the branch with others would not be safe: a
+/// wrapper projection is volatile, or the filter predicate is volatile or
+/// contains a subquery.
+fn extract_branch(plan: LogicalPlan) -> Option<Branch> {
+    let (wrappers, plan) = peel_wrappers(plan);
+
+    // Merging branches changes how many times a projection is evaluated for
+    // rows matched by more than one predicate, which is observable for
+    // volatile expressions.
+    let has_volatile_projection = wrappers.iter().any(|wrapper| match wrapper {
+        Wrapper::Projection { expr, .. } => expr.iter().any(Expr::is_volatile),
+        Wrapper::SubqueryAlias { .. } => false,
+    });
+    if has_volatile_projection {
+        return None;
+    }
+
+    match plan {
+        LogicalPlan::Filter(Filter {
+            predicate, input, ..
+        }) => {
+            if !is_mergeable_predicate(&predicate) {
+                return None;
+            }
+            // Everything below the filter is the source. Projections or aliases
+            // found there must be kept: the predicate is resolved against their
+            // output, and they take part in deciding whether two branches read
+            // the same data.
+            Some(Branch {
+                source: Arc::unwrap_or_clone(input),
+                predicate,
+                wrappers,
+            })
+        }
+        other => Some(Branch {
+            source: other,
+            predicate: Expr::Literal(
+                datafusion_common::ScalarValue::Boolean(Some(true)),
+                None,
+            ),
+            wrappers,
+        }),
+    }
+}
+
+/// Key used to decide whether two union inputs can be merged.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+struct GroupKey {
+    source: LogicalPlan,
+    wrappers: Vec<Wrapper>,
+}
+
+/// A node removed from above a branch's filter and re-applied after merging.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+enum Wrapper {
+    Projection {
+        expr: Vec<Expr>,
+        schema: datafusion_common::DFSchemaRef,
+    },
+    SubqueryAlias {
+        alias: datafusion_common::TableReference,
+        schema: datafusion_common::DFSchemaRef,
+    },
+}
+
+/// Removes the leading run of `Projection` / `SubqueryAlias` nodes from `plan`.
+///
+/// Returns the removed nodes, outermost first, and the remaining plan.
+fn peel_wrappers(mut plan: LogicalPlan) -> (Vec<Wrapper>, LogicalPlan) {
+    let mut wrappers = vec![];
+    loop {
+        match plan {
+            LogicalPlan::Projection(Projection {
+                expr,
+                input,
+                schema,
+                ..
+            }) => {
+                wrappers.push(Wrapper::Projection { expr, schema });
+                plan = Arc::unwrap_or_clone(input);
+            }
+            LogicalPlan::SubqueryAlias(SubqueryAlias {
+                input,
+                alias,
+                schema,
+                ..
+            }) => {
+                wrappers.push(Wrapper::SubqueryAlias { alias, schema });
+                plan = Arc::unwrap_or_clone(input);
+            }
+            other => return (wrappers, other),
+        }
+    }
+}
+
+/// Re-applies `wrappers` (outermost first) on top of `plan`, innermost first.
+fn wrap_branch(mut plan: LogicalPlan, wrappers: &[Wrapper]) -> Result<LogicalPlan> {
+    for wrapper in wrappers.iter().rev() {
+        plan = match wrapper {
+            Wrapper::Projection { expr, schema } => {
+                LogicalPlan::Projection(Projection::try_new_with_schema(
+                    expr.clone(),
+                    Arc::new(plan),
+                    Arc::clone(schema),
+                )?)
+            }
+            Wrapper::SubqueryAlias { alias, .. } => LogicalPlan::SubqueryAlias(
+                SubqueryAlias::try_new(Arc::new(plan), alias.clone())?,
+            ),
+        };
+    }
+    Ok(plan)
+}
+
+/// Ensures `plan` reports exactly `schema`, adding a column-only projection
+/// carrying that schema when it does not.
+fn align_plan_to_schema(
+    plan: LogicalPlan,
+    schema: datafusion_common::DFSchemaRef,
+) -> Result<LogicalPlan> {
+    if plan.schema() == &schema {
+        return Ok(plan);
+    }
+
+    let expr = plan
+        .schema()
+        .iter()
+        .enumerate()
+        .map(|(i, _)| {
+            Expr::Column(datafusion_common::Column::from(
+                plan.schema().qualified_field(i),
+            ))
+        })
+        .collect::<Vec<_>>();
+
+    Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
+        expr,
+        Arc::new(plan),
+        schema,
+    )?))
+}
+
+/// Returns true when `expr` may be OR-ed with the predicates of other branches.
+fn is_mergeable_predicate(expr: &Expr) -> bool {
+    !expr.is_volatile() && !expr_contains_subquery(expr)
+}
+
+/// Returns true when `expr` contains a scalar, `EXISTS`, or `IN` subquery.
+fn expr_contains_subquery(expr: &Expr) -> bool {
+    expr.exists(|e| match e {
+        Expr::ScalarSubquery(_) | Expr::Exists(_) | Expr::InSubquery(_) => Ok(true),
+        _ => Ok(false),
+    })
+    .expect("boolean expression walk is infallible")
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::OptimizerContext;
+    use crate::assert_optimized_plan_eq_snapshot;
+    use crate::test::test_table_scan_with_name;
+    use arrow::datatypes::DataType;
+    use datafusion_common::Result;
+    use datafusion_expr::{
+        ColumnarValue, Expr, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature,
+        Volatility, col, lit,
+    };
+    use std::any::Any;
+
+    macro_rules! assert_optimized_plan_equal {
+        (
+            $plan:expr,
+            @ $expected:literal $(,)?
+        ) => {{
+            let mut options = datafusion_common::config::ConfigOptions::default();
+            options.optimizer.enable_unions_to_filter = true;
+            let optimizer_ctx = OptimizerContext::new_with_config_options(Arc::new(options))
+                .with_max_passes(1);
+            let rules: Vec<Arc<dyn OptimizerRule + Send + Sync>> =
+                vec![Arc::new(UnionsToFilter::new())];
+            assert_optimized_plan_eq_snapshot!(
+                optimizer_ctx,
+                rules,
+                $plan,
+                @ $expected,
+            )
+        }};
+    }
+
+    #[derive(Debug, PartialEq, Eq, Hash)]
+    struct VolatileTestUdf;
+
+    impl ScalarUDFImpl for VolatileTestUdf {
+        fn as_any(&self) -> &dyn Any {
+            self
+        }
+
+        fn name(&self) -> &str {
+            "volatile_test"
+        }
+
+        fn signature(&self) -> &Signature {
+            static SIGNATURE: std::sync::LazyLock<Signature> =
+                std::sync::LazyLock::new(|| Signature::nullary(Volatility::Volatile));
+            &SIGNATURE
+        }
+
+        fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+            Ok(DataType::Float64)
+        }
+
+        fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
+            panic!("VolatileTestUdf is not intended for execution")
+        }
+    }
+
+    fn volatile_expr() -> Expr {
+        ScalarUDF::new_from_impl(VolatileTestUdf).call(vec![])
+    }
+
+    #[test]
+    fn rewrite_union_distinct_same_source_filters() -> Result<()> {
+        let left = LogicalPlanBuilder::from(test_table_scan_with_name("t")?)
+            .filter(col("a").eq(lit(1)))?
+            .build()?;
+        let right = LogicalPlanBuilder::from(test_table_scan_with_name("t")?)
+            .filter(col("a").eq(lit(2)))?
+            .build()?;
+
+        let plan = LogicalPlanBuilder::from(left)
+            .union_distinct(right)?
+            .build()?;
+
+        assert_optimized_plan_equal!(plan, @r"
+        Distinct:
+          Projection: t.a, t.b, t.c
+            Filter: t.a = Int32(1) OR t.a = Int32(2)
+              TableScan: t
+        ")?;
+        Ok(())
+    }
+
+    #[test]
+    fn keep_union_distinct_different_sources() -> Result<()> {
+        let left = LogicalPlanBuilder::from(test_table_scan_with_name("t1")?)
+            .filter(col("a").eq(lit(1)))?
+            .build()?;
+        let right = LogicalPlanBuilder::from(test_table_scan_with_name("t2")?)
+            .filter(col("a").eq(lit(2)))?
+            .build()?;
+
+        let plan = LogicalPlanBuilder::from(left)
+            .union_distinct(right)?
+            .build()?;
+
+        assert_optimized_plan_equal!(plan, @r"
+        Distinct:
+          Union
+            Filter: t1.a = Int32(1)
+              TableScan: t1
+            Filter: t2.a = Int32(2)
+              TableScan: t2
+        ")?;
+        Ok(())
+    }
+
+    #[test]
+    fn keep_union_distinct_with_volatile_predicate() -> Result<()> {
+        let left = LogicalPlanBuilder::from(test_table_scan_with_name("t")?)
+            .filter(volatile_expr().gt(lit(0.5_f64)))?
+            .build()?;
+        let right = LogicalPlanBuilder::from(test_table_scan_with_name("t")?)
+            .filter(col("a").eq(lit(2)))?
+            .build()?;
+
+        let plan = LogicalPlanBuilder::from(left)
+            .union_distinct(right)?
+            .build()?;
+
+        assert_optimized_plan_equal!(plan, @r"
+        Distinct:
+          Union
+            Filter: volatile_test() > Float64(0.5)
+              TableScan: t
+            Filter: t.a = Int32(2)
+              TableScan: t
+        ")?;
+        Ok(())
+    }
+
+    #[test]
+    fn rewrite_union_distinct_with_matching_projection_prefix() -> Result<()> {
+        let left = LogicalPlanBuilder::from(test_table_scan_with_name("emp")?)
+            .project(vec![col("a").alias("mgr"), col("b").alias("comm")])?
+            .build()?;
+        let right = LogicalPlanBuilder::from(test_table_scan_with_name("emp")?)
+            .filter(col("b").eq(lit(5)))?
+            .project(vec![col("a").alias("mgr"), col("b").alias("comm")])?
+            .build()?;
+
+        let plan = LogicalPlanBuilder::from(left)
+            .union_distinct(right)?
+            .build()?;
+
+        assert_optimized_plan_equal!(plan, @r"
+        Distinct:
+          Projection: emp.a AS mgr, emp.b AS comm
+            Filter: Boolean(true) OR emp.b = Int32(5)
+              TableScan: emp
+        ")?;
+        Ok(())
+    }
+
+    #[test]
+    fn keep_union_distinct_with_different_projections_below_filter() -> Result<()> {
+        let left = LogicalPlanBuilder::from(test_table_scan_with_name("t")?)
+            .project(vec![col("a").alias("x")])?
+            .filter(col("x").eq(lit(1)))?
+            .build()?;
+        let right = LogicalPlanBuilder::from(test_table_scan_with_name("t")?)
+            .project(vec![col("b").alias("x")])?
+            .filter(col("x").eq(lit(2)))?
+            .build()?;
+
+        let plan = LogicalPlanBuilder::from(left)
+            .union_distinct(right)?
+            .build()?;
+
+        assert_optimized_plan_equal!(plan, @r"
+        Distinct:
+          Union
+            Filter: x = Int32(1)
+              Projection: t.a AS x
+                TableScan: t
+            Filter: x = Int32(2)
+              Projection: t.b AS x
+                TableScan: t
+        ")?;
+        Ok(())
+    }
+
+    #[test]
+    fn keep_union_distinct_with_volatile_projection() -> Result<()> {
+        let left = LogicalPlanBuilder::from(test_table_scan_with_name("t")?)
+            .filter(col("a").eq(lit(1)))?
+            .project(vec![volatile_expr().alias("r")])?
+            .build()?;
+        let right = LogicalPlanBuilder::from(test_table_scan_with_name("t")?)
+            .filter(col("a").eq(lit(2)))?
+            .project(vec![volatile_expr().alias("r")])?
+            .build()?;
+
+        let plan = LogicalPlanBuilder::from(left)
+            .union_distinct(right)?
+            .build()?;
+
+        assert_optimized_plan_equal!(plan, @r"
+        Distinct:
+          Union
+            Projection: volatile_test() AS r
+              Filter: t.a = Int32(1)
+                TableScan: t
+            Projection: volatile_test() AS r
+              Filter: t.a = Int32(2)
+                TableScan: t
+        ")?;
+        Ok(())
+    }
+}
diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt
index 9916892058569..0a37fbd99cac4 100644
--- a/datafusion/sqllogictest/test_files/explain.slt
+++ b/datafusion/sqllogictest/test_files/explain.slt
@@ -178,6 +178,7 @@ logical_plan after type_coercion SAME TEXT AS ABOVE
 analyzed_logical_plan SAME TEXT AS ABOVE
 logical_plan after rewrite_set_comparison SAME TEXT AS ABOVE
 logical_plan after optimize_unions SAME TEXT AS ABOVE
+logical_plan after unions_to_filter SAME TEXT AS ABOVE
 logical_plan after simplify_expressions SAME TEXT AS ABOVE
 logical_plan after replace_distinct_aggregate SAME TEXT AS ABOVE
 logical_plan after eliminate_join SAME TEXT AS ABOVE
@@ -202,6 +203,7 @@ logical_plan after push_down_leaf_projections SAME TEXT AS ABOVE
 logical_plan after optimize_projections TableScan: simple_explain_test projection=[a, b, c]
 logical_plan after rewrite_set_comparison SAME TEXT AS ABOVE
 logical_plan after optimize_unions SAME TEXT AS ABOVE
+logical_plan after unions_to_filter SAME TEXT AS ABOVE
 logical_plan after simplify_expressions SAME TEXT AS ABOVE
 logical_plan after replace_distinct_aggregate SAME TEXT AS ABOVE
 logical_plan after eliminate_join SAME TEXT AS ABOVE
@@ -549,6 +551,7 @@ logical_plan after type_coercion SAME TEXT AS ABOVE
 analyzed_logical_plan SAME TEXT AS ABOVE
 logical_plan after rewrite_set_comparison SAME TEXT AS ABOVE
 logical_plan after optimize_unions SAME TEXT AS ABOVE
+logical_plan after unions_to_filter SAME TEXT AS ABOVE
 logical_plan after simplify_expressions SAME TEXT AS ABOVE
 logical_plan after replace_distinct_aggregate SAME TEXT AS ABOVE
 logical_plan after eliminate_join SAME TEXT AS ABOVE
@@ -573,6 +576,7 @@ logical_plan after push_down_leaf_projections SAME TEXT AS ABOVE
 logical_plan after optimize_projections TableScan: simple_explain_test projection=[a, b, c]
 logical_plan after rewrite_set_comparison SAME TEXT AS ABOVE
 logical_plan after optimize_unions SAME TEXT AS ABOVE
+logical_plan after unions_to_filter SAME TEXT AS ABOVE
 logical_plan after simplify_expressions SAME TEXT AS ABOVE
 logical_plan after replace_distinct_aggregate SAME TEXT AS ABOVE
 logical_plan after eliminate_join SAME TEXT AS ABOVE
diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt
index aeeb3481c76b9..0fde12a9a1e4d 100644
--- a/datafusion/sqllogictest/test_files/information_schema.slt
+++ b/datafusion/sqllogictest/test_files/information_schema.slt
@@ -305,6 +305,7 @@ datafusion.optimizer.enable_sort_pushdown true
 datafusion.optimizer.enable_topk_aggregation true
 datafusion.optimizer.enable_topk_dynamic_filter_pushdown true
 datafusion.optimizer.enable_topk_repartition true
+datafusion.optimizer.enable_unions_to_filter false
 datafusion.optimizer.enable_window_limits true
 datafusion.optimizer.expand_views_at_output false
 datafusion.optimizer.filter_null_join_keys false
@@ -445,6 +446,7 @@ datafusion.optimizer.enable_sort_pushdown true Enable sort pushdown optimization
 datafusion.optimizer.enable_topk_aggregation true When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible
 datafusion.optimizer.enable_topk_dynamic_filter_pushdown true When set to true, the optimizer will attempt to push down TopK dynamic filters into the file scan phase.
 datafusion.optimizer.enable_topk_repartition true When set to true, the optimizer will push TopK (Sort with fetch) below hash repartition when the partition key is a prefix of the sort key, reducing data volume before the shuffle.
+datafusion.optimizer.enable_unions_to_filter false When set to true, the logical optimizer will rewrite `UNION DISTINCT` branches that read from the same source and differ only by filter predicates into a single branch with a combined filter. This optimization is conservative and only applies when the branches share the same source and compatible wrapper nodes such as identical projections or aliases.
 datafusion.optimizer.enable_window_limits true When set to true, the optimizer will attempt to push limit operations past window functions, if possible
 datafusion.optimizer.expand_views_at_output false When set to true, if the returned type is a view type then the output will be coerced to a non-view. Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
 datafusion.optimizer.filter_null_join_keys false When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down.
diff --git a/datafusion/sqllogictest/test_files/union.slt b/datafusion/sqllogictest/test_files/union.slt
index d858d0ae3ea4e..e2e38acae4265 100644
--- a/datafusion/sqllogictest/test_files/union.slt
+++ b/datafusion/sqllogictest/test_files/union.slt
@@ -273,6 +273,49 @@ physical_plan
 04)--ProjectionExec: expr=[name@0 || _new as name]
 05)----DataSourceExec: partitions=1, partition_sizes=[1]
 
+# unions_to_filter is disabled by default
+query TT
+EXPLAIN SELECT id, name FROM t1 WHERE id = 1 UNION SELECT id, name FROM t1 WHERE id = 2
+----
+logical_plan
+01)Aggregate: groupBy=[[id, name]], aggr=[[]]
+02)--Union
+03)----Filter: t1.id = Int32(1)
+04)------TableScan: t1 projection=[id, name]
+05)----Filter: t1.id = Int32(2)
+06)------TableScan: t1 projection=[id, name]
+physical_plan
+01)AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, name@1 as name], aggr=[]
+02)--RepartitionExec: partitioning=Hash([id@0, name@1], 4), input_partitions=4
+03)----AggregateExec: mode=Partial, gby=[id@0 as id, name@1 as name], aggr=[]
+04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
+05)--------UnionExec
+06)----------FilterExec: id@0 = 1
+07)------------DataSourceExec: partitions=1, partition_sizes=[1]
+08)----------FilterExec: id@0 = 2
+09)------------DataSourceExec: partitions=1, partition_sizes=[1]
+
+statement ok
+set datafusion.optimizer.enable_unions_to_filter = true;
+
+query TT
+EXPLAIN SELECT id, name FROM t1 WHERE id = 1 UNION SELECT id, name FROM t1 WHERE id = 2
+----
+logical_plan
+01)Aggregate: groupBy=[[id, name]], aggr=[[]]
+02)--Filter: t1.id = Int32(1) OR t1.id = Int32(2)
+03)----TableScan: t1 projection=[id, name]
+physical_plan
+01)AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, name@1 as name], aggr=[]
+02)--RepartitionExec: partitioning=Hash([id@0, name@1], 4), input_partitions=4
+03)----AggregateExec: mode=Partial, gby=[id@0 as id, name@1 as name], aggr=[]
+04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+05)--------FilterExec: id@0 = 1 OR id@0 = 2
+06)----------DataSourceExec: partitions=1, partition_sizes=[1]
+
+statement ok
+set datafusion.optimizer.enable_unions_to_filter = false;
+
 # Make sure to choose a small batch size to introduce parallelism to the plan.
 statement ok
 set datafusion.execution.batch_size = 2;
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index 6f6d5b205877f..4fa2299ecb05d 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -168,6 +168,7 @@ The following configuration settings are available:
 | datafusion.optimizer.expand_views_at_output | false | When set to true, if the returned type is a view type then the output will be coerced to a non-view. Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`. |
 | datafusion.optimizer.enable_sort_pushdown | true | Enable sort pushdown optimization. When enabled, attempts to push sort requirements down to data sources that can natively handle them (e.g., by reversing file/row group read order). Returns **inexact ordering**: Sort operator is kept for correctness, but optimized input enables early termination for TopK queries (ORDER BY ... LIMIT N), providing significant speedup. Memory: No additional overhead (only changes read order). Future: Will add option to detect perfectly sorted data and eliminate Sort completely. Default: true |
 | datafusion.optimizer.enable_leaf_expression_pushdown | true | When set to true, the optimizer will extract leaf expressions (such as `get_field`) from filter/sort/join nodes into projections closer to the leaf table scans, and push those projections down towards the leaf nodes. |
+| datafusion.optimizer.enable_unions_to_filter | false | When set to true, the logical optimizer will rewrite `UNION DISTINCT` branches that read from the same source and differ only by filter predicates into a single branch with a combined filter. This optimization is conservative and only applies when the branches share the same source and compatible wrapper nodes such as identical projections or aliases. |
 | datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans |
 | datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans |
 | datafusion.explain.show_statistics | false | When set to true, the explain statement will print operator statistics for physical plans |