
Fix push_down_filter for children with non-empty fetch fields #21057

Open
shivbhatia10 wants to merge 8 commits into apache:main from shivbhatia10:sb/fix-filter-pushdown-on-sort-with-fetch

Conversation


@shivbhatia10 shivbhatia10 commented Mar 19, 2026

Which issue does this PR close?

Rationale for this change

Currently, if we see a filter with a limit underneath, we don't push the filter past the limit. However, sort nodes and table scan nodes can have fetch fields that do essentially the same thing, and we don't stop filters from being pushed past them. This is a correctness bug that can produce wrong query results.

I added checks for exactly this condition so we don't push the filter down. I think the prior expectation was that there would always be a limit node between the filter and any of these nodes, but that does not hold either. In push_down_limit.rs, there's code that performs this optimisation when a limit has a sort under it:

    LogicalPlan::Sort(mut sort) => {
        let new_fetch = {
            let sort_fetch = skip + fetch;
            Some(sort.fetch.map(|f| f.min(sort_fetch)).unwrap_or(sort_fetch))
        };
        if new_fetch == sort.fetch {
            if skip > 0 {
                original_limit(skip, fetch, LogicalPlan::Sort(sort))
            } else {
                Ok(Transformed::yes(LogicalPlan::Sort(sort)))
            }
        } else {
            sort.fetch = new_fetch;
            limit.input = Arc::new(LogicalPlan::Sort(sort));
            Ok(Transformed::yes(LogicalPlan::Limit(limit)))
        }
    }

The first time this runs, it sets the sort's internal fetch to new_fetch; on the second optimisation pass it hits the branch that removes the limit node altogether, leaving the sort node exposed to filters that can now be pushed down into it.
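
To make the semantics concrete, here is a small standalone sketch (plain Rust with made-up values, not DataFusion code) contrasting the correct fetch-then-filter order with the buggy filter-then-fetch order that the pushdown produces:

```rust
// Standalone sketch, not DataFusion code: models a Sort with fetch = 3
// sitting under a Filter, over a hypothetical column of values.
fn top_n(mut rows: Vec<i32>, n: usize) -> Vec<i32> {
    rows.sort();      // ORDER BY value ASC
    rows.truncate(n); // fetch = n
    rows
}

fn main() {
    let values = vec![100, 200, 300, 400, 500];

    // Correct order: apply the fetch first, then the filter (value > 200).
    let correct: Vec<i32> = top_n(values.clone(), 3)
        .into_iter()
        .filter(|v| *v > 200)
        .collect();

    // Buggy pushdown: filter first, then fetch. Extra rows survive.
    let buggy: Vec<i32> =
        top_n(values.into_iter().filter(|v| *v > 200).collect(), 3);

    assert_eq!(correct, vec![300]);
    assert_eq!(buggy, vec![300, 400, 500]);
}
```

The buggy plan returns rows the original query could never produce, because the top-3-by-value set is fixed before the filter runs.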

There is also a related fix in gather_filters_for_pushdown in SortExec, which does the same thing for physical plan nodes. If we see that a given execution plan has non-empty fetch, it should not allow any parent filters to be pushed down.

What changes are included in this PR?

Added checks in the optimisation rule to avoid pushing filters past children with built-in limits.

Are these changes tested?

Yes:

  • Unit tests in push_down_filter.rs
  • Fixed an existing test in window.slt
  • Unit tests for the physical plan change in sort.rs
  • New slt test in push_down_filter_sort_fetch.slt for this exact behaviour

Are there any user-facing changes?

No

@github-actions github-actions bot added the optimizer Optimizer rules label Mar 19, 2026
// If the sort has a fetch (limit), pushing a filter below
// it would change semantics: the limit should apply before
// the filter, not after.
if sort.fetch.is_some() {
Contributor

ExecutionPlan has fetch on it so I wonder if we need to do this more generically?

Contributor Author

That's physical nodes, this is the logical plan optimizer

Contributor Author

For logical nodes, as far as I can see, only Sort, Limit, and TableScan have fetch

Contributor

Ah sorry yes, that sounds right

Contributor

Was just trying to think about how we can prevent this kind of bug happening in the future.

Contributor

Do we need to modify gather_filters_for_pushdown in SortExec?

Contributor

Yes, will need to check physical plan too.

if self.fetch.is_some() {
    return Ok(FilterDescription::all_unsupported(
        &parent_filters,
        &self.children(),
    ));
}

Contributor

I think it would be reasonable to add a fn fetch(&self) -> Option<usize> to LogicalPlan. We could make the match exhaustive / have no fall-through so that anyone adding a new variant has to update it. At least then it will be in one place and also closer to the change -> better for human and machine to pick it up.
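
A sketch of that idea on a simplified stand-in enum (hypothetical; not DataFusion's actual LogicalPlan, and which variants report a value is purely illustrative):

```rust
// Simplified stand-in for LogicalPlan; the real enum has many more variants.
enum Plan {
    Sort { fetch: Option<usize> },
    TableScan { fetch: Option<usize> },
    Limit { fetch: Option<usize> },
    Filter,
    Projection,
}

impl Plan {
    /// Number of rows this node implicitly limits its output to, if any.
    /// No `_ => None` fall-through: adding a new variant is a compile
    /// error until the author decides whether it carries a fetch.
    fn fetch(&self) -> Option<usize> {
        match self {
            Plan::Sort { fetch }
            | Plan::TableScan { fetch }
            | Plan::Limit { fetch } => *fetch,
            Plan::Filter | Plan::Projection => None,
        }
    }
}

fn main() {
    assert_eq!(Plan::Sort { fetch: Some(3) }.fetch(), Some(3));
    assert_eq!(Plan::Filter.fetch(), None);
}
```

The exhaustive match is what keeps the check "closer to the change": a new plan variant cannot silently default to "no fetch".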

Contributor

I think that would be nice!

Contributor Author

That makes sense @adriangb, I've tried adding that: 0e83891

@github-actions github-actions bot added the sqllogictest SQL Logic Tests (.slt) label Mar 19, 2026
@shivbhatia10 shivbhatia10 force-pushed the sb/fix-filter-pushdown-on-sort-with-fetch branch from 66805b1 to 3c09d58 Compare March 19, 2026 17:06
@shivbhatia10 shivbhatia10 force-pushed the sb/fix-filter-pushdown-on-sort-with-fetch branch from c519c77 to 3ef82ad Compare March 19, 2026 17:44
@shivbhatia10 shivbhatia10 marked this pull request as ready for review March 19, 2026 17:55
@shivbhatia10 shivbhatia10 changed the title Don't push down filter if sort has fetch, same as limit Fix push_down_filter for children with non-empty fetch fields Mar 19, 2026
@shivbhatia10 shivbhatia10 force-pushed the sb/fix-filter-pushdown-on-sort-with-fetch branch from 3ef82ad to 6753fe5 Compare March 19, 2026 18:21
@github-actions github-actions bot added the physical-plan Changes to the physical-plan crate label Mar 19, 2026
@shivbhatia10 shivbhatia10 force-pushed the sb/fix-filter-pushdown-on-sort-with-fetch branch 3 times, most recently from 4fb0964 to 5e8fc77 Compare March 19, 2026 19:13
03)----DataSourceExec: partitions=1, partition_sizes=[1]

statement ok
DROP TABLE t;
Contributor Author

(image attachment)

Contributor Author

This fails without our fixes

@shivbhatia10 shivbhatia10 force-pushed the sb/fix-filter-pushdown-on-sort-with-fetch branch from 274e8be to 1973f44 Compare March 19, 2026 20:01
@shivbhatia10 shivbhatia10 force-pushed the sb/fix-filter-pushdown-on-sort-with-fetch branch from 1973f44 to 475de9f Compare March 19, 2026 20:07
@shivbhatia10
Contributor Author

Hey @alamb and @adriangb, would you like to take a look at this one?

@alamb alamb added the bug Something isn't working label Mar 20, 2026
@alamb
Contributor

alamb commented Mar 20, 2026

I will review this later today

Comment on lines +1143 to +1146
if scan.fetch.is_some() {
filter.input = Arc::new(LogicalPlan::TableScan(scan));
return Ok(Transformed::no(LogicalPlan::Filter(filter)));
}
Contributor

@hareshkh hareshkh Mar 20, 2026

@alamb : Curious about your thoughts for this part of the change?
For a plan like:

FILTER col = val
|---- LIMIT 50
      |---- TABLE_SCAN

After PushDownLimit folds the limit, we get FILTER -> TABLE_SCAN(fetch=50). This PR prevents pushing the filter into scan.filters when fetch is set.

Since there is no ordering specified, the LIMIT is essentially non-deterministic in the rows it returns. So the filter can be moved past it or run after it — both are semantically correct - in which case we can remove this part of the change. But if the table scan has an underlying implicit ordering (due to the layout of data or such), then pushing the filter around may be incorrect.

Contributor

After PushDownLimit folds the limit, we get FILTER -> TABLE_SCAN(fetch=50). This PR prevents pushing the filter into scan.filters when fetch is set.

This is an excellent point

So the filter can be moved past it or run after it — both are semantically correct

I am not sure they are both semantically correct. I think limit is (should be) applied after Filters in table providers and there are some places where that already happens. For example

  • // We should not limit the number of partitioned files to scan if there are filters and limit
    // at the same time. This is because the limit should be applied after the filters are applied.
    let statistic_file_limit = if filters.is_empty() { limit } else { None };

Also in the parquet data source I know the limit is applied after filters as well

However, that does not appear to be explicitly documented anywhere I could find -- I will make a PR to clarify the documentation


@github-actions github-actions bot added the logical-expr Logical plan and expressions label Mar 20, 2026
Contributor

@alamb alamb left a comment

This is a pretty amazing find @shivbhatia10 and @hareshkh -- thank you

I do think we should add LogicalPlan::Limit to LogicalPlan::fetch as well to make sure it is more future proof.

However, we can do this in a follow on PR as I think this PR makes the code better than it currently is and could be merged as is.

I also left some comments about the tests -- let me know what you think

Again, thank you. This is a great find

Scope Analysis

I tested your reproducer with all the released DataFusion versions back to 42.0.0 and they all show this problem, which means to me it is a long standing bug

Here is the reproducer for anyone who is interested

CREATE TABLE t(id INT, value INT) AS VALUES
(1, 100),
(2, 200),
(3, 300),
(4, 400),
(5, 500);

SELECT * FROM (SELECT * FROM t ORDER BY value LIMIT 3) sub WHERE sub.value > 200;

SELECT * FROM (SELECT * FROM t ORDER BY value DESC LIMIT 3) sub WHERE sub.value < 400;

EXPLAIN SELECT * FROM (SELECT * FROM t ORDER BY value LIMIT 3) sub WHERE sub.value > 200;

DROP TABLE t;

filter.predicate = new_predicate;
}

// If the child has a fetch (limit), pushing a filter below it would
Contributor

👍

LogicalPlan::EmptyRelation(_) => None,
LogicalPlan::Subquery(_) => None,
LogicalPlan::SubqueryAlias(_) => None,
LogicalPlan::Limit(_) => None,
Contributor

I am surprised LogicalPlan::Limit does not return a value here as well;

I looked into the code a bit to try and understand why Limits were needed. Initially I thought it was because the following code claims that we only push down Filters after Limits

// Filters can't be pushed down past Limits, we should do PushDownFilter after PushDownLimit
Arc::new(PushDownLimit::new()),
Arc::new(PushDownFilter::new()),

However, at some point we started running multiple optimizer passes so PushdownFilter can (effectively) run after PushDownLimit

while i < options.optimizer.max_passes {
    log_plan(&format!("Optimizer input (pass {i})"), &new_plan);
    for rule in &self.rules {
        // If skipping failed rules, copy plan before attempting to rewrite
        // as rewriting is destructive
        let prev_plan = options
            .optimizer
            .skip_failed_rules
            .then(|| new_plan.clone());
        let starting_schema = Arc::clone(new_plan.schema());
        let result = match rule.apply_order() {
            // optimizer handles recursion
            Some(apply_order) => new_plan.rewrite_with_subqueries(
                &mut Rewriter::new(apply_order, rule.as_ref(), config),

So this bug was probably introduced when we started running multiple optimizer passes


// If the child has a fetch (limit), pushing a filter below it would
// change semantics: the limit should apply before the filter, not after.
if filter.input.fetch().is_some() {
Contributor

I think we should also either explicitly check here for Limit or (better yet) change fetch() to also return Some for LogicalPlan::Limit for the reasons I explain above

Contributor Author

@shivbhatia10 shivbhatia10 Mar 21, 2026

I think this makes a lot of sense. The reason I initially skipped Limit was that we already handle it correctly in PushDownFilter::rewrite, although very indirectly: Limit is not covered in the match statement, so it falls through to leaving the filter unchanged. That is a fragile assumption, though. I think the right thing to do here is to add another method to LogicalPlan called skip, since Limit can have both fetch and skip, and if either is non-empty we should block pushdown; I believe that holds in general for any logical plan node. So I've added that method, used the get_skip_type and get_fetch_type methods on Limit to extract these Option<usize> values, and now we check for the presence of both skip and fetch in push_down_filter.
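
A minimal sketch of that guard (hypothetical helper and type names, not the PR's exact code): pushdown is blocked whenever the child reports either a fetch or a skip.

```rust
// Hypothetical simplified model of the check described above: a child
// node exposing fetch and skip, and a guard that blocks filter pushdown
// when either is set.
struct Child {
    fetch: Option<usize>,
    skip: Option<usize>, // None also stands in for a zero skip
}

fn can_push_filter_past(child: &Child) -> bool {
    child.fetch.is_none() && child.skip.is_none()
}

fn main() {
    assert!(can_push_filter_past(&Child { fetch: None, skip: None }));
    assert!(!can_push_filter_past(&Child { fetch: Some(3), skip: None }));
    assert!(!can_push_filter_past(&Child { fetch: None, skip: Some(10) }));
}
```

Treating skip the same as fetch matters because rows discarded by a skip, like rows cut by a fetch, are chosen before the filter runs.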

# specific language governing permissions and limitations
# under the License.

# Tests for filter pushdown behavior with Sort + LIMIT (fetch).
Contributor

I recommend adding this to an existing test rather than an entirely new .slt test to make it easier to discover in the future

Perhaps datafusion/sqllogictest/test_files/limit.slt

01)Sort: rn1 ASC NULLS LAST
02)--Sort: rn1 ASC NULLS LAST, fetch=5
03)----Projection: annotated_data_infinite2.a0, annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.c, annotated_data_infinite2.d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
04)------Filter: row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW < UInt64(50)
Contributor

it definitely had the filter too low 👍

@github-actions github-actions bot added documentation Improvements or additions to documentation development-process Related to development process of DataFusion physical-expr Changes to the physical-expr crates labels Mar 21, 2026
@github-actions github-actions bot added core Core DataFusion crate substrait Changes to the substrait crate common Related to common crate functions Changes to functions implementation datasource Changes to the datasource crate ffi Changes to the ffi crate spark labels Mar 21, 2026
@shivbhatia10 shivbhatia10 force-pushed the sb/fix-filter-pushdown-on-sort-with-fetch branch from c09acd2 to 79bdcac Compare March 21, 2026 11:29
@github-actions github-actions bot removed documentation Improvements or additions to documentation development-process Related to development process of DataFusion physical-expr Changes to the physical-expr crates core Core DataFusion crate substrait Changes to the substrait crate common Related to common crate functions Changes to functions implementation datasource Changes to the datasource crate ffi Changes to the ffi crate spark labels Mar 21, 2026
@alamb
Copy link
Contributor

alamb commented Mar 21, 2026

Thank you also @xanderbailey for the reviews

@shivbhatia10
Copy link
Contributor Author

Thank you for the review @alamb - I've added pushdown filter handling for Limit, checking both fetch and skip variants, and I've made some changes to the tests

///
/// Only [`LogicalPlan::Limit`] carries a skip value; all other variants
/// return `Ok(None)`. Returns `Ok(None)` for a zero skip.
pub fn skip(&self) -> Result<Option<usize>> {
Contributor

Why not fold this into fn limit()? It seems like (almost) the same thing to me

Contributor

@alamb alamb left a comment

Thanks @shivbhatia10


Labels

bug Something isn't working logical-expr Logical plan and expressions optimizer Optimizer rules physical-plan Changes to the physical-plan crate sqllogictest SQL Logic Tests (.slt)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Filter pushdown past children with fetch limits causes correctness bug

5 participants