diff --git a/qiita_db/metadata_template/prep_template.py b/qiita_db/metadata_template/prep_template.py
index f69f162f1..19cf5dc5c 100644
--- a/qiita_db/metadata_template/prep_template.py
+++ b/qiita_db/metadata_template/prep_template.py
@@ -790,20 +790,22 @@ def _get_node_info(workflow, node):
                 ccmd.name, ccmd.merging_scheme, parent_cmd_name,
                 parent_merging_scheme, cparams, [], pparams)
 
-        def _get_predecessors(workflow, node):
-            # recursive method to get predecessors of a given node
-            pred = []
+        def _get_previous_predecessors(workflow, node):
+            parents, cxns = [], []
             for pnode in workflow.graph.predecessors(node):
-                pred = _get_predecessors(workflow, pnode)
-                cxns = {x[0]: x[2]
-                        for x in workflow.graph.get_edge_data(
-                            pnode, node)['connections'].connections}
-                data = [pnode, node, cxns]
-                if pred is None:
-                    pred = [data]
-                else:
-                    pred.append(data)
-            return pred
+                parents.append(pnode)
+                cxns.append(
+                    {x[0]: x[1] for x in workflow.graph.get_edge_data(
+                        pnode, node)['connections'].connections})
+            return [parents, node, cxns]
+
+        def _get_predecessors(workflow, node):
+            data = [_get_previous_predecessors(workflow, node)]
+            for pnode in data[0][0]:
+                d = _get_previous_predecessors(workflow, pnode)
+                if d[0]:
+                    data.append(d)
+            return data
 
         # Note: we are going to use the final BIOMs to figure out which
         # processing is missing from the back/end to the front, as this
@@ -913,24 +915,30 @@ def _get_predecessors(workflow, node):
             previous_jobs = dict()
             for ma, node in wk_data.items():
                 predecessors = _get_predecessors(wk, node)
-                predecessors.reverse()
                 cmds_to_create = []
                 init_artifacts = None
-                for i, (pnode, cnode, cxns) in enumerate(predecessors):
+                for i, (pnodes, cnode, cxns) in enumerate(predecessors):
                     cdp = cnode.default_parameter
                     cdp_cmd = cdp.command
                     params = cdp.values.copy()
-                    icxns = {y: x for x, y in cxns.items()}
-                    reqp = {x: icxns[y[1][0]]
+                    # note that this assumes that commands cannot have
+                    # multiple inputs with the same name, which should
+                    # be a safe assumption
+                    for pnode, cxn in zip(pnodes, cxns):
+                        info = _get_node_info(wk, pnode)
+                        if init_artifacts is None:
+                            init_artifacts = dict()
+                        for k, v in merging_schemes[info].items():
+                            if k in cxn:
+                                k = cxn[k]
+                            init_artifacts[k] = v
+
+                    reqp = {x: y[1][0]
                             for x, y in cdp_cmd.required_parameters.items()}
+
                     cmds_to_create.append([cdp_cmd, params, reqp])
 
-                    info = _get_node_info(wk, pnode)
-                    if info in merging_schemes:
-                        if set(merging_schemes[info]) >= set(cxns):
-                            init_artifacts = merging_schemes[info]
-                            break
 
                 if init_artifacts is None:
                     pdp = pnode.default_parameter
                     pdp_cmd = pdp.command
@@ -965,15 +973,15 @@ def _get_predecessors(workflow, node):
                     previous_job = current_job
                     if previous_job is None:
                         req_params = dict()
-                        for iname, dname in rp.items():
-                            if dname not in init_artifacts:
-                                msg = (f'Missing Artifact type: "{dname}" in '
+                        for aname, aid in rp.items():
+                            if aname not in init_artifacts:
+                                msg = (f'Missing Artifact type: "{aname}" in '
                                        'this preparation; this might be due '
                                        'to missing steps or not having the '
                                        'correct raw data.')
                                 # raises option c.
                                 raise ValueError(msg)
-                            req_params[iname] = init_artifacts[dname]
+                            req_params[aname] = init_artifacts[aname]
                     else:
                         req_params = dict()
                         connections = dict()