From d9f2600ec9cb6812d567ffb89539e5d03467f3e6 Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Mon, 2 Dec 2024 18:35:04 +0800 Subject: [PATCH] feat(duplication): collect last committed decrees from primary replicas to meta server of the master cluster for duplication (#2159) While a table is being migrated to another cluster by duplication, we want to check if the migration is finished by decree. Since we already have confirmed decree for the log entries duplicated to the follower table, we need to collect last committed decrees from the primary replicas to the meta server of the master cluster. We would compare both kinds of decrees to check whether the migration is finished. Following configurations are newly added: ```diff [replication] + dup_progress_min_update_period_ms = 5000 + dup_progress_min_report_period_ms = 300000 ``` --- idl/duplication.thrift | 4 + src/meta/duplication/duplication_info.cpp | 80 ++++++++++---- src/meta/duplication/duplication_info.h | 8 +- .../duplication/meta_duplication_service.cpp | 38 ++++--- src/meta/meta_state_service_utils.h | 20 +++- src/meta/server_state.cpp | 9 +- src/meta/test/duplication_info_test.cpp | 103 ++++++++++++++---- .../test/meta_duplication_service_test.cpp | 29 ++--- .../replica_duplicator_manager.cpp | 1 + 9 files changed, 214 insertions(+), 78 deletions(-) diff --git a/idl/duplication.thrift b/idl/duplication.thrift index 140361ca94..589cc5d085 100644 --- a/idl/duplication.thrift +++ b/idl/duplication.thrift @@ -172,6 +172,10 @@ struct duplication_confirm_entry 1:i32 dupid; 2:i64 confirmed_decree; 3:optional bool checkpoint_prepared = false; + + // Last committed decree from the primary replica of each partition, collected by + // meta server and used to be compared with duplicating progress of follower table. + 4:optional i64 last_committed_decree; } // This is an internal RPC sent from replica server to meta. 
diff --git a/src/meta/duplication/duplication_info.cpp b/src/meta/duplication/duplication_info.cpp index 7f7bb62b69..8f609c7b20 100644 --- a/src/meta/duplication/duplication_info.cpp +++ b/src/meta/duplication/duplication_info.cpp @@ -20,10 +20,20 @@ #include "common/duplication_common.h" #include "meta/meta_data.h" #include "runtime/api_layer1.h" +#include "utils/flags.h" #include "utils/fmt_logging.h" -namespace dsn { -namespace replication { +DSN_DEFINE_uint64(replication, + dup_progress_min_update_period_ms, + 5000, + "The minimum period in milliseconds that progress of duplication is updated"); + +DSN_DEFINE_uint64(replication, + dup_progress_min_report_period_ms, + 5ULL * 60 * 1000, + "The minimum period in milliseconds that progress of duplication is reported"); + +namespace dsn::replication { /*extern*/ void json_encode(dsn::json::JsonWriter &out, const duplication_status::type &s) { @@ -116,8 +126,13 @@ void duplication_info::init_progress(int partition_index, decree d) zauto_write_lock l(_lock); auto &p = _progress[partition_index]; + + p.last_committed_decree = invalid_decree; p.volatile_decree = p.stored_decree = d; + p.is_altering = false; + p.last_progress_update_ms = 0; p.is_inited = true; + p.checkpoint_prepared = false; } bool duplication_info::alter_progress(int partition_index, @@ -126,9 +141,18 @@ bool duplication_info::alter_progress(int partition_index, zauto_write_lock l(_lock); partition_progress &p = _progress[partition_index]; + + // last_committed_decree could be updated at any time no matter whether progress is + // initialized or busy updating, since it is not persisted to remote meta storage. + // It is just collected from the primary replica of each partition. 
+ if (confirm_entry.__isset.last_committed_decree) { + p.last_committed_decree = confirm_entry.last_committed_decree; + } + if (!p.is_inited) { return false; } + if (p.is_altering) { return false; } @@ -137,15 +161,19 @@ bool duplication_info::alter_progress(int partition_index, if (p.volatile_decree < confirm_entry.confirmed_decree) { p.volatile_decree = confirm_entry.confirmed_decree; } - if (p.volatile_decree != p.stored_decree) { - // progress update is not supposed to be too frequent. - if (dsn_now_ms() > p.last_progress_update_ms + PROGRESS_UPDATE_PERIOD_MS) { - p.is_altering = true; - p.last_progress_update_ms = dsn_now_ms(); - return true; - } + + if (p.volatile_decree == p.stored_decree) { + return false; + } + + // Progress update is not supposed to be too frequent. + if (dsn_now_ms() < p.last_progress_update_ms + FLAGS_dup_progress_min_update_period_ms) { + return false; } - return false; + + p.is_altering = true; + p.last_progress_update_ms = dsn_now_ms(); + return true; } void duplication_info::persist_progress(int partition_index) @@ -163,13 +191,26 @@ void duplication_info::persist_status() zauto_write_lock l(_lock); if (!_is_altering) { - LOG_ERROR_PREFIX("callers never write a duplication that is not altering to meta store"); + LOG_ERROR_PREFIX("the status of this duplication is not being altered: status={}, " + "next_status={}, master_app_id={}, master_app_name={}, " + "follower_cluster_name={}, follower_app_name={}", + duplication_status_to_string(_status), + duplication_status_to_string(_next_status), + app_id, + app_name, + remote_cluster_name, + remote_app_name); return; } - LOG_INFO_PREFIX("change duplication status from {} to {} successfully [app_id: {}]", + + LOG_INFO_PREFIX("change duplication status from {} to {} successfully: master_app_id={}, " + "master_app_name={}, follower_cluster_name={}, follower_app_name={}", duplication_status_to_string(_status), duplication_status_to_string(_next_status), - app_id); + app_id, + app_name, + 
remote_cluster_name, + remote_app_name); _is_altering = false; _status = _next_status; @@ -197,11 +238,13 @@ blob duplication_info::to_json_blob() const void duplication_info::report_progress_if_time_up() { - // progress report is not supposed to be too frequent. - if (dsn_now_ms() > _last_progress_report_ms + PROGRESS_REPORT_PERIOD_MS) { - _last_progress_report_ms = dsn_now_ms(); - LOG_INFO("duplication report: {}", to_string()); + // Progress report is not supposed to be too frequent. + if (dsn_now_ms() < _last_progress_report_ms + FLAGS_dup_progress_min_report_period_ms) { + return; } + + _last_progress_report_ms = dsn_now_ms(); + LOG_INFO("duplication report: {}", to_string()); } duplication_info_s_ptr duplication_info::decode_from_blob(dupid_t dup_id, @@ -263,5 +306,4 @@ void duplication_info::append_if_valid_for_query( ent.__isset.progress = false; } -} // namespace replication -} // namespace dsn +} // namespace dsn::replication diff --git a/src/meta/duplication/duplication_info.h b/src/meta/duplication/duplication_info.h index e1ddcacf38..7563d3d411 100644 --- a/src/meta/duplication/duplication_info.h +++ b/src/meta/duplication/duplication_info.h @@ -216,13 +216,15 @@ class duplication_info mutable zrwlock_nr _lock; - static constexpr int PROGRESS_UPDATE_PERIOD_MS = 5000; // 5s - static constexpr int PROGRESS_REPORT_PERIOD_MS = 1000 * 60 * 5; // 5min - struct partition_progress { + // Last committed decree collected from the primary replica of each partition. + // Not persisted to remote meta storage. 
+ int64_t last_committed_decree{invalid_decree}; + int64_t volatile_decree{invalid_decree}; int64_t stored_decree{invalid_decree}; + bool is_altering{false}; uint64_t last_progress_update_ms{0}; bool is_inited{false}; diff --git a/src/meta/duplication/meta_duplication_service.cpp b/src/meta/duplication/meta_duplication_service.cpp index c557c890a2..f63d8af359 100644 --- a/src/meta/duplication/meta_duplication_service.cpp +++ b/src/meta/duplication/meta_duplication_service.cpp @@ -791,31 +791,36 @@ void meta_duplication_service::do_update_partition_confirmed( int32_t partition_idx, const duplication_confirm_entry &confirm_entry) { - if (dup->alter_progress(partition_idx, confirm_entry)) { - std::string path = get_partition_path(dup, std::to_string(partition_idx)); - blob value = blob::create_from_bytes(std::to_string(confirm_entry.confirmed_decree)); + if (!dup->alter_progress(partition_idx, confirm_entry)) { + return; + } + + const auto &path = get_partition_path(dup, std::to_string(partition_idx)); + + _meta_svc->get_meta_storage()->get_data( + path, [dup, rpc, partition_idx, confirm_entry, path, this](const blob &data) mutable { + auto value = blob::create_from_bytes(std::to_string(confirm_entry.confirmed_decree)); - _meta_svc->get_meta_storage()->get_data(std::string(path), [=](const blob &data) mutable { - if (data.length() == 0) { + if (data.empty()) { _meta_svc->get_meta_storage()->create_node( - std::string(path), std::move(value), [=]() mutable { - dup->persist_progress(partition_idx); - rpc.response().dup_map[dup->app_id][dup->id].progress[partition_idx] = - confirm_entry.confirmed_decree; - }); - } else { - _meta_svc->get_meta_storage()->set_data( - std::string(path), std::move(value), [=]() mutable { + path, std::move(value), [dup, rpc, partition_idx, confirm_entry]() mutable { dup->persist_progress(partition_idx); rpc.response().dup_map[dup->app_id][dup->id].progress[partition_idx] = confirm_entry.confirmed_decree; }); + return; } + 
_meta_svc->get_meta_storage()->set_data( + path, std::move(value), [dup, rpc, partition_idx, confirm_entry]() mutable { + dup->persist_progress(partition_idx); + rpc.response().dup_map[dup->app_id][dup->id].progress[partition_idx] = + confirm_entry.confirmed_decree; + }); + // duplication_sync_rpc will finally be replied when confirmed points // of all partitions are stored. }); - } } std::shared_ptr @@ -908,7 +913,7 @@ void meta_duplication_service::do_restore_duplication_progress( std::move(partition_path), [dup, partition_idx](const blob &value) { // value is confirmed_decree encoded in string. - if (value.size() == 0) { + if (value.empty()) { // not found dup->init_progress(partition_idx, invalid_decree); return; @@ -953,10 +958,11 @@ void meta_duplication_service::do_restore_duplication(dupid_t dup_id, app->max_replica_count, store_path, json); - if (nullptr == dup) { + if (!dup) { LOG_ERROR("failed to decode json \"{}\" on path {}", json, store_path); return; // fail fast } + if (!dup->is_invalid_status()) { app->duplications[dup->id] = dup; refresh_duplicating_no_lock(app); diff --git a/src/meta/meta_state_service_utils.h b/src/meta/meta_state_service_utils.h index 41099e898c..04b8086df7 100644 --- a/src/meta/meta_state_service_utils.h +++ b/src/meta/meta_state_service_utils.h @@ -20,11 +20,14 @@ #include #include #include +#include #include +#include "utils/blob.h" + namespace dsn { -class blob; class task_tracker; + namespace dist { class meta_state_service; } // namespace dist @@ -57,6 +60,11 @@ struct meta_storage void create_node(std::string &&node, blob &&value, std::function &&cb); + void create_node(const std::string &node, blob &&value, std::function &&cb) + { + create_node(std::string(node), std::move(value), std::move(cb)); + } + void delete_node_recursively(std::string &&node, std::function &&cb); void delete_node(std::string &&node, std::function &&cb); @@ -64,9 +72,19 @@ struct meta_storage /// Will fatal if node doesn't exists. 
void set_data(std::string &&node, blob &&value, std::function &&cb); + void set_data(const std::string &node, blob &&value, std::function &&cb) + { + set_data(std::string(node), std::move(value), std::move(cb)); + } + /// If node does not exist, cb will receive an empty blob. void get_data(std::string &&node, std::function &&cb); + void get_data(const std::string &node, std::function &&cb) + { + get_data(std::string(node), std::move(cb)); + } + /// \param cb: void (bool node_exists, const std::vector &children) /// `children` contains the name (not full path) of children nodes. /// `node_exists` indicates whether this node exists. diff --git a/src/meta/server_state.cpp b/src/meta/server_state.cpp index fe18206885..8e68113f72 100644 --- a/src/meta/server_state.cpp +++ b/src/meta/server_state.cpp @@ -1128,9 +1128,10 @@ void server_state::create_app(dsn::message_ex *msg) request.options.partition_count, request.options.replica_count, duplicating - ? fmt::format("{}.{}", - request.options.envs[duplication_constants::kEnvMasterClusterKey], - request.app_name) + ? 
fmt::format("master_cluster_name={}, master_app_name={}", + master_cluster->second, + gutil::FindWithDefault(request.options.envs, + duplication_constants::kEnvMasterAppNameKey)) : "false"); auto option_match_check = [](const create_app_options &opt, const app_state &exist_app) { @@ -1162,7 +1163,7 @@ void server_state::create_app(dsn::message_ex *msg) zauto_write_lock l(_lock); auto app = get_app(request.app_name); - if (nullptr != app) { + if (app) { configuration_create_app_response response; switch (app->status) { diff --git a/src/meta/test/duplication_info_test.cpp b/src/meta/test/duplication_info_test.cpp index 5cfa4e12be..72bbcdbb15 100644 --- a/src/meta/test/duplication_info_test.cpp +++ b/src/meta/test/duplication_info_test.cpp @@ -30,9 +30,12 @@ #include "gtest/gtest.h" #include "runtime/app_model.h" +#include "test_util/test_util.h" +#include "utils/flags.h" -namespace dsn { -namespace replication { +DSN_DECLARE_uint64(dup_progress_min_update_period_ms); + +namespace dsn::replication { class duplication_info_test : public testing::Test { @@ -48,9 +51,22 @@ class duplication_info_test : public testing::Test dup._status = status; } - static void test_alter_progress() + static void test_init_progress(duplication_info &dup, int partition_idx, decree expected_decree) { + dup.init_progress(partition_idx, expected_decree); + + const auto &progress = dup._progress[partition_idx]; + ASSERT_EQ(invalid_decree, progress.last_committed_decree); + ASSERT_EQ(expected_decree, progress.volatile_decree); + ASSERT_EQ(expected_decree, progress.stored_decree); + ASSERT_FALSE(progress.is_altering); + ASSERT_EQ(0, progress.last_progress_update_ms); + ASSERT_TRUE(progress.is_inited); + ASSERT_FALSE(progress.checkpoint_prepared); + } + static void test_alter_progress() + { duplication_info dup(1, 1, kTestAppName, @@ -61,46 +77,91 @@ class duplication_info_test : public testing::Test kTestRemoteAppName, std::vector(), kTestMetaStorePath); - duplication_confirm_entry entry; - 
ASSERT_FALSE(dup.alter_progress(0, entry)); - dup.init_progress(0, invalid_decree); + // Failed to alter progress for partition 0 since it has not been initialized. + ASSERT_FALSE(dup.alter_progress(0, duplication_confirm_entry())); + + // Initialize progress for partition 0. + test_init_progress(dup, 0, invalid_decree); + + // Alter progress with specified decrees for partition 0. + duplication_confirm_entry entry; + entry.__set_last_committed_decree(8); entry.confirmed_decree = 5; entry.checkpoint_prepared = true; ASSERT_TRUE(dup.alter_progress(0, entry)); - ASSERT_EQ(dup._progress[0].volatile_decree, 5); + + ASSERT_EQ(8, dup._progress[0].last_committed_decree); + ASSERT_EQ(5, dup._progress[0].volatile_decree); + ASSERT_EQ(invalid_decree, dup._progress[0].stored_decree); ASSERT_TRUE(dup._progress[0].is_altering); ASSERT_TRUE(dup._progress[0].checkpoint_prepared); - // busy updating + // Busy updating. + entry.__set_last_committed_decree(15); entry.confirmed_decree = 10; entry.checkpoint_prepared = false; ASSERT_FALSE(dup.alter_progress(0, entry)); - ASSERT_EQ(dup._progress[0].volatile_decree, 5); + + // last_committed_decree could be updated at any time. + ASSERT_EQ(15, dup._progress[0].last_committed_decree); + ASSERT_EQ(5, dup._progress[0].volatile_decree); + ASSERT_EQ(invalid_decree, dup._progress[0].stored_decree); ASSERT_TRUE(dup._progress[0].is_altering); ASSERT_TRUE(dup._progress[0].checkpoint_prepared); + // Persist progress for partition 0. dup.persist_progress(0); - ASSERT_EQ(dup._progress[0].stored_decree, 5); + + ASSERT_EQ(15, dup._progress[0].last_committed_decree); + ASSERT_EQ(5, dup._progress[0].volatile_decree); + ASSERT_EQ(5, dup._progress[0].stored_decree); ASSERT_FALSE(dup._progress[0].is_altering); ASSERT_TRUE(dup._progress[0].checkpoint_prepared); - // too frequent to update - dup.init_progress(1, invalid_decree); + // Initialize progress for partition 1. + test_init_progress(dup, 1, 5); + + // Alter progress for partition 1. 
ASSERT_TRUE(dup.alter_progress(1, entry)); + + ASSERT_EQ(15, dup._progress[1].last_committed_decree); + ASSERT_EQ(10, dup._progress[1].volatile_decree); + ASSERT_EQ(5, dup._progress[1].stored_decree); ASSERT_TRUE(dup._progress[1].is_altering); + ASSERT_FALSE(dup._progress[1].checkpoint_prepared); + + // Persist progress for partition 1. dup.persist_progress(1); + // It is too frequent to alter progress. + PRESERVE_FLAG(dup_progress_min_update_period_ms); + FLAGS_dup_progress_min_update_period_ms = 10000; + entry.__set_last_committed_decree(25); + entry.confirmed_decree = 15; + entry.checkpoint_prepared = true; ASSERT_FALSE(dup.alter_progress(1, entry)); + ASSERT_EQ(25, dup._progress[1].last_committed_decree); + // volatile_decree would be updated successfully even if it is too frequent. + ASSERT_EQ(15, dup._progress[1].volatile_decree); + ASSERT_EQ(10, dup._progress[1].stored_decree); ASSERT_FALSE(dup._progress[1].is_altering); + // checkpoint_prepared would be updated successfully even if it is too frequent. + ASSERT_TRUE(dup._progress[1].checkpoint_prepared); - dup._progress[1].last_progress_update_ms -= - duplication_info::PROGRESS_UPDATE_PERIOD_MS + 100; + // Reduce last update timestamp to make it infrequent. + dup._progress[1].last_progress_update_ms -= FLAGS_dup_progress_min_update_period_ms + 100; + entry.__set_last_committed_decree(26); + entry.confirmed_decree = 25; - entry.confirmed_decree = 15; - entry.checkpoint_prepared = true; ASSERT_TRUE(dup.alter_progress(1, entry)); + ASSERT_EQ(26, dup._progress[1].last_committed_decree); + ASSERT_EQ(25, dup._progress[1].volatile_decree); + ASSERT_EQ(10, dup._progress[1].stored_decree); ASSERT_TRUE(dup._progress[1].is_altering); + ASSERT_TRUE(dup._progress[1].checkpoint_prepared); + + // Checkpoints are ready for both partitions 0 and 1. 
ASSERT_TRUE(dup.all_checkpoint_has_prepared()); } @@ -128,8 +189,9 @@ class duplication_info_test : public testing::Test for (int i = 0; i < 4; i++) { dup.init_progress(i, invalid_decree); } + for (auto kv : dup_ent.progress) { - ASSERT_EQ(kv.second, invalid_decree); + ASSERT_EQ(invalid_decree, kv.second); } dup.start(); @@ -153,8 +215,8 @@ class duplication_info_test : public testing::Test dup.start(); dup.persist_status(); - ASSERT_EQ(dup._status, duplication_status::DS_PREPARE); - ASSERT_EQ(dup._next_status, duplication_status::DS_INIT); + ASSERT_EQ(duplication_status::DS_PREPARE, dup._status); + ASSERT_EQ(duplication_status::DS_INIT, dup._next_status); ASSERT_FALSE(dup.is_altering()); } @@ -358,5 +420,4 @@ TEST_F(duplication_info_test, is_valid) ASSERT_TRUE(dup.is_invalid_status()); } -} // namespace replication -} // namespace dsn +} // namespace dsn::replication diff --git a/src/meta/test/meta_duplication_service_test.cpp b/src/meta/test/meta_duplication_service_test.cpp index a8a6668f9c..ec65a41fa0 100644 --- a/src/meta/test/meta_duplication_service_test.cpp +++ b/src/meta/test/meta_duplication_service_test.cpp @@ -677,11 +677,11 @@ TEST_F(meta_duplication_service_test, remove_dup) TEST_F(meta_duplication_service_test, duplication_sync) { const auto &server_nodes = ensure_enough_alive_nodes(3); - const std::string test_app = "test_app_0"; + const std::string test_app("test_app_0"); create_app(test_app); auto app = find_app(test_app); - // generate all primaries on node[0] + // Generate all primaries on node[0]. 
for (auto &pc : app->pcs) { pc.ballot = random32(1, 10000); SET_IP_AND_HOST_PORT_BY_DNS(pc, primary, server_nodes[0]); @@ -696,6 +696,7 @@ TEST_F(meta_duplication_service_test, duplication_sync) for (int i = 0; i < app->partition_count; i++) { dup->init_progress(i, invalid_decree); } + { std::map> confirm_list; @@ -712,20 +713,20 @@ TEST_F(meta_duplication_service_test, duplication_sync) confirm_list[gpid(app->app_id, 3)].push_back(ce); duplication_sync_response resp = duplication_sync(node, confirm_list); - ASSERT_EQ(resp.err, ERR_OK); - ASSERT_EQ(resp.dup_map.size(), 1); - ASSERT_EQ(resp.dup_map[app->app_id].size(), 1); - ASSERT_EQ(resp.dup_map[app->app_id][dupid].dupid, dupid); - ASSERT_EQ(resp.dup_map[app->app_id][dupid].status, duplication_status::DS_PREPARE); - ASSERT_EQ(resp.dup_map[app->app_id][dupid].create_ts, dup->create_timestamp_ms); - ASSERT_EQ(resp.dup_map[app->app_id][dupid].remote, dup->remote_cluster_name); - ASSERT_EQ(resp.dup_map[app->app_id][dupid].fail_mode, dup->fail_mode()); + ASSERT_EQ(ERR_OK, resp.err); + ASSERT_EQ(1, resp.dup_map.size()); + ASSERT_EQ(1, resp.dup_map[app->app_id].size()); + ASSERT_EQ(dupid, resp.dup_map[app->app_id][dupid].dupid); + ASSERT_EQ(duplication_status::DS_PREPARE, resp.dup_map[app->app_id][dupid].status); + ASSERT_EQ(dup->create_timestamp_ms, resp.dup_map[app->app_id][dupid].create_ts); + ASSERT_EQ(dup->remote_cluster_name, resp.dup_map[app->app_id][dupid].remote); + ASSERT_EQ(dup->fail_mode(), resp.dup_map[app->app_id][dupid].fail_mode); auto progress_map = resp.dup_map[app->app_id][dupid].progress; - ASSERT_EQ(progress_map.size(), 8); - ASSERT_EQ(progress_map[1], 5); - ASSERT_EQ(progress_map[2], 6); - ASSERT_EQ(progress_map[3], 7); + ASSERT_EQ(8, progress_map.size()); + ASSERT_EQ(5, progress_map[1]); + ASSERT_EQ(6, progress_map[2]); + ASSERT_EQ(7, progress_map[3]); // ensure no updated progresses will also be included in response for (int p = 4; p < 8; p++) { diff --git 
a/src/replica/duplication/replica_duplicator_manager.cpp b/src/replica/duplication/replica_duplicator_manager.cpp index 2e1e61cc4d..724f164d1a 100644 --- a/src/replica/duplication/replica_duplicator_manager.cpp +++ b/src/replica/duplication/replica_duplicator_manager.cpp @@ -94,6 +94,7 @@ replica_duplicator_manager::get_duplication_confirms_to_update() const entry.dupid = dup->id(); entry.confirmed_decree = progress.last_decree; entry.__set_checkpoint_prepared(progress.checkpoint_has_prepared); + entry.__set_last_committed_decree(_replica->last_committed_decree()); updates.emplace_back(entry); } return updates;