Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BREAKING CHANGE: (#31) Removing the special nodes classes #33

Merged
Merged 13 commits on Sep 22, 2024
18 changes: 3 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,17 @@
## Table of Contents

- [Usage](#usage)
- [Что нужно, чтобы сделать свой пайплайн?](#что-нужно-чтобы-сделать-свой-пайплайн)
- [Поддерживаемые типы узлов](#поддерживаемые-типы-узлов)
- [Development](#development)
- [Environment setup](#environment-setup)


## Usage
### Что нужно, чтобы сделать свой пайплайн?

1. Написать классы узлов
2. Связать узлы посредством указания зависимости
To create a pipeline:
1. Define classes that represent nodes
2. Connect nodes by defining the parent node(s), or no parent, for each node


### Поддерживаемые типы узлов

[Протоколы](ml_pipeline_engine/types.py)

1. [DataSource](ml_pipeline_engine/base_nodes/datasources.py)
2. [FeatureBase](ml_pipeline_engine/base_nodes/feature.py)
3. [MLModelBase](ml_pipeline_engine/base_nodes/ml_model.py)
4. [ProcessorBase](ml_pipeline_engine/base_nodes/processors.py)
5. [FeatureVectorizerBase](ml_pipeline_engine/base_nodes/vectorizer.py)

Примеры использования описаны в файле [docs/usage_examples.md](docs/usage_examples.md)

## Development
Expand Down
Empty file.
15 changes: 0 additions & 15 deletions ml_pipeline_engine/base_nodes/datasources.py

This file was deleted.

15 changes: 0 additions & 15 deletions ml_pipeline_engine/base_nodes/feature.py

This file was deleted.

15 changes: 0 additions & 15 deletions ml_pipeline_engine/base_nodes/ml_model.py

This file was deleted.

15 changes: 0 additions & 15 deletions ml_pipeline_engine/base_nodes/vectorizer.py

This file was deleted.

7 changes: 4 additions & 3 deletions ml_pipeline_engine/chart.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@
from ml_pipeline_engine.types import DAGLike
from ml_pipeline_engine.types import EventManagerLike
from ml_pipeline_engine.types import ModelName
from ml_pipeline_engine.types import NodeLike
from ml_pipeline_engine.types import NodeBase
from ml_pipeline_engine.types import PipelineChartLike
from ml_pipeline_engine.types import PipelineId
from ml_pipeline_engine.types import PipelineResult

NodeResultT = t.TypeVar('NodeResultT')

Entrypoint = t.Optional[t.Union[NodeLike[NodeResultT], DAGLike[NodeResultT]]]
Entrypoint = t.Optional[t.Union[NodeBase[NodeResultT], DAGLike[NodeResultT]]]


@dataclass(frozen=True, repr=False)
Expand All @@ -30,7 +31,7 @@ class PipelineChartBase:


@dataclass(frozen=True, repr=False)
class PipelineChart(PipelineChartBase):
class PipelineChart(PipelineChartBase, PipelineChartLike):
"""
Основная реализация определения пайплайна ML-модели
"""
Expand Down
3 changes: 2 additions & 1 deletion ml_pipeline_engine/context/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@
from ml_pipeline_engine.types import ModelName
from ml_pipeline_engine.types import NodeId
from ml_pipeline_engine.types import PipelineChartLike
from ml_pipeline_engine.types import PipelineContextLike
from ml_pipeline_engine.types import PipelineId


class DAGPipelineContext(EventSourceMixin):
class DAGPipelineContext(EventSourceMixin, PipelineContextLike):
"""
Контекст выполнения пайплайна ML-модели
"""
Expand Down
4 changes: 2 additions & 2 deletions ml_pipeline_engine/dag/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
from ml_pipeline_engine.parallelism import threads_pool_registry
from ml_pipeline_engine.types import DAGLike
from ml_pipeline_engine.types import DAGRunManagerLike
from ml_pipeline_engine.types import NodeBase
from ml_pipeline_engine.types import NodeId
from ml_pipeline_engine.types import NodeLike
from ml_pipeline_engine.types import NodeResultT
from ml_pipeline_engine.types import PipelineContextLike
from ml_pipeline_engine.types import RetryPolicyLike
Expand All @@ -23,7 +23,7 @@ class DAG(DAGLike):
output_node: NodeId
is_process_pool_needed: bool
is_thread_pool_needed: bool
node_map: t.Dict[NodeId, NodeLike]
node_map: t.Dict[NodeId, NodeBase]
retry_policy: t.Type[RetryPolicyLike] = NodeRetryPolicy
run_manager: t.Type[DAGRunManagerLike] = DAGRunConcurrentManager

Expand Down
39 changes: 21 additions & 18 deletions ml_pipeline_engine/dag_builders/annotation/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from ml_pipeline_engine.types import DAGLike
from ml_pipeline_engine.types import NodeBase
from ml_pipeline_engine.types import NodeId
from ml_pipeline_engine.types import NodeLike
from ml_pipeline_engine.types import RecurrentProtocol

__all__ = [
Expand All @@ -39,7 +38,7 @@
class AnnotationDAGBuilder:
def __init__(self) -> None:
self._dag = DiGraph(name='main-graph')
self._node_map: t.Dict[NodeId, NodeLike] = dict()
self._node_map: t.Dict[NodeId, NodeBase] = dict()
self._recurrent_sub_graphs: t.List[t.Tuple[NodeId, NodeId]] = []
self._synthetic_nodes: t.List[NodeId] = []

Expand All @@ -49,22 +48,21 @@ def _check_annotations(obj: t.Any) -> None:
Проверка наличия аннотаций типов у переданного объекта.
В случае, если есть хотя бы один не типизированный параметр, будет ошибка.
"""
run_method = get_callable_run_method(obj)

obj = get_callable_run_method(obj)

annotations = getattr(obj, '__annotations__', None)
annotations = getattr(run_method, '__annotations__', None)
parameters = [
(name, bool(parameter.empty))
for name, parameter in inspect.signature(obj).parameters.items()
for name, parameter in inspect.signature(run_method).parameters.items()
if name not in ('self', 'args', 'kwargs')
]

if not annotations and parameters:
raise errors.UndefinedAnnotation(f'Невозможно найти аннотации типов. obj={obj}')
raise errors.UndefinedAnnotation(f'Невозможно найти аннотации типов. obj={run_method}')

for name, is_empty in parameters:
if is_empty and name not in annotations:
raise errors.UndefinedParamAnnotation(f'Не указан тип для параметра name={name}, obj={obj}')
raise errors.UndefinedParamAnnotation(f'Не указан тип для параметра name={name}, obj={run_method}')

@staticmethod
def _check_base_class(node: t.Any) -> None:
Expand All @@ -80,7 +78,7 @@ def _check_base_class(node: t.Any) -> None:
f'У объекта не существует корректного базового класса, пригодного для графа. node={node}',
)

def validate_node(self, node: NodeLike) -> None:
def validate_node(self, node: NodeBase) -> None:
"""
Валидация ноды по разным правилам
"""
Expand All @@ -89,7 +87,7 @@ def validate_node(self, node: NodeLike) -> None:
self._check_annotations(node)

@staticmethod
def _get_input_marks_map(node: NodeLike) -> t.List[NodeInputSpec]:
def _get_input_marks_map(node: NodeBase) -> t.List[NodeInputSpec]:
"""
Получение меток зависимостей для входных kwarg-ов узла
"""
Expand All @@ -112,7 +110,7 @@ def _get_input_marks_map(node: NodeLike) -> t.List[NodeInputSpec]:

return inputs

def _add_node_to_map(self, node: NodeLike) -> None:
def _add_node_to_map(self, node: NodeBase) -> None:
"""
Добавление узла в мэппинг "Имя узла -> Класс/функция узла"
"""
Expand All @@ -137,15 +135,15 @@ def _add_switch_node(self, node_id: NodeId, switch_decide_node_id: NodeId) -> No
self._dag.add_node(node_id, **{NodeField.is_switch: True})
self._dag.add_edge(switch_decide_node_id, node_id, **{EdgeField.is_switch: True})

def _traverse_breadth_first_to_dag(self, input_node: NodeLike, output_node: NodeLike): # noqa
def _traverse_breadth_first_to_dag(self, input_node: NodeBase, output_node: NodeBase): # noqa
"""
Выполнить обход зависимостей классов/функций узлов, построить граф
"""

visited = {output_node}
stack = deque([output_node])

def _set_visited(node: NodeLike) -> None:
def _set_visited(node: NodeBase) -> None:
if node in visited:
return

Expand Down Expand Up @@ -299,7 +297,7 @@ def _is_executor_needed(self) -> t.Tuple[bool, bool]:

return is_process_pool_needed, is_thread_pool_needed

def build(self, input_node: NodeLike, output_node: NodeLike = None) -> DAGLike:
def build(self, input_node: NodeBase, output_node: NodeBase = None) -> DAGLike:
"""
Построить граф путем сборки зависимостей по аннотациям типа (меткам входов)
"""
Expand Down Expand Up @@ -327,8 +325,8 @@ def build(self, input_node: NodeLike, output_node: NodeLike = None) -> DAGLike:


def build_dag(
input_node: NodeLike[t.Any],
output_node: NodeLike[NodeResultT],
input_node: NodeBase[t.Any],
output_node: NodeBase[NodeResultT],
) -> DAGLike[NodeResultT]:
"""
Построить граф путем сборки зависимостей по аннотациям типа (меткам входов)
Expand All @@ -347,7 +345,9 @@ def build_dag(
)


def build_dag_single(node: NodeLike[NodeResultT]) -> DAGLike[NodeResultT]:
def build_dag_single(
node: NodeBase[NodeResultT],
) -> DAGLike[NodeResultT]:
"""
Построить граф из одного узла

Expand All @@ -357,4 +357,7 @@ def build_dag_single(node: NodeLike[NodeResultT]) -> DAGLike[NodeResultT]:
Returns:
Граф
"""
return AnnotationDAGBuilder().build(input_node=node, output_node=None)
return (
AnnotationDAGBuilder()
.build(input_node=node, output_node=None)
)
34 changes: 17 additions & 17 deletions ml_pipeline_engine/dag_builders/annotation/marks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,75 +2,75 @@
from dataclasses import dataclass

from ml_pipeline_engine.types import CaseLabel
from ml_pipeline_engine.types import NodeLike
from ml_pipeline_engine.types import NodeBase

NodeResultT = t.TypeVar('NodeResultT')


@dataclass(frozen=True)
class InputGenericMark:
node: NodeLike[t.Any]
node: NodeBase[t.Any]


@dataclass(frozen=True)
class InputMark:
node: NodeLike[t.Any]
node: NodeBase[t.Any]


@dataclass(frozen=True)
class InputOneOfMark:
nodes: t.List[NodeLike[t.Any]]
nodes: t.List[NodeBase[t.Any]]


def InputOneOf(nodes: t.List[NodeLike[NodeResultT]]) -> t.Type[NodeResultT]: # noqa: N802,RUF100
def InputOneOf(nodes: t.List[NodeBase[NodeResultT]]) -> t.Type[NodeResultT]: # noqa: N802,RUF100
"""
Принимает список нод, возвращает результат первой успешно выполненной ноды
"""
return t.cast(t.Any, InputOneOfMark(nodes))


def InputGeneric(node: NodeLike[NodeResultT]) -> t.Type[NodeResultT]: # noqa: N802,RUF100
def InputGeneric(node: NodeBase[NodeResultT]) -> t.Type[NodeResultT]: # noqa: N802,RUF100
return t.cast(t.Any, InputGenericMark(node))


def Input(node: NodeLike[NodeResultT]) -> t.Type[NodeResultT]: # noqa: N802,RUF100
def Input(node: NodeBase[NodeResultT]) -> t.Type[NodeResultT]: # noqa: N802,RUF100
return t.cast(t.Any, InputMark(node))


@dataclass(frozen=True)
class GenericInputMark:
node: NodeLike[t.Any]
node: NodeBase[t.Any]


def GenericInput(node: NodeLike[NodeResultT]) -> t.Type[NodeResultT]: # noqa: N802,RUF100
def GenericInput(node: NodeBase[NodeResultT]) -> t.Type[NodeResultT]: # noqa: N802,RUF100
return t.cast(t.Any, GenericInputMark(node))


@dataclass(frozen=True)
class SwitchCaseMark:
switch: NodeLike[t.Any]
cases: t.List[t.Tuple[str, NodeLike]]
switch: NodeBase[t.Any]
cases: t.List[t.Tuple[str, NodeBase]]
name: str


def SwitchCase( # noqa: N802,RUF100
switch: NodeLike[t.Any],
cases: t.List[t.Tuple[CaseLabel, NodeLike[NodeResultT]]],
switch: NodeBase[t.Any],
cases: t.List[t.Tuple[CaseLabel, NodeBase[NodeResultT]]],
name: t.Optional[str] = None,
) -> t.Type[NodeResultT]:
return t.cast(t.Any, SwitchCaseMark(switch, cases, name))


@dataclass(frozen=True)
class RecurrentSubGraphMark:
start_node: NodeLike[NodeResultT]
dest_node: NodeLike[NodeResultT]
start_node: NodeBase[NodeResultT]
dest_node: NodeBase[NodeResultT]
max_iterations: int


def RecurrentSubGraph( # noqa: N802,RUF100
start_node: t.Type[NodeLike[NodeResultT]],
dest_node: t.Type[NodeLike[NodeResultT]],
start_node: t.Type[NodeBase[NodeResultT]],
dest_node: t.Type[NodeBase[NodeResultT]],
max_iterations: int,
) -> t.Type[NodeResultT]:
"""
Expand Down
Loading