Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1206,6 +1206,11 @@ config_namespace! {
/// "summary" shows common metrics for high-level insights.
/// "dev" provides deep operator-level introspection for developers.
pub analyze_level: ExplainAnalyzeLevel, default = ExplainAnalyzeLevel::Dev

/// Whether to enable the `auto_explain` mode. In this mode, the callbacks defined by the
/// [`PlanObserver`] are called. By default, the [`SessionContext`] uses the
/// [`DefaultPlanObserver`] implementation.
pub auto_explain: bool, default = false
}
}

Expand Down
178 changes: 178 additions & 0 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::time::Duration;

use super::options::ReadOptions;
use crate::datasource::dynamic_file::DynamicListTableFactory;
use crate::execution::plan_observer::PlanObserver;
use crate::execution::session_state::SessionStateBuilder;
use crate::{
catalog::listing_schema::ListingSchemaProvider,
Expand Down Expand Up @@ -1971,6 +1972,14 @@ impl SessionContext {
.write()
.register_table_options_extension(extension)
}

/// Sets a new plan observer, used for the auto_explain feature. Note that the auto explain mode
/// needs to be enabled for the methods in [`PlanObserver`] to be called, through the
/// `datafusion.explain.auto_explain` config.
pub fn with_plan_observer(self, plan_observer: Arc<dyn PlanObserver>) -> Self {
self.state.write().set_plan_observer(plan_observer);
self
}
}

