From 1aec6b0014cd9d68e53f9fcc07e1a9da04aceb0c Mon Sep 17 00:00:00 2001 From: Antonio Gonzalez Date: Tue, 20 Feb 2024 11:52:11 -0700 Subject: [PATCH 1/2] add rna_copy_counts (#3351) * add rna_copy_counts * RNA -> Calculate RNA * v != '*' * v != '*' : fix conditional * prep job only display if success * allowing for multiple inputs in workflow * fix error * just one element * rollback add_default_workflow * simplify add_default_workflow --- qiita_db/metadata_template/prep_template.py | 208 +++++++++--------- qiita_db/processing_job.py | 2 +- .../handlers/study_handlers/prep_template.py | 9 +- 3 files changed, 109 insertions(+), 110 deletions(-) diff --git a/qiita_db/metadata_template/prep_template.py b/qiita_db/metadata_template/prep_template.py index f39aaacb7..d05493d3f 100644 --- a/qiita_db/metadata_template/prep_template.py +++ b/qiita_db/metadata_template/prep_template.py @@ -793,6 +793,7 @@ def _get_node_info(workflow, node): def _get_predecessors(workflow, node): # recursive method to get predecessors of a given node pred = [] + for pnode in workflow.graph.predecessors(node): pred = _get_predecessors(workflow, pnode) cxns = {x[0]: x[2] @@ -864,7 +865,8 @@ def _get_predecessors(workflow, node): if wk_params['sample']: df = ST(self.study_id).to_dataframe(samples=list(self)) for k, v in wk_params['sample'].items(): - if k not in df.columns or v not in df[k].unique(): + if k not in df.columns or (v != '*' and v not in + df[k].unique()): reqs_satisfied = False else: total_conditions_satisfied += 1 @@ -872,7 +874,8 @@ def _get_predecessors(workflow, node): if wk_params['prep']: df = self.to_dataframe() for k, v in wk_params['prep'].items(): - if k not in df.columns or v not in df[k].unique(): + if k not in df.columns or (v != '*' and v not in + df[k].unique()): reqs_satisfied = False else: total_conditions_satisfied += 1 @@ -890,117 +893,112 @@ def _get_predecessors(workflow, node): # let's just keep one, let's give it preference to the one with the # most total_conditions_satisfied - workflows = sorted(workflows, key=lambda x: x[0], reverse=True)[:1] + _, wk = sorted(workflows, key=lambda x: x[0], reverse=True)[0] missing_artifacts = dict() - for _, wk in workflows: - missing_artifacts[wk] = dict() - for node, degree in wk.graph.out_degree(): - if degree != 0: - continue - mscheme = _get_node_info(wk, node) - if mscheme not in merging_schemes: - missing_artifacts[wk][mscheme] = node - if not missing_artifacts[wk]: - del missing_artifacts[wk] + for node, degree in wk.graph.out_degree(): + if degree != 0: + continue + mscheme = _get_node_info(wk, node) + if mscheme not in merging_schemes: + missing_artifacts[mscheme] = node if not missing_artifacts: # raises option b. raise ValueError('This preparation is complete') # 3. 
- for wk, wk_data in missing_artifacts.items(): - previous_jobs = dict() - for ma, node in wk_data.items(): - predecessors = _get_predecessors(wk, node) - predecessors.reverse() - cmds_to_create = [] - init_artifacts = None - for i, (pnode, cnode, cxns) in enumerate(predecessors): - cdp = cnode.default_parameter - cdp_cmd = cdp.command - params = cdp.values.copy() - - icxns = {y: x for x, y in cxns.items()} - reqp = {x: icxns[y[1][0]] - for x, y in cdp_cmd.required_parameters.items()} - cmds_to_create.append([cdp_cmd, params, reqp]) - - info = _get_node_info(wk, pnode) - if info in merging_schemes: - if set(merging_schemes[info]) >= set(cxns): - init_artifacts = merging_schemes[info] - break - if init_artifacts is None: - pdp = pnode.default_parameter - pdp_cmd = pdp.command - params = pdp.values.copy() - # verifying that the workflow.artifact_type is included - # in the command input types or raise an error - wkartifact_type = wk.artifact_type - reqp = dict() - for x, y in pdp_cmd.required_parameters.items(): - if wkartifact_type not in y[1]: - raise ValueError(f'{wkartifact_type} is not part ' - 'of this preparation and cannot ' - 'be applied') - reqp[x] = wkartifact_type - - cmds_to_create.append([pdp_cmd, params, reqp]) - - if starting_job is not None: - init_artifacts = { - wkartifact_type: f'{starting_job.id}:'} - else: - init_artifacts = {wkartifact_type: self.artifact.id} - - cmds_to_create.reverse() - current_job = None - loop_starting_job = starting_job - for i, (cmd, params, rp) in enumerate(cmds_to_create): - if loop_starting_job is not None: - previous_job = loop_starting_job - loop_starting_job = None - else: - previous_job = current_job - if previous_job is None: - req_params = dict() - for iname, dname in rp.items(): - if dname not in init_artifacts: - msg = (f'Missing Artifact type: "{dname}" in ' - 'this preparation; this might be due ' - 'to missing steps or not having the ' - 'correct raw data.') - # raises option c. 
- raise ValueError(msg) - req_params[iname] = init_artifacts[dname] - else: - req_params = dict() - connections = dict() - for iname, dname in rp.items(): - req_params[iname] = f'{previous_job.id}{dname}' - connections[dname] = iname - params.update(req_params) - job_params = qdb.software.Parameters.load( - cmd, values_dict=params) - - if params in previous_jobs.values(): - for x, y in previous_jobs.items(): - if params == y: - current_job = x + previous_jobs = dict() + for ma, node in missing_artifacts.items(): + predecessors = _get_predecessors(wk, node) + predecessors.reverse() + cmds_to_create = [] + init_artifacts = None + for i, (pnode, cnode, cxns) in enumerate(predecessors): + cdp = cnode.default_parameter + cdp_cmd = cdp.command + params = cdp.values.copy() + + icxns = {y: x for x, y in cxns.items()} + reqp = {x: icxns[y[1][0]] + for x, y in cdp_cmd.required_parameters.items()} + cmds_to_create.append([cdp_cmd, params, reqp]) + + info = _get_node_info(wk, pnode) + if info in merging_schemes: + if set(merging_schemes[info]) >= set(cxns): + init_artifacts = merging_schemes[info] + break + if init_artifacts is None: + pdp = pnode.default_parameter + pdp_cmd = pdp.command + params = pdp.values.copy() + # verifying that the workflow.artifact_type is included + # in the command input types or raise an error + wkartifact_type = wk.artifact_type + reqp = dict() + for x, y in pdp_cmd.required_parameters.items(): + if wkartifact_type not in y[1]: + raise ValueError(f'{wkartifact_type} is not part ' + 'of this preparation and cannot ' + 'be applied') + reqp[x] = wkartifact_type + + cmds_to_create.append([pdp_cmd, params, reqp]) + + if starting_job is not None: + init_artifacts = { + wkartifact_type: f'{starting_job.id}:'} + else: + init_artifacts = {wkartifact_type: self.artifact.id} + + cmds_to_create.reverse() + current_job = None + loop_starting_job = starting_job + for i, (cmd, params, rp) in enumerate(cmds_to_create): + if loop_starting_job is not None: + previous_job = loop_starting_job + loop_starting_job = None + else: + previous_job = current_job + if previous_job is None: + req_params = dict() + for iname, dname in rp.items(): + if dname not in init_artifacts: + msg = (f'Missing Artifact type: "{dname}" in ' + 'this preparation; this might be due ' + 'to missing steps or not having the ' + 'correct raw data.') + # raises option c. 
+ raise ValueError(msg) + req_params[iname] = init_artifacts[dname] + else: + req_params = dict() + connections = dict() + for iname, dname in rp.items(): + req_params[iname] = f'{previous_job.id}{dname}' + connections[dname] = iname + params.update(req_params) + job_params = qdb.software.Parameters.load( + cmd, values_dict=params) + + if params in previous_jobs.values(): + for x, y in previous_jobs.items(): + if params == y: + current_job = x + else: + if workflow is None: + PW = qdb.processing_job.ProcessingWorkflow + workflow = PW.from_scratch(user, job_params) + current_job = [ + j for j in workflow.graph.nodes()][0] else: - if workflow is None: - PW = qdb.processing_job.ProcessingWorkflow - workflow = PW.from_scratch(user, job_params) - current_job = [ - j for j in workflow.graph.nodes()][0] + if previous_job is None: + current_job = workflow.add( + job_params, req_params=req_params) else: - if previous_job is None: - current_job = workflow.add( - job_params, req_params=req_params) - else: - current_job = workflow.add( - job_params, req_params=req_params, - connections={previous_job: connections}) - previous_jobs[current_job] = params + current_job = workflow.add( + job_params, req_params=req_params, + connections={previous_job: connections}) + previous_jobs[current_job] = params return workflow diff --git a/qiita_db/processing_job.py b/qiita_db/processing_job.py index dcce029a6..a1f7e5baa 100644 --- a/qiita_db/processing_job.py +++ b/qiita_db/processing_job.py @@ -1020,7 +1020,7 @@ def submit(self, parent_job_id=None, dependent_jobs_list=None): # names to know if it should be executed differently and the # plugin should let Qiita know that a specific command should be ran # as job array or not - cnames_to_skip = {'Calculate Cell Counts'} + cnames_to_skip = {'Calculate Cell Counts', 'Calculate RNA Copy Counts'} if 'ENVIRONMENT' in plugin_env_script and cname not in cnames_to_skip: # the job has to be in running state so the plugin can change its` # status diff --git a/qiita_pet/handlers/study_handlers/prep_template.py b/qiita_pet/handlers/study_handlers/prep_template.py index 0af9949e3..167f981bd 100644 --- a/qiita_pet/handlers/study_handlers/prep_template.py +++ b/qiita_pet/handlers/study_handlers/prep_template.py @@ -81,11 +81,12 @@ def get(self): res['creation_job_filename'] = fp['filename'] res['creation_job_filename_body'] = fp['body'] summary = None - if res['creation_job'].outputs: - summary = relpath( + if res['creation_job'].status == 'success': + if res['creation_job'].outputs: # [0] is the id, [1] is the filepath - res['creation_job'].outputs['output'].html_summary_fp[1], - qiita_config.base_data_dir) + _file = res['creation_job'].outputs[ + 'output'].html_summary_fp[1] + summary = relpath(_file, qiita_config.base_data_dir) res['creation_job_artifact_summary'] = summary self.render('study_ajax/prep_summary.html', **res) From 57b84cf1d866b0d7f3cc84693c0e4eca894bb1c4 Mon Sep 17 00:00:00 2001 From: Stefan Janssen Date: Tue, 20 Feb 2024 19:53:56 +0100 Subject: [PATCH 2/2] fix environment_script for private plugins (#3359) * fix environment_script for private plugins I found that patch 54.sql for the test database uses an old conda activate mechanism for travis. We might want to fix this to the latest github action method of choice? 
* Update qiita-ci.yml * Update qiita-ci.yml fix quoting --- .github/workflows/qiita-ci.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/qiita-ci.yml b/.github/workflows/qiita-ci.yml index bbc6c25f5..6c3b06be1 100644 --- a/.github/workflows/qiita-ci.yml +++ b/.github/workflows/qiita-ci.yml @@ -154,6 +154,8 @@ jobs: echo "5. Setting up qiita" conda activate qiita + # adapt environment_script for private qiita plugins from travis to github actions. + sed 's#export PATH="/home/travis/miniconda3/bin:$PATH"; source #source /home/runner/.profile; conda #' -i qiita_db/support_files/patches/54.sql qiita-env make --no-load-ontologies qiita-test-install qiita plugins update
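
Note on the wildcard matching introduced in PATCH 1/2: in add_default_workflow, a workflow requirement {column: value} from wk_params['sample'] or wk_params['prep'] now counts as satisfied when the column exists in the template and either the configured value appears in that column or the configured value is '*', which accepts any value. Below is a minimal standalone sketch of that rule, assuming a pandas DataFrame stands in for the sample/prep template; the helper name conditions_satisfied and the example table are illustrative only and are not part of Qiita's API.

    import pandas as pd

    def conditions_satisfied(df: pd.DataFrame, wk_params: dict) -> int:
        """Return the number of satisfied constraints, or -1 if any fails.

        A constraint {column: value} is satisfied when the column exists
        and either value is the wildcard '*' (any value accepted) or value
        appears in that column. This mirrors the `v != '*'` change in this
        patch, but as a standalone sketch rather than Qiita's code.
        """
        satisfied = 0
        for k, v in wk_params.items():
            if k not in df.columns or (v != '*' and v not in df[k].unique()):
                return -1
            satisfied += 1
        return satisfied

    # Illustrative prep-information table
    prep = pd.DataFrame({'platform': ['Illumina', 'Illumina'],
                         'target_gene': ['16S rRNA', '16S rRNA']})

    print(conditions_satisfied(prep, {'platform': 'Illumina'}))  # 1
    print(conditions_satisfied(prep, {'target_gene': '*'}))      # 1 (wildcard)
    print(conditions_satisfied(prep, {'target_gene': 'ITS'}))    # -1 (no match)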