Skip to content
Open
76 changes: 76 additions & 0 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1392,6 +1392,82 @@ impl LogicalPlan {
}
}

/// Returns the skip (offset) of this plan node, if it has one.
///
/// Only [`LogicalPlan::Limit`] carries a skip value; all other variants
/// return `Ok(None)`. A `Limit` also returns `Ok(None)` when its skip is the
/// literal zero or is reported as `SkipType::UnsupportedExpr`.
pub fn skip(&self) -> Result<Option<usize>> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not fold this into fn limit()? It seems like (almost) the same thing to me

match self {
LogicalPlan::Limit(limit) => match limit.get_skip_type()? {
SkipType::Literal(0) => Ok(None),
SkipType::Literal(n) => Ok(Some(n)),
SkipType::UnsupportedExpr => Ok(None),
},
LogicalPlan::Sort(_) => Ok(None),
LogicalPlan::TableScan(_) => Ok(None),
LogicalPlan::Projection(_) => Ok(None),
LogicalPlan::Filter(_) => Ok(None),
LogicalPlan::Window(_) => Ok(None),
LogicalPlan::Aggregate(_) => Ok(None),
LogicalPlan::Join(_) => Ok(None),
LogicalPlan::Repartition(_) => Ok(None),
LogicalPlan::Union(_) => Ok(None),
LogicalPlan::EmptyRelation(_) => Ok(None),
LogicalPlan::Subquery(_) => Ok(None),
LogicalPlan::SubqueryAlias(_) => Ok(None),
LogicalPlan::Statement(_) => Ok(None),
LogicalPlan::Values(_) => Ok(None),
LogicalPlan::Explain(_) => Ok(None),
LogicalPlan::Analyze(_) => Ok(None),
LogicalPlan::Extension(_) => Ok(None),
LogicalPlan::Distinct(_) => Ok(None),
LogicalPlan::Dml(_) => Ok(None),
LogicalPlan::Ddl(_) => Ok(None),
LogicalPlan::Copy(_) => Ok(None),
LogicalPlan::DescribeTable(_) => Ok(None),
LogicalPlan::Unnest(_) => Ok(None),
LogicalPlan::RecursiveQuery(_) => Ok(None),
}
}

/// Reports the fetch (row limit) attached to this plan node, if any.
///
/// A fetch can only come from [`LogicalPlan::Sort`], [`LogicalPlan::TableScan`],
/// or [`LogicalPlan::Limit`]; every other variant yields `Ok(None)`. A `Limit`
/// whose fetch is reported as `FetchType::UnsupportedExpr` also yields `Ok(None)`.
///
/// # Errors
///
/// Propagates any error returned by `get_fetch_type` on a `Limit` node.
pub fn fetch(&self) -> Result<Option<usize>> {
    let row_limit = match self {
        // Sort and TableScan store their fetch directly as a field.
        LogicalPlan::Sort(Sort { fetch, .. })
        | LogicalPlan::TableScan(TableScan { fetch, .. }) => *fetch,
        // Limit has to classify its fetch first.
        LogicalPlan::Limit(limit) => match limit.get_fetch_type()? {
            FetchType::Literal(s) => s,
            FetchType::UnsupportedExpr => None,
        },
        // Listed explicitly (no wildcard) so that adding a new variant forces
        // a decision here at compile time.
        LogicalPlan::Projection(_)
        | LogicalPlan::Filter(_)
        | LogicalPlan::Window(_)
        | LogicalPlan::Aggregate(_)
        | LogicalPlan::Join(_)
        | LogicalPlan::Repartition(_)
        | LogicalPlan::Union(_)
        | LogicalPlan::EmptyRelation(_)
        | LogicalPlan::Subquery(_)
        | LogicalPlan::SubqueryAlias(_)
        | LogicalPlan::Statement(_)
        | LogicalPlan::Values(_)
        | LogicalPlan::Explain(_)
        | LogicalPlan::Analyze(_)
        | LogicalPlan::Extension(_)
        | LogicalPlan::Distinct(_)
        | LogicalPlan::Dml(_)
        | LogicalPlan::Ddl(_)
        | LogicalPlan::Copy(_)
        | LogicalPlan::DescribeTable(_)
        | LogicalPlan::Unnest(_)
        | LogicalPlan::RecursiveQuery(_) => None,
    };
    Ok(row_limit)
}

