From e94fcc81f66a59e0aaae6f6166338389c87f1018 Mon Sep 17 00:00:00 2001 From: Junwang Zhao Date: Tue, 20 Jan 2026 19:18:36 +0800 Subject: [PATCH 1/3] chore: refactor DataFileSet and make WriteManifests to use iterators - Move DataFileSet out of content_file_util.h to data_file_set.h to reduce header dependencies - Refactor WriteDataManifests and WriteDeleteManifests to accept iterator begin/end instead of vectors --- src/iceberg/test/CMakeLists.txt | 1 + src/iceberg/test/data_file_set_test.cc | 237 +++++++++++++++++++++++++ src/iceberg/update/fast_append.cc | 8 +- src/iceberg/update/fast_append.h | 2 +- src/iceberg/update/snapshot_update.cc | 61 ------- src/iceberg/update/snapshot_update.h | 72 +++++++- src/iceberg/util/content_file_util.h | 68 ------- src/iceberg/util/data_file_set.h | 104 +++++++++++ 8 files changed, 409 insertions(+), 144 deletions(-) create mode 100644 src/iceberg/test/data_file_set_test.cc create mode 100644 src/iceberg/util/data_file_set.h diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index 3414a862e..28e0b8d51 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -109,6 +109,7 @@ add_iceberg_test(util_test SOURCES bucket_util_test.cc config_test.cc + data_file_set_test.cc decimal_test.cc endian_test.cc formatter_test.cc diff --git a/src/iceberg/test/data_file_set_test.cc b/src/iceberg/test/data_file_set_test.cc new file mode 100644 index 000000000..595ecf3d0 --- /dev/null +++ b/src/iceberg/test/data_file_set_test.cc @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/util/data_file_set.h"
+
+#include <gtest/gtest.h>
+
+#include "iceberg/file_format.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/row/partition_values.h"
+
+namespace iceberg {
+
+class DataFileSetTest : public ::testing::Test {
+ protected:
+  std::shared_ptr<DataFile> CreateDataFile(const std::string& path, int64_t size = 100) {
+    auto file = std::make_shared<DataFile>();
+    file->file_path = path;
+    file->file_format = FileFormatType::kParquet;
+    file->file_size_in_bytes = size;
+    file->record_count = 10;
+    file->content = DataFile::Content::kData;
+    return file;
+  }
+};
+
+TEST_F(DataFileSetTest, EmptySet) {
+  DataFileSet set;
+  EXPECT_TRUE(set.empty());
+  EXPECT_EQ(set.size(), 0);
+  EXPECT_EQ(set.begin(), set.end());
+}
+
+TEST_F(DataFileSetTest, InsertSingleFile) {
+  DataFileSet set;
+  auto file = CreateDataFile("/path/to/file.parquet");
+
+  auto [iter, inserted] = set.insert(file);
+  EXPECT_TRUE(inserted);
+  EXPECT_EQ(*iter, file);
+  EXPECT_FALSE(set.empty());
+  EXPECT_EQ(set.size(), 1);
+}
+
+TEST_F(DataFileSetTest, InsertDuplicateFile) {
+  DataFileSet set;
+  auto file1 = CreateDataFile("/path/to/file.parquet");
+  auto file2 = CreateDataFile("/path/to/file.parquet");  // Same path
+
+  auto [iter1, inserted1] = set.insert(file1);
+  EXPECT_TRUE(inserted1);
+
+  auto [iter2, inserted2] = set.insert(file2);
+  EXPECT_FALSE(inserted2);
+  EXPECT_EQ(iter1, iter2);  // Should point to the same element
+  EXPECT_EQ(set.size(), 1);  // Should still be size 1
+}
+
+TEST_F(DataFileSetTest, InsertDifferentFiles) {
+  DataFileSet set;
+  auto file1 = 
CreateDataFile("/path/to/file1.parquet");
+  auto file2 = CreateDataFile("/path/to/file2.parquet");
+  auto file3 = CreateDataFile("/path/to/file3.parquet");
+
+  set.insert(file1);
+  set.insert(file2);
+  set.insert(file3);
+
+  EXPECT_EQ(set.size(), 3);
+  EXPECT_FALSE(set.empty());
+}
+
+TEST_F(DataFileSetTest, InsertionOrderPreserved) {
+  DataFileSet set;
+  auto file1 = CreateDataFile("/path/to/file1.parquet");
+  auto file2 = CreateDataFile("/path/to/file2.parquet");
+  auto file3 = CreateDataFile("/path/to/file3.parquet");
+
+  set.insert(file1);
+  set.insert(file2);
+  set.insert(file3);
+
+  // Iterate and verify order
+  std::vector<std::string> paths;
+  for (const auto& file : set) {
+    paths.push_back(file->file_path);
+  }
+
+  EXPECT_EQ(paths.size(), 3);
+  EXPECT_EQ(paths[0], "/path/to/file1.parquet");
+  EXPECT_EQ(paths[1], "/path/to/file2.parquet");
+  EXPECT_EQ(paths[2], "/path/to/file3.parquet");
+}
+
+TEST_F(DataFileSetTest, InsertDuplicatePreservesOrder) {
+  DataFileSet set;
+  auto file1 = CreateDataFile("/path/to/file1.parquet");
+  auto file2 = CreateDataFile("/path/to/file2.parquet");
+  auto file3 = CreateDataFile("/path/to/file1.parquet");  // Duplicate of file1
+
+  set.insert(file1);
+  set.insert(file2);
+  set.insert(file3);  // Should not insert, but order should be preserved
+
+  EXPECT_EQ(set.size(), 2);
+
+  std::vector<std::string> paths;
+  for (const auto& file : set) {
+    paths.push_back(file->file_path);
+  }
+
+  EXPECT_EQ(paths[0], "/path/to/file1.parquet");
+  EXPECT_EQ(paths[1], "/path/to/file2.parquet");
+}
+
+TEST_F(DataFileSetTest, InsertNullFile) {
+  DataFileSet set;
+  std::shared_ptr<DataFile> null_file = nullptr;
+
+  auto [iter, inserted] = set.insert(null_file);
+  EXPECT_FALSE(inserted);
+  EXPECT_EQ(iter, set.end());
+  EXPECT_TRUE(set.empty());
+  EXPECT_EQ(set.size(), 0);
+}
+
+TEST_F(DataFileSetTest, InsertMoveSemantics) {
+  DataFileSet set;
+  auto file1 = CreateDataFile("/path/to/file1.parquet");
+  auto file2 = CreateDataFile("/path/to/file2.parquet");
+
+  // Insert using move
+  
auto [iter1, inserted1] = set.insert(std::move(file1)); + EXPECT_TRUE(inserted1); + EXPECT_EQ(file1, nullptr); // Should be moved + + // Insert using copy + auto [iter2, inserted2] = set.insert(file2); + EXPECT_TRUE(inserted2); + EXPECT_NE(file2, nullptr); // Should still be valid + + EXPECT_EQ(set.size(), 2); +} + +TEST_F(DataFileSetTest, Clear) { + DataFileSet set; + set.insert(CreateDataFile("/path/to/file1.parquet")); + set.insert(CreateDataFile("/path/to/file2.parquet")); + + EXPECT_EQ(set.size(), 2); + set.clear(); + EXPECT_TRUE(set.empty()); + EXPECT_EQ(set.size(), 0); + EXPECT_EQ(set.begin(), set.end()); +} + +TEST_F(DataFileSetTest, IteratorOperations) { + DataFileSet set; + auto file1 = CreateDataFile("/path/to/file1.parquet"); + auto file2 = CreateDataFile("/path/to/file2.parquet"); + auto file3 = CreateDataFile("/path/to/file3.parquet"); + + set.insert(file1); + set.insert(file2); + set.insert(file3); + + // Test const iterators + const auto& const_set = set; + EXPECT_NE(const_set.begin(), const_set.end()); + EXPECT_NE(const_set.cbegin(), const_set.cend()); + + // Test iterator increment + auto it = set.begin(); + EXPECT_EQ((*it)->file_path, "/path/to/file1.parquet"); + ++it; + EXPECT_EQ((*it)->file_path, "/path/to/file2.parquet"); + ++it; + EXPECT_EQ((*it)->file_path, "/path/to/file3.parquet"); + ++it; + EXPECT_EQ(it, set.end()); +} + +TEST_F(DataFileSetTest, RangeBasedForLoop) { + DataFileSet set; + set.insert(CreateDataFile("/path/to/file1.parquet")); + set.insert(CreateDataFile("/path/to/file2.parquet")); + set.insert(CreateDataFile("/path/to/file3.parquet")); + + int count = 0; + for (const auto& file : set) { + EXPECT_NE(file, nullptr); + ++count; + } + EXPECT_EQ(count, 3); +} + +TEST_F(DataFileSetTest, CaseSensitivePaths) { + DataFileSet set; + auto file1 = CreateDataFile("/path/to/file.parquet"); + auto file2 = CreateDataFile("/path/to/FILE.parquet"); // Different case + + set.insert(file1); + set.insert(file2); + + // Should be treated as 
different files + EXPECT_EQ(set.size(), 2); +} + +TEST_F(DataFileSetTest, MultipleInsertsSameFile) { + DataFileSet set; + auto file = CreateDataFile("/path/to/file.parquet"); + + // Insert the same file multiple times + set.insert(file); + set.insert(file); + set.insert(file); + + EXPECT_EQ(set.size(), 1); +} + +} // namespace iceberg diff --git a/src/iceberg/update/fast_append.cc b/src/iceberg/update/fast_append.cc index c7f66f2fb..c9b248301 100644 --- a/src/iceberg/update/fast_append.cc +++ b/src/iceberg/update/fast_append.cc @@ -20,7 +20,6 @@ #include "iceberg/update/fast_append.h" #include -#include #include #include "iceberg/constants.h" @@ -198,10 +197,9 @@ Result> FastAppend::WriteNewManifests() { if (new_manifests_.empty() && !new_data_files_by_spec_.empty()) { for (const auto& [spec_id, data_files] : new_data_files_by_spec_) { ICEBERG_ASSIGN_OR_RAISE(auto spec, Spec(spec_id)); - std::vector> files; - files.reserve(data_files.size()); - std::ranges::copy(data_files, std::back_inserter(files)); - ICEBERG_ASSIGN_OR_RAISE(auto written_manifests, WriteDataManifests(files, spec)); + ICEBERG_ASSIGN_OR_RAISE( + auto written_manifests, + WriteDataManifests(data_files.begin(), data_files.end(), spec)); new_manifests_.insert(new_manifests_.end(), std::make_move_iterator(written_manifests.begin()), std::make_move_iterator(written_manifests.end())); diff --git a/src/iceberg/update/fast_append.h b/src/iceberg/update/fast_append.h index 87887c74d..7f5cbb097 100644 --- a/src/iceberg/update/fast_append.h +++ b/src/iceberg/update/fast_append.h @@ -30,7 +30,7 @@ #include "iceberg/result.h" #include "iceberg/type_fwd.h" #include "iceberg/update/snapshot_update.h" -#include "iceberg/util/content_file_util.h" +#include "iceberg/util/data_file_set.h" namespace iceberg { diff --git a/src/iceberg/update/snapshot_update.cc b/src/iceberg/update/snapshot_update.cc index 2bbb2d506..25d3ba0d8 100644 --- a/src/iceberg/update/snapshot_update.cc +++ b/src/iceberg/update/snapshot_update.cc 
@@ -27,12 +27,7 @@ #include "iceberg/manifest/manifest_entry.h" #include "iceberg/manifest/manifest_list.h" #include "iceberg/manifest/manifest_reader.h" -#include "iceberg/manifest/manifest_writer.h" -#include "iceberg/manifest/rolling_manifest_writer.h" #include "iceberg/partition_summary_internal.h" -#include "iceberg/snapshot.h" -#include "iceberg/table.h" -#include "iceberg/transaction.h" #include "iceberg/util/macros.h" #include "iceberg/util/snapshot_util_internal.h" #include "iceberg/util/string_util.h" @@ -164,62 +159,6 @@ SnapshotUpdate::SnapshotUpdate(std::shared_ptr transaction) target_manifest_size_bytes_( base().properties.Get(TableProperties::kManifestTargetSizeBytes)) {} -// TODO(xxx): write manifests in parallel -Result> SnapshotUpdate::WriteDataManifests( - const std::vector>& data_files, - const std::shared_ptr& spec, - std::optional data_sequence_number) { - if (data_files.empty()) { - return std::vector{}; - } - - ICEBERG_ASSIGN_OR_RAISE(auto current_schema, base().Schema()); - RollingManifestWriter rolling_writer( - [this, spec, schema = std::move(current_schema), - snapshot_id = SnapshotId()]() -> Result> { - return ManifestWriter::MakeWriter(base().format_version, snapshot_id, - ManifestPath(), transaction_->table()->io(), - std::move(spec), std::move(schema), - ManifestContent::kData, - /*first_row_id=*/base().next_row_id); - }, - target_manifest_size_bytes_); - - for (const auto& file : data_files) { - ICEBERG_RETURN_UNEXPECTED(rolling_writer.WriteAddedEntry(file, data_sequence_number)); - } - ICEBERG_RETURN_UNEXPECTED(rolling_writer.Close()); - return rolling_writer.ToManifestFiles(); -} - -// TODO(xxx): write manifests in parallel -Result> SnapshotUpdate::WriteDeleteManifests( - const std::vector>& delete_files, - const std::shared_ptr& spec) { - if (delete_files.empty()) { - return std::vector{}; - } - - ICEBERG_ASSIGN_OR_RAISE(auto current_schema, base().Schema()); - RollingManifestWriter rolling_writer( - [this, spec, schema = 
std::move(current_schema), - snapshot_id = SnapshotId()]() -> Result> { - return ManifestWriter::MakeWriter(base().format_version, snapshot_id, - ManifestPath(), transaction_->table()->io(), - std::move(spec), std::move(schema), - ManifestContent::kDeletes); - }, - target_manifest_size_bytes_); - - for (const auto& file : delete_files) { - /// FIXME: Java impl wrap it with `PendingDeleteFile` and deals with - /// file->data_sequenece_number - ICEBERG_RETURN_UNEXPECTED(rolling_writer.WriteAddedEntry(file)); - } - ICEBERG_RETURN_UNEXPECTED(rolling_writer.Close()); - return rolling_writer.ToManifestFiles(); -} - int64_t SnapshotUpdate::SnapshotId() { if (!snapshot_id_.has_value()) { snapshot_id_ = SnapshotUtil::GenerateSnapshotId(base()); diff --git a/src/iceberg/update/snapshot_update.h b/src/iceberg/update/snapshot_update.h index f31327fcd..decd4cfa3 100644 --- a/src/iceberg/update/snapshot_update.h +++ b/src/iceberg/update/snapshot_update.h @@ -29,8 +29,12 @@ #include "iceberg/iceberg_export.h" #include "iceberg/manifest/manifest_list.h" +#include "iceberg/manifest/manifest_writer.h" +#include "iceberg/manifest/rolling_manifest_writer.h" #include "iceberg/result.h" #include "iceberg/snapshot.h" +#include "iceberg/table.h" +#include "iceberg/transaction.h" #include "iceberg/type_fwd.h" #include "iceberg/update/pending_update.h" @@ -103,25 +107,75 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { /// \brief Write data manifests for the given data files /// - /// \param data_files The data files to write + /// \tparam Iterator Iterator type that dereferences to std::shared_ptr + /// \param begin Iterator to the beginning of the data files range + /// \param end Iterator to the end of the data files range /// \param spec The partition spec to use /// \param data_sequence_number Optional data sequence number for the files /// \return A vector of manifest files - /// TODO(xxx): Change signature to accept iterator begin/end instead of vector to avoid - /// 
intermediate vector allocations (e.g., from DataFileSet) + // TODO(xxx): write manifests in parallel + template Result> WriteDataManifests( - const std::vector>& data_files, - const std::shared_ptr& spec, - std::optional data_sequence_number = std::nullopt); + Iterator begin, Iterator end, const std::shared_ptr& spec, + std::optional data_sequence_number = std::nullopt) { + if (begin == end) { + return std::vector{}; + } + + ICEBERG_ASSIGN_OR_RAISE(auto current_schema, base().Schema()); + RollingManifestWriter rolling_writer( + [this, spec, schema = std::move(current_schema), + snapshot_id = SnapshotId()]() -> Result> { + return ManifestWriter::MakeWriter(base().format_version, snapshot_id, + ManifestPath(), transaction_->table()->io(), + std::move(spec), std::move(schema), + ManifestContent::kData, + /*first_row_id=*/base().next_row_id); + }, + target_manifest_size_bytes_); + + for (auto it = begin; it != end; ++it) { + ICEBERG_RETURN_UNEXPECTED( + rolling_writer.WriteAddedEntry(*it, data_sequence_number)); + } + ICEBERG_RETURN_UNEXPECTED(rolling_writer.Close()); + return rolling_writer.ToManifestFiles(); + } /// \brief Write delete manifests for the given delete files /// - /// \param delete_files The delete files to write + /// \tparam Iterator Iterator type that dereferences to std::shared_ptr + /// \param begin Iterator to the beginning of the delete files range + /// \param end Iterator to the end of the delete files range /// \param spec The partition spec to use /// \return A vector of manifest files + // TODO(xxx): write manifests in parallel + template Result> WriteDeleteManifests( - const std::vector>& delete_files, - const std::shared_ptr& spec); + Iterator begin, Iterator end, const std::shared_ptr& spec) { + if (begin == end) { + return std::vector{}; + } + + ICEBERG_ASSIGN_OR_RAISE(auto current_schema, base().Schema()); + RollingManifestWriter rolling_writer( + [this, spec, schema = std::move(current_schema), + snapshot_id = SnapshotId()]() -> 
Result> { + return ManifestWriter::MakeWriter(base().format_version, snapshot_id, + ManifestPath(), transaction_->table()->io(), + std::move(spec), std::move(schema), + ManifestContent::kDeletes); + }, + target_manifest_size_bytes_); + + for (auto it = begin; it != end; ++it) { + /// FIXME: Java impl wrap it with `PendingDeleteFile` and deals with + /// (*it)->data_sequenece_number + ICEBERG_RETURN_UNEXPECTED(rolling_writer.WriteAddedEntry(*it)); + } + ICEBERG_RETURN_UNEXPECTED(rolling_writer.Close()); + return rolling_writer.ToManifestFiles(); + } Status SetTargetBranch(const std::string& branch); const std::string& target_branch() const { return target_branch_; } diff --git a/src/iceberg/util/content_file_util.h b/src/iceberg/util/content_file_util.h index 95a8d6343..f547716d2 100644 --- a/src/iceberg/util/content_file_util.h +++ b/src/iceberg/util/content_file_util.h @@ -22,12 +22,10 @@ /// \file iceberg/util/content_file_util.h /// Utility functions for content files (data files and delete files). -#include #include #include #include #include -#include #include "iceberg/iceberg_export.h" #include "iceberg/manifest/manifest_entry.h" @@ -36,72 +34,6 @@ namespace iceberg { -/// \brief A set of DataFile pointers with insertion order preserved and deduplicated by -/// file path. -class ICEBERG_EXPORT DataFileSet { - public: - using value_type = std::shared_ptr; - using iterator = typename std::vector::iterator; - using const_iterator = typename std::vector::const_iterator; - using difference_type = typename std::vector::difference_type; - - DataFileSet() = default; - - /// \brief Insert a data file into the set. - /// \param file The data file to insert - /// \return A pair with an iterator to the inserted element (or the existing one) and - /// a bool indicating whether insertion took place - std::pair insert(const value_type& file) { return InsertImpl(file); } - - /// \brief Insert a data file into the set (move version). 
- std::pair insert(value_type&& file) { - return InsertImpl(std::move(file)); - } - - /// \brief Get the number of elements in the set. - size_t size() const { return elements_.size(); } - - /// \brief Check if the set is empty. - bool empty() const { return elements_.empty(); } - - /// \brief Clear all elements from the set. - void clear() { - elements_.clear(); - index_by_path_.clear(); - } - - /// \brief Get iterator to the beginning. - iterator begin() { return elements_.begin(); } - const_iterator begin() const { return elements_.begin(); } - const_iterator cbegin() const { return elements_.cbegin(); } - - /// \brief Get iterator to the end. - iterator end() { return elements_.end(); } - const_iterator end() const { return elements_.end(); } - const_iterator cend() const { return elements_.cend(); } - - private: - std::pair InsertImpl(value_type file) { - if (!file) { - return {elements_.end(), false}; - } - - auto [index_iter, inserted] = - index_by_path_.try_emplace(file->file_path, elements_.size()); - if (!inserted) { - auto pos = static_cast(index_iter->second); - return {elements_.begin() + pos, false}; - } - - elements_.push_back(std::move(file)); - return {std::prev(elements_.end()), true}; - } - - // Vector to preserve insertion order - std::vector elements_; - std::unordered_map index_by_path_; -}; - /// \brief Utility functions for content files. struct ICEBERG_EXPORT ContentFileUtil { /// \brief Check if a delete file is a deletion vector (DV). diff --git a/src/iceberg/util/data_file_set.h b/src/iceberg/util/data_file_set.h new file mode 100644 index 000000000..729af0051 --- /dev/null +++ b/src/iceberg/util/data_file_set.h @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/util/data_file_set.h +/// A set of DataFile pointers with insertion order preserved and deduplicated by file +/// path. + +#include +#include +#include +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/util/string_util.h" + +namespace iceberg { + +/// \brief A set of DataFile pointers with insertion order preserved and deduplicated by +/// file path. +class ICEBERG_EXPORT DataFileSet { + public: + using value_type = std::shared_ptr; + using iterator = typename std::vector::iterator; + using const_iterator = typename std::vector::const_iterator; + using difference_type = typename std::vector::difference_type; + + DataFileSet() = default; + + /// \brief Insert a data file into the set. + /// \param file The data file to insert + /// \return A pair with an iterator to the inserted element (or the existing one) and + /// a bool indicating whether insertion took place + std::pair insert(const value_type& file) { return InsertImpl(file); } + + /// \brief Insert a data file into the set (move version). + std::pair insert(value_type&& file) { + return InsertImpl(std::move(file)); + } + + /// \brief Get the number of elements in the set. + size_t size() const { return elements_.size(); } + + /// \brief Check if the set is empty. 
+ bool empty() const { return elements_.empty(); } + + /// \brief Clear all elements from the set. + void clear() { + elements_.clear(); + index_by_path_.clear(); + } + + /// \brief Get iterator to the beginning. + iterator begin() { return elements_.begin(); } + const_iterator begin() const { return elements_.begin(); } + const_iterator cbegin() const { return elements_.cbegin(); } + + /// \brief Get iterator to the end. + iterator end() { return elements_.end(); } + const_iterator end() const { return elements_.end(); } + const_iterator cend() const { return elements_.cend(); } + + private: + std::pair InsertImpl(value_type file) { + if (!file) { + return {elements_.end(), false}; + } + + auto [index_iter, inserted] = + index_by_path_.try_emplace(file->file_path, elements_.size()); + if (!inserted) { + auto pos = static_cast(index_iter->second); + return {elements_.begin() + pos, false}; + } + + elements_.push_back(std::move(file)); + return {std::prev(elements_.end()), true}; + } + + // Vector to preserve insertion order + std::vector elements_; + std::unordered_map index_by_path_; +}; + +} // namespace iceberg From 5839cf8930e6fd77756db47863d266b62d49b8ce Mon Sep 17 00:00:00 2001 From: Junwang Zhao Date: Wed, 21 Jan 2026 10:29:32 +0800 Subject: [PATCH 2/3] fix: add a as as_span method for DataFileSet --- src/iceberg/test/data_file_set_test.cc | 19 +++++++ src/iceberg/test/meson.build | 1 + src/iceberg/update/fast_append.cc | 5 +- src/iceberg/update/snapshot_update.cc | 60 +++++++++++++++++++++ src/iceberg/update/snapshot_update.h | 72 +++----------------------- src/iceberg/util/data_file_set.h | 6 +++ src/iceberg/util/meson.build | 1 + 7 files changed, 97 insertions(+), 67 deletions(-) diff --git a/src/iceberg/test/data_file_set_test.cc b/src/iceberg/test/data_file_set_test.cc index 595ecf3d0..30cadbc85 100644 --- a/src/iceberg/test/data_file_set_test.cc +++ b/src/iceberg/test/data_file_set_test.cc @@ -45,6 +45,7 @@ TEST_F(DataFileSetTest, EmptySet) { 
EXPECT_TRUE(set.empty()); EXPECT_EQ(set.size(), 0); EXPECT_EQ(set.begin(), set.end()); + EXPECT_TRUE(set.as_span().empty()); } TEST_F(DataFileSetTest, InsertSingleFile) { @@ -108,6 +109,24 @@ TEST_F(DataFileSetTest, InsertionOrderPreserved) { EXPECT_EQ(paths[2], "/path/to/file3.parquet"); } +TEST_F(DataFileSetTest, AsSpan) { + DataFileSet set; + EXPECT_TRUE(set.as_span().empty()); + + auto file1 = CreateDataFile("/path/to/file1.parquet"); + auto file2 = CreateDataFile("/path/to/file2.parquet"); + set.insert(file1); + set.insert(file2); + + auto span = set.as_span(); + EXPECT_EQ(span.size(), 2); + EXPECT_EQ(span[0]->file_path, "/path/to/file1.parquet"); + EXPECT_EQ(span[1]->file_path, "/path/to/file2.parquet"); + + set.clear(); + EXPECT_TRUE(set.as_span().empty()); +} + TEST_F(DataFileSetTest, InsertDuplicatePreservesOrder) { DataFileSet set; auto file1 = CreateDataFile("/path/to/file1.parquet"); diff --git a/src/iceberg/test/meson.build b/src/iceberg/test/meson.build index 791340be7..5e3007c4a 100644 --- a/src/iceberg/test/meson.build +++ b/src/iceberg/test/meson.build @@ -84,6 +84,7 @@ iceberg_tests = { 'sources': files( 'bucket_util_test.cc', 'config_test.cc', + 'data_file_set_test.cc', 'decimal_test.cc', 'endian_test.cc', 'formatter_test.cc', diff --git a/src/iceberg/update/fast_append.cc b/src/iceberg/update/fast_append.cc index c9b248301..3c132a407 100644 --- a/src/iceberg/update/fast_append.cc +++ b/src/iceberg/update/fast_append.cc @@ -197,9 +197,8 @@ Result> FastAppend::WriteNewManifests() { if (new_manifests_.empty() && !new_data_files_by_spec_.empty()) { for (const auto& [spec_id, data_files] : new_data_files_by_spec_) { ICEBERG_ASSIGN_OR_RAISE(auto spec, Spec(spec_id)); - ICEBERG_ASSIGN_OR_RAISE( - auto written_manifests, - WriteDataManifests(data_files.begin(), data_files.end(), spec)); + ICEBERG_ASSIGN_OR_RAISE(auto written_manifests, + WriteDataManifests(data_files.as_span(), spec)); new_manifests_.insert(new_manifests_.end(), 
std::make_move_iterator(written_manifests.begin()), std::make_move_iterator(written_manifests.end())); diff --git a/src/iceberg/update/snapshot_update.cc b/src/iceberg/update/snapshot_update.cc index 25d3ba0d8..38c5129f4 100644 --- a/src/iceberg/update/snapshot_update.cc +++ b/src/iceberg/update/snapshot_update.cc @@ -27,7 +27,11 @@ #include "iceberg/manifest/manifest_entry.h" #include "iceberg/manifest/manifest_list.h" #include "iceberg/manifest/manifest_reader.h" +#include "iceberg/manifest/manifest_writer.h" +#include "iceberg/manifest/rolling_manifest_writer.h" #include "iceberg/partition_summary_internal.h" +#include "iceberg/table.h" +#include "iceberg/transaction.h" #include "iceberg/util/macros.h" #include "iceberg/util/snapshot_util_internal.h" #include "iceberg/util/string_util.h" @@ -159,6 +163,62 @@ SnapshotUpdate::SnapshotUpdate(std::shared_ptr transaction) target_manifest_size_bytes_( base().properties.Get(TableProperties::kManifestTargetSizeBytes)) {} +// TODO(xxx): write manifests in parallel +Result> SnapshotUpdate::WriteDataManifests( + std::span> files, + const std::shared_ptr& spec, + std::optional data_sequence_number) { + if (files.empty()) { + return std::vector{}; + } + + ICEBERG_ASSIGN_OR_RAISE(auto current_schema, base().Schema()); + RollingManifestWriter rolling_writer( + [this, spec, schema = std::move(current_schema), + snapshot_id = SnapshotId()]() -> Result> { + return ManifestWriter::MakeWriter(base().format_version, snapshot_id, + ManifestPath(), transaction_->table()->io(), + std::move(spec), std::move(schema), + ManifestContent::kData, + /*first_row_id=*/base().next_row_id); + }, + target_manifest_size_bytes_); + + for (const auto& file : files) { + ICEBERG_RETURN_UNEXPECTED(rolling_writer.WriteAddedEntry(file, data_sequence_number)); + } + ICEBERG_RETURN_UNEXPECTED(rolling_writer.Close()); + return rolling_writer.ToManifestFiles(); +} + +// TODO(xxx): write manifests in parallel +Result> SnapshotUpdate::WriteDeleteManifests( + 
std::span> files, + const std::shared_ptr& spec) { + if (files.empty()) { + return std::vector{}; + } + + ICEBERG_ASSIGN_OR_RAISE(auto current_schema, base().Schema()); + RollingManifestWriter rolling_writer( + [this, spec, schema = std::move(current_schema), + snapshot_id = SnapshotId()]() -> Result> { + return ManifestWriter::MakeWriter(base().format_version, snapshot_id, + ManifestPath(), transaction_->table()->io(), + std::move(spec), std::move(schema), + ManifestContent::kDeletes); + }, + target_manifest_size_bytes_); + + for (const auto& file : files) { + // FIXME: Java impl wrap it with `PendingDeleteFile` and deals with + // file->data_sequence_number + ICEBERG_RETURN_UNEXPECTED(rolling_writer.WriteAddedEntry(file)); + } + ICEBERG_RETURN_UNEXPECTED(rolling_writer.Close()); + return rolling_writer.ToManifestFiles(); +} + int64_t SnapshotUpdate::SnapshotId() { if (!snapshot_id_.has_value()) { snapshot_id_ = SnapshotUtil::GenerateSnapshotId(base()); diff --git a/src/iceberg/update/snapshot_update.h b/src/iceberg/update/snapshot_update.h index decd4cfa3..12c3b19dc 100644 --- a/src/iceberg/update/snapshot_update.h +++ b/src/iceberg/update/snapshot_update.h @@ -22,19 +22,15 @@ #include #include #include +#include #include #include #include #include #include "iceberg/iceberg_export.h" -#include "iceberg/manifest/manifest_list.h" -#include "iceberg/manifest/manifest_writer.h" -#include "iceberg/manifest/rolling_manifest_writer.h" #include "iceberg/result.h" #include "iceberg/snapshot.h" -#include "iceberg/table.h" -#include "iceberg/transaction.h" #include "iceberg/type_fwd.h" #include "iceberg/update/pending_update.h" @@ -107,75 +103,23 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { /// \brief Write data manifests for the given data files /// - /// \tparam Iterator Iterator type that dereferences to std::shared_ptr - /// \param begin Iterator to the beginning of the data files range - /// \param end Iterator to the end of the data files range + 
/// \param files Data files to write /// \param spec The partition spec to use /// \param data_sequence_number Optional data sequence number for the files /// \return A vector of manifest files - // TODO(xxx): write manifests in parallel - template Result> WriteDataManifests( - Iterator begin, Iterator end, const std::shared_ptr& spec, - std::optional data_sequence_number = std::nullopt) { - if (begin == end) { - return std::vector{}; - } - - ICEBERG_ASSIGN_OR_RAISE(auto current_schema, base().Schema()); - RollingManifestWriter rolling_writer( - [this, spec, schema = std::move(current_schema), - snapshot_id = SnapshotId()]() -> Result> { - return ManifestWriter::MakeWriter(base().format_version, snapshot_id, - ManifestPath(), transaction_->table()->io(), - std::move(spec), std::move(schema), - ManifestContent::kData, - /*first_row_id=*/base().next_row_id); - }, - target_manifest_size_bytes_); - - for (auto it = begin; it != end; ++it) { - ICEBERG_RETURN_UNEXPECTED( - rolling_writer.WriteAddedEntry(*it, data_sequence_number)); - } - ICEBERG_RETURN_UNEXPECTED(rolling_writer.Close()); - return rolling_writer.ToManifestFiles(); - } + std::span> files, + const std::shared_ptr& spec, + std::optional data_sequence_number = std::nullopt); /// \brief Write delete manifests for the given delete files /// - /// \tparam Iterator Iterator type that dereferences to std::shared_ptr - /// \param begin Iterator to the beginning of the delete files range - /// \param end Iterator to the end of the delete files range + /// \param files Delete files to write /// \param spec The partition spec to use /// \return A vector of manifest files - // TODO(xxx): write manifests in parallel - template Result> WriteDeleteManifests( - Iterator begin, Iterator end, const std::shared_ptr& spec) { - if (begin == end) { - return std::vector{}; - } - - ICEBERG_ASSIGN_OR_RAISE(auto current_schema, base().Schema()); - RollingManifestWriter rolling_writer( - [this, spec, schema = 
std::move(current_schema), - snapshot_id = SnapshotId()]() -> Result> { - return ManifestWriter::MakeWriter(base().format_version, snapshot_id, - ManifestPath(), transaction_->table()->io(), - std::move(spec), std::move(schema), - ManifestContent::kDeletes); - }, - target_manifest_size_bytes_); - - for (auto it = begin; it != end; ++it) { - /// FIXME: Java impl wrap it with `PendingDeleteFile` and deals with - /// (*it)->data_sequenece_number - ICEBERG_RETURN_UNEXPECTED(rolling_writer.WriteAddedEntry(*it)); - } - ICEBERG_RETURN_UNEXPECTED(rolling_writer.Close()); - return rolling_writer.ToManifestFiles(); - } + std::span> files, + const std::shared_ptr& spec); Status SetTargetBranch(const std::string& branch); const std::string& target_branch() const { return target_branch_; } diff --git a/src/iceberg/util/data_file_set.h b/src/iceberg/util/data_file_set.h index 729af0051..608e4a725 100644 --- a/src/iceberg/util/data_file_set.h +++ b/src/iceberg/util/data_file_set.h @@ -25,6 +25,7 @@ #include #include +#include #include #include #include @@ -79,6 +80,11 @@ class ICEBERG_EXPORT DataFileSet { const_iterator end() const { return elements_.end(); } const_iterator cend() const { return elements_.cend(); } + /// \brief Get a non-owning view of the data files in insertion order. 
+ std::span as_span() const { + return std::span(elements_.data(), elements_.size()); + } + private: std::pair InsertImpl(value_type file) { if (!file) { diff --git a/src/iceberg/util/meson.build b/src/iceberg/util/meson.build index 95952bb8b..496a75758 100644 --- a/src/iceberg/util/meson.build +++ b/src/iceberg/util/meson.build @@ -22,6 +22,7 @@ install_headers( 'config.h', 'content_file_util.h', 'conversions.h', + 'data_file_set.h', 'decimal.h', 'endian.h', 'error_collector.h', From 67926a871a4199e232bf6f1ac5b3ea01b2b4e7de Mon Sep 17 00:00:00 2001 From: Junwang Zhao Date: Wed, 21 Jan 2026 12:33:20 +0800 Subject: [PATCH 3/3] fix: clang-tidy and add testcase for as span --- src/iceberg/test/data_file_set_test.cc | 34 +++++++++++++++++++++++--- src/iceberg/util/data_file_set.h | 4 +-- 2 files changed, 32 insertions(+), 6 deletions(-) diff --git a/src/iceberg/test/data_file_set_test.cc b/src/iceberg/test/data_file_set_test.cc index 30cadbc85..60539adfa 100644 --- a/src/iceberg/test/data_file_set_test.cc +++ b/src/iceberg/test/data_file_set_test.cc @@ -113,15 +113,43 @@ TEST_F(DataFileSetTest, AsSpan) { DataFileSet set; EXPECT_TRUE(set.as_span().empty()); + // Single element + auto file0 = CreateDataFile("/path/to/file0.parquet"); + set.insert(file0); + { + auto span = set.as_span(); + EXPECT_EQ(span.size(), 1); + EXPECT_EQ(span[0]->file_path, "/path/to/file0.parquet"); + EXPECT_EQ(span[0], file0); // Same pointer, span is a view + } + + // Multiple elements auto file1 = CreateDataFile("/path/to/file1.parquet"); auto file2 = CreateDataFile("/path/to/file2.parquet"); set.insert(file1); set.insert(file2); auto span = set.as_span(); - EXPECT_EQ(span.size(), 2); - EXPECT_EQ(span[0]->file_path, "/path/to/file1.parquet"); - EXPECT_EQ(span[1]->file_path, "/path/to/file2.parquet"); + EXPECT_EQ(span.size(), 3); + EXPECT_EQ(span[0]->file_path, "/path/to/file0.parquet"); + EXPECT_EQ(span[1]->file_path, "/path/to/file1.parquet"); + EXPECT_EQ(span[2]->file_path, 
"/path/to/file2.parquet");
+
+  // Span matches set iteration order and identity
+  size_t i = 0;
+  for (const auto& file : set) {
+    EXPECT_EQ(span[i], file) << "Span element " << i << " should match set iterator";
+    ++i;
+  }
+  EXPECT_EQ(i, span.size());
+
+  // Span works with range-for
+  i = 0;
+  for (const auto& file : span) {
+    EXPECT_EQ(file->file_path, span[i]->file_path);
+    ++i;
+  }
+  EXPECT_EQ(i, 3);
 
   set.clear();
   EXPECT_TRUE(set.as_span().empty());
diff --git a/src/iceberg/util/data_file_set.h b/src/iceberg/util/data_file_set.h
index 608e4a725..741b34e56 100644
--- a/src/iceberg/util/data_file_set.h
+++ b/src/iceberg/util/data_file_set.h
@@ -81,9 +81,7 @@ class ICEBERG_EXPORT DataFileSet {
   const_iterator cend() const { return elements_.cend(); }
 
   /// \brief Get a non-owning view of the data files in insertion order.
-  std::span as_span() const {
-    return std::span(elements_.data(), elements_.size());
-  }
+  std::span<const value_type> as_span() const { return elements_; }
 
  private:
   std::pair InsertImpl(value_type file) {