Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion src/paimon/core/operation/file_store_commit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "paimon/common/types/data_field.h"
#include "paimon/common/utils/binary_row_partition_computer.h"
#include "paimon/core/core_options.h"
#include "paimon/core/table/bucket_mode.h"
#include "paimon/core/manifest/index_manifest_entry.h"
#include "paimon/core/manifest/index_manifest_file.h"
#include "paimon/core/manifest/manifest_file.h"
Expand Down Expand Up @@ -68,7 +69,17 @@ Result<std::unique_ptr<FileStoreCommit>> FileStoreCommit::Create(
const auto& schema = table_schema.value();
if (!schema->PrimaryKeys().empty() &&
ctx->GetOptions().find("enable-pk-commit-in-inte-test") == ctx->GetOptions().end()) {
return Status::NotImplemented("not support pk table commit yet");
// Postpone bucket mode (bucket=-2) writes data like an append table: all files go to
// bucket-postpone/ directory and the REST catalog server handles bucket redistribution during
// compaction. The commit logic is the same as append tables, so we allow it.
auto schema_opts = schema->Options();
auto bucket_it = schema_opts.find("bucket");
bool is_postpone_bucket =
bucket_it != schema_opts.end() &&
bucket_it->second == std::to_string(BucketModeDefine::POSTPONE_BUCKET);
if (!is_postpone_bucket) {
return Status::NotImplemented("not support pk table commit yet");
}
}
auto opts = schema->Options();
for (const auto& [key, value] : ctx->GetOptions()) {
Expand Down
54 changes: 54 additions & 0 deletions src/paimon/core/operation/file_store_commit_impl_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1661,4 +1661,58 @@ TEST_F(FileStoreCommitImplTest, TestCommitWithIOException) {
ASSERT_TRUE(commit_run_complete);
}

// A primary-key table created with postpone bucket mode (bucket=-2) must be accepted by
// FileStoreCommit::Create even though the enable-pk-commit-in-inte-test option is not supplied.
TEST_F(FileStoreCommitImplTest, TestPostponeBucketPKTableCommitAllowed) {
    // Set up a fresh warehouse directory holding a single database named "db".
    auto warehouse_dir = UniqueTestDirectory::Create();
    ASSERT_TRUE(warehouse_dir);
    const std::string warehouse_root = warehouse_dir->Str();
    ASSERT_OK_AND_ASSIGN(auto catalog, Catalog::Create(warehouse_root, {}));
    ASSERT_OK(catalog->CreateDatabase("db", {}, false));

    // Two-column schema (int32 "pk", utf8 "val"), exported through the Arrow C data interface.
    arrow::FieldVector columns = {arrow::field("pk", arrow::int32()),
                                  arrow::field("val", arrow::utf8())};
    arrow::Schema table_schema(columns);
    ::ArrowSchema exported_schema;
    const auto export_status = arrow::ExportSchema(table_schema, &exported_schema);
    ASSERT_TRUE(export_status.ok());

    // "pk" is the primary key and the table options request bucket=-2.
    const Identifier table_id("db", "pk_tbl");
    std::map<std::string, std::string> create_options = {{"bucket", "-2"}};
    ASSERT_OK(catalog->CreateTable(table_id, &exported_schema,
                                   /*partition_keys=*/{}, /*primary_keys=*/{"pk"}, create_options,
                                   /*ignore_if_exists=*/false));

    const std::string table_path = PathUtil::JoinPath(warehouse_root, "db.db/pk_tbl");

    // The commit context deliberately omits the workaround option; Create must still succeed.
    CommitContextBuilder context_builder(table_path, "test_user");
    context_builder.AddOption(Options::FILE_SYSTEM, "local").UseRESTCatalogCommit(true);
    ASSERT_OK_AND_ASSIGN(auto context, context_builder.Finish());
    ASSERT_OK_AND_ASSIGN(auto file_store_commit, FileStoreCommit::Create(std::move(context)));
    ASSERT_TRUE(file_store_commit != nullptr);
}

// A primary-key table created with a fixed bucket count (bucket=4 here) must still be rejected by
// FileStoreCommit::Create when the enable-pk-commit-in-inte-test option is not supplied.
TEST_F(FileStoreCommitImplTest, TestFixedBucketPKTableCommitRejected) {
    // Set up a fresh warehouse directory holding a single database named "db".
    auto warehouse_dir = UniqueTestDirectory::Create();
    ASSERT_TRUE(warehouse_dir);
    const std::string warehouse_root = warehouse_dir->Str();
    ASSERT_OK_AND_ASSIGN(auto catalog, Catalog::Create(warehouse_root, {}));
    ASSERT_OK(catalog->CreateDatabase("db", {}, false));

    // Two-column schema (int32 "pk", utf8 "val"), exported through the Arrow C data interface.
    arrow::FieldVector columns = {arrow::field("pk", arrow::int32()),
                                  arrow::field("val", arrow::utf8())};
    arrow::Schema table_schema(columns);
    ::ArrowSchema exported_schema;
    const auto export_status = arrow::ExportSchema(table_schema, &exported_schema);
    ASSERT_TRUE(export_status.ok());

    // "pk" is the primary key and the table options request bucket=4.
    const Identifier table_id("db", "pk_tbl_fixed");
    std::map<std::string, std::string> create_options = {{"bucket", "4"}};
    ASSERT_OK(catalog->CreateTable(table_id, &exported_schema,
                                   /*partition_keys=*/{}, /*primary_keys=*/{"pk"}, create_options,
                                   /*ignore_if_exists=*/false));

    const std::string table_path = PathUtil::JoinPath(warehouse_root, "db.db/pk_tbl_fixed");

    CommitContextBuilder context_builder(table_path, "test_user");
    context_builder.AddOption(Options::FILE_SYSTEM, "local").UseRESTCatalogCommit(true);
    ASSERT_OK_AND_ASSIGN(auto context, context_builder.Finish());

    // Creation has to fail, and the status text must carry the "not support" message.
    auto create_result = FileStoreCommit::Create(std::move(context));
    ASSERT_FALSE(create_result.ok());
    const auto failure_text = create_result.status().ToString();
    ASSERT_TRUE(failure_text.find("not support pk table commit") != std::string::npos);
}

} // namespace paimon::test