Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ impl PhysicalExtensionCodec for AdapterPreservingCodec {
buf: &[u8],
inputs: &[Arc<dyn ExecutionPlan>],
_ctx: &TaskContext,
_proto_converter: &dyn PhysicalProtoConverterExtension,
) -> Result<Arc<dyn ExecutionPlan>> {
// Try to parse as our extension payload
if let Ok(payload) = serde_json::from_slice::<ExtensionPayload>(buf)
Expand Down Expand Up @@ -302,6 +303,7 @@ impl PhysicalExtensionCodec for AdapterPreservingCodec {
&self,
_node: Arc<dyn ExecutionPlan>,
_buf: &mut Vec<u8>,
_proto_converter: &dyn PhysicalProtoConverterExtension,
) -> Result<()> {
// We don't need this for the example - we use serialize_physical_plan instead
not_impl_err!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use datafusion::physical_plan::{DisplayAs, ExecutionPlan};
use datafusion::prelude::SessionContext;
use datafusion_proto::physical_plan::{
AsExecutionPlan, ComposedPhysicalExtensionCodec, PhysicalExtensionCodec,
PhysicalProtoConverterExtension,
};
use datafusion_proto::protobuf;

Expand Down Expand Up @@ -150,6 +151,7 @@ impl PhysicalExtensionCodec for ParentPhysicalExtensionCodec {
buf: &[u8],
inputs: &[Arc<dyn ExecutionPlan>],
_ctx: &TaskContext,
_proto_converter: &dyn PhysicalProtoConverterExtension,
) -> Result<Arc<dyn ExecutionPlan>> {
if buf == "ParentExec".as_bytes() {
Ok(Arc::new(ParentExec {
Expand All @@ -160,7 +162,7 @@ impl PhysicalExtensionCodec for ParentPhysicalExtensionCodec {
}
}

fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()> {
fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>, _proto_converter: &dyn PhysicalProtoConverterExtension) -> Result<()> {
if node.as_any().downcast_ref::<ParentExec>().is_some() {
buf.extend_from_slice("ParentExec".as_bytes());
Ok(())
Expand Down Expand Up @@ -235,6 +237,7 @@ impl PhysicalExtensionCodec for ChildPhysicalExtensionCodec {
buf: &[u8],
_inputs: &[Arc<dyn ExecutionPlan>],
_ctx: &TaskContext,
_proto_converter: &dyn PhysicalProtoConverterExtension,
) -> Result<Arc<dyn ExecutionPlan>> {
if buf == "ChildExec".as_bytes() {
Ok(Arc::new(ChildExec {}))
Expand All @@ -243,7 +246,7 @@ impl PhysicalExtensionCodec for ChildPhysicalExtensionCodec {
}
}

fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()> {
fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>, _proto_converter: &dyn PhysicalProtoConverterExtension) -> Result<()> {
if node.as_any().downcast_ref::<ChildExec>().is_some() {
buf.extend_from_slice("ChildExec".as_bytes());
Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ impl PhysicalExtensionCodec for CachingCodec {
_buf: &[u8],
_inputs: &[Arc<dyn ExecutionPlan>],
_ctx: &TaskContext,
_proto_converter: &dyn PhysicalProtoConverterExtension,
) -> Result<Arc<dyn ExecutionPlan>> {
datafusion::common::not_impl_err!("No custom extension nodes")
}
Expand All @@ -196,6 +197,7 @@ impl PhysicalExtensionCodec for CachingCodec {
&self,
_node: Arc<dyn ExecutionPlan>,
_buf: &mut Vec<u8>,
_proto_converter: &dyn PhysicalProtoConverterExtension,
) -> Result<()> {
datafusion::common::not_impl_err!("No custom extension nodes")
}
Expand Down
17 changes: 10 additions & 7 deletions datafusion/ffi/src/proto/physical_extension_codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use datafusion_expr::{
AggregateUDF, AggregateUDFImpl, ScalarUDF, ScalarUDFImpl, WindowUDF, WindowUDFImpl,
};
use datafusion_physical_plan::ExecutionPlan;
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
use datafusion_proto::physical_plan::{DefaultPhysicalProtoConverter, PhysicalExtensionCodec, PhysicalProtoConverterExtension};
use tokio::runtime::Handle;

use crate::execution::FFI_TaskContextProvider;
Expand Down Expand Up @@ -142,7 +142,7 @@ unsafe extern "C" fn try_decode_fn_wrapper(
let inputs = rresult_return!(inputs);

let plan =
rresult_return!(codec.try_decode(buf.as_ref(), &inputs, task_ctx.as_ref()));
rresult_return!(codec.try_decode(buf.as_ref(), &inputs, task_ctx.as_ref(), &DefaultPhysicalProtoConverter));

RResult::ROk(FFI_ExecutionPlan::new(plan, None))
}
Expand All @@ -156,7 +156,7 @@ unsafe extern "C" fn try_encode_fn_wrapper(
let plan: Arc<dyn ExecutionPlan> = rresult_return!((&node).try_into());

let mut bytes = Vec::new();
rresult_return!(codec.try_encode(plan, &mut bytes));
rresult_return!(codec.try_encode(plan, &mut bytes, &DefaultPhysicalProtoConverter));

RResult::ROk(bytes.into())
}
Expand Down Expand Up @@ -327,6 +327,7 @@ impl PhysicalExtensionCodec for ForeignPhysicalExtensionCodec {
buf: &[u8],
inputs: &[Arc<dyn ExecutionPlan>],
_ctx: &TaskContext,
_proto_converter: &dyn PhysicalProtoConverterExtension,
) -> Result<Arc<dyn ExecutionPlan>> {
let inputs = inputs
.iter()
Expand All @@ -340,7 +341,7 @@ impl PhysicalExtensionCodec for ForeignPhysicalExtensionCodec {
Ok(plan)
}

fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()> {
fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>, _proto_converter: &dyn PhysicalProtoConverterExtension) -> Result<()> {
let plan = FFI_ExecutionPlan::new(node, None);
let bytes = df_result!(unsafe { (self.0.try_encode)(&self.0, plan) })?;

Expand Down Expand Up @@ -418,7 +419,7 @@ pub(crate) mod tests {
use datafusion_functions_aggregate::sum::Sum;
use datafusion_functions_window::rank::{Rank, RankType};
use datafusion_physical_plan::ExecutionPlan;
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
use datafusion_proto::physical_plan::{DefaultPhysicalProtoConverter, PhysicalExtensionCodec, PhysicalProtoConverterExtension};

use crate::execution_plan::tests::EmptyExec;
use crate::proto::physical_extension_codec::FFI_PhysicalExtensionCodec;
Expand All @@ -441,6 +442,7 @@ pub(crate) mod tests {
buf: &[u8],
_inputs: &[Arc<dyn ExecutionPlan>],
_ctx: &TaskContext,
_proto_converter: &dyn PhysicalProtoConverterExtension,
) -> Result<Arc<dyn ExecutionPlan>> {
if buf[0] != Self::MAGIC_NUMBER {
return exec_err!(
Expand All @@ -459,6 +461,7 @@ pub(crate) mod tests {
&self,
node: Arc<dyn ExecutionPlan>,
buf: &mut Vec<u8>,
_proto_converter: &dyn PhysicalProtoConverterExtension,
) -> Result<()> {
buf.push(Self::MAGIC_NUMBER);

Expand Down Expand Up @@ -579,10 +582,10 @@ pub(crate) mod tests {
let exec = create_test_exec();
let input_execs = [create_test_exec()];
let mut bytes = Vec::new();
foreign_codec.try_encode(Arc::clone(&exec), &mut bytes)?;
foreign_codec.try_encode(Arc::clone(&exec), &mut bytes, &DefaultPhysicalProtoConverter)?;

let returned_exec =
foreign_codec.try_decode(&bytes, &input_execs, ctx.task_ctx().as_ref())?;
foreign_codec.try_decode(&bytes, &input_execs, ctx.task_ctx().as_ref(), &DefaultPhysicalProtoConverter)?;

assert!(returned_exec.as_any().is::<EmptyExec>());

Expand Down
21 changes: 15 additions & 6 deletions datafusion/proto/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ impl protobuf::PhysicalPlanNode {
}

let mut buf: Vec<u8> = vec![];
match codec.try_encode(Arc::clone(&plan_clone), &mut buf) {
match codec.try_encode(Arc::clone(&plan_clone), &mut buf, proto_converter) {
Ok(_) => {
let inputs: Vec<protobuf::PhysicalPlanNode> = plan_clone
.children()
Expand Down Expand Up @@ -1740,7 +1740,7 @@ impl protobuf::PhysicalPlanNode {
.map(|i| proto_converter.proto_to_execution_plan(ctx, codec, i))
.collect::<Result<_>>()?;

let extension_node = codec.try_decode(extension.node.as_slice(), &inputs, ctx)?;
let extension_node = codec.try_decode(extension.node.as_slice(), &inputs, ctx, proto_converter)?;

Ok(extension_node)
}
Expand Down Expand Up @@ -3660,9 +3660,15 @@ pub trait PhysicalExtensionCodec: Debug + Send + Sync {
buf: &[u8],
inputs: &[Arc<dyn ExecutionPlan>],
ctx: &TaskContext,
proto_converter: &dyn PhysicalProtoConverterExtension,
) -> Result<Arc<dyn ExecutionPlan>>;

fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()>;
fn try_encode(
&self,
node: Arc<dyn ExecutionPlan>,
buf: &mut Vec<u8>,
proto_converter: &dyn PhysicalProtoConverterExtension,
) -> Result<()>;

fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result<Arc<ScalarUDF>> {
not_impl_err!("PhysicalExtensionCodec is not provided for scalar function {name}")
Expand Down Expand Up @@ -3716,6 +3722,7 @@ impl PhysicalExtensionCodec for DefaultPhysicalExtensionCodec {
_buf: &[u8],
_inputs: &[Arc<dyn ExecutionPlan>],
_ctx: &TaskContext,
_proto_converter: &dyn PhysicalProtoConverterExtension,
) -> Result<Arc<dyn ExecutionPlan>> {
not_impl_err!("PhysicalExtensionCodec is not provided")
}
Expand All @@ -3724,6 +3731,7 @@ impl PhysicalExtensionCodec for DefaultPhysicalExtensionCodec {
&self,
_node: Arc<dyn ExecutionPlan>,
_buf: &mut Vec<u8>,
_proto_converter: &dyn PhysicalProtoConverterExtension,
) -> Result<()> {
not_impl_err!("PhysicalExtensionCodec is not provided")
}
Expand Down Expand Up @@ -4108,12 +4116,13 @@ impl PhysicalExtensionCodec for ComposedPhysicalExtensionCodec {
buf: &[u8],
inputs: &[Arc<dyn ExecutionPlan>],
ctx: &TaskContext,
proto_converter: &dyn PhysicalProtoConverterExtension,
) -> Result<Arc<dyn ExecutionPlan>> {
self.decode_protobuf(buf, |codec, data| codec.try_decode(data, inputs, ctx))
self.decode_protobuf(buf, |codec, data| codec.try_decode(data, inputs, ctx, proto_converter))
}

fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()> {
self.encode_protobuf(buf, |codec, data| codec.try_encode(Arc::clone(&node), data))
fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>, proto_converter: &dyn PhysicalProtoConverterExtension) -> Result<()> {
self.encode_protobuf(buf, |codec, data| codec.try_encode(Arc::clone(&node), data, proto_converter))
}

fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {
Expand Down
Loading