Cap column distinct-count (NDV) estimates at the row-count estimate.

A column cannot have more distinct values than rows, so this patch clamps
`distinct_count` to `num_rows` in three places: the `impl Statistics` code in
stats.rs exercised by the `with_fetch` tests, `collect_new_statistics` in
filter.rs, and `max_distinct_count` in joins/utils.rs. Tests are added or
updated for each. The filter test requires the NDV to be present, because
comparing an absent value (`None`) with `<= Some(10)` would pass vacuously.

diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs
index f263c905faf6b..a210155b9e416 100644
--- a/datafusion/common/src/stats.rs
+++ b/datafusion/common/src/stats.rs
@@ -551,6 +551,10 @@ impl Statistics {
                     }
                     Precision::Absent => Precision::Absent,
                 };
+                // NDV can never exceed the number of rows
+                if let Some(&rows) = self.num_rows.get_value() {
+                    cs.distinct_count = cs.distinct_count.min(&Precision::Inexact(rows));
+                }
                 cs
             })
             .collect();
@@ -1974,7 +1978,8 @@ mod tests {
             result_col_stats.sum_value,
             Precision::Inexact(ScalarValue::Int32(Some(123456)))
         );
-        assert_eq!(result_col_stats.distinct_count, Precision::Inexact(789));
+        // NDV is capped at the new row count (250) since 789 > 250
+        assert_eq!(result_col_stats.distinct_count, Precision::Inexact(250));
     }
 
     #[test]
@@ -2075,6 +2080,46 @@ mod tests {
         assert_eq!(result.total_byte_size, Precision::Inexact(800));
     }
 
