diff --git a/docs/generated/core_configuration.html b/docs/generated/core_configuration.html index 8f7ea2eaaea9..f95f646823df 100644 --- a/docs/generated/core_configuration.html +++ b/docs/generated/core_configuration.html @@ -476,6 +476,12 @@ Boolean If true, enables the file-level pruning step for MergeInto partial column update on data-evolution tables. Set this to false when most files in the target partition are expected to be updated, so that the overhead of collecting touched file IDs outweighs the benefit of pruning untouched files. + +
data-evolution.merge-into.source-persist
+ false + Boolean + Whether to persist the source when processing a merge-into action on a data-evolution table. +
data-file.external-paths
(none) diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index 03140e9ecc42..3bcff2eb1e76 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -2249,6 +2249,13 @@ public InlineElement getDescription() { + "to be updated, so that the overhead of collecting touched file IDs " + "outweighs the benefit of pruning untouched files."); + public static final ConfigOption DATA_EVOLUTION_MERGE_INTO_SOURCE_PERSIST = + key("data-evolution.merge-into.source-persist") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to persist source when process merge into action on data evolution table."); + public static final ConfigOption BLOB_COMPACTION_ENABLED = key("blob-compaction.enabled") .booleanType() @@ -3759,6 +3766,10 @@ public boolean dataEvolutionMergeIntoFilePruning() { return options.get(DATA_EVOLUTION_MERGE_INTO_FILE_PRUNING); } + public boolean dataEvolutionMergeIntoSourcePersist() { + return options.get(DATA_EVOLUTION_MERGE_INTO_SOURCE_PERSIST); + } + public boolean blobCompactionEnabled() { return options.get(BLOB_COMPACTION_ENABLED); } diff --git a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala index 9ce5b87386f4..58ec73b99874 100644 --- a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala +++ b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala @@ -184,30 +184,54 @@ case class MergeIntoPaimonDataEvolutionTable( map.toMap } - // step 1: find the related data splits, make it target file plan - val dataSplits: Seq[DataSplit] = - targetRelatedSplits(sparkSession, 
tableSplits, firstRowIds, firstRowIdToBlobFirstRowIds) - val touchedFileTargetRelation = - createNewScanPlan(dataSplits, targetRelation) - - // step 2: invoke update action - val updateCommit = - if (matchedActions.nonEmpty) { - val updateResult = - updateActionInvoke(dataSplits, sparkSession, touchedFileTargetRelation, firstRowIds) - checkUpdateResult(updateResult) - } else Nil - - // step 3: invoke insert action - val insertCommit = - if (notMatchedActions.nonEmpty) - insertActionInvoke(sparkSession, touchedFileTargetRelation) - else Nil - - if (plan.snapshotId() != null) { - writer.rowIdCheckConflict(plan.snapshotId()) + val persistSourceDss: Option[Dataset[Row]] = + if (table.coreOptions().dataEvolutionMergeIntoSourcePersist() && matchedActions.nonEmpty) { + val dss = createDataset(sparkSession, sourceTable) + dss.persist() + Some(dss) + } else { + None + } + + try { + // step 1: find the related data splits, make it target file plan + val dataSplits: Seq[DataSplit] = targetRelatedSplits( + sparkSession, + tableSplits, + firstRowIds, + firstRowIdToBlobFirstRowIds, + persistSourceDss) + val touchedFileTargetRelation = + createNewScanPlan(dataSplits, targetRelation) + + // step 2: invoke update action + val updateCommit = + if (matchedActions.nonEmpty) { + val updateResult = + updateActionInvoke( + dataSplits, + sparkSession, + touchedFileTargetRelation, + firstRowIds, + persistSourceDss) + checkUpdateResult(updateResult) + } else Nil + + // step 3: invoke insert action + val insertCommit = + if (notMatchedActions.nonEmpty) + insertActionInvoke(sparkSession, touchedFileTargetRelation, persistSourceDss) + else Nil + + if (plan.snapshotId() != null) { + writer.rowIdCheckConflict(plan.snapshotId()) + } + writer.commit(updateCommit ++ insertCommit) + } finally { + if (persistSourceDss.isDefined) { + persistSourceDss.get.unpersist(blocking = false) + } } - writer.commit(updateCommit ++ insertCommit) } private def pushDownMergePartitionFilter(snapshotReader: 
SnapshotReader): Unit = { @@ -249,7 +273,8 @@ case class MergeIntoPaimonDataEvolutionTable( sparkSession: SparkSession, tableSplits: Seq[DataSplit], firstRowIds: immutable.IndexedSeq[Long], - firstRowIdToBlobFirstRowIds: Map[Long, List[Long]]): Seq[DataSplit] = { + firstRowIdToBlobFirstRowIds: Map[Long, List[Long]], + persistSourceDss: Option[Dataset[Row]]): Seq[DataSplit] = { // Self-Merge shortcut: // In Self-Merge mode, every row in the table may be updated, so we scan all splits. if (isSelfMergeOnRowId) { @@ -263,7 +288,7 @@ case class MergeIntoPaimonDataEvolutionTable( return tableSplits } - val sourceDss = createDataset(sparkSession, sourceTable) + val sourceDss = persistSourceDss.getOrElse(createDataset(sparkSession, sourceTable)) val firstRowIdsTouched = extractSourceRowIdMapping match { case Some(sourceRowIdAttr) => @@ -300,7 +325,8 @@ case class MergeIntoPaimonDataEvolutionTable( dataSplits: Seq[DataSplit], sparkSession: SparkSession, touchedFileTargetRelation: DataSourceV2Relation, - firstRowIds: immutable.IndexedSeq[Long]): Seq[CommitMessage] = { + firstRowIds: immutable.IndexedSeq[Long], + persistSourceDss: Option[Dataset[Row]]): Seq[CommitMessage] = { val mergeFields = extractFields(matchedCondition) val allFields = mutable.SortedSet.empty[AttributeReference]( (o1, o2) => { @@ -423,7 +449,8 @@ case class MergeIntoPaimonDataEvolutionTable( val sourceTableProjExprs = allReadFieldsOnSource.toSeq :+ Alias(TrueLiteral, ROW_FROM_SOURCE)() - val sourceTableProj = Project(sourceTableProjExprs, sourceTable) + val sourceChild = persistSourceDss.map(_.queryExecution.logical).getOrElse(sourceTable) + val sourceTableProj = Project(sourceTableProjExprs, sourceChild) val joinPlan = Join(targetTableProj, sourceTableProj, LeftOuter, Some(matchedCondition), JoinHint.NONE) @@ -466,16 +493,18 @@ case class MergeIntoPaimonDataEvolutionTable( private def insertActionInvoke( sparkSession: SparkSession, - touchedFileTargetRelation: DataSourceV2Relation): Seq[CommitMessage] = 
{ + touchedFileTargetRelation: DataSourceV2Relation, + persistSourceDss: Option[Dataset[Row]]): Seq[CommitMessage] = { val mergeFields = extractFields(matchedCondition) val allReadFieldsOnTarget = mergeFields.filter(field => targetTable.output.exists(attr => attr.equals(field))) val targetReadPlan = touchedFileTargetRelation.copy(targetRelation.table, allReadFieldsOnTarget.toSeq) + val sourceReadPlan = persistSourceDss.map(_.queryExecution.logical).getOrElse(sourceTable) val joinPlan = - Join(sourceTable, targetReadPlan, LeftAnti, Some(matchedCondition), JoinHint.NONE) + Join(sourceReadPlan, targetReadPlan, LeftAnti, Some(matchedCondition), JoinHint.NONE) // merge rows as there are multiple not matched actions val mergeRows = MergeRows( diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala index c92229c1d0e1..36881208ef15 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala @@ -189,30 +189,53 @@ case class MergeIntoPaimonDataEvolutionTable( map.toMap } - // step 1: find the related data splits, make it target file plan - val dataSplits: Seq[DataSplit] = - targetRelatedSplits(sparkSession, tableSplits, firstRowIds, firstRowIdToBlobFirstRowIds) - val touchedFileTargetRelation = - createNewScanPlan(dataSplits, targetRelation) - - // step 2: invoke update action - val updateCommit = - if (matchedActions.nonEmpty) { - val updateResult = - updateActionInvoke(dataSplits, sparkSession, touchedFileTargetRelation, firstRowIds) - checkUpdateResult(updateResult) - } else Nil - - // step 3: invoke insert action - val insertCommit = - if (notMatchedActions.nonEmpty) - 
insertActionInvoke(sparkSession, touchedFileTargetRelation) - else Nil - - if (plan.snapshotId() != null) { - writer.rowIdCheckConflict(plan.snapshotId()) + val persistSourceDss: Option[Dataset[Row]] = + if (table.coreOptions().dataEvolutionMergeIntoSourcePersist() && matchedActions.nonEmpty) { + val dss = createDataset(sparkSession, sourceTable) + dss.persist() + Some(dss) + } else { + None + } + + try { + // step 1: find the related data splits, make it target file plan + val dataSplits: Seq[DataSplit] = targetRelatedSplits( + sparkSession, + tableSplits, + firstRowIds, + firstRowIdToBlobFirstRowIds, + persistSourceDss) + val touchedFileTargetRelation = + createNewScanPlan(dataSplits, targetRelation) + + // step 2: invoke update action + val updateCommit = + if (matchedActions.nonEmpty) { + val updateResult = updateActionInvoke( + dataSplits, + sparkSession, + touchedFileTargetRelation, + firstRowIds, + persistSourceDss) + checkUpdateResult(updateResult) + } else Nil + + // step 3: invoke insert action + val insertCommit = + if (notMatchedActions.nonEmpty) + insertActionInvoke(sparkSession, touchedFileTargetRelation, persistSourceDss) + else Nil + + if (plan.snapshotId() != null) { + writer.rowIdCheckConflict(plan.snapshotId()) + } + writer.commit(updateCommit ++ insertCommit) + } finally { + if (persistSourceDss.isDefined) { + persistSourceDss.get.unpersist(blocking = false) + } } - writer.commit(updateCommit ++ insertCommit) } private def pushDownMergePartitionFilter(snapshotReader: SnapshotReader): Unit = { @@ -254,7 +277,8 @@ case class MergeIntoPaimonDataEvolutionTable( sparkSession: SparkSession, tableSplits: Seq[DataSplit], firstRowIds: immutable.IndexedSeq[Long], - firstRowIdToBlobFirstRowIds: Map[Long, List[Long]]): Seq[DataSplit] = { + firstRowIdToBlobFirstRowIds: Map[Long, List[Long]], + persistSourceDss: Option[Dataset[Row]]): Seq[DataSplit] = { // Self-Merge shortcut: // In Self-Merge mode, every row in the table may be updated, so we scan all 
splits. if (isSelfMergeOnRowId) { @@ -268,7 +292,7 @@ case class MergeIntoPaimonDataEvolutionTable( return tableSplits } - val sourceDss = createDataset(sparkSession, sourceTable) + val sourceDss = persistSourceDss.getOrElse(createDataset(sparkSession, sourceTable)) val firstRowIdsTouched = extractSourceRowIdMapping match { case Some(sourceRowIdAttr) => @@ -305,7 +329,8 @@ case class MergeIntoPaimonDataEvolutionTable( dataSplits: Seq[DataSplit], sparkSession: SparkSession, touchedFileTargetRelation: DataSourceV2Relation, - firstRowIds: immutable.IndexedSeq[Long]): Seq[CommitMessage] = { + firstRowIds: immutable.IndexedSeq[Long], + persistSourceDss: Option[Dataset[Row]]): Seq[CommitMessage] = { val mergeFields = extractFields(matchedCondition) val allFields = mutable.SortedSet.empty[AttributeReference]( (o1, o2) => { @@ -426,7 +451,8 @@ case class MergeIntoPaimonDataEvolutionTable( val sourceTableProjExprs = allReadFieldsOnSource.toSeq :+ Alias(TrueLiteral, ROW_FROM_SOURCE)() - val sourceTableProj = Project(sourceTableProjExprs, sourceTable) + val sourceChild = persistSourceDss.map(_.queryExecution.logical).getOrElse(sourceTable) + val sourceTableProj = Project(sourceTableProjExprs, sourceChild) val joinPlan = Join(targetTableProj, sourceTableProj, LeftOuter, Some(matchedCondition), JoinHint.NONE) @@ -469,16 +495,18 @@ case class MergeIntoPaimonDataEvolutionTable( private def insertActionInvoke( sparkSession: SparkSession, - touchedFileTargetRelation: DataSourceV2Relation): Seq[CommitMessage] = { + touchedFileTargetRelation: DataSourceV2Relation, + persistSourceDss: Option[Dataset[Row]]): Seq[CommitMessage] = { val mergeFields = extractFields(matchedCondition) val allReadFieldsOnTarget = mergeFields.filter(field => targetTable.output.exists(attr => attr.equals(field))) val targetReadPlan = touchedFileTargetRelation.copy(targetRelation.table, allReadFieldsOnTarget.toSeq) + val sourceReadPlan = persistSourceDss.map(_.queryExecution.logical).getOrElse(sourceTable) val 
joinPlan = - Join(sourceTable, targetReadPlan, LeftAnti, Some(matchedCondition), JoinHint.NONE) + Join(sourceReadPlan, targetReadPlan, LeftAnti, Some(matchedCondition), JoinHint.NONE) // merge rows as there are multiple not matched actions val mergeRows = MergeRows( diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkDataEvolutionITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkDataEvolutionITCase.java new file mode 100644 index 000000000000..b61eaae0be8f --- /dev/null +++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkDataEvolutionITCase.java @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.spark; + +import org.apache.paimon.fs.Path; +import org.apache.paimon.hive.TestHiveMetastore; + +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for data evolution on Spark. */ +public class SparkDataEvolutionITCase { + + private static TestHiveMetastore testHiveMetastore; + private static final int PORT = 9092; + + @BeforeAll + public static void startMetastore() { + testHiveMetastore = new TestHiveMetastore(); + testHiveMetastore.start(PORT); + } + + @AfterAll + public static void closeMetastore() throws Exception { + testHiveMetastore.stop(); + } + + private SparkSession.Builder createSparkSessionBuilder(Path warehousePath) { + return SparkSession.builder() + .config("spark.sql.warehouse.dir", warehousePath.toString()) + // with hive metastore + .config("spark.sql.catalogImplementation", "hive") + .config("hive.metastore.uris", "thrift://localhost:" + PORT) + .config("spark.sql.catalog.spark_catalog", SparkCatalog.class.getName()) + .config("spark.sql.catalog.spark_catalog.metastore", "hive") + .config( + "spark.sql.catalog.spark_catalog.hive.metastore.uris", + "thrift://localhost:" + PORT) + .config("spark.sql.catalog.spark_catalog.format-table.enabled", "true") + .config("spark.sql.catalog.spark_catalog.warehouse", warehousePath.toString()) + .config( + "spark.sql.extensions", + "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions") + .master("local[2]"); + } + + @Test + public void testDataEvolution(@TempDir java.nio.file.Path tempDir) throws IOException { + Path warehousePath = new Path("file:" + tempDir.toString()); + SparkSession.Builder builder = + SparkSession.builder() + 
.config("spark.sql.warehouse.dir", warehousePath.toString()) + // with hive metastore + .config("spark.sql.catalogImplementation", "hive") + .config("hive.metastore.uris", "thrift://localhost:" + PORT) + .config("spark.sql.catalog.spark_catalog", SparkCatalog.class.getName()) + .config("spark.sql.catalog.spark_catalog.metastore", "hive") + .config( + "spark.sql.catalog.spark_catalog.hive.metastore.uris", + "thrift://localhost:" + PORT) + .config("spark.sql.catalog.spark_catalog.format-table.enabled", "true") + .config( + "spark.sql.catalog.spark_catalog.warehouse", + warehousePath.toString()) + .config( + "spark.sql.extensions", + "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions") + .master("local[2]"); + SparkSession spark = builder.getOrCreate(); + spark.sql("CREATE DATABASE IF NOT EXISTS my_db1"); + spark.sql("USE spark_catalog.my_db1"); + + /** Create table */ + spark.sql( + "CREATE TABLE IF NOT EXISTS \n" + + " `my_db1`.`data_evolution_test` (\n" + + " `id` BIGINT COMMENT 'id',\n" + + " `g_1_1` BIGINT COMMENT 'g_1_1',\n" + + " `g_1_2` BIGINT COMMENT 'g_1_2',\n" + + " `g_2_1` BIGINT COMMENT 'g_2_1',\n" + + " `g_2_2` BIGINT COMMENT 'g_2_2'\n" + + " ) PARTITIONED BY (`dt` STRING COMMENT 'dt') ROW FORMAT SERDE 'org.apache.paimon.hive.PaimonSerDe'\n" + + "WITH\n" + + " SERDEPROPERTIES ('serialization.format' = '1') STORED AS INPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonInputFormat' OUTPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonOutputFormat' TBLPROPERTIES (\n" + + " 'file.compression' = 'snappy',\n" + + " 'manifest.compression' = 'snappy',\n" + + " 'row-tracking.enabled' = 'true',\n" + + " 'data-evolution.enabled' = 'true',\n" + + " 'data-evolution.merge-into.source-persist' = 'true',\n" + + " 'partition.timestamp-pattern' = '$dt',\n" + + " 'partition.timestamp-formatter' = 'yyyyMMdd',\n" + + " 'metastore.partitioned-table' = 'true'" + + " )"); + + spark.sql( + "CREATE TABLE IF NOT EXISTS \n" + + " `my_db1`.`data_evolution_source` (\n" + + " 
`id` BIGINT COMMENT 'id',\n" + + " `g_2_1` BIGINT COMMENT 'g_2_1',\n" + + " `g_2_2` BIGINT COMMENT 'g_2_2'\n" + + " ) PARTITIONED BY (`dt` STRING COMMENT 'dt') ROW FORMAT SERDE 'org.apache.paimon.hive.PaimonSerDe'\n" + + "WITH\n" + + " SERDEPROPERTIES ('serialization.format' = '1') STORED AS INPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonInputFormat' OUTPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonOutputFormat' TBLPROPERTIES (\n" + + " 'file.compression' = 'snappy',\n" + + " 'manifest.compression' = 'snappy',\n" + + " 'partition.timestamp-pattern' = '$dt',\n" + + " 'partition.timestamp-formatter' = 'yyyyMMdd',\n" + + " 'metastore.partitioned-table' = 'true'" + + " )"); + spark.close(); + + spark = builder.getOrCreate(); + spark.sql( + "insert overwrite table\n" + + " `my_db1`.`data_evolution_test` partition (dt='20260305')\n" + + "values (1, 1, 1, null, null),\n" + + " (1, 2, 1, null, null),\n" + + " (2, 1, 1, null, null);"); + spark.close(); + + spark = builder.getOrCreate(); + spark.sql( + "insert overwrite table\n" + + " `my_db1`.`data_evolution_test` partition (dt='20260304')\n" + + "values (10, 10, 10, null, null),\n" + + " (10, 20, 10, null, null),\n" + + " (20, 10, 10, null, null)"); + spark.close(); + + spark = builder.getOrCreate(); + spark.sql( + "insert overwrite table\n" + + " `my_db1`.`data_evolution_source` partition (dt='20260304')\n" + + "values (10, 20, 20),\n" + + " (40, 10, 10);"); + spark.close(); + + spark = builder.getOrCreate(); + spark.sql( + "insert overwrite table\n" + + " `my_db1`.`data_evolution_source` partition (dt='20260305')\n" + + "values (1, 2, 2),\n" + + " (4, 1, 1);"); + spark.close(); + + spark = builder.getOrCreate(); + spark.sql( + "MERGE INTO `my_db1`.`data_evolution_test` AS t\n" + + "USING `my_db1`.`data_evolution_source` AS s\n" + + "ON t.id = s.id\n" + + "AND t.dt = s.dt\n" + + "AND s.dt = '20260304'\n" + + "AND t.dt = '20260304'\n" + + " WHEN matched THEN\n" + + "UPDATE\n" + + "SET t.g_2_1 = s.g_2_1,\n" + + " t.g_2_2 
= s.g_2_2;"); + spark.close(); + + spark = builder.getOrCreate(); + spark.sql( + "MERGE INTO `my_db1`.`data_evolution_test` AS t\n" + + "USING `my_db1`.`data_evolution_source` AS s\n" + + "ON t.id = s.id\n" + + "AND t.dt = s.dt\n" + + "AND s.dt = '20260305'\n" + + "AND t.dt = '20260305'\n" + + " WHEN matched THEN\n" + + "UPDATE\n" + + "SET t.g_2_1 = s.g_2_1,\n" + + " t.g_2_2 = s.g_2_2;"); + spark.close(); + + spark = builder.getOrCreate(); + assertThat( + spark + .sql( + "select id,g_1_1,g_2_1 from `my_db1`.`data_evolution_test` where dt='20260304'") + .collectAsList().stream() + .map(Row::toString) + .collect(Collectors.toList())) + .containsExactlyInAnyOrder("[10,10,20]", "[10,20,20]", "[20,10,null]"); + + long recordCount = + spark.sql( + "select id,g_1_1,g_2_1 from `my_db1`.`data_evolution_test` where dt='20260304' and g_1_1 = 10 and g_2_1 = 10") + .count(); + assertThat(recordCount).isEqualTo(0); + + assertThat( + spark + .sql( + "select id,g_1_1,g_2_1 from `my_db1`.`data_evolution_test` where dt='20260304' and g_1_1 = 10 and g_2_1 =20") + .collectAsList().stream() + .map(Row::toString) + .collect(Collectors.toList())) + .containsExactlyInAnyOrder("[10,10,20]"); + + assertThat( + spark + .sql( + "select id,g_1_1,g_2_1 from `my_db1`.`data_evolution_test` where dt='20260304' and (g_1_1 = 10 or g_2_1 =10)") + .collectAsList().stream() + .map(Row::toString) + .collect(Collectors.toList())) + .containsExactlyInAnyOrder("[10,10,20]", "[20,10,null]"); + + assertThat( + spark + .sql( + "select id,g_1_1,g_2_1 from `my_db1`.`data_evolution_test` where dt='20260304' and (g_1_1 = 10 or g_2_1 =20)") + .collectAsList().stream() + .map(Row::toString) + .collect(Collectors.toList())) + .containsExactlyInAnyOrder("[10,10,20]", "[10,20,20]", "[20,10,null]"); + spark.close(); + + spark = builder.getOrCreate(); + assertThat( + spark + .sql( + "select id,g_1_1,g_2_1 from `my_db1`.`data_evolution_test` where dt='20260305'") + .collectAsList().stream() + .map(Row::toString) + 
.collect(Collectors.toList())) + .containsExactlyInAnyOrder("[1,1,2]", "[1,2,2]", "[2,1,null]"); + + recordCount = + spark.sql( + "select id,g_1_1,g_2_1 from `my_db1`.`data_evolution_test` where dt='20260305' and g_1_1 = 1 and g_2_1 =1") + .count(); + assertThat(recordCount).isEqualTo(0); + + assertThat( + spark + .sql( + "select id,g_1_1,g_2_1 from `my_db1`.`data_evolution_test` where dt='20260305' and g_1_1 = 1 and g_2_1 =2") + .collectAsList().stream() + .map(Row::toString) + .collect(Collectors.toList())) + .containsExactlyInAnyOrder("[1,1,2]"); + spark.close(); + + spark = builder.getOrCreate(); + /** Drop table */ + spark.sql("DROP TABLE IF EXISTS `my_db1`.`data_evolution_test`;"); + spark.sql("DROP TABLE IF EXISTS `my_db1`.`data_evolution_source`;"); + spark.close(); + } +}