From 765b7a9e4cea0262d11235df30a374662077bac6 Mon Sep 17 00:00:00 2001 From: Minni Mittal Date: Tue, 19 May 2026 06:06:55 +0000 Subject: [PATCH 1/3] [GLUTEN-8629][VL] Add RDDScanExec support to Velox backend Implements VeloxRDDScanTransformer to offload RDDScanExec to Velox's native row-to-columnar conversion path. This enables columnar execution for DataFrames backed by LogicalRDD (e.g., from df.checkpoint(), df.localCheckpoint(), or programmatic RDD creation). Key design: - Validates schema via VeloxValidatorApi.validateSchema (recursive) plus Arrow compatibility checks (rejects MapType, interval types) - Handles BatchCarrierRow (from checkpoint) by unwrapping directly - Standard InternalRow path delegates to RowToVeloxColumnarExec.toColumnarBatchIterator - Preserves original partitioning and ordering from the source RDDScanExec - Unsupported schemas fall back gracefully to vanilla Spark Test coverage: 13 tests covering basic types, complex types, aggregation, empty RDD, nulls, fallback scenarios, and BatchCarrierRow from checkpoint. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../velox/VeloxSparkPlanExecApi.scala | 5 + .../execution/VeloxRDDScanTransformer.scala | 148 ++++++++++ .../gluten/execution/MiscOperatorSuite.scala | 5 +- .../gluten/execution/VeloxRDDScanSuite.scala | 265 ++++++++++++++++++ 4 files changed, 421 insertions(+), 2 deletions(-) create mode 100644 backends-velox/src/main/scala/org/apache/gluten/execution/VeloxRDDScanTransformer.scala create mode 100644 backends-velox/src/test/scala/org/apache/gluten/execution/VeloxRDDScanSuite.scala diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala index b7a1e172b2c..416b4df01da 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala @@ -1205,6 +1205,11 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi with Logging { override def genColumnarRangeExec(rangeExec: RangeExec): ColumnarRangeBaseExec = ColumnarRangeExec(rangeExec.range) + override def isSupportRDDScanExec(plan: RDDScanExec): Boolean = true + + override def getRDDScanTransform(plan: RDDScanExec): RDDScanTransformer = + VeloxRDDScanTransformer.replace(plan) + override def genColumnarTailExec(limit: Int, child: SparkPlan): ColumnarCollectTailBaseExec = ColumnarCollectTailExec(limit, child) diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxRDDScanTransformer.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxRDDScanTransformer.scala new file mode 100644 index 00000000000..6fb3cd971d3 --- /dev/null +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxRDDScanTransformer.scala @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.execution + +import org.apache.gluten.backendsapi.velox.VeloxValidatorApi +import org.apache.gluten.config.{GlutenConfig, VeloxConfig} + +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder} +import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.execution.{RDDScanTransformer, SparkPlan} +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} +import org.apache.spark.sql.types._ +import org.apache.spark.sql.vectorized.ColumnarBatch + +/** + * Velox-backend implementation of RDDScanTransformer. + * + * Converts an RDD[InternalRow] into columnar batches using Velox's native row-to-columnar + * conversion (same JNI path as RowToVeloxColumnarExec). + */ +case class VeloxRDDScanTransformer( + outputAttributes: Seq[Attribute], + rdd: RDD[InternalRow], + name: String, + // Row-to-columnar conversion preserves data distribution, so we carry through + // the original partitioning. This differs from CH which uses UnknownPartitioning(0) + // but is consistent with RowToVeloxColumnarExec's behavior. + override val outputPartitioning: Partitioning, + override val outputOrdering: Seq[SortOrder] +) extends RDDScanTransformer(outputAttributes, outputPartitioning, outputOrdering) + with Logging { + + override def nodeName: String = name + + @transient override lazy val metrics: Map[String, SQLMetric] = Map( + "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"), + "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of output batches"), + "convertTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to convert") + ) + + override protected def doValidateInternal(): ValidationResult = { + for (field <- schema.fields) { + val reason = VeloxValidatorApi.validateSchema(field.dataType) + if (reason.isDefined) { + return ValidationResult.failed(reason.get) + } + val arrowReason = validateArrowCompatibility(field.dataType) + if (arrowReason.isDefined) { + return ValidationResult.failed(arrowReason.get) + } + } + + ValidationResult.succeeded + } + + override def doExecuteColumnar(): RDD[ColumnarBatch] = { + val numInputRows = longMetric("numInputRows") + val numOutputBatches = longMetric("numOutputBatches") + val convertTime = longMetric("convertTime") + val localSchema = this.schema + val batchSize = GlutenConfig.get.maxBatchSize + val batchBytes = VeloxConfig.get.veloxPreferredBatchBytes + rdd.mapPartitions { + iter => + if (iter.hasNext) { + val first = iter.next() + first match { + case _: BatchCarrierRow => + // RDD already contains columnar batches wrapped as carrier rows + // (e.g., from df.checkpoint() on a Gluten plan). Unwrap directly. + (Iterator.single(first) ++ iter).flatMap { + row => + BatchCarrierRow.unwrap(row).map { + batch => + numOutputBatches += 1 + numInputRows += batch.numRows() + batch + } + } + case _ => + // Standard InternalRow path - convert via native row-to-columnar. + RowToVeloxColumnarExec.toColumnarBatchIterator( + Iterator.single(first) ++ iter, + localSchema, + numInputRows, + numOutputBatches, + convertTime, + batchSize, + batchBytes) + } + } else { + Iterator.empty + } + } + } + + /** + * Additional validation for Arrow export compatibility. The RDDScan path transfers data via + * Arrow ABI, which has stricter constraints than Velox's type system: + * - Map types can trigger "Map data key type should be a non-nullable" in Arrow export + * - Interval types are not supported by ArrowWritableColumnVector + */ + private def validateArrowCompatibility(dataType: DataType): Option[String] = { + dataType match { + case _: MapType => + Some(s"Map type is not supported in RDDScan Arrow export path: $dataType") + case _: YearMonthIntervalType | _: DayTimeIntervalType | CalendarIntervalType => + Some(s"Interval type is not supported in Arrow export: $dataType") + case struct: StructType => + struct.fields.flatMap(f => validateArrowCompatibility(f.dataType)).headOption + case array: ArrayType => + validateArrowCompatibility(array.elementType) + case _ => None + } + } + + override protected def withNewChildrenInternal(newChildren: IndexedSeq[SparkPlan]): SparkPlan = { + assert(newChildren.isEmpty, "VeloxRDDScanTransformer is a leaf node") + copy(outputAttributes, rdd, name, outputPartitioning, outputOrdering) + } +} + +object VeloxRDDScanTransformer { + def replace(plan: org.apache.spark.sql.execution.RDDScanExec): RDDScanTransformer = + VeloxRDDScanTransformer( + plan.output, + plan.inputRDD, + plan.nodeName, + plan.outputPartitioning, + plan.outputOrdering) +} diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala index fec55eff09e..1968da34b0d 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala @@ -805,10 +805,11 @@ class MiscOperatorSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa if (isSparkVersionGE("4.1")) { assert(plan.find(_.getClass.getSimpleName == "OneRowRelationExec").isDefined) } else { - assert(plan.find(_.isInstanceOf[RDDScanExec]).isDefined) + // RDDScanExec is offloaded to VeloxRDDScanTransformer which produces columnar + // output directly, so no RowToVeloxColumnarExec is needed. + assert(plan.find(_.isInstanceOf[VeloxRDDScanTransformer]).isDefined) } assert(plan.find(_.isInstanceOf[ProjectExecTransformer]).isDefined) - assert(plan.find(_.isInstanceOf[RowToVeloxColumnarExec]).isDefined) } test("equal null safe") { diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxRDDScanSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxRDDScanSuite.scala new file mode 100644 index 00000000000..41131614a49 --- /dev/null +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxRDDScanSuite.scala @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution + +import org.apache.gluten.execution._ + +import org.apache.spark.SparkConf +import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.classic.{ClassicDataset, ClassicTypes} +import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +import org.apache.spark.sql.types._ +import org.apache.spark.util.Utils + +class VeloxRDDScanSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPlanHelper { + + override protected val resourcePath: String = "/tpch-data-parquet" + override protected val fileFormat: String = "parquet" + + override protected def sparkConf: SparkConf = { + super.sparkConf + .set("spark.sql.ansi.enabled", "false") + } + + override def beforeAll(): Unit = { + super.beforeAll() + createTPCHNotNullTables() + } + + /** Creates a DataFrame backed by LogicalRDD/RDDScanExec from an existing DataFrame. */ + private def asRDDScanDF(data: DataFrame): DataFrame = { + val node = LogicalRDD(data.queryExecution.logical.output, data.queryExecution.toRdd)( + data.sparkSession.asInstanceOf[ClassicTypes.ClassicSparkSession]) + ClassicDataset.ofRows(spark, node).toDF() + } + + test("basic RDDScanExec is replaced by VeloxRDDScanTransformer") { + val data = spark.sql("SELECT l_orderkey, l_partkey FROM lineitem LIMIT 10") + val expectedAnswer = data.collect() + val df = asRDDScanDF(data) + + checkAnswer(df, expectedAnswer) + val cnt = collect(df.queryExecution.executedPlan) { case _: VeloxRDDScanTransformer => true } + assert(cnt.nonEmpty, "Expected VeloxRDDScanTransformer in plan") + } + + test("RDDScan with string and numeric types") { + val data = spark.sql("""SELECT l_returnflag, l_linestatus, l_quantity, l_extendedprice + |FROM lineitem LIMIT 20""".stripMargin) + val expectedAnswer = data.collect() + val df = asRDDScanDF(data) + + checkAnswer(df, expectedAnswer) + val cnt = collect(df.queryExecution.executedPlan) { case _: VeloxRDDScanTransformer => true } + assert(cnt.nonEmpty, "Expected VeloxRDDScanTransformer in plan") + } + + test("RDDScan with aggregation downstream") { + val query = + """SELECT l_returnflag, sum(l_quantity) AS sum_qty + |FROM lineitem + |WHERE l_shipdate <= date'1998-09-02' + |GROUP BY l_returnflag""".stripMargin + val data = spark.sql(query) + val expectedAnswer = data.collect() + val df = asRDDScanDF(data) + + checkAnswer(df, expectedAnswer) + val cnt = collect(df.queryExecution.executedPlan) { case _: VeloxRDDScanTransformer => true } + assert(cnt.nonEmpty, "Expected VeloxRDDScanTransformer in plan") + } + + test("RDDScan with empty RDD") { + val data = spark.sql("SELECT l_orderkey FROM lineitem WHERE 1 = 0") + val expectedAnswer = data.collect() + val df = asRDDScanDF(data) + + checkAnswer(df, expectedAnswer) + assert(df.count() == 0) + val cnt = collect(df.queryExecution.executedPlan) { case _: VeloxRDDScanTransformer => true } + assert(cnt.nonEmpty, "Expected VeloxRDDScanTransformer in plan") + } + + test("RDDScan preserves data correctness with multiple re-reads") { + val data = spark.sql("SELECT l_orderkey, l_partkey FROM lineitem LIMIT 50") + val expectedAnswer = data.collect() + val df = asRDDScanDF(data) + + // Read twice to verify idempotency + checkAnswer(df, expectedAnswer) + checkAnswer(df, expectedAnswer) + val cnt = collect(df.queryExecution.executedPlan) { case _: VeloxRDDScanTransformer => true } + assert(cnt.nonEmpty, "Expected VeloxRDDScanTransformer in plan") + } + + test("RDDScan with null values") { + val rdd = spark.sparkContext.parallelize( + Seq( + Row(1, "a", null), + Row(null, "b", 2.0), + Row(3, null, 3.0) + )) + val schema = StructType( + Seq( + StructField("id", IntegerType, nullable = true), + StructField("name", StringType, nullable = true), + StructField("value", DoubleType, nullable = true) + )) + val data = spark.createDataFrame(rdd, schema) + val expectedAnswer = data.collect() + val df = asRDDScanDF(data) + + checkAnswer(df, expectedAnswer) + val cnt = collect(df.queryExecution.executedPlan) { case _: VeloxRDDScanTransformer => true } + assert(cnt.nonEmpty, "Expected VeloxRDDScanTransformer in plan") + } + + test("RDDScan with all supported primitive types") { + val rdd = spark.sparkContext.parallelize( + Seq( + Row( + true, + 1.toByte, + 2.toShort, + 3, + 4L, + 5.0f, + 6.0, + "hello", + java.sql.Date.valueOf("2024-01-01"), + java.sql.Timestamp.valueOf("2024-01-01 12:00:00"), + Array[Byte](1, 2, 3), + BigDecimal("123.45").underlying() + ) + )) + val schema = StructType( + Seq( + StructField("bool", BooleanType), + StructField("byte", ByteType), + StructField("short", ShortType), + StructField("int", IntegerType), + StructField("long", LongType), + StructField("float", FloatType), + StructField("double", DoubleType), + StructField("string", StringType), + StructField("date", DateType), + StructField("timestamp", TimestampType), + StructField("binary", BinaryType), + StructField("decimal", DecimalType(10, 2)) + )) + val data = spark.createDataFrame(rdd, schema) + val expectedAnswer = data.collect() + val df = asRDDScanDF(data) + + checkAnswer(df, expectedAnswer) + val cnt = collect(df.queryExecution.executedPlan) { case _: VeloxRDDScanTransformer => true } + assert(cnt.nonEmpty, "Expected VeloxRDDScanTransformer in plan") + } + + test("RDDScan with array type") { + val rdd = spark.sparkContext.parallelize( + Seq( + Row(Seq(1, 2, 3)), + Row(Seq(4, 5)) + )) + val schema = StructType(Seq(StructField("arr", ArrayType(IntegerType)))) + val data = spark.createDataFrame(rdd, schema) + val expectedAnswer = data.collect() + val df = asRDDScanDF(data) + + checkAnswer(df, expectedAnswer) + val cnt = collect(df.queryExecution.executedPlan) { case _: VeloxRDDScanTransformer => true } + assert(cnt.nonEmpty, "Expected VeloxRDDScanTransformer in plan") + } + + test("RDDScan with map type falls back to row-based") { + val rdd = spark.sparkContext.parallelize( + Seq( + Row(Map("a" -> 1, "b" -> 2)), + Row(Map("c" -> 3)) + )) + val schema = StructType(Seq(StructField("m", MapType(StringType, IntegerType)))) + val data = spark.createDataFrame(rdd, schema) + val expectedAnswer = data.collect() + val df = asRDDScanDF(data) + + // MapType is not supported in Arrow export, so falls back to row-based processing + checkAnswer(df, expectedAnswer) + val cnt = collect(df.queryExecution.executedPlan) { case _: VeloxRDDScanTransformer => true } + assert(cnt.isEmpty, "MapType schema should fall back from VeloxRDDScanTransformer") + } + + test("RDDScan with struct type") { + val rdd = spark.sparkContext.parallelize( + Seq( + Row(Row("hello", 1)), + Row(Row("world", 2)) + )) + val innerSchema = + StructType(Seq(StructField("name", StringType), StructField("value", IntegerType))) + val schema = StructType(Seq(StructField("s", innerSchema))) + val data = spark.createDataFrame(rdd, schema) + val expectedAnswer = data.collect() + val df = asRDDScanDF(data) + + checkAnswer(df, expectedAnswer) + val cnt = collect(df.queryExecution.executedPlan) { case _: VeloxRDDScanTransformer => true } + assert(cnt.nonEmpty, "Expected VeloxRDDScanTransformer in plan") + } + + test("RDDScan falls back for unsupported types") { + val data = spark.sql("SELECT INTERVAL '1' DAY AS di") + val expectedAnswer = data.collect() + val result = asRDDScanDF(data) + + // Should still produce correct results via fallback to vanilla Spark + checkAnswer(result, expectedAnswer) + val cnt = collect(result.queryExecution.executedPlan) { + case _: VeloxRDDScanTransformer => true + } + assert(cnt.isEmpty, "Expected fallback - VeloxRDDScanTransformer should NOT be in plan") + } + + test("RDDScan handles BatchCarrierRow from checkpoint") { + val tempDir = Utils.createTempDir() + try { + spark.sparkContext.setCheckpointDir(tempDir.getAbsolutePath) + val df = spark.range(100).selectExpr("id", "id * 2 as value") + val checkpointed = df.localCheckpoint() + val result = asRDDScanDF(checkpointed) + + checkAnswer(result, df.collect()) + val cnt = collect(result.queryExecution.executedPlan) { + case _: VeloxRDDScanTransformer => true + } + assert(cnt.nonEmpty, "Expected VeloxRDDScanTransformer in plan") + } finally { + Utils.deleteRecursively(tempDir) + } + } + + test("falls back for schemas with interval types") { + val df = spark.sql("SELECT INTERVAL '1' YEAR as y") + val rddDf = asRDDScanDF(df) + checkAnswer(rddDf, df.collect()) + // Should NOT use VeloxRDDScanTransformer (falls back due to interval type) + val veloxScans = collect(rddDf.queryExecution.executedPlan) { + case _: VeloxRDDScanTransformer => true + } + assert(veloxScans.isEmpty, "Interval type schema should fall back from VeloxRDDScanTransformer") + } +} From f6a96ffc3be5d8db179a748ac370273af622de08 Mon Sep 17 00:00:00 2001 From: Minni Mittal Date: Tue, 19 May 2026 08:35:35 +0000 Subject: [PATCH 2/3] Fix scalafmt formatting in VeloxRDDScanTransformer scaladoc Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../apache/gluten/execution/VeloxRDDScanTransformer.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxRDDScanTransformer.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxRDDScanTransformer.scala index 6fb3cd971d3..15432174d02 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxRDDScanTransformer.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxRDDScanTransformer.scala @@ -112,10 +112,10 @@ case class VeloxRDDScanTransformer( } /** - * Additional validation for Arrow export compatibility. The RDDScan path transfers data via - * Arrow ABI, which has stricter constraints than Velox's type system: - * - Map types can trigger "Map data key type should be a non-nullable" in Arrow export - * - Interval types are not supported by ArrowWritableColumnVector + * Additional validation for Arrow export compatibility. The RDDScan path transfers data via Arrow + * ABI, which has stricter constraints than Velox's type system: + * - Map types can trigger "Map data key type should be a non-nullable" in Arrow export + * - Interval types are not supported by ArrowWritableColumnVector */ private def validateArrowCompatibility(dataType: DataType): Option[String] = { dataType match { From 193ac359f5679abb9e5f6c781de834ccb53005c3 Mon Sep 17 00:00:00 2001 From: Minni Mittal Date: Tue, 19 May 2026 12:30:09 +0000 Subject: [PATCH 3/3] Fix VeloxRDDScanSuite: use analyzed.output for resolved attributes The logical plan's output contains unresolved attributes which cause AnalysisException when creating a new DataFrame from LogicalRDD. Use analyzed.output to get fully resolved attributes with proper types and expression IDs. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../scala/org/apache/gluten/execution/VeloxRDDScanSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxRDDScanSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxRDDScanSuite.scala index 41131614a49..ff07923e5c1 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxRDDScanSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxRDDScanSuite.scala @@ -42,7 +42,7 @@ class VeloxRDDScanSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa /** Creates a DataFrame backed by LogicalRDD/RDDScanExec from an existing DataFrame. */ private def asRDDScanDF(data: DataFrame): DataFrame = { - val node = LogicalRDD(data.queryExecution.logical.output, data.queryExecution.toRdd)( + val node = LogicalRDD(data.queryExecution.analyzed.output, data.queryExecution.toRdd)( data.sparkSession.asInstanceOf[ClassicTypes.ClassicSparkSession]) ClassicDataset.ofRows(spark, node).toDF() }