Commit edd3eed

Improve complete job (#3443)
* Update CHANGELOG.md

* improve complete_job

* CREATE pg_trgm

* EXTENSION IF NOT EXISTS

* fix tests

* SELECT change

* = -> ILIKE
antgonza authored Oct 18, 2024
1 parent 35a051e commit edd3eed
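
The last bullet, `= -> ILIKE`, concerns the duplicate-parameters query: job parameters are bound as Python strings, while the stored JSON keeps booleans lowercase, so an exact `=` comparison misses them. A minimal, standalone illustration (plain Python, not Qiita code):

import json

print(json.dumps(False))  # -> false   (how a boolean is stored in the JSON column)
print(str(False))         # -> False   (how the query parameter is serialized)
# A case-sensitive "=" never matches these two; a case-insensitive ILIKE does.
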
Showing 4 changed files with 100 additions and 39 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -16,6 +16,7 @@ Deployed on October 14th, 2024
 * `Woltka v0.1.7, paired-end` superseded `Woltka v0.1.6` in `qp-woltka`; [more information](https://qiita.ucsd.edu/static/doc/html/processingdata/woltka_pairedend.html). Thank you to @qiyunzhu for the benchmarks!
 * Other general fixes, like [#3424](https://github.com/qiita-spots/qiita/pull/3424), [#3425](https://github.com/qiita-spots/qiita/pull/3425), [#3439](https://github.com/qiita-spots/qiita/pull/3439), [#3440](https://github.com/qiita-spots/qiita/pull/3440).
 * General SPP improvements, like: [NuQC modified to preserve metadata in fastq files](https://github.com/biocore/mg-scripts/pull/155), [use squeue instead of sacct](https://github.com/biocore/mg-scripts/pull/152), [job aborts if Qiita study contains sample metadata columns reserved for prep-infos](https://github.com/biocore/mg-scripts/pull/151), [metapool generates OverrideCycles value](https://github.com/biocore/metagenomics_pooling_notebook/pull/225).
+* We updated the available parameters for `Filter features against reference [filter_features]`, `Non V4 16S sequence assessment [non_v4_16s]` and all the phylogenetic analytical commands so they can use `Greengenes2 2024.09`.



4 changes: 3 additions & 1 deletion qiita_db/handlers/processing_job.py
@@ -146,7 +146,9 @@ def post(self, job_id):
             cmd, values_dict={'job_id': job_id,
                               'payload': self.request.body.decode(
                                   'ascii')})
-        job = qdb.processing_job.ProcessingJob.create(job.user, params)
+        # complete_job are unique so it is fine to force them to be created
+        job = qdb.processing_job.ProcessingJob.create(
+            job.user, params, force=True)
         job.submit()
 
         self.finish()
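
For context, a hedged sketch of the two call patterns this enables (illustrative only; `user` and `params` stand in for the values built in the handler above):

# Regular commands keep the duplicate check: creating a job whose parameters
# match a queued, running or succeeded job raises ValueError.
job = qdb.processing_job.ProcessingJob.create(user, params)

# Every complete_job payload carries its own job_id, so duplicates cannot
# occur and force=True skips the (expensive) check entirely.
job = qdb.processing_job.ProcessingJob.create(user, params, force=True)
job.submit()
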
77 changes: 39 additions & 38 deletions qiita_db/processing_job.py
@@ -582,10 +582,10 @@ def create(cls, user, parameters, force=False):
         TTRN = qdb.sql_connection.TRN
         with TTRN:
             command = parameters.command
-
-            # check if a job with the same parameters already exists
-            sql = """SELECT processing_job_id, email, processing_job_status,
-                            COUNT(aopj.artifact_id)
+            if not force:
+                # check if a job with the same parameters already exists
+                sql = """SELECT processing_job_id, email,
+                                processing_job_status, COUNT(aopj.artifact_id)
                      FROM qiita.processing_job
                      LEFT JOIN qiita.processing_job_status
                         USING (processing_job_status_id)
@@ -596,41 +596,42 @@
                      GROUP BY processing_job_id, email,
                               processing_job_status"""
 
-            # we need to use ILIKE because of booleans as they can be
-            # false or False
-            params = []
-            for k, v in parameters.values.items():
-                # this is necessary in case we have an Iterable as a value
-                # but that is string
-                if isinstance(v, Iterable) and not isinstance(v, str):
-                    for vv in v:
-                        params.extend([k, str(vv)])
-                else:
-                    params.extend([k, str(v)])
-
-            if params:
-                # divided by 2 as we have key-value pairs
-                len_params = int(len(params)/2)
-                sql = sql.format(' AND ' + ' AND '.join(
-                    ["command_parameters->>%s ILIKE %s"] * len_params))
-                params = [command.id] + params
-                TTRN.add(sql, params)
-            else:
-                # the sql variable expects the list of parameters but if there
-                # is no param we need to replace the {0} with an empty string
-                TTRN.add(sql.format(""), [command.id])
-
-            # checking that if the job status is success, it has children
-            # [2] status, [3] children count
-            existing_jobs = [r for r in TTRN.execute_fetchindex()
-                             if r[2] != 'success' or r[3] > 0]
-            if existing_jobs and not force:
-                raise ValueError(
-                    'Cannot create job because the parameters are the same as '
-                    'jobs that are queued, running or already have '
-                    'succeeded:\n%s' % '\n'.join(
-                        ["%s: %s" % (jid, status)
-                         for jid, _, status, _ in existing_jobs]))
+                # we need to use ILIKE because of booleans as they can be
+                # false or False
+                params = []
+                for k, v in parameters.values.items():
+                    # this is necessary in case we have an Iterable as a value
+                    # but that is string
+                    if isinstance(v, Iterable) and not isinstance(v, str):
+                        for vv in v:
+                            params.extend([k, str(vv)])
+                    else:
+                        params.extend([k, str(v)])
+
+                if params:
+                    # divided by 2 as we have key-value pairs
+                    len_params = int(len(params)/2)
+                    sql = sql.format(' AND ' + ' AND '.join(
+                        ["command_parameters->>%s ILIKE %s"] * len_params))
+                    params = [command.id] + params
+                    TTRN.add(sql, params)
+                else:
+                    # the sql variable expects the list of parameters but if
+                    # there is no param we need to replace the {0} with an
+                    # empty string
+                    TTRN.add(sql.format(""), [command.id])
+
+                # checking that if the job status is success, it has children
+                # [2] status, [3] children count
+                existing_jobs = [r for r in TTRN.execute_fetchindex()
+                                 if r[2] != 'success' or r[3] > 0]
+                if existing_jobs:
+                    raise ValueError(
+                        'Cannot create job because the parameters are the '
+                        'same as jobs that are queued, running or already '
+                        'have succeeded:\n%s' % '\n'.join(
+                            ["%s: %s" % (jid, status)
+                             for jid, _, status, _ in existing_jobs]))
 
             sql = """INSERT INTO qiita.processing_job
                         (email, command_id, command_parameters,
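
To make the query assembly above concrete, here is a standalone sketch of the same logic (the command id and parameter names are hypothetical; the real method also filters by job status and children):

from collections.abc import Iterable

def build_duplicate_check(command_id, values):
    # Each parameter becomes a "command_parameters->>%s ILIKE %s" clause, so
    # stored JSON booleans ("false") still match their str() form ("False").
    sql = """SELECT processing_job_id
             FROM qiita.processing_job
             WHERE command_id = %s{0}"""
    params = []
    for k, v in values.items():
        if isinstance(v, Iterable) and not isinstance(v, str):
            for vv in v:  # flatten list-valued parameters
                params.extend([k, str(vv)])
        else:
            params.extend([k, str(v)])
    if params:
        len_params = len(params) // 2  # key/value pairs
        sql = sql.format(' AND ' + ' AND '.join(
            ['command_parameters->>%s ILIKE %s'] * len_params))
        return sql, [command_id] + params
    return sql.format(''), [command_id]

sql, params = build_duplicate_check(83, {'samples': ['S1', 'S2'], 'trim': False})
print(params)  # [83, 'samples', 'S1', 'samples', 'S2', 'trim', 'False']
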
57 changes: 57 additions & 0 deletions qiita_db/support_files/patches/93.sql
@@ -0,0 +1,57 @@
-- Oct 18, 2024
-- ProcessingJob.create can take up to 52 seconds when creating a complete_job, mainly
-- due to the number of jobs of this command and the use of json. The solution in the
-- database is to convert the column to jsonb and index the queried values

-- ### These are the stats before the change for a single example
-- GroupAggregate (cost=67081.81..67081.83 rows=1 width=77) (actual time=51859.962..51862.637 rows=1 loops=1)
-- Group Key: processing_job.processing_job_id, processing_job_status.processing_job_status
-- -> Sort (cost=67081.81..67081.81 rows=1 width=77) (actual time=51859.952..51862.627 rows=1 loops=1)
-- Sort Key: processing_job.processing_job_id, processing_job_status.processing_job_status
-- Sort Method: quicksort Memory: 25kB
-- -> Nested Loop Left Join (cost=4241.74..67081.80 rows=1 width=77) (actual time=51859.926..51862.604 rows=1 loops=1)
-- -> Nested Loop (cost=4237.30..67069.64 rows=1 width=69) (actual time=51859.889..51862.566 rows=1 loops=1)
-- Join Filter: (processing_job.processing_job_status_id = processing_job_status.processing_job_status_id)
-- Rows Removed by Join Filter: 1
-- -> Gather (cost=4237.30..67068.50 rows=1 width=45) (actual time=51859.846..51862.522 rows=1 loops=1)
-- Workers Planned: 2
-- Workers Launched: 2
-- -> Parallel Bitmap Heap Scan on processing_job (cost=3237.30..66068.40 rows=1 width=45) (actual time=51785.317..51785.446 rows=0 loops=3)
-- Recheck Cond: (command_id = 83)
-- Filter: (((command_parameters ->> 'job_id'::text) ~~* '3432a908-f7b8-4e36-89fc-88f3310b84d5'::text) AND ((command_parameters ->> 'payload'::text) ~~* '{"success": true, "error": "", "artifacts": {"alpha_diversity": {"artifact_type": "alpha_vector", "filepaths": [["/qmounts/qiita_test_data/testlocal/working_dir/3432a908-f7b8-4e36-89fc-88f3310b84d5/alpha_phylogenetic/alpha_diversity/alpha-diversity.tsv", "plain_text"], ["/qmounts/qiita_test_data/testlocal/working_dir/3432a908-f7b8-4e36-89fc-88f3310b84d5/alpha_phylogenetic/alpha_diversity.qza", "qza"]], "archive": {}}}}'::text))
-- Rows Removed by Filter: 97315
-- Heap Blocks: exact=20133
-- -> Bitmap Index Scan on idx_processing_job_command_id (cost=0.00..3237.30 rows=294517 width=0) (actual time=41.569..41.569 rows=293054 loops=1)
-- Index Cond: (command_id = 83)
-- -> Seq Scan on processing_job_status (cost=0.00..1.09 rows=4 width=40) (actual time=0.035..0.035 rows=2 loops=1)
-- Filter: ((processing_job_status)::text = ANY ('{success,waiting,running,in_construction}'::text[]))
-- Rows Removed by Filter: 1
-- -> Bitmap Heap Scan on artifact_output_processing_job aopj (cost=4.43..12.14 rows=2 width=24) (actual time=0.031..0.031 rows=0 loops=1)
-- Recheck Cond: (processing_job.processing_job_id = processing_job_id)
-- -> Bitmap Index Scan on idx_artifact_output_processing_job_job (cost=0.00..4.43 rows=2 width=0) (actual time=0.026..0.026 rows=0 loops=1)
-- Index Cond: (processing_job_id = processing_job.processing_job_id)
-- Planning Time: 1.173 ms
-- Execution Time: 51862.756 ms

-- Note: for this to work, the extension must first be created by an admin:
-- CREATE EXTENSION pg_trgm;
CREATE EXTENSION IF NOT EXISTS "pg_trgm" WITH SCHEMA public;

-- This alter table will take close to 11 min
ALTER TABLE qiita.processing_job
ALTER COLUMN command_parameters TYPE JSONB USING command_parameters::jsonb;

-- This indexing will take about 5 min
CREATE INDEX IF NOT EXISTS processing_job_command_parameters_job_id ON qiita.processing_job
USING GIN((command_parameters->>'job_id') gin_trgm_ops);

-- This indexing will take about an hour
CREATE INDEX IF NOT EXISTS processing_job_command_parameters_payload ON qiita.processing_job
USING GIN((command_parameters->>'payload') gin_trgm_ops);

-- After the changes, the same example query runs in 18710.404 ms
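
A quick way to sanity-check that the new trigram indexes are actually used (a sketch assuming a psycopg2 connection; the database name and job_id are illustrative):

import psycopg2

conn = psycopg2.connect(dbname='qiita_test')
with conn.cursor() as cur:
    cur.execute(
        """EXPLAIN
           SELECT processing_job_id
           FROM qiita.processing_job
           WHERE command_parameters->>'job_id' ILIKE %s""",
        ('3432a908-f7b8-4e36-89fc-88f3310b84d5',))
    for line, in cur.fetchall():
        # expect a Bitmap Index Scan on
        # processing_job_command_parameters_job_id
        print(line)
conn.close()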