/// If this node's expressions contains any references to an outer subquery
pub fn contains_outer_reference(&self) -> bool {
let mut contains = false;
Expand Down
66 changes: 66 additions & 0 deletions datafusion/optimizer/src/push_down_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,13 @@ impl OptimizerRule for PushDownFilter {
filter.predicate = new_predicate;
}

// If the child has a fetch (limit) or skip (offset), pushing a filter
// below it would change semantics: the limit/offset should apply before
// the filter, not after.
if filter.input.fetch()?.is_some() || filter.input.skip()?.is_some() {
return Ok(Transformed::no(LogicalPlan::Filter(filter)));
}

match Arc::unwrap_or_clone(filter.input) {
LogicalPlan::Filter(child_filter) => {
let parents_predicates = split_conjunction_owned(filter.predicate);
Expand Down Expand Up @@ -4315,4 +4322,63 @@ mod tests {
"
)
}

/// Expects the optimized plan to keep the `Filter` above a `TableScan` that
/// carries `fetch: Some(10)`.
#[test]
fn filter_not_pushed_down_through_table_scan_with_fetch() -> Result<()> {
let scan = test_table_scan()?;
// Rebuild the scan with `fetch` set to 10, copying every other field unchanged.
let scan_with_fetch = match scan {
LogicalPlan::TableScan(scan) => LogicalPlan::TableScan(TableScan {
fetch: Some(10),
..scan
}),
// Presumably `test_table_scan()` always yields a `TableScan`; anything else
// is treated as a broken test setup.
_ => unreachable!(),
};
// Place the predicate `a > 10` directly above the fetch-limited scan.
let plan = LogicalPlanBuilder::from(scan_with_fetch)
.filter(col("a").gt(lit(10i64)))?
.build()?;
// Filter must NOT be pushed into the table scan when it has a fetch (limit):
// the expected plan still shows the Filter node and no filters on the scan.
assert_optimized_plan_equal!(
plan,
@r"
Filter: test.a > Int64(10)
TableScan: test, fetch=10
"
)
}

/// Expects a `Filter` placed above a `Sort` with no fetch to end up in the
/// table scan's `full_filters` in the optimized plan.
#[test]
fn filter_push_down_through_sort_without_fetch() -> Result<()> {
let table_scan = test_table_scan()?;
// Plan shape before optimization: Filter(a > 10) over Sort(a) over the scan.
let plan = LogicalPlanBuilder::from(table_scan)
.sort(vec![col("a").sort(true, true)])?
.filter(col("a").gt(lit(10i64)))?
.build()?;
// Filter should be pushed below the sort: the expected plan has no Filter
// node, and the predicate appears on the TableScan instead.
assert_optimized_plan_equal!(
plan,
@r"
Sort: test.a ASC NULLS FIRST
TableScan: test, full_filters=[test.a > Int64(10)]
"
)
}

/// Expects the optimized plan to keep the `Filter` above a `Sort` that
/// carries a fetch of 5.
#[test]
fn filter_not_pushed_down_through_sort_with_fetch() -> Result<()> {
let table_scan = test_table_scan()?;
// Plan shape before optimization: Filter(a > 10) over Sort(a, fetch=5) over the scan.
let plan = LogicalPlanBuilder::from(table_scan)
.sort_with_limit(vec![col("a").sort(true, true)], Some(5))?
.filter(col("a").gt(lit(10i64)))?
.build()?;
// Filter must NOT be pushed below the sort when it has a fetch (limit),
// because the limit should apply before the filter.
assert_optimized_plan_equal!(
plan,
@r"
Filter: test.a > Int64(10)
Sort: test.a ASC NULLS FIRST, fetch=5
TableScan: test
"
)
}
}
92 changes: 84 additions & 8 deletions datafusion/physical-plan/src/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1405,11 +1405,22 @@ impl ExecutionPlan for SortExec {
config: &datafusion_common::config::ConfigOptions,
) -> Result<FilterDescription> {
if phase != FilterPushdownPhase::Post {
if self.fetch.is_some() {
return Ok(FilterDescription::all_unsupported(
&parent_filters,
&self.children(),
));
}
return FilterDescription::from_children(parent_filters, &self.children());
}

let mut child =
ChildFilterDescription::from_child(&parent_filters, self.input())?;
// In Post phase: block parent filters when fetch is set,
// but still push the TopK dynamic filter (self-filter).
let mut child = if self.fetch.is_some() {
ChildFilterDescription::all_unsupported(&parent_filters)
} else {
ChildFilterDescription::from_child(&parent_filters, self.input())?
};

if let Some(filter) = &self.filter
&& config.optimizer.enable_topk_dynamic_filter_pushdown
Expand All @@ -1430,8 +1441,10 @@ mod tests {
use super::*;
use crate::coalesce_partitions::CoalescePartitionsExec;
use crate::collect;
use crate::empty::EmptyExec;
use crate::execution_plan::Boundedness;
use crate::expressions::col;
use crate::filter_pushdown::{FilterPushdownPhase, PushedDown};
use crate::test;
use crate::test::TestMemoryExec;
use crate::test::exec::{BlockingExec, assert_strong_count_converges_to_zero};
Expand All @@ -1441,15 +1454,19 @@ mod tests {
use arrow::compute::SortOptions;
use arrow::datatypes::*;
use datafusion_common::cast::as_primitive_array;
use datafusion_common::config::ConfigOptions;
use datafusion_common::test_util::batches_to_string;
use datafusion_common::{DataFusionError, Result, ScalarValue};
use datafusion_execution::RecordBatchStream;
use datafusion_execution::config::SessionConfig;
use datafusion_execution::memory_pool::{
GreedyMemoryPool, MemoryConsumer, MemoryPool,
};
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
use datafusion_physical_expr::EquivalenceProperties;
use datafusion_physical_expr::expressions::{Column, Literal};

use futures::{FutureExt, Stream};
use futures::{FutureExt, Stream, TryStreamExt};
use insta::assert_snapshot;

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -2748,11 +2765,6 @@ mod tests {
/// those bytes become unaccounted-for reserved memory that nobody uses.
#[tokio::test]
async fn test_sort_merge_reservation_transferred_not_freed() -> Result<()> {
use datafusion_execution::memory_pool::{
GreedyMemoryPool, MemoryConsumer, MemoryPool,
};
use futures::TryStreamExt;

let sort_spill_reservation_bytes: usize = 10 * 1024; // 10 KB

// Pool: merge reservation (10KB) + enough room for sort to work.
Expand Down Expand Up @@ -2863,4 +2875,68 @@ mod tests {
drop(contender);
Ok(())
}

/// Test helper: builds a [`SortExec`] over an [`EmptyExec`] whose schema has a
/// single non-nullable `Int32` column `a`, sorted on that column with default
/// sort options, and applies the given `fetch`.
fn make_sort_exec_with_fetch(fetch: Option<usize>) -> SortExec {
    let column_a = Field::new("a", DataType::Int32, false);
    let schema = Arc::new(Schema::new(vec![column_a]));
    let empty_input = Arc::new(EmptyExec::new(schema));
    let sort_key = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)));
    SortExec::new([sort_key].into(), empty_input).with_fetch(fetch)
}

/// A `SortExec` with a fetch must report the parent filter as not pushed down
/// in the `Pre` phase.
#[test]
fn test_sort_with_fetch_blocks_filter_pushdown() -> Result<()> {
    let topk_sort = make_sort_exec_with_fetch(Some(10));
    let options = ConfigOptions::new();

    let description = topk_sort.gather_filters_for_pushdown(
        FilterPushdownPhase::Pre,
        vec![Arc::new(Column::new("a", 0))],
        &options,
    )?;

    // Sort with fetch (TopK) must not let a parent filter travel below it.
    let parent_filters = description.parent_filters();
    let blocked = matches!(parent_filters[0][0].discriminant, PushedDown::No);
    assert!(blocked);
    Ok(())
}

/// A `SortExec` without a fetch must report the parent filter as pushed down
/// in the `Pre` phase.
#[test]
fn test_sort_without_fetch_allows_filter_pushdown() -> Result<()> {
    let plain_sort = make_sort_exec_with_fetch(None);
    let options = ConfigOptions::new();

    let description = plain_sort.gather_filters_for_pushdown(
        FilterPushdownPhase::Pre,
        vec![Arc::new(Column::new("a", 0))],
        &options,
    )?;

    // A plain sort (no fetch) commutes with filtering.
    let parent_filters = description.parent_filters();
    let allowed = matches!(parent_filters[0][0].discriminant, PushedDown::Yes);
    assert!(allowed);
    Ok(())
}

/// In the `Post` phase a `SortExec` with a fetch still rejects the parent
/// filter, but contributes exactly one self-filter for its child when TopK
/// dynamic filter pushdown is enabled.
#[test]
fn test_sort_with_fetch_allows_topk_self_filter_in_post_phase() -> Result<()> {
    let topk_sort = make_sort_exec_with_fetch(Some(10));
    assert!(topk_sort.filter.is_some(), "TopK filter should be created");

    // Turn the TopK dynamic filter option on explicitly for this test.
    let mut options = ConfigOptions::new();
    options.optimizer.enable_topk_dynamic_filter_pushdown = true;

    let description = topk_sort.gather_filters_for_pushdown(
        FilterPushdownPhase::Post,
        vec![Arc::new(Column::new("a", 0))],
        &options,
    )?;

    // The parent filter stays blocked in the Post phase as well.
    let parent_filters = description.parent_filters();
    let blocked = matches!(parent_filters[0][0].discriminant, PushedDown::No);
    assert!(blocked);

    // The sort's own TopK filter is still offered to the child.
    let self_filters = description.self_filters();
    assert_eq!(self_filters[0].len(), 1);
    Ok(())
}
}
39 changes: 39 additions & 0 deletions datafusion/sqllogictest/test_files/limit.slt
Original file line number Diff line number Diff line change
Expand Up @@ -869,6 +869,45 @@ limit 1000;
statement ok
DROP TABLE test_limit_with_partitions;

# Tests for filter pushdown behavior with Sort + LIMIT (fetch).

statement ok
CREATE TABLE t(id INT, value INT) AS VALUES
(1, 100),
(2, 200),
(3, 300),
(4, 400),
(5, 500);

# Take the 3 smallest values (100, 200, 300), then filter value > 200.
query II
SELECT * FROM (SELECT * FROM t ORDER BY value LIMIT 3) sub WHERE sub.value > 200;
----
3 300

# Take the 3 largest values (500, 400, 300), then filter value < 400.
query II
SELECT * FROM (SELECT * FROM t ORDER BY value DESC LIMIT 3) sub WHERE sub.value < 400;
----
3 300

# The filter stays above the sort+fetch in the plan.
query TT
EXPLAIN SELECT * FROM (SELECT * FROM t ORDER BY value LIMIT 3) sub WHERE sub.value > 200;
----
logical_plan
01)SubqueryAlias: sub
02)--Filter: t.value > Int32(200)
03)----Sort: t.value ASC NULLS LAST, fetch=3
04)------TableScan: t projection=[id, value]
physical_plan
01)FilterExec: value@1 > 200
02)--SortExec: TopK(fetch=3), expr=[value@1 ASC NULLS LAST], preserve_partitioning=[false]
03)----DataSourceExec: partitions=1, partition_sizes=[1]

