From 5ab30413d1be45c9d20c4b7b0931c4598b0d092f Mon Sep 17 00:00:00 2001 From: Kennedy Kori Date: Tue, 16 Apr 2024 16:49:34 +0300 Subject: [PATCH] feat(workflow_def): add a `SimpleWorkflowDefinition` Add a simple `sghi.etl.core.WorkflowDefinition` implementation. --- docs/conf.py | 8 + src/sghi/etl/commons/__init__.py | 2 + src/sghi/etl/commons/processors.py | 2 +- src/sghi/etl/commons/sinks.py | 2 +- src/sghi/etl/commons/workflow_definitions.py | 162 +++++++++++++ .../workflow_definitions_tests.py | 213 ++++++++++++++++++ 6 files changed, 387 insertions(+), 2 deletions(-) create mode 100644 src/sghi/etl/commons/workflow_definitions.py create mode 100644 test/sghi/etl/commons_tests/workflow_definitions_tests.py diff --git a/docs/conf.py b/docs/conf.py index d8aad8d..67e0a1b 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -56,9 +56,12 @@ nitpicky = True nitpick_ignore = [ + ("py:class", "_RDT"), # private type annotations + ("py:class", "_PDT"), # private type annotations ("py:class", "Processor"), # docs aren't published yet ("py:class", "Sink"), # docs aren't published yet ("py:class", "Source"), # docs aren't published yet + ("py:class", "WorkflowDefinition"), # docs aren't published yet ("py:class", "TracebackType"), # Used as type annotation. 
Only available when type checking ("py:class", "concurrent.futures._base.Future"), # sphinx can't find it ("py:class", "sghi.etl.commons.processors._RDT"), # private type annotations @@ -66,11 +69,14 @@ ("py:class", "sghi.etl.commons.sinks._PDT"), # private type annotations ("py:class", "sghi.etl.commons.sources._RDT"), # private type annotations ("py:class", "sghi.etl.commons.utils.result_gatherers._T"), # private type annotations + ("py:class", "sghi.etl.commons.workflow_definitions._RDT"), # private type annotations + ("py:class", "sghi.etl.commons.workflow_definitions._PDT"), # private type annotations ("py:class", "sghi.etl.core._RDT"), # private type annotations ("py:class", "sghi.etl.core._PDT"), # private type annotations ("py:class", "sghi.etl.core.Processor"), # docs aren't published yet ("py:class", "sghi.etl.core.Sink"), # docs aren't published yet ("py:class", "sghi.etl.core.Source"), # docs aren't published yet + ("py:class", "sghi.etl.core.WorkflowDefinition"), # docs aren't published yet ("py:class", "sghi.retry.Retry"), # docs aren't published yet ("py:exc", "ResourceDisposedError"), # docs aren't published yet ("py:exc", "sghi.disposable.ResourceDisposedError"), # docs aren't published yet @@ -79,6 +85,8 @@ ("py:obj", "sghi.etl.commons.processors._RDT"), # private type annotations ("py:obj", "sghi.etl.commons.sinks._PDT"), # private type annotations ("py:obj", "sghi.etl.commons.sources._RDT"), # private type annotations + ("py:obj", "sghi.etl.commons.workflow_definitions._RDT"), # private type annotations + ("py:obj", "sghi.etl.commons.workflow_definitions._PDT"), # private type annotations ] templates_path = ["templates"] diff --git a/src/sghi/etl/commons/__init__.py b/src/sghi/etl/commons/__init__.py index c43e42d..d1489d1 100644 --- a/src/sghi/etl/commons/__init__.py +++ b/src/sghi/etl/commons/__init__.py @@ -9,11 +9,13 @@ from .sinks import NullSink, sink from .sources import source from .utils import fail_fast, fail_fast_factory, 
ignored_failed +from .workflow_definitions import SimpleWorkflowDefinition __all__ = [ "NOOPProcessor", "NullSink", "ProcessorPipe", + "SimpleWorkflowDefinition", "fail_fast", "fail_fast_factory", "ignored_failed", diff --git a/src/sghi/etl/commons/processors.py b/src/sghi/etl/commons/processors.py index 5240bc1..fabb2e7 100644 --- a/src/sghi/etl/commons/processors.py +++ b/src/sghi/etl/commons/processors.py @@ -23,7 +23,7 @@ _PDT = TypeVar("_PDT") -""""Type variable representing the data type after processing.""" +"""Type variable representing the data type after processing.""" _RDT = TypeVar("_RDT") """Type variable representing the raw data type.""" diff --git a/src/sghi/etl/commons/sinks.py b/src/sghi/etl/commons/sinks.py index d883d3a..6494505 100644 --- a/src/sghi/etl/commons/sinks.py +++ b/src/sghi/etl/commons/sinks.py @@ -20,7 +20,7 @@ _PDT = TypeVar("_PDT") -""""Type variable representing the data type after processing.""" +"""Type variable representing the data type after processing.""" _SinkCallable = Callable[[_PDT], None] diff --git a/src/sghi/etl/commons/workflow_definitions.py b/src/sghi/etl/commons/workflow_definitions.py new file mode 100644 index 0000000..80bd473 --- /dev/null +++ b/src/sghi/etl/commons/workflow_definitions.py @@ -0,0 +1,162 @@ +"""Common :class:`sghi.etl.core.WorkflowDefinition` implementations.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Generic, TypeVar, final + +from typing_extensions import override + +from sghi.etl.core import Processor, Sink, Source, WorkflowDefinition +from sghi.utils import ( + ensure_callable, + ensure_instance_of, + ensure_not_none_nor_empty, + ensure_optional_instance_of, +) + +from .processors import NOOPProcessor +from .sinks import NullSink + +if TYPE_CHECKING: + from collections.abc import Callable + + +# ============================================================================= +# TYPES +# 
============================================================================= + + +_PDT = TypeVar("_PDT") +"""Type variable representing the data type after processing.""" + +_RDT = TypeVar("_RDT") +"""Type variable representing the raw data type.""" + + +# ============================================================================= +# SPEC IMPLEMENTATIONS +# ============================================================================= + + +@final +class SimpleWorkflowDefinition( + WorkflowDefinition[_RDT, _PDT], + Generic[_RDT, _PDT], +): + """A simple :class:`WorkflowDefinition` implementation.""" + + def __init__( + self, + id: str, # noqa: A002 + name: str, + source_factory: Callable[[], Source[_RDT]], + description: str | None = None, + processor_factory: Callable[[], Processor[_RDT, _PDT]] = NOOPProcessor, + sink_factory: Callable[[], Sink[_PDT]] = NullSink, + ) -> None: + """Create a new ``WorkflowDefinition`` with the provided properties. + + The ``id``, ``name`` and ``source_factory`` parameters are mandatory + and MUST be valid (see individual parameter docs for details). + Providing invalid parameters will lead to either a :exc:`TypeError` ( + for values of the wrong type) or :exc:`ValueError` being raised. + + :param id: A unique identifier to assign to the created workflow. + This MUST be a non-empty string. + :param name: A name to assign to the created workflow. This MUST be + a non-empty string. + :param source_factory: A function that supplies the ``Source`` + associated with the created workflow. This MUST be a valid + callable. + :param description: An optional description to assign to the created + workflow. This MUST be a valid string when NOT ``None``. Defaults + to ``None`` when not provided. + :param processor_factory: An optional function that supplies the + ``Processor`` associated with the created workflow. This MUST be a + valid callable. Defaults to ``NOOPProcessor`` when not provided. 
+ :param sink_factory: A function that supplies the ``Sink`` associated + with the created workflow. This MUST be a valid callable. Defaults + to ``NullSink`` when not provided. + + :raise TypeError: If ``id`` or ``name`` are NOT strings, or if + ``description`` is provided but is NOT a string. + :raise ValueError: If one of the following is ``True``: ``id`` is an + empty string, ``name`` is an empty string, ``source_factory`` is + NOT a valid callable, ``processor_factory`` is NOT a valid callable + or ``sink_factory`` is NOT a valid callable. + """ + super().__init__() + self._id: str = ensure_not_none_nor_empty( + value=ensure_instance_of( + value=id, + klass=str, + message="'id' MUST be a string.", + ), + message="'id' MUST NOT be an empty string.", + ) + self._name: str = ensure_not_none_nor_empty( + value=ensure_instance_of( + value=name, + klass=str, + message="'name' MUST be a string.", + ), + message="'name' MUST NOT be an empty string.", + ) + self._source_factory: Callable[[], Source[_RDT]] = ensure_callable( + value=source_factory, + message="'source_factory' MUST be a callable object.", + ) + self._description: str | None = ensure_optional_instance_of( + value=description, + klass=str, + message="'description' MUST be a string when NOT None.", + ) + self._processor_factory: Callable[[], Processor[_RDT, _PDT]] + self._processor_factory = ensure_callable( + value=processor_factory, + message="'processor_factory' MUST be a callable object.", + ) + self._sink_factory: Callable[[], Sink[_PDT]] = ensure_callable( + value=sink_factory, + message="'sink_factory' MUST be a callable object.", + ) + + @property + @override + def id(self) -> str: + return self._id + + @property + @override + def name(self) -> str: + return self._name + + @property + @override + def description(self) -> str | None: + return self._description + + @property + @override + def source_factory(self) -> Callable[[], Source[_RDT]]: + return self._source_factory + + @property + @override + 
def processor_factory(self) -> Callable[[], Processor[_RDT, _PDT]]: + return self._processor_factory + + @property + @override + def sink_factory(self) -> Callable[[], Sink[_PDT]]: + return self._sink_factory + + +# ============================================================================= +# MODULE EXPORTS +# ============================================================================= + + +__all__ = [ + "SimpleWorkflowDefinition", +] diff --git a/test/sghi/etl/commons_tests/workflow_definitions_tests.py b/test/sghi/etl/commons_tests/workflow_definitions_tests.py new file mode 100644 index 0000000..625a38a --- /dev/null +++ b/test/sghi/etl/commons_tests/workflow_definitions_tests.py @@ -0,0 +1,213 @@ +# ruff: noqa: D205 +"""Tests for the :mod:`sghi.etl.commons.workflow_definitions` module.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any +from unittest import TestCase + +import pytest + +from sghi.etl.commons import SimpleWorkflowDefinition, processor, sink, source + +if TYPE_CHECKING: + from collections.abc import Iterable + + from sghi.etl.core import Processor, Sink, Source, WorkflowDefinition + +# ============================================================================= +# TESTS HELPERS +# ============================================================================= + + +def _ints_supplier_factory() -> Source[Iterable[int]]: + @source + def supply_ints(count: int = 5) -> Iterable[int]: + yield from range(count) + + return supply_ints + + +def _ints_to_str_mapper_factory() -> Processor[Iterable[int], Iterable[str]]: + @processor + def ints_to_str(ints: Iterable[int]) -> Iterable[str]: + yield from map(chr, ints) + + return ints_to_str + + +def _printer_factory() -> Sink[Iterable[Any]]: + return sink(print) + + +# ============================================================================= +# TESTS +# ============================================================================= + + +class 
TestSimpleWorkflowDefinitions(TestCase): + """Tests for :class:`sghi.etl.commons.SimpleWorkflowDefinition` class.""" + + def test_accessors_return_the_expected_values(self) -> None: + """:class:`SimpleWorkflowDefinition` accessors should return the + assigned values at creation. + """ + wf_def: WorkflowDefinition[Iterable[int], Iterable[str]] + wf_def = SimpleWorkflowDefinition( + id="test_workflow", + name="Test Workflow", + source_factory=_ints_supplier_factory, + description="A test workflow that takes ints and prints all them.", + processor_factory=_ints_to_str_mapper_factory, + sink_factory=_printer_factory, + ) + + assert wf_def.id == "test_workflow" + assert wf_def.name == "Test Workflow" + assert ( + wf_def.description + == "A test workflow that takes ints and prints all them." + ) + assert wf_def.source_factory is _ints_supplier_factory + assert wf_def.processor_factory is _ints_to_str_mapper_factory + assert wf_def.sink_factory is _printer_factory + + def test_instantiation_fails_on_none_str_id_argument(self) -> None: + """Instantiating a :class:`SimpleWorkflowDefinition` with a non-string + ``id`` argument should raise a :exc:`TypeError`. + """ + invalid_ids = (None, 1, [], _ints_supplier_factory) + + for invalid_id in invalid_ids: + with pytest.raises(TypeError, match="be a string") as exp_info: + SimpleWorkflowDefinition( + id=invalid_id, # type: ignore + name="Test Workflow", + source_factory=_ints_supplier_factory, + ) + + assert exp_info.value.args[0] == "'id' MUST be a string." + + def test_instantiation_fails_on_an_empty_str_id_argument(self) -> None: + """Instantiating a :class:`SimpleWorkflowDefinition` with an empty + ``id`` argument should raise a :exc:`ValueError`. + """ + with pytest.raises(ValueError, match="NOT be an empty") as exp_info: + SimpleWorkflowDefinition( + id="", + name="Test Workflow", + source_factory=_ints_supplier_factory, + ) + + assert exp_info.value.args[0] == "'id' MUST NOT be an empty string." 
+ + def test_instantiation_fails_on_none_str_name_argument(self) -> None: + """Instantiating a :class:`SimpleWorkflowDefinition` with a non-string + ``name`` argument should raise a :exc:`TypeError`. + """ + invalid_names = (None, 1, [], _ints_supplier_factory) + + for invalid_name in invalid_names: + with pytest.raises(TypeError, match="be a string") as exp_info: + SimpleWorkflowDefinition( + id="test_workflow", + name=invalid_name, # type: ignore + source_factory=_ints_supplier_factory, + ) + + assert exp_info.value.args[0] == "'name' MUST be a string." + + def test_instantiation_fails_on_an_empty_str_name_argument(self) -> None: + """Instantiating a :class:`SimpleWorkflowDefinition` with an empty + ``name`` argument should raise a :exc:`ValueError`. + """ + with pytest.raises(ValueError, match="NOT be an empty") as exp_info: + SimpleWorkflowDefinition( + id="test_workflow", + name="", + source_factory=_ints_supplier_factory, + ) + + assert exp_info.value.args[0] == "'name' MUST NOT be an empty string." + + def test_instantiation_fails_on_invalid_source_factory_arg(self) -> None: + """Instantiating a :class:`SimpleWorkflowDefinition` with a + non-callable ``source_factory`` argument should raise a + :exc:`ValueError`. + """ + invalid_src_factories = (None, 1, [], (), "not a callable") + + for invalid_src_factory in invalid_src_factories: + with pytest.raises(ValueError, match="be a callable") as exp_info: + SimpleWorkflowDefinition( + id="test_workflow", + name="Test Workflow", + source_factory=invalid_src_factory, # type: ignore + ) + + assert ( + exp_info.value.args[0] + == "'source_factory' MUST be a callable object." + ) + + def test_instantiation_fails_on_invalid_description_argument(self) -> None: + """Instantiating a :class:`SimpleWorkflowDefinition` with a non-string + and non-``None`` ``description`` should raise a :exc:`TypeError`. 
+ """ + invalid_descriptions = (1, (), [], _ints_supplier_factory) + + for invalid_description in invalid_descriptions: + with pytest.raises(TypeError, match="be a string") as exp_info: + SimpleWorkflowDefinition( + id="test_workflow", + name="Test Workflow", + source_factory=_ints_supplier_factory, + description=invalid_description, # type: ignore + ) + + assert ( + exp_info.value.args[0] + == "'description' MUST be a string when NOT None." + ) + + def test_instantiation_fails_on_invld_processor_factory_arg(self) -> None: + """Instantiating a :class:`SimpleWorkflowDefinition` with a + non-callable ``processor_factory`` argument should raise a + :exc:`ValueError`. + """ + invalid_prc_factories = (None, 1, [], (), "not a callable") + + for invalid_prc_factory in invalid_prc_factories: + with pytest.raises(ValueError, match="be a callable") as exp_info: + SimpleWorkflowDefinition( + id="test_workflow", + name="Test Workflow", + source_factory=_ints_supplier_factory, + processor_factory=invalid_prc_factory, # type: ignore + ) + + assert ( + exp_info.value.args[0] + == "'processor_factory' MUST be a callable object." + ) + + def test_instantiation_fails_on_invalid_sink_factory_arg(self) -> None: + """Instantiating a :class:`SimpleWorkflowDefinition` with a + non-callable ``sink_factory`` argument should raise a + :exc:`ValueError`. + """ + invalid_sink_factories = (None, 1, [], (), "not a callable") + + for invalid_sink_factory in invalid_sink_factories: + with pytest.raises(ValueError, match="be a callable") as exp_info: + SimpleWorkflowDefinition( + id="test_workflow", + name="Test Workflow", + source_factory=_ints_supplier_factory, + sink_factory=invalid_sink_factory, # type: ignore + ) + + assert ( + exp_info.value.args[0] + == "'sink_factory' MUST be a callable object." + )