From e34df41c34e4ccd13dd9f0965cb8ec34e5368fdb Mon Sep 17 00:00:00 2001
From: Antonio Gonzalez
Date: Mon, 26 Aug 2024 15:42:58 -0600
Subject: [PATCH] fix #3427 (#3428)

---
 qiita_db/software.py           | 11 ++++++
 qiita_db/test/test_artifact.py | 62 ++++++++++++++++++++++------------
 qiita_pet/handlers/software.py | 44 +++++++++++++++++++++---
 3 files changed, 92 insertions(+), 25 deletions(-)

diff --git a/qiita_db/software.py b/qiita_db/software.py
index 8b27078a3..fee35a21b 100644
--- a/qiita_db/software.py
+++ b/qiita_db/software.py
@@ -1995,9 +1995,20 @@ def graph(self):
             qdb.sql_connection.TRN.add(sql, [self.id])
             db_edges = qdb.sql_connection.TRN.execute_fetchindex()
 
+        # let's track which nodes are actually being used so that, if they
+        # do not have an edge, we still return them as part of the graph
+        used_nodes = nodes.copy()
         for edge_id, p_id, c_id in db_edges:
             e = DefaultWorkflowEdge(edge_id)
             g.add_edge(nodes[p_id], nodes[c_id], connections=e)
+            if p_id in used_nodes:
+                del used_nodes[p_id]
+            if c_id in used_nodes:
+                del used_nodes[c_id]
+        # adding the missing (unconnected) nodes
+        for ms in used_nodes:
+            g.add_node(nodes[ms])
+
         return g
 
     @property
diff --git a/qiita_db/test/test_artifact.py b/qiita_db/test/test_artifact.py
index 789d7ffb2..76833ed98 100644
--- a/qiita_db/test/test_artifact.py
+++ b/qiita_db/test/test_artifact.py
@@ -404,9 +404,8 @@ def test_descendants_with_jobs(self):
                     '"phred_offset": "auto"}')
         params = qdb.software.Parameters.load(qdb.software.Command(1),
                                               json_str=json_str)
-        user = qdb.user.User('test@foo.bar')
         wf = qdb.processing_job.ProcessingWorkflow.from_scratch(
-            user, params, name='Test WF')
+            qdb.user.User('test@foo.bar'), params, name='Test WF')
         parent = list(wf.graph.nodes())[0]
         wf.add(qdb.software.DefaultParameters(10),
                connections={parent: {'demultiplexed': 'input_data'}})
@@ -699,6 +698,8 @@ def setUp(self):
 
         self._clean_up_files.extend([self.fwd, self.rev])
 
+        self.user = qdb.user.User('test@foo.bar')
+
     def tearDown(self):
         for f in self._clean_up_files:
             if exists(f):
@@ -1039,7 +1040,7 @@ def test_delete_in_construction_job(self):
             '"min_per_read_length_fraction": 0.75, "sequence_max_n": 0, '
             '"phred_offset": ""}' % test.id)
         qdb.processing_job.ProcessingJob.create(
-            qdb.user.User('test@foo.bar'),
+            self.user,
             qdb.software.Parameters.load(qdb.software.Command(1),
                                          json_str=json_str))
         uploads_fp = join(qdb.util.get_mountpoint("uploads")[0][1],
@@ -1064,7 +1065,7 @@ def test_delete_error_running_job(self):
             '"min_per_read_length_fraction": 0.75, "sequence_max_n": 0, '
             '"phred_offset": ""}' % test.id)
         job = qdb.processing_job.ProcessingJob.create(
-            qdb.user.User('test@foo.bar'),
+            self.user,
             qdb.software.Parameters.load(qdb.software.Command(1),
                                          json_str=json_str))
         job._set_status('running')
@@ -1147,7 +1148,7 @@ def test_delete_with_jobs(self):
             '"min_per_read_length_fraction": 0.75, "sequence_max_n": 0, '
             '"phred_offset": ""}' % test.id)
         job = qdb.processing_job.ProcessingJob.create(
-            qdb.user.User('test@foo.bar'),
+            self.user,
             qdb.software.Parameters.load(qdb.software.Command(1),
                                          json_str=json_str))
         job._set_status('success')
@@ -1177,8 +1178,7 @@ def test_being_deleted_by(self):
         cmd = qiita_plugin.get_command('delete_artifact')
         params = qdb.software.Parameters.load(
             cmd, values_dict={'artifact': test.id})
-        job = qdb.processing_job.ProcessingJob.create(
-            qdb.user.User('test@foo.bar'), params, True)
+        job = qdb.processing_job.ProcessingJob.create(self.user, params, True)
         job._set_status('running')
 
         # verifying that there is a job and it is the same as the one above
@@ -1189,8 +1189,7 @@ def test_being_deleted_by(self):
         self.assertIsNone(test.being_deleted_by)
 
         # now, let's actually remove
-        job = qdb.processing_job.ProcessingJob.create(
-            qdb.user.User('test@foo.bar'), params, True)
+        job = qdb.processing_job.ProcessingJob.create(self.user, params, True)
         job.submit()
         # let's wait for the job
         wait_for_processing_job(job.id)
@@ -1207,7 +1206,7 @@ def test_delete_as_output_job(self):
         data = {'OTU table': {'filepaths': [(fp, 'biom')],
                               'artifact_type': 'BIOM'}}
         job = qdb.processing_job.ProcessingJob.create(
-            qdb.user.User('test@foo.bar'),
+            self.user,
             qdb.software.Parameters.load(
                 qdb.software.Command.get_validator('BIOM'),
                 values_dict={'files': dumps({'biom': [fp]}),
@@ -1448,29 +1447,50 @@ def test_descendants_with_jobs(self):
             data_type="16S")
         self.assertEqual(len(a.analysis.artifacts), 3)
         # 3. add jobs connecting the new artifact to the other root
+        # - currently:
         # a -> job -> b
         # c
-        # job1 connects b & c
-        # job2 connects a & c
+        # - expected:
+        # a --> job -> b
+        #        |-> job2 -> out
+        #        ^
+        #  |-----|---> job1 -> out
+        # c ------------|
         cmd = qdb.software.Command.create(
             qdb.software.Software(1),
             "CommandWithMultipleInputs", "", {
-                'input_b': ['artifact:["BIOM"]', None],
-                'input_c': ['artifact:["BIOM"]', None]}, {'out': 'BIOM'})
-        params = qdb.software.Parameters.load(
-            cmd, values_dict={'input_b': a.children[0].id, 'input_c': c.id})
-        job1 = qdb.processing_job.ProcessingJob.create(
-            qdb.user.User('test@foo.bar'), params)
+                'input_x': ['artifact:["BIOM"]', None],
+                'input_y': ['artifact:["BIOM"]', None]}, {'out': 'BIOM'})
         params = qdb.software.Parameters.load(
-            cmd, values_dict={'input_b': a.id, 'input_c': c.id})
-        job2 = qdb.processing_job.ProcessingJob.create(
-            qdb.user.User('test@foo.bar'), params)
+            cmd, values_dict={'input_x': a.children[0].id, 'input_y': c.id})
+        wf = qdb.processing_job.ProcessingWorkflow.from_scratch(
+            self.user, params, name='Test WF')
+        job1 = list(wf.graph.nodes())[0]
+        cmd_dp = qdb.software.DefaultParameters.create("", cmd)
+        wf.add(cmd_dp, req_params={'input_x': a.id, 'input_y': c.id})
+        job2 = list(wf.graph.nodes())[1]
         jobs = [j[1] for e in a.descendants_with_jobs.edges
                 for j in e if j[0] == 'job']
         self.assertIn(job1, jobs)
         self.assertIn(job2, jobs)
+        # 4. add job3, which takes the outputs of job1 and job2 as inputs
+        # - expected:
+        # a --> job -> b
+        #        |-> job2 -> out -> job3 -> out
+        #        ^                   ^
+        #        |                   |
+        #        |                   |
+        #  |-----|---> job1 -> out
+        # c ------------|
+        wf.add(cmd_dp, connections={
+            job1: {'out': 'input_x'}, job2: {'out': 'input_y'}})
+        job3 = list(wf.graph.nodes())[2]
+        jobs = [j[1] for e in a.descendants_with_jobs.edges
+                for j in e if j[0] == 'job']
+        self.assertIn(job3, jobs)
+
 
 @qiita_test_checker()
 class ArtifactArchiveTests(TestCase):
diff --git a/qiita_pet/handlers/software.py b/qiita_pet/handlers/software.py
index 7e4ec9afd..54526a3d0 100644
--- a/qiita_pet/handlers/software.py
+++ b/qiita_pet/handlers/software.py
@@ -61,6 +61,7 @@ def _default_parameters_parsing(node):
         # getting the main default parameters
         nodes = []
         edges = []
+        at = w.artifact_type
 
         # first get edges as this will give us the main connected commands
         # and their order
@@ -72,18 +73,22 @@ def _default_parameters_parsing(node):
         #                output_type: output_node_name}, ...}
         # for easy look up and merge of output_names
         main_nodes = dict()
+        not_used_nodes = {n.id: n for n in graph.nodes}
        for i, (x, y) in enumerate(graph.edges):
+            if x.id in not_used_nodes:
+                del not_used_nodes[x.id]
+            if y.id in not_used_nodes:
+                del not_used_nodes[y.id]
+
+            vals_x, input_x, output_x = _default_parameters_parsing(x)
+            vals_y, input_y, output_y = _default_parameters_parsing(y)
+
             connections = []
             for a, _, c in graph[x][y]['connections'].connections:
                 connections.append("%s | %s" % (a, c))
 
-            vals_x, input_x, output_x = _default_parameters_parsing(x)
-            vals_y, input_y, output_y = _default_parameters_parsing(y)
-
             if i == 0:
                 # we are in the first element so we can specifically select
                 # the type we are looking for
-                at = w.artifact_type
                 if at in input_x[0][1]:
                     input_x[0][1] = at
                 else:
@@ -144,6 +149,37 @@ def _default_parameters_parsing(node):
         wparams = w.parameters
 
+        # adding nodes without edges
+        # as a first step, if not_used_nodes is not empty, confirm that
+        # nodes/edges are empty; in theory we should never hit this case
+        if not_used_nodes and (nodes or edges):
+            raise ValueError(
+                'Error, please check your workflow configuration')
+
+        # note that this block is similar but not identical to adding
+        # connected nodes
+        for x in not_used_nodes.values():
+            vals_x, input_x, output_x = _default_parameters_parsing(x)
+            if at in input_x[0][1]:
+                input_x[0][1] = at
+            else:
+                input_x[0][1] = '** WARNING, NOT DEFINED **'
+
+            name_x = vals_x[0]
+            if vals_x not in nodes:
+                nodes.append(vals_x)
+            for a, b in input_x:
+                if b in inputs:
+                    name = inputs[b]
+                else:
+                    name = 'input_%s_%s' % (name_x, b)
+                nodes.append([name, a, b])
+                edges.append([name, vals_x[0]])
+            for a, b in output_x:
+                name = 'output_%s_%s' % (name_x, b)
+                nodes.append([name, a, b])
+                edges.append([name_x, name])
+
         workflows.append(
             {'name': w.name, 'id': w.id, 'data_types': w.data_type,
              'description': w.description, 'active': w.active,
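
Reviewer note (not part of the patch): the graph() change above keeps
workflow nodes that never appear in an edge. Below is a minimal, standalone
sketch of that pattern; it uses plain networkx with hypothetical integer ids
and string labels in place of Qiita's DefaultWorkflowNode/DefaultWorkflowEdge
objects:

    import networkx as nx

    # hypothetical stand-ins for the db-backed values used in graph()
    nodes = {1: 'node-1', 2: 'node-2', 3: 'node-3'}  # node 3 has no edges
    db_edges = [(10, 1, 2)]                          # (edge_id, parent, child)

    g = nx.DiGraph()
    # start from every node; drop the ones that appear in an edge
    used_nodes = nodes.copy()
    for edge_id, p_id, c_id in db_edges:
        g.add_edge(nodes[p_id], nodes[c_id], connections=edge_id)
        if p_id in used_nodes:
            del used_nodes[p_id]
        if c_id in used_nodes:
            del used_nodes[c_id]
    # whatever remains never appeared in an edge; add it explicitly,
    # otherwise networkx would leave it out of the graph entirely
    for ms in used_nodes:
        g.add_node(nodes[ms])

    assert set(g.nodes) == {'node-1', 'node-2', 'node-3'}

Without the final add_node loop, set(g.nodes) would only contain 'node-1'
and 'node-2', which matches the unconnected-node case this patch guards
against in both qiita_db/software.py and qiita_pet/handlers/software.py.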