Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
Lijin Xiong committed Mar 15, 2023
2 parents f352519 + 9dd890b commit 97bddcf
Show file tree
Hide file tree
Showing 41 changed files with 555 additions and 261 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<div align=center> <image src="docs/images/cncf-icon-color.png" width = 8%>

**A could-native distributed storage system**
**A cloud-native distributed storage system**

#### English | [简体中文](README_cn.md)
### 📄 [Documents](https://github.com/opencurve/curve/tree/master/docs) || 🌐 [Official Website](https://www.opencurve.io/Curve/HOME) || 🏠 [Forum](https://ask.opencurve.io/t/topic/7)
Expand Down Expand Up @@ -46,7 +46,7 @@

## About Curve

**Curve** is a mordern storage system developed by netease, currently supporting file storage(CurveFS) and block storage(CurveBS). Now it's hosted at [CNCF](https://www.cncf.io/) as a sandbox project.
**Curve** is a modern storage system developed by netease, currently supporting file storage(CurveFS) and block storage(CurveBS). Now it's hosted at [CNCF](https://www.cncf.io/) as a sandbox project.

The core application scenarios of CurveBS mainly include:
- the performance, mixed, capacity cloud disk or persistent volume of virtual machine/container, and remote disks of physical machines
Expand Down Expand Up @@ -122,7 +122,7 @@ The core application scenarios of CurveFS mainly include:

</details>
<details>
<summary><b><font=5>Docking Kubernates</b></font></summary>
<summary><b><font=5>Docking Kubernetes</b></font></summary>

