Skip to content

Commit

Permalink
feat(duplication): collect last committed decree from replica to meta…
Browse files Browse the repository at this point in the history
… for duplication
  • Loading branch information
empiredan committed Nov 28, 2024
1 parent f8de6da commit ba2c623
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 30 deletions.
3 changes: 3 additions & 0 deletions idl/duplication.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ struct duplication_confirm_entry
1:i32 dupid;
2:i64 confirmed_decree;
3:optional bool checkpoint_prepared = false;

//
4:optional i64 last_committed_decree;
}

// This is an internal RPC sent from replica server to meta.
Expand Down
57 changes: 45 additions & 12 deletions src/meta/duplication/duplication_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@
#include "runtime/api_layer1.h"
#include "utils/fmt_logging.h"

DSN_DEFINE_uint64(replication,
dup_progress_min_update_period_ms,
5000,
"The minimum period in milliseconds that progress of duplication is updated");

DSN_DEFINE_uint64(replication,
dup_progress_min_report_period_ms,
5 * 60 * 1000,
"The minimum period in milliseconds that progress of duplication is reported");

namespace dsn {
namespace replication {

Expand Down Expand Up @@ -133,19 +143,27 @@ bool duplication_info::alter_progress(int partition_index,
return false;
}

if (confirm_entry.__isset.last_committed_decree) {
p.last_committed_decree = confirm_entry.last_committed_decree;
}

p.checkpoint_prepared = confirm_entry.checkpoint_prepared;
if (p.volatile_decree < confirm_entry.confirmed_decree) {
p.volatile_decree = confirm_entry.confirmed_decree;
}
if (p.volatile_decree != p.stored_decree) {
// progress update is not supposed to be too frequent.
if (dsn_now_ms() > p.last_progress_update_ms + PROGRESS_UPDATE_PERIOD_MS) {

if (p.volatile_decree == p.stored_decree) {
return false;
}

// Progress update is not supposed to be too frequent.
if (dsn_now_ms() < p.last_progress_update_ms + FLAGS_dup_progress_min_update_period_ms) {
return false;
}

p.is_altering = true;
p.last_progress_update_ms = dsn_now_ms();
return true;
}
}
return false;
}

void duplication_info::persist_progress(int partition_index)
Expand All @@ -163,13 +181,26 @@ void duplication_info::persist_status()
zauto_write_lock l(_lock);

if (!_is_altering) {
LOG_ERROR_PREFIX("callers never write a duplication that is not altering to meta store");
LOG_ERROR_PREFIX("the status of this duplication is not being altered: status={}, "
"next_status={}, master_app_id={}, master_app_name={}, "
"follower_cluster_name={}, follower_app_name={}"
duplication_status_to_string(_status),
duplication_status_to_string(_next_status),
app_id,
app_name,
remote_cluster_name,
remote_app_name);
return;
}
LOG_INFO_PREFIX("change duplication status from {} to {} successfully [app_id: {}]",

LOG_INFO_PREFIX("change duplication status from {} to {} successfully: master_app_id={}, "
"master_app_name={}, follower_cluster_name={}, follower_app_name={}",
duplication_status_to_string(_status),
duplication_status_to_string(_next_status),
app_id);
app_id,
app_name,
remote_cluster_name,
remote_app_name);

_is_altering = false;
_status = _next_status;
Expand Down Expand Up @@ -197,11 +228,13 @@ blob duplication_info::to_json_blob() const

void duplication_info::report_progress_if_time_up()
{
// progress report is not supposed to be too frequent.
if (dsn_now_ms() > _last_progress_report_ms + PROGRESS_REPORT_PERIOD_MS) {
// Progress report is not supposed to be too frequent.
if (dsn_now_ms() < _last_progress_report_ms + FLAGS_dup_progress_min_report_period_ms) {
return;
}

_last_progress_report_ms = dsn_now_ms();
LOG_INFO("duplication report: {}", to_string());
}
}

duplication_info_s_ptr duplication_info::decode_from_blob(dupid_t dup_id,
Expand Down
7 changes: 4 additions & 3 deletions src/meta/duplication/duplication_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,13 +216,14 @@ class duplication_info

mutable zrwlock_nr _lock;

static constexpr int PROGRESS_UPDATE_PERIOD_MS = 5000; // 5s
static constexpr int PROGRESS_REPORT_PERIOD_MS = 1000 * 60 * 5; // 5min

struct partition_progress
{
int64_t volatile_decree{invalid_decree};
int64_t stored_decree{invalid_decree};

//
int64_t last_committed_decree{invalid_decree};

bool is_altering{false};
uint64_t last_progress_update_ms{0};
bool is_inited{false};
Expand Down
23 changes: 13 additions & 10 deletions src/meta/duplication/meta_duplication_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -791,31 +791,34 @@ void meta_duplication_service::do_update_partition_confirmed(
int32_t partition_idx,
const duplication_confirm_entry &confirm_entry)
{
if (dup->alter_progress(partition_idx, confirm_entry)) {
std::string path = get_partition_path(dup, std::to_string(partition_idx));
blob value = blob::create_from_bytes(std::to_string(confirm_entry.confirmed_decree));
if (!dup->alter_progress(partition_idx, confirm_entry)) {
return;
}

_meta_svc->get_meta_storage()->get_data(std::string(path), [=](const blob &data) mutable {
if (data.length() == 0) {
const auto &path = get_partition_path(dup, std::to_string(partition_idx));
const auto &value = blob::create_from_bytes(std::to_string(confirm_entry.confirmed_decree));

_meta_svc->get_meta_storage()->get_data(path, [=](const blob &data) mutable {
if (data.empty()) {
_meta_svc->get_meta_storage()->create_node(
std::string(path), std::move(value), [=]() mutable {
path, std::move(value), [=]() mutable {
dup->persist_progress(partition_idx);
rpc.response().dup_map[dup->app_id][dup->id].progress[partition_idx] =
confirm_entry.confirmed_decree;
});
} else {
return;
}

_meta_svc->get_meta_storage()->set_data(
std::string(path), std::move(value), [=]() mutable {
path, std::move(value), [=]() mutable {
dup->persist_progress(partition_idx);
rpc.response().dup_map[dup->app_id][dup->id].progress[partition_idx] =
confirm_entry.confirmed_decree;
});
}

// duplication_sync_rpc will finally be replied when confirmed points
// of all partitions are stored.
});
}
}

std::shared_ptr<duplication_info>
Expand Down
15 changes: 15 additions & 0 deletions src/meta/meta_state_service_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,31 @@ struct meta_storage

void create_node(std::string &&node, blob &&value, std::function<void()> &&cb);

void create_node(const std::string &node, blob &&value, std::function<void()> &&cb)
{
create_node(std::string(node), std::move(value), std::move(cb));
}

void delete_node_recursively(std::string &&node, std::function<void()> &&cb);

void delete_node(std::string &&node, std::function<void()> &&cb);

/// Will fatal if node doesn't exists.
void set_data(std::string &&node, blob &&value, std::function<void()> &&cb);

void set_data(const std::string &node, blob &&value, std::function<void()> &&cb);
{
set_data(std::string(node), std::move(value), std::move(cb));
}

/// If node does not exist, cb will receive an empty blob.
void get_data(std::string &&node, std::function<void(const blob &)> &&cb);

void get_data(const std::string &node, std::function<void(const blob &)> &&cb)
{
get_data(std::string(node), std::move(cb));
}

/// \param cb: void (bool node_exists, const std::vector<std::string> &children)
/// `children` contains the name (not full path) of children nodes.
/// `node_exists` indicates whether this node exists.
Expand Down
8 changes: 4 additions & 4 deletions src/meta/server_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1128,9 +1128,9 @@ void server_state::create_app(dsn::message_ex *msg)
request.options.partition_count,
request.options.replica_count,
duplicating
? fmt::format("{}.{}",
request.options.envs[duplication_constants::kEnvMasterClusterKey],
request.app_name)
? fmt::format("master_cluster_name={}, master_app_name={}",
master_cluster->second,
gutil::FindWithDefault(request.options.envs, duplication_constants::kEnvMasterAppNameKey))
: "false");

auto option_match_check = [](const create_app_options &opt, const app_state &exist_app) {
Expand Down Expand Up @@ -1162,7 +1162,7 @@ void server_state::create_app(dsn::message_ex *msg)
zauto_write_lock l(_lock);

auto app = get_app(request.app_name);
if (nullptr != app) {
if (!app) {
configuration_create_app_response response;

switch (app->status) {
Expand Down
4 changes: 3 additions & 1 deletion src/meta/test/duplication_info_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
#include "gtest/gtest.h"
#include "runtime/app_model.h"

DSN_DECLARE_uint64(dup_progress_min_update_period_ms)

namespace dsn {
namespace replication {

Expand Down Expand Up @@ -95,7 +97,7 @@ class duplication_info_test : public testing::Test
ASSERT_FALSE(dup._progress[1].is_altering);

dup._progress[1].last_progress_update_ms -=
duplication_info::PROGRESS_UPDATE_PERIOD_MS + 100;
FLAGS_dup_progress_min_update_period_ms + 100;

entry.confirmed_decree = 15;
entry.checkpoint_prepared = true;
Expand Down
1 change: 1 addition & 0 deletions src/replica/duplication/replica_duplicator_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ replica_duplicator_manager::get_duplication_confirms_to_update() const
entry.dupid = dup->id();
entry.confirmed_decree = progress.last_decree;
entry.__set_checkpoint_prepared(progress.checkpoint_has_prepared);
entry.__set_last_committed_decree(_replica->last_committed_decree());
updates.emplace_back(entry);
}
return updates;
Expand Down

0 comments on commit ba2c623

Please sign in to comment.