#ifndef _WORKER_COORD_H
#define _WORKER_COORD_H

// Routines for coordinating workers, specifically, putting workers to sleep
// and waking workers when execution enters and leaves cilkified regions.

#include <limits.h>
#include <pthread.h>  // pthread_mutex_*/pthread_cond_*, used by the non-futex fallback path
#include <stdatomic.h>
#include <stdbool.h>  // bool, used by thief_should_wait
#include <stdint.h>
#include <stdio.h>    // perror, used by errExit
#include <stdlib.h>   // exit, used by errExit

#ifdef __linux__
#include <errno.h>
#include <linux/futex.h>
#include <sys/syscall.h>
#include <unistd.h>
#endif

#include "global.h"
#define USER_USE_FUTEX 1
#ifdef __linux__
#define USE_FUTEX USER_USE_FUTEX
#else
#define USE_FUTEX 0
#endif
#if USE_FUTEX

//=========================================================
// Primitive futex operations.
//=========================================================

#define errExit(msg)                                                           \
    do {                                                                       \
        perror(msg);                                                           \
        exit(EXIT_FAILURE);                                                    \
    } while (false)

// Convenience wrapper for the futex syscall.
static inline long futex(_Atomic uint32_t *uaddr, int futex_op, uint32_t val,
                         const struct timespec *timeout, uint32_t *uaddr2,
                         uint32_t val3) {
    return syscall(SYS_futex, uaddr, futex_op, val, timeout, uaddr2, val3);
}
// Wait for the futex pointed to by `futexp` to become 1.
static inline void fwait(_Atomic uint32_t *futexp) {
    // We don't worry about spurious wakeups here, since we ensure that all
    // calls to fwait are contained in their own loops that effectively check
    // for spurious wakeups.
    long s = futex(futexp, FUTEX_WAIT_PRIVATE, 0, NULL, NULL, 0);
    if (__builtin_expect(s == -1 && errno != EAGAIN, false))
        errExit("futex-FUTEX_WAIT");
}

// Set the futex pointed to by `futexp` to 1, and wake up 1 thread waiting on
// that futex.
static inline void fpost(_Atomic uint32_t *futexp) {
    atomic_store_explicit(futexp, 1, memory_order_release);
    long s = futex(futexp, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0);
    if (s == -1)
        errExit("futex-FUTEX_WAKE");
}

// Set the futex pointed to by `futexp` to 1, and wake up all threads waiting
// on that futex.
static inline void fbroadcast(_Atomic uint32_t *futexp) {
    atomic_store_explicit(futexp, 1, memory_order_release);
    long s = futex(futexp, FUTEX_WAKE_PRIVATE, INT_MAX, NULL, NULL, 0);
    if (s == -1)
        errExit("futex-FUTEX_WAKE");
}
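
// Illustrative (hypothetical) usage of these primitives, mirroring the
// condition-plus-futex pattern used for g->cilkified below. The `done` flag
// and `done_futex` names are examples only, not part of this header:
//
//     _Atomic uint32_t done = 0;        // the condition being waited on
//     _Atomic uint32_t done_futex = 0;  // futex paired with the condition
//
//     void waiter(void) {
//         // Loop around fwait: a spurious wakeup simply re-checks the
//         // condition.
//         while (!atomic_load_explicit(&done, memory_order_acquire))
//             fwait(&done_futex);
//     }
//
//     void signaler(void) {
//         atomic_store_explicit(&done, 1, memory_order_release);
//         fpost(&done_futex); // or fbroadcast() to wake all waiters
//     }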
#endif
//=========================================================
// Common internal interface for managing execution of workers.
//=========================================================

__attribute__((always_inline)) static inline void busy_loop_pause(void) {
#ifdef __SSE__
    __builtin_ia32_pause();
#endif
#ifdef __aarch64__
    __builtin_arm_yield();
#endif
}

__attribute__((always_inline)) static inline void busy_pause(void) {
    for (int i = 0; i < BUSY_PAUSE; ++i)
        busy_loop_pause();
}
// Routines to update global flags to prevent workers from re-entering the
// work-stealing loop. Note that we don't wait for the workers to exit the
// work-stealing loop, since it's more efficient to allow that to happen
// eventually.

// Routines to control the cilkified state.

static inline void set_cilkified(global_state *g) {
    // Set g->cilkified = 1, indicating that the execution is now cilkified.
    atomic_store_explicit(&g->cilkified, 1, memory_order_release);
#if USE_FUTEX
    atomic_store_explicit(&g->cilkified_futex, 0, memory_order_release);
#endif
}
// Mark the computation as no longer cilkified and signal the thread that
// originally cilkified the execution.
static inline void signal_uncilkified(global_state *g) {
#if USE_FUTEX
    atomic_store_explicit(&g->cilkified, 0, memory_order_release);
    fpost(&g->cilkified_futex);
#else
    pthread_mutex_lock(&(g->cilkified_lock));
    atomic_store_explicit(&g->cilkified, 0, memory_order_release);
    pthread_cond_signal(&g->cilkified_cond_var);
    pthread_mutex_unlock(&(g->cilkified_lock));
#endif
}

// Wait on g->cilkified to be set to 0, indicating the end of the Cilkified
// region.
static inline void wait_while_cilkified(global_state *g) {
    unsigned int fail = 0;
    while (fail++ < BUSY_LOOP_SPIN) {
        if (!atomic_load_explicit(&g->cilkified, memory_order_acquire)) {
            return;
        }
        busy_pause();
    }
#if USE_FUTEX
    while (atomic_load_explicit(&g->cilkified, memory_order_acquire)) {
        fwait(&g->cilkified_futex);
    }
#else
    // TODO: Convert pthread_mutex_lock, pthread_mutex_unlock, and
    // pthread_cond_wait to cilk_* equivalents.
    pthread_mutex_lock(&(g->cilkified_lock));
    // There may be a *very unlikely* scenario where the Cilk computation has
    // already been completed before even starting to wait. In that case, do
    // not wait and continue directly. Also handle spurious wakeups with a
    // 'while' instead of an 'if'.
    while (atomic_load_explicit(&g->cilkified, memory_order_acquire)) {
        pthread_cond_wait(&(g->cilkified_cond_var), &(g->cilkified_lock));
    }
    pthread_mutex_unlock(&(g->cilkified_lock));
#endif
}
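
// Illustrative lifecycle sketch (hypothetical caller, not part of this
// header): the thread that enters a cilkified region marks the execution as
// cilkified, hands work off, and blocks until a worker signals completion.
//
//     void enter_cilkified_region(global_state *g) {
//         set_cilkified(g);        // mark the execution as cilkified
//         /* ... hand the root closure off to the worker threads ... */
//         wait_while_cilkified(g); // spin briefly, then block
//     }
//
// The worker that finishes the computation is responsible for calling
// signal_uncilkified(g) to release the blocked thread.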
//=========================================================
// Operations to disengage and reengage workers within the work-stealing loop.
//=========================================================

// Reset the shared variable for disengaging thief threads.
static inline void reset_disengaged_var(global_state *g) {
#if !USE_FUTEX
    pthread_mutex_lock(&g->disengaged_lock);
#endif
    atomic_store_explicit(&g->disengaged_thieves_futex, 0,
                          memory_order_release);
#if !USE_FUTEX
    pthread_mutex_unlock(&g->disengaged_lock);
#endif
}
// Request to reengage `count` thief threads.
static inline void request_more_thieves(global_state *g, uint32_t count) {
    CILK_ASSERT(count > 0);

    // Don't allow this routine to increment the futex beyond half the number
    // of workers on the system. This bounds how many successful steals can
    // possibly keep thieves engaged unnecessarily in the future, when there
    // may not be as much parallelism.
    int32_t max_requests = (int32_t)(g->nworkers / 2);
#if USE_FUTEX
    // This step synchronizes with concurrent calls to request_more_thieves
    // and concurrent calls to try_to_disengage_thief.
    while (true) {
        uint32_t disengaged_thieves_futex = atomic_load_explicit(
            &g->disengaged_thieves_futex, memory_order_acquire);
        int32_t max_to_wake = max_requests - disengaged_thieves_futex;
        if (max_to_wake <= 0)
            return;
        uint64_t to_wake = max_to_wake < (int32_t)count ? max_to_wake : count;
        if (atomic_compare_exchange_strong_explicit(
                &g->disengaged_thieves_futex, &disengaged_thieves_futex,
                disengaged_thieves_futex + to_wake, memory_order_release,
                memory_order_relaxed)) {
            // We successfully updated the futex. Wake the thief threads
            // waiting on this futex.
            long s = futex(&g->disengaged_thieves_futex, FUTEX_WAKE_PRIVATE,
                           to_wake, NULL, NULL, 0);
            if (s == -1)
                errExit("futex-FUTEX_WAKE");
            return;
        }
    }
#else
    pthread_mutex_lock(&g->disengaged_lock);
    uint32_t disengaged_thieves_futex = atomic_load_explicit(
        &g->disengaged_thieves_futex, memory_order_acquire);
    int32_t max_to_wake = max_requests - disengaged_thieves_futex;
    if (max_to_wake <= 0) {
        pthread_mutex_unlock(&g->disengaged_lock);
        return;
    }
    uint32_t to_wake = max_to_wake < (int32_t)count ? max_to_wake : count;
    atomic_store_explicit(&g->disengaged_thieves_futex,
                          disengaged_thieves_futex + to_wake,
                          memory_order_release);
    while (to_wake-- > 0) {
        pthread_cond_signal(&g->disengaged_cond_var);
    }
    pthread_mutex_unlock(&g->disengaged_lock);
#endif
}
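
// Worked example of the cap above (the numbers are illustrative): with
// g->nworkers == 8, max_requests == 8 / 2 == 4. If the futex currently reads
// 3 and a caller requests count == 5, then max_to_wake == 4 - 3 == 1, so only
// min(1, 5) == 1 thief is woken and the futex becomes 4. A subsequent call
// finds max_to_wake <= 0 and returns without waking anyone.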
#if USE_FUTEX
static inline uint32_t thief_disengage_futex(_Atomic uint32_t *futexp) {
    // This step synchronizes with calls to request_more_thieves.
    while (true) {
        // Decrement the futex when woken up. The loop and compare-exchange
        // are designed to handle cases where multiple threads waiting on the
        // futex were woken up and where there may be spurious wakeups.
        uint32_t val;
        while ((val = atomic_load_explicit(futexp, memory_order_relaxed)) > 0) {
            if (atomic_compare_exchange_weak_explicit(futexp, &val, val - 1,
                                                      memory_order_release,
                                                      memory_order_relaxed)) {
                return val;
            }
            busy_loop_pause();
        }
        // Wait on the futex.
        long s = futex(futexp, FUTEX_WAIT_PRIVATE, 0, NULL, NULL, 0);
        if (__builtin_expect(s == -1 && errno != EAGAIN, false))
            errExit("futex-FUTEX_WAIT");
    }
}
#else
static inline uint32_t thief_disengage_cond_var(_Atomic uint32_t *count,
                                                pthread_mutex_t *lock,
                                                pthread_cond_t *cond_var) {
    // This step synchronizes with calls to request_more_thieves.
    pthread_mutex_lock(lock);
    while (true) {
        uint32_t val = atomic_load_explicit(count, memory_order_acquire);
        if (val > 0) {
            atomic_store_explicit(count, val - 1, memory_order_release);
            pthread_mutex_unlock(lock);
            return val;
        }
        pthread_cond_wait(cond_var, lock);
    }
}
#endif

static inline uint32_t thief_disengage(global_state *g) {
#if USE_FUTEX
    return thief_disengage_futex(&g->disengaged_thieves_futex);
#else
    return thief_disengage_cond_var(&g->disengaged_thieves_futex,
                                    &g->disengaged_lock,
                                    &g->disengaged_cond_var);
#endif
}
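
// The futex value acts as a count of wake "tokens": request_more_thieves
// deposits up to nworkers / 2 tokens, and each thief that resumes consumes
// exactly one token by decrementing the value. For example (illustrative), if
// three thieves block while the value is 0 and request_more_thieves deposits
// 2 tokens, at most two thieves are woken and each consumes one token; a
// thief that wakes spuriously finds the value 0 again and blocks once more.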
// Signal to all disengaged thief threads to resume work-stealing.
static inline void wake_all_disengaged(global_state *g) {
#if USE_FUTEX
    atomic_store_explicit(&g->disengaged_thieves_futex, INT_MAX,
                          memory_order_release);
    long s = futex(&g->disengaged_thieves_futex, FUTEX_WAKE_PRIVATE, INT_MAX,
                   NULL, NULL, 0);
    if (s == -1)
        errExit("futex-FUTEX_WAKE");
#else
    pthread_mutex_lock(&g->disengaged_lock);
    atomic_store_explicit(&g->disengaged_thieves_futex, INT_MAX,
                          memory_order_release);
    pthread_cond_broadcast(&g->disengaged_cond_var);
    pthread_mutex_unlock(&g->disengaged_lock);
#endif
}
// Reset global state to make thief threads sleep until signaled to start
// work-stealing again.
static inline void sleep_thieves(global_state *g) {
    reset_disengaged_var(g);
}
// Called by a thief thread. Causes the thief thread to wait for a signal to
// start work-stealing.
static inline uint32_t thief_wait(global_state *g) {
    return thief_disengage(g);
}

// Called by a thief thread. Check if the thief should start waiting for the
// start of a cilkified region. If a new cilkified region has already been
// started, update the global state to indicate that this worker is engaged
// in work-stealing.
static inline bool thief_should_wait(global_state *g) {
    _Atomic uint32_t *futexp = &g->disengaged_thieves_futex;
    uint32_t val = atomic_load_explicit(futexp, memory_order_relaxed);
#if USE_FUTEX
    while (val > 0) {
        if (atomic_compare_exchange_weak_explicit(futexp, &val, val - 1,
                                                  memory_order_release,
                                                  memory_order_relaxed))
            return false;
        busy_loop_pause();
        val = atomic_load_explicit(futexp, memory_order_relaxed);
    }
    return true;
#else
    if (val == 0)
        return true;

    pthread_mutex_t *lock = &g->disengaged_lock;
    pthread_mutex_lock(lock);
    val = atomic_load_explicit(futexp, memory_order_relaxed);
    if (val > 0) {
        atomic_store_explicit(futexp, val - 1, memory_order_release);
        pthread_mutex_unlock(lock);
        return false;
    }
    pthread_mutex_unlock(lock);
    return true;
#endif
}
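
// Illustrative thief loop (hypothetical, not part of this header), showing
// how these routines compose. The exact type and use of g->terminate here is
// an assumption for illustration; the real loop lives in the scheduler:
//
//     while (!g->terminate) {
//         if (thief_should_wait(g))
//             thief_wait(g); // block until wake_thieves or
//                            // request_more_thieves deposits a token
//         /* ... enter the work-stealing loop ... */
//     }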
// Signal the thief threads to start work-stealing (or terminate, if
// g->terminate == 1).
static inline void wake_thieves(global_state *g) {
#if USE_FUTEX
    atomic_store_explicit(&g->disengaged_thieves_futex, g->nworkers - 1,
                          memory_order_release);
    long s = futex(&g->disengaged_thieves_futex, FUTEX_WAKE_PRIVATE, INT_MAX,
                   NULL, NULL, 0);
    if (s == -1)
        errExit("futex-FUTEX_WAKE");
#else
    pthread_mutex_lock(&g->disengaged_lock);
    atomic_store_explicit(&g->disengaged_thieves_futex, g->nworkers - 1,
                          memory_order_release);
    pthread_cond_broadcast(&g->disengaged_cond_var);
    pthread_mutex_unlock(&g->disengaged_lock);
#endif
}
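
// Note the asymmetry between the two wake paths: wake_thieves deposits
// nworkers - 1 tokens at the start of a cilkified region so that every thief
// can engage, while request_more_thieves deposits at most nworkers / 2 tokens
// mid-region to reengage thieves after successful steals.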
#endif /* _WORKER_COORD_H */