add default-workflow to future artifact - SPP #3307

Merged
15 changes: 13 additions & 2 deletions qiita_db/handlers/artifact.py
@@ -267,6 +267,7 @@ def post(self):
atype = self.get_argument('artifact_type')
aname = self.get_argument('command_artifact_name', 'Name')
files = self.get_argument('files')
add_default_workflow = self.get_argument('add_default_workflow', False)

if job_id is None and prep_id is None:
raise HTTPError(
@@ -314,8 +315,18 @@ def post(self):
values['template'] = prep_id
cmd = qdb.software.Command.get_validator(atype)
params = qdb.software.Parameters.load(cmd, values_dict=values)
new_job = PJ.create(user, params, True)
new_job.submit()
if add_default_workflow:
pwk = qdb.processing_job.ProcessingWorkflow.from_scratch(
user, params, name=f'ProcessingWorkflow for {job_id}')
# the new job is the first job in the workflow
new_job = list(pwk.graph.nodes())[0]
# adding default pipeline to the preparation
pt = qdb.metadata_template.prep_template.PrepTemplate(prep_id)
pt.add_default_workflow(user, pwk)
pwk.submit()
else:
new_job = PJ.create(user, params, True)
new_job.submit()

r_client.set('prep_template_%d' % prep_id,
dumps({'job_id': new_job.id, 'is_qiita_job': True}))
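For reference, this is roughly how a client could exercise the new flag against the /qiita_db/artifact/ endpoint. The sketch below is illustrative only: the host, OAuth2 token, prep id, and file paths are placeholders and not taken from the PR; only the field names mirror the handler and the test below.

# Hedged sketch: POST a new artifact and also request the default workflow.
# Host, token, prep id and file paths are placeholders, not part of the PR.
from json import dumps
import requests

data = {'user_email': 'demo@microbio.me',
        'artifact_type': 'FASTQ',
        'prep_id': 42,
        'files': dumps([('/tmp/run1_seqs.fastq', 'raw_forward_seqs'),
                        ('/tmp/run1_barcodes.fastq', 'raw_barcodes')]),
        # new in this PR: also build and submit the default workflow
        'add_default_workflow': True}
resp = requests.post('https://qiita.example.org/qiita_db/artifact/',
                     headers={'Authorization': 'Bearer <oauth2-token>'},
                     data=data, verify=False)
# the response carries the id of the Validate job that roots the processing
job_id = resp.json()['job_id']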
52 changes: 52 additions & 0 deletions qiita_db/handlers/tests/test_artifact.py
@@ -411,6 +411,58 @@ def test_post(self):
sleep(0.5)
self.assertIsNotNone(new_prep.artifact)

def test_post_insert_artifact_and_add_default_processing(self):
# now let's test adding an artifact + default processing to a new
# preparation
new_prep = qdb.metadata_template.prep_template.PrepTemplate.create(
pd.DataFrame({'new_col': {'1.SKB1.640202': 1,
'1.SKD3.640198': 2,
'1.SKM4.640180': 3}}),
qdb.study.Study(1), '16S')

# creating the fastq files to be added
fd, fp1 = mkstemp(suffix='_seqs.fastq')
close(fd)
self._clean_up_files.append(fp1)
with open(fp1, 'w') as f:
f.write("@HWI-ST753:189:D1385ACXX:1:1101:1214:1906 1:N:0:\n"
"NACGTAGGGTGCAAGCGTTGTCCGGAATNA\n"
"+\n"
"#1=DDFFFHHHHHJJJJJJJJJJJJGII#0\n")

fd, fp2 = mkstemp(suffix='_barcodes.fastq')
close(fd)
self._clean_up_files.append(fp2)
with open(fp2, 'w') as f:
f.write("@HWI-ST753:189:D1385ACXX:1:1101:1214:1906 2:N:0:\n"
"NNNCNNNNNNNNN\n"
"+\n"
"#############\n")

data = {'user_email': 'demo@microbio.me',
'artifact_type': 'FASTQ',
'prep_id': new_prep.id,
'files': dumps([(fp1, 'raw_forward_seqs'),
(fp2, 'raw_barcodes')]),
'add_default_workflow': True}
obs = self.post('/qiita_db/artifact/', headers=self.header, data=data)
self.assertEqual(obs.code, 200)
jid = loads(obs.body)['job_id']
# if we got to this point, then we should have a job and that job
# should have children jobs (generated by the default workflow)
job = qdb.processing_job.ProcessingJob(jid)
children = [c.command.name for c in job.children]
grandchildren = [gc.command.name for c in job.children
for gc in c.children]
self.assertEqual('Validate', job.command.name)
self.assertEqual(['Split libraries FASTQ'], children)
self.assertEqual(['Pick closed-reference OTUs'], grandchildren)

# just to avoid any potential issues, let's wait for the main job to
# finish
while job.status not in ('error', 'success'):
sleep(0.5)


if __name__ == '__main__':
main()
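Independently of the response body asserted in the test above, the handler also tracks the submitted job in redis under the prep_template_<prep_id> key (the r_client.set call in the first diff). A consumer could read that entry back roughly as follows; this is a sketch only, and the redis connection parameters and prep id are placeholders rather than values from Qiita's configuration.

# Hedged sketch: read back the tracking entry stored by the handler.
# Connection parameters and the prep id are placeholders.
from json import loads
from redis import Redis

r_client = Redis(host='localhost', port=6379)
raw = r_client.get('prep_template_%d' % 42)
if raw is not None:
    info = loads(raw)
    print(info['job_id'], info['is_qiita_job'])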
46 changes: 37 additions & 9 deletions qiita_db/metadata_template/prep_template.py
@@ -726,13 +726,15 @@ def modification_timestamp(self):
def max_samples():
return qdb.util.max_preparation_samples()

def add_default_workflow(self, user):
"""The modification timestamp of the prep information
def add_default_workflow(self, user, workflow=None):
"""Adds the commands of the default workflow to this preparation

Parameters
----------
user : qiita_db.user.User
The user that requested to add the default workflows
workflow : qiita_db.processing_job.ProcessingWorkflow, optional
The workflow to which the default processing commands will be added

Returns
-------
@@ -745,6 +747,13 @@ def add_default_workflow(self, user):
a. If this preparation doesn't have valid workflows
b. This preparation has been fully processed (no new steps needed)
c. If there is no valid initial artifact to start the workflow

Notes
-----
This method adds the commands in a default workflow (definition) to
the preparation. If a workflow (object) is passed, the commands are
added to the last artifact in that workflow; if it is None, a new
(default) workflow is created.
"""
# helper functions to avoid duplication of code

@@ -806,9 +815,14 @@ def _get_predecessors(workflow, node):
# workflow

# 1.
prep_jobs = [j for c in self.artifact.descendants.nodes()
for j in c.jobs(show_hidden=True)
if j.command.software.type == 'artifact transformation']
# let's assume that if there is a workflow, there are no jobs
if workflow is not None:
prep_jobs = []
else:
prep_jobs = [j for c in self.artifact.descendants.nodes()
for j in c.jobs(show_hidden=True)
if j.command.software.type ==
'artifact transformation']
merging_schemes = {
qdb.archive.Archive.get_merging_scheme_from_job(j): {
x: y.id for x, y in j.outputs.items()}
@@ -821,7 +835,14 @@ def _get_predecessors(workflow, node):

# 2.
pt_dt = self.data_type()
pt_artifact = self.artifact.artifact_type
# if there is a workflow, we would need to get the artifact_type from
# the job
if workflow is not None:
starting_job = list(workflow.graph.nodes())[0]
pt_artifact = starting_job.parameters.values['artifact_type']
else:
starting_job = None
pt_artifact = self.artifact.artifact_type
workflows = [wk for wk in qdb.software.DefaultWorkflow.iter()
if wk.artifact_type == pt_artifact and
pt_dt in wk.data_type]
@@ -846,7 +867,6 @@ def _get_predecessors(workflow, node):
raise ValueError('This preparation is complete')

# 3.
workflow = None
for wk, wk_data in missing_artifacts.items():
previous_jobs = dict()
for ma, node in wk_data.items():
@@ -886,12 +906,20 @@ def _get_predecessors(workflow, node):

cmds_to_create.append([pdp_cmd, params, reqp])

init_artifacts = {wkartifact_type: self.artifact.id}
if starting_job is not None:
init_artifacts = {
wkartifact_type: f'{starting_job.id}:'}
else:
init_artifacts = {wkartifact_type: self.artifact.id}

cmds_to_create.reverse()
current_job = None
for i, (cmd, params, rp) in enumerate(cmds_to_create):
previous_job = current_job
if starting_job is not None:
previous_job = starting_job
starting_job = None
else:
previous_job = current_job
if previous_job is None:
req_params = dict()
for iname, dname in rp.items():
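To recap the two call modes this change enables, here is a hedged sketch of using add_default_workflow directly. The user email and prep id are placeholders, and the assumption that the method returns the created workflow in the default mode is illustrative rather than taken verbatim from the PR.

# Hedged sketch of the two ways add_default_workflow can now be called.
import qiita_db as qdb

user = qdb.user.User('demo@microbio.me')
pt = qdb.metadata_template.prep_template.PrepTemplate(42)  # placeholder id

# 1. Without a workflow: the default commands are attached to the
#    preparation's existing root artifact and a new workflow is built.
wk = pt.add_default_workflow(user)
wk.submit()

# 2. With an existing workflow (as the artifact handler now does): the
#    default commands are chained after the workflow's starting job, and the
#    pending output is referenced as '<job_id>:' rather than an artifact id.
# pwk = qdb.processing_job.ProcessingWorkflow.from_scratch(user, params)
# pt.add_default_workflow(user, pwk)
# pwk.submit()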