Skip to content

Commit

Permalink
fix(bulkload): Sometime ingest will hang when encountering write thro…
Browse files Browse the repository at this point in the history
…ttling (#2157)

Resolve issue #2156.

Due to the flag _is_bulk_load_ingestion allowing only one RPC_RRDB_RRDB_BULK_LOAD RPC call, the throttling_controller::DELAY recall of on_client_write cannot execute RPC_RRDB_RRDB_BULK_LOAD again.
  • Loading branch information
ruojieranyishen authored Dec 13, 2024
1 parent fb2ed45 commit eb88b27
Showing 1 changed file with 11 additions and 11 deletions.
22 changes: 11 additions & 11 deletions src/replica/replica_2pc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,16 @@ void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling)
return;
}

if (static_cast<int32_t>(_primary_states.pc.hp_secondaries.size()) + 1 <
_options->app_mutation_2pc_min_replica_count(_app_info.max_replica_count)) {
response_client_write(request, ERR_NOT_ENOUGH_MEMBER);
return;
}

if (!ignore_throttling && throttle_write_request(request)) {
return;
}

if (request->rpc_code() == dsn::apps::RPC_RRDB_RRDB_BULK_LOAD) {
auto cur_bulk_load_status = _bulk_loader->get_bulk_load_status();
if (cur_bulk_load_status != bulk_load_status::BLS_DOWNLOADED &&
Expand All @@ -209,19 +219,9 @@ void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling)
_bulk_load_ingestion_start_time_ms = dsn_now_ms();
}

if (static_cast<int32_t>(_primary_states.pc.hp_secondaries.size()) + 1 <
_options->app_mutation_2pc_min_replica_count(_app_info.max_replica_count)) {
response_client_write(request, ERR_NOT_ENOUGH_MEMBER);
return;
}

if (!ignore_throttling && throttle_write_request(request)) {
return;
}

LOG_DEBUG_PREFIX("got write request from {}", request->header->from_address);
auto mu = _primary_states.write_queue.add_work(request->rpc_code(), request, this);
if (mu) {
if (mu != nullptr) {
init_prepare(mu, false);
}
}
Expand Down

0 comments on commit eb88b27

Please sign in to comment.