for raft wal log poc code #2987

Open · wants to merge 1 commit into base: release1.2
68 changes: 30 additions & 38 deletions build.sh
@@ -18,16 +18,16 @@

dir=`pwd`
#step1 clean up generated directories and files
bazel clean
#bazel clean
rm -rf curvefs_python/BUILD
rm -rf curvefs_python/tmplib/

git submodule update --init
if [ $? -ne 0 ]
then
echo "submodule init failed"
exit
fi
#git submodule update --init
#if [ $? -ne 0 ]
#then
# echo "submodule init failed"
# exit
#fi

#step2 get the tag version and git commit version info
#get the tag version
@@ -90,24 +90,25 @@ fi
echo "gcc version : "`gcc -dumpversion`

echo "start compile"
cd ${dir}/thirdparties/etcdclient
make clean
make all
if [ $? -ne 0 ]
then
echo "make etcd client failed"
exit
fi
cd ${dir}

cp ${dir}/thirdparties/etcdclient/libetcdclient.h ${dir}/include/etcdclient/etcdclient.h
#cd ${dir}/thirdparties/etcdclient
#make clean
#make all
#if [ $? -ne 0 ]
#then
# echo "make etcd client failed"
# exit
#fi
#cd ${dir}
#
#cp ${dir}/thirdparties/etcdclient/libetcdclient.h ${dir}/include/etcdclient/etcdclient.h

if [ `gcc -dumpversion | awk -F'.' '{print $1}'` -le 6 ]
then
bazelflags=''
else
bazelflags='--copt -faligned-new'
fi
echo "bazelflags=$bazelflags"

if [ "$1" = "debug" ]
then
@@ -138,31 +138,22 @@ then
exit
fi
else
bazel build ... --copt -DHAVE_ZLIB=1 --copt -O2 -s --define=with_glog=true \
#bazel build ... --copt -DHAVE_ZLIB=1 --copt -O2 -s --define=with_glog=true \
#bazel build //src/chunkserver:chunkserver --copt -DHAVE_ZLIB=1 --copt -O2 -s --define=with_glog=true \
#bazel build //src/client:all --copt -DHAVE_ZLIB=1 --copt -O2 -s --define=with_glog=true \
#bazel build //:curvebs-sdk --copt -DHAVE_ZLIB=1 --copt -O2 -s --define=with_glog=true \
#bazel build --spawn_strategy=local //src/chunkserver:chunkserver --copt -DHAVE_ZLIB=1 --copt -O2 -s --define=with_glog=true \
cmd="bazel build //src/chunkserver:chunkserver --copt -DHAVE_ZLIB=1 --copt -O2 -s --define=with_glog=true \
--define=libunwind=true --copt -DGFLAGS_NS=google --copt \
-Wno-error=format-security --copt -DUSE_BTHREAD_MUTEX --copt -DCURVEVERSION=${curve_version} \
--linkopt -L/usr/local/lib ${bazelflags}
--linkopt -L/usr/local/lib ${bazelflags}"

echo $cmd
$cmd
if [ $? -ne 0 ]
then
echo "build phase1 failed"
exit
fi
bash ./curvefs_python/configure.sh
if [ $? -ne 0 ]
then
echo "configure failed"
exit
fi
bazel build curvefs_python:curvefs --copt -DHAVE_ZLIB=1 --copt -O2 -s \
--define=with_glog=true --define=libunwind=true --copt -DGFLAGS_NS=google \
--copt \
-Wno-error=format-security --copt -DUSE_BTHREAD_MUTEX --linkopt \
-L${dir}/curvefs_python/tmplib/ --copt -DCURVEVERSION=${curve_version} \
--linkopt -L/usr/local/lib ${bazelflags}
if [ $? -ne 0 ]
then
echo "build phase2 failed"
exit
fi
fi
echo "end compile"
echo "end compile"
75 changes: 75 additions & 0 deletions curve.code-workspace
@@ -0,0 +1,75 @@
{
"folders": [
{
"path": "."
}
],
"settings": {
"files.associations": {
"*.rst": "rust",
"atomic": "cpp",
"deque": "cpp",
"string": "cpp",
"vector": "cpp",
"array": "cpp",
"*.tcc": "cpp",
"memory": "cpp",
"future": "cpp",
"istream": "cpp",
"ranges": "cpp",
"functional": "cpp",
"tuple": "cpp",
"utility": "cpp",
"list": "cpp",
"unordered_map": "cpp",
"unordered_set": "cpp",
"hash_map": "cpp",
"hash_set": "cpp",
"bitset": "cpp",
"cctype": "cpp",
"chrono": "cpp",
"cinttypes": "cpp",
"clocale": "cpp",
"cmath": "cpp",
"condition_variable": "cpp",
"csignal": "cpp",
"cstdarg": "cpp",
"cstddef": "cpp",
"cstdint": "cpp",
"cstdio": "cpp",
"cstdlib": "cpp",
"cstring": "cpp",
"ctime": "cpp",
"cwchar": "cpp",
"cwctype": "cpp",
"exception": "cpp",
"algorithm": "cpp",
"iterator": "cpp",
"map": "cpp",
"memory_resource": "cpp",
"numeric": "cpp",
"random": "cpp",
"ratio": "cpp",
"regex": "cpp",
"set": "cpp",
"system_error": "cpp",
"type_traits": "cpp",
"slist": "cpp",
"fstream": "cpp",
"initializer_list": "cpp",
"iomanip": "cpp",
"iosfwd": "cpp",
"iostream": "cpp",
"limits": "cpp",
"mutex": "cpp",
"new": "cpp",
"ostream": "cpp",
"sstream": "cpp",
"stdexcept": "cpp",
"streambuf": "cpp",
"thread": "cpp",
"cfenv": "cpp",
"typeinfo": "cpp"
}
}
}
2 changes: 1 addition & 1 deletion src/chunkserver/BUILD
@@ -120,7 +120,7 @@ cc_library(
],
)

# chunkserver exec
#chunkserver exec
cc_binary(
name = "chunkserver",
srcs = glob([
8 changes: 8 additions & 0 deletions src/chunkserver/chunk_service.cpp
@@ -220,6 +220,14 @@ void ChunkServiceImpl::ReadChunk(RpcController *controller,
const ChunkRequest *request,
ChunkResponse *response,
Closure *done) {
/*
brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
response->set_status(CHUNK_OP_STATUS::CHUNK_OP_STATUS_SUCCESS);
cntl->response_attachment().resize(request->size());
done->Run();
return;
*/

ChunkServiceClosure* closure =
new (std::nothrow) ChunkServiceClosure(inflightThrottle_,
request,
25 changes: 22 additions & 3 deletions src/chunkserver/chunkserver.cpp
@@ -61,6 +61,7 @@ DEFINE_string(chunkServerMetaUri,
DEFINE_string(copySetUri, "local://./0/copysets", "copyset data uri");
DEFINE_string(raftSnapshotUri, "curve://./0/copysets", "raft snapshot uri");
DEFINE_string(raftLogUri, "curve://./0/copysets", "raft log uri");
DEFINE_string(raftMetaUri, "local://./0/copysets", "zyb, raft meta uri");
DEFINE_string(recycleUri, "local://./0/recycler" , "recycle uri");
DEFINE_string(chunkFilePoolDir, "./0/", "chunk file pool location");
DEFINE_string(chunkFilePoolMetaPath,
@@ -73,6 +74,10 @@ DEFINE_bool(enableWalfilepool, true, "enable WAL filepool");
DEFINE_string(walFilePoolDir, "./0/", "WAL filepool location");
DEFINE_string(walFilePoolMetaPath, "./walfilepool.meta",
"WAL filepool meta path");
DEFINE_bool(useChunkFilePool, false,
"zyb, walfilepool.use_chunk_file_pool");
DEFINE_uint32(walFilePoolSegmentSize, 16777216,
"zyb, walfilepool.segment_size");

const char* kProtocalCurve = "curve";

@@ -104,6 +109,7 @@ int ChunkServer::Run(int argc, char** argv) {
curve::common::ExposeCurveVersion();

// ============================ Initialize modules ==========================//
LOG(INFO) << "zyb mod ChunkServer";
LOG(INFO) << "Initializing ChunkServer modules";

LOG_IF(FATAL, !conf.GetUInt32Value("global.min_io_alignment",
@@ -710,9 +716,6 @@ void ChunkServer::LoadConfigFromCmdline(common::Configuration *conf) {
if (GetCommandLineFlagInfo("raftSnapshotUri", &info) && !info.is_default) {
conf->SetStringValue(
"copyset.raft_snapshot_uri", FLAGS_raftSnapshotUri);
} else {
LOG(FATAL)
<< "raftSnapshotUri must be set when run chunkserver in command.";
}
if (GetCommandLineFlagInfo("raftLogUri", &info) && !info.is_default) {
conf->SetStringValue(
@@ -721,6 +724,10 @@
LOG(FATAL)
<< "raftLogUri must be set when run chunkserver in command.";
}
if (GetCommandLineFlagInfo("raftMetaUri", &info) && !info.is_default) {
conf->SetStringValue(
"copyset.raft_meta_uri", FLAGS_raftMetaUri);
}

if (GetCommandLineFlagInfo("recycleUri", &info) &&
!info.is_default) {
@@ -766,6 +773,18 @@
<< "walFilePoolMetaPath must be set when run chunkserver in command.";
}

if (GetCommandLineFlagInfo("useChunkFilePool", &info) &&
!info.is_default) {
conf->SetBoolValue(
"walfilepool.use_chunk_file_pool", FLAGS_useChunkFilePool);
}

if (GetCommandLineFlagInfo("walFilePoolSegmentSize", &info) &&
!info.is_default) {
conf->SetIntValue(
"walfilepool.segment_size", FLAGS_walFilePoolSegmentSize);
}

if (GetCommandLineFlagInfo("mdsListenAddr", &info) && !info.is_default) {
conf->SetStringValue("mds.listen.addr", FLAGS_mdsListenAddr);
}
29 changes: 27 additions & 2 deletions src/chunkserver/concurrent_apply/concurrent_apply.cpp
@@ -43,9 +43,10 @@ bool ConcurrentApplyModule::Init(const ConcurrentApplyOption &opt) {
}

start_ = true;
cond_.Reset(opt.rconcurrentsize + opt.wconcurrentsize);
cond_.Reset(opt.rconcurrentsize + opt.wconcurrentsize + opt.wconcurrentsize);
InitThreadPool(ThreadPoolType::READ, rconcurrentsize_, rqueuedepth_);
InitThreadPool(ThreadPoolType::WRITE, wconcurrentsize_, wqueuedepth_);
InitThreadPool(ThreadPoolType::DELETE, dconcurrentsize_, dqueuedepth_);

if (!cond_.WaitFor(5000)) {
LOG(ERROR) << "init concurrent module's threads fail";
@@ -72,6 +73,8 @@ bool ConcurrentApplyModule::checkOptAndInit(
wqueuedepth_ = opt.wqueuedepth;
rconcurrentsize_ = opt.rconcurrentsize;
rqueuedepth_ = opt.rqueuedepth;
dconcurrentsize_ = opt.wconcurrentsize;
dqueuedepth_ = opt.wqueuedepth;

return true;
}
@@ -91,6 +94,10 @@ void ConcurrentApplyModule::InitThreadPool(
case ThreadPoolType::WRITE:
wapplyMap_.insert(std::make_pair(i, asyncth));
break;

case ThreadPoolType::DELETE:
dapplyMap_.insert(std::make_pair(i, asyncth));
break;
}
}

@@ -105,6 +112,11 @@ void ConcurrentApplyModule::InitThreadPool(
wapplyMap_[i]->th =
std::thread(&ConcurrentApplyModule::Run, this, type, i);
break;

case ThreadPoolType::DELETE:
dapplyMap_[i]->th =
std::thread(&ConcurrentApplyModule::Run, this, type, i);
break;
}
}
}
@@ -120,6 +132,10 @@ void ConcurrentApplyModule::Run(ThreadPoolType type, int index) {
case ThreadPoolType::WRITE:
wapplyMap_[index]->tq.Pop()();
break;

case ThreadPoolType::DELETE:
dapplyMap_[index]->tq.Pop()();
break;
}
}
}
@@ -142,6 +158,13 @@ void ConcurrentApplyModule::Stop() {
}
wapplyMap_.clear();

for (auto iter : dapplyMap_) {
iter.second->tq.Push(wakeup);
iter.second->th.join();
delete iter.second;
}
dapplyMap_.clear();

LOG(INFO) << "stop ConcurrentApplyModule ok.";
}

@@ -163,8 +186,10 @@ ThreadPoolType ConcurrentApplyModule::Schedule(CHUNK_OP_TYPE optype) {
case CHUNK_OP_READ:
case CHUNK_OP_RECOVER:
return ThreadPoolType::READ;
default:
case CHUNK_OP_WRITE:
return ThreadPoolType::WRITE;
default:
return ThreadPoolType::DELETE;
}
}
} // namespace concurrent
12 changes: 11 additions & 1 deletion src/chunkserver/concurrent_apply/concurrent_apply.h
@@ -50,17 +50,21 @@ struct ConcurrentApplyOption {
int wqueuedepth;
int rconcurrentsize;
int rqueuedepth;
int dconcurrentsize;
int dqueuedepth;
};

enum class ThreadPoolType {READ, WRITE};
enum class ThreadPoolType {READ, WRITE, DELETE};

class CURVE_CACHELINE_ALIGNMENT ConcurrentApplyModule {
public:
ConcurrentApplyModule(): start_(false),
rconcurrentsize_(0),
wconcurrentsize_(0),
dconcurrentsize_(0),
rqueuedepth_(0),
wqueuedepth_(0),
dqueuedepth_(0),
cond_(0) {}
~ConcurrentApplyModule() {}

@@ -90,6 +94,9 @@ class CURVE_CACHELINE_ALIGNMENT ConcurrentApplyModule {
case ThreadPoolType::WRITE:
wapplyMap_[Hash(key, wconcurrentsize_)]->tq.Push(task);
break;
case ThreadPoolType::DELETE:
dapplyMap_[Hash(key, dconcurrentsize_)]->tq.Push(task);
break;
}

return true;
@@ -129,9 +136,12 @@ class CURVE_CACHELINE_ALIGNMENT ConcurrentApplyModule {
int rqueuedepth_;
int wconcurrentsize_;
int wqueuedepth_;
int dconcurrentsize_;
int dqueuedepth_;
CountDownEvent cond_;
CURVE_CACHELINE_ALIGNMENT std::unordered_map<threadIndex, taskthread_t*> wapplyMap_; // NOLINT
CURVE_CACHELINE_ALIGNMENT std::unordered_map<threadIndex, taskthread_t*> rapplyMap_; // NOLINT
CURVE_CACHELINE_ALIGNMENT std::unordered_map<threadIndex, taskthread_t*> dapplyMap_; // NOLINT
};
} // namespace concurrent
} // namespace chunkserver
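
A minimal sketch of how the delete thread pool added above might be initialized. The header path, option fields, and Init/Stop signatures are taken from this diff; the outer curve:: namespace and the pool sizes are assumptions for illustration only. Note that checkOptAndInit in this diff still copies the delete-pool settings from the write-pool options, so the new dconcurrentsize/dqueuedepth fields are declared but not yet consumed.

```cpp
// Sketch only: namespace nesting (curve::) and pool sizes are assumed.
#include "src/chunkserver/concurrent_apply/concurrent_apply.h"

using curve::chunkserver::concurrent::ConcurrentApplyModule;
using curve::chunkserver::concurrent::ConcurrentApplyOption;

int main() {
    ConcurrentApplyOption opt;
    opt.rconcurrentsize = 10;   // read pool threads (illustrative values)
    opt.rqueuedepth     = 1;
    opt.wconcurrentsize = 10;   // write pool threads
    opt.wqueuedepth     = 1;
    opt.dconcurrentsize = 10;   // delete pool added in this PR; Init currently
    opt.dqueuedepth     = 1;    // mirrors the write settings instead of these

    ConcurrentApplyModule applyModule;
    if (!applyModule.Init(opt)) {   // waits up to 5s for all workers to start
        return -1;
    }

    // Push() now routes tasks tagged ThreadPoolType::DELETE to dapplyMap_, and
    // Schedule() sends every op that is not a read/recover/write to that pool.

    applyModule.Stop();             // joins read, write, and delete workers
    return 0;
}
```

If the delete pool is meant to be sized independently, checkOptAndInit would also need to read opt.dconcurrentsize and opt.dqueuedepth, and the CountDownEvent reset in Init would follow those values.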