Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(worflow_def): add a SimpleWorkflowDefinition #11

Merged
merged 1 commit into from
Apr 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,21 +56,27 @@
nitpicky = True

nitpick_ignore = [
("py:class", "_RDT"), # private type annotations
("py:class", "_PDT"), # private type annotations
("py:class", "Processor"), # docs aren't published yet
("py:class", "Sink"), # docs aren't published yet
("py:class", "Source"), # docs aren't published yet
("py:class", "WorkflowDefinition"), # docs aren't published yet
("py:class", "TracebackType"), # Used as type annotation. Only available when type checking
("py:class", "concurrent.futures._base.Future"), # sphinx can't find it
("py:class", "sghi.etl.commons.processors._RDT"), # private type annotations
("py:class", "sghi.etl.commons.processors._PDT"), # private type annotations
("py:class", "sghi.etl.commons.sinks._PDT"), # private type annotations
("py:class", "sghi.etl.commons.sources._RDT"), # private type annotations
("py:class", "sghi.etl.commons.utils.result_gatherers._T"), # private type annotations
("py:class", "sghi.etl.commons.workflow_definitions._RDT"), # private type annotations
("py:class", "sghi.etl.commons.workflow_definitions._PDT"), # private type annotations
("py:class", "sghi.etl.core._RDT"), # private type annotations
("py:class", "sghi.etl.core._PDT"), # private type annotations
("py:class", "sghi.etl.core.Processor"), # docs aren't published yet
("py:class", "sghi.etl.core.Sink"), # docs aren't published yet
("py:class", "sghi.etl.core.Source"), # docs aren't published yet
("py:class", "sghi.etl.core.WorkflowDefinition"), # docs aren't published yet
("py:class", "sghi.retry.Retry"), # docs aren't published yet
("py:exc", "ResourceDisposedError"), # docs aren't published yet
("py:exc", "sghi.disposable.ResourceDisposedError"), # docs aren't published yet
Expand All @@ -79,6 +85,8 @@
("py:obj", "sghi.etl.commons.processors._RDT"), # private type annotations
("py:obj", "sghi.etl.commons.sinks._PDT"), # private type annotations
("py:obj", "sghi.etl.commons.sources._RDT"), # private type annotations
("py:obj", "sghi.etl.commons.workflow_definitions._RDT"), # private type annotations
("py:obj", "sghi.etl.commons.workflow_definitions._PDT"), # private type annotations
]

templates_path = ["templates"]
Expand Down
2 changes: 2 additions & 0 deletions src/sghi/etl/commons/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@
from .sinks import NullSink, sink
from .sources import source
from .utils import fail_fast, fail_fast_factory, ignored_failed
from .workflow_definitions import SimpleWorkflowDefinition

