Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "7.3.3"
version = "7.4.0"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
17 changes: 17 additions & 0 deletions src/include/homestore/replication/repl_decls.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,23 @@ struct replace_member_task {
replica_id_t replica_in; // The replica which is going to be added in place of replica_out
};

// Context for replace member operations (used in callbacks)
static constexpr size_t max_replace_member_task_id_len = 64;
struct replace_member_ctx {
char task_id[max_replace_member_task_id_len];
replica_member_info replica_out;
replica_member_info replica_in;

replace_member_ctx() = default;
replace_member_ctx(const std::string& id, const replica_member_info& out, const replica_member_info& in) {
auto len = std::min(id.length(), max_replace_member_task_id_len - 1);
std::strncpy(task_id, id.c_str(), len);
task_id[len] = '\0';
replica_out = out;
replica_in = in;
}
};

} // namespace homestore

// hash function definitions
Expand Down
20 changes: 13 additions & 7 deletions src/include/homestore/replication/repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -397,16 +397,22 @@ class ReplDevListener {
virtual void on_destroy(const group_id_t& group_id) = 0;

/// @brief Called when start replace member.
virtual void on_start_replace_member(const std::string& task_id, const replica_member_info& member_out,
const replica_member_info& member_in, trace_id_t tid) = 0;
/// @param ctx Replace member context with task_id and member info
/// @param member_ids Current complete membership ID list from raft config
virtual void on_start_replace_member(const replace_member_ctx& ctx, const std::vector< replica_id_t >& member_ids,
trace_id_t tid) = 0;

/// @brief Called when complete replace member.
virtual void on_complete_replace_member(const std::string& task_id, const replica_member_info& member_out,
const replica_member_info& member_in, trace_id_t tid) = 0;
/// @param ctx Replace member context with task_id and member info
/// @param member_ids Current complete membership ID list from raft config
virtual void on_complete_replace_member(const replace_member_ctx& ctx,
const std::vector< replica_id_t >& member_ids, trace_id_t tid) = 0;

/// @brief Called when clean replace member task (rollback).
virtual void on_clean_replace_member_task(const std::string& task_id, const replica_member_info& member_out,
const replica_member_info& member_in, trace_id_t tid) = 0;
/// @param ctx Replace member context with task_id and member info
/// @param member_ids Current complete membership ID list from raft config
virtual void on_clean_replace_member_task(const replace_member_ctx& ctx,
const std::vector< replica_id_t >& member_ids, trace_id_t tid) = 0;

/// @brief Called when remove a member.
virtual void on_remove_member(const replica_id_t& member, trace_id_t tid) = 0;
Expand Down Expand Up @@ -453,7 +459,7 @@ class ReplDevListener {
virtual void on_no_space_left(repl_lsn_t lsn, sisl::blob const& header) = 0;

/// @brief when restart, after all the logs are replayed and before joining raft group, notify the upper layer
virtual void on_log_replay_done(const group_id_t& group_id) {};
virtual void on_log_replay_done(const group_id_t& group_id){};

virtual void on_become_leader(const group_id_t& group_id) {};

Expand Down
142 changes: 88 additions & 54 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#include <iomgr/iomgr_flip.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/uuid/nil_generator.hpp>

#include <boost/uuid/string_generator.hpp>
#include <sisl/fds/buffer.hpp>
#include <sisl/grpc/generic_service.hpp>
#include <sisl/grpc/rpc_client.hpp>
Expand All @@ -24,8 +24,6 @@
#include "fetch_data_rpc_generated.h"
#include <nuraft_mesg/common.hpp>

#include <boost/uuid/string_generator.hpp>

namespace homestore {
std::atomic< uint64_t > RaftReplDev::s_next_group_ordinal{1};

Expand Down Expand Up @@ -1815,13 +1813,34 @@ void RaftReplDev::start_replace_member(repl_req_ptr_t rreq) {
RD_LOGI(rreq->traceID(), "Raft repl start_replace_member commit, task_id={} member_out={} member_in={}",
ctx->task_id, boost::uuids::to_string(ctx->replica_out.id), boost::uuids::to_string(ctx->replica_in.id));

m_listener->on_start_replace_member(ctx->task_id, ctx->replica_out, ctx->replica_in, rreq->traceID());
// Get current membership ID list from raft config
std::vector< replica_id_t > member_ids;
auto config = load_config();
RD_REL_ASSERT(config, "Unable to load raft config during start_replace_member");

auto config_lsn = static_cast< int64_t >(config->get_log_idx());
if (rreq->lsn() <= config_lsn) {
// This config change was already applied, skip directly
RD_LOGI(rreq->traceID(), "Start replace member: rreq lsn={} <= config lsn={}, skipping (already applied)",
rreq->lsn(), config_lsn);
return;
}
for (auto const& srv : config->get_servers()) {
member_ids.push_back(boost::uuids::string_generator()(srv->get_endpoint()));
}
RD_LOGD(rreq->traceID(),
"Start replace member, rreq lsn={} > config lsn={}, passing {} member IDs from persisted config",
rreq->lsn(), config_lsn, member_ids.size());

// record the replace_member intent
std::unique_lock lg{m_sb_mtx};
std::strncpy(m_rd_sb->replace_member_task.task_id, ctx->task_id, max_replace_member_task_id_len);
m_rd_sb->replace_member_task.replica_in = ctx->replica_in.id;
m_rd_sb->replace_member_task.replica_out = ctx->replica_out.id;
m_rd_sb.write();
lg.unlock(); // Release lock before calling listener

m_listener->on_start_replace_member(*ctx, member_ids, rreq->traceID());
}

void RaftReplDev::complete_replace_member(repl_req_ptr_t rreq) {
Expand All @@ -1830,7 +1849,24 @@ void RaftReplDev::complete_replace_member(repl_req_ptr_t rreq) {
RD_LOGI(rreq->traceID(), "Raft repl complete_replace_member commit, task_id={} member_out={} member_in={}", task_id,
boost::uuids::to_string(ctx->replica_out.id), boost::uuids::to_string(ctx->replica_in.id));

m_listener->on_complete_replace_member(ctx->task_id, ctx->replica_out, ctx->replica_in, rreq->traceID());
// Get current membership ID list from raft config
std::vector< replica_id_t > member_ids;
auto config = load_config();
RD_REL_ASSERT(config, "Unable to load raft config during complete_replace_member");

auto config_lsn = static_cast< int64_t >(config->get_log_idx());
if (rreq->lsn() <= config_lsn) {
// This config change was already applied, skip directly
RD_LOGI(rreq->traceID(), "Complete replace member: rreq lsn={} <= config lsn={}, skipping (already applied)",
rreq->lsn(), config_lsn);
return;
}
for (auto const& srv : config->get_servers()) {
member_ids.push_back(boost::uuids::string_generator()(srv->get_endpoint()));
}
RD_LOGD(rreq->traceID(),
"Complete replace member, rreq lsn={} > config lsn={}, passing {} member IDs from persisted config",
rreq->lsn(), config_lsn, member_ids.size());

// clear the replace_member intent
std::unique_lock lg{m_sb_mtx};
Expand All @@ -1841,8 +1877,11 @@ void RaftReplDev::complete_replace_member(repl_req_ptr_t rreq) {
m_rd_sb->replace_member_task.task_id);
m_rd_sb->replace_member_task = replace_member_task_superblk{};
m_rd_sb.write();
RD_LOGI(rreq->traceID(), "Raft repl replace_member_task has been cleared, task_id={}", task_id);
}
RD_LOGI(rreq->traceID(), "Raft repl replace_member_task has been cleared.");
lg.unlock(); // Release lock before calling listener

m_listener->on_complete_replace_member(*ctx, member_ids, rreq->traceID());
}

void RaftReplDev::remove_member(repl_req_ptr_t rreq) {
Expand All @@ -1855,49 +1894,49 @@ void RaftReplDev::clean_replace_member_task(repl_req_ptr_t rreq) {
auto task_id = std::string(r_cast< const char* >(rreq->header().cbytes()));
RD_LOGI(rreq->traceID(), "Raft repl clean_replace_member_task commit, task_id={}", task_id);

replica_member_info member_out;
replica_member_info member_in;
// Get current membership ID list from raft config
std::vector< replica_id_t > member_ids;
auto config = load_config();
RD_REL_ASSERT(config, "Unable to load raft config during clean_replace_member_task");

auto config_lsn = static_cast< int64_t >(config->get_log_idx());
if (rreq->lsn() <= config_lsn) {
// This config change was already applied, skip directly
RD_LOGI(rreq->traceID(), "Clean replace member task: rreq lsn={} <= config lsn={}, skipping (already applied)",
rreq->lsn(), config_lsn);
return;
}
for (auto const& srv : config->get_servers()) {
member_ids.push_back(boost::uuids::string_generator()(srv->get_endpoint()));
}
RD_LOGD(rreq->traceID(),
"Clean replace member task, rreq lsn={} > config lsn={}, passing {} member IDs from persisted config",
rreq->lsn(), config_lsn, member_ids.size());

// Step 1: Check and read member info from superblk
{
std::unique_lock lg{m_sb_mtx};
auto persisted_task_id = get_replace_member_task_id();
if (persisted_task_id.empty()) {
RD_LOGI(rreq->traceID(), "Raft repl clean_replace_member_task: task not found, task_id={}", task_id);
return;
}
// Build context from persisted task info
std::unique_lock lg{m_sb_mtx};
auto persisted_task_id = get_replace_member_task_id();
if (!persisted_task_id.empty()) {
RD_DBG_ASSERT(persisted_task_id == task_id,
"Invalid task_id in clean_replace_member_task message, received {}, persisted {}", task_id,
persisted_task_id);

if (persisted_task_id != task_id) {
RD_LOGW(rreq->traceID(),
"Raft repl clean_replace_member_task: task_id mismatch, received={}, persisted={}, skip cleaning",
task_id, persisted_task_id);
return;
}
// Build ctx from superblock
replace_member_ctx ctx;
std::strncpy(ctx.task_id, task_id.c_str(), std::min(task_id.length(), max_replace_member_task_id_len - 1));
ctx.task_id[std::min(task_id.length(), max_replace_member_task_id_len - 1)] = '\0';
ctx.replica_out.id = m_rd_sb->replace_member_task.replica_out;
ctx.replica_in.id = m_rd_sb->replace_member_task.replica_in;

// Read member info from superblk
member_out.id = m_rd_sb->replace_member_task.replica_out;
member_in.id = m_rd_sb->replace_member_task.replica_in;
}
m_rd_sb->replace_member_task = replace_member_task_superblk{};
m_rd_sb.write();

// Step 2: Call listener callback to rollback membership in HomeObject
if (member_out.id != boost::uuids::nil_uuid() && member_in.id != boost::uuids::nil_uuid()) {
RD_LOGI(rreq->traceID(),
"Raft repl clean_replace_member_task, callback to listener, task_id={}, member_out={}, member_in={}",
task_id, boost::uuids::to_string(member_out.id), boost::uuids::to_string(member_in.id));
m_listener->on_clean_replace_member_task(task_id, member_out, member_in, rreq->traceID());
} else {
RD_LOGW(rreq->traceID(), "Raft repl clean_replace_member_task: invalid member info, skip callback");
}
RD_LOGI(rreq->traceID(), "Raft repl replace_member_task has been cleared, task_id={}", task_id);
lg.unlock(); // Release lock before calling listener

// Step 3: Clear the replace_member task from superblk
{
std::unique_lock lg{m_sb_mtx};
auto persisted_task_id = get_replace_member_task_id();
if (!persisted_task_id.empty()) {
m_rd_sb->replace_member_task = replace_member_task_superblk{};
m_rd_sb.write();
RD_LOGI(rreq->traceID(), "Raft repl replace_member_task has been cleared, task_id={}", task_id);
}
m_listener->on_clean_replace_member_task(ctx, member_ids, rreq->traceID());
} else {
RD_LOGW(rreq->traceID(), "No persisted task found for clean, task_id={}", task_id);
}
}

Expand Down Expand Up @@ -2018,19 +2057,14 @@ std::vector< peer_info > RaftReplDev::get_replication_status() const {

std::vector< replica_id_t > RaftReplDev::get_replication_quorum() {
std::vector< replica_id_t > member_ids;
auto msg_service = group_msg_service();
auto config = load_config();
RD_REL_ASSERT(config, "Unable to load raft config during get_replication_quorum");

if (msg_service) {
std::list< nuraft_mesg::replica_config > cluster_config;
msg_service->get_cluster_config(cluster_config);
for (auto const& config : cluster_config) {
member_ids.push_back(boost::uuids::string_generator()(config.peer_id));
}
RD_LOGD(NO_TRACE_ID, "get_replication_quorum: found {} members in cluster config", member_ids.size());
} else {
RD_LOGW(NO_TRACE_ID, "get_replication_quorum: msg_service is null, returning empty member list");
for (auto const& srv : config->get_servers()) {
member_ids.push_back(boost::uuids::string_generator()(srv->get_endpoint()));
}

RD_LOGD(NO_TRACE_ID, "get_replication_quorum: found {} members in raft config", member_ids.size());
return member_ids;
}

Expand Down
17 changes: 0 additions & 17 deletions src/lib/replication/repl_dev/raft_repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

namespace homestore {

static constexpr uint64_t max_replace_member_task_id_len = 64;

struct replace_member_task_superblk {
char task_id[max_replace_member_task_id_len];
replica_id_t replica_out;
Expand All @@ -43,21 +41,6 @@ struct raft_repl_dev_superblk : public repl_dev_superblk {
using raft_buf_ptr_t = nuraft::ptr< nuraft::buffer >;
using raft_cluster_config_ptr_t = nuraft::ptr< nuraft::cluster_config >;

struct replace_member_ctx {
char task_id[max_replace_member_task_id_len];
replica_member_info replica_out;
replica_member_info replica_in;

replace_member_ctx() = default;
replace_member_ctx(const std::string& id, const replica_member_info& out, const replica_member_info& in) {
auto len = std::min(id.length(), max_replace_member_task_id_len - 1);
std::strncpy(task_id, id.c_str(), len);
task_id[len] = '\0';
replica_out = out;
replica_in = in;
}
};

struct truncate_ctx {
repl_lsn_t truncation_upper_limit = 0;

Expand Down
27 changes: 15 additions & 12 deletions src/tests/test_common/raft_repl_test_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -362,22 +362,25 @@ class TestReplicatedDB : public homestore::ReplDevListener {
return hints;
}

void on_start_replace_member(const std::string& task_id, const replica_member_info& member_out,
const replica_member_info& member_in, trace_id_t tid) override {
LOGINFO("[Replica={}] start replace member out {} in {}", g_helper->replica_num(),
boost::uuids::to_string(member_out.id), boost::uuids::to_string(member_in.id));
void on_start_replace_member(const replace_member_ctx& ctx, const std::vector< replica_id_t >& member_ids,
trace_id_t tid) override {
LOGINFO("[Replica={}] start replace member task_id={} out {} in {}, member_count={}", g_helper->replica_num(),
ctx.task_id, boost::uuids::to_string(ctx.replica_out.id), boost::uuids::to_string(ctx.replica_in.id),
member_ids.size());
}

void on_complete_replace_member(const std::string& task_id, const replica_member_info& member_out,
const replica_member_info& member_in, trace_id_t tid) override {
LOGINFO("[Replica={}] complete replace member out {} in {}", g_helper->replica_num(),
boost::uuids::to_string(member_out.id), boost::uuids::to_string(member_in.id));
void on_complete_replace_member(const replace_member_ctx& ctx, const std::vector< replica_id_t >& member_ids,
trace_id_t tid) override {
LOGINFO("[Replica={}] complete replace member task_id={} out {} in {}, member_count={}",
g_helper->replica_num(), ctx.task_id, boost::uuids::to_string(ctx.replica_out.id),
boost::uuids::to_string(ctx.replica_in.id), member_ids.size());
}

void on_clean_replace_member_task(const std::string& task_id, const replica_member_info& member_out,
const replica_member_info& member_in, trace_id_t tid) override {
LOGINFO("[Replica={}] clean replace member task {} out {} in {}", g_helper->replica_num(), task_id,
boost::uuids::to_string(member_out.id), boost::uuids::to_string(member_in.id));
void on_clean_replace_member_task(const replace_member_ctx& ctx, const std::vector< replica_id_t >& member_ids,
trace_id_t tid) override {
LOGINFO("[Replica={}] clean replace member task task_id={} out {} in {}, member_count={}",
g_helper->replica_num(), ctx.task_id, boost::uuids::to_string(ctx.replica_out.id),
boost::uuids::to_string(ctx.replica_in.id), member_ids.size());
}

void on_remove_member(const replica_id_t& member, trace_id_t tid) override {
Expand Down
12 changes: 6 additions & 6 deletions src/tests/test_solo_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,12 @@ class SoloReplDevTest : public testing::Test {
cintrusive< repl_req_ctx >& ctx) override {
LOGINFO("Received error={} on repl_dev", enum_name(error));
}
void on_start_replace_member(const std::string& task_id, const replica_member_info& member_out,
const replica_member_info& member_in, trace_id_t tid) override {}
void on_complete_replace_member(const std::string& task_id, const replica_member_info& member_out,
const replica_member_info& member_in, trace_id_t tid) override {}
void on_clean_replace_member_task(const std::string& task_id, const replica_member_info& member_out,
const replica_member_info& member_in, trace_id_t tid) override {}
void on_start_replace_member(const replace_member_ctx& ctx, const std::vector< replica_id_t >& member_ids,
trace_id_t tid) override {}
void on_complete_replace_member(const replace_member_ctx& ctx, const std::vector< replica_id_t >& member_ids,
trace_id_t tid) override {}
void on_clean_replace_member_task(const replace_member_ctx& ctx, const std::vector< replica_id_t >& member_ids,
trace_id_t tid) override {}
void on_remove_member(const replica_id_t& member, trace_id_t tid) override {}
void on_destroy(const group_id_t& group_id) override {}
void notify_committed_lsn(int64_t lsn) override {}
Expand Down
Loading