diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 9b6e6aa5dac37..13af8c7476e31 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -1206,6 +1206,11 @@ config_namespace! { /// "summary" shows common metrics for high-level insights. /// "dev" provides deep operator-level introspection for developers. pub analyze_level: ExplainAnalyzeLevel, default = ExplainAnalyzeLevel::Dev + + /// Whether to enable the `auto_explain` mode. In this mode, the callbacks defined by the + /// [`PlanObserver`] are called. By default, the [`SessionContext`] uses the + /// [`DefaultPlanObserver`] implementation. + pub auto_explain: bool, default = false } } diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index ad254d61b5c0b..a5e6286774e2f 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -24,6 +24,7 @@ use std::time::Duration; use super::options::ReadOptions; use crate::datasource::dynamic_file::DynamicListTableFactory; +use crate::execution::plan_observer::PlanObserver; use crate::execution::session_state::SessionStateBuilder; use crate::{ catalog::listing_schema::ListingSchemaProvider, @@ -1971,6 +1972,14 @@ impl SessionContext { .write() .register_table_options_extension(extension) } + + /// Sets a new plan observer, used for the auto_explain feature. Note that the auto explain mode + /// needs to be enabled for the methods in [`PlanObserver`] to be called, through the + /// `datafusion.explain.auto_explain` config. 
+ pub fn with_plan_observer(self, plan_observer: Arc<dyn PlanObserver>) -> Self { + self.state.write().set_plan_observer(plan_observer); + self + } } impl FunctionRegistry for SessionContext { @@ -2251,7 +2260,11 @@ mod tests { use arrow_schema::FieldRef; use datafusion_common::DataFusionError; use datafusion_common::datatype::DataTypeExt; + use datafusion_expr::{col, lit}; + use regex::Regex; use std::error::Error; + use std::fs; + use std::io::Write; use std::path::PathBuf; use datafusion_common::test_util::batches_to_string; @@ -2986,4 +2999,169 @@ mod tests { assert!(have.unwrap_err().to_string().contains(MEMORY_LIMIT)); } } + + #[derive(Debug)] + struct TestPlanObserver { + output: String, + } + + impl TestPlanObserver { + pub fn new(output: String) -> Self { + Self { output } + } + } + + impl PlanObserver for TestPlanObserver { + fn plan_created(&self, _id: String, sql: Option<String>) -> Result<()> { + let Some(sql) = sql else { + return Ok(()); + }; + let fd = &mut fs::OpenOptions::new() + .create(true) + .append(true) + .open(&self.output)?; + writeln!(fd, "QUERY: {sql}")?; + Ok(()) + } + + fn plan_executed( + &self, + _id: String, + explain_result: RecordBatch, + _duration_nanos: u128, + ) -> Result<()> { + let analyze = arrow::util::pretty::pretty_format_batches(&[explain_result])? 
+ .to_string(); + // the plan is simplified to become deterministic + let re = Regex::new(r"\s+\w+?Exec").unwrap(); + let plan = re + .find_iter(&analyze) + .map(|m| m.as_str()) + .collect::<Vec<_>>() + .join("\n"); + + let fd = &mut fs::OpenOptions::new() + .create(true) + .append(true) + .open(&self.output)?; + writeln!(fd, "EXPLAIN:\n{plan}\n")?; + + Ok(()) + } + } + + #[tokio::test] + async fn test_auto_explain() -> Result<()> { + let tmp_dir = TempDir::new()?; + let output_path = tmp_dir.path().join("auto_explain_output.txt"); + let plan_observer = + TestPlanObserver::new(output_path.as_os_str().to_str().unwrap().to_owned()); + + let ctx = SessionContext::new().with_plan_observer(Arc::new(plan_observer)); + ctx.sql("create table t (k int, v int)") + .await? + .collect() + .await?; + ctx.sql("insert into t values (1, 10), (2, 20), (3, 30)") + .await? + .collect() + .await?; + + // without enabling the auto explain mode, the plan is not exported + ctx.sql("select * from t where k = 1 or k = 2 order by v desc limit 5") + .await? + .collect() + .await?; + assert!(!output_path.exists()); + + // enabling the auto explain mode + ctx.sql("set datafusion.explain.auto_explain = true") + .await? + .collect() + .await?; + let result = ctx + .sql("select * from t where k = 1 or k = 2 order by v desc limit 5") + .await? + .collect() + .await?; + // query output is not affected + assert_snapshot!( + batches_to_string(&result), + @r" + +---+----+ + | k | v | + +---+----+ + | 2 | 20 | + | 1 | 10 | + +---+----+ + " + ); + // the first output comes from the "set" query + assert_snapshot!(fs::read_to_string(&output_path)?, + @r" + QUERY: SELECT * + EXPLAIN: + EmptyExec + + QUERY: SELECT t.k, t.v FROM t WHERE ((t.k = 1) OR (t.k = 2)) ORDER BY t.v DESC NULLS FIRST LIMIT 5 + EXPLAIN: + SortExec + FilterExec + DataSourceExec + "); + + // also works with the dataframe API + ctx.table("t") + .await? + .filter(col("v").lt(lit(11)))? + .select(vec![col("k")])? 
+ .collect() + .await?; + assert_snapshot!(fs::read_to_string(&output_path)?, + @r" + QUERY: SELECT * + EXPLAIN: + EmptyExec + + QUERY: SELECT t.k, t.v FROM t WHERE ((t.k = 1) OR (t.k = 2)) ORDER BY t.v DESC NULLS FIRST LIMIT 5 + EXPLAIN: + SortExec + FilterExec + DataSourceExec + + QUERY: SELECT t.k FROM t WHERE (t.v < 11) + EXPLAIN: + FilterExec + DataSourceExec + "); + + // when disabling the auto explain mode the plan is again not exported + ctx.sql("set datafusion.explain.auto_explain = false") + .await? + .collect() + .await?; + ctx.sql("select * from t where k = 1 or k = 2 order by v desc limit 5") + .await? + .collect() + .await?; + assert_snapshot!(fs::read_to_string(&output_path)?, + @r" + QUERY: SELECT * + EXPLAIN: + EmptyExec + + QUERY: SELECT t.k, t.v FROM t WHERE ((t.k = 1) OR (t.k = 2)) ORDER BY t.v DESC NULLS FIRST LIMIT 5 + EXPLAIN: + SortExec + FilterExec + DataSourceExec + + QUERY: SELECT t.k FROM t WHERE (t.v < 11) + EXPLAIN: + FilterExec + DataSourceExec + "); + + Ok(()) + } } diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index 9560616c1b6da..3992df18c3b5f 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -29,6 +29,7 @@ use crate::datasource::file_format::FileFormatFactory; use crate::datasource::provider_as_source; use crate::execution::SessionStateDefaults; use crate::execution::context::{EmptySerializerRegistry, FunctionFactory, QueryPlanner}; +use crate::execution::plan_observer::{DefaultPlanObserver, PlanObserver}; use crate::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner}; use arrow_schema::{DataType, FieldRef}; use datafusion_catalog::MemoryCatalogProviderList; @@ -189,6 +190,8 @@ pub struct SessionState { /// Cache logical plans of prepared statements for later execution. /// Key is the prepared statement name. 
prepared_plans: HashMap<String, Arc<PreparedPlan>>, + /// Plan observer used for the auto explain mode (if enabled). + plan_observer: Option<Arc<dyn PlanObserver>>, } impl Debug for SessionState { @@ -963,6 +966,16 @@ impl SessionState { None => exec_err!("Prepared statement '{}' does not exist", name), } } + + /// Set a new plan observer. + pub(crate) fn set_plan_observer(&mut self, plan_observer: Arc<dyn PlanObserver>) { + self.plan_observer = Some(plan_observer); + } + + /// Returns a ref to the current plan observer. + pub(crate) fn plan_observer(&self) -> &Option<Arc<dyn PlanObserver>> { + &self.plan_observer + } } /// A builder to be used for building [`SessionState`]'s. Defaults will @@ -1503,6 +1516,7 @@ impl SessionStateBuilder { function_factory, cache_factory, prepared_plans: HashMap::new(), + plan_observer: Some(Arc::new(DefaultPlanObserver::default())), }; if let Some(file_formats) = file_formats { diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index b4fb44f670e8d..fee4b28835491 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -27,6 +27,7 @@ use crate::datasource::physical_plan::{FileOutputMode, FileSinkConfig}; use crate::datasource::{DefaultTableSource, source_as_provider}; use crate::error::{DataFusionError, Result}; use crate::execution::context::{ExecutionProps, SessionState}; +use crate::execution::plan_observer::PlanObserver; use crate::logical_expr::utils::generate_sort_key; use crate::logical_expr::{ Aggregate, EmptyRelation, Join, Projection, Sort, TableScan, Unnest, Values, Window, @@ -106,6 +107,7 @@ use datafusion_physical_plan::unnest::ListUnnest; use async_trait::async_trait; use datafusion_physical_plan::async_func::{AsyncFuncExec, AsyncMapper}; +use datafusion_sql::unparser::Unparser; use futures::{StreamExt, TryStreamExt}; use itertools::{Itertools, multiunzip}; use log::debug; @@ -274,7 +276,24 @@ impl PhysicalPlanner for DefaultPhysicalPlanner { .create_initial_plan(logical_plan, session_state) .await?; - 
self.optimize_physical_plan(plan, session_state, |_, _| {}) + let mut plan = self.optimize_physical_plan(plan, session_state, |_, _| {})?; + + // setup the auto explain mode if necessary + if session_state.config().options().explain.auto_explain + && let Some(plan_observer) = &session_state.plan_observer() + { + let id = uuid::Uuid::new_v4().to_string(); + let sql = if let Ok(stmt) = Unparser::default().plan_to_sql(logical_plan) { + Some(stmt.to_string()) + } else { + None + }; + plan_observer.plan_created(id.clone(), sql)?; + plan = + self.add_auto_explain(plan, session_state, Arc::clone(plan_observer), id); + } + + Ok(plan) } /// Create a physical expression from a logical expression @@ -1833,6 +1852,33 @@ impl DefaultPhysicalPlanner { )) } } + + /// Returns a new plan wrapped in an `AnalyzeExec` auto_explain operator. + fn add_auto_explain( + &self, + plan: Arc<dyn ExecutionPlan>, + session_state: &SessionState, + plan_observer: Arc<dyn PlanObserver>, + id: String, + ) -> Arc<dyn ExecutionPlan> { + let options = session_state.config().options(); + let show_statistics = options.explain.show_statistics; + let analyze_level = options.explain.analyze_level; + let metric_types = match analyze_level { + ExplainAnalyzeLevel::Summary => vec![MetricType::SUMMARY], + ExplainAnalyzeLevel::Dev => vec![MetricType::SUMMARY, MetricType::DEV], + }; + + let mut plan = AnalyzeExec::new( + false, + show_statistics, + metric_types, + plan, + LogicalPlan::explain_schema(), + ); + plan.enable_auto_explain(plan_observer, id); + Arc::new(plan) + } } /// Expand and align a GROUPING SET expression. 
diff --git a/datafusion/execution/src/lib.rs b/datafusion/execution/src/lib.rs index 1a8da9459ae10..2ed90221770f7 100644 --- a/datafusion/execution/src/lib.rs +++ b/datafusion/execution/src/lib.rs @@ -34,6 +34,7 @@ pub mod memory_pool; pub mod object_store; #[cfg(feature = "parquet_encryption")] pub mod parquet_encryption; +pub mod plan_observer; pub mod runtime_env; mod stream; mod task; diff --git a/datafusion/execution/src/plan_observer.rs b/datafusion/execution/src/plan_observer.rs new file mode 100644 index 0000000000000..a7703b38220fb --- /dev/null +++ b/datafusion/execution/src/plan_observer.rs @@ -0,0 +1,127 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::io::Write; +use std::{fmt::Debug, fs}; + +use arrow::{array::RecordBatch, util::pretty::pretty_format_batches}; +use dashmap::DashMap; +use datafusion_common::error::Result; + +/// Used to implement the auto_explain feature. +pub trait PlanObserver: Send + Sync + 'static + Debug { + /// Called after the physical plan has been created but before it has been executed. + /// Receives an identifier and a SQL representation of the query. 
+ /// The unparsing of some logical operators might not be implemented yet, hence why `sql` is an + /// `Option`. + fn plan_created(&self, id: String, sql: Option<String>) -> Result<()>; + + /// Called after the physical plan has been executed. + /// Receives the identifier, the EXPLAIN ANALYZE output, and the duration. + fn plan_executed( + &self, + id: String, + explain_result: RecordBatch, + duration_nanos: u128, + ) -> Result<()>; +} + +#[derive(Debug)] +pub struct DefaultPlanObserver { + output: String, + min_duration_ms: usize, + queries: DashMap<String, String>, +} + +impl DefaultPlanObserver { + /// Creates a new `DefaultPlanObserver`. + /// * `output`: where to store the output of `auto_explain`, if enabled. + /// Possible values: + /// - `log::error` + /// - `log::warn` + /// - `log::info` + /// - `log::debug` + /// - `log::trace` + /// - a file path: creates the file if it does not exist, or appends to it if it does. + /// * `min_duration_ms`: only outputs the result if the execution duration is greater than or + /// equal to this value. 
+ pub fn new(output: String, min_duration_ms: usize) -> Self { + Self { + output, + min_duration_ms, + queries: DashMap::new(), + } + } +} + +impl Default for DefaultPlanObserver { + fn default() -> Self { + Self { + output: "log::info".to_owned(), + min_duration_ms: 0, + queries: DashMap::new(), + } + } +} + +impl PlanObserver for DefaultPlanObserver { + fn plan_created(&self, id: String, sql: Option<String>) -> Result<()> { + if let Some(sql) = sql { + self.queries.insert(id, sql); + } + Ok(()) + } + + fn plan_executed( + &self, + id: String, + explain_result: RecordBatch, + duration_nanos: u128, + ) -> Result<()> { + let sql = if let Some((_, sql)) = self.queries.remove(&id) { + sql + } else { + "-".to_string() + }; + + let duration_ms = (duration_nanos as f64) / 1e6; + if duration_ms < self.min_duration_ms as f64 { + return Ok(()); + } + + let analyze = pretty_format_batches(&[explain_result])?; + let message = + format!("QUERY: {sql}\nDURATION: {duration_ms:.3}ms\nEXPLAIN:\n{analyze}"); + + match self.output.as_str() { + "log::error" => log::error!("{message}"), + "log::warn" => log::warn!("{message}"), + "log::info" => log::info!("{message}"), + "log::debug" => log::debug!("{message}"), + "log::trace" => log::trace!("{message}"), + _ => { + let fd = &mut fs::OpenOptions::new() + .create(true) + .append(true) + .open(&self.output)?; + writeln!(fd, "{message}")?; + } + } + + Ok(()) + } +} diff --git a/datafusion/physical-plan/src/analyze.rs b/datafusion/physical-plan/src/analyze.rs index 4aa78055daee3..56e78e63d238f 100644 --- a/datafusion/physical-plan/src/analyze.rs +++ b/datafusion/physical-plan/src/analyze.rs @@ -29,11 +29,13 @@ use crate::display::DisplayableExecutionPlan; use crate::metrics::MetricType; use crate::{DisplayFormatType, ExecutionPlan, Partitioning}; +use arrow::compute::concat_batches; use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch}; use datafusion_common::instant::Instant; use 
datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{DataFusionError, Result, assert_eq_or_internal_err}; use datafusion_execution::TaskContext; +use datafusion_execution::plan_observer::PlanObserver; use datafusion_physical_expr::EquivalenceProperties; use datafusion_physical_expr::PhysicalExpr; @@ -54,6 +56,10 @@ pub struct AnalyzeExec { /// The output schema for RecordBatches of this exec node schema: SchemaRef, cache: Arc<PlanProperties>, + /// Observer to call when the plan is executed (only set for the auto explain mode). + plan_observer: Option<Arc<dyn PlanObserver>>, + /// Identifier to pass when calling the plan observer. + plan_id: Option<String>, } impl AnalyzeExec { @@ -73,6 +79,8 @@ impl AnalyzeExec { input, schema, cache: Arc::new(cache), + plan_observer: None, + plan_id: None, } } @@ -103,6 +111,17 @@ impl AnalyzeExec { input.boundedness(), ) } + + /// Enables the auto explain mode. In this mode, the result of this Analyze operator is passed + /// to the [`PlanObserver`] received, while the child's output batches are returned instead. 
+ pub fn enable_auto_explain( + &mut self, + plan_observer: Arc<dyn PlanObserver>, + plan_id: String, + ) { + self.plan_observer = Some(plan_observer); + self.plan_id = Some(plan_id); + } } impl DisplayAs for AnalyzeExec { @@ -156,13 +175,20 @@ impl ExecutionPlan for AnalyzeExec { self: Arc<Self>, mut children: Vec<Arc<dyn ExecutionPlan>>, ) -> Result<Arc<dyn ExecutionPlan>> { - Ok(Arc::new(Self::new( + let mut plan = Self::new( self.verbose, self.show_statistics, self.metric_types.clone(), children.pop().unwrap(), Arc::clone(&self.schema), - ))) + ); + + if self.plan_observer.is_some() { + plan.plan_observer = self.plan_observer.clone(); + plan.plan_id = self.plan_id.clone(); + } + + Ok(Arc::new(plan)) } fn execute( @@ -203,14 +229,24 @@ impl ExecutionPlan for AnalyzeExec { // JoinSet that computes the overall row count and final // record batch let mut input_stream = builder.build(); + + let inner_schema = Arc::clone(&self.input.schema()); + let plan_observer = self.plan_observer.clone(); + let plan_id = self.plan_id.clone(); + let output = async move { + let mut batches = vec![]; let mut total_rows = 0; while let Some(batch) = input_stream.next().await.transpose()? 
{ total_rows += batch.num_rows(); + // in the auto_explain mode, store the input batches to later return them + if plan_observer.is_some() { + batches.push(batch); + } } let duration = Instant::now() - start; - create_output_batch( + let out = create_output_batch( verbose, show_statistics, total_rows, @@ -218,11 +254,28 @@ impl ExecutionPlan for AnalyzeExec { &captured_input, &captured_schema, &metric_types, - ) + )?; + + if let Some(plan_observer) = plan_observer { + plan_observer.plan_executed( + plan_id.unwrap_or_else(|| "".to_owned()), + out, + duration.as_nanos(), + )?; + concat_batches(&inner_schema, &batches).map_err(DataFusionError::from) + } else { + Ok(out) + } + }; + + let output_schema = if self.plan_observer.is_some() { + &self.input.schema() + } else { + &self.schema }; Ok(Box::pin(RecordBatchStreamAdapter::new( - Arc::clone(&self.schema), + Arc::clone(output_schema), futures::stream::once(output), ))) } diff --git a/datafusion/physical-plan/src/test.rs b/datafusion/physical-plan/src/test.rs index 0630b8f174563..5085674cd4fd6 100644 --- a/datafusion/physical-plan/src/test.rs +++ b/datafusion/physical-plan/src/test.rs @@ -29,7 +29,6 @@ use crate::ExecutionPlan; use crate::common; use crate::execution_plan::{Boundedness, EmissionType}; use crate::memory::MemoryStream; -use crate::metrics::MetricsSet; use crate::stream::RecordBatchStreamAdapter; use crate::streaming::PartitionStream; use crate::{DisplayAs, DisplayFormatType, PlanProperties}; @@ -50,6 +49,7 @@ use datafusion_physical_expr::{ EquivalenceProperties, LexOrdering, Partitioning, PhysicalExpr, }; +use datafusion_physical_expr_common::metrics::MetricsSet; use futures::{Future, FutureExt}; pub mod exec; diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index aeeb3481c76b9..738b2387d43d4 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ 
b/datafusion/sqllogictest/test_files/information_schema.slt @@ -276,6 +276,7 @@ datafusion.execution.target_partitions 7 datafusion.execution.time_zone NULL datafusion.execution.use_row_number_estimates_to_optimize_partitioning false datafusion.explain.analyze_level dev +datafusion.explain.auto_explain false datafusion.explain.format indent datafusion.explain.logical_plan_only false datafusion.explain.physical_plan_only false @@ -416,6 +417,7 @@ datafusion.execution.target_partitions 7 Number of partitions for query executio datafusion.execution.time_zone NULL The default time zone Some functions, e.g. `now` return timestamps in this time zone datafusion.execution.use_row_number_estimates_to_optimize_partitioning false Should DataFusion use row number estimates at the input to decide whether increasing parallelism is beneficial or not. By default, only exact row numbers (not estimates) are used for this decision. Setting this flag to `true` will likely produce better plans. if the source of statistics is accurate. We plan to make this the default in the future. datafusion.explain.analyze_level dev Verbosity level for "EXPLAIN ANALYZE". Default is "dev" "summary" shows common metrics for high-level insights. "dev" provides deep operator-level introspection for developers. +datafusion.explain.auto_explain false Whether to enable the `auto_explain` mode. In this mode, the callbacks defined by the [`PlanObserver`] are called. By default, the [`SessionContext`] uses the [`DefaultPlanObserver`] implementation. datafusion.explain.format indent Display format of explain. Default is "indent". When set to "tree", it will print the plan in a tree-rendered format. 
datafusion.explain.logical_plan_only false When set to true, the explain statement will only print logical plans datafusion.explain.physical_plan_only false When set to true, the explain statement will only print physical plans diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 6f6d5b205877f..d97b635cde633 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -176,6 +176,7 @@ The following configuration settings are available: | datafusion.explain.format | indent | Display format of explain. Default is "indent". When set to "tree", it will print the plan in a tree-rendered format. | | datafusion.explain.tree_maximum_render_width | 240 | (format=tree only) Maximum total width of the rendered tree. When set to 0, the tree will have no width limit. | | datafusion.explain.analyze_level | dev | Verbosity level for "EXPLAIN ANALYZE". Default is "dev" "summary" shows common metrics for high-level insights. "dev" provides deep operator-level introspection for developers. | +| datafusion.explain.auto_explain | false | Whether to enable the `auto_explain` mode. In this mode, the callbacks defined by the [`PlanObserver`] are called. By default, the [`SessionContext`] uses the [`DefaultPlanObserver`] implementation. | | datafusion.sql_parser.parse_float_as_decimal | false | When set to true, SQL parser will parse float as decimal type | | datafusion.sql_parser.enable_ident_normalization | true | When set to true, SQL parser will normalize ident (convert ident to lowercase when not quoted) | | datafusion.sql_parser.enable_options_value_normalization | false | When set to true, SQL parser will normalize options value (convert value to lowercase). Note that this option is ignored and will be removed in the future. All case-insensitive values are normalized automatically. |