Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a race condition in DTD for the local termdet #698

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions parsec/interfaces/dtd/insert_function.c
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,9 @@ parsec_dtd_taskpool_destructor(parsec_dtd_taskpool_t *tp)
assert( tp->super.tdm.module->taskpool_state(&tp->super) == PARSEC_TERM_TP_NOT_READY );
/* But there should be 0 event on this taskpool at this time */
assert( tp->super.nb_pending_actions == 0 && tp->super.nb_tasks == 0);
/* So, we can terminate this termination detector by stating we are ready */
tp->super.tdm.module->taskpool_ready(&tp->super);
/* So, we can safely stop monitoring this taskpool, and trigger taskpool termination detection */
tp->super.tdm.module->unmonitor_taskpool(&tp->super);
parsec_taskpool_termination_detected(&tp->super);

parsec_dtd_data_collection_fini(&tp->new_tile_dc);
parsec_data_collection_destroy(&tp->new_tile_dc);
Expand Down
68 changes: 51 additions & 17 deletions parsec/mca/termdet/local/termdet_local_module.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,37 @@ const parsec_termdet_module_t parsec_termdet_local_module = {
}
};

/* The local detector does not need to allocate memory:
* we use the constants below to keep track of the state.
* There is no need for a constant for idle, as the termdet
* transitions directly from busy to terminated.
/* The local detector does not need to allocate memory: we use the constants
* below to keep track of the state. There is no need for a constant for idle,
* as the termdet transitions directly from busy to terminating, and then to
* terminated.
*
* The local termdet is in 'terminating' state during the call to the
* termination_detected() function. It shows the state as 'BUSY' to the user
* at this time. This is used to guarantee that a single thread calls
* termination_detected(), yet that the main thread is not allowed to exit the
* progress loop and call on_leave_wait() in parallel with
* termination_detected(). Calling on_leave_wait() and termination_detected()
* in parallel leads to race conditions for some DSLs (e.g. DTD), as they
* want to delete the monitor in termination_detected(), and create a new one
* in on_leave_wait(). As on_leave_wait() is only called by the master thread
* once taskpool_state() has returned TERMINATED, returning BUSY until *after*
* termination_detected() has returned (by any thread) ensures the proper
* order of execution.
*
* To support RECURSIVE taskpools, the taskpool (and the termination detector)
* which lives in it), might be released during the call to the user callback
* in termination_detected(). In order to support this, the termination
* detector retains the taskpool when it starts to monitor it, and releases
* it when it's done managing termination detection. As a result, the taskpool
* might be destroyed at the end of of termination_detected(). It is thus
* INCORRECT code to refer to the taskpool or the termination detector after
* a call to termination_detected().
*/
#define PARSEC_TERMDET_LOCAL_TERMINATED NULL
#define PARSEC_TERMDET_LOCAL_NOT_READY ((void*)(0x1))
#define PARSEC_TERMDET_LOCAL_BUSY ((void*)(0x2))
#define PARSEC_TERMDET_LOCAL_TERMINATING ((void*)(0x3))

