Skip to content

Commit

Permalink
Add wait, wait_for, wait_until for all queues to wait to become empty (
Browse files Browse the repository at this point in the history
  • Loading branch information
herrcristi authored Dec 25, 2024
1 parent 078b900 commit 0485755
Show file tree
Hide file tree
Showing 10 changed files with 203 additions and 71 deletions.
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ Signal exit when we no longer want to use the queue

`signal_exit_when_done, is_exit_when_done` // exit when queue is empty, after this flag is set no more items can be pushed in the queue

Wait for queue to become empty

`wait`, `wait_for`, `wait_until`

Use it like this

```
Expand Down Expand Up @@ -181,6 +185,10 @@ Signal exit when we no longer want to use the queue

`signal_exit_when_done, is_exit_when_done` // exit when queue is empty, after this flag is set no more items can be pushed in the queue

Wait for queue to become empty

`wait`, `wait_for`, `wait_until`

Use it like this

```
Expand Down Expand Up @@ -242,6 +250,10 @@ Signal exit when we no longer want to use the queue

`signal_exit_when_done, is_exit_when_done` // exit when queue is empty, after this flag is set no more items can be pushed in the queue

Wait for queue to become empty

`wait`, `wait_for`, `wait_until`

Use it like this

```
Expand Down Expand Up @@ -371,6 +383,10 @@ Signal exit when we no longer want to use worker threads

`signal_exit_when_done`

Wait for queue to become empty

`wait`, `wait_for`, `wait_until`

Use it like this

```
Expand Down
82 changes: 71 additions & 11 deletions include/base_wait_pop.h → include/base_queue_wait.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,32 @@ namespace small {
};

//
// base class for the wait_pop functions (parent caller must implement test_and_get function)
// base class for the wait_pop functions (parent caller must implement 'test_and_get' and 'size' functions)
//
template <typename T, typename ParentCallerT>
class base_wait_pop
class base_queue_wait
{
public:
using TimeClock = std::chrono::system_clock;
using TimeDuration = TimeClock::duration;
using TimePoint = std::chrono::time_point<TimeClock>;

//
// base_wait_pop
// base_queue_wait
//
explicit base_wait_pop(ParentCallerT &parent_caller)
explicit base_queue_wait(ParentCallerT &parent_caller)
: m_parent_caller(parent_caller)
{
}

base_wait_pop(const base_wait_pop &o) : base_wait_pop() { operator=(o); };
base_wait_pop(base_wait_pop &&o) noexcept : base_wait_pop() { operator=(std::move(o)); };
base_queue_wait(const base_queue_wait &o) : base_queue_wait() { operator=(o); };
base_queue_wait(base_queue_wait &&o) noexcept : base_queue_wait() { operator=(std::move(o)); };

inline base_wait_pop &operator=(const base_wait_pop &)
inline base_queue_wait &operator=(const base_queue_wait &)
{
return *this;
}
inline base_wait_pop &operator=(base_wait_pop &&) noexcept
inline base_queue_wait &operator=(base_queue_wait &&) noexcept
{
return *this;
}
Expand Down Expand Up @@ -248,21 +248,81 @@ namespace small {
}
}

//
// wait for queues to become empty
//
inline EnumLock wait()
{
signal_exit_when_done();

std::unique_lock l(m_lock);
m_queues_exit_condition.wait(l, [this]() -> bool {
return is_empty_queue();
});

return small::EnumLock::kExit;
}

template <typename _Rep, typename _Period>
inline EnumLock wait_for(const std::chrono::duration<_Rep, _Period> &__rtime)
{
using __dur = typename std::chrono::system_clock::duration;
auto __reltime = std::chrono::duration_cast<__dur>(__rtime);
if (__reltime < __rtime) {
++__reltime;
}
return wait_until(std::chrono::system_clock::now() + __reltime);
}

template <typename _Clock, typename _Duration>
inline EnumLock wait_until(const std::chrono::time_point<_Clock, _Duration> &__atime)
{
signal_exit_when_done();

std::unique_lock l(m_lock);

auto status = m_queues_exit_condition.wait_until(l, __atime, [this]() -> bool {
return is_empty_queue();
});
if (!status) {
return small::EnumLock::kTimeout;
}

return small::EnumLock::kExit;
}

protected:
//
// call the parent
//
inline small::WaitFlags test_and_get(T *elem, TimePoint *time_wait_until)
{
*time_wait_until = TimeClock::now() + std::chrono::minutes(60);
return m_parent_caller.test_and_get(elem, time_wait_until);
auto ret = m_parent_caller.test_and_get(elem, time_wait_until);
auto is_exit_ret = ret == small::WaitFlags::kExit_Force || ret == small::WaitFlags::kExit_When_Done;

// notify condition if q is empty or exit force
if (is_exit_ret || is_empty_queue()) {
m_queues_exit_condition.notify_all();
}

return ret;
}

//
// is empty queue
//
bool is_empty_queue()
{
return is_exit_force() || m_parent_caller.size() == 0;
}

private:
//
// members
//
mutable small::base_lock m_lock; // locker
ParentCallerT &m_parent_caller; // parent caller
mutable small::base_lock m_lock; // locker
std::condition_variable_any m_queues_exit_condition; // condition to wait for queues to be empty when signal_exit_when_done
ParentCallerT &m_parent_caller; // parent caller
};
} // namespace small
50 changes: 9 additions & 41 deletions include/group_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -355,8 +355,6 @@ namespace small {
--m_total_count; // decrease total count
}

