[SPARK-56820][SQL] Add counter_diff window function for converting cumulative counters to delta format #55828
Conversation
cloud-fan left a comment
Summary
This PR adds a new SQL/Scala/Python window function counter_diff that converts monotonically increasing cumulative counters into delta form, with built-in handling of counter resets (value decrease, or optional start_time advance) and negative-value validation.
Prior state and problem. counter_diff is expressible today with c - lag(c) IGNORE NULLS OVER (...) plus user-written CASE/error logic, but the SQL gets verbose and easy to get wrong — especially when NULL counters and start_time interact (which row defines the baseline?). This PR consolidates that into one window function with consistent semantics.
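For concreteness, roughly what the manual formulation looks like next to the new function. This is a sketch: the table metrics(metric, ts, c) is hypothetical, the reset convention in the CASE arm is one plausible choice rather than the PR's, and the negative-value validation is omitted.

```sql
-- Manual formulation today (names and reset convention are assumptions):
SELECT metric, ts, c,
       CASE
         WHEN c IS NULL THEN NULL
         -- one plausible reset convention: a drop re-baselines at the new value
         WHEN c < lag(c) IGNORE NULLS OVER (PARTITION BY metric ORDER BY ts) THEN c
         ELSE c - lag(c) IGNORE NULLS OVER (PARTITION BY metric ORDER BY ts)
       END AS delta
FROM metrics;

-- With this PR:
SELECT metric, ts, c,
       counter_diff(c) OVER (PARTITION BY metric ORDER BY ts) AS delta
FROM metrics;
```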
Design approach. A new AggregateWindowFunction (CounterDiffBase + two case-class subclasses, dispatched by CounterDiffExpressionBuilder via a sentinel DefaultStartTime and an eq-identity check). It inherits the fixed ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW frame and carries prevCounter/currCounter (plus prevStartTime/currStartTime for the 2-arg form) in the agg buffer. Rows with NULL counter are excluded from baseline tracking so the next non-NULL row diffs against the most recent non-NULL prev. This is more efficient than expanding into multiple lag window columns via an analyzer rewrite, at the cost of carrying a custom expression class.
Key design decisions.
- Two distinct case classes (CounterDiff / CounterDiffWithStartTime) for the 1-arg / 2-arg forms, justified by the different buffer sizes (2 attrs vs 4). Centralizing into a single class with an Option[Expression] startTime would reduce duplication, but at the cost of conditional buffer construction; defensible either way.
- A negative counter is a hard runtime error (COUNTER_DIFF_NEGATIVE_COUNTER_VALUE) with no permissive variant. The 2-arg form additionally errors on a start_time decrease (COUNTER_DIFF_START_TIME_DECREASED). The asymmetry between a start_time decrease (error) and an advance (NULL reset) is intentional.
- DECIMAL subtraction uses a new internal DecimalSubtractNoOverflowCheck (mirror of DecimalAddNoOverflowCheck) that keeps the result in the input precision/scale. This is sound because the curr < 0 and curr < prev guards fire before the subtraction, so the result is always in [0, curr] and fits.
- A NULL start_time on a non-NULL-counter row is treated as "skip the reset check on this row": all start_time comparisons evaluate to NULL and fall through to the regular diff path. A NULL counter on a row whose start_time changes absorbs that change, and the next non-NULL row sees the cumulative shift from the last non-NULL row. The golden file covers these subtle interactions (see the sketch after this list).
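To make the NULL interactions concrete, a minimal sketch of the 2-arg form. The table and columns are hypothetical; only behaviors stated above are annotated.

```sql
SELECT metric, ts, c, start_ts,
       counter_diff(c, start_ts)
         OVER (PARTITION BY metric ORDER BY ts) AS delta
FROM metrics;
-- Per the semantics above:
--  * a row with c = NULL yields delta = NULL and is excluded from the
--    baseline, so the next non-NULL row diffs against the last non-NULL c;
--  * a row with start_ts = NULL skips the reset check and takes the regular
--    diff path;
--  * a start_ts that moves backwards raises COUNTER_DIFF_START_TIME_DECREASED,
--    and a negative c raises COUNTER_DIFF_NEGATIVE_COUNTER_VALUE.
```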
Implementation sketch. New sql/catalyst/.../CounterDiff.scala carries the expression classes and the builder. FunctionRegistry adds the entry after the other window functions. Public-API additions go to sql/api/.../functions.scala, python/pyspark/sql/functions/builtin.py, and python/pyspark/sql/connect/functions/builtin.py. Tests: SQL golden file counter-diff.sql, a test in DataFrameWindowFunctionsSuite, and a Python test in test_functions.py. The expression-schema doc and Python rst/__init__ are also updated.
OSS Spark applicability. PR is filed against apache/spark directly — N/A.
General notes (couldn't anchor to a diff line)
python/pyspark/sql/tests/connect/test_connect_function.py::test_window_functions (lines 728-749) runs every window function side-by-side through Connect and classic. Worth adding (CF.counter_diff("c"), SF.counter_diff("c")) and a 2-arg variant there to cover the Connect path that python/pyspark/sql/connect/functions/builtin.py adds.
    override def prettyName: String = "counter_diff"

    override def nullable: Boolean = true
Redundant override — AggregateWindowFunction already defaults nullable to true (windowExpressions.scala:630). Drop this line.
    expressions match {
      case Seq(value) =>
        CounterDiff(value)
      case Seq(value, startTime) if startTime eq DefaultStartTime =>
        CounterDiff(value)
      case Seq(value, startTime) =>
        CounterDiffWithStartTime(value, startTime)
    }
The case Seq(value) branch is unreachable. FunctionSignature.defaultRearrange always returns a Seq sized to parameters.size (asserted in FunctionBuilderBase.scala:212), so build here always receives exactly 2 expressions — counter_diff() with zero args is rejected upstream by REQUIRED_PARAMETER_NOT_FOUND (covered by counter-diff.sql:378).
Suggested change:

    expressions match {
      case Seq(value, startTime) if startTime eq DefaultStartTime =>
        CounterDiff(value)
      case Seq(value, startTime) =>
        CounterDiffWithStartTime(value, startTime)
    }
    expressions match {
      case Seq(value) =>
        CounterDiff(value)
      case Seq(value, startTime) if startTime eq DefaultStartTime =>
The dispatch between CounterDiff and CounterDiffWithStartTime relies on eq reference identity against the singleton DefaultStartTime. This works because FunctionRegistry calls rearrange and build back-to-back with no intervening transform, and defaultRearrange inserts the same instance verbatim. Worth a short code comment so a future reader (and anyone adding a transform between rearrange and build) understands the invariant: eq rather than == is intentional, because Literal.create(null, NullType) is structurally equal to a user-typed counter_diff(c, NULL) argument but should NOT collapse to the 1-arg form.
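To make the hazard concrete, a hypothetical query where structural (==) dispatch would go wrong:

```sql
-- Structurally equal argument lists once the sentinel default is filled in,
-- but the intended dispatch differs:
SELECT
  counter_diff(c)       OVER (PARTITION BY m ORDER BY ts) AS d1,  -- 1-arg: CounterDiff
  counter_diff(c, NULL) OVER (PARTITION BY m ORDER BY ts) AS d2   -- 2-arg: CounterDiffWithStartTime
FROM metrics;
```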
    in ascending order.

    The following special cases are handled:
    1. If the counter value is NULL, the row is skipped and NULL is returned.
"The row is skipped" reads as "the row is omitted from the output". The actual behavior is: the row appears in the output with a NULL diff, and its counter is excluded from the baseline used by subsequent rows.
Suggested change:

    1. If the counter value is NULL, NULL is returned for that row, and the row's counter is excluded from the baseline used by subsequent rows.
    * @param startTime
    *   An optional timestamp parameter which indicates when the counter was last set to zero. Used
    *   to signal counter resets.
In this Scala overload startTime is required — the "optional" wording applies to the SQL function's shape across overloads, not to this method (the 1-arg overload above already covers "no startTime").
Suggested change:

    * @param startTime
    *   A timestamp indicating when the counter was last set to zero. Used to signal counter
    *   resets.
    Parameters
    ----------
    col : :class:`~pyspark.sql.Column` or column name
Parameter-name mismatch: the function signature names the first parameter value, but the docstring documents it as col.
Suggested change:

    value : :class:`~pyspark.sql.Column` or column name
What changes were proposed in this pull request?
This pull request proposes the addition of a new window function, counter_diff, which would be used to convert cumulative counters to delta format by computing the differences between consecutive values. The function would include special handling for counter resets, i.e. when the cumulative value gets reset to zero.

Syntax

    counter_diff(value [, start_time]) OVER (PARTITION BY partition_exprs ORDER BY order_exprs)

Arguments

- value: A cumulative counter. Must be numeric and non-negative.
- start_time: An optional timestamp parameter which indicates when the counter was last set to zero. It is used to better detect counter resets.
- partition_exprs: Used to separate independent counters. Good partitioning columns would be the metric name, as well as any attributes tied to the metric.
- order_exprs: Used to order the rows. Should be the observation timestamp in ascending order.

Example
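An illustrative query (table and data are hypothetical, not taken from the PR):

```sql
-- requests(server, ts, hits): cumulative HTTP request counts per server.
SELECT server, ts, hits,
       counter_diff(hits) OVER (PARTITION BY server ORDER BY ts) AS delta
FROM requests;
-- For hits = 100, 160, 230 within one partition, the second and third rows
-- get deltas 60 and 70.
```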
Why are the changes needed?
Counters are metrics with monotonically increasing values. One example is the number of HTTP requests processed on a server. With each request, the counter increases. Counters can be represented in two temporalities: cumulative or delta:
- The cumulative representation is typically better for storage and transmission, as it handles missed observations better.
- However, the delta representation is required for performing analytics on the metric, as deltas can be aggregated and bucketized.
counter_diff reduces the gap between these two representations. Given a cumulative counter, it computes the differences between consecutive values, resulting in the equivalent delta representation for the counter, which can be used in further analysis.
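For instance, once the counter is in delta form it can be bucketized with an ordinary GROUP BY (a sketch; the table and column names are hypothetical):

```sql
SELECT server,
       date_trunc('MINUTE', ts) AS minute,
       sum(delta) AS requests_per_minute
FROM deltas
GROUP BY server, date_trunc('MINUTE', ts)
ORDER BY server, minute;
```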
Does this PR introduce any user-facing change?

The counter_diff window function is a new function.

How was this patch tested?
A new test, counter-diff.sql, has been added with various SQL queries involving counter_diff and their expected outputs.

Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code with Claude Opus 4.7