From 81f3690a12650d027b06c2e09e68fd7b829dd423 Mon Sep 17 00:00:00 2001 From: noroshi <253434427+n0r0shi@users.noreply.github.com> Date: Mon, 16 Feb 2026 03:36:43 +0000 Subject: [PATCH 1/4] feat: support Spark luhn_check via StaticInvoke Register datafusion-spark's SparkLuhnCheck UDF and add StaticInvoke handler for ExpressionImplUtils.isLuhnNumber (Spark 3.5+). --- native/core/src/execution/jni_api.rs | 2 ++ .../org/apache/comet/serde/statics.scala | 27 +++++++++++++++++-- .../comet/CometStringExpressionSuite.scala | 18 +++++++++++++ 3 files changed, 45 insertions(+), 2 deletions(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 0193f3012c..4d5985226b 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -55,6 +55,7 @@ use datafusion_spark::function::math::hex::SparkHex; use datafusion_spark::function::math::width_bucket::SparkWidthBucket; use datafusion_spark::function::string::char::CharFunc; use datafusion_spark::function::string::concat::SparkConcat; +use datafusion_spark::function::string::luhn_check::SparkLuhnCheck; use futures::poll; use futures::stream::StreamExt; use jni::objects::JByteBuffer; @@ -400,6 +401,7 @@ fn register_datafusion_spark_function(session_ctx: &SessionContext) { session_ctx.register_udf(ScalarUDF::new_from_impl(SparkWidthBucket::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(MapFromEntries::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkCrc32::default())); + session_ctx.register_udf(ScalarUDF::new_from_impl(SparkLuhnCheck::default())); } /// Prepares arrow arrays for output. diff --git a/spark/src/main/scala/org/apache/comet/serde/statics.scala b/spark/src/main/scala/org/apache/comet/serde/statics.scala index 0737644ab9..3f70891fd6 100644 --- a/spark/src/main/scala/org/apache/comet/serde/statics.scala +++ b/spark/src/main/scala/org/apache/comet/serde/statics.scala @@ -19,11 +19,13 @@ package org.apache.comet.serde -import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.expressions.{Attribute, ExpressionImplUtils} import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke import org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils +import org.apache.spark.sql.types.BooleanType import org.apache.comet.CometSparkSessionExtensions.withInfo +import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProtoWithReturnType} object CometStaticInvoke extends CometExpressionSerde[StaticInvoke] { @@ -34,7 +36,8 @@ object CometStaticInvoke extends CometExpressionSerde[StaticInvoke] { : Map[(String, Class[_]), CometExpressionSerde[StaticInvoke]] = Map( ("readSidePadding", classOf[CharVarcharCodegenUtils]) -> CometScalarFunction( - "read_side_padding")) + "read_side_padding"), + ("isLuhnNumber", classOf[ExpressionImplUtils]) -> CometLuhnCheck) override def convert( expr: StaticInvoke, @@ -52,3 +55,23 @@ object CometStaticInvoke extends CometExpressionSerde[StaticInvoke] { } } } + +/** + * Handler for ExpressionImplUtils.isLuhnNumber StaticInvoke (Spark 3.5+). + * Maps to datafusion-spark's built-in luhn_check function. + */ +private object CometLuhnCheck extends CometExpressionSerde[StaticInvoke] { + + override def convert( + expr: StaticInvoke, + inputs: Seq[Attribute], + binding: Boolean): Option[ExprOuterClass.Expr] = { + val childExpr = exprToProtoInternal(expr.arguments.head, inputs, binding) + val optExpr = scalarFunctionExprToProtoWithReturnType( + "luhn_check", + BooleanType, + false, + childExpr) + optExprWithInfo(optExpr, expr, expr.arguments.head) + } +} diff --git a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala index 121d7f7d5a..14195ac95e 100644 --- a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala @@ -148,6 +148,24 @@ class CometStringExpressionSuite extends CometTestBase { } } + test("luhn_check") { + val data = Seq( + "79927398710", // invalid (fails Luhn) + "79927398713", // valid Luhn number + "1234567812345670", // valid credit card-like + "0", // valid single digit + "", // empty string + "abc", // non-numeric + null).map(Tuple1(_)) + withParquetTable(data, "tbl") { + checkSparkAnswerAndOperator("SELECT luhn_check(_1) FROM tbl") + // literal values + checkSparkAnswerAndOperator("SELECT luhn_check('79927398713') FROM tbl") + // null handling + checkSparkAnswerAndOperator("SELECT luhn_check(NULL) FROM tbl") + } + } + test("split string basic") { withSQLConf("spark.comet.expression.StringSplit.allowIncompatible" -> "true") { withParquetTable((0 until 5).map(i => (s"value$i,test$i", i)), "tbl") { From 39a887e2c86e7d3c1a377075bb6ca9f825df07b5 Mon Sep 17 00:00:00 2001 From: noroshi <253434427+n0r0shi@users.noreply.github.com> Date: Wed, 25 Feb 2026 05:26:03 +0000 Subject: [PATCH 2/4] fix spotless --- .../main/scala/org/apache/comet/serde/statics.scala | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/statics.scala b/spark/src/main/scala/org/apache/comet/serde/statics.scala index 3f70891fd6..5a0fa4e267 100644 --- a/spark/src/main/scala/org/apache/comet/serde/statics.scala +++ b/spark/src/main/scala/org/apache/comet/serde/statics.scala @@ -57,8 +57,8 @@ object CometStaticInvoke extends CometExpressionSerde[StaticInvoke] { } /** - * Handler for ExpressionImplUtils.isLuhnNumber StaticInvoke (Spark 3.5+). - * Maps to datafusion-spark's built-in luhn_check function. + * Handler for ExpressionImplUtils.isLuhnNumber StaticInvoke (Spark 3.5+). Maps to + * datafusion-spark's built-in luhn_check function. */ private object CometLuhnCheck extends CometExpressionSerde[StaticInvoke] { @@ -67,11 +67,8 @@ private object CometLuhnCheck extends CometExpressionSerde[StaticInvoke] { inputs: Seq[Attribute], binding: Boolean): Option[ExprOuterClass.Expr] = { val childExpr = exprToProtoInternal(expr.arguments.head, inputs, binding) - val optExpr = scalarFunctionExprToProtoWithReturnType( - "luhn_check", - BooleanType, - false, - childExpr) + val optExpr = + scalarFunctionExprToProtoWithReturnType("luhn_check", BooleanType, false, childExpr) optExprWithInfo(optExpr, expr, expr.arguments.head) } } From c3bc9ac84aa949fff73ae6fd925ef5fd41bf4c15 Mon Sep 17 00:00:00 2001 From: noroshi Date: Wed, 4 Mar 2026 20:11:55 +0000 Subject: [PATCH 3/4] address feedback --- .../org/apache/comet/serde/statics.scala | 21 +----------- .../expressions/string/luhn_check.sql | 32 +++++++++++++++++++ .../comet/CometStringExpressionSuite.scala | 18 ----------- 3 files changed, 33 insertions(+), 38 deletions(-) create mode 100644 spark/src/test/resources/sql-tests/expressions/string/luhn_check.sql diff --git a/spark/src/main/scala/org/apache/comet/serde/statics.scala b/spark/src/main/scala/org/apache/comet/serde/statics.scala index 5a0fa4e267..9dbc6d169f 100644 --- a/spark/src/main/scala/org/apache/comet/serde/statics.scala +++ b/spark/src/main/scala/org/apache/comet/serde/statics.scala @@ -22,10 +22,8 @@ package org.apache.comet.serde import org.apache.spark.sql.catalyst.expressions.{Attribute, ExpressionImplUtils} import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke import org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils -import org.apache.spark.sql.types.BooleanType import org.apache.comet.CometSparkSessionExtensions.withInfo -import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProtoWithReturnType} object CometStaticInvoke extends CometExpressionSerde[StaticInvoke] { @@ -37,7 +35,7 @@ object CometStaticInvoke extends CometExpressionSerde[StaticInvoke] { Map( ("readSidePadding", classOf[CharVarcharCodegenUtils]) -> CometScalarFunction( "read_side_padding"), - ("isLuhnNumber", classOf[ExpressionImplUtils]) -> CometLuhnCheck) + ("isLuhnNumber", classOf[ExpressionImplUtils]) -> CometScalarFunction("luhn_check")) override def convert( expr: StaticInvoke, @@ -55,20 +53,3 @@ object CometStaticInvoke extends CometExpressionSerde[StaticInvoke] { } } } - -/** - * Handler for ExpressionImplUtils.isLuhnNumber StaticInvoke (Spark 3.5+). Maps to - * datafusion-spark's built-in luhn_check function. - */ -private object CometLuhnCheck extends CometExpressionSerde[StaticInvoke] { - - override def convert( - expr: StaticInvoke, - inputs: Seq[Attribute], - binding: Boolean): Option[ExprOuterClass.Expr] = { - val childExpr = exprToProtoInternal(expr.arguments.head, inputs, binding) - val optExpr = - scalarFunctionExprToProtoWithReturnType("luhn_check", BooleanType, false, childExpr) - optExprWithInfo(optExpr, expr, expr.arguments.head) - } -} diff --git a/spark/src/test/resources/sql-tests/expressions/string/luhn_check.sql b/spark/src/test/resources/sql-tests/expressions/string/luhn_check.sql new file mode 100644 index 0000000000..9a37840568 --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/string/luhn_check.sql @@ -0,0 +1,32 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- ConfigMatrix: parquet.enable.dictionary=false,true + +statement +CREATE TABLE test_luhn(s string) USING parquet + +statement +INSERT INTO test_luhn VALUES ('79927398710'), ('79927398713'), ('1234567812345670'), ('0'), (''), ('abc'), (NULL) + +-- column input +query +SELECT luhn_check(s) FROM test_luhn + +-- literal arguments +query +SELECT luhn_check('79927398713'), luhn_check('79927398710'), luhn_check(''), luhn_check(NULL) diff --git a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala index 14195ac95e..121d7f7d5a 100644 --- a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala @@ -148,24 +148,6 @@ class CometStringExpressionSuite extends CometTestBase { } } - test("luhn_check") { - val data = Seq( - "79927398710", // invalid (fails Luhn) - "79927398713", // valid Luhn number - "1234567812345670", // valid credit card-like - "0", // valid single digit - "", // empty string - "abc", // non-numeric - null).map(Tuple1(_)) - withParquetTable(data, "tbl") { - checkSparkAnswerAndOperator("SELECT luhn_check(_1) FROM tbl") - // literal values - checkSparkAnswerAndOperator("SELECT luhn_check('79927398713') FROM tbl") - // null handling - checkSparkAnswerAndOperator("SELECT luhn_check(NULL) FROM tbl") - } - } - test("split string basic") { withSQLConf("spark.comet.expression.StringSplit.allowIncompatible" -> "true") { withParquetTable((0 until 5).map(i => (s"value$i,test$i", i)), "tbl") { From 39e71c1924fc5f291126229f3c14897375c10021 Mon Sep 17 00:00:00 2001 From: noroshi Date: Wed, 4 Mar 2026 21:36:37 +0000 Subject: [PATCH 4/4] remove unnecessary version check --- .../test/resources/sql-tests/expressions/string/luhn_check.sql | 1 + 1 file changed, 1 insertion(+) diff --git a/spark/src/test/resources/sql-tests/expressions/string/luhn_check.sql b/spark/src/test/resources/sql-tests/expressions/string/luhn_check.sql index 9a37840568..ea10808467 100644 --- a/spark/src/test/resources/sql-tests/expressions/string/luhn_check.sql +++ b/spark/src/test/resources/sql-tests/expressions/string/luhn_check.sql @@ -15,6 +15,7 @@ -- specific language governing permissions and limitations -- under the License. +-- MinSparkVersion: 3.5 -- ConfigMatrix: parquet.enable.dictionary=false,true statement