feat(core): add support for prologue and epilogue workflow properties

This builds on [these changes](savannahghi/sghi-etl-core#22)
to the core library by adding `prologue` and `epilogue` properties
to `sghi.etl.core.WorkflowDefinition` implementations and adding support
for working with these properties in existing runners.
kennedykori committed Dec 26, 2024
1 parent 002661d commit d49cc9c
Showing 7 changed files with 776 additions and 53 deletions.
2 changes: 2 additions & 0 deletions docs/conf.py
@@ -56,6 +56,8 @@
 nitpicky = True

 nitpick_ignore = [
+    ("py:attr", "sghi.etl.core.WorkflowDefinition.epilogue"),  # docs aren't published yet
+    ("py:attr", "sghi.etl.core.WorkflowDefinition.prologue"),  # docs aren't published yet
     ("py:attr", "sghi.etl.core.WorkflowDefinition.processor_factory"),  # docs aren't published yet
     ("py:attr", "sghi.etl.core.WorkflowDefinition.sink_factory"),  # docs aren't published yet
     ("py:attr", "sghi.etl.core.WorkflowDefinition.source_factory"),  # docs aren't published yet
78 changes: 49 additions & 29 deletions src/sghi/etl/commons/utils/others.py
@@ -53,26 +53,37 @@ def run_workflow(wf: Callable[[], WorkflowDefinition[_RDT, _PDT]]) -> None:
     ``WorkflowDefinition``/workflow and then executes it. The execution of the
     workflow proceeds as follows:

-        - The callable returned by the
-          :attr:`~sghi.etl.core.WorkflowDefinition.source_factory` property of
-          the supplied ``WorkflowDefinition`` is used to get the
-          :class:`~sghi.etl.core.Source` associated with the workflow. The
-          :meth:`~sghi.etl.core.Source.draw` method of this ``Source`` is then
-          invoked to get the raw data to process.
-        - The callable returned by the
-          :attr:`~sghi.etl.core.WorkflowDefinition.processor_factory` property
-          of the supplied ``WorkflowDefinition`` is invoked to get the
-          :class:`~sghi.etl.core.Processor` associated with the workflow. This
-          ``Processor`` is then applied to the raw data retrieved from the
-          ``Source`` in the previous step to obtain processed data.
-        - The callable returned by the
-          :attr:`~sghi.etl.core.WorkflowDefinition.sink_factory` property of
-          the supplied ``WorkflowDefinition`` is invoked to get the
-          :class:`~sghi.etl.core.Sink` associated with the workflow. The
-          processed data from the previous step is drained into this ``Sink``.
-        - The ``Source``, ``Processor`` and ``Sink`` created in the previous
-          steps are disposed of. Note that this disposal also happens if an
-          error occurs during the workflow execution.
+        1. The callable returned by the
+           :attr:`~sghi.etl.core.WorkflowDefinition.prologue` property is
+           invoked first. If an error occurs while executing the callable, all
+           the rest of the steps, except the last, are skipped.
+        2. The callable returned by the
+           :attr:`~sghi.etl.core.WorkflowDefinition.source_factory` property of
+           the supplied ``WorkflowDefinition`` is used to get the
+           :class:`~sghi.etl.core.Source` associated with the workflow. The
+           :meth:`~sghi.etl.core.Source.draw` method of this ``Source`` is then
+           invoked to get the raw data to process. If an error occurs while
+           drawing data for the ``Source``, execution jumps to step 5.
+        3. The callable returned by the
+           :attr:`~sghi.etl.core.WorkflowDefinition.processor_factory` property
+           of the supplied ``WorkflowDefinition`` is invoked to get the
+           :class:`~sghi.etl.core.Processor` associated with the workflow. This
+           ``Processor`` is then applied to the raw data retrieved from the
+           ``Source`` in the previous step to obtain processed data. If an
+           error occurs while processing the raw data, execution jumps to
+           step 5.
+        4. The callable returned by the
+           :attr:`~sghi.etl.core.WorkflowDefinition.sink_factory` property of
+           the supplied ``WorkflowDefinition`` is invoked to get the
+           :class:`~sghi.etl.core.Sink` associated with the workflow. The
+           processed data from the previous step is drained into this ``Sink``.
+        5. The ``Source``, ``Processor`` and ``Sink`` created in the previous
+           steps are disposed of. Note that this disposal also happens if an
+           error occurs while executing any of the previous steps.
+        6. The callable returned by the
+           :attr:`~sghi.etl.core.WorkflowDefinition.epilogue` property is
+           invoked last. This is always invoked regardless of whether all the
+           steps in the workflow completed successfully or not.

     .. note::
@@ -97,12 +108,21 @@ def run_workflow(wf: Callable[[], WorkflowDefinition[_RDT, _PDT]]) -> None:
     ensure_callable(wf, message="'wf' MUST be a valid callable object.")

     wd: WorkflowDefinition = wf()
-    _LOGGER.info("[%s:%s] Starting workflow execution ...", wd.id, wd.name)
-    with (
-        wd.source_factory() as source,
-        wd.processor_factory() as processor,
-        wd.sink_factory() as sink,
-    ):
-        sink.drain(processor.apply(source.draw()))
-
-    _LOGGER.info("[%s:%s] Workflow execution complete.", wd.id, wd.name)
+    try:
+        _LOGGER.info("[%s:%s] Setting up workflow ...", wd.id, wd.name)
+        wd.prologue()
+        _LOGGER.info("[%s:%s] Starting workflow execution ...", wd.id, wd.name)
+        with (
+            wd.source_factory() as source,
+            wd.processor_factory() as processor,
+            wd.sink_factory() as sink,
+        ):
+            sink.drain(processor.apply(source.draw()))
+        _LOGGER.info(
+            "[%s:%s] Workflow execution complete. Cleaning up ...",
+            wd.id,
+            wd.name,
+        )
+    finally:
+        wd.epilogue()
+        _LOGGER.info("[%s:%s] Done :)", wd.id, wd.name)
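
The ordering that the updated `run_workflow` docstring describes can be illustrated with a minimal, self-contained sketch. It does not use the real `sghi.etl` classes: `Stage` and `run_sketch` are hypothetical stand-ins, and the "raw data" string is invented for illustration. It only shows how a `try`/`finally` wrapped around a multi-item `with` statement yields the "prologue, steps, disposal, epilogue" sequence, including when a step fails.

```python
class Stage:
    """Hypothetical stand-in for a Source, Processor or Sink.

    It only records when it is disposed of; it is NOT the sghi.etl API.
    """

    def __init__(self, name: str, log: list[str]) -> None:
        self.name = name
        self.log = log

    def __enter__(self) -> "Stage":
        return self

    def __exit__(self, *exc_info: object) -> bool:
        self.log.append(f"dispose {self.name}")
        return False  # never swallow errors raised inside the block


def run_sketch(log: list[str], *, fail_while_processing: bool = False) -> None:
    """Mimic the control flow of the new runner, recording each step in `log`."""

    def process(raw: str) -> str:
        if fail_while_processing:
            raise RuntimeError("processing failed")
        return raw.upper()

    try:
        log.append("prologue")  # step 1: runs before anything else
        with Stage("source", log), Stage("processor", log), Stage("sink", log):
            raw = "raw data"  # stands in for source.draw()
            processed = process(raw)  # stands in for processor.apply(...)
            log.append(f"drain {processed}")  # stands in for sink.drain(...)
    finally:
        log.append("epilogue")  # step 6: runs whether or not a step failed


ok_log: list[str] = []
run_sketch(ok_log)

failed_log: list[str] = []
try:
    run_sketch(failed_log, fail_while_processing=True)
except RuntimeError:
    pass  # the error still propagates to the caller after the epilogue runs
```

In both runs the three stages are disposed of in reverse order of entry before the epilogue is recorded; in the failing run the drain step is skipped, but disposal and the epilogue still happen.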