statement ok
DROP TABLE t;

# Tear down src_table table:
statement ok
DROP TABLE src_table;
15 changes: 8 additions & 7 deletions datafusion/sqllogictest/test_files/window.slt
Original file line number Diff line number Diff line change
Expand Up @@ -3198,16 +3198,17 @@ EXPLAIN SELECT * FROM (SELECT *, ROW_NUMBER() OVER(ORDER BY a ASC) as rn1
----
logical_plan
01)Sort: rn1 ASC NULLS LAST
02)--Sort: rn1 ASC NULLS LAST, fetch=5
03)----Projection: annotated_data_infinite2.a0, annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.c, annotated_data_infinite2.d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
04)------Filter: row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW < UInt64(50)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it definitely had the filter too low 👍

02)--Filter: rn1 < UInt64(50)
03)----Sort: rn1 ASC NULLS LAST, fetch=5
04)------Projection: annotated_data_infinite2.a0, annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.c, annotated_data_infinite2.d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
05)--------WindowAggr: windowExpr=[[row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
06)----------TableScan: annotated_data_infinite2 projection=[a0, a, b, c, d]
physical_plan
01)ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as rn1]
02)--FilterExec: row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 < 50, fetch=5
03)----BoundedWindowAggExec: wdw=[row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
04)------StreamingTableExec: partition_sizes=1, projection=[a0, a, b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST]
01)FilterExec: rn1@5 < 50
02)--ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as rn1]
03)----GlobalLimitExec: skip=0, fetch=5
04)------BoundedWindowAggExec: wdw=[row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
05)--------StreamingTableExec: partition_sizes=1, projection=[a0, a, b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST]

# Top level sort is pushed down through BoundedWindowAggExec as its SUM result does already satisfy the required
# global order. The existing sort is for the second-term lexicographical ordering requirement, which is being
Expand Down
Loading