From 0fe8dda297295fb6205900682d3b6968769bba26 Mon Sep 17 00:00:00 2001
From: Evgeniia Lukmanova
Date: Thu, 9 May 2024 16:58:34 +0500
Subject: [PATCH 01/12] test: Update tests to use .base_nodes.processors instead of .types

---
 tests/context/__init__.py                     |  0
 tests/dag/oneof/test_fail_node.py             | 18 +++----
 .../dag/oneof/test_input_one_of_fails_dag.py  | 24 ++++-----
 .../test_input_one_of_first_success_dag.py    | 28 +++++-----
 .../test_input_one_of_last_success_dag.py     | 24 ++++-----
 .../oneof/test_input_one_of_multiple_dag.py   | 36 ++++++-------
 tests/dag/test_demo_ml_model_dag.py           | 20 +++----
 tests/dag/test_reusable_nodes.py              | 28 +++++-----
 tests/tags/test_tags.py                       | 20 +++----
 tests/test_utils.py                           | 52 -------------------
 10 files changed, 99 insertions(+), 151 deletions(-)
 delete mode 100644 tests/context/__init__.py

diff --git a/tests/context/__init__.py b/tests/context/__init__.py
deleted file mode 100644
index e69de29..0000000
diff --git a/tests/dag/oneof/test_fail_node.py b/tests/dag/oneof/test_fail_node.py
index 13764c4..0c341cc 100644
--- a/tests/dag/oneof/test_fail_node.py
+++ b/tests/dag/oneof/test_fail_node.py
@@ -3,6 +3,7 @@
 import pytest_mock
 
 from ml_pipeline_engine.base_nodes.datasources import DataSource
+from ml_pipeline_engine.base_nodes.processors import ProcessorBase
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
 from ml_pipeline_engine.dag_builders.annotation.marks import InputGeneric
@@ -10,11 +11,10 @@
 from ml_pipeline_engine.decorators import guard_datasource_error
 from ml_pipeline_engine.node import build_node
 from ml_pipeline_engine.types import DAGLike
-from ml_pipeline_engine.types import NodeBase
 from ml_pipeline_engine.types import NodeLike
 
 
-class SomeInput(NodeBase):
+class SomeInput(ProcessorBase):
     name = 'input'
     title = 'input'
 
@@ -37,19 +37,19 @@ def collect(self, _: InputGeneric(NodeLike)) -> t.Type[Exception]:
 )
 
 
-class SomeFeatureGeneric(NodeBase):
+class SomeFeatureGeneric(ProcessorBase):
     title = 'feature'
 
-    def extract(self, fl_credit_history: InputGeneric(NodeLike)) -> int:
+    def process(self, fl_credit_history: InputGeneric(NodeLike)) -> int:
         # Не должно запускаться, так как fl_credit_history будет заполнено ошибкой.
         # Как следствие, эта нода обязана быть пропущенной
         return len(fl_credit_history)
 
 
-class SomeFeatureFallback(NodeBase):
+class SomeFeatureFallback(ProcessorBase):
     title = 'feature_fallback'
 
-    def extract(self) -> int:
+    def process(self) -> int:
         return 777_777
 
 
@@ -60,10 +60,10 @@ def extract(self) -> int:
 )
 
 
-class SomeMLModel(NodeBase):
+class SomeMLModel(ProcessorBase):
     name = 'some_model'
 
-    def predict(self, fl_credit_history_feature: InputOneOf([SomeFeature, SomeFeatureFallback])) -> int:
+    def process(self, fl_credit_history_feature: InputOneOf([SomeFeature, SomeFeatureFallback])) -> int:
         return fl_credit_history_feature
 
 
@@ -72,7 +72,7 @@ async def test_fail_node(
     build_dag: t.Callable[..., DAGLike],
     mocker: pytest_mock.MockerFixture,
 ) -> None:
-    extract_patch = mocker.patch.object(SomeFeatureGeneric, 'extract')
+    extract_patch = mocker.patch.object(SomeFeatureGeneric, 'process')
 
     dag = build_dag(input_node=SomeInput, output_node=SomeMLModel)
     assert await dag.run(pipeline_context(base_num=10)) == 777_777
diff --git a/tests/dag/oneof/test_input_one_of_fails_dag.py b/tests/dag/oneof/test_input_one_of_fails_dag.py
index f19d872..e50cb00 100644
--- a/tests/dag/oneof/test_input_one_of_fails_dag.py
+++ b/tests/dag/oneof/test_input_one_of_fails_dag.py
@@ -3,15 +3,15 @@
 import pytest
 
 from ml_pipeline_engine.base_nodes.datasources import DataSource
+from ml_pipeline_engine.base_nodes.processors import ProcessorBase
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
 from ml_pipeline_engine.dag_builders.annotation.marks import InputOneOf
 from ml_pipeline_engine.decorators import guard_datasource_error
 from ml_pipeline_engine.types import DAGLike
-from ml_pipeline_engine.types import NodeBase
 
 
-class SomeInput(NodeBase):
+class SomeInput(ProcessorBase):
     name = 'input'
 
     def process(self, base_num: int, other_num: int) -> dict:
@@ -39,38 +39,38 @@ def collect(self, _: Input(SomeInput)) -> t.Type[Exception]:
     raise Exception
 
 
-class SomeFeature(NodeBase):
+class SomeFeature(ProcessorBase):
     name = 'some_feature'
 
-    def extract(self, ds_value: Input(ErrorDataSource), inp: Input(SomeInput)) -> int:
+    def process(self, ds_value: Input(ErrorDataSource), inp: Input(SomeInput)) -> int:
         return ds_value + inp['other_num'] + 10
 
 
-class SomeFeatureSecond(NodeBase):
+class SomeFeatureSecond(ProcessorBase):
     name = 'some_feature_second'
 
-    def extract(self, ds_value: Input(ErrorDataSourceSecond), inp: Input(SomeInput)) -> int:
+    def process(self, ds_value: Input(ErrorDataSourceSecond), inp: Input(SomeInput)) -> int:
         return ds_value + inp['other_num'] + 10
 
 
-class FallbackFeature(NodeBase):
+class FallbackFeature(ProcessorBase):
     name = 'fallback_feature'
 
-    def extract(self) -> t.Type[Exception]:
+    def process(self) -> t.Type[Exception]:
         return Exception
 
 
-class SomeVectorizer(NodeBase):
+class SomeVectorizer(ProcessorBase):
     name = 'some_vectorizer'
 
-    def vectorize(self, feature_value: InputOneOf([SomeFeature, SomeFeatureSecond, FallbackFeature])) -> int:
+    def process(self, feature_value: InputOneOf([SomeFeature, SomeFeatureSecond, FallbackFeature])) -> int:
         return feature_value + 20
 
 
-class SomeMLModel(NodeBase):
+class SomeMLModel(ProcessorBase):
     name = 'some_model'
 
-    def predict(self, vec_value: Input(SomeVectorizer)) -> float:
+    def process(self, vec_value: Input(SomeVectorizer)) -> float:
         return (vec_value + 30) / 100
diff --git a/tests/dag/oneof/test_input_one_of_first_success_dag.py b/tests/dag/oneof/test_input_one_of_first_success_dag.py
index 0d7cfcd..be32bed 100644
--- a/tests/dag/oneof/test_input_one_of_first_success_dag.py
+++ b/tests/dag/oneof/test_input_one_of_first_success_dag.py
@@ -1,15 +1,15 @@
 import typing as t
 
 from ml_pipeline_engine.base_nodes.datasources import DataSource
+from ml_pipeline_engine.base_nodes.processors import ProcessorBase
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
 from ml_pipeline_engine.dag_builders.annotation.marks import InputOneOf
 from ml_pipeline_engine.decorators import guard_datasource_error
 from ml_pipeline_engine.types import DAGLike
-from ml_pipeline_engine.types import NodeBase
 
 
-class SomeInput(NodeBase):
+class SomeInput(ProcessorBase):
     name = 'input'
 
     def process(self, base_num: int, other_num: int) -> dict:
@@ -36,45 +36,45 @@ def collect(self, _: Input(SomeInput)) -> int:
     raise Exception
 
 
-class SomeFeature(NodeBase):
+class SomeFeature(ProcessorBase):
     name = 'some_feature'
 
-    def extract(self, ds_value: Input(SomeDataSource), inp: Input(SomeInput)) -> int:
+    def process(self, ds_value: Input(SomeDataSource), inp: Input(SomeInput)) -> int:
         return ds_value + inp['other_num'] + 10
 
 
-class SomeFeatureSecond(NodeBase):
+class SomeFeatureSecond(ProcessorBase):
     name = 'some_feature_second'
 
-    def extract(self, ds_value: Input(ErrorDataSource), inp: Input(SomeInput)) -> int:
+    def process(self, ds_value: Input(ErrorDataSource), inp: Input(SomeInput)) -> int:
         return ds_value + inp['other_num'] + 10
 
 
-class FallbackFeature(NodeBase):
+class FallbackFeature(ProcessorBase):
     name = 'fallback_feature'
 
-    def extract(self) -> int:
+    def process(self) -> int:
         return 125
 
 
-class SomeVectorizer(NodeBase):
+class SomeVectorizer(ProcessorBase):
     name = 'some_vectorizer'
 
-    def vectorize(self, feature_value: InputOneOf([SomeFeature, SomeFeatureSecond, FallbackFeature])) -> int:
+    def process(self, feature_value: InputOneOf([SomeFeature, SomeFeatureSecond, FallbackFeature])) -> int:
         return feature_value + 20
 
 
-class NoneVectorizer(NodeBase):
+class NoneVectorizer(ProcessorBase):
     name = 'none_vectorizer'
 
-    def vectorize(self) -> None:
+    def process(self) -> None:
         return None
 
 
-class SomeMLModel(NodeBase):
+class SomeMLModel(ProcessorBase):
     name = 'some_model'
 
-    def predict(self, vec_value: Input(SomeVectorizer), none_value: Input(NoneVectorizer)) -> float:
+    def process(self, vec_value: Input(SomeVectorizer), none_value: Input(NoneVectorizer)) -> float:
         assert none_value is None
         return (vec_value + 30) / 100
diff --git a/tests/dag/oneof/test_input_one_of_last_success_dag.py b/tests/dag/oneof/test_input_one_of_last_success_dag.py
index d9b7698..c9a7911 100644
--- a/tests/dag/oneof/test_input_one_of_last_success_dag.py
+++ b/tests/dag/oneof/test_input_one_of_last_success_dag.py
@@ -1,15 +1,15 @@
 import typing as t
 
 from ml_pipeline_engine.base_nodes.datasources import DataSource
+from ml_pipeline_engine.base_nodes.processors import ProcessorBase
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
 from ml_pipeline_engine.dag_builders.annotation.marks import InputOneOf
 from ml_pipeline_engine.decorators import guard_datasource_error
 from ml_pipeline_engine.types import DAGLike
-from ml_pipeline_engine.types import NodeBase
 
 
-class SomeInput(NodeBase):
+class SomeInput(ProcessorBase):
     name = 'input'
 
     def process(self, base_num: int, other_num: int) -> dict:
@@ -37,38 +37,38 @@ def collect(self, _: Input(SomeInput)) -> t.Type[Exception]:
     raise Exception
 
 
-class SomeFeature(NodeBase):
+class SomeFeature(ProcessorBase):
     name = 'some_feature'
 
-    def extract(self, ds_value: Input(ErrorDataSource), inp: Input(SomeInput)) -> int:
+    def process(self, ds_value: Input(ErrorDataSource), inp: Input(SomeInput)) -> int:
         return ds_value + inp['other_num'] + 10
 
 
-class SomeFeatureSecond(NodeBase):
+class SomeFeatureSecond(ProcessorBase):
     name = 'some_feature_second'
 
-    def extract(self, ds_value: Input(ErrorDataSourceSecond), inp: Input(SomeInput)) -> int:
+    def process(self, ds_value: Input(ErrorDataSourceSecond), inp: Input(SomeInput)) -> int:
         return ds_value + inp['other_num'] + 10
 
 
-class FallbackFeature(NodeBase):
+class FallbackFeature(ProcessorBase):
     name = 'fallback_feature'
 
-    def extract(self) -> int:
+    def process(self) -> int:
         return 125
 
 
-class SomeVectorizer(NodeBase):
+class SomeVectorizer(ProcessorBase):
     name = 'some_vectorizer'
 
-    def vectorize(self, feature_value: InputOneOf([SomeFeature, SomeFeatureSecond, FallbackFeature])) -> int:
+    def process(self, feature_value: InputOneOf([SomeFeature, SomeFeatureSecond, FallbackFeature])) -> int:
         return feature_value + 20
 
 
-class SomeMLModel(NodeBase):
+class SomeMLModel(ProcessorBase):
     name = 'some_model'
 
-    def predict(self, vec_value: Input(SomeVectorizer)) -> float:
+    def process(self, vec_value: Input(SomeVectorizer)) -> float:
         return (vec_value + 30) / 100
diff --git a/tests/dag/oneof/test_input_one_of_multiple_dag.py b/tests/dag/oneof/test_input_one_of_multiple_dag.py
index 23bd98e..cfee134 100644
--- a/tests/dag/oneof/test_input_one_of_multiple_dag.py
+++ b/tests/dag/oneof/test_input_one_of_multiple_dag.py
@@ -1,15 +1,15 @@
 import typing as t
 
 from ml_pipeline_engine.base_nodes.datasources import DataSource
+from ml_pipeline_engine.base_nodes.processors import ProcessorBase
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
 from ml_pipeline_engine.dag_builders.annotation.marks import InputOneOf
 from ml_pipeline_engine.decorators import guard_datasource_error
 from ml_pipeline_engine.types import DAGLike
-from ml_pipeline_engine.types import NodeBase
 
 
-class SomeInput(NodeBase):
+class SomeInput(ProcessorBase):
     name = 'input'
 
     def process(self, base_num: int, other_num: int) -> dict:
@@ -36,52 +36,52 @@ def collect(self, _: Input(SomeInput)) -> t.Type[Exception]:
     raise Exception
 
 
-class SomeFeature(NodeBase):
+class SomeFeature(ProcessorBase):
     name = 'some_feature'
 
-    def extract(self, ds_value: Input(SomeDataSource), inp: Input(SomeInput)) -> int:
+    def process(self, ds_value: Input(SomeDataSource), inp: Input(SomeInput)) -> int:
         return ds_value + inp['other_num'] + 10
 
 
-class SomeFeatureCopy(NodeBase):
+class SomeFeatureCopy(ProcessorBase):
     name = 'some_feature_copy'
 
-    def extract(self, ds_value: Input(SomeDataSource), inp: Input(SomeInput)) -> int:
+    def process(self, ds_value: Input(SomeDataSource), inp: Input(SomeInput)) -> int:
         return ds_value + inp['other_num'] + 10
 
 
-class SomeFeatureSecond(NodeBase):
+class SomeFeatureSecond(ProcessorBase):
     name = 'some_feature_second'
 
-    def extract(self, ds_value: Input(ErrorDataSource), inp: Input(SomeInput)) -> int:
+    def process(self, ds_value: Input(ErrorDataSource), inp: Input(SomeInput)) -> int:
         return ds_value + inp['other_num'] + 15
 
 
-class SomeFeatureSecondCopy(NodeBase):
+class SomeFeatureSecondCopy(ProcessorBase):
     name = 'some_feature_second_copy'
 
-    def extract(self, ds_value: Input(ErrorDataSource), inp: Input(SomeInput)) -> int:
+    def process(self, ds_value: Input(ErrorDataSource), inp: Input(SomeInput)) -> int:
         return ds_value + inp['other_num'] + 15
 
 
-class FallbackFeature(NodeBase):
+class FallbackFeature(ProcessorBase):
     name = 'fallback_feature'
 
-    def extract(self) -> int:
+    def process(self) -> int:
         return 130
 
 
-class FallbackFeatureSecond(NodeBase):
+class FallbackFeatureSecond(ProcessorBase):
     name = 'fallback_feature_second'
 
-    def extract(self) -> int:
+    def process(self) -> int:
         return 130
 
 
-class SomeVectorizer(NodeBase):
+class SomeVectorizer(ProcessorBase):
     name = 'some_vectorizer'
 
-    def vectorize(
+    def process(
         self,
         input_model: Input(SomeInput),
         feature_value: InputOneOf([SomeFeature, SomeFeatureSecond, FallbackFeature]),
@@ -90,10 +90,10 @@ def vectorize(
         return feature_value + input_model['other_num'] + 15 + feature_value2
 
 
-class SomeMLModel(NodeBase):
+class SomeMLModel(ProcessorBase):
     name = 'some_model'
 
-    def predict(self, vec_value: Input(SomeVectorizer)) -> float:
+    def process(self, vec_value: Input(SomeVectorizer)) -> float:
         return (vec_value + 30) / 100
diff --git a/tests/dag/test_demo_ml_model_dag.py b/tests/dag/test_demo_ml_model_dag.py
index 1f2a626..b2854f6 100644
--- a/tests/dag/test_demo_ml_model_dag.py
+++ b/tests/dag/test_demo_ml_model_dag.py
@@ -1,12 +1,12 @@
 import typing as t
 
+from ml_pipeline_engine.base_nodes.processors import ProcessorBase
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
 from ml_pipeline_engine.types import DAGLike
-from ml_pipeline_engine.types import NodeBase
 
 
-class SomeInput(NodeBase):
+class SomeInput(ProcessorBase):
     name = 'input'
 
     def process(self, base_num: int, other_num: int) -> dict:
@@ -16,31 +16,31 @@ def process(self, base_num: int, other_num: int) -> dict:
     }
 
 
-class SomeDataSource(NodeBase):
+class SomeDataSource(ProcessorBase):
     name = 'some_data_source'
 
-    def collect(self, inp: Input(SomeInput)) -> int:
+    def process(self, inp: Input(SomeInput)) -> int:
         return inp['base_num'] + 100
 
 
-class SomeFeature(NodeBase):
+class SomeFeature(ProcessorBase):
     name = 'some_feature'
 
-    def extract(self, ds_value: Input(SomeDataSource), inp: Input(SomeInput)) -> int:
+    def process(self, ds_value: Input(SomeDataSource), inp: Input(SomeInput)) -> int:
         return ds_value + inp['other_num'] + 10
 
 
-class SomeVectorizer(NodeBase):
+class SomeVectorizer(ProcessorBase):
     name = 'some_vectorizer'
 
-    def vectorize(self, feature_value: Input(SomeFeature)) -> int:
+    def process(self, feature_value: Input(SomeFeature)) -> int:
         return feature_value + 20
 
 
-class SomeMLModel(NodeBase):
+class SomeMLModel(ProcessorBase):
     name = 'some_model'
 
-    def predict(self, vec_value: Input(SomeVectorizer)) -> float:
+    def process(self, vec_value: Input(SomeVectorizer)) -> float:
         return (vec_value + 30) / 100
diff --git a/tests/dag/test_reusable_nodes.py b/tests/dag/test_reusable_nodes.py
index c834752..3f66264 100644
--- a/tests/dag/test_reusable_nodes.py
+++ b/tests/dag/test_reusable_nodes.py
@@ -1,15 +1,15 @@
 import typing as t
 
+from ml_pipeline_engine.base_nodes.processors import ProcessorBase
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
 from ml_pipeline_engine.dag_builders.annotation.marks import InputGeneric
 from ml_pipeline_engine.node import build_node
 from ml_pipeline_engine.types import DAGLike
-from ml_pipeline_engine.types import NodeBase
 from ml_pipeline_engine.types import NodeLike
 
 
-class SomeInput(NodeBase):
+class SomeInput(ProcessorBase):
     name = 'input'
 
     def process(self, base_num: int, other_num: int) -> dict:
@@ -19,32 +19,32 @@ def process(self, base_num: int, other_num: int) -> dict:
     }
 
 
-class SomeDataSource(NodeBase):
+class SomeDataSource(ProcessorBase):
     name = 'some_data_source'
 
-    def collect(self, inp: Input(SomeInput)) -> int:
+    def process(self, inp: Input(SomeInput)) -> int:
         return inp['base_num'] + 100
 
 
-class SomeCommonFeature(NodeBase):
+class SomeCommonFeature(ProcessorBase):
     name = 'some_feature'
 
-    def extract(self, ds_value: Input(SomeDataSource), inp: Input(SomeInput)) -> int:
+    def process(self, ds_value: Input(SomeDataSource), inp: Input(SomeInput)) -> int:
         return ds_value + inp['other_num'] + 10
 
 
 # Пример Generic-ноды
-class GenericVectorizer(NodeBase):
+class GenericVectorizer(ProcessorBase):
     name = 'some_vectorizer'
 
-    async def vectorize(self, feature_value: InputGeneric(NodeLike), const: int) -> int:
+    async def process(self, feature_value: InputGeneric(NodeLike), const: int) -> int:
         return feature_value + 20 + const
 
 
-class AnotherFeature(NodeBase):
+class AnotherFeature(ProcessorBase):
     name = 'another_feature'
 
-    def extract(self, inp: Input(SomeInput)) -> int:
+    def process(self, inp: Input(SomeInput)) -> int:
         return inp['base_num'] + 100_000
 
 
@@ -67,17 +67,17 @@ def extract(self, inp: Input(SomeInput)) -> int:
 )
 
 
-class SomeMLModel(NodeBase):
+class SomeMLModel(ProcessorBase):
     name = 'some_model'
 
-    def predict(self, vec_value: Input(SomeParticularVectorizer)) -> float:
+    def process(self, vec_value: Input(SomeParticularVectorizer)) -> float:
         return (vec_value + 30) / 100
 
 
-class AnotherMlModel(NodeBase):
+class AnotherMlModel(ProcessorBase):
     name = 'another_model'
 
-    def predict(self, vec_value: Input(AnotherParticularVectorizer)) -> float:
+    def process(self, vec_value: Input(AnotherParticularVectorizer)) -> float:
         return (vec_value + 30) / 100
diff --git a/tests/tags/test_tags.py b/tests/tags/test_tags.py
index 2cf9d25..c88cde8 100644
--- a/tests/tags/test_tags.py
+++ b/tests/tags/test_tags.py
@@ -2,16 +2,16 @@
 
 import pytest_mock
 
+from ml_pipeline_engine.base_nodes.processors import ProcessorBase
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
 from ml_pipeline_engine.node.enums import NodeTag
 from ml_pipeline_engine.parallelism import processes
 from ml_pipeline_engine.parallelism import threads
 from ml_pipeline_engine.types import DAGLike
-from ml_pipeline_engine.types import NodeBase
 
 
-class SomeInput(NodeBase):
+class SomeInput(ProcessorBase):
     name = 'input'
     tags = (NodeTag.process,)
 
@@ -22,33 +22,33 @@ def process(self, base_num: int, other_num: int) -> dict:
     }
 
 
-class SomeDataSource(NodeBase):
+class SomeDataSource(ProcessorBase):
     name = 'some_data_source'
 
-    def collect(self, inp: Input(SomeInput)) -> int:
+    def process(self, inp: Input(SomeInput)) -> int:
         return inp['base_num'] + 100
 
 
-class SomeFeature(NodeBase):
+class SomeFeature(ProcessorBase):
     name = 'some_feature'
     tags = (NodeTag.process,)
 
-    def extract(self, ds_value: Input(SomeDataSource), inp: Input(SomeInput)) -> int:
+    def process(self, ds_value: Input(SomeDataSource), inp: Input(SomeInput)) -> int:
         return ds_value + inp['other_num'] + 10
 
 
-class SomeVectorizer(NodeBase):
+class SomeVectorizer(ProcessorBase):
     name = 'some_vectorizer'
     tags = (NodeTag.non_async,)
 
-    def vectorize(self, feature_value: Input(SomeFeature)) -> int:
+    def process(self, feature_value: Input(SomeFeature)) -> int:
         return feature_value + 20
 
 
-class SomeMLModel(NodeBase):
+class SomeMLModel(ProcessorBase):
     name = 'some_model'
 
-    async def predict(self, vec_value: Input(SomeVectorizer)) -> float:
+    async def process(self, vec_value: Input(SomeVectorizer)) -> float:
         return (vec_value + 30) / 100
diff --git a/tests/test_utils.py b/tests/test_utils.py
index 7ca3e74..dab7d7f 100644
--- a/tests/test_utils.py
+++ b/tests/test_utils.py
@@ -1,12 +1,6 @@
 from uuid import UUID
 
-import pytest
-
-from ml_pipeline_engine.base_nodes.datasources import DataSource
-from ml_pipeline_engine.base_nodes.feature import FeatureBase
-from ml_pipeline_engine.base_nodes.ml_model import MLModelBase
 from ml_pipeline_engine.base_nodes.processors import ProcessorBase
-from ml_pipeline_engine.base_nodes.vectorizer import FeatureVectorizerBase
 from ml_pipeline_engine.node import generate_pipeline_id
 from ml_pipeline_engine.node import get_node_id
 from ml_pipeline_engine.node import get_run_method
@@ -42,49 +36,3 @@ def process(x: int) -> int:
 
     assert get_run_method(SomeNode) == 'process'
     assert await run_node(SomeNode, x=10) == 10
-
-    class SomeNode(FeatureBase):
-        @staticmethod
-        def extract(x: int) -> int:
-            return x
-
-    assert get_run_method(SomeNode) == 'extract'
-    assert await run_node(SomeNode, x=10) == 10
-
-    class SomeNode(DataSource):
-        @staticmethod
-        def collect(x: int) -> int:
-            return x
-
-    assert get_run_method(SomeNode) == 'collect'
-    assert await run_node(SomeNode, x=10) == 10
-
-    class SomeNode(FeatureVectorizerBase):
-        @staticmethod
-        def vectorize(x: int) -> int:
-            return x
-
-    assert get_run_method(SomeNode) == 'vectorize'
-    assert await run_node(SomeNode, x=10) == 10
-
-    class SomeNode(MLModelBase):
-        @staticmethod
-        def predict(x: int) -> int:
-            return x
-
-    assert get_run_method(SomeNode) == 'predict'
-    assert await run_node(SomeNode, x=10) == 10
-
-
-def test_get_run_method_2_methods_error() -> None:
-    class SomeNode(NodeBase):
-        @staticmethod
-        def extract(x: int) -> int:
-            return x
-
-        @staticmethod
-        def collect(x: int) -> int:
-            return x
-
-    with pytest.raises(AssertionError):
-        get_run_method(SomeNode)

From 08b021f1b653179227c2d18f904bdbf7cd4113a0 Mon Sep 17 00:00:00 2001
From: Evgeniia Lukmanova
Date: Fri, 10 May 2024 14:27:03 +0500
Subject: [PATCH 02/12] test: Remove NodeProtocol.title usage from tests

---
 tests/dag/oneof/test_fail_node.py                      | 6 ++----
 tests/dag/oneof/test_input_one_of_fails_dag.py         | 2 --
 tests/dag/oneof/test_input_one_of_first_success_dag.py | 2 --
 tests/dag/oneof/test_input_one_of_last_success_dag.py  | 2 --
 tests/dag/oneof/test_input_one_of_multiple_dag.py      | 2 --
 5 files changed, 2 insertions(+), 12 deletions(-)

diff --git a/tests/dag/oneof/test_fail_node.py b/tests/dag/oneof/test_fail_node.py
index 0c341cc..8f986cb 100644
--- a/tests/dag/oneof/test_fail_node.py
+++ b/tests/dag/oneof/test_fail_node.py
@@ -16,14 +16,12 @@
 
 class SomeInput(ProcessorBase):
     name = 'input'
-    title = 'input'
 
     def process(self, base_num: int) -> int:
         return base_num
 
 
 class FlDataSourceGeneric(DataSource):
-    title = 'source'
     name = 'source'
 
     @guard_datasource_error()
@@ -38,7 +36,7 @@ def collect(self, _: InputGeneric(NodeLike)) -> t.Type[Exception]:
 
 
 class SomeFeatureGeneric(ProcessorBase):
-    title = 'feature'
+    name = 'feature'
 
     def process(self, fl_credit_history: InputGeneric(NodeLike)) -> int:
         # Не должно запускаться, так как fl_credit_history будет заполнено ошибкой.
@@ -47,7 +45,7 @@ def process(self, fl_credit_history: InputGeneric(NodeLike)) -> int:
 
 
 class SomeFeatureFallback(ProcessorBase):
-    title = 'feature_fallback'
+    name = 'feature_fallback'
 
     def process(self) -> int:
         return 777_777
diff --git a/tests/dag/oneof/test_input_one_of_fails_dag.py b/tests/dag/oneof/test_input_one_of_fails_dag.py
index e50cb00..d6bfacc 100644
--- a/tests/dag/oneof/test_input_one_of_fails_dag.py
+++ b/tests/dag/oneof/test_input_one_of_fails_dag.py
@@ -23,7 +23,6 @@ def process(self, base_num: int, other_num: int) -> dict:
 
 class ErrorDataSource(DataSource):
     name = 'some_data_source'
-    title = 'SomeDataSource'
 
     @guard_datasource_error()
     def collect(self, _: Input(SomeInput)) -> t.Type[Exception]:
@@ -32,7 +31,6 @@ def collect(self, _: Input(SomeInput)) -> t.Type[Exception]:
 
 class ErrorDataSourceSecond(DataSource):
     name = 'some_data_source_second'
-    title = 'SomeDataSource'
 
     @guard_datasource_error()
     def collect(self, _: Input(SomeInput)) -> t.Type[Exception]:
diff --git a/tests/dag/oneof/test_input_one_of_first_success_dag.py b/tests/dag/oneof/test_input_one_of_first_success_dag.py
index be32bed..03d8e92 100644
--- a/tests/dag/oneof/test_input_one_of_first_success_dag.py
+++ b/tests/dag/oneof/test_input_one_of_first_success_dag.py
@@ -21,7 +21,6 @@ def process(self, base_num: int, other_num: int) -> dict:
 
 class SomeDataSource(DataSource):
     name = 'some_data_source'
-    title = 'SomeDataSource'
 
     def collect(self, _: Input(SomeInput)) -> int:
         return 110
@@ -29,7 +28,6 @@ def collect(self, _: Input(SomeInput)) -> int:
 
 class ErrorDataSource(DataSource):
     name = 'some_data_source_second'
-    title = 'SomeDataSource'
 
     @guard_datasource_error()
     def collect(self, _: Input(SomeInput)) -> int:
diff --git a/tests/dag/oneof/test_input_one_of_last_success_dag.py b/tests/dag/oneof/test_input_one_of_last_success_dag.py
index c9a7911..1095ab7 100644
--- a/tests/dag/oneof/test_input_one_of_last_success_dag.py
+++ b/tests/dag/oneof/test_input_one_of_last_success_dag.py
@@ -21,7 +21,6 @@ def process(self, base_num: int, other_num: int) -> dict:
 
 class ErrorDataSource(DataSource):
     name = 'some_data_source'
-    title = 'SomeDataSource'
 
     @guard_datasource_error()
     def collect(self, _: Input(SomeInput)) -> t.Type[Exception]:
@@ -30,7 +29,6 @@ def collect(self, _: Input(SomeInput)) -> t.Type[Exception]:
 
 class ErrorDataSourceSecond(DataSource):
     name = 'some_data_source_second'
-    title = 'SomeDataSource'
 
     @guard_datasource_error()
     def collect(self, _: Input(SomeInput)) -> t.Type[Exception]:
diff --git a/tests/dag/oneof/test_input_one_of_multiple_dag.py b/tests/dag/oneof/test_input_one_of_multiple_dag.py
index cfee134..a165066 100644
--- a/tests/dag/oneof/test_input_one_of_multiple_dag.py
+++ b/tests/dag/oneof/test_input_one_of_multiple_dag.py
@@ -21,7 +21,6 @@ def process(self, base_num: int, other_num: int) -> dict:
 
 class SomeDataSource(DataSource):
     name = 'some_data_source'
-    title = 'SomeDataSource'
 
     def collect(self, _: Input(SomeInput)) -> int:
         return 110
@@ -29,7 +28,6 @@ def collect(self, _: Input(SomeInput)) -> int:
 
 class ErrorDataSource(DataSource):
     name = 'some_data_source_second'
-    title = 'SomeDataSource'
 
     @guard_datasource_error()
     def collect(self, _: Input(SomeInput)) -> t.Type[Exception]:

From c7bfccb2e060ac9b2f528fabb3825eaeef8df8d2 Mon Sep 17 00:00:00 2001
From: Evgeniia Lukmanova
Date: Sun, 12 May 2024 16:45:42 +0500
Subject: [PATCH 03/12] refactor: Remove logical statements from guard_datasource_error

---
 ml_pipeline_engine/decorators.py | 29 +++++--------------------
 ml_pipeline_engine/exceptions.py |  7 +++----
 2 files changed, 6 insertions(+), 30 deletions(-)

diff --git a/ml_pipeline_engine/decorators.py b/ml_pipeline_engine/decorators.py
index 1c45f01..596fdbb 100644
--- a/ml_pipeline_engine/decorators.py
+++ b/ml_pipeline_engine/decorators.py
@@ -1,43 +1,20 @@
 import functools
 import typing as t
 
-from ml_pipeline_engine.base_nodes.datasources import DataSource
 from ml_pipeline_engine.exceptions import DataSourceCollectError
 from ml_pipeline_engine.logs import logger_decorators as logger
 
 
-def guard_datasource_error(name: t.Optional[str] = None, title: t.Optional[str] = None) -> t.Callable:
+def guard_datasource_error() -> t.Callable:
     def _guard_datasource_error(func: t.Callable) -> t.Callable:
         @functools.wraps(func)
         def wrapper(*args: t.Any, **kwargs: t.Any) -> t.Any:
-            is_empty_names = name is None and title is None
-            if args and isinstance(args[0], DataSource):
-                assert is_empty_names, 'Не нужно явно указывать имя источника данных, если используется DataSource'
-                data_source = args[0]
-            else:
-                assert not is_empty_names, 'Укажите имя источника данных явно'
-                _name, _title = name, title
-
-                class _LegacyDataSource(DataSource):
-                    """
-                    Для совместимости с кодом, где не используется класс DataSource
-                    """
-
-                    name = _name
-                    title = _title
-
-                    def collect(self) -> t.Any:
-                        raise NotImplementedError
-
-                data_source = _LegacyDataSource()
+            node = args[0]
 
             try:
                 return func(*args, **kwargs)
             except Exception as ex:
                 logger.info('Источник отработал с ошибкой и вернул DataSourceCollectError: %s', str(ex))
-                return DataSourceCollectError(
-                    source_title=data_source.title,
-                    source_name=getattr(data_source, 'name', None),
-                )
+                return DataSourceCollectError(name=getattr(node, 'name', None))
 
         return wrapper
diff --git a/ml_pipeline_engine/exceptions.py b/ml_pipeline_engine/exceptions.py
index 6584852..220d40a 100644
--- a/ml_pipeline_engine/exceptions.py
+++ b/ml_pipeline_engine/exceptions.py
@@ -7,10 +7,9 @@ class NodeErrorType(str, Enum):
 
 
 class DataSourceCollectError(Exception):
-    def __init__(self, source_title: str, source_name: t.Optional[str] = None, *args: t.Any) -> None:
+    def __init__(self, name: t.Optional[str] = None, *args: t.Any) -> None:
         super().__init__(args)
-        self.source_title = source_title
-        self.source_name = source_name
+        self.name = name
 
     def __str__(self) -> str:
-        return f'Не удалось получить данные из источника "{self.source_title}"'
+        return f'Не удалось получить данные из источника "{self.name}"'

From bc7ed1c9b4187a6ccf5b8e5236a08211678cad5e Mon Sep 17 00:00:00 2001
From: Evgeniia Lukmanova
Date: Sun, 12 May 2024 19:53:58 +0500
Subject: [PATCH 04/12] BREAKING CHANGE: Removed specific node types

---
 README.md                                      | 18 +-----
 ml_pipeline_engine/base_nodes/datasources.py   | 15 -----
 ml_pipeline_engine/base_nodes/feature.py       | 15 -----
 ml_pipeline_engine/base_nodes/ml_model.py      | 15 -----
 ml_pipeline_engine/base_nodes/vectorizer.py    | 15 -----
 ml_pipeline_engine/node/enums.py               |  4 --
 ml_pipeline_engine/node/node.py                | 13 +---
 ml_pipeline_engine/types.py                    | 64 +------------------
 tests/dag/oneof/test_fail_node.py              |  5 +-
 .../dag/oneof/test_input_one_of_fails_dag.py   |  9 ++-
 .../test_input_one_of_first_success_dag.py     |  9 ++-
 .../test_input_one_of_last_success_dag.py      |  9 ++-
 .../oneof/test_input_one_of_multiple_dag.py    |  9 ++-
 tests/dag/retry/test_dag_retry.py              |  7 +-
 tests/dag/retry/test_dag_retry__base_error.py  |  7 +-
 tests/dag/retry/test_dag_retry__error.py       |  7 +-
 tests/test_events/test_dag.py                  | 17 +++--
 tests/visualization/test_visualization.py      | 37 +++++------
 18 files changed, 59 insertions(+), 216 deletions(-)
 delete mode 100644 ml_pipeline_engine/base_nodes/datasources.py
 delete mode 100644 ml_pipeline_engine/base_nodes/feature.py
 delete mode 100644 ml_pipeline_engine/base_nodes/ml_model.py
 delete mode 100644 ml_pipeline_engine/base_nodes/vectorizer.py

diff --git a/README.md b/README.md
index ed4a4f2..cea9ad7 100644
--- a/README.md
+++ b/README.md
@@ -5,29 +5,17 @@
 ## Table of Contents
 
 - [Usage](#usage)
-  - [Что нужно, чтобы сделать свой пайплайн?](#что-нужно-чтобы-сделать-свой-пайплайн)
-  - [Поддерживаемые типы узлов](#поддерживаемые-типы-узлов)
 - [Development](#development)
   - [Environment setup](#environment-setup)
 
 ## Usage
 
-### Что нужно, чтобы сделать свой пайплайн?
-1. Написать классы узлов
-2. Связать узлы посредством указания зависимости
+To create a pipeline:
+1. Define classes that represent nodes
+2. Connect the nodes by defining the parent node(s), or no parent, for each node
 
-### Поддерживаемые типы узлов
-
-[Протоколы](ml_pipeline_engine/types.py)
-
-1. [DataSource](ml_pipeline_engine/base_nodes/datasources.py)
-2. [FeatureBase](ml_pipeline_engine/base_nodes/feature.py)
-3. [MLModelBase](ml_pipeline_engine/base_nodes/ml_model.py)
-4. [ProcessorBase](ml_pipeline_engine/base_nodes/processors.py)
-5. [FeatureVectorizerBase](ml_pipeline_engine/base_nodes/vectorizer.py)
-
 Примеры использования описаны в файле [docs/usage_examples.md](docs/usage_examples.md)
 
 ## Development
diff --git a/ml_pipeline_engine/base_nodes/datasources.py b/ml_pipeline_engine/base_nodes/datasources.py
deleted file mode 100644
index 8a8f349..0000000
--- a/ml_pipeline_engine/base_nodes/datasources.py
+++ /dev/null
@@ -1,15 +0,0 @@
-import typing as t
-
-from ml_pipeline_engine.node.enums import NodeType
-from ml_pipeline_engine.types import NodeBase
-
-
-class DataSource(NodeBase):
-    """
-    Базовый класс для источников данных
-    """
-
-    node_type = NodeType.datasource.value
-
-    def collect(self, *args: t.Any, **kwargs: t.Any) -> t.Any:
-        raise NotImplementedError('Method collect() is not implemented')
diff --git a/ml_pipeline_engine/base_nodes/feature.py b/ml_pipeline_engine/base_nodes/feature.py
deleted file mode 100644
index 0dcf2c0..0000000
--- a/ml_pipeline_engine/base_nodes/feature.py
+++ /dev/null
@@ -1,15 +0,0 @@
-import typing as t
-
-from ml_pipeline_engine.node.enums import NodeType
-from ml_pipeline_engine.types import NodeBase
-
-
-class FeatureBase(NodeBase):
-    """
-    Базовый класс для набора фичей
-    """
-
-    node_type = NodeType.feature.value
-
-    def extract(self, *args: t.Any, **kwargs: t.Any) -> t.Any:
-        raise NotImplementedError('Method extract() is not implemented')
diff --git a/ml_pipeline_engine/base_nodes/ml_model.py b/ml_pipeline_engine/base_nodes/ml_model.py
deleted file mode 100644
index 48fb0f5..0000000
--- a/ml_pipeline_engine/base_nodes/ml_model.py
+++ /dev/null
@@ -1,15 +0,0 @@
-import typing as t
-
-from ml_pipeline_engine.node.enums import NodeType
-from ml_pipeline_engine.types import NodeBase
-
-
-class MLModelBase(NodeBase):
-    """
-    Базовый класс для ML-моделей
-    """
-
-    node_type = NodeType.ml_model.value
-
-    def predict(self, *args: t.Any, **kwargs: t.Any) -> t.Any:
-        raise NotImplementedError('Method predict() is not implemented')
diff --git a/ml_pipeline_engine/base_nodes/vectorizer.py b/ml_pipeline_engine/base_nodes/vectorizer.py
deleted file mode 100644
index 07bf1f9..0000000
--- a/ml_pipeline_engine/base_nodes/vectorizer.py
+++ /dev/null
@@ -1,15 +0,0 @@
-import typing as t
-
-from ml_pipeline_engine.node.enums import NodeType
-from ml_pipeline_engine.types import NodeBase
-
-
-class FeatureVectorizerBase(NodeBase):
-    """
-    Базовый класс для векторизаторов
-    """
-
-    node_type = NodeType.vectorizer.value
-
-    def vectorize(self, *args: t.Any, **kwargs: t.Any) -> t.Any:
-        raise NotImplementedError('Method vectorize() is not implemented')
diff --git a/ml_pipeline_engine/node/enums.py b/ml_pipeline_engine/node/enums.py
index 883fe14..e940cf9 100644
--- a/ml_pipeline_engine/node/enums.py
+++ b/ml_pipeline_engine/node/enums.py
@@ -2,11 +2,7 @@
 
 
 class NodeType(str, enum.Enum):
-    datasource = 'datasource'
-    feature = 'feature'
-    ml_model = 'ml_model'
     processor = 'processor'
-    vectorizer = 'vectorizer'
     generic = 'generic'
     switch = 'switch'
     input_one_of = 'input_one_of'
diff --git a/ml_pipeline_engine/node/node.py b/ml_pipeline_engine/node/node.py
index 4f2c1f2..ded9f6b 100644
--- a/ml_pipeline_engine/node/node.py
+++ b/ml_pipeline_engine/node/node.py
@@ -39,15 +39,8 @@ def get_node_id(node: NodeLike) -> NodeId:
 
 
 def get_run_method(node: NodeLike) -> t.Optional[str]:
-    run_method = None
-
-    for method in NodeBase.RUN_METHOD_ALIASES:
-        if callable(getattr(node, method, None)):
-            if run_method is not None:
-                raise AssertionError(f'Node should have only one run method. {run_method} + {method} detected')
-            run_method = method
-
-    return run_method
+    run_method = NodeBase.RUN_METHOD_ALIAS
+    return run_method if callable(getattr(node, run_method, None)) else None
 
 
 def get_callable_run_method(node: NodeLike) -> t.Callable:
@@ -131,7 +124,7 @@ def build_node(
     run_method = get_run_method(node)
     if not run_method:
         raise RunMethodExpectedError(
-            f'Ожидается наличие хотя бы одного run-метода. methods={NodeBase.RUN_METHOD_ALIASES}',
+            f'Missing method for node execution. Expected name={NodeBase.RUN_METHOD_ALIAS}',
         )
diff --git a/ml_pipeline_engine/types.py b/ml_pipeline_engine/types.py
index 0486fe0..10e2219 100644
--- a/ml_pipeline_engine/types.py
+++ b/ml_pipeline_engine/types.py
@@ -6,10 +6,6 @@
 import networkx as nx
 
 ProcessorResultT = t.TypeVar('ProcessorResultT')
-DataSourceResultT = t.TypeVar('DataSourceResultT')
-FeatureResultT = t.TypeVar('FeatureResultT')
-FeatureVectorizerResultT = t.TypeVar('FeatureVectorizerResultT')
-MLModelResultT = t.TypeVar('MLModelResultT')
 NodeResultT = t.TypeVar('NodeResultT')
 
 AdditionalDataT = t.TypeVar('AdditionalDataT', bound=t.Any)
@@ -68,13 +64,7 @@ class NodeProtocol(t.Protocol):
     Узел графа модели
     """
 
-    RUN_METHOD_ALIASES = (
-        'process',
-        'extract',
-        'collect',
-        'vectorize',
-        'predict',
-    )
+    RUN_METHOD_ALIAS = 'process'
 
     node_type: t.ClassVar[str] = None
     name: t.ClassVar[str] = None
@@ -97,57 +87,7 @@ class ProcessorLike(RetryProtocol, TagProtocol, t.Protocol[ProcessorResultT]):
     ]
 
 
-class DataSourceLike(RetryProtocol, TagProtocol, t.Protocol[DataSourceResultT]):
-    """
-    Источник данных
-    """
-
-    collect: t.Union[
-        t.Callable[..., DataSourceResultT],
-        t.Callable[..., t.Awaitable[DataSourceResultT]],
-    ]
-
-
-class FeatureLike(RetryProtocol, TagProtocol, t.Protocol[FeatureResultT]):
-    """
-    Фича
-    """
-
-    extract: t.Union[
-        t.Callable[..., FeatureResultT],
-        t.Callable[..., t.Awaitable[FeatureResultT]],
-    ]
-
-
-class FeatureVectorizerLike(RetryProtocol, TagProtocol, t.Protocol[FeatureVectorizerResultT]):
-    """
-    Векторизатор фичей
-    """
-
-    vectorize: t.Union[
-        t.Callable[..., FeatureVectorizerResultT],
-        t.Callable[..., t.Awaitable[FeatureVectorizerResultT]],
-    ]
-
-
-class MLModelLike(RetryProtocol, TagProtocol, t.Protocol[MLModelResultT]):
-    """
-    ML-модель
-    """
-
-    predict: t.Union[
-        t.Callable[..., MLModelResultT],
-        t.Callable[..., t.Awaitable[MLModelResultT]],
-    ]
-
-
-NodeLike = t.Union[
-    t.Type[ProcessorLike[NodeResultT]],
-    t.Type[DataSourceLike[NodeResultT]],
-    t.Type[FeatureLike[NodeResultT]],
-    t.Type[FeatureVectorizerLike[NodeResultT]],
-    t.Type[MLModelLike[NodeResultT]],
-]
+NodeLike = t.Type[ProcessorLike[NodeResultT]]
 
 
 @dataclass(frozen=True)
diff --git a/tests/dag/oneof/test_fail_node.py b/tests/dag/oneof/test_fail_node.py
index 8f986cb..365a6aa 100644
--- a/tests/dag/oneof/test_fail_node.py
+++ b/tests/dag/oneof/test_fail_node.py
@@ -2,7 +2,6 @@
 
 import pytest_mock
 
-from ml_pipeline_engine.base_nodes.datasources import DataSource
 from ml_pipeline_engine.base_nodes.processors import ProcessorBase
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
@@ -21,11 +20,11 @@ def process(self, base_num: int) -> int:
     return base_num
 
 
-class FlDataSourceGeneric(DataSource):
+class FlDataSourceGeneric(ProcessorBase):
     name = 'source'
 
     @guard_datasource_error()
-    def collect(self, _: InputGeneric(NodeLike)) -> t.Type[Exception]:
+    def process(self, _: InputGeneric(NodeLike)) -> t.Type[Exception]:
         raise Exception
diff --git a/tests/dag/oneof/test_input_one_of_fails_dag.py b/tests/dag/oneof/test_input_one_of_fails_dag.py
index d6bfacc..6d196f1 100644
--- a/tests/dag/oneof/test_input_one_of_fails_dag.py
+++ b/tests/dag/oneof/test_input_one_of_fails_dag.py
@@ -2,7 +2,6 @@
 
 import pytest
 
-from ml_pipeline_engine.base_nodes.datasources import DataSource
 from ml_pipeline_engine.base_nodes.processors import ProcessorBase
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
@@ -21,19 +20,19 @@ def process(self, base_num: int, other_num: int) -> dict:
     }
 
 
-class ErrorDataSource(DataSource):
+class ErrorDataSource(ProcessorBase):
     name = 'some_data_source'
 
     @guard_datasource_error()
-    def collect(self, _: Input(SomeInput)) -> t.Type[Exception]:
+    def process(self, _: Input(SomeInput)) -> t.Type[Exception]:
         raise Exception
 
 
-class ErrorDataSourceSecond(DataSource):
+class ErrorDataSourceSecond(ProcessorBase):
     name = 'some_data_source_second'
 
     @guard_datasource_error()
-    def collect(self, _: Input(SomeInput)) -> t.Type[Exception]:
+    def process(self, _: Input(SomeInput)) -> t.Type[Exception]:
         raise Exception
diff --git a/tests/dag/oneof/test_input_one_of_first_success_dag.py b/tests/dag/oneof/test_input_one_of_first_success_dag.py
index 03d8e92..c544c7b 100644
--- a/tests/dag/oneof/test_input_one_of_first_success_dag.py
+++ b/tests/dag/oneof/test_input_one_of_first_success_dag.py
@@ -1,6 +1,5 @@
 import typing as t
 
-from ml_pipeline_engine.base_nodes.datasources import DataSource
 from ml_pipeline_engine.base_nodes.processors import ProcessorBase
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
@@ -19,18 +18,18 @@ def process(self, base_num: int, other_num: int) -> dict:
     }
 
 
-class SomeDataSource(DataSource):
+class SomeDataSource(ProcessorBase):
     name = 'some_data_source'
 
-    def collect(self, _: Input(SomeInput)) -> int:
+    def process(self, _: Input(SomeInput)) -> int:
         return 110
 
 
-class ErrorDataSource(DataSource):
+class ErrorDataSource(ProcessorBase):
     name = 'some_data_source_second'
 
     @guard_datasource_error()
-    def collect(self, _: Input(SomeInput)) -> int:
+    def process(self, _: Input(SomeInput)) -> int:
         raise Exception
diff --git a/tests/dag/oneof/test_input_one_of_last_success_dag.py b/tests/dag/oneof/test_input_one_of_last_success_dag.py
index 1095ab7..da27aab 100644
--- a/tests/dag/oneof/test_input_one_of_last_success_dag.py
+++ b/tests/dag/oneof/test_input_one_of_last_success_dag.py
@@ -1,6 +1,5 @@
 import typing as t
 
-from ml_pipeline_engine.base_nodes.datasources import DataSource
 from ml_pipeline_engine.base_nodes.processors import ProcessorBase
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
@@ -19,19 +18,19 @@ def process(self, base_num: int, other_num: int) -> dict:
     }
 
 
-class ErrorDataSource(DataSource):
+class ErrorDataSource(ProcessorBase):
     name = 'some_data_source'
 
     @guard_datasource_error()
-    def collect(self, _: Input(SomeInput)) -> t.Type[Exception]:
+    def process(self, _: Input(SomeInput)) -> t.Type[Exception]:
         raise Exception
 
 
-class ErrorDataSourceSecond(DataSource):
+class ErrorDataSourceSecond(ProcessorBase):
     name = 'some_data_source_second'
 
     @guard_datasource_error()
-    def collect(self, _: Input(SomeInput)) -> t.Type[Exception]:
+    def process(self, _: Input(SomeInput)) -> t.Type[Exception]:
         raise Exception
diff --git a/tests/dag/oneof/test_input_one_of_multiple_dag.py b/tests/dag/oneof/test_input_one_of_multiple_dag.py
index a165066..e3390a7 100644
--- a/tests/dag/oneof/test_input_one_of_multiple_dag.py
+++ b/tests/dag/oneof/test_input_one_of_multiple_dag.py
@@ -1,6 +1,5 @@
 import typing as t
 
-from ml_pipeline_engine.base_nodes.datasources import DataSource
 from ml_pipeline_engine.base_nodes.processors import ProcessorBase
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
@@ -19,18 +18,18 @@ def process(self, base_num: int, other_num: int) -> dict:
     }
 
 
-class SomeDataSource(DataSource):
+class SomeDataSource(ProcessorBase):
     name = 'some_data_source'
 
-    def collect(self, _: Input(SomeInput)) -> int:
+    def process(self, _: Input(SomeInput)) -> int:
         return 110
 
 
-class ErrorDataSource(DataSource):
+class ErrorDataSource(ProcessorBase):
     name = 'some_data_source_second'
 
     @guard_datasource_error()
-    def collect(self, _: Input(SomeInput)) -> t.Type[Exception]:
+    def process(self, _: Input(SomeInput)) -> t.Type[Exception]:
         raise Exception
diff --git a/tests/dag/retry/test_dag_retry.py b/tests/dag/retry/test_dag_retry.py
index 9440ed2..3846658 100644
--- a/tests/dag/retry/test_dag_retry.py
+++ b/tests/dag/retry/test_dag_retry.py
@@ -2,7 +2,6 @@
 
 import pytest_mock
 
-from ml_pipeline_engine.base_nodes.datasources import DataSource
 from ml_pipeline_engine.base_nodes.processors import ProcessorBase
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
@@ -27,10 +26,10 @@ def external_func() -> float:
         return 0.1
 
 
-class SomeNode(DataSource):
+class SomeNode(ProcessorBase):
     exceptions = (BaseExecutionError,)
 
-    def collect(self) -> float:
+    def process(self) -> float:
         return ExternalDatasource().external_func()
 
 
@@ -55,7 +54,7 @@ async def test_dag_retry(
     mocker: pytest_mock.MockerFixture,
 ) -> None:
 
-    collect_spy = mocker.spy(SomeNode, 'collect')
+    collect_spy = mocker.spy(SomeNode, 'process')
     external_func_patch = mocker.patch.object(
         ExternalDatasource,
         'external_func',
diff --git a/tests/dag/retry/test_dag_retry__base_error.py b/tests/dag/retry/test_dag_retry__base_error.py
index ad8e9e5..f1186af 100644
--- a/tests/dag/retry/test_dag_retry__base_error.py
+++ b/tests/dag/retry/test_dag_retry__base_error.py
@@ -3,17 +3,16 @@
 import pytest
 import pytest_mock
 
-from ml_pipeline_engine.base_nodes.datasources import DataSource
 from ml_pipeline_engine.base_nodes.processors import ProcessorBase
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
 from ml_pipeline_engine.types import DAGLike
 
 
-class SomeNode(DataSource):
+class SomeNode(ProcessorBase):
     exceptions = (Exception,)
 
-    def collect(self):  # noqa
+    def process(self):  # noqa
         raise BaseException('CustomError')
 
 
@@ -37,7 +36,7 @@ async def test_dag_retry__base_error(
     build_dag: t.Callable[..., DAGLike],
    mocker: pytest_mock.MockerFixture,
 ) -> None:
-    collect_spy = mocker.spy(SomeNode, 'collect')
+    collect_spy = mocker.spy(SomeNode, 'process')
 
     with pytest.raises(BaseException, match='CustomError'):
         assert await build_dag(input_node=InvertNumber, output_node=DoubleNumber).run(pipeline_context(num=2.5))
diff --git a/tests/dag/retry/test_dag_retry__error.py b/tests/dag/retry/test_dag_retry__error.py
index 9dff3c4..13c5d67 100644
--- a/tests/dag/retry/test_dag_retry__error.py
+++ b/tests/dag/retry/test_dag_retry__error.py
@@ -3,7 +3,6 @@
 import pytest
 import pytest_mock
 
-from ml_pipeline_engine.base_nodes.datasources import DataSource
 from ml_pipeline_engine.base_nodes.processors import ProcessorBase
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
@@ -16,8 +15,8 @@ def external_func() -> float:
         return 0.1
 
 
-class SomeNode(DataSource):
-    def collect(self):  # noqa
+class SomeNode(ProcessorBase):
+    def process(self):  # noqa
         return ExternalDatasource().external_func()
 
 
@@ -42,7 +41,7 @@ async def test_dag_retry__error(
     mocker: pytest_mock.MockerFixture,
 ) -> None:
 
-    collect_spy = mocker.spy(SomeNode, 'collect')
+    collect_spy = mocker.spy(SomeNode, 'process')
     external_func_patch = mocker.patch.object(
         ExternalDatasource,
         'external_func',
diff --git a/tests/test_events/test_dag.py b/tests/test_events/test_dag.py
index 6a51a15..2b6ea6b 100644
--- a/tests/test_events/test_dag.py
+++ b/tests/test_events/test_dag.py
@@ -4,7 +4,6 @@
 
 import pytest_mock
 
-from ml_pipeline_engine.base_nodes.datasources import DataSource
 from ml_pipeline_engine.base_nodes.processors import ProcessorBase
 from ml_pipeline_engine.chart import PipelineChart
 from ml_pipeline_engine.context.dag import DAGPipelineContext
@@ -16,10 +15,10 @@
 
 
 async def test_pipeline_chart_events_success(mocker: pytest_mock.MockerFixture, model_name_op: str) -> None:
-    class SomeDataSourceNode(DataSource):
+    class SomeDataSourceNode(ProcessorBase):
         name = 'some_datasource'
 
-        def collect(self, x: int) -> int:
+        def process(self, x: int) -> int:
             return x * -1
 
     class SomeOutputNode(ProcessorBase):
@@ -75,7 +74,7 @@ async def on_node_complete(
 
     assert on_node_start.call_args_list[0].kwargs == {
         'ctx': ANY,
-        'node_id': 'datasource__some_datasource',
+        'node_id': 'processor__some_datasource',
     }
 
     assert on_node_start.call_args_list[1].kwargs == {
@@ -87,7 +86,7 @@ async def on_node_complete(
 
     assert on_node_complete.call_args_list[0].kwargs == {
         'ctx': ANY,
-        'node_id': 'datasource__some_datasource',
+        'node_id': 'processor__some_datasource',
         'error': None,
     }
 
@@ -99,10 +98,10 @@
 
 
 async def test_pipeline_chart_events_error(mocker: pytest_mock.MockerFixture, model_name_op: str) -> None:
-    class SomeDataSourceNode(DataSource):
+    class SomeDataSourceNode(ProcessorBase):
         name = 'some_datasource'
 
-        async def collect(self, x: int) -> int:
+        async def process(self, x: int) -> int:
             return x * -1
 
     class SomeOutputNode(ProcessorBase):
@@ -160,7 +159,7 @@ async def on_node_complete(
 
     assert on_node_start.call_args_list[0].kwargs == {
         'ctx': ANY,
-        'node_id': 'datasource__some_datasource',
+        'node_id': 'processor__some_datasource',
     }
 
     assert on_node_start.call_args_list[1].kwargs == {
@@ -172,7 +171,7 @@ async def on_node_complete(
 
     assert on_node_complete.call_args_list[0].kwargs == {
         'ctx': ANY,
-        'node_id': 'datasource__some_datasource',
+        'node_id': 'processor__some_datasource',
         'error': None,
     }
diff --git a/tests/visualization/test_visualization.py b/tests/visualization/test_visualization.py
index 716f374..0eeb748 100644
--- a/tests/visualization/test_visualization.py
+++ b/tests/visualization/test_visualization.py
@@ -8,7 +8,6 @@
 import pytest
 from click.testing import CliRunner
 
-from ml_pipeline_engine.base_nodes.datasources import DataSource
 from ml_pipeline_engine.base_nodes.processors import ProcessorBase
 from ml_pipeline_engine.cli import build_static
 from ml_pipeline_engine.dag_builders.annotation.builder import build_dag
@@ -18,11 +17,11 @@
 from ml_pipeline_engine.node import build_node
 
 
-class InvertNumber(DataSource):
+class InvertNumber(ProcessorBase):
     name = 'invert_number'
     verbose_name = 'Invert!'
 
-    def collect(self, num: float) -> float:
+    def process(self, num: float) -> float:
         return -num
 
 
@@ -154,21 +153,17 @@ async def test_basic(call_func: t.Callable) -> None:
             'target': 'processor__double_number',
         },
         {
-            'id': 'datasource__invert_number->processor__add_const',
-            'source': 'datasource__invert_number',
+            'id': 'processor__invert_number->processor__add_const',
+            'source': 'processor__invert_number',
             'target': 'processor__add_const',
         },
        {
-            'id': 'datasource__invert_number->processor__tests_visualization_test_visualization_SwitchNode',
-            'source': 'datasource__invert_number',
+            'id': 'processor__invert_number->processor__tests_visualization_test_visualization_SwitchNode',
+            'source': 'processor__invert_number',
             'target': 'processor__tests_visualization_test_visualization_SwitchNode',
         },
     ],
     'node_types': {
-        'datasource': {
-            'hex_bgr_color': None,
-            'name': 'datasource',
-        },
         'processor': {
             'hex_bgr_color': None,
             'name': 'processor',
@@ -188,7 +183,7 @@ async def test_basic(call_func: t.Callable) -> None:
         },
         {
             'data': {
-                'code_source': 'tests/visualization/test_visualization.py#L60',
+                'code_source': 'tests/visualization/test_visualization.py#L59',
                 'doc': 'Базовый класс для обработчиков общего назначения',
                 'name': None,
                 'verbose_name': None,
@@ -200,7 +195,7 @@ async def test_basic(call_func: t.Callable) -> None:
         },
         {
             'data': {
-                'code_source': 'tests/visualization/test_visualization.py#L55',
+                'code_source': 'tests/visualization/test_visualization.py#L54',
                 'doc': 'Базовый класс для обработчиков общего назначения',
                 'name': None,
                 'verbose_name': None,
@@ -212,7 +207,7 @@ async def test_basic(call_func: t.Callable) -> None:
         },
         {
             'data': {
-                'code_source': 'tests/visualization/test_visualization.py#L73',
+                'code_source': 'tests/visualization/test_visualization.py#L72',
                 'doc': 'Базовый класс для обработчиков общего назначения',
                 'name': None,
                 'verbose_name': None,
@@ -224,7 +219,7 @@ async def test_basic(call_func: t.Callable) -> None:
         },
         {
             'data': {
-                'code_source': 'tests/visualization/test_visualization.py#L45',  # Line for the real source!
+                'code_source': 'tests/visualization/test_visualization.py#L44',  # Line for the real source!
                 'doc': 'Базовый класс для обработчиков общего назначения',
                 'name': 'another_feature',
                 'verbose_name': None,
@@ -236,7 +231,7 @@ async def test_basic(call_func: t.Callable) -> None:
         },
         {
             'data': {
-                'code_source': 'tests/visualization/test_visualization.py#L37',
+                'code_source': 'tests/visualization/test_visualization.py#L36',
                 'doc': 'Базовый класс для обработчиков общего назначения',
                 'name': 'double_number',
                 'verbose_name': 'Double!',
@@ -248,7 +243,7 @@ async def test_basic(call_func: t.Callable) -> None:
         },
         {
             'data': {
-                'code_source': 'tests/visualization/test_visualization.py#L29',
+                'code_source': 'tests/visualization/test_visualization.py#L28',
                 'doc': 'Базовый класс для обработчиков общего назначения',
                 'name': 'add_const',
                 'verbose_name': 'Add!',
@@ -260,15 +255,15 @@ async def test_basic(call_func: t.Callable) -> None:
         },
         {
             'data': {
-                'code_source': 'tests/visualization/test_visualization.py#L21',
-                'doc': 'Базовый класс для источников данных',
+                'code_source': 'tests/visualization/test_visualization.py#L20',
+                'doc': 'Базовый класс для обработчиков общего назначения',
                 'name': 'invert_number',
                 'verbose_name': 'Invert!',
             },
-            'id': 'datasource__invert_number',
+            'id': 'processor__invert_number',
             'is_generic': False,
             'is_virtual': False,
-            'type': 'datasource',
+            'type': 'processor',
         },
     ],
 }

From ca9567fe9443b54c26edb297d39678ec19595208 Mon Sep 17 00:00:00 2001
From: Evgeniia Lukmanova
Date: Sun, 12 May 2024 20:31:44 +0500
Subject: [PATCH 05/12] refactor: Move base nodes to node/ folder

---
 ml_pipeline_engine/base_nodes/__init__.py                    | 0
 .../{base_nodes/processors.py => node/base_nodes.py}         | 0
 ml_pipeline_engine/visualization/sample.py                   | 2 +-
 tests/dag/oneof/test_fail_node.py                            | 2 +-
 tests/dag/oneof/test_input_one_of_fails_dag.py               | 2 +-
 tests/dag/oneof/test_input_one_of_first_success_dag.py       | 2 +-
 tests/dag/oneof/test_input_one_of_last_success_dag.py        | 2 +-
 tests/dag/oneof/test_input_one_of_multiple_dag.py            | 2 +-
 tests/dag/recurrent_subgraph/test_inline_subgraph.py         | 4 ++--
 tests/dag/recurrent_subgraph/test_nested_subgraph.py         | 4 ++--
 tests/dag/recurrent_subgraph/test_simple_subgraph.py         | 4 ++--
 tests/dag/recurrent_subgraph/test_subgraph_default.py        | 4 ++--
 tests/dag/recurrent_subgraph/test_subgraph_default_retry.py  | 4 ++--
 .../recurrent_subgraph/test_subgraph_with_inside_switch.py   | 2 +-
 .../recurrent_subgraph/test_subgraph_with_several_calls.py   | 4 ++--
 tests/dag/retry/test_dag_retry.py                            | 2 +-
 tests/dag/retry/test_dag_retry__base_error.py                | 2 +-
 tests/dag/retry/test_dag_retry__error.py                     | 2 +-
 tests/dag/switch_case/test_concurrent_switch.py              | 2 +-
 tests/dag/switch_case/test_dag_multiple_switch_case.py       | 2 +-
 tests/dag/switch_case/test_dag_nested_switch_case.py         | 2 +-
 tests/dag/switch_case/test_dag_switch_case.py                | 2 +-
 tests/dag/test_dag_chain.py                                  | 2 +-
 tests/dag/test_dag_rhombus.py                                | 2 +-
 tests/dag/test_dag_single.py                                 | 2 +-
 tests/dag/test_demo_ml_model_dag.py                          | 2 +-
 tests/dag/test_reusable_nodes.py                             | 2 +-
 tests/tags/test_tags.py                                      | 2 +-
 tests/test_chart/test_dag.py                                 | 2 +-
 tests/test_events/test_dag.py                                | 2 +-
 tests/test_utils.py                                          | 2 +-
 tests/visualization/test_visualization.py                    | 2 +-
 32 files changed, 36 insertions(+), 36 deletions(-)
 delete mode 100644 ml_pipeline_engine/base_nodes/__init__.py
 rename ml_pipeline_engine/{base_nodes/processors.py => node/base_nodes.py} (100%)

diff --git a/ml_pipeline_engine/base_nodes/__init__.py b/ml_pipeline_engine/base_nodes/__init__.py
deleted file mode 100644
index e69de29..0000000
diff --git a/ml_pipeline_engine/base_nodes/processors.py b/ml_pipeline_engine/node/base_nodes.py
similarity index 100%
rename from ml_pipeline_engine/base_nodes/processors.py
rename to ml_pipeline_engine/node/base_nodes.py
diff --git a/ml_pipeline_engine/visualization/sample.py b/ml_pipeline_engine/visualization/sample.py
index 04a4519..85413e9 100644
--- a/ml_pipeline_engine/visualization/sample.py
+++ b/ml_pipeline_engine/visualization/sample.py
@@ -1,7 +1,7 @@
-from ml_pipeline_engine.base_nodes.processors import ProcessorBase
 from ml_pipeline_engine.dag_builders.annotation import build_dag
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
 from ml_pipeline_engine.dag_builders.annotation.marks import SwitchCase
+from ml_pipeline_engine.node.base_nodes import ProcessorBase
 
 
 class Ident(ProcessorBase):
diff --git a/tests/dag/oneof/test_fail_node.py b/tests/dag/oneof/test_fail_node.py
index 365a6aa..c7b5c85 100644
--- a/tests/dag/oneof/test_fail_node.py
+++ b/tests/dag/oneof/test_fail_node.py
@@ -2,13 +2,13 @@
 
 import pytest_mock
 
-from ml_pipeline_engine.base_nodes.processors import ProcessorBase
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
 from ml_pipeline_engine.dag_builders.annotation.marks import InputGeneric
 from ml_pipeline_engine.dag_builders.annotation.marks import InputOneOf
 from ml_pipeline_engine.decorators import guard_datasource_error
 from ml_pipeline_engine.node import build_node
+from ml_pipeline_engine.node.base_nodes import ProcessorBase
 from ml_pipeline_engine.types import DAGLike
 from ml_pipeline_engine.types import NodeLike
diff --git a/tests/dag/oneof/test_input_one_of_fails_dag.py b/tests/dag/oneof/test_input_one_of_fails_dag.py
index 6d196f1..6de2aa0 100644
--- a/tests/dag/oneof/test_input_one_of_fails_dag.py
+++ b/tests/dag/oneof/test_input_one_of_fails_dag.py
@@ -2,11 +2,11 @@
 
 import pytest
 
-from ml_pipeline_engine.base_nodes.processors import ProcessorBase
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
 from ml_pipeline_engine.dag_builders.annotation.marks import InputOneOf
 from ml_pipeline_engine.decorators import guard_datasource_error
+from ml_pipeline_engine.node.base_nodes import ProcessorBase
 from ml_pipeline_engine.types import DAGLike
diff --git a/tests/dag/oneof/test_input_one_of_first_success_dag.py b/tests/dag/oneof/test_input_one_of_first_success_dag.py
index c544c7b..baeb2fc 100644
--- a/tests/dag/oneof/test_input_one_of_first_success_dag.py
+++ b/tests/dag/oneof/test_input_one_of_first_success_dag.py
@@ -1,10 +1,10 @@
 import typing as t
 
-from ml_pipeline_engine.base_nodes.processors import ProcessorBase
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
 from ml_pipeline_engine.dag_builders.annotation.marks import InputOneOf
 from ml_pipeline_engine.decorators import guard_datasource_error
+from ml_pipeline_engine.node.base_nodes import ProcessorBase
 from ml_pipeline_engine.types import DAGLike
diff --git a/tests/dag/oneof/test_input_one_of_last_success_dag.py b/tests/dag/oneof/test_input_one_of_last_success_dag.py
index da27aab..e60f934 100644
--- a/tests/dag/oneof/test_input_one_of_last_success_dag.py
+++ b/tests/dag/oneof/test_input_one_of_last_success_dag.py
@@ -1,10 +1,10 @@
 import typing as t
 
-from ml_pipeline_engine.base_nodes.processors import ProcessorBase
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
 from ml_pipeline_engine.dag_builders.annotation.marks import InputOneOf
 from ml_pipeline_engine.decorators import guard_datasource_error
+from ml_pipeline_engine.node.base_nodes import ProcessorBase
 from ml_pipeline_engine.types import DAGLike
diff --git a/tests/dag/oneof/test_input_one_of_multiple_dag.py b/tests/dag/oneof/test_input_one_of_multiple_dag.py
index e3390a7..da6ad8d 100644
--- a/tests/dag/oneof/test_input_one_of_multiple_dag.py
+++ b/tests/dag/oneof/test_input_one_of_multiple_dag.py
@@ -1,10 +1,10 @@
 import typing as t
 
-from ml_pipeline_engine.base_nodes.processors import ProcessorBase
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
 from ml_pipeline_engine.dag_builders.annotation.marks import InputOneOf
 from ml_pipeline_engine.decorators import guard_datasource_error
+from ml_pipeline_engine.node.base_nodes import ProcessorBase
 from ml_pipeline_engine.types import DAGLike
diff --git a/tests/dag/recurrent_subgraph/test_inline_subgraph.py b/tests/dag/recurrent_subgraph/test_inline_subgraph.py
index d29f552..8d5b86b 100644
--- a/tests/dag/recurrent_subgraph/test_inline_subgraph.py
+++ b/tests/dag/recurrent_subgraph/test_inline_subgraph.py
@@ -1,10 +1,10 @@
 import typing as t
 
-from ml_pipeline_engine.base_nodes.processors import ProcessorBase
-from ml_pipeline_engine.base_nodes.processors import RecurrentProcessor
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
 from ml_pipeline_engine.dag_builders.annotation.marks import RecurrentSubGraph
+from ml_pipeline_engine.node.base_nodes import ProcessorBase
+from ml_pipeline_engine.node.base_nodes import RecurrentProcessor
 from ml_pipeline_engine.types import AdditionalDataT
 from ml_pipeline_engine.types import DAGLike
 from ml_pipeline_engine.types import Recurrent
diff --git a/tests/dag/recurrent_subgraph/test_nested_subgraph.py b/tests/dag/recurrent_subgraph/test_nested_subgraph.py
index a927597..d5ae258 100644
--- a/tests/dag/recurrent_subgraph/test_nested_subgraph.py
+++ b/tests/dag/recurrent_subgraph/test_nested_subgraph.py
@@ -3,11 +3,11 @@
 from tests.helpers import FactoryMocker
 from tests.helpers import call_object
 
-from ml_pipeline_engine.base_nodes.processors import ProcessorBase
-from ml_pipeline_engine.base_nodes.processors import RecurrentProcessor
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
 from ml_pipeline_engine.dag_builders.annotation.marks import RecurrentSubGraph
+from ml_pipeline_engine.node.base_nodes import ProcessorBase
+from ml_pipeline_engine.node.base_nodes import RecurrentProcessor
 from ml_pipeline_engine.types import AdditionalDataT
 from ml_pipeline_engine.types import DAGLike
 from ml_pipeline_engine.types import Recurrent
diff --git a/tests/dag/recurrent_subgraph/test_simple_subgraph.py b/tests/dag/recurrent_subgraph/test_simple_subgraph.py
index 7e2a266..d761776 100644
--- a/tests/dag/recurrent_subgraph/test_simple_subgraph.py
+++ b/tests/dag/recurrent_subgraph/test_simple_subgraph.py
@@ -3,11 +3,11 @@
 from tests.helpers import FactoryMocker
 from tests.helpers import call_object
 
-from ml_pipeline_engine.base_nodes.processors import ProcessorBase
-from ml_pipeline_engine.base_nodes.processors import RecurrentProcessor
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
 from ml_pipeline_engine.dag_builders.annotation.marks import RecurrentSubGraph
+from ml_pipeline_engine.node.base_nodes import ProcessorBase
+from ml_pipeline_engine.node.base_nodes import RecurrentProcessor
 from ml_pipeline_engine.types import AdditionalDataT
 from ml_pipeline_engine.types import DAGLike
 from ml_pipeline_engine.types import Recurrent
diff --git a/tests/dag/recurrent_subgraph/test_subgraph_default.py b/tests/dag/recurrent_subgraph/test_subgraph_default.py
index 3f8ab47..10211a8 100644
--- a/tests/dag/recurrent_subgraph/test_subgraph_default.py
+++ b/tests/dag/recurrent_subgraph/test_subgraph_default.py
@@ -3,11 +3,11 @@
 from tests.helpers import FactoryMocker
 from tests.helpers import call_object
 
-from ml_pipeline_engine.base_nodes.processors import ProcessorBase
-from ml_pipeline_engine.base_nodes.processors import RecurrentProcessor
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
 from ml_pipeline_engine.dag_builders.annotation.marks import RecurrentSubGraph
+from ml_pipeline_engine.node.base_nodes import ProcessorBase
+from ml_pipeline_engine.node.base_nodes import RecurrentProcessor
 from ml_pipeline_engine.types import AdditionalDataT
 from ml_pipeline_engine.types import DAGLike
 from ml_pipeline_engine.types import Recurrent
diff --git a/tests/dag/recurrent_subgraph/test_subgraph_default_retry.py b/tests/dag/recurrent_subgraph/test_subgraph_default_retry.py
index 1f000bb..cbae78e 100644
--- a/tests/dag/recurrent_subgraph/test_subgraph_default_retry.py
+++ b/tests/dag/recurrent_subgraph/test_subgraph_default_retry.py
@@ -3,11 +3,11 @@
 from tests.helpers import FactoryMocker
 from tests.helpers import call_object
 
-from ml_pipeline_engine.base_nodes.processors import ProcessorBase
-from ml_pipeline_engine.base_nodes.processors import RecurrentProcessor
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
 from ml_pipeline_engine.dag_builders.annotation.marks import RecurrentSubGraph
+from ml_pipeline_engine.node.base_nodes import ProcessorBase
+from ml_pipeline_engine.node.base_nodes import RecurrentProcessor
 from ml_pipeline_engine.types import AdditionalDataT
 from ml_pipeline_engine.types import DAGLike
 from ml_pipeline_engine.types import Recurrent
diff --git a/tests/dag/recurrent_subgraph/test_subgraph_with_inside_switch.py b/tests/dag/recurrent_subgraph/test_subgraph_with_inside_switch.py
index f2d6e31..fc4eb31 100644
--- a/tests/dag/recurrent_subgraph/test_subgraph_with_inside_switch.py
+++ b/tests/dag/recurrent_subgraph/test_subgraph_with_inside_switch.py
@@ -5,11 +5,11 @@
 from tests.helpers import FactoryMocker
 from tests.helpers import call_object
 
-from ml_pipeline_engine.base_nodes.processors import RecurrentProcessor
 from ml_pipeline_engine.context.dag import DAGPipelineContext
 from ml_pipeline_engine.dag_builders.annotation.marks import Input
 from ml_pipeline_engine.dag_builders.annotation.marks import RecurrentSubGraph
 from ml_pipeline_engine.dag_builders.annotation.marks import SwitchCase
+from ml_pipeline_engine.node.base_nodes import RecurrentProcessor
 from ml_pipeline_engine.types import AdditionalDataT
 from ml_pipeline_engine.types import DAGLike
 from ml_pipeline_engine.types import Recurrent
diff --git a/tests/dag/recurrent_subgraph/test_subgraph_with_several_calls.py
b/tests/dag/recurrent_subgraph/test_subgraph_with_several_calls.py index f37160e..9fdf8cf 100644 --- a/tests/dag/recurrent_subgraph/test_subgraph_with_several_calls.py +++ b/tests/dag/recurrent_subgraph/test_subgraph_with_several_calls.py @@ -2,11 +2,11 @@ import pytest -from ml_pipeline_engine.base_nodes.processors import ProcessorBase -from ml_pipeline_engine.base_nodes.processors import RecurrentProcessor from ml_pipeline_engine.context.dag import DAGPipelineContext from ml_pipeline_engine.dag_builders.annotation.marks import Input from ml_pipeline_engine.dag_builders.annotation.marks import RecurrentSubGraph +from ml_pipeline_engine.node.base_nodes import ProcessorBase +from ml_pipeline_engine.node.base_nodes import RecurrentProcessor from ml_pipeline_engine.types import AdditionalDataT from ml_pipeline_engine.types import DAGLike from ml_pipeline_engine.types import Recurrent diff --git a/tests/dag/retry/test_dag_retry.py b/tests/dag/retry/test_dag_retry.py index 3846658..035d71c 100644 --- a/tests/dag/retry/test_dag_retry.py +++ b/tests/dag/retry/test_dag_retry.py @@ -2,9 +2,9 @@ import pytest_mock -from ml_pipeline_engine.base_nodes.processors import ProcessorBase from ml_pipeline_engine.context.dag import DAGPipelineContext from ml_pipeline_engine.dag_builders.annotation.marks import Input +from ml_pipeline_engine.node.base_nodes import ProcessorBase from ml_pipeline_engine.types import DAGLike diff --git a/tests/dag/retry/test_dag_retry__base_error.py b/tests/dag/retry/test_dag_retry__base_error.py index f1186af..fcc3b5a 100644 --- a/tests/dag/retry/test_dag_retry__base_error.py +++ b/tests/dag/retry/test_dag_retry__base_error.py @@ -3,9 +3,9 @@ import pytest import pytest_mock -from ml_pipeline_engine.base_nodes.processors import ProcessorBase from ml_pipeline_engine.context.dag import DAGPipelineContext from ml_pipeline_engine.dag_builders.annotation.marks import Input +from ml_pipeline_engine.node.base_nodes import ProcessorBase from ml_pipeline_engine.types import DAGLike diff --git a/tests/dag/retry/test_dag_retry__error.py b/tests/dag/retry/test_dag_retry__error.py index 13c5d67..a8a8f66 100644 --- a/tests/dag/retry/test_dag_retry__error.py +++ b/tests/dag/retry/test_dag_retry__error.py @@ -3,9 +3,9 @@ import pytest import pytest_mock -from ml_pipeline_engine.base_nodes.processors import ProcessorBase from ml_pipeline_engine.context.dag import DAGPipelineContext from ml_pipeline_engine.dag_builders.annotation.marks import Input +from ml_pipeline_engine.node.base_nodes import ProcessorBase from ml_pipeline_engine.types import DAGLike diff --git a/tests/dag/switch_case/test_concurrent_switch.py b/tests/dag/switch_case/test_concurrent_switch.py index 4cad76e..49b779c 100644 --- a/tests/dag/switch_case/test_concurrent_switch.py +++ b/tests/dag/switch_case/test_concurrent_switch.py @@ -2,10 +2,10 @@ import pytest -from ml_pipeline_engine.base_nodes.processors import ProcessorBase from ml_pipeline_engine.context.dag import DAGPipelineContext from ml_pipeline_engine.dag_builders.annotation.marks import Input from ml_pipeline_engine.dag_builders.annotation.marks import SwitchCase +from ml_pipeline_engine.node.base_nodes import ProcessorBase from ml_pipeline_engine.types import DAGLike diff --git a/tests/dag/switch_case/test_dag_multiple_switch_case.py b/tests/dag/switch_case/test_dag_multiple_switch_case.py index f9b592a..3d9549e 100644 --- a/tests/dag/switch_case/test_dag_multiple_switch_case.py +++ b/tests/dag/switch_case/test_dag_multiple_switch_case.py @@ -2,10 +2,10 @@ import 
pytest -from ml_pipeline_engine.base_nodes.processors import ProcessorBase from ml_pipeline_engine.context.dag import DAGPipelineContext from ml_pipeline_engine.dag_builders.annotation.marks import Input from ml_pipeline_engine.dag_builders.annotation.marks import SwitchCase +from ml_pipeline_engine.node.base_nodes import ProcessorBase from ml_pipeline_engine.types import DAGLike diff --git a/tests/dag/switch_case/test_dag_nested_switch_case.py b/tests/dag/switch_case/test_dag_nested_switch_case.py index 9d93459..61dd071 100644 --- a/tests/dag/switch_case/test_dag_nested_switch_case.py +++ b/tests/dag/switch_case/test_dag_nested_switch_case.py @@ -2,10 +2,10 @@ import pytest -from ml_pipeline_engine.base_nodes.processors import ProcessorBase from ml_pipeline_engine.context.dag import DAGPipelineContext from ml_pipeline_engine.dag_builders.annotation.marks import Input from ml_pipeline_engine.dag_builders.annotation.marks import SwitchCase +from ml_pipeline_engine.node.base_nodes import ProcessorBase from ml_pipeline_engine.types import DAGLike diff --git a/tests/dag/switch_case/test_dag_switch_case.py b/tests/dag/switch_case/test_dag_switch_case.py index 6f00d1c..9018a28 100644 --- a/tests/dag/switch_case/test_dag_switch_case.py +++ b/tests/dag/switch_case/test_dag_switch_case.py @@ -2,10 +2,10 @@ import pytest -from ml_pipeline_engine.base_nodes.processors import ProcessorBase from ml_pipeline_engine.context.dag import DAGPipelineContext from ml_pipeline_engine.dag_builders.annotation.marks import Input from ml_pipeline_engine.dag_builders.annotation.marks import SwitchCase +from ml_pipeline_engine.node.base_nodes import ProcessorBase from ml_pipeline_engine.types import DAGLike diff --git a/tests/dag/test_dag_chain.py b/tests/dag/test_dag_chain.py index 0ea8e00..8983be1 100644 --- a/tests/dag/test_dag_chain.py +++ b/tests/dag/test_dag_chain.py @@ -1,8 +1,8 @@ import typing as t -from ml_pipeline_engine.base_nodes.processors import ProcessorBase from ml_pipeline_engine.context.dag import DAGPipelineContext from ml_pipeline_engine.dag_builders.annotation.marks import Input +from ml_pipeline_engine.node.base_nodes import ProcessorBase from ml_pipeline_engine.types import DAGLike diff --git a/tests/dag/test_dag_rhombus.py b/tests/dag/test_dag_rhombus.py index 4c7c885..68b434d 100644 --- a/tests/dag/test_dag_rhombus.py +++ b/tests/dag/test_dag_rhombus.py @@ -1,8 +1,8 @@ import typing as t -from ml_pipeline_engine.base_nodes.processors import ProcessorBase from ml_pipeline_engine.context.dag import DAGPipelineContext from ml_pipeline_engine.dag_builders.annotation.marks import Input +from ml_pipeline_engine.node.base_nodes import ProcessorBase from ml_pipeline_engine.types import DAGLike diff --git a/tests/dag/test_dag_single.py b/tests/dag/test_dag_single.py index 514aba3..919ace0 100644 --- a/tests/dag/test_dag_single.py +++ b/tests/dag/test_dag_single.py @@ -1,8 +1,8 @@ import typing as t -from ml_pipeline_engine.base_nodes.processors import ProcessorBase from ml_pipeline_engine.context.dag import DAGPipelineContext from ml_pipeline_engine.dag_builders.annotation import build_dag_single +from ml_pipeline_engine.node.base_nodes import ProcessorBase class DoubleNumber(ProcessorBase): diff --git a/tests/dag/test_demo_ml_model_dag.py b/tests/dag/test_demo_ml_model_dag.py index b2854f6..5786976 100644 --- a/tests/dag/test_demo_ml_model_dag.py +++ b/tests/dag/test_demo_ml_model_dag.py @@ -1,8 +1,8 @@ import typing as t -from ml_pipeline_engine.base_nodes.processors import ProcessorBase from 
ml_pipeline_engine.context.dag import DAGPipelineContext from ml_pipeline_engine.dag_builders.annotation.marks import Input +from ml_pipeline_engine.node.base_nodes import ProcessorBase from ml_pipeline_engine.types import DAGLike diff --git a/tests/dag/test_reusable_nodes.py b/tests/dag/test_reusable_nodes.py index 3f66264..528ffff 100644 --- a/tests/dag/test_reusable_nodes.py +++ b/tests/dag/test_reusable_nodes.py @@ -1,10 +1,10 @@ import typing as t -from ml_pipeline_engine.base_nodes.processors import ProcessorBase from ml_pipeline_engine.context.dag import DAGPipelineContext from ml_pipeline_engine.dag_builders.annotation.marks import Input from ml_pipeline_engine.dag_builders.annotation.marks import InputGeneric from ml_pipeline_engine.node import build_node +from ml_pipeline_engine.node.base_nodes import ProcessorBase from ml_pipeline_engine.types import DAGLike from ml_pipeline_engine.types import NodeLike diff --git a/tests/tags/test_tags.py b/tests/tags/test_tags.py index c88cde8..aa28b37 100644 --- a/tests/tags/test_tags.py +++ b/tests/tags/test_tags.py @@ -2,9 +2,9 @@ import pytest_mock -from ml_pipeline_engine.base_nodes.processors import ProcessorBase from ml_pipeline_engine.context.dag import DAGPipelineContext from ml_pipeline_engine.dag_builders.annotation.marks import Input +from ml_pipeline_engine.node.base_nodes import ProcessorBase from ml_pipeline_engine.node.enums import NodeTag from ml_pipeline_engine.parallelism import processes from ml_pipeline_engine.parallelism import threads diff --git a/tests/test_chart/test_dag.py b/tests/test_chart/test_dag.py index 72b9e10..23c1e1f 100644 --- a/tests/test_chart/test_dag.py +++ b/tests/test_chart/test_dag.py @@ -1,8 +1,8 @@ import pytest -from ml_pipeline_engine.base_nodes.processors import ProcessorBase from ml_pipeline_engine.chart import PipelineChart from ml_pipeline_engine.dag_builders.annotation import build_dag_single +from ml_pipeline_engine.node.base_nodes import ProcessorBase async def test_pipeline_chart_run_success(model_name_op: str) -> None: diff --git a/tests/test_events/test_dag.py b/tests/test_events/test_dag.py index 2b6ea6b..59c7db1 100644 --- a/tests/test_events/test_dag.py +++ b/tests/test_events/test_dag.py @@ -4,11 +4,11 @@ import pytest_mock -from ml_pipeline_engine.base_nodes.processors import ProcessorBase from ml_pipeline_engine.chart import PipelineChart from ml_pipeline_engine.context.dag import DAGPipelineContext from ml_pipeline_engine.dag_builders.annotation import build_dag from ml_pipeline_engine.dag_builders.annotation.marks import Input +from ml_pipeline_engine.node.base_nodes import ProcessorBase from ml_pipeline_engine.types import NodeId from ml_pipeline_engine.types import PipelineContextLike from ml_pipeline_engine.types import PipelineResult diff --git a/tests/test_utils.py b/tests/test_utils.py index dab7d7f..27377fa 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,10 +1,10 @@ from uuid import UUID -from ml_pipeline_engine.base_nodes.processors import ProcessorBase from ml_pipeline_engine.node import generate_pipeline_id from ml_pipeline_engine.node import get_node_id from ml_pipeline_engine.node import get_run_method from ml_pipeline_engine.node import run_node +from ml_pipeline_engine.node.base_nodes import ProcessorBase from ml_pipeline_engine.types import NodeBase diff --git a/tests/visualization/test_visualization.py b/tests/visualization/test_visualization.py index 0eeb748..6b1c8d4 100644 --- a/tests/visualization/test_visualization.py +++ b/tests/visualization/test_visualization.py @@ -8,13 +8,13 @@ import pytest from click.testing import CliRunner -from ml_pipeline_engine.base_nodes.processors import ProcessorBase from ml_pipeline_engine.cli import build_static from ml_pipeline_engine.dag_builders.annotation.builder import build_dag from ml_pipeline_engine.dag_builders.annotation.marks import GenericInput from ml_pipeline_engine.dag_builders.annotation.marks import Input from ml_pipeline_engine.dag_builders.annotation.marks import SwitchCase from ml_pipeline_engine.node import build_node +from ml_pipeline_engine.node.base_nodes import ProcessorBase class InvertNumber(ProcessorBase):
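[Migration note for the rename above] Because ml_pipeline_engine/base_nodes/processors.py is renamed to ml_pipeline_engine/node/base_nodes.py with 100% similarity, downstream code only needs its import path updated; the classes themselves are unchanged. A minimal before/after sketch (DoubleNumber below is a hypothetical example node, not taken from this patch):

# Before this patch:
#   from ml_pipeline_engine.base_nodes.processors import ProcessorBase
# After this patch:
from ml_pipeline_engine.node.base_nodes import ProcessorBase


class DoubleNumber(ProcessorBase):
    name = 'double_number'

    def process(self, num: int) -> int:
        # The node body is untouched by the rename; only the import changed.
        return num * 2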
From 527fb0b103811168f443153acf4da20b0b1ad67a Mon Sep 17 00:00:00 2001 From: Evgeniia Lukmanova Date: Sun, 12 May 2024 21:21:43 +0500 Subject: [PATCH 06/12] BREAKING CHANGE: Remove NodeLike type --- ml_pipeline_engine/chart.py | 4 +-- ml_pipeline_engine/dag/dag.py | 4 +-- ml_pipeline_engine/dag/retrying.py | 4 +-- .../dag_builders/annotation/builder.py | 28 ++++++++------- .../dag_builders/annotation/marks.py | 34 +++++++++---------- ml_pipeline_engine/node/node.py | 15 ++++---- ml_pipeline_engine/types.py | 23 ++++--------- ml_pipeline_engine/visualization/dag.py | 6 ++-- tests/dag/oneof/test_fail_node.py | 6 ++-- tests/dag/test_reusable_nodes.py | 4 +-- 10 files changed, 61 insertions(+), 67 deletions(-) diff --git a/ml_pipeline_engine/chart.py b/ml_pipeline_engine/chart.py index e8c6d27..4207014 100644 --- a/ml_pipeline_engine/chart.py +++ b/ml_pipeline_engine/chart.py @@ -8,13 +8,13 @@ from ml_pipeline_engine.types import DAGLike from ml_pipeline_engine.types import EventManagerLike from ml_pipeline_engine.types import ModelName -from ml_pipeline_engine.types import NodeLike +from ml_pipeline_engine.types import NodeBase from ml_pipeline_engine.types import PipelineId from ml_pipeline_engine.types import PipelineResult NodeResultT = t.TypeVar('NodeResultT') -Entrypoint = t.Optional[t.Union[NodeLike[NodeResultT], DAGLike[NodeResultT]]] +Entrypoint = t.Optional[t.Union[NodeBase[NodeResultT], DAGLike[NodeResultT]]] @dataclass(frozen=True, repr=False) diff --git a/ml_pipeline_engine/dag/dag.py b/ml_pipeline_engine/dag/dag.py index afb7bb1..6bc8e22 100644 --- a/ml_pipeline_engine/dag/dag.py +++ b/ml_pipeline_engine/dag/dag.py @@ -9,8 +9,8 @@ from ml_pipeline_engine.parallelism import threads_pool_registry from ml_pipeline_engine.types import DAGLike from ml_pipeline_engine.types import DAGRunManagerLike +from ml_pipeline_engine.types import NodeBase from ml_pipeline_engine.types import NodeId -from ml_pipeline_engine.types import NodeLike from ml_pipeline_engine.types import NodeResultT from ml_pipeline_engine.types import PipelineContextLike from ml_pipeline_engine.types import RetryPolicyLike @@ -23,7 +23,7 @@ class DAG(DAGLike): output_node: NodeId is_process_pool_needed: bool is_thread_pool_needed: bool - node_map: t.Dict[NodeId, NodeLike] + node_map: t.Dict[NodeId, NodeBase] retry_policy: t.Type[RetryPolicyLike] = DagRetryPolicy run_manager: t.Type[DAGRunManagerLike] = DAGRunConcurrentManager diff --git a/ml_pipeline_engine/dag/retrying.py b/ml_pipeline_engine/dag/retrying.py index 196f167..e43382f 100644 --- a/ml_pipeline_engine/dag/retrying.py +++ b/ml_pipeline_engine/dag/retrying.py @@ -2,13 +2,13 @@ from typing import Tuple from typing import Type -from ml_pipeline_engine.types import NodeLike +from ml_pipeline_engine.types import NodeBase from ml_pipeline_engine.types import RetryPolicyLike @dataclass(frozen=True) class
DagRetryPolicy(RetryPolicyLike): - node: NodeLike + node: NodeBase @property def delay(self) -> int: diff --git a/ml_pipeline_engine/dag_builders/annotation/builder.py b/ml_pipeline_engine/dag_builders/annotation/builder.py index 0dcde40..9ccd997 100644 --- a/ml_pipeline_engine/dag_builders/annotation/builder.py +++ b/ml_pipeline_engine/dag_builders/annotation/builder.py @@ -22,7 +22,6 @@ from ml_pipeline_engine.types import DAGLike from ml_pipeline_engine.types import NodeBase from ml_pipeline_engine.types import NodeId -from ml_pipeline_engine.types import NodeLike from ml_pipeline_engine.types import RecurrentProtocol __all__ = [ @@ -40,7 +39,7 @@ class AnnotationDAGBuilder: def __init__(self) -> None: self._dag = DiGraph() - self._node_map: t.Dict[NodeId, NodeLike] = dict() + self._node_map: t.Dict[NodeId, NodeBase] = dict() self._recurrent_sub_graphs: t.List[t.Tuple[NodeId, NodeId]] = [] self._synthetic_nodes: t.List[NodeId] = [] @@ -81,7 +80,7 @@ def _check_base_class(node: t.Any) -> None: f'The object has no valid base class suitable for the graph. node={node}', ) - def validate_node(self, node: NodeLike) -> None: + def validate_node(self, node: NodeBase) -> None: """ Validate the node against the various rules """ @@ -90,7 +89,7 @@ def validate_node(self, node: NodeLike) -> None: self._check_annotations(node) @staticmethod - def _get_input_marks_map(node: NodeLike) -> t.List[NodeInputSpec]: + def _get_input_marks_map(node: NodeBase) -> t.List[NodeInputSpec]: """ Collect the dependency marks for the node's input kwargs """ @@ -113,7 +112,7 @@ def _get_input_marks_map(node: NodeLike) -> t.List[NodeInputSpec]: return inputs - def _add_node_to_map(self, node: NodeLike) -> None: + def _add_node_to_map(self, node: NodeBase) -> None: """ Add the node to the "node name -> node class/function" mapping """ @@ -138,7 +137,7 @@ def _add_switch_node(self, node_id: NodeId, switch_decide_node_id: NodeId) -> No self._dag.add_node(node_id, **{NodeField.is_switch: True}) self._dag.add_edge(switch_decide_node_id, node_id, **{EdgeField.is_switch: True}) - def _traverse_breadth_first_to_dag(self, input_node: NodeLike, output_node: NodeLike): # noqa + def _traverse_breadth_first_to_dag(self, input_node: NodeBase, output_node: NodeBase): # noqa """ Traverse the dependencies of the node classes/functions and build the graph """ @@ -146,7 +145,7 @@ def _traverse_breadth_first_to_dag(self, input_node: NodeLike, output_node: Node visited = {output_node} stack = deque([output_node]) - def _set_visited(node: NodeLike) -> None: + def _set_visited(node: NodeBase) -> None: if node in visited: return @@ -316,7 +315,7 @@ def _is_executor_needed(self) -> t.Tuple[bool, bool]: return is_process_pool_needed, is_thread_pool_needed - def build(self, input_node: NodeLike, output_node: NodeLike = None) -> DAGLike: + def build(self, input_node: NodeBase, output_node: NodeBase = None) -> DAGLike: """ Build the graph by assembling dependencies from type annotations (input marks) """ @@ -344,8 +343,8 @@ def build(self, input_node: NodeLike, output_node: NodeLike = None) -> DAGLike: def build_dag( - input_node: NodeLike[t.Any], - output_node: NodeLike[NodeResultT], + input_node: NodeBase[t.Any], + output_node: NodeBase[NodeResultT], ) -> DAGLike[NodeResultT]: """ Build the graph by assembling dependencies from type annotations (input marks) @@ -364,7 +363,9 @@ def build_dag( ) -def build_dag_single(node: NodeLike[NodeResultT]) -> DAGLike[NodeResultT]: +def build_dag_single( + node: NodeBase[NodeResultT], +) -> DAGLike[NodeResultT]: """
Build a graph from a single node @@ -374,4 +375,7 @@ def build_dag_single(node: NodeLike[NodeResultT]) -> DAGLike[NodeResultT]: Returns: The graph """ - return AnnotationDAGBuilder().build(input_node=node, output_node=None) + return ( + AnnotationDAGBuilder() + .build(input_node=node, output_node=None) + ) diff --git a/ml_pipeline_engine/dag_builders/annotation/marks.py b/ml_pipeline_engine/dag_builders/annotation/marks.py index 1349d5a..5ff24e5 100644 --- a/ml_pipeline_engine/dag_builders/annotation/marks.py +++ b/ml_pipeline_engine/dag_builders/annotation/marks.py @@ -2,60 +2,60 @@ from dataclasses import dataclass from ml_pipeline_engine.types import CaseLabel -from ml_pipeline_engine.types import NodeLike +from ml_pipeline_engine.types import NodeBase NodeResultT = t.TypeVar('NodeResultT') @dataclass(frozen=True) class InputGenericMark: - node: NodeLike[t.Any] + node: NodeBase[t.Any] @dataclass(frozen=True) class InputMark: - node: NodeLike[t.Any] + node: NodeBase[t.Any] @dataclass(frozen=True) class InputOneOfMark: - nodes: t.List[NodeLike[t.Any]] + nodes: t.List[NodeBase[t.Any]] -def InputOneOf(nodes: t.List[NodeLike[NodeResultT]]) -> t.Type[NodeResultT]: # noqa: N802,RUF100 +def InputOneOf(nodes: t.List[NodeBase[NodeResultT]]) -> t.Type[NodeResultT]: # noqa: N802,RUF100 """ Accepts a list of nodes and returns the result of the first node that completes successfully """ return t.cast(t.Any, InputOneOfMark(nodes)) -def InputGeneric(node: NodeLike[NodeResultT]) -> t.Type[NodeResultT]: # noqa: N802,RUF100 +def InputGeneric(node: NodeBase[NodeResultT]) -> t.Type[NodeResultT]: # noqa: N802,RUF100 return t.cast(t.Any, InputGenericMark(node)) -def Input(node: NodeLike[NodeResultT]) -> t.Type[NodeResultT]: # noqa: N802,RUF100 +def Input(node: NodeBase[NodeResultT]) -> t.Type[NodeResultT]: # noqa: N802,RUF100 return t.cast(t.Any, InputMark(node)) @dataclass(frozen=True) class GenericInputMark: - node: NodeLike[t.Any] + node: NodeBase[t.Any] -def GenericInput(node: NodeLike[NodeResultT]) -> t.Type[NodeResultT]: # noqa: N802,RUF100 +def GenericInput(node: NodeBase[NodeResultT]) -> t.Type[NodeResultT]: # noqa: N802,RUF100 return t.cast(t.Any, GenericInputMark(node)) @dataclass(frozen=True) class SwitchCaseMark: - switch: NodeLike[t.Any] - cases: t.List[t.Tuple[str, NodeLike]] + switch: NodeBase[t.Any] + cases: t.List[t.Tuple[str, NodeBase]] name: str def SwitchCase( # noqa: N802,RUF100 - switch: NodeLike[t.Any], - cases: t.List[t.Tuple[CaseLabel, NodeLike[NodeResultT]]], + switch: NodeBase[t.Any], + cases: t.List[t.Tuple[CaseLabel, NodeBase[NodeResultT]]], name: t.Optional[str] = None, ) -> t.Type[NodeResultT]: return t.cast(t.Any, SwitchCaseMark(switch, cases, name)) @@ -63,14 +63,14 @@ def SwitchCase( # noqa: N802,RUF100 @dataclass(frozen=True) class RecurrentSubGraphMark: - start_node: NodeLike[NodeResultT] - dest_node: NodeLike[NodeResultT] + start_node: NodeBase[NodeResultT] + dest_node: NodeBase[NodeResultT] max_iterations: int def RecurrentSubGraph( # noqa: N802,RUF100 - start_node: t.Type[NodeLike[NodeResultT]], - dest_node: t.Type[NodeLike[NodeResultT]], + start_node: t.Type[NodeBase[NodeResultT]], + dest_node: t.Type[NodeBase[NodeResultT]], max_iterations: int, ) -> t.Type[NodeResultT]: """ diff --git a/ml_pipeline_engine/node/node.py b/ml_pipeline_engine/node/node.py index ded9f6b..fb787ad 100644 --- a/ml_pipeline_engine/node/node.py +++ b/ml_pipeline_engine/node/node.py @@ -14,7 +14,6 @@ from ml_pipeline_engine.parallelism import threads_pool_registry from ml_pipeline_engine.types import NodeBase from
ml_pipeline_engine.types import NodeId -from ml_pipeline_engine.types import NodeLike NodeResultT = t.TypeVar('NodeResultT') @@ -27,7 +26,7 @@ def generate_node_id(prefix: str, name: t.Optional[str] = None) -> str: return f'{prefix}__{name if name is not None else uuid.uuid4().hex[-8:]}' -def get_node_id(node: NodeLike) -> NodeId: +def get_node_id(node: NodeBase) -> NodeId: node_type = node.node_type if getattr(node, 'node_type', None) else 'node' if getattr(node, 'name', None): @@ -38,12 +37,12 @@ def get_node_id(node: NodeLike) -> NodeId: return '__'.join([node_type, node_name]) -def get_run_method(node: NodeLike) -> t.Optional[str]: +def get_run_method(node: NodeBase) -> t.Optional[str]: run_method = NodeBase.RUN_METHOD_ALIAS return run_method if callable(getattr(node, run_method, None)) else None -def get_callable_run_method(node: NodeLike) -> t.Callable: +def get_callable_run_method(node: NodeBase) -> t.Callable: run_method_name = get_run_method(node) if run_method_name is not None: @@ -53,14 +52,14 @@ def get_callable_run_method(node: NodeLike) -> t.Callable: return node -def run_node_default(node: NodeLike[NodeResultT], **kwargs: t.Any) -> t.Type[NodeResultT]: +def run_node_default(node: NodeBase[NodeResultT], **kwargs: t.Any) -> t.Type[NodeResultT]: """ Run the retrieval of the node's default value """ return get_instance(node).get_default(**kwargs) -async def run_node(node: NodeLike[NodeResultT], *args: t.Any, **kwargs: t.Any) -> t.Type[NodeResultT]: +async def run_node(node: NodeBase[NodeResultT], *args: t.Any, **kwargs: t.Any) -> t.Type[NodeResultT]: """ Function for running a node. Execution takes into account the tag that declares how nodes should be run. @@ -98,13 +97,13 @@ async def run_node(node: NodeLike[NodeResultT], *args: t.Any, **kwargs: t.Any) - def build_node( - node: NodeLike, + node: NodeBase, node_name: t.Optional[str] = None, class_name: t.Optional[str] = None, atts: t.Optional[t.Dict[str, t.Any]] = None, dependencies_default: t.Optional[t.Dict[str, t.Any]] = None, **target_dependencies: t.Any, -) -> t.Type[NodeLike]: +) -> t.Type[NodeBase]: """ The function creates a new graph node based on generic nodes.
A NON-generic node differs in that its target method starts to depend on concrete nodes diff --git a/ml_pipeline_engine/types.py b/ml_pipeline_engine/types.py index 10e2219..c180702 100644 --- a/ml_pipeline_engine/types.py +++ b/ml_pipeline_engine/types.py @@ -5,8 +5,6 @@ import networkx as nx -ProcessorResultT = t.TypeVar('ProcessorResultT') - NodeResultT = t.TypeVar('NodeResultT') AdditionalDataT = t.TypeVar('AdditionalDataT', bound=t.Any) @@ -72,24 +70,17 @@ class NodeProtocol(t.Protocol): verbose_name: t.ClassVar[str] = None -class NodeBase(NodeProtocol, RetryProtocol, TagProtocol): - pass - - -class ProcessorLike(RetryProtocol, TagProtocol, t.Protocol[ProcessorResultT]): +class NodeBase(NodeProtocol, RetryProtocol, TagProtocol, t.Protocol[NodeResultT]): """ - General-purpose node + Basic node interface """ process: t.Union[ - t.Callable[..., ProcessorResultT], - t.Callable[..., t.Awaitable[ProcessorResultT]], + t.Callable[..., NodeResultT], + t.Callable[..., t.Awaitable[NodeResultT]], ] -NodeLike = t.Type[ProcessorLike[NodeResultT]] - - @dataclass(frozen=True) class PipelineResult(t.Generic[NodeResultT]): """ @@ -123,7 +114,7 @@ class PipelineChartLike(t.Protocol[NodeResultT]): """ model_name: ModelName - entrypoint: t.Optional[t.Union[NodeLike[NodeResultT], 'DAGLike[NodeResultT]']] + entrypoint: t.Optional[t.Union[NodeBase[NodeResultT], 'DAGLike[NodeResultT]']] event_managers: t.List[t.Type['EventManagerLike']] artifact_store: t.Optional[t.Type['ArtifactStoreLike']] @@ -226,7 +217,7 @@ def exists(self, node_id: NodeId) -> bool: class RetryPolicyLike(t.Protocol): - node: NodeLike + node: NodeBase @property @abc.abstractmethod @@ -373,7 +364,7 @@ class DAGLike(t.Protocol[NodeResultT]): graph: nx.DiGraph input_node: NodeId output_node: NodeId - node_map: t.Dict[NodeId, NodeLike] + node_map: t.Dict[NodeId, NodeBase] run_manager: DAGRunManagerLike retry_policy: RetryPolicyLike is_process_pool_needed: bool diff --git a/ml_pipeline_engine/visualization/dag.py b/ml_pipeline_engine/visualization/dag.py index d5924d9..3a4b494 100644 --- a/ml_pipeline_engine/visualization/dag.py +++ b/ml_pipeline_engine/visualization/dag.py @@ -8,8 +8,8 @@ from ml_pipeline_engine.node import get_callable_run_method from ml_pipeline_engine.node.enums import NodeType from ml_pipeline_engine.types import DAGLike +from ml_pipeline_engine.types import NodeBase from ml_pipeline_engine.types import NodeId -from ml_pipeline_engine.types import NodeLike from ml_pipeline_engine.visualization import schema from ml_pipeline_engine.visualization.utils import copy_resources @@ -23,14 +23,14 @@ class GraphConfigImpl: def __init__(self, dag: DAGLike) -> None: self._dag = dag - def _get_node(self, node_id: NodeId) -> t.Type[NodeLike]: + def _get_node(self, node_id: NodeId) -> t.Type[NodeBase]: """ Get a node object. Sometimes it can be None if we work with an artificial node """ return self._dag.node_map.get(node_id) @staticmethod - def _get_node_relative_path(node: t.Type[NodeLike]) -> str: + def _get_node_relative_path(node: t.Type[NodeBase]) -> str: """ Generate relative path for a node """ diff --git a/tests/dag/oneof/test_fail_node.py b/tests/dag/oneof/test_fail_node.py index c7b5c85..6230da4 100644 --- a/tests/dag/oneof/test_fail_node.py +++ b/tests/dag/oneof/test_fail_node.py @@ -10,7 +10,7 @@ from ml_pipeline_engine.node import build_node from ml_pipeline_engine.node.base_nodes import ProcessorBase from ml_pipeline_engine.types import DAGLike -from ml_pipeline_engine.types import NodeLike +from ml_pipeline_engine.types import NodeBase class SomeInput(ProcessorBase): @@ -24,7 +24,7 @@ class FlDataSourceGeneric(ProcessorBase): name = 'source' @guard_datasource_error() - def process(self, _: InputGeneric(NodeLike)) -> t.Type[Exception]: + def process(self, _: InputGeneric(NodeBase)) -> t.Type[Exception]: raise Exception @@ -37,7 +37,7 @@ def process(self, _: InputGeneric(NodeLike)) -> t.Type[Exception]: class SomeFeatureGeneric(ProcessorBase): name = 'feature' - def process(self, fl_credit_history: InputGeneric(NodeLike)) -> int: + def process(self, fl_credit_history: InputGeneric(NodeBase)) -> int: # Must not run, because fl_credit_history will be filled with an error. # As a consequence, this node has to be skipped return len(fl_credit_history) diff --git a/tests/dag/test_reusable_nodes.py b/tests/dag/test_reusable_nodes.py index 528ffff..8e83c11 100644 --- a/tests/dag/test_reusable_nodes.py +++ b/tests/dag/test_reusable_nodes.py @@ -6,7 +6,7 @@ from ml_pipeline_engine.node import build_node from ml_pipeline_engine.node.base_nodes import ProcessorBase from ml_pipeline_engine.types import DAGLike -from ml_pipeline_engine.types import NodeLike +from ml_pipeline_engine.types import NodeBase class SomeInput(ProcessorBase): @@ -37,7 +37,7 @@ def process(self, ds_value: Input(SomeDataSource), inp: Input(SomeInput)) -> int class GenericVectorizer(ProcessorBase): name = 'some_vectorizer' - async def process(self, feature_value: InputGeneric(NodeLike), const: int) -> int: + async def process(self, feature_value: InputGeneric(NodeBase), const: int) -> int: return feature_value + 20 + const
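[Migration note for PATCH 06] With the NodeLike alias removed from ml_pipeline_engine.types, NodeBase itself becomes the generic node type, so annotations written against NodeLike should be rewritten against NodeBase, exactly as the test diffs above do. A minimal sketch of a generic node under the new typing (PassThrough is a hypothetical example, not taken from the patch):

import typing as t

from ml_pipeline_engine.dag_builders.annotation.marks import InputGeneric
from ml_pipeline_engine.node.base_nodes import ProcessorBase
from ml_pipeline_engine.types import NodeBase  # was: NodeLike


class PassThrough(ProcessorBase):
    name = 'pass_through'

    # NodeBase is now parametrized by the node result type, so it can stand in
    # wherever NodeLike[NodeResultT] was accepted before this patch.
    def process(self, value: InputGeneric(NodeBase)) -> t.Any:
        return value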
From 9b2d7951a5855248f054ef3a414f77f622316036 Mon Sep 17 00:00:00 2001 From: Evgeniia Lukmanova Date: Sun, 12 May 2024 21:37:56 +0500 Subject: [PATCH 07/12] chore: Update imports for ProcessorBase --- ml_pipeline_engine/node/__init__.py | 1 + ml_pipeline_engine/node/base_nodes.py | 3 ++- ml_pipeline_engine/visualization/sample.py | 2 +- tests/dag/oneof/test_fail_node.py | 2 +- tests/dag/oneof/test_input_one_of_fails_dag.py | 2 +- tests/dag/oneof/test_input_one_of_first_success_dag.py | 2 +- tests/dag/oneof/test_input_one_of_last_success_dag.py | 2 +- tests/dag/oneof/test_input_one_of_multiple_dag.py | 2 +- tests/dag/recurrent_subgraph/test_inline_subgraph.py | 4 ++-- tests/dag/recurrent_subgraph/test_nested_subgraph.py | 4 ++-- tests/dag/recurrent_subgraph/test_simple_subgraph.py | 4 ++-- tests/dag/recurrent_subgraph/test_subgraph_default.py | 4 ++-- tests/dag/recurrent_subgraph/test_subgraph_default_retry.py | 4 ++-- .../recurrent_subgraph/test_subgraph_with_inside_switch.py | 2 +- .../recurrent_subgraph/test_subgraph_with_several_calls.py | 4 ++-- tests/dag/retry/test_dag_retry.py | 2 +- tests/dag/retry/test_dag_retry__base_error.py | 2 +- tests/dag/retry/test_dag_retry__error.py | 2 +-
tests/dag/switch_case/test_concurrent_switch.py | 2 +- tests/dag/switch_case/test_dag_multiple_switch_case.py | 2 +- tests/dag/switch_case/test_dag_nested_switch_case.py | 2 +- tests/dag/switch_case/test_dag_switch_case.py | 2 +- tests/dag/test_dag_chain.py | 2 +- tests/dag/test_dag_rhombus.py | 2 +- tests/dag/test_dag_single.py | 2 +- tests/dag/test_demo_ml_model_dag.py | 2 +- tests/dag/test_reusable_nodes.py | 2 +- tests/tags/test_tags.py | 2 +- tests/test_chart/test_dag.py | 2 +- tests/test_events/test_dag.py | 2 +- tests/test_utils.py | 2 +- tests/visualization/test_visualization.py | 2 +- 32 files changed, 39 insertions(+), 37 deletions(-) diff --git a/ml_pipeline_engine/node/__init__.py b/ml_pipeline_engine/node/__init__.py index aa3c5ca..29919c2 100644 --- a/ml_pipeline_engine/node/__init__.py +++ b/ml_pipeline_engine/node/__init__.py @@ -1,3 +1,4 @@ +from ml_pipeline_engine.node.base_nodes import * # noqa from ml_pipeline_engine.node.enums import * # noqa from ml_pipeline_engine.node.errors import * # noqa from ml_pipeline_engine.node.node import * # noqa diff --git a/ml_pipeline_engine/node/base_nodes.py b/ml_pipeline_engine/node/base_nodes.py index 7efa2c5..6ee74ed 100644 --- a/ml_pipeline_engine/node/base_nodes.py +++ b/ml_pipeline_engine/node/base_nodes.py @@ -3,6 +3,7 @@ from ml_pipeline_engine.node.enums import NodeType from ml_pipeline_engine.types import AdditionalDataT from ml_pipeline_engine.types import NodeBase +from ml_pipeline_engine.types import NodeResultT from ml_pipeline_engine.types import Recurrent from ml_pipeline_engine.types import RecurrentProtocol @@ -14,7 +15,7 @@ class ProcessorBase(NodeBase): node_type = NodeType.processor.value - def process(self, *args: t.Any, **kwargs: t.Any) -> t.Any: + def process(self, *args: t.Any, **kwargs: t.Any) -> NodeResultT: raise NotImplementedError('Method process() is not implemented') diff --git a/ml_pipeline_engine/visualization/sample.py b/ml_pipeline_engine/visualization/sample.py index 85413e9..22e8a70 100644 --- a/ml_pipeline_engine/visualization/sample.py +++ b/ml_pipeline_engine/visualization/sample.py @@ -1,7 +1,7 @@ from ml_pipeline_engine.dag_builders.annotation import build_dag from ml_pipeline_engine.dag_builders.annotation.marks import Input from ml_pipeline_engine.dag_builders.annotation.marks import SwitchCase -from ml_pipeline_engine.node.base_nodes import ProcessorBase +from ml_pipeline_engine.node import ProcessorBase class Ident(ProcessorBase): diff --git a/tests/dag/oneof/test_fail_node.py b/tests/dag/oneof/test_fail_node.py index 6230da4..6eaf88d 100644 --- a/tests/dag/oneof/test_fail_node.py +++ b/tests/dag/oneof/test_fail_node.py @@ -7,8 +7,8 @@ from ml_pipeline_engine.dag_builders.annotation.marks import InputGeneric from ml_pipeline_engine.dag_builders.annotation.marks import InputOneOf from ml_pipeline_engine.decorators import guard_datasource_error +from ml_pipeline_engine.node import ProcessorBase from ml_pipeline_engine.node import build_node -from ml_pipeline_engine.node.base_nodes import ProcessorBase from ml_pipeline_engine.types import DAGLike from ml_pipeline_engine.types import NodeBase diff --git a/tests/dag/oneof/test_input_one_of_fails_dag.py b/tests/dag/oneof/test_input_one_of_fails_dag.py index 6de2aa0..6112ff2 100644 --- a/tests/dag/oneof/test_input_one_of_fails_dag.py +++ b/tests/dag/oneof/test_input_one_of_fails_dag.py @@ -6,7 +6,7 @@ from ml_pipeline_engine.dag_builders.annotation.marks import Input from ml_pipeline_engine.dag_builders.annotation.marks import InputOneOf from 
ml_pipeline_engine.decorators import guard_datasource_error -from ml_pipeline_engine.node.base_nodes import ProcessorBase +from ml_pipeline_engine.node import ProcessorBase from ml_pipeline_engine.types import DAGLike diff --git a/tests/dag/oneof/test_input_one_of_first_success_dag.py b/tests/dag/oneof/test_input_one_of_first_success_dag.py index baeb2fc..748c43e 100644 --- a/tests/dag/oneof/test_input_one_of_first_success_dag.py +++ b/tests/dag/oneof/test_input_one_of_first_success_dag.py @@ -4,7 +4,7 @@ from ml_pipeline_engine.dag_builders.annotation.marks import Input from ml_pipeline_engine.dag_builders.annotation.marks import InputOneOf from ml_pipeline_engine.decorators import guard_datasource_error -from ml_pipeline_engine.node.base_nodes import ProcessorBase +from ml_pipeline_engine.node import ProcessorBase from ml_pipeline_engine.types import DAGLike diff --git a/tests/dag/oneof/test_input_one_of_last_success_dag.py b/tests/dag/oneof/test_input_one_of_last_success_dag.py index e60f934..f4bac45 100644 --- a/tests/dag/oneof/test_input_one_of_last_success_dag.py +++ b/tests/dag/oneof/test_input_one_of_last_success_dag.py @@ -4,7 +4,7 @@ from ml_pipeline_engine.dag_builders.annotation.marks import Input from ml_pipeline_engine.dag_builders.annotation.marks import InputOneOf from ml_pipeline_engine.decorators import guard_datasource_error -from ml_pipeline_engine.node.base_nodes import ProcessorBase +from ml_pipeline_engine.node import ProcessorBase from ml_pipeline_engine.types import DAGLike diff --git a/tests/dag/oneof/test_input_one_of_multiple_dag.py b/tests/dag/oneof/test_input_one_of_multiple_dag.py index da6ad8d..d24954d 100644 --- a/tests/dag/oneof/test_input_one_of_multiple_dag.py +++ b/tests/dag/oneof/test_input_one_of_multiple_dag.py @@ -4,7 +4,7 @@ from ml_pipeline_engine.dag_builders.annotation.marks import Input from ml_pipeline_engine.dag_builders.annotation.marks import InputOneOf from ml_pipeline_engine.decorators import guard_datasource_error -from ml_pipeline_engine.node.base_nodes import ProcessorBase +from ml_pipeline_engine.node import ProcessorBase from ml_pipeline_engine.types import DAGLike diff --git a/tests/dag/recurrent_subgraph/test_inline_subgraph.py b/tests/dag/recurrent_subgraph/test_inline_subgraph.py index 8d5b86b..a8dd51e 100644 --- a/tests/dag/recurrent_subgraph/test_inline_subgraph.py +++ b/tests/dag/recurrent_subgraph/test_inline_subgraph.py @@ -3,8 +3,8 @@ from ml_pipeline_engine.context.dag import DAGPipelineContext from ml_pipeline_engine.dag_builders.annotation.marks import Input from ml_pipeline_engine.dag_builders.annotation.marks import RecurrentSubGraph -from ml_pipeline_engine.node.base_nodes import ProcessorBase -from ml_pipeline_engine.node.base_nodes import RecurrentProcessor +from ml_pipeline_engine.node import ProcessorBase +from ml_pipeline_engine.node import RecurrentProcessor from ml_pipeline_engine.types import AdditionalDataT from ml_pipeline_engine.types import DAGLike from ml_pipeline_engine.types import Recurrent diff --git a/tests/dag/recurrent_subgraph/test_nested_subgraph.py b/tests/dag/recurrent_subgraph/test_nested_subgraph.py index d5ae258..ebb1bc1 100644 --- a/tests/dag/recurrent_subgraph/test_nested_subgraph.py +++ b/tests/dag/recurrent_subgraph/test_nested_subgraph.py @@ -6,8 +6,8 @@ from ml_pipeline_engine.context.dag import DAGPipelineContext from ml_pipeline_engine.dag_builders.annotation.marks import Input from ml_pipeline_engine.dag_builders.annotation.marks import RecurrentSubGraph -from 
ml_pipeline_engine.node.base_nodes import ProcessorBase -from ml_pipeline_engine.node.base_nodes import RecurrentProcessor +from ml_pipeline_engine.node import ProcessorBase +from ml_pipeline_engine.node import RecurrentProcessor from ml_pipeline_engine.types import AdditionalDataT from ml_pipeline_engine.types import DAGLike from ml_pipeline_engine.types import Recurrent diff --git a/tests/dag/recurrent_subgraph/test_simple_subgraph.py b/tests/dag/recurrent_subgraph/test_simple_subgraph.py index d761776..fd737f9 100644 --- a/tests/dag/recurrent_subgraph/test_simple_subgraph.py +++ b/tests/dag/recurrent_subgraph/test_simple_subgraph.py @@ -6,8 +6,8 @@ from ml_pipeline_engine.context.dag import DAGPipelineContext from ml_pipeline_engine.dag_builders.annotation.marks import Input from ml_pipeline_engine.dag_builders.annotation.marks import RecurrentSubGraph -from ml_pipeline_engine.node.base_nodes import ProcessorBase -from ml_pipeline_engine.node.base_nodes import RecurrentProcessor +from ml_pipeline_engine.node import ProcessorBase +from ml_pipeline_engine.node import RecurrentProcessor from ml_pipeline_engine.types import AdditionalDataT from ml_pipeline_engine.types import DAGLike from ml_pipeline_engine.types import Recurrent diff --git a/tests/dag/recurrent_subgraph/test_subgraph_default.py b/tests/dag/recurrent_subgraph/test_subgraph_default.py index 10211a8..10c44f0 100644 --- a/tests/dag/recurrent_subgraph/test_subgraph_default.py +++ b/tests/dag/recurrent_subgraph/test_subgraph_default.py @@ -6,8 +6,8 @@ from ml_pipeline_engine.context.dag import DAGPipelineContext from ml_pipeline_engine.dag_builders.annotation.marks import Input from ml_pipeline_engine.dag_builders.annotation.marks import RecurrentSubGraph -from ml_pipeline_engine.node.base_nodes import ProcessorBase -from ml_pipeline_engine.node.base_nodes import RecurrentProcessor +from ml_pipeline_engine.node import ProcessorBase +from ml_pipeline_engine.node import RecurrentProcessor from ml_pipeline_engine.types import AdditionalDataT from ml_pipeline_engine.types import DAGLike from ml_pipeline_engine.types import Recurrent diff --git a/tests/dag/recurrent_subgraph/test_subgraph_default_retry.py b/tests/dag/recurrent_subgraph/test_subgraph_default_retry.py index cbae78e..ecc3ff6 100644 --- a/tests/dag/recurrent_subgraph/test_subgraph_default_retry.py +++ b/tests/dag/recurrent_subgraph/test_subgraph_default_retry.py @@ -6,8 +6,8 @@ from ml_pipeline_engine.context.dag import DAGPipelineContext from ml_pipeline_engine.dag_builders.annotation.marks import Input from ml_pipeline_engine.dag_builders.annotation.marks import RecurrentSubGraph -from ml_pipeline_engine.node.base_nodes import ProcessorBase -from ml_pipeline_engine.node.base_nodes import RecurrentProcessor +from ml_pipeline_engine.node import ProcessorBase +from ml_pipeline_engine.node import RecurrentProcessor from ml_pipeline_engine.types import AdditionalDataT from ml_pipeline_engine.types import DAGLike from ml_pipeline_engine.types import Recurrent diff --git a/tests/dag/recurrent_subgraph/test_subgraph_with_inside_switch.py b/tests/dag/recurrent_subgraph/test_subgraph_with_inside_switch.py index fc4eb31..898b489 100644 --- a/tests/dag/recurrent_subgraph/test_subgraph_with_inside_switch.py +++ b/tests/dag/recurrent_subgraph/test_subgraph_with_inside_switch.py @@ -9,7 +9,7 @@ from ml_pipeline_engine.dag_builders.annotation.marks import Input from ml_pipeline_engine.dag_builders.annotation.marks import RecurrentSubGraph from 
ml_pipeline_engine.dag_builders.annotation.marks import SwitchCase -from ml_pipeline_engine.node.base_nodes import RecurrentProcessor +from ml_pipeline_engine.node import RecurrentProcessor from ml_pipeline_engine.types import AdditionalDataT from ml_pipeline_engine.types import DAGLike from ml_pipeline_engine.types import Recurrent diff --git a/tests/dag/recurrent_subgraph/test_subgraph_with_several_calls.py b/tests/dag/recurrent_subgraph/test_subgraph_with_several_calls.py index 9fdf8cf..ceb5e5c 100644 --- a/tests/dag/recurrent_subgraph/test_subgraph_with_several_calls.py +++ b/tests/dag/recurrent_subgraph/test_subgraph_with_several_calls.py @@ -5,8 +5,8 @@ from ml_pipeline_engine.context.dag import DAGPipelineContext from ml_pipeline_engine.dag_builders.annotation.marks import Input from ml_pipeline_engine.dag_builders.annotation.marks import RecurrentSubGraph -from ml_pipeline_engine.node.base_nodes import ProcessorBase -from ml_pipeline_engine.node.base_nodes import RecurrentProcessor +from ml_pipeline_engine.node import ProcessorBase +from ml_pipeline_engine.node import RecurrentProcessor from ml_pipeline_engine.types import AdditionalDataT from ml_pipeline_engine.types import DAGLike from ml_pipeline_engine.types import Recurrent diff --git a/tests/dag/retry/test_dag_retry.py b/tests/dag/retry/test_dag_retry.py index 035d71c..e2fd72d 100644 --- a/tests/dag/retry/test_dag_retry.py +++ b/tests/dag/retry/test_dag_retry.py @@ -4,7 +4,7 @@ from ml_pipeline_engine.context.dag import DAGPipelineContext from ml_pipeline_engine.dag_builders.annotation.marks import Input -from ml_pipeline_engine.node.base_nodes import ProcessorBase +from ml_pipeline_engine.node import ProcessorBase from ml_pipeline_engine.types import DAGLike diff --git a/tests/dag/retry/test_dag_retry__base_error.py b/tests/dag/retry/test_dag_retry__base_error.py index fcc3b5a..a8eba9a 100644 --- a/tests/dag/retry/test_dag_retry__base_error.py +++ b/tests/dag/retry/test_dag_retry__base_error.py @@ -5,7 +5,7 @@ from ml_pipeline_engine.context.dag import DAGPipelineContext from ml_pipeline_engine.dag_builders.annotation.marks import Input -from ml_pipeline_engine.node.base_nodes import ProcessorBase +from ml_pipeline_engine.node import ProcessorBase from ml_pipeline_engine.types import DAGLike diff --git a/tests/dag/retry/test_dag_retry__error.py b/tests/dag/retry/test_dag_retry__error.py index a8a8f66..8177baf 100644 --- a/tests/dag/retry/test_dag_retry__error.py +++ b/tests/dag/retry/test_dag_retry__error.py @@ -5,7 +5,7 @@ from ml_pipeline_engine.context.dag import DAGPipelineContext from ml_pipeline_engine.dag_builders.annotation.marks import Input -from ml_pipeline_engine.node.base_nodes import ProcessorBase +from ml_pipeline_engine.node import ProcessorBase from ml_pipeline_engine.types import DAGLike diff --git a/tests/dag/switch_case/test_concurrent_switch.py b/tests/dag/switch_case/test_concurrent_switch.py index 49b779c..0507515 100644 --- a/tests/dag/switch_case/test_concurrent_switch.py +++ b/tests/dag/switch_case/test_concurrent_switch.py @@ -5,7 +5,7 @@ from ml_pipeline_engine.context.dag import DAGPipelineContext from ml_pipeline_engine.dag_builders.annotation.marks import Input from ml_pipeline_engine.dag_builders.annotation.marks import SwitchCase -from ml_pipeline_engine.node.base_nodes import ProcessorBase +from ml_pipeline_engine.node import ProcessorBase from ml_pipeline_engine.types import DAGLike diff --git a/tests/dag/switch_case/test_dag_multiple_switch_case.py 
b/tests/dag/switch_case/test_dag_multiple_switch_case.py index 3d9549e..80d2a9c 100644 --- a/tests/dag/switch_case/test_dag_multiple_switch_case.py +++ b/tests/dag/switch_case/test_dag_multiple_switch_case.py @@ -5,7 +5,7 @@ from ml_pipeline_engine.context.dag import DAGPipelineContext from ml_pipeline_engine.dag_builders.annotation.marks import Input from ml_pipeline_engine.dag_builders.annotation.marks import SwitchCase -from ml_pipeline_engine.node.base_nodes import ProcessorBase +from ml_pipeline_engine.node import ProcessorBase from ml_pipeline_engine.types import DAGLike diff --git a/tests/dag/switch_case/test_dag_nested_switch_case.py b/tests/dag/switch_case/test_dag_nested_switch_case.py index 61dd071..f8917c1 100644 --- a/tests/dag/switch_case/test_dag_nested_switch_case.py +++ b/tests/dag/switch_case/test_dag_nested_switch_case.py @@ -5,7 +5,7 @@ from ml_pipeline_engine.context.dag import DAGPipelineContext from ml_pipeline_engine.dag_builders.annotation.marks import Input from ml_pipeline_engine.dag_builders.annotation.marks import SwitchCase -from ml_pipeline_engine.node.base_nodes import ProcessorBase +from ml_pipeline_engine.node import ProcessorBase from ml_pipeline_engine.types import DAGLike diff --git a/tests/dag/switch_case/test_dag_switch_case.py b/tests/dag/switch_case/test_dag_switch_case.py index 9018a28..57e6658 100644 --- a/tests/dag/switch_case/test_dag_switch_case.py +++ b/tests/dag/switch_case/test_dag_switch_case.py @@ -5,7 +5,7 @@ from ml_pipeline_engine.context.dag import DAGPipelineContext from ml_pipeline_engine.dag_builders.annotation.marks import Input from ml_pipeline_engine.dag_builders.annotation.marks import SwitchCase -from ml_pipeline_engine.node.base_nodes import ProcessorBase +from ml_pipeline_engine.node import ProcessorBase from ml_pipeline_engine.types import DAGLike diff --git a/tests/dag/test_dag_chain.py b/tests/dag/test_dag_chain.py index 8983be1..1685c84 100644 --- a/tests/dag/test_dag_chain.py +++ b/tests/dag/test_dag_chain.py @@ -2,7 +2,7 @@ from ml_pipeline_engine.context.dag import DAGPipelineContext from ml_pipeline_engine.dag_builders.annotation.marks import Input -from ml_pipeline_engine.node.base_nodes import ProcessorBase +from ml_pipeline_engine.node import ProcessorBase from ml_pipeline_engine.types import DAGLike diff --git a/tests/dag/test_dag_rhombus.py b/tests/dag/test_dag_rhombus.py index 68b434d..c856a65 100644 --- a/tests/dag/test_dag_rhombus.py +++ b/tests/dag/test_dag_rhombus.py @@ -2,7 +2,7 @@ from ml_pipeline_engine.context.dag import DAGPipelineContext from ml_pipeline_engine.dag_builders.annotation.marks import Input -from ml_pipeline_engine.node.base_nodes import ProcessorBase +from ml_pipeline_engine.node import ProcessorBase from ml_pipeline_engine.types import DAGLike diff --git a/tests/dag/test_dag_single.py b/tests/dag/test_dag_single.py index 919ace0..79374d2 100644 --- a/tests/dag/test_dag_single.py +++ b/tests/dag/test_dag_single.py @@ -2,7 +2,7 @@ from ml_pipeline_engine.context.dag import DAGPipelineContext from ml_pipeline_engine.dag_builders.annotation import build_dag_single -from ml_pipeline_engine.node.base_nodes import ProcessorBase +from ml_pipeline_engine.node import ProcessorBase class DoubleNumber(ProcessorBase): diff --git a/tests/dag/test_demo_ml_model_dag.py b/tests/dag/test_demo_ml_model_dag.py index 5786976..b3f2a0f 100644 --- a/tests/dag/test_demo_ml_model_dag.py +++ b/tests/dag/test_demo_ml_model_dag.py @@ -2,7 +2,7 @@ from ml_pipeline_engine.context.dag import DAGPipelineContext from 
ml_pipeline_engine.dag_builders.annotation.marks import Input -from ml_pipeline_engine.node.base_nodes import ProcessorBase +from ml_pipeline_engine.node import ProcessorBase from ml_pipeline_engine.types import DAGLike diff --git a/tests/dag/test_reusable_nodes.py b/tests/dag/test_reusable_nodes.py index 8e83c11..7f09187 100644 --- a/tests/dag/test_reusable_nodes.py +++ b/tests/dag/test_reusable_nodes.py @@ -3,8 +3,8 @@ from ml_pipeline_engine.context.dag import DAGPipelineContext from ml_pipeline_engine.dag_builders.annotation.marks import Input from ml_pipeline_engine.dag_builders.annotation.marks import InputGeneric +from ml_pipeline_engine.node import ProcessorBase from ml_pipeline_engine.node import build_node -from ml_pipeline_engine.node.base_nodes import ProcessorBase from ml_pipeline_engine.types import DAGLike from ml_pipeline_engine.types import NodeBase diff --git a/tests/tags/test_tags.py b/tests/tags/test_tags.py index aa28b37..f127d5e 100644 --- a/tests/tags/test_tags.py +++ b/tests/tags/test_tags.py @@ -4,7 +4,7 @@ from ml_pipeline_engine.context.dag import DAGPipelineContext from ml_pipeline_engine.dag_builders.annotation.marks import Input -from ml_pipeline_engine.node.base_nodes import ProcessorBase +from ml_pipeline_engine.node import ProcessorBase from ml_pipeline_engine.node.enums import NodeTag from ml_pipeline_engine.parallelism import processes from ml_pipeline_engine.parallelism import threads diff --git a/tests/test_chart/test_dag.py b/tests/test_chart/test_dag.py index 23c1e1f..70fc576 100644 --- a/tests/test_chart/test_dag.py +++ b/tests/test_chart/test_dag.py @@ -2,7 +2,7 @@ from ml_pipeline_engine.chart import PipelineChart from ml_pipeline_engine.dag_builders.annotation import build_dag_single -from ml_pipeline_engine.node.base_nodes import ProcessorBase +from ml_pipeline_engine.node import ProcessorBase async def test_pipeline_chart_run_success(model_name_op: str) -> None: diff --git a/tests/test_events/test_dag.py b/tests/test_events/test_dag.py index 59c7db1..3d3e62e 100644 --- a/tests/test_events/test_dag.py +++ b/tests/test_events/test_dag.py @@ -8,7 +8,7 @@ from ml_pipeline_engine.context.dag import DAGPipelineContext from ml_pipeline_engine.dag_builders.annotation import build_dag from ml_pipeline_engine.dag_builders.annotation.marks import Input -from ml_pipeline_engine.node.base_nodes import ProcessorBase +from ml_pipeline_engine.node import ProcessorBase from ml_pipeline_engine.types import NodeId from ml_pipeline_engine.types import PipelineContextLike from ml_pipeline_engine.types import PipelineResult diff --git a/tests/test_utils.py b/tests/test_utils.py index 27377fa..97d6b6c 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,10 +1,10 @@ from uuid import UUID +from ml_pipeline_engine.node import ProcessorBase from ml_pipeline_engine.node import generate_pipeline_id from ml_pipeline_engine.node import get_node_id from ml_pipeline_engine.node import get_run_method from ml_pipeline_engine.node import run_node -from ml_pipeline_engine.node.base_nodes import ProcessorBase from ml_pipeline_engine.types import NodeBase diff --git a/tests/visualization/test_visualization.py b/tests/visualization/test_visualization.py index 6b1c8d4..fe83754 100644 --- a/tests/visualization/test_visualization.py +++ b/tests/visualization/test_visualization.py @@ -13,8 +13,8 @@ from ml_pipeline_engine.dag_builders.annotation.marks import GenericInput from ml_pipeline_engine.dag_builders.annotation.marks import Input from ml_pipeline_engine.dag_builders.annotation.marks import SwitchCase +from ml_pipeline_engine.node import ProcessorBase from ml_pipeline_engine.node import build_node -from ml_pipeline_engine.node.base_nodes import ProcessorBase class InvertNumber(ProcessorBase):
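[Migration note for PATCH 07] Since node/__init__.py now re-exports the base classes (via the added "from ml_pipeline_engine.node.base_nodes import *"), the shorter top-level import is the spelling this patch standardizes on throughout the test suite. A minimal sketch (InvertSign is a hypothetical example node):

from ml_pipeline_engine.node import ProcessorBase  # preferred after this patch
# from ml_pipeline_engine.node.base_nodes import ProcessorBase  # still importable, no longer used


class InvertSign(ProcessorBase):
    name = 'invert_sign'

    def process(self, num: float) -> float:
        return -num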
From 9895d5d46d4570856a0a9b4fff12168b90745555 Mon Sep 17 00:00:00 2001
From: Evgeniia Lukmanova
Date: Sun, 12 May 2024 21:45:40 +0500
Subject: [PATCH 08/12] BREAKING CHANGE: Remove NodeBase.title attribute

---
 ml_pipeline_engine/types.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/ml_pipeline_engine/types.py b/ml_pipeline_engine/types.py
index c180702..d073aa6 100644
--- a/ml_pipeline_engine/types.py
+++ b/ml_pipeline_engine/types.py
@@ -66,7 +66,6 @@ class NodeProtocol(t.Protocol):

     node_type: t.ClassVar[str] = None
     name: t.ClassVar[str] = None
-    title: t.ClassVar[str] = None  # TODO: Remove it in the future
     verbose_name: t.ClassVar[str] = None
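Notes:

    Any node that still declares the class attribute title breaks after this
    patch. The removed line already carried a removal TODO, and verbose_name
    stays on the protocol, so it looks like the intended replacement; a
    sketch of the migration, assuming verbose_name is indeed the successor
    (MyFeature is an illustrative name):

        from ml_pipeline_engine.node import ProcessorBase


        class MyFeature(ProcessorBase):
            name = 'my_feature'
            verbose_name = 'My feature'  # was: title = 'My feature'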
From a0c5255efd4e22b9e2d97dc073176f0a0e0d0cfa Mon Sep 17 00:00:00 2001
From: Evgeniia Lukmanova
Date: Mon, 10 Jun 2024 23:54:45 +0500
Subject: [PATCH 09/12] fix: Remove NodeProtocol and EventManagerBase

---
 ml_pipeline_engine/chart.py       |  3 ++-
 ml_pipeline_engine/context/dag.py |  3 ++-
 ml_pipeline_engine/events.py      | 15 ---------------
 ml_pipeline_engine/types.py       | 11 ++---------
 4 files changed, 6 insertions(+), 26 deletions(-)

diff --git a/ml_pipeline_engine/chart.py b/ml_pipeline_engine/chart.py
index 4207014..18b56c6 100644
--- a/ml_pipeline_engine/chart.py
+++ b/ml_pipeline_engine/chart.py
@@ -9,6 +9,7 @@
 from ml_pipeline_engine.types import EventManagerLike
 from ml_pipeline_engine.types import ModelName
 from ml_pipeline_engine.types import NodeBase
+from ml_pipeline_engine.types import PipelineChartLike
 from ml_pipeline_engine.types import PipelineId
 from ml_pipeline_engine.types import PipelineResult
@@ -30,7 +31,7 @@ class PipelineChartBase:


 @dataclass(frozen=True, repr=False)
-class PipelineChart(PipelineChartBase):
+class PipelineChart(PipelineChartBase, PipelineChartLike):
     """
     The main implementation of an ML model pipeline definition
     """
diff --git a/ml_pipeline_engine/context/dag.py b/ml_pipeline_engine/context/dag.py
index 70131b3..bc7969b 100644
--- a/ml_pipeline_engine/context/dag.py
+++ b/ml_pipeline_engine/context/dag.py
@@ -9,10 +9,11 @@
 from ml_pipeline_engine.types import ModelName
 from ml_pipeline_engine.types import NodeId
 from ml_pipeline_engine.types import PipelineChartLike
+from ml_pipeline_engine.types import PipelineContextLike
 from ml_pipeline_engine.types import PipelineId


-class DAGPipelineContext(EventSourceMixin):
+class DAGPipelineContext(EventSourceMixin, PipelineContextLike):
     """
     Execution context of an ML model pipeline
     """
diff --git a/ml_pipeline_engine/events.py b/ml_pipeline_engine/events.py
index 8d80f6a..28ec124 100644
--- a/ml_pipeline_engine/events.py
+++ b/ml_pipeline_engine/events.py
@@ -2,24 +2,9 @@

 from ml_pipeline_engine.types import EventManagerLike
 from ml_pipeline_engine.types import NodeId
-from ml_pipeline_engine.types import PipelineContextLike
 from ml_pipeline_engine.types import PipelineResult


-class EventManagerBase:
-    async def on_pipeline_start(self, ctx: PipelineContextLike) -> None:
-        ...
-
-    async def on_pipeline_complete(self, ctx: PipelineContextLike, result: PipelineResult) -> None:
-        ...
-
-    async def on_node_start(self, ctx: PipelineContextLike, node_id: NodeId) -> None:
-        ...
-
-    async def on_node_complete(self, ctx: PipelineContextLike, node_id: NodeId, error: t.Optional[Exception]) -> None:
-        ...
-
-
 class EventSourceMixin:
     _get_event_managers: t.Callable[..., t.List[t.Type[EventManagerLike]]]
diff --git a/ml_pipeline_engine/types.py b/ml_pipeline_engine/types.py
index eb201a1..402c511 100644
--- a/ml_pipeline_engine/types.py
+++ b/ml_pipeline_engine/types.py
@@ -57,23 +57,16 @@ def next_iteration(self, data: t.Optional[AdditionalDataT]) -> Recurrent:
         """


-class NodeProtocol(t.Protocol):
+class NodeBase(RetryProtocol, TagProtocol, t.Protocol[NodeResultT]):
     """
-    Model graph node
+    Basic node interface
     """
-
     RUN_METHOD_ALIAS = 'process'

     node_type: t.ClassVar[str] = None
     name: t.ClassVar[str] = None
     verbose_name: t.ClassVar[str] = None

-
-class NodeBase(NodeProtocol, RetryProtocol, TagProtocol, t.Protocol[NodeResultT]):
-    """
-    Basic node interface
-    """
-
     process: t.Union[
         t.Callable[..., NodeResultT],
         t.Callable[..., t.Awaitable[NodeResultT]],
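Notes:

    With EventManagerBase gone, event managers are matched purely
    structurally against the EventManagerLike protocol, so nothing is
    inherited from ml_pipeline_engine.events anymore. A sketch of a
    standalone manager using the four hook signatures the removed base class
    declared (LoggingEventManager and its print bodies are illustrative;
    whether the protocol tolerates a partial set of hooks is not shown in
    this series, so all four are kept):

        import typing as t

        from ml_pipeline_engine.types import NodeId
        from ml_pipeline_engine.types import PipelineContextLike
        from ml_pipeline_engine.types import PipelineResult


        class LoggingEventManager:
            async def on_pipeline_start(self, ctx: PipelineContextLike) -> None:
                print('pipeline started')

            async def on_pipeline_complete(self, ctx: PipelineContextLike, result: PipelineResult) -> None:
                print(f'pipeline finished: {result}')

            async def on_node_start(self, ctx: PipelineContextLike, node_id: NodeId) -> None:
                print(f'node started: {node_id}')

            async def on_node_complete(self, ctx: PipelineContextLike, node_id: NodeId, error: t.Optional[Exception]) -> None:
                print(f'node finished: {node_id}, error={error}')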
From 8ffd3fe77182c554a7f7572d1e02dae841b217a1 Mon Sep 17 00:00:00 2001
From: Evgeniia Lukmanova
Date: Mon, 17 Jun 2024 16:15:55 +0500
Subject: [PATCH 10/12] fix: Remove get_run_method method

---
 ml_pipeline_engine/node/node.py | 17 ++++++-----------
 tests/test_utils.py             |  2 --
 2 files changed, 6 insertions(+), 13 deletions(-)

diff --git a/ml_pipeline_engine/node/node.py b/ml_pipeline_engine/node/node.py
index 1ff8516..565db08 100644
--- a/ml_pipeline_engine/node/node.py
+++ b/ml_pipeline_engine/node/node.py
@@ -38,17 +38,12 @@ def get_node_id(node: NodeBase) -> NodeId:
     return '__'.join([node_type, node_name])


-def get_run_method(node: NodeBase) -> t.Optional[str]:
-    run_method = NodeBase.RUN_METHOD_ALIAS
-    return run_method if callable(getattr(node, run_method, None)) else None
-
-
 def get_callable_run_method(node: NodeBase) -> t.Callable:
-    run_method_name = get_run_method(node)
+    run_method = NodeBase.RUN_METHOD_ALIAS

-    if run_method_name is not None:
+    if callable(getattr(node, run_method, None)):
         node = get_instance(node)
-        return getattr(node, run_method_name)
+        return getattr(node, run_method)

     return node


 def run_node_default(node: NodeBase[NodeResultT], **kwargs: t.Any) -> t.Type[NodeResultT]:
@@ -123,10 +118,10 @@ def build_node(
     if not inspect.isclass(node):
         raise ClassExpectedError('A class object is expected for node creation')

-    run_method = get_run_method(node)
-    if not run_method:
+    run_method = NodeBase.RUN_METHOD_ALIAS
+    if not callable(getattr(node, run_method, None)):
         raise RunMethodExpectedError(
-            f'Missing method for node execution. Expected name={NodeBase.RUN_METHOD_ALIAS}',
+            f'Missing method for node execution. Expected name={run_method}',
         )

     if inspect.iscoroutinefunction(getattr(node, run_method)):
diff --git a/tests/test_utils.py b/tests/test_utils.py
index 21cc993..f084f1a 100644
--- a/tests/test_utils.py
+++ b/tests/test_utils.py
@@ -3,7 +3,6 @@
 from ml_pipeline_engine.node import ProcessorBase
 from ml_pipeline_engine.node import generate_pipeline_id
 from ml_pipeline_engine.node import get_node_id
-from ml_pipeline_engine.node import get_run_method
 from ml_pipeline_engine.node import run_node


@@ -26,5 +25,4 @@ class SomeNode(ProcessorBase):
         def process(x: int) -> int:
             return x

-    assert get_run_method(SomeNode) == 'process'
     assert await run_node(SomeNode, x=10, node_id='an_example') == 10
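Notes:

    get_run_method() is removed from ml_pipeline_engine.node; both of its
    call sites now check NodeBase.RUN_METHOD_ALIAS inline. External callers
    that used the helper to probe a node can do the same; a rough stand-in
    (has_run_method is our name, not part of the package):

        import typing as t

        from ml_pipeline_engine.types import NodeBase


        def has_run_method(node: t.Any) -> bool:
            # The same check the removed helper performed internally.
            return callable(getattr(node, NodeBase.RUN_METHOD_ALIAS, None))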
From 4d32382edc56bab14a3b61af2649e379eef2c601 Mon Sep 17 00:00:00 2001
From: Evgeniia Lukmanova
Date: Sun, 18 Aug 2024 13:04:45 +0500
Subject: [PATCH 11/12] fix: get_callable_run_method method raises exception

---
 .../dag_builders/annotation/builder.py | 11 ++++-----
 ml_pipeline_engine/node/node.py        |  8 +++----
 tests/test_utils.py                    | 23 +++++++++++++++++++
 3 files changed, 32 insertions(+), 10 deletions(-)

diff --git a/ml_pipeline_engine/dag_builders/annotation/builder.py b/ml_pipeline_engine/dag_builders/annotation/builder.py
index 469a34e..fd68f52 100644
--- a/ml_pipeline_engine/dag_builders/annotation/builder.py
+++ b/ml_pipeline_engine/dag_builders/annotation/builder.py
@@ -48,22 +48,21 @@ def _check_annotations(obj: t.Any) -> None:
         Check that the passed object has type annotations.
         If at least one parameter is left untyped, an error is raised.
         """
+        run_method = get_callable_run_method(obj)

-        obj = get_callable_run_method(obj)
-
-        annotations = getattr(obj, '__annotations__', None)
+        annotations = getattr(run_method, '__annotations__', None)
         parameters = [
             (name, bool(parameter.empty))
-            for name, parameter in inspect.signature(obj).parameters.items()
+            for name, parameter in inspect.signature(run_method).parameters.items()
             if name not in ('self', 'args', 'kwargs')
         ]

         if not annotations and parameters:
-            raise errors.UndefinedAnnotation(f'Unable to find type annotations. obj={obj}')
+            raise errors.UndefinedAnnotation(f'Unable to find type annotations. obj={run_method}')

         for name, is_empty in parameters:
             if is_empty and name not in annotations:
-                raise errors.UndefinedParamAnnotation(f'Type is not specified for parameter name={name}, obj={obj}')
+                raise errors.UndefinedParamAnnotation(f'Type is not specified for parameter name={name}, obj={run_method}')

     @staticmethod
     def _check_base_class(node: t.Any) -> None:
diff --git a/ml_pipeline_engine/node/node.py b/ml_pipeline_engine/node/node.py
index 565db08..43ed3c0 100644
--- a/ml_pipeline_engine/node/node.py
+++ b/ml_pipeline_engine/node/node.py
@@ -41,11 +41,11 @@ def get_node_id(node: NodeBase) -> NodeId:
 def get_callable_run_method(node: NodeBase) -> t.Callable:
     run_method = NodeBase.RUN_METHOD_ALIAS

-    if callable(getattr(node, run_method, None)):
-        node = get_instance(node)
-        return getattr(node, run_method)
+    if not callable(getattr(node, run_method, None)):
+        raise RunMethodExpectedError(f'Missing method for node execution. Expected name={run_method}')

-    return node
+    node = get_instance(node)
+    return getattr(node, run_method)
diff --git a/tests/test_utils.py b/tests/test_utils.py
index f084f1a..454686a 100644
--- a/tests/test_utils.py
+++ b/tests/test_utils.py
@@ -1,9 +1,15 @@
+import typing as t
 from uuid import UUID

+import pytest
+
 from ml_pipeline_engine.node import ProcessorBase
 from ml_pipeline_engine.node import generate_pipeline_id
 from ml_pipeline_engine.node import get_node_id
 from ml_pipeline_engine.node import run_node
+from ml_pipeline_engine.node.errors import RunMethodExpectedError
+from ml_pipeline_engine.types import DAGLike
+from ml_pipeline_engine.types import NodeBase


 def test_generate_pipeline_id() -> None:
@@ -26,3 +32,20 @@ def process(x: int) -> int:
             return x

     assert await run_node(SomeNode, x=10, node_id='an_example') == 10
+
+
+async def test_build_graph__error_no_process_method(
+    build_dag: t.Callable[..., DAGLike],
+) -> None:
+    class SomeNode(NodeBase):
+        @staticmethod
+        def process(x: int) -> int:
+            return x
+
+    class AnotherNode(NodeBase):
+        @staticmethod
+        def not_process(x: int) -> int:
+            return x
+
+    with pytest.raises(RunMethodExpectedError):
+        build_dag(input_node=SomeNode, output_node=AnotherNode)
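Notes:

    get_callable_run_method() no longer falls back to returning the node
    unchanged: a node without a process method now fails fast with
    RunMethodExpectedError, and the new test exercises exactly that path
    through build_dag. Callers that relied on the old pass-through behaviour
    must catch the error themselves; a sketch (safe_run_method is our wrapper
    name, and we assume get_callable_run_method stays importable from
    ml_pipeline_engine.node alongside the other node helpers):

        import typing as t

        from ml_pipeline_engine.node import get_callable_run_method
        from ml_pipeline_engine.node.errors import RunMethodExpectedError


        def safe_run_method(node: t.Any) -> t.Any:
            # Restore the pre-patch behaviour: hand the node back unchanged
            # when it has nothing to run.
            try:
                return get_callable_run_method(node)
            except RunMethodExpectedError:
                return node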
From edd533b8c0fdfbfb6a85a7946d2a2f3c0f96a96d Mon Sep 17 00:00:00 2001
From: Evgeniia Lukmanova
Date: Wed, 11 Sep 2024 10:42:29 +0500
Subject: [PATCH 12/12] fix: Remove RUN_METHOD_ALIAS constant

---
 ml_pipeline_engine/node/node.py | 27 +++++++++++----------------
 ml_pipeline_engine/types.py     |  2 --
 2 files changed, 11 insertions(+), 18 deletions(-)

diff --git a/ml_pipeline_engine/node/node.py b/ml_pipeline_engine/node/node.py
index 43ed3c0..20dda3d 100644
--- a/ml_pipeline_engine/node/node.py
+++ b/ml_pipeline_engine/node/node.py
@@ -39,13 +39,11 @@ def get_node_id(node: NodeBase) -> NodeId:
 def get_callable_run_method(node: NodeBase) -> t.Callable:
-    run_method = NodeBase.RUN_METHOD_ALIAS
-
-    if not callable(getattr(node, run_method, None)):
-        raise RunMethodExpectedError(f'Missing method for node execution. Expected name={run_method}')
+    if not callable(getattr(node, 'process', None)):
+        raise RunMethodExpectedError('Missing method for node execution')

     node = get_instance(node)
-    return getattr(node, run_method)
+    return node.process


 def run_node_default(node: NodeBase[NodeResultT], **kwargs: t.Any) -> t.Type[NodeResultT]:
@@ -118,20 +116,17 @@ def build_node(
     if not inspect.isclass(node):
         raise ClassExpectedError('A class object is expected for node creation')

-    run_method = NodeBase.RUN_METHOD_ALIAS
-    if not callable(getattr(node, run_method, None)):
-        raise RunMethodExpectedError(
-            f'Missing method for node execution. Expected name={run_method}',
-        )
-
-    if inspect.iscoroutinefunction(getattr(node, run_method)):
+    process_method = getattr(node, 'process', None)
+    if not callable(process_method):
+        raise RunMethodExpectedError('Missing method for node execution')

+    if inspect.iscoroutinefunction(process_method):
         async def class_method(*args: t.Any, **kwargs: t.Any) -> t.Any:
-            return await getattr(node, run_method)(*args, **kwargs, **(dependencies_default or {}))
+            return await process_method(*args, **kwargs, **(dependencies_default or {}))

     else:
         def class_method(*args: t.Any, **kwargs: t.Any) -> t.Any:
-            return getattr(node, run_method)(*args, **kwargs, **(dependencies_default or {}))
+            return process_method(*args, **kwargs, **(dependencies_default or {}))

     class_name = class_name or f'Generic{node.__name__}'
     created_node = type(
         class_name,
         (node,),
         {
             # Replace with a lambda function to drop the reference to the parent class method.
-            run_method: class_method,
+            'process': class_method,
             '__module__': __name__,
             '__generic_class__': node,
             'name': node_name or node.name,
         },
     )

-    method = getattr(created_node, run_method)
+    method = created_node.process
     method.__annotations__.update(target_dependencies)

     globals()[class_name] = created_node
diff --git a/ml_pipeline_engine/types.py b/ml_pipeline_engine/types.py
index 402c511..ff661fa 100644
--- a/ml_pipeline_engine/types.py
+++ b/ml_pipeline_engine/types.py
@@ -61,8 +61,6 @@ class NodeBase(RetryProtocol, TagProtocol, t.Protocol[NodeResultT]):
     """
     Basic node interface
     """
-    RUN_METHOD_ALIAS = 'process'
-
     node_type: t.ClassVar[str] = None
     name: t.ClassVar[str] = None
     verbose_name: t.ClassVar[str] = None
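Notes:

    With RUN_METHOD_ALIAS gone from NodeBase, the run method name is the
    hard-coded literal 'process' everywhere in the engine. Code that
    previously read the constant should use the attribute name directly; a
    sketch (resolve_process is our helper name, not part of the package):

        import typing as t


        def resolve_process(node: t.Any) -> t.Callable[..., t.Any]:
            # 'process' is now the only run method the engine recognizes.
            method = getattr(node, 'process', None)
            if not callable(method):
                raise TypeError('node does not define a callable process()')
            return method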