From 3d348d8f9df73e5a58634644b030229353fc9cf8 Mon Sep 17 00:00:00 2001 From: zazulam Date: Mon, 1 Jul 2024 23:29:37 -0400 Subject: [PATCH] feat(backend): move comp logic to workflow params Signed-off-by: zazulam Co-authored-by: droctothorpe Co-authored-by: andreafehrman Co-authored-by: MonicaZhang1 Co-authored-by: kylekaminky --- backend/src/v2/compiler/argocompiler/argo.go | 83 +++++++++++------ .../src/v2/compiler/argocompiler/argo_test.go | 27 ++++++ .../src/v2/compiler/argocompiler/container.go | 6 +- .../create_mount_delete_dynamic_pvc.yaml | 88 +++++++++--------- .../testdata/create_pod_metadata.yaml | 90 ++++++++++--------- .../argocompiler/testdata/hello_world.yaml | 29 +++--- .../argocompiler/testdata/importer.yaml | 19 ++-- 7 files changed, 206 insertions(+), 136 deletions(-) diff --git a/backend/src/v2/compiler/argocompiler/argo.go b/backend/src/v2/compiler/argocompiler/argo.go index a5cfed5faefa..faf5b2b69840 100644 --- a/backend/src/v2/compiler/argocompiler/argo.go +++ b/backend/src/v2/compiler/argocompiler/argo.go @@ -16,6 +16,7 @@ package argocompiler import ( "fmt" + "strconv" "strings" wfapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" @@ -63,7 +64,7 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S if err != nil { return nil, err } - // fill root component default paramters to PipelineJob + // fill root component default parameters to PipelineJob specParams := spec.GetRoot().GetInputDefinitions().GetParameters() for name, param := range specParams { _, ok := job.RuntimeConfig.ParameterValues[name] @@ -108,6 +109,9 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S "pipelines.kubeflow.org/v2_component": "true", }, }, + Arguments: wfapi.Arguments{ + Parameters: []wfapi.Parameter{}, + }, ServiceAccountName: "pipeline-runner", Entrypoint: tmplEntrypoint, }, @@ -180,69 +184,96 @@ func (c *workflowCompiler) templateName(componentName string) string { return componentName } -// WIP: store component spec, task spec and executor spec in annotations - const ( - annotationComponents = "pipelines.kubeflow.org/components-" - annotationContainers = "pipelines.kubeflow.org/implementations-" - annotationKubernetesSpec = "pipelines.kubeflow.org/kubernetes-" + argumentsComponents = "components-" + argumentsContainers = "implementations-" + argumentsKubernetesSpec = "kubernetes-" ) func (c *workflowCompiler) saveComponentSpec(name string, spec *pipelinespec.ComponentSpec) error { - return c.saveProtoToAnnotation(annotationComponents+name, spec) + baseComponentName := ExtractBaseComponentName(argumentsComponents + name) + return c.saveProtoToArguments(baseComponentName, spec) } // useComponentSpec returns a placeholder we can refer to the component spec // in argo workflow fields. func (c *workflowCompiler) useComponentSpec(name string) (string, error) { - return c.annotationPlaceholder(annotationComponents + name) + baseComponentName := ExtractBaseComponentName(argumentsComponents + name) + return c.argumentsPlaceholder(baseComponentName) } func (c *workflowCompiler) saveComponentImpl(name string, msg proto.Message) error { - return c.saveProtoToAnnotation(annotationContainers+name, msg) + baseComponentName := ExtractBaseComponentName(argumentsContainers + name) + return c.saveProtoToArguments(baseComponentName, msg) } func (c *workflowCompiler) useComponentImpl(name string) (string, error) { - return c.annotationPlaceholder(annotationContainers + name) + baseComponentName := ExtractBaseComponentName(argumentsContainers + name) + return c.argumentsPlaceholder(baseComponentName) } func (c *workflowCompiler) saveKubernetesSpec(name string, spec *structpb.Struct) error { - return c.saveProtoToAnnotation(annotationKubernetesSpec+name, spec) + return c.saveProtoToArguments(argumentsKubernetesSpec+name, spec) } func (c *workflowCompiler) useKubernetesImpl(name string) (string, error) { - return c.annotationPlaceholder(annotationKubernetesSpec + name) + return c.argumentsPlaceholder(argumentsKubernetesSpec + name) } -// TODO(Bobgy): sanitize component name -func (c *workflowCompiler) saveProtoToAnnotation(name string, msg proto.Message) error { +// saveProtoToArguments saves a proto message to the workflow arguments. The +// message is serialized to JSON and stored in the workflow arguments and then +// referenced by the workflow templates using AWF templating syntax. The reason +// for storing it in the workflow arguments is because there is a 1-many +// relationship between components and tasks that reference them. The workflow +// arguments allow us to deduplicate the component logic (implementation & spec +// in IR), significantly reducing the size of the argo workflow manifest. +func (c *workflowCompiler) saveProtoToArguments(componentName string, msg proto.Message) error { if c == nil { return fmt.Errorf("compiler is nil") } - if c.wf.Annotations == nil { - c.wf.Annotations = make(map[string]string) + if c.wf.Spec.Arguments.Parameters == nil { + c.wf.Spec.Arguments = wfapi.Arguments{Parameters: []wfapi.Parameter{}} } - if _, alreadyExists := c.wf.Annotations[name]; alreadyExists { - return fmt.Errorf("annotation %q already exists", name) + if c.wf.Spec.Arguments.GetParameterByName(componentName) != nil { + return nil } json, err := stablyMarshalJSON(msg) if err != nil { - return fmt.Errorf("saving component spec of %q to annotations: %w", name, err) + return fmt.Errorf("saving component spec of %q to arguments: %w", componentName, err) } - // TODO(Bobgy): verify name adheres to Kubernetes annotation restrictions: https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/#syntax-and-character-set - c.wf.Annotations[name] = json + c.wf.Spec.Arguments.Parameters = append(c.wf.Spec.Arguments.Parameters, wfapi.Parameter{ + Name: componentName, + Value: wfapi.AnyStringPtr(json), + }) return nil } -func (c *workflowCompiler) annotationPlaceholder(name string) (string, error) { +// argumentsPlaceholder checks for the unique component name within the workflow +// arguments and returns a template tag that references the component in the +// workflow arguments. +func (c *workflowCompiler) argumentsPlaceholder(componentName string) (string, error) { if c == nil { return "", fmt.Errorf("compiler is nil") } - if _, exists := c.wf.Annotations[name]; !exists { - return "", fmt.Errorf("using component spec: failed to find annotation %q", name) + if c.wf.Spec.Arguments.GetParameterByName(componentName) == nil { + return "", fmt.Errorf("using component spec: failed to find workflow parameter %q", componentName) } - // Reference: https://argoproj.github.io/argo-workflows/variables/ - return fmt.Sprintf("{{workflow.annotations.%s}}", name), nil + + return workflowParameter(componentName), nil +} + +// extractBaseComponentName removes the iteration suffix that the IR compiler +// adds to the component name. +func ExtractBaseComponentName(componentName string) string { + baseComponentName := componentName + componentNameArray := strings.Split(componentName, "-") + + if _, err := strconv.Atoi(componentNameArray[len(componentNameArray)-1]); err == nil { + baseComponentName = strings.Join(componentNameArray[:len(componentNameArray)-1], "-") + + } + + return baseComponentName } const ( diff --git a/backend/src/v2/compiler/argocompiler/argo_test.go b/backend/src/v2/compiler/argocompiler/argo_test.go index 6c92e54574c2..f3bb1fdcb1c0 100644 --- a/backend/src/v2/compiler/argocompiler/argo_test.go +++ b/backend/src/v2/compiler/argocompiler/argo_test.go @@ -137,3 +137,30 @@ func load(t *testing.T, path string, platformSpecPath string) (*pipelinespec.Pip } return job, nil } + +func Test_extractBaseComponentName(t *testing.T) { + tests := []struct { + name string + componentName string + expectedBaseName string + }{ + { + name: "With dash and int", + componentName: "component-2", + expectedBaseName: "component", + }, + { + name: "Without dash and int", + componentName: "component", + expectedBaseName: "component", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := argocompiler.ExtractBaseComponentName(tt.componentName) + if result != tt.expectedBaseName { + t.Errorf("Expected: %s, Got: %s", tt.expectedBaseName, result) + } + }) + } +} diff --git a/backend/src/v2/compiler/argocompiler/container.go b/backend/src/v2/compiler/argocompiler/container.go index 14ed4f706796..b04adc58f8a1 100644 --- a/backend/src/v2/compiler/argocompiler/container.go +++ b/backend/src/v2/compiler/argocompiler/container.go @@ -358,9 +358,11 @@ func (c *workflowCompiler) addContainerExecutorTemplate(refName string) string { }, } // Update pod metadata if it defined in the Kubernetes Spec - if kubernetesConfigString, ok := c.wf.Annotations[annotationKubernetesSpec+refName]; ok { + kubernetesConfigParam := c.wf.Spec.Arguments.GetParameterByName(argumentsKubernetesSpec + refName) + + if kubernetesConfigParam != nil { k8sExecCfg := &kubernetesplatform.KubernetesExecutorConfig{} - if err := jsonpb.UnmarshalString(kubernetesConfigString, k8sExecCfg); err == nil { + if err := jsonpb.UnmarshalString(string(*kubernetesConfigParam.Value), k8sExecCfg); err == nil { extendPodMetadata(&executor.Metadata, k8sExecCfg) } } diff --git a/backend/src/v2/compiler/argocompiler/testdata/create_mount_delete_dynamic_pvc.yaml b/backend/src/v2/compiler/argocompiler/testdata/create_mount_delete_dynamic_pvc.yaml index e3b427d2455d..45ba6039ee02 100644 --- a/backend/src/v2/compiler/argocompiler/testdata/create_mount_delete_dynamic_pvc.yaml +++ b/backend/src/v2/compiler/argocompiler/testdata/create_mount_delete_dynamic_pvc.yaml @@ -1,36 +1,36 @@ apiVersion: argoproj.io/v1alpha1 kind: Workflow metadata: - annotations: - pipelines.kubeflow.org/components-comp-comp: '{"executorLabel":"exec-comp"}' - pipelines.kubeflow.org/components-comp-comp-2: '{"executorLabel":"exec-comp-2"}' - pipelines.kubeflow.org/components-comp-createpvc: '{"executorLabel":"exec-createpvc","inputDefinitions":{"parameters":{"access_modes":{"parameterType":"LIST"},"annotations":{"isOptional":true,"parameterType":"STRUCT"},"pvc_name":{"isOptional":true,"parameterType":"STRING"},"pvc_name_suffix":{"isOptional":true,"parameterType":"STRING"},"size":{"parameterType":"STRING"},"storage_class_name":{"defaultValue":"","isOptional":true,"parameterType":"STRING"},"volume_name":{"isOptional":true,"parameterType":"STRING"}}},"outputDefinitions":{"parameters":{"name":{"parameterType":"STRING"}}}}' - pipelines.kubeflow.org/components-comp-deletepvc: '{"executorLabel":"exec-deletepvc","inputDefinitions":{"parameters":{"pvc_name":{"parameterType":"STRING"}}}}' - pipelines.kubeflow.org/components-root: '{"dag":{"tasks":{"comp":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-comp"},"dependentTasks":["createpvc"],"taskInfo":{"name":"comp"}},"comp-2":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-comp-2"},"dependentTasks":["comp","createpvc"],"taskInfo":{"name":"comp-2"}},"createpvc":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-createpvc"},"inputs":{"parameters":{"access_modes":{"runtimeValue":{"constant":["ReadWriteOnce"]}},"pvc_name_suffix":{"runtimeValue":{"constant":"-my-pvc"}},"size":{"runtimeValue":{"constant":"5Gi"}},"storage_class_name":{"runtimeValue":{"constant":"standard"}}}},"taskInfo":{"name":"createpvc"}},"deletepvc":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-deletepvc"},"dependentTasks":["comp-2","createpvc"],"inputs":{"parameters":{"pvc_name":{"taskOutputParameter":{"outputParameterKey":"name","producerTask":"createpvc"}}}},"taskInfo":{"name":"deletepvc"}}}}}' - pipelines.kubeflow.org/implementations-comp-comp: '{"args":["--executor_input","{{$}}","--function_to_execute","comp"],"command":["sh","-c","\nif - ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m - ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 - python3 -m pip install --quiet --no-warn-script-location ''kfp==2.0.0-beta.16'' - \u0026\u0026 \"$0\" \"$@\"\n","sh","-ec","program_path=$(mktemp -d) printf \"%s\" - \"$0\" \u003e \"$program_path/ephemeral_component.py\" python3 -m kfp.components.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\" - ","\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import - *\n\ndef comp():\n pass\n\n"],"image":"python:3.7"}' - pipelines.kubeflow.org/implementations-comp-comp-2: '{"args":["--executor_input","{{$}}","--function_to_execute","comp"],"command":["sh","-c","\nif - ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m - ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 - python3 -m pip install --quiet --no-warn-script-location ''kfp==2.0.0-beta.16'' - \u0026\u0026 \"$0\" \"$@\"\n","sh","-ec","program_path=$(mktemp -d) printf \"%s\" - \"$0\" \u003e \"$program_path/ephemeral_component.py\" python3 -m kfp.components.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\" - ","\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import - *\n\ndef comp():\n pass\n\n"],"image":"python:3.7"}' - pipelines.kubeflow.org/implementations-comp-createpvc: '{"image":"argostub/createpvc"}' - pipelines.kubeflow.org/implementations-comp-deletepvc: '{"image":"argostub/deletepvc"}' - pipelines.kubeflow.org/kubernetes-comp-comp: '{"pvcMount":[{"mountPath":"/data","taskOutputParameter":{"outputParameterKey":"name","producerTask":"createpvc"}}]}' - pipelines.kubeflow.org/kubernetes-comp-comp-2: '{"pvcMount":[{"mountPath":"/reused_data","taskOutputParameter":{"outputParameterKey":"name","producerTask":"createpvc"}}]}' creationTimestamp: null generateName: my-pipeline- spec: - arguments: {} + arguments: + parameters: + - name: kubernetes-comp-comp + value: '{"pvcMount":[{"mountPath":"/data","taskOutputParameter":{"outputParameterKey":"name","producerTask":"createpvc"}}]}' + - name: components-comp-comp + value: '{"executorLabel":"exec-comp"}' + - name: implementations-comp-comp + value: '{"args":["--executor_input","{{$}}","--function_to_execute","comp"],"command":["sh","-c","\nif + ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 + -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 + python3 -m pip install --quiet --no-warn-script-location ''kfp==2.0.0-beta.16'' + \u0026\u0026 \"$0\" \"$@\"\n","sh","-ec","program_path=$(mktemp -d) printf + \"%s\" \"$0\" \u003e \"$program_path/ephemeral_component.py\" python3 -m kfp.components.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\" + ","\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import + *\n\ndef comp():\n pass\n\n"],"image":"python:3.7"}' + - name: kubernetes-comp-comp-2 + value: '{"pvcMount":[{"mountPath":"/reused_data","taskOutputParameter":{"outputParameterKey":"name","producerTask":"createpvc"}}]}' + - name: components-comp-createpvc + value: '{"executorLabel":"exec-createpvc","inputDefinitions":{"parameters":{"access_modes":{"parameterType":"LIST"},"annotations":{"isOptional":true,"parameterType":"STRUCT"},"pvc_name":{"isOptional":true,"parameterType":"STRING"},"pvc_name_suffix":{"isOptional":true,"parameterType":"STRING"},"size":{"parameterType":"STRING"},"storage_class_name":{"defaultValue":"","isOptional":true,"parameterType":"STRING"},"volume_name":{"isOptional":true,"parameterType":"STRING"}}},"outputDefinitions":{"parameters":{"name":{"parameterType":"STRING"}}}}' + - name: implementations-comp-createpvc + value: '{"image":"argostub/createpvc"}' + - name: components-comp-deletepvc + value: '{"executorLabel":"exec-deletepvc","inputDefinitions":{"parameters":{"pvc_name":{"parameterType":"STRING"}}}}' + - name: implementations-comp-deletepvc + value: '{"image":"argostub/deletepvc"}' + - name: components-root + value: '{"dag":{"tasks":{"comp":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-comp"},"dependentTasks":["createpvc"],"taskInfo":{"name":"comp"}},"comp-2":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-comp-2"},"dependentTasks":["comp","createpvc"],"taskInfo":{"name":"comp-2"}},"createpvc":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-createpvc"},"inputs":{"parameters":{"access_modes":{"runtimeValue":{"constant":["ReadWriteOnce"]}},"pvc_name_suffix":{"runtimeValue":{"constant":"-my-pvc"}},"size":{"runtimeValue":{"constant":"5Gi"}},"storage_class_name":{"runtimeValue":{"constant":"standard"}}}},"taskInfo":{"name":"createpvc"}},"deletepvc":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-deletepvc"},"dependentTasks":["comp-2","createpvc"],"inputs":{"parameters":{"pvc_name":{"taskOutputParameter":{"outputParameterKey":"name","producerTask":"createpvc"}}}},"taskInfo":{"name":"deletepvc"}}}}}' entrypoint: entrypoint podMetadata: annotations: @@ -180,32 +180,32 @@ spec: volumes: - emptyDir: {} name: kfp-launcher - - emptyDir: { } + - emptyDir: {} name: gcs-scratch - - emptyDir: { } + - emptyDir: {} name: s3-scratch - - emptyDir: { } + - emptyDir: {} name: minio-scratch - - emptyDir: { } + - emptyDir: {} name: dot-local-scratch - - emptyDir: { } + - emptyDir: {} name: dot-cache-scratch - - emptyDir: { } + - emptyDir: {} name: dot-config-scratch - dag: tasks: - arguments: parameters: - name: component - value: '{{workflow.annotations.pipelines.kubeflow.org/components-comp-comp}}' + value: '{{workflow.parameters.components-comp-comp}}' - name: task value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-comp"},"dependentTasks":["createpvc"],"taskInfo":{"name":"comp"}}' - name: container - value: '{{workflow.annotations.pipelines.kubeflow.org/implementations-comp-comp}}' + value: '{{workflow.parameters.implementations-comp-comp}}' - name: parent-dag-id value: '{{inputs.parameters.parent-dag-id}}' - name: kubernetes-config - value: '{{workflow.annotations.pipelines.kubeflow.org/kubernetes-comp-comp}}' + value: '{{workflow.parameters.kubernetes-comp-comp}}' depends: createpvc.Succeeded name: comp-driver template: system-container-driver @@ -222,15 +222,15 @@ spec: - arguments: parameters: - name: component - value: '{{workflow.annotations.pipelines.kubeflow.org/components-comp-comp-2}}' + value: '{{workflow.parameters.components-comp-comp}}' - name: task value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-comp-2"},"dependentTasks":["comp","createpvc"],"taskInfo":{"name":"comp-2"}}' - name: container - value: '{{workflow.annotations.pipelines.kubeflow.org/implementations-comp-comp-2}}' + value: '{{workflow.parameters.implementations-comp-comp}}' - name: parent-dag-id value: '{{inputs.parameters.parent-dag-id}}' - name: kubernetes-config - value: '{{workflow.annotations.pipelines.kubeflow.org/kubernetes-comp-comp-2}}' + value: '{{workflow.parameters.kubernetes-comp-comp-2}}' depends: comp.Succeeded && createpvc.Succeeded name: comp-2-driver template: system-container-driver @@ -247,11 +247,11 @@ spec: - arguments: parameters: - name: component - value: '{{workflow.annotations.pipelines.kubeflow.org/components-comp-createpvc}}' + value: '{{workflow.parameters.components-comp-createpvc}}' - name: task value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-createpvc"},"inputs":{"parameters":{"access_modes":{"runtimeValue":{"constant":["ReadWriteOnce"]}},"pvc_name_suffix":{"runtimeValue":{"constant":"-my-pvc"}},"size":{"runtimeValue":{"constant":"5Gi"}},"storage_class_name":{"runtimeValue":{"constant":"standard"}}}},"taskInfo":{"name":"createpvc"}}' - name: container - value: '{{workflow.annotations.pipelines.kubeflow.org/implementations-comp-createpvc}}' + value: '{{workflow.parameters.implementations-comp-createpvc}}' - name: parent-dag-id value: '{{inputs.parameters.parent-dag-id}}' name: createpvc @@ -259,11 +259,11 @@ spec: - arguments: parameters: - name: component - value: '{{workflow.annotations.pipelines.kubeflow.org/components-comp-deletepvc}}' + value: '{{workflow.parameters.components-comp-deletepvc}}' - name: task value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-deletepvc"},"dependentTasks":["comp-2","createpvc"],"inputs":{"parameters":{"pvc_name":{"taskOutputParameter":{"outputParameterKey":"name","producerTask":"createpvc"}}}},"taskInfo":{"name":"deletepvc"}}' - name: container - value: '{{workflow.annotations.pipelines.kubeflow.org/implementations-comp-deletepvc}}' + value: '{{workflow.parameters.implementations-comp-deletepvc}}' - name: parent-dag-id value: '{{inputs.parameters.parent-dag-id}}' depends: comp-2.Succeeded && createpvc.Succeeded @@ -343,7 +343,7 @@ spec: - arguments: parameters: - name: component - value: '{{workflow.annotations.pipelines.kubeflow.org/components-root}}' + value: '{{workflow.parameters.components-root}}' - name: runtime-config value: '{}' - name: driver-type diff --git a/backend/src/v2/compiler/argocompiler/testdata/create_pod_metadata.yaml b/backend/src/v2/compiler/argocompiler/testdata/create_pod_metadata.yaml index 450aed6aeca5..1aa1a3fee464 100644 --- a/backend/src/v2/compiler/argocompiler/testdata/create_pod_metadata.yaml +++ b/backend/src/v2/compiler/argocompiler/testdata/create_pod_metadata.yaml @@ -1,20 +1,24 @@ apiVersion: argoproj.io/v1alpha1 kind: Workflow metadata: - annotations: - pipelines.kubeflow.org/components-comp-hello-world: '{"executorLabel":"exec-hello-world","inputDefinitions":{"parameters":{"text":{"type":"STRING"}}}}' - pipelines.kubeflow.org/components-root: '{"dag":{"tasks":{"hello-world":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-hello-world"},"inputs":{"parameters":{"text":{"componentInputParameter":"text"}}},"taskInfo":{"name":"hello-world"}}}},"inputDefinitions":{"parameters":{"text":{"type":"STRING"}}}}' - pipelines.kubeflow.org/implementations-comp-hello-world: '{"args":["--text","{{$.inputs.parameters[''text'']}}"],"command":["sh","-ec","program_path=$(mktemp)\nprintf - \"%s\" \"$0\" \u003e \"$program_path\"\npython3 -u \"$program_path\" \"$@\"\n","def - hello_world(text):\n print(text)\n return text\n\nimport argparse\n_parser - = argparse.ArgumentParser(prog=''Hello world'', description='''')\n_parser.add_argument(\"--text\", - dest=\"text\", type=str, required=True, default=argparse.SUPPRESS)\n_parsed_args - = vars(_parser.parse_args())\n\n_outputs = hello_world(**_parsed_args)\n"],"image":"python:3.7"}' - pipelines.kubeflow.org/kubernetes-comp-hello-world: '{"podMetadata":{"annotations":{"experiment_id":"234567","run_id":"123456"},"labels":{"kubeflow.com/common":"test","kubeflow.com/kfp":"pipeline-node"}}}' creationTimestamp: null generateName: hello-world- spec: - arguments: {} + arguments: + parameters: + - name: kubernetes-comp-hello-world + value: '{"podMetadata":{"annotations":{"experiment_id":"234567","run_id":"123456"},"labels":{"kubeflow.com/common":"test","kubeflow.com/kfp":"pipeline-node"}}}' + - name: components-comp-hello-world + value: '{"executorLabel":"exec-hello-world","inputDefinitions":{"parameters":{"text":{"type":"STRING"}}}}' + - name: implementations-comp-hello-world + value: '{"args":["--text","{{$.inputs.parameters[''text'']}}"],"command":["sh","-ec","program_path=$(mktemp)\nprintf + \"%s\" \"$0\" \u003e \"$program_path\"\npython3 -u \"$program_path\" \"$@\"\n","def + hello_world(text):\n print(text)\n return text\n\nimport argparse\n_parser + = argparse.ArgumentParser(prog=''Hello world'', description='''')\n_parser.add_argument(\"--text\", + dest=\"text\", type=str, required=True, default=argparse.SUPPRESS)\n_parsed_args + = vars(_parser.parse_args())\n\n_outputs = hello_world(**_parsed_args)\n"],"image":"python:3.7"}' + - name: components-root + value: '{"dag":{"tasks":{"hello-world":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-hello-world"},"inputs":{"parameters":{"text":{"componentInputParameter":"text"}}},"taskInfo":{"name":"hello-world"}}}},"inputDefinitions":{"parameters":{"text":{"type":"STRING"}}}}' entrypoint: entrypoint podMetadata: annotations: @@ -124,20 +128,20 @@ spec: name: "" resources: {} volumeMounts: - - mountPath: /kfp-launcher - name: kfp-launcher - - mountPath: /gcs - name: gcs-scratch - - mountPath: /s3 - name: s3-scratch - - mountPath: /minio - name: minio-scratch - - mountPath: /.local - name: dot-local-scratch - - mountPath: /.cache - name: dot-cache-scratch - - mountPath: /.config - name: dot-config-scratch + - mountPath: /kfp-launcher + name: kfp-launcher + - mountPath: /gcs + name: gcs-scratch + - mountPath: /s3 + name: s3-scratch + - mountPath: /minio + name: minio-scratch + - mountPath: /.local + name: dot-local-scratch + - mountPath: /.cache + name: dot-cache-scratch + - mountPath: /.config + name: dot-config-scratch initContainers: - command: - launcher-v2 @@ -168,34 +172,34 @@ spec: outputs: {} podSpecPatch: '{{inputs.parameters.pod-spec-patch}}' volumes: - - emptyDir: {} - name: kfp-launcher - - emptyDir: { } - name: gcs-scratch - - emptyDir: { } - name: s3-scratch - - emptyDir: { } - name: minio-scratch - - emptyDir: { } - name: dot-local-scratch - - emptyDir: { } - name: dot-cache-scratch - - emptyDir: { } - name: dot-config-scratch + - emptyDir: {} + name: kfp-launcher + - emptyDir: {} + name: gcs-scratch + - emptyDir: {} + name: s3-scratch + - emptyDir: {} + name: minio-scratch + - emptyDir: {} + name: dot-local-scratch + - emptyDir: {} + name: dot-cache-scratch + - emptyDir: {} + name: dot-config-scratch - dag: tasks: - arguments: parameters: - name: component - value: '{{workflow.annotations.pipelines.kubeflow.org/components-comp-hello-world}}' + value: '{{workflow.parameters.components-comp-hello-world}}' - name: task value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-hello-world"},"inputs":{"parameters":{"text":{"componentInputParameter":"text"}}},"taskInfo":{"name":"hello-world"}}' - name: container - value: '{{workflow.annotations.pipelines.kubeflow.org/implementations-comp-hello-world}}' + value: '{{workflow.parameters.implementations-comp-hello-world}}' - name: parent-dag-id value: '{{inputs.parameters.parent-dag-id}}' - name: kubernetes-config - value: '{{workflow.annotations.pipelines.kubeflow.org/kubernetes-comp-hello-world}}' + value: '{{workflow.parameters.kubernetes-comp-hello-world}}' name: hello-world-driver template: system-container-driver - arguments: @@ -282,7 +286,7 @@ spec: - arguments: parameters: - name: component - value: '{{workflow.annotations.pipelines.kubeflow.org/components-root}}' + value: '{{workflow.parameters.components-root}}' - name: runtime-config value: '{"parameters":{"text":{"stringValue":"hi there"}}}' - name: driver-type diff --git a/backend/src/v2/compiler/argocompiler/testdata/hello_world.yaml b/backend/src/v2/compiler/argocompiler/testdata/hello_world.yaml index 5685ece5de58..d59959225e5d 100644 --- a/backend/src/v2/compiler/argocompiler/testdata/hello_world.yaml +++ b/backend/src/v2/compiler/argocompiler/testdata/hello_world.yaml @@ -1,19 +1,22 @@ apiVersion: argoproj.io/v1alpha1 kind: Workflow metadata: - annotations: - pipelines.kubeflow.org/components-comp-hello-world: '{"executorLabel":"exec-hello-world","inputDefinitions":{"parameters":{"text":{"type":"STRING"}}}}' - pipelines.kubeflow.org/components-root: '{"dag":{"tasks":{"hello-world":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-hello-world"},"inputs":{"parameters":{"text":{"componentInputParameter":"text"}}},"taskInfo":{"name":"hello-world"}}}},"inputDefinitions":{"parameters":{"text":{"type":"STRING"}}}}' - pipelines.kubeflow.org/implementations-comp-hello-world: '{"args":["--text","{{$.inputs.parameters[''text'']}}"],"command":["sh","-ec","program_path=$(mktemp)\nprintf - \"%s\" \"$0\" \u003e \"$program_path\"\npython3 -u \"$program_path\" \"$@\"\n","def - hello_world(text):\n print(text)\n return text\n\nimport argparse\n_parser - = argparse.ArgumentParser(prog=''Hello world'', description='''')\n_parser.add_argument(\"--text\", - dest=\"text\", type=str, required=True, default=argparse.SUPPRESS)\n_parsed_args - = vars(_parser.parse_args())\n\n_outputs = hello_world(**_parsed_args)\n"],"image":"python:3.7"}' creationTimestamp: null generateName: hello-world- spec: - arguments: {} + arguments: + parameters: + - name: components-comp-hello-world + value: '{"executorLabel":"exec-hello-world","inputDefinitions":{"parameters":{"text":{"type":"STRING"}}}}' + - name: implementations-comp-hello-world + value: '{"args":["--text","{{$.inputs.parameters[''text'']}}"],"command":["sh","-ec","program_path=$(mktemp)\nprintf + \"%s\" \"$0\" \u003e \"$program_path\"\npython3 -u \"$program_path\" \"$@\"\n","def + hello_world(text):\n print(text)\n return text\n\nimport argparse\n_parser + = argparse.ArgumentParser(prog=''Hello world'', description='''')\n_parser.add_argument(\"--text\", + dest=\"text\", type=str, required=True, default=argparse.SUPPRESS)\n_parsed_args + = vars(_parser.parse_args())\n\n_outputs = hello_world(**_parsed_args)\n"],"image":"python:3.7"}' + - name: components-root + value: '{"dag":{"tasks":{"hello-world":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-hello-world"},"inputs":{"parameters":{"text":{"componentInputParameter":"text"}}},"taskInfo":{"name":"hello-world"}}}},"inputDefinitions":{"parameters":{"text":{"type":"STRING"}}}}' entrypoint: entrypoint podMetadata: annotations: @@ -180,11 +183,11 @@ spec: - arguments: parameters: - name: component - value: '{{workflow.annotations.pipelines.kubeflow.org/components-comp-hello-world}}' + value: '{{workflow.parameters.components-comp-hello-world}}' - name: task value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-hello-world"},"inputs":{"parameters":{"text":{"componentInputParameter":"text"}}},"taskInfo":{"name":"hello-world"}}' - name: container - value: '{{workflow.annotations.pipelines.kubeflow.org/implementations-comp-hello-world}}' + value: '{{workflow.parameters.implementations-comp-hello-world}}' - name: parent-dag-id value: '{{inputs.parameters.parent-dag-id}}' name: hello-world-driver @@ -273,7 +276,7 @@ spec: - arguments: parameters: - name: component - value: '{{workflow.annotations.pipelines.kubeflow.org/components-root}}' + value: '{{workflow.parameters.components-root}}' - name: runtime-config value: '{"parameters":{"text":{"stringValue":"hi there"}}}' - name: driver-type diff --git a/backend/src/v2/compiler/argocompiler/testdata/importer.yaml b/backend/src/v2/compiler/argocompiler/testdata/importer.yaml index d0e6ef6eaae2..23ec461c3073 100644 --- a/backend/src/v2/compiler/argocompiler/testdata/importer.yaml +++ b/backend/src/v2/compiler/argocompiler/testdata/importer.yaml @@ -1,14 +1,17 @@ apiVersion: argoproj.io/v1alpha1 kind: Workflow metadata: - annotations: - pipelines.kubeflow.org/components-comp-importer: '{"executorLabel":"exec-importer","inputDefinitions":{"parameters":{"uri":{"type":"STRING"}}},"outputDefinitions":{"artifacts":{"artifact":{"artifactType":{"schemaTitle":"system.Dataset"}}}}}' - pipelines.kubeflow.org/components-root: '{"dag":{"tasks":{"importer":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-importer"},"inputs":{"parameters":{"uri":{"runtimeValue":{"constantValue":{"stringValue":"gs://ml-pipeline-playground/shakespeare1.txt"}}}}},"taskInfo":{"name":"importer"}}}},"inputDefinitions":{"parameters":{"dataset2":{"type":"STRING"}}}}' - pipelines.kubeflow.org/implementations-comp-importer: '{"artifactUri":{"constantValue":{"stringValue":"gs://ml-pipeline-playground/shakespeare1.txt"}},"typeSchema":{"schemaTitle":"system.Dataset"}}' creationTimestamp: null generateName: pipeline-with-importer- spec: - arguments: {} + arguments: + parameters: + - name: components-comp-importer + value: '{"executorLabel":"exec-importer","inputDefinitions":{"parameters":{"uri":{"type":"STRING"}}},"outputDefinitions":{"artifacts":{"artifact":{"artifactType":{"schemaTitle":"system.Dataset"}}}}}' + - name: implementations-comp-importer + value: '{"artifactUri":{"constantValue":{"stringValue":"gs://ml-pipeline-playground/shakespeare1.txt"}},"typeSchema":{"schemaTitle":"system.Dataset"}}' + - name: components-root + value: '{"dag":{"tasks":{"importer":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-importer"},"inputs":{"parameters":{"uri":{"runtimeValue":{"constantValue":{"stringValue":"gs://ml-pipeline-playground/shakespeare1.txt"}}}}},"taskInfo":{"name":"importer"}}}},"inputDefinitions":{"parameters":{"dataset2":{"type":"STRING"}}}}' entrypoint: entrypoint podMetadata: annotations: @@ -81,9 +84,9 @@ spec: - name: task value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-importer"},"inputs":{"parameters":{"uri":{"runtimeValue":{"constantValue":{"stringValue":"gs://ml-pipeline-playground/shakespeare1.txt"}}}}},"taskInfo":{"name":"importer"}}' - name: component - value: '{{workflow.annotations.pipelines.kubeflow.org/components-comp-importer}}' + value: '{{workflow.parameters.components-comp-importer}}' - name: importer - value: '{{workflow.annotations.pipelines.kubeflow.org/implementations-comp-importer}}' + value: '{{workflow.parameters.implementations-comp-importer}}' - name: parent-dag-id value: '{{inputs.parameters.parent-dag-id}}' name: importer @@ -162,7 +165,7 @@ spec: - arguments: parameters: - name: component - value: '{{workflow.annotations.pipelines.kubeflow.org/components-root}}' + value: '{{workflow.parameters.components-root}}' - name: runtime-config value: '{}' - name: driver-type