Skip to content

Commit

Permalink
curvefs(metaserver): Use DoublyBufferedData to replace read-write loc…
Browse files Browse the repository at this point in the history
…ks to manage copysets

Signed-off-by: Hanqing Wu <wuhanqing@corp.netease.com>
  • Loading branch information
wu-hanqing committed Dec 10, 2023
1 parent c653b74 commit a81c673
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 97 deletions.
6 changes: 6 additions & 0 deletions curvefs/src/metaserver/copyset/copyset_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ class CopysetNode : public braft::StateMachine {

virtual PeerId GetLeaderId() const;

GroupId GetGroupId() const;

MetaStore* GetMetaStore() const;

virtual uint64_t GetConfEpoch() const;
Expand Down Expand Up @@ -344,6 +346,10 @@ inline bool CopysetNode::IsLoading() const {
return isLoading_.load(std::memory_order_acquire);
}

inline GroupId CopysetNode::GetGroupId() const {
return groupId_;
}

} // namespace copyset
} // namespace metaserver
} // namespace curvefs
Expand Down
223 changes: 142 additions & 81 deletions curvefs/src/metaserver/copyset/copyset_node_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,58 +32,48 @@
#include "curvefs/src/metaserver/copyset/utils.h"
#include "src/common/timeutility.h"

#include "src/common/concurrent/generic_name_lock.h"

