Skip to content

Commit

Permalink
fix clang-tidy and add command to update configurations for loading r…
Browse files Browse the repository at this point in the history
…eplicas dynamically
  • Loading branch information
empiredan committed Dec 18, 2024
1 parent be70e2e commit 510dc78
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 72 deletions.
155 changes: 99 additions & 56 deletions src/replica/replica_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,18 @@
#include "remote_cmd/remote_command.h"
#include "utils/fail_point.h"

static const char *kMaxConcurrentBulkLoadDownloadingCountDesc =
"The maximum concurrent bulk load downloading replica count";
namespace {

const char *kMaxConcurrentBulkLoadDownloadingCountDesc =
"The maximum concurrent bulk load downloading replica count.";

const char *kMaxReplicasOnLoadForEachDiskDesc =
"The max number of replicas that are allowed to be loaded simultaneously for each disk dir.";

const char *kLoadReplicaMaxWaitTimeMsDesc = "The max waiting time for replica loading to complete.";

} // anonymous namespace

DSN_DEFINE_int32(replication,
max_concurrent_bulk_load_downloading_count,
5,
Expand Down Expand Up @@ -269,18 +279,15 @@ DSN_DECLARE_string(server_key);
DSN_DEFINE_uint64(replication,
max_replicas_on_load_for_each_disk,
256,
"The max number of replicas that are allowed to be loaded simultaneously "
"for each disk dir.");
kMaxReplicasOnLoadForEachDiskDesc);
DSN_TAG_VARIABLE(max_replicas_on_load_for_each_disk, FT_MUTABLE);
DSN_DEFINE_validator(max_replicas_on_load_for_each_disk,
[](uint64_t max_replicas_on_load_for_each_disk) -> bool {
return max_replicas_on_load_for_each_disk > 0;
});
[](uint64_t value) -> bool { return value > 0; });

DSN_DEFINE_uint64(replication,
load_replica_max_wait_time_ms,
10,
"The max waiting time for replica loading to complete.");
DSN_DEFINE_uint64(replication, load_replica_max_wait_time_ms, 10, kLoadReplicaMaxWaitTimeMsDesc);
DSN_TAG_VARIABLE(load_replica_max_wait_time_ms, FT_MUTABLE);
DSN_DEFINE_validator(load_replica_max_wait_time_ms,
[](uint64_t value) -> bool { return value > 0; });

