From d015b065e43e96302814acf5a5f58ee004d539c9 Mon Sep 17 00:00:00 2001 From: nuno-faria Date: Tue, 5 May 2026 22:05:19 +0100 Subject: [PATCH] fix: Correct the number of pruned/matched Parquet pages --- .../src/datasource/physical_plan/parquet.rs | 67 +++++++++++++++++++ .../datasource-parquet/src/page_filter.rs | 37 +++++++--- .../test_files/explain_analyze.slt | 6 +- .../sqllogictest/test_files/limit_pruning.slt | 4 +- 4 files changed, 99 insertions(+), 15 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index dd8c20628b43e..6f38df46e3d2e 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -1800,6 +1800,73 @@ mod tests { assert_eq!(page_index_pages_matched, 1); } + #[tokio::test] + async fn parquet_page_index_exec_metrics_multiple_predicates() { + let c1: ArrayRef = + Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)])); + let c2: ArrayRef = Arc::new(Int32Array::from(vec![ + Some(11), + Some(12), + Some(13), + Some(14), + ])); + let batch = create_batch(vec![("a", c1), ("b", c2)]); + + // matches the last page + let filter = col("a").gt_eq(lit(3)).and(col("b").lt_eq(lit(13))); + let rt = RoundTrip::new() + .with_predicate(filter) + .with_page_index_predicate() + .round_trip(vec![batch.clone()]) + .await; + let metrics = rt.parquet_exec.metrics().unwrap(); + + let (page_index_rows_pruned, page_index_rows_matched) = + get_pruning_metric(&metrics, "page_index_rows_pruned"); + assert_eq!(page_index_rows_pruned, 2); + assert_eq!(page_index_rows_matched, 2); + let (page_index_pages_pruned, page_index_pages_matched) = + get_pruning_metric(&metrics, "page_index_pages_pruned"); + assert_eq!(page_index_pages_pruned, 1); + assert_eq!(page_index_pages_matched, 1); + + // matches no pages + let filter = col("a").gt_eq(lit(3)).and(col("b").lt_eq(lit(12))); + let rt = RoundTrip::new() + 
.with_predicate(filter) + .with_page_index_predicate() + .round_trip(vec![batch.clone()]) + .await; + let metrics = rt.parquet_exec.metrics().unwrap(); + + let (page_index_rows_pruned, page_index_rows_matched) = + get_pruning_metric(&metrics, "page_index_rows_pruned"); + assert_eq!(page_index_rows_pruned, 4); + assert_eq!(page_index_rows_matched, 0); + let (page_index_pages_pruned, page_index_pages_matched) = + get_pruning_metric(&metrics, "page_index_pages_pruned"); + assert_eq!(page_index_pages_pruned, 2); + assert_eq!(page_index_pages_matched, 0); + + // matches both pages + let filter = col("a").gt_eq(lit(2)).and(col("b").lt_eq(lit(13))); + let rt = RoundTrip::new() + .with_predicate(filter) + .with_page_index_predicate() + .round_trip(vec![batch.clone()]) + .await; + let metrics = rt.parquet_exec.metrics().unwrap(); + + let (page_index_rows_pruned, page_index_rows_matched) = + get_pruning_metric(&metrics, "page_index_rows_pruned"); + assert_eq!(page_index_rows_pruned, 0); + assert_eq!(page_index_rows_matched, 4); + let (page_index_pages_pruned, page_index_pages_matched) = + get_pruning_metric(&metrics, "page_index_pages_pruned"); + assert_eq!(page_index_pages_pruned, 0); + assert_eq!(page_index_pages_matched, 2); + } + /// Returns a string array with contents: /// "[Foo, null, bar, bar, bar, bar, zzz]" fn string_batch() -> RecordBatch { diff --git a/datafusion/datasource-parquet/src/page_filter.rs b/datafusion/datasource-parquet/src/page_filter.rs index baef36ce147d4..419cf4f42867d 100644 --- a/datafusion/datasource-parquet/src/page_filter.rs +++ b/datafusion/datasource-parquet/src/page_filter.rs @@ -199,6 +199,10 @@ impl PagePruningAccessPlanFilter { for row_group_index in row_group_indexes { // The selection for this particular row group + let mut overall_selection = None; + let mut total_pages_in_group = 0; + // stores the indexes of the matched pages + let mut matched_pages_in_group: Option<HashSet<usize>> = None; + for predicate in page_index_predicates { let column = 
predicate .required_columns() @@ -230,12 +234,10 @@ impl PagePruningAccessPlanFilter { file_metrics, ); - let Some((selection, total_pages, matched_pages)) = selection else { + let Some((selection, pages)) = selection else { trace!("No pages pruned in prune_pages_in_one_row_group"); continue; }; - total_pages_select += matched_pages; - total_pages_skip += total_pages - matched_pages; debug!( "Use filter and page index to create RowSelection {:?} from predicate: {:?}", @@ -243,6 +245,20 @@ impl PagePruningAccessPlanFilter { predicate.predicate_expr(), ); + total_pages_in_group = pages.len(); + let matched_pages_indexes: HashSet<_> = pages + .into_iter() + .enumerate() + .filter(|x| x.1) + .map(|x| x.0) + .collect(); + if let Some(ref mut m) = matched_pages_in_group { + // only keep pages that also matched in the previous predicate(s) + m.retain(|x| matched_pages_indexes.contains(x)); + } else { + matched_pages_in_group = Some(matched_pages_indexes); + } + overall_selection = update_selection(overall_selection, selection); // if the overall selection has ruled out all rows, no need to @@ -278,6 +294,10 @@ impl PagePruningAccessPlanFilter { ); } } + + let pages_matched = matched_pages_in_group.map_or(0, |m| m.len()); + total_pages_select += pages_matched; + total_pages_skip += total_pages_in_group - pages_matched; } file_metrics.page_index_rows_pruned.add_pruned(total_skip); @@ -309,8 +329,8 @@ fn update_selection( } } -/// Returns a [`RowSelection`] for the rows in this row group to scan, in addition to the number of -/// total and matched pages. +/// Returns a [`RowSelection`] for the rows in this row group to scan, in addition to a vec of +/// booleans that state if each page was matched (true) or not (false). /// /// This Row Selection is formed from the page index and the predicate skips row /// ranges that can be ruled out based on the predicate. 
@@ -323,7 +343,7 @@ fn prune_pages_in_one_row_group( converter: StatisticsConverter<'_>, parquet_metadata: &ParquetMetaData, metrics: &ParquetFileMetrics, -) -> Option<(RowSelection, usize, usize)> { +) -> Option<(RowSelection, Vec<bool>)> { let pruning_stats = PagesPruningStatistics::try_new(row_group_index, converter, parquet_metadata)?; @@ -376,10 +396,7 @@ fn prune_pages_in_one_row_group( }; vec.push(selector); - let total_pages = values.len(); - let matched_pages = values.iter().filter(|v| **v).count(); - - Some((RowSelection::from(vec), total_pages, matched_pages)) + Some((RowSelection::from(vec), values)) } /// Implement [`PruningStatistics`] for one column's PageIndex (column_index + offset_index) diff --git a/datafusion/sqllogictest/test_files/explain_analyze.slt b/datafusion/sqllogictest/test_files/explain_analyze.slt index b272a8cd9c57e..1ccb9c92a7be2 100644 --- a/datafusion/sqllogictest/test_files/explain_analyze.slt +++ b/datafusion/sqllogictest/test_files/explain_analyze.slt @@ -247,7 +247,7 @@ explain analyze select * from cat_tracking where species > 'M' AND s >= 50 order ---- Plan with Metrics 01)SortExec: TopK(fetch=3), expr=[species@0 ASC NULLS LAST], preserve_partitioning=[false], filter=[species@0 < Nlpine Sheep], metrics=[output_rows=3] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/explain_analyze/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[output_rows=3, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=6 total → 6 
matched, limit_pruned_row_groups=0 total → 0 matched, scan_efficiency_ratio=22.13% (521/2.35 K)] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/explain_analyze/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[output_rows=3, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=3 total → 3 matched, limit_pruned_row_groups=0 total → 0 matched, scan_efficiency_ratio=22.13% (521/2.35 K)] statement ok reset datafusion.explain.analyze_categories; @@ -262,7 +262,7 @@ explain analyze select * from cat_tracking where species > 'M' AND s >= 50 order ---- Plan with Metrics 01)SortExec: TopK(fetch=3), expr=[species@0 ASC NULLS LAST], preserve_partitioning=[false], filter=[species@0 < Nlpine Sheep], metrics=[output_rows=3, output_bytes=] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/explain_analyze/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[output_rows=3, output_bytes=, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=6 
total → 6 matched, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=, scan_efficiency_ratio=] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/explain_analyze/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[output_rows=3, output_bytes=, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=3 total → 3 matched, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=, scan_efficiency_ratio=] statement ok reset datafusion.explain.analyze_categories; @@ -277,7 +277,7 @@ explain analyze select * from cat_tracking where species > 'M' AND s >= 50 order ---- Plan with Metrics 01)SortExec: TopK(fetch=3), expr=[species@0 ASC NULLS LAST], preserve_partitioning=[false], filter=[species@0 < Nlpine Sheep], metrics=[output_rows=3, output_bytes=] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/explain_analyze/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[output_rows=3, output_bytes=, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, 
page_index_pages_pruned=6 total → 6 matched, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=, scan_efficiency_ratio=] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/explain_analyze/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[output_rows=3, output_bytes=, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=3 total → 3 matched, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=, scan_efficiency_ratio=] statement ok reset datafusion.explain.analyze_categories; diff --git a/datafusion/sqllogictest/test_files/limit_pruning.slt b/datafusion/sqllogictest/test_files/limit_pruning.slt index 34acb98f60033..b793425a6e500 100644 --- a/datafusion/sqllogictest/test_files/limit_pruning.slt +++ b/datafusion/sqllogictest/test_files/limit_pruning.slt @@ -63,7 +63,7 @@ set datafusion.explain.analyze_level = summary; query TT explain analyze select * from tracking_data where species > 'M' AND s >= 50 limit 3; ---- -Plan with Metrics DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit_pruning/data.parquet]]}, projection=[species, s], limit=3, file_type=parquet, predicate=species@0 > M AND s@1 >= 50, pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50, required_guarantees=[], metrics=[output_rows=3, elapsed_compute=, output_bytes=, files_ranges_pruned_statistics=1 total → 1 matched, 
row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=2 total → 2 matched, limit_pruned_row_groups=2 total → 0 matched, bytes_scanned=, metadata_load_time=, scan_efficiency_ratio= (171/2.35 K)] +Plan with Metrics DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit_pruning/data.parquet]]}, projection=[species, s], limit=3, file_type=parquet, predicate=species@0 > M AND s@1 >= 50, pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50, required_guarantees=[], metrics=[output_rows=3, elapsed_compute=, output_bytes=, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=1 total → 1 matched, limit_pruned_row_groups=2 total → 0 matched, bytes_scanned=, metadata_load_time=, scan_efficiency_ratio= (171/2.35 K)] # limit_pruned_row_groups=0 total → 0 matched # because of order by, scan needs to preserve sort, so limit pruning is disabled @@ -72,7 +72,7 @@ explain analyze select * from tracking_data where species > 'M' AND s >= 50 orde ---- Plan with Metrics 01)SortExec: TopK(fetch=3), expr=[species@0 ASC NULLS LAST], preserve_partitioning=[false], filter=[species@0 < Nlpine Sheep], metrics=[output_rows=3, elapsed_compute=, output_bytes=] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit_pruning/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], 
metrics=[output_rows=3, elapsed_compute=, output_bytes=, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=6 total → 6 matched, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=, metadata_load_time=, scan_efficiency_ratio= (521/2.35 K)] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit_pruning/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[output_rows=3, elapsed_compute=, output_bytes=, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=3 total → 3 matched, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=, metadata_load_time=, scan_efficiency_ratio= (521/2.35 K)] statement ok drop table tracking_data;