From 9383cdfb43430320f7c4f1696c37f37fb8608b3b Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Fri, 27 Feb 2026 14:30:20 +0000 Subject: [PATCH] add proto converter reference to PhysicalExtensionCodec to allow dynamic filter deduping to work in custom plan nodes --- .../adapter_serialization.rs | 2 + .../proto/composed_extension_codec.rs | 7 +- .../proto/expression_deduplication.rs | 2 + .../ffi/src/proto/physical_extension_codec.rs | 17 +- datafusion/proto/src/physical_plan/mod.rs | 21 +- .../tests/cases/roundtrip_physical_plan.rs | 246 +++++++++++++++++- 6 files changed, 279 insertions(+), 16 deletions(-) diff --git a/datafusion-examples/examples/custom_data_source/adapter_serialization.rs b/datafusion-examples/examples/custom_data_source/adapter_serialization.rs index a2cd187fee067..80a10ecf65f13 100644 --- a/datafusion-examples/examples/custom_data_source/adapter_serialization.rs +++ b/datafusion-examples/examples/custom_data_source/adapter_serialization.rs @@ -274,6 +274,7 @@ impl PhysicalExtensionCodec for AdapterPreservingCodec { buf: &[u8], inputs: &[Arc], _ctx: &TaskContext, + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { // Try to parse as our extension payload if let Ok(payload) = serde_json::from_slice::(buf) @@ -302,6 +303,7 @@ impl PhysicalExtensionCodec for AdapterPreservingCodec { &self, _node: Arc, _buf: &mut Vec, + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result<()> { // We don't need this for the example - we use serialize_physical_plan instead not_impl_err!( diff --git a/datafusion-examples/examples/proto/composed_extension_codec.rs b/datafusion-examples/examples/proto/composed_extension_codec.rs index df3d58b7bfb81..f05fe917118a1 100644 --- a/datafusion-examples/examples/proto/composed_extension_codec.rs +++ b/datafusion-examples/examples/proto/composed_extension_codec.rs @@ -44,6 +44,7 @@ use datafusion::physical_plan::{DisplayAs, ExecutionPlan}; use datafusion::prelude::SessionContext; use datafusion_proto::physical_plan::{ AsExecutionPlan, ComposedPhysicalExtensionCodec, PhysicalExtensionCodec, + PhysicalProtoConverterExtension, }; use datafusion_proto::protobuf; @@ -150,6 +151,7 @@ impl PhysicalExtensionCodec for ParentPhysicalExtensionCodec { buf: &[u8], inputs: &[Arc], _ctx: &TaskContext, + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { if buf == "ParentExec".as_bytes() { Ok(Arc::new(ParentExec { @@ -160,7 +162,7 @@ impl PhysicalExtensionCodec for ParentPhysicalExtensionCodec { } } - fn try_encode(&self, node: Arc, buf: &mut Vec) -> Result<()> { + fn try_encode(&self, node: Arc, buf: &mut Vec, _proto_converter: &dyn PhysicalProtoConverterExtension) -> Result<()> { if node.as_any().downcast_ref::().is_some() { buf.extend_from_slice("ParentExec".as_bytes()); Ok(()) @@ -235,6 +237,7 @@ impl PhysicalExtensionCodec for ChildPhysicalExtensionCodec { buf: &[u8], _inputs: &[Arc], _ctx: &TaskContext, + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { if buf == "ChildExec".as_bytes() { Ok(Arc::new(ChildExec {})) @@ -243,7 +246,7 @@ impl PhysicalExtensionCodec for ChildPhysicalExtensionCodec { } } - fn try_encode(&self, node: Arc, buf: &mut Vec) -> Result<()> { + fn try_encode(&self, node: Arc, buf: &mut Vec, _proto_converter: &dyn PhysicalProtoConverterExtension) -> Result<()> { if node.as_any().downcast_ref::().is_some() { buf.extend_from_slice("ChildExec".as_bytes()); Ok(()) diff --git a/datafusion-examples/examples/proto/expression_deduplication.rs b/datafusion-examples/examples/proto/expression_deduplication.rs index 0dec807f8043a..a591f41d8682a 100644 --- a/datafusion-examples/examples/proto/expression_deduplication.rs +++ b/datafusion-examples/examples/proto/expression_deduplication.rs @@ -187,6 +187,7 @@ impl PhysicalExtensionCodec for CachingCodec { _buf: &[u8], _inputs: &[Arc], _ctx: &TaskContext, + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { datafusion::common::not_impl_err!("No custom extension nodes") } @@ -196,6 +197,7 @@ impl PhysicalExtensionCodec for CachingCodec { &self, _node: Arc, _buf: &mut Vec, + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result<()> { datafusion::common::not_impl_err!("No custom extension nodes") } diff --git a/datafusion/ffi/src/proto/physical_extension_codec.rs b/datafusion/ffi/src/proto/physical_extension_codec.rs index 0577e72366478..a077ca0e675a9 100644 --- a/datafusion/ffi/src/proto/physical_extension_codec.rs +++ b/datafusion/ffi/src/proto/physical_extension_codec.rs @@ -26,7 +26,7 @@ use datafusion_expr::{ AggregateUDF, AggregateUDFImpl, ScalarUDF, ScalarUDFImpl, WindowUDF, WindowUDFImpl, }; use datafusion_physical_plan::ExecutionPlan; -use datafusion_proto::physical_plan::PhysicalExtensionCodec; +use datafusion_proto::physical_plan::{DefaultPhysicalProtoConverter, PhysicalExtensionCodec, PhysicalProtoConverterExtension}; use tokio::runtime::Handle; use crate::execution::FFI_TaskContextProvider; @@ -142,7 +142,7 @@ unsafe extern "C" fn try_decode_fn_wrapper( let inputs = rresult_return!(inputs); let plan = - rresult_return!(codec.try_decode(buf.as_ref(), &inputs, task_ctx.as_ref())); + rresult_return!(codec.try_decode(buf.as_ref(), &inputs, task_ctx.as_ref(), &DefaultPhysicalProtoConverter)); RResult::ROk(FFI_ExecutionPlan::new(plan, None)) } @@ -156,7 +156,7 @@ unsafe extern "C" fn try_encode_fn_wrapper( let plan: Arc = rresult_return!((&node).try_into()); let mut bytes = Vec::new(); - rresult_return!(codec.try_encode(plan, &mut bytes)); + rresult_return!(codec.try_encode(plan, &mut bytes, &DefaultPhysicalProtoConverter)); RResult::ROk(bytes.into()) } @@ -327,6 +327,7 @@ impl PhysicalExtensionCodec for ForeignPhysicalExtensionCodec { buf: &[u8], inputs: &[Arc], _ctx: &TaskContext, + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { let inputs = inputs .iter() @@ -340,7 +341,7 @@ impl PhysicalExtensionCodec for ForeignPhysicalExtensionCodec { Ok(plan) } - fn try_encode(&self, node: Arc, buf: &mut Vec) -> Result<()> { + fn try_encode(&self, node: Arc, buf: &mut Vec, _proto_converter: &dyn PhysicalProtoConverterExtension) -> Result<()> { let plan = FFI_ExecutionPlan::new(node, None); let bytes = df_result!(unsafe { (self.0.try_encode)(&self.0, plan) })?; @@ -418,7 +419,7 @@ pub(crate) mod tests { use datafusion_functions_aggregate::sum::Sum; use datafusion_functions_window::rank::{Rank, RankType}; use datafusion_physical_plan::ExecutionPlan; - use datafusion_proto::physical_plan::PhysicalExtensionCodec; + use datafusion_proto::physical_plan::{DefaultPhysicalProtoConverter, PhysicalExtensionCodec, PhysicalProtoConverterExtension}; use crate::execution_plan::tests::EmptyExec; use crate::proto::physical_extension_codec::FFI_PhysicalExtensionCodec; @@ -441,6 +442,7 @@ pub(crate) mod tests { buf: &[u8], _inputs: &[Arc], _ctx: &TaskContext, + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { if buf[0] != Self::MAGIC_NUMBER { return exec_err!( @@ -459,6 +461,7 @@ pub(crate) mod tests { &self, node: Arc, buf: &mut Vec, + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result<()> { buf.push(Self::MAGIC_NUMBER); @@ -579,10 +582,10 @@ pub(crate) mod tests { let exec = create_test_exec(); let input_execs = [create_test_exec()]; let mut bytes = Vec::new(); - foreign_codec.try_encode(Arc::clone(&exec), &mut bytes)?; + foreign_codec.try_encode(Arc::clone(&exec), &mut bytes, &DefaultPhysicalProtoConverter)?; let returned_exec = - foreign_codec.try_decode(&bytes, &input_execs, ctx.task_ctx().as_ref())?; + foreign_codec.try_decode(&bytes, &input_execs, ctx.task_ctx().as_ref(), &DefaultPhysicalProtoConverter)?; assert!(returned_exec.as_any().is::()); diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 99cd39d9f2bef..0bb0482064e9b 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -569,7 +569,7 @@ impl protobuf::PhysicalPlanNode { } let mut buf: Vec = vec![]; - match codec.try_encode(Arc::clone(&plan_clone), &mut buf) { + match codec.try_encode(Arc::clone(&plan_clone), &mut buf, proto_converter) { Ok(_) => { let inputs: Vec = plan_clone .children() @@ -1740,7 +1740,7 @@ impl protobuf::PhysicalPlanNode { .map(|i| proto_converter.proto_to_execution_plan(ctx, codec, i)) .collect::>()?; - let extension_node = codec.try_decode(extension.node.as_slice(), &inputs, ctx)?; + let extension_node = codec.try_decode(extension.node.as_slice(), &inputs, ctx, proto_converter)?; Ok(extension_node) } @@ -3660,9 +3660,15 @@ pub trait PhysicalExtensionCodec: Debug + Send + Sync { buf: &[u8], inputs: &[Arc], ctx: &TaskContext, + proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result>; - fn try_encode(&self, node: Arc, buf: &mut Vec) -> Result<()>; + fn try_encode( + &self, + node: Arc, + buf: &mut Vec, + proto_converter: &dyn PhysicalProtoConverterExtension, + ) -> Result<()>; fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result> { not_impl_err!("PhysicalExtensionCodec is not provided for scalar function {name}") @@ -3716,6 +3722,7 @@ impl PhysicalExtensionCodec for DefaultPhysicalExtensionCodec { _buf: &[u8], _inputs: &[Arc], _ctx: &TaskContext, + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { not_impl_err!("PhysicalExtensionCodec is not provided") } @@ -3724,6 +3731,7 @@ impl PhysicalExtensionCodec for DefaultPhysicalExtensionCodec { &self, _node: Arc, _buf: &mut Vec, + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result<()> { not_impl_err!("PhysicalExtensionCodec is not provided") } @@ -4108,12 +4116,13 @@ impl PhysicalExtensionCodec for ComposedPhysicalExtensionCodec { buf: &[u8], inputs: &[Arc], ctx: &TaskContext, + proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { - self.decode_protobuf(buf, |codec, data| codec.try_decode(data, inputs, ctx)) + self.decode_protobuf(buf, |codec, data| codec.try_decode(data, inputs, ctx, proto_converter)) } - fn try_encode(&self, node: Arc, buf: &mut Vec) -> Result<()> { - self.encode_protobuf(buf, |codec, data| codec.try_encode(Arc::clone(&node), data)) + fn try_encode(&self, node: Arc, buf: &mut Vec, proto_converter: &dyn PhysicalProtoConverterExtension) -> Result<()> { + self.encode_protobuf(buf, |codec, data| codec.try_encode(Arc::clone(&node), data, proto_converter)) } fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result> { diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 0a5ed766e6cc1..bfbcb70d64db2 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -83,7 +83,8 @@ use datafusion::physical_plan::windows::{ create_udwf_window_expr, }; use datafusion::physical_plan::{ - ExecutionPlan, InputOrderMode, Partitioning, PhysicalExpr, Statistics, displayable, + DisplayAs, DisplayFormatType, ExecutionPlan, InputOrderMode, Partitioning, + PhysicalExpr, SendableRecordBatchStream, Statistics, displayable, }; use datafusion::prelude::{ParquetReadOptions, SessionContext}; use datafusion::scalar::ScalarValue; @@ -1141,6 +1142,7 @@ fn roundtrip_parquet_exec_with_custom_predicate_expr() -> Result<()> { _buf: &[u8], _inputs: &[Arc], _ctx: &TaskContext, + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { unreachable!() } @@ -1149,6 +1151,7 @@ fn roundtrip_parquet_exec_with_custom_predicate_expr() -> Result<()> { &self, _node: Arc, _buf: &mut Vec, + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result<()> { unreachable!() } @@ -1254,6 +1257,7 @@ impl PhysicalExtensionCodec for UDFExtensionCodec { _buf: &[u8], _inputs: &[Arc], _ctx: &TaskContext, + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { not_impl_err!("No extension codec provided") } @@ -1262,6 +1266,7 @@ impl PhysicalExtensionCodec for UDFExtensionCodec { &self, _node: Arc, _buf: &mut Vec, + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result<()> { not_impl_err!("No extension codec provided") } @@ -3196,3 +3201,242 @@ fn roundtrip_lead_with_default_value() -> Result<()> { true, )?)) } + +/// A custom ExecutionPlan that stores `Vec>` fields, +/// simulating a custom scan node (like NumpangFileSource) that has dynamic +/// filters pushed down during optimization. +struct CustomExecWithExprs { + exprs: Vec>, + child: Arc, + properties: datafusion::physical_plan::PlanProperties, +} + +impl std::fmt::Debug for CustomExecWithExprs { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("CustomExecWithExprs") + .field("exprs", &self.exprs) + .field("child", &self.child) + .finish() + } +} + +impl CustomExecWithExprs { + fn new( + exprs: Vec>, + child: Arc, + ) -> Self { + use datafusion_physical_expr::equivalence::EquivalenceProperties; + use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; + let properties = datafusion::physical_plan::PlanProperties::new( + EquivalenceProperties::new(child.schema()), + Partitioning::UnknownPartitioning(1), + EmissionType::Incremental, + Boundedness::Bounded, + ); + Self { + exprs, + child, + properties, + } + } +} + +impl DisplayAs for CustomExecWithExprs { + fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { + write!(f, "CustomExecWithExprs") + } +} + +impl ExecutionPlan for CustomExecWithExprs { + fn name(&self) -> &str { + "CustomExecWithExprs" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &datafusion::physical_plan::PlanProperties { + &self.properties + } + + fn children(&self) -> Vec<&Arc> { + vec![&self.child] + } + + fn with_new_children( + self: Arc, + _children: Vec>, + ) -> Result> { + unreachable!() + } + + fn execute( + &self, + _partition: usize, + _context: Arc, + ) -> Result { + unreachable!() + } +} + +/// A PhysicalExtensionCodec that uses proto_converter to serialize/deserialize +/// the PhysicalExpr fields stored in CustomExecWithExprs. +#[derive(Debug)] +struct CustomExecWithExprsCodec { + /// The schema used for expression deserialization (shared between encode/decode). + schema: Arc, +} + +impl PhysicalExtensionCodec for CustomExecWithExprsCodec { + fn try_decode( + &self, + buf: &[u8], + inputs: &[Arc], + ctx: &TaskContext, + proto_converter: &dyn PhysicalProtoConverterExtension, + ) -> Result> { + let num_exprs = + u32::from_le_bytes(buf[0..4].try_into().unwrap()) as usize; + let mut offset = 4; + + let mut exprs = Vec::with_capacity(num_exprs); + for _ in 0..num_exprs { + let expr_len = + u32::from_le_bytes(buf[offset..offset + 4].try_into().unwrap()) + as usize; + offset += 4; + let expr_proto = PhysicalExprNode::decode(&buf[offset..offset + expr_len]) + .map_err(|e| { + internal_datafusion_err!("Failed to decode expression: {e}") + })?; + let expr = proto_converter.proto_to_physical_expr( + &expr_proto, + ctx, + &self.schema, + self, + )?; + exprs.push(expr); + offset += expr_len; + } + + Ok(Arc::new(CustomExecWithExprs::new( + exprs, + inputs[0].clone(), + ))) + } + + fn try_encode( + &self, + node: Arc, + buf: &mut Vec, + proto_converter: &dyn PhysicalProtoConverterExtension, + ) -> Result<()> { + let custom = node + .as_any() + .downcast_ref::() + .ok_or_else(|| { + internal_datafusion_err!("Expected CustomExecWithExprs") + })?; + + buf.extend_from_slice(&(custom.exprs.len() as u32).to_le_bytes()); + for expr in &custom.exprs { + let expr_proto = proto_converter.physical_expr_to_proto(expr, self)?; + let expr_bytes = expr_proto.encode_to_vec(); + buf.extend_from_slice(&(expr_bytes.len() as u32).to_le_bytes()); + buf.extend_from_slice(&expr_bytes); + } + + Ok(()) + } +} + +/// Tests that a custom ExecutionPlan node storing PhysicalExpr fields can +/// use the proto_converter parameter to serialize/deserialize those expressions +/// with deduplication support. When the same DynamicFilterPhysicalExpr is shared +/// between the custom node and another part of the plan (e.g., a FilterExec), +/// the inner_id should be preserved after roundtrip (proving dedup works). +#[test] +fn test_custom_node_with_dynamic_filter_dedup_roundtrip() -> Result<()> { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int64, false), + ])); + + // Create a dynamic filter expression + let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new( + vec![Arc::new(Column::new("a", 0)) as Arc], + lit(true), + )); + let dynamic_filter_expr = dynamic_filter.clone() as Arc; + + // Create the plan: + // FilterExec(dynamic_filter) + // -> CustomExecWithExprs(exprs: [dynamic_filter]) + // -> EmptyExec + // + // The same dynamic_filter Arc is stored in both the FilterExec and the custom node. + let empty = Arc::new(EmptyExec::new(Arc::clone(&schema))); + let custom_exec = Arc::new(CustomExecWithExprs::new( + vec![Arc::clone(&dynamic_filter_expr)], + empty, + )); + let filter_exec = + Arc::new(FilterExec::try_new(Arc::clone(&dynamic_filter_expr), custom_exec)?) + as Arc; + + // Roundtrip with DeduplicatingProtoConverter + let codec = CustomExecWithExprsCodec { + schema: Arc::clone(&schema), + }; + let converter = DeduplicatingProtoConverter {}; + + let bytes = physical_plan_to_bytes_with_proto_converter( + Arc::clone(&filter_exec), + &codec, + &converter, + )?; + + let ctx = SessionContext::new(); + let deser_converter = DeduplicatingProtoConverter {}; + let deserialized = physical_plan_from_bytes_with_proto_converter( + bytes.as_ref(), + ctx.task_ctx().as_ref(), + &codec, + &deser_converter, + )?; + + // Extract the deserialized FilterExec's dynamic filter + let deser_filter = deserialized + .as_any() + .downcast_ref::() + .expect("Top-level should be FilterExec"); + let deser_filter_df = deser_filter + .predicate() + .as_any() + .downcast_ref::() + .expect("FilterExec predicate should be DynamicFilterPhysicalExpr"); + + // Extract the deserialized custom node's dynamic filter + let deser_custom = deser_filter + .input() + .as_any() + .downcast_ref::() + .expect("FilterExec child should be CustomExecWithExprs"); + assert_eq!(deser_custom.exprs.len(), 1, "Should have one expression"); + let deser_custom_df = deser_custom.exprs[0] + .as_any() + .downcast_ref::() + .expect("Custom node expr should be DynamicFilterPhysicalExpr"); + + // After roundtrip with deduplication, both references should share the same + // inner state (same inner_id), proving that proto_converter was used correctly + // within the custom codec and deduplication was preserved across the plan boundary. + assert_eq!( + deser_filter_df.inner_id(), + deser_custom_df.inner_id(), + "FilterExec's dynamic filter and CustomExecWithExprs's dynamic filter \ + should share the same inner state after roundtrip with deduplication" + ); + + Ok(()) +}