From 0613f1af4f71e56442ad154e0aede0a41698cd62 Mon Sep 17 00:00:00 2001 From: Alessandro Solimando Date: Thu, 5 Mar 2026 14:47:17 +0100 Subject: [PATCH 1/3] Set distinct_count to 1 when filter narrows interval to single value When a filter predicate collapses a column's interval to a single value (e.g. d_qoy = 1), the output has exactly 1 distinct value. Previously the original Parquet NDV was propagated, inflating GROUP BY output estimates for CTE self-join patterns like Q31. --- datafusion/physical-plan/src/filter.rs | 206 +++++++++++++++++- .../test_files/parquet_statistics.slt | 6 +- 2 files changed, 205 insertions(+), 7 deletions(-) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 141d9c38469d8..65745cafd9bb0 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -839,15 +839,23 @@ fn collect_new_statistics( }; }; let (lower, upper) = interval.into_bounds(); - let is_exact = !lower.is_null() && !upper.is_null() && lower == upper; - let min_value = interval_bound_to_precision(lower, is_exact); - let max_value = interval_bound_to_precision(upper, is_exact); + let is_single_value = + !lower.is_null() && !upper.is_null() && lower == upper; + let min_value = interval_bound_to_precision(lower, is_single_value); + let max_value = interval_bound_to_precision(upper, is_single_value); + // When the interval collapses to a single value (equality + // predicate), the column has exactly 1 distinct value + let capped_distinct_count = if is_single_value { + Precision::Exact(1) + } else { + distinct_count.to_inexact() + }; ColumnStatistics { null_count: input_column_stats[idx].null_count.to_inexact(), max_value, min_value, sum_value: Precision::Absent, - distinct_count: distinct_count.to_inexact(), + distinct_count: capped_distinct_count, byte_size: input_column_stats[idx].byte_size, } }, @@ -2274,4 +2282,194 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_filter_statistics_equality_sets_ndv_to_one() -> Result<()> { + // a: min=1, max=100, ndv=80 + let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + let input = Arc::new(StatisticsExec::new( + Statistics { + num_rows: Precision::Inexact(100), + total_byte_size: Precision::Inexact(400), + column_statistics: vec![ColumnStatistics { + min_value: Precision::Inexact(ScalarValue::Int32(Some(1))), + max_value: Precision::Inexact(ScalarValue::Int32(Some(100))), + distinct_count: Precision::Inexact(80), + ..Default::default() + }], + }, + schema.clone(), + )); + + // a = 42 collapses interval to a single value + let predicate = Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))), + )); + let filter: Arc = + Arc::new(FilterExec::try_new(predicate, input)?); + let statistics = filter.partition_statistics(None)?; + assert_eq!( + statistics.column_statistics[0].distinct_count, + Precision::Exact(1) + ); + Ok(()) + } + + #[tokio::test] + async fn test_filter_statistics_or_equality_preserves_ndv() -> Result<()> { + // a: min=1, max=100, ndv=80 + let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + let input = Arc::new(StatisticsExec::new( + Statistics { + num_rows: Precision::Inexact(100), + total_byte_size: Precision::Inexact(400), + column_statistics: vec![ColumnStatistics { + min_value: Precision::Inexact(ScalarValue::Int32(Some(1))), + max_value: Precision::Inexact(ScalarValue::Int32(Some(100))), + distinct_count: Precision::Inexact(80), + ..Default::default() + }], + }, + schema.clone(), + )); + + // a = 42 OR a = 22: interval stays [1, 100], not a single value + let predicate = Arc::new(BinaryExpr::new( + Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))), + )), + Operator::Or, + Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Int32(Some(22)))), + )), + )); + let filter: Arc = + Arc::new(FilterExec::try_new(predicate, input)?); + let statistics = filter.partition_statistics(None)?; + assert_eq!( + statistics.column_statistics[0].distinct_count, + Precision::Inexact(80) + ); + Ok(()) + } + + #[tokio::test] + async fn test_filter_statistics_and_equality_ndv() -> Result<()> { + // a: min=1, max=100, ndv=80 + // b: min=1, max=50, ndv=40 + // c: min=1, max=200, ndv=150 + let schema = Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + Field::new("c", DataType::Int32, false), + ]); + let input = Arc::new(StatisticsExec::new( + Statistics { + num_rows: Precision::Inexact(100), + total_byte_size: Precision::Inexact(1200), + column_statistics: vec![ + ColumnStatistics { + min_value: Precision::Inexact(ScalarValue::Int32(Some(1))), + max_value: Precision::Inexact(ScalarValue::Int32(Some(100))), + distinct_count: Precision::Inexact(80), + ..Default::default() + }, + ColumnStatistics { + min_value: Precision::Inexact(ScalarValue::Int32(Some(1))), + max_value: Precision::Inexact(ScalarValue::Int32(Some(50))), + distinct_count: Precision::Inexact(40), + ..Default::default() + }, + ColumnStatistics { + min_value: Precision::Inexact(ScalarValue::Int32(Some(1))), + max_value: Precision::Inexact(ScalarValue::Int32(Some(200))), + distinct_count: Precision::Inexact(150), + ..Default::default() + }, + ], + }, + schema.clone(), + )); + + // a = 42 AND b > 10 AND c = 7 + let predicate = Arc::new(BinaryExpr::new( + Arc::new(BinaryExpr::new( + Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))), + )), + Operator::And, + Arc::new(BinaryExpr::new( + Arc::new(Column::new("b", 1)), + Operator::Gt, + Arc::new(Literal::new(ScalarValue::Int32(Some(10)))), + )), + )), + Operator::And, + Arc::new(BinaryExpr::new( + Arc::new(Column::new("c", 2)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Int32(Some(7)))), + )), + )); + let filter: Arc = + Arc::new(FilterExec::try_new(predicate, input)?); + let statistics = filter.partition_statistics(None)?; + // a = 42 collapses to single value + assert_eq!( + statistics.column_statistics[0].distinct_count, + Precision::Exact(1) + ); + // b > 10 narrows to [11, 50] but doesn't collapse + assert_eq!( + statistics.column_statistics[1].distinct_count, + Precision::Inexact(40) + ); + // c = 7 collapses to single value + assert_eq!( + statistics.column_statistics[2].distinct_count, + Precision::Exact(1) + ); + Ok(()) + } + + #[tokio::test] + async fn test_filter_statistics_equality_absent_bounds_ndv() -> Result<()> { + // a: ndv=80, no min/max + let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + let input = Arc::new(StatisticsExec::new( + Statistics { + num_rows: Precision::Inexact(100), + total_byte_size: Precision::Inexact(400), + column_statistics: vec![ColumnStatistics { + distinct_count: Precision::Inexact(80), + ..Default::default() + }], + }, + schema.clone(), + )); + + // a = 42: even without known bounds, interval analysis resolves + // the equality to [42, 42], so NDV is correctly set to Exact(1) + let predicate = Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))), + )); + let filter: Arc = + Arc::new(FilterExec::try_new(predicate, input)?); + let statistics = filter.partition_statistics(None)?; + assert_eq!( + statistics.column_statistics[0].distinct_count, + Precision::Exact(1) + ); + Ok(()) + } } diff --git a/datafusion/sqllogictest/test_files/parquet_statistics.slt b/datafusion/sqllogictest/test_files/parquet_statistics.slt index 8c77fb96ba75c..d86c8a78ae46d 100644 --- a/datafusion/sqllogictest/test_files/parquet_statistics.slt +++ b/datafusion/sqllogictest/test_files/parquet_statistics.slt @@ -59,7 +59,7 @@ query TT EXPLAIN SELECT * FROM test_table WHERE column1 = 1; ---- physical_plan -01)FilterExec: column1@0 = 1, statistics=[Rows=Inexact(2), Bytes=Inexact(10), [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(1)) Null=Inexact(0) ScanBytes=Inexact(40))]] +01)FilterExec: column1@0 = 1, statistics=[Rows=Inexact(2), Bytes=Inexact(10), [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(1)) Null=Inexact(0) Distinct=Exact(1) ScanBytes=Inexact(40))]] 02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2, statistics=[Rows=Inexact(5), Bytes=Inexact(40), [(Col[0]: Min=Inexact(Int64(1)) Max=Inexact(Int64(4)) Null=Inexact(0) ScanBytes=Inexact(40))]] 03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/1.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 = 1, pruning_predicate=column1_null_count@2 != row_count@3 AND column1_min@0 <= 1 AND 1 <= column1_max@1, required_guarantees=[column1 in (1)], statistics=[Rows=Inexact(5), Bytes=Inexact(40), [(Col[0]: Min=Inexact(Int64(1)) Max=Inexact(Int64(4)) Null=Inexact(0) ScanBytes=Inexact(40))]] @@ -84,7 +84,7 @@ query TT EXPLAIN SELECT * FROM test_table WHERE column1 = 1; ---- physical_plan -01)FilterExec: column1@0 = 1, statistics=[Rows=Inexact(2), Bytes=Inexact(10), [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(1)) Null=Inexact(0) ScanBytes=Inexact(40))]] +01)FilterExec: column1@0 = 1, statistics=[Rows=Inexact(2), Bytes=Inexact(10), [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(1)) Null=Inexact(0) Distinct=Exact(1) ScanBytes=Inexact(40))]] 02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2, statistics=[Rows=Inexact(5), Bytes=Inexact(40), [(Col[0]: Min=Inexact(Int64(1)) Max=Inexact(Int64(4)) Null=Inexact(0) ScanBytes=Inexact(40))]] 03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/1.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 = 1, pruning_predicate=column1_null_count@2 != row_count@3 AND column1_min@0 <= 1 AND 1 <= column1_max@1, required_guarantees=[column1 in (1)], statistics=[Rows=Inexact(5), Bytes=Inexact(40), [(Col[0]: Min=Inexact(Int64(1)) Max=Inexact(Int64(4)) Null=Inexact(0) ScanBytes=Inexact(40))]] @@ -110,7 +110,7 @@ query TT EXPLAIN SELECT * FROM test_table WHERE column1 = 1; ---- physical_plan -01)FilterExec: column1@0 = 1, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(1)))]] +01)FilterExec: column1@0 = 1, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(1)) Distinct=Exact(1))]] 02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:)]] 03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/1.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 = 1, pruning_predicate=column1_null_count@2 != row_count@3 AND column1_min@0 <= 1 AND 1 <= column1_max@1, required_guarantees=[column1 in (1)], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:)]] From 1531e5cfb55388e3f92643ea6a6f17fdb87d2f11 Mon Sep 17 00:00:00 2001 From: Alessandro Solimando Date: Mon, 23 Mar 2026 12:14:41 +0100 Subject: [PATCH 2/3] Add tests for additional numeric types and reversed operand order Cover Int8, Int64, Float32 equality predicates and reversed operand order (literal = column) for NDV single-value optimization. --- datafusion/physical-plan/src/filter.rs | 133 +++++++++++++++++++++++++ 1 file changed, 133 insertions(+) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 65745cafd9bb0..ad51418650039 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -2472,4 +2472,137 @@ mod tests { ); Ok(()) } + + #[tokio::test] + async fn test_filter_statistics_equality_int8_ndv() -> Result<()> { + // a: min=-100, max=100, ndv=50 + let schema = Schema::new(vec![Field::new("a", DataType::Int8, false)]); + let input = Arc::new(StatisticsExec::new( + Statistics { + num_rows: Precision::Inexact(100), + total_byte_size: Precision::Inexact(100), + column_statistics: vec![ColumnStatistics { + min_value: Precision::Inexact(ScalarValue::Int8(Some(-100))), + max_value: Precision::Inexact(ScalarValue::Int8(Some(100))), + distinct_count: Precision::Inexact(50), + ..Default::default() + }], + }, + schema.clone(), + )); + + let predicate = Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Int8(Some(42)))), + )); + let filter: Arc = + Arc::new(FilterExec::try_new(predicate, input)?); + let statistics = filter.partition_statistics(None)?; + assert_eq!( + statistics.column_statistics[0].distinct_count, + Precision::Exact(1) + ); + Ok(()) + } + + #[tokio::test] + async fn test_filter_statistics_equality_int64_ndv() -> Result<()> { + // a: min=0, max=1_000_000, ndv=100_000 + let schema = Schema::new(vec![Field::new("a", DataType::Int64, false)]); + let input = Arc::new(StatisticsExec::new( + Statistics { + num_rows: Precision::Inexact(100_000), + total_byte_size: Precision::Inexact(800_000), + column_statistics: vec![ColumnStatistics { + min_value: Precision::Inexact(ScalarValue::Int64(Some(0))), + max_value: Precision::Inexact(ScalarValue::Int64(Some(1_000_000))), + distinct_count: Precision::Inexact(100_000), + ..Default::default() + }], + }, + schema.clone(), + )); + + let predicate = Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Int64(Some(42)))), + )); + let filter: Arc = + Arc::new(FilterExec::try_new(predicate, input)?); + let statistics = filter.partition_statistics(None)?; + assert_eq!( + statistics.column_statistics[0].distinct_count, + Precision::Exact(1) + ); + Ok(()) + } + + #[tokio::test] + async fn test_filter_statistics_equality_float32_ndv() -> Result<()> { + // a: min=0.0, max=100.0, ndv=50 + let schema = Schema::new(vec![Field::new("a", DataType::Float32, false)]); + let input = Arc::new(StatisticsExec::new( + Statistics { + num_rows: Precision::Inexact(100), + total_byte_size: Precision::Inexact(400), + column_statistics: vec![ColumnStatistics { + min_value: Precision::Inexact(ScalarValue::Float32(Some(0.0))), + max_value: Precision::Inexact(ScalarValue::Float32(Some(100.0))), + distinct_count: Precision::Inexact(50), + ..Default::default() + }], + }, + schema.clone(), + )); + + let predicate = Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Float32(Some(3.14)))), + )); + let filter: Arc = + Arc::new(FilterExec::try_new(predicate, input)?); + let statistics = filter.partition_statistics(None)?; + assert_eq!( + statistics.column_statistics[0].distinct_count, + Precision::Exact(1) + ); + Ok(()) + } + + #[tokio::test] + async fn test_filter_statistics_equality_reversed_ndv() -> Result<()> { + // a: min=1, max=100, ndv=80 + let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + let input = Arc::new(StatisticsExec::new( + Statistics { + num_rows: Precision::Inexact(100), + total_byte_size: Precision::Inexact(400), + column_statistics: vec![ColumnStatistics { + min_value: Precision::Inexact(ScalarValue::Int32(Some(1))), + max_value: Precision::Inexact(ScalarValue::Int32(Some(100))), + distinct_count: Precision::Inexact(80), + ..Default::default() + }], + }, + schema.clone(), + )); + + // 42 = a (literal on the left) + let predicate = Arc::new(BinaryExpr::new( + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))), + Operator::Eq, + Arc::new(Column::new("a", 0)), + )); + let filter: Arc = + Arc::new(FilterExec::try_new(predicate, input)?); + let statistics = filter.partition_statistics(None)?; + assert_eq!( + statistics.column_statistics[0].distinct_count, + Precision::Exact(1) + ); + Ok(()) + } } From 238ab0275d144cdb7f4ab7c332e08ae7025d48f6 Mon Sep 17 00:00:00 2001 From: Alessandro Solimando Date: Mon, 23 Mar 2026 13:56:55 +0100 Subject: [PATCH 3/3] Fix clippy approx_constant warning in Float32 test --- datafusion/physical-plan/src/filter.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index ad51418650039..7a9937fd3e7e4 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -2560,7 +2560,7 @@ mod tests { let predicate = Arc::new(BinaryExpr::new( Arc::new(Column::new("a", 0)), Operator::Eq, - Arc::new(Literal::new(ScalarValue::Float32(Some(3.14)))), + Arc::new(Literal::new(ScalarValue::Float32(Some(42.5)))), )); let filter: Arc = Arc::new(FilterExec::try_new(predicate, input)?);