Skip to content

[spark] support persist source data to avoid loading data repeatedly#8081

Open
Stefanietry wants to merge 1 commit into
apache:masterfrom
Stefanietry:support_persist_for_data_evolution
Open

[spark] support persist source data to avoid loading data repeatedly#8081
Stefanietry wants to merge 1 commit into
apache:masterfrom
Stefanietry:support_persist_for_data_evolution

Conversation

@Stefanietry
Copy link
Copy Markdown
Contributor

Purpose
Purpose: In the UpdateAction mode, it avoids redundant calculations during the process of computing dataSplits and performing join concatenation by persisting the source data.
Linked issue: #8080

Tests
Add SparkDataEvolutionITCase

@Stefanietry Stefanietry force-pushed the support_persist_for_data_evolution branch 2 times, most recently from 4e99876 to cdf7d4c Compare June 2, 2026 13:00
+ "outweighs the benefit of pruning untouched files.");

public static final ConfigOption<Boolean> DATA_EVOLUTION_DATA_SOURCE_PERSIST_ENABLED =
key("data-evolution.data.source.persist.enabled")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

data-evolution.merge-into.source-persist

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, the conf has been modified as suggested.

@Stefanietry Stefanietry force-pushed the support_persist_for_data_evolution branch 4 times, most recently from a3b196c to abfcebd Compare June 3, 2026 06:15
@JingsongLi JingsongLi closed this Jun 3, 2026
@JingsongLi JingsongLi reopened this Jun 3, 2026
@Stefanietry Stefanietry closed this Jun 3, 2026
@Stefanietry Stefanietry reopened this Jun 3, 2026
@Stefanietry Stefanietry force-pushed the support_persist_for_data_evolution branch from abfcebd to 95e9f9b Compare June 3, 2026 09:32
+ " 'manifest.compression' = 'snappy',\n"
+ " 'row-tracking.enabled' = 'true',\n"
+ " 'data-evolution.enabled' = 'true',\n"
+ " 'data-evolution.data.source.persist.enabled' = 'true',\n"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This still uses the old option name. The PR adds data-evolution.merge-into.source-persist, so this table keeps the new option at its default false and the test never exercises the persist path. Please switch this to the new key.

val sourceTableProjExprs =
allReadFieldsOnSource.toSeq :+ Alias(TrueLiteral, ROW_FROM_SOURCE)()
val sourceTableProj = Project(sourceTableProjExprs, sourceTable)
val sourceChild = persistSourceDss.map(_.queryExecution.logical).getOrElse(sourceTable)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This only wires the cached source into the matched/update path. For a MERGE that has both matched and not-matched clauses, insertActionInvoke still builds its left-anti join from sourceTable, so the source is scanned again after the update path. Could you pass the persisted source into the insert path too, so the new option avoids repeated source loading for the whole merge action?

@JingsongLi
Copy link
Copy Markdown
Contributor

The Spark 4.0 implementation also needs the same change. paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala is a version-specific copy of this command, and it still creates sourceDss from sourceTable and joins sourceTable directly. As a result, data-evolution.merge-into.source-persist has no effect for the Spark 4.0 artifact unless this file is updated as well.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants