diff --git a/README.md b/README.md
index ed4a4f2..cea9ad7 100644
--- a/README.md
+++ b/README.md
@@ -5,29 +5,17 @@
 ## Table of Contents
 
 - [Usage](#usage)
-  - [Что нужно, чтобы сделать свой пайплайн?](#что-нужно-чтобы-сделать-свой-пайплайн)
-  - [Поддерживаемые типы узлов](#поддерживаемые-типы-узлов)
 - [Development](#development)
   - [Environment setup](#environment-setup)
 
 ## Usage
 
-### Что нужно, чтобы сделать свой пайплайн?
-
-1. Написать классы узлов
-2. Связать узлы посредством указания зависимости
+To create a pipeline:
+1. Define classes that represent nodes
+2. Connect the nodes by declaring each node's parent node(s), or no parent at all, as shown in the example below
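+
+For example, a minimal two-node pipeline (the classes and names below are illustrative, not part of the library):
+
+```python
+from ml_pipeline_engine.dag_builders.annotation import build_dag
+from ml_pipeline_engine.dag_builders.annotation.marks import Input
+from ml_pipeline_engine.node import ProcessorBase
+
+
+class InvertNumber(ProcessorBase):
+    name = 'invert_number'
+
+    # A node with no parent: its arguments come from the pipeline input
+    def process(self, num: float) -> float:
+        return -num
+
+
+class DoubleNumber(ProcessorBase):
+    name = 'double_number'
+
+    # Input(InvertNumber) declares InvertNumber as this node's parent
+    def process(self, num: Input(InvertNumber)) -> float:
+        return num * 2
+
+
+dag = build_dag(input_node=InvertNumber, output_node=DoubleNumber)
+```
+
+Dependencies are declared with the `Input(...)` mark on `process()` parameters; `build_dag` walks these annotations to assemble the DAG.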
-### Поддерживаемые типы узлов
-
-[Протоколы](ml_pipeline_engine/types.py)
-
-1. [DataSource](ml_pipeline_engine/base_nodes/datasources.py)
-2. [FeatureBase](ml_pipeline_engine/base_nodes/feature.py)
-3. [MLModelBase](ml_pipeline_engine/base_nodes/ml_model.py)
-4. [ProcessorBase](ml_pipeline_engine/base_nodes/processors.py)
-5. [FeatureVectorizerBase](ml_pipeline_engine/base_nodes/vectorizer.py)
-
 Примеры использования описаны в файле [docs/usage_examples.md](docs/usage_examples.md)
 
 ## Development
diff --git a/ml_pipeline_engine/base_nodes/__init__.py b/ml_pipeline_engine/base_nodes/__init__.py
deleted file mode 100644
index e69de29..0000000
diff --git a/ml_pipeline_engine/base_nodes/datasources.py b/ml_pipeline_engine/base_nodes/datasources.py
deleted file mode 100644
index 8a8f349..0000000
--- a/ml_pipeline_engine/base_nodes/datasources.py
+++ /dev/null
@@ -1,15 +0,0 @@
-import typing as t
-
-from ml_pipeline_engine.node.enums import NodeType
-from ml_pipeline_engine.types import NodeBase
-
-
-class DataSource(NodeBase):
-    """
-    Базовый класс для источников данных
-    """
-
-    node_type = NodeType.datasource.value
-
-    def collect(self, *args: t.Any, **kwargs: t.Any) -> t.Any:
-        raise NotImplementedError('Method collect() is not implemented')
diff --git a/ml_pipeline_engine/base_nodes/feature.py b/ml_pipeline_engine/base_nodes/feature.py
deleted file mode 100644
index 0dcf2c0..0000000
--- a/ml_pipeline_engine/base_nodes/feature.py
+++ /dev/null
@@ -1,15 +0,0 @@
-import typing as t
-
-from ml_pipeline_engine.node.enums import NodeType
-from ml_pipeline_engine.types import NodeBase
-
-
-class FeatureBase(NodeBase):
-    """
-    Базовый класс для набора фичей
-    """
-
-    node_type = NodeType.feature.value
-
-    def extract(self, *args: t.Any, **kwargs: t.Any) -> t.Any:
-        raise NotImplementedError('Method extract() is not implemented')
diff --git a/ml_pipeline_engine/base_nodes/ml_model.py b/ml_pipeline_engine/base_nodes/ml_model.py
deleted file mode 100644
index 48fb0f5..0000000
--- a/ml_pipeline_engine/base_nodes/ml_model.py
+++ /dev/null
@@ -1,15 +0,0 @@
-import typing as t
-
-from ml_pipeline_engine.node.enums import NodeType
-from ml_pipeline_engine.types import NodeBase
-
-
-class MLModelBase(NodeBase):
-    """
-    Базовый класс для ML-моделей
-    """
-
-    node_type = NodeType.ml_model.value
-
-    def predict(self, *args: t.Any, **kwargs: t.Any) -> t.Any:
-        raise NotImplementedError('Method predict() is not implemented')
diff --git a/ml_pipeline_engine/base_nodes/vectorizer.py b/ml_pipeline_engine/base_nodes/vectorizer.py
deleted file mode 100644
index 07bf1f9..0000000
--- a/ml_pipeline_engine/base_nodes/vectorizer.py
+++ /dev/null
@@ -1,15 +0,0 @@
-import typing as t
-
-from ml_pipeline_engine.node.enums import NodeType
-from ml_pipeline_engine.types import NodeBase
-
-
-class FeatureVectorizerBase(NodeBase):
-    """
-    Базовый класс для векторизаторов
-    """
-
-    node_type = NodeType.vectorizer.value
-
-    def vectorize(self, *args: t.Any, **kwargs: t.Any) -> t.Any:
-        raise NotImplementedError('Method vectorize() is not implemented')
diff --git a/ml_pipeline_engine/chart.py b/ml_pipeline_engine/chart.py
index e8c6d27..18b56c6 100644
--- a/ml_pipeline_engine/chart.py
+++ b/ml_pipeline_engine/chart.py
@@ -8,13 +8,14 @@
 from ml_pipeline_engine.types import DAGLike
 from ml_pipeline_engine.types import EventManagerLike
 from ml_pipeline_engine.types import ModelName
-from ml_pipeline_engine.types import NodeLike
+from ml_pipeline_engine.types import NodeBase
+from ml_pipeline_engine.types import PipelineChartLike
 from ml_pipeline_engine.types import PipelineId
 from ml_pipeline_engine.types import PipelineResult
 
 NodeResultT = t.TypeVar('NodeResultT')
 
-Entrypoint = t.Optional[t.Union[NodeLike[NodeResultT], DAGLike[NodeResultT]]]
+Entrypoint = t.Optional[t.Union[NodeBase[NodeResultT], DAGLike[NodeResultT]]]
@@ -30,7 +31,7 @@ class PipelineChartBase:
 
 
 @dataclass(frozen=True, repr=False)
-class PipelineChart(PipelineChartBase):
+class PipelineChart(PipelineChartBase, PipelineChartLike):
     """
     Основная реализация определения пайплайна ML-модели
     """
diff --git a/ml_pipeline_engine/context/dag.py b/ml_pipeline_engine/context/dag.py
index 70131b3..bc7969b 100644
--- a/ml_pipeline_engine/context/dag.py
+++ b/ml_pipeline_engine/context/dag.py
@@ -9,10 +9,11 @@
 from ml_pipeline_engine.types import ModelName
 from ml_pipeline_engine.types import NodeId
 from ml_pipeline_engine.types import PipelineChartLike
+from ml_pipeline_engine.types import PipelineContextLike
 from ml_pipeline_engine.types import PipelineId
 
 
-class DAGPipelineContext(EventSourceMixin):
+class DAGPipelineContext(EventSourceMixin, PipelineContextLike):
     """
     Контекст выполнения пайплайна ML-модели
     """
diff --git a/ml_pipeline_engine/dag/dag.py b/ml_pipeline_engine/dag/dag.py
index 22b7e38..cf7f1e0 100644
--- a/ml_pipeline_engine/dag/dag.py
+++ b/ml_pipeline_engine/dag/dag.py
@@ -9,8 +9,8 @@
 from ml_pipeline_engine.parallelism import threads_pool_registry
 from ml_pipeline_engine.types import DAGLike
 from ml_pipeline_engine.types import DAGRunManagerLike
+from ml_pipeline_engine.types import NodeBase
 from ml_pipeline_engine.types import NodeId
-from ml_pipeline_engine.types import NodeLike
 from ml_pipeline_engine.types import NodeResultT
 from ml_pipeline_engine.types import PipelineContextLike
 from ml_pipeline_engine.types import RetryPolicyLike
@@ -23,7 +23,7 @@ class DAG(DAGLike):
     output_node: NodeId
     is_process_pool_needed: bool
     is_thread_pool_needed: bool
-    node_map: t.Dict[NodeId, NodeLike]
+    node_map: t.Dict[NodeId, NodeBase]
     retry_policy: t.Type[RetryPolicyLike] = NodeRetryPolicy
     run_manager: t.Type[DAGRunManagerLike] = DAGRunConcurrentManager
diff --git a/ml_pipeline_engine/dag_builders/annotation/builder.py b/ml_pipeline_engine/dag_builders/annotation/builder.py
index b62d182..fd68f52 100644
--- a/ml_pipeline_engine/dag_builders/annotation/builder.py
+++ b/ml_pipeline_engine/dag_builders/annotation/builder.py
@@ -21,7 +21,6 @@
 from ml_pipeline_engine.types import DAGLike
 from ml_pipeline_engine.types import NodeBase
 from ml_pipeline_engine.types import NodeId
-from ml_pipeline_engine.types import NodeLike
 from ml_pipeline_engine.types import RecurrentProtocol
 
 __all__ = [
@@ -39,7 +38,7 @@ def __init__(self) -> None:
         self._dag = DiGraph(name='main-graph')
-        self._node_map: t.Dict[NodeId, NodeLike] = dict()
+        self._node_map: t.Dict[NodeId, NodeBase] = dict()
         self._recurrent_sub_graphs: t.List[t.Tuple[NodeId, NodeId]] = []
         self._synthetic_nodes: t.List[NodeId] = []
@@ -49,22 +48,21 @@ def _check_annotations(obj: t.Any) -> None:
         """
         Проверка наличия аннотаций типов у переданного объекта.
         В случае, если есть хотя бы один не типизированный параметр, будет ошибка.
         """
+        run_method = get_callable_run_method(obj)
 
-        obj = get_callable_run_method(obj)
-
-        annotations = getattr(obj, '__annotations__', None)
+        annotations = getattr(run_method, '__annotations__', None)
 
         parameters = [
             (name, bool(parameter.empty))
-            for name, parameter in inspect.signature(obj).parameters.items()
+            for name, parameter in inspect.signature(run_method).parameters.items()
             if name not in ('self', 'args', 'kwargs')
         ]
 
         if not annotations and parameters:
-            raise errors.UndefinedAnnotation(f'Невозможно найти аннотации типов. obj={obj}')
+            raise errors.UndefinedAnnotation(f'Unable to find type annotations. obj={run_method}')
 
         for name, is_empty in parameters:
             if is_empty and name not in annotations:
-                raise errors.UndefinedParamAnnotation(f'Не указан тип для параметра name={name}, obj={obj}')
+                raise errors.UndefinedParamAnnotation(f'Missing type annotation for parameter name={name}, obj={run_method}')
 
     @staticmethod
     def _check_base_class(node: t.Any) -> None:
@@ -80,7 +78,7 @@ def _check_base_class(node: t.Any) -> None:
                 f'У объекта не существует корректного базового класса, пригодного для графа. node={node}',
             )
 
-    def validate_node(self, node: NodeLike) -> None:
+    def validate_node(self, node: NodeBase) -> None:
         """
         Валидация ноды по разным правилам
         """
@@ -89,7 +87,7 @@ def validate_node(self, node: NodeLike) -> None:
         self._check_annotations(node)
 
     @staticmethod
-    def _get_input_marks_map(node: NodeLike) -> t.List[NodeInputSpec]:
+    def _get_input_marks_map(node: NodeBase) -> t.List[NodeInputSpec]:
         """
         Получение меток зависимостей для входных kwarg-ов узла
         """
@@ -112,7 +110,7 @@ def _get_input_marks_map(node: NodeLike) -> t.List[NodeInputSpec]:
 
         return inputs
 
-    def _add_node_to_map(self, node: NodeLike) -> None:
+    def _add_node_to_map(self, node: NodeBase) -> None:
        """
        Добавление узла в мэппинг "Имя узла -> Класс/функция узла"
        """
@@ -137,7 +135,7 @@ def _add_switch_node(self, node_id: NodeId, switch_decide_node_id: NodeId) -> No
         self._dag.add_node(node_id, **{NodeField.is_switch: True})
         self._dag.add_edge(switch_decide_node_id, node_id, **{EdgeField.is_switch: True})
 
-    def _traverse_breadth_first_to_dag(self, input_node: NodeLike, output_node: NodeLike):  # noqa
+    def _traverse_breadth_first_to_dag(self, input_node: NodeBase, output_node: NodeBase):  # noqa
         """
         Выполнить обход зависимостей классов/функций узлов, построить граф
         """
@@ -145,7 +143,7 @@ def _traverse_breadth_first_to_dag(self, input_node: NodeLike, output_node: Node
         visited = {output_node}
         stack = deque([output_node])
 
-        def _set_visited(node: NodeLike) -> None:
+        def _set_visited(node: NodeBase) -> None:
             if node in visited:
                 return
@@ -299,7 +297,7 @@ def _is_executor_needed(self) -> t.Tuple[bool, bool]:
 
         return is_process_pool_needed, is_thread_pool_needed
 
-    def build(self, input_node: NodeLike, output_node: NodeLike = None) -> DAGLike:
+    def build(self, input_node: NodeBase, output_node: NodeBase = None) -> DAGLike:
         """
         Построить граф путем сборки зависимостей по аннотациям типа (меткам входов)
         """
@@ -327,8 +325,8 @@ def build(self, input_node: NodeLike, output_node: NodeLike = None) -> DAGLike:
 
 
 def build_dag(
-    input_node: NodeLike[t.Any],
-    output_node: NodeLike[NodeResultT],
+    input_node: NodeBase[t.Any],
+    output_node: NodeBase[NodeResultT],
 ) -> DAGLike[NodeResultT]:
     """
     Построить граф путем сборки зависимостей по аннотациям типа (меткам входов)
@@ -347,7 +345,9 @@ def build_dag(
     )
 
 
-def build_dag_single(node: NodeLike[NodeResultT]) -> DAGLike[NodeResultT]:
+def build_dag_single(
+    node: NodeBase[NodeResultT],
+) -> DAGLike[NodeResultT]:
     """
     Построить граф из одного узла
 
@@ -357,4 +357,7 @@ def build_dag_single(node: NodeLike[NodeResultT]) -> DAGLike[NodeResultT]:
     Returns:
         Граф
     """
-    return AnnotationDAGBuilder().build(input_node=node, output_node=None)
+    return (
+        AnnotationDAGBuilder()
+        .build(input_node=node, output_node=None)
+    )
diff --git a/ml_pipeline_engine/dag_builders/annotation/marks.py b/ml_pipeline_engine/dag_builders/annotation/marks.py
index 1349d5a..5ff24e5 100644
--- a/ml_pipeline_engine/dag_builders/annotation/marks.py
+++ b/ml_pipeline_engine/dag_builders/annotation/marks.py
@@ -2,60 +2,60 @@
 from dataclasses import dataclass
 
 from ml_pipeline_engine.types import CaseLabel
-from ml_pipeline_engine.types import NodeLike
+from ml_pipeline_engine.types import NodeBase
 
 NodeResultT = t.TypeVar('NodeResultT')
 
 
 @dataclass(frozen=True)
 class InputGenericMark:
-    node: NodeLike[t.Any]
+    node: NodeBase[t.Any]
 
 
 @dataclass(frozen=True)
 class InputMark:
-    node: NodeLike[t.Any]
+    node: NodeBase[t.Any]
 
 
 @dataclass(frozen=True)
 class InputOneOfMark:
-    nodes: t.List[NodeLike[t.Any]]
+    nodes: t.List[NodeBase[t.Any]]
 
 
-def InputOneOf(nodes: t.List[NodeLike[NodeResultT]]) -> t.Type[NodeResultT]:  # noqa: N802,RUF100
+def InputOneOf(nodes: t.List[NodeBase[NodeResultT]]) -> t.Type[NodeResultT]:  # noqa: N802,RUF100
     """
     Принимает список нод, возвращает результат первой успешно выполненной ноды
     """
     return t.cast(t.Any, InputOneOfMark(nodes))
 
 
-def InputGeneric(node: NodeLike[NodeResultT]) -> t.Type[NodeResultT]:  # noqa: N802,RUF100
+def InputGeneric(node: NodeBase[NodeResultT]) -> t.Type[NodeResultT]:  # noqa: N802,RUF100
     return t.cast(t.Any, InputGenericMark(node))
 
 
-def Input(node: NodeLike[NodeResultT]) -> t.Type[NodeResultT]:  # noqa: N802,RUF100
+def Input(node: NodeBase[NodeResultT]) -> t.Type[NodeResultT]:  # noqa: N802,RUF100
     return t.cast(t.Any, InputMark(node))
 
 
 @dataclass(frozen=True)
 class GenericInputMark:
-    node: NodeLike[t.Any]
+    node: NodeBase[t.Any]
 
 
-def GenericInput(node: NodeLike[NodeResultT]) -> t.Type[NodeResultT]:  # noqa: N802,RUF100
+def GenericInput(node: NodeBase[NodeResultT]) -> t.Type[NodeResultT]:  # noqa: N802,RUF100
     return t.cast(t.Any, GenericInputMark(node))
 
 
 @dataclass(frozen=True)
 class SwitchCaseMark:
-    switch: NodeLike[t.Any]
-    cases: t.List[t.Tuple[str, NodeLike]]
+    switch: NodeBase[t.Any]
+    cases: t.List[t.Tuple[str, NodeBase]]
     name: str
 
 
 def SwitchCase(  # noqa: N802,RUF100
-    switch: NodeLike[t.Any],
-    cases: t.List[t.Tuple[CaseLabel, NodeLike[NodeResultT]]],
+    switch: NodeBase[t.Any],
+    cases: t.List[t.Tuple[CaseLabel, NodeBase[NodeResultT]]],
     name: t.Optional[str] = None,
 ) -> t.Type[NodeResultT]:
     return t.cast(t.Any, SwitchCaseMark(switch, cases, name))
@@ -63,14 +63,14 @@ def SwitchCase(  # noqa: N802,RUF100
 
 @dataclass(frozen=True)
 class RecurrentSubGraphMark:
-    start_node: NodeLike[NodeResultT]
-    dest_node: NodeLike[NodeResultT]
+    start_node: NodeBase[NodeResultT]
+    dest_node: NodeBase[NodeResultT]
     max_iterations: int
 
 
 def RecurrentSubGraph(  # noqa: N802,RUF100
-    start_node: t.Type[NodeLike[NodeResultT]],
-    dest_node: t.Type[NodeLike[NodeResultT]],
+    start_node: t.Type[NodeBase[NodeResultT]],
+    dest_node: t.Type[NodeBase[NodeResultT]],
     max_iterations: int,
 ) -> t.Type[NodeResultT]:
     """
diff --git a/ml_pipeline_engine/events.py b/ml_pipeline_engine/events.py
index 8d80f6a..28ec124 100644
--- a/ml_pipeline_engine/events.py
+++ b/ml_pipeline_engine/events.py
@@ -2,24 +2,9 @@
 
 from ml_pipeline_engine.types import EventManagerLike
 from ml_pipeline_engine.types import NodeId
-from ml_pipeline_engine.types import PipelineContextLike
 from ml_pipeline_engine.types import PipelineResult
 
 
-class EventManagerBase:
-    async def on_pipeline_start(self, ctx: PipelineContextLike) -> None:
-        ...
-
-    async def on_pipeline_complete(self, ctx: PipelineContextLike, result: PipelineResult) -> None:
-        ...
-
-    async def on_node_start(self, ctx: PipelineContextLike, node_id: NodeId) -> None:
-        ...
-
-    async def on_node_complete(self, ctx: PipelineContextLike, node_id: NodeId, error: t.Optional[Exception]) -> None:
-        ...
-
-
 class EventSourceMixin:
     _get_event_managers: t.Callable[..., t.List[t.Type[EventManagerLike]]]
diff --git a/ml_pipeline_engine/node/__init__.py b/ml_pipeline_engine/node/__init__.py
index aa3c5ca..29919c2 100644
--- a/ml_pipeline_engine/node/__init__.py
+++ b/ml_pipeline_engine/node/__init__.py
@@ -1,3 +1,4 @@
+from ml_pipeline_engine.node.base_nodes import *  # noqa
 from ml_pipeline_engine.node.enums import *  # noqa
 from ml_pipeline_engine.node.errors import *  # noqa
 from ml_pipeline_engine.node.node import *  # noqa
diff --git a/ml_pipeline_engine/base_nodes/processors.py b/ml_pipeline_engine/node/base_nodes.py
similarity index 87%
rename from ml_pipeline_engine/base_nodes/processors.py
rename to ml_pipeline_engine/node/base_nodes.py
index 7efa2c5..6ee74ed 100644
--- a/ml_pipeline_engine/base_nodes/processors.py
+++ b/ml_pipeline_engine/node/base_nodes.py
@@ -3,6 +3,7 @@
 from ml_pipeline_engine.node.enums import NodeType
 from ml_pipeline_engine.types import AdditionalDataT
 from ml_pipeline_engine.types import NodeBase
+from ml_pipeline_engine.types import NodeResultT
 from ml_pipeline_engine.types import Recurrent
 from ml_pipeline_engine.types import RecurrentProtocol
 
@@ -14,7 +15,7 @@ class ProcessorBase(NodeBase):
 
     node_type = NodeType.processor.value
 
-    def process(self, *args: t.Any, **kwargs: t.Any) -> t.Any:
+    def process(self, *args: t.Any, **kwargs: t.Any) -> NodeResultT:
         raise NotImplementedError('Method process() is not implemented')
diff --git a/ml_pipeline_engine/node/enums.py b/ml_pipeline_engine/node/enums.py
index 95393e3..5027cb0 100644
--- a/ml_pipeline_engine/node/enums.py
+++ b/ml_pipeline_engine/node/enums.py
@@ -2,11 +2,7 @@
 
 
 class NodeType(str, enum.Enum):
-    datasource = 'datasource'
-    feature = 'feature'
-    ml_model = 'ml_model'
     processor = 'processor'
-    vectorizer = 'vectorizer'
     generic = 'generic'
     switch = 'switch'
     input_one_of = 'input_one_of'
diff --git a/ml_pipeline_engine/node/node.py b/ml_pipeline_engine/node/node.py
index 8681660..20dda3d 100644
--- a/ml_pipeline_engine/node/node.py
+++ b/ml_pipeline_engine/node/node.py
@@ -15,7 +15,6 @@
 from ml_pipeline_engine.parallelism import threads_pool_registry
 from ml_pipeline_engine.types import NodeBase
 from ml_pipeline_engine.types import NodeId
-from ml_pipeline_engine.types import NodeLike
 
 NodeResultT = t.TypeVar('NodeResultT')
 
@@ -28,7 +27,7 @@ def generate_node_id(prefix: str, name: t.Optional[str] = None) -> str:
     return f'{prefix}__{name if name is not None else uuid.uuid4().hex[-8:]}'
 
 
-def get_node_id(node: NodeLike) -> NodeId:
+def get_node_id(node: NodeBase) -> NodeId:
     node_type = node.node_type if getattr(node, 'node_type', None) else 'node'
 
     if getattr(node, 'name', None):
@@ -39,36 +38,22 @@ def get_node_id(node: NodeLike) -> NodeId:
     return '__'.join([node_type, node_name])
 
 
-def get_run_method(node: NodeLike) -> t.Optional[str]:
-    run_method = None
+def get_callable_run_method(node: NodeBase) -> t.Callable:
+    if not callable(getattr(node, 'process', None)):
+        raise RunMethodExpectedError('Missing method for node execution')
 
-    for method in NodeBase.RUN_METHOD_ALIASES:
-        if callable(getattr(node, method, None)):
-            if run_method is not None:
-                raise AssertionError(f'Node should have only one run method. {run_method} + {method} detected')
-            run_method = method
+    node = get_instance(node)
+    return node.process
 
-    return run_method
-
-
-def get_callable_run_method(node: NodeLike) -> t.Callable:
-    run_method_name = get_run_method(node)
-
-    if run_method_name is not None:
-        node = get_instance(node)
-        return getattr(node, run_method_name)
-
-    return node
-
-
-def run_node_default(node: NodeLike[NodeResultT], **kwargs: t.Any) -> t.Type[NodeResultT]:
+def run_node_default(node: NodeBase[NodeResultT], **kwargs: t.Any) -> t.Type[NodeResultT]:
     """
     Get default value from the node
     """
     return get_instance(node).get_default(**kwargs)
 
 
-async def run_node(node: NodeLike[NodeResultT], *args: t.Any, node_id: NodeId, **kwargs: t.Any) -> t.Type[NodeResultT]:
+async def run_node(node: NodeBase[NodeResultT], *args: t.Any, node_id: NodeId, **kwargs: t.Any) -> t.Type[NodeResultT]:
     """
     Run a node in a specific way according to the node's tags
     """
@@ -107,14 +92,14 @@ async def run_node(node: NodeLike[NodeResultT], *args: t.Any, node_id: NodeId, *
 
 
 def build_node(
-    node: NodeLike,
+    node: NodeBase,
     node_name: t.Optional[str] = None,
     class_name: t.Optional[str] = None,
     atts: t.Optional[t.Dict[str, t.Any]] = None,
     attrs: t.Optional[t.Dict[str, t.Any]] = None,
     dependencies_default: t.Optional[t.Dict[str, t.Any]] = None,
     **target_dependencies: t.Any,
-) -> t.Type[NodeLike]:
+) -> t.Type[NodeBase]:
     """
     Build new node that inherits all properties from the basic node.
 
@@ -131,20 +116,17 @@ def build_node(
     if not inspect.isclass(node):
         raise ClassExpectedError('Для создания узла ожидается объекта класса')
 
-    run_method = get_run_method(node)
-    if not run_method:
-        raise RunMethodExpectedError(
-            f'Ожидается наличие хотя бы одного run-метода. methods={NodeBase.RUN_METHOD_ALIASES}',
-        )
-
-    if inspect.iscoroutinefunction(getattr(node, run_method)):
+    process_method = getattr(node, 'process', None)
+    if not callable(process_method):
+        raise RunMethodExpectedError('Missing method for node execution')
+    if inspect.iscoroutinefunction(process_method):
 
         async def class_method(*args: t.Any, **kwargs: t.Any) -> t.Any:
-            return await getattr(node, run_method)(*args, **kwargs, **(dependencies_default or {}))
+            return await process_method(*args, **kwargs, **(dependencies_default or {}))
 
     else:
         def class_method(*args: t.Any, **kwargs: t.Any) -> t.Any:
-            return getattr(node, run_method)(*args, **kwargs, **(dependencies_default or {}))
+            return process_method(*args, **kwargs, **(dependencies_default or {}))
 
     class_name = class_name or f'Generic{node.__name__}'
     created_node = type(
@@ -152,7 +134,7 @@ def class_method(*args: t.Any, **kwargs: t.Any) -> t.Any:
         (node,),
         {
             # Меняем на lambda-функцию, чтобы убить ссылку на метод родительского класса.
-            run_method: class_method,
+            'process': class_method,
             '__module__': __name__,
             '__generic_class__': node,
             'name': node_name or node.name,
@@ -160,7 +142,7 @@ def class_method(*args: t.Any, **kwargs: t.Any) -> t.Any:
         },
     )
 
-    method = getattr(created_node, run_method)
+    method = created_node.process
     method.__annotations__.update(target_dependencies)
 
     globals()[class_name] = created_node
diff --git a/ml_pipeline_engine/node/retrying.py b/ml_pipeline_engine/node/retrying.py
index f0213b0..7da106b 100644
--- a/ml_pipeline_engine/node/retrying.py
+++ b/ml_pipeline_engine/node/retrying.py
@@ -2,13 +2,13 @@
 from typing import Tuple
 from typing import Type
 
-from ml_pipeline_engine.types import NodeLike
+from ml_pipeline_engine.types import NodeBase
 from ml_pipeline_engine.types import RetryPolicyLike
 
 
 @dataclass(frozen=True)
 class NodeRetryPolicy(RetryPolicyLike):
-    node: NodeLike
+    node: NodeBase
 
     @property
     def delay(self) -> int:
diff --git a/ml_pipeline_engine/types.py b/ml_pipeline_engine/types.py
index fc8b152..ff661fa 100644
--- a/ml_pipeline_engine/types.py
+++ b/ml_pipeline_engine/types.py
@@ -5,12 +5,6 @@
 
 import networkx as nx
 
-ProcessorResultT = t.TypeVar('ProcessorResultT')
-DataSourceResultT = t.TypeVar('DataSourceResultT')
-FeatureResultT = t.TypeVar('FeatureResultT')
-FeatureVectorizerResultT = t.TypeVar('FeatureVectorizerResultT')
-MLModelResultT = t.TypeVar('MLModelResultT')
-
 NodeResultT = t.TypeVar('NodeResultT')
 AdditionalDataT = t.TypeVar('AdditionalDataT', bound=t.Any)
 
@@ -63,93 +57,20 @@ def next_iteration(self, data: t.Optional[AdditionalDataT]) -> Recurrent:
         """
 
 
-class NodeProtocol(t.Protocol):
+class NodeBase(RetryProtocol, TagProtocol, t.Protocol[NodeResultT]):
     """
-    Узел графа модели
+    Basic node interface
     """
-
-    RUN_METHOD_ALIASES = (
-        'process',
-        'extract',
-        'collect',
-        'vectorize',
-        'predict',
-    )
-
     node_type: t.ClassVar[str] = None
     name: t.ClassVar[str] = None
-    title: t.ClassVar[str] = None  # TODO: Remove it in the future
     verbose_name: t.ClassVar[str] = None
 
-
-class NodeBase(NodeProtocol, RetryProtocol, TagProtocol):
-    pass
-
-
-class ProcessorLike(RetryProtocol, TagProtocol, t.Protocol[ProcessorResultT]):
-    """
-    Узел общего назначения
-    """
-
     process: t.Union[
-        t.Callable[..., ProcessorResultT],
-        t.Callable[..., t.Awaitable[ProcessorResultT]],
+        t.Callable[..., NodeResultT],
+        t.Callable[..., t.Awaitable[NodeResultT]],
     ]
 
 
-class DataSourceLike(RetryProtocol, TagProtocol, t.Protocol[DataSourceResultT]):
-    """
-    Источник данных
-    """
-
-    collect: t.Union[
-        t.Callable[..., DataSourceResultT],
-        t.Callable[..., t.Awaitable[DataSourceResultT]],
-    ]
-
-
-class FeatureLike(RetryProtocol, TagProtocol, t.Protocol[FeatureResultT]):
-    """
-    Фича
-    """
-
-    extract: t.Union[
-        t.Callable[..., FeatureResultT],
-        t.Callable[..., t.Awaitable[FeatureResultT]],
-    ]
-
-
-class FeatureVectorizerLike(RetryProtocol, TagProtocol, t.Protocol[FeatureVectorizerResultT]):
-    """
-    Векторизатор фичей
-    """
-
-    vectorize: t.Union[
-        t.Callable[..., FeatureVectorizerResultT],
-        t.Callable[..., t.Awaitable[FeatureVectorizerResultT]],
-    ]
-
-
-class MLModelLike(RetryProtocol, TagProtocol, t.Protocol[MLModelResultT]):
-    """
-    ML-модель
-    """
-
-    predict: t.Union[
-        t.Callable[..., MLModelResultT],
-        t.Callable[..., t.Awaitable[MLModelResultT]],
-    ]
-
-
-NodeLike = t.Union[
-    t.Type[ProcessorLike[NodeResultT]],
-    t.Type[DataSourceLike[NodeResultT]],
-    t.Type[FeatureLike[NodeResultT]],
-    t.Type[FeatureVectorizerLike[NodeResultT]],
-    t.Type[MLModelLike[NodeResultT]],
-]
-
-
 @dataclass(frozen=True)
 class PipelineResult(t.Generic[NodeResultT]):
     """
@@ -183,7 +104,7 @@ class PipelineChartLike(t.Protocol[NodeResultT]):
     """
 
     model_name: ModelName
-    entrypoint: t.Optional[t.Union[NodeLike[NodeResultT], 'DAGLike[NodeResultT]']]
+    entrypoint: t.Optional[t.Union[NodeBase[NodeResultT], 'DAGLike[NodeResultT]']]
     event_managers: t.List[t.Type['EventManagerLike']]
     artifact_store: t.Optional[t.Type['ArtifactStoreLike']]
 
@@ -286,7 +207,7 @@ def exists(self, node_id: NodeId) -> bool:
 
 
 class RetryPolicyLike(t.Protocol):
-    node: NodeLike
+    node: NodeBase
 
     @property
     @abc.abstractmethod
@@ -325,7 +246,7 @@ class DAGLike(t.Protocol[NodeResultT]):
     graph: nx.DiGraph
     input_node: NodeId
     output_node: NodeId
-    node_map: t.Dict[NodeId, NodeLike]
+    node_map: t.Dict[NodeId, NodeBase]
     run_manager: DAGRunManagerLike
     retry_policy: RetryPolicyLike
     is_process_pool_needed: bool
diff --git a/ml_pipeline_engine/visualization/dag.py b/ml_pipeline_engine/visualization/dag.py
index d5924d9..3a4b494 100644
--- a/ml_pipeline_engine/visualization/dag.py
+++ b/ml_pipeline_engine/visualization/dag.py
@@ -8,8 +8,8 @@
 from ml_pipeline_engine.node import get_callable_run_method
 from ml_pipeline_engine.node.enums import NodeType
 from ml_pipeline_engine.types import DAGLike
+from ml_pipeline_engine.types import NodeBase
 from ml_pipeline_engine.types import NodeId
-from ml_pipeline_engine.types import NodeLike
 from ml_pipeline_engine.visualization import schema
 from ml_pipeline_engine.visualization.utils import copy_resources
 
@@ -23,14 +23,14 @@ class GraphConfigImpl:
     def __init__(self, dag: DAGLike) -> None:
         self._dag = dag
 
-    def _get_node(self, node_id: NodeId) -> t.Type[NodeLike]:
+    def _get_node(self, node_id: NodeId) -> t.Type[NodeBase]:
         """
         Get a node object. Sometimes it can be None if we work with an artificial node
         """
         return self._dag.node_map.get(node_id)
 
     @staticmethod
-    def _get_node_relative_path(node: t.Type[NodeLike]) -> str:
+    def _get_node_relative_path(node: t.Type[NodeBase]) -> str:
         """
         Generate relative path for a node
         """
diff --git a/ml_pipeline_engine/visualization/sample.py b/ml_pipeline_engine/visualization/sample.py
index 04a4519..22e8a70 100644
--- a/ml_pipeline_engine/visualization/sample.py
+++ b/ml_pipeline_engine/visualization/sample.py
@@ -1,7 +1,7 @@
-from ml_pipeline_engine.base_nodes.processors import ProcessorBase
 from ml_pipeline_engine.dag_builders.annotation import build_dag
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
 from ml_pipeline_engine.dag_builders.annotation.marks import SwitchCase
+from ml_pipeline_engine.node import ProcessorBase
 
 
 class Ident(ProcessorBase):
diff --git a/tests/context/__init__.py b/tests/context/__init__.py
deleted file mode 100644
index e69de29..0000000
diff --git a/tests/dag/oneof/test_input_one_of_fails_dag.py b/tests/dag/oneof/test_input_one_of_fails_dag.py
index 5436545..4e0391f 100644
--- a/tests/dag/oneof/test_input_one_of_fails_dag.py
+++ b/tests/dag/oneof/test_input_one_of_fails_dag.py
@@ -2,15 +2,14 @@
 
 import pytest
 
-from ml_pipeline_engine.base_nodes.datasources import DataSource
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
 from ml_pipeline_engine.dag_builders.annotation.marks import InputOneOf
+from ml_pipeline_engine.node import ProcessorBase
 from ml_pipeline_engine.types import DAGLike
-from ml_pipeline_engine.types import NodeBase
 
 
-class SomeInput(NodeBase):
+class SomeInput(ProcessorBase):
     name = 'input'
 
     def process(self, base_num: int, other_num: int) -> dict:
@@ -20,54 +19,52 @@ def process(self, base_num: int, other_num: int) -> dict:
         }
 
 
-class ErrorDataSource(DataSource):
+class ErrorDataSource(ProcessorBase):
     name = 'some_data_source'
-    title = 'SomeDataSource'
 
-    def collect(self, _: Input(SomeInput)) -> t.Type[Exception]:
+    def process(self, _: Input(SomeInput)) -> t.Type[Exception]:
         raise Exception
 
 
-class ErrorDataSourceSecond(DataSource):
+class ErrorDataSourceSecond(ProcessorBase):
     name = 'some_data_source_second'
-    title = 'SomeDataSource'
 
-    def collect(self, _: Input(SomeInput)) -> t.Type[Exception]:
+    def process(self, _: Input(SomeInput)) -> t.Type[Exception]:
         raise Exception
 
 
-class SomeFeature(NodeBase):
+class SomeFeature(ProcessorBase):
     name = 'some_feature'
 
-    def extract(self, ds_value: Input(ErrorDataSource), inp: Input(SomeInput)) -> int:
+    def process(self, ds_value: Input(ErrorDataSource), inp: Input(SomeInput)) -> int:
         return ds_value + inp['other_num'] + 10
 
 
-class SomeFeatureSecond(NodeBase):
+class SomeFeatureSecond(ProcessorBase):
     name = 'some_feature_second'
 
-    def extract(self, ds_value: Input(ErrorDataSourceSecond), inp: Input(SomeInput)) -> int:
+    def process(self, ds_value: Input(ErrorDataSourceSecond), inp: Input(SomeInput)) -> int:
         return ds_value + inp['other_num'] + 10
 
 
-class FallbackFeature(NodeBase):
+class FallbackFeature(ProcessorBase):
     name = 'fallback_feature'
 
-    def extract(self) -> t.Type[Exception]:
+    def process(self) -> t.Type[Exception]:
         return Exception
 
 
-class SomeVectorizer(NodeBase):
+class SomeVectorizer(ProcessorBase):
     name = 'some_vectorizer'
 
-    def vectorize(self, feature_value: InputOneOf([SomeFeature, SomeFeatureSecond, FallbackFeature])) -> int:
+    def process(self, feature_value: InputOneOf([SomeFeature, SomeFeatureSecond, FallbackFeature])) -> int:
         return feature_value + 20
 
 
-class SomeMLModel(NodeBase):
+class SomeMLModel(ProcessorBase):
     name = 'some_model'
 
-    def predict(self, vec_value: Input(SomeVectorizer)) -> float:
+    def process(self, vec_value: Input(SomeVectorizer)) -> float:
         return (vec_value + 30) / 100
diff --git a/tests/dag/oneof/test_input_one_of_first_success_dag.py b/tests/dag/oneof/test_input_one_of_first_success_dag.py
index 69079f5..c0db0db 100644
--- a/tests/dag/oneof/test_input_one_of_first_success_dag.py
+++ b/tests/dag/oneof/test_input_one_of_first_success_dag.py
@@ -1,14 +1,13 @@
 import typing as t
 
-from ml_pipeline_engine.base_nodes.datasources import DataSource
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
 from ml_pipeline_engine.dag_builders.annotation.marks import InputOneOf
+from ml_pipeline_engine.node import ProcessorBase
 from ml_pipeline_engine.types import DAGLike
-from ml_pipeline_engine.types import NodeBase
 
 
-class SomeInput(NodeBase):
+class SomeInput(ProcessorBase):
     name = 'input'
 
     def process(self, base_num: int, other_num: int) -> dict:
@@ -18,61 +17,59 @@ def process(self, base_num: int, other_num: int) -> dict:
         }
 
 
-class SomeDataSource(DataSource):
+class SomeDataSource(ProcessorBase):
     name = 'some_data_source'
-    title = 'SomeDataSource'
 
-    def collect(self, _: Input(SomeInput)) -> int:
+    def process(self, _: Input(SomeInput)) -> int:
         return 110
 
 
-class ErrorDataSource(DataSource):
+class ErrorDataSource(ProcessorBase):
     name = 'some_data_source_second'
-    title = 'SomeDataSource'
 
-    def collect(self, _: Input(SomeInput)) -> int:
+    def process(self, _: Input(SomeInput)) -> int:
         raise Exception
 
 
-class SomeFeature(NodeBase):
+class SomeFeature(ProcessorBase):
     name = 'some_feature'
 
-    def extract(self, ds_value: Input(SomeDataSource), inp: Input(SomeInput)) -> int:
+    def process(self, ds_value: Input(SomeDataSource), inp: Input(SomeInput)) -> int:
         return ds_value + inp['other_num'] + 10
 
 
-class SomeFeatureSecond(NodeBase):
+class SomeFeatureSecond(ProcessorBase):
     name = 'some_feature_second'
 
-    def extract(self, ds_value: Input(ErrorDataSource), inp: Input(SomeInput)) -> int:
+    def process(self, ds_value: Input(ErrorDataSource), inp: Input(SomeInput)) -> int:
         return ds_value + inp['other_num'] + 10
 
 
-class FallbackFeature(NodeBase):
+class FallbackFeature(ProcessorBase):
     name = 'fallback_feature'
 
-    def extract(self) -> int:
+    def process(self) -> int:
         return 125
 
 
-class SomeVectorizer(NodeBase):
+class SomeVectorizer(ProcessorBase):
     name = 'some_vectorizer'
 
-    def vectorize(self, feature_value: InputOneOf([SomeFeature, SomeFeatureSecond, FallbackFeature])) -> int:
+    def process(self, feature_value: InputOneOf([SomeFeature, SomeFeatureSecond, FallbackFeature])) -> int:
         return feature_value + 20
 
 
-class NoneVectorizer(NodeBase):
+class NoneVectorizer(ProcessorBase):
     name = 'none_vectorizer'
 
-    def vectorize(self) -> None:
+    def process(self) -> None:
         return None
 
 
-class SomeMLModel(NodeBase):
+class SomeMLModel(ProcessorBase):
     name = 'some_model'
 
-    def predict(self, vec_value: Input(SomeVectorizer), none_value: Input(NoneVectorizer)) -> float:
+    def process(self, vec_value: Input(SomeVectorizer), none_value: Input(NoneVectorizer)) -> float:
         assert none_value is None
         return (vec_value + 30) / 100
diff --git a/tests/dag/oneof/test_input_one_of_last_success_dag.py b/tests/dag/oneof/test_input_one_of_last_success_dag.py
index 247379a..b95bfff 100644
--- a/tests/dag/oneof/test_input_one_of_last_success_dag.py
+++ b/tests/dag/oneof/test_input_one_of_last_success_dag.py
@@ -1,14 +1,13 @@
 import typing as t
 
-from ml_pipeline_engine.base_nodes.datasources import DataSource
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
 from ml_pipeline_engine.dag_builders.annotation.marks import InputOneOf
+from ml_pipeline_engine.node import ProcessorBase
 from ml_pipeline_engine.types import DAGLike
-from ml_pipeline_engine.types import NodeBase
 
 
-class SomeInput(NodeBase):
+class SomeInput(ProcessorBase):
     name = 'input'
 
     def process(self, base_num: int, other_num: int) -> dict:
@@ -18,54 +17,52 @@ def process(self, base_num: int, other_num: int) -> dict:
         }
 
 
-class ErrorDataSource(DataSource):
+class ErrorDataSource(ProcessorBase):
     name = 'some_data_source'
-    title = 'SomeDataSource'
 
-    def collect(self, _: Input(SomeInput)) -> t.Type[Exception]:
+    def process(self, _: Input(SomeInput)) -> t.Type[Exception]:
         raise Exception
 
 
-class ErrorDataSourceSecond(DataSource):
+class ErrorDataSourceSecond(ProcessorBase):
     name = 'some_data_source_second'
-    title = 'SomeDataSource'
 
-    def collect(self, _: Input(SomeInput)) -> t.Type[Exception]:
+    def process(self, _: Input(SomeInput)) -> t.Type[Exception]:
         raise Exception
 
 
-class SomeFeature(NodeBase):
+class SomeFeature(ProcessorBase):
     name = 'some_feature'
 
-    def extract(self, ds_value: Input(ErrorDataSource), inp: Input(SomeInput)) -> int:
+    def process(self, ds_value: Input(ErrorDataSource), inp: Input(SomeInput)) -> int:
         return ds_value + inp['other_num'] + 10
 
 
-class SomeFeatureSecond(NodeBase):
+class SomeFeatureSecond(ProcessorBase):
     name = 'some_feature_second'
 
-    def extract(self, ds_value: Input(ErrorDataSourceSecond), inp: Input(SomeInput)) -> int:
+    def process(self, ds_value: Input(ErrorDataSourceSecond), inp: Input(SomeInput)) -> int:
         return ds_value + inp['other_num'] + 10
 
 
-class FallbackFeature(NodeBase):
+class FallbackFeature(ProcessorBase):
     name = 'fallback_feature'
 
-    def extract(self) -> int:
+    def process(self) -> int:
         return 125
 
 
-class SomeVectorizer(NodeBase):
+class SomeVectorizer(ProcessorBase):
     name = 'some_vectorizer'
 
-    def vectorize(self, feature_value: InputOneOf([SomeFeature, SomeFeatureSecond, FallbackFeature])) -> int:
+    def process(self, feature_value: InputOneOf([SomeFeature, SomeFeatureSecond, FallbackFeature])) -> int:
         return feature_value + 20
 
 
-class SomeMLModel(NodeBase):
+class SomeMLModel(ProcessorBase):
     name = 'some_model'
 
-    def predict(self, vec_value: Input(SomeVectorizer)) -> float:
+    def process(self, vec_value: Input(SomeVectorizer)) -> float:
         return (vec_value + 30) / 100
diff --git a/tests/dag/oneof/test_input_one_of_multiple_dag.py b/tests/dag/oneof/test_input_one_of_multiple_dag.py
index a1ff0ef..f658526 100644
--- a/tests/dag/oneof/test_input_one_of_multiple_dag.py
+++ b/tests/dag/oneof/test_input_one_of_multiple_dag.py
@@ -1,14 +1,13 @@
 import typing as t
 
-from ml_pipeline_engine.base_nodes.datasources import DataSource
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
 from ml_pipeline_engine.dag_builders.annotation.marks import InputOneOf
+from ml_pipeline_engine.node import ProcessorBase
 from ml_pipeline_engine.types import DAGLike
-from ml_pipeline_engine.types import NodeBase
 
 
-class SomeInput(NodeBase):
+class SomeInput(ProcessorBase):
     name = 'input'
 
     def process(self, base_num: int, other_num: int) -> dict:
@@ -18,68 +17,66 @@ def process(self, base_num: int, other_num: int) -> dict:
         }
 
 
-class SomeDataSource(DataSource):
+class SomeDataSource(ProcessorBase):
     name = 'some_data_source'
-    title = 'SomeDataSource'
 
-    def collect(self, _: Input(SomeInput)) -> int:
+    def process(self, _: Input(SomeInput)) -> int:
         return 110
 
 
-class ErrorDataSource(DataSource):
+class ErrorDataSource(ProcessorBase):
     name = 'some_data_source_second'
-    title = 'SomeDataSource'
 
-    def collect(self, _: Input(SomeInput)) -> t.Type[Exception]:
+    def process(self, _: Input(SomeInput)) -> t.Type[Exception]:
         raise Exception
 
 
-class SomeFeature(NodeBase):
+class SomeFeature(ProcessorBase):
     name = 'some_feature'
 
-    def extract(self, ds_value: Input(SomeDataSource), inp: Input(SomeInput)) -> int:
+    def process(self, ds_value: Input(SomeDataSource), inp: Input(SomeInput)) -> int:
         return ds_value + inp['other_num'] + 10
 
 
-class SomeFeatureCopy(NodeBase):
+class SomeFeatureCopy(ProcessorBase):
     name = 'some_feature_copy'
 
-    def extract(self, ds_value: Input(SomeDataSource), inp: Input(SomeInput)) -> int:
+    def process(self, ds_value: Input(SomeDataSource), inp: Input(SomeInput)) -> int:
         return ds_value + inp['other_num'] + 10
 
 
-class SomeFeatureSecond(NodeBase):
+class SomeFeatureSecond(ProcessorBase):
     name = 'some_feature_second'
 
-    def extract(self, ds_value: Input(ErrorDataSource), inp: Input(SomeInput)) -> int:
+    def process(self, ds_value: Input(ErrorDataSource), inp: Input(SomeInput)) -> int:
         return ds_value + inp['other_num'] + 15
 
 
-class SomeFeatureSecondCopy(NodeBase):
+class SomeFeatureSecondCopy(ProcessorBase):
     name = 'some_feature_second_copy'
 
-    def extract(self, ds_value: Input(ErrorDataSource), inp: Input(SomeInput)) -> int:
+    def process(self, ds_value: Input(ErrorDataSource), inp: Input(SomeInput)) -> int:
         return ds_value + inp['other_num'] + 15
 
 
-class FallbackFeature(NodeBase):
+class FallbackFeature(ProcessorBase):
     name = 'fallback_feature'
 
-    def extract(self) -> int:
+    def process(self) -> int:
         return 130
 
 
-class FallbackFeatureSecond(NodeBase):
+class FallbackFeatureSecond(ProcessorBase):
     name = 'fallback_feature_second'
 
-    def extract(self) -> int:
+    def process(self) -> int:
         return 130
 
 
-class SomeVectorizer(NodeBase):
+class SomeVectorizer(ProcessorBase):
     name = 'some_vectorizer'
 
-    def vectorize(
+    def process(
         self,
         input_model: Input(SomeInput),
         feature_value: InputOneOf([SomeFeature, SomeFeatureSecond, FallbackFeature]),
@@ -88,10 +85,10 @@ def vectorize(
         return feature_value + input_model['other_num'] + 15 + feature_value2
 
 
-class SomeMLModel(NodeBase):
+class SomeMLModel(ProcessorBase):
     name = 'some_model'
 
-    def predict(self, vec_value: Input(SomeVectorizer)) -> float:
+    def process(self, vec_value: Input(SomeVectorizer)) -> float:
         return (vec_value + 30) / 100
diff --git a/tests/dag/oneof/test_oneof_all_errors.py b/tests/dag/oneof/test_oneof_all_errors.py
index 6b5e080..405c95f 100644
--- a/tests/dag/oneof/test_oneof_all_errors.py
+++ b/tests/dag/oneof/test_oneof_all_errors.py
@@ -2,11 +2,11 @@
 
 import pytest
 
-from ml_pipeline_engine.base_nodes.processors import ProcessorBase
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag import OneOfDoesNotHaveResultError
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
 from ml_pipeline_engine.dag_builders.annotation.marks import InputOneOf
+from ml_pipeline_engine.node import ProcessorBase
 from ml_pipeline_engine.types import DAGLike
diff --git a/tests/dag/oneof/test_oneof_fail_node.py b/tests/dag/oneof/test_oneof_fail_node.py
index 956f49f..a6f4bb8 100644
--- a/tests/dag/oneof/test_oneof_fail_node.py
+++ b/tests/dag/oneof/test_oneof_fail_node.py
@@ -2,30 +2,27 @@
 
 import pytest_mock
 
-from ml_pipeline_engine.base_nodes.datasources import DataSource
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
 from ml_pipeline_engine.dag_builders.annotation.marks import InputGeneric
 from ml_pipeline_engine.dag_builders.annotation.marks import InputOneOf
+from ml_pipeline_engine.node import ProcessorBase
 from ml_pipeline_engine.node import build_node
 from ml_pipeline_engine.types import DAGLike
 from ml_pipeline_engine.types import NodeBase
-from ml_pipeline_engine.types import NodeLike
 
 
-class SomeInput(NodeBase):
+class SomeInput(ProcessorBase):
     name = 'input'
-    title = 'input'
 
     def process(self, base_num: int) -> int:
         return base_num
 
 
-class FlDataSourceGeneric(DataSource):
-    title = 'source'
+class FlDataSourceGeneric(ProcessorBase):
     name = 'source'
 
-    def collect(self, **__: t.Any) -> t.Type[Exception]:
+    def process(self, **__: t.Any) -> t.Type[Exception]:
         raise Exception
 
 
@@ -35,17 +32,17 @@ def collect(self, **__: t.Any) -> t.Type[Exception]:
 )
 
 
-class SomeFeatureGeneric(NodeBase):
-    title = 'feature'
+class SomeFeatureGeneric(ProcessorBase):
+    name = 'feature'
 
-    def extract(self, fl_credit_history: InputGeneric(NodeLike), **__: t.Any) -> int:
+    def process(self, fl_credit_history: InputGeneric(NodeBase), **__: t.Any) -> int:
         return len(fl_credit_history)
 
 
-class SomeFeatureFallback(NodeBase):
-    title = 'feature_fallback'
+class SomeFeatureFallback(ProcessorBase):
+    name = 'feature_fallback'
 
-    def extract(self) -> int:
+    def process(self) -> int:
         return 777_777
 
 
@@ -56,10 +53,10 @@ def extract(self) -> int:
 )
 
 
-class SomeMLModel(NodeBase):
+class SomeMLModel(ProcessorBase):
     name = 'some_model'
 
-    def predict(self, fl_credit_history_feature: InputOneOf([SomeFeature, SomeFeatureFallback])) -> int:
+    def process(self, fl_credit_history_feature: InputOneOf([SomeFeature, SomeFeatureFallback])) -> int:
         return fl_credit_history_feature
 
 
@@ -68,7 +65,7 @@ async def test_fail_node(
     build_dag: t.Callable[..., DAGLike],
     mocker: pytest_mock.MockerFixture,
 ) -> None:
-    extract_patch = mocker.patch.object(SomeFeatureGeneric, 'extract')
+    extract_patch = mocker.patch.object(SomeFeatureGeneric, 'process')
 
     dag = build_dag(input_node=SomeInput, output_node=SomeMLModel)
     assert await dag.run(pipeline_context(base_num=10)) == 777_777
diff --git a/tests/dag/oneof/test_oneof_node_success.py b/tests/dag/oneof/test_oneof_node_success.py
index 25cb163..540d15e 100644
--- a/tests/dag/oneof/test_oneof_node_success.py
+++ b/tests/dag/oneof/test_oneof_node_success.py
@@ -1,9 +1,9 @@
 import typing as t
 
-from ml_pipeline_engine.base_nodes.processors import ProcessorBase
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
 from ml_pipeline_engine.dag_builders.annotation.marks import InputOneOf
+from ml_pipeline_engine.node import ProcessorBase
 from ml_pipeline_engine.types import DAGLike
diff --git a/tests/dag/oneof/test_oneof_recurrent_same_ancestors.py b/tests/dag/oneof/test_oneof_recurrent_same_ancestors.py
index ffa3703..615f02b 100644
--- a/tests/dag/oneof/test_oneof_recurrent_same_ancestors.py
+++ b/tests/dag/oneof/test_oneof_recurrent_same_ancestors.py
@@ -1,12 +1,12 @@
 import typing as t
 
-from ml_pipeline_engine.base_nodes.processors import ProcessorBase
-from ml_pipeline_engine.base_nodes.processors import RecurrentProcessor
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import GenericInput
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
 from ml_pipeline_engine.dag_builders.annotation.marks import InputOneOf
 from ml_pipeline_engine.dag_builders.annotation.marks import RecurrentSubGraph
+from ml_pipeline_engine.node import ProcessorBase
+from ml_pipeline_engine.node import RecurrentProcessor
 from ml_pipeline_engine.node import build_node
 from ml_pipeline_engine.node.enums import NodeTag
 from ml_pipeline_engine.types import DAGLike
diff --git a/tests/dag/oneof/test_oneof_subgraph_with_single_node.py b/tests/dag/oneof/test_oneof_subgraph_with_single_node.py
index 579f60e..692f852 100644
--- a/tests/dag/oneof/test_oneof_subgraph_with_single_node.py
+++ b/tests/dag/oneof/test_oneof_subgraph_with_single_node.py
@@ -1,8 +1,8 @@
 import typing as t
 
-from ml_pipeline_engine.base_nodes.processors import ProcessorBase
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import InputOneOf
+from ml_pipeline_engine.node import ProcessorBase
 from ml_pipeline_engine.types import DAGLike
diff --git a/tests/dag/oneof/test_oneof_with_recurrent_subgraph.py b/tests/dag/oneof/test_oneof_with_recurrent_subgraph.py
index 88dcba7..c7c138a 100644
--- a/tests/dag/oneof/test_oneof_with_recurrent_subgraph.py
+++ b/tests/dag/oneof/test_oneof_with_recurrent_subgraph.py
@@ -1,11 +1,11 @@
 import typing as t
 
-from ml_pipeline_engine.base_nodes.processors import ProcessorBase
-from ml_pipeline_engine.base_nodes.processors import RecurrentProcessor
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
 from ml_pipeline_engine.dag_builders.annotation.marks import InputOneOf
 from ml_pipeline_engine.dag_builders.annotation.marks import RecurrentSubGraph
+from ml_pipeline_engine.node import ProcessorBase
+from ml_pipeline_engine.node import RecurrentProcessor
 from ml_pipeline_engine.types import AdditionalDataT
 from ml_pipeline_engine.types import DAGLike
 from ml_pipeline_engine.types import Recurrent
diff --git a/tests/dag/recurrent_subgraph/test_inline_subgraph.py b/tests/dag/recurrent_subgraph/test_inline_subgraph.py
index d29f552..a8dd51e 100644
--- a/tests/dag/recurrent_subgraph/test_inline_subgraph.py
+++ b/tests/dag/recurrent_subgraph/test_inline_subgraph.py
@@ -1,10 +1,10 @@
 import typing as t
 
-from ml_pipeline_engine.base_nodes.processors import ProcessorBase
-from ml_pipeline_engine.base_nodes.processors import RecurrentProcessor
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
 from ml_pipeline_engine.dag_builders.annotation.marks import RecurrentSubGraph
+from ml_pipeline_engine.node import ProcessorBase
+from ml_pipeline_engine.node import RecurrentProcessor
 from ml_pipeline_engine.types import AdditionalDataT
 from ml_pipeline_engine.types import DAGLike
 from ml_pipeline_engine.types import Recurrent
diff --git a/tests/dag/recurrent_subgraph/test_nested_subgraph.py b/tests/dag/recurrent_subgraph/test_nested_subgraph.py
index a927597..ebb1bc1 100644
--- a/tests/dag/recurrent_subgraph/test_nested_subgraph.py
+++ b/tests/dag/recurrent_subgraph/test_nested_subgraph.py
@@ -3,11 +3,11 @@
 from tests.helpers import FactoryMocker
 from tests.helpers import call_object
 
-from ml_pipeline_engine.base_nodes.processors import ProcessorBase
-from ml_pipeline_engine.base_nodes.processors import RecurrentProcessor
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
 from ml_pipeline_engine.dag_builders.annotation.marks import RecurrentSubGraph
+from ml_pipeline_engine.node import ProcessorBase
+from ml_pipeline_engine.node import RecurrentProcessor
 from ml_pipeline_engine.types import AdditionalDataT
 from ml_pipeline_engine.types import DAGLike
 from ml_pipeline_engine.types import Recurrent
diff --git a/tests/dag/recurrent_subgraph/test_simple_subgraph.py b/tests/dag/recurrent_subgraph/test_simple_subgraph.py
index 7e2a266..fd737f9 100644
--- a/tests/dag/recurrent_subgraph/test_simple_subgraph.py
+++ b/tests/dag/recurrent_subgraph/test_simple_subgraph.py
@@ -3,11 +3,11 @@
 from tests.helpers import FactoryMocker
 from tests.helpers import call_object
 
-from ml_pipeline_engine.base_nodes.processors import ProcessorBase
-from ml_pipeline_engine.base_nodes.processors import RecurrentProcessor
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
 from ml_pipeline_engine.dag_builders.annotation.marks import RecurrentSubGraph
+from ml_pipeline_engine.node import ProcessorBase
+from ml_pipeline_engine.node import RecurrentProcessor
 from ml_pipeline_engine.types import AdditionalDataT
 from ml_pipeline_engine.types import DAGLike
 from ml_pipeline_engine.types import Recurrent
diff --git a/tests/dag/recurrent_subgraph/test_subgraph_default.py b/tests/dag/recurrent_subgraph/test_subgraph_default.py
index 3f8ab47..10c44f0 100644
--- a/tests/dag/recurrent_subgraph/test_subgraph_default.py
+++ b/tests/dag/recurrent_subgraph/test_subgraph_default.py
@@ -3,11 +3,11 @@
 from tests.helpers import FactoryMocker
 from tests.helpers import call_object
 
-from ml_pipeline_engine.base_nodes.processors import ProcessorBase
-from ml_pipeline_engine.base_nodes.processors import RecurrentProcessor
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
 from ml_pipeline_engine.dag_builders.annotation.marks import RecurrentSubGraph
+from ml_pipeline_engine.node import ProcessorBase
+from ml_pipeline_engine.node import RecurrentProcessor
 from ml_pipeline_engine.types import AdditionalDataT
 from ml_pipeline_engine.types import DAGLike
 from ml_pipeline_engine.types import Recurrent
diff --git a/tests/dag/recurrent_subgraph/test_subgraph_default_retry.py b/tests/dag/recurrent_subgraph/test_subgraph_default_retry.py
index 1f000bb..ecc3ff6 100644
--- a/tests/dag/recurrent_subgraph/test_subgraph_default_retry.py
+++ b/tests/dag/recurrent_subgraph/test_subgraph_default_retry.py
@@ -3,11 +3,11 @@
 from tests.helpers import FactoryMocker
 from tests.helpers import call_object
 
-from ml_pipeline_engine.base_nodes.processors import ProcessorBase
-from ml_pipeline_engine.base_nodes.processors import RecurrentProcessor
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
 from ml_pipeline_engine.dag_builders.annotation.marks import RecurrentSubGraph
+from ml_pipeline_engine.node import ProcessorBase
+from ml_pipeline_engine.node import RecurrentProcessor
 from ml_pipeline_engine.types import AdditionalDataT
 from ml_pipeline_engine.types import DAGLike
 from ml_pipeline_engine.types import Recurrent
diff --git a/tests/dag/recurrent_subgraph/test_subgraph_with_error.py b/tests/dag/recurrent_subgraph/test_subgraph_with_error.py
index 8726eac..6d1800f 100644
--- a/tests/dag/recurrent_subgraph/test_subgraph_with_error.py
+++ b/tests/dag/recurrent_subgraph/test_subgraph_with_error.py
@@ -2,11 +2,11 @@
 
 import pytest
 
-from ml_pipeline_engine.base_nodes.processors import ProcessorBase
-from ml_pipeline_engine.base_nodes.processors import RecurrentProcessor
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
 from ml_pipeline_engine.dag_builders.annotation.marks import RecurrentSubGraph
+from ml_pipeline_engine.node import ProcessorBase
+from ml_pipeline_engine.node import RecurrentProcessor
 from ml_pipeline_engine.types import AdditionalDataT
 from ml_pipeline_engine.types import DAGLike
 from ml_pipeline_engine.types import Recurrent
diff --git a/tests/dag/recurrent_subgraph/test_subgraph_with_inside_switch.py b/tests/dag/recurrent_subgraph/test_subgraph_with_inside_switch.py
index f2d6e31..898b489 100644
--- a/tests/dag/recurrent_subgraph/test_subgraph_with_inside_switch.py
+++ b/tests/dag/recurrent_subgraph/test_subgraph_with_inside_switch.py
@@ -5,11 +5,11 @@
 from tests.helpers import FactoryMocker
 from tests.helpers import call_object
 
-from ml_pipeline_engine.base_nodes.processors import RecurrentProcessor
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
 from ml_pipeline_engine.dag_builders.annotation.marks import RecurrentSubGraph
 from ml_pipeline_engine.dag_builders.annotation.marks import SwitchCase
+from ml_pipeline_engine.node import RecurrentProcessor
 from ml_pipeline_engine.types import AdditionalDataT
 from ml_pipeline_engine.types import DAGLike
 from ml_pipeline_engine.types import Recurrent
diff --git a/tests/dag/recurrent_subgraph/test_subgraph_with_several_calls.py b/tests/dag/recurrent_subgraph/test_subgraph_with_several_calls.py
index 15d23fe..d47cede 100644
--- a/tests/dag/recurrent_subgraph/test_subgraph_with_several_calls.py
+++ b/tests/dag/recurrent_subgraph/test_subgraph_with_several_calls.py
@@ -2,11 +2,11 @@
 
 import pytest
 
-from ml_pipeline_engine.base_nodes.processors import ProcessorBase
-from ml_pipeline_engine.base_nodes.processors import RecurrentProcessor
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
 from ml_pipeline_engine.dag_builders.annotation.marks import RecurrentSubGraph
+from ml_pipeline_engine.node import ProcessorBase
+from ml_pipeline_engine.node import RecurrentProcessor
 from ml_pipeline_engine.types import AdditionalDataT
 from ml_pipeline_engine.types import DAGLike
 from ml_pipeline_engine.types import Recurrent
diff --git a/tests/dag/retry/test_dag_retry.py b/tests/dag/retry/test_dag_retry.py
index 9440ed2..e2fd72d 100644
--- a/tests/dag/retry/test_dag_retry.py
+++ b/tests/dag/retry/test_dag_retry.py
@@ -2,10 +2,9 @@
 
 import pytest_mock
 
-from ml_pipeline_engine.base_nodes.datasources import DataSource
-from ml_pipeline_engine.base_nodes.processors import ProcessorBase
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
+from ml_pipeline_engine.node import ProcessorBase
 from ml_pipeline_engine.types import DAGLike
 
 
@@ -27,10 +26,10 @@ def external_func() -> float:
         return 0.1
 
 
-class SomeNode(DataSource):
+class SomeNode(ProcessorBase):
     exceptions = (BaseExecutionError,)
 
-    def collect(self) -> float:
+    def process(self) -> float:
         return ExternalDatasource().external_func()
 
 
@@ -55,7 +54,7 @@ async def test_dag_retry(
     mocker: pytest_mock.MockerFixture,
 ) -> None:
 
-    collect_spy = mocker.spy(SomeNode, 'collect')
+    collect_spy = mocker.spy(SomeNode, 'process')
     external_func_patch = mocker.patch.object(
         ExternalDatasource,
         'external_func',
diff --git a/tests/dag/retry/test_dag_retry__base_error.py b/tests/dag/retry/test_dag_retry__base_error.py
index ad8e9e5..a8eba9a 100644
--- a/tests/dag/retry/test_dag_retry__base_error.py
+++ b/tests/dag/retry/test_dag_retry__base_error.py
@@ -3,17 +3,16 @@
 import pytest
 import pytest_mock
 
-from ml_pipeline_engine.base_nodes.datasources import DataSource
-from ml_pipeline_engine.base_nodes.processors import ProcessorBase
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
+from ml_pipeline_engine.node import ProcessorBase
 from ml_pipeline_engine.types import DAGLike
 
 
-class SomeNode(DataSource):
+class SomeNode(ProcessorBase):
     exceptions = (Exception,)
 
-    def collect(self):  # noqa
+    def process(self):  # noqa
         raise BaseException('CustomError')
 
 
@@ -37,7 +36,7 @@ async def test_dag_retry__base_error(
     build_dag: t.Callable[..., DAGLike],
     mocker: pytest_mock.MockerFixture,
 ) -> None:
-    collect_spy = mocker.spy(SomeNode, 'collect')
+    collect_spy = mocker.spy(SomeNode, 'process')
 
     with pytest.raises(BaseException, match='CustomError'):
         assert await build_dag(input_node=InvertNumber, output_node=DoubleNumber).run(pipeline_context(num=2.5))
diff --git a/tests/dag/retry/test_dag_retry__error.py b/tests/dag/retry/test_dag_retry__error.py
index 9dff3c4..8177baf 100644
--- a/tests/dag/retry/test_dag_retry__error.py
+++ b/tests/dag/retry/test_dag_retry__error.py
@@ -3,10 +3,9 @@
 import pytest
 import pytest_mock
 
-from ml_pipeline_engine.base_nodes.datasources import DataSource
-from ml_pipeline_engine.base_nodes.processors import ProcessorBase
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
+from ml_pipeline_engine.node import ProcessorBase
 from ml_pipeline_engine.types import DAGLike
 
 
@@ -16,8 +15,8 @@ def external_func() -> float:
         return 0.1
 
 
-class SomeNode(DataSource):
-    def collect(self):  # noqa
+class SomeNode(ProcessorBase):
+    def process(self):  # noqa
         return ExternalDatasource().external_func()
 
 
@@ -42,7 +41,7 @@ async def test_dag_retry__error(
     mocker: pytest_mock.MockerFixture,
 ) -> None:
 
-    collect_spy = mocker.spy(SomeNode, 'collect')
+    collect_spy = mocker.spy(SomeNode, 'process')
     external_func_patch = mocker.patch.object(
         ExternalDatasource,
         'external_func',
diff --git a/tests/dag/switch_case/test_concurrent_switch.py b/tests/dag/switch_case/test_concurrent_switch.py
index 445807b..1631e8a 100644
--- a/tests/dag/switch_case/test_concurrent_switch.py
+++ b/tests/dag/switch_case/test_concurrent_switch.py
@@ -2,10 +2,10 @@
 
 import pytest
 
-from ml_pipeline_engine.base_nodes.processors import ProcessorBase
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
 from ml_pipeline_engine.dag_builders.annotation.marks import SwitchCase
+from ml_pipeline_engine.node import ProcessorBase
 from ml_pipeline_engine.types import DAGLike
diff --git a/tests/dag/switch_case/test_dag_multiple_switch_case.py b/tests/dag/switch_case/test_dag_multiple_switch_case.py
index f9b592a..80d2a9c 100644
--- a/tests/dag/switch_case/test_dag_multiple_switch_case.py
+++ b/tests/dag/switch_case/test_dag_multiple_switch_case.py
@@ -2,10 +2,10 @@
 
 import pytest
 
-from ml_pipeline_engine.base_nodes.processors import ProcessorBase
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
 from ml_pipeline_engine.dag_builders.annotation.marks import SwitchCase
+from ml_pipeline_engine.node import ProcessorBase
 from ml_pipeline_engine.types import DAGLike
diff --git a/tests/dag/switch_case/test_dag_nested_switch_case.py b/tests/dag/switch_case/test_dag_nested_switch_case.py
index a9a60d8..5bef5ad 100644
--- a/tests/dag/switch_case/test_dag_nested_switch_case.py
+++ b/tests/dag/switch_case/test_dag_nested_switch_case.py
@@ -2,10 +2,10 @@
 
 import pytest
 
-from ml_pipeline_engine.base_nodes.processors import ProcessorBase
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
 from ml_pipeline_engine.dag_builders.annotation.marks import SwitchCase
+from ml_pipeline_engine.node import ProcessorBase
 from ml_pipeline_engine.types import DAGLike
diff --git a/tests/dag/switch_case/test_dag_switch_case.py b/tests/dag/switch_case/test_dag_switch_case.py
index 6f00d1c..57e6658 100644
--- a/tests/dag/switch_case/test_dag_switch_case.py
+++ b/tests/dag/switch_case/test_dag_switch_case.py
@@ -2,10 +2,10 @@
 
 import pytest
 
-from ml_pipeline_engine.base_nodes.processors import ProcessorBase
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
 from ml_pipeline_engine.dag_builders.annotation.marks import SwitchCase
+from ml_pipeline_engine.node import ProcessorBase
 from ml_pipeline_engine.types import DAGLike
diff --git a/tests/dag/test_dag_chain.py b/tests/dag/test_dag_chain.py
index 0ea8e00..1685c84 100644
--- a/tests/dag/test_dag_chain.py
+++ b/tests/dag/test_dag_chain.py
@@ -1,8 +1,8 @@
 import typing as t
 
-from ml_pipeline_engine.base_nodes.processors import ProcessorBase
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
+from ml_pipeline_engine.node import ProcessorBase
 from ml_pipeline_engine.types import DAGLike
diff --git a/tests/dag/test_dag_rhombus.py b/tests/dag/test_dag_rhombus.py
index 4c7c885..c856a65 100644
--- a/tests/dag/test_dag_rhombus.py
+++ b/tests/dag/test_dag_rhombus.py
@@ -1,8 +1,8 @@
 import typing as t
 
-from ml_pipeline_engine.base_nodes.processors import ProcessorBase
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
+from ml_pipeline_engine.node import ProcessorBase
 from ml_pipeline_engine.types import DAGLike
diff --git a/tests/dag/test_dag_single.py b/tests/dag/test_dag_single.py
index 514aba3..79374d2 100644
--- a/tests/dag/test_dag_single.py
+++ b/tests/dag/test_dag_single.py
@@ -1,8 +1,8 @@
 import typing as t
 
-from ml_pipeline_engine.base_nodes.processors import ProcessorBase
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation import build_dag_single
+from ml_pipeline_engine.node import ProcessorBase
 
 
 class DoubleNumber(ProcessorBase):
diff --git a/tests/dag/test_demo_ml_model_dag.py b/tests/dag/test_demo_ml_model_dag.py
index 1f2a626..b3f2a0f 100644
--- a/tests/dag/test_demo_ml_model_dag.py
+++ b/tests/dag/test_demo_ml_model_dag.py
@@ -2,11 +2,11 @@
 
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
+from ml_pipeline_engine.node import ProcessorBase
 from ml_pipeline_engine.types import DAGLike
-from ml_pipeline_engine.types import NodeBase
 
 
-class SomeInput(NodeBase):
+class SomeInput(ProcessorBase):
     name = 'input'
 
     def process(self, base_num: int, other_num: int) -> dict:
@@ -16,31 +16,31 @@ def process(self, base_num: int, other_num: int) -> dict:
         }
 
 
-class SomeDataSource(NodeBase):
+class SomeDataSource(ProcessorBase):
     name = 'some_data_source'
 
-    def collect(self, inp: Input(SomeInput)) -> int:
+    def process(self, inp: Input(SomeInput)) -> int:
         return inp['base_num'] + 100
 
 
-class SomeFeature(NodeBase):
+class SomeFeature(ProcessorBase):
     name = 'some_feature'
 
-    def extract(self, ds_value: Input(SomeDataSource), inp: Input(SomeInput)) -> int:
+    def process(self, ds_value: Input(SomeDataSource), inp: Input(SomeInput)) -> int:
         return ds_value + inp['other_num'] + 10
 
 
-class SomeVectorizer(NodeBase):
+class SomeVectorizer(ProcessorBase):
     name = 'some_vectorizer'
 
-    def vectorize(self, feature_value: Input(SomeFeature)) -> int:
+    def process(self, feature_value: Input(SomeFeature)) -> int:
         return feature_value + 20
 
 
-class SomeMLModel(NodeBase):
+class SomeMLModel(ProcessorBase):
     name = 'some_model'
 
-    def predict(self, vec_value: Input(SomeVectorizer)) -> float:
+    def process(self, vec_value: Input(SomeVectorizer)) -> float:
         return (vec_value + 30) / 100
diff --git a/tests/dag/test_reusable_nodes.py b/tests/dag/test_reusable_nodes.py
index c834752..7f09187 100644
--- a/tests/dag/test_reusable_nodes.py
+++ b/tests/dag/test_reusable_nodes.py
@@ -3,13 +3,13 @@
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
 from ml_pipeline_engine.dag_builders.annotation.marks import InputGeneric
+from ml_pipeline_engine.node import ProcessorBase
 from ml_pipeline_engine.node import build_node
 from ml_pipeline_engine.types import DAGLike
 from ml_pipeline_engine.types import NodeBase
-from ml_pipeline_engine.types import NodeLike
 
 
-class SomeInput(NodeBase):
+class SomeInput(ProcessorBase):
     name = 'input'
 
     def process(self, base_num: int, other_num: int) -> dict:
@@ -19,32 +19,32 @@ def process(self, base_num: int, other_num: int) -> dict:
         }
 
 
-class SomeDataSource(NodeBase):
+class SomeDataSource(ProcessorBase):
     name = 'some_data_source'
 
-    def collect(self, inp: Input(SomeInput)) -> int:
+    def process(self, inp: Input(SomeInput)) -> int:
         return inp['base_num'] + 100
 
 
-class SomeCommonFeature(NodeBase):
+class SomeCommonFeature(ProcessorBase):
     name = 'some_feature'
 
-    def extract(self, ds_value: Input(SomeDataSource), inp: Input(SomeInput)) -> int:
+    def process(self, ds_value: Input(SomeDataSource), inp: Input(SomeInput)) -> int:
         return ds_value + inp['other_num'] + 10
 
 
 # Пример Generic-ноды
-class GenericVectorizer(NodeBase):
+class GenericVectorizer(ProcessorBase):
     name = 'some_vectorizer'
 
-    async def vectorize(self, feature_value: InputGeneric(NodeLike), const: int) -> int:
+    async def process(self, feature_value: InputGeneric(NodeBase), const: int) -> int:
         return feature_value + 20 + const
 
 
-class AnotherFeature(NodeBase):
+class AnotherFeature(ProcessorBase):
     name = 'another_feature'
 
-    def extract(self, inp: Input(SomeInput)) -> int:
+    def process(self, inp: Input(SomeInput)) -> int:
         return inp['base_num'] + 100_000
 
 
@@ -67,17 +67,17 @@ def extract(self, inp: Input(SomeInput)) -> int:
 )
 
 
-class SomeMLModel(NodeBase):
+class SomeMLModel(ProcessorBase):
     name = 'some_model'
 
-    def predict(self, vec_value: Input(SomeParticularVectorizer)) -> float:
+    def process(self, vec_value: Input(SomeParticularVectorizer)) -> float:
         return (vec_value + 30) / 100
 
 
-class AnotherMlModel(NodeBase):
+class AnotherMlModel(ProcessorBase):
     name = 'another_model'
 
-    def predict(self, vec_value: Input(AnotherParticularVectorizer)) -> float:
+    def process(self, vec_value: Input(AnotherParticularVectorizer)) -> float:
         return (vec_value + 30) / 100
diff --git a/tests/tags/test_tags.py b/tests/tags/test_tags.py
index 2cf9d25..f127d5e 100644
--- a/tests/tags/test_tags.py
+++ b/tests/tags/test_tags.py
@@ -4,14 +4,14 @@
 
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
+from ml_pipeline_engine.node import ProcessorBase
 from ml_pipeline_engine.node.enums import NodeTag
 from ml_pipeline_engine.parallelism import processes
 from ml_pipeline_engine.parallelism import threads
 from ml_pipeline_engine.types import DAGLike
-from ml_pipeline_engine.types import NodeBase
 
 
-class SomeInput(NodeBase):
+class SomeInput(ProcessorBase):
     name = 'input'
     tags = (NodeTag.process,)
 
@@ -22,33 +22,33 @@ def process(self, base_num: int, other_num: int) -> dict:
         }
 
 
-class SomeDataSource(NodeBase):
+class SomeDataSource(ProcessorBase):
     name = 'some_data_source'
 
-    def collect(self, inp: Input(SomeInput)) -> int:
+    def process(self, inp: Input(SomeInput)) -> int:
         return inp['base_num'] + 100
 
 
-class SomeFeature(NodeBase):
+class SomeFeature(ProcessorBase):
     name = 'some_feature'
     tags = (NodeTag.process,)
 
-    def extract(self, ds_value: Input(SomeDataSource),
inp: Input(SomeInput)) -> int: return ds_value + inp['other_num'] + 10 -class SomeVectorizer(NodeBase): +class SomeVectorizer(ProcessorBase): name = 'some_vectorizer' tags = (NodeTag.non_async,) - def vectorize(self, feature_value: Input(SomeFeature)) -> int: + def process(self, feature_value: Input(SomeFeature)) -> int: return feature_value + 20 -class SomeMLModel(NodeBase): +class SomeMLModel(ProcessorBase): name = 'some_model' - async def predict(self, vec_value: Input(SomeVectorizer)) -> float: + async def process(self, vec_value: Input(SomeVectorizer)) -> float: return (vec_value + 30) / 100 diff --git a/tests/test_chart/test_dag.py b/tests/test_chart/test_dag.py index 72b9e10..70fc576 100644 --- a/tests/test_chart/test_dag.py +++ b/tests/test_chart/test_dag.py @@ -1,8 +1,8 @@ import pytest -from ml_pipeline_engine.base_nodes.processors import ProcessorBase from ml_pipeline_engine.chart import PipelineChart from ml_pipeline_engine.dag_builders.annotation import build_dag_single +from ml_pipeline_engine.node import ProcessorBase async def test_pipeline_chart_run_success(model_name_op: str) -> None: diff --git a/tests/test_events/test_dag.py b/tests/test_events/test_dag.py index 6a51a15..3d3e62e 100644 --- a/tests/test_events/test_dag.py +++ b/tests/test_events/test_dag.py @@ -4,22 +4,21 @@ import pytest_mock -from ml_pipeline_engine.base_nodes.datasources import DataSource -from ml_pipeline_engine.base_nodes.processors import ProcessorBase from ml_pipeline_engine.chart import PipelineChart from ml_pipeline_engine.context.dag import DAGPipelineContext from ml_pipeline_engine.dag_builders.annotation import build_dag from ml_pipeline_engine.dag_builders.annotation.marks import Input +from ml_pipeline_engine.node import ProcessorBase from ml_pipeline_engine.types import NodeId from ml_pipeline_engine.types import PipelineContextLike from ml_pipeline_engine.types import PipelineResult async def test_pipeline_chart_events_success(mocker: pytest_mock.MockerFixture, model_name_op: str) -> None: - class SomeDataSourceNode(DataSource): + class SomeDataSourceNode(ProcessorBase): name = 'some_datasource' - def collect(self, x: int) -> int: + def process(self, x: int) -> int: return x * -1 class SomeOutputNode(ProcessorBase): @@ -75,7 +74,7 @@ async def on_node_complete( assert on_node_start.call_args_list[0].kwargs == { 'ctx': ANY, - 'node_id': 'datasource__some_datasource', + 'node_id': 'processor__some_datasource', } assert on_node_start.call_args_list[1].kwargs == { @@ -87,7 +86,7 @@ async def on_node_complete( assert on_node_complete.call_args_list[0].kwargs == { 'ctx': ANY, - 'node_id': 'datasource__some_datasource', + 'node_id': 'processor__some_datasource', 'error': None, } @@ -99,10 +98,10 @@ async def on_node_complete( async def test_pipeline_chart_events_error(mocker: pytest_mock.MockerFixture, model_name_op: str) -> None: - class SomeDataSourceNode(DataSource): + class SomeDataSourceNode(ProcessorBase): name = 'some_datasource' - async def collect(self, x: int) -> int: + async def process(self, x: int) -> int: return x * -1 class SomeOutputNode(ProcessorBase): @@ -160,7 +159,7 @@ async def on_node_complete( assert on_node_start.call_args_list[0].kwargs == { 'ctx': ANY, - 'node_id': 'datasource__some_datasource', + 'node_id': 'processor__some_datasource', } assert on_node_start.call_args_list[1].kwargs == { @@ -172,7 +171,7 @@ async def on_node_complete( assert on_node_complete.call_args_list[0].kwargs == { 'ctx': ANY, - 'node_id': 'datasource__some_datasource', + 'node_id': 
'processor__some_datasource', 'error': None, } diff --git a/tests/test_utils.py b/tests/test_utils.py index 81d136b..454686a 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,12 +1,14 @@ +import typing as t from uuid import UUID import pytest -from ml_pipeline_engine.base_nodes.processors import ProcessorBase +from ml_pipeline_engine.node import ProcessorBase from ml_pipeline_engine.node import generate_pipeline_id from ml_pipeline_engine.node import get_node_id -from ml_pipeline_engine.node import get_run_method from ml_pipeline_engine.node import run_node +from ml_pipeline_engine.node.errors import RunMethodExpectedError +from ml_pipeline_engine.types import DAGLike from ml_pipeline_engine.types import NodeBase @@ -29,19 +31,21 @@ class SomeNode(ProcessorBase): def process(x: int) -> int: return x - assert get_run_method(SomeNode) == 'process' assert await run_node(SomeNode, x=10, node_id='an_example') == 10 -def test_get_run_method_2_methods_error() -> None: +async def test_build_graph__error_no_process_method( + build_dag: t.Callable[..., DAGLike], +) -> None: class SomeNode(NodeBase): @staticmethod - def extract(x: int) -> int: + def process(x: int) -> int: return x + class AnotherNode(NodeBase): @staticmethod - def collect(x: int) -> int: + def not_process(x: int) -> int: return x - with pytest.raises(AssertionError): - get_run_method(SomeNode) + with pytest.raises(RunMethodExpectedError): + build_dag(input_node=SomeNode, output_node=AnotherNode) diff --git a/tests/visualization/test_visualization.py b/tests/visualization/test_visualization.py index 716f374..fe83754 100644 --- a/tests/visualization/test_visualization.py +++ b/tests/visualization/test_visualization.py @@ -8,21 +8,20 @@ import pytest from click.testing import CliRunner -from ml_pipeline_engine.base_nodes.datasources import DataSource -from ml_pipeline_engine.base_nodes.processors import ProcessorBase from ml_pipeline_engine.cli import build_static from ml_pipeline_engine.dag_builders.annotation.builder import build_dag from ml_pipeline_engine.dag_builders.annotation.marks import GenericInput from ml_pipeline_engine.dag_builders.annotation.marks import Input from ml_pipeline_engine.dag_builders.annotation.marks import SwitchCase +from ml_pipeline_engine.node import ProcessorBase from ml_pipeline_engine.node import build_node -class InvertNumber(DataSource): +class InvertNumber(ProcessorBase): name = 'invert_number' verbose_name = 'Invert!' 
- def collect(self, num: float) -> float: + def process(self, num: float) -> float: return -num @@ -154,21 +153,17 @@ async def test_basic(call_func: t.Callable) -> None: 'target': 'processor__double_number', }, { - 'id': 'datasource__invert_number->processor__add_const', - 'source': 'datasource__invert_number', + 'id': 'processor__invert_number->processor__add_const', + 'source': 'processor__invert_number', 'target': 'processor__add_const', }, { - 'id': 'datasource__invert_number->processor__tests_visualization_test_visualization_SwitchNode', - 'source': 'datasource__invert_number', + 'id': 'processor__invert_number->processor__tests_visualization_test_visualization_SwitchNode', + 'source': 'processor__invert_number', 'target': 'processor__tests_visualization_test_visualization_SwitchNode', }, ], 'node_types': { - 'datasource': { - 'hex_bgr_color': None, - 'name': 'datasource', - }, 'processor': { 'hex_bgr_color': None, 'name': 'processor', @@ -188,7 +183,7 @@ async def test_basic(call_func: t.Callable) -> None: }, { 'data': { - 'code_source': 'tests/visualization/test_visualization.py#L60', + 'code_source': 'tests/visualization/test_visualization.py#L59', 'doc': 'Базовый класс для обработчиков общего назначения', 'name': None, 'verbose_name': None, @@ -200,7 +195,7 @@ async def test_basic(call_func: t.Callable) -> None: }, { 'data': { - 'code_source': 'tests/visualization/test_visualization.py#L55', + 'code_source': 'tests/visualization/test_visualization.py#L54', 'doc': 'Базовый класс для обработчиков общего назначения', 'name': None, 'verbose_name': None, @@ -212,7 +207,7 @@ async def test_basic(call_func: t.Callable) -> None: }, { 'data': { - 'code_source': 'tests/visualization/test_visualization.py#L73', + 'code_source': 'tests/visualization/test_visualization.py#L72', 'doc': 'Базовый класс для обработчиков общего назначения', 'name': None, 'verbose_name': None, @@ -224,7 +219,7 @@ async def test_basic(call_func: t.Callable) -> None: }, { 'data': { - 'code_source': 'tests/visualization/test_visualization.py#L45', # Line for the real source! + 'code_source': 'tests/visualization/test_visualization.py#L44', # Line for the real source! 'doc': 'Базовый класс для обработчиков общего назначения', 'name': 'another_feature', 'verbose_name': None, @@ -236,7 +231,7 @@ async def test_basic(call_func: t.Callable) -> None: }, { 'data': { - 'code_source': 'tests/visualization/test_visualization.py#L37', + 'code_source': 'tests/visualization/test_visualization.py#L36', 'doc': 'Базовый класс для обработчиков общего назначения', 'name': 'double_number', 'verbose_name': 'Double!', @@ -248,7 +243,7 @@ async def test_basic(call_func: t.Callable) -> None: }, { 'data': { - 'code_source': 'tests/visualization/test_visualization.py#L29', + 'code_source': 'tests/visualization/test_visualization.py#L28', 'doc': 'Базовый класс для обработчиков общего назначения', 'name': 'add_const', 'verbose_name': 'Add!', @@ -260,15 +255,15 @@ async def test_basic(call_func: t.Callable) -> None: }, { 'data': { - 'code_source': 'tests/visualization/test_visualization.py#L21', - 'doc': 'Базовый класс для источников данных', + 'code_source': 'tests/visualization/test_visualization.py#L20', + 'doc': 'Базовый класс для обработчиков общего назначения', 'name': 'invert_number', 'verbose_name': 'Invert!', }, - 'id': 'datasource__invert_number', + 'id': 'processor__invert_number', 'is_generic': False, 'is_virtual': False, - 'type': 'datasource', + 'type': 'processor', }, ], }