From 041a61fca7cf5835367adedbdd99e642198552c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Mon, 1 Jun 2026 17:38:28 +0800 Subject: [PATCH] [core] Validate schema after applying schema changes --- .../apache/paimon/schema/SchemaManager.java | 19 +++++---- .../paimon/schema/SchemaManagerTest.java | 41 +++++++++++++++++++ 2 files changed, 52 insertions(+), 8 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index 95c6f04514db..24eb99570414 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -609,14 +609,17 @@ protected void updateLastColumn( applyRenameColumnsToOptions(newOptions, changes), newComment); - return new TableSchema( - oldTableSchema.id() + 1, - newSchema.fields(), - highestFieldId.get(), - newSchema.partitionKeys(), - newSchema.primaryKeys(), - newSchema.options(), - newSchema.comment()); + TableSchema newTableSchema = + new TableSchema( + oldTableSchema.id() + 1, + newSchema.fields(), + highestFieldId.get(), + newSchema.partitionKeys(), + newSchema.primaryKeys(), + newSchema.options(), + newSchema.comment()); + SchemaValidation.validateTableSchema(newTableSchema); + return newTableSchema; } // gets the rootType at the defined depth diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java index 9a15cab36159..0c9470514c0a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java @@ -168,6 +168,47 @@ public void testUpdateOptions() throws Exception { assertThat(latest.get().options()).containsEntry("new_k", "new_v"); } + @Test + public void testResetSequenceGroupForAggregateFunction() throws Exception { + Map options = new HashMap<>(); + options.put(CoreOptions.MERGE_ENGINE.key(), "partial-update"); + options.put(CoreOptions.BUCKET.key(), "1"); + options.put("fields.f2.aggregate-function", "sum"); + options.put("fields.f1.sequence-group", "f2"); + Schema schema = new Schema(rowType.getFields(), partitionKeys, primaryKeys, options, ""); + + retryArtificialException(() -> manager.createTable(schema)); + + assertThatThrownBy( + () -> + retryArtificialException( + () -> + manager.commitChanges( + SchemaChange.removeOption( + "fields.f1.sequence-group")))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining( + "Must use sequence group for aggregation functions but not found for field f2."); + } + + @Test + public void testResetSequenceGroupForLastNonNullAggregateFunction() throws Exception { + Map options = new HashMap<>(); + options.put(CoreOptions.MERGE_ENGINE.key(), "partial-update"); + options.put(CoreOptions.BUCKET.key(), "1"); + options.put("fields.f2.aggregate-function", "last_non_null_value"); + options.put("fields.f1.sequence-group", "f2"); + Schema schema = new Schema(rowType.getFields(), partitionKeys, primaryKeys, options, ""); + + retryArtificialException(() -> manager.createTable(schema)); + retryArtificialException( + () -> manager.commitChanges(SchemaChange.removeOption("fields.f1.sequence-group"))); + + Optional latest = retryArtificialException(() -> manager.latest()); + assertThat(latest.isPresent()).isTrue(); + assertThat(latest.get().options()).doesNotContainKey("fields.f1.sequence-group"); + } + @Test public void testConcurrentCommit() throws Exception { retryArtificialException(