diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala index ed0e42653..f19d1ca86 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala @@ -19,13 +19,13 @@ package org.apache.spark.sql.auron.iceberg import scala.collection.JavaConverters._ import scala.util.control.NonFatal -import org.apache.iceberg.{FileFormat, FileScanTask, MetadataColumns} +import org.apache.iceberg.{AddedRowsScanTask, ChangelogOperation, ChangelogScanTask, FileFormat, FileScanTask, MetadataColumns, ScanTask} import org.apache.iceberg.expressions.{And => IcebergAnd, BoundPredicate, Expression => IcebergExpression, Not => IcebergNot, Or => IcebergOr, UnboundPredicate} import org.apache.iceberg.spark.source.AuronIcebergSourceUtil import org.apache.spark.internal.Logging import org.apache.spark.sql.auron.NativeConverters import org.apache.spark.sql.catalyst.expressions.{And => SparkAnd, AttributeReference, EqualTo, Expression => SparkExpression, GreaterThan, GreaterThanOrEqual, In, IsNaN, IsNotNull, IsNull, LessThan, LessThanOrEqual, Literal, Not => SparkNot, Or => SparkOr} -import org.apache.spark.sql.connector.read.InputPartition +import org.apache.spark.sql.connector.read.{InputPartition, Scan} import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{BinaryType, DataType, DecimalType, StringType, StructField, StructType} @@ -35,8 +35,15 @@ import org.apache.auron.{protobuf => pb} // fileSchema is read from the data files. partitionSchema carries supported metadata columns // (for example _file and _spec_id) that are materialized as per-file constant values in // the native scan. +final case class IcebergNativeScanTask( + location: String, + start: Long, + length: Long, + fileSizeInBytes: Long, + partitionValues: Seq[Any]) + final case class IcebergScanPlan( - fileTasks: Seq[FileScanTask], + scanTasks: Seq[IcebergNativeScanTask], fileFormat: FileFormat, readSchema: StructType, fileSchema: StructType, @@ -45,41 +52,104 @@ final case class IcebergScanPlan( object IcebergScanSupport extends Logging { + private val SparkChangelogScanClassName = + "org.apache.iceberg.spark.source.SparkChangelogScan" + + private val ChangelogMetadataColumnNames = Set( + MetadataColumns.CHANGE_TYPE.name(), + MetadataColumns.CHANGE_ORDINAL.name(), + MetadataColumns.COMMIT_SNAPSHOT_ID.name()) + def plan(exec: BatchScanExec): Option[IcebergScanPlan] = { val scan = exec.scan val scanClassName = scan.getClass.getName // Only handle Iceberg scans; other sources must stay on Spark's path. + if (scanClassName == SparkChangelogScanClassName) { + return planChangelogScan(exec, scan) + } + if (!AuronIcebergSourceUtil.getClassOfSparkBatchQueryScan.isInstance(scan)) { return None } + planFileScan(exec, scan, scanClassName) + } + + private def planFileScan( + exec: BatchScanExec, + scan: Scan, + scanClassName: String): Option[IcebergScanPlan] = { val readSchema = scan.readSchema - val unsupportedMetadataColumns = collectUnsupportedMetadataColumns(readSchema) - // Native scan can project file-level metadata columns such as _file and _spec_id - // via partition values. - // Metadata columns that require per-row materialization (for example _pos) still fallback. - if (unsupportedMetadataColumns.nonEmpty) { + val schemas = supportedSchemas(readSchema, isChangelogScan = false) + if (schemas.isEmpty) { return None } + val (fileSchema, partitionSchema) = schemas.get - val fileSchema = StructType(readSchema.fields.filterNot(isSupportedMetadataColumn)) - // Supported metadata columns are materialized via per-file constant values rather than - // read from the Iceberg data file payload. - val partitionSchema = StructType(readSchema.fields.filter(isSupportedMetadataColumn)) + val partitions = inputPartitions(exec) + // Empty scan (e.g. empty table) should still build a plan to return no rows. + if (partitions.isEmpty) { + logWarning(s"Native Iceberg scan planned with empty partitions for $scanClassName.") + return Some( + IcebergScanPlan( + Seq.empty, + FileFormat.PARQUET, + readSchema, + fileSchema, + partitionSchema, + Seq.empty)) + } - if (!fileSchema.fields.forall(field => NativeConverters.isTypeSupported(field.dataType))) { + val icebergPartitions = partitions.flatMap(icebergPartition) + // All partitions must be Iceberg SparkInputPartition with file scan tasks; otherwise fallback. + if (icebergPartitions.size != partitions.size) { return None } - if (!partitionSchema.fields.forall(field => - NativeConverters.isTypeSupported(field.dataType))) { + val rawTasks = icebergPartitions.flatMap(_.tasks) + val fileTasks = rawTasks.collect { case task: FileScanTask => task } + if (fileTasks.size != rawTasks.size) { + return None + } + + // Native scan does not apply delete files; only allow pure data files (COW). + if (!fileTasks.forall(task => deletesEmpty(task.deletes()))) { + return None + } + + // Native scan handles a single file format; mixed formats must fallback. + val formats = fileTasks.map(_.file().format()).distinct + if (formats.size > 1) { + return None + } + + val format = formats.headOption.getOrElse(FileFormat.PARQUET) + if (format != FileFormat.PARQUET && format != FileFormat.ORC) { + return None + } + + val pruningPredicates = collectPruningPredicates(scan.asInstanceOf[AnyRef], readSchema) + val nativeTasks = fileTasks.map(task => toNativeScanTask(task, partitionSchema)) + Some( + IcebergScanPlan( + nativeTasks, + format, + readSchema, + fileSchema, + partitionSchema, + pruningPredicates)) + } + + private def planChangelogScan(exec: BatchScanExec, scan: Scan): Option[IcebergScanPlan] = { + val readSchema = scan.readSchema + val schemas = supportedSchemas(readSchema, isChangelogScan = true) + if (schemas.isEmpty) { return None } + val (fileSchema, partitionSchema) = schemas.get val partitions = inputPartitions(exec) - // Empty scan (e.g. empty table) should still build a plan to return no rows. if (partitions.isEmpty) { - logWarning(s"Native Iceberg scan planned with empty partitions for $scanClassName.") return Some( IcebergScanPlan( Seq.empty, @@ -91,20 +161,32 @@ object IcebergScanSupport extends Logging { } val icebergPartitions = partitions.flatMap(icebergPartition) - // All partitions must be Iceberg SparkInputPartition; otherwise fallback. if (icebergPartitions.size != partitions.size) { return None } - val fileTasks = icebergPartitions.flatMap(_.fileTasks) + val rawTasks = icebergPartitions.flatMap(_.tasks) + val changelogTasks = rawTasks.collect { case task: ChangelogScanTask => task } + if (changelogTasks.size != rawTasks.size) { + return None + } - // Native scan does not apply delete files; only allow pure data files (COW). - if (!fileTasks.forall(task => task.deletes() == null || task.deletes().isEmpty)) { + val addedRowsTasks = changelogTasks.collect { case task: AddedRowsScanTask => task } + // First native changelog support is insert-only. Delete and update images need Iceberg + // delete-file handling, so keep them on Spark's reader for now. + if (addedRowsTasks.size != changelogTasks.size) { return None } - // Native scan handles a single file format; mixed formats must fallback. - val formats = fileTasks.map(_.file().format()).distinct + if (!addedRowsTasks.forall(_.operation() == ChangelogOperation.INSERT)) { + return None + } + + if (!addedRowsTasks.forall(task => deletesEmpty(task.deletes()))) { + return None + } + + val formats = addedRowsTasks.map(_.file().format()).distinct if (formats.size > 1) { return None } @@ -115,9 +197,10 @@ object IcebergScanSupport extends Logging { } val pruningPredicates = collectPruningPredicates(scan.asInstanceOf[AnyRef], readSchema) + val nativeTasks = addedRowsTasks.map(task => toNativeScanTask(task, partitionSchema)) Some( IcebergScanPlan( - fileTasks, + nativeTasks, format, readSchema, fileSchema, @@ -125,17 +208,58 @@ object IcebergScanSupport extends Logging { pruningPredicates)) } - private def collectUnsupportedMetadataColumns(schema: StructType): Seq[String] = + private def supportedSchemas( + readSchema: StructType, + isChangelogScan: Boolean): Option[(StructType, StructType)] = { + val unsupportedMetadataColumns = + collectUnsupportedMetadataColumns(readSchema, isChangelogScan) + // Supported metadata columns are materialized via per-file/per-task constant values rather + // than read from the Iceberg data file payload. Metadata columns that require per-row + // materialization (for example _pos) still fallback. + if (unsupportedMetadataColumns.nonEmpty) { + return None + } + + val fileSchema = + StructType(readSchema.fields.filterNot(isSupportedMetadataColumn(_, isChangelogScan))) + val partitionSchema = + StructType(readSchema.fields.filter(isSupportedMetadataColumn(_, isChangelogScan))) + + if (!fileSchema.fields.forall(field => NativeConverters.isTypeSupported(field.dataType))) { + return None + } + + if (!partitionSchema.fields.forall(field => + NativeConverters.isTypeSupported(field.dataType))) { + return None + } + + Some(fileSchema -> partitionSchema) + } + + private def collectUnsupportedMetadataColumns( + schema: StructType, + isChangelogScan: Boolean): Seq[String] = schema.fields.collect { case field - if MetadataColumns.isMetadataColumn(field.name) && - !isSupportedMetadataColumn(field) => + if isIcebergMetadataColumn(field.name, isChangelogScan) && + !isSupportedMetadataColumn(field, isChangelogScan) => field.name } - private def isSupportedMetadataColumn(field: org.apache.spark.sql.types.StructField): Boolean = + private def isIcebergMetadataColumn(name: String, isChangelogScan: Boolean): Boolean = + MetadataColumns.isMetadataColumn(name) || + (isChangelogScan && ChangelogMetadataColumnNames.contains(name)) + + private def isSupportedMetadataColumn( + field: org.apache.spark.sql.types.StructField, + isChangelogScan: Boolean): Boolean = field.name == MetadataColumns.FILE_PATH.name() || - field.name == MetadataColumns.SPEC_ID.name() + field.name == MetadataColumns.SPEC_ID.name() || + (isChangelogScan && ChangelogMetadataColumnNames.contains(field.name)) + + private def deletesEmpty(deletes: java.util.List[_]): Boolean = + deletes == null || deletes.isEmpty private def inputPartitions(exec: BatchScanExec): Seq[InputPartition] = { // Prefer DataSource V2 batch API; if not available, fallback to exec methods via reflection. @@ -193,7 +317,7 @@ object IcebergScanSupport extends Logging { } } - private case class IcebergPartitionView(fileTasks: Seq[FileScanTask]) + private case class IcebergPartitionView(tasks: Seq[ScanTask]) private def icebergPartition(partition: InputPartition): Option[IcebergPartitionView] = { val className = partition.getClass.getName @@ -208,18 +332,17 @@ object IcebergScanSupport extends Logging { taskGroupField.setAccessible(true) val taskGroup = taskGroupField.get(partition) - // Extract tasks and keep only file scan tasks. + // Extract the Iceberg scan tasks. The caller decides which concrete task type is supported. val tasksMethod = taskGroup.getClass.getDeclaredMethod("tasks") tasksMethod.setAccessible(true) val tasks = tasksMethod.invoke(taskGroup).asInstanceOf[java.util.Collection[_]].asScala - val fileTasks = tasks.collect { case task: FileScanTask => task }.toSeq + val icebergTasks = tasks.collect { case task: ScanTask => task }.toSeq - // If any task is not a FileScanTask, fallback. - if (fileTasks.size != tasks.size) { + if (icebergTasks.size != tasks.size) { return None } - Some(IcebergPartitionView(fileTasks)) + Some(IcebergPartitionView(icebergTasks)) } catch { case NonFatal(t) => logDebug(s"Failed to read Iceberg SparkInputPartition via reflection for $className.", t) @@ -227,6 +350,60 @@ object IcebergScanSupport extends Logging { } } + private def toNativeScanTask( + task: FileScanTask, + partitionSchema: StructType): IcebergNativeScanTask = { + val file = task.file() + IcebergNativeScanTask( + file.location(), + task.start(), + task.length(), + file.fileSizeInBytes(), + metadataPartitionValues(file.location(), file.specId(), None, partitionSchema)) + } + + private def toNativeScanTask( + task: AddedRowsScanTask, + partitionSchema: StructType): IcebergNativeScanTask = { + val file = task.file() + IcebergNativeScanTask( + file.location(), + task.start(), + task.length(), + file.fileSizeInBytes(), + metadataPartitionValues(file.location(), file.specId(), Some(task), partitionSchema)) + } + + private def metadataPartitionValues( + filePath: String, + specId: Int, + changelogTask: Option[ChangelogScanTask], + partitionSchema: StructType): Seq[Any] = { + def requiredChangelogTask(columnName: String): ChangelogScanTask = + changelogTask.getOrElse { + throw new IllegalStateException( + s"Iceberg changelog metadata column requires a changelog scan task: $columnName") + } + + partitionSchema.fields.map { field => + field.name match { + case name if name == MetadataColumns.FILE_PATH.name() => + filePath + case name if name == MetadataColumns.SPEC_ID.name() => + specId + case name if name == MetadataColumns.CHANGE_TYPE.name() => + requiredChangelogTask(name).operation().name() + case name if name == MetadataColumns.CHANGE_ORDINAL.name() => + requiredChangelogTask(name).changeOrdinal() + case name if name == MetadataColumns.COMMIT_SNAPSHOT_ID.name() => + requiredChangelogTask(name).commitSnapshotId() + case name => + throw new IllegalStateException( + s"unsupported Iceberg metadata column in native scan: $name") + } + } + } + private def collectPruningPredicates( scan: AnyRef, readSchema: StructType): Seq[pb.PhysicalExprNode] = { diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala index d53aa690a..9782403ff 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala @@ -24,22 +24,22 @@ import java.util.UUID import scala.collection.JavaConverters._ import org.apache.hadoop.fs.FileSystem -import org.apache.iceberg.{FileFormat, FileScanTask, MetadataColumns} +import org.apache.iceberg.FileFormat import org.apache.spark.Partition import org.apache.spark.TaskContext import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.auron.{EmptyNativeRDD, NativeConverters, NativeHelper, NativeRDD, NativeSupports, Shims} -import org.apache.spark.sql.auron.iceberg.IcebergScanPlan +import org.apache.spark.sql.auron.iceberg.{IcebergNativeScanTask, IcebergScanPlan} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.Literal +import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, Literal} import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan, SQLExecution} import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile} import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{StringType, StructType} +import org.apache.spark.sql.types.StructType import org.apache.spark.util.SerializableConfiguration import org.apache.auron.{protobuf => pb} @@ -63,7 +63,7 @@ case class NativeIcebergTableScanExec(basedScan: BatchScanExec, plan: IcebergSca private lazy val partitionSchema: StructType = plan.partitionSchema private lazy val projectableSchema: StructType = StructType(fileSchema.fields ++ partitionSchema.fields) - private lazy val fileTasks: Seq[FileScanTask] = plan.fileTasks + private lazy val scanTasks: Seq[IcebergNativeScanTask] = plan.scanTasks private lazy val pruningPredicates: Seq[pb.PhysicalExprNode] = plan.pruningPredicates private lazy val partitions: Array[FilePartition] = { @@ -72,7 +72,6 @@ case class NativeIcebergTableScanExec(basedScan: BatchScanExec, plan: IcebergSca filePartitions } private lazy val fileSizes: Map[String, Long] = buildFileSizes() - private lazy val fileSpecIds: Map[String, Int] = buildFileSpecIds() private lazy val nativeFileSchema: pb.Schema = NativeConverters.convertSchema(fileSchema) private lazy val nativePartitionSchema: pb.Schema = @@ -111,7 +110,7 @@ case class NativeIcebergTableScanExec(basedScan: BatchScanExec, plan: IcebergSca .setPath(filePath) .setSize(size) .setLastModifiedNs(0) - .addAllPartitionValues(metadataPartitionValues(filePath).asJava) + .addAllPartitionValues(metadataPartitionValues(file).asJava) .setRange( pb.FileRange .newBuilder() @@ -126,19 +125,12 @@ case class NativeIcebergTableScanExec(basedScan: BatchScanExec, plan: IcebergSca .build() } - private def metadataPartitionValues(filePath: String): Seq[pb.ScalarValue] = - partitionSchema.fields.map { field => - field.name match { - case name if name == MetadataColumns.FILE_PATH.name() => - NativeConverters.convertExpr(Literal.create(filePath, StringType)).getLiteral - case name if name == MetadataColumns.SPEC_ID.name() => - NativeConverters - .convertExpr(Literal.create(fileSpecIds(filePath), field.dataType)) - .getLiteral - case name => - throw new IllegalStateException( - s"unsupported Iceberg metadata column in native scan: $name") - } + private def metadataPartitionValues(file: PartitionedFile): Seq[pb.ScalarValue] = + partitionSchema.fields.zipWithIndex.map { case (field, index) => + NativeConverters + .convertExpr( + Literal.create(file.partitionValues.get(index, field.dataType), field.dataType)) + .getLiteral } override def doExecuteNative(): NativeRDD = { @@ -224,8 +216,8 @@ case class NativeIcebergTableScanExec(basedScan: BatchScanExec, plan: IcebergSca private def buildFileSizes(): Map[String, Long] = { // Map file path to full file size; tasks may split a file into multiple ranges. - fileTasks - .map(task => task.file().location() -> task.file().fileSizeInBytes()) + scanTasks + .map(task => task.location -> task.fileSizeInBytes) .groupBy(_._1) .mapValues(_.head._2) .toMap @@ -243,37 +235,21 @@ case class NativeIcebergTableScanExec(basedScan: BatchScanExec, plan: IcebergSca Seq(metrics("numPartitions"), metrics("numFiles"))) } - private def buildFileSpecIds(): Map[String, Int] = { - // Map file path to Iceberg partition spec id; tasks may split a file into multiple ranges. - val specIds = scala.collection.mutable.HashMap.empty[String, Int] - fileTasks.foreach { task => - val filePath = task.file().location() - val specId = task.file().specId() - specIds.get(filePath) match { - case Some(existingSpecId) if existingSpecId != specId => - throw new IllegalStateException( - s"Inconsistent Iceberg partition spec id for file $filePath: " + - s"$existingSpecId != $specId") - case Some(_) => - case None => - specIds.put(filePath, specId) - } - } - specIds.toMap - } - private def buildFilePartitions(): Array[FilePartition] = { - // Convert Iceberg file tasks into Spark FilePartition groups for execution. - if (fileTasks.isEmpty) { + // Convert Iceberg scan tasks into Spark FilePartition groups for execution. + if (scanTasks.isEmpty) { return Array.empty } val sparkSession = Shims.get.getSqlContext(basedScan).sparkSession - val maxSplitBytes = getMaxSplitBytes(sparkSession, fileTasks) - val partitionedFiles = fileTasks + val maxSplitBytes = getMaxSplitBytes(sparkSession, scanTasks) + val partitionedFiles = scanTasks .map { task => - val filePath = task.file().location() - Shims.get.getPartitionedFile(InternalRow.empty, filePath, task.start(), task.length()) + Shims.get.getPartitionedFile( + partitionValuesRow(task), + task.location, + task.start, + task.length) } .sortBy(_.length)(Ordering[Long].reverse) .toSeq @@ -281,12 +257,21 @@ case class NativeIcebergTableScanExec(basedScan: BatchScanExec, plan: IcebergSca FilePartition.getFilePartitions(sparkSession, partitionedFiles, maxSplitBytes).toArray } - private def getMaxSplitBytes(sparkSession: SparkSession, tasks: Seq[FileScanTask]): Long = { + private def partitionValuesRow(task: IcebergNativeScanTask): InternalRow = { + val values = partitionSchema.fields.zip(task.partitionValues).map { case (field, value) => + Literal.create(value, field.dataType).eval() + } + new GenericInternalRow(values.toArray) + } + + private def getMaxSplitBytes( + sparkSession: SparkSession, + tasks: Seq[IcebergNativeScanTask]): Long = { val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes val minPartitionNum = Shims.get.getMinPartitionNum(sparkSession) val totalBytes = tasks - .map(task => task.file().fileSizeInBytes() + openCostInBytes) + .map(task => task.fileSizeInBytes + openCostInBytes) .sum val bytesPerCore = if (minPartitionNum > 0) totalBytes / minPartitionNum else totalBytes diff --git a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala b/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala index e99dfbb82..dd19c5887 100644 --- a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala +++ b/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala @@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit import scala.collection.JavaConverters._ -import org.apache.iceberg.{FileFormat, FileScanTask} +import org.apache.iceberg.{FileFormat, FileScanTask, MetadataColumns} import org.apache.iceberg.data.{GenericAppenderFactory, Record} import org.apache.iceberg.deletes.PositionDelete import org.apache.iceberg.spark.Spark3Util @@ -294,6 +294,65 @@ class AuronIcebergIntegrationSuite } } + test("iceberg native scan supports insert-only changelog scan") { + withTable("local.db.t_changelog_insert") { + withTempView("t_changelog_insert_changes") { + sql(""" + |create table local.db.t_changelog_insert (id int, v string) + |using iceberg + |tblproperties ('format-version' = '2') + |""".stripMargin) + sql("insert into local.db.t_changelog_insert values (1, 'a')") + val startSnapshotId = currentSnapshotId("local.db.t_changelog_insert") + sql("insert into local.db.t_changelog_insert values (2, 'b'), (3, 'c')") + val endSnapshotId = currentSnapshotId("local.db.t_changelog_insert") + createChangelogView( + "local.db.t_changelog_insert", + "t_changelog_insert_changes", + startSnapshotId, + endSnapshotId) + + val df = checkSparkAnswerAndOperator(""" + |select id, v, _change_type, _change_ordinal, _commit_snapshot_id + |from t_changelog_insert_changes + |order by id + |""".stripMargin) + val nativeScanPlan = icebergScanPlan(df) + assert(nativeScanPlan.nonEmpty) + assert( + nativeScanPlan.get.partitionSchema.fieldNames + .contains(MetadataColumns.CHANGE_TYPE.name())) + } + } + } + + test("iceberg changelog scan falls back when delete changes exist") { + withTable("local.db.t_changelog_delete") { + withTempView("t_changelog_delete_changes") { + sql(""" + |create table local.db.t_changelog_delete (id int, v string) + |using iceberg + |tblproperties ('format-version' = '2') + |""".stripMargin) + sql("insert into local.db.t_changelog_delete values (1, 'a'), (2, 'b')") + val startSnapshotId = currentSnapshotId("local.db.t_changelog_delete") + sql("delete from local.db.t_changelog_delete where id = 1") + val endSnapshotId = currentSnapshotId("local.db.t_changelog_delete") + createChangelogView( + "local.db.t_changelog_delete", + "t_changelog_delete_changes", + startSnapshotId, + endSnapshotId) + + withSQLConf("spark.auron.enable" -> "true", "spark.auron.enable.iceberg.scan" -> "true") { + val df = sql("select * from t_changelog_delete_changes") + val plan = df.queryExecution.executedPlan.toString() + assert(!plan.contains("NativeIcebergTableScan")) + } + } + } + } + test("iceberg scan falls back when reading unsupported metadata columns") { withTable("local.db.t4_pos") { sql("create table local.db.t4_pos using iceberg as select 1 as id, 'a' as v") @@ -392,6 +451,27 @@ class AuronIcebergIntegrationSuite } } + private def createChangelogView( + tableName: String, + viewName: String, + startSnapshotId: Long, + endSnapshotId: Long): Unit = { + val tableIdent = tableName.stripPrefix("local.") + sql(s""" + |CALL local.system.create_changelog_view( + | table => '$tableIdent', + | changelog_view => '$viewName', + | options => map( + | 'start-snapshot-id', '$startSnapshotId', + | 'end-snapshot-id', '$endSnapshotId' + | ) + |) + |""".stripMargin) + } + + private def currentSnapshotId(tableName: String): Long = + Spark3Util.loadIcebergTable(spark, tableName).currentSnapshot().snapshotId() + private def checkSparkAnswerAndOperator(sqlText: String): DataFrame = { var expected: Seq[Row] = Nil withSQLConf("spark.auron.enable" -> "false") {