From 3592481f2464c6694120b295c351987a3d5dec18 Mon Sep 17 00:00:00 2001
From: noroshi <253434427+n0r0shi@users.noreply.github.com>
Date: Tue, 24 Feb 2026 03:00:25 +0000
Subject: [PATCH] feat: support make_dt_interval and make_interval expressions

Register datafusion-spark's SparkMakeDtInterval and SparkMakeInterval UDFs
and add serde mappings in temporalExpressions.
---
 native/core/src/execution/jni_api.rs          |  4 +++
 .../apache/comet/serde/QueryPlanSerde.scala   |  4 ++-
 .../comet/CometTemporalExpressionSuite.scala  | 26 +++++++++++++++++++
 3 files changed, 33 insertions(+), 1 deletion(-)

diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs
index 0193f3012c..a67d2af8ac 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -45,6 +45,8 @@ use datafusion_spark::function::bitwise::bitwise_not::SparkBitwiseNot;
 use datafusion_spark::function::datetime::date_add::SparkDateAdd;
 use datafusion_spark::function::datetime::date_sub::SparkDateSub;
 use datafusion_spark::function::datetime::last_day::SparkLastDay;
+use datafusion_spark::function::datetime::make_dt_interval::SparkMakeDtInterval;
+use datafusion_spark::function::datetime::make_interval::SparkMakeInterval;
 use datafusion_spark::function::datetime::next_day::SparkNextDay;
 use datafusion_spark::function::hash::crc32::SparkCrc32;
 use datafusion_spark::function::hash::sha1::SparkSha1;
@@ -400,6 +402,8 @@ fn register_datafusion_spark_function(session_ctx: &SessionContext) {
     session_ctx.register_udf(ScalarUDF::new_from_impl(SparkWidthBucket::default()));
     session_ctx.register_udf(ScalarUDF::new_from_impl(MapFromEntries::default()));
     session_ctx.register_udf(ScalarUDF::new_from_impl(SparkCrc32::default()));
+    session_ctx.register_udf(ScalarUDF::new_from_impl(SparkMakeDtInterval::default()));
+    session_ctx.register_udf(ScalarUDF::new_from_impl(SparkMakeInterval::default()));
 }
 
 /// Prepares arrow arrays for output.
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 9d13ccd9ed..35fdb95c2c 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -213,7 +213,9 @@ object QueryPlanSerde extends Logging with CometExprShim {
     classOf[WeekDay] -> CometWeekDay,
     classOf[DayOfYear] -> CometDayOfYear,
     classOf[WeekOfYear] -> CometWeekOfYear,
-    classOf[Quarter] -> CometQuarter)
+    classOf[Quarter] -> CometQuarter,
+    classOf[MakeDTInterval] -> CometScalarFunction("make_dt_interval"),
+    classOf[MakeInterval] -> CometScalarFunction("make_interval"))
 
   private val conversionExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
     classOf[Cast] -> CometCast)
diff --git a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala
index 1ae6926e05..79856ee1b8 100644
--- a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala
@@ -395,4 +395,30 @@ class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanH
     // Test null handling
     checkSparkAnswerAndOperator("SELECT unix_date(NULL)")
   }
+
+  test("make_dt_interval") {
+    // The 4th arg (seconds) is Double, and Spark casts it to Decimal(18,6).
+    // Comet's decimal cast has known rounding differences, so allow it here.
+    withSQLConf("spark.comet.expression.Cast.allowIncompatible" -> "true") {
+      withParquetTable(Seq((1, 2, 3, 4.5), (0, 0, 0, 0.0), (10, 23, 59, 59.999)), "tbl") {
+        checkSparkAnswerAndOperator("SELECT make_dt_interval(_1, _2, _3, _4) FROM tbl")
+        checkSparkAnswerAndOperator("SELECT make_dt_interval(1, 2, 3) FROM tbl")
+        checkSparkAnswerAndOperator("SELECT make_dt_interval(1) FROM tbl")
+        checkSparkAnswerAndOperator("SELECT make_dt_interval() FROM tbl")
+      }
+    }
+  }
+
+  test("make_interval") {
+    // The 7th arg (seconds) is Double, and Spark casts it to Decimal(18,6).
+    // Comet's decimal cast has known rounding differences, so allow it here.
+    withSQLConf("spark.comet.expression.Cast.allowIncompatible" -> "true") {
+      withParquetTable(Seq((1, 2, 3, 4, 5, 6, 7.5), (0, 0, 0, 0, 0, 0, 0.0)), "tbl") {
+        checkSparkAnswerAndOperator("SELECT make_interval(_1, _2, _3, _4, _5, _6, _7) FROM tbl")
+        checkSparkAnswerAndOperator("SELECT make_interval(1, 2, 3) FROM tbl")
+        checkSparkAnswerAndOperator("SELECT make_interval(1) FROM tbl")
+        checkSparkAnswerAndOperator("SELECT make_interval() FROM tbl")
+      }
+    }
+  }
 }
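
For illustration only, a minimal sketch of how the newly mapped expressions can be
exercised outside the test suite. It assumes a Comet-enabled SparkSession bound to a
variable named `spark` (that variable and session setup are not part of this patch):

    // Assumes `spark` is a SparkSession with the Comet plugin enabled.
    // Both functions are existing Spark SQL built-ins; this patch only maps them
    // to the datafusion-spark UDFs so they can run natively.
    spark.sql("SELECT make_dt_interval(1, 2, 3, 4.5)").show(truncate = false)
    spark.sql("SELECT make_interval(1, 2, 3, 4, 5, 6, 7.5)").show(truncate = false)

As the test comments above note, the fractional-seconds argument is cast to
Decimal(18,6) by Spark, so native execution of these queries may additionally require
spark.comet.expression.Cast.allowIncompatible=true.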