Skip to content

feat: Create dynamic filters in SortMergeJoin#23064

Draft
adriangb wants to merge 1 commit into
apache:mainfrom
pydantic:smj-dynamic-filter-only
Draft

feat: Create dynamic filters in SortMergeJoin#23064
adriangb wants to merge 1 commit into
apache:mainfrom
pydantic:smj-dynamic-filter-only

Conversation

@adriangb

Copy link
Copy Markdown
Contributor

Which issue does this PR close?

Rationale for this change

This adds support for generating dynamic filters for the left and right sides of a SortMergeJoinExec, enabling range-based pruning of both sides of the join (for Inner / LeftSemi / RightSemi). Consumers that support filter pushdown (e.g. Parquet scans) can use the pushed-down filter to prune at scan time.

This is a scoped re-do of #21267, which was abandoned. It differs in two important ways.

1. It does not touch unrelated operators

#21267 modified handle_child_pushdown_result in a number of operators (coalesce_batches, coop, filter, projection, repartition, sort, sort_preserving_merge, limit) to clone-and-rewrap their updated children. Those changes are unnecessary: the filter-pushdown driver (push_down_filters in datafusion-physical-optimizer) already rebuilds each parent with its updated children via with_new_children_if_necessary plus the ptr_eq safety net:

if res.updated_node.is_none() && !Arc::ptr_eq(&updated_node, node) {
    res.updated_node = Some(updated_node)
}

The integration test test_smj_dynamic_filter_present_in_plan confirms the DynamicFilter reaches a DataSourceExec through the inserted SortExec + RepartitionExec with none of those operator changes.

2. Correctness: build from complete info, publish once

The dynamic-filter coordinator is rewritten to follow the model already used by HashJoinExec's SharedBuildAccumulator: build the filter from complete information and publish it exactly once.

The original SharedSortMergeBoundsAccumulator shared a single dynamic filter across the concurrently-executing hash partitions feeding the join, and advanced a one-sided bound as partitions made progress — raising it when a partition exhausted. Because that single bound gates rows across all partitions, it could non-deterministically prune valid join rows. For example, with the existing joins.slt Date32 inner-join test (t1.c1 = {1,2,NULL,3}t2.c1 = {1,NULL,NULL,3}, batch_size=2, target_partitions=2), it dropped the key=3 row in roughly 4 of 6 runs.

The accumulator now gathers the feeding side's global [min, max] (nulls skipped) and, only once all partitions are exhausted, publishes a static superset predicate col >= min AND col <= max (or lit(false) when no non-null keys were seen) and marks the filter complete.

A consequence of the publish-once design: for the common scan → SortExec → SMJ shape, the blocking sort drains the scan before the bound is known, so there is no skip-ahead benefit there; the benefit accrues to pushdown-capable, already-sorted scans. A future enhancement could route per-partition advancing bounds via a hashed CASE expression (as HashJoinExec's Partitioned mode does) to recover skip-ahead pruning while staying correct.

What changes are included in this PR?

  • SortMergeJoinExec generates dynamic filters in gather_filters_for_pushdown (Post phase) and captures accepted ones in handle_child_pushdown_result.
  • New SharedSortMergeBoundsAccumulator (publish-once [min, max] superset, mirroring HashJoin).
  • The two SMJ stream implementations report join keys to the accumulator.
  • dynamic_filter_updates metric.
  • Tests: a fuzz/integration suite (smj_filter_pushdown.rs) including a multi-partition + NULL-key regression test that runs the previously-flaky shape 40×, plus updated EXPLAIN expectations in joins.slt, sort_merge_join.slt, explain_tree.slt.

Are these changes tested?

Yes. New smj_filter_pushdown integration tests (correctness with/without the filter, plan-presence, and a multi-partition NULL-key regression test), updated sqllogictest EXPLAIN plans, plus the existing SMJ unit tests. The full sqllogictest suite, datafusion-physical-optimizer, and datafusion-physical-plan join tests pass.

Are there any user-facing changes?

EXPLAIN output for plans containing a SortMergeJoinExec with dynamic-filter pushdown enabled (the default) now shows a DynamicFilter pushed to the scans it feeds (as a FilterExec for sources that cannot absorb it, or as a scan predicate for those that can). Like HashJoinExec, the filter is not rendered on the join node itself.

🤖 Generated with Claude Code

Adds support for generating dynamic filters for the left and right sides
of a SortMergeJoinExec, enabling range-based pruning of both sides of the
join (Inner / LeftSemi / RightSemi). The work is extracted from the
abandoned PR apache#21267, scoped to the SMJ subsystem only.

Two notable differences from apache#21267:

1. That PR also modified handle_child_pushdown_result in a number of
   unrelated operators (coalesce_batches, coop, filter, projection,
   repartition, sort, sort_preserving_merge, limit). Those changes are
   unnecessary: the physical-optimizer filter-pushdown driver already
   rebuilds each parent with its updated children via
   with_new_children_if_necessary plus the ptr_eq safety net in
   push_down_filters. The fuzz test confirms the dynamic filter reaches
   the DataSourceExec through Sort + Repartition without them.

2. The dynamic-filter coordinator is rewritten to follow the correctness
   model of HashJoinExec's SharedBuildAccumulator: build the filter from
   complete information and publish it exactly once. The original design
   shared a single filter across concurrently-executing hash partitions
   and advanced a one-sided bound as partitions progressed (raising it on
   partition exhaustion), which non-deterministically pruned valid join
   rows. The accumulator now gathers the feeding side's global [min, max]
   and publishes a static `col >= min AND col <= max` superset once all
   partitions are exhausted.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01Pxss1NoHRv1ZdQzSoZMDKr
@github-actions github-actions Bot added core Core DataFusion crate sqllogictest SQL Logic Tests (.slt) physical-plan Changes to the physical-plan crate labels Jun 21, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

core Core DataFusion crate physical-plan Changes to the physical-plan crate sqllogictest SQL Logic Tests (.slt)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant