Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BREAKING CHANGE: (#31) Removing the special nodes classes #33

Merged
merged 13 commits into from
Sep 22, 2024
18 changes: 3 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,17 @@
## Table of Contents

- [Usage](#usage)
- [Что нужно, чтобы сделать свой пайплайн?](#что-нужно-чтобы-сделать-свой-пайплайн)
- [Поддерживаемые типы узлов](#поддерживаемые-типы-узлов)
- [Development](#development)
- [Environment setup](#environment-setup)


## Usage
### Что нужно, чтобы сделать свой пайплайн?

1. Написать классы узлов
2. Связать узлы посредством указания зависимости
To create a pipeline:
1. Define classes that represent nodes
2. Connect the nodes by specifying, for each node, its parent node(s) — or no parent for a root node


### Поддерживаемые типы узлов

[Протоколы](ml_pipeline_engine/types.py)

1. [DataSource](ml_pipeline_engine/base_nodes/datasources.py)
2. [FeatureBase](ml_pipeline_engine/base_nodes/feature.py)
3. [MLModelBase](ml_pipeline_engine/base_nodes/ml_model.py)
4. [ProcessorBase](ml_pipeline_engine/base_nodes/processors.py)
5. [FeatureVectorizerBase](ml_pipeline_engine/base_nodes/vectorizer.py)

Примеры использования описаны в файле [docs/usage_examples.md](docs/usage_examples.md)

## Development
Expand Down
Empty file.
15 changes: 0 additions & 15 deletions ml_pipeline_engine/base_nodes/datasources.py

This file was deleted.

15 changes: 0 additions & 15 deletions ml_pipeline_engine/base_nodes/feature.py

This file was deleted.

15 changes: 0 additions & 15 deletions ml_pipeline_engine/base_nodes/ml_model.py

This file was deleted.

15 changes: 0 additions & 15 deletions ml_pipeline_engine/base_nodes/vectorizer.py

This file was deleted.

7 changes: 4 additions & 3 deletions ml_pipeline_engine/chart.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@
from ml_pipeline_engine.types import DAGLike
from ml_pipeline_engine.types import EventManagerLike
from ml_pipeline_engine.types import ModelName
from ml_pipeline_engine.types import NodeLike
from ml_pipeline_engine.types import NodeBase
from ml_pipeline_engine.types import PipelineChartLike
from ml_pipeline_engine.types import PipelineId
from ml_pipeline_engine.types import PipelineResult

NodeResultT = t.TypeVar('NodeResultT')

Entrypoint = t.Optional[t.Union[NodeLike[NodeResultT], DAGLike[NodeResultT]]]
Entrypoint = t.Optional[t.Union[NodeBase[NodeResultT], DAGLike[NodeResultT]]]


@dataclass(frozen=True, repr=False)
Expand All @@ -30,7 +31,7 @@ class PipelineChartBase:


@dataclass(frozen=True, repr=False)
class PipelineChart(PipelineChartBase):
class PipelineChart(PipelineChartBase, PipelineChartLike):
"""
Основная реализация определения пайплайна ML-модели
"""
Expand Down
3 changes: 2 additions & 1 deletion ml_pipeline_engine/context/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@
from ml_pipeline_engine.types import ModelName
from ml_pipeline_engine.types import NodeId
from ml_pipeline_engine.types import PipelineChartLike
from ml_pipeline_engine.types import PipelineContextLike
from ml_pipeline_engine.types import PipelineId


class DAGPipelineContext(EventSourceMixin):
class DAGPipelineContext(EventSourceMixin, PipelineContextLike):
"""
Контекст выполнения пайплайна ML-модели
"""
Expand Down
4 changes: 2 additions & 2 deletions ml_pipeline_engine/dag/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
from ml_pipeline_engine.parallelism import threads_pool_registry
from ml_pipeline_engine.types import DAGLike
from ml_pipeline_engine.types import DAGRunManagerLike
from ml_pipeline_engine.types import NodeBase
from ml_pipeline_engine.types import NodeId
from ml_pipeline_engine.types import NodeLike
from ml_pipeline_engine.types import NodeResultT
from ml_pipeline_engine.types import PipelineContextLike
from ml_pipeline_engine.types import RetryPolicyLike
Expand All @@ -23,7 +23,7 @@ class DAG(DAGLike):
output_node: NodeId
is_process_pool_needed: bool
is_thread_pool_needed: bool
node_map: t.Dict[NodeId, NodeLike]
node_map: t.Dict[NodeId, NodeBase]
retry_policy: t.Type[RetryPolicyLike] = NodeRetryPolicy
run_manager: t.Type[DAGRunManagerLike] = DAGRunConcurrentManager

Expand Down
28 changes: 16 additions & 12 deletions ml_pipeline_engine/dag_builders/annotation/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from ml_pipeline_engine.types import DAGLike
from ml_pipeline_engine.types import NodeBase
from ml_pipeline_engine.types import NodeId
from ml_pipeline_engine.types import NodeLike
from ml_pipeline_engine.types import RecurrentProtocol

__all__ = [
Expand All @@ -39,7 +38,7 @@
class AnnotationDAGBuilder:
def __init__(self) -> None:
self._dag = DiGraph(name='main-graph')
self._node_map: t.Dict[NodeId, NodeLike] = dict()
self._node_map: t.Dict[NodeId, NodeBase] = dict()
self._recurrent_sub_graphs: t.List[t.Tuple[NodeId, NodeId]] = []
self._synthetic_nodes: t.List[NodeId] = []

Expand Down Expand Up @@ -80,7 +79,7 @@ def _check_base_class(node: t.Any) -> None:
f'У объекта не существует корректного базового класса, пригодного для графа. node={node}',
)

def validate_node(self, node: NodeLike) -> None:
def validate_node(self, node: NodeBase) -> None:
"""
Валидация ноды по разным правилам
"""
Expand All @@ -89,7 +88,7 @@ def validate_node(self, node: NodeLike) -> None:
self._check_annotations(node)

@staticmethod
def _get_input_marks_map(node: NodeLike) -> t.List[NodeInputSpec]:
def _get_input_marks_map(node: NodeBase) -> t.List[NodeInputSpec]:
"""
Получение меток зависимостей для входных kwarg-ов узла
"""
Expand All @@ -112,7 +111,7 @@ def _get_input_marks_map(node: NodeLike) -> t.List[NodeInputSpec]:

return inputs

def _add_node_to_map(self, node: NodeLike) -> None:
def _add_node_to_map(self, node: NodeBase) -> None:
"""
Добавление узла в мэппинг "Имя узла -> Класс/функция узла"
"""
Expand All @@ -137,15 +136,15 @@ def _add_switch_node(self, node_id: NodeId, switch_decide_node_id: NodeId) -> No
self._dag.add_node(node_id, **{NodeField.is_switch: True})
self._dag.add_edge(switch_decide_node_id, node_id, **{EdgeField.is_switch: True})

def _traverse_breadth_first_to_dag(self, input_node: NodeLike, output_node: NodeLike): # noqa
def _traverse_breadth_first_to_dag(self, input_node: NodeBase, output_node: NodeBase): # noqa
"""
Выполнить обход зависимостей классов/функций узлов, построить граф
"""

visited = {output_node}
stack = deque([output_node])

def _set_visited(node: NodeLike) -> None:
def _set_visited(node: NodeBase) -> None:
if node in visited:
return

Expand Down Expand Up @@ -299,7 +298,7 @@ def _is_executor_needed(self) -> t.Tuple[bool, bool]:

return is_process_pool_needed, is_thread_pool_needed

def build(self, input_node: NodeLike, output_node: NodeLike = None) -> DAGLike:
def build(self, input_node: NodeBase, output_node: NodeBase = None) -> DAGLike:
"""
Построить граф путем сборки зависимостей по аннотациям типа (меткам входов)
"""
Expand Down Expand Up @@ -327,8 +326,8 @@ def build(self, input_node: NodeLike, output_node: NodeLike = None) -> DAGLike:


def build_dag(
input_node: NodeLike[t.Any],
output_node: NodeLike[NodeResultT],
input_node: NodeBase[t.Any],
output_node: NodeBase[NodeResultT],
) -> DAGLike[NodeResultT]:
"""
Построить граф путем сборки зависимостей по аннотациям типа (меткам входов)
Expand All @@ -347,7 +346,9 @@ def build_dag(
)


def build_dag_single(
    node: NodeBase[NodeResultT],
) -> DAGLike[NodeResultT]:
    """
    Build a graph consisting of a single node.

    Args:
        node: The node class/function the graph is built from.

    Returns:
        The assembled DAG.
    """
    # A single-node graph is just a build with no explicit output node.
    return AnnotationDAGBuilder().build(input_node=node, output_node=None)
34 changes: 17 additions & 17 deletions ml_pipeline_engine/dag_builders/annotation/marks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,75 +2,75 @@
from dataclasses import dataclass

from ml_pipeline_engine.types import CaseLabel
from ml_pipeline_engine.types import NodeLike
from ml_pipeline_engine.types import NodeBase

NodeResultT = t.TypeVar('NodeResultT')


@dataclass(frozen=True)
class InputGenericMark:
    # Node whose result is injected as a generic input dependency.
    node: NodeBase[t.Any]


@dataclass(frozen=True)
class InputMark:
    # Node whose result is injected as a plain input dependency.
    node: NodeBase[t.Any]


@dataclass(frozen=True)
class InputOneOfMark:
    # Ordered fallback list: the first node to execute successfully provides the value.
    nodes: t.List[NodeBase[t.Any]]


def InputOneOf(nodes: t.List[NodeBase[NodeResultT]]) -> t.Type[NodeResultT]:  # noqa: N802,RUF100
    """
    Take a list of nodes; the result of the first successfully executed node is used.
    """
    # The cast lets the mark pose as the dependency's result type for annotations.
    return t.cast(t.Any, InputOneOfMark(nodes))


def InputGeneric(node: NodeBase[NodeResultT]) -> t.Type[NodeResultT]:  # noqa: N802,RUF100
    """Mark *node* as a generic input dependency."""
    return t.cast(t.Any, InputGenericMark(node))


def Input(node: NodeBase[NodeResultT]) -> t.Type[NodeResultT]:  # noqa: N802,RUF100
    """Mark *node* as a plain input dependency."""
    return t.cast(t.Any, InputMark(node))


@dataclass(frozen=True)
class GenericInputMark:
    # Node wrapped by the GenericInput() marker function.
    node: NodeBase[t.Any]


def GenericInput(node: NodeBase[NodeResultT]) -> t.Type[NodeResultT]:  # noqa: N802,RUF100
    """Mark *node* as a generic input dependency (GenericInputMark variant)."""
    return t.cast(t.Any, GenericInputMark(node))


@dataclass(frozen=True)
class SwitchCaseMark:
    # Node whose result selects which case branch is executed.
    switch: NodeBase[t.Any]
    # (label, node) pairs; the label produced by ``switch`` picks the node.
    cases: t.List[t.Tuple[str, NodeBase]]
    # Name of the switch construct (SwitchCase passes None when omitted).
    name: str


def SwitchCase(  # noqa: N802,RUF100
    switch: NodeBase[t.Any],
    cases: t.List[t.Tuple[CaseLabel, NodeBase[NodeResultT]]],
    name: t.Optional[str] = None,
) -> t.Type[NodeResultT]:
    """
    Build a switch/case dependency mark.

    Args:
        switch: Node whose result selects the branch.
        cases: (label, node) pairs to dispatch between.
        name: Optional explicit name for the switch construct.
    """
    return t.cast(t.Any, SwitchCaseMark(switch, cases, name))


@dataclass(frozen=True)
class RecurrentSubGraphMark:
    # First node of the recurrent sub-graph.
    start_node: NodeBase[NodeResultT]
    # Node at which the sub-graph terminates (or loops back from).
    dest_node: NodeBase[NodeResultT]
    # Upper bound on how many times the sub-graph may be re-entered.
    max_iterations: int


def RecurrentSubGraph( # noqa: N802,RUF100
start_node: t.Type[NodeLike[NodeResultT]],
dest_node: t.Type[NodeLike[NodeResultT]],
start_node: t.Type[NodeBase[NodeResultT]],
dest_node: t.Type[NodeBase[NodeResultT]],
max_iterations: int,
) -> t.Type[NodeResultT]:
"""
Expand Down
15 changes: 0 additions & 15 deletions ml_pipeline_engine/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,9 @@

from ml_pipeline_engine.types import EventManagerLike
from ml_pipeline_engine.types import NodeId
from ml_pipeline_engine.types import PipelineContextLike
from ml_pipeline_engine.types import PipelineResult


class EventManagerBase:
async def on_pipeline_start(self, ctx: PipelineContextLike) -> None:
...

async def on_pipeline_complete(self, ctx: PipelineContextLike, result: PipelineResult) -> None:
...

async def on_node_start(self, ctx: PipelineContextLike, node_id: NodeId) -> None:
...

async def on_node_complete(self, ctx: PipelineContextLike, node_id: NodeId, error: t.Optional[Exception]) -> None:
...


class EventSourceMixin:
_get_event_managers: t.Callable[..., t.List[t.Type[EventManagerLike]]]

Expand Down
1 change: 1 addition & 0 deletions ml_pipeline_engine/node/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from ml_pipeline_engine.node.base_nodes import * # noqa
from ml_pipeline_engine.node.enums import * # noqa
from ml_pipeline_engine.node.errors import * # noqa
from ml_pipeline_engine.node.node import * # noqa
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from ml_pipeline_engine.node.enums import NodeType
from ml_pipeline_engine.types import AdditionalDataT
from ml_pipeline_engine.types import NodeBase
from ml_pipeline_engine.types import NodeResultT
from ml_pipeline_engine.types import Recurrent
from ml_pipeline_engine.types import RecurrentProtocol

Expand All @@ -14,7 +15,7 @@ class ProcessorBase(NodeBase):

node_type = NodeType.processor.value

def process(self, *args: t.Any, **kwargs: t.Any) -> t.Any:
def process(self, *args: t.Any, **kwargs: t.Any) -> NodeResultT:
raise NotImplementedError('Method process() is not implemented')


Expand Down
Loading