Skip to content

feat(sdk): Implement Pipeline Configuration with TTL #11269

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 35 additions & 3 deletions sdk/python/kfp/compiler/compiler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from kfp.dsl import PipelineTaskFinalStatus
from kfp.dsl import tasks_group
from kfp.dsl import yaml_component
from kfp.dsl.pipeline_config import PipelineConfig
from kfp.dsl.types import type_utils
from kfp.pipeline_spec import pipeline_spec_pb2
import yaml
Expand Down Expand Up @@ -3891,6 +3892,35 @@ def my_pipeline():
# test that it can be compiled _again_ after reloading (tests YamlComponent internals)
compile_and_reload(loaded_pipeline)

def test_resource_ttl(self):
config = PipelineConfig()
config.set_resource_ttl(3600)

@dsl.pipeline(pipeline_config=config)
def my_pipeline():
task = comp()

expected = pipeline_spec_pb2.PlatformSpec()
json_format.ParseDict(
{
'platforms': {
'kubernetes': {
'pipelineConfig': {
'resourceTtl': 3600
}
}
}
}, expected)

self.assertEqual(my_pipeline.platform_spec, expected)

loaded_pipeline = compile_and_reload(my_pipeline)

self.assertEqual(loaded_pipeline.platform_spec, expected)

# test that it can be compiled _again_ after reloading (tests YamlComponent internals)
compile_and_reload(loaded_pipeline)

def test_task_with_exit_handler(self):

@dsl.pipeline
Expand Down Expand Up @@ -4863,7 +4893,9 @@ def roll_die_pipeline() -> str:
)

def test_if_elif_else_consumed(self):
"""Uses If, Elif, and Else branches, parameters passed to dsl.OneOf, dsl.OneOf passed to a consumer task, and different output keys on dsl.OneOf channels."""
"""Uses If, Elif, and Else branches, parameters passed to dsl.OneOf,
dsl.OneOf passed to a consumer task, and different output keys on
dsl.OneOf channels."""

@dsl.pipeline
def roll_die_pipeline():
Expand Down Expand Up @@ -5279,7 +5311,7 @@ def flip_coin_pipeline(execute_pipeline: bool):
print_task_2.outputs['a'])

def test_oneof_in_condition(self):
"""Tests that dsl.OneOf's channel can be consumed in a downstream group nested one level"""
"""Tests that dsl.OneOf's channel can be consumed in a downstream group nested one level."""

@dsl.pipeline
def roll_die_pipeline(repeat_on: str = 'Got heads!'):
Expand Down Expand Up @@ -5332,7 +5364,7 @@ def roll_die_pipeline(repeat_on: str = 'Got heads!'):
)

def test_consumed_in_nested_groups(self):
"""Tests that dsl.OneOf's channel can be consumed in a downstream group nested multiple levels"""
"""Tests that dsl.OneOf's channel can be consumed in a downstream group nested multiple levels."""

@dsl.pipeline
def roll_die_pipeline(
Expand Down
11 changes: 6 additions & 5 deletions sdk/python/kfp/compiler/pipeline_spec_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -2063,11 +2063,12 @@ def write_pipeline_spec_to_file(

def _merge_pipeline_config(pipelineConfig: pipeline_config.PipelineConfig,
platformSpec: pipeline_spec_pb2.PlatformSpec):
# TODO: add pipeline config options (ttl, semaphore, etc.) to the dict
# json_format.ParseDict(
# {'pipelineConfig': {
# '<some pipeline config option>': pipelineConfig.<get that value>,
# }}, platformSpec.platforms['kubernetes'])
pipeline_config_json = json_format.ParseDict(
{
'pipelineConfig': {
'resource_ttl': pipelineConfig.get_resource_ttl(),
}
}, platformSpec.platforms['kubernetes'])

return platformSpec

Expand Down
10 changes: 7 additions & 3 deletions sdk/python/kfp/dsl/pipeline_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@
class PipelineConfig:
"""PipelineConfig contains pipeline-level config options."""

def __init__(self):
pass
def __init__(self, ttl=None):
self.ttl = ttl

# TODO add pipeline level configs
def get_resource_ttl(self):
return self.ttl

def set_resource_ttl(self, ttl: int):
self.ttl = ttl
Loading