Skip to content

Commit

Permalink
for safe async call_back.
Browse files Browse the repository at this point in the history
  • Loading branch information
qicosmos committed Jun 3, 2019
1 parent 3e11354 commit f2e4755
Showing 1 changed file with 14 additions and 9 deletions.
23 changes: 14 additions & 9 deletions include/rpc_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,9 @@ namespace rest_rpc {
callback_id_++;
callback_id_ |= (uint64_t(1) << 63);
cb_id = callback_id_;
callback_map_.emplace(cb_id, std::make_unique<call_t>(ios_, this, cb_id, std::move(cb), TIMEOUT));
auto call = std::make_shared<call_t>(ios_, std::move(cb), TIMEOUT);
call->start_timer();
callback_map_.emplace(cb_id, call);
}

msgpack_codec codec;
Expand Down Expand Up @@ -406,7 +408,7 @@ namespace rest_rpc {
void call_back(uint64_t req_id, const boost::system::error_code& ec, string_view data) {
auto cb_flag = req_id >> 63;
if (cb_flag) {
std::unique_ptr<call_t> cl = nullptr;
std::shared_ptr<call_t> cl = nullptr;
{
std::unique_lock<std::mutex> lock(cb_mtx_);
cl = std::move(callback_map_[req_id]);
Expand Down Expand Up @@ -467,16 +469,20 @@ namespace rest_rpc {
}
}

class call_t : asio::noncopyable {
class call_t : asio::noncopyable, public std::enable_shared_from_this<call_t> {
public:
call_t(asio::io_service& ios, rpc_client* client, uint64_t cb_id, std::function<void(boost::system::error_code, string_view)> cb, size_t timeout) : timer_(ios),
client_(client), cb_(std::move(cb)), timeout_(timeout){
call_t(asio::io_service& ios, std::function<void(boost::system::error_code, string_view)> cb, size_t timeout) : timer_(ios),
cb_(std::move(cb)), timeout_(timeout){
}

void start_timer() {
if (timeout_ == 0) {
return;
}

timer_.expires_from_now(std::chrono::milliseconds(timeout));
timer_.async_wait([this, cb_id](boost::system::error_code ec) {
timer_.expires_from_now(std::chrono::milliseconds(timeout_));
auto self = this->shared_from_this();
timer_.async_wait([this, self](boost::system::error_code ec) {
if (ec) {
return;
}
Expand Down Expand Up @@ -504,7 +510,6 @@ namespace rest_rpc {

private:
boost::asio::steady_timer timer_;
rpc_client* client_;
std::function<void(boost::system::error_code, string_view)> cb_;
size_t timeout_;
bool has_timeout_ = false;
Expand Down Expand Up @@ -534,7 +539,7 @@ namespace rest_rpc {
std::function<void(boost::system::error_code)> err_cb_;

std::unordered_map<std::uint64_t, std::shared_ptr<std::promise<req_result>>> future_map_;
std::unordered_map<std::uint64_t, std::unique_ptr<call_t>> callback_map_;
std::unordered_map<std::uint64_t, std::shared_ptr<call_t>> callback_map_;
std::mutex cb_mtx_;
uint64_t callback_id_ = 0;

Expand Down

0 comments on commit f2e4755

Please sign in to comment.