DSN_DEFINE_bool(replication,
deny_client_on_start,
Expand Down Expand Up @@ -401,6 +408,40 @@ namespace dsn {
namespace replication {
bool replica_stub::s_not_exit_on_log_failure = false;

namespace {

void register_flags_ctrl_command()
{
static std::once_flag flag;
static std::vector<std::unique_ptr<dsn::command_deregister>> cmds;
std::call_once(flag, []() mutable {
cmds.emplace_back(dsn::command_manager::instance().register_int_command(
FLAGS_max_replicas_on_load_for_each_disk,
FLAGS_max_replicas_on_load_for_each_disk,
"replica.max-replicas-on-load-for-each-disk",
kMaxReplicasOnLoadForEachDiskDesc));

cmds.emplace_back(dsn::command_manager::instance().register_int_command(
FLAGS_load_replica_max_wait_time_ms,
FLAGS_load_replica_max_wait_time_ms,
"replica.load-replica-max-wait-time-ms",
kLoadReplicaMaxWaitTimeMsDesc));

cmds.emplace_back(dsn::command_manager::instance().register_bool_command(
FLAGS_empty_write_disabled,
"replica.disable-empty-write",
"whether to disable empty writes"));

cmds.emplace_back(::dsn::command_manager::instance().register_int_command(
FLAGS_max_concurrent_bulk_load_downloading_count,
FLAGS_max_concurrent_bulk_load_downloading_count,
"replica.max-concurrent-bulk-load-downloading-count",
kMaxConcurrentBulkLoadDownloadingCountDesc));
});
}

} // anonymous namespace

replica_stub::replica_stub(replica_state_subscriber subscriber /*= nullptr*/,
bool is_long_subscriber /* = true*/)
: serverlet("replica_stub"),
Expand Down Expand Up @@ -452,6 +493,8 @@ replica_stub::replica_stub(replica_state_subscriber subscriber /*= nullptr*/,
_is_long_subscriber = is_long_subscriber;
_failure_detector = nullptr;
_state = NS_Disconnected;

register_flags_ctrl_command();
}

replica_stub::~replica_stub(void) { close(); }
Expand Down Expand Up @@ -531,14 +574,17 @@ void replica_stub::load_replicas(replica_map_by_gpid &reps)
{
const auto &disks = get_all_disk_dirs();

std::vector<size_t> dir_indexes(disks.size(), 0);
// The index of currently loaded replica dir for each disk. Once current replica
std::vector<size_t> replica_dir_indexes(disks.size(), 0);

struct replica_dir_loader
{
size_t dir_index;
std::string dir_path;
task_ptr load_task;
size_t replica_dir_index;
std::string replica_dir_path;
task_ptr load_replica_task;
};

//
std::vector<std::queue<replica_dir_loader>> load_disk_queues(disks.size());

utils::ex_lock reps_lock;
Expand All @@ -561,8 +607,8 @@ void replica_stub::load_replicas(replica_map_by_gpid &reps)
// https://releases.llvm.org/16.0.0/tools/clang/docs/ReleaseNotes.html#c-20-feature-support.
const auto &dirs = disks[disk_index].replica_dirs;

auto &dir_index = dir_indexes[disk_index];
if (dir_index >= dirs.size()) {
auto &replica_dir_index = replica_dir_indexes[disk_index];
if (replica_dir_index >= dirs.size()) {
// All of the replicas for the disk `disks[disk_index]` have begun to be loaded,
// thus just skip.
++finished_disks;
Expand All @@ -573,7 +619,7 @@ void replica_stub::load_replicas(replica_map_by_gpid &reps)
auto &load_disk_queue = load_disk_queues[disk_index];
if (load_disk_queue.size() >= FLAGS_max_replicas_on_load_for_each_disk) {
// Loading replicas should be throttled in case that disk IO is saturated.
if (!load_disk_queue.front().load_task->wait(
if (!load_disk_queue.front().load_replica_task->wait(
static_cast<int>(FLAGS_load_replica_max_wait_time_ms))) {
// There might be too many replicas that are being loaded which lead to
// slow disk IO.
Expand All @@ -582,37 +628,38 @@ void replica_stub::load_replicas(replica_map_by_gpid &reps)
"={}, tag={}, path={}), skip dir(index={}, path={}), turn to "
"next disk",
FLAGS_load_replica_max_wait_time_ms,
load_disk_queue.front().dir_index,
load_disk_queue.front().dir_path,
load_disk_queue.front().replica_dir_index,
load_disk_queue.front().replica_dir_path,
load_disk_queue.size(),
disk_index,
dn->tag,
dn->full_dir,
dir_index,
dirs[dir_index]);
replica_dir_index,
dirs[replica_dir_index]);
continue;
}

// Continue to load a replica since we are within the limit now.
load_disk_queue.pop();
}

if (dsn::replication::is_data_dir_invalid(dirs[dir_index])) {
LOG_WARNING("ignore dir(index={}, path={})", dir_index, dirs[dir_index]);
++dir_index;
if (dsn::replication::is_data_dir_invalid(dirs[replica_dir_index])) {
LOG_WARNING(
"ignore dir(index={}, path={})", replica_dir_index, dirs[replica_dir_index]);
++replica_dir_index;
continue;
}

LOG_DEBUG("ready to load dir(index={}, path={}) for disk(index={}, tag={}, path={})",
dir_index,
dirs[dir_index],
replica_dir_index,
dirs[replica_dir_index],
disk_index,
dn->tag,
dn->full_dir);

load_disk_queue.push(replica_dir_loader{
dir_index,
dirs[dir_index],
replica_dir_index,
dirs[replica_dir_index],
tasking::create_task(
// Ensure that the thread pool is non-partitioned.
LPC_REPLICATION_INIT_LOAD,
Expand All @@ -624,13 +671,13 @@ void replica_stub::load_replicas(replica_map_by_gpid &reps)
&replica_stub::load_replica),
this,
dn,
dirs[dir_index],
dirs[replica_dir_index],
std::ref(reps_lock),
std::ref(reps)))});

load_disk_queue.back().load_task->enqueue();
load_disk_queue.back().load_replica_task->enqueue();

++dir_index;
++replica_dir_index;
}

if (finished_disks >= disks.size()) {
Expand All @@ -642,7 +689,7 @@ void replica_stub::load_replicas(replica_map_by_gpid &reps)
// All loading tasks have been in the queue. Just wait all tasks to be finished.
for (auto &load_disk_queue : load_disk_queues) {
while (!load_disk_queue.empty()) {
CHECK_TRUE(load_disk_queue.front().load_task->wait());
CHECK_TRUE(load_disk_queue.front().load_replica_task->wait());
load_disk_queue.pop();
}
}
Expand Down Expand Up @@ -2593,11 +2640,6 @@ void replica_stub::register_ctrl_command()
});
}));

_cmds.emplace_back(::dsn::command_manager::instance().register_bool_command(
FLAGS_empty_write_disabled,
"replica.disable-empty-write",
"whether to disable empty writes"));

#ifdef DSN_ENABLE_GPERF
_cmds.emplace_back(::dsn::command_manager::instance().register_bool_command(
_release_tcmalloc_memory,
Expand Down Expand Up @@ -2632,21 +2674,18 @@ void replica_stub::register_ctrl_command()
#elif defined(DSN_USE_JEMALLOC)
register_jemalloc_ctrl_command();
#endif
_cmds.emplace_back(::dsn::command_manager::instance().register_int_command(
FLAGS_max_concurrent_bulk_load_downloading_count,
FLAGS_max_concurrent_bulk_load_downloading_count,
"replica.max-concurrent-bulk-load-downloading-count",
kMaxConcurrentBulkLoadDownloadingCountDesc));
});
}

