Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 25 additions & 35 deletions src/paimon/core/operation/blob_file_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,51 +39,61 @@ BlobFileContext::BlobFileContext(std::set<std::string> descriptor_fields,

std::unique_ptr<BlobFileContext> BlobFileContext::Create(
const std::shared_ptr<arrow::Schema>& schema, const CoreOptions& options) {
// Check if there are any BLOB fields in the schema
bool has_blob = false;
// Collect the BLOB field names that are present in the given schema. The schema may
// only contain a subset of the table columns (e.g. a projected read/write schema), so all
// field categories below must be derived from this set rather than from the options
// alone, which describe the full table.
std::set<std::string> schema_blob_fields;
for (int i = 0; i < schema->num_fields(); ++i) {
if (BlobUtils::IsBlobField(schema->field(i))) {
has_blob = true;
break;
const auto& field = schema->field(i);
if (BlobUtils::IsBlobField(field)) {
schema_blob_fields.insert(field->name());
}
}
if (!has_blob) {
if (schema_blob_fields.empty()) {
return nullptr;
}

// Populate descriptor fields
std::set<std::string> descriptor_fields;
for (const auto& name : options.GetBlobDescriptorFields()) {
descriptor_fields.insert(name);
if (schema_blob_fields.count(name) > 0) {
descriptor_fields.insert(name);
}
}

// Populate view fields
std::set<std::string> view_fields;
for (const auto& name : options.GetBlobViewFields()) {
view_fields.insert(name);
if (schema_blob_fields.count(name) > 0) {
view_fields.insert(name);
}
}

// Populate inline fields from options (descriptor ∪ view)
std::set<std::string> inline_fields;
for (const auto& name : options.GetBlobInlineFields()) {
inline_fields.insert(name);
if (schema_blob_fields.count(name) > 0) {
inline_fields.insert(name);
}
}

// Populate external storage fields
std::set<std::string> external_storage_fields;
for (const auto& name : options.GetBlobExternalStorageFields()) {
external_storage_fields.insert(name);
if (schema_blob_fields.count(name) > 0) {
external_storage_fields.insert(name);
}
}

// Populate external storage path
std::optional<std::string> external_storage_path = options.GetBlobExternalStoragePath();

// Determine blob_file_fields: BLOB fields that are NOT inline
// Determine blob_file_fields: schema BLOB fields that are NOT inline
std::set<std::string> blob_file_fields;
for (int i = 0; i < schema->num_fields(); ++i) {
const auto& field = schema->field(i);
if (BlobUtils::IsBlobField(field) && inline_fields.count(field->name()) == 0) {
blob_file_fields.insert(field->name());
for (const auto& name : schema_blob_fields) {
if (inline_fields.count(name) == 0) {
blob_file_fields.insert(name);
}
}

Expand All @@ -93,26 +103,6 @@ std::unique_ptr<BlobFileContext> BlobFileContext::Create(
std::move(blob_file_fields), std::move(external_storage_path)));
}

// Tells whether `field_name` is one of the inline fields, i.e. a field stored inline in the
// main data file. The answer is a plain membership lookup in `inline_fields_`.
bool BlobFileContext::IsInlineField(const std::string& field_name) const {
    const auto match = inline_fields_.find(field_name);
    return match != inline_fields_.end();
}

// Tells whether `field_name` is one of the fields written to a separate .blob file.
// The answer is a plain membership lookup in `blob_file_fields_`.
bool BlobFileContext::IsBlobFileField(const std::string& field_name) const {
    const auto match = blob_file_fields_.find(field_name);
    return match != blob_file_fields_.end();
}

// Tells whether `field_name` is registered as a descriptor field.
// The answer is a plain membership lookup in `descriptor_fields_`.
bool BlobFileContext::IsDescriptorField(const std::string& field_name) const {
    const auto match = descriptor_fields_.find(field_name);
    return match != descriptor_fields_.end();
}

// Tells whether `field_name` is registered as a view field.
// The answer is a plain membership lookup in `view_fields_`.
bool BlobFileContext::IsViewField(const std::string& field_name) const {
    const auto match = view_fields_.find(field_name);
    return match != view_fields_.end();
}

// Tells whether `field_name` is one of the fields written to external storage.
// The answer is a plain membership lookup in `external_storage_fields_`.
bool BlobFileContext::IsExternalStorageField(const std::string& field_name) const {
    const auto match = external_storage_fields_.find(field_name);
    return match != external_storage_fields_.end();
}

bool BlobFileContext::RequireBlobFileWriter() const {
return !blob_file_fields_.empty();
}
Expand Down
16 changes: 0 additions & 16 deletions src/paimon/core/operation/blob_file_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,22 +46,6 @@ class BlobFileContext {
static std::unique_ptr<BlobFileContext> Create(const std::shared_ptr<arrow::Schema>& schema,
const CoreOptions& options);

/// Returns true if the given field should be stored inline in the main data file
/// (either as descriptor bytes or view bytes).
bool IsInlineField(const std::string& field_name) const;

/// Returns true if the given field should be written to a separate .blob file.
bool IsBlobFileField(const std::string& field_name) const;

/// Returns true if the given field is a descriptor field.
bool IsDescriptorField(const std::string& field_name) const;

/// Returns true if the given field is a view field.
bool IsViewField(const std::string& field_name) const;

/// Returns true if the given field should be written to external storage.
bool IsExternalStorageField(const std::string& field_name) const;

/// Returns true if there are any BLOB fields that need a .blob file writer.
bool RequireBlobFileWriter() const;

Expand Down
74 changes: 42 additions & 32 deletions src/paimon/core/operation/blob_file_context_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,18 +91,6 @@ TEST_F(BlobFileContextTest, MixedInlineAndBlobFile) {
// blob file fields = non-inline blob fields
ASSERT_EQ(context->GetBlobFileFields(), std::set<std::string>({"video", "audio"}));

// Query methods
ASSERT_TRUE(context->IsInlineField("image"));
ASSERT_TRUE(context->IsDescriptorField("image"));
ASSERT_FALSE(context->IsViewField("image"));
ASSERT_FALSE(context->IsBlobFileField("image"));

ASSERT_FALSE(context->IsInlineField("video"));
ASSERT_TRUE(context->IsBlobFileField("video"));

ASSERT_FALSE(context->IsInlineField("audio"));
ASSERT_TRUE(context->IsBlobFileField("audio"));

// Requires blob file writer for video and audio
ASSERT_TRUE(context->RequireBlobFileWriter());
ASSERT_FALSE(context->RequireExternalStorageWriter());
Expand All @@ -126,9 +114,6 @@ TEST_F(BlobFileContextTest, ExternalStorageFields) {
ASSERT_EQ(context->GetExternalStoragePath(), "oss://bucket/blob/");
ASSERT_TRUE(context->GetBlobFileFields().empty());

ASSERT_TRUE(context->IsExternalStorageField("image"));
ASSERT_FALSE(context->IsExternalStorageField("video"));

ASSERT_FALSE(context->RequireBlobFileWriter());
ASSERT_TRUE(context->RequireExternalStorageWriter());
}
Expand All @@ -148,10 +133,6 @@ TEST_F(BlobFileContextTest, ViewFields) {
ASSERT_EQ(context->GetInlineFields(), std::set<std::string>({"ref_image"}));
ASSERT_EQ(context->GetBlobFileFields(), std::set<std::string>({"raw_blob"}));

ASSERT_TRUE(context->IsInlineField("ref_image"));
ASSERT_TRUE(context->IsViewField("ref_image"));
ASSERT_FALSE(context->IsDescriptorField("ref_image"));

ASSERT_TRUE(context->RequireBlobFileWriter());
ASSERT_FALSE(context->RequireExternalStorageWriter());
}
Expand All @@ -176,24 +157,53 @@ TEST_F(BlobFileContextTest, DescriptorAndViewTogether) {
ASSERT_EQ(context->GetExternalStoragePath(), "/tmp/ext/");
ASSERT_EQ(context->GetBlobFileFields(), std::set<std::string>({"normal_blob"}));

ASSERT_TRUE(context->IsDescriptorField("desc_blob"));
ASSERT_TRUE(context->IsExternalStorageField("desc_blob"));
ASSERT_TRUE(context->IsInlineField("desc_blob"));
ASSERT_FALSE(context->IsBlobFileField("desc_blob"));
ASSERT_TRUE(context->RequireBlobFileWriter());
ASSERT_TRUE(context->RequireExternalStorageWriter());
}

ASSERT_TRUE(context->IsViewField("view_blob"));
ASSERT_TRUE(context->IsInlineField("view_blob"));
ASSERT_FALSE(context->IsDescriptorField("view_blob"));
// Checks that field names configured in the options but missing from the given (partial)
// schema are dropped from every field category of the created context.
TEST_F(BlobFileContextTest, PartialSchemaIgnoresAbsentFields) {
    // Schema only carries "image"; "video" and "audio" are not part of this write schema.
    auto schema = MakeSchema({"id"}, {"image"});
    std::map<std::string, std::string> opts_map = {
        {Options::BLOB_DESCRIPTOR_FIELD, "image,audio"},
        {Options::BLOB_VIEW_FIELD, "video"},
        {Options::BLOB_EXTERNAL_STORAGE_FIELD, "image,video"},
        {Options::BLOB_EXTERNAL_STORAGE_PATH, "oss://bucket/blob/"},
    };
    ASSERT_OK_AND_ASSIGN(auto options, CoreOptions::FromMap(opts_map));
    auto context = BlobFileContext::Create(schema, options);
    ASSERT_TRUE(context);

    // Only "image" survives filtering; "audio" / "video" are not in the schema.
    ASSERT_EQ(context->GetDescriptorFields(), std::set<std::string>({"image"}));
    ASSERT_TRUE(context->GetViewFields().empty());
    ASSERT_EQ(context->GetInlineFields(), std::set<std::string>({"image"}));
    ASSERT_EQ(context->GetExternalStorageFields(), std::set<std::string>({"image"}));

    // No non-inline blob field remains in the schema.
    ASSERT_TRUE(context->GetBlobFileFields().empty());

    // An empty blob-file field set means no .blob file writer is required. The previous
    // contradictory ASSERT_TRUE on the same call, and the probes for "normal_blob" /
    // "not_exist" (names that are not part of this test's fixture), are intentionally gone.
    ASSERT_FALSE(context->RequireBlobFileWriter());
    ASSERT_TRUE(context->RequireExternalStorageWriter());
}

// Partial schema whose only BLOB column is "audio". The descriptor and view options name
// "image" and "video", neither of which is in the schema, so every option-derived category is
// expected to be empty and "audio" is expected to be the sole blob-file field.
TEST_F(BlobFileContextTest, PartialSchemaWithOnlyBlobFileField) {
    // "audio" is not configured as inline -> goes to .blob file
    std::map<std::string, std::string> option_entries = {
        {Options::BLOB_DESCRIPTOR_FIELD, "image"},
        {Options::BLOB_VIEW_FIELD, "video"},
    };
    auto audio_only_schema = MakeSchema({"id"}, {"audio"});

    ASSERT_OK_AND_ASSIGN(auto core_options, CoreOptions::FromMap(option_entries));
    auto blob_context = BlobFileContext::Create(audio_only_schema, core_options);
    ASSERT_TRUE(blob_context);

    // None of the configured inline columns exist in this schema.
    ASSERT_TRUE(blob_context->GetDescriptorFields().empty());
    ASSERT_TRUE(blob_context->GetViewFields().empty());
    ASSERT_TRUE(blob_context->GetInlineFields().empty());

    // The one remaining BLOB column is routed to a .blob file.
    const std::set<std::string> expected_blob_file_fields({"audio"});
    ASSERT_EQ(blob_context->GetBlobFileFields(), expected_blob_file_fields);

    ASSERT_TRUE(blob_context->RequireBlobFileWriter());
    ASSERT_FALSE(blob_context->RequireExternalStorageWriter());
}

} // namespace paimon
Loading