12 changes: 12 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -1386,6 +1386,18 @@
],
"sqlState" : "0A000"
},
"COUNTER_DIFF_NEGATIVE_COUNTER_VALUE" : {
"message" : [
"A negative counter value <value> was provided to function <function>. Negative counter values are not allowed."
],
"sqlState" : "22003"
},
"COUNTER_DIFF_START_TIME_DECREASED" : {
"message" : [
"Start time provided to function <function> decreased from <previousStartTime> to <currentStartTime>. Start time is required to be non-decreasing."
],
"sqlState" : "22023"
},
"CREATE_PERMANENT_VIEW_WITHOUT_ALIAS" : {
"message" : [
"Not allowed to create the permanent view <name> without explicitly assigning an alias for the expression <attr>."
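For context, the two new error conditions correspond to runtime checks on the function's inputs. Below is a minimal spark-shell repro sketch, not part of this PR: it assumes the checks fire during window evaluation, and the metric names and data are made up.

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.counter_diff
import spark.implicits._ // assumes a `spark` session in scope, e.g. spark-shell

val w = Window.partitionBy("m").orderBy("t")

// COUNTER_DIFF_NEGATIVE_COUNTER_VALUE (sqlState 22003): counter dips below zero.
Seq(("m1", 1, -5L), ("m1", 2, 10L))
  .toDF("m", "t", "c")
  .select(counter_diff($"c").over(w))
  .show()

// COUNTER_DIFF_START_TIME_DECREASED (sqlState 22023): start time moves backwards.
Seq(("m1", 1, 5L, "2026-01-02 00:00:00"), ("m1", 2, 10L, "2026-01-01 00:00:00"))
  .toDF("m", "t", "c", "s")
  .select(counter_diff($"c", $"s".cast("timestamp_ntz")).over(w))
  .show()
```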
1 change: 1 addition & 0 deletions python/docs/source/reference/pyspark.sql/functions.rst
@@ -520,6 +520,7 @@ Window Functions
.. autosummary::
:toctree: api/

counter_diff
cume_dist
dense_rank
lag
9 changes: 9 additions & 0 deletions python/pyspark/sql/connect/functions/builtin.py
@@ -1539,6 +1539,15 @@ def bit_xor(col: "ColumnOrName") -> Column:
# Window Functions


def counter_diff(value: "ColumnOrName", startTime: Optional["ColumnOrName"] = None) -> Column:
if startTime is None:
return _invoke_function_over_columns("counter_diff", value)
return _invoke_function_over_columns("counter_diff", value, startTime)


counter_diff.__doc__ = pysparkfuncs.counter_diff.__doc__


def cume_dist() -> Column:
return _invoke_function("cume_dist")

1 change: 1 addition & 0 deletions python/pyspark/sql/functions/__init__.py
@@ -434,6 +434,7 @@
"var_samp",
"variance",
# Window Functions
"counter_diff",
"cume_dist",
"dense_rank",
"lag",
78 changes: 78 additions & 0 deletions python/pyspark/sql/functions/builtin.py
@@ -6464,6 +6464,84 @@ def rank() -> Column:
return _invoke_function("rank")


@_try_remote_functions
def counter_diff(value: "ColumnOrName", startTime: Optional["ColumnOrName"] = None) -> Column:
"""
Window function: computes the differences between consecutive cumulative counter values in a
time series, thereby converting the counter from the cumulative to the delta format.

Gracefully handles counter resets by returning NULL. Counter resets are detected when the
counter value decreases, or when the start time advances between rows.

Use the PARTITION BY clause of the window to separate independent counters. This is done by
specifying all columns which uniquely identify a time series. These are typically the counter
name and any attributes tied to the counter.

Use the ORDER BY clause of the window to order the observations by the associated timestamp
in ascending order.

.. versionadded:: 4.2.0

Parameters
----------
value : :class:`~pyspark.sql.Column` or column name
A cumulative counter. Must be a numeric data type. Must be non-negative.
startTime : :class:`~pyspark.sql.Column` or column name, optional
An optional timestamp parameter which indicates when the counter was last set to zero.
Used to signal counter resets.

Returns
-------
:class:`~pyspark.sql.Column`
The difference between the current and previous counter value within the window partition.

Examples
--------
>>> from pyspark.sql import functions as sf
>>> from pyspark.sql import Window
>>> from datetime import datetime
>>> df = spark.createDataFrame(
... [('http_requests', datetime(2026, 1, 1, 0, 0), 100),
... ('http_requests', datetime(2026, 1, 1, 0, 1), 200),
... ('http_requests', datetime(2026, 1, 1, 0, 2), 400),
... ('http_requests', datetime(2026, 1, 1, 0, 3), 50),
... ('http_requests', datetime(2026, 1, 1, 0, 4), 100)],
... "m STRING, t TIMESTAMP_NTZ, c INT")
>>> w = Window.partitionBy("m").orderBy("t")
>>> df.select("m", "t", "c", sf.counter_diff("c").over(w).alias("diff")).show()
+-------------+-------------------+---+----+
| m| t| c|diff|
+-------------+-------------------+---+----+
|http_requests|2026-01-01 00:00:00|100|NULL|
|http_requests|2026-01-01 00:01:00|200| 100|
|http_requests|2026-01-01 00:02:00|400| 200|
|http_requests|2026-01-01 00:03:00| 50|NULL|
|http_requests|2026-01-01 00:04:00|100| 50|
+-------------+-------------------+---+----+

>>> df2 = spark.createDataFrame(
... [('http_requests', datetime(2026, 1, 1, 0, 0), 100, datetime(2026, 1, 1, 0, 0)),
... ('http_requests', datetime(2026, 1, 1, 0, 1), 200, datetime(2026, 1, 1, 0, 0)),
... ('http_requests', datetime(2026, 1, 1, 0, 2), 400, datetime(2026, 1, 1, 0, 0)),
... ('http_requests', datetime(2026, 1, 1, 0, 3), 500, datetime(2026, 1, 1, 0, 2, 15)),
... ('http_requests', datetime(2026, 1, 1, 0, 4), 600, datetime(2026, 1, 1, 0, 2, 15))],
... "m STRING, t TIMESTAMP_NTZ, c INT, s TIMESTAMP_NTZ")
>>> df2.select("m", "t", "s", "c", sf.counter_diff("c", "s").over(w).alias("diff")).show()
+-------------+-------------------+-------------------+---+----+
| m| t| s| c|diff|
+-------------+-------------------+-------------------+---+----+
|http_requests|2026-01-01 00:00:00|2026-01-01 00:00:00|100|NULL|
|http_requests|2026-01-01 00:01:00|2026-01-01 00:00:00|200| 100|
|http_requests|2026-01-01 00:02:00|2026-01-01 00:00:00|400| 200|
|http_requests|2026-01-01 00:03:00|2026-01-01 00:02:15|500|NULL|
|http_requests|2026-01-01 00:04:00|2026-01-01 00:02:15|600| 100|
+-------------+-------------------+-------------------+---+----+
"""
if startTime is None:
return _invoke_function_over_columns("counter_diff", value)
return _invoke_function_over_columns("counter_diff", value, startTime)


@_try_remote_functions
def cume_dist() -> Column:
"""
31 changes: 31 additions & 0 deletions python/pyspark/sql/tests/connect/test_connect_function.py
@@ -766,6 +766,37 @@ def test_window_functions(self):
sdf.select(scol.over(swin)).toPandas(),
)

# test counter_diff: requires non-negative values and has a two-argument form that
# accepts a startTime parameter, so a separate dataset is used.
counter_diff_query = """
SELECT * FROM VALUES
(0, TIMESTAMP_NTZ '2026-01-01 00:00:00', 100, TIMESTAMP_NTZ '2025-12-31 00:00:00'),
(0, TIMESTAMP_NTZ '2026-01-01 00:01:00', 200, TIMESTAMP_NTZ '2025-12-31 00:00:00'),
(0, TIMESTAMP_NTZ '2026-01-01 00:02:00', 300, TIMESTAMP_NTZ '2025-12-31 00:00:00'),
(0, TIMESTAMP_NTZ '2026-01-01 00:03:00', 1200, TIMESTAMP_NTZ '2026-01-01 00:02:15'),
(0, TIMESTAMP_NTZ '2026-01-01 00:04:00', 1300, TIMESTAMP_NTZ '2026-01-01 00:02:15'),
(0, TIMESTAMP_NTZ '2026-01-01 00:05:00', 50, TIMESTAMP_NTZ '2026-01-01 00:02:15'),
(0, TIMESTAMP_NTZ '2026-01-01 00:06:00', 100, TIMESTAMP_NTZ '2026-01-01 00:02:15'),
(0, TIMESTAMP_NTZ '2026-01-01 00:07:00', 20, TIMESTAMP_NTZ '2026-01-01 00:06:45'),
(0, TIMESTAMP_NTZ '2026-01-01 00:08:00', 60, TIMESTAMP_NTZ '2026-01-01 00:06:45'),
(1, TIMESTAMP_NTZ '2026-01-01 00:00:00', 100, TIMESTAMP_NTZ '2025-12-31 00:00:00'),
(1, TIMESTAMP_NTZ '2026-01-01 00:01:00', 200, TIMESTAMP_NTZ '2025-12-31 00:00:00'),
(1, TIMESTAMP_NTZ '2026-01-01 00:02:00', 300, TIMESTAMP_NTZ '2025-12-31 00:00:00'),
(1, TIMESTAMP_NTZ '2026-01-01 00:03:00', 1000, TIMESTAMP_NTZ '2025-12-31 00:00:00'),
(1, TIMESTAMP_NTZ '2026-01-01 00:04:00', 75, TIMESTAMP_NTZ '2026-01-01 00:03:15')
AS tab(p, t, c, s)
"""
cdf_cd = self.connect.sql(counter_diff_query)
sdf_cd = self.spark.sql(counter_diff_query)
for ccol, scol in [
(CF.counter_diff("c"), SF.counter_diff("c")),
(CF.counter_diff("c", "s"), SF.counter_diff("c", "s")),
]:
self.assert_eq(
cdf_cd.select(ccol.over(CW.partitionBy("p").orderBy("t"))).toPandas(),
sdf_cd.select(scol.over(SW.partitionBy("p").orderBy("t"))).toPandas(),
)

# test aggregation functions
for ccol, scol in [
(CF.count("c"), SF.count("c")),
29 changes: 29 additions & 0 deletions python/pyspark/sql/tests/test_functions.py
@@ -1954,6 +1954,35 @@ def test_window_functions(self):
for r, ex in zip(rs, expected):
self.assertEqual(tuple(r), ex[: len(r)])

def test_counter_diff_window_function(self):
df = self.spark.createDataFrame(
[
(1, datetime.datetime(2024, 1, 1), 100),
(2, datetime.datetime(2024, 1, 1), 200),
(3, datetime.datetime(2024, 1, 1), 400),
(4, datetime.datetime(2024, 1, 2), 50),
(5, datetime.datetime(2024, 1, 2), 150),
],
["t", "st", "c"],
)
w = Window.orderBy("t")

rows = df.select("t", F.counter_diff("c").over(w).alias("d")).orderBy("t").collect()
self.assertEqual(
[(r.t, r.d) for r in rows],
[(1, None), (2, 100), (3, 200), (4, None), (5, 100)],
)

rows = (
df.select("t", F.counter_diff("c", startTime="st").over(w).alias("d"))
.orderBy("t")
.collect()
)
self.assertEqual(
[(r.t, r.d) for r in rows],
[(1, None), (2, 100), (3, 200), (4, None), (5, 100)],
)

def test_window_functions_without_partitionBy(self):
df = self.spark.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"])
w = Window.orderBy("key", df.value)
57 changes: 57 additions & 0 deletions sql/api/src/main/scala/org/apache/spark/sql/functions.scala
@@ -2531,6 +2531,63 @@ object functions {
// Window functions
//////////////////////////////////////////////////////////////////////////////////////////////

/**
* Window function: computes the differences between consecutive cumulative counter values in a
* time series, thereby converting the counter from the cumulative to the delta format.
*
* Gracefully handles counter resets by returning NULL. Counter resets are detected when the
* counter value decreases.
*
* Use the PARTITION BY clause of the window to separate independent counters. This is done by
* specifying all columns which uniquely identify a time series. These are typically the counter
* name and any attributes tied to the counter.
*
* Use the ORDER BY clause of the window to order the observations by the associated timestamp
* in ascending order.
*
* @param value
* A cumulative counter. Must be a numeric data type. Must be non-negative.
*
* @return
* The difference between the current and previous counter value within the window partition,
* according to the order defined by the window's ORDER BY clause.
*
* @group window_funcs
* @since 4.2.0
*/
def counter_diff(value: Column): Column = Column.fn("counter_diff", value)

/**
* Window function: returns the difference between consecutive cumulative counter values in a
* time series. With this operation, the counter is converted from cumulative to delta format.
*
* Gracefully handles counter resets by returning NULL. Counter resets are detected when the
* counter value decreases, or when the start time advances between rows.
*
* Use the PARTITION BY clause of the window to separate independent counters. This is done by
* specifying all columns which uniquely identify a time series. These are typically the counter
* name and any attributes tied to the counter.
*
* Use the ORDER BY clause of the window to order the observations by the associated timestamp
* in ascending order.
*
* @param value
* A cumulative counter. Must be a numeric data type. Must be non-negative.
*
* @param startTime
* A timestamp indicating when the counter was last set to zero. Used to signal counter
* resets.
*
* @return
* The difference between the current and previous counter value within the window partition,
* according to the order defined by the window's ORDER BY clause.
*
* @group window_funcs
* @since 4.2.0
*/
def counter_diff(value: Column, startTime: Column): Column =
Column.fn("counter_diff", value, startTime)

/**
* Window function: returns the cumulative distribution of values within a window partition,
* i.e. the fraction of rows that are below the current row.
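A usage sketch for the new Scala API, mirroring the Python docstring example above; the DataFrame and metric name are illustrative, and a `spark` session is assumed to be in scope.

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(
  ("http_requests", "2026-01-01 00:00:00", 100),
  ("http_requests", "2026-01-01 00:01:00", 200),
  ("http_requests", "2026-01-01 00:02:00", 400),
  ("http_requests", "2026-01-01 00:03:00", 50) // counter reset: value decreased
).toDF("m", "t", "c").withColumn("t", $"t".cast("timestamp_ntz"))

val w = Window.partitionBy("m").orderBy("t")
df.select($"m", $"t", $"c", counter_diff($"c").over(w).as("diff")).show()
// diff: NULL, 100, 200, NULL -- the decrease at 00:03 is reported as a reset
```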
1 change: 1 addition & 0 deletions sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -922,6 +922,7 @@ object FunctionRegistry {
expression[Rank]("rank"),
expression[DenseRank]("dense_rank"),
expression[PercentRank]("percent_rank"),
expressionBuilder("counter_diff", CounterDiffExpressionBuilder),

// predicates
expression[Between]("between"),
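Because the builder is registered under the name `counter_diff`, the function is also callable directly from SQL. A sketch, assuming a hypothetical temp view `counters` with columns `(m STRING, t TIMESTAMP_NTZ, c INT)` like the docstring example:

```scala
spark.sql("""
  SELECT m, t, c,
         counter_diff(c) OVER (PARTITION BY m ORDER BY t) AS diff
  FROM counters
""").show()
```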