
feat: re-enable COUNT for mixed Spark partial / Comet final aggregates#4374

Open
andygrove wants to merge 18 commits into apache:main from andygrove:feat/count-mixed-partial-final

@andygrove
Member

Which issue does this PR close?

Closes #4242.

Rationale for this change

#4015 deliberately excluded COUNT from supportsMixedPartialFinal after two regressions surfaced when enabling it: (1) a row drop in correlated IN subqueries with COUNT inside an OR (the count-bug pattern from Spark's in-count-bug.sql), and (2) an interaction with AQE's PropagateEmptyRelationAfterAQE. As a result the TPC-DS coverage gains from the original #2994 work are not realized.

Investigation showed that the count-bug row drop is a canonicalization bug in CometBroadcastExchangeExec, not a buffer-format issue:

  • doCanonicalize was nullifying originalPlan. originalPlan carries the projection applied during broadcast (e.g. count(1) + 1 vs count(1) - 1 in count-bug decorrelation).
  • With COUNT not mixed-safe, the Final aggregate stays on Spark and the projection lives inside child, so child.canonicalized distinguishes the two subqueries' broadcasts.
  • With COUNT mixed-safe, Comet absorbs the Final and the projection migrates to originalPlan. The previous canonical form discarded that field, so two structurally distinct count subqueries canonicalized identically, and AQE's stageCache keyed on canonicalized merged them via ReusedExchange. The second subquery then read the first subquery's broadcast values, producing wrong IN matches and dropping rows.
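
The collision described in the three points above can be illustrated with a toy model. This is only a sketch: `Plan`, `Broadcast`, `oldKey`, `newKey`, and `reuseCount` are hypothetical names invented for illustration, not Spark or Comet classes, and plans are reduced to strings.

```scala
// Toy model only: hypothetical stand-ins, not Spark or Comet classes.
object CanonicalizationSketch {
  // A plan is reduced to a string describing its canonical shape.
  final case class Plan(shape: String)

  // `child` is the native input; `originalPlan` carries the projection
  // applied during broadcast (assumed layout, following the PR description).
  final case class Broadcast(originalPlan: Plan, child: Plan)

  // Previous canonical form: originalPlan is discarded from the key.
  def oldKey(b: Broadcast): String = s"Broadcast(child=${b.child.shape})"

  // Fixed canonical form: originalPlan participates in the key.
  def newKey(b: Broadcast): String =
    s"Broadcast(original=${b.originalPlan.shape}, child=${b.child.shape})"

  // Number of broadcasts a cache keyed on `key` would replace with a reuse.
  def reuseCount(bs: Seq[Broadcast], key: Broadcast => String): Int =
    bs.size - bs.map(key).distinct.size

  // Two structurally distinct count subqueries sharing the same child shape.
  val plus  = Broadcast(Plan("Project(count(1) + 1)"), Plan("CometAgg(count)"))
  val minus = Broadcast(Plan("Project(count(1) - 1)"), Plan("CometAgg(count)"))
}
```

With `oldKey`, `plus` and `minus` share a key, so a cache keyed on it would merge them; with `newKey` they stay distinct, while two identical broadcasts still collapse into one.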

What changes are included in this PR?

  • CometBroadcastExchangeExec.doCanonicalize now includes originalPlan.canonicalized so broadcasts with different embedded projections stay distinct.
  • CometCount re-adds override def supportsMixedPartialFinal: Boolean = true.
  • Two new regression tests in CometAggregateSuite: the in-count-bug.sql OR pattern and an AQE empty-relation pattern with mixed COUNT.
  • Regenerated TPC-DS plan-stability golden files (34 modified, 1 pruned via the fallback chain) reflecting the additional queries where COUNT now runs natively.

How are these changes tested?

  • The new tests in CometAggregateSuite fail without the canonicalization fix (count-bug row drop reproduces with the documented data divergence) and pass with the fix.
  • CometAggregateSuite (83 tests) and CometTPCDSV1_4_PlanStabilitySuite on Spark 3.5 (97 tests) both pass after regeneration.
  • The wider AQE PropagateEmptyRelationAfterAQE regression is exercised by Spark's own CachedTableSuite via dev/diffs; CI will surface any remaining interaction there.

Stacked on #4015 (will rebase/retarget once it lands).

andygrove and others added 15 commits April 20, 2026 21:15
Previously, when one aggregate stage (Partial or Final) couldn't be
converted to Comet, the other was also blocked to avoid crashes from
incompatible intermediate buffer formats (issues apache#1389, apache#1267).

This change introduces per-aggregate `supportsMixedPartialFinal` declarations
so that aggregates with simple, compatible buffers (MIN, MAX, COUNT, bitwise)
can safely run in mixed mode while unsafe aggregates (SUM, AVG, Variance,
CollectSet) continue to be blocked.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
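
As a rough illustration of the per-aggregate declaration described in the commit message above, the following sketch models the opt-in flag with toy types. Only the name `supportsMixedPartialFinal` comes from the PR; `AggSerde`, the case objects, and `canRunMixed` are hypothetical.

```scala
// Toy model only: AggSerde and canRunMixed are hypothetical names.
object MixedModeSketch {
  sealed trait AggSerde {
    // Conservative default: assume the intermediate buffer is incompatible.
    def supportsMixedPartialFinal: Boolean = false
  }

  // Aggregates with simple buffers opt in explicitly.
  case object Min extends AggSerde { override def supportsMixedPartialFinal: Boolean = true }
  case object Max extends AggSerde { override def supportsMixedPartialFinal: Boolean = true }
  case object Count extends AggSerde { override def supportsMixedPartialFinal: Boolean = true }

  // Aggregates with richer buffers keep the default and stay blocked.
  case object Sum extends AggSerde
  case object Avg extends AggSerde

  // Mixed execution is allowed only when every aggregate in the stage opts in.
  def canRunMixed(aggs: Seq[AggSerde]): Boolean =
    aggs.forall(_.supportsMixedPartialFinal)
}
```

A single aggregate that keeps the default is enough to block mixed execution for the whole stage.
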
- Restore `convert` scaladoc in `CometAggregateExpressionSerde` that was
  displaced when `supportsMixedPartialFinal` was added
- Require `aggregateExpressions.nonEmpty` in `findPartialAggInPlan` so
  intermediate distinct-elimination stages (empty agg, group-by only)
  are not incorrectly tagged as the Partial to disable
- Document that `canFinalAggregateBeConverted` mirrors the predicate
  checks in `CometBaseAggregate.doConvert` and must be kept in sync
If the corresponding partial aggregate would also fail conversion to Comet
(for example, collect_set on float is incompatible), tagging it early
hijacks the more specific natural fallback reason. Only tag the partial
when it would otherwise have been converted, so the tag guards genuine
buffer-format mismatches rather than masking unrelated fallbacks.

Generalize the convertibility predicate to accept an expected mode and
mirror the mode-specific result-expression handling in doConvert.
… files

findPartialAggInPlan was using a deep tree traversal that matched partial
aggregates separated from the final by other aggregate stages. For Spark's
distinct-aggregate rewrite, the partial for non-distinct aggs feeds into a
PartialMerge stage rather than directly into the final, so tagging it as
unsafe is incorrect and hijacks the natural 'Unsupported aggregation mode
PartialMerge' fallback reason. Walk only through exchanges and AQE stages.

Also regenerate TPC-DS plan-stability golden files for Spark 3.4, 3.5, and
4.0 to reflect the branch's new safe-mixed-execution behavior where the
final aggregate converts to Comet when all aggregate functions have
compatible intermediate buffer formats.
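
The narrowed search described in this commit message can be sketched with a toy plan tree. The node types and `findPartial` below are hypothetical; the real `findPartialAggInPlan` operates on Spark physical plans and may differ in detail.

```scala
// Toy model only: these node types are hypothetical, not Spark operators.
object PartialSearchSketch {
  sealed trait Node
  final case class Scan(table: String) extends Node
  final case class Exchange(child: Node) extends Node
  final case class QueryStage(child: Node) extends Node
  final case class Agg(mode: String, hasAggExprs: Boolean, child: Node) extends Node

  // Search below a Final aggregate for the Partial that feeds it directly.
  // Only exchanges and query stages are transparent; any other node,
  // including a PartialMerge aggregate, stops the walk.
  def findPartial(below: Node): Option[Agg] = below match {
    case Exchange(c)                 => findPartial(c)
    case QueryStage(c)               => findPartial(c)
    case a @ Agg("Partial", true, _) => Some(a) // must carry aggregate expressions
    case _                           => None
  }
}
```

Under this sketch, a Partial sitting beneath a PartialMerge stage is never returned, and neither is a group-by-only stage with no aggregate expressions.
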
Arrow's row format, used by DataFusion's grouped hash aggregate for
composite group keys, does not support Map at any nesting level. The
existing guard in CometBaseAggregate.doConvert only matched top-level
MapType, so queries grouping by e.g. array<map<int,int>> crashed with
"Row format support not yet implemented for: [SortField { ... List(Map(...)) }]"
once the new mixed-partial-final path produced a Comet Final aggregate
over Spark-partial output.

Add a recursive QueryPlanSerde.containsMapType helper that walks into
ArrayType and StructType, and use it in both doConvert and
canAggregateBeConverted. Add a regression test exercising the failing
group-by.sql query shape from SQLQueryTestSuite.
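
A minimal sketch of the recursive check, using a miniature type hierarchy instead of Spark's `DataType`. `DType` and its subclasses are hypothetical; only the idea of walking into array and struct types comes from the commit message.

```scala
// Toy model only: DType is a hypothetical miniature of a data type hierarchy.
object MapTypeSketch {
  sealed trait DType
  case object IntT extends DType
  final case class ArrayT(element: DType) extends DType
  final case class StructT(fields: Seq[DType]) extends DType
  final case class MapT(key: DType, value: DType) extends DType

  // Shape of the earlier guard: only a top-level map is detected.
  def topLevelMapOnly(dt: DType): Boolean = dt.isInstanceOf[MapT]

  // Recursive guard: a map at any nesting level is detected.
  def containsMapType(dt: DType): Boolean = dt match {
    case MapT(_, _)      => true
    case ArrayT(element) => containsMapType(element)
    case StructT(fields) => fields.exists(containsMapType)
    case _               => false
  }
}
```

For a type shaped like `array<map<int,int>>`, `topLevelMapOnly` returns false while `containsMapType` returns true, which is the gap the commit closes.
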
…ressions

Mixed COUNT partial/final regressed AQE's PropagateEmptyRelationAfterAQE
(which matches BaseAggregateExec only, not CometHashAggregateExec) and the
Spark 4.0 count-bug decorrelation for correlated IN subqueries (row
dropped in in-count-bug.sql OR pattern).

- Remove supportsMixedPartialFinal override from CometCount.
- Update trait docstring to explain why COUNT is intentionally excluded.
- Narrow the two "safe mixed" tests in CometExecRuleSuite to use only
  MIN/MAX, which remain mixed-safe.
- Revert TPC-DS golden file regeneration from commit 753a9a5; those
  plan changes were driven by COUNT becoming mixed-safe.

MIN, MAX, and bitwise aggregates retain supportsMixedPartialFinal = true.
Comet now accelerates the Partial/Final MIN aggregate in the subquery,
which reduces the WholeStageCodegen subtree count below the hardcoded 3
the test asserts. Tag the test with IgnoreComet in the 3.4.3, 3.5.8,
and 4.0.1 diffs, matching the pattern used for other plan-shape tests
in ExplainSuite.
…l-final-aggregates

# Conflicts:
#	.gitignore
#	dev/diffs/3.4.3.diff
#	spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
- add withInfo for multiMode and Spark-Final-without-Comet-Partial early
  returns so EXPLAIN shows the reason
- dedupe containsMapType: route operators.scala through
  QueryPlanSerde.containsMapType and drop the local private copy
- log unrecognized passthrough in findPartialAggInPlan at debug level
Re-enables `CometCount.supportsMixedPartialFinal = true` and fixes the
underlying canonicalization bug that caused the count-bug regression
documented in apache#4242.

Root cause: `CometBroadcastExchangeExec.doCanonicalize` nullified
`originalPlan`, the source-of-truth for the projection applied during
broadcast (e.g. count+1 vs count-1 in count-bug decorrelation). With
COUNT mixed-safe the Final aggregate moves to Comet and the projection
migrates from `child` to `originalPlan`, so two structurally distinct
correlated-IN subqueries canonicalized identically and AQE's
ReusedExchange wrongly merged them, dropping rows in the OR pattern.

Fix: include `originalPlan.canonicalized` in the canonical form so
broadcasts with different embedded projections stay distinct.

Adds two regression tests in `CometAggregateSuite` exercising the OR
pattern from Spark's `in-count-bug.sql` and the AQE empty-relation
pattern with mixed COUNT, and regenerates affected TPC-DS golden files.

Closes apache#4242
…ial-final

# Conflicts:
#	spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q10/extended.txt
@andygrove andygrove marked this pull request as ready for review May 21, 2026 17:59
Contributor

@mbutrovich left a comment

I think just one minor thing to change, thanks for doing this @andygrove! Always great to see more reuse in the golden plans for TPC-DS, since that'll hopefully translate to better performance.

Comment thread spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala Outdated
andygrove added 2 commits May 21, 2026 14:02
HashedRelationBroadcastMode carries join-key expressions with non-canonical
exprIds, so two semantically equivalent Comet broadcasts could end up with
non-matching canonical forms and miss legitimate ReusedExchange opportunities.
Match Spark's BroadcastExchangeExec by canonicalizing the mode.
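
The effect of canonicalizing the mode can be sketched as follows. `AttrRef`, `HashedMode`, and `canonicalize` are hypothetical stand-ins, and the normalization shown (replacing each exprId with the attribute's position in the child output) is an assumption made for illustration.

```scala
// Toy model only: hypothetical stand-ins for attribute references and a broadcast mode.
object ModeCanonSketch {
  final case class AttrRef(name: String, exprId: Long)
  final case class HashedMode(keys: Seq[AttrRef])

  // Assumed normalization: replace each key's exprId with its position in the
  // child's output and blank the name, so equivalent modes compare equal.
  def canonicalize(m: HashedMode, childOutput: Seq[AttrRef]): HashedMode =
    HashedMode(m.keys.map { k =>
      val ordinal = childOutput.indexWhere(_.exprId == k.exprId)
      AttrRef("none", ordinal.toLong)
    })
}
```

Two modes that differ only in their exprIds are unequal as written but equal after `canonicalize`, which is what allows a legitimate reuse to be recognized.
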
@@ -101,4 +127,4 @@ CometNativeColumnarToRow
+- CometFilter
+- CometNativeScan parquet spark_catalog.default.date_dim

Contributor

How did 98/101 become 124/127 here? 🤔 Is that expected? Did we gain more operators?

Contributor

Looks like the engine is currently doing much more work.

Contributor

We lost a reuse on q83; the others went down because they gained reuse. I'm trying to work out whether it is correct not to reuse on q83, or whether we've made something too strict.

Contributor

Looking at the plan-stability changes:

  • approved-plans-v1_4-spark3_5/q83/extended.txt is deleted (was 98 ops with 2 ReusedSubquery), so 3.5 falls through to the base file at 124 ops with 0 ReusedSubquery.
  • approved-plans-v1_4-spark4_0/q83.ansi/extended.txt goes 98 ops with 2 ReusedSubquery to 124 ops with 0 ReusedSubquery.

On 3.5+, CometReuseSubquery keys on sub.plan.canonicalized (CometReuseSubquery.scala:59), which flows through the new doCanonicalize. AQE's stageCache keys the same way. The two-ReusedSubquery to zero-ReusedSubquery shift on q83 looks like the cache is no longer collapsing the three date_dim DPP subqueries that it used to collapse, which would translate to broadcasting date_dim three times instead of once at runtime. On 3.4 this is moot because CometSpark34AqeDppFallbackRule keeps DPP on the Spark side, but 3.5+ doesn't have that fallback.

My read is that originalPlan.canonicalized is broader than the count-bug needs. The count+1 vs count-1 difference surfaces in originalPlan.output. The q83 subqueries differ inside originalPlan (each scan's DPP gets its own AdaptiveSparkPlanExec wrapping a structurally-equivalent build plan in CometPlanAdaptiveDynamicPruningFilters.scala:266) but produce equivalent broadcast outputs. Including only originalPlan.canonicalized.output (or the canonicalized output expressions) in the canonical key would distinguish count+1 from count-1 while letting q83's three date_dim subqueries collide again. Worth confirming the regression test still fails without the broader form before narrowing.

Unrelated, while reading the file I noticed equals at line 244-252 compares originalPlan and child but ignores mode and output, and hashCode at line 254 only hashes child. Pre-existing and not in scope for this PR, but might be worth a follow-up issue.

Member Author

Tried the narrower form (originalPlan.canonicalized.output in the canonical key, plus mode.canonicalized and equals/hashCode widened to include output and mode) and the count-bug regression still reproduces — CometAggregateSuite test returns 1 row vs Spark's 2. So the discriminator isn't (just) in output; the count+1 / count-1 projection lives deeper in the plan and is structurally indistinguishable at the output level after canonicalization. Looking into this more to see if there's a tighter key that recovers q83's reuse without losing the count-bug guard.
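
One way to picture this result is a toy model in which canonicalized output keeps only each column's position and type. That is an assumption made for illustration, and `Column`, `canonicalOutput`, and `canonicalPlan` are hypothetical names.

```scala
// Toy model only: hypothetical names; assumes canonical output drops alias names.
object OutputKeySketch {
  // A projected column: alias, data type, and the expression that computes it.
  final case class Column(alias: String, dataType: String, expr: String)

  // Output-level key: position and type only; alias and expression are not visible.
  def canonicalOutput(cols: Seq[Column]): Seq[(Int, String)] =
    cols.zipWithIndex.map { case (c, i) => (i, c.dataType) }

  // Plan-level key: the computing expression participates.
  def canonicalPlan(cols: Seq[Column]): Seq[(Int, String, String)] =
    cols.zipWithIndex.map { case (c, i) => (i, c.dataType, c.expr) }

  val plus  = Seq(Column("cnt_plus", "bigint", "count(1) + 1"))
  val minus = Seq(Column("cnt_minus", "bigint", "count(1) - 1"))
}
```

In this model the two projections have identical output-level keys and differ only at the plan level, which is consistent with the narrower key failing to separate them.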

Contributor

Thanks for sticking with this, @andygrove!

Member Author

I filed a follow-on issue, #4395, since there does not seem to be a simple fix.

…hCode

equals previously compared only originalPlan and child, and hashCode hashed
only child. With doCanonicalize now including originalPlan.canonicalized and
mode.canonicalized in the canonical form, all four case fields should
participate in identity for correctness and consistency.

Also document why keying on originalPlan.output alone is not a sufficient
discriminator for the count-bug case (alias names are wiped by Spark
canonicalization). q83 reuse loss tracked separately in apache#4395.
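
The identity change described in this commit message can be sketched with a plain class whose `equals` and `hashCode` cover the same four fields. `BroadcastNode` is a hypothetical stand-in with simplified field types, not the Comet operator.

```scala
// Toy model only: BroadcastNode is hypothetical and uses simplified field types.
object IdentitySketch {
  final class BroadcastNode(
      val mode: String,
      val output: Seq[String],
      val originalPlan: String,
      val child: String) {

    // All four fields participate in equality.
    override def equals(other: Any): Boolean = other match {
      case that: BroadcastNode =>
        mode == that.mode && output == that.output &&
          originalPlan == that.originalPlan && child == that.child
      case _ => false
    }

    // The hash is derived from the same four fields, so equal nodes hash equally.
    override def hashCode(): Int = (mode, output, originalPlan, child).hashCode()
  }
}
```

Keeping `equals` and `hashCode` on the same field set matters for any hash-based lookup: nodes that compare equal must land in the same bucket.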
Development

Successfully merging this pull request may close these issues.

Re-enable COUNT for mixed Spark partial / Comet final aggregate execution

3 participants