Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(worflow_def): add a SimpleWorkflowDefinition
Browse files Browse the repository at this point in the history
Add a simple, `sghi.etl.core.WorkflowDefinition` implementation.
kennedykori committed Apr 16, 2024
1 parent 82a006e commit 5ab3041
Showing 6 changed files with 387 additions and 2 deletions.
8 changes: 8 additions & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
@@ -56,21 +56,27 @@
nitpicky = True

nitpick_ignore = [
("py:class", "_RDT"), # private type annotations
("py:class", "_PDT"), # private type annotations
("py:class", "Processor"), # docs aren't published yet
("py:class", "Sink"), # docs aren't published yet
("py:class", "Source"), # docs aren't published yet
("py:class", "WorkflowDefinition"), # docs aren't published yet
("py:class", "TracebackType"), # Used as type annotation. Only available when type checking
("py:class", "concurrent.futures._base.Future"), # sphinx can't find it
("py:class", "sghi.etl.commons.processors._RDT"), # private type annotations
("py:class", "sghi.etl.commons.processors._PDT"), # private type annotations
("py:class", "sghi.etl.commons.sinks._PDT"), # private type annotations
("py:class", "sghi.etl.commons.sources._RDT"), # private type annotations
("py:class", "sghi.etl.commons.utils.result_gatherers._T"), # private type annotations
("py:class", "sghi.etl.commons.workflow_definitions._RDT"), # private type annotations
("py:class", "sghi.etl.commons.workflow_definitions._PDT"), # private type annotations
("py:class", "sghi.etl.core._RDT"), # private type annotations
("py:class", "sghi.etl.core._PDT"), # private type annotations
("py:class", "sghi.etl.core.Processor"), # docs aren't published yet
("py:class", "sghi.etl.core.Sink"), # docs aren't published yet
("py:class", "sghi.etl.core.Source"), # docs aren't published yet
("py:class", "sghi.etl.core.WorkflowDefinition"), # docs aren't published yet
("py:class", "sghi.retry.Retry"), # docs aren't published yet
("py:exc", "ResourceDisposedError"), # docs aren't published yet
("py:exc", "sghi.disposable.ResourceDisposedError"), # docs aren't published yet
@@ -79,6 +85,8 @@
("py:obj", "sghi.etl.commons.processors._RDT"), # private type annotations
("py:obj", "sghi.etl.commons.sinks._PDT"), # private type annotations
("py:obj", "sghi.etl.commons.sources._RDT"), # private type annotations
("py:obj", "sghi.etl.commons.workflow_definitions._RDT"), # private type annotations
("py:obj", "sghi.etl.commons.workflow_definitions._PDT"), # private type annotations
]

templates_path = ["templates"]
2 changes: 2 additions & 0 deletions src/sghi/etl/commons/__init__.py
Original file line number Diff line number Diff line change
@@ -9,11 +9,13 @@
from .sinks import NullSink, sink
from .sources import source
from .utils import fail_fast, fail_fast_factory, ignored_failed
from .workflow_definitions import SimpleWorkflowDefinition

