Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
f7fa33c
fix: allow safe mixed Spark/Comet partial/final aggregate execution
andygrove Apr 21, 2026
f2a8207
fix: address review feedback on mixed partial/final aggregate guard
andygrove Apr 21, 2026
9826403
fix: skip partial aggregate tag when partial itself cannot be converted
andygrove Apr 21, 2026
753a9a5
fix: narrow partial aggregate tag lookup and regenerate TPC-DS golden…
andygrove Apr 21, 2026
6ae483d
fix: reject grouping on nested map types in hash aggregate conversion
andygrove Apr 21, 2026
53405f6
fix: remove COUNT from mixed-safe aggregates to fix AQE/count-bug reg…
andygrove Apr 22, 2026
9e2c25a
spotless
andygrove Apr 22, 2026
f53e3c1
test: ignore SPARK-33853 explain codegen subquery test under Comet
andygrove Apr 23, 2026
3285485
Merge remote-tracking branch 'apache/main' into fix/safe-mixed-partia…
andygrove Apr 25, 2026
671afa6
Merge remote-tracking branch 'apache/main' into fix/safe-mixed-partia…
andygrove May 6, 2026
4322852
test: regenerate Spark 4.2 TPC-DS golden files after merge from main
andygrove May 6, 2026
12018c3
Merge remote-tracking branch 'apache/main' into fix/safe-mixed-partia…
andygrove May 20, 2026
43e0c0b
fix: address review feedback on safe mixed aggregate guard
andygrove May 20, 2026
f0437e0
feat: re-enable COUNT for mixed Spark partial / Comet final aggregates
andygrove May 20, 2026
6bd58c8
Merge remote-tracking branch 'apache/main' into feat/count-mixed-part…
andygrove May 21, 2026
8b83454
fix: canonicalize broadcast mode in CometBroadcastExchangeExec [skip ci]
andygrove May 21, 2026
f813be0
chore: drop unrelated .gitignore change [skip ci]
andygrove May 21, 2026
bc53538
fix: include mode and output in CometBroadcastExchangeExec equals/has…
andygrove May 21, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions spark/src/main/scala/org/apache/comet/serde/aggregates.scala
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ object CometMax extends CometAggregateExpressionSerde[Max] {
}

object CometCount extends CometAggregateExpressionSerde[Count] {

override def supportsMixedPartialFinal: Boolean = true

override def convert(
aggExpr: AggregateExpression,
expr: Count,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,20 @@ case class CometBroadcastExchangeExec(
"number of coalesced rows for broadcast"))

override def doCanonicalize(): SparkPlan = {
CometBroadcastExchangeExec(null, null, mode, child.canonicalized)
// originalPlan is the source-of-truth for the projection applied during broadcast
// (e.g. count+1 vs count-1 in correlated IN with count-bug decorrelation). Two
// CometBroadcastExchanges with identical Comet children but different originalPlans
// produce different broadcast values, so their canonical forms must differ to
// prevent AQE's ReusedExchange from incorrectly merging them. See issue #4242.
// Keying only on originalPlan.output is not sufficient — Spark canonicalization
// rewrites alias names to "none", so count+1 vs count-1 output attributes become
// indistinguishable; the discriminator lives in the Add/Subtract expression nodes
// deep inside the canonicalized plan tree.
// Canonicalize mode as well: HashedRelationBroadcastMode carries join-key expressions
// with non-canonical exprIds, so two semantically equivalent broadcasts would otherwise
// miss legitimate reuse opportunities (matches Spark's BroadcastExchangeExec).
val canonicalOriginal = if (originalPlan != null) originalPlan.canonicalized else null
CometBroadcastExchangeExec(canonicalOriginal, null, mode.canonicalized, child.canonicalized)
}

override def runtimeStatistics: Statistics = {
Expand Down Expand Up @@ -245,13 +258,15 @@ case class CometBroadcastExchangeExec(
obj match {
case other: CometBroadcastExchangeExec =>
this.originalPlan == other.originalPlan &&
this.output == other.output &&
this.mode == other.mode &&
this.child == other.child
case _ =>
false
}
}

override def hashCode(): Int = Objects.hashCode(child)
override def hashCode(): Int = Objects.hashCode(originalPlan, output, mode, child)

override def stringArgs: Iterator[Any] = Iterator(output, child)

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,7 @@ CometNativeColumnarToRow
: : : : +- CometBroadcastHashJoin
: : : : :- CometFilter
: : : : : +- CometNativeScan parquet spark_catalog.default.store_sales
: : : : : +- CometSubqueryBroadcast
: : : : : +- CometBroadcastExchange
: : : : : +- CometProject
: : : : : +- CometFilter
: : : : : +- CometNativeScan parquet spark_catalog.default.date_dim
: : : : : +- ReusedSubquery
: : : : +- CometBroadcastExchange
: : : : +- CometBroadcastHashJoin
: : : : :- CometFilter
Expand Down Expand Up @@ -180,11 +176,7 @@ CometNativeColumnarToRow
: +- CometFilter
: : +- ReusedSubquery
: +- CometNativeScan parquet spark_catalog.default.date_dim
: +- Subquery
: +- CometNativeColumnarToRow
: +- CometProject
: +- CometFilter
: +- CometNativeScan parquet spark_catalog.default.date_dim
: +- ReusedSubquery
+- CometBroadcastExchange
+- CometFilter
: +- ReusedSubquery
Expand Down Expand Up @@ -225,11 +217,7 @@ CometNativeColumnarToRow
: : : : +- CometBroadcastHashJoin
: : : : :- CometFilter
: : : : : +- CometNativeScan parquet spark_catalog.default.store_sales
: : : : : +- CometSubqueryBroadcast
: : : : : +- CometBroadcastExchange
: : : : : +- CometProject
: : : : : +- CometFilter
: : : : : +- CometNativeScan parquet spark_catalog.default.date_dim
: : : : : +- ReusedSubquery
: : : : +- CometBroadcastExchange
: : : : +- CometBroadcastHashJoin
: : : : :- CometFilter
Expand Down Expand Up @@ -288,11 +276,7 @@ CometNativeColumnarToRow
: : : +- CometBroadcastHashJoin
: : : :- CometFilter
: : : : +- CometNativeScan parquet spark_catalog.default.store_sales
: : : : +- CometSubqueryBroadcast
: : : : +- CometBroadcastExchange
: : : : +- CometProject
: : : : +- CometFilter
: : : : +- CometNativeScan parquet spark_catalog.default.date_dim
: : : : +- ReusedSubquery
: : : +- CometBroadcastExchange
: : : +- CometBroadcastHashJoin
: : : :- CometFilter
Expand Down Expand Up @@ -336,10 +320,6 @@ CometNativeColumnarToRow
+- CometFilter
: +- ReusedSubquery
+- CometNativeScan parquet spark_catalog.default.date_dim
+- Subquery
+- CometNativeColumnarToRow
+- CometProject
+- CometFilter
+- CometNativeScan parquet spark_catalog.default.date_dim
+- ReusedSubquery

Comet accelerated 310 out of 337 eligible operators (91%). Final plan contains 6 transitions between Spark and Comet.
Comet accelerated 292 out of 319 eligible operators (91%). Final plan contains 4 transitions between Spark and Comet.
Original file line number Diff line number Diff line change
Expand Up @@ -87,21 +87,7 @@ CometNativeColumnarToRow
: :- ReusedSubquery
: +- ReusedSubquery
+- CometNativeScan parquet spark_catalog.default.date_dim
:- Subquery
: +- CometNativeColumnarToRow
: +- CometHashAggregate
: +- CometExchange
: +- CometHashAggregate
: +- CometProject
: +- CometFilter
: +- CometNativeScan parquet spark_catalog.default.date_dim
+- Subquery
+- CometNativeColumnarToRow
+- CometHashAggregate
+- CometExchange
+- CometHashAggregate
+- CometProject
+- CometFilter
+- CometNativeScan parquet spark_catalog.default.date_dim
:- ReusedSubquery
+- ReusedSubquery

Comet accelerated 89 out of 100 eligible operators (89%). Final plan contains 5 transitions between Spark and Comet.
Comet accelerated 77 out of 88 eligible operators (87%). Final plan contains 3 transitions between Spark and Comet.
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,7 @@ CometNativeColumnarToRow
: +- CometFilter
: : +- ReusedSubquery
: +- CometNativeScan parquet spark_catalog.default.date_dim
: +- Subquery
: +- CometNativeColumnarToRow
: +- CometHashAggregate
: +- CometExchange
: +- CometHashAggregate
: +- CometProject
: +- CometFilter
: +- CometNativeScan parquet spark_catalog.default.date_dim
: +- ReusedSubquery
+- CometBroadcastExchange
+- CometProject
+- CometBroadcastHashJoin
Expand All @@ -62,4 +55,4 @@ CometNativeColumnarToRow
+- CometFilter
+- CometNativeScan parquet spark_catalog.default.item

Comet accelerated 55 out of 60 eligible operators (91%). Final plan contains 3 transitions between Spark and Comet.
Comet accelerated 49 out of 54 eligible operators (90%). Final plan contains 2 transitions between Spark and Comet.
Loading
Loading