Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,21 @@

package org.apache.paimon.spark.sql

class RowTrackingTest extends RowTrackingTestBase {}
import org.apache.paimon.spark.SparkTable

import org.apache.spark.sql.connector.catalog.SupportsRowLevelOperations

class RowTrackingTest extends RowTrackingTestBase {

test("Row Tracking: Spark 3.5 keeps row-tracking tables on V1 DML path") {
withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") {
withTable("t", "rt") {
sql("CREATE TABLE t (id INT, data INT)")
assert(SparkTable.of(loadTable("t")).isInstanceOf[SupportsRowLevelOperations])

sql("CREATE TABLE rt (id INT, data INT) TBLPROPERTIES ('row-tracking.enabled' = 'true')")
assert(!SparkTable.of(loadTable("rt")).isInstanceOf[SupportsRowLevelOperations])
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,111 @@

package org.apache.paimon.spark.sql

class RowTrackingTest extends RowTrackingTestBase {}
import org.apache.paimon.spark.SparkTable
import org.apache.paimon.spark.schema.PaimonMetadataColumn

import org.apache.spark.sql.Row
import org.apache.spark.sql.connector.catalog.SupportsRowLevelOperations
import org.apache.spark.sql.types.Metadata

class RowTrackingTest extends RowTrackingTestBase {

test("Row Tracking: metadata columns expose Spark preserve flags") {
val rowIdMetadata = Metadata.fromJson(PaimonMetadataColumn.ROW_ID.metadataInJSON())
assert(rowIdMetadata.getBoolean("__preserve_on_delete"))
assert(rowIdMetadata.getBoolean("__preserve_on_update"))
assert(!rowIdMetadata.getBoolean("__preserve_on_reinsert"))

val sequenceNumberMetadata =
Metadata.fromJson(PaimonMetadataColumn.SEQUENCE_NUMBER.metadataInJSON())
assert(sequenceNumberMetadata.getBoolean("__preserve_on_delete"))
assert(!sequenceNumberMetadata.getBoolean("__preserve_on_update"))
assert(!sequenceNumberMetadata.getBoolean("__preserve_on_reinsert"))
}

test("Row Tracking: Spark 4.1 uses V2 copy-on-write for DML") {
withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") {
withTable("s", "t") {
sql("CREATE TABLE t (id INT, data INT) TBLPROPERTIES ('row-tracking.enabled' = 'true')")
sql("INSERT INTO t VALUES (1, 1), (2, 2)")
sql("INSERT INTO t VALUES (3, 3), (4, 4)")

assertPlanContains("DELETE FROM t WHERE id = 2", "ReplaceData")
sql("DELETE FROM t WHERE id = 2")

assertPlanContains("UPDATE t SET data = 30 WHERE id = 3", "ReplaceData")
sql("UPDATE t SET data = 30 WHERE id = 3")

sql("CREATE TABLE s (id INT, data INT)")
sql("INSERT INTO s VALUES (3, 300), (5, 500)")
assertPlanContains(
"""
|MERGE INTO t
|USING s
|ON t.id = s.id
|WHEN MATCHED THEN UPDATE SET data = s.data
|WHEN NOT MATCHED THEN INSERT *
|""".stripMargin,
"ReplaceData"
)
sql("""
|MERGE INTO t
|USING s
|ON t.id = s.id
|WHEN MATCHED THEN UPDATE SET data = s.data
|WHEN NOT MATCHED THEN INSERT *
|""".stripMargin)

checkAnswer(
sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
Seq(Row(1, 1, 0, 1), Row(3, 300, 2, 5), Row(4, 4, 3, 2), Row(5, 500, 4, 5))
)
}
}
}

test("Row Tracking: nested CHAR columns do not expose V2 row-level capability") {
withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") {
withTable("t") {
sql("""
|CREATE TABLE t (
| id INT,
| info STRUCT<name: CHAR(3), data: INT>
|) TBLPROPERTIES ('row-tracking.enabled' = 'true')
|""".stripMargin)

assert(!SparkTable.of(loadTable("t")).isInstanceOf[SupportsRowLevelOperations])
}
}
}

test("Row Tracking: Spark 4.1 restores metadata-only delete fast path") {
withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") {
withTable("t") {
sql("""
|CREATE TABLE t (id INT, data INT, dt STRING)
|PARTITIONED BY (dt)
|TBLPROPERTIES ('row-tracking.enabled' = 'true')
|""".stripMargin)
sql("INSERT INTO t VALUES (1, 1, 'p1'), (2, 2, 'p1'), (3, 3, 'p2')")

assertPlanContains("DELETE FROM t WHERE dt = 'p1'", "DeleteFromPaimonTableCommand")
sql("DELETE FROM t WHERE dt = 'p1'")

checkAnswer(
sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
Seq(Row(3, 3, "p2", 0, 1))
)
}
}
}

private def assertPlanContains(sqlText: String, fragment: String): Unit = {
val plan = explain(sqlText)
assert(plan.contains(fragment), plan)
}

private def explain(sqlText: String): String = {
sql(s"EXPLAIN EXTENDED $sqlText").collect().map(_.getString(0)).mkString("\n")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.schema;

import org.apache.spark.sql.connector.catalog.MetadataColumn;

abstract class PaimonMetadataColumnBase implements MetadataColumn {

abstract boolean preserveOnDelete();

abstract boolean preserveOnUpdate();

abstract boolean preserveOnReinsert();

public String metadataInJSON() {
return "{\"__preserve_on_delete\":"
+ preserveOnDelete()
+ ",\"__preserve_on_update\":"
+ preserveOnUpdate()
+ ",\"__preserve_on_reinsert\":"
+ preserveOnReinsert()
+ "}";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.write;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.types.RowType;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;

import scala.Option;

/**
* Spark 4.x calls DataWriter.write(metadata, data) for metadata-aware writes. Keep this method in
* Java so the common sources still compile against Spark 3.5, where that interface method does not
* exist; Spark 4.x compilation generates the erased bridge required by the runtime call.
*/
public class PaimonV2MetadataAwareDataWriter extends PaimonV2DataWriter {

public PaimonV2MetadataAwareDataWriter(
BatchWriteBuilder writeBuilder,
StructType writeSchema,
StructType rowTrackingWriteSchema,
StructType dataSchema,
StructType metadataSchema,
CoreOptions coreOptions,
CatalogContext catalogContext,
RowType paimonWriteType) {
super(
writeBuilder,
rowTrackingWriteSchema,
dataSchema,
coreOptions,
catalogContext,
Option.empty(),
Option.apply(paimonWriteType),
Option.apply(metadataSchema),
Option.apply(writeSchema));
}

public void write(InternalRow metadata, InternalRow data) {
writeWithMetadata(metadata, data);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,15 @@ import java.util.{EnumSet => JEnumSet, Set => JSet}
* If this base class implemented `SupportsRowLevelOperations`, Spark 4.1 would immediately call
* `newRowLevelOperationBuilder` on tables whose V2 write is disabled (e.g. dynamic bucket or
* primary-key tables that fall back to V1 write) and fail before Paimon has a chance to rewrite the
* plan to a V1 command. Likewise, deletion-vector, row-tracking, and data-evolution tables need to
* stay on Paimon's V1 postHoc path even when `useV2Write=true`, so they must also not expose
* `SupportsRowLevelOperations`.
* plan to a V1 command. Likewise, deletion-vector, data-evolution, and fixed-length CHAR tables
* need to stay on Paimon's V1 postHoc path even when `useV2Write=true`, so they must also not
* expose `SupportsRowLevelOperations`.
*
* Tables that DO support V2 row-level operations use the [[SparkTableWithRowLevelOps]] subclass
* instead; the [[SparkTable.of]] factory picks the right variant via
* [[SparkTable.supportsV2RowLevelOps]], which is kept in lockstep with
* `RowLevelHelper.shouldFallbackToV1`.
* [[SparkTable.supportsV2RowLevelOps]]. Append-only tables, including row-tracking-only tables,
* expose `SupportsRowLevelOperations` so DELETE, UPDATE, and MERGE INTO can go through the V2
* copy-on-write path when the table has no PK, deletion vectors, data evolution, or CHAR columns.
*/
case class SparkTable(override val table: Table) extends PaimonSparkTableBase(table)

Expand Down Expand Up @@ -93,12 +94,11 @@ object SparkTable {
* Whether the given table supports Paimon's V2 row-level operations, i.e. whether it is safe to
* expose [[SupportsRowLevelOperations]] to Spark.
*
* This must stay in sync with
* `org.apache.paimon.spark.catalyst.analysis.RowLevelHelper#shouldFallbackToV1` — the two
* predicates are logical complements. If they diverge, Spark 4.1's row-level rewrite rules (which
* fire in the main Resolution batch) will intercept DML on tables that Paimon expects to handle
* through its postHoc V1 fallback, leaving primary-key / deletion-vector / row-tracking /
* data-evolution tables with broken MERGE/UPDATE/DELETE dispatch.
* Append-only tables return `true` here so that `SparkTable.of` wraps them as
* `SparkTableWithRowLevelOps`, enabling Spark's V2 copy-on-write DELETE, UPDATE, and MERGE INTO
* paths. Row-tracking append-only tables require Spark 4.0+ because Spark 3.5 does not have the
* metadata-aware `DataWriter.write(metadata, data)` path needed to preserve row-tracking metadata
* for rewritten rows.
*
* Per-version shims for Spark 3.2/3.3/3.4 each ship their own
* `org.apache.paimon.spark.SparkTable` (class + companion) that shadows this one at packaging
Expand All @@ -113,10 +113,13 @@ object SparkTable {
if (!sparkTable.useV2Write) return false
sparkTable.getTable match {
case fs: FileStoreTable =>
val supportsRowTrackingCopyOnWrite =
!sparkTable.coreOptions.rowTrackingEnabled() || org.apache.spark.SPARK_VERSION >= "4.0"
fs.primaryKeys().isEmpty &&
supportsRowTrackingCopyOnWrite &&
!sparkTable.coreOptions.deletionVectorsEnabled() &&
!sparkTable.coreOptions.rowTrackingEnabled() &&
!sparkTable.coreOptions.dataEvolutionEnabled()
!sparkTable.coreOptions.dataEvolutionEnabled() &&
!SparkTypeUtils.containsCharType(fs.rowType())
Comment thread
kerwin-zk marked this conversation as resolved.
case _ => false
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,23 @@ public static org.apache.paimon.types.DataType toPaimonType(DataType dataType) {
return SparkToPaimonTypeVisitor.visit(dataType);
}

public static boolean containsCharType(org.apache.paimon.types.DataType type) {
if (type instanceof CharType) {
return true;
} else if (type instanceof RowType) {
return ((RowType) type).getFields().stream()
.anyMatch(field -> containsCharType(field.type()));
} else if (type instanceof ArrayType) {
return containsCharType(((ArrayType) type).getElementType());
} else if (type instanceof MapType) {
MapType mapType = (MapType) type;
return containsCharType(mapType.getKeyType()) || containsCharType(mapType.getValueType());
} else if (type instanceof MultisetType) {
return containsCharType(((MultisetType) type).getElementType());
}
return false;
}

/**
* Prune Paimon `RowType` by required Spark `StructType`, use this method instead of {@link
* #toPaimonType(DataType)} when need to retain the field id.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,17 +76,14 @@ trait RowLevelHelper extends SQLConfHelper {
}
}

/**
* Determines if DataSourceV2 is not supported for the given table. This is the logical complement
* of [[SparkTable.supportsV2RowLevelOps]]; the two predicates must stay in sync so that Spark
* 4.1's row-level rewrite rules (which key on `SupportsRowLevelOperations`) and Paimon's V1
* postHoc fallback rules (which gate on this predicate) agree about which tables go down which
* path.
*/
protected def shouldFallbackToV1(table: SparkTable): Boolean = {
!SparkTable.supportsV2RowLevelOps(table)
}

// `SparkTable.supportsV2RowLevelOps` controls whether the table exposes Spark row-level
// capability at all. These per-operation checks are the remaining V1 fallbacks for cases Spark's
// V2 rewrite cannot safely handle: metadata-only DELETE, non-rewritable UPDATE/MERGE, or
// assignments that have not been aligned yet.
/** Determines if DataSourceV2 delete is not supported for the given table. */
protected def shouldFallbackToV1Delete(table: SparkTable, condition: Expression): Boolean = {
shouldFallbackToV1(table) ||
Expand All @@ -106,13 +103,6 @@ trait RowLevelHelper extends SQLConfHelper {
protected def shouldFallbackToV1MergeInto(m: MergeIntoTable): Boolean = {
val relation = PaimonRelation.getPaimonRelation(m.targetTable)
val table = relation.table.asInstanceOf[SparkTable]
// Note for Spark 4.1+: `shouldFallbackToV1(table)` returns `false` for pure append-only
// tables (no PK / RT / DE / DV), so this predicate lets the aligned `MergeIntoTable` node
// return untouched. Spark's own `RewriteMergeIntoTable` in the Resolution batch can't fire
// (`resolveOperators` short-circuits on `analyzed=true`), so the rewrite is performed by
// `Spark41MergeIntoRewrite` (paimon-spark4-common) which aligns + transcribes Spark's
// `ReplaceData` / `AppendData` branches for non-`SupportsDelta` sources. Non-append-only
// tables still fall back to V1 (`MergeIntoPaimonTable` / `MergeIntoPaimonDataEvolutionTable`).
shouldFallbackToV1(table) ||
!m.rewritable ||
!m.aligned
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.paimon.spark.rowops

import org.apache.paimon.options.Options
import org.apache.paimon.spark.PaimonBaseScanBuilder
import org.apache.paimon.spark.schema.PaimonMetadataColumn.FILE_PATH_COLUMN
import org.apache.paimon.spark.schema.PaimonMetadataColumn.{FILE_PATH_COLUMN, ROW_ID_COLUMN, SEQUENCE_NUMBER_COLUMN}
import org.apache.paimon.spark.write.PaimonV2WriteBuilder
import org.apache.paimon.table.FileStoreTable

Expand Down Expand Up @@ -57,6 +57,11 @@ class PaimonSparkCopyOnWriteOperation(table: FileStoreTable, info: RowLevelOpera
}

override def requiredMetadataAttributes(): Array[NamedReference] = {
Array(Expressions.column(FILE_PATH_COLUMN))
val base = Array(Expressions.column(FILE_PATH_COLUMN))
if (table.coreOptions().rowTrackingEnabled()) {
base ++ Array(Expressions.column(ROW_ID_COLUMN), Expressions.column(SEQUENCE_NUMBER_COLUMN))
} else {
base
}
}
}
Loading
Loading