Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run cilkifying closure on worker 0 when synced #33

Draft
wants to merge 3 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions runtime/debug.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ CHEETAH_INTERNAL extern const char *const __cilkrts_assertion_failed;
: cilkrts_bug("%s: %d: cilk_assertion failed: %s (%p) == %s (%p)", \
__FILE__, __LINE__, #P1, _t1, #P2, _t2);})

/* Assert that two integer expressions compare equal, evaluating each
 * argument exactly once.  On failure, report the source text and the
 * values of both operands via cilkrts_bug, mirroring the style of the
 * pointer-equality assertion above.
 * NOTE(review): both operands are converted to long before comparing;
 * unsigned values above LONG_MAX (or integers wider than long) are
 * truncated/sign-converted -- confirm callers only pass values
 * representable as long. */
#define CILK_ASSERT_INTEGER_EQUAL(I1, I2) \
({ long _t1 = (I1), _t2 = (I2); __builtin_expect(_t1 == _t2, 1) \
? (void)0 \
: cilkrts_bug("%s: %d: cilk_assertion failed: %s (%ld) == %s (%ld)", \
__FILE__, __LINE__, #I1, _t1, #I2, _t2);})

#define CILK_ASSERT_INDEX_ZERO(LEFT, I, RIGHT, FMT) \
(__builtin_expect(!(LEFT[I] RIGHT), 1) \
? (void)0 \
Expand Down
2 changes: 2 additions & 0 deletions runtime/global.c
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ global_state *global_state_init(int argc, char *argv[]) {
atomic_store_explicit(&g->cilkified, 0, memory_order_relaxed);
atomic_store_explicit(&g->disengaged_sentinel, 0, memory_order_relaxed);

g->activate_boss = false;

g->terminate = false;

g->worker_args =
Expand Down
7 changes: 5 additions & 2 deletions runtime/global.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ struct global_state {

// These fields are accessed exclusively by the boss thread.

jmpbuf boss_ctx __attribute__((aligned(CILK_CACHE_LINE)));
void *orig_rsp;
void *orig_rsp __attribute__((aligned(CILK_CACHE_LINE)));
bool workers_started;

// These fields are shared between the boss thread and a couple workers.
Expand All @@ -75,6 +74,10 @@ struct global_state {
// optimization would improve performance.
_Atomic uint32_t cilkified_futex __attribute__((aligned(CILK_CACHE_LINE)));
atomic_bool cilkified;
// Set to true by any worker to signal that the cilkifying function
// needs to run on the original worker. The cilkifying closure should
// be locked when this is set.
bool activate_boss;

pthread_mutex_t cilkified_lock;
pthread_cond_t cilkified_cond_var;
Expand Down
101 changes: 20 additions & 81 deletions runtime/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -441,44 +441,6 @@ static void __cilkrts_stop_workers(global_state *g) {
g->workers_started = false;
}

// Executed by the Cilkifying thread: block until some worker signals
// that the current Cilkified region has finished executing.
static inline void wait_until_cilk_done(global_state *gs) {
    wait_while_cilkified(gs);
}

// Helper method to make the boss thread wait for the cilkified region
// to complete.
//
// Called on the boss thread after __builtin_setjmp(g->boss_ctx) returns
// nonzero; it must not touch any caller stack state, hence noinline and
// re-reading everything from global state.
// NOTE(review): `inline` combined with __attribute__((noinline)) is
// contradictory and draws a warning on GCC/Clang -- the noinline
// attribute wins, but the `inline` keyword should probably be dropped.
static inline __attribute__((noinline)) void boss_wait_helper(void) {
    // The setjmp/longjmp to and from user code can invalidate the
    // function arguments and local variables in this function. Get
    // fresh copies of these arguments from the runtime's global
    // state.
    global_state *g = __cilkrts_tls_worker->g;
    __cilkrts_stack_frame *sf = g->root_closure->frame;
    CILK_BOSS_START_TIMING(g);

    // Wait until the cilkified region is done executing.
    wait_until_cilk_done(g);

    // The next call into cilkify() must set up a fresh Cilkified region.
    __cilkrts_need_to_cilkify = true;

    // At this point, some Cilk worker must have completed the
    // Cilkified region and executed uncilkify at the end of the Cilk
    // function. The longjmp will therefore jump to the end of the
    // Cilk function. We need only restore the stack pointer to its
    // original value on the Cilkifying thread's stack.

    CILK_BOSS_STOP_TIMING(g);

    // Restore the boss's original rsp, so the boss completes the Cilk
    // function on its original stack.
    SP(sf) = g->orig_rsp;
    sysdep_restore_fp_state(sf);
    // Inform sanitizers we are switching back to the boss's stack/fiber.
    sanitizer_start_switch_fiber(NULL);
    // Does not return: resumes execution at the end of the Cilk function.
    __builtin_longjmp(sf->ctx, 1);
}

// Setup runtime structures to start a new Cilkified region. Executed by the
// Cilkifying thread in cilkify().
void __cilkrts_internal_invoke_cilkified_root(__cilkrts_stack_frame *sf) {
Expand Down Expand Up @@ -564,28 +526,21 @@ void __cilkrts_internal_invoke_cilkified_root(__cilkrts_stack_frame *sf) {
__cilkrts_start_workers(g);
}

if (__builtin_setjmp(g->boss_ctx) == 0) {
CILK_SWITCH_TIMING(w, INTERVAL_CILKIFY_ENTER, INTERVAL_SCHED);
do_what_it_says_boss(w, root_closure);
} else {
// The stack on which
// __cilkrts_internal_invoke_cilkified_root() was called may
// be corrupted at this point, so we call this helper method,
// marked noinline, to ensure the compiler does not try to use
// any data from the stack.
boss_wait_helper();
}
// XXX Temporary
CILK_SWITCH_TIMING(w, INTERVAL_CILKIFY_ENTER, INTERVAL_SCHED);
do_what_it_says_boss(w, root_closure);
}

// Finish the execution of a Cilkified region. Executed by a worker in g.
// Finish the execution of a Cilkified region. Executed by the boss worker.
void __cilkrts_internal_exit_cilkified_root(global_state *g,
__cilkrts_stack_frame *sf) {
__cilkrts_worker *w = __cilkrts_get_tls_worker();
CILK_ASSERT(w->l->state == WORKER_RUN);
CILK_SWITCH_TIMING(w, INTERVAL_WORK, INTERVAL_CILKIFY_EXIT);

worker_id self = w->self;
const bool is_boss = (0 == self);
CILK_ASSERT(w->self == 0);

worker_id self = 0;
ReadyDeque *deques = g->deques;

// Mark the computation as done. Also "sleep" the workers: update global
Expand All @@ -596,16 +551,6 @@ void __cilkrts_internal_exit_cilkified_root(global_state *g,
atomic_store_explicit(&g->done, 1, memory_order_release);
/* wake_all_disengaged(g); */

if (!is_boss) {
w->l->exiting = true;
__cilkrts_worker **workers = g->workers;
__cilkrts_worker *w0 = workers[0];
w0->hyper_table = w->hyper_table;
w->hyper_table = NULL;
w0->extension = w->extension;
w->extension = NULL;
}

// Clear this worker's deque. Nobody can successfully steal from this deque
// at this point, because head == tail, but we still want any subsequent
// Cilkified region to start with an empty deque. We go ahead and grab the
Expand All @@ -625,25 +570,19 @@ void __cilkrts_internal_exit_cilkified_root(global_state *g,
sf->flags = 0;

CILK_STOP_TIMING(w, INTERVAL_CILKIFY_EXIT);
if (is_boss) {
// We finished the computation on the boss thread. No need to jump to
// the runtime in this case; just return normally.
local_state *l = w->l;
atomic_store_explicit(&g->cilkified, 0, memory_order_relaxed);
l->state = WORKER_IDLE;
__cilkrts_need_to_cilkify = true;

// Restore the boss's original rsp, so the boss completes the Cilk
// function on its original stack.
SP(sf) = g->orig_rsp;
sysdep_restore_fp_state(sf);
sanitizer_start_switch_fiber(NULL);
__builtin_longjmp(sf->ctx, 1);
} else {
// done; go back to runtime
CILK_START_TIMING(w, INTERVAL_WORK);
longjmp_to_runtime(w);
}
// We finished the computation on the boss thread. No need to jump to
// the runtime in this case; just return normally.
local_state *l = w->l;
atomic_store_explicit(&g->cilkified, 0, memory_order_relaxed);
l->state = WORKER_IDLE;
__cilkrts_need_to_cilkify = true;

// Restore the boss's original rsp, so the boss completes the Cilk
// function on its original stack.
SP(sf) = g->orig_rsp;
sysdep_restore_fp_state(sf);
sanitizer_start_switch_fiber(NULL);
__builtin_longjmp(sf->ctx, 1);
}

static void global_state_terminate(global_state *g) {
Expand Down
97 changes: 72 additions & 25 deletions runtime/scheduler.c
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,16 @@ static void setup_for_sync(__cilkrts_worker *w, worker_id self, Closure *t) {
t->orig_rsp = NULL; // unset once we have sync-ed
}

// Transition the suspended closure t so that this worker (the boss,
// worker 0) can resume its frame at a sync.  Asserts that t is
// suspended with no outstanding children, performs the usual
// setup_for_sync bookkeeping, and marks t runnable.
// NOTE(review): assumes t is the root closure being handed to the boss
// via g->activate_boss -- confirm no other caller passes a different
// closure.
static void resume_boss(__cilkrts_worker *w, worker_id self, Closure *t) {
    // TODO: This should not be on any worker's deque
    Closure_lock(self, t);
    CILK_ASSERT_INTEGER_EQUAL(t->status, CLOSURE_SUSPENDED);
    CILK_ASSERT(!Closure_has_children(t));
    setup_for_sync(w, self, t);
    Closure_set_status(t, CLOSURE_RUNNING);
    Closure_unlock(self, t);
}

// ==============================================
// TLS related functions
// ==============================================
Expand Down Expand Up @@ -295,31 +305,44 @@ static Closure *provably_good_steal_maybe(__cilkrts_worker *const w,

Closure_assert_ownership(self, parent);
local_state *l = w->l;
global_state *g = w->g;
// cilkrts_alert(STEAL, "(provably_good_steal_maybe) cl %p",
// (void *)parent);
CILK_ASSERT(!l->provably_good_steal);

if (!Closure_has_children(parent) && parent->status == CLOSURE_SUSPENDED) {
// cilkrts_alert(STEAL | ALERT_SYNC,
// "(provably_good_steal_maybe) completing a sync");
if (Closure_has_children(parent))
return NULL;

CILK_ASSERT(parent->frame != NULL);
if (parent->status != CLOSURE_SUSPENDED)
return NULL;

/* do a provably-good steal; this is *really* simple */
l->provably_good_steal = true;
/* Only the cilkifying worker can run the cilkifying frame synced. */
if (parent == g->root_closure && w->self != 0) {
__cilkrts_stack_frame *sf = parent->frame;
CILK_ASSERT(sf);
if (sf->flags & CILK_FRAME_LAST) {
g->activate_boss = true;
return NULL;
}
}

setup_for_sync(w, self, parent);
CILK_ASSERT(parent->owner_ready_deque == NO_WORKER);
Closure_make_ready(parent);
// cilkrts_alert(STEAL | ALERT_SYNC,
// "(provably_good_steal_maybe) completing a sync");

cilkrts_alert(STEAL | ALERT_SYNC,
"(provably_good_steal_maybe) returned %p",
(void *)parent);
CILK_ASSERT(parent->frame != NULL);

return parent;
}
/* do a provably-good steal; this is *really* simple */
CILK_ASSERT(!l->provably_good_steal);
l->provably_good_steal = true;

setup_for_sync(w, self, parent);
CILK_ASSERT(parent->owner_ready_deque == NO_WORKER);
Closure_make_ready(parent);

cilkrts_alert(STEAL | ALERT_SYNC,
"(provably_good_steal_maybe) returned %p",
(void *)parent);

return NULL;
return parent;
}

/***
Expand Down Expand Up @@ -1224,7 +1247,8 @@ int Cilk_sync(__cilkrts_worker *const w, __cilkrts_stack_frame *frame) {
int res = SYNC_READY;

//----- EVENT_CILK_SYNC
ReadyDeque *deques = w->g->deques;
global_state *g = w->g;
ReadyDeque *deques = g->deques;
worker_id self = w->self;

deque_lock_self(deques, self);
Expand All @@ -1246,6 +1270,20 @@ int Cilk_sync(__cilkrts_worker *const w, __cilkrts_stack_frame *frame) {
if (Closure_has_children(t)) {
cilkrts_alert(SYNC, "(Cilk_sync) Closure %p has outstanding children",
(void *)t);
res = SYNC_NOT_READY;
} else if (self != 0 && t == g->root_closure && (t->frame->flags & CILK_FRAME_LAST)) {
cilkrts_alert(SYNC, "(Cilk_sync) Closure %p needs to run on boss",
(void *)t);
g->activate_boss = true;
res = SYNC_NOT_READY;
} else {
cilkrts_alert(SYNC, "(Cilk_sync) closure %p sync successfully",
(void *)t);
res = SYNC_READY;
}

if (res == SYNC_NOT_READY) {
// XXX not in the root closure case?
if (t->fiber) {
cilk_fiber_deallocate_to_pool(w, t->fiber);
}
Expand All @@ -1263,10 +1301,7 @@ int Cilk_sync(__cilkrts_worker *const w, __cilkrts_stack_frame *frame) {

Closure_suspend(deques, self, t);
t->user_ht = ht; /* set this after state change to suspended */
res = SYNC_NOT_READY;
} else {
cilkrts_alert(SYNC, "(Cilk_sync) closure %p sync successfully",
(void *)t);
setup_for_sync(w, self, t);
}

Expand Down Expand Up @@ -1396,6 +1431,7 @@ void do_what_it_says_boss(__cilkrts_worker *w, Closure *t) {
CILK_STOP_TIMING(w, INTERVAL_SCHED);
worker_change_state(w, WORKER_IDLE);
worker_scheduler(w);
cilkrts_bug("boss worker exited scheduling loop");
}

void worker_scheduler(__cilkrts_worker *w) {
Expand Down Expand Up @@ -1447,6 +1483,20 @@ void worker_scheduler(__cilkrts_worker *w) {
while (!t && !atomic_load_explicit(&rts->done, memory_order_acquire)) {
CILK_START_TIMING(w, INTERVAL_SCHED);
CILK_START_TIMING(w, INTERVAL_IDLE);

if (is_boss && rts->activate_boss) {
t = rts->root_closure;
resume_boss(w, self, t);
rts->activate_boss = false;
/* bookkeeping */
fails = maybe_reengage_workers
(rts, self, nworkers, w, fails,
&sample_threshold, &inefficient_history, &efficient_history,
sentinel_count_history, &sentinel_count_history_tail,
&recent_sentinel_count);
break;
}

#if ENABLE_THIEF_SLEEP
// Get the set of workers we can steal from and a local copy of the
// index-to-worker map. We'll attempt a few steals using these
Expand All @@ -1469,10 +1519,10 @@ void worker_scheduler(__cilkrts_worker *w) {
uint32_t sentinel = nworkers / 2;
#endif // ENABLE_THIEF_SLEEP
#ifndef __APPLE__
uint32_t lg_sentinel = sentinel == 0 ? 1
const uint32_t lg_sentinel = sentinel == 0 ? 1
: (8 * sizeof(sentinel)) -
__builtin_clz(sentinel);
uint32_t sentinel_div_lg_sentinel =
const uint32_t sentinel_div_lg_sentinel =
sentinel == 0 ? 1
: (sentinel >> (8 * sizeof(lg_sentinel) -
__builtin_clz(lg_sentinel)));
Expand Down Expand Up @@ -1619,9 +1669,6 @@ void worker_scheduler(__cilkrts_worker *w) {

CILK_STOP_TIMING(w, INTERVAL_SCHED);
worker_change_state(w, WORKER_IDLE);
if (is_boss) {
__builtin_longjmp(rts->boss_ctx, 1);
}
}

void *scheduler_thread_proc(void *arg) {
Expand Down
Loading