Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(processors): add a NOOP Processor #3

Merged
merged 1 commit into from
Apr 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,19 @@
nitpicky = True

nitpick_ignore = [
("py:class", "TracebackType"), # Used as type annotation. Only available when type checking
("py:class", "concurrent.futures._base.Future"), # sphinx can't find it
("py:class", "sghi.etl.commons.processors._RDT"), # private type annotations
("py:class", "sghi.etl.commons.processors._PDT"), # private type annotations
("py:class", "sghi.etl.commons.utils.result_gatherers._T"), # private type annotations
("py:class", "sghi.etl.core._RDT"), # private type annotations
("py:class", "sghi.etl.core._PDT"), # private type annotations
("py:class", "sghi.etl.core.Processor"), # docs aren't published yet
("py:exc", "ResourceDisposedError"), # docs aren't published yet
("py:exc", "sghi.disposable.ResourceDisposedError"), # docs aren't published yet
("py:func", "sghi.disposable.not_disposed"), # docs aren't published yet
("py:obj", "sghi.etl.commons.processors._PDT"), # private type annotations
("py:obj", "sghi.etl.commons.processors._RDT"), # private type annotations
]

templates_path = ["templates"]
Expand Down
12 changes: 11 additions & 1 deletion src/sghi/etl/commons/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,11 @@
"""Collection of utilities for working with SGHI ETL Worflows."""
"""Collection of utilities for working with SGHI ETL Workflows."""

from .processors import NOOPProcessor
from .utils import fail_fast, fail_fast_factory, ignored_failed

__all__ = [
"NOOPProcessor",
"fail_fast",
"fail_fast_factory",
"ignored_failed",
]
117 changes: 117 additions & 0 deletions src/sghi/etl/commons/processors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
"""Common :class:`~sghi.etl.core.Processor` implementations."""

from __future__ import annotations

import logging
from logging import Logger
from typing import Generic, Self, TypeVar, final

from typing_extensions import override

from sghi.disposable import not_disposed
from sghi.etl.core import Processor
from sghi.utils import type_fqn

# =============================================================================
# TYPES
# =============================================================================


_PDT = TypeVar("_PDT") # noqa: PYI018
""""Type variable representing the data type after processing."""

_RDT = TypeVar("_RDT")
"""Type variable representing the raw data type."""


# =============================================================================
# PROCESSOR IMPLEMENTATIONS
# =============================================================================


@final
class NOOPProcessor(Processor[_RDT, _RDT], Generic[_RDT]):
"""A ``Processor`` that simply returns the received raw data as it is.

This :class:`~sghi.etl.core.Processor` subclass is a "no-operation" (NOOP)
processor. When the :meth:`apply` method is invoked on its instances, it
returns the received raw data (without any modifications) as processed data
to its caller. This can be useful as a placeholder processor or for
situations where data transformation is not needed.

.. admonition:: Regarding retry safety
:class: tip

Instances of this ``Processor`` are idempotent and thus inherently safe
to retry.
"""

__slots__ = ("_is_disposed", "_logger")

def __init__(self) -> None:
"""Create a ``NOOPProcessor`` instance."""
self._is_disposed: bool = False
self._logger: Logger = logging.getLogger(type_fqn(self.__class__))

@not_disposed
@override
def __enter__(self) -> Self:
"""Return ``self`` upon entering the runtime context.

.. admonition:: Don't use after dispose
:class: error

Invoking this method on an instance that is disposed(i.e. the
:attr:`is_disposed` property on the instance is ``True``) will
result in a :exc:`ResourceDisposedError` being raised.

:return: This instance.

:raise ResourceDisposedError: If this processor has already been
disposed.
"""
return super(Processor, self).__enter__()

@property
@override
def is_disposed(self) -> bool:
return self._is_disposed

@not_disposed
@override
def apply(self, raw_data: _RDT) -> _RDT:
"""Apply the processing logic (NOOP in this case).

Return the received raw data without any modifications to the caller.

.. admonition:: Don't use after dispose
:class: error

Invoking this method on an instance that is disposed(i.e. the
:attr:`is_disposed` property on the instance is ``True``) will
result in a :exc:`ResourceDisposedError` being raised.

:param raw_data: The data to be processed.

:return: The raw data itself (without any modifications).

:raise ResourceDisposedError: If this processor has already been
disposed.
"""
self._logger.info("Skipping data processing. Return raw data as is.")
return raw_data

@override
def dispose(self) -> None:
self._is_disposed = True
self._logger.info("Disposal complete.")


# =============================================================================
# MODULE EXPORTS
# =============================================================================


__all__ = [
"NOOPProcessor",
]
91 changes: 91 additions & 0 deletions test/sghi/etl/commons_tests/processors_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# ruff: noqa: D205
"""Tests for the :module:`sghi.etl.commons.processors` module."""

from __future__ import annotations

from unittest import TestCase

import pytest

from sghi.disposable import ResourceDisposedError
from sghi.etl.commons import NOOPProcessor


class TestNOOPProcessor(TestCase):
"""Tests for the :class:`sghi.etl.commons.NOOPProcessor` class."""

def test_apply_returns_the_expected_value(self) -> None:
""":meth:`NOOPProcessor.apply` should return its argument without any
modifications.
"""
raw_data1: list[str] = ["some", "very", "important", "raw", "data"]
raw_data2: str = "some very important raw data"
raw_data3: int = 37
raw_data4: str | None = None

instance = NOOPProcessor()

assert instance.apply(raw_data1) is raw_data1
assert instance.apply(raw_data2) is raw_data2
assert instance.apply(raw_data3) == raw_data3
assert instance.apply(raw_data4) is None

instance.dispose()

def test_dispose_has_the_intended_side_effects(self) -> None:
"""Calling :meth:`NOOPProcessor.dispose` should result in the
:attr:`NOOPProcessor.is_disposed` property being set to ``True``.
"""
instance = NOOPProcessor()
instance.dispose()

assert instance.is_disposed

def test_multiple_dispose_invocations_is_okay(self) -> None:
"""Calling :meth:`NOOPProcessor.dispose` should be okay.

No errors should be raised and the object should remain disposed.
"""
instance = NOOPProcessor()

for _ in range(10):
try:
instance.dispose()
except Exception as exc: # noqa: BLE001
fail_reason: str = (
"Calling 'NOOPProcessor.dispose()' multiple times should "
f"be okay. But the following error was raised: {exc!s}"
)
pytest.fail(fail_reason)

assert instance.is_disposed

def test_usage_as_a_context_manager_behaves_as_expected(self) -> None:
""":class:`NOOPProcessor` instances are valid context managers and
should behave correctly when used as so.
"""
raw_data: list[str] = ["some", "very", "important", "raw", "data"]
with NOOPProcessor() as processor:
clean_data = processor.apply(raw_data)
assert clean_data is raw_data

assert processor.is_disposed

def test_usage_when_is_disposed_fails(self) -> None:
"""Invoking "resource-aware" methods of a disposed instance should
result in an :exc:`ResourceDisposedError` being raised.

Specifically, invoking the following two methods on a disposed instance
should fail:

- :meth:`NOOPProcessor.__enter__`
- :meth:`NOOPProcessor.apply`
"""
instance = NOOPProcessor()
instance.dispose()

with pytest.raises(ResourceDisposedError):
instance.apply("some raw data.")

with pytest.raises(ResourceDisposedError):
instance.__enter__()
Loading