+    #[test]
+    fn test_with_fetch_caps_ndv_at_row_count() {
+        // NDV=500 but after LIMIT 10, NDV should be capped at 10
+        let stats = Statistics {
+            num_rows: Precision::Exact(1000),
+            total_byte_size: Precision::Exact(8000),
+            column_statistics: vec![ColumnStatistics {
+                distinct_count: Precision::Inexact(500),
+                ..Default::default()
+            }],
+        };
+
+        let result = stats.with_fetch(Some(10), 0, 1).unwrap();
+        assert_eq!(result.num_rows, Precision::Exact(10));
+        assert_eq!(
+            result.column_statistics[0].distinct_count,
+            Precision::Inexact(10)
+        );
+    }
+
+    #[test]
+    fn test_with_fetch_ndv_below_row_count_unchanged() {
+        // NDV=5 and LIMIT 10: NDV should stay at 5
+        let stats = Statistics {
+            num_rows: Precision::Exact(1000),
+            total_byte_size: Precision::Exact(8000),
+            column_statistics: vec![ColumnStatistics {
+                distinct_count: Precision::Inexact(5),
+                ..Default::default()
+            }],
+        };
+
+        let result = stats.with_fetch(Some(10), 0, 1).unwrap();
+        assert_eq!(result.num_rows, Precision::Exact(10));
+        assert_eq!(
+            result.column_statistics[0].distinct_count,
+            Precision::Inexact(5)
+        );
+    }
+
     #[test]
     fn test_try_merge_iter_basic() {
         let schema = Arc::new(Schema::new(vec![
diff --git a/datafusion/core/tests/custom_sources_cases/statistics.rs b/datafusion/core/tests/custom_sources_cases/statistics.rs
index 561c6b3b246ff..8210ffb34bd4b 100644
--- a/datafusion/core/tests/custom_sources_cases/statistics.rs
+++ b/datafusion/core/tests/custom_sources_cases/statistics.rs
@@ -285,17 +285,18 @@ async fn sql_limit() -> Result<()> {
     let df = ctx.sql("SELECT * FROM stats_table LIMIT 5").await.unwrap();
     let physical_plan = df.create_physical_plan().await.unwrap();
     // when the limit is smaller than the original number of lines we mark the statistics as inexact
+    // and cap NDV at the new row count
+    let limit_stats = physical_plan.partition_statistics(None)?;
+    assert_eq!(limit_stats.num_rows, Precision::Exact(5));
+    // c1: NDV=2 stays at 2 (already below limit of 5)
     assert_eq!(
-        Statistics {
-            num_rows: Precision::Exact(5),
-            column_statistics: stats
-                .column_statistics
-                .iter()
-                .map(|c| c.clone().to_inexact())
-                .collect(),
-            total_byte_size: Precision::Absent
-        },
-        *physical_plan.partition_statistics(None)?
+        limit_stats.column_statistics[0].distinct_count,
+        Precision::Inexact(2)
+    );
+    // c2: NDV=13 capped to 5 (the limit row count)
+    assert_eq!(
+        limit_stats.column_statistics[1].distinct_count,
+        Precision::Inexact(5)
     );
 
     let df = ctx
diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs
index 141d9c38469d8..c81a4673d93fb 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -341,6 +341,7 @@ impl FilterExec {
             schema,
             &input_stats.column_statistics,
             analysis_ctx.boundaries,
+            num_rows.get_value().copied(),
         );
         Ok(Statistics {
             num_rows,
@@ -809,6 +810,7 @@ fn collect_new_statistics(
     schema: &SchemaRef,
     input_column_stats: &[ColumnStatistics],
     analysis_boundaries: Vec<ExprBoundaries>,
+    filtered_num_rows: Option<usize>,
 ) -> Vec<ColumnStatistics> {
     analysis_boundaries
         .into_iter()
@@ -842,12 +844,19 @@ fn collect_new_statistics(
                 let is_exact = !lower.is_null() && !upper.is_null() && lower == upper;
                 let min_value = interval_bound_to_precision(lower, is_exact);
                 let max_value = interval_bound_to_precision(upper, is_exact);
+                // NDV can never exceed the number of rows after filtering
+                let capped_distinct_count = match filtered_num_rows {
+                    Some(rows) => {
+                        distinct_count.to_inexact().min(&Precision::Inexact(rows))
+                    }
+                    None => distinct_count.to_inexact(),
+                };
                 ColumnStatistics {
                     null_count: input_column_stats[idx].null_count.to_inexact(),
                     max_value,
                     min_value,
                     sum_value: Precision::Absent,
-                    distinct_count: distinct_count.to_inexact(),
+                    distinct_count: capped_distinct_count,
                     byte_size: input_column_stats[idx].byte_size,
                 }
             },
@@ -2274,4 +2283,41 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_filter_statistics_ndv_capped_at_row_count() -> Result<()> {
+        // Table: a: min=1, max=100, distinct_count=80, 100 rows
+        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+        let input = Arc::new(StatisticsExec::new(
+            Statistics {
+                num_rows: Precision::Inexact(100),
+                total_byte_size: Precision::Inexact(400),
+                column_statistics: vec![ColumnStatistics {
+                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
+                    max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
+                    distinct_count: Precision::Inexact(80),
+                    ..Default::default()
+                }],
+            },
+            schema.clone(),
+        ));
+
+        // a <= 10 => ~10 rows out of 100
+        let predicate: Arc<dyn PhysicalExpr> =
+            binary(col("a", &schema)?, Operator::LtEq, lit(10i32), &schema)?;
+
+        let filter: Arc<dyn ExecutionPlan> =
+            Arc::new(FilterExec::try_new(predicate, input)?);
+
+        let statistics = filter.partition_statistics(None)?;
+        // Filter estimates ~10 rows (selectivity = 10/100)
+        assert_eq!(statistics.num_rows, Precision::Inexact(10));
+        // NDV must be present and capped at the filtered row count (10), not 80
+        let ndv = &statistics.column_statistics[0].distinct_count;
+        assert!(
+            ndv.get_value().is_some_and(|&n| n <= 10),
+            "Expected NDV <= 10 (filtered row count), got {ndv:?}"
+        );
+        Ok(())
+    }
 }
diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs
index 3130134e253d9..ee491161ecca4 100644
--- a/datafusion/physical-plan/src/joins/utils.rs
+++ b/datafusion/physical-plan/src/joins/utils.rs
@@ -707,7 +707,13 @@ fn max_distinct_count(
     stats: &ColumnStatistics,
 ) -> Precision<usize> {
     match &stats.distinct_count {
-        &dc @ (Precision::Exact(_) | Precision::Inexact(_)) => dc,
+        &dc @ (Precision::Exact(_) | Precision::Inexact(_)) => {
+            // NDV can never exceed the number of rows
+            match num_rows {
+                Precision::Absent => dc,
+                _ => dc.min(num_rows).to_inexact(),
+            }
+        }
         _ => {
             // The number can never be greater than the number of rows we have
             // minus the nulls (since they don't count as distinct values).
@@ -2292,6 +2298,22 @@ mod tests {
                 (10, Inexact(1), Inexact(10), Absent, Absent),
                 Some(Inexact(0)),
             ),
+            // NDV > num_rows: distinct count should be capped at row count
+            (
+                (5, Inexact(1), Inexact(100), Inexact(50), Absent),
+                (10, Inexact(1), Inexact(100), Inexact(50), Absent),
+                // max_distinct_count caps: left NDV=min(50,5)=5, right NDV=min(50,10)=10
+                // cardinality = (5 * 10) / max(5, 10) = 50 / 10 = 5
+                Some(Inexact(5)),
+            ),
+            // NDV > num_rows on one side only
+            (
+                (3, Inexact(1), Inexact(100), Inexact(100), Absent),
+                (10, Inexact(1), Inexact(100), Inexact(5), Absent),
+                // max_distinct_count caps: left NDV=min(100,3)=3, right NDV=min(5,10)=5
+                // cardinality = (3 * 10) / max(3, 5) = 30 / 5 = 6
+                Some(Inexact(6)),
+            ),
         ];
 
         for (left_info, right_info, expected_cardinality) in cases {
@@ -2431,11 +2453,14 @@ mod tests {
         // y: min=0, max=100, distinct=None
         //
         // Join on a=c, b=d (ignore x/y)
+        // Right column d has NDV=2500 but only 2000 rows, so NDV is capped
+        // to 2000. join_selectivity = max(500, 2000) = 2000.
+        // Inner cardinality = (1000 * 2000) / 2000 = 1000
         let cases = vec![
-            (JoinType::Inner, 800),
+            (JoinType::Inner, 1000),
             (JoinType::Left, 1000),
             (JoinType::Right, 2000),
-            (JoinType::Full, 2200),
+            (JoinType::Full, 2000),
         ];
 
         let left_col_stats = vec![