-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathaffinity_thread_pool_lockfree.h
383 lines (358 loc) · 13.1 KB
/
affinity_thread_pool_lockfree.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
#pragma once
#include <pthread.h>

#include <algorithm>
#include <atomic>
#include <cassert>
#include <chrono>
#include <functional>
#include <memory>
#include <queue>
#include <string>
#include <thread>
#include <vector>

#include <boost/lockfree/spsc_queue.hpp>
#include <boost/version.hpp>

#include "thread_pool_pinning.h"
// thread pool with multiple queues of inputs to a function
// using a boost spsc lockfree queue
// each queue belongs to a specific worker thread
template<typename Arg>
class affinity_thread_pool_lockfree
{
public:
    // family of functions that a worker can execute;
    // the second argument is the id of the worker running the call
    typedef std::function<void(const Arg&, std::size_t)> FunctionType;
private:
    // number of worker threads
    const std::size_t m_number_of_workers;
    // thread pool CPU pinning information
    const thread_pool_pinning m_pinning;
    // time a submitter waits when the target queue is full (nanoseconds)
    const unsigned long m_submit_wait_time;
    // time a worker waits when its queue is empty (nanoseconds)
    const unsigned long m_wait_time;
    // indication that the pool should not receive new Args.
    // std::atomic (not volatile): volatile provides no inter-thread
    // visibility or ordering guarantees in the C++ memory model
    std::atomic<bool> m_done;
    // single producer single consumer queue is used
    // note that each worker has its own queue, so there is a single consumer
    using queue_t = boost::lockfree::spsc_queue<Arg, boost::lockfree::fixed_sized<true>>;
    // queues of work items, one for each worker
    std::vector<std::unique_ptr<queue_t>> m_queues;
    // push/pop counters used to estimate each queue's size.
    // deliberately non-atomic: due to race conditions the estimate may be
    // momentarily off, but the error does not grow over time.
    // this avoids an atomic counter on the hot submit/consume paths
    std::vector<unsigned long long> m_queues_push;
    std::vector<unsigned long long> m_queues_pop;
    // the actual worker threads
    std::vector<std::thread> m_workers;
    // running counter incremented on every no-affinity submit so the
    // round-robin scan starts from a different queue each time.
    // NOTE(review): incremented unsynchronized from submitter threads;
    // only used as a starting hint, the exact value is irrelevant
    std::size_t m_running_counter;
    // number of worker threads that have started running
    std::atomic<std::size_t> m_number_of_running_workers;
    // family of cleanup functions applied to an Arg that is discarded
    // without being executed
    typedef std::function<void(const Arg&)> CleanupFunctionType;
    // cleanup function on Arg when FunctionType is not executed.
    // assumption is that if the Arg dtor is not enough (e.g. Arg is a
    // pointer) then FunctionType would normally do the cleanup
    const CleanupFunctionType m_cleanup_function;
    // generic name assigned as a prefix to each worker thread
    const std::string m_name;

    // wrapper binding the worker id at construction so the worker loop
    // does not have to pass it on every call
    class InternalFunctionType
    {
    private:
        const FunctionType F;
        const std::size_t m_worker_id;
    public:
        // ctor
        InternalFunctionType(FunctionType f, std::size_t worker_id) : F(f), m_worker_id(worker_id) {}
        // functor
        void operator()(const Arg& arg) const
        {
            F(arg, m_worker_id);
        }
    };

    // main loop of a worker thread: pulls items out of its own queue
    // until signaled to stop, passing worker_id to F on every call.
    // exits only when the queue is empty AND m_done is set, so pending
    // work is drained before shutdown (unless stop(false) drained it)
    void worker(FunctionType F, std::size_t worker_id)
    {
        assert(worker_id < m_number_of_workers);
        assert(m_number_of_running_workers < m_number_of_workers);
        ++m_number_of_running_workers;
        auto& q = m_queues[worker_id];
        auto& pop_count = m_queues_pop[worker_id];
        const InternalFunctionType InternalF(F, worker_id);
        while (true)
        {
#if BOOST_VERSION > 105300
            const unsigned int consumed_args = q->consume_all(InternalF);
            if (consumed_args == 0)
            {
                // if queue is empty and work is done - the thread exits
                if (m_done)
                {
                    return;
                }
                // wait until there is something in the queue
                std::this_thread::sleep_for(std::chrono::nanoseconds(m_wait_time));
            }
            else
            {
                pop_count += consumed_args;
            }
#else
            // old boost has no consume_all - pop one item at a time
            Arg arg;
            if (!q->pop(arg))
            {
                // if queue is empty and work is done - the thread exits
                if (m_done)
                {
                    return;
                }
                // wait until there is something in the queue
                std::this_thread::sleep_for(std::chrono::nanoseconds(m_wait_time));
            }
            else
            {
                InternalF(arg);
                ++pop_count;
            }
#endif
        }
    }

    // by default we assume the dtor of Arg takes care of cleanup
    static void default_cleanup_function(const Arg&)
    {
        //! no-op
    }
public:
    // indicating that work may be executed on any thread
    static const int NoAffinity = -1;

    // return the estimated size of a worker's queue.
    // a valid worker_id returns a valid estimate; 0 is returned when
    // worker_id >= number of workers - a valid use case for the PDB
    // workerTable (in case of specific thread exclusion)
    size_t queue_size(size_t worker_id) const
    {
        assert(worker_id < std::thread::hardware_concurrency());
        if (worker_id < m_queues_push.size())
        {
            // due to race conditions pop may momentarily exceed push;
            // in such a case we return zero.
            // note that if (push - pop) > 9223372036854775807 we would
            // also return zero here - impossible in practice since the
            // max queue size is 65535
            const long long int current_size = m_queues_push[worker_id] - m_queues_pop[worker_id];
            return std::max(current_size, 0LL);
        }
        return 0;
    }

    // return the pinning policy used
    thread_pool_pinning::policy_t pinning_policy() const
    {
        return m_pinning.policy();
    }

    // don't allow copying
    affinity_thread_pool_lockfree& operator=(const affinity_thread_pool_lockfree&) = delete;
    affinity_thread_pool_lockfree(const affinity_thread_pool_lockfree&) = delete;

    // destructor stops the threads without waiting for pending work
    ~affinity_thread_pool_lockfree()
    {
        // stop all threads without finishing the work
        // (no-op when already stopped)
        stop(false);
        // stop() returns early when m_done was already set - e.g. when
        // the ctor failed to start all threads - so it may not have
        // joined anything. join every thread that did start, otherwise
        // destroying a joinable std::thread calls std::terminate
        for (auto& worker : m_workers)
        {
            if (worker.joinable())
            {
                try
                {
                    worker.join();
                }
                catch (...)
                {
                    // not much we can do here
                }
            }
        }
    }

    // constructor spawns, names and (optionally) pins the worker threads.
    // number_of_workers - number of threads / queues
    // queue_size       - fixed capacity of each per-worker queue
    // F                - function executed on each submitted Arg
    // pinning          - CPU pinning information for the workers
    // submit_wait_time - submitter backoff when a queue is full (ns)
    // wait_time        - worker backoff when its queue is empty (ns)
    // C                - cleanup for Args discarded without execution
    // name             - prefix for the worker thread names
    affinity_thread_pool_lockfree(std::size_t number_of_workers,
            std::size_t queue_size,
            FunctionType F,
            const thread_pool_pinning& pinning,
            unsigned long submit_wait_time,
            unsigned long wait_time,
            CleanupFunctionType C = std::bind(&affinity_thread_pool_lockfree::default_cleanup_function, std::placeholders::_1),
            const std::string& name = "") :
        m_number_of_workers(number_of_workers),
        m_pinning(pinning),
        m_submit_wait_time(submit_wait_time),
        m_wait_time(wait_time),
        m_done(false),
        m_queues_push(m_number_of_workers),
        m_queues_pop(m_number_of_workers),
        m_running_counter(0),
        m_number_of_running_workers(0),
        m_cleanup_function(C),
        m_name(name)
    {
        // create all queues first so every worker finds its queue ready
        for (auto i = 0U; i < m_number_of_workers; ++i)
        {
            m_queues.emplace_back(new queue_t(queue_size));
        }
        // reset queue size counters and start the workers
        for (auto i = 0U; i < m_number_of_workers; ++i)
        {
            m_queues_push[i] = 0;
            m_queues_pop[i] = 0;
            try
            {
                // start the worker thread
                m_workers.push_back(std::thread(&affinity_thread_pool_lockfree::worker, this, F, i));
                // set the name of the thread based on worker id.
                // only 16 char names are allowed, so the first 15 chars
                // are passed plus a null terminator
                const std::string workerName = m_name + "Worker" + std::to_string(i);
                pthread_setname_np(m_workers[i].native_handle(), workerName.substr(0,15).c_str());
                cpu_set_t cpu_set;
                if (m_pinning.get_cpu_set(i, &cpu_set))
                {
                    pthread_setaffinity_np(m_workers[i].native_handle(), sizeof(cpu_set_t), &cpu_set);
                }
                // if pinning is not possible we just don't set any affinity
            }
            catch (...)
            {
                // failed to start a thread.
                // make sure that we don't wait for all threads to start
                // or we would wait forever (dtor joins whatever started)
                m_done = true;
                return;
            }
        }
        // make sure we don't return before all threads are up and running;
        // this prevents joining a thread that didn't start when stopping
        while (m_number_of_running_workers < m_number_of_workers)
        {
            // let them start
            std::this_thread::yield();
        }
    }

    // submit a new argument to be processed by the threads.
    // blocking call: waits until some queue accepts the item
    // (silently dropped if the pool is already stopped)
    void submit(const Arg& arg, int worker_id = NoAffinity)
    {
        assert(worker_id < static_cast<int>(m_number_of_workers) && worker_id >= NoAffinity);
        if (m_done)
        {
            return;
        }
        else if (worker_id == NoAffinity)
        {
            // no affinity, find a free queue
            bool pushed = false;
            // don't always start from the first queue
            std::size_t q_idx = (++m_running_counter)%m_number_of_workers;
            while (!m_done && !pushed)
            {
                for (auto i = 0U; i < m_number_of_workers; ++i)
                {
                    if (m_queues[q_idx]->push(arg))
                    {
                        // increment the queue size on successful push
                        ++m_queues_push[q_idx];
                        pushed = true;
                        break;
                    }
                    // try the next queue
                    q_idx = (q_idx+1)%m_number_of_workers;
                }
                // back off only when the scan failed; the original code
                // slept unconditionally, penalizing every successful push
                if (!pushed)
                {
                    // all queues are full - wait until one has space
                    std::this_thread::sleep_for(std::chrono::nanoseconds(m_submit_wait_time));
                }
            }
        }
        else
        {
            // has affinity, use the specific worker's queue
            while (!m_queues[worker_id]->push(arg))
            {
                // queue is full, wait until queue has space
                std::this_thread::sleep_for(std::chrono::nanoseconds(m_submit_wait_time));
            }
            // increment the queue size on successful push
            ++m_queues_push[worker_id];
        }
    }

    // submit a new argument to be processed by the threads if a queue
    // has space. non-blocking call; returns true iff the item was queued
    bool try_submit(const Arg& arg, int worker_id = NoAffinity)
    {
        assert(worker_id < static_cast<int>(m_number_of_workers) && worker_id >= NoAffinity);
        if (m_done)
        {
            return false;
        }
        else if (worker_id == NoAffinity)
        {
            // no affinity, find a free queue
            bool pushed = false;
            // don't always start from the first queue
            std::size_t q_idx = (++m_running_counter)%m_number_of_workers;
            for (auto i = 0U; i < m_number_of_workers; ++i)
            {
                if (m_queues[q_idx]->push(arg))
                {
                    // increment the queue size on successful push
                    ++m_queues_push[q_idx];
                    pushed = true;
                    break;
                }
                // try the next queue
                q_idx = (q_idx+1)%m_number_of_workers;
            }
            return pushed;
        }
        else
        {
            // has affinity, try using the specific worker
            if (m_queues[worker_id]->push(arg))
            {
                // increment the queue size on successful push
                ++m_queues_push[worker_id];
                return true;
            }
            return false;
        }
    }

    // stop all threads; when wait is true pending work is finished,
    // otherwise the queues are drained via the cleanup function
    void stop(bool wait)
    {
        // atomically flip m_done so that only one caller performs the
        // shutdown; the original non-atomic check allowed two concurrent
        // callers to both proceed and double-join the threads
        if (m_done.exchange(true))
        {
            return;
        }
        if (!wait)
        {
            // drain the queues, applying cleanup to each discarded Arg
            for (auto& q : m_queues)
            {
#if BOOST_VERSION > 105300
                q->consume_all(m_cleanup_function);
#else
                Arg arg;
                while(q->pop(arg))
                {
                    m_cleanup_function(arg);
                }
#endif
            }
            // reset all queue push/pop counters to zero
            for (auto i = 0U; i < m_number_of_workers; ++i)
            {
                m_queues_push[i] = 0;
                m_queues_pop[i] = 0;
            }
        }
        for (auto& worker : m_workers)
        {
            // join on threads until they actually finish
            try
            {
                worker.join();
            }
            catch (...)
            {
                // could happen if F is deadlocked
                // not much we can do here
            }
        }
    }
};