
Conversation

@guixiaowen
Contributor

Which issue does this PR close?

Closes #1994

Rationale for this change

What changes are included in this PR?

Are there any user-facing changes?

How was this patch tested?

Comment on lines +27 to +31
override def isEnabled: Boolean =
getBooleanConf("spark.auron.enable.hiveTable", defaultValue = true)

def enableHiveTableScanExec: Boolean =
getBooleanConf("spark.auron.enable.hiveTableScanExec", defaultValue = false)
Contributor

Should we be adding these to @SparkAuronConfiguration.java?

Comment on lines +33 to +40
override def isSupported(exec: SparkPlan): Boolean =
exec match {
case e: HiveTableScanExec if enableHiveTableScanExec &&
e.relation.tableMeta.provider.isDefined &&
e.relation.tableMeta.provider.get.equals("hive") =>
true
case _ => false
}
Contributor

It's slightly hard to distinguish when to use isSupported vs. the enableHiveTableScanExec flag. Do you mind adding documentation here?
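A minimal documentation sketch along those lines (the wording below is illustrative, not the author's):

/** Top-level switch for the Hive conversion provider. When false, no Hive
 *  plan nodes are considered for native conversion at all. */
override def isEnabled: Boolean =
  getBooleanConf("spark.auron.enable.hiveTable", defaultValue = true)

/** Fine-grained switch for converting HiveTableScanExec specifically; checked
 *  inside isSupported together with the table's provider. */
def enableHiveTableScanExec: Boolean =
  getBooleanConf("spark.auron.enable.hiveTableScanExec", defaultValue = false)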

.newBuilder()
.setBaseConf(nativeFileScanConf)
.setFsResourceId(resourceId)
.addAllPruningPredicates(new java.util.ArrayList()) // not support this filter
Contributor

So we will do a full table scan here? If that's the case, do you mind creating an issue and linking it here?

@ShreyeshArangath
Contributor

ShreyeshArangath commented Feb 10, 2026

Are there tests we can add to verify conversion behavior here?

Copilot AI left a comment

Pull request overview

This PR aims to add native execution support for Spark HiveTableScanExec by introducing a Hive-specific conversion provider and a new native scan exec implementation, plus updating build dependencies to compile against Spark Hive classes.

Changes:

  • Add HiveConvertProvider and a new NativeHiveTableScanExec implementation intended to convert HiveTableScanExec to native ORC/Parquet scans.
  • Update spark-extension-shims-spark Maven dependencies to include Spark Hive/Catalyst APIs needed by the new shims code.
  • Refactor imports in AuronConverters and add a (currently unimplemented) convertHiveTableScanExec stub.

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 19 comments.

File | Description
  • spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala: Import refactor; adds an unimplemented convertHiveTableScanExec method; currently introduces compile issues (missing import + stub).
  • spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHIveTableScanExec.scala: New native Hive table scan exec; currently contains multiple compile/runtime issues (API mismatches, pattern matches, session usage, unsupported formats).
  • spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/HiveConvertProvider.scala: New ServiceLoader-based conversion provider for Hive scans; currently missing ServiceLoader registration and insufficient support checks.
  • spark-extension-shims-spark/pom.xml: Adds Spark Hive/Catalyst dependencies, but introduces duplicate spark-hive dependency entries with conflicting scopes.


Comment on lines +33 to +38
override def isSupported(exec: SparkPlan): Boolean =
exec match {
case e: HiveTableScanExec if enableHiveTableScanExec &&
e.relation.tableMeta.provider.isDefined &&
e.relation.tableMeta.provider.get.equals("hive") =>
true
Copilot AI Feb 11, 2026

isSupported accepts all Hive tables with provider == "hive", but NativeHiveTableScanExec only builds native nodes for ORC/Parquet and otherwise will throw (e.g., MatchError on file format). Add an explicit format check here (or make the native exec gracefully fall back) to avoid runtime failures on non-ORC/Parquet Hive tables.

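One possible shape for the guard, sketched assuming e.relation.tableMeta.storage.inputFormat (Spark's CatalogStorageFormat field) is accessible here; this is not the PR's code:

override def isSupported(exec: SparkPlan): Boolean = exec match {
  case e: HiveTableScanExec if enableHiveTableScanExec &&
      e.relation.tableMeta.provider.contains("hive") &&
      isOrcOrParquet(e.relation.tableMeta.storage.inputFormat) =>
    true
  case _ => false
}

// Only ORC and Parquet input formats have a native scan path in this PR.
private def isOrcOrParquet(inputFormat: Option[String]): Boolean =
  inputFormat.exists(cls =>
    cls.contains("OrcInputFormat") || cls.contains("MapredParquetInputFormat"))
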
nativeTable.getMetadata)

@transient private lazy val nativeHadoopConf = {
val hiveConf = SparkSession.getActiveSession.get.sessionState.newHadoopConf()
Copilot AI Feb 11, 2026

nativeHadoopConf uses SparkSession.getActiveSession.get, which can throw if there is no active session (e.g., execution triggered outside a SQL context). Consider using the same session derivation as NativeHiveTableScanBase.broadcastedHadoopConf (Shims.get.getSqlContext(basedHiveScan).sparkSession) to avoid runtime failures.

Suggested change
val hiveConf = SparkSession.getActiveSession.get.sessionState.newHadoopConf()
val sparkSession = Shims.get.getSqlContext(basedHiveScan).sparkSession
val hiveConf = sparkSession.sessionState.newHadoopConf()

Comment on lines +91 to +95
private val minPartitions = if (SparkSession.getActiveSession.get.sparkContext.isLocal) {
0 // will splitted based on block by default.
} else {
math.max(nativeHadoopConf.getInt("mapreduce.job.maps", 1),
SparkSession.getActiveSession.get.sparkContext.defaultMinPartitions)
Copilot AI Feb 11, 2026

minPartitions reads SparkSession.getActiveSession.get.sparkContext multiple times. Besides the .get risk, it’s also inconsistent with other native scan implementations in this repo that pass an explicit sparkSession around. Prefer using a single sparkSession resolved from basedHiveScan and derive sparkContext from it.

Comment on lines +91 to +99
private val minPartitions = if (SparkSession.getActiveSession.get.sparkContext.isLocal) {
0 // will splitted based on block by default.
} else {
math.max(nativeHadoopConf.getInt("mapreduce.job.maps", 1),
SparkSession.getActiveSession.get.sparkContext.defaultMinPartitions)
}

private val ignoreEmptySplits =
SparkSession.getActiveSession.get.sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)
Copilot AI Feb 11, 2026

ignoreEmptySplits also depends on SparkSession.getActiveSession.get. This should use the same non-optional session/context resolution as the rest of the execution code to avoid NoSuchElementException when there is no active session.

Suggested change
private val minPartitions = if (SparkSession.getActiveSession.get.sparkContext.isLocal) {
0 // will splitted based on block by default.
} else {
math.max(nativeHadoopConf.getInt("mapreduce.job.maps", 1),
SparkSession.getActiveSession.get.sparkContext.defaultMinPartitions)
}
private val ignoreEmptySplits =
SparkSession.getActiveSession.get.sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)
private val minPartitions = if (sparkContext.isLocal) {
0 // will splitted based on block by default.
} else {
math.max(nativeHadoopConf.getInt("mapreduce.job.maps", 1),
sparkContext.defaultMinPartitions)
}
private val ignoreEmptySplits =
sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)

Comment on lines +244 to +248
private def getInputFormat(conf: JobConf, inputFormatClass: Class[newInputClass[Writable, Writable]]):
InputFormat[Writable, Writable] = {
val newInputFormat = ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
.asInstanceOf[InputFormat[Writable, Writable]]
newInputFormat match {
Copilot AI Feb 11, 2026

getInputFormat takes a mapreduce.InputFormat class (newInputClass) but returns/instantiates org.apache.hadoop.mapred.InputFormat. This signature mismatch makes the unchecked cast even riskier. Align the parameter type with the returned InputFormat type (or vice versa) so the compiler can help enforce correctness.

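A sketch of one way to align the types, assuming callers can supply the mapred-style class (hypothetical signature, not the PR's code):

import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.{InputFormat, JobConf}
import org.apache.hadoop.util.ReflectionUtils

private def getInputFormat(
    conf: JobConf,
    inputFormatClass: Class[_ <: InputFormat[Writable, Writable]])
    : InputFormat[Writable, Writable] = {
  // ReflectionUtils.newInstance already applies the JobConf to Configurable and
  // JobConfigurable instances, so no unchecked cast is needed once the parameter
  // type matches the return type.
  ReflectionUtils.newInstance(inputFormatClass, conf)
}
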
Comment on lines +206 to +213
getMaxSplitBytes(SparkSession.getActiveSession.get)).toArray
arrayFilePartition.toArray
}

private def getMaxSplitBytes(sparkSession: SparkSession): Long = {
val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
Math.min(defaultMaxSplitBytes, openCostInBytes)
Copilot AI Feb 11, 2026

getMaxSplitBytes currently returns min(filesMaxPartitionBytes, filesOpenCostInBytes), which can drastically shrink splits and create excessive partitions. Elsewhere in this repo (NativePaimonTableScanExec) you fork Spark’s FilePartition#maxSplitBytes logic using min(defaultMaxSplitBytes, max(openCostInBytes, bytesPerCore)). Align this implementation to that logic (or call the shared helper) to avoid performance regressions.

Suggested change
getMaxSplitBytes(SparkSession.getActiveSession.get)).toArray
arrayFilePartition.toArray
}
private def getMaxSplitBytes(sparkSession: SparkSession): Long = {
val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
Math.min(defaultMaxSplitBytes, openCostInBytes)
getMaxSplitBytes(SparkSession.getActiveSession.get, partitionedFiles)).toArray
arrayFilePartition.toArray
}
private def getMaxSplitBytes(
sparkSession: SparkSession,
partitionedFiles: Seq[PartitionedFile]): Long = {
val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
val totalBytes = partitionedFiles.map(_.length).sum
val parallelism = math.max(1, sparkSession.sparkContext.defaultParallelism)
val bytesPerCore = if (totalBytes <= 0L) openCostInBytes else totalBytes / parallelism
Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))

Comment on lines +26 to +32
class HiveConvertProvider extends AuronConvertProvider with Logging {
override def isEnabled: Boolean =
getBooleanConf("spark.auron.enable.hiveTable", defaultValue = true)

def enableHiveTableScanExec: Boolean =
getBooleanConf("spark.auron.enable.hiveTableScanExec", defaultValue = false)

Copilot AI Feb 11, 2026

HiveConvertProvider is discovered via ServiceLoader (AuronConverters loads AuronConvertProvider implementations). This module currently doesn’t include a META-INF/services/org.apache.spark.sql.auron.AuronConvertProvider entry, so the provider won’t be loaded at runtime (see thirdparty/auron-paimon for the existing pattern).

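For reference, the registration is a one-line resource file; the exact path below is inferred from this module's layout and should be double-checked against the auron-paimon example:

spark-extension-shims-spark/src/main/resources/META-INF/services/org.apache.spark.sql.auron.AuronConvertProvider

containing the single line:

org.apache.spark.sql.hive.execution.auron.plan.HiveConvertProvider
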
Comment on lines +50 to +52
def convertHiveTableScanExec(hiveExec: HiveTableScanExec): SparkPlan = {
AuronConverters.addRenameColumnsExec(NativeHiveTableScanExec(hiveExec))
}
Copilot AI Feb 11, 2026

There are existing query/operator validation test utilities in this module, but this new Hive scan conversion path doesn’t appear to have coverage. Add at least one suite that enables spark.auron.enable.hiveTableScanExec and verifies HiveTableScanExec is converted (and that unsupported formats don’t break execution).

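A minimal sketch of such a suite; the ORC table setup, the plan-string assertion, and the assumption that the Auron extension is already wired into the session are illustrative rather than based on this module's existing test utilities:

import org.apache.spark.sql.SparkSession
import org.scalatest.funsuite.AnyFunSuite

class NativeHiveTableScanSuite extends AnyFunSuite {
  test("HiveTableScanExec is converted when spark.auron.enable.hiveTableScanExec is set") {
    val spark = SparkSession.builder()
      .master("local[1]")
      .config("spark.auron.enable.hiveTableScanExec", "true")
      .enableHiveSupport()
      .getOrCreate()
    try {
      spark.sql("CREATE TABLE IF NOT EXISTS t_orc (id INT) STORED AS ORC")
      spark.sql("INSERT INTO t_orc VALUES (1)")
      val plan = spark.sql("SELECT id FROM t_orc").queryExecution.executedPlan
      // Expect the native scan operator to replace HiveTableScanExec in the plan.
      assert(plan.toString.contains("NativeHiveTableScan"))
    } finally {
      spark.stop()
    }
  }
}
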
Comment on lines +204 to +206
arrayFilePartition += FilePartition.getFilePartitions(SparkSession.getActiveSession.get,
partitionedFiles,
getMaxSplitBytes(SparkSession.getActiveSession.get)).toArray
Copilot AI Feb 11, 2026

FilePartition.getFilePartitions(SparkSession.getActiveSession.get, ...) again relies on getActiveSession.get. Use the sparkSession derived from basedHiveScan (as in NativeHiveTableScanBase) so partition planning doesn’t fail when there’s no active session.

Suggested change
arrayFilePartition += FilePartition.getFilePartitions(SparkSession.getActiveSession.get,
partitionedFiles,
getMaxSplitBytes(SparkSession.getActiveSession.get)).toArray
val sparkSession = basedHiveScan.sparkSession
arrayFilePartition += FilePartition.getFilePartitions(
sparkSession,
partitionedFiles,
getMaxSplitBytes(sparkSession)
).toArray

Comment on lines +225 to +229
inputFormatClass match {
case OrcInputFormat =>
case MapredParquetInputFormat =>
case _ =>
}
Copilot AI Feb 11, 2026

inputFormatClass match { case OrcInputFormat => ... } is matching a Class[_] value against a class name, and the cases are empty. If you need special handling by input format, compare against classOf[OrcInputFormat] / classOf[MapredParquetInputFormat] and implement the intended behavior; otherwise remove this dead code block.

Suggested change
inputFormatClass match {
case OrcInputFormat =>
case MapredParquetInputFormat =>
case _ =>
}

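If format-specific handling is actually intended, a sketch of the classOf-based comparison the comment describes; the imports are assumptions about which OrcInputFormat/MapredParquetInputFormat classes the file uses:

import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat

// Hypothetical helper: compare the Class value itself instead of pattern-matching
// against companion objects, and make each branch do real work (or drop the block).
private def describeFormat(inputFormatClass: Class[_]): String =
  if (inputFormatClass == classOf[OrcInputFormat]) "orc"
  else if (inputFormatClass == classOf[MapredParquetInputFormat]) "parquet"
  else "other"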