CometHashAggregateExec doesn't participate in Spark's AQEPropagateEmptyRelation optimization #4412

@andygrove

Problem

Spark's AQE rule AQEPropagateEmptyRelation (in sql/core/.../adaptive/AQEPropagateEmptyRelation.scala) detects when a query stage materializes 0 rows and propagates emptiness upward, eliminating downstream operators. The key pattern for aggregates is:

case LogicalQueryStage(_, agg: BaseAggregateExec) if agg.groupingExpressions.nonEmpty &&
  agg.child.isInstanceOf[QueryStageExec] =>
  // estimate rows = child stage's actual row count

CometHashAggregateExec extends CometUnaryExec, not BaseAggregateExec, so the pattern never matches. The optimization can't see Comet aggregates, can't short-circuit them, and can't propagate emptiness through them to downstream unions / joins.
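The mismatch can be reproduced in isolation. The sketch below uses toy stand-ins (`PlanNode`, `BaseAggregateLike`, `MaterializedStage`, `SparkHashAgg`, `CometAgg` are all hypothetical names, not the real Spark or Comet classes) to show how a type-based pattern silently skips a node that does not extend the matched trait, even when its fields are identical.

```scala
// Toy stand-ins for illustration only; these are NOT the real Spark/Comet classes.
trait PlanNode

// Plays the role of BaseAggregateExec in the pattern above.
trait BaseAggregateLike extends PlanNode {
  def groupingExpressions: Seq[String]
  def child: PlanNode
}

// Plays the role of a materialized query stage with a known row count.
final case class MaterializedStage(rowCount: Long) extends PlanNode

// Extends the trait, so the type pattern matches it.
final case class SparkHashAgg(groupingExpressions: Seq[String], child: PlanNode)
  extends BaseAggregateLike

// Same fields, but does not extend the trait, so the type pattern skips it.
final case class CometAgg(groupingExpressions: Seq[String], child: PlanNode)
  extends PlanNode

object EmptyPropagationModel {
  // Returns the child stage's row count only for grouped aggregates
  // that are visible through the BaseAggregateLike trait.
  def estimatedRows(plan: PlanNode): Option[Long] = plan match {
    case agg: BaseAggregateLike if agg.groupingExpressions.nonEmpty =>
      agg.child match {
        case MaterializedStage(n) => Some(n)
        case _                    => None
      }
    case _ => None
  }
}
```

In this model, `estimatedRows` yields a row count for `SparkHashAgg` and `None` for `CometAgg`, which mirrors why the real rule has nothing to act on for a Comet aggregate.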

Why this surfaced now

PR #4374 (re-enable COUNT for mixed Spark partial / Comet final aggregates) routes COUNT's Final aggregate through Comet. Before that PR, COUNT's Final aggregate stayed a Spark HashAggregateExec (a BaseAggregateExec), so the optimization fired. After it, the Final aggregate is a CometHashAggregateExec, the pattern no longer matches, and the affected tests fail:

  • SPARK-35442: Support propagate empty relation through aggregate (AdaptiveQueryExecSuite)
  • SPARK-35442: Support propagate empty relation through union (AdaptiveQueryExecSuite)

Both are tagged IgnoreComet linking to this issue in dev/diffs/{3.4.3,3.5.8,4.0.2,4.1.1}.diff.

Impact

Query results remain correct under Comet: CometHashAggregateExec over an empty input returns an empty result. What is lost is the AQE short-circuit. Instead of the plan reducing to LocalTableScanExec(empty) at planning time, the Comet aggregate runs against an empty shuffle stage and returns empty. The wasted work per aggregate is minor (microseconds), but it becomes more visible on plans with many post-aggregate operators (unions, joins) that could also collapse.

Options

A. Widen the SPARK-35442 test assertions to accept CometHashAggregateExec as a valid endpoint. Test-only change. Doesn't fix the actual missed optimization.

B. Add a Comet AQE rule that mirrors AQEPropagateEmptyRelation. When a CometHashAggregateExec (with GROUP BY) sits over a materialized empty stage, rewrite it to LocalTableScanExec(empty). ~40-80 lines plus tests. Closes the real gap; localized blast radius.
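As a rough illustration of the shape Option B could take, here is a bottom-up rewrite over a toy plan tree. All types (`Node`, `Stage`, `CometAggNode`, `UnionNode`, `EmptyScan`) and the object name are hypothetical; a real rule would be written against Spark's plan classes and is not shown here. The union case is included only to model the downstream collapse mentioned above.

```scala
// Toy plan model for illustration only; not the real Spark/Comet plan classes.
sealed trait Node
// rowCount = None means the stage has not materialized yet.
final case class Stage(rowCount: Option[Long]) extends Node
final case class CometAggNode(grouping: Seq[String], child: Node) extends Node
final case class UnionNode(children: Seq[Node]) extends Node
// Stands in for LocalTableScanExec(empty).
case object EmptyScan extends Node

object CometPropagateEmptySketch {
  def apply(plan: Node): Node = {
    // Rewrite children first so emptiness can propagate upward.
    val rewritten = plan match {
      case CometAggNode(g, c) => CometAggNode(g, apply(c))
      case UnionNode(cs)      => UnionNode(cs.map(apply))
      case other              => other
    }
    rewritten match {
      // Only grouped aggregates qualify: a global aggregate over
      // empty input still produces one row, so it must be kept.
      case CometAggNode(g, Stage(Some(0L))) if g.nonEmpty => EmptyScan
      // A union whose inputs are all empty is itself empty.
      case UnionNode(cs) if cs.nonEmpty && cs.forall(_ == EmptyScan) => EmptyScan
      case other => other
    }
  }
}
```

The `g.nonEmpty` guard corresponds to the "(with GROUP BY)" condition: without grouping keys, an aggregate such as COUNT over empty input returns a single row rather than nothing.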

C. Make CometHashAggregateExec extend BaseAggregateExec. BaseAggregateExec is a trait, and 3 of its 8 abstract members already exist on CometHashAggregateExec; the remaining 5 are one-line delegates to originalPlan.asInstanceOf[BaseAggregateExec]. ~5-10 lines. The optimization fires naturally, and any future Spark rule keyed on BaseAggregateExec picks up Comet for free.
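The delegation pattern behind Option C can be sketched with toy types. The trait below has four members rather than eight, and the split between "already present" and "delegated" members is invented for illustration; `AggLike`, `OriginalAgg`, `NativeAgg`, and `DelegationDemo` are hypothetical names, not Comet or Spark classes.

```scala
// Toy model for illustration only; member names and counts do not
// reflect the real BaseAggregateExec trait.
trait AggLike {
  def groupingExpressions: Seq[String]
  def aggregateExpressions: Seq[String]
  def isStreaming: Boolean
  def initialInputBufferOffset: Int
}

// Plays the role of the original Spark aggregate that was replaced.
final case class OriginalAgg(
    groupingExpressions: Seq[String],
    aggregateExpressions: Seq[String],
    isStreaming: Boolean,
    initialInputBufferOffset: Int) extends AggLike

// Plays the role of the native aggregate: two members it already has,
// two one-line delegates to the plan it replaced.
final case class NativeAgg(
    groupingExpressions: Seq[String],
    aggregateExpressions: Seq[String],
    originalPlan: AnyRef) extends AggLike {
  // Throws ClassCastException if originalPlan is not an AggLike.
  private def original: AggLike = originalPlan.asInstanceOf[AggLike]
  override def isStreaming: Boolean = original.isStreaming
  override def initialInputBufferOffset: Int = original.initialInputBufferOffset
}

object DelegationDemo {
  // A rule keyed on the trait now sees both node types.
  def isGroupedAggregate(plan: Any): Boolean = plan match {
    case agg: AggLike => agg.groupingExpressions.nonEmpty
    case _            => false
  }
}
```

One thing the sketch makes visible is that the cast is unchecked: the delegates only work when the wrapped plan really is an aggregate of the expected type, so a prototype would need to confirm that holds wherever the node is constructed.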

A review today of every other Spark 3.4.3 rule that pattern-matches BaseAggregateExec (ReplaceHashWithSortAgg, RemoveRedundantProjects, AggregateCodegenSupport, DisableUnnecessaryBucketedScan, LogicalQueryStage stats) shows none would do anything harmful to a Comet aggregate. C has two caveats: this review needs repeating on every Spark version we support (3.4 / 3.5 / 4.0 / 4.1 / 4.2), and any future BaseAggregateExec-keyed rule would automatically pick up Comet without an explicit opt-in.

Suggested follow-up

Prototype Option C in isolation, verify SPARK-35442 passes unchanged on all supported Spark versions, and verify no other rule regresses. If C proves risky, fall back to B.
