From c7ae9db3064167aeaf7b2ec77ffed920998c94bb Mon Sep 17 00:00:00 2001 From: "joey.ljy" Date: Wed, 3 Jun 2026 23:53:02 +0800 Subject: [PATCH] Allow FileStoreCommit for PK tables with postpone bucket mode Postpone bucket mode (bucket=-2) writes data like an append table: all files go to the bucket-postpone/ directory and the REST catalog server handles bucket redistribution during background compaction. The commit logic (manifest and snapshot generation) is identical to append tables, so there is no reason to block it. See: https://paimon.apache.org/docs/master/primary-key-table/data-distribution/#postpone-bucket Co-Authored-By: Claude Opus 4.6 (1M context) --- .../core/operation/file_store_commit.cpp | 13 ++++- .../operation/file_store_commit_impl_test.cpp | 54 +++++++++++++++++++ 2 files changed, 66 insertions(+), 1 deletion(-) diff --git a/src/paimon/core/operation/file_store_commit.cpp b/src/paimon/core/operation/file_store_commit.cpp index b2f87bf30..a11f12b2f 100644 --- a/src/paimon/core/operation/file_store_commit.cpp +++ b/src/paimon/core/operation/file_store_commit.cpp @@ -23,6 +23,7 @@ #include "paimon/common/types/data_field.h" #include "paimon/common/utils/binary_row_partition_computer.h" #include "paimon/core/core_options.h" +#include "paimon/core/table/bucket_mode.h" #include "paimon/core/manifest/index_manifest_entry.h" #include "paimon/core/manifest/index_manifest_file.h" #include "paimon/core/manifest/manifest_file.h" @@ -68,7 +69,17 @@ Result> FileStoreCommit::Create( const auto& schema = table_schema.value(); if (!schema->PrimaryKeys().empty() && ctx->GetOptions().find("enable-pk-commit-in-inte-test") == ctx->GetOptions().end()) { - return Status::NotImplemented("not support pk table commit yet"); + // Postpone bucket mode (bucket=-2) writes data like an append table: all files go to + // bucket-postpone/ directory and the REST catalog server handles bucket redistribution during + // compaction. The commit logic is the same as append tables, so we allow it. 
+ auto schema_opts = schema->Options(); + auto bucket_it = schema_opts.find("bucket"); + bool is_postpone_bucket = + bucket_it != schema_opts.end() && + bucket_it->second == std::to_string(BucketModeDefine::POSTPONE_BUCKET); + if (!is_postpone_bucket) { + return Status::NotImplemented("not support pk table commit yet"); + } } auto opts = schema->Options(); for (const auto& [key, value] : ctx->GetOptions()) { diff --git a/src/paimon/core/operation/file_store_commit_impl_test.cpp b/src/paimon/core/operation/file_store_commit_impl_test.cpp index 9cd078016..1008d843a 100644 --- a/src/paimon/core/operation/file_store_commit_impl_test.cpp +++ b/src/paimon/core/operation/file_store_commit_impl_test.cpp @@ -1661,4 +1661,58 @@ TEST_F(FileStoreCommitImplTest, TestCommitWithIOException) { ASSERT_TRUE(commit_run_complete); } +// Verify that FileStoreCommit::Create succeeds for PK tables with postpone bucket mode (bucket=-2) +// without requiring the enable-pk-commit-in-inte-test workaround flag. +TEST_F(FileStoreCommitImplTest, TestPostponeBucketPKTableCommitAllowed) { + auto pk_dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(pk_dir); + std::string pk_root = pk_dir->Str(); + ASSERT_OK_AND_ASSIGN(auto catalog, Catalog::Create(pk_root, {})); + ASSERT_OK(catalog->CreateDatabase("db", {}, false)); + + arrow::Schema pk_schema({arrow::field("pk", arrow::int32()), arrow::field("val", arrow::utf8())}); + ::ArrowSchema arrow_schema; + ASSERT_TRUE(arrow::ExportSchema(pk_schema, &arrow_schema).ok()); + std::map table_options = {{"bucket", "-2"}}; + ASSERT_OK(catalog->CreateTable(Identifier("db", "pk_tbl"), &arrow_schema, + /*partition_keys=*/{}, /*primary_keys=*/{"pk"}, table_options, + /*ignore_if_exists=*/false)); + + std::string pk_table_path = PathUtil::JoinPath(pk_root, "db.db/pk_tbl"); + + // Create FileStoreCommit WITHOUT the workaround flag — should succeed for postpone bucket + CommitContextBuilder builder(pk_table_path, "test_user"); + builder.AddOption(Options::FILE_SYSTEM, 
"local").UseRESTCatalogCommit(true); + ASSERT_OK_AND_ASSIGN(auto commit_context, builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto committer, FileStoreCommit::Create(std::move(commit_context))); + ASSERT_TRUE(committer != nullptr); +} + +// Verify that FileStoreCommit::Create still rejects PK tables with fixed bucket (bucket > 0) +// when the workaround flag is not set. +TEST_F(FileStoreCommitImplTest, TestFixedBucketPKTableCommitRejected) { + auto pk_dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(pk_dir); + std::string pk_root = pk_dir->Str(); + ASSERT_OK_AND_ASSIGN(auto catalog, Catalog::Create(pk_root, {})); + ASSERT_OK(catalog->CreateDatabase("db", {}, false)); + + arrow::Schema pk_schema({arrow::field("pk", arrow::int32()), arrow::field("val", arrow::utf8())}); + ::ArrowSchema arrow_schema; + ASSERT_TRUE(arrow::ExportSchema(pk_schema, &arrow_schema).ok()); + std::map table_options = {{"bucket", "4"}}; + ASSERT_OK(catalog->CreateTable(Identifier("db", "pk_tbl_fixed"), &arrow_schema, + /*partition_keys=*/{}, /*primary_keys=*/{"pk"}, table_options, + /*ignore_if_exists=*/false)); + + std::string pk_table_path = PathUtil::JoinPath(pk_root, "db.db/pk_tbl_fixed"); + + CommitContextBuilder builder(pk_table_path, "test_user"); + builder.AddOption(Options::FILE_SYSTEM, "local").UseRESTCatalogCommit(true); + ASSERT_OK_AND_ASSIGN(auto commit_context, builder.Finish()); + auto result = FileStoreCommit::Create(std::move(commit_context)); + ASSERT_FALSE(result.ok()); + ASSERT_TRUE(result.status().ToString().find("not support pk table commit") != std::string::npos); +} + } // namespace paimon::test