Skip to content

Commit 064c670

Browse files
committed
add PipelineConfig to DSL to re-implement pipeline-level config
KFP v1 supported setting pipeline-level configuration via a `PipelineConf` class. This class was deprecated and no replacement was added to KFP v2. add new PipelineConfig class to support setting pipeline-level configuration in KFP v2. Signed-off-by: Greg Sheremeta <gshereme@redhat.com>
1 parent 0d098db commit 064c670

File tree

8 files changed

+1462
-1330
lines changed

8 files changed

+1462
-1330
lines changed

api/v2alpha1/go/pipelinespec/pipeline_spec.pb.go

Lines changed: 1400 additions & 1323 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/v2alpha1/pipeline_spec.proto

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,9 @@ message PipelineSpec {
8080

8181
// Optional field. The default root output directory of the pipeline.
8282
string default_pipeline_root = 10;
83+
84+
// Contains pipeline-level config options. See PipelineConfig DSL class.
85+
PipelineConfig pipeline_config = 11;
8386
}
8487

8588
// Definition of a component.
@@ -1084,9 +1087,13 @@ message SinglePlatformSpec {
10841087
PlatformDeploymentConfig deployment_spec = 1;
10851088
}
10861089

1087-
10881090
message PlatformDeploymentConfig {
10891091
// Map of executor label to executor-level config
10901092
// Mirrors PipelineSpec.deployment_spec.executors structure
10911093
map<string, google.protobuf.Struct> executors = 1;
10921094
}
1095+
1096+
// Spec for pipeline-level config options. See PipelineConfig DSL class.
1097+
message PipelineConfig {
1098+
// TODO add pipeline-level configs
1099+
}

sdk/python/kfp/compiler/pipeline_spec_builder.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
from kfp.dsl import component_factory
2929
from kfp.dsl import for_loop
3030
from kfp.dsl import pipeline_channel
31+
from kfp.dsl import pipeline_config
3132
from kfp.dsl import pipeline_context
3233
from kfp.dsl import pipeline_task
3334
from kfp.dsl import placeholders
@@ -1850,13 +1851,15 @@ def create_pipeline_spec(
18501851
pipeline: pipeline_context.Pipeline,
18511852
component_spec: structures.ComponentSpec,
18521853
pipeline_outputs: Optional[Any] = None,
1854+
pipeline_config: pipeline_config.PipelineConfig = None,
18531855
) -> Tuple[pipeline_spec_pb2.PipelineSpec, pipeline_spec_pb2.PlatformSpec]:
18541856
"""Creates a pipeline spec object.
18551857
18561858
Args:
18571859
pipeline: The instantiated pipeline object.
18581860
component_spec: The component spec structures.
18591861
pipeline_outputs: The pipeline outputs via return.
1862+
pipeline_config: The pipeline config object.
18601863
18611864
Returns:
18621865
A PipelineSpec proto representing the compiled pipeline.
@@ -1874,6 +1877,10 @@ def create_pipeline_spec(
18741877
# Schema version 2.1.0 is required for kfp-pipeline-spec>0.1.13
18751878
pipeline_spec.schema_version = '2.1.0'
18761879

1880+
# pipeline-level config options
1881+
pipeline_config_spec = pipeline_spec_pb2.PipelineConfig()
1882+
# TODO add pipeline-level config options
1883+
18771884
pipeline_spec.root.CopyFrom(
18781885
_build_component_spec_from_component_spec_structure(component_spec))
18791886

@@ -1951,6 +1958,9 @@ def create_pipeline_spec(
19511958
dag_outputs=modified_pipeline_outputs_dict,
19521959
structures_component_spec=component_spec)
19531960

1961+
# add pipeline-level config options
1962+
pipeline_spec.pipeline_config.CopyFrom(pipeline_config_spec)
1963+
19541964
return pipeline_spec, platform_spec
19551965

19561966

sdk/python/kfp/dsl/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,7 @@ def my_pipeline():
264264
from kfp.dsl.for_loop import Collected
265265
from kfp.dsl.importer_node import importer
266266
from kfp.dsl.pipeline_channel import OneOf
267+
from kfp.dsl.pipeline_config import PipelineConfig
267268
from kfp.dsl.pipeline_context import pipeline
268269
from kfp.dsl.pipeline_task import PipelineTask
269270
from kfp.dsl.placeholders import ConcatPlaceholder
@@ -292,4 +293,5 @@ def my_pipeline():
292293
'IfPresentPlaceholder',
293294
'ConcatPlaceholder',
294295
'PipelineTask',
296+
'PipelineConfig',
295297
])

sdk/python/kfp/dsl/component_factory.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
from kfp.dsl import container_component_artifact_channel
2828
from kfp.dsl import container_component_class
2929
from kfp.dsl import graph_component
30+
from kfp.dsl import pipeline_config
3031
from kfp.dsl import placeholders
3132
from kfp.dsl import python_component
3233
from kfp.dsl import structures
@@ -658,6 +659,7 @@ def create_graph_component_from_func(
658659
name: Optional[str] = None,
659660
description: Optional[str] = None,
660661
display_name: Optional[str] = None,
662+
pipeline_config: pipeline_config.PipelineConfig = None,
661663
) -> graph_component.GraphComponent:
662664
"""Implementation for the @pipeline decorator.
663665
@@ -674,6 +676,7 @@ def create_graph_component_from_func(
674676
component_spec=component_spec,
675677
pipeline_func=func,
676678
display_name=display_name,
679+
pipeline_config=pipeline_config,
677680
)
678681

