diff --git a/Framework/Core/CMakeLists.txt b/Framework/Core/CMakeLists.txt index 8c2d9650776c0..c01ef534eb212 100644 --- a/Framework/Core/CMakeLists.txt +++ b/Framework/Core/CMakeLists.txt @@ -163,6 +163,7 @@ o2_add_library(Framework O2::Headers O2::MemoryResources O2::PCG + O2::X9 RapidJSON::RapidJSON Arrow::arrow_shared Microsoft.GSL::GSL diff --git a/Framework/Core/include/Framework/AsyncQueue.h b/Framework/Core/include/Framework/AsyncQueue.h index d214b9bfc09ba..0f25410179c1f 100644 --- a/Framework/Core/include/Framework/AsyncQueue.h +++ b/Framework/Core/include/Framework/AsyncQueue.h @@ -14,6 +14,10 @@ #include "Framework/TimesliceSlot.h" #include #include +#include + +typedef struct x9_inbox_internal x9_inbox; +typedef struct x9_node_internal x9_node; namespace o2::framework { @@ -89,6 +93,12 @@ struct AsyncQueue { std::vector prototypes; std::vector tasks; size_t iteration = 0; + + std::atomic first = true; + + // Inbox for the message queue used to append + // tasks to this queue. + x9_inbox* inbox = nullptr; AsyncQueue(); }; @@ -104,6 +114,8 @@ struct AsyncQueueHelpers { /// 3. only execute the highest (timeslice, debounce) value static void run(AsyncQueue& queue, TimesliceId oldestPossibleTimeslice); + // Flush tasks which were posted but not yet committed to the queue + static void flushPending(AsyncQueue& queue); /// Reset the queue to its initial state static void reset(AsyncQueue& queue); }; diff --git a/Framework/Core/src/AsyncQueue.cxx b/Framework/Core/src/AsyncQueue.cxx index e0e44da9cd19c..c9be88f096c0d 100644 --- a/Framework/Core/src/AsyncQueue.cxx +++ b/Framework/Core/src/AsyncQueue.cxx @@ -11,6 +11,7 @@ #include "Framework/AsyncQueue.h" #include "Framework/Signpost.h" +#include "x9.h" #include O2_DECLARE_DYNAMIC_LOG(async_queue); @@ -18,7 +19,9 @@ O2_DECLARE_DYNAMIC_LOG(async_queue); namespace o2::framework { AsyncQueue::AsyncQueue() + : inbox(x9_create_inbox(16, "async_queue", sizeof(AsyncTask))) { + this->inbox = x9_create_inbox(16, "async_queue", sizeof(AsyncTask)); } auto AsyncQueueHelpers::create(AsyncQueue& queue, AsyncTaskSpec spec) -> AsyncTaskId @@ -31,11 +34,39 @@ auto AsyncQueueHelpers::create(AsyncQueue& queue, AsyncTaskSpec spec) -> AsyncTa auto AsyncQueueHelpers::post(AsyncQueue& queue, AsyncTask const& task) -> void { - queue.tasks.push_back(task); + // Until we do not manage to write to the inbox, keep removing + // items from the queue if you are the first one which fails to + // write. + while (!x9_write_to_inbox(queue.inbox, sizeof(AsyncTask), &task)) { + AsyncQueueHelpers::flushPending(queue); + } +} + +auto AsyncQueueHelpers::flushPending(AsyncQueue& queue) -> void +{ + bool isFirst = true; + if (!std::atomic_compare_exchange_strong(&queue.first, &isFirst, false)) { + // Not the first, try again. + return; + } + // First thread which does not manage to write to the queue. + // Flush it a bit before we try again. + AsyncTask toFlush; + // This potentially stalls if the inserting tasks are faster to insert + // than we are to retrieve. We should probably have a cut-off + while (x9_read_from_inbox(queue.inbox, sizeof(AsyncTask), &toFlush)) { + queue.tasks.push_back(toFlush); + } + queue.first = true; } auto AsyncQueueHelpers::run(AsyncQueue& queue, TimesliceId oldestPossible) -> void { + // We synchronize right before we run to get as many + // tasks as possible. Notice we might still miss some + // which will have to handled on a subsequent iteration. + AsyncQueueHelpers::flushPending(queue); + if (queue.tasks.empty()) { return; }