__all__ = [
"NOOPProcessor",
"NullSink",
"ProcessorPipe",
"SimpleWorkflowDefinition",
"fail_fast",
"fail_fast_factory",
"ignored_failed",
2 changes: 1 addition & 1 deletion src/sghi/etl/commons/processors.py
Original file line number Diff line number Diff line change
@@ -23,7 +23,7 @@


_PDT = TypeVar("_PDT")
""""Type variable representing the data type after processing."""
"""Type variable representing the data type after processing."""

_RDT = TypeVar("_RDT")
"""Type variable representing the raw data type."""
2 changes: 1 addition & 1 deletion src/sghi/etl/commons/sinks.py
Original file line number Diff line number Diff line change
@@ -20,7 +20,7 @@


_PDT = TypeVar("_PDT")
""""Type variable representing the data type after processing."""
"""Type variable representing the data type after processing."""

_SinkCallable = Callable[[_PDT], None]

162 changes: 162 additions & 0 deletions src/sghi/etl/commons/workflow_definitions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
"""Common :class:`sghi.etl.core.WorkflowDefinition` implementations."""

from __future__ import annotations

from typing import TYPE_CHECKING, Generic, TypeVar, final

from typing_extensions import override

from sghi.etl.core import Processor, Sink, Source, WorkflowDefinition
from sghi.utils import (
ensure_callable,
ensure_instance_of,
ensure_not_none_nor_empty,
ensure_optional_instance_of,
)

from .processors import NOOPProcessor
from .sinks import NullSink

if TYPE_CHECKING:
from collections.abc import Callable


# =============================================================================
# TYPES
# =============================================================================


_PDT = TypeVar("_PDT")
"""Type variable representing the data type after processing."""

_RDT = TypeVar("_RDT")
"""Type variable representing the raw data type."""


# =============================================================================
# SPEC IMPLEMENTATIONS
# =============================================================================


@final
class SimpleWorkflowDefinition(
WorkflowDefinition[_RDT, _PDT],
Generic[_RDT, _PDT],
):
"""A simple :class:`WorkflowDefinition` implementation."""

def __init__(
self,
id: str, # noqa: A002
name: str,
source_factory: Callable[[], Source[_RDT]],
description: str | None = None,
processor_factory: Callable[[], Processor[_RDT, _PDT]] = NOOPProcessor,
sink_factory: Callable[[], Sink[_PDT]] = NullSink,
) -> None:
"""Create a new ``WorkflowDefinition`` with the provided properties.
The ``id``, ``name`` and ``source_factory`` parameters are mandatory
and MUST be valid (see individual parameter docs for details).
Providing invalid parameters will lead to either a :exc:`TypeError` (
for values of the wrong type) or :exc:`ValueError` being raised.
:param id: A unique identifier to assign to the created workflow.
This MUST be a non-empty string.
:param name: A name to assign to the created workflow. This MUST be
a non-empty string.
:param source_factory: A function that suppliers the ``Source``
associated with the created workflow. This MUST be a valid
callable.
:param description: An optional description to assign to the created
workflow. This MUST be a valid string when NOT ``None``. Defaults
to ``None`` when not provided.
:param processor_factory: An optional function that suppliers the
``Processor`` associated with the created workflow. This MUST be a
valid callable. Defaults to ``NOOPProcessor`` when not provided.
:param sink_factory: A function that suppliers the ``Sink`` associated
with the created workflow. This MUST be a valid callable. Defaults
to ``NullSink`` when not provided.
:raise TypeError: If ``id`` or ``name`` are NOT strings, or if
``description`` is provided but is NOT a string.
:raise ValueError: If one of the following is ``True``; ``id`` is an
empty string, ``name`` is an empty string, ``source_factory`` is
NOT a valid callable, ``processor_factory`` is NOT a valid callable
or ``sink_factory`` is NOT a valid callable.
"""
super().__init__()
self._id: str = ensure_not_none_nor_empty(
value=ensure_instance_of(
value=id,
klass=str,
message="'id' MUST be a string.",
),
message="'id' MUST NOT be an empty string.",
)
self._name: str = ensure_not_none_nor_empty(
value=ensure_instance_of(
value=name,
klass=str,
message="'name' MUST be a string.",
),
message="'name' MUST NOT be an empty string.",
)
self._source_factory: Callable[[], Source[_RDT]] = ensure_callable(
value=source_factory,
message="'source_factory' MUST be a callable object.",
)
self._description: str | None = ensure_optional_instance_of(
value=description,
klass=str,
message="'description' MUST be a string when NOT None.",
)
self._processor_factory: Callable[[], Processor[_RDT, _PDT]]
self._processor_factory = ensure_callable(
value=processor_factory,
message="'processor_factory' MUST be a callable object.",
)
self._sink_factory: Callable[[], Sink[_PDT]] = ensure_callable(
value=sink_factory,
message="'sink_factory' MUST be a callable object.",
)

@property
@override
def id(self) -> str:
return self._id

@property
@override
def name(self) -> str:
return self._name

@property
@override
def description(self) -> str | None:
return self._description

@property
@override
def source_factory(self) -> Callable[[], Source[_RDT]]:
return self._source_factory

@property
@override
def processor_factory(self) -> Callable[[], Processor[_RDT, _PDT]]:
return self._processor_factory

@property
@override
def sink_factory(self) -> Callable[[], Sink[_PDT]]:
return self._sink_factory


# =============================================================================
# MODULE EXPORTS
# =============================================================================


__all__ = [
"SimpleWorkflowDefinition",
]
Loading

0 comments on commit 5ab3041

Please sign in to comment.