impl FunctionRegistry for SessionContext {
Expand Down Expand Up @@ -2251,7 +2260,11 @@ mod tests {
use arrow_schema::FieldRef;
use datafusion_common::DataFusionError;
use datafusion_common::datatype::DataTypeExt;
use datafusion_expr::{col, lit};
use regex::Regex;
use std::error::Error;
use std::fs;
use std::io::Write;
use std::path::PathBuf;

use datafusion_common::test_util::batches_to_string;
Expand Down Expand Up @@ -2986,4 +2999,169 @@ mod tests {
assert!(have.unwrap_err().to_string().contains(MEMORY_LIMIT));
}
}

#[derive(Debug)]
struct TestPlanObserver {
output: String,
}

impl TestPlanObserver {
pub fn new(output: String) -> Self {
Self { output }
}
}

impl PlanObserver for TestPlanObserver {
fn plan_created(&self, _id: String, sql: Option<String>) -> Result<()> {
let Some(sql) = sql else {
return Ok(());
};
let fd = &mut fs::OpenOptions::new()
.create(true)
.append(true)
.open(&self.output)?;
writeln!(fd, "QUERY: {sql}")?;
Ok(())
}

fn plan_executed(
&self,
_id: String,
explain_result: RecordBatch,
_duration_nanos: u128,
) -> Result<()> {
let analyze = arrow::util::pretty::pretty_format_batches(&[explain_result])?
.to_string();
// the plan is simplified to become deterministic
let re = Regex::new(r"\s+\w+?Exec").unwrap();
let plan = re
.find_iter(&analyze)
.map(|m| m.as_str())
.collect::<Vec<_>>()
.join("\n");

let fd = &mut fs::OpenOptions::new()
.create(true)
.append(true)
.open(&self.output)?;
writeln!(fd, "EXPLAIN:\n{plan}\n")?;

Ok(())
}
}

#[tokio::test]
async fn test_auto_explain() -> Result<()> {
let tmp_dir = TempDir::new()?;
let output_path = tmp_dir.path().join("auto_explain_output.txt");
let plan_observer =
TestPlanObserver::new(output_path.as_os_str().to_str().unwrap().to_owned());

let ctx = SessionContext::new().with_plan_observer(Arc::new(plan_observer));
ctx.sql("create table t (k int, v int)")
.await?
.collect()
.await?;
ctx.sql("insert into t values (1, 10), (2, 20), (3, 30)")
.await?
.collect()
.await?;

// without enabling the auto explain mode, the plan is not exported
ctx.sql("select * from t where k = 1 or k = 2 order by v desc limit 5")
.await?
.collect()
.await?;
assert!(!output_path.exists());

// enabling the auto explain mode
ctx.sql("set datafusion.explain.auto_explain = true")
.await?
.collect()
.await?;
let result = ctx
.sql("select * from t where k = 1 or k = 2 order by v desc limit 5")
.await?
.collect()
.await?;
// query output is not affected
assert_snapshot!(
batches_to_string(&result),
@r"
+---+----+
| k | v |
+---+----+
| 2 | 20 |
| 1 | 10 |
+---+----+
"
);
// the first output comes from the "set" query
assert_snapshot!(fs::read_to_string(&output_path)?,
@r"
QUERY: SELECT *
EXPLAIN:
EmptyExec

QUERY: SELECT t.k, t.v FROM t WHERE ((t.k = 1) OR (t.k = 2)) ORDER BY t.v DESC NULLS FIRST LIMIT 5
EXPLAIN:
SortExec
FilterExec
DataSourceExec
");

// also works with the dataframe API
ctx.table("t")
.await?
.filter(col("v").lt(lit(11)))?
.select(vec![col("k")])?
.collect()
.await?;
assert_snapshot!(fs::read_to_string(&output_path)?,
@r"
QUERY: SELECT *
EXPLAIN:
EmptyExec

QUERY: SELECT t.k, t.v FROM t WHERE ((t.k = 1) OR (t.k = 2)) ORDER BY t.v DESC NULLS FIRST LIMIT 5
EXPLAIN:
SortExec
FilterExec
DataSourceExec

QUERY: SELECT t.k FROM t WHERE (t.v < 11)
EXPLAIN:
FilterExec
DataSourceExec
");

// when disabling the auto explain mode the plan is again not exported
ctx.sql("set datafusion.explain.auto_explain = false")
.await?
.collect()
.await?;
ctx.sql("select * from t where k = 1 or k = 2 order by v desc limit 5")
.await?
.collect()
.await?;
assert_snapshot!(fs::read_to_string(&output_path)?,
@r"
QUERY: SELECT *
EXPLAIN:
EmptyExec

QUERY: SELECT t.k, t.v FROM t WHERE ((t.k = 1) OR (t.k = 2)) ORDER BY t.v DESC NULLS FIRST LIMIT 5
EXPLAIN:
SortExec
FilterExec
DataSourceExec

QUERY: SELECT t.k FROM t WHERE (t.v < 11)
EXPLAIN:
FilterExec
DataSourceExec
");

Ok(())
}
}
14 changes: 14 additions & 0 deletions datafusion/core/src/execution/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::datasource::file_format::FileFormatFactory;
use crate::datasource::provider_as_source;
use crate::execution::SessionStateDefaults;
use crate::execution::context::{EmptySerializerRegistry, FunctionFactory, QueryPlanner};
use crate::execution::plan_observer::{DefaultPlanObserver, PlanObserver};
use crate::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
use arrow_schema::{DataType, FieldRef};
use datafusion_catalog::MemoryCatalogProviderList;
Expand Down Expand Up @@ -189,6 +190,8 @@ pub struct SessionState {
/// Cache logical plans of prepared statements for later execution.
/// Key is the prepared statement name.
prepared_plans: HashMap<String, Arc<PreparedPlan>>,
/// Plan observer used for the auto explain mode (if enabled).
plan_observer: Option<Arc<dyn PlanObserver>>,
}

impl Debug for SessionState {
Expand Down Expand Up @@ -963,6 +966,16 @@ impl SessionState {
None => exec_err!("Prepared statement '{}' does not exist", name),
}
}

/// Set a new plan observer.
pub(crate) fn set_plan_observer(&mut self, plan_observer: Arc<dyn PlanObserver>) {
self.plan_observer = Some(plan_observer);
}

/// Returns a ref to the current plan observer.
pub(crate) fn plan_observer(&self) -> &Option<Arc<dyn PlanObserver>> {
&self.plan_observer
}
}

/// A builder to be used for building [`SessionState`]'s. Defaults will
Expand Down Expand Up @@ -1503,6 +1516,7 @@ impl SessionStateBuilder {
function_factory,
cache_factory,
prepared_plans: HashMap::new(),
plan_observer: Some(Arc::new(DefaultPlanObserver::default())),
};

if let Some(file_formats) = file_formats {
Expand Down
48 changes: 47 additions & 1 deletion datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::datasource::physical_plan::{FileOutputMode, FileSinkConfig};
use crate::datasource::{DefaultTableSource, source_as_provider};
use crate::error::{DataFusionError, Result};
use crate::execution::context::{ExecutionProps, SessionState};
use crate::execution::plan_observer::PlanObserver;
use crate::logical_expr::utils::generate_sort_key;
use crate::logical_expr::{
Aggregate, EmptyRelation, Join, Projection, Sort, TableScan, Unnest, Values, Window,
Expand Down Expand Up @@ -106,6 +107,7 @@ use datafusion_physical_plan::unnest::ListUnnest;

use async_trait::async_trait;
use datafusion_physical_plan::async_func::{AsyncFuncExec, AsyncMapper};
use datafusion_sql::unparser::Unparser;
use futures::{StreamExt, TryStreamExt};
use itertools::{Itertools, multiunzip};
use log::debug;
Expand Down Expand Up @@ -274,7 +276,24 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
.create_initial_plan(logical_plan, session_state)
.await?;

self.optimize_physical_plan(plan, session_state, |_, _| {})
let mut plan = self.optimize_physical_plan(plan, session_state, |_, _| {})?;

// setup the auto explain mode if necessary
if session_state.config().options().explain.auto_explain
&& let Some(plan_observer) = &session_state.plan_observer()
{
let id = uuid::Uuid::new_v4().to_string();
let sql = if let Ok(stmt) = Unparser::default().plan_to_sql(logical_plan) {
Some(stmt.to_string())
} else {
None
};
plan_observer.plan_created(id.clone(), sql)?;
plan =
self.add_auto_explain(plan, session_state, Arc::clone(plan_observer), id);
}

Ok(plan)
}

/// Create a physical expression from a logical expression
Expand Down Expand Up @@ -1833,6 +1852,33 @@ impl DefaultPhysicalPlanner {
))
}
}

/// Returns a new plan wrapped in an `AnalyzeExec` auto_explain operator.
fn add_auto_explain(
&self,
plan: Arc<dyn ExecutionPlan>,
session_state: &SessionState,
plan_observer: Arc<dyn PlanObserver>,
id: String,
) -> Arc<dyn ExecutionPlan> {
let options = session_state.config().options();
let show_statistics = options.explain.show_statistics;
let analyze_level = options.explain.analyze_level;
let metric_types = match analyze_level {
ExplainAnalyzeLevel::Summary => vec![MetricType::SUMMARY],
ExplainAnalyzeLevel::Dev => vec![MetricType::SUMMARY, MetricType::DEV],
};

let mut plan = AnalyzeExec::new(
false,
show_statistics,
metric_types,
plan,
LogicalPlan::explain_schema(),
);
plan.enable_auto_explain(plan_observer, id);
Arc::new(plan)
}
}

/// Expand and align a GROUPING SET expression.
Expand Down
1 change: 1 addition & 0 deletions datafusion/execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub mod memory_pool;
pub mod object_store;
#[cfg(feature = "parquet_encryption")]
pub mod parquet_encryption;
pub mod plan_observer;
pub mod runtime_env;
mod stream;
mod task;
Expand Down
Loading
Loading