Skip to content

Commit

Permalink
Merge pull request #321 from bosilca/bugfix/184
Browse files Browse the repository at this point in the history
[BBT#237] Allow sender to send data of any size.
  • Loading branch information
abouteiller authored Feb 16, 2024
2 parents d4b7cd1 + c3c3ae4 commit 391fed8
Show file tree
Hide file tree
Showing 35 changed files with 582 additions and 222 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ option(PARSEC_DIST_PRIORITIES
option(PARSEC_DIST_COLLECTIVES
"Use optimized asynchronous operations where collective communication pattern is detected" ON)
set (PARSEC_DIST_SHORT_LIMIT 1 CACHE STRING
"Use the short protocol (no flow control) for messages smaller than the limit in KB. Performs better if smaller than the MTU")
"Use the short protocol (no flow control) for messages smaller than the limit in KB. Performs better if smaller than the MTU.")

### GPU engine parameters
option(PARSEC_GPU_ALLOC_PER_TILE
Expand Down
5 changes: 3 additions & 2 deletions parsec/interfaces/dtd/insert_function.c
Original file line number Diff line number Diff line change
Expand Up @@ -2018,10 +2018,11 @@ output_data_of_dtd_task(parsec_execution_stream_t *es,

static int datatype_lookup_of_dtd_task(parsec_execution_stream_t *es,
const parsec_task_t *this_task,
const parsec_task_t *parent_task,
uint32_t *flow_mask, parsec_dep_data_description_t *data)
{
parsec_arena_datatype_t *adt;
(void)es;
(void)es; (void)parent_task;
data->remote.src_count = data->remote.dst_count = 1;
data->remote.src_displ = data->remote.dst_displ = 0;
data->data_future = NULL;
Expand Down Expand Up @@ -2184,7 +2185,7 @@ parsec_dtd_create_task_classv(const char *name,
tc->release_deps = parsec_dtd_release_deps;
tc->prepare_input = data_lookup_of_dtd_task;
tc->prepare_output = output_data_of_dtd_task;
tc->get_datatype = (parsec_datatype_lookup_t *)datatype_lookup_of_dtd_task;
tc->get_datatype = datatype_lookup_of_dtd_task;
tc->complete_execution = complete_hook_of_dtd;
tc->release_task = parsec_release_dtd_task_to_mempool;

Expand Down
129 changes: 45 additions & 84 deletions parsec/interfaces/ptg/ptg-compiler/jdf2c.c
Original file line number Diff line number Diff line change
Expand Up @@ -4329,7 +4329,7 @@ static void jdf_generate_one_function( const jdf_t *jdf, jdf_function_entry_t *f
string_arena_add_string(sa, " .time_estimate = %s,\n", prefix);
sprintf(prefix, "datatype_lookup_of_%s_%s", jdf_basename, f->fname);
jdf_generate_code_datatype_lookup(jdf, f, prefix);
string_arena_add_string(sa, " .get_datatype = (parsec_datatype_lookup_t*)%s,\n", prefix);
string_arena_add_string(sa, " .get_datatype = %s,\n", prefix);

sprintf(prefix, "hook_of_%s_%s", jdf_basename, f->fname);
jdf_generate_code_hooks(jdf, f, prefix);
Expand Down Expand Up @@ -4590,6 +4590,8 @@ static void jdf_generate_destructor( const jdf_t *jdf )
" __parsec_tp->super.super.taskpool_id, dependencies_size);\n"
" parsec_profiling_add_information(\"MEMORY_USAGE\", meminfo);\n"
" }\n"
"#else\n"
" (void)dependencies_size;\n"
"#endif\n");
}

Expand Down Expand Up @@ -5028,9 +5030,6 @@ jdf_generate_code_fillup_datatypes(string_arena_t * sa_tmp_arena, string_are
{
/* Prepare the memory layout of the output dependency. */
if( dump_all || (sa_arena == NULL) || strcmp(string_arena_get_string(sa_tmp_arena), string_arena_get_string(sa_arena))) {
if( strcmp(target, "local") == 0) {
if(sa_arena != NULL) string_arena_add_string(sa_out, " data%sdata_future = NULL;\n", access);
}
/* The type might change (possibly from undefined), so let's output */
if(sa_arena != NULL){
string_arena_init(sa_arena);
Expand Down Expand Up @@ -5158,7 +5157,9 @@ jdf_generate_code_reshape_input_from_desc(const jdf_t *jdf,
* reshaping of the future. We mark that with NULL and MPI_DATATYPE_NULL.
* Otherwise, we use SRC and DST dt.
*/
coutput("%s data.data = chunk;", spaces);
coutput("%s data.data = chunk;\n"
"%s data.data_future = NULL;\n",
spaces, spaces);
reshape_dtt_dst = dl->datatype_local;
reshape_dtt_src = dl->datatype_data;

Expand Down Expand Up @@ -6258,13 +6259,15 @@ jdf_generate_code_datatype_lookup(const jdf_t *jdf,
ai.sa = sa2;
ai.holder = "this_task->locals.";
ai.expr = NULL;
coutput("static int %s(parsec_execution_stream_t *es, const %s *this_task,\n"
coutput("static int %s(parsec_execution_stream_t *es, const parsec_task_t *this_generic_task, const parsec_task_t *parent_task,\n"
" uint32_t* flow_mask, parsec_dep_data_description_t* data)\n"
"{\n"
" const %s *this_task = (%s*)this_generic_task;\n"
" const __parsec_%s_internal_taskpool_t *__parsec_tp = (__parsec_%s_internal_taskpool_t *)this_task->taskpool;\n"
" (void)__parsec_tp; (void)es; (void)this_task; (void)data;\n"
" (void)__parsec_tp; (void)es; (void)this_task; (void)data; (void)parent_task;\n"
"%s",
name, parsec_get_name(jdf, f, "task_t"),
name,
parsec_get_name(jdf, f, "task_t"), parsec_get_name(jdf, f, "task_t"),
jdf_basename, jdf_basename,
UTIL_DUMP_LIST(sa, f->locals, next,
dump_local_assignments, &ai, "", " ", "\n", "\n"));
Expand Down Expand Up @@ -6389,12 +6392,8 @@ jdf_generate_code_datatype_lookup(const jdf_t *jdf,
coutput(" no_mask_match:\n");

coutput(" data->data = NULL;\n"
" data->local.arena = data->remote.arena = NULL;\n"
" data->local.src_datatype = data->local.dst_datatype = PARSEC_DATATYPE_NULL;\n"
" data->remote.src_datatype = data->remote.dst_datatype = PARSEC_DATATYPE_NULL;\n"
" data->local.src_count = data->local.dst_count = 0;\n"
" data->remote.src_count = data->remote.dst_count = 0;\n"
" data->local.src_displ = data->local.dst_displ = 0;\n"
" data->remote.src_displ = data->remote.dst_displ = 0;\n"
" data->data_future = NULL;\n"
" (*flow_mask) = 0; /* nothing left */\n"
Expand Down Expand Up @@ -7619,21 +7618,20 @@ static void jdf_check_relatives( jdf_function_entry_t *f, jdf_dep_flags_t flow_t

#define OUTPUT_PREV_DEPS(MASK, SA_DATATYPE, SA_DEPS) \
if( strlen(string_arena_get_string((SA_DEPS))) ) { \
if( strlen(string_arena_get_string((SA_DATATYPE))) ) { \
string_arena_add_string(sa_coutput, \
" %s", \
string_arena_get_string((SA_DATATYPE))); \
} \
if( (JDF_DEP_FLOW_OUT & flow_type) && fl->flow_dep_mask_out == (MASK) ) { \
string_arena_add_string(sa_coutput, \
" %s", \
" %s" \
" %s", \
string_arena_get_string((SA_DATATYPE)), \
string_arena_get_string((SA_DEPS))); \
} else { \
string_arena_add_string(sa_coutput, \
" if( action_mask & 0x%x ) {\n" \
" %s" \
" %s" \
" }\n", \
MASK, string_arena_get_string((SA_DEPS))); \
MASK, string_arena_get_string((SA_DATATYPE)), \
string_arena_get_string((SA_DEPS))); \
} \
string_arena_init((SA_DEPS)); \
string_arena_init((SA_DATATYPE)); \
Expand All @@ -7655,27 +7653,18 @@ jdf_generate_code_iterate_successors_or_predecessors(const jdf_t *jdf,
string_arena_t *sa_deps = string_arena_new(1024);
string_arena_t *sa_datatype = string_arena_new(1024);

string_arena_t *sa_arena = string_arena_new(256);
string_arena_t *sa_tmp_arena = string_arena_new(256);
string_arena_t *sa_count = string_arena_new(256);
string_arena_t *sa_tmp_arena = string_arena_new(256);
string_arena_t *sa_tmp_count = string_arena_new(256);
string_arena_t *sa_displ = string_arena_new(256);
string_arena_t *sa_tmp_displ = string_arena_new(256);
string_arena_t *sa_type = string_arena_new(256);
string_arena_t *sa_tmp_type = string_arena_new(256);
string_arena_t *sa_temp = string_arena_new(1024);

string_arena_t *sa_arena_r = string_arena_new(256);
string_arena_t *sa_tmp_arena_r = string_arena_new(256);
string_arena_t *sa_count_r = string_arena_new(256);
string_arena_t *sa_tmp_arena_r = string_arena_new(256);
string_arena_t *sa_tmp_count_r = string_arena_new(256);
string_arena_t *sa_displ_r = string_arena_new(256);
string_arena_t *sa_tmp_displ_r = string_arena_new(256);
string_arena_t *sa_type_r = string_arena_new(256);
string_arena_t *sa_tmp_type_r = string_arena_new(256);
string_arena_t *sa_temp_r = string_arena_new(1024);

int last_datatype_idx;
assignment_info_t ai;
expr_info_t info = EMPTY_EXPR_INFO;
int nb_open_ldef;
Expand Down Expand Up @@ -7725,22 +7714,14 @@ jdf_generate_code_iterate_successors_or_predecessors(const jdf_t *jdf,
for(fl = f->dataflow; fl != NULL; fl = fl->next) {
flowempty = 1;
flowtomem = 0;
last_datatype_idx = -1;
string_arena_init(sa_coutput);
string_arena_init(sa_deps);
string_arena_init(sa_datatype);
string_arena_init(sa_arena);
string_arena_init(sa_count);
string_arena_init(sa_displ);
string_arena_init(sa_type);
string_arena_init(sa_arena_r);
string_arena_init(sa_count_r);
string_arena_init(sa_displ_r);
string_arena_init(sa_type_r);
nb_open_ldef = 0;

string_arena_add_string(sa_coutput,
" data.data = this_task->data._f_%s.data_out;\n",
" data.data = this_task->data._f_%s.data_out;\n"
" data.data_future = NULL;\n",
fl->varname);

for(dl = fl->deps; dl != NULL; dl = dl->next) {
Expand All @@ -7759,16 +7740,9 @@ jdf_generate_code_iterate_successors_or_predecessors(const jdf_t *jdf,
string_arena_init(sa_tmp_type_r);
string_arena_init(sa_tmp_displ_r);


string_arena_init(sa_datatype);
/*********************************/
/* LOCAL DATATYPE FOR RESHAPING  */
/* always checked if !=; deps are
* grouped by type_remote, thus no
* checking for last_datatype_idx
* dl->dep_datatype_index
* Change that to minimize the
* number of reshapings? /!\
*/
/*********************************/
if( JDF_FLOW_TYPE_CTL & fl->flow_flags ) {
string_arena_add_string(sa_tmp_arena, "NULL");
Expand All @@ -7794,10 +7768,10 @@ jdf_generate_code_iterate_successors_or_predecessors(const jdf_t *jdf,
&& (NULL == reshape_dtt.layout) ){ /* User didn't specify a custom layout*/

string_arena_add_string(sa_tmp_type, "PARSEC_DATATYPE_NULL");
}else{
} else {
if( NULL == reshape_dtt.layout ){ /* User didn't specify a custom layout*/
string_arena_add_string(sa_tmp_type, "%s->opaque_dtt", string_arena_get_string(sa_temp));
}else{
} else {
string_arena_add_string(sa_tmp_type, "%s", dump_expr((void**)reshape_dtt.layout, &info));
}
}
Expand All @@ -7807,17 +7781,6 @@ jdf_generate_code_iterate_successors_or_predecessors(const jdf_t *jdf,
string_arena_add_string(sa_tmp_displ, "%s", dump_expr((void**)reshape_dtt.displ, &info));
}

string_arena_add_string(sa_datatype," if (action_mask & (PARSEC_ACTION_RESHAPE_ON_RELEASE | PARSEC_ACTION_RESHAPE_REMOTE_ON_RELEASE | PARSEC_ACTION_SEND_REMOTE_DEPS)) {\n");
jdf_generate_code_fillup_datatypes(sa_tmp_arena, sa_arena,
sa_tmp_type, sa_type,
sa_tmp_displ, sa_displ,
sa_tmp_count, sa_count,
NULL,
NULL,
NULL,
sa_datatype,
".", "local", 0);

/* Generate the remote datatype info only when releasing the deps of
* a real task. That is, skip it when, after a reception, we are
* releasing the deps of a fake predecessor task.
Expand All @@ -7843,10 +7806,10 @@ jdf_generate_code_iterate_successors_or_predecessors(const jdf_t *jdf,
* of iterate_successors -> get_datatype to recv the data; we are running over "fake predecessor task"
* and the goal is to check the successor datatype) */
string_arena_add_string(sa_tmp_type_r, "(data.data != NULL ? data.data->dtt : PARSEC_DATATYPE_NULL )");
}else{
} else {
if( NULL == dl->datatype_remote.layout ){ /* User didn't specify a custom layout*/
string_arena_add_string(sa_tmp_type_r, "%s->opaque_dtt", string_arena_get_string(sa_temp));
}else{
} else {
string_arena_add_string(sa_tmp_type_r, "%s", dump_expr((void**)dl->datatype_remote.layout, &info));
}
}
Expand All @@ -7855,20 +7818,26 @@ jdf_generate_code_iterate_successors_or_predecessors(const jdf_t *jdf,
string_arena_add_string(sa_tmp_displ_r, "%s", dump_expr((void**)dl->datatype_remote.displ, &info));
}

if( last_datatype_idx != dl->dep_datatype_index ) {
jdf_generate_code_fillup_datatypes(sa_tmp_arena_r, sa_arena_r,
sa_tmp_type_r, sa_type_r,
sa_tmp_displ_r, sa_displ_r,
sa_tmp_count_r, sa_count_r,
NULL,
NULL,
NULL,
sa_datatype,
".", "remote", 0);

last_datatype_idx = dl->dep_datatype_index;
string_arena_add_string(sa_datatype," if (action_mask & (PARSEC_ACTION_RESHAPE_ON_RELEASE | PARSEC_ACTION_RESHAPE_REMOTE_ON_RELEASE | PARSEC_ACTION_SEND_REMOTE_DEPS)) {\n");
jdf_generate_code_fillup_datatypes(sa_tmp_arena, NULL,
sa_tmp_type, NULL,
sa_tmp_displ, NULL,
sa_tmp_count, NULL,
NULL,
NULL,
NULL,
sa_datatype,
".", "local", 0);
jdf_generate_code_fillup_datatypes(sa_tmp_arena_r, NULL,
sa_tmp_type_r, NULL,
sa_tmp_displ_r, NULL,
sa_tmp_count_r, NULL,
NULL,
NULL,
NULL,
sa_datatype,
".", "remote", 0);

}
//end if of string_arena_add_string(sa_datatype," if (action_mask & (PARSEC_ACTION_RESHAPE_ON_RELEASE | PARSEC_ACTION_RESHAPE_REMOTE_ON_RELEASE | PARSEC_ACTION_SEND_REMOTE_DEPS)) {\n");
string_arena_add_string(sa_datatype," }\n");

Expand Down Expand Up @@ -8039,23 +8008,15 @@ jdf_generate_code_iterate_successors_or_predecessors(const jdf_t *jdf,
string_arena_free(sa_coutput);
string_arena_free(sa_deps);
string_arena_free(sa_datatype);
string_arena_free(sa_arena);
string_arena_free(sa_tmp_arena);
string_arena_free(sa_count);
string_arena_free(sa_tmp_count);
string_arena_free(sa_displ);
string_arena_free(sa_tmp_displ);
string_arena_free(sa_type);
string_arena_free(sa_tmp_type);
string_arena_free(sa_temp);

string_arena_free(sa_arena_r);
string_arena_free(sa_tmp_arena_r);
string_arena_free(sa_count_r);
string_arena_free(sa_tmp_count_r);
string_arena_free(sa_displ_r);
string_arena_free(sa_tmp_displ_r);
string_arena_free(sa_type_r);
string_arena_free(sa_tmp_type_r);
string_arena_free(sa_temp_r);

Expand Down
8 changes: 5 additions & 3 deletions parsec/mca/device/transfer_gpu.c
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,12 @@ release_task_of_gpu_d2h_task(parsec_execution_stream_t* es,

static int
datatype_lookup_of_gpu_d2h_task( parsec_execution_stream_t * es,
const parsec_gpu_d2h_task_t* this_task,
uint32_t * flow_mask, parsec_dep_data_description_t * data)
const parsec_gpu_d2h_task_t* this_task,
const parsec_task_t * parent_task,
uint32_t * flow_mask,
parsec_dep_data_description_t * data)
{
(void)es; (void)this_task; (void)flow_mask; (void)data;
(void)es; (void)this_task; (void)parent_task; (void)flow_mask; (void)data;
return PARSEC_SUCCESS;
}

Expand Down
22 changes: 13 additions & 9 deletions parsec/parsec.c
Original file line number Diff line number Diff line change
Expand Up @@ -2785,19 +2785,23 @@ int parsec_context_query(parsec_context_t *context, parsec_context_query_cmd_t c
return context->my_rank;

case PARSEC_CONTEXT_QUERY_DEVICES:
int device_type = va_arg(args, int), count = 0;
for( uint32_t i = 0; i < parsec_nb_devices; i++ ) {
dev = parsec_mca_device_get(i);
if( dev->type & device_type ) count++;
{
int device_type = va_arg(args, int), count = 0;
for( uint32_t i = 0; i < parsec_nb_devices; i++ ) {
dev = parsec_mca_device_get(i);
if( dev->type & device_type ) count++;
}
return count;
}
return count;

case PARSEC_CONTEXT_QUERY_CORES:
int nb_total_comp_threads = 0;
for (int idx = 0; idx < context->nb_vp; idx++) {
nb_total_comp_threads += context->virtual_processes[idx]->nb_cores;
{
int nb_total_comp_threads = 0;
for (int idx = 0; idx < context->nb_vp; idx++) {
nb_total_comp_threads += context->virtual_processes[idx]->nb_cores;
}
return nb_total_comp_threads;
}
return nb_total_comp_threads;

case PARSEC_CONTEXT_QUERY_ACTIVE_TASKPOOLS:
return context->active_taskpools;
Expand Down
1 change: 1 addition & 0 deletions parsec/parsec_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ typedef parsec_hook_return_t (parsec_evaluate_function_t)(const parsec_task_t* t
*/
typedef int (parsec_datatype_lookup_t)(struct parsec_execution_stream_s* es,
const parsec_task_t * this_task,
const parsec_task_t * parent_task,
uint32_t * flow_mask,
parsec_dep_data_description_t * data);

Expand Down
Loading

0 comments on commit 391fed8

Please sign in to comment.