feat: Create dynamic filters in SortMergeJoin #23064
Draft
adriangb wants to merge 1 commit into
Adds support for generating dynamic filters for the left and right sides of a SortMergeJoinExec, enabling range-based pruning of both sides of the join (Inner / LeftSemi / RightSemi). The work is extracted from the abandoned PR apache#21267, scoped to the SMJ subsystem only.

Two notable differences from apache#21267:

1. That PR also modified handle_child_pushdown_result in a number of unrelated operators (coalesce_batches, coop, filter, projection, repartition, sort, sort_preserving_merge, limit). Those changes are unnecessary: the physical-optimizer filter-pushdown driver already rebuilds each parent with its updated children via with_new_children_if_necessary plus the ptr_eq safety net in push_down_filters. The fuzz test confirms the dynamic filter reaches the DataSourceExec through Sort + Repartition without them.

2. The dynamic-filter coordinator is rewritten to follow the correctness model of HashJoinExec's SharedBuildAccumulator: build the filter from complete information and publish it exactly once. The original design shared a single filter across concurrently-executing hash partitions and advanced a one-sided bound as partitions progressed (raising it on partition exhaustion), which non-deterministically pruned valid join rows. The accumulator now gathers the feeding side's global [min, max] and publishes a static `col >= min AND col <= max` superset once all partitions are exhausted.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01Pxss1NoHRv1ZdQzSoZMDKr
Which issue does this PR close?
SortMergeJoinExec #20443.

Rationale for this change

This adds support for generating dynamic filters for the left and right sides of a `SortMergeJoinExec`, enabling range-based pruning of both sides of the join (for `Inner` / `LeftSemi` / `RightSemi`). Consumers that support filter pushdown (e.g. Parquet scans) can use the pushed-down filter to prune at scan time.

This is a scoped re-do of #21267, which was abandoned. It differs in two important ways.
1. It does not touch unrelated operators
#21267 modified `handle_child_pushdown_result` in a number of operators (`coalesce_batches`, `coop`, `filter`, `projection`, `repartition`, `sort`, `sort_preserving_merge`, `limit`) to clone-and-rewrap their updated children. Those changes are unnecessary: the filter-pushdown driver (`push_down_filters` in `datafusion-physical-optimizer`) already rebuilds each parent with its updated children via `with_new_children_if_necessary` plus the `ptr_eq` safety net.

The integration test `test_smj_dynamic_filter_present_in_plan` confirms the `DynamicFilter` reaches a `DataSourceExec` through the inserted `SortExec` + `RepartitionExec` with none of those operator changes.

2. Correctness: build from complete info, publish once
The dynamic-filter coordinator is rewritten to follow the model already used by `HashJoinExec`'s `SharedBuildAccumulator`: build the filter from complete information and publish it exactly once.

The original `SharedSortMergeBoundsAccumulator` shared a single dynamic filter across the concurrently-executing hash partitions feeding the join, and advanced a one-sided bound as partitions made progress, raising it when a partition was exhausted. Because that single bound gates rows across all partitions, it could non-deterministically prune valid join rows. For example, with the existing `joins.slt` `Date32` inner-join test (`t1.c1 = {1,2,NULL,3}` ⋈ `t2.c1 = {1,NULL,NULL,3}`, `batch_size=2`, `target_partitions=2`), it dropped the `key=3` row in roughly 4 of 6 runs.

The accumulator now gathers the feeding side's global `[min, max]` (nulls skipped) and, only once all partitions are exhausted, publishes a static superset predicate `col >= min AND col <= max` (or `lit(false)` when no non-null keys were seen) and marks the filter complete.

A consequence of the publish-once design: for the common `scan → SortExec → SMJ` shape, the blocking sort drains the scan before the bound is known, so there is no skip-ahead benefit there; the benefit accrues to pushdown-capable, already-sorted scans. A future enhancement could route per-partition advancing bounds via a hashed `CASE` expression (as `HashJoinExec`'s `Partitioned` mode does) to recover skip-ahead pruning while staying correct.

What changes are included in this PR?
- `SortMergeJoinExec` generates dynamic filters in `gather_filters_for_pushdown` (Post phase) and captures accepted ones in `handle_child_pushdown_result`.
- `SharedSortMergeBoundsAccumulator` (publish-once `[min, max]` superset, mirroring HashJoin).
- `dynamic_filter_updates` metric.
- Integration tests (`smj_filter_pushdown.rs`), including a multi-partition + NULL-key regression test that runs the previously-flaky shape 40×, plus updated EXPLAIN expectations in `joins.slt`, `sort_merge_join.slt`, `explain_tree.slt`.

Are these changes tested?
Yes. New `smj_filter_pushdown` integration tests (correctness with/without the filter, plan-presence, and a multi-partition NULL-key regression test), updated sqllogictest EXPLAIN plans, plus the existing SMJ unit tests. The full sqllogictest suite, `datafusion-physical-optimizer`, and `datafusion-physical-plan` join tests pass.

Are there any user-facing changes?
EXPLAIN output for plans containing a `SortMergeJoinExec` with dynamic-filter pushdown enabled (the default) now shows a `DynamicFilter` pushed to the scans it feeds (as a `FilterExec` for sources that cannot absorb it, or as a scan predicate for those that can). Like `HashJoinExec`, the filter is not rendered on the join node itself.

🤖 Generated with Claude Code
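
Illustrative sketch of the publish-once model described under "Correctness" above. This is a toy, standalone model and not the code in this PR: `BoundsAccumulator`, `Published`, `report_exhausted`, `current`, and `passes` are hypothetical names, join keys are plain `i32`, and no DataFusion types are involved. It only shows the shape of the idea: each partition reports its keys after it is exhausted, nulls are skipped, and the predicate becomes visible once, when the last partition reports.

```rust
use std::sync::Mutex;

/// Hypothetical stand-in for the predicate that gets published.
#[derive(Debug, Clone, PartialEq)]
enum Published {
    /// Models `col >= min AND col <= max`.
    Range { min: i32, max: i32 },
    /// Models `lit(false)`: no non-null key was seen on the feeding side.
    AlwaysFalse,
}

struct State {
    /// Partitions that have not reported yet.
    remaining: usize,
    /// Running global (min, max) over all non-null keys seen so far.
    bounds: Option<(i32, i32)>,
    /// Set exactly once, when `remaining` reaches zero.
    published: Option<Published>,
}

struct BoundsAccumulator {
    state: Mutex<State>,
}

impl BoundsAccumulator {
    fn new(num_partitions: usize) -> Self {
        let state = State { remaining: num_partitions, bounds: None, published: None };
        Self { state: Mutex::new(state) }
    }

    /// Called once per partition, after that partition is exhausted.
    /// Returns `Some(filter)` only for the call that publishes it.
    fn report_exhausted(&self, keys: &[Option<i32>]) -> Option<Published> {
        let mut st = self.state.lock().unwrap();
        assert!(st.remaining > 0, "every partition must report exactly once");

        // Fold this partition's non-null keys into the global bounds.
        for k in keys.iter().flatten() {
            let merged = match st.bounds {
                Some((lo, hi)) => (lo.min(*k), hi.max(*k)),
                None => (*k, *k),
            };
            st.bounds = Some(merged);
        }

        st.remaining -= 1;
        if st.remaining > 0 {
            // Information is still incomplete, so nothing is published.
            return None;
        }

        let filter = match st.bounds {
            Some((min, max)) => Published::Range { min, max },
            None => Published::AlwaysFalse,
        };
        st.published = Some(filter.clone());
        Some(filter)
    }

    /// What a consumer would observe at this moment.
    fn current(&self) -> Option<Published> {
        self.state.lock().unwrap().published.clone()
    }
}

/// Evaluates the published predicate against one non-null key.
fn passes(filter: &Published, key: i32) -> bool {
    match filter {
        Published::Range { min, max } => key >= *min && key <= *max,
        Published::AlwaysFalse => false,
    }
}
```

With the `t2.c1 = {1,NULL,NULL,3}` keys from the example above split as `[1, NULL]` and `[NULL, 3]`, the first `report_exhausted` call publishes nothing and the second publishes the range `[1, 3]`, so `key=3` on the other side is kept. Because no bound is visible until every partition has reported, there is no intermediate state in which one partition's progress can prune rows that another partition still needs.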