std::string
replica_stub::exec_command_on_replica(const std::vector<std::string> &args,
replica_stub::exec_command_on_replica(const std::vector<std::string> &arg_str_list,
bool allow_empty_args,
std::function<std::string(const replica_ptr &rep)> func)
{
if (args.empty() && !allow_empty_args) {
return std::string("invalid arguments");
static const std::string kInvalidArguments("invalid arguments");

if (arg_str_list.empty() && !allow_empty_args) {
return kInvalidArguments;
}

replica_map_by_gpid rs;
Expand All @@ -2657,17 +2696,19 @@ replica_stub::exec_command_on_replica(const std::vector<std::string> &args,

std::set<gpid> required_ids;
replica_map_by_gpid choosed_rs;
if (!args.empty()) {
for (int i = 0; i < args.size(); i++) {
std::vector<std::string> arg_strs;
utils::split_args(args[i].c_str(), arg_strs, ',');
if (arg_strs.empty()) {
return std::string("invalid arguments");
if (!arg_str_list.empty()) {
for (const auto &arg_str : arg_str_list) {
std::vector<std::string> args;
utils::split_args(arg_str.c_str(), args, ',');
if (args.empty()) {
return kInvalidArguments;
}

for (const std::string &arg : arg_strs) {
if (arg.empty())
for (const std::string &arg : args) {
if (arg.empty()) {
continue;
}

gpid id;
int pid;
if (id.parse_from(arg.c_str())) {
Expand All @@ -2686,7 +2727,7 @@ replica_stub::exec_command_on_replica(const std::vector<std::string> &args,
}
}
} else {
return std::string("invalid arguments");
return kInvalidArguments;
}
}
}
Expand All @@ -2706,8 +2747,10 @@ replica_stub::exec_command_on_replica(const std::vector<std::string> &args,
[rep, &func, &results_lock, &results]() {
partition_status::type status = rep->status();
if (status != partition_status::PS_PRIMARY &&
status != partition_status::PS_SECONDARY)
status != partition_status::PS_SECONDARY) {
return;
}

std::string result = func(rep);
::dsn::zauto_lock l(results_lock);
auto &value = results[rep->get_gpid()];
Expand Down
3 changes: 2 additions & 1 deletion src/replica/replica_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ typedef rpc_holder<add_new_disk_request, add_new_disk_response> add_new_disk_rpc

namespace test {
class test_checker;
}
} // namespace test

class cold_backup_context;
class replica_split_manager;

Expand Down
41 changes: 26 additions & 15 deletions src/utils/command_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,12 @@ class command_manager : public ::dsn::utils::singleton<command_manager>
// 'validator' is used to validate the new value.
// The value is reset to 'default_value' if passing "DEFAULT" argument.
template <typename T>
WARN_UNUSED_RESULT std::unique_ptr<command_deregister> register_int_command(
T &value,
T default_value,
const std::string &command,
const std::string &help,
std::function<bool(int64_t new_value)> validator = [](int64_t new_value) -> bool {
return new_value >= 0;
})
WARN_UNUSED_RESULT std::unique_ptr<command_deregister>
register_int_command(T &value,
T default_value,
const std::string &command,
const std::string &help,
std::function<bool(typename std::remove_reference<T>::type)> validator)
{
return register_single_command(
command,
Expand All @@ -83,6 +81,19 @@ class command_manager : public ::dsn::utils::singleton<command_manager>
});
}

template <typename T>
WARN_UNUSED_RESULT std::unique_ptr<command_deregister> register_int_command(
T &value, T default_value, const std::string &command, const std::string &help)
{
return register_int_command(value,
default_value,
command,
help,
[](typename std::remove_reference<T>::type new_value) -> bool {
return new_value >= 0;
});
}

// Register a single 'command' with the 'help' description, its arguments are described in
// 'args'.
std::unique_ptr<command_deregister>
Expand Down Expand Up @@ -133,11 +144,12 @@ class command_manager : public ::dsn::utils::singleton<command_manager>
set_bool(bool &value, const std::string &name, const std::vector<std::string> &args);

template <typename T>
static std::string set_int(T &value,
T default_value,
const std::string &name,
const std::vector<std::string> &args,
const std::function<bool(int64_t value)> &validator)
static std::string
set_int(T &value,
T default_value,
const std::string &name,
const std::vector<std::string> &args,
const std::function<bool(typename std::remove_reference<T>::type)> &validator)
{
nlohmann::json msg;
msg["error"] = "ok";
Expand All @@ -164,8 +176,7 @@ class command_manager : public ::dsn::utils::singleton<command_manager>

// Invalid argument.
T new_value = 0;
if (!internal::buf2signed(args[0], new_value) ||
!validator(static_cast<int64_t>(new_value))) {
if (!buf2numeric(args[0], new_value) || !validator(new_value)) {
msg["error"] =
fmt::format("ERR: invalid argument '{}', the value is not acceptable", args[0]);
return msg.dump(2);
Expand Down

0 comments on commit 510dc78

Please sign in to comment.