From a29e7d4926a265cd3d1affc8f08842d5845dd9eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petar=20Niki=C4=87?= Date: Mon, 11 May 2026 14:38:27 +0000 Subject: [PATCH 1/2] [SPARK-56820][SQL] Add counter_diff window function for converting cumulative counters to delta format --- .../resources/error/error-conditions.json | 12 + .../reference/pyspark.sql/functions.rst | 1 + .../pyspark/sql/connect/functions/builtin.py | 9 + python/pyspark/sql/functions/__init__.py | 1 + python/pyspark/sql/functions/builtin.py | 78 ++ python/pyspark/sql/tests/test_functions.py | 29 + .../org/apache/spark/sql/functions.scala | 57 ++ .../catalyst/analysis/FunctionRegistry.scala | 1 + .../catalyst/expressions/CounterDiff.scala | 364 ++++++++ .../expressions/decimalExpressions.scala | 28 + .../sql-functions/sql-expression-schema.md | 1 + .../analyzer-results/counter-diff.sql.out | 826 +++++++++++++++++ .../sql-tests/inputs/counter-diff.sql | 390 ++++++++ .../sql-tests/results/counter-diff.sql.out | 862 ++++++++++++++++++ .../sql/DataFrameWindowFunctionsSuite.scala | 22 + 15 files changed, 2681 insertions(+) create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CounterDiff.scala create mode 100644 sql/core/src/test/resources/sql-tests/analyzer-results/counter-diff.sql.out create mode 100644 sql/core/src/test/resources/sql-tests/inputs/counter-diff.sql create mode 100644 sql/core/src/test/resources/sql-tests/results/counter-diff.sql.out diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 889ecf9f7b08a..04cd3a51f79aa 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -1386,6 +1386,18 @@ ], "sqlState" : "0A000" }, + "COUNTER_DIFF_NEGATIVE_COUNTER_VALUE" : { + "message" : [ + "A negative counter value was provided to function . Negative counter values are not allowed." 
+ ], + "sqlState" : "22003" + }, + "COUNTER_DIFF_START_TIME_DECREASED" : { + "message" : [ + "Start time provided to function decreased from to . Start time is required to be non-decreasing." + ], + "sqlState" : "22023" + }, "CREATE_PERMANENT_VIEW_WITHOUT_ALIAS" : { "message" : [ "Not allowed to create the permanent view without explicitly assigning an alias for the expression ." diff --git a/python/docs/source/reference/pyspark.sql/functions.rst b/python/docs/source/reference/pyspark.sql/functions.rst index d6810722e79a5..0303308dd3480 100644 --- a/python/docs/source/reference/pyspark.sql/functions.rst +++ b/python/docs/source/reference/pyspark.sql/functions.rst @@ -520,6 +520,7 @@ Window Functions .. autosummary:: :toctree: api/ + counter_diff cume_dist dense_rank lag diff --git a/python/pyspark/sql/connect/functions/builtin.py b/python/pyspark/sql/connect/functions/builtin.py index 22e52d91232cd..0ea0fe65a0ff9 100644 --- a/python/pyspark/sql/connect/functions/builtin.py +++ b/python/pyspark/sql/connect/functions/builtin.py @@ -1539,6 +1539,15 @@ def bit_xor(col: "ColumnOrName") -> Column: # Window Functions +def counter_diff(value: "ColumnOrName", startTime: Optional["ColumnOrName"] = None) -> Column: + if startTime is None: + return _invoke_function_over_columns("counter_diff", value) + return _invoke_function_over_columns("counter_diff", value, startTime) + + +counter_diff.__doc__ = pysparkfuncs.counter_diff.__doc__ + + def cume_dist() -> Column: return _invoke_function("cume_dist") diff --git a/python/pyspark/sql/functions/__init__.py b/python/pyspark/sql/functions/__init__.py index 27db280be86d1..b90b5a26ecb01 100644 --- a/python/pyspark/sql/functions/__init__.py +++ b/python/pyspark/sql/functions/__init__.py @@ -434,6 +434,7 @@ "var_samp", "variance", # Window Functions + "counter_diff", "cume_dist", "dense_rank", "lag", diff --git a/python/pyspark/sql/functions/builtin.py b/python/pyspark/sql/functions/builtin.py index be948a22a30f2..fc6c462ff22f9 100644 
--- a/python/pyspark/sql/functions/builtin.py +++ b/python/pyspark/sql/functions/builtin.py @@ -6464,6 +6464,84 @@ def rank() -> Column: return _invoke_function("rank") +@_try_remote_functions +def counter_diff(value: "ColumnOrName", startTime: Optional["ColumnOrName"] = None) -> Column: + """ + Window function: computes the differences between consecutive cumulative counter values in a + time series, thereby converting the counter from the cumulative to the delta format. + + Gracefully handles counter resets by returning NULL. Counter resets are detected when the + counter value decreases, or when the start time advances between rows. + + Use the PARTITION BY clause of the window to separate independent counters. This is done by + specifying all columns which uniquely identify a time series. These are typically the counter + name and any attributes tied to the counter. + + Use the ORDER BY clause of the window to order the observations by the associated timestamp + in ascending order. + + .. versionadded:: 4.2.0 + + Parameters + ---------- + col : :class:`~pyspark.sql.Column` or column name + A cumulative counter. Must be a numeric data type. Must be non-negative. + startTime : :class:`~pyspark.sql.Column` or column name, optional + An optional timestamp parameter which indicates when the counter was last set to zero. + Used to signal counter resets. + + Returns + ------- + :class:`~pyspark.sql.Column` + The difference between the current and previous counter value within the window partition. + + Examples + -------- + >>> from pyspark.sql import functions as sf + >>> from pyspark.sql import Window + >>> from datetime import datetime + >>> df = spark.createDataFrame( + ... [('http_requests', datetime(2026, 1, 1, 0, 0), 100), + ... ('http_requests', datetime(2026, 1, 1, 0, 1), 200), + ... ('http_requests', datetime(2026, 1, 1, 0, 2), 400), + ... ('http_requests', datetime(2026, 1, 1, 0, 3), 50), + ... ('http_requests', datetime(2026, 1, 1, 0, 4), 100)], + ... 
"m STRING, t TIMESTAMP_NTZ, c INT") + >>> w = Window.partitionBy("m").orderBy("t") + >>> df.select("m", "t", "c", sf.counter_diff("c").over(w).alias("diff")).show() + +-------------+-------------------+---+----+ + | m| t| c|diff| + +-------------+-------------------+---+----+ + |http_requests|2026-01-01 00:00:00|100|NULL| + |http_requests|2026-01-01 00:01:00|200| 100| + |http_requests|2026-01-01 00:02:00|400| 200| + |http_requests|2026-01-01 00:03:00| 50|NULL| + |http_requests|2026-01-01 00:04:00|100| 50| + +-------------+-------------------+---+----+ + + >>> df2 = spark.createDataFrame( + ... [('http_requests', datetime(2026, 1, 1, 0, 0), 100, datetime(2026, 1, 1, 0, 0)), + ... ('http_requests', datetime(2026, 1, 1, 0, 1), 200, datetime(2026, 1, 1, 0, 0)), + ... ('http_requests', datetime(2026, 1, 1, 0, 2), 400, datetime(2026, 1, 1, 0, 0)), + ... ('http_requests', datetime(2026, 1, 1, 0, 3), 500, datetime(2026, 1, 1, 0, 2, 15)), + ... ('http_requests', datetime(2026, 1, 1, 0, 4), 600, datetime(2026, 1, 1, 0, 2, 15))], + ... 
"m STRING, t TIMESTAMP_NTZ, c INT, s TIMESTAMP_NTZ") + >>> df2.select("m", "t", "s", "c", sf.counter_diff("c", "s").over(w).alias("diff")).show() + +-------------+-------------------+-------------------+---+----+ + | m| t| s| c|diff| + +-------------+-------------------+-------------------+---+----+ + |http_requests|2026-01-01 00:00:00|2026-01-01 00:00:00|100|NULL| + |http_requests|2026-01-01 00:01:00|2026-01-01 00:00:00|200| 100| + |http_requests|2026-01-01 00:02:00|2026-01-01 00:00:00|400| 200| + |http_requests|2026-01-01 00:03:00|2026-01-01 00:02:15|500|NULL| + |http_requests|2026-01-01 00:04:00|2026-01-01 00:02:15|600| 100| + +-------------+-------------------+-------------------+---+----+ + """ + if startTime is None: + return _invoke_function_over_columns("counter_diff", value) + return _invoke_function_over_columns("counter_diff", value, startTime) + + @_try_remote_functions def cume_dist() -> Column: """ diff --git a/python/pyspark/sql/tests/test_functions.py b/python/pyspark/sql/tests/test_functions.py index 75824d3ebe491..ca14f62fd892c 100644 --- a/python/pyspark/sql/tests/test_functions.py +++ b/python/pyspark/sql/tests/test_functions.py @@ -1954,6 +1954,35 @@ def test_window_functions(self): for r, ex in zip(rs, expected): self.assertEqual(tuple(r), ex[: len(r)]) + def test_counter_diff_window_function(self): + df = self.spark.createDataFrame( + [ + (1, datetime.datetime(2024, 1, 1), 100), + (2, datetime.datetime(2024, 1, 1), 200), + (3, datetime.datetime(2024, 1, 1), 400), + (4, datetime.datetime(2024, 1, 2), 50), + (5, datetime.datetime(2024, 1, 2), 150), + ], + ["t", "st", "c"], + ) + w = Window.orderBy("t") + + rows = df.select("t", F.counter_diff("c").over(w).alias("d")).orderBy("t").collect() + self.assertEqual( + [(r.t, r.d) for r in rows], + [(1, None), (2, 100), (3, 200), (4, None), (5, 100)], + ) + + rows = ( + df.select("t", F.counter_diff("c", startTime="st").over(w).alias("d")) + .orderBy("t") + .collect() + ) + self.assertEqual( + [(r.t, 
r.d) for r in rows], + [(1, None), (2, 100), (3, 200), (4, None), (5, 100)], + ) + def test_window_functions_without_partitionBy(self): df = self.spark.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"]) w = Window.orderBy("key", df.value) diff --git a/sql/api/src/main/scala/org/apache/spark/sql/functions.scala b/sql/api/src/main/scala/org/apache/spark/sql/functions.scala index 4ad731f6d8b01..47e97170bb1bd 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/functions.scala @@ -2531,6 +2531,63 @@ object functions { // Window functions ////////////////////////////////////////////////////////////////////////////////////////////// + /** + * Window function: computes the differences between consecutive cumulative counter values in a + * time series, thereby converting the counter from the cumulative to the delta format. + * + * Gracefully handles counter resets by returning NULL. Counter resets are detected when the + * counter value decreases. + * + * Use the PARTITION BY clause of the window to separate independent counters. This is done by + * specifying all columns which uniquely identify a time series. These are typically the counter + * name and any attributes tied to the counter. + * + * Use the ORDER BY clause of the window to order the observations by the associated timestamp + * in ascending order. + * + * @param value + * A cumulative counter. Must be a numeric data type. Must be non-negative. + * + * @return + * The difference between the current and previous counter value within the window partition, + * according to the order defined by the window's ORDER BY clause. + * + * @group window_funcs + * @since 4.2.0 + */ + def counter_diff(value: Column): Column = Column.fn("counter_diff", value) + + /** + * Window function: returns the difference between consecutive cumulative counter values in a + * time series. 
With this operation, the counter is converted from cumulative to delta format. + * + * Gracefully handles counter resets by returning NULL. Counter resets are detected when the + * counter value decreases, or when the start time advances between rows. + * + * Use the PARTITION BY clause of the window to separate independent counters. This is done by + * specifying all columns which uniquely identify a time series. These are typically the counter + * name and any attributes tied to the counter. + * + * Use the ORDER BY clause of the window to order the observations by the associated timestamp + * in ascending order. + * + * @param value + * A cumulative counter. Must be a numeric data type. Must be non-negative. + * + * @param startTime + * An optional timestamp parameter which indicates when the counter was last set to zero. Used + * to signal counter resets. + * + * @return + * The difference between the current and previous counter value within the window partition, + * according to the order defined by the window's ORDER BY clause. + * + * @group window_funcs + * @since 4.2.0 + */ + def counter_diff(value: Column, startTime: Column): Column = + Column.fn("counter_diff", value, startTime) + /** * Window function: returns the cumulative distribution of values within a window partition, * i.e. the fraction of rows that are below the current row. 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 34b9559c58511..ac1fbb9f9bef6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -922,6 +922,7 @@ object FunctionRegistry { expression[Rank]("rank"), expression[DenseRank]("dense_rank"), expression[PercentRank]("percent_rank"), + expressionBuilder("counter_diff", CounterDiffExpressionBuilder), // predicates expression[Between]("between"), diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CounterDiff.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CounterDiff.scala new file mode 100644 index 0000000000000..ec630f22265c2 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CounterDiff.scala @@ -0,0 +1,364 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.catalyst.analysis.ExpressionBuilder +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.plans.logical.{FunctionSignature, InputParameter} +import org.apache.spark.sql.errors.QueryErrorsBase +import org.apache.spark.sql.types._ + +/** + * The counter_diff window function computes the differences between consecutive cumulative counter + * values in a time series, thereby converting the counter from the cumulative to the delta format. + * + * This class serves as the base class for the two versions of the counter_diff function: + * - counter_diff(counter) -> CounterDiff(counter) + * - counter_diff(counter, start_time) -> CounterDiffWithStartTime(counter, startTime) + */ +abstract class CounterDiffBase(val counter: Expression) + extends AggregateWindowFunction + with QueryErrorsBase { + + override def prettyName: String = "counter_diff" + + override def nullable: Boolean = true + + override def dataType: DataType = counter.dataType + + /** + * Last non-NULL counter value from a previous row. + */ + protected lazy val prevCounter: AttributeReference = + AttributeReference("prevCounter", counter.dataType, nullable = true)() + + /** + * Counter value from the current row. + */ + protected lazy val currCounter: AttributeReference = + AttributeReference("currCounter", counter.dataType, nullable = true)() + + /** + * Null literal used as a counter_diff result, when appropriate. + */ + protected lazy val nullResult: Expression = Literal.create(null, counter.dataType) + + /** + * Difference between the current and previous counter values. + */ + protected lazy val diff: Expression = { + counter.dataType match { + // For DECIMAL, subtraction typically widens the result type to handle possible overflow. 
+ // For counter_diff, since counters cannot be negative, there is no risk of overflow, and no + // need to widen the result type, so we subtract directly in the input type. + case dt: DecimalType => DecimalSubtractNoOverflowCheck(currCounter, prevCounter, dt) + case _ => currCounter - prevCounter + } + } + + /** + * Returns the difference, unless the counter has decreased, which is treated as a counter reset. + * In this case, NULL is returned. + */ + protected lazy val diffWithCounterDecreaseCheck: Expression = + If(currCounter < prevCounter, nullResult, diff) + + /** + * Error raised when the counter is negative. + */ + protected lazy val negativeCounterError: Expression = RaiseError( + Literal("COUNTER_DIFF_NEGATIVE_COUNTER_VALUE"), + CreateMap( + Seq( + Literal("value"), + Cast(currCounter, StringType), + Literal("function"), + Literal(toSQLId("counter_diff")) + ) + ), + counter.dataType + ) + + /** + * Wraps `inner` with the "skip row on NULL counter" and "raise error on negative counter" checks. + */ + protected def withCounterNullAndNegativeChecks(inner: Expression): Expression = { + If(IsNull(currCounter), + nullResult, + If(currCounter < Literal.default(counter.dataType), + negativeCounterError, + inner + ) + ) + } +} + +/** + * The single-parameter form of `counter_diff`: `counter_diff(value)`. + * Detects counter resets only when the counter value decreases. + */ +case class CounterDiff(override val counter: Expression) + extends CounterDiffBase(counter) + with ExpectsInputTypes { + + override def children: Seq[Expression] = Seq(counter) + + override def inputTypes: Seq[AbstractDataType] = Seq(NumericType) + + /** + * The aggregation state attributes for the counter_diff function. + * In the single-parameter form, there are two attributes: + * - prevCounter: The last non-NULL counter value from a previous row. + * - currCounter: The counter value from the current row. 
+ */ + override lazy val aggBufferAttributes: Seq[AttributeReference] = + Seq(prevCounter, currCounter) + + /** + * The initial aggregation state for the counter_diff function. Initial values are NULL. + */ + override lazy val initialValues: Seq[Expression] = Seq( + Literal.create(null, counter.dataType), + Literal.create(null, counter.dataType) + ) + + /** + * The update expressions for the counter_diff function's aggregation state. + * + * Fundamentally, the current value becomes the previous value, and the new value becomes the + * current value. + * + * Rows with NULL counter values should be skipped. As a result, the previous counter value + * should not be updated in the aggregation state. + */ + override lazy val updateExpressions: Seq[Expression] = Seq( + If(IsNotNull(currCounter), currCounter, prevCounter), + counter + ) + + /** + * The evaluation expression for the counter_diff function. + * + * Checks for edge cases first: NULL counter value, negative counter value and counter reset. + * Otherwise, returns the difference between the current and previous counter values. + */ + override lazy val evaluateExpression: Expression = + withCounterNullAndNegativeChecks(diffWithCounterDecreaseCheck) + + /** + * The SQL representation of the single-parameter form of the counter_diff function. + */ + override def sql: String = s"${prettyName}(${counter.sql})" + + override protected def withNewChildrenInternal( + newChildren: IndexedSeq[Expression]): CounterDiff = + copy(counter = newChildren.head) +} + +/** + * The two-parameter form of `counter_diff`: `counter_diff(value, start_time)`. + * Additionally checks for counter resets when `start_time` increases, which signals a new start. + * Requires that the start time doesn't decrease, which would indicate moving backwards in time. 
+ */ +case class CounterDiffWithStartTime( + override val counter: Expression, + startTime: Expression, + timeZoneId: Option[String] = None) + extends CounterDiffBase(counter) + with ExpectsInputTypes + with TimeZoneAwareExpression { + + override def withTimeZone(timeZoneId: String): CounterDiffWithStartTime = + copy(timeZoneId = Some(timeZoneId)) + + override def children: Seq[Expression] = Seq(counter, startTime) + + override def inputTypes: Seq[AbstractDataType] = + Seq(NumericType, TypeCollection(TimestampType, TimestampNTZType)) + + /** + * The start time from a previous row. + */ + protected lazy val prevStartTime: AttributeReference = + AttributeReference("prevStartTime", startTime.dataType, nullable = true)() + + /** + * The start time from the current row. + */ + protected lazy val currStartTime: AttributeReference = + AttributeReference("currStartTime", startTime.dataType, nullable = true)() + + /** + * The aggregation state attributes for the counter_diff function. + * In the two-parameter form, there are four attributes: + * - prevCounter: The last non-NULL counter value from a previous row. + * - currCounter: The counter value from the current row. + * - prevStartTime: The start time from a previous row. + * - currStartTime: The start time from the current row. + */ + override lazy val aggBufferAttributes: Seq[AttributeReference] = + Seq(prevCounter, currCounter, prevStartTime, currStartTime) + + /** + * The initial aggregation state for the counter_diff function. Initial values are NULL. + */ + override lazy val initialValues: Seq[Expression] = Seq( + Literal.create(null, counter.dataType), + Literal.create(null, counter.dataType), + Literal.create(null, startTime.dataType), + Literal.create(null, startTime.dataType) + ) + + /** + * The update expressions for the counter_diff function's aggregation state. + * + * Fundamentally, the current value becomes the previous value, and the new value becomes the + * current value. 
The same applies to the start time. + * + * Rows with NULL counter values should be skipped. As a result, the previous values for both + * the counter and start time should not be updated in the aggregation state. + */ + override lazy val updateExpressions: Seq[Expression] = Seq( + If(IsNotNull(currCounter), currCounter, prevCounter), + counter, + If(IsNotNull(currCounter), currStartTime, prevStartTime), + startTime + ) + + /** + * Error raised when the start time decreases. + */ + protected lazy val decreasedStartTimeError: Expression = RaiseError( + Literal("COUNTER_DIFF_START_TIME_DECREASED"), + CreateMap( + Seq( + Literal("function"), + Literal(toSQLId("counter_diff")), + Literal("previousStartTime"), + Cast(prevStartTime, StringType, timeZoneId), + Literal("currentStartTime"), + Cast(currStartTime, StringType, timeZoneId) + ) + ), + counter.dataType + ) + + /** + * The evaluation expression for the counter_diff function. + * + * Checks for edge cases first: NULL counter value, negative counter value, start time decrease + * and counter resets. + * + * Otherwise, returns the difference between the current and previous counter values. + */ + override lazy val evaluateExpression: Expression = withCounterNullAndNegativeChecks { + If(currStartTime < prevStartTime, + decreasedStartTimeError, + If(prevStartTime < currStartTime, + nullResult, + diffWithCounterDecreaseCheck + ) + ) + } + + /** + * The SQL representation of the two-parameter form of the counter_diff function. 
+ */ + override def sql: String = + s"${prettyName}(${counter.sql}, ${startTime.sql})" + + override protected def withNewChildrenInternal( + newChildren: IndexedSeq[Expression]): CounterDiffWithStartTime = + copy(counter = newChildren(0), startTime = newChildren(1)) +} + +// scalastyle:off line.size.limit line.contains.tab +@ExpressionDescription( + usage = """ + _FUNC_(value [, start_time]) - Computes the differences between consecutive cumulative counter + values in a time series, thereby converting the counter from the cumulative to the delta + format. + """, + arguments = """ + Arguments: + * value - A cumulative counter. Must be a numeric data type. Must be non-negative. + * start_time - An optional timestamp parameter which indicates when the counter was last set + to zero. Used to signal counter resets. + """, + examples = """ + Examples: + > SELECT m, t, c, _FUNC_(c) OVER (PARTITION BY m ORDER BY t) AS diff FROM VALUES ('http_requests', TIMESTAMP_NTZ '2026-01-01 00:00:00', 100), ('http_requests', TIMESTAMP_NTZ '2026-01-01 00:01:00', 200), ('http_requests', TIMESTAMP_NTZ '2026-01-01 00:02:00', 400), ('http_requests', TIMESTAMP_NTZ '2026-01-01 00:03:00', 50), ('http_requests', TIMESTAMP_NTZ '2026-01-01 00:04:00', 100) AS tab(m, t, c) ORDER BY t; + http_requests 2026-01-01 00:00:00 100 NULL + http_requests 2026-01-01 00:01:00 200 100 + http_requests 2026-01-01 00:02:00 400 200 + http_requests 2026-01-01 00:03:00 50 NULL + http_requests 2026-01-01 00:04:00 100 50 + > SELECT m, t, s, c, _FUNC_(c, s) OVER (PARTITION BY m ORDER BY t) AS diff FROM VALUES ('http_requests', TIMESTAMP_NTZ '2026-01-01 00:00:00', 100, TIMESTAMP_NTZ '2026-01-01 00:00:00'), ('http_requests', TIMESTAMP_NTZ '2026-01-01 00:01:00', 200, TIMESTAMP_NTZ '2026-01-01 00:00:00'), ('http_requests', TIMESTAMP_NTZ '2026-01-01 00:02:00', 400, TIMESTAMP_NTZ '2026-01-01 00:00:00'), ('http_requests', TIMESTAMP_NTZ '2026-01-01 00:03:00', 500, TIMESTAMP_NTZ '2026-01-01 00:02:15'), ('http_requests', 
TIMESTAMP_NTZ '2026-01-01 00:04:00', 600, TIMESTAMP_NTZ '2026-01-01 00:02:15') AS tab(m, t, c, s) ORDER BY t; + http_requests 2026-01-01 00:00:00 2026-01-01 00:00:00 100 NULL + http_requests 2026-01-01 00:01:00 2026-01-01 00:00:00 200 100 + http_requests 2026-01-01 00:02:00 2026-01-01 00:00:00 400 200 + http_requests 2026-01-01 00:03:00 2026-01-01 00:02:15 500 NULL + http_requests 2026-01-01 00:04:00 2026-01-01 00:02:15 600 100 + """, + note = """ + _FUNC_ calculates the difference between the current and the previous counter value within the + partition, according to the order defined by the ORDER BY clause. + + Use the PARTITION BY clause of the window to separate independent counters. This is done by + specifying all columns which uniquely identify a time series. These are typically the counter + name and any attributes tied to the counter. + + Use the ORDER BY clause of the window to order the observations by the associated timestamp + in ascending order. + + The following special cases are handled: + 1. If the counter value is NULL, the row is skipped and NULL is returned. + 2. If the value is negative, or the start time decreases, an error is raised. + 3. In the case of a counter reset, NULL is returned. + 4. NULL is returned for the first row. + + Counter resets are detected when: + 1. The current counter value is less than the previous counter value. + 2. The current start time is greater than the previous start time, if start_time was provided. + """, + group = "window_funcs", + source = "built-in", + since = "4.2.0" +) +// scalastyle:on line.size.limit line.contains.tab +object CounterDiffExpressionBuilder extends ExpressionBuilder { + // Placeholder start time which serves as a default value. 
+ private val DefaultStartTime: Literal = Literal.create(null, NullType) + + override def functionSignature: Option[FunctionSignature] = { + val valueArg = InputParameter("value") + val startTimeArg = InputParameter("start_time", Some(DefaultStartTime)) + Some(FunctionSignature(Seq(valueArg, startTimeArg))) + } + + override def build(funcName: String, expressions: Seq[Expression]): Expression = { + expressions match { + case Seq(value) => + CounterDiff(value) + case Seq(value, startTime) if startTime eq DefaultStartTime => + CounterDiff(value) + case Seq(value, startTime) => + CounterDiffWithStartTime(value, startTime) + } + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalExpressions.scala index 46ab43074409a..4bd1cad5e929a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalExpressions.scala @@ -247,6 +247,34 @@ case class DecimalAddNoOverflowCheck( copy(left = newLeft, right = newRight) } +/** + * A subtract expression for decimal values which is only used internally. + * + * Note that, this expression does not check overflow which is different from `Subtract`. 
+ */ +case class DecimalSubtractNoOverflowCheck( + left: Expression, + right: Expression, + override val dataType: DataType) extends BinaryOperator { + require(dataType.isInstanceOf[DecimalType]) + + override def inputType: AbstractDataType = DecimalType + override def symbol: String = "-" + private def decimalMethod: String = "$minus" + + private lazy val numeric = TypeUtils.getNumeric(dataType) + + override protected def nullSafeEval(input1: Any, input2: Any): Any = + numeric.minus(input1, input2) + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = + defineCodeGen(ctx, ev, (eval1, eval2) => s"$eval1.$decimalMethod($eval2)") + + override protected def withNewChildrenInternal( + newLeft: Expression, newRight: Expression): DecimalSubtractNoOverflowCheck = + copy(left = newLeft, right = newRight) +} + /** * A divide expression for decimal values which is only used internally by Avg. * diff --git a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md index c56db6c9b7e3f..9ac6e49b03436 100644 --- a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md +++ b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md @@ -94,6 +94,7 @@ | org.apache.spark.sql.catalyst.expressions.Cos | cos | SELECT cos(0) | struct | | org.apache.spark.sql.catalyst.expressions.Cosh | cosh | SELECT cosh(0) | struct | | org.apache.spark.sql.catalyst.expressions.Cot | cot | SELECT cot(1) | struct | +| org.apache.spark.sql.catalyst.expressions.CounterDiffExpressionBuilder | counter_diff | SELECT m, t, c, counter_diff(c) OVER (PARTITION BY m ORDER BY t) AS diff FROM VALUES ('http_requests', TIMESTAMP_NTZ '2026-01-01 00:00:00', 100), ('http_requests', TIMESTAMP_NTZ '2026-01-01 00:01:00', 200), ('http_requests', TIMESTAMP_NTZ '2026-01-01 00:02:00', 400), ('http_requests', TIMESTAMP_NTZ '2026-01-01 00:03:00', 50), ('http_requests', TIMESTAMP_NTZ '2026-01-01 00:04:00', 100) 
AS tab(m, t, c) ORDER BY t | struct | | org.apache.spark.sql.catalyst.expressions.Crc32 | crc32 | SELECT crc32('Spark') | struct | | org.apache.spark.sql.catalyst.expressions.CreateArray | array | SELECT array(1, 2, 3) | struct> | | org.apache.spark.sql.catalyst.expressions.CreateMap | map | SELECT map(1.0, '2', 3.0, '4') | struct> | diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/counter-diff.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/counter-diff.sql.out new file mode 100644 index 0000000000000..3be955dc393f8 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/counter-diff.sql.out @@ -0,0 +1,826 @@ +-- Automatically generated by SQLQueryTestSuite +-- !query +SET TIME ZONE 'UTC' +-- !query analysis +SetCommand (spark.sql.session.timeZone,Some(UTC)) + + +-- !query +SELECT t, c, counter_diff(c) OVER (ORDER BY t) AS diff +FROM VALUES (1, 100), (2, 200), (3, 400) AS tab(t, c) +ORDER BY t +-- !query analysis +Sort [t#x ASC NULLS FIRST], true ++- Project [t#x, c#x, diff#x] + +- Project [t#x, c#x, diff#x, diff#x] + +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#x ASC NULLS FIRST] + +- Project [t#x, c#x] + +- SubqueryAlias tab + +- LocalRelation [t#x, c#x] + + +-- !query +SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff +FROM VALUES (1, 50) AS tab(t, c) +ORDER BY t +-- !query analysis +Sort [t#x ASC NULLS FIRST], true ++- Project [t#x, diff#x] + +- Project [t#x, c#x, diff#x, diff#x] + +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#x ASC NULLS FIRST] + +- Project [t#x, c#x] + +- SubqueryAlias tab + +- LocalRelation [t#x, c#x] + + +-- !query +SELECT t, c, counter_diff(c) OVER (ORDER BY t) AS diff +FROM VALUES (1, 100), (2, 200), (3, 400), (4, 50), (5, 100) AS tab(t, c) +ORDER BY t +-- !query 
analysis +Sort [t#x ASC NULLS FIRST], true ++- Project [t#x, c#x, diff#x] + +- Project [t#x, c#x, diff#x, diff#x] + +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#x ASC NULLS FIRST] + +- Project [t#x, c#x] + +- SubqueryAlias tab + +- LocalRelation [t#x, c#x] + + +-- !query +SELECT t, c, counter_diff(c) OVER (ORDER BY t) AS diff +FROM VALUES (1, 100), (2, 100), (3, 200) AS tab(t, c) +ORDER BY t +-- !query analysis +Sort [t#x ASC NULLS FIRST], true ++- Project [t#x, c#x, diff#x] + +- Project [t#x, c#x, diff#x, diff#x] + +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#x ASC NULLS FIRST] + +- Project [t#x, c#x] + +- SubqueryAlias tab + +- LocalRelation [t#x, c#x] + + +-- !query +SELECT t, c, counter_diff(c) OVER (ORDER BY t) AS diff +FROM VALUES (1, 100), (2, CAST(NULL AS INT)), (3, 200) AS tab(t, c) +ORDER BY t +-- !query analysis +Sort [t#x ASC NULLS FIRST], true ++- Project [t#x, c#x, diff#x] + +- Project [t#x, c#x, diff#x, diff#x] + +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#x ASC NULLS FIRST] + +- Project [t#x, c#x] + +- SubqueryAlias tab + +- LocalRelation [t#x, c#x] + + +-- !query +SELECT m, t, c, counter_diff(c) OVER (PARTITION BY m ORDER BY t) AS diff +FROM VALUES + ('a', 1, 100), ('a', 2, 200), + ('b', 1, 10), ('b', 2, 30) +AS tab(m, t, c) +ORDER BY m, t +-- !query analysis +Sort [m#x ASC NULLS FIRST, t#x ASC NULLS FIRST], true ++- Project [m#x, t#x, c#x, diff#x] + +- Project [m#x, t#x, c#x, diff#x, diff#x] + +- Window [counter_diff(c#x) windowspecdefinition(m#x, t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [m#x], [t#x ASC NULLS FIRST] + +- Project [m#x, t#x, c#x] + +- 
SubqueryAlias tab + +- LocalRelation [m#x, t#x, c#x] + + +-- !query +SELECT t, + counter_diff(d) OVER (ORDER BY t) AS d_diff, + counter_diff(f) OVER (ORDER BY t) AS f_diff, + counter_diff(b) OVER (ORDER BY t) AS b_diff, + counter_diff(l) OVER (ORDER BY t) AS l_diff, + counter_diff(i) OVER (ORDER BY t) AS i_diff, + counter_diff(si) OVER (ORDER BY t) AS si_diff, + counter_diff(ti) OVER (ORDER BY t) AS ti_diff, + counter_diff(dec) OVER (ORDER BY t) AS dec_diff +FROM VALUES + (1, 1.5D, CAST(1.5 AS FLOAT), CAST(100 AS BIGINT), CAST(100 AS LONG), + CAST(100 AS INT), CAST(100 AS SMALLINT), CAST(10 AS TINYINT), + CAST(10.5 AS DECIMAL(10,2))), + (2, 3.5D, CAST(3.5 AS FLOAT), CAST(300 AS BIGINT), CAST(300 AS LONG), + CAST(300 AS INT), CAST(300 AS SMALLINT), CAST(30 AS TINYINT), + CAST(20.5 AS DECIMAL(10,2))) +AS tab(t, d, f, b, l, i, si, ti, dec) +ORDER BY t +-- !query analysis +Sort [t#x ASC NULLS FIRST], true ++- Project [t#x, d_diff#x, f_diff#x, b_diff#xL, l_diff#xL, i_diff#x, si_diff#x, ti_diff#x, dec_diff#x] + +- Project [t#x, d#x, f#x, b#xL, l#xL, i#x, si#x, ti#x, dec#x, d_diff#x, f_diff#x, b_diff#xL, l_diff#xL, i_diff#x, si_diff#x, ti_diff#x, dec_diff#x, d_diff#x, f_diff#x, b_diff#xL, l_diff#xL, i_diff#x, si_diff#x, ti_diff#x, dec_diff#x] + +- Window [counter_diff(d#x) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS d_diff#x, counter_diff(f#x) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS f_diff#x, counter_diff(b#xL) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS b_diff#xL, counter_diff(l#xL) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS l_diff#xL, counter_diff(i#x) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS i_diff#x, 
counter_diff(si#x) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS si_diff#x, counter_diff(ti#x) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS ti_diff#x, counter_diff(dec#x) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS dec_diff#x], [t#x ASC NULLS FIRST] + +- Project [t#x, d#x, f#x, b#xL, l#xL, i#x, si#x, ti#x, dec#x] + +- SubqueryAlias tab + +- LocalRelation [t#x, d#x, f#x, b#xL, l#xL, i#x, si#x, ti#x, dec#x] + + +-- !query +SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff, c - lag(c) OVER (ORDER BY t) AS diff_subtract +FROM VALUES + (1, CAST(0.10000000000000000000000000000000000001 AS DECIMAL(38, 38))), + (2, CAST(0.10000000000000000000000000000000000002 AS DECIMAL(38, 38))) +AS tab(t, c) ORDER BY t +-- !query analysis +Sort [t#x ASC NULLS FIRST], true ++- Project [t#x, diff#x, diff_subtract#x] + +- Project [t#x, c#x, diff#x, _we1#x, diff#x, (c#x - _we1#x) AS diff_subtract#x] + +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x, lag(c#x, -1, null) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _we1#x], [t#x ASC NULLS FIRST] + +- Project [t#x, c#x] + +- SubqueryAlias tab + +- LocalRelation [t#x, c#x] + + +-- !query +SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff, c - lag(c) OVER (ORDER BY t) AS diff_subtract +FROM VALUES + (1, CAST(12345678901234567890123456789012.123456 AS DECIMAL(38, 6))), + (2, CAST(12345678901234567890123456789012.123457 AS DECIMAL(38, 6))) +AS tab(t, c) ORDER BY t +-- !query analysis +Sort [t#x ASC NULLS FIRST], true ++- Project [t#x, diff#x, diff_subtract#x] + +- Project [t#x, c#x, diff#x, _we1#x, diff#x, (c#x - _we1#x) AS diff_subtract#x] + +- Window [counter_diff(c#x) windowspecdefinition(t#x 
ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x, lag(c#x, -1, null) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _we1#x], [t#x ASC NULLS FIRST] + +- Project [t#x, c#x] + +- SubqueryAlias tab + +- LocalRelation [t#x, c#x] + + +-- !query +SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff, c - lag(c) OVER (ORDER BY t) AS diff_subtract +FROM VALUES + (1, CAST(99999999.98 AS DECIMAL(10, 2))), + (2, CAST(99999999.99 AS DECIMAL(10, 2))) +AS tab(t, c) ORDER BY t +-- !query analysis +Sort [t#x ASC NULLS FIRST], true ++- Project [t#x, diff#x, diff_subtract#x] + +- Project [t#x, c#x, diff#x, _we1#x, diff#x, (c#x - _we1#x) AS diff_subtract#x] + +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x, lag(c#x, -1, null) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _we1#x], [t#x ASC NULLS FIRST] + +- Project [t#x, c#x] + +- SubqueryAlias tab + +- LocalRelation [t#x, c#x] + + +-- !query +SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff +FROM VALUES (1, NULL), (2, NULL), (3, NULL) AS tab(t, c) +ORDER BY t +-- !query analysis +Sort [t#x ASC NULLS FIRST], true ++- Project [t#x, diff#x] + +- Project [t#x, diff#x, diff#x] + +- Window [counter_diff(null) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#x ASC NULLS FIRST] + +- Project [t#x] + +- SubqueryAlias tab + +- LocalRelation [t#x, c#x] + + +-- !query +SELECT t, counter_diff(c, st) OVER (ORDER BY t) AS diff +FROM VALUES (1, 100, NULL), (2, 200, NULL) AS tab(t, c, st) +ORDER BY t +-- !query analysis +[Analyzer test output redacted due to nondeterminism] + + +-- !query +SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff +FROM (SELECT t, CAST(c AS INT) AS c + FROM VALUES (1, NULL), (2, NULL), (3, NULL) AS tab(t, c)) +ORDER BY t +-- 
!query analysis +Sort [t#x ASC NULLS FIRST], true ++- Project [t#x, diff#x] + +- Project [t#x, c#x, diff#x, diff#x] + +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#x ASC NULLS FIRST] + +- Project [t#x, c#x] + +- SubqueryAlias __auto_generated_subquery_name + +- Project [t#x, cast(c#x as int) AS c#x] + +- SubqueryAlias tab + +- LocalRelation [t#x, c#x] + + +-- !query +SELECT counter_diff(c) OVER (ORDER BY t) AS diff +FROM VALUES (1, -5) AS tab(t, c) +-- !query analysis +Project [diff#x] ++- Project [c#x, t#x, diff#x, diff#x] + +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#x ASC NULLS FIRST] + +- Project [c#x, t#x] + +- SubqueryAlias tab + +- LocalRelation [t#x, c#x] + + +-- !query +SELECT counter_diff(c) OVER (ORDER BY t) AS diff +FROM VALUES (1, -5.0D) AS tab(t, c) +-- !query analysis +Project [diff#x] ++- Project [c#x, t#x, diff#x, diff#x] + +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#x ASC NULLS FIRST] + +- Project [c#x, t#x] + +- SubqueryAlias tab + +- LocalRelation [t#x, c#x] + + +-- !query +SELECT counter_diff(c) OVER (ORDER BY t) AS diff +FROM VALUES (1, CAST(-5.5 AS DECIMAL(10, 3))) AS tab(t, c) +-- !query analysis +Project [diff#x] ++- Project [c#x, t#x, diff#x, diff#x] + +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#x ASC NULLS FIRST] + +- Project [c#x, t#x] + +- SubqueryAlias tab + +- LocalRelation [t#x, c#x] + + +-- !query +SELECT counter_diff(c) OVER (ORDER BY t) AS diff +FROM VALUES (1, DOUBLE('-Infinity')) AS tab(t, c) +-- !query analysis +Project [diff#x] ++- Project [c#x, t#x, diff#x, diff#x] + +- Window 
[counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#x ASC NULLS FIRST] + +- Project [c#x, t#x] + +- SubqueryAlias tab + +- LocalRelation [t#x, c#x] + + +-- !query +SELECT counter_diff(c) OVER (ORDER BY t) AS diff +FROM VALUES (1, 100), (2, CAST(NULL AS INT)), (3, -5) AS tab(t, c) +-- !query analysis +Project [diff#x] ++- Project [c#x, t#x, diff#x, diff#x] + +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#x ASC NULLS FIRST] + +- Project [c#x, t#x] + +- SubqueryAlias tab + +- LocalRelation [t#x, c#x] + + +-- !query +SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff +FROM VALUES + (1, 100.0D), (2, DOUBLE('Infinity')), (3, 200.0D), + (4, DOUBLE('Infinity')), (5, DOUBLE('Infinity')) AS tab(t, c) +ORDER BY t +-- !query analysis +Sort [t#x ASC NULLS FIRST], true ++- Project [t#x, diff#x] + +- Project [t#x, c#x, diff#x, diff#x] + +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#x ASC NULLS FIRST] + +- Project [t#x, c#x] + +- SubqueryAlias tab + +- LocalRelation [t#x, c#x] + + +-- !query +SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff +FROM VALUES + (1, 100.0D), (2, DOUBLE('NaN')), (3, DOUBLE('NaN')), (4, 200.0D), + (5, 400.0D), (6, DOUBLE('NaN')), (7, 50.0D), (8, 100.0D) AS tab(t, c) +ORDER BY t +-- !query analysis +Sort [t#x ASC NULLS FIRST], true ++- Project [t#x, diff#x] + +- Project [t#x, c#x, diff#x, diff#x] + +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#x ASC NULLS FIRST] + +- Project [t#x, c#x] + +- SubqueryAlias tab + +- LocalRelation [t#x, c#x] + + +-- !query +SELECT t, counter_diff(1) OVER (ORDER BY t) AS diff +FROM RANGE(1, 5) AS tab(t) +ORDER 
BY t +-- !query analysis +Sort [t#xL ASC NULLS FIRST], true ++- Project [t#xL, diff#x] + +- Project [t#xL, diff#x, diff#x] + +- Window [counter_diff(1) windowspecdefinition(t#xL ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#xL ASC NULLS FIRST] + +- Project [t#xL] + +- SubqueryAlias tab + +- Project [id#xL AS t#xL] + +- Range (1, 5, step=1) + + +-- !query +SELECT t, counter_diff(1 + 1) OVER (ORDER BY t) AS diff +FROM RANGE(1, 5) AS tab(t) +ORDER BY t +-- !query analysis +Sort [t#xL ASC NULLS FIRST], true ++- Project [t#xL, diff#x] + +- Project [t#xL, diff#x, diff#x] + +- Window [counter_diff((1 + 1)) windowspecdefinition(t#xL ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#xL ASC NULLS FIRST] + +- Project [t#xL] + +- SubqueryAlias tab + +- Project [id#xL AS t#xL] + +- Range (1, 5, step=1) + + +-- !query +SELECT t, counter_diff(1, TIMESTAMP '2026-01-01 00:00:00') + OVER (ORDER BY t) AS diff +FROM RANGE(1, 5) AS tab(t) +ORDER BY t +-- !query analysis +[Analyzer test output redacted due to nondeterminism] + + +-- !query +SELECT t, counter_diff( + 1 + 1, + TIMESTAMP '2026-01-01 00:00:00' + INTERVAL '1' SECOND + ) OVER (ORDER BY t) AS diff +FROM RANGE(1, 5) AS tab(t) +ORDER BY t +-- !query analysis +[Analyzer test output redacted due to nondeterminism] + + +-- !query +SELECT t, c, + counter_diff(c) OVER (ORDER BY t) AS diff, + c - lag(c) OVER (ORDER BY t) AS d1, + c - lag(c) IGNORE NULLS OVER (ORDER BY t) AS d2 +FROM VALUES (1, 100), (2, CAST(NULL AS INT)), (3, 300), (4, 150) AS tab(t, c) +ORDER BY t +-- !query analysis +Sort [t#x ASC NULLS FIRST], true ++- Project [t#x, c#x, diff#x, d1#x, d2#x] + +- Project [t#x, c#x, diff#x, _we1#x, _we2#x, diff#x, (c#x - _we1#x) AS d1#x, (c#x - _we2#x) AS d2#x] + +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x, lag(c#x, -1, 
null) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _we1#x, lag(c#x, -1, null) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _we2#x], [t#x ASC NULLS FIRST] + +- Project [t#x, c#x] + +- SubqueryAlias tab + +- LocalRelation [t#x, c#x] + + +-- !query +SELECT t, c, + counter_diff(c) OVER (ORDER BY t) AS diff, + avg(c) OVER (ORDER BY t ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS local_avg, + lead(c) OVER (ORDER BY t) AS nc, + max(c) OVER (ORDER BY t ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) AS some_max +FROM VALUES (1, 100), (2, 200), (3, 150), (4, 400), (5, 500), (6, 600) AS tab(t, c) +ORDER BY t +-- !query analysis +Sort [t#x ASC NULLS FIRST], true ++- Project [t#x, c#x, diff#x, local_avg#x, nc#x, some_max#x] + +- Project [t#x, c#x, diff#x, local_avg#x, nc#x, some_max#x, diff#x, local_avg#x, nc#x, some_max#x] + +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x, avg(c#x) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, 1)) AS local_avg#x, lead(c#x, 1, null) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, 1, 1)) AS nc#x, max(c#x) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), 1)) AS some_max#x], [t#x ASC NULLS FIRST] + +- Project [t#x, c#x] + +- SubqueryAlias tab + +- LocalRelation [t#x, c#x] + + +-- !query +SELECT m, t, c, + counter_diff(c) OVER (PARTITION BY m ORDER BY t) AS diff, + avg(c) OVER (PARTITION BY m ORDER BY t + ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS local_avg, + max(c) OVER (PARTITION BY t) AS max_c +FROM VALUES + ('a', 1, 100), ('a', 2, 200), ('a', 3, 150), ('a', 4, 300), + ('b', 1, 10), ('b', 2, 30), ('b', 3, 60), ('b', 4, 100) +AS tab(m, t, c) +ORDER BY m, t +-- !query analysis +Sort [m#x ASC NULLS FIRST, t#x ASC NULLS FIRST], true ++- Project [m#x, t#x, c#x, 
diff#x, local_avg#x, max_c#x] + +- Project [m#x, t#x, c#x, diff#x, local_avg#x, max_c#x, diff#x, local_avg#x, max_c#x] + +- Window [max(c#x) windowspecdefinition(t#x, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS max_c#x], [t#x] + +- Window [counter_diff(c#x) windowspecdefinition(m#x, t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x, avg(c#x) windowspecdefinition(m#x, t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, 1)) AS local_avg#x], [m#x], [t#x ASC NULLS FIRST] + +- Project [m#x, t#x, c#x] + +- SubqueryAlias tab + +- LocalRelation [m#x, t#x, c#x] + + +-- !query +SELECT t, c, + counter_diff(c, st) OVER (ORDER BY t) AS diff +FROM VALUES + (1, TIMESTAMP '2026-01-01 00:00:00', 100), + (2, TIMESTAMP '2026-01-01 00:00:00', 200), + (3, TIMESTAMP '2026-01-01 00:02:31', 400) +AS tab(t, st, c) +ORDER BY t +-- !query analysis +Sort [t#x ASC NULLS FIRST], true ++- Project [t#x, c#x, diff#x] + +- Project [t#x, c#x, st#x, diff#x, diff#x] + +- Window [counter_diff(c#x, st#x, Some(UTC)) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#x ASC NULLS FIRST] + +- Project [t#x, c#x, st#x] + +- SubqueryAlias tab + +- LocalRelation [t#x, st#x, c#x] + + +-- !query +SELECT t, c, + counter_diff(c, st) OVER (ORDER BY t) AS diff +FROM VALUES + (1, TIMESTAMP '2026-01-01 00:00:00', 100), + (2, TIMESTAMP '2026-01-01 00:00:00', 200) +AS tab(t, st, c) +ORDER BY t +-- !query analysis +Sort [t#x ASC NULLS FIRST], true ++- Project [t#x, c#x, diff#x] + +- Project [t#x, c#x, st#x, diff#x, diff#x] + +- Window [counter_diff(c#x, st#x, Some(UTC)) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#x ASC NULLS FIRST] + +- Project [t#x, c#x, st#x] + +- SubqueryAlias tab + +- LocalRelation [t#x, st#x, c#x] + + +-- !query +SELECT t, c, + counter_diff(c, st) OVER 
(ORDER BY t) AS diff +FROM VALUES + (1, TIMESTAMP '2026-01-01 00:00:00', 100), + (2, CAST(NULL AS TIMESTAMP), 200) +AS tab(t, st, c) +ORDER BY t +-- !query analysis +Sort [t#x ASC NULLS FIRST], true ++- Project [t#x, c#x, diff#x] + +- Project [t#x, c#x, st#x, diff#x, diff#x] + +- Window [counter_diff(c#x, st#x, Some(UTC)) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#x ASC NULLS FIRST] + +- Project [t#x, c#x, st#x] + +- SubqueryAlias tab + +- LocalRelation [t#x, st#x, c#x] + + +-- !query +SELECT counter_diff(c, st) OVER (ORDER BY t) AS diff +FROM VALUES + (1, TIMESTAMP '2026-01-01 00:05:00', 100), + (2, TIMESTAMP '2026-01-01 00:01:00', 200) +AS tab(t, st, c) +-- !query analysis +Project [diff#x] ++- Project [c#x, st#x, t#x, diff#x, diff#x] + +- Window [counter_diff(c#x, st#x, Some(UTC)) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#x ASC NULLS FIRST] + +- Project [c#x, st#x, t#x] + +- SubqueryAlias tab + +- LocalRelation [t#x, st#x, c#x] + + +-- !query +SELECT counter_diff(c, st) OVER (ORDER BY t) AS diff +FROM VALUES + (1, TIMESTAMP '2026-01-01 00:00:00', -1) +AS tab(t, st, c) +-- !query analysis +Project [diff#x] ++- Project [c#x, st#x, t#x, diff#x, diff#x] + +- Window [counter_diff(c#x, st#x, Some(UTC)) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#x ASC NULLS FIRST] + +- Project [c#x, st#x, t#x] + +- SubqueryAlias tab + +- LocalRelation [t#x, st#x, c#x] + + +-- !query +SELECT t, c, + counter_diff(c, st) OVER (ORDER BY t) AS diff +FROM VALUES + (1, TIMESTAMP_NTZ '2026-01-01 00:00:00', 100), + (2, TIMESTAMP_NTZ '2026-01-01 00:00:00', 200), + (3, TIMESTAMP_NTZ '2026-01-01 00:05:00', 300) +AS tab(t, st, c) +ORDER BY t +-- !query analysis +Sort [t#x ASC NULLS FIRST], true ++- Project [t#x, c#x, diff#x] + +- Project [t#x, c#x, 
st#x, diff#x, diff#x] + +- Window [counter_diff(c#x, st#x, Some(UTC)) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#x ASC NULLS FIRST] + +- Project [t#x, c#x, st#x] + +- SubqueryAlias tab + +- LocalRelation [t#x, st#x, c#x] + + +-- !query +SELECT m, t, c, counter_diff(c, st) OVER (PARTITION BY m ORDER BY t) AS diff +FROM VALUES + ('a', 1, TIMESTAMP '2026-01-01 00:00:00', 100), + ('a', 2, TIMESTAMP '2026-01-01 00:00:00', 200), + ('a', 3, TIMESTAMP '2026-01-01 00:05:00', 500), + ('b', 1, TIMESTAMP '2026-01-01 00:00:00', 10), + ('b', 2, TIMESTAMP '2026-01-01 00:00:00', 20) +AS tab(m, t, st, c) +ORDER BY m, t +-- !query analysis +Sort [m#x ASC NULLS FIRST, t#x ASC NULLS FIRST], true ++- Project [m#x, t#x, c#x, diff#x] + +- Project [m#x, t#x, c#x, st#x, diff#x, diff#x] + +- Window [counter_diff(c#x, st#x, Some(UTC)) windowspecdefinition(m#x, t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [m#x], [t#x ASC NULLS FIRST] + +- Project [m#x, t#x, c#x, st#x] + +- SubqueryAlias tab + +- LocalRelation [m#x, t#x, st#x, c#x] + + +-- !query +SELECT t, counter_diff(c, st) OVER (ORDER BY t) AS diff +FROM VALUES + (1, TIMESTAMP '2026-01-01 00:05:00', 100), + (2, TIMESTAMP '2026-01-01 00:01:00', CAST(NULL AS INT)) +AS tab(t, st, c) +ORDER BY t +-- !query analysis +Sort [t#x ASC NULLS FIRST], true ++- Project [t#x, diff#x] + +- Project [t#x, c#x, st#x, diff#x, diff#x] + +- Window [counter_diff(c#x, st#x, Some(UTC)) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#x ASC NULLS FIRST] + +- Project [t#x, c#x, st#x] + +- SubqueryAlias tab + +- LocalRelation [t#x, st#x, c#x] + + +-- !query +SELECT counter_diff(c, st) OVER (ORDER BY t) AS diff +FROM VALUES + (1, TIMESTAMP '2026-01-01 00:05:00', 100), + (2, TIMESTAMP '2026-01-01 00:01:00', CAST(NULL AS INT)), + (3, TIMESTAMP 
'2026-01-01 00:01:00', 300) +AS tab(t, st, c) +-- !query analysis +Project [diff#x] ++- Project [c#x, st#x, t#x, diff#x, diff#x] + +- Window [counter_diff(c#x, st#x, Some(UTC)) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#x ASC NULLS FIRST] + +- Project [c#x, st#x, t#x] + +- SubqueryAlias tab + +- LocalRelation [t#x, st#x, c#x] + + +-- !query +SELECT t, counter_diff(c, st) OVER (ORDER BY t) AS diff +FROM VALUES + (1, TIMESTAMP '2026-01-01 00:05:00', 100), + (2, TIMESTAMP '2026-01-01 00:06:00', CAST(NULL AS INT)), + (3, TIMESTAMP '2026-01-01 00:07:00', 300) +AS tab(t, st, c) +ORDER BY t +-- !query analysis +Sort [t#x ASC NULLS FIRST], true ++- Project [t#x, diff#x] + +- Project [t#x, c#x, st#x, diff#x, diff#x] + +- Window [counter_diff(c#x, st#x, Some(UTC)) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#x ASC NULLS FIRST] + +- Project [t#x, c#x, st#x] + +- SubqueryAlias tab + +- LocalRelation [t#x, st#x, c#x] + + +-- !query +WITH gen AS ( + SELECT + TIMESTAMPADD(MINUTE, id * 30, TIMESTAMP '2026-01-01 00:00:00') AS ts, + CASE WHEN id < 4 THEN TIMESTAMP '2026-01-01 00:00:00' + ELSE TIMESTAMP '2026-01-01 02:00:00' END AS st, + CASE WHEN id < 4 THEN id * 1000 ELSE (id - 4) * 800 END AS c + FROM RANGE(8) AS r(id) +), +diffs AS ( + SELECT ts, c, counter_diff(c, st) OVER (ORDER BY ts) AS diff + FROM gen +) +SELECT date_trunc('hour', ts) AS hour_bucket, + SUM(diff) AS total_diff +FROM diffs GROUP BY 1 ORDER BY 1 +-- !query analysis +[Analyzer test output redacted due to nondeterminism] + + +-- !query +SELECT counter_diff(1) AS diff +-- !query analysis +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "WINDOW_FUNCTION_WITHOUT_OVER_CLAUSE", + "sqlState" : "42601", + "messageParameters" : { + "funcName" : "\"counter_diff(1)\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : 
"", + "startIndex" : 8, + "stopIndex" : 30, + "fragment" : "counter_diff(1) AS diff" + } ] +} + + +-- !query +SELECT counter_diff(c) OVER () AS diff +FROM VALUES (1) AS tab(c) +-- !query analysis +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "WINDOW_FUNCTION_FRAME_NOT_ORDERED", + "sqlState" : "42601", + "messageParameters" : { + "wf_expr" : "counter_diff(tab.c)", + "wf_name" : "counter_diff" + } +} + + +-- !query +SELECT counter_diff(c) OVER (ORDER BY t) AS diff +FROM VALUES (1, 'abc') AS tab(t, c) +-- !query analysis +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE", + "sqlState" : "42K09", + "messageParameters" : { + "inputSql" : "\"c\"", + "inputType" : "\"STRING\"", + "paramIndex" : "first", + "requiredType" : "\"NUMERIC\"", + "sqlExpr" : "\"counter_diff(c)\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 40, + "fragment" : "counter_diff(c) OVER (ORDER BY t)" + } ] +} + + +-- !query +SELECT counter_diff(c, st) OVER (ORDER BY t) AS diff +FROM VALUES (1, 'abc', 100) AS tab(t, st, c) +-- !query analysis +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE", + "sqlState" : "42K09", + "messageParameters" : { + "inputSql" : "\"st\"", + "inputType" : "\"STRING\"", + "paramIndex" : "second", + "requiredType" : "(\"TIMESTAMP\" or \"TIMESTAMP_NTZ\")", + "sqlExpr" : "\"counter_diff(c, st)\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 44, + "fragment" : "counter_diff(c, st) OVER (ORDER BY t)" + } ] +} + + +-- !query +SELECT counter_diff(c) OVER (ORDER BY t ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS diff +FROM VALUES (1, 100), (2, 200) AS tab(t, c) +-- !query analysis +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "_LEGACY_ERROR_TEMP_1036", + "messageParameters" : { + "required" : 
"specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())", + "wf" : "specifiedwindowframe(RowFrame, -1, 1)" + } +} + + +-- !query +SELECT t, counter_diff(c) OVER ( + ORDER BY t ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS diff +FROM VALUES (1, 100), (2, 200) AS tab(t, c) +ORDER BY t +-- !query analysis +Sort [t#x ASC NULLS FIRST], true ++- Project [t#x, diff#x] + +- Project [t#x, c#x, diff#x, diff#x] + +- Window [counter_diff(c#x) windowspecdefinition(t#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS diff#x], [t#x ASC NULLS FIRST] + +- Project [t#x, c#x] + +- SubqueryAlias tab + +- LocalRelation [t#x, c#x] + + +-- !query +SELECT counter_diff(c) OVER ( + ORDER BY t RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS diff +FROM VALUES (1, 100), (2, 200) AS tab(t, c) +-- !query analysis +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "_LEGACY_ERROR_TEMP_1036", + "messageParameters" : { + "required" : "specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())", + "wf" : "specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())" + } +} + + +-- !query +SELECT counter_diff() OVER (ORDER BY t) AS diff FROM VALUES (1) AS tab(t) +-- !query analysis +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "REQUIRED_PARAMETER_NOT_FOUND", + "sqlState" : "4274K", + "messageParameters" : { + "index" : "0", + "parameterName" : "`value`", + "routineName" : "`counter_diff`" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 39, + "fragment" : "counter_diff() OVER (ORDER BY t)" + } ] +} + + +-- !query +SELECT counter_diff(1, TIMESTAMP '2026-01-01', 99) OVER (ORDER BY t) AS diff +FROM VALUES (1) AS tab(t) +-- !query analysis +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "WRONG_NUM_ARGS.WITHOUT_SUGGESTION", + "sqlState" : "42605", + "messageParameters" : { + "actualNum" : "3", + "docroot" : 
"https://spark.apache.org/docs/latest", + "expectedNum" : "[1, 2]", + "functionName" : "`counter_diff`" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 68, + "fragment" : "counter_diff(1, TIMESTAMP '2026-01-01', 99) OVER (ORDER BY t)" + } ] +} + + +-- !query +SELECT counter_diff(c) OVER (ORDER BY t) AS diff +FROM VALUES (1, true), (2, false) AS tab(t, c) +-- !query analysis +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE", + "sqlState" : "42K09", + "messageParameters" : { + "inputSql" : "\"c\"", + "inputType" : "\"BOOLEAN\"", + "paramIndex" : "first", + "requiredType" : "\"NUMERIC\"", + "sqlExpr" : "\"counter_diff(c)\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 40, + "fragment" : "counter_diff(c) OVER (ORDER BY t)" + } ] +} + + +-- !query +SELECT counter_diff(c, st) OVER (ORDER BY t) AS diff +FROM VALUES (1, DATE '2026-01-01', 100) AS tab(t, st, c) +-- !query analysis +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE", + "sqlState" : "42K09", + "messageParameters" : { + "inputSql" : "\"st\"", + "inputType" : "\"DATE\"", + "paramIndex" : "second", + "requiredType" : "(\"TIMESTAMP\" or \"TIMESTAMP_NTZ\")", + "sqlExpr" : "\"counter_diff(c, st)\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 44, + "fragment" : "counter_diff(c, st) OVER (ORDER BY t)" + } ] +} diff --git a/sql/core/src/test/resources/sql-tests/inputs/counter-diff.sql b/sql/core/src/test/resources/sql-tests/inputs/counter-diff.sql new file mode 100644 index 0000000000000..17db75b2888ca --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/inputs/counter-diff.sql @@ -0,0 +1,390 @@ +-- Tests for the counter_diff window function. 
+ +SET TIME ZONE 'UTC'; + +------------------------------------------------------------ +-- Basic semantics +------------------------------------------------------------ + +-- Monotonically increasing counter: each diff is current - previous. +SELECT t, c, counter_diff(c) OVER (ORDER BY t) AS diff +FROM VALUES (1, 100), (2, 200), (3, 400) AS tab(t, c) +ORDER BY t; + +-- Single-row input: the only row has no predecessor and returns NULL. +SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff +FROM VALUES (1, 50) AS tab(t, c) +ORDER BY t; + +-- Counter reset detected by value decrease: the row after the drop returns NULL. +SELECT t, c, counter_diff(c) OVER (ORDER BY t) AS diff +FROM VALUES (1, 100), (2, 200), (3, 400), (4, 50), (5, 100) AS tab(t, c) +ORDER BY t; + +-- Equal counter values produce a diff of 0. +SELECT t, c, counter_diff(c) OVER (ORDER BY t) AS diff +FROM VALUES (1, 100), (2, 100), (3, 200) AS tab(t, c) +ORDER BY t; + +-- NULL counter rows have NULL as the result and are skipped when calculating differences. +SELECT t, c, counter_diff(c) OVER (ORDER BY t) AS diff +FROM VALUES (1, 100), (2, CAST(NULL AS INT)), (3, 200) AS tab(t, c) +ORDER BY t; + +-- Each partition has its own first row and prior values. +SELECT m, t, c, counter_diff(c) OVER (PARTITION BY m ORDER BY t) AS diff +FROM VALUES + ('a', 1, 100), ('a', 2, 200), + ('b', 1, 10), ('b', 2, 30) +AS tab(m, t, c) +ORDER BY m, t; + +------------------------------------------------------------ +-- Numeric types +------------------------------------------------------------ + +-- counter_diff supports all numeric types: +-- DOUBLE, FLOAT, BIGINT, LONG, INT, SMALLINT, TINYINT, DECIMAL. 
+SELECT t, + counter_diff(d) OVER (ORDER BY t) AS d_diff, + counter_diff(f) OVER (ORDER BY t) AS f_diff, + counter_diff(b) OVER (ORDER BY t) AS b_diff, + counter_diff(l) OVER (ORDER BY t) AS l_diff, + counter_diff(i) OVER (ORDER BY t) AS i_diff, + counter_diff(si) OVER (ORDER BY t) AS si_diff, + counter_diff(ti) OVER (ORDER BY t) AS ti_diff, + counter_diff(dec) OVER (ORDER BY t) AS dec_diff +FROM VALUES + (1, 1.5D, CAST(1.5 AS FLOAT), CAST(100 AS BIGINT), CAST(100 AS LONG), + CAST(100 AS INT), CAST(100 AS SMALLINT), CAST(10 AS TINYINT), + CAST(10.5 AS DECIMAL(10,2))), + (2, 3.5D, CAST(3.5 AS FLOAT), CAST(300 AS BIGINT), CAST(300 AS LONG), + CAST(300 AS INT), CAST(300 AS SMALLINT), CAST(30 AS TINYINT), + CAST(20.5 AS DECIMAL(10,2))) +AS tab(t, d, f, b, l, i, si, ti, dec) +ORDER BY t; + +------------------------------------------------------------ +-- High-precision DECIMAL inputs +------------------------------------------------------------ +-- Decimal subtraction normally widens the result type to handle possible overflow. +-- For counter_diff, since counters cannot be negative, there is no risk of overflow, and no +-- need to widen the result type, so we subtract directly in the input type. +-- These tests verify that the result type is not widened, and that no precision is lost for +-- large precision and scale. + +-- DECIMAL(38, 38): normal subtraction would be of type DECIMAL(38, 37). +SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff, c - lag(c) OVER (ORDER BY t) AS diff_subtract +FROM VALUES + (1, CAST(0.10000000000000000000000000000000000001 AS DECIMAL(38, 38))), + (2, CAST(0.10000000000000000000000000000000000002 AS DECIMAL(38, 38))) +AS tab(t, c) ORDER BY t; + +-- DECIMAL(38, 6): normal subtraction would be of type DECIMAL(38, 6). 
+SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff, c - lag(c) OVER (ORDER BY t) AS diff_subtract +FROM VALUES + (1, CAST(12345678901234567890123456789012.123456 AS DECIMAL(38, 6))), + (2, CAST(12345678901234567890123456789012.123457 AS DECIMAL(38, 6))) +AS tab(t, c) ORDER BY t; + +-- DECIMAL(10, 2): normal subtraction would be of type DECIMAL(11, 2). +SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff, c - lag(c) OVER (ORDER BY t) AS diff_subtract +FROM VALUES + (1, CAST(99999999.98 AS DECIMAL(10, 2))), + (2, CAST(99999999.99 AS DECIMAL(10, 2))) +AS tab(t, c) ORDER BY t; + +------------------------------------------------------------ +-- NULL type inputs +------------------------------------------------------------ + +-- Untyped NULL counters are treated as DOUBLE. +SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff +FROM VALUES (1, NULL), (2, NULL), (3, NULL) AS tab(t, c) +ORDER BY t; + +-- Untyped NULL start_time is treated as TIMESTAMP. +-- The counter behavior is unaffected because NULL start_time skips the reset check. +SELECT t, counter_diff(c, st) OVER (ORDER BY t) AS diff +FROM VALUES (1, 100, NULL), (2, 200, NULL) AS tab(t, c, st) +ORDER BY t; + +-- Explicitly-typed all-NULL INT counter: type stays INT. +SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff +FROM (SELECT t, CAST(c AS INT) AS c + FROM VALUES (1, NULL), (2, NULL), (3, NULL) AS tab(t, c)) +ORDER BY t; + +------------------------------------------------------------ +-- Negative counter values are runtime errors +------------------------------------------------------------ + +-- INT. +SELECT counter_diff(c) OVER (ORDER BY t) AS diff +FROM VALUES (1, -5) AS tab(t, c); + +-- DOUBLE. +SELECT counter_diff(c) OVER (ORDER BY t) AS diff +FROM VALUES (1, -5.0D) AS tab(t, c); + +-- DECIMAL: the error message preserves the configured scale. +SELECT counter_diff(c) OVER (ORDER BY t) AS diff +FROM VALUES (1, CAST(-5.5 AS DECIMAL(10, 3))) AS tab(t, c); + +-- -Infinity is treated as a negative value. 
+SELECT counter_diff(c) OVER (ORDER BY t) AS diff +FROM VALUES (1, DOUBLE('-Infinity')) AS tab(t, c); + +-- Negative value after a NULL row still results in an error. +SELECT counter_diff(c) OVER (ORDER BY t) AS diff +FROM VALUES (1, 100), (2, CAST(NULL AS INT)), (3, -5) AS tab(t, c); + +------------------------------------------------------------ +-- Special floating-point values +------------------------------------------------------------ + +-- Positive Infinity participates in arithmetic: +-- +Infinity - 100 = +Infinity. +-- 200 - +Infinity => Reset. +-- +Infinity - 200 = +Infinity. +-- +Infinity - +Infinity = NaN. +SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff +FROM VALUES + (1, 100.0D), (2, DOUBLE('Infinity')), (3, 200.0D), + (4, DOUBLE('Infinity')), (5, DOUBLE('Infinity')) AS tab(t, c) +ORDER BY t; + +-- NaN values are greater than all other numeric values, so: +-- any value -> NaN => NaN +-- NaN -> any non-NaN value => Reset +SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff +FROM VALUES + (1, 100.0D), (2, DOUBLE('NaN')), (3, DOUBLE('NaN')), (4, 200.0D), + (5, 400.0D), (6, DOUBLE('NaN')), (7, 50.0D), (8, 100.0D) AS tab(t, c) +ORDER BY t; + +------------------------------------------------------------ +-- Constants and foldable expressions as arguments +------------------------------------------------------------ + +-- Constant counter: every diff except the first is 0. +SELECT t, counter_diff(1) OVER (ORDER BY t) AS diff +FROM RANGE(1, 5) AS tab(t) +ORDER BY t; + +-- Foldable expression counter (1 + 1) behaves like the constant case. +SELECT t, counter_diff(1 + 1) OVER (ORDER BY t) AS diff +FROM RANGE(1, 5) AS tab(t) +ORDER BY t; + +-- Constant start_time alongside constant counter. +SELECT t, counter_diff(1, TIMESTAMP '2026-01-01 00:00:00') + OVER (ORDER BY t) AS diff +FROM RANGE(1, 5) AS tab(t) +ORDER BY t; + +-- Foldable counter and foldable start_time. 
+SELECT t, counter_diff( + 1 + 1, + TIMESTAMP '2026-01-01 00:00:00' + INTERVAL '1' SECOND + ) OVER (ORDER BY t) AS diff +FROM RANGE(1, 5) AS tab(t) +ORDER BY t; + +------------------------------------------------------------ +-- Combined with other window functions +------------------------------------------------------------ + +-- Compare counter_diff against lag and lag IGNORE NULLS over the same ordering. +SELECT t, c, + counter_diff(c) OVER (ORDER BY t) AS diff, + c - lag(c) OVER (ORDER BY t) AS d1, + c - lag(c) IGNORE NULLS OVER (ORDER BY t) AS d2 +FROM VALUES (1, 100), (2, CAST(NULL AS INT)), (3, 300), (4, 150) AS tab(t, c) +ORDER BY t; + +-- Mix counter_diff with avg, lead, max over different frames. +SELECT t, c, + counter_diff(c) OVER (ORDER BY t) AS diff, + avg(c) OVER (ORDER BY t ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS local_avg, + lead(c) OVER (ORDER BY t) AS nc, + max(c) OVER (ORDER BY t ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) AS some_max +FROM VALUES (1, 100), (2, 200), (3, 150), (4, 400), (5, 500), (6, 600) AS tab(t, c) +ORDER BY t; + +-- Multiple windows with different partitions in the same SELECT. +SELECT m, t, c, + counter_diff(c) OVER (PARTITION BY m ORDER BY t) AS diff, + avg(c) OVER (PARTITION BY m ORDER BY t + ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS local_avg, + max(c) OVER (PARTITION BY t) AS max_c +FROM VALUES + ('a', 1, 100), ('a', 2, 200), ('a', 3, 150), ('a', 4, 300), + ('b', 1, 10), ('b', 2, 30), ('b', 3, 60), ('b', 4, 100) +AS tab(m, t, c) +ORDER BY m, t; + +------------------------------------------------------------ +-- start_time parameter +------------------------------------------------------------ + +-- start_time advance triggers a reset even with the counter increased. 
+SELECT t, c, + counter_diff(c, st) OVER (ORDER BY t) AS diff +FROM VALUES + (1, TIMESTAMP '2026-01-01 00:00:00', 100), + (2, TIMESTAMP '2026-01-01 00:00:00', 200), + (3, TIMESTAMP '2026-01-01 00:02:31', 400) +AS tab(t, st, c) +ORDER BY t; + +-- Equal start_time across rows: behavior matches the case with no start time. +SELECT t, c, + counter_diff(c, st) OVER (ORDER BY t) AS diff +FROM VALUES + (1, TIMESTAMP '2026-01-01 00:00:00', 100), + (2, TIMESTAMP '2026-01-01 00:00:00', 200) +AS tab(t, st, c) +ORDER BY t; + +-- NULL start_time skips the start time reset check on that row. +SELECT t, c, + counter_diff(c, st) OVER (ORDER BY t) AS diff +FROM VALUES + (1, TIMESTAMP '2026-01-01 00:00:00', 100), + (2, CAST(NULL AS TIMESTAMP), 200) +AS tab(t, st, c) +ORDER BY t; + +-- Decreasing start_time is a runtime error. +SELECT counter_diff(c, st) OVER (ORDER BY t) AS diff +FROM VALUES + (1, TIMESTAMP '2026-01-01 00:05:00', 100), + (2, TIMESTAMP '2026-01-01 00:01:00', 200) +AS tab(t, st, c); + +-- Negative counter still raises in the start_time form. +SELECT counter_diff(c, st) OVER (ORDER BY t) AS diff +FROM VALUES + (1, TIMESTAMP '2026-01-01 00:00:00', -1) +AS tab(t, st, c); + +-- TIMESTAMP_NTZ start_time is accepted; same reset behavior as TIMESTAMP. +SELECT t, c, + counter_diff(c, st) OVER (ORDER BY t) AS diff +FROM VALUES + (1, TIMESTAMP_NTZ '2026-01-01 00:00:00', 100), + (2, TIMESTAMP_NTZ '2026-01-01 00:00:00', 200), + (3, TIMESTAMP_NTZ '2026-01-01 00:05:00', 300) +AS tab(t, st, c) +ORDER BY t; + +-- Partitioned start_time: each partition tracks its own previous start_time. 
+SELECT m, t, c, counter_diff(c, st) OVER (PARTITION BY m ORDER BY t) AS diff +FROM VALUES + ('a', 1, TIMESTAMP '2026-01-01 00:00:00', 100), + ('a', 2, TIMESTAMP '2026-01-01 00:00:00', 200), + ('a', 3, TIMESTAMP '2026-01-01 00:05:00', 500), + ('b', 1, TIMESTAMP '2026-01-01 00:00:00', 10), + ('b', 2, TIMESTAMP '2026-01-01 00:00:00', 20) +AS tab(m, t, st, c) +ORDER BY m, t; + +-- A NULL-counter row is skipped before the start_time check, so a same-row +-- start_time decrease is absorbed when the counter is NULL. +SELECT t, counter_diff(c, st) OVER (ORDER BY t) AS diff +FROM VALUES + (1, TIMESTAMP '2026-01-01 00:05:00', 100), + (2, TIMESTAMP '2026-01-01 00:01:00', CAST(NULL AS INT)) +AS tab(t, st, c) +ORDER BY t; + +-- ...but the next non-NULL row still observes the decrease and raises. +SELECT counter_diff(c, st) OVER (ORDER BY t) AS diff +FROM VALUES + (1, TIMESTAMP '2026-01-01 00:05:00', 100), + (2, TIMESTAMP '2026-01-01 00:01:00', CAST(NULL AS INT)), + (3, TIMESTAMP '2026-01-01 00:01:00', 300) +AS tab(t, st, c); + +-- A NULL-counter row is skipped before the start_time check, so a same-row +-- start_time increase is absorbed when the counter is NULL, but is resurfaced +-- on the next non-NULL row. +SELECT t, counter_diff(c, st) OVER (ORDER BY t) AS diff +FROM VALUES + (1, TIMESTAMP '2026-01-01 00:05:00', 100), + (2, TIMESTAMP '2026-01-01 00:06:00', CAST(NULL AS INT)), + (3, TIMESTAMP '2026-01-01 00:07:00', 300) +AS tab(t, st, c) +ORDER BY t; + +------------------------------------------------------------ +-- End-to-end: bucket per-row diffs by hour using a CTE +------------------------------------------------------------ + +-- 8 measurements every 30 min; start_time changes at id=4 -> single reset. +-- The hourly SUM of diffs yields the per-hour total counter increase. 
+WITH gen AS ( + SELECT + TIMESTAMPADD(MINUTE, id * 30, TIMESTAMP '2026-01-01 00:00:00') AS ts, + CASE WHEN id < 4 THEN TIMESTAMP '2026-01-01 00:00:00' + ELSE TIMESTAMP '2026-01-01 02:00:00' END AS st, + CASE WHEN id < 4 THEN id * 1000 ELSE (id - 4) * 800 END AS c + FROM RANGE(8) AS r(id) +), +diffs AS ( + SELECT ts, c, counter_diff(c, st) OVER (ORDER BY ts) AS diff + FROM gen +) +SELECT date_trunc('hour', ts) AS hour_bucket, + SUM(diff) AS total_diff +FROM diffs GROUP BY 1 ORDER BY 1; + +------------------------------------------------------------ +-- Frame, arity, and type validation +------------------------------------------------------------ + +-- counter_diff requires an OVER clause (it is a window function). +SELECT counter_diff(1) AS diff; + +-- Window must specify ORDER BY (otherwise the frame is unordered). +SELECT counter_diff(c) OVER () AS diff +FROM VALUES (1) AS tab(c); + +-- Counter argument must be NUMERIC: STRING is rejected. +SELECT counter_diff(c) OVER (ORDER BY t) AS diff +FROM VALUES (1, 'abc') AS tab(t, c); + +-- start_time argument must be TIMESTAMP or TIMESTAMP_NTZ: STRING is rejected. +SELECT counter_diff(c, st) OVER (ORDER BY t) AS diff +FROM VALUES (1, 'abc', 100) AS tab(t, st, c); + +-- A user-supplied frame other than ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW is rejected. +SELECT counter_diff(c) OVER (ORDER BY t ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS diff +FROM VALUES (1, 100), (2, 200) AS tab(t, c); + +-- Explicitly specifying the required frame is allowed. +SELECT t, counter_diff(c) OVER ( + ORDER BY t ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS diff +FROM VALUES (1, 100), (2, 200) AS tab(t, c) +ORDER BY t; + +-- RANGE frames are rejected (counter_diff is row-based). +SELECT counter_diff(c) OVER ( + ORDER BY t RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS diff +FROM VALUES (1, 100), (2, 200) AS tab(t, c); + +-- Zero arguments: not accepted. 
+SELECT counter_diff() OVER (ORDER BY t) AS diff FROM VALUES (1) AS tab(t);
+
+-- More than 2 arguments: not accepted.
+SELECT counter_diff(1, TIMESTAMP '2026-01-01', 99) OVER (ORDER BY t) AS diff
+FROM VALUES (1) AS tab(t);
+
+-- BOOLEAN counter is rejected.
+SELECT counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, true), (2, false) AS tab(t, c);
+
+-- DATE start_time is rejected.
+SELECT counter_diff(c, st) OVER (ORDER BY t) AS diff
+FROM VALUES (1, DATE '2026-01-01', 100) AS tab(t, st, c);
diff --git a/sql/core/src/test/resources/sql-tests/results/counter-diff.sql.out b/sql/core/src/test/resources/sql-tests/results/counter-diff.sql.out
new file mode 100644
index 0000000000000..02479ea879bc0
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/counter-diff.sql.out
@@ -0,0 +1,862 @@
+-- Automatically generated by SQLQueryTestSuite
+-- !query
+SET TIME ZONE 'UTC'
+-- !query schema
+struct<key:string,value:string>
+-- !query output
+spark.sql.session.timeZone	UTC
+
+
+-- !query
+SELECT t, c, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 100), (2, 200), (3, 400) AS tab(t, c)
+ORDER BY t
+-- !query schema
+struct<t:int,c:int,diff:int>
+-- !query output
+1	100	NULL
+2	200	100
+3	400	200
+
+
+-- !query
+SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 50) AS tab(t, c)
+ORDER BY t
+-- !query schema
+struct<t:int,diff:int>
+-- !query output
+1	NULL
+
+
+-- !query
+SELECT t, c, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 100), (2, 200), (3, 400), (4, 50), (5, 100) AS tab(t, c)
+ORDER BY t
+-- !query schema
+struct<t:int,c:int,diff:int>
+-- !query output
+1	100	NULL
+2	200	100
+3	400	200
+4	50	NULL
+5	100	50
+
+
+-- !query
+SELECT t, c, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 100), (2, 100), (3, 200) AS tab(t, c)
+ORDER BY t
+-- !query schema
+struct<t:int,c:int,diff:int>
+-- !query output
+1	100	NULL
+2	100	0
+3	200	100
+
+
+-- !query
+SELECT t, c, counter_diff(c) OVER (ORDER BY t) AS diff
+FROM VALUES (1, 100), (2, CAST(NULL AS INT)), (3, 200) AS tab(t, c)
+ORDER BY t
+-- !query schema
+struct +-- !query output +1 100 NULL +2 NULL NULL +3 200 100 + + +-- !query +SELECT m, t, c, counter_diff(c) OVER (PARTITION BY m ORDER BY t) AS diff +FROM VALUES + ('a', 1, 100), ('a', 2, 200), + ('b', 1, 10), ('b', 2, 30) +AS tab(m, t, c) +ORDER BY m, t +-- !query schema +struct +-- !query output +a 1 100 NULL +a 2 200 100 +b 1 10 NULL +b 2 30 20 + + +-- !query +SELECT t, + counter_diff(d) OVER (ORDER BY t) AS d_diff, + counter_diff(f) OVER (ORDER BY t) AS f_diff, + counter_diff(b) OVER (ORDER BY t) AS b_diff, + counter_diff(l) OVER (ORDER BY t) AS l_diff, + counter_diff(i) OVER (ORDER BY t) AS i_diff, + counter_diff(si) OVER (ORDER BY t) AS si_diff, + counter_diff(ti) OVER (ORDER BY t) AS ti_diff, + counter_diff(dec) OVER (ORDER BY t) AS dec_diff +FROM VALUES + (1, 1.5D, CAST(1.5 AS FLOAT), CAST(100 AS BIGINT), CAST(100 AS LONG), + CAST(100 AS INT), CAST(100 AS SMALLINT), CAST(10 AS TINYINT), + CAST(10.5 AS DECIMAL(10,2))), + (2, 3.5D, CAST(3.5 AS FLOAT), CAST(300 AS BIGINT), CAST(300 AS LONG), + CAST(300 AS INT), CAST(300 AS SMALLINT), CAST(30 AS TINYINT), + CAST(20.5 AS DECIMAL(10,2))) +AS tab(t, d, f, b, l, i, si, ti, dec) +ORDER BY t +-- !query schema +struct +-- !query output +1 NULL NULL NULL NULL NULL NULL NULL NULL +2 2.0 2.0 200 200 200 200 20 10.00 + + +-- !query +SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff, c - lag(c) OVER (ORDER BY t) AS diff_subtract +FROM VALUES + (1, CAST(0.10000000000000000000000000000000000001 AS DECIMAL(38, 38))), + (2, CAST(0.10000000000000000000000000000000000002 AS DECIMAL(38, 38))) +AS tab(t, c) ORDER BY t +-- !query schema +struct +-- !query output +1 NULL NULL +2 0.00000000000000000000000000000000000001 0.0000000000000000000000000000000000000 + + +-- !query +SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff, c - lag(c) OVER (ORDER BY t) AS diff_subtract +FROM VALUES + (1, CAST(12345678901234567890123456789012.123456 AS DECIMAL(38, 6))), + (2, CAST(12345678901234567890123456789012.123457 AS DECIMAL(38, 6))) +AS 
tab(t, c) ORDER BY t +-- !query schema +struct +-- !query output +1 NULL NULL +2 0.000001 0.000001 + + +-- !query +SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff, c - lag(c) OVER (ORDER BY t) AS diff_subtract +FROM VALUES + (1, CAST(99999999.98 AS DECIMAL(10, 2))), + (2, CAST(99999999.99 AS DECIMAL(10, 2))) +AS tab(t, c) ORDER BY t +-- !query schema +struct +-- !query output +1 NULL NULL +2 0.01 0.01 + + +-- !query +SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff +FROM VALUES (1, NULL), (2, NULL), (3, NULL) AS tab(t, c) +ORDER BY t +-- !query schema +struct +-- !query output +1 NULL +2 NULL +3 NULL + + +-- !query +SELECT t, counter_diff(c, st) OVER (ORDER BY t) AS diff +FROM VALUES (1, 100, NULL), (2, 200, NULL) AS tab(t, c, st) +ORDER BY t +-- !query schema +struct +-- !query output +1 NULL +2 100 + + +-- !query +SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff +FROM (SELECT t, CAST(c AS INT) AS c + FROM VALUES (1, NULL), (2, NULL), (3, NULL) AS tab(t, c)) +ORDER BY t +-- !query schema +struct +-- !query output +1 NULL +2 NULL +3 NULL + + +-- !query +SELECT counter_diff(c) OVER (ORDER BY t) AS diff +FROM VALUES (1, -5) AS tab(t, c) +-- !query schema +struct<> +-- !query output +org.apache.spark.SparkRuntimeException +{ + "errorClass" : "COUNTER_DIFF_NEGATIVE_COUNTER_VALUE", + "sqlState" : "22003", + "messageParameters" : { + "function" : "`counter_diff`", + "value" : "-5" + } +} + + +-- !query +SELECT counter_diff(c) OVER (ORDER BY t) AS diff +FROM VALUES (1, -5.0D) AS tab(t, c) +-- !query schema +struct<> +-- !query output +org.apache.spark.SparkRuntimeException +{ + "errorClass" : "COUNTER_DIFF_NEGATIVE_COUNTER_VALUE", + "sqlState" : "22003", + "messageParameters" : { + "function" : "`counter_diff`", + "value" : "-5.0" + } +} + + +-- !query +SELECT counter_diff(c) OVER (ORDER BY t) AS diff +FROM VALUES (1, CAST(-5.5 AS DECIMAL(10, 3))) AS tab(t, c) +-- !query schema +struct<> +-- !query output +org.apache.spark.SparkRuntimeException +{ + "errorClass" 
: "COUNTER_DIFF_NEGATIVE_COUNTER_VALUE", + "sqlState" : "22003", + "messageParameters" : { + "function" : "`counter_diff`", + "value" : "-5.500" + } +} + + +-- !query +SELECT counter_diff(c) OVER (ORDER BY t) AS diff +FROM VALUES (1, DOUBLE('-Infinity')) AS tab(t, c) +-- !query schema +struct<> +-- !query output +org.apache.spark.SparkRuntimeException +{ + "errorClass" : "COUNTER_DIFF_NEGATIVE_COUNTER_VALUE", + "sqlState" : "22003", + "messageParameters" : { + "function" : "`counter_diff`", + "value" : "-Infinity" + } +} + + +-- !query +SELECT counter_diff(c) OVER (ORDER BY t) AS diff +FROM VALUES (1, 100), (2, CAST(NULL AS INT)), (3, -5) AS tab(t, c) +-- !query schema +struct<> +-- !query output +org.apache.spark.SparkRuntimeException +{ + "errorClass" : "COUNTER_DIFF_NEGATIVE_COUNTER_VALUE", + "sqlState" : "22003", + "messageParameters" : { + "function" : "`counter_diff`", + "value" : "-5" + } +} + + +-- !query +SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff +FROM VALUES + (1, 100.0D), (2, DOUBLE('Infinity')), (3, 200.0D), + (4, DOUBLE('Infinity')), (5, DOUBLE('Infinity')) AS tab(t, c) +ORDER BY t +-- !query schema +struct +-- !query output +1 NULL +2 Infinity +3 NULL +4 Infinity +5 NaN + + +-- !query +SELECT t, counter_diff(c) OVER (ORDER BY t) AS diff +FROM VALUES + (1, 100.0D), (2, DOUBLE('NaN')), (3, DOUBLE('NaN')), (4, 200.0D), + (5, 400.0D), (6, DOUBLE('NaN')), (7, 50.0D), (8, 100.0D) AS tab(t, c) +ORDER BY t +-- !query schema +struct +-- !query output +1 NULL +2 NaN +3 NaN +4 NULL +5 200.0 +6 NaN +7 NULL +8 50.0 + + +-- !query +SELECT t, counter_diff(1) OVER (ORDER BY t) AS diff +FROM RANGE(1, 5) AS tab(t) +ORDER BY t +-- !query schema +struct +-- !query output +1 NULL +2 0 +3 0 +4 0 + + +-- !query +SELECT t, counter_diff(1 + 1) OVER (ORDER BY t) AS diff +FROM RANGE(1, 5) AS tab(t) +ORDER BY t +-- !query schema +struct +-- !query output +1 NULL +2 0 +3 0 +4 0 + + +-- !query +SELECT t, counter_diff(1, TIMESTAMP '2026-01-01 00:00:00') + OVER (ORDER BY 
t) AS diff +FROM RANGE(1, 5) AS tab(t) +ORDER BY t +-- !query schema +struct +-- !query output +1 NULL +2 0 +3 0 +4 0 + + +-- !query +SELECT t, counter_diff( + 1 + 1, + TIMESTAMP '2026-01-01 00:00:00' + INTERVAL '1' SECOND + ) OVER (ORDER BY t) AS diff +FROM RANGE(1, 5) AS tab(t) +ORDER BY t +-- !query schema +struct +-- !query output +1 NULL +2 0 +3 0 +4 0 + + +-- !query +SELECT t, c, + counter_diff(c) OVER (ORDER BY t) AS diff, + c - lag(c) OVER (ORDER BY t) AS d1, + c - lag(c) IGNORE NULLS OVER (ORDER BY t) AS d2 +FROM VALUES (1, 100), (2, CAST(NULL AS INT)), (3, 300), (4, 150) AS tab(t, c) +ORDER BY t +-- !query schema +struct +-- !query output +1 100 NULL NULL NULL +2 NULL NULL NULL NULL +3 300 200 NULL 200 +4 150 NULL -150 -150 + + +-- !query +SELECT t, c, + counter_diff(c) OVER (ORDER BY t) AS diff, + avg(c) OVER (ORDER BY t ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS local_avg, + lead(c) OVER (ORDER BY t) AS nc, + max(c) OVER (ORDER BY t ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) AS some_max +FROM VALUES (1, 100), (2, 200), (3, 150), (4, 400), (5, 500), (6, 600) AS tab(t, c) +ORDER BY t +-- !query schema +struct +-- !query output +1 100 NULL 150.0 200 200 +2 200 100 150.0 150 200 +3 150 NULL 250.0 400 400 +4 400 250 350.0 500 500 +5 500 100 500.0 600 600 +6 600 100 550.0 NULL 600 + + +-- !query +SELECT m, t, c, + counter_diff(c) OVER (PARTITION BY m ORDER BY t) AS diff, + avg(c) OVER (PARTITION BY m ORDER BY t + ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS local_avg, + max(c) OVER (PARTITION BY t) AS max_c +FROM VALUES + ('a', 1, 100), ('a', 2, 200), ('a', 3, 150), ('a', 4, 300), + ('b', 1, 10), ('b', 2, 30), ('b', 3, 60), ('b', 4, 100) +AS tab(m, t, c) +ORDER BY m, t +-- !query schema +struct +-- !query output +a 1 100 NULL 150.0 100 +a 2 200 100 150.0 200 +a 3 150 NULL 216.66666666666666 150 +a 4 300 150 225.0 300 +b 1 10 NULL 20.0 100 +b 2 30 20 33.333333333333336 200 +b 3 60 30 63.333333333333336 150 +b 4 100 40 80.0 300 + + +-- !query +SELECT 
t, c, + counter_diff(c, st) OVER (ORDER BY t) AS diff +FROM VALUES + (1, TIMESTAMP '2026-01-01 00:00:00', 100), + (2, TIMESTAMP '2026-01-01 00:00:00', 200), + (3, TIMESTAMP '2026-01-01 00:02:31', 400) +AS tab(t, st, c) +ORDER BY t +-- !query schema +struct +-- !query output +1 100 NULL +2 200 100 +3 400 NULL + + +-- !query +SELECT t, c, + counter_diff(c, st) OVER (ORDER BY t) AS diff +FROM VALUES + (1, TIMESTAMP '2026-01-01 00:00:00', 100), + (2, TIMESTAMP '2026-01-01 00:00:00', 200) +AS tab(t, st, c) +ORDER BY t +-- !query schema +struct +-- !query output +1 100 NULL +2 200 100 + + +-- !query +SELECT t, c, + counter_diff(c, st) OVER (ORDER BY t) AS diff +FROM VALUES + (1, TIMESTAMP '2026-01-01 00:00:00', 100), + (2, CAST(NULL AS TIMESTAMP), 200) +AS tab(t, st, c) +ORDER BY t +-- !query schema +struct +-- !query output +1 100 NULL +2 200 100 + + +-- !query +SELECT counter_diff(c, st) OVER (ORDER BY t) AS diff +FROM VALUES + (1, TIMESTAMP '2026-01-01 00:05:00', 100), + (2, TIMESTAMP '2026-01-01 00:01:00', 200) +AS tab(t, st, c) +-- !query schema +struct<> +-- !query output +org.apache.spark.SparkRuntimeException +{ + "errorClass" : "COUNTER_DIFF_START_TIME_DECREASED", + "sqlState" : "22023", + "messageParameters" : { + "currentStartTime" : "2026-01-01 00:01:00", + "function" : "`counter_diff`", + "previousStartTime" : "2026-01-01 00:05:00" + } +} + + +-- !query +SELECT counter_diff(c, st) OVER (ORDER BY t) AS diff +FROM VALUES + (1, TIMESTAMP '2026-01-01 00:00:00', -1) +AS tab(t, st, c) +-- !query schema +struct<> +-- !query output +org.apache.spark.SparkRuntimeException +{ + "errorClass" : "COUNTER_DIFF_NEGATIVE_COUNTER_VALUE", + "sqlState" : "22003", + "messageParameters" : { + "function" : "`counter_diff`", + "value" : "-1" + } +} + + +-- !query +SELECT t, c, + counter_diff(c, st) OVER (ORDER BY t) AS diff +FROM VALUES + (1, TIMESTAMP_NTZ '2026-01-01 00:00:00', 100), + (2, TIMESTAMP_NTZ '2026-01-01 00:00:00', 200), + (3, TIMESTAMP_NTZ '2026-01-01 00:05:00', 300) 
+AS tab(t, st, c) +ORDER BY t +-- !query schema +struct +-- !query output +1 100 NULL +2 200 100 +3 300 NULL + + +-- !query +SELECT m, t, c, counter_diff(c, st) OVER (PARTITION BY m ORDER BY t) AS diff +FROM VALUES + ('a', 1, TIMESTAMP '2026-01-01 00:00:00', 100), + ('a', 2, TIMESTAMP '2026-01-01 00:00:00', 200), + ('a', 3, TIMESTAMP '2026-01-01 00:05:00', 500), + ('b', 1, TIMESTAMP '2026-01-01 00:00:00', 10), + ('b', 2, TIMESTAMP '2026-01-01 00:00:00', 20) +AS tab(m, t, st, c) +ORDER BY m, t +-- !query schema +struct +-- !query output +a 1 100 NULL +a 2 200 100 +a 3 500 NULL +b 1 10 NULL +b 2 20 10 + + +-- !query +SELECT t, counter_diff(c, st) OVER (ORDER BY t) AS diff +FROM VALUES + (1, TIMESTAMP '2026-01-01 00:05:00', 100), + (2, TIMESTAMP '2026-01-01 00:01:00', CAST(NULL AS INT)) +AS tab(t, st, c) +ORDER BY t +-- !query schema +struct +-- !query output +1 NULL +2 NULL + + +-- !query +SELECT counter_diff(c, st) OVER (ORDER BY t) AS diff +FROM VALUES + (1, TIMESTAMP '2026-01-01 00:05:00', 100), + (2, TIMESTAMP '2026-01-01 00:01:00', CAST(NULL AS INT)), + (3, TIMESTAMP '2026-01-01 00:01:00', 300) +AS tab(t, st, c) +-- !query schema +struct<> +-- !query output +org.apache.spark.SparkRuntimeException +{ + "errorClass" : "COUNTER_DIFF_START_TIME_DECREASED", + "sqlState" : "22023", + "messageParameters" : { + "currentStartTime" : "2026-01-01 00:01:00", + "function" : "`counter_diff`", + "previousStartTime" : "2026-01-01 00:05:00" + } +} + + +-- !query +SELECT t, counter_diff(c, st) OVER (ORDER BY t) AS diff +FROM VALUES + (1, TIMESTAMP '2026-01-01 00:05:00', 100), + (2, TIMESTAMP '2026-01-01 00:06:00', CAST(NULL AS INT)), + (3, TIMESTAMP '2026-01-01 00:07:00', 300) +AS tab(t, st, c) +ORDER BY t +-- !query schema +struct +-- !query output +1 NULL +2 NULL +3 NULL + + +-- !query +WITH gen AS ( + SELECT + TIMESTAMPADD(MINUTE, id * 30, TIMESTAMP '2026-01-01 00:00:00') AS ts, + CASE WHEN id < 4 THEN TIMESTAMP '2026-01-01 00:00:00' + ELSE TIMESTAMP '2026-01-01 02:00:00' END 
AS st, + CASE WHEN id < 4 THEN id * 1000 ELSE (id - 4) * 800 END AS c + FROM RANGE(8) AS r(id) +), +diffs AS ( + SELECT ts, c, counter_diff(c, st) OVER (ORDER BY ts) AS diff + FROM gen +) +SELECT date_trunc('hour', ts) AS hour_bucket, + SUM(diff) AS total_diff +FROM diffs GROUP BY 1 ORDER BY 1 +-- !query schema +struct +-- !query output +2026-01-01 00:00:00 1000 +2026-01-01 01:00:00 2000 +2026-01-01 02:00:00 800 +2026-01-01 03:00:00 1600 + + +-- !query +SELECT counter_diff(1) AS diff +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "WINDOW_FUNCTION_WITHOUT_OVER_CLAUSE", + "sqlState" : "42601", + "messageParameters" : { + "funcName" : "\"counter_diff(1)\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 30, + "fragment" : "counter_diff(1) AS diff" + } ] +} + + +-- !query +SELECT counter_diff(c) OVER () AS diff +FROM VALUES (1) AS tab(c) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "WINDOW_FUNCTION_FRAME_NOT_ORDERED", + "sqlState" : "42601", + "messageParameters" : { + "wf_expr" : "counter_diff(tab.c)", + "wf_name" : "counter_diff" + } +} + + +-- !query +SELECT counter_diff(c) OVER (ORDER BY t) AS diff +FROM VALUES (1, 'abc') AS tab(t, c) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE", + "sqlState" : "42K09", + "messageParameters" : { + "inputSql" : "\"c\"", + "inputType" : "\"STRING\"", + "paramIndex" : "first", + "requiredType" : "\"NUMERIC\"", + "sqlExpr" : "\"counter_diff(c)\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 40, + "fragment" : "counter_diff(c) OVER (ORDER BY t)" + } ] +} + + +-- !query +SELECT counter_diff(c, st) OVER (ORDER BY t) AS diff +FROM VALUES (1, 'abc', 100) AS tab(t, st, c) 
+-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE", + "sqlState" : "42K09", + "messageParameters" : { + "inputSql" : "\"st\"", + "inputType" : "\"STRING\"", + "paramIndex" : "second", + "requiredType" : "(\"TIMESTAMP\" or \"TIMESTAMP_NTZ\")", + "sqlExpr" : "\"counter_diff(c, st)\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 44, + "fragment" : "counter_diff(c, st) OVER (ORDER BY t)" + } ] +} + + +-- !query +SELECT counter_diff(c) OVER (ORDER BY t ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS diff +FROM VALUES (1, 100), (2, 200) AS tab(t, c) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "_LEGACY_ERROR_TEMP_1036", + "messageParameters" : { + "required" : "specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())", + "wf" : "specifiedwindowframe(RowFrame, -1, 1)" + } +} + + +-- !query +SELECT t, counter_diff(c) OVER ( + ORDER BY t ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS diff +FROM VALUES (1, 100), (2, 200) AS tab(t, c) +ORDER BY t +-- !query schema +struct +-- !query output +1 NULL +2 100 + + +-- !query +SELECT counter_diff(c) OVER ( + ORDER BY t RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS diff +FROM VALUES (1, 100), (2, 200) AS tab(t, c) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "_LEGACY_ERROR_TEMP_1036", + "messageParameters" : { + "required" : "specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())", + "wf" : "specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())" + } +} + + +-- !query +SELECT counter_diff() OVER (ORDER BY t) AS diff FROM VALUES (1) AS tab(t) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "REQUIRED_PARAMETER_NOT_FOUND", + "sqlState" 
: "4274K", + "messageParameters" : { + "index" : "0", + "parameterName" : "`value`", + "routineName" : "`counter_diff`" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 39, + "fragment" : "counter_diff() OVER (ORDER BY t)" + } ] +} + + +-- !query +SELECT counter_diff(1, TIMESTAMP '2026-01-01', 99) OVER (ORDER BY t) AS diff +FROM VALUES (1) AS tab(t) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "WRONG_NUM_ARGS.WITHOUT_SUGGESTION", + "sqlState" : "42605", + "messageParameters" : { + "actualNum" : "3", + "docroot" : "https://spark.apache.org/docs/latest", + "expectedNum" : "[1, 2]", + "functionName" : "`counter_diff`" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 68, + "fragment" : "counter_diff(1, TIMESTAMP '2026-01-01', 99) OVER (ORDER BY t)" + } ] +} + + +-- !query +SELECT counter_diff(c) OVER (ORDER BY t) AS diff +FROM VALUES (1, true), (2, false) AS tab(t, c) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE", + "sqlState" : "42K09", + "messageParameters" : { + "inputSql" : "\"c\"", + "inputType" : "\"BOOLEAN\"", + "paramIndex" : "first", + "requiredType" : "\"NUMERIC\"", + "sqlExpr" : "\"counter_diff(c)\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 40, + "fragment" : "counter_diff(c) OVER (ORDER BY t)" + } ] +} + + +-- !query +SELECT counter_diff(c, st) OVER (ORDER BY t) AS diff +FROM VALUES (1, DATE '2026-01-01', 100) AS tab(t, st, c) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE", + "sqlState" : "42K09", + "messageParameters" : { + "inputSql" : "\"st\"", + "inputType" : "\"DATE\"", + "paramIndex" 
: "second", + "requiredType" : "(\"TIMESTAMP\" or \"TIMESTAMP_NTZ\")", + "sqlExpr" : "\"counter_diff(c, st)\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 44, + "fragment" : "counter_diff(c, st) OVER (ORDER BY t)" + } ] +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala index c46cde0d0db13..ffb018b264bdc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala @@ -835,6 +835,28 @@ class DataFrameWindowFunctionsSuite extends SharedSparkSession "v", "z", null, "v", "z", "y", null, "va"))) } + test("counter_diff with and without startTime") { + import java.sql.Timestamp + val df = Seq( + (1, Timestamp.valueOf("2024-01-01 00:00:00"), 100), + (2, Timestamp.valueOf("2024-01-01 12:00:00"), 200), + (3, Timestamp.valueOf("2024-01-01 12:00:00"), 400), + (4, Timestamp.valueOf("2024-01-02 00:00:00"), 50), + (5, Timestamp.valueOf("2024-01-02 00:00:00"), 150) + ).toDF("t", "st", "c") + val w = Window.orderBy($"t") + + // 1-arg form: reset detected by counter decrease only. + checkAnswer( + df.select($"t", counter_diff($"c").over(w)).orderBy($"t"), + Seq(Row(1, null), Row(2, 100), Row(3, 200), Row(4, null), Row(5, 100))) + + // 2-arg form: reset also detected by startTime advance. 
+ checkAnswer( + df.select($"t", counter_diff($"c", $"st").over(w)).orderBy($"t"), + Seq(Row(1, null), Row(2, null), Row(3, 200), Row(4, null), Row(5, 100))) + } + test("lag - Offset expression must be a literal") { val nullStr: String = null val df = Seq( From ad3cb41da8cb7d440c5274d02b21d7574cefb565 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petar=20Niki=C4=87?= Date: Wed, 13 May 2026 10:41:05 +0000 Subject: [PATCH 2/2] Address counter_diff PR comments * Add counter_diff Connect + classic test * Remove "optional" from counter_diff with start time doc comment * Fix incorrect counter_diff parameter name in PySpark docstring * Remove redundant "nullable" override for counter_diff * Improve expression description for counter_diff for the NULL counter case * Remove single-parameter case from counter_diff match and explain reasoning --- python/pyspark/sql/functions/builtin.py | 2 +- .../tests/connect/test_connect_function.py | 31 +++++++++++++++++++ .../org/apache/spark/sql/functions.scala | 4 +-- .../catalyst/expressions/CounterDiff.scala | 11 ++++--- 4 files changed, 40 insertions(+), 8 deletions(-) diff --git a/python/pyspark/sql/functions/builtin.py b/python/pyspark/sql/functions/builtin.py index fc6c462ff22f9..eccb7d768e885 100644 --- a/python/pyspark/sql/functions/builtin.py +++ b/python/pyspark/sql/functions/builtin.py @@ -6484,7 +6484,7 @@ def counter_diff(value: "ColumnOrName", startTime: Optional["ColumnOrName"] = No Parameters ---------- - col : :class:`~pyspark.sql.Column` or column name + value : :class:`~pyspark.sql.Column` or column name A cumulative counter. Must be a numeric data type. Must be non-negative. startTime : :class:`~pyspark.sql.Column` or column name, optional An optional timestamp parameter which indicates when the counter was last set to zero. 
diff --git a/python/pyspark/sql/tests/connect/test_connect_function.py b/python/pyspark/sql/tests/connect/test_connect_function.py index e287ea9326ae7..ed653333b6c5d 100644 --- a/python/pyspark/sql/tests/connect/test_connect_function.py +++ b/python/pyspark/sql/tests/connect/test_connect_function.py @@ -766,6 +766,37 @@ def test_window_functions(self): sdf.select(scol.over(swin)).toPandas(), ) + # test counter_diff: requires non-negative values and has a two-argument form that + # accepts a start_time parameter, so a separate dataset is used. + counter_diff_query = """ + SELECT * FROM VALUES + (0, TIMESTAMP_NTZ '2026-01-01 00:00:00', 100, TIMESTAMP_NTZ '2025-12-31 00:00:00'), + (0, TIMESTAMP_NTZ '2026-01-01 00:01:00', 200, TIMESTAMP_NTZ '2025-12-31 00:00:00'), + (0, TIMESTAMP_NTZ '2026-01-01 00:02:00', 300, TIMESTAMP_NTZ '2025-12-31 00:00:00'), + (0, TIMESTAMP_NTZ '2026-01-01 00:03:00', 1200, TIMESTAMP_NTZ '2026-01-01 00:02:15'), + (0, TIMESTAMP_NTZ '2026-01-01 00:04:00', 1300, TIMESTAMP_NTZ '2026-01-01 00:02:15'), + (0, TIMESTAMP_NTZ '2026-01-01 00:05:00', 50, TIMESTAMP_NTZ '2026-01-01 00:02:15'), + (0, TIMESTAMP_NTZ '2026-01-01 00:06:00', 100, TIMESTAMP_NTZ '2026-01-01 00:02:15'), + (0, TIMESTAMP_NTZ '2026-01-01 00:07:00', 20, TIMESTAMP_NTZ '2026-01-01 00:06:45'), + (0, TIMESTAMP_NTZ '2026-01-01 00:08:00', 60, TIMESTAMP_NTZ '2026-01-01 00:06:45'), + (1, TIMESTAMP_NTZ '2026-01-01 00:00:00', 100, TIMESTAMP_NTZ '2025-12-31 00:00:00'), + (1, TIMESTAMP_NTZ '2026-01-01 00:01:00', 200, TIMESTAMP_NTZ '2025-12-31 00:00:00'), + (1, TIMESTAMP_NTZ '2026-01-01 00:02:00', 300, TIMESTAMP_NTZ '2025-12-31 00:00:00'), + (1, TIMESTAMP_NTZ '2026-01-01 00:03:00', 1000, TIMESTAMP_NTZ '2025-12-31 00:00:00'), + (1, TIMESTAMP_NTZ '2026-01-01 00:04:00', 75, TIMESTAMP_NTZ '2026-01-01 00:03:15') + AS tab(p, t, c, s) + """ + cdf_cd = self.connect.sql(counter_diff_query) + sdf_cd = self.spark.sql(counter_diff_query) + for ccol, scol in [ + (CF.counter_diff("c"), SF.counter_diff("c")), + 
(CF.counter_diff("c", "s"), SF.counter_diff("c", "s")), + ]: + self.assert_eq( + cdf_cd.select(ccol.over(CW.partitionBy("p").orderBy("t"))).toPandas(), + sdf_cd.select(scol.over(SW.partitionBy("p").orderBy("t"))).toPandas(), + ) + # test aggregation functions for ccol, scol in [ (CF.count("c"), SF.count("c")), diff --git a/sql/api/src/main/scala/org/apache/spark/sql/functions.scala b/sql/api/src/main/scala/org/apache/spark/sql/functions.scala index 47e97170bb1bd..b6044fcba8de9 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/functions.scala @@ -2575,8 +2575,8 @@ object functions { * A cumulative counter. Must be a numeric data type. Must be non-negative. * * @param startTime - * An optional timestamp parameter which indicates when the counter was last set to zero. Used - * to signal counter resets. + * A timestamp indicating when the counter was last set to zero. Used to signal counter + * resets. * * @return * The difference between the current and previous counter value within the window partition, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CounterDiff.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CounterDiff.scala index ec630f22265c2..ea4802d54d943 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CounterDiff.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CounterDiff.scala @@ -37,8 +37,6 @@ abstract class CounterDiffBase(val counter: Expression) override def prettyName: String = "counter_diff" - override def nullable: Boolean = true - override def dataType: DataType = counter.dataType /** @@ -327,7 +325,8 @@ case class CounterDiffWithStartTime( in ascending order. The following special cases are handled: - 1. If the counter value is NULL, the row is skipped and NULL is returned. + 1. 
If the counter value is NULL, NULL is returned for that row, and the row's counter is + excluded from the baseline used by subsequent rows. 2. If the value is negative, or the start time decreases, an error is raised. 3. In the case of a counter reset, NULL is returned. 4. NULL is returned for the first row. @@ -352,9 +351,11 @@ object CounterDiffExpressionBuilder extends ExpressionBuilder { } override def build(funcName: String, expressions: Seq[Expression]): Expression = { + // The function signature defines two parameters, so two expressions are always provided. + // For the single-parameter form, the start_time argument takes on the default value, which is + // exactly `DefaultStartTime`, so `eq` is used to check this case. This also differentiates + // the single-parameter form from the two-parameter form with an explicit NULL start time. expressions match { - case Seq(value) => - CounterDiff(value) case Seq(value, startTime) if startTime eq DefaultStartTime => CounterDiff(value) case Seq(value, startTime) =>