Skip to content

Commit

Permalink
CpuBoundWork#CpuBoundWork(): don't spin on atomic int to acquire slot
Browse files Browse the repository at this point in the history
This is inefficient and involves unfair scheduling. The latter implies
possible bad surprises regarding waiting durations on busy nodes. Instead,
use AsioConditionVariable#Wait() if there are no free slots. It's notified
by others' CpuBoundWork#~CpuBoundWork() once finished.
  • Loading branch information
Al2Klimov committed Sep 27, 2024
1 parent b413287 commit 26ef66e
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 30 deletions.
51 changes: 41 additions & 10 deletions lib/base/io-engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,55 @@

using namespace icinga;

CpuBoundWork::CpuBoundWork(boost::asio::yield_context yc, boost::asio::io_context::strand&)
CpuBoundWork::CpuBoundWork(boost::asio::yield_context yc, boost::asio::io_context::strand& strand)
: m_Done(false)
{
auto& ioEngine (IoEngine::Get());
auto& sem (ioEngine.m_CpuBoundSemaphore);
std::unique_lock<std::mutex> lock (sem.Mutex);

for (;;) {
auto availableSlots (ioEngine.m_CpuBoundSemaphore.fetch_sub(1));
if (sem.FreeSlots) {
--sem.FreeSlots;
return;
}

auto cv (Shared<AsioConditionVariable>::Make(ioEngine.GetIoContext()));
bool gotSlot = false;
auto pos (sem.Waiting.insert(sem.Waiting.end(), IoEngine::CpuBoundQueueItem{&strand, cv, &gotSlot}));

lock.unlock();

try {
cv->Wait(yc);
} catch (...) {
std::unique_lock<std::mutex> lock (sem.Mutex);

if (availableSlots < 1) {
ioEngine.m_CpuBoundSemaphore.fetch_add(1);
IoEngine::YieldCurrentCoroutine(yc);
continue;
if (gotSlot) {
lock.unlock();
Done();
} else {
sem.Waiting.erase(pos);
}

break;
throw;
}
}

void CpuBoundWork::Done()
{
if (!m_Done) {
IoEngine::Get().m_CpuBoundSemaphore.fetch_add(1);
auto& sem (IoEngine::Get().m_CpuBoundSemaphore);
std::unique_lock<std::mutex> lock (sem.Mutex);

if (sem.Waiting.empty()) {
++sem.FreeSlots;
} else {
auto next (sem.Waiting.front());

*next.GotSlot = true;
sem.Waiting.pop_front();
boost::asio::post(*next.Strand, [cv = std::move(next.CV)]() { cv->Set(); });
}

m_Done = true;
}
Expand All @@ -58,7 +85,11 @@ boost::asio::io_context& IoEngine::GetIoContext()
IoEngine::IoEngine() : m_IoContext(), m_KeepAlive(boost::asio::make_work_guard(m_IoContext)), m_Threads(decltype(m_Threads)::size_type(Configuration::Concurrency * 2u)), m_AlreadyExpiredTimer(m_IoContext)
{
m_AlreadyExpiredTimer.expires_at(boost::posix_time::neg_infin);
m_CpuBoundSemaphore.store(Configuration::Concurrency * 3u / 2u);

{
std::unique_lock<std::mutex> lock (m_CpuBoundSemaphore.Mutex);
m_CpuBoundSemaphore.FreeSlots = Configuration::Concurrency * 3u / 2u;
}

for (auto& thread : m_Threads) {
thread = std::thread(&IoEngine::RunEventLoop, this);
Expand Down
57 changes: 37 additions & 20 deletions lib/base/io-engine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@
#include "base/exception.hpp"
#include "base/lazy-init.hpp"
#include "base/logger.hpp"
#include "base/shared.hpp"
#include "base/shared-object.hpp"
#include <atomic>
#include <cstdint>
#include <exception>
#include <list>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>
Expand All @@ -31,7 +35,7 @@ namespace icinga
class CpuBoundWork
{
public:
CpuBoundWork(boost::asio::yield_context yc, boost::asio::io_context::strand&);
CpuBoundWork(boost::asio::yield_context yc, boost::asio::io_context::strand& strand);
CpuBoundWork(const CpuBoundWork&) = delete;
CpuBoundWork(CpuBoundWork&&) = delete;
CpuBoundWork& operator=(const CpuBoundWork&) = delete;
Expand All @@ -48,6 +52,25 @@ class CpuBoundWork
bool m_Done;
};


/**
* Condition variable which doesn't block I/O threads
*
* @ingroup base
*/
class AsioConditionVariable
{
public:
AsioConditionVariable(boost::asio::io_context& io, bool init = false);

void Set();
void Clear();
void Wait(boost::asio::yield_context yc);

private:
boost::asio::deadline_timer m_Timer;
};

/**
* Async I/O engine
*
Expand Down Expand Up @@ -110,6 +133,13 @@ class IoEngine
}

private:
struct CpuBoundQueueItem
{
boost::asio::io_context::strand* Strand;
Shared<AsioConditionVariable>::Ptr CV;
bool* GotSlot;
};

IoEngine();

void RunEventLoop();
Expand All @@ -120,29 +150,16 @@ class IoEngine
boost::asio::executor_work_guard<boost::asio::io_context::executor_type> m_KeepAlive;
std::vector<std::thread> m_Threads;
boost::asio::deadline_timer m_AlreadyExpiredTimer;
std::atomic_int_fast32_t m_CpuBoundSemaphore;
};

class TerminateIoThread : public std::exception
{
struct {
std::mutex Mutex;
uint_fast32_t FreeSlots;
std::list<CpuBoundQueueItem> Waiting;
} m_CpuBoundSemaphore;
};

/**
* Condition variable which doesn't block I/O threads
*
* @ingroup base
*/
class AsioConditionVariable
class TerminateIoThread : public std::exception
{
public:
AsioConditionVariable(boost::asio::io_context& io, bool init = false);

void Set();
void Clear();
void Wait(boost::asio::yield_context yc);

private:
boost::asio::deadline_timer m_Timer;
};

/**
Expand Down

0 comments on commit 26ef66e

Please sign in to comment.