Skip to content

Commit

Permalink
[feature] Concurrency safety in NetworkTree code has been improved
Browse files Browse the repository at this point in the history
  • Loading branch information
Neels99 committed Mar 14, 2024
1 parent ed83f97 commit 4e70e4e
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 11 deletions.
31 changes: 23 additions & 8 deletions libp2p/network_process.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include <future>
#include <atomic>
#include <tuple>
#include <mutex>

#include "network_tree_node.h"

Expand All @@ -25,25 +26,26 @@ class ReconnectTask
std::promise<ReconnectTaskState> state;
std::atomic_bool alive;
public:
ReconnectTask() : alive(true)
ReconnectTask()
{
alive.store(true);
}

void ready()
{
if (alive)
if (alive.load())
{
state.set_value(ReconnectTaskState::ready);
alive = false;
alive.store(false);
}
}

void cancel()
{
if (alive)
if (alive.load())
{
state.set_value(ReconnectTaskState::cancel);
alive = false;
alive.store(false);
}
}

Expand All @@ -56,20 +58,24 @@ class ReconnectTask
class ReconnectProcess
{
std::future<void> process_future;

std::mutex mutex;
std::atomic_bool canceled{false};
public:
std::shared_ptr<ReconnectTask> task;

ReconnectProcess()
{
}

void start(std::future<void>& fut)
void start(std::future<void>&& fut)
{
process_future = std::move(fut);
}

std::shared_ptr<ReconnectTask> make_task()
{
std::lock_guard<std::mutex> lock(mutex);
task = std::make_shared<ReconnectTask>();
return task;
}
Expand All @@ -90,9 +96,18 @@ class ReconnectProcess

void cancel()
{
if (task)
task->cancel();
canceled.store(true);
{
std::lock_guard<std::mutex> lock(mutex);
if (task)
task->cancel();
}
if (process_future.valid())
process_future.wait();
}

bool is_canceled()
{
return canceled.load();
}
};
9 changes: 6 additions & 3 deletions libp2p/network_tree.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@ class NetworkTree
if (node->state == NetworkState::connected)
continue;

if (process->is_canceled())
return;

auto task = process->make_task();

// push run_node to ctx thread
boost::asio::post(*ctx, [&](){node->run_node(task);});

switch (process->wait())
{
case cancel:
Expand All @@ -60,8 +62,10 @@ class NetworkTree

void launch()
{
process = std::make_unique<ReconnectProcess>();

std::future<void> reconnect_process = std::async(&NetworkTree::run, this);
process->start(reconnect_process);
process->start(std::move(reconnect_process));
}

void restart(NetworkTreeNode* node)
Expand All @@ -73,7 +77,6 @@ class NetworkTree
// reset unique_ptr process
process.reset();
}
process = std::make_unique<ReconnectProcess>();

// calculate stop chain
std::vector<NetworkTreeNode*> stop_chain;
Expand Down

0 comments on commit 4e70e4e

Please sign in to comment.