Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ package org.apache.spark.sql.auron.iceberg
import scala.collection.JavaConverters._
import scala.util.control.NonFatal

import org.apache.iceberg.{FileFormat, FileScanTask, MetadataColumns}
import org.apache.iceberg.{AddedRowsScanTask, ChangelogOperation, ChangelogScanTask, FileFormat, FileScanTask, MetadataColumns, ScanTask}
import org.apache.iceberg.expressions.{And => IcebergAnd, BoundPredicate, Expression => IcebergExpression, Not => IcebergNot, Or => IcebergOr, UnboundPredicate}
import org.apache.iceberg.spark.source.AuronIcebergSourceUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql.auron.NativeConverters
import org.apache.spark.sql.catalyst.expressions.{And => SparkAnd, AttributeReference, EqualTo, Expression => SparkExpression, GreaterThan, GreaterThanOrEqual, In, IsNaN, IsNotNull, IsNull, LessThan, LessThanOrEqual, Literal, Not => SparkNot, Or => SparkOr}
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.connector.read.{InputPartition, Scan}
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{BinaryType, DataType, DecimalType, StringType, StructField, StructType}
Expand All @@ -35,8 +35,15 @@ import org.apache.auron.{protobuf => pb}
// IcebergScanPlan: fileSchema is read from the data files. partitionSchema carries the
// supported metadata columns (for example _file and _spec_id) that are materialized as
// per-file constant values in the native scan.
/**
 * Description of one task handed to the native Iceberg scan, built from an Iceberg scan task
 * by `IcebergScanSupport.toNativeScanTask`.
 *
 * @param location location of the data file, taken from the task's `file().location()`
 * @param start value of the Iceberg task's `start()`
 * @param length value of the Iceberg task's `length()`
 * @param fileSizeInBytes size of the data file, taken from `file().fileSizeInBytes()`
 * @param partitionValues one constant per field of the plan's partition schema, in field order
 */
final case class IcebergNativeScanTask(
location: String,
start: Long,
length: Long,
fileSizeInBytes: Long,
partitionValues: Seq[Any])