- Use [Curve CSI Driver](https://github.com/opencurve/curve-csi), The plugin implements the Container Storage Interface(CSI) between Container Orchestrator(CO) and Curve cluster. It allows dynamically provisioning curve volumes and attaching them to workloads.
- For details of the documentation, see [CSI Curve Driver Doc](https://github.com/opencurve/curve-csi/blob/master/docs/README.md).
Expand Down Expand Up @@ -226,7 +226,7 @@ Please refer to the [Test environment configuration](docs/cn/测试环境配置
See [Governance](https://github.com/opencurve/community/blob/master/GOVERNANCE.md).

## Contribute us
Participation in the Curve project is described in the [Curve Open Source Community Guidelines](Community_Guidelines.md) and is subject to a [contributor contract](https://github.com/opencurve/curve/blob/master/CODE_OF_CONDUCT.md).
Participation in the Curve project is described in the [Curve Developers Guidelines](developers_guide.md) and is subject to a [contributor contract](https://github.com/opencurve/curve/blob/master/CODE_OF_CONDUCT.md).
We welcome your contribution!

## Code of Conduct
Expand Down
6 changes: 3 additions & 3 deletions README_cn.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<div align=center> <image src="docs/images/cncf-icon-color.png" width = 8%>

**A could-native distributed storage system**
**A cloud-native distributed storage system**

#### [English](README.md) | 简体中文
### 📄 [文档](https://github.com/opencurve/curve/tree/master/docs) || 🌐 [官网](https://www.opencurve.io/Curve/HOME) || 🏠 [论坛](https://ask.opencurve.io/t/topic/7)
Expand Down Expand Up @@ -124,7 +124,7 @@ CurveFS的核心应用场景主要包括:

</details>
<details>
<summary><b><font=5>对接 Kubernates</b></font></summary>
<summary><b><font=5>对接 Kubernetes</b></font></summary>

- 使用 [Curve CSI Driver](https://github.com/opencurve/curve-csi) 插件在 Container Orchestrator (CO) 与 Curve 集群中实现了 Container Storage Interface(CSI)。
- 文档详见[CSI Curve Driver Doc](https://github.com/opencurve/curve-csi/blob/master/docs/README.md)
Expand Down Expand Up @@ -221,7 +221,7 @@ $ ./fio --thread --rw=randwrite --bs=4k --ioengine=nebd --nebd=cbd:pool//pfstest

## 贡献我们

参与 Curve 项目开发详见[Curve 开源社区指南](Community_Guidelines_cn.md)并且请遵循[贡献者准则](https://github.com/opencurve/curve/blob/master/CODE_OF_CONDUCT.md), 我们期待您的贡献!
参与 Curve 项目开发详见[Curve 开发者指南](developers_guide_cn.md)并且请遵循[贡献者准则](https://github.com/opencurve/curve/blob/master/CODE_OF_CONDUCT.md), 我们期待您的贡献!

## 最佳实践
- [CurveBS+NFS搭建NFS存储](docs/practical/curvebs_nfs.md)
Expand Down
4 changes: 2 additions & 2 deletions curvefs/src/client/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ struct DiskCacheOption {
// the max time system command can run
uint32_t cmdTimeoutSec;
// threads for disk cache
uint32_t threads;
uint32_t threads = 10;
// the write throttle bps of disk cache
uint64_t avgFlushBytes;
// the write burst bps of disk cache
Expand Down Expand Up @@ -203,7 +203,7 @@ struct FuseClientOption {
bool enableFuseSplice = false;
bool disableXattr = false;
uint32_t downloadMaxRetryTimes;
uint32_t warmupThreadsNum;
uint32_t warmupThreadsNum = 10;
};

void InitFuseClientOption(Configuration *conf, FuseClientOption *clientOption);
Expand Down
14 changes: 9 additions & 5 deletions curvefs/src/client/fuse_s3_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ CURVEFS_ERROR FuseS3Client::Init(const FuseClientOption &option) {
auto fsCacheManager = std::make_shared<FsCacheManager>(
dynamic_cast<S3ClientAdaptorImpl *>(s3Adaptor_.get()),
opt.s3Opt.s3ClientAdaptorOpt.readCacheMaxByte,
opt.s3Opt.s3ClientAdaptorOpt.writeCacheMaxByte);
opt.s3Opt.s3ClientAdaptorOpt.writeCacheMaxByte, kvClientManager_);
if (opt.s3Opt.s3ClientAdaptorOpt.diskCacheOpt.diskCacheType !=
DiskCacheType::Disable) {
auto s3DiskCacheClient = std::make_shared<S3ClientImpl>();
Expand All @@ -84,11 +84,11 @@ CURVEFS_ERROR FuseS3Client::Init(const FuseClientOption &option) {
diskCacheManager, s3DiskCacheClient);
ret = s3Adaptor_->Init(opt.s3Opt.s3ClientAdaptorOpt, s3Client,
inodeManager_, mdsClient_, fsCacheManager,
diskCacheManagerImpl, true);
diskCacheManagerImpl, kvClientManager_, true);
} else {
ret = s3Adaptor_->Init(opt.s3Opt.s3ClientAdaptorOpt, s3Client,
inodeManager_, mdsClient_, fsCacheManager,
nullptr, true);
nullptr, kvClientManager_, true);
}
return ret;
}
Expand All @@ -112,13 +112,17 @@ bool FuseS3Client::InitKVCache(const KVClientManagerOpt &opt) {
return false;
}

g_kvClientManager = new KVClientManager();
if (!g_kvClientManager->Init(opt, memcacheClient)) {
kvClientManager_ = std::make_shared<KVClientManager>();
if (!kvClientManager_->Init(opt, memcacheClient)) {
LOG(ERROR) << "FLAGS_supportKVcache = " << FLAGS_supportKVcache
<< ", but init kvClientManager fail";
return false;
}

if (warmupManager_ != nullptr) {
warmupManager_->SetKVClientManager(kvClientManager_);
}

return true;
}

Expand Down
3 changes: 2 additions & 1 deletion curvefs/src/client/fuse_s3_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class FuseS3Client : public FuseClient {
};
warmupManager_ = std::make_shared<warmup::WarmupManagerS3Impl>(
metaClient_, inodeManager_, dentryManager_, fsInfo_, readFunc,
s3Adaptor_);
s3Adaptor_, nullptr);
}

FuseS3Client(const std::shared_ptr<MdsClient> &mdsClient,
Expand Down Expand Up @@ -118,6 +118,7 @@ class FuseS3Client : public FuseClient {
private:
// s3 adaptor
std::shared_ptr<S3ClientAdaptor> s3Adaptor_;
std::shared_ptr<KVClientManager> kvClientManager_;
};


Expand Down
16 changes: 6 additions & 10 deletions curvefs/src/client/kvclient/kvclient_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
*/

#include "curvefs/src/client/kvclient/kvclient_manager.h"
#include <memory>
#include "src/client/client_metric.h"
#include "src/common/concurrent/count_down_event.h"

Expand All @@ -31,32 +32,27 @@ using curvefs::client::metric::KVClientMetric;
namespace curvefs {
namespace client {

KVClientManager *g_kvClientManager = nullptr;
KVClientMetric *g_kvClientMetric = nullptr;

#define ONRETURN(TYPE, RES) \
if (RES) { \
g_kvClientMetric->kvClient##TYPE.qps.count << 1; \
kvClientMetric_.kvClient##TYPE.qps.count << 1; \
} else { \
g_kvClientMetric->kvClient##TYPE.eps.count << 1; \
}
kvClientMetric_.kvClient##TYPE.eps.count << 1; \
} \

bool KVClientManager::Init(const KVClientManagerOpt &config,
const std::shared_ptr<KVClient> &kvclient) {
client_ = kvclient;
g_kvClientMetric = new KVClientMetric();
return threadPool_.Start(config.setThreadPooln) == 0;
}

void KVClientManager::Uninit() {
client_->UnInit();
threadPool_.Stop();
delete g_kvClientMetric;
}

void KVClientManager::Set(std::shared_ptr<SetKVCacheTask> task) {
threadPool_.Enqueue([task, this]() {
LatencyGuard guard(&g_kvClientMetric->kvClientSet.latency);
LatencyGuard guard(&kvClientMetric_.kvClientSet.latency);

std::string error_log;
auto res =
Expand All @@ -69,7 +65,7 @@ void KVClientManager::Set(std::shared_ptr<SetKVCacheTask> task) {

void KVClientManager::Get(std::shared_ptr<GetKVCacheTask> task) {
threadPool_.Enqueue([task, this]() {
LatencyGuard guard(&g_kvClientMetric->kvClientGet.latency);
LatencyGuard guard(&kvClientMetric_.kvClientGet.latency);

std::string error_log;
task->res = client_->Get(task->key, task->value, task->offset,
Expand Down
14 changes: 7 additions & 7 deletions curvefs/src/client/kvclient/kvclient_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,6 @@ class GetKVCacheTask;
using curve::common::TaskThreadPool;
using curvefs::client::common::KVClientManagerOpt;

extern KVClientManager *g_kvClientManager;
extern KVClientMetric *g_kvClientMetric;

typedef std::function<void(const std::shared_ptr<SetKVCacheTask> &)>
SetKVCacheDone;
typedef std::function<void(const std::shared_ptr<GetKVCacheTask> &)>
Expand All @@ -62,10 +59,10 @@ struct SetKVCacheTask {
uint64_t length;
SetKVCacheDone done;
SetKVCacheTask() = default;
SetKVCacheTask(const std::string &k, const char *val, const uint64_t len)
: key(k), value(val), length(len) {
done = [](const std::shared_ptr<SetKVCacheTask> &) {};
}
SetKVCacheTask(
const std::string &k, const char *val, const uint64_t len,
SetKVCacheDone done = [](const std::shared_ptr<SetKVCacheTask> &) {})
: key(k), value(val), length(len), done(std::move(done)) {}
};

struct GetKVCacheTask {
Expand Down Expand Up @@ -98,12 +95,15 @@ class KVClientManager {

void Get(std::shared_ptr<GetKVCacheTask> task);

KVClientMetric *GetClientMetricForTesting() { return &kvClientMetric_; }

private:
void Uninit();

private:
TaskThreadPool<bthread::Mutex, bthread::ConditionVariable> threadPool_;
std::shared_ptr<KVClient> client_;
KVClientMetric kvClientMetric_;
};

} // namespace client
Expand Down
5 changes: 4 additions & 1 deletion curvefs/src/client/s3/client_s3_adaptor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <brpc/controller.h>
#include <algorithm>
#include <list>
#include <utility>

#include "absl/memory/memory.h"
#include "curvefs/src/client/s3/client_s3_adaptor.h"
Expand All @@ -39,6 +40,7 @@ S3ClientAdaptorImpl::Init(
std::shared_ptr<MdsClient> mdsClient,
std::shared_ptr<FsCacheManager> fsCacheManager,
std::shared_ptr<DiskCacheManagerImpl> diskCacheManagerImpl,
std::shared_ptr<KVClientManager> kvClientManager,
bool startBackGround) {
pendingReq_ = 0;
blockSize_ = option.blockSize;
Expand Down Expand Up @@ -66,6 +68,7 @@ S3ClientAdaptorImpl::Init(
fsCacheManager_ = fsCacheManager;
waitInterval_.Init(option.intervalSec * 1000);
diskCacheManagerImpl_ = diskCacheManagerImpl;
kvClientManager_ = std::move(kvClientManager);
if (HasDiskCache()) {
diskCacheManagerImpl_ = diskCacheManagerImpl;
if (diskCacheManagerImpl_->Init(option) < 0) {
Expand Down Expand Up @@ -372,7 +375,7 @@ CURVEFS_ERROR S3ClientAdaptorImpl::FlushAllCache(uint64_t inodeId) {
}

// force flush data in diskcache to s3
if (!g_kvClientManager && HasDiskCache()) {
if (!kvClientManager_ && HasDiskCache()) {
VLOG(6) << "FlushAllCache, wait inodeId:" << inodeId
<< "related chunk upload to s3";
if (ClearDiskCache(inodeId) < 0) {
Expand Down
4 changes: 4 additions & 0 deletions curvefs/src/client/s3/client_s3_adaptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class S3ClientAdaptor {
std::shared_ptr<MdsClient> mdsClient,
std::shared_ptr<FsCacheManager> fsCacheManager,
std::shared_ptr<DiskCacheManagerImpl> diskCacheManagerImpl,
std::shared_ptr<KVClientManager> kvClientManager,
bool startBackGround = false) = 0;
/**
* @brief write data to s3
Expand Down Expand Up @@ -128,6 +129,7 @@ class S3ClientAdaptorImpl : public S3ClientAdaptor {
std::shared_ptr<MdsClient> mdsClient,
std::shared_ptr<FsCacheManager> fsCacheManager,
std::shared_ptr<DiskCacheManagerImpl> diskCacheManagerImpl,
std::shared_ptr<KVClientManager> kvClientManager,
bool startBackGround = false);
/**
* @brief write data to s3
Expand Down Expand Up @@ -274,6 +276,8 @@ class S3ClientAdaptorImpl : public S3ClientAdaptor {

TaskThreadPool<bthread::Mutex, bthread::ConditionVariable>
taskPool_;

std::shared_ptr<KVClientManager> kvClientManager_ = nullptr;
};

} // namespace client
Expand Down
27 changes: 16 additions & 11 deletions curvefs/src/client/s3/client_s3_cache_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ FsCacheManager::FindOrCreateFileCacheManager(uint64_t fsId, uint64_t inodeId) {
}

FileCacheManagerPtr fileCacheManager = std::make_shared<FileCacheManager>(
fsId, inodeId, s3ClientAdaptor_);
fsId, inodeId, s3ClientAdaptor_, kvClientManager_);
auto ret = fileCacheManagerMap_.emplace(inodeId, fileCacheManager);
g_s3MultiManagerMetric->fileManagerNum << 1;
assert(ret.second);
Expand Down Expand Up @@ -293,7 +293,8 @@ FileCacheManager::FindOrCreateChunkCacheManager(uint64_t index) {
}

ChunkCacheManagerPtr chunkCacheManager =
std::make_shared<ChunkCacheManager>(index, s3ClientAdaptor_);
std::make_shared<ChunkCacheManager>(index, s3ClientAdaptor_,
kvClientManager_);
auto ret = chunkCacheMap_.emplace(index, chunkCacheManager);
g_s3MultiManagerMetric->chunkManagerNum << 1;
assert(ret.second);
Expand Down Expand Up @@ -509,7 +510,7 @@ bool FileCacheManager::ReadKVRequestFromRemoteCache(const std::string &name,
char *databuf,
uint64_t offset,
uint64_t length) {
if (!g_kvClientManager) {
if (!kvClientManager_) {
return false;
}

Expand All @@ -519,7 +520,7 @@ bool FileCacheManager::ReadKVRequestFromRemoteCache(const std::string &name,
event.Signal();
return;
};
g_kvClientManager->Get(task);
kvClientManager_->Get(task);
event.Wait();

return task->res;
Expand Down Expand Up @@ -616,7 +617,7 @@ int FileCacheManager::ReadKVRequest(
WriteLockGuard writeLockGuard(chunkCacheManager->rwLockChunk_);
DataCachePtr dataCache = std::make_shared<DataCache>(
s3ClientAdaptor_, chunkCacheManager, chunkPos, req->len,
dataBuf + req->readOffset);
dataBuf + req->readOffset, kvClientManager_);
chunkCacheManager->AddReadDataCache(dataCache);
}
}
Expand Down Expand Up @@ -1477,8 +1478,9 @@ DataCachePtr ChunkCacheManager::FindWriteableDataCache(
void ChunkCacheManager::WriteNewDataCache(S3ClientAdaptorImpl *s3ClientAdaptor,
uint32_t chunkPos, uint32_t len,
const char *data) {
DataCachePtr dataCache = std::make_shared<DataCache>(
s3ClientAdaptor, this->shared_from_this(), chunkPos, len, data);
DataCachePtr dataCache =
std::make_shared<DataCache>(s3ClientAdaptor, this->shared_from_this(),
chunkPos, len, data, kvClientManager_);
VLOG(9) << "WriteNewDataCache chunkPos:" << chunkPos << ", len:" << len
<< ", new len:" << dataCache->GetLen() << ",chunkIndex:" << index_;
WriteLockGuard writeLockGuard(rwLockWrite_);
Expand Down Expand Up @@ -1733,7 +1735,8 @@ void ChunkCacheManager::AddWriteDataCacheForTest(DataCachePtr dataCache) {

DataCache::DataCache(S3ClientAdaptorImpl *s3ClientAdaptor,
ChunkCacheManagerPtr chunkCacheManager, uint64_t chunkPos,
uint64_t len, const char *data)
uint64_t len, const char *data,
std::shared_ptr<KVClientManager> kvClientManager)
: s3ClientAdaptor_(std::move(s3ClientAdaptor)),
chunkCacheManager_(chunkCacheManager),
status_(DataCacheStatus::Dirty), inReadCache_(false) {
Expand Down Expand Up @@ -1792,6 +1795,8 @@ DataCache::DataCache(S3ClientAdaptorImpl *s3ClientAdaptor,
assert((actualLen_ % pageSize) == 0);
assert((actualChunkPos_ % pageSize) == 0);
createTime_ = ::curve::common::TimeUtility::GetTimeofDaySec();

kvClientManager_ = std::move(kvClientManager);
}

void DataCache::CopyBufToDataCache(uint64_t dataCachePos, uint64_t len,
Expand Down Expand Up @@ -2312,7 +2317,7 @@ CURVEFS_ERROR DataCache::PrepareFlushTasks(uint64_t inodeId,
s3Tasks->emplace_back(context);

// generate flush to kvcache task
if (g_kvClientManager) {
if (kvClientManager_) {
auto task = std::make_shared<SetKVCacheTask>();
task->key = objectName;
task->value = data + (*writeOffset);
Expand Down Expand Up @@ -2398,11 +2403,11 @@ void DataCache::FlushTaskExecute(
});
}
// kvtask execute
if (g_kvClientManager && kvPendingTaskCal.load()) {
if (kvClientManager_ && kvPendingTaskCal.load()) {
std::for_each(kvCacheTasks.begin(), kvCacheTasks.end(),
[&](const std::shared_ptr<SetKVCacheTask> &task) {
task->done = kvdone;
g_kvClientManager->Set(task);
kvClientManager_->Set(task);
});
kvTaskEnvent.Wait();
}
Expand Down
Loading

0 comments on commit 97bddcf

Please sign in to comment.