Skip to content

Commit 1090395

Browse files
committed
WIP: implement setting ttl on pipelines
Signed-off-by: Greg Sheremeta <gshereme@redhat.com>
1 parent 2fb4922 commit 1090395

File tree

10 files changed

+505
-402
lines changed

10 files changed

+505
-402
lines changed

api/v2alpha1/go/cachekey/cache_key.pb.go

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/v2alpha1/go/pipelinespec/pipeline_spec.pb.go

Lines changed: 423 additions & 397 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/v2alpha1/pipeline_spec.proto

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -679,7 +679,8 @@ message ValueOrRuntimeParameter {
679679
}
680680

681681
// The definition of the deployment config of the pipeline. It contains the
682-
// the platform specific executor configs for KFP OSS.
682+
// the platform specific executor configs for KFP OSS, as well as platform level config
683+
// such as settings to pass to an Argo Workflows Workflow CR.
683684
message PipelineDeploymentConfig {
684685
// The specification on a container invocation.
685686
// The string fields of the message support string based placeholder contract
@@ -852,6 +853,11 @@ message PipelineDeploymentConfig {
852853
}
853854
// Map from executor label to executor spec.
854855
map<string, ExecutorSpec> executors = 1;
856+
857+
// begin platform level / workflow level config
858+
859+
// duration in seconds after which the pipeline platform will do garbage collection
860+
optional int32 completed_pipeline_ttl = 2;
855861
}
856862

857863
// Value is the value of the field.

backend/src/v2/compiler/argocompiler/argo.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,13 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S
7272
}
7373
}
7474

