Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 202 additions & 4 deletions datafusion/physical-plan/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -839,15 +839,23 @@ fn collect_new_statistics(
};
};
let (lower, upper) = interval.into_bounds();
let is_exact = !lower.is_null() && !upper.is_null() && lower == upper;
let min_value = interval_bound_to_precision(lower, is_exact);
let max_value = interval_bound_to_precision(upper, is_exact);
let is_single_value =
!lower.is_null() && !upper.is_null() && lower == upper;
let min_value = interval_bound_to_precision(lower, is_single_value);
let max_value = interval_bound_to_precision(upper, is_single_value);
// When the interval collapses to a single value (equality
// predicate), the column has exactly 1 distinct value
let capped_distinct_count = if is_single_value {
Precision::Exact(1)
} else {
distinct_count.to_inexact()
};
ColumnStatistics {
null_count: input_column_stats[idx].null_count.to_inexact(),
max_value,
min_value,
sum_value: Precision::Absent,
distinct_count: distinct_count.to_inexact(),
distinct_count: capped_distinct_count,
byte_size: input_column_stats[idx].byte_size,
}
},
Expand Down Expand Up @@ -2274,4 +2282,194 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn test_filter_statistics_equality_sets_ndv_to_one() -> Result<()> {
    // Source column `a` advertises min=1, max=100 and an estimated 80
    // distinct values.
    let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    let column_stats = ColumnStatistics {
        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
        distinct_count: Precision::Inexact(80),
        ..Default::default()
    };
    let input = Arc::new(StatisticsExec::new(
        Statistics {
            num_rows: Precision::Inexact(100),
            total_byte_size: Precision::Inexact(400),
            column_statistics: vec![column_stats],
        },
        schema.clone(),
    ));

    // The equality predicate `a = 42` collapses the column's interval to a
    // single point, so the filter should report exactly one distinct value.
    let predicate = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("a", 0)),
        Operator::Eq,
        Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
    ));
    let filter: Arc<dyn ExecutionPlan> =
        Arc::new(FilterExec::try_new(predicate, input)?);

    let stats = filter.partition_statistics(None)?;
    assert_eq!(stats.column_statistics[0].distinct_count, Precision::Exact(1));
    Ok(())
}

#[tokio::test]
async fn test_filter_statistics_or_equality_preserves_ndv() -> Result<()> {
    // Source column `a` advertises min=1, max=100 and an estimated 80
    // distinct values.
    let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    let column_stats = ColumnStatistics {
        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
        distinct_count: Precision::Inexact(80),
        ..Default::default()
    };
    let input = Arc::new(StatisticsExec::new(
        Statistics {
            num_rows: Precision::Inexact(100),
            total_byte_size: Precision::Inexact(400),
            column_statistics: vec![column_stats],
        },
        schema.clone(),
    ));

    // `a = 42 OR a = 22`: the combined interval does not collapse to a
    // single value, so the NDV estimate must stay at the (inexact) input
    // value rather than being capped to 1.
    let left_eq = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("a", 0)),
        Operator::Eq,
        Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
    ));
    let right_eq = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("a", 0)),
        Operator::Eq,
        Arc::new(Literal::new(ScalarValue::Int32(Some(22)))),
    ));
    let predicate = Arc::new(BinaryExpr::new(left_eq, Operator::Or, right_eq));

    let filter: Arc<dyn ExecutionPlan> =
        Arc::new(FilterExec::try_new(predicate, input)?);
    let stats = filter.partition_statistics(None)?;
    assert_eq!(
        stats.column_statistics[0].distinct_count,
        Precision::Inexact(80)
    );
    Ok(())
}

#[tokio::test]
async fn test_filter_statistics_and_equality_ndv() -> Result<()> {
// a: min=1, max=100, ndv=80
// b: min=1, max=50, ndv=40
// c: min=1, max=200, ndv=150
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
Field::new("c", DataType::Int32, false),
]);
let input = Arc::new(StatisticsExec::new(
Statistics {
num_rows: Precision::Inexact(100),
total_byte_size: Precision::Inexact(1200),
column_statistics: vec![
ColumnStatistics {
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
distinct_count: Precision::Inexact(80),
..Default::default()
},
ColumnStatistics {
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
max_value: Precision::Inexact(ScalarValue::Int32(Some(50))),
distinct_count: Precision::Inexact(40),
..Default::default()
},
ColumnStatistics {
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
max_value: Precision::Inexact(ScalarValue::Int32(Some(200))),
distinct_count: Precision::Inexact(150),
..Default::default()
},
],
},
schema.clone(),
));

