From 26b96bf353a9e548db2ad16ab834126e207ab76c Mon Sep 17 00:00:00 2001 From: yuwmao Date: Tue, 27 Jan 2026 18:12:39 +0800 Subject: [PATCH] Adapt to replace member callback changes --- conanfile.py | 2 +- src/lib/homestore_backend/hs_homeobject.hpp | 55 ++++--- src/lib/homestore_backend/hs_pg_manager.cpp | 143 +++++++++--------- .../replication_state_machine.cpp | 26 ++-- .../replication_state_machine.hpp | 15 +- 5 files changed, 130 insertions(+), 111 deletions(-) diff --git a/conanfile.py b/conanfile.py index b707b7e7..41d327e6 100644 --- a/conanfile.py +++ b/conanfile.py @@ -50,7 +50,7 @@ def build_requirements(self): def requirements(self): self.requires("sisl/[^13.0]@oss/master", transitive_headers=True) - self.requires("homestore/[^7.3]@oss/master") + self.requires("homestore/[^7.4]@oss/master") self.requires("iomgr/[^12.0]@oss/master") def validate(self): diff --git a/src/lib/homestore_backend/hs_homeobject.hpp b/src/lib/homestore_backend/hs_homeobject.hpp index f2f14d93..272f6b61 100644 --- a/src/lib/homestore_backend/hs_homeobject.hpp +++ b/src/lib/homestore_backend/hs_homeobject.hpp @@ -771,34 +771,41 @@ class HSHomeObject : public HomeObjectImpl { * @brief Function invoked when start a member replacement * * @param group_id The group id of replication device. 
- * @param member_out Member which is removed from group - * @param member_in Member which is added to group + * @param ctx The replace member context containing task_id, member_out, member_in + * @param member_ids Complete list of member IDs from raft config (single source of truth) + * @param tid Trace ID * */ - void on_pg_start_replace_member(homestore::group_id_t group_id, const std::string& task_id, - const homestore::replica_member_info& member_out, - const homestore::replica_member_info& member_in, trace_id_t tid); + void on_pg_start_replace_member(homestore::group_id_t group_id, const homestore::replace_member_ctx& ctx, + const std::vector< homestore::replica_id_t >& member_ids, + homestore::trace_id_t tid); /** * @brief Function invoked when complete a member replacement * + * IMPORTANT: This callback must be idempotent as it may be called multiple times during log replay. + * * @param group_id The group id of replication device. - * @param member_out Member which is removed from group - * @param member_in Member which is added to group + * @param ctx The replace member context containing task_id, member_out, member_in + * @param member_ids Complete list of member IDs from consensus layer (single source of truth) + * @param tid Trace ID * */ - void on_pg_complete_replace_member(homestore::group_id_t group_id, const std::string& task_id, - const homestore::replica_member_info& member_out, - const homestore::replica_member_info& member_in, trace_id_t tid); + void on_pg_complete_replace_member(homestore::group_id_t group_id, const homestore::replace_member_ctx& ctx, + const std::vector< homestore::replica_id_t >& member_ids, + homestore::trace_id_t tid); /** * @brief Called when clean replace member task (rollback) - * @param group_id Group ID - * @param task_id Task ID - * @param member_out Member which should be restored to group - * @param member_in Member which should be removed from group - * */ - void 
on_pg_clean_replace_member_task(homestore::group_id_t group_id, const std::string& task_id, - const homestore::replica_member_info& member_out, - const homestore::replica_member_info& member_in, trace_id_t tid); + * + * IMPORTANT: This callback must be idempotent as it may be called multiple times during log replay. + * + * @param group_id The group id of replication device. + * @param ctx The replace member context containing task_id, member_out, member_in + * @param member_ids Complete list of member IDs from consensus layer (single source of truth) + * @param tid Trace ID + */ + void on_pg_clean_replace_member_task(homestore::group_id_t group_id, const homestore::replace_member_ctx& ctx, + const std::vector< homestore::replica_id_t >& member_ids, + homestore::trace_id_t tid); void on_remove_member(homestore::group_id_t group_id, const peer_id_t& member, trace_id_t tid = 0); @@ -1068,6 +1075,18 @@ class HSHomeObject : public HomeObjectImpl { } homestore::replica_member_info to_replica_member_info(const PGMember& pg_member) const; PGMember to_pg_member(const homestore::replica_member_info& replica_info) const; + + /** + * @brief Reconcile PG membership with authoritative member ID list from replication layer + * Preserves existing member metadata (name, priority) where possible + * + * @param existing_members Current PG members (for metadata preservation) + * @param member_ids Authoritative member IDs from replication consensus + * @return New membership set reconciled with the authoritative list + */ + std::set< PGMember > reconcile_membership_with_config( + const std::set< PGMember >& existing_members, + const std::vector< homestore::replica_id_t >& member_ids); }; class BlobIndexServiceCallbacks : public homestore::IndexServiceCallbacks { diff --git a/src/lib/homestore_backend/hs_pg_manager.cpp b/src/lib/homestore_backend/hs_pg_manager.cpp index 548af166..eb1377ed 100644 --- a/src/lib/homestore_backend/hs_pg_manager.cpp +++ b/src/lib/homestore_backend/hs_pg_manager.cpp @@ -339,97 +339,107 @@ replica_member_info 
HSHomeObject::to_replica_member_info(const PGMember& pg_memb return replica_info; } -void HSHomeObject::on_pg_start_replace_member(group_id_t group_id, const std::string& task_id, - const replica_member_info& member_out, - const replica_member_info& member_in, trace_id_t tid) { +std::set< PGMember > HSHomeObject::reconcile_membership_with_config(const std::set< PGMember >& existing_members, + const std::vector< replica_id_t >& member_ids) { + std::set< PGMember > new_members; + for (auto const& id : member_ids) { + // Try to preserve existing member metadata (name, priority) + auto existing = existing_members.find(PGMember(id)); + if (existing != existing_members.end()) { + new_members.insert(*existing); // Keep name and priority + } else { + // New member not in current membership, add with just ID + new_members.emplace(id); + } + } + return new_members; +} + +void HSHomeObject::on_pg_start_replace_member(group_id_t group_id, const homestore::replace_member_ctx& ctx, + const std::vector< homestore::replica_id_t >& member_ids, + homestore::trace_id_t tid) { std::unique_lock lck(_pg_lock); for (const auto& iter : _pg_map) { auto& pg = iter.second; if (pg_repl_dev(*pg).group_id() == group_id) { - // Remove the old member and add the new member auto hs_pg = static_cast< HSHomeObject::HS_PG* >(pg.get()); - pg->pg_info_.members.emplace(std::move(to_pg_member(member_in))); - pg->pg_info_.members.emplace(std::move(to_pg_member(member_out))); - - uint32_t i{0}; - pg_members* sb_members = hs_pg->pg_sb_->get_pg_members_mutable(); - for (auto const& m : pg->pg_info_.members) { - sb_members[i].id = m.id; - DEBUG_ASSERT(m.name.size() <= PGMember::max_name_len, "member name exceeds max len, name={}", m.name); - auto name_len = std::min(m.name.size(), PGMember::max_name_len); - std::strncpy(sb_members[i].name, m.name.c_str(), name_len); - sb_members[i].name[name_len] = '\0'; - sb_members[i].priority = m.priority; - ++i; - } - hs_pg->pg_sb_->num_dynamic_members = 
pg->pg_info_.members.size(); - // Update the latest membership info to pg superblk. - hs_pg->pg_sb_.write(); - LOGI("PG start replace member done, task_id={} member_out={} member_in={}, member_nums={}, trace_id={}", - task_id, boost::uuids::to_string(member_out.id), boost::uuids::to_string(member_in.id), - pg->pg_info_.members.size(), tid); + + // Build membership from raft config + auto new_members = reconcile_membership_with_config(hs_pg->pg_info_.members, member_ids); + + // Start phase: add replica_in with full metadata from ctx, on_pg_start_replace_member is called before the new member is added to the group. + new_members.emplace(to_pg_member(ctx.replica_in)); + + // Apply to PG + hs_pg->pg_info_.members = std::move(new_members); + hs_pg->update_membership(hs_pg->pg_info_.members); + + LOGI("PG start replace member done: task_id={} member_out={} member_in={}, member_nums={}, trace_id={}", + ctx.task_id, boost::uuids::to_string(ctx.replica_out.id), boost::uuids::to_string(ctx.replica_in.id), + hs_pg->pg_info_.members.size(), tid); return; } } - LOGE("PG replace member failed task_id={}, member_out={} member_in={}, trace_id={}", task_id, - boost::uuids::to_string(member_out.id), boost::uuids::to_string(member_in.id), tid); + LOGE("PG start replace member failed, group_id not found, task_id={}, member_out={} member_in={}, trace_id={}", + ctx.task_id, boost::uuids::to_string(ctx.replica_out.id), boost::uuids::to_string(ctx.replica_in.id), tid); } -void HSHomeObject::on_pg_complete_replace_member(group_id_t group_id, const std::string& task_id, - const replica_member_info& member_out, - const replica_member_info& member_in, trace_id_t tid) { +void HSHomeObject::on_pg_complete_replace_member(group_id_t group_id, const homestore::replace_member_ctx& ctx, + const std::vector< homestore::replica_id_t >& member_ids, + homestore::trace_id_t tid) { std::unique_lock lck(_pg_lock); for (const auto& iter : _pg_map) { auto& pg = iter.second; if (pg_repl_dev(*pg).group_id() 
== group_id) { - // Remove the old member and add the new member auto hs_pg = static_cast< HSHomeObject::HS_PG* >(pg.get()); - pg->pg_info_.members.erase(PGMember(member_out.id)); - pg->pg_info_.members.emplace(std::move(to_pg_member(member_in))); - hs_pg->update_membership(pg->pg_info_.members); - LOGI("PG complete replace member done member_out={} member_in={}, member_nums={}, trace_id={}", - boost::uuids::to_string(member_out.id), boost::uuids::to_string(member_in.id), - pg->pg_info_.members.size(), tid); + + // Build membership from raft config + auto new_members = reconcile_membership_with_config(hs_pg->pg_info_.members, member_ids); + + // Complete phase: ensure replica_out is removed (defense in depth), on_pg_complete_replace_member should be called after the replica_out is verified to be removed. + new_members.erase(PGMember(ctx.replica_out.id)); + + // Apply to PG + hs_pg->pg_info_.members = std::move(new_members); + hs_pg->update_membership(hs_pg->pg_info_.members); + + LOGI("PG complete replace member done: member_out={} member_in={}, member_nums={}, trace_id={}", + boost::uuids::to_string(ctx.replica_out.id), boost::uuids::to_string(ctx.replica_in.id), + hs_pg->pg_info_.members.size(), tid); return; } } - LOGE("PG complete replace member failed member_out={} member_in={}, trace_id={}", - boost::uuids::to_string(member_out.id), boost::uuids::to_string(member_in.id), tid); + LOGE("PG complete replace member failed, group_id not found, member_out={} member_in={}, trace_id={}", + boost::uuids::to_string(ctx.replica_out.id), boost::uuids::to_string(ctx.replica_in.id), tid); } -//This function actually perform rollback for replace member task: Remove in_member, and ensure out_member exists -void HSHomeObject::on_pg_clean_replace_member_task(group_id_t group_id, const std::string& task_id, - const replica_member_info& member_out, - const replica_member_info& member_in, trace_id_t tid) { +//It will reconcile the membership with raft in clean/rollback phase. 
+void HSHomeObject::on_pg_clean_replace_member_task(group_id_t group_id, const homestore::replace_member_ctx& ctx, + const std::vector< homestore::replica_id_t >& member_ids, + homestore::trace_id_t tid) { std::unique_lock lck(_pg_lock); for (const auto& iter : _pg_map) { auto& pg = iter.second; if (pg_repl_dev(*pg).group_id() == group_id) { auto hs_pg = static_cast< HSHomeObject::HS_PG* >(pg.get()); - // Remove the in_member (the one that was added but now needs to be removed) - auto removed_count = pg->pg_info_.members.erase(PGMember(member_in.id)); - - // Ensure out_member exists - // Using emplace is safe - it won't overwrite if already exists - auto out_pg_member = to_pg_member(member_out); - auto [it, inserted] = pg->pg_info_.members.emplace(std::move(out_pg_member)); + // Clean phase: sync membership to raft config (rollback scenario) + auto new_members = reconcile_membership_with_config(hs_pg->pg_info_.members, member_ids); - // Update superblock - hs_pg->update_membership(pg->pg_info_.members); + // Apply to PG + hs_pg->pg_info_.members = std::move(new_members); + hs_pg->update_membership(hs_pg->pg_info_.members); - LOGI("PG clean replace member task done (rollback), task_id={}, removed in_member={} (removed={}), " - "ensured out_member={} (inserted={}), member_nums={}, trace_id={}", - task_id, boost::uuids::to_string(member_in.id), removed_count, - boost::uuids::to_string(member_out.id), inserted, pg->pg_info_.members.size(), tid); + LOGI("PG clean replace member task done: task_id={}, member_nums={}, trace_id={}", + ctx.task_id, hs_pg->pg_info_.members.size(), tid); return; } } - LOGE("PG clean replace member task failed, pg not found, task_id={}, member_out={}, member_in={}, trace_id={}", task_id, - boost::uuids::to_string(member_out.id), boost::uuids::to_string(member_in.id), tid); + LOGE("PG clean replace member task failed, group_id not found, task_id={}, trace_id={}", ctx.task_id, tid); } + bool HSHomeObject::reconcile_membership(pg_id_t pg_id) { 
std::unique_lock lck(_pg_lock); auto hs_pg = const_cast< HS_PG* >(_get_hs_pg_unlocked(pg_id)); @@ -446,20 +456,9 @@ bool HSHomeObject::reconcile_membership(pg_id_t pg_id) { return false; } - // Build new member set from actual members - std::set< PGMember > new_members; - for (auto& member_id : actual_members) { - auto existing = hs_pg->pg_info_.members.find(PGMember(member_id)); - if (existing != hs_pg->pg_info_.members.end()) { - // Keep the existing member with its name and priority - new_members.insert(*existing); - } else { - // New member not in our records, add with default name - PGMember new_member(member_id); - new_members.insert(std::move(new_member)); - LOGE("Adding new member {} to pg={} membership, should not happen!", boost::uuids::to_string(member_id), hs_pg->pg_info_.id); - } - } + // Build new member set by reconciling with actual members from replication layer + auto new_members = reconcile_membership_with_config(hs_pg->pg_info_.members, actual_members); + // Check if membership changed if (new_members == hs_pg->pg_info_.members) { LOGD("Membership already in sync for pg={}, no update needed", hs_pg->pg_info_.id); @@ -606,7 +605,7 @@ PGManager::NullAsyncResult HSHomeObject::_remove_member(pg_id_t pg_id, peer_id_t LOGI("PG remove member, pg_id={}, member={} trace_id={}", pg_id, boost::uuids::to_string(member_id), tid); return hs_repl_service() - .remove_member(group_id, member_id, commit_quorum, tid) + .remove_member(group_id, member_id, commit_quorum, true, tid) .via(executor_) .thenValue([this](auto&& v) mutable -> PGManager::NullAsyncResult { decr_pending_request_num(); diff --git a/src/lib/homestore_backend/replication_state_machine.cpp b/src/lib/homestore_backend/replication_state_machine.cpp index a508bad9..6d6d5e53 100644 --- a/src/lib/homestore_backend/replication_state_machine.cpp +++ b/src/lib/homestore_backend/replication_state_machine.cpp @@ -261,24 +261,22 @@ ReplicationStateMachine::get_blk_alloc_hints(sisl::blob const& header, 
uint32_t return homestore::blk_alloc_hints(); } -void ReplicationStateMachine::on_start_replace_member(const std::string& task_id, - const homestore::replica_member_info& member_out, - const homestore::replica_member_info& member_in, trace_id_t tid) { - home_object_->on_pg_start_replace_member(repl_dev()->group_id(), task_id, member_out, member_in, tid); +void ReplicationStateMachine::on_start_replace_member(const homestore::replace_member_ctx& ctx, + const std::vector< homestore::replica_id_t >& member_ids, + homestore::trace_id_t tid) { + home_object_->on_pg_start_replace_member(repl_dev()->group_id(), ctx, member_ids, tid); } -void ReplicationStateMachine::on_complete_replace_member(const std::string& task_id, - const homestore::replica_member_info& member_out, - const homestore::replica_member_info& member_in, - trace_id_t tid) { - home_object_->on_pg_complete_replace_member(repl_dev()->group_id(), task_id, member_out, member_in, tid); +void ReplicationStateMachine::on_complete_replace_member(const homestore::replace_member_ctx& ctx, + const std::vector< homestore::replica_id_t >& member_ids, + homestore::trace_id_t tid) { + home_object_->on_pg_complete_replace_member(repl_dev()->group_id(), ctx, member_ids, tid); } -void ReplicationStateMachine::on_clean_replace_member_task(const std::string& task_id, - const homestore::replica_member_info& member_out, - const homestore::replica_member_info& member_in, - trace_id_t tid) { - home_object_->on_pg_clean_replace_member_task(repl_dev()->group_id(), task_id, member_out, member_in, tid); +void ReplicationStateMachine::on_clean_replace_member_task(const homestore::replace_member_ctx& ctx, + const std::vector< homestore::replica_id_t >& member_ids, + homestore::trace_id_t tid) { + home_object_->on_pg_clean_replace_member_task(repl_dev()->group_id(), ctx, member_ids, tid); } void ReplicationStateMachine::on_destroy(const homestore::group_id_t& group_id) { diff --git 
a/src/lib/homestore_backend/replication_state_machine.hpp b/src/lib/homestore_backend/replication_state_machine.hpp index 77906c8a..6aafa173 100644 --- a/src/lib/homestore_backend/replication_state_machine.hpp +++ b/src/lib/homestore_backend/replication_state_machine.hpp @@ -176,16 +176,19 @@ class ReplicationStateMachine : public homestore::ReplDevListener { cintrusive< homestore::repl_req_ctx >& hs_ctx) override; /// @brief Called when start replacing an existing member with a new member - void on_start_replace_member(const std::string& task_id, const homestore::replica_member_info& member_out, - const homestore::replica_member_info& member_in, trace_id_t tid = 0) override; + void on_start_replace_member(const homestore::replace_member_ctx& ctx, + const std::vector< homestore::replica_id_t >& member_ids, + homestore::trace_id_t tid) override; /// @brief Called when complete replacing an existing member with a new member - void on_complete_replace_member(const std::string& task_id, const homestore::replica_member_info& member_out, - const homestore::replica_member_info& member_in, trace_id_t tid = 0) override; + void on_complete_replace_member(const homestore::replace_member_ctx& ctx, + const std::vector< homestore::replica_id_t >& member_ids, + homestore::trace_id_t tid) override; /// @brief Called when clean replace member task (rollback) - void on_clean_replace_member_task(const std::string& task_id, const homestore::replica_member_info& member_out, - const homestore::replica_member_info& member_in, trace_id_t tid = 0) override; + void on_clean_replace_member_task(const homestore::replace_member_ctx& ctx, + const std::vector< homestore::replica_id_t >& member_ids, + homestore::trace_id_t tid) override; /// @brief Called when the replica is being destroyed by nuraft; void on_destroy(const homestore::group_id_t& group_id) override;