From 6e9e33290b1ee07867270672cb5602bdc9334127 Mon Sep 17 00:00:00 2001 From: Arnav Balyan Date: Wed, 27 May 2026 19:44:19 +0530 Subject: [PATCH] update --- .../paimon/schema/SchemaValidation.java | 18 ++++++++++++- .../paimon/schema/SchemaValidationTest.java | 25 +++++++++++++++++++ 2 files changed, 42 insertions(+), 1 deletion(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index 3d02ede61743..d1fbba5b620d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -63,6 +63,7 @@ import static org.apache.paimon.CoreOptions.DEFAULT_AGG_FUNCTION; import static org.apache.paimon.CoreOptions.FIELDS_PREFIX; import static org.apache.paimon.CoreOptions.FIELDS_SEPARATOR; +import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS; import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN; import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP; import static org.apache.paimon.CoreOptions.INCREMENTAL_TO_AUTO_TAG; @@ -92,7 +93,7 @@ import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.apache.paimon.utils.Preconditions.checkState; -/** Validation utils for {@link TableSchema}. */ +/** Validation utilities for {@link TableSchema}. */ public class SchemaValidation { public static final List> PRIMARY_KEY_UNSUPPORTED_LOGICAL_TYPES = @@ -145,6 +146,21 @@ public static void validateTableSchema(TableSchema schema) { ChangelogProducer.LOOKUP)); } + if (options.fullCompactionDeltaCommits() != null + && changelogProducer == ChangelogProducer.LOOKUP) { + throw new UnsupportedOperationException( + String.format( + "'%s' is incompatible with '%s'='%s'. " + + "Use '%s'='%s' to get periodic full compaction with changelog generation, " + + "or remove '%s'.", + FULL_COMPACTION_DELTA_COMMITS.key(), + CHANGELOG_PRODUCER.key(), + ChangelogProducer.LOOKUP, + CHANGELOG_PRODUCER.key(), + ChangelogProducer.FULL_COMPACTION, + FULL_COMPACTION_DELTA_COMMITS.key())); + } + checkArgument( options.snapshotNumRetainMin() > 0, SNAPSHOT_NUM_RETAINED_MIN.key() + " should be at least 1"); diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java index c3a79d91fdf1..f8ae460af8d0 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java @@ -553,4 +553,29 @@ public void testMergeOnReadRequiresDvEnabled() { .hasMessageContaining( "deletion-vectors.merge-on-read requires deletion-vectors.enabled to be true"); } + + @Test + public void testFullCompactionDeltaCommitsWithLookupChangelogProducer() { + Map options = new HashMap<>(); + options.put(CoreOptions.CHANGELOG_PRODUCER.key(), "lookup"); + options.put(CoreOptions.FULL_COMPACTION_DELTA_COMMITS.key(), "1"); + assertThatThrownBy(() -> validateTableSchemaExec(options)) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining(CoreOptions.FULL_COMPACTION_DELTA_COMMITS.key()) + .hasMessageContaining("lookup") + .hasMessageContaining("full-compaction"); + + options.put(CoreOptions.CHANGELOG_PRODUCER.key(), "full-compaction"); + assertThatCode(() -> validateTableSchemaExec(options)).doesNotThrowAnyException(); + + options.clear(); + options.put(CoreOptions.CHANGELOG_PRODUCER.key(), "lookup"); + assertThatCode(() -> validateTableSchemaExec(options)).doesNotThrowAnyException(); + + options.clear(); + options.put(CoreOptions.FULL_COMPACTION_DELTA_COMMITS.key(), "1"); + assertThatCode(() -> validateTableSchemaExec(options)).doesNotThrowAnyException(); + options.put(CoreOptions.CHANGELOG_PRODUCER.key(), "input"); + assertThatCode(() -> validateTableSchemaExec(options)).doesNotThrowAnyException(); + } }