notify_if_empty(q);

return ret;
}

Expand All @@ -379,8 +377,6 @@ namespace small {
}
}

notify_if_empty(q);

return ret;
}

Expand All @@ -405,8 +401,6 @@ namespace small {
--m_total_count; // decrease total count
}

notify_if_empty(q);

return ret;
}

Expand All @@ -431,8 +425,6 @@ namespace small {
}
}

notify_if_empty(q);

return ret;
}

Expand All @@ -457,8 +449,6 @@ namespace small {
--m_total_count; // decrease total count
}

notify_if_empty(q);

return ret;
}

Expand All @@ -483,8 +473,6 @@ namespace small {
}
}

notify_if_empty(q);

return ret;
}

Expand All @@ -494,12 +482,8 @@ namespace small {
inline EnumLock wait()
{
signal_exit_when_done();

for (auto &[group, q] : m_group_queues) {
std::unique_lock l(q);
m_queues_exit_condition.wait(l, [_q = &q]() -> bool {
return _q->empty() || _q->is_exit_force();
});
q.wait();
}

return small::EnumLock::kExit;
Expand All @@ -522,12 +506,8 @@ namespace small {
signal_exit_when_done();

for (auto &[group, q] : m_group_queues) {
std::unique_lock l(q);

auto status = m_queues_exit_condition.wait_until(l, __atime, [_q = &q]() -> bool {
return _q->empty() || _q->is_exit_force();
});
if (!status) {
auto status = q.wait_until(__atime);
if (status == small::EnumLock::kTimeout) {
return small::EnumLock::kTimeout;
}
}
Expand All @@ -552,27 +532,15 @@ namespace small {
return it_q->second;
}

//
// notify condition if q is empty
//
inline void notify_if_empty(TypeQueue &q)
{
std::unique_lock l(q);
if (q.empty() || q.is_exit_force()) {
m_queues_exit_condition.notify_all();
}
}

private:
//
// members
//
mutable small::base_lock m_lock; // global locker
std::atomic<std::size_t> m_total_count{}; // count of all items
std::unordered_map<TypeT, GroupT> m_types_groups; // map to get the group for a type
small::config_prio_queue<PrioT> m_prio_config; // config for the priority queue
std::unordered_map<GroupT, TypeQueue> m_group_queues; // map of queues grouped by group
std::unordered_map<TypeT, TypeQueue *> m_types_queues; // optimize from group to type map of queues
std::condition_variable_any m_queues_exit_condition; // condition to wait for queues to be empty when signal_exit_when_done
mutable small::base_lock m_lock; // global locker
std::atomic<std::size_t> m_total_count{}; // count of all items
std::unordered_map<TypeT, GroupT> m_types_groups; // map to get the group for a type
small::config_prio_queue<PrioT> m_prio_config; // config for the priority queue
std::unordered_map<GroupT, TypeQueue> m_group_queues; // map of queues grouped by group
std::unordered_map<TypeT, TypeQueue *> m_types_queues; // optimize from group to type map of queues
};
} // namespace small
32 changes: 26 additions & 6 deletions include/lock_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#include <atomic>
#include <deque>

#include "base_wait_pop.h"
#include "base_queue_wait.h"

// a queue with events so we can wait for items to be available
//
Expand Down Expand Up @@ -211,14 +211,34 @@ namespace small {
return m_wait.wait_pop_until(__atime, vec_elems, max_count);
}

//
// wait for queue to become empty (and signal_exit_when_done)
//
inline EnumLock wait()
{
return m_wait.wait();
}

template <typename _Rep, typename _Period>
inline EnumLock wait_for(const std::chrono::duration<_Rep, _Period> &__rtime)
{
return m_wait.wait_for(__rtime);
}

template <typename _Clock, typename _Duration>
inline EnumLock wait_until(const std::chrono::time_point<_Clock, _Duration> &__atime)
{
return m_wait.wait_until(__atime);
}

private:
using BaseWaitPop = small::base_wait_pop<T, small::lock_queue<T>>;
friend BaseWaitPop;
using BaseQueueWait = small::base_queue_wait<T, small::lock_queue<T>>;
friend BaseQueueWait;

//
// check for front element
//
inline small::WaitFlags test_and_get(T *elem, typename BaseWaitPop::TimePoint * /* time_wait_until */)
inline small::WaitFlags test_and_get(T *elem, typename BaseQueueWait::TimePoint * /* time_wait_until */)
{
if (is_exit_force()) {
return small::WaitFlags::kExit_Force;
Expand Down Expand Up @@ -247,7 +267,7 @@ namespace small {
//
// members
//
mutable BaseWaitPop m_wait{*this}; // implements locks & wait
std::deque<T> m_queue; // queue
mutable BaseQueueWait m_wait{*this}; // implements locks & wait
std::deque<T> m_queue; // queue
};
} // namespace small
Loading

0 comments on commit 0485755

Please sign in to comment.