-
Notifications
You must be signed in to change notification settings - Fork 220
[AURON #2253] Support insert-only Iceberg changelog native scan #2254
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,13 +19,13 @@ package org.apache.spark.sql.auron.iceberg | |
| import scala.collection.JavaConverters._ | ||
| import scala.util.control.NonFatal | ||
|
|
||
| import org.apache.iceberg.{FileFormat, FileScanTask, MetadataColumns} | ||
| import org.apache.iceberg.{AddedRowsScanTask, ChangelogOperation, ChangelogScanTask, FileFormat, FileScanTask, MetadataColumns, ScanTask} | ||
| import org.apache.iceberg.expressions.{And => IcebergAnd, BoundPredicate, Expression => IcebergExpression, Not => IcebergNot, Or => IcebergOr, UnboundPredicate} | ||
| import org.apache.iceberg.spark.source.AuronIcebergSourceUtil | ||
| import org.apache.spark.internal.Logging | ||
| import org.apache.spark.sql.auron.NativeConverters | ||
| import org.apache.spark.sql.catalyst.expressions.{And => SparkAnd, AttributeReference, EqualTo, Expression => SparkExpression, GreaterThan, GreaterThanOrEqual, In, IsNaN, IsNotNull, IsNull, LessThan, LessThanOrEqual, Literal, Not => SparkNot, Or => SparkOr} | ||
| import org.apache.spark.sql.connector.read.InputPartition | ||
| import org.apache.spark.sql.connector.read.{InputPartition, Scan} | ||
| import org.apache.spark.sql.execution.datasources.v2.BatchScanExec | ||
| import org.apache.spark.sql.internal.SQLConf | ||
| import org.apache.spark.sql.types.{BinaryType, DataType, DecimalType, StringType, StructField, StructType} | ||
|
|
@@ -35,8 +35,15 @@ import org.apache.auron.{protobuf => pb} | |
| // fileSchema is read from the data files. partitionSchema carries supported metadata columns | ||
| // (for example _file and _spec_id) that are materialized as per-file constant values in | ||
| // the native scan. | ||
| // One unit of native scan work over a single Iceberg data file: the file's location, the | ||
| // task's start and length, the file's size in bytes, and one constant value per | ||
| // partitionSchema field (the supported metadata columns), in partitionSchema field order. | ||
| final case class IcebergNativeScanTask( | ||
| location: String, | ||
| start: Long, | ||
| length: Long, | ||
| fileSizeInBytes: Long, | ||
| partitionValues: Seq[Any]) | ||
|
|
||
| final case class IcebergScanPlan( | ||
| fileTasks: Seq[FileScanTask], | ||
| scanTasks: Seq[IcebergNativeScanTask], | ||
| fileFormat: FileFormat, | ||
| readSchema: StructType, | ||
| fileSchema: StructType, | ||
|
|
@@ -45,41 +52,104 @@ final case class IcebergScanPlan( | |
|
|
||
| object IcebergScanSupport extends Logging { | ||
|
|
||
| // Fully qualified class name that plan() compares against the scan's class name to | ||
| // recognize an Iceberg changelog scan. | ||
| private val SparkChangelogScanClassName = | ||
| "org.apache.iceberg.spark.source.SparkChangelogScan" | ||
|
|
||
| // Names of the changelog metadata columns (change type, change ordinal, commit snapshot id) | ||
| // that are only treated as metadata columns when planning a changelog scan. | ||
| private val ChangelogMetadataColumnNames = Set( | ||
| MetadataColumns.CHANGE_TYPE.name(), | ||
| MetadataColumns.CHANGE_ORDINAL.name(), | ||
| MetadataColumns.COMMIT_SNAPSHOT_ID.name()) | ||
|
|
||
| // Entry point for native Iceberg scan planning. Dispatches to planChangelogScan when the | ||
| // scan's class name equals SparkChangelogScanClassName, to planFileScan when the scan is an | ||
| // instance of the class returned by AuronIcebergSourceUtil.getClassOfSparkBatchQueryScan, | ||
| // and returns None for any other scan. | ||
| def plan(exec: BatchScanExec): Option[IcebergScanPlan] = { | ||
| val scan = exec.scan | ||
| val scanClassName = scan.getClass.getName | ||
| // Only handle Iceberg scans; other sources must stay on Spark's path. | ||
| // Changelog scans are matched by class name and planned by their own planner. | ||
| if (scanClassName == SparkChangelogScanClassName) { | ||
| return planChangelogScan(exec, scan) | ||
| } | ||
|
|
||
||
| if (!AuronIcebergSourceUtil.getClassOfSparkBatchQueryScan.isInstance(scan)) { | ||
| return None | ||
| } | ||
|
|
||
||
| planFileScan(exec, scan, scanClassName) | ||
| } | ||
|
|
||
| private def planFileScan( | ||
| exec: BatchScanExec, | ||
| scan: Scan, | ||
| scanClassName: String): Option[IcebergScanPlan] = { | ||
| val readSchema = scan.readSchema | ||
| val unsupportedMetadataColumns = collectUnsupportedMetadataColumns(readSchema) | ||
| // Native scan can project file-level metadata columns such as _file and _spec_id | ||
| // via partition values. | ||
| // Metadata columns that require per-row materialization (for example _pos) still fallback. | ||
| if (unsupportedMetadataColumns.nonEmpty) { | ||
| val schemas = supportedSchemas(readSchema, isChangelogScan = false) | ||
| if (schemas.isEmpty) { | ||
| return None | ||
| } | ||
| val (fileSchema, partitionSchema) = schemas.get | ||
|
|
||
| val fileSchema = StructType(readSchema.fields.filterNot(isSupportedMetadataColumn)) | ||
| // Supported metadata columns are materialized via per-file constant values rather than | ||
| // read from the Iceberg data file payload. | ||
| val partitionSchema = StructType(readSchema.fields.filter(isSupportedMetadataColumn)) | ||
| val partitions = inputPartitions(exec) | ||
| // Empty scan (e.g. empty table) should still build a plan to return no rows. | ||
| if (partitions.isEmpty) { | ||
| logWarning(s"Native Iceberg scan planned with empty partitions for $scanClassName.") | ||
| return Some( | ||
| IcebergScanPlan( | ||
| Seq.empty, | ||
| FileFormat.PARQUET, | ||
| readSchema, | ||
| fileSchema, | ||
| partitionSchema, | ||
| Seq.empty)) | ||
| } | ||
|
|
||
| if (!fileSchema.fields.forall(field => NativeConverters.isTypeSupported(field.dataType))) { | ||
| val icebergPartitions = partitions.flatMap(icebergPartition) | ||
| // All partitions must be Iceberg SparkInputPartition with file scan tasks; otherwise fallback. | ||
| if (icebergPartitions.size != partitions.size) { | ||
| return None | ||
| } | ||
|
|
||
| if (!partitionSchema.fields.forall(field => | ||
| NativeConverters.isTypeSupported(field.dataType))) { | ||
| val rawTasks = icebergPartitions.flatMap(_.tasks) | ||
| val fileTasks = rawTasks.collect { case task: FileScanTask => task } | ||
| if (fileTasks.size != rawTasks.size) { | ||
| return None | ||
| } | ||
|
|
||
| // Native scan does not apply delete files; only allow pure data files (COW). | ||
| if (!fileTasks.forall(task => deletesEmpty(task.deletes()))) { | ||
| return None | ||
| } | ||
|
|
||
| // Native scan handles a single file format; mixed formats must fallback. | ||
| val formats = fileTasks.map(_.file().format()).distinct | ||
| if (formats.size > 1) { | ||
| return None | ||
| } | ||
|
|
||
| val format = formats.headOption.getOrElse(FileFormat.PARQUET) | ||
| if (format != FileFormat.PARQUET && format != FileFormat.ORC) { | ||
| return None | ||
| } | ||
|
|
||
| val pruningPredicates = collectPruningPredicates(scan.asInstanceOf[AnyRef], readSchema) | ||
| val nativeTasks = fileTasks.map(task => toNativeScanTask(task, partitionSchema)) | ||
| Some( | ||
| IcebergScanPlan( | ||
| nativeTasks, | ||
| format, | ||
| readSchema, | ||
| fileSchema, | ||
| partitionSchema, | ||
| pruningPredicates)) | ||
| } | ||
|
|
||
| private def planChangelogScan(exec: BatchScanExec, scan: Scan): Option[IcebergScanPlan] = { | ||
| val readSchema = scan.readSchema | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Thanks for pointing this out. I replaced the raw |
||
| val schemas = supportedSchemas(readSchema, isChangelogScan = true) | ||
| if (schemas.isEmpty) { | ||
| return None | ||
| } | ||
| val (fileSchema, partitionSchema) = schemas.get | ||
|
|
||
| val partitions = inputPartitions(exec) | ||
| // Empty scan (e.g. empty table) should still build a plan to return no rows. | ||
| if (partitions.isEmpty) { | ||
| logWarning(s"Native Iceberg scan planned with empty partitions for $scanClassName.") | ||
| return Some( | ||
| IcebergScanPlan( | ||
| Seq.empty, | ||
|
|
@@ -91,20 +161,32 @@ object IcebergScanSupport extends Logging { | |
| } | ||
|
|
||
| val icebergPartitions = partitions.flatMap(icebergPartition) | ||
| // All partitions must be Iceberg SparkInputPartition; otherwise fallback. | ||
| if (icebergPartitions.size != partitions.size) { | ||
| return None | ||
| } | ||
|
|
||
| val fileTasks = icebergPartitions.flatMap(_.fileTasks) | ||
| val rawTasks = icebergPartitions.flatMap(_.tasks) | ||
| val changelogTasks = rawTasks.collect { case task: ChangelogScanTask => task } | ||
| if (changelogTasks.size != rawTasks.size) { | ||
| return None | ||
| } | ||
|
|
||
| // Native scan does not apply delete files; only allow pure data files (COW). | ||
| if (!fileTasks.forall(task => task.deletes() == null || task.deletes().isEmpty)) { | ||
| val addedRowsTasks = changelogTasks.collect { case task: AddedRowsScanTask => task } | ||
| // First native changelog support is insert-only. Delete and update images need Iceberg | ||
| // delete-file handling, so keep them on Spark's reader for now. | ||
| if (addedRowsTasks.size != changelogTasks.size) { | ||
| return None | ||
| } | ||
|
|
||
| // Native scan handles a single file format; mixed formats must fallback. | ||
| val formats = fileTasks.map(_.file().format()).distinct | ||
| if (!addedRowsTasks.forall(_.operation() == ChangelogOperation.INSERT)) { | ||
| return None | ||
| } | ||
|
|
||
| if (!addedRowsTasks.forall(task => deletesEmpty(task.deletes()))) { | ||
| return None | ||
| } | ||
|
|
||
| val formats = addedRowsTasks.map(_.file().format()).distinct | ||
| if (formats.size > 1) { | ||
| return None | ||
| } | ||
|
|
@@ -115,27 +197,69 @@ object IcebergScanSupport extends Logging { | |
| } | ||
|
|
||
| val pruningPredicates = collectPruningPredicates(scan.asInstanceOf[AnyRef], readSchema) | ||
| val nativeTasks = addedRowsTasks.map(task => toNativeScanTask(task, partitionSchema)) | ||
| Some( | ||
| IcebergScanPlan( | ||
| fileTasks, | ||
| nativeTasks, | ||
| format, | ||
| readSchema, | ||
| fileSchema, | ||
| partitionSchema, | ||
| pruningPredicates)) | ||
| } | ||
|
|
||
| private def collectUnsupportedMetadataColumns(schema: StructType): Seq[String] = | ||
| // Splits readSchema into (fileSchema, partitionSchema): fileSchema keeps the fields that are | ||
| // not supported metadata columns, partitionSchema keeps the fields that are. | ||
| // Returns None when readSchema contains an unsupported metadata column, or when any field | ||
| // type in either schema is rejected by NativeConverters.isTypeSupported. | ||
| private def supportedSchemas( | ||
| readSchema: StructType, | ||
| isChangelogScan: Boolean): Option[(StructType, StructType)] = { | ||
| val unsupportedMetadataColumns = | ||
| collectUnsupportedMetadataColumns(readSchema, isChangelogScan) | ||
| // Supported metadata columns are materialized via per-file/per-task constant values rather | ||
| // than read from the Iceberg data file payload. Metadata columns that require per-row | ||
| // materialization (for example _pos) still fallback. | ||
| if (unsupportedMetadataColumns.nonEmpty) { | ||
| return None | ||
| } | ||
|
|
||
||
| val fileSchema = | ||
| StructType(readSchema.fields.filterNot(isSupportedMetadataColumn(_, isChangelogScan))) | ||
| val partitionSchema = | ||
| StructType(readSchema.fields.filter(isSupportedMetadataColumn(_, isChangelogScan))) | ||
|
|
||
||
| // Both the data-file columns and the constant metadata columns must have native types. | ||
| if (!fileSchema.fields.forall(field => NativeConverters.isTypeSupported(field.dataType))) { | ||
| return None | ||
| } | ||
|
|
||
||
| if (!partitionSchema.fields.forall(field => | ||
| NativeConverters.isTypeSupported(field.dataType))) { | ||
| return None | ||
| } | ||
|
|
||
||
| Some(fileSchema -> partitionSchema) | ||
| } | ||
|
|
||
| private def collectUnsupportedMetadataColumns( | ||
| schema: StructType, | ||
| isChangelogScan: Boolean): Seq[String] = | ||
| schema.fields.collect { | ||
| case field | ||
| if MetadataColumns.isMetadataColumn(field.name) && | ||
| !isSupportedMetadataColumn(field) => | ||
| if isIcebergMetadataColumn(field.name, isChangelogScan) && | ||
| !isSupportedMetadataColumn(field, isChangelogScan) => | ||
| field.name | ||
| } | ||
|
|
||
| private def isSupportedMetadataColumn(field: org.apache.spark.sql.types.StructField): Boolean = | ||
| // True when name is a metadata column according to MetadataColumns.isMetadataColumn, or, | ||
| // for changelog scans only, when it is one of ChangelogMetadataColumnNames. | ||
| private def isIcebergMetadataColumn(name: String, isChangelogScan: Boolean): Boolean = | ||
| MetadataColumns.isMetadataColumn(name) || | ||
| (isChangelogScan && ChangelogMetadataColumnNames.contains(name)) | ||
|
|
||
| private def isSupportedMetadataColumn( | ||
| field: org.apache.spark.sql.types.StructField, | ||
| isChangelogScan: Boolean): Boolean = | ||
| field.name == MetadataColumns.FILE_PATH.name() || | ||
| field.name == MetadataColumns.SPEC_ID.name() | ||
| field.name == MetadataColumns.SPEC_ID.name() || | ||
| (isChangelogScan && ChangelogMetadataColumnNames.contains(field.name)) | ||
|
|
||
| // True when the given delete-file list is null or has no elements. | ||
| private def deletesEmpty(deletes: java.util.List[_]): Boolean = | ||
| deletes == null || deletes.isEmpty | ||
|
|
||
| private def inputPartitions(exec: BatchScanExec): Seq[InputPartition] = { | ||
| // Prefer DataSource V2 batch API; if not available, fallback to exec methods via reflection. | ||
|
|
@@ -193,7 +317,7 @@ object IcebergScanSupport extends Logging { | |
| } | ||
| } | ||
|
|
||
| private case class IcebergPartitionView(fileTasks: Seq[FileScanTask]) | ||
| private case class IcebergPartitionView(tasks: Seq[ScanTask]) | ||
|
|
||
| private def icebergPartition(partition: InputPartition): Option[IcebergPartitionView] = { | ||
| val className = partition.getClass.getName | ||
|
|
@@ -208,25 +332,78 @@ object IcebergScanSupport extends Logging { | |
| taskGroupField.setAccessible(true) | ||
| val taskGroup = taskGroupField.get(partition) | ||
|
|
||
| // Extract tasks and keep only file scan tasks. | ||
| // Extract the Iceberg scan tasks. The caller decides which concrete task type is supported. | ||
| val tasksMethod = taskGroup.getClass.getDeclaredMethod("tasks") | ||
| tasksMethod.setAccessible(true) | ||
| val tasks = tasksMethod.invoke(taskGroup).asInstanceOf[java.util.Collection[_]].asScala | ||
| val fileTasks = tasks.collect { case task: FileScanTask => task }.toSeq | ||
| val icebergTasks = tasks.collect { case task: ScanTask => task }.toSeq | ||
|
|
||
| // If any task is not a FileScanTask, fallback. | ||
| if (fileTasks.size != tasks.size) { | ||
| if (icebergTasks.size != tasks.size) { | ||
| return None | ||
| } | ||
|
|
||
| Some(IcebergPartitionView(fileTasks)) | ||
| Some(IcebergPartitionView(icebergTasks)) | ||
| } catch { | ||
| case NonFatal(t) => | ||
| logDebug(s"Failed to read Iceberg SparkInputPartition via reflection for $className.", t) | ||
| None | ||
| } | ||
| } | ||
|
|
||
| // Converts a FileScanTask into an IcebergNativeScanTask, copying the file location, the | ||
| // task's start and length, and the file size. No changelog task is passed (None), so | ||
| // metadataPartitionValues throws if partitionSchema contains a changelog metadata column. | ||
| private def toNativeScanTask( | ||
| task: FileScanTask, | ||
| partitionSchema: StructType): IcebergNativeScanTask = { | ||
| val file = task.file() | ||
| IcebergNativeScanTask( | ||
| file.location(), | ||
| task.start(), | ||
| task.length(), | ||
| file.fileSizeInBytes(), | ||
| metadataPartitionValues(file.location(), file.specId(), None, partitionSchema)) | ||
| } | ||
|
|
||
| // Converts an AddedRowsScanTask into an IcebergNativeScanTask, copying the file location, | ||
| // the task's start and length, and the file size. The task itself is passed on as the | ||
| // changelog task so changelog metadata columns can be filled from it. | ||
| private def toNativeScanTask( | ||
| task: AddedRowsScanTask, | ||
| partitionSchema: StructType): IcebergNativeScanTask = { | ||
| val file = task.file() | ||
| IcebergNativeScanTask( | ||
| file.location(), | ||
| task.start(), | ||
| task.length(), | ||
| file.fileSizeInBytes(), | ||
| metadataPartitionValues(file.location(), file.specId(), Some(task), partitionSchema)) | ||
| } | ||
|
|
||
| // Builds one constant value per partitionSchema field, in field order, for a single task: | ||
| // the file path, the spec id, and (when a changelog task is supplied) the change operation | ||
| // name, the change ordinal and the commit snapshot id. | ||
| // Throws IllegalStateException when a changelog column is requested without a changelog | ||
| // task, or when a field name is not one of the metadata columns handled below. | ||
| private def metadataPartitionValues( | ||
| filePath: String, | ||
| specId: Int, | ||
| changelogTask: Option[ChangelogScanTask], | ||
| partitionSchema: StructType): Seq[Any] = { | ||
| // Unwraps changelogTask, failing with the offending column name when it is absent. | ||
| def requiredChangelogTask(columnName: String): ChangelogScanTask = | ||
| changelogTask.getOrElse { | ||
| throw new IllegalStateException( | ||
| s"Iceberg changelog metadata column requires a changelog scan task: $columnName") | ||
| } | ||
|
|
||
||
| partitionSchema.fields.map { field => | ||
| field.name match { | ||
| case name if name == MetadataColumns.FILE_PATH.name() => | ||
| filePath | ||
| case name if name == MetadataColumns.SPEC_ID.name() => | ||
| specId | ||
| case name if name == MetadataColumns.CHANGE_TYPE.name() => | ||
| requiredChangelogTask(name).operation().name() | ||
| case name if name == MetadataColumns.CHANGE_ORDINAL.name() => | ||
| requiredChangelogTask(name).changeOrdinal() | ||
| case name if name == MetadataColumns.COMMIT_SNAPSHOT_ID.name() => | ||
| requiredChangelogTask(name).commitSnapshotId() | ||
| case name => | ||
| throw new IllegalStateException( | ||
| s"unsupported Iceberg metadata column in native scan: $name") | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private def collectPruningPredicates( | ||
| scan: AnyRef, | ||
| readSchema: StructType): Seq[pb.PhysicalExprNode] = { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`tasks.collect { case task: AnyRef => task }` matches everything, so `icebergTasks.size != tasks.size` is always false. The size guard has become a no-op. Either keep the type as `Seq[ScanTask]` (the Iceberg interface), or drop the dead guard entirely.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, good catch. I changed the reflected task collection to keep the Iceberg `ScanTask` interface instead of `AnyRef`, so the size guard is meaningful again. The normal and changelog scan planning paths still decide which concrete task types they support (`FileScanTask` or `ChangelogScanTask`) after this common extraction step.