final case class IcebergScanPlan(
fileTasks: Seq[FileScanTask],
scanTasks: Seq[IcebergNativeScanTask],
fileFormat: FileFormat,
readSchema: StructType,
fileSchema: StructType,
Expand All @@ -45,41 +52,104 @@ final case class IcebergScanPlan(

object IcebergScanSupport extends Logging {

// Fully-qualified class name used by `plan` to recognize a changelog scan. The scan is matched
// by name (scan.getClass.getName) rather than by class reference.
// NOTE(review): presumably because the class is not accessible from this package -- confirm.
private val SparkChangelogScanClassName =
"org.apache.iceberg.spark.source.SparkChangelogScan"

// Names of the changelog metadata columns. For changelog scans these are treated as supported
// metadata columns and are filled in from the changelog scan task rather than read from the
// data file.
private val ChangelogMetadataColumnNames = Set(
MetadataColumns.CHANGE_TYPE.name(),
MetadataColumns.CHANGE_ORDINAL.name(),
MetadataColumns.COMMIT_SNAPSHOT_ID.name())

/**
 * Builds a native scan plan for the given batch scan, if the scan is one this object supports.
 *
 * Dispatch order:
 *  1. a scan whose class name equals [[SparkChangelogScanClassName]] goes to the changelog
 *     planner;
 *  2. a scan that is an instance of Iceberg's batch query scan class goes to the file-scan
 *     planner;
 *  3. anything else yields `None`, so non-Iceberg sources stay on Spark's path.
 *
 * @param exec the Spark batch scan node to inspect
 * @return the plan, or `None` when the scan must fall back to Spark
 */
def plan(exec: BatchScanExec): Option[IcebergScanPlan] = {
  val scan = exec.scan
  val scanClassName = scan.getClass.getName

  if (scanClassName == SparkChangelogScanClassName) {
    planChangelogScan(exec, scan)
  } else if (AuronIcebergSourceUtil.getClassOfSparkBatchQueryScan.isInstance(scan)) {
    planFileScan(exec, scan, scanClassName)
  } else {
    // Only handle Iceberg scans; other sources must stay on Spark's path.
    None
  }
}

private def planFileScan(
exec: BatchScanExec,
scan: Scan,
scanClassName: String): Option[IcebergScanPlan] = {
val readSchema = scan.readSchema
val unsupportedMetadataColumns = collectUnsupportedMetadataColumns(readSchema)
// Native scan can project file-level metadata columns such as _file and _spec_id
// via partition values.
// Metadata columns that require per-row materialization (for example _pos) still fallback.
if (unsupportedMetadataColumns.nonEmpty) {
val schemas = supportedSchemas(readSchema, isChangelogScan = false)
if (schemas.isEmpty) {
return None
}
val (fileSchema, partitionSchema) = schemas.get

val fileSchema = StructType(readSchema.fields.filterNot(isSupportedMetadataColumn))
// Supported metadata columns are materialized via per-file constant values rather than
// read from the Iceberg data file payload.
val partitionSchema = StructType(readSchema.fields.filter(isSupportedMetadataColumn))
val partitions = inputPartitions(exec)
// Empty scan (e.g. empty table) should still build a plan to return no rows.
if (partitions.isEmpty) {
logWarning(s"Native Iceberg scan planned with empty partitions for $scanClassName.")
return Some(
IcebergScanPlan(
Seq.empty,
FileFormat.PARQUET,
readSchema,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tasks.collect { case task: AnyRef => task } matches everything, so icebergTasks.size != tasks.size is always false. The size guard has become a no-op. Either keep the type as Seq[ScanTask] (the Iceberg interface), or drop the dead guard entirely.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, good catch. I changed the reflected task collection to keep the Iceberg ScanTask interface instead of AnyRef, so the size guard is meaningful again. The normal and changelog scan planning paths still decide which concrete task types they support (FileScanTask or ChangelogScanTask) after this common extraction step.

fileSchema,
partitionSchema,
Seq.empty))
}

if (!fileSchema.fields.forall(field => NativeConverters.isTypeSupported(field.dataType))) {
val icebergPartitions = partitions.flatMap(icebergPartition)
// All partitions must be Iceberg SparkInputPartition with file scan tasks; otherwise fallback.
if (icebergPartitions.size != partitions.size) {
return None
}

if (!partitionSchema.fields.forall(field =>
NativeConverters.isTypeSupported(field.dataType))) {
val rawTasks = icebergPartitions.flatMap(_.tasks)
val fileTasks = rawTasks.collect { case task: FileScanTask => task }
if (fileTasks.size != rawTasks.size) {
return None
}

// Native scan does not apply delete files; only allow pure data files (COW).
if (!fileTasks.forall(task => deletesEmpty(task.deletes()))) {
return None
}

// Native scan handles a single file format; mixed formats must fallback.
val formats = fileTasks.map(_.file().format()).distinct
if (formats.size > 1) {
return None
}

val format = formats.headOption.getOrElse(FileFormat.PARQUET)
if (format != FileFormat.PARQUET && format != FileFormat.ORC) {
return None
}

val pruningPredicates = collectPruningPredicates(scan.asInstanceOf[AnyRef], readSchema)
val nativeTasks = fileTasks.map(task => toNativeScanTask(task, partitionSchema))
Some(
IcebergScanPlan(
nativeTasks,
format,
readSchema,
fileSchema,
partitionSchema,
pruningPredicates))
}

private def planChangelogScan(exec: BatchScanExec, scan: Scan): Option[IcebergScanPlan] = {
val readSchema = scan.readSchema
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changelogTask.get will throw NoSuchElementException if called with None and a partition schema containing a changelog column. The current callers keep these two in sync, but the method signature does not enforce it. Consider taking ChangelogScanTask directly instead of Option, or at least replace the .get calls with a match / getOrElse that throws a meaningful error.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing this out. I replaced the raw changelogTask.get calls with a helper that uses getOrElse and throws a meaningful IllegalStateException when a changelog metadata column is requested without a changelog scan task.

val schemas = supportedSchemas(readSchema, isChangelogScan = true)
if (schemas.isEmpty) {
return None
}
val (fileSchema, partitionSchema) = schemas.get

val partitions = inputPartitions(exec)
// Empty scan (e.g. empty table) should still build a plan to return no rows.
if (partitions.isEmpty) {
logWarning(s"Native Iceberg scan planned with empty partitions for $scanClassName.")
return Some(
IcebergScanPlan(
Seq.empty,
Expand All @@ -91,20 +161,32 @@ object IcebergScanSupport extends Logging {
}

val icebergPartitions = partitions.flatMap(icebergPartition)
// All partitions must be Iceberg SparkInputPartition; otherwise fallback.
if (icebergPartitions.size != partitions.size) {
return None
}

val fileTasks = icebergPartitions.flatMap(_.fileTasks)
val rawTasks = icebergPartitions.flatMap(_.tasks)
val changelogTasks = rawTasks.collect { case task: ChangelogScanTask => task }
if (changelogTasks.size != rawTasks.size) {
return None
}

// Native scan does not apply delete files; only allow pure data files (COW).
if (!fileTasks.forall(task => task.deletes() == null || task.deletes().isEmpty)) {
val addedRowsTasks = changelogTasks.collect { case task: AddedRowsScanTask => task }
// First native changelog support is insert-only. Delete and update images need Iceberg
// delete-file handling, so keep them on Spark's reader for now.
if (addedRowsTasks.size != changelogTasks.size) {
return None
}

// Native scan handles a single file format; mixed formats must fallback.
val formats = fileTasks.map(_.file().format()).distinct
if (!addedRowsTasks.forall(_.operation() == ChangelogOperation.INSERT)) {
return None
}

if (!addedRowsTasks.forall(task => deletesEmpty(task.deletes()))) {
return None
}

val formats = addedRowsTasks.map(_.file().format()).distinct
if (formats.size > 1) {
return None
}
Expand All @@ -115,27 +197,69 @@ object IcebergScanSupport extends Logging {
}

val pruningPredicates = collectPruningPredicates(scan.asInstanceOf[AnyRef], readSchema)
val nativeTasks = addedRowsTasks.map(task => toNativeScanTask(task, partitionSchema))
Some(
IcebergScanPlan(
fileTasks,
nativeTasks,
format,
readSchema,
fileSchema,
partitionSchema,
pruningPredicates))
}

private def collectUnsupportedMetadataColumns(schema: StructType): Seq[String] =
/**
 * Splits the read schema into the part read from data files and the part materialized as
 * constants, or reports that the schema cannot be handled natively.
 *
 * Supported metadata columns are materialized via per-file/per-task constant values rather
 * than read from the Iceberg data file payload. Metadata columns that require per-row
 * materialization (for example _pos) still fallback.
 *
 * @param readSchema the schema requested from the scan
 * @param isChangelogScan whether changelog metadata columns count as supported
 * @return `Some((fileSchema, partitionSchema))`, or `None` when an unsupported metadata
 *         column is requested or any field type is not supported by the native converters
 */
private def supportedSchemas(
    readSchema: StructType,
    isChangelogScan: Boolean): Option[(StructType, StructType)] = {
  if (collectUnsupportedMetadataColumns(readSchema, isChangelogScan).nonEmpty) {
    None
  } else {
    // A single pass separates constant (metadata) fields from data-file fields, preserving
    // the relative field order of each group.
    val (metadataFields, dataFields) =
      readSchema.fields.partition(isSupportedMetadataColumn(_, isChangelogScan))
    val fileSchema = StructType(dataFields)
    val partitionSchema = StructType(metadataFields)

    // File fields are checked before partition fields.
    val allTypesSupported = (fileSchema.fields ++ partitionSchema.fields)
      .forall(field => NativeConverters.isTypeSupported(field.dataType))

    if (allTypesSupported) Some(fileSchema -> partitionSchema) else None
  }
}

private def collectUnsupportedMetadataColumns(
schema: StructType,
isChangelogScan: Boolean): Seq[String] =
schema.fields.collect {
case field
if MetadataColumns.isMetadataColumn(field.name) &&
!isSupportedMetadataColumn(field) =>
if isIcebergMetadataColumn(field.name, isChangelogScan) &&
!isSupportedMetadataColumn(field, isChangelogScan) =>
field.name
}

private def isSupportedMetadataColumn(field: org.apache.spark.sql.types.StructField): Boolean =
/**
 * Tells whether a column name denotes an Iceberg metadata column.
 *
 * A name qualifies if Iceberg's `MetadataColumns.isMetadataColumn` recognizes it, or, for
 * changelog scans only, if it is one of [[ChangelogMetadataColumnNames]].
 */
private def isIcebergMetadataColumn(name: String, isChangelogScan: Boolean): Boolean = {
  if (MetadataColumns.isMetadataColumn(name)) {
    true
  } else {
    isChangelogScan && ChangelogMetadataColumnNames.contains(name)
  }
}

private def isSupportedMetadataColumn(
field: org.apache.spark.sql.types.StructField,
isChangelogScan: Boolean): Boolean =
field.name == MetadataColumns.FILE_PATH.name() ||
field.name == MetadataColumns.SPEC_ID.name()
field.name == MetadataColumns.SPEC_ID.name() ||
(isChangelogScan && ChangelogMetadataColumnNames.contains(field.name))

/**
 * Tells whether a task carries no delete files.
 *
 * @param deletes the delete-file list of a scan task; may be `null`
 * @return `true` when the list is `null` or has no elements
 */
private def deletesEmpty(deletes: java.util.List[_]): Boolean =
  Option(deletes).forall(_.isEmpty)

private def inputPartitions(exec: BatchScanExec): Seq[InputPartition] = {
// Prefer DataSource V2 batch API; if not available, fallback to exec methods via reflection.
Expand Down Expand Up @@ -193,7 +317,7 @@ object IcebergScanSupport extends Logging {
}
}

private case class IcebergPartitionView(fileTasks: Seq[FileScanTask])
private case class IcebergPartitionView(tasks: Seq[ScanTask])

private def icebergPartition(partition: InputPartition): Option[IcebergPartitionView] = {
val className = partition.getClass.getName
Expand All @@ -208,25 +332,78 @@ object IcebergScanSupport extends Logging {
taskGroupField.setAccessible(true)
val taskGroup = taskGroupField.get(partition)

// Extract tasks and keep only file scan tasks.
// Extract the Iceberg scan tasks. The caller decides which concrete task type is supported.
val tasksMethod = taskGroup.getClass.getDeclaredMethod("tasks")
tasksMethod.setAccessible(true)
val tasks = tasksMethod.invoke(taskGroup).asInstanceOf[java.util.Collection[_]].asScala
val fileTasks = tasks.collect { case task: FileScanTask => task }.toSeq
val icebergTasks = tasks.collect { case task: ScanTask => task }.toSeq

// If any task is not a FileScanTask, fallback.
if (fileTasks.size != tasks.size) {
if (icebergTasks.size != tasks.size) {
return None
}

Some(IcebergPartitionView(fileTasks))
Some(IcebergPartitionView(icebergTasks))
} catch {
case NonFatal(t) =>
logDebug(s"Failed to read Iceberg SparkInputPartition via reflection for $className.", t)
None
}
}

/**
 * Converts a regular Iceberg file scan task into its native-scan description.
 *
 * No changelog task is available on this path, so a partition schema that contains a
 * changelog metadata column makes `metadataPartitionValues` throw.
 *
 * @param task the Iceberg file scan task
 * @param partitionSchema metadata columns to materialize as constants for this file
 */
private def toNativeScanTask(
    task: FileScanTask,
    partitionSchema: StructType): IcebergNativeScanTask = {
  val dataFile = task.file()
  val path = dataFile.location()
  IcebergNativeScanTask(
    location = path,
    start = task.start(),
    length = task.length(),
    fileSizeInBytes = dataFile.fileSizeInBytes(),
    partitionValues = metadataPartitionValues(path, dataFile.specId(), None, partitionSchema))
}

/**
 * Converts an Iceberg changelog added-rows task into its native-scan description.
 *
 * The task itself is passed on to `metadataPartitionValues` so that changelog metadata
 * columns can be filled in from it.
 *
 * @param task the Iceberg added-rows changelog task
 * @param partitionSchema metadata columns to materialize as constants for this file
 */
private def toNativeScanTask(
    task: AddedRowsScanTask,
    partitionSchema: StructType): IcebergNativeScanTask = {
  val dataFile = task.file()
  val path = dataFile.location()
  IcebergNativeScanTask(
    location = path,
    start = task.start(),
    length = task.length(),
    fileSizeInBytes = dataFile.fileSizeInBytes(),
    partitionValues =
      metadataPartitionValues(path, dataFile.specId(), Some(task), partitionSchema))
}

/**
 * Produces the constant value for every field of the partition schema, in field order.
 *
 * @param filePath value used for the file-path metadata column
 * @param specId value used for the spec-id metadata column
 * @param changelogTask source of the changelog metadata values; `None` for regular file scans
 * @param partitionSchema the metadata columns to materialize
 * @return one value per field of `partitionSchema`
 * @throws IllegalStateException if a changelog metadata column is requested without a
 *                               changelog task, or if a field is not a known metadata column
 */
private def metadataPartitionValues(
    filePath: String,
    specId: Int,
    changelogTask: Option[ChangelogScanTask],
    partitionSchema: StructType): Seq[Any] = {
  // Resolves the changelog task, failing with a descriptive error instead of a bare
  // NoSuchElementException when the caller did not supply one.
  def changelogTaskFor(column: String): ChangelogScanTask = changelogTask match {
    case Some(task) => task
    case None =>
      throw new IllegalStateException(
        s"Iceberg changelog metadata column requires a changelog scan task: $column")
  }

  // Maps a single metadata column name to its constant value.
  def constantFor(column: String): Any = {
    if (column == MetadataColumns.FILE_PATH.name()) {
      filePath
    } else if (column == MetadataColumns.SPEC_ID.name()) {
      specId
    } else if (column == MetadataColumns.CHANGE_TYPE.name()) {
      changelogTaskFor(column).operation().name()
    } else if (column == MetadataColumns.CHANGE_ORDINAL.name()) {
      changelogTaskFor(column).changeOrdinal()
    } else if (column == MetadataColumns.COMMIT_SNAPSHOT_ID.name()) {
      changelogTaskFor(column).commitSnapshotId()
    } else {
      throw new IllegalStateException(
        s"unsupported Iceberg metadata column in native scan: $column")
    }
  }

  partitionSchema.fields.map(field => constantFor(field.name))
}

private def collectPruningPredicates(
scan: AnyRef,
readSchema: StructType): Seq[pb.PhysicalExprNode] = {
Expand Down
Loading
Loading