static void parsec_termdet_local_monitor_taskpool(parsec_taskpool_t *tp,
parsec_termdet_termination_detected_function_t cb)
Expand All @@ -87,38 +110,47 @@ static void parsec_termdet_local_monitor_taskpool(parsec_taskpool_t *tp,
static void parsec_termdet_local_unmonitor_taskpool(parsec_taskpool_t *tp)
{
assert(&parsec_termdet_local_module.module == tp->tdm.module);
assert(tp->tdm.monitor == PARSEC_TERMDET_LOCAL_TERMINATED);
assert(tp->tdm.monitor == PARSEC_TERMDET_LOCAL_TERMINATED ||
(tp->tdm.monitor == PARSEC_TERMDET_LOCAL_NOT_READY && tp->nb_pending_actions == 0 && tp->nb_tasks == 0));
tp->tdm.module = NULL;
tp->tdm.callback = NULL;
}

static parsec_termdet_taskpool_state_t parsec_termdet_local_taskpool_state(parsec_taskpool_t *tp)
{
/* termination_detected() might run in parallel with taskpool_state(),
* so use a local copy of the monitor. It's fine to return an old
* value for the state. */
void *monitor = tp->tdm.monitor;
if( tp->tdm.module == NULL )
return PARSEC_TERM_TP_NOT_MONITORED;
assert(tp->tdm.module == &parsec_termdet_local_module.module);
if( tp->tdm.monitor == PARSEC_TERMDET_LOCAL_TERMINATED )
if( PARSEC_TERMDET_LOCAL_TERMINATED == monitor )
return PARSEC_TERM_TP_TERMINATED;
if( tp->tdm.monitor == PARSEC_TERMDET_LOCAL_BUSY )
if( (PARSEC_TERMDET_LOCAL_BUSY == monitor) || (PARSEC_TERMDET_LOCAL_TERMINATING == monitor) )
return PARSEC_TERM_TP_BUSY;
if( tp->tdm.monitor == PARSEC_TERMDET_LOCAL_NOT_READY )
if( PARSEC_TERMDET_LOCAL_NOT_READY == monitor )
return PARSEC_TERM_TP_NOT_READY;
assert(0);
return -1;
}

/* Reminder: it is incorrect to refer to tp (and/or the termination detector
* which lives in the taskpool) after this function is called */
static void parsec_termdet_local_termination_detected(parsec_taskpool_t *tp)
{
assert( tp->tdm.module != NULL);
assert( tp->tdm.module == &parsec_termdet_local_module.module );
assert( tp->tdm.monitor == PARSEC_TERMDET_LOCAL_TERMINATED );
assert( tp->tdm.monitor == PARSEC_TERMDET_LOCAL_TERMINATING );
abouteiller marked this conversation as resolved.
Show resolved Hide resolved

PARSEC_DEBUG_VERBOSE(10, parsec_debug_output, "TERMDET-LOCAL\tTASKPOOL %p: termination detected", tp);

if(NULL != tp->tdm.callback) {
PARSEC_DEBUG_VERBOSE(10, parsec_debug_output, "TERMDET-LOCAL\tTASKPOOL %p: calling callback", tp);
tp->tdm.callback(tp);
}
parsec_atomic_cas_ptr(&tp->tdm.monitor, PARSEC_TERMDET_LOCAL_TERMINATING, PARSEC_TERMDET_LOCAL_TERMINATED);
PARSEC_OBJ_RELEASE(tp);
}

static int parsec_termdet_local_taskpool_ready(parsec_taskpool_t *tp)
Expand All @@ -128,11 +160,11 @@ static int parsec_termdet_local_taskpool_ready(parsec_taskpool_t *tp)
assert( tp->tdm.monitor == PARSEC_TERMDET_LOCAL_NOT_READY );
parsec_atomic_cas_ptr(&tp->tdm.monitor, PARSEC_TERMDET_LOCAL_NOT_READY, PARSEC_TERMDET_LOCAL_BUSY);
PARSEC_DEBUG_VERBOSE(10, parsec_debug_output, "TERMDET-LOCAL:\tTASKPOOL %p READY", tp);

PARSEC_OBJ_RETAIN(tp);
abouteiller marked this conversation as resolved.
Show resolved Hide resolved
if( tp->nb_pending_actions == 0) {
/* It's possible another thread sees nb_pending_actions == 0 and BUSY before me, so call the callback
* only if I'm the one setting to terminated */
if( parsec_atomic_cas_ptr(&tp->tdm.monitor, PARSEC_TERMDET_LOCAL_BUSY, PARSEC_TERMDET_LOCAL_TERMINATED) ) {
if( parsec_atomic_cas_ptr(&tp->tdm.monitor, PARSEC_TERMDET_LOCAL_BUSY, PARSEC_TERMDET_LOCAL_TERMINATING) ) {
parsec_termdet_local_termination_detected(tp);
}
}
Expand All @@ -142,12 +174,14 @@ static int parsec_termdet_local_taskpool_ready(parsec_taskpool_t *tp)
static int32_t parsec_termdet_local_taskpool_set_nb_tasks(parsec_taskpool_t *tp, int32_t v)
{
int32_t ov, nbpa = 1; // By default we are not the one to discover nbpa gets to 0
int32_t ret = tp->nb_tasks;
PARSEC_DEBUG_VERBOSE(10, parsec_debug_output, "TERMDET-LOCAL:\tTASKPOOL %p NB_TASKS -> %d", tp, v);
assert(v >= 0 || v == PARSEC_RUNTIME_RESERVED_NB_TASKS);
if(tp->nb_tasks != v) {
do {
ov = tp->nb_tasks;
} while(! parsec_atomic_cas_int32(&tp->nb_tasks, ov, v));
ret = tp->nb_tasks;
if( ov == 0 && v > 0 ) {
nbpa = parsec_atomic_fetch_inc_int32(&tp->nb_pending_actions) + 1;
PARSEC_DEBUG_VERBOSE(10, parsec_debug_output, "TERMDET-LOCAL:\tTASKPOOL %p NB_PA %d -> %d", tp, nbpa-1, nbpa);
Expand All @@ -157,12 +191,12 @@ static int32_t parsec_termdet_local_taskpool_set_nb_tasks(parsec_taskpool_t *tp,
}
if( tp->tdm.monitor == PARSEC_TERMDET_LOCAL_BUSY && nbpa == 0 ) {
PARSEC_DEBUG_VERBOSE(10, parsec_debug_output, "TERMDET-LOCAL:\tTASKPOOL %p nbpa == 0", tp);
if( parsec_atomic_cas_ptr(&tp->tdm.monitor, PARSEC_TERMDET_LOCAL_BUSY, PARSEC_TERMDET_LOCAL_TERMINATED) ) {
if( parsec_atomic_cas_ptr(&tp->tdm.monitor, PARSEC_TERMDET_LOCAL_BUSY, PARSEC_TERMDET_LOCAL_TERMINATING) ) {
parsec_termdet_local_termination_detected(tp);
}
}
}
return tp->nb_tasks;
return ret;
}

static int32_t parsec_termdet_local_taskpool_set_runtime_actions(parsec_taskpool_t *tp, int32_t v)
Expand All @@ -174,7 +208,7 @@ static int32_t parsec_termdet_local_taskpool_set_runtime_actions(parsec_taskpool
ov = tp->nb_pending_actions;
} while(!parsec_atomic_cas_int32(&tp->nb_pending_actions, ov, v));
if( tp->tdm.monitor == PARSEC_TERMDET_LOCAL_BUSY && v == 0 ) {
if( parsec_atomic_cas_ptr(&tp->tdm.monitor, PARSEC_TERMDET_LOCAL_BUSY, PARSEC_TERMDET_LOCAL_TERMINATED) ) {
if( parsec_atomic_cas_ptr(&tp->tdm.monitor, PARSEC_TERMDET_LOCAL_BUSY, PARSEC_TERMDET_LOCAL_TERMINATING) ) {
parsec_termdet_local_termination_detected(tp);
}
}
Expand All @@ -199,9 +233,9 @@ static int32_t parsec_termdet_local_taskpool_addto_nb_tasks(parsec_taskpool_t *t
PARSEC_DEBUG_VERBOSE(10, parsec_debug_output, "TERMDET-LOCAL:\tTASKPOOL %p NB_PA %d -> %d", tp, nbpa+1, nbpa);
}
if( tp->tdm.monitor == PARSEC_TERMDET_LOCAL_BUSY && nbpa == 0 ) {
if( parsec_atomic_cas_ptr(&tp->tdm.monitor, PARSEC_TERMDET_LOCAL_BUSY, PARSEC_TERMDET_LOCAL_TERMINATED) ) {
if( parsec_atomic_cas_ptr(&tp->tdm.monitor, PARSEC_TERMDET_LOCAL_BUSY, PARSEC_TERMDET_LOCAL_TERMINATING) ) {
parsec_termdet_local_termination_detected(tp);
}
}
}
return ov+v;
}
Expand All @@ -216,7 +250,7 @@ static int32_t parsec_termdet_local_taskpool_addto_runtime_actions(parsec_taskpo
ov = parsec_atomic_fetch_add_int32(&tp->nb_pending_actions, v);
assert(ov+v >= 0);
if( tp->tdm.monitor == PARSEC_TERMDET_LOCAL_BUSY && ov+v == 0 ) {
if( parsec_atomic_cas_ptr(&tp->tdm.monitor, PARSEC_TERMDET_LOCAL_BUSY, PARSEC_TERMDET_LOCAL_TERMINATED) ) {
if( parsec_atomic_cas_ptr(&tp->tdm.monitor, PARSEC_TERMDET_LOCAL_BUSY, PARSEC_TERMDET_LOCAL_TERMINATING) ) {
parsec_termdet_local_termination_detected(tp);
}
}
Expand Down
2 changes: 1 addition & 1 deletion parsec/parsec.c
Original file line number Diff line number Diff line change
Expand Up @@ -2152,7 +2152,7 @@ void parsec_taskpool_unregister( parsec_taskpool_t* tp )
parsec_atomic_lock( &taskpool_array_lock );
assert( tp->taskpool_id < taskpool_array_size );
assert( taskpool_array[tp->taskpool_id] == tp );
assert( PARSEC_TERM_TP_TERMINATED == tp->tdm.module->taskpool_state(tp) );
assert( tp->tdm.module == NULL || PARSEC_TERM_TP_TERMINATED == tp->tdm.module->taskpool_state(tp) );
taskpool_array[tp->taskpool_id] = NOTASKPOOL;
parsec_atomic_unlock( &taskpool_array_lock );
}
Expand Down
Loading