Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
689 changes: 441 additions & 248 deletions native/Cargo.lock

Large diffs are not rendered by default.

14 changes: 7 additions & 7 deletions native/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@ edition = "2021"
rust-version = "1.88"

[workspace.dependencies]
arrow = { version = "57.3.0", features = ["prettyprint", "ffi", "chrono-tz"] }
arrow = { version = "58.0.0", features = ["prettyprint", "ffi", "chrono-tz"] }
async-trait = { version = "0.1" }
bytes = { version = "1.11.1" }
parquet = { version = "57.3.0", default-features = false, features = ["experimental"] }
datafusion = { version = "52.2.0", default-features = false, features = ["unicode_expressions", "crypto_expressions", "nested_expressions", "parquet"] }
datafusion-datasource = { version = "52.2.0" }
datafusion-physical-expr-adapter = { version = "52.2.0" }
datafusion-spark = { version = "52.2.0" }
parquet = { version = "58.0.0", default-features = false, features = ["experimental"] }
datafusion = { git = "https://github.com/apache/datafusion", branch = "branch-53", default-features = false, features = ["unicode_expressions", "crypto_expressions", "nested_expressions", "parquet"] }
datafusion-datasource = { git = "https://github.com/apache/datafusion", branch = "branch-53" }
datafusion-physical-expr-adapter = { git = "https://github.com/apache/datafusion", branch = "branch-53" }
datafusion-spark = { git = "https://github.com/apache/datafusion", branch = "branch-53" }
datafusion-comet-spark-expr = { path = "spark-expr" }
datafusion-comet-proto = { path = "proto" }
chrono = { version = "0.4", default-features = false, features = ["clock"] }
Expand All @@ -51,7 +51,7 @@ num = "0.4"
rand = "0.10"
regex = "1.12.3"
thiserror = "2"
object_store = { version = "0.12.3", features = ["gcp", "azure", "aws", "http"] }
object_store = { version = "0.13.1", features = ["gcp", "azure", "aws", "http"] }
url = "2.2"
aws-config = "1.8.14"
aws-credential-types = "1.2.13"
Expand Down
2 changes: 1 addition & 1 deletion native/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ jni = { version = "0.21", features = ["invocation"] }
lazy_static = "1.4"
assertables = "9"
hex = "0.4.3"
datafusion-functions-nested = { version = "52.2.0" }
datafusion-functions-nested = { git = "https://github.com/apache/datafusion", branch = "branch-53" }

[features]
backtrace = ["datafusion/backtrace"]
Expand Down
8 changes: 4 additions & 4 deletions native/core/src/execution/operators/expand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub struct ExpandExec {
projections: Vec<Vec<Arc<dyn PhysicalExpr>>>,
child: Arc<dyn ExecutionPlan>,
schema: SchemaRef,
cache: PlanProperties,
cache: Arc<PlanProperties>,
}

impl ExpandExec {
Expand All @@ -52,12 +52,12 @@ impl ExpandExec {
child: Arc<dyn ExecutionPlan>,
schema: SchemaRef,
) -> Self {
let cache = PlanProperties::new(
let cache = Arc::new(PlanProperties::new(
EquivalenceProperties::new(Arc::clone(&schema)),
Partitioning::UnknownPartitioning(1),
EmissionType::Final,
Boundedness::Bounded,
);
));

Self {
projections,
Expand Down Expand Up @@ -129,7 +129,7 @@ impl ExecutionPlan for ExpandExec {
Ok(Box::pin(expand_stream))
}

fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}

Expand Down
18 changes: 12 additions & 6 deletions native/core/src/execution/operators/iceberg_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ pub struct IcebergScanExec {
/// Output schema after projection
output_schema: SchemaRef,
/// Cached execution plan properties
plan_properties: PlanProperties,
plan_properties: Arc<PlanProperties>,
/// Catalog-specific configuration for FileIO
catalog_properties: HashMap<String, String>,
/// Pre-planned file scan tasks
Expand Down Expand Up @@ -92,13 +92,13 @@ impl IcebergScanExec {
})
}

fn compute_properties(schema: SchemaRef, num_partitions: usize) -> PlanProperties {
PlanProperties::new(
fn compute_properties(schema: SchemaRef, num_partitions: usize) -> Arc<PlanProperties> {
Arc::new(PlanProperties::new(
EquivalenceProperties::new(schema),
Partitioning::UnknownPartitioning(num_partitions),
EmissionType::Incremental,
Boundedness::Bounded,
)
))
}
}

Expand All @@ -115,7 +115,7 @@ impl ExecutionPlan for IcebergScanExec {
Arc::clone(&self.output_schema)
}

fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
&self.plan_properties
}

