Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion spark/src/main/scala/org/apache/comet/serde/arrays.scala
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,12 @@ object CometArrayAppend extends CometExpressionSerde[ArrayAppend] {
val keyExprProto = exprToProto(expr.children(1), inputs, binding)

val arrayAppendScalarExpr =
scalarFunctionExprToProto("array_append", arrayExprProto, keyExprProto)
scalarFunctionExprToProtoWithReturnType(
"array_append",
ArrayType(elementType = elementType),
false,
arrayExprProto,
keyExprProto)

val isNotNullExpr = createUnaryExpr(
expr,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
-- specific language governing permissions and limitations
-- under the License.

-- Config: spark.comet.expression.ArrayInsert.allowIncompatible=true
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spark 4.0 classname is ArrayInsert, in earlier versions ArrayAppend

-- Config: spark.comet.expression.ArrayAppend.allowIncompatible=true
-- ConfigMatrix: parquet.enable.dictionary=false,true

statement
Expand All @@ -23,17 +25,17 @@ CREATE TABLE test_array_append(arr array<int>, val int) USING parquet
statement
INSERT INTO test_array_append VALUES (array(1, 2, 3), 4), (array(), 1), (NULL, 1), (array(1, 2), NULL)

query spark_answer_only
query
SELECT array_append(arr, val) FROM test_array_append

-- column + literal
query spark_answer_only
query
SELECT array_append(arr, 99) FROM test_array_append

-- literal + column
query spark_answer_only
query
SELECT array_append(array(1, 2, 3), val) FROM test_array_append

-- literal + literal
query ignore(https://github.com/apache/datafusion-comet/issues/3338)
query
SELECT array_append(array(1, 2, 3), 4), array_append(array(), 1), array_append(cast(NULL as array<int>), 1)
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.

-- Config: spark.comet.expression.ArrayExcept.allowIncompatible=true
-- ConfigMatrix: parquet.enable.dictionary=false,true

statement
Expand All @@ -23,17 +23,17 @@ CREATE TABLE test_array_except(a array<int>, b array<int>) USING parquet
statement
INSERT INTO test_array_except VALUES (array(1, 2, 3), array(2, 3, 4)), (array(1, 2), array()), (array(), array(1)), (NULL, array(1)), (array(1, NULL), array(NULL))

query spark_answer_only
query
SELECT array_except(a, b) FROM test_array_except

-- column + literal
query spark_answer_only
query
SELECT array_except(a, array(2, 3)) FROM test_array_except

-- literal + column
query spark_answer_only
query
SELECT array_except(array(1, 2, 3), b) FROM test_array_except

-- literal + literal
query ignore(https://github.com/apache/datafusion-comet/issues/3338)
query ignore(https://github.com/apache/datafusion-comet/issues/3646)
SELECT array_except(array(1, 2, 3), array(2, 3, 4)), array_except(array(1, 2), array()), array_except(array(), array(1)), array_except(cast(NULL as array<int>), array(1))
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.

-- Config: spark.comet.expression.ArrayIntersect.allowIncompatible=true
-- ConfigMatrix: parquet.enable.dictionary=false,true

statement
Expand All @@ -23,17 +24,17 @@ CREATE TABLE test_array_intersect(a array<int>, b array<int>) USING parquet
statement
INSERT INTO test_array_intersect VALUES (array(1, 2, 3), array(2, 3, 4)), (array(1, 2), array(3, 4)), (array(), array(1)), (NULL, array(1)), (array(1, NULL), array(NULL, 2))

query spark_answer_only
query
SELECT array_intersect(a, b) FROM test_array_intersect

-- column + literal
query spark_answer_only
query
SELECT array_intersect(a, array(2, 3)) FROM test_array_intersect

-- literal + column
query spark_answer_only
query
SELECT array_intersect(array(1, 2, 3), b) FROM test_array_intersect

-- literal + literal
query ignore(https://github.com/apache/datafusion-comet/issues/3338)
query
SELECT array_intersect(array(1, 2, 3), array(2, 3, 4)), array_intersect(array(1, 2), array(3, 4)), array_intersect(array(), array(1)), array_intersect(cast(NULL as array<int>), array(1))
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,5 @@ query spark_answer_only
SELECT array_max(arr) FROM test_array_max

-- literal arguments
query ignore(https://github.com/apache/datafusion-comet/issues/3338)
query
SELECT array_max(array(1, 2, 3)), array_max(array()), array_max(cast(NULL as array<int>))
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,5 @@ query spark_answer_only
SELECT array_min(arr) FROM test_array_min

-- literal arguments
query ignore(https://github.com/apache/datafusion-comet/issues/3338)
query
SELECT array_min(array(1, 2, 3)), array_min(array()), array_min(cast(NULL as array<int>))
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,5 @@ query spark_answer_only
SELECT array_remove(array(1, 2, 3, 2), val) FROM test_array_remove

-- literal + literal
query ignore(https://github.com/apache/datafusion-comet/issues/3338)
query
SELECT array_remove(array(1, 2, 3, 2), 2), array_remove(array(1, 2, 3), 4), array_remove(array(), 1), array_remove(cast(NULL as array<int>), 1)
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.

-- Config: spark.comet.expression.ArrayUnion.allowIncompatible=true
-- ConfigMatrix: parquet.enable.dictionary=false,true

statement
Expand All @@ -23,17 +24,17 @@ CREATE TABLE test_array_union(a array<int>, b array<int>) USING parquet
statement
INSERT INTO test_array_union VALUES (array(1, 2, 3), array(3, 4, 5)), (array(1, 2), array()), (array(), array(1)), (NULL, array(1)), (array(1, NULL), array(NULL, 2))

query spark_answer_only
query ignore(https://github.com/apache/datafusion-comet/issues/3644)
SELECT array_union(a, b) FROM test_array_union

-- column + literal
query spark_answer_only
query ignore(https://github.com/apache/datafusion-comet/issues/3644)
SELECT array_union(a, array(3, 4, 5)) FROM test_array_union

-- literal + column
query spark_answer_only
query
SELECT array_union(array(1, 2, 3), b) FROM test_array_union

-- literal + literal
query ignore(https://github.com/apache/datafusion-comet/issues/3338)
query
SELECT array_union(array(1, 2, 3), array(3, 4, 5)), array_union(array(1, 2), array()), array_union(array(), array(1)), array_union(cast(NULL as array<int>), array(1))
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.

-- Config: spark.comet.expression.ArraysOverlap.allowIncompatible=true
-- ConfigMatrix: parquet.enable.dictionary=false,true

statement
Expand All @@ -23,17 +24,17 @@ CREATE TABLE test_arrays_overlap(a array<int>, b array<int>) USING parquet
statement
INSERT INTO test_arrays_overlap VALUES (array(1, 2, 3), array(3, 4, 5)), (array(1, 2), array(3, 4)), (array(), array(1)), (NULL, array(1)), (array(1, NULL), array(NULL, 2))

query spark_answer_only
query ignore(https://github.com/apache/datafusion-comet/issues/3645)
SELECT arrays_overlap(a, b) FROM test_arrays_overlap

-- column + literal
query spark_answer_only
query ignore(https://github.com/apache/datafusion-comet/issues/3645)
SELECT arrays_overlap(a, array(3, 4, 5)) FROM test_arrays_overlap

-- literal + column
query spark_answer_only
query
SELECT arrays_overlap(array(1, 2, 3), b) FROM test_arrays_overlap

-- literal + literal
query ignore(https://github.com/apache/datafusion-comet/issues/3338)
query
SELECT arrays_overlap(array(1, 2, 3), array(3, 4, 5)), arrays_overlap(array(1, 2), array(3, 4)), arrays_overlap(array(), array(1)), arrays_overlap(cast(NULL as array<int>), array(1))
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,5 @@ query spark_answer_only
SELECT size(arr), size(m) FROM test_size

-- literal arguments
query ignore(https://github.com/apache/datafusion-comet/issues/3338)
query
SELECT size(array(1, 2, 3)), size(array()), size(cast(NULL as array<int>))
71 changes: 41 additions & 30 deletions spark/src/test/scala/org/apache/comet/CometSqlFileTestSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -83,39 +83,50 @@ class CometSqlFileTestSuite extends CometTestBase with AdaptiveSparkPlanHelper {
withTable(file.tables: _*) {
file.records.foreach {
case SqlStatement(sql, line) =>
val location = if (line > 0) s"$relativePath:$line" else relativePath
withClue(s"In SQL file $location, executing statement:\n$sql\n") {
spark.sql(sql)
try {
val location = if (line > 0) s"$relativePath:$line" else relativePath
withClue(s"In SQL file $location, executing statement:\n$sql\n") {
spark.sql(sql)
}
} catch {
case e: Exception =>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

output the failed SQL that causes the file to fail

throw new RuntimeException(s"Error executing SQL '$sql'", e)
}
case SqlQuery(sql, mode, line) =>
val location = if (line > 0) s"$relativePath:$line" else relativePath
withClue(s"In SQL file $location, executing query:\n$sql\n") {
mode match {
case CheckCoverageAndAnswer =>
checkSparkAnswerAndOperator(sql)
case SparkAnswerOnly =>
checkSparkAnswer(sql)
case WithTolerance(tol) =>
checkSparkAnswerWithTolerance(sql, tol)
case ExpectFallback(reason) =>
checkSparkAnswerAndFallbackReason(sql, reason)
case Ignore(reason) =>
logInfo(s"IGNORED query (${reason}): $sql")
case ExpectError(pattern) =>
val (sparkError, cometError) = checkSparkAnswerMaybeThrows(spark.sql(sql))
assert(
sparkError.isDefined,
s"Expected Spark to throw an error matching '$pattern' but query succeeded")
assert(
cometError.isDefined,
s"Expected Comet to throw an error matching '$pattern' but query succeeded")
assert(
sparkError.get.getMessage.contains(pattern),
s"Spark error '${sparkError.get.getMessage}' does not contain '$pattern'")
assert(
cometError.get.getMessage.contains(pattern),
s"Comet error '${cometError.get.getMessage}' does not contain '$pattern'")
try {
val location = if (line > 0) s"$relativePath:$line" else relativePath
withClue(s"In SQL file $location, executing query:\n$sql\n") {
mode match {
case CheckCoverageAndAnswer =>
checkSparkAnswerAndOperator(sql)
case SparkAnswerOnly =>
checkSparkAnswer(sql)
case WithTolerance(tol) =>
checkSparkAnswerWithTolerance(sql, tol)
case ExpectFallback(reason) =>
checkSparkAnswerAndFallbackReason(sql, reason)
case Ignore(reason) =>
logInfo(s"IGNORED query ($reason): $sql")
case ExpectError(pattern) =>
val (sparkError, cometError) = checkSparkAnswerMaybeThrows(spark.sql(sql))
assert(
sparkError.isDefined,
s"Expected Spark to throw an error matching '$pattern' but query succeeded")
assert(
cometError.isDefined,
s"Expected Comet to throw an error matching '$pattern' but query succeeded")
assert(
sparkError.get.getMessage.contains(pattern),
s"Spark error '${sparkError.get.getMessage}' does not contain '$pattern'")
assert(
cometError.get.getMessage.contains(pattern),
s"Comet error '${cometError.get.getMessage}' does not contain '$pattern'")
}
}

} catch {
case e: Exception =>
throw new RuntimeException(s"Error executing SQL '$sql'", e)
}
}
}
Expand Down
Loading