Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/source/contributor-guide/spark_expressions_support.md
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@
- [x] last_day
- [x] localtimestamp
- [x] make_date
- [ ] make_dt_interval
- [x] make_dt_interval
- [ ] make_interval
- [ ] make_time
- [ ] make_timestamp
Expand Down
1 change: 1 addition & 0 deletions docs/source/user-guide/latest/expressions.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ of expressions that be disabled.
| LastDay | `last_day` |
| LocalTimestamp | `localtimestamp` |
| MakeDate | `make_date` |
| MakeDTInterval | `make_dt_interval` |
| MakeTime | `make_time` |
| Minute | `minute` |
| NextDay | `next_day` |
Expand Down
2 changes: 2 additions & 0 deletions native/core/src/execution/jni_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use datafusion_spark::function::datetime::date_add::SparkDateAdd;
use datafusion_spark::function::datetime::date_sub::SparkDateSub;
use datafusion_spark::function::datetime::from_utc_timestamp::SparkFromUtcTimestamp;
use datafusion_spark::function::datetime::last_day::SparkLastDay;
use datafusion_spark::function::datetime::make_dt_interval::SparkMakeDtInterval;
use datafusion_spark::function::datetime::next_day::SparkNextDay;
use datafusion_spark::function::datetime::to_utc_timestamp::SparkToUtcTimestamp;
use datafusion_spark::function::hash::crc32::SparkCrc32;
Expand Down Expand Up @@ -580,6 +581,7 @@ fn register_datafusion_spark_function(session_ctx: &SessionContext) {
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkDateSub::default()));
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkFromUtcTimestamp::default()));
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkLastDay::default()));
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkMakeDtInterval::default()));
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkNextDay::default()));
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkToUtcTimestamp::default()));
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkSha1::default()));
Expand Down
3 changes: 3 additions & 0 deletions native/core/src/execution/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,9 @@ impl PhysicalPlanner {
DataType::Map(f, s) => DataType::Map(f, s).try_into()?,
DataType::List(f) => DataType::List(f).try_into()?,
DataType::Null => ScalarValue::Null,
DataType::Duration(TimeUnit::Microsecond) => {
ScalarValue::DurationMicrosecond(None)
}
DataType::Time64(TimeUnit::Nanosecond) => {
ScalarValue::Time64Nanosecond(None)
}
Expand Down
1 change: 1 addition & 0 deletions native/core/src/execution/serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ pub fn to_arrow_datatype(dt_value: &DataType) -> ArrowDataType {
}
_ => unreachable!(),
},
DataTypeId::DurationMicrosecond => ArrowDataType::Duration(TimeUnit::Microsecond),
DataTypeId::Timestamp => {
ArrowDataType::Timestamp(TimeUnit::Microsecond, Some("UTC".to_string().into()))
}
Expand Down
1 change: 1 addition & 0 deletions native/proto/src/proto/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ message DataType {
MAP = 15;
STRUCT = 16;
TIME = 17;
DURATION_MICROSECOND = 18;
}
DataTypeId type_id = 1;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim {
classOf[LastDay] -> CometLastDay,
classOf[Hour] -> CometHour,
classOf[MakeDate] -> CometMakeDate,
classOf[MakeDTInterval] -> CometMakeDTInterval,
classOf[Minute] -> CometMinute,
classOf[NextDay] -> CometNextDay,
classOf[Second] -> CometSecond,
Expand Down Expand Up @@ -367,7 +368,7 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim {
def supportedDataType(dt: DataType, allowComplex: Boolean = false): Boolean = dt match {
case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType |
_: DoubleType | _: StringType | _: BinaryType | _: TimestampType | _: TimestampNTZType |
_: DecimalType | _: DateType | _: BooleanType | _: NullType =>
_: DecimalType | _: DateType | _: DayTimeIntervalType | _: BooleanType | _: NullType =>
true
case dt if isTimeType(dt) =>
true
Expand Down Expand Up @@ -406,6 +407,7 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim {
case _: MapType => 15
case _: StructType => 16
case dt if isTimeType(dt) => 17
case _: DayTimeIntervalType => 18
case dt =>
logWarning(s"Cannot serialize Spark data type: $dt")
return None
Expand Down
13 changes: 12 additions & 1 deletion spark/src/main/scala/org/apache/comet/serde/datetime.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ package org.apache.comet.serde

import java.util.Locale

import org.apache.spark.sql.catalyst.expressions.{Attribute, ConvertTimezone, DateAdd, DateDiff, DateFormatClass, DateFromUnixDate, DateSub, DayOfMonth, DayOfWeek, DayOfYear, Days, FromUTCTimestamp, GetDateField, Hour, Hours, LastDay, Literal, MakeDate, Minute, Month, NextDay, Quarter, Second, SecondsToTimestamp, ToUTCTimestamp, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year}
import org.apache.spark.sql.catalyst.expressions.{Attribute, ConvertTimezone, DateAdd, DateDiff, DateFormatClass, DateFromUnixDate, DateSub, DayOfMonth, DayOfWeek, DayOfYear, Days, FromUTCTimestamp, GetDateField, Hour, Hours, LastDay, Literal, MakeDate, MakeDTInterval, Minute, Month, NextDay, Quarter, Second, SecondsToTimestamp, ToUTCTimestamp, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DateType, DoubleType, FloatType, IntegerType, LongType, StringType, TimestampNTZType, TimestampType}
import org.apache.spark.unsafe.types.UTF8String
Expand Down Expand Up @@ -434,6 +434,17 @@ object CometNextDay extends CometScalarFunction[NextDay]("next_day")

object CometMakeDate extends CometScalarFunction[MakeDate]("make_date")

object CometMakeDTInterval extends CometScalarFunction[MakeDTInterval]("make_dt_interval") {
private val overflowIncompatReason: String =
"make_dt_interval returns NULL on arithmetic overflow, whereas Spark throws " +
"INTERVAL_ARITHMETIC_OVERFLOW"

override def getIncompatibleReasons(): Seq[String] = Seq(overflowIncompatReason)

override def getSupportLevel(expr: MakeDTInterval): SupportLevel =
Incompatible(Some(overflowIncompatReason))
}

object CometSecondsToTimestamp
extends CometScalarFunction[SecondsToTimestamp]("seconds_to_timestamp") {
override def getSupportLevel(expr: SecondsToTimestamp): SupportLevel =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ object Utils extends CometTypeShim with Logging {
case yi: ArrowType.Interval if yi.getUnit == IntervalUnit.YEAR_MONTH =>
YearMonthIntervalType()
case di: ArrowType.Interval if di.getUnit == IntervalUnit.DAY_TIME => DayTimeIntervalType()
case d: ArrowType.Duration if d.getUnit == TimeUnit.MICROSECOND => DayTimeIntervalType()
case t: ArrowType.Time if t.getUnit == TimeUnit.NANOSECOND && t.getBitWidth == 64 =>
// scalastyle:off classforname
val clazz = Class.forName("org.apache.spark.sql.types.TimeType$")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.

-- Config: spark.comet.expression.MakeDTInterval.allowIncompatible=true

-- literal arguments, all arities (omitted args default to 0)
query
SELECT make_dt_interval()

query
SELECT make_dt_interval(1)

query
SELECT make_dt_interval(1, 2)

query
SELECT make_dt_interval(1, 2, 3)

query
SELECT make_dt_interval(1, 2, 3, 4.5)

-- microsecond precision in the seconds component
query
SELECT make_dt_interval(0, 0, 0, 4.123456)

-- negative components
query
SELECT make_dt_interval(-1, -2, -3, -4.5)

-- null-intolerant: any null input yields a null result
query
SELECT make_dt_interval(CAST(NULL AS INT), 2, 3, 4.5)

query
SELECT make_dt_interval(1, 2, 3, CAST(NULL AS DECIMAL(18, 6)))

-- column inputs (not constant-folded): exercises the per-row path
statement
CREATE TABLE test_make_dt_interval(id INT, d INT, h INT, m INT, s DECIMAL(18, 6)) USING parquet

statement
INSERT INTO test_make_dt_interval VALUES
(1, 1, 2, 3, 4.5),
(2, 0, 0, 0, 0),
(3, -1, -2, -3, -4.123456),
(4, 100, 23, 59, 59.999999),
(5, NULL, 2, 3, 4.5)

query
SELECT id, make_dt_interval(d, h, m, s) FROM test_make_dt_interval
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.

-- schema_of_json requires a foldable (literal) JSON argument; its result is a
-- constant schema string. In production, ConstantFolding collapses it to a
-- string literal that runs natively in Comet. This test suite disables
-- ConstantFolding, so the expression survives to the physical plan and Comet
-- falls back. The exact fallback reason differs by Spark version:
-- * Spark 3.4 / 3.5: "schema_of_json is not supported" (plain expression)
-- * Spark 4.0+: "invoke is not supported" (SchemaOfJson is now
-- RuntimeReplaceable -> Invoke on SchemaOfJsonEvaluator)
-- so we assert on the common substring "is not supported". These assertions
-- document the current fallback behavior while still verifying Comet's result
-- matches Spark. If Comet later runs this natively, flip to plain `query`.

query expect_fallback(is not supported)
SELECT schema_of_json('{"name":"John", "age":30}')

query expect_fallback(is not supported)
SELECT schema_of_json('{"users":[{"name":"John","scores":[95,87]}]}')

query expect_fallback(is not supported)
SELECT schema_of_json('[1, 2, 3]')

query expect_fallback(is not supported)
SELECT schema_of_json('{}')