Background
#4374 fixes a correctness bug (#4242) by including originalPlan.canonicalized in CometBroadcastExchangeExec.doCanonicalize, so that two semantically distinct broadcasts (count+1 vs count-1 in correlated-IN count-bug decorrelation) no longer collide and get merged by AQE's stageCache / CometReuseSubquery.
The trade-off is that q83 loses two ReusedSubquery collapses on the three DPP subqueries against date_dim:
approved-plans-v1_4-spark3_5/q83/extended.txt: deleted (falls through to the base file at 124 ops with 0 ReusedSubquery).
approved-plans-v1_4-spark4_0/q83.ansi/extended.txt: 98 ops → 124 ops; 2 ReusedSubquery → 0.
This is a perf regression (broadcasting date_dim three times instead of once) but not a correctness regression. Tracking the recovery separately so the correctness fix in #4374 isn't blocked.
Root cause
Spark's Canonicalize strips alias names (rewrites attributes to none#N) but preserves expression structure. That makes originalPlan.canonicalized the right discriminator for count-bug (the Add(count, 1) vs Subtract(count, 1) node inside HashAggregateExec.resultExpressions survives canonicalization).
But QueryPlan.normalizeExprIds is context-dependent: exprIds are assigned by plan traversal order. For q83, each DPP subquery is wrapped in its own AdaptiveSparkPlanExec, and the three structurally-equivalent build plans end up with non-equal canonical forms because exprIds embedded in their expressions are numbered against different surrounding contexts. The old (buggy) form happened to collapse q83's DPPs because it only keyed on child.canonicalized — the inner Comet operator tree canonicalizes from scratch with stable internal exprIds, so the three equivalent child plans were byte-equal, and originalPlan wasn't in the key at all.
What was tried and why it didn't work
While investigating review feedback on #4374, I tried two narrower keys:
-
originalPlan.canonicalized.output — fails. Spark canonicalization wipes alias names, so count+1 and count-1 broadcasts have output [none#0L, none#1, none#2] after canonicalization. The CometAggregateSuite count-bug regression test reproduces (Spark returns 2 rows, Comet returns 1).
-
Flat expression set: canonicalOriginal.collect { case p => p.expressions }.flatten.toSet — discriminates count-bug correctly, but false-negatives on equivalent broadcasts. Sampling 5 distinct invocations of equivalent count+1 broadcasts in the same test produced two different sets: (none#53L + 1) AS #2L vs (none#136L + 1) AS #2L. Same problem would hit q83.
The fundamental issue is that exprId assignment depends on traversal context, so structurally equivalent plans in different surrounding contexts get different canonical forms.
Possible paths forward
- Custom exprId-insensitive canonicalization for
CometBroadcastExchangeExec that re-numbers exprIds within just the canonicalized originalPlan, independent of the surrounding plan context. Non-trivial but most principled.
- Unwrap
AdaptiveSparkPlanExec before canonicalization (would need to verify this is the actual carrier of the differing exprIds in q83).
- Targeted matching on the count-bug pattern only — e.g. include originalPlan in the key only when it embeds Spark-side projections above Comet aggregates. Brittle and pattern-specific.
Related
Background
#4374 fixes a correctness bug (#4242) by including
originalPlan.canonicalizedinCometBroadcastExchangeExec.doCanonicalize, so that two semantically distinct broadcasts (count+1 vs count-1 in correlated-IN count-bug decorrelation) no longer collide and get merged by AQE'sstageCache/CometReuseSubquery.The trade-off is that q83 loses two
ReusedSubquerycollapses on the three DPP subqueries againstdate_dim:approved-plans-v1_4-spark3_5/q83/extended.txt: deleted (falls through to the base file at 124 ops with 0ReusedSubquery).approved-plans-v1_4-spark4_0/q83.ansi/extended.txt: 98 ops → 124 ops; 2ReusedSubquery→ 0.This is a perf regression (broadcasting
date_dimthree times instead of once) but not a correctness regression. Tracking the recovery separately so the correctness fix in #4374 isn't blocked.Root cause
Spark's
Canonicalizestrips alias names (rewrites attributes tonone#N) but preserves expression structure. That makesoriginalPlan.canonicalizedthe right discriminator for count-bug (theAdd(count, 1)vsSubtract(count, 1)node insideHashAggregateExec.resultExpressionssurvives canonicalization).But
QueryPlan.normalizeExprIdsis context-dependent: exprIds are assigned by plan traversal order. For q83, each DPP subquery is wrapped in its ownAdaptiveSparkPlanExec, and the three structurally-equivalent build plans end up with non-equal canonical forms because exprIds embedded in their expressions are numbered against different surrounding contexts. The old (buggy) form happened to collapse q83's DPPs because it only keyed onchild.canonicalized— the inner Comet operator tree canonicalizes from scratch with stable internal exprIds, so the three equivalent child plans were byte-equal, andoriginalPlanwasn't in the key at all.What was tried and why it didn't work
While investigating review feedback on #4374, I tried two narrower keys:
originalPlan.canonicalized.output— fails. Spark canonicalization wipes alias names, so count+1 and count-1 broadcasts have output[none#0L, none#1, none#2]after canonicalization. TheCometAggregateSuitecount-bug regression test reproduces (Spark returns 2 rows, Comet returns 1).Flat expression set:
canonicalOriginal.collect { case p => p.expressions }.flatten.toSet— discriminates count-bug correctly, but false-negatives on equivalent broadcasts. Sampling 5 distinct invocations of equivalent count+1 broadcasts in the same test produced two different sets:(none#53L + 1) AS #2Lvs(none#136L + 1) AS #2L. Same problem would hit q83.The fundamental issue is that exprId assignment depends on traversal context, so structurally equivalent plans in different surrounding contexts get different canonical forms.
Possible paths forward
CometBroadcastExchangeExecthat re-numbers exprIds within just the canonicalized originalPlan, independent of the surrounding plan context. Non-trivial but most principled.AdaptiveSparkPlanExecbefore canonicalization (would need to verify this is the actual carrier of the differing exprIds in q83).Related
CometBroadcastExchangeExec:equals/hashCodepreviously ignoredmodeandoutput(now fixed in feat: re-enable COUNT for mixed Spark partial / Comet final aggregates #4374), but Spark'sBroadcastExchangeExec.doCanonicalizecallsmode.canonicalizedwhile ours passed baremode(also fixed in feat: re-enable COUNT for mixed Spark partial / Comet final aggregates #4374). Mentioning for context — these are no longer outstanding.