Skip to content

Commit

Permalink
Merge pull request #83 from LightForm-group/feat/flexible-param-modif…
Browse files Browse the repository at this point in the history
…y-tasks

Feat/flexible param modify tasks
  • Loading branch information
aplowman authored Feb 14, 2021
2 parents 03e6120 + 69a0f28 commit 15d3ead
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 42 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@

- Fix issue [#82](https://github.com/LightForm-group/matflow/issues/82) where the default group is not defined in the `Workflow.element_idx` for tasks where no local inputs are defined.

### Added

- Add support for flexible positioning of parameter-modifying tasks ([#81](https://github.com/LightForm-group/matflow/issues/81))

## [0.2.16] - 2021.02.05

### Fixed
Expand Down
135 changes: 93 additions & 42 deletions matflow/models/construction.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import copy
from warnings import warn
from pprint import pprint
from itertools import product
import numpy as np
from hpcflow.scheduler import SunGridEngine

Expand Down Expand Up @@ -349,10 +350,7 @@ def get_input_dependency(task_info_lst, input_dict, input_task_idx):
param_task = task_info_lst[input_task_idx]

downstream_task_context = param_task['context']
downstream_task_name = param_task['name']

# Initially collate as a list of dependencies. Then apply some rules to find the
# preferred single dependency.
input_dependency = []

for task_idx, task_info in enumerate(task_info_lst):
Expand Down Expand Up @@ -398,22 +396,7 @@ def get_input_dependency(task_info_lst, input_dict, input_task_idx):
):
input_dependency[-1].update({'is_parameter_modifying_task': True})

if len(input_dependency) > 1:
# Only keep "parameter modifying tasks", because such task will be dependent on
# the "parameter generating" tasks:
input_dependency = [i for i in input_dependency
if i['is_parameter_modifying_task']]

# Make sure only a single dependency exists.
if len(input_dependency) > 1:
msg = (
f'Task input parameter "{param_name}" from task "{downstream_task_name}" with '
f'task context "{downstream_task_context}" is found as an input/output '
f'parameter for multiple tasks, which makes the task dependency ill-defined.'
)
raise IncompatibleWorkflow(msg)

return input_dependency[0] if input_dependency else {}
return input_dependency


def get_dependency_idx(task_info_lst):
Expand Down Expand Up @@ -459,7 +442,6 @@ def get_dependency_idx(task_info_lst):
'task_context': task_context,
'original_idx': task_idx,
'current_idx': task_idx,
'task_dependencies': [],
'parameter_dependencies': {},
}

Expand All @@ -475,20 +457,7 @@ def get_dependency_idx(task_info_lst):

add_input_dep = False
if is_locally_defined:

if input_dependency:
in_dep_task = task_info_lst[input_dependency['from_task']]
in_dep_task_name = in_dep_task['name']
in_dep_task_context = in_dep_task['context']
msg = (
f'Input parameter "{input_name}" for task "{task_name}" with '
f'task context "{task_context}" has both a local value and a '
f'non-local value. The non-local value is derived from an '
f'{input_dependency["dependency_type"]} of task '
f'"{in_dep_task_name}" with task context '
f'"{in_dep_task_context}". The local value will be used.'
)
warn(msg)
pass

elif input_dependency:
add_input_dep = True
Expand All @@ -513,12 +482,18 @@ def get_dependency_idx(task_info_lst):
dep_idx_i['parameter_dependencies'].update({
input_alias: input_dependency
})
task_dep_idx.append(input_dependency['from_task'])

dep_idx_i['task_dependencies'] = list(set(task_dep_idx))
dependency_idx.append(dep_idx_i)

# Check for circular dependencies in task inputs/outputs:
return dependency_idx


def check_direct_circular_dependencies(dependency_idx):
"""Check for direct circular dependencies. Does not check for indirect circular
dependencies.
"""

all_deps = []
for idx, deps in enumerate(dependency_idx):
for i in deps['task_dependencies']:
Expand All @@ -529,8 +504,6 @@ def get_dependency_idx(task_info_lst):
f'{dependency_idx}')
raise IncompatibleWorkflow(msg)

return dependency_idx


