[ARUON #1994] Support HiveTableScanExec to native #1995
base: master
Conversation
```scala
  override def isEnabled: Boolean =
    getBooleanConf("spark.auron.enable.hiveTable", defaultValue = true)

  def enableHiveTableScanExec: Boolean =
    getBooleanConf("spark.auron.enable.hiveTableScanExec", defaultValue = false)
```
Should we be adding these to @SparkAuronConfiguration.java?
```scala
  override def isSupported(exec: SparkPlan): Boolean =
    exec match {
      case e: HiveTableScanExec if enableHiveTableScanExec &&
          e.relation.tableMeta.provider.isDefined &&
          e.relation.tableMeta.provider.get.equals("hive") =>
        true
      case _ => false
    }
```
It's slightly hard to distinguish when to use isSupported vs. the enableHiveTableScanExec flag. Do you mind adding documentation here?
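A rough sketch of the kind of doc comment that could spell this out; the wording is illustrative, not taken from the PR:

```scala
  /**
   * Gating works in two layers: `isEnabled` (spark.auron.enable.hiveTable) switches the whole
   * provider on or off, while `enableHiveTableScanExec` (spark.auron.enable.hiveTableScanExec,
   * off by default) additionally gates the HiveTableScanExec conversion. `isSupported` then
   * checks a concrete plan node and only accepts a HiveTableScanExec whose catalog table
   * provider is "hive".
   */
  override def isSupported(exec: SparkPlan): Boolean =
    exec match {
      case e: HiveTableScanExec if enableHiveTableScanExec &&
          e.relation.tableMeta.provider.isDefined &&
          e.relation.tableMeta.provider.get.equals("hive") =>
        true
      case _ => false
    }
```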
```scala
      .newBuilder()
      .setBaseConf(nativeFileScanConf)
      .setFsResourceId(resourceId)
      .addAllPruningPredicates(new java.util.ArrayList()) // not support this filter
```
So we will do a full table scan here? If that's the case, do you mind creating an issue and linking it here?
Are there tests we can add to verify conversion behavior here?
Pull request overview
This PR aims to add native execution support for Spark HiveTableScanExec by introducing a Hive-specific conversion provider and a new native scan exec implementation, plus updating build dependencies to compile against Spark Hive classes.
Changes:
- Add `HiveConvertProvider` and a new `NativeHiveTableScanExec` implementation intended to convert `HiveTableScanExec` to native ORC/Parquet scans.
- Update `spark-extension-shims-spark` Maven dependencies to include the Spark Hive/Catalyst APIs needed by the new shims code.
- Refactor imports in `AuronConverters`, and add a (currently unimplemented) `convertHiveTableScanExec` stub.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 19 comments.
| File | Description |
|---|---|
| spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala | Import refactor; adds an unimplemented convertHiveTableScanExec method; currently introduces compile issues (missing import + stub). |
| spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHIveTableScanExec.scala | New native Hive table scan exec; currently contains multiple compile/runtime issues (API mismatches, pattern matches, session usage, unsupported formats). |
| spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/HiveConvertProvider.scala | New ServiceLoader-based conversion provider for Hive scans; currently missing ServiceLoader registration and insufficient support checks. |
| spark-extension-shims-spark/pom.xml | Adds Spark Hive/Catalyst dependencies, but introduces duplicate spark-hive dependency entries with conflicting scopes. |
```scala
  override def isSupported(exec: SparkPlan): Boolean =
    exec match {
      case e: HiveTableScanExec if enableHiveTableScanExec &&
          e.relation.tableMeta.provider.isDefined &&
          e.relation.tableMeta.provider.get.equals("hive") =>
        true
```
Copilot AI · Feb 11, 2026:
isSupported accepts all Hive tables with provider == "hive", but NativeHiveTableScanExec only builds native nodes for ORC/Parquet and otherwise will throw (e.g., MatchError on file format). Add an explicit format check here (or make the native exec gracefully fall back) to avoid runtime failures on non-ORC/Parquet Hive tables.
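A minimal sketch of such a check, assuming the input format recorded in the catalog table's storage descriptor is a reliable signal; the `nativeReadableInputFormats` set below is illustrative, not from the PR:

```scala
  // Illustrative: only accept Hive tables whose input format the native exec can actually read.
  private val nativeReadableInputFormats = Set(
    "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
    "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat")

  override def isSupported(exec: SparkPlan): Boolean =
    exec match {
      case e: HiveTableScanExec if enableHiveTableScanExec &&
          e.relation.tableMeta.provider.contains("hive") &&
          e.relation.tableMeta.storage.inputFormat.exists(nativeReadableInputFormats.contains) =>
        true
      case _ => false
    }
```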
```scala
      nativeTable.getMetadata)

  @transient private lazy val nativeHadoopConf = {
    val hiveConf = SparkSession.getActiveSession.get.sessionState.newHadoopConf()
```
Copilot AI · Feb 11, 2026:
nativeHadoopConf uses SparkSession.getActiveSession.get, which can throw if there is no active session (e.g., execution triggered outside a SQL context). Consider using the same session derivation as NativeHiveTableScanBase.broadcastedHadoopConf (Shims.get.getSqlContext(basedHiveScan).sparkSession) to avoid runtime failures.
Suggested change:

```diff
-    val hiveConf = SparkSession.getActiveSession.get.sessionState.newHadoopConf()
+    val sparkSession = Shims.get.getSqlContext(basedHiveScan).sparkSession
+    val hiveConf = sparkSession.sessionState.newHadoopConf()
```
```scala
  private val minPartitions = if (SparkSession.getActiveSession.get.sparkContext.isLocal) {
    0 // will splitted based on block by default.
  } else {
    math.max(nativeHadoopConf.getInt("mapreduce.job.maps", 1),
      SparkSession.getActiveSession.get.sparkContext.defaultMinPartitions)
```
Copilot AI · Feb 11, 2026:
minPartitions reads SparkSession.getActiveSession.get.sparkContext multiple times. Besides the .get risk, it’s also inconsistent with other native scan implementations in this repo that pass an explicit sparkSession around. Prefer using a single sparkSession resolved from basedHiveScan and derive sparkContext from it.
```scala
  private val minPartitions = if (SparkSession.getActiveSession.get.sparkContext.isLocal) {
    0 // will splitted based on block by default.
  } else {
    math.max(nativeHadoopConf.getInt("mapreduce.job.maps", 1),
      SparkSession.getActiveSession.get.sparkContext.defaultMinPartitions)
  }

  private val ignoreEmptySplits =
    SparkSession.getActiveSession.get.sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)
```
Copilot AI · Feb 11, 2026:
ignoreEmptySplits also depends on SparkSession.getActiveSession.get. This should use the same non-optional session/context resolution as the rest of the execution code to avoid NoSuchElementException when there is no active session.
Suggested change:

```diff
-  private val minPartitions = if (SparkSession.getActiveSession.get.sparkContext.isLocal) {
-    0 // will splitted based on block by default.
-  } else {
-    math.max(nativeHadoopConf.getInt("mapreduce.job.maps", 1),
-      SparkSession.getActiveSession.get.sparkContext.defaultMinPartitions)
-  }
-  private val ignoreEmptySplits =
-    SparkSession.getActiveSession.get.sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)
+  private val minPartitions = if (sparkContext.isLocal) {
+    0 // will splitted based on block by default.
+  } else {
+    math.max(nativeHadoopConf.getInt("mapreduce.job.maps", 1),
+      sparkContext.defaultMinPartitions)
+  }
+  private val ignoreEmptySplits =
+    sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)
```
```scala
  private def getInputFormat(conf: JobConf, inputFormatClass: Class[newInputClass[Writable, Writable]]):
      InputFormat[Writable, Writable] = {
    val newInputFormat = ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
      .asInstanceOf[InputFormat[Writable, Writable]]
    newInputFormat match {
```
Copilot AI · Feb 11, 2026:
getInputFormat takes a mapreduce.InputFormat class (newInputClass) but returns/instantiates org.apache.hadoop.mapred.InputFormat. This signature mismatch makes the unchecked cast even riskier. Align the parameter type with the returned InputFormat type (or vice versa) so the compiler can help enforce correctness.
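One possible way to align the types, sketched under the assumption that callers can supply the `org.apache.hadoop.mapred.InputFormat` class directly:

```scala
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.{InputFormat, JobConf}
import org.apache.hadoop.util.ReflectionUtils

// Sketch: parameter and return type both use mapred.InputFormat, so the unchecked
// cast goes away and the compiler verifies the class being instantiated.
private def getInputFormat(
    conf: JobConf,
    inputFormatClass: Class[_ <: InputFormat[Writable, Writable]]): InputFormat[Writable, Writable] =
  ReflectionUtils.newInstance(inputFormatClass, conf)
```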
```scala
        getMaxSplitBytes(SparkSession.getActiveSession.get)).toArray
    arrayFilePartition.toArray
  }

  private def getMaxSplitBytes(sparkSession: SparkSession): Long = {
    val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
    val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
    Math.min(defaultMaxSplitBytes, openCostInBytes)
```
Copilot AI · Feb 11, 2026:
getMaxSplitBytes currently returns min(filesMaxPartitionBytes, filesOpenCostInBytes), which can drastically shrink splits and create excessive partitions. Elsewhere in this repo (NativePaimonTableScanExec) you fork Spark’s FilePartition#maxSplitBytes logic using min(defaultMaxSplitBytes, max(openCostInBytes, bytesPerCore)). Align this implementation to that logic (or call the shared helper) to avoid performance regressions.
Suggested change:

```diff
-        getMaxSplitBytes(SparkSession.getActiveSession.get)).toArray
-    arrayFilePartition.toArray
-  }
-  private def getMaxSplitBytes(sparkSession: SparkSession): Long = {
-    val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
-    val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
-    Math.min(defaultMaxSplitBytes, openCostInBytes)
+        getMaxSplitBytes(SparkSession.getActiveSession.get, partitionedFiles)).toArray
+    arrayFilePartition.toArray
+  }
+  private def getMaxSplitBytes(
+      sparkSession: SparkSession,
+      partitionedFiles: Seq[PartitionedFile]): Long = {
+    val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
+    val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
+    val totalBytes = partitionedFiles.map(_.length).sum
+    val parallelism = math.max(1, sparkSession.sparkContext.defaultParallelism)
+    val bytesPerCore = if (totalBytes <= 0L) openCostInBytes else totalBytes / parallelism
+    Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
```
```scala
class HiveConvertProvider extends AuronConvertProvider with Logging {
  override def isEnabled: Boolean =
    getBooleanConf("spark.auron.enable.hiveTable", defaultValue = true)

  def enableHiveTableScanExec: Boolean =
    getBooleanConf("spark.auron.enable.hiveTableScanExec", defaultValue = false)
```
Copilot AI · Feb 11, 2026:
HiveConvertProvider is discovered via ServiceLoader (AuronConverters loads AuronConvertProvider implementations). This module currently doesn’t include a META-INF/services/org.apache.spark.sql.auron.AuronConvertProvider entry, so the provider won’t be loaded at runtime (see thirdparty/auron-paimon for the existing pattern).
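For reference, the ServiceLoader registration is just a one-line resource file in this module naming the implementation class; something along these lines, assuming the standard src/main/resources layout (the path follows the interface name quoted in the comment above, and the package comes from the file listed in this PR):

```
# spark-extension-shims-spark/src/main/resources/META-INF/services/org.apache.spark.sql.auron.AuronConvertProvider
org.apache.spark.sql.hive.execution.auron.plan.HiveConvertProvider
```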
```scala
  def convertHiveTableScanExec(hiveExec: HiveTableScanExec): SparkPlan = {
    AuronConverters.addRenameColumnsExec(NativeHiveTableScanExec(hiveExec))
  }
```
Copilot AI · Feb 11, 2026:
There are existing query/operator validation test utilities in this module, but this new Hive scan conversion path doesn’t appear to have coverage. Add at least one suite that enables spark.auron.enable.hiveTableScanExec and verifies HiveTableScanExec is converted (and that unsupported formats don’t break execution).
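A rough sketch of such a test, assuming a Hive-enabled session; the suite name, table name, and session setup are illustrative, and the sketch omits whatever Auron extension wiring the module's existing test harness already provides:

```scala
import org.apache.spark.sql.SparkSession
import org.scalatest.funsuite.AnyFunSuite

class NativeHiveTableScanSuite extends AnyFunSuite {

  test("HiveTableScanExec on a Hive ORC table converts to the native scan") {
    // Hypothetical setup: real suites would also register the Auron extension/columnar rules.
    val spark = SparkSession.builder()
      .master("local[1]")
      .enableHiveSupport()
      .config("spark.auron.enable.hiveTableScanExec", "true")
      .getOrCreate()
    try {
      spark.sql("CREATE TABLE IF NOT EXISTS t_hive_orc (id INT) STORED AS ORC")
      spark.sql("INSERT OVERWRITE TABLE t_hive_orc SELECT 1")

      val df = spark.sql("SELECT id FROM t_hive_orc")
      // Expect the executed plan to contain the native scan rather than HiveTableScanExec.
      val nativeScan = df.queryExecution.executedPlan.find(
        _.getClass.getSimpleName.startsWith("NativeHiveTableScan"))
      assert(nativeScan.isDefined)
      assert(df.collect().map(_.getInt(0)).toSeq == Seq(1))
    } finally {
      spark.sql("DROP TABLE IF EXISTS t_hive_orc")
      spark.stop()
    }
  }
}
```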
```scala
        arrayFilePartition += FilePartition.getFilePartitions(SparkSession.getActiveSession.get,
          partitionedFiles,
          getMaxSplitBytes(SparkSession.getActiveSession.get)).toArray
```
Copilot AI · Feb 11, 2026:
FilePartition.getFilePartitions(SparkSession.getActiveSession.get, ...) again relies on getActiveSession.get. Use the sparkSession derived from basedHiveScan (as in NativeHiveTableScanBase) so partition planning doesn’t fail when there’s no active session.
Suggested change:

```diff
-        arrayFilePartition += FilePartition.getFilePartitions(SparkSession.getActiveSession.get,
-          partitionedFiles,
-          getMaxSplitBytes(SparkSession.getActiveSession.get)).toArray
+        val sparkSession = basedHiveScan.sparkSession
+        arrayFilePartition += FilePartition.getFilePartitions(
+          sparkSession,
+          partitionedFiles,
+          getMaxSplitBytes(sparkSession)
+        ).toArray
```
```scala
    inputFormatClass match {
      case OrcInputFormat =>
      case MapredParquetInputFormat =>
      case _ =>
    }
```
Copilot AI · Feb 11, 2026:
inputFormatClass match { case OrcInputFormat => ... } is matching a Class[_] value against a class name, and the cases are empty. If you need special handling by input format, compare against classOf[OrcInputFormat] / classOf[MapredParquetInputFormat] and implement the intended behavior; otherwise remove this dead code block.
Suggested change (remove the dead block):

```diff
-    inputFormatClass match {
-      case OrcInputFormat =>
-      case MapredParquetInputFormat =>
-      case _ =>
-    }
```
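If per-format handling really is intended, a sketch of the classOf comparison described in the comment above could look like this; the method name and branch bodies are placeholders:

```scala
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat

// Illustrative: compare the Class[_] value against the format classes themselves.
private def handleInputFormat(inputFormatClass: Class[_]): Unit =
  inputFormatClass match {
    case c if c == classOf[OrcInputFormat] =>
      // ORC-specific handling would go here.
    case c if c == classOf[MapredParquetInputFormat] =>
      // Parquet-specific handling would go here.
    case other =>
      throw new UnsupportedOperationException(s"Unsupported Hive input format: $other")
  }
```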
Which issue does this PR close?
Closes #1994
Rationale for this change
What changes are included in this PR?
Are there any user-facing changes?
How was this patch tested?