679682

sdk/python/kfp/dsl/graph_component.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from kfp.compiler import pipeline_spec_builder as builder
2121
from kfp.dsl import base_component
2222
from kfp.dsl import pipeline_channel
23+
from kfp.dsl import pipeline_config
2324
from kfp.dsl import pipeline_context
2425
from kfp.dsl import structures
2526
from kfp.pipeline_spec import pipeline_spec_pb2
@@ -37,9 +38,11 @@ def __init__(
3738
component_spec: structures.ComponentSpec,
3839
pipeline_func: Callable,
3940
display_name: Optional[str] = None,
41+
pipeline_config: pipeline_config.PipelineConfig = None,
4042
):
4143
super().__init__(component_spec=component_spec)
4244
self.pipeline_func = pipeline_func
45+
self.pipeline_config = pipeline_config
4346

4447
args_list = []
4548
signature = inspect.signature(pipeline_func)
@@ -69,6 +72,7 @@ def __init__(
6972
pipeline=dsl_pipeline,
7073
component_spec=self.component_spec,
7174
pipeline_outputs=pipeline_outputs,
75+
pipeline_config=pipeline_config,
7276
)
7377

7478
pipeline_root = getattr(pipeline_func, 'pipeline_root', None)

sdk/python/kfp/dsl/pipeline_config.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# Copyright 2024 The Kubeflow Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
"""Pipeline-level config options."""
15+
16+
17+
class PipelineConfig:
18+
"""PipelineConfig contains pipeline-level config options."""
19+
20+
def __init__(self):
21+
pass
22+
23+
# TODO add pipeline level configs

sdk/python/kfp/dsl/pipeline_context.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,20 @@
1717
from typing import Callable, Optional
1818

1919
from kfp.dsl import component_factory
20+
from kfp.dsl import pipeline_config
2021
from kfp.dsl import pipeline_task
2122
from kfp.dsl import tasks_group
2223
from kfp.dsl import utils
2324

2425

25-
def pipeline(func: Optional[Callable] = None,
26-
*,
27-
name: Optional[str] = None,
28-
description: Optional[str] = None,
29-
pipeline_root: Optional[str] = None,
30-
display_name: Optional[str] = None) -> Callable:
26+
def pipeline(
27+
func: Optional[Callable] = None,
28+
*,
29+
name: Optional[str] = None,
30+
description: Optional[str] = None,
31+
pipeline_root: Optional[str] = None,
32+
display_name: Optional[str] = None,
33+
pipeline_config: pipeline_config.PipelineConfig = None) -> Callable:
3134
"""Decorator used to construct a pipeline.
3235
3336
Example
@@ -49,6 +52,7 @@ def my_pipeline(a: str, b: int):
4952
pipeline_root: The root directory from which to read input and output
5053
parameters and artifacts.
5154
display_name: A human-readable name for the pipeline.
55+
pipeline_config: Pipeline-level config options.
5256
"""
5357
if func is None:
5458
return functools.partial(
@@ -57,6 +61,7 @@ def my_pipeline(a: str, b: int):
5761
description=description,
5862
pipeline_root=pipeline_root,
5963
display_name=display_name,
64+
pipeline_config=pipeline_config,
6065
)
6166

6267
if pipeline_root:
@@ -67,6 +72,7 @@ def my_pipeline(a: str, b: int):
6772
name=name,
6873
description=description,
6974
display_name=display_name,
75+
pipeline_config=pipeline_config,
7076
)
7177

7278

0 commit comments

Comments
 (0)