Skip to content

Commit 0e37fd6

Browse files
zazulamdroctothorpeandreafehrmanMonicaZhang1KyleKaminky
authored
feat(backend): move comp logic to workflow params (#10979)
* feat(backend): move comp logic to workflow params Signed-off-by: zazulam <m.zazula@gmail.com> Co-authored-by: droctothorpe <mythicalsunlight@gmail.com> Co-authored-by: andreafehrman <andrea.k.fehrman@vanderbilt.edu> Co-authored-by: MonicaZhang1 <zhangmonica1@gmail.com> Co-authored-by: kylekaminky <kyle.kaminky@gmail.com> Co-authored-by: CarterFendley <carter.fendley@gmail.com> Signed-off-by: zazulam <m.zazula@gmail.com> * address pr comments Signed-off-by: zazulam <m.zazula@gmail.com> * Use function name instead of base name and address edge cases Signed-off-by: droctothorpe <mythicalsunlight@gmail.com> Co-authored-by: zazulam <m.zazula@gmail.com> * Improve logic and update tests Signed-off-by: droctothorpe <mythicalsunlight@gmail.com> Co-authored-by: zazulam <m.zazula@gmail.com> * POC hashing command and args Signed-off-by: droctothorpe <mythicalsunlight@gmail.com> Co-authored-by: zazulam <m.zazula@gmail.com> * Add comments to clarify the logic Signed-off-by: droctothorpe <mythicalsunlight@gmail.com> Co-authored-by: zazulam <m.zazula@gmail.com> * Hash entire PipelineContainerSpec Signed-off-by: droctothorpe <mythicalsunlight@gmail.com> Co-authored-by: zazulam <m.zazula@gmail.com> --------- Signed-off-by: zazulam <m.zazula@gmail.com> Signed-off-by: droctothorpe <mythicalsunlight@gmail.com> Co-authored-by: droctothorpe <mythicalsunlight@gmail.com> Co-authored-by: andreafehrman <andrea.k.fehrman@vanderbilt.edu> Co-authored-by: MonicaZhang1 <zhangmonica1@gmail.com> Co-authored-by: kylekaminky <kyle.kaminky@gmail.com> Co-authored-by: CarterFendley <carter.fendley@gmail.com>
1 parent 99bd234 commit 0e37fd6

File tree

6 files changed

+220
-136
lines changed

6 files changed

+220
-136
lines changed

backend/src/v2/compiler/argocompiler/argo.go

Lines changed: 98 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,16 @@
1515
package argocompiler
1616

1717
import (
18+
"crypto/sha256"
19+
"encoding/hex"
20+
"encoding/json"
1821
"fmt"
1922
"strings"
2023

2124
wfapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
2225
"github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec"
2326
"github.com/kubeflow/pipelines/backend/src/v2/compiler"
27+
log "github.com/sirupsen/logrus"
2428
"google.golang.org/protobuf/proto"
2529
"google.golang.org/protobuf/types/known/structpb"
2630
k8score "k8s.io/api/core/v1"
@@ -63,7 +67,7 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S
6367
if err != nil {
6468
return nil, err
6569
}
66-
// fill root component default paramters to PipelineJob
70+
// fill root component default parameters to PipelineJob
6771
specParams := spec.GetRoot().GetInputDefinitions().GetParameters()
6872
for name, param := range specParams {
6973
_, ok := job.RuntimeConfig.ParameterValues[name]
@@ -108,6 +112,9 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S
108112
"pipelines.kubeflow.org/v2_component": "true",
109113
},
110114
},
115+
Arguments: wfapi.Arguments{
116+
Parameters: []wfapi.Parameter{},
117+
},
111118
ServiceAccountName: "pipeline-runner",
112119
Entrypoint: tmplEntrypoint,
113120
},
@@ -180,69 +187,134 @@ func (c *workflowCompiler) templateName(componentName string) string {
180187
return componentName
181188
}
182189