__all__ = [
"NOOPProcessor",
"NullSink",
"ProcessorPipe",
"SimpleWorkflowDefinition",
"fail_fast",
"fail_fast_factory",
"ignored_failed",
Expand Down
2 changes: 1 addition & 1 deletion src/sghi/etl/commons/processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@


_PDT = TypeVar("_PDT")
""""Type variable representing the data type after processing."""
"""Type variable representing the data type after processing."""

_RDT = TypeVar("_RDT")
"""Type variable representing the raw data type."""
Expand Down
2 changes: 1 addition & 1 deletion src/sghi/etl/commons/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@


_PDT = TypeVar("_PDT")
""""Type variable representing the data type after processing."""
"""Type variable representing the data type after processing."""

_SinkCallable = Callable[[_PDT], None]

Expand Down
162 changes: 162 additions & 0 deletions src/sghi/etl/commons/workflow_definitions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
"""Common :class:`sghi.etl.core.WorkflowDefinition` implementations."""

from __future__ import annotations

from typing import TYPE_CHECKING, Generic, TypeVar, final

from typing_extensions import override

from sghi.etl.core import Processor, Sink, Source, WorkflowDefinition
from sghi.utils import (
ensure_callable,
ensure_instance_of,
ensure_not_none_nor_empty,
ensure_optional_instance_of,
)

from .processors import NOOPProcessor
from .sinks import NullSink

if TYPE_CHECKING:
from collections.abc import Callable


# =============================================================================
# TYPES
# =============================================================================


_PDT = TypeVar("_PDT")
"""Type variable representing the data type after processing."""

_RDT = TypeVar("_RDT")
"""Type variable representing the raw data type."""


# =============================================================================
# SPEC IMPLEMENTATIONS
# =============================================================================


@final
class SimpleWorkflowDefinition(
WorkflowDefinition[_RDT, _PDT],
Generic[_RDT, _PDT],
):
"""A simple :class:`WorkflowDefinition` implementation."""

def __init__(
self,
id: str, # noqa: A002
name: str,
source_factory: Callable[[], Source[_RDT]],
description: str | None = None,
processor_factory: Callable[[], Processor[_RDT, _PDT]] = NOOPProcessor,
sink_factory: Callable[[], Sink[_PDT]] = NullSink,
) -> None:
"""Create a new ``WorkflowDefinition`` with the provided properties.

The ``id``, ``name`` and ``source_factory`` parameters are mandatory
and MUST be valid (see individual parameter docs for details).
Providing invalid parameters will lead to either a :exc:`TypeError` (
for values of the wrong type) or :exc:`ValueError` being raised.

:param id: A unique identifier to assign to the created workflow.
This MUST be a non-empty string.
:param name: A name to assign to the created workflow. This MUST be
a non-empty string.
:param source_factory: A function that suppliers the ``Source``
associated with the created workflow. This MUST be a valid
callable.
:param description: An optional description to assign to the created
workflow. This MUST be a valid string when NOT ``None``. Defaults
to ``None`` when not provided.
:param processor_factory: An optional function that suppliers the
``Processor`` associated with the created workflow. This MUST be a
valid callable. Defaults to ``NOOPProcessor`` when not provided.
:param sink_factory: A function that suppliers the ``Sink`` associated
with the created workflow. This MUST be a valid callable. Defaults
to ``NullSink`` when not provided.

:raise TypeError: If ``id`` or ``name`` are NOT strings, or if
``description`` is provided but is NOT a string.
:raise ValueError: If one of the following is ``True``; ``id`` is an
empty string, ``name`` is an empty string, ``source_factory`` is
NOT a valid callable, ``processor_factory`` is NOT a valid callable
or ``sink_factory`` is NOT a valid callable.
"""
super().__init__()
self._id: str = ensure_not_none_nor_empty(
value=ensure_instance_of(
value=id,
klass=str,
message="'id' MUST be a string.",
),
message="'id' MUST NOT be an empty string.",
)
self._name: str = ensure_not_none_nor_empty(
value=ensure_instance_of(
value=name,
klass=str,
message="'name' MUST be a string.",
),
message="'name' MUST NOT be an empty string.",
)
self._source_factory: Callable[[], Source[_RDT]] = ensure_callable(
value=source_factory,
message="'source_factory' MUST be a callable object.",
)
self._description: str | None = ensure_optional_instance_of(
value=description,
klass=str,
message="'description' MUST be a string when NOT None.",
)
self._processor_factory: Callable[[], Processor[_RDT, _PDT]]
self._processor_factory = ensure_callable(
value=processor_factory,
message="'processor_factory' MUST be a callable object.",
)
self._sink_factory: Callable[[], Sink[_PDT]] = ensure_callable(
value=sink_factory,
message="'sink_factory' MUST be a callable object.",
)

@property
@override
def id(self) -> str:
return self._id

@property
@override
def name(self) -> str:
return self._name

@property
@override
def description(self) -> str | None:
return self._description

@property
@override
def source_factory(self) -> Callable[[], Source[_RDT]]:
return self._source_factory

@property
@override
def processor_factory(self) -> Callable[[], Processor[_RDT, _PDT]]:
return self._processor_factory

@property
@override
def sink_factory(self) -> Callable[[], Sink[_PDT]]:
return self._sink_factory


# =============================================================================
# MODULE EXPORTS
# =============================================================================


__all__ = [
"SimpleWorkflowDefinition",
]
Loading
Loading