Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,27 @@ impl LogicalPlanBuilder {
self,
sorts: impl IntoIterator<Item = impl Into<SortExpr>> + Clone,
fetch: Option<usize>,
) -> Result<Self> {
self.sort_with_limit_inner(sorts, fetch, false)
}

/// Apply a sort with option to skip adding missing columns
///
/// This is used by SELECT statements where missing ORDER BY columns are
/// already added by `add_missing_order_by_exprs`.
pub fn sort_with_limit_skip_missing(
self,
sorts: impl IntoIterator<Item = impl Into<SortExpr>> + Clone,
fetch: Option<usize>,
) -> Result<Self> {
self.sort_with_limit_inner(sorts, fetch, true)
}

fn sort_with_limit_inner(
self,
sorts: impl IntoIterator<Item = impl Into<SortExpr>> + Clone,
fetch: Option<usize>,
skip_add_missing_columns: bool,
) -> Result<Self> {
let sorts = rewrite_sort_cols_by_aggs(sorts, &self.plan)?;

Expand All @@ -820,7 +841,7 @@ impl LogicalPlanBuilder {
Ok(())
})?;

if missing_cols.is_empty() {
if missing_cols.is_empty() || skip_add_missing_columns {
return Ok(Self::new(LogicalPlan::Sort(Sort {
expr: normalize_sorts(sorts, &self.plan)?,
input: self.plan,
Expand Down
298 changes: 297 additions & 1 deletion datafusion/sql/src/expr/order_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,18 @@
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;

use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use datafusion_common::tree_node::{
Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
};
use datafusion_common::{
Column, DFSchema, Result, not_impl_err, plan_datafusion_err, plan_err,
Column, DFSchema, DFSchemaRef, Result, not_impl_err, plan_datafusion_err, plan_err,
};
use datafusion_expr::expr::Sort;
use datafusion_expr::{Expr, SortExpr};
use indexmap::IndexSet;
use sqlparser::ast::{
Expr as SQLExpr, OrderByExpr, OrderByOptions, Value, ValueWithSpan,
};
Expand Down Expand Up @@ -117,4 +123,294 @@ impl<S: ContextProvider> SqlToRel<'_, S> {

Ok(sort_expr_vec)
}

/// Add missing ORDER BY expressions to the SELECT list.
///
/// This function handles the case where ORDER BY expressions reference columns
/// or expressions that are not present in the SELECT list. Instead of traversing
/// the plan tree to find projection nodes, it directly adds the missing
/// expressions to the SELECT list.
///
/// # Behavior
///
/// - For aggregate functions (e.g., `SUM(x)`) and window functions, the original
/// expression is added to the SELECT list, and the ORDER BY expression is
/// replaced with a column reference to that expression's output name.
///
/// - For column references that don't exist in the current schema, the column
/// reference itself is added to the SELECT list.
///
/// - If the query uses `SELECT DISTINCT` and there are missing ORDER BY
/// expressions, an error is returned, as this would make the DISTINCT
/// operation ambiguous.
///
/// - Aliases defined in the SELECT list are recognized and used to replace
/// the corresponding expressions in ORDER BY with column references.
///
/// - When `strict` is true (e.g., when GROUP BY is present), ORDER BY
/// expressions must already be in the SELECT list, be an alias, or be an
/// aggregate/window function. Missing expressions will cause an error instead
/// of being added to the SELECT list. This preserves the error message
/// "Column in ORDER BY must be in GROUP BY" for invalid queries.
///
/// # Arguments
///
/// * `select_exprs` - Mutable reference to the SELECT expressions list. Missing
/// expressions will be added to this list (unless strict is true).
/// * `schema` - The schema of the projected plan, used to check if column
/// references exist.
/// * `distinct` - Whether the query uses `SELECT DISTINCT`. If true, missing
/// ORDER BY expressions will cause an error.
/// * `strict` - Whether to strictly validate ORDER BY expressions. If true,
/// missing expressions will cause an error instead of being added.
/// * `order_by` - Mutable slice of ORDER BY expressions. The expressions will
/// be rewritten to use column references where appropriate.
///
/// # Returns
///
/// * `Ok(true)` - If expressions were added to the SELECT list.
/// * `Ok(false)` - If no expressions needed to be added.
/// * `Err(...)` - If there's an error (e.g., DISTINCT with missing ORDER BY
/// expressions).
///
/// # Example
///
/// ```text
/// Input: SELECT x FROM foo ORDER BY y
///
/// Before: select_exprs = [x]
/// order_by = [Sort { expr: Column(y), ... }]
///
/// After: select_exprs = [x, y]
/// order_by = [Sort { expr: Column(y), ... }]
/// returns Ok(true)
/// ```
pub(crate) fn add_missing_order_by_exprs(
select_exprs: &mut Vec<Expr>,
schema: &DFSchemaRef,
distinct: bool,
strict: bool,
order_by: &mut [Sort],
) -> Result<bool> {
add_missing_order_by_exprs_impl(select_exprs, schema, distinct, strict, order_by)
}
}

/// Internal implementation of add_missing_order_by_exprs for testability.
fn add_missing_order_by_exprs_impl(
select_exprs: &mut Vec<Expr>,
schema: &DFSchemaRef,
distinct: bool,
strict: bool,
order_by: &mut [Sort],
) -> Result<bool> {
let mut missing_exprs: IndexSet<Expr> = IndexSet::new();

let mut aliases = HashMap::new();
for expr in select_exprs.iter() {
if let Expr::Alias(alias) = expr {
aliases.insert(alias.expr.clone(), alias.name.clone());
}
}

let mut rewrite = |expr: Expr| {
if select_exprs.contains(&expr) {
return Ok(Transformed::new(expr, false, TreeNodeRecursion::Jump));
}
if let Some(alias) = aliases.get(&expr) {
return Ok(Transformed::new(
Expr::Column(Column::new_unqualified(alias.clone())),
false,
TreeNodeRecursion::Jump,
));
}
match expr {
Expr::AggregateFunction(_) | Expr::WindowFunction(_) => {
let replaced = Expr::Column(Column::new_unqualified(
expr.schema_name().to_string(),
));
missing_exprs.insert(expr);
Ok(Transformed::new(replaced, true, TreeNodeRecursion::Jump))
}
Expr::Column(ref c) => {
if strict {
// In strict mode (e.g., GROUP BY present), the column must exist in schema
// If it doesn't exist and isn't in select_exprs, we'll error later
// Don't add it to missing_exprs to preserve proper error message
Ok(Transformed::new(expr, false, TreeNodeRecursion::Jump))
} else if !schema.has_column(c) {
missing_exprs.insert(Expr::Column(c.clone()));
Ok(Transformed::new(expr, false, TreeNodeRecursion::Jump))
} else {
Ok(Transformed::new(expr, false, TreeNodeRecursion::Jump))
}
}
_ => Ok(Transformed::no(expr)),
}
};
for sort in order_by.iter_mut() {
let expr = std::mem::take(&mut sort.expr);
sort.expr = expr.transform_down(&mut rewrite).data()?;
}
if !missing_exprs.is_empty() {
if distinct {
plan_err!(
"For SELECT DISTINCT, ORDER BY expressions {} must appear in select list",
missing_exprs[0]
)
} else {
select_exprs.extend(missing_exprs);
Ok(true)
}
} else {
Ok(false)
}
}

#[cfg(test)]
mod tests {
use super::*;
use arrow::datatypes::{DataType, Field};
use datafusion_expr::expr::Alias;

fn create_test_schema() -> DFSchemaRef {
let fields = vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Int32, true),
Field::new("c", DataType::Int32, true),
];
DFSchemaRef::new(
DFSchema::from_unqualified_fields(fields.into(), HashMap::new()).unwrap(),
)
}

#[test]
fn test_add_missing_column_not_in_select() {
let schema = create_test_schema();
let mut select_exprs = vec![col("a")];
let mut order_by = vec![col("d").sort(true, false)]; // d is not in schema

let result = add_missing_order_by_exprs_impl(
&mut select_exprs,
&schema,
false,
false,
&mut order_by,
);

// d is not in schema, so it should be added
assert!(result.unwrap());
assert_eq!(select_exprs.len(), 2);
assert!(select_exprs.contains(&col("a")));
assert!(select_exprs.contains(&col("d")));
}

#[test]
fn test_no_missing_column_when_already_in_select() {
let schema = create_test_schema();
let mut select_exprs = vec![col("a"), col("b")];
let mut order_by = vec![col("b").sort(true, false)];

let result = add_missing_order_by_exprs_impl(
&mut select_exprs,
&schema,
false,
false,
&mut order_by,
);

assert!(!result.unwrap());
assert_eq!(select_exprs.len(), 2);
}

#[test]
fn test_alias_resolution() {
let schema = create_test_schema();
// SELECT a AS x, b
let mut select_exprs = vec![
Expr::Alias(Alias::new(col("a"), None::<&str>, "x")),
col("b"),
];
// ORDER BY a (should be resolved to alias x)
let mut order_by = vec![col("a").sort(true, false)];

let result = add_missing_order_by_exprs_impl(
&mut select_exprs,
&schema,
false,
false,
&mut order_by,
);

// No new expressions should be added (a is resolved to alias x)
assert!(!result.unwrap());
// ORDER BY a should be replaced with Column(x) reference
assert_eq!(order_by[0].expr, col("x"));
}

#[test]
fn test_distinct_with_missing_column_error() {
let schema = create_test_schema();
// SELECT DISTINCT a
// ORDER BY d (d is not in select, not in schema)
let mut select_exprs = vec![col("a")];
let mut order_by = vec![col("d").sort(true, false)];

let result = add_missing_order_by_exprs_impl(
&mut select_exprs,
&schema,
true, // distinct = true
false,
&mut order_by,
);

assert!(result.is_err());
let err_msg = result.unwrap_err().to_string();
assert!(err_msg.contains("SELECT DISTINCT"));
assert!(err_msg.contains("must appear in select list"));
}

#[test]
fn test_strict_mode_no_add() {
let schema = create_test_schema();
let mut select_exprs = vec![col("a")];
let mut order_by = vec![col("b").sort(true, false)];

// strict = true should NOT add missing columns
let result = add_missing_order_by_exprs_impl(
&mut select_exprs,
&schema,
false,
true, // strict = true
&mut order_by,
);

assert!(!result.unwrap());
assert_eq!(select_exprs.len(), 1); // b was not added
}

#[test]
fn test_column_in_order_by_not_in_select_or_schema() {
let schema = create_test_schema();
// SELECT a, b
// ORDER BY d - d is not in schema (would come from FROM clause in real scenario)
let mut select_exprs = vec![col("a"), col("b")];
let mut order_by = vec![col("d").sort(true, false)];

let result = add_missing_order_by_exprs_impl(
&mut select_exprs,
&schema,
false,
false,
&mut order_by,
);

// d should be added to select_exprs
assert!(result.unwrap());
assert!(select_exprs.contains(&col("d")));
}

fn col(name: &str) -> Expr {
Expr::Column(Column::new_unqualified(name))
}
}
16 changes: 14 additions & 2 deletions datafusion/sql/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
true,
None,
)?;
let plan = self.order_by(plan, order_by_rex)?;
// Pass false to skip_add_missing_columns because for non-SELECT set expressions
// (like UNION), we still need to use add_missing_columns in sort_with_limit
let plan = self.order_by(plan, order_by_rex, false)?;
self.limit(plan, limit_clause, planner_context)
}
}?;
Expand Down Expand Up @@ -134,7 +136,8 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
true,
None,
)?;
self.order_by(plan, sort_exprs)
// For pipe operator ORDER BY, use add_missing_columns behavior
self.order_by(plan, sort_exprs, false)
}
PipeOperator::Limit { expr, offset } => self.limit(
plan,
Expand Down Expand Up @@ -299,10 +302,15 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
}

/// Wrap the logical in a sort
///
/// If `skip_add_missing_columns` is true, the method will not try to add
/// missing columns to the input plan. This is used by SELECT statements
/// where missing ORDER BY columns are already added by `add_missing_order_by_exprs`.
pub(super) fn order_by(
&self,
plan: LogicalPlan,
order_by: Vec<Sort>,
skip_add_missing_columns: bool,
) -> Result<LogicalPlan> {
if order_by.is_empty() {
return Ok(plan);
Expand All @@ -313,6 +321,10 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
// optimization we're effectively doing a `first_value` aggregation according to them.
let distinct_on = distinct_on.clone().with_sort_expr(order_by)?;
Ok(LogicalPlan::Distinct(Distinct::On(distinct_on)))
} else if skip_add_missing_columns {
LogicalPlanBuilder::from(plan)
.sort_with_limit_skip_missing(order_by, None)?
.build()
} else {
LogicalPlanBuilder::from(plan).sort(order_by)?.build()
}
Expand Down
Loading
Loading