diff --git a/docs/source/user-guide/latest/compatibility.md b/docs/source/user-guide/latest/compatibility.md index 21695bdf57..e0bc5f06ee 100644 --- a/docs/source/user-guide/latest/compatibility.md +++ b/docs/source/user-guide/latest/compatibility.md @@ -58,6 +58,51 @@ Expressions that are not 100% Spark-compatible will fall back to Spark by defaul `spark.comet.expression.EXPRNAME.allowIncompatible=true`, where `EXPRNAME` is the Spark expression class name. See the [Comet Supported Expressions Guide](expressions.md) for more information on this configuration setting. +### Array Expressions + +- **ArrayContains**: Returns null instead of false for empty arrays with literal values. + [#3346](https://github.com/apache/datafusion-comet/issues/3346) +- **ArrayRemove**: Returns null when the element to remove is null, instead of removing null elements from the array. + [#3173](https://github.com/apache/datafusion-comet/issues/3173) +- **GetArrayItem**: Known correctness issues with index handling, including off-by-one errors and incorrect results + with dynamic (non-literal) index values. + [#3330](https://github.com/apache/datafusion-comet/issues/3330), + [#3332](https://github.com/apache/datafusion-comet/issues/3332) +- **ArraysOverlap**: Inconsistent behavior when arrays contain NULL values. + [#3645](https://github.com/apache/datafusion-comet/issues/3645), + [#2036](https://github.com/apache/datafusion-comet/issues/2036) +- **ArrayUnion**: Sorts input arrays before performing the union, while Spark preserves the order of the first array + and appends unique elements from the second. + [#3644](https://github.com/apache/datafusion-comet/issues/3644) + +### Date/Time Expressions + +- **Hour, Minute, Second**: Incorrectly apply timezone conversion to TimestampNTZ inputs. TimestampNTZ stores local + time without timezone, so no conversion should be applied. These expressions work correctly with Timestamp inputs. + [#3180](https://github.com/apache/datafusion-comet/issues/3180) +- **TruncTimestamp (date_trunc)**: Produces incorrect results when used with non-UTC timezones. Compatible when + timezone is UTC. + [#2649](https://github.com/apache/datafusion-comet/issues/2649) + +### Math Expressions + +- **Ceil, Floor**: Incorrect results for Decimal type inputs. + [#1729](https://github.com/apache/datafusion-comet/issues/1729) +- **Tan**: `tan(-0.0)` produces `0.0` instead of `-0.0`. + [#1897](https://github.com/apache/datafusion-comet/issues/1897) + +### Aggregate Expressions + +- **Corr**: Returns null instead of NaN in some edge cases. + [#2646](https://github.com/apache/datafusion-comet/issues/2646) +- **First, Last**: These functions are not deterministic. When `ignoreNulls` is set, results may not match Spark. + [#1630](https://github.com/apache/datafusion-comet/issues/1630) + +### Struct Expressions + +- **StructsToJson (to_json)**: Does not support `+Infinity` and `-Infinity` for numeric types (float, double). + [#3016](https://github.com/apache/datafusion-comet/issues/3016) + ## Regular Expressions Comet uses the Rust regexp crate for evaluating regular expressions, and this has different behavior from Java's diff --git a/docs/source/user-guide/latest/expressions.md b/docs/source/user-guide/latest/expressions.md index 0339cd2a3e..57b7a3455a 100644 --- a/docs/source/user-guide/latest/expressions.md +++ b/docs/source/user-guide/latest/expressions.md @@ -92,76 +92,76 @@ Expressions that are not Spark-compatible will fall back to Spark by default and ## Date/Time Functions -| Expression | SQL | Spark-Compatible? | Compatibility Notes | -| -------------- | ---------------------------- | ----------------- | -------------------------------------------------------------------------------------------------------------------- | -| DateAdd | `date_add` | Yes | | -| DateDiff | `datediff` | Yes | | -| DateFormat | `date_format` | Yes | Partial support. Only specific format patterns are supported. | -| DateSub | `date_sub` | Yes | | -| DatePart | `date_part(field, source)` | Yes | Supported values of `field`: `year`/`month`/`week`/`day`/`dayofweek`/`dayofweek_iso`/`doy`/`quarter`/`hour`/`minute` | -| Extract | `extract(field FROM source)` | Yes | Supported values of `field`: `year`/`month`/`week`/`day`/`dayofweek`/`dayofweek_iso`/`doy`/`quarter`/`hour`/`minute` | -| FromUnixTime | `from_unixtime` | No | Does not support format, supports only -8334601211038 <= sec <= 8210266876799 | -| Hour | `hour` | Yes | | -| LastDay | `last_day` | Yes | | -| Minute | `minute` | Yes | | -| Second | `second` | Yes | | -| TruncDate | `trunc` | Yes | | -| TruncTimestamp | `date_trunc` | Yes | | -| UnixDate | `unix_date` | Yes | | -| UnixTimestamp | `unix_timestamp` | Yes | | -| Year | `year` | Yes | | -| Month | `month` | Yes | | -| DayOfMonth | `day`/`dayofmonth` | Yes | | -| DayOfWeek | `dayofweek` | Yes | | -| WeekDay | `weekday` | Yes | | -| DayOfYear | `dayofyear` | Yes | | -| WeekOfYear | `weekofyear` | Yes | | -| Quarter | `quarter` | Yes | | +| Expression | SQL | Spark-Compatible? | Compatibility Notes | +| -------------- | ---------------------------- | ----------------- | -------------------------------------------------------------------------------------------------------------------------------- | +| DateAdd | `date_add` | Yes | | +| DateDiff | `datediff` | Yes | | +| DateFormat | `date_format` | Yes | Partial support. Only specific format patterns are supported. | +| DateSub | `date_sub` | Yes | | +| DatePart | `date_part(field, source)` | Yes | Supported values of `field`: `year`/`month`/`week`/`day`/`dayofweek`/`dayofweek_iso`/`doy`/`quarter`/`hour`/`minute` | +| Extract | `extract(field FROM source)` | Yes | Supported values of `field`: `year`/`month`/`week`/`day`/`dayofweek`/`dayofweek_iso`/`doy`/`quarter`/`hour`/`minute` | +| FromUnixTime | `from_unixtime` | No | Does not support format, supports only -8334601211038 <= sec <= 8210266876799 | +| Hour | `hour` | No | Incorrectly applies timezone conversion to TimestampNTZ inputs ([#3180](https://github.com/apache/datafusion-comet/issues/3180)) | +| LastDay | `last_day` | Yes | | +| Minute | `minute` | No | Incorrectly applies timezone conversion to TimestampNTZ inputs ([#3180](https://github.com/apache/datafusion-comet/issues/3180)) | +| Second | `second` | No | Incorrectly applies timezone conversion to TimestampNTZ inputs ([#3180](https://github.com/apache/datafusion-comet/issues/3180)) | +| TruncDate | `trunc` | Yes | | +| TruncTimestamp | `date_trunc` | No | Incorrect results in non-UTC timezones ([#2649](https://github.com/apache/datafusion-comet/issues/2649)) | +| UnixDate | `unix_date` | Yes | | +| UnixTimestamp | `unix_timestamp` | Yes | | +| Year | `year` | Yes | | +| Month | `month` | Yes | | +| DayOfMonth | `day`/`dayofmonth` | Yes | | +| DayOfWeek | `dayofweek` | Yes | | +| WeekDay | `weekday` | Yes | | +| DayOfYear | `dayofyear` | Yes | | +| WeekOfYear | `weekofyear` | Yes | | +| Quarter | `quarter` | Yes | | ## Math Expressions -| Expression | SQL | Spark-Compatible? | Compatibility Notes | -| -------------- | --------- | ----------------- | --------------------------------- | -| Abs | `abs` | Yes | | -| Acos | `acos` | Yes | | -| Add | `+` | Yes | | -| Asin | `asin` | Yes | | -| Atan | `atan` | Yes | | -| Atan2 | `atan2` | Yes | | -| BRound | `bround` | Yes | | -| Ceil | `ceil` | Yes | | -| Cos | `cos` | Yes | | -| Cosh | `cosh` | Yes | | -| Cot | `cot` | Yes | | -| Divide | `/` | Yes | | -| Exp | `exp` | Yes | | -| Expm1 | `expm1` | Yes | | -| Floor | `floor` | Yes | | -| Hex | `hex` | Yes | | -| IntegralDivide | `div` | Yes | | -| IsNaN | `isnan` | Yes | | -| Log | `log` | Yes | | -| Log2 | `log2` | Yes | | -| Log10 | `log10` | Yes | | -| Multiply | `*` | Yes | | -| Pow | `power` | Yes | | -| Rand | `rand` | Yes | | -| Randn | `randn` | Yes | | -| Remainder | `%` | Yes | | -| Round | `round` | Yes | | -| Signum | `signum` | Yes | | -| Sin | `sin` | Yes | | -| Sinh | `sinh` | Yes | | -| Sqrt | `sqrt` | Yes | | -| Subtract | `-` | Yes | | -| Tan | `tan` | Yes | | -| Tanh | `tanh` | Yes | | -| TryAdd | `try_add` | Yes | Only integer inputs are supported | -| TryDivide | `try_div` | Yes | Only integer inputs are supported | -| TryMultiply | `try_mul` | Yes | Only integer inputs are supported | -| TrySubtract | `try_sub` | Yes | Only integer inputs are supported | -| UnaryMinus | `-` | Yes | | -| Unhex | `unhex` | Yes | | +| Expression | SQL | Spark-Compatible? | Compatibility Notes | +| -------------- | --------- | ----------------- | ----------------------------------------------------------------------------------------------------------- | +| Abs | `abs` | Yes | | +| Acos | `acos` | Yes | | +| Add | `+` | Yes | | +| Asin | `asin` | Yes | | +| Atan | `atan` | Yes | | +| Atan2 | `atan2` | Yes | | +| BRound | `bround` | Yes | | +| Ceil | `ceil` | No | Incorrect results for Decimal type inputs ([#1729](https://github.com/apache/datafusion-comet/issues/1729)) | +| Cos | `cos` | Yes | | +| Cosh | `cosh` | Yes | | +| Cot | `cot` | Yes | | +| Divide | `/` | Yes | | +| Exp | `exp` | Yes | | +| Expm1 | `expm1` | Yes | | +| Floor | `floor` | No | Incorrect results for Decimal type inputs ([#1729](https://github.com/apache/datafusion-comet/issues/1729)) | +| Hex | `hex` | Yes | | +| IntegralDivide | `div` | Yes | | +| IsNaN | `isnan` | Yes | | +| Log | `log` | Yes | | +| Log2 | `log2` | Yes | | +| Log10 | `log10` | Yes | | +| Multiply | `*` | Yes | | +| Pow | `power` | Yes | | +| Rand | `rand` | Yes | | +| Randn | `randn` | Yes | | +| Remainder | `%` | Yes | | +| Round | `round` | Yes | | +| Signum | `signum` | Yes | | +| Sin | `sin` | Yes | | +| Sinh | `sinh` | Yes | | +| Sqrt | `sqrt` | Yes | | +| Subtract | `-` | Yes | | +| Tan | `tan` | No | tan(-0.0) produces incorrect result ([#1897](https://github.com/apache/datafusion-comet/issues/1897)) | +| Tanh | `tanh` | Yes | | +| TryAdd | `try_add` | Yes | Only integer inputs are supported | +| TryDivide | `try_div` | Yes | Only integer inputs are supported | +| TryMultiply | `try_mul` | Yes | Only integer inputs are supported | +| TrySubtract | `try_sub` | Yes | Only integer inputs are supported | +| UnaryMinus | `-` | Yes | | +| Unhex | `unhex` | Yes | | ## Hashing Functions @@ -188,27 +188,27 @@ Expressions that are not Spark-compatible will fall back to Spark by default and ## Aggregate Expressions -| Expression | SQL | Spark-Compatible? | Compatibility Notes | -| ------------- | ---------- | ------------------------- | ---------------------------------------------------------------- | -| Average | | Yes, except for ANSI mode | | -| BitAndAgg | | Yes | | -| BitOrAgg | | Yes | | -| BitXorAgg | | Yes | | -| BoolAnd | `bool_and` | Yes | | -| BoolOr | `bool_or` | Yes | | -| Corr | | Yes | | -| Count | | Yes | | -| CovPopulation | | Yes | | -| CovSample | | Yes | | -| First | | No | This function is not deterministic. Results may not match Spark. | -| Last | | No | This function is not deterministic. Results may not match Spark. | -| Max | | Yes | | -| Min | | Yes | | -| StddevPop | | Yes | | -| StddevSamp | | Yes | | -| Sum | | Yes, except for ANSI mode | | -| VariancePop | | Yes | | -| VarianceSamp | | Yes | | +| Expression | SQL | Spark-Compatible? | Compatibility Notes | +| ------------- | ---------- | ------------------------- | ---------------------------------------------------------------------------------------------------------------- | +| Average | | Yes, except for ANSI mode | | +| BitAndAgg | | Yes | | +| BitOrAgg | | Yes | | +| BitXorAgg | | Yes | | +| BoolAnd | `bool_and` | Yes | | +| BoolOr | `bool_or` | Yes | | +| Corr | | No | Returns null instead of NaN in some edge cases ([#2646](https://github.com/apache/datafusion-comet/issues/2646)) | +| Count | | Yes | | +| CovPopulation | | Yes | | +| CovSample | | Yes | | +| First | | No | This function is not deterministic. Results may not match Spark. | +| Last | | No | This function is not deterministic. Results may not match Spark. | +| Max | | Yes | | +| Min | | Yes | | +| StddevPop | | Yes | | +| StddevSamp | | Yes | | +| Sum | | Yes, except for ANSI mode | | +| VariancePop | | Yes | | +| VarianceSamp | | Yes | | ## Window Functions @@ -233,7 +233,7 @@ Comet supports using the following aggregate functions within window contexts wi | -------------- | ----------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | | ArrayAppend | No | | | ArrayCompact | No | | -| ArrayContains | Yes | | +| ArrayContains | No | Returns null instead of false for empty arrays with literal values ([#3346](https://github.com/apache/datafusion-comet/issues/3346)) | | ArrayDistinct | No | Behaves differently than spark. Comet first sorts then removes duplicates while Spark preserves the original order. | | ArrayExcept | No | | | ArrayFilter | Yes | Only supports case where function is `IsNotNull` | @@ -242,14 +242,14 @@ Comet supports using the following aggregate functions within window contexts wi | ArrayJoin | No | | | ArrayMax | Yes | | | ArrayMin | Yes | | -| ArrayRemove | Yes | | +| ArrayRemove | No | Returns null when element is null instead of removing null elements ([#3173](https://github.com/apache/datafusion-comet/issues/3173)) | | ArrayRepeat | No | | | ArrayUnion | No | Behaves differently than spark. Comet sorts the input arrays before performing the union, while Spark preserves the order of the first array and appends unique elements from the second. | | ArraysOverlap | No | | | CreateArray | Yes | | | ElementAt | Yes | Input must be an array. Map inputs are not supported. | | Flatten | Yes | | -| GetArrayItem | Yes | | +| GetArrayItem | No | Known correctness issues with index handling ([#3330](https://github.com/apache/datafusion-comet/issues/3330), [#3332](https://github.com/apache/datafusion-comet/issues/3332)) | ## Map Expressions @@ -263,13 +263,13 @@ Comet supports using the following aggregate functions within window contexts wi ## Struct Expressions -| Expression | Spark-Compatible? | Compatibility Notes | -| -------------------- | ----------------- | ------------------------------------------ | -| CreateNamedStruct | Yes | | -| GetArrayStructFields | Yes | | -| GetStructField | Yes | | -| JsonToStructs | No | Partial support. Requires explicit schema. | -| StructsToJson | Yes | | +| Expression | Spark-Compatible? | Compatibility Notes | +| -------------------- | ----------------- | ----------------------------------------------------------------------------------------------------------------------- | +| CreateNamedStruct | Yes | | +| GetArrayStructFields | Yes | | +| GetStructField | Yes | | +| JsonToStructs | No | Partial support. Requires explicit schema. | +| StructsToJson | No | Does not support Infinity/-Infinity for numeric types ([#3016](https://github.com/apache/datafusion-comet/issues/3016)) | ## Conversion Expressions diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 0b9dadd35a..8c39ba779d 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -116,7 +116,7 @@ object QueryPlanSerde extends Logging with CometExprShim { classOf[Sinh] -> CometScalarFunction("sinh"), classOf[Sqrt] -> CometScalarFunction("sqrt"), classOf[Subtract] -> CometSubtract, - classOf[Tan] -> CometScalarFunction("tan"), + classOf[Tan] -> CometTan, classOf[Tanh] -> CometScalarFunction("tanh"), classOf[Cot] -> CometScalarFunction("cot"), classOf[UnaryMinus] -> CometUnaryMinus, diff --git a/spark/src/main/scala/org/apache/comet/serde/aggregates.scala b/spark/src/main/scala/org/apache/comet/serde/aggregates.scala index 8e58c08740..1485589b46 100644 --- a/spark/src/main/scala/org/apache/comet/serde/aggregates.scala +++ b/spark/src/main/scala/org/apache/comet/serde/aggregates.scala @@ -584,6 +584,13 @@ object CometStddevPop extends CometAggregateExpressionSerde[StddevPop] with Come } object CometCorr extends CometAggregateExpressionSerde[Corr] { + + override def getSupportLevel(expr: Corr): SupportLevel = + Incompatible( + Some( + "Returns null instead of NaN in some edge cases" + + " (https://github.com/apache/datafusion-comet/issues/2646)")) + override def convert( aggExpr: AggregateExpression, corr: Corr, diff --git a/spark/src/main/scala/org/apache/comet/serde/arrays.scala b/spark/src/main/scala/org/apache/comet/serde/arrays.scala index 79e995f2ee..c82018fe6d 100644 --- a/spark/src/main/scala/org/apache/comet/serde/arrays.scala +++ b/spark/src/main/scala/org/apache/comet/serde/arrays.scala @@ -35,6 +35,12 @@ object CometArrayRemove with CometExprShim with ArraysBase { + override def getSupportLevel(expr: ArrayRemove): SupportLevel = + Incompatible( + Some( + "Returns null when element is null instead of removing null elements" + + " (https://github.com/apache/datafusion-comet/issues/3173)")) + override def convert( expr: ArrayRemove, inputs: Seq[Attribute], @@ -131,6 +137,13 @@ object CometArrayAppend extends CometExpressionSerde[ArrayAppend] { } object CometArrayContains extends CometExpressionSerde[ArrayContains] { + + override def getSupportLevel(expr: ArrayContains): SupportLevel = + Incompatible( + Some( + "Returns null instead of false for empty arrays with literal values" + + " (https://github.com/apache/datafusion-comet/issues/3346)")) + override def convert( expr: ArrayContains, inputs: Seq[Attribute], @@ -472,6 +485,14 @@ object CometCreateArray extends CometExpressionSerde[CreateArray] { } object CometGetArrayItem extends CometExpressionSerde[GetArrayItem] { + + override def getSupportLevel(expr: GetArrayItem): SupportLevel = + Incompatible( + Some( + "Known correctness issues with index handling" + + " (https://github.com/apache/datafusion-comet/issues/3330," + + " https://github.com/apache/datafusion-comet/issues/3332)")) + override def convert( expr: GetArrayItem, inputs: Seq[Attribute], diff --git a/spark/src/main/scala/org/apache/comet/serde/datetime.scala b/spark/src/main/scala/org/apache/comet/serde/datetime.scala index d36b6a3b40..0720f785dd 100644 --- a/spark/src/main/scala/org/apache/comet/serde/datetime.scala +++ b/spark/src/main/scala/org/apache/comet/serde/datetime.scala @@ -177,6 +177,18 @@ object CometQuarter extends CometExpressionSerde[Quarter] with CometExprGetDateF } object CometHour extends CometExpressionSerde[Hour] { + + override def getSupportLevel(expr: Hour): SupportLevel = { + if (expr.child.dataType.typeName == "timestamp_ntz") { + Incompatible( + Some( + "Incorrectly applies timezone conversion to TimestampNTZ inputs" + + " (https://github.com/apache/datafusion-comet/issues/3180)")) + } else { + Compatible() + } + } + override def convert( expr: Hour, inputs: Seq[Attribute], @@ -203,6 +215,18 @@ object CometHour extends CometExpressionSerde[Hour] { } object CometMinute extends CometExpressionSerde[Minute] { + + override def getSupportLevel(expr: Minute): SupportLevel = { + if (expr.child.dataType.typeName == "timestamp_ntz") { + Incompatible( + Some( + "Incorrectly applies timezone conversion to TimestampNTZ inputs" + + " (https://github.com/apache/datafusion-comet/issues/3180)")) + } else { + Compatible() + } + } + override def convert( expr: Minute, inputs: Seq[Attribute], @@ -229,6 +253,18 @@ object CometMinute extends CometExpressionSerde[Minute] { } object CometSecond extends CometExpressionSerde[Second] { + + override def getSupportLevel(expr: Second): SupportLevel = { + if (expr.child.dataType.typeName == "timestamp_ntz") { + Incompatible( + Some( + "Incorrectly applies timezone conversion to TimestampNTZ inputs" + + " (https://github.com/apache/datafusion-comet/issues/3180)")) + } else { + Compatible() + } + } + override def convert( expr: Second, inputs: Seq[Attribute], @@ -402,10 +438,19 @@ object CometTruncTimestamp extends CometExpressionSerde[TruncTimestamp] { "microsecond") override def getSupportLevel(expr: TruncTimestamp): SupportLevel = { + val timezone = expr.timeZoneId.getOrElse("UTC") + val isUtc = timezone == "UTC" || timezone == "Etc/UTC" expr.format match { case Literal(fmt: UTF8String, _) => if (supportedFormats.contains(fmt.toString.toLowerCase(Locale.ROOT))) { - Compatible() + if (isUtc) { + Compatible() + } else { + Incompatible( + Some( + s"Incorrect results in non-UTC timezone '$timezone'" + + " (https://github.com/apache/datafusion-comet/issues/2649)")) + } } else { Unsupported(Some(s"Format $fmt is not supported")) } diff --git a/spark/src/main/scala/org/apache/comet/serde/math.scala b/spark/src/main/scala/org/apache/comet/serde/math.scala index 68b6e8d11e..5a0393142a 100644 --- a/spark/src/main/scala/org/apache/comet/serde/math.scala +++ b/spark/src/main/scala/org/apache/comet/serde/math.scala @@ -19,7 +19,7 @@ package org.apache.comet.serde -import org.apache.spark.sql.catalyst.expressions.{Abs, Atan2, Attribute, Ceil, CheckOverflow, Expression, Floor, Hex, If, LessThanOrEqual, Literal, Log, Log10, Log2, Unhex} +import org.apache.spark.sql.catalyst.expressions.{Abs, Atan2, Attribute, Ceil, CheckOverflow, Expression, Floor, Hex, If, LessThanOrEqual, Literal, Log, Log10, Log2, Tan, Unhex} import org.apache.spark.sql.types.{DecimalType, NumericType} import org.apache.comet.CometSparkSessionExtensions.withInfo @@ -38,6 +38,18 @@ object CometAtan2 extends CometExpressionSerde[Atan2] { } object CometCeil extends CometExpressionSerde[Ceil] { + + override def getSupportLevel(expr: Ceil): SupportLevel = { + expr.child.dataType match { + case _: DecimalType => + Incompatible( + Some( + "Incorrect results for Decimal type inputs" + + " (https://github.com/apache/datafusion-comet/issues/1729)")) + case _ => Compatible() + } + } + override def convert( expr: Ceil, inputs: Seq[Attribute], @@ -58,6 +70,18 @@ object CometCeil extends CometExpressionSerde[Ceil] { } object CometFloor extends CometExpressionSerde[Floor] { + + override def getSupportLevel(expr: Floor): SupportLevel = { + expr.child.dataType match { + case _: DecimalType => + Incompatible( + Some( + "Incorrect results for Decimal type inputs" + + " (https://github.com/apache/datafusion-comet/issues/1729)")) + case _ => Compatible() + } + } + override def convert( expr: Floor, inputs: Seq[Attribute], @@ -174,6 +198,24 @@ object CometAbs extends CometExpressionSerde[Abs] with MathExprBase { } } +object CometTan extends CometExpressionSerde[Tan] { + + override def getSupportLevel(expr: Tan): SupportLevel = + Incompatible( + Some( + "tan(-0.0) produces incorrect result" + + " (https://github.com/apache/datafusion-comet/issues/1897)")) + + override def convert( + expr: Tan, + inputs: Seq[Attribute], + binding: Boolean): Option[ExprOuterClass.Expr] = { + val childExpr = expr.children.map(exprToProtoInternal(_, inputs, binding)) + val optExpr = scalarFunctionExprToProto("tan", childExpr: _*) + optExprWithInfo(optExpr, expr, expr.children: _*) + } +} + sealed trait MathExprBase { protected def nullIfNegative(expression: Expression): Expression = { val zero = Literal.default(expression.dataType) diff --git a/spark/src/main/scala/org/apache/comet/serde/structs.scala b/spark/src/main/scala/org/apache/comet/serde/structs.scala index d9d83f6594..449d0fc5b9 100644 --- a/spark/src/main/scala/org/apache/comet/serde/structs.scala +++ b/spark/src/main/scala/org/apache/comet/serde/structs.scala @@ -105,6 +105,12 @@ object CometGetArrayStructFields extends CometExpressionSerde[GetArrayStructFiel object CometStructsToJson extends CometExpressionSerde[StructsToJson] { + override def getSupportLevel(expr: StructsToJson): SupportLevel = + Incompatible( + Some( + "Does not support Infinity/-Infinity for numeric types" + + " (https://github.com/apache/datafusion-comet/issues/3016)")) + override def convert( expr: StructsToJson, inputs: Seq[Attribute], diff --git a/spark/src/test/resources/sql-tests/expressions/aggregate/corr.sql b/spark/src/test/resources/sql-tests/expressions/aggregate/corr.sql index 9d11ba0ca3..4231da316c 100644 --- a/spark/src/test/resources/sql-tests/expressions/aggregate/corr.sql +++ b/spark/src/test/resources/sql-tests/expressions/aggregate/corr.sql @@ -15,6 +15,7 @@ -- specific language governing permissions and limitations -- under the License. +-- Config: spark.comet.expression.Corr.allowIncompatible=true -- ConfigMatrix: parquet.enable.dictionary=false,true statement diff --git a/spark/src/test/resources/sql-tests/expressions/array/array_remove.sql b/spark/src/test/resources/sql-tests/expressions/array/array_remove.sql index f91089d554..aead1fa44d 100644 --- a/spark/src/test/resources/sql-tests/expressions/array/array_remove.sql +++ b/spark/src/test/resources/sql-tests/expressions/array/array_remove.sql @@ -15,6 +15,7 @@ -- specific language governing permissions and limitations -- under the License. +-- Config: spark.comet.expression.ArrayRemove.allowIncompatible=true -- ConfigMatrix: parquet.enable.dictionary=false,true statement diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/trunc_timestamp.sql b/spark/src/test/resources/sql-tests/expressions/datetime/trunc_timestamp.sql index 1105d014f3..6618665431 100644 --- a/spark/src/test/resources/sql-tests/expressions/datetime/trunc_timestamp.sql +++ b/spark/src/test/resources/sql-tests/expressions/datetime/trunc_timestamp.sql @@ -15,6 +15,7 @@ -- specific language governing permissions and limitations -- under the License. +-- Config: spark.comet.expression.TruncTimestamp.allowIncompatible=true -- ConfigMatrix: parquet.enable.dictionary=false,true statement diff --git a/spark/src/test/resources/sql-tests/expressions/map/map_contains_key.sql b/spark/src/test/resources/sql-tests/expressions/map/map_contains_key.sql index 7dc3ce436d..a70f4a1bde 100644 --- a/spark/src/test/resources/sql-tests/expressions/map/map_contains_key.sql +++ b/spark/src/test/resources/sql-tests/expressions/map/map_contains_key.sql @@ -15,6 +15,7 @@ -- specific language governing permissions and limitations -- under the License. +-- Config: spark.comet.expression.ArrayContains.allowIncompatible=true -- ConfigMatrix: parquet.enable.dictionary=false,true -- TODO: replace map_from_arrays with map whenever map is supported in Comet diff --git a/spark/src/test/resources/sql-tests/expressions/math/ceil.sql b/spark/src/test/resources/sql-tests/expressions/math/ceil.sql index fade75d28a..c11c42bedb 100644 --- a/spark/src/test/resources/sql-tests/expressions/math/ceil.sql +++ b/spark/src/test/resources/sql-tests/expressions/math/ceil.sql @@ -15,6 +15,7 @@ -- specific language governing permissions and limitations -- under the License. +-- Config: spark.comet.expression.Ceil.allowIncompatible=true -- ConfigMatrix: parquet.enable.dictionary=false,true statement diff --git a/spark/src/test/resources/sql-tests/expressions/math/floor.sql b/spark/src/test/resources/sql-tests/expressions/math/floor.sql index 3960002846..62fa2a4045 100644 --- a/spark/src/test/resources/sql-tests/expressions/math/floor.sql +++ b/spark/src/test/resources/sql-tests/expressions/math/floor.sql @@ -15,6 +15,7 @@ -- specific language governing permissions and limitations -- under the License. +-- Config: spark.comet.expression.Floor.allowIncompatible=true -- ConfigMatrix: parquet.enable.dictionary=false,true statement diff --git a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala index 65b2c85379..fb5531a573 100644 --- a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala @@ -24,6 +24,7 @@ import scala.util.Random import org.apache.hadoop.fs.Path import org.apache.spark.sql.CometTestBase import org.apache.spark.sql.catalyst.expressions.{ArrayAppend, ArrayDistinct, ArrayExcept, ArrayInsert, ArrayIntersect, ArrayJoin, ArrayRepeat, ArraysOverlap, ArrayUnion} +import org.apache.spark.sql.catalyst.expressions.{ArrayContains, ArrayRemove} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf @@ -37,50 +38,57 @@ import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOpti class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("array_remove - integer") { - Seq(true, false).foreach { dictionaryEnabled => - withTempView("t1") { - withTempDir { dir => - val path = new Path(dir.toURI.toString, "test.parquet") - makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled, 10000) - spark.read.parquet(path.toString).createOrReplaceTempView("t1") - checkSparkAnswerAndOperator( - sql("SELECT array_remove(array(_2, _3,_4), _2) from t1 where _2 is null")) - checkSparkAnswerAndOperator( - sql("SELECT array_remove(array(_2, _3,_4), _3) from t1 where _3 is not null")) - checkSparkAnswerAndOperator(sql( - "SELECT array_remove(case when _2 = _3 THEN array(_2, _3,_4) ELSE null END, _3) from t1")) + withSQLConf(CometConf.getExprAllowIncompatConfigKey(classOf[ArrayRemove]) -> "true") { + Seq(true, false).foreach { dictionaryEnabled => + withTempView("t1") { + withTempDir { dir => + val path = new Path(dir.toURI.toString, "test.parquet") + makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled, 10000) + spark.read.parquet(path.toString).createOrReplaceTempView("t1") + checkSparkAnswerAndOperator( + sql("SELECT array_remove(array(_2, _3,_4), _2) from t1 where _2 is null")) + checkSparkAnswerAndOperator( + sql("SELECT array_remove(array(_2, _3,_4), _3) from t1 where _3 is not null")) + checkSparkAnswerAndOperator(sql( + "SELECT array_remove(case when _2 = _3 THEN array(_2, _3,_4) ELSE null END, _3) from t1")) + } } } } } test("array_remove - test all types (native Parquet reader)") { - withTempDir { dir => - withTempView("t1") { - val path = new Path(dir.toURI.toString, "test.parquet") - val filename = path.toString - val random = new Random(42) - withSQLConf(CometConf.COMET_ENABLED.key -> "false") { - ParquetGenerator.makeParquetFile( - random, - spark, - filename, - 100, - SchemaGenOptions(generateArray = false, generateStruct = false, generateMap = false), - DataGenOptions(allowNull = true, generateNegativeZero = true)) - } - val table = spark.read.parquet(filename) - table.createOrReplaceTempView("t1") - // test with array of each column - val fieldNames = - table.schema.fields - .filter(field => CometArrayRemove.isTypeSupported(field.dataType)) - .map(_.name) - for (fieldName <- fieldNames) { - sql(s"SELECT array($fieldName, $fieldName) as a, $fieldName as b FROM t1") - .createOrReplaceTempView("t2") - val df = sql("SELECT array_remove(a, b) FROM t2") - checkSparkAnswerAndOperator(df) + withSQLConf(CometConf.getExprAllowIncompatConfigKey(classOf[ArrayRemove]) -> "true") { + withTempDir { dir => + withTempView("t1") { + val path = new Path(dir.toURI.toString, "test.parquet") + val filename = path.toString + val random = new Random(42) + withSQLConf(CometConf.COMET_ENABLED.key -> "false") { + ParquetGenerator.makeParquetFile( + random, + spark, + filename, + 100, + SchemaGenOptions( + generateArray = false, + generateStruct = false, + generateMap = false), + DataGenOptions(allowNull = true, generateNegativeZero = true)) + } + val table = spark.read.parquet(filename) + table.createOrReplaceTempView("t1") + // test with array of each column + val fieldNames = + table.schema.fields + .filter(field => CometArrayRemove.isTypeSupported(field.dataType)) + .map(_.name) + for (fieldName <- fieldNames) { + sql(s"SELECT array($fieldName, $fieldName) as a, $fieldName as b FROM t1") + .createOrReplaceTempView("t2") + val df = sql("SELECT array_remove(a, b) FROM t2") + checkSparkAnswerAndOperator(df) + } } } } @@ -129,7 +137,7 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp sql("SELECT array(struct(_1, _2)) as a, struct(_1, _2) as b FROM t1") .createOrReplaceTempView("t2") val expectedFallbackReason = - "data type not supported: ArrayType(StructType(StructField(_1,BooleanType,true),StructField(_2,ByteType,true)),false)" + "is not fully compatible with Spark" checkSparkAnswerAndFallbackReason( sql("SELECT array_remove(a, b) FROM t2"), expectedFallbackReason) @@ -245,54 +253,59 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp } test("array_contains - int values") { - withTempDir { dir => - withTempView("t1") { - val path = new Path(dir.toURI.toString, "test.parquet") - makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled = false, n = 10000) - spark.read.parquet(path.toString).createOrReplaceTempView("t1"); - checkSparkAnswerAndOperator( - spark.sql("SELECT array_contains(array(_2, _3, _4), _2) FROM t1")) - checkSparkAnswerAndOperator( - spark.sql("SELECT array_contains((CASE WHEN _2 =_3 THEN array(_4) END), _4) FROM t1")); + withSQLConf(CometConf.getExprAllowIncompatConfigKey(classOf[ArrayContains]) -> "true") { + withTempDir { dir => + withTempView("t1") { + val path = new Path(dir.toURI.toString, "test.parquet") + makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled = false, n = 10000) + spark.read.parquet(path.toString).createOrReplaceTempView("t1"); + checkSparkAnswerAndOperator( + spark.sql("SELECT array_contains(array(_2, _3, _4), _2) FROM t1")) + checkSparkAnswerAndOperator( + spark.sql( + "SELECT array_contains((CASE WHEN _2 =_3 THEN array(_4) END), _4) FROM t1")); + } } } } test("array_contains - test all types (native Parquet reader)") { - withTempDir { dir => - withTempView("t1", "t2", "t3") { - val path = new Path(dir.toURI.toString, "test.parquet") - val filename = path.toString - val random = new Random(42) - withSQLConf(CometConf.COMET_ENABLED.key -> "false") { - ParquetGenerator.makeParquetFile( - random, - spark, - filename, - 100, - SchemaGenOptions(generateArray = true, generateStruct = true, generateMap = false), - DataGenOptions(allowNull = true, generateNegativeZero = true)) - } - val table = spark.read.parquet(filename) - table.createOrReplaceTempView("t1") - val complexTypeFields = - table.schema.fields.filter(field => isComplexType(field.dataType)) - val primitiveTypeFields = - table.schema.fields.filterNot(field => isComplexType(field.dataType)) - for (field <- primitiveTypeFields) { - val fieldName = field.name - val typeName = field.dataType.typeName - sql(s"SELECT array($fieldName, $fieldName) as a, $fieldName as b FROM t1") - .createOrReplaceTempView("t2") - checkSparkAnswerAndOperator(sql("SELECT array_contains(a, b) FROM t2")) - checkSparkAnswerAndOperator( - sql(s"SELECT array_contains(a, cast(null as $typeName)) FROM t2")) - } - for (field <- complexTypeFields) { - val fieldName = field.name - sql(s"SELECT array($fieldName, $fieldName) as a, $fieldName as b FROM t1") - .createOrReplaceTempView("t3") - checkSparkAnswer(sql("SELECT array_contains(a, b) FROM t3")) + withSQLConf(CometConf.getExprAllowIncompatConfigKey(classOf[ArrayContains]) -> "true") { + withTempDir { dir => + withTempView("t1", "t2", "t3") { + val path = new Path(dir.toURI.toString, "test.parquet") + val filename = path.toString + val random = new Random(42) + withSQLConf(CometConf.COMET_ENABLED.key -> "false") { + ParquetGenerator.makeParquetFile( + random, + spark, + filename, + 100, + SchemaGenOptions(generateArray = true, generateStruct = true, generateMap = false), + DataGenOptions(allowNull = true, generateNegativeZero = true)) + } + val table = spark.read.parquet(filename) + table.createOrReplaceTempView("t1") + val complexTypeFields = + table.schema.fields.filter(field => isComplexType(field.dataType)) + val primitiveTypeFields = + table.schema.fields.filterNot(field => isComplexType(field.dataType)) + for (field <- primitiveTypeFields) { + val fieldName = field.name + val typeName = field.dataType.typeName + sql(s"SELECT array($fieldName, $fieldName) as a, $fieldName as b FROM t1") + .createOrReplaceTempView("t2") + checkSparkAnswerAndOperator(sql("SELECT array_contains(a, b) FROM t2")) + checkSparkAnswerAndOperator( + sql(s"SELECT array_contains(a, cast(null as $typeName)) FROM t2")) + } + for (field <- complexTypeFields) { + val fieldName = field.name + sql(s"SELECT array($fieldName, $fieldName) as a, $fieldName as b FROM t1") + .createOrReplaceTempView("t3") + checkSparkAnswer(sql("SELECT array_contains(a, b) FROM t3")) + } } } } diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 570db1795c..eeaf1ed911 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -28,7 +28,7 @@ import org.scalatest.Tag import org.apache.hadoop.fs.Path import org.apache.spark.sql.{CometTestBase, DataFrame, Row} -import org.apache.spark.sql.catalyst.expressions.{Alias, Cast, FromUnixTime, Literal, TruncDate, TruncTimestamp} +import org.apache.spark.sql.catalyst.expressions.{Alias, Cast, Ceil, Floor, FromUnixTime, GetArrayItem, Literal, StructsToJson, Tan, TruncDate, TruncTimestamp} import org.apache.spark.sql.catalyst.optimizer.SimplifyExtractValueOps import org.apache.spark.sql.comet.CometProjectExec import org.apache.spark.sql.execution.{ProjectExec, SparkPlan} @@ -666,34 +666,36 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("date_trunc") { - Seq(true, false).foreach { dictionaryEnabled => - withTempDir { dir => - val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet") - makeRawTimeParquetFile(path, dictionaryEnabled = dictionaryEnabled, 10000) - withParquetTable(path.toString, "timetbl") { - Seq( - "YEAR", - "YYYY", - "YY", - "MON", - "MONTH", - "MM", - "QUARTER", - "WEEK", - "DAY", - "DD", - "HOUR", - "MINUTE", - "SECOND", - "MILLISECOND", - "MICROSECOND").foreach { format => - checkSparkAnswerAndOperator( - "SELECT " + - s"date_trunc('$format', _0), " + - s"date_trunc('$format', _1), " + - s"date_trunc('$format', _2), " + - s"date_trunc('$format', _4) " + - " from timetbl") + withSQLConf(CometConf.getExprAllowIncompatConfigKey(classOf[TruncTimestamp]) -> "true") { + Seq(true, false).foreach { dictionaryEnabled => + withTempDir { dir => + val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet") + makeRawTimeParquetFile(path, dictionaryEnabled = dictionaryEnabled, 10000) + withParquetTable(path.toString, "timetbl") { + Seq( + "YEAR", + "YYYY", + "YY", + "MON", + "MONTH", + "MM", + "QUARTER", + "WEEK", + "DAY", + "DD", + "HOUR", + "MINUTE", + "SECOND", + "MILLISECOND", + "MICROSECOND").foreach { format => + checkSparkAnswerAndOperator( + "SELECT " + + s"date_trunc('$format', _0), " + + s"date_trunc('$format', _1), " + + s"date_trunc('$format', _2), " + + s"date_trunc('$format', _4) " + + " from timetbl") + } } } } @@ -701,7 +703,9 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("date_trunc with timestamp_ntz") { - withSQLConf(CometConf.getExprAllowIncompatConfigKey(classOf[Cast]) -> "true") { + withSQLConf( + CometConf.getExprAllowIncompatConfigKey(classOf[Cast]) -> "true", + CometConf.getExprAllowIncompatConfigKey(classOf[TruncTimestamp]) -> "true") { Seq(true, false).foreach { dictionaryEnabled => withTempDir { dir => val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet") @@ -770,7 +774,8 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { Seq(false, true).foreach { conversionEnabled => withSQLConf( SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "INT96", - SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION.key -> conversionEnabled.toString) { + SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION.key -> conversionEnabled.toString, + CometConf.getExprAllowIncompatConfigKey(classOf[TruncTimestamp]) -> "true") { withTempPath { path => Seq .tabulate(N)(_ => ts) @@ -1312,39 +1317,41 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("various math scalar functions") { val data = doubleValues.map(n => (n, n)) - withParquetTable(data, "tbl") { - // expressions with single arg - for (expr <- Seq( - "acos", - "asin", - "atan", - "cos", - "cosh", - "exp", - "ln", - "log10", - "log2", - "sin", - "sinh", - "sqrt", - "tan", - "tanh", - "cot")) { - val (_, cometPlan) = - checkSparkAnswerAndOperatorWithTol(sql(s"SELECT $expr(_1), $expr(_2) FROM tbl")) - val cometProjectExecs = collect(cometPlan) { case op: CometProjectExec => - op - } - assert(cometProjectExecs.length == 1, expr) - } - // expressions with two args - for (expr <- Seq("atan2", "pow")) { - val (_, cometPlan) = - checkSparkAnswerAndOperatorWithTol(sql(s"SELECT $expr(_1, _2) FROM tbl")) - val cometProjectExecs = collect(cometPlan) { case op: CometProjectExec => - op - } - assert(cometProjectExecs.length == 1, expr) + withSQLConf(CometConf.getExprAllowIncompatConfigKey(classOf[Tan]) -> "true") { + withParquetTable(data, "tbl") { + // expressions with single arg + for (expr <- Seq( + "acos", + "asin", + "atan", + "cos", + "cosh", + "exp", + "ln", + "log10", + "log2", + "sin", + "sinh", + "sqrt", + "tan", + "tanh", + "cot")) { + val (_, cometPlan) = + checkSparkAnswerAndOperatorWithTol(sql(s"SELECT $expr(_1), $expr(_2) FROM tbl")) + val cometProjectExecs = collect(cometPlan) { case op: CometProjectExec => + op + } + assert(cometProjectExecs.length == 1, expr) + } + // expressions with two args + for (expr <- Seq("atan2", "pow")) { + val (_, cometPlan) = + checkSparkAnswerAndOperatorWithTol(sql(s"SELECT $expr(_1, _2) FROM tbl")) + val cometProjectExecs = collect(cometPlan) { case op: CometProjectExec => + op + } + assert(cometProjectExecs.length == 1, expr) + } } } } @@ -1408,7 +1415,10 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("ceil and floor") { Seq("true", "false").foreach { dictionary => - withSQLConf("parquet.enable.dictionary" -> dictionary) { + withSQLConf( + "parquet.enable.dictionary" -> dictionary, + CometConf.getExprAllowIncompatConfigKey(classOf[Ceil]) -> "true", + CometConf.getExprAllowIncompatConfigKey(classOf[Floor]) -> "true") { withParquetTable( (-5 until 5).map(i => (i.toDouble + 0.3, i.toDouble + 0.8)), "tbl", @@ -2199,24 +2209,26 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("to_json") { // TODO fix for Spark 4.0.0 assume(!isSpark40Plus) - Seq(true, false).foreach { dictionaryEnabled => - withParquetTable( - (0 until 100).map(i => { - val str = if (i % 2 == 0) { - "even" - } else { - "odd" - } - (i.toByte, i.toShort, i, i.toLong, i * 1.2f, -i * 1.2d, str, i.toString) - }), - "tbl", - withDictionary = dictionaryEnabled) { + withSQLConf(CometConf.getExprAllowIncompatConfigKey(classOf[StructsToJson]) -> "true") { + Seq(true, false).foreach { dictionaryEnabled => + withParquetTable( + (0 until 100).map(i => { + val str = if (i % 2 == 0) { + "even" + } else { + "odd" + } + (i.toByte, i.toShort, i, i.toLong, i * 1.2f, -i * 1.2d, str, i.toString) + }), + "tbl", + withDictionary = dictionaryEnabled) { - val fields = Range(1, 8).map(n => s"'col$n', _$n").mkString(", ") + val fields = Range(1, 8).map(n => s"'col$n', _$n").mkString(", ") - checkSparkAnswerAndOperator(s"SELECT to_json(named_struct($fields)) FROM tbl") - checkSparkAnswerAndOperator( - s"SELECT to_json(named_struct('nested', named_struct($fields))) FROM tbl") + checkSparkAnswerAndOperator(s"SELECT to_json(named_struct($fields)) FROM tbl") + checkSparkAnswerAndOperator( + s"SELECT to_json(named_struct('nested', named_struct($fields))) FROM tbl") + } } } } @@ -2224,28 +2236,30 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("to_json escaping of field names and string values") { // TODO fix for Spark 4.0.0 assume(!isSpark40Plus) - val gen = new DataGenerator(new Random(42)) - val chars = "\\'\"abc\t\r\n\f\b" - Seq(true, false).foreach { dictionaryEnabled => - withParquetTable( - (0 until 100).map(i => { - val str1 = gen.generateString(chars, 8) - val str2 = gen.generateString(chars, 8) - (i.toString, str1, str2) - }), - "tbl", - withDictionary = dictionaryEnabled) { + withSQLConf(CometConf.getExprAllowIncompatConfigKey(classOf[StructsToJson]) -> "true") { + val gen = new DataGenerator(new Random(42)) + val chars = "\\'\"abc\t\r\n\f\b" + Seq(true, false).foreach { dictionaryEnabled => + withParquetTable( + (0 until 100).map(i => { + val str1 = gen.generateString(chars, 8) + val str2 = gen.generateString(chars, 8) + (i.toString, str1, str2) + }), + "tbl", + withDictionary = dictionaryEnabled) { - val fields = Range(1, 3) - .map(n => { - val columnName = s"""column "$n"""" - s"'$columnName', _$n" - }) - .mkString(", ") + val fields = Range(1, 3) + .map(n => { + val columnName = s"""column "$n"""" + s"'$columnName', _$n" + }) + .mkString(", ") - checkSparkAnswerAndOperator( - """SELECT 'column "1"' x, """ + - s"to_json(named_struct($fields)) FROM tbl ORDER BY x") + checkSparkAnswerAndOperator( + """SELECT 'column "1"' x, """ + + s"to_json(named_struct($fields)) FROM tbl ORDER BY x") + } } } } @@ -2253,24 +2267,26 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("to_json unicode") { // TODO fix for Spark 4.0.0 assume(!isSpark40Plus) - Seq(true, false).foreach { dictionaryEnabled => - withParquetTable( - (0 until 100).map(i => { - (i.toString, "\uD83E\uDD11", "\u018F") - }), - "tbl", - withDictionary = dictionaryEnabled) { + withSQLConf(CometConf.getExprAllowIncompatConfigKey(classOf[StructsToJson]) -> "true") { + Seq(true, false).foreach { dictionaryEnabled => + withParquetTable( + (0 until 100).map(i => { + (i.toString, "\uD83E\uDD11", "\u018F") + }), + "tbl", + withDictionary = dictionaryEnabled) { - val fields = Range(1, 3) - .map(n => { - val columnName = s"""column "$n"""" - s"'$columnName', _$n" - }) - .mkString(", ") + val fields = Range(1, 3) + .map(n => { + val columnName = s"""column "$n"""" + s"'$columnName', _$n" + }) + .mkString(", ") - checkSparkAnswerAndOperator( - """SELECT 'column "1"' x, """ + - s"to_json(named_struct($fields)) FROM tbl ORDER BY x") + checkSparkAnswerAndOperator( + """SELECT 'column "1"' x, """ + + s"to_json(named_struct($fields)) FROM tbl ORDER BY x") + } } } } @@ -2571,7 +2587,8 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { withSQLConf( SQLConf.ANSI_ENABLED.key -> ansiEnabled.toString(), // Prevent the optimizer from collapsing an extract value of a create array - SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> SimplifyExtractValueOps.ruleName) { + SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> SimplifyExtractValueOps.ruleName, + CometConf.getExprAllowIncompatConfigKey(classOf[GetArrayItem]) -> "true") { val df = spark.read.parquet(path.toString) val stringArray = df.select(array(col("_8"), col("_8"), lit(null)).alias("arr")) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala index 14b5dc3092..9426d1c848 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala @@ -24,6 +24,7 @@ import scala.util.Random import org.apache.hadoop.fs.Path import org.apache.spark.sql.{CometTestBase, DataFrame, Row} import org.apache.spark.sql.catalyst.expressions.Cast +import org.apache.spark.sql.catalyst.expressions.aggregate.Corr import org.apache.spark.sql.catalyst.optimizer.EliminateSorts import org.apache.spark.sql.comet.CometHashAggregateExec import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper @@ -1319,7 +1320,9 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("covariance & correlation") { - withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { + withSQLConf( + CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", + CometConf.getExprAllowIncompatConfigKey(classOf[Corr]) -> "true") { Seq("jvm", "native").foreach { cometShuffleMode => withSQLConf(CometConf.COMET_SHUFFLE_MODE.key -> cometShuffleMode) { Seq(true, false).foreach { dictionary => diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala index 84a326c790..05e82bdfb5 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala @@ -31,6 +31,8 @@ import org.apache.spark.sql.types.{IntegerType, StringType, StructType} import org.apache.comet.CometConf class CometNativeReaderSuite extends CometTestBase with AdaptiveSparkPlanHelper { + import org.apache.spark.sql.catalyst.expressions.GetArrayItem + override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit pos: Position): Unit = { Seq(CometConf.SCAN_NATIVE_DATAFUSION, CometConf.SCAN_NATIVE_ICEBERG_COMPAT).foreach(scan => @@ -40,7 +42,8 @@ class CometNativeReaderSuite extends CometTestBase with AdaptiveSparkPlanHelper SQLConf.USE_V1_SOURCE_LIST.key -> "parquet", CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "false", - CometConf.COMET_NATIVE_SCAN_IMPL.key -> scan) { + CometConf.COMET_NATIVE_SCAN_IMPL.key -> scan, + CometConf.getExprAllowIncompatConfigKey(classOf[GetArrayItem]) -> "true") { testFun } }) diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala index b9caa94308..09a2308e35 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala @@ -35,6 +35,7 @@ import org.apache.parquet.example.data.simple.SimpleGroup import org.apache.parquet.schema.MessageTypeParser import org.apache.spark.SparkException import org.apache.spark.sql.{CometTestBase, DataFrame, Row} +import org.apache.spark.sql.catalyst.expressions.GetArrayItem import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper @@ -1505,7 +1506,9 @@ class ParquetReadV1Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper { withParquetTable(path.toUri.toString, "complex_types") { Seq(CometConf.SCAN_NATIVE_DATAFUSION, CometConf.SCAN_NATIVE_ICEBERG_COMPAT).foreach( scanMode => { - withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanMode) { + withSQLConf( + CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanMode, + CometConf.getExprAllowIncompatConfigKey(classOf[GetArrayItem]) -> "true") { checkSparkAnswerAndOperator(sql("select * from complex_types")) // First level checkSparkAnswerAndOperator(sql(