def find_good_task_dependency_position(dep_idx, task_dependencies):
seen_ids = []
Expand All @@ -541,6 +514,83 @@ def find_good_task_dependency_position(dep_idx, task_dependencies):
raise RuntimeError('No good position exists!')


def singularise_input_dependencies(dep_idx_multi):

# Enumerate all different "pathways" through the multiple parameter dependencies:
param_keys = []
param_vals = []
for idx, dep_idx in enumerate(dep_idx_multi):
for param_name, param_deps in dep_idx['parameter_dependencies'].items():
param_keys.append((idx, param_name))
param_vals.append(range(len(param_deps)))

param_val_pathways = list(product(*param_vals))

trial_dep_idx = []
for param_val_pathway_i in param_val_pathways:
dep_idx_pathway_i = copy.deepcopy(dep_idx_multi)
for param_key_idx, (task_idx, param_name) in enumerate(param_keys):
dep_idx_pathway_i[task_idx]['parameter_dependencies'][param_name] = (
dep_idx_pathway_i[task_idx]['parameter_dependencies'][param_name][
param_val_pathway_i[param_key_idx]
]
)
# Add task dependencies to each task:
for dep_idx in dep_idx_pathway_i:
dep_idx['task_dependencies'] = list(set([
p_val['from_task'] for p_val in dep_idx['parameter_dependencies'].values()
]))

try:
check_direct_circular_dependencies(dep_idx_pathway_i)
trial_dep_idx.append(dep_idx_pathway_i)
except IncompatibleWorkflow:
continue

# If multiple pathways, keep those that do not require reordering, if any exist:
trial_dep_idx_srt = []
no_reorder_exists = False
for pathway_i in trial_dep_idx:
try:
pathway_i_srt = sort_dependency_idx(pathway_i)

req_reorder = any([i['original_idx'] != i['current_idx']
for i in pathway_i_srt])
if not req_reorder:
no_reorder_exists = True
trial_dep_idx_srt.append((pathway_i_srt, req_reorder))
except RuntimeError:
# Due to indirect (or direct) circular dependencies
continue

if no_reorder_exists:
# Remove pathways that require reordering:
trial_dep_idx_srt = [i[0] for i in trial_dep_idx_srt if not i[1]]
else:
trial_dep_idx_srt = [i[0] for i in trial_dep_idx_srt]

if len(trial_dep_idx_srt) > 1:
# If still multiple pathways, let parameter-modifying tasks take precedence:
max_num_param_modifying_task_deps = 0
preferred_pathway_idx = 0
for pathway_idx, pathway_i in enumerate(trial_dep_idx_srt):
num_param_modifying_task_deps = sum([
v["is_parameter_modifying_task"]
for i in pathway_i
for k, v in i["parameter_dependencies"].items()
])
if num_param_modifying_task_deps > max_num_param_modifying_task_deps:
max_num_param_modifying_task_deps = num_param_modifying_task_deps
preferred_pathway_idx = pathway_idx

dep_idx_singular = trial_dep_idx_srt[preferred_pathway_idx]

else:
dep_idx_singular = trial_dep_idx_srt[0]

return dep_idx_singular


def sort_dependency_idx(dependency_idx):

dep_idx = copy.deepcopy(dependency_idx)
Expand Down Expand Up @@ -893,15 +943,16 @@ def order_tasks(task_lst):
task_lst[i_idx]['output_map_options'] = out_opts

dep_idx = get_dependency_idx(task_lst)
dep_idx_srt = sort_dependency_idx(dep_idx)
sort_idx = [i['original_idx'] for i in dep_idx_srt]
dep_idx_singled = singularise_input_dependencies(dep_idx)

sort_idx = [i['original_idx'] for i in dep_idx_singled]
task_lst_srt = [task_lst[i] for i in sort_idx]

# Add sorted task idx:
for idx, i in enumerate(task_lst_srt):
i['task_idx'] = idx

return task_lst_srt, dep_idx_srt
return task_lst_srt, dep_idx_singled


def set_default_nesting(task_lst, dep_idx):
Expand Down

0 comments on commit 15d3ead

Please sign in to comment.