75+
// pipeline-level config options
76+
var completedPipelineTtl int32 = 0
77+
78+
if deploy.CompletedPipelineTtl != nil {
79+
completedPipelineTtl = *deploy.CompletedPipelineTtl
80+
}
81+
7582
var kubernetesSpec *pipelinespec.SinglePlatformSpec
7683
if kubernetesSpecArg != nil {
7784
// clone kubernetesSpecArg, because we don't want to change it
@@ -112,6 +119,16 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S
112119
Entrypoint: tmplEntrypoint,
113120
},
114121
}
122+
123+
// set pipeline-level config options
124+
125+
// completed pipeline ttl
126+
if completedPipelineTtl > 0 {
127+
wf.Spec.TTLStrategy = &wfapi.TTLStrategy{
128+
SecondsAfterCompletion: &completedPipelineTtl,
129+
}
130+
}
131+
115132
c := &workflowCompiler{
116133
wf: wf,
117134
templates: make(map[string]*wfapi.Template),

sdk/python/kfp/compiler/pipeline_spec_builder.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
from kfp.dsl import component_factory
2929
from kfp.dsl import for_loop
3030
from kfp.dsl import pipeline_channel
31+
from kfp.dsl import pipeline_config
3132
from kfp.dsl import pipeline_context
3233
from kfp.dsl import pipeline_task
3334
from kfp.dsl import placeholders
@@ -1850,13 +1851,15 @@ def create_pipeline_spec(
18501851
pipeline: pipeline_context.Pipeline,
18511852
component_spec: structures.ComponentSpec,
18521853
pipeline_outputs: Optional[Any] = None,
1854+
pipeline_config: pipeline_config.PipelineConfig = None,
18531855
) -> Tuple[pipeline_spec_pb2.PipelineSpec, pipeline_spec_pb2.PlatformSpec]:
18541856
"""Creates a pipeline spec object.
18551857
18561858
Args:
18571859
pipeline: The instantiated pipeline object.
18581860
component_spec: The component spec structures.
18591861
pipeline_outputs: The pipeline outputs via return.
1862+
pipeline_config: The pipeline config object.
18601863
18611864
Returns:
18621865
A PipelineSpec proto representing the compiled pipeline.
@@ -1874,6 +1877,10 @@ def create_pipeline_spec(
18741877
# Schema version 2.1.0 is required for kfp-pipeline-spec>0.1.13
18751878
pipeline_spec.schema_version = '2.1.0'
18761879

1880+
# pipeline-level config options
1881+
if pipeline_config.completed_pipeline_ttl_seconds is not None and pipeline_config.completed_pipeline_ttl_seconds > 0:
1882+
deployment_config.completed_pipeline_ttl = pipeline_config.completed_pipeline_ttl_seconds
1883+
18771884
pipeline_spec.root.CopyFrom(
18781885
_build_component_spec_from_component_spec_structure(component_spec))
18791886

sdk/python/kfp/dsl/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,7 @@ def my_pipeline():
266266
from kfp.dsl.pipeline_channel import OneOf
267267
from kfp.dsl.pipeline_context import pipeline
268268
from kfp.dsl.pipeline_task import PipelineTask
269+
from kfp.dsl.pipeline_config import PipelineConfig
269270
from kfp.dsl.placeholders import ConcatPlaceholder
270271
from kfp.dsl.placeholders import IfPresentPlaceholder
271272
from kfp.dsl.structures import ContainerSpec
@@ -292,4 +293,5 @@ def my_pipeline():
292293
'IfPresentPlaceholder',
293294
'ConcatPlaceholder',
294295
'PipelineTask',
296+
'PipelineConfig',
295297
])

sdk/python/kfp/dsl/component_factory.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
from kfp.dsl import container_component_artifact_channel
2828
from kfp.dsl import container_component_class
2929
from kfp.dsl import graph_component
30+
from kfp.dsl import pipeline_config
3031
from kfp.dsl import placeholders
3132
from kfp.dsl import python_component
3233
from kfp.dsl import structures
@@ -658,6 +659,7 @@ def create_graph_component_from_func(
658659
name: Optional[str] = None,
659660
description: Optional[str] = None,
660661
display_name: Optional[str] = None,
662+
pipeline_config: pipeline_config.PipelineConfig = None,
661663
) -> graph_component.GraphComponent:
662664
"""Implementation for the @pipeline decorator.
663665
@@ -674,6 +676,7 @@ def create_graph_component_from_func(
674676
component_spec=component_spec,
675677
pipeline_func=func,
676678
display_name=display_name,
679+
pipeline_config=pipeline_config,
677680
)
678681

679682

sdk/python/kfp/dsl/graph_component.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from kfp.compiler import pipeline_spec_builder as builder
2121
from kfp.dsl import base_component
2222
from kfp.dsl import pipeline_channel
23+
from kfp.dsl import pipeline_config
2324
from kfp.dsl import pipeline_context
2425
from kfp.dsl import structures
2526
from kfp.pipeline_spec import pipeline_spec_pb2
@@ -37,9 +38,11 @@ def __init__(
3738
component_spec: structures.ComponentSpec,
3839
pipeline_func: Callable,
3940
display_name: Optional[str] = None,
41+
pipeline_config: pipeline_config.PipelineConfig = None,
4042
):
4143
super().__init__(component_spec=component_spec)
4244
self.pipeline_func = pipeline_func
45+
self.pipeline_config = pipeline_config
4346

4447
args_list = []
4548
signature = inspect.signature(pipeline_func)
@@ -69,6 +72,7 @@ def __init__(
6972
pipeline=dsl_pipeline,
7073
component_spec=self.component_spec,
7174
pipeline_outputs=pipeline_outputs,
75+
pipeline_config=pipeline_config,
7276
)
7377

7478
pipeline_root = getattr(pipeline_func, 'pipeline_root', None)

sdk/python/kfp/dsl/pipeline_config.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
# Copyright 2024 The Kubeflow Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
"""Pipeline-level config options."""
15+
16+
17+
class PipelineConfig():
18+
"""PipelineConfig contains pipeline-level config options."""
19+
20+
def __init__(self):
21+
self.completed_pipeline_ttl_seconds = None
22+
23+
def set_completed_pipeline_ttl_seconds(self, seconds: int):
24+
"""Configures the duration in seconds after which the pipeline platform will
25+
do garbage collection. This is dependent on the platform's implementation, but
26+
typically involves deleting pods and higher level resources associated with the
27+
pods.
28+
29+
Args:
30+
seconds: number of seconds for the platform to do garbage collection after
31+
the pipeline run is completed.
32+
"""
33+
self.completed_pipeline_ttl_seconds = seconds
34+
return self

sdk/python/kfp/dsl/pipeline_context.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from typing import Callable, Optional
1818

1919
from kfp.dsl import component_factory
20+
from kfp.dsl import pipeline_config
2021
from kfp.dsl import pipeline_task
2122
from kfp.dsl import tasks_group
2223
from kfp.dsl import utils
@@ -27,7 +28,8 @@ def pipeline(func: Optional[Callable] = None,
2728
name: Optional[str] = None,
2829
description: Optional[str] = None,
2930
pipeline_root: Optional[str] = None,
30-
display_name: Optional[str] = None) -> Callable:
31+
display_name: Optional[str] = None,
32+
pipeline_config: pipeline_config.PipelineConfig = None) -> Callable:
3133
"""Decorator used to construct a pipeline.
3234
3335
Example
@@ -57,6 +59,7 @@ def my_pipeline(a: str, b: int):
5759
description=description,
5860
pipeline_root=pipeline_root,
5961
display_name=display_name,
62+
pipeline_config=pipeline_config,
6063
)
6164

6265
if pipeline_root:
@@ -67,6 +70,7 @@ def my_pipeline(a: str, b: int):
6770
name=name,
6871
description=description,
6972
display_name=display_name,
73+
pipeline_config=pipeline_config,
7074
)
7175

7276

0 commit comments

Comments
 (0)