From 8637a9a2d492be1b5fdbe5e9988b8269ffc27387 Mon Sep 17 00:00:00 2001 From: yuwmao Date: Tue, 27 Jan 2026 15:34:31 +0800 Subject: [PATCH] Refactor replace member - Provide the entire membership list in all PG move callbacks to avoid potential inconsistency between raft and the user's metadata. - Use the persisted JSON config instead of the raft server's in-memory config, because the log might be replayed during restart before the raft server is initialized. - When raft commits a config, it calls save_config before applying it to the raft server's in-memory config (reconfigure()). --- conanfile.py | 2 +- .../homestore/replication/repl_decls.h | 17 +++ src/include/homestore/replication/repl_dev.h | 20 ++- .../replication/repl_dev/raft_repl_dev.cpp | 142 +++++++++++------- src/lib/replication/repl_dev/raft_repl_dev.h | 17 --- src/tests/test_common/raft_repl_test_base.hpp | 27 ++-- src/tests/test_solo_repl_dev.cpp | 12 +- 7 files changed, 140 insertions(+), 97 deletions(-) diff --git a/conanfile.py b/conanfile.py index 637a67aed..6e5725a6d 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "7.3.3" + version = "7.4.0" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" diff --git a/src/include/homestore/replication/repl_decls.h b/src/include/homestore/replication/repl_decls.h index 9e8531a17..9202f6ce5 100644 --- a/src/include/homestore/replication/repl_decls.h +++ b/src/include/homestore/replication/repl_decls.h @@ -113,6 +113,23 @@ struct replace_member_task { replica_id_t replica_in; // The replica which is going to be added in place of replica_out }; +// Context for replace member operations (used in callbacks) +static constexpr size_t max_replace_member_task_id_len = 64; +struct replace_member_ctx { + char task_id[max_replace_member_task_id_len]; + replica_member_info replica_out; + replica_member_info replica_in; + + replace_member_ctx() = default; + replace_member_ctx(const std::string& id, 
const replica_member_info& out, const replica_member_info& in) { + auto len = std::min(id.length(), max_replace_member_task_id_len - 1); + std::strncpy(task_id, id.c_str(), len); + task_id[len] = '\0'; + replica_out = out; + replica_in = in; + } +}; + } // namespace homestore // hash function definitions diff --git a/src/include/homestore/replication/repl_dev.h b/src/include/homestore/replication/repl_dev.h index 58688f341..6070c473e 100644 --- a/src/include/homestore/replication/repl_dev.h +++ b/src/include/homestore/replication/repl_dev.h @@ -397,16 +397,22 @@ class ReplDevListener { virtual void on_destroy(const group_id_t& group_id) = 0; /// @brief Called when start replace member. - virtual void on_start_replace_member(const std::string& task_id, const replica_member_info& member_out, - const replica_member_info& member_in, trace_id_t tid) = 0; + /// @param ctx Replace member context with task_id and member info + /// @param member_ids Current complete membership ID list from raft config + virtual void on_start_replace_member(const replace_member_ctx& ctx, const std::vector< replica_id_t >& member_ids, + trace_id_t tid) = 0; /// @brief Called when complete replace member. - virtual void on_complete_replace_member(const std::string& task_id, const replica_member_info& member_out, - const replica_member_info& member_in, trace_id_t tid) = 0; + /// @param ctx Replace member context with task_id and member info + /// @param member_ids Current complete membership ID list from raft config + virtual void on_complete_replace_member(const replace_member_ctx& ctx, + const std::vector< replica_id_t >& member_ids, trace_id_t tid) = 0; /// @brief Called when clean replace member task (rollback). 
- virtual void on_clean_replace_member_task(const std::string& task_id, const replica_member_info& member_out, - const replica_member_info& member_in, trace_id_t tid) = 0; + /// @param ctx Replace member context with task_id and member info + /// @param member_ids Current complete membership ID list from raft config + virtual void on_clean_replace_member_task(const replace_member_ctx& ctx, + const std::vector< replica_id_t >& member_ids, trace_id_t tid) = 0; /// @brief Called when remove a member. virtual void on_remove_member(const replica_id_t& member, trace_id_t tid) = 0; @@ -453,7 +459,7 @@ class ReplDevListener { virtual void on_no_space_left(repl_lsn_t lsn, sisl::blob const& header) = 0; /// @brief when restart, after all the logs are replayed and before joining raft group, notify the upper layer - virtual void on_log_replay_done(const group_id_t& group_id) {}; + virtual void on_log_replay_done(const group_id_t& group_id){}; virtual void on_become_leader(const group_id_t& group_id) {}; diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index 63b052b31..69dba8343 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -4,7 +4,7 @@ #include #include #include - +#include #include #include #include @@ -24,8 +24,6 @@ #include "fetch_data_rpc_generated.h" #include -#include - namespace homestore { std::atomic< uint64_t > RaftReplDev::s_next_group_ordinal{1}; @@ -1815,13 +1813,34 @@ void RaftReplDev::start_replace_member(repl_req_ptr_t rreq) { RD_LOGI(rreq->traceID(), "Raft repl start_replace_member commit, task_id={} member_out={} member_in={}", ctx->task_id, boost::uuids::to_string(ctx->replica_out.id), boost::uuids::to_string(ctx->replica_in.id)); - m_listener->on_start_replace_member(ctx->task_id, ctx->replica_out, ctx->replica_in, rreq->traceID()); + // Get current membership ID list from raft config + std::vector< replica_id_t > member_ids; + 
auto config = load_config(); + RD_REL_ASSERT(config, "Unable to load raft config during start_replace_member"); + + auto config_lsn = static_cast< int64_t >(config->get_log_idx()); + if (rreq->lsn() <= config_lsn) { + // This config change was already applied, skip directly + RD_LOGI(rreq->traceID(), "Start replace member: rreq lsn={} <= config lsn={}, skipping (already applied)", + rreq->lsn(), config_lsn); + return; + } + for (auto const& srv : config->get_servers()) { + member_ids.push_back(boost::uuids::string_generator()(srv->get_endpoint())); + } + RD_LOGD(rreq->traceID(), + "Start replace member, rreq lsn={} > config lsn={}, passing {} member IDs from persisted config", + rreq->lsn(), config_lsn, member_ids.size()); + // record the replace_member intent std::unique_lock lg{m_sb_mtx}; std::strncpy(m_rd_sb->replace_member_task.task_id, ctx->task_id, max_replace_member_task_id_len); m_rd_sb->replace_member_task.replica_in = ctx->replica_in.id; m_rd_sb->replace_member_task.replica_out = ctx->replica_out.id; m_rd_sb.write(); + lg.unlock(); // Release lock before calling listener + + m_listener->on_start_replace_member(*ctx, member_ids, rreq->traceID()); } void RaftReplDev::complete_replace_member(repl_req_ptr_t rreq) { @@ -1830,7 +1849,24 @@ void RaftReplDev::complete_replace_member(repl_req_ptr_t rreq) { RD_LOGI(rreq->traceID(), "Raft repl complete_replace_member commit, task_id={} member_out={} member_in={}", task_id, boost::uuids::to_string(ctx->replica_out.id), boost::uuids::to_string(ctx->replica_in.id)); - m_listener->on_complete_replace_member(ctx->task_id, ctx->replica_out, ctx->replica_in, rreq->traceID()); + // Get current membership ID list from raft config + std::vector< replica_id_t > member_ids; + auto config = load_config(); + RD_REL_ASSERT(config, "Unable to load raft config during complete_replace_member"); + + auto config_lsn = static_cast< int64_t >(config->get_log_idx()); + if (rreq->lsn() <= config_lsn) { + // This config change was already 
applied, skip directly + RD_LOGI(rreq->traceID(), "Complete replace member: rreq lsn={} <= config lsn={}, skipping (already applied)", + rreq->lsn(), config_lsn); + return; + } + for (auto const& srv : config->get_servers()) { + member_ids.push_back(boost::uuids::string_generator()(srv->get_endpoint())); + } + RD_LOGD(rreq->traceID(), + "Complete replace member, rreq lsn={} > config lsn={}, passing {} member IDs from persisted config", + rreq->lsn(), config_lsn, member_ids.size()); // clear the replace_member intent std::unique_lock lg{m_sb_mtx}; @@ -1841,8 +1877,11 @@ void RaftReplDev::complete_replace_member(repl_req_ptr_t rreq) { m_rd_sb->replace_member_task.task_id); m_rd_sb->replace_member_task = replace_member_task_superblk{}; m_rd_sb.write(); + RD_LOGI(rreq->traceID(), "Raft repl replace_member_task has been cleared, task_id={}", task_id); } - RD_LOGI(rreq->traceID(), "Raft repl replace_member_task has been cleared."); + lg.unlock(); // Release lock before calling listener + + m_listener->on_complete_replace_member(*ctx, member_ids, rreq->traceID()); } void RaftReplDev::remove_member(repl_req_ptr_t rreq) { @@ -1855,49 +1894,49 @@ void RaftReplDev::clean_replace_member_task(repl_req_ptr_t rreq) { auto task_id = std::string(r_cast< const char* >(rreq->header().cbytes())); RD_LOGI(rreq->traceID(), "Raft repl clean_replace_member_task commit, task_id={}", task_id); - replica_member_info member_out; - replica_member_info member_in; + // Get current membership ID list from raft config + std::vector< replica_id_t > member_ids; + auto config = load_config(); + RD_REL_ASSERT(config, "Unable to load raft config during clean_replace_member_task"); + + auto config_lsn = static_cast< int64_t >(config->get_log_idx()); + if (rreq->lsn() <= config_lsn) { + // This config change was already applied, skip directly + RD_LOGI(rreq->traceID(), "Clean replace member task: rreq lsn={} <= config lsn={}, skipping (already applied)", + rreq->lsn(), config_lsn); + return; + } + for 
(auto const& srv : config->get_servers()) { + member_ids.push_back(boost::uuids::string_generator()(srv->get_endpoint())); + } + RD_LOGD(rreq->traceID(), + "Clean replace member task, rreq lsn={} > config lsn={}, passing {} member IDs from persisted config", + rreq->lsn(), config_lsn, member_ids.size()); - // Step 1: Check and read member info from superblk - { - std::unique_lock lg{m_sb_mtx}; - auto persisted_task_id = get_replace_member_task_id(); - if (persisted_task_id.empty()) { - RD_LOGI(rreq->traceID(), "Raft repl clean_replace_member_task: task not found, task_id={}", task_id); - return; - } + // Build context from persisted task info + std::unique_lock lg{m_sb_mtx}; + auto persisted_task_id = get_replace_member_task_id(); + if (!persisted_task_id.empty()) { + RD_DBG_ASSERT(persisted_task_id == task_id, + "Invalid task_id in clean_replace_member_task message, received {}, persisted {}", task_id, + persisted_task_id); - if (persisted_task_id != task_id) { - RD_LOGW(rreq->traceID(), - "Raft repl clean_replace_member_task: task_id mismatch, received={}, persisted={}, skip cleaning", - task_id, persisted_task_id); - return; - } + // Build ctx from superblock + replace_member_ctx ctx; + std::strncpy(ctx.task_id, task_id.c_str(), std::min(task_id.length(), max_replace_member_task_id_len - 1)); + ctx.task_id[std::min(task_id.length(), max_replace_member_task_id_len - 1)] = '\0'; + ctx.replica_out.id = m_rd_sb->replace_member_task.replica_out; + ctx.replica_in.id = m_rd_sb->replace_member_task.replica_in; - // Read member info from superblk - member_out.id = m_rd_sb->replace_member_task.replica_out; - member_in.id = m_rd_sb->replace_member_task.replica_in; - } + m_rd_sb->replace_member_task = replace_member_task_superblk{}; + m_rd_sb.write(); - // Step 2: Call listener callback to rollback membership in HomeObject - if (member_out.id != boost::uuids::nil_uuid() && member_in.id != boost::uuids::nil_uuid()) { - RD_LOGI(rreq->traceID(), - "Raft repl 
clean_replace_member_task, callback to listener, task_id={}, member_out={}, member_in={}", - task_id, boost::uuids::to_string(member_out.id), boost::uuids::to_string(member_in.id)); - m_listener->on_clean_replace_member_task(task_id, member_out, member_in, rreq->traceID()); - } else { - RD_LOGW(rreq->traceID(), "Raft repl clean_replace_member_task: invalid member info, skip callback"); - } + RD_LOGI(rreq->traceID(), "Raft repl replace_member_task has been cleared, task_id={}", task_id); + lg.unlock(); // Release lock before calling listener - // Step 3: Clear the replace_member task from superblk - { - std::unique_lock lg{m_sb_mtx}; - auto persisted_task_id = get_replace_member_task_id(); - if (!persisted_task_id.empty()) { - m_rd_sb->replace_member_task = replace_member_task_superblk{}; - m_rd_sb.write(); - RD_LOGI(rreq->traceID(), "Raft repl replace_member_task has been cleared, task_id={}", task_id); - } + m_listener->on_clean_replace_member_task(ctx, member_ids, rreq->traceID()); + } else { + RD_LOGW(rreq->traceID(), "No persisted task found for clean, task_id={}", task_id); } } @@ -2018,19 +2057,14 @@ std::vector< peer_info > RaftReplDev::get_replication_status() const { std::vector< replica_id_t > RaftReplDev::get_replication_quorum() { std::vector< replica_id_t > member_ids; - auto msg_service = group_msg_service(); + auto config = load_config(); + RD_REL_ASSERT(config, "Unable to load raft config during get_replication_quorum"); - if (msg_service) { - std::list< nuraft_mesg::replica_config > cluster_config; - msg_service->get_cluster_config(cluster_config); - for (auto const& config : cluster_config) { - member_ids.push_back(boost::uuids::string_generator()(config.peer_id)); - } - RD_LOGD(NO_TRACE_ID, "get_replication_quorum: found {} members in cluster config", member_ids.size()); - } else { - RD_LOGW(NO_TRACE_ID, "get_replication_quorum: msg_service is null, returning empty member list"); + for (auto const& srv : config->get_servers()) { + 
member_ids.push_back(boost::uuids::string_generator()(srv->get_endpoint())); } + RD_LOGD(NO_TRACE_ID, "get_replication_quorum: found {} members in raft config", member_ids.size()); return member_ids; } diff --git a/src/lib/replication/repl_dev/raft_repl_dev.h b/src/lib/replication/repl_dev/raft_repl_dev.h index d97d44cba..06c34a188 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.h +++ b/src/lib/replication/repl_dev/raft_repl_dev.h @@ -16,8 +16,6 @@ namespace homestore { -static constexpr uint64_t max_replace_member_task_id_len = 64; - struct replace_member_task_superblk { char task_id[max_replace_member_task_id_len]; replica_id_t replica_out; @@ -43,21 +41,6 @@ struct raft_repl_dev_superblk : public repl_dev_superblk { using raft_buf_ptr_t = nuraft::ptr< nuraft::buffer >; using raft_cluster_config_ptr_t = nuraft::ptr< nuraft::cluster_config >; -struct replace_member_ctx { - char task_id[max_replace_member_task_id_len]; - replica_member_info replica_out; - replica_member_info replica_in; - - replace_member_ctx() = default; - replace_member_ctx(const std::string& id, const replica_member_info& out, const replica_member_info& in) { - auto len = std::min(id.length(), max_replace_member_task_id_len - 1); - std::strncpy(task_id, id.c_str(), len); - task_id[len] = '\0'; - replica_out = out; - replica_in = in; - } -}; - struct truncate_ctx { repl_lsn_t truncation_upper_limit = 0; diff --git a/src/tests/test_common/raft_repl_test_base.hpp b/src/tests/test_common/raft_repl_test_base.hpp index 3c313c452..e97cb22c8 100644 --- a/src/tests/test_common/raft_repl_test_base.hpp +++ b/src/tests/test_common/raft_repl_test_base.hpp @@ -362,22 +362,25 @@ class TestReplicatedDB : public homestore::ReplDevListener { return hints; } - void on_start_replace_member(const std::string& task_id, const replica_member_info& member_out, - const replica_member_info& member_in, trace_id_t tid) override { - LOGINFO("[Replica={}] start replace member out {} in {}", g_helper->replica_num(), - 
boost::uuids::to_string(member_out.id), boost::uuids::to_string(member_in.id)); + void on_start_replace_member(const replace_member_ctx& ctx, const std::vector< replica_id_t >& member_ids, + trace_id_t tid) override { + LOGINFO("[Replica={}] start replace member task_id={} out {} in {}, member_count={}", g_helper->replica_num(), + ctx.task_id, boost::uuids::to_string(ctx.replica_out.id), boost::uuids::to_string(ctx.replica_in.id), + member_ids.size()); } - void on_complete_replace_member(const std::string& task_id, const replica_member_info& member_out, - const replica_member_info& member_in, trace_id_t tid) override { - LOGINFO("[Replica={}] complete replace member out {} in {}", g_helper->replica_num(), - boost::uuids::to_string(member_out.id), boost::uuids::to_string(member_in.id)); + void on_complete_replace_member(const replace_member_ctx& ctx, const std::vector< replica_id_t >& member_ids, + trace_id_t tid) override { + LOGINFO("[Replica={}] complete replace member task_id={} out {} in {}, member_count={}", + g_helper->replica_num(), ctx.task_id, boost::uuids::to_string(ctx.replica_out.id), + boost::uuids::to_string(ctx.replica_in.id), member_ids.size()); } - void on_clean_replace_member_task(const std::string& task_id, const replica_member_info& member_out, - const replica_member_info& member_in, trace_id_t tid) override { - LOGINFO("[Replica={}] clean replace member task {} out {} in {}", g_helper->replica_num(), task_id, - boost::uuids::to_string(member_out.id), boost::uuids::to_string(member_in.id)); + void on_clean_replace_member_task(const replace_member_ctx& ctx, const std::vector< replica_id_t >& member_ids, + trace_id_t tid) override { + LOGINFO("[Replica={}] clean replace member task task_id={} out {} in {}, member_count={}", + g_helper->replica_num(), ctx.task_id, boost::uuids::to_string(ctx.replica_out.id), + boost::uuids::to_string(ctx.replica_in.id), member_ids.size()); } void on_remove_member(const replica_id_t& member, trace_id_t tid) override 
{ diff --git a/src/tests/test_solo_repl_dev.cpp b/src/tests/test_solo_repl_dev.cpp index 9b64b6f24..da93ff359 100644 --- a/src/tests/test_solo_repl_dev.cpp +++ b/src/tests/test_solo_repl_dev.cpp @@ -133,12 +133,12 @@ class SoloReplDevTest : public testing::Test { cintrusive< repl_req_ctx >& ctx) override { LOGINFO("Received error={} on repl_dev", enum_name(error)); } - void on_start_replace_member(const std::string& task_id, const replica_member_info& member_out, - const replica_member_info& member_in, trace_id_t tid) override {} - void on_complete_replace_member(const std::string& task_id, const replica_member_info& member_out, - const replica_member_info& member_in, trace_id_t tid) override {} - void on_clean_replace_member_task(const std::string& task_id, const replica_member_info& member_out, - const replica_member_info& member_in, trace_id_t tid) override {} + void on_start_replace_member(const replace_member_ctx& ctx, const std::vector< replica_id_t >& member_ids, + trace_id_t tid) override {} + void on_complete_replace_member(const replace_member_ctx& ctx, const std::vector< replica_id_t >& member_ids, + trace_id_t tid) override {} + void on_clean_replace_member_task(const replace_member_ctx& ctx, const std::vector< replica_id_t >& member_ids, + trace_id_t tid) override {} void on_remove_member(const replica_id_t& member, trace_id_t tid) override {} void on_destroy(const group_id_t& group_id) override {} void notify_committed_lsn(int64_t lsn) override {}