@@ -909,6 +909,48 @@ def my_pipeline() -> NamedTuple('Outputs', [
909
909
]):
910
910
task = print_and_return (text = 'Hello' )
911
911
912
+ def test_pipeline_reusing_other_pipeline_multiple_times (self ):
913
+
914
+ @dsl .pipeline ()
915
+ def reusable_pipeline ():
916
+ print_op (message = 'Reused pipeline' )
917
+
918
+ @dsl .pipeline ()
919
+ def do_something_else_pipeline ():
920
+ print_op (message = 'Do something else pipeline' )
921
+
922
+ reusable_pipeline ()
923
+
924
+ @dsl .pipeline ()
925
+ def orchestrator_pipeline ():
926
+ reusable_pipeline ()
927
+
928
+ do_something_else_pipeline ()
929
+
930
+ with tempfile .TemporaryDirectory () as tmpdir :
931
+ output_yaml = os .path .join (tmpdir , 'result.yaml' )
932
+ compiler .Compiler ().compile (
933
+ pipeline_func = orchestrator_pipeline , package_path = output_yaml )
934
+ self .assertTrue (os .path .exists (output_yaml ))
935
+
936
+ with open (output_yaml , 'r' ) as f :
937
+ pipeline_spec = yaml .safe_load (f )
938
+ tasks = [
939
+ comp .get ('dag' , {}).get ('tasks' , {})
940
+ for comp in pipeline_spec ['components' ].values ()
941
+ ]
942
+ component_refs = [[
943
+ x .get ('componentRef' , {}).get ('name' )
944
+ for x in task .values ()
945
+ ]
946
+ for task in tasks ]
947
+ all_component_refs = [
948
+ item for sublist in component_refs for item in sublist
949
+ ]
950
+ counted_refs = collections .Counter (all_component_refs )
951
+ print (counted_refs )
952
+ self .assertEqual (1 , max (counted_refs .values ()))
953
+
912
954
913
955
class V2NamespaceAliasTest (unittest .TestCase ):
914
956
"""Test that imports of both modules and objects are aliased (e.g. all
0 commit comments