From 1e483a77ffcb859b9f4ae2f4cb78cb51e9c87217 Mon Sep 17 00:00:00 2001
From: Antonio Gonzalez
Date: Thu, 22 Feb 2024 17:34:40 -0700
Subject: [PATCH] add full workflow case

---
 qiita_db/metadata_template/prep_template.py          | 52 ++++++++++++++-----
 .../test/test_prep_template.py                       | 40 ++++++++++++++
 2 files changed, 78 insertions(+), 14 deletions(-)

diff --git a/qiita_db/metadata_template/prep_template.py b/qiita_db/metadata_template/prep_template.py
index d05493d3f..6a53d2149 100644
--- a/qiita_db/metadata_template/prep_template.py
+++ b/qiita_db/metadata_template/prep_template.py
@@ -794,16 +794,24 @@ def _get_predecessors(workflow, node):
             # recursive method to get predecessors of a given node
             pred = []
-            for pnode in workflow.graph.predecessors(node):
+            parents = list(workflow.graph.predecessors(node))
+            for pnode in parents:
                 pred = _get_predecessors(workflow, pnode)
                 cxns = {x[0]: x[2]
                         for x in workflow.graph.get_edge_data(
                             pnode, node)['connections'].connections}
                 data = [pnode, node, cxns]
                 if pred is None:
-                    pred = [data]
-                else:
-                    pred.append(data)
+                    pred = []
+
+                # making sure that if the node has extra parents they are
+                # generated first
+                parents.remove(pnode)
+                if parents:
+                    for pnode in parents:
+                        pred.extend(_get_predecessors(workflow, pnode))
+
+                pred.append(data)
             return pred
 
         # Note: we are going to use the final BIOMs to figure out which
@@ -920,7 +928,7 @@ def _get_predecessors(workflow, node):
                     icxns = {y: x for x, y in cxns.items()}
                     reqp = {x: icxns[y[1][0]]
                             for x, y in cdp_cmd.required_parameters.items()}
-                    cmds_to_create.append([cdp_cmd, params, reqp])
+                    cmds_to_create.append([cdp, cdp_cmd, params, reqp])
 
                     info = _get_node_info(wk, pnode)
                     if info in merging_schemes:
@@ -942,7 +950,7 @@ def _get_predecessors(workflow, node):
                                          'be applied')
                     reqp[x] = wkartifact_type
 
-                cmds_to_create.append([pdp_cmd, params, reqp])
+                cmds_to_create.append([pdp, pdp_cmd, params, reqp])
 
                 if starting_job is not None:
                     init_artifacts = {
@@ -953,14 +961,16 @@ def _get_predecessors(workflow, node):
                 cmds_to_create.reverse()
                 current_job = None
                 loop_starting_job = starting_job
-                for i, (cmd, params, rp) in enumerate(cmds_to_create):
+                previous_dps = dict()
+                for i, (dp, cmd, params, rp) in enumerate(cmds_to_create):
                     if loop_starting_job is not None:
                         previous_job = loop_starting_job
                         loop_starting_job = None
                     else:
                         previous_job = current_job
+
+                    req_params = dict()
                     if previous_job is None:
-                        req_params = dict()
                         for iname, dname in rp.items():
                             if dname not in init_artifacts:
                                 msg = (f'Missing Artifact type: "{dname}" in '
                                 # raises option c.
                                 raise ValueError(msg)
                             req_params[iname] = init_artifacts[dname]
+                        if len(dp.command.required_parameters) > 1:
+                            raise ValueError('Not implemented')
                     else:
-                        req_params = dict()
-                        connections = dict()
-                        for iname, dname in rp.items():
-                            req_params[iname] = f'{previous_job.id}{dname}'
-                            connections[dname] = iname
+                        if len(dp.command.required_parameters) == 1:
+                            cxns = dict()
+                            for iname, dname in rp.items():
+                                req_params[iname] = f'{previous_job.id}{dname}'
+                                cxns[dname] = iname
+                            connections = {previous_job: cxns}
+                        else:
+                            GH = wk.graph
+                            connections = dict()
+                            for pn in GH.predecessors(node):
+                                pndp = pn.default_parameter
+                                n, cnx, _ = GH.get_edge_data(
+                                    pn, node)['connections'].connections[0]
+                                _job = previous_dps[pndp.id]
+                                req_params[cnx] = f'{_job.id}{n}'
+                                connections[_job] = {n: cnx}
                     params.update(req_params)
                     job_params = qdb.software.Parameters.load(
                         cmd, values_dict=params)
@@ -997,8 +1020,9 @@ def _get_predecessors(workflow, node):
                     else:
                         current_job = workflow.add(
                             job_params, req_params=req_params,
-                            connections={previous_job: connections})
+                            connections=connections)
                     previous_jobs[current_job] = params
+                    previous_dps[dp.id] = current_job
 
         return workflow
 
diff --git a/qiita_db/metadata_template/test/test_prep_template.py b/qiita_db/metadata_template/test/test_prep_template.py
index 41ec15077..81769082d 100644
--- a/qiita_db/metadata_template/test/test_prep_template.py
+++ b/qiita_db/metadata_template/test/test_prep_template.py
@@ -1477,6 +1477,46 @@ def test_artifact_setter(self):
                 "the parameters are the same as jobs"):
             pt.add_default_workflow(qdb.user.User('test@foo.bar'))
 
+        # Then, let's clean up again and add a new command/step with 2
+        # BIOM input artifacts
+        for pj in wk.graph.nodes:
+            pj._set_error('Killed')
+        cmd = qdb.software.Command.create(
+            qdb.software.Software(1), "Multiple BIOM as inputs", "", {
+                'req_artifact_1': ['artifact:["BIOM"]', None],
+                'req_artifact_2': ['artifact:["BIOM"]', None],
+            }, outputs={'MB-output': 'BIOM'})
+        cmd_dp = qdb.software.DefaultParameters.create("", cmd)
+        # creating the new node for the cmd and linking its two inputs to
+        # the BIOM outputs of its two parent nodes
+        sql = f"""
+            INSERT INTO qiita.default_workflow_node (
+                default_workflow_id, default_parameter_set_id)
+                VALUES (1, {cmd_dp.id});
+            INSERT INTO qiita.default_workflow_edge (
+                parent_id, child_id)
+                VALUES (8, 10);
+            INSERT INTO qiita.default_workflow_edge (
+                parent_id, child_id)
+                VALUES (9, 10);
+            INSERT INTO qiita.default_workflow_edge_connections (
+                default_workflow_edge_id, parent_output_id, child_input_id)
+                VALUES (6, 3, 99);
+            INSERT INTO qiita.default_workflow_edge_connections (
+                default_workflow_edge_id, parent_output_id, child_input_id)
+                VALUES (7, 3, 100)
+        """
+        qdb.sql_connection.perform_as_transaction(sql)
+        wk = pt.add_default_workflow(qdb.user.User('test@foo.bar'))
+        self.assertEqual(len(wk.graph.nodes), 6)
+        self.assertEqual(len(wk.graph.edges), 5)
+        self.assertCountEqual(
+            [x.command.name for x in wk.graph.nodes],
+            # we should have 2 split libraries, 3 closed-reference and
+            # 1 multiple-biom command
+            ['Split libraries FASTQ', 'Split libraries FASTQ',
+             'Pick closed-reference OTUs', 'Pick closed-reference OTUs',
+             'Pick closed-reference OTUs', 'Multiple BIOM as inputs'])
+
         # now let's test that an error is raised when there is no valid initial
         # input data; this moves the data type from FASTQ to taxa_summary for
         # the default_workflow_id = 1
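
Note: the sketch below is illustrative only and is not part of the patch.
Using plain networkx (the library behind wk.graph), it shows the
parents-first traversal idea behind the reworked _get_predecessors: every
parent branch of a node is resolved before the node's own edge data is
appended, so a multi-input command such as "Multiple BIOM as inputs" is
only created once all of its input-producing jobs exist. The graph, node
names and edge payloads are hypothetical stand-ins, and the helper is a
simplified variant rather than the exact patched code, which also carries
qiita's richer "connections" objects and builds a
{previous_job: {output: input}} mapping per parent.

import networkx as nx


def get_predecessors(graph, node):
    """Return [parent, child, edge_data] triplets, deepest ancestors first."""
    pred = []
    for pnode in graph.predecessors(node):
        # resolve everything upstream of this parent before its own edge
        pred.extend(get_predecessors(graph, pnode))
        pred.append([pnode, node, graph.get_edge_data(pnode, node)])
    return pred


# hypothetical default workflow mirroring the new test case: two
# BIOM-producing branches feeding a single merging step
g = nx.DiGraph()
g.add_edge('split_libraries_1', 'pick_otus_1')
g.add_edge('split_libraries_2', 'pick_otus_2')
g.add_edge('pick_otus_1', 'multiple_biom')
g.add_edge('pick_otus_2', 'multiple_biom')

for parent, child, _ in get_predecessors(g, 'multiple_biom'):
    print(f'{parent} -> {child}')
# split_libraries_1 -> pick_otus_1
# pick_otus_1 -> multiple_biom
# split_libraries_2 -> pick_otus_2
# pick_otus_2 -> multiple_biom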