diff --git a/README.md b/README.md index 6263fe5..6514cef 100644 --- a/README.md +++ b/README.md @@ -122,6 +122,10 @@ Signal exit when we no longer want to use the queue `signal_exit_when_done, is_exit_when_done` // exit when queue is empty, after this flag is set no more items can be pushed in the queue +Wait for queue to become empty + +`wait`, `wait_for`, `wait_until` + Use it like this ``` @@ -181,6 +185,10 @@ Signal exit when we no longer want to use the queue `signal_exit_when_done, is_exit_when_done` // exit when queue is empty, after this flag is set no more items can be pushed in the queue +Wait for queue to become empty + +`wait`, `wait_for`, `wait_until` + Use it like this ``` @@ -242,6 +250,10 @@ Signal exit when we no longer want to use the queue `signal_exit_when_done, is_exit_when_done` // exit when queue is empty, after this flag is set no more items can be pushed in the queue +Wait for queue to become empty + +`wait`, `wait_for`, `wait_until` + Use it like this ``` @@ -371,6 +383,10 @@ Signal exit when we no longer want to use worker threads `signal_exit_when_done` +Wait for queue to become empty + +`wait`, `wait_for`, `wait_until` + Use it like this ``` diff --git a/include/base_wait_pop.h b/include/base_queue_wait.h similarity index 75% rename from include/base_wait_pop.h rename to include/base_queue_wait.h index a80ef64..e4bfeb2 100644 --- a/include/base_wait_pop.h +++ b/include/base_queue_wait.h @@ -18,10 +18,10 @@ namespace small { }; // - // base class for the wait_pop functions (parent caller must implement test_and_get function) + // base class for the wait_pop functions (parent caller must implement 'test_and_get' and 'size' functions) // template - class base_wait_pop + class base_queue_wait { public: using TimeClock = std::chrono::system_clock; @@ -29,21 +29,21 @@ namespace small { using TimePoint = std::chrono::time_point; // - // base_wait_pop + // base_queue_wait // - explicit base_wait_pop(ParentCallerT &parent_caller) + explicit base_queue_wait(ParentCallerT &parent_caller) : m_parent_caller(parent_caller) { } - base_wait_pop(const base_wait_pop &o) : base_wait_pop() { operator=(o); }; - base_wait_pop(base_wait_pop &&o) noexcept : base_wait_pop() { operator=(std::move(o)); }; + base_queue_wait(const base_queue_wait &o) : base_queue_wait() { operator=(o); }; + base_queue_wait(base_queue_wait &&o) noexcept : base_queue_wait() { operator=(std::move(o)); }; - inline base_wait_pop &operator=(const base_wait_pop &) + inline base_queue_wait &operator=(const base_queue_wait &) { return *this; } - inline base_wait_pop &operator=(base_wait_pop &&) noexcept + inline base_queue_wait &operator=(base_queue_wait &&) noexcept { return *this; } @@ -248,6 +248,49 @@ namespace small { } } + // + // wait for queues to become empty + // + inline EnumLock wait() + { + signal_exit_when_done(); + + std::unique_lock l(m_lock); + m_queues_exit_condition.wait(l, [this]() -> bool { + return is_empty_queue(); + }); + + return small::EnumLock::kExit; + } + + template + inline EnumLock wait_for(const std::chrono::duration<_Rep, _Period> &__rtime) + { + using __dur = typename std::chrono::system_clock::duration; + auto __reltime = std::chrono::duration_cast<__dur>(__rtime); + if (__reltime < __rtime) { + ++__reltime; + } + return wait_until(std::chrono::system_clock::now() + __reltime); + } + + template + inline EnumLock wait_until(const std::chrono::time_point<_Clock, _Duration> &__atime) + { + signal_exit_when_done(); + + std::unique_lock l(m_lock); + + auto status = m_queues_exit_condition.wait_until(l, __atime, [this]() -> bool { + return is_empty_queue(); + }); + if (!status) { + return small::EnumLock::kTimeout; + } + + return small::EnumLock::kExit; + } + protected: // // call the parent @@ -255,14 +298,31 @@ namespace small { inline small::WaitFlags test_and_get(T *elem, TimePoint *time_wait_until) { *time_wait_until = TimeClock::now() + std::chrono::minutes(60); - return m_parent_caller.test_and_get(elem, time_wait_until); + auto ret = m_parent_caller.test_and_get(elem, time_wait_until); + auto is_exit_ret = ret == small::WaitFlags::kExit_Force || ret == small::WaitFlags::kExit_When_Done; + + // notify condition if q is empty or exit force + if (is_exit_ret || is_empty_queue()) { + m_queues_exit_condition.notify_all(); + } + + return ret; + } + + // + // is empty queue + // + bool is_empty_queue() + { + return is_exit_force() || m_parent_caller.size() == 0; } private: // // members // - mutable small::base_lock m_lock; // locker - ParentCallerT &m_parent_caller; // parent caller + mutable small::base_lock m_lock; // locker + std::condition_variable_any m_queues_exit_condition; // condition to wait for queues to be empty when signal_exit_when_done + ParentCallerT &m_parent_caller; // parent caller }; } // namespace small diff --git a/include/group_queue.h b/include/group_queue.h index 07e1886..8138ab3 100644 --- a/include/group_queue.h +++ b/include/group_queue.h @@ -355,8 +355,6 @@ namespace small { --m_total_count; // decrease total count } - notify_if_empty(q); - return ret; } @@ -379,8 +377,6 @@ namespace small { } } - notify_if_empty(q); - return ret; } @@ -405,8 +401,6 @@ namespace small { --m_total_count; // decrease total count } - notify_if_empty(q); - return ret; } @@ -431,8 +425,6 @@ namespace small { } } - notify_if_empty(q); - return ret; } @@ -457,8 +449,6 @@ namespace small { --m_total_count; // decrease total count } - notify_if_empty(q); - return ret; } @@ -483,8 +473,6 @@ namespace small { } } - notify_if_empty(q); - return ret; } @@ -494,12 +482,8 @@ namespace small { inline EnumLock wait() { signal_exit_when_done(); - for (auto &[group, q] : m_group_queues) { - std::unique_lock l(q); - m_queues_exit_condition.wait(l, [_q = &q]() -> bool { - return _q->empty() || _q->is_exit_force(); - }); + q.wait(); } return small::EnumLock::kExit; @@ -522,12 +506,8 @@ namespace small { signal_exit_when_done(); for (auto &[group, q] : m_group_queues) { - std::unique_lock l(q); - - auto status = m_queues_exit_condition.wait_until(l, __atime, [_q = &q]() -> bool { - return _q->empty() || _q->is_exit_force(); - }); - if (!status) { + auto status = q.wait_until(__atime); + if (status == small::EnumLock::kTimeout) { return small::EnumLock::kTimeout; } } @@ -552,27 +532,15 @@ namespace small { return it_q->second; } - // - // notify condition if q is empty - // - inline void notify_if_empty(TypeQueue &q) - { - std::unique_lock l(q); - if (q.empty() || q.is_exit_force()) { - m_queues_exit_condition.notify_all(); - } - } - private: // // members // - mutable small::base_lock m_lock; // global locker - std::atomic m_total_count{}; // count of all items - std::unordered_map m_types_groups; // map to get the group for a type - small::config_prio_queue m_prio_config; // config for the priority queue - std::unordered_map m_group_queues; // map of queues grouped by group - std::unordered_map m_types_queues; // optimize from group to type map of queues - std::condition_variable_any m_queues_exit_condition; // condition to wait for queues to be empty when signal_exit_when_done + mutable small::base_lock m_lock; // global locker + std::atomic m_total_count{}; // count of all items + std::unordered_map m_types_groups; // map to get the group for a type + small::config_prio_queue m_prio_config; // config for the priority queue + std::unordered_map m_group_queues; // map of queues grouped by group + std::unordered_map m_types_queues; // optimize from group to type map of queues }; } // namespace small diff --git a/include/lock_queue.h b/include/lock_queue.h index b0153c8..67ffccf 100644 --- a/include/lock_queue.h +++ b/include/lock_queue.h @@ -3,7 +3,7 @@ #include #include -#include "base_wait_pop.h" +#include "base_queue_wait.h" // a queue with events so we can wait for items to be available // @@ -211,14 +211,34 @@ namespace small { return m_wait.wait_pop_until(__atime, vec_elems, max_count); } + // + // wait for queue to become empty (and signal_exit_when_done) + // + inline EnumLock wait() + { + return m_wait.wait(); + } + + template + inline EnumLock wait_for(const std::chrono::duration<_Rep, _Period> &__rtime) + { + return m_wait.wait_for(__rtime); + } + + template + inline EnumLock wait_until(const std::chrono::time_point<_Clock, _Duration> &__atime) + { + return m_wait.wait_until(__atime); + } + private: - using BaseWaitPop = small::base_wait_pop>; - friend BaseWaitPop; + using BaseQueueWait = small::base_queue_wait>; + friend BaseQueueWait; // // check for front element // - inline small::WaitFlags test_and_get(T *elem, typename BaseWaitPop::TimePoint * /* time_wait_until */) + inline small::WaitFlags test_and_get(T *elem, typename BaseQueueWait::TimePoint * /* time_wait_until */) { if (is_exit_force()) { return small::WaitFlags::kExit_Force; @@ -247,7 +267,7 @@ namespace small { // // members // - mutable BaseWaitPop m_wait{*this}; // implements locks & wait - std::deque m_queue; // queue + mutable BaseQueueWait m_wait{*this}; // implements locks & wait + std::deque m_queue; // queue }; } // namespace small diff --git a/include/prio_queue.h b/include/prio_queue.h index 10d0fff..7262dc5 100644 --- a/include/prio_queue.h +++ b/include/prio_queue.h @@ -6,7 +6,7 @@ #include #include -#include "base_wait_pop.h" +#include "base_queue_wait.h" // a queue with prirorites elements having antistarvation mechanism // @@ -365,9 +365,29 @@ namespace small { return m_wait.wait_pop_until(__atime, vec_elems, max_count); } + // + // wait for queue to become empty (and signal_exit_when_done) + // + inline EnumLock wait() + { + return m_wait.wait(); + } + + template + inline EnumLock wait_for(const std::chrono::duration<_Rep, _Period> &__rtime) + { + return m_wait.wait_for(__rtime); + } + + template + inline EnumLock wait_until(const std::chrono::time_point<_Clock, _Duration> &__atime) + { + return m_wait.wait_until(__atime); + } + private: - using BaseWaitPop = small::base_wait_pop>; - friend BaseWaitPop; + using BaseQueueWait = small::base_queue_wait>; + friend BaseQueueWait; struct Stats { @@ -406,7 +426,7 @@ namespace small { } // extract from queue - inline small::WaitFlags test_and_get(T *elem, typename BaseWaitPop::TimePoint * /* time_wait_until */) + inline small::WaitFlags test_and_get(T *elem, typename BaseQueueWait::TimePoint * /* time_wait_until */) { if (is_exit_force()) { return small::WaitFlags::kExit_Force; @@ -478,7 +498,7 @@ namespace small { // // members // - mutable BaseWaitPop m_wait{*this}; // implements locks & wait + mutable BaseQueueWait m_wait{*this}; // implements locks & wait config_prio_queue m_config; // config for priorities and ratio of executions std::unordered_map m_prio_stats; // keep credits per priority to implement the ratio (ex: 3:1) std::unordered_map> m_prio_queues; // map of queues based on priorities diff --git a/include/time_queue.h b/include/time_queue.h index a889690..bdaa05f 100644 --- a/include/time_queue.h +++ b/include/time_queue.h @@ -4,7 +4,7 @@ #include #include -#include "base_wait_pop.h" +#include "base_queue_wait.h" // a queue with time events so we can wait for items to be available at specific time moments // @@ -37,10 +37,10 @@ namespace small { class time_queue { public: - using BaseWaitPop = small::base_wait_pop>; - using TimeClock = typename BaseWaitPop::TimeClock; - using TimeDuration = typename BaseWaitPop::TimeDuration; - using TimePoint = typename BaseWaitPop::TimePoint; + using BaseQueueWait = small::base_queue_wait>; + using TimeClock = typename BaseQueueWait::TimeClock; + using TimeDuration = typename BaseQueueWait::TimeDuration; + using TimePoint = typename BaseQueueWait::TimePoint; // // time_queue @@ -278,6 +278,26 @@ namespace small { return m_wait.wait_pop_until(__atime, vec_elems, max_count); } + // + // wait for queue to become empty (and signal_exit_when_done) + // + inline EnumLock wait() + { + return m_wait.wait(); + } + + template + inline EnumLock wait_for(const std::chrono::duration<_Rep, _Period> &__rtime) + { + return m_wait.wait_for(__rtime); + } + + template + inline EnumLock wait_until(const std::chrono::time_point<_Clock, _Duration> &__atime) + { + return m_wait.wait_until(__atime); + } + protected: // // compute if notification is needed @@ -314,9 +334,9 @@ namespace small { // // test and get // - friend BaseWaitPop; + friend BaseQueueWait; - inline small::WaitFlags test_and_get(T *elem, typename BaseWaitPop::TimePoint *time_wait_until) + inline small::WaitFlags test_and_get(T *elem, typename BaseQueueWait::TimePoint *time_wait_until) { if (is_exit_force()) { return small::WaitFlags::kExit_Force; @@ -353,7 +373,7 @@ namespace small { // // members // - mutable BaseWaitPop m_wait{*this}; // implements locks & wait + mutable BaseQueueWait m_wait{*this}; // implements locks & wait using PriorityQueueElemT = std::pair; struct CompPriorityQueueElemT diff --git a/tests/test_group_queue.cpp b/tests/test_group_queue.cpp index 0272cec..aab1cdb 100644 --- a/tests/test_group_queue.cpp +++ b/tests/test_group_queue.cpp @@ -107,6 +107,10 @@ namespace { ASSERT_EQ(q.size(), 3); + // wait to be empty + auto ret_wait = q.wait_for(std::chrono::milliseconds(100)); + ASSERT_EQ(ret_wait, small::EnumLock::kTimeout); + // pop std::pair value{}; @@ -129,6 +133,9 @@ namespace { // check size ASSERT_EQ(q.size(), 0); + + ret_wait = q.wait(); + ASSERT_EQ(ret_wait, small::EnumLock::kExit); } TEST_F(GroupQueueTest, Queue_Operations_Vec) diff --git a/tests/test_lock_queue.cpp b/tests/test_lock_queue.cpp index 2e5666b..5774037 100644 --- a/tests/test_lock_queue.cpp +++ b/tests/test_lock_queue.cpp @@ -79,6 +79,10 @@ namespace { ASSERT_EQ(r_push, 1); ASSERT_EQ(q.size(), 1); + // wait to be empty + auto ret_wait = q.wait_for(std::chrono::milliseconds(100)); + ASSERT_EQ(ret_wait, small::EnumLock::kTimeout); + // pop int value{}; auto ret = q.wait_pop_front(&value); @@ -87,6 +91,9 @@ namespace { // check size ASSERT_EQ(q.size(), 0); + + ret_wait = q.wait(); + ASSERT_EQ(ret_wait, small::EnumLock::kExit); } TEST_F(LockQueueTest, Queue_Operations_Vec) diff --git a/tests/test_prio_queue.cpp b/tests/test_prio_queue.cpp index d55e0fe..20fc1e6 100644 --- a/tests/test_prio_queue.cpp +++ b/tests/test_prio_queue.cpp @@ -91,6 +91,10 @@ namespace { ASSERT_EQ(r_push, 1); ASSERT_EQ(q.size(), 2); + // wait to be empty + auto ret_wait = q.wait_for(std::chrono::milliseconds(100)); + ASSERT_EQ(ret_wait, small::EnumLock::kTimeout); + // pop int value{}; auto ret = q.wait_pop_front(&value); @@ -105,6 +109,9 @@ namespace { // check size ASSERT_EQ(q.size(), 0); + ret_wait = q.wait(); + ASSERT_EQ(ret_wait, small::EnumLock::kExit); + // other q small::prio_queue q1{ {.priorities{{{1 /*prio*/, 3}}}}}; diff --git a/tests/test_time_queue.cpp b/tests/test_time_queue.cpp index 54c0c22..af610ef 100644 --- a/tests/test_time_queue.cpp +++ b/tests/test_time_queue.cpp @@ -80,6 +80,10 @@ namespace { q.push_delay_for(std::chrono::milliseconds(-1), 5); ASSERT_EQ(q.size(), 1); + // wait to be empty + auto ret_wait = q.wait_for(std::chrono::milliseconds(100)); + ASSERT_EQ(ret_wait, small::EnumLock::kTimeout); + // pop int value{}; auto ret = q.wait_pop(&value); @@ -91,6 +95,9 @@ namespace { // check size ASSERT_EQ(q.size(), 0); ASSERT_LE(elapsed, 300); + + ret_wait = q.wait(); + ASSERT_EQ(ret_wait, small::EnumLock::kExit); } TEST_F(TimeQueueTest, Queue_Operations_Vec)