Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def build_requirements(self):

def requirements(self):
self.requires("sisl/[^13.0]@oss/master", transitive_headers=True)
self.requires("homestore/[^7.3]@oss/master")
self.requires("homestore/[^7.4]@oss/master")
self.requires("iomgr/[^12.0]@oss/master")

def validate(self):
Expand Down
55 changes: 37 additions & 18 deletions src/lib/homestore_backend/hs_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -771,34 +771,41 @@ class HSHomeObject : public HomeObjectImpl {
* @brief Function invoked when start a member replacement
*
* @param group_id The group id of replication device.
* @param member_out Member which is removed from group
* @param member_in Member which is added to group
* @param ctx The replace member context containing task_id, member_out, member_in
* @param member_ids Complete list of member IDs from raft config (single source of truth)
* @param tid Trace ID
* */
void on_pg_start_replace_member(homestore::group_id_t group_id, const std::string& task_id,
const homestore::replica_member_info& member_out,
const homestore::replica_member_info& member_in, trace_id_t tid);
void on_pg_start_replace_member(homestore::group_id_t group_id, const homestore::replace_member_ctx& ctx,
const std::vector< homestore::replica_id_t >& member_ids,
homestore::trace_id_t tid);

/**
* @brief Function invoked when complete a member replacement
*
* IMPORTANT: This callback must be idempotent as it may be called multiple times during log replay.
*
* @param group_id The group id of replication device.
* @param member_out Member which is removed from group
* @param member_in Member which is added to group
* @param ctx The replace member context containing task_id, member_out, member_in
* @param member_ids Complete list of member IDs from consensus layer (single source of truth)
* @param tid Trace ID
* */
void on_pg_complete_replace_member(homestore::group_id_t group_id, const std::string& task_id,
const homestore::replica_member_info& member_out,
const homestore::replica_member_info& member_in, trace_id_t tid);
void on_pg_complete_replace_member(homestore::group_id_t group_id, const homestore::replace_member_ctx& ctx,
const std::vector< homestore::replica_id_t >& member_ids,
homestore::trace_id_t tid);

/**
* @brief Called when clean replace member task (rollback)
* @param group_id Group ID
* @param task_id Task ID
* @param member_out Member which should be restored to group
* @param member_in Member which should be removed from group
* */
void on_pg_clean_replace_member_task(homestore::group_id_t group_id, const std::string& task_id,
const homestore::replica_member_info& member_out,
const homestore::replica_member_info& member_in, trace_id_t tid);
*
* IMPORTANT: This callback must be idempotent as it may be called multiple times during log replay.
*
* @param group_id The group id of replication device.
* @param ctx The replace member context containing task_id, member_out, member_in
* @param member_ids Complete list of member IDs from consensus layer (single source of truth)
* @param tid Trace ID
*/
void on_pg_clean_replace_member_task(homestore::group_id_t group_id, const homestore::replace_member_ctx& ctx,
const std::vector< homestore::replica_id_t >& member_ids,
homestore::trace_id_t tid);

void on_remove_member(homestore::group_id_t group_id, const peer_id_t& member, trace_id_t tid = 0);

Expand Down Expand Up @@ -1068,6 +1075,18 @@ class HSHomeObject : public HomeObjectImpl {
}
homestore::replica_member_info to_replica_member_info(const PGMember& pg_member) const;
PGMember to_pg_member(const homestore::replica_member_info& replica_info) const;

/**
* @brief Reconcile PG membership with authoritative member ID list from replication layer
* Preserves existing member metadata (name, priority) where possible
*
* @param existing_members Current PG members (for metadata preservation)
* @param member_ids Authoritative member IDs from replication consensus
* @return New membership set reconciled with the authoritative list
*/
std::set<PGMember> reconcile_membership_with_config(
const std::set<PGMember>& existing_members,
const std::vector<homestore::replica_id_t>& member_ids);
};

