-
Notifications
You must be signed in to change notification settings - Fork 0
/
messagequeue.hpp
80 lines (69 loc) · 2.23 KB
/
messagequeue.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
/*
* @file messagequeue.hpp
* @author Zizheng Guo (https://github.com/gzz2000)
* @brief A message queue implementation for inter-thread communication and timeouts.
*
* Copyright(c) 2020 Zizheng Guo.
* Distributed under the MIT License (http://opensource.org/licenses/MIT)
*
*/
#ifndef MESSAGEQUEUE_HPP
#define MESSAGEQUEUE_HPP
#include <queue>
#include <mutex>
#include <condition_variable>
#include <chrono>
#include <map>
typedef std::chrono::time_point<std::chrono::steady_clock> timer_index;
template<typename T>
class messagequeue {
std::queue<T> q;
mutable std::mutex mutex;
std::condition_variable cond;
std::map<timer_index, T> timers;
public:
inline T pop() {
std::unique_lock<std::mutex> lock(mutex);
while(q.empty()) {
if(timers.empty()) cond.wait(lock);
else {
auto status = cond.wait_until(lock, timers.begin()->first);
if(status == std::cv_status::timeout && !timers.empty()) {
T ret = std::move(timers.begin()->second);
timers.erase(timers.begin());
return ret;
}
}
}
T ret = std::move(q.front());
q.pop();
return ret;
}
inline void push(const T &msg) {
std::lock_guard<std::mutex> lock(mutex);
q.push(msg);
cond.notify_one();
}
inline timer_index setTimeout(const T &msg, int delay /* ms */) {
std::lock_guard<std::mutex> lock(mutex);
timer_index index = std::chrono::steady_clock::now() + std::chrono::milliseconds(delay);
while(timers.find(index) != timers.end()) index += std::chrono::microseconds(1);
timers[index] = msg;
cond.notify_one(); // most to reset the timer
return index;
}
inline void clearTimeout(timer_index index) {
std::lock_guard<std::mutex> lock(mutex);
auto it = timers.find(index);
if(it != timers.end()) {
timers.erase(it);
cond.notify_one();
}
}
// inline size_t size() const {
// std::lock_guard<std::mutex> lock(mutex);
// return q.size(); // + timers.size();
// }
// inline bool empty() const
};
#endif // MESSAGEQUEUE_HPP