diff --git a/crates/core/common/src/context/exec.rs b/crates/core/common/src/context/exec.rs index 744a37020..65489b83f 100644 --- a/crates/core/common/src/context/exec.rs +++ b/crates/core/common/src/context/exec.rs @@ -11,6 +11,8 @@ use datafusion::{ self, arrow::array::RecordBatch, catalog::{AsyncCatalogProvider as TableAsyncCatalogProvider, MemorySchemaProvider}, + common::tree_node::Transformed, + datasource::{DefaultTableSource, TableType}, error::DataFusionError, execution::{ RecordBatchStream, SendableRecordBatchStream, TaskContext, @@ -20,7 +22,7 @@ use datafusion::{ memory_pool::{MemoryPool, human_readable_size}, object_store::ObjectStoreRegistry, }, - logical_expr::LogicalPlan, + logical_expr::{LogicalPlan, TableScan}, physical_optimizer::PhysicalOptimizerRule, physical_plan::{ExecutionPlan, displayable, execute_stream, stream::RecordBatchStreamAdapter}, sql::parser, @@ -72,6 +74,15 @@ pub struct ExecContext { } impl ExecContext { + /// Attaches a detached logical plan to this query context by replacing + /// `PlanTable` table sources in `TableScan` nodes with actual + /// `QueryableSnapshot` providers from the catalog. + #[tracing::instrument(skip_all, err)] + pub(crate) fn attach(&self, plan: LogicalPlan) -> Result { + plan.transform_with_subqueries(|node| attach_table_node(node, self)) + .map(|t| t.data) + } + /// Returns the physical catalog snapshot backing this query context. /// /// Exposes segment-level data for streaming query consumers that need to @@ -710,6 +721,30 @@ pub enum RegisterTableError { RegisterUdf(#[source] DataFusionError), } +/// Replaces `PlanTable` table sources with actual `QueryableSnapshot` providers +/// in a single plan node. +fn attach_table_node( + mut node: LogicalPlan, + ctx: &ExecContext, +) -> Result, DataFusionError> { + match &mut node { + LogicalPlan::TableScan(TableScan { + table_name, source, .. + }) if source.table_type() == TableType::Base && source.get_logical_plan().is_none() => { + let table_ref: TableReference = table_name + .clone() + .try_into() + .map_err(|err| DataFusionError::External(Box::new(err)))?; + let provider = ctx + .get_table(&table_ref) + .map_err(|err| DataFusionError::External(Box::new(err)))?; + *source = Arc::new(DefaultTableSource::new(provider)); + Ok(Transformed::yes(node)) + } + _ => Ok(Transformed::no(node)), + } +} + /// `logical_optimize` controls whether logical optimizations should be applied to `plan`. #[tracing::instrument(skip_all, err)] async fn execute_plan( diff --git a/crates/core/common/src/detached_logical_plan.rs b/crates/core/common/src/detached_logical_plan.rs index 3f29c3c87..117732608 100644 --- a/crates/core/common/src/detached_logical_plan.rs +++ b/crates/core/common/src/detached_logical_plan.rs @@ -1,20 +1,9 @@ -use std::sync::Arc; - -use datafusion::{ - common::{ - DFSchemaRef, - tree_node::{Transformed, TreeNode, TreeNodeRecursion}, - }, - datasource::{DefaultTableSource, TableType}, - error::DataFusionError, - logical_expr::{LogicalPlan, TableScan}, -}; +use datafusion::{common::DFSchemaRef, error::DataFusionError, logical_expr::LogicalPlan}; use crate::{ context::exec::ExecContext, incrementalizer::NonIncrementalQueryError, plan_visitors::{is_incremental, propagate_block_num}, - sql::TableReference, }; /// A plan that has `PlanTable` for its `TableProvider`s. It cannot be executed before being @@ -28,35 +17,6 @@ impl DetachedLogicalPlan { Self(plan) } - /// Attaches this plan to a query context by replacing `PlanTable` providers - /// with actual `QueryableSnapshot` providers from the catalog. - #[tracing::instrument(skip_all, err)] - pub fn attach_to(self, ctx: &ExecContext) -> Result { - Ok(self - .0 - .transform_with_subqueries(|mut node| match &mut node { - // Insert the clauses in non-view table scans - LogicalPlan::TableScan(TableScan { - table_name, source, .. - }) if source.table_type() == TableType::Base - && source.get_logical_plan().is_none() => - { - let table_ref: TableReference = table_name - .clone() - .try_into() - .map_err(|e| DataFusionError::External(Box::new(e)))?; - let provider = ctx - .get_table(&table_ref) - .map_err(|e| DataFusionError::External(e.into()))?; - *source = Arc::new(DefaultTableSource::new(provider)); - Ok(Transformed::yes(node)) - } - _ => Ok(Transformed::no(node)), - }) - .map_err(AttachPlanError)? - .data) - } - /// Validates that the plan can be processed incrementally. pub fn is_incremental(&self) -> Result<(), NonIncrementalQueryError> { is_incremental(&self.0) @@ -72,12 +32,12 @@ impl DetachedLogicalPlan { Ok(Self(propagate_block_num(self.0)?)) } - /// Applies a visitor closure to each node in the logical plan tree. - pub fn apply(&self, f: F) -> Result - where - F: FnMut(&LogicalPlan) -> Result, - { - self.0.apply(f) + /// Attaches this plan to a query context, replacing `PlanTable` table + /// sources and `PlanJsUdf`-backed scalar functions with execution-ready + /// providers. + #[tracing::instrument(skip_all, err)] + pub fn attach_to(self, ctx: &ExecContext) -> Result { + ctx.attach(self.0).map_err(AttachPlanError) } }