diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/.gitignore b/rust/cube/cubesqlplanner/cubesqlplanner/.gitignore index fb75eef2592d8..f27dc183b3266 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/.gitignore +++ b/rust/cube/cubesqlplanner/cubesqlplanner/.gitignore @@ -11,3 +11,7 @@ node_modules /cubesql/egraph-debug-intermediate egraph-debug /cubesql/debug-qtrace + +# insta snapshot review artefacts +*.snap.new +*.pending-snap diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/aggregate_multiplied_subquery.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/aggregate_multiplied_subquery.rs deleted file mode 100644 index 85ef84e7b2c05..0000000000000 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/aggregate_multiplied_subquery.rs +++ /dev/null @@ -1,156 +0,0 @@ -use super::pretty_print::*; -use super::*; -use cubenativeutils::CubeError; -use std::rc::Rc; - -logical_source_enum!(AggregateMultipliedSubquerySource, [Cube, MeasureSubquery]); - -/// Subquery that aggregates a multiplied measure: a `keys_subquery` -/// produces the unique key set, a `source` (cube or -/// `MeasureSubquery`) supplies the values, optional -/// `dimension_subqueries` materialise sub-query dimensions, and -/// `pre_aggregation_override` lets a matched pre-aggregation -/// short-circuit the whole CTE. -pub struct AggregateMultipliedSubquery { - pub schema: Rc, - pub keys_subquery: Rc, - pub source: AggregateMultipliedSubquerySource, - pub dimension_subqueries: Vec>, - // When Some, physical builder short-circuits to this query instead of - // rendering the native multiplied-subquery SELECT. Set by the pre-aggregation - // optimizer when a matching pre-aggregation replaces this CTE. - pub pre_aggregation_override: Option>, -} - -impl LogicalNode for AggregateMultipliedSubquery { - fn as_plan_node(self: &Rc) -> PlanNode { - PlanNode::AggregateMultipliedSubquery(self.clone()) - } - - fn inputs(&self) -> Vec { - AggregateMultipliedSubqueryInputPacker::pack(self) - } - - fn with_inputs(self: Rc, inputs: Vec) -> Result, CubeError> { - let AggregateMultipliedSubqueryInputUnPacker { - keys_subquery, - source, - dimension_subqueries, - pre_aggregation_override, - } = AggregateMultipliedSubqueryInputUnPacker::new(&self, &inputs)?; - - let result = Self { - schema: self.schema.clone(), - keys_subquery: keys_subquery.clone().into_logical_node()?, - source: self.source.with_plan_node(source.clone())?, - dimension_subqueries: dimension_subqueries - .iter() - .map(|itm| itm.clone().into_logical_node()) - .collect::, _>>()?, - pre_aggregation_override: match pre_aggregation_override { - Some(node) => Some(node.clone().into_logical_node()?), - None => None, - }, - }; - - Ok(Rc::new(result)) - } - - fn node_name(&self) -> &'static str { - "AggregateMultipliedSubquery" - } - fn try_from_plan_node(plan_node: PlanNode) -> Result, CubeError> { - if let PlanNode::AggregateMultipliedSubquery(item) = plan_node { - Ok(item) - } else { - Err(cast_error(&plan_node, "AggregateMultipliedSubquery")) - } - } -} - -pub struct AggregateMultipliedSubqueryInputPacker; - -impl AggregateMultipliedSubqueryInputPacker { - pub fn pack(aggregate: &AggregateMultipliedSubquery) -> Vec { - let mut result = vec![]; - result.push(aggregate.keys_subquery.as_plan_node()); - result.push(aggregate.source.as_plan_node()); - result.extend( - aggregate - .dimension_subqueries - .iter() - .map(|itm| itm.as_plan_node()), - ); - if let Some(override_query) = &aggregate.pre_aggregation_override { - result.push(override_query.as_plan_node()); - } - result - } -} - -pub struct AggregateMultipliedSubqueryInputUnPacker<'a> { - keys_subquery: &'a PlanNode, - source: &'a PlanNode, - dimension_subqueries: &'a [PlanNode], - pre_aggregation_override: Option<&'a PlanNode>, -} - -impl<'a> AggregateMultipliedSubqueryInputUnPacker<'a> { - pub fn new( - aggregate: &AggregateMultipliedSubquery, - inputs: &'a Vec, - ) -> Result { - check_inputs_len(&inputs, Self::inputs_len(aggregate), aggregate.node_name())?; - - let keys_subquery = &inputs[0]; - let source = &inputs[1]; - let dim_end = 2 + aggregate.dimension_subqueries.len(); - let dimension_subqueries = &inputs[2..dim_end]; - let pre_aggregation_override = if aggregate.pre_aggregation_override.is_some() { - Some(&inputs[dim_end]) - } else { - None - }; - - Ok(Self { - keys_subquery, - source, - dimension_subqueries, - pre_aggregation_override, - }) - } - - fn inputs_len(aggregate: &AggregateMultipliedSubquery) -> usize { - 2 + aggregate.dimension_subqueries.len() - + if aggregate.pre_aggregation_override.is_some() { - 1 - } else { - 0 - } - } -} - -impl PrettyPrint for AggregateMultipliedSubquery { - fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { - result.println("AggregateMultipliedSubquery: ", state); - let state = state.new_level(); - let details_state = state.new_level(); - result.println("schema:", &state); - self.schema.pretty_print(result, &details_state); - result.println("keys_subquery:", &state); - self.keys_subquery.pretty_print(result, &details_state); - result.println("source:", &state); - self.source.pretty_print(result, &details_state); - if !self.dimension_subqueries.is_empty() { - result.println("dimension_subqueries:", &state); - let details_state = state.new_level(); - for subquery in self.dimension_subqueries.iter() { - subquery.pretty_print(result, &details_state); - } - } - if let Some(override_query) = &self.pre_aggregation_override { - result.println("pre_aggregation_override:", &state); - override_query.pretty_print(result, &details_state); - } - } -} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/dimension_subquery.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/dimension_subquery.rs deleted file mode 100644 index aca02901ce637..0000000000000 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/dimension_subquery.rs +++ /dev/null @@ -1,79 +0,0 @@ -use super::pretty_print::*; -use super::*; -use crate::planner::MemberSymbol; -use cubenativeutils::CubeError; -use std::rc::Rc; - -/// Materialisation of a `sub_query: true` dimension: a subquery -/// that groups by the owning cube's primary keys and computes the -/// dimension's measure expression, then is joined back to the host -/// query on those keys. -pub struct DimensionSubQuery { - pub query: Rc, - pub primary_keys_dimensions: Vec>, - pub subquery_dimension: Rc, - pub measure_for_subquery_dimension: Rc, -} - -impl LogicalNode for DimensionSubQuery { - fn as_plan_node(self: &Rc) -> PlanNode { - PlanNode::DimensionSubQuery(self.clone()) - } - - fn inputs(&self) -> Vec { - vec![self.query.as_plan_node()] - } - - fn with_inputs(self: Rc, inputs: Vec) -> Result, CubeError> { - check_inputs_len(&inputs, 1, self.node_name())?; - let query = &inputs[0]; - Ok(Rc::new(Self { - query: query.clone().into_logical_node()?, - primary_keys_dimensions: self.primary_keys_dimensions.clone(), - subquery_dimension: self.subquery_dimension.clone(), - measure_for_subquery_dimension: self.measure_for_subquery_dimension.clone(), - })) - } - - fn node_name(&self) -> &'static str { - "DimensionSubQuery" - } - fn try_from_plan_node(plan_node: PlanNode) -> Result, CubeError> { - if let PlanNode::DimensionSubQuery(query) = plan_node { - Ok(query) - } else { - Err(cast_error(&plan_node, "DimensionSubQuery")) - } - } -} - -impl PrettyPrint for DimensionSubQuery { - fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { - result.println("DimensionSubQuery: ", state); - let state = state.new_level(); - let details_state = state.new_level(); - result.println(&format!("query: "), &state); - self.query.pretty_print(result, &details_state); - result.println( - &format!( - "-primary_keys_dimensions: {}", - print_symbols(&self.primary_keys_dimensions) - ), - &state, - ); - result.println( - &format!( - "-subquery_dimension: {}", - self.subquery_dimension.full_name() - ), - &state, - ); - result.println( - &format!( - "-measure_for_subquery_dimension: {}", - self.measure_for_subquery_dimension.full_name() - ), - &state, - ); - } -} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/full_key_aggregate.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/full_key_aggregate.rs index 6d6a8d1fbaf57..552acb77d87b0 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/full_key_aggregate.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/full_key_aggregate.rs @@ -6,12 +6,20 @@ use typed_builder::TypedBuilder; /// Reference to a multi-stage CTE consumed by `FullKeyAggregate`: /// the CTE's name plus the symbols it exposes. -#[derive(TypedBuilder)] +#[derive(Clone, TypedBuilder)] pub struct MultiStageSubqueryRef { name: String, #[builder(default)] symbols: Vec>, schema: Rc, + /// True when the CTE behind this ref projects measures as ungrouped raw + /// columns (no aggregate wrap yet) — the consumer of this ref must + /// register an `ungrouped_measure_reference` for each measure symbol, + /// so its own outer SELECT wraps the column in the right aggregate. + /// Used by the aggregate-multiplied subquery shape: its MeasureSubquery + /// data input is ungrouped, while keys/regular-measure refs are not. + #[builder(default)] + is_ungrouped: bool, } impl MultiStageSubqueryRef { @@ -26,6 +34,10 @@ impl MultiStageSubqueryRef { pub fn schema(&self) -> &Rc { &self.schema } + + pub fn is_ungrouped(&self) -> bool { + self.is_ungrouped + } } impl PrettyPrint for MultiStageSubqueryRef { @@ -40,15 +52,26 @@ impl PrettyPrint for MultiStageSubqueryRef { } /// Top-level aggregating source that stitches together several -/// multi-stage / multi-fact CTEs into one keyed result. The -/// physical builder picks a join strategy from `multi_stage_subquery_refs` -/// and `use_full_join_and_coalesce`. +/// multi-stage / multi-fact CTEs into one keyed result. The physical +/// builder picks a join strategy from `data_inputs` and +/// `keys_subquery_ref` — when a keys CTE is present, joins go through +/// it on `join_keys`; otherwise data inputs are stitched directly. #[derive(Clone, TypedBuilder)] pub struct FullKeyAggregate { schema: Rc, - use_full_join_and_coalesce: bool, #[builder(default)] - multi_stage_subquery_refs: Vec>, + data_inputs: Vec>, + #[builder(default)] + keys_subquery_ref: Option>, + // Members used as the JOIN keys when stitching `data_queries` onto the + // keys source. When empty, defaults to `schema.all_dimensions()` — + // historical behaviour for the multi-stage flow, where outer dimensions + // are both projected and used as join columns. When non-empty, + // decouples "what to project" (schema) from "what to join on" — needed + // for the multiplied-measures flow where pk dimensions drive the join + // while outer dimensions ride along as payload. + #[builder(default)] + join_keys: Vec>, } impl FullKeyAggregate { @@ -56,19 +79,20 @@ impl FullKeyAggregate { &self.schema } - /// When true, multi-fact branches are stitched together via a - /// FULL OUTER JOIN over keys with COALESCE on dimension columns; - /// otherwise an INNER JOIN is used. - pub fn use_full_join_and_coalesce(&self) -> bool { - self.use_full_join_and_coalesce + pub fn data_inputs(&self) -> &Vec> { + &self.data_inputs } - pub fn multi_stage_subquery_refs(&self) -> &Vec> { - &self.multi_stage_subquery_refs + pub fn keys_subquery_ref(&self) -> &Option> { + &self.keys_subquery_ref + } + + pub fn join_keys(&self) -> &Vec> { + &self.join_keys } pub fn is_empty(&self) -> bool { - self.multi_stage_subquery_refs.is_empty() + self.data_inputs.is_empty() } } @@ -87,8 +111,9 @@ impl LogicalNode for FullKeyAggregate { Ok(Rc::new( Self::builder() .schema(self.schema().clone()) - .use_full_join_and_coalesce(self.use_full_join_and_coalesce()) - .multi_stage_subquery_refs(self.multi_stage_subquery_refs().clone()) + .data_inputs(self.data_inputs().clone()) + .keys_subquery_ref(self.keys_subquery_ref().clone()) + .join_keys(self.join_keys().clone()) .build(), )) } @@ -112,18 +137,21 @@ impl PrettyPrint for FullKeyAggregate { let details_state = state.new_level(); result.println(&format!("schema:"), &state); self.schema().pretty_print(result, &details_state); - result.println( - &format!( - "use_full_join_and_coalesce: {}", - self.use_full_join_and_coalesce() - ), - &state, - ); - if !self.multi_stage_subquery_refs().is_empty() { - result.println("multi_stage_subquery_refs:", &state); - for subquery_ref in self.multi_stage_subquery_refs().iter() { - subquery_ref.pretty_print(result, &details_state); + if !self.data_inputs().is_empty() { + result.println("data_inputs:", &state); + for input in self.data_inputs().iter() { + input.pretty_print(result, &details_state); } } + if let Some(keys_ref) = self.keys_subquery_ref() { + result.println("keys_subquery_ref:", &state); + keys_ref.pretty_print(result, &details_state); + } + if !self.join_keys.is_empty() { + result.println( + &format!("join_keys: {}", print_symbols(self.join_keys())), + &state, + ); + } } } diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/join.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/join.rs index 6910aa959bd29..67e244bd4ba8a 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/join.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/join.rs @@ -40,8 +40,6 @@ pub struct LogicalJoin { root: Option>, #[builder(default)] joins: Vec, - #[builder(default)] - dimension_subqueries: Vec>, } impl LogicalJoin { @@ -52,10 +50,6 @@ impl LogicalJoin { pub fn joins(&self) -> &Vec { &self.joins } - - pub fn dimension_subqueries(&self) -> &Vec> { - &self.dimension_subqueries - } } impl LogicalNode for LogicalJoin { @@ -68,11 +62,8 @@ impl LogicalNode for LogicalJoin { } fn with_inputs(self: Rc, inputs: Vec) -> Result, CubeError> { - let LogicalJoinInputUnPacker { - root, - joins, - dimension_subqueries, - } = LogicalJoinInputUnPacker::new(&self, &inputs)?; + let LogicalJoinInputUnPacker { root, joins } = + LogicalJoinInputUnPacker::new(&self, &inputs)?; let root = if let Some(r) = root { Some(r.clone().into_logical_node()?) @@ -92,16 +83,7 @@ impl LogicalNode for LogicalJoin { }) .collect::, _>>()?; - let result = Self::builder() - .root(root) - .joins(joins) - .dimension_subqueries( - dimension_subqueries - .iter() - .map(|itm| itm.clone().into_logical_node()) - .collect::, _>>()?, - ) - .build(); + let result = Self::builder().root(root).joins(joins).build(); Ok(Rc::new(result)) } @@ -127,11 +109,6 @@ impl LogicalJoinInputPacker { result.push(root.as_plan_node()); } result.extend(join.joins().iter().map(|item| item.cube().as_plan_node())); - result.extend( - join.dimension_subqueries() - .iter() - .map(|item| item.as_plan_node()), - ); result } } @@ -139,7 +116,6 @@ impl LogicalJoinInputPacker { pub struct LogicalJoinInputUnPacker<'a> { root: Option<&'a PlanNode>, joins: &'a [PlanNode], - dimension_subqueries: &'a [PlanNode], } impl<'a> LogicalJoinInputUnPacker<'a> { @@ -156,17 +132,13 @@ impl<'a> LogicalJoinInputUnPacker<'a> { let joins_end = joins_start + join.joins().len(); let joins = &inputs[joins_start..joins_end]; - let dimension_subqueries = &inputs[joins_end..]; - Ok(Self { - root, - joins, - dimension_subqueries, - }) + Ok(Self { root, joins }) } fn inputs_len(join: &LogicalJoin) -> usize { - 1 + join.joins().len() + join.dimension_subqueries().len() + let root_len = if join.root.is_some() { 1 } else { 0 }; + root_len + join.joins().len() } } @@ -184,13 +156,6 @@ impl PrettyPrint for LogicalJoin { for join in self.joins().iter() { join.pretty_print(result, &state); } - if !self.dimension_subqueries().is_empty() { - result.println("dimension_subqueries:", &state); - let details_state = state.new_level(); - for subquery in self.dimension_subqueries().iter() { - subquery.pretty_print(result, &details_state); - } - } } else { result.println(&format!("Empty source"), state); } diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/keys_subquery.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/keys_subquery.rs deleted file mode 100644 index 5b78c98fcc2bd..0000000000000 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/keys_subquery.rs +++ /dev/null @@ -1,95 +0,0 @@ -use super::*; -use crate::planner::MemberSymbol; -use cubenativeutils::CubeError; -use std::rc::Rc; -use typed_builder::TypedBuilder; - -/// Subquery that produces the primary-key set of `pk_cube` after -/// applying the query's filters. Used as the outer key set in -/// `AggregateMultipliedSubquery`: a measure subquery aggregates -/// values and is joined back against this set on the primary keys. -#[derive(Clone, TypedBuilder)] -pub struct KeysSubQuery { - pk_cube: Rc, - schema: Rc, - primary_keys_dimensions: Vec>, - filter: Rc, - source: Rc, -} - -impl KeysSubQuery { - pub fn pk_cube(&self) -> &Rc { - &self.pk_cube - } - pub fn schema(&self) -> &Rc { - &self.schema - } - pub fn primary_keys_dimensions(&self) -> &Vec> { - &self.primary_keys_dimensions - } - pub fn filter(&self) -> &Rc { - &self.filter - } - pub fn source(&self) -> &Rc { - &self.source - } -} - -impl LogicalNode for KeysSubQuery { - fn as_plan_node(self: &Rc) -> PlanNode { - PlanNode::KeysSubQuery(self.clone()) - } - - fn inputs(&self) -> Vec { - vec![self.pk_cube.as_plan_node(), self.source.as_plan_node()] - } - - fn with_inputs(self: Rc, inputs: Vec) -> Result, CubeError> { - check_inputs_len(&inputs, 2, self.node_name())?; - let pk_cube = &inputs[0]; - let source = &inputs[1]; - - let res = Self { - pk_cube: pk_cube.clone().into_logical_node()?, - schema: self.schema.clone(), - primary_keys_dimensions: self.primary_keys_dimensions.clone(), - filter: self.filter.clone(), - source: source.clone().into_logical_node()?, - }; - Ok(Rc::new(res)) - } - - fn node_name(&self) -> &'static str { - "KeysSubQuery" - } - fn try_from_plan_node(plan_node: PlanNode) -> Result, CubeError> { - if let PlanNode::KeysSubQuery(query) = plan_node { - Ok(query) - } else { - Err(cast_error(&plan_node, "KeysSubQuery")) - } - } -} - -impl PrettyPrint for KeysSubQuery { - fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { - result.println("KeysSubQuery: ", state); - let state = state.new_level(); - let details_state = state.new_level(); - result.println(&format!("pk_cube: {}", self.pk_cube.cube().name()), &state); - - result.println("schema:", &state); - self.schema.pretty_print(result, &details_state); - result.println( - &format!( - "-primary_keys_dimensions: {}", - print_symbols(&self.primary_keys_dimensions) - ), - &state, - ); - result.println("filters:", &state); - self.filter.pretty_print(result, &details_state); - result.println("source:", &state); - self.source.pretty_print(result, &details_state); - } -} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/logical_node.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/logical_node.rs index 040be06612757..278ece66cf652 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/logical_node.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/logical_node.rs @@ -1,3 +1,4 @@ +use super::pretty_print::*; use super::*; use cubenativeutils::CubeError; use std::rc::Rc; @@ -26,15 +27,7 @@ pub enum PlanNode { LogicalJoin(Rc), FullKeyAggregate(Rc), PreAggregation(Rc), - AggregateMultipliedSubquery(Rc), Cube(Rc), - MeasureSubquery(Rc), - DimensionSubQuery(Rc), - KeysSubQuery(Rc), - MultiStageGetDateRange(Rc), - MultiStageLeafMeasure(Rc), - MultiStageMeasureCalculation(Rc), - MultiStageDimensionCalculation(Rc), MultiStageTimeSeries(Rc), MultiStageRollingWindow(Rc), LogicalMultiStageMember(Rc), @@ -48,15 +41,7 @@ macro_rules! match_plan_node { PlanNode::LogicalJoin($node) => $block, PlanNode::FullKeyAggregate($node) => $block, PlanNode::PreAggregation($node) => $block, - PlanNode::AggregateMultipliedSubquery($node) => $block, PlanNode::Cube($node) => $block, - PlanNode::MeasureSubquery($node) => $block, - PlanNode::DimensionSubQuery($node) => $block, - PlanNode::KeysSubQuery($node) => $block, - PlanNode::MultiStageGetDateRange($node) => $block, - PlanNode::MultiStageLeafMeasure($node) => $block, - PlanNode::MultiStageMeasureCalculation($node) => $block, - PlanNode::MultiStageDimensionCalculation($node) => $block, PlanNode::MultiStageTimeSeries($node) => $block, PlanNode::MultiStageRollingWindow($node) => $block, PlanNode::LogicalMultiStageMember($node) => $block, @@ -87,6 +72,38 @@ impl PlanNode { }); Ok(result) } + + /// Semantic classification — leaf/stage distinction independent of where + /// the node currently sits in the plan structure. + /// Returns `None` only for nodes that are pure plan scaffolding and do + /// not produce a SELECT-shaped result on their own. + pub fn multi_stage_kind(&self) -> Option { + match self { + // Leaves — produce a CTE from base sources, no multi-stage CTE deps. + // `Query` covers both true leaves and the aggregate-multiplied + // subquery shape (`FullKeyAggregate` over already-published + // KS/MS CTEs); the latter is conceptually a Stage but is + // structurally a Query at this point. + PlanNode::Query(_) | PlanNode::MultiStageTimeSeries(_) => Some(MultiStageKind::Leaf), + + PlanNode::MultiStageRollingWindow(_) => Some(MultiStageKind::Stage), + + // Pure plan scaffolding — never has a SELECT result on its own. + PlanNode::LogicalJoin(_) + | PlanNode::FullKeyAggregate(_) + | PlanNode::PreAggregation(_) + | PlanNode::Cube(_) + | PlanNode::LogicalMultiStageMember(_) => None, + } + } +} + +impl PrettyPrint for PlanNode { + fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { + match_plan_node!(self, node => { + node.pretty_print(result, state); + }); + } } pub(super) fn cast_error(plan_node: &PlanNode, target_type: &str) -> CubeError { diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/logical_query_modifers.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/logical_query_modifers.rs index 3f2ac64831b67..273085f3bf90b 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/logical_query_modifers.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/logical_query_modifers.rs @@ -1,13 +1,38 @@ use super::*; +use crate::planner::planners::multi_stage::TimeShiftState; use crate::planner::query_properties::OrderByItem; +/// How the pre-aggregation optimizer should treat this Query when walking +/// the multi-stage tree. Derived from `Query.kind()`; not stored on +/// modifiers anymore. +#[derive(Default, Clone, Copy, PartialEq, Eq, Debug)] +pub enum PreAggregationRewriteRole { + /// Try `try_rewrite_query` over this Query's own schema/filter + /// (regular leaf — top-level, regular_measures, etc.). + #[default] + Leaf, + /// Replace this whole subtree atomically by a pre-aggregation match + /// on schema + outer-query filter (aggregate-multiplied subquery). + WholeSubtree, + /// Intermediate machinery — walk through to descendants without + /// rewriting this Query itself (Stage Calculation). + PassThrough, + /// Raw fact source — never rewritten on its own; the rewrite unit is + /// the parent (MeasureSubquery shape). + NoRewrite, +} + /// Per-query modifiers that sit outside the result schema: paging, /// ordering, and the ungrouped flag. +#[derive(Default, Clone)] pub struct LogicalQueryModifiers { pub offset: Option, pub limit: Option, pub ungrouped: bool, pub order_by: Vec, + pub time_shifts: TimeShiftState, + pub render_measure_as_state: bool, + pub render_measure_for_ungrouped: bool, } impl PrettyPrint for LogicalQueryModifiers { @@ -33,5 +58,31 @@ impl PrettyPrint for LogicalQueryModifiers { ); } } + if !self.time_shifts.is_empty() { + result.println("time_shifts:", &state); + let details_state = state.new_level(); + for (_, time_shift) in self.time_shifts.dimensions_shifts.iter() { + result.println( + &format!( + "- {}: {}", + time_shift.dimension.full_name(), + if let Some(interval) = &time_shift.interval { + interval.to_sql() + } else if let Some(name) = &time_shift.name { + format!("{} (named)", name.to_string()) + } else { + "None".to_string() + } + ), + &details_state, + ); + } + } + if self.render_measure_as_state { + result.println("render_measure_as_state: true", &state); + } + if self.render_measure_for_ungrouped { + result.println("render_measure_for_ungrouped: true", &state); + } } } diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/measure_subquery.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/measure_subquery.rs deleted file mode 100644 index 518702de43221..0000000000000 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/measure_subquery.rs +++ /dev/null @@ -1,51 +0,0 @@ -use super::*; -use cubenativeutils::CubeError; -use std::rc::Rc; - -/// Inner aggregating subquery for a measure inside a multiplied -/// aggregate flow — one of the two `AggregateMultipliedSubquerySource` -/// variants (the other being a raw `Cube`). -pub struct MeasureSubquery { - pub schema: Rc, - pub source: Rc, -} - -impl LogicalNode for MeasureSubquery { - fn as_plan_node(self: &Rc) -> PlanNode { - PlanNode::MeasureSubquery(self.clone()) - } - - fn inputs(&self) -> Vec { - vec![self.source.as_plan_node()] - } - - fn with_inputs(self: Rc, inputs: Vec) -> Result, CubeError> { - check_inputs_len(&inputs, 1, self.node_name())?; - let source = &inputs[0]; - Ok(Rc::new(Self { - schema: self.schema.clone(), - source: source.clone().into_logical_node()?, - })) - } - - fn node_name(&self) -> &'static str { - "MeasureSubquery" - } - fn try_from_plan_node(plan_node: PlanNode) -> Result, CubeError> { - if let PlanNode::MeasureSubquery(query) = plan_node { - Ok(query) - } else { - Err(cast_error(&plan_node, "MeasureSubquery")) - } - } -} - -impl PrettyPrint for MeasureSubquery { - fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { - let details_state = state.new_level(); - result.println("schema:", &state); - self.schema.pretty_print(result, &details_state); - result.println("source:", state); - self.source.pretty_print(result, &details_state); - } -} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/mod.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/mod.rs index b7903fea5598c..c928d3d320e8b 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/mod.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/mod.rs @@ -7,42 +7,40 @@ #[macro_use] mod logical_source; -mod aggregate_multiplied_subquery; mod cube; -mod dimension_subquery; mod filter; mod full_key_aggregate; mod helper; mod join; -mod keys_subquery; mod logical_node; mod logical_query_modifers; -mod measure_subquery; +mod multi_stage_dimension; mod multistage; pub mod optimizers; +mod plan; mod pre_aggregation; pub mod pretty_print; mod query; +mod query_kind; mod query_source; mod schema; pub mod visitor; -pub use aggregate_multiplied_subquery::*; pub use cube::*; -pub use dimension_subquery::*; pub use filter::*; pub use full_key_aggregate::*; pub use helper::*; pub use join::*; -pub use keys_subquery::*; pub use logical_node::*; pub use logical_query_modifers::*; pub use logical_source::*; -pub use measure_subquery::*; +pub use multi_stage_dimension::*; pub use multistage::*; pub use optimizers::*; +pub use plan::*; pub use pre_aggregation::*; pub use pretty_print::*; pub use query::*; +pub use query_kind::*; pub use query_source::*; pub use schema::*; diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/multi_stage_dimension.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/multi_stage_dimension.rs new file mode 100644 index 0000000000000..f2e2b70b79272 --- /dev/null +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/multi_stage_dimension.rs @@ -0,0 +1,69 @@ +use super::pretty_print::*; +use super::LogicalSchema; +use crate::planner::MemberSymbol; +use std::rc::Rc; + +/// Lightweight reference to a top-level multi-stage CTE that materialises +/// a computed dimension. Unifies the former `DimensionSubQuery` (DSQ — +/// subquery-dim leaf body joined to a pk-cube by its primary keys) and +/// `StageDimensionCalc` (multi-stage dim body joined by outer +/// dimensions) under one descriptor. +/// +/// The CTE body lives in the surrounding `LogicalPlan.ctes` as a +/// `LogicalMultiStageMember`. This ref carries everything a consumer +/// needs to wire the CTE into its FROM and to resolve render +/// references for the exposed column — no body inside. +#[derive(Debug)] +pub struct MultiStageDimensionRef { + /// Stable CTE name. Matches the `LogicalMultiStageMember.name` that + /// holds the body on the surrounding `LogicalPlan.ctes`. + pub name: String, + /// Schema of the CTE body — used to resolve the column alias for + /// `dimension` during render. + pub schema: Rc, + /// How the consumer joins this CTE into its FROM. + pub join: MultiStageDimensionJoin, + /// The dimension this CTE represents — same symbol the body + /// projects and the outer scope reads from the joined CTE. + pub dimension: Rc, +} + +/// How a `MultiStageDimensionRef` CTE is joined into the consumer's +/// FROM. +#[derive(Clone, Debug)] +pub enum MultiStageDimensionJoin { + /// LEFT JOIN inside the cube-join chain, attached after `cube_name` + /// is joined in. Used when the computed dim is keyed by the cube's + /// own primary keys (the ex-DSQ pattern). + OnPrimaryKeys { + cube_name: String, + pk_dimensions: Vec>, + }, + /// LEFT JOIN after the whole join chain / FullKeyAggregate output, + /// keyed by the listed outer dimensions (the ex-multi-stage-dim + /// pattern). + OnOuterDimensions { dimensions: Vec> }, +} + +impl MultiStageDimensionJoin { + pub fn label(&self) -> &'static str { + match self { + Self::OnPrimaryKeys { .. } => "OnPrimaryKeys", + Self::OnOuterDimensions { .. } => "OnOuterDimensions", + } + } +} + +impl PrettyPrint for MultiStageDimensionRef { + fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { + result.println( + &format!( + "MultiStageDimensionRef `{}` -> {} ({})", + self.name, + self.dimension.full_name(), + self.join.label() + ), + state, + ); + } +} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/calculation.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/calculation.rs deleted file mode 100644 index 50d2f477a360d..0000000000000 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/calculation.rs +++ /dev/null @@ -1,182 +0,0 @@ -use crate::logical_plan::*; -use crate::planner::query_properties::OrderByItem; -use crate::planner::MemberSymbol; -use cubenativeutils::CubeError; -use itertools::Itertools; -use std::rc::Rc; -use typed_builder::TypedBuilder; - -/// Semantic category of a multi-stage measure CTE — drives how the -/// physical builder shapes the rendered expression. -#[derive(PartialEq, Clone)] -pub enum MultiStageCalculationType { - Rank, - Aggregate, - Calculate, -} - -impl ToString for MultiStageCalculationType { - fn to_string(&self) -> String { - match self { - MultiStageCalculationType::Rank => "Rank".to_string(), - MultiStageCalculationType::Aggregate => "Aggregate".to_string(), - MultiStageCalculationType::Calculate => "Calculate".to_string(), - } - } -} - -/// Which SQL window-function flavour, if any, the calculation -/// renders as. -#[derive(PartialEq, Clone)] -pub enum MultiStageCalculationWindowFunction { - Rank, - Window, - None, -} - -impl ToString for MultiStageCalculationWindowFunction { - fn to_string(&self) -> String { - match self { - MultiStageCalculationWindowFunction::Rank => "Rank".to_string(), - MultiStageCalculationWindowFunction::Window => "Window".to_string(), - MultiStageCalculationWindowFunction::None => "None".to_string(), - } - } -} - -/// Measure CTE in a multi-stage chain — wraps a `FullKeyAggregate` -/// source with the partition / window function / ordering decided -/// by `calculation_type`. -#[derive(TypedBuilder)] -pub struct MultiStageMeasureCalculation { - schema: Rc, - is_ungrouped: bool, - calculation_type: MultiStageCalculationType, - #[builder(default)] - partition_by: Vec>, - window_function_to_use: MultiStageCalculationWindowFunction, - #[builder(default)] - order_by: Vec, - source: Rc, -} - -impl MultiStageMeasureCalculation { - pub fn schema(&self) -> &Rc { - &self.schema - } - - pub fn is_ungrouped(&self) -> bool { - self.is_ungrouped - } - - pub fn calculation_type(&self) -> &MultiStageCalculationType { - &self.calculation_type - } - - pub fn partition_by(&self) -> &Vec> { - &self.partition_by - } - - pub fn window_function_to_use(&self) -> &MultiStageCalculationWindowFunction { - &self.window_function_to_use - } - - pub fn order_by(&self) -> &Vec { - &self.order_by - } - - pub fn source(&self) -> &Rc { - &self.source - } -} - -impl PrettyPrint for MultiStageMeasureCalculation { - fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { - result.println( - &format!( - "Measure Calculation: {}", - self.calculation_type().to_string() - ), - state, - ); - let state = state.new_level(); - let details_state = state.new_level(); - result.println("schema:", &state); - self.schema().pretty_print(result, &details_state); - if !self.partition_by().is_empty() { - result.println( - &format!( - "partition_by: {}", - self.partition_by().iter().map(|m| m.full_name()).join(", ") - ), - &state, - ); - } - if self.window_function_to_use() != &MultiStageCalculationWindowFunction::None { - result.println( - &format!( - "window_function_to_use: {}", - self.window_function_to_use().to_string() - ), - &state, - ); - } - if self.is_ungrouped() { - result.println("is_ungrouped: true", &state); - } - if !self.order_by().is_empty() { - result.println("order_by:", &state); - for order_by in self.order_by().iter() { - result.println( - &format!( - "{} {}", - order_by.name(), - if order_by.desc() { "desc" } else { "asc" } - ), - &details_state, - ); - } - } - result.println("source:", &state); - self.source().pretty_print(result, &details_state); - } -} - -impl LogicalNode for MultiStageMeasureCalculation { - fn as_plan_node(self: &Rc) -> PlanNode { - PlanNode::MultiStageMeasureCalculation(self.clone()) - } - - fn inputs(&self) -> Vec { - vec![self.source().as_plan_node()] - } - - fn with_inputs(self: Rc, inputs: Vec) -> Result, CubeError> { - check_inputs_len(&inputs, 1, self.node_name())?; - let source = &inputs[0]; - - Ok(Rc::new( - Self::builder() - .schema(self.schema().clone()) - .is_ungrouped(self.is_ungrouped()) - .calculation_type(self.calculation_type().clone()) - .partition_by(self.partition_by().clone()) - .window_function_to_use(self.window_function_to_use().clone()) - .order_by(self.order_by().clone()) - .source(source.clone().into_logical_node()?) - .build(), - )) - } - - fn node_name(&self) -> &'static str { - "MultiStageMeasureCalculation" - } - - fn try_from_plan_node(plan_node: PlanNode) -> Result, CubeError> { - if let PlanNode::MultiStageMeasureCalculation(item) = plan_node { - Ok(item) - } else { - Err(cast_error(&plan_node, "MultiStageMeasureCalculation")) - } - } -} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/dimension.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/dimension.rs deleted file mode 100644 index 279412a2914c3..0000000000000 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/dimension.rs +++ /dev/null @@ -1,127 +0,0 @@ -use crate::logical_plan::*; -use crate::planner::collectors::has_multi_stage_members; -use crate::planner::query_properties::OrderByItem; -use crate::planner::MemberSymbol; -use cubenativeutils::CubeError; -use itertools::Itertools; -use std::rc::Rc; -use typed_builder::TypedBuilder; - -/// Dimension CTE in a multi-stage chain — materialises a -/// multi-stage dimension on top of a `FullKeyAggregate` source. -#[derive(TypedBuilder)] -pub struct MultiStageDimensionCalculation { - schema: Rc, - multi_stage_dimension: Rc, - #[builder(default)] - order_by: Vec, - source: Rc, -} - -impl MultiStageDimensionCalculation { - pub fn schema(&self) -> &Rc { - &self.schema - } - - pub fn multi_stage_dimension(&self) -> &Rc { - &self.multi_stage_dimension - } - - pub fn order_by(&self) -> &Vec { - &self.order_by - } - - pub fn source(&self) -> &Rc { - &self.source - } - - pub fn resolved_dimensions(&self) -> Result, CubeError> { - let mut result = vec![]; - for dim in self.schema.all_dimensions() { - if has_multi_stage_members(dim, true)? { - result.push(dim.clone().resolve_reference_chain().full_name()); - } - } - result.sort(); - Ok(result) - } - - pub fn join_dimensions(&self) -> Result>, CubeError> { - let mut result = if let Ok(dimension) = self.multi_stage_dimension.as_dimension() { - dimension.add_group_by().clone().unwrap_or_default() - } else { - vec![] - }; - for dim in self.schema.all_dimensions() { - if !has_multi_stage_members(dim, true)? { - result.push(dim.clone()); - } - } - let result = result - .into_iter() - .unique_by(|d| d.full_name()) - .collect_vec(); - Ok(result) - } -} - -impl PrettyPrint for MultiStageDimensionCalculation { - fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { - result.println(&format!("Dimension Calculation",), state); - let state = state.new_level(); - let details_state = state.new_level(); - result.println("schema:", &state); - self.schema().pretty_print(result, &details_state); - if !self.order_by().is_empty() { - result.println("order_by:", &state); - for order_by in self.order_by().iter() { - result.println( - &format!( - "{} {}", - order_by.name(), - if order_by.desc() { "desc" } else { "asc" } - ), - &details_state, - ); - } - } - result.println("source:", &state); - self.source().pretty_print(result, &details_state); - } -} - -impl LogicalNode for MultiStageDimensionCalculation { - fn as_plan_node(self: &Rc) -> PlanNode { - PlanNode::MultiStageDimensionCalculation(self.clone()) - } - - fn inputs(&self) -> Vec { - vec![self.source().as_plan_node()] - } - - fn with_inputs(self: Rc, inputs: Vec) -> Result, CubeError> { - check_inputs_len(&inputs, 1, self.node_name())?; - let source = &inputs[0]; - - Ok(Rc::new( - Self::builder() - .schema(self.schema().clone()) - .order_by(self.order_by().clone()) - .multi_stage_dimension(self.multi_stage_dimension.clone()) - .source(source.clone().into_logical_node()?) - .build(), - )) - } - - fn node_name(&self) -> &'static str { - "MultiStageDimensionCalculation" - } - - fn try_from_plan_node(plan_node: PlanNode) -> Result, CubeError> { - if let PlanNode::MultiStageDimensionCalculation(item) = plan_node { - Ok(item) - } else { - Err(cast_error(&plan_node, "MultiStageMeasureCalculation")) - } - } -} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/get_date_range.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/get_date_range.rs deleted file mode 100644 index ecf5abe58a2df..0000000000000 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/get_date_range.rs +++ /dev/null @@ -1,58 +0,0 @@ -use crate::logical_plan::*; -use crate::planner::MemberSymbol; -use cubenativeutils::CubeError; -use std::rc::Rc; - -/// CTE that resolves the actual date range of a time dimension at -/// query time (used by rolling windows when no literal range is -/// given). -pub struct MultiStageGetDateRange { - pub time_dimension: Rc, - pub source: Rc, -} - -impl LogicalNode for MultiStageGetDateRange { - fn as_plan_node(self: &Rc) -> PlanNode { - PlanNode::MultiStageGetDateRange(self.clone()) - } - - fn inputs(&self) -> Vec { - vec![self.source.as_plan_node()] - } - - fn with_inputs(self: Rc, inputs: Vec) -> Result, CubeError> { - check_inputs_len(&inputs, 1, self.node_name())?; - let source = &inputs[0]; - - Ok(Rc::new(Self { - time_dimension: self.time_dimension.clone(), - source: source.clone().into_logical_node()?, - })) - } - - fn node_name(&self) -> &'static str { - "MultiStageGetDateRange" - } - - fn try_from_plan_node(plan_node: PlanNode) -> Result, CubeError> { - if let PlanNode::MultiStageGetDateRange(item) = plan_node { - Ok(item) - } else { - Err(cast_error(&plan_node, "MultiStageGetDateRange")) - } - } -} - -impl PrettyPrint for MultiStageGetDateRange { - fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { - result.println("Get Date Range", state); - let state = state.new_level(); - let details_state = state.new_level(); - result.println( - &format!("time_dimension: {}", self.time_dimension.full_name()), - &details_state, - ); - result.println("source:", &state); - self.source.pretty_print(result, &details_state); - } -} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/kind.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/kind.rs new file mode 100644 index 0000000000000..34e2244251e39 --- /dev/null +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/kind.rs @@ -0,0 +1,15 @@ +/// Classifies a `PlanNode` by its role when used as a multi-stage member body. +/// +/// A multi-stage member body is either: +/// - **Leaf** — produces a CTE from base tables / joins / pre-aggregations. +/// Has no dependency on other multi-stage CTEs. +/// - **Stage** — composes the result by reading other multi-stage CTEs +/// (typically via `FullKeyAggregate` or named `MultiStageSubqueryRef`s). +/// +/// Nodes that exist only as plan structure (`LogicalJoin`, `Cube`, etc.) +/// do not have a kind and are not valid as a multi-stage member body. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum MultiStageKind { + Leaf, + Stage, +} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/leaf_measure.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/leaf_measure.rs deleted file mode 100644 index 09719fab65ce4..0000000000000 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/leaf_measure.rs +++ /dev/null @@ -1,91 +0,0 @@ -use crate::logical_plan::*; -use crate::planner::planners::multi_stage::TimeShiftState; -use crate::planner::MemberSymbol; -use cubenativeutils::CubeError; -use std::rc::Rc; - -/// Leaf CTE of a multi-stage chain — a base query that produces the -/// raw aggregated values feeding the rest of the chain. Optional -/// state rendering (`render_measure_as_state`) and time shifts -/// happen here. -pub struct MultiStageLeafMeasure { - pub measures: Vec>, - pub render_measure_as_state: bool, //Render measure as state, for example hll state for count_approx - pub render_measure_for_ungrouped: bool, - pub time_shifts: TimeShiftState, - pub query: Rc, -} - -impl PrettyPrint for MultiStageLeafMeasure { - fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { - result.println("Leaf Measure Query", state); - let state = state.new_level(); - for measure in self.measures.iter() { - result.println(&format!("measure: {}", measure.full_name()), &state); - } - if self.render_measure_as_state { - result.println("render_measure_as_state: true", &state); - } - if self.render_measure_for_ungrouped { - result.println("render_measure_for_ungrouped: true", &state); - } - if !self.time_shifts.is_empty() { - result.println("time_shifts:", &state); - let details_state = state.new_level(); - for (_, time_shift) in self.time_shifts.dimensions_shifts.iter() { - result.println( - &format!( - "- {}: {}", - time_shift.dimension.full_name(), - if let Some(interval) = &time_shift.interval { - interval.to_sql() - } else if let Some(name) = &time_shift.name { - format!("{} (named)", name.to_string()) - } else { - "None".to_string() - } - ), - &details_state, - ); - } - } - result.println(&format!("query:"), &state); - let details_state = state.new_level(); - self.query.pretty_print(result, &details_state); - } -} - -impl LogicalNode for MultiStageLeafMeasure { - fn as_plan_node(self: &Rc) -> PlanNode { - PlanNode::MultiStageLeafMeasure(self.clone()) - } - - fn inputs(&self) -> Vec { - vec![self.query.as_plan_node()] - } - - fn with_inputs(self: Rc, inputs: Vec) -> Result, CubeError> { - check_inputs_len(&inputs, 1, self.node_name())?; - let query = &inputs[0]; - - Ok(Rc::new(Self { - measures: self.measures.clone(), - render_measure_as_state: self.render_measure_as_state, - render_measure_for_ungrouped: self.render_measure_for_ungrouped, - time_shifts: self.time_shifts.clone(), - query: query.clone().into_logical_node()?, - })) - } - - fn node_name(&self) -> &'static str { - "MultiStageLeafMeasure" - } - - fn try_from_plan_node(plan_node: PlanNode) -> Result, CubeError> { - if let PlanNode::MultiStageLeafMeasure(item) = plan_node { - Ok(item) - } else { - Err(cast_error(&plan_node, "MultiStageLeafMeasure")) - } - } -} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/member.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/member.rs index fcd81fd9ecb74..c11090a2f5819 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/member.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/member.rs @@ -2,65 +2,51 @@ use crate::logical_plan::*; use cubenativeutils::CubeError; use std::rc::Rc; -/// Body of a `LogicalMultiStageMember` — one of the multi-stage -/// CTE shapes the planner can produce. -pub enum MultiStageMemberLogicalType { - LeafMeasure(Rc), - MultipliedMeasure(Rc), - MeasureCalculation(Rc), - DimensionCalculation(Rc), - GetDateRange(Rc), +/// What sits inside a `LogicalMultiStageMember`: a Query body (the +/// regular case — leaf, DSQ, multi-stage inode, multiplied bodies) or +/// one of the special leaf nodes that don't fit the Query shape. +/// There's no nested CTE pool here — every CTE lives at the top-level +/// `LogicalPlan.ctes`, and FK refs reach them by name. +#[derive(Clone)] +pub enum MultiStageMemberBody { + Query(Rc), TimeSeries(Rc), RollingWindow(Rc), } -impl MultiStageMemberLogicalType { - fn as_plan_node(&self) -> PlanNode { +impl MultiStageMemberBody { + /// Output schema of this CTE body: dimensions/measures projected by + /// the rendered SQL. For `Query`, this is the embedded + /// `LogicalSchema`; for `RollingWindow`, the schema it carries; for + /// `TimeSeries`, a synthetic schema with just the time dimension. + pub fn schema(&self) -> Rc { match self { - Self::LeafMeasure(item) => item.as_plan_node(), - Self::MultipliedMeasure(item) => item.as_plan_node(), - Self::MeasureCalculation(item) => item.as_plan_node(), - Self::DimensionCalculation(item) => item.as_plan_node(), - Self::GetDateRange(item) => item.as_plan_node(), - Self::TimeSeries(item) => item.as_plan_node(), - Self::RollingWindow(item) => item.as_plan_node(), + Self::Query(q) => q.schema().clone(), + Self::RollingWindow(rw) => rw.schema.clone(), + Self::TimeSeries(ts) => LogicalSchema::default() + .set_time_dimensions(vec![ts.time_dimension().clone()]) + .into_rc(), } } - - fn with_plan_node(&self, plan_node: PlanNode) -> Result { - Ok(match self { - Self::LeafMeasure(_) => Self::LeafMeasure(plan_node.into_logical_node()?), - Self::MultipliedMeasure(_) => Self::MultipliedMeasure(plan_node.into_logical_node()?), - Self::MeasureCalculation(_) => Self::MeasureCalculation(plan_node.into_logical_node()?), - Self::DimensionCalculation(_) => { - Self::DimensionCalculation(plan_node.into_logical_node()?) - } - Self::GetDateRange(_) => Self::GetDateRange(plan_node.into_logical_node()?), - Self::TimeSeries(_) => Self::TimeSeries(plan_node.into_logical_node()?), - Self::RollingWindow(_) => Self::RollingWindow(plan_node.into_logical_node()?), - }) - } } -impl PrettyPrint for MultiStageMemberLogicalType { +impl PrettyPrint for MultiStageMemberBody { fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { match self { - Self::LeafMeasure(measure) => measure.pretty_print(result, state), - Self::MultipliedMeasure(subquery) => subquery.pretty_print(result, state), - Self::MeasureCalculation(calculation) => calculation.pretty_print(result, state), - Self::DimensionCalculation(calculation) => calculation.pretty_print(result, state), - Self::GetDateRange(get_date_range) => get_date_range.pretty_print(result, state), - Self::TimeSeries(time_series) => time_series.pretty_print(result, state), - Self::RollingWindow(rolling_window) => rolling_window.pretty_print(result, state), + Self::Query(q) => q.pretty_print(result, state), + Self::TimeSeries(ts) => ts.pretty_print(result, state), + Self::RollingWindow(rw) => rw.pretty_print(result, state), } } } -/// Named CTE in a multi-stage chain. `Query.multistage_members` -/// holds one per CTE the source depends on. +/// Named CTE in the top-level pool: the surrounding `LogicalPlan` holds +/// one per CTE its tree of FK refs reaches by name. `body` is a +/// `MultiStageMemberBody` — the actual SELECT-shaped node rendered as +/// the CTE body. pub struct LogicalMultiStageMember { pub name: String, - pub member_type: MultiStageMemberLogicalType, + pub body: MultiStageMemberBody, } impl LogicalNode for LogicalMultiStageMember { @@ -69,16 +55,29 @@ impl LogicalNode for LogicalMultiStageMember { } fn inputs(&self) -> Vec { - vec![self.member_type.as_plan_node()] + match &self.body { + MultiStageMemberBody::Query(q) => vec![q.as_plan_node()], + MultiStageMemberBody::TimeSeries(ts) => vec![ts.as_plan_node()], + MultiStageMemberBody::RollingWindow(rw) => vec![rw.as_plan_node()], + } } fn with_inputs(self: Rc, inputs: Vec) -> Result, CubeError> { check_inputs_len(&inputs, 1, self.node_name())?; - let input = inputs[0].clone(); - + let new_body = match &self.body { + MultiStageMemberBody::Query(_) => { + MultiStageMemberBody::Query(inputs[0].clone().into_logical_node()?) + } + MultiStageMemberBody::TimeSeries(_) => { + MultiStageMemberBody::TimeSeries(inputs[0].clone().into_logical_node()?) + } + MultiStageMemberBody::RollingWindow(_) => { + MultiStageMemberBody::RollingWindow(inputs[0].clone().into_logical_node()?) + } + }; Ok(Rc::new(Self { name: self.name.clone(), - member_type: self.member_type.with_plan_node(input)?, + body: new_body, })) } @@ -99,6 +98,6 @@ impl PrettyPrint for LogicalMultiStageMember { fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { result.println(&format!("MultiStageMember `{}`: ", self.name), state); let details_state = state.new_level(); - self.member_type.pretty_print(result, &details_state); + self.body.pretty_print(result, &details_state); } } diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/mod.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/mod.rs index 3d269880839f5..700545b5e0099 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/mod.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/mod.rs @@ -1,15 +1,9 @@ -mod calculation; -mod dimension; -mod get_date_range; -mod leaf_measure; +mod kind; mod member; mod rolling_window; mod time_series; -pub use calculation::*; -pub use dimension::*; -pub use get_date_range::*; -pub use leaf_measure::*; +pub use kind::*; pub use member::*; pub use rolling_window::*; pub use time_series::*; diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/common/cube_names_collector.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/common/cube_names_collector.rs index 63d2260c55b9e..5aac8a3104db0 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/common/cube_names_collector.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/common/cube_names_collector.rs @@ -27,3 +27,27 @@ pub fn collect_cube_names_from_node( visitor.visit(&mut collector, node)?; Ok(collector.cube_names.into_iter().unique().collect_vec()) } + +/// `LogicalPlan` is not part of `PlanNode`, so the generic walker can't +/// descend through it. Recurse explicitly through `ctes` and the `root` +/// PlanNode subtree. +pub fn collect_cube_names_from_plan(plan: &Rc) -> Result, CubeError> { + let mut collector = CubeNamesCollector { + cube_names: Vec::new(), + }; + walk_plan(&mut collector, plan)?; + Ok(collector.cube_names.into_iter().unique().collect_vec()) +} + +fn walk_plan(collector: &mut CubeNamesCollector, plan: &Rc) -> Result<(), CubeError> { + let visitor = LogicalPlanVisitor::new(); + for cte in plan.ctes() { + match &cte.body { + MultiStageMemberBody::Query(q) => visitor.visit(collector, q)?, + MultiStageMemberBody::TimeSeries(ts) => visitor.visit(collector, ts)?, + MultiStageMemberBody::RollingWindow(rw) => visitor.visit(collector, rw)?, + } + } + visitor.visit(collector, plan.root())?; + Ok(()) +} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs index 7d939319c307c..d7b31f3f43956 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs @@ -1,6 +1,5 @@ use super::PreAggregationsCompiler; use super::*; -use crate::logical_plan::visitor::{LogicalPlanRewriter, NodeRewriteResult}; use crate::logical_plan::*; use crate::planner::filter::FilterItem; use crate::planner::filter::FilterOp; @@ -11,7 +10,7 @@ use crate::planner::query_tools::QueryTools; use crate::planner::time_dimension::QueryDateTime; use crate::planner::MemberSymbol; use cubenativeutils::CubeError; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet, VecDeque}; use std::rc::Rc; pub struct PreAggregationUsage { @@ -53,11 +52,11 @@ impl PreAggregationOptimizer { pub fn try_optimize( &mut self, - plan: Rc, + plan: Rc, disable_external_pre_aggregations: bool, pre_aggregation_id: Option<&str>, - ) -> Result>, CubeError> { - let cube_names = collect_cube_names_from_node(&plan)?; + ) -> Result>, CubeError> { + let cube_names = collect_cube_names_from_plan(&plan)?; let mut compiler = PreAggregationsCompiler::try_new(self.query_tools.clone(), &cube_names)?; let compiled_pre_aggregations = @@ -73,7 +72,7 @@ impl PreAggregationOptimizer { compiled_pre_aggregations }; - self.try_rewrite_query( + self.try_rewrite_plan( &plan, &filtered_pre_aggregations, &TimeShiftState::default(), @@ -88,25 +87,31 @@ impl PreAggregationOptimizer { std::mem::take(&mut self.usages) } - fn try_rewrite_query( + /// Try to rewrite a whole `LogicalPlan`. Attempts a single-source + /// match against `plan.root` first (collapses the whole plan to a + /// `PreAggregationLeaf` and drops bundled CTEs); falls back to + /// walking the CTE graph from the root's FK refs. + fn try_rewrite_plan( &mut self, - query: &Rc, + plan: &Rc, compiled_pre_aggregations: &[Rc], time_shifts: &TimeShiftState, - ) -> Result>, CubeError> { + ) -> Result>, CubeError> { + let root = plan.root(); for pre_aggregation in compiled_pre_aggregations.iter() { let external = pre_aggregation.external.unwrap_or(false); let date_range = - Self::extract_date_range(&query.filter(), &self.query_tools, time_shifts, external); - if let Some(rewritten) = - self.try_rewrite_simple_query(query, pre_aggregation, date_range)? + Self::extract_date_range(&root.filter(), &self.query_tools, time_shifts, external); + if let Some(rewritten_root) = + self.try_rewrite_simple_query(root, pre_aggregation, date_range)? { - return Ok(Some(rewritten)); + // Root collapsed to PreAggregationLeaf — bundled CTEs orphan. + return Ok(Some(LogicalPlan::new(vec![], rewritten_root))); } } - if self.allow_multi_stage && !query.multistage_members().is_empty() { - return self.try_rewrite_query_with_multistages(query, compiled_pre_aggregations); + if self.allow_multi_stage && !plan.ctes().is_empty() { + return self.try_rewrite_plan_via_graph(plan, compiled_pre_aggregations); } Ok(None) @@ -128,6 +133,7 @@ impl PreAggregationOptimizer { .filter(query.filter().clone()) .modifers(query.modifers().clone()) .source(source.into()) + .kind(QueryKind::PreAggregationLeaf) .build(); Ok(Some(Rc::new(new_query))) } else { @@ -135,9 +141,6 @@ impl PreAggregationOptimizer { } } - // Builds a self-contained Rc wrapping a matching pre-aggregation for - // a node that has schema and filter but no native Rc container - // (e.g. AggregateMultipliedSubquery). fn try_rewrite_schema_and_filter( &mut self, schema: &Rc, @@ -163,13 +166,8 @@ impl PreAggregationOptimizer { let new_query = Query::builder() .schema(schema.clone()) .filter(filter.clone()) - .modifers(Rc::new(LogicalQueryModifiers { - offset: None, - limit: None, - ungrouped: false, - order_by: vec![], - })) .source(source.into()) + .kind(QueryKind::PreAggregationLeaf) .build(); return Ok(Some(Rc::new(new_query))); } @@ -177,95 +175,63 @@ impl PreAggregationOptimizer { Ok(None) } - fn try_rewrite_query_with_multistages( + /// Walk the CTE graph from `plan.root`'s FK refs by name. Each + /// reachable member's body is rewritten according to its role; refs + /// only declared in unreachable members are pruned out of the result. + fn try_rewrite_plan_via_graph( &mut self, - query: &Rc, + plan: &Rc, compiled_pre_aggregations: &[Rc], - ) -> Result>, CubeError> { - let rewriter = LogicalPlanRewriter::new(); - let mut has_unrewritten_leaf = false; - - // Save state in case we need to rollback + ) -> Result>, CubeError> { let saved_usages_len = self.usages.len(); let saved_counter = self.usage_counter; - // Multiplied-measure CTEs don't carry their own filter — logically - // they apply the same filter as the root query, so we match against it. - let root_filter = query.filter().clone(); - - let mut rewritten_multistages = Vec::new(); - for multi_stage in query.multistage_members() { - let rewritten = rewriter.rewrite_top_down_with(multi_stage.clone(), |plan_node| { - let res = match plan_node { - PlanNode::MultiStageLeafMeasure(multi_stage_leaf_measure) => { - if let Some(rewritten) = self.try_rewrite_query( - &multi_stage_leaf_measure.query, - compiled_pre_aggregations, - &multi_stage_leaf_measure.time_shifts, - )? { - let new_leaf = Rc::new(MultiStageLeafMeasure { - measures: multi_stage_leaf_measure.measures.clone(), - render_measure_as_state: multi_stage_leaf_measure - .render_measure_as_state - .clone(), - render_measure_for_ungrouped: multi_stage_leaf_measure - .render_measure_for_ungrouped - .clone(), - time_shifts: multi_stage_leaf_measure.time_shifts.clone(), - query: rewritten, - }); - NodeRewriteResult::rewritten(new_leaf.as_plan_node()) - } else { - has_unrewritten_leaf = true; - NodeRewriteResult::stop() - } - } - PlanNode::AggregateMultipliedSubquery(agg) => { - if let Some(rewritten) = self.try_rewrite_schema_and_filter( - &agg.schema, - &root_filter, - compiled_pre_aggregations, - )? { - let new_agg = Rc::new(AggregateMultipliedSubquery { - schema: agg.schema.clone(), - keys_subquery: agg.keys_subquery.clone(), - source: agg.source.clone(), - dimension_subqueries: agg.dimension_subqueries.clone(), - pre_aggregation_override: Some(rewritten), - }); - NodeRewriteResult::rewritten(new_agg.as_plan_node()) - } else { - has_unrewritten_leaf = true; - NodeRewriteResult::stop() - } - } - PlanNode::LogicalMultiStageMember(_) => NodeRewriteResult::pass(), - _ => NodeRewriteResult::stop(), - }; - Ok(res) - })?; - rewritten_multistages.push(rewritten); - } + let root_filter = plan.root().filter().clone(); + let name_to_idx: HashMap<&str, usize> = plan + .ctes() + .iter() + .enumerate() + .map(|(i, m)| (m.name.as_str(), i)) + .collect(); - if has_unrewritten_leaf { - // Rollback usages added during failed attempt - self.usages.truncate(saved_usages_len); - self.usage_counter = saved_counter; - return Ok(None); + let mut rewritten: HashMap = HashMap::new(); + let mut visited: HashSet = HashSet::new(); + let mut queue: VecDeque = VecDeque::new(); + for r in Self::query_source_refs(plan.root()) { + queue.push_back(r); } - let source = if let QuerySource::FullKeyAggregate(full_key_aggregate) = query.source() { - let result = FullKeyAggregate::builder() - .schema(full_key_aggregate.schema().clone()) - .use_full_join_and_coalesce(full_key_aggregate.use_full_join_and_coalesce()) - .multi_stage_subquery_refs(full_key_aggregate.multi_stage_subquery_refs().clone()) - .build(); - Rc::new(result).into() - } else { - query.source().clone() - }; + while let Some(name) = queue.pop_front() { + if !visited.insert(name.clone()) { + continue; + } + let Some(&idx) = name_to_idx.get(name.as_str()) else { + // Ref not backed by a member in this pool (shouldn't + // happen with well-formed plans). Skip. + continue; + }; + let member = &plan.ctes()[idx]; + match self.visit_member_body( + &member.body, + &root_filter, + compiled_pre_aggregations, + &mut queue, + )? { + CteRewriteOutcome::Rewritten(new_body) => { + rewritten.insert(name, new_body); + } + CteRewriteOutcome::Keep => { + rewritten.insert(name, member.body.clone()); + } + CteRewriteOutcome::NotMatched => { + self.usages.truncate(saved_usages_len); + self.usage_counter = saved_counter; + return Ok(None); + } + } + } - // Reject mixed external/non-external pre-aggregation usages + // Reject mixed external/non-external pre-aggregation usages. let new_usages = &self.usages[saved_usages_len..]; if !new_usages.is_empty() { let first_external = new_usages[0].external(); @@ -276,15 +242,109 @@ impl PreAggregationOptimizer { } } - let result = Query::builder() - .multistage_members(rewritten_multistages) - .schema(query.schema().clone()) - .filter(query.filter().clone()) - .modifers(query.modifers().clone()) - .source(source) - .build(); + // Preserve original CTE order; drop members that were unreachable + // from the root after rewrites (they're orphans of replaced bodies). + let new_ctes: Vec<_> = plan + .ctes() + .iter() + .filter_map(|m| { + rewritten.get(&m.name).map(|body| { + Rc::new(LogicalMultiStageMember { + name: m.name.clone(), + body: body.clone(), + }) + }) + }) + .collect(); - Ok(Some(Rc::new(result))) + Ok(Some(LogicalPlan::new(new_ctes, plan.root().clone()))) + } + + /// Compute the rewrite outcome for a single CTE body and push the + /// names of further refs it transitively reaches into `queue` (only + /// when the body is kept — replaced bodies break the chain). + fn visit_member_body( + &mut self, + body: &MultiStageMemberBody, + root_filter: &Rc, + compiled_pre_aggregations: &[Rc], + queue: &mut VecDeque, + ) -> Result { + match body { + MultiStageMemberBody::Query(q) => match q.kind().pre_agg_rewrite() { + PreAggregationRewriteRole::NoRewrite => Ok(CteRewriteOutcome::Keep), + PreAggregationRewriteRole::PassThrough => { + for r in Self::query_source_refs(q) { + queue.push_back(r); + } + Ok(CteRewriteOutcome::Keep) + } + PreAggregationRewriteRole::Leaf => { + let time_shifts = q.modifers().time_shifts.clone(); + let mut matched: Option> = None; + for pre_aggregation in compiled_pre_aggregations.iter() { + let external = pre_aggregation.external.unwrap_or(false); + let date_range = Self::extract_date_range( + &q.filter(), + &self.query_tools, + &time_shifts, + external, + ); + if let Some(rewritten) = + self.try_rewrite_simple_query(q, pre_aggregation, date_range)? + { + matched = Some(rewritten); + break; + } + } + if let Some(rewritten) = matched { + Ok(CteRewriteOutcome::Rewritten(MultiStageMemberBody::Query( + rewritten, + ))) + } else { + Ok(CteRewriteOutcome::NotMatched) + } + } + PreAggregationRewriteRole::WholeSubtree => { + if let Some(rewritten) = self.try_rewrite_schema_and_filter( + q.schema(), + root_filter, + compiled_pre_aggregations, + )? { + Ok(CteRewriteOutcome::Rewritten(MultiStageMemberBody::Query( + rewritten, + ))) + } else { + Ok(CteRewriteOutcome::NotMatched) + } + } + }, + MultiStageMemberBody::TimeSeries(ts) => { + if let Some(get_range) = ts.get_date_range_multistage_ref() { + queue.push_back(get_range.clone()); + } + Ok(CteRewriteOutcome::Keep) + } + MultiStageMemberBody::RollingWindow(rw) => { + queue.push_back(rw.time_series_input.name().clone()); + queue.push_back(rw.measure_input.name().clone()); + Ok(CteRewriteOutcome::Keep) + } + } + } + + /// Returns the CTE names a Query consumes through its source. Only + /// `FullKeyAggregate` sources hold refs; everything else points at + /// base tables / joins. + fn query_source_refs(query: &Rc) -> Vec { + let QuerySource::FullKeyAggregate(fk) = query.source() else { + return Vec::new(); + }; + let mut refs: Vec = fk.data_inputs().iter().map(|r| r.name().clone()).collect(); + if let Some(keys_ref) = fk.keys_subquery_ref() { + refs.push(keys_ref.name().clone()); + } + refs } fn make_pre_aggregation_source( @@ -504,9 +564,9 @@ impl PreAggregationOptimizer { &self, measures: &Vec>, pre_aggregation: &CompiledPreAggregation, - only_additive: bool, + only_addictive: bool, ) -> Result>, CubeError> { - let mut matcher = MeasureMatcher::new(pre_aggregation, only_additive); + let mut matcher = MeasureMatcher::new(pre_aggregation, only_addictive); for measure in measures.iter() { if !matcher.try_match(measure)? { return Ok(None); @@ -536,3 +596,9 @@ impl PreAggregationOptimizer { Ok(result) } } + +enum CteRewriteOutcome { + Rewritten(MultiStageMemberBody), + Keep, + NotMatched, +} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/original_sql_collector.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/original_sql_collector.rs index 74ec6bd2508ed..3845cfce677dc 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/original_sql_collector.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/original_sql_collector.rs @@ -13,8 +13,11 @@ impl OriginalSqlCollector { Self { query_tools } } - pub fn collect(&mut self, plan: &Rc) -> Result, CubeError> { - let cube_names = collect_cube_names_from_node(&plan)?; + pub fn collect( + &mut self, + plan: &Rc, + ) -> Result, CubeError> { + let cube_names = collect_cube_names_from_plan(plan)?; let mut result = HashMap::new(); for cube_name in cube_names.iter() { let pre_aggregations = self diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/plan.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/plan.rs new file mode 100644 index 0000000000000..5b0730c40dfc1 --- /dev/null +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/plan.rs @@ -0,0 +1,57 @@ +use super::*; +use std::rc::Rc; + +/// Root container of a planned query: a WITH-clause `ctes` pool plus a +/// `root` Query that consumes them. All CTEs the plan needs live here +/// at one level — there's no nested pool inside member bodies. FK refs +/// inside Queries reach pool entries by name; tree walkers cross the +/// `LogicalPlan ↔ PlanNode` boundary explicitly. +#[derive(Clone)] +pub struct LogicalPlan { + pub ctes: Vec>, + pub root: Rc, +} + +impl LogicalPlan { + pub fn new(ctes: Vec>, root: Rc) -> Rc { + Rc::new(Self { ctes, root }) + } + + pub fn ctes(&self) -> &Vec> { + &self.ctes + } + + pub fn root(&self) -> &Rc { + &self.root + } + + pub fn with_root(self: &Rc, root: Rc) -> Rc { + Rc::new(Self { + ctes: self.ctes.clone(), + root, + }) + } + + pub fn with_ctes(self: &Rc, ctes: Vec>) -> Rc { + Rc::new(Self { + ctes, + root: self.root.clone(), + }) + } +} + +impl PrettyPrint for LogicalPlan { + fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { + result.println("LogicalPlan:", state); + let inner = state.new_level(); + let details = inner.new_level(); + if !self.ctes.is_empty() { + result.println("ctes:", &inner); + for cte in self.ctes.iter() { + cte.pretty_print(result, &details); + } + } + result.println("root:", &inner); + self.root.pretty_print(result, &details); + } +} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/query.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/query.rs index c0ddeacf9116b..22ebc6fb3c33c 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/query.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/query.rs @@ -9,17 +9,32 @@ use typed_builder::TypedBuilder; /// ungrouped), and the multi-stage CTEs the source depends on. #[derive(Clone, TypedBuilder)] pub struct Query { + /// Computed-dimension CTE references this Query consumes. Each ref + /// carries its own join strategy (`OnPrimaryKeys` for the ex-DSQ + /// pattern, `OnOuterDimensions` for the multi-stage-dim pattern). + /// At render time the processor passes these into the source + /// rendering context — they don't get embedded into `LogicalJoin` / + /// `FullKeyAggregate`. Bodies live on the surrounding `LogicalPlan`. #[builder(default)] - multistage_members: Vec>, + multi_stage_dimensions: Vec>, schema: Rc, + #[builder(default)] filter: Rc, + #[builder(default)] modifers: Rc, source: QuerySource, + /// Explicit role of this Query in the multi-stage pipeline. Planner + /// places set this at construction; consumers (QueryProcessor, + /// pre-aggregation optimizer) match on it to pick the right render + /// path. Role-specific data (partition_by, multi-stage dimension, + /// etc.) lives inside the matching variant. + #[builder(default)] + kind: QueryKind, } impl Query { - pub fn multistage_members(&self) -> &Vec> { - &self.multistage_members + pub fn multi_stage_dimensions(&self) -> &Vec> { + &self.multi_stage_dimensions } pub fn schema(&self) -> &Rc { &self.schema @@ -36,6 +51,35 @@ impl Query { pub fn set_source(&mut self, source: QuerySource) { self.source = source; } + pub fn kind(&self) -> &QueryKind { + &self.kind + } + + pub fn with_modifers(self: &Rc, modifers: Rc) -> Rc { + Rc::new(Self { + multi_stage_dimensions: self.multi_stage_dimensions.clone(), + schema: self.schema.clone(), + filter: self.filter.clone(), + modifers, + source: self.source.clone(), + kind: self.kind.clone(), + }) + } + + /// Replace the published `multi_stage_dimensions` refs. + pub fn with_multi_stage_dimensions( + self: &Rc, + multi_stage_dimensions: Vec>, + ) -> Rc { + Rc::new(Self { + multi_stage_dimensions, + schema: self.schema.clone(), + filter: self.filter.clone(), + modifers: self.modifers.clone(), + source: self.source.clone(), + kind: self.kind.clone(), + }) + } } impl LogicalNode for Query { @@ -44,24 +88,18 @@ impl LogicalNode for Query { } fn inputs(&self) -> Vec { - QueryInputPacker::pack(self) + vec![self.source.as_plan_node()] } fn with_inputs(self: Rc, inputs: Vec) -> Result, CubeError> { - let QueryInputUnPacker { - multistage_members, - source, - } = QueryInputUnPacker::new(&self, &inputs)?; - + check_inputs_len(&inputs, 1, self.node_name())?; Ok(Rc::new(Self { - multistage_members: multistage_members - .iter() - .map(|member| member.clone().into_logical_node()) - .collect::, _>>()?, + multi_stage_dimensions: self.multi_stage_dimensions.clone(), schema: self.schema.clone(), filter: self.filter.clone(), modifers: self.modifers.clone(), - source: self.source.with_plan_node(source.clone())?, + source: self.source.with_plan_node(inputs[0].clone())?, + kind: self.kind.clone(), })) } @@ -77,50 +115,16 @@ impl LogicalNode for Query { } } -pub struct QueryInputPacker; - -impl QueryInputPacker { - pub fn pack(query: &Query) -> Vec { - let mut result = vec![]; - result.extend( - query - .multistage_members - .iter() - .map(|member| member.as_plan_node()), - ); - result.push(query.source.as_plan_node()); - result - } -} -pub struct QueryInputUnPacker<'a> { - multistage_members: &'a [PlanNode], - source: &'a PlanNode, -} - -impl<'a> QueryInputUnPacker<'a> { - pub fn new(query: &Query, inputs: &'a Vec) -> Result { - check_inputs_len(&inputs, Self::inputs_len(query), query.node_name())?; - let multistage_members = &inputs[0..query.multistage_members.len()]; - let source = &inputs[query.multistage_members.len()]; - Ok(Self { - multistage_members, - source, - }) - } - fn inputs_len(query: &Query) -> usize { - query.multistage_members.len() + 1 - } -} - impl PrettyPrint for Query { fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { result.println("Query: ", state); let state = state.new_level(); let details_state = state.new_level(); - if !self.multistage_members.is_empty() { - result.println("multistage_members:", &state); - for member in self.multistage_members.iter() { - member.pretty_print(result, &details_state); + self.kind.pretty_print(result, &state); + if !self.multi_stage_dimensions.is_empty() { + result.println("multi_stage_dimensions:", &state); + for msd in self.multi_stage_dimensions.iter() { + msd.pretty_print(result, &details_state); } } diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/query_kind.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/query_kind.rs new file mode 100644 index 0000000000000..bb78def7935f3 --- /dev/null +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/query_kind.rs @@ -0,0 +1,97 @@ +use super::pretty_print::*; +use super::PreAggregationRewriteRole; +use crate::planner::MemberSymbol; +use std::rc::Rc; + +/// Stage Calculation flavour — what operation the stage performs over its +/// `FullKeyAggregate`-of-CTE-refs source. +#[derive(Clone)] +pub enum StageKind { + /// Re-aggregation (GROUP BY all dims + measure agg-wrap). + Aggregation, + /// `RANK()` window over the FK-of-CTE-refs. + Rank { partition_by: Vec> }, + /// Generic window function over the FK-of-CTE-refs. + Window { partition_by: Vec> }, + /// Computes a multi-stage dimension. + DimensionCalc { + multi_stage_dimension: Rc, + }, +} + +/// Raw-fact body flavour inside the aggregate-multiplied pipeline. +#[derive(Clone, Copy, PartialEq, Eq, Debug)] +pub enum FactKind { + /// `SELECT DISTINCT outer_dims + pk_dims FROM join` — ex-KeysSubQuery. + Keys, + /// `SELECT pk_dims + raw measures FROM join` with `set_ungrouped_measure` + /// — ex-MeasureSubquery. + Measures, +} + +/// Explicit role of a Query in the multi-stage pipeline. Variants carry +/// their own role-specific data — there is no longer a `source` / +/// `multistage_members` shape predicate to interpret. +#[derive(Clone, Default)] +pub enum QueryKind { + /// Top-level / leaf-wrapper sitting over a non-empty FullKeyAggregate + /// of CTE refs. Multi-stage CTE bodies live in + /// `Query.multistage_members` (common to any Query flavour) — both + /// the FK-of-CTE-refs members and the multi-stage-dim bodies are + /// rendered there. + TopLevelOverCtes, + /// Multi-stage Stage Calculation; the nested `StageKind` picks the + /// flavour and carries the partition / dimension members it needs. + Stage(StageKind), + /// Aggregate-multiplied subquery body — FullKeyAggregate joining a + /// MeasureSubquery CTE to a KeysSubQuery CTE on pk dims. WholeSubtree + /// rewrite. + AggregateMultiplied, + /// Plain aggregating leaf over a LogicalJoin source — top-level + /// SimpleQuery and regular_measures_subquery bodies. + #[default] + LeafOverJoin, + /// Raw fact body inside the aggregate-multiplied pipeline. `FactKind` + /// picks Keys (distinct projection) or Measures (ungrouped raw + /// columns). NoRewrite — the parent AggregateMultiplied is the + /// rewrite unit. + InternalFact(FactKind), + /// Pre-aggregation-backed leaf — output of the pre-agg optimizer. + PreAggregationLeaf, +} + +impl QueryKind { + /// How the pre-aggregation optimizer should treat this Query when + /// walking a multi-stage tree. + pub fn pre_agg_rewrite(&self) -> PreAggregationRewriteRole { + match self { + Self::TopLevelOverCtes | Self::LeafOverJoin | Self::PreAggregationLeaf => { + PreAggregationRewriteRole::Leaf + } + Self::Stage(_) => PreAggregationRewriteRole::PassThrough, + Self::AggregateMultiplied => PreAggregationRewriteRole::WholeSubtree, + Self::InternalFact(_) => PreAggregationRewriteRole::NoRewrite, + } + } + + pub fn label(&self) -> &'static str { + match self { + Self::TopLevelOverCtes => "TopLevelOverCtes", + Self::Stage(StageKind::Aggregation) => "StageAggregation", + Self::Stage(StageKind::Rank { .. }) => "StageRank", + Self::Stage(StageKind::Window { .. }) => "StageWindow", + Self::Stage(StageKind::DimensionCalc { .. }) => "StageDimensionCalc", + Self::AggregateMultiplied => "AggregateMultiplied", + Self::LeafOverJoin => "LeafOverJoin", + Self::InternalFact(FactKind::Keys) => "InternalFact(Keys)", + Self::InternalFact(FactKind::Measures) => "InternalFact(Measures)", + Self::PreAggregationLeaf => "PreAggregationLeaf", + } + } +} + +impl PrettyPrint for QueryKind { + fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { + result.println(&format!("kind: {}", self.label()), state); + } +} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/schema.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/schema.rs index 0a11a28aab067..e2cb61f1f0023 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/schema.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/schema.rs @@ -98,6 +98,44 @@ impl LogicalSchema { Ok(result) } + /// Sorted full-names of all multi-stage dimensions in the schema, with + /// reference chains resolved. Used as a stable lookup key for the + /// physical builder's multi-stage dimension schema cache. + pub fn multi_stage_dimensions_resolved_names(&self) -> Result, CubeError> { + let mut result = vec![]; + for dim in self.all_dimensions() { + if has_multi_stage_members(dim, true)? { + result.push(dim.clone().resolve_reference_chain().full_name()); + } + } + result.sort(); + Ok(result) + } + + /// Dimensions used to LEFT JOIN a multi-stage-dimension CTE into the + /// fact join: the dimension's `add_group_by` (if it has one) plus all + /// non-multi-stage dimensions of this schema, deduplicated. + pub fn multi_stage_join_dimensions( + &self, + multi_stage_dimension: &Rc, + ) -> Result>, CubeError> { + let mut result = if let Ok(dimension) = multi_stage_dimension.as_dimension() { + dimension.add_group_by().clone().unwrap_or_default() + } else { + vec![] + }; + for dim in self.all_dimensions() { + if !has_multi_stage_members(dim, true)? { + result.push(dim.clone()); + } + } + let result = result + .into_iter() + .unique_by(|d| d.full_name()) + .collect_vec(); + Ok(result) + } + /// Get the member symbol at a given position (as returned by find_member_positions). /// Position ordering: dimensions, then time_dimensions, then measures. pub fn get_member_at_position(&self, position: usize) -> Option> { diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/visitor/visitor.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/visitor/visitor.rs index 4a3eeb81e9515..4a90ebc613aa3 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/visitor/visitor.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/visitor/visitor.rs @@ -40,6 +40,17 @@ impl LogicalPlanVisitor { self.visit(&mut wrapper, node) } + /// Visit a subtree rooted at a `PlanNode` directly. Used by callers + /// outside the `LogicalNode` trait (e.g. `LogicalPlan` which doesn't + /// fit the trait because it lives above the PlanNode hierarchy). + pub fn visit_plan_node( + &self, + node_visitor: &mut T, + node: &PlanNode, + ) -> Result<(), CubeError> { + self.visit_impl(node_visitor, node) + } + fn visit_impl( &self, node_visitor: &mut T, diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/select.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/select.rs index aaece48b91827..bdb3cd7a23d8a 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/select.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/select.rs @@ -55,6 +55,12 @@ impl Select { self.schema.clone() } + pub fn with_ctes(self: &Rc, ctes: Vec>) -> Rc { + let mut clone = (**self).clone(); + clone.ctes = ctes; + Rc::new(clone) + } + pub fn to_sql(&self, templates: &PlanSqlTemplates) -> Result { let projection = if !self.projection_columns.is_empty() { self.projection_columns diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/builder.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/builder.rs index 78e2cbe7186a4..a5f4a77038f5e 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/builder.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/builder.rs @@ -7,7 +7,6 @@ use crate::physical_plan::sql_nodes::SqlNodesFactory; use crate::physical_plan::ReferencesBuilder; use crate::physical_plan::VisitorContext; use crate::physical_plan::*; -use crate::physical_plan_builder::context::MultiStageDimensionContext; use crate::planner::query_properties::OrderByItem; use crate::planner::query_tools::QueryTools; use crate::planner::sql_templates::PlanSqlTemplates; @@ -55,9 +54,40 @@ impl PhysicalPlanBuilder { processor.process(logical_node, context) } + pub(super) fn resolve_partition_refs( + &self, + partition_by: &[Rc], + references_builder: &ReferencesBuilder, + ) -> Result, CubeError> { + let templates = &self.plan_sql_templates; + partition_by + .iter() + .map(|dim| -> Result<_, CubeError> { + let reference = references_builder + .find_reference_for_member(dim, &None) + .ok_or_else(|| { + CubeError::internal(format!( + "Alias not found for partition_by dimension {}", + dim.full_name() + )) + })?; + let table_ref = if let Some(table_name) = reference.source() { + format!("{}.", templates.quote_identifier(table_name)?) + } else { + String::new() + }; + Ok(format!( + "{}{}", + table_ref, + templates.quote_identifier(&reference.name())? + )) + }) + .collect() + } + pub fn build( &self, - logical_plan: Rc, + logical_plan: Rc, original_sql_pre_aggregations: HashMap, total_query: bool, ) -> Result, CubeError> { @@ -88,7 +118,7 @@ impl PhysicalPlanBuilder { fn build_impl( &self, - logical_plan: Rc, + logical_plan: Rc, context: &PushDownBuilderContext, ) -> Result, CubeError> { self.process_node(logical_plan.as_ref(), context) @@ -115,56 +145,77 @@ impl PhysicalPlanBuilder { } } - pub(super) fn add_subquery_join( + /// Add a `LEFT JOIN ON ...` for a multi-stage-dim CTE ref to a + /// cube-join chain. Used for the `OnPrimaryKeys` flavour: the cube + /// the ref keys against has already been added to `join_builder`, + /// and we LEFT-join the CTE on its primary keys. + /// + /// Each side of the ON condition is resolved per-source — CTE side + /// via the CTE's projected schema, cube side as a direct column + /// reference to `cube_alias.`. We can't route the + /// cube-side through a `MemberExpression`: once the outer SELECT + /// resolves the same PK dim, `render_references` would map it to + /// the CTE column and the ON would degenerate into a self-equal. + pub(super) fn add_multi_stage_dimension_pk_join( &self, - dimension_subquery: Rc, + ref_name: &str, + pk_dimensions: &[Rc], + cube_alias: &str, join_builder: &mut JoinBuilder, context: &PushDownBuilderContext, ) -> Result<(), CubeError> { - let mut context = context.clone(); - context.dimensions_query = false; - context.measure_subquery = true; - let sub_query = self.process_node(dimension_subquery.query.as_ref(), &context)?; - let dim_name = dimension_subquery.subquery_dimension.name(); - let cube_name = dimension_subquery.subquery_dimension.cube_name(); - let primary_keys_dimensions = &dimension_subquery.primary_keys_dimensions; - let sub_query_alias = format!("{cube_name}_{dim_name}_subquery"); - let conditions = primary_keys_dimensions + // Body is rendered once on the top-level Query as a CTE; here we + // just LEFT JOIN that CTE by name. Order contract: the top-level + // `QueryProcessor` MUST publish the CTE (via `add_cte_schema`) + // before any reference site gets to call this. + let cte_schema = context.get_cte_schema(ref_name)?; + let conditions = pk_dimensions .iter() .map(|dim| -> Result<_, CubeError> { - let alias_in_sub_query = sub_query.schema().resolve_member_alias(&dim); + let alias_in_sub_query = cte_schema.resolve_member_alias(dim); let sub_query_ref = Expr::Reference(QualifiedColumnName::new( - Some(sub_query_alias.clone()), - alias_in_sub_query.clone(), + Some(ref_name.to_string()), + alias_in_sub_query, )); - - Ok(vec![(sub_query_ref, Expr::new_member(dim.clone()))]) + let cube_side_ref = Expr::Reference(QualifiedColumnName::new( + Some(cube_alias.to_string()), + dim.name(), + )); + Ok(vec![(sub_query_ref, cube_side_ref)]) }) .collect::, _>>()?; - join_builder.left_join_subselect( - sub_query, - sub_query_alias, + join_builder.left_join_table_reference( + ref_name.to_string(), + cte_schema, + Some(ref_name.to_string()), JoinCondition::new_dimension_join(conditions, false), ); Ok(()) } - pub(super) fn add_multistage_dimension_join( + /// LEFT-JOIN a multi-stage dimension CTE onto the existing join + /// chain, using the explicit `join_dimensions` list carried by + /// `MultiStageDimensionJoin::OnOuterDimensions`. For each join + /// dimension we map the CTE-side column (via the CTE's projected + /// schema) to the outer-side expression (resolved against the + /// current join's references). + pub(super) fn add_multistage_outer_dimensions_join( &self, - dimension_schema: &Rc, + ms_ref: &MultiStageDimensionRef, + join_dimensions: &[Rc], join_builder: &mut JoinBuilder, context: &PushDownBuilderContext, ) -> Result<(), CubeError> { + let cte_schema = context.get_cte_schema(&ms_ref.name)?; let original_join = join_builder.clone().build(); let references_builder = ReferencesBuilder::new(From::new_from_join(original_join)); - let conditions = dimension_schema - .join_dimensions + let conditions = join_dimensions .iter() .map(|dim| -> Result<_, CubeError> { - let alias_in_cte = dimension_schema.schema.resolve_member_alias(&dim); + let alias_in_cte = cte_schema.resolve_member_alias(dim); let sub_query_ref = Expr::Reference(QualifiedColumnName::new( - Some(dimension_schema.name.clone()), + Some(ms_ref.name.clone()), alias_in_cte, )); @@ -192,33 +243,31 @@ impl PhysicalPlanBuilder { .collect::, _>>()?; join_builder.left_join_table_reference( - dimension_schema.name.clone(), - dimension_schema.schema.clone(), + ms_ref.name.clone(), + cte_schema, None, JoinCondition::new_dimension_join(conditions, false), ); Ok(()) } - pub(super) fn resolve_subquery_dimensions_references( + /// Register outer-scope render references for each multi-stage-dim + /// CTE this Query consumes. + pub(super) fn resolve_multi_stage_dimension_references( &self, - dimension_subqueries: &Vec>, + multi_stage_dimensions: &Vec>, references_builder: &ReferencesBuilder, context_factory: &mut SqlNodesFactory, ) -> Result<(), CubeError> { - for dimension_subquery in dimension_subqueries.iter() { - if let Some(dim_ref) = references_builder.find_reference_for_member( - &dimension_subquery.measure_for_subquery_dimension, - &None, - ) { - context_factory.add_render_reference( - dimension_subquery.subquery_dimension.full_name(), - dim_ref, - ); + for ms_dim in multi_stage_dimensions.iter() { + if let Some(dim_ref) = + references_builder.find_reference_for_member(&ms_dim.dimension, &None) + { + context_factory.add_render_reference(ms_dim.dimension.full_name(), dim_ref); } else { return Err(CubeError::internal(format!( - "Can't find source for subquery dimension {}", - dimension_subquery.subquery_dimension.full_name() + "Can't find source for multi-stage dimension {}", + ms_dim.dimension.full_name() ))); } } diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/context.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/context.rs index 21254f4c82850..75d1af1af54da 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/context.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/context.rs @@ -1,5 +1,6 @@ use cubenativeutils::CubeError; +use crate::logical_plan::MultiStageDimensionRef; use crate::physical_plan::sql_nodes::SqlNodesFactory; use crate::physical_plan::Schema; use crate::planner::planners::multi_stage::TimeShiftState; @@ -7,13 +8,6 @@ use crate::planner::MemberSymbol; use std::collections::HashMap; use std::rc::Rc; -#[derive(Clone, Debug, Default)] -pub struct MultiStageDimensionContext { - pub name: String, - pub schema: Rc, - pub join_dimensions: Vec>, -} - #[derive(Clone, Debug, Default)] pub(super) struct PushDownBuilderContext { pub alias_prefix: Option, @@ -24,9 +18,17 @@ pub(super) struct PushDownBuilderContext { pub required_measures: Option>>, pub dimensions_query: bool, pub measure_subquery: bool, - pub multi_stage_schemas: HashMap>, - pub multi_stage_dimension_schemas: HashMap, Rc>, - pub multi_stage_dimensions: Vec, + /// Schemas of all CTEs published on the top-level Query: multi-stage + /// member CTEs, dimension-subquery CTEs and measure-subquery CTEs share + /// this storage. Lookup is by CTE alias / name; all three kinds are + /// interchangeable as table references at the SQL level. + pub cte_schemas: HashMap>, + /// MS-dim refs the current Query consumes. The source-render code + /// reads these out to wire `OnPrimaryKeys` LEFT JOINs inside the + /// cube-join chain (`LogicalJoin`) and `OnOuterDimensions` LEFT + /// JOINs at the chain tail. QueryProcessor sets the list before + /// invoking `process_node(source)`. + pub multi_stage_dimension_refs: Vec>, } impl PushDownBuilderContext { @@ -46,62 +48,17 @@ impl PushDownBuilderContext { Ok(factory) } - pub fn add_multi_stage_schema(&mut self, name: String, schema: Rc) { - self.multi_stage_schemas.insert(name, schema); - } - - pub fn remove_multi_stage_dimensions(&mut self) { - self.multi_stage_dimensions = Vec::new(); - } - - pub fn add_multi_stage_dimension(&mut self, name: String) { - self.multi_stage_dimensions.push(name); - } - - pub fn get_multi_stage_dimensions( - &self, - ) -> Result>, CubeError> { - if self.multi_stage_dimensions.is_empty() { - return Ok(None); - } - let mut dimensions_to_resolve = self.multi_stage_dimensions.clone(); - dimensions_to_resolve.sort(); - if let Some(schema) = self - .multi_stage_dimension_schemas - .get(&dimensions_to_resolve) - { - Ok(Some(schema.clone())) - } else { - Err(CubeError::internal(format!( - "Cannot find source for resolve multi stage dimensions {}", - dimensions_to_resolve.join(", ") - ))) - } - } - - pub fn add_multi_stage_dimension_schema( - &mut self, - resolved_dimensions: Vec, - cte_name: String, - join_dimensions: Vec>, - schema: Rc, - ) { - self.multi_stage_dimension_schemas.insert( - resolved_dimensions, - Rc::new(MultiStageDimensionContext { - name: cte_name, - join_dimensions, - schema, - }), - ); + pub fn add_cte_schema(&mut self, name: String, schema: Rc) { + self.cte_schemas.insert(name, schema); } - pub fn get_multi_stage_schema(&self, name: &str) -> Result, CubeError> { - if let Some(schema) = self.multi_stage_schemas.get(name) { + pub fn get_cte_schema(&self, name: &str) -> Result, CubeError> { + if let Some(schema) = self.cte_schemas.get(name) { Ok(schema.clone()) } else { Err(CubeError::internal(format!( - "Cannot find schema for multi stage cte {}", + "CTE schema for `{}` not found — caller must publish it on \ + the top-level Query before any reference site is processed", name ))) } diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/processors/aggregate_multiplied_subquery.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/processors/aggregate_multiplied_subquery.rs deleted file mode 100644 index a471a8261c780..0000000000000 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/processors/aggregate_multiplied_subquery.rs +++ /dev/null @@ -1,205 +0,0 @@ -use super::super::{LogicalNodeProcessor, ProcessableNode, PushDownBuilderContext}; -use crate::logical_plan::{AggregateMultipliedSubquery, AggregateMultipliedSubquerySource}; -use crate::physical_plan::ReferencesBuilder; -use crate::physical_plan::VisitorContext; -use crate::physical_plan::{ - Expr, From, JoinBuilder, JoinCondition, MemberExpression, QualifiedColumnName, Select, - SelectBuilder, -}; -use crate::physical_plan_builder::PhysicalPlanBuilder; -use cubenativeutils::CubeError; -use std::rc::Rc; - -pub struct AggregateMultipliedSubqueryProcessor<'a> { - builder: &'a PhysicalPlanBuilder, -} - -impl<'a> LogicalNodeProcessor<'a, AggregateMultipliedSubquery> - for AggregateMultipliedSubqueryProcessor<'a> -{ - type PhysycalNode = Rc; - fn new(builder: &'a PhysicalPlanBuilder) -> Self { - Self { builder } - } - - fn process( - &self, - keys_subquery: &KeysSubQuery, - context: &PushDownBuilderContext, - ) -> Result { - let query_tools = self.builder.query_tools(); - let alias_prefix = Some(format!( - "{}_key", - query_tools.alias_for_cube(&keys_subquery.pk_cube().cube().name())? - )); - - let mut context = context.clone(); - context.alias_prefix = alias_prefix; - - let mut context_factory = context.make_sql_nodes_factory()?; - let source = self - .builder - .process_node(keys_subquery.source().as_ref(), &context)?; - - //FIXME duplication with QueryProcessor - let all_symbols = all_symbols(&keys_subquery.schema(), &keys_subquery.filter()); - let calc_group_dims = collect_calc_group_dims_from_nodes(all_symbols.iter())?; - - let filter = keys_subquery.filter().all_filters(); - let calc_groups_items = calc_group_dims.into_iter().map(|dim| { - let values = get_filtered_values(&dim, &filter); - CalcGroupItem { - symbol: dim, - values, - } - }); - for item in calc_groups_items - .clone() - .filter(|itm| itm.values.len() == 1) - { - context_factory.add_render_reference(item.symbol.full_name(), item.values[0].clone()); - } - let calc_groups_to_join = calc_groups_items - .filter(|itm| itm.values.len() > 1) - .collect_vec(); - let source = if calc_groups_to_join.is_empty() { - source - } else { - let groups_join = CalcGroupsJoin::try_new(source, calc_groups_to_join)?; - From::new_from_calc_groups_join(groups_join) - }; - - let references_builder = ReferencesBuilder::new(source.clone()); - let mut select_builder = SelectBuilder::new(source); - self.builder.resolve_subquery_dimensions_references( - &keys_subquery.source().dimension_subqueries(), - &references_builder, - &mut context_factory, - )?; - for member in keys_subquery.schema().all_dimensions() { - let alias = member.alias(); - references_builder.resolve_references_for_member( - member.clone(), - &None, - context_factory.render_references_mut(), - )?; - select_builder.add_projection_member(member, Some(alias)); - } - - if !context.dimensions_query { - for member in keys_subquery.primary_keys_dimensions().iter() { - let alias = member.alias(); - references_builder.resolve_references_for_member( - member.clone(), - &None, - context_factory.render_references_mut(), - )?; - select_builder.add_projection_member(member, Some(alias)); - } - } - - select_builder.set_distinct(); - select_builder.set_filter(filter); - let res = Rc::new(select_builder.build(query_tools.clone(), context_factory)); - Ok(res) - } -} - -impl ProcessableNode for KeysSubQuery { - type ProcessorType<'a> = KeysSubQueryProcessor<'a>; -} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/processors/logical_join.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/processors/logical_join.rs index 9a04cb0834ce4..b75dc707428a4 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/processors/logical_join.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/processors/logical_join.rs @@ -1,5 +1,5 @@ use super::super::{LogicalNodeProcessor, ProcessableNode, PushDownBuilderContext}; -use crate::logical_plan::LogicalJoin; +use crate::logical_plan::{LogicalJoin, MultiStageDimensionJoin}; use crate::physical_plan::{From, JoinBuilder, JoinCondition}; use crate::physical_plan_builder::PhysicalPlanBuilder; use crate::planner::SqlJoinCondition; @@ -21,25 +21,23 @@ impl<'a> LogicalNodeProcessor<'a, LogicalJoin> for LogicalJoinProcessor<'a> { logical_join: &LogicalJoin, context: &PushDownBuilderContext, ) -> Result { - let multi_stage_dimension = context.get_multi_stage_dimensions()?; + // Partition multi-stage dim refs by their join shape: + // - OnPrimaryKeys: attaches inside the cube-join chain after + // each matching cube (root or joined). + // - OnOuterDimensions: attaches once at the tail of the join, + // keyed on the explicit `join_dimensions` carried by the ref. + let (pk_refs, outer_refs): (Vec<_>, Vec<_>) = context + .multi_stage_dimension_refs + .iter() + .cloned() + .partition(|r| matches!(&r.join, MultiStageDimensionJoin::OnPrimaryKeys { .. })); + if logical_join.root().is_none() { - let res = if let Some(multi_stage_dimension) = &multi_stage_dimension { - From::new_from_table_reference( - multi_stage_dimension.name.clone(), - multi_stage_dimension.schema.clone(), - None, - ) - } else { - From::new_empty() - }; - return Ok(res); + return Ok(From::new_empty()); } let root = logical_join.root().clone().unwrap().cube().clone(); - if logical_join.joins().is_empty() - && logical_join.dimension_subqueries().is_empty() - && multi_stage_dimension.is_none() - { + if logical_join.joins().is_empty() && pk_refs.is_empty() && outer_refs.is_empty() { Ok(From::new_from_cube( root.clone(), Some(root.default_alias_with_prefix(&context.alias_prefix)), @@ -50,44 +48,62 @@ impl<'a> LogicalNodeProcessor<'a, LogicalJoin> for LogicalJoinProcessor<'a> { Some(root.default_alias_with_prefix(&context.alias_prefix)), ); - for dimension_subquery in logical_join - .dimension_subqueries() //TODO move dimension_subquery to + let root_alias = root.default_alias_with_prefix(&context.alias_prefix); + for ms_ref in pk_refs .iter() - .filter(|d| &d.subquery_dimension.cube_name() == root.name()) + .filter(|r| matches_pk_cube(&r.join, root.name())) { - self.builder.add_subquery_join( - dimension_subquery.clone(), + let pk_dims = match &ms_ref.join { + MultiStageDimensionJoin::OnPrimaryKeys { pk_dimensions, .. } => pk_dimensions, + _ => continue, + }; + self.builder.add_multi_stage_dimension_pk_join( + &ms_ref.name, + pk_dims, + &root_alias, &mut join_builder, context, )?; } for join in logical_join.joins().iter() { + let joined_alias = join + .cube() + .cube() + .default_alias_with_prefix(&context.alias_prefix); join_builder.left_join_cube( join.cube().cube().clone(), - Some( - join.cube() - .cube() - .default_alias_with_prefix(&context.alias_prefix), - ), + Some(joined_alias.clone()), JoinCondition::new_base_join(SqlJoinCondition::try_new(join.on_sql().clone())?), ); - for dimension_subquery in logical_join - .dimension_subqueries() + for ms_ref in pk_refs .iter() - .filter(|d| &d.subquery_dimension.cube_name() == join.cube().cube().name()) + .filter(|r| matches_pk_cube(&r.join, join.cube().cube().name())) { - self.builder.add_subquery_join( - dimension_subquery.clone(), + let pk_dims = match &ms_ref.join { + MultiStageDimensionJoin::OnPrimaryKeys { pk_dimensions, .. } => { + pk_dimensions + } + _ => continue, + }; + self.builder.add_multi_stage_dimension_pk_join( + &ms_ref.name, + pk_dims, + &joined_alias, &mut join_builder, context, )?; } } - if let Some(multi_stage_dimension) = &multi_stage_dimension { - self.builder.add_multistage_dimension_join( - multi_stage_dimension, + for ms_ref in outer_refs.iter() { + let dims = match &ms_ref.join { + MultiStageDimensionJoin::OnOuterDimensions { dimensions } => dimensions, + _ => continue, + }; + self.builder.add_multistage_outer_dimensions_join( + ms_ref, + dims, &mut join_builder, - &context, + context, )?; } Ok(From::new_from_join(join_builder.build())) @@ -95,6 +111,15 @@ impl<'a> LogicalNodeProcessor<'a, LogicalJoin> for LogicalJoinProcessor<'a> { } } +fn matches_pk_cube(join: &MultiStageDimensionJoin, cube_name: &str) -> bool { + match join { + MultiStageDimensionJoin::OnPrimaryKeys { + cube_name: target, .. + } => target == cube_name, + _ => false, + } +} + impl ProcessableNode for LogicalJoin { type ProcessorType<'a> = LogicalJoinProcessor<'a>; } diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/processors/measure_subquery.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/processors/measure_subquery.rs deleted file mode 100644 index 825bc1d08e3dc..0000000000000 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/processors/measure_subquery.rs +++ /dev/null @@ -1,62 +0,0 @@ -use super::super::{LogicalNodeProcessor, ProcessableNode, PushDownBuilderContext}; -use crate::logical_plan::MeasureSubquery; -use crate::physical_plan::ReferencesBuilder; -use crate::physical_plan::{Select, SelectBuilder}; -use crate::physical_plan_builder::PhysicalPlanBuilder; -use cubenativeutils::CubeError; -use std::rc::Rc; - -pub struct MeasureSubqueryProcessor<'a> { - builder: &'a PhysicalPlanBuilder, -} - -impl<'a> LogicalNodeProcessor<'a, MeasureSubquery> for MeasureSubqueryProcessor<'a> { - type PhysycalNode = Rc; + fn new(builder: &'a PhysicalPlanBuilder) -> Self { + Self { builder } + } + + fn process( + &self, + logical_plan: &LogicalPlan, + context: &PushDownBuilderContext, + ) -> Result { + let mut context = context.clone(); + let mut ctes = Vec::with_capacity(logical_plan.ctes().len()); + + // Render every CTE body in declaration order — later bodies may + // reference earlier CTE schemas via `context.add_cte_schema`. + // The order mirrors how `cte_state` accumulated them in the planner. + for member in logical_plan.ctes().iter() { + let body_plan = self.render_body(&member.body, &context)?; + let alias = member.name.clone(); + context.add_cte_schema(alias.clone(), body_plan.schema()); + ctes.push(Rc::new(Cte::new(Rc::new(body_plan), alias))); + } + + let root_select = self + .builder + .process_node(logical_plan.root().as_ref(), &context)?; + Ok(root_select.with_ctes(ctes)) + } +} + +impl<'a> PlanProcessor<'a> { + fn render_body( + &self, + body: &MultiStageMemberBody, + context: &PushDownBuilderContext, + ) -> Result { + match body { + MultiStageMemberBody::Query(q) => { + let select = self.builder.process_node(q.as_ref(), context)?; + Ok(QueryPlan::Select(select)) + } + MultiStageMemberBody::TimeSeries(ts) => self.builder.process_node(ts.as_ref(), context), + MultiStageMemberBody::RollingWindow(rw) => { + self.builder.process_node(rw.as_ref(), context) + } + } + } +} + +impl ProcessableNode for LogicalPlan { + type ProcessorType<'a> = PlanProcessor<'a>; +} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/processors/query.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/processors/query.rs index 28bbfbff54f46..6f3037e4f67d2 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/processors/query.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/processors/query.rs @@ -1,12 +1,13 @@ use super::super::{LogicalNodeProcessor, ProcessableNode, PushDownBuilderContext}; -use crate::logical_plan::{all_symbols, MultiStageMemberLogicalType, Query, QuerySource}; +use crate::logical_plan::{all_symbols, FactKind, Query, QueryKind, QuerySource, StageKind}; use crate::physical_plan::{ - CalcGroupItem, CalcGroupsJoin, Cte, Expr, From, MemberExpression, ReferencesBuilder, Select, - SelectBuilder, + CalcGroupItem, CalcGroupsJoin, Expr, From, MemberExpression, QualifiedColumnName, + ReferencesBuilder, Select, SelectBuilder, }; use crate::physical_plan_builder::PhysicalPlanBuilder; use crate::planner::collectors::collect_calc_group_dims_from_nodes; use crate::planner::get_filtered_values; +use crate::planner::MemberSymbol; use cubenativeutils::CubeError; use itertools::Itertools; use std::rc::Rc; @@ -15,16 +16,6 @@ pub struct QueryProcessor<'a> { builder: &'a PhysicalPlanBuilder, } -impl QueryProcessor<'_> { - fn is_over_full_aggregated_source(&self, logical_plan: &Query) -> bool { - match logical_plan.source() { - QuerySource::FullKeyAggregate(fk) => !fk.is_empty(), - QuerySource::PreAggregation(_) => false, - QuerySource::LogicalJoin(_) => false, - } - } -} - impl<'a> LogicalNodeProcessor<'a, Query> for QueryProcessor<'a> { type PhysycalNode = Rc