diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 86b33a10edd81..07e0eb1a77aa9 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1392,6 +1392,82 @@ impl LogicalPlan { } } + /// Returns the skip (offset) of this plan node, if it has one. + /// + /// Only [`LogicalPlan::Limit`] carries a skip value; all other variants + /// return `Ok(None)`. Returns `Ok(None)` for a zero skip. + pub fn skip(&self) -> Result> { + match self { + LogicalPlan::Limit(limit) => match limit.get_skip_type()? { + SkipType::Literal(0) => Ok(None), + SkipType::Literal(n) => Ok(Some(n)), + SkipType::UnsupportedExpr => Ok(None), + }, + LogicalPlan::Sort(_) => Ok(None), + LogicalPlan::TableScan(_) => Ok(None), + LogicalPlan::Projection(_) => Ok(None), + LogicalPlan::Filter(_) => Ok(None), + LogicalPlan::Window(_) => Ok(None), + LogicalPlan::Aggregate(_) => Ok(None), + LogicalPlan::Join(_) => Ok(None), + LogicalPlan::Repartition(_) => Ok(None), + LogicalPlan::Union(_) => Ok(None), + LogicalPlan::EmptyRelation(_) => Ok(None), + LogicalPlan::Subquery(_) => Ok(None), + LogicalPlan::SubqueryAlias(_) => Ok(None), + LogicalPlan::Statement(_) => Ok(None), + LogicalPlan::Values(_) => Ok(None), + LogicalPlan::Explain(_) => Ok(None), + LogicalPlan::Analyze(_) => Ok(None), + LogicalPlan::Extension(_) => Ok(None), + LogicalPlan::Distinct(_) => Ok(None), + LogicalPlan::Dml(_) => Ok(None), + LogicalPlan::Ddl(_) => Ok(None), + LogicalPlan::Copy(_) => Ok(None), + LogicalPlan::DescribeTable(_) => Ok(None), + LogicalPlan::Unnest(_) => Ok(None), + LogicalPlan::RecursiveQuery(_) => Ok(None), + } + } + + /// Returns the fetch (limit) of this plan node, if it has one. + /// + /// [`LogicalPlan::Sort`], [`LogicalPlan::TableScan`], and + /// [`LogicalPlan::Limit`] may carry a fetch value; all other variants + /// return `Ok(None)`. + pub fn fetch(&self) -> Result> { + match self { + LogicalPlan::Sort(Sort { fetch, .. }) => Ok(*fetch), + LogicalPlan::TableScan(TableScan { fetch, .. }) => Ok(*fetch), + LogicalPlan::Limit(limit) => match limit.get_fetch_type()? { + FetchType::Literal(s) => Ok(s), + FetchType::UnsupportedExpr => Ok(None), + }, + LogicalPlan::Projection(_) => Ok(None), + LogicalPlan::Filter(_) => Ok(None), + LogicalPlan::Window(_) => Ok(None), + LogicalPlan::Aggregate(_) => Ok(None), + LogicalPlan::Join(_) => Ok(None), + LogicalPlan::Repartition(_) => Ok(None), + LogicalPlan::Union(_) => Ok(None), + LogicalPlan::EmptyRelation(_) => Ok(None), + LogicalPlan::Subquery(_) => Ok(None), + LogicalPlan::SubqueryAlias(_) => Ok(None), + LogicalPlan::Statement(_) => Ok(None), + LogicalPlan::Values(_) => Ok(None), + LogicalPlan::Explain(_) => Ok(None), + LogicalPlan::Analyze(_) => Ok(None), + LogicalPlan::Extension(_) => Ok(None), + LogicalPlan::Distinct(_) => Ok(None), + LogicalPlan::Dml(_) => Ok(None), + LogicalPlan::Ddl(_) => Ok(None), + LogicalPlan::Copy(_) => Ok(None), + LogicalPlan::DescribeTable(_) => Ok(None), + LogicalPlan::Unnest(_) => Ok(None), + LogicalPlan::RecursiveQuery(_) => Ok(None), + } + } + /// If this node's expressions contains any references to an outer subquery pub fn contains_outer_reference(&self) -> bool { let mut contains = false; diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 1eb117f8abdce..fed77809823f8 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -796,6 +796,13 @@ impl OptimizerRule for PushDownFilter { filter.predicate = new_predicate; } + // If the child has a fetch (limit) or skip (offset), pushing a filter + // below it would change semantics: the limit/offset should apply before + // the filter, not after. + if filter.input.fetch()?.is_some() || filter.input.skip()?.is_some() { + return Ok(Transformed::no(LogicalPlan::Filter(filter))); + } + match Arc::unwrap_or_clone(filter.input) { LogicalPlan::Filter(child_filter) => { let parents_predicates = split_conjunction_owned(filter.predicate); @@ -4315,4 +4322,63 @@ mod tests { " ) } + + #[test] + fn filter_not_pushed_down_through_table_scan_with_fetch() -> Result<()> { + let scan = test_table_scan()?; + let scan_with_fetch = match scan { + LogicalPlan::TableScan(scan) => LogicalPlan::TableScan(TableScan { + fetch: Some(10), + ..scan + }), + _ => unreachable!(), + }; + let plan = LogicalPlanBuilder::from(scan_with_fetch) + .filter(col("a").gt(lit(10i64)))? + .build()?; + // Filter must NOT be pushed into the table scan when it has a fetch (limit) + assert_optimized_plan_equal!( + plan, + @r" + Filter: test.a > Int64(10) + TableScan: test, fetch=10 + " + ) + } + + #[test] + fn filter_push_down_through_sort_without_fetch() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .sort(vec![col("a").sort(true, true)])? + .filter(col("a").gt(lit(10i64)))? + .build()?; + // Filter should be pushed below the sort + assert_optimized_plan_equal!( + plan, + @r" + Sort: test.a ASC NULLS FIRST + TableScan: test, full_filters=[test.a > Int64(10)] + " + ) + } + + #[test] + fn filter_not_pushed_down_through_sort_with_fetch() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .sort_with_limit(vec![col("a").sort(true, true)], Some(5))? + .filter(col("a").gt(lit(10i64)))? + .build()?; + // Filter must NOT be pushed below the sort when it has a fetch (limit), + // because the limit should apply before the filter. + assert_optimized_plan_equal!( + plan, + @r" + Filter: test.a > Int64(10) + Sort: test.a ASC NULLS FIRST, fetch=5 + TableScan: test + " + ) + } } diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 70918a348b0fa..bdf08823a29d1 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -1405,11 +1405,22 @@ impl ExecutionPlan for SortExec { config: &datafusion_common::config::ConfigOptions, ) -> Result { if phase != FilterPushdownPhase::Post { + if self.fetch.is_some() { + return Ok(FilterDescription::all_unsupported( + &parent_filters, + &self.children(), + )); + } return FilterDescription::from_children(parent_filters, &self.children()); } - let mut child = - ChildFilterDescription::from_child(&parent_filters, self.input())?; + // In Post phase: block parent filters when fetch is set, + // but still push the TopK dynamic filter (self-filter). + let mut child = if self.fetch.is_some() { + ChildFilterDescription::all_unsupported(&parent_filters) + } else { + ChildFilterDescription::from_child(&parent_filters, self.input())? + }; if let Some(filter) = &self.filter && config.optimizer.enable_topk_dynamic_filter_pushdown @@ -1430,7 +1441,10 @@ mod tests { use super::*; use crate::coalesce_partitions::CoalescePartitionsExec; use crate::collect; + use crate::empty::EmptyExec; + use crate::execution_plan::Boundedness; use crate::expressions::col; + use crate::filter_pushdown::{FilterPushdownPhase, PushedDown}; use crate::test; use crate::test::TestMemoryExec; use crate::test::exec::{BlockingExec, assert_strong_count_converges_to_zero}; @@ -1441,14 +1455,18 @@ mod tests { use arrow::datatypes::*; use datafusion_common::ScalarValue; use datafusion_common::cast::as_primitive_array; + use datafusion_common::config::ConfigOptions; use datafusion_common::test_util::batches_to_string; use datafusion_execution::RecordBatchStream; use datafusion_execution::config::SessionConfig; + use datafusion_execution::memory_pool::{ + GreedyMemoryPool, MemoryConsumer, MemoryPool, + }; use datafusion_execution::runtime_env::RuntimeEnvBuilder; use datafusion_physical_expr::EquivalenceProperties; use datafusion_physical_expr::expressions::{Column, Literal}; - use futures::{FutureExt, Stream}; + use futures::{FutureExt, Stream, TryStreamExt}; use insta::assert_snapshot; #[derive(Debug, Clone)] @@ -2747,10 +2765,6 @@ mod tests { /// those bytes become unaccounted-for reserved memory that nobody uses. #[tokio::test] async fn test_sort_merge_reservation_transferred_not_freed() -> Result<()> { - use datafusion_execution::memory_pool::{ - GreedyMemoryPool, MemoryConsumer, MemoryPool, - }; - let sort_spill_reservation_bytes: usize = 10 * 1024; // 10 KB // Pool: merge reservation (10KB) + enough room for sort to work. @@ -2861,4 +2875,68 @@ mod tests { drop(contender); Ok(()) } + + fn make_sort_exec_with_fetch(fetch: Option) -> SortExec { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let input = Arc::new(EmptyExec::new(schema)); + SortExec::new( + [PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into(), + input, + ) + .with_fetch(fetch) + } + + #[test] + fn test_sort_with_fetch_blocks_filter_pushdown() -> Result<()> { + let sort = make_sort_exec_with_fetch(Some(10)); + let desc = sort.gather_filters_for_pushdown( + FilterPushdownPhase::Pre, + vec![Arc::new(Column::new("a", 0))], + &ConfigOptions::new(), + )?; + // Sort with fetch (TopK) must not allow filters to be pushed below it. + assert!(matches!( + desc.parent_filters()[0][0].discriminant, + PushedDown::No + )); + Ok(()) + } + + #[test] + fn test_sort_without_fetch_allows_filter_pushdown() -> Result<()> { + let sort = make_sort_exec_with_fetch(None); + let desc = sort.gather_filters_for_pushdown( + FilterPushdownPhase::Pre, + vec![Arc::new(Column::new("a", 0))], + &ConfigOptions::new(), + )?; + // Plain sort (no fetch) is filter-commutative. + assert!(matches!( + desc.parent_filters()[0][0].discriminant, + PushedDown::Yes + )); + Ok(()) + } + + #[test] + fn test_sort_with_fetch_allows_topk_self_filter_in_post_phase() -> Result<()> { + let sort = make_sort_exec_with_fetch(Some(10)); + assert!(sort.filter.is_some(), "TopK filter should be created"); + + let mut config = ConfigOptions::new(); + config.optimizer.enable_topk_dynamic_filter_pushdown = true; + let desc = sort.gather_filters_for_pushdown( + FilterPushdownPhase::Post, + vec![Arc::new(Column::new("a", 0))], + &config, + )?; + // Parent filters are still blocked in the Post phase. + assert!(matches!( + desc.parent_filters()[0][0].discriminant, + PushedDown::No + )); + // But the TopK self-filter should be pushed down. + assert_eq!(desc.self_filters()[0].len(), 1); + Ok(()) + } } diff --git a/datafusion/sqllogictest/test_files/limit.slt b/datafusion/sqllogictest/test_files/limit.slt index ff3c49485a286..f5ec26d304d41 100644 --- a/datafusion/sqllogictest/test_files/limit.slt +++ b/datafusion/sqllogictest/test_files/limit.slt @@ -869,6 +869,45 @@ limit 1000; statement ok DROP TABLE test_limit_with_partitions; +# Tests for filter pushdown behavior with Sort + LIMIT (fetch). + +statement ok +CREATE TABLE t(id INT, value INT) AS VALUES +(1, 100), +(2, 200), +(3, 300), +(4, 400), +(5, 500); + +# Take the 3 smallest values (100, 200, 300), then filter value > 200. +query II +SELECT * FROM (SELECT * FROM t ORDER BY value LIMIT 3) sub WHERE sub.value > 200; +---- +3 300 + +# Take the 3 largest values (500, 400, 300), then filter value < 400. +query II +SELECT * FROM (SELECT * FROM t ORDER BY value DESC LIMIT 3) sub WHERE sub.value < 400; +---- +3 300 + +# The filter stays above the sort+fetch in the plan. +query TT +EXPLAIN SELECT * FROM (SELECT * FROM t ORDER BY value LIMIT 3) sub WHERE sub.value > 200; +---- +logical_plan +01)SubqueryAlias: sub +02)--Filter: t.value > Int32(200) +03)----Sort: t.value ASC NULLS LAST, fetch=3 +04)------TableScan: t projection=[id, value] +physical_plan +01)FilterExec: value@1 > 200 +02)--SortExec: TopK(fetch=3), expr=[value@1 ASC NULLS LAST], preserve_partitioning=[false] +03)----DataSourceExec: partitions=1, partition_sizes=[1] + +statement ok +DROP TABLE t; + # Tear down src_table table: statement ok DROP TABLE src_table; diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index 61faf4dc9650f..05e364a14bd66 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -3198,16 +3198,17 @@ EXPLAIN SELECT * FROM (SELECT *, ROW_NUMBER() OVER(ORDER BY a ASC) as rn1 ---- logical_plan 01)Sort: rn1 ASC NULLS LAST -02)--Sort: rn1 ASC NULLS LAST, fetch=5 -03)----Projection: annotated_data_infinite2.a0, annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.c, annotated_data_infinite2.d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1 -04)------Filter: row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW < UInt64(50) +02)--Filter: rn1 < UInt64(50) +03)----Sort: rn1 ASC NULLS LAST, fetch=5 +04)------Projection: annotated_data_infinite2.a0, annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.c, annotated_data_infinite2.d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1 05)--------WindowAggr: windowExpr=[[row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] 06)----------TableScan: annotated_data_infinite2 projection=[a0, a, b, c, d] physical_plan -01)ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as rn1] -02)--FilterExec: row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 < 50, fetch=5 -03)----BoundedWindowAggExec: wdw=[row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] -04)------StreamingTableExec: partition_sizes=1, projection=[a0, a, b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST] +01)FilterExec: rn1@5 < 50 +02)--ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as rn1] +03)----GlobalLimitExec: skip=0, fetch=5 +04)------BoundedWindowAggExec: wdw=[row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] +05)--------StreamingTableExec: partition_sizes=1, projection=[a0, a, b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST] # Top level sort is pushed down through BoundedWindowAggExec as its SUM result does already satisfy the required # global order. The existing sort is for the second-term lexicographical ordering requirement, which is being