From 1b633cc4a76e535712ff5b7f8de1bf3827bff319 Mon Sep 17 00:00:00 2001 From: Junwang Zhao Date: Thu, 8 Jan 2026 12:26:46 +0800 Subject: [PATCH 1/3] feat: add UpdateSnapshotReference test cases will be added in the SnapshotManager PR. --- src/iceberg/CMakeLists.txt | 1 + src/iceberg/meson.build | 1 + src/iceberg/parquet/parquet_data_util.cc | 2 +- src/iceberg/transaction.cc | 39 ++- src/iceberg/transaction.h | 4 + src/iceberg/type_fwd.h | 1 + src/iceberg/update/meson.build | 1 + src/iceberg/update/pending_update.h | 1 + .../update/update_snapshot_reference.cc | 230 ++++++++++++++++++ .../update/update_snapshot_reference.h | 152 ++++++++++++ src/iceberg/util/snapshot_util.cc | 11 + src/iceberg/util/snapshot_util_internal.h | 11 + 12 files changed, 448 insertions(+), 6 deletions(-) create mode 100644 src/iceberg/update/update_snapshot_reference.cc create mode 100644 src/iceberg/update/update_snapshot_reference.h diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 788b903f7..359b79e6a 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -94,6 +94,7 @@ set(ICEBERG_SOURCES update/update_partition_spec.cc update/update_properties.cc update/update_schema.cc + update/update_snapshot_reference.cc update/update_sort_order.cc update/update_statistics.cc util/bucket_util.cc diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index a1f88b36d..05cb6f8d3 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -112,6 +112,7 @@ iceberg_sources = files( 'update/update_partition_spec.cc', 'update/update_properties.cc', 'update/update_schema.cc', + 'update/update_snapshot_reference.cc', 'update/update_sort_order.cc', 'update/update_statistics.cc', 'util/bucket_util.cc', diff --git a/src/iceberg/parquet/parquet_data_util.cc b/src/iceberg/parquet/parquet_data_util.cc index 14d20ff9e..43efd1cbd 100644 --- a/src/iceberg/parquet/parquet_data_util.cc +++ b/src/iceberg/parquet/parquet_data_util.cc @@ -148,7 +148,7 @@ 
Result> ProjectStructArray( return output_array; } -/// Templated implementation for projecting list arrays. +/// \brief Templated implementation for projecting list arrays. /// Works with both ListArray/ListType (32-bit offsets) and /// LargeListArray/LargeListType (64-bit offsets). template diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc index 7bd4a5774..173f17e8f 100644 --- a/src/iceberg/transaction.cc +++ b/src/iceberg/transaction.cc @@ -41,6 +41,7 @@ #include "iceberg/update/update_partition_spec.h" #include "iceberg/update/update_properties.h" #include "iceberg/update/update_schema.h" +#include "iceberg/update/update_snapshot_reference.h" #include "iceberg/update/update_sort_order.h" #include "iceberg/update/update_statistics.h" #include "iceberg/util/checked_cast.h" @@ -159,11 +160,7 @@ Status Transaction::Apply(PendingUpdate& update) { metadata_builder_->SetCurrentSchema(std::move(result.schema), result.new_last_column_id); } break; - case PendingUpdate::Kind::kUpdateSortOrder: { - auto& update_sort_order = internal::checked_cast(update); - ICEBERG_ASSIGN_OR_RAISE(auto sort_order, update_sort_order.Apply()); - metadata_builder_->SetDefaultSortOrder(std::move(sort_order)); - } break; + case PendingUpdate::Kind::kUpdateSnapshot: { const auto& base = metadata_builder_->current(); @@ -200,6 +197,29 @@ Status Transaction::Apply(PendingUpdate& update) { metadata_builder_->AssignUUID(); } } break; + case PendingUpdate::Kind::kUpdateSnapshotReference: { + auto& update_ref = internal::checked_cast(update); + ICEBERG_ASSIGN_OR_RAISE(auto updated_refs, update_ref.Apply()); + const auto& current_refs = current().refs; + // Identify references which have been removed + for (const auto& [name, ref] : current_refs) { + if (updated_refs.find(name) == updated_refs.end()) { + metadata_builder_->RemoveRef(name); + } + } + // Identify references which have been created or updated + for (const auto& [name, ref] : updated_refs) { + auto current_it = 
current_refs.find(name); + if (current_it == current_refs.end() || *current_it->second != *ref) { + metadata_builder_->SetRef(name, ref); + } + } + } break; + case PendingUpdate::Kind::kUpdateSortOrder: { + auto& update_sort_order = internal::checked_cast(update); + ICEBERG_ASSIGN_OR_RAISE(auto sort_order, update_sort_order.Apply()); + metadata_builder_->SetDefaultSortOrder(std::move(sort_order)); + } break; case PendingUpdate::Kind::kUpdateStatistics: { auto& update_statistics = internal::checked_cast(update); ICEBERG_ASSIGN_OR_RAISE(auto result, update_statistics.Apply()); @@ -210,6 +230,7 @@ Status Transaction::Apply(PendingUpdate& update) { metadata_builder_->RemoveStatistics(snapshot_id); } } break; + default: return NotSupported("Unsupported pending update: {}", static_cast(update.kind())); @@ -335,4 +356,12 @@ Result> Transaction::NewUpdateStatistics() { return update_statistics; } +Result> +Transaction::NewUpdateSnapshotReference() { + ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr update_ref, + UpdateSnapshotReference::Make(shared_from_this())); + ICEBERG_RETURN_UNEXPECTED(AddUpdate(update_ref)); + return update_ref; +} + } // namespace iceberg diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h index 6d7816ee8..f7110db81 100644 --- a/src/iceberg/transaction.h +++ b/src/iceberg/transaction.h @@ -97,6 +97,10 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this> NewFastAppend(); + /// \brief Create a new UpdateSnapshotReference to update snapshot references (branches + /// and tags) and commit the changes. 
+ Result> NewUpdateSnapshotReference(); + private: Transaction(std::shared_ptr table, Kind kind, bool auto_commit, std::unique_ptr metadata_builder); diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index 2e099c1b5..9e21088f5 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -198,6 +198,7 @@ class UpdateLocation; class UpdatePartitionSpec; class UpdateProperties; class UpdateSchema; +class UpdateSnapshotReference; class UpdateSortOrder; class UpdateStatistics; diff --git a/src/iceberg/update/meson.build b/src/iceberg/update/meson.build index 8dc92c001..e00b1e6e1 100644 --- a/src/iceberg/update/meson.build +++ b/src/iceberg/update/meson.build @@ -25,6 +25,7 @@ install_headers( 'update_location.h', 'update_partition_spec.h', 'update_schema.h', + 'update_snapshot_reference.h', 'update_sort_order.h', 'update_properties.h', 'update_statistics.h', diff --git a/src/iceberg/update/pending_update.h b/src/iceberg/update/pending_update.h index 0c0b6e3e9..e5d583d13 100644 --- a/src/iceberg/update/pending_update.h +++ b/src/iceberg/update/pending_update.h @@ -49,6 +49,7 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector { kUpdateProperties, kUpdateSchema, kUpdateSnapshot, + kUpdateSnapshotReference, kUpdateSortOrder, kUpdateStatistics, }; diff --git a/src/iceberg/update/update_snapshot_reference.cc b/src/iceberg/update/update_snapshot_reference.cc new file mode 100644 index 000000000..9c57c0bbb --- /dev/null +++ b/src/iceberg/update/update_snapshot_reference.cc @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/update_snapshot_reference.h" + +#include +#include +#include +#include + +#include "iceberg/result.h" +#include "iceberg/snapshot.h" +#include "iceberg/table_metadata.h" +#include "iceberg/transaction.h" +#include "iceberg/util/error_collector.h" +#include "iceberg/util/macros.h" +#include "iceberg/util/snapshot_util_internal.h" + +namespace iceberg { + +Result> UpdateSnapshotReference::Make( + std::shared_ptr transaction) { + ICEBERG_PRECHECK(transaction != nullptr, + "Cannot create UpdateSnapshotReference without a transaction"); + return std::shared_ptr( + new UpdateSnapshotReference(std::move(transaction))); +} + +UpdateSnapshotReference::UpdateSnapshotReference(std::shared_ptr transaction) + : PendingUpdate(std::move(transaction)) { + // Initialize updated_refs_ with current refs from base metadata + for (const auto& [name, ref] : base().refs) { + updated_refs_[name] = ref; + } +} + +UpdateSnapshotReference::~UpdateSnapshotReference() = default; + +UpdateSnapshotReference& UpdateSnapshotReference::CreateBranch(const std::string& name, + int64_t snapshot_id) { + ICEBERG_BUILDER_CHECK(!name.empty(), "Branch name cannot be empty"); + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto branch, SnapshotRef::MakeBranch(snapshot_id)); + auto [_, inserted] = updated_refs_.emplace(name, std::move(branch)); + ICEBERG_BUILDER_CHECK(inserted, "Ref '{}' already exists", name); + return *this; +} + +UpdateSnapshotReference& UpdateSnapshotReference::CreateTag(const std::string& name, + int64_t snapshot_id) { + 
ICEBERG_BUILDER_CHECK(!name.empty(), "Tag name cannot be empty"); + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto tag, SnapshotRef::MakeTag(snapshot_id)); + auto [_, inserted] = updated_refs_.emplace(name, std::move(tag)); + ICEBERG_BUILDER_CHECK(inserted, "Ref '{}' already exists", name); + return *this; +} + +UpdateSnapshotReference& UpdateSnapshotReference::RemoveBranch(const std::string& name) { + ICEBERG_BUILDER_CHECK(!name.empty(), "Branch name cannot be empty"); + ICEBERG_BUILDER_CHECK(name != SnapshotRef::kMainBranch, "Cannot remove main branch"); + auto it = updated_refs_.find(name); + ICEBERG_BUILDER_CHECK(it != updated_refs_.end(), "Branch does not exist: {}", name); + ICEBERG_BUILDER_CHECK(it->second->type() == SnapshotRefType::kBranch, + "Ref '{}' is a tag not a branch", name); + updated_refs_.erase(it); + return *this; +} + +UpdateSnapshotReference& UpdateSnapshotReference::RemoveTag(const std::string& name) { + ICEBERG_BUILDER_CHECK(!name.empty(), "Tag name cannot be empty"); + auto it = updated_refs_.find(name); + ICEBERG_BUILDER_CHECK(it != updated_refs_.end(), "Tag does not exist: {}", name); + ICEBERG_BUILDER_CHECK(it->second->type() == SnapshotRefType::kTag, + "Ref '{}' is a branch not a tag", name); + updated_refs_.erase(it); + return *this; +} + +UpdateSnapshotReference& UpdateSnapshotReference::RenameBranch( + const std::string& name, const std::string& new_name) { + ICEBERG_BUILDER_CHECK(!name.empty(), "Branch to rename cannot be empty"); + ICEBERG_BUILDER_CHECK(!new_name.empty(), "New branch name cannot be empty"); + ICEBERG_BUILDER_CHECK(name != SnapshotRef::kMainBranch, "Cannot rename main branch"); + auto it = updated_refs_.find(name); + ICEBERG_BUILDER_CHECK(it != updated_refs_.end(), "Branch does not exist: {}", name); + ICEBERG_BUILDER_CHECK(it->second->type() == SnapshotRefType::kBranch, + "Ref '{}' is a tag not a branch", name); + auto [_, inserted] = updated_refs_.emplace(new_name, it->second); + ICEBERG_BUILDER_CHECK(inserted, "Ref '{}' 
already exists", new_name); + updated_refs_.erase(it); + return *this; +} + +UpdateSnapshotReference& UpdateSnapshotReference::ReplaceBranch(const std::string& name, + int64_t snapshot_id) { + ICEBERG_BUILDER_CHECK(!name.empty(), "Branch name cannot be empty"); + auto it = updated_refs_.find(name); + ICEBERG_BUILDER_CHECK(it != updated_refs_.end(), "Branch does not exist: {}", name); + ICEBERG_BUILDER_CHECK(it->second->type() == SnapshotRefType::kBranch, + "Ref '{}' is a tag not a branch", name); + it->second->snapshot_id = snapshot_id; + return *this; +} + +UpdateSnapshotReference& UpdateSnapshotReference::ReplaceBranch(const std::string& from, + const std::string& to) { + return ReplaceBranchInternal(from, to, false); +} + +UpdateSnapshotReference& UpdateSnapshotReference::FastForward(const std::string& from, + const std::string& to) { + return ReplaceBranchInternal(from, to, true); +} + +UpdateSnapshotReference& UpdateSnapshotReference::ReplaceBranchInternal( + const std::string& from, const std::string& to, bool fast_forward) { + ICEBERG_BUILDER_CHECK(!from.empty(), "Branch to update cannot be empty"); + ICEBERG_BUILDER_CHECK(!to.empty(), "Destination ref cannot be empty"); + auto to_it = updated_refs_.find(to); + ICEBERG_BUILDER_CHECK(to_it != updated_refs_.end(), "Ref does not exist: {}", to); + + auto from_it = updated_refs_.find(from); + if (from_it == updated_refs_.end()) { + return CreateBranch(from, to_it->second->snapshot_id); + } + + ICEBERG_BUILDER_CHECK(from_it->second->type() == SnapshotRefType::kBranch, + "Ref '{}' is a tag not a branch", from); + + // Nothing to replace if snapshot IDs are the same + if (to_it->second->snapshot_id == from_it->second->snapshot_id) { + return *this; + } + + if (fast_forward) { + // Fast-forward is valid only when the current branch (from) is an ancestor of the + // target (to), i.e. we are moving forward in history. 
+ const auto& base_metadata = transaction_->current(); + ICEBERG_BUILDER_ASSIGN_OR_RETURN( + auto from_is_ancestor_of_to, + SnapshotUtil::IsAncestorOf( + to_it->second->snapshot_id, from_it->second->snapshot_id, + [&base_metadata](int64_t id) { return base_metadata.SnapshotById(id); })); + + ICEBERG_BUILDER_CHECK(from_is_ancestor_of_to, + "Cannot fast-forward: {} is not an ancestor of {}", from, to); + } + + from_it->second->snapshot_id = to_it->second->snapshot_id; + return *this; +} + +UpdateSnapshotReference& UpdateSnapshotReference::ReplaceTag(const std::string& name, + int64_t snapshot_id) { + ICEBERG_BUILDER_CHECK(!name.empty(), "Tag name cannot be empty"); + auto it = updated_refs_.find(name); + ICEBERG_BUILDER_CHECK(it != updated_refs_.end(), "Tag does not exist: {}", name); + ICEBERG_BUILDER_CHECK(it->second->type() == SnapshotRefType::kTag, + "Ref '{}' is a branch not a tag", name); + it->second->snapshot_id = snapshot_id; + return *this; +} + +UpdateSnapshotReference& UpdateSnapshotReference::SetMinSnapshotsToKeep( + const std::string& name, int32_t min_snapshots_to_keep) { + ICEBERG_BUILDER_CHECK(!name.empty(), "Branch name cannot be empty"); + auto it = updated_refs_.find(name); + ICEBERG_BUILDER_CHECK(it != updated_refs_.end(), "Branch does not exist: {}", name); + ICEBERG_BUILDER_CHECK(it->second->type() == SnapshotRefType::kBranch, + "Ref '{}' is a tag not a branch", name); + std::get(it->second->retention).min_snapshots_to_keep = + min_snapshots_to_keep; + ICEBERG_BUILDER_CHECK(it->second->Validate(), + "Invalid min_snapshots_to_keep {} for branch '{}'", + min_snapshots_to_keep, name); + return *this; +} + +UpdateSnapshotReference& UpdateSnapshotReference::SetMaxSnapshotAgeMs( + const std::string& name, int64_t max_snapshot_age_ms) { + ICEBERG_BUILDER_CHECK(!name.empty(), "Branch name cannot be empty"); + auto it = updated_refs_.find(name); + ICEBERG_BUILDER_CHECK(it != updated_refs_.end(), "Branch does not exist: {}", name); + 
ICEBERG_BUILDER_CHECK(it->second->type() == SnapshotRefType::kBranch, + "Ref '{}' is a tag not a branch", name); + std::get(it->second->retention).max_snapshot_age_ms = + max_snapshot_age_ms; + ICEBERG_BUILDER_CHECK(it->second->Validate(), + "Invalid max_snapshot_age_ms {} for branch '{}'", + max_snapshot_age_ms, name); + return *this; +} + +UpdateSnapshotReference& UpdateSnapshotReference::SetMaxRefAgeMs(const std::string& name, + int64_t max_ref_age_ms) { + ICEBERG_BUILDER_CHECK(!name.empty(), "Reference name cannot be empty"); + auto it = updated_refs_.find(name); + ICEBERG_BUILDER_CHECK(it != updated_refs_.end(), "Ref does not exist: {}", name); + if (it->second->type() == SnapshotRefType::kBranch) { + std::get(it->second->retention).max_ref_age_ms = max_ref_age_ms; + } else { + std::get(it->second->retention).max_ref_age_ms = max_ref_age_ms; + } + ICEBERG_BUILDER_CHECK(it->second->Validate(), "Invalid max_ref_age_ms {} for ref '{}'", + max_ref_age_ms, name); + return *this; +} + +Result>> +UpdateSnapshotReference::Apply() { + ICEBERG_RETURN_UNEXPECTED(CheckErrors()); + return updated_refs_; +} + +} // namespace iceberg diff --git a/src/iceberg/update/update_snapshot_reference.h b/src/iceberg/update/update_snapshot_reference.h new file mode 100644 index 000000000..84474c045 --- /dev/null +++ b/src/iceberg/update/update_snapshot_reference.h @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" +#include "iceberg/update/pending_update.h" + +/// \file iceberg/update/update_snapshot_reference.h + +namespace iceberg { + +/// \brief Updates snapshot references. +/// +/// TODO(xxx): Add SetSnapshotOperation operations such as setCurrentSnapshot, +/// rollBackTime, rollbackTo to this class so that we can support those operations for +/// refs. +class ICEBERG_EXPORT UpdateSnapshotReference : public PendingUpdate { + public: + static Result> Make( + std::shared_ptr transaction); + + ~UpdateSnapshotReference() override; + + /// \brief Create a branch reference. + /// + /// \param name The branch name + /// \param snapshot_id The snapshot ID for the branch + /// \return Reference to this for method chaining + UpdateSnapshotReference& CreateBranch(const std::string& name, int64_t snapshot_id); + + /// \brief Create a tag reference. + /// + /// \param name The tag name + /// \param snapshot_id The snapshot ID for the tag + /// \return Reference to this for method chaining + UpdateSnapshotReference& CreateTag(const std::string& name, int64_t snapshot_id); + + /// \brief Remove a branch reference. + /// + /// \param name The branch name to remove + /// \return Reference to this for method chaining + UpdateSnapshotReference& RemoveBranch(const std::string& name); + + /// \brief Remove a tag reference. 
+ /// + /// \param name The tag name to remove + /// \return Reference to this for method chaining + UpdateSnapshotReference& RemoveTag(const std::string& name); + + /// \brief Rename a branch reference. + /// + /// \param name The current branch name + /// \param new_name The new branch name + /// \return Reference to this for method chaining + UpdateSnapshotReference& RenameBranch(const std::string& name, + const std::string& new_name); + + /// \brief Replace a branch reference with a new snapshot ID. + /// + /// \param name The branch name + /// \param snapshot_id The new snapshot ID + /// \return Reference to this for method chaining + UpdateSnapshotReference& ReplaceBranch(const std::string& name, int64_t snapshot_id); + + /// \brief Replace a branch reference with another reference's snapshot ID. + /// + /// \param from The branch name to update + /// \param to The reference name to copy the snapshot ID from + /// \return Reference to this for method chaining + UpdateSnapshotReference& ReplaceBranch(const std::string& from, const std::string& to); + + /// \brief Fast-forward a branch to another reference's snapshot ID. + /// + /// This is similar to ReplaceBranch but validates that the current branch snapshot + /// is an ancestor of the target snapshot. + /// + /// \param from The branch name to update + /// \param to The reference name to copy the snapshot ID from + /// \return Reference to this for method chaining + UpdateSnapshotReference& FastForward(const std::string& from, const std::string& to); + + /// \brief Replace a tag reference with a new snapshot ID. + /// + /// \param name The tag name + /// \param snapshot_id The new snapshot ID + /// \return Reference to this for method chaining + UpdateSnapshotReference& ReplaceTag(const std::string& name, int64_t snapshot_id); + + /// \brief Set the minimum number of snapshots to keep for a branch. 
+ /// + /// \param name The branch name + /// \param min_snapshots_to_keep The minimum number of snapshots to keep + /// \return Reference to this for method chaining + UpdateSnapshotReference& SetMinSnapshotsToKeep(const std::string& name, + int32_t min_snapshots_to_keep); + + /// \brief Set the maximum snapshot age in milliseconds for a branch. + /// + /// \param name The branch name + /// \param max_snapshot_age_ms The maximum snapshot age in milliseconds + /// \return Reference to this for method chaining + UpdateSnapshotReference& SetMaxSnapshotAgeMs(const std::string& name, + int64_t max_snapshot_age_ms); + + /// \brief Set the maximum reference age in milliseconds. + /// + /// \param name The reference name + /// \param max_ref_age_ms The maximum reference age in milliseconds + /// \return Reference to this for method chaining + UpdateSnapshotReference& SetMaxRefAgeMs(const std::string& name, + int64_t max_ref_age_ms); + + Kind kind() const final { return Kind::kUpdateSnapshotReference; } + + /// \brief Apply the pending changes and return the updated references. 
+ Result>> Apply(); + + private: + explicit UpdateSnapshotReference(std::shared_ptr transaction); + + UpdateSnapshotReference& ReplaceBranchInternal(const std::string& from, + const std::string& to, + bool fast_forward); + + std::unordered_map> updated_refs_; +}; + +} // namespace iceberg diff --git a/src/iceberg/util/snapshot_util.cc b/src/iceberg/util/snapshot_util.cc index d3d5669cb..84e7a10bc 100644 --- a/src/iceberg/util/snapshot_util.cc +++ b/src/iceberg/util/snapshot_util.cc @@ -62,6 +62,17 @@ Result SnapshotUtil::IsAncestorOf(const Table& table, int64_t snapshot_id, }); } +Result SnapshotUtil::IsAncestorOf( + int64_t snapshot_id, int64_t ancestor_snapshot_id, + const std::function>(int64_t)>& lookup) { + ICEBERG_ASSIGN_OR_RAISE(auto snapshot, lookup(snapshot_id)); + ICEBERG_CHECK(snapshot != nullptr, "Cannot find snapshot: {}", snapshot_id); + ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(snapshot, lookup)); + return std::ranges::any_of(ancestors, [ancestor_snapshot_id](const auto& ancestor) { + return ancestor != nullptr && ancestor->snapshot_id == ancestor_snapshot_id; + }); +} + Result SnapshotUtil::IsAncestorOf(const Table& table, int64_t ancestor_snapshot_id) { ICEBERG_ASSIGN_OR_RAISE(auto current, table.current_snapshot()); diff --git a/src/iceberg/util/snapshot_util_internal.h b/src/iceberg/util/snapshot_util_internal.h index 0a000c691..ce5e02782 100644 --- a/src/iceberg/util/snapshot_util_internal.h +++ b/src/iceberg/util/snapshot_util_internal.h @@ -62,6 +62,17 @@ class ICEBERG_EXPORT SnapshotUtil { static Result IsAncestorOf(const Table& table, int64_t snapshot_id, int64_t ancestor_snapshot_id); + /// \brief Returns whether ancestor_snapshot_id is an ancestor of snapshot_id using the + /// given lookup function. 
+ /// + /// \param snapshot_id The snapshot ID to check + /// \param ancestor_snapshot_id The ancestor snapshot ID to check for + /// \param lookup Function to lookup snapshots by ID + /// \return true if ancestor_snapshot_id is an ancestor of snapshot_id + static Result IsAncestorOf( + int64_t snapshot_id, int64_t ancestor_snapshot_id, + const std::function>(int64_t)>& lookup); + /// \brief Returns whether ancestor_snapshot_id is an ancestor of the table's current /// state. /// From 0f36ff026ceb1587e9d9067ffeee6cf251eb04a8 Mon Sep 17 00:00:00 2001 From: Junwang Zhao Date: Thu, 22 Jan 2026 13:52:23 +0800 Subject: [PATCH 2/3] fix: review comments --- src/iceberg/transaction.cc | 20 ++---- .../update/update_snapshot_reference.cc | 71 +++++++++++++------ .../update/update_snapshot_reference.h | 16 +++-- 3 files changed, 65 insertions(+), 42 deletions(-) diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc index 173f17e8f..99b766a45 100644 --- a/src/iceberg/transaction.cc +++ b/src/iceberg/transaction.cc @@ -160,7 +160,6 @@ Status Transaction::Apply(PendingUpdate& update) { metadata_builder_->SetCurrentSchema(std::move(result.schema), result.new_last_column_id); } break; - case PendingUpdate::Kind::kUpdateSnapshot: { const auto& base = metadata_builder_->current(); @@ -199,20 +198,12 @@ Status Transaction::Apply(PendingUpdate& update) { } break; case PendingUpdate::Kind::kUpdateSnapshotReference: { auto& update_ref = internal::checked_cast(update); - ICEBERG_ASSIGN_OR_RAISE(auto updated_refs, update_ref.Apply()); - const auto& current_refs = current().refs; - // Identify references which have been removed - for (const auto& [name, ref] : current_refs) { - if (updated_refs.find(name) == updated_refs.end()) { - metadata_builder_->RemoveRef(name); - } + ICEBERG_ASSIGN_OR_RAISE(auto result, update_ref.Apply()); + for (const auto& name : result.to_remove) { + metadata_builder_->RemoveRef(name); } - // Identify references which have been created or updated - 
for (const auto& [name, ref] : updated_refs) { - auto current_it = current_refs.find(name); - if (current_it == current_refs.end() || *current_it->second != *ref) { - metadata_builder_->SetRef(name, ref); - } + for (auto&& [name, ref] : result.to_set) { + metadata_builder_->SetRef(std::move(name), std::move(ref)); } } break; case PendingUpdate::Kind::kUpdateSortOrder: { @@ -230,7 +221,6 @@ Status Transaction::Apply(PendingUpdate& update) { metadata_builder_->RemoveStatistics(snapshot_id); } } break; - default: return NotSupported("Unsupported pending update: {}", static_cast(update.kind())); diff --git a/src/iceberg/update/update_snapshot_reference.cc b/src/iceberg/update/update_snapshot_reference.cc index 9c57c0bbb..74588e942 100644 --- a/src/iceberg/update/update_snapshot_reference.cc +++ b/src/iceberg/update/update_snapshot_reference.cc @@ -24,11 +24,9 @@ #include #include -#include "iceberg/result.h" #include "iceberg/snapshot.h" #include "iceberg/table_metadata.h" #include "iceberg/transaction.h" -#include "iceberg/util/error_collector.h" #include "iceberg/util/macros.h" #include "iceberg/util/snapshot_util_internal.h" @@ -43,12 +41,8 @@ Result> UpdateSnapshotReference::Make( } UpdateSnapshotReference::UpdateSnapshotReference(std::shared_ptr transaction) - : PendingUpdate(std::move(transaction)) { - // Initialize updated_refs_ with current refs from base metadata - for (const auto& [name, ref] : base().refs) { - updated_refs_[name] = ref; - } -} + : PendingUpdate(std::move(transaction)), + updated_refs_(base().refs.begin(), base().refs.end()) {} UpdateSnapshotReference::~UpdateSnapshotReference() = default; @@ -113,7 +107,9 @@ UpdateSnapshotReference& UpdateSnapshotReference::ReplaceBranch(const std::strin ICEBERG_BUILDER_CHECK(it != updated_refs_.end(), "Branch does not exist: {}", name); ICEBERG_BUILDER_CHECK(it->second->type() == SnapshotRefType::kBranch, "Ref '{}' is a tag not a branch", name); - it->second->snapshot_id = snapshot_id; + // Clone the ref 
before modifying to avoid affecting base metadata + auto cloned = it->second->Clone(snapshot_id); + it->second = std::shared_ptr(cloned.release()); return *this; } @@ -161,7 +157,9 @@ UpdateSnapshotReference& UpdateSnapshotReference::ReplaceBranchInternal( "Cannot fast-forward: {} is not an ancestor of {}", from, to); } - from_it->second->snapshot_id = to_it->second->snapshot_id; + // Clone the ref before modifying to avoid affecting base metadata + auto cloned = from_it->second->Clone(to_it->second->snapshot_id); + from_it->second = std::shared_ptr(cloned.release()); return *this; } @@ -172,7 +170,9 @@ UpdateSnapshotReference& UpdateSnapshotReference::ReplaceTag(const std::string& ICEBERG_BUILDER_CHECK(it != updated_refs_.end(), "Tag does not exist: {}", name); ICEBERG_BUILDER_CHECK(it->second->type() == SnapshotRefType::kTag, "Ref '{}' is a branch not a tag", name); - it->second->snapshot_id = snapshot_id; + // Clone the ref before modifying to avoid affecting base metadata + auto cloned = it->second->Clone(snapshot_id); + it->second = std::shared_ptr(cloned.release()); return *this; } @@ -183,11 +183,14 @@ UpdateSnapshotReference& UpdateSnapshotReference::SetMinSnapshotsToKeep( ICEBERG_BUILDER_CHECK(it != updated_refs_.end(), "Branch does not exist: {}", name); ICEBERG_BUILDER_CHECK(it->second->type() == SnapshotRefType::kBranch, "Ref '{}' is a tag not a branch", name); - std::get(it->second->retention).min_snapshots_to_keep = + // Clone the ref before modifying to avoid affecting base metadata + auto cloned = it->second->Clone(); + std::get(cloned->retention).min_snapshots_to_keep = min_snapshots_to_keep; - ICEBERG_BUILDER_CHECK(it->second->Validate(), + ICEBERG_BUILDER_CHECK(cloned->Validate(), "Invalid min_snapshots_to_keep {} for branch '{}'", min_snapshots_to_keep, name); + it->second = std::shared_ptr(cloned.release()); return *this; } @@ -198,11 +201,14 @@ UpdateSnapshotReference& UpdateSnapshotReference::SetMaxSnapshotAgeMs( ICEBERG_BUILDER_CHECK(it != 
updated_refs_.end(), "Branch does not exist: {}", name); ICEBERG_BUILDER_CHECK(it->second->type() == SnapshotRefType::kBranch, "Ref '{}' is a tag not a branch", name); - std::get(it->second->retention).max_snapshot_age_ms = + // Clone the ref before modifying to avoid affecting base metadata + auto cloned = it->second->Clone(); + std::get(cloned->retention).max_snapshot_age_ms = max_snapshot_age_ms; - ICEBERG_BUILDER_CHECK(it->second->Validate(), + ICEBERG_BUILDER_CHECK(cloned->Validate(), "Invalid max_snapshot_age_ms {} for branch '{}'", max_snapshot_age_ms, name); + it->second = std::shared_ptr(cloned.release()); return *this; } @@ -211,20 +217,41 @@ UpdateSnapshotReference& UpdateSnapshotReference::SetMaxRefAgeMs(const std::stri ICEBERG_BUILDER_CHECK(!name.empty(), "Reference name cannot be empty"); auto it = updated_refs_.find(name); ICEBERG_BUILDER_CHECK(it != updated_refs_.end(), "Ref does not exist: {}", name); - if (it->second->type() == SnapshotRefType::kBranch) { - std::get(it->second->retention).max_ref_age_ms = max_ref_age_ms; + // Clone the ref before modifying to avoid affecting base metadata + auto cloned = it->second->Clone(); + if (cloned->type() == SnapshotRefType::kBranch) { + std::get(cloned->retention).max_ref_age_ms = max_ref_age_ms; } else { - std::get(it->second->retention).max_ref_age_ms = max_ref_age_ms; + std::get(cloned->retention).max_ref_age_ms = max_ref_age_ms; } - ICEBERG_BUILDER_CHECK(it->second->Validate(), "Invalid max_ref_age_ms {} for ref '{}'", + ICEBERG_BUILDER_CHECK(cloned->Validate(), "Invalid max_ref_age_ms {} for ref '{}'", max_ref_age_ms, name); + it->second = std::shared_ptr(cloned.release()); return *this; } -Result>> -UpdateSnapshotReference::Apply() { +Result UpdateSnapshotReference::Apply() { ICEBERG_RETURN_UNEXPECTED(CheckErrors()); - return updated_refs_; + + ApplyResult result; + const auto& current_refs = base().refs; + + // Identify references which have been removed + for (const auto& [name, ref] : 
current_refs) { + if (updated_refs_.find(name) == updated_refs_.end()) { + result.to_remove.push_back(name); + } + } + + // Identify references which have been created or updated + for (const auto& [name, ref] : updated_refs_) { + auto current_it = current_refs.find(name); + if (current_it == current_refs.end() || *current_it->second != *ref) { + result.to_set.emplace_back(name, ref); + } + } + + return result; } } // namespace iceberg diff --git a/src/iceberg/update/update_snapshot_reference.h b/src/iceberg/update/update_snapshot_reference.h index 84474c045..066f43ac3 100644 --- a/src/iceberg/update/update_snapshot_reference.h +++ b/src/iceberg/update/update_snapshot_reference.h @@ -35,9 +35,8 @@ namespace iceberg { /// \brief Updates snapshot references. /// -/// TODO(xxx): Add SetSnapshotOperation operations such as setCurrentSnapshot, -/// rollBackTime, rollbackTo to this class so that we can support those operations for -/// refs. +/// TODO(xxx): Add SetSnapshot operations such as SetCurrentSnapshot, RollBackTime, +/// RollbackTo to this class so that we can support those operations for refs. class ICEBERG_EXPORT UpdateSnapshotReference : public PendingUpdate { public: static Result> Make( @@ -136,8 +135,15 @@ class ICEBERG_EXPORT UpdateSnapshotReference : public PendingUpdate { Kind kind() const final { return Kind::kUpdateSnapshotReference; } - /// \brief Apply the pending changes and return the updated references. - Result>> Apply(); + struct ApplyResult { + /// References to set or update (name, ref pairs) + std::vector>> to_set; + /// Reference names to remove + std::vector to_remove; + }; + + /// \brief Apply the pending changes and return the updated and removed references. 
+ Result Apply(); private: explicit UpdateSnapshotReference(std::shared_ptr transaction); From 8c4fb8ff0de8e3c5ed0f5ee5f0515b8053cd14e5 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Thu, 22 Jan 2026 14:41:28 +0800 Subject: [PATCH 3/3] simplify a little bit --- .../update/update_snapshot_reference.cc | 53 +++++++------------ .../update/update_snapshot_reference.h | 1 - 2 files changed, 20 insertions(+), 34 deletions(-) diff --git a/src/iceberg/update/update_snapshot_reference.cc b/src/iceberg/update/update_snapshot_reference.cc index 74588e942..923f0c8df 100644 --- a/src/iceberg/update/update_snapshot_reference.cc +++ b/src/iceberg/update/update_snapshot_reference.cc @@ -41,8 +41,7 @@ Result> UpdateSnapshotReference::Make( } UpdateSnapshotReference::UpdateSnapshotReference(std::shared_ptr transaction) - : PendingUpdate(std::move(transaction)), - updated_refs_(base().refs.begin(), base().refs.end()) {} + : PendingUpdate(std::move(transaction)), updated_refs_(base().refs) {} UpdateSnapshotReference::~UpdateSnapshotReference() = default; @@ -107,20 +106,18 @@ UpdateSnapshotReference& UpdateSnapshotReference::ReplaceBranch(const std::strin ICEBERG_BUILDER_CHECK(it != updated_refs_.end(), "Branch does not exist: {}", name); ICEBERG_BUILDER_CHECK(it->second->type() == SnapshotRefType::kBranch, "Ref '{}' is a tag not a branch", name); - // Clone the ref before modifying to avoid affecting base metadata - auto cloned = it->second->Clone(snapshot_id); - it->second = std::shared_ptr(cloned.release()); + it->second = it->second->Clone(snapshot_id); return *this; } UpdateSnapshotReference& UpdateSnapshotReference::ReplaceBranch(const std::string& from, const std::string& to) { - return ReplaceBranchInternal(from, to, false); + return ReplaceBranchInternal(from, to, /*fast_forward=*/false); } UpdateSnapshotReference& UpdateSnapshotReference::FastForward(const std::string& from, const std::string& to) { - return ReplaceBranchInternal(from, to, true); + return 
ReplaceBranchInternal(from, to, /*fast_forward=*/true); } UpdateSnapshotReference& UpdateSnapshotReference::ReplaceBranchInternal( @@ -157,9 +154,7 @@ UpdateSnapshotReference& UpdateSnapshotReference::ReplaceBranchInternal( "Cannot fast-forward: {} is not an ancestor of {}", from, to); } - // Clone the ref before modifying to avoid affecting base metadata - auto cloned = from_it->second->Clone(to_it->second->snapshot_id); - from_it->second = std::shared_ptr(cloned.release()); + from_it->second = from_it->second->Clone(to_it->second->snapshot_id); return *this; } @@ -170,9 +165,7 @@ UpdateSnapshotReference& UpdateSnapshotReference::ReplaceTag(const std::string& ICEBERG_BUILDER_CHECK(it != updated_refs_.end(), "Tag does not exist: {}", name); ICEBERG_BUILDER_CHECK(it->second->type() == SnapshotRefType::kTag, "Ref '{}' is a branch not a tag", name); - // Clone the ref before modifying to avoid affecting base metadata - auto cloned = it->second->Clone(snapshot_id); - it->second = std::shared_ptr(cloned.release()); + it->second = it->second->Clone(snapshot_id); return *this; } @@ -183,14 +176,12 @@ UpdateSnapshotReference& UpdateSnapshotReference::SetMinSnapshotsToKeep( ICEBERG_BUILDER_CHECK(it != updated_refs_.end(), "Branch does not exist: {}", name); ICEBERG_BUILDER_CHECK(it->second->type() == SnapshotRefType::kBranch, "Ref '{}' is a tag not a branch", name); - // Clone the ref before modifying to avoid affecting base metadata - auto cloned = it->second->Clone(); - std::get(cloned->retention).min_snapshots_to_keep = + it->second = it->second->Clone(); + std::get(it->second->retention).min_snapshots_to_keep = min_snapshots_to_keep; - ICEBERG_BUILDER_CHECK(cloned->Validate(), + ICEBERG_BUILDER_CHECK(it->second->Validate(), "Invalid min_snapshots_to_keep {} for branch '{}'", min_snapshots_to_keep, name); - it->second = std::shared_ptr(cloned.release()); return *this; } @@ -201,14 +192,12 @@ UpdateSnapshotReference& UpdateSnapshotReference::SetMaxSnapshotAgeMs( 
ICEBERG_BUILDER_CHECK(it != updated_refs_.end(), "Branch does not exist: {}", name); ICEBERG_BUILDER_CHECK(it->second->type() == SnapshotRefType::kBranch, "Ref '{}' is a tag not a branch", name); - // Clone the ref before modifying to avoid affecting base metadata - auto cloned = it->second->Clone(); - std::get(cloned->retention).max_snapshot_age_ms = + it->second = it->second->Clone(); + std::get(it->second->retention).max_snapshot_age_ms = max_snapshot_age_ms; - ICEBERG_BUILDER_CHECK(cloned->Validate(), + ICEBERG_BUILDER_CHECK(it->second->Validate(), "Invalid max_snapshot_age_ms {} for branch '{}'", max_snapshot_age_ms, name); - it->second = std::shared_ptr(cloned.release()); return *this; } @@ -217,16 +206,14 @@ UpdateSnapshotReference& UpdateSnapshotReference::SetMaxRefAgeMs(const std::stri ICEBERG_BUILDER_CHECK(!name.empty(), "Reference name cannot be empty"); auto it = updated_refs_.find(name); ICEBERG_BUILDER_CHECK(it != updated_refs_.end(), "Ref does not exist: {}", name); - // Clone the ref before modifying to avoid affecting base metadata - auto cloned = it->second->Clone(); - if (cloned->type() == SnapshotRefType::kBranch) { - std::get(cloned->retention).max_ref_age_ms = max_ref_age_ms; + it->second = it->second->Clone(); + if (it->second->type() == SnapshotRefType::kBranch) { + std::get(it->second->retention).max_ref_age_ms = max_ref_age_ms; } else { - std::get(cloned->retention).max_ref_age_ms = max_ref_age_ms; + std::get(it->second->retention).max_ref_age_ms = max_ref_age_ms; } - ICEBERG_BUILDER_CHECK(cloned->Validate(), "Invalid max_ref_age_ms {} for ref '{}'", + ICEBERG_BUILDER_CHECK(it->second->Validate(), "Invalid max_ref_age_ms {} for ref '{}'", max_ref_age_ms, name); - it->second = std::shared_ptr(cloned.release()); return *this; } @@ -238,15 +225,15 @@ Result UpdateSnapshotReference::Apply() { // Identify references which have been removed for (const auto& [name, ref] : current_refs) { - if (updated_refs_.find(name) == updated_refs_.end()) { + 
if (!updated_refs_.contains(name)) { result.to_remove.push_back(name); } } // Identify references which have been created or updated for (const auto& [name, ref] : updated_refs_) { - auto current_it = current_refs.find(name); - if (current_it == current_refs.end() || *current_it->second != *ref) { + if (auto iter = current_refs.find(name); + iter == current_refs.end() || *iter->second != *ref) { result.to_set.emplace_back(name, ref); } } diff --git a/src/iceberg/update/update_snapshot_reference.h b/src/iceberg/update/update_snapshot_reference.h index 066f43ac3..e13f5bfa9 100644 --- a/src/iceberg/update/update_snapshot_reference.h +++ b/src/iceberg/update/update_snapshot_reference.h @@ -20,7 +20,6 @@ #pragma once #include -#include #include #include