Skip to content

Commit a8374c2

Browse files
committed
feat(Backend + SDK): Update kfp backend and kubernetes sdk to support EmptyDir
Update kfp backend and kubernetes sdk to support mounting EmptyDir volumes to task pods. Inspired by #10427 Fixes: #10656 Signed-off-by: Greg Sheremeta <gshereme@redhat.com>
1 parent 9ba9bdd commit a8374c2

File tree

7 files changed

+482
-3
lines changed

7 files changed

+482
-3
lines changed

backend/src/v2/driver/driver.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
"google.golang.org/protobuf/encoding/protojson"
3737
"google.golang.org/protobuf/types/known/structpb"
3838
k8score "k8s.io/api/core/v1"
39+
"k8s.io/apimachinery/pkg/api/resource"
3940
k8sres "k8s.io/apimachinery/pkg/api/resource"
4041
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
4142
"k8s.io/client-go/kubernetes"
@@ -665,6 +666,33 @@ func extendPodSpecPatch(
665666
podSpec.Volumes = append(podSpec.Volumes, ephemeralVolume)
666667
podSpec.Containers[0].VolumeMounts = append(podSpec.Containers[0].VolumeMounts, ephemeralVolumeMount)
667668
}
669+
670+
// EmptyDirMounts
671+
for _, emptyDirVolumeSpec := range kubernetesExecutorConfig.GetEmptyDirMounts() {
672+
var sizeLimitResource *resource.Quantity
673+
if emptyDirVolumeSpec.GetSizeLimit() != "" {
674+
r := k8sres.MustParse(emptyDirVolumeSpec.GetSizeLimit())
675+
sizeLimitResource = &r
676+
}
677+
678+
emptyDirVolume := k8score.Volume{
679+
Name: emptyDirVolumeSpec.GetVolumeName(),
680+
VolumeSource: k8score.VolumeSource{
681+
EmptyDir: &k8score.EmptyDirVolumeSource{
682+
Medium: k8score.StorageMedium(emptyDirVolumeSpec.GetMedium()),
683+
SizeLimit: sizeLimitResource,
684+
},
685+
},
686+
}
687+
emptyDirVolumeMount := k8score.VolumeMount{
688+
Name: emptyDirVolumeSpec.GetVolumeName(),
689+
MountPath: emptyDirVolumeSpec.GetMountPath(),
690+
}
691+
692+
podSpec.Volumes = append(podSpec.Volumes, emptyDirVolume)
693+
podSpec.Containers[0].VolumeMounts = append(podSpec.Containers[0].VolumeMounts, emptyDirVolumeMount)
694+
}
695+
668696
return nil
669697
}
670698

backend/src/v2/driver/driver_test.go

Lines changed: 164 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@ package driver
1515

1616
import (
1717
"encoding/json"
18+
"testing"
19+
20+
"k8s.io/apimachinery/pkg/api/resource"
1821
k8sres "k8s.io/apimachinery/pkg/api/resource"
1922
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
20-
"testing"
2123

2224
"github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec"
2325
"github.com/kubeflow/pipelines/backend/src/v2/metadata"
@@ -532,7 +534,7 @@ func Test_extendPodSpecPatch_Secret(t *testing.T) {
532534
{
533535
Name: "secret1",
534536
VolumeSource: k8score.VolumeSource{
535-
Secret: &k8score.SecretVolumeSource{SecretName: "secret1", Optional: &[]bool{false}[0],},
537+
Secret: &k8score.SecretVolumeSource{SecretName: "secret1", Optional: &[]bool{false}[0]},
536538
},
537539
},
538540
},
@@ -730,7 +732,7 @@ func Test_extendPodSpecPatch_ConfigMap(t *testing.T) {
730732
VolumeSource: k8score.VolumeSource{
731733
ConfigMap: &k8score.ConfigMapVolumeSource{
732734
LocalObjectReference: k8score.LocalObjectReference{Name: "cm1"},
733-
Optional: &[]bool{false}[0],},
735+
Optional: &[]bool{false}[0]},
734736
},
735737
},
736738
},
@@ -890,6 +892,165 @@ func Test_extendPodSpecPatch_ConfigMap(t *testing.T) {
890892
}
891893
}
892894

895+
func Test_extendPodSpecPatch_EmptyVolumeMount(t *testing.T) {
896+
medium := "Memory"
897+
sizeLimit := "1Gi"
898+
var sizeLimitResource *resource.Quantity
899+
r := k8sres.MustParse(sizeLimit)
900+
sizeLimitResource = &r
901+
902+
tests := []struct {
903+
name string
904+
k8sExecCfg *kubernetesplatform.KubernetesExecutorConfig
905+
podSpec *k8score.PodSpec
906+
expected *k8score.PodSpec
907+
}{
908+
{
909+
"Valid - emptydir mount with no medium or size limit",
910+
&kubernetesplatform.KubernetesExecutorConfig{
911+
EmptyDirMounts: []*kubernetesplatform.EmptyDirMount{
912+
{
913+
VolumeName: "emptydir1",
914+
MountPath: "/data/path",
915+
},
916+
},
917+
},
918+
&k8score.PodSpec{
919+
Containers: []k8score.Container{
920+
{
921+
Name: "main",
922+
},
923+
},
924+
},
925+
&k8score.PodSpec{
926+
Containers: []k8score.Container{
927+
{
928+
Name: "main",
929+
VolumeMounts: []k8score.VolumeMount{
930+
{
931+
Name: "emptydir1",
932+
MountPath: "/data/path",
933+
},
934+
},
935+
},
936+
},
937+
Volumes: []k8score.Volume{
938+
{
939+
Name: "emptydir1",
940+
VolumeSource: k8score.VolumeSource{
941+
EmptyDir: &k8score.EmptyDirVolumeSource{},
942+
},
943+
},
944+
},
945+
},
946+
},
947+
{
948+
"Valid - emptydir mount with medium and size limit",
949+
&kubernetesplatform.KubernetesExecutorConfig{
950+
EmptyDirMounts: []*kubernetesplatform.EmptyDirMount{
951+
{
952+
VolumeName: "emptydir1",
953+
MountPath: "/data/path",
954+
Medium: &medium,
955+
SizeLimit: &sizeLimit,
956+
},
957+
},
958+
},
959+
&k8score.PodSpec{
960+
Containers: []k8score.Container{
961+
{
962+
Name: "main",
963+
},
964+
},
965+
},
966+
&k8score.PodSpec{
967+
Containers: []k8score.Container{
968+
{
969+
Name: "main",
970+
VolumeMounts: []k8score.VolumeMount{
971+
{
972+
Name: "emptydir1",
973+
MountPath: "/data/path",
974+
},
975+
},
976+
},
977+
},
978+
Volumes: []k8score.Volume{
979+
{
980+
Name: "emptydir1",
981+
VolumeSource: k8score.VolumeSource{
982+
EmptyDir: &k8score.EmptyDirVolumeSource{
983+
Medium: k8score.StorageMedium(medium),
984+
SizeLimit: sizeLimitResource,
985+
},
986+
},
987+
},
988+
},
989+
},
990+
},
991+
{
992+
"Valid - multiple emptydir mounts",
993+
&kubernetesplatform.KubernetesExecutorConfig{
994+
EmptyDirMounts: []*kubernetesplatform.EmptyDirMount{
995+
{
996+
VolumeName: "emptydir1",
997+
MountPath: "/data/path",
998+
},
999+
{
1000+
VolumeName: "emptydir2",
1001+
MountPath: "/data/path2",
1002+
},
1003+
},
1004+
},
1005+
&k8score.PodSpec{
1006+
Containers: []k8score.Container{
1007+
{
1008+
Name: "main",
1009+
},
1010+
},
1011+
},
1012+
&k8score.PodSpec{
1013+
Containers: []k8score.Container{
1014+
{
1015+
Name: "main",
1016+
VolumeMounts: []k8score.VolumeMount{
1017+
{
1018+
Name: "emptydir1",
1019+
MountPath: "/data/path",
1020+
},
1021+
{
1022+
Name: "emptydir2",
1023+
MountPath: "/data/path2",
1024+
},
1025+
},
1026+
},
1027+
},
1028+
Volumes: []k8score.Volume{
1029+
{
1030+
Name: "emptydir1",
1031+
VolumeSource: k8score.VolumeSource{
1032+
EmptyDir: &k8score.EmptyDirVolumeSource{},
1033+
},
1034+
},
1035+
{
1036+
Name: "emptydir2",
1037+
VolumeSource: k8score.VolumeSource{
1038+
EmptyDir: &k8score.EmptyDirVolumeSource{},
1039+
},
1040+
},
1041+
},
1042+
},
1043+
},
1044+
}
1045+
for _, tt := range tests {
1046+
t.Run(tt.name, func(t *testing.T) {
1047+
err := extendPodSpecPatch(tt.podSpec, tt.k8sExecCfg, nil, nil)
1048+
assert.Nil(t, err)
1049+
assert.Equal(t, tt.expected, tt.podSpec)
1050+
})
1051+
}
1052+
}
1053+
8931054
func Test_extendPodSpecPatch_ImagePullSecrets(t *testing.T) {
8941055
tests := []struct {
8951056
name string

kubernetes_platform/python/kfp/kubernetes/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
'add_toleration',
2323
'CreatePVC',
2424
'DeletePVC',
25+
'empty_dir_mount',
2526
'mount_pvc',
2627
'set_image_pull_policy',
2728
'use_field_path_as_env',
@@ -49,3 +50,4 @@
4950
from kfp.kubernetes.volume import CreatePVC
5051
from kfp.kubernetes.volume import DeletePVC
5152
from kfp.kubernetes.volume import mount_pvc
53+
from kfp.kubernetes.empty_dir import empty_dir_mount
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
# Copyright 2024 The Kubeflow Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from typing import Optional
16+
17+
from google.protobuf import json_format
18+
from kfp.dsl import PipelineTask
19+
from kfp.kubernetes import common
20+
from kfp.kubernetes import kubernetes_executor_config_pb2 as pb
21+
22+
23+
def empty_dir_mount(
24+
task: PipelineTask,
25+
volume_name: str,
26+
mount_path: str,
27+
medium: Optional[str] = None,
28+
size_limit: Optional[str] = None,
29+
) -> PipelineTask:
30+
"""Mount an EmptyDir volume to the task's container.
31+
32+
Args:
33+
task: Pipeline task.
34+
volume_name: Name of the EmptyDir volume.
35+
mount_path: Path within the container at which the EmptyDir should be mounted.
36+
medium: Storage medium to back the EmptyDir. Must be one of `Memory` or `HugePages`. Defaults to `None`.
37+
size_limit: Maximum size of the EmptyDir. For example, `5Gi`. Defaults to `None`.
38+
39+
Returns:
40+
Task object with updated EmptyDir mount configuration.
41+
"""
42+
43+
msg = common.get_existing_kubernetes_config_as_message(task)
44+
45+
empty_dir_mount = pb.EmptyDirMount(
46+
volume_name=volume_name,
47+
mount_path=mount_path,
48+
medium=medium,
49+
size_limit=size_limit,
50+
)
51+
52+
msg.empty_dir_mounts.append(empty_dir_mount)
53+
54+
task.platform_config['kubernetes'] = json_format.MessageToDict(msg)
55+
56+
return task
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
# Copyright 2024 The Kubeflow Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from kfp import dsl
16+
from kfp import kubernetes
17+
18+
19+
@dsl.component
20+
def comp():
21+
pass
22+
23+
@dsl.pipeline
24+
def my_pipeline():
25+
task = comp()
26+
kubernetes.empty_dir_mount(
27+
task,
28+
volume_name='emptydir-vol-1',
29+
mount_path='/mnt/my_vol_1',
30+
medium='Memory',
31+
size_limit='1Gi'
32+
)
33+
34+
if __name__ == '__main__':
35+
from kfp import compiler
36+
compiler.Compiler().compile(my_pipeline, __file__.replace('.py', '.yaml'))

0 commit comments

Comments
 (0)