Skip to content

Commit

Permalink
curvefs/client: optimize warmup performance
Browse files Browse the repository at this point in the history
  • Loading branch information
wuhongsong authored and ilixiaocui committed Feb 14, 2023
1 parent 549c0fa commit 503aa34
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 36 deletions.
1 change: 1 addition & 0 deletions curvefs/conf/client.conf
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ fuseClient.disableXattr=false
fuseClient.maxDataSize=1024
# default refresh data interval 30s
fuseClient.refreshDataIntervalSec=30
fuseClient.warmupThreadsNum=10

#### volume
volume.bigFileSize=1048576
Expand Down
3 changes: 2 additions & 1 deletion curvefs/src/client/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,8 @@ void InitFuseClientOption(Configuration *conf, FuseClientOption *clientOption) {
conf->GetValueFatalIfFail("fuseClient.cto", &FLAGS_enableCto);
conf->GetValueFatalIfFail("fuseClient.downloadMaxRetryTimes",
&clientOption->downloadMaxRetryTimes);

conf->GetValueFatalIfFail("fuseClient.warmupThreadsNum",
&clientOption->warmupThreadsNum);
LOG_IF(WARNING, conf->GetBoolValue("fuseClient.enableSplice",
&clientOption->enableFuseSplice))
<< "Not found `fuseClient.enableSplice` in conf, use default value `"
Expand Down
1 change: 1 addition & 0 deletions curvefs/src/client/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ struct FuseClientOption {
bool enableFuseSplice = false;
bool disableXattr = false;
uint32_t downloadMaxRetryTimes;
uint32_t warmupThreadsNum;
};

void InitFuseClientOption(Configuration *conf, FuseClientOption *clientOption);
Expand Down
1 change: 0 additions & 1 deletion curvefs/src/client/dentry_cache_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ CURVEFS_ERROR DentryCacheManagerImpl::GetDentry(uint64_t parent,
<< ", parent = " << parent << ", name = " << name;
return MetaStatusCodeToCurvefsErrCode(ret);
}

if (!curvefs::client::common::FLAGS_enableCto) {
dCache_->Put(key, *out);
}
Expand Down
2 changes: 1 addition & 1 deletion curvefs/src/client/fuse_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ CURVEFS_ERROR FuseClient::Init(const FuseClientOption &option) {
warmUpFile_.exist = false;
bgCmdStop_.store(false, std::memory_order_release);
bgCmdTaskThread_ = Thread(&FuseClient::WarmUpTask, this);
taskFetchMetaPool_.Start(WARMUP_THREADS);
taskFetchMetaPool_.Start(option_.warmupThreadsNum);
return ret3;
}

Expand Down
5 changes: 2 additions & 3 deletions curvefs/src/client/fuse_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@

#define DirectIOAlignment 512
#define WARMUP_CHECKINTERVAL_US 1000*1000
#define WARMUP_THREADS 10

using ::curve::common::Atomic;
using ::curve::common::InterruptibleSleeper;
Expand Down Expand Up @@ -268,9 +267,9 @@ class FuseClient {
// Sets the enableSumInDir_ flag to the given value.
void SetEnableSumInDir(bool enable) {
enableSumInDir_ = enable;
}
std::list<fuse_ino_t>& GetReadAheadFiles() {
void GetReadAheadFiles(std::list<fuse_ino_t>* readAheadFiles) {
std::unique_lock<std::mutex> lck(fetchMtx_);
return readAheadFiles_;
*readAheadFiles = std::move(readAheadFiles_);
}

void GetWarmUpFile(WarmUpFileContext_t* warmUpFile) {
Expand Down
80 changes: 55 additions & 25 deletions curvefs/src/client/fuse_s3_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ CURVEFS_ERROR FuseS3Client::Init(const FuseClientOption &option) {
inodeManager_, mdsClient_, fsCacheManager,
nullptr, true);
}

isWarmUping_.store(false);
bgFetchStop_.store(false, std::memory_order_release);
bgFetchThread_ = Thread(&FuseS3Client::BackGroundFetch, this);
initbgFetchThread_ = true;
Expand Down Expand Up @@ -151,8 +151,7 @@ void FuseS3Client::GetWarmUpFileList(const WarmUpFileContext_t&warmUpFile,

void FuseS3Client::BackGroundFetch() {
while (!bgFetchStop_.load(std::memory_order_acquire)) {
LOG_EVERY_N(WARNING, 100)
<< "fetch thread start.";
usleep(WARMUP_CHECKINTERVAL_US);
if (hasWarmUpTask()) { // new warmup task
WarmUpFileContext_t warmUpFile;
GetWarmUpFile(&warmUpFile);
Expand All @@ -167,15 +166,30 @@ void FuseS3Client::BackGroundFetch() {
}
{ // file need warmup
std::list<fuse_ino_t> readAheadFiles;
readAheadFiles.swap(GetReadAheadFiles());
for (auto iter : readAheadFiles) {
VLOG(9) << "BackGroundFetch: " << iter;
fetchDataEnqueue(iter);
GetReadAheadFiles(&readAheadFiles);
if (!readAheadFiles.empty()) {
LOG(INFO) << "num of files is need loaded is: "
<< readAheadFiles.size();
for (auto iter : readAheadFiles) {
VLOG(9) << "BackGroundFetch: " << iter;
fetchDataEnqueue(iter);
}
}
}
LOG_EVERY_N(WARNING, 100)
<< "fetch thread end.";
usleep(WARMUP_CHECKINTERVAL_US);
{ // objs will be downloaded
{
std::unique_lock<std::mutex> lck(warmupObjsMtx_);
if (needWarmupObjs_.empty()) {
continue;
}
}
if (isWarmUping_.exchange(true)) {
continue;
}
std::thread downloadThread =
std::thread(&FuseS3Client::WarmUpAllObjs, this);
downloadThread.detach();
}
}
return;
}
Expand Down Expand Up @@ -204,8 +218,7 @@ void FuseS3Client::fetchDataEnqueue(fuse_ino_t ino) {
}

// travel and download all objs belonging to the chunk
void FuseS3Client::travelChunk(fuse_ino_t ino, S3ChunkInfoList chunkInfo,
std::list<std::pair<std::string, uint64_t>>* prefetchObjs) {
void FuseS3Client::travelChunk(fuse_ino_t ino, S3ChunkInfoList chunkInfo) {
uint64_t blockSize = s3Adaptor_->GetBlockSize();
uint64_t chunkSize = s3Adaptor_->GetChunkSize();
uint64_t offset, len, chunkid, compaction;
Expand All @@ -226,7 +239,8 @@ void FuseS3Client::travelChunk(fuse_ino_t ino, S3ChunkInfoList chunkInfo,
if (len < blockSize) { // just one block
auto objectName = curvefs::common::s3util::GenObjName(
chunkid, blockIndexBegin, compaction, fsId, ino);
prefetchObjs->push_back(std::make_pair(objectName, len));
std::unique_lock<std::mutex> lck(warmupObjsMtx_);
needWarmupObjs_.push_back(std::make_pair(objectName, len));
} else {
// the offset in the block
uint64_t blockPos = chunkPos % blockSize;
Expand Down Expand Up @@ -257,7 +271,8 @@ void FuseS3Client::travelChunk(fuse_ino_t ino, S3ChunkInfoList chunkInfo,
travelStartIndex = blockIndexBegin + 1;
auto objectName = curvefs::common::s3util::GenObjName(
chunkid, blockIndexBegin, compaction, fsId, ino);
prefetchObjs->push_back(std::make_pair(
std::unique_lock<std::mutex> lck(warmupObjsMtx_);
needWarmupObjs_.push_back(std::make_pair(
objectName, firstBlockSize));
} else {
travelStartIndex = blockIndexBegin;
Expand All @@ -270,7 +285,8 @@ void FuseS3Client::travelChunk(fuse_ino_t ino, S3ChunkInfoList chunkInfo,
chunkid, blockIndexEnd, compaction, fsId, ino);
// there is no need to care about the order
// in which objects are downloaded
prefetchObjs->push_back(
std::unique_lock<std::mutex> lck(warmupObjsMtx_);
needWarmupObjs_.push_back(
std::make_pair(objectName, lastBlockSize));
} else {
travelEndIndex = blockIndexEnd;
Expand All @@ -293,22 +309,36 @@ void FuseS3Client::travelChunk(fuse_ino_t ino, S3ChunkInfoList chunkInfo,
blockIndex <= travelEndIndex ; blockIndex++) {
auto objectName = curvefs::common::s3util::GenObjName(
chunkid, blockIndex, compaction, fsId, ino);
prefetchObjs->push_back(std::make_pair(objectName, blockSize));
{
std::unique_lock<std::mutex> lck(warmupObjsMtx_);
needWarmupObjs_.push_back(
std::make_pair(objectName, blockSize));
}
}
}
}
}

// TODO(hzwuhongsong): This logic is very similar to code in other places;
// try to merge it.
void FuseS3Client::WarmUpAllObjs(
const std::list<std::pair<std::string, uint64_t>> &prefetchObjs) {
void FuseS3Client::WarmUpAllObjs() {
std::list<std::pair<std::string, uint64_t>> needWarmupObjs;
{
std::unique_lock<std::mutex> lck(warmupObjsMtx_);
LOG(INFO) << "num of objs need loaded is: " << needWarmupObjs_.size();
needWarmupObjs = std::move(needWarmupObjs_);
}
std::atomic<uint64_t> pendingReq(0);
curve::common::CountDownEvent cond(1);
// callback function
GetObjectAsyncCallBack cb =
[&](const S3Adapter *adapter,
const std::shared_ptr<GetObjectAsyncContext> &context) {
if (bgFetchStop_.load()) {
LOG(INFO) << "need stop warmup";
cond.Signal();
return;
}
if (context->retCode == 0) {
VLOG(9) << "Get Object success: " << context->key;
int ret = s3Adaptor_->GetDiskCacheManager()->WriteReadDirect(
Expand Down Expand Up @@ -341,10 +371,10 @@ void FuseS3Client::WarmUpAllObjs(
s3Adaptor_->GetS3Client()->DownloadAsync(context);
};

pendingReq.fetch_add(prefetchObjs.size(), std::memory_order_seq_cst);
if (pendingReq.load(std::memory_order_seq_cst)) {
VLOG(9) << "wait for pendingReq";
for (auto iter : prefetchObjs) {
pendingReq.fetch_add(needWarmupObjs.size(), std::memory_order_seq_cst);
if (pendingReq.load()) {
VLOG(9) << "wait for pendingReq" << pendingReq.load();
for (auto iter : needWarmupObjs) {
VLOG(9) << "download start: " << iter.first;
std::string name = iter.first;
uint64_t readLen = iter.second;
Expand All @@ -366,19 +396,19 @@ void FuseS3Client::WarmUpAllObjs(
if (pendingReq.load())
cond.Wait();
}
isWarmUping_.exchange(false);
LOG(INFO) << "num of objs is loaded over ";
}

void FuseS3Client::travelChunks(
fuse_ino_t ino,
const google::protobuf::Map<uint64_t, S3ChunkInfoList>& s3ChunkInfoMap) {
VLOG(9) << "travel chunk start: " << ino
<< ", size: " << s3ChunkInfoMap.size();
std::list<std::pair<std::string, uint64_t>> prefetchObjs;
for (auto const& iter : s3ChunkInfoMap) {
VLOG(9) << "travel chunk: " << iter.first;
travelChunk(ino, iter.second, &prefetchObjs);
travelChunk(ino, iter.second);
}
WarmUpAllObjs(prefetchObjs);
VLOG(9) << "travel chunks end";
}

Expand Down
10 changes: 5 additions & 5 deletions curvefs/src/client/fuse_s3_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,9 @@ class FuseS3Client : public FuseClient {
fuse_ino_t ino,
const google::protobuf::Map<uint64_t, S3ChunkInfoList>& s3ChunkInfoMap);
// travel and download all objs belonging to the chunk
void travelChunk(fuse_ino_t ino, S3ChunkInfoList chunkInfo,
std::list<std::pair<std::string, uint64_t>>* prefetchObjs);
void travelChunk(fuse_ino_t ino, S3ChunkInfoList chunkInfo);
// warmup all the prefetchObjs
void WarmUpAllObjs(const std::list<
std::pair<std::string, uint64_t>> &prefetchObjs);
void WarmUpAllObjs();

private:
// s3 adaptor
Expand All @@ -122,8 +120,10 @@ class FuseS3Client : public FuseClient {
bool initbgFetchThread_;
Thread bgFetchThread_;
std::atomic<bool> bgFetchStop_;
std::mutex fetchMtx_;
uint32_t downloadMaxRetryTimes_;
std::mutex warmupObjsMtx_;
std::atomic<bool> isWarmUping_;
std::list<std::pair<std::string, uint64_t>> needWarmupObjs_;
};


Expand Down
9 changes: 9 additions & 0 deletions curvefs/test/client/test_fuse_s3_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,15 @@ class TestFuseS3Client : public ::testing::Test {
s3ClientAdaptor_);
InitOptionBasic(&fuseClientOption_);
InitFSInfo(client_);
fuseClientOption_.s3Opt.s3AdaptrOpt.asyncThreadNum = 1;
fuseClientOption_.dummyServerStartPort = 5000;
fuseClientOption_.maxNameLength = 20u;
fuseClientOption_.listDentryThreads = 2;
fuseClientOption_.warmupThreadsNum = 10;
auto fsInfo = std::make_shared<FsInfo>();
fsInfo->set_fsid(fsId);
fsInfo->set_fsname("s3fs");
client_->SetFsInfo(fsInfo);
client_->Init(fuseClientOption_);
PrepareFsInfo();
}
Expand Down

0 comments on commit 503aa34

Please sign in to comment.