Expand Down Expand Up @@ -269,7 +269,13 @@ where
_ => {
let adapter = self
.adapter_factory
.create(Arc::clone(&self.schema), Arc::clone(&file_schema));
.create(Arc::clone(&self.schema), Arc::clone(&file_schema))
.map_err(|e| {
DataFusionError::Execution(format!(
"Failed to create physical expr adapter: {}",
e
))
})?;
let exprs =
build_projection_expressions(&self.schema, &adapter).map_err(|e| {
DataFusionError::Execution(format!(
Expand Down
12 changes: 4 additions & 8 deletions native/core/src/execution/operators/parquet_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ pub struct ParquetWriterExec {
/// Metrics
metrics: ExecutionPlanMetricsSet,
/// Cache for plan properties
cache: PlanProperties,
cache: Arc<PlanProperties>,
}

impl ParquetWriterExec {
Expand All @@ -228,12 +228,12 @@ impl ParquetWriterExec {
// Preserve the input's partitioning so each partition writes its own file
let input_partitioning = input.output_partitioning().clone();

let cache = PlanProperties::new(
let cache = Arc::new(PlanProperties::new(
EquivalenceProperties::new(Arc::clone(&input.schema())),
input_partitioning,
EmissionType::Final,
Boundedness::Bounded,
);
));

Ok(ParquetWriterExec {
input,
Expand Down Expand Up @@ -405,11 +405,7 @@ impl ExecutionPlan for ParquetWriterExec {
Some(self.metrics.clone_inner())
}

fn statistics(&self) -> Result<Statistics> {
self.input.partition_statistics(None)
}

fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}

Expand Down
8 changes: 4 additions & 4 deletions native/core/src/execution/operators/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ pub struct ScanExec {
/// It is also used in unit test to mock the input data from JVM.
pub batch: Arc<Mutex<Option<InputBatch>>>,
/// Cache of expensive-to-compute plan properties
cache: PlanProperties,
cache: Arc<PlanProperties>,
/// Metrics collector
metrics: ExecutionPlanMetricsSet,
/// Baseline metrics
Expand All @@ -95,14 +95,14 @@ impl ScanExec {
// Build schema directly from data types since get_next now always unpacks dictionaries
let schema = schema_from_data_types(&data_types);

let cache = PlanProperties::new(
let cache = Arc::new(PlanProperties::new(
EquivalenceProperties::new(Arc::clone(&schema)),
// The partitioning is not important because we are not using DataFusion's
// query planner or optimizer
Partitioning::UnknownPartitioning(1),
EmissionType::Final,
Boundedness::Bounded,
);
));

Ok(Self {
exec_context_id,
Expand Down Expand Up @@ -417,7 +417,7 @@ impl ExecutionPlan for ScanExec {
)))
}

fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}

Expand Down
1 change: 1 addition & 0 deletions native/core/src/execution/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1577,6 +1577,7 @@ impl PhysicalPlanner {
// null doesn't equal to null in Spark join key. If the join key is
// `EqualNullSafe`, Spark will rewrite it during planning.
NullEquality::NullEqualsNothing,
false,
)?);

// If the hash join is build right, we need to swap the left and right
Expand Down
12 changes: 4 additions & 8 deletions native/core/src/execution/shuffle/shuffle_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub struct ShuffleWriterExec {
/// Metrics
metrics: ExecutionPlanMetricsSet,
/// Cache for expensive-to-compute plan properties
cache: PlanProperties,
cache: Arc<PlanProperties>,
/// The compression codec to use when compressing shuffle blocks
codec: CompressionCodec,
tracing_enabled: bool,
Expand All @@ -82,12 +82,12 @@ impl ShuffleWriterExec {
tracing_enabled: bool,
write_buffer_size: usize,
) -> Result<Self> {
let cache = PlanProperties::new(
let cache = Arc::new(PlanProperties::new(
EquivalenceProperties::new(Arc::clone(&input.schema())),
Partitioning::UnknownPartitioning(1),
EmissionType::Final,
Boundedness::Bounded,
);
));

Ok(ShuffleWriterExec {
input,
Expand Down Expand Up @@ -133,11 +133,7 @@ impl ExecutionPlan for ShuffleWriterExec {
Some(self.metrics.clone_inner())
}

fn statistics(&self) -> Result<Statistics> {
self.input.partition_statistics(None)
}

fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}

Expand Down
8 changes: 4 additions & 4 deletions native/core/src/parquet/schema_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl PhysicalExprAdapterFactory for SparkPhysicalExprAdapterFactory {
&self,
logical_file_schema: SchemaRef,
physical_file_schema: SchemaRef,
) -> Arc<dyn PhysicalExprAdapter> {
) -> DataFusionResult<Arc<dyn PhysicalExprAdapter>> {
// When case-insensitive, remap physical schema field names to match logical
// field names. The DefaultPhysicalExprAdapter uses exact name matching, so
// without this remapping, columns like "a" won't match logical "A" and will
Expand Down Expand Up @@ -145,16 +145,16 @@ impl PhysicalExprAdapterFactory for SparkPhysicalExprAdapterFactory {
let default_adapter = default_factory.create(
Arc::clone(&logical_file_schema),
Arc::clone(&adapted_physical_schema),
);
)?;

Arc::new(SparkPhysicalExprAdapter {
Ok(Arc::new(SparkPhysicalExprAdapter {
logical_file_schema,
physical_file_schema: adapted_physical_schema,
parquet_options: self.parquet_options.clone(),
default_values: self.default_values.clone(),
default_adapter,
logical_to_physical_names,
})
}))
}
}

Expand Down
Loading