diff --git a/pyproject.toml b/pyproject.toml index 57a83c1a..47f88573 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,6 +44,7 @@ classifiers = [ # Specify the Python versions you support here. "Programming Language :: Python :: 3", "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", ] keywords = ["hail", "flow", "api", "bioinformatics", "genomics"] diff --git a/src/cpg_flow/stage.py b/src/cpg_flow/stage.py index f28aba27..4ec6e4a5 100644 --- a/src/cpg_flow/stage.py +++ b/src/cpg_flow/stage.py @@ -27,7 +27,7 @@ from hailtop.batch.job import Job from cpg_flow.targets import Cohort, Dataset, MultiCohort, SequencingGroup, Target -from cpg_flow.utils import ExpectedResultT, exists +from cpg_flow.utils import ExpectedResultT, dependency_handler, exists from cpg_flow.workflow import Action, WorkflowError, get_workflow, path_walk from cpg_utils import Path, to_path from cpg_utils.config import get_config @@ -554,11 +554,8 @@ def _queue_jobs_with_checks( outputs.stage = self outputs.meta |= self.get_job_attrs(target) - for output_job in outputs.jobs: - if output_job: - for input_job in inputs.get_jobs(target): - assert input_job, f'Input dependency job for stage: {self}, target: {target}' - output_job.depends_on(input_job) + # make all output jobs dependent on all input jobs, but don't extend the list of prior deps + dependency_handler(target=outputs.jobs, tail=inputs.get_jobs(target), append_to_tail=False) if outputs.error_msg: return outputs diff --git a/src/cpg_flow/utils.py b/src/cpg_flow/utils.py index a82fb23e..5386a88c 100644 --- a/src/cpg_flow/utils.py +++ b/src/cpg_flow/utils.py @@ -21,6 +21,7 @@ import hail as hl from hailtop.batch import ResourceFile +from hailtop.batch.job import Job from cpg_utils import Path, to_path from cpg_utils.config import config_retrieve, get_config @@ -417,3 +418,73 @@ def write_to_gcs_bucket(contents, path: Path): blob.upload_from_string(contents) return bucket_name, blob_name + + +def 
dependency_handler( + target: Job | list[Job] | None, + tail: Job | list[Job] | None, + append_to_tail: bool = True, + only_last: bool = False, +) -> None: + """ + A utility method for handling stage inter-dependencies, when it's possible that either the target job(s) + or dependency job(s) are None/empty list. + + Use case is CPG-Flow or similar tasks, where a job can either run (new output needs to be generated), or doesn't + need to run (results/intermediate files exist from a prior run). This optional process can be in the centre of a + job chain, so the dependency setting, and extension of dependencies is crucial. A common usage pattern is: + ``` + prior_jobs: list[Job] = [] + + j = class.run_process(...) + if j and prior_jobs: + j.depends_on(*prior_jobs) + if j: + prior_jobs.append(j) + ``` + + This scenario would be minimised to: + + utils.dependency_handler(target=j, tail=prior_jobs) + + Args: + target (Job | list[Job] | None): job(s) which require a depends_on relationship with job(s) in tail + tail (Job | list[Job] | None): job(s) for this target job(s) to depend on; may be None + append_to_tail (bool): if this is True, and tail is a list, the current job(s) will be added to the tail + only_last (bool): if this is True, we only set dependencies against the last element in tail, else all of tail + """ + + # if the target is null/empty, nothing to do + if not target: + logger.debug('No Target(s), cannot set depends_on relationships') + return + + # if tail is None we can't set dependencies or append, nothing to do + if tail is None: + logger.debug('No Tail, cannot set depends_on relationships or append') + return + + # downstream operations are easier if we expect to operate on iterables - create new variable name + target_list = ( + target + if isinstance(target, list) + else [ + target, + ] + ) + tail_list = tail if isinstance(tail, list) else [tail] + + # use only_last switch to choose dependencies to set + deps_to_apply = [tail_list[-1]] if only_last 
else tail_list + + try: + # don't try and call depends_on(*x) with an empty iterable + if tail_list: + for job in target_list: + job.depends_on(*deps_to_apply) + + if append_to_tail and isinstance(tail, list): + tail.extend(target_list) + except AttributeError as ae: + logger.error(f'Failure to set dependencies between target {target} and tail {tail}') + raise ae diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 00000000..dd0f8298 --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,101 @@ +import logging +from collections.abc import Iterable +from copy import deepcopy +from itertools import product + +import pytest + +from cpg_flow.utils import dependency_handler + + +class MockJob: + """ + A mock class to simulate hailtop.batch.job.Job + """ + + def __init__(self, name: str): + self.name = name + self._dependencies: set[MockJob] = set() + + def depends_on(self, *jobs): + """ + Simulate depends_on by adding jobs to a set + """ + for job in jobs: + self._dependencies.add(job) + + def __repr__(self): + return f'MockJob({self.name})' + + def __eq__(self, other) -> bool: + return self.name == other.name + + def __hash__(self) -> int: + return int(self.name[-1]) + + +@pytest.mark.parametrize( + ['new_dep', 'old_dep', 'error'], + [ + pytest.param(None, None, 'No Target(s), cannot set depends_on relationships'), + pytest.param(MockJob('job1'), None, 'No Tail, cannot set depends_on relationships or append'), + pytest.param(None, MockJob('job1'), 'No Target(s), cannot set depends_on relationships'), + pytest.param(None, [MockJob('job1')], 'No Target(s), cannot set depends_on relationships'), + ], +) +def test_all_dependency_handlers_null(new_dep, old_dep, error: str, caplog): + caplog.set_level(logging.DEBUG) + + dependency_handler(target=new_dep, tail=old_dep) + assert error in caplog.text + + +@pytest.mark.parametrize( + ['new_dep', 'old_dep', 'append_arg', 'expect_append'], + [ + pytest.param(MockJob('job1'), [MockJob('job2'), 
def test_depends_on_none(caplog):
    """A None entry among the targets must raise AttributeError and log the failure message."""
    caplog.set_level(logging.WARNING)

    broken_targets = [MockJob('job1'), None]
    upstream_jobs = [MockJob('job2'), MockJob('job3')]

    with pytest.raises(AttributeError):
        dependency_handler(target=broken_targets, tail=upstream_jobs, append_to_tail=False)

    assert 'Failure to set dependencies between target ' in caplog.text


def test_depends_on_only_last():
    """With only_last=True the target depends solely on the final element of the tail."""
    new_job = MockJob('job1')
    job_chain = [MockJob('job2'), MockJob('job3'), MockJob('job4')]

    dependency_handler(target=new_job, tail=job_chain, only_last=True)

    # only the last job of the original chain is recorded as a dependency
    assert new_job._dependencies == {MockJob('job4')}
    # append_to_tail defaults to True, so the target has joined the chain (3 + 1)
    assert len(job_chain) == 4