From fecb8d5400fc0654d1408ad092507e4f37fde162 Mon Sep 17 00:00:00 2001 From: nuno-faria Date: Sun, 14 Dec 2025 11:22:26 +0000 Subject: [PATCH 01/13] feat: Add auto_explain mode --- Cargo.lock | 1 + datafusion/common/src/config.rs | 13 ++ datafusion/core/src/physical_planner.rs | 35 ++- datafusion/physical-plan/Cargo.toml | 1 + datafusion/physical-plan/src/analyze.rs | 219 +++++++++++++++++- datafusion/physical-plan/src/test.rs | 5 - .../test_files/explain_analyze.slt | 20 ++ .../test_files/information_schema.slt | 6 + docs/source/user-guide/configs.md | 3 + 9 files changed, 293 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 08198cc49b72c..2899544f9c143 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2569,6 +2569,7 @@ dependencies = [ "rand 0.9.2", "rstest", "rstest_reuse", + "tempfile", "tokio", ] diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 39cf7a9855de4..bb82ecf760ce6 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -1074,6 +1074,19 @@ config_namespace! { /// "summary" shows common metrics for high-level insights. /// "dev" provides deep operator-level introspection for developers. pub analyze_level: ExplainAnalyzeLevel, default = ExplainAnalyzeLevel::Dev + + /// Whether to enable the `auto_explain` mode. In this mode, the execution plan is + /// automatically written to the location set by `auto_explain_output`, if the plan's total + /// duration is greater ot equal to `auto_explain_min_duration`, in milliseconds. + pub auto_explain: bool, default = false + + /// Output location used in the `auto_explain` mode. Supports `stdout`, `stderr`, or a file + /// path (file is created if it does not exist; plans are appended to the file). + pub auto_explain_output: String, default = "stdout".to_owned() + + /// In the `auto_explain` mode, only output plans if their duration is bigger than this + /// value, in milliseconds. + pub auto_explain_min_duration: usize, default = 0 } } diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index a942b5de41899..7e18de6bc44e6 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -197,7 +197,13 @@ impl PhysicalPlanner for DefaultPhysicalPlanner { .create_initial_plan(logical_plan, session_state) .await?; - self.optimize_physical_plan(plan, session_state, |_, _| {}) + let mut plan = self.optimize_physical_plan(plan, session_state, |_, _| {})?; + + if session_state.config().options().explain.auto_explain { + plan = self.add_auto_explain(plan, session_state); + } + + Ok(plan) } /// Create a physical expression from a logical expression @@ -1597,6 +1603,33 @@ impl DefaultPhysicalPlanner { )) } } + + /// Returns a new plan wrapped in an `AnalyzeExec` auto_explain operator. + fn add_auto_explain( + &self, + plan: Arc, + session_state: &SessionState, + ) -> Arc { + let options = session_state.config().options(); + let show_statistics = options.explain.show_statistics; + let analyze_level = options.explain.analyze_level; + let metric_types = match analyze_level { + ExplainAnalyzeLevel::Summary => vec![MetricType::SUMMARY], + ExplainAnalyzeLevel::Dev => vec![MetricType::SUMMARY, MetricType::DEV], + }; + + let mut plan = AnalyzeExec::new( + false, + show_statistics, + metric_types, + plan, + LogicalPlan::explain_schema(), + ); + plan.enable_auto_explain(); + plan.set_auto_explain_output(options.explain.auto_explain_output.clone()); + plan.set_auto_explain_min_duration(options.explain.auto_explain_min_duration); + Arc::new(plan) + } } /// Expand and align a GROUPING SET expression. diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index bc1ce68532ecc..40d3d5da19979 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -79,6 +79,7 @@ insta = { workspace = true } rand = { workspace = true } rstest = { workspace = true } rstest_reuse = "0.7.0" +tempfile = { workspace = true } tokio = { workspace = true, features = [ "rt-multi-thread", "fs", diff --git a/datafusion/physical-plan/src/analyze.rs b/datafusion/physical-plan/src/analyze.rs index 01f997f23d6a9..08ab4d2197090 100644 --- a/datafusion/physical-plan/src/analyze.rs +++ b/datafusion/physical-plan/src/analyze.rs @@ -18,6 +18,8 @@ //! Defines the ANALYZE operator use std::any::Any; +use std::fs::OpenOptions; +use std::io::{self, Write}; use std::sync::Arc; use super::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter}; @@ -29,6 +31,8 @@ use crate::display::DisplayableExecutionPlan; use crate::metrics::MetricType; use crate::{DisplayFormatType, ExecutionPlan, Partitioning}; +use arrow::compute::concat_batches; +use arrow::util::pretty::pretty_format_batches; use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch}; use datafusion_common::instant::Instant; use datafusion_common::{assert_eq_or_internal_err, DataFusionError, Result}; @@ -52,6 +56,19 @@ pub struct AnalyzeExec { /// The output schema for RecordBatches of this exec node schema: SchemaRef, cache: PlanProperties, + /// Whether the operator is executing in the `auto_explain` mode. + /// In this mode, the underlying records are returned as they would without the `AnalyzeExec`, + /// while the "explain analyze" output is sent to `auto_explain_output`. (default=false) + auto_explain: bool, + /// Where to store the output of `auto_explain`, if enabled. + /// Possible values: + /// - stdout (default) + /// - stderr + /// - *path to file* (creates if it does not exist; appends to it if it does) + auto_explain_output: String, + /// In the `auto_explain` mode, only output if the execution is greater or equal to this value, + /// in milliseconds. (default=0) + auto_explain_min_duration: usize, } impl AnalyzeExec { @@ -71,6 +88,9 @@ impl AnalyzeExec { input, schema, cache, + auto_explain: false, + auto_explain_output: "stdout".to_owned(), + auto_explain_min_duration: 0, } } @@ -101,6 +121,20 @@ impl AnalyzeExec { input.boundedness(), ) } + + pub fn enable_auto_explain(&mut self) { + self.auto_explain = true; + self.cache = + Self::compute_properties(&self.input, Arc::clone(&self.input.schema())); + } + + pub fn set_auto_explain_output(&mut self, value: String) { + self.auto_explain_output = value + } + + pub fn set_auto_explain_min_duration(&mut self, value: usize) { + self.auto_explain_min_duration = value + } } impl DisplayAs for AnalyzeExec { @@ -194,14 +228,22 @@ impl ExecutionPlan for AnalyzeExec { // JoinSet that computes the overall row count and final // record batch let mut input_stream = builder.build(); + + let inner_schema = Arc::clone(&self.input.schema()); + let auto_explain = self.auto_explain; + let auto_explain_output = self.auto_explain_output.clone(); + let auto_explain_min_duration = self.auto_explain_min_duration; + let output = async move { + let mut batches = vec![]; let mut total_rows = 0; while let Some(batch) = input_stream.next().await.transpose()? { total_rows += batch.num_rows(); + batches.push(batch); } let duration = Instant::now() - start; - create_output_batch( + let out = create_output_batch( verbose, show_statistics, total_rows, @@ -209,11 +251,26 @@ impl ExecutionPlan for AnalyzeExec { &captured_input, &captured_schema, &metric_types, - ) + )?; + + if auto_explain { + if duration.as_millis() >= auto_explain_min_duration as u128 { + export_auto_explain(out, &auto_explain_output)?; + } + concat_batches(&inner_schema, &batches).map_err(DataFusionError::from) + } else { + Ok(out) + } + }; + + let output_schema = if self.auto_explain { + &self.input.schema() + } else { + &self.schema }; Ok(Box::pin(RecordBatchStreamAdapter::new( - Arc::clone(&self.schema), + Arc::clone(output_schema), futures::stream::once(output), ))) } @@ -271,19 +328,42 @@ fn create_output_batch( .map_err(DataFusionError::from) } +fn export_auto_explain(batch: RecordBatch, output: &str) -> Result<()> { + let fd: &mut dyn Write = match output { + "stdout" => &mut io::stdout(), + "stderr" => &mut io::stderr(), + _ => &mut OpenOptions::new().create(true).append(true).open(output)?, + }; + + let formatted = pretty_format_batches(&[batch])?; + writeln!(fd, "{formatted}")?; + + Ok(()) +} + #[cfg(test)] mod tests { + use std::fs; + use super::*; use crate::{ collect, test::{ assert_is_pending, exec::{assert_strong_count_converges_to_zero, BlockingExec}, + TestMemoryExec, }, }; - use arrow::datatypes::{DataType, Field, Schema}; + use arrow::{ + array::StringArray, + datatypes::{DataType, Field, Schema}, + }; + use datafusion_common::test_util::batches_to_string; + use datafusion_expr::LogicalPlan; use futures::FutureExt; + use insta::assert_snapshot; + use tempfile::TempDir; #[tokio::test] async fn test_drop_cancel() -> Result<()> { @@ -310,4 +390,135 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_auto_explain() -> Result<()> { + let task_ctx = Arc::new(TaskContext::default()); + let schema = Arc::new(Schema::new(vec![Field::new("k", DataType::Utf8, false)])); + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(StringArray::from(vec![ + Some("k1"), + Some("k2"), + Some("k3"), + ]))], + )?; + let exec_plan: Arc = Arc::new(TestMemoryExec::try_new( + &[vec![batch]], + Arc::clone(&schema), + None, + )?); + + let mut analyze_exec = AnalyzeExec::new( + false, + true, + vec![MetricType::SUMMARY, MetricType::DEV], + exec_plan, + LogicalPlan::explain_schema(), + ); + let tmp_dir = TempDir::new()?; + let output_path = tmp_dir.path().join("auto_explain_output.txt"); + analyze_exec.enable_auto_explain(); + analyze_exec.set_auto_explain_output( + output_path.as_os_str().to_str().unwrap().to_owned(), + ); + + // check that the original output remains the same + let result = + collect(Arc::new(analyze_exec.clone()), Arc::clone(&task_ctx)).await?; + assert_snapshot!( + batches_to_string(&result), + @r" + +----+ + | k | + +----+ + | k1 | + | k2 | + | k3 | + +----+ + " + ); + assert_snapshot!(fs::read_to_string(&output_path)?, + @r" + +-------------------+-----------------------------------------------------------------------------------------------------------------------------------------+ + | plan_type | plan | + +-------------------+-----------------------------------------------------------------------------------------------------------------------------------------+ + | Plan with Metrics | DataSourceExec: partitions=1, partition_sizes=[1], metrics=[], statistics=[Rows=Exact(3), Bytes=Exact(1160), [(Col[0]: Null=Exact(0))]] | + | | | + +-------------------+-----------------------------------------------------------------------------------------------------------------------------------------+ + "); + + // check that with a longer runtime a new plan is not printed + analyze_exec.set_auto_explain_min_duration(1000000); + collect(Arc::new(analyze_exec.clone()), Arc::clone(&task_ctx)).await?; + assert_snapshot!(fs::read_to_string(&output_path)?, + @r" + +-------------------+-----------------------------------------------------------------------------------------------------------------------------------------+ + | plan_type | plan | + +-------------------+-----------------------------------------------------------------------------------------------------------------------------------------+ + | Plan with Metrics | DataSourceExec: partitions=1, partition_sizes=[1], metrics=[], statistics=[Rows=Exact(3), Bytes=Exact(1160), [(Col[0]: Null=Exact(0))]] | + | | | + +-------------------+-----------------------------------------------------------------------------------------------------------------------------------------+ + "); + + // test again with the default min duration + analyze_exec.set_auto_explain_min_duration(0); + collect(Arc::new(analyze_exec.clone()), Arc::clone(&task_ctx)).await?; + assert_snapshot!(fs::read_to_string(&output_path)?, + @r" + +-------------------+-----------------------------------------------------------------------------------------------------------------------------------------+ + | plan_type | plan | + +-------------------+-----------------------------------------------------------------------------------------------------------------------------------------+ + | Plan with Metrics | DataSourceExec: partitions=1, partition_sizes=[1], metrics=[], statistics=[Rows=Exact(3), Bytes=Exact(1160), [(Col[0]: Null=Exact(0))]] | + | | | + +-------------------+-----------------------------------------------------------------------------------------------------------------------------------------+ + +-------------------+-----------------------------------------------------------------------------------------------------------------------------------------+ + | plan_type | plan | + +-------------------+-----------------------------------------------------------------------------------------------------------------------------------------+ + | Plan with Metrics | DataSourceExec: partitions=1, partition_sizes=[1], metrics=[], statistics=[Rows=Exact(3), Bytes=Exact(1160), [(Col[0]: Null=Exact(0))]] | + | | | + +-------------------+-----------------------------------------------------------------------------------------------------------------------------------------+ + "); + + // check that auto_explain uses the pre-defined analyze parameters + analyze_exec.show_statistics = false; + collect(Arc::new(analyze_exec.clone()), Arc::clone(&task_ctx)).await?; + analyze_exec.verbose = true; + collect(Arc::new(analyze_exec.clone()), Arc::clone(&task_ctx)).await?; + let complete_output = fs::read_to_string(&output_path)?; + // remove the duration since it's not deterministic + let output_without_duration = + complete_output.rsplitn(4, '\n').nth(3).unwrap().to_string(); + assert_snapshot!(output_without_duration, + @r" + +-------------------+-----------------------------------------------------------------------------------------------------------------------------------------+ + | plan_type | plan | + +-------------------+-----------------------------------------------------------------------------------------------------------------------------------------+ + | Plan with Metrics | DataSourceExec: partitions=1, partition_sizes=[1], metrics=[], statistics=[Rows=Exact(3), Bytes=Exact(1160), [(Col[0]: Null=Exact(0))]] | + | | | + +-------------------+-----------------------------------------------------------------------------------------------------------------------------------------+ + +-------------------+-----------------------------------------------------------------------------------------------------------------------------------------+ + | plan_type | plan | + +-------------------+-----------------------------------------------------------------------------------------------------------------------------------------+ + | Plan with Metrics | DataSourceExec: partitions=1, partition_sizes=[1], metrics=[], statistics=[Rows=Exact(3), Bytes=Exact(1160), [(Col[0]: Null=Exact(0))]] | + | | | + +-------------------+-----------------------------------------------------------------------------------------------------------------------------------------+ + +-------------------+---------------------------------------------------------------+ + | plan_type | plan | + +-------------------+---------------------------------------------------------------+ + | Plan with Metrics | DataSourceExec: partitions=1, partition_sizes=[1], metrics=[] | + | | | + +-------------------+---------------------------------------------------------------+ + +------------------------+---------------------------------------------------------------+ + | plan_type | plan | + +------------------------+---------------------------------------------------------------+ + | Plan with Metrics | DataSourceExec: partitions=1, partition_sizes=[1], metrics=[] | + | | | + | Plan with Full Metrics | DataSourceExec: partitions=1, partition_sizes=[1], metrics=[] | + | | | + | Output Rows | 3 | + "); + + Ok(()) + } } diff --git a/datafusion/physical-plan/src/test.rs b/datafusion/physical-plan/src/test.rs index e3b22611f4deb..4d3a46c54d6d5 100644 --- a/datafusion/physical-plan/src/test.rs +++ b/datafusion/physical-plan/src/test.rs @@ -28,7 +28,6 @@ use std::task::Context; use crate::common; use crate::execution_plan::{Boundedness, EmissionType}; use crate::memory::MemoryStream; -use crate::metrics::MetricsSet; use crate::stream::RecordBatchStreamAdapter; use crate::streaming::PartitionStream; use crate::ExecutionPlan; @@ -165,10 +164,6 @@ impl ExecutionPlan for TestMemoryExec { self.open(partition, context) } - fn metrics(&self) -> Option { - unimplemented!() - } - fn statistics(&self) -> Result { self.statistics_inner() } diff --git a/datafusion/sqllogictest/test_files/explain_analyze.slt b/datafusion/sqllogictest/test_files/explain_analyze.slt index b213cd9565c86..b127495d12a75 100644 --- a/datafusion/sqllogictest/test_files/explain_analyze.slt +++ b/datafusion/sqllogictest/test_files/explain_analyze.slt @@ -25,3 +25,23 @@ Plan with Metrics LazyMemoryExec: partitions=1, batch_generators=[generate_serie statement ok reset datafusion.explain.analyze_level; + + +# test auto_explain + +statement ok +set datafusion.explain.auto_explain_output = 'test_files/scratch/auto_explain.txt'; + +statement ok +set datafusion.explain.auto_explain = true; + +query TI +SELECT 'k' k, 1 v; +---- +k 1 + +statement ok +reset datafusion.explain.auto_explain; + +statement ok +reset datafusion.explain.auto_explain_output; diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 5e478de0416ce..28446cc89c760 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -273,6 +273,9 @@ datafusion.execution.target_partitions 7 datafusion.execution.time_zone NULL datafusion.execution.use_row_number_estimates_to_optimize_partitioning false datafusion.explain.analyze_level dev +datafusion.explain.auto_explain false +datafusion.explain.auto_explain_min_duration 0 +datafusion.explain.auto_explain_output stdout datafusion.explain.format indent datafusion.explain.logical_plan_only false datafusion.explain.physical_plan_only false @@ -401,6 +404,9 @@ datafusion.execution.target_partitions 7 Number of partitions for query executio datafusion.execution.time_zone NULL The default time zone Some functions, e.g. `now` return timestamps in this time zone datafusion.execution.use_row_number_estimates_to_optimize_partitioning false Should DataFusion use row number estimates at the input to decide whether increasing parallelism is beneficial or not. By default, only exact row numbers (not estimates) are used for this decision. Setting this flag to `true` will likely produce better plans. if the source of statistics is accurate. We plan to make this the default in the future. datafusion.explain.analyze_level dev Verbosity level for "EXPLAIN ANALYZE". Default is "dev" "summary" shows common metrics for high-level insights. "dev" provides deep operator-level introspection for developers. +datafusion.explain.auto_explain false Whether to enable the `auto_explain` mode. In this mode, the execution plan is automatically written to the location set by `auto_explain_output`, if the plan's total duration is greater ot equal to `auto_explain_min_duration`, in milliseconds. +datafusion.explain.auto_explain_min_duration 0 In the `auto_explain` mode, only output plans if their duration is bigger than this value, in milliseconds. +datafusion.explain.auto_explain_output stdout Output location used in the `auto_explain` mode. Supports `stdout`, `stderr`, or a file path (file is created if it does not exist; plans are appended to the file). datafusion.explain.format indent Display format of explain. Default is "indent". When set to "tree", it will print the plan in a tree-rendered format. datafusion.explain.logical_plan_only false When set to true, the explain statement will only print logical plans datafusion.explain.physical_plan_only false When set to true, the explain statement will only print physical plans diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 77d6ff8be97ed..0519de383469c 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -166,6 +166,9 @@ The following configuration settings are available: | datafusion.explain.format | indent | Display format of explain. Default is "indent". When set to "tree", it will print the plan in a tree-rendered format. | | datafusion.explain.tree_maximum_render_width | 240 | (format=tree only) Maximum total width of the rendered tree. When set to 0, the tree will have no width limit. | | datafusion.explain.analyze_level | dev | Verbosity level for "EXPLAIN ANALYZE". Default is "dev" "summary" shows common metrics for high-level insights. "dev" provides deep operator-level introspection for developers. | +| datafusion.explain.auto_explain | false | Whether to enable the `auto_explain` mode. In this mode, the execution plan is automatically written to the location set by `auto_explain_output`, if the plan's total duration is greater ot equal to `auto_explain_min_duration`, in milliseconds. | +| datafusion.explain.auto_explain_output | stdout | Output location used in the `auto_explain` mode. Supports `stdout`, `stderr`, or a file path (file is created if it does not exist; plans are appended to the file). | +| datafusion.explain.auto_explain_min_duration | 0 | In the `auto_explain` mode, only output plans if their duration is bigger than this value, in milliseconds. | | datafusion.sql_parser.parse_float_as_decimal | false | When set to true, SQL parser will parse float as decimal type | | datafusion.sql_parser.enable_ident_normalization | true | When set to true, SQL parser will normalize ident (convert ident to lowercase when not quoted) | | datafusion.sql_parser.enable_options_value_normalization | false | When set to true, SQL parser will normalize options value (convert value to lowercase). Note that this option is ignored and will be removed in the future. All case-insensitive values are normalized automatically. | From 1a8d6f853b3e241900d1c78fbc540c6c9d571056 Mon Sep 17 00:00:00 2001 From: nuno-faria Date: Sun, 14 Dec 2025 11:40:30 +0000 Subject: [PATCH 02/13] Fix warnings --- datafusion/common/src/config.rs | 2 +- datafusion/physical-plan/src/analyze.rs | 5 ++--- datafusion/sqllogictest/test_files/information_schema.slt | 2 +- docs/source/user-guide/configs.md | 2 +- 4 files changed, 5 insertions(+), 6 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 76704da9c70cb..896132e5b2d49 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -1120,7 +1120,7 @@ config_namespace! { /// Whether to enable the `auto_explain` mode. In this mode, the execution plan is /// automatically written to the location set by `auto_explain_output`, if the plan's total - /// duration is greater ot equal to `auto_explain_min_duration`, in milliseconds. + /// duration is greater or equal to `auto_explain_min_duration`, in milliseconds. pub auto_explain: bool, default = false /// Output location used in the `auto_explain` mode. Supports `stdout`, `stderr`, or a file diff --git a/datafusion/physical-plan/src/analyze.rs b/datafusion/physical-plan/src/analyze.rs index 44cec7a68db43..685252e1f03d1 100644 --- a/datafusion/physical-plan/src/analyze.rs +++ b/datafusion/physical-plan/src/analyze.rs @@ -349,9 +349,8 @@ mod tests { use crate::{ collect, test::{ - assert_is_pending, - exec::{assert_strong_count_converges_to_zero, BlockingExec}, - TestMemoryExec, + TestMemoryExec, assert_is_pending, + exec::{BlockingExec, assert_strong_count_converges_to_zero}, }, }; diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index f020dc7a64be9..2cfa0f0f634ab 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -407,7 +407,7 @@ datafusion.execution.target_partitions 7 Number of partitions for query executio datafusion.execution.time_zone NULL The default time zone Some functions, e.g. `now` return timestamps in this time zone datafusion.execution.use_row_number_estimates_to_optimize_partitioning false Should DataFusion use row number estimates at the input to decide whether increasing parallelism is beneficial or not. By default, only exact row numbers (not estimates) are used for this decision. Setting this flag to `true` will likely produce better plans. if the source of statistics is accurate. We plan to make this the default in the future. datafusion.explain.analyze_level dev Verbosity level for "EXPLAIN ANALYZE". Default is "dev" "summary" shows common metrics for high-level insights. "dev" provides deep operator-level introspection for developers. -datafusion.explain.auto_explain false Whether to enable the `auto_explain` mode. In this mode, the execution plan is automatically written to the location set by `auto_explain_output`, if the plan's total duration is greater ot equal to `auto_explain_min_duration`, in milliseconds. +datafusion.explain.auto_explain false Whether to enable the `auto_explain` mode. In this mode, the execution plan is automatically written to the location set by `auto_explain_output`, if the plan's total duration is greater or equal to `auto_explain_min_duration`, in milliseconds. datafusion.explain.auto_explain_min_duration 0 In the `auto_explain` mode, only output plans if their duration is bigger than this value, in milliseconds. datafusion.explain.auto_explain_output stdout Output location used in the `auto_explain` mode. Supports `stdout`, `stderr`, or a file path (file is created if it does not exist; plans are appended to the file). datafusion.explain.format indent Display format of explain. Default is "indent". When set to "tree", it will print the plan in a tree-rendered format. diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 82bc29b21d316..a8ab8ffa88aaa 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -169,7 +169,7 @@ The following configuration settings are available: | datafusion.explain.format | indent | Display format of explain. Default is "indent". When set to "tree", it will print the plan in a tree-rendered format. | | datafusion.explain.tree_maximum_render_width | 240 | (format=tree only) Maximum total width of the rendered tree. When set to 0, the tree will have no width limit. | | datafusion.explain.analyze_level | dev | Verbosity level for "EXPLAIN ANALYZE". Default is "dev" "summary" shows common metrics for high-level insights. "dev" provides deep operator-level introspection for developers. | -| datafusion.explain.auto_explain | false | Whether to enable the `auto_explain` mode. In this mode, the execution plan is automatically written to the location set by `auto_explain_output`, if the plan's total duration is greater ot equal to `auto_explain_min_duration`, in milliseconds. | +| datafusion.explain.auto_explain | false | Whether to enable the `auto_explain` mode. In this mode, the execution plan is automatically written to the location set by `auto_explain_output`, if the plan's total duration is greater or equal to `auto_explain_min_duration`, in milliseconds. | | datafusion.explain.auto_explain_output | stdout | Output location used in the `auto_explain` mode. Supports `stdout`, `stderr`, or a file path (file is created if it does not exist; plans are appended to the file). | | datafusion.explain.auto_explain_min_duration | 0 | In the `auto_explain` mode, only output plans if their duration is bigger than this value, in milliseconds. | | datafusion.sql_parser.parse_float_as_decimal | false | When set to true, SQL parser will parse float as decimal type | From fb295d0c23258d3c6ef2ae039658861c442a0f72 Mon Sep 17 00:00:00 2001 From: nuno-faria Date: Mon, 15 Dec 2025 19:32:36 +0000 Subject: [PATCH 03/13] Avoid storing batches in AnalyzeExec when not needed --- datafusion/physical-plan/src/analyze.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/analyze.rs b/datafusion/physical-plan/src/analyze.rs index 685252e1f03d1..25794b1fa6869 100644 --- a/datafusion/physical-plan/src/analyze.rs +++ b/datafusion/physical-plan/src/analyze.rs @@ -239,7 +239,10 @@ impl ExecutionPlan for AnalyzeExec { let mut total_rows = 0; while let Some(batch) = input_stream.next().await.transpose()? { total_rows += batch.num_rows(); - batches.push(batch); + // in the auto_explain mode, store the input batches to later return them + if auto_explain { + batches.push(batch); + } } let duration = Instant::now() - start; From ae2d6a1159c9dbd5b79adb5c40d999ee9b5e47fb Mon Sep 17 00:00:00 2001 From: nuno-faria Date: Mon, 15 Dec 2025 19:37:05 +0000 Subject: [PATCH 04/13] Keep auto_explain properties in with_new_children --- datafusion/physical-plan/src/analyze.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/analyze.rs b/datafusion/physical-plan/src/analyze.rs index 25794b1fa6869..0c12200c88217 100644 --- a/datafusion/physical-plan/src/analyze.rs +++ b/datafusion/physical-plan/src/analyze.rs @@ -181,13 +181,21 @@ impl ExecutionPlan for AnalyzeExec { self: Arc, mut children: Vec>, ) -> Result> { - Ok(Arc::new(Self::new( + let mut plan = Self::new( self.verbose, self.show_statistics, self.metric_types.clone(), children.pop().unwrap(), Arc::clone(&self.schema), - ))) + ); + + if self.auto_explain { + plan.enable_auto_explain(); + plan.set_auto_explain_output(self.auto_explain_output.clone()); + plan.set_auto_explain_min_duration(self.auto_explain_min_duration); + } + + Ok(Arc::new(plan)) } fn execute( From 0bac7ba40de0eed9408eea2f95d7c23fdfc07e20 Mon Sep 17 00:00:00 2001 From: Nuno Faria Date: Mon, 15 Dec 2025 19:39:44 +0000 Subject: [PATCH 05/13] Update datafusion/common/src/config.rs Co-authored-by: Martin Grigorov --- datafusion/common/src/config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 896132e5b2d49..e73e0b75d0733 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -1127,7 +1127,7 @@ config_namespace! { /// path (file is created if it does not exist; plans are appended to the file). pub auto_explain_output: String, default = "stdout".to_owned() - /// In the `auto_explain` mode, only output plans if their duration is bigger than this + /// In the `auto_explain` mode, only output plans if their duration is bigger than or equal to this /// value, in milliseconds. pub auto_explain_min_duration: usize, default = 0 } From 27d16620fd9d519814d08ee1cacd363d607fd426 Mon Sep 17 00:00:00 2001 From: Nuno Faria Date: Mon, 15 Dec 2025 19:49:11 +0000 Subject: [PATCH 06/13] Update datafusion/physical-plan/src/analyze.rs Co-authored-by: Martin Grigorov --- datafusion/physical-plan/src/analyze.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/analyze.rs b/datafusion/physical-plan/src/analyze.rs index 0c12200c88217..8d45657ca0c71 100644 --- a/datafusion/physical-plan/src/analyze.rs +++ b/datafusion/physical-plan/src/analyze.rs @@ -132,7 +132,7 @@ impl AnalyzeExec { self.auto_explain_output = value } - pub fn set_auto_explain_min_duration(&mut self, value: usize) { + pub fn set_auto_explain_min_duration_ms(&mut self, value: usize) { self.auto_explain_min_duration = value } } From f574b4f2c970a4dee4adfc91222f407d6f2f190f Mon Sep 17 00:00:00 2001 From: nuno-faria Date: Mon, 15 Dec 2025 20:04:27 +0000 Subject: [PATCH 07/13] Refactor auto_explain_min_duration to auto_explain_min_duration_ms --- datafusion/common/src/config.rs | 8 ++++---- datafusion/core/src/physical_planner.rs | 4 +++- datafusion/physical-plan/src/analyze.rs | 16 ++++++++-------- .../test_files/information_schema.slt | 6 +++--- docs/source/user-guide/configs.md | 4 ++-- 5 files changed, 20 insertions(+), 18 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index e73e0b75d0733..4a45be02d0fbf 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -1120,16 +1120,16 @@ config_namespace! { /// Whether to enable the `auto_explain` mode. In this mode, the execution plan is /// automatically written to the location set by `auto_explain_output`, if the plan's total - /// duration is greater or equal to `auto_explain_min_duration`, in milliseconds. + /// duration is greater or equal to `auto_explain_min_duration_ms`. pub auto_explain: bool, default = false /// Output location used in the `auto_explain` mode. Supports `stdout`, `stderr`, or a file /// path (file is created if it does not exist; plans are appended to the file). pub auto_explain_output: String, default = "stdout".to_owned() - /// In the `auto_explain` mode, only output plans if their duration is bigger than or equal to this - /// value, in milliseconds. - pub auto_explain_min_duration: usize, default = 0 + /// In the `auto_explain` mode, only output plans if their duration is bigger than or equal + /// to this value (milliseconds). + pub auto_explain_min_duration_ms: usize, default = 0 } } diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 17fd86eb23423..df0718683f979 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1627,7 +1627,9 @@ impl DefaultPhysicalPlanner { ); plan.enable_auto_explain(); plan.set_auto_explain_output(options.explain.auto_explain_output.clone()); - plan.set_auto_explain_min_duration(options.explain.auto_explain_min_duration); + plan.set_auto_explain_min_duration_ms( + options.explain.auto_explain_min_duration_ms, + ); Arc::new(plan) } } diff --git a/datafusion/physical-plan/src/analyze.rs b/datafusion/physical-plan/src/analyze.rs index 8d45657ca0c71..c2bb5254e09a9 100644 --- a/datafusion/physical-plan/src/analyze.rs +++ b/datafusion/physical-plan/src/analyze.rs @@ -68,7 +68,7 @@ pub struct AnalyzeExec { auto_explain_output: String, /// In the `auto_explain` mode, only output if the execution is greater or equal to this value, /// in milliseconds. (default=0) - auto_explain_min_duration: usize, + auto_explain_min_duration_ms: usize, } impl AnalyzeExec { @@ -90,7 +90,7 @@ impl AnalyzeExec { cache, auto_explain: false, auto_explain_output: "stdout".to_owned(), - auto_explain_min_duration: 0, + auto_explain_min_duration_ms: 0, } } @@ -133,7 +133,7 @@ impl AnalyzeExec { } pub fn set_auto_explain_min_duration_ms(&mut self, value: usize) { - self.auto_explain_min_duration = value + self.auto_explain_min_duration_ms = value } } @@ -192,7 +192,7 @@ impl ExecutionPlan for AnalyzeExec { if self.auto_explain { plan.enable_auto_explain(); plan.set_auto_explain_output(self.auto_explain_output.clone()); - plan.set_auto_explain_min_duration(self.auto_explain_min_duration); + plan.set_auto_explain_min_duration_ms(self.auto_explain_min_duration_ms); } Ok(Arc::new(plan)) @@ -240,7 +240,7 @@ impl ExecutionPlan for AnalyzeExec { let inner_schema = Arc::clone(&self.input.schema()); let auto_explain = self.auto_explain; let auto_explain_output = self.auto_explain_output.clone(); - let auto_explain_min_duration = self.auto_explain_min_duration; + let auto_explain_min_duration_ms = self.auto_explain_min_duration_ms; let output = async move { let mut batches = vec![]; @@ -265,7 +265,7 @@ impl ExecutionPlan for AnalyzeExec { )?; if auto_explain { - if duration.as_millis() >= auto_explain_min_duration as u128 { + if duration.as_millis() >= auto_explain_min_duration_ms as u128 { export_auto_explain(out, &auto_explain_output)?; } concat_batches(&inner_schema, &batches).map_err(DataFusionError::from) @@ -459,7 +459,7 @@ mod tests { "); // check that with a longer runtime a new plan is not printed - analyze_exec.set_auto_explain_min_duration(1000000); + analyze_exec.set_auto_explain_min_duration_ms(1000000); collect(Arc::new(analyze_exec.clone()), Arc::clone(&task_ctx)).await?; assert_snapshot!(fs::read_to_string(&output_path)?, @r" @@ -472,7 +472,7 @@ mod tests { "); // test again with the default min duration - analyze_exec.set_auto_explain_min_duration(0); + analyze_exec.set_auto_explain_min_duration_ms(0); collect(Arc::new(analyze_exec.clone()), Arc::clone(&task_ctx)).await?; assert_snapshot!(fs::read_to_string(&output_path)?, @r" diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 2cfa0f0f634ab..ba371bb4c04ff 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -274,7 +274,7 @@ datafusion.execution.time_zone NULL datafusion.execution.use_row_number_estimates_to_optimize_partitioning false datafusion.explain.analyze_level dev datafusion.explain.auto_explain false -datafusion.explain.auto_explain_min_duration 0 +datafusion.explain.auto_explain_min_duration_ms 0 datafusion.explain.auto_explain_output stdout datafusion.explain.format indent datafusion.explain.logical_plan_only false @@ -407,8 +407,8 @@ datafusion.execution.target_partitions 7 Number of partitions for query executio datafusion.execution.time_zone NULL The default time zone Some functions, e.g. `now` return timestamps in this time zone datafusion.execution.use_row_number_estimates_to_optimize_partitioning false Should DataFusion use row number estimates at the input to decide whether increasing parallelism is beneficial or not. By default, only exact row numbers (not estimates) are used for this decision. Setting this flag to `true` will likely produce better plans. if the source of statistics is accurate. We plan to make this the default in the future. datafusion.explain.analyze_level dev Verbosity level for "EXPLAIN ANALYZE". Default is "dev" "summary" shows common metrics for high-level insights. "dev" provides deep operator-level introspection for developers. -datafusion.explain.auto_explain false Whether to enable the `auto_explain` mode. In this mode, the execution plan is automatically written to the location set by `auto_explain_output`, if the plan's total duration is greater or equal to `auto_explain_min_duration`, in milliseconds. -datafusion.explain.auto_explain_min_duration 0 In the `auto_explain` mode, only output plans if their duration is bigger than this value, in milliseconds. +datafusion.explain.auto_explain false Whether to enable the `auto_explain` mode. In this mode, the execution plan is automatically written to the location set by `auto_explain_output`, if the plan's total duration is greater or equal to `auto_explain_min_duration_ms`. +datafusion.explain.auto_explain_min_duration_ms 0 In the `auto_explain` mode, only output plans if their duration is bigger than or equal to this value (milliseconds). datafusion.explain.auto_explain_output stdout Output location used in the `auto_explain` mode. Supports `stdout`, `stderr`, or a file path (file is created if it does not exist; plans are appended to the file). datafusion.explain.format indent Display format of explain. Default is "indent". When set to "tree", it will print the plan in a tree-rendered format. datafusion.explain.logical_plan_only false When set to true, the explain statement will only print logical plans diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index a8ab8ffa88aaa..2f5528c3afa00 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -169,9 +169,9 @@ The following configuration settings are available: | datafusion.explain.format | indent | Display format of explain. Default is "indent". When set to "tree", it will print the plan in a tree-rendered format. | | datafusion.explain.tree_maximum_render_width | 240 | (format=tree only) Maximum total width of the rendered tree. When set to 0, the tree will have no width limit. | | datafusion.explain.analyze_level | dev | Verbosity level for "EXPLAIN ANALYZE". Default is "dev" "summary" shows common metrics for high-level insights. "dev" provides deep operator-level introspection for developers. | -| datafusion.explain.auto_explain | false | Whether to enable the `auto_explain` mode. In this mode, the execution plan is automatically written to the location set by `auto_explain_output`, if the plan's total duration is greater or equal to `auto_explain_min_duration`, in milliseconds. | +| datafusion.explain.auto_explain | false | Whether to enable the `auto_explain` mode. In this mode, the execution plan is automatically written to the location set by `auto_explain_output`, if the plan's total duration is greater or equal to `auto_explain_min_duration_ms`. | | datafusion.explain.auto_explain_output | stdout | Output location used in the `auto_explain` mode. Supports `stdout`, `stderr`, or a file path (file is created if it does not exist; plans are appended to the file). | -| datafusion.explain.auto_explain_min_duration | 0 | In the `auto_explain` mode, only output plans if their duration is bigger than this value, in milliseconds. | +| datafusion.explain.auto_explain_min_duration_ms | 0 | In the `auto_explain` mode, only output plans if their duration is bigger than or equal to this value (milliseconds). | | datafusion.sql_parser.parse_float_as_decimal | false | When set to true, SQL parser will parse float as decimal type | | datafusion.sql_parser.enable_ident_normalization | true | When set to true, SQL parser will normalize ident (convert ident to lowercase when not quoted) | | datafusion.sql_parser.enable_options_value_normalization | false | When set to true, SQL parser will normalize options value (convert value to lowercase). Note that this option is ignored and will be removed in the future. All case-insensitive values are normalized automatically. | From 1743c00a6942fbf913f28aa48931aafe53068432 Mon Sep 17 00:00:00 2001 From: nuno-faria Date: Sun, 1 Feb 2026 11:21:34 +0000 Subject: [PATCH 08/13] Re-run CI --- datafusion/physical-plan/src/analyze.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/analyze.rs b/datafusion/physical-plan/src/analyze.rs index c2bb5254e09a9..9c5d082d71c64 100644 --- a/datafusion/physical-plan/src/analyze.rs +++ b/datafusion/physical-plan/src/analyze.rs @@ -64,7 +64,7 @@ pub struct AnalyzeExec { /// Possible values: /// - stdout (default) /// - stderr - /// - *path to file* (creates if it does not exist; appends to it if it does) + /// - *path to file* (creates if it does not exist; or appends to it if it does) auto_explain_output: String, /// In the `auto_explain` mode, only output if the execution is greater or equal to this value, /// in milliseconds. (default=0) From 46feaf75e76ea240600ed9c54e117ce88c883d84 Mon Sep 17 00:00:00 2001 From: nuno-faria Date: Sun, 1 Feb 2026 11:54:16 +0000 Subject: [PATCH 09/13] Update configs.md --- docs/source/user-guide/configs.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index aaba453b3541f..109fa41bd81ee 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -173,6 +173,9 @@ The following configuration settings are available: | datafusion.explain.format | indent | Display format of explain. Default is "indent". When set to "tree", it will print the plan in a tree-rendered format. | | datafusion.explain.tree_maximum_render_width | 240 | (format=tree only) Maximum total width of the rendered tree. When set to 0, the tree will have no width limit. | | datafusion.explain.analyze_level | dev | Verbosity level for "EXPLAIN ANALYZE". Default is "dev" "summary" shows common metrics for high-level insights. "dev" provides deep operator-level introspection for developers. | +| datafusion.explain.auto_explain | false | Whether to enable the `auto_explain` mode. In this mode, the execution plan is automatically written to the location set by `auto_explain_output`, if the plan's total duration is greater or equal to `auto_explain_min_duration_ms`. | +| datafusion.explain.auto_explain_output | stdout | Output location used in the `auto_explain` mode. Supports `stdout`, `stderr`, or a file path (file is created if it does not exist; plans are appended to the file). | +| datafusion.explain.auto_explain_min_duration_ms | 0 | In the `auto_explain` mode, only output plans if their duration is bigger than or equal to this value (milliseconds). | | datafusion.sql_parser.parse_float_as_decimal | false | When set to true, SQL parser will parse float as decimal type | | datafusion.sql_parser.enable_ident_normalization | true | When set to true, SQL parser will normalize ident (convert ident to lowercase when not quoted) | | datafusion.sql_parser.enable_options_value_normalization | false | When set to true, SQL parser will normalize options value (convert value to lowercase). Note that this option is ignored and will be removed in the future. All case-insensitive values are normalized automatically. | From 0397da3212ee0df529048c09efeb86c7a2033be3 Mon Sep 17 00:00:00 2001 From: nuno-faria Date: Sun, 8 Mar 2026 17:09:10 +0000 Subject: [PATCH 10/13] Use log instead of stdout/stderr --- datafusion/common/src/config.rs | 7 +++-- datafusion/physical-plan/src/analyze.rs | 29 ++++++++++++------- .../test_files/information_schema.slt | 4 +-- docs/source/user-guide/configs.md | 2 +- 4 files changed, 25 insertions(+), 17 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 3b0718d61259e..c4d329890eab2 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -1186,9 +1186,10 @@ config_namespace! { /// duration is greater or equal to `auto_explain_min_duration_ms`. pub auto_explain: bool, default = false - /// Output location used in the `auto_explain` mode. Supports `stdout`, `stderr`, or a file - /// path (file is created if it does not exist; plans are appended to the file). - pub auto_explain_output: String, default = "stdout".to_owned() + /// Output location used in the `auto_explain` mode. Supports `log::error`, `log::warn`, + /// `log::info` (default), `log::debug`, `log::trace`, or a file path (file is created if it + /// does not exist; plans are appended to the file). + pub auto_explain_output: String, default = "log::info".to_owned() /// In the `auto_explain` mode, only output plans if their duration is bigger than or equal /// to this value (milliseconds). diff --git a/datafusion/physical-plan/src/analyze.rs b/datafusion/physical-plan/src/analyze.rs index 9c5d082d71c64..22ec2dc36b964 100644 --- a/datafusion/physical-plan/src/analyze.rs +++ b/datafusion/physical-plan/src/analyze.rs @@ -19,7 +19,7 @@ use std::any::Any; use std::fs::OpenOptions; -use std::io::{self, Write}; +use std::io::Write; use std::sync::Arc; use super::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter}; @@ -62,8 +62,11 @@ pub struct AnalyzeExec { auto_explain: bool, /// Where to store the output of `auto_explain`, if enabled. /// Possible values: - /// - stdout (default) - /// - stderr + /// - log::error + /// - log::warn + /// - log::info (default) + /// - log::debug + /// - log::trace /// - *path to file* (creates if it does not exist; or appends to it if it does) auto_explain_output: String, /// In the `auto_explain` mode, only output if the execution is greater or equal to this value, @@ -89,7 +92,7 @@ impl AnalyzeExec { schema, cache, auto_explain: false, - auto_explain_output: "stdout".to_owned(), + auto_explain_output: "log::info".to_owned(), auto_explain_min_duration_ms: 0, } } @@ -340,14 +343,18 @@ fn create_output_batch( } fn export_auto_explain(batch: RecordBatch, output: &str) -> Result<()> { - let fd: &mut dyn Write = match output { - "stdout" => &mut io::stdout(), - "stderr" => &mut io::stderr(), - _ => &mut OpenOptions::new().create(true).append(true).open(output)?, - }; - let formatted = pretty_format_batches(&[batch])?; - writeln!(fd, "{formatted}")?; + match output { + "log::error" => log::error!("{formatted}"), + "log::warn" => log::warn!("{formatted}"), + "log::info" => log::info!("{formatted}"), + "log::debug" => log::debug!("{formatted}"), + "log::trace" => log::trace!("{formatted}"), + _ => { + let fd = &mut OpenOptions::new().create(true).append(true).open(output)?; + writeln!(fd, "{formatted}")?; + } + } Ok(()) } diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 15cdc3a2e048c..502caa9d36d0f 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -277,7 +277,7 @@ datafusion.execution.use_row_number_estimates_to_optimize_partitioning false datafusion.explain.analyze_level dev datafusion.explain.auto_explain false datafusion.explain.auto_explain_min_duration_ms 0 -datafusion.explain.auto_explain_output stdout +datafusion.explain.auto_explain_output log::info datafusion.explain.format indent datafusion.explain.logical_plan_only false datafusion.explain.physical_plan_only false @@ -417,7 +417,7 @@ datafusion.execution.use_row_number_estimates_to_optimize_partitioning false Sho datafusion.explain.analyze_level dev Verbosity level for "EXPLAIN ANALYZE". Default is "dev" "summary" shows common metrics for high-level insights. "dev" provides deep operator-level introspection for developers. datafusion.explain.auto_explain false Whether to enable the `auto_explain` mode. In this mode, the execution plan is automatically written to the location set by `auto_explain_output`, if the plan's total duration is greater or equal to `auto_explain_min_duration_ms`. datafusion.explain.auto_explain_min_duration_ms 0 In the `auto_explain` mode, only output plans if their duration is bigger than or equal to this value (milliseconds). -datafusion.explain.auto_explain_output stdout Output location used in the `auto_explain` mode. Supports `stdout`, `stderr`, or a file path (file is created if it does not exist; plans are appended to the file). +datafusion.explain.auto_explain_output log::info Output location used in the `auto_explain` mode. Supports `log::error`, `log::warn`, `log::info` (default), `log::debug`, `log::trace`, or a file path (file is created if it does not exist; plans are appended to the file). datafusion.explain.format indent Display format of explain. Default is "indent". When set to "tree", it will print the plan in a tree-rendered format. datafusion.explain.logical_plan_only false When set to true, the explain statement will only print logical plans datafusion.explain.physical_plan_only false When set to true, the explain statement will only print physical plans diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 109fa41bd81ee..c34bc0a34d93c 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -174,7 +174,7 @@ The following configuration settings are available: | datafusion.explain.tree_maximum_render_width | 240 | (format=tree only) Maximum total width of the rendered tree. When set to 0, the tree will have no width limit. | | datafusion.explain.analyze_level | dev | Verbosity level for "EXPLAIN ANALYZE". Default is "dev" "summary" shows common metrics for high-level insights. "dev" provides deep operator-level introspection for developers. | | datafusion.explain.auto_explain | false | Whether to enable the `auto_explain` mode. In this mode, the execution plan is automatically written to the location set by `auto_explain_output`, if the plan's total duration is greater or equal to `auto_explain_min_duration_ms`. | -| datafusion.explain.auto_explain_output | stdout | Output location used in the `auto_explain` mode. Supports `stdout`, `stderr`, or a file path (file is created if it does not exist; plans are appended to the file). | +| datafusion.explain.auto_explain_output | log::info | Output location used in the `auto_explain` mode. Supports `log::error`, `log::warn`, `log::info` (default), `log::debug`, `log::trace`, or a file path (file is created if it does not exist; plans are appended to the file). | | datafusion.explain.auto_explain_min_duration_ms | 0 | In the `auto_explain` mode, only output plans if their duration is bigger than or equal to this value (milliseconds). | | datafusion.sql_parser.parse_float_as_decimal | false | When set to true, SQL parser will parse float as decimal type | | datafusion.sql_parser.enable_ident_normalization | true | When set to true, SQL parser will normalize ident (convert ident to lowercase when not quoted) | From 884afe707b865b9a4d7088f867bd18b28c2ecfaf Mon Sep 17 00:00:00 2001 From: nuno-faria Date: Sun, 15 Mar 2026 12:12:08 +0000 Subject: [PATCH 11/13] Refactor to PlanObserver --- Cargo.lock | 1 - datafusion/common/src/config.rs | 15 +- datafusion/core/src/execution/context/mod.rs | 178 +++++++++++++ .../core/src/execution/session_state.rs | 14 ++ datafusion/core/src/physical_planner.rs | 25 +- datafusion/execution/src/lib.rs | 1 + datafusion/execution/src/plan_observer.rs | 110 ++++++++ datafusion/physical-plan/Cargo.toml | 1 - datafusion/physical-plan/src/analyze.rs | 237 +++--------------- .../test_files/explain_analyze.slt | 20 -- .../test_files/information_schema.slt | 6 +- docs/source/user-guide/configs.md | 4 +- 12 files changed, 357 insertions(+), 255 deletions(-) create mode 100644 datafusion/execution/src/plan_observer.rs diff --git a/Cargo.lock b/Cargo.lock index 87bdf47ac37c8..1f28687f4f839 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2479,7 +2479,6 @@ dependencies = [ "rand 0.9.2", "rstest", "rstest_reuse", - "tempfile", "tokio", ] diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index c4d329890eab2..71d97f378c915 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -1181,19 +1181,10 @@ config_namespace! { /// "dev" provides deep operator-level introspection for developers. pub analyze_level: ExplainAnalyzeLevel, default = ExplainAnalyzeLevel::Dev - /// Whether to enable the `auto_explain` mode. In this mode, the execution plan is - /// automatically written to the location set by `auto_explain_output`, if the plan's total - /// duration is greater or equal to `auto_explain_min_duration_ms`. + /// Whether to enable the `auto_explain` mode. In this mode, the callbacks defined by the + /// [`PlanObserver`] are called. By default, the [`SessionContext`] uses the + /// [`DefaultPlanObserver`] implementation. pub auto_explain: bool, default = false - - /// Output location used in the `auto_explain` mode. Supports `log::error`, `log::warn`, - /// `log::info` (default), `log::debug`, `log::trace`, or a file path (file is created if it - /// does not exist; plans are appended to the file). - pub auto_explain_output: String, default = "log::info".to_owned() - - /// In the `auto_explain` mode, only output plans if their duration is bigger than or equal - /// to this value (milliseconds). - pub auto_explain_min_duration_ms: usize, default = 0 } } diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index b6c606ff467f9..abbcfc9b3de99 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -24,6 +24,7 @@ use std::time::Duration; use super::options::ReadOptions; use crate::datasource::dynamic_file::DynamicListTableFactory; +use crate::execution::plan_observer::PlanObserver; use crate::execution::session_state::SessionStateBuilder; use crate::{ catalog::listing_schema::ListingSchemaProvider, @@ -1884,6 +1885,14 @@ impl SessionContext { .write() .register_table_options_extension(extension) } + + /// Sets a new plan observer, used for the auto_explain feature. Note that the auto explain mode + /// needs to be enabled for the methods in [`PlanObserver`] to be called, through the + /// `datafusion.explain.auto_explain` config. + pub fn with_plan_observer(self, plan_observer: Arc) -> Self { + self.state.write().set_plan_observer(plan_observer); + self + } } impl FunctionRegistry for SessionContext { @@ -2162,7 +2171,11 @@ mod tests { use crate::test_util::{plan_and_collect, populate_csv_partitions}; use arrow::datatypes::{DataType, TimeUnit}; use datafusion_common::DataFusionError; + use datafusion_expr::{col, lit}; + use regex::Regex; use std::error::Error; + use std::fs; + use std::io::Write; use std::path::PathBuf; use datafusion_common::test_util::batches_to_string; @@ -2759,4 +2772,169 @@ mod tests { assert!(have.is_err()); } } + + #[derive(Debug)] + struct TestPlanObserver { + output: String, + } + + impl TestPlanObserver { + pub fn new(output: String) -> Self { + Self { output } + } + } + + impl PlanObserver for TestPlanObserver { + fn plan_created(&self, _id: String, sql: Option) -> Result<()> { + let Some(sql) = sql else { + return Ok(()); + }; + let fd = &mut fs::OpenOptions::new() + .create(true) + .append(true) + .open(&self.output)?; + writeln!(fd, "QUERY: {sql}")?; + Ok(()) + } + + fn plan_executed( + &self, + _id: String, + explain_result: RecordBatch, + _duration_nanos: u128, + ) -> Result<()> { + let analyze = arrow::util::pretty::pretty_format_batches(&[explain_result])? + .to_string(); + // the plan is simplified to become deterministic + let re = Regex::new(r"\s+\w+?Exec").unwrap(); + let plan = re + .find_iter(&analyze) + .map(|m| m.as_str()) + .collect::>() + .join("\n"); + + let fd = &mut fs::OpenOptions::new() + .create(true) + .append(true) + .open(&self.output)?; + writeln!(fd, "EXPLAIN:\n{plan}\n")?; + + Ok(()) + } + } + + #[tokio::test] + async fn test_auto_explain() -> Result<()> { + let tmp_dir = TempDir::new()?; + let output_path = tmp_dir.path().join("auto_explain_output.txt"); + let plan_observer = + TestPlanObserver::new(output_path.as_os_str().to_str().unwrap().to_owned()); + + let ctx = SessionContext::new().with_plan_observer(Arc::new(plan_observer)); + ctx.sql("create table t (k int, v int)") + .await? + .collect() + .await?; + ctx.sql("insert into t values (1, 10), (2, 20), (3, 30)") + .await? + .collect() + .await?; + + // without enabling the auto explain mode, the plan is not exported + ctx.sql("select * from t where k = 1 or k = 2 order by v desc limit 5") + .await? + .collect() + .await?; + assert!(!output_path.exists()); + + // enabling the auto explain mode + ctx.sql("set datafusion.explain.auto_explain = true") + .await? + .collect() + .await?; + let result = ctx + .sql("select * from t where k = 1 or k = 2 order by v desc limit 5") + .await? + .collect() + .await?; + // query output is not affected + assert_snapshot!( + batches_to_string(&result), + @r" + +---+----+ + | k | v | + +---+----+ + | 2 | 20 | + | 1 | 10 | + +---+----+ + " + ); + // the first output comes from the "set" query + assert_snapshot!(fs::read_to_string(&output_path)?, + @r" + QUERY: SELECT * + EXPLAIN: + EmptyExec + + QUERY: SELECT t.k, t.v FROM t WHERE ((t.k = 1) OR (t.k = 2)) ORDER BY t.v DESC NULLS FIRST LIMIT 5 + EXPLAIN: + SortExec + FilterExec + DataSourceExec + "); + + // also works with the dataframe API + ctx.table("t") + .await? + .filter(col("v").lt(lit(11)))? + .select(vec![col("k")])? + .collect() + .await?; + assert_snapshot!(fs::read_to_string(&output_path)?, + @r" + QUERY: SELECT * + EXPLAIN: + EmptyExec + + QUERY: SELECT t.k, t.v FROM t WHERE ((t.k = 1) OR (t.k = 2)) ORDER BY t.v DESC NULLS FIRST LIMIT 5 + EXPLAIN: + SortExec + FilterExec + DataSourceExec + + QUERY: SELECT t.k FROM t WHERE (t.v < 11) + EXPLAIN: + FilterExec + DataSourceExec + "); + + // when disabling the auto explain mode the plan is again not exported + ctx.sql("set datafusion.explain.auto_explain = false") + .await? + .collect() + .await?; + ctx.sql("select * from t where k = 1 or k = 2 order by v desc limit 5") + .await? + .collect() + .await?; + assert_snapshot!(fs::read_to_string(&output_path)?, + @r" + QUERY: SELECT * + EXPLAIN: + EmptyExec + + QUERY: SELECT t.k, t.v FROM t WHERE ((t.k = 1) OR (t.k = 2)) ORDER BY t.v DESC NULLS FIRST LIMIT 5 + EXPLAIN: + SortExec + FilterExec + DataSourceExec + + QUERY: SELECT t.k FROM t WHERE (t.v < 11) + EXPLAIN: + FilterExec + DataSourceExec + "); + + Ok(()) + } } diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index 9cee04ad3b2ca..89270cc486de3 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -29,6 +29,7 @@ use crate::datasource::file_format::FileFormatFactory; use crate::datasource::provider_as_source; use crate::execution::SessionStateDefaults; use crate::execution::context::{EmptySerializerRegistry, FunctionFactory, QueryPlanner}; +use crate::execution::plan_observer::{DefaultPlanObserver, PlanObserver}; use crate::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner}; use arrow_schema::{DataType, FieldRef}; use datafusion_catalog::MemoryCatalogProviderList; @@ -189,6 +190,8 @@ pub struct SessionState { /// Cache logical plans of prepared statements for later execution. /// Key is the prepared statement name. prepared_plans: HashMap>, + /// Plan observer used for the auto explain mode (if enabled). + plan_observer: Option>, } impl Debug for SessionState { @@ -963,6 +966,16 @@ impl SessionState { None => exec_err!("Prepared statement '{}' does not exist", name), } } + + /// Set a new plan observer. + pub(crate) fn set_plan_observer(&mut self, plan_observer: Arc) { + self.plan_observer = Some(plan_observer); + } + + /// Returns a ref to the current plan observer. + pub(crate) fn plan_observer(&self) -> &Option> { + &self.plan_observer + } } /// A builder to be used for building [`SessionState`]'s. Defaults will @@ -1502,6 +1515,7 @@ impl SessionStateBuilder { function_factory, cache_factory, prepared_plans: HashMap::new(), + plan_observer: Some(Arc::new(DefaultPlanObserver::default())), }; if let Some(file_formats) = file_formats { diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 652793cd48f9d..921af89a23c6f 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -27,6 +27,7 @@ use crate::datasource::physical_plan::{FileOutputMode, FileSinkConfig}; use crate::datasource::{DefaultTableSource, source_as_provider}; use crate::error::{DataFusionError, Result}; use crate::execution::context::{ExecutionProps, SessionState}; +use crate::execution::plan_observer::PlanObserver; use crate::logical_expr::utils::generate_sort_key; use crate::logical_expr::{ Aggregate, EmptyRelation, Join, Projection, Sort, TableScan, Unnest, Values, Window, @@ -106,6 +107,7 @@ use datafusion_physical_plan::unnest::ListUnnest; use async_trait::async_trait; use datafusion_physical_plan::async_func::{AsyncFuncExec, AsyncMapper}; +use datafusion_sql::unparser::Unparser; use futures::{StreamExt, TryStreamExt}; use itertools::{Itertools, multiunzip}; use log::debug; @@ -202,8 +204,19 @@ impl PhysicalPlanner for DefaultPhysicalPlanner { let mut plan = self.optimize_physical_plan(plan, session_state, |_, _| {})?; - if session_state.config().options().explain.auto_explain { - plan = self.add_auto_explain(plan, session_state); + // setup the auto explain mode if necessary + if session_state.config().options().explain.auto_explain + && let Some(plan_observer) = &session_state.plan_observer() + { + let id = uuid::Uuid::new_v4().to_string(); + let sql = if let Ok(stmt) = Unparser::default().plan_to_sql(logical_plan) { + Some(stmt.to_string()) + } else { + None + }; + plan_observer.plan_created(id.clone(), sql)?; + plan = + self.add_auto_explain(plan, session_state, Arc::clone(plan_observer), id); } Ok(plan) @@ -1737,6 +1750,8 @@ impl DefaultPhysicalPlanner { &self, plan: Arc, session_state: &SessionState, + plan_observer: Arc, + id: String, ) -> Arc { let options = session_state.config().options(); let show_statistics = options.explain.show_statistics; @@ -1753,11 +1768,7 @@ impl DefaultPhysicalPlanner { plan, LogicalPlan::explain_schema(), ); - plan.enable_auto_explain(); - plan.set_auto_explain_output(options.explain.auto_explain_output.clone()); - plan.set_auto_explain_min_duration_ms( - options.explain.auto_explain_min_duration_ms, - ); + plan.enable_auto_explain(plan_observer, id); Arc::new(plan) } } diff --git a/datafusion/execution/src/lib.rs b/datafusion/execution/src/lib.rs index 1a8da9459ae10..2ed90221770f7 100644 --- a/datafusion/execution/src/lib.rs +++ b/datafusion/execution/src/lib.rs @@ -34,6 +34,7 @@ pub mod memory_pool; pub mod object_store; #[cfg(feature = "parquet_encryption")] pub mod parquet_encryption; +pub mod plan_observer; pub mod runtime_env; mod stream; mod task; diff --git a/datafusion/execution/src/plan_observer.rs b/datafusion/execution/src/plan_observer.rs new file mode 100644 index 0000000000000..15a19af707618 --- /dev/null +++ b/datafusion/execution/src/plan_observer.rs @@ -0,0 +1,110 @@ +use std::io::Write; +use std::{fmt::Debug, fs}; + +use arrow::{array::RecordBatch, util::pretty::pretty_format_batches}; +use dashmap::DashMap; +use datafusion_common::error::Result; + +/// Used to implement the auto_explain feature. +pub trait PlanObserver: Send + Sync + 'static + Debug { + /// Called after the physical plan has been created but before it has been executed. + /// Receives an identifier and a SQL representation of the query. + /// The unparsing of some logical operators might not be implemented yet, hence why `sql` is an + /// `Option`. + fn plan_created(&self, id: String, sql: Option) -> Result<()>; + + /// Called after the physical plan has been executed. + /// Receives the identifier, the EXPLAIN ANALYZE output, and the duration. + fn plan_executed( + &self, + id: String, + explain_result: RecordBatch, + duration_nanos: u128, + ) -> Result<()>; +} + +#[derive(Debug)] +pub struct DefaultPlanObserver { + output: String, + min_duration_ms: usize, + queries: DashMap, +} + +impl DefaultPlanObserver { + /// Creates a new `DefaultPlanObserver`. + /// * `output`: where to store the output of `auto_explain`, if enabled. + /// Possible values: + /// - `log::error` + /// - `log::warn` + /// - `log::info` + /// - `log::debug` + /// - `log::trace` + /// - a file path: creates the file if it does not exist, or appends to it if it does. + /// * `min_duration_ms`: only outputs the result if the execution duration is greater than or + /// equal to this value. + pub fn new(output: String, min_duration_ms: usize) -> Self { + Self { + output, + min_duration_ms, + queries: DashMap::new(), + } + } +} + +impl Default for DefaultPlanObserver { + fn default() -> Self { + Self { + output: "log::info".to_owned(), + min_duration_ms: 0, + queries: DashMap::new(), + } + } +} + +impl PlanObserver for DefaultPlanObserver { + fn plan_created(&self, id: String, sql: Option) -> Result<()> { + if let Some(sql) = sql { + self.queries.insert(id, sql); + } + Ok(()) + } + + fn plan_executed( + &self, + id: String, + explain_result: RecordBatch, + duration_nanos: u128, + ) -> Result<()> { + let sql = if let Some((_, sql)) = self.queries.remove(&id) { + sql + } else { + "-".to_string() + }; + + let duration_ms = (duration_nanos as f64) / 1e6; + if duration_ms < self.min_duration_ms as f64 { + return Ok(()); + } + + let analyze = pretty_format_batches(&[explain_result])?; + let message = + format!("QUERY: {sql}\nDURATION: {duration_ms:.3}ms\nEXPLAIN:\n{analyze}"); + + match self.output.as_str() { + "log::error" => log::error!("{message}"), + "log::warn" => log::warn!("{message}"), + "log::info" => log::info!("{message}"), + "log::debug" => log::debug!("{message}"), + "log::trace" => log::trace!("{message}"), + _ => { + let fd = &mut fs::OpenOptions::new() + .create(true) + .append(true) + .open(&self.output)?; + writeln!(fd, "{message}")?; + } + } + + Ok(()) + } +} diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index b04f96c2ae15c..13f91fd7d4ea2 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -80,7 +80,6 @@ insta = { workspace = true } rand = { workspace = true } rstest = { workspace = true } rstest_reuse = "0.7.0" -tempfile = { workspace = true } tokio = { workspace = true, features = [ "rt-multi-thread", "fs", diff --git a/datafusion/physical-plan/src/analyze.rs b/datafusion/physical-plan/src/analyze.rs index 22ec2dc36b964..340f2d59655e5 100644 --- a/datafusion/physical-plan/src/analyze.rs +++ b/datafusion/physical-plan/src/analyze.rs @@ -18,8 +18,6 @@ //! Defines the ANALYZE operator use std::any::Any; -use std::fs::OpenOptions; -use std::io::Write; use std::sync::Arc; use super::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter}; @@ -32,11 +30,11 @@ use crate::metrics::MetricType; use crate::{DisplayFormatType, ExecutionPlan, Partitioning}; use arrow::compute::concat_batches; -use arrow::util::pretty::pretty_format_batches; use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch}; use datafusion_common::instant::Instant; use datafusion_common::{DataFusionError, Result, assert_eq_or_internal_err}; use datafusion_execution::TaskContext; +use datafusion_execution::plan_observer::PlanObserver; use datafusion_physical_expr::EquivalenceProperties; use futures::StreamExt; @@ -56,22 +54,10 @@ pub struct AnalyzeExec { /// The output schema for RecordBatches of this exec node schema: SchemaRef, cache: PlanProperties, - /// Whether the operator is executing in the `auto_explain` mode. - /// In this mode, the underlying records are returned as they would without the `AnalyzeExec`, - /// while the "explain analyze" output is sent to `auto_explain_output`. (default=false) - auto_explain: bool, - /// Where to store the output of `auto_explain`, if enabled. - /// Possible values: - /// - log::error - /// - log::warn - /// - log::info (default) - /// - log::debug - /// - log::trace - /// - *path to file* (creates if it does not exist; or appends to it if it does) - auto_explain_output: String, - /// In the `auto_explain` mode, only output if the execution is greater or equal to this value, - /// in milliseconds. (default=0) - auto_explain_min_duration_ms: usize, + /// Observer to call when the plan is executed (only set for the auto explain mode). + plan_observer: Option>, + /// Identifier to pass when calling the plan observer. + plan_id: Option, } impl AnalyzeExec { @@ -91,9 +77,8 @@ impl AnalyzeExec { input, schema, cache, - auto_explain: false, - auto_explain_output: "log::info".to_owned(), - auto_explain_min_duration_ms: 0, + plan_observer: None, + plan_id: None, } } @@ -125,18 +110,15 @@ impl AnalyzeExec { ) } - pub fn enable_auto_explain(&mut self) { - self.auto_explain = true; - self.cache = - Self::compute_properties(&self.input, Arc::clone(&self.input.schema())); - } - - pub fn set_auto_explain_output(&mut self, value: String) { - self.auto_explain_output = value - } - - pub fn set_auto_explain_min_duration_ms(&mut self, value: usize) { - self.auto_explain_min_duration_ms = value + /// Enables the auto explain mode. In this mode, the result of this Analyze operator is passed + /// to the [`PlanObserver`] received, while the child's output batches are returned instead. + pub fn enable_auto_explain( + &mut self, + plan_observer: Arc, + plan_id: String, + ) { + self.plan_observer = Some(plan_observer); + self.plan_id = Some(plan_id); } } @@ -192,10 +174,9 @@ impl ExecutionPlan for AnalyzeExec { Arc::clone(&self.schema), ); - if self.auto_explain { - plan.enable_auto_explain(); - plan.set_auto_explain_output(self.auto_explain_output.clone()); - plan.set_auto_explain_min_duration_ms(self.auto_explain_min_duration_ms); + if self.plan_observer.is_some() { + plan.plan_observer = self.plan_observer.clone(); + plan.plan_id = self.plan_id.clone(); } Ok(Arc::new(plan)) @@ -241,9 +222,8 @@ impl ExecutionPlan for AnalyzeExec { let mut input_stream = builder.build(); let inner_schema = Arc::clone(&self.input.schema()); - let auto_explain = self.auto_explain; - let auto_explain_output = self.auto_explain_output.clone(); - let auto_explain_min_duration_ms = self.auto_explain_min_duration_ms; + let plan_observer = self.plan_observer.clone(); + let plan_id = self.plan_id.clone(); let output = async move { let mut batches = vec![]; @@ -251,7 +231,7 @@ impl ExecutionPlan for AnalyzeExec { while let Some(batch) = input_stream.next().await.transpose()? { total_rows += batch.num_rows(); // in the auto_explain mode, store the input batches to later return them - if auto_explain { + if plan_observer.is_some() { batches.push(batch); } } @@ -267,17 +247,19 @@ impl ExecutionPlan for AnalyzeExec { &metric_types, )?; - if auto_explain { - if duration.as_millis() >= auto_explain_min_duration_ms as u128 { - export_auto_explain(out, &auto_explain_output)?; - } + if let Some(plan_observer) = plan_observer { + plan_observer.plan_executed( + plan_id.unwrap_or_else(|| "".to_owned()), + out, + duration.as_nanos(), + )?; concat_batches(&inner_schema, &batches).map_err(DataFusionError::from) } else { Ok(out) } }; - let output_schema = if self.auto_explain { + let output_schema = if self.plan_observer.is_some() { &self.input.schema() } else { &self.schema @@ -342,45 +324,19 @@ fn create_output_batch( .map_err(DataFusionError::from) } -fn export_auto_explain(batch: RecordBatch, output: &str) -> Result<()> { - let formatted = pretty_format_batches(&[batch])?; - match output { - "log::error" => log::error!("{formatted}"), - "log::warn" => log::warn!("{formatted}"), - "log::info" => log::info!("{formatted}"), - "log::debug" => log::debug!("{formatted}"), - "log::trace" => log::trace!("{formatted}"), - _ => { - let fd = &mut OpenOptions::new().create(true).append(true).open(output)?; - writeln!(fd, "{formatted}")?; - } - } - - Ok(()) -} - #[cfg(test)] mod tests { - use std::fs; - use super::*; use crate::{ collect, test::{ - TestMemoryExec, assert_is_pending, + assert_is_pending, exec::{BlockingExec, assert_strong_count_converges_to_zero}, }, }; - use arrow::{ - array::StringArray, - datatypes::{DataType, Field, Schema}, - }; - use datafusion_common::test_util::batches_to_string; - use datafusion_expr::LogicalPlan; + use arrow::datatypes::{DataType, Field, Schema}; use futures::FutureExt; - use insta::assert_snapshot; - use tempfile::TempDir; #[tokio::test] async fn test_drop_cancel() -> Result<()> { @@ -407,135 +363,4 @@ mod tests { Ok(()) } - - #[tokio::test] - async fn test_auto_explain() -> Result<()> { - let task_ctx = Arc::new(TaskContext::default()); - let schema = Arc::new(Schema::new(vec![Field::new("k", DataType::Utf8, false)])); - let batch = RecordBatch::try_new( - Arc::clone(&schema), - vec![Arc::new(StringArray::from(vec![ - Some("k1"), - Some("k2"), - Some("k3"), - ]))], - )?; - let exec_plan: Arc = Arc::new(TestMemoryExec::try_new( - &[vec![batch]], - Arc::clone(&schema), - None, - )?); - - let mut analyze_exec = AnalyzeExec::new( - false, - true, - vec![MetricType::SUMMARY, MetricType::DEV], - exec_plan, - LogicalPlan::explain_schema(), - ); - let tmp_dir = TempDir::new()?; - let output_path = tmp_dir.path().join("auto_explain_output.txt"); - analyze_exec.enable_auto_explain(); - analyze_exec.set_auto_explain_output( - output_path.as_os_str().to_str().unwrap().to_owned(), - ); - - // check that the original output remains the same - let result = - collect(Arc::new(analyze_exec.clone()), Arc::clone(&task_ctx)).await?; - assert_snapshot!( - batches_to_string(&result), - @r" - +----+ - | k | - +----+ - | k1 | - | k2 | - | k3 | - +----+ - " - ); - assert_snapshot!(fs::read_to_string(&output_path)?, - @r" - +-------------------+-----------------------------------------------------------------------------------------------------------------------------------------+ - | plan_type | plan | - +-------------------+-----------------------------------------------------------------------------------------------------------------------------------------+ - | Plan with Metrics | DataSourceExec: partitions=1, partition_sizes=[1], metrics=[], statistics=[Rows=Exact(3), Bytes=Exact(1160), [(Col[0]: Null=Exact(0))]] | - | | | - +-------------------+-----------------------------------------------------------------------------------------------------------------------------------------+ - "); - - // check that with a longer runtime a new plan is not printed - analyze_exec.set_auto_explain_min_duration_ms(1000000); - collect(Arc::new(analyze_exec.clone()), Arc::clone(&task_ctx)).await?; - assert_snapshot!(fs::read_to_string(&output_path)?, - @r" - +-------------------+-----------------------------------------------------------------------------------------------------------------------------------------+ - | plan_type | plan | - +-------------------+-----------------------------------------------------------------------------------------------------------------------------------------+ - | Plan with Metrics | DataSourceExec: partitions=1, partition_sizes=[1], metrics=[], statistics=[Rows=Exact(3), Bytes=Exact(1160), [(Col[0]: Null=Exact(0))]] | - | | | - +-------------------+-----------------------------------------------------------------------------------------------------------------------------------------+ - "); - - // test again with the default min duration - analyze_exec.set_auto_explain_min_duration_ms(0); - collect(Arc::new(analyze_exec.clone()), Arc::clone(&task_ctx)).await?; - assert_snapshot!(fs::read_to_string(&output_path)?, - @r" - +-------------------+-----------------------------------------------------------------------------------------------------------------------------------------+ - | plan_type | plan | - +-------------------+-----------------------------------------------------------------------------------------------------------------------------------------+ - | Plan with Metrics | DataSourceExec: partitions=1, partition_sizes=[1], metrics=[], statistics=[Rows=Exact(3), Bytes=Exact(1160), [(Col[0]: Null=Exact(0))]] | - | | | - +-------------------+-----------------------------------------------------------------------------------------------------------------------------------------+ - +-------------------+-----------------------------------------------------------------------------------------------------------------------------------------+ - | plan_type | plan | - +-------------------+-----------------------------------------------------------------------------------------------------------------------------------------+ - | Plan with Metrics | DataSourceExec: partitions=1, partition_sizes=[1], metrics=[], statistics=[Rows=Exact(3), Bytes=Exact(1160), [(Col[0]: Null=Exact(0))]] | - | | | - +-------------------+-----------------------------------------------------------------------------------------------------------------------------------------+ - "); - - // check that auto_explain uses the pre-defined analyze parameters - analyze_exec.show_statistics = false; - collect(Arc::new(analyze_exec.clone()), Arc::clone(&task_ctx)).await?; - analyze_exec.verbose = true; - collect(Arc::new(analyze_exec.clone()), Arc::clone(&task_ctx)).await?; - let complete_output = fs::read_to_string(&output_path)?; - // remove the duration since it's not deterministic - let output_without_duration = - complete_output.rsplitn(4, '\n').nth(3).unwrap().to_string(); - assert_snapshot!(output_without_duration, - @r" - +-------------------+-----------------------------------------------------------------------------------------------------------------------------------------+ - | plan_type | plan | - +-------------------+-----------------------------------------------------------------------------------------------------------------------------------------+ - | Plan with Metrics | DataSourceExec: partitions=1, partition_sizes=[1], metrics=[], statistics=[Rows=Exact(3), Bytes=Exact(1160), [(Col[0]: Null=Exact(0))]] | - | | | - +-------------------+-----------------------------------------------------------------------------------------------------------------------------------------+ - +-------------------+-----------------------------------------------------------------------------------------------------------------------------------------+ - | plan_type | plan | - +-------------------+-----------------------------------------------------------------------------------------------------------------------------------------+ - | Plan with Metrics | DataSourceExec: partitions=1, partition_sizes=[1], metrics=[], statistics=[Rows=Exact(3), Bytes=Exact(1160), [(Col[0]: Null=Exact(0))]] | - | | | - +-------------------+-----------------------------------------------------------------------------------------------------------------------------------------+ - +-------------------+---------------------------------------------------------------+ - | plan_type | plan | - +-------------------+---------------------------------------------------------------+ - | Plan with Metrics | DataSourceExec: partitions=1, partition_sizes=[1], metrics=[] | - | | | - +-------------------+---------------------------------------------------------------+ - +------------------------+---------------------------------------------------------------+ - | plan_type | plan | - +------------------------+---------------------------------------------------------------+ - | Plan with Metrics | DataSourceExec: partitions=1, partition_sizes=[1], metrics=[] | - | | | - | Plan with Full Metrics | DataSourceExec: partitions=1, partition_sizes=[1], metrics=[] | - | | | - | Output Rows | 3 | - "); - - Ok(()) - } } diff --git a/datafusion/sqllogictest/test_files/explain_analyze.slt b/datafusion/sqllogictest/test_files/explain_analyze.slt index 2117e07402cb0..e109b32a95ed1 100644 --- a/datafusion/sqllogictest/test_files/explain_analyze.slt +++ b/datafusion/sqllogictest/test_files/explain_analyze.slt @@ -66,23 +66,3 @@ Plan with Metrics statement ok reset datafusion.explain.analyze_level; - - -# test auto_explain - -statement ok -set datafusion.explain.auto_explain_output = 'test_files/scratch/auto_explain.txt'; - -statement ok -set datafusion.explain.auto_explain = true; - -query TI -SELECT 'k' k, 1 v; ----- -k 1 - -statement ok -reset datafusion.explain.auto_explain; - -statement ok -reset datafusion.explain.auto_explain_output; diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 502caa9d36d0f..ca7a65607b12e 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -276,8 +276,6 @@ datafusion.execution.time_zone NULL datafusion.execution.use_row_number_estimates_to_optimize_partitioning false datafusion.explain.analyze_level dev datafusion.explain.auto_explain false -datafusion.explain.auto_explain_min_duration_ms 0 -datafusion.explain.auto_explain_output log::info datafusion.explain.format indent datafusion.explain.logical_plan_only false datafusion.explain.physical_plan_only false @@ -415,9 +413,7 @@ datafusion.execution.target_partitions 7 Number of partitions for query executio datafusion.execution.time_zone NULL The default time zone Some functions, e.g. `now` return timestamps in this time zone datafusion.execution.use_row_number_estimates_to_optimize_partitioning false Should DataFusion use row number estimates at the input to decide whether increasing parallelism is beneficial or not. By default, only exact row numbers (not estimates) are used for this decision. Setting this flag to `true` will likely produce better plans. if the source of statistics is accurate. We plan to make this the default in the future. datafusion.explain.analyze_level dev Verbosity level for "EXPLAIN ANALYZE". Default is "dev" "summary" shows common metrics for high-level insights. "dev" provides deep operator-level introspection for developers. -datafusion.explain.auto_explain false Whether to enable the `auto_explain` mode. In this mode, the execution plan is automatically written to the location set by `auto_explain_output`, if the plan's total duration is greater or equal to `auto_explain_min_duration_ms`. -datafusion.explain.auto_explain_min_duration_ms 0 In the `auto_explain` mode, only output plans if their duration is bigger than or equal to this value (milliseconds). -datafusion.explain.auto_explain_output log::info Output location used in the `auto_explain` mode. Supports `log::error`, `log::warn`, `log::info` (default), `log::debug`, `log::trace`, or a file path (file is created if it does not exist; plans are appended to the file). +datafusion.explain.auto_explain false Whether to enable the `auto_explain` mode. In this mode, the callbacks defined by the [`PlanObserver`] are called. By default, the [`SessionContext`] uses the [`DefaultPlanObserver`] implementation. datafusion.explain.format indent Display format of explain. Default is "indent". When set to "tree", it will print the plan in a tree-rendered format. datafusion.explain.logical_plan_only false When set to true, the explain statement will only print logical plans datafusion.explain.physical_plan_only false When set to true, the explain statement will only print physical plans diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index c34bc0a34d93c..d236d864258de 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -173,9 +173,7 @@ The following configuration settings are available: | datafusion.explain.format | indent | Display format of explain. Default is "indent". When set to "tree", it will print the plan in a tree-rendered format. | | datafusion.explain.tree_maximum_render_width | 240 | (format=tree only) Maximum total width of the rendered tree. When set to 0, the tree will have no width limit. | | datafusion.explain.analyze_level | dev | Verbosity level for "EXPLAIN ANALYZE". Default is "dev" "summary" shows common metrics for high-level insights. "dev" provides deep operator-level introspection for developers. | -| datafusion.explain.auto_explain | false | Whether to enable the `auto_explain` mode. In this mode, the execution plan is automatically written to the location set by `auto_explain_output`, if the plan's total duration is greater or equal to `auto_explain_min_duration_ms`. | -| datafusion.explain.auto_explain_output | log::info | Output location used in the `auto_explain` mode. Supports `log::error`, `log::warn`, `log::info` (default), `log::debug`, `log::trace`, or a file path (file is created if it does not exist; plans are appended to the file). | -| datafusion.explain.auto_explain_min_duration_ms | 0 | In the `auto_explain` mode, only output plans if their duration is bigger than or equal to this value (milliseconds). | +| datafusion.explain.auto_explain | false | Whether to enable the `auto_explain` mode. In this mode, the callbacks defined by the [`PlanObserver`] are called. By default, the [`SessionContext`] uses the [`DefaultPlanObserver`] implementation. | | datafusion.sql_parser.parse_float_as_decimal | false | When set to true, SQL parser will parse float as decimal type | | datafusion.sql_parser.enable_ident_normalization | true | When set to true, SQL parser will normalize ident (convert ident to lowercase when not quoted) | | datafusion.sql_parser.enable_options_value_normalization | false | When set to true, SQL parser will normalize options value (convert value to lowercase). Note that this option is ignored and will be removed in the future. All case-insensitive values are normalized automatically. | From 6ddec42e9ec00f5dc1ac1a4b19978aea714380ac Mon Sep 17 00:00:00 2001 From: nuno-faria Date: Sun, 15 Mar 2026 12:39:08 +0000 Subject: [PATCH 12/13] Fix merge --- datafusion/core/src/execution/context/mod.rs | 2 +- datafusion/physical-plan/src/analyze.rs | 6 ++---- datafusion/physical-plan/src/test.rs | 6 +----- 3 files changed, 4 insertions(+), 10 deletions(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index b5cdca32693bf..a5e6286774e2f 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -2259,9 +2259,9 @@ mod tests { use arrow::datatypes::{DataType, TimeUnit}; use arrow_schema::FieldRef; use datafusion_common::DataFusionError; + use datafusion_common::datatype::DataTypeExt; use datafusion_expr::{col, lit}; use regex::Regex; - use datafusion_common::datatype::DataTypeExt; use std::error::Error; use std::fs; use std::io::Write; diff --git a/datafusion/physical-plan/src/analyze.rs b/datafusion/physical-plan/src/analyze.rs index a355347033d5d..56e78e63d238f 100644 --- a/datafusion/physical-plan/src/analyze.rs +++ b/datafusion/physical-plan/src/analyze.rs @@ -55,12 +55,11 @@ pub struct AnalyzeExec { pub(crate) input: Arc, /// The output schema for RecordBatches of this exec node schema: SchemaRef, - cache: PlanProperties, + cache: Arc, /// Observer to call when the plan is executed (only set for the auto explain mode). plan_observer: Option>, /// Identifier to pass when calling the plan observer. plan_id: Option, - cache: Arc, } impl AnalyzeExec { @@ -79,10 +78,9 @@ impl AnalyzeExec { metric_types, input, schema, - cache, + cache: Arc::new(cache), plan_observer: None, plan_id: None, - cache: Arc::new(cache), } } diff --git a/datafusion/physical-plan/src/test.rs b/datafusion/physical-plan/src/test.rs index 3c24b42db8557..5085674cd4fd6 100644 --- a/datafusion/physical-plan/src/test.rs +++ b/datafusion/physical-plan/src/test.rs @@ -49,6 +49,7 @@ use datafusion_physical_expr::{ EquivalenceProperties, LexOrdering, Partitioning, PhysicalExpr, }; +use datafusion_physical_expr_common::metrics::MetricsSet; use futures::{Future, FutureExt}; pub mod exec; @@ -181,11 +182,6 @@ impl ExecutionPlan for TestMemoryExec { self.open(partition, context) } - fn statistics(&self) -> Result { - self.statistics_inner() - } - - fn partition_statistics(&self, partition: Option) -> Result { fn metrics(&self) -> Option { unimplemented!() } From 6eace4cc34ac9a57f4d4fbc443ccde2a7f760bd1 Mon Sep 17 00:00:00 2001 From: nuno-faria Date: Sun, 15 Mar 2026 12:41:50 +0000 Subject: [PATCH 13/13] Add missing license --- datafusion/execution/src/plan_observer.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/datafusion/execution/src/plan_observer.rs b/datafusion/execution/src/plan_observer.rs index 15a19af707618..a7703b38220fb 100644 --- a/datafusion/execution/src/plan_observer.rs +++ b/datafusion/execution/src/plan_observer.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use std::io::Write; use std::{fmt::Debug, fs};