Skip to content

[SPARK-56820][SQL] Add counter_diff window function for converting cumulative counters to delta format#55828

Open
pnikic-db wants to merge 1 commit into
apache:masterfrom
pnikic-db:counter-diff-window-function
Open

[SPARK-56820][SQL] Add counter_diff window function for converting cumulative counters to delta format#55828
pnikic-db wants to merge 1 commit into
apache:masterfrom
pnikic-db:counter-diff-window-function

Conversation

@pnikic-db
Copy link
Copy Markdown
Contributor

@pnikic-db pnikic-db commented May 12, 2026

What changes were proposed in this pull request?

This pull request proposes the addition of a new window function, counter_diff, which would be used to convert cumulative counters to delta format by computing the differences between consecutive values. The function would include special handling for counter resets, when the cumulative value gets reset to zero.

Syntax

counter_diff(value [, start_time]) OVER (PARTITION BY partition_exprs ORDER BY order_exprs)

Arguments

  • value: A cumulative counter. Must be numeric and non-negative.
  • start_time: An optional timestamp parameter which indicates when the counter was last set to zero. It is used to better detect counter resets.
  • partition_exprs: Used to separate independent counters. Good partitioning columns would be the metric name, as well as any attributes tied to the metric.
  • order_exprs: Used to order the rows. Should be the observation timestamp in ascending order.

Example

SELECT m, t, c, counter_diff(c) OVER (PARTITION BY m ORDER BY t) AS diff
FROM VALUES
  ('http_requests', TIMESTAMP '2026-01-01T00:00:00', 100),
  ('http_requests', TIMESTAMP '2026-01-01T00:01:00', 200),
  ('http_requests', TIMESTAMP '2026-01-01T00:02:00', 400)
  AS tab (m, t, c)
m t c diff
http_requests 2026-01-01 00:00:00 100 NULL
http_requests 2026-01-01 00:01:00 200 100
http_requests 2026-01-01 00:02:00 400 200

Why are the changes needed?

Counters are metrics with monotonically increasing values. One example is the number of HTTP requests processed on a server. With each request, the counter increases. Counters can be represented in two temporalities: cumulative or delta:

  • With delta temporality, each observation represents the increase of the counter since the last observation.
  • With cumulative temporality, each observation represents the total accumulated value of the counter.
    • With cumulative counters, it is possible for the counter to reset to zero, for example when a restart occurs.

The cumulative representation is typically better for storage and transmission, as it is handles missed observations better.

  • For delta counters, if a single observation is lost, the increase is lost from the total counter value.
  • For cumulative counters, the observation is lost, but the total counter value does not decrease.

However, the delta representation is required for performing analytics on the metric, as they can be aggregated and bucketized.

counter_diff reduces the gap between these two representations. Given a cumulative counter, it computes the differences between consecutive values, resulting in the equivalent delta representation for the counter, which can be used in further analysis.

Does this PR introduce any user-facing change?

The counter_diff window function is a new function.

How was this patch tested?

A new test, counter-diff.sql, has been added with various SQL queries involving counter_diff and their expected outputs.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code with Claude Opus 4.7

@pnikic-db pnikic-db force-pushed the counter-diff-window-function branch from 96d71a6 to a29e7d4 Compare May 12, 2026 16:31
Copy link
Copy Markdown
Contributor

@cloud-fan cloud-fan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Summary

This PR adds a new SQL/Scala/Python window function counter_diff that converts monotonically increasing cumulative counters into delta form, with built-in handling of counter resets (value decrease, or optional start_time advance) and negative-value validation.

Prior state and problem. counter_diff is expressible today with c - lag(c) IGNORE NULLS OVER (...) plus user-written CASE/error logic, but the SQL gets verbose and easy to get wrong — especially when NULL counters and start_time interact (which row defines the baseline?). This PR consolidates that into one window function with consistent semantics.

Design approach. A new AggregateWindowFunction (CounterDiffBase + two case-class subclasses, dispatched by CounterDiffExpressionBuilder via a sentinel DefaultStartTime and an eq-identity check). It inherits the fixed ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW frame and carries prevCounter/currCounter (plus prevStartTime/currStartTime for the 2-arg form) in the agg buffer. Rows with NULL counter are excluded from baseline tracking so the next non-NULL row diffs against the most recent non-NULL prev. This is more efficient than expanding into multiple lag window columns via an analyzer rewrite, at the cost of carrying a custom expression class.

