Skip to content

Commit

Permalink
feat: support running workflow instances directly (#35)
Browse files Browse the repository at this point in the history
Enhance the `sghi.etl.commons.run_workflow` function to support running
`sghi.etl.core.WorkflowDefinition` instances directly. The function now
accepts either a ``WorkflowDefinition`` instance or a factory function
that supplies a ``WorkflowDefinition`` instance.
  • Loading branch information
kennedykori authored Dec 28, 2024
1 parent b8fa894 commit 8363850
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 25 deletions.
48 changes: 30 additions & 18 deletions src/sghi/etl/commons/utils/others.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
from logging import Logger
from typing import TYPE_CHECKING, Final, TypeVar

from sghi.utils import ensure_callable
from sghi.etl.core import WorkflowDefinition
from sghi.utils import ensure_predicate, type_fqn

if TYPE_CHECKING:
from collections.abc import Callable

from sghi.etl.core import WorkflowDefinition

# =============================================================================
# TYPES
Expand All @@ -38,7 +38,10 @@
# =============================================================================


def run_workflow(wf: Callable[[], WorkflowDefinition[_RDT, _PDT]]) -> None:
def run_workflow(
wf: Callable[[], WorkflowDefinition[_RDT, _PDT]]
| WorkflowDefinition[_RDT, _PDT],
) -> None:
"""Execute an ETL :class:`Workflow<WorkflowDefinition>`.
.. tip::
Expand All @@ -48,10 +51,11 @@ def run_workflow(wf: Callable[[], WorkflowDefinition[_RDT, _PDT]]) -> None:
:class:`WorkflowDefinition` class that is being executed or about to
be executed.
This function accepts a factory function that supplies an ETL
``WorkflowDefinition`` instance, it then invokes the function to get the
``WorkflowDefinition``/workflow and then executes it. The execution of the
workflow proceeds as follows:
This function accepts an ETL ``WorkflowDefinition`` instance or a factory
function that supplies a ``WorkflowDefinition`` instance. If a factory
function is provided, it is first invoked to get the
``WorkflowDefinition``/workflow before execution of the workflow starts.
The execution of the workflow proceeds as follows:
1. The callable returned by the
:attr:`~sghi.etl.core.WorkflowDefinition.prologue` property is
Expand Down Expand Up @@ -92,22 +96,30 @@ def run_workflow(wf: Callable[[], WorkflowDefinition[_RDT, _PDT]]) -> None:
description.
If an exception is raised during the workflow execution, all the workflow's
components (source, processor, sink) are disposed of followed by the
propagation of the error to the caller. If the supplied value **IS NOT** a
valid callable object, a :exc:`ValueError` is raised.
components (source, processor, sink) are disposed of, the epilogue callable
is invoked, and the error is propagated to the caller.
:param wf: A factory function that supplies the ``WorkflowDefinition``
instance to be executed. This function is only invoked once. The given
value *MUST* be a valid callable object, and it *MUST NOT* have any
required arguments.
:param wf: A ``WorkflowDefinition`` instance or a factory function that
supplies the ``WorkflowDefinition`` instance to be executed. If a
factory function is given, it is only invoked once. The given
value *MUST EITHER* be a ``WorkflowDefinition`` instance or a valid
callable object.
:return: None.
:raise ValueError: If ``wf`` is NOT a callable object.
:raise ValueError: If ``wf`` is NEITHER a ``WorkflowDefinition`` instance
NOR a callable object.
"""
ensure_callable(wf, message="'wf' MUST be a valid callable object.")

wd: WorkflowDefinition = wf()
ensure_predicate(
test=callable(wf) or isinstance(wf, WorkflowDefinition),
exc_factory=ValueError,
message=(
"'wf' MUST be a valid callable object or an "
f"'{type_fqn(WorkflowDefinition)}' instance."
),
)

wd: WorkflowDefinition = wf() if callable(wf) else wf
try:
_LOGGER.info("[%s:%s] Setting up workflow ...", wd.id, wd.name)
wd.prologue()
Expand Down
17 changes: 10 additions & 7 deletions test/sghi/etl/commons_tests/utils_tests/others_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,17 +79,19 @@ def save_strings_to_repo(strings: Iterable[str]) -> None: # pyright: ignore[rep
# =============================================================================


def test_run_workflow_fails_on_non_callable_input() -> None:
def test_run_workflow_fails_on_invalid_input_wf_value() -> None:
""":func:`sghi.etl.commons.utils.run_workflow` should raise a
:exc:`ValueError` when given a non-callable input value.
:exc:`ValueError` when given a non-callable input value or a
value that is not a :class:`WorkflowDefinition` instance.
"""
wf = _create_workflow_factory([])
for non_callable in (None, wf()):
for invalid_val in (None, "", 1, 5.3):
with pytest.raises(ValueError, match="callable object.") as exp_info:
run_workflow(wf=non_callable) # type: ignore[reportArgumentType]
run_workflow(wf=invalid_val) # type: ignore[reportArgumentType]

assert (
exp_info.value.args[0] == "'wf' MUST be a valid callable object."
exp_info.value.args[0]
== "'wf' MUST be a valid callable object or an "
"'sghi.etl.core.WorkflowDefinition' instance."
)


Expand Down Expand Up @@ -268,7 +270,8 @@ def clean_up() -> None:
wf2 = _create_workflow_factory(repository2, 10, 60, 10, set_up, clean_up)

run_workflow(wf1)
run_workflow(wf2)
# noinspection PyTypeChecker
run_workflow(wf2())

assert repository1 == ["100", "101", "102", "103", "104"]
assert repository2 == ["110", "120", "130", "140", "150"]
Expand Down

0 comments on commit 8363850

Please sign in to comment.