feat: re-enable COUNT for mixed Spark partial / Comet final aggregates#4374
feat: re-enable COUNT for mixed Spark partial / Comet final aggregates#4374andygrove wants to merge 18 commits into
Conversation
Previously, when one aggregate stage (Partial or Final) couldn't be converted to Comet, the other was also blocked to avoid crashes from incompatible intermediate buffer formats (issues apache#1389, apache#1267). This change introduces per-aggregate `supportsMixedPartialFinal` declarations so that aggregates with simple, compatible buffers (MIN, MAX, COUNT, bitwise) can safely run in mixed mode while unsafe aggregates (SUM, AVG, Variance, CollectSet) continue to be blocked. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Restore `convert` scaladoc in `CometAggregateExpressionSerde` that was displaced when `supportsMixedPartialFinal` was added - Require `aggregateExpressions.nonEmpty` in `findPartialAggInPlan` so intermediate distinct-elimination stages (empty agg, group-by only) are not incorrectly tagged as the Partial to disable - Document that `canFinalAggregateBeConverted` mirrors the predicate checks in `CometBaseAggregate.doConvert` and must be kept in sync
If the corresponding partial aggregate would also fail conversion to Comet (for example, collect_set on float is incompatible), tagging it early hijacks the more specific natural fallback reason. Only tag the partial when it would otherwise have been converted, so the tag guards genuine buffer-format mismatches rather than masking unrelated fallbacks. Generalize the convertibility predicate to accept an expected mode and mirror the mode-specific result-expression handling in doConvert.
… files findPartialAggInPlan was using a deep tree traversal that matched partial aggregates separated from the final by other aggregate stages. For Spark's distinct-aggregate rewrite, the partial for non-distinct aggs feeds into a PartialMerge stage rather than directly into the final, so tagging it as unsafe is incorrect and hijacks the natural 'Unsupported aggregation mode PartialMerge' fallback reason. Walk only through exchanges and AQE stages. Also regenerate TPC-DS plan-stability golden files for Spark 3.4, 3.5, and 4.0 to reflect the branch's new safe-mixed-execution behavior where the final aggregate converts to Comet when all aggregate functions have compatible intermediate buffer formats.
Arrow's row format, used by DataFusion's grouped hash aggregate for
composite group keys, does not support Map at any nesting level. The
existing guard in CometBaseAggregate.doConvert only matched top-level
MapType, so queries grouping by e.g. array<map<int,int>> crashed with
"Row format support not yet implemented for: [SortField { ... List(Map(...)) }]"
once the new mixed-partial-final path produced a Comet Final aggregate
over Spark-partial output.
Add a recursive QueryPlanSerde.containsMapType helper that walks into
ArrayType and StructType, and use it in both doConvert and
canAggregateBeConverted. Add a regression test exercising the failing
group-by.sql query shape from SQLQueryTestSuite.
…ressions Mixed COUNT partial/final regressed AQE's PropagateEmptyRelationAfterAQE (which matches BaseAggregateExec only, not CometHashAggregateExec) and the Spark 4.0 count-bug decorrelation for correlated IN subqueries (row dropped in in-count-bug.sql OR pattern). - Remove supportsMixedPartialFinal override from CometCount. - Update trait docstring to explain why COUNT is intentionally excluded. - Narrow the two "safe mixed" tests in CometExecRuleSuite to use only MIN/MAX, which remain mixed-safe. - Revert TPC-DS golden file regeneration from commit 753a9a5; those plan changes were driven by COUNT becoming mixed-safe. MIN, MAX, and bitwise aggregates retain supportsMixedPartialFinal = true.
Comet now accelerates the Partial/Final MIN aggregate in the subquery, which reduces the WholeStageCodegen subtree count below the hardcoded 3 the test asserts. Tag the test with IgnoreComet in the 3.4.3, 3.5.8, and 4.0.1 diffs, matching the pattern used for other plan-shape tests in ExplainSuite.
…l-final-aggregates # Conflicts: # .gitignore # dev/diffs/3.4.3.diff # spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
…l-final-aggregates
…l-final-aggregates
- add withInfo for multiMode and Spark-Final-without-Comet-Partial early returns so EXPLAIN shows the reason - dedupe containsMapType: route operators.scala through QueryPlanSerde.containsMapType and drop the local private copy - log unrecognized passthrough in findPartialAggInPlan at debug level
Re-enables `CometCount.supportsMixedPartialFinal = true` and fixes the underlying canonicalization bug that caused the count-bug regression documented in apache#4242. Root cause: `CometBroadcastExchangeExec.doCanonicalize` nullified `originalPlan`, the source-of-truth for the projection applied during broadcast (e.g. count+1 vs count-1 in count-bug decorrelation). With COUNT mixed-safe the Final aggregate moves to Comet and the projection migrates from `child` to `originalPlan`, so two structurally distinct correlated-IN subqueries canonicalized identically and AQE's ReusedExchange wrongly merged them, dropping rows in the OR pattern. Fix: include `originalPlan.canonicalized` in the canonical form so broadcasts with different embedded projections stay distinct. Adds two regression tests in `CometAggregateSuite` exercising the OR pattern from Spark's `in-count-bug.sql` and the AQE empty-relation pattern with mixed COUNT, and regenerates affected TPC-DS golden files. Closes apache#4242
…ial-final # Conflicts: # spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q10/extended.txt
mbutrovich
left a comment
There was a problem hiding this comment.
I think just one minor thing to change, thanks for doing this @andygrove! Always great to see more reuse in the golden plans for TPC-DS, since that'll hopefully translate to better performance.
HashedRelationBroadcastMode carries join-key expressions with non-canonical exprIds, so two semantically equivalent Comet broadcasts could end up with non-matching canonical forms and miss legitimate ReusedExchange opportunities. Match Spark's BroadcastExchangeExec by canonicalizing the mode.
| @@ -101,4 +127,4 @@ CometNativeColumnarToRow | |||
| +- CometFilter | |||
| +- CometNativeScan parquet spark_catalog.default.date_dim | |||
|
|
|||
There was a problem hiding this comment.
how come the 98/101 comes into 124/127 🤔 is it expected? we got more operators?
There was a problem hiding this comment.
Looks like it is much more work currently being done by the engine
There was a problem hiding this comment.
we lost a reuse on q83, the others went down because they gained reuse. I'm trying to make sense if this is correct to not reuse on q83, or if we've made something too strict.
There was a problem hiding this comment.
Looking at the plan-stability changes:
approved-plans-v1_4-spark3_5/q83/extended.txtis deleted (was 98 ops with 2ReusedSubquery), so 3.5 falls through to the base file at 124 ops with 0ReusedSubquery.approved-plans-v1_4-spark4_0/q83.ansi/extended.txtgoes 98 ops with 2ReusedSubqueryto 124 ops with 0ReusedSubquery.
On 3.5+, CometReuseSubquery keys on sub.plan.canonicalized (CometReuseSubquery.scala:59), which flows through the new doCanonicalize. AQE's stageCache keys the same way. The two-ReusedSubquery to zero-ReusedSubquery shift on q83 looks like the cache is no longer collapsing the three date_dim DPP subqueries that it used to collapse, which would translate to broadcasting date_dim three times instead of once at runtime. On 3.4 this is moot because CometSpark34AqeDppFallbackRule keeps DPP on the Spark side, but 3.5+ doesn't have that fallback.
My read is that originalPlan.canonicalized is broader than the count-bug needs. The count+1 vs count-1 difference surfaces in originalPlan.output. The q83 subqueries differ inside originalPlan (each scan's DPP gets its own AdaptiveSparkPlanExec wrapping a structurally-equivalent build plan in CometPlanAdaptiveDynamicPruningFilters.scala:266) but produce equivalent broadcast outputs. Including only originalPlan.canonicalized.output (or the canonicalized output expressions) in the canonical key would distinguish count+1 from count-1 while letting q83's three date_dim subqueries collide again. Worth confirming the regression test still fails without the broader form before narrowing.
Unrelated, while reading the file I noticed equals at line 244-252 compares originalPlan and child but ignores mode and output, and hashCode at line 254 only hashes child. Pre-existing and not in scope for this PR, but might be worth a follow-up issue.
There was a problem hiding this comment.
Tried the narrower form (originalPlan.canonicalized.output in the canonical key, plus mode.canonicalized and equals/hashCode widened to include output and mode) and the count-bug regression still reproduces — CometAggregateSuite test returns 1 row vs Spark's 2. So the discriminator isn't (just) in output; the count+1 / count-1 projection lives deeper in the plan and is structurally indistinguishable at the output level after canonicalization. Looking into this more to see if there's a tighter key that recovers q83's reuse without losing the count-bug guard.
There was a problem hiding this comment.
I filed a follow on issue since there does not seem to be a simple fix - #4395
…hCode equals previously compared only originalPlan and child, and hashCode hashed only child. With doCanonicalize now including originalPlan.canonicalized and mode.canonicalized in the canonical form, all four case fields should participate in identity for correctness and consistency. Also document why keying on originalPlan.output alone is not a sufficient discriminator for the count-bug case (alias names are wiped by Spark canonicalization). q83 reuse loss tracked separately in apache#4395.
Which issue does this PR close?
Closes #4242.
Rationale for this change
#4015 deliberately excluded
COUNTfromsupportsMixedPartialFinalafter two regressions surfaced when enabling it: (1) a row drop in correlatedINsubqueries with COUNT inside anOR(the count-bug pattern from Spark'sin-count-bug.sql), and (2) an interaction with AQE'sPropagateEmptyRelationAfterAQE. As a result the TPC-DS coverage gains from the original #2994 work are not realized.Investigation showed that the count-bug row drop is a canonicalization bug in
CometBroadcastExchangeExec, not a buffer-format issue:doCanonicalizewas nullifyingoriginalPlan.originalPlancarries the projection applied during broadcast (e.g.count(1) + 1vscount(1) - 1in count-bug decorrelation).COUNTnot mixed-safe, the Final aggregate stays on Spark and the projection lives insidechild, sochild.canonicalizeddistinguishes the two subqueries' broadcasts.COUNTmixed-safe, Comet absorbs the Final and the projection migrates tooriginalPlan. The previous canonical form discarded that field, so two structurally distinct count subqueries canonicalized identically, and AQE'sstageCachekeyed oncanonicalizedmerged them viaReusedExchange. The second subquery then read the first subquery's broadcast values, producing wrongINmatches and dropping rows.What changes are included in this PR?
CometBroadcastExchangeExec.doCanonicalizenow includesoriginalPlan.canonicalizedso broadcasts with different embedded projections stay distinct.CometCountre-addsoverride def supportsMixedPartialFinal: Boolean = true.CometAggregateSuite: thein-count-bug.sqlOR pattern and an AQE empty-relation pattern with mixed COUNT.How are these changes tested?
CometAggregateSuitefail without the canonicalization fix (count-bug row drop reproduces with the documented data divergence) and pass with the fix.CometAggregateSuite(83 tests) andCometTPCDSV1_4_PlanStabilitySuiteon Spark 3.5 (97 tests) both pass after regeneration.PropagateEmptyRelationAfterAQEregression is exercised by Spark's ownCachedTableSuiteviadev/diffs; CI will surface any remaining interaction there.Stacked on #4015 (will rebase/retarget once it lands).