Key design decisions.

  • Two distinct case classes (CounterDiff / CounterDiffWithStartTime) for the 1-arg / 2-arg forms, justified by the different buffer sizes (2 attrs vs 4). Centralizing into a single class with Option[Expression] startTime would reduce duplication but at the cost of conditional buffer construction — defensible either way.
  • Negative counter is a hard runtime error (COUNTER_DIFF_NEGATIVE_COUNTER_VALUE) with no permissive variant. The 2-arg form additionally errors on start_time decrease (COUNTER_DIFF_START_TIME_DECREASED). Asymmetry between start_time decrease (error) vs advance (NULL reset) is intentional.
  • DECIMAL subtraction uses a new internal DecimalSubtractNoOverflowCheck (mirror of DecimalAddNoOverflowCheck) that keeps the result in the input precision/scale. Sound because the curr < 0 and curr < prev guards fire before the subtraction, so the result is always in [0, curr] and fits.
  • NULL start_time on a non-NULL-counter row is treated as "skip the reset check on this row" — all start_time comparisons evaluate to NULL and fall through to the regular diff path. A NULL counter on a row whose start_time changes absorbs that change, and the next non-NULL row sees the cumulative shift from the last non-NULL row. The golden file covers these subtle interactions.

Implementation sketch. New sql/catalyst/.../CounterDiff.scala carries the expression classes and the builder. FunctionRegistry adds the entry after the other window functions. Public-API additions go to sql/api/.../functions.scala, python/pyspark/sql/functions/builtin.py, and python/pyspark/sql/connect/functions/builtin.py. Tests: SQL golden file counter-diff.sql, a test in DataFrameWindowFunctionsSuite, and a Python test in test_functions.py. The expression-schema doc and Python rst/__init__ are also updated.

OSS Spark applicability. PR is filed against apache/spark directly — N/A.

General notes (couldn't anchor to a diff line)

  • python/pyspark/sql/tests/connect/test_connect_function.py::test_window_functions (lines 728-749) runs every window function side-by-side through Connect and classic. Worth adding (CF.counter_diff("c"), SF.counter_diff("c")) and a 2-arg variant there to cover the Connect path that python/pyspark/sql/connect/functions/builtin.py adds.


override def prettyName: String = "counter_diff"

override def nullable: Boolean = true
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Redundant override — AggregateWindowFunction already defaults nullable to true (windowExpressions.scala:630). Drop this line.

Suggested change
override def nullable: Boolean = true

Comment on lines +355 to +362
expressions match {
case Seq(value) =>
CounterDiff(value)
case Seq(value, startTime) if startTime eq DefaultStartTime =>
CounterDiff(value)
case Seq(value, startTime) =>
CounterDiffWithStartTime(value, startTime)
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The case Seq(value) branch is unreachable. FunctionSignature.defaultRearrange always returns a Seq sized to parameters.size (asserted in FunctionBuilderBase.scala:212), so build here always receives exactly 2 expressions — counter_diff() with zero args is rejected upstream by REQUIRED_PARAMETER_NOT_FOUND (covered by counter-diff.sql:378).

Suggested change
expressions match {
case Seq(value) =>
CounterDiff(value)
case Seq(value, startTime) if startTime eq DefaultStartTime =>
CounterDiff(value)
case Seq(value, startTime) =>
CounterDiffWithStartTime(value, startTime)
}
expressions match {
case Seq(value, startTime) if startTime eq DefaultStartTime =>
CounterDiff(value)
case Seq(value, startTime) =>
CounterDiffWithStartTime(value, startTime)
}

expressions match {
case Seq(value) =>
CounterDiff(value)
case Seq(value, startTime) if startTime eq DefaultStartTime =>
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The dispatch between CounterDiff and CounterDiffWithStartTime relies on eq reference identity against the singleton DefaultStartTime. This works because FunctionRegistry calls rearrange and build back-to-back with no intervening transform, and defaultRearrange inserts the same instance verbatim. Worth a short code comment so a future reader (and anyone adding a transform between rearrange and build) understands the invariant: eq rather than == is intentional, because Literal.create(null, NullType) is structurally equal to a user-typed counter_diff(c, NULL) argument but should NOT collapse to the 1-arg form.

in ascending order.

The following special cases are handled:
1. If the counter value is NULL, the row is skipped and NULL is returned.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"The row is skipped" reads as "the row is omitted from the output". The actual behavior is: the row appears in the output with a NULL diff, and its counter is excluded from the baseline used by subsequent rows.

Suggested change
1. If the counter value is NULL, the row is skipped and NULL is returned.
1. If the counter value is NULL, NULL is returned for that row, and the row's counter is excluded from the baseline used by subsequent rows.

Comment on lines +2577 to +2579
* @param startTime
* An optional timestamp parameter which indicates when the counter was last set to zero. Used
* to signal counter resets.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this Scala overload startTime is required — the "optional" wording applies to the SQL function's shape across overloads, not to this method (the 1-arg overload above already covers "no startTime").

Suggested change
* @param startTime
* An optional timestamp parameter which indicates when the counter was last set to zero. Used
* to signal counter resets.
* @param startTime
* A timestamp indicating when the counter was last set to zero. Used to signal counter
* resets.


Parameters
----------
col : :class:`~pyspark.sql.Column` or column name
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Parameter-name mismatch: the function signature names the first parameter value, but the docstring documents it as col.

Suggested change
col : :class:`~pyspark.sql.Column` or column name
value : :class:`~pyspark.sql.Column` or column name

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants