#include <inttypes.h> /* PRIu32 */
#include <stdio.h>
#include <stdlib.h>
#include "cilk-internal.h"
#include "debug.h"
#include "fiber-header.h"
#include "fiber.h"
#include "global.h"
#include "local.h"
#include "mutex.h"
// When the pool becomes full (empty), free (allocate) 1/BATCH_FRACTION
// of the pool's capacity back to (from) the parent pool / the OS.
#define BATCH_FRACTION 2
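// For example (illustrative numbers, not necessarily the defaults): with a
// per-worker pool capacity of 8 and BATCH_FRACTION of 2, an empty pool
// allocates a batch of 8 / 2 = 4 fibers, and a full pool frees a batch of 4.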
//=========================================================================
// Currently the fiber pools are organized into two levels, like in Hoard
// --- a per-worker private pool plus a global pool. A per-worker private
// pool is accessed by its owner worker only and thus does not require
// synchronization. The global pool may be accessed concurrently and thus
// requires synchronization. Thus, the pool->lock is initialized to NULL for
// per-worker pools and to a cilk_mutex for the global one.
//
// The per-worker pools are initialized with some free fibers already
// preallocated, and the global one starts out empty. A worker typically
// acquires and frees fibers from / to its own per-worker pool, and only
// allocates / frees batches from / to the global parent pool when necessary
// (i.e., the buffer exceeds capacity and fibers need to be freed, or fibers
// are needed but the buffer is empty).
//
// For now, we don't ever allocate fibers into the global pool --- we only
// use the global pool to load balance between per-worker pools.
//=========================================================================
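// A minimal sketch of the intended flow from a worker's point of view
// (illustrative only and not compiled; `w` stands for some
// __cilkrts_worker *, which this sketch assumes is in scope):
#if 0
/* Allocation path: */
struct cilk_fiber *f = cilk_fiber_allocate_from_pool(w);
/* - If w's per-worker pool is non-empty, pop a free fiber; no locking.
 * - If it is empty, lock the global pool, pull up to
 *   capacity / BATCH_FRACTION fibers from it, allocate the remainder of
 *   the batch from the system, then pop. */

/* Deallocation path: */
cilk_fiber_deallocate_to_pool(w, f);
/* - If w's per-worker pool is not full, push the fiber back; no locking.
 * - If it is full, lock the global pool, return up to
 *   capacity / BATCH_FRACTION fibers to it (bounded by its remaining
 *   capacity), deallocate the rest to the system, then push. */
#endif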
//=========================================================
// Private helper functions for maintaining pool stats
//=========================================================
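// in_use counts fibers handed out from the pool and not yet returned;
// max_in_use and max_free record the high-water marks of in_use and of the
// number of free fibers held in the pool, respectively.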
static void fiber_pool_stat_init(struct cilk_fiber_pool *pool) {
pool->stats.in_use = 0;
pool->stats.max_in_use = 0;
pool->stats.max_free = 0;
}
#define POOL_FMT "size %3u, %4d used %4d max used %4u max free"
static void fiber_pool_stat_print_worker(__cilkrts_worker *w, void *data) {
FILE *fp = (FILE *)data;
fprintf(fp, "[W%02" PRIu32 "] " POOL_FMT "\n", w->self,
w->l->fiber_pool.size, w->l->fiber_pool.stats.in_use,
w->l->fiber_pool.stats.max_in_use, w->l->fiber_pool.stats.max_free);
}
static void fiber_pool_stat_print(struct global_state *g) {
fprintf(stderr, "\nFIBER POOL STATS\n[G ] " POOL_FMT "\n",
g->fiber_pool.size, g->fiber_pool.stats.in_use,
g->fiber_pool.stats.max_in_use, g->fiber_pool.stats.max_free);
for_each_worker(g, &fiber_pool_stat_print_worker, stderr);
fprintf(stderr, "\n");
}
//=========================================================
// Private helper functions
//=========================================================
// forward decl
static void fiber_pool_allocate_batch(worker_id self,
struct cilk_fiber_pool *pool,
unsigned int num_to_allocate);
static void fiber_pool_free_batch(worker_id self,
struct cilk_fiber_pool *pool,
unsigned int num_to_free);
/* Helper function for initializing fiber pool */
static void fiber_pool_init(struct cilk_fiber_pool *pool, size_t stacksize,
unsigned int bufsize,
struct cilk_fiber_pool *parent, int is_shared) {
cilk_mutex_init(&pool->lock);
pool->mutex_owner = NO_WORKER;
pool->shared = is_shared;
pool->stack_size = stacksize;
pool->parent = parent;
pool->capacity = bufsize;
pool->size = 0;
pool->fibers = calloc(bufsize, sizeof(*pool->fibers));
}
/* Helper function for destroying fiber pool */
static void fiber_pool_destroy(struct cilk_fiber_pool *pool) {
CILK_ASSERT(pool->size == 0);
cilk_mutex_destroy(&pool->lock);
// pool->fibers might be NULL if the fiber pool was never actually
// initialized, e.g., because no Cilk code was run.
if (pool->fibers == NULL)
return;
free(pool->fibers);
pool->parent = NULL;
pool->fibers = NULL;
}
static inline void fiber_pool_assert_ownership(worker_id self,
struct cilk_fiber_pool *pool) {
if (pool->shared)
CILK_ASSERT(pool->mutex_owner == self);
}
static inline void fiber_pool_assert_alienation(worker_id self,
struct cilk_fiber_pool *pool) {
if (pool->shared)
CILK_ASSERT(pool->mutex_owner != self);
}
static inline void fiber_pool_lock(worker_id self,
struct cilk_fiber_pool *pool) {
if (pool->shared) {
fiber_pool_assert_alienation(self, pool);
cilk_mutex_lock(&pool->lock);
pool->mutex_owner = self;
}
}
static inline void fiber_pool_unlock(worker_id self,
struct cilk_fiber_pool *pool) {
if (pool->shared) {
fiber_pool_assert_ownership(self, pool);
pool->mutex_owner = NO_WORKER;
cilk_mutex_unlock(&pool->lock);
}
}
/**
* Increase the buffer size for the free fibers. If the current size is
* already larger than the new size, do nothing. Assume lock acquired upon
* entry.
*/
static void fiber_pool_increase_capacity(worker_id self,
struct cilk_fiber_pool *pool,
unsigned int new_size) {
fiber_pool_assert_ownership(self, pool);
if (pool->capacity < new_size) {
struct cilk_fiber **larger =
realloc(pool->fibers, new_size * sizeof(*pool->fibers));
if (!larger)
CILK_ABORT("out of fiber memory");
pool->fibers = larger;
pool->capacity = new_size;
}
}
/**
* Decrease the buffer size for the free fibers. If the current size is
* already smaller than the new size, do nothing. Assume lock acquired upon
* entry.
*/
__attribute__((unused)) // unused for now
static void
fiber_pool_decrease_capacity(worker_id self, struct cilk_fiber_pool *pool,
unsigned int new_size) {
fiber_pool_assert_ownership(self, pool);
if (pool->size > new_size) {
int diff = pool->size - new_size;
fiber_pool_free_batch(self, pool, diff);
CILK_ASSERT(pool->size == new_size);
}
if (pool->capacity > new_size) {
struct cilk_fiber **smaller = (struct cilk_fiber **)realloc(
pool->fibers, new_size * sizeof(struct cilk_fiber *));
if (smaller) {
pool->fibers = smaller;
pool->capacity = new_size;
}
}
}
/**
* Allocate num_to_allocate new fibers into the pool.
* We first look in the parent pool, and if the parent pool does not
* have enough, we get the rest from the system.
*/
static void fiber_pool_allocate_batch(worker_id self,
struct cilk_fiber_pool *pool,
const unsigned int batch_size) {
fiber_pool_assert_ownership(self, pool);
fiber_pool_increase_capacity(self, pool, batch_size + pool->size);
unsigned int from_parent = 0;
if (pool->parent) {
struct cilk_fiber_pool *parent = pool->parent;
fiber_pool_lock(self, parent);
from_parent = parent->size <= batch_size ? parent->size : batch_size;
for (unsigned int i = 0; i < from_parent; i++) {
pool->fibers[pool->size++] = parent->fibers[--parent->size];
}
// update parent pool stats before releasing the lock on it
parent->stats.in_use += from_parent;
if (parent->stats.in_use > parent->stats.max_in_use) {
parent->stats.max_in_use = parent->stats.in_use;
}
fiber_pool_unlock(self, parent);
}
if (batch_size > from_parent) { // if we need more still
for (unsigned int i = from_parent; i < batch_size; i++) {
pool->fibers[pool->size++] =
cilk_fiber_allocate(pool->stack_size);
}
}
if (pool->size > pool->stats.max_free) {
pool->stats.max_free = pool->size;
}
}
/**
* Free num_to_free fibers from this pool back to either the parent
* or the system.
*/
static void fiber_pool_free_batch(worker_id self,
struct cilk_fiber_pool *pool,
const unsigned int batch_size) {
fiber_pool_assert_ownership(self, pool);
CILK_ASSERT(batch_size <= pool->size);
unsigned int to_parent = 0;
if (pool->parent) { // first try to free into the parent
struct cilk_fiber_pool *parent = pool->parent;
fiber_pool_lock(self, parent);
to_parent = (batch_size <= (parent->capacity - parent->size))
? batch_size
: (parent->capacity - parent->size);
// free what we can within the capacity of the parent pool
for (unsigned int i = 0; i < to_parent; i++) {
parent->fibers[parent->size++] = pool->fibers[--pool->size];
}
CILK_ASSERT(parent->size <= parent->capacity);
parent->stats.in_use -= to_parent;
if (parent->size > parent->stats.max_free) {
parent->stats.max_free = parent->size;
}
fiber_pool_unlock(self, parent);
}
if ((batch_size - to_parent) > 0) { // still need to free more
for (unsigned int i = to_parent; i < batch_size; i++) {
struct cilk_fiber *fiber = pool->fibers[--pool->size];
cilk_fiber_deallocate(fiber);
}
}
}
//=========================================================
// Supported public functions
//=========================================================
/* Global fiber pool initialization: */
void cilk_fiber_pool_global_init(global_state *g) {
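// Size the global pool's buffer so it can absorb every per-worker pool in
// full (nproc workers, each holding up to fiber_pool_cap free fibers).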
unsigned int bufsize = g->options.nproc * g->options.fiber_pool_cap;
struct cilk_fiber_pool *pool = &(g->fiber_pool);
fiber_pool_init(pool, g->options.stacksize, bufsize, NULL, 1 /*shared*/);
CILK_ASSERT(NULL != pool->fibers);
fiber_pool_stat_init(pool);
/* let's not preallocate for global fiber pool for now */
}
/* This does not yet destroy the fiber pool; it returns the remaining free
* fibers to the system and prints stats (if FIBER_STATS is set).
*/
void cilk_fiber_pool_global_terminate(global_state *g) {
struct cilk_fiber_pool *pool = &g->fiber_pool;
cilk_mutex_lock(&pool->lock); /* probably not needed */
while (pool->size > 0) {
struct cilk_fiber *fiber = pool->fibers[--pool->size];
cilk_fiber_deallocate_global(g, fiber);
}
cilk_mutex_unlock(&pool->lock);
if (ALERT_ENABLED(FIBER_SUMMARY))
fiber_pool_stat_print(g);
}
/* Global fiber pool clean up. */
void cilk_fiber_pool_global_destroy(global_state *g) {
fiber_pool_destroy(&g->fiber_pool); // worker 0 should have freed everything
}
/**
* Per-worker fiber pool zero initialization. Initializes the fiber pool to a
* safe zero state, in case the worker is created but
* cilk_fiber_pool_per_worker_init() never gets called on it. Should
* initialize the fiber pool sufficiently for calls to
* cilk_fiber_pool_per_worker_terminate() and
* cilk_fiber_pool_per_worker_destroy() to succeed.
*/
void cilk_fiber_pool_per_worker_zero_init(__cilkrts_worker *w) {
struct cilk_fiber_pool *pool = &(w->l->fiber_pool);
pool->size = 0;
pool->fibers = NULL;
}
/**
* Per-worker fiber pool initialization: should be called per worker
* so that its fibers come from the core on which the worker is running.
*/
void cilk_fiber_pool_per_worker_init(__cilkrts_worker *w) {
global_state *g = w->g;
unsigned int bufsize = g->options.fiber_pool_cap;
struct cilk_fiber_pool *pool = &(w->l->fiber_pool);
fiber_pool_init(pool, g->options.stacksize, bufsize, &(g->fiber_pool),
0 /* private */);
CILK_ASSERT(NULL != pool->fibers);
CILK_ASSERT(g->fiber_pool.stack_size == pool->stack_size);
fiber_pool_stat_init(pool);
fiber_pool_allocate_batch(w->self, pool, bufsize / BATCH_FRACTION);
}
/* This does not yet destroy the fiber pool; it returns the worker's
* remaining free fibers to the system.
*/
void cilk_fiber_pool_per_worker_terminate(__cilkrts_worker *w) {
struct cilk_fiber_pool *pool = &(w->l->fiber_pool);
while (pool->size > 0) {
unsigned index = --pool->size;
struct cilk_fiber *fiber = pool->fibers[index];
pool->fibers[index] = NULL;
cilk_fiber_deallocate(fiber);
}
}
/* Per-worker fiber pool clean up. */
void cilk_fiber_pool_per_worker_destroy(__cilkrts_worker *w) {
struct cilk_fiber_pool *pool = &(w->l->fiber_pool);
fiber_pool_destroy(pool);
}
/**
* Allocate a fiber from this pool; if this pool is empty,
* allocate a batch of fibers from the parent pool (or system).
*/
struct cilk_fiber *cilk_fiber_allocate_from_pool(__cilkrts_worker *w) {
struct cilk_fiber_pool *pool = &(w->l->fiber_pool);
if (pool->size == 0) {
fiber_pool_allocate_batch(w->self, pool,
pool->capacity / BATCH_FRACTION);
}
struct cilk_fiber *ret = pool->fibers[--pool->size];
pool->stats.in_use++;
if (pool->stats.in_use > pool->stats.max_in_use) {
pool->stats.max_in_use = pool->stats.in_use;
}
CILK_ASSERT(ret);
sanitizer_unpoison_fiber(ret);
init_fiber_header(ret);
return ret;
}
/**
* Free fiber_to_return into this pool; if this pool is full,
* free a batch of fibers back into the parent pool (or system).
*/
void cilk_fiber_deallocate_to_pool(__cilkrts_worker *w,
struct cilk_fiber *fiber_to_return) {
if (fiber_to_return)
sanitizer_poison_fiber(fiber_to_return);
struct cilk_fiber_pool *pool = &(w->l->fiber_pool);
if (pool->size == pool->capacity) {
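// The pool is full: flush a batch to the parent / system first; this
// must leave room for at least a full batch (checked below), and thus
// for fiber_to_return.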
fiber_pool_free_batch(w->self, pool, pool->capacity / BATCH_FRACTION);
CILK_ASSERT((pool->capacity - pool->size) >=
(pool->capacity / BATCH_FRACTION));
}
if (fiber_to_return) {
deinit_fiber_header(fiber_to_return);
pool->fibers[pool->size++] = fiber_to_return;
pool->stats.in_use--;
if (pool->size > pool->stats.max_free) {
pool->stats.max_free = pool->size;
}
fiber_to_return = NULL;
}
}