class BlobIndexServiceCallbacks : public homestore::IndexServiceCallbacks {
Expand Down
143 changes: 71 additions & 72 deletions src/lib/homestore_backend/hs_pg_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,97 +339,107 @@ replica_member_info HSHomeObject::to_replica_member_info(const PGMember& pg_memb
return replica_info;
}

void HSHomeObject::on_pg_start_replace_member(group_id_t group_id, const std::string& task_id,
const replica_member_info& member_out,
const replica_member_info& member_in, trace_id_t tid) {
std::set< PGMember > HSHomeObject::reconcile_membership_with_config(const std::set< PGMember >& existing_members,
const std::vector< replica_id_t >& member_ids) {
std::set< PGMember > new_members;
for (auto const& id : member_ids) {
// Try to preserve existing member metadata (name, priority)
auto existing = existing_members.find(PGMember(id));
if (existing != existing_members.end()) {
new_members.insert(*existing); // Keep name and priority
} else {
// New member not in current membership, add with just ID
new_members.emplace(id);
}
}
return new_members;
}

void HSHomeObject::on_pg_start_replace_member(group_id_t group_id, const homestore::replace_member_ctx& ctx,
const std::vector< homestore::replica_id_t >& member_ids,
homestore::trace_id_t tid) {
std::unique_lock lck(_pg_lock);
for (const auto& iter : _pg_map) {
auto& pg = iter.second;
if (pg_repl_dev(*pg).group_id() == group_id) {
// Remove the old member and add the new member
auto hs_pg = static_cast< HSHomeObject::HS_PG* >(pg.get());
pg->pg_info_.members.emplace(std::move(to_pg_member(member_in)));
pg->pg_info_.members.emplace(std::move(to_pg_member(member_out)));

uint32_t i{0};
pg_members* sb_members = hs_pg->pg_sb_->get_pg_members_mutable();
for (auto const& m : pg->pg_info_.members) {
sb_members[i].id = m.id;
DEBUG_ASSERT(m.name.size() <= PGMember::max_name_len, "member name exceeds max len, name={}", m.name);
auto name_len = std::min(m.name.size(), PGMember::max_name_len);
std::strncpy(sb_members[i].name, m.name.c_str(), name_len);
sb_members[i].name[name_len] = '\0';
sb_members[i].priority = m.priority;
++i;
}
hs_pg->pg_sb_->num_dynamic_members = pg->pg_info_.members.size();
// Update the latest membership info to pg superblk.
hs_pg->pg_sb_.write();
LOGI("PG start replace member done, task_id={} member_out={} member_in={}, member_nums={}, trace_id={}",
task_id, boost::uuids::to_string(member_out.id), boost::uuids::to_string(member_in.id),
pg->pg_info_.members.size(), tid);

// Build membership from raft config
auto new_members = reconcile_membership_with_config(hs_pg->pg_info_.members, member_ids);

// Start phase: add replica_in with full metadata from ctx, on_pg_start_replace_member is called before the new member is added to the group.
new_members.emplace(to_pg_member(ctx.replica_in));

// Apply to PG
hs_pg->pg_info_.members = std::move(new_members);
hs_pg->update_membership(hs_pg->pg_info_.members);

LOGI("PG start replace member done: task_id={} member_out={} member_in={}, member_nums={}, trace_id={}",
ctx.task_id, boost::uuids::to_string(ctx.replica_out.id), boost::uuids::to_string(ctx.replica_in.id),
hs_pg->pg_info_.members.size(), tid);
return;
}
}

LOGE("PG replace member failed task_id={}, member_out={} member_in={}, trace_id={}", task_id,
boost::uuids::to_string(member_out.id), boost::uuids::to_string(member_in.id), tid);
LOGE("PG start replace member failed, group_id not found, task_id={}, member_out={} member_in={}, trace_id={}",
ctx.task_id, boost::uuids::to_string(ctx.replica_out.id), boost::uuids::to_string(ctx.replica_in.id), tid);
}

void HSHomeObject::on_pg_complete_replace_member(group_id_t group_id, const std::string& task_id,
const replica_member_info& member_out,
const replica_member_info& member_in, trace_id_t tid) {
void HSHomeObject::on_pg_complete_replace_member(group_id_t group_id, const homestore::replace_member_ctx& ctx,
const std::vector< homestore::replica_id_t >& member_ids,
homestore::trace_id_t tid) {
std::unique_lock lck(_pg_lock);
for (const auto& iter : _pg_map) {
auto& pg = iter.second;
if (pg_repl_dev(*pg).group_id() == group_id) {
// Remove the old member and add the new member
auto hs_pg = static_cast< HSHomeObject::HS_PG* >(pg.get());
pg->pg_info_.members.erase(PGMember(member_out.id));
pg->pg_info_.members.emplace(std::move(to_pg_member(member_in)));
hs_pg->update_membership(pg->pg_info_.members);
LOGI("PG complete replace member done member_out={} member_in={}, member_nums={}, trace_id={}",
boost::uuids::to_string(member_out.id), boost::uuids::to_string(member_in.id),
pg->pg_info_.members.size(), tid);

// Build membership from raft config
auto new_members = reconcile_membership_with_config(hs_pg->pg_info_.members, member_ids);

// Complete phase: ensure replica_out is removed (defense in depth), on_pg_complete_replace_member should be called after the replica_out is verified to be removed.
new_members.erase(PGMember(ctx.replica_out.id));

// Apply to PG
hs_pg->pg_info_.members = std::move(new_members);
hs_pg->update_membership(hs_pg->pg_info_.members);

LOGI("PG complete replace member done: member_out={} member_in={}, member_nums={}, trace_id={}",
boost::uuids::to_string(ctx.replica_out.id), boost::uuids::to_string(ctx.replica_in.id),
hs_pg->pg_info_.members.size(), tid);
return;
}
}
LOGE("PG complete replace member failed member_out={} member_in={}, trace_id={}",
boost::uuids::to_string(member_out.id), boost::uuids::to_string(member_in.id), tid);
LOGE("PG complete replace member failed, group_id not found, member_out={} member_in={}, trace_id={}",
boost::uuids::to_string(ctx.replica_out.id), boost::uuids::to_string(ctx.replica_in.id), tid);
}

//This function actually perform rollback for replace member task: Remove in_member, and ensure out_member exists
void HSHomeObject::on_pg_clean_replace_member_task(group_id_t group_id, const std::string& task_id,
const replica_member_info& member_out,
const replica_member_info& member_in, trace_id_t tid) {
//It will reconcile the membership with raft in clean/rollback phase.
void HSHomeObject::on_pg_clean_replace_member_task(group_id_t group_id, const homestore::replace_member_ctx& ctx,
const std::vector< homestore::replica_id_t >& member_ids,
homestore::trace_id_t tid) {
std::unique_lock lck(_pg_lock);
for (const auto& iter : _pg_map) {
auto& pg = iter.second;
if (pg_repl_dev(*pg).group_id() == group_id) {
auto hs_pg = static_cast< HSHomeObject::HS_PG* >(pg.get());

// Remove the in_member (the one that was added but now needs to be removed)
auto removed_count = pg->pg_info_.members.erase(PGMember(member_in.id));

// Ensure out_member exists
// Using emplace is safe - it won't overwrite if already exists
auto out_pg_member = to_pg_member(member_out);
auto [it, inserted] = pg->pg_info_.members.emplace(std::move(out_pg_member));
// Clean phase: sync membership to raft config (rofllback scenario)
auto new_members = reconcile_membership_with_config(hs_pg->pg_info_.members, member_ids);

// Update superblock
hs_pg->update_membership(pg->pg_info_.members);
// Apply to PG
hs_pg->pg_info_.members = std::move(new_members);
hs_pg->update_membership(hs_pg->pg_info_.members);

LOGI("PG clean replace member task done (rollback), task_id={}, removed in_member={} (removed={}), "
"ensured out_member={} (inserted={}), member_nums={}, trace_id={}",
task_id, boost::uuids::to_string(member_in.id), removed_count,
boost::uuids::to_string(member_out.id), inserted, pg->pg_info_.members.size(), tid);
LOGI("PG clean replace member task done: task_id={}, member_nums={}, trace_id={}",
ctx.task_id, hs_pg->pg_info_.members.size(), tid);
return;
}
}
LOGE("PG clean replace member task failed, pg not found, task_id={}, member_out={}, member_in={}, trace_id={}", task_id,
boost::uuids::to_string(member_out.id), boost::uuids::to_string(member_in.id), tid);
LOGE("PG clean replace member task failed, group_id not found, task_id={}, trace_id={}", ctx.task_id, tid);
}


bool HSHomeObject::reconcile_membership(pg_id_t pg_id) {
std::unique_lock lck(_pg_lock);
auto hs_pg = const_cast< HS_PG* >(_get_hs_pg_unlocked(pg_id));
Expand All @@ -446,20 +456,9 @@ bool HSHomeObject::reconcile_membership(pg_id_t pg_id) {
return false;
}

// Build new member set from actual members
std::set< PGMember > new_members;
for (auto& member_id : actual_members) {
auto existing = hs_pg->pg_info_.members.find(PGMember(member_id));
if (existing != hs_pg->pg_info_.members.end()) {
// Keep the existing member with its name and priority
new_members.insert(*existing);
} else {
// New member not in our records, add with default name
PGMember new_member(member_id);
new_members.insert(std::move(new_member));
LOGE("Adding new member {} to pg={} membership, should not happen!", boost::uuids::to_string(member_id), hs_pg->pg_info_.id);
}
}
// Build new member set by reconciling with actual members from replication layer
auto new_members = reconcile_membership_with_config(hs_pg->pg_info_.members, actual_members);

// Check if membership changed
if (new_members == hs_pg->pg_info_.members) {
LOGD("Membership already in sync for pg={}, no update needed", hs_pg->pg_info_.id);
Expand Down Expand Up @@ -606,7 +605,7 @@ PGManager::NullAsyncResult HSHomeObject::_remove_member(pg_id_t pg_id, peer_id_t

LOGI("PG remove member, pg_id={}, member={} trace_id={}", pg_id, boost::uuids::to_string(member_id), tid);
return hs_repl_service()
.remove_member(group_id, member_id, commit_quorum, tid)
.remove_member(group_id, member_id, commit_quorum, true, tid)
.via(executor_)
.thenValue([this](auto&& v) mutable -> PGManager::NullAsyncResult {
decr_pending_request_num();
Expand Down
26 changes: 12 additions & 14 deletions src/lib/homestore_backend/replication_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,24 +261,22 @@ ReplicationStateMachine::get_blk_alloc_hints(sisl::blob const& header, uint32_t
return homestore::blk_alloc_hints();
}

void ReplicationStateMachine::on_start_replace_member(const std::string& task_id,
const homestore::replica_member_info& member_out,
const homestore::replica_member_info& member_in, trace_id_t tid) {
home_object_->on_pg_start_replace_member(repl_dev()->group_id(), task_id, member_out, member_in, tid);
void ReplicationStateMachine::on_start_replace_member(const homestore::replace_member_ctx& ctx,
const std::vector< homestore::replica_id_t >& member_ids,
homestore::trace_id_t tid) {
home_object_->on_pg_start_replace_member(repl_dev()->group_id(), ctx, member_ids, tid);
}

void ReplicationStateMachine::on_complete_replace_member(const std::string& task_id,
const homestore::replica_member_info& member_out,
const homestore::replica_member_info& member_in,
trace_id_t tid) {
home_object_->on_pg_complete_replace_member(repl_dev()->group_id(), task_id, member_out, member_in, tid);
void ReplicationStateMachine::on_complete_replace_member(const homestore::replace_member_ctx& ctx,
const std::vector< homestore::replica_id_t >& member_ids,
homestore::trace_id_t tid) {
home_object_->on_pg_complete_replace_member(repl_dev()->group_id(), ctx, member_ids, tid);
}

void ReplicationStateMachine::on_clean_replace_member_task(const std::string& task_id,
const homestore::replica_member_info& member_out,
const homestore::replica_member_info& member_in,
trace_id_t tid) {
home_object_->on_pg_clean_replace_member_task(repl_dev()->group_id(), task_id, member_out, member_in, tid);
void ReplicationStateMachine::on_clean_replace_member_task(const homestore::replace_member_ctx& ctx,
const std::vector< homestore::replica_id_t >& member_ids,
homestore::trace_id_t tid) {
home_object_->on_pg_clean_replace_member_task(repl_dev()->group_id(), ctx, member_ids, tid);
}

void ReplicationStateMachine::on_destroy(const homestore::group_id_t& group_id) {
Expand Down
Loading
Loading