[SPARK-56842][SQL] Short-circuit AQE when materialized stages are empty#55839
Open
sunchao wants to merge 2 commits into
Open
[SPARK-56842][SQL] Short-circuit AQE when materialized stages are empty#55839sunchao wants to merge 2 commits into
sunchao wants to merge 2 commits into
Conversation
Member
dongjoon-hyun
left a comment
There was a problem hiding this comment.
Could you fix a compilation error, @sunchao ?
[error] /home/runner/work/spark/spark/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala:83:13: pattern type is incompatible with expected type;
[error] found : org.apache.spark.sql.catalyst.plans.logical.EmptyRelation
[error] required: org.apache.spark.sql.execution.SparkPlan
[error] case _: EmptyRelation => Some(0)
[error] ^
[error] /home/runner/work/spark/spark/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala:415:9: not found: value removeStageFromCache
[error] removeStageFromCache(stage)
[error] ^
[error] /home/runner/work/spark/spark/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala:417:23: not enough arguments for method cancel: (reason: String): Unit.
[error] Unspecified value parameter reason.
[error] stage.cancel()
[error] ^
Member
Author
|
Thanks @dongjoon-hyun . Updated. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What changes were proposed in this pull request?
This PR extends AQE's empty-relation propagation so it can keep reasoning about an empty materialized stage even when that stage is wrapped by common physical operators:
AQEShuffleReadExecSortExecProjectExecColumnarToRowExecIt also keeps aggregate semantics correct while doing that:
Finally, when AQE adopts a new physical plan and cancels exchange stages that are no longer referenced, cancellation failures from those obsolete stages are ignored instead of being treated as query failures.
Why are the changes needed?
AQE already knows how to short-circuit a plan when a materialized query stage proves that a branch is empty. The problem is that the row-count signal can get lost once the materialized stage is wrapped by operators that preserve emptiness.
For example, consider:
The left side is empty. AQE can materialize that fact early, but the adaptive plan may see it through a shape like:
Before this change, the empty-stage information was not propagated through those wrappers, so AQE could miss the opportunity to collapse the join even though the result is known to be empty.
After this change, AQE continues propagating the
rowCount = 0signal through the wrapper nodes and can replace the remaining join work with an empty result plan.There is also an important aggregate corner case. A global aggregate over empty input still returns one row:
That result is
0, represented by one output row. This PR preserves that behavior while still allowing grouped aggregates to propagate child row counts.Finally, once AQE picks a new plan after discovering emptiness, previously launched exchange stages may become obsolete and get cancelled. Those cancellation callbacks should not fail the query, because the stages no longer belong to the chosen adaptive plan.
Does this PR introduce any user-facing change?
Yes.
Queries that become provably empty during adaptive execution can finish with less unnecessary downstream work, and queries should no longer fail because AQE intentionally cancelled exchange stages that became obsolete after re-planning.
How was this patch tested?
Added targeted AQE unit coverage for:
Validated locally with:
build/sbt sql/compile build/sbt 'sql/testOnly org.apache.spark.sql.execution.adaptive.AdaptiveQueryExecSuite -- -z "empty materialized stage short-circuits AQE through sort wrappers" -z "empty filtered global aggregate stage is not treated as non-empty"'Was this patch authored or co-authored using generative AI tooling?
Generated-by: OpenAI Codex