Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cpg_flow_test/jobs/build_pyramid.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@ def build_pyramid(
b: Batch,
sequencing_groups: list[SequencingGroup],
input_files: dict[str, Any],
job_attrs: dict[str, str],
output_file_path: str,
) -> list[Job]:
title = 'Build A Pyramid'
# Compute the no evens list for each sequencing group
sg_jobs = []
sg_output_files = []
for sg in sequencing_groups: # type: ignore
job = b.new_job(name=title + ': ' + sg.id)
job = b.new_job(name=title + ': ' + sg.id, attributes=job_attrs | {'sequencing_group': sg.id})
no_evens_input_file_path = input_files[sg.id]['no_evens']
no_evens_input_file = b.read_input(no_evens_input_file_path)

Expand Down Expand Up @@ -50,7 +51,7 @@ def build_pyramid(
sg_jobs.append(job)

# Merge the no evens lists for all sequencing groups into a single file
job = b.new_job(name=title)
job = b.new_job(name=title, attributes=job_attrs | {'tool': 'cat'})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tool cat? 😂

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep 😸

The command for this job is just `job.command(f'cat {inputs} >> {job.pyramid}')`. But really this is just an example of unioning some additional entries into the job attributes.

job.depends_on(*sg_jobs)
inputs = ' '.join([b.read_input(f) for f in sg_output_files])
job.command(f'cat {inputs} >> {job.pyramid}')
Expand Down
3 changes: 2 additions & 1 deletion cpg_flow_test/jobs/cumulative_calc.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ def cumulative_calc(
b: Batch,
sequencing_group: SequencingGroup,
input_file_path: str,
job_attrs: dict[str, str],
output_file_path: str,
) -> list[Job]:
title = f'Cumulative Calc: {sequencing_group.id}'
job = b.new_job(name=title)
job = b.new_job(name=title, attributes=job_attrs)
primes_path = b.read_input(input_file_path)

cmd = f"""
Expand Down
5 changes: 3 additions & 2 deletions cpg_flow_test/jobs/filter_evens.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ def filter_evens(
b: Batch,
sequencing_groups: list[SequencingGroup],
input_files: dict[str, dict[str, Any]],
job_attrs: dict[str, str],
sg_outputs: dict[str, dict[str, Any]],
output_file_path: str,
) -> list[Job]:
Expand All @@ -20,7 +21,7 @@ def filter_evens(
sg_jobs = []
sg_output_files = []
for sg in sequencing_groups: # type: ignore
job = b.new_job(name=title + ': ' + sg.id)
job = b.new_job(name=title + ': ' + sg.id, attributes=job_attrs)
input_file_path = input_files[sg.id]['cumulative']
no_evens_input_file = b.read_input(input_file_path)
no_evens_output_file_path = str(sg_outputs[sg.id])
Expand All @@ -42,7 +43,7 @@ def filter_evens(
sg_jobs.append(job)

# Merge the no evens lists for all sequencing groups into a single file
job = b.new_job(name=title)
job = b.new_job(name=title, attributes=job_attrs)
job.depends_on(*sg_jobs)
inputs = ' '.join([b.read_input(f) for f in sg_output_files])
job.command(f'cat {inputs} >> {job.no_evens_file}')
Expand Down
9 changes: 7 additions & 2 deletions cpg_flow_test/jobs/first_n_primes.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,15 @@


def first_n_primes(
b: Batch, sequencing_group: SequencingGroup, input_file_path: str, output_file_path: str, depends_on: Job,
b: Batch,
sequencing_group: SequencingGroup,
input_file_path: str,
job_attrs: dict[str, str],
output_file_path: str,
depends_on: Job,
) -> list[Job]:
title = f'First N Primes: {sequencing_group.id}'
job = b.new_job(name=title)
job = b.new_job(name=title, attributes=job_attrs)
id_sum_path = b.read_input(input_file_path)

if depends_on:
Expand Down
3 changes: 2 additions & 1 deletion cpg_flow_test/jobs/iterative_digit_sum.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@
def iterative_digit_sum(
b: Batch,
sequencing_group: SequencingGroup,
job_attrs: dict[str, str],
output_file_path: str,
) -> list[Job]:
title = f'Iterative Digit Sum: {sequencing_group.id}'
job = b.new_job(name=title)
job = b.new_job(name=title, attributes=job_attrs)

cmd = f"""\
#!/bin/bash
Expand Down
3 changes: 2 additions & 1 deletion cpg_flow_test/jobs/say_hi.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@
def say_hi(
b: Batch,
sequencing_group: SequencingGroup,
job_attrs: dict[str, str],
output_file_path: str,
) -> list[Job]:
title = f'Say Hi: {sequencing_group.id}'
job = b.new_job(name=title)
job = b.new_job(name=title, attributes=job_attrs)

cmd = f"""
echo "This is a hello from sequencing_group {sequencing_group.id}" > {job.sayhi}
Expand Down
30 changes: 25 additions & 5 deletions cpg_flow_test/stages.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,18 @@ def queue_jobs(self, sequencing_group: SequencingGroup, inputs: StageInput) -> S

# Write id_sum to output file
id_sum_output_path = str(self.expected_outputs(sequencing_group).get('id_sum', ''))
job_id_sum = iterative_digit_sum(b, sequencing_group, id_sum_output_path)
job_id_sum = iterative_digit_sum(b, sequencing_group, self.get_job_attrs(sequencing_group), id_sum_output_path)

# Generate first N primes
primes_output_path = str(self.expected_outputs(sequencing_group).get('primes', ''))
job_primes = first_n_primes(b, sequencing_group, id_sum_output_path, primes_output_path, depends_on=job_id_sum)
job_primes = first_n_primes(
b,
sequencing_group,
id_sum_output_path,
self.get_job_attrs(sequencing_group),
primes_output_path,
depends_on=job_id_sum,
)

jobs = [job_id_sum, job_primes]

Expand All @@ -92,7 +99,13 @@ def queue_jobs(self, sequencing_group: SequencingGroup, inputs: StageInput) -> S
b = get_batch()

cumulative_calc_output_path = str(self.expected_outputs(sequencing_group).get('cumulative', ''))
job_cumulative_calc = cumulative_calc(b, sequencing_group, input_txt, cumulative_calc_output_path)
job_cumulative_calc = cumulative_calc(
b,
sequencing_group,
input_txt,
self.get_job_attrs(sequencing_group),
cumulative_calc_output_path,
)

jobs = [job_cumulative_calc]

Expand All @@ -114,7 +127,7 @@ def queue_jobs(self, sequencing_group: SequencingGroup, inputs: StageInput) -> S
b = get_batch()

hello_output_path = str(self.expected_outputs(sequencing_group).get('hello', ''))
job_say_hi = say_hi(b, sequencing_group, hello_output_path)
job_say_hi = say_hi(b, sequencing_group, self.get_job_attrs(sequencing_group), hello_output_path)

jobs = [job_say_hi]

Expand Down Expand Up @@ -145,6 +158,7 @@ def queue_jobs(self, cohort: Cohort, inputs: StageInput) -> StageOutput | None:
b,
cohort.get_sequencing_groups(),
input_files,
self.get_job_attrs(cohort),
sg_outputs,
no_evens_output_path,
)
Expand Down Expand Up @@ -186,7 +200,13 @@ def queue_jobs(self, multicohort: MultiCohort, inputs: StageInput) -> StageOutpu
b = get_batch()

pyramid_output_path = str(self.expected_outputs(multicohort).get('pyramid', ''))
job_pyramid = build_pyramid(b, multicohort.get_sequencing_groups(), input_files, pyramid_output_path)
job_pyramid = build_pyramid(
b,
multicohort.get_sequencing_groups(),
input_files,
self.get_job_attrs(multicohort),
pyramid_output_path,
)

return self.make_outputs(
multicohort,
Expand Down
Loading