// a = 42 AND b > 10 AND c = 7
let predicate = Arc::new(BinaryExpr::new(
Arc::new(BinaryExpr::new(
Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::Eq,
Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
)),
Operator::And,
Arc::new(BinaryExpr::new(
Arc::new(Column::new("b", 1)),
Operator::Gt,
Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
)),
)),
Operator::And,
Arc::new(BinaryExpr::new(
Arc::new(Column::new("c", 2)),
Operator::Eq,
Arc::new(Literal::new(ScalarValue::Int32(Some(7)))),
)),
));
let filter: Arc<dyn ExecutionPlan> =
Arc::new(FilterExec::try_new(predicate, input)?);
let statistics = filter.partition_statistics(None)?;
// a = 42 collapses to single value
assert_eq!(
statistics.column_statistics[0].distinct_count,
Precision::Exact(1)
);
// b > 10 narrows to [11, 50] but doesn't collapse
assert_eq!(
statistics.column_statistics[1].distinct_count,
Precision::Inexact(40)
);
// c = 7 collapses to single value
assert_eq!(
statistics.column_statistics[2].distinct_count,
Precision::Exact(1)
);
Ok(())
}

#[tokio::test]
async fn test_filter_statistics_equality_absent_bounds_ndv() -> Result<()> {
    // Source column `a` has an NDV estimate (80) but no min/max bounds.
    let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    let input = Arc::new(StatisticsExec::new(
        Statistics {
            num_rows: Precision::Inexact(100),
            total_byte_size: Precision::Inexact(400),
            column_statistics: vec![ColumnStatistics {
                distinct_count: Precision::Inexact(80),
                ..Default::default()
            }],
        },
        schema.clone(),
    ));

    // `a = 42`: even with unknown input bounds, interval analysis resolves
    // the equality to the point interval [42, 42], so the distinct count is
    // still capped at Exact(1).
    let predicate = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("a", 0)),
        Operator::Eq,
        Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
    ));
    let filter: Arc<dyn ExecutionPlan> =
        Arc::new(FilterExec::try_new(predicate, input)?);

    let stats = filter.partition_statistics(None)?;
    assert_eq!(stats.column_statistics[0].distinct_count, Precision::Exact(1));
    Ok(())
}
}
6 changes: 3 additions & 3 deletions datafusion/sqllogictest/test_files/parquet_statistics.slt
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ query TT
EXPLAIN SELECT * FROM test_table WHERE column1 = 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add some tests with e.g. floating point numbers, strings? I think it should work just the same but....

----
physical_plan
01)FilterExec: column1@0 = 1, statistics=[Rows=Inexact(2), Bytes=Inexact(10), [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(1)) Null=Inexact(0) ScanBytes=Inexact(40))]]
01)FilterExec: column1@0 = 1, statistics=[Rows=Inexact(2), Bytes=Inexact(10), [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(1)) Null=Inexact(0) Distinct=Exact(1) ScanBytes=Inexact(40))]]
02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2, statistics=[Rows=Inexact(5), Bytes=Inexact(40), [(Col[0]: Min=Inexact(Int64(1)) Max=Inexact(Int64(4)) Null=Inexact(0) ScanBytes=Inexact(40))]]
03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/1.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 = 1, pruning_predicate=column1_null_count@2 != row_count@3 AND column1_min@0 <= 1 AND 1 <= column1_max@1, required_guarantees=[column1 in (1)], statistics=[Rows=Inexact(5), Bytes=Inexact(40), [(Col[0]: Min=Inexact(Int64(1)) Max=Inexact(Int64(4)) Null=Inexact(0) ScanBytes=Inexact(40))]]

Expand All @@ -84,7 +84,7 @@ query TT
EXPLAIN SELECT * FROM test_table WHERE column1 = 1;
----
physical_plan
01)FilterExec: column1@0 = 1, statistics=[Rows=Inexact(2), Bytes=Inexact(10), [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(1)) Null=Inexact(0) ScanBytes=Inexact(40))]]
01)FilterExec: column1@0 = 1, statistics=[Rows=Inexact(2), Bytes=Inexact(10), [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(1)) Null=Inexact(0) Distinct=Exact(1) ScanBytes=Inexact(40))]]
02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2, statistics=[Rows=Inexact(5), Bytes=Inexact(40), [(Col[0]: Min=Inexact(Int64(1)) Max=Inexact(Int64(4)) Null=Inexact(0) ScanBytes=Inexact(40))]]
03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/1.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 = 1, pruning_predicate=column1_null_count@2 != row_count@3 AND column1_min@0 <= 1 AND 1 <= column1_max@1, required_guarantees=[column1 in (1)], statistics=[Rows=Inexact(5), Bytes=Inexact(40), [(Col[0]: Min=Inexact(Int64(1)) Max=Inexact(Int64(4)) Null=Inexact(0) ScanBytes=Inexact(40))]]

Expand All @@ -110,7 +110,7 @@ query TT
EXPLAIN SELECT * FROM test_table WHERE column1 = 1;
----
physical_plan
01)FilterExec: column1@0 = 1, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(1)))]]
01)FilterExec: column1@0 = 1, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(1)) Distinct=Exact(1))]]
02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:)]]
03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/1.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 = 1, pruning_predicate=column1_null_count@2 != row_count@3 AND column1_min@0 <= 1 AND 1 <= column1_max@1, required_guarantees=[column1 in (1)], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:)]]

Expand Down
Loading