diff --git a/docs/conf.py b/docs/conf.py index a50a7a0..7bf37e6 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -56,6 +56,8 @@ nitpicky = True nitpick_ignore = [ + ("py:attr", "sghi.etl.core.WorkflowDefinition.epilogue"), # docs aren't published yet + ("py:attr", "sghi.etl.core.WorkflowDefinition.prologue"), # docs aren't published yet ("py:attr", "sghi.etl.core.WorkflowDefinition.processor_factory"), # docs aren't published yet ("py:attr", "sghi.etl.core.WorkflowDefinition.sink_factory"), # docs aren't published yet ("py:attr", "sghi.etl.core.WorkflowDefinition.source_factory"), # docs aren't published yet diff --git a/src/sghi/etl/commons/utils/others.py b/src/sghi/etl/commons/utils/others.py index 2241f98..1b00540 100644 --- a/src/sghi/etl/commons/utils/others.py +++ b/src/sghi/etl/commons/utils/others.py @@ -53,26 +53,37 @@ def run_workflow(wf: Callable[[], WorkflowDefinition[_RDT, _PDT]]) -> None: ``WorkflowDefinition``/workflow and then executes it. The execution of the workflow proceeds as follows: - - The callable returned by the - :attr:`~sghi.etl.core.WorkflowDefinition.source_factory` property of - the supplied ``WorkflowDefinition`` is used to get the - :class:`~sghi.etl.core.Source` associated with the workflow. The - :meth:`~sghi.etl.core.Source.draw` method of this ``Source`` is then - invoked to get the raw data to process. - - The callable returned by the - :attr:`~sghi.etl.core.WorkflowDefinition.processor_factory` property - of the supplied ``WorkflowDefinition`` is invoked to get the - :class:`~sghi.etl.core.Processor` associated with the workflow. This - ``Processor`` is then applied to the raw data retrieved from the - ``Source`` in the previous step to obtain processed data. - - The callable returned by the - :attr:`~sghi.etl.core.WorkflowDefinition.sink_factory` property of - the supplied ``WorkflowDefinition`` is invoked to get the - :class:`~sghi.etl.core.Sink` associated with the workflow. The - processed data from the previous step is drained into this ``Sink``. - - The ``Source``, ``Processor`` and ``Sink`` created in the previous - steps are disposed of. Note that this disposal also happens if an - error occurs during the workflow execution. + 1. The callable returned by the + :attr:`~sghi.etl.core.WorkflowDefinition.prologue` property is + invoked first. If an error occurs while executing the callable, all + the rest of the steps, except the last, are skipped. + 2. The callable returned by the + :attr:`~sghi.etl.core.WorkflowDefinition.source_factory` property of + the supplied ``WorkflowDefinition`` is used to get the + :class:`~sghi.etl.core.Source` associated with the workflow. The + :meth:`~sghi.etl.core.Source.draw` method of this ``Source`` is then + invoked to get the raw data to process. If an error occurs while + drawing data for the ``Source``, execution jumps to step 5. + 3. The callable returned by the + :attr:`~sghi.etl.core.WorkflowDefinition.processor_factory` property + of the supplied ``WorkflowDefinition`` is invoked to get the + :class:`~sghi.etl.core.Processor` associated with the workflow. This + ``Processor`` is then applied to the raw data retrieved from the + ``Source`` in the previous step to obtain processed data. If an + error occurs while processing the raw data, execution jumps to + step 5. + 4. The callable returned by the + :attr:`~sghi.etl.core.WorkflowDefinition.sink_factory` property of + the supplied ``WorkflowDefinition`` is invoked to get the + :class:`~sghi.etl.core.Sink` associated with the workflow. The + processed data from the previous step is drained into this ``Sink``. + 5. The ``Source``, ``Processor`` and ``Sink`` created in the previous + steps are disposed of. Note that this disposal also happens if an + error occurs while executing any of the previous steps. + 6. The callable returned by the + :attr:`~sghi.etl.core.WorkflowDefinition.epilogue` property is + invoked last. This is always invoked regardless of whether all the + steps in the workflow completed successfully or not. .. note:: @@ -97,12 +108,21 @@ def run_workflow(wf: Callable[[], WorkflowDefinition[_RDT, _PDT]]) -> None: ensure_callable(wf, message="'wf' MUST be a valid callable object.") wd: WorkflowDefinition = wf() - _LOGGER.info("[%s:%s] Starting workflow execution ...", wd.id, wd.name) - with ( - wd.source_factory() as source, - wd.processor_factory() as processor, - wd.sink_factory() as sink, - ): - sink.drain(processor.apply(source.draw())) - - _LOGGER.info("[%s:%s] Workflow execution complete.", wd.id, wd.name) + try: + _LOGGER.info("[%s:%s] Setting up workflow ...", wd.id, wd.name) + wd.prologue() + _LOGGER.info("[%s:%s] Starting workflow execution ...", wd.id, wd.name) + with ( + wd.source_factory() as source, + wd.processor_factory() as processor, + wd.sink_factory() as sink, + ): + sink.drain(processor.apply(source.draw())) + _LOGGER.info( + "[%s:%s] Workflow execution complete. Cleaning up ...", + wd.id, + wd.name, + ) + finally: + wd.epilogue() + _LOGGER.info("[%s:%s] Done :)", wd.id, wd.name) diff --git a/src/sghi/etl/commons/workflow_builder.py b/src/sghi/etl/commons/workflow_builder.py index 073be2b..0f9a955 100644 --- a/src/sghi/etl/commons/workflow_builder.py +++ b/src/sghi/etl/commons/workflow_builder.py @@ -53,6 +53,16 @@ _SourceFactory = Callable[[], Source[_T1]] + +# ============================================================================= +# HELPERS +# ============================================================================= + + +def _noop() -> None: + """Do nothing.""" + + # ============================================================================= # EXCEPTIONS # ============================================================================= @@ -97,6 +107,124 @@ class WorkflowBuilder(Generic[_RDT, _PDT]): :class:`sinks`. This builder class offers a convenient way to construct workflows by providing methods to register sources, processors, and sinks, either individually or using factories. + + Here are a couple of usage examples: + + Example 1 + + .. code-block:: python + :linenos: + + from sghi.etl.commons import * + + wb = WorkflowBuilder[str, str](id="test1", name="Test Workflow 1") + + + @source + def hello_world() -> str: + return "Hello, World!" + + + wb.draw_from(hello_world).drain_to(sink(print)) + run_workflow(wb) + + Example 2 + + .. code-block:: python + :linenos: + + import random + from collections.abc import Iterable + + from sghi.etl.commons import * + + wb: WorkflowBuilder[Iterable[int], Iterable[int]] + wb = WorkflowBuilder(id="test2", name="Test Workflow 2") + + + @source + def supply_ints() -> Iterable[int]: + for _ in range(10): + yield random.randint(0, 9) # noqa: S311 + + + @sink + def print_each(values: Iterable[int]) -> None: + for value in values: + print(value) + + + wb.draw_from(supply_ints).drain_to(print_each) + run_workflow(wb) + + Example 3 + + .. code-block:: python + :linenos: + + import random + from collections.abc import Iterable, Sequence + + from sghi.etl.commons import * + + wb: WorkflowBuilder[Iterable[int], Sequence[str]] + wb = WorkflowBuilder( + id="test3", + name="Test Workflow 3", + composite_processor_factory=ProcessorPipe[ + Iterable[int], Sequence[str] + ], + composite_sink_factory=ScatterSink[Sequence[str]], + ) + + + # SOURCES + # ---------------------------------------------------------------------- + @wb.draws_from + @source + def supply_ints() -> Iterable[int]: + for _ in range(10): + yield random.randint(0, 9) # noqa: S311 + + + # PROCESSORS + # ---------------------------------------------------------------------- + @wb.applies_processor + @processor + def add_100(values: Iterable[int]) -> Iterable[int]: + for v in values: + yield v + 100 + + + @wb.applies_processor + @processor + def ints_as_strings(ints: Iterable[int]) -> Iterable[str]: + yield from map(chr, ints) + + + @wb.applies_processor + @processor + def values_to_sequence(values: Iterable[str]) -> Sequence[str]: + return list(values) + + + # SINKS + # ---------------------------------------------------------------------- + @wb.drains_to + @sink + def print_each(values: Sequence[str]) -> None: + for value in values: + print(value) + + + @wb.drains_to + @sink + def print_all(values: Sequence[str]) -> None: + print(f"[{", ".join(list(values))}]") + + + run_workflow(wb) + """ # noqa: D205 __slots__ = ( @@ -106,9 +234,11 @@ class WorkflowBuilder(Generic[_RDT, _PDT]): "_default_processor_factory", "_default_sink_factory", "_description", + "_epilogue", "_id", "_name", "_processor_factories", + "_prologue", "_sink_factories", "_source_factories", ) @@ -129,6 +259,8 @@ def __init__( # noqa: PLR0913 composite_source_factory: _CompositeSourceFactory = GatherSource, composite_processor_factory: _CompositeProcessorFactory = ScatterGatherProcessor, # noqa: E501 composite_sink_factory: _CompositeSinkFactory = ScatterSink, + prologue: Callable[[], None] = _noop, + epilogue: Callable[[], None] = _noop, ) -> None: r"""Create a ``WorkflowBuilder`` of the following properties. @@ -183,6 +315,12 @@ def __init__( # noqa: PLR0913 ``Sequence`` of ``Sink`` instances. Defaults to the ``ScatterSink`` class, which drains data to all its embedded ``Sink``\ s concurrently. + :param prologue: An optional function to be invoked at the beginning of + the assembled workflow(s). This MUST be a valid callable. Defaults + to a callable that does nothing when invoked. + :param epilogue: An optional function to be invoked at the end of the + created workflow(s). This MUST be a valid callable. Defaults to + a callable that does nothing when invoked. :raise TypeError: If ``id`` or ``name`` are NOT of type string. If ``description`` is NOT a string when NOT ``None``. If @@ -192,6 +330,9 @@ def __init__( # noqa: PLR0913 ``default_processor_factory``, ``default_sink_factory``, ``composite_source_factory``, ``composite_processor_factory`` or ``composite_sink_factory`` are NOT valid callable objects. + + .. versionadded:: 1.2.0 The ``epilogue`` parameter. + .. versionadded:: 1.2.0 The ``prologue`` parameter. """ super().__init__() self._id: str = ensure_not_none_nor_empty( @@ -269,6 +410,14 @@ def __init__( # noqa: PLR0913 value=composite_sink_factory, message="'composite_sink_factory' MUST be a callable object.", ) + self._prologue: Callable[[], None] = ensure_callable( + value=prologue, + message="'prologue' MUST be a callable object.", + ) + self._epilogue: Callable[[], None] = ensure_callable( + value=epilogue, + message="'epilogue' MUST be a callable object.", + ) def __call__(self) -> WorkflowDefinition[_RDT, _PDT]: """Build and return a :class:`~sghi.etl.core.WorkflowDefinition`. @@ -509,6 +658,38 @@ def description(self, __description: str | None, /) -> None: message="'description' MUST be a string when NOT None.", ) + @property + def epilogue(self) -> Callable[[], None]: + r"""A callable to be executed at the end of the assembled + ``WorkflowDefinition``\ (s). + + This is always executed regardless of whether the resulting workflows + or their :attr:`prologue` callables completed successfully. + + .. versionadded:: 1.2.0 + """ # noqa: D205 + return self._epilogue + + @epilogue.setter + def epilogue(self, __epilogue: Callable[[], None], /) -> None: + r"""Set the callable to be executed at end of the assembled + ``WorkflowDefinition``\ (s). + + :param __epilogue: A callable object to be executed at the end of the + resulting workflow(s). This callable is always invoked whenever + the resulting workflows are executed. This MUST be a valid + callable object. + + :raise ValueError: If the provided value *IS NOT* a valid callable + object. + + .. versionadded:: 1.2.0 + """ # noqa: D205 + self._epilogue: Callable[[], None] = ensure_callable( + value=__epilogue, + message="'epilogue' MUST be a callable object.", + ) + @property def id(self) -> str: """The identifier to assign to the assembled workflow(s).""" @@ -601,6 +782,41 @@ def processor_factories( ) self._processor_factories = list(__processor_factories) + @property + def prologue(self) -> Callable[[], None]: + r"""A callable to be executed at the beginning of the assembled + ``WorkflowDefinition``\ (s). + + If the execution of this callable fails, i.e. raises an exception + during the execution of the resulting workflow(s), then the rest of the + workflow is never executed, only the callable returned by the + :attr:`epilogue` property is. + + This can be useful for validating the loaded configuration, setting up + certain resources before the "main workflow" execution starts, etc. + + .. versionadded:: 1.2.0 + """ # noqa: D205 + return self._prologue + + @prologue.setter + def prologue(self, __prologue: Callable[[], None], /) -> None: + r"""Set the callable to be executed at the beginning of the assembled + ``WorkflowDefinition``\ (s). + + :param __prologue: A callable object to be executed at the beginning of + the resulting workflow(s). This MUST be valid callable object. + + :raise ValueError: If the provided value *IS NOT* a valid callable + object. + + .. versionadded:: 1.2.0 + """ # noqa: D205 + self._prologue: Callable[[], None] = ensure_callable( + value=__prologue, + message="'prologue' MUST be a callable object.", + ) + @property def sink_factories(self) -> Sequence[_SinkFactory[Any]]: r"""Get the ``Sink`` factories registered to create ``Sink``\ s for the @@ -711,6 +927,8 @@ def build(self) -> WorkflowDefinition[_RDT, _PDT]: source_factory=self._build_source_factory(), processor_factory=self._build_processor_factory(), sink_factory=self._build_sink_factory(), + prologue=self._prologue, + epilogue=self._epilogue, ) # DECORATORS @@ -837,6 +1055,59 @@ def draws_from(self, source: Source[Any]) -> Source[Any]: self.draw_from(source) return source + def mark_epilogue( + self, + epilogue: Callable[[], None], + ) -> Callable[[], None]: + r"""Set the given/decorated callable to be executed at end of the + assembled ``WorkflowDefinition``\ (s). + + This is always executed regardless of whether the resulting workflow(s) + or their :attr:`prologue` callable completed successfully. + + :param epilogue: A callable object to be executed at the end of the + resulting workflow(s). This callable is always invoked whenever + the resulting workflow(s) are executed. This MUST be a valid + callable object. + + :return: The given callable as is. + + :raise ValueError: If the provided value *IS NOT* a valid callable + object. + + .. versionadded:: 1.2.0 + """ # noqa: D205 + self.epilogue = epilogue + return epilogue + + def mark_prologue( + self, + prologue: Callable[[], None], + ) -> Callable[[], None]: + r"""Set the given/decorated callable to be executed at beginning of the + assembled ``WorkflowDefinition``\ (s). + + If the execution of the given callable fails, i.e. raises an exception + during the execution of the resulting workflow(s), then the rest of the + workflow is never executed, only the callable returned by the + :attr:`epilogue` property is. + + This can be useful for validating the loaded configuration, setting up + certain resources before the "main workflow" execution starts, etc. + + :param prologue: A callable object to be executed at the beginning of + the resulting workflow(s). This MUST be a valid callable object. + + :return: The given callable as is. + + :raise ValueError: If the provided value *IS NOT* a valid callable + object. + + .. versionadded:: 1.2.0 + """ # noqa: D205 + self.prologue = prologue + return prologue + # FLOW API # ------------------------------------------------------------------------- def apply_processor( diff --git a/src/sghi/etl/commons/workflow_definitions.py b/src/sghi/etl/commons/workflow_definitions.py index 956f2d2..b0d4b5c 100644 --- a/src/sghi/etl/commons/workflow_definitions.py +++ b/src/sghi/etl/commons/workflow_definitions.py @@ -33,6 +33,15 @@ """Type variable representing the raw data type.""" +# ============================================================================= +# HELPERS +# ============================================================================= + + +def _noop() -> None: + """Do nothing.""" + + # ============================================================================= # SPEC IMPLEMENTATIONS # ============================================================================= @@ -45,6 +54,17 @@ class SimpleWorkflowDefinition( ): """A simple :class:`WorkflowDefinition` implementation.""" + __slots__ = ( + "_description", + "_epilogue", + "_id", + "_name", + "_processor_factory", + "_prologue", + "_sink_factory", + "_source_factory", + ) + def __init__( # noqa: PLR0913 self, id: str, # noqa: A002 @@ -53,6 +73,8 @@ def __init__( # noqa: PLR0913 description: str | None = None, processor_factory: Callable[[], Processor[_RDT, _PDT]] = NOOPProcessor, sink_factory: Callable[[], Sink[_PDT]] = NullSink, + prologue: Callable[[], None] = _noop, + epilogue: Callable[[], None] = _noop, ) -> None: """Create a new ``WorkflowDefinition`` with the provided properties. @@ -77,13 +99,20 @@ def __init__( # noqa: PLR0913 :param sink_factory: A function that suppliers the ``Sink`` associated with the created workflow. This MUST be a valid callable. Defaults to ``NullSink`` when not provided. + :param prologue: An optional function to be invoked at the beginning of + the created workflow. This MUST be a valid callable. Defaults to + a callable that does nothing when invoked. + :param epilogue: An optional function to be invoked at the end of the + created workflow. This MUST be a valid callable. Defaults to + a callable that does nothing when invoked. :raise TypeError: If ``id`` or ``name`` are NOT strings, or if ``description`` is provided but is NOT a string. :raise ValueError: If one of the following is ``True``; ``id`` is an empty string, ``name`` is an empty string, ``source_factory`` is NOT a valid callable, ``processor_factory`` is NOT a valid callable - or ``sink_factory`` is NOT a valid callable. + , ``sink_factory`` is NOT a valid callable, ``prologue`` is NOT a + valid callable or ``epilogue`` is NOT a valid callable. """ super().__init__() self._id: str = ensure_not_none_nor_empty( @@ -120,6 +149,14 @@ def __init__( # noqa: PLR0913 value=sink_factory, message="'sink_factory' MUST be a callable object.", ) + self._prologue: Callable[[], None] = ensure_callable( + value=prologue, + message="'prologue' MUST be a callable object.", + ) + self._epilogue: Callable[[], None] = ensure_callable( + value=epilogue, + message="'epilogue' MUST be a callable object.", + ) @property @override @@ -151,6 +188,33 @@ def processor_factory(self) -> Callable[[], Processor[_RDT, _PDT]]: def sink_factory(self) -> Callable[[], Sink[_PDT]]: return self._sink_factory + @property + @override + def prologue(self) -> Callable[[], None]: + """A callable to be executed at the beginning of the workflow. + + If the execution of this callable fails, i.e. raises an exception, then + the main workflow is never executed, only the callable returned by the + :attr:`epilogue` property is. + This can be used to validate the loaded configuration, setting up + certain resources before the workflow execution starts, etc. + + .. versionadded:: 1.2.0 + """ + return self._prologue + + @property + @override + def epilogue(self) -> Callable[[], None]: + """A callable to be executed at the end of the workflow. + + This is always executed regardless of whether the resulting workflow + or its :attr:`prologue` callable completed successfully. + + .. versionadded:: 1.2.0 + """ + return self._epilogue + # ============================================================================= # MODULE EXPORTS diff --git a/test/sghi/etl/commons_tests/utils_tests/others_tests.py b/test/sghi/etl/commons_tests/utils_tests/others_tests.py index 02f8aa3..c06c373 100644 --- a/test/sghi/etl/commons_tests/utils_tests/others_tests.py +++ b/test/sghi/etl/commons_tests/utils_tests/others_tests.py @@ -12,6 +12,7 @@ NullSink, ProcessorPipe, SimpleWorkflowDefinition, + WorkflowBuilder, processor, run_workflow, sink, @@ -28,41 +29,49 @@ # ============================================================================= -def _workflow_factory_generator( +def _noop() -> None: + """Do nothing.""" + + +def _create_workflow_factory( # noqa: PLR0913 repository: MutableSequence[str], start: int = 0, stop: int = 5, step: int = 1, + prologue: Callable[[], None] = _noop, + epilogue: Callable[[], None] = _noop, ) -> Callable[[], WorkflowDefinition[Iterable[int], Iterable[str]]]: + wb: WorkflowBuilder[Iterable[int], Iterable[str]] + wb = WorkflowBuilder( + id="test_workflow", + name="Test Workflow", + composite_processor_factory=ProcessorPipe, + prologue=prologue, + epilogue=epilogue, + ) + + @wb.draws_from @source - def supply_ints() -> Iterable[int]: + def supply_ints() -> Iterable[int]: # pyright: ignore[reportUnusedFunction] yield from range(start, stop, step) + @wb.applies_processor @processor - def add_100(values: Iterable[int]) -> Iterable[int]: + def add_100(values: Iterable[int]) -> Iterable[int]: # pyright: ignore[reportUnusedFunction] for v in values: yield v + 100 + @wb.applies_processor @processor - def ints_as_strings(ints: Iterable[int]) -> Iterable[str]: + def ints_as_strings(ints: Iterable[int]) -> Iterable[str]: # pyright: ignore[reportUnusedFunction] yield from map(str, ints) + @wb.drains_to @sink - def save_strings_to_repo(strings: Iterable[str]) -> None: + def save_strings_to_repo(strings: Iterable[str]) -> None: # pyright: ignore[reportUnusedFunction] repository.extend(strings) - def _create_workflow() -> WorkflowDefinition[Iterable[int], Iterable[str]]: - return SimpleWorkflowDefinition( - id="test_workflow", - name="Test Workflow", - source_factory=lambda: supply_ints, - processor_factory=lambda: ProcessorPipe( - [add_100, ints_as_strings], - ), - sink_factory=lambda: save_strings_to_repo, - ) - - return _create_workflow + return wb # ============================================================================= @@ -74,7 +83,7 @@ def test_run_workflow_fails_on_non_callable_input() -> None: """:func:`sghi.etl.commons.utils.run_workflow` should raise a :exc:`ValueError` when given a non-callable input value. """ - wf = _workflow_factory_generator([]) + wf = _create_workflow_factory([]) for non_callable in (None, wf()): with pytest.raises(ValueError, match="callable object.") as exp_info: run_workflow(wf=non_callable) # type: ignore[reportArgumentType] @@ -84,12 +93,56 @@ def test_run_workflow_fails_on_non_callable_input() -> None: ) -def test_run_workflow_side_effects_on_failed_execution() -> None: +def test_run_workflow_side_effects_on_failed_drain() -> None: """:func:`sghi.etl.commons.utils.run_workflow` should dispose all the - workflow components (source, processor and sink) if an error occurs during - execution. + workflow components (source, processor and sink) and invoke the + ``epilogue`` callable of the workflow if an error occurs while draining + data to a :class:`Sink`. """ + @source + def greate_the_world() -> str: + return "Hello World!!" + + @sink + def failing_sink(_: str) -> None: + _err_msg: str = "Oops, something failed." + raise RuntimeError(_err_msg) + + _processor = NOOPProcessor() + _has_cleaned_up: bool = False + + def clean_up() -> None: + nonlocal _has_cleaned_up + _has_cleaned_up = True + + def create_failing_workflow() -> WorkflowDefinition[str, str]: + return SimpleWorkflowDefinition( + id="failing_workflow", + name="Failing Workflow", + source_factory=lambda: greate_the_world, + processor_factory=lambda: _processor, + sink_factory=lambda: failing_sink, + epilogue=clean_up, + ) + + with pytest.raises(RuntimeError, match="Oops, something failed."): + run_workflow(wf=create_failing_workflow) + + assert greate_the_world.is_disposed + assert _processor.is_disposed + assert failing_sink.is_disposed + assert _has_cleaned_up + + +def test_run_workflow_side_effects_on_failed_draw() -> None: + """:func:`sghi.etl.commons.utils.run_workflow` should dispose all the + workflow components (source, processor and sink) and invoke the + ``epilogue`` callable of the workflow if an error occurs while drawing data + rom a :class:`Source`. + """ + + # noinspection PyTypeChecker @source def failing_source() -> str: _err_msg: str = "Oops, something failed." @@ -97,6 +150,11 @@ def failing_source() -> str: _processor = NOOPProcessor() _sink = NullSink() + _has_cleaned_up: bool = False + + def clean_up() -> None: + nonlocal _has_cleaned_up + _has_cleaned_up = True def create_failing_workflow() -> WorkflowDefinition[str, str]: return SimpleWorkflowDefinition( @@ -105,6 +163,7 @@ def create_failing_workflow() -> WorkflowDefinition[str, str]: source_factory=lambda: failing_source, processor_factory=lambda: _processor, sink_factory=lambda: _sink, + epilogue=clean_up, ) with pytest.raises(RuntimeError, match="Oops, something failed."): @@ -113,19 +172,105 @@ def create_failing_workflow() -> WorkflowDefinition[str, str]: assert failing_source.is_disposed assert _processor.is_disposed assert _sink.is_disposed + assert _has_cleaned_up + + +def test_run_workflow_side_effects_on_failed_processing() -> None: + """:func:`sghi.etl.commons.utils.run_workflow` should dispose all the + workflow components (source, processor and sink) and invoke the + ``epilogue`` callable of the workflow if an error occurs while processing + data. + """ + + @source + def greate_the_world() -> str: + return "Hello World!!" + + # noinspection PyTypeChecker + @processor + def failing_processor(_: str) -> str: + _err_msg: str = "Oops, something failed." + raise RuntimeError(_err_msg) + + _sink = NullSink() + _has_cleaned_up: bool = False + + def clean_up() -> None: + nonlocal _has_cleaned_up + _has_cleaned_up = True + + def create_failing_workflow() -> WorkflowDefinition[str, str]: + return SimpleWorkflowDefinition( + id="failing_workflow", + name="Failing Workflow", + source_factory=lambda: greate_the_world, + processor_factory=lambda: failing_processor, + sink_factory=lambda: _sink, + epilogue=clean_up, + ) + + with pytest.raises(RuntimeError, match="Oops, something failed."): + run_workflow(wf=create_failing_workflow) + + assert greate_the_world.is_disposed + assert failing_processor.is_disposed + assert _sink.is_disposed + assert _has_cleaned_up + + +def test_run_workflow_side_effects_on_failed_prologue_execution() -> None: + """:func:`sghi.etl.commons.utils.run_workflow` should invoke the + ``epilogue`` callable of the workflow if an error occurs while executing + the ``prologue`` callable of the workflow. + """ + + def failing_prologue() -> None: + _err_msg: str = "Oops, something failed." + raise RuntimeError(_err_msg) + + _has_cleaned_up: bool = False + + def clean_up() -> None: + nonlocal _has_cleaned_up + _has_cleaned_up = True + + wf = _create_workflow_factory( + repository=[], + prologue=failing_prologue, + epilogue=clean_up, + ) + + with pytest.raises(RuntimeError, match="Oops, something failed."): + run_workflow(wf) + + assert _has_cleaned_up def test_run_workflow_side_effects_on_successful_execution() -> None: """func:`sghi.etl.commons.utils.run_workflow` should execute an ETL Workflow when given a factory function that returns the workflow. """ + has_set_up: bool = False + has_cleaned_up: bool = False + + def set_up() -> None: + nonlocal has_set_up + has_set_up = True + + def clean_up() -> None: + nonlocal has_cleaned_up + has_cleaned_up = True + repository1: list[str] = [] repository2: list[str] = [] - wf1 = _workflow_factory_generator(repository1) - wf2 = _workflow_factory_generator(repository2, 10, 60, 10) + + wf1 = _create_workflow_factory(repository1) + wf2 = _create_workflow_factory(repository2, 10, 60, 10, set_up, clean_up) run_workflow(wf1) run_workflow(wf2) assert repository1 == ["100", "101", "102", "103", "104"] assert repository2 == ["110", "120", "130", "140", "150"] + assert has_cleaned_up + assert has_set_up diff --git a/test/sghi/etl/commons_tests/workflow_builder_tests.py b/test/sghi/etl/commons_tests/workflow_builder_tests.py index db1a296..ba85f83 100644 --- a/test/sghi/etl/commons_tests/workflow_builder_tests.py +++ b/test/sghi/etl/commons_tests/workflow_builder_tests.py @@ -3,6 +3,8 @@ from __future__ import annotations +import os +import shutil from collections.abc import Iterator from typing import TYPE_CHECKING, Any, Self from unittest import TestCase @@ -39,6 +41,10 @@ # ============================================================================= +def _noop() -> None: + """Do nothing.""" + + class Zero(Source[Iterator[int]]): """A :class:`Source` that provides as many zeros as drawn. @@ -96,6 +102,8 @@ def setUp(self) -> None: processor_factories=[NOOPProcessor], sink_factories=[NullSink], source_factories=[Zero], + epilogue=_noop, + prologue=_noop, ) def test_applies_processor_fails_when_given_invalid_input(self) -> None: @@ -875,6 +883,51 @@ def test_draws_from_fails_when_given_invalid_input(self) -> None: == "'source' MUST be an 'sghi.etl.core.Source' instance." ) + def test_epilogue_modification_with_valid_value_succeeds(self) -> None: + """Setting a callable object to the :attr:`WorkflowBuilder.epilogue` + attribute should succeed. + """ + + def _remove_temp_directories() -> None: + shutil.rmtree( + path="/tmp/sghi-etl-workflow-tmp-dir", # noqa: S108 + ignore_errors=True, + ) + + try: + self._instance1.epilogue = _noop + self._instance2.epilogue = _remove_temp_directories + except Exception as exp: # noqa: BLE001 + _fail_reason: str = ( + "Setting the 'WorkflowBuilder.epilogue' attribute with a " + "callable object SHOULD succeed. However, the following " + f"exception was raised: '{exp!r}'." + ) + pytest.fail(reason=_fail_reason) + + assert self._instance1.epilogue is _noop + assert self._instance2.epilogue is _remove_temp_directories + + def test_epilogue_modification_with_an_invalid_value_fails(self) -> None: + """Setting a non-callable value to the :attr:`WorkflowBuilder.epilogue` + attribute should raise a :exc:`ValueError`. + """ + for non_callable in (None, 1, 5.2, "not a callable"): + with pytest.raises(ValueError, match="be a callable") as exp_info: + self._instance1.epilogue = non_callable # type: ignore[reportArgumentType] + + assert ( + exp_info.value.args[0] + == "'epilogue' MUST be a callable object." + ) + + def test_epilogue_return_value(self) -> None: + r""":attr:`WorkflowBuilder.epilogue` should return the callable object + to execute at the end of the assembled ``WorkflowDefinition``\ (s). + """ + assert callable(self._instance1.epilogue) + assert self._instance2.epilogue is _noop + def test_draws_from_return_value(self) -> None: """The decorator meth:`WorkflowBuilder.draws_from` should return the :class:`Source` instance given to it. @@ -975,6 +1028,122 @@ def test_invoking_instances_as_callable_return_value(self) -> None: assert workflow_def.processor_factory is NOOPProcessor assert workflow_def.sink_factory is NullSink + def test_mark_epilogue_modification_with_valid_value_succeeds( + self, + ) -> None: + """Invoking the :meth:`WorkflowBuilder.mark_epilogue` method with a + valid callable should succeed. + """ + try: + self._instance1.mark_epilogue(_noop) + except Exception as exp: # noqa: BLE001 + _fail_reason: str = ( + "Invoking the 'WorkflowBuilder.mark_epilogue' method with " + "a callable object SHOULD succeed. However, the following " + f"exception was raised: '{exp!r}'." + ) + pytest.fail(reason=_fail_reason) + + @self._instance2.mark_epilogue + def _remove_temp_directories() -> None: + shutil.rmtree( + path="/tmp/sghi-etl-workflow-tmp-dir", # noqa: S108 + ignore_errors=True, + ) + + assert self._instance1.epilogue is _noop + assert self._instance2.epilogue is _remove_temp_directories + + def test_mark_epilogue_modification_with_an_invalid_value_fails( + self, + ) -> None: + """Passing a non-callable value to the + :meth:`WorkflowBuilder.mark_epilogue` method should raise a + :exc:`ValueError`. + """ + for non_callable in (None, 1, 5.2, "not a callable"): + with pytest.raises(ValueError, match="be a callable") as exp_info: + self._instance1.mark_epilogue(non_callable) # type: ignore[reportArgumentType] + + assert ( + exp_info.value.args[0] + == "'epilogue' MUST be a callable object." + ) + + def test_mark_epilogue_return_value(self) -> None: + """:meth:`WorkflowBuilder.mark_epilogue` should return the callable + object passed to it. + """ + + def _remove_temp_directories() -> None: + shutil.rmtree( + path="/tmp/sghi-etl-workflow-tmp-dir", # noqa: S108 + ignore_errors=True, + ) + + assert self._instance1.mark_epilogue(_noop) is _noop + assert ( + self._instance2.mark_epilogue(_remove_temp_directories) + is _remove_temp_directories + ) + + def test_mark_prologue_modification_with_an_invalid_value_fails( + self, + ) -> None: + """Passing a non-callable value to the + :meth:`WorkflowBuilder.mark_prologue` method should raise a + exc:`ValueError`. + """ + for non_callable in (None, 1, 5.2, "not a callable"): + with pytest.raises(ValueError, match="be a callable") as exp_info: + self._instance1.mark_prologue(non_callable) # type: ignore[reportArgumentType] + + assert ( + exp_info.value.args[0] + == "'prologue' MUST be a callable object." + ) + + def test_mark_prologue_modification_with_valid_value_succeeds( + self, + ) -> None: + """Invoking the :meth:`WorkflowBuilder.mark_prologue` method with a + valid callable should succeed. + """ + try: + self._instance1.mark_prologue(_noop) + except Exception as exp: # noqa: BLE001 + _fail_reason: str = ( + "Invoking the 'WorkflowBuilder.prologue' method with a " + "callable object SHOULD succeed. However, the following " + f"exception was raised: '{exp!r}'." + ) + pytest.fail(reason=_fail_reason) + + @self._instance2.mark_prologue + def _check_properly_configured() -> None: + if not os.environ.get("DB_PASSWORD", None): + _err_msg: str = "'DB_PASSWORD' not specified." + raise RuntimeError(_err_msg) + + assert self._instance1.prologue is _noop + assert self._instance2.prologue is _check_properly_configured + + def test_mark_prologue_return_value(self) -> None: + """:meth:`WorkflowBuilder.mark_prologue` should return the callable + object passed to it. + """ + + def _check_properly_configured() -> None: + if not os.environ.get("DB_PASSWORD", None): + _err_msg: str = "'DB_PASSWORD' not specified." + raise RuntimeError(_err_msg) + + assert self._instance1.mark_prologue(_noop) is _noop + assert ( + self._instance2.mark_epilogue(_check_properly_configured) + is _check_properly_configured + ) + def test_name_modification_with_valid_value_succeeds(self) -> None: """Setting a non-empty string to the :attr:`WorkflowBuilder.name` attribute should succeed. @@ -1063,6 +1232,50 @@ def test_processor_factories_return_value(self) -> None: assert len(self._instance2.processor_factories) == 1 assert self._instance2.processor_factories[0] == NOOPProcessor + def test_prologue_modification_with_an_invalid_value_fails(self) -> None: + """Setting a non-callable value to the :attr:`WorkflowBuilder.prologue` + attribute should raise a :exc:`ValueError`. + """ + for non_callable in (None, 1, 5.2, "not a callable"): + with pytest.raises(ValueError, match="be a callable") as exp_info: + self._instance1.prologue = non_callable # type: ignore[reportArgumentType] + + assert ( + exp_info.value.args[0] + == "'prologue' MUST be a callable object." + ) + + def test_prologue_modification_with_valid_value_succeeds(self) -> None: + """Setting a callable object to the :attr:`WorkflowBuilder.prologue` + attribute should succeed. + """ + + def _check_properly_configured() -> None: + if not os.environ.get("DB_PASSWORD", None): + _err_msg: str = "'DB_PASSWORD' not specified." + raise RuntimeError(_err_msg) + + try: + self._instance1.prologue = _noop + self._instance2.prologue = _check_properly_configured + except Exception as exp: # noqa: BLE001 + _fail_reason: str = ( + "Setting the 'WorkflowBuilder.prologue' attribute with a " + "callable object SHOULD succeed. However, the following " + f"exception was raised: '{exp!r}'." + ) + pytest.fail(reason=_fail_reason) + + assert self._instance1.prologue is _noop + assert self._instance2.prologue is _check_properly_configured + + def test_prologue_return_value(self) -> None: + r""":attr:`WorkflowBuilder.prologue` should return the callable object + to execute at the end of the assembled ``WorkflowDefinition``\ (s). + """ + assert callable(self._instance1.prologue) + assert self._instance2.prologue is _noop + def test_sink_factories_modification_with_valid_value_succeeds( self, ) -> None: diff --git a/test/sghi/etl/commons_tests/workflow_definitions_tests.py b/test/sghi/etl/commons_tests/workflow_definitions_tests.py index c1031fa..be79e70 100644 --- a/test/sghi/etl/commons_tests/workflow_definitions_tests.py +++ b/test/sghi/etl/commons_tests/workflow_definitions_tests.py @@ -36,6 +36,10 @@ def ints_to_str(ints: Iterable[int]) -> Iterable[str]: return ints_to_str +def _noop() -> None: + """Do nothing.""" + + def _printer_factory() -> Sink[Iterable[Any]]: return sink(print) @@ -60,6 +64,8 @@ def test_accessors_return_the_expected_values(self) -> None: description="A test workflow that takes ints and prints all them.", processor_factory=_ints_to_str_mapper_factory, sink_factory=_printer_factory, + epilogue=_noop, + prologue=_noop, ) assert wf_def.id == "test_workflow" @@ -68,9 +74,11 @@ def test_accessors_return_the_expected_values(self) -> None: wf_def.description == "A test workflow that takes ints and prints all them." ) + assert wf_def.prologue is _noop assert wf_def.source_factory is _ints_supplier_factory assert wf_def.processor_factory is _ints_to_str_mapper_factory assert wf_def.sink_factory is _printer_factory + assert wf_def.epilogue is _noop def test_instantiation_fails_on_none_str_id_argument(self) -> None: """Instantiating a :class:`SimpleWorkflowDefinition` with a non-string