Skip to content

Commit

Permalink
add
Browse files Browse the repository at this point in the history
  • Loading branch information
empiredan committed Nov 11, 2024
1 parent 0ff0330 commit 133b7cc
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 26 deletions.
104 changes: 80 additions & 24 deletions src/meta/server_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,10 +218,19 @@ int server_state::count_staging_app()
configuration_create_app_response response; \
response.err = err_code

#define SEND_CREATE_APP_RESPONSE_TO_CLIENT_AND_RETURN(msg, response, err_code) \
#define INIT_CREATE_APP_RESPONSE_WITH_OK(response, app_id) \
configuration_create_app_response response; \
response.err = dsn::ERR_OK; \
response.appid = app_id;

#define FAIL_CREATE_APP_RESPONSE_AND_RETURN(msg, response, err_code) \
INIT_CREATE_APP_RESPONSE_WITH_ERR(response, err_code); \
REPLY_TO_CLIENT_AND_RETURN(msg, response)

#define SUCC_CREATE_APP_RESPONSE_AND_RETURN(msg, response, app_id) \
INIT_CREATE_APP_RESPONSE_WITH_OK(response, app_id); \
REPLY_TO_CLIENT_AND_RETURN(msg, response)

void server_state::transition_staging_state(std::shared_ptr<app_state> &app)
{
#define send_response(meta, msg, response_data) \
Expand Down Expand Up @@ -1123,16 +1132,16 @@ void server_state::create_app(dsn::message_ex *msg)
auto level = _meta_svc->get_function_level();
if (level <= meta_function_level::fl_freezed) {
LOG_ERROR("current meta function level is freezed, since there are too few alive nodes");
SEND_CREATE_APP_RESPONSE_TO_CLIENT_AND_RETURN(msg, response, ERR_STATE_FREEZED);
FAIL_CREATE_APP_RESPONSE_AND_RETURN(msg, response, ERR_STATE_FREEZED);
}

if (request.options.partition_count <= 0 ||
!validate_target_max_replica_count(request.options.replica_count)) {
SEND_CREATE_APP_RESPONSE_TO_CLIENT_AND_RETURN(msg, response, ERR_INVALID_PARAMETERS);
FAIL_CREATE_APP_RESPONSE_AND_RETURN(msg, response, ERR_INVALID_PARAMETERS);
}

if (!_app_env_validator.validate_app_envs(request.options.envs)) {
SEND_CREATE_APP_RESPONSE_TO_CLIENT_AND_RETURN(msg, response, ERR_INVALID_PARAMETERS);
FAIL_CREATE_APP_RESPONSE_AND_RETURN(msg, response, ERR_INVALID_PARAMETERS);
}

zauto_write_lock l(_lock);
Expand All @@ -1145,7 +1154,7 @@ void server_state::create_app(dsn::message_ex *msg)
case app_status::AS_AVAILABLE:
if (!request.options.success_if_exist) {
if (duplicating) {
process_create_follower_app_status(msg, request, *app, master_cluster->second);
process_create_follower_app_status(msg, request, master_cluster->second, app);
return;
}

Expand Down Expand Up @@ -1197,22 +1206,22 @@ void server_state::create_app(dsn::message_ex *msg)
void server_state::process_create_follower_app_status(
message_ex *msg,
const configuration_create_app_request &request,
const app_state &app,
const std::string &req_master_cluster)
const std::string &req_master_cluster,
std::shared_ptr<app_state> &app)
{
const auto &req_status = request.options.envs.find(
duplication_constants::kDuplicationEnvMasterCreateFollowerAppStatusKey);
if (req_status == request.options.envs.end()) {
// Still reply with ERR_APP_EXIST to the master cluster of old versions.
SEND_CREATE_APP_RESPONSE_TO_CLIENT_AND_RETURN(msg, response, ERR_APP_EXIST);
FAIL_CREATE_APP_RESPONSE_AND_RETURN(msg, response, ERR_APP_EXIST);
}

const auto &my_status =
app.envs.find(duplication_constants::kDuplicationEnvMasterCreateFollowerAppStatusKey);
if (my_status == app.envs.end()) {
app->envs.find(duplication_constants::kDuplicationEnvMasterCreateFollowerAppStatusKey);
if (my_status == app->envs.end()) {
// Since currently this table have been AS_AVAILABLE, it should have the env of
// creating status.
SEND_CREATE_APP_RESPONSE_TO_CLIENT_AND_RETURN(msg, response, ERR_INVALID_STATE);
FAIL_CREATE_APP_RESPONSE_AND_RETURN(msg, response, ERR_INVALID_STATE);
return;
}

Expand All @@ -1221,52 +1230,97 @@ void server_state::process_create_follower_app_status(
if (req_status->second ==
duplication_constants::kDuplicationEnvMasterCreateFollowerAppStatusCreating) {
const auto &my_master_cluster =
app.envs.find(duplication_constants::kDuplicationEnvMasterClusterKey);
if (my_master_cluster == app.envs.end() ||
app->envs.find(duplication_constants::kDuplicationEnvMasterClusterKey);
if (my_master_cluster == app->envs.end() ||
my_master_cluster->second != req_master_cluster) {
// The source cluster is not matched.
SEND_CREATE_APP_RESPONSE_TO_CLIENT_AND_RETURN(msg, response, ERR_APP_EXIST);
FAIL_CREATE_APP_RESPONSE_AND_RETURN(msg, response, ERR_APP_EXIST);
}

// It is idempotent for the repeated requests.
SEND_CREATE_APP_RESPONSE_TO_CLIENT_AND_RETURN(msg, response, ERR_OK);
SUCC_CREATE_APP_RESPONSE_AND_RETURN(msg, response, app->app_id);
}

if (req_status->second ==
duplication_constants::kDuplicationEnvMasterCreateFollowerAppStatusCreated) {
// Update the creating status both on the remote storage and local memory.

update_create_follower_app_status(
msg,
duplication_constants::kDuplicationEnvMasterCreateFollowerAppStatusCreating,
duplication_constants::kDuplicationEnvMasterCreateFollowerAppStatusCreated,
app);
return;
}

// Some undefined creating status from the source table.
SEND_CREATE_APP_RESPONSE_TO_CLIENT_AND_RETURN(msg, response, ERR_INVALID_STATE);
FAIL_CREATE_APP_RESPONSE_AND_RETURN(msg, response, ERR_INVALID_STATE);
}

if (my_status->second ==
duplication_constants::kDuplicationEnvMasterCreateFollowerAppStatusCreated) {
const auto &my_master_cluster =
app.envs.find(duplication_constants::kDuplicationEnvMasterClusterKey);
if (my_master_cluster == app.envs.end() ||
app->envs.find(duplication_constants::kDuplicationEnvMasterClusterKey);
if (my_master_cluster == app->envs.end() ||
my_master_cluster->second != req_master_cluster) {
// The source cluster is not matched.
SEND_CREATE_APP_RESPONSE_TO_CLIENT_AND_RETURN(msg, response, ERR_APP_EXIST);
FAIL_CREATE_APP_RESPONSE_AND_RETURN(msg, response, ERR_APP_EXIST);
}

if (req_status->second ==
duplication_constants::kDuplicationEnvMasterCreateFollowerAppStatusCreating ||
req_status->second ==
duplication_constants::kDuplicationEnvMasterCreateFollowerAppStatusCreated) {
// It is idempotent for the repeated requests.
SEND_CREATE_APP_RESPONSE_TO_CLIENT_AND_RETURN(msg, response, ERR_OK);
SUCC_CREATE_APP_RESPONSE_AND_RETURN(msg, response, app->app_id);
}

// Some undefined creating status from the source table.
SEND_CREATE_APP_RESPONSE_TO_CLIENT_AND_RETURN(msg, response, ERR_INVALID_STATE);
FAIL_CREATE_APP_RESPONSE_AND_RETURN(msg, response, ERR_INVALID_STATE);
}

// Some undefined creating status from the target table.
SEND_CREATE_APP_RESPONSE_TO_CLIENT_AND_RETURN(msg, response, ERR_INVALID_STATE);
FAIL_CREATE_APP_RESPONSE_AND_RETURN(msg, response, ERR_INVALID_STATE);
}

void server_state::update_create_follower_app_status(message_ex *msg,
const std::string &old_status,
const std::string &new_status,
std::shared_ptr<app_state> &app)
{
auto ainfo = *(reinterpret_cast<app_info *>(app.get()));
ainfo.envs[duplication_constants::kDuplicationEnvMasterCreateFollowerAppStatusKey] = new_status;
auto app_path = get_app_path(*app);
do_update_app_info(
app_path, ainfo, [this, msg, old_status, new_status, app](error_code ec) mutable {
{
zauto_write_lock l(_lock);

if (ec != ERR_OK) {
LOG_ERROR(

"failed to update remote env of creating follower app status: "
"error_code={}, app_name={}, app_id={}, {}={} => {}",
ec,
app->app_name,
app->app_id,
duplication_constants::kDuplicationEnvMasterCreateFollowerAppStatusKey,
old_status,
new_status);
FAIL_CREATE_APP_RESPONSE_AND_RETURN(msg, response, ec);
}

app->envs[duplication_constants::kDuplicationEnvMasterCreateFollowerAppStatusKey] =
new_status;
LOG_INFO("both remote and local env of creating follower app status have been "
"updated successfully: app_name={}, app_id={}, {}={} => {}",
app->app_name,
app->app_id,
duplication_constants::kDuplicationEnvMasterCreateFollowerAppStatusKey,
old_status,
new_status);
SUCC_CREATE_APP_RESPONSE_AND_RETURN(msg, response, app->app_id);
}
});
}

void server_state::do_app_drop(std::shared_ptr<app_state> &app)
Expand Down Expand Up @@ -4143,7 +4197,9 @@ void server_state::recover_app_max_replica_count(std::shared_ptr<app_state> &app
&tracker);
}

#undef SEND_CREATE_APP_RESPONSE_TO_CLIENT_AND_RETURN
#undef SUCC_CREATE_APP_RESPONSE_AND_RETURN
#undef FAIL_CREATE_APP_RESPONSE_AND_RETURN
#undef INIT_CREATE_APP_RESPONSE_WITH_OK
#undef INIT_CREATE_APP_RESPONSE_WITH_ERR

#undef REPLY_TO_CLIENT_AND_RETURN
Expand Down
8 changes: 6 additions & 2 deletions src/meta/server_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,12 @@ class server_state

void process_create_follower_app_status(message_ex *msg,
const configuration_create_app_request &request,
const app_state &app,
const std::string &req_master_cluster);
const std::string &req_master_cluster,
std::shared_ptr<app_state> &app);
void update_create_follower_app_status(message_ex *msg,
const std::string &old_status,
const std::string &new_status,
std::shared_ptr<app_state> &app);

void do_app_create(std::shared_ptr<app_state> &app);
void do_app_drop(std::shared_ptr<app_state> &app);
Expand Down

0 comments on commit 133b7cc

Please sign in to comment.