diff --git a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala index 9f96840a7788..60bfd244b2de 100644 --- a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala +++ b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala @@ -18,4 +18,21 @@ package org.apache.paimon.spark.sql -class RowTrackingTest extends RowTrackingTestBase {} +import org.apache.paimon.spark.SparkTable + +import org.apache.spark.sql.connector.catalog.SupportsRowLevelOperations + +class RowTrackingTest extends RowTrackingTestBase { + + test("Row Tracking: Spark 3.5 keeps row-tracking tables on V1 DML path") { + withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") { + withTable("t", "rt") { + sql("CREATE TABLE t (id INT, data INT)") + assert(SparkTable.of(loadTable("t")).isInstanceOf[SupportsRowLevelOperations]) + + sql("CREATE TABLE rt (id INT, data INT) TBLPROPERTIES ('row-tracking.enabled' = 'true')") + assert(!SparkTable.of(loadTable("rt")).isInstanceOf[SupportsRowLevelOperations]) + } + } + } +} diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala index 9f96840a7788..382aa1e77880 100644 --- a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala +++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala @@ -18,4 +18,111 @@ package org.apache.paimon.spark.sql -class RowTrackingTest extends RowTrackingTestBase {} +import org.apache.paimon.spark.SparkTable +import org.apache.paimon.spark.schema.PaimonMetadataColumn + +import org.apache.spark.sql.Row +import org.apache.spark.sql.connector.catalog.SupportsRowLevelOperations +import org.apache.spark.sql.types.Metadata + +class RowTrackingTest extends RowTrackingTestBase { + + test("Row Tracking: metadata columns expose Spark preserve flags") { + val rowIdMetadata = Metadata.fromJson(PaimonMetadataColumn.ROW_ID.metadataInJSON()) + assert(rowIdMetadata.getBoolean("__preserve_on_delete")) + assert(rowIdMetadata.getBoolean("__preserve_on_update")) + assert(!rowIdMetadata.getBoolean("__preserve_on_reinsert")) + + val sequenceNumberMetadata = + Metadata.fromJson(PaimonMetadataColumn.SEQUENCE_NUMBER.metadataInJSON()) + assert(sequenceNumberMetadata.getBoolean("__preserve_on_delete")) + assert(!sequenceNumberMetadata.getBoolean("__preserve_on_update")) + assert(!sequenceNumberMetadata.getBoolean("__preserve_on_reinsert")) + } + + test("Row Tracking: Spark 4.1 uses V2 copy-on-write for DML") { + withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") { + withTable("s", "t") { + sql("CREATE TABLE t (id INT, data INT) TBLPROPERTIES ('row-tracking.enabled' = 'true')") + sql("INSERT INTO t VALUES (1, 1), (2, 2)") + sql("INSERT INTO t VALUES (3, 3), (4, 4)") + + assertPlanContains("DELETE FROM t WHERE id = 2", "ReplaceData") + sql("DELETE FROM t WHERE id = 2") + + assertPlanContains("UPDATE t SET data = 30 WHERE id = 3", "ReplaceData") + sql("UPDATE t SET data = 30 WHERE id = 3") + + sql("CREATE TABLE s (id INT, data INT)") + sql("INSERT INTO s VALUES (3, 300), (5, 500)") + assertPlanContains( + """ + |MERGE INTO t + |USING s + |ON t.id = s.id + |WHEN MATCHED THEN UPDATE SET data = s.data + |WHEN NOT MATCHED THEN INSERT * + |""".stripMargin, + "ReplaceData" + ) + sql(""" + |MERGE INTO t + |USING s + |ON t.id = s.id + |WHEN MATCHED THEN UPDATE SET data = s.data + |WHEN NOT MATCHED THEN INSERT * + |""".stripMargin) + + checkAnswer( + sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), + Seq(Row(1, 1, 0, 1), Row(3, 300, 2, 5), Row(4, 4, 3, 2), Row(5, 500, 4, 5)) + ) + } + } + } + + test("Row Tracking: nested CHAR columns do not expose V2 row-level capability") { + withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") { + withTable("t") { + sql(""" + |CREATE TABLE t ( + | id INT, + | info STRUCT + |) TBLPROPERTIES ('row-tracking.enabled' = 'true') + |""".stripMargin) + + assert(!SparkTable.of(loadTable("t")).isInstanceOf[SupportsRowLevelOperations]) + } + } + } + + test("Row Tracking: Spark 4.1 restores metadata-only delete fast path") { + withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") { + withTable("t") { + sql(""" + |CREATE TABLE t (id INT, data INT, dt STRING) + |PARTITIONED BY (dt) + |TBLPROPERTIES ('row-tracking.enabled' = 'true') + |""".stripMargin) + sql("INSERT INTO t VALUES (1, 1, 'p1'), (2, 2, 'p1'), (3, 3, 'p2')") + + assertPlanContains("DELETE FROM t WHERE dt = 'p1'", "DeleteFromPaimonTableCommand") + sql("DELETE FROM t WHERE dt = 'p1'") + + checkAnswer( + sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), + Seq(Row(3, 3, "p2", 0, 1)) + ) + } + } + } + + private def assertPlanContains(sqlText: String, fragment: String): Unit = { + val plan = explain(sqlText) + assert(plan.contains(fragment), plan) + } + + private def explain(sqlText: String): String = { + sql(s"EXPLAIN EXTENDED $sqlText").collect().map(_.getString(0)).mkString("\n") + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/schema/PaimonMetadataColumnBase.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/schema/PaimonMetadataColumnBase.java new file mode 100644 index 000000000000..3f4cc090cd48 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/schema/PaimonMetadataColumnBase.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.schema; + +import org.apache.spark.sql.connector.catalog.MetadataColumn; + +abstract class PaimonMetadataColumnBase implements MetadataColumn { + + abstract boolean preserveOnDelete(); + + abstract boolean preserveOnUpdate(); + + abstract boolean preserveOnReinsert(); + + public String metadataInJSON() { + return "{\"__preserve_on_delete\":" + + preserveOnDelete() + + ",\"__preserve_on_update\":" + + preserveOnUpdate() + + ",\"__preserve_on_reinsert\":" + + preserveOnReinsert() + + "}"; + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/write/PaimonV2MetadataAwareDataWriter.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/write/PaimonV2MetadataAwareDataWriter.java new file mode 100644 index 000000000000..fd6786a885b3 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/write/PaimonV2MetadataAwareDataWriter.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.write; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.table.sink.BatchWriteBuilder; +import org.apache.paimon.types.RowType; + +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.types.StructType; + +import scala.Option; + +/** + * Spark 4.x calls DataWriter.write(metadata, data) for metadata-aware writes. Keep this method in + * Java so the common sources still compile against Spark 3.5, where that interface method does not + * exist; Spark 4.x compilation generates the erased bridge required by the runtime call. + */ +public class PaimonV2MetadataAwareDataWriter extends PaimonV2DataWriter { + + public PaimonV2MetadataAwareDataWriter( + BatchWriteBuilder writeBuilder, + StructType writeSchema, + StructType rowTrackingWriteSchema, + StructType dataSchema, + StructType metadataSchema, + CoreOptions coreOptions, + CatalogContext catalogContext, + RowType paimonWriteType) { + super( + writeBuilder, + rowTrackingWriteSchema, + dataSchema, + coreOptions, + catalogContext, + Option.empty(), + Option.apply(paimonWriteType), + Option.apply(metadataSchema), + Option.apply(writeSchema)); + } + + public void write(InternalRow metadata, InternalRow data) { + writeWithMetadata(metadata, data); + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala index 0196ea6404ca..9ea20de1909d 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala @@ -40,14 +40,15 @@ import java.util.{EnumSet => JEnumSet, Set => JSet} * If this base class implemented `SupportsRowLevelOperations`, Spark 4.1 would immediately call * `newRowLevelOperationBuilder` on tables whose V2 write is disabled (e.g. dynamic bucket or * primary-key tables that fall back to V1 write) and fail before Paimon has a chance to rewrite the - * plan to a V1 command. Likewise, deletion-vector, row-tracking, and data-evolution tables need to - * stay on Paimon's V1 postHoc path even when `useV2Write=true`, so they must also not expose - * `SupportsRowLevelOperations`. + * plan to a V1 command. Likewise, deletion-vector, data-evolution, and fixed-length CHAR tables + * need to stay on Paimon's V1 postHoc path even when `useV2Write=true`, so they must also not + * expose `SupportsRowLevelOperations`. * * Tables that DO support V2 row-level operations use the [[SparkTableWithRowLevelOps]] subclass * instead; the [[SparkTable.of]] factory picks the right variant via - * [[SparkTable.supportsV2RowLevelOps]], which is kept in lockstep with - * `RowLevelHelper.shouldFallbackToV1`. + * [[SparkTable.supportsV2RowLevelOps]]. Append-only tables, including row-tracking-only tables, + * expose `SupportsRowLevelOperations` so DELETE, UPDATE, and MERGE INTO can go through the V2 + * copy-on-write path when the table has no PK, deletion vectors, data evolution, or CHAR columns. */ case class SparkTable(override val table: Table) extends PaimonSparkTableBase(table) @@ -93,12 +94,11 @@ object SparkTable { * Whether the given table supports Paimon's V2 row-level operations, i.e. whether it is safe to * expose [[SupportsRowLevelOperations]] to Spark. * - * This must stay in sync with - * `org.apache.paimon.spark.catalyst.analysis.RowLevelHelper#shouldFallbackToV1` — the two - * predicates are logical complements. If they diverge, Spark 4.1's row-level rewrite rules (which - * fire in the main Resolution batch) will intercept DML on tables that Paimon expects to handle - * through its postHoc V1 fallback, leaving primary-key / deletion-vector / row-tracking / - * data-evolution tables with broken MERGE/UPDATE/DELETE dispatch. + * Append-only tables return `true` here so that `SparkTable.of` wraps them as + * `SparkTableWithRowLevelOps`, enabling Spark's V2 copy-on-write DELETE, UPDATE, and MERGE INTO + * paths. Row-tracking append-only tables require Spark 4.0+ because Spark 3.5 does not have the + * metadata-aware `DataWriter.write(metadata, data)` path needed to preserve row-tracking metadata + * for rewritten rows. * * Per-version shims for Spark 3.2/3.3/3.4 each ship their own * `org.apache.paimon.spark.SparkTable` (class + companion) that shadows this one at packaging @@ -113,10 +113,13 @@ object SparkTable { if (!sparkTable.useV2Write) return false sparkTable.getTable match { case fs: FileStoreTable => + val supportsRowTrackingCopyOnWrite = + !sparkTable.coreOptions.rowTrackingEnabled() || org.apache.spark.SPARK_VERSION >= "4.0" fs.primaryKeys().isEmpty && + supportsRowTrackingCopyOnWrite && !sparkTable.coreOptions.deletionVectorsEnabled() && - !sparkTable.coreOptions.rowTrackingEnabled() && - !sparkTable.coreOptions.dataEvolutionEnabled() + !sparkTable.coreOptions.dataEvolutionEnabled() && + !SparkTypeUtils.containsCharType(fs.rowType()) case _ => false } } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java index c16f16a42990..80a27c35ac90 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java @@ -99,6 +99,23 @@ public static org.apache.paimon.types.DataType toPaimonType(DataType dataType) { return SparkToPaimonTypeVisitor.visit(dataType); } + public static boolean containsCharType(org.apache.paimon.types.DataType type) { + if (type instanceof CharType) { + return true; + } else if (type instanceof RowType) { + return ((RowType) type).getFields().stream() + .anyMatch(field -> containsCharType(field.type())); + } else if (type instanceof ArrayType) { + return containsCharType(((ArrayType) type).getElementType()); + } else if (type instanceof MapType) { + MapType mapType = (MapType) type; + return containsCharType(mapType.getKeyType()) || containsCharType(mapType.getValueType()); + } else if (type instanceof MultisetType) { + return containsCharType(((MultisetType) type).getElementType()); + } + return false; + } + /** * Prune Paimon `RowType` by required Spark `StructType`, use this method instead of {@link * #toPaimonType(DataType)} when need to retain the field id. diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala index 4bbdb8bbd8c2..da4394867611 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala @@ -76,17 +76,14 @@ trait RowLevelHelper extends SQLConfHelper { } } - /** - * Determines if DataSourceV2 is not supported for the given table. This is the logical complement - * of [[SparkTable.supportsV2RowLevelOps]]; the two predicates must stay in sync so that Spark - * 4.1's row-level rewrite rules (which key on `SupportsRowLevelOperations`) and Paimon's V1 - * postHoc fallback rules (which gate on this predicate) agree about which tables go down which - * path. - */ protected def shouldFallbackToV1(table: SparkTable): Boolean = { !SparkTable.supportsV2RowLevelOps(table) } + // `SparkTable.supportsV2RowLevelOps` controls whether the table exposes Spark row-level + // capability at all. These per-operation checks are the remaining V1 fallbacks for cases Spark's + // V2 rewrite cannot safely handle: metadata-only DELETE, non-rewritable UPDATE/MERGE, or + // assignments that have not been aligned yet. /** Determines if DataSourceV2 delete is not supported for the given table. */ protected def shouldFallbackToV1Delete(table: SparkTable, condition: Expression): Boolean = { shouldFallbackToV1(table) || @@ -106,13 +103,6 @@ trait RowLevelHelper extends SQLConfHelper { protected def shouldFallbackToV1MergeInto(m: MergeIntoTable): Boolean = { val relation = PaimonRelation.getPaimonRelation(m.targetTable) val table = relation.table.asInstanceOf[SparkTable] - // Note for Spark 4.1+: `shouldFallbackToV1(table)` returns `false` for pure append-only - // tables (no PK / RT / DE / DV), so this predicate lets the aligned `MergeIntoTable` node - // return untouched. Spark's own `RewriteMergeIntoTable` in the Resolution batch can't fire - // (`resolveOperators` short-circuits on `analyzed=true`), so the rewrite is performed by - // `Spark41MergeIntoRewrite` (paimon-spark4-common) which aligns + transcribes Spark's - // `ReplaceData` / `AppendData` branches for non-`SupportsDelta` sources. Non-append-only - // tables still fall back to V1 (`MergeIntoPaimonTable` / `MergeIntoPaimonDataEvolutionTable`). shouldFallbackToV1(table) || !m.rewritable || !m.aligned diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonSparkCopyOnWriteOperation.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonSparkCopyOnWriteOperation.scala index e1e2ddd4d9f1..24c3c19761c2 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonSparkCopyOnWriteOperation.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonSparkCopyOnWriteOperation.scala @@ -20,7 +20,7 @@ package org.apache.paimon.spark.rowops import org.apache.paimon.options.Options import org.apache.paimon.spark.PaimonBaseScanBuilder -import org.apache.paimon.spark.schema.PaimonMetadataColumn.FILE_PATH_COLUMN +import org.apache.paimon.spark.schema.PaimonMetadataColumn.{FILE_PATH_COLUMN, ROW_ID_COLUMN, SEQUENCE_NUMBER_COLUMN} import org.apache.paimon.spark.write.PaimonV2WriteBuilder import org.apache.paimon.table.FileStoreTable @@ -57,6 +57,11 @@ class PaimonSparkCopyOnWriteOperation(table: FileStoreTable, info: RowLevelOpera } override def requiredMetadataAttributes(): Array[NamedReference] = { - Array(Expressions.column(FILE_PATH_COLUMN)) + val base = Array(Expressions.column(FILE_PATH_COLUMN)) + if (table.coreOptions().rowTrackingEnabled()) { + base ++ Array(Expressions.column(ROW_ID_COLUMN), Expressions.column(SEQUENCE_NUMBER_COLUMN)) + } else { + base + } } } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/PaimonMetadataColumn.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/PaimonMetadataColumn.scala index 34343f7380e4..8438d5b17581 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/PaimonMetadataColumn.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/PaimonMetadataColumn.scala @@ -24,11 +24,16 @@ import org.apache.paimon.types.DataField import org.apache.spark.sql.Column import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.connector.catalog.MetadataColumn import org.apache.spark.sql.types.{DataType, FloatType, IntegerType, LongType, StringType, StructField, StructType} -case class PaimonMetadataColumn(id: Int, override val name: String, override val dataType: DataType) - extends MetadataColumn { +case class PaimonMetadataColumn( + id: Int, + override val name: String, + override val dataType: DataType, + preserveOnDelete: Boolean = true, + preserveOnUpdate: Boolean = true, + preserveOnReinsert: Boolean = false) + extends PaimonMetadataColumnBase { def toPaimonDataField: DataField = { new DataField(id, name, SparkTypeUtils.toPaimonType(dataType)); @@ -79,7 +84,11 @@ object PaimonMetadataColumn { val ROW_ID: PaimonMetadataColumn = PaimonMetadataColumn(Int.MaxValue - 104, ROW_ID_COLUMN, LongType) val SEQUENCE_NUMBER: PaimonMetadataColumn = - PaimonMetadataColumn(Int.MaxValue - 105, SEQUENCE_NUMBER_COLUMN, LongType) + PaimonMetadataColumn( + Int.MaxValue - 105, + SEQUENCE_NUMBER_COLUMN, + LongType, + preserveOnUpdate = false) val VECTOR_SEARCH_SCORE: PaimonMetadataColumn = PaimonMetadataColumn(Integer.MAX_VALUE - 106, VECTOR_SEARCH_SCORE_COLUMN, FloatType) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWriteBase.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWriteBase.scala index d19f1a709646..42d2ebcd8590 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWriteBase.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWriteBase.scala @@ -19,11 +19,13 @@ package org.apache.paimon.spark.write import org.apache.paimon.io.{CompactIncrement, DataFileMeta, DataIncrement} +import org.apache.paimon.spark.SparkTypeUtils import org.apache.paimon.spark.catalyst.Compatibility import org.apache.paimon.spark.commands.SparkDataFileMeta import org.apache.paimon.spark.metric.SparkMetricRegistry import org.apache.paimon.spark.rowops.PaimonCopyOnWriteScan -import org.apache.paimon.table.FileStoreTable +import org.apache.paimon.spark.schema.PaimonMetadataColumn.{FILE_PATH, ROW_ID, SEQUENCE_NUMBER} +import org.apache.paimon.table.{FileStoreTable, SpecialFields} import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage, CommitMessageImpl} import org.apache.spark.sql.PaimonSparkSession @@ -67,18 +69,46 @@ abstract class PaimonBatchWriteBase( builder } + private val writeRowTracking: Boolean = + coreOptions.rowTrackingEnabled() && copyOnWriteScan.isDefined + + private lazy val rtPaimonWriteType = + SpecialFields.rowTypeWithRowTracking(table.rowType(), false, true) + + private lazy val rtWriteSchema = + SparkTypeUtils.fromPaimonRowType(rtPaimonWriteType) + + private lazy val rtMetadataSchema = + StructType(Seq(FILE_PATH, ROW_ID, SEQUENCE_NUMBER).map(_.toStructField)) + protected def createPaimonDataWriterFactory(info: PhysicalWriteInfo): DataWriterFactory = { (_: Int, _: Long) => { - PaimonV2DataWriter( - batchWriteBuilder, - writeSchema, - dataSchema, - coreOptions, - catalogContextForBlobDescriptor) + if (writeRowTracking) { + createPaimonMetadataAwareDataWriter() + } else { + PaimonV2DataWriter( + batchWriteBuilder, + writeSchema, + dataSchema, + coreOptions, + catalogContextForBlobDescriptor) + } } } + private def createPaimonMetadataAwareDataWriter(): PaimonV2DataWriter = { + new PaimonV2MetadataAwareDataWriter( + batchWriteBuilder, + writeSchema, + rtWriteSchema, + dataSchema, + rtMetadataSchema, + coreOptions, + catalogContextForBlobDescriptor, + rtPaimonWriteType) + } + protected def commitMessages(messages: Array[WriterCommitMessage]): Unit = { commitStarted = true logInfo(s"Committing to table ${table.name()}") diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2DataWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2DataWriter.scala index aa2dfcdf8f56..eadd056cf2a5 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2DataWriter.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2DataWriter.scala @@ -23,8 +23,11 @@ import org.apache.paimon.catalog.CatalogContext import org.apache.paimon.spark.{SparkInternalRowWrapper, SparkUtils} import org.apache.paimon.spark.metric.SparkMetricRegistry import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage, TableWriteImpl} +import org.apache.paimon.types.RowType +import org.apache.paimon.utils.IOUtils import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.JoinedRow import org.apache.spark.sql.connector.metric.CustomTaskMetric import org.apache.spark.sql.types.StructType @@ -36,7 +39,10 @@ case class PaimonV2DataWriter( dataSchema: StructType, coreOptions: CoreOptions, catalogContext: CatalogContext, - batchId: Option[Long] = None) + batchId: Option[Long] = None, + paimonWriteType: Option[RowType] = None, + metadataSchema: Option[StructType] = None, + plainWriteSchema: Option[StructType] = None) extends abstractInnerTableDataWrite[InternalRow] with InnerTableV2DataWrite { @@ -46,35 +52,79 @@ case class PaimonV2DataWriter( val fullCompactionDeltaCommits: Option[Int] = Option.apply(coreOptions.fullCompactionDeltaCommits()) - val write: TableWriteImpl[InternalRow] = { - writeBuilder + private def createTableWrite(writeType: Option[RowType]): TableWriteImpl[InternalRow] = { + val w = writeBuilder .newWrite() .withIOManager(ioManager) .withMetricRegistry(metricRegistry) .asInstanceOf[TableWriteImpl[InternalRow]] + writeType.foreach(w.withWriteType) + w } - private val rowConverter: InternalRow => SparkInternalRowWrapper = { + val write: TableWriteImpl[InternalRow] = createTableWrite(paimonWriteType) + + private var plainWrite: Option[TableWriteImpl[InternalRow]] = None + + private def getPlainWrite: TableWriteImpl[InternalRow] = { + plainWrite.getOrElse { + val w = createTableWrite(None) + plainWrite = Some(w) + w + } + } + + private def createRowConverter( + writeSchema: StructType, + schema: StructType): InternalRow => SparkInternalRowWrapper = { val numFields = writeSchema.fields.length val reusableWrapper = - new SparkInternalRowWrapper(writeSchema, numFields, dataSchema, catalogContext) + new SparkInternalRowWrapper(writeSchema, numFields, schema, catalogContext) record => reusableWrapper.replace(record) } + private val rowConverter: InternalRow => SparkInternalRowWrapper = + createRowConverter(writeSchema, dataSchema) + + private val plainRowConverter: Option[InternalRow => SparkInternalRowWrapper] = + plainWriteSchema.map(schema => createRowConverter(schema, dataSchema)) + + private val metadataAwareRowConverter: Option[InternalRow => SparkInternalRowWrapper] = + metadataSchema.map( + schema => createRowConverter(writeSchema, StructType(dataSchema.fields ++ schema.fields))) + + private val joinedRow = new JoinedRow() + override def write(record: InternalRow): Unit = { - postWrite(write.writeAndReturn(rowConverter.apply(record))) + plainRowConverter match { + case Some(converter) => + postWrite(getPlainWrite.writeAndReturn(converter.apply(record))) + case _ => + postWrite(write.writeAndReturn(rowConverter.apply(record))) + } + } + + def writeWithMetadata(metadata: InternalRow, record: InternalRow): Unit = { + metadataAwareRowConverter match { + case Some(converter) => + postWrite(write.writeAndReturn(converter.apply(joinedRow(record, metadata)))) + case None => + write(record) + } } override def commitImpl(): Seq[CommitMessage] = { - write.prepareCommit().asScala.toSeq + val metadataMessages = write.prepareCommit().asScala.toSeq + val plainMessages = plainWrite.map(_.prepareCommit().asScala.toSeq).getOrElse(Seq.empty) + metadataMessages ++ plainMessages } override def abort(): Unit = close() override def close(): Unit = { try { - write.close() - ioManager.close() + val closeables = Seq[AutoCloseable](write) ++ plainWrite.toSeq ++ Seq(ioManager) + IOUtils.closeAll(closeables.asJava) } catch { case e: Exception => throw new RuntimeException(e) } diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala index 6728f8cb54e9..1da318e2825d 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala @@ -334,6 +334,21 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase with AdaptiveSpar } } + test("Row Tracking: delete preserves row tracking metadata for update") { + withTable("t") { + sql("CREATE TABLE t (id INT, data INT) TBLPROPERTIES ('row-tracking.enabled' = 'true')") + sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS data FROM range(1, 4)") + + sql("DELETE FROM t WHERE id = 2") + sql("UPDATE t SET data = 33 WHERE _ROW_ID = 2") + + checkAnswer( + sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), + Seq(Row(1, 1, 0, 1), Row(3, 33, 2, 3)) + ) + } + } + test("Row Tracking: update table") { withTable("t") { // only enable row tracking diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PureAppendOnlyScope.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PureAppendOnlyScope.scala index c005bd40c717..4fdf2bafc02f 100644 --- a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PureAppendOnlyScope.scala +++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PureAppendOnlyScope.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.analysis -import org.apache.paimon.spark.SparkTable +import org.apache.paimon.spark.{SparkTable, SparkTypeUtils} import org.apache.paimon.table.FileStoreTable import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -26,14 +26,13 @@ import org.apache.spark.sql.execution.datasources.v2.ExtractV2Table /** * Shared scope predicates for the Spark 4.1 Resolution-batch row-level rewrite rules - * ([[Spark41AppendOnlyRowLevelRewrite]] for UPDATE + metadata-only DELETE reverse-optimization, + * ([[Spark41UpdateTableRewrite]] for UPDATE + metadata-only DELETE reverse-optimization, * [[Spark41MergeIntoRewrite]] for MERGE). * - * Both rules only intercept operations against **pure append-only** Paimon tables: no primary key, - * row tracking, data evolution, deletion vectors, or fixed-length `CHAR(n)` columns. Tables that - * violate any of these constraints either have a working V2 rewrite path on 4.1 (PK / DV / RT / DE - * go through Paimon's own postHoc V1 commands) or race with Spark's `CharVarcharCodegenUtils` - * padding Project (CHAR columns — see [[hasCharColumn]]). + * These rules only intercept operations against Paimon tables that are valid for Spark's V2 + * copy-on-write rewrite: no primary key, data evolution, deletion vectors, or fixed-length + * `CHAR(n)` columns. Row-tracking-only tables are included; tables that violate any of these + * constraints go through Paimon's postHoc V1 commands or Spark's built-in analysis path. * * Kept as a mix-in trait so the two rewrite objects stay single-responsibility (one rule per Spark * row-level command, mirroring Spark's own `RewriteUpdateTable` / `RewriteMergeIntoTable` layout) @@ -41,36 +40,25 @@ import org.apache.spark.sql.execution.datasources.v2.ExtractV2Table */ trait PureAppendOnlyScope { - /** - * Whether the target of a row-level operation is a pure append-only Paimon table that Spark 4.1's - * built-in rewrite rules can't handle (see the two rule class docs for why). - */ - protected def targetsPureAppendOnly(aliasedTable: LogicalPlan): Boolean = { + protected def targetsV2CopyOnWriteTable(aliasedTable: LogicalPlan): Boolean = { + targetsPaimonFileStoreTable(aliasedTable) { + case (sparkTable, fs) => + fs.primaryKeys().isEmpty && + !sparkTable.coreOptions.dataEvolutionEnabled() && + !sparkTable.coreOptions.deletionVectorsEnabled() && + !SparkTypeUtils.containsCharType(fs.rowType()) + } + } + + private def targetsPaimonFileStoreTable(aliasedTable: LogicalPlan)( + predicate: (SparkTable, FileStoreTable) => Boolean): Boolean = { EliminateSubqueryAliases(aliasedTable) match { case ExtractV2Table(sparkTable: SparkTable) => sparkTable.getTable match { - case fs: FileStoreTable => - fs.primaryKeys().isEmpty && - !sparkTable.coreOptions.rowTrackingEnabled() && - !sparkTable.coreOptions.dataEvolutionEnabled() && - !sparkTable.coreOptions.deletionVectorsEnabled() && - !hasCharColumn(fs) + case fs: FileStoreTable => predicate(sparkTable, fs) case _ => false } case _ => false } } - - /** - * Tables with fixed-length `CHAR(n)` columns go through Spark's - * `CharVarcharCodegenUtils.readSidePadding` Project that gets inserted between the - * `DataSourceV2Relation` and its consumers. If we intercept before that padding project settles, - * CheckAnalysis trips on mismatched attribute ids (see PR 7648 history). Let those plans fall - * through to Paimon's postHoc V1 fallback rules which run after the padding project stabilizes. - */ - protected def hasCharColumn(fs: FileStoreTable): Boolean = { - import org.apache.paimon.types.CharType - import scala.collection.JavaConverters._ - fs.rowType().getFields.asScala.exists(_.`type`().isInstanceOf[CharType]) - } } diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41DeleteMetadataRestore.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41DeleteMetadataRestore.scala index 0efbe6a8bb96..d21bc8098e49 100644 --- a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41DeleteMetadataRestore.scala +++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41DeleteMetadataRestore.scala @@ -42,8 +42,8 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation * fast path that a `DeleteFromPaimonTableCommand` would enable. * * This rule pattern-matches the `ReplaceData` Spark produced (tagged with - * `RowLevelOperation.Command.DELETE`) and, if the target is a pure append-only Paimon table (see - * [[PureAppendOnlyScope]]) and the predicate is metadata-only, rewrites back to + * `RowLevelOperation.Command.DELETE`) and, if the target is a Paimon table eligible for V2 + * copy-on-write (see [[PureAppendOnlyScope]]) and the predicate is metadata-only, rewrites back to * `DeleteFromPaimonTableCommand`. Non-metadata-only DELETE is left alone (Spark's `ReplaceData` is * correct for data deletes). This is **not** a rewrite of `DeleteFromTable` — it's a restoration * layered on top of Spark's existing rewrite output, hence the `…Restore` naming rather than @@ -64,8 +64,8 @@ object Spark41DeleteMetadataRestore extends RewriteRowLevelCommand with PureAppe } /** - * Whether a `ReplaceData` node (Spark 4.1's post-rewrite DELETE form) targets a pure append-only - * Paimon table with a metadata-only predicate, such that converting back to + * Whether a `ReplaceData` node (Spark 4.1's post-rewrite DELETE form) targets a Paimon table + * eligible for V2 copy-on-write with a metadata-only predicate, such that converting back to * `DeleteFromPaimonTableCommand` would let the optimizer fold to `TruncatePaimonTableWithFilter`. */ private def isMetadataOnlyDeleteOnAppendOnlyPaimon(rd: ReplaceData): Boolean = { @@ -78,7 +78,7 @@ object Spark41DeleteMetadataRestore extends RewriteRowLevelCommand with PureAppe case _ => false } writeIsDelete && (rd.originalTable match { - case r: DataSourceV2Relation if targetsPureAppendOnly(r) => + case r: DataSourceV2Relation if targetsV2CopyOnWriteTable(r) => r.table match { case spk: SparkTable => spk.getTable match { diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41MergeIntoRewrite.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41MergeIntoRewrite.scala index 0b2e6c607e17..f3dd21f15f45 100644 --- a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41MergeIntoRewrite.scala +++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41MergeIntoRewrite.scala @@ -38,9 +38,9 @@ import org.apache.spark.sql.types.IntegerType import org.apache.spark.sql.util.CaseInsensitiveStringMap /** - * Spark 4.1-only Resolution-batch rule that rewrites MERGE INTO on pure append-only Paimon tables - * (no PK / RT / DE / DV) into V2 `ReplaceData` / `AppendData` plans, mirroring Spark's built-in - * `RewriteMergeIntoTable` for non-`SupportsDelta` row-level tables. + * Spark 4.1-only Resolution-batch rule that rewrites MERGE INTO on Paimon tables eligible for V2 + * copy-on-write (no PK / DE / DV / CHAR) into V2 `ReplaceData` / `AppendData` plans, mirroring + * Spark's built-in `RewriteMergeIntoTable` for non-`SupportsDelta` row-level tables. * * In Spark 4.1, `RewriteMergeIntoTable` runs in the Resolution batch via `resolveOperators`, which * short-circuits on `analyzed=true` plans — by the time it would fire, the `MergeIntoTable` is @@ -52,9 +52,10 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap * We fire before `ResolveAssignments`, so `m.aligned` is `false`. The rule pre-aligns each action * list via `PaimonAssignmentUtils.alignActions` (shared with the postHoc `PaimonMergeInto` rule). * - * CHAR columns are excluded — `readSidePadding` races with the rewrite and trips CheckAnalysis; - * those plans fall back to the postHoc `PaimonMergeInto` V1 path, which also owns PK / RT / DE / DV - * tables via `RowLevelHelper.shouldFallbackToV1MergeInto`. + * Row-tracking-only tables use the same V2 copy-on-write rewrite. CHAR columns are excluded — + * `readSidePadding` races with the rewrite and trips CheckAnalysis; those plans fall back to the + * postHoc `PaimonMergeInto` V1 path, which also owns PK / DE / DV tables via + * `RowLevelHelper.shouldFallbackToV1MergeInto`. */ object Spark41MergeIntoRewrite extends RewriteRowLevelCommand @@ -71,7 +72,7 @@ object Spark41MergeIntoRewrite plan.transformDown { case m: MergeIntoTable if m.resolved && m.rewritable && !m.needSchemaEvolution && - targetsPureAppendOnly(m.targetTable) => + targetsV2CopyOnWriteTable(m.targetTable) => // Pure append-only tables skip postHoc `PaimonMergeInto`, so evolve schema here. val evolved = evolveSchemaIfPaimon(m) rewrite(alignAllMergeActions(evolved, evolved.targetTable.output)) diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41UpdateTableRewrite.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41UpdateTableRewrite.scala index d8082b592e08..97edbdc780c3 100644 --- a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41UpdateTableRewrite.scala +++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41UpdateTableRewrite.scala @@ -47,8 +47,9 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap * We fire before `ResolveAssignments`, so `u.aligned` is `false`; the rule pre-aligns via * `PaimonAssignmentUtils.alignUpdateAssignments` before building the plan. * - * PK tables go through the postHoc rule; RT / DE / DV tables go through Spark's V2 path. DELETE is - * handled by [[Spark41DeleteMetadataRestore]]; MERGE by [[Spark41MergeIntoRewrite]]. + * Row-tracking-only tables use the same V2 copy-on-write rewrite. PK / DE / DV tables go through + * the postHoc V1 rule because they do not expose `SupportsRowLevelOperations`. DELETE is handled by + * [[Spark41DeleteMetadataRestore]]; MERGE by [[Spark41MergeIntoRewrite]]. */ object Spark41UpdateTableRewrite extends RewriteRowLevelCommand with PureAppendOnlyScope { @@ -57,7 +58,7 @@ object Spark41UpdateTableRewrite extends RewriteRowLevelCommand with PureAppendO AnalysisHelper.allowInvokingTransformsInAnalyzer { plan.transformDown { case u @ UpdateTable(aliasedTable, assignments, cond) - if u.resolved && u.rewritable && targetsPureAppendOnly(aliasedTable) => + if u.resolved && u.rewritable && targetsV2CopyOnWriteTable(aliasedTable) => EliminateSubqueryAliases(aliasedTable) match { case r @ ExtractV2Table(tbl: SupportsRowLevelOperations) => val table = buildOperationTable(tbl, UPDATE, CaseInsensitiveStringMap.empty())