From 7d0a4b7a4b9ccdb7f9a6d762d69f58b7dacadab7 Mon Sep 17 00:00:00 2001 From: John Marshall Date: Mon, 11 Aug 2025 16:48:35 +1200 Subject: [PATCH 1/2] Reformat in anticipation of adding job_attrs parameter --- cpg_flow_test/jobs/first_n_primes.py | 6 +++++- cpg_flow_test/stages.py | 22 +++++++++++++++++++--- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/cpg_flow_test/jobs/first_n_primes.py b/cpg_flow_test/jobs/first_n_primes.py index 5fe198c..1ab9d5a 100644 --- a/cpg_flow_test/jobs/first_n_primes.py +++ b/cpg_flow_test/jobs/first_n_primes.py @@ -5,7 +5,11 @@ def first_n_primes( - b: Batch, sequencing_group: SequencingGroup, input_file_path: str, output_file_path: str, depends_on: Job, + b: Batch, + sequencing_group: SequencingGroup, + input_file_path: str, + output_file_path: str, + depends_on: Job, ) -> list[Job]: title = f'First N Primes: {sequencing_group.id}' job = b.new_job(name=title) diff --git a/cpg_flow_test/stages.py b/cpg_flow_test/stages.py index 3ca7ee6..607988c 100644 --- a/cpg_flow_test/stages.py +++ b/cpg_flow_test/stages.py @@ -73,7 +73,13 @@ def queue_jobs(self, sequencing_group: SequencingGroup, inputs: StageInput) -> S # Generate first N primes primes_output_path = str(self.expected_outputs(sequencing_group).get('primes', '')) - job_primes = first_n_primes(b, sequencing_group, id_sum_output_path, primes_output_path, depends_on=job_id_sum) + job_primes = first_n_primes( + b, + sequencing_group, + id_sum_output_path, + primes_output_path, + depends_on=job_id_sum, + ) jobs = [job_id_sum, job_primes] @@ -92,7 +98,12 @@ def queue_jobs(self, sequencing_group: SequencingGroup, inputs: StageInput) -> S b = get_batch() cumulative_calc_output_path = str(self.expected_outputs(sequencing_group).get('cumulative', '')) - job_cumulative_calc = cumulative_calc(b, sequencing_group, input_txt, cumulative_calc_output_path) + job_cumulative_calc = cumulative_calc( + b, + sequencing_group, + input_txt, + cumulative_calc_output_path, + ) jobs = [job_cumulative_calc] @@ -186,7 +197,12 @@ def queue_jobs(self, multicohort: MultiCohort, inputs: StageInput) -> StageOutpu b = get_batch() pyramid_output_path = str(self.expected_outputs(multicohort).get('pyramid', '')) - job_pyramid = build_pyramid(b, multicohort.get_sequencing_groups(), input_files, pyramid_output_path) + job_pyramid = build_pyramid( + b, + multicohort.get_sequencing_groups(), + input_files, + pyramid_output_path, + ) return self.make_outputs( multicohort, From 570e414022bab98170e545bf99eab3597d32ba32 Mon Sep 17 00:00:00 2001 From: John Marshall Date: Mon, 11 Aug 2025 16:48:35 +1200 Subject: [PATCH 2/2] Use Target.get_job_attrs() and propagate to Batch.new_job() calls --- cpg_flow_test/jobs/build_pyramid.py | 5 +++-- cpg_flow_test/jobs/cumulative_calc.py | 3 ++- cpg_flow_test/jobs/filter_evens.py | 5 +++-- cpg_flow_test/jobs/first_n_primes.py | 3 ++- cpg_flow_test/jobs/iterative_digit_sum.py | 3 ++- cpg_flow_test/jobs/say_hi.py | 3 ++- cpg_flow_test/stages.py | 8 ++++++-- 7 files changed, 20 insertions(+), 10 deletions(-) diff --git a/cpg_flow_test/jobs/build_pyramid.py b/cpg_flow_test/jobs/build_pyramid.py index 89466ff..43fafe2 100644 --- a/cpg_flow_test/jobs/build_pyramid.py +++ b/cpg_flow_test/jobs/build_pyramid.py @@ -10,6 +10,7 @@ def build_pyramid( b: Batch, sequencing_groups: list[SequencingGroup], input_files: dict[str, Any], + job_attrs: dict[str, str], output_file_path: str, ) -> list[Job]: title = 'Build A Pyramid' @@ -17,7 +18,7 @@ def build_pyramid( sg_jobs = [] sg_output_files = [] for sg in sequencing_groups: # type: ignore - job = b.new_job(name=title + ': ' + sg.id) + job = b.new_job(name=title + ': ' + sg.id, attributes=job_attrs | {'sequencing_group': sg.id}) no_evens_input_file_path = input_files[sg.id]['no_evens'] no_evens_input_file = b.read_input(no_evens_input_file_path) @@ -50,7 +51,7 @@ def build_pyramid( sg_jobs.append(job) # Merge the no evens lists for all sequencing groups into a single file - job = b.new_job(name=title) + job = b.new_job(name=title, attributes=job_attrs | {'tool': 'cat'}) job.depends_on(*sg_jobs) inputs = ' '.join([b.read_input(f) for f in sg_output_files]) job.command(f'cat {inputs} >> {job.pyramid}') diff --git a/cpg_flow_test/jobs/cumulative_calc.py b/cpg_flow_test/jobs/cumulative_calc.py index 48aee6f..6b71fac 100644 --- a/cpg_flow_test/jobs/cumulative_calc.py +++ b/cpg_flow_test/jobs/cumulative_calc.py @@ -8,10 +8,11 @@ def cumulative_calc( b: Batch, sequencing_group: SequencingGroup, input_file_path: str, + job_attrs: dict[str, str], output_file_path: str, ) -> list[Job]: title = f'Cumulative Calc: {sequencing_group.id}' - job = b.new_job(name=title) + job = b.new_job(name=title, attributes=job_attrs) primes_path = b.read_input(input_file_path) cmd = f""" diff --git a/cpg_flow_test/jobs/filter_evens.py b/cpg_flow_test/jobs/filter_evens.py index 7b8d610..1199e32 100644 --- a/cpg_flow_test/jobs/filter_evens.py +++ b/cpg_flow_test/jobs/filter_evens.py @@ -11,6 +11,7 @@ def filter_evens( b: Batch, sequencing_groups: list[SequencingGroup], input_files: dict[str, dict[str, Any]], + job_attrs: dict[str, str], sg_outputs: dict[str, dict[str, Any]], output_file_path: str, ) -> list[Job]: @@ -20,7 +21,7 @@ def filter_evens( sg_jobs = [] sg_output_files = [] for sg in sequencing_groups: # type: ignore - job = b.new_job(name=title + ': ' + sg.id) + job = b.new_job(name=title + ': ' + sg.id, attributes=job_attrs) input_file_path = input_files[sg.id]['cumulative'] no_evens_input_file = b.read_input(input_file_path) no_evens_output_file_path = str(sg_outputs[sg.id]) @@ -42,7 +43,7 @@ def filter_evens( sg_jobs.append(job) # Merge the no evens lists for all sequencing groups into a single file - job = b.new_job(name=title) + job = b.new_job(name=title, attributes=job_attrs) job.depends_on(*sg_jobs) inputs = ' '.join([b.read_input(f) for f in sg_output_files]) job.command(f'cat {inputs} >> {job.no_evens_file}') diff --git a/cpg_flow_test/jobs/first_n_primes.py b/cpg_flow_test/jobs/first_n_primes.py index 1ab9d5a..c6b85fd 100644 --- a/cpg_flow_test/jobs/first_n_primes.py +++ b/cpg_flow_test/jobs/first_n_primes.py @@ -8,11 +8,12 @@ def first_n_primes( b: Batch, sequencing_group: SequencingGroup, input_file_path: str, + job_attrs: dict[str, str], output_file_path: str, depends_on: Job, ) -> list[Job]: title = f'First N Primes: {sequencing_group.id}' - job = b.new_job(name=title) + job = b.new_job(name=title, attributes=job_attrs) id_sum_path = b.read_input(input_file_path) if depends_on: diff --git a/cpg_flow_test/jobs/iterative_digit_sum.py b/cpg_flow_test/jobs/iterative_digit_sum.py index 32f22b1..3b33632 100644 --- a/cpg_flow_test/jobs/iterative_digit_sum.py +++ b/cpg_flow_test/jobs/iterative_digit_sum.py @@ -7,10 +7,11 @@ def iterative_digit_sum( b: Batch, sequencing_group: SequencingGroup, + job_attrs: dict[str, str], output_file_path: str, ) -> list[Job]: title = f'Iterative Digit Sum: {sequencing_group.id}' - job = b.new_job(name=title) + job = b.new_job(name=title, attributes=job_attrs) cmd = f"""\ #!/bin/bash diff --git a/cpg_flow_test/jobs/say_hi.py b/cpg_flow_test/jobs/say_hi.py index 2a55f2d..c58fe0b 100644 --- a/cpg_flow_test/jobs/say_hi.py +++ b/cpg_flow_test/jobs/say_hi.py @@ -7,10 +7,11 @@ def say_hi( b: Batch, sequencing_group: SequencingGroup, + job_attrs: dict[str, str], output_file_path: str, ) -> list[Job]: title = f'Say Hi: {sequencing_group.id}' - job = b.new_job(name=title) + job = b.new_job(name=title, attributes=job_attrs) cmd = f""" echo "This is a hello from sequencing_group {sequencing_group.id}" > {job.sayhi} diff --git a/cpg_flow_test/stages.py b/cpg_flow_test/stages.py index 607988c..fb4bc66 100644 --- a/cpg_flow_test/stages.py +++ b/cpg_flow_test/stages.py @@ -69,7 +69,7 @@ def queue_jobs(self, sequencing_group: SequencingGroup, inputs: StageInput) -> S # Write id_sum to output file id_sum_output_path = str(self.expected_outputs(sequencing_group).get('id_sum', '')) - job_id_sum = iterative_digit_sum(b, sequencing_group, id_sum_output_path) + job_id_sum = iterative_digit_sum(b, sequencing_group, self.get_job_attrs(sequencing_group), id_sum_output_path) # Generate first N primes primes_output_path = str(self.expected_outputs(sequencing_group).get('primes', '')) @@ -77,6 +77,7 @@ def queue_jobs(self, sequencing_group: SequencingGroup, inputs: StageInput) -> S b, sequencing_group, id_sum_output_path, + self.get_job_attrs(sequencing_group), primes_output_path, depends_on=job_id_sum, ) @@ -102,6 +103,7 @@ def queue_jobs(self, sequencing_group: SequencingGroup, inputs: StageInput) -> S b, sequencing_group, input_txt, + self.get_job_attrs(sequencing_group), cumulative_calc_output_path, ) @@ -125,7 +127,7 @@ def queue_jobs(self, sequencing_group: SequencingGroup, inputs: StageInput) -> S b = get_batch() hello_output_path = str(self.expected_outputs(sequencing_group).get('hello', '')) - job_say_hi = say_hi(b, sequencing_group, hello_output_path) + job_say_hi = say_hi(b, sequencing_group, self.get_job_attrs(sequencing_group), hello_output_path) jobs = [job_say_hi] @@ -156,6 +158,7 @@ def queue_jobs(self, cohort: Cohort, inputs: StageInput) -> StageOutput | None: b, cohort.get_sequencing_groups(), input_files, + self.get_job_attrs(cohort), sg_outputs, no_evens_output_path, ) @@ -201,6 +204,7 @@ def queue_jobs(self, multicohort: MultiCohort, inputs: StageInput) -> StageOutpu b, multicohort.get_sequencing_groups(), input_files, + self.get_job_attrs(multicohort), pyramid_output_path, )