diff --git a/src/paimon/core/operation/file_store_commit.cpp b/src/paimon/core/operation/file_store_commit.cpp
index b2f87bf3..a11f12b2 100644
--- a/src/paimon/core/operation/file_store_commit.cpp
+++ b/src/paimon/core/operation/file_store_commit.cpp
@@ -23,6 +23,7 @@
 #include "paimon/common/types/data_field.h"
 #include "paimon/common/utils/binary_row_partition_computer.h"
 #include "paimon/core/core_options.h"
+#include "paimon/core/table/bucket_mode.h"
 #include "paimon/core/manifest/index_manifest_entry.h"
 #include "paimon/core/manifest/index_manifest_file.h"
 #include "paimon/core/manifest/manifest_file.h"
@@ -68,7 +69,19 @@ Result> FileStoreCommit::Create(
     const auto& schema = table_schema.value();
     if (!schema->PrimaryKeys().empty() &&
         ctx->GetOptions().find("enable-pk-commit-in-inte-test") == ctx->GetOptions().end()) {
-        return Status::NotImplemented("not support pk table commit yet");
+        // Postpone bucket mode (bucket=-2) writes data like an append table: all files go to the
+        // bucket-postpone/ directory and the REST catalog server handles bucket redistribution
+        // during compaction. The commit logic is the same as append tables, so we allow it.
+        // NOTE(review): only the schema's own "bucket" option is inspected; options supplied
+        // through ctx are not consulted by this check - confirm that is intended.
+        const auto& schema_opts = schema->Options();  // lookup only, so bind instead of copying
+        const auto bucket_it = schema_opts.find("bucket");
+        const bool is_postpone_bucket =
+            bucket_it != schema_opts.end() &&
+            bucket_it->second == std::to_string(BucketModeDefine::POSTPONE_BUCKET);
+        if (!is_postpone_bucket) {
+            return Status::NotImplemented("not support pk table commit yet");
+        }
     }
     auto opts = schema->Options();
     for (const auto& [key, value] : ctx->GetOptions()) {
diff --git a/src/paimon/core/operation/file_store_commit_impl_test.cpp b/src/paimon/core/operation/file_store_commit_impl_test.cpp
index 9cd07801..1008d843 100644
--- a/src/paimon/core/operation/file_store_commit_impl_test.cpp
+++ b/src/paimon/core/operation/file_store_commit_impl_test.cpp
@@ -1661,4 +1661,62 @@ TEST_F(FileStoreCommitImplTest, TestCommitWithIOException) {
     ASSERT_TRUE(commit_run_complete);
 }
 
+// Verify that FileStoreCommit::Create succeeds for PK tables with postpone bucket mode (bucket=-2)
+// without requiring the enable-pk-commit-in-inte-test workaround flag.
+TEST_F(FileStoreCommitImplTest, TestPostponeBucketPKTableCommitAllowed) {
+    auto pk_dir = UniqueTestDirectory::Create();
+    ASSERT_TRUE(pk_dir);
+    std::string pk_root = pk_dir->Str();
+    ASSERT_OK_AND_ASSIGN(auto catalog, Catalog::Create(pk_root, {}));
+    ASSERT_OK(catalog->CreateDatabase("db", {}, false));
+
+    arrow::Schema pk_schema(
+        {arrow::field("pk", arrow::int32()), arrow::field("val", arrow::utf8())});
+    ::ArrowSchema arrow_schema;
+    ASSERT_TRUE(arrow::ExportSchema(pk_schema, &arrow_schema).ok());
+    std::map<std::string, std::string> table_options = {{"bucket", "-2"}};
+    ASSERT_OK(catalog->CreateTable(Identifier("db", "pk_tbl"), &arrow_schema,
+                                   /*partition_keys=*/{}, /*primary_keys=*/{"pk"}, table_options,
+                                   /*ignore_if_exists=*/false));
+
+    std::string pk_table_path = PathUtil::JoinPath(pk_root, "db.db/pk_tbl");
+
+    // Create FileStoreCommit WITHOUT the workaround flag - should succeed for postpone bucket
+    CommitContextBuilder builder(pk_table_path, "test_user");
+    builder.AddOption(Options::FILE_SYSTEM, "local").UseRESTCatalogCommit(true);
+    ASSERT_OK_AND_ASSIGN(auto commit_context, builder.Finish());
+    ASSERT_OK_AND_ASSIGN(auto committer, FileStoreCommit::Create(std::move(commit_context)));
+    ASSERT_TRUE(committer != nullptr);
+}
+
+// Verify that FileStoreCommit::Create still rejects PK tables with fixed bucket (bucket > 0)
+// when the workaround flag is not set.
+TEST_F(FileStoreCommitImplTest, TestFixedBucketPKTableCommitRejected) {
+    auto pk_dir = UniqueTestDirectory::Create();
+    ASSERT_TRUE(pk_dir);
+    std::string pk_root = pk_dir->Str();
+    ASSERT_OK_AND_ASSIGN(auto catalog, Catalog::Create(pk_root, {}));
+    ASSERT_OK(catalog->CreateDatabase("db", {}, false));
+
+    arrow::Schema pk_schema(
+        {arrow::field("pk", arrow::int32()), arrow::field("val", arrow::utf8())});
+    ::ArrowSchema arrow_schema;
+    ASSERT_TRUE(arrow::ExportSchema(pk_schema, &arrow_schema).ok());
+    std::map<std::string, std::string> table_options = {{"bucket", "4"}};
+    ASSERT_OK(catalog->CreateTable(Identifier("db", "pk_tbl_fixed"), &arrow_schema,
+                                   /*partition_keys=*/{}, /*primary_keys=*/{"pk"}, table_options,
+                                   /*ignore_if_exists=*/false));
+
+    std::string pk_table_path = PathUtil::JoinPath(pk_root, "db.db/pk_tbl_fixed");
+
+    // No workaround flag here either - creation must be rejected for a fixed-bucket PK table
+    CommitContextBuilder builder(pk_table_path, "test_user");
+    builder.AddOption(Options::FILE_SYSTEM, "local").UseRESTCatalogCommit(true);
+    ASSERT_OK_AND_ASSIGN(auto commit_context, builder.Finish());
+    auto result = FileStoreCommit::Create(std::move(commit_context));
+    ASSERT_FALSE(result.ok());
+    ASSERT_TRUE(result.status().ToString().find("not support pk table commit") !=
+                std::string::npos);
+}
+
 }  // namespace paimon::test