diff --git a/src/paimon/core/catalog/file_system_catalog_test.cpp b/src/paimon/core/catalog/file_system_catalog_test.cpp index fce73d531..da510b089 100644 --- a/src/paimon/core/catalog/file_system_catalog_test.cpp +++ b/src/paimon/core/catalog/file_system_catalog_test.cpp @@ -300,8 +300,8 @@ TEST(FileSystemCatalogTest, TestMetadataSystemTableCatalog) { /*ignore_if_exists=*/false)); ArrowSchemaRelease(&schema); - std::vector metadata_tables = {"snapshots", "schemas", "tags", "branches", - "consumers"}; + std::vector metadata_tables = {"snapshots", "schemas", "tags", "branches", + "consumers", "manifests", "files"}; for (const auto& table_name : metadata_tables) { Identifier system_identifier("db1", "tbl1$" + table_name); ASSERT_OK_AND_ASSIGN(bool exists, catalog.TableExists(system_identifier)); @@ -363,6 +363,55 @@ TEST(FileSystemCatalogTest, TestMetadataSystemTableCatalog) { (std::vector{"consumer_id", "next_snapshot_id"})); ASSERT_FALSE(consumers_arrow_schema->field(1)->nullable()); + ASSERT_OK_AND_ASSIGN(std::shared_ptr manifests_schema, + catalog.LoadTableSchema(Identifier("db1", "tbl1$manifests"))); + ASSERT_OK_AND_ASSIGN(auto manifests_c_schema, manifests_schema->GetArrowSchema()); + auto manifests_arrow_schema = arrow::ImportSchema(manifests_c_schema.get()).ValueUnsafe(); + ASSERT_EQ(manifests_arrow_schema->field_names(), + (std::vector{"file_name", "file_size", "num_added_files", + "num_deleted_files", "schema_id", "min_partition_stats", + "max_partition_stats", "min_row_id", "max_row_id"})); + ASSERT_FALSE(manifests_arrow_schema->field(0)->nullable()); + ASSERT_EQ(manifests_arrow_schema->field(1)->type()->id(), arrow::Type::INT64); + ASSERT_FALSE(manifests_arrow_schema->field(4)->nullable()); + ASSERT_TRUE(manifests_arrow_schema->field(5)->nullable()); + ASSERT_TRUE(manifests_arrow_schema->field(8)->nullable()); + + ASSERT_OK_AND_ASSIGN(std::shared_ptr files_schema, + catalog.LoadTableSchema(Identifier("db1", "tbl1$files"))); + ASSERT_OK_AND_ASSIGN(auto 
files_c_schema, files_schema->GetArrowSchema()); + auto files_arrow_schema = arrow::ImportSchema(files_c_schema.get()).ValueUnsafe(); + ASSERT_EQ(files_arrow_schema->field_names(), (std::vector{"partition", + "bucket", + "file_path", + "file_format", + "schema_id", + "level", + "record_count", + "file_size_in_bytes", + "min_key", + "max_key", + "null_value_counts", + "min_value_stats", + "max_value_stats", + "min_sequence_number", + "max_sequence_number", + "creation_time", + "deleteRowCount", + "file_source", + "first_row_id", + "write_cols"})); + ASSERT_TRUE(files_arrow_schema->field(0)->nullable()); + ASSERT_FALSE(files_arrow_schema->field(1)->nullable()); + ASSERT_FALSE(files_arrow_schema->field(2)->nullable()); + ASSERT_FALSE(files_arrow_schema->field(10)->nullable()); + ASSERT_EQ(files_arrow_schema->field(15)->type()->id(), arrow::Type::TIMESTAMP); + ASSERT_EQ(files_arrow_schema->field(19)->type()->id(), arrow::Type::LIST); + auto write_cols_type = + std::dynamic_pointer_cast(files_arrow_schema->field(19)->type()); + ASSERT_TRUE(write_cols_type); + ASSERT_EQ(write_cols_type->value_type()->id(), arrow::Type::STRING); + Identifier snapshots_identifier("db1", "tbl1$snapshots"); ::ArrowSchema system_create_schema; ASSERT_TRUE(arrow::ExportSchema(*typed_schema, &system_create_schema).ok()); diff --git a/src/paimon/core/table/system/in_memory_system_table.cpp b/src/paimon/core/table/system/in_memory_system_table.cpp index fff2b0dd0..7fbd5c884 100644 --- a/src/paimon/core/table/system/in_memory_system_table.cpp +++ b/src/paimon/core/table/system/in_memory_system_table.cpp @@ -46,6 +46,9 @@ class InMemorySystemTableBatchReader : public BatchReader { emitted_ = true; PAIMON_ASSIGN_OR_RAISE(std::shared_ptr schema, table_->ArrowSchema()); PAIMON_ASSIGN_OR_RAISE(std::vector rows, table_->BuildRows()); + if (rows.empty()) { + return BatchReader::MakeEofBatch(); + } PAIMON_ASSIGN_OR_RAISE(std::unique_ptr converter, GenericRowToArrowArrayConverter::Create(schema, 
arrow_pool_.get())); return converter->NextBatch(rows); diff --git a/src/paimon/core/table/system/metadata_system_tables.cpp b/src/paimon/core/table/system/metadata_system_tables.cpp index 23107b8f2..c63d08072 100644 --- a/src/paimon/core/table/system/metadata_system_tables.cpp +++ b/src/paimon/core/table/system/metadata_system_tables.cpp @@ -24,22 +24,47 @@ #include #include #include +#include #include #include +#include "fmt/format.h" +#include "fmt/ranges.h" #include "paimon/common/data/binary_string.h" +#include "paimon/common/data/data_define.h" #include "paimon/common/data/generic_row.h" +#include "paimon/common/data/internal_array.h" +#include "paimon/common/data/internal_row.h" +#include "paimon/common/table/special_fields.h" +#include "paimon/common/types/data_field.h" +#include "paimon/common/utils/binary_row_partition_computer.h" #include "paimon/common/utils/date_time_utils.h" +#include "paimon/common/utils/internal_row_utils.h" +#include "paimon/common/utils/object_utils.h" +#include "paimon/common/utils/path_util.h" #include "paimon/common/utils/rapidjson_util.h" +#include "paimon/core/core_options.h" +#include "paimon/core/io/data_file_meta.h" +#include "paimon/core/manifest/file_entry.h" +#include "paimon/core/manifest/file_kind.h" +#include "paimon/core/manifest/manifest_entry.h" +#include "paimon/core/manifest/manifest_file.h" +#include "paimon/core/manifest/manifest_file_meta.h" +#include "paimon/core/manifest/manifest_list.h" #include "paimon/core/schema/schema_manager.h" #include "paimon/core/schema/table_schema.h" #include "paimon/core/snapshot.h" +#include "paimon/core/stats/simple_stats_evolution.h" #include "paimon/core/tag/tag.h" #include "paimon/core/utils/branch_manager.h" #include "paimon/core/utils/consumer_manager.h" +#include "paimon/core/utils/field_mapping.h" +#include "paimon/core/utils/file_store_path_factory.h" #include "paimon/core/utils/snapshot_manager.h" #include "paimon/core/utils/tag_manager.h" +#include 
"paimon/data/timestamp.h" #include "paimon/fs/file_system.h" +#include "paimon/memory/memory_pool.h" #include "paimon/status.h" #include "rapidjson/document.h" #include "rapidjson/stringbuffer.h" @@ -48,6 +73,8 @@ namespace paimon { namespace { +constexpr int32_t kMaxPartitionStatsLength = 255; + template Result JsonString(const T& value) { rapidjson::Document document; @@ -150,12 +177,262 @@ VariantType OptionalTimestampMillisValue(const std::optional& value) { MetadataSystemTableContext CreateMetadataContext(std::shared_ptr fs, std::string table_path, std::string branch) { return { - std::move(fs), - std::move(table_path), - BranchManager::NormalizeBranch(branch), + std::move(fs), std::move(table_path), BranchManager::NormalizeBranch(branch), nullptr, {}, + }; +} + +MetadataSystemTableContext CreateMetadataContext(std::shared_ptr fs, + std::string table_path, std::string branch, + std::shared_ptr table_schema, + std::map options) { + return { + std::move(fs), std::move(table_path), BranchManager::NormalizeBranch(branch), + std::move(table_schema), std::move(options), }; } +Result CreateCoreOptions(const MetadataSystemTableContext& context) { + return CoreOptions::FromMap(context.options, context.fs); +} + +Result> CreatePathFactory( + const MetadataSystemTableContext& context, const CoreOptions& core_options, + const std::shared_ptr& pool) { + std::shared_ptr arrow_schema = + DataField::ConvertDataFieldsToArrowSchema(context.table_schema->Fields()); + PAIMON_ASSIGN_OR_RAISE(std::vector external_paths, + core_options.CreateExternalPaths()); + PAIMON_ASSIGN_OR_RAISE(std::optional global_index_external_path, + core_options.CreateGlobalIndexExternalPath()); + PAIMON_ASSIGN_OR_RAISE( + std::unique_ptr path_factory, + FileStorePathFactory::Create( + context.table_path, arrow_schema, context.table_schema->PartitionKeys(), + core_options.GetPartitionDefaultName(), core_options.GetFileFormat()->Identifier(), + core_options.DataFilePrefix(), 
core_options.LegacyPartitionNameEnabled(), + external_paths, global_index_external_path, core_options.IndexFileInDataFileDir(), + pool)); + return std::shared_ptr(std::move(path_factory)); +} + +Result> LatestSnapshot(const MetadataSystemTableContext& context) { + SnapshotManager snapshot_manager(context.fs, context.table_path, context.branch); + return snapshot_manager.LatestSnapshot(); +} + +Result> ReadDataManifests( + const MetadataSystemTableContext& context, const Snapshot& snapshot, + const std::shared_ptr& path_factory, const CoreOptions& core_options, + const std::shared_ptr& pool) { + PAIMON_ASSIGN_OR_RAISE( + std::unique_ptr manifest_list, + ManifestList::Create(context.fs, core_options.GetManifestFormat(), + core_options.GetManifestCompression(), path_factory, pool)); + std::vector manifests; + // TODO(suxiaogang223): Align Java ReadAllManifests semantics by including changelog + // manifests. ReadAllManifests currently delegates to ReadChangelogManifests, which returns + // NotImplemented when a snapshot has a changelog manifest list. 
+ PAIMON_RETURN_NOT_OK(manifest_list->ReadDataManifests(snapshot, &manifests)); + return manifests; +} + +Result> CreateManifestFile( + const MetadataSystemTableContext& context, + const std::shared_ptr& path_factory, const CoreOptions& core_options, + const std::shared_ptr& pool) { + std::shared_ptr arrow_schema = + DataField::ConvertDataFieldsToArrowSchema(context.table_schema->Fields()); + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr partition_schema, + FieldMapping::GetPartitionSchema(arrow_schema, context.table_schema->PartitionKeys())); + return ManifestFile::Create(context.fs, core_options.GetManifestFormat(), + core_options.GetManifestCompression(), path_factory, + core_options.GetManifestTargetFileSize(), pool, core_options, + partition_schema); +} + +Result> ReadLatestManifestEntries( + const MetadataSystemTableContext& context, + const std::shared_ptr& path_factory, const CoreOptions& core_options, + const std::shared_ptr& pool) { + PAIMON_ASSIGN_OR_RAISE(std::optional snapshot, LatestSnapshot(context)); + if (!snapshot) { + return std::vector(); + } + PAIMON_ASSIGN_OR_RAISE( + std::vector manifests, + ReadDataManifests(context, snapshot.value(), path_factory, core_options, pool)); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr manifest_file, + CreateManifestFile(context, path_factory, core_options, pool)); + std::vector entries; + for (const auto& manifest : manifests) { + PAIMON_RETURN_NOT_OK( + manifest_file->Read(manifest.FileName(), /*filter=*/nullptr, &entries)); + } + return entries; +} + +Result> ReadLatestDataFiles( + const MetadataSystemTableContext& context, + const std::shared_ptr& path_factory, const CoreOptions& core_options, + const std::shared_ptr& pool) { + PAIMON_ASSIGN_OR_RAISE(std::vector entries, + ReadLatestManifestEntries(context, path_factory, core_options, pool)); + std::vector merged_entries; + PAIMON_RETURN_NOT_OK(FileEntry::MergeEntries(entries, &merged_entries)); + return merged_entries; +} + +Result> OptionalPartitionString( + const 
BinaryRow& row, const std::shared_ptr& partition_schema) { + if (row.GetFieldCount() <= 0) { + return std::optional(); + } + PAIMON_ASSIGN_OR_RAISE(std::string value, + BinaryRowPartitionComputer::PartToSimpleString( + partition_schema, row, ",", kMaxPartitionStatsLength)); + return std::optional(value); +} + +Result OptionalPartitionStringValue( + const BinaryRow& row, const std::shared_ptr& partition_schema) { + PAIMON_ASSIGN_OR_RAISE(std::optional value, + OptionalPartitionString(row, partition_schema)); + return OptionalStringValue(value); +} + +Result FilePath(const std::shared_ptr& path_factory, + const ManifestEntry& entry, const DataFileMeta& file) { + if (file.external_path) { + return file.external_path.value(); + } + PAIMON_ASSIGN_OR_RAISE(std::string bucket_path, + path_factory->BucketPath(entry.Partition(), entry.Bucket())); + return PathUtil::JoinPath(bucket_path, file.file_name); +} + +Result> RowValueStrings(const std::vector& fields, + const InternalRow& row) { + std::shared_ptr schema = DataField::ConvertDataFieldsToArrowSchema(fields); + PAIMON_ASSIGN_OR_RAISE(std::vector getters, + InternalRowUtils::CreateFieldGetters(schema, /*use_view=*/false)); + std::vector values; + int32_t length = std::min(static_cast(fields.size()), row.GetFieldCount()); + values.reserve(length); + for (int32_t i = 0; i < length; ++i) { + std::string value = "null"; + if (!row.IsNullAt(i)) { + VariantType field_value = getters[i](row); + value = DataDefine::VariantValueToString(field_value); + } + values.push_back(std::move(value)); + } + return values; +} + +Result RowValuesString(const std::vector& fields, const InternalRow& row, + std::string_view left, std::string_view right) { + PAIMON_ASSIGN_OR_RAISE(std::vector values, RowValueStrings(fields, row)); + return fmt::format("{}{}{}", left, fmt::join(values, ", "), right); +} + +Result> OptionalRowValuesString(const std::vector& fields, + const InternalRow& row, + std::string_view left, + std::string_view right) { + if 
(row.GetFieldCount() <= 0) { + return std::optional(); + } + PAIMON_ASSIGN_OR_RAISE(std::string value, RowValuesString(fields, row, left, right)); + return std::optional(value); +} + +Result FieldsValueMapString(const std::vector& fields, + const InternalRow& row) { + PAIMON_ASSIGN_OR_RAISE(std::vector values, RowValueStrings(fields, row)); + std::vector> field_values; + size_t length = std::min(fields.size(), values.size()); + field_values.reserve(length); + for (size_t i = 0; i < length; ++i) { + field_values.emplace_back(fields[i].Name(), std::move(values[i])); + } + std::sort(field_values.begin(), field_values.end(), + [](const auto& lhs, const auto& rhs) { return lhs.first < rhs.first; }); + + std::vector entries; + entries.reserve(field_values.size()); + for (const auto& [name, value] : field_values) { + entries.emplace_back(fmt::format("{}={}", name, value)); + } + return fmt::format("{{{}}}", fmt::join(entries, ", ")); +} + +Result NullValueCountsString(const std::vector& fields, + const InternalArray& null_counts) { + std::vector> field_values; + int32_t length = std::min(static_cast(fields.size()), null_counts.Size()); + field_values.reserve(length); + for (int32_t i = 0; i < length; ++i) { + std::string value = + null_counts.IsNullAt(i) ? 
"null" : std::to_string(null_counts.GetLong(i)); + field_values.emplace_back(fields[i].Name(), std::move(value)); + } + std::sort(field_values.begin(), field_values.end(), + [](const auto& lhs, const auto& rhs) { return lhs.first < rhs.first; }); + + std::vector entries; + entries.reserve(field_values.size()); + for (const auto& [name, value] : field_values) { + entries.emplace_back(fmt::format("{}={}", name, value)); + } + return fmt::format("{{{}}}", fmt::join(entries, ", ")); +} + +Result> LoadDataSchema(const MetadataSystemTableContext& context, + int64_t schema_id) { + if (schema_id == context.table_schema->Id()) { + return context.table_schema; + } + SchemaManager schema_manager(context.fs, context.table_path, context.branch); + return schema_manager.ReadSchema(schema_id); +} + +Result> ProjectWriteFields(const std::shared_ptr& data_schema, + const DataFileMeta& file) { + if (!file.write_cols) { + return data_schema->Fields(); + } + + std::vector fields; + fields.reserve(file.write_cols->size() + data_schema->PartitionKeys().size()); + for (const auto& write_col : file.write_cols.value()) { + if (write_col == SpecialFields::RowId().Name() || + write_col == SpecialFields::SequenceNumber().Name()) { + continue; + } + PAIMON_ASSIGN_OR_RAISE(DataField field, data_schema->GetField(write_col)); + fields.push_back(std::move(field)); + } + + for (const auto& partition_key : data_schema->PartitionKeys()) { + if (!ObjectUtils::Contains(file.write_cols.value(), partition_key)) { + PAIMON_ASSIGN_OR_RAISE(DataField field, data_schema->GetField(partition_key)); + fields.push_back(std::move(field)); + } + } + return fields; +} + +Result> WriteColsValue( + const std::optional>& write_cols, + const std::shared_ptr& pool) { + if (!write_cols) { + return std::shared_ptr(); + } + return std::make_shared( + InternalRowUtils::ToNotNullStringArrayData(write_cols.value(), pool)); +} + } // namespace OptionsSystemTable::OptionsSystemTable(std::string table_path, @@ -424,4 +701,195 @@ 
Result> ConsumersSystemTable::BuildRows() const { return rows; } +ManifestsSystemTable::ManifestsSystemTable(std::shared_ptr fs, std::string table_path, + std::string branch, + std::shared_ptr table_schema, + std::map options) + : InMemorySystemTable(table_path), + context_(CreateMetadataContext(std::move(fs), std::move(table_path), std::move(branch), + std::move(table_schema), std::move(options))) {} + +std::string ManifestsSystemTable::Name() const { + return kName; +} + +Result> ManifestsSystemTable::ArrowSchema() const { + return arrow::schema({ + arrow::field("file_name", arrow::utf8(), /*nullable=*/false), + arrow::field("file_size", arrow::int64(), /*nullable=*/false), + arrow::field("num_added_files", arrow::int64(), /*nullable=*/false), + arrow::field("num_deleted_files", arrow::int64(), /*nullable=*/false), + arrow::field("schema_id", arrow::int64(), /*nullable=*/false), + arrow::field("min_partition_stats", arrow::utf8(), /*nullable=*/true), + arrow::field("max_partition_stats", arrow::utf8(), /*nullable=*/true), + arrow::field("min_row_id", arrow::int64(), /*nullable=*/true), + arrow::field("max_row_id", arrow::int64(), /*nullable=*/true), + }); +} + +Result> ManifestsSystemTable::BuildRows() const { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr schema, ArrowSchema()); + PAIMON_ASSIGN_OR_RAISE(std::optional snapshot, LatestSnapshot(context_)); + if (!snapshot) { + return std::vector(); + } + + std::shared_ptr pool = GetDefaultPool(); + PAIMON_ASSIGN_OR_RAISE(CoreOptions core_options, CreateCoreOptions(context_)); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr path_factory, + CreatePathFactory(context_, core_options, pool)); + PAIMON_ASSIGN_OR_RAISE( + std::vector manifests, + ReadDataManifests(context_, snapshot.value(), path_factory, core_options, pool)); + std::shared_ptr arrow_schema = + DataField::ConvertDataFieldsToArrowSchema(context_.table_schema->Fields()); + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr partition_schema, + 
FieldMapping::GetPartitionSchema(arrow_schema, context_.table_schema->PartitionKeys())); + + std::vector rows; + rows.reserve(manifests.size()); + for (const auto& manifest : manifests) { + GenericRow row(schema->num_fields()); + row.SetField(0, StringValue(manifest.FileName())); + row.SetField(1, manifest.FileSize()); + row.SetField(2, manifest.NumAddedFiles()); + row.SetField(3, manifest.NumDeletedFiles()); + row.SetField(4, manifest.SchemaId()); + PAIMON_ASSIGN_OR_RAISE( + VariantType min_partition, + OptionalPartitionStringValue(manifest.PartitionStats().MinValues(), partition_schema)); + PAIMON_ASSIGN_OR_RAISE( + VariantType max_partition, + OptionalPartitionStringValue(manifest.PartitionStats().MaxValues(), partition_schema)); + row.SetField(5, min_partition); + row.SetField(6, max_partition); + row.SetField(7, OptionalInt64Value(manifest.MinRowId())); + row.SetField(8, OptionalInt64Value(manifest.MaxRowId())); + rows.push_back(std::move(row)); + } + return rows; +} + +FilesSystemTable::FilesSystemTable(std::shared_ptr fs, std::string table_path, + std::string branch, std::shared_ptr table_schema, + std::map options) + : InMemorySystemTable(table_path), + context_(CreateMetadataContext(std::move(fs), std::move(table_path), std::move(branch), + std::move(table_schema), std::move(options))) {} + +std::string FilesSystemTable::Name() const { + return kName; +} + +Result> FilesSystemTable::ArrowSchema() const { + return arrow::schema({ + arrow::field("partition", arrow::utf8(), /*nullable=*/true), + arrow::field("bucket", arrow::int32(), /*nullable=*/false), + arrow::field("file_path", arrow::utf8(), /*nullable=*/false), + arrow::field("file_format", arrow::utf8(), /*nullable=*/false), + arrow::field("schema_id", arrow::int64(), /*nullable=*/false), + arrow::field("level", arrow::int32(), /*nullable=*/false), + arrow::field("record_count", arrow::int64(), /*nullable=*/false), + arrow::field("file_size_in_bytes", arrow::int64(), /*nullable=*/false), + 
arrow::field("min_key", arrow::utf8(), /*nullable=*/true), + arrow::field("max_key", arrow::utf8(), /*nullable=*/true), + arrow::field("null_value_counts", arrow::utf8(), /*nullable=*/false), + arrow::field("min_value_stats", arrow::utf8(), /*nullable=*/false), + arrow::field("max_value_stats", arrow::utf8(), /*nullable=*/false), + arrow::field("min_sequence_number", arrow::int64(), /*nullable=*/true), + arrow::field("max_sequence_number", arrow::int64(), /*nullable=*/true), + arrow::field("creation_time", arrow::timestamp(arrow::TimeUnit::MILLI), + /*nullable=*/true), + arrow::field("deleteRowCount", arrow::int64(), /*nullable=*/true), + arrow::field("file_source", arrow::utf8(), /*nullable=*/true), + arrow::field("first_row_id", arrow::int64(), /*nullable=*/true), + arrow::field("write_cols", arrow::list(arrow::utf8()), /*nullable=*/true), + }); +} + +Result> FilesSystemTable::BuildRows() const { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr schema, ArrowSchema()); + std::shared_ptr pool = GetDefaultPool(); + PAIMON_ASSIGN_OR_RAISE(CoreOptions core_options, CreateCoreOptions(context_)); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr path_factory, + CreatePathFactory(context_, core_options, pool)); + PAIMON_ASSIGN_OR_RAISE(std::vector entries, + ReadLatestDataFiles(context_, path_factory, core_options, pool)); + PAIMON_ASSIGN_OR_RAISE( + std::vector partition_fields, + context_.table_schema->GetFields(context_.table_schema->PartitionKeys())); + const std::vector& value_stats_fields = context_.table_schema->Fields(); + + std::vector rows; + rows.reserve(entries.size()); + for (const auto& entry : entries) { + if (!(entry.Kind() == FileKind::Add())) { + continue; + } + + const std::shared_ptr& file = entry.File(); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr data_schema, + LoadDataSchema(context_, file->schema_id)); + PAIMON_ASSIGN_OR_RAISE(std::vector data_stats_fields, + ProjectWriteFields(data_schema, *file)); + PAIMON_ASSIGN_OR_RAISE(std::vector key_fields, + 
data_schema->TrimmedPrimaryKeyFields()); + if (key_fields.empty()) { + key_fields = data_schema->Fields(); + } + auto stats_evolution = std::make_shared( + data_stats_fields, value_stats_fields, + data_schema->Id() != context_.table_schema->Id() || file->write_cols.has_value(), pool); + PAIMON_ASSIGN_OR_RAISE( + SimpleStatsEvolution::EvolutionStats stats, + stats_evolution->Evolution(file->value_stats, file->row_count, file->value_stats_cols)); + + GenericRow row(schema->num_fields()); + if (context_.table_schema->PartitionKeys().empty()) { + row.SetField(0, NullType()); + } else { + PAIMON_ASSIGN_OR_RAISE(std::string partition, + RowValuesString(partition_fields, entry.Partition(), "{", "}")); + row.SetField(0, StringValue(partition)); + } + row.SetField(1, entry.Bucket()); + PAIMON_ASSIGN_OR_RAISE(std::string file_path, FilePath(path_factory, entry, *file)); + row.SetField(2, StringValue(file_path)); + PAIMON_ASSIGN_OR_RAISE(std::string file_format, file->FileFormat()); + row.SetField(3, StringValue(file_format)); + row.SetField(4, file->schema_id); + row.SetField(5, file->level); + row.SetField(6, file->row_count); + row.SetField(7, file->file_size); + PAIMON_ASSIGN_OR_RAISE(std::optional min_key, + OptionalRowValuesString(key_fields, file->min_key, "[", "]")); + PAIMON_ASSIGN_OR_RAISE(std::optional max_key, + OptionalRowValuesString(key_fields, file->max_key, "[", "]")); + row.SetField(8, OptionalStringValue(min_key)); + row.SetField(9, OptionalStringValue(max_key)); + PAIMON_ASSIGN_OR_RAISE(std::string null_value_counts, + NullValueCountsString(value_stats_fields, *stats.null_counts)); + row.SetField(10, StringValue(null_value_counts)); + PAIMON_ASSIGN_OR_RAISE(std::string min_value_stats, + FieldsValueMapString(value_stats_fields, *stats.min_values)); + row.SetField(11, StringValue(min_value_stats)); + PAIMON_ASSIGN_OR_RAISE(std::string max_value_stats, + FieldsValueMapString(value_stats_fields, *stats.max_values)); + row.SetField(12, 
StringValue(max_value_stats)); + row.SetField(13, file->min_sequence_number); + row.SetField(14, file->max_sequence_number); + row.SetField(15, TimestampMillisValue(file->creation_time.GetMillisecond())); + row.SetField(16, OptionalInt64Value(file->delete_row_count)); + row.SetField(17, file->file_source ? StringValue(file->file_source.value().ToString()) + : VariantType(NullType())); + row.SetField(18, OptionalInt64Value(file->first_row_id)); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr write_cols, + WriteColsValue(file->write_cols, pool)); + row.SetField(19, write_cols ? VariantType(write_cols) : VariantType(NullType())); + rows.push_back(std::move(row)); + } + return rows; +} + } // namespace paimon diff --git a/src/paimon/core/table/system/metadata_system_tables.h b/src/paimon/core/table/system/metadata_system_tables.h index 91948863f..389ad5a95 100644 --- a/src/paimon/core/table/system/metadata_system_tables.h +++ b/src/paimon/core/table/system/metadata_system_tables.h @@ -16,6 +16,7 @@ #pragma once +#include #include #include #include @@ -46,6 +47,8 @@ struct MetadataSystemTableContext { std::shared_ptr fs; std::string table_path; std::string branch; + std::shared_ptr table_schema; + std::map options; }; /// System table for `T$snapshots`, exposing snapshot commit history. @@ -125,4 +128,38 @@ class ConsumersSystemTable : public InMemorySystemTable { MetadataSystemTableContext context_; }; +/// System table for `T$manifests`, exposing data manifest metadata in the latest snapshot. 
+class ManifestsSystemTable : public InMemorySystemTable { + public: + static constexpr const char* kName = "manifests"; + + ManifestsSystemTable(std::shared_ptr fs, std::string table_path, std::string branch, + std::shared_ptr table_schema, + std::map options); + + std::string Name() const override; + Result> ArrowSchema() const override; + Result> BuildRows() const override; + + private: + MetadataSystemTableContext context_; +}; + +/// System table for `T$files`, exposing data file metadata in the latest snapshot. +class FilesSystemTable : public InMemorySystemTable { + public: + static constexpr const char* kName = "files"; + + FilesSystemTable(std::shared_ptr fs, std::string table_path, std::string branch, + std::shared_ptr table_schema, + std::map options); + + std::string Name() const override; + Result> ArrowSchema() const override; + Result> BuildRows() const override; + + private: + MetadataSystemTableContext context_; +}; + } // namespace paimon diff --git a/src/paimon/core/table/system/system_table.cpp b/src/paimon/core/table/system/system_table.cpp index cc176cd5e..061e6d4f1 100644 --- a/src/paimon/core/table/system/system_table.cpp +++ b/src/paimon/core/table/system/system_table.cpp @@ -126,6 +126,24 @@ const std::vector& SystemTableRegistry() { auto options = MergeOptions(table_schema, dynamic_options); return std::make_shared(fs, table_path, LoadBranch(options)); }}, + {ManifestsSystemTable::kName, + [](const std::shared_ptr& fs, const std::string& table_path, + const std::shared_ptr& table_schema, + const std::map& dynamic_options) + -> Result> { + auto options = MergeOptions(table_schema, dynamic_options); + return std::make_shared(fs, table_path, LoadBranch(options), + table_schema, std::move(options)); + }}, + {FilesSystemTable::kName, + [](const std::shared_ptr& fs, const std::string& table_path, + const std::shared_ptr& table_schema, + const std::map& dynamic_options) + -> Result> { + auto options = MergeOptions(table_schema, dynamic_options); 
+ return std::make_shared(fs, table_path, LoadBranch(options), + table_schema, std::move(options)); + }}, }; return registry; } diff --git a/test/inte/read_inte_test.cpp b/test/inte/read_inte_test.cpp index 5211fdd8c..abaa29f16 100644 --- a/test/inte/read_inte_test.cpp +++ b/test/inte/read_inte_test.cpp @@ -699,6 +699,242 @@ TEST(SystemTableReadInteTest, TestReadMetadataSystemTables) { auto branch_create_time_array = std::dynamic_pointer_cast(branches_array->field(1)); ASSERT_TRUE(branch_create_time_array); + + ASSERT_OK_AND_ASSIGN(auto manifests_result, + ReadSystemTable(table_path + "$manifests", options)); + auto manifests_array = SingleStructChunk(manifests_result); + ASSERT_EQ(StructFieldNames(manifests_array), + (std::vector{"file_name", "file_size", "num_added_files", + "num_deleted_files", "schema_id", "min_partition_stats", + "max_partition_stats", "min_row_id", "max_row_id"})); + ASSERT_GT(manifests_array->length(), 0); + auto manifest_file_name_array = + std::dynamic_pointer_cast(manifests_array->field(0)); + auto manifest_file_size_array = + std::dynamic_pointer_cast(manifests_array->field(1)); + auto manifest_num_added_files_array = + std::dynamic_pointer_cast(manifests_array->field(2)); + auto manifest_schema_id_array = + std::dynamic_pointer_cast(manifests_array->field(4)); + ASSERT_TRUE(manifest_file_name_array); + ASSERT_TRUE(manifest_file_size_array); + ASSERT_TRUE(manifest_num_added_files_array); + ASSERT_TRUE(manifest_schema_id_array); + ASSERT_EQ(manifest_file_name_array->GetString(0).find("manifest-"), 0); + ASSERT_GT(manifest_file_size_array->Value(0), 0); + ASSERT_GE(manifest_num_added_files_array->Value(0), 1); + ASSERT_EQ(manifest_schema_id_array->Value(0), 0); + + ASSERT_OK_AND_ASSIGN(auto files_result, ReadSystemTable(table_path + "$files", options)); + auto files_array = SingleStructChunk(files_result); + ASSERT_EQ(StructFieldNames(files_array), (std::vector{"partition", + "bucket", + "file_path", + "file_format", + "schema_id", + 
"level", + "record_count", + "file_size_in_bytes", + "min_key", + "max_key", + "null_value_counts", + "min_value_stats", + "max_value_stats", + "min_sequence_number", + "max_sequence_number", + "creation_time", + "deleteRowCount", + "file_source", + "first_row_id", + "write_cols"})); + ASSERT_GT(files_array->length(), 0); + auto partition_array = std::dynamic_pointer_cast(files_array->field(0)); + auto bucket_array = std::dynamic_pointer_cast(files_array->field(1)); + auto file_path_array = std::dynamic_pointer_cast(files_array->field(2)); + auto file_format_array = std::dynamic_pointer_cast(files_array->field(3)); + auto file_schema_id_array = std::dynamic_pointer_cast(files_array->field(4)); + auto record_count_array = std::dynamic_pointer_cast(files_array->field(6)); + auto file_size_array = std::dynamic_pointer_cast(files_array->field(7)); + auto min_sequence_number_array = + std::dynamic_pointer_cast(files_array->field(13)); + auto max_sequence_number_array = + std::dynamic_pointer_cast(files_array->field(14)); + auto creation_time_array = + std::dynamic_pointer_cast(files_array->field(15)); + ASSERT_TRUE(partition_array); + ASSERT_TRUE(bucket_array); + ASSERT_TRUE(file_path_array); + ASSERT_TRUE(file_format_array); + ASSERT_TRUE(file_schema_id_array); + ASSERT_TRUE(record_count_array); + ASSERT_TRUE(file_size_array); + ASSERT_TRUE(min_sequence_number_array); + ASSERT_TRUE(max_sequence_number_array); + ASSERT_TRUE(creation_time_array); + ASSERT_TRUE(partition_array->IsNull(0)); + ASSERT_EQ(bucket_array->Value(0), 0); + ASSERT_NE(file_path_array->GetString(0).find("/bucket-0/"), std::string::npos); + ASSERT_EQ(file_format_array->GetString(0), "parquet"); + ASSERT_EQ(file_schema_id_array->Value(0), 0); + ASSERT_EQ(record_count_array->Value(0), 1); + ASSERT_GT(file_size_array->Value(0), 0); + ASSERT_GE(min_sequence_number_array->Value(0), 0); + ASSERT_GE(max_sequence_number_array->Value(0), min_sequence_number_array->Value(0)); + 
ASSERT_FALSE(creation_time_array->IsNull(0)); +} + +/* Writes a single row into a table partitioned by dt with primary keys (dt, pk), using ORC data and manifest files and the bucket option set to 1, then reads the $files system table and checks the one resulting row: partition, file path, key range, null counts and min/max value stats, all compared through GetString. */ TEST(SystemTableReadInteTest, TestReadFilesSystemTableForPartitionedTable) { + arrow::FieldVector fields = { + arrow::field("dt", arrow::utf8()), + arrow::field("pk", arrow::utf8()), + arrow::field("v", arrow::int32()), + }; + auto schema = arrow::schema(fields); + std::map options = {{Options::FILE_SYSTEM, "local"}, + {Options::FILE_FORMAT, "orc"}, + {Options::MANIFEST_FORMAT, "orc"}, + {Options::BUCKET, "1"}}; + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + ASSERT_OK_AND_ASSIGN(auto helper, TestHelper::Create(dir->Str(), schema, + /*partition_keys=*/{"dt"}, + /*primary_keys=*/{"dt", "pk"}, options, + /*is_streaming_mode=*/true)); + + /* one row, targeted at partition dt=20260527 and bucket 0 */ ASSERT_OK_AND_ASSIGN( + std::unique_ptr batch, + TestHelper::MakeRecordBatch(arrow::struct_(fields), R"([["20260527", "a", 1]])", + /*partition_map=*/{{"dt", "20260527"}}, /*bucket=*/0, {})); + ASSERT_OK(helper->WriteAndCommit(std::move(batch), /*commit_identifier=*/0, + /*expected_commit_messages=*/std::nullopt)); + + /* NOTE(review): foo.db/bar is presumably where TestHelper::Create places the table - confirm against the helper */ std::string table_path = PathUtil::JoinPath(dir->Str(), "foo.db/bar"); + ASSERT_OK_AND_ASSIGN(auto files_result, ReadSystemTable(table_path + "$files", options)); + auto files_array = SingleStructChunk(files_result); + /* exactly one row is expected after the single commit above */ ASSERT_EQ(files_array->length(), 1); + /* field indexes follow the $files column order: 0 partition, 2 file_path, 8 min_key, 9 max_key, 10 null_value_counts, 11 min_value_stats, 12 max_value_stats */ auto partition_array = std::dynamic_pointer_cast(files_array->field(0)); + auto file_path_array = std::dynamic_pointer_cast(files_array->field(2)); + auto min_key_array = std::dynamic_pointer_cast(files_array->field(8)); + auto max_key_array = std::dynamic_pointer_cast(files_array->field(9)); + auto null_value_counts_array = + std::dynamic_pointer_cast(files_array->field(10)); + auto min_value_stats_array = + std::dynamic_pointer_cast(files_array->field(11)); + auto max_value_stats_array = + std::dynamic_pointer_cast(files_array->field(12)); + /* a failed cast yields a null pointer, so each cast result is checked before it is dereferenced */ ASSERT_TRUE(partition_array); + ASSERT_TRUE(file_path_array); + ASSERT_TRUE(min_key_array); + ASSERT_TRUE(max_key_array); + 
ASSERT_TRUE(null_value_counts_array); + ASSERT_TRUE(min_value_stats_array); + ASSERT_TRUE(max_value_stats_array); + /* expected textual renderings of the partition, keys and per-column stats for the single written row */ ASSERT_EQ(partition_array->GetString(0), "{20260527}"); + ASSERT_NE(file_path_array->GetString(0).find("/dt=20260527/bucket-0/"), std::string::npos); + ASSERT_EQ(min_key_array->GetString(0), "[a]"); + ASSERT_EQ(max_key_array->GetString(0), "[a]"); + ASSERT_EQ(null_value_counts_array->GetString(0), "{dt=0, pk=0, v=0}"); + ASSERT_EQ(min_value_stats_array->GetString(0), "{dt=20260527, pk=a, v=1}"); + ASSERT_EQ(max_value_stats_array->GetString(0), "{dt=20260527, pk=a, v=1}"); +} + +/* Reads the $files system table of a pre-existing ORC table under the test data dir and checks the column names, that every row reports stats for f4 and none for f0, and that rows with schema_id 0 and schema_id 1 are both present. NOTE(review): the fixture name suggests an ALTER TABLE history in which f4 was added and f0 no longer exists - confirm against the fixture. */ TEST(SystemTableReadInteTest, TestReadFilesSystemTableWithSchemaEvolutionStats) { + std::map options = {{Options::FILE_SYSTEM, "local"}}; + std::string table_path = paimon::test::GetDataDir() + + "/orc/append_table_with_alter_table_with_dense_field.db/" + "append_table_with_alter_table_with_dense_field"; + + ASSERT_OK_AND_ASSIGN(auto files_result, ReadSystemTable(table_path + "$files", options)); + auto files_array = SingleStructChunk(files_result); + /* full expected column list of $files, in order */ ASSERT_EQ(StructFieldNames(files_array), (std::vector{"partition", + "bucket", + "file_path", + "file_format", + "schema_id", + "level", + "record_count", + "file_size_in_bytes", + "min_key", + "max_key", + "null_value_counts", + "min_value_stats", + "max_value_stats", + "min_sequence_number", + "max_sequence_number", + "creation_time", + "deleteRowCount", + "file_source", + "first_row_id", + "write_cols"})); + ASSERT_GT(files_array->length(), 0); + + auto partition_array = std::dynamic_pointer_cast(files_array->field(0)); + auto schema_id_array = std::dynamic_pointer_cast(files_array->field(4)); + auto null_value_counts_array = + std::dynamic_pointer_cast(files_array->field(10)); + auto min_value_stats_array = + std::dynamic_pointer_cast(files_array->field(11)); + auto max_value_stats_array = + std::dynamic_pointer_cast(files_array->field(12)); + ASSERT_TRUE(partition_array); + ASSERT_TRUE(schema_id_array); + 
ASSERT_TRUE(null_value_counts_array); + ASSERT_TRUE(min_value_stats_array); + ASSERT_TRUE(max_value_stats_array); + + /* set when a row with schema_id 0 or schema_id 1 respectively is seen; both must be true after the loop */ bool found_old_schema_file = false; + bool found_latest_schema_file = false; + for (int64_t i = 0; i < files_array->length(); ++i) { + std::string partition = partition_array->GetString(i); + /* only these two partition renderings are accepted */ ASSERT_TRUE(partition == "{0}" || partition == "{1}"); + + std::string null_value_counts = null_value_counts_array->GetString(i); + std::string min_value_stats = min_value_stats_array->GetString(i); + std::string max_value_stats = max_value_stats_array->GetString(i); + /* every row, whatever its schema_id, must mention f4 and must not mention f0 */ ASSERT_NE(null_value_counts.find("f4="), std::string::npos); + ASSERT_NE(min_value_stats.find("f4="), std::string::npos); + ASSERT_NE(max_value_stats.find("f4="), std::string::npos); + ASSERT_EQ(null_value_counts.find("f0="), std::string::npos); + ASSERT_EQ(min_value_stats.find("f0="), std::string::npos); + ASSERT_EQ(max_value_stats.find("f0="), std::string::npos); + + if (schema_id_array->Value(i) == 0) { + found_old_schema_file = true; + /* NOTE(review): the next f4= check repeats the unconditional one above; only the two f4=null checks are specific to schema_id 0 */ ASSERT_NE(null_value_counts.find("f4="), std::string::npos); + ASSERT_NE(min_value_stats.find("f4=null"), std::string::npos); + ASSERT_NE(max_value_stats.find("f4=null"), std::string::npos); + } else if (schema_id_array->Value(i) == 1) { + found_latest_schema_file = true; + } + } + ASSERT_TRUE(found_old_schema_file); + ASSERT_TRUE(found_latest_schema_file); +} + +/* Creates database db1 and table tbl1 (one int32 column f0) through a catalog rooted in a fresh warehouse directory, writes no data, and expects reads of both $manifests and $files to return a result whose array member is null. */ TEST(SystemTableReadInteTest, TestReadManifestAndFilesSystemTablesForEmptyTable) { + std::map options = {{Options::FILE_SYSTEM, "local"}, + {Options::FILE_FORMAT, "orc"}, + {Options::MANIFEST_FORMAT, "orc"}}; + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + std::string warehouse = PathUtil::JoinPath(dir->Str(), "warehouse"); + ASSERT_OK_AND_ASSIGN(auto catalog, Catalog::Create(warehouse, options)); + ASSERT_OK(catalog->CreateDatabase("db1", options, /*ignore_if_exists=*/false)); + + auto typed_schema = arrow::schema({arrow::field("f0", arrow::int32())}); + /* C-level schema struct that the typed schema is exported into for the CreateTable call */ ::ArrowSchema schema; + 
ASSERT_TRUE(arrow::ExportSchema(*typed_schema, &schema).ok()); + ASSERT_OK(catalog->CreateTable(Identifier("db1", "tbl1"), &schema, + /*partition_keys=*/{}, /*primary_keys=*/{}, options, + /*ignore_if_exists=*/false)); + /* the exported schema is released here, after CreateTable has returned */ ArrowSchemaRelease(&schema); + + std::string table_path = catalog->GetTableLocation(Identifier("db1", "tbl1")); + /* nothing has been written to tbl1 at this point */ ASSERT_OK_AND_ASSIGN(auto manifests_result, + ReadSystemTable(table_path + "$manifests", options)); + ASSERT_EQ(manifests_result.array, nullptr); + ASSERT_OK_AND_ASSIGN(auto files_result, ReadSystemTable(table_path + "$files", options)); + ASSERT_EQ(files_result.array, nullptr); } TEST(SystemTableReadInteTest, TestReadTagBranchAndConsumerSystemTables) {