
Recover q83 broadcast/subquery reuse lost by CometBroadcastExchangeExec.doCanonicalize fix in #4374 #4395

@andygrove

Background

#4374 fixes a correctness bug (#4242) by including originalPlan.canonicalized in CometBroadcastExchangeExec.doCanonicalize, so that two semantically distinct broadcasts (count+1 vs count-1 in correlated-IN count-bug decorrelation) no longer collide and get merged by AQE's stageCache / CometReuseSubquery.

The trade-off is that q83 loses two ReusedSubquery collapses on the three DPP subqueries against date_dim:

  • approved-plans-v1_4-spark3_5/q83/extended.txt: deleted (falls through to the base file at 124 ops with 0 ReusedSubquery).
  • approved-plans-v1_4-spark4_0/q83.ansi/extended.txt: 98 ops → 124 ops; 2 ReusedSubquery → 0.

This is a perf regression (broadcasting date_dim three times instead of once) but not a correctness regression. I'm tracking the recovery separately so that the correctness fix in #4374 isn't blocked.

Root cause

Spark's Canonicalize strips alias names (rewrites attributes to none#N) but preserves expression structure. That makes originalPlan.canonicalized the right discriminator for the count bug: the Add(count, 1) vs Subtract(count, 1) node inside HashAggregateExec.resultExpressions survives canonicalization.

But QueryPlan.normalizeExprIds is context-dependent: exprIds are assigned by plan traversal order. For q83, each DPP subquery is wrapped in its own AdaptiveSparkPlanExec, and the three structurally equivalent build plans end up with unequal canonical forms because the exprIds embedded in their expressions are numbered against different surrounding contexts.

The old (buggy) form happened to collapse q83's DPP subqueries because it keyed only on child.canonicalized. The inner Comet operator tree canonicalizes from scratch with stable internal exprIds, so the three equivalent child plans were byte-equal, and originalPlan was not part of the key at all.
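To make the mechanism concrete, here is a toy Python model, not Spark's API. As a simplifying assumption, it renumbers an attribute only when its exprId resolves against the node's own input, and otherwise lets the exprId assigned by the surrounding plan leak into the canonical form. The tuple encoding, `canonicalize`, and `count_projection` are hypothetical.

```python
# Toy expression trees as nested tuples (a hypothetical encoding, not Spark's classes):
#   ("attr", name, expr_id) | ("lit", value) | ("add", l, r) | ("sub", l, r)
#   ("alias", child, name, expr_id)

def canonicalize(expr, input_ids):
    """Drop names and keep structure. An attribute is renumbered to its ordinal only
    if its exprId resolves against input_ids; otherwise its original exprId leaks through."""
    kind = expr[0]
    if kind == "attr":
        expr_id = expr[2]
        new_id = input_ids.index(expr_id) if expr_id in input_ids else expr_id
        return ("attr", "none", new_id)
    if kind == "lit":
        return expr
    if kind in ("add", "sub"):
        return (kind, canonicalize(expr[1], input_ids), canonicalize(expr[2], input_ids))
    if kind == "alias":
        # Alias name and exprId are both normalized away in this toy.
        return ("alias", canonicalize(expr[1], input_ids), "", 0)
    raise ValueError(f"unknown node kind {kind!r}")


def count_projection(kind, count_id):
    """Hypothetical count+1 / count-1 projection referencing `count` by exprId."""
    return ("alias", (kind, ("attr", "count", count_id), ("lit", 1)), "cnt", 99)


# Count bug: the Add vs Subtract structure survives, so the canonical forms differ.
plus = canonicalize(count_projection("add", 53), input_ids=[])
minus = canonicalize(count_projection("sub", 53), input_ids=[])

# q83-style: the same +1 projection, but `count` got its exprId in another context.
plus_elsewhere = canonicalize(count_projection("add", 136), input_ids=[])

# If `count` did resolve against the local input, both contexts would normalize to #0.
local_a = canonicalize(count_projection("add", 53), input_ids=[53])
local_b = canonicalize(count_projection("add", 136), input_ids=[136])

print(plus != minus)           # True: count-bug broadcasts stay distinct
print(plus != plus_elsewhere)  # True: equivalent broadcasts no longer match
print(local_a == local_b)      # True
```

The same leak that keeps count+1 and count-1 apart is what makes the three equivalent q83 build plans look different.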

What was tried and why it didn't work

While investigating review feedback on #4374, I tried two narrower keys:

  1. Canonical output (originalPlan.canonicalized.output): fails. Spark canonicalization wipes alias names, so the count+1 and count-1 broadcasts both have output [none#0L, none#1, none#2] after canonicalization, and the CometAggregateSuite count-bug regression test fails again (Spark returns 2 rows, Comet returns 1).

  2. Flat expression set (canonicalOriginal.collect { case p => p.expressions }.flatten.toSet): discriminates the count bug correctly but produces false negatives on equivalent broadcasts. Sampling 5 distinct invocations of equivalent count+1 broadcasts in the same test produced two different sets: (none#53L + 1) AS #2L vs (none#136L + 1) AS #2L. The same problem would hit q83.

The fundamental issue is that exprId assignment depends on traversal context, so structurally equivalent plans in different surrounding contexts get different canonical forms.
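Both failure modes can be reproduced with a deliberately tiny Python model in which a canonicalized plan is reduced to its projections' canonical text and output types. The class `CanonicalProjection`, the two key functions, and the `- 1` expression string are hypothetical; only the `(none#53L + 1) AS #2L` and `(none#136L + 1) AS #2L` forms come from the sets observed in the count-bug test.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CanonicalProjection:
    """Hypothetical summary of one canonicalized projection in originalPlan."""
    expr: str       # canonical expression text, alias name already wiped
    data_type: str  # output data type, which canonicalization keeps


def output_key(plan):
    """Attempt 1: key on the canonicalized output, where every name is 'none'."""
    return tuple(("none", p.data_type) for p in plan)


def flat_expr_key(plan):
    """Attempt 2: key on the flat set of canonical expressions."""
    return frozenset(p.expr for p in plan)


count_plus_1 = [CanonicalProjection("(none#53L + 1) AS #2L", "bigint")]
count_minus_1 = [CanonicalProjection("(none#53L - 1) AS #2L", "bigint")]
# The same count+1 broadcast, canonicalized in a different surrounding context.
count_plus_1_elsewhere = [CanonicalProjection("(none#136L + 1) AS #2L", "bigint")]

print(output_key(count_plus_1) == output_key(count_minus_1))                  # True: collision
print(flat_expr_key(count_plus_1) == flat_expr_key(count_minus_1))           # False: count bug handled
print(flat_expr_key(count_plus_1) == flat_expr_key(count_plus_1_elsewhere))  # False: reuse lost
```

The output key is too coarse (it collides on the count bug), while the flat set is too fine (it carries context-dependent exprIds).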

Possible paths forward

  • Custom exprId-insensitive canonicalization for CometBroadcastExchangeExec that renumbers exprIds within the canonicalized originalPlan only, independent of the surrounding plan context. Non-trivial, but the most principled option.
  • Unwrap AdaptiveSparkPlanExec before canonicalization (would need to verify this is the actual carrier of the differing exprIds in q83).
  • Targeted matching on the count-bug pattern only: for example, include originalPlan in the key only when it embeds Spark-side projections above Comet aggregates. Brittle and pattern-specific.
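A minimal sketch of the first option, again as a toy Python model rather than Spark code. It renumbers exprIds densely, in first-seen pre-order, within a single canonicalized plan, so the ids handed out by the surrounding context no longer matter, while structure (Add vs Subtract) still separates the count-bug broadcasts. The tuple encoding and the names `renumber_locally`, `broadcast_key`, `plus_one`, and `minus_one` are hypothetical.

```python
# Toy trees as nested tuples (hypothetical encoding):
#   ("attr", name, expr_id) | ("lit", value) | ("add", l, r) | ("sub", l, r)
#   ("alias", child, name, expr_id)

def renumber_locally(expr, mapping=None):
    """Replace every exprId with a dense id assigned in pre-order, first-seen order.

    The mapping is a bijection within one plan, so distinct attributes stay distinct,
    but the result no longer depends on ids assigned by the surrounding plan.
    """
    if mapping is None:
        mapping = {}

    def local_id(expr_id):
        # len(mapping) is evaluated before insertion, so ids start at 0.
        return mapping.setdefault(expr_id, len(mapping))

    kind = expr[0]
    if kind == "attr":
        return ("attr", expr[1], local_id(expr[2]))
    if kind == "lit":
        return expr
    if kind in ("add", "sub"):
        return (kind, renumber_locally(expr[1], mapping), renumber_locally(expr[2], mapping))
    if kind == "alias":
        alias_id = local_id(expr[3])  # alias is visited before its child (pre-order)
        return ("alias", renumber_locally(expr[1], mapping), expr[2], alias_id)
    raise ValueError(f"unknown node kind {kind!r}")


def broadcast_key(child_canonical, original_canonical):
    """Hypothetical reuse key: the Comet child plan plus a context-free originalPlan."""
    return (child_canonical, renumber_locally(original_canonical))


def plus_one(count_id):
    return ("alias", ("add", ("attr", "none", count_id), ("lit", 1)), "", 2)


def minus_one(count_id):
    return ("alias", ("sub", ("attr", "none", count_id), ("lit", 1)), "", 2)


child = "canonical Comet child plan"  # placeholder for child.canonicalized
print(broadcast_key(child, plus_one(53)) == broadcast_key(child, plus_one(136)))  # True: reuse
print(broadcast_key(child, plus_one(53)) == broadcast_key(child, minus_one(53)))  # False: count bug
```

A real version would have to walk Spark's QueryPlan and Expression trees and decide how outer references and nested subquery plans are numbered; the toy ignores both.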

Related
