diff --git a/cpg_flow_test/configs/default_config.toml b/cpg_flow_test/configs/default_config.toml index 6139525..04f4629 100644 --- a/cpg_flow_test/configs/default_config.toml +++ b/cpg_flow_test/configs/default_config.toml @@ -107,3 +107,6 @@ sequencing_type = 'genome' [resource_overrides] # Override the default resources for a stage. + +[images] +ubuntu = 'australia-southeast1-docker.pkg.dev/cpg-common/images/sv/ubuntu1804:latest' diff --git a/cpg_flow_test/jobs/__init__.py b/cpg_flow_test/jobs/__init__.py index d9966b5..e69de29 100644 --- a/cpg_flow_test/jobs/__init__.py +++ b/cpg_flow_test/jobs/__init__.py @@ -1,6 +0,0 @@ -from jobs.build_pyramid import build_pyramid -from jobs.cumulative_calc import cumulative_calc -from jobs.filter_evens import filter_evens -from jobs.first_n_primes import first_n_primes -from jobs.iterative_digit_sum import iterative_digit_sum -from jobs.say_hi import say_hi diff --git a/cpg_flow_test/jobs/build_pyramid.py b/cpg_flow_test/jobs/build_pyramid.py index 43fafe2..bf218b6 100644 --- a/cpg_flow_test/jobs/build_pyramid.py +++ b/cpg_flow_test/jobs/build_pyramid.py @@ -1,33 +1,36 @@ from typing import Any -from cpg_flow.targets.sequencing_group import SequencingGroup -from hailtop.batch import Batch -from hailtop.batch.job import Job from loguru import logger +from hailtop.batch.job import Job + +from cpg_flow.targets.sequencing_group import SequencingGroup +from cpg_utils import Path, config, hail_batch -def build_pyramid( - b: Batch, + +def build_pyramid_job( sequencing_groups: list[SequencingGroup], input_files: dict[str, Any], job_attrs: dict[str, str], - output_file_path: str, + output_file_path: Path, ) -> list[Job]: + b = hail_batch.get_batch() + title = 'Build A Pyramid' # Compute the no evens list for each sequencing group - sg_jobs = [] - sg_output_files = [] - for sg in sequencing_groups: # type: ignore - job = b.new_job(name=title + ': ' + sg.id, attributes=job_attrs | {'sequencing_group': sg.id}) - no_evens_input_file_path = input_files[sg.id]['no_evens'] - no_evens_input_file = b.read_input(no_evens_input_file_path) + sg_jobs: list[Job] = [] + sg_output_files: list[Path] = [] + for sg in sequencing_groups: + job = b.new_bash_job(name=title + ': ' + sg.id, attributes=job_attrs | {'sequencing_group': sg.id}) + job.image(config.config_retrieve(['images', 'ubuntu'])) - id_sum_input_file_path = input_files[sg.id]['id_sum'] - id_sum_input_file = b.read_input(id_sum_input_file_path) + no_evens_input_file = b.read_input(input_files[sg.id]['no_evens']) - pyramid_output_file_path = str(sg.dataset.prefix() / f'{sg.id}_pyramid.txt') + id_sum_input_file = b.read_input(input_files[sg.id]['id_sum']) + + pyramid_output_file_path = sg.dataset.prefix() / f'{sg.id}_pyramid.txt' sg_output_files.append(pyramid_output_file_path) - cmd = f""" + job.command(f""" pyramid=() max_row_size=$(cat {no_evens_input_file} | rev | cut -d' ' -f1 | rev) rows=($(cat {no_evens_input_file} | cut -d' ' -f2-)) @@ -44,14 +47,14 @@ def build_pyramid( done printf "%s\\n" "${{pyramid[@]}}" > {job.pyramid_file} - """ + """) - job.command(cmd) b.write_output(job.pyramid_file, pyramid_output_file_path) sg_jobs.append(job) # Merge the no evens lists for all sequencing groups into a single file - job = b.new_job(name=title, attributes=job_attrs | {'tool': 'cat'}) + job = b.new_bash_job(name=title, attributes=job_attrs | {'tool': 'cat'}) + job.image(config.config_retrieve(['images', 'ubuntu'])) job.depends_on(*sg_jobs) inputs = ' '.join([b.read_input(f) for f in sg_output_files]) job.command(f'cat {inputs} >> {job.pyramid}') @@ -60,6 +63,4 @@ def build_pyramid( logger.info('-----PRINT PYRAMID-----') logger.info(output_file_path) - all_jobs = [job, *sg_jobs] - - return all_jobs + return [job, *sg_jobs] diff --git a/cpg_flow_test/jobs/cumulative_calc.py b/cpg_flow_test/jobs/cumulative_calc.py index 6b71fac..df1deb3 100644 --- a/cpg_flow_test/jobs/cumulative_calc.py +++ b/cpg_flow_test/jobs/cumulative_calc.py @@ -1,18 +1,20 @@ -from cpg_flow.targets.sequencing_group import SequencingGroup -from hailtop.batch import Batch -from hailtop.batch.job import Job from loguru import logger +from hailtop.batch.job import Job + +from cpg_flow.targets.sequencing_group import SequencingGroup +from cpg_utils import Path, config, hail_batch + -def cumulative_calc( - b: Batch, +def cumulative_calc_job( sequencing_group: SequencingGroup, - input_file_path: str, + input_file_path: Path, job_attrs: dict[str, str], - output_file_path: str, + output_file_path: Path, ) -> list[Job]: - title = f'Cumulative Calc: {sequencing_group.id}' - job = b.new_job(name=title, attributes=job_attrs) + b = hail_batch.get_batch() + job = b.new_job(name=f'Cumulative Calc: {sequencing_group.id}', attributes=job_attrs) + job.image(config.config_retrieve(['images', 'ubuntu'])) primes_path = b.read_input(input_file_path) cmd = f""" diff --git a/cpg_flow_test/jobs/filter_evens.py b/cpg_flow_test/jobs/filter_evens.py index 1199e32..651b692 100644 --- a/cpg_flow_test/jobs/filter_evens.py +++ b/cpg_flow_test/jobs/filter_evens.py @@ -1,34 +1,31 @@ -from typing import Any +from loguru import logger -from cpg_flow.stage import Stage, StageInput -from cpg_flow.targets.sequencing_group import SequencingGroup -from hailtop.batch import Batch from hailtop.batch.job import Job -from loguru import logger +from cpg_flow.targets.sequencing_group import SequencingGroup +from cpg_utils import Path, config, hail_batch -def filter_evens( - b: Batch, + +def filter_evens_job( sequencing_groups: list[SequencingGroup], - input_files: dict[str, dict[str, Any]], + input_files: dict[str, dict[str, Path]], job_attrs: dict[str, str], - sg_outputs: dict[str, dict[str, Any]], - output_file_path: str, + sg_outputs: dict[str, dict[str, Path] | Path], ) -> list[Job]: + b = hail_batch.get_batch() title = 'Filter Evens' # Compute the no evens list for each sequencing group sg_jobs = [] sg_output_files = [] - for sg in sequencing_groups: # type: ignore - job = b.new_job(name=title + ': ' + sg.id, attributes=job_attrs) + for sg in sequencing_groups: + job = b.new_bash_job(name=f'{title}: {sg.id}', attributes=job_attrs) + job.image(config.config_retrieve(['images', 'ubuntu'])) input_file_path = input_files[sg.id]['cumulative'] - no_evens_input_file = b.read_input(input_file_path) - no_evens_output_file_path = str(sg_outputs[sg.id]) - sg_output_files.append(no_evens_output_file_path) + sg_output_files.append(sg_outputs[sg.id]) - cmd = f""" - numbers=($(cat {no_evens_input_file})) + job.command(f""" + numbers=($(cat {b.read_input(input_file_path)})) result=() for num in "${{numbers[@]}}"; do if (( num % 2 != 0 )); then @@ -36,22 +33,21 @@ def filter_evens( fi done echo "{sg.id}: ${{result[@]}}" > {job.sg_no_evens_file} - """ + """) - job.command(cmd) - b.write_output(job.sg_no_evens_file, no_evens_output_file_path) + b.write_output(job.sg_no_evens_file, sg_outputs[sg.id]) sg_jobs.append(job) # Merge the no evens lists for all sequencing groups into a single file - job = b.new_job(name=title, attributes=job_attrs) + job = b.new_bash_job(name=title, attributes=job_attrs) + job.image(config.config_retrieve(['images', 'ubuntu'])) + job.depends_on(*sg_jobs) inputs = ' '.join([b.read_input(f) for f in sg_output_files]) job.command(f'cat {inputs} >> {job.no_evens_file}') - b.write_output(job.no_evens_file, output_file_path) + b.write_output(job.no_evens_file, sg_outputs['no_evens']) logger.info('-----PRINT NO EVENS-----') - logger.info(output_file_path) - - all_jobs = [job, *sg_jobs] + logger.info(sg_outputs['no_evens']) - return all_jobs + return [job, *sg_jobs] diff --git a/cpg_flow_test/jobs/first_n_primes.py b/cpg_flow_test/jobs/first_n_primes.py index c6b85fd..f0965c6 100644 --- a/cpg_flow_test/jobs/first_n_primes.py +++ b/cpg_flow_test/jobs/first_n_primes.py @@ -1,25 +1,23 @@ -from cpg_flow.targets.sequencing_group import SequencingGroup -from hailtop.batch import Batch -from hailtop.batch.job import Job from loguru import logger +from hailtop.batch.job import Job + +from cpg_flow.targets.sequencing_group import SequencingGroup +from cpg_utils import Path, config, hail_batch + -def first_n_primes( - b: Batch, +def first_n_primes_job( sequencing_group: SequencingGroup, - input_file_path: str, + input_file_path: Path, job_attrs: dict[str, str], - output_file_path: str, - depends_on: Job, -) -> list[Job]: - title = f'First N Primes: {sequencing_group.id}' - job = b.new_job(name=title, attributes=job_attrs) + output_file_path: Path, +) -> Job: + b = hail_batch.get_batch() + job = b.new_job(name=f'First N Primes: {sequencing_group.id}', attributes=job_attrs) + job.image(config.config_retrieve(['images', 'ubuntu'])) id_sum_path = b.read_input(input_file_path) - if depends_on: - job.depends_on(depends_on) - - cmd = f""" + job.command(f""" is_prime() {{ local num=$1 if [ $num -lt 2 ]; then @@ -46,9 +44,7 @@ def first_n_primes( done echo ${{primes[@]}} > {job.primes} - """ - - job.command(cmd) + """) logger.info('-----PRINT PRIMES-----') logger.info(output_file_path) diff --git a/cpg_flow_test/jobs/iterative_digit_sum.py b/cpg_flow_test/jobs/iterative_digit_sum.py index 3b33632..3929c74 100644 --- a/cpg_flow_test/jobs/iterative_digit_sum.py +++ b/cpg_flow_test/jobs/iterative_digit_sum.py @@ -1,19 +1,21 @@ -from cpg_flow.targets.sequencing_group import SequencingGroup -from hailtop.batch import Batch -from hailtop.batch.job import Job from loguru import logger +from hailtop.batch.job import Job + +from cpg_flow.targets.sequencing_group import SequencingGroup +from cpg_utils import Path, config, hail_batch + -def iterative_digit_sum( - b: Batch, +def iterative_digit_sum_job( sequencing_group: SequencingGroup, job_attrs: dict[str, str], - output_file_path: str, -) -> list[Job]: - title = f'Iterative Digit Sum: {sequencing_group.id}' - job = b.new_job(name=title, attributes=job_attrs) + output_file_path: Path, +) -> Job: + b = hail_batch.get_batch() + job = b.new_job(name=f'Iterative Digit Sum: {sequencing_group.id}', attributes=job_attrs) + job.image(config.config_retrieve(['images', 'ubuntu'])) - cmd = f"""\ + job.command(f"""\ #!/bin/bash # Function to calculate the iterative digit sum @@ -49,8 +51,7 @@ def iterative_digit_sum( result=$(extract_digits_and_sum {sequencing_group.id}) echo "Result: $result\n" echo $result > {job.id_sum} - """ - job.command(cmd) + """) logger.info('-----PRINT ID_SUM-----') logger.info(output_file_path) diff --git a/cpg_flow_test/jobs/say_hi.py b/cpg_flow_test/jobs/say_hi.py index c58fe0b..1e170e1 100644 --- a/cpg_flow_test/jobs/say_hi.py +++ b/cpg_flow_test/jobs/say_hi.py @@ -1,23 +1,23 @@ -from cpg_flow.targets.sequencing_group import SequencingGroup -from hailtop.batch import Batch -from hailtop.batch.job import Job from loguru import logger +from hailtop.batch.job import Job -def say_hi( - b: Batch, +from cpg_flow.targets.sequencing_group import SequencingGroup +from cpg_utils import Path, config, hail_batch + + +def say_hi_job( sequencing_group: SequencingGroup, job_attrs: dict[str, str], - output_file_path: str, -) -> list[Job]: - title = f'Say Hi: {sequencing_group.id}' - job = b.new_job(name=title, attributes=job_attrs) + output_file_path: Path, +) -> Job: + b = hail_batch.get_batch() + job = b.new_job(name=f'Say Hi: {sequencing_group.id}', attributes=job_attrs) + job.image(config.config_retrieve(['images', 'ubuntu'])) - cmd = f""" + job.command(f""" echo "This is a hello from sequencing_group {sequencing_group.id}" > {job.sayhi} - """ - - job.command(cmd) + """) logger.info('-----PRINT SAY HI-----') logger.info(output_file_path) diff --git a/cpg_flow_test/stages.py b/cpg_flow_test/stages.py index fb4bc66..483f170 100644 --- a/cpg_flow_test/stages.py +++ b/cpg_flow_test/stages.py @@ -1,5 +1,6 @@ -from typing import Any # noqa: I001 +from typing import Any +from jobs import build_pyramid, cumulative_calc, filter_evens, first_n_primes, iterative_digit_sum, say_hi from loguru import logger from cpg_flow.stage import CohortStage, MultiCohortStage, SequencingGroupStage, StageInput, StageOutput, stage @@ -7,22 +8,21 @@ from cpg_flow.targets.multicohort import MultiCohort from cpg_flow.targets.sequencing_group import SequencingGroup from cpg_utils import Path -from cpg_utils.hail_batch import get_batch -from jobs import build_pyramid, cumulative_calc, filter_evens, first_n_primes, iterative_digit_sum, say_hi """ -Here's a fun programming task with four interdependent steps, using the concept of **prime numbers** and their relationships: +Here's a fun programming task with four steps, using the concept of **prime numbers** and their relationships: --- ### Task: Prime Pyramid -Write a program that builds a "Prime Pyramid" based on a given input number \( N \). The pyramid is built in four steps: +Write a program that builds a "Prime Pyramid" based on a given input number `N`. The pyramid is built in four steps: #### Step 1: **Generate Prime Numbers** -Write a function to generate the first \( N \) prime numbers. For example, if \( N = 5 \), the output would be `[2, 3, 5, 7, 11]`. +Write a function to generate the first `N` prime numbers. i.e. if `N=5``, the output would be `[2, 3, 5, 7, 11]`. #### Step 2: **Calculate Cumulative Sums** -Using the prime numbers generated in Step 1, calculate a list of cumulative sums. Each cumulative sum is the sum of the primes up to that index. +Using the prime numbers generated in Step 1, calculate a list of cumulative sums. +Each cumulative sum is the sum of the primes up to that index. Example: For `[2, 3, 5, 7, 11]`, the cumulative sums are `[2, 5, 10, 17, 28]`. #### Step 3: **Filter Even Numbers** @@ -30,7 +30,8 @@ Example: For `[2, 5, 10, 17, 28]`, the result is `[5, 17]`. #### Step 4: **Build the Prime Pyramid** -Using the filtered numbers from Step 3, construct a pyramid. Each level of the pyramid corresponds to a filtered number, and the number determines how many stars `*` appear on that level. +Using the filtered numbers from Step 3, construct a pyramid. +Each pyramid level corresponds to a filtered number, and the number determines how many stars `*` appear on that level. Example: For `[5, 17]`, the pyramid is: ``` ***** @@ -40,9 +41,9 @@ --- ### Optional Extensions: -1. Allow the user to input \( N \) dynamically. +1. Allow the user to input `N`` dynamically. 2. Visualize the pyramid with formatting, like centering the stars. -3. Add validation to ensure \( N \) is a positive integer. +3. Add validation to ensure `N` is a positive integer. 4. Include unit tests for each step. This task is simple, yet it combines loops, conditionals, and basic data manipulations in a creative way! @@ -53,96 +54,88 @@ @stage(analysis_keys=['id_sum', 'primes'], analysis_type='custom') class GeneratePrimes(SequencingGroupStage): - def expected_outputs(self, sequencing_group: SequencingGroup) -> dict[str, Path | str]: + def expected_outputs(self, sequencing_group: SequencingGroup) -> dict[str, Path]: return { 'id_sum': sequencing_group.dataset.prefix() / WORKFLOW_FOLDER / f'{sequencing_group.id}_id_sum.txt', 'primes': sequencing_group.dataset.prefix() / WORKFLOW_FOLDER / f'{sequencing_group.id}_primes.txt', } - def queue_jobs(self, sequencing_group: SequencingGroup, inputs: StageInput) -> StageOutput | None: - # Get batch - b = get_batch() - + def queue_jobs(self, sequencing_group: SequencingGroup, _inputs: StageInput) -> StageOutput: # Print out alignment input for this sequencing group logger.info('-----ALIGNMENT INPUT-----') logger.info(sequencing_group.alignment_input) + outputs = self.expected_outputs(sequencing_group) + # Write id_sum to output file - id_sum_output_path = str(self.expected_outputs(sequencing_group).get('id_sum', '')) - job_id_sum = iterative_digit_sum(b, sequencing_group, self.get_job_attrs(sequencing_group), id_sum_output_path) + job_id_sum = iterative_digit_sum.iterative_digit_sum_job( + sequencing_group, + self.get_job_attrs(sequencing_group), + outputs['id_sum'], + ) # Generate first N primes - primes_output_path = str(self.expected_outputs(sequencing_group).get('primes', '')) - job_primes = first_n_primes( - b, + job_primes = first_n_primes.first_n_primes_job( sequencing_group, - id_sum_output_path, + outputs['id_sum'], self.get_job_attrs(sequencing_group), - primes_output_path, - depends_on=job_id_sum, + outputs['primes'], ) + # set a dependency + job_primes.depends_on(job_id_sum) + jobs = [job_id_sum, job_primes] - return self.make_outputs(sequencing_group, data=self.expected_outputs(sequencing_group), jobs=jobs) # type: ignore + return self.make_outputs(sequencing_group, data=outputs, jobs=jobs) @stage(required_stages=[GeneratePrimes], analysis_keys=['cumulative'], analysis_type='custom') class CumulativeCalc(SequencingGroupStage): - def expected_outputs(self, sequencing_group: SequencingGroup): + def expected_outputs(self, sequencing_group: SequencingGroup) -> dict[str, Path]: return { 'cumulative': sequencing_group.dataset.prefix() / WORKFLOW_FOLDER / f'{sequencing_group.id}_cumulative.txt', } def queue_jobs(self, sequencing_group: SequencingGroup, inputs: StageInput) -> StageOutput | None: input_txt = inputs.as_path(sequencing_group, GeneratePrimes, 'primes') - b = get_batch() + outputs = self.expected_outputs(sequencing_group) - cumulative_calc_output_path = str(self.expected_outputs(sequencing_group).get('cumulative', '')) - job_cumulative_calc = cumulative_calc( - b, + job_cumulative_calc = cumulative_calc.cumulative_calc_job( sequencing_group, input_txt, self.get_job_attrs(sequencing_group), - cumulative_calc_output_path, + outputs['cumulative'], ) - jobs = [job_cumulative_calc] - return self.make_outputs( sequencing_group, - data=self.expected_outputs(sequencing_group), - jobs=jobs, + data=outputs, + jobs=job_cumulative_calc, ) @stage(required_stages=[GeneratePrimes], analysis_keys=['hello'], analysis_type='custom') class SayHi(SequencingGroupStage): - def expected_outputs(self, sequencing_group: SequencingGroup): + def expected_outputs(self, sequencing_group: SequencingGroup) -> dict[str, Path]: return { 'hello': sequencing_group.dataset.prefix() / WORKFLOW_FOLDER / f'{sequencing_group.id}_cumulative.txt', } - def queue_jobs(self, sequencing_group: SequencingGroup, inputs: StageInput) -> StageOutput | None: - b = get_batch() - - hello_output_path = str(self.expected_outputs(sequencing_group).get('hello', '')) - job_say_hi = say_hi(b, sequencing_group, self.get_job_attrs(sequencing_group), hello_output_path) - - jobs = [job_say_hi] - + def queue_jobs(self, sequencing_group: SequencingGroup, _inputs: StageInput) -> StageOutput | None: + outputs = self.expected_outputs(sequencing_group) return self.make_outputs( sequencing_group, - data=self.expected_outputs(sequencing_group), - jobs=jobs, + data=outputs, + jobs=say_hi.say_hi_job(sequencing_group, self.get_job_attrs(sequencing_group), outputs['hello']), ) @stage(required_stages=[CumulativeCalc], analysis_keys=['no_evens'], analysis_type='custom') class FilterEvens(CohortStage): - def expected_outputs(self, cohort: Cohort): + def expected_outputs(self, cohort: Cohort) -> dict[str, Path]: sg_outputs = { - sg.id: str(sg.dataset.prefix() / WORKFLOW_FOLDER / f'{sg.id}_no_evens.txt') + sg.id: sg.dataset.prefix() / WORKFLOW_FOLDER / f'{sg.id}_no_evens.txt' for sg in cohort.get_sequencing_groups() } sg_outputs['no_evens'] = cohort.dataset.prefix() / WORKFLOW_FOLDER / f'{cohort.name}_no_evens.txt' @@ -150,29 +143,25 @@ def expected_outputs(self, cohort: Cohort): def queue_jobs(self, cohort: Cohort, inputs: StageInput) -> StageOutput | None: input_files = inputs.as_dict_by_target(CumulativeCalc) - b = get_batch() + outputs = self.expected_outputs(cohort) - sg_outputs = self.expected_outputs(cohort) - no_evens_output_path = str(sg_outputs['no_evens']) - job_no_evens = filter_evens( - b, + job_no_evens = filter_evens.filter_evens_job( cohort.get_sequencing_groups(), input_files, self.get_job_attrs(cohort), - sg_outputs, - no_evens_output_path, + outputs, ) return self.make_outputs( cohort, - data=self.expected_outputs(cohort), + data=outputs, jobs=job_no_evens, ) @stage(required_stages=[GeneratePrimes, FilterEvens], analysis_keys=['pyramid'], analysis_type='custom') class BuildAPrimePyramid(MultiCohortStage): - def expected_outputs(self, multicohort: MultiCohort): + def expected_outputs(self, multicohort: MultiCohort) -> dict[str, Path]: return { 'pyramid': multicohort.analysis_dataset.prefix() / WORKFLOW_FOLDER / f'{multicohort.name}_pyramid.txt', } @@ -186,30 +175,29 @@ def queue_jobs(self, multicohort: MultiCohort, inputs: StageInput) -> StageOutpu logger.info('----INPUT FILES GENERATE PRIMES----') logger.info(input_files_generate_primes) + outputs = self.expected_outputs(multicohort) + input_files: dict[str, dict[str, Any]] = {} for cohort in multicohort.get_cohorts(): for sg in cohort.get_sequencing_groups(): - input_files[sg.id] = {} - input_files[sg.id]['no_evens'] = input_files_filter_evens[cohort.id][sg.id] - input_files[sg.id]['id_sum'] = input_files_generate_primes[sg.id]['id_sum'] - input_files[sg.id]['primes'] = input_files_generate_primes[sg.id]['primes'] + input_files[sg.id] = { + 'no_evens': input_files_filter_evens[cohort.id][sg.id], + 'id_sum': input_files_generate_primes[sg.id]['id_sum'], + 'primes': input_files_generate_primes[sg.id]['primes'], + } logger.info('----INPUT FILES----') logger.info(input_files) - b = get_batch() - - pyramid_output_path = str(self.expected_outputs(multicohort).get('pyramid', '')) - job_pyramid = build_pyramid( - b, + job_pyramid = build_pyramid.build_pyramid_job( multicohort.get_sequencing_groups(), input_files, self.get_job_attrs(multicohort), - pyramid_output_path, + outputs['pyramid'], ) return self.make_outputs( multicohort, - data=self.expected_outputs(multicohort), + data=outputs, jobs=job_pyramid, ) diff --git a/cpg_flow_test/workflow.py b/cpg_flow_test/workflow.py index 5e05594..2b84abf 100644 --- a/cpg_flow_test/workflow.py +++ b/cpg_flow_test/workflow.py @@ -1,37 +1,37 @@ -#!/usr/bin/env python3 import os import sys -from pathlib import Path + +from stages import BuildAPrimePyramid, CumulativeCalc, FilterEvens, GeneratePrimes, SayHi from cpg_flow.workflow import run_workflow from cpg_utils.config import set_config_paths -from stages import BuildAPrimePyramid, CumulativeCalc, FilterEvens, GeneratePrimes, SayHi TMP_DIR = os.getenv('TMP_DIR') -# CONFIG_FILE = str(Path(__file__).parent / 'config.toml') message = "Hello, Hail Batch! I'm CPG flow, nice to meet you." -def run_cpg_flow(dry_run=False): - workflow = [GeneratePrimes, CumulativeCalc, FilterEvens, BuildAPrimePyramid, SayHi] - +def run_cpg_flow(dry_run: bool = False) -> None: config_paths = os.environ['CPG_CONFIG_PATH'].split(',') print(f'CPG_CONFIG_PATHS: {config_paths}') # Inserting after the "defaults" config, but before user configs: - # set_config_paths(config_paths[:1] + [CONFIG_FILE] + config_paths[1:]) set_config_paths(config_paths) - run_workflow(stages=workflow, dry_run=dry_run) + + run_workflow( + name='tws_prime_pyramid', + stages=[GeneratePrimes, CumulativeCalc, FilterEvens, BuildAPrimePyramid, SayHi], + dry_run=dry_run, + ) -def validate_batch_workflow(): +def validate_batch_workflow() -> None: if not os.path.exists(f'{TMP_DIR}/out.txt'): print('Batch workflow failed') sys.exit(1) success = False - with open(f'{TMP_DIR}/out.txt', 'r') as f: + with open(f'{TMP_DIR}/out.txt') as f: success = f.read().strip() == message print(f'Batch workflow {"succeeded" if success else "failed"}') diff --git a/pyproject.toml b/pyproject.toml index 6f89777..8b9e975 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,9 +7,8 @@ requires-python = ">=3.10,<3.12" dependencies = [ "analysis-runner>=3.2.2", "cpg-flow~=1.3", - "cpg-utils>=5.1.1", + "cpg-utils>=5.5.0", "hail~=0.2.137", - "loguru>=0.7.3", ] [project.optional-dependencies] @@ -81,141 +80,12 @@ select = [ fixable = ["ALL"] ignore = [ - "ANN201", # Missing return type annotation for public function - "ANN204", # Missing type annotation for special method `__init__` - "ANN401", # Dynamically typed expressions (typing.Any) are disallowed in `**kwargs` - "E501", # Line length too long - "E731", # Do not assign a lambda expression, use a def - "E741", # Ambiguous variable name - "G004", # Logging statement uses f-string - "PLR0911", # Too many return statements - "PLR0912", # Too many branches - "PLR0913", # Too many arguments to function call - "PLR0915", # Too many statements - "PLW0603", # Using the global statement to update `` is discouraged - "PT018", # Assertion should be broken down into multiple parts "Q000", # Single quotes found but double quotes preferred - "S101", # Use of assert detected - "SLF001", # Private member accessed: `_preemptible` - - "ARG001", # Unused function argument - "ARG002", # Unused method argument +] - "PLR2004", # Magic value used +[tool.ruff.lint.isort] +section-order = ["future", "standard-library", "third-party", "hail", "cpg", "first-party", "local-folder"] - "ANN001", - "ANN202", - "C408", - "TID252", - "RET504", - "ERA001", - "UP032", - "RUF100", - "ISC001", - "PIE804", - "F401", - "C901", - "W605", - "RET505", - "ANN003", - "RUF013", - "UP031", - "RUF010", - "B006", - "ANN002", - "B023", - "EXE001", - "G001", - "SIM108", - "RUF005", - "G002", - "PD901", - "N999", - "SIM118", - "SIM102", - "PLW2901", - "S603", - "ARG005", - "PGH003", - "B904", - "N802", - "ISC003", - "ANN205", - "S607", - "RUF015", - "E701", - "N818", - "PIE790", - "N803", - "A002", - "RUF012", - "W291", - "S113", - "S311", - "N806", - "PLR5501", - "F403", - "SIM115", - "B007", - "F841", - "C405", - "C419", - "SIM300", - "PD011", - "UP015", - "S602", - "Q002", - "ISC002", - "COM819", - "C416", - "DTZ005", - "G003", - "S608", - "PIE808", - "B008", - "S108", - "E402", - "S605", - "F821", - "RET507", - "RET503", - "UP030", - "UP026", - "PLR1714", - "C403", - "PLR1711", - "PIE810", - "DTZ011", - "S105", - "BLE001", - "C401", - "C400", - "PLR0402", - "SIM201", - "RET506", - "C417", - "PD010", - "PLW1510", - "A001", - "W292", - "PYI024", - "Q003", - "S301", - "RET501", - "PD003", - "SIM117", - "RUF002", - "SIM105", - "E713", - "S324", - "S310", - "Q001", - "UP020", - "S506", - "N805", - "E712", - "E401", - "SIM212", - "DTZ002", - "UP007", -] +[tool.ruff.lint.isort.sections] +cpg = ["metamist", "cpg_flow", "cpg_utils"] +hail = ["hail", "hailtop"]