Merged
6 changes: 3 additions & 3 deletions benchmarks/README.md
@@ -879,13 +879,13 @@ The benchmark includes queries that:

The sorted dataset is automatically generated from the ClickBench partitioned dataset. You can configure the memory used during the sorting process with the `DATAFUSION_MEMORY_GB` environment variable. The default memory limit is 12GB.
```bash
-./bench.sh data data_sorted_clickbench
+./bench.sh data clickbench_sorted
```

> **Contributor Author:** threw this in

> **Contributor:** I’m assuming the new version is correct 👍

To create the sorted dataset, for example with 16GB of memory, run:

```bash
-DATAFUSION_MEMORY_GB=16 ./bench.sh data data_sorted_clickbench
+DATAFUSION_MEMORY_GB=16 ./bench.sh data clickbench_sorted
```

This command will:
@@ -896,7 +896,7 @@ This command will:
#### Running the Benchmark

```bash
-./bench.sh run data_sorted_clickbench
+./bench.sh run clickbench_sorted
```

This runs queries against the pre-sorted dataset with the `--sorted-by EventTime` flag, which informs DataFusion that the data is pre-sorted, allowing it to optimize away redundant sort operations.
34 changes: 4 additions & 30 deletions datafusion/datasource-parquet/src/row_filter.rs
@@ -50,21 +50,15 @@
//! 2. Determine whether each predicate can be evaluated as an `ArrowPredicate`.
//! 3. Determine, for each predicate, the total compressed size of all
//! columns required to evaluate the predicate.
-//! 4. Determine, for each predicate, whether all columns required to
-//!    evaluate the expression are sorted.
-//! 5. Re-order the predicate by total size (from step 3).
-//! 6. Partition the predicates according to whether they are sorted (from step 4)
-//! 7. "Compile" each predicate `Expr` to a `DatafusionArrowPredicate`.
-//! 8. Build the `RowFilter` with the sorted predicates followed by
-//!    the unsorted predicates. Within each partition, predicates are
-//!    still be sorted by size.
+//! 4. Re-order predicates by total size (from step 3).
+//! 5. "Compile" each predicate `Expr` to a `DatafusionArrowPredicate`.
+//! 6. Build the `RowFilter` from the ordered predicates.
//!
//! List-aware predicates (for example, `array_has`, `array_has_all`, and
//! `array_has_any`) can be evaluated directly during Parquet decoding. Struct
//! columns and other nested projections that are not explicitly supported will
//! continue to be evaluated after the batches are materialized.

-use std::cmp::Ordering;
use std::collections::BTreeSet;
use std::sync::Arc;

@@ -186,8 +180,6 @@ pub(crate) struct FilterCandidate {
/// the filter and to order the filters when `reorder_predicates` is true.
/// This is generated by summing the compressed size of all columns that the filter references.
required_bytes: usize,
-/// Can this filter use an index (e.g. a page index) to prune rows?
-can_use_index: bool,
/// Column indices into the parquet file schema required to evaluate this filter.
projection: LeafProjection,
/// The Arrow schema containing only the columns required by this filter,
@@ -251,12 +243,9 @@ impl FilterCandidateBuilder {
let projected_schema = Arc::new(self.file_schema.project(&root_indices)?);

let required_bytes = size_of_columns(&leaf_indices, metadata)?;
-let can_use_index = columns_sorted(&leaf_indices, metadata)?;

Ok(Some(FilterCandidate {
expr: self.expr,
required_bytes,
-can_use_index,
projection: LeafProjection { leaf_indices },
filter_schema: projected_schema,
}))
@@ -547,16 +536,6 @@ fn size_of_columns(columns: &[usize], metadata: &ParquetMetaData) -> Result<usiz
Ok(total_size)
}

-/// For a given set of `Column`s required for predicate `Expr` determine whether
-/// all columns are sorted.
-///
-/// Sorted columns may be queried more efficiently in the presence of
-/// a PageIndex.
-fn columns_sorted(_columns: &[usize], _metadata: &ParquetMetaData) -> Result<bool> {
-    // TODO How do we know this?
-    Ok(false)
-}

/// Build a [`RowFilter`] from the given predicate expression if possible.
///
/// # Arguments
@@ -611,12 +590,7 @@ pub fn build_row_filter(
}

if reorder_predicates {
-candidates.sort_unstable_by(|c1, c2| {
-    match c1.can_use_index.cmp(&c2.can_use_index) {
-        Ordering::Equal => c1.required_bytes.cmp(&c2.required_bytes),
-        ord => ord,
-    }
-});
+candidates.sort_unstable_by_key(|c| c.required_bytes);
}
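
The effect of the simplified reordering above can be sketched standalone: sort candidates by the total compressed size of the columns they reference, so the cheapest predicate is evaluated first. The `Candidate` struct and predicate names below are illustrative only, not DataFusion's actual types:

```rust
// Illustrative sketch (hypothetical types, not DataFusion's): reorder filter
// candidates so the predicate that reads the fewest compressed bytes runs first.
struct Candidate {
    name: &'static str,
    required_bytes: usize, // total compressed size of the columns it references
}

// Mirrors `candidates.sort_unstable_by_key(|c| c.required_bytes)` above.
fn order_by_size(mut candidates: Vec<Candidate>) -> Vec<&'static str> {
    candidates.sort_unstable_by_key(|c| c.required_bytes);
    candidates.into_iter().map(|c| c.name).collect()
}

fn main() {
    let ordered = order_by_size(vec![
        Candidate { name: "url_pred", required_bytes: 4_096 },
        Candidate { name: "id_pred", required_bytes: 128 },
        Candidate { name: "title_pred", required_bytes: 1_024 },
    ]);
    // Cheapest predicate first: it can prune rows before the more
    // expensive columns are decoded at all.
    println!("{ordered:?}");
}
```

Evaluating cheap predicates first means rows they filter out never pay the decode cost of the wider columns referenced by later predicates.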

// To avoid double-counting metrics when multiple predicates are used: