diff --git a/cpg_flow_test/jobs/build_pyramid.py b/cpg_flow_test/jobs/build_pyramid.py index 89466ff..43fafe2 100644 --- a/cpg_flow_test/jobs/build_pyramid.py +++ b/cpg_flow_test/jobs/build_pyramid.py @@ -10,6 +10,7 @@ def build_pyramid( b: Batch, sequencing_groups: list[SequencingGroup], input_files: dict[str, Any], + job_attrs: dict[str, str], output_file_path: str, ) -> list[Job]: title = 'Build A Pyramid' @@ -17,7 +18,7 @@ def build_pyramid( sg_jobs = [] sg_output_files = [] for sg in sequencing_groups: # type: ignore - job = b.new_job(name=title + ': ' + sg.id) + job = b.new_job(name=title + ': ' + sg.id, attributes=job_attrs | {'sequencing_group': sg.id}) no_evens_input_file_path = input_files[sg.id]['no_evens'] no_evens_input_file = b.read_input(no_evens_input_file_path) @@ -50,7 +51,7 @@ def build_pyramid( sg_jobs.append(job) # Merge the no evens lists for all sequencing groups into a single file - job = b.new_job(name=title) + job = b.new_job(name=title, attributes=job_attrs | {'tool': 'cat'}) job.depends_on(*sg_jobs) inputs = ' '.join([b.read_input(f) for f in sg_output_files]) job.command(f'cat {inputs} >> {job.pyramid}') diff --git a/cpg_flow_test/jobs/cumulative_calc.py b/cpg_flow_test/jobs/cumulative_calc.py index 48aee6f..6b71fac 100644 --- a/cpg_flow_test/jobs/cumulative_calc.py +++ b/cpg_flow_test/jobs/cumulative_calc.py @@ -8,10 +8,11 @@ def cumulative_calc( b: Batch, sequencing_group: SequencingGroup, input_file_path: str, + job_attrs: dict[str, str], output_file_path: str, ) -> list[Job]: title = f'Cumulative Calc: {sequencing_group.id}' - job = b.new_job(name=title) + job = b.new_job(name=title, attributes=job_attrs) primes_path = b.read_input(input_file_path) cmd = f""" diff --git a/cpg_flow_test/jobs/filter_evens.py b/cpg_flow_test/jobs/filter_evens.py index 7b8d610..1199e32 100644 --- a/cpg_flow_test/jobs/filter_evens.py +++ b/cpg_flow_test/jobs/filter_evens.py @@ -11,6 +11,7 @@ def filter_evens( b: Batch, sequencing_groups: list[SequencingGroup], input_files: dict[str, dict[str, Any]], + job_attrs: dict[str, str], sg_outputs: dict[str, dict[str, Any]], output_file_path: str, ) -> list[Job]: @@ -20,7 +21,7 @@ def filter_evens( sg_jobs = [] sg_output_files = [] for sg in sequencing_groups: # type: ignore - job = b.new_job(name=title + ': ' + sg.id) + job = b.new_job(name=title + ': ' + sg.id, attributes=job_attrs) input_file_path = input_files[sg.id]['cumulative'] no_evens_input_file = b.read_input(input_file_path) no_evens_output_file_path = str(sg_outputs[sg.id]) @@ -42,7 +43,7 @@ def filter_evens( sg_jobs.append(job) # Merge the no evens lists for all sequencing groups into a single file - job = b.new_job(name=title) + job = b.new_job(name=title, attributes=job_attrs) job.depends_on(*sg_jobs) inputs = ' '.join([b.read_input(f) for f in sg_output_files]) job.command(f'cat {inputs} >> {job.no_evens_file}') diff --git a/cpg_flow_test/jobs/first_n_primes.py b/cpg_flow_test/jobs/first_n_primes.py index 5fe198c..c6b85fd 100644 --- a/cpg_flow_test/jobs/first_n_primes.py +++ b/cpg_flow_test/jobs/first_n_primes.py @@ -5,10 +5,15 @@ def first_n_primes( - b: Batch, sequencing_group: SequencingGroup, input_file_path: str, output_file_path: str, depends_on: Job, + b: Batch, + sequencing_group: SequencingGroup, + input_file_path: str, + job_attrs: dict[str, str], + output_file_path: str, + depends_on: Job, ) -> list[Job]: title = f'First N Primes: {sequencing_group.id}' - job = b.new_job(name=title) + job = b.new_job(name=title, attributes=job_attrs) id_sum_path = b.read_input(input_file_path) if depends_on: diff --git a/cpg_flow_test/jobs/iterative_digit_sum.py b/cpg_flow_test/jobs/iterative_digit_sum.py index 32f22b1..3b33632 100644 --- a/cpg_flow_test/jobs/iterative_digit_sum.py +++ b/cpg_flow_test/jobs/iterative_digit_sum.py @@ -7,10 +7,11 @@ def iterative_digit_sum( b: Batch, sequencing_group: SequencingGroup, + job_attrs: dict[str, str], output_file_path: str, ) -> list[Job]: title = f'Iterative Digit Sum: {sequencing_group.id}' - job = b.new_job(name=title) + job = b.new_job(name=title, attributes=job_attrs) cmd = f"""\ #!/bin/bash diff --git a/cpg_flow_test/jobs/say_hi.py b/cpg_flow_test/jobs/say_hi.py index 2a55f2d..c58fe0b 100644 --- a/cpg_flow_test/jobs/say_hi.py +++ b/cpg_flow_test/jobs/say_hi.py @@ -7,10 +7,11 @@ def say_hi( b: Batch, sequencing_group: SequencingGroup, + job_attrs: dict[str, str], output_file_path: str, ) -> list[Job]: title = f'Say Hi: {sequencing_group.id}' - job = b.new_job(name=title) + job = b.new_job(name=title, attributes=job_attrs) cmd = f""" echo "This is a hello from sequencing_group {sequencing_group.id}" > {job.sayhi} diff --git a/cpg_flow_test/stages.py b/cpg_flow_test/stages.py index 3ca7ee6..fb4bc66 100644 --- a/cpg_flow_test/stages.py +++ b/cpg_flow_test/stages.py @@ -69,11 +69,18 @@ def queue_jobs(self, sequencing_group: SequencingGroup, inputs: StageInput) -> S # Write id_sum to output file id_sum_output_path = str(self.expected_outputs(sequencing_group).get('id_sum', '')) - job_id_sum = iterative_digit_sum(b, sequencing_group, id_sum_output_path) + job_id_sum = iterative_digit_sum(b, sequencing_group, self.get_job_attrs(sequencing_group), id_sum_output_path) # Generate first N primes primes_output_path = str(self.expected_outputs(sequencing_group).get('primes', '')) - job_primes = first_n_primes(b, sequencing_group, id_sum_output_path, primes_output_path, depends_on=job_id_sum) + job_primes = first_n_primes( + b, + sequencing_group, + id_sum_output_path, + self.get_job_attrs(sequencing_group), + primes_output_path, + depends_on=job_id_sum, + ) jobs = [job_id_sum, job_primes] @@ -92,7 +99,13 @@ def queue_jobs(self, sequencing_group: SequencingGroup, inputs: StageInput) -> S b = get_batch() cumulative_calc_output_path = str(self.expected_outputs(sequencing_group).get('cumulative', '')) - job_cumulative_calc = cumulative_calc(b, sequencing_group, input_txt, cumulative_calc_output_path) + job_cumulative_calc = cumulative_calc( + b, + sequencing_group, + input_txt, + self.get_job_attrs(sequencing_group), + cumulative_calc_output_path, + ) jobs = [job_cumulative_calc] @@ -114,7 +127,7 @@ def queue_jobs(self, sequencing_group: SequencingGroup, inputs: StageInput) -> S b = get_batch() hello_output_path = str(self.expected_outputs(sequencing_group).get('hello', '')) - job_say_hi = say_hi(b, sequencing_group, hello_output_path) + job_say_hi = say_hi(b, sequencing_group, self.get_job_attrs(sequencing_group), hello_output_path) jobs = [job_say_hi] @@ -145,6 +158,7 @@ def queue_jobs(self, cohort: Cohort, inputs: StageInput) -> StageOutput | None: b, cohort.get_sequencing_groups(), input_files, + self.get_job_attrs(cohort), sg_outputs, no_evens_output_path, ) @@ -186,7 +200,13 @@ def queue_jobs(self, multicohort: MultiCohort, inputs: StageInput) -> StageOutpu b = get_batch() pyramid_output_path = str(self.expected_outputs(multicohort).get('pyramid', '')) - job_pyramid = build_pyramid(b, multicohort.get_sequencing_groups(), input_files, pyramid_output_path) + job_pyramid = build_pyramid( + b, + multicohort.get_sequencing_groups(), + input_files, + self.get_job_attrs(multicohort), + pyramid_output_path, + ) return self.make_outputs( multicohort,