Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions datafusion/datasource-parquet/src/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub mod access_plan;
pub mod file_format;
pub mod metadata;
mod metrics;
mod morsel;
mod opener;
mod page_filter;
mod reader;
Expand All @@ -37,6 +38,7 @@ mod writer;
pub use access_plan::{ParquetAccessPlan, RowGroupAccess};
pub use file_format::*;
pub use metrics::ParquetFileMetrics;
pub use morsel::ParquetMorselizer;
pub use page_filter::PagePruningAccessPlanFilter;
pub use reader::*; // Expose so downstream crates can use it
pub use row_filter::build_row_filter;
Expand Down
2,482 changes: 2,482 additions & 0 deletions datafusion/datasource-parquet/src/morsel.rs

Large diffs are not rendered by default.

73 changes: 73 additions & 0 deletions datafusion/datasource-parquet/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ use std::sync::Arc;

use crate::DefaultParquetFileReaderFactory;
use crate::ParquetFileReaderFactory;
use crate::ParquetMorselizer;
use crate::morsel::{EncryptionContext, ParquetMorselizerState};
use crate::opener::ParquetOpener;
use crate::opener::build_pruning_predicates;
use crate::row_filter::can_expr_be_pushed_down_with_schemas;
Expand All @@ -31,6 +33,7 @@ use datafusion_common::config::ConfigOptions;
use datafusion_common::config::EncryptionFactoryOptions;
use datafusion_datasource::as_file_source;
use datafusion_datasource::file_stream::FileOpener;
use datafusion_datasource::morsel::Morselizer;

use arrow::datatypes::TimeUnit;
use datafusion_common::DataFusionError;
Expand Down Expand Up @@ -573,6 +576,76 @@ impl FileSource for ParquetSource {
Ok(opener)
}

fn create_morselizer(
&self,
object_store: Arc<dyn ObjectStore>,
base_config: &FileScanConfig,
partition: usize,
) -> datafusion_common::Result<Box<dyn Morselizer>> {
let expr_adapter_factory = base_config
.expr_adapter_factory
.clone()
.unwrap_or_else(|| Arc::new(DefaultPhysicalExprAdapterFactory) as _);

let parquet_file_reader_factory =
self.parquet_file_reader_factory.clone().unwrap_or_else(|| {
Arc::new(DefaultParquetFileReaderFactory::new(object_store)) as _
});

#[cfg(not(feature = "parquet_encryption"))]
let encryption_context = EncryptionContext::default();

#[cfg(feature = "parquet_encryption")]
let encryption_context = {
let file_decryption_properties = self
.table_parquet_options()
.crypto
.file_decryption
.clone()
.map(FileDecryptionProperties::from)
.map(Arc::new);

EncryptionContext::new(
file_decryption_properties,
self.get_encryption_factory_with_config(),
)
};

let coerce_int96 = self
.table_parquet_options
.global
.coerce_int96
.as_ref()
.map(|time_unit| parse_coerce_int96_string(time_unit.as_str()).unwrap());

let morselizer = ParquetMorselizer::new(ParquetMorselizerState {
partition_index: partition,
projection: self.projection.clone(),
batch_size: self
.batch_size
.expect("Batch size must set before creating ParquetMorselizer"),
limit: base_config.limit,
preserve_order: base_config.preserve_order,
predicate: self.predicate.clone(),
table_schema: self.table_schema.clone(),
metadata_size_hint: self.metadata_size_hint,
metrics: self.metrics().clone(),
parquet_file_reader_factory,
pushdown_filters: self.pushdown_filters(),
reorder_filters: self.reorder_filters(),
force_filter_selections: self.force_filter_selections(),
enable_page_index: self.enable_page_index(),
enable_bloom_filter: self.bloom_filter_on_read(),
enable_row_group_stats_pruning: self.table_parquet_options.global.pruning,
coerce_int96,
expr_adapter_factory,
encryption_context,
max_predicate_cache_size: self.max_predicate_cache_size(),
reverse_row_groups: self.reverse_row_groups,
});
Ok(Box::new(morselizer))
}

fn as_any(&self) -> &dyn Any {
self
}
Expand Down
16 changes: 16 additions & 0 deletions datafusion/datasource/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::sync::Arc;
use crate::file_groups::FileGroupPartitioner;
use crate::file_scan_config::FileScanConfig;
use crate::file_stream::FileOpener;
use crate::morsel::{FileOpenerMorselizer, Morselizer};
#[expect(deprecated)]
use crate::schema_adapter::SchemaAdapterFactory;
use datafusion_common::config::ConfigOptions;
Expand Down Expand Up @@ -70,6 +71,21 @@ pub trait FileSource: Send + Sync {
base_config: &FileScanConfig,
partition: usize,
) -> Result<Arc<dyn FileOpener>>;

/// Creates a `dyn Morselizer` based on given parameters.
///
/// The default implementation preserves existing behavior by adapting the
/// legacy [`FileOpener`] API into a [`Morselizer`]. File formats with a
/// native morsel-driven implementation should override this method.
fn create_morselizer(
&self,
object_store: Arc<dyn ObjectStore>,
base_config: &FileScanConfig,
partition: usize,
) -> Result<Box<dyn Morselizer>> {
let opener = self.create_file_opener(object_store, base_config, partition)?;
Ok(Box::new(FileOpenerMorselizer::new(opener)))
}
/// Any
fn as_any(&self) -> &dyn Any;

Expand Down
9 changes: 7 additions & 2 deletions datafusion/datasource/src/file_scan_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -586,9 +586,14 @@ impl DataSource for FileScanConfig {

let source = self.file_source.with_batch_size(batch_size);

let opener = source.create_file_opener(object_store, self, partition)?;
let morselizer = source.create_morselizer(object_store, self, partition)?;

let stream = FileStream::new(self, partition, opener, source.metrics())?;
let stream = FileStream::new_with_morselizer(
self,
partition,
morselizer,
source.metrics(),
)?;
Ok(Box::pin(cooperative(stream)))
}

Expand Down
Loading
Loading