183-
// WIP: store component spec, task spec and executor spec in annotations
184-
185190
const (
186-
annotationComponents = "pipelines.kubeflow.org/components-"
187-
annotationContainers = "pipelines.kubeflow.org/implementations-"
188-
annotationKubernetesSpec = "pipelines.kubeflow.org/kubernetes-"
191+
argumentsComponents = "components-"
192+
argumentsContainers = "implementations-"
193+
argumentsKubernetesSpec = "kubernetes-"
189194
)
190195

191196
func (c *workflowCompiler) saveComponentSpec(name string, spec *pipelinespec.ComponentSpec) error {
192-
return c.saveProtoToAnnotation(annotationComponents+name, spec)
197+
hashedComponent := c.hashComponentContainer(name)
198+
199+
return c.saveProtoToArguments(argumentsComponents+hashedComponent, spec)
193200
}
194201

195202
// useComponentSpec returns a placeholder we can refer to the component spec
196203
// in argo workflow fields.
197204
func (c *workflowCompiler) useComponentSpec(name string) (string, error) {
198-
return c.annotationPlaceholder(annotationComponents + name)
205+
hashedComponent := c.hashComponentContainer(name)
206+
207+
return c.argumentsPlaceholder(argumentsComponents + hashedComponent)
199208
}
200209

201210
func (c *workflowCompiler) saveComponentImpl(name string, msg proto.Message) error {
202-
return c.saveProtoToAnnotation(annotationContainers+name, msg)
211+
hashedComponent := c.hashComponentContainer(name)
212+
213+
return c.saveProtoToArguments(argumentsContainers+hashedComponent, msg)
203214
}
204215

205216
func (c *workflowCompiler) useComponentImpl(name string) (string, error) {
206-
return c.annotationPlaceholder(annotationContainers + name)
217+
hashedComponent := c.hashComponentContainer(name)
218+
219+
return c.argumentsPlaceholder(argumentsContainers + hashedComponent)
207220
}
208221

209222
func (c *workflowCompiler) saveKubernetesSpec(name string, spec *structpb.Struct) error {
210-
return c.saveProtoToAnnotation(annotationKubernetesSpec+name, spec)
223+
return c.saveProtoToArguments(argumentsKubernetesSpec+name, spec)
211224
}
212225

213226
func (c *workflowCompiler) useKubernetesImpl(name string) (string, error) {
214-
return c.annotationPlaceholder(annotationKubernetesSpec + name)
227+
return c.argumentsPlaceholder(argumentsKubernetesSpec + name)
215228
}
216229

