diff --git a/src/sghi/etl/commons/__init__.py b/src/sghi/etl/commons/__init__.py
index f1fd162..7742880 100644
--- a/src/sghi/etl/commons/__init__.py
+++ b/src/sghi/etl/commons/__init__.py
@@ -8,7 +8,7 @@
     pipe_processors,
     processor,
 )
-from .sinks import NullSink, sink
+from .sinks import NullSink, SplitSink, sink
 from .sources import GatherSource, source
 from .utils import fail_fast, fail_fast_factory, ignored_failed
 from .workflow_definitions import SimpleWorkflowDefinition
@@ -21,6 +21,7 @@
     "SimpleWorkflowDefinition",
     "ScatterGatherProcessor",
     "SplitGatherProcessor",
+    "SplitSink",
     "fail_fast",
     "fail_fast_factory",
     "ignored_failed",
diff --git a/src/sghi/etl/commons/sinks.py b/src/sghi/etl/commons/sinks.py
index 6494505..576b28d 100644
--- a/src/sghi/etl/commons/sinks.py
+++ b/src/sghi/etl/commons/sinks.py
@@ -3,7 +3,9 @@
 from __future__ import annotations
 
 import logging
-from collections.abc import Callable
+from collections.abc import Callable, Iterable, Sequence
+from concurrent.futures import Executor, Future, ThreadPoolExecutor
+from contextlib import ExitStack
 from functools import update_wrapper
 from logging import Logger
 from typing import Final, Generic, Self, TypeVar, final
@@ -12,7 +14,17 @@
 from sghi.disposable import not_disposed
 from sghi.etl.core import Sink
-from sghi.utils import ensure_callable, type_fqn
+from sghi.retry import Retry, noop_retry
+from sghi.task import ConcurrentExecutor, Task, task
+from sghi.utils import (
+    ensure_callable,
+    ensure_instance_of,
+    ensure_not_none_nor_empty,
+    ensure_predicate,
+    type_fqn,
+)
+
+from .utils import fail_fast
 
 # =============================================================================
 # TYPES
@@ -22,6 +34,10 @@
 _PDT = TypeVar("_PDT")
 """Type variable representing the data type after processing."""
 
+_T = TypeVar("_T")
+
+_ResultGatherer = Callable[[Iterable[Future[_T]]], Iterable[_T]]
+
 _SinkCallable = Callable[[_PDT], None]
 
@@ -153,6 +169,236 @@ def dispose(self) -> None:
         self._logger.info("Disposal complete.")
 
 
+@final
+class SplitSink(Sink[Sequence[_PDT]], Generic[_PDT]):
+    """A :class:`Sink` that splits processed data and drains the split data
+    to multiple sinks.
+
+    This ``Sink`` implementation takes aggregated processed data, splits it
+    into its constituent data parts, and then drains each data part to its
+    corresponding embedded sink concurrently. That is, each data part is
+    mapped to the embedded sink at the same index before all the embedded
+    sinks are executed concurrently. As such, the supplied processed data's
+    size must equal the number of embedded sinks contained by a sink of
+    this type. A result gatherer function can be provided to specify how to
+    handle errors while draining. A :class:`retry policy <sghi.retry.Retry>`
+    to handle transient draining errors to the embedded sinks can also be
+    provided. A suitable :class:`Executor` can be specified at instantiation
+    to control the concurrent draining to the embedded sinks.
+
+    Instances of this class are **NOT SAFE** to retry and **SHOULD NEVER**
+    be retried. However, they do support retrying their embedded sinks. This
+    is disabled by default but can be enabled by providing a suitable value
+    to the ``retry_policy_factory`` constructor parameter when creating new
+    instances. When enabled, each embedded sink will be retried individually
+    per the specified retry policy in case it fails.
+
+    Disposing instances of this class also disposes of their embedded sinks.
+
+    .. admonition:: Regarding retry safety
+        :class: tip
+
+        Instances of this ``Sink`` are **NOT SAFE** to retry.
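+
+    A minimal usage sketch follows (the two ``print``-based sinks below are
+    illustrative assumptions, not part of this library):
+
+    .. code-block:: python
+
+        from sghi.etl.commons import SplitSink, sink
+
+        @sink
+        def save_to_db(value: int) -> None:
+            print(f"db <- {value}")
+
+        @sink
+        def save_to_file(value: int) -> None:
+            print(f"file <- {value}")
+
+        # Each data part is drained to the embedded sink at the same index,
+        # so the input's size must equal the number of embedded sinks.
+        with SplitSink(sinks=[save_to_db, save_to_file]) as split_sink:
+            split_sink.drain([10, 20])  # 10 -> db, 20 -> file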
+ """ # noqa: D205 + + __slots__ = ( + "_sinks", + "_retry_policy_factory", + "_executor_factory", + "_result_gatherer", + "_is_disposed", + "_logger", + "_exit_stack", + "_prepped_sinks", + "_executor", + ) + + def __init__( + self, + sinks: Sequence[Sink[_PDT]], + retry_policy_factory: Callable[[], Retry] = noop_retry, + executor_factory: Callable[[], Executor] = ThreadPoolExecutor, + result_gatherer: _ResultGatherer[None] = fail_fast, + ) -> None: + """Create a new ``SplitSink`` of the given properties. + + :param sinks: A ``Sequence`` of sinks to drain each processed data part + to. These sinks are also referred to as the embedded sinks. The + given ``Sequence`` *MUST NOT* be empty. + :param retry_policy_factory: A callable that supplies retry policy + instance(s) to apply to each embedded sink. This *MUST* be a valid + callable object. Defaults to a factory that returns retry policies + that do nothing. + :param executor_factory: A callable that suppliers suitable + ``Executor`` instance(s) to use for the concurrent draining. This + *MUST* be a valid callable object. Defaults to a factory that + returns ``ThreadPoolExecutor`` instances. + :param result_gatherer: A function that specifies how to handle + draining errors. This *MUST* be a valid callable object. Defaults + to a gatherer that fails if draining to any of the embedded sinks + failed, or returns silently otherwise. + + :raise TypeError: If ``sinks`` is NOT a ``Sequence``. + :raise ValueError: If ``sinks`` is empty or if + ``retry_policy_factory``, ``executor_factory`` and + ``result_gatherer`` are NOT callable objects. + """ + super().__init__() + ensure_not_none_nor_empty( + value=ensure_instance_of( + value=sinks, + message="'sinks' MUST be a collections.abc.Sequence object.", + klass=Sequence, + ), + message="'sinks' MUST NOT be empty.", + ) + self._sinks: Sequence[Sink[_PDT]] = tuple(sinks) + self._retry_policy_factory: Callable[[], Retry] = ensure_callable( + value=retry_policy_factory, + message="'retry_policy_factory' MUST be a callable.", + ) + self._executor_factory: Callable[[], Executor] = ensure_callable( + value=executor_factory, + message="'executor_factory' MUST be a callable.", + ) + self._result_gatherer: _ResultGatherer[None] = ensure_callable( + value=result_gatherer, + message="'result_gatherer' MUST be a callable.", + ) + self._is_disposed: bool = False + self._logger: Logger = logging.getLogger(type_fqn(self.__class__)) + self._exit_stack: ExitStack = ExitStack() + + # Prepare embedded sinks for execution by ensuring that they are all + # disposed of properly once this object is disposed. + self._prepped_sinks: Sequence[Task[Sequence[_PDT], None]] = tuple( + self._sink_to_task(self._exit_stack.push(_sink), _i) + for _i, _sink in enumerate(self._sinks) + ) + self._executor: ConcurrentExecutor[Sequence[_PDT], None] + self._executor = ConcurrentExecutor( + *self._prepped_sinks, executor=self._executor_factory() + ) + + @not_disposed + @override + def __enter__(self) -> Self: + """Return ``self`` upon entering the runtime context. + + .. admonition:: Don't use after dispose + :class: error + + Invoking this method on an instance that is disposed(i.e. the + :attr:`is_disposed` property on the instance is ``True``) will + result in a :exc:`ResourceDisposedError` being raised. + + :return: This instance. + + :raise ResourceDisposedError: If this sink has already been disposed. 
+ """ + return super(Sink, self).__enter__() + + @property + @override + def is_disposed(self) -> bool: + return self._is_disposed + + @not_disposed + @override + def drain(self, processed_data: Sequence[_PDT]) -> None: + """Split the supplied processed data and consume each data part. + + This method decomposes the provided aggregated processed data into its + constituent data parts and maps each data part to an embedded sink; + before executing all the embedded sinks concurrently. It then applies + the result-gatherer function assigned to this instance (at creation) to + the :class:`Future` objects resulting from the concurrent execution. + Each of these ``Future`` objects wraps the result of draining each data + part to an embedded sink contained in this ``SplitSink``, and they + preserve the same order. + + .. admonition:: Don't use after dispose + :class: error + + Invoking this method on an instance that is disposed(i.e. the + :attr:`is_disposed` property on the instance is ``True``) will + result in a :exc:`ResourceDisposedError` being raised. + + :param processed_data: The aggregated processed data to consume. This + *MUST* be a ``Sequence`` of processed data values whose size *MUST + EQUAL* the number of embedded sinks contained by this + ``SplitSink``. + + :return: None. + + :raise ResourceDisposedError: If this sink has already been disposed. + :raise TypeError: If ``processed_data`` *IS NOT* a ``Sequence``. + :raise ValueError: If the size of ``processed_data`` *DOES NOT EQUAL* + the number of embedded sinks in this ``SplitSink``. + """ + ensure_instance_of( + value=processed_data, + klass=Sequence, + message=( + "'processed_data' MUST be a collections.abc.Sequence object." + ), + ) + ensure_predicate( + test=len(processed_data) == len(self._sinks), + message=( + f"Expected 'processed_data' to be of size {len(self._sinks)} " + f"but got size {len(processed_data)} instead." + ), + exc_factory=ValueError, + ) + self._logger.info( + "Splitting aggregated processed data and draining each data part " + "to all available sinks." + ) + + with self._executor as executor: + futures = executor.execute(processed_data) + + self._result_gatherer(futures) + + @override + def dispose(self) -> None: + """Release any underlying resources contained by this sink. + + All embedded sinks are also disposed. After this method returns + successfully, the :attr:`is_disposed` property should return ``True``. + + .. note:: + Unless otherwise specified, trying to use methods of a + ``Disposable`` instance decorated with the + :func:`~sghi.disposable.not_disposed` decorator after this method + returns should generally be considered a programming error and + should result in a :exc:`~sghi.disposable.ResourceDisposedError` + being raised. + + This method should be idempotent allowing it to be called more + than once; only the first call, however, should have an effect. + + :return: None. 
+ """ + self._is_disposed = True + self._exit_stack.close() + self._executor.dispose() + self._logger.info("Disposal complete.") + + def _sink_to_task( + self, + s: Sink[_PDT], + i: int, + ) -> Task[Sequence[_PDT], None]: + @task + def do_drain(processed_data: Sequence[_PDT]) -> None: + with s as _s: + drain = self._retry_policy_factory().retry(_s.drain) + return drain(processed_data[i]) + + return do_drain + + @final class _SinkOfCallable(Sink[_PDT], Generic[_PDT]): __slots__ = ("_delegate_to", "_is_disposed", "_logger") @@ -227,5 +473,6 @@ def dispose(self) -> None: __all__ = [ "NullSink", + "SplitSink", "sink", ] diff --git a/test/sghi/etl/commons_tests/sinks_tests.py b/test/sghi/etl/commons_tests/sinks_tests.py index e5e6035..dff3b82 100644 --- a/test/sghi/etl/commons_tests/sinks_tests.py +++ b/test/sghi/etl/commons_tests/sinks_tests.py @@ -7,13 +7,14 @@ from unittest import TestCase import pytest +from typing_extensions import override from sghi.disposable import ResourceDisposedError -from sghi.etl.commons import NullSink, sink +from sghi.etl.commons import NullSink, SplitSink, sink from sghi.etl.core import Sink if TYPE_CHECKING: - from collections.abc import Iterable, MutableSequence + from collections.abc import Iterable, MutableSequence, MutableSet, Sequence def test_sink_decorator_delegates_to_the_wrapped_callable() -> None: @@ -166,3 +167,204 @@ def test_usage_when_is_disposed_fails(self) -> None: with pytest.raises(ResourceDisposedError): instance.__enter__() + + +class TestSplitSink(TestCase): + """Tests for the :class:`sghi.etl.commons.SplitSink` class.""" + + @override + def setUp(self) -> None: + super().setUp() + self._repository1: MutableSequence[int] = [] + self._repository2: MutableSet[int] = set() + + @sink + def save_ordered(value: int) -> None: + self._repository1.append(value) + + @sink + def save_randomly(value: int) -> None: + self._repository2.add(value) + + self._embedded_sinks: Sequence[Sink[int]] = [ + save_ordered, + save_randomly, + ] + self._instance: SplitSink[int] = SplitSink(self._embedded_sinks) + + @override + def tearDown(self) -> None: + super().tearDown() + self._instance.dispose() + + def test_dispose_has_the_intended_side_effects(self) -> None: + """Calling :meth:`SplitSink.dispose` should result in the + :attr:`SplitSink.is_disposed` property being set to ``True``. + + Each embedded ``Sink`` should also be disposed. + """ + self._instance.dispose() + + assert self._instance.is_disposed + for _sink in self._embedded_sinks: + assert _sink.is_disposed + + def test_drain_fails_on_non_sequence_input_value(self) -> None: + """:meth:`SplitSink.drain` should raise a :exc:`TypeError` when invoked + with a non ``Sequence`` argument. + """ + values = (None, 67, self._embedded_sinks[0]) + for value in values: + with pytest.raises(TypeError, match="Sequence") as exp_info: + self._instance.drain(value) # type: ignore + + assert ( + exp_info.value.args[0] + == "'processed_data' MUST be a collections.abc.Sequence " + "object." + ) + + def test_drain_fails_when_input_size_not_equal_no_of_embedded_sinks( + self, + ) -> None: + """:meth:`SplitSink.drain` should raise a :exc:`ValueError` when + invoked with an argument whose size does not equal the number of + embedded sinks. 
+ """ + values = ([], list(range(5))) + for value in values: + with pytest.raises(ValueError, match="but got size") as exp_info: + self._instance.drain(value) + + assert exp_info.value.args[0] == ( + "Expected 'processed_data' to be of size " + f"{len(self._embedded_sinks)} " + f"but got size {len(value)} instead." + ) + + def test_drain_side_effects(self) -> None: + """:meth:`SplitSink.drain` should split the supplied processed data + into constituent data parts; then drain each data part to each embedded + sink. + """ + self._instance.drain([10, 50]) + + assert len(self._repository1) == 1 + assert len(self._repository2) == 1 + assert self._repository1[0] == 10 + assert 50 in self._repository2 + + def test_instantiation_fails_on_an_empty_processors_arg(self) -> None: + """Instantiating a :class:`SplitSink` with an empty ``sinks`` argument + should raise a :exc:`ValueError`. + """ + with pytest.raises(ValueError, match="NOT be empty") as exp_info: + SplitSink(sinks=[]) + + assert exp_info.value.args[0] == "'sinks' MUST NOT be empty." + + def test_instantiation_fails_on_non_callable_executor_factory_arg( + self, + ) -> None: + """Instantiating a :class:`SplitSink` with a non-callable value for the + ``executor_factory`` argument should raise a :exc:`ValueError`. + """ + with pytest.raises(ValueError, match="MUST be a callable") as exp_info: + SplitSink(sinks=self._embedded_sinks, executor_factory=None) # type: ignore + + assert ( + exp_info.value.args[0] == "'executor_factory' MUST be a callable." + ) + + def test_instantiation_fails_on_non_callable_result_gatherer_arg( + self, + ) -> None: + """Instantiating a :class:`SplitSink` with a non-callable value for the + ``result_gatherer`` argument should raise a :exc:`ValueError`. + """ + with pytest.raises(ValueError, match="MUST be a callable") as exp_info: + SplitSink(sinks=self._embedded_sinks, result_gatherer=None) # type: ignore + + assert ( + exp_info.value.args[0] == "'result_gatherer' MUST be a callable." + ) + + def test_instantiation_fails_on_non_callable_retry_policy_factory_arg( + self, + ) -> None: + """Instantiating a :class:`SplitSink` with a non-callable value for the + ``retry_policy_factory`` argument should raise a :exc:`ValueError`. + """ + with pytest.raises(ValueError, match="MUST be a callable") as exp_info: + SplitSink(sinks=self._embedded_sinks, retry_policy_factory=None) # type: ignore + + assert ( + exp_info.value.args[0] + == "'retry_policy_factory' MUST be a callable." + ) + + def test_instantiation_fails_on_non_sequence_sinks_arg(self) -> None: + """Instantiating a :class:`SplitSink` with a non ``Sequence`` ``sinks`` + argument should raise a :exc:`TypeError`. + """ + values = (None, 67, self._embedded_sinks[0]) + for value in values: + with pytest.raises(TypeError, match="Sequence") as exp_info: + SplitSink(sinks=value) # type: ignore + + assert ( + exp_info.value.args[0] + == "'sinks' MUST be a collections.abc.Sequence object." + ) + + def test_multiple_dispose_invocations_is_okay(self) -> None: + """Calling :meth:`SplitSink.dispose` multiple times should be okay. + + No errors should be raised and the object should remain disposed. + """ + for _ in range(10): + try: + self._instance.dispose() + except Exception as exc: # noqa: BLE001 + fail_reason: str = ( + "Calling 'SplitSink.dispose()' multiple times should be " + f"okay. 
But the following error was raised: {exc!s}" + ) + pytest.fail(fail_reason) + + assert self._instance.is_disposed + for _sinks in self._embedded_sinks: + assert _sinks.is_disposed + + def test_usage_as_a_context_manager_behaves_as_expected(self) -> None: + """:class:`SplitSink` instances are valid context managers and should + behave correctly when used as so. + """ + with self._instance: + self._instance.drain([-100, 0]) + assert len(self._repository1) == 1 + assert len(self._repository2) == 1 + assert self._repository1[0] == -100 + assert 0 in self._repository2 + + assert self._instance.is_disposed + for _sink in self._embedded_sinks: + assert _sink.is_disposed + + def test_usage_when_is_disposed_fails(self) -> None: + """Invoking "resource-aware" methods of a disposed instance should + result in an :exc:`ResourceDisposedError` being raised. + + Specifically, invoking the following two methods on a disposed instance + should fail: + + - :meth:`SplitSink.__enter__` + - :meth:`SplitSink.drain` + """ + self._instance.dispose() + + with pytest.raises(ResourceDisposedError): + self._instance.drain([10, 10]) + + with pytest.raises(ResourceDisposedError): + self._instance.__enter__()