diff --git a/datafusion/datasource-parquet/src/mod.rs b/datafusion/datasource-parquet/src/mod.rs index 0e137a706fad7..4d46f84aa8167 100644 --- a/datafusion/datasource-parquet/src/mod.rs +++ b/datafusion/datasource-parquet/src/mod.rs @@ -37,6 +37,7 @@ mod writer; pub use access_plan::{ParquetAccessPlan, RowGroupAccess}; pub use file_format::*; pub use metrics::ParquetFileMetrics; +pub use opener::ParquetMorselizer; pub use page_filter::PagePruningAccessPlanFilter; pub use reader::*; // Expose so downstream crates can use it pub use row_filter::build_row_filter; diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index bb330c3f4caa1..148e45b99adbf 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! [`ParquetOpener`] for opening Parquet files +//! [`ParquetMorselizer`] for morselizing Parquet files use crate::page_filter::PagePruningAccessPlanFilter; use crate::row_group_filter::RowGroupAccessPlanFilter; @@ -25,12 +25,16 @@ use crate::{ }; use arrow::array::{RecordBatch, RecordBatchOptions}; use arrow::datatypes::{DataType, Schema}; -use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; +use datafusion_datasource::morsel::{Morsel, MorselPlan, MorselPlanner, Morselizer}; use datafusion_physical_expr::projection::{ProjectionExprs, Projector}; use datafusion_physical_expr::utils::reassign_expr_columns; use datafusion_physical_expr_adapter::replace_columns_with_literals; use parquet::errors::ParquetError; use std::collections::HashMap; +use std::fmt::Debug; +use std::future::Future; +use std::mem; +use std::ops::Deref; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -56,7 +60,7 @@ use datafusion_pruning::{FilePruner, PruningPredicate, build_pruning_predicate}; use datafusion_common::config::EncryptionFactoryOptions; #[cfg(feature = "parquet_encryption")] use datafusion_execution::parquet_encryption::EncryptionFactory; -use futures::{Stream, StreamExt, ready}; +use futures::{FutureExt, Stream, StreamExt, ready, stream::BoxStream}; use log::debug; use parquet::DecodeResult; use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics; @@ -67,9 +71,28 @@ use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::push_decoder::{ParquetPushDecoder, ParquetPushDecoderBuilder}; use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask}; use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader}; +use tokio::sync::oneshot; -/// Implements [`FileOpener`] for a parquet file -pub(super) struct ParquetOpener { +/// Implements [`Morselizer`] for a parquet file. +/// +/// The current implementation preserves parity with the existing opener path: +/// +/// 1. `morselize` creates a single planner for the input file +/// 2. the planner's first `plan` call returns an I/O future +/// 3. that future runs the copied parquet open/setup flow: +/// file pruning, metadata loading, optional page-index / bloom-filter work, +/// row-group pruning, decoder construction, and final stream setup +/// 4. the next `plan` call emits a single ready morsel wrapping that prepared stream +/// +/// This keeps the behavioral parity of `opener.rs` while routing execution +/// through the new `Morselizer` / `MorselPlanner` API. 
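+///
+/// # Example
+///
+/// Illustrative sketch of driving a single planner to completion, mirroring
+/// the `open_via_morselizer` test helper in this module (error handling and
+/// the empty-morsel case are elided; not compiled as a doctest):
+///
+/// ```ignore
+/// let mut planners = morselizer.morselize(partitioned_file)?;
+/// let mut planner = planners.remove(0);
+/// let stream = loop {
+///     match planner.plan()? {
+///         Some(mut plan) => {
+///             // Run any scheduled I/O before asking for the next plan
+///             if let Some(io) = plan.take_io_future() {
+///                 io.await?;
+///                 continue;
+///             }
+///             // A non-empty plan carries the single ready morsel
+///             if let Some(morsel) = plan.take_morsels().pop() {
+///                 break morsel.into_stream();
+///             }
+///         }
+///         // `None`: planner is finished (or still waiting on outstanding I/O)
+///         None => break futures::stream::empty().boxed(),
+///     }
+/// };
+/// ```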
+#[derive(Clone)] +pub struct ParquetMorselizer { + state: Arc, +} + +/// State needed to plan Parquet morsels +pub struct ParquetMorselizerState { /// Execution partition index pub(crate) partition_index: usize, /// Projection to apply on top of the table schema (i.e. can reference partition columns). @@ -91,8 +114,8 @@ pub(super) struct ParquetOpener { pub metrics: ExecutionPlanMetricsSet, /// Factory for instantiating parquet reader pub parquet_file_reader_factory: Arc, - /// Should the filters be evaluated during the parquet scan using - /// [`DataFusionArrowPredicate`](row_filter::DatafusionArrowPredicate)? + /// Should the filters be evaluated during the parquet scan using the + /// parquet row-filter predicate machinery? pub pushdown_filters: bool, /// Should the filters be reordered to optimize the scan? pub reorder_filters: bool, @@ -108,15 +131,10 @@ pub(super) struct ParquetOpener { pub enable_row_group_stats_pruning: bool, /// Coerce INT96 timestamps to specific TimeUnit pub coerce_int96: Option, - /// Optional parquet FileDecryptionProperties - #[cfg(feature = "parquet_encryption")] - pub file_decryption_properties: Option>, /// Rewrite expressions in the context of the file schema pub(crate) expr_adapter_factory: Arc, - /// Optional factory to create file decryption properties dynamically - #[cfg(feature = "parquet_encryption")] - pub encryption_factory: - Option<(Arc, EncryptionFactoryOptions)>, + /// Encryption configuration used to resolve per-file decryption properties. + pub(crate) encryption_context: EncryptionContext, /// Maximum size of the predicate cache, in bytes. If none, uses /// the arrow-rs default. pub max_predicate_cache_size: Option, @@ -124,15 +142,87 @@ pub(super) struct ParquetOpener { pub reverse_row_groups: bool, } -impl FileOpener for ParquetOpener { - fn open(&self, partitioned_file: PartitionedFile) -> Result { +impl ParquetMorselizer { + pub(crate) fn new(state: ParquetMorselizerState) -> Self { + Self { + state: Arc::new(state), + } + } +} + +impl Deref for ParquetMorselizer { + type Target = ParquetMorselizerState; + + fn deref(&self) -> &Self::Target { + &self.state + } +} + +impl Debug for ParquetMorselizer { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ParquetMorselizer") + .field("partition_index", &self.partition_index) + .field("batch_size", &self.batch_size) + .field("limit", &self.limit) + .field("preserve_order", &self.preserve_order) + .field("metadata_size_hint", &self.metadata_size_hint) + .field("pushdown_filters", &self.pushdown_filters) + .field("reorder_filters", &self.reorder_filters) + .field("force_filter_selections", &self.force_filter_selections) + .field("enable_page_index", &self.enable_page_index) + .field("enable_bloom_filter", &self.enable_bloom_filter) + .field( + "enable_row_group_stats_pruning", + &self.enable_row_group_stats_pruning, + ) + .field("coerce_int96", &self.coerce_int96) + .field("max_predicate_cache_size", &self.max_predicate_cache_size) + .field("reverse_row_groups", &self.reverse_row_groups) + .finish() + } +} + +/// Result of preparing a PartitionedFile using CPU before any I/O. +/// +/// This captures the state computed from `PartitionedFile`, the table schema, +/// and scan configuration so that later planner states only need to perform +/// async work such as metadata loading and stream construction. 
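+///
+/// Produced by [`ParquetMorselizerState::prepare_open_file`] and carried
+/// through the planner states until parquet metadata has been loaded
+/// (see [`MetadataLoadedParquetOpen`]).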
+struct PreparedParquetOpen { + state: Arc, + partitioned_file: PartitionedFile, + file_range: Option, + extensions: Option>, + file_metrics: ParquetFileMetrics, + file_pruner: Option, + metadata_size_hint: Option, + async_file_reader: Box, + logical_file_schema: SchemaRef, + output_schema: Arc, + projection: ProjectionExprs, + predicate: Option>, + #[cfg(feature = "parquet_encryption")] + file_decryption_properties: Option>, +} + +/// Result of loading parquet metadata after file-level pruning has completed. +struct MetadataLoadedParquetOpen { + prepared: PreparedParquetOpen, + reader_metadata: ArrowReaderMetadata, + options: ArrowReaderOptions, +} + +impl ParquetMorselizerState { + /// Perform the CPU-only setup for opening a parquet file. + fn prepare_open_file( + self: &Arc, + partitioned_file: PartitionedFile, + ) -> Result { // ----------------------------------- // Step: prepare configurations, etc. // ----------------------------------- let file_range = partitioned_file.range.clone(); let extensions = partitioned_file.extensions.clone(); - let file_location = partitioned_file.object_meta.location.clone(); - let file_name = file_location.to_string(); + let file_name = partitioned_file.object_meta.location.to_string(); let file_metrics = ParquetFileMetrics::new(self.partition_index, &file_name, &self.metrics); @@ -140,7 +230,7 @@ impl FileOpener for ParquetOpener { .metadata_size_hint .or(self.metadata_size_hint); - let mut async_file_reader: Box = + let async_file_reader: Box = self.parquet_file_reader_factory.create_reader( self.partition_index, partitioned_file.clone(), @@ -148,8 +238,6 @@ impl FileOpener for ParquetOpener { &self.metrics, )?; - let batch_size = self.batch_size; - // Calculate the output schema from the original projection (before literal replacement) // so we get correct field names from column references let logical_file_schema = Arc::clone(self.table_schema.file_schema()); @@ -204,430 +292,737 @@ impl FileOpener for ParquetOpener { .transpose()?; } - let reorder_predicates = self.reorder_filters; - let pushdown_filters = self.pushdown_filters; - let force_filter_selections = self.force_filter_selections; - let coerce_int96 = self.coerce_int96; - let enable_bloom_filter = self.enable_bloom_filter; - let enable_row_group_stats_pruning = self.enable_row_group_stats_pruning; - let limit = self.limit; - let parquet_file_reader_factory = Arc::clone(&self.parquet_file_reader_factory); - let partition_index = self.partition_index; - let metrics = self.metrics.clone(); - let predicate_creation_errors = MetricBuilder::new(&self.metrics) .global_counter("num_predicate_creation_errors"); - let expr_adapter_factory = Arc::clone(&self.expr_adapter_factory); + let file_pruner = predicate + .as_ref() + .filter(|p| is_dynamic_physical_expr(p) || partitioned_file.has_statistics()) + .and_then(|p| { + FilePruner::try_new( + Arc::clone(p), + &logical_file_schema, + &partitioned_file, + predicate_creation_errors.clone(), + ) + }); + + Ok(PreparedParquetOpen { + state: Arc::clone(self), + partitioned_file, + file_range, + extensions, + file_metrics, + file_pruner, + metadata_size_hint, + async_file_reader, + logical_file_schema, + output_schema, + projection, + predicate, + #[cfg(feature = "parquet_encryption")] + file_decryption_properties: None, + }) + } - let enable_page_index = self.enable_page_index; - #[cfg(feature = "parquet_encryption")] + /// Resolve file-specific decryption properties before metadata I/O. 
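+    /// Only scheduled when `EncryptionContext::needs_file_decryption_properties`
+    /// reports that decryption properties or an encryption factory are configured;
+    /// otherwise the planner moves straight from `Start` to `PruneFiles`.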
+ #[cfg(feature = "parquet_encryption")] + async fn load_file_decryption_properties( + self: &ParquetMorselizerState, + file_location: object_store::path::Path, + ) -> Result>> { let encryption_context = self.get_encryption_context(); - let max_predicate_cache_size = self.max_predicate_cache_size; + encryption_context + .get_file_decryption_properties(&file_location) + .await + } - let reverse_row_groups = self.reverse_row_groups; - let preserve_order = self.preserve_order; + /// Resolve file-specific decryption properties before metadata I/O. + #[cfg(not(feature = "parquet_encryption"))] + #[expect(dead_code)] + async fn load_file_decryption_properties( + self: &ParquetMorselizerState, + _file_location: object_store::path::Path, + ) -> Result>> { + Ok(None) + } - Ok(Box::pin(async move { - #[cfg(feature = "parquet_encryption")] - let file_decryption_properties = encryption_context - .get_file_decryption_properties(&file_location) - .await?; - - // --------------------------------------------- - // Step: try to prune the current file partition - // --------------------------------------------- - - // Prune this file using the file level statistics and partition values. - // Since dynamic filters may have been updated since planning it is possible that we are able - // to prune files now that we couldn't prune at planning time. - // It is assumed that there is no point in doing pruning here if the predicate is not dynamic, - // as it would have been done at planning time. - // We'll also check this after every record batch we read, - // and if at some point we are able to prove we can prune the file using just the file level statistics - // we can end the stream early. - let mut file_pruner = predicate - .as_ref() - .filter(|p| { - // Make a FilePruner only if there is either - // 1. a dynamic expr in the predicate - // 2. the file has file-level statistics. - // - // File-level statistics may prune the file without loading - // any row groups or metadata. - // - // Dynamic filters may prune the file after initial - // planning, as the dynamic filter is updated during - // execution. - // - // The case where there is a dynamic filter but no - // statistics corresponds to a dynamic filter that - // references partition columns. While rare, this is possible - // e.g. `select * from table order by partition_col limit - // 10` could hit this condition. - is_dynamic_physical_expr(p) || partitioned_file.has_statistics() - }) - .and_then(|p| { - FilePruner::try_new( - Arc::clone(p), - &logical_file_schema, - &partitioned_file, - predicate_creation_errors.clone(), - ) - }); - - if let Some(file_pruner) = &mut file_pruner - && file_pruner.should_prune()? - { - // Return an empty stream immediately to skip the work of setting up the actual stream - file_metrics.files_ranges_pruned_statistics.add_pruned(1); - return Ok(futures::stream::empty().boxed()); - } + /// CPU-only file pruning performed before metadata I/O begins. + /// + /// Returns `None` if the file was completely pruned + fn prune_prepared_file( + mut prepared: PreparedParquetOpen, + ) -> Result> { + if let Some(file_pruner) = &mut prepared.file_pruner + && file_pruner.should_prune()? 
+ { + prepared + .file_metrics + .files_ranges_pruned_statistics + .add_pruned(1); + return Ok(None); + } - file_metrics.files_ranges_pruned_statistics.add_matched(1); + prepared + .file_metrics + .files_ranges_pruned_statistics + .add_matched(1); + Ok(Some(prepared)) + } - // -------------------------------------------------------- - // Step: fetch Parquet metadata (and optionally page index) - // -------------------------------------------------------- + /// Fetch parquet metadata once file-level pruning is complete. + async fn load_prepared_metadata( + mut prepared: PreparedParquetOpen, + ) -> Result { + let options = + ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Skip); + #[cfg(feature = "parquet_encryption")] + let mut options = options; + #[cfg(feature = "parquet_encryption")] + if let Some(fd_val) = &prepared.file_decryption_properties { + options = options.with_file_decryption_properties(Arc::clone(fd_val)); + } + let reader_metadata = { + let mut metadata_timer = prepared.file_metrics.metadata_load_time.timer(); + let reader_metadata = ArrowReaderMetadata::load_async( + &mut prepared.async_file_reader, + options.clone(), + ) + .await?; + metadata_timer.stop(); + reader_metadata + }; + Ok(MetadataLoadedParquetOpen { + prepared, + reader_metadata, + options, + }) + } - // Don't load the page index yet. Since it is not stored inline in - // the footer, loading the page index if it is not needed will do - // unnecessary I/O. We decide later if it is needed to evaluate the - // pruning predicates. Thus default to not requesting it from the - // underlying reader. - let mut options = - ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Skip); + /// Continue opening once parquet metadata has been loaded. + async fn execute_metadata_loaded_open( + loaded: MetadataLoadedParquetOpen, + ) -> Result>> { + let MetadataLoadedParquetOpen { + prepared, + mut reader_metadata, + mut options, + } = loaded; + let PreparedParquetOpen { + state, + partitioned_file, + file_range, + extensions, + file_metrics, + file_pruner, + metadata_size_hint, + mut async_file_reader, + logical_file_schema, + output_schema, + mut projection, + mut predicate, #[cfg(feature = "parquet_encryption")] - if let Some(fd_val) = file_decryption_properties { - options = options.with_file_decryption_properties(Arc::clone(&fd_val)); - } - let mut metadata_timer = file_metrics.metadata_load_time.timer(); - - // Begin by loading the metadata from the underlying reader (note - // the returned metadata may actually include page indexes as some - // readers may return page indexes even when not requested -- for - // example when they are cached) - let mut reader_metadata = - ArrowReaderMetadata::load_async(&mut async_file_reader, options.clone()) - .await?; - - // Note about schemas: we are actually dealing with **3 different schemas** here: - // - The table schema as defined by the TableProvider. - // This is what the user sees, what they get when they `SELECT * FROM table`, etc. - // - The logical file schema: this is the table schema minus any hive partition columns and projections. - // This is what the physical file schema is coerced to. - // - The physical file schema: this is the schema that the arrow-rs - // parquet reader will actually produce. 
- let mut physical_file_schema = Arc::clone(reader_metadata.schema()); - - // The schema loaded from the file may not be the same as the - // desired schema (for example if we want to instruct the parquet - // reader to read strings using Utf8View instead). Update if necessary - if let Some(merged) = apply_file_schema_type_coercions( - &logical_file_schema, - &physical_file_schema, - ) { - physical_file_schema = Arc::new(merged); - options = options.with_schema(Arc::clone(&physical_file_schema)); - reader_metadata = ArrowReaderMetadata::try_new( - Arc::clone(reader_metadata.metadata()), - options.clone(), - )?; - } - - if let Some(ref coerce) = coerce_int96 - && let Some(merged) = coerce_int96_to_resolution( - reader_metadata.parquet_schema(), - &physical_file_schema, - coerce, - ) - { - physical_file_schema = Arc::new(merged); - options = options.with_schema(Arc::clone(&physical_file_schema)); - reader_metadata = ArrowReaderMetadata::try_new( - Arc::clone(reader_metadata.metadata()), - options.clone(), - )?; - } - - // Adapt the projection & filter predicate to the physical file schema. - // This evaluates missing columns and inserts any necessary casts. - // After rewriting to the file schema, further simplifications may be possible. - // For example, if `'a' = col_that_is_missing` becomes `'a' = NULL` that can then be simplified to `FALSE` - // and we can avoid doing any more work on the file (bloom filters, loading the page index, etc.). - // Additionally, if any casts were inserted we can move casts from the column to the literal side: - // `CAST(col AS INT) = 5` can become `col = CAST(5 AS )`, which can be evaluated statically. - let rewriter = expr_adapter_factory.create( - Arc::clone(&logical_file_schema), - Arc::clone(&physical_file_schema), + file_decryption_properties: _, + } = prepared; + + let file_name = partitioned_file.object_meta.location.to_string(); + let batch_size = state.batch_size; + let reorder_predicates = state.reorder_filters; + let pushdown_filters = state.pushdown_filters; + let force_filter_selections = state.force_filter_selections; + let coerce_int96 = state.coerce_int96; + let enable_bloom_filter = state.enable_bloom_filter; + let enable_row_group_stats_pruning = state.enable_row_group_stats_pruning; + let limit = state.limit; + let parquet_file_reader_factory = Arc::clone(&state.parquet_file_reader_factory); + let partition_index = state.partition_index; + let metrics = state.metrics.clone(); + let predicate_creation_errors = MetricBuilder::new(&state.metrics) + .global_counter("num_predicate_creation_errors"); + let expr_adapter_factory = Arc::clone(&state.expr_adapter_factory); + let enable_page_index = state.enable_page_index; + let max_predicate_cache_size = state.max_predicate_cache_size; + let reverse_row_groups = state.reverse_row_groups; + let preserve_order = state.preserve_order; + + // Note about schemas: we are actually dealing with **3 different schemas** here: + // - The table schema as defined by the TableProvider. + // This is what the user sees, what they get when they `SELECT * FROM table`, etc. + // - The logical file schema: this is the table schema minus any hive partition columns and projections. + // This is what the physical file schema is coerced to. + // - The physical file schema: this is the schema that the arrow-rs + // parquet reader will actually produce. 
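+        //
+        // For example (hypothetical schemas): the table schema might be
+        //   {a: Utf8View, part: Int32} with `part` a hive partition column,
+        // the logical file schema {a: Utf8View}, and the physical file schema
+        // {a: Utf8} if the file stores plain Utf8 strings rather than Utf8View.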
+ let mut physical_file_schema = Arc::clone(reader_metadata.schema()); + + // The schema loaded from the file may not be the same as the + // desired schema (for example if we want to instruct the parquet + // reader to read strings using Utf8View instead). Update if necessary + if let Some(merged) = + apply_file_schema_type_coercions(&logical_file_schema, &physical_file_schema) + { + physical_file_schema = Arc::new(merged); + options = options.with_schema(Arc::clone(&physical_file_schema)); + reader_metadata = ArrowReaderMetadata::try_new( + Arc::clone(reader_metadata.metadata()), + options.clone(), )?; - let simplifier = PhysicalExprSimplifier::new(&physical_file_schema); - predicate = predicate - .map(|p| simplifier.simplify(rewriter.rewrite(p)?)) - .transpose()?; - // Adapt projections to the physical file schema as well - projection = projection - .try_map_exprs(|p| simplifier.simplify(rewriter.rewrite(p)?))?; + } - // Build predicates for this specific file - let (pruning_predicate, page_pruning_predicate) = build_pruning_predicates( - predicate.as_ref(), + if let Some(ref coerce) = coerce_int96 + && let Some(merged) = coerce_int96_to_resolution( + reader_metadata.parquet_schema(), &physical_file_schema, - &predicate_creation_errors, - ); + coerce, + ) + { + physical_file_schema = Arc::new(merged); + options = options.with_schema(Arc::clone(&physical_file_schema)); + reader_metadata = ArrowReaderMetadata::try_new( + Arc::clone(reader_metadata.metadata()), + options.clone(), + )?; + } - // The page index is not stored inline in the parquet footer so the - // code above may not have read the page index structures yet. If we - // need them for reading and they aren't yet loaded, we need to load them now. - if should_enable_page_index(enable_page_index, &page_pruning_predicate) { - reader_metadata = load_page_index( - reader_metadata, - &mut async_file_reader, - // Since we're manually loading the page index the option here should not matter but we pass it in for consistency - options.with_page_index_policy(PageIndexPolicy::Optional), - ) - .await?; - } + // Adapt the projection & filter predicate to the physical file schema. + // This evaluates missing columns and inserts any necessary casts. + // After rewriting to the file schema, further simplifications may be possible. + // For example, if `'a' = col_that_is_missing` becomes `'a' = NULL` that can then be simplified to `FALSE` + // and we can avoid doing any more work on the file (bloom filters, loading the page index, etc.). + // Additionally, if any casts were inserted we can move casts from the column to the literal side: + // `CAST(col AS INT) = 5` can become `col = CAST(5 AS )`, which can be evaluated statically. + let rewriter = expr_adapter_factory.create( + Arc::clone(&logical_file_schema), + Arc::clone(&physical_file_schema), + )?; + let simplifier = PhysicalExprSimplifier::new(&physical_file_schema); + predicate = predicate + .map(|p| simplifier.simplify(rewriter.rewrite(p)?)) + .transpose()?; + // Adapt projections to the physical file schema as well + projection = + projection.try_map_exprs(|p| simplifier.simplify(rewriter.rewrite(p)?))?; + + // Build predicates for this specific file + let (pruning_predicate, page_pruning_predicate) = build_pruning_predicates( + predicate.as_ref(), + &physical_file_schema, + &predicate_creation_errors, + ); - metadata_timer.stop(); + // The page index is not stored inline in the parquet footer so the + // code above may not have read the page index structures yet. 
If we + // need them for reading and they aren't yet loaded, we need to load them now. + if should_enable_page_index(enable_page_index, &page_pruning_predicate) { + reader_metadata = load_page_index( + reader_metadata, + &mut async_file_reader, + // Since we're manually loading the page index the option here should not matter but we pass it in for consistency + options.with_page_index_policy(PageIndexPolicy::Optional), + ) + .await?; + } + + // ------------------------------------------------------------ + // Step: prune row groups by range, predicate and bloom filter + // ------------------------------------------------------------ + + // Determine which row groups to actually read. The idea is to skip + // as many row groups as possible based on the metadata and query + let file_metadata = Arc::clone(reader_metadata.metadata()); + let pruning_pred = pruning_predicate.as_ref().map(|p| p.as_ref()); + let rg_metadata = file_metadata.row_groups(); + // track which row groups to actually read + let access_plan = create_initial_plan(&file_name, extensions, rg_metadata.len())?; + let mut row_groups = RowGroupAccessPlanFilter::new(access_plan); + // if there is a range restricting what parts of the file to read + if let Some(range) = file_range.as_ref() { + row_groups.prune_by_range(rg_metadata, range); + } - // ------------------------------------------------------------ - // Step: prune row groups by range, predicate and bloom filter - // ------------------------------------------------------------ - - // Determine which row groups to actually read. The idea is to skip - // as many row groups as possible based on the metadata and query - let file_metadata = Arc::clone(reader_metadata.metadata()); - let pruning_pred = pruning_predicate.as_ref().map(|p| p.as_ref()); - let rg_metadata = file_metadata.row_groups(); - // track which row groups to actually read - let access_plan = - create_initial_plan(&file_name, extensions, rg_metadata.len())?; - let mut row_groups = RowGroupAccessPlanFilter::new(access_plan); - // if there is a range restricting what parts of the file to read - if let Some(range) = file_range.as_ref() { - row_groups.prune_by_range(rg_metadata, range); + // If there is a predicate that can be evaluated against the metadata + if let Some(pruning_pred) = pruning_pred.as_ref() { + if enable_row_group_stats_pruning { + row_groups.prune_by_statistics( + &physical_file_schema, + reader_metadata.parquet_schema(), + rg_metadata, + pruning_pred, + &file_metrics, + ); + } else { + // Update metrics: statistics unavailable, so all row groups are + // matched (not pruned) + file_metrics + .row_groups_pruned_statistics + .add_matched(row_groups.remaining_row_group_count()); } - // If there is a predicate that can be evaluated against the metadata - if let Some(pruning_pred) = pruning_pred.as_ref() { - if enable_row_group_stats_pruning { - row_groups.prune_by_statistics( + if enable_bloom_filter && !row_groups.is_empty() { + // Use the existing reader for bloom filter I/O; + // replace with a fresh reader for decoding below. 
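+                // `mem::replace` hands the current reader to the bloom filter
+                // builder and leaves a freshly created reader in
+                // `async_file_reader` for the decoding stream built below.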
+ let bf_reader = mem::replace( + &mut async_file_reader, + parquet_file_reader_factory.create_reader( + partition_index, + partitioned_file.clone(), + metadata_size_hint, + &metrics, + )?, + ); + let mut bf_builder = ParquetRecordBatchStreamBuilder::new_with_metadata( + bf_reader, + reader_metadata.clone(), + ); + row_groups + .prune_by_bloom_filters( &physical_file_schema, - reader_metadata.parquet_schema(), - rg_metadata, + &mut bf_builder, pruning_pred, &file_metrics, - ); - } else { - // Update metrics: statistics unavailable, so all row groups are - // matched (not pruned) - file_metrics - .row_groups_pruned_statistics - .add_matched(row_groups.remaining_row_group_count()); - } - - if enable_bloom_filter && !row_groups.is_empty() { - // Use the existing reader for bloom filter I/O; - // replace with a fresh reader for decoding below. - let bf_reader = std::mem::replace( - &mut async_file_reader, - parquet_file_reader_factory.create_reader( - partition_index, - partitioned_file.clone(), - metadata_size_hint, - &metrics, - )?, - ); - let mut bf_builder = - ParquetRecordBatchStreamBuilder::new_with_metadata( - bf_reader, - reader_metadata.clone(), - ); - row_groups - .prune_by_bloom_filters( - &physical_file_schema, - &mut bf_builder, - pruning_pred, - &file_metrics, - ) - .await; - } else { - // Update metrics: bloom filter unavailable, so all row groups are - // matched (not pruned) - file_metrics - .row_groups_pruned_bloom_filter - .add_matched(row_groups.remaining_row_group_count()); - } + ) + .await; } else { - // Update metrics: no predicate, so all row groups are matched (not pruned) - let n_remaining_row_groups = row_groups.remaining_row_group_count(); - file_metrics - .row_groups_pruned_statistics - .add_matched(n_remaining_row_groups); + // Update metrics: bloom filter unavailable, so all row groups are + // matched (not pruned) file_metrics .row_groups_pruned_bloom_filter - .add_matched(n_remaining_row_groups); + .add_matched(row_groups.remaining_row_group_count()); } + } else { + // Update metrics: no predicate, so all row groups are matched (not pruned) + let n_remaining_row_groups = row_groups.remaining_row_group_count(); + file_metrics + .row_groups_pruned_statistics + .add_matched(n_remaining_row_groups); + file_metrics + .row_groups_pruned_bloom_filter + .add_matched(n_remaining_row_groups); + } - // Prune by limit if limit is set and limit order is not sensitive - if let (Some(limit), false) = (limit, preserve_order) { - row_groups.prune_by_limit(limit, rg_metadata, &file_metrics); - } + // Prune by limit if limit is set and limit order is not sensitive + if let (Some(limit), false) = (limit, preserve_order) { + row_groups.prune_by_limit(limit, rg_metadata, &file_metrics); + } - // -------------------------------------------------------- - // Step: prune pages from the kept row groups - // - let mut access_plan = row_groups.build(); - // page index pruning: if all data on individual pages can - // be ruled using page metadata, rows from other columns - // with that range can be skipped as well - // -------------------------------------------------------- - if enable_page_index - && !access_plan.is_empty() - && let Some(p) = page_pruning_predicate - { - access_plan = p.prune_plan_with_page_index( - access_plan, - &physical_file_schema, - reader_metadata.parquet_schema(), - file_metadata.as_ref(), - &file_metrics, - ); - } + // -------------------------------------------------------- + // Step: prune pages from the kept row groups + // + let mut access_plan = 
row_groups.build(); + // page index pruning: if all data on individual pages can + // be ruled using page metadata, rows from other columns + // with that range can be skipped as well + // -------------------------------------------------------- + if enable_page_index + && !access_plan.is_empty() + && let Some(p) = page_pruning_predicate + { + access_plan = p.prune_plan_with_page_index( + access_plan, + &physical_file_schema, + reader_metadata.parquet_schema(), + file_metadata.as_ref(), + &file_metrics, + ); + } - // Prepare the access plan (extract row groups and row selection) - let mut prepared_plan = access_plan.prepare(rg_metadata)?; + // Prepare the access plan (extract row groups and row selection) + let mut prepared_plan = access_plan.prepare(rg_metadata)?; - // ---------------------------------------------------------- - // Step: potentially reverse the access plan for performance. - // See `ParquetSource::try_pushdown_sort` for the rationale. - // ---------------------------------------------------------- - if reverse_row_groups { - prepared_plan = prepared_plan.reverse(file_metadata.as_ref())?; - } + // ---------------------------------------------------------- + // Step: potentially reverse the access plan for performance. + // See `ParquetSource::try_pushdown_sort` for the rationale. + // ---------------------------------------------------------- + if reverse_row_groups { + prepared_plan = prepared_plan.reverse(file_metadata.as_ref())?; + } - if prepared_plan.row_group_indexes.is_empty() { - return Ok(futures::stream::empty().boxed()); - } + if prepared_plan.row_group_indexes.is_empty() { + return Ok(futures::stream::empty().boxed()); + } - // --------------------------------------------------------- - // Step: construct builder for the final RecordBatch stream - // --------------------------------------------------------- - - let mut builder = - ParquetPushDecoderBuilder::new_with_metadata(reader_metadata.clone()) - .with_batch_size(batch_size); - - // --------------------------------------------------------------------- - // Step: optionally add row filter to the builder - // - // Row filter is used for late materialization in parquet decoding, see - // `row_filter` for details. - // --------------------------------------------------------------------- - - // Filter pushdown: evaluate predicates during scan - if let Some(predicate) = - pushdown_filters.then_some(predicate.as_ref()).flatten() - { - let row_filter = row_filter::build_row_filter( - predicate, - &physical_file_schema, - file_metadata.as_ref(), - reorder_predicates, - &file_metrics, - ); + // --------------------------------------------------------- + // Step: construct builder for the final RecordBatch stream + // --------------------------------------------------------- + + let mut builder = + ParquetPushDecoderBuilder::new_with_metadata(reader_metadata.clone()) + .with_batch_size(batch_size); + + // --------------------------------------------------------------------- + // Step: optionally add row filter to the builder + // + // Row filter is used for late materialization in parquet decoding, see + // `row_filter` for details. 
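+        //
+        // For example, with a pushed-down predicate such as `a = 5`, the
+        // decoder first decodes only the columns the predicate references,
+        // evaluates it to build a row selection, and then decodes the
+        // remaining projected columns only for the selected rows.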
+ // --------------------------------------------------------------------- + + // Filter pushdown: evaluate predicates during scan + if let Some(predicate) = pushdown_filters.then_some(predicate.as_ref()).flatten() + { + let row_filter = row_filter::build_row_filter( + predicate, + &physical_file_schema, + file_metadata.as_ref(), + reorder_predicates, + &file_metrics, + ); + + match row_filter { + Ok(Some(filter)) => { + builder = builder.with_row_filter(filter); + } + Ok(None) => {} + Err(e) => { + debug!("Ignoring error building row filter for '{predicate:?}': {e}"); + } + }; + }; + if force_filter_selections { + builder = builder.with_row_selection_policy(RowSelectionPolicy::Selectors); + } - match row_filter { - Ok(Some(filter)) => { - builder = builder.with_row_filter(filter); + // Apply the prepared plan to the builder + if let Some(row_selection) = prepared_plan.row_selection { + builder = builder.with_row_selection(row_selection); + } + builder = builder.with_row_groups(prepared_plan.row_group_indexes); + + if let Some(limit) = limit { + builder = builder.with_limit(limit) + } + + if let Some(max_predicate_cache_size) = max_predicate_cache_size { + builder = builder.with_max_predicate_cache_size(max_predicate_cache_size); + } + + // metrics from the arrow reader itself + let arrow_reader_metrics = ArrowReaderMetrics::enabled(); + + let indices = projection.column_indices(); + let mask = + ProjectionMask::roots(reader_metadata.parquet_schema(), indices.clone()); + + let decoder = builder + .with_projection(mask) + .with_metrics(arrow_reader_metrics.clone()) + .build()?; + + let files_ranges_pruned_statistics = + file_metrics.files_ranges_pruned_statistics.clone(); + let predicate_cache_inner_records = + file_metrics.predicate_cache_inner_records.clone(); + let predicate_cache_records = file_metrics.predicate_cache_records.clone(); + + // Rebase column indices to match the narrowed stream schema. + // The projection expressions have indices based on physical_file_schema, + // but the stream only contains the columns selected by the ProjectionMask. + let stream_schema = Arc::new(physical_file_schema.project(&indices)?); + let replace_schema = stream_schema != output_schema; + let projection = projection + .try_map_exprs(|expr| reassign_expr_columns(expr, &stream_schema))?; + let projector = projection.make_projector(&stream_schema)?; + let stream = futures::stream::unfold( + PushDecoderStreamState { + decoder, + reader: async_file_reader, + projector, + output_schema, + replace_schema, + arrow_reader_metrics, + predicate_cache_inner_records, + predicate_cache_records, + }, + |mut state| async move { + let result = state.transition().await; + result.map(|r| (r, state)) + }, + ) + .fuse(); + + // ---------------------------------------------------------------------- + // Step: wrap the stream so a dynamic filter can stop the file scan early + // ---------------------------------------------------------------------- + if let Some(file_pruner) = file_pruner { + let boxed_stream = stream.boxed(); + Ok(EarlyStoppingStream::new( + boxed_stream, + file_pruner, + files_ranges_pruned_statistics, + ) + .boxed()) + } else { + Ok(stream.boxed()) + } + } +} + +impl Morselizer for ParquetMorselizer { + fn morselize(&self, file: PartitionedFile) -> Result>> { + Ok(vec![Box::new(ParquetMorselPlanner::new( + Arc::clone(&self.state), + file, + ))]) + } +} + +/// CPU-only states for [`ParquetMorselPlanner`]. 
+/// +/// These are the states when the MorselPlanner has more CPU work to do +enum ReadyState { + /// Planner has not started any work yet. + Start(Box), + /// Planner is ready to resolve any file-specific encryption properties. + #[cfg(feature = "parquet_encryption")] + PrepareEncryption(Box), + /// Planner can do file-level pruning before requesting parquet metadata. + PruneFiles(Box), + /// Planner has loaded parquet metadata and is ready for further planning. + Prepared(Box), + /// Planner has a fully prepared stream ready to wrap as a morsel. + EmitMorsel(BoxStream<'static, Result>), +} + +impl ReadyState { + fn start(file: PartitionedFile) -> Self { + Self::Start(Box::new(file)) + } + + #[cfg(feature = "parquet_encryption")] + fn prepare_encryption(prepared: PreparedParquetOpen) -> Self { + Self::PrepareEncryption(Box::new(prepared)) + } + + fn prune_files(prepared: PreparedParquetOpen) -> Self { + Self::PruneFiles(Box::new(prepared)) + } + + fn prepared(prepared: MetadataLoadedParquetOpen) -> Self { + Self::Prepared(prepared.into()) + } + + fn emit_morsel(stream: BoxStream<'static, Result>) -> Self { + Self::EmitMorsel(stream) + } +} + +/// Scheduler-visible state for [`ParquetMorselPlanner`]. +/// +/// This allows tracking outstanding IOs +enum ParquetMorselPlannerState { + /// Planner can make progress using CPU only. + Ready(Box), + /// Planner has outstanding async I/O and will become ready again when it completes. + WaitingIo(WaitingIoState), + /// Planner has emitted its morsel and has no further work. + Done, +} + +impl ParquetMorselPlannerState { + fn ready(ready_state: ReadyState) -> Self { + Self::Ready(Box::new(ready_state)) + } + + /// Return a planner state that emits an empty morsel stream. + /// + /// This is used when file-level pruning determines the file can be skipped + /// before any parquet metadata or row-group work is needed, while still + /// flowing through the normal morsel emission path in `FileStream`. + fn empty_file() -> Self { + Self::ready(ReadyState::emit_morsel(futures::stream::empty().boxed())) + } +} + +/// Result of an in-flight planner I/O phase. +struct WaitingIoState { + /// Waiting for an async step to produce the next CPU-ready planner state. + receiver: oneshot::Receiver>, +} + +impl ParquetMorselPlannerState { + fn name(&self) -> &'static str { + match self { + Self::Ready(ready_state) => match ready_state.as_ref() { + ReadyState::Start(_) => "Ready(Start)", + #[cfg(feature = "parquet_encryption")] + ReadyState::PrepareEncryption(_) => "Ready(PrepareEncryption)", + ReadyState::PruneFiles(_) => "Ready(PruneFiles)", + ReadyState::Prepared(_) => "Ready(Prepared)", + ReadyState::EmitMorsel(_) => "Ready(EmitMorsel)", + }, + Self::WaitingIo(_) => "WaitingIo", + Self::Done => "Done", + } + } +} + +/// Planner wrapper that exposes the copied opener logic through the generic +/// morsel-planning API. 
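+///
+/// State transitions (each async step passes through `WaitingIo` while its
+/// I/O future is outstanding):
+///
+/// ```text
+/// Start -> [PrepareEncryption] -> PruneFiles -> Prepared -> EmitMorsel -> Done
+/// ```
+///
+/// A file that is fully pruned jumps from `PruneFiles` directly to
+/// `EmitMorsel` with an empty stream; `PrepareEncryption` only occurs when
+/// the `parquet_encryption` feature is enabled and encryption is configured.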
+struct ParquetMorselPlanner { + morselizer: Arc, + state: ParquetMorselPlannerState, +} + +impl Debug for ParquetMorselPlanner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ParquetMorselPlanner") + .field("morselizer", &"...") + .field("state", &self.state.name()) + .finish() + } +} + +impl ParquetMorselPlanner { + fn new(morselizer: Arc, file: PartitionedFile) -> Self { + Self { + morselizer, + state: ParquetMorselPlannerState::ready(ReadyState::start(file)), + } + } + + #[cfg(feature = "parquet_encryption")] + fn needs_file_decryption_properties(&self) -> bool { + self.morselizer + .encryption_context + .needs_file_decryption_properties() + } + + /// Schedule an async step and transition the planner into `WaitingIo`. + /// + /// Sets `self.state` to ParquetMorselPlannerState::WaitingIo + fn schedule_io(&mut self, future: F) -> Option + where + F: Future> + Send + 'static, + { + let (sender, receiver) = oneshot::channel(); + let io_future = async move { + let next_state = future.await?; + // Ignore error as it means the receiver shutdown (likely due to a + // real error) and we don't want to confuse error reporting by + // reporting a closed channel. + let _ = sender.send(Ok(next_state)); + Ok(()) + }; + self.state = ParquetMorselPlannerState::WaitingIo(WaitingIoState { receiver }); + Some(MorselPlan::new().with_io_future(io_future.boxed())) + } +} + +impl MorselPlanner for ParquetMorselPlanner { + fn plan(&mut self) -> Result> { + // Core state machine transition + let state = mem::replace(&mut self.state, ParquetMorselPlannerState::Done); + match state { + ParquetMorselPlannerState::Ready(ready_state) => match *ready_state { + ReadyState::Start(file) => { + let prepared = self.morselizer.prepare_open_file(*file)?; + #[cfg(feature = "parquet_encryption")] + { + if self.needs_file_decryption_properties() { + self.state = ParquetMorselPlannerState::ready( + ReadyState::prepare_encryption(prepared), + ); + } else { + self.state = ParquetMorselPlannerState::ready( + ReadyState::prune_files(prepared), + ); + } } - Ok(None) => {} - Err(e) => { - debug!( - "Ignoring error building row filter for '{predicate:?}': {e}" + #[cfg(not(feature = "parquet_encryption"))] + { + self.state = ParquetMorselPlannerState::ready( + ReadyState::prune_files(prepared), ); } - }; - }; - if force_filter_selections { - builder = - builder.with_row_selection_policy(RowSelectionPolicy::Selectors); + Ok(Some(MorselPlan::new())) + } + #[cfg(feature = "parquet_encryption")] + ReadyState::PrepareEncryption(mut prepared) => { + let file_location = + prepared.partitioned_file.object_meta.location.clone(); + let state = Arc::clone(&prepared.state); + Ok(self.schedule_io(async move { + let properties = + state.load_file_decryption_properties(file_location).await?; + prepared.file_decryption_properties = properties; + Ok(ReadyState::prune_files(*prepared)) + })) + } + ReadyState::PruneFiles(prepared) => { + let Some(prepared) = + ParquetMorselizerState::prune_prepared_file(*prepared)? 
+ else { + // File was totally pruned + self.state = ParquetMorselPlannerState::empty_file(); + return Ok(Some(MorselPlan::new())); + }; + Ok(self.schedule_io(async move { + let loaded = + ParquetMorselizerState::load_prepared_metadata(prepared) + .await?; + Ok(ReadyState::prepared(loaded)) + })) + } + ReadyState::Prepared(prepared) => Ok(self.schedule_io(async move { + let stream = + ParquetMorselizerState::execute_metadata_loaded_open(*prepared) + .await?; + Ok(ReadyState::emit_morsel(stream)) + })), + ReadyState::EmitMorsel(stream) => { + let morsel = ParquetStreamMorsel::new(stream); + Ok(Some(MorselPlan::new().with_morsels(vec![Box::new(morsel)]))) + } + }, + ParquetMorselPlannerState::WaitingIo(WaitingIoState { mut receiver }) => { + match receiver.try_recv() { + Ok(next_state) => { + self.state = ParquetMorselPlannerState::ready(next_state?); + Ok(Some(MorselPlan::new())) + } + Err(oneshot::error::TryRecvError::Empty) => { + self.state = + ParquetMorselPlannerState::WaitingIo(WaitingIoState { + receiver, + }); + Ok(None) + } + Err(oneshot::error::TryRecvError::Closed) => { + Err(DataFusionError::Execution( + "Parquet morsel planner I/O completion channel closed" + .to_string(), + )) + } + } } + ParquetMorselPlannerState::Done => Ok(None), + } + } +} - // Apply the prepared plan to the builder - if let Some(row_selection) = prepared_plan.row_selection { - builder = builder.with_row_selection(row_selection); - } - builder = builder.with_row_groups(prepared_plan.row_group_indexes); +struct ParquetStreamMorsel { + stream: BoxStream<'static, Result>, +} - if let Some(limit) = limit { - builder = builder.with_limit(limit) - } +impl Debug for ParquetStreamMorsel { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ParquetStreamMorsel") + .field("stream", &"...") + .finish() + } +} - if let Some(max_predicate_cache_size) = max_predicate_cache_size { - builder = builder.with_max_predicate_cache_size(max_predicate_cache_size); - } +impl ParquetStreamMorsel { + fn new(stream: BoxStream<'static, Result>) -> Self { + Self { stream } + } +} - // metrics from the arrow reader itself - let arrow_reader_metrics = ArrowReaderMetrics::enabled(); - - let indices = projection.column_indices(); - let mask = - ProjectionMask::roots(reader_metadata.parquet_schema(), indices.clone()); - - let decoder = builder - .with_projection(mask) - .with_metrics(arrow_reader_metrics.clone()) - .build()?; - - let files_ranges_pruned_statistics = - file_metrics.files_ranges_pruned_statistics.clone(); - let predicate_cache_inner_records = - file_metrics.predicate_cache_inner_records.clone(); - let predicate_cache_records = file_metrics.predicate_cache_records.clone(); - - // Rebase column indices to match the narrowed stream schema. - // The projection expressions have indices based on physical_file_schema, - // but the stream only contains the columns selected by the ProjectionMask. 
- let stream_schema = Arc::new(physical_file_schema.project(&indices)?); - let replace_schema = stream_schema != output_schema; - let projection = projection - .try_map_exprs(|expr| reassign_expr_columns(expr, &stream_schema))?; - let projector = projection.make_projector(&stream_schema)?; - let stream = futures::stream::unfold( - PushDecoderStreamState { - decoder, - reader: async_file_reader, - projector, - output_schema, - replace_schema, - arrow_reader_metrics, - predicate_cache_inner_records, - predicate_cache_records, - }, - |mut state| async move { - let result = state.transition().await; - result.map(|r| (r, state)) - }, - ) - .fuse(); - - // ---------------------------------------------------------------------- - // Step: wrap the stream so a dynamic filter can stop the file scan early - // ---------------------------------------------------------------------- - if let Some(file_pruner) = file_pruner { - let boxed_stream = stream.boxed(); - Ok(EarlyStoppingStream::new( - boxed_stream, - file_pruner, - files_ranges_pruned_statistics, - ) - .boxed()) - } else { - Ok(stream.boxed()) - } - })) +impl Morsel for ParquetStreamMorsel { + fn into_stream(self: Box) -> BoxStream<'static, Result> { + self.stream + } + + fn split(&mut self) -> Result>> { + Ok(vec![]) } } @@ -851,8 +1246,8 @@ where } } -#[derive(Default)] -struct EncryptionContext { +#[derive(Default, Clone)] +pub(crate) struct EncryptionContext { #[cfg(feature = "parquet_encryption")] file_decryption_properties: Option>, #[cfg(feature = "parquet_encryption")] @@ -861,7 +1256,11 @@ struct EncryptionContext { #[cfg(feature = "parquet_encryption")] impl EncryptionContext { - fn new( + fn needs_file_decryption_properties(&self) -> bool { + self.file_decryption_properties.is_some() || self.encryption_factory.is_some() + } + + pub(crate) fn new( file_decryption_properties: Option>, encryption_factory: Option<( Arc, @@ -895,6 +1294,10 @@ impl EncryptionContext { #[cfg(not(feature = "parquet_encryption"))] #[expect(dead_code)] impl EncryptionContext { + fn needs_file_decryption_properties(&self) -> bool { + false + } + async fn get_file_decryption_properties( &self, _file_location: &object_store::path::Path, @@ -903,19 +1306,16 @@ impl EncryptionContext { } } -impl ParquetOpener { +impl ParquetMorselizerState { #[cfg(feature = "parquet_encryption")] fn get_encryption_context(&self) -> EncryptionContext { - EncryptionContext::new( - self.file_decryption_properties.clone(), - self.encryption_factory.clone(), - ) + self.encryption_context.clone() } #[cfg(not(feature = "parquet_encryption"))] #[expect(dead_code)] fn get_encryption_context(&self) -> EncryptionContext { - EncryptionContext::default() + self.encryption_context.clone() } } @@ -1032,15 +1432,19 @@ fn should_enable_page_index( mod test { use std::sync::Arc; - use super::{ConstantColumns, constant_columns_from_stats}; - use crate::{DefaultParquetFileReaderFactory, RowGroupAccess, opener::ParquetOpener}; + use super::{ + ConstantColumns, EncryptionContext, ParquetMorselizerState, + constant_columns_from_stats, + }; + use crate::{DefaultParquetFileReaderFactory, ParquetMorselizer, RowGroupAccess}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; + use arrow::record_batch::RecordBatch; use bytes::{BufMut, BytesMut}; use datafusion_common::{ ColumnStatistics, DataFusionError, ScalarValue, Statistics, record_batch, stats::Precision, }; - use datafusion_datasource::{PartitionedFile, TableSchema, file_stream::FileOpener}; + use datafusion_datasource::{PartitionedFile, 
TableSchema, morsel::Morselizer}; use datafusion_expr::{col, lit}; use datafusion_physical_expr::{ PhysicalExpr, @@ -1057,9 +1461,9 @@ mod test { use parquet::arrow::ArrowWriter; use parquet::file::properties::WriterProperties; - /// Builder for creating [`ParquetOpener`] instances with sensible defaults for tests. + /// Builder for creating [`ParquetMorselizer`] instances with sensible defaults for tests. /// This helps reduce code duplication and makes it clear what differs between test cases. - struct ParquetOpenerBuilder { + struct ParquetMorselizerBuilder { store: Option>, table_schema: Option, partition_index: usize, @@ -1082,7 +1486,7 @@ mod test { preserve_order: bool, } - impl ParquetOpenerBuilder { + impl ParquetMorselizerBuilder { /// Create a new builder with sensible defaults for tests. fn new() -> Self { Self { @@ -1163,17 +1567,17 @@ mod test { self } - /// Build the ParquetOpener instance. + /// Build the ParquetMorselizer instance. /// /// # Panics /// /// Panics if required fields (store, schema/table_schema) are not set. - fn build(self) -> ParquetOpener { + fn build(self) -> ParquetMorselizer { let store = self .store - .expect("ParquetOpenerBuilder: store must be set via with_store()"); + .expect("ParquetMorselizerBuilder: store must be set via with_store()"); let table_schema = self.table_schema.expect( - "ParquetOpenerBuilder: table_schema must be set via with_schema() or with_table_schema()", + "ParquetMorselizerBuilder: table_schema must be set via with_schema() or with_table_schema()", ); let file_schema = Arc::clone(table_schema.file_schema()); @@ -1187,7 +1591,7 @@ mod test { ProjectionExprs::from_indices(&all_indices, &file_schema) }; - ParquetOpener { + ParquetMorselizer::new(ParquetMorselizerState { partition_index: self.partition_index, projection, batch_size: self.batch_size, @@ -1206,15 +1610,12 @@ mod test { enable_bloom_filter: self.enable_bloom_filter, enable_row_group_stats_pruning: self.enable_row_group_stats_pruning, coerce_int96: self.coerce_int96, - #[cfg(feature = "parquet_encryption")] - file_decryption_properties: None, expr_adapter_factory: Arc::new(DefaultPhysicalExprAdapterFactory), - #[cfg(feature = "parquet_encryption")] - encryption_factory: None, + encryption_context: EncryptionContext::default(), max_predicate_cache_size: self.max_predicate_cache_size, reverse_row_groups: self.reverse_row_groups, preserve_order: self.preserve_order, - } + }) } } @@ -1303,10 +1704,7 @@ mod test { async fn count_batches_and_rows( mut stream: std::pin::Pin< - Box< - dyn Stream> - + Send, - >, + Box> + Send>, >, ) -> (usize, usize) { let mut num_batches = 0; @@ -1321,10 +1719,7 @@ mod test { /// Helper to collect all int32 values from the first column of batches async fn collect_int32_values( mut stream: std::pin::Pin< - Box< - dyn Stream> - + Send, - >, + Box> + Send>, >, ) -> Vec { use arrow::array::Array; @@ -1344,10 +1739,42 @@ mod test { values } + async fn open_via_morselizer( + morselizer: &ParquetMorselizer, + file: PartitionedFile, + ) -> std::pin::Pin> + Send>> + { + let mut planners = morselizer.morselize(file).unwrap(); + assert_eq!(planners.len(), 1); + let mut planner = planners.swap_remove(0); + + loop { + match planner.plan().unwrap() { + Some(mut plan) => { + if let Some(io_future) = plan.take_io_future() { + io_future.await.unwrap(); + continue; + } + + let mut morsels = plan.take_morsels(); + if morsels.is_empty() { + continue; + } + assert_eq!(morsels.len(), 1); + return morsels.pop().unwrap().into_stream(); + } + None => { + return 
futures::stream::empty::>() + .boxed(); + } + } + } + } + async fn write_parquet( store: Arc, filename: &str, - batch: arrow::record_batch::RecordBatch, + batch: RecordBatch, ) -> usize { write_parquet_batches(store, filename, vec![batch], None).await } @@ -1356,7 +1783,7 @@ mod test { async fn write_parquet_batches( store: Arc, filename: &str, - batches: Vec, + batches: Vec, props: Option, ) -> usize { let mut out = BytesMut::new().writer(); @@ -1411,7 +1838,7 @@ mod test { )); let make_opener = |predicate| { - ParquetOpenerBuilder::new() + ParquetMorselizerBuilder::new() .with_store(Arc::clone(&store)) .with_schema(Arc::clone(&schema)) .with_projection_indices(&[0, 1]) @@ -1424,7 +1851,7 @@ mod test { let expr = col("a").eq(lit(1)); let predicate = logical2physical(&expr, &schema); let opener = make_opener(predicate); - let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let stream = open_via_morselizer(&opener, file.clone()).await; let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 1); assert_eq!(num_rows, 3); @@ -1433,7 +1860,7 @@ mod test { let expr = col("b").eq(lit(ScalarValue::Float32(Some(5.0)))); let predicate = logical2physical(&expr, &schema); let opener = make_opener(predicate); - let stream = opener.open(file).unwrap().await.unwrap(); + let stream = open_via_morselizer(&opener, file).await; let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 0); assert_eq!(num_rows, 0); @@ -1464,7 +1891,7 @@ mod test { vec![Arc::new(Field::new("part", DataType::Int32, false))], ); let make_opener = |predicate| { - ParquetOpenerBuilder::new() + ParquetMorselizerBuilder::new() .with_store(Arc::clone(&store)) .with_table_schema(table_schema_for_opener.clone()) .with_projection_indices(&[0]) @@ -1479,7 +1906,7 @@ mod test { // Otherwise we assume it already happened at the planning stage and won't re-do the work here let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema)); let opener = make_opener(predicate); - let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let stream = open_via_morselizer(&opener, file.clone()).await; let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 1); assert_eq!(num_rows, 3); @@ -1490,7 +1917,7 @@ mod test { // Otherwise we assume it already happened at the planning stage and won't re-do the work here let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema)); let opener = make_opener(predicate); - let stream = opener.open(file).unwrap().await.unwrap(); + let stream = open_via_morselizer(&opener, file).await; let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 0); assert_eq!(num_rows, 0); @@ -1533,7 +1960,7 @@ mod test { vec![Arc::new(Field::new("part", DataType::Int32, false))], ); let make_opener = |predicate| { - ParquetOpenerBuilder::new() + ParquetMorselizerBuilder::new() .with_store(Arc::clone(&store)) .with_table_schema(table_schema_for_opener.clone()) .with_projection_indices(&[0]) @@ -1546,7 +1973,7 @@ mod test { let expr = col("part").eq(lit(1)).and(col("b").eq(lit(1.0))); let predicate = logical2physical(&expr, &table_schema); let opener = make_opener(predicate); - let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let stream = open_via_morselizer(&opener, file.clone()).await; let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 1); assert_eq!(num_rows, 3); @@ -1555,7 +1982,7 @@ mod test 
{ let expr = col("part").eq(lit(2)).and(col("b").eq(lit(1.0))); let predicate = logical2physical(&expr, &table_schema); let opener = make_opener(predicate); - let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let stream = open_via_morselizer(&opener, file.clone()).await; let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 0); assert_eq!(num_rows, 0); @@ -1564,7 +1991,7 @@ mod test { let expr = col("part").eq(lit(1)).and(col("b").eq(lit(7.0))); let predicate = logical2physical(&expr, &table_schema); let opener = make_opener(predicate); - let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let stream = open_via_morselizer(&opener, file.clone()).await; let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 0); assert_eq!(num_rows, 0); @@ -1573,7 +2000,7 @@ mod test { let expr = col("part").eq(lit(2)).and(col("b").eq(lit(7.0))); let predicate = logical2physical(&expr, &table_schema); let opener = make_opener(predicate); - let stream = opener.open(file).unwrap().await.unwrap(); + let stream = open_via_morselizer(&opener, file).await; let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 0); assert_eq!(num_rows, 0); @@ -1605,7 +2032,7 @@ mod test { vec![Arc::new(Field::new("part", DataType::Int32, false))], ); let make_opener = |predicate| { - ParquetOpenerBuilder::new() + ParquetMorselizerBuilder::new() .with_store(Arc::clone(&store)) .with_table_schema(table_schema_for_opener.clone()) .with_projection_indices(&[0]) @@ -1619,7 +2046,7 @@ mod test { let expr = col("part").eq(lit(1)).or(col("a").eq(lit(1))); let predicate = logical2physical(&expr, &table_schema); let opener = make_opener(predicate); - let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let stream = open_via_morselizer(&opener, file.clone()).await; let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 1); assert_eq!(num_rows, 3); @@ -1628,7 +2055,7 @@ mod test { let expr = col("part").eq(lit(1)).or(col("a").eq(lit(3))); let predicate = logical2physical(&expr, &table_schema); let opener = make_opener(predicate); - let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let stream = open_via_morselizer(&opener, file.clone()).await; let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 1); assert_eq!(num_rows, 3); @@ -1637,7 +2064,7 @@ mod test { let expr = col("part").eq(lit(2)).or(col("a").eq(lit(1))); let predicate = logical2physical(&expr, &table_schema); let opener = make_opener(predicate); - let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let stream = open_via_morselizer(&opener, file.clone()).await; let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 1); assert_eq!(num_rows, 1); @@ -1646,7 +2073,7 @@ mod test { let expr = col("part").eq(lit(2)).or(col("a").eq(lit(3))); let predicate = logical2physical(&expr, &table_schema); let opener = make_opener(predicate); - let stream = opener.open(file).unwrap().await.unwrap(); + let stream = open_via_morselizer(&opener, file).await; let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 0); assert_eq!(num_rows, 0); @@ -1686,7 +2113,7 @@ mod test { vec![Arc::new(Field::new("part", DataType::Int32, false))], ); let make_opener = |predicate| { - ParquetOpenerBuilder::new() + ParquetMorselizerBuilder::new() .with_store(Arc::clone(&store)) 
.with_table_schema(table_schema_for_opener.clone()) .with_projection_indices(&[0]) @@ -1699,7 +2126,7 @@ mod test { let expr = col("a").eq(lit(42)); let predicate = logical2physical(&expr, &table_schema); let opener = make_opener(predicate); - let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let stream = open_via_morselizer(&opener, file.clone()).await; let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 0); assert_eq!(num_rows, 0); @@ -1708,7 +2135,7 @@ mod test { // This allows dynamic filters to prune partitions/files even if they are populated late into execution. let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema)); let opener = make_opener(predicate); - let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let stream = open_via_morselizer(&opener, file.clone()).await; let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 0); assert_eq!(num_rows, 0); @@ -1718,7 +2145,7 @@ mod test { let expr = col("part").eq(lit(2)); let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema)); let opener = make_opener(predicate); - let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let stream = open_via_morselizer(&opener, file.clone()).await; let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 0); assert_eq!(num_rows, 0); @@ -1727,7 +2154,7 @@ mod test { let expr = col("part").eq(lit(2)).and(col("a").eq(lit(42))); let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema)); let opener = make_opener(predicate); - let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let stream = open_via_morselizer(&opener, file.clone()).await; let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 0); assert_eq!(num_rows, 0); @@ -1768,7 +2195,7 @@ mod test { ); let make_opener = |reverse_scan: bool| { - ParquetOpenerBuilder::new() + ParquetMorselizerBuilder::new() .with_store(Arc::clone(&store)) .with_schema(Arc::clone(&schema)) .with_projection_indices(&[0]) @@ -1778,12 +2205,12 @@ mod test { // Test normal scan (forward) let opener = make_opener(false); - let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let stream = open_via_morselizer(&opener, file.clone()).await; let forward_values = collect_int32_values(stream).await; // Test reverse scan let opener = make_opener(true); - let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let stream = open_via_morselizer(&opener, file.clone()).await; let reverse_values = collect_int32_values(stream).await; // The forward scan should return data in the order written @@ -1794,6 +2221,42 @@ mod test { assert_eq!(reverse_values, vec![7, 8, 9, 4, 5, 6, 1, 2, 3]); } + #[tokio::test] + async fn test_morselizer_basic_parity() { + let store = Arc::new(InMemory::new()) as Arc; + + let batch1 = + record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap(); + let batch2 = record_batch!(("a", Int32, vec![Some(4), Some(5)])).unwrap(); + + let data_len = write_parquet_batches( + Arc::clone(&store), + "morselizer_basic_parity.parquet", + vec![batch1, batch2], + None, + ) + .await; + + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)])); + let opener = ParquetMorselizerBuilder::new() + .with_store(Arc::clone(&store)) + .with_schema(schema) + .with_projection_indices(&[0]) + .build(); + + let file = PartitionedFile::new( + "morselizer_basic_parity.parquet".to_string(), + 
u64::try_from(data_len).unwrap(), + ); + + let opener_values = + collect_int32_values(open_via_morselizer(&opener, file.clone()).await).await; + let morsel_values = + collect_int32_values(open_via_morselizer(&opener, file).await).await; + + assert_eq!(opener_values, morsel_values); + } + #[tokio::test] async fn test_reverse_scan_single_row_group() { let store = Arc::new(InMemory::new()) as Arc; @@ -1810,7 +2273,7 @@ mod test { ); let make_opener = |reverse_scan: bool| { - ParquetOpenerBuilder::new() + ParquetMorselizerBuilder::new() .with_store(Arc::clone(&store)) .with_schema(Arc::clone(&schema)) .with_projection_indices(&[0]) @@ -1821,11 +2284,11 @@ mod test { // With a single row group, forward and reverse should be the same // (only the row group order is reversed, not the rows within) let opener_forward = make_opener(false); - let stream_forward = opener_forward.open(file.clone()).unwrap().await.unwrap(); + let stream_forward = open_via_morselizer(&opener_forward, file.clone()).await; let (batches_forward, _) = count_batches_and_rows(stream_forward).await; let opener_reverse = make_opener(true); - let stream_reverse = opener_reverse.open(file).unwrap().await.unwrap(); + let stream_reverse = open_via_morselizer(&opener_reverse, file).await; let (batches_reverse, _) = count_batches_and_rows(stream_reverse).await; // Both should have the same number of batches since there's only one row group @@ -1886,7 +2349,7 @@ mod test { .with_extensions(Arc::new(access_plan)); let make_opener = |reverse_scan: bool| { - ParquetOpenerBuilder::new() + ParquetMorselizerBuilder::new() .with_store(Arc::clone(&store)) .with_schema(Arc::clone(&schema)) .with_projection_indices(&[0]) @@ -1896,7 +2359,7 @@ mod test { // Forward scan: RG0(3,4), RG1(5,6,7,8), RG2(9,10) let opener = make_opener(false); - let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let stream = open_via_morselizer(&opener, file.clone()).await; let forward_values = collect_int32_values(stream).await; // Forward scan should produce: RG0(3,4), RG1(5,6,7,8), RG2(9,10) @@ -1912,7 +2375,7 @@ mod test { // - RG1 is read second, WITH RG1's selection (select all) -> 5, 6, 7, 8 // - RG0 is read third, WITH RG0's selection (skip 2, select 2) -> 3, 4 let opener = make_opener(true); - let stream = opener.open(file).unwrap().await.unwrap(); + let stream = open_via_morselizer(&opener, file).await; let reverse_values = collect_int32_values(stream).await; // Correct expected result: row groups reversed but each keeps its own selection @@ -1987,7 +2450,7 @@ mod test { .with_extensions(Arc::new(access_plan)); let make_opener = |reverse_scan: bool| { - ParquetOpenerBuilder::new() + ParquetMorselizerBuilder::new() .with_store(Arc::clone(&store)) .with_schema(Arc::clone(&schema)) .with_projection_indices(&[0]) @@ -1998,7 +2461,7 @@ mod test { // Forward scan: RG0(1), RG2(5), RG3(7) // Note: RG1 is completely skipped let opener = make_opener(false); - let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let stream = open_via_morselizer(&opener, file.clone()).await; let forward_values = collect_int32_values(stream).await; assert_eq!( @@ -2011,7 +2474,7 @@ mod test { // WITHOUT the bug fix, this would return WRONG values // because the RowSelection would be incorrectly mapped let opener = make_opener(true); - let stream = opener.open(file).unwrap().await.unwrap(); + let stream = open_via_morselizer(&opener, file).await; let reverse_values = collect_int32_values(stream).await; assert_eq!( diff --git 
a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 1355eff5b9f11..d439bc62252c2 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -23,19 +23,22 @@ use std::sync::Arc; use crate::DefaultParquetFileReaderFactory; use crate::ParquetFileReaderFactory; -use crate::opener::ParquetOpener; -use crate::opener::build_pruning_predicates; +use crate::ParquetMorselizer; +use crate::opener::{ + EncryptionContext, ParquetMorselizerState, build_pruning_predicates, +}; use crate::row_filter::can_expr_be_pushed_down_with_schemas; use datafusion_common::config::ConfigOptions; #[cfg(feature = "parquet_encryption")] use datafusion_common::config::EncryptionFactoryOptions; use datafusion_datasource::as_file_source; use datafusion_datasource::file_stream::FileOpener; +use datafusion_datasource::morsel::Morselizer; use arrow::datatypes::TimeUnit; -use datafusion_common::DataFusionError; use datafusion_common::config::TableParquetOptions; use datafusion_common::tree_node::TreeNodeRecursion; +use datafusion_common::{DataFusionError, internal_err}; use datafusion_datasource::TableSchema; use datafusion_datasource::file::FileSource; use datafusion_datasource::file_scan_config::FileScanConfig; @@ -474,48 +477,12 @@ impl ParquetSource { } } - pub(crate) fn with_reverse_row_groups(mut self, reverse_row_groups: bool) -> Self { - self.reverse_row_groups = reverse_row_groups; - self - } - #[cfg(test)] - pub(crate) fn reverse_row_groups(&self) -> bool { - self.reverse_row_groups - } -} - -/// Parses datafusion.common.config.ParquetOptions.coerce_int96 String to a arrow_schema.datatype.TimeUnit -pub(crate) fn parse_coerce_int96_string( - str_setting: &str, -) -> datafusion_common::Result { - let str_setting_lower: &str = &str_setting.to_lowercase(); - - match str_setting_lower { - "ns" => Ok(TimeUnit::Nanosecond), - "us" => Ok(TimeUnit::Microsecond), - "ms" => Ok(TimeUnit::Millisecond), - "s" => Ok(TimeUnit::Second), - _ => Err(DataFusionError::Configuration(format!( - "Unknown or unsupported parquet coerce_int96: \ - {str_setting}. Valid values are: ns, us, ms, and s." 
- ))), - } -} - -/// Allows easy conversion from ParquetSource to Arc<dyn FileSource> -impl From for Arc { - fn from(source: ParquetSource) -> Self { - as_file_source(source) - } -} - -impl FileSource for ParquetSource { - fn create_file_opener( + fn create_parquet_morselizer( &self, object_store: Arc, base_config: &FileScanConfig, partition: usize, - ) -> datafusion_common::Result> { + ) -> datafusion_common::Result { let expr_adapter_factory = base_config .expr_adapter_factory .clone() @@ -526,14 +493,24 @@ impl FileSource for ParquetSource { Arc::new(DefaultParquetFileReaderFactory::new(object_store)) as _ }); + #[cfg(not(feature = "parquet_encryption"))] + let encryption_context = EncryptionContext::default(); + #[cfg(feature = "parquet_encryption")] - let file_decryption_properties = self - .table_parquet_options() - .crypto - .file_decryption - .clone() - .map(FileDecryptionProperties::from) - .map(Arc::new); + let encryption_context = { + let file_decryption_properties = self + .table_parquet_options() + .crypto + .file_decryption + .clone() + .map(FileDecryptionProperties::from) + .map(Arc::new); + + EncryptionContext::new( + file_decryption_properties, + self.get_encryption_factory_with_config(), + ) + }; let coerce_int96 = self .table_parquet_options @@ -542,12 +519,12 @@ impl FileSource for ParquetSource { .as_ref() .map(|time_unit| parse_coerce_int96_string(time_unit.as_str()).unwrap()); - let opener = Arc::new(ParquetOpener { + Ok(ParquetMorselizer::new(ParquetMorselizerState { partition_index: partition, projection: self.projection.clone(), batch_size: self .batch_size - .expect("Batch size must set before creating ParquetOpener"), + .expect("Batch size must set before creating ParquetMorselizer"), limit: base_config.limit, preserve_order: base_config.preserve_order, predicate: self.predicate.clone(), @@ -562,15 +539,69 @@ impl FileSource for ParquetSource { enable_bloom_filter: self.bloom_filter_on_read(), enable_row_group_stats_pruning: self.table_parquet_options.global.pruning, coerce_int96, - #[cfg(feature = "parquet_encryption")] - file_decryption_properties, expr_adapter_factory, - #[cfg(feature = "parquet_encryption")] - encryption_factory: self.get_encryption_factory_with_config(), + encryption_context, max_predicate_cache_size: self.max_predicate_cache_size(), reverse_row_groups: self.reverse_row_groups, - }); - Ok(opener) + })) + } + + pub(crate) fn with_reverse_row_groups(mut self, reverse_row_groups: bool) -> Self { + self.reverse_row_groups = reverse_row_groups; + self + } + #[cfg(test)] + pub(crate) fn reverse_row_groups(&self) -> bool { + self.reverse_row_groups + } +} + +/// Parses datafusion.common.config.ParquetOptions.coerce_int96 String to a arrow_schema.datatype.TimeUnit +pub(crate) fn parse_coerce_int96_string( + str_setting: &str, +) -> datafusion_common::Result { + let str_setting_lower: &str = &str_setting.to_lowercase(); + + match str_setting_lower { + "ns" => Ok(TimeUnit::Nanosecond), + "us" => Ok(TimeUnit::Microsecond), + "ms" => Ok(TimeUnit::Millisecond), + "s" => Ok(TimeUnit::Second), + _ => Err(DataFusionError::Configuration(format!( + "Unknown or unsupported parquet coerce_int96: \ + {str_setting}. Valid values are: ns, us, ms, and s." 
+ ))), + } +} + +/// Allows easy conversion from ParquetSource to Arc<dyn FileSource> +impl From for Arc { + fn from(source: ParquetSource) -> Self { + as_file_source(source) + } +} + +impl FileSource for ParquetSource { + fn create_file_opener( + &self, + _object_store: Arc, + _base_config: &FileScanConfig, + _partition: usize, + ) -> datafusion_common::Result> { + internal_err!( + "ParquetSource::create_file_opener called but it supports Morsel API" + ) + } + + fn create_morselizer( + &self, + object_store: Arc, + base_config: &FileScanConfig, + partition: usize, + ) -> datafusion_common::Result> { + let morselizer = + self.create_parquet_morselizer(object_store, base_config, partition)?; + Ok(Box::new(morselizer)) } fn as_any(&self) -> &dyn Any { diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs index b5a6760cae020..f13cc863c7761 100644 --- a/datafusion/datasource/src/file.rs +++ b/datafusion/datasource/src/file.rs @@ -25,6 +25,7 @@ use std::sync::Arc; use crate::file_groups::FileGroupPartitioner; use crate::file_scan_config::FileScanConfig; use crate::file_stream::FileOpener; +use crate::morsel::{FileOpenerMorselizer, Morselizer}; #[expect(deprecated)] use crate::schema_adapter::SchemaAdapterFactory; use datafusion_common::config::ConfigOptions; @@ -63,13 +64,34 @@ pub fn as_file_source(source: T) -> Arc /// /// [`DataSource`]: crate::source::DataSource pub trait FileSource: Send + Sync { - /// Creates a `dyn FileOpener` based on given parameters + /// Creates a `dyn FileOpener` based on given parameters. + /// + /// `FileSource`s that implement the Morsel API should return a "Not + /// Implemented" or "Internal" error for this API. + /// + /// TODO: deprecate fn create_file_opener( &self, object_store: Arc, base_config: &FileScanConfig, partition: usize, ) -> Result>; + + /// Creates a `dyn Morselizer` based on given parameters. + /// + /// The default implementation preserves existing behavior by adapting the + /// legacy [`FileOpener`] API into a [`Morselizer`]. File formats with a + /// native morsel-driven implementation should override this method to + /// return a [`Morselizer`] and not implement the [`FileOpener`] API. 
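As a small illustration (not part of this patch) of the default path described in the `create_morselizer` docs: the fallback is equivalent to wrapping whatever `create_file_opener` returns in the `FileOpenerMorselizer` adapter introduced later in this patch.

use std::sync::Arc;

use datafusion_datasource::file_stream::FileOpener;
use datafusion_datasource::morsel::{FileOpenerMorselizer, Morselizer};

/// What the default `create_morselizer` boils down to: adapt the legacy
/// opener so existing file formats keep working unchanged.
fn adapt_legacy_opener(opener: Arc<dyn FileOpener>) -> Box<dyn Morselizer> {
    Box::new(FileOpenerMorselizer::new(opener))
}

Formats with a native morsel-driven reader instead override `create_morselizer` and report an internal error from `create_file_opener`, as `ParquetSource` does above.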
+ fn create_morselizer( + &self, + object_store: Arc, + base_config: &FileScanConfig, + partition: usize, + ) -> Result> { + let opener = self.create_file_opener(object_store, base_config, partition)?; + Ok(Box::new(FileOpenerMorselizer::new(opener))) + } /// Any fn as_any(&self) -> &dyn Any; diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 82a986a688ed7..8e5653fb52185 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -586,9 +586,14 @@ impl DataSource for FileScanConfig { let source = self.file_source.with_batch_size(batch_size); - let opener = source.create_file_opener(object_store, self, partition)?; + let morselizer = source.create_morselizer(object_store, self, partition)?; - let stream = FileStream::new(self, partition, opener, source.metrics())?; + let stream = FileStream::new_with_morselizer( + self, + partition, + morselizer, + source.metrics(), + )?; Ok(Box::pin(cooperative(stream))) } diff --git a/datafusion/datasource/src/file_stream.rs b/datafusion/datasource/src/file_stream.rs index b75e66849b7a1..a5166393113df 100644 --- a/datafusion/datasource/src/file_stream.rs +++ b/datafusion/datasource/src/file_stream.rs @@ -39,30 +39,95 @@ use datafusion_physical_plan::metrics::{ use arrow::record_batch::RecordBatch; use datafusion_common::instant::Instant; +use crate::morsel::{FileOpenerMorselizer, Morsel, MorselPlanner, Morselizer}; use futures::future::BoxFuture; use futures::stream::BoxStream; -use futures::{FutureExt as _, Stream, StreamExt as _, ready}; +use futures::{FutureExt, Stream, StreamExt as _}; + +/// How many planners can be active (performing I/O or producing morsels) at once for a given `FileStream`? +/// +/// This setting controls the potential number of concurrent IOs. +/// +/// Setting this to 1 means that the `FileStream` will only have one active +/// planner at a time, and will not start opening the next file until the +/// current file is fully processed. Setting this to a higher number allows the +/// `FileStream` to start opening the next file while still processing the +/// current file, which can improve performance by overlapping IO and CPU work. +/// However, setting this too high may lead to more memory buffering and +/// resource contention if there are too many concurrent IOs. +/// +/// TODO make this a config option +const TARGET_CONCURRENT_PLANNERS: usize = 2; + +/// Keep at most this many morsels buffered before pausing additional planning. +/// +/// The default is one morsel per available core. The intent is that once work +/// stealing is added, each other core can find at least one morsel to steal +/// without requiring the scan to eagerly buffer an unbounded amount of work. +/// +/// TODO make this a config option +fn max_buffered_morsels() -> usize { + std::thread::available_parallelism() + .map(usize::from) + .unwrap_or(1) +} /// A stream that iterates record batch by record batch, file over file. +/// +/// When running, a FileStream has some number of waiting planners (that are +/// waiting on IO) and some number of read_planners that are waiting on CPU. +/// +/// When the next batch is requested, the FileStream will first poll any +/// outstanding io_requests to ensure I/O is making progress in parallel with +/// batch processing. +/// +/// It then tries to prioritize processing data it has in its cache by read from +/// the active stream, if any. 
If that is not ready, it will use the CPUto +/// prepare more morsels or discover new IO before launching the next morsel. +/// +/// The flow is: +/// 1. Read each file from `file_iter` and turn it into morsels (potentially in parallel) +/// 2. Read each morsel individually and produce `RecordBatch`es for processing +/// +/// Future feature: +/// Other FileStreams may steal morsels from this stream to increase parallelism and resource utilization. pub struct FileStream { /// An iterator over input files. file_iter: VecDeque, + /// One or more planners are waiting on I/O + waiting_planners: VecDeque, + /// One or more planners that is waiting on CPU time + ready_planners: VecDeque>, + /// Morsels which are waiting on CPU for processing + /// + /// (TODO steal morsels from other streams) + morsels: VecDeque>, + /// The current reader, if any + reader: Option>>, /// The stream schema (file schema including partition columns and after /// projection). projected_schema: SchemaRef, - /// The remaining number of records to parse, None if no limit + /// The remaining number of records to parse until limit is reached, None if no limit remain: Option, - /// A dynamic [`FileOpener`]. Calling `open()` returns a [`FileOpenFuture`], - /// which can be resolved to a stream of `RecordBatch`. - file_opener: Arc, - /// The stream state - state: FileStreamState, + /// A type specific [`Morselizer`] that examines the input files and produces a stream of `Morsels` + morselizer: Box, /// File stream specific metrics file_stream_metrics: FileStreamMetrics, /// runtime baseline metrics baseline_metrics: BaselineMetrics, /// Describes the behavior of the `FileStream` if file opening or scanning fails on_error: OnError, + /// Is the stream complete? + state: StreamState, +} + +enum StreamState { + /// Stream can make progress when polled + Active, + /// Stream is done + Done, + /// Stream is done, and errord + Error, } impl FileStream { @@ -72,6 +137,21 @@ impl FileStream { partition: usize, file_opener: Arc, metrics: &ExecutionPlanMetricsSet, + ) -> Result { + Self::new_with_morselizer( + config, + partition, + Box::new(FileOpenerMorselizer::new(file_opener)), + metrics, + ) + } + + /// Create a new FileStream from morsels + pub fn new_with_morselizer( + config: &FileScanConfig, + partition: usize, + morselizer: Box, + metrics: &ExecutionPlanMetricsSet, ) -> Result { let projected_schema = config.projected_schema()?; @@ -79,16 +159,19 @@ impl FileStream { Ok(Self { file_iter: file_group.into_inner().into_iter().collect(), + ready_planners: VecDeque::new(), + waiting_planners: VecDeque::new(), + morsels: VecDeque::new(), + reader: None, projected_schema, remain: config.limit, - file_opener, - state: FileStreamState::Idle, + morselizer, file_stream_metrics: FileStreamMetrics::new(metrics, partition), baseline_metrics: BaselineMetrics::new(metrics, partition), on_error: OnError::Fail, + state: StreamState::Active, }) } - /// Specify the behavior when an error occurs opening or scanning a file /// /// If `OnError::Skip` the stream will skip files which encounter an error and continue @@ -98,187 +181,250 @@ impl FileStream { self } - /// Begin opening the next file in parallel while decoding the current file in FileStream. + /// Run a planner on CPU until it either needs I/O or fully completes. /// - /// Since file opening is mostly IO (and may involve a - /// bunch of sequential IO), it can be parallelized with decoding. 
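As a hedged sketch of how a data source now wires a `Morselizer` into the stream (a hypothetical helper mirroring the `FileScanConfig::open` change above; the signatures are the ones introduced by this patch):

use datafusion_common::Result;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_datasource::file_stream::{FileStream, OnError};
use datafusion_datasource::morsel::Morselizer;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;

/// Build a `FileStream` from a `Morselizer` instead of a `FileOpener`.
fn open_with_morselizer(
    config: &FileScanConfig,
    partition: usize,
    morselizer: Box<dyn Morselizer>,
    metrics: &ExecutionPlanMetricsSet,
) -> Result<FileStream> {
    Ok(
        FileStream::new_with_morselizer(config, partition, morselizer, metrics)?
            // optional: skip files that fail to open or scan instead of failing the query
            .with_on_error(OnError::Skip),
    )
}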
- fn start_next_file(&mut self) -> Option> { - let part_file = self.file_iter.pop_front()?; - Some(self.file_opener.open(part_file)) + /// Any morsels produced along the way are appended to `self.morsels`. If + /// the planner needs more I/O, it is moved to `waiting_planners`. + fn plan_morsels(&mut self, mut planner: Box) -> Result<()> { + let max_buffered_morsels = max_buffered_morsels(); + while let Some(mut plan) = planner.plan()? { + self.morsels.extend(plan.take_morsels()); + if let Some(io_future) = plan.take_io_future() { + self.waiting_planners + .push_back(WaitingPlanner::new(planner, io_future)); + break; + } + + if self.morsels.len() >= max_buffered_morsels { + self.ready_planners.push_back(planner); + break; + } + } + Ok(()) } - fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll>> { - loop { - match &mut self.state { - FileStreamState::Idle => { - self.file_stream_metrics.time_opening.start(); - - match self.start_next_file().transpose() { - Ok(Some(future)) => self.state = FileStreamState::Open { future }, - Ok(None) => return Poll::Ready(None), - Err(e) => { - self.state = FileStreamState::Error; - return Poll::Ready(Some(Err(e))); + /// Turn one file into one or more planners and immediately drive each of + /// them until they either block on I/O or complete. + fn morselize_next_file(&mut self, file: PartitionedFile) -> Result<()> { + for planner in self.morselizer.morselize(file)? { + self.plan_morsels(planner)?; + } + Ok(()) + } + + /// Pull additional files into the planner pipeline until the configured + /// planner concurrency target is reached. + /// + /// This is where new file-level work enters the stream. Formats that do all + /// of their planning synchronously may immediately populate `self.morsels`, + /// while formats that need metadata I/O will populate `waiting_planners`. + fn start_next_files(&mut self) -> Result<()> { + let max_buffered_morsels = max_buffered_morsels(); + while (self.waiting_planners.len() + self.ready_planners.len()) + < TARGET_CONCURRENT_PLANNERS + { + if self.morsels.len() >= max_buffered_morsels { + break; + } + let Some(file) = self.file_iter.pop_front() else { + break; + }; + self.morselize_next_file(file)?; + } + Ok(()) + } + + /// Poll each waiting planner's outstanding I/O once. + /// + /// When a future completes successfully, the planner becomes CPU-ready + /// again and is moved back to `ready_planners`. Failed futures are handled + /// according to `OnError`. 
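Outside of `FileStream`, the same planner-driving contract can be exercised directly. The sketch below is a hypothetical helper, roughly what a test shim such as `open_via_morselizer` presumably does: it drives each planner for one file sequentially, with none of the I/O / CPU overlap that `FileStream` adds.

use arrow::array::RecordBatch;
use datafusion_common::Result;
use datafusion_datasource::PartitionedFile;
use datafusion_datasource::morsel::Morselizer;
use futures::StreamExt;

/// Drive every planner produced for `file` to completion and collect all batches.
async fn collect_all_batches(
    morselizer: &dyn Morselizer,
    file: PartitionedFile,
) -> Result<Vec<RecordBatch>> {
    let mut batches = vec![];
    for mut planner in morselizer.morselize(file)? {
        // `plan` returns None once this planner has no more work
        while let Some(mut plan) = planner.plan()? {
            // consume any morsels that are already CPU-ready
            for morsel in plan.take_morsels() {
                let mut stream = morsel.into_stream();
                while let Some(batch) = stream.next().await {
                    batches.push(batch?);
                }
            }
            // then drive the I/O the planner asked for before re-planning
            if let Some(io_future) = plan.take_io_future() {
                io_future.await?;
            }
        }
    }
    Ok(batches)
}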
+ fn check_io(&mut self, cx: &mut Context<'_>) -> Result<()> { + for mut waiting_planner in mem::take(&mut self.waiting_planners) { + match waiting_planner.io_future.poll_unpin(cx) { + Poll::Ready(Ok(())) => { + self.file_stream_metrics.files_opened.add(1); + self.ready_planners.push_back(waiting_planner.planner); + } + Poll::Ready(Err(e)) => { + self.file_stream_metrics.file_open_errors.add(1); + match self.on_error { + OnError::Skip => { + self.file_stream_metrics.files_processed.add(1); } + OnError::Fail => return Err(e), } } - FileStreamState::Open { future } => match ready!(future.poll_unpin(cx)) { - Ok(reader) => { - self.file_stream_metrics.files_opened.add(1); - // include time needed to start opening in `start_next_file` - self.file_stream_metrics.time_opening.stop(); - let next = { - let scanning_total_metric = self - .file_stream_metrics - .time_scanning_total - .metrics - .clone(); - let _timer = scanning_total_metric.timer(); - self.start_next_file().transpose() + Poll::Pending => self.waiting_planners.push_back(waiting_planner), + } + } + Ok(()) + } + + /// Convert the next ready morsel into an active `RecordBatch` reader. + /// + /// This only happens when there is no reader currently in flight. The + /// corresponding scan timers start here because the morsel is now eligible + /// to produce batches. + fn start_next_morsel(&mut self) { + if self.reader.is_none() + && let Some(morsel) = self.morsels.pop_front() + { + self.reader = Some(morsel.into_stream()); + self.file_stream_metrics.time_scanning_until_data.start(); + self.file_stream_metrics.time_scanning_total.start(); + } + } + + /// Drive the `FileStream` scheduler forward by one poll. + /// + /// The order is important: + /// 1. Admit more files into the planner pipeline up to the concurrency + /// target (ensures I/O are scheduled if needed) + /// 2. Poll outstanding planner I/O (ensure I/O completes in parallel) + /// 3. Spend CPU on ready planners only when there is no morsel already ready + /// to execute. + /// 4. Launch and poll the active morsel reader. + fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll>> { + loop { + match self.state { + StreamState::Active => {} + StreamState::Done => return Poll::Ready(None), + StreamState::Error => return Poll::Ready(None), + } + + if let Err(e) = self.start_next_files() { + self.state = StreamState::Error; + return Poll::Ready(Some(Err(e))); + } + if let Err(e) = self.check_io(cx) { + self.state = StreamState::Error; + return Poll::Ready(Some(Err(e))); + } + + // Give ready planners CPU whenever there is buffer space, even if a + // reader is currently active. This avoids starving planner work + // behind a reader that is itself waiting on I/O. + while self.morsels.len() < max_buffered_morsels() { + let Some(planner) = self.ready_planners.pop_front() else { + break; + }; + if let Err(e) = self.plan_morsels(planner) { + self.state = StreamState::Error; + return Poll::Ready(Some(Err(e))); + } + + // Once a morsel is buffered and a reader is already active, + // return to the scan side of the scheduler rather than + // continuing to spend CPU on planning in this poll. + if self.reader.is_some() && !self.morsels.is_empty() { + break; + } + } + + // Newly planned work may have just discovered fresh I/O. Poll it + // once now so the future can register the current waker before we + // return `Pending`; otherwise the stream can stall waiting on an + // I/O future that has never been polled. 
+ if let Err(e) = self.check_io(cx) { + self.state = StreamState::Error; + return Poll::Ready(Some(Err(e))); + } + + // The second I/O poll may have completed planner work discovered + // during this same call to `poll_inner`. Loop back so newly ready + // planners get CPU time before we consider returning `Pending`. + if !self.ready_planners.is_empty() + && self.morsels.len() < max_buffered_morsels() + { + continue; + } + + self.start_next_morsel(); + + if let Some(reader) = self.reader.as_mut() { + match reader.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(batch))) => { + self.file_stream_metrics.time_scanning_until_data.stop(); + self.file_stream_metrics.time_scanning_total.stop(); + let batch = match &mut self.remain { + Some(remain) => { + if batch.num_rows() > *remain { + let batch = batch.slice(0, *remain); + *remain = 0; + self.state = StreamState::Done; + batch + } else { + *remain -= batch.num_rows(); + batch + } + } + None => batch, }; - self.file_stream_metrics.time_scanning_until_data.start(); self.file_stream_metrics.time_scanning_total.start(); + return Poll::Ready(Some(Ok(batch))); + } + Poll::Ready(Some(Err(e))) => { + self.reader = None; + self.file_stream_metrics.file_scan_errors.add(1); + self.file_stream_metrics.time_scanning_until_data.stop(); + self.file_stream_metrics.time_scanning_total.stop(); - match next { - Ok(Some(next_future)) => { - self.state = FileStreamState::Scan { - reader, - next: Some(NextOpen::Pending(next_future)), - }; - } - Ok(None) => { - self.state = FileStreamState::Scan { reader, next: None }; - } - Err(e) => { - self.state = FileStreamState::Error; + match self.on_error { + OnError::Fail => { + self.waiting_planners.clear(); + self.ready_planners.clear(); + self.morsels.clear(); + self.state = StreamState::Error; return Poll::Ready(Some(Err(e))); } - } - } - Err(e) => { - self.file_stream_metrics.file_open_errors.add(1); - match self.on_error { OnError::Skip => { self.file_stream_metrics.files_processed.add(1); - self.file_stream_metrics.time_opening.stop(); - self.state = FileStreamState::Idle - } - OnError::Fail => { - self.state = FileStreamState::Error; - return Poll::Ready(Some(Err(e))); + continue; } } } - }, - FileStreamState::Scan { reader, next } => { - // We need to poll the next `FileOpenFuture` here to drive it forward - if let Some(next_open_future) = next - && let NextOpen::Pending(f) = next_open_future - && let Poll::Ready(reader) = f.as_mut().poll(cx) - { - *next_open_future = NextOpen::Ready(reader); - } - match ready!(reader.poll_next_unpin(cx)) { - Some(Ok(batch)) => { - self.file_stream_metrics.time_scanning_until_data.stop(); - self.file_stream_metrics.time_scanning_total.stop(); - let batch = match &mut self.remain { - Some(remain) => { - if *remain > batch.num_rows() { - *remain -= batch.num_rows(); - batch - } else { - let batch = batch.slice(0, *remain); - // Count this file, the prefetched next file - // (if any), and all remaining files we will - // never open. 
- let done = 1 - + self.file_iter.len() - + usize::from(next.is_some()); - self.file_stream_metrics - .files_processed - .add(done); - self.state = FileStreamState::Limit; - *remain = 0; - batch - } - } - None => batch, - }; - self.file_stream_metrics.time_scanning_total.start(); - return Poll::Ready(Some(Ok(batch))); - } - Some(Err(err)) => { - self.file_stream_metrics.file_scan_errors.add(1); - self.file_stream_metrics.time_scanning_until_data.stop(); - self.file_stream_metrics.time_scanning_total.stop(); - - match self.on_error { - // If `OnError::Skip` we skip the file as soon as we hit the first error - OnError::Skip => { - self.file_stream_metrics.files_processed.add(1); - match mem::take(next) { - Some(future) => { - self.file_stream_metrics.time_opening.start(); - - match future { - NextOpen::Pending(future) => { - self.state = - FileStreamState::Open { future } - } - NextOpen::Ready(reader) => { - self.state = FileStreamState::Open { - future: Box::pin( - std::future::ready(reader), - ), - } - } - } - } - None => return Poll::Ready(None), - } - } - OnError::Fail => { - self.state = FileStreamState::Error; - return Poll::Ready(Some(Err(err))); - } - } - } - None => { - self.file_stream_metrics.files_processed.add(1); - self.file_stream_metrics.time_scanning_until_data.stop(); - self.file_stream_metrics.time_scanning_total.stop(); - - match mem::take(next) { - Some(future) => { - self.file_stream_metrics.time_opening.start(); - - match future { - NextOpen::Pending(future) => { - self.state = FileStreamState::Open { future } - } - NextOpen::Ready(reader) => { - self.state = FileStreamState::Open { - future: Box::pin(std::future::ready( - reader, - )), - } - } - } - } - None => return Poll::Ready(None), - } - } + Poll::Ready(None) => { + self.reader = None; + self.file_stream_metrics.files_processed.add(1); + self.file_stream_metrics.time_scanning_until_data.stop(); + self.file_stream_metrics.time_scanning_total.stop(); + continue; } + Poll::Pending => {} } - FileStreamState::Error | FileStreamState::Limit => { - return Poll::Ready(None); - } } + + if self.reader.is_none() + && self.morsels.is_empty() + && self.ready_planners.is_empty() + && self.waiting_planners.is_empty() + && self.file_iter.is_empty() + { + self.state = StreamState::Done; + return Poll::Ready(None); + } + + return Poll::Pending; } } } +/// A planner that has already discovered its next I/O phase. +struct WaitingPlanner { + planner: Box, + io_future: BoxFuture<'static, Result<()>>, +} + +impl WaitingPlanner { + fn new( + planner: Box, + io_future: BoxFuture<'static, Result<()>>, + ) -> Self { + Self { planner, io_future } + } +} + impl Stream for FileStream { type Item = Result; @@ -300,6 +446,9 @@ impl RecordBatchStream for FileStream { } /// A fallible future that resolves to a stream of [`RecordBatch`] +/// +/// This is typically an `async` function that opens the file, and returns a +/// stream that reads the file and produces `RecordBatch`es. pub type FileOpenFuture = BoxFuture<'static, Result>>>; @@ -320,42 +469,11 @@ pub enum OnError { pub trait FileOpener: Unpin + Send + Sync { /// Asynchronously open the specified file and return a stream /// of [`RecordBatch`] + /// + /// TODO: describe prefetching behavior here, and expectations around IO fn open(&self, partitioned_file: PartitionedFile) -> Result; } -/// Represents the state of the next `FileOpenFuture`. 
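For contrast with the morsel path, a minimal sketch of the legacy contract documented just above: `open` is effectively an async function that performs the open and yields a batch stream. The `PreloadedOpener` name is hypothetical, not part of this patch.

use arrow::array::RecordBatch;
use datafusion_common::Result;
use datafusion_datasource::PartitionedFile;
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
use futures::StreamExt;
use futures::stream::BoxStream;

/// An opener whose data is already in memory, so "opening" completes immediately.
#[derive(Debug)]
struct PreloadedOpener {
    batches: Vec<RecordBatch>,
}

impl FileOpener for PreloadedOpener {
    fn open(&self, _file: PartitionedFile) -> Result<FileOpenFuture> {
        let batches = self.batches.clone();
        Ok(Box::pin(async move {
            // the "opened file" is just a stream over the preloaded batches
            let stream: BoxStream<'static, Result<RecordBatch>> =
                futures::stream::iter(batches.into_iter().map(Ok)).boxed();
            Ok(stream)
        }))
    }
}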
Since we need to poll -/// this future while scanning the current file, we need to store the result if it -/// is ready -pub enum NextOpen { - Pending(FileOpenFuture), - Ready(Result>>), -} - -pub enum FileStreamState { - /// The idle state, no file is currently being read - Idle, - /// Currently performing asynchronous IO to obtain a stream of RecordBatch - /// for a given file - Open { - /// A [`FileOpenFuture`] returned by [`FileOpener::open`] - future: FileOpenFuture, - }, - /// Scanning the [`BoxStream`] returned by the completion of a [`FileOpenFuture`] - /// returned by [`FileOpener::open`] - Scan { - /// The reader instance - reader: BoxStream<'static, Result>, - /// A [`FileOpenFuture`] for the next file to be processed. - /// This allows the next file to be opened in parallel while the - /// current file is read. - next: Option, - }, - /// Encountered an error - Error, - /// Reached the row limit - Limit, -} - /// A timer that can be started and stopped. pub struct StartableTime { pub metrics: Time, diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs index bcc4627050d4a..6bb172f86e38d 100644 --- a/datafusion/datasource/src/mod.rs +++ b/datafusion/datasource/src/mod.rs @@ -38,6 +38,7 @@ pub mod file_scan_config; pub mod file_sink_config; pub mod file_stream; pub mod memory; +pub mod morsel; pub mod projection; pub mod schema_adapter; pub mod sink; @@ -126,7 +127,8 @@ pub struct PartitionedFile { /// [`wrap_partition_value_in_dict`]: crate::file_scan_config::wrap_partition_value_in_dict /// [`table_partition_cols`]: https://github.com/apache/datafusion/blob/main/datafusion/core/src/datasource/file_format/options.rs#L87 pub partition_values: Vec, - /// An optional file range for a more fine-grained parallel execution + /// An optional file range for this file. This is used to statically + /// schedule non-overlapping sections of a file to be read in parallel. pub range: Option, /// Optional statistics that describe the data in this file if known. /// diff --git a/datafusion/datasource/src/morsel/adapters.rs b/datafusion/datasource/src/morsel/adapters.rs new file mode 100644 index 0000000000000..460ba314f3264 --- /dev/null +++ b/datafusion/datasource/src/morsel/adapters.rs @@ -0,0 +1,147 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::PartitionedFile; +use crate::file_stream::FileOpener; +use crate::morsel::{Morsel, MorselPlan, MorselPlanner, Morselizer}; +use arrow::array::RecordBatch; +use datafusion_common::Result; +use futures::FutureExt; +use futures::stream::BoxStream; +use std::fmt::Debug; +use std::sync::{Arc, Mutex}; + +/// An adapter for `FileOpener` that allows it to be used as a `Morselizer` for +/// backwards compatibility. 
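As a small, hypothetical illustration of the `range` field documented above (assuming the existing `PartitionedFile::with_range` helper), two non-overlapping halves of one file can be scheduled as separate scan units:

use datafusion_datasource::PartitionedFile;

/// Split one file of `file_len` bytes into two non-overlapping byte ranges
/// so that separate partitions can scan each half in parallel.
fn split_in_half(path: &str, file_len: u64) -> Vec<PartitionedFile> {
    let mid = (file_len / 2) as i64;
    vec![
        PartitionedFile::new(path.to_string(), file_len).with_range(0, mid),
        PartitionedFile::new(path.to_string(), file_len).with_range(mid, file_len as i64),
    ]
}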
+/// +/// This is useful for file formats that do not support morselization, where we +/// can treat the entire file as a single morsel. +pub struct FileOpenerMorselizer { + file_opener: Arc, +} + +impl Debug for FileOpenerMorselizer { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("FileOpenerMorselizer") + .field("file_opener", &"...") + .finish() + } +} + +impl FileOpenerMorselizer { + pub fn new(file_opener: Arc) -> Self { + Self { file_opener } + } +} + +impl Morselizer for FileOpenerMorselizer { + fn morselize(&self, file: PartitionedFile) -> Result>> { + let opener = Arc::clone(&self.file_opener); + let planner = FileOpenFutureMorselPlanner::new(opener, file); + Ok(vec![Box::new(planner)]) + } +} + +/// Adapter for `FileOpenFuture` that allows it to be used as a `MorselPlanner` +/// for backwards compatibility. +struct FileOpenFutureMorselPlanner { + file_opener: Arc, + stream: Arc>>>>, + file: Mutex>, +} + +impl Debug for FileOpenFutureMorselPlanner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("FileOpenFutureMorselPlanner") + .field("file_opener", &"...") + .field("stream", &"...") + .field("file", &self.file) + .finish() + } +} + +impl FileOpenFutureMorselPlanner { + pub fn new(file_opener: Arc, file: PartitionedFile) -> Self { + Self { + file_opener, + stream: Arc::new(Mutex::new(None)), + file: Mutex::new(Some(file)), + } + } +} + +impl MorselPlanner for FileOpenFutureMorselPlanner { + fn plan(&mut self) -> Result> { + let mut morsel_plan = MorselPlan::new(); + let mut made_progress = false; + + // Note that plan should **not** do IO work so setup a callback if needed + if let Some(file) = self.file.lock().unwrap().take() { + let file_opener = Arc::clone(&self.file_opener); + let output_stream = Arc::clone(&self.stream); + let load_future = async move { + let stream = file_opener + // open the file to get a stream + .open(file)? + // create the stream + .await?; + // store the stream for later retrieval + *(output_stream.lock().unwrap()) = Some(stream); + Ok(()) + }; + morsel_plan = morsel_plan.with_io_future(load_future.boxed()); + made_progress = true; + } + + // If the stream is ready, return it as a morsel + if let Some(stream) = self.stream.lock().unwrap().take() { + let morsel = FileStreamMorsel::new(stream); + morsel_plan = morsel_plan.with_morsels(vec![Box::new(morsel)]); + made_progress = true; + } + + Ok(made_progress.then_some(morsel_plan)) + } +} + +struct FileStreamMorsel { + stream: BoxStream<'static, Result>, +} + +impl Debug for FileStreamMorsel { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("FileStreamMorsel") + .field("stream", &"...") + .finish() + } +} + +impl FileStreamMorsel { + pub fn new(stream: BoxStream<'static, Result>) -> Self { + Self { stream } + } +} + +impl Morsel for FileStreamMorsel { + fn into_stream(self: Box) -> BoxStream<'static, Result> { + self.stream + } + + fn split(&mut self) -> Result>> { + Ok(vec![]) // no splitting supported + } +} diff --git a/datafusion/datasource/src/morsel/mod.rs b/datafusion/datasource/src/morsel/mod.rs new file mode 100644 index 0000000000000..14b9f31d34e95 --- /dev/null +++ b/datafusion/datasource/src/morsel/mod.rs @@ -0,0 +1,154 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Structures for Morsel Driven IO +//! +//! Morsel Driven IO is a technique for parallelizing the reading of large files +//! by dividing them into smaller "morsels" that can be processed independently. +//! It is inspired by the paper [Morsel-Driven Parallelism: A NUMA-Aware Query +//! Evaluation Framework for the Many-Core Age](https://db.in.tum.de/~leis/papers/morsels.pdf) + +mod adapters; + +use crate::PartitionedFile; +use arrow::array::RecordBatch; +use datafusion_common::error::Result; +use futures::future::BoxFuture; +use futures::stream::BoxStream; +use std::fmt::Debug; + +pub use adapters::FileOpenerMorselizer; + +/// A Morsel of work ready to resolve to a stream of [`RecordBatch`]es +/// +/// This represents a single morsel of work that is ready to be processed. It +/// has all data necessary (does not need any I/O) and is ready to be turned +/// into a stream of RecordBatches for processing by the execution engine. +pub trait Morsel: Send + Debug { + /// Consume this morsel and produce a stream of RecordBatches for processing. + /// + /// This should not do any IO work, such as reading from the file. + fn into_stream(self: Box) -> BoxStream<'static, Result>; + + /// If supported, split this morsel into smaller morsels. + /// + /// If not possible or not supported, return an empty Vector. + /// + /// This is used for dynamic load balancing of work where there are some + /// tasks that have nothing else scheduled. + fn split(&mut self) -> Result>>; +} + +/// A Morselizer takes a single PartitionedFile and breaks it down into smaller chunks +/// that can be planned and read in parallel by the execution engine. This is the entry point for +/// morsel driven IO. +pub trait Morselizer: Send + Sync + Debug { + /// Return MorselPlanners for this file. + /// + /// Each MorselPlanner is responsible for I/O and planning morsels for a + /// single scan of the file. Returning multiple MorselPlanners allows for + /// multiple concurrent scans of the same file. + /// + /// This may involve CPU work, such as parsing parquet metadata and evaluating pruning predicates. + /// It should NOT do any IO work, such as reading from the file. If IO is required, it should + /// return a future that the caller can poll to drive the IO work to completion, and once the future + /// is complete, the caller can call `morselize` again to get the next morsels. + fn morselize(&self, file: PartitionedFile) -> Result>>; +} + +/// A Morsel Planner is responsible for creating morsels for a given scan. +/// +/// The MorselPlanner is the unit of I/O -- there is only ever a single IO +/// outstanding for a specific MorselPlanner. DataFusion will potentially run +/// multiple MorselPlanners in parallel which corresponds to multiple parallel +/// I/O requests. +/// +/// It is not a Rust `Stream` so that it can explicitly separate CPU bound +/// work from IO work. 
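To make the `Morsel` contract above concrete, here is a hypothetical morsel whose data is already resident in memory, so `into_stream` needs no further I/O and `split` declines to split (the name is an illustration, not part of this patch):

use arrow::array::RecordBatch;
use datafusion_common::Result;
use datafusion_datasource::morsel::Morsel;
use futures::StreamExt;
use futures::stream::BoxStream;

/// A morsel over batches that are already in memory.
#[derive(Debug)]
struct InMemoryMorsel {
    batches: Vec<RecordBatch>,
}

impl Morsel for InMemoryMorsel {
    fn into_stream(self: Box<Self>) -> BoxStream<'static, Result<RecordBatch>> {
        futures::stream::iter(self.batches.into_iter().map(Ok)).boxed()
    }

    fn split(&mut self) -> Result<Vec<Box<dyn Morsel>>> {
        // declining to split is always a valid answer
        Ok(vec![])
    }
}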
+/// +/// The design is similar to `ParquetPushDecoder` -- when `plan` is called, it +/// should do CPU work to produce the next morsels or discover the next I/O +/// phase. +/// +/// Best practice is to spawn IO in a tokio Task in a separate IO runtime to +/// ensure that CPU work doesn't block/slowdown IO work, but this is not +/// strictly required by the API. +pub trait MorselPlanner: Send + Debug { + /// Attempt to plan morsels. This may involve CPU work, such as parsing + /// parquet metadata and evaluating pruning predicates. + /// + /// It should NOT do any IO work, such as reading from the file. If IO is + /// required, it should return a future that the caller can poll to drive + /// the IO work to completion, and once the future is complete, the caller + /// can call `plan` again to get the next morsels. + /// + /// Note this function is not async to make it clear explicitly that if IO + /// is required, it should be done in the returned `io_future`. + /// + /// Returns None if the MorselPlanner has no more work to do (is done). + /// + /// It may return Some(..) with an empty MorselPlan, which means it is ready + /// for more CPU work and should be called again. + fn plan(&mut self) -> Result>; +} + +/// Return result of [`MorselPlanner::plan`] +#[derive(Default)] +pub struct MorselPlan { + /// Any Morsels that are ready for processing. + morsels: Vec>, + /// A future that will drive any IO work to completion + /// + /// DataFusion will poll this future occasionally to drive the IO work to + /// completion. Once the future resolves, DataFusion will call `plan` again + /// to get the next morsels. Best practice is to run this in a task on a + /// separate IO runtime to ensure that CPU work is not blocked by IO work, + /// but this is not strictly required by the API. + io_future: Option>>, +} + +impl MorselPlan { + pub fn new() -> Self { + Default::default() + } + + pub fn with_morsels(mut self, morsels: Vec>) -> Self { + self.morsels = morsels; + self + } + + pub fn with_io_future(mut self, io_future: BoxFuture<'static, Result<()>>) -> Self { + self.io_future = Some(io_future); + self + } + + pub fn take_io_future(&mut self) -> Option>> { + self.io_future.take() + } + + pub fn set_io_future(&mut self, io_future: BoxFuture<'static, Result<()>>) { + self.io_future = Some(io_future); + } + + pub fn take_morsels(&mut self) -> Vec> { + std::mem::take(&mut self.morsels) + } + + pub fn has_io_future(&self) -> bool { + self.io_future.is_some() + } +}
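To make the `plan` / `MorselPlan` handshake concrete, here is a hypothetical two-phase planner in the spirit of `FileOpenFutureMorselPlanner` above: the first `plan` call hands back only an I/O future, the next call emits one ready morsel, and the call after that returns `None`. `fetch_batches`, `load_into`, and the reuse of the `InMemoryMorsel` sketch above are assumptions for illustration, not part of this patch.

use std::sync::{Arc, Mutex};

use arrow::array::RecordBatch;
use datafusion_common::Result;
use datafusion_datasource::morsel::{Morsel, MorselPlan, MorselPlanner};
use futures::FutureExt;

#[derive(Debug)]
struct TwoPhasePlanner {
    /// The key to load; taken on the first `plan` call
    key: Option<String>,
    /// Filled in by the I/O future, drained by the following `plan` call
    loaded: Arc<Mutex<Option<Vec<RecordBatch>>>>,
}

impl MorselPlanner for TwoPhasePlanner {
    fn plan(&mut self) -> Result<Option<MorselPlan>> {
        // Phase 1: request I/O; `plan` itself must not block on the read
        if let Some(key) = self.key.take() {
            let slot = Arc::clone(&self.loaded);
            return Ok(Some(
                MorselPlan::new().with_io_future(load_into(key, slot).boxed()),
            ));
        }
        // Phase 2: the I/O future has resolved; emit one ready morsel
        if let Some(batches) = self.loaded.lock().unwrap().take() {
            let morsel: Box<dyn Morsel> = Box::new(InMemoryMorsel { batches });
            return Ok(Some(MorselPlan::new().with_morsels(vec![morsel])));
        }
        // Phase 3: all work has been handed out
        Ok(None)
    }
}

/// Hypothetical I/O phase: read and decode the data, then park it for `plan`
async fn load_into(
    key: String,
    slot: Arc<Mutex<Option<Vec<RecordBatch>>>>,
) -> Result<()> {
    let batches = fetch_batches(&key).await?;
    *slot.lock().unwrap() = Some(batches);
    Ok(())
}

/// Hypothetical async read (e.g. an object-store get plus decode)
async fn fetch_batches(_key: &str) -> Result<Vec<RecordBatch>> {
    Ok(vec![])
}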