Commit 5a697a5

fix #3427

antgonza committed Aug 26, 2024
1 parent 93f5327 commit 5a697a5

Showing 3 changed files with 92 additions and 25 deletions.

11 changes: 11 additions & 0 deletions qiita_db/software.py
@@ -1995,9 +1995,20 @@ def graph(self):
             qdb.sql_connection.TRN.add(sql, [self.id])
             db_edges = qdb.sql_connection.TRN.execute_fetchindex()

+            # let's track what nodes are actually being used so if they do not
+            # have an edge we still return them as part of the graph
+            used_nodes = nodes.copy()
             for edge_id, p_id, c_id in db_edges:
                 e = DefaultWorkflowEdge(edge_id)
                 g.add_edge(nodes[p_id], nodes[c_id], connections=e)
+                if p_id in used_nodes:
+                    del used_nodes[p_id]
+                if c_id in used_nodes:
+                    del used_nodes[c_id]
+            # adding the missing nodes
+            for ms in used_nodes:
+                g.add_node(nodes[ms])

         return g

     @property
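A minimal sketch of the bug this hunk fixes, outside of qiita_db: networkx only creates nodes implicitly when they appear in an edge, so a workflow node with no incident edges was silently missing from the returned graph. The node table and edge rows below are hypothetical stand-ins for the DefaultWorkflowNode objects and the SQL results.

import networkx as nx

# hypothetical stand-ins for the node lookup table and the SQL edge rows
nodes = {1: 'node-1', 2: 'node-2', 3: 'node-3'}  # node 3 has no edges
db_edges = [(10, 1, 2)]                          # (edge_id, parent_id, child_id)

g = nx.DiGraph()
used_nodes = nodes.copy()
for edge_id, p_id, c_id in db_edges:
    g.add_edge(nodes[p_id], nodes[c_id], connections=edge_id)
    used_nodes.pop(p_id, None)
    used_nodes.pop(c_id, None)

# without this loop (the pre-fix behavior) 'node-3' never enters the graph
for ms in used_nodes:
    g.add_node(nodes[ms])

assert set(g.nodes) == {'node-1', 'node-2', 'node-3'}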
62 changes: 41 additions & 21 deletions qiita_db/test/test_artifact.py
@@ -404,9 +404,8 @@ def test_descendants_with_jobs(self):
             '"phred_offset": "auto"}')
         params = qdb.software.Parameters.load(qdb.software.Command(1),
                                               json_str=json_str)
-        user = qdb.user.User('test@foo.bar')
         wf = qdb.processing_job.ProcessingWorkflow.from_scratch(
-            user, params, name='Test WF')
+            qdb.user.User('test@foo.bar'), params, name='Test WF')
         parent = list(wf.graph.nodes())[0]
         wf.add(qdb.software.DefaultParameters(10),
                connections={parent: {'demultiplexed': 'input_data'}})
@@ -699,6 +698,8 @@ def setUp(self):

         self._clean_up_files.extend([self.fwd, self.rev])

+        self.user = qdb.user.User('test@foo.bar')
+
     def tearDown(self):
         for f in self._clean_up_files:
             if exists(f):
@@ -1039,7 +1040,7 @@ def test_delete_in_construction_job(self):
             '"min_per_read_length_fraction": 0.75, "sequence_max_n": 0, '
             '"phred_offset": ""}' % test.id)
         qdb.processing_job.ProcessingJob.create(
-            qdb.user.User('test@foo.bar'),
+            self.user,
             qdb.software.Parameters.load(qdb.software.Command(1),
                                          json_str=json_str))
         uploads_fp = join(qdb.util.get_mountpoint("uploads")[0][1],
@@ -1064,7 +1065,7 @@ def test_delete_error_running_job(self):
             '"min_per_read_length_fraction": 0.75, "sequence_max_n": 0, '
             '"phred_offset": ""}' % test.id)
         job = qdb.processing_job.ProcessingJob.create(
-            qdb.user.User('test@foo.bar'),
+            self.user,
             qdb.software.Parameters.load(qdb.software.Command(1),
                                          json_str=json_str))
         job._set_status('running')
@@ -1147,7 +1148,7 @@ def test_delete_with_jobs(self):
             '"min_per_read_length_fraction": 0.75, "sequence_max_n": 0, '
             '"phred_offset": ""}' % test.id)
         job = qdb.processing_job.ProcessingJob.create(
-            qdb.user.User('test@foo.bar'),
+            self.user,
             qdb.software.Parameters.load(qdb.software.Command(1),
                                          json_str=json_str))
         job._set_status('success')
@@ -1177,8 +1178,7 @@ def test_being_deleted_by(self):
         cmd = qiita_plugin.get_command('delete_artifact')
         params = qdb.software.Parameters.load(
             cmd, values_dict={'artifact': test.id})
-        job = qdb.processing_job.ProcessingJob.create(
-            qdb.user.User('test@foo.bar'), params, True)
+        job = qdb.processing_job.ProcessingJob.create(self.user, params, True)
         job._set_status('running')

         # verifying that there is a job and it is the same as above
@@ -1189,8 +1189,7 @@
         self.assertIsNone(test.being_deleted_by)

         # now, let's actually remove
-        job = qdb.processing_job.ProcessingJob.create(
-            qdb.user.User('test@foo.bar'), params, True)
+        job = qdb.processing_job.ProcessingJob.create(self.user, params, True)
         job.submit()
         # let's wait for the job
         wait_for_processing_job(job.id)
@@ -1207,7 +1206,7 @@ def test_delete_as_output_job(self):
         data = {'OTU table': {'filepaths': [(fp, 'biom')],
                               'artifact_type': 'BIOM'}}
         job = qdb.processing_job.ProcessingJob.create(
-            qdb.user.User('test@foo.bar'),
+            self.user,
             qdb.software.Parameters.load(
                 qdb.software.Command.get_validator('BIOM'),
                 values_dict={'files': dumps({'biom': [fp]}),
@@ -1448,29 +1447,50 @@ def test_descendants_with_jobs(self):
             data_type="16S")
         self.assertEqual(len(a.analysis.artifacts), 3)
         # 3. add jobs connecting the new artifact to the other root
-        # job1 connects b & c
-        # job2 connects a & c
+        # - currently:
+        #   a -> job -> b
+        #   c
+        # - expected:
+        #   a --> job -> b
+        #   |-> job2 -> out
+        #   ^
+        #   |-----|---> job1 -> out
+        #   c ------------|
         cmd = qdb.software.Command.create(
             qdb.software.Software(1),
             "CommandWithMultipleInputs", "", {
-                'input_b': ['artifact:["BIOM"]', None],
-                'input_c': ['artifact:["BIOM"]', None]}, {'out': 'BIOM'})
+                'input_x': ['artifact:["BIOM"]', None],
+                'input_y': ['artifact:["BIOM"]', None]}, {'out': 'BIOM'})
         params = qdb.software.Parameters.load(
-            cmd, values_dict={'input_b': a.children[0].id, 'input_c': c.id})
-        job1 = qdb.processing_job.ProcessingJob.create(
-            qdb.user.User('test@foo.bar'), params)
-        params = qdb.software.Parameters.load(
-            cmd, values_dict={'input_b': a.id, 'input_c': c.id})
-        job2 = qdb.processing_job.ProcessingJob.create(
-            qdb.user.User('test@foo.bar'), params)
+            cmd, values_dict={'input_x': a.children[0].id, 'input_y': c.id})
+        wf = qdb.processing_job.ProcessingWorkflow.from_scratch(
+            self.user, params, name='Test WF')
+        job1 = list(wf.graph.nodes())[0]
+
+        cmd_dp = qdb.software.DefaultParameters.create("", cmd)
+        wf.add(cmd_dp, req_params={'input_x': a.id, 'input_y': c.id})
+        job2 = list(wf.graph.nodes())[1]
         jobs = [j[1] for e in a.descendants_with_jobs.edges
                 for j in e if j[0] == 'job']
         self.assertIn(job1, jobs)
         self.assertIn(job2, jobs)
+
+        # 4. add job3 connecting the outputs of job1 and job2 as its inputs
+        # - expected:
+        #   a --> job -> b
+        #   |-> job2 -> out -> job3 -> out
+        #   ^                   ^
+        #   |                   |
+        #   |                   |
+        #   |-----|---> job1 -> out
+        #   c ------------|
+        wf.add(cmd_dp, connections={
+            job1: {'out': 'input_x'}, job2: {'out': 'input_y'}})
+        job3 = list(wf.graph.nodes())[2]
+        jobs = [j[1] for e in a.descendants_with_jobs.edges
+                for j in e if j[0] == 'job']
+        self.assertIn(job3, jobs)


 @qiita_test_checker()
 class ArtifactArchiveTests(TestCase):
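The jobs comprehension in the rewritten test assumes that descendants_with_jobs returns a networkx graph whose nodes are (kind, object) tuples, with kind either 'artifact' or 'job'; that convention is inferred here from the j[0] == 'job' check. A toy version, with strings standing in for the real qiita_db objects:

import networkx as nx

# toy stand-in for a.descendants_with_jobs; the node objects are hypothetical
g = nx.DiGraph()
g.add_edge(('artifact', 'a'), ('job', 'job1'))
g.add_edge(('job', 'job1'), ('artifact', 'out'))

# same comprehension as in the test: take the object half of every
# edge endpoint tagged as a job
jobs = [j[1] for e in g.edges for j in e if j[0] == 'job']
assert jobs == ['job1', 'job1']  # one entry per incident edge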
44 changes: 40 additions & 4 deletions qiita_pet/handlers/software.py
@@ -61,6 +61,7 @@ def _default_parameters_parsing(node):
         # getting the main default parameters
         nodes = []
         edges = []
+        at = w.artifact_type

         # first get edges as this will give us the main connected commands
         # and their order
@@ -72,18 +73,22 @@
         # output_type: output_node_name}, ...}
         # for easy look up and merge of output_names
         main_nodes = dict()
+        not_used_nodes = {n.id: n for n in graph.nodes}
         for i, (x, y) in enumerate(graph.edges):
+            if x.id in not_used_nodes:
+                del not_used_nodes[x.id]
+            if y.id in not_used_nodes:
+                del not_used_nodes[y.id]
+            vals_x, input_x, output_x = _default_parameters_parsing(x)
+            vals_y, input_y, output_y = _default_parameters_parsing(y)
+
             connections = []
             for a, _, c in graph[x][y]['connections'].connections:
                 connections.append("%s | %s" % (a, c))

-            vals_x, input_x, output_x = _default_parameters_parsing(x)
-            vals_y, input_y, output_y = _default_parameters_parsing(y)
-
             if i == 0:
                 # we are in the first element so we can specifically select
                 # the type we are looking for
-                at = w.artifact_type
                 if at in input_x[0][1]:
                     input_x[0][1] = at
                 else:
@@ -144,6 +149,37 @@

         wparams = w.parameters

+        # adding nodes without edges
+        # as a first step, if not_used_nodes is not empty we'll confirm that
+        # nodes/edges are empty; in theory we should never hit this
+        if not_used_nodes and (nodes or edges):
+            raise ValueError(
+                'Error, please check your workflow configuration')
+
+        # note that this block is similar but not identical to the one that
+        # adds connected nodes
+        for i, (_, x) in enumerate(not_used_nodes.items()):
+            vals_x, input_x, output_x = _default_parameters_parsing(x)
+            if at in input_x[0][1]:
+                input_x[0][1] = at
+            else:
+                input_x[0][1] = '** WARNING, NOT DEFINED **'
+
+            name_x = vals_x[0]
+            if vals_x not in (nodes):
+                nodes.append(vals_x)
+                for a, b in input_x:
+                    if b in inputs:
+                        name = inputs[b]
+                    else:
+                        name = 'input_%s_%s' % (name_x, b)
+                    nodes.append([name, a, b])
+                    edges.append([name, vals_x[0]])
+                for a, b in output_x:
+                    name = 'output_%s_%s' % (name_x, b)
+                    nodes.append([name, a, b])
+                    edges.append([name_x, name])
+
         workflows.append(
             {'name': w.name, 'id': w.id, 'data_types': w.data_type,
              'description': w.description, 'active': w.active,
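For context on the data shapes: the handler flattens each default workflow into parallel nodes/edges lists for the front-end, where a node appears to be a [name, type, label]-style list and an edge a [source, target] pair. A self-contained sketch of the new unconnected-node branch; the parsed-command values below are hypothetical stand-ins for what _default_parameters_parsing returns:

# hypothetical output of _default_parameters_parsing for one command
vals_x = ['params_1', 'command', 'Split libraries']
input_x = [['input', 'FASTQ']]
output_x = [['output', 'demultiplexed']]

nodes, edges, inputs = [], [], {}
at = 'FASTQ'  # the workflow's artifact_type

if at in input_x[0][1]:
    input_x[0][1] = at
else:
    input_x[0][1] = '** WARNING, NOT DEFINED **'

name_x = vals_x[0]
if vals_x not in nodes:
    nodes.append(vals_x)
    for a, b in input_x:
        # reuse a shared input node when one was already created
        name = inputs[b] if b in inputs else 'input_%s_%s' % (name_x, b)
        nodes.append([name, a, b])
        edges.append([name, vals_x[0]])
    for a, b in output_x:
        name = 'output_%s_%s' % (name_x, b)
        nodes.append([name, a, b])
        edges.append([name_x, name])

# edges now wires input nodes -> command -> output nodes
assert edges == [['input_params_1_FASTQ', 'params_1'],
                 ['params_1', 'output_params_1_demultiplexed']]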
