Skip to content

Commit

Permalink
allowing for multiple inputs in workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
antgonza committed Feb 19, 2024
1 parent a2ef905 commit ff33546
Showing 1 changed file with 34 additions and 26 deletions.
60 changes: 34 additions & 26 deletions qiita_db/metadata_template/prep_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -790,20 +790,22 @@ def _get_node_info(workflow, node):
ccmd.name, ccmd.merging_scheme, parent_cmd_name,
parent_merging_scheme, cparams, [], pparams)

def _get_predecessors(workflow, node):
# recursive method to get predecessors of a given node
pred = []
def _get_previous_predecessors(workflow, node):
parents, cxns = [], []
for pnode in workflow.graph.predecessors(node):
pred = _get_predecessors(workflow, pnode)
cxns = {x[0]: x[2]
for x in workflow.graph.get_edge_data(
pnode, node)['connections'].connections}
data = [pnode, node, cxns]
if pred is None:
pred = [data]
else:
pred.append(data)
return pred
parents.append(pnode)
cxns.append(
{x[0]: x[1] for x in workflow.graph.get_edge_data(
pnode, node)['connections'].connections})
return [parents, node, cxns]

def _get_predecessors(workflow, node):
data = [_get_previous_predecessors(workflow, node)]
for pnode in data[0][0]:
d = _get_previous_predecessors(workflow, pnode)
if d[0]:
data.append(d)
return data

# Note: we are going to use the final BIOMs to figure out which
# processing is missing from the back/end to the front, as this
Expand Down Expand Up @@ -913,24 +915,30 @@ def _get_predecessors(workflow, node):
previous_jobs = dict()
for ma, node in wk_data.items():
predecessors = _get_predecessors(wk, node)
predecessors.reverse()
cmds_to_create = []
init_artifacts = None
for i, (pnode, cnode, cxns) in enumerate(predecessors):
for i, (pnodes, cnode, cxns) in enumerate(predecessors):
cdp = cnode.default_parameter
cdp_cmd = cdp.command
params = cdp.values.copy()

icxns = {y: x for x, y in cxns.items()}
reqp = {x: icxns[y[1][0]]
# note that this assume that commands cannot have
# multiple inputs with the same name, which should
# be a safe assumption
for pnode, cxn in zip(pnodes, cxns):
info = _get_node_info(wk, pnode)
if init_artifacts is None:
init_artifacts = dict()
for k, v in merging_schemes[info].items():
if k in cxn:
k = cxn[k]
init_artifacts[k] = v

reqp = {x: y[1][0]
for x, y in cdp_cmd.required_parameters.items()}

cmds_to_create.append([cdp_cmd, params, reqp])

info = _get_node_info(wk, pnode)
if info in merging_schemes:
if set(merging_schemes[info]) >= set(cxns):
init_artifacts = merging_schemes[info]
break
if init_artifacts is None:
pdp = pnode.default_parameter
pdp_cmd = pdp.command
Expand Down Expand Up @@ -965,15 +973,15 @@ def _get_predecessors(workflow, node):
previous_job = current_job
if previous_job is None:
req_params = dict()
for iname, dname in rp.items():
if dname not in init_artifacts:
msg = (f'Missing Artifact type: "{dname}" in '
for aname, aid in rp.items():
if aname not in init_artifacts:
msg = (f'Missing Artifact type: "{aname}" in '
'this preparation; this might be due '
'to missing steps or not having the '
'correct raw data.')
# raises option c.
raise ValueError(msg)
req_params[iname] = init_artifacts[dname]
req_params[aname] = init_artifacts[aname]
else:
req_params = dict()
connections = dict()
Expand Down

0 comments on commit ff33546

Please sign in to comment.