29
29
// IWYU pragma: no_include <ext/alloc_traits.h>
30
30
#include < fmt/core.h>
31
31
#include < fmt/format.h>
32
+ #include < nlohmann/json.hpp>
32
33
#include < rapidjson/ostreamwrapper.h>
33
34
#include < stdio.h>
34
35
#include < stdlib.h>
@@ -413,30 +414,33 @@ namespace {
413
414
void register_flags_ctrl_command ()
414
415
{
415
416
static std::once_flag flag;
416
- static std::vector<std::unique_ptr<dsn::command_deregister>> cmds;
417
417
std::call_once (flag, []() mutable {
418
- cmds.emplace_back (dsn::command_manager::instance ().register_int_command (
419
- FLAGS_max_replicas_on_load_for_each_disk,
420
- FLAGS_max_replicas_on_load_for_each_disk,
421
- " replica.max-replicas-on-load-for-each-disk" ,
422
- kMaxReplicasOnLoadForEachDiskDesc ));
423
-
424
- cmds.emplace_back (dsn::command_manager::instance ().register_int_command (
425
- FLAGS_load_replica_max_wait_time_ms,
426
- FLAGS_load_replica_max_wait_time_ms,
427
- " replica.load-replica-max-wait-time-ms" ,
428
- kLoadReplicaMaxWaitTimeMsDesc ));
429
-
430
- cmds.emplace_back (dsn::command_manager::instance ().register_bool_command (
431
- FLAGS_empty_write_disabled,
432
- " replica.disable-empty-write" ,
433
- " whether to disable empty writes" ));
434
-
435
- cmds.emplace_back (::dsn::command_manager::instance ().register_int_command (
436
- FLAGS_max_concurrent_bulk_load_downloading_count,
437
- FLAGS_max_concurrent_bulk_load_downloading_count,
438
- " replica.max-concurrent-bulk-load-downloading-count" ,
439
- kMaxConcurrentBulkLoadDownloadingCountDesc ));
418
+ dsn::command_manager::instance ().add_global_cmd (
419
+ dsn::command_manager::instance ().register_int_command (
420
+ FLAGS_max_replicas_on_load_for_each_disk,
421
+ FLAGS_max_replicas_on_load_for_each_disk,
422
+ " replica.max-replicas-on-load-for-each-disk" ,
423
+ kMaxReplicasOnLoadForEachDiskDesc ));
424
+
425
+ dsn::command_manager::instance ().add_global_cmd (
426
+ dsn::command_manager::instance ().register_int_command (
427
+ FLAGS_load_replica_max_wait_time_ms,
428
+ FLAGS_load_replica_max_wait_time_ms,
429
+ " replica.load-replica-max-wait-time-ms" ,
430
+ kLoadReplicaMaxWaitTimeMsDesc ));
431
+
432
+ dsn::command_manager::instance ().add_global_cmd (
433
+ dsn::command_manager::instance ().register_bool_command (
434
+ FLAGS_empty_write_disabled,
435
+ " replica.disable-empty-write" ,
436
+ " whether to disable empty writes" ));
437
+
438
+ dsn::command_manager::instance ().add_global_cmd (
439
+ dsn::command_manager::instance ().register_int_command (
440
+ FLAGS_max_concurrent_bulk_load_downloading_count,
441
+ FLAGS_max_concurrent_bulk_load_downloading_count,
442
+ " replica.max-concurrent-bulk-load-downloading-count" ,
443
+ kMaxConcurrentBulkLoadDownloadingCountDesc ));
440
444
});
441
445
}
442
446
@@ -445,6 +449,9 @@ void register_flags_ctrl_command()
445
449
replica_stub::replica_stub (replica_state_subscriber subscriber /* = nullptr*/ ,
446
450
bool is_long_subscriber /* = true*/ )
447
451
: serverlet(" replica_stub" ),
452
+ _state (NS_Disconnected),
453
+ _replica_state_subscriber(subscriber),
454
+ _is_long_subscriber(is_long_subscriber),
448
455
_deny_client(false ),
449
456
_verbose_client_log(false ),
450
457
_verbose_commit_log(false ),
@@ -454,6 +461,9 @@ replica_stub::replica_stub(replica_state_subscriber subscriber /*= nullptr*/,
454
461
_bulk_load_downloading_count(0 ),
455
462
_manual_emergency_checkpointing_count(0 ),
456
463
_is_running(false ),
464
+ #ifdef DSN_ENABLE_GPERF
465
+ _is_releasing_memory (false ),
466
+ #endif
457
467
METRIC_VAR_INIT_server (total_replicas),
458
468
METRIC_VAR_INIT_server(opening_replicas),
459
469
METRIC_VAR_INIT_server(closing_replicas),
@@ -486,18 +496,10 @@ replica_stub::replica_stub(replica_state_subscriber subscriber /*= nullptr*/,
486
496
METRIC_VAR_INIT_server(splitting_replicas_async_learn_max_duration_ms),
487
497
METRIC_VAR_INIT_server(splitting_replicas_max_copy_file_bytes)
488
498
{
489
- #ifdef DSN_ENABLE_GPERF
490
- _is_releasing_memory = false ;
491
- #endif
492
- _replica_state_subscriber = subscriber;
493
- _is_long_subscriber = is_long_subscriber;
494
- _failure_detector = nullptr ;
495
- _state = NS_Disconnected;
496
-
497
499
register_flags_ctrl_command ();
498
500
}
499
501
500
- replica_stub::~replica_stub (void ) { close (); }
502
+ replica_stub::~replica_stub () { close (); }
501
503
502
504
void replica_stub::initialize (bool clear /* = false*/ )
503
505
{
@@ -2680,7 +2682,7 @@ void replica_stub::register_ctrl_command()
2680
2682
std::string
2681
2683
replica_stub::exec_command_on_replica (const std::vector<std::string> &arg_str_list,
2682
2684
bool allow_empty_args,
2683
- std::function<std::string(const replica_ptr &rep )> func)
2685
+ std::function<std::string(const replica_ptr &)> func)
2684
2686
{
2685
2687
static const std::string kInvalidArguments (" invalid arguments" );
2686
2688
@@ -2710,25 +2712,29 @@ replica_stub::exec_command_on_replica(const std::vector<std::string> &arg_str_li
2710
2712
}
2711
2713
2712
2714
gpid id;
2713
- int pid;
2714
2715
if (id.parse_from (arg.c_str ())) {
2715
2716
// app_id.partition_index
2716
2717
required_ids.insert (id);
2717
2718
auto find = rs.find (id);
2718
2719
if (find != rs.end ()) {
2719
2720
choosed_rs[id] = find->second ;
2720
2721
}
2721
- } else if (sscanf (arg.c_str (), " %d" , &pid) == 1 ) {
2722
- // app_id
2723
- for (auto kv : rs) {
2724
- id = kv.second ->get_gpid ();
2725
- if (id.get_app_id () == pid) {
2726
- choosed_rs[id] = kv.second ;
2727
- }
2728
- }
2729
- } else {
2722
+
2723
+ continue ;
2724
+ }
2725
+
2726
+ int pid = 0 ;
2727
+ if (sscanf (arg.c_str (), " %d" , &pid) != 1 ) {
2730
2728
return kInvalidArguments ;
2731
2729
}
2730
+
2731
+ // app_id
2732
+ for (const auto &[_, rep] : rs) {
2733
+ id = rep->get_gpid ();
2734
+ if (id.get_app_id () == pid) {
2735
+ choosed_rs[id] = rep;
2736
+ }
2737
+ }
2732
2738
}
2733
2739
}
2734
2740
} else {
0 commit comments