Make sure workers attempt keepalive #44

Merged 3 commits on Feb 8, 2025
19 changes: 19 additions & 0 deletions runtime/efficiency.h
@@ -0,0 +1,19 @@
#ifndef _EFFICIENCY_H
#define _EFFICIENCY_H
// Information for histories of efficient and inefficient worker-count samples
// and for sentinel counts.
typedef uint32_t history_sample_t;
#define HISTORY_LENGTH 32
#define SENTINEL_COUNT_HISTORY 4

typedef struct history_t {
history_sample_t inefficient_history;
history_sample_t efficient_history;
unsigned int sentinel_count_history_tail;
unsigned int recent_sentinel_count;
unsigned int fails;
unsigned int sample_threshold;
unsigned int sentinel_count_history[SENTINEL_COUNT_HISTORY];
} history_t;

#endif
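
For reference, the two history_sample_t fields in history_t are per-worker shift registers with one bit per efficiency sample, which is why the 32-bit type pairs with a HISTORY_LENGTH of 32. The standalone sketch below is not part of the diff: it restates the update pattern used by maybe_reengage_workers() and handle_failed_steal_attempts() in runtime/worker_sleep.h further down, includes <stdint.h> explicitly since efficiency.h itself relies on its includer for that, and treats the exact threshold comparison as an assumption based on the HISTORY_THRESHOLD comment in worker_sleep.h.

#include <stdbool.h>
#include <stdint.h>

typedef uint32_t history_sample_t;
#define HISTORY_LENGTH 32                          /* one bit per sample */
#define HISTORY_THRESHOLD (3 * HISTORY_LENGTH / 4) /* 24 of the last 32 samples */

/* Shift the newest 0/1 sample into the top bit and drop the oldest sample. */
static inline history_sample_t push_sample(history_sample_t window,
                                           history_sample_t sample) {
    return (window >> 1) | (sample << (HISTORY_LENGTH - 1));
}

/* Illustrative check: the recent history counts as consistently
 * (in)efficient once at least HISTORY_THRESHOLD of the last HISTORY_LENGTH
 * samples were set. */
static inline bool history_mostly_set(history_sample_t window) {
    return __builtin_popcount(window) >= HISTORY_THRESHOLD;
}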
123 changes: 85 additions & 38 deletions runtime/scheduler.c
@@ -1380,6 +1380,8 @@ static void do_what_it_says(ReadyDeque *deques, __cilkrts_worker *w,
} while (t);
}

static inline void boss_scheduler(__cilkrts_worker *w);

// Thin wrapper around do_what_it_says to allow the boss thread to execute the
// Cilk computation until it would enter the work-stealing loop.
void do_what_it_says_boss(__cilkrts_worker *w, Closure *t) {
@@ -1395,18 +1397,79 @@ void do_what_it_says_boss(__cilkrts_worker *w, Closure *t) {

CILK_STOP_TIMING(w, INTERVAL_SCHED);
worker_change_state(w, WORKER_IDLE);
worker_scheduler(w);
boss_scheduler(w);
}

void worker_scheduler(__cilkrts_worker *w) {
Closure *t = NULL;
CILK_ASSERT_POINTER_EQUAL(w, __cilkrts_get_tls_worker());
static inline void boss_scheduler(__cilkrts_worker *w) {
global_state *const rts = w->g;

CILK_START_TIMING(w, INTERVAL_SCHED);
worker_change_state(w, WORKER_SCHED);

history_t history = {
.inefficient_history = 0,
.efficient_history = 0,
.sentinel_count_history_tail = 0,
.recent_sentinel_count = SENTINEL_COUNT_HISTORY,
.fails = init_fails(w->l->wake_val, rts),
.sample_threshold = SENTINEL_THRESHOLD,
.sentinel_count_history = { 1 },
};

worker_scheduler(w, &history);

#if ENABLE_THIEF_SLEEP
reset_fails(rts, history.fails);
#endif
CILK_STOP_TIMING(w, INTERVAL_SCHED);
worker_change_state(w, WORKER_IDLE);
__builtin_longjmp(rts->boss_ctx, 1);
}

static inline void non_boss_scheduler(__cilkrts_worker *w) {
CILK_START_TIMING(w, INTERVAL_SCHED);
worker_change_state(w, WORKER_SCHED);
global_state *const rts = w->g;
history_t history = {
.inefficient_history = 0,
.efficient_history = 0,
.sentinel_count_history_tail = 0,
.recent_sentinel_count = SENTINEL_COUNT_HISTORY,
.fails = init_fails(w->l->wake_val, rts),
.sample_threshold = SENTINEL_THRESHOLD,
.sentinel_count_history = { 1 },
};

while (!rts->terminate) {
worker_scheduler(w, &history);

// If it appears the computation is done, busy-wait for a while
// before exiting the work-stealing loop, in case another cilkified
// region is started soon.
unsigned int busy_fail = 0;
while (busy_fail++ < BUSY_LOOP_SPIN &&
atomic_load_explicit(&rts->done, memory_order_relaxed)) {
busy_pause();
}
if (thief_should_wait(rts)) {
break;
}
}

#if ENABLE_THIEF_SLEEP
reset_fails(rts, history.fails);
#endif

CILK_STOP_TIMING(w, INTERVAL_SCHED);
worker_change_state(w, WORKER_IDLE);
}

void worker_scheduler(__cilkrts_worker *w, history_t *const history) {
Closure *t = NULL;
CILK_ASSERT_POINTER_EQUAL(w, __cilkrts_get_tls_worker());

global_state *rts = w->g;
worker_id self = w->self;
const bool is_boss = (0 == self);

// Get this worker's local_state pointer, to avoid rereading it
// unnecessarily during the work-stealing loop. This optimization helps
@@ -1419,16 +1482,16 @@ void worker_scheduler(__cilkrts_worker *w) {
unsigned int nworkers = rts->nworkers;

// Initialize count of consecutive failed steal attempts.
unsigned int fails = init_fails(l->wake_val, rts);
unsigned int sample_threshold = SENTINEL_THRESHOLD;
unsigned int fails = history->fails;
unsigned int sample_threshold = history->sample_threshold;
// Local history information of the state of the system, for sentinel
// workers to use to determine when to disengage and how many workers to
// reengage.
history_t inefficient_history = 0;
history_t efficient_history = 0;
unsigned int sentinel_count_history[SENTINEL_COUNT_HISTORY] = { 1 };
unsigned int sentinel_count_history_tail = 0;
unsigned int recent_sentinel_count = SENTINEL_COUNT_HISTORY;
history_sample_t inefficient_history = history->inefficient_history;
history_sample_t efficient_history = history->efficient_history;

unsigned int sentinel_count_history_tail = history->sentinel_count_history_tail;
unsigned int recent_sentinel_count = history->recent_sentinel_count;

// Get pointers to the local and global copies of the index-to-worker map.
worker_id *index_to_worker = rts->index_to_worker;
@@ -1439,7 +1502,7 @@
/* A worker entering the steal loop must have saved its reducer map into
the frame to which it belongs. */
CILK_ASSERT(!w->hyper_table ||
(is_boss && atomic_load_explicit(
(self == 0 && atomic_load_explicit(
&rts->done, memory_order_acquire)));

CILK_STOP_TIMING(w, INTERVAL_SCHED);
@@ -1514,7 +1577,7 @@ void worker_scheduler(__cilkrts_worker *w) {
fails = go_to_sleep_maybe(
rts, self, nworkers, NAP_THRESHOLD, w, t, fails,
&sample_threshold, &inefficient_history, &efficient_history,
sentinel_count_history, &sentinel_count_history_tail,
history->sentinel_count_history, &sentinel_count_history_tail,
&recent_sentinel_count);

if (!t) {
@@ -1595,33 +1658,17 @@
}
#endif // ENABLE_THIEF_SLEEP
t = NULL;
} else if (!is_boss &&
atomic_load_explicit(&rts->done, memory_order_relaxed)) {
// If it appears the computation is done, busy-wait for a while
// before exiting the work-stealing loop, in case another cilkified
// region is started soon.
unsigned int busy_fail = 0;
while (busy_fail++ < BUSY_LOOP_SPIN &&
atomic_load_explicit(&rts->done, memory_order_relaxed)) {
busy_pause();
}
if (thief_should_wait(rts)) {
break;
}
}
}

// Reset the fail count.
#if ENABLE_THIEF_SLEEP
reset_fails(rts, fails);
#endif

l->rand_next = rand_state;
history->fails = fails;
history->sample_threshold = sample_threshold;
history->inefficient_history = inefficient_history;
history->efficient_history = efficient_history;

CILK_STOP_TIMING(w, INTERVAL_SCHED);
worker_change_state(w, WORKER_IDLE);
if (is_boss) {
__builtin_longjmp(rts->boss_ctx, 1);
}
history->sentinel_count_history_tail = sentinel_count_history_tail;
history->recent_sentinel_count = recent_sentinel_count;
}

void *scheduler_thread_proc(void *arg) {
@@ -1671,7 +1718,7 @@ void *scheduler_thread_proc(void *arg) {
// Such operations, for example might have updated the left-most view of
// a reducer.
if (!atomic_load_explicit(&rts->done, memory_order_acquire)) {
worker_scheduler(w);
non_boss_scheduler(w);
}

CILK_START_TIMING(w, INTERVAL_SLEEP_UNCILK);
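Taken together, the scheduler.c changes split the old worker_scheduler() exit logic into two wrappers around a steal loop that now saves its sampling state into a caller-owned history_t. The sketch below is an illustrative restatement only, stripped of the CILK_* timing, worker_change_state(), and ENABLE_THIEF_SLEEP bookkeeping shown in the diff; history_init() is hypothetical shorthand for the designated-initializer blocks above. The non-boss loop is the keepalive behavior in the PR title: a worker keeps its history_t and re-enters worker_scheduler() until the runtime terminates, rather than the steal loop deciding internally when the worker is done.

/* Illustrative summary of the new structure; compiles only inside the
 * runtime tree, and history_init() is a hypothetical helper. */
static void boss_scheduler_sketch(__cilkrts_worker *w) {
    history_t h;
    history_init(&h, w->l->wake_val, w->g);
    worker_scheduler(w, &h);              /* one pass through the steal loop */
    __builtin_longjmp(w->g->boss_ctx, 1); /* hand control back to the boss */
}

static void non_boss_scheduler_sketch(__cilkrts_worker *w) {
    global_state *const rts = w->g;
    history_t h;
    history_init(&h, w->l->wake_val, rts);
    while (!rts->terminate) {             /* keepalive: keep re-entering */
        worker_scheduler(w, &h);          /* fails and histories persist */

        /* The computation looks done: busy-wait briefly in case another
         * cilkified region starts soon, then decide whether to stop. */
        unsigned int busy_fail = 0;
        while (busy_fail++ < BUSY_LOOP_SPIN &&
               atomic_load_explicit(&rts->done, memory_order_relaxed))
            busy_pause();
        if (thief_should_wait(rts))
            break;
    }
}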
3 changes: 2 additions & 1 deletion runtime/scheduler.h
@@ -3,6 +3,7 @@

#include "cilk-internal.h"
#include "closure.h"
#include "efficiency.h"

#define SYNC_READY 0
#define SYNC_NOT_READY 1
@@ -20,7 +21,7 @@ void Cilk_set_return(__cilkrts_worker *const ws);
void Cilk_exception_handler(__cilkrts_worker *w, char *exn);

CHEETAH_INTERNAL_NORETURN void longjmp_to_runtime(__cilkrts_worker *w);
CHEETAH_INTERNAL void worker_scheduler(__cilkrts_worker *ws);
CHEETAH_INTERNAL void worker_scheduler(__cilkrts_worker *w, history_t *const history);
CHEETAH_INTERNAL void *scheduler_thread_proc(void *arg);

CHEETAH_INTERNAL void promote_own_deque(__cilkrts_worker *w);
39 changes: 17 additions & 22 deletions runtime/worker_sleep.h
@@ -7,6 +7,7 @@
#include <time.h>

#include "cilk-internal.h"
#include "efficiency.h"
#include "global.h"
#include "rts-config.h"
#include "sched_stats.h"
@@ -39,12 +40,6 @@
// worker state. ATTEMPTS must divide SENTINEL_THRESHOLD.
#define ATTEMPTS 4

// Information for histories of efficient and inefficient worker-count samples
// and for sentinel counts.
typedef uint32_t history_t;
#define HISTORY_LENGTH 32
#define SENTINEL_COUNT_HISTORY 4

// Amount of history that must be efficient/inefficient to reengage/disengage
// workers.
#define HISTORY_THRESHOLD (3 * HISTORY_LENGTH / 4)
@@ -199,15 +194,15 @@ get_worker_counts(uint64_t disengaged_sentinel, unsigned int nworkers) {

// Check if the given worker counts are inefficient, i.e., if active <
// sentinels.
__attribute__((const, always_inline)) static inline history_t
__attribute__((const, always_inline)) static inline history_sample_t
is_inefficient(worker_counts counts) {
return counts.sentinels > 1 && counts.active >= 1 &&
counts.active * AS_RATIO < counts.sentinels * 1;
}

// Check if the given worker counts are efficient, i.e., if active >= 2 *
// sentinels.
__attribute__((const, always_inline)) static inline history_t
__attribute__((const, always_inline)) static inline history_sample_t
is_efficient(worker_counts counts) {
return (counts.active * 1 >= counts.sentinels * AS_RATIO) ||
(counts.sentinels <= 1);
@@ -232,8 +227,8 @@ maybe_reengage_workers(global_state *const rts, worker_id self,
unsigned int nworkers, __cilkrts_worker *const w,
unsigned int fails,
unsigned int *const sample_threshold,
history_t *const inefficient_history,
history_t *const efficient_history,
history_sample_t *const inefficient_history,
history_sample_t *const efficient_history,
unsigned int *const sentinel_count_history,
unsigned int *const sentinel_count_history_tail,
unsigned int *const recent_sentinel_count) {
@@ -251,17 +246,17 @@
get_worker_counts(disengaged_sentinel - 1, nworkers);
CILK_ASSERT(counts.active >= 1);

history_t my_efficient_history = *efficient_history;
history_t my_inefficient_history = *inefficient_history;
history_sample_t my_efficient_history = *efficient_history;
history_sample_t my_inefficient_history = *inefficient_history;
unsigned int my_sentinel_count = *recent_sentinel_count;
if (fails >= *sample_threshold) {
// Update the inefficient history.
history_t curr_ineff = is_inefficient(counts);
history_sample_t curr_ineff = is_inefficient(counts);
my_inefficient_history = (my_inefficient_history >> 1) |
(curr_ineff << (HISTORY_LENGTH - 1));

// Update the efficient history.
history_t curr_eff = is_efficient(counts);
history_sample_t curr_eff = is_efficient(counts);
my_efficient_history = (my_efficient_history >> 1) |
(curr_eff << (HISTORY_LENGTH - 1));

@@ -377,8 +372,8 @@ handle_failed_steal_attempts(global_state *const rts, worker_id self,
__cilkrts_worker *const w,
unsigned int fails,
unsigned int *const sample_threshold,
history_t *const inefficient_history,
history_t *const efficient_history,
history_sample_t *const inefficient_history,
history_sample_t *const efficient_history,
unsigned int *const sentinel_count_history,
unsigned int *const sentinel_count_history_tail,
unsigned int *const recent_sentinel_count) {
@@ -428,16 +423,16 @@
*sentinel_count_history_tail = (tail + 1) % SENTINEL_COUNT_HISTORY;

// Update the efficient history.
history_t curr_eff = is_efficient(counts);
history_t my_efficient_history = *efficient_history;
history_sample_t curr_eff = is_efficient(counts);
history_sample_t my_efficient_history = *efficient_history;
my_efficient_history = (my_efficient_history >> 1) |
(curr_eff << (HISTORY_LENGTH - 1));
int32_t eff_steps = __builtin_popcount(my_efficient_history);
*efficient_history = my_efficient_history;

// Update the inefficient history.
history_t curr_ineff = is_inefficient(counts);
history_t my_inefficient_history = *inefficient_history;
history_sample_t curr_ineff = is_inefficient(counts);
history_sample_t my_inefficient_history = *inefficient_history;
my_inefficient_history = (my_inefficient_history >> 1) |
(curr_ineff << (HISTORY_LENGTH - 1));
int32_t ineff_steps =
@@ -547,8 +542,8 @@ static unsigned int go_to_sleep_maybe(global_state *const rts, worker_id self,
__cilkrts_worker *const w,
Closure *const t, unsigned int fails,
unsigned int *const sample_threshold,
history_t *const inefficient_history,
history_t *const efficient_history,
history_sample_t *const inefficient_history,
history_sample_t *const efficient_history,
unsigned int *const sentinel_count_history,
unsigned int *const sentinel_count_history_tail,
unsigned int *const recent_sentinel_count) {
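As a concrete reading of the is_efficient()/is_inefficient() helpers in the worker_sleep.h hunks above: the surrounding comments describe efficiency as active >= 2 * sentinels, which suggests AS_RATIO is 2; that value is an assumption here, since the definition of AS_RATIO is not in the hunks shown. A standalone restatement with example counts:

#include <stdbool.h>
#include <stdio.h>

#define AS_RATIO 2 /* assumed value, implied by the comments in worker_sleep.h */

/* Only the two fields used by the classification helpers are modeled here. */
typedef struct { unsigned int active, sentinels; } worker_counts_sketch;

static bool inefficient(worker_counts_sketch c) {
    return c.sentinels > 1 && c.active >= 1 && c.active * AS_RATIO < c.sentinels;
}

static bool efficient(worker_counts_sketch c) {
    return (c.active >= c.sentinels * AS_RATIO) || (c.sentinels <= 1);
}

int main(void) {
    worker_counts_sketch busy = { .active = 8, .sentinels = 3 }; /* efficient */
    worker_counts_sketch idle = { .active = 2, .sentinels = 9 }; /* inefficient */
    printf("%d %d\n", efficient(busy), inefficient(idle));       /* prints 1 1 */
    return 0;
}

Counts that fall in between, for example 3 active workers and 5 sentinels with AS_RATIO of 2, are neither efficient nor inefficient, so a 0 bit is shifted into both history windows for that sample.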