From 6d29919c16b164ea276795560724f55a2dadc8c7 Mon Sep 17 00:00:00 2001 From: aplowman Date: Wed, 11 Nov 2020 22:00:39 +0000 Subject: [PATCH 01/29] First steps for iterations support --- matflow/models/construction.py | 101 +++++++++++++++++++++++++-------- matflow/models/workflow.py | 14 ++++- matflow/profile.py | 2 + 3 files changed, 89 insertions(+), 28 deletions(-) diff --git a/matflow/models/construction.py b/matflow/models/construction.py index 6fd88a1..e2a301f 100644 --- a/matflow/models/construction.py +++ b/matflow/models/construction.py @@ -791,11 +791,12 @@ def resolve_group(group, local_inputs, repeats_idx): group_resolved = copy.deepcopy(group) group_resolved.update({ - 'group_idx': group_idx, - 'group_element_idx': group_elem_idx, - 'num_groups': len(group_elem_idx), - 'group_size': len(group_elem_idx[0]), + 'group_idx_per_iteration': group_idx, + 'group_element_idx_per_iteration': group_elem_idx, + 'num_groups_per_iteration': len(group_elem_idx), + 'group_size_per_iteration': len(group_elem_idx[0]), 'group_by': new_group_by, + 'num_groups': len(group_elem_idx), }) if 'merge_priority' not in group_resolved: @@ -804,7 +805,7 @@ def resolve_group(group, local_inputs, repeats_idx): return group_resolved -def get_element_idx(task_lst, dep_idx): +def get_element_idx(task_lst, dep_idx, num_iterations): """For each task, find the element indices that determine the elements to be used (i.e from upstream tasks) to populate task inputs. @@ -857,8 +858,11 @@ def get_element_idx(task_lst, dep_idx): input_idx = arange(loc_in['length']) elem_idx_i = { 'num_elements': loc_in['length'], + 'num_elements_per_iteration': loc_in['length'], + 'num_iterations': num_iterations, 'groups': groups, 'inputs': {i: {'input_idx': input_idx} for i in loc_in['inputs']}, + 'iteration_idx': [0] * loc_in['length'], } else: @@ -936,8 +940,8 @@ def get_element_idx(task_lst, dep_idx): if in_group['group_name'] != 'default': consumed_groups.append('user_group_' + in_group['group_name']) - incoming_size = in_group['num_groups'] - group_size = in_group['group_size'] + incoming_size = in_group['num_groups_per_iteration'] + group_size = in_group['group_size_per_iteration'] if group_size > 1: non_unit_group_sizes.update({in_group['task_idx']: True}) elif in_group['task_idx'] not in non_unit_group_sizes: @@ -964,7 +968,7 @@ def get_element_idx(task_lst, dep_idx): input_alias: { 'task_idx': in_group['task_idx'], 'group': in_group['group_name'], - 'element_idx': in_group['group_element_idx'], + 'element_idx': in_group['group_element_idx_per_iteration'], } }) continue @@ -986,7 +990,7 @@ def get_element_idx(task_lst, dep_idx): 'task_idx': in_group['task_idx'], 'group': in_group['group_name'], 'element_idx': tile( - in_group['group_element_idx'], + in_group['group_element_idx_per_iteration'], existing_size ), } @@ -995,16 +999,20 @@ def get_element_idx(task_lst, dep_idx): # Generate new groups for each group name: for g_name, g in groups.items(): - new_g_idx = extend_index_list(g['group_idx'], incoming_size) + new_g_idx = extend_index_list( + g['group_idx_per_iteration'], incoming_size) new_ge_idx = to_sub_list( extend_index_list( - flatten_list(g['group_element_idx']), + flatten_list(g['group_element_idx_per_iteration']), incoming_size), - g['group_size'] + g['group_size_per_iteration'] ) - groups[g_name]['group_idx'] = new_g_idx - groups[g_name]['group_element_idx'] = new_ge_idx - groups[g_name]['num_groups'] = g['num_groups'] * incoming_size + new_num_groups = g['num_groups_per_iteration'] * incoming_size + + 
groups[g_name]['group_idx_per_iteration'] = new_g_idx + groups[g_name]['group_element_idx_per_iteration'] = new_ge_idx + groups[g_name]['num_groups_per_iteration'] = new_num_groups + groups[g_name]['num_groups'] = new_num_groups existing_size *= incoming_size @@ -1025,7 +1033,7 @@ def get_element_idx(task_lst, dep_idx): input_alias: { 'task_idx': in_group['task_idx'], 'group': in_group['group_name'], - 'element_idx': in_group['group_element_idx'], + 'element_idx': in_group['group_element_idx_per_iteration'], } }) @@ -1052,28 +1060,71 @@ def get_element_idx(task_lst, dep_idx): for group_name, g in prop_groups.items(): - group_reps = existing_size // (g['num_groups'] * g['group_size']) - new_g_idx = extend_index_list(g['group_idx'], group_reps) + group_reps = existing_size // (g['num_groups_per_iteration'] + * g['group_size_per_iteration']) + new_g_idx = extend_index_list(g['group_idx_per_iteration'], group_reps) new_ge_idx = to_sub_list( extend_index_list( - flatten_list(g['group_element_idx']), + flatten_list(g['group_element_idx_per_iteration']), group_reps), - g['group_size'] + g['group_size_per_iteration'] ) - prop_groups[group_name]['group_idx'] = new_g_idx - prop_groups[group_name]['group_element_idx'] = new_ge_idx - prop_groups[group_name]['num_groups'] = g['num_groups'] * group_reps + new_num_groups = g['num_groups_per_iteration'] * group_reps + + prop_groups[group_name]['group_idx_per_iteration'] = new_g_idx + prop_groups[group_name]['group_element_idx_per_iteration'] = new_ge_idx + prop_groups[group_name]['num_groups_per_iteration'] = new_num_groups + prop_groups[group_name]['num_groups'] = new_num_groups all_groups = {**groups, **prop_groups} elem_idx_i = { 'num_elements': existing_size, + 'num_elements_per_iteration': existing_size, + 'num_iterations': 1, 'inputs': ins_dict, 'groups': all_groups, + 'iteration_idx': [0] * existing_size, } element_idx.append(elem_idx_i) + # Add iterations: + for idx, elem_idx_i in enumerate(element_idx): + + new_iter_idx = [ + iter_idx + for iter_idx in range(elem_idx_i['num_iterations']) + for _ in range(elem_idx_i['num_elements_per_iteration']) + ] + element_idx[idx]['iteration_idx'] = new_iter_idx + + for input_alias, inputs_idx in elem_idx_i['inputs'].items(): + + ins_task_idx = inputs_idx.get('task_idx') + + if ins_task_idx is not None: + # For now, assume non-local inputs are parametrised from the same + # iteration of the upstream task: + + num_elems_per_iter = element_idx[ins_task_idx][ + 'num_elements_per_iteration' + ] + + new_elems_idx = [] + for iter_idx in range(elem_idx_i['num_iterations']): + for i in inputs_idx['element_idx']: + new_elems_idx.append( + [j + (iter_idx * num_elems_per_iter) for j in i] + ) + element_idx[idx]['inputs'][input_alias]['element_idx'] = new_elems_idx + + else: + # Copy local inputs' `inputs_idx` (local inputs must be the same for + # all iterations): + new_ins_idx = tile(inputs_idx['input_idx'], elem_idx_i['num_iterations']) + element_idx[idx]['inputs'][input_alias]['input_idx'] = new_ins_idx + return element_idx @@ -1157,7 +1208,7 @@ def init_local_inputs(task_lst, dep_idx, is_from_file, check_integrity): return task_lst -def init_tasks(workflow, task_lst, is_from_file, check_integrity=True): +def init_tasks(workflow, task_lst, is_from_file, num_iterations, check_integrity=True): """Construct and validate Task objects and the element indices from which to populate task inputs. 
@@ -1206,7 +1257,7 @@ def init_tasks(workflow, task_lst, is_from_file, check_integrity=True): task_lst = init_local_inputs(task_lst, dep_idx, is_from_file, check_integrity) # Find element indices that determine the elements from which task inputs are drawn: - element_idx = get_element_idx(task_lst, dep_idx) + element_idx = get_element_idx(task_lst, dep_idx, num_iterations) task_objs = [] for task_idx, task_dict in enumerate(task_lst): diff --git a/matflow/models/workflow.py b/matflow/models/workflow.py index 5d6a139..7633acb 100644 --- a/matflow/models/workflow.py +++ b/matflow/models/workflow.py @@ -94,10 +94,11 @@ class Workflow(object): '_archive_excludes', '_figures', '_metadata', + '_num_iterations', ] def __init__(self, name, tasks, stage_directory=None, extends=None, archive=None, - archive_excludes=None, figures=None, metadata=None, + archive_excludes=None, figures=None, metadata=None, num_iterations=None, check_integrity=True, profile=None, __is_from_file=False): self._id = None # Assigned once by set_ids() @@ -115,8 +116,10 @@ def __init__(self, name, tasks, stage_directory=None, extends=None, archive=None for idx, i in enumerate(figures) ] if figures else [] self._metadata = metadata or {} + self._num_iterations = num_iterations or 1 - tasks, elements_idx = init_tasks(self, tasks, self.is_from_file, check_integrity) + tasks, elements_idx = init_tasks(self, tasks, self.is_from_file, num_iterations, + check_integrity=check_integrity) self._tasks = tasks self._elements_idx = elements_idx @@ -278,6 +281,10 @@ def figures(self): def metadata(self): return self._metadata + @property + def num_iterations(self): + return self._num_iterations + @property def elements_idx(self): return self._elements_idx @@ -447,7 +454,7 @@ def write_directories(self): if task.software_instance.requires_sources: self.get_task_sources_path(task_idx).mkdir() - num_elems = elems_idx['num_elements'] + num_elems = elems_idx['num_elements_per_iteration'] # Generate element directories: for i in range(num_elems): self.get_element_path(task_idx, i).mkdir(exist_ok=True) @@ -1138,6 +1145,7 @@ def load_HDF5_file(cls, path=None, full_path=False, check_integrity=True): WARN_ON_MISSING = [ 'figures', 'metadata', + 'num_iterations', ] for key in WARN_ON_MISSING: if key not in obj_json: diff --git a/matflow/profile.py b/matflow/profile.py index 461c5bd..e3d2263 100644 --- a/matflow/profile.py +++ b/matflow/profile.py @@ -20,6 +20,7 @@ def parse_workflow_profile(profile_path): 'archive_excludes', 'figures', 'metadata', + 'num_iterations', ] miss_keys = list(set(req_keys) - set(profile.keys())) @@ -44,6 +45,7 @@ def parse_workflow_profile(profile_path): 'tasks': profile['tasks'], 'figures': profile.get('figures'), 'metadata': profile.get('metadata'), + 'num_iterations': profile.get('num_iterations'), 'extends': profile.get('extends'), 'archive': profile.get('archive'), 'archive_excludes': profile.get('archive_excludes'), From 19c5c190aacb86911cd2be7fcd93bc27217565dc Mon Sep 17 00:00:00 2001 From: aplowman Date: Thu, 12 Nov 2020 21:10:54 +0000 Subject: [PATCH 02/29] Correctly set num_iterations in element_idx --- matflow/models/construction.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/matflow/models/construction.py b/matflow/models/construction.py index e2a301f..f04bfbb 100644 --- a/matflow/models/construction.py +++ b/matflow/models/construction.py @@ -1081,7 +1081,7 @@ def get_element_idx(task_lst, dep_idx, num_iterations): elem_idx_i = { 'num_elements': existing_size, 
'num_elements_per_iteration': existing_size, - 'num_iterations': 1, + 'num_iterations': num_iterations, 'inputs': ins_dict, 'groups': all_groups, 'iteration_idx': [0] * existing_size, @@ -1098,6 +1098,7 @@ def get_element_idx(task_lst, dep_idx, num_iterations): for _ in range(elem_idx_i['num_elements_per_iteration']) ] element_idx[idx]['iteration_idx'] = new_iter_idx + element_idx[idx]['num_elements'] = len(new_iter_idx) for input_alias, inputs_idx in elem_idx_i['inputs'].items(): From ea57b4c726a9c14e47d6513b49210bdc71da8fea Mon Sep 17 00:00:00 2001 From: aplowman Date: Thu, 12 Nov 2020 21:13:33 +0000 Subject: [PATCH 03/29] Initial iterations set up: repeating the workflow --- matflow/api.py | 17 +++- matflow/cli.py | 17 +++- matflow/models/task.py | 4 +- matflow/models/workflow.py | 189 ++++++++++++++++++++++++------------- 4 files changed, 151 insertions(+), 76 deletions(-) diff --git a/matflow/api.py b/matflow/api.py index b3ac186..c7a1f44 100644 --- a/matflow/api.py +++ b/matflow/api.py @@ -103,11 +103,11 @@ def load_workflow(directory, full_path=False): return workflow -def prepare_task(task_idx, directory, is_array=False): +def prepare_task(task_idx, iteration_idx, directory, is_array=False): 'Prepare a task for execution by setting inputs and running input maps.' load_extensions() workflow = load_workflow(directory) - workflow.prepare_task(task_idx, is_array=is_array) + workflow.prepare_task(task_idx, iteration_idx, is_array=is_array) def prepare_task_element(task_idx, element_idx, directory, is_array=False): @@ -117,11 +117,11 @@ def prepare_task_element(task_idx, element_idx, directory, is_array=False): workflow.prepare_task_element(task_idx, element_idx, is_array=is_array) -def process_task(task_idx, directory, is_array=False): +def process_task(task_idx, iteration_idx, directory, is_array=False): 'Process a completed task by running the output map.' load_extensions() workflow = load_workflow(directory) - workflow.process_task(task_idx, is_array=is_array) + workflow.process_task(task_idx, iteration_idx, is_array=is_array) def process_task_element(task_idx, element_idx, directory, is_array=False): @@ -167,3 +167,12 @@ def kill(directory): def cloud_connect(provider): Config.set_config() hpcflow_cloud_connect(provider, config_dir=Config.get('hpcflow_config_dir')) + + +def write_element_directories(iteration_idx, directory): + 'Generate element directories for a given iteration.' 
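+    # The final 'iterate' command group in the generated hpcflow workflow invokes
+    # this with ITER_IDX + 1, so on the last pass `iteration_idx` equals
+    # `num_iterations`; the check below then makes that final call a no-op.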
+ load_extensions() + workflow = load_workflow(directory) + if iteration_idx < workflow.num_iterations: + workflow.write_element_directories(iteration_idx) + workflow.prepare_iteration(iteration_idx) diff --git a/matflow/cli.py b/matflow/cli.py index cd907b7..3e6e42b 100644 --- a/matflow/cli.py +++ b/matflow/cli.py @@ -38,11 +38,12 @@ def go(workflow_path, directory=None): @cli.command() @click.option('--task-idx', '-t', type=click.INT, required=True) +@click.option('--iteration-idx', '-i', type=click.INT, required=True) @click.option('--directory', '-d', type=click.Path(exists=True)) @click.option('--array', is_flag=True) -def prepare_task(task_idx, directory=None, array=False): +def prepare_task(task_idx, iteration_idx, directory=None, array=False): print('matflow.cli.prepare_task', flush=True) - api.prepare_task(task_idx, directory, is_array=array) + api.prepare_task(task_idx, iteration_idx, directory, is_array=array) @cli.command() @@ -57,11 +58,12 @@ def prepare_task_element(task_idx, element_idx, directory=None, array=False): @cli.command() @click.option('--task-idx', '-t', type=click.INT, required=True) +@click.option('--iteration-idx', '-i', type=click.INT, required=True) @click.option('--directory', '-d', type=click.Path(exists=True)) @click.option('--array', is_flag=True) -def process_task(task_idx, directory=None, array=False): +def process_task(task_idx, iteration_idx, directory=None, array=False): print('matflow.cli.process_task', flush=True) - api.process_task(task_idx, directory, is_array=array) + api.process_task(task_idx, iteration_idx, directory, is_array=array) @cli.command() @@ -122,5 +124,12 @@ def kill(directory): api.kill(directory) +@cli.command() +@click.option('--iteration-idx', '-i', type=click.INT, required=True) +@click.option('--directory', '-d', type=click.Path(exists=True)) +def write_element_directories(iteration_idx, directory=None): + api.write_element_directories(iteration_idx, directory) + + if __name__ == '__main__': cli() diff --git a/matflow/models/task.py b/matflow/models/task.py index 304098f..a50e1d5 100644 --- a/matflow/models/task.py +++ b/matflow/models/task.py @@ -793,7 +793,7 @@ def get_formatted_commands(self): return fmt_commands, input_vars def get_prepare_task_commands(self, is_array=False): - cmd = f'matflow prepare-task --task-idx={self.task_idx}' + cmd = f'matflow prepare-task --task-idx={self.task_idx} --iteration-idx=$ITER_IDX' cmd += f' --array' if is_array else '' cmds = [cmd] if self.software_instance.task_preparation: @@ -815,7 +815,7 @@ def get_prepare_task_element_commands(self, is_array=False): return out def get_process_task_commands(self, is_array=False): - cmd = f'matflow process-task --task-idx={self.task_idx}' + cmd = f'matflow process-task --task-idx={self.task_idx} --iteration-idx=$ITER_IDX' cmd += f' --array' if is_array else '' cmds = [cmd] if self.software_instance.task_processing: diff --git a/matflow/models/workflow.py b/matflow/models/workflow.py index 7633acb..4606901 100644 --- a/matflow/models/workflow.py +++ b/matflow/models/workflow.py @@ -45,6 +45,7 @@ datetime_to_dict, get_nested_item, nested_dict_arrays_to_list, + index, ) @@ -437,27 +438,18 @@ def _get_element_temp_process_path(self, task_idx, element_idx): out = element_path.joinpath(f'task_process_{task.id}_element_{element_idx}.hdf5') return out - def write_directories(self): - 'Generate task and element directories.' 
- - if self.path.exists(): - raise ValueError('Directories for this workflow already exist.') - - self.path.mkdir(exist_ok=False) + def write_element_directories(self, iteration_idx): + 'Generate element directories for a given iteration.' for elems_idx, task in zip(self.elements_idx, self.tasks): - # Generate task directory: task_idx = task.task_idx - self.get_task_path(task_idx).mkdir() - - if task.software_instance.requires_sources: - self.get_task_sources_path(task_idx).mkdir() - num_elems = elems_idx['num_elements_per_iteration'] + iter_elem_idx = [i + (iteration_idx * num_elems) for i in range(num_elems)] + # Generate element directories: - for i in range(num_elems): - self.get_element_path(task_idx, i).mkdir(exist_ok=True) + for elem_idx_i in iter_elem_idx: + self.get_element_path(task_idx, elem_idx_i).mkdir(exist_ok=True) # Copy any local input files to the element directories: for input_alias, inputs_idx in self.elements_idx[task_idx]['inputs'].items(): @@ -472,11 +464,11 @@ def write_directories(self): input_dict = task.local_inputs['inputs'][input_name] local_ins = [input_dict['vals'][i] for i in input_dict['vals_idx']] - for element_idx in range(num_elems): + for elem_idx_i in iter_elem_idx: - file_path = local_ins[inputs_idx['input_idx'][element_idx]] + file_path = local_ins[inputs_idx['input_idx'][elem_idx_i]] file_path_full = self.stage_directory.joinpath(file_path) - elem_path = self.get_element_path(task_idx, i) + elem_path = self.get_element_path(task_idx, elem_idx_i) dst_path = elem_path.joinpath(file_path_full.name) if not file_path_full.is_file(): @@ -486,6 +478,46 @@ def write_directories(self): shutil.copyfile(file_path_full, dst_path) + def prepare_iteration(self, iteration_idx): + + for elems_idx, task in zip(self.elements_idx, self.tasks): + + num_elems = elems_idx['num_elements_per_iteration'] + iter_elem_idx = [i + (iteration_idx * num_elems) for i in range(num_elems)] + cmd_line_inputs, input_vars = self._get_command_line_inputs(task.task_idx) + + for local_in_name, var_name in input_vars.items(): + + var_file_name = '{}.txt'.format(var_name) + + # Create text file in each element directory for each in `input_vars`: + for elem_idx_i in iter_elem_idx: + + task_elem_path = self.get_element_path(task.task_idx, elem_idx_i) + in_val = cmd_line_inputs[local_in_name][elem_idx_i] + + var_file_path = task_elem_path.joinpath(var_file_name) + with var_file_path.open('w') as handle: + handle.write(in_val + '\n') + + def write_directories(self): + 'Generate task and element directories for the first iteration.' + + if self.path.exists(): + raise ValueError('Directories for this workflow already exist.') + + self.path.mkdir(exist_ok=False) + + for elems_idx, task in zip(self.elements_idx, self.tasks): + + # Generate task directory: + self.get_task_path(task.task_idx).mkdir() + + if task.software_instance.requires_sources: + self.get_task_sources_path(task.task_idx).mkdir() + + self.write_element_directories(iteration_idx=0) + def get_hpcflow_job_name(self, task, job_type, is_stats=False): """Get the scheduler job name for a given task index and job type. 
@@ -532,6 +564,48 @@ def get_hpcflow_job_name(self, task, job_type, is_stats=False): return out + def _get_command_line_inputs(self, task_idx): + + task = self.tasks[task_idx] + elems_idx = self.elements_idx[task_idx] + _, input_vars = task.get_formatted_commands() + + cmd_line_inputs = {} + for local_in_name, local_in in task.local_inputs['inputs'].items(): + + if local_in_name in input_vars: + # TODO: We currently only consider input_vars for local inputs. + + # Expand values for intra-task nesting: + values = [self.get_element_data(i) + for i in local_in['vals_data_idx']] + + # Format values: + fmt_func_scope = Config.get('CLI_arg_maps').get(( + task.schema.name, + task.schema.method, + task.schema.implementation + )) + fmt_func = None + if fmt_func_scope: + fmt_func = fmt_func_scope.get(local_in_name) + if not fmt_func: + fmt_func = DEFAULT_FORMATTERS.get( + type(values[0]), + lambda x: str(x) + ) + + values_fmt = [fmt_func(i) for i in values] + + # Expand values for inter-task nesting: + values_fmt_all = [ + values_fmt[i] + for i in elems_idx['inputs'][local_in_name]['input_idx'] + ] + cmd_line_inputs.update({local_in_name: values_fmt_all}) + + return cmd_line_inputs, input_vars + @requires_path_exists def get_hpcflow_workflow(self): 'Generate an hpcflow workflow to execute this workflow.' @@ -595,41 +669,7 @@ def get_hpcflow_workflow(self): fmt_commands_new.append(i) fmt_commands = fmt_commands_new - cmd_line_inputs = {} - for local_in_name, local_in in task.local_inputs['inputs'].items(): - if local_in_name in input_vars: - # TODO: We currently only consider input_vars for local inputs. - - # Expand values for intra-task nesting: - values = [self.get_element_data(i) - for i in local_in['vals_data_idx']] - - # Format values: - fmt_func_scope = Config.get('CLI_arg_maps').get(( - task.schema.name, - task.schema.method, - task.schema.implementation - )) - fmt_func = None - if fmt_func_scope: - fmt_func = fmt_func_scope.get(local_in_name) - if not fmt_func: - fmt_func = DEFAULT_FORMATTERS.get( - type(values[0]), - lambda x: str(x) - ) - - values_fmt = [fmt_func(i) for i in values] - - # Expand values for inter-task nesting: - values_fmt_all = [ - values_fmt[i] - for i in elems_idx['inputs'][local_in_name]['input_idx'] - ] - cmd_line_inputs.update({local_in_name: values_fmt_all}) - for local_in_name, var_name in input_vars.items(): - var_file_name = '{}.txt'.format(var_name) variables.update({ var_name: { @@ -641,16 +681,6 @@ def get_hpcflow_workflow(self): } }) - # Create text file in each element directory for each in `input_vars`: - for i in range(elems_idx['num_elements']): - - task_elem_path = self.get_element_path(task.task_idx, i) - in_val = cmd_line_inputs[local_in_name][i] - - var_file_path = task_elem_path.joinpath(var_file_name) - with var_file_path.open('w') as handle: - handle.write(in_val + '\n') - task_path_rel = str(self.get_task_path(task.task_idx).name) cur_prepare_opts = task.get_scheduler_options('prepare') @@ -795,6 +825,16 @@ def get_hpcflow_workflow(self): 'archive_excludes': self.archive_excludes, }) + command_groups.append({ + 'directory': '.', + 'nesting': 'hold', + 'commands': ('matflow write-element-directories ' + '--iteration-idx=$(($ITER_IDX+1))'), + 'stats': False, + 'scheduler_options': {}, + 'name': 'iterate', + }) + hf_data = { 'parallel_modes': Config.get('parallel_modes'), 'scheduler': 'sge', @@ -804,6 +844,14 @@ def get_hpcflow_workflow(self): 'variables': variables, } + if self.num_iterations > 1: + hf_data.update({ + 'loop': { + 'max_iterations': 
self.num_iterations, + 'groups': list(range(len(command_groups))), + } + }) + if self.archive: hf_data.update({'archive_locations': {self.archive: self.archive_definition}}) @@ -1065,6 +1113,9 @@ def write_HDF5_file(self, path=None): self.loaded_path = path + # Write command line argument files into element dirs: + self.prepare_iteration(iteration_idx=0) + @classmethod def load_HDF5_file(cls, path=None, full_path=False, check_integrity=True): """Load workflow from an HDF5 file. @@ -1297,7 +1348,7 @@ def prepare_sources(self, task_idx): handle.write(file_str) @requires_path_exists - def prepare_task(self, task_idx, is_array=False): + def prepare_task(self, task_idx, iteration_idx, is_array=False): """Prepare inputs and run input maps. Parameters @@ -1310,7 +1361,10 @@ def prepare_task(self, task_idx, is_array=False): """ task = self.tasks[task_idx] - for element in task.elements: + num_elems = self.elements_idx[task.task_idx]['num_elements_per_iteration'] + iter_elem_idx = [i + (iteration_idx * num_elems) for i in range(num_elems)] + + for element in index(task.elements, iter_elem_idx): if is_array: @@ -1491,7 +1545,7 @@ def process_task_element(self, task_idx, element_idx, is_array=False): hickle.dump(dat, temp_path) @requires_path_exists - def process_task(self, task_idx, is_array=False): + def process_task(self, task_idx, iteration_idx, is_array=False): """Process outputs from an executed task: run output map and save outputs. Parameters @@ -1504,7 +1558,10 @@ def process_task(self, task_idx, is_array=False): """ task = self.tasks[task_idx] - for element in task.elements: + num_elems = self.elements_idx[task.task_idx]['num_elements_per_iteration'] + iter_elem_idx = [i + (iteration_idx * num_elems) for i in range(num_elems)] + + for element in index(task.elements, iter_elem_idx): if is_array: From 5db3a1e4214d31ec0b02b108969178f94b7299cc Mon Sep 17 00:00:00 2001 From: aplowman Date: Fri, 13 Nov 2020 14:19:16 +0000 Subject: [PATCH 04/29] Save dependency_idx as Workflow attr --- matflow/models/construction.py | 2 +- matflow/models/workflow.py | 16 ++++++++++++++-- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/matflow/models/construction.py b/matflow/models/construction.py index f04bfbb..7095682 100644 --- a/matflow/models/construction.py +++ b/matflow/models/construction.py @@ -1281,4 +1281,4 @@ def init_tasks(workflow, task_lst, is_from_file, num_iterations, check_integrity task_objs.append(task) - return task_objs, element_idx + return task_objs, element_idx, dep_idx diff --git a/matflow/models/workflow.py b/matflow/models/workflow.py index 4606901..cd40852 100644 --- a/matflow/models/workflow.py +++ b/matflow/models/workflow.py @@ -90,6 +90,7 @@ class Workflow(object): '_stage_directory', '_tasks', '_elements_idx', + '_dependency_idx', '_history', '_archive', '_archive_excludes', @@ -119,10 +120,17 @@ def __init__(self, name, tasks, stage_directory=None, extends=None, archive=None self._metadata = metadata or {} self._num_iterations = num_iterations or 1 - tasks, elements_idx = init_tasks(self, tasks, self.is_from_file, num_iterations, - check_integrity=check_integrity) + tasks, elements_idx, dep_idx = init_tasks( + self, + tasks, + self.is_from_file, + self._num_iterations, + check_integrity=check_integrity + ) + self._tasks = tasks self._elements_idx = elements_idx + self._dependency_idx = dep_idx if not self.is_from_file: self._check_archive_connection() @@ -290,6 +298,10 @@ def num_iterations(self): def elements_idx(self): return self._elements_idx + @property + def 
dependency_idx(self): + return self._dependency_idx + @property def extends(self): return [Path(i) for i in self._extends] From 2a5024c53c79361aa5650837b5f783dd7bfe438a Mon Sep 17 00:00:00 2001 From: aplowman Date: Fri, 13 Nov 2020 14:19:27 +0000 Subject: [PATCH 05/29] Docstrings --- matflow/api.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/matflow/api.py b/matflow/api.py index c7a1f44..ac3172b 100644 --- a/matflow/api.py +++ b/matflow/api.py @@ -104,7 +104,7 @@ def load_workflow(directory, full_path=False): def prepare_task(task_idx, iteration_idx, directory, is_array=False): - 'Prepare a task for execution by setting inputs and running input maps.' + 'Prepare a task (iteration) for execution by setting inputs and running input maps.' load_extensions() workflow = load_workflow(directory) workflow.prepare_task(task_idx, iteration_idx, is_array=is_array) @@ -118,7 +118,7 @@ def prepare_task_element(task_idx, element_idx, directory, is_array=False): def process_task(task_idx, iteration_idx, directory, is_array=False): - 'Process a completed task by running the output map.' + 'Process a completed task (iteration) by running the output map.' load_extensions() workflow = load_workflow(directory) workflow.process_task(task_idx, iteration_idx, is_array=is_array) From f5fca21ab9c325054314f6440484f3e1a02ba786 Mon Sep 17 00:00:00 2001 From: aplowman Date: Tue, 17 Nov 2020 13:07:25 +0000 Subject: [PATCH 06/29] Comment --- matflow/models/construction.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/matflow/models/construction.py b/matflow/models/construction.py index 7095682..ab7b7dd 100644 --- a/matflow/models/construction.py +++ b/matflow/models/construction.py @@ -1106,7 +1106,7 @@ def get_element_idx(task_lst, dep_idx, num_iterations): if ins_task_idx is not None: # For now, assume non-local inputs are parametrised from the same - # iteration of the upstream task: + # iteration as the upstream task: num_elems_per_iter = element_idx[ins_task_idx][ 'num_elements_per_iteration' From 3ce40493e803e7c99505a35a3651d2d15b2915b8 Mon Sep 17 00:00:00 2001 From: aplowman Date: Thu, 19 Nov 2020 17:29:13 +0000 Subject: [PATCH 07/29] Fix bad manual merge --- matflow/api.py | 21 +++++---------------- 1 file changed, 5 insertions(+), 16 deletions(-) diff --git a/matflow/api.py b/matflow/api.py index 4c9f63c..c8ca1b5 100644 --- a/matflow/api.py +++ b/matflow/api.py @@ -101,24 +101,13 @@ def load_workflow(directory, full_path=False): return workflow -<< << << < HEAD - - def prepare_task(task_idx, iteration_idx, directory, is_array=False): - 'Prepare a task (iteration) for execution by setting inputs and running input maps.' 
- - -== == == = - + """Prepare a task (iteration) for execution by setting inputs and running input + maps.""" -def prepare_task(task_idx, directory, is_array=False): - """Prepare a task for execution by setting inputs and running input maps.""" - - ->>>>>> > dev -load_extensions() -workflow = load_workflow(directory) -workflow.prepare_task(task_idx, iteration_idx, is_array=is_array) + load_extensions() + workflow = load_workflow(directory) + workflow.prepare_task(task_idx, iteration_idx, is_array=is_array) def prepare_task_element(task_idx, element_idx, directory, is_array=False): From 1e8fd59692aecaa1ef87048dafc753cd93e03969 Mon Sep 17 00:00:00 2001 From: APlowman Date: Tue, 8 Dec 2020 14:34:32 +0000 Subject: [PATCH 08/29] Add get_task_schemas to API --- matflow/__init__.py | 1 + matflow/api.py | 5 +++++ 2 files changed, 6 insertions(+) diff --git a/matflow/__init__.py b/matflow/__init__.py index e2d3623..7ac5c8f 100644 --- a/matflow/__init__.py +++ b/matflow/__init__.py @@ -8,4 +8,5 @@ append_schema_source, prepend_schema_source, validate, + get_task_schemas, ) diff --git a/matflow/api.py b/matflow/api.py index c8ca1b5..04d7c19 100644 --- a/matflow/api.py +++ b/matflow/api.py @@ -176,3 +176,8 @@ def write_element_directories(iteration_idx, directory): if iteration_idx < workflow.num_iterations: workflow.write_element_directories(iteration_idx) workflow.prepare_iteration(iteration_idx) + + +def get_task_schemas(): + Config.set_config() + return Config.get('task_schemas') From ae7d564468e9c22fb4ac4f67cc9e51699c1101b3 Mon Sep 17 00:00:00 2001 From: APlowman Date: Tue, 8 Dec 2020 14:38:59 +0000 Subject: [PATCH 09/29] Add refresh parameter to Config.set_config --- matflow/config.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/matflow/config.py b/matflow/config.py index 8bdf25f..29ba76f 100644 --- a/matflow/config.py +++ b/matflow/config.py @@ -112,7 +112,7 @@ def get_config_file(config_dir): return config_dat, config_file @staticmethod - def set_config(config_dir=None, raise_on_set=False): + def set_config(config_dir=None, raise_on_set=False, refresh=False): """Load configuration from a YAML file.""" config_dir = Config.resolve_config_dir(config_dir) @@ -120,7 +120,8 @@ def set_config(config_dir=None, raise_on_set=False): if Config._is_set: if raise_on_set: raise ConfigurationError('Configuration is already set.') - return + elif not refresh: + return config_dat, _ = Config.get_config_file(config_dir) schema_sources = [Path(i).expanduser() for i in config_dat['task_schema_sources']] From ea4a6ef6640d3d3ceb861e97bfb2913475272b40 Mon Sep 17 00:00:00 2001 From: APlowman Date: Wed, 9 Dec 2020 17:30:53 +0000 Subject: [PATCH 10/29] Fix failure to raise on invalid schemas --- matflow/models/construction.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/matflow/models/construction.py b/matflow/models/construction.py index bd4f576..6afcf79 100644 --- a/matflow/models/construction.py +++ b/matflow/models/construction.py @@ -667,9 +667,10 @@ def validate_task_dict(task, is_from_file, all_software, all_task_schemas, msg = (f'No matching task schema found for task name "{task["name"]}" with ' f'method "{task["method"]}" and software "{soft_inst.software}".') raise MissingSchemaError(msg) - if not Config.get('schema_validity')[schema_key]: - msg = (f'No matching extension function found for the schema with ' - f'implementation: {soft_inst.software}.') + if not Config.get('schema_validity')[schema_key][0]: + msg = (f'Task schema invalid for task schema 
name "{task["name"]}" with ' + f'method "{task["method"]}" and software "{soft_inst.software}": ' + f'{Config.get("schema_validity")[schema_key][1]}') raise UnsatisfiedSchemaError(msg) # Check any sources required by the main software instance are defined in the From 4c0fb34e0854d44b17cac8fc393cdcf5004a8c1d Mon Sep 17 00:00:00 2001 From: APlowman Date: Wed, 9 Dec 2020 18:13:05 +0000 Subject: [PATCH 11/29] Only save an output map file once --- matflow/models/workflow.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/matflow/models/workflow.py b/matflow/models/workflow.py index 2760b40..c0cfe46 100644 --- a/matflow/models/workflow.py +++ b/matflow/models/workflow.py @@ -1498,6 +1498,7 @@ def process_task_element(self, task_idx, element_idx, is_array=False): out_map_lookup = Config.get('output_maps').get(schema_id) # Run output maps: + file_is_saved = [] for out_map in task.schema.output_map: # Filter only those file paths required for this output: @@ -1507,7 +1508,12 @@ def process_task_element(self, task_idx, element_idx, is_array=False): file_paths.append(out_file_path) # Save generated file as string in workflow: - if i['save'] and out_file_path.is_file(): + if ( + i['save'] and + out_file_path.is_file() and + i['name'] not in file_is_saved + ): + file_is_saved.append(i['name']) with out_file_path.open('r') as handle: file_dat = handle.read() if is_array: From 10da05eec6d482c7d4bc5d1720495a415fdaf44b Mon Sep 17 00:00:00 2001 From: APlowman Date: Wed, 9 Dec 2020 18:15:08 +0000 Subject: [PATCH 12/29] Remove check that output is not an input --- matflow/models/task.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/matflow/models/task.py b/matflow/models/task.py index 0155e9e..7845973 100644 --- a/matflow/models/task.py +++ b/matflow/models/task.py @@ -245,12 +245,6 @@ def _validate_inputs_outputs(self): self.inputs[inp_idx] = inp - # Check the task does not output an input(!): - for i in self.outputs: - if i in self.input_names: - msg = f'Task schema input "{i}" cannot also be an output!' - raise TaskSchemaError(err + msg) - # Check correct keys in supplied input/output maps: for in_map_idx, in_map in enumerate(self.input_map): From 13726ccd977907f184e8017f3db8ef617dd5d634 Mon Sep 17 00:00:00 2001 From: APlowman Date: Wed, 9 Dec 2020 18:27:53 +0000 Subject: [PATCH 13/29] Support inputs as dependencies as well as outputs --- matflow/models/construction.py | 318 +++++++++++++++++++++++---------- matflow/models/element.py | 8 + matflow/models/workflow.py | 8 +- 3 files changed, 231 insertions(+), 103 deletions(-) diff --git a/matflow/models/construction.py b/matflow/models/construction.py index 6afcf79..30b8150 100644 --- a/matflow/models/construction.py +++ b/matflow/models/construction.py @@ -30,11 +30,11 @@ MissingSchemaError, UnsatisfiedSchemaError, MissingSoftwareSourcesError, + TaskParameterError, ) from matflow.utils import (tile, repeat, arange, extend_index_list, flatten_list, to_sub_list, get_specifier_dict) from matflow.models.task import Task, TaskSchema -from matflow.models.element import Element from matflow.models.software import SoftwareInstance @@ -47,7 +47,7 @@ def normalise_local_inputs(base=None, sequences=None, is_from_file=False): sequences : list, optional is_from_file : bool Has this task dict been loaded from a workflow file or is it associated - with a brand new workflow? + with a brand new workflow? 
Returns ------- @@ -61,7 +61,7 @@ def normalise_local_inputs(base=None, sequences=None, is_from_file=False): base dict (non-sequence parameters), this is set to -1. For parameters specified in the sequences list, this is whatever is specified by the user or 0 if not specified. - vals : list + vals : list List of values for this input parameter. This will be a list of length one for input parameters specified within the base dict. @@ -142,26 +142,19 @@ def normalise_local_inputs(base=None, sequences=None, is_from_file=False): return inputs_lst -def get_local_inputs(all_tasks, task_idx, dep_idx, is_from_file): +def get_local_inputs(task, is_from_file): """Combine task base/sequences/repeats to get the locally defined inputs for a task. Parameters ---------- - all_tasks : list of dict - Each dict represents a task. This is passed to allow validation of inputs, since - inputs of this task may be specified as outputs of another task. - task_idx : int - Index of the task in `all_tasks` for which local inputs are to be found. - dep_idx : list of list of int + task : dict + Dict representing a task. is_from_file : bool Has this task dict been loaded from a workflow file or is it associated - with a brand new workflow? + with a brand new workflow? """ - task = all_tasks[task_idx] - task_dep_idx = dep_idx[task_idx] - base = task['base'] num_repeats = task['repeats'] or 1 sequences = task['sequences'] @@ -171,22 +164,21 @@ def get_local_inputs(all_tasks, task_idx, dep_idx, is_from_file): schema = task['schema'] inputs_lst = normalise_local_inputs(base, sequences, is_from_file) + default_inputs_lst = [] + + for input_dict in schema.inputs: + if 'default' in input_dict: + default_inputs_lst.append({ + 'name': input_dict['name'], + 'nest_idx': -1, + 'vals': [input_dict['default']], + }) + defined_inputs = [i['name'] for i in inputs_lst] schema.check_surplus_inputs(defined_inputs) - for dep_idx_i in task_dep_idx: - for output in all_tasks[dep_idx_i]['schema'].outputs: - if output in schema.input_names: - defined_inputs.append(output) - - default_values = schema.validate_inputs(defined_inputs) - for in_name, in_val in default_values.items(): - inputs_lst.append({ - 'name': in_name, - 'nest_idx': -1, - 'vals': [in_val], - }) - + # Find the number of elements associated with this task due to its local input + # sequences: inputs_lst.sort(key=lambda x: x['nest_idx']) if inputs_lst: lengths = [len(i['vals']) for i in inputs_lst] @@ -202,7 +194,8 @@ def get_local_inputs(all_tasks, task_idx, dep_idx, is_from_file): else: total_len = 1 - local_ins = {'inputs': {}} + local_ins = {'inputs': {}, 'default_inputs': {}} + repeats_idx = [0] for idx, input_i in enumerate(inputs_lst): @@ -218,7 +211,6 @@ def get_local_inputs(all_tasks, task_idx, dep_idx, is_from_file): vals_idx = repeat(vals_idx, num_repeats) repeats_idx = tile(arange(num_repeats), total_len) - local_ins['repeats_idx'] = repeats_idx local_ins['inputs'].update({ input_i['name']: { 'vals': input_i['vals'], @@ -228,6 +220,16 @@ def get_local_inputs(all_tasks, task_idx, dep_idx, is_from_file): prev_reps = rep_i prev_tile = tile_i + local_ins['repeats_idx'] = repeats_idx + + for def_input_i in default_inputs_lst: + local_ins['default_inputs'].update({ + def_input_i['name']: { + 'vals': def_input_i['vals'], + 'vals_idx': repeat([0] * total_len, num_repeats), + } + }) + allowed_grp = schema.input_names + ['repeats'] allowed_grp_fmt = ', '.join([f'"{i}"' for i in allowed_grp]) @@ -340,6 +342,93 @@ def get_software_instance(software, run_options, all_software, 
type_label=''): return match +def get_input_dependency(task_info_lst, input_dict, input_task_idx): + + param_name = input_dict['name'] + param_context = input_dict['context'] + param_task = task_info_lst[input_task_idx] + + downstream_task_context = param_task['context'] + downstream_task_name = param_task['name'] + + # Initially collate as a list of dependencies. Then apply some rules to find the + # preferred single dependency. + input_dependency = [] + + for task_idx, task_info in enumerate(task_info_lst): + + if task_idx == input_task_idx: + continue + + upstream_context = task_info['context'] + + # Determine if a dependency is allowed to exist between the given parameter and + # this task, by considering the parameter context, downstream task context and + # upstream task context: + if ( + param_context == upstream_context or ( + (upstream_context == downstream_task_context) and (param_context is None) + ) or ( + upstream_context == '' + ) + ): + # A dependency may exist! The parameter as an output in an upstream task takes + # precedence over the parameter as an input in that same task: + if param_name in task_info['schema'].outputs: + input_dependency.append({ + 'from_task': task_idx, + 'dependency_type': 'output', + 'is_parameter_modifying_task': False, + }) + elif ( + param_name in task_info['schema'].input_names and + param_name in task_info['local_inputs']['inputs'] + ): + input_dependency.append({ + 'from_task': task_idx, + 'dependency_type': 'input', + 'is_parameter_modifying_task': False, + }) + + # Make a note if this task is a parameter-modifying task for this parameter, + # i.e. the parameter is both an input and an output: + if ( + param_name in task_info['schema'].outputs and + param_name in task_info['schema'].input_names + ): + input_dependency[-1].update({'is_parameter_modifying_task': True}) + + # Make sure only a single dependency exists. + + # An "input" dependency_type (meaning this parameter derives from an input in an + # upstream task) must be locally defined in only one task, otherwise the dependency + # is ill-defined: + dep_type_input_count = sum([i['dependency_type'] == 'input' + for i in input_dependency]) + if dep_type_input_count > 1: + msg = ( + f'Task input parameter "{param_name}" from task "{downstream_task_name}" with ' + f'task context "{downstream_task_context}" is found as an input parameter ' + f'for multiple tasks, which makes the task dependency ill-defined.' + ) + raise IncompatibleWorkflow(msg) + + # An "output" dependency_type (meaning this parameter derives from an output in an + # upstream task) must be also be singular, since otherwise the dependency is again + # ill-defined: + dep_type_output_count = sum([i['dependency_type'] == 'output' + for i in input_dependency]) + if dep_type_output_count > 1: + msg = ( + f'Task input parameter "{param_name}" from task "{downstream_task_name}" with ' + f'task context "{downstream_task_context}" is found as an output parameter ' + f'for multiple tasks, which makes the task dependency ill-defined.' + ) + raise IncompatibleWorkflow(msg) + + return input_dependency[0] if input_dependency else None + + def get_dependency_idx(task_info_lst): """Find the dependencies between tasks. 
@@ -349,6 +438,7 @@ def get_dependency_idx(task_info_lst): Each dict must have keys: context : str schema : TaskSchema + local_inputs : dict Returns ------- @@ -373,51 +463,74 @@ def get_dependency_idx(task_info_lst): """ dependency_idx = [] - all_outputs = [] - for task_info in task_info_lst: + for task_idx, task_info in enumerate(task_info_lst): - downstream_context = task_info['context'] - schema_inputs = task_info['schema'].inputs - schema_outputs = task_info['schema'].outputs + task_name, task_context = task_info['name'], task_info['context'] - # List outputs with their corresponding task contexts: - all_outputs.extend([(i, downstream_context) for i in schema_outputs]) + dep_idx_i = { + 'task_dependencies': [], + 'parameter_dependencies': {}, + } # Find which tasks this task depends on: - output_idx = [] - for input_j in schema_inputs: - - param_name = input_j['name'] - param_context = input_j['context'] + task_dep_idx = [] + for input_dict in task_info['schema'].inputs: + + input_name = input_dict['name'] + is_locally_defined = input_name in task_info['local_inputs']['inputs'] + default_defined = input_name in task_info['local_inputs']['default_inputs'] + input_dependency = get_input_dependency(task_info_lst, input_dict, task_idx) + + add_input_dep = False + if is_locally_defined: + + if input_dependency: + in_dep_task = task_info_lst[input_dependency['from_task']] + in_dep_task_name = in_dep_task['name'] + in_dep_task_context = in_dep_task['context'] + msg = ( + f'Input parameter "{input_name}" for task "{task_name}" with ' + f'task context "{task_context}" has both a local value and a ' + f'non-local value. The non-local value is derived from an ' + f'{input_dependency["dependency_type"]} of task ' + f'"{in_dep_task_name}" with task context ' + f'"{in_dep_task_context}". The local value will be used.' + ) + warn(msg) - for task_idx_k, task_info_k in enumerate(task_info_lst): + elif input_dependency: + add_input_dep = True - if param_name not in task_info_k['schema'].outputs: - continue + elif default_defined: + # Move the default value into the local inputs: + task_info_lst[task_idx]['local_inputs']['inputs'][input_name] = ( + copy.deepcopy(task_info['local_inputs']['default_inputs'][input_name]) + ) - upstream_context = task_info_k['context'] - if ( - param_context == upstream_context or ( - (upstream_context == downstream_context) and - (param_context is None) - ) or (upstream_context == '') - ): - output_idx.append(task_idx_k) + else: + msg = (f'Task input "{input_name}" for task "{task_name}" with task ' + f'context "{task_context}" is missing. A value must be specified ' + f'locally, or a default value must be provided in the schema, or ' + f'an additional task should be added to the workflow that ' + f'generates the parameter') + raise TaskParameterError(msg) - dependency_idx.append(list(set(output_idx))) + if add_input_dep: + dep_idx_i['parameter_dependencies'].update({input_name: input_dependency}) + task_dep_idx.append(input_dependency['from_task']) - if len(all_outputs) != len(set(all_outputs)): - msg = 'Multiple tasks in the workflow have the same output and context!' 
- raise IncompatibleWorkflow(msg) + dep_idx_i['task_dependencies'] = list(set(task_dep_idx)) + dependency_idx.append(dep_idx_i) # Check for circular dependencies in task inputs/outputs: all_deps = [] for idx, deps in enumerate(dependency_idx): - for i in deps: + for i in deps['task_dependencies']: all_deps.append(tuple(sorted([idx, i]))) if len(all_deps) != len(set(all_deps)): - msg = 'Workflow tasks are circularly dependent!' + msg = (f'Workflow tasks are circularly dependent! `dependency_idx` is: ' + f'{dependency_idx}') raise IncompatibleWorkflow(msg) return dependency_idx @@ -652,9 +765,6 @@ def validate_task_dict(task, is_from_file, all_software, all_task_schemas, type_label='.process', ) - # print(f'prepare_soft_inst:\n{prepare_soft_inst}') - # print(f'process_soft_inst:\n{process_soft_inst}') - schema_key = (task['name'], task['method'], soft_inst.software) task['software_instance'] = soft_inst @@ -703,6 +813,7 @@ def order_tasks(task_lst): must contain the keys: context : str schema : TaskSchema + output_map_options Returns ------- @@ -716,20 +827,35 @@ def order_tasks(task_lst): """ - dep_idx = get_dependency_idx(task_lst) - for i_idx, i in enumerate(task_lst): out_opts = i['schema'].validate_output_map_options(i['output_map_options']) task_lst[i_idx]['output_map_options'] = out_opts + dep_idx = get_dependency_idx(task_lst) + # Find the index at which each task must be positioned to satisfy input # dependencies, and reorder tasks (and `dep_idx`!): - min_idx = [max(i or [0]) + 1 for i in dep_idx] + min_idx = [max(i['task_dependencies'] or [0]) + 1 for i in dep_idx] task_srt_idx = np.argsort(min_idx) + # Reorder tasks: task_lst_srt = [task_lst[i] for i in task_srt_idx] - dep_idx_srt = [[np.argsort(task_srt_idx)[j] for j in dep_idx[i]] - for i in task_srt_idx] + + # Reorder dep_idx list: + dep_idx_srt = [dep_idx[i] for i in task_srt_idx] + + # Dict whose keys are old task index and values are new task index: + task_srt_idx_map = {old_idx: new_idx for old_idx, new_idx in enumerate(task_srt_idx)} + + # Remap `task_dependencies` and `from_task` in each item of dep_idx: + for idx, dep_idx_i in enumerate(dep_idx_srt): + dep_idx_i['task_dependencies'] = [ + task_srt_idx_map[i] for i in dep_idx_i['task_dependencies'] + ] + for param_name, param_dep_val in dep_idx_i['parameter_dependencies'].items(): + dep_idx_i['parameter_dependencies'][param_name]['from_task'] = ( + task_srt_idx_map[param_dep_val['from_task']] + ) # Add sorted task idx: for idx, i in enumerate(task_lst_srt): @@ -824,8 +950,8 @@ def get_element_idx(task_lst, dep_idx, num_iterations): schema : TaskSchema task_idx : int - dep_idx : list of list of int - List of length equal to `task_lst`, whose elements are integer lists that link a + dep_idx : list of dict + List of length equal to `task_lst`, whose elements are ... TODO. ..integer lists that link a given task to the indices of tasks upon which it depends. 
Returns @@ -843,7 +969,7 @@ def get_element_idx(task_lst, dep_idx, num_iterations): element_idx = [] for idx, downstream_task in enumerate(task_lst): - upstream_tasks = [task_lst[i] for i in dep_idx[idx]] + upstream_tasks = [task_lst[i] for i in dep_idx[idx]['task_dependencies']] # local inputs dict: loc_in = downstream_task['local_inputs'] @@ -883,7 +1009,13 @@ def get_element_idx(task_lst, dep_idx, num_iterations): if input_context is not None: if up_task['context'] != input_context: continue - if input_name in up_task['schema'].outputs: + if ( + input_name in up_task['schema'].outputs or + ( + input_name in up_task['schema'].input_names and + input_name in up_task['local_inputs']['inputs'] + ) + ): group_name_ = group_name if group_name != 'default': group_name_ = 'user_group_' + group_name @@ -1130,15 +1262,12 @@ def get_element_idx(task_lst, dep_idx, num_iterations): return element_idx -def init_local_inputs(task_lst, dep_idx, is_from_file, check_integrity): +def init_local_inputs(task_lst, is_from_file, check_integrity): """Normalise local inputs for each task. Parameters ---------- task_lst : list of dict - - dep_idx : list of list of int - is_from_file : bool Has this task dict been loaded from a workflow file or is it associated with a brand new workflow? @@ -1153,28 +1282,11 @@ def init_local_inputs(task_lst, dep_idx, is_from_file, check_integrity): for task_idx, task in enumerate(task_lst): - local_ins = get_local_inputs(task_lst, task_idx, dep_idx, is_from_file) - - if is_from_file and check_integrity: - - # Don't compare the vals_data_idx: - loaded_local_inputs = copy.deepcopy(task['local_inputs']) - for vals_dict in loaded_local_inputs['inputs'].values(): - del vals_dict['vals_data_idx'] - - if local_ins != loaded_local_inputs: - msg = ( - f'Regenerated local inputs (task: "{task["name"]}") ' - f'are not equivalent to those loaded from the ' - f'workflow file. Stored local inputs are:' - f'\n{task["local_inputs"]}\nRegenerated local ' - f'inputs are:\n{local_ins}\n.' - ) - raise WorkflowPersistenceError(msg) - + if is_from_file: local_ins = task['local_inputs'] - - task_lst[task_idx]['local_inputs'] = local_ins + else: + local_ins = get_local_inputs(task, is_from_file) + task_lst[task_idx]['local_inputs'] = local_ins # Select and set the correct command pathway index according to local inputs: loc_ins_vals = {} @@ -1210,6 +1322,16 @@ def init_local_inputs(task_lst, dep_idx, is_from_file, check_integrity): return task_lst +def validate_inputs(task_lst): + """Validate supplied task inputs.""" + + for task in task_lst: + + schema = task['schema'] + defined_inputs = task['local_inputs']['inputs'].keys() + schema.check_surplus_inputs(defined_inputs) + + def init_tasks(workflow, task_lst, is_from_file, num_iterations, check_integrity=True): """Construct and validate Task objects and the element indices from which to populate task inputs. 
@@ -1252,11 +1374,13 @@ def init_tasks(workflow, task_lst, is_from_file, num_iterations, check_integrity ) ] + # Validate and normalise locally defined inputs: + task_lst = init_local_inputs(task_lst, is_from_file, check_integrity=False) + # Get dependencies, sort and add `task_idx` to each task: task_lst, dep_idx = order_tasks(task_lst) - # Validate and normalise locally defined inputs: - task_lst = init_local_inputs(task_lst, dep_idx, is_from_file, check_integrity) + validate_inputs(task_lst) # Find element indices that determine the elements from which task inputs are drawn: element_idx = get_element_idx(task_lst, dep_idx, num_iterations) diff --git a/matflow/models/element.py b/matflow/models/element.py index dcdb632..f9408e9 100644 --- a/matflow/models/element.py +++ b/matflow/models/element.py @@ -50,6 +50,14 @@ def as_dict(self): self_dict['files_data_idx'] = self_dict.pop('files').as_dict() return self_dict + def get_parameter_data_idx(self, parameter_name): + try: + out = self.outputs.get_data_idx(parameter_name) + except KeyError: + out = self.inputs.get_data_idx(parameter_name) + + return out + def get_input_data_idx(self, input_name): return self.inputs.get_data_idx(input_name) diff --git a/matflow/models/workflow.py b/matflow/models/workflow.py index c0cfe46..49d86e8 100644 --- a/matflow/models/workflow.py +++ b/matflow/models/workflow.py @@ -1262,7 +1262,8 @@ def prepare_task_element(self, task_idx, element_idx, is_array=False): data_idx = [] for i in inputs_idx['element_idx'][element_idx]: src_element = self.tasks[ins_task_idx].elements[i] - data_idx.append(src_element.get_output_data_idx(input_name)) + param_data_idx = src_element.get_parameter_data_idx(input_name) + data_idx.append(param_data_idx) if inputs_idx['group'] == 'default': data_idx = data_idx[0] @@ -1684,10 +1685,8 @@ def get_element_data_map(file_path): for task_group in handle[tasks_path].values(): element_path = task_group.name + "/'elements'/data" - # print(f'task_group: {task_group}') element_dicts = [] for elem_group in handle[element_path].values(): - # print(f'elem_group: {elem_group}') params_paths = { 'inputs': elem_group.name + "/'inputs_data_idx'/data", 'outputs': elem_group.name + "/'outputs_data_idx'/data", @@ -1702,9 +1701,6 @@ def get_element_data_map(file_path): elem_idx = elem_idx.tolist() else: elem_idx = [elem_idx] - # print(f'param_name: {param_name}') - # print(f'param_type: {param_type}') - # print(f'elem_idx: type: {type(elem_idx)} {elem_idx}') params_dict[param_type].update( {param_name: [idx_map[i] for i in elem_idx]} ) From 59eb2a2f8957d21c856983708246409404e38547 Mon Sep 17 00:00:00 2001 From: APlowman Date: Wed, 9 Dec 2020 20:16:41 +0000 Subject: [PATCH 14/29] Support "parameter modifying" tasks --- matflow/models/construction.py | 31 +++++++++---------------------- 1 file changed, 9 insertions(+), 22 deletions(-) diff --git a/matflow/models/construction.py b/matflow/models/construction.py index 30b8150..108dacb 100644 --- a/matflow/models/construction.py +++ b/matflow/models/construction.py @@ -398,31 +398,18 @@ def get_input_dependency(task_info_lst, input_dict, input_task_idx): ): input_dependency[-1].update({'is_parameter_modifying_task': True}) - # Make sure only a single dependency exists. 
+ if len(input_dependency) > 1: + # Only keep "parameter modifying tasks", because such task will be dependent on + # the "parameter generating" tasks: + input_dependency = [i for i in input_dependency + if i['is_parameter_modifying_task']] - # An "input" dependency_type (meaning this parameter derives from an input in an - # upstream task) must be locally defined in only one task, otherwise the dependency - # is ill-defined: - dep_type_input_count = sum([i['dependency_type'] == 'input' - for i in input_dependency]) - if dep_type_input_count > 1: - msg = ( - f'Task input parameter "{param_name}" from task "{downstream_task_name}" with ' - f'task context "{downstream_task_context}" is found as an input parameter ' - f'for multiple tasks, which makes the task dependency ill-defined.' - ) - raise IncompatibleWorkflow(msg) - - # An "output" dependency_type (meaning this parameter derives from an output in an - # upstream task) must be also be singular, since otherwise the dependency is again - # ill-defined: - dep_type_output_count = sum([i['dependency_type'] == 'output' - for i in input_dependency]) - if dep_type_output_count > 1: + # Make sure only a single dependency exists. + if len(input_dependency) > 1: msg = ( f'Task input parameter "{param_name}" from task "{downstream_task_name}" with ' - f'task context "{downstream_task_context}" is found as an output parameter ' - f'for multiple tasks, which makes the task dependency ill-defined.' + f'task context "{downstream_task_context}" is found as an input/output ' + f'parameter for multiple tasks, which makes the task dependency ill-defined.' ) raise IncompatibleWorkflow(msg) From a343f4a21a062ac1e33c097ddec90b642c390e2b Mon Sep 17 00:00:00 2001 From: APlowman Date: Fri, 11 Dec 2020 22:28:00 +0000 Subject: [PATCH 15/29] Redo task sorting algorithm; fix #40 --- matflow/models/construction.py | 100 ++++++++++++++++++++++++--------- matflow/utils.py | 40 +++++++++++++ 2 files changed, 114 insertions(+), 26 deletions(-) diff --git a/matflow/models/construction.py b/matflow/models/construction.py index 108dacb..e8c1926 100644 --- a/matflow/models/construction.py +++ b/matflow/models/construction.py @@ -33,7 +33,7 @@ TaskParameterError, ) from matflow.utils import (tile, repeat, arange, extend_index_list, flatten_list, - to_sub_list, get_specifier_dict) + to_sub_list, get_specifier_dict, move_element_forward) from matflow.models.task import Task, TaskSchema from matflow.models.software import SoftwareInstance @@ -413,7 +413,7 @@ def get_input_dependency(task_info_lst, input_dict, input_task_idx): ) raise IncompatibleWorkflow(msg) - return input_dependency[0] if input_dependency else None + return input_dependency[0] if input_dependency else {} def get_dependency_idx(task_info_lst): @@ -455,6 +455,10 @@ def get_dependency_idx(task_info_lst): task_name, task_context = task_info['name'], task_info['context'] dep_idx_i = { + 'task_name': task_name, + 'task_context': task_context, + 'original_idx': task_idx, + 'current_idx': task_idx, 'task_dependencies': [], 'parameter_dependencies': {}, } @@ -523,6 +527,71 @@ def get_dependency_idx(task_info_lst): return dependency_idx +def find_good_task_dependency_position(dep_idx, task_dependencies): + seen_ids = [] + for position, dep_idx_i in enumerate(dep_idx): + seen_ids.append(dep_idx_i['current_idx']) + if all([i in seen_ids for i in task_dependencies]): + return position + raise RuntimeError('No good position exists!') + + +def sort_dependency_idx(dependency_idx): + + dep_idx = copy.deepcopy(dependency_idx) 
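+    # Strategy: scan the list in order; whenever a task appears before one of its
+    # dependencies, move it forward to the first position at which all of its
+    # dependencies have already been seen, then restart the scan from the start.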
+ + # Maximum number of iterations should be that required to completely reverse the + # order, where each item in the list is dependent on only the next task in the list + # (except the final task): + N = len(dep_idx) + MAX_ITER = int((N**2 - N + 2) / 2) + + count = 0 + while True: + + count += 1 + if count > MAX_ITER: + raise RuntimeError(f'Could not sort dependency index in {MAX_ITER} ' + f'iterations!') + + repositioned = False + seen_ids = [] + for idx_i, dep_idx_i in enumerate(dep_idx): + + needs_reposition = any([i not in seen_ids + for i in dep_idx_i['task_dependencies']]) + if needs_reposition: + + good_position = find_good_task_dependency_position( + dep_idx, + dep_idx_i['task_dependencies'], + ) + dep_idx, remap_idx = move_element_forward(dep_idx, idx_i, good_position) + repositioned = True + + # Update dependencies: + for idx_j, dep_idx_j in enumerate(dep_idx): + new_task_deps = [remap_idx[i] for i in dep_idx_j['task_dependencies']] + dep_idx[idx_j]['task_dependencies'] = new_task_deps + dep_idx[idx_j]['current_idx'] = idx_j + for p_name, p_dep in dep_idx_j['parameter_dependencies'].items(): + new_task_dep = remap_idx[p_dep['from_task']] + dep_idx[idx_j]['parameter_dependencies'][p_name]['from_task'] = ( + new_task_dep + ) + + # After repositioning, restart ordering from the beginning: + break + + seen_ids.append(dep_idx_i['current_idx']) + + if not repositioned: + # No repositions were required for any task, our job is done: + break + + return dep_idx + + def validate_run_options(run_opts, type_label=''): # SGE specific: @@ -819,30 +888,9 @@ def order_tasks(task_lst): task_lst[i_idx]['output_map_options'] = out_opts dep_idx = get_dependency_idx(task_lst) - - # Find the index at which each task must be positioned to satisfy input - # dependencies, and reorder tasks (and `dep_idx`!): - min_idx = [max(i['task_dependencies'] or [0]) + 1 for i in dep_idx] - task_srt_idx = np.argsort(min_idx) - - # Reorder tasks: - task_lst_srt = [task_lst[i] for i in task_srt_idx] - - # Reorder dep_idx list: - dep_idx_srt = [dep_idx[i] for i in task_srt_idx] - - # Dict whose keys are old task index and values are new task index: - task_srt_idx_map = {old_idx: new_idx for old_idx, new_idx in enumerate(task_srt_idx)} - - # Remap `task_dependencies` and `from_task` in each item of dep_idx: - for idx, dep_idx_i in enumerate(dep_idx_srt): - dep_idx_i['task_dependencies'] = [ - task_srt_idx_map[i] for i in dep_idx_i['task_dependencies'] - ] - for param_name, param_dep_val in dep_idx_i['parameter_dependencies'].items(): - dep_idx_i['parameter_dependencies'][param_name]['from_task'] = ( - task_srt_idx_map[param_dep_val['from_task']] - ) + dep_idx_srt = sort_dependency_idx(dep_idx) + sort_idx = [i['original_idx'] for i in dep_idx_srt] + task_lst_srt = [task_lst[i] for i in sort_idx] # Add sorted task idx: for idx, i in enumerate(task_lst_srt): diff --git a/matflow/utils.py b/matflow/utils.py index 24dc780..a760287 100644 --- a/matflow/utils.py +++ b/matflow/utils.py @@ -433,3 +433,43 @@ def nested_dict_arrays_to_list(obj): for key, val in obj.items(): obj[key] = nested_dict_arrays_to_list(val) return obj + + +def move_element_forward(lst, index, position, return_map=True): + """Move a list element forward in the list to a new index position.""" + + if index > position: + raise ValueError('`index` cannot be larger than `position`, since that would ' + 'not be a "forward" move!') + + if position > len(lst) - 1: + raise ValueError('`position` must be a valid list index.') + + sub_list_1 = lst[:position + 1] + 
sub_list_2 = lst[position + 1:]
+    elem = sub_list_1.pop(index)
+    out = sub_list_1 + [elem] + sub_list_2
+
+    # Indices to the left of the element that is to be moved do not change:
+    idx_map_left = {i: i for i in range(0, index)}
+
+    # The index of the moved element changes to `position`:
+    idx_map_element = {index: position}
+
+    # Indices to the right of the element up to the new position are decremented:
+    idx_map_middle = {i: i - 1 for i in range(index + 1, position + 1)}
+
+    # Indices to the right of the new position do not change:
+    idx_map_right = {i: i for i in range(position + 1, len(lst))}
+
+    idx_map = {
+        **idx_map_left,
+        **idx_map_element,
+        **idx_map_middle,
+        **idx_map_right
+    }
+
+    if return_map:
+        return out, idx_map
+    else:
+        return out

From 94f147122a3e75410508ca6b7e19c9cc9ed8c7b6 Mon Sep 17 00:00:00 2001
From: APlowman
Date: Fri, 11 Dec 2020 23:05:14 +0000
Subject: [PATCH 16/29] Improve raise message

---
 matflow/models/construction.py | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/matflow/models/construction.py b/matflow/models/construction.py
index e8c1926..1fe96ba 100644
--- a/matflow/models/construction.py
+++ b/matflow/models/construction.py
@@ -500,10 +500,12 @@ def get_dependency_idx(task_info_lst):
 
             else:
                 msg = (f'Task input "{input_name}" for task "{task_name}" with task '
-                       f'context "{task_context}" is missing. A value must be specified '
-                       f'locally, or a default value must be provided in the schema, or '
-                       f'an additional task should be added to the workflow that '
-                       f'generates the parameter')
+                       f'context "{task_context}" appears to be missing. A value must be '
+                       f'specified locally, or a default value must be provided in the '
+                       f'schema, or an additional task should be added to the workflow '
+                       f'that generates the parameter. If such a task already exists, '
+                       f'try reordering the tasks in the order in which you would expect '
+                       f'them to run.')
                 raise TaskParameterError(msg)
 
             if add_input_dep:

From 602062975ee0aac0fa48433b43f3a073344e6307 Mon Sep 17 00:00:00 2001
From: APlowman
Date: Sun, 13 Dec 2020 16:36:23 +0000
Subject: [PATCH 17/29] Normalise keys in element_idx inputs dict

---
 matflow/models/construction.py | 186 ++++++++++++++++++++------------
 matflow/models/workflow.py     |  17 +--
 2 files changed, 126 insertions(+), 77 deletions(-)

diff --git a/matflow/models/construction.py b/matflow/models/construction.py
index 1fe96ba..0a60aa9 100644
--- a/matflow/models/construction.py
+++ b/matflow/models/construction.py
@@ -468,6 +468,7 @@ def get_dependency_idx(task_info_lst):
         for input_dict in task_info['schema'].inputs:
 
             input_name = input_dict['name']
+            input_alias = input_dict['alias']
             is_locally_defined = input_name in task_info['local_inputs']['inputs']
             default_defined = input_name in task_info['local_inputs']['default_inputs']
             input_dependency = get_input_dependency(task_info_lst, input_dict, task_idx)
@@ -509,7 +510,9 @@ def get_dependency_idx(task_info_lst):
                 raise TaskParameterError(msg)
 
             if add_input_dep:
-                dep_idx_i['parameter_dependencies'].update({input_name: input_dependency})
+                dep_idx_i['parameter_dependencies'].update({
+                    input_alias: input_dependency
+                })
                 task_dep_idx.append(input_dependency['from_task'])
 
         dep_idx_i['task_dependencies'] = list(set(task_dep_idx))
@@ -969,6 +972,58 @@ def resolve_group(group, local_inputs, repeats_idx):
     return group_resolved
 
 
+def get_input_groups(task_idx, task_lst, dependency_idx, element_idx):
+    """Get the (local inputs) group dict for each non-local input of a task.
+
+    Parameters
+    ----------
+    task_idx : int
+    task_lst : list of dict
+    dependency_idx : list of dict
+    element_idx : list of dict
+
+    Returns
+    -------
+    input_groups : dict of (str : dict)
+        Keyed by input alias.
+
+    """
+
+    task = task_lst[task_idx]
+    local_input_names = list(task['local_inputs']['inputs'].keys())
+    non_local_inputs = [i for i in task['schema'].inputs
+                        if i['name'] not in local_input_names]
+
+    input_groups = {}
+    for non_local_input_i in non_local_inputs:
+
+        input_alias = non_local_input_i['alias']
+        input_name = non_local_input_i['name']
+        group_name = task['schema'].get_input_by_alias(input_alias)['group']
+
+        task_param_deps = dependency_idx[task_idx]['parameter_dependencies']
+        input_task_idx = task_param_deps[input_alias]['from_task']
+        input_task = task_lst[input_task_idx]
+
+        if group_name == 'default':
+            group_name_ = group_name
+        else:
+            group_name_ = 'user_group_' + group_name
+
+        group_dict = element_idx[input_task_idx]['groups']
+        group_names_fmt = ', '.join([f'"{i}"' for i in group_dict.keys()])
+        group_dat = group_dict.get(group_name_)
+
+        if group_dat is None:
+            msg = (f'No group "{group_name}" defined in the workflow for '
+                   f'input "{input_name}". Defined groups are: '
+                   f'{group_names_fmt}.')
+            raise UnsatisfiedGroupParameter(msg)
+
+        input_groups.update({
+            input_alias: {
+                **group_dat,
+                'group_name': group_name,
+                'task_idx': input_task_idx,
+                'task_name': input_task['name'],
+            }
+        })
+
+    return input_groups
+
+
 def get_element_idx(task_lst, dep_idx, num_iterations):
     """For each task, find the element indices that determine the elements to be used
     (i.e from upstream tasks) to populate task inputs.
@@ -1025,56 +1080,24 @@ def get_element_idx(task_lst, dep_idx, num_iterations):
                 'num_elements_per_iteration': loc_in['length'],
                 'num_iterations': num_iterations,
                 'groups': groups,
-                'inputs': {i: {'input_idx': input_idx} for i in loc_in['inputs']},
+                'inputs': {
+                    i:
+                    {
+                        'local_input_idx': input_idx,
+                        'task_idx': [None] * len(input_idx),
+                        'element_idx': [None] * len(input_idx),
+                        'group': [None] * len(input_idx),
+                    }
+                    for i in loc_in['inputs']
+                },
                 'iteration_idx': [0] * loc_in['length'],
            }
 
        else:
            # This task depends on other tasks.
 
-            ins_local = list(loc_in['inputs'].keys())
-            ins_non_local = [i for i in schema.inputs if i['name'] not in ins_local]
-
-            # Get the (local inputs) group dict for each `ins_non_local` (from upstream
-            # tasks):
-            input_groups = {}
-            for non_loc_inp in ins_non_local:
-                input_alias = non_loc_inp['alias']
-                input_name = non_loc_inp['name']
-                input_context = non_loc_inp['context']
-                group_name = schema.get_input_by_alias(input_alias)['group']
-                for up_task in upstream_tasks:
-                    if input_context is not None:
-                        if up_task['context'] != input_context:
-                            continue
-                    if (
-                        input_name in up_task['schema'].outputs or
-                        (
-                            input_name in up_task['schema'].input_names and
-                            input_name in up_task['local_inputs']['inputs']
-                        )
-                    ):
-                        group_name_ = group_name
-                        if group_name != 'default':
-                            group_name_ = 'user_group_' + group_name
-                        group_dict = element_idx[up_task['task_idx']]['groups']
-                        group_names_fmt = ', '.join([f'"{i}"' for i in group_dict.keys()])
-                        group_dat = group_dict.get(group_name_)
-                        if group_dat is None:
-                            msg = (f'No group "{group_name}" defined in the workflow for '
-                                   f'input "{input_name}". 
Defined groups are: ' - f'{group_names_fmt}.') - raise UnsatisfiedGroupParameter(msg) - input_groups.update({ - input_alias: { - **group_dat, - 'group_name': group_name, - 'task_idx': up_task['task_idx'], - 'task_name': up_task['name'], - } - }) - break - + input_groups = get_input_groups(idx, task_lst, dep_idx, element_idx) is_nesting_mixed = len(set([i['nest'] for i in input_groups.values()])) > 1 + for input_alias, group_info in input_groups.items(): if group_info['merge_priority'] is None and is_nesting_mixed: @@ -1125,7 +1148,14 @@ def get_element_idx(task_lst, dep_idx, num_iterations): for i in loc_in['inputs']: inp_alias = [j['alias'] for j in schema.inputs if j['name'] == i][0] - ins_dict.update({inp_alias: {'input_idx': input_idx}}) + ins_dict.update({ + inp_alias: { + 'local_input_idx': input_idx, + 'task_idx': [None] * len(input_idx), + 'element_idx': [None] * len(input_idx), + 'group': [None] * len(input_idx), + } + }) for group_name, group in loc_in['groups'].items(): group = resolve_group(group, loc_in['inputs'], repeats_idx) @@ -1134,11 +1164,13 @@ def get_element_idx(task_lst, dep_idx, num_iterations): else: existing_size = incoming_size repeats_idx = [None] * existing_size + element_idx_i = in_group['group_element_idx_per_iteration'] ins_dict.update({ input_alias: { - 'task_idx': in_group['task_idx'], - 'group': in_group['group_name'], - 'element_idx': in_group['group_element_idx_per_iteration'], + 'local_input_idx': [None] * len(element_idx_i), + 'task_idx': [in_group['task_idx']] * len(element_idx_i), + 'element_idx': element_idx_i, + 'group': [in_group['group_name']] * len(element_idx_i), } }) continue @@ -1147,22 +1179,20 @@ def get_element_idx(task_lst, dep_idx, num_iterations): # Repeat existing: for k, v in ins_dict.items(): - if 'task_idx' in v: - elems_idx = repeat(v['element_idx'], incoming_size) - ins_dict[k]['element_idx'] = elems_idx - else: - input_idx = repeat(ins_dict[k]['input_idx'], incoming_size) - ins_dict[k]['input_idx'] = input_idx + for k2, v2 in v.items(): + ins_dict[k][k2] = repeat(v2, incoming_size) # Tile incoming: + element_idx_i = tile( + in_group['group_element_idx_per_iteration'], + existing_size, + ) ins_dict.update({ input_alias: { - 'task_idx': in_group['task_idx'], - 'group': in_group['group_name'], - 'element_idx': tile( - in_group['group_element_idx_per_iteration'], - existing_size - ), + 'local_input_idx': [None] * len(element_idx_i), + 'task_idx': [in_group['task_idx']] * len(element_idx_i), + 'group': [in_group['group_name']] * len(element_idx_i), + 'element_idx': element_idx_i, } }) @@ -1199,11 +1229,13 @@ def get_element_idx(task_lst, dep_idx, num_iterations): ) raise IncompatibleTaskNesting(msg) + element_idx_i = in_group['group_element_idx_per_iteration'] ins_dict.update({ input_alias: { - 'task_idx': in_group['task_idx'], - 'group': in_group['group_name'], - 'element_idx': in_group['group_element_idx_per_iteration'], + 'local_input_idx': [None] * len(element_idx_i), + 'task_idx': [in_group['task_idx']] * len(element_idx_i), + 'group': [in_group['group_name']] * len(element_idx_i), + 'element_idx': element_idx_i, } }) @@ -1262,9 +1294,10 @@ def get_element_idx(task_lst, dep_idx, num_iterations): # Add iterations: for idx, elem_idx_i in enumerate(element_idx): + num_iterations_i = elem_idx_i['num_iterations'] new_iter_idx = [ iter_idx - for iter_idx in range(elem_idx_i['num_iterations']) + for iter_idx in range(num_iterations_i) for _ in range(elem_idx_i['num_elements_per_iteration']) ] element_idx[idx]['iteration_idx'] = 
new_iter_idx @@ -1272,7 +1305,7 @@ def get_element_idx(task_lst, dep_idx, num_iterations): for input_alias, inputs_idx in elem_idx_i['inputs'].items(): - ins_task_idx = inputs_idx.get('task_idx') + ins_task_idx = inputs_idx['task_idx'][0] if ins_task_idx is not None: # For now, assume non-local inputs are parametrised from the same @@ -1283,18 +1316,31 @@ def get_element_idx(task_lst, dep_idx, num_iterations): ] new_elems_idx = [] - for iter_idx in range(elem_idx_i['num_iterations']): + new_task_idx = [] + new_group = [] + new_loc_ins_idx = [] + + for iter_idx in range(num_iterations_i): for i in inputs_idx['element_idx']: new_elems_idx.append( [j + (iter_idx * num_elems_per_iter) for j in i] ) - element_idx[idx]['inputs'][input_alias]['element_idx'] = new_elems_idx + new_task_idx.extend([ins_task_idx] * len(new_iter_idx)) + new_group.extend([inputs_idx['group'][0]] * len(new_iter_idx)) + new_loc_ins_idx.extend([None] * len(new_iter_idx)) else: # Copy local inputs' `inputs_idx` (local inputs must be the same for # all iterations): - new_ins_idx = tile(inputs_idx['input_idx'], elem_idx_i['num_iterations']) - element_idx[idx]['inputs'][input_alias]['input_idx'] = new_ins_idx + new_loc_ins_idx = tile(inputs_idx['local_input_idx'], num_iterations_i) + new_task_idx = tile(inputs_idx['task_idx'], num_iterations_i) + new_elems_idx = tile(inputs_idx['element_idx'], num_iterations_i) + new_group = tile(inputs_idx['group'], num_iterations_i) + + element_idx[idx]['inputs'][input_alias]['local_input_idx'] = new_loc_ins_idx + element_idx[idx]['inputs'][input_alias]['task_idx'] = new_task_idx + element_idx[idx]['inputs'][input_alias]['group'] = new_group + element_idx[idx]['inputs'][input_alias]['element_idx'] = new_elems_idx return element_idx diff --git a/matflow/models/workflow.py b/matflow/models/workflow.py index 49d86e8..782a871 100644 --- a/matflow/models/workflow.py +++ b/matflow/models/workflow.py @@ -473,7 +473,7 @@ def write_element_directories(self, iteration_idx): for elem_idx_i in iter_elem_idx: - file_path = local_ins[inputs_idx['input_idx'][elem_idx_i]] + file_path = local_ins[inputs_idx['local_input_idx'][elem_idx_i]] file_path_full = self.stage_directory.joinpath(file_path) elem_path = self.get_element_path(task_idx, elem_idx_i) dst_path = elem_path.joinpath(file_path_full.name) @@ -606,8 +606,8 @@ def _get_command_line_inputs(self, task_idx): # Expand values for inter-task nesting: values_fmt_all = [ - values_fmt[i] - for i in elems_idx['inputs'][local_in_name]['input_idx'] + values_fmt[i] if i is not None else None + for i in elems_idx['inputs'][local_in_name]['local_input_idx'] ] cmd_line_inputs.update({local_in_name: values_fmt_all}) @@ -1256,22 +1256,25 @@ def prepare_task_element(self, task_idx, element_idx, is_array=False): input_name = [i['name'] for i in task.schema.inputs if i['alias'] == input_alias][0] - if ins_task_idx is not None: + if ins_task_idx[element_idx] is not None: # Input values sourced from previous task outputs: data_idx = [] for i in inputs_idx['element_idx'][element_idx]: - src_element = self.tasks[ins_task_idx].elements[i] + src_element = self.tasks[ins_task_idx[element_idx]].elements[i] param_data_idx = src_element.get_parameter_data_idx(input_name) data_idx.append(param_data_idx) - if inputs_idx['group'] == 'default': + if inputs_idx['group'][element_idx] == 'default': data_idx = data_idx[0] else: # Input values sourced from `local_inputs` of this task: local_data_idx = task.local_inputs['inputs'][input_name]['vals_data_idx'] - all_data_idx = 
[local_data_idx[i] for i in inputs_idx['input_idx']] + all_data_idx = [ + (local_data_idx[i] if i is not None else None) + for i in inputs_idx['local_input_idx'] + ] data_idx = all_data_idx[element_idx] if is_array: From 0f3756e8cf96fa6ea285019c776f175972585822 Mon Sep 17 00:00:00 2001 From: APlowman Date: Sun, 13 Dec 2020 17:47:17 +0000 Subject: [PATCH 18/29] Add iterate run options to Workflow --- matflow/api.py | 7 +++++++ matflow/config.py | 4 ++++ matflow/models/workflow.py | 11 +++++++++-- 3 files changed, 20 insertions(+), 2 deletions(-) diff --git a/matflow/api.py b/matflow/api.py index 04d7c19..90c03b6 100644 --- a/matflow/api.py +++ b/matflow/api.py @@ -45,6 +45,13 @@ def make_workflow(profile_path, directory=None, write_dirs=True): profile_str = handle.read() profile = {'file': profile_str, 'parsed': copy.deepcopy(workflow_dict)} + + iterate_run_opts = { + **Config.get('default_sticky_iterate_run_options'), + **Config.get('default_iterate_run_options'), + } + workflow_dict.update({'iterate_run_options': iterate_run_opts}) + workflow = Workflow(**workflow_dict, stage_directory=directory, profile=profile) workflow.set_ids() diff --git a/matflow/config.py b/matflow/config.py index 29ba76f..dbb1cd8 100644 --- a/matflow/config.py +++ b/matflow/config.py @@ -18,9 +18,11 @@ class Config(object): 'default_run_options', 'default_preparation_run_options', 'default_processing_run_options', + 'default_iterate_run_options', 'default_sticky_run_options', 'default_sticky_preparation_run_options', 'default_sticky_processing_run_options', + 'default_sticky_iterate_run_options', 'parallel_modes', 'archive_locations', ] @@ -230,9 +232,11 @@ def set_config(config_dir=None, raise_on_set=False, refresh=False): 'default_run_options', 'default_preparation_run_options', 'default_processing_run_options', + 'default_iterate_run_options', 'default_sticky_run_options', 'default_sticky_preparation_run_options', 'default_sticky_processing_run_options', + 'default_sticky_iterate_run_options', ]: Config.__conf[i] = config_dat.get(i, {}) diff --git a/matflow/models/workflow.py b/matflow/models/workflow.py index 782a871..a0186e5 100644 --- a/matflow/models/workflow.py +++ b/matflow/models/workflow.py @@ -92,11 +92,13 @@ class Workflow(object): '_figures', '_metadata', '_num_iterations', + '_iterate_run_options', ] def __init__(self, name, tasks, stage_directory=None, extends=None, archive=None, archive_excludes=None, figures=None, metadata=None, num_iterations=None, - check_integrity=True, profile=None, __is_from_file=False): + iterate_run_options=None, check_integrity=True, profile=None, + __is_from_file=False): self._id = None # Assigned once by set_ids() self._human_id = None # Assigned once by set_ids() @@ -114,6 +116,7 @@ def __init__(self, name, tasks, stage_directory=None, extends=None, archive=None ] if figures else [] self._metadata = metadata or {} self._num_iterations = num_iterations or 1 + self._iterate_run_options = iterate_run_options or {} tasks, elements_idx, dep_idx = init_tasks( self, @@ -289,6 +292,10 @@ def metadata(self): def num_iterations(self): return self._num_iterations + @property + def iterate_run_options(self): + return self._iterate_run_options + @property def elements_idx(self): return self._elements_idx @@ -838,7 +845,7 @@ def get_hpcflow_workflow(self): 'commands': ('matflow write-element-directories ' '--iteration-idx=$(($ITER_IDX+1))'), 'stats': False, - 'scheduler_options': {}, + 'scheduler_options': self.iterate_run_options, 'name': 'iterate', }) From 
80b46c68ad466eaf3cf4c173129ed1a25b8d4d25 Mon Sep 17 00:00:00 2001
From: APlowman
Date: Sun, 13 Dec 2020 21:15:42 +0000
Subject: [PATCH 19/29] Add methods for finding element dependency vals

---
 matflow/errors.py          |   4 +
 matflow/models/element.py  | 183 ++++++++++++++++++++++++++++++++++++
 matflow/models/workflow.py | 188 +++++++++++++++++++++++++++++++++++++
 3 files changed, 375 insertions(+)

diff --git a/matflow/errors.py b/matflow/errors.py
index 5e72281..725ac42 100644
--- a/matflow/errors.py
+++ b/matflow/errors.py
@@ -81,3 +81,7 @@ class UnexpectedSourceMapReturnError(Exception):
 
 class CommandError(Exception):
     """For problems with command groups and commands."""
+
+
+class WorkflowIterationError(Exception):
+    """For issues with resolving requested iterations."""
diff --git a/matflow/models/element.py b/matflow/models/element.py
index f9408e9..d118b5e 100644
--- a/matflow/models/element.py
+++ b/matflow/models/element.py
@@ -1,5 +1,7 @@
 """matflow.models.element.py"""
 
+import copy
+
 from matflow.models.parameters import Parameters
 
 
@@ -100,3 +102,184 @@ def add_output(self, output_name, value=None, data_idx=None):
 
     def add_file(self, file_name, value=None, data_idx=None):
         return self.files.add_parameter(file_name, 'files', value, data_idx)
+
+    def get_element_dependencies(self, recurse=False):
+        """Get the task/element indices of elements that a given element depends on.
+
+        Parameters
+        ----------
+        recurse : bool, optional
+            If False, only include task/element indices that are direct dependencies of
+            the given element. If True, also include task/element indices that are
+            indirect dependencies of the given element.
+
+        Returns
+        -------
+        dict of (int : list)
+            Dict whose keys are task indices and whose values are lists of element indices
+            for a given task.
+
+        Notes
+        -----
+        For the inverse, see `get_dependent_elements`.
+
+        """
+
+        task = self.task
+        workflow = task.workflow
+        elem_deps = {}
+        for inp_alias, ins in workflow.elements_idx[task.task_idx]['inputs'].items():
+            if ins['task_idx'][self.element_idx] is not None:
+                dep_elem_idx = ins['element_idx'][self.element_idx]
+                # (maybe not needed)
+                if ins['task_idx'][self.element_idx] not in elem_deps:
+                    elem_deps.update({ins['task_idx'][self.element_idx]: []})
+                elem_deps[ins['task_idx'][self.element_idx]].extend(dep_elem_idx)
+
+        if recurse:
+            new_elem_deps = copy.deepcopy(elem_deps)
+            for task_idx, element_idx in elem_deps.items():
+                for element_idx_i in element_idx:
+                    element_i = workflow.tasks[task_idx].elements[element_idx_i]
+                    add_elem_deps = element_i.get_element_dependencies(recurse=True)
+                    for k, v in add_elem_deps.items():
+                        if k not in new_elem_deps:
+                            new_elem_deps.update({k: []})
+                        new_elem_deps[k].extend(v)
+
+            elem_deps = new_elem_deps
+
+        # Remove repeats:
+        for k, v in elem_deps.items():
+            elem_deps[k] = list(set(v))
+
+        return elem_deps
+
+    def get_dependent_elements(self, recurse=False):
+        """Get the task/element indices of elements that depend on a given element.
+
+        Parameters
+        ----------
+        recurse : bool, optional
+            If False, only include task/element indices that depend directly on the given
+            element. If True, also include task/element indices that depend indirectly on
+            the given element.
+
+        Returns
+        -------
+        dict of (int : list)
+            Dict whose keys are task indices and whose values are lists of element indices
+            for a given task.
+
+        Notes
+        -----
+        For the inverse, see `get_element_dependencies`.
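+
+        Examples
+        --------
+        A sketch only; actual task and element indices depend on the workflow.
+        Here the given element feeds elements 0 and 1 of task 2:
+
+        >>> element.get_dependent_elements()
+        {2: [0, 1]}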
+ + """ + + task = self.task + workflow = task.workflow + dep_elems = {} + + for task_idx, elems_idx in enumerate(workflow.elements_idx): + for inp_alias, ins in elems_idx['inputs'].items(): + if ins.get('task_idx') == task.task_idx: + for element_idx, i in enumerate(ins['element_idx']): + if self.element_idx in i: + if task_idx not in dep_elems: + dep_elems.update({task_idx: []}) + dep_elems[task_idx].append(element_idx) + + if recurse: + new_dep_elems = copy.deepcopy(dep_elems) + for task_idx, element_idx in dep_elems.items(): + for element_idx_i in element_idx: + element_i = workflow.tasks[task_idx].elements[element_idx_i] + add_elem_deps = element_i.get_dependent_elements(recurse=True) + for k, v in add_elem_deps.items(): + if k not in new_dep_elems: + new_dep_elems.update({k: []}) + new_dep_elems[k].extend(v) + + dep_elems = new_dep_elems + + # Remove repeats: + for k, v in dep_elems.items(): + dep_elems[k] = list(set(v)) + + return dep_elems + + def get_parameter_dependency_value(self, parameter_dependency_name): + + workflow = self.task.workflow + + in_tasks = workflow.get_input_tasks(parameter_dependency_name) + out_tasks = workflow.get_output_tasks(parameter_dependency_name) + elem_deps = self.get_element_dependencies(recurse=True) + + if parameter_dependency_name in self.task.schema.input_names: + param_vals = [self.get_input(parameter_dependency_name)] + + elif out_tasks: + elems = [] + out_tasks_valid = set(out_tasks) & set(elem_deps) + if not out_tasks_valid: + msg = (f'Parameter "{parameter_dependency_name}" is not a dependency of ' + f'given element of task "{self.task.name}".') + raise ValueError(msg) + for task_idx in out_tasks_valid: + for i in elem_deps[task_idx]: + elems.append(workflow.tasks[task_idx].elements[i]) + param_vals = [elem.get_output(parameter_dependency_name) for elem in elems] + + elif in_tasks: + elems = [] + in_tasks_valid = set(in_tasks) & set(elem_deps) + if not in_tasks_valid: + msg = (f'Parameter "{parameter_dependency_name}" is not a dependency of ' + f'given element of task "{self.task.name}".') + raise ValueError(msg) + for task_idx in in_tasks_valid: + for i in elem_deps[task_idx]: + elems.append(workflow.tasks[task_idx].elements[i]) + param_vals = [elem.get_input(parameter_dependency_name) for elem in elems] + else: + msg = (f'Parameter "{parameter_dependency_name}" is not an input or output ' + f'parameter for any workflow task.') + raise ValueError(msg) + + if len(param_vals) == 1: + param_vals = param_vals[0] + + return param_vals + + def get_dependent_parameter_value(self, dependent_parameter_name): + + workflow = self.task.workflow + + out_tasks = workflow.get_output_tasks(dependent_parameter_name) + dep_elems = self.get_dependent_elements(recurse=True) + + if dependent_parameter_name in self.task.schema.outputs: + param_vals = [self.get_output(dependent_parameter_name)] + + elif out_tasks: + elems = [] + out_tasks_valid = set(out_tasks) & set(dep_elems) + if not out_tasks_valid: + msg = (f'Parameter "{dependent_parameter_name}" does not depend on the ' + f'given element of task "{self.task.name}".') + raise ValueError(msg) + for task_idx in out_tasks_valid: + for i in dep_elems[task_idx]: + elems.append(workflow.tasks[task_idx].elements[i]) + param_vals = [elem.get_output(dependent_parameter_name) for elem in elems] + else: + msg = (f'Parameter "{dependent_parameter_name}" is not an output parameter ' + f'for any workflow task.') + raise ValueError(msg) + + if len(param_vals) == 1: + param_vals = param_vals[0] + + return param_vals diff 
--git a/matflow/models/workflow.py
index a0186e5..557fadf 100644
--- a/matflow/models/workflow.py
+++ b/matflow/models/workflow.py
@@ -27,6 +27,7 @@
     WorkflowPersistenceError,
     TaskElementExecutionError,
     UnexpectedSourceMapReturnError,
+    WorkflowIterationError,
 )
 from matflow.hicklable import to_hicklable
 from matflow.models.command import DEFAULT_FORMATTERS
@@ -1868,3 +1869,190 @@ def get_workflow_tasks_info(file_path):
         }
         tasks_info.append(task_dict)
     return tasks_info
+
+    def get_input_tasks(self, parameter_name):
+        'Return task indices of tasks in which a given parameter is an input.'
+
+        input_task_idx = []
+        for task in self.tasks:
+            if parameter_name in task.schema.input_names:
+                input_task_idx.append(task.task_idx)
+
+        return input_task_idx
+
+    def get_output_tasks(self, parameter_name):
+        'Return task indices of tasks in which a given parameter is an output.'
+
+        output_task_idx = []
+        for task in self.tasks:
+            if parameter_name in task.schema.outputs:
+                output_task_idx.append(task.task_idx)
+
+        return output_task_idx
+
+    def get_dependent_tasks(self, task_idx, recurse=False):
+        """Get the indices of tasks that depend on a given task.
+
+        Notes
+        -----
+        For the inverse, see `get_task_dependencies`.
+
+        """
+
+        out = []
+        for idx, dep_idx in enumerate(self.dependency_idx):
+            if task_idx in dep_idx['task_dependencies']:
+                out.append(idx)
+
+        if recurse:
+            out += list(set([
+                additional_out
+                for task_idx_i in out
+                for additional_out in
+                self.get_dependent_tasks(task_idx_i, recurse=True)
+            ]))
+
+        return out
+
+    def get_task_dependencies(self, task_idx, recurse=False):
+        """Get the indices of tasks that a given task depends on.
+
+        Notes
+        -----
+        For the inverse, see `get_dependent_tasks`.
+
+        """
+
+        out = self.dependency_idx[task_idx]['task_dependencies']
+
+        if recurse:
+            out += list(set([
+                additional_out
+                for task_idx_i in out
+                for additional_out in
+                self.get_task_dependencies(task_idx_i, recurse=True)
+            ]))
+
+        return out
+
+    def get_dependent_parameters(self, parameter_name, recurse=False, return_list=False):
+        """Get the names of parameters that depend on a given parameter.
+
+        Parameters
+        ----------
+        parameter_name : str
+        recurse : bool, optional
+            If False, only include output parameters from tasks for which the given
+            parameter is an input. If True, include output parameters from dependent tasks
+            as well. By default, False.
+        return_list : bool, optional
+            If True, return a list of output parameters. If False, return a dict whose
+            keys are the task indices and whose values are the output parameters from each
+            task. By default, False.
+
+        Returns
+        -------
+        dict of (int : list of str) or list of str
+
+        Notes
+        -----
+        For the inverse, see `get_parameter_dependencies`.
+
+        """
+
+        # Get the tasks where given parameter is an input:
+        all_task_idx = self.get_input_tasks(parameter_name)
+
+        # If recurse, need outputs from dependent tasks as well:
+        if recurse:
+            all_task_idx += list(set([
+                additional_task
+                for task_idx_i in all_task_idx
+                for additional_task in
+                self.get_dependent_tasks(task_idx_i, recurse=True)
+            ]))
+
+        # Get output parameters from tasks:
+        params = {
+            task_idx: self.tasks[task_idx].schema.outputs
+            for task_idx in all_task_idx
+        }
+
+        if return_list:
+            params = list(set([i for param_vals in params.values() for i in param_vals]))
+
+        return params
+
+    def get_parameter_dependencies(self, parameter_name, recurse=False, return_list=False):
+        """Get the names of parameters that a given parameter depends on.
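+
+        For example (an illustrative pairing only, not a fixed MatFlow schema):
+        if one task maps an input "microstructure_seeds" to an output
+        "volume_element", then:
+
+        >>> workflow.get_parameter_dependencies('volume_element', return_list=True)
+        ['microstructure_seeds']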
+ + Parameters + ---------- + parameter_name : str + recurse : bool, optional + If False, only include input parameters from tasks for which the given + parameter is an output. If True, include input parameters from task + dependencies as well. By default, False. + return_list : bool, optional + If True, return a list of input parameters. If False, return a dict whose keys + are the task indices and whose values are the input parameters from each task. + By default, False. + + Returns + ------- + dict of (int : list of str) or list of str + + Notes + ----- + For the inverse, see `get_dependent_parameters`. + + """ + + # Get the tasks where given parameter is an output + all_task_idx = self.get_output_tasks(parameter_name) + + # If recurse, need inputs from tasks dependencies as well: + if recurse: + all_task_idx = list(set([ + additional_task + for task_idx_i in all_task_idx + for additional_task in + self.get_task_dependencies(task_idx_i, recurse=True) + ])) + all_task_idx + + # Get input parameters from tasks: + params = { + task_idx: self.tasks[task_idx].schema.input_names + for task_idx in all_task_idx + } + + if return_list: + params = list(set([i for param_vals in params.values() for i in param_vals])) + + return params + + def get_iteration_task_pathway(self, parameter_name): + + originating_tasks = self.get_input_tasks(parameter_name) + dep_tasks = [j for i in originating_tasks + for j in self.get_dependent_tasks(i, recurse=True)] + + # Which dep_tasks produces the iteration parameter? + outputs_iter_param = [parameter_name in self.tasks[i].schema.outputs + for i in dep_tasks] + + if not any(outputs_iter_param): + msg = (f'Parameter "{parameter_name}" is not output by any task and so ' + f'cannot be iterated.') + raise WorkflowIterationError(msg) + + # Only first task for now... + producing_task_idx = dep_tasks[outputs_iter_param.index(True)] + task_pathway = list(set(originating_tasks + dep_tasks)) + out = { + 'task_pathway': task_pathway, + 'originating_tasks': originating_tasks, + 'producing_task': producing_task_idx, + } + + return out From 410abf1cfb3a59d87e1fd1f1a32e31c9d2bc70f8 Mon Sep 17 00:00:00 2001 From: APlowman Date: Sun, 13 Dec 2020 23:13:57 +0000 Subject: [PATCH 20/29] Rejig to support specifying iteration parameter --- matflow/models/construction.py | 21 ++++------ matflow/models/workflow.py | 76 +++++++++++++++++++++++++++++----- matflow/profile.py | 2 + 3 files changed, 74 insertions(+), 25 deletions(-) diff --git a/matflow/models/construction.py b/matflow/models/construction.py index 0a60aa9..b5940f1 100644 --- a/matflow/models/construction.py +++ b/matflow/models/construction.py @@ -1024,7 +1024,7 @@ def get_input_groups(task_idx, task_lst, dependency_idx, element_idx): return input_groups -def get_element_idx(task_lst, dep_idx, num_iterations): +def get_element_idx(task_lst, dep_idx, num_iterations, iterate): """For each task, find the element indices that determine the elements to be used (i.e from upstream tasks) to populate task inputs. @@ -1415,7 +1415,7 @@ def validate_inputs(task_lst): schema.check_surplus_inputs(defined_inputs) -def init_tasks(workflow, task_lst, is_from_file, num_iterations, check_integrity=True): +def init_tasks(workflow, task_lst, is_from_file, check_integrity=True): """Construct and validate Task objects and the element indices from which to populate task inputs. 
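In outline, after this patch the construction flow is: build the Task objects
first, then let the Workflow compute the element indices. A sketch only, using
the names from the hunks of this patch (it mirrors the `Workflow.__init__`
hunk in the next file below):

    # Tasks are constructed without element indices:
    tasks, task_elements, dep_idx = init_tasks(workflow, task_lst, is_from_file)

    # The workflow then derives element indices itself, passing through the
    # iteration settings:
    task_lst = [{'task_idx': t.task_idx, 'local_inputs': t.local_inputs,
                 'name': t.name, 'schema': t.schema} for t in tasks]
    elements_idx = get_element_idx(task_lst, dep_idx, num_iterations, iterate)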
@@ -1465,22 +1465,15 @@ def init_tasks(workflow, task_lst, is_from_file, num_iterations, check_integrity validate_inputs(task_lst) - # Find element indices that determine the elements from which task inputs are drawn: - element_idx = get_element_idx(task_lst, dep_idx, num_iterations) - task_objs = [] - for task_idx, task_dict in enumerate(task_lst): + task_elements = [] + for task_dict in task_lst: if is_from_file: - task_id = task_dict.pop('id') - elements = task_dict.pop('elements') - else: - task_id = None - num_elements = element_idx[task_idx]['num_elements'] - elements = [{'element_idx': elem_idx} for elem_idx in range(num_elements)] + task_elements.append(task_dict.pop('elements')) + task_id = task_dict.pop('id') if is_from_file else None task = Task(workflow=workflow, **task_dict) - task.init_elements(elements) if is_from_file: task.id = task_id @@ -1489,4 +1482,4 @@ def init_tasks(workflow, task_lst, is_from_file, num_iterations, check_integrity task_objs.append(task) - return task_objs, element_idx, dep_idx + return task_objs, task_elements, dep_idx diff --git a/matflow/models/workflow.py b/matflow/models/workflow.py index 557fadf..05d9e3a 100644 --- a/matflow/models/workflow.py +++ b/matflow/models/workflow.py @@ -31,7 +31,7 @@ ) from matflow.hicklable import to_hicklable from matflow.models.command import DEFAULT_FORMATTERS -from matflow.models.construction import init_tasks +from matflow.models.construction import init_tasks, get_element_idx from matflow.models.software import SoftwareInstance from matflow.models.task import TaskStatus from matflow.models.parameters import Parameters @@ -93,13 +93,14 @@ class Workflow(object): '_figures', '_metadata', '_num_iterations', + '_iterate', '_iterate_run_options', ] def __init__(self, name, tasks, stage_directory=None, extends=None, archive=None, archive_excludes=None, figures=None, metadata=None, num_iterations=None, - iterate_run_options=None, check_integrity=True, profile=None, - __is_from_file=False): + iterate=None, iterate_run_options=None, check_integrity=True, + profile=None, __is_from_file=False): self._id = None # Assigned once by set_ids() self._human_id = None # Assigned once by set_ids() @@ -116,20 +117,38 @@ def __init__(self, name, tasks, stage_directory=None, extends=None, archive=None for idx, i in enumerate(figures) ] if figures else [] self._metadata = metadata or {} + + tasks, task_elements, dep_idx = init_tasks(self, tasks, self.is_from_file, + check_integrity) + self._tasks = tasks + self._dependency_idx = dep_idx + self._num_iterations = num_iterations or 1 + self._iterate = self._validate_iterate(iterate, self.is_from_file) self._iterate_run_options = iterate_run_options or {} - tasks, elements_idx, dep_idx = init_tasks( - self, - tasks, - self.is_from_file, - self._num_iterations, - check_integrity=check_integrity + # Find element indices that determine the elements from which task inputs are drawn: + task_lst = [ + { + 'task_idx': i.task_idx, + 'local_inputs': i.local_inputs, + 'name': i.name, + 'schema': i.schema, + } for i in tasks + ] + elements_idx = get_element_idx( + task_lst, dep_idx, self.num_iterations, self.iterate ) - self._tasks = tasks + for task in self.tasks: + if self.is_from_file: + elements = task_elements[task.task_idx] + else: + num_elements = elements_idx[task.task_idx]['num_elements'] + elements = [{'element_idx': elem_idx} for elem_idx in range(num_elements)] + task.init_elements(elements) + self._elements_idx = elements_idx - self._dependency_idx = dep_idx if not self.is_from_file: 
self._check_archive_connection()
 
@@ -224,6 +243,36 @@ def _append_history(self, action, **kwargs):
             for k, v in attributes.items():
                 handle[path].attrs[k] = v
 
+    def _validate_iterate(self, iterate_dict, is_from_file):
+
+        if not iterate_dict:
+            return iterate_dict
+
+        elif self.num_iterations != 1:
+            msg = "Specify either `iterate` (dict) or `num_iterations` (int)."
+            raise ValueError(msg)
+
+        req_keys = ['parameter', 'num_iterations']
+        if is_from_file:
+            req_keys.append('task_pathway')
+        allowed_keys = set(req_keys)
+
+        miss_keys = list(set(req_keys) - set(iterate_dict))
+        bad_keys = list(set(iterate_dict) - allowed_keys)
+        msg = 'Invalid `iterate` specification.'
+        if miss_keys:
+            miss_keys_fmt = ', '.join(['"{}"'.format(i) for i in miss_keys])
+            raise WorkflowIterationError(msg + f' Missing keys are: {miss_keys_fmt}.')
+        if bad_keys:
+            bad_keys_fmt = ', '.join(['"{}"'.format(i) for i in bad_keys])
+            raise WorkflowIterationError(msg + f' Unknown keys are: {bad_keys_fmt}.')
+
+        if not is_from_file:
+            task_pathway = self.get_iteration_task_pathway(iterate_dict['parameter'])
+            iterate_dict.update({'task_pathway': task_pathway})
+
+        return iterate_dict
+
     @property
     def version(self):
         return len(self._history)
@@ -293,6 +342,10 @@ def metadata(self):
     def num_iterations(self):
         return self._num_iterations
 
+    @property
+    def iterate(self):
+        return self._iterate
+
     @property
     def iterate_run_options(self):
         return self._iterate_run_options
 
@@ -1212,6 +1265,7 @@ def load_HDF5_file(cls, path=None, full_path=False, check_integrity=True):
             'figures',
             'metadata',
             'num_iterations',
+            'iterate',
         ]
         for key in WARN_ON_MISSING:
             if key not in obj_json:
diff --git a/matflow/profile.py b/matflow/profile.py
index e3d2263..ce1ae33 100644
--- a/matflow/profile.py
+++ b/matflow/profile.py
@@ -21,6 +21,7 @@ def parse_workflow_profile(profile_path):
         'figures',
         'metadata',
         'num_iterations',
+        'iterate',
     ]
 
     miss_keys = list(set(req_keys) - set(profile.keys()))
@@ -46,6 +47,7 @@ def parse_workflow_profile(profile_path):
         'figures': profile.get('figures'),
         'metadata': profile.get('metadata'),
         'num_iterations': profile.get('num_iterations'),
+        'iterate': profile.get('iterate'),
         'extends': profile.get('extends'),
         'archive': profile.get('archive'),
         'archive_excludes': profile.get('archive_excludes'),
 
From
+ print(f'iterate: {iterate}') + print(f'num_iterations: {num_iterations}') + element_idx = [] for idx, downstream_task in enumerate(task_lst): + if iterate and idx in iterate['task_pathway']: + num_iterations = iterate['num_iterations'] + else: + num_iterations = 1 + + print(f'idx: {idx}; num_iterations: {num_iterations}') + upstream_tasks = [task_lst[i] for i in dep_idx[idx]['task_dependencies']] # local inputs dict: @@ -1342,6 +1352,9 @@ def get_element_idx(task_lst, dep_idx, num_iterations, iterate): element_idx[idx]['inputs'][input_alias]['group'] = new_group element_idx[idx]['inputs'][input_alias]['element_idx'] = new_elems_idx + print('element_idx') + pprint(element_idx) + return element_idx diff --git a/matflow/models/workflow.py b/matflow/models/workflow.py index 05d9e3a..2a760d4 100644 --- a/matflow/models/workflow.py +++ b/matflow/models/workflow.py @@ -254,7 +254,7 @@ def _validate_iterate(self, iterate_dict, is_from_file): req_keys = ['parameter', 'num_iterations'] if is_from_file: - req_keys.append('task_pathway') + req_keys.extend(['task_pathway', 'producing_task', 'originating_tasks']) allowed_keys = set(req_keys) miss_keys = list(set(req_keys) - set(iterate_dict)) @@ -269,7 +269,7 @@ def _validate_iterate(self, iterate_dict, is_from_file): if not is_from_file: task_pathway = self.get_iteration_task_pathway(iterate_dict['parameter']) - iterate_dict.update({'task_pathway': task_pathway}) + iterate_dict.update(task_pathway) return iterate_dict @@ -713,6 +713,7 @@ def get_hpcflow_workflow(self): 'stats': False, 'scheduler_options': task.get_scheduler_options('prepare'), 'name': self.get_hpcflow_job_name(task, 'prepare-sources'), + 'meta': {'from_tasks': [task.task_idx]}, }] if task.schema.is_func: @@ -776,6 +777,7 @@ def get_hpcflow_workflow(self): 'stats': False, 'scheduler_options': cur_prepare_opts, 'name': self.get_hpcflow_job_name(task, 'prepare-task'), + 'meta': {'from_tasks': [task.task_idx]}, }, { 'directory': '.', @@ -784,6 +786,7 @@ def get_hpcflow_workflow(self): 'stats': False, 'scheduler_options': cur_prepare_opts, 'name': self.get_hpcflow_job_name(task, 'prepare-task'), + 'meta': {'from_tasks': [task.task_idx]}, } ]) @@ -795,6 +798,7 @@ def get_hpcflow_workflow(self): 'stats': False, 'scheduler_options': cur_prepare_opts, 'name': self.get_hpcflow_job_name(task, 'prepare-task'), + 'meta': {'from_tasks': [task.task_idx]}, }) if task.software_instance.requires_sources: @@ -808,6 +812,7 @@ def get_hpcflow_workflow(self): 'name': self.get_hpcflow_job_name(task, 'run'), 'stats': task.stats, 'stats_name': self.get_hpcflow_job_name(task, 'run', is_stats=True), + 'meta': {'from_tasks': [task.task_idx]}, } env = task.software_instance.env.as_str() if env: @@ -839,6 +844,7 @@ def get_hpcflow_workflow(self): 'stats': False, 'scheduler_options': cur_process_opts, 'name': self.get_hpcflow_job_name(task, 'process-prepare-task'), + 'meta': {'from_tasks': [task.task_idx, next_task.task_idx]}, }) add_process_groups = False @@ -854,6 +860,7 @@ def get_hpcflow_workflow(self): 'stats': False, 'scheduler_options': cur_process_opts, 'name': self.get_hpcflow_job_name(task, 'process-task'), + 'meta': {'from_tasks': [task.task_idx]}, }, { 'directory': '.', @@ -862,6 +869,7 @@ def get_hpcflow_workflow(self): 'stats': False, 'scheduler_options': cur_process_opts, 'name': self.get_hpcflow_job_name(task, 'process-task'), + 'meta': {'from_tasks': [task.task_idx]}, } ]) else: @@ -872,6 +880,7 @@ def get_hpcflow_workflow(self): 'stats': False, 'scheduler_options': cur_process_opts, 'name': 
self.get_hpcflow_job_name(task, 'process-task'), + 'meta': {'from_tasks': [task.task_idx]}, }) # Add variable for the task directories: @@ -920,6 +929,27 @@ def get_hpcflow_workflow(self): } }) + elif self.iterate: + + # Find which command groups are to be repeated: + iterate_groups = [] + for cmd_group_idx, cmd_group in enumerate(hf_data['command_groups']): + if ( + cmd_group['name'] == 'iterate' or + any([i in self.iterate['task_pathway'] + for i in cmd_group['meta']['from_tasks']]) + ): + iterate_groups.append(cmd_group_idx) + hf_data['command_groups'][cmd_group_idx].pop('meta', None) + # TODO: allow "meta" key in hpcflow command groups. + + hf_data.update({ + 'loop': { + 'max_iterations': self.iterate['num_iterations'], + 'groups': iterate_groups, + } + }) + if self.archive: hf_data.update({'archive_locations': {self.archive: self.archive_definition}}) @@ -1433,6 +1463,16 @@ def prepare_task(self, task_idx, iteration_idx, is_array=False): """ + if ( + iteration_idx > 0 and + self.iterate and + task_idx not in self.iterate['task_pathway'] + ): + # In the case where `prepare_task` for this task is in the same hpcflow + # command group as a `process_task` from the previous task, which is + # undergoing iteration. + return + task = self.tasks[task_idx] num_elems = self.elements_idx[task.task_idx]['num_elements_per_iteration'] iter_elem_idx = [i + (iteration_idx * num_elems) for i in range(num_elems)] @@ -1636,6 +1676,16 @@ def process_task(self, task_idx, iteration_idx, is_array=False): """ + if ( + iteration_idx > 0 and + self.iterate and + task_idx not in self.iterate['task_pathway'] + ): + # In the case where `process_task` for this task is in the same hpcflow + # command group as a `prepare_task` from the next task, which is undergoing + # iteration. + return + task = self.tasks[task_idx] num_elems = self.elements_idx[task.task_idx]['num_elements_per_iteration'] iter_elem_idx = [i + (iteration_idx * num_elems) for i in range(num_elems)] From 7916805cc121824ab2477078fc49d8eda757f555 Mon Sep 17 00:00:00 2001 From: APlowman Date: Tue, 15 Dec 2020 13:59:29 +0000 Subject: [PATCH 22/29] Fixes to enable iteration on parameter --- matflow/api.py | 6 +- matflow/models/construction.py | 164 +++++++++++++++++++++++---------- matflow/models/workflow.py | 20 +++- 3 files changed, 138 insertions(+), 52 deletions(-) diff --git a/matflow/api.py b/matflow/api.py index 90c03b6..d19e6f5 100644 --- a/matflow/api.py +++ b/matflow/api.py @@ -180,7 +180,11 @@ def write_element_directories(iteration_idx, directory): 'Generate element directories for a given iteration.' load_extensions() workflow = load_workflow(directory) - if iteration_idx < workflow.num_iterations: + if workflow.iterate: + num_iters = workflow.iterate['num_iterations'] + else: + num_iters = workflow.num_iterations + if iteration_idx < num_iters: workflow.write_element_directories(iteration_idx) workflow.prepare_iteration(iteration_idx) diff --git a/matflow/models/construction.py b/matflow/models/construction.py index 2165ae0..8fb3d8f 100644 --- a/matflow/models/construction.py +++ b/matflow/models/construction.py @@ -1058,18 +1058,11 @@ def get_element_idx(task_lst, dep_idx, num_iterations, iterate): # todo ensure default nest and merge_priority are set on each group (in local_inputs). 
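    # A sketch of the gating introduced in this hunk (dict keys as returned by
    # `get_iteration_task_pathway`; the values here are illustrative only):
    #
    #     iterate = {'parameter': 'p', 'num_iterations': 3,
    #                'task_pathway': [1, 2], 'originating_tasks': [0],
    #                'producing_task': 2}
    #
    # Tasks 1 and 2 would then run three iterations each, while tasks off the
    # pathway keep the workflow-level `num_iterations`.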
- print(f'iterate: {iterate}') - print(f'num_iterations: {num_iterations}') - element_idx = [] for idx, downstream_task in enumerate(task_lst): if iterate and idx in iterate['task_pathway']: num_iterations = iterate['num_iterations'] - else: - num_iterations = 1 - - print(f'idx: {idx}; num_iterations: {num_iterations}') upstream_tasks = [task_lst[i] for i in dep_idx[idx]['task_dependencies']] @@ -1302,58 +1295,131 @@ def get_element_idx(task_lst, dep_idx, num_iterations, iterate): element_idx.append(elem_idx_i) # Add iterations: - for idx, elem_idx_i in enumerate(element_idx): + for task_idx, elem_idx_i in enumerate(element_idx): num_iterations_i = elem_idx_i['num_iterations'] - new_iter_idx = [ - iter_idx - for iter_idx in range(num_iterations_i) - for _ in range(elem_idx_i['num_elements_per_iteration']) - ] - element_idx[idx]['iteration_idx'] = new_iter_idx - element_idx[idx]['num_elements'] = len(new_iter_idx) - for input_alias, inputs_idx in elem_idx_i['inputs'].items(): + # print(f'Considering task {task_idx}, with num_iterations_i: {num_iterations_i}') + + # First iteration is in place, add additional iterations (if they exist for this + # task): + for iter_idx in range(1, num_iterations_i): + + # print(f'\tAdding elements for iteration {iter_idx}') - ins_task_idx = inputs_idx['task_idx'][0] + elem_iter_idx = np.array(elem_idx_i['iteration_idx']) + elems_per_iter = elem_idx_i['num_elements_per_iteration'] + iter_zero_idx = np.where(elem_iter_idx == 0)[0] + # print(f'\titer_zero_idx: {iter_zero_idx}') - if ins_task_idx is not None: - # For now, assume non-local inputs are parametrised from the same - # iteration as the upstream task: + for input_alias, inputs_idx in elem_idx_i['inputs'].items(): - num_elems_per_iter = element_idx[ins_task_idx][ - 'num_elements_per_iteration' - ] + # print(f'\t\tConsidering input "{input_alias}":\n\t\t\tinputs_idx: ' + # f'{inputs_idx}.') - new_elems_idx = [] - new_task_idx = [] - new_group = [] - new_loc_ins_idx = [] + if ( + iterate and + input_alias == iterate['parameter'] and + task_idx in iterate['originating_tasks'] + ): + # New elements should derive from the "producing task" of the previous + # iteration: - for iter_idx in range(num_iterations_i): - for i in inputs_idx['element_idx']: - new_elems_idx.append( - [j + (iter_idx * num_elems_per_iter) for j in i] + task_dep = iterate['producing_task'] + elem_iter_idx_task_dep = np.array( + element_idx[task_dep]['iteration_idx'] + ) + + # elements of task dependency belonging to the previous iteration: + iter_prev_idx_task_dep = np.where( + elem_iter_idx_task_dep == iter_idx - 1 + )[0] + + # print(f'\t\t\tElements should derive from the ' + # f'previous iteration (iter_idx-1={iter_idx-1}) "producing ' + # f'task": {task_dep}; elem_iter_idx_task_dep: ' + # f'{elem_iter_idx_task_dep}; iter_prev_idx_task_dep: ' + # f'{iter_prev_idx_task_dep}.') + + # Very hacky mess: + new_group = inputs_idx['group'][0] or 'default' + iter_current_group = [new_group] * elems_per_iter + + # Very hacky mess: + iter_current_elems_idx = [ + [iter_prev_idx_task_dep[0]] * len( + inputs_idx['element_idx'][0] or [0] ) - new_task_idx.extend([ins_task_idx] * len(new_iter_idx)) - new_group.extend([inputs_idx['group'][0]] * len(new_iter_idx)) - new_loc_ins_idx.extend([None] * len(new_iter_idx)) + ] * elems_per_iter - else: - # Copy local inputs' `inputs_idx` (local inputs must be the same for - # all iterations): - new_loc_ins_idx = tile(inputs_idx['local_input_idx'], num_iterations_i) - new_task_idx = tile(inputs_idx['task_idx'], 
num_iterations_i) - new_elems_idx = tile(inputs_idx['element_idx'], num_iterations_i) - new_group = tile(inputs_idx['group'], num_iterations_i) - - element_idx[idx]['inputs'][input_alias]['local_input_idx'] = new_loc_ins_idx - element_idx[idx]['inputs'][input_alias]['task_idx'] = new_task_idx - element_idx[idx]['inputs'][input_alias]['group'] = new_group - element_idx[idx]['inputs'][input_alias]['element_idx'] = new_elems_idx - - print('element_idx') - pprint(element_idx) + iter_current_task = [task_dep] * elems_per_iter + iter_current_loc = [None] * elems_per_iter + + add_elements = { + 'local_input_idx': iter_current_loc, + 'task_idx': iter_current_task, + 'element_idx': iter_current_elems_idx, + 'group': iter_current_group, + } + # print(f'\t\t\tAdditional elements are: {add_elements}') + + else: + # New elements should derive from tasks of the most recent iteration: + # print(f'\t\t\tElements should derive from the ' + # f'most recent iteration.') + if inputs_idx['local_input_idx'][0] is not None: + # Tile local inputs for new iteration: + add_elements = {k: [inputs_idx[k][i] for i in iter_zero_idx] + for k in inputs_idx} + # print(f'\t\t\tFound local inputs: additional elements ' + # f'are: {add_elements}') + else: + # Use elements from the most recent iteration of the dependency + # task: + task_dep = inputs_idx['task_idx'][0] + elem_iter_idx_task_dep = np.array( + element_idx[task_dep]['iteration_idx'] + ) + + # elements of task dependency belonging to the most recent + # iteration: + iter_last_idx_task_dep = np.where( + elem_iter_idx_task_dep == np.max(elem_iter_idx_task_dep) + )[0] + + iter_zero_elems_idx = [inputs_idx['element_idx'][i] + for i in iter_zero_idx] + iter_current_elems_idx = [ + [iter_last_idx_task_dep[j] for j in i] + for i in iter_zero_elems_idx + ] + + iter_current_group = [inputs_idx['group'][0]] * elems_per_iter + iter_current_task = [inputs_idx['task_idx'][0]] * elems_per_iter + iter_current_loc = [None] * elems_per_iter + + add_elements = { + 'local_input_idx': iter_current_loc, + 'task_idx': iter_current_task, + 'element_idx': iter_current_elems_idx, + 'group': iter_current_group, + } + + # print(f'\t\t\tFound non-local inputs: additional elements ' + # f'are: {add_elements}') + + for k in elem_idx_i['inputs'][input_alias]: + elem_idx_i['inputs'][input_alias][k] += add_elements[k] + + # Update num_elements, iteration_idx with each iter_idx loop: + new_iter_idx = ( + element_idx[task_idx]['iteration_idx'] + [iter_idx] * elems_per_iter + ) + element_idx[task_idx]['iteration_idx'] = new_iter_idx + element_idx[task_idx]['num_elements'] = len(new_iter_idx) + + # print('element_idx') + # pprint(element_idx) return element_idx diff --git a/matflow/models/workflow.py b/matflow/models/workflow.py index 2a760d4..782c5ac 100644 --- a/matflow/models/workflow.py +++ b/matflow/models/workflow.py @@ -511,6 +511,13 @@ def write_element_directories(self, iteration_idx): for elems_idx, task in zip(self.elements_idx, self.tasks): + if ( + iteration_idx > 0 and + self.iterate and + task.task_idx not in self.iterate['task_pathway'] + ): + continue + task_idx = task.task_idx num_elems = elems_idx['num_elements_per_iteration'] iter_elem_idx = [i + (iteration_idx * num_elems) for i in range(num_elems)] @@ -550,6 +557,13 @@ def prepare_iteration(self, iteration_idx): for elems_idx, task in zip(self.elements_idx, self.tasks): + if ( + iteration_idx > 0 and + self.iterate and + task.task_idx not in self.iterate['task_pathway'] + ): + continue + num_elems = 
elems_idx['num_elements_per_iteration'] iter_elem_idx = [i + (iteration_idx * num_elems) for i in range(num_elems)] cmd_line_inputs, input_vars = self._get_command_line_inputs(task.task_idx) @@ -940,8 +954,6 @@ def get_hpcflow_workflow(self): for i in cmd_group['meta']['from_tasks']]) ): iterate_groups.append(cmd_group_idx) - hf_data['command_groups'][cmd_group_idx].pop('meta', None) - # TODO: allow "meta" key in hpcflow command groups. hf_data.update({ 'loop': { @@ -950,6 +962,10 @@ def get_hpcflow_workflow(self): } }) + for cmd_group_idx, cmd_group in enumerate(hf_data['command_groups']): + # TODO: allow "meta" key in hpcflow command groups. + hf_data['command_groups'][cmd_group_idx].pop('meta', None) + if self.archive: hf_data.update({'archive_locations': {self.archive: self.archive_definition}}) From 3cadb0cca83303bf85affa0b388761315b59274a Mon Sep 17 00:00:00 2001 From: APlowman Date: Tue, 15 Dec 2020 14:07:52 +0000 Subject: [PATCH 23/29] Correct name -> alias --- matflow/models/workflow.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/matflow/models/workflow.py b/matflow/models/workflow.py index 782c5ac..17a1b5c 100644 --- a/matflow/models/workflow.py +++ b/matflow/models/workflow.py @@ -1401,8 +1401,8 @@ def prepare_task_element(self, task_idx, element_idx, is_array=False): # Get inputs required for this file: in_map_inputs = { - input_name: element.get_input(input_name) - for input_name in in_map['inputs'] + input_alias: element.get_input(input_name) + for input_alias in in_map['inputs'] } file_path = task_elem_path.joinpath(in_map['file']) From bff44b1ab4041ee48e36c536ac76862ec48e3d90 Mon Sep 17 00:00:00 2001 From: APlowman Date: Tue, 15 Dec 2020 14:08:31 +0000 Subject: [PATCH 24/29] Fix correction --- matflow/models/workflow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/matflow/models/workflow.py b/matflow/models/workflow.py index 17a1b5c..99d51b9 100644 --- a/matflow/models/workflow.py +++ b/matflow/models/workflow.py @@ -1401,7 +1401,7 @@ def prepare_task_element(self, task_idx, element_idx, is_array=False): # Get inputs required for this file: in_map_inputs = { - input_alias: element.get_input(input_name) + input_alias: element.get_input(input_alias) for input_alias in in_map['inputs'] } file_path = task_elem_path.joinpath(in_map['file']) From d740fae83d31e5c1af651c981c07726ddc559db0 Mon Sep 17 00:00:00 2001 From: APlowman Date: Tue, 15 Dec 2020 20:11:30 +0000 Subject: [PATCH 25/29] Better method naming --- matflow/models/workflow.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/matflow/models/workflow.py b/matflow/models/workflow.py index 99d51b9..78470c7 100644 --- a/matflow/models/workflow.py +++ b/matflow/models/workflow.py @@ -493,14 +493,14 @@ def _get_element_temp_output_path(self, task_idx, element_idx): return out @requires_path_exists - def _get_element_temp_prepare_path(self, task_idx, element_idx): + def _get_element_temp_array_prepare_path(self, task_idx, element_idx): task = self.tasks[task_idx] element_path = self.get_element_path(task_idx, element_idx) out = element_path.joinpath(f'task_prepare_{task.id}_element_{element_idx}.hdf5') return out @requires_path_exists - def _get_element_temp_process_path(self, task_idx, element_idx): + def _get_element_temp_array_process_path(self, task_idx, element_idx): task = self.tasks[task_idx] element_path = self.get_element_path(task_idx, element_idx) out = element_path.joinpath(f'task_process_{task.id}_element_{element_idx}.hdf5') @@ -1420,7 
+1420,7 @@ def prepare_task_element(self, task_idx, element_idx, is_array=False): element.add_file(in_map['file'], value=file_dat) if is_array: - temp_path = self._get_element_temp_prepare_path(task_idx, element_idx) + temp_path = self._get_element_temp_array_prepare_path(task_idx, element_idx) dat = {'inputs': inputs_to_update, 'files': files_to_update} hickle.dump(dat, temp_path) @@ -1497,7 +1497,7 @@ def prepare_task(self, task_idx, iteration_idx, is_array=False): if is_array: - temp_path = self._get_element_temp_prepare_path( + temp_path = self._get_element_temp_array_prepare_path( task_idx, element.element_idx, ) @@ -1675,7 +1675,7 @@ def process_task_element(self, task_idx, element_idx, is_array=False): element.add_output(output_name, value=file_dat) if is_array: - temp_path = self._get_element_temp_process_path(task_idx, element_idx) + temp_path = self._get_element_temp_array_process_path(task_idx, element_idx) dat = {'outputs': outputs_to_update, 'files': files_to_update} hickle.dump(dat, temp_path) @@ -1710,7 +1710,7 @@ def process_task(self, task_idx, iteration_idx, is_array=False): if is_array: - temp_path = self._get_element_temp_process_path( + temp_path = self._get_element_temp_array_process_path( task_idx, element.element_idx, ) From 72d821be945beef19ef7afcb73487f1e4acfb162 Mon Sep 17 00:00:00 2001 From: APlowman Date: Wed, 16 Dec 2020 02:28:51 +0000 Subject: [PATCH 26/29] More iteration fixes --- matflow/api.py | 4 +- matflow/cli.py | 5 +- matflow/models/software.py | 6 ++- matflow/models/task.py | 20 ++++++-- matflow/models/workflow.py | 93 +++++++++++++++++++++++++++++++------- 5 files changed, 102 insertions(+), 26 deletions(-) diff --git a/matflow/api.py b/matflow/api.py index d19e6f5..4e9b207 100644 --- a/matflow/api.py +++ b/matflow/api.py @@ -145,11 +145,11 @@ def run_python_task(task_idx, element_idx, directory): workflow.run_python_task(task_idx, element_idx) -def prepare_sources(task_idx, directory): +def prepare_sources(task_idx, iteration_idx, directory): """Prepare source files.""" load_extensions() workflow = load_workflow(directory) - workflow.prepare_sources(task_idx) + workflow.prepare_sources(task_idx, iteration_idx) def append_schema_source(schema_source_path): diff --git a/matflow/cli.py b/matflow/cli.py index 2a1d44e..f5336f4 100644 --- a/matflow/cli.py +++ b/matflow/cli.py @@ -84,10 +84,11 @@ def run_python_task(task_idx, element_idx, directory=None): @cli.command() @click.option('--task-idx', '-t', type=click.INT, required=True) +@click.option('--iteration-idx', '-i', type=click.INT, required=True) @click.option('--directory', '-d', type=click.Path(exists=True)) -def prepare_sources(task_idx, directory=None): +def prepare_sources(task_idx, iteration_idx, directory=None): print('matflow.cli.prepare_sources', flush=True) - api.prepare_sources(task_idx, directory) + api.prepare_sources(task_idx, iteration_idx, directory) @cli.command() diff --git a/matflow/models/software.py b/matflow/models/software.py index 3feca28..d9eda42 100644 --- a/matflow/models/software.py +++ b/matflow/models/software.py @@ -24,7 +24,11 @@ def commands(self): return self._commands def get_formatted_commands(self, source_vars, sources_dir, task_idx): - out = [{'line': f'matflow prepare-sources --task-idx={task_idx}'}] + out = [{ + 'line': (f'matflow prepare-sources ' + f'--task-idx={task_idx} ' + f'--iteration-idx=$ITER_IDX') + }] if self.commands: for new_cmd in self.commands.splitlines(): new_cmd = new_cmd.replace('<>', sources_dir) diff --git a/matflow/models/task.py 
b/matflow/models/task.py index 7845973..21809f3 100644 --- a/matflow/models/task.py +++ b/matflow/models/task.py @@ -213,7 +213,14 @@ def inputs_condensed(self): def _validate_inputs_outputs(self): """Basic checks on inputs and outputs.""" - allowed_inp_specifiers = ['group', 'context', 'alias', 'file', 'default'] + allowed_inp_specifiers = [ + 'group', + 'context', + 'alias', + 'file', + 'default', + 'include_all_iterations', # inputs from all iterations sent to the input map? + ] req_inp_keys = ['name'] allowed_inp_keys = req_inp_keys + allowed_inp_specifiers allowed_inp_keys_fmt = ', '.join(['"{}"'.format(i) for i in allowed_inp_keys]) @@ -224,7 +231,12 @@ def _validate_inputs_outputs(self): # Normalise schema inputs: for inp_idx, inp in enumerate(self.inputs): - inp_defs = {'context': None, 'group': 'default', 'file': False} + inp_defs = { + 'context': None, + 'group': 'default', + 'file': False, + 'include_all_iterations': False, + } inp = get_specifier_dict(inp, name_key='name', defaults=inp_defs) for r in req_inp_keys: @@ -794,7 +806,7 @@ def get_prepare_task_commands(self, is_array=False): def get_prepare_task_element_commands(self, is_array=False): cmd = (f'matflow prepare-task-element --task-idx={self.task_idx} ' - f'--element-idx=$(($SGE_TASK_ID-1)) ' + f'--element-idx=$((($ITER_IDX * $SGE_TASK_LAST) + $SGE_TASK_ID - 1)) ' f'--directory={self.workflow.path}') cmd += f' --array' if is_array else '' cmds = [cmd] @@ -816,7 +828,7 @@ def get_process_task_commands(self, is_array=False): def get_process_task_element_commands(self, is_array=False): cmd = (f'matflow process-task-element --task-idx={self.task_idx} ' - f'--element-idx=$(($SGE_TASK_ID-1)) ' + f'--element-idx=$((($ITER_IDX * $SGE_TASK_LAST) + $SGE_TASK_ID - 1)) ' f'--directory={self.workflow.path}') cmd += f' --array' if is_array else '' cmds = [cmd] diff --git a/matflow/models/workflow.py b/matflow/models/workflow.py index 78470c7..1a1ba8b 100644 --- a/matflow/models/workflow.py +++ b/matflow/models/workflow.py @@ -736,7 +736,8 @@ def get_hpcflow_workflow(self): fmt_commands = [ { 'line': (f'matflow run-python-task --task-idx={task.task_idx} ' - f'--element-idx=$(($SGE_TASK_ID-1)) ' + f'--element-idx=' + f'$((($ITER_IDX * $SGE_TASK_LAST) + $SGE_TASK_ID - 1)) ' f'--directory={self.path}') } ] @@ -916,15 +917,16 @@ def get_hpcflow_workflow(self): 'archive_excludes': self.archive_excludes, }) - command_groups.append({ - 'directory': '.', - 'nesting': 'hold', - 'commands': ('matflow write-element-directories ' - '--iteration-idx=$(($ITER_IDX+1))'), - 'stats': False, - 'scheduler_options': self.iterate_run_options, - 'name': 'iterate', - }) + if self.num_iterations > 1 or self.iterate: + command_groups.append({ + 'directory': '.', + 'nesting': 'hold', + 'commands': ('matflow write-element-directories ' + '--iteration-idx=$(($ITER_IDX+1))'), + 'stats': False, + 'scheduler_options': self.iterate_run_options, + 'name': 'iterate', + }) hf_data = { 'parallel_modes': Config.get('parallel_modes'), @@ -1400,10 +1402,26 @@ def prepare_task_element(self, task_idx, element_idx, is_array=False): for in_map in task.schema.input_map: # Get inputs required for this file: - in_map_inputs = { - input_alias: element.get_input(input_alias) - for input_alias in in_map['inputs'] - } + in_map_inputs = {} + for input_alias in in_map['inputs']: + input_dict = task.schema.get_input_by_alias(input_alias) + if input_dict.get('include_all_iterations'): + # Collate elements from all iterations. 
Need to get all elements at + # the same relative position within the iteration as this one: + all_iter_elems = self.get_elements_from_all_iterations( + task_idx, + element_idx, + up_to_current=True, + ) + in_map_inputs.update({ + input_alias: { + f'iteration_{iter_idx}': elem.get_input(input_alias) + for iter_idx, elem in enumerate(all_iter_elems) + } + }) + else: + in_map_inputs.update({input_alias: element.get_input(input_alias)}) + file_path = task_elem_path.joinpath(in_map['file']) # Run input map to generate required input files: @@ -1425,12 +1443,16 @@ def prepare_task_element(self, task_idx, element_idx, is_array=False): hickle.dump(dat, temp_path) @requires_path_exists - def prepare_sources(self, task_idx): + def prepare_sources(self, task_idx, iteration_idx): """Prepare source files for the task preparation commands.""" # Note: in future, we might want to parametrise the source function, which is # why we delay its invocation until task run time. + if iteration_idx > 0: + # Source files need to be generated only once per workflow (currently). + return + task = self.tasks[task_idx] if not task.software_instance.requires_sources: @@ -1440,8 +1462,6 @@ def prepare_sources(self, task_idx): source_func = source_map['func'] source_files = source_func() - print(f'source_files:\n{source_files}') - expected_src_vars = set(source_map['sources'].keys()) returned_src_vars = set(source_files.keys()) bad_keys = returned_src_vars - expected_src_vars @@ -2176,3 +2196,42 @@ def get_iteration_task_pathway(self, parameter_name): } return out + + def get_elements_from_all_iterations(self, task_idx, element_idx, up_to_current=True): + """ + Get equivalent elements from all iterations. + + Parameters + ---------- + task_idx : int + element_idx : int + up_to_current : bool, optional + If True, only return elements from iterations up to and including the + iteration of the given element. If False, return elements from all iterations. 
+
+        Returns
+        -------
+        list of Element
+
+        """
+
+        elems_idx_i = self.elements_idx[task_idx]
+
+        iter_idx_bool = np.zeros_like(elems_idx_i['iteration_idx'], dtype=bool)
+        iter_idx_bool[element_idx] = True
+        iter_idx_reshape = np.array(iter_idx_bool).reshape(
+            (elems_idx_i['num_iterations'],
+             elems_idx_i['num_elements_per_iteration'])
+        )
+
+        current_iter, idx_within_iteration = [i[0] for i in np.where(iter_idx_reshape)]
+        if not up_to_current:
+            iter_idx_reshape[:, idx_within_iteration] = True
+        else:
+            for i in range(current_iter):
+                iter_idx_reshape[i, idx_within_iteration] = True
+
+        all_elem_idx = np.where(iter_idx_reshape.flatten())[0]
+        all_elems = [self.tasks[task_idx].elements[i] for i in all_elem_idx]
+
+        return all_elems

From b4ef88ce298ca4f64d4e626f226c2ff3a7d23953 Mon Sep 17 00:00:00 2001
From: APlowman
Date: Wed, 16 Dec 2020 13:47:29 +0000
Subject: [PATCH 27/29] Set stats bool to False by default

---
 matflow/models/construction.py | 2 +-
 matflow/models/task.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/matflow/models/construction.py b/matflow/models/construction.py
index 8fb3d8f..a4ccc72 100644
--- a/matflow/models/construction.py
+++ b/matflow/models/construction.py
@@ -744,7 +744,7 @@ def validate_task_dict(task, is_from_file, all_software, all_task_schemas,
 
     def_keys = {
         'run_options': {},
-        'stats': True,
+        'stats': False,
         'base': None,
         'sequences': None,
         'repeats': 1,
diff --git a/matflow/models/task.py b/matflow/models/task.py
index 21809f3..b81f910 100644
--- a/matflow/models/task.py
+++ b/matflow/models/task.py
@@ -537,7 +537,7 @@ class Task(object):
 
     def __init__(self, workflow, name, method, software_instance,
                  prepare_software_instance, process_software_instance, task_idx,
                  run_options=None, prepare_run_options=None, process_run_options=None,
-                 status=None, stats=True, context='', local_inputs=None, schema=None,
+                 status=None, stats=False, context='', local_inputs=None, schema=None,
                  resource_usage=None, base=None, sequences=None, repeats=None,
                  groups=None, nest=None, merge_priority=None, output_map_options=None,
                  command_pathway_idx=None):

From b5f4b0dbe4726ddf9a7ee6a687e86b3c2855b4e7 Mon Sep 17 00:00:00 2001
From: APlowman
Date: Wed, 16 Dec 2020 15:36:21 +0000
Subject: [PATCH 28/29] Bump hpcflow to v0.1.12

---
 setup.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/setup.py b/setup.py
index e0d9eb5..c2282db 100644
--- a/setup.py
+++ b/setup.py
@@ -51,7 +51,7 @@ def get_long_description():
     },
     install_requires=[
         'matflow-demo-extension',
-        'hpcflow>=0.1.10',
+        'hpcflow>=0.1.12',
         'click>7.0',
         'hickle>=4.0.1',
         'ruamel.yaml',

From 278a9e360d7e0c40d9773819eb37efba17a9a8a6 Mon Sep 17 00:00:00 2001
From: APlowman
Date: Wed, 16 Dec 2020 15:37:41 +0000
Subject: [PATCH 29/29] Prep for v0.2.12

---
 CHANGELOG.md | 22 +++++++++++++++++++---
 matflow/_version.py | 2 +-
 2 files changed, 20 insertions(+), 4 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2c069f9..b178e65 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,16 +1,32 @@
 # Change Log
 
-## [0.x.xx] - xxxx.xx.xx
+## [0.2.12] - 2020.12.16
 
 ### Added
-- Figures definitions
-- Workflow metadata attribute for storing arbitrary metadata
+- Add `Workflow.figures` attribute for storing associated figure definitions.
+- Add `Workflow.metadata` attribute for storing arbitrary metadata (will later be used for Zenodo archiving).
+- Add various `Workflow` static methods to help with retrieving information in the viewer without loading the whole workflow via `hickle`.
+- Add `get_task_schemas` to API to load the available task schemas without generating a workflow.
+- Add `refresh` bool parameter to `Config.set_config`, to force a reload of the configuration.
+- Support inputs as dependencies as well as outputs.
+- Support "parameter modifying" tasks (a task which outputs a parameter that is also an input to that task).
+- Add `iterate_run_options` to Workflow.
+- Add new methods for finding dependent and dependency tasks/parameters, and for retrieving upstream/downstream parameter values associated with a given element.
+- Add input option: `include_all_iterations`. If True, inputs from all iterations are passed to input map functions.
 
 ### Fixed
 - Only save input/output map files if they exist!
 - Fix bug in propagating groups correctly
+- Fix various code formatting issues.
+- Fix failure to raise on invalid schemas.
+- Fix bug when the same file is to be saved from multiple output maps.
+
+### Changed
+- Redo task sorting algorithm such that minimal ordering changes are made.
+- Set `stats` bool to False by default.
+- Bump hpcflow version to v0.1.12.
 
 ## [0.2.11] - 2020.09.29
 
diff --git a/matflow/_version.py b/matflow/_version.py
index c5b683c..51e2f03 100644
--- a/matflow/_version.py
+++ b/matflow/_version.py
@@ -1 +1 @@
-__version__ = '0.2.11'
+__version__ = '0.2.12'
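
---

Sketches follow (illustrative only, not part of the patch series): short examples of the iteration machinery these commits introduce. Any name or value that does not appear in the patches above is an assumption made for illustration.

1. The iteration loop added to `get_element_idx` grows each task's `element_idx` entry by appending `[iter_idx] * elems_per_iter` to `iteration_idx` and recomputing `num_elements`. A minimal sketch of that bookkeeping, assuming a task with 2 elements per iteration and 3 iterations:

    elems_per_iter = 2
    elem_idx_i = {
        'num_elements_per_iteration': elems_per_iter,
        'num_iterations': 3,
        'iteration_idx': [0] * elems_per_iter,  # only iteration 0, initially
        'num_elements': elems_per_iter,
    }

    # Each pass of the iteration loop extends the bookkeeping, as in the
    # `new_iter_idx` update in the construction.py hunk above:
    for iter_idx in range(1, elem_idx_i['num_iterations']):
        elem_idx_i['iteration_idx'] += [iter_idx] * elems_per_iter
        elem_idx_i['num_elements'] = len(elem_idx_i['iteration_idx'])

    assert elem_idx_i['iteration_idx'] == [0, 0, 1, 1, 2, 2]
    assert elem_idx_i['num_elements'] == 6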
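2. The scheduler commands and the per-iteration element selection share one flat-index convention: `$((($ITER_IDX * $SGE_TASK_LAST) + $SGE_TASK_ID - 1))` in the SGE command lines and `[i + (iteration_idx * num_elems) for i in range(num_elems)]` in `write_element_directories` and `prepare_iteration` express the same mapping. A sketch of that convention, using illustrative helper names (not MatFlow API):

    def global_element_idx(iteration_idx, idx_within_iter, num_elems_per_iter):
        """Map (iteration, position within iteration) to a flat element index."""
        return iteration_idx * num_elems_per_iter + idx_within_iter

    def iteration_coords(element_idx, num_elems_per_iter):
        """Inverse mapping: recover (iteration, position within iteration)."""
        return divmod(element_idx, num_elems_per_iter)

    # With 4 elements per iteration, the element at position 1 of
    # iteration 2 lives at flat index 9:
    assert global_element_idx(2, 1, 4) == 9
    assert iteration_coords(9, 4) == (2, 1)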
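3. `Workflow.get_elements_from_all_iterations` locates "equivalent" elements by reshaping a boolean mask over the flat element list into a (num_iterations, num_elements_per_iteration) grid. A standalone numpy sketch of that trick, with made-up sizes:

    import numpy as np

    num_iterations = 3
    num_elems_per_iter = 4
    element_idx = 6  # position 2 within iteration 1

    mask = np.zeros(num_iterations * num_elems_per_iter, dtype=bool)
    mask[element_idx] = True
    mask_2d = mask.reshape((num_iterations, num_elems_per_iter))

    # The single True entry gives the element's iteration and its position
    # within that iteration:
    current_iter, idx_within_iter = [i[0] for i in np.where(mask_2d)]

    # Mark the same position in all earlier iterations (the
    # `up_to_current=True` branch):
    for i in range(current_iter):
        mask_2d[i, idx_within_iter] = True

    assert list(np.where(mask_2d.flatten())[0]) == [2, 6]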
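4. The `include_all_iterations` schema option changes the shape of the payload passed to an input map in `prepare_task_element`. A sketch of the two shapes, assuming a hypothetical input alias 'grain_orientations' and placeholder values:

    # Default: the input map receives the current element's value directly:
    in_map_inputs = {'grain_orientations': '<value for this element>'}

    # With `include_all_iterations` set, values are keyed by iteration,
    # up to and including the current one:
    in_map_inputs = {
        'grain_orientations': {
            'iteration_0': '<value from iteration 0>',
            'iteration_1': '<value from iteration 1 (current)>',
        },
    }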