From 8e0b1db7bddfb4923262190956238bfd6f44f7fe Mon Sep 17 00:00:00 2001 From: "Ricardo M. Oliveira" Date: Wed, 25 Sep 2024 12:07:20 -0300 Subject: [PATCH] Introduces PipelineConfig class with TTL configuration Signed-off-by: Ricardo M. Oliveira --- api/v2alpha1/go/cachekey/cache_key.pb.go | 6 ++--- .../go/pipelinespec/pipeline_spec.pb.go | 24 ++++++++++++++----- api/v2alpha1/pipeline_spec.proto | 1 + backend/src/apiserver/template/v2_template.go | 23 +++++++++++++++--- backend/src/v2/compiler/argocompiler/argo.go | 13 ++++++++++ .../kfp/compiler/pipeline_spec_builder.py | 5 ++++ sdk/python/kfp/dsl/pipeline_config.py | 5 ++++ 7 files changed, 65 insertions(+), 12 deletions(-) diff --git a/api/v2alpha1/go/cachekey/cache_key.pb.go b/api/v2alpha1/go/cachekey/cache_key.pb.go index eb29fd917c7b..1d6fa474c0ed 100644 --- a/api/v2alpha1/go/cachekey/cache_key.pb.go +++ b/api/v2alpha1/go/cachekey/cache_key.pb.go @@ -14,7 +14,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.26.0 +// protoc-gen-go v1.33.0 // protoc v3.17.3 // source: cache_key.proto @@ -42,7 +42,7 @@ type CacheKey struct { unknownFields protoimpl.UnknownFields InputArtifactNames map[string]*ArtifactNameList `protobuf:"bytes,1,rep,name=inputArtifactNames,proto3" json:"inputArtifactNames,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` - // Deprecated: Do not use. + // Deprecated: Marked as deprecated in cache_key.proto. InputParameters map[string]*pipelinespec.Value `protobuf:"bytes,2,rep,name=inputParameters,proto3" json:"inputParameters,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` OutputArtifactsSpec map[string]*pipelinespec.RuntimeArtifact `protobuf:"bytes,3,rep,name=outputArtifactsSpec,proto3" json:"outputArtifactsSpec,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` OutputParametersSpec map[string]string `protobuf:"bytes,4,rep,name=outputParametersSpec,proto3" json:"outputParametersSpec,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` @@ -89,7 +89,7 @@ func (x *CacheKey) GetInputArtifactNames() map[string]*ArtifactNameList { return nil } -// Deprecated: Do not use. +// Deprecated: Marked as deprecated in cache_key.proto. func (x *CacheKey) GetInputParameters() map[string]*pipelinespec.Value { if x != nil { return x.InputParameters diff --git a/api/v2alpha1/go/pipelinespec/pipeline_spec.pb.go b/api/v2alpha1/go/pipelinespec/pipeline_spec.pb.go index b4bca0cec5ff..8bc9b28cc62f 100644 --- a/api/v2alpha1/go/pipelinespec/pipeline_spec.pb.go +++ b/api/v2alpha1/go/pipelinespec/pipeline_spec.pb.go @@ -2525,6 +2525,9 @@ type PipelineConfig struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields + + // TODO add pipeline-level configs + PipelineTtl int32 `protobuf:"varint,1,opt,name=pipelineTtl,proto3" json:"pipelineTtl,omitempty"` } func (x *PipelineConfig) Reset() { @@ -2559,6 +2562,13 @@ func (*PipelineConfig) Descriptor() ([]byte, []int) { return file_pipeline_spec_proto_rawDescGZIP(), []int{30} } +func (x *PipelineConfig) GetPipelineTtl() int32 { + if x != nil { + return x.PipelineTtl + } + return 0 +} + // The runtime config of a PipelineJob. type PipelineJob_RuntimeConfig struct { state protoimpl.MessageState @@ -6523,12 +6533,14 @@ var file_pipeline_spec_proto_rawDesc = []byte{ 0x6b, 0x65, 0x79, 0x12, 0x2d, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x53, 0x74, 0x72, 0x75, 0x63, 0x74, 0x52, 0x05, 0x76, 0x61, 0x6c, - 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x10, 0x0a, 0x0e, 0x50, 0x69, 0x70, 0x65, 0x6c, 0x69, - 0x6e, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x42, 0x3c, 0x5a, 0x3a, 0x67, 0x69, 0x74, 0x68, - 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6b, 0x75, 0x62, 0x65, 0x66, 0x6c, 0x6f, 0x77, 0x2f, - 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x73, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x76, 0x32, - 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2f, 0x67, 0x6f, 0x2f, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, - 0x6e, 0x65, 0x73, 0x70, 0x65, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x32, 0x0a, 0x0e, 0x50, 0x69, 0x70, 0x65, 0x6c, 0x69, + 0x6e, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x20, 0x0a, 0x0b, 0x70, 0x69, 0x70, 0x65, + 0x6c, 0x69, 0x6e, 0x65, 0x54, 0x74, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0b, 0x70, + 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x54, 0x74, 0x6c, 0x42, 0x3c, 0x5a, 0x3a, 0x67, 0x69, + 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6b, 0x75, 0x62, 0x65, 0x66, 0x6c, 0x6f, + 0x77, 0x2f, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x73, 0x2f, 0x61, 0x70, 0x69, 0x2f, + 0x76, 0x32, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2f, 0x67, 0x6f, 0x2f, 0x70, 0x69, 0x70, 0x65, + 0x6c, 0x69, 0x6e, 0x65, 0x73, 0x70, 0x65, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/api/v2alpha1/pipeline_spec.proto b/api/v2alpha1/pipeline_spec.proto index 075913f80520..84933d918c70 100644 --- a/api/v2alpha1/pipeline_spec.proto +++ b/api/v2alpha1/pipeline_spec.proto @@ -1104,4 +1104,5 @@ message PlatformDeploymentConfig { // Spec for pipeline-level config options. See PipelineConfig DSL class. message PipelineConfig { // TODO add pipeline-level configs + int32 pipelineTtl = 1; } diff --git a/backend/src/apiserver/template/v2_template.go b/backend/src/apiserver/template/v2_template.go index d14ddffdaeb7..482bfbcf5d97 100644 --- a/backend/src/apiserver/template/v2_template.go +++ b/backend/src/apiserver/template/v2_template.go @@ -23,6 +23,7 @@ import ( "regexp" "strings" + "github.com/golang/glog" structpb "github.com/golang/protobuf/ptypes/struct" "github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec" "github.com/kubeflow/pipelines/backend/src/apiserver/model" @@ -77,9 +78,17 @@ func (t *V2Spec) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.Sche } } + var pipeline_options argocompiler.Options + if t.platformSpec.PipelineConfig.Ttl != nil { + glog.Info("Found pipeline config") + pipeline_options = argocompiler.Options{ + TtlSeconds: *t.platformSpec.PipelineConfig.Ttl, + } + } + var obj interface{} if util.CurrentExecutionType() == util.ArgoWorkflow { - obj, err = argocompiler.Compile(job, kubernetesSpec, nil) + obj, err = argocompiler.Compile(job, kubernetesSpec, &pipeline_options) } else if util.CurrentExecutionType() == util.TektonPipelineRun { obj, err = tektoncompiler.Compile(job, kubernetesSpec, &tektoncompiler.Options{LauncherImage: Launcher}) } @@ -300,9 +309,17 @@ func (t *V2Spec) RunWorkflow(modelRun *model.Run, options RunWorkflowOptions) (u } } + var pipeline_options *argocompiler.Options + if t.platformSpec.PipelineConfig.Ttl != nil { + glog.Info("Found pipeline config") + pipeline_options = &argocompiler.Options{ + TtlSeconds: *t.platformSpec.PipelineConfig.Ttl, + } + } + var obj interface{} if util.CurrentExecutionType() == util.ArgoWorkflow { - obj, err = argocompiler.Compile(job, kubernetesSpec, nil) + obj, err = argocompiler.Compile(job, kubernetesSpec, pipeline_options) } else if util.CurrentExecutionType() == util.TektonPipelineRun { obj, err = tektoncompiler.Compile(job, kubernetesSpec, nil) } @@ -344,7 +361,7 @@ func IsPlatformSpecWithKubernetesConfig(template []byte) bool { return false } _, ok := platformSpec.Platforms["kubernetes"] - return ok + return ok || platformSpec.PipelineConfig != nil } func (t *V2Spec) validatePipelineJobInputs(job *pipelinespec.PipelineJob) error { diff --git a/backend/src/v2/compiler/argocompiler/argo.go b/backend/src/v2/compiler/argocompiler/argo.go index 1f1c19ed3ec1..4c7aa8e560a1 100644 --- a/backend/src/v2/compiler/argocompiler/argo.go +++ b/backend/src/v2/compiler/argocompiler/argo.go @@ -40,8 +40,13 @@ type Options struct { // optional PipelineRoot string // TODO(Bobgy): add an option -- dev mode, ImagePullPolicy should only be Always in dev mode. + TtlSeconds int32 } +const ( + pipeline_default_ttlSeconds = int32(30) +) + func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.SinglePlatformSpec, opts *Options) (*wfapi.Workflow, error) { // clone jobArg, because we don't want to change it jobMsg := proto.Clone(jobArg) @@ -86,6 +91,11 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S } } + pipeline_ttlseconds := pipeline_default_ttlSeconds + if &opts.TtlSeconds != nil { + pipeline_ttlseconds = opts.TtlSeconds + } + // initialization wf := &wfapi.Workflow{ TypeMeta: k8smeta.TypeMeta{ @@ -117,6 +127,9 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S }, ServiceAccountName: "pipeline-runner", Entrypoint: tmplEntrypoint, + TTLStrategy: &wfapi.TTLStrategy{ + SecondsAfterCompletion: &pipeline_ttlseconds, + }, }, } c := &workflowCompiler{ diff --git a/sdk/python/kfp/compiler/pipeline_spec_builder.py b/sdk/python/kfp/compiler/pipeline_spec_builder.py index 3f1575005da5..e6e84775899c 100644 --- a/sdk/python/kfp/compiler/pipeline_spec_builder.py +++ b/sdk/python/kfp/compiler/pipeline_spec_builder.py @@ -2076,6 +2076,11 @@ def _merge_pipeline_config(pipelineConfig: pipeline_config.PipelineConfig, # {'pipelineConfig': { # '': pipelineConfig., # }}, platformSpec.platforms['kubernetes']) + + pipeline_config_json = json_format.ParseDict( + {'pipelineConfig': { + 'pipelineTtl': pipelineConfig.get_ttl(), + }}, platformSpec.platforms['kubernetes']) return platformSpec diff --git a/sdk/python/kfp/dsl/pipeline_config.py b/sdk/python/kfp/dsl/pipeline_config.py index a4e90c28a012..cdc2ca25b32b 100644 --- a/sdk/python/kfp/dsl/pipeline_config.py +++ b/sdk/python/kfp/dsl/pipeline_config.py @@ -21,3 +21,8 @@ def __init__(self): pass # TODO add pipeline level configs + def set_ttl(self, ttl: int): + self.__ttl = ttl + + def get_ttl(self) -> int: + return int(self.__ttl)