namespace curvefs {
namespace metaserver {
namespace copyset {

using ::curve::common::TimeUtility;
using NameLockGuard = curve::common::GenericNameLockGuard<Mutex>;

bool CopysetNodeManager::IsLoadFinished() const {
return loadFinished_.load(std::memory_order_acquire);
}

bool CopysetNodeManager::DeleteCopysetNodeInternal(PoolId poolId,
CopysetId copysetId,
bool removeData) {
GroupId groupId = ToGroupId(poolId, copysetId);

// stop copyset node first
{
ReadLockGuard lock(lock_);
auto it = copysets_.find(groupId);
if (it != copysets_.end()) {
it->second->Stop();
} else {
LOG(WARNING) << "Delete copyset failed, copyset "
<< ToGroupIdString(poolId, copysetId) << " not found";
return false;
}
bool CopysetNodeManager::DeleteCopysetNodeInternalLocked(PoolId poolId,
CopysetId copysetId,
bool removeData) {
auto node = GetSharedCopysetNode(poolId, copysetId);
if (node == nullptr) {
LOG(WARNING) << "Delete copyset failed, copyset: "
<< ToGroupIdString(poolId, copysetId) << " not found";
return false;
}

// remove copyset node
{
WriteLockGuard lock(lock_);
auto it = copysets_.find(groupId);
if (it != copysets_.end()) {
bool ret = true;
if (removeData) {
std::string copysetDataDir = it->second->GetCopysetDataDir();
if (!trash_.RecycleCopyset(copysetDataDir)) {
LOG(WARNING) << "Recycle copyset remote data failed, "
"copyset data path: '"
<< copysetDataDir << "'";
ret = false;
}
}

copysets_.erase(it);
LOG(INFO) << "Delete copyset " << ToGroupIdString(poolId, copysetId)
<< " success";
return ret;
node->Stop();
LOG(INFO) << "Copyset " << ToGroupIdString(poolId, copysetId) << " stopped";

bool ret = true;
if (removeData) {
std::string copysetDataDir = node->GetCopysetDataDir();
if (!trash_.RecycleCopyset(copysetDataDir)) {
LOG(WARNING) << "Recycle copyset remove data failed, "
"copyset data path: '"
<< copysetDataDir << "'";
ret = false;
}
}

return false;
// delete node
copysets_.Modify(RemoveCopysetNode, node);
LOG(INFO) << "Delete copyset " << ToGroupIdString(poolId, copysetId)
<< " success";
return ret;
}

bool CopysetNodeManager::Init(const CopysetNodeOptions& options) {
Expand All @@ -107,11 +97,11 @@ bool CopysetNodeManager::Start() {
loadFinished_.store(true, std::memory_order_release);
LOG(INFO) << "Reload copysets success";
return true;
} else {
running_.store(false, std::memory_order_release);
LOG(ERROR) << "Reload copysets failed";
return false;
}

running_.store(false, std::memory_order_release);
LOG(ERROR) << "Reload copysets failed";
return false;
}

bool CopysetNodeManager::Stop() {
Expand All @@ -123,16 +113,24 @@ bool CopysetNodeManager::Stop() {
loadFinished_.store(false);

{
ReadLockGuard lock(lock_);
for (auto& copyset : copysets_) {
butil::DoublyBufferedData<CopysetNodeMap>::ScopedPtr ptr;
if (copysets_.Read(&ptr) != 0) {
LOG(ERROR) << "Fail to get copyset nodes";
return false;
}

for (const auto& copyset : *ptr) {
copyset.second->Stop();
}
}

{
WriteLockGuard lock(lock_);
copysets_.clear();
}
auto clear = [](CopysetNodeMap& map) -> size_t {
map.clear();
return 1;
};

// clear copysets
copysets_.Modify(clear);

if (!trash_.Stop()) {
LOG(ERROR) << "Stop trash failed";
Expand All @@ -146,54 +144,105 @@ bool CopysetNodeManager::Stop() {

CopysetNode* CopysetNodeManager::GetCopysetNode(PoolId poolId,
CopysetId copysetId) {
ReadLockGuard lock(lock_);
butil::DoublyBufferedData<CopysetNodeMap>::ScopedPtr ptr;
if (copysets_.Read(&ptr) != 0) {
LOG(WARNING) << "Fail to get copyset: "
<< ToGroupIdString(poolId, copysetId);
return nullptr;
}

auto it = copysets_.find(ToGroupId(poolId, copysetId));
if (it != copysets_.end()) {
auto it = ptr->find(ToGroupId(poolId, copysetId));
if (it != ptr->end()) {
return it->second.get();
}

LOG(WARNING) << "Fail to get copyset: "
<< ToGroupIdString(poolId, copysetId);
return nullptr;
}

std::shared_ptr<CopysetNode> CopysetNodeManager::GetSharedCopysetNode(
PoolId poolId, CopysetId copysetId) {
ReadLockGuard lock(lock_);
butil::DoublyBufferedData<CopysetNodeMap>::ScopedPtr ptr;
if (copysets_.Read(&ptr) != 0) {
LOG(WARNING) << "Fail to get copyset: "
<< ToGroupIdString(poolId, copysetId);
return nullptr;
}

auto it = copysets_.find(ToGroupId(poolId, copysetId));
if (it != copysets_.end()) {
auto it = ptr->find(ToGroupId(poolId, copysetId));
if (it != ptr->end()) {
return it->second;
}

LOG(WARNING) << "Fail to get copyset: "
<< ToGroupIdString(poolId, copysetId);
return nullptr;
}

int CopysetNodeManager::IsCopysetNodeExist(
const CreateCopysetRequest::Copyset& copyset) {
ReadLockGuard lock(lock_);
auto iter = copysets_.find(ToGroupId(copyset.poolid(),
copyset.copysetid()));
if (iter == copysets_.end()) {
butil::DoublyBufferedData<CopysetNodeMap>::ScopedPtr ptr;
if (copysets_.Read(&ptr) != 0) {
LOG(WARNING) << "Fail to get copyset: "
<< ToGroupIdString(copyset.poolid(), copyset.copysetid());
return 0;
}

auto iter = ptr->find(ToGroupId(copyset.poolid(), copyset.copysetid()));
if (iter == ptr->end()) {
return 0;
} else {
auto copysetNode = iter->second.get();
std::vector<Peer> peers;
copysetNode->ListPeers(&peers);
if (peers.size() != static_cast<size_t>(copyset.peers_size())) {
}

auto* copysetNode = iter->second.get();
std::vector<Peer> peers;
copysetNode->ListPeers(&peers);
if (peers.size() != static_cast<size_t>(copyset.peers_size())) {
return -1;
}

for (int i = 0; i < copyset.peers_size(); i++) {
const auto& cspeer = copyset.peers(i);
auto iter =
std::find_if(peers.begin(), peers.end(), [&cspeer](const Peer& p) {
return cspeer.address() == p.address();
});
if (iter == peers.end()) {
return -1;
}
}

for (int i = 0; i < copyset.peers_size(); i++) {
const auto& cspeer = copyset.peers(i);
auto iter = std::find_if(peers.begin(), peers.end(),
[&cspeer](const Peer& p) {
return cspeer.address() == p.address();
});
if (iter == peers.end()) {
return -1;
}
}
return 1;
}

size_t CopysetNodeManager::AddCopysetNode(
CopysetNodeMap& map, const std::shared_ptr<CopysetNode>& copysetNode) {
assert(copysetNode != nullptr);

auto groupId = copysetNode->GetGroupId();
auto it = map.find(groupId);
if (it != map.end()) {
LOG(WARNING) << "Copyset node already exists: " << groupId;
return 0;
}

auto ret = map.emplace(groupId, copysetNode);
CHECK(ret.second);
return 1;
}

size_t CopysetNodeManager::RemoveCopysetNode(
CopysetNodeMap& map, const std::shared_ptr<CopysetNode>& copysetNode) {
assert(copysetNode != nullptr);

auto groupId = copysetNode->GetGroupId();
auto it = map.find(groupId);
if (it == map.end()) {
LOG(WARNING) << "Copyset node not found: " << groupId;
return 0;
}

map.erase(it);
return 1;
}

Expand All @@ -209,9 +258,11 @@ bool CopysetNodeManager::CreateCopysetNode(PoolId poolId, CopysetId copysetId,
braft::GroupId groupId = ToGroupId(poolId, copysetId);
std::shared_ptr<CopysetNode> copysetNode;

NameLockGuard guard(copysetLifetimeNameLock_, groupId);

{
WriteLockGuard lock(lock_);
if (copysets_.count(groupId) != 0) {
auto* exist = GetCopysetNode(poolId, copysetId);
if (exist != nullptr) {
LOG(WARNING) << "Copyset node already exists: "
<< ToGroupIdString(poolId, copysetId);
return false;
Expand All @@ -225,7 +276,7 @@ bool CopysetNodeManager::CreateCopysetNode(PoolId poolId, CopysetId copysetId,
return false;
}

copysets_.emplace(groupId, copysetNode);
copysets_.Modify(AddCopysetNode, copysetNode);
}

// node start maybe time-consuming
Expand All @@ -234,7 +285,7 @@ bool CopysetNodeManager::CreateCopysetNode(PoolId poolId, CopysetId copysetId,
// restart, in this case we should not automaticaly remove copyset's
// data
const bool removeData = checkLoadFinish;
DeleteCopysetNodeInternal(poolId, copysetId, removeData);
DeleteCopysetNodeInternalLocked(poolId, copysetId, removeData);
LOG(ERROR) << "Copyset " << ToGroupIdString(poolId, copysetId)
<< " start failed";
return false;
Expand All @@ -246,10 +297,16 @@ bool CopysetNodeManager::CreateCopysetNode(PoolId poolId, CopysetId copysetId,
}

void CopysetNodeManager::GetAllCopysets(
std::vector<CopysetNode *> *nodes) const {
std::vector<CopysetNode*>* nodes) const {
nodes->clear();
ReadLockGuard lock(lock_);
for (auto& copyset : copysets_) {
butil::DoublyBufferedData<CopysetNodeMap>::ScopedPtr ptr;
if (copysets_.Read(&ptr) != 0) {
LOG(WARNING) << "Fail to get all copysets";
return;
}

nodes->reserve(ptr->size());
for (const auto& copyset : *ptr) {
nodes->push_back(copyset.second.get());
}
}
Expand All @@ -266,11 +323,15 @@ void CopysetNodeManager::AddService(brpc::Server* server,
}

bool CopysetNodeManager::DeleteCopysetNode(PoolId poolId, CopysetId copysetId) {
return DeleteCopysetNodeInternal(poolId, copysetId, false);
GroupId groupId = ToGroupId(poolId, copysetId);
NameLockGuard guard(copysetLifetimeNameLock_, groupId);
return DeleteCopysetNodeInternalLocked(poolId, copysetId, false);
}

bool CopysetNodeManager::PurgeCopysetNode(PoolId poolId, CopysetId copysetId) {
return DeleteCopysetNodeInternal(poolId, copysetId, true);
GroupId groupId = ToGroupId(poolId, copysetId);
NameLockGuard guard(copysetLifetimeNameLock_, groupId);
return DeleteCopysetNodeInternalLocked(poolId, copysetId, true);
}

} // namespace copyset
Expand Down
Loading

0 comments on commit a81c673

Please sign in to comment.