Skip to content

Commit

Permalink
Wait for coroutines to stop on teardown()
Browse files Browse the repository at this point in the history
  • Loading branch information
dvsku committed May 23, 2024
1 parent f9ae6af commit edf88f5
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 3 deletions.
20 changes: 17 additions & 3 deletions include/libnetwrk/net/core/base_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,16 @@ namespace libnetwrk {

protected:
void teardown() {
if (m_connection)
m_connection->stop();

/*
Wait for all coroutines to stop
*/
wait_for_coroutines_to_stop();

if (this->io_context && !this->io_context->stopped())
this->io_context->stop();

if (m_connection && m_connection->is_connected())
m_connection->stop();

if (m_context_thread.joinable())
m_context_thread.join();
Expand Down Expand Up @@ -210,6 +215,15 @@ namespace libnetwrk {
thread.detach();
}

void wait_for_coroutines_to_stop() {
while (true) {
if (!m_connection) break;
if (m_connection->active_operations == 0) break;

std::this_thread::sleep_for(std::chrono::milliseconds(5));
}
}

void ev_system_message(owned_message_t& msg) override final {
system_command command = static_cast<system_command>(msg.msg.command());

Expand Down
42 changes: 42 additions & 0 deletions include/libnetwrk/net/core/base_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include <chrono>
#include <list>
#include <algorithm>

namespace libnetwrk {
template<typename Desc, typename Socket>
Expand Down Expand Up @@ -191,6 +192,16 @@ namespace libnetwrk {
if (m_gc_future.valid())
m_gc_future.wait();

/*
Close all connections and signal coroutines to stop
*/
stop_all_connections();

/*
Wait for all coroutines to stop
*/
wait_for_coroutines_to_stop();

if (this->io_context && !this->io_context->stopped())
this->io_context->stop();

Expand Down Expand Up @@ -301,6 +312,37 @@ namespace libnetwrk {
client->send(message);
}

void stop_all_connections() {
std::lock_guard<std::mutex> guard(m_connections_mutex);

for (auto& client : m_connections) {
if (!client) continue;

client->stop();
}
}

void wait_for_coroutines_to_stop() {
bool running;

while (true) {
{
std::lock_guard<std::mutex> guard(m_connections_mutex);

running = std::any_of(m_connections.begin(), m_connections.end(),
[](auto& client) {
return client && client->active_operations != 0;
}
);
}

if (!running)
break;

std::this_thread::sleep_for(std::chrono::milliseconds(5));
}
}

asio::awaitable<void> co_gc() {
auto current_executor = co_await asio::this_coro::executor;
m_gc_timer = std::make_unique<timer_t>(current_executor, std::chrono::seconds(gc_freq_sec));
Expand Down

0 comments on commit edf88f5

Please sign in to comment.