Skip to content

Commit

Permalink
Add replace member changes. Perist membership details.
Browse files Browse the repository at this point in the history
  • Loading branch information
sanebay committed Oct 1, 2024
1 parent d22f806 commit ee75415
Show file tree
Hide file tree
Showing 9 changed files with 84 additions and 33 deletions.
3 changes: 2 additions & 1 deletion src/include/homeobject/pg_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ struct PGStats {
class PGManager : public Manager< PGError > {
public:
virtual NullAsyncResult create_pg(PGInfo&& pg_info) = 0;
virtual NullAsyncResult replace_member(pg_id_t id, peer_id_t const& old_member, PGMember const& new_member) = 0;
virtual NullAsyncResult replace_member(pg_id_t id, peer_id_t const& old_member, PGMember const& new_member,
uint32_t commit_quorum = 0) = 0;

/**
* Retrieves the statistics for a specific PG (Placement Group) identified by its ID.
Expand Down
6 changes: 3 additions & 3 deletions src/lib/homeobject_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class HomeObjectImpl : public HomeObject,

virtual PGManager::NullAsyncResult _create_pg(PGInfo&& pg_info, std::set< peer_id_t > const& peers) = 0;
virtual PGManager::NullAsyncResult _replace_member(pg_id_t id, peer_id_t const& old_member,
PGMember const& new_member) = 0;
PGMember const& new_member, uint32_t commit_quorum) = 0;
virtual bool _get_stats(pg_id_t id, PGStats& stats) const = 0;
virtual void _get_pg_ids(std::vector< pg_id_t >& pg_ids) const = 0;

Expand Down Expand Up @@ -144,8 +144,8 @@ class HomeObjectImpl : public HomeObject,

/// PgManager
PGManager::NullAsyncResult create_pg(PGInfo&& pg_info) final;
PGManager::NullAsyncResult replace_member(pg_id_t id, peer_id_t const& old_member,
PGMember const& new_member) final;
PGManager::NullAsyncResult replace_member(pg_id_t id, peer_id_t const& old_member, PGMember const& new_member,
uint32_t commit_quorum) final;
// see api comments in base class;
bool get_stats(pg_id_t id, PGStats& stats) const final;
void get_pg_ids(std::vector< pg_id_t >& pg_ids) const final;
Expand Down
13 changes: 11 additions & 2 deletions src/lib/homestore_backend/hs_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ class HSHomeObject : public HomeObjectImpl {
BlobManager::NullAsyncResult _del_blob(ShardInfo const&, blob_id_t) override;

PGManager::NullAsyncResult _create_pg(PGInfo&& pg_info, std::set< peer_id_t > const& peers) override;
PGManager::NullAsyncResult _replace_member(pg_id_t id, peer_id_t const& old_member,
PGMember const& new_member) override;
PGManager::NullAsyncResult _replace_member(pg_id_t id, peer_id_t const& old_member, PGMember const& new_member,
uint32_t commit_quorum) override;

bool _get_stats(pg_id_t id, PGStats& stats) const override;
void _get_pg_ids(std::vector< pg_id_t >& pg_ids) const override;
Expand Down Expand Up @@ -378,6 +378,15 @@ class HSHomeObject : public HomeObjectImpl {
void on_create_pg_message_commit(int64_t lsn, sisl::blob const& header, shared< homestore::ReplDev > repl_dev,
cintrusive< homestore::repl_req_ctx >& hs_ctx);

/**
* @brief Function invoked when a member is replaced by a new member
*
* @param repl_dev The replication device.
* @param old_member Member which is removed from group
* @param new_member Member which is added to group
* */
void on_pg_replace_member(pg_id_t pg_id, peer_id_t const& old_member, PGMember const& new_member);

/**
* @brief Callback function invoked when a message is committed on a shard.
*
Expand Down
52 changes: 49 additions & 3 deletions src/lib/homestore_backend/hs_pg_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,55 @@ void HSHomeObject::on_create_pg_message_commit(int64_t lsn, sisl::blob const& he
if (ctx) ctx->promise_.setValue(folly::Unit());
}

PGManager::NullAsyncResult HSHomeObject::_replace_member(pg_id_t id, peer_id_t const& old_member,
PGMember const& new_member) {
return folly::makeSemiFuture< PGManager::NullResult >(folly::makeUnexpected(PGError::UNSUPPORTED_OP));
PGManager::NullAsyncResult HSHomeObject::_replace_member(pg_id_t pg_id, peer_id_t const& old_member,
PGMember const& new_member, uint32_t commit_quorum) {

group_id_t group_id;
{
auto lg = std::shared_lock(_pg_lock);
auto iter = _pg_map.find(pg_id);
if (iter == _pg_map.end()) return folly::makeUnexpected(PGError::UNKNOWN_PG);
auto& repl_dev = pg_repl_dev(*iter->second);

if (!repl_dev.is_leader() && commit_quorum == 0) {
// Only leader can replace a member
return folly::makeUnexpected(PGError::NOT_LEADER);
}
group_id = repl_dev.group_id();
}

return hs_repl_service()
.replace_member(group_id, old_member, new_member.id, commit_quorum)
.via(executor_)
.thenValue([this, pg_id, old_member, new_member](auto&& v) mutable -> PGManager::NullAsyncResult {
if (v.hasError()) { return folly::makeUnexpected(toPgError(v.error())); }

on_pg_replace_member(pg_id, old_member, new_member);
return folly::Unit();
});
}

void HSHomeObject::on_pg_replace_member(pg_id_t pg_id, peer_id_t const& old_member, PGMember const& new_member) {
auto lg = std::shared_lock(_pg_lock);
auto iter = _pg_map.find(pg_id);
RELEASE_ASSERT(iter != _pg_map.end(), "PG id not found");
auto& pg = iter->second;
auto hs_pg = static_cast< HSHomeObject::HS_PG* >(pg.get());

// Update the pg info members.
pg->pg_info_.members.erase(PGMember(old_member));
pg->pg_info_.members.emplace(new_member);

uint32_t i{0};
for (auto const& m : pg->pg_info_.members) {
hs_pg->pg_sb_->members[i].id = m.id;
std::strncpy(hs_pg->pg_sb_->members[i].name, m.name.c_str(), std::min(m.name.size(), pg_members::max_name_len));
hs_pg->pg_sb_->members[i].priority = m.priority;
++i;
}

// Update the latest membership info to pg superblk.
hs_pg->pg_sb_.write();
}

void HSHomeObject::add_pg_to_map(unique< HS_PG > hs_pg) {
Expand Down
4 changes: 1 addition & 3 deletions src/lib/homestore_backend/replication_state_machine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,7 @@ class ReplicationStateMachine : public homestore::ReplDevListener {
uint32_t data_size) override;

/// @brief Called when replication module is replacing an existing member with a new member
void replace_member(homestore::replica_id_t member_out, homestore::replica_id_t member_in) override {
// TODO
}
void replace_member(homestore::replica_id_t member_out, homestore::replica_id_t member_in) override {};

/// @brief Called when the replica is being destroyed by nuraft;
void on_destroy() override;
Expand Down
4 changes: 2 additions & 2 deletions src/lib/memory_backend/mem_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ class MemoryHomeObject : public HomeObjectImpl {

// PGManager
PGManager::NullAsyncResult _create_pg(PGInfo&& pg_info, std::set< peer_id_t > const& peers) override;
PGManager::NullAsyncResult _replace_member(pg_id_t id, peer_id_t const& old_member,
PGMember const& new_member) override;
PGManager::NullAsyncResult _replace_member(pg_id_t id, peer_id_t const& old_member, PGMember const& new_member,
uint32_t commit_quorum) override;

bool _get_stats(pg_id_t id, PGStats& stats) const override;
void _get_pg_ids(std::vector< pg_id_t >& pg_ids) const override;
Expand Down
10 changes: 8 additions & 2 deletions src/lib/memory_backend/mem_pg_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@ PGManager::NullAsyncResult MemoryHomeObject::_create_pg(PGInfo&& pg_info, std::s
}

PGManager::NullAsyncResult MemoryHomeObject::_replace_member(pg_id_t id, peer_id_t const& old_member,
PGMember const& new_member) {
PGMember const& new_member, uint32_t commit_quorum) {
auto lg = std::shared_lock(_pg_lock);
auto it = _pg_map.find(id);
if (_pg_map.end() == it) {
return folly::makeSemiFuture< PGManager::NullResult >(folly::makeUnexpected(PGError::UNKNOWN_PG));
}
return folly::makeSemiFuture< PGManager::NullResult >(folly::makeUnexpected(PGError::UNSUPPORTED_OP));
}

Expand All @@ -25,7 +30,8 @@ bool MemoryHomeObject::_get_stats(pg_id_t id, PGStats& stats) const {
stats.open_shards =
std::count_if(pg->shards_.begin(), pg->shards_.end(), [](auto const& s) { return s->is_open(); });
for (auto const& m : pg->pg_info_.members) {
stats.members.emplace_back(std::make_tuple(m.id, m.name, 0 /* last commit lsn */, 0 /* last succ response us */));
stats.members.emplace_back(
std::make_tuple(m.id, m.name, 0 /* last commit lsn */, 0 /* last succ response us */));
}

return true;
Expand Down
12 changes: 4 additions & 8 deletions src/lib/pg_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,15 @@ PGManager::NullAsyncResult HomeObjectImpl::create_pg(PGInfo&& pg_info) {
}

PGManager::NullAsyncResult HomeObjectImpl::replace_member(pg_id_t id, peer_id_t const& old_member,
PGMember const& new_member) {
LOGI("[pg={}] replace member [{}] with [{}]", id, to_string(old_member), to_string(new_member.id));
PGMember const& new_member, uint32_t commit_quorum) {
LOGI("[pg={}] replace member [{}] with [{}] quorum [{}]", id, to_string(old_member), to_string(new_member.id),
commit_quorum);
if (old_member == new_member.id) {
LOGW("rejecting identical replacement SvcId [{}]!", to_string(old_member));
return folly::makeUnexpected(PGError::INVALID_ARG);
}

if (old_member == our_uuid()) {
LOGW("refusing to remove ourself {}!", to_string(old_member));
return folly::makeUnexpected(PGError::INVALID_ARG);
}

return _replace_member(id, old_member, new_member);
return _replace_member(id, old_member, new_member, commit_quorum);
}

bool HomeObjectImpl::get_stats(pg_id_t id, PGStats& stats) const { return _get_stats(id, stats); }
Expand Down
13 changes: 4 additions & 9 deletions src/lib/tests/PGManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,17 @@ TEST_F(TestFixture, Migrate) {
PGMember{boost::uuids::random_generator()()})
.get()
.error(),
PGError::UNSUPPORTED_OP);
EXPECT_EQ(
homeobj_->pg_manager()
->replace_member(_pg_id, boost::uuids::random_generator()(), PGMember{boost::uuids::random_generator()()})
.get()
.error(),
PGError::UNSUPPORTED_OP);
PGError::UNKNOWN_PG);
EXPECT_EQ(homeobj_->pg_manager()->replace_member(_pg_id, _peer1, PGMember{_peer1}).get().error(),
PGError::INVALID_ARG);
EXPECT_EQ(homeobj_->pg_manager()->replace_member(_pg_id, _peer1, PGMember{_peer2}).get().error(),
PGError::INVALID_ARG);
// TODO enable after HO test framework is enabled
#if 0
EXPECT_EQ(homeobj_->pg_manager()
->replace_member(_pg_id, _peer1, PGMember{boost::uuids::random_generator()()})
.get()
.error(),
PGError::INVALID_ARG);
EXPECT_FALSE(
homeobj_->pg_manager()->replace_member(_pg_id, _peer2, PGMember{boost::uuids::random_generator()()}).get());
#endif
}

0 comments on commit ee75415

Please sign in to comment.