Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
270 changes: 125 additions & 145 deletions native/Cargo.lock

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions native/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ arrow = { version = "58.3.0", features = ["prettyprint", "ffi", "chrono-tz"] }
async-trait = { version = "0.1" }
bytes = { version = "1.11.1" }
parquet = { version = "58.3.0", default-features = false, features = ["experimental"] }
datafusion = { version = "53.1.0", default-features = false, features = ["unicode_expressions", "crypto_expressions", "nested_expressions", "parquet"] }
datafusion-datasource = { version = "53.1.0" }
datafusion-physical-expr-adapter = { version = "53.1.0" }
datafusion-spark = { version = "53.1.0", features = ["core"] }
datafusion = { git = "https://github.com/apache/datafusion", branch = "branch-54", default-features = false, features = ["unicode_expressions", "crypto_expressions", "nested_expressions", "parquet"] }
datafusion-datasource = { git = "https://github.com/apache/datafusion", branch = "branch-54" }
datafusion-physical-expr-adapter = { git = "https://github.com/apache/datafusion", branch = "branch-54" }
datafusion-spark = { git = "https://github.com/apache/datafusion", branch = "branch-54", features = ["core"] }
datafusion-comet-spark-expr = { path = "spark-expr" }
datafusion-comet-common = { path = "common" }
datafusion-comet-jni-bridge = { path = "jni-bridge" }
Expand Down
2 changes: 1 addition & 1 deletion native/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ jni = { version = "0.22.4", features = ["invocation"] }
lazy_static = "1.4"
assertables = "9"
hex = "0.4.3"
datafusion-functions-nested = { version = "53.1.0" }
datafusion-functions-nested = { git = "https://github.com/apache/datafusion", branch = "branch-54" }

[features]
backtrace = ["datafusion/backtrace"]
Expand Down
15 changes: 8 additions & 7 deletions native/core/src/debug/debug_batch_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
// specific language governing permissions and limitations
// under the License.

use std::any::Any;
use std::fmt;
use std::hash::{Hash, Hasher};
use std::sync::Arc;
Expand Down Expand Up @@ -82,15 +81,20 @@ impl datafusion::physical_plan::ExecutionPlan for DebugExecutionDataStream {
fn name(&self) -> &str {
"DebugExecutionDataStream"
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn properties(&self) -> &Arc<datafusion::physical_plan::PlanProperties> {
self.inner.properties()
}
fn children(&self) -> Vec<&Arc<dyn datafusion::physical_plan::ExecutionPlan>> {
vec![&self.inner]
}
fn apply_expressions(
&self,
_f: &mut dyn FnMut(
&dyn PhysicalExpr,
) -> Result<datafusion::common::tree_node::TreeNodeRecursion>,
) -> Result<datafusion::common::tree_node::TreeNodeRecursion> {
Ok(datafusion::common::tree_node::TreeNodeRecursion::Continue)
}
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn datafusion::physical_plan::ExecutionPlan>>,
Expand Down Expand Up @@ -155,9 +159,6 @@ impl Hash for DebugExecutionDataPhyExpr {
}

impl PhysicalExpr for DebugExecutionDataPhyExpr {
fn as_any(&self) -> &dyn Any {
self
}
fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
self.inner.data_type(input_schema)
}
Expand Down
4 changes: 0 additions & 4 deletions native/core/src/execution/expressions/arithmetic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,6 @@ impl Hash for CheckedBinaryExpr {
}

impl PhysicalExpr for CheckedBinaryExpr {
fn as_any(&self) -> &dyn Any {
self
}

fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
self.child.fmt_sql(f)
}
Expand Down
5 changes: 0 additions & 5 deletions native/core/src/execution/expressions/list_positions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
// specific language governing permissions and limitations
// under the License.

use std::any::Any;
use std::fmt::{Display, Formatter};
use std::hash::{Hash, Hasher};
use std::sync::Arc;
Expand Down Expand Up @@ -68,10 +67,6 @@ impl Hash for ListPositionsExpr {
}

impl PhysicalExpr for ListPositionsExpr {
fn as_any(&self) -> &dyn Any {
self
}

fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
Display::fmt(self, f)
}
Expand Down
2 changes: 1 addition & 1 deletion native/core/src/execution/expressions/strings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl ExpressionBuilder for RlikeBuilder {
let left = planner.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?;
let right = planner.create_expr(expr.right.as_ref().unwrap(), input_schema)?;

match right.as_any().downcast_ref::<Literal>().unwrap().value() {
match right.downcast_ref::<Literal>().unwrap().value() {
ScalarValue::Utf8(Some(pattern)) => Ok(Arc::new(RLike::try_new(left, pattern)?)),
_ => Err(ExecutionError::GeneralError(
"RLike only supports scalar patterns".to_string(),
Expand Down
5 changes: 0 additions & 5 deletions native/core/src/execution/expressions/subquery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use jni::{
sys::{jboolean, jbyte, jint, jlong, jshort},
};
use std::{
any::Any,
fmt::{Display, Formatter},
hash::Hash,
sync::Arc,
Expand Down Expand Up @@ -63,10 +62,6 @@ impl Display for Subquery {
}

impl PhysicalExpr for Subquery {
fn as_any(&self) -> &dyn Any {
self
}

fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
Display::fmt(self, f)
}
Expand Down
12 changes: 11 additions & 1 deletion native/core/src/execution/memory_pools/fair_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use std::{
fmt::{Debug, Formatter, Result as FmtResult},
fmt::{Debug, Display, Formatter, Result as FmtResult},
sync::Arc,
};

Expand Down Expand Up @@ -83,10 +83,20 @@ impl CometFairMemoryPool {
}
}

impl Display for CometFairMemoryPool {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
write!(f, "CometFairMemoryPool")
}
}

unsafe impl Send for CometFairMemoryPool {}
unsafe impl Sync for CometFairMemoryPool {}

impl MemoryPool for CometFairMemoryPool {
fn name(&self) -> &str {
"CometFairMemoryPool"
}

fn register(&self, _: &MemoryConsumer) {
let mut state = self.state.lock();
state.num = state
Expand Down
11 changes: 11 additions & 0 deletions native/core/src/execution/memory_pools/logging_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use datafusion::execution::memory_pool::{
MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation,
};
use log::{info, warn};
use std::fmt::{self, Display};
use std::sync::Arc;

#[derive(Debug)]
Expand All @@ -36,7 +37,17 @@ impl LoggingMemoryPool {
}
}

impl Display for LoggingMemoryPool {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "LoggingMemoryPool(task={})", self.task_attempt_id)
}
}

impl MemoryPool for LoggingMemoryPool {
fn name(&self) -> &str {
"LoggingMemoryPool"
}

fn register(&self, consumer: &MemoryConsumer) {
info!(
"[Task {}] MemoryPool[{}].register()",
Expand Down
12 changes: 11 additions & 1 deletion native/core/src/execution/memory_pools/unified_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use std::{
fmt::{Debug, Formatter, Result as FmtResult},
fmt::{Debug, Display, Formatter, Result as FmtResult},
sync::{
atomic::{AtomicUsize, Ordering::Relaxed},
Arc,
Expand Down Expand Up @@ -93,7 +93,17 @@ impl Drop for CometUnifiedMemoryPool {
unsafe impl Send for CometUnifiedMemoryPool {}
unsafe impl Sync for CometUnifiedMemoryPool {}

impl Display for CometUnifiedMemoryPool {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
write!(f, "CometUnifiedMemoryPool(task={})", self.task_attempt_id)
}
}

impl MemoryPool for CometUnifiedMemoryPool {
fn name(&self) -> &str {
"CometUnifiedMemoryPool"
}

fn grow(&self, reservation: &MemoryReservation, additional: usize) {
self.try_grow(reservation, additional).unwrap();
}
Expand Down
5 changes: 0 additions & 5 deletions native/core/src/execution/merge_as_partial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
//! outputs state) but redirects `update_batch` calls to `merge_batch`, giving merge
//! semantics with state output.

use std::any::Any;
use std::fmt::Debug;
use std::hash::{Hash, Hasher};

Expand Down Expand Up @@ -100,10 +99,6 @@ impl MergeAsPartialUDF {
}

impl AggregateUDFImpl for MergeAsPartialUDF {
fn as_any(&self) -> &dyn Any {
self
}

fn name(&self) -> &str {
&self.name
}
Expand Down
19 changes: 14 additions & 5 deletions native/core/src/execution/operators/expand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use arrow::array::{RecordBatch, RecordBatchOptions};
use arrow::datatypes::SchemaRef;
use datafusion::common::tree_node::TreeNodeRecursion;
use datafusion::common::DataFusionError;
use datafusion::physical_expr::{EquivalenceProperties, PhysicalExpr};
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
Expand All @@ -29,7 +30,6 @@ use datafusion::{
};
use futures::{Stream, StreamExt};
use std::{
any::Any,
pin::Pin,
sync::Arc,
task::{Context, Poll},
Expand Down Expand Up @@ -91,10 +91,6 @@ impl DisplayAs for ExpandExec {
}

impl ExecutionPlan for ExpandExec {
fn as_any(&self) -> &dyn Any {
self
}

fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
Expand All @@ -103,6 +99,19 @@ impl ExecutionPlan for ExpandExec {
vec![&self.child]
}

fn apply_expressions(
&self,
f: &mut dyn FnMut(&dyn PhysicalExpr) -> datafusion::common::Result<TreeNodeRecursion>,
) -> datafusion::common::Result<TreeNodeRecursion> {
let mut tnr = TreeNodeRecursion::Continue;
for projection in &self.projections {
for expr in projection {
tnr = tnr.visit_sibling(|| f(expr.as_ref()))?;
}
}
Ok(tnr)
}

fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
Expand Down
13 changes: 8 additions & 5 deletions native/core/src/execution/operators/iceberg_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

//! Native Iceberg table scan operator using iceberg-rust

use std::any::Any;
use std::collections::HashMap;
use std::fmt;
use std::pin::Pin;
Expand All @@ -26,6 +25,7 @@ use std::task::{Context, Poll};

use arrow::array::{ArrayRef, RecordBatch, RecordBatchOptions};
use arrow::datatypes::SchemaRef;
use datafusion::common::tree_node::TreeNodeRecursion;
use datafusion::common::{DataFusionError, Result as DFResult};
use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
use datafusion::physical_expr::expressions::Column;
Expand Down Expand Up @@ -109,10 +109,6 @@ impl ExecutionPlan for IcebergScanExec {
"IcebergScanExec"
}

fn as_any(&self) -> &dyn Any {
self
}

fn schema(&self) -> SchemaRef {
Arc::clone(&self.output_schema)
}
Expand All @@ -125,6 +121,13 @@ impl ExecutionPlan for IcebergScanExec {
vec![]
}

fn apply_expressions(
&self,
_f: &mut dyn FnMut(&dyn PhysicalExpr) -> DFResult<TreeNodeRecursion>,
) -> DFResult<TreeNodeRecursion> {
Ok(TreeNodeRecursion::Continue)
}

fn with_new_children(
self: Arc<Self>,
_children: Vec<Arc<dyn ExecutionPlan>>,
Expand Down
15 changes: 9 additions & 6 deletions native/core/src/execution/operators/parquet_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
//! Parquet writer operator for writing RecordBatches to Parquet files

use std::{
any::Any,
collections::HashMap,
fmt,
fmt::{Debug, Formatter},
Expand All @@ -39,9 +38,10 @@ use arrow::datatypes::{Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use datafusion::{
common::tree_node::TreeNodeRecursion,
error::{DataFusionError, Result},
execution::context::TaskContext,
physical_expr::EquivalenceProperties,
physical_expr::{EquivalenceProperties, PhysicalExpr},
physical_plan::{
execution_plan::{Boundedness, EmissionType},
metrics::{ExecutionPlanMetricsSet, MetricsSet},
Expand Down Expand Up @@ -404,10 +404,6 @@ impl DisplayAs for ParquetWriterExec {

#[async_trait]
impl ExecutionPlan for ParquetWriterExec {
fn as_any(&self) -> &dyn Any {
self
}

fn name(&self) -> &str {
"ParquetWriterExec"
}
Expand All @@ -428,6 +424,13 @@ impl ExecutionPlan for ParquetWriterExec {
vec![&self.input]
}

fn apply_expressions(
&self,
_f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result<TreeNodeRecursion>,
) -> Result<TreeNodeRecursion> {
Ok(TreeNodeRecursion::Continue)
}

fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
Expand Down
13 changes: 8 additions & 5 deletions native/core/src/execution/operators/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use arrow::compute::{cast_with_options, take, CastOptions};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::ffi::FFI_ArrowArray;
use arrow::ffi::FFI_ArrowSchema;
use datafusion::common::tree_node::TreeNodeRecursion;
use datafusion::common::{arrow_datafusion_err, DataFusionError, Result as DataFusionResult};
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::metrics::{
Expand All @@ -43,7 +44,6 @@ use itertools::Itertools;
use jni::objects::{Global, JObject, JValue};
use std::rc::Rc;
use std::{
any::Any,
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll},
Expand Down Expand Up @@ -383,10 +383,6 @@ fn schema_from_data_types(data_types: &[DataType]) -> SchemaRef {
}

impl ExecutionPlan for ScanExec {
fn as_any(&self) -> &dyn Any {
self
}

fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
Expand All @@ -395,6 +391,13 @@ impl ExecutionPlan for ScanExec {
vec![]
}

fn apply_expressions(
&self,
_f: &mut dyn FnMut(&dyn PhysicalExpr) -> DataFusionResult<TreeNodeRecursion>,
) -> DataFusionResult<TreeNodeRecursion> {
Ok(TreeNodeRecursion::Continue)
}

fn with_new_children(
self: Arc<Self>,
_: Vec<Arc<dyn ExecutionPlan>>,
Expand Down
Loading
Loading