217-
// TODO(Bobgy): sanitize component name
218-
func (c *workflowCompiler) saveProtoToAnnotation(name string, msg proto.Message) error {
230+
// saveProtoToArguments saves a proto message to the workflow arguments. The
231+
// message is serialized to JSON and stored in the workflow arguments and then
232+
// referenced by the workflow templates using AWF templating syntax. The reason
233+
// for storing it in the workflow arguments is because there is a 1-many
234+
// relationship between components and tasks that reference them. The workflow
235+
// arguments allow us to deduplicate the component logic (implementation & spec
236+
// in IR), significantly reducing the size of the argo workflow manifest.
237+
func (c *workflowCompiler) saveProtoToArguments(componentName string, msg proto.Message) error {
219238
if c == nil {
220239
return fmt.Errorf("compiler is nil")
221240
}
222-
if c.wf.Annotations == nil {
223-
c.wf.Annotations = make(map[string]string)
241+
if c.wf.Spec.Arguments.Parameters == nil {
242+
c.wf.Spec.Arguments = wfapi.Arguments{Parameters: []wfapi.Parameter{}}
224243
}
225-
if _, alreadyExists := c.wf.Annotations[name]; alreadyExists {
226-
return fmt.Errorf("annotation %q already exists", name)
244+
if c.wf.Spec.Arguments.GetParameterByName(componentName) != nil {
245+
return nil
227246
}
228247
json, err := stablyMarshalJSON(msg)
229248
if err != nil {
230-
return fmt.Errorf("saving component spec of %q to annotations: %w", name, err)
249+
return fmt.Errorf("saving component spec of %q to arguments: %w", componentName, err)
231250
}
232-
// TODO(Bobgy): verify name adheres to Kubernetes annotation restrictions: https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/#syntax-and-character-set
233-
c.wf.Annotations[name] = json
251+
c.wf.Spec.Arguments.Parameters = append(c.wf.Spec.Arguments.Parameters, wfapi.Parameter{
252+
Name: componentName,
253+
Value: wfapi.AnyStringPtr(json),
254+
})
234255
return nil
235256
}
236257

237-
func (c *workflowCompiler) annotationPlaceholder(name string) (string, error) {
258+
// argumentsPlaceholder checks for the unique component name within the workflow
259+
// arguments and returns a template tag that references the component in the
260+
// workflow arguments.
261+
func (c *workflowCompiler) argumentsPlaceholder(componentName string) (string, error) {
238262
if c == nil {
239263
return "", fmt.Errorf("compiler is nil")
240264
}
241-
if _, exists := c.wf.Annotations[name]; !exists {
242-
return "", fmt.Errorf("using component spec: failed to find annotation %q", name)
265+
if c.wf.Spec.Arguments.GetParameterByName(componentName) == nil {
266+
return "", fmt.Errorf("using component spec: failed to find workflow parameter %q", componentName)
267+
}
268+
269+
return workflowParameter(componentName), nil
270+
}
271+
272+
// hashComponentContainer serializes and hashes the container field of a given
273+
// component.
274+
func (c *workflowCompiler) hashComponentContainer(componentName string) string {
275+
log.Debug("componentName: ", componentName)
276+
// Return early for root component since it has no command and args.
277+
if componentName == "root" {
278+
return componentName
243279
}
244-
// Reference: https://argoproj.github.io/argo-workflows/variables/
245-
return fmt.Sprintf("{{workflow.annotations.%s}}", name), nil
280+
if c.executors != nil { // Don't bother if there are no executors in the pipeline spec.
281+
// Look up the executorLabel for the component in question.
282+
executorLabel := c.spec.Components[componentName].GetExecutorLabel()
283+
log.Debug("executorLabel: ", executorLabel)
284+
// Iterate through the list of executors.
285+
for executorName, executorValue := range c.executors {
286+
log.Debug("executorName: ", executorName)
287+
// If one of them matches the executorLabel we extracted earlier...
288+
if executorName == executorLabel {
289+
// Get the corresponding container.
290+
container := executorValue.GetContainer()
291+
if container != nil {
292+
containerHash, err := hashValue(container)
293+
if err != nil {
294+
// Do not bubble up since this is not a breaking error
295+
// and we can just return the componentName in full.
296+
log.Debug("Error hashing container: ", err)
297+
}
298+
299+
return containerHash
300+
}
301+
}
302+
}
303+
}
304+
305+
return componentName
306+
}
307+
308+
// hashValue serializes and hashes a provided value.
309+
func hashValue(value interface{}) (string, error) {
310+
bytes, err := json.Marshal(value)
311+
if err != nil {
312+
return "", err
313+
}
314+
h := sha256.New()
315+
h.Write([]byte(bytes))
316+
317+
return hex.EncodeToString(h.Sum(nil)), nil
246318
}
247319

248320
const (

backend/src/v2/compiler/argocompiler/container.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -358,9 +358,11 @@ func (c *workflowCompiler) addContainerExecutorTemplate(refName string) string {
358358
},
359359
}
360360
// Update pod metadata if it defined in the Kubernetes Spec
361-
if kubernetesConfigString, ok := c.wf.Annotations[annotationKubernetesSpec+refName]; ok {
361+
kubernetesConfigParam := c.wf.Spec.Arguments.GetParameterByName(argumentsKubernetesSpec + refName)
362+
363+
if kubernetesConfigParam != nil {
362364
k8sExecCfg := &kubernetesplatform.KubernetesExecutorConfig{}
363-
if err := jsonpb.UnmarshalString(kubernetesConfigString, k8sExecCfg); err == nil {
365+
if err := jsonpb.UnmarshalString(string(*kubernetesConfigParam.Value), k8sExecCfg); err == nil {
364366
extendPodMetadata(&executor.Metadata, k8sExecCfg)
365367
}
366368
}

0 commit comments

Comments
 (0)