From 7ad1deb2fd31f8cad3c26fe16546a122c8ebcfb0 Mon Sep 17 00:00:00 2001 From: Kennedy Kori Date: Sun, 14 Apr 2024 16:13:05 +0300 Subject: [PATCH] feat(processors): add a NOOP `Processor` Add a new `sghi.etl.core.Processor` implementation, `NOOPProcessor`. This processor simply returns any data it receives without any modifications. It can be used as a placeholder or for situations where data transformation is not needed. --- docs/conf.py | 11 ++ src/sghi/etl/commons/__init__.py | 12 +- src/sghi/etl/commons/processors.py | 117 ++++++++++++++++++ .../etl/commons_tests/processors_tests.py | 91 ++++++++++++++ 4 files changed, 230 insertions(+), 1 deletion(-) create mode 100644 src/sghi/etl/commons/processors.py create mode 100644 test/sghi/etl/commons_tests/processors_tests.py diff --git a/docs/conf.py b/docs/conf.py index 4f1f7c8..9d4b4dd 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -56,8 +56,19 @@ nitpicky = True nitpick_ignore = [ + ("py:class", "TracebackType"), # Used as type annotation. 
Only available when type checking ("py:class", "concurrent.futures._base.Future"), # sphinx can't find it + ("py:class", "sghi.etl.commons.processors._RDT"), # private type annotations + ("py:class", "sghi.etl.commons.processors._PDT"), # private type annotations ("py:class", "sghi.etl.commons.utils.result_gatherers._T"), # private type annotations + ("py:class", "sghi.etl.core._RDT"), # private type annotations + ("py:class", "sghi.etl.core._PDT"), # private type annotations + ("py:class", "sghi.etl.core.Processor"), # docs aren't published yet + ("py:exc", "ResourceDisposedError"), # docs aren't published yet + ("py:exc", "sghi.disposable.ResourceDisposedError"), # docs aren't published yet + ("py:func", "sghi.disposable.not_disposed"), # docs aren't published yet + ("py:obj", "sghi.etl.commons.processors._PDT"), # private type annotations + ("py:obj", "sghi.etl.commons.processors._RDT"), # private type annotations ] templates_path = ["templates"] diff --git a/src/sghi/etl/commons/__init__.py b/src/sghi/etl/commons/__init__.py index b89986d..5cc051d 100644 --- a/src/sghi/etl/commons/__init__.py +++ b/src/sghi/etl/commons/__init__.py @@ -1 +1,11 @@ -"""Collection of utilities for working with SGHI ETL Worflows.""" +"""Collection of utilities for working with SGHI ETL Workflows.""" + +from .processors import NOOPProcessor +from .utils import fail_fast, fail_fast_factory, ignored_failed + +__all__ = [ + "NOOPProcessor", + "fail_fast", + "fail_fast_factory", + "ignored_failed", +] diff --git a/src/sghi/etl/commons/processors.py b/src/sghi/etl/commons/processors.py new file mode 100644 index 0000000..294b534 --- /dev/null +++ b/src/sghi/etl/commons/processors.py @@ -0,0 +1,117 @@ +"""Common :class:`~sghi.etl.core.Processor` implementations.""" + +from __future__ import annotations + +import logging +from logging import Logger +from typing import Generic, Self, TypeVar, final + +from typing_extensions import override + +from sghi.disposable import not_disposed +from 
sghi.etl.core import Processor +from sghi.utils import type_fqn + +# ============================================================================= +# TYPES +# ============================================================================= + + +_PDT = TypeVar("_PDT") # noqa: PYI018 +"""Type variable representing the data type after processing.""" + +_RDT = TypeVar("_RDT") +"""Type variable representing the raw data type.""" + + +# ============================================================================= +# PROCESSOR IMPLEMENTATIONS +# ============================================================================= + + +@final +class NOOPProcessor(Processor[_RDT, _RDT], Generic[_RDT]): + """A ``Processor`` that simply returns the received raw data as it is. + + This :class:`~sghi.etl.core.Processor` subclass is a "no-operation" (NOOP) + processor. When the :meth:`apply` method is invoked on its instances, it + returns the received raw data (without any modifications) as processed data + to its caller. This can be useful as a placeholder processor or for + situations where data transformation is not needed. + + .. admonition:: Regarding retry safety + :class: tip + + Instances of this ``Processor`` are idempotent and thus inherently safe + to retry. + """ + + __slots__ = ("_is_disposed", "_logger") + + def __init__(self) -> None: + """Create a ``NOOPProcessor`` instance.""" + self._is_disposed: bool = False + self._logger: Logger = logging.getLogger(type_fqn(self.__class__)) + + @not_disposed + @override + def __enter__(self) -> Self: + """Return ``self`` upon entering the runtime context. + + .. admonition:: Don't use after dispose + :class: error + + Invoking this method on an instance that is disposed (i.e. the + :attr:`is_disposed` property on the instance is ``True``) will + result in a :exc:`ResourceDisposedError` being raised. + + :return: This instance. + + :raise ResourceDisposedError: If this processor has already been + disposed.
+ """ + return super(Processor, self).__enter__() + + @property + @override + def is_disposed(self) -> bool: + return self._is_disposed + + @not_disposed + @override + def apply(self, raw_data: _RDT) -> _RDT: + """Apply the processing logic (NOOP in this case). + + Return the received raw data without any modifications to the caller. + + .. admonition:: Don't use after dispose + :class: error + + Invoking this method on an instance that is disposed (i.e. the + :attr:`is_disposed` property on the instance is ``True``) will + result in a :exc:`ResourceDisposedError` being raised. + + :param raw_data: The data to be processed. + + :return: The raw data itself (without any modifications). + + :raise ResourceDisposedError: If this processor has already been + disposed. + """ + self._logger.info("Skipping data processing. Return raw data as is.") + return raw_data + + @override + def dispose(self) -> None: + self._is_disposed = True + self._logger.info("Disposal complete.") + + +# ============================================================================= +# MODULE EXPORTS +# ============================================================================= + + +__all__ = [ + "NOOPProcessor", +] diff --git a/test/sghi/etl/commons_tests/processors_tests.py b/test/sghi/etl/commons_tests/processors_tests.py new file mode 100644 index 0000000..991120e --- /dev/null +++ b/test/sghi/etl/commons_tests/processors_tests.py @@ -0,0 +1,91 @@ +# ruff: noqa: D205 +"""Tests for the :mod:`sghi.etl.commons.processors` module.""" + +from __future__ import annotations + +from unittest import TestCase + +import pytest + +from sghi.disposable import ResourceDisposedError +from sghi.etl.commons import NOOPProcessor + + +class TestNOOPProcessor(TestCase): + """Tests for the :class:`sghi.etl.commons.NOOPProcessor` class.""" + + def test_apply_returns_the_expected_value(self) -> None: + """:meth:`NOOPProcessor.apply` should return its argument without any + modifications.
+ """ + raw_data1: list[str] = ["some", "very", "important", "raw", "data"] + raw_data2: str = "some very important raw data" + raw_data3: int = 37 + raw_data4: str | None = None + + instance = NOOPProcessor() + + assert instance.apply(raw_data1) is raw_data1 + assert instance.apply(raw_data2) is raw_data2 + assert instance.apply(raw_data3) == raw_data3 + assert instance.apply(raw_data4) is None + + instance.dispose() + + def test_dispose_has_the_intended_side_effects(self) -> None: + """Calling :meth:`NOOPProcessor.dispose` should result in the + :attr:`NOOPProcessor.is_disposed` property being set to ``True``. + """ + instance = NOOPProcessor() + instance.dispose() + + assert instance.is_disposed + + def test_multiple_dispose_invocations_is_okay(self) -> None: + """Calling :meth:`NOOPProcessor.dispose` multiple times should be okay. + + No errors should be raised and the object should remain disposed. + """ + instance = NOOPProcessor() + + for _ in range(10): + try: + instance.dispose() + except Exception as exc: # noqa: BLE001 + fail_reason: str = ( + "Calling 'NOOPProcessor.dispose()' multiple times should " + f"be okay. But the following error was raised: {exc!s}" + ) + pytest.fail(fail_reason) + + assert instance.is_disposed + + def test_usage_as_a_context_manager_behaves_as_expected(self) -> None: + """:class:`NOOPProcessor` instances are valid context managers and + should behave correctly when used as such. + """ + raw_data: list[str] = ["some", "very", "important", "raw", "data"] + with NOOPProcessor() as processor: + clean_data = processor.apply(raw_data) + assert clean_data is raw_data + + assert processor.is_disposed + + def test_usage_when_is_disposed_fails(self) -> None: + """Invoking "resource-aware" methods of a disposed instance should + result in a :exc:`ResourceDisposedError` being raised.
+ + Specifically, invoking the following two methods on a disposed instance + should fail: + + - :meth:`NOOPProcessor.__enter__` + - :meth:`NOOPProcessor.apply` + """ + instance = NOOPProcessor() + instance.dispose() + + with pytest.raises(ResourceDisposedError): + instance.apply("some raw data.") + + with pytest.raises(ResourceDisposedError): + instance.__enter__()