Fix push_down_filter for children with non-empty fetch fields #21057
shivbhatia10 wants to merge 8 commits into apache:main
Conversation
```rust
// If the sort has a fetch (limit), pushing a filter below
// it would change semantics: the limit should apply before
// the filter, not after.
if sort.fetch.is_some() {
```
ExecutionPlan has fetch on it so I wonder if we need to do this more generically?
That's physical nodes, this is the logical plan optimizer
For logical nodes as far as I can see only Sort, Limit, and TableScan have fetch
Ah sorry yes, that sounds right
Was just trying to think about how we can prevent this kind of bug happening in the future.
Do we need to modify gather_filters_for_pushdown in SortExec?
Yes, will need to check physical plan too.
```rust
if self.fetch.is_some() {
    return Ok(FilterDescription::all_unsupported(
        &parent_filters,
        &self.children(),
    ));
}
```
I think it would be reasonable to add a `fn fetch(&self) -> Option<usize>` to `LogicalPlan`. We could make the match exhaustive / have no fall-through so that anyone adding a new variant has to update it. At least then it will be in one place and also closer to the change: better for both humans and machines to pick it up.
I think that would be nice!
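A minimal sketch of what such an accessor could look like, using a simplified stand-in enum (the real `LogicalPlan` has many more variants; the names here are illustrative, not DataFusion's actual API):

```rust
// Simplified stand-in for DataFusion's LogicalPlan; illustrative only.
enum Plan {
    Sort { fetch: Option<usize> },
    Limit { fetch: Option<usize> },
    TableScan { fetch: Option<usize> },
    Filter,
    Projection,
}

impl Plan {
    /// Exhaustive match with no fall-through arm: anyone adding a new
    /// variant gets a compile error here and must decide about fetch.
    fn fetch(&self) -> Option<usize> {
        match self {
            Plan::Sort { fetch }
            | Plan::Limit { fetch }
            | Plan::TableScan { fetch } => *fetch,
            Plan::Filter | Plan::Projection => None,
        }
    }
}

fn main() {
    assert_eq!(Plan::Sort { fetch: Some(3) }.fetch(), Some(3));
    assert_eq!(Plan::Filter.fetch(), None);
}
```

The exhaustive match (no `_ =>` arm) is the point: it moves the fetch knowledge into one place instead of scattering `sort.fetch` / `scan.fetch` checks across optimizer rules.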
```
03)----DataSourceExec: partitions=1, partition_sizes=[1]

statement ok
DROP TABLE t;
```
This fails without our fixes
I will review this later today
```rust
if scan.fetch.is_some() {
    filter.input = Arc::new(LogicalPlan::TableScan(scan));
    return Ok(Transformed::no(LogicalPlan::Filter(filter)));
}
```
@alamb : Curious about your thoughts for this part of the change?
For a plan like:
```
FILTER col = val
|---- LIMIT 50
|---- TABLE_SCAN
```
After PushDownLimit folds the limit, we get FILTER -> TABLE_SCAN(fetch=50). This PR prevents pushing the filter into scan.filters when fetch is set.
Since there is no ordering specified, the LIMIT is essentially non-deterministic in the rows it returns, so the filter can be moved past it or run after it; both are semantically correct, in which case we can remove this part of the change. But if the table scan has an underlying implicit ordering (due to the layout of the data or such), then pushing the filter around may be incorrect.
> After PushDownLimit folds the limit, we get FILTER -> TABLE_SCAN(fetch=50). This PR prevents pushing the filter into scan.filters when fetch is set.

This is an excellent point.

> So the filter can be moved past it or run after it — both are semantically correct

I am not sure they are both semantically correct. I think the limit is (or should be) applied after filters in table providers, and there are some places where that already happens. For example:

datafusion/datafusion/catalog-listing/src/table.rs, lines 511 to 514 in ee24f3c

Also, in the Parquet data source I know the limit is applied after filters as well.

However, that does not appear to be explicitly documented anywhere I could find -- I will make a PR to clarify the documentation.
alamb left a comment:
This is a pretty amazing find @shivbhatia10 and @hareshkh -- thank you
I do think we should add LogicalPlan::Limit to LogicalPlan::fetch as well to make sure it is more future proof.
However, we can do this in a follow on PR as I think this PR makes the code better than it currently is and could be merged as is.
I also left some comments about the tests -- let me know what you think
Again, thank you. This is a great find
Scope Analysis
I tested your reproducer with all the released DataFusion versions back to 42.0.0 and they all show this problem, which tells me it is a long-standing bug.
Here is the reproducer for anyone who is interested
```sql
CREATE TABLE t(id INT, value INT) AS VALUES
    (1, 100),
    (2, 200),
    (3, 300),
    (4, 400),
    (5, 500);

SELECT * FROM (SELECT * FROM t ORDER BY value LIMIT 3) sub WHERE sub.value > 200;
SELECT * FROM (SELECT * FROM t ORDER BY value DESC LIMIT 3) sub WHERE sub.value < 400;
EXPLAIN SELECT * FROM (SELECT * FROM t ORDER BY value LIMIT 3) sub WHERE sub.value > 200;
DROP TABLE t;
```

```rust
    filter.predicate = new_predicate;
}

// If the child has a fetch (limit), pushing a filter below it would
```

```rust
LogicalPlan::EmptyRelation(_) => None,
LogicalPlan::Subquery(_) => None,
LogicalPlan::SubqueryAlias(_) => None,
LogicalPlan::Limit(_) => None,
```
I am surprised LogicalPlan::Limit does not return a value here as well;
I looked into the code a bit to try and understand why Limits were needed. Initially I thought it was because the following code claims that we only push down Filters after Limits:

datafusion/datafusion/optimizer/src/optimizer.rs, lines 297 to 299 in ee24f3c

However, at some point we started running multiple optimizer passes, so PushDownFilter can (effectively) run after PushDownLimit:

datafusion/datafusion/optimizer/src/optimizer.rs, lines 386 to 402 in ee24f3c

So this bug was probably introduced when we started running multiple optimizer passes.
```rust
// If the child has a fetch (limit), pushing a filter below it would
// change semantics: the limit should apply before the filter, not after.
if filter.input.fetch().is_some() {
```
I think we should also either explicitly check here for Limit or (better yet) change fetch() to also return Some for LogicalPlan::Limit for the reasons I explain above
I think this makes a lot of sense. The reason I initially skipped Limit was that we already handle it correctly in PushDownFilter::rewrite, although very indirectly: Limit is not covered in the match statement, so it falls through to leaving the filter unchanged. However, this is a fragile assumption. I think the right thing to do here is to add another method to LogicalPlan called skip, since Limit can have both fetch and skip variants, and if either of them is non-empty we should block pushdown; this is true in general for any logical plan node, I believe. So I've added that method, used the get_skip_type and get_fetch_type methods on Limit to extract these Option<usize> values, and now we check for the presence of both skip and fetch in push_down_filter.
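To illustrate the combined check described above, here is a hedged sketch with simplified stand-in types (the real code works on LogicalPlan and uses Limit's get_skip_type / get_fetch_type; the names below are illustrative only):

```rust
// Toy model of a plan node's row-limiting properties; not DataFusion's API.
struct Node {
    fetch: Option<usize>, // LIMIT n, or a Sort/TableScan fetch
    skip: Option<usize>,  // LIMIT ... OFFSET n (skip)
}

impl Node {
    /// A filter must not be pushed below a node that limits or skips rows:
    /// doing so would change which rows the limit/offset applies to.
    fn blocks_filter_pushdown(&self) -> bool {
        self.fetch.is_some() || self.skip.is_some()
    }
}

fn main() {
    let plain = Node { fetch: None, skip: None };
    let limited = Node { fetch: Some(3), skip: None };
    let offset = Node { fetch: None, skip: Some(10) };
    assert!(!plain.blocks_filter_pushdown());
    assert!(limited.blocks_filter_pushdown());
    assert!(offset.blocks_filter_pushdown());
}
```

The design point is that the guard lives on the node itself rather than in the optimizer rule's match arms, so any row-limiting node blocks pushdown uniformly.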
```
# specific language governing permissions and limitations
# under the License.

# Tests for filter pushdown behavior with Sort + LIMIT (fetch).
```
I recommend adding this to an existing test file rather than an entirely new .slt test, to make it easier to discover in the future.
Perhaps datafusion/sqllogictest/test_files/limit.slt
```
01)Sort: rn1 ASC NULLS LAST
02)--Sort: rn1 ASC NULLS LAST, fetch=5
03)----Projection: annotated_data_infinite2.a0, annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.c, annotated_data_infinite2.d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
04)------Filter: row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW < UInt64(50)
```
it definitely had the filter too low 👍
Thank you also @xanderbailey for the reviews

Thank you for the review @alamb - I've added pushdown filter handling for
```rust
///
/// Only [`LogicalPlan::Limit`] carries a skip value; all other variants
/// return `Ok(None)`. Returns `Ok(None)` for a zero skip.
pub fn skip(&self) -> Result<Option<usize>> {
```
Why not fold this into fn limit()? It seems like (almost) the same thing to me

Which issue does this PR close?
Rationale for this change
Currently, if we see a filter with a limit underneath, we don't push the filter past the limit. However, sort nodes and table scan nodes can have fetch fields that do essentially the same thing, and we don't stop filters from being pushed past them. This is a correctness bug that can lead to wrong query results.
I added checks for exactly this condition so we don't push the filter down. I think the prior expectation was that there would be a limit node above any of these nodes, but this is not always true. In push_down_limit.rs, there's code that does this optimisation when a limit has a sort under it: the first time it runs, it sets the internal fetch of the sort to new_fetch, and on the second optimisation pass it hits the branch where we remove the limit node altogether, leaving the sort node exposed to filters, which can now be pushed down past it.
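The two-pass behaviour described above can be sketched like this (a toy model of the fold, not the actual push_down_limit.rs code; the enum and function names are illustrative):

```rust
// Toy model: Limit over Sort folds into Sort { fetch }, after which the
// Limit node disappears and only the Sort's fetch field records the limit.
#[derive(Debug, PartialEq)]
enum Plan {
    Limit { fetch: usize, input: Box<Plan> },
    Sort { fetch: Option<usize>, input: Box<Plan> },
    Scan,
}

fn push_down_limit(plan: Plan) -> Plan {
    match plan {
        // Copy the limit's fetch onto the sort and drop the Limit node;
        // a later filter-pushdown pass that only looks for Limit nodes
        // will no longer see any barrier here.
        Plan::Limit { fetch, input } => match *input {
            Plan::Sort { fetch: sort_fetch, input } => Plan::Sort {
                fetch: Some(sort_fetch.map_or(fetch, |f| f.min(fetch))),
                input,
            },
            other => Plan::Limit { fetch, input: Box::new(other) },
        },
        other => other,
    }
}

fn main() {
    let plan = Plan::Limit {
        fetch: 3,
        input: Box::new(Plan::Sort { fetch: None, input: Box::new(Plan::Scan) }),
    };
    let folded = push_down_limit(plan);
    // No Limit node remains; only Sort's fetch carries the limit.
    assert_eq!(
        folded,
        Plan::Sort { fetch: Some(3), input: Box::new(Plan::Scan) }
    );
}
```

This is why checking only for a Limit node between the filter and the sort is not enough: after the fold, the limiting behaviour lives entirely in the sort's fetch field.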
There is also a related fix in gather_filters_for_pushdown in SortExec, which does the same thing for physical plan nodes. If a given execution plan has a non-empty fetch, it should not allow any parent filters to be pushed down.

What changes are included in this PR?
Added checks in the optimisation rule to avoid pushing filters past children with built-in limits.
Are these changes tested?
Yes:
- push_down_filter.rs
- window.slt
- sort.rs
- push_down_filter_sort_fetch.slt for this exact behaviour

Are there any user-facing changes?
No