Skip to content

Commit 9df2e94

Browse files
author
Mariam-Almesfer
committed
Add validation test for localtimestamp
1 parent 5b4421a commit 9df2e94

5 files changed

Lines changed: 61 additions & 2 deletions

File tree

backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,11 +104,18 @@ class VeloxValidatorApi extends ValidatorApi {
104104

105105
object VeloxValidatorApi {
106106
/**
 * Reports whether `dataType` is one of the primitive types this validator accepts.
 *
 * TimestampNTZ is accepted only when `VeloxConfig.get.enableTimestampNtzValidation` is false
 * (intended for development/testing of TimestampNTZ support).
 *
 * @param dataType
 *   the Spark data type to classify
 * @return
 *   true for the listed primitive types, and for TimestampNTZ when its validation is disabled
 */
private def isPrimitiveType(dataType: DataType): Boolean = {
  dataType match {
    case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType |
        StringType | BinaryType | _: DecimalType | DateType | TimestampType |
        YearMonthIntervalType.DEFAULT | NullType =>
      true
    // TimestampNTZType is a singleton object, so its runtime class is `TimestampNTZType$` and a
    // getSimpleName comparison against "TimestampNTZType" never matches. Identify the type by
    // its catalog string instead; this still avoids a compile-time dependency on Spark 3.4+.
    case dt if dt.catalogString == "timestamp_ntz" =>
      // The flag is consulted only for TimestampNTZ, so other types never touch the config.
      !VeloxConfig.get.enableTimestampNtzValidation
    case _ => false
  }
}

backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,8 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) {
9999

100100
def valueStreamDynamicFilterEnabled: Boolean =
101101
getConf(VALUE_STREAM_DYNAMIC_FILTER_ENABLED)
102+
103+
// Returns the value of the ENABLE_TIMESTAMP_NTZ_VALIDATION config entry.
def enableTimestampNtzValidation: Boolean = getConf(ENABLE_TIMESTAMP_NTZ_VALIDATION)
102104
}
103105

104106
object VeloxConfig extends ConfigRegistry {
@@ -751,4 +753,13 @@ object VeloxConfig extends ConfigRegistry {
751753
.doc("Maps table field names to file field names using names, not indices for Parquet files.")
752754
.booleanConf
753755
.createWithDefault(true)
756+
757+
// Boolean switch for the TimestampNTZ fallback validation; defaults to true.
val ENABLE_TIMESTAMP_NTZ_VALIDATION =
  buildConf("spark.gluten.sql.columnar.backend.velox.enableTimestampNtzValidation")
    .doc(
      "Enable validation fallback for TimestampNTZ type. " +
        "When true (default), any plan containing TimestampNTZ will fall back to Spark " +
        "execution. Set to false during development/testing of TimestampNTZ support to " +
        "allow native execution.")
    .booleanConf
    .createWithDefault(true)
754765
}

backends-velox/src/test/scala/org/apache/gluten/functions/ScalarFunctionsValidateSuite.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1507,4 +1507,15 @@ abstract class ScalarFunctionsValidateSuite extends FunctionsValidateSuite {
15071507
}
15081508
}
15091509
}
1510+
test("test_current_timestamp") {
  val query = "SELECT l_orderkey, current_timestamp() from lineitem limit 1"
  val df = spark.sql(query)

  // The optimized plan should no longer mention CurrentTimestamp once it has been folded.
  val planText = df.queryExecution.optimizedPlan.toString()
  assert(
    !planText.contains("CurrentTimestamp"),
    s"Expected CurrentTimestamp to be folded to a literal, but got: $planText")

  // The projection must be offloaded and nothing may fall back.
  checkGlutenPlan[ProjectExecTransformer](df)
  checkFallbackOperators(df, 0)
  df.collect()
}
15101521
}

docs/velox-configuration.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ nav_order: 16
2727
| spark.gluten.sql.columnar.backend.velox.cudf.memoryResource | async | GPU RMM memory resource. |
2828
| spark.gluten.sql.columnar.backend.velox.cudf.shuffleMaxPrefetchBytes | 1028MB | Maximum bytes to prefetch in CPU memory during GPU shuffle read while waiting for the GPU to become available. |
2929
| spark.gluten.sql.columnar.backend.velox.directorySizeGuess | 32KB | Deprecated, rename to spark.gluten.sql.columnar.backend.velox.footerEstimatedSize |
30+
| spark.gluten.sql.columnar.backend.velox.enableTimestampNtzValidation | true | Enable validation fallback for TimestampNTZ type. When true (default), any plan containing TimestampNTZ will fall back to Spark execution. Set to false during development/testing of TimestampNTZ support to allow native execution. |
3031
| spark.gluten.sql.columnar.backend.velox.fileHandleCacheEnabled | false | Disables caching if false. File handle cache should be disabled if files are mutable, i.e. file content may change while file path stays the same. |
3132
| spark.gluten.sql.columnar.backend.velox.filePreloadThreshold | 1MB | Set the file preload threshold for velox file scan, refer to Velox's file-preload-threshold |
3233
| spark.gluten.sql.columnar.backend.velox.floatingPointMode | loose | Config used to control the tolerance of floating point operations alignment with Spark. When the mode is set to strict, flushing is disabled for sum(float/double) and avg(float/double). When set to loose, flushing will be enabled. |

gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,18 @@ object Validators {
4040
private val conf = GlutenConfig.get
4141
private val settings = BackendsApiManager.getSettings
4242

43+
// Looks up `org.apache.gluten.config.VeloxConfig` by name and calls its static `get` method
// reflectively. Any Exception raised during the lookup or the call results in None.
private val veloxConf: Option[Any] =
  try {
    val instance = org.apache.spark.util.Utils
      .classForName("org.apache.gluten.config.VeloxConfig")
      .getMethod("get")
      .invoke(null)
    Some(instance)
  } catch {
    case _: Exception => None
  }
54+
4355
/** Fails validation if a plan node was already tagged with TRANSFORM_UNSUPPORTED. */
4456
def fallbackByHint(): Validator.Builder = {
4557
builder.add(FallbackByHint)
@@ -81,7 +93,7 @@ object Validators {
8193

8294
/** Fails validation if a plan node's input or output schema contains TimestampNTZType. */
8395
def fallbackByTimestampNTZ(): Validator.Builder = {
84-
builder.add(new FallbackByTimestampNTZ())
96+
builder.add(new FallbackByTimestampNTZ(veloxConf))
8597
}
8698

8799
/**
@@ -218,8 +230,25 @@ object Validators {
218230
}
219231
}
220232

221-
private class FallbackByTimestampNTZ() extends Validator {
233+
private class FallbackByTimestampNTZ(veloxConf: Option[Any]) extends Validator {
234+
// Reflectively reads `enableTimestampNtzValidation` from the supplied config object.
// Defaults to true when no config object was supplied or the reflective call throws.
private val enableValidation: Boolean = veloxConf.forall {
  config =>
    try {
      config.getClass
        .getMethod("enableTimestampNtzValidation")
        .invoke(config)
        .asInstanceOf[Boolean]
    } catch {
      case _: Exception => true
    }
}
245+
222246
override def validate(plan: SparkPlan): Validator.OutCome = {
247+
if (!enableValidation) {
248+
// Validation is disabled, allow TimestampNTZ
249+
return pass()
250+
}
251+
223252
def containsNTZ(dataType: DataType): Boolean = dataType match {
224253
case dt if dt.catalogString == "timestamp_ntz" => true
225254
case st: StructType => st.exists(f => containsNTZ(f.dataType))

0 commit comments

Comments
 (0)