Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ classifiers = [
# Specify the Python versions you support here.
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
]
keywords = ["hail", "flow", "api", "bioinformatics", "genomics"]

Expand Down
9 changes: 3 additions & 6 deletions src/cpg_flow/stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from hailtop.batch.job import Job

from cpg_flow.targets import Cohort, Dataset, MultiCohort, SequencingGroup, Target
from cpg_flow.utils import ExpectedResultT, exists
from cpg_flow.utils import ExpectedResultT, dependency_handler, exists
from cpg_flow.workflow import Action, WorkflowError, get_workflow, path_walk
from cpg_utils import Path, to_path
from cpg_utils.config import get_config
Expand Down Expand Up @@ -554,11 +554,8 @@ def _queue_jobs_with_checks(
outputs.stage = self
outputs.meta |= self.get_job_attrs(target)

for output_job in outputs.jobs:
if output_job:
for input_job in inputs.get_jobs(target):
assert input_job, f'Input dependency job for stage: {self}, target: {target}'
output_job.depends_on(input_job)
# make all output jobs dependent on all input jobs, but don't extend the list of prior deps
dependency_handler(target=outputs.jobs, tail=inputs.get_jobs(target), append_to_tail=False)

if outputs.error_msg:
return outputs
Expand Down
71 changes: 71 additions & 0 deletions src/cpg_flow/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import hail as hl
from hailtop.batch import ResourceFile
from hailtop.batch.job import Job

from cpg_utils import Path, to_path
from cpg_utils.config import config_retrieve, get_config
Expand Down Expand Up @@ -417,3 +418,73 @@ def write_to_gcs_bucket(contents, path: Path):
blob.upload_from_string(contents)

return bucket_name, blob_name


def dependency_handler(
target: Job | list[Job] | None,
tail: Job | list[Job] | None,
append_to_tail: bool = True,
only_last: bool = False,
) -> None:
"""
A utility method for handling stage inter-dependencies, when it's possible that either the target job(s)
or dependency job(s) are None/empty list.

Use case is CPG-Flow or similar tasks, where a job can either run (new output needs to be generated), or doesn't
need to run (results/intermediate files exist from a prior run). This optional process can be in the centre of a
job chain, so the dependency setting, and extension of dependencies is crucial. A common usage pattern is:
```
prior_jobs: list[Job] = []

j = class.run_process(...)
if j and prior_jobs:
j.depends_on(*prior_jobs)
if j:
prior_jobs.append(j)
```

This scenario would be minimised to:

utils.dependency_handler(target=j, tail=prior_jobs)

Args:
target (Job | list[Job] | None): job(s) which require a depends_on relationship with job(s) in tail
tail (Job | list[Job] | None): job(s) for this target job(s) to depend on; may be None
append_to_tail (bool): if this is True, and tail is a list, the current job(s) will be added to the tail
only_last (bool): if this is True, we only set dependencies against the last element in tail, else all of tail
"""

# if the target is null/empty, nothing to do
if not target:
logger.debug('No Target(s), cannot set depends_on relationships')
return

# if tail is None we can't set dependencies or append, nothing to do
if tail is None:
logger.debug('No Tail, cannot set depends_on relationships or append')
return

# downstream operations are easier if we expect to operate on iterables - create new variable name
target_list = (
target
if isinstance(target, list)
else [
target,
]
)
tail_list = tail if isinstance(tail, list) else [tail]

# use only_last switch to choose dependencies to set
deps_to_apply = [tail_list[-1]] if only_last else tail_list

try:
# don't try and call depends_on(*x) with an empty iterable
if tail_list:
for job in target_list:
job.depends_on(*deps_to_apply)

if append_to_tail and isinstance(tail, list):
tail.extend(target_list)
except AttributeError as ae:
logger.error(f'Failure to set dependencies between target {target} and tail {tail}')
raise ae
101 changes: 101 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import logging
from collections.abc import Iterable
from copy import deepcopy
from itertools import product

import pytest

from cpg_flow.utils import dependency_handler


class MockJob:
"""
A mock class to simulate hailtop.batch.job.Job
"""

def __init__(self, name: str):
self.name = name
self._dependencies: set[MockJob] = set()

def depends_on(self, *jobs):
"""
Simulate depends_on by adding jobs to a set
"""
for job in jobs:
self._dependencies.add(job)

def __repr__(self):
return f'MockJob({self.name})'

def __eq__(self, other) -> bool:
return self.name == other.name

def __hash__(self) -> int:
return int(self.name[-1])


@pytest.mark.parametrize(
['new_dep', 'old_dep', 'error'],
[
pytest.param(None, None, 'No Target(s), cannot set depends_on relationships'),
pytest.param(MockJob('job1'), None, 'No Tail, cannot set depends_on relationships or append'),
pytest.param(None, MockJob('job1'), 'No Target(s), cannot set depends_on relationships'),
pytest.param(None, [MockJob('job1')], 'No Target(s), cannot set depends_on relationships'),
],
)
def test_all_dependency_handlers_null(new_dep, old_dep, error: str, caplog):
caplog.set_level(logging.DEBUG)

dependency_handler(target=new_dep, tail=old_dep)
assert error in caplog.text


@pytest.mark.parametrize(
['new_dep', 'old_dep', 'append_arg', 'expect_append'],
[
pytest.param(MockJob('job1'), [MockJob('job2'), MockJob('job3')], True, True), # set and append
pytest.param([MockJob('job1'), MockJob('job2')], [MockJob('job3')], True, True), # set and append
pytest.param([MockJob('job1'), MockJob('job2')], MockJob('job3'), False, False), # set and don't append
pytest.param([MockJob('job1'), MockJob('job2')], MockJob('job3'), True, True), # set and fail to append
pytest.param([MockJob('job1')], [], True, True), # set and fail to append
],
)
def test_all_dependency_handlers_real(new_dep, old_dep, append_arg, expect_append: bool, caplog):
og_tail_list = deepcopy(old_dep if isinstance(old_dep, Iterable) else [old_dep])

dependency_handler(target=new_dep, tail=old_dep, append_to_tail=append_arg)

new_dep_list = new_dep if isinstance(new_dep, Iterable) else [new_dep]
new_tail_list = old_dep if isinstance(old_dep, Iterable) else [old_dep]

# dependency setting, we expect all the original tail list to be in the current target dependencies
for each_new, each_old in product(new_dep_list, og_tail_list):
assert each_old in each_new._dependencies

# appending, we expect all the original targets to be in the new tail
if expect_append and isinstance(old_dep, list):
for each_new in new_dep_list:
assert each_new in new_tail_list
else:
for each_new in new_dep_list:
assert each_new not in new_tail_list


def test_depends_on_none(caplog):
caplog.set_level(logging.WARNING)
with pytest.raises(AttributeError):
dependency_handler(
target=[MockJob('job1'), None], tail=[MockJob('job2'), MockJob('job3')], append_to_tail=False
)

assert 'Failure to set dependencies between target ' in caplog.text


def test_depends_on_only_last():
# new behaviour - what if we only want to dependency set on the last in a job series
target = MockJob('job1')
tail = [MockJob('job2'), MockJob('job3'), MockJob('job4')]
dependency_handler(target=target, tail=tail, only_last=True)

assert target._dependencies == {MockJob('job4')}
assert len(tail) == 4
Loading