diff --git a/datafusion-examples/examples/custom_data_source/custom_datasource.rs b/datafusion-examples/examples/custom_data_source/custom_datasource.rs index b276ae32cf247..7abb39e1a7130 100644 --- a/datafusion-examples/examples/custom_data_source/custom_datasource.rs +++ b/datafusion-examples/examples/custom_data_source/custom_datasource.rs @@ -192,7 +192,7 @@ impl TableProvider for CustomDataSource { struct CustomExec { db: CustomDataSource, projected_schema: SchemaRef, - cache: PlanProperties, + cache: Arc, } impl CustomExec { @@ -207,7 +207,7 @@ impl CustomExec { Self { db, projected_schema, - cache, + cache: Arc::new(cache), } } @@ -238,7 +238,7 @@ impl ExecutionPlan for CustomExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion-examples/examples/execution_monitoring/memory_pool_execution_plan.rs b/datafusion-examples/examples/execution_monitoring/memory_pool_execution_plan.rs index 48475acbb1542..6f377ea1ce3b9 100644 --- a/datafusion-examples/examples/execution_monitoring/memory_pool_execution_plan.rs +++ b/datafusion-examples/examples/execution_monitoring/memory_pool_execution_plan.rs @@ -199,7 +199,7 @@ impl ExternalBatchBufferer { struct BufferingExecutionPlan { schema: SchemaRef, input: Arc, - properties: PlanProperties, + properties: Arc, } impl BufferingExecutionPlan { @@ -233,7 +233,7 @@ impl ExecutionPlan for BufferingExecutionPlan { self.schema.clone() } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.properties } diff --git a/datafusion-examples/examples/proto/composed_extension_codec.rs b/datafusion-examples/examples/proto/composed_extension_codec.rs index f3910d461b6a8..b4f3d4f098996 100644 --- a/datafusion-examples/examples/proto/composed_extension_codec.rs +++ b/datafusion-examples/examples/proto/composed_extension_codec.rs @@ -106,7 +106,7 @@ impl ExecutionPlan for ParentExec { self } - fn properties(&self) -> &datafusion::physical_plan::PlanProperties { + fn properties(&self) -> &Arc { unreachable!() } @@ -182,7 +182,7 @@ impl ExecutionPlan for ChildExec { self } - fn properties(&self) -> &datafusion::physical_plan::PlanProperties { + fn properties(&self) -> &Arc { unreachable!() } diff --git a/datafusion-examples/examples/relation_planner/table_sample.rs b/datafusion-examples/examples/relation_planner/table_sample.rs index 657432ef31362..895f2fdd4ff3a 100644 --- a/datafusion-examples/examples/relation_planner/table_sample.rs +++ b/datafusion-examples/examples/relation_planner/table_sample.rs @@ -618,7 +618,7 @@ pub struct SampleExec { upper_bound: f64, seed: u64, metrics: ExecutionPlanMetricsSet, - cache: PlanProperties, + cache: Arc, } impl SampleExec { @@ -656,7 +656,7 @@ impl SampleExec { upper_bound, seed, metrics: ExecutionPlanMetricsSet::new(), - cache, + cache: Arc::new(cache), }) } @@ -686,7 +686,7 @@ impl ExecutionPlan for SampleExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/catalog/src/memory/table.rs b/datafusion/catalog/src/memory/table.rs index 7865eb016bee1..484b5f805e547 100644 --- a/datafusion/catalog/src/memory/table.rs +++ b/datafusion/catalog/src/memory/table.rs @@ -549,7 +549,7 @@ fn evaluate_filters_to_mask( struct DmlResultExec { rows_affected: u64, schema: SchemaRef, - properties: PlanProperties, + properties: Arc, } impl DmlResultExec { @@ -570,7 +570,7 @@ impl DmlResultExec { Self { rows_affected, schema, - properties, + properties: Arc::new(properties), } } } @@ -604,7 +604,7 @@ impl ExecutionPlan for DmlResultExec { Arc::clone(&self.schema) } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.properties } diff --git a/datafusion/core/benches/reset_plan_states.rs b/datafusion/core/benches/reset_plan_states.rs index f2f81f755b96e..5afae7f43242d 100644 --- a/datafusion/core/benches/reset_plan_states.rs +++ b/datafusion/core/benches/reset_plan_states.rs @@ -166,6 +166,8 @@ fn run_reset_states(b: &mut criterion::Bencher, plan: &Arc) { /// making an independent instance of the execution plan to re-execute it, avoiding /// re-planning stage. fn bench_reset_plan_states(c: &mut Criterion) { + env_logger::init(); + let rt = Runtime::new().unwrap(); let ctx = SessionContext::new(); ctx.register_table( diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index b1aa850284aee..22f239ce472b2 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -3713,13 +3713,15 @@ mod tests { #[derive(Debug)] struct NoOpExecutionPlan { - cache: PlanProperties, + cache: Arc, } impl NoOpExecutionPlan { fn new(schema: SchemaRef) -> Self { let cache = Self::compute_properties(schema); - Self { cache } + Self { + cache: Arc::new(cache), + } } /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. @@ -3757,7 +3759,7 @@ mod tests { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -3911,7 +3913,7 @@ digraph { fn children(&self) -> Vec<&Arc> { self.0.iter().collect::>() } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { unimplemented!() } fn execute( @@ -3960,7 +3962,7 @@ digraph { fn children(&self) -> Vec<&Arc> { unimplemented!() } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { unimplemented!() } fn execute( @@ -4081,7 +4083,7 @@ digraph { fn children(&self) -> Vec<&Arc> { vec![] } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { unimplemented!() } fn execute( diff --git a/datafusion/core/tests/custom_sources_cases/mod.rs b/datafusion/core/tests/custom_sources_cases/mod.rs index 8453615c2886b..6dd09ebd8832f 100644 --- a/datafusion/core/tests/custom_sources_cases/mod.rs +++ b/datafusion/core/tests/custom_sources_cases/mod.rs @@ -79,7 +79,7 @@ struct CustomTableProvider; #[derive(Debug, Clone)] struct CustomExecutionPlan { projection: Option>, - cache: PlanProperties, + cache: Arc, } impl CustomExecutionPlan { @@ -88,7 +88,10 @@ impl CustomExecutionPlan { let schema = project_schema(&schema, projection.as_ref()).expect("projected schema"); let cache = Self::compute_properties(schema); - Self { projection, cache } + Self { + projection, + cache: Arc::new(cache), + } } /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. @@ -157,7 +160,7 @@ impl ExecutionPlan for CustomExecutionPlan { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs index ca1eaa1f958ea..7a624d0636530 100644 --- a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs +++ b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs @@ -62,13 +62,16 @@ fn create_batch(value: i32, num_rows: usize) -> Result { #[derive(Debug)] struct CustomPlan { batches: Vec, - cache: PlanProperties, + cache: Arc, } impl CustomPlan { fn new(schema: SchemaRef, batches: Vec) -> Self { let cache = Self::compute_properties(schema); - Self { batches, cache } + Self { + batches, + cache: Arc::new(cache), + } } /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. @@ -109,7 +112,7 @@ impl ExecutionPlan for CustomPlan { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/core/tests/custom_sources_cases/statistics.rs b/datafusion/core/tests/custom_sources_cases/statistics.rs index 820c2a470b376..6964d72eacffe 100644 --- a/datafusion/core/tests/custom_sources_cases/statistics.rs +++ b/datafusion/core/tests/custom_sources_cases/statistics.rs @@ -45,7 +45,7 @@ use async_trait::async_trait; struct StatisticsValidation { stats: Statistics, schema: Arc, - cache: PlanProperties, + cache: Arc, } impl StatisticsValidation { @@ -59,7 +59,7 @@ impl StatisticsValidation { Self { stats, schema, - cache, + cache: Arc::new(cache), } } @@ -158,7 +158,7 @@ impl ExecutionPlan for StatisticsValidation { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/core/tests/fuzz_cases/once_exec.rs b/datafusion/core/tests/fuzz_cases/once_exec.rs index 49e2caaa7417c..69edf9be1d825 100644 --- a/datafusion/core/tests/fuzz_cases/once_exec.rs +++ b/datafusion/core/tests/fuzz_cases/once_exec.rs @@ -32,7 +32,7 @@ use std::sync::{Arc, Mutex}; pub struct OnceExec { /// the results to send back stream: Mutex>, - cache: PlanProperties, + cache: Arc, } impl Debug for OnceExec { @@ -46,7 +46,7 @@ impl OnceExec { let cache = Self::compute_properties(stream.schema()); Self { stream: Mutex::new(Some(stream)), - cache, + cache: Arc::new(cache), } } @@ -83,7 +83,7 @@ impl ExecutionPlan for OnceExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs index 94ae82a9ad755..b3a3d29d070b1 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs @@ -120,7 +120,7 @@ macro_rules! assert_plan { struct SortRequiredExec { input: Arc, expr: LexOrdering, - cache: PlanProperties, + cache: Arc, } impl SortRequiredExec { @@ -132,7 +132,7 @@ impl SortRequiredExec { Self { input, expr: requirement, - cache, + cache: Arc::new(cache), } } @@ -174,7 +174,7 @@ impl ExecutionPlan for SortRequiredExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/core/tests/physical_optimizer/join_selection.rs b/datafusion/core/tests/physical_optimizer/join_selection.rs index 9234a95591baa..137ffc4702a16 100644 --- a/datafusion/core/tests/physical_optimizer/join_selection.rs +++ b/datafusion/core/tests/physical_optimizer/join_selection.rs @@ -762,7 +762,10 @@ async fn test_hash_join_swap_on_joins_with_projections( "ProjectionExec won't be added above if HashJoinExec contains embedded projection", ); - assert_eq!(swapped_join.projection, Some(vec![0_usize])); + assert_eq!( + swapped_join.projection.as_ref().map(|p| p.to_vec()), + Some(vec![0_usize]) + ); assert_eq!(swapped.schema().fields.len(), 1); assert_eq!(swapped.schema().fields[0].name(), "small_col"); Ok(()) @@ -979,7 +982,7 @@ impl RecordBatchStream for UnboundedStream { pub struct UnboundedExec { batch_produce: Option, batch: RecordBatch, - cache: PlanProperties, + cache: Arc, } impl UnboundedExec { @@ -995,7 +998,7 @@ impl UnboundedExec { Self { batch_produce, batch, - cache, + cache: Arc::new(cache), } } @@ -1052,7 +1055,7 @@ impl ExecutionPlan for UnboundedExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -1091,7 +1094,7 @@ pub enum SourceType { pub struct StatisticsExec { stats: Statistics, schema: Arc, - cache: PlanProperties, + cache: Arc, } impl StatisticsExec { @@ -1105,7 +1108,7 @@ impl StatisticsExec { Self { stats, schema: Arc::new(schema), - cache, + cache: Arc::new(cache), } } @@ -1153,7 +1156,7 @@ impl ExecutionPlan for StatisticsExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/core/tests/physical_optimizer/pushdown_utils.rs b/datafusion/core/tests/physical_optimizer/pushdown_utils.rs index 524d33ae6edb6..91ae6c414e9eb 100644 --- a/datafusion/core/tests/physical_optimizer/pushdown_utils.rs +++ b/datafusion/core/tests/physical_optimizer/pushdown_utils.rs @@ -474,7 +474,7 @@ impl ExecutionPlan for TestNode { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { self.input.properties() } diff --git a/datafusion/core/tests/physical_optimizer/test_utils.rs b/datafusion/core/tests/physical_optimizer/test_utils.rs index feac8190ffde4..f8c91ba272a9f 100644 --- a/datafusion/core/tests/physical_optimizer/test_utils.rs +++ b/datafusion/core/tests/physical_optimizer/test_utils.rs @@ -454,7 +454,7 @@ impl ExecutionPlan for RequirementsTestExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { self.input.properties() } @@ -825,7 +825,7 @@ pub fn sort_expr_named(name: &str, index: usize) -> PhysicalSortExpr { pub struct TestScan { schema: SchemaRef, output_ordering: Vec, - plan_properties: PlanProperties, + plan_properties: Arc, // Store the requested ordering for display requested_ordering: Option, } @@ -859,7 +859,7 @@ impl TestScan { Self { schema, output_ordering, - plan_properties, + plan_properties: Arc::new(plan_properties), requested_ordering: None, } } @@ -915,7 +915,7 @@ impl ExecutionPlan for TestScan { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.plan_properties } diff --git a/datafusion/core/tests/user_defined/insert_operation.rs b/datafusion/core/tests/user_defined/insert_operation.rs index 7ad00dece1b24..4d2a31ca1f960 100644 --- a/datafusion/core/tests/user_defined/insert_operation.rs +++ b/datafusion/core/tests/user_defined/insert_operation.rs @@ -122,20 +122,22 @@ impl TableProvider for TestInsertTableProvider { #[derive(Debug)] struct TestInsertExec { op: InsertOp, - plan_properties: PlanProperties, + plan_properties: Arc, } impl TestInsertExec { fn new(op: InsertOp) -> Self { Self { op, - plan_properties: PlanProperties::new( - EquivalenceProperties::new(make_count_schema()), - Partitioning::UnknownPartitioning(1), - EmissionType::Incremental, - Boundedness::Bounded, - ) - .with_scheduling_type(SchedulingType::Cooperative), + plan_properties: Arc::new( + PlanProperties::new( + EquivalenceProperties::new(make_count_schema()), + Partitioning::UnknownPartitioning(1), + EmissionType::Incremental, + Boundedness::Bounded, + ) + .with_scheduling_type(SchedulingType::Cooperative), + ), } } } @@ -159,7 +161,7 @@ impl ExecutionPlan for TestInsertExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.plan_properties } diff --git a/datafusion/core/tests/user_defined/user_defined_plan.rs b/datafusion/core/tests/user_defined/user_defined_plan.rs index d53e076739608..c2533e73d2be9 100644 --- a/datafusion/core/tests/user_defined/user_defined_plan.rs +++ b/datafusion/core/tests/user_defined/user_defined_plan.rs @@ -653,13 +653,17 @@ struct TopKExec { input: Arc, /// The maximum number of values k: usize, - cache: PlanProperties, + cache: Arc, } impl TopKExec { fn new(input: Arc, k: usize) -> Self { let cache = Self::compute_properties(input.schema()); - Self { input, k, cache } + Self { + input, + k, + cache: Arc::new(cache), + } } /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. @@ -704,7 +708,7 @@ impl ExecutionPlan for TopKExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/datasource/src/sink.rs b/datafusion/datasource/src/sink.rs index 5acc89722b200..f149109dff5cc 100644 --- a/datafusion/datasource/src/sink.rs +++ b/datafusion/datasource/src/sink.rs @@ -89,7 +89,7 @@ pub struct DataSinkExec { count_schema: SchemaRef, /// Optional required sort order for output data. sort_order: Option, - cache: PlanProperties, + cache: Arc, } impl Debug for DataSinkExec { @@ -117,7 +117,7 @@ impl DataSinkExec { sink, count_schema: make_count_schema(), sort_order, - cache, + cache: Arc::new(cache), } } @@ -174,7 +174,7 @@ impl ExecutionPlan for DataSinkExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index de18b6be2235f..3cba24ce8f420 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -74,8 +74,8 @@ use datafusion_physical_plan::filter_pushdown::{ /// ```text /// ┌─────────────────────┐ -----► execute path /// │ │ ┄┄┄┄┄► init path -/// │ DataSourceExec │ -/// │ │ +/// │ DataSourceExec │ +/// │ │ /// └───────▲─────────────┘ /// ┊ │ /// ┊ │ @@ -234,7 +234,7 @@ pub struct DataSourceExec { /// The source of the data -- for example, `FileScanConfig` or `MemorySourceConfig` data_source: Arc, /// Cached plan properties such as sort order - cache: PlanProperties, + cache: Arc, } impl DisplayAs for DataSourceExec { @@ -258,7 +258,7 @@ impl ExecutionPlan for DataSourceExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -328,7 +328,7 @@ impl ExecutionPlan for DataSourceExec { fn with_fetch(&self, limit: Option) -> Option> { let data_source = self.data_source.with_fetch(limit)?; - let cache = self.cache.clone(); + let cache = Arc::clone(&self.cache); Some(Arc::new(Self { data_source, cache })) } @@ -372,7 +372,8 @@ impl ExecutionPlan for DataSourceExec { let mut new_node = self.clone(); new_node.data_source = data_source; // Re-compute properties since we have new filters which will impact equivalence info - new_node.cache = Self::compute_properties(&new_node.data_source); + new_node.cache = + Arc::new(Self::compute_properties(&new_node.data_source)); Ok(FilterPushdownPropagation { filters: res.filters, @@ -420,7 +421,10 @@ impl DataSourceExec { // Default constructor for `DataSourceExec`, setting the `cooperative` flag to `true`. pub fn new(data_source: Arc) -> Self { let cache = Self::compute_properties(&data_source); - Self { data_source, cache } + Self { + data_source, + cache: Arc::new(cache), + } } /// Return the source object @@ -429,20 +433,20 @@ impl DataSourceExec { } pub fn with_data_source(mut self, data_source: Arc) -> Self { - self.cache = Self::compute_properties(&data_source); + self.cache = Arc::new(Self::compute_properties(&data_source)); self.data_source = data_source; self } /// Assign constraints pub fn with_constraints(mut self, constraints: Constraints) -> Self { - self.cache = self.cache.with_constraints(constraints); + Arc::make_mut(&mut self.cache).set_constraints(constraints); self } /// Assign output partitioning pub fn with_partitioning(mut self, partitioning: Partitioning) -> Self { - self.cache = self.cache.with_partitioning(partitioning); + Arc::make_mut(&mut self.cache).partitioning = partitioning; self } diff --git a/datafusion/ffi/src/execution_plan.rs b/datafusion/ffi/src/execution_plan.rs index c879b022067c3..ec8f7538fee1a 100644 --- a/datafusion/ffi/src/execution_plan.rs +++ b/datafusion/ffi/src/execution_plan.rs @@ -90,7 +90,7 @@ impl FFI_ExecutionPlan { unsafe extern "C" fn properties_fn_wrapper( plan: &FFI_ExecutionPlan, ) -> FFI_PlanProperties { - plan.inner().properties().into() + plan.inner().properties().as_ref().into() } unsafe extern "C" fn children_fn_wrapper( @@ -192,7 +192,7 @@ impl Drop for FFI_ExecutionPlan { pub struct ForeignExecutionPlan { name: String, plan: FFI_ExecutionPlan, - properties: PlanProperties, + properties: Arc, children: Vec>, } @@ -244,7 +244,7 @@ impl TryFrom<&FFI_ExecutionPlan> for Arc { let plan = ForeignExecutionPlan { name, plan: plan.clone(), - properties, + properties: Arc::new(properties), children, }; @@ -262,7 +262,7 @@ impl ExecutionPlan for ForeignExecutionPlan { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.properties } @@ -278,7 +278,7 @@ impl ExecutionPlan for ForeignExecutionPlan { plan: self.plan.clone(), name: self.name.clone(), children, - properties: self.properties.clone(), + properties: Arc::clone(&self.properties), })) } @@ -305,19 +305,19 @@ pub(crate) mod tests { #[derive(Debug)] pub struct EmptyExec { - props: PlanProperties, + props: Arc, children: Vec>, } impl EmptyExec { pub fn new(schema: arrow::datatypes::SchemaRef) -> Self { Self { - props: PlanProperties::new( + props: Arc::new(PlanProperties::new( datafusion::physical_expr::EquivalenceProperties::new(schema), Partitioning::UnknownPartitioning(3), EmissionType::Incremental, Boundedness::Bounded, - ), + )), children: Vec::default(), } } @@ -342,7 +342,7 @@ pub(crate) mod tests { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.props } @@ -355,7 +355,7 @@ pub(crate) mod tests { children: Vec>, ) -> Result> { Ok(Arc::new(EmptyExec { - props: self.props.clone(), + props: Arc::clone(&self.props), children, })) } diff --git a/datafusion/ffi/src/tests/async_provider.rs b/datafusion/ffi/src/tests/async_provider.rs index 6149736c58555..8370cf19e6589 100644 --- a/datafusion/ffi/src/tests/async_provider.rs +++ b/datafusion/ffi/src/tests/async_provider.rs @@ -162,7 +162,7 @@ impl Drop for AsyncTableProvider { #[derive(Debug)] struct AsyncTestExecutionPlan { - properties: datafusion_physical_plan::PlanProperties, + properties: Arc, batch_request: mpsc::Sender, batch_receiver: broadcast::Receiver>, } @@ -173,12 +173,12 @@ impl AsyncTestExecutionPlan { batch_receiver: broadcast::Receiver>, ) -> Self { Self { - properties: datafusion_physical_plan::PlanProperties::new( + properties: Arc::new(datafusion_physical_plan::PlanProperties::new( EquivalenceProperties::new(super::create_test_schema()), Partitioning::UnknownPartitioning(3), datafusion_physical_plan::execution_plan::EmissionType::Incremental, datafusion_physical_plan::execution_plan::Boundedness::Bounded, - ), + )), batch_request, batch_receiver, } @@ -194,7 +194,7 @@ impl ExecutionPlan for AsyncTestExecutionPlan { self } - fn properties(&self) -> &datafusion_physical_plan::PlanProperties { + fn properties(&self) -> &Arc { &self.properties } diff --git a/datafusion/physical-expr/src/equivalence/properties/mod.rs b/datafusion/physical-expr/src/equivalence/properties/mod.rs index 996bc4b08fcd2..a98341b10765a 100644 --- a/datafusion/physical-expr/src/equivalence/properties/mod.rs +++ b/datafusion/physical-expr/src/equivalence/properties/mod.rs @@ -207,8 +207,13 @@ impl EquivalenceProperties { } /// Adds constraints to the properties. - pub fn with_constraints(mut self, constraints: Constraints) -> Self { + pub fn set_constraints(&mut self, constraints: Constraints) { self.constraints = constraints; + } + + /// Adds constraints to the properties. + pub fn with_constraints(mut self, constraints: Constraints) -> Self { + self.set_constraints(constraints); self } diff --git a/datafusion/physical-optimizer/src/ensure_coop.rs b/datafusion/physical-optimizer/src/ensure_coop.rs index 5d00d00bce21d..ef8946f9a49d1 100644 --- a/datafusion/physical-optimizer/src/ensure_coop.rs +++ b/datafusion/physical-optimizer/src/ensure_coop.rs @@ -281,7 +281,7 @@ mod tests { input: Arc, scheduling_type: SchedulingType, evaluation_type: EvaluationType, - properties: PlanProperties, + properties: Arc, } impl DummyExec { @@ -305,7 +305,7 @@ mod tests { input, scheduling_type, evaluation_type, - properties, + properties: Arc::new(properties), } } } @@ -327,7 +327,7 @@ mod tests { fn as_any(&self) -> &dyn Any { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.properties } fn children(&self) -> Vec<&Arc> { diff --git a/datafusion/physical-optimizer/src/output_requirements.rs b/datafusion/physical-optimizer/src/output_requirements.rs index 0dc6a25fbc0b7..9c4169ec654f8 100644 --- a/datafusion/physical-optimizer/src/output_requirements.rs +++ b/datafusion/physical-optimizer/src/output_requirements.rs @@ -98,7 +98,7 @@ pub struct OutputRequirementExec { input: Arc, order_requirement: Option, dist_requirement: Distribution, - cache: PlanProperties, + cache: Arc, fetch: Option, } @@ -114,7 +114,7 @@ impl OutputRequirementExec { input, order_requirement: requirements, dist_requirement, - cache, + cache: Arc::new(cache), fetch, } } @@ -200,7 +200,7 @@ impl ExecutionPlan for OutputRequirementExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index aa0f5a236cac7..3d97380d46baa 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -25,7 +25,9 @@ use crate::aggregates::{ no_grouping::AggregateStream, row_hash::GroupedHashAggregateStream, topk_stream::GroupedTopKAggregateStream, }; -use crate::execution_plan::{CardinalityEffect, EmissionType}; +use crate::execution_plan::{ + CardinalityEffect, EmissionType, has_same_children_properties, +}; use crate::filter_pushdown::{ ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase, FilterPushdownPropagation, PushedDownPredicate, @@ -33,7 +35,7 @@ use crate::filter_pushdown::{ use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use crate::{ DisplayFormatType, Distribution, ExecutionPlan, InputOrderMode, - SendableRecordBatchStream, Statistics, + SendableRecordBatchStream, Statistics, check_if_same_properties, }; use datafusion_common::config::ConfigOptions; use datafusion_physical_expr::utils::collect_columns; @@ -648,7 +650,7 @@ pub struct AggregateExec { required_input_ordering: Option, /// Describes how the input is ordered relative to the group by columns input_order_mode: InputOrderMode, - cache: PlanProperties, + cache: Arc, /// During initialization, if the plan supports dynamic filtering (see [`AggrDynFilter`]), /// it is set to `Some(..)` regardless of whether it can be pushed down to a child node. /// @@ -672,7 +674,7 @@ impl AggregateExec { required_input_ordering: self.required_input_ordering.clone(), metrics: ExecutionPlanMetricsSet::new(), input_order_mode: self.input_order_mode.clone(), - cache: self.cache.clone(), + cache: Arc::clone(&self.cache), mode: self.mode, group_by: self.group_by.clone(), filter_expr: self.filter_expr.clone(), @@ -692,7 +694,7 @@ impl AggregateExec { required_input_ordering: self.required_input_ordering.clone(), metrics: ExecutionPlanMetricsSet::new(), input_order_mode: self.input_order_mode.clone(), - cache: self.cache.clone(), + cache: Arc::clone(&self.cache), mode: self.mode, group_by: self.group_by.clone(), aggr_expr: self.aggr_expr.clone(), @@ -829,7 +831,7 @@ impl AggregateExec { required_input_ordering, limit_options: None, input_order_mode, - cache, + cache: Arc::new(cache), dynamic_filter: None, }; @@ -1187,6 +1189,17 @@ impl AggregateExec { _ => Precision::Absent, } } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + Self { + input: children.swap_remove(0), + metrics: ExecutionPlanMetricsSet::new(), + ..Self::clone(self) + } + } } impl DisplayAs for AggregateExec { @@ -1325,7 +1338,7 @@ impl ExecutionPlan for AggregateExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -1366,14 +1379,16 @@ impl ExecutionPlan for AggregateExec { fn with_new_children( self: Arc, - children: Vec>, + mut children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); + let mut me = AggregateExec::try_new_with_schema( self.mode, self.group_by.clone(), self.aggr_expr.clone(), self.filter_expr.clone(), - Arc::clone(&children[0]), + children.swap_remove(0), Arc::clone(&self.input_schema), Arc::clone(&self.schema), )?; @@ -2403,14 +2418,17 @@ mod tests { struct TestYieldingExec { /// True if this exec should yield back to runtime the first time it is polled pub yield_first: bool, - cache: PlanProperties, + cache: Arc, } impl TestYieldingExec { fn new(yield_first: bool) -> Self { let schema = some_data().0; let cache = Self::compute_properties(schema); - Self { yield_first, cache } + Self { + yield_first, + cache: Arc::new(cache), + } } /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. @@ -2451,7 +2469,7 @@ mod tests { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/physical-plan/src/analyze.rs b/datafusion/physical-plan/src/analyze.rs index 1fb8f93a38782..eca31ea0e194f 100644 --- a/datafusion/physical-plan/src/analyze.rs +++ b/datafusion/physical-plan/src/analyze.rs @@ -51,7 +51,7 @@ pub struct AnalyzeExec { pub(crate) input: Arc, /// The output schema for RecordBatches of this exec node schema: SchemaRef, - cache: PlanProperties, + cache: Arc, } impl AnalyzeExec { @@ -70,7 +70,7 @@ impl AnalyzeExec { metric_types, input, schema, - cache, + cache: Arc::new(cache), } } @@ -131,7 +131,7 @@ impl ExecutionPlan for AnalyzeExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/physical-plan/src/async_func.rs b/datafusion/physical-plan/src/async_func.rs index a61fd95949d1a..93701a25b3bf4 100644 --- a/datafusion/physical-plan/src/async_func.rs +++ b/datafusion/physical-plan/src/async_func.rs @@ -16,10 +16,12 @@ // under the License. use crate::coalesce::LimitedBatchCoalescer; +use crate::execution_plan::has_same_children_properties; use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use crate::stream::RecordBatchStreamAdapter; use crate::{ DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties, + check_if_same_properties, }; use arrow::array::RecordBatch; use arrow_schema::{Fields, Schema, SchemaRef}; @@ -45,12 +47,12 @@ use std::task::{Context, Poll, ready}; /// /// The schema of the output of the AsyncFuncExec is: /// Input columns followed by one column for each async expression -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct AsyncFuncExec { /// The async expressions to evaluate async_exprs: Vec>, input: Arc, - cache: PlanProperties, + cache: Arc, metrics: ExecutionPlanMetricsSet, } @@ -84,7 +86,7 @@ impl AsyncFuncExec { Ok(Self { input, async_exprs, - cache, + cache: Arc::new(cache), metrics: ExecutionPlanMetricsSet::new(), }) } @@ -113,6 +115,17 @@ impl AsyncFuncExec { pub fn input(&self) -> &Arc { &self.input } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + Self { + input: children.swap_remove(0), + metrics: ExecutionPlanMetricsSet::new(), + ..Self::clone(self) + } + } } impl DisplayAs for AsyncFuncExec { @@ -149,7 +162,7 @@ impl ExecutionPlan for AsyncFuncExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -159,16 +172,17 @@ impl ExecutionPlan for AsyncFuncExec { fn with_new_children( self: Arc, - children: Vec>, + mut children: Vec>, ) -> Result> { assert_eq_or_internal_err!( children.len(), 1, "AsyncFuncExec wrong number of children" ); + check_if_same_properties!(self, children); Ok(Arc::new(AsyncFuncExec::try_new( self.async_exprs.clone(), - Arc::clone(&children[0]), + children.swap_remove(0), )?)) } diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index dfcd3cb0bcae7..27a521ec6bc5d 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -27,6 +27,7 @@ use super::{DisplayAs, ExecutionPlanProperties, PlanProperties, Statistics}; use crate::projection::ProjectionExec; use crate::{ DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream, + check_if_same_properties, }; use arrow::datatypes::SchemaRef; @@ -36,7 +37,7 @@ use datafusion_execution::TaskContext; use datafusion_physical_expr::PhysicalExpr; use crate::coalesce::{LimitedBatchCoalescer, PushBatchStatus}; -use crate::execution_plan::CardinalityEffect; +use crate::execution_plan::{CardinalityEffect, has_same_children_properties}; use crate::filter_pushdown::{ ChildPushdownResult, FilterDescription, FilterPushdownPhase, FilterPushdownPropagation, @@ -71,7 +72,7 @@ pub struct CoalesceBatchesExec { fetch: Option, /// Execution metrics metrics: ExecutionPlanMetricsSet, - cache: PlanProperties, + cache: Arc, } #[expect(deprecated)] @@ -84,7 +85,7 @@ impl CoalesceBatchesExec { target_batch_size, fetch: None, metrics: ExecutionPlanMetricsSet::new(), - cache, + cache: Arc::new(cache), } } @@ -115,6 +116,17 @@ impl CoalesceBatchesExec { input.boundedness(), ) } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + Self { + input: children.swap_remove(0), + metrics: ExecutionPlanMetricsSet::new(), + ..Self::clone(self) + } + } } #[expect(deprecated)] @@ -159,7 +171,7 @@ impl ExecutionPlan for CoalesceBatchesExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -177,10 +189,11 @@ impl ExecutionPlan for CoalesceBatchesExec { fn with_new_children( self: Arc, - children: Vec>, + mut children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); Ok(Arc::new( - CoalesceBatchesExec::new(Arc::clone(&children[0]), self.target_batch_size) + CoalesceBatchesExec::new(children.swap_remove(0), self.target_batch_size) .with_fetch(self.fetch), )) } @@ -222,7 +235,7 @@ impl ExecutionPlan for CoalesceBatchesExec { target_batch_size: self.target_batch_size, fetch: limit, metrics: self.metrics.clone(), - cache: self.cache.clone(), + cache: Arc::clone(&self.cache), })) } diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs index 22dcc85d6ea3a..a7a2ce8a18b5c 100644 --- a/datafusion/physical-plan/src/coalesce_partitions.rs +++ b/datafusion/physical-plan/src/coalesce_partitions.rs @@ -27,11 +27,13 @@ use super::{ DisplayAs, ExecutionPlanProperties, PlanProperties, SendableRecordBatchStream, Statistics, }; -use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType}; +use crate::execution_plan::{ + CardinalityEffect, EvaluationType, SchedulingType, has_same_children_properties, +}; use crate::filter_pushdown::{FilterDescription, FilterPushdownPhase}; use crate::projection::{ProjectionExec, make_with_child}; use crate::sort_pushdown::SortOrderPushdownResult; -use crate::{DisplayFormatType, ExecutionPlan, Partitioning}; +use crate::{DisplayFormatType, ExecutionPlan, Partitioning, check_if_same_properties}; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use datafusion_common::config::ConfigOptions; @@ -47,7 +49,7 @@ pub struct CoalescePartitionsExec { input: Arc, /// Execution metrics metrics: ExecutionPlanMetricsSet, - cache: PlanProperties, + cache: Arc, /// Optional number of rows to fetch. Stops producing rows after this fetch pub(crate) fetch: Option, } @@ -59,7 +61,7 @@ impl CoalescePartitionsExec { CoalescePartitionsExec { input, metrics: ExecutionPlanMetricsSet::new(), - cache, + cache: Arc::new(cache), fetch: None, } } @@ -100,6 +102,17 @@ impl CoalescePartitionsExec { .with_evaluation_type(drive) .with_scheduling_type(scheduling) } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + Self { + input: children.swap_remove(0), + metrics: ExecutionPlanMetricsSet::new(), + ..Self::clone(self) + } + } } impl DisplayAs for CoalescePartitionsExec { @@ -135,7 +148,7 @@ impl ExecutionPlan for CoalescePartitionsExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -149,9 +162,10 @@ impl ExecutionPlan for CoalescePartitionsExec { fn with_new_children( self: Arc, - children: Vec>, + mut children: Vec>, ) -> Result> { - let mut plan = CoalescePartitionsExec::new(Arc::clone(&children[0])); + check_if_same_properties!(self, children); + let mut plan = CoalescePartitionsExec::new(children.swap_remove(0)); plan.fetch = self.fetch; Ok(Arc::new(plan)) } @@ -274,7 +288,7 @@ impl ExecutionPlan for CoalescePartitionsExec { input: Arc::clone(&self.input), fetch: limit, metrics: self.metrics.clone(), - cache: self.cache.clone(), + cache: Arc::clone(&self.cache), })) } diff --git a/datafusion/physical-plan/src/coop.rs b/datafusion/physical-plan/src/coop.rs index ce54a451ac4d1..acc79ee009690 100644 --- a/datafusion/physical-plan/src/coop.rs +++ b/datafusion/physical-plan/src/coop.rs @@ -87,14 +87,14 @@ use crate::filter_pushdown::{ use crate::projection::ProjectionExec; use crate::{ DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, RecordBatchStream, - SendableRecordBatchStream, SortOrderPushdownResult, + SendableRecordBatchStream, SortOrderPushdownResult, check_if_same_properties, }; use arrow::record_batch::RecordBatch; use arrow_schema::Schema; use datafusion_common::{Result, Statistics, assert_eq_or_internal_err}; use datafusion_execution::TaskContext; -use crate::execution_plan::SchedulingType; +use crate::execution_plan::{SchedulingType, has_same_children_properties}; use crate::stream::RecordBatchStreamAdapter; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use futures::{Stream, StreamExt}; @@ -217,16 +217,15 @@ where #[derive(Debug, Clone)] pub struct CooperativeExec { input: Arc, - properties: PlanProperties, + properties: Arc, } impl CooperativeExec { /// Creates a new `CooperativeExec` operator that wraps the given input execution plan. pub fn new(input: Arc) -> Self { - let properties = input - .properties() - .clone() - .with_scheduling_type(SchedulingType::Cooperative); + let properties = PlanProperties::clone(input.properties()) + .with_scheduling_type(SchedulingType::Cooperative) + .into(); Self { input, properties } } @@ -235,6 +234,16 @@ impl CooperativeExec { pub fn input(&self) -> &Arc { &self.input } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + Self { + input: children.swap_remove(0), + ..Self::clone(self) + } + } } impl DisplayAs for CooperativeExec { @@ -260,7 +269,7 @@ impl ExecutionPlan for CooperativeExec { self.input.schema() } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.properties } @@ -281,6 +290,7 @@ impl ExecutionPlan for CooperativeExec { 1, "CooperativeExec requires exactly one child" ); + check_if_same_properties!(self, children); Ok(Arc::new(CooperativeExec::new(children.swap_remove(0)))) } diff --git a/datafusion/physical-plan/src/display.rs b/datafusion/physical-plan/src/display.rs index 52c37a106b39e..d525f44541d5f 100644 --- a/datafusion/physical-plan/src/display.rs +++ b/datafusion/physical-plan/src/display.rs @@ -1153,7 +1153,7 @@ mod tests { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { unimplemented!() } diff --git a/datafusion/physical-plan/src/empty.rs b/datafusion/physical-plan/src/empty.rs index fcfbcfa3e8277..d3e62f91c0570 100644 --- a/datafusion/physical-plan/src/empty.rs +++ b/datafusion/physical-plan/src/empty.rs @@ -43,7 +43,7 @@ pub struct EmptyExec { schema: SchemaRef, /// Number of partitions partitions: usize, - cache: PlanProperties, + cache: Arc, } impl EmptyExec { @@ -53,7 +53,7 @@ impl EmptyExec { EmptyExec { schema, partitions: 1, - cache, + cache: Arc::new(cache), } } @@ -62,7 +62,7 @@ impl EmptyExec { self.partitions = partitions; // Changing partitions may invalidate output partitioning, so update it: let output_partitioning = Self::output_partitioning_helper(self.partitions); - self.cache = self.cache.with_partitioning(output_partitioning); + Arc::make_mut(&mut self.cache).partitioning = output_partitioning; self } @@ -114,7 +114,7 @@ impl ExecutionPlan for EmptyExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 52f4829127651..d36d5a8ab6889 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -128,7 +128,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// /// This information is available via methods on [`ExecutionPlanProperties`] /// trait, which is implemented for all `ExecutionPlan`s. - fn properties(&self) -> &PlanProperties; + fn properties(&self) -> &Arc; /// Returns an error if this individual node does not conform to its invariants. /// These invariants are typically only checked in debug mode. @@ -1060,12 +1060,17 @@ impl PlanProperties { self } - /// Overwrite equivalence properties with its new value. - pub fn with_eq_properties(mut self, eq_properties: EquivalenceProperties) -> Self { + /// Set equivalence properties having mut reference. + pub fn set_eq_properties(&mut self, eq_properties: EquivalenceProperties) { // Changing equivalence properties also changes output ordering, so // make sure to overwrite it: self.output_ordering = eq_properties.output_ordering(); self.eq_properties = eq_properties; + } + + /// Overwrite equivalence properties with its new value. + pub fn with_eq_properties(mut self, eq_properties: EquivalenceProperties) -> Self { + self.set_eq_properties(eq_properties); self } @@ -1097,9 +1102,14 @@ impl PlanProperties { self } + /// Set constraints having mut reference. + pub fn set_constraints(&mut self, constraints: Constraints) { + self.eq_properties.set_constraints(constraints); + } + /// Overwrite constraints with its new value. pub fn with_constraints(mut self, constraints: Constraints) -> Self { - self.eq_properties = self.eq_properties.with_constraints(constraints); + self.set_constraints(constraints); self } @@ -1422,6 +1432,41 @@ pub fn reset_plan_states(plan: Arc) -> Result, + children: &[Arc], +) -> Result { + let old_children = plan.children(); + assert_eq_or_internal_err!( + children.len(), + old_children.len(), + "Wrong number of children" + ); + for (lhs, rhs) in plan.children().iter().zip(children.iter()) { + if !Arc::ptr_eq(lhs.properties(), rhs.properties()) { + return Ok(false); + } + } + Ok(true) +} + +/// Helper macro to avoid properties re-computation if passed children properties +/// the same as plan already has. Could be used to implement fast-path for method +/// [`ExecutionPlan::with_new_children`]. +#[macro_export] +macro_rules! check_if_same_properties { + ($plan: expr, $children: expr) => { + if has_same_children_properties(&$plan, &$children)? { + let plan = $plan.with_new_children_and_same_properties($children); + return Ok(Arc::new(plan)); + } + }; +} + /// Utility function yielding a string representation of the given [`ExecutionPlan`]. pub fn get_plan_string(plan: &Arc) -> Vec { let formatted = displayable(plan.as_ref()).indent(true).to_string(); @@ -1484,7 +1529,7 @@ mod tests { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { unimplemented!() } @@ -1551,7 +1596,7 @@ mod tests { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { unimplemented!() } diff --git a/datafusion/physical-plan/src/explain.rs b/datafusion/physical-plan/src/explain.rs index aa3c0afefe8b5..bf21b0484689a 100644 --- a/datafusion/physical-plan/src/explain.rs +++ b/datafusion/physical-plan/src/explain.rs @@ -44,7 +44,7 @@ pub struct ExplainExec { stringified_plans: Vec, /// control which plans to print verbose: bool, - cache: PlanProperties, + cache: Arc, } impl ExplainExec { @@ -59,7 +59,7 @@ impl ExplainExec { schema, stringified_plans, verbose, - cache, + cache: Arc::new(cache), } } @@ -112,7 +112,7 @@ impl ExecutionPlan for ExplainExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 50fae84b85d0d..77c5fe4f55c1c 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -26,9 +26,10 @@ use super::{ ColumnStatistics, DisplayAs, ExecutionPlanProperties, PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, }; +use crate::check_if_same_properties; use crate::coalesce::{LimitedBatchCoalescer, PushBatchStatus}; use crate::common::can_project; -use crate::execution_plan::CardinalityEffect; +use crate::execution_plan::{CardinalityEffect, has_same_children_properties}; use crate::filter_pushdown::{ ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase, FilterPushdownPropagation, PushedDown, PushedDownPredicate, @@ -83,7 +84,7 @@ pub struct FilterExec { /// Selectivity for statistics. 0 = no rows, 100 = all rows default_selectivity: u8, /// Properties equivalence properties, partitioning, etc. - cache: PlanProperties, + cache: Arc, /// The projection indices of the columns in the output schema of join projection: Option>, /// Target batch size for output batches @@ -206,7 +207,7 @@ impl FilterExecBuilder { input: self.input, metrics: ExecutionPlanMetricsSet::new(), default_selectivity: self.default_selectivity, - cache, + cache: Arc::new(cache), projection: self.projection, batch_size: self.batch_size, fetch: self.fetch, @@ -279,7 +280,7 @@ impl FilterExec { input: Arc::clone(&self.input), metrics: self.metrics.clone(), default_selectivity: self.default_selectivity, - cache: self.cache.clone(), + cache: Arc::clone(&self.cache), projection: self.projection.clone(), batch_size, fetch: self.fetch, @@ -432,6 +433,17 @@ impl FilterExec { input.boundedness(), )) } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + Self { + input: children.swap_remove(0), + metrics: ExecutionPlanMetricsSet::new(), + ..Self::clone(self) + } + } } impl DisplayAs for FilterExec { @@ -486,7 +498,7 @@ impl ExecutionPlan for FilterExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -503,6 +515,7 @@ impl ExecutionPlan for FilterExec { self: Arc, mut children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); let new_input = children.swap_remove(0); FilterExecBuilder::from(&*self) .with_input(new_input) @@ -680,12 +693,12 @@ impl ExecutionPlan for FilterExec { input: Arc::clone(&filter_input), metrics: self.metrics.clone(), default_selectivity: self.default_selectivity, - cache: Self::compute_properties( + cache: Arc::new(Self::compute_properties( &filter_input, &new_predicate, self.default_selectivity, self.projection.as_ref(), - )?, + )?), projection: None, batch_size: self.batch_size, fetch: self.fetch, @@ -705,7 +718,7 @@ impl ExecutionPlan for FilterExec { input: Arc::clone(&self.input), metrics: self.metrics.clone(), default_selectivity: self.default_selectivity, - cache: self.cache.clone(), + cache: Arc::clone(&self.cache), projection: self.projection.clone(), batch_size: self.batch_size, fetch, diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index 7ada14be66543..c25408e49db64 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -25,7 +25,9 @@ use super::utils::{ OnceAsync, OnceFut, StatefulStreamResult, adjust_right_output_partitioning, reorder_output_after_swap, }; -use crate::execution_plan::{EmissionType, boundedness_from_children}; +use crate::execution_plan::{ + EmissionType, boundedness_from_children, has_same_children_properties, +}; use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use crate::projection::{ ProjectionExec, join_allows_pushdown, join_table_borders, new_join_children, @@ -34,7 +36,7 @@ use crate::projection::{ use crate::{ ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, PlanProperties, RecordBatchStream, - SendableRecordBatchStream, Statistics, handle_state, + SendableRecordBatchStream, Statistics, check_if_same_properties, handle_state, }; use arrow::array::{RecordBatch, RecordBatchOptions}; @@ -94,7 +96,7 @@ pub struct CrossJoinExec { /// Execution plan metrics metrics: ExecutionPlanMetricsSet, /// Properties such as schema, equivalence properties, ordering, partitioning, etc. - cache: PlanProperties, + cache: Arc, } impl CrossJoinExec { @@ -125,7 +127,7 @@ impl CrossJoinExec { schema, left_fut: Default::default(), metrics: ExecutionPlanMetricsSet::default(), - cache, + cache: Arc::new(cache), } } @@ -192,6 +194,23 @@ impl CrossJoinExec { &self.right.schema(), ) } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + let left = children.swap_remove(0); + let right = children.swap_remove(0); + + Self { + left, + right, + metrics: ExecutionPlanMetricsSet::new(), + left_fut: Default::default(), + cache: Arc::clone(&self.cache), + schema: Arc::clone(&self.schema), + } + } } /// Asynchronously collect the result of the left child @@ -256,7 +275,7 @@ impl ExecutionPlan for CrossJoinExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -272,6 +291,7 @@ impl ExecutionPlan for CrossJoinExec { self: Arc, children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); Ok(Arc::new(CrossJoinExec::new( Arc::clone(&children[0]), Arc::clone(&children[1]), @@ -285,7 +305,7 @@ impl ExecutionPlan for CrossJoinExec { schema: Arc::clone(&self.schema), left_fut: Default::default(), // reset the build side! metrics: ExecutionPlanMetricsSet::default(), - cache: self.cache.clone(), + cache: Arc::clone(&self.cache), }; Ok(Arc::new(new_exec)) } diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index c249dfb10aacf..84ce138c8db8b 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -22,7 +22,9 @@ use std::sync::{Arc, OnceLock}; use std::{any::Any, vec}; use crate::ExecutionPlanProperties; -use crate::execution_plan::{EmissionType, boundedness_from_children}; +use crate::execution_plan::{ + EmissionType, boundedness_from_children, has_same_children_properties, +}; use crate::filter_pushdown::{ ChildPushdownResult, FilterDescription, FilterPushdownPhase, FilterPushdownPropagation, @@ -474,7 +476,7 @@ pub struct HashJoinExec { /// Flag to indicate if this is a null-aware anti join pub null_aware: bool, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, /// Dynamic filter for pushing down to the probe side /// Set when dynamic filter pushdown is detected in handle_child_pushdown_result. /// HashJoinExec also needs to keep a shared bounds accumulator for coordinating updates. @@ -596,7 +598,7 @@ impl HashJoinExec { column_indices, null_equality, null_aware, - cache, + cache: Arc::new(cache), dynamic_filter: None, }) } @@ -937,7 +939,7 @@ impl ExecutionPlan for HashJoinExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -998,6 +1000,20 @@ impl ExecutionPlan for HashJoinExec { self: Arc, children: Vec>, ) -> Result> { + let cache = if has_same_children_properties(&self, &children)? { + Arc::clone(&self.cache) + } else { + Arc::new(Self::compute_properties( + &children[0], + &children[1], + &self.join_schema, + self.join_type, + &self.on, + self.mode, + self.projection.as_ref(), + )?) + }; + Ok(Arc::new(HashJoinExec { left: Arc::clone(&children[0]), right: Arc::clone(&children[1]), @@ -1013,15 +1029,7 @@ impl ExecutionPlan for HashJoinExec { column_indices: self.column_indices.clone(), null_equality: self.null_equality, null_aware: self.null_aware, - cache: Self::compute_properties( - &children[0], - &children[1], - &self.join_schema, - self.join_type, - &self.on, - self.mode, - self.projection.as_ref(), - )?, + cache, // Keep the dynamic filter, bounds accumulator will be reset dynamic_filter: self.dynamic_filter.clone(), })) @@ -1044,7 +1052,7 @@ impl ExecutionPlan for HashJoinExec { column_indices: self.column_indices.clone(), null_equality: self.null_equality, null_aware: self.null_aware, - cache: self.cache.clone(), + cache: Arc::clone(&self.cache), // Reset dynamic filter and bounds accumulator to initial state dynamic_filter: None, })) @@ -1373,7 +1381,7 @@ impl ExecutionPlan for HashJoinExec { column_indices: self.column_indices.clone(), null_equality: self.null_equality, null_aware: self.null_aware, - cache: self.cache.clone(), + cache: Arc::clone(&self.cache), dynamic_filter: Some(HashJoinExecDynamicFilter { filter: dynamic_filter, build_accumulator: OnceLock::new(), diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index b57f9132253bf..303410a963d72 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -29,7 +29,9 @@ use super::utils::{ reorder_output_after_swap, swap_join_projection, }; use crate::common::can_project; -use crate::execution_plan::{EmissionType, boundedness_from_children}; +use crate::execution_plan::{ + EmissionType, boundedness_from_children, has_same_children_properties, +}; use crate::joins::SharedBitmapBuilder; use crate::joins::utils::{ BuildProbeJoinMetrics, ColumnIndex, JoinFilter, OnceAsync, OnceFut, @@ -46,6 +48,7 @@ use crate::projection::{ use crate::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, PlanProperties, RecordBatchStream, SendableRecordBatchStream, + check_if_same_properties, }; use arrow::array::{ @@ -197,7 +200,7 @@ pub struct NestedLoopJoinExec { /// Execution metrics metrics: ExecutionPlanMetricsSet, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, } impl NestedLoopJoinExec { @@ -233,7 +236,7 @@ impl NestedLoopJoinExec { column_indices, projection, metrics: Default::default(), - cache, + cache: Arc::new(cache), }) } @@ -399,6 +402,27 @@ impl NestedLoopJoinExec { Ok(plan) } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + let left = children.swap_remove(0); + let right = children.swap_remove(0); + + Self { + left, + right, + metrics: ExecutionPlanMetricsSet::new(), + build_side_data: Default::default(), + cache: Arc::clone(&self.cache), + filter: self.filter.clone(), + join_type: self.join_type, + join_schema: Arc::clone(&self.join_schema), + column_indices: self.column_indices.clone(), + projection: self.projection.clone(), + } + } } impl DisplayAs for NestedLoopJoinExec { @@ -453,7 +477,7 @@ impl ExecutionPlan for NestedLoopJoinExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -476,6 +500,7 @@ impl ExecutionPlan for NestedLoopJoinExec { self: Arc, children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); Ok(Arc::new(NestedLoopJoinExec::try_new( Arc::clone(&children[0]), Arc::clone(&children[1]), diff --git a/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs b/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs index d7ece845e943c..014d4cb13e2b7 100644 --- a/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs +++ b/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs @@ -41,7 +41,9 @@ use std::fmt::Formatter; use std::sync::Arc; use std::sync::atomic::AtomicUsize; -use crate::execution_plan::{EmissionType, boundedness_from_children}; +use crate::execution_plan::{ + EmissionType, boundedness_from_children, has_same_children_properties, +}; use crate::joins::piecewise_merge_join::classic_join::{ ClassicPWMJStream, PiecewiseMergeJoinStreamState, @@ -51,7 +53,9 @@ use crate::joins::piecewise_merge_join::utils::{ }; use crate::joins::utils::asymmetric_join_output_partitioning; use crate::metrics::MetricsSet; -use crate::{DisplayAs, DisplayFormatType, ExecutionPlanProperties}; +use crate::{ + DisplayAs, DisplayFormatType, ExecutionPlanProperties, check_if_same_properties, +}; use crate::{ ExecutionPlan, PlanProperties, joins::{ @@ -86,7 +90,7 @@ use crate::{ /// Both sides are sorted so that we can iterate from index 0 to the end on each side. This ordering ensures /// that when we find the first matching pair of rows, we can emit the current stream row joined with all remaining /// probe rows from the match position onward, without rescanning earlier probe rows. -/// +/// /// For `<` and `<=` operators, both inputs are sorted in **descending** order, while for `>` and `>=` operators /// they are sorted in **ascending** order. This choice ensures that the pointer on the buffered side can advance /// monotonically as we stream new batches from the stream side. @@ -129,34 +133,34 @@ use crate::{ /// /// Processing Row 1: /// -/// Sorted Buffered Side Sorted Streamed Side -/// ┌──────────────────┐ ┌──────────────────┐ -/// 1 │ 100 │ 1 │ 100 │ -/// ├──────────────────┤ ├──────────────────┤ -/// 2 │ 200 │ ─┐ 2 │ 200 │ -/// ├──────────────────┤ │ For row 1 on streamed side with ├──────────────────┤ -/// 3 │ 200 │ │ value 100, we emit rows 2 - 5. 3 │ 500 │ +/// Sorted Buffered Side Sorted Streamed Side +/// ┌──────────────────┐ ┌──────────────────┐ +/// 1 │ 100 │ 1 │ 100 │ +/// ├──────────────────┤ ├──────────────────┤ +/// 2 │ 200 │ ─┐ 2 │ 200 │ +/// ├──────────────────┤ │ For row 1 on streamed side with ├──────────────────┤ +/// 3 │ 200 │ │ value 100, we emit rows 2 - 5. 3 │ 500 │ /// ├──────────────────┤ │ as matches when the operator is └──────────────────┘ /// 4 │ 300 │ │ `Operator::Lt` (<) Emitting all /// ├──────────────────┤ │ rows after the first match (row /// 5 │ 400 │ ─┘ 2 buffered side; 100 < 200) -/// └──────────────────┘ +/// └──────────────────┘ /// /// Processing Row 2: /// By sorting the streamed side we know /// -/// Sorted Buffered Side Sorted Streamed Side -/// ┌──────────────────┐ ┌──────────────────┐ -/// 1 │ 100 │ 1 │ 100 │ -/// ├──────────────────┤ ├──────────────────┤ -/// 2 │ 200 │ <- Start here when probing for the 2 │ 200 │ -/// ├──────────────────┤ streamed side row 2. ├──────────────────┤ -/// 3 │ 200 │ 3 │ 500 │ +/// Sorted Buffered Side Sorted Streamed Side +/// ┌──────────────────┐ ┌──────────────────┐ +/// 1 │ 100 │ 1 │ 100 │ +/// ├──────────────────┤ ├──────────────────┤ +/// 2 │ 200 │ <- Start here when probing for the 2 │ 200 │ +/// ├──────────────────┤ streamed side row 2. ├──────────────────┤ +/// 3 │ 200 │ 3 │ 500 │ /// ├──────────────────┤ └──────────────────┘ -/// 4 │ 300 │ -/// ├──────────────────┤ +/// 4 │ 300 │ +/// ├──────────────────┤ /// 5 │ 400 │ -/// └──────────────────┘ +/// └──────────────────┘ /// ``` /// /// ## Existence Joins (Semi, Anti, Mark) @@ -202,10 +206,10 @@ use crate::{ /// 1 │ 100 │ 1 │ 500 │ /// ├──────────────────┤ ├──────────────────┤ /// 2 │ 200 │ 2 │ 200 │ -/// ├──────────────────┤ ├──────────────────┤ +/// ├──────────────────┤ ├──────────────────┤ /// 3 │ 200 │ 3 │ 300 │ /// ├──────────────────┤ └──────────────────┘ -/// 4 │ 300 │ ─┐ +/// 4 │ 300 │ ─┐ /// ├──────────────────┤ | We emit matches for row 4 - 5 /// 5 │ 400 │ ─┘ on the buffered side. /// └──────────────────┘ @@ -236,11 +240,11 @@ use crate::{ /// /// # Mark Join: /// Sorts the probe side, then computes the min/max range of the probe keys and scans the buffered side only -/// within that range. +/// within that range. /// Complexity: `O(|S| + scan(R[range]))`. /// /// ## Nested Loop Join -/// Compares every row from `S` with every row from `R`. +/// Compares every row from `S` with every row from `R`. /// Complexity: `O(|S| * |R|)`. /// /// ## Nested Loop Join @@ -273,13 +277,12 @@ pub struct PiecewiseMergeJoinExec { left_child_plan_required_order: LexOrdering, /// The right sort order, descending for `<`, `<=` operations + ascending for `>`, `>=` operations /// Unsorted for mark joins - #[expect(dead_code)] right_batch_required_orders: LexOrdering, /// This determines the sort order of all join columns used in sorting the stream and buffered execution plans. sort_options: SortOptions, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, /// Number of partitions to process num_partitions: usize, } @@ -373,7 +376,7 @@ impl PiecewiseMergeJoinExec { left_child_plan_required_order, right_batch_required_orders, sort_options, - cache, + cache: Arc::new(cache), num_partitions, }) } @@ -466,6 +469,31 @@ impl PiecewiseMergeJoinExec { pub fn swap_inputs(&self) -> Result> { todo!() } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + let buffered = children.swap_remove(0); + let streamed = children.swap_remove(0); + Self { + buffered, + streamed, + on: self.on.clone(), + operator: self.operator, + join_type: self.join_type, + schema: Arc::clone(&self.schema), + left_child_plan_required_order: self.left_child_plan_required_order.clone(), + right_batch_required_orders: self.right_batch_required_orders.clone(), + sort_options: self.sort_options, + cache: Arc::clone(&self.cache), + num_partitions: self.num_partitions, + + // Re-set state. + metrics: ExecutionPlanMetricsSet::new(), + buffered_fut: Default::default(), + } + } } impl ExecutionPlan for PiecewiseMergeJoinExec { @@ -477,7 +505,7 @@ impl ExecutionPlan for PiecewiseMergeJoinExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -511,6 +539,7 @@ impl ExecutionPlan for PiecewiseMergeJoinExec { self: Arc, children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); match &children[..] { [left, right] => Ok(Arc::new(PiecewiseMergeJoinExec::try_new( Arc::clone(left), @@ -527,6 +556,13 @@ impl ExecutionPlan for PiecewiseMergeJoinExec { } } + fn reset_state(self: Arc) -> Result> { + Ok(Arc::new(self.with_new_children_and_same_properties(vec![ + Arc::clone(&self.buffered), + Arc::clone(&self.streamed), + ]))) + } + fn execute( &self, partition: usize, diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs index ae7a5fa764bcc..be4c83fc2d35c 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs @@ -23,7 +23,9 @@ use std::any::Any; use std::fmt::Formatter; use std::sync::Arc; -use crate::execution_plan::{EmissionType, boundedness_from_children}; +use crate::execution_plan::{ + EmissionType, boundedness_from_children, has_same_children_properties, +}; use crate::expressions::PhysicalSortExpr; use crate::joins::sort_merge_join::metrics::SortMergeJoinMetrics; use crate::joins::sort_merge_join::stream::SortMergeJoinStream; @@ -39,7 +41,7 @@ use crate::projection::{ }; use crate::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, - PlanProperties, SendableRecordBatchStream, Statistics, + PlanProperties, SendableRecordBatchStream, Statistics, check_if_same_properties, }; use arrow::compute::SortOptions; @@ -127,7 +129,7 @@ pub struct SortMergeJoinExec { /// Defines the null equality for the join. pub null_equality: NullEquality, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, } impl SortMergeJoinExec { @@ -198,7 +200,7 @@ impl SortMergeJoinExec { right_sort_exprs, sort_options, null_equality, - cache, + cache: Arc::new(cache), }) } @@ -340,6 +342,20 @@ impl SortMergeJoinExec { reorder_output_after_swap(Arc::new(new_join), &left.schema(), &right.schema()) } } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + let left = children.swap_remove(0); + let right = children.swap_remove(0); + Self { + left, + right, + metrics: ExecutionPlanMetricsSet::new(), + ..Self::clone(self) + } + } } impl DisplayAs for SortMergeJoinExec { @@ -405,7 +421,7 @@ impl ExecutionPlan for SortMergeJoinExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -440,6 +456,7 @@ impl ExecutionPlan for SortMergeJoinExec { self: Arc, children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); match &children[..] { [left, right] => Ok(Arc::new(SortMergeJoinExec::try_new( Arc::clone(left), diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index 1f6bc703a0300..bfb8df1df8095 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -32,8 +32,11 @@ use std::sync::Arc; use std::task::{Context, Poll}; use std::vec; +use crate::check_if_same_properties; use crate::common::SharedMemoryReservation; -use crate::execution_plan::{boundedness_from_children, emission_type_from_children}; +use crate::execution_plan::{ + boundedness_from_children, emission_type_from_children, has_same_children_properties, +}; use crate::joins::stream_join_utils::{ PruningJoinHashMap, SortedFilterExpr, StreamJoinMetrics, calculate_filter_expr_intervals, combine_two_batches, @@ -197,7 +200,7 @@ pub struct SymmetricHashJoinExec { /// Partition Mode mode: StreamJoinPartitionMode, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, } impl SymmetricHashJoinExec { @@ -253,7 +256,7 @@ impl SymmetricHashJoinExec { left_sort_exprs, right_sort_exprs, mode, - cache, + cache: Arc::new(cache), }) } @@ -360,6 +363,20 @@ impl SymmetricHashJoinExec { } Ok(false) } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + let left = children.swap_remove(0); + let right = children.swap_remove(0); + Self { + left, + right, + metrics: ExecutionPlanMetricsSet::new(), + ..Self::clone(self) + } + } } impl DisplayAs for SymmetricHashJoinExec { @@ -411,7 +428,7 @@ impl ExecutionPlan for SymmetricHashJoinExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -453,6 +470,7 @@ impl ExecutionPlan for SymmetricHashJoinExec { self: Arc, children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); Ok(Arc::new(SymmetricHashJoinExec::try_new( Arc::clone(&children[0]), Arc::clone(&children[1]), diff --git a/datafusion/physical-plan/src/limit.rs b/datafusion/physical-plan/src/limit.rs index fea7acb221304..e1bd606fa25ba 100644 --- a/datafusion/physical-plan/src/limit.rs +++ b/datafusion/physical-plan/src/limit.rs @@ -27,8 +27,13 @@ use super::{ DisplayAs, ExecutionPlanProperties, PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, }; -use crate::execution_plan::{Boundedness, CardinalityEffect}; -use crate::{DisplayFormatType, Distribution, ExecutionPlan, Partitioning}; +use crate::execution_plan::{ + Boundedness, CardinalityEffect, has_same_children_properties, +}; +use crate::{ + DisplayFormatType, Distribution, ExecutionPlan, Partitioning, + check_if_same_properties, +}; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; @@ -51,10 +56,10 @@ pub struct GlobalLimitExec { fetch: Option, /// Execution metrics metrics: ExecutionPlanMetricsSet, - cache: PlanProperties, /// Does the limit have to preserve the order of its input, and if so what is it? /// Some optimizations may reorder the input if no particular sort is required required_ordering: Option, + cache: Arc, } impl GlobalLimitExec { @@ -66,8 +71,8 @@ impl GlobalLimitExec { skip, fetch, metrics: ExecutionPlanMetricsSet::new(), - cache, required_ordering: None, + cache: Arc::new(cache), } } @@ -106,6 +111,17 @@ impl GlobalLimitExec { pub fn set_required_ordering(&mut self, required_ordering: Option) { self.required_ordering = required_ordering; } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + Self { + input: children.swap_remove(0), + metrics: ExecutionPlanMetricsSet::new(), + ..Self::clone(self) + } + } } impl DisplayAs for GlobalLimitExec { @@ -144,7 +160,7 @@ impl ExecutionPlan for GlobalLimitExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -166,10 +182,11 @@ impl ExecutionPlan for GlobalLimitExec { fn with_new_children( self: Arc, - children: Vec>, + mut children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); Ok(Arc::new(GlobalLimitExec::new( - Arc::clone(&children[0]), + children.swap_remove(0), self.skip, self.fetch, ))) @@ -229,7 +246,7 @@ impl ExecutionPlan for GlobalLimitExec { } /// LocalLimitExec applies a limit to a single partition -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct LocalLimitExec { /// Input execution plan input: Arc, @@ -237,10 +254,10 @@ pub struct LocalLimitExec { fetch: usize, /// Execution metrics metrics: ExecutionPlanMetricsSet, - cache: PlanProperties, /// If the child plan is a sort node, after the sort node is removed during /// physical optimization, we should add the required ordering to the limit node required_ordering: Option, + cache: Arc, } impl LocalLimitExec { @@ -251,8 +268,8 @@ impl LocalLimitExec { input, fetch, metrics: ExecutionPlanMetricsSet::new(), - cache, required_ordering: None, + cache: Arc::new(cache), } } @@ -286,6 +303,17 @@ impl LocalLimitExec { pub fn set_required_ordering(&mut self, required_ordering: Option) { self.required_ordering = required_ordering; } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + Self { + input: children.swap_remove(0), + metrics: ExecutionPlanMetricsSet::new(), + ..Self::clone(self) + } + } } impl DisplayAs for LocalLimitExec { @@ -315,7 +343,7 @@ impl ExecutionPlan for LocalLimitExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -335,6 +363,7 @@ impl ExecutionPlan for LocalLimitExec { self: Arc, children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); match children.len() { 1 => Ok(Arc::new(LocalLimitExec::new( Arc::clone(&children[0]), diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs index 4a406ca648d57..5c684bec98f9b 100644 --- a/datafusion/physical-plan/src/memory.rs +++ b/datafusion/physical-plan/src/memory.rs @@ -161,7 +161,7 @@ pub struct LazyMemoryExec { /// Functions to generate batches for each partition batch_generators: Vec>>, /// Plan properties cache storing equivalence properties, partitioning, and execution mode - cache: PlanProperties, + cache: Arc, /// Execution metrics metrics: ExecutionPlanMetricsSet, } @@ -200,7 +200,8 @@ impl LazyMemoryExec { EmissionType::Incremental, boundedness, ) - .with_scheduling_type(SchedulingType::Cooperative); + .with_scheduling_type(SchedulingType::Cooperative) + .into(); Ok(Self { schema, @@ -215,9 +216,9 @@ impl LazyMemoryExec { match projection.as_ref() { Some(columns) => { let projected = Arc::new(self.schema.project(columns).unwrap()); - self.cache = self.cache.with_eq_properties(EquivalenceProperties::new( - Arc::clone(&projected), - )); + Arc::make_mut(&mut self.cache).set_eq_properties( + EquivalenceProperties::new(Arc::clone(&projected)), + ); self.schema = projected; self.projection = projection; self @@ -236,12 +237,12 @@ impl LazyMemoryExec { partition_count, generator_count ); - self.cache.partitioning = partitioning; + Arc::make_mut(&mut self.cache).partitioning = partitioning; Ok(()) } pub fn add_ordering(&mut self, ordering: impl IntoIterator) { - self.cache + Arc::make_mut(&mut self.cache) .eq_properties .add_orderings(std::iter::once(ordering)); } @@ -306,7 +307,7 @@ impl ExecutionPlan for LazyMemoryExec { Arc::clone(&self.schema) } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -365,7 +366,7 @@ impl ExecutionPlan for LazyMemoryExec { Ok(Arc::new(LazyMemoryExec { schema: Arc::clone(&self.schema), batch_generators: generators, - cache: self.cache.clone(), + cache: Arc::clone(&self.cache), metrics: ExecutionPlanMetricsSet::new(), projection: self.projection.clone(), })) diff --git a/datafusion/physical-plan/src/placeholder_row.rs b/datafusion/physical-plan/src/placeholder_row.rs index 4d00b73cff39c..f95b10771c02f 100644 --- a/datafusion/physical-plan/src/placeholder_row.rs +++ b/datafusion/physical-plan/src/placeholder_row.rs @@ -43,7 +43,7 @@ pub struct PlaceholderRowExec { schema: SchemaRef, /// Number of partitions partitions: usize, - cache: PlanProperties, + cache: Arc, } impl PlaceholderRowExec { @@ -54,7 +54,7 @@ impl PlaceholderRowExec { PlaceholderRowExec { schema, partitions, - cache, + cache: Arc::new(cache), } } @@ -63,7 +63,7 @@ impl PlaceholderRowExec { self.partitions = partitions; // Update output partitioning when updating partitions: let output_partitioning = Self::output_partitioning_helper(self.partitions); - self.cache = self.cache.with_partitioning(output_partitioning); + Arc::make_mut(&mut self.cache).partitioning = output_partitioning; self } @@ -132,7 +132,7 @@ impl ExecutionPlan for PlaceholderRowExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index 8d4c775f87348..6398ac8aa9e67 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -27,13 +27,13 @@ use super::{ SendableRecordBatchStream, SortOrderPushdownResult, Statistics, }; use crate::column_rewriter::PhysicalColumnRewriter; -use crate::execution_plan::CardinalityEffect; +use crate::execution_plan::{CardinalityEffect, has_same_children_properties}; use crate::filter_pushdown::{ ChildFilterDescription, ChildPushdownResult, FilterColumnChecker, FilterDescription, FilterPushdownPhase, FilterPushdownPropagation, PushedDownPredicate, }; use crate::joins::utils::{ColumnIndex, JoinFilter, JoinOn, JoinOnRef}; -use crate::{DisplayFormatType, ExecutionPlan, PhysicalExpr}; +use crate::{DisplayFormatType, ExecutionPlan, PhysicalExpr, check_if_same_properties}; use std::any::Any; use std::collections::HashMap; use std::pin::Pin; @@ -78,7 +78,7 @@ pub struct ProjectionExec { /// Execution metrics metrics: ExecutionPlanMetricsSet, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, } impl ProjectionExec { @@ -153,7 +153,7 @@ impl ProjectionExec { projector, input, metrics: ExecutionPlanMetricsSet::new(), - cache, + cache: Arc::new(cache), }) } @@ -216,6 +216,17 @@ impl ProjectionExec { } Ok(alias_map) } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + Self { + input: children.swap_remove(0), + metrics: ExecutionPlanMetricsSet::new(), + ..Self::clone(self) + } + } } impl DisplayAs for ProjectionExec { @@ -269,7 +280,7 @@ impl ExecutionPlan for ProjectionExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -301,6 +312,7 @@ impl ExecutionPlan for ProjectionExec { self: Arc, mut children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); ProjectionExec::try_new( self.projector.projection().clone(), children.swap_remove(0), diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs index 936a02581e89c..f8847cbacefb5 100644 --- a/datafusion/physical-plan/src/recursive_query.rs +++ b/datafusion/physical-plan/src/recursive_query.rs @@ -74,7 +74,7 @@ pub struct RecursiveQueryExec { /// Execution metrics metrics: ExecutionPlanMetricsSet, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, } impl RecursiveQueryExec { @@ -97,7 +97,7 @@ impl RecursiveQueryExec { is_distinct, work_table, metrics: ExecutionPlanMetricsSet::new(), - cache, + cache: Arc::new(cache), }) } @@ -143,7 +143,7 @@ impl ExecutionPlan for RecursiveQueryExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 612c7bb27ddf4..2e02cd3210786 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -31,7 +31,9 @@ use super::{ DisplayAs, ExecutionPlanProperties, RecordBatchStream, SendableRecordBatchStream, }; use crate::coalesce::LimitedBatchCoalescer; -use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType}; +use crate::execution_plan::{ + CardinalityEffect, EvaluationType, SchedulingType, has_same_children_properties, +}; use crate::hash_utils::create_hashes; use crate::metrics::{BaselineMetrics, SpillMetrics}; use crate::projection::{ProjectionExec, all_columns, make_with_child, update_expr}; @@ -39,7 +41,10 @@ use crate::sorts::streaming_merge::StreamingMergeBuilder; use crate::spill::spill_manager::SpillManager; use crate::spill::spill_pool::{self, SpillPoolWriter}; use crate::stream::RecordBatchStreamAdapter; -use crate::{DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, Statistics}; +use crate::{ + DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, Statistics, + check_if_same_properties, +}; use arrow::array::{PrimitiveArray, RecordBatch, RecordBatchOptions}; use arrow::compute::take_arrays; @@ -763,7 +768,7 @@ pub struct RepartitionExec { /// `SortPreservingRepartitionExec`, false means `RepartitionExec`. preserve_order: bool, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, } #[derive(Debug, Clone)] @@ -832,6 +837,18 @@ impl RepartitionExec { pub fn name(&self) -> &str { "RepartitionExec" } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + Self { + input: children.swap_remove(0), + metrics: ExecutionPlanMetricsSet::new(), + state: Default::default(), + ..Self::clone(self) + } + } } impl DisplayAs for RepartitionExec { @@ -891,7 +908,7 @@ impl ExecutionPlan for RepartitionExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -903,6 +920,7 @@ impl ExecutionPlan for RepartitionExec { self: Arc, mut children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); let mut repartition = RepartitionExec::try_new( children.swap_remove(0), self.partitioning().clone(), @@ -1204,7 +1222,7 @@ impl ExecutionPlan for RepartitionExec { _config: &ConfigOptions, ) -> Result>> { use Partitioning::*; - let mut new_properties = self.cache.clone(); + let mut new_properties = PlanProperties::clone(&self.cache); new_properties.partitioning = match new_properties.partitioning { RoundRobinBatch(_) => RoundRobinBatch(target_partitions), Hash(hash, _) => Hash(hash, target_partitions), @@ -1215,7 +1233,7 @@ impl ExecutionPlan for RepartitionExec { state: Arc::clone(&self.state), metrics: self.metrics.clone(), preserve_order: self.preserve_order, - cache: new_properties, + cache: new_properties.into(), }))) } } @@ -1235,7 +1253,7 @@ impl RepartitionExec { state: Default::default(), metrics: ExecutionPlanMetricsSet::new(), preserve_order, - cache, + cache: Arc::new(cache), }) } @@ -1296,7 +1314,7 @@ impl RepartitionExec { // to maintain order self.input.output_partitioning().partition_count() > 1; let eq_properties = Self::eq_properties_helper(&self.input, self.preserve_order); - self.cache = self.cache.with_eq_properties(eq_properties); + Arc::make_mut(&mut self.cache).set_eq_properties(eq_properties); self } diff --git a/datafusion/physical-plan/src/sorts/partial_sort.rs b/datafusion/physical-plan/src/sorts/partial_sort.rs index 73ba889c9e40b..8dd498f10368b 100644 --- a/datafusion/physical-plan/src/sorts/partial_sort.rs +++ b/datafusion/physical-plan/src/sorts/partial_sort.rs @@ -57,11 +57,13 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; +use crate::execution_plan::has_same_children_properties; use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use crate::sorts::sort::sort_batch; use crate::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream, Statistics, + check_if_same_properties, }; use arrow::compute::concat_batches; @@ -93,7 +95,7 @@ pub struct PartialSortExec { /// Fetch highest/lowest n results fetch: Option, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, } impl PartialSortExec { @@ -114,7 +116,7 @@ impl PartialSortExec { metrics_set: ExecutionPlanMetricsSet::new(), preserve_partitioning, fetch: None, - cache, + cache: Arc::new(cache), } } @@ -132,12 +134,8 @@ impl PartialSortExec { /// input partitions producing a single, sorted partition. pub fn with_preserve_partitioning(mut self, preserve_partitioning: bool) -> Self { self.preserve_partitioning = preserve_partitioning; - self.cache = self - .cache - .with_partitioning(Self::output_partitioning_helper( - &self.input, - self.preserve_partitioning, - )); + Arc::make_mut(&mut self.cache).partitioning = + Self::output_partitioning_helper(&self.input, self.preserve_partitioning); self } @@ -207,6 +205,16 @@ impl PartialSortExec { input.boundedness(), )) } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + Self { + input: children.swap_remove(0), + ..Self::clone(self) + } + } } impl DisplayAs for PartialSortExec { @@ -255,7 +263,7 @@ impl ExecutionPlan for PartialSortExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -283,6 +291,7 @@ impl ExecutionPlan for PartialSortExec { self: Arc, children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); let new_partial_sort = PartialSortExec::new( self.expr.clone(), Arc::clone(&children[0]), diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index a8361f7b2941e..0158234205d0a 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -27,7 +27,9 @@ use std::sync::Arc; use parking_lot::RwLock; use crate::common::spawn_buffered; -use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType}; +use crate::execution_plan::{ + Boundedness, CardinalityEffect, EmissionType, has_same_children_properties, +}; use crate::expressions::PhysicalSortExpr; use crate::filter_pushdown::{ ChildFilterDescription, FilterDescription, FilterPushdownPhase, @@ -951,7 +953,7 @@ pub struct SortExec { /// Normalized common sort prefix between the input and the sort expressions (only used with fetch) common_sort_prefix: Vec, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, /// Filter matching the state of the sort for dynamic filter pushdown. /// If `fetch` is `Some`, this will also be set and a TopK operator may be used. /// If `fetch` is `None`, this will be `None`. @@ -973,7 +975,7 @@ impl SortExec { preserve_partitioning, fetch: None, common_sort_prefix: sort_prefix, - cache, + cache: Arc::new(cache), filter: None, } } @@ -992,12 +994,8 @@ impl SortExec { /// input partitions producing a single, sorted partition. pub fn with_preserve_partitioning(mut self, preserve_partitioning: bool) -> Self { self.preserve_partitioning = preserve_partitioning; - self.cache = self - .cache - .with_partitioning(Self::output_partitioning_helper( - &self.input, - self.preserve_partitioning, - )); + Arc::make_mut(&mut self.cache).partitioning = + Self::output_partitioning_helper(&self.input, self.preserve_partitioning); self } @@ -1021,7 +1019,7 @@ impl SortExec { preserve_partitioning: self.preserve_partitioning, common_sort_prefix: self.common_sort_prefix.clone(), fetch: self.fetch, - cache: self.cache.clone(), + cache: Arc::clone(&self.cache), filter: self.filter.clone(), } } @@ -1034,12 +1032,12 @@ impl SortExec { /// operation since rows that are not going to be included /// can be dropped. pub fn with_fetch(&self, fetch: Option) -> Self { - let mut cache = self.cache.clone(); + let mut cache = PlanProperties::clone(&self.cache); // If the SortExec can emit incrementally (that means the sort requirements // and properties of the input match), the SortExec can generate its result // without scanning the entire input when a fetch value exists. let is_pipeline_friendly = matches!( - self.cache.emission_type, + cache.emission_type, EmissionType::Incremental | EmissionType::Both ); if fetch.is_some() && is_pipeline_friendly { @@ -1051,7 +1049,7 @@ impl SortExec { }); let mut new_sort = self.cloned(); new_sort.fetch = fetch; - new_sort.cache = cache; + new_sort.cache = cache.into(); new_sort.filter = filter; new_sort } @@ -1206,7 +1204,7 @@ impl ExecutionPlan for SortExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -1235,14 +1233,17 @@ impl ExecutionPlan for SortExec { let mut new_sort = self.cloned(); assert_eq!(children.len(), 1, "SortExec should have exactly one child"); new_sort.input = Arc::clone(&children[0]); - // Recompute the properties based on the new input since they may have changed - let (cache, sort_prefix) = Self::compute_properties( - &new_sort.input, - new_sort.expr.clone(), - new_sort.preserve_partitioning, - )?; - new_sort.cache = cache; - new_sort.common_sort_prefix = sort_prefix; + + if !has_same_children_properties(&self, &children)? { + // Recompute the properties based on the new input since they may have changed + let (cache, sort_prefix) = Self::compute_properties( + &new_sort.input, + new_sort.expr.clone(), + new_sort.preserve_partitioning, + )?; + new_sort.cache = Arc::new(cache); + new_sort.common_sort_prefix = sort_prefix; + } Ok(Arc::new(new_sort)) } @@ -1466,7 +1467,7 @@ mod tests { pub struct SortedUnboundedExec { schema: Schema, batch_size: u64, - cache: PlanProperties, + cache: Arc, } impl DisplayAs for SortedUnboundedExec { @@ -1506,7 +1507,7 @@ mod tests { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -2259,7 +2260,9 @@ mod tests { let source = SortedUnboundedExec { schema: schema.clone(), batch_size: 2, - cache: SortedUnboundedExec::compute_properties(Arc::new(schema.clone())), + cache: Arc::new(SortedUnboundedExec::compute_properties(Arc::new( + schema.clone(), + ))), }; let mut plan = SortExec::new( [PhysicalSortExpr::new_default(Arc::new(Column::new( diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index 68c457a0d8a3c..57eb9bcbb0746 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -28,6 +28,7 @@ use crate::sorts::streaming_merge::StreamingMergeBuilder; use crate::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream, Statistics, + check_if_same_properties, }; use datafusion_common::{Result, assert_eq_or_internal_err, internal_err}; @@ -35,7 +36,9 @@ use datafusion_execution::TaskContext; use datafusion_execution::memory_pool::MemoryConsumer; use datafusion_physical_expr_common::sort_expr::{LexOrdering, OrderingRequirements}; -use crate::execution_plan::{EvaluationType, SchedulingType}; +use crate::execution_plan::{ + EvaluationType, SchedulingType, has_same_children_properties, +}; use log::{debug, trace}; /// Sort preserving merge execution plan @@ -93,7 +96,7 @@ pub struct SortPreservingMergeExec { /// Optional number of rows to fetch. Stops producing rows after this fetch fetch: Option, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, /// Use round-robin selection of tied winners of loser tree /// /// See [`Self::with_round_robin_repartition`] for more information. @@ -109,7 +112,7 @@ impl SortPreservingMergeExec { expr, metrics: ExecutionPlanMetricsSet::new(), fetch: None, - cache, + cache: Arc::new(cache), enable_round_robin_repartition: true, } } @@ -180,6 +183,16 @@ impl SortPreservingMergeExec { .with_evaluation_type(drive) .with_scheduling_type(scheduling) } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + Self { + input: children.swap_remove(0), + ..Self::clone(self) + } + } } impl DisplayAs for SortPreservingMergeExec { @@ -225,7 +238,7 @@ impl ExecutionPlan for SortPreservingMergeExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -240,7 +253,7 @@ impl ExecutionPlan for SortPreservingMergeExec { expr: self.expr.clone(), metrics: self.metrics.clone(), fetch: limit, - cache: self.cache.clone(), + cache: Arc::clone(&self.cache), enable_round_robin_repartition: true, })) } @@ -280,10 +293,11 @@ impl ExecutionPlan for SortPreservingMergeExec { fn with_new_children( self: Arc, - children: Vec>, + mut children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); Ok(Arc::new( - SortPreservingMergeExec::new(self.expr.clone(), Arc::clone(&children[0])) + SortPreservingMergeExec::new(self.expr.clone(), children.swap_remove(0)) .with_fetch(self.fetch), )) } @@ -1363,7 +1377,7 @@ mod tests { #[derive(Debug, Clone)] struct CongestedExec { schema: Schema, - cache: PlanProperties, + cache: Arc, congestion: Arc, } @@ -1399,7 +1413,7 @@ mod tests { fn as_any(&self) -> &dyn Any { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } fn children(&self) -> Vec<&Arc> { @@ -1492,7 +1506,7 @@ mod tests { }; let source = CongestedExec { schema: schema.clone(), - cache: properties, + cache: Arc::new(properties), congestion: Arc::new(Congestion::new(partition_count)), }; let spm = SortPreservingMergeExec::new( diff --git a/datafusion/physical-plan/src/streaming.rs b/datafusion/physical-plan/src/streaming.rs index c8b8d95718cb8..1535482374110 100644 --- a/datafusion/physical-plan/src/streaming.rs +++ b/datafusion/physical-plan/src/streaming.rs @@ -67,7 +67,7 @@ pub struct StreamingTableExec { projected_output_ordering: Vec, infinite: bool, limit: Option, - cache: PlanProperties, + cache: Arc, metrics: ExecutionPlanMetricsSet, } @@ -111,7 +111,7 @@ impl StreamingTableExec { projected_output_ordering, infinite, limit, - cache, + cache: Arc::new(cache), metrics: ExecutionPlanMetricsSet::new(), }) } @@ -236,7 +236,7 @@ impl ExecutionPlan for StreamingTableExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -335,7 +335,7 @@ impl ExecutionPlan for StreamingTableExec { projected_output_ordering: self.projected_output_ordering.clone(), infinite: self.infinite, limit, - cache: self.cache.clone(), + cache: Arc::clone(&self.cache), metrics: self.metrics.clone(), })) } diff --git a/datafusion/physical-plan/src/test.rs b/datafusion/physical-plan/src/test.rs index c6d0940c35480..aa079e73dd086 100644 --- a/datafusion/physical-plan/src/test.rs +++ b/datafusion/physical-plan/src/test.rs @@ -75,7 +75,7 @@ pub struct TestMemoryExec { /// The maximum number of records to read from this plan. If `None`, /// all records after filtering are returned. fetch: Option, - cache: PlanProperties, + cache: Arc, } impl DisplayAs for TestMemoryExec { @@ -134,7 +134,7 @@ impl ExecutionPlan for TestMemoryExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -239,7 +239,7 @@ impl TestMemoryExec { Ok(Self { partitions: partitions.to_vec(), schema, - cache: PlanProperties::new( + cache: Arc::new(PlanProperties::new( EquivalenceProperties::new_with_orderings( Arc::clone(&projected_schema), Vec::::new(), @@ -247,7 +247,7 @@ impl TestMemoryExec { Partitioning::UnknownPartitioning(partitions.len()), EmissionType::Incremental, Boundedness::Bounded, - ), + )), projected_schema, projection, sort_information: vec![], @@ -265,7 +265,7 @@ impl TestMemoryExec { ) -> Result> { let mut source = Self::try_new(partitions, schema, projection)?; let cache = source.compute_properties(); - source.cache = cache; + source.cache = Arc::new(cache); Ok(Arc::new(source)) } @@ -273,7 +273,7 @@ impl TestMemoryExec { pub fn update_cache(source: &Arc) -> TestMemoryExec { let cache = source.compute_properties(); let mut source = (**source).clone(); - source.cache = cache; + source.cache = Arc::new(cache); source } @@ -342,7 +342,7 @@ impl TestMemoryExec { } self.sort_information = sort_information; - self.cache = self.compute_properties(); + self.cache = Arc::new(self.compute_properties()); Ok(self) } diff --git a/datafusion/physical-plan/src/test/exec.rs b/datafusion/physical-plan/src/test/exec.rs index 4507cccba05a9..a8b21f70f7760 100644 --- a/datafusion/physical-plan/src/test/exec.rs +++ b/datafusion/physical-plan/src/test/exec.rs @@ -125,7 +125,7 @@ pub struct MockExec { /// if true (the default), sends data using a separate task to ensure the /// batches are not available without this stream yielding first use_task: bool, - cache: PlanProperties, + cache: Arc, } impl MockExec { @@ -142,7 +142,7 @@ impl MockExec { data, schema, use_task: true, - cache, + cache: Arc::new(cache), } } @@ -192,7 +192,7 @@ impl ExecutionPlan for MockExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -299,7 +299,7 @@ pub struct BarrierExec { /// all streams wait on this barrier to produce barrier: Arc, - cache: PlanProperties, + cache: Arc, } impl BarrierExec { @@ -312,7 +312,7 @@ impl BarrierExec { data, schema, barrier, - cache, + cache: Arc::new(cache), } } @@ -364,7 +364,7 @@ impl ExecutionPlan for BarrierExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -429,7 +429,7 @@ impl ExecutionPlan for BarrierExec { /// A mock execution plan that errors on a call to execute #[derive(Debug)] pub struct ErrorExec { - cache: PlanProperties, + cache: Arc, } impl Default for ErrorExec { @@ -446,7 +446,9 @@ impl ErrorExec { true, )])); let cache = Self::compute_properties(schema); - Self { cache } + Self { + cache: Arc::new(cache), + } } /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. @@ -487,7 +489,7 @@ impl ExecutionPlan for ErrorExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -517,7 +519,7 @@ impl ExecutionPlan for ErrorExec { pub struct StatisticsExec { stats: Statistics, schema: Arc, - cache: PlanProperties, + cache: Arc, } impl StatisticsExec { pub fn new(stats: Statistics, schema: Schema) -> Self { @@ -530,7 +532,7 @@ impl StatisticsExec { Self { stats, schema: Arc::new(schema), - cache, + cache: Arc::new(cache), } } @@ -577,7 +579,7 @@ impl ExecutionPlan for StatisticsExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -623,7 +625,7 @@ pub struct BlockingExec { /// Ref-counting helper to check if the plan and the produced stream are still in memory. refs: Arc<()>, - cache: PlanProperties, + cache: Arc, } impl BlockingExec { @@ -633,7 +635,7 @@ impl BlockingExec { Self { schema, refs: Default::default(), - cache, + cache: Arc::new(cache), } } @@ -684,7 +686,7 @@ impl ExecutionPlan for BlockingExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -766,7 +768,7 @@ pub struct PanicExec { /// Number of output partitions. Each partition will produce this /// many empty output record batches prior to panicking batches_until_panics: Vec, - cache: PlanProperties, + cache: Arc, } impl PanicExec { @@ -778,7 +780,7 @@ impl PanicExec { Self { schema, batches_until_panics, - cache, + cache: Arc::new(cache), } } @@ -830,7 +832,7 @@ impl ExecutionPlan for PanicExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index b6f943886e309..7a3fa8eb3a3fe 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -32,9 +32,10 @@ use super::{ SendableRecordBatchStream, Statistics, metrics::{ExecutionPlanMetricsSet, MetricsSet}, }; +use crate::check_if_same_properties; use crate::execution_plan::{ InvariantLevel, boundedness_from_children, check_default_invariants, - emission_type_from_children, + emission_type_from_children, has_same_children_properties, }; use crate::filter_pushdown::{FilterDescription, FilterPushdownPhase}; use crate::metrics::BaselineMetrics; @@ -100,7 +101,7 @@ pub struct UnionExec { /// Execution metrics metrics: ExecutionPlanMetricsSet, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, } impl UnionExec { @@ -118,7 +119,7 @@ impl UnionExec { UnionExec { inputs, metrics: ExecutionPlanMetricsSet::new(), - cache, + cache: Arc::new(cache), } } @@ -147,7 +148,7 @@ impl UnionExec { Ok(Arc::new(UnionExec { inputs, metrics: ExecutionPlanMetricsSet::new(), - cache, + cache: Arc::new(cache), })) } } @@ -183,6 +184,17 @@ impl UnionExec { boundedness_from_children(inputs), )) } + + fn with_new_children_and_same_properties( + &self, + children: Vec>, + ) -> Self { + Self { + inputs: children, + metrics: ExecutionPlanMetricsSet::new(), + ..Self::clone(self) + } + } } impl DisplayAs for UnionExec { @@ -210,7 +222,7 @@ impl ExecutionPlan for UnionExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -259,6 +271,7 @@ impl ExecutionPlan for UnionExec { self: Arc, children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); UnionExec::try_new(children) } @@ -411,7 +424,7 @@ pub struct InterleaveExec { /// Execution metrics metrics: ExecutionPlanMetricsSet, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, } impl InterleaveExec { @@ -425,7 +438,7 @@ impl InterleaveExec { Ok(InterleaveExec { inputs, metrics: ExecutionPlanMetricsSet::new(), - cache, + cache: Arc::new(cache), }) } @@ -447,6 +460,17 @@ impl InterleaveExec { boundedness_from_children(inputs), )) } + + fn with_new_children_and_same_properties( + &self, + children: Vec>, + ) -> Self { + Self { + inputs: children, + metrics: ExecutionPlanMetricsSet::new(), + ..Self::clone(self) + } + } } impl DisplayAs for InterleaveExec { @@ -474,7 +498,7 @@ impl ExecutionPlan for InterleaveExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -495,6 +519,7 @@ impl ExecutionPlan for InterleaveExec { can_interleave(children.iter()), "Can not create InterleaveExec: new children can not be interleaved" ); + check_if_same_properties!(self, children); Ok(Arc::new(InterleaveExec::try_new(children)?)) } diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs index 5fef754e80780..d8a8937c46b5e 100644 --- a/datafusion/physical-plan/src/unnest.rs +++ b/datafusion/physical-plan/src/unnest.rs @@ -26,9 +26,10 @@ use super::metrics::{ RecordOutput, }; use super::{DisplayAs, ExecutionPlanProperties, PlanProperties}; +use crate::execution_plan::has_same_children_properties; use crate::{ DisplayFormatType, Distribution, ExecutionPlan, RecordBatchStream, - SendableRecordBatchStream, + SendableRecordBatchStream, check_if_same_properties, }; use arrow::array::{ @@ -74,7 +75,7 @@ pub struct UnnestExec { /// Execution metrics metrics: ExecutionPlanMetricsSet, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, } impl UnnestExec { @@ -100,7 +101,7 @@ impl UnnestExec { struct_column_indices, options, metrics: Default::default(), - cache, + cache: Arc::new(cache), }) } @@ -193,6 +194,17 @@ impl UnnestExec { pub fn options(&self) -> &UnnestOptions { &self.options } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + Self { + input: children.swap_remove(0), + metrics: ExecutionPlanMetricsSet::new(), + ..Self::clone(self) + } + } } impl DisplayAs for UnnestExec { @@ -221,7 +233,7 @@ impl ExecutionPlan for UnnestExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -231,10 +243,11 @@ impl ExecutionPlan for UnnestExec { fn with_new_children( self: Arc, - children: Vec>, + mut children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); Ok(Arc::new(UnnestExec::new( - Arc::clone(&children[0]), + children.swap_remove(0), self.list_column_indices.clone(), self.struct_column_indices.clone(), Arc::clone(&self.schema), diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index 987a400ec369e..9dd61b2170cf6 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -28,6 +28,7 @@ use std::sync::Arc; use std::task::{Context, Poll}; use super::utils::create_schema; +use crate::execution_plan::has_same_children_properties; use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use crate::windows::{ calc_requirements, get_ordered_partition_by_indices, get_partition_by_sort_exprs, @@ -36,7 +37,7 @@ use crate::windows::{ use crate::{ ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, InputOrderMode, PlanProperties, RecordBatchStream, - SendableRecordBatchStream, Statistics, WindowExpr, + SendableRecordBatchStream, Statistics, WindowExpr, check_if_same_properties, }; use arrow::compute::take_record_batch; @@ -93,7 +94,7 @@ pub struct BoundedWindowAggExec { // See `get_ordered_partition_by_indices` for more details. ordered_partition_by_indices: Vec, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, /// If `can_rerepartition` is false, partition_keys is always empty. can_repartition: bool, } @@ -134,7 +135,7 @@ impl BoundedWindowAggExec { metrics: ExecutionPlanMetricsSet::new(), input_order_mode, ordered_partition_by_indices, - cache, + cache: Arc::new(cache), can_repartition, }) } @@ -248,6 +249,17 @@ impl BoundedWindowAggExec { total_byte_size: Precision::Absent, }) } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + Self { + input: children.swap_remove(0), + metrics: ExecutionPlanMetricsSet::new(), + ..Self::clone(self) + } + } } impl DisplayAs for BoundedWindowAggExec { @@ -304,7 +316,7 @@ impl ExecutionPlan for BoundedWindowAggExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -339,6 +351,7 @@ impl ExecutionPlan for BoundedWindowAggExec { self: Arc, children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); Ok(Arc::new(BoundedWindowAggExec::try_new( self.window_expr.clone(), Arc::clone(&children[0]), diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs index aa99f4f49885a..734a20800641b 100644 --- a/datafusion/physical-plan/src/windows/window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs @@ -23,7 +23,7 @@ use std::sync::Arc; use std::task::{Context, Poll}; use super::utils::create_schema; -use crate::execution_plan::EmissionType; +use crate::execution_plan::{EmissionType, has_same_children_properties}; use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use crate::windows::{ calc_requirements, get_ordered_partition_by_indices, get_partition_by_sort_exprs, @@ -32,7 +32,7 @@ use crate::windows::{ use crate::{ ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, PhysicalExpr, PlanProperties, RecordBatchStream, - SendableRecordBatchStream, Statistics, WindowExpr, + SendableRecordBatchStream, Statistics, WindowExpr, check_if_same_properties, }; use arrow::array::ArrayRef; @@ -65,7 +65,7 @@ pub struct WindowAggExec { // see `get_ordered_partition_by_indices` for more details. ordered_partition_by_indices: Vec, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, /// If `can_partition` is false, partition_keys is always empty. can_repartition: bool, } @@ -89,7 +89,7 @@ impl WindowAggExec { schema, metrics: ExecutionPlanMetricsSet::new(), ordered_partition_by_indices, - cache, + cache: Arc::new(cache), can_repartition, }) } @@ -158,6 +158,17 @@ impl WindowAggExec { .unwrap_or_else(Vec::new) } } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + Self { + input: children.swap_remove(0), + metrics: ExecutionPlanMetricsSet::new(), + ..Self::clone(self) + } + } } impl DisplayAs for WindowAggExec { @@ -206,7 +217,7 @@ impl ExecutionPlan for WindowAggExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -242,11 +253,12 @@ impl ExecutionPlan for WindowAggExec { fn with_new_children( self: Arc, - children: Vec>, + mut children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); Ok(Arc::new(WindowAggExec::try_new( self.window_expr.clone(), - Arc::clone(&children[0]), + children.swap_remove(0), true, )?)) } diff --git a/datafusion/physical-plan/src/work_table.rs b/datafusion/physical-plan/src/work_table.rs index 1313909adbba2..b01ea513a8ede 100644 --- a/datafusion/physical-plan/src/work_table.rs +++ b/datafusion/physical-plan/src/work_table.rs @@ -109,7 +109,7 @@ pub struct WorkTableExec { /// Execution metrics metrics: ExecutionPlanMetricsSet, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, } impl WorkTableExec { @@ -129,7 +129,7 @@ impl WorkTableExec { projection, work_table: Arc::new(WorkTable::new(name)), metrics: ExecutionPlanMetricsSet::new(), - cache, + cache: Arc::new(cache), }) } @@ -181,7 +181,7 @@ impl ExecutionPlan for WorkTableExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -263,7 +263,7 @@ impl ExecutionPlan for WorkTableExec { projection: self.projection.clone(), metrics: ExecutionPlanMetricsSet::new(), work_table, - cache: self.cache.clone(), + cache: Arc::clone(&self.cache), })) } } diff --git a/docs/source/library-user-guide/custom-table-providers.md b/docs/source/library-user-guide/custom-table-providers.md index 8e1dee9e843ac..50005a7527da0 100644 --- a/docs/source/library-user-guide/custom-table-providers.md +++ b/docs/source/library-user-guide/custom-table-providers.md @@ -108,7 +108,7 @@ impl ExecutionPlan for CustomExec { } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { unreachable!() } @@ -232,7 +232,7 @@ The `scan` method of the `TableProvider` returns a `Result &PlanProperties { +# fn properties(&self) -> &Arc { # unreachable!() # } # @@ -424,7 +424,7 @@ This will allow you to use the custom table provider in DataFusion. For example, # } # # -# fn properties(&self) -> &PlanProperties { +# fn properties(&self) -> &Arc { # unreachable!() # } # diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md index f5c4fc09214df..57bf9dfc8b0e6 100644 --- a/docs/source/library-user-guide/upgrading.md +++ b/docs/source/library-user-guide/upgrading.md @@ -61,6 +61,61 @@ FileSinkConfig { } ``` +### `ExecutionPlan` properties method return type + +Now `ExecutionPlan::properties()` must return `&Arc` instead of a reference. This was done to enable the comparison of properties and to determine that they have not changed within the +`with_new_children` method. To migrate, in all `ExecutionPlan` implementations, you need to wrap stored `PlanProperties` in an `Arc`: + +```diff +- cache: PlanProperties, ++ cache: Arc, + +... + +- fn properties(&self) -> &PlanProperties { ++ fn properties(&self) -> &Arc { + &self.cache + } +``` + +Note: The optimization for `with_new_children` can be implemented for any `ExecutionPlan`. This can reduce planning time as well as the time for resetting plan states. +To support it, you can use the macro: `check_if_same_properties`. For it to work, you need to implement the function: `with_new_children_and_same_properties` with semantics +identical to `with_new_children`, but operating under the assumption that the properties of the children plans have not changed. + +An example of supporting this optimization for `ProjectionExec`: + +```diff + impl ProjectionExec { ++ fn with_new_children_and_same_properties( ++ &self, ++ mut children: Vec>, ++ ) -> Self { ++ Self { ++ input: children.swap_remove(0), ++ metrics: ExecutionPlanMetricsSet::new(), ++ ..Self::clone(self) ++ } ++ } + } + + impl ExecutionPlan for ProjectionExec { + fn with_new_children( + self: Arc, + mut children: Vec>, + ) -> Result> { ++ check_if_same_properties!(self, children); + ProjectionExec::try_new( + self.projector.projection().into_iter().cloned(), + children.swap_remove(0), + ) + .map(|p| Arc::new(p) as _) + } + } + +... + +``` + ### `SimplifyInfo` trait removed, `SimplifyContext` now uses builder-style API The `SimplifyInfo` trait has been removed and replaced with the concrete `SimplifyContext` struct. This simplifies the expression simplification API and removes the need for trait objects.