Skip to content
53 changes: 51 additions & 2 deletions src/paimon/core/catalog/file_system_catalog_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,8 @@ TEST(FileSystemCatalogTest, TestMetadataSystemTableCatalog) {
/*ignore_if_exists=*/false));
ArrowSchemaRelease(&schema);

-std::vector<std::string> metadata_tables = {"snapshots", "schemas", "tags", "branches",
-"consumers"};
+std::vector<std::string> metadata_tables = {"snapshots", "schemas", "tags", "branches",
+"consumers", "manifests", "files"};
for (const auto& table_name : metadata_tables) {
Identifier system_identifier("db1", "tbl1$" + table_name);
ASSERT_OK_AND_ASSIGN(bool exists, catalog.TableExists(system_identifier));
Expand Down Expand Up @@ -363,6 +363,55 @@ TEST(FileSystemCatalogTest, TestMetadataSystemTableCatalog) {
(std::vector<std::string>{"consumer_id", "next_snapshot_id"}));
ASSERT_FALSE(consumers_arrow_schema->field(1)->nullable());

ASSERT_OK_AND_ASSIGN(std::shared_ptr<Schema> manifests_schema,
catalog.LoadTableSchema(Identifier("db1", "tbl1$manifests")));
ASSERT_OK_AND_ASSIGN(auto manifests_c_schema, manifests_schema->GetArrowSchema());
auto manifests_arrow_schema = arrow::ImportSchema(manifests_c_schema.get()).ValueUnsafe();
ASSERT_EQ(manifests_arrow_schema->field_names(),
(std::vector<std::string>{"file_name", "file_size", "num_added_files",
"num_deleted_files", "schema_id", "min_partition_stats",
"max_partition_stats", "min_row_id", "max_row_id"}));
ASSERT_FALSE(manifests_arrow_schema->field(0)->nullable());
ASSERT_EQ(manifests_arrow_schema->field(1)->type()->id(), arrow::Type::INT64);
ASSERT_FALSE(manifests_arrow_schema->field(4)->nullable());
ASSERT_TRUE(manifests_arrow_schema->field(5)->nullable());
ASSERT_TRUE(manifests_arrow_schema->field(8)->nullable());

ASSERT_OK_AND_ASSIGN(std::shared_ptr<Schema> files_schema,
catalog.LoadTableSchema(Identifier("db1", "tbl1$files")));
ASSERT_OK_AND_ASSIGN(auto files_c_schema, files_schema->GetArrowSchema());
auto files_arrow_schema = arrow::ImportSchema(files_c_schema.get()).ValueUnsafe();
ASSERT_EQ(files_arrow_schema->field_names(), (std::vector<std::string>{"partition",
"bucket",
"file_path",
"file_format",
"schema_id",
"level",
"record_count",
"file_size_in_bytes",
"min_key",
"max_key",
"null_value_counts",
"min_value_stats",
"max_value_stats",
"min_sequence_number",
"max_sequence_number",
"creation_time",
"deleteRowCount",
"file_source",
"first_row_id",
"write_cols"}));
ASSERT_TRUE(files_arrow_schema->field(0)->nullable());
ASSERT_FALSE(files_arrow_schema->field(1)->nullable());
ASSERT_FALSE(files_arrow_schema->field(2)->nullable());
ASSERT_FALSE(files_arrow_schema->field(10)->nullable());
ASSERT_EQ(files_arrow_schema->field(15)->type()->id(), arrow::Type::TIMESTAMP);
ASSERT_EQ(files_arrow_schema->field(19)->type()->id(), arrow::Type::LIST);
auto write_cols_type =
std::dynamic_pointer_cast<arrow::ListType>(files_arrow_schema->field(19)->type());
ASSERT_TRUE(write_cols_type);
ASSERT_EQ(write_cols_type->value_type()->id(), arrow::Type::STRING);

Identifier snapshots_identifier("db1", "tbl1$snapshots");
::ArrowSchema system_create_schema;
ASSERT_TRUE(arrow::ExportSchema(*typed_schema, &system_create_schema).ok());
Expand Down
3 changes: 3 additions & 0 deletions src/paimon/core/table/system/in_memory_system_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ class InMemorySystemTableBatchReader : public BatchReader {
emitted_ = true;
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Schema> schema, table_->ArrowSchema());
PAIMON_ASSIGN_OR_RAISE(std::vector<GenericRow> rows, table_->BuildRows());
if (rows.empty()) {
return BatchReader::MakeEofBatch();
}
PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<GenericRowToArrowArrayConverter> converter,
GenericRowToArrowArrayConverter::Create(schema, arrow_pool_.get()));
return converter->NextBatch(rows);
Expand Down
Loading
Loading