From ba523d4b95a98afb50ae1e53f7bd5e2acde56057 Mon Sep 17 00:00:00 2001 From: MattWellie Date: Tue, 20 Jan 2026 17:24:25 +1000 Subject: [PATCH 01/16] fix(utils.py): add dependency handling wrapper --- src/cpg_flow/utils.py | 48 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/src/cpg_flow/utils.py b/src/cpg_flow/utils.py index a82fb23e..3e2aa9c7 100644 --- a/src/cpg_flow/utils.py +++ b/src/cpg_flow/utils.py @@ -21,6 +21,7 @@ import hail as hl from hailtop.batch import ResourceFile +from hailtop.batch.job import Job from cpg_utils import Path, to_path from cpg_utils.config import config_retrieve, get_config @@ -417,3 +418,50 @@ def write_to_gcs_bucket(contents, path: Path): blob.upload_from_string(contents) return bucket_name, blob_name + + +def dependency_handler( + target: Job | list[Job] | None, tail: Job | list[Job] | None, append_or_extend: bool = True +) -> None: + """ + A utility method for handling stage inter-dependencies, when it's possible that either the target job(s) + or dependency job(s) are None/empty list. + + Use case is CPG-Flow or similar tasks, where a job can either run (new output needs to be generated), or doesn't + need to run (results/intermediate files exist from a prior run). This optional process can be in the centre of a + job chain, so the dependency setting, and extension of dependencies is crucial. A common usage pattern is: + ``` + prior_jobs: list[Job] = [] + + j = class.run_process(...) + if j and prior_jobs: + j.depends_on(*prior_jobs) + if j: + prior_jobs.append(j) + ``` + + This scenario would be minimised to: + + utils.dependency_handler(target=j, tail=prior_jobs) + + Args: + target (Job | list[Job] | None): job(s) which require a depends_on relationship with job(s) in tail + tail (Job | list[Job] | None): job(s) for this target job(s) to depend on; may be None + append_or_extend: (bool) if this is True, and tail is a list, the current job(s) will be added to the tail + """ + + # no way to set a relationship between non-jobs + if target is None or tail is None: + return + + # easier if we expect everything to be a list + if isinstance(target, Job): + target = [target] + + tail_list = tail if isinstance(tail, list) else [tail] + + for job in target: + job.depends_on(*tail_list) + + if append_or_extend and isinstance(tail, list): + tail.extend(target) From a670906643946b2e956b5e01cad6595791db0a7d Mon Sep 17 00:00:00 2001 From: MattWellie Date: Wed, 21 Jan 2026 08:59:19 +1000 Subject: [PATCH 02/16] fix(test_utils.py): added dependency test cases --- src/cpg_flow/utils.py | 14 +++-- tests/test_utils.py | 125 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 133 insertions(+), 6 deletions(-) create mode 100644 tests/test_utils.py diff --git a/src/cpg_flow/utils.py b/src/cpg_flow/utils.py index 3e2aa9c7..ba4edba3 100644 --- a/src/cpg_flow/utils.py +++ b/src/cpg_flow/utils.py @@ -9,7 +9,7 @@ import time import traceback import unicodedata -from collections.abc import Iterator +from collections.abc import Iterable, Iterator from functools import lru_cache from itertools import chain, islice from os.path import basename, dirname, join @@ -421,7 +421,7 @@ def write_to_gcs_bucket(contents, path: Path): def dependency_handler( - target: Job | list[Job] | None, tail: Job | list[Job] | None, append_or_extend: bool = True + target: Job | Iterable[Job] | None, tail: Job | Iterable[Job] | None, append_or_extend: bool = True ) -> None: """ A utility method for handling stage inter-dependencies, when it's possible that either the target job(s) @@ -445,8 +445,8 @@ def dependency_handler( utils.dependency_handler(target=j, tail=prior_jobs) Args: - target (Job | list[Job] | None): job(s) which require a depends_on relationship with job(s) in tail - tail (Job | list[Job] | None): job(s) for this target job(s) to depend on; may be None + target (Job | Iterable[Job] | None): job(s) which require a depends_on relationship with job(s) in tail + tail (Job | Iterable[Job] | None): job(s) for this target job(s) to depend on; may be None append_or_extend: (bool) if this is True, and tail is a list, the current job(s) will be added to the tail """ @@ -455,13 +455,15 @@ def dependency_handler( return # easier if we expect everything to be a list - if isinstance(target, Job): + if not isinstance(target, Iterable): target = [target] - tail_list = tail if isinstance(tail, list) else [tail] + tail_list = tail if isinstance(tail, Iterable) else [tail] for job in target: job.depends_on(*tail_list) if append_or_extend and isinstance(tail, list): tail.extend(target) + if append_or_extend and isinstance(tail, set): + tail.update(target) diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 00000000..eb680427 --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,125 @@ +import pytest + +from cpg_flow.utils import dependency_handler + + +class MockJob: + """ + A mock class to simulate hailtop.batch.job.Job + """ + + def __init__(self, name: str): + self.name = name + self._dependencies: set[MockJob] = set() + + def depends_on(self, *jobs): + """ + Simulate depends_on by adding jobs to a set + """ + for job in jobs: + self._dependencies.add(job) + + def __repr__(self): + return f'MockJob({self.name})' + + +def test_dependency_handler_none(): + """ + Test that None inputs are handled gracefully + """ + # Should not raise exception + dependency_handler(None, None) + + job = MockJob('job1') + dependency_handler(job, None) + assert len(job._dependencies) == 0 + + dependency_handler(None, job) + + +def test_dependency_handler_single_single(): + """ + Test 1:1 dependency + """ + job1 = MockJob('source') + job2 = MockJob('dependency') + + dependency_handler(job1, job2) + assert job2 in job1._dependencies + + +def test_dependency_handler_list_single(): + """ + Test Many:1 dependency + """ + job1 = MockJob('source1') + job2 = MockJob('source2') + dep = MockJob('dependency') + + dependency_handler([job1, job2], dep) + assert dep in job1._dependencies + assert dep in job2._dependencies + + +def test_dependency_handler_single_list(): + """ + Test 1:Many dependency + """ + job = MockJob('source') + dep1 = MockJob('dependency1') + dep2 = MockJob('dependency2') + + dependency_handler(job, [dep1, dep2]) + assert dep1 in job._dependencies + assert dep2 in job._dependencies + + +def test_dependency_handler_append_list(): + """ + Test appending to list + """ + job = MockJob('new_job') + dep1 = MockJob('existing_job') + prior_jobs = [dep1] + + dependency_handler(job, prior_jobs, append_or_extend=True) + + # Check dependency set + assert dep1 in job._dependencies + # Check list extension + assert job in prior_jobs + assert len(prior_jobs) == 2 + + +def test_dependency_handler_no_append_list(): + """ + Test NOT appending to list + """ + job = MockJob('new_job') + dep1 = MockJob('existing_job') + prior_jobs = [dep1] + + dependency_handler(job, prior_jobs, append_or_extend=False) + + # Check dependency set + assert dep1 in job._dependencies + # Check list NOT extended + assert job not in prior_jobs + assert len(prior_jobs) == 1 + + +def test_dependency_handler_update_set(): + """ + Test updating a set + """ + job = MockJob('new_job') + dep1 = MockJob('existing_job') + prior_jobs = {dep1} + + dependency_handler(job, prior_jobs, append_or_extend=True) + + # Check dependency set + assert dep1 in job._dependencies + # Check set update + assert job in prior_jobs + assert len(prior_jobs) == 2 From c98ac05937a8a709d0f83d7afeb9ae83ff208a25 Mon Sep 17 00:00:00 2001 From: MattWellie Date: Wed, 21 Jan 2026 10:53:38 +1000 Subject: [PATCH 03/16] fix(test_utils.py): pytest.parametrize --- src/cpg_flow/utils.py | 12 +- tests/test_utils.py | 260 ++++++++++++++++++++++++++---------------- 2 files changed, 167 insertions(+), 105 deletions(-) diff --git a/src/cpg_flow/utils.py b/src/cpg_flow/utils.py index ba4edba3..6d44530b 100644 --- a/src/cpg_flow/utils.py +++ b/src/cpg_flow/utils.py @@ -452,6 +452,7 @@ def dependency_handler( # no way to set a relationship between non-jobs if target is None or tail is None: + logger.debug('No target or tail provided') return # easier if we expect everything to be a list @@ -463,7 +464,10 @@ def dependency_handler( for job in target: job.depends_on(*tail_list) - if append_or_extend and isinstance(tail, list): - tail.extend(target) - if append_or_extend and isinstance(tail, set): - tail.update(target) + if append_or_extend: + if isinstance(tail, list): + tail.extend(target) + if isinstance(tail, set): + tail.update(target) + else: + logger.warning(f'Append requested, but tail is not an iterable: {tail}') diff --git a/tests/test_utils.py b/tests/test_utils.py index eb680427..912ecbab 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,3 +1,8 @@ +import logging +from collections.abc import Iterable +from copy import deepcopy +from itertools import product + import pytest from cpg_flow.utils import dependency_handler @@ -22,104 +27,157 @@ def depends_on(self, *jobs): def __repr__(self): return f'MockJob({self.name})' - -def test_dependency_handler_none(): - """ - Test that None inputs are handled gracefully - """ - # Should not raise exception - dependency_handler(None, None) - - job = MockJob('job1') - dependency_handler(job, None) - assert len(job._dependencies) == 0 - - dependency_handler(None, job) - - -def test_dependency_handler_single_single(): - """ - Test 1:1 dependency - """ - job1 = MockJob('source') - job2 = MockJob('dependency') - - dependency_handler(job1, job2) - assert job2 in job1._dependencies - - -def test_dependency_handler_list_single(): - """ - Test Many:1 dependency - """ - job1 = MockJob('source1') - job2 = MockJob('source2') - dep = MockJob('dependency') - - dependency_handler([job1, job2], dep) - assert dep in job1._dependencies - assert dep in job2._dependencies - - -def test_dependency_handler_single_list(): - """ - Test 1:Many dependency - """ - job = MockJob('source') - dep1 = MockJob('dependency1') - dep2 = MockJob('dependency2') - - dependency_handler(job, [dep1, dep2]) - assert dep1 in job._dependencies - assert dep2 in job._dependencies - - -def test_dependency_handler_append_list(): - """ - Test appending to list - """ - job = MockJob('new_job') - dep1 = MockJob('existing_job') - prior_jobs = [dep1] - - dependency_handler(job, prior_jobs, append_or_extend=True) - - # Check dependency set - assert dep1 in job._dependencies - # Check list extension - assert job in prior_jobs - assert len(prior_jobs) == 2 - - -def test_dependency_handler_no_append_list(): - """ - Test NOT appending to list - """ - job = MockJob('new_job') - dep1 = MockJob('existing_job') - prior_jobs = [dep1] - - dependency_handler(job, prior_jobs, append_or_extend=False) - - # Check dependency set - assert dep1 in job._dependencies - # Check list NOT extended - assert job not in prior_jobs - assert len(prior_jobs) == 1 - - -def test_dependency_handler_update_set(): - """ - Test updating a set - """ - job = MockJob('new_job') - dep1 = MockJob('existing_job') - prior_jobs = {dep1} - - dependency_handler(job, prior_jobs, append_or_extend=True) - - # Check dependency set - assert dep1 in job._dependencies - # Check set update - assert job in prior_jobs - assert len(prior_jobs) == 2 + def __eq__(self, other) -> bool: + return self.name == other.name + + def __hash__(self) -> int: + return int(self.name[-1]) + + +@pytest.mark.parametrize( + ['new_dep', 'old_dep', 'append_arg', 'expect_dep', 'expect_append'], + [ + pytest.param(None, None, True, False, False), # null case + pytest.param(MockJob('job1'), None, True, False, False), # null case + pytest.param(None, MockJob('job1'), True, False, False), # null case + pytest.param([MockJob('job1')], None, True, False, False), # null case + pytest.param(None, [MockJob('job1')], True, False, False), # null case + ], +) +def test_all_dependency_handlers_null(new_dep, old_dep, append_arg, expect_dep: bool, expect_append: bool, caplog): + caplog.set_level(logging.DEBUG) + + dependency_handler(target=new_dep, tail=old_dep, append_or_extend=append_arg) + assert 'No target or tail provided' in caplog.text + + +@pytest.mark.parametrize( + ['new_dep', 'old_dep', 'append_arg', 'expect_append'], + [ + pytest.param(MockJob('job1'), [MockJob('job2'), MockJob('job3')], True, True), # set and append + pytest.param([MockJob('job1'), MockJob('job2')], [MockJob('job3')], True, True), # set and append + pytest.param([MockJob('job1'), MockJob('job2')], MockJob('job3'), False, False), # set and don't append + pytest.param([MockJob('job1'), MockJob('job2')], MockJob('job3'), True, True), # set and fail to append + ], +) +def test_all_dependency_handlers_real(new_dep, old_dep, append_arg, expect_append: bool, caplog): + og_tail_list = deepcopy(old_dep if isinstance(old_dep, Iterable) else [old_dep]) + + dependency_handler(target=new_dep, tail=old_dep, append_or_extend=append_arg) + + new_dep_list = deepcopy(new_dep if isinstance(new_dep, Iterable) else [new_dep]) + new_tail_list = deepcopy(old_dep if isinstance(old_dep, Iterable) else [old_dep]) + + # dependency setting, we expect all the original tail list to be in the current target dependencies + for each_new, each_old in product(new_dep_list, og_tail_list): + assert each_old in each_new._dependencies + + # appending, we expect all the original targets to be in the new tail + if expect_append: + for each_new in new_dep_list: + try: + assert each_new in new_tail_list + except AssertionError: + assert 'Append requested, but tail is not an iterable:' in caplog.text + + +# def test_dependency_handler_none(): +# """ +# Test that None inputs are handled gracefully +# """ +# # Should not raise exception +# dependency_handler(None, None) +# +# job = MockJob('job1') +# dependency_handler(job, None) +# assert len(job._dependencies) == 0 +# +# dependency_handler(None, job) +# +# +# def test_dependency_handler_single_single(): +# """ +# Test 1:1 dependency +# """ +# job1 = MockJob('source') +# job2 = MockJob('dependency') +# +# dependency_handler(job1, job2) +# assert job2 in job1._dependencies +# +# +# def test_dependency_handler_list_single(): +# """ +# Test Many:1 dependency +# """ +# job1 = MockJob('source1') +# job2 = MockJob('source2') +# dep = MockJob('dependency') +# +# dependency_handler([job1, job2], dep) +# assert dep in job1._dependencies +# assert dep in job2._dependencies +# +# +# def test_dependency_handler_single_list(): +# """ +# Test 1:Many dependency +# """ +# job = MockJob('source') +# dep1 = MockJob('dependency1') +# dep2 = MockJob('dependency2') +# +# dependency_handler(job, [dep1, dep2]) +# assert dep1 in job._dependencies +# assert dep2 in job._dependencies +# +# +# def test_dependency_handler_append_list(): +# """ +# Test appending to list +# """ +# job = MockJob('new_job') +# dep1 = MockJob('existing_job') +# prior_jobs = [dep1] +# +# dependency_handler(job, prior_jobs, append_or_extend=True) +# +# # Check dependency set +# assert dep1 in job._dependencies +# # Check list extension +# assert job in prior_jobs +# assert len(prior_jobs) == 2 +# +# +# def test_dependency_handler_no_append_list(): +# """ +# Test NOT appending to list +# """ +# job = MockJob('new_job') +# dep1 = MockJob('existing_job') +# prior_jobs = [dep1] +# +# dependency_handler(job, prior_jobs, append_or_extend=False) +# +# # Check dependency set +# assert dep1 in job._dependencies +# # Check list NOT extended +# assert job not in prior_jobs +# assert len(prior_jobs) == 1 +# +# +# def test_dependency_handler_update_set(): +# """ +# Test updating a set +# """ +# job = MockJob('new_job') +# dep1 = MockJob('existing_job') +# prior_jobs = {dep1} +# +# dependency_handler(job, prior_jobs, append_or_extend=True) +# +# # Check dependency set +# assert dep1 in job._dependencies +# # Check set update +# assert job in prior_jobs +# assert len(prior_jobs) == 2 From 0c9e4741421983b3c8f048f81fa2871d56245a4b Mon Sep 17 00:00:00 2001 From: MattWellie Date: Wed, 21 Jan 2026 10:54:23 +1000 Subject: [PATCH 04/16] fix(test_utils.py): less deepcopy --- tests/test_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_utils.py b/tests/test_utils.py index 912ecbab..bed04282 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -65,8 +65,8 @@ def test_all_dependency_handlers_real(new_dep, old_dep, append_arg, expect_appen dependency_handler(target=new_dep, tail=old_dep, append_or_extend=append_arg) - new_dep_list = deepcopy(new_dep if isinstance(new_dep, Iterable) else [new_dep]) - new_tail_list = deepcopy(old_dep if isinstance(old_dep, Iterable) else [old_dep]) + new_dep_list = new_dep if isinstance(new_dep, Iterable) else [new_dep] + new_tail_list = old_dep if isinstance(old_dep, Iterable) else [old_dep] # dependency setting, we expect all the original tail list to be in the current target dependencies for each_new, each_old in product(new_dep_list, og_tail_list): From ebc3fc9bbdd51b27358f688c26f299ad5ba68f7a Mon Sep 17 00:00:00 2001 From: MattWellie Date: Wed, 21 Jan 2026 10:55:13 +1000 Subject: [PATCH 05/16] fix(test_utils.py): extra test case --- tests/test_utils.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/test_utils.py b/tests/test_utils.py index bed04282..152c166f 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -79,6 +79,9 @@ def test_all_dependency_handlers_real(new_dep, old_dep, append_arg, expect_appen assert each_new in new_tail_list except AssertionError: assert 'Append requested, but tail is not an iterable:' in caplog.text + else: + for each_new in new_dep_list: + assert each_new not in new_tail_list # def test_dependency_handler_none(): From 628acaee3fbd51e013f6af3441d55c0f76221044 Mon Sep 17 00:00:00 2001 From: MattWellie Date: Wed, 21 Jan 2026 10:56:47 +1000 Subject: [PATCH 06/16] fix(test_utils.py): remove commented out tests --- tests/test_utils.py | 102 -------------------------------------------- 1 file changed, 102 deletions(-) diff --git a/tests/test_utils.py b/tests/test_utils.py index 152c166f..569d4d9f 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -82,105 +82,3 @@ def test_all_dependency_handlers_real(new_dep, old_dep, append_arg, expect_appen else: for each_new in new_dep_list: assert each_new not in new_tail_list - - -# def test_dependency_handler_none(): -# """ -# Test that None inputs are handled gracefully -# """ -# # Should not raise exception -# dependency_handler(None, None) -# -# job = MockJob('job1') -# dependency_handler(job, None) -# assert len(job._dependencies) == 0 -# -# dependency_handler(None, job) -# -# -# def test_dependency_handler_single_single(): -# """ -# Test 1:1 dependency -# """ -# job1 = MockJob('source') -# job2 = MockJob('dependency') -# -# dependency_handler(job1, job2) -# assert job2 in job1._dependencies -# -# -# def test_dependency_handler_list_single(): -# """ -# Test Many:1 dependency -# """ -# job1 = MockJob('source1') -# job2 = MockJob('source2') -# dep = MockJob('dependency') -# -# dependency_handler([job1, job2], dep) -# assert dep in job1._dependencies -# assert dep in job2._dependencies -# -# -# def test_dependency_handler_single_list(): -# """ -# Test 1:Many dependency -# """ -# job = MockJob('source') -# dep1 = MockJob('dependency1') -# dep2 = MockJob('dependency2') -# -# dependency_handler(job, [dep1, dep2]) -# assert dep1 in job._dependencies -# assert dep2 in job._dependencies -# -# -# def test_dependency_handler_append_list(): -# """ -# Test appending to list -# """ -# job = MockJob('new_job') -# dep1 = MockJob('existing_job') -# prior_jobs = [dep1] -# -# dependency_handler(job, prior_jobs, append_or_extend=True) -# -# # Check dependency set -# assert dep1 in job._dependencies -# # Check list extension -# assert job in prior_jobs -# assert len(prior_jobs) == 2 -# -# -# def test_dependency_handler_no_append_list(): -# """ -# Test NOT appending to list -# """ -# job = MockJob('new_job') -# dep1 = MockJob('existing_job') -# prior_jobs = [dep1] -# -# dependency_handler(job, prior_jobs, append_or_extend=False) -# -# # Check dependency set -# assert dep1 in job._dependencies -# # Check list NOT extended -# assert job not in prior_jobs -# assert len(prior_jobs) == 1 -# -# -# def test_dependency_handler_update_set(): -# """ -# Test updating a set -# """ -# job = MockJob('new_job') -# dep1 = MockJob('existing_job') -# prior_jobs = {dep1} -# -# dependency_handler(job, prior_jobs, append_or_extend=True) -# -# # Check dependency set -# assert dep1 in job._dependencies -# # Check set update -# assert job in prior_jobs -# assert len(prior_jobs) == 2 From 324d6a653eb0fa162809a8b02fcfb0993e2f5487 Mon Sep 17 00:00:00 2001 From: MattWellie Date: Wed, 21 Jan 2026 11:00:34 +1000 Subject: [PATCH 07/16] fix(stage.py): add implementation --- src/cpg_flow/stage.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/cpg_flow/stage.py b/src/cpg_flow/stage.py index f28aba27..60ed445f 100644 --- a/src/cpg_flow/stage.py +++ b/src/cpg_flow/stage.py @@ -27,7 +27,7 @@ from hailtop.batch.job import Job from cpg_flow.targets import Cohort, Dataset, MultiCohort, SequencingGroup, Target -from cpg_flow.utils import ExpectedResultT, exists +from cpg_flow.utils import ExpectedResultT, dependency_handler, exists from cpg_flow.workflow import Action, WorkflowError, get_workflow, path_walk from cpg_utils import Path, to_path from cpg_utils.config import get_config @@ -554,11 +554,8 @@ def _queue_jobs_with_checks( outputs.stage = self outputs.meta |= self.get_job_attrs(target) - for output_job in outputs.jobs: - if output_job: - for input_job in inputs.get_jobs(target): - assert input_job, f'Input dependency job for stage: {self}, target: {target}' - output_job.depends_on(input_job) + # make all output jobs dependent on all input jobs, but don't extend the list of prior deps + dependency_handler(target=outputs.jobs, tail=inputs.get_jobs(target), append_or_extend=False) if outputs.error_msg: return outputs From ed3ab866b189e0e9510dfcc7b91e2d7f5e677051 Mon Sep 17 00:00:00 2001 From: MattWellie Date: Wed, 21 Jan 2026 11:09:03 +1000 Subject: [PATCH 08/16] fix(test_utils.py): add error handling --- src/cpg_flow/utils.py | 24 ++++++++++++++---------- tests/test_utils.py | 10 ++++++++++ 2 files changed, 24 insertions(+), 10 deletions(-) diff --git a/src/cpg_flow/utils.py b/src/cpg_flow/utils.py index 6d44530b..9b0c27e4 100644 --- a/src/cpg_flow/utils.py +++ b/src/cpg_flow/utils.py @@ -461,13 +461,17 @@ def dependency_handler( tail_list = tail if isinstance(tail, Iterable) else [tail] - for job in target: - job.depends_on(*tail_list) - - if append_or_extend: - if isinstance(tail, list): - tail.extend(target) - if isinstance(tail, set): - tail.update(target) - else: - logger.warning(f'Append requested, but tail is not an iterable: {tail}') + try: + for job in target: + job.depends_on(*tail_list) + + if append_or_extend: + if isinstance(tail, list): + tail.extend(target) + if isinstance(tail, set): + tail.update(target) + else: + logger.warning(f'Append requested, but tail is not an iterable: {tail}') + except AttributeError as ae: + logger.error(f'Failure to set dependencies between target {target} and tail {tail}') + raise ae diff --git a/tests/test_utils.py b/tests/test_utils.py index 569d4d9f..34d6def0 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -82,3 +82,13 @@ def test_all_dependency_handlers_real(new_dep, old_dep, append_arg, expect_appen else: for each_new in new_dep_list: assert each_new not in new_tail_list + + +def test_depends_on_none(caplog): + caplog.set_level(logging.WARNING) + with pytest.raises(AttributeError): + dependency_handler( + target=[MockJob('job1'), None], tail=[MockJob('job2'), MockJob('job3')], append_or_extend=False + ) + + assert 'Failure to set dependencies between target ' in caplog.text From e8b43b505151468464c67f43387b32a1799c4140 Mon Sep 17 00:00:00 2001 From: MattWellie Date: Wed, 21 Jan 2026 11:40:01 +1000 Subject: [PATCH 09/16] fix(utils.py): strange logging error --- src/cpg_flow/utils.py | 23 +++++++++++++++++------ tests/test_utils.py | 18 +++++++++--------- 2 files changed, 26 insertions(+), 15 deletions(-) diff --git a/src/cpg_flow/utils.py b/src/cpg_flow/utils.py index 9b0c27e4..7080c6de 100644 --- a/src/cpg_flow/utils.py +++ b/src/cpg_flow/utils.py @@ -450,20 +450,31 @@ def dependency_handler( append_or_extend: (bool) if this is True, and tail is a list, the current job(s) will be added to the tail """ - # no way to set a relationship between non-jobs - if target is None or tail is None: - logger.debug('No target or tail provided') + # no way to set a relationship between non-jobs - values can be none, or empty lists + # if the tail is an empty list (no truthiness), we may still want to append to it + + # if the target is null/empty, nothing to do + if not target: + logger.debug('No Target(s), cannot set depends_on relationships') + return + + # if tail is None we can't set dependencies or append, nothing to do + if tail is None: + logger.debug('No Tail, cannot set depends_on relationships or append') return # easier if we expect everything to be a list if not isinstance(target, Iterable): target = [target] + target = target if isinstance(target, Iterable) else [target] tail_list = tail if isinstance(tail, Iterable) else [tail] try: - for job in target: - job.depends_on(*tail_list) + # don't try and call depends_on(*x) with an empty iterable + if tail_list: + for job in target: + job.depends_on(*tail_list) if append_or_extend: if isinstance(tail, list): @@ -471,7 +482,7 @@ def dependency_handler( if isinstance(tail, set): tail.update(target) else: - logger.warning(f'Append requested, but tail is not an iterable: {tail}') + logger.warning(f'Append requested, but tail is not an iterable: {tail}({type(tail)})') except AttributeError as ae: logger.error(f'Failure to set dependencies between target {target} and tail {tail}') raise ae diff --git a/tests/test_utils.py b/tests/test_utils.py index 34d6def0..3c3501a8 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -35,20 +35,19 @@ def __hash__(self) -> int: @pytest.mark.parametrize( - ['new_dep', 'old_dep', 'append_arg', 'expect_dep', 'expect_append'], + ['new_dep', 'old_dep', 'error'], [ - pytest.param(None, None, True, False, False), # null case - pytest.param(MockJob('job1'), None, True, False, False), # null case - pytest.param(None, MockJob('job1'), True, False, False), # null case - pytest.param([MockJob('job1')], None, True, False, False), # null case - pytest.param(None, [MockJob('job1')], True, False, False), # null case + pytest.param(None, None, 'No Target(s), cannot set depends_on relationships'), + pytest.param(MockJob('job1'), None, 'No Tail, cannot set depends_on relationships or append'), + pytest.param(None, MockJob('job1'), 'No Target(s), cannot set depends_on relationships'), + pytest.param(None, [MockJob('job1')], 'No Target(s), cannot set depends_on relationships'), ], ) -def test_all_dependency_handlers_null(new_dep, old_dep, append_arg, expect_dep: bool, expect_append: bool, caplog): +def test_all_dependency_handlers_null(new_dep, old_dep, error: str, caplog): caplog.set_level(logging.DEBUG) - dependency_handler(target=new_dep, tail=old_dep, append_or_extend=append_arg) - assert 'No target or tail provided' in caplog.text + dependency_handler(target=new_dep, tail=old_dep) + assert error in caplog.text @pytest.mark.parametrize( @@ -58,6 +57,7 @@ def test_all_dependency_handlers_null(new_dep, old_dep, append_arg, expect_dep: pytest.param([MockJob('job1'), MockJob('job2')], [MockJob('job3')], True, True), # set and append pytest.param([MockJob('job1'), MockJob('job2')], MockJob('job3'), False, False), # set and don't append pytest.param([MockJob('job1'), MockJob('job2')], MockJob('job3'), True, True), # set and fail to append + pytest.param([MockJob('job1')], [], True, True), # set and fail to append ], ) def test_all_dependency_handlers_real(new_dep, old_dep, append_arg, expect_append: bool, caplog): From a4becea935d45ceed24c6fcbeeb8519822b17ceb Mon Sep 17 00:00:00 2001 From: MattWellie Date: Wed, 21 Jan 2026 14:50:06 +1000 Subject: [PATCH 10/16] fix(pyproject.toml): python version --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index 57a83c1a..47f88573 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,6 +44,7 @@ classifiers = [ # Specify the Python versions you support here. "Programming Language :: Python :: 3", "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", ] keywords = ["hail", "flow", "api", "bioinformatics", "genomics"] From 51ff49493149b8b13592c3fcf58190494fef0c38 Mon Sep 17 00:00:00 2001 From: MattWellie Date: Wed, 21 Jan 2026 15:02:07 +1000 Subject: [PATCH 11/16] fix(utils.py): remove duplicated operation --- src/cpg_flow/utils.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/cpg_flow/utils.py b/src/cpg_flow/utils.py index 7080c6de..2e818a0e 100644 --- a/src/cpg_flow/utils.py +++ b/src/cpg_flow/utils.py @@ -463,10 +463,7 @@ def dependency_handler( logger.debug('No Tail, cannot set depends_on relationships or append') return - # easier if we expect everything to be a list - if not isinstance(target, Iterable): - target = [target] - + # easier if we expect to operate on iterables target = target if isinstance(target, Iterable) else [target] tail_list = tail if isinstance(tail, Iterable) else [tail] From 2694aff52b080855dc55d0000c25297cc133e86f Mon Sep 17 00:00:00 2001 From: MattWellie Date: Wed, 21 Jan 2026 15:32:53 +1000 Subject: [PATCH 12/16] fix(test_utils.py): add a depends-on-last --- src/cpg_flow/utils.py | 15 +++++++++++---- tests/test_utils.py | 10 ++++++++++ 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/src/cpg_flow/utils.py b/src/cpg_flow/utils.py index 2e818a0e..43faf1da 100644 --- a/src/cpg_flow/utils.py +++ b/src/cpg_flow/utils.py @@ -421,7 +421,10 @@ def write_to_gcs_bucket(contents, path: Path): def dependency_handler( - target: Job | Iterable[Job] | None, tail: Job | Iterable[Job] | None, append_or_extend: bool = True + target: Job | Iterable[Job] | None, + tail: Job | Iterable[Job] | None, + append_or_extend: bool = True, + only_last: bool = False, ) -> None: """ A utility method for handling stage inter-dependencies, when it's possible that either the target job(s) @@ -447,7 +450,8 @@ def dependency_handler( Args: target (Job | Iterable[Job] | None): job(s) which require a depends_on relationship with job(s) in tail tail (Job | Iterable[Job] | None): job(s) for this target job(s) to depend on; may be None - append_or_extend: (bool) if this is True, and tail is a list, the current job(s) will be added to the tail + append_or_extend (bool): if this is True, and tail is a list, the current job(s) will be added to the tail + only_last (bool): if this is True, we only set dependencies against the last element in tail, else all of tail """ # no way to set a relationship between non-jobs - values can be none, or empty lists @@ -465,13 +469,16 @@ def dependency_handler( # easier if we expect to operate on iterables target = target if isinstance(target, Iterable) else [target] - tail_list = tail if isinstance(tail, Iterable) else [tail] + tail_list = list(tail) if isinstance(tail, Iterable) else [tail] try: # don't try and call depends_on(*x) with an empty iterable if tail_list: for job in target: - job.depends_on(*tail_list) + if only_last: + job.depends_on(tail_list[-1]) + else: + job.depends_on(*tail_list) if append_or_extend: if isinstance(tail, list): diff --git a/tests/test_utils.py b/tests/test_utils.py index 3c3501a8..5fef3ae1 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -92,3 +92,13 @@ def test_depends_on_none(caplog): ) assert 'Failure to set dependencies between target ' in caplog.text + + +def test_depends_on_only_last(): + # new behaviour - what if we only want to dependency set on the last in a job series + target = MockJob('job1') + tail = [MockJob('job2'), MockJob('job3'), MockJob('job4')] + dependency_handler(target=target, tail=tail, only_last=True) + + assert target._dependencies == {MockJob('job4')} + assert len(tail) == 4 From 2fb89b77b7ba215b568b427d24fc8b70907055c3 Mon Sep 17 00:00:00 2001 From: MattWellie Date: Thu, 22 Jan 2026 14:15:36 +1000 Subject: [PATCH 13/16] fix(test_utils.py): split logic to please sonar --- src/cpg_flow/utils.py | 34 ++++++++++++++++++++++++++-------- tests/test_utils.py | 41 ++++++++++++++++++++++++++++++++++++++++- 2 files changed, 66 insertions(+), 9 deletions(-) diff --git a/src/cpg_flow/utils.py b/src/cpg_flow/utils.py index 43faf1da..8ae6121b 100644 --- a/src/cpg_flow/utils.py +++ b/src/cpg_flow/utils.py @@ -479,14 +479,32 @@ def dependency_handler( job.depends_on(tail_list[-1]) else: job.depends_on(*tail_list) - - if append_or_extend: - if isinstance(tail, list): - tail.extend(target) - if isinstance(tail, set): - tail.update(target) - else: - logger.warning(f'Append requested, but tail is not an iterable: {tail}({type(tail)})') + append_or_extend_jobs(target, tail, append_or_extend=append_or_extend) except AttributeError as ae: logger.error(f'Failure to set dependencies between target {target} and tail {tail}') raise ae + + +def append_or_extend_jobs(target: Iterable[Job], tail: Job | Iterable[Job], append_or_extend: bool) -> None: + """ + Separates the append logic from the depends logic. + + Args: + target (list[Job): a list of Job objects + tail: a Job or iterable of Jobs. Only permit extension if this is an iterable + append_or_extend (bool): whether to attempt addition of target job(s) to tail + """ + + # if we don't want to extend the tail, this method does nothing + if not append_or_extend: + return + + # if the tail is an identifiable iterable, select the appropriate extension method + if isinstance(tail, list): + tail.extend(target) + if isinstance(tail, set): + tail.update(target) + + # if it's not iterable, log a message but don't fail + else: + logger.warning(f'Append requested, but tail is not an iterable: {tail}({type(tail)})') diff --git a/tests/test_utils.py b/tests/test_utils.py index 5fef3ae1..bda0ba0a 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -5,7 +5,7 @@ import pytest -from cpg_flow.utils import dependency_handler +from cpg_flow.utils import append_or_extend_jobs, dependency_handler class MockJob: @@ -102,3 +102,42 @@ def test_depends_on_only_last(): assert target._dependencies == {MockJob('job4')} assert len(tail) == 4 + + +def test_append_extend_no_empty(): + target = [MockJob('job1')] + tail = [] + append_or_extend_jobs(target, tail, False) + assert len(tail) == 0 + + +def test_append_extend_yes_empty(): + target = [MockJob('job1')] + tail = [] + append_or_extend_jobs(target, tail, True) + assert len(tail) == 1 + assert tail == target + + +def test_append_extend_yes_full(): + target = [MockJob('job1')] + tail = [MockJob('job2')] + append_or_extend_jobs(target, tail, True) + assert len(tail) == 2 + assert tail[-1] == target[0] + + +def test_append_extend_yes_set(): + target = [MockJob('job1')] + tail = {MockJob('job2')} + append_or_extend_jobs(target, tail, True) + assert len(tail) == 2 + assert target[0] in tail + + +def test_append_extend_no_logging(caplog): + target = [MockJob('job1')] + tail = MockJob('job2') + append_or_extend_jobs(target, tail, True) + assert tail == MockJob('job2') + assert 'Append requested, but tail is not an iterable:' in caplog.text From fd50d09840f25ed85debf7d64cad5ef1e36c3e9b Mon Sep 17 00:00:00 2001 From: MattWellie Date: Thu, 29 Jan 2026 09:18:19 +1000 Subject: [PATCH 14/16] fix(test_utils.py): changes following review --- src/cpg_flow/stage.py | 2 +- src/cpg_flow/utils.py | 64 +++++++++++++++---------------------------- tests/test_utils.py | 50 +++------------------------------ 3 files changed, 27 insertions(+), 89 deletions(-) diff --git a/src/cpg_flow/stage.py b/src/cpg_flow/stage.py index 60ed445f..4ec6e4a5 100644 --- a/src/cpg_flow/stage.py +++ b/src/cpg_flow/stage.py @@ -555,7 +555,7 @@ def _queue_jobs_with_checks( outputs.meta |= self.get_job_attrs(target) # make all output jobs dependent on all input jobs, but don't extend the list of prior deps - dependency_handler(target=outputs.jobs, tail=inputs.get_jobs(target), append_or_extend=False) + dependency_handler(target=outputs.jobs, tail=inputs.get_jobs(target), append_to_tail=False) if outputs.error_msg: return outputs diff --git a/src/cpg_flow/utils.py b/src/cpg_flow/utils.py index 8ae6121b..36226659 100644 --- a/src/cpg_flow/utils.py +++ b/src/cpg_flow/utils.py @@ -421,9 +421,9 @@ def write_to_gcs_bucket(contents, path: Path): def dependency_handler( - target: Job | Iterable[Job] | None, - tail: Job | Iterable[Job] | None, - append_or_extend: bool = True, + target: Job | list[Job] | None, + tail: Job | list[Job] | None, + append_to_tail: bool = True, only_last: bool = False, ) -> None: """ @@ -448,15 +448,12 @@ def dependency_handler( utils.dependency_handler(target=j, tail=prior_jobs) Args: - target (Job | Iterable[Job] | None): job(s) which require a depends_on relationship with job(s) in tail - tail (Job | Iterable[Job] | None): job(s) for this target job(s) to depend on; may be None - append_or_extend (bool): if this is True, and tail is a list, the current job(s) will be added to the tail + target (Job | list[Job] | None): job(s) which require a depends_on relationship with job(s) in tail + tail (Job | list[Job] | None): job(s) for this target job(s) to depend on; may be None + append_to_tail (bool): if this is True, and tail is a list, the current job(s) will be added to the tail only_last (bool): if this is True, we only set dependencies against the last element in tail, else all of tail """ - # no way to set a relationship between non-jobs - values can be none, or empty lists - # if the tail is an empty list (no truthiness), we may still want to append to it - # if the target is null/empty, nothing to do if not target: logger.debug('No Target(s), cannot set depends_on relationships') @@ -467,44 +464,27 @@ def dependency_handler( logger.debug('No Tail, cannot set depends_on relationships or append') return - # easier if we expect to operate on iterables - target = target if isinstance(target, Iterable) else [target] + # downstream operations are easier if we expect to operate on iterables - create new variable name + target_list = ( + target + if isinstance(target, list) + else [ + target, + ] + ) tail_list = list(tail) if isinstance(tail, Iterable) else [tail] + # use only_last switch to choose dependencies to set + deps_to_apply = [tail_list[-1]] if only_last else tail_list + try: # don't try and call depends_on(*x) with an empty iterable if tail_list: - for job in target: - if only_last: - job.depends_on(tail_list[-1]) - else: - job.depends_on(*tail_list) - append_or_extend_jobs(target, tail, append_or_extend=append_or_extend) + for job in target_list: + job.depends_on(*deps_to_apply) + + if append_to_tail and isinstance(tail, list): + tail.extend(target_list) except AttributeError as ae: logger.error(f'Failure to set dependencies between target {target} and tail {tail}') raise ae - - -def append_or_extend_jobs(target: Iterable[Job], tail: Job | Iterable[Job], append_or_extend: bool) -> None: - """ - Separates the append logic from the depends logic. - - Args: - target (list[Job): a list of Job objects - tail: a Job or iterable of Jobs. Only permit extension if this is an iterable - append_or_extend (bool): whether to attempt addition of target job(s) to tail - """ - - # if we don't want to extend the tail, this method does nothing - if not append_or_extend: - return - - # if the tail is an identifiable iterable, select the appropriate extension method - if isinstance(tail, list): - tail.extend(target) - if isinstance(tail, set): - tail.update(target) - - # if it's not iterable, log a message but don't fail - else: - logger.warning(f'Append requested, but tail is not an iterable: {tail}({type(tail)})') diff --git a/tests/test_utils.py b/tests/test_utils.py index bda0ba0a..3e99da4e 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -63,7 +63,7 @@ def test_all_dependency_handlers_null(new_dep, old_dep, error: str, caplog): def test_all_dependency_handlers_real(new_dep, old_dep, append_arg, expect_append: bool, caplog): og_tail_list = deepcopy(old_dep if isinstance(old_dep, Iterable) else [old_dep]) - dependency_handler(target=new_dep, tail=old_dep, append_or_extend=append_arg) + dependency_handler(target=new_dep, tail=old_dep, append_to_tail=append_arg) new_dep_list = new_dep if isinstance(new_dep, Iterable) else [new_dep] new_tail_list = old_dep if isinstance(old_dep, Iterable) else [old_dep] @@ -73,12 +73,9 @@ def test_all_dependency_handlers_real(new_dep, old_dep, append_arg, expect_appen assert each_old in each_new._dependencies # appending, we expect all the original targets to be in the new tail - if expect_append: + if expect_append and isinstance(old_dep, list): for each_new in new_dep_list: - try: - assert each_new in new_tail_list - except AssertionError: - assert 'Append requested, but tail is not an iterable:' in caplog.text + assert each_new in new_tail_list else: for each_new in new_dep_list: assert each_new not in new_tail_list @@ -88,7 +85,7 @@ def test_depends_on_none(caplog): caplog.set_level(logging.WARNING) with pytest.raises(AttributeError): dependency_handler( - target=[MockJob('job1'), None], tail=[MockJob('job2'), MockJob('job3')], append_or_extend=False + target=[MockJob('job1'), None], tail=[MockJob('job2'), MockJob('job3')], append_to_tail=False ) assert 'Failure to set dependencies between target ' in caplog.text @@ -102,42 +99,3 @@ def test_depends_on_only_last(): assert target._dependencies == {MockJob('job4')} assert len(tail) == 4 - - -def test_append_extend_no_empty(): - target = [MockJob('job1')] - tail = [] - append_or_extend_jobs(target, tail, False) - assert len(tail) == 0 - - -def test_append_extend_yes_empty(): - target = [MockJob('job1')] - tail = [] - append_or_extend_jobs(target, tail, True) - assert len(tail) == 1 - assert tail == target - - -def test_append_extend_yes_full(): - target = [MockJob('job1')] - tail = [MockJob('job2')] - append_or_extend_jobs(target, tail, True) - assert len(tail) == 2 - assert tail[-1] == target[0] - - -def test_append_extend_yes_set(): - target = [MockJob('job1')] - tail = {MockJob('job2')} - append_or_extend_jobs(target, tail, True) - assert len(tail) == 2 - assert target[0] in tail - - -def test_append_extend_no_logging(caplog): - target = [MockJob('job1')] - tail = MockJob('job2') - append_or_extend_jobs(target, tail, True) - assert tail == MockJob('job2') - assert 'Append requested, but tail is not an iterable:' in caplog.text From fd167dd1a18cee860dd28364dc3a482fb2969617 Mon Sep 17 00:00:00 2001 From: MattWellie Date: Thu, 29 Jan 2026 09:20:29 +1000 Subject: [PATCH 15/16] fix(test_utils.py): linting following method deletion --- tests/test_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_utils.py b/tests/test_utils.py index 3e99da4e..dd0f8298 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -5,7 +5,7 @@ import pytest -from cpg_flow.utils import append_or_extend_jobs, dependency_handler +from cpg_flow.utils import dependency_handler class MockJob: From cd87c5f4971ab1b82c55eb555de5bd9060c070f8 Mon Sep 17 00:00:00 2001 From: MattWellie Date: Thu, 29 Jan 2026 09:33:20 +1000 Subject: [PATCH 16/16] fix(test_utils.py): remove Iterable import --- src/cpg_flow/utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cpg_flow/utils.py b/src/cpg_flow/utils.py index 36226659..5386a88c 100644 --- a/src/cpg_flow/utils.py +++ b/src/cpg_flow/utils.py @@ -9,7 +9,7 @@ import time import traceback import unicodedata -from collections.abc import Iterable, Iterator +from collections.abc import Iterator from functools import lru_cache from itertools import chain, islice from os.path import basename, dirname, join @@ -472,7 +472,7 @@ def dependency_handler( target, ] ) - tail_list = list(tail) if isinstance(tail, Iterable) else [tail] + tail_list = tail if isinstance(tail, list) else [tail] # use only_last switch to choose dependencies to set deps_to_apply = [tail_list[-1]] if only_last else tail_list