Skip to content

Commit 7ebe489

Browse files
committed
Fix enable_caching issues when handling PVC creation/deletion
Signed-off-by: Ricardo M. Oliveira <rmartine@redhat.com>
1 parent 60e4163 commit 7ebe489

File tree

4 files changed

+117
-8
lines changed

4 files changed

+117
-8
lines changed

backend/src/v2/driver/driver.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1793,9 +1793,11 @@ func createPVC(
17931793
return "", createdExecution, pb.Execution_FAILED, fmt.Errorf("failed to get id from createdExecution")
17941794
}
17951795
*/
1796-
err = createCache(ctx, createdExecution, opts, taskStartedTime, fingerPrint, cacheClient)
1797-
if err != nil {
1798-
return "", createdExecution, pb.Execution_FAILED, fmt.Errorf("failed to create cache entrty for create pvc: %w", err)
1796+
if opts.Task.GetCachingOptions().GetEnableCache() {
1797+
err = createCache(ctx, createdExecution, opts, taskStartedTime, fingerPrint, cacheClient)
1798+
if err != nil {
1799+
return "", createdExecution, pb.Execution_FAILED, fmt.Errorf("failed to create cache entry for create pvc: %w", err)
1800+
}
17991801
}
18001802

18011803
return createdPVC.ObjectMeta.Name, createdExecution, pb.Execution_COMPLETE, nil
@@ -1902,9 +1904,11 @@ func deletePVC(
19021904
return createdExecution, pb.Execution_FAILED, fmt.Errorf("failed to get id from createdExecution")
19031905
}
19041906
*/
1905-
err = createCache(ctx, createdExecution, opts, taskStartedTime, fingerPrint, cacheClient)
1906-
if err != nil {
1907-
return createdExecution, pb.Execution_FAILED, fmt.Errorf("failed to create cache entrty for delete pvc: %w", err)
1907+
if opts.Task.GetCachingOptions().GetEnableCache() && ecfg.CachedMLMDExecutionID != "" {
1908+
err = createCache(ctx, createdExecution, opts, taskStartedTime, fingerPrint, cacheClient)
1909+
if err != nil {
1910+
return createdExecution, pb.Execution_FAILED, fmt.Errorf("failed to create cache entry for delete pvc: %w", err)
1911+
}
19081912
}
19091913

19101914
return createdExecution, pb.Execution_COMPLETE, nil

samples/v2/Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@
1616
.PHONY: all
1717
all:
1818
@for f in $$(find . -name "*.py"); do \
19-
compiled="$${f%.*}_pipeline.json"; \
19+
compiled="$${f%.*}_pipeline.yaml"; \
2020
echo "compiling $${f} to $${compiled}"; \
21-
dsl-compile-v2 --py "$${f}" --out $${compiled}""; \
21+
kfp dsl compile --py "$${f}" --output $${compiled}""; \
2222
done
2323

2424
# clean up all genereated pipeline job JSON files
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
# Copyright 2023 The Kubeflow Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
"""Pipeline with volume creation, mount and deletion in v2 engine pipeline."""
15+
from kfp import dsl
16+
from kfp import kubernetes
17+
18+
19+
@dsl.component
20+
def producer() -> str:
21+
with open('/data/file.txt', 'w') as file:
22+
file.write('Hello world')
23+
with open('/data/file.txt', 'r') as file:
24+
content = file.read()
25+
print(content)
26+
return content
27+
28+
29+
@dsl.component
30+
def consumer() -> str:
31+
with open('/data/file.txt', 'r') as file:
32+
content = file.read()
33+
print(content)
34+
return content
35+
36+
37+
@dsl.pipeline
38+
def pipeline_with_volume_no_cache():
39+
pvc1 = kubernetes.CreatePVC(
40+
pvc_name_suffix='-my-pvc',
41+
access_modes=['ReadWriteOnce'],
42+
size='5Mi',
43+
storage_class_name='standard',
44+
).set_caching_options(False)
45+
46+
task1 = producer()
47+
task2 = consumer().after(task1)
48+
49+
kubernetes.mount_pvc(
50+
task1,
51+
pvc_name=pvc1.outputs['name'],
52+
mount_path='/data',
53+
)
54+
kubernetes.mount_pvc(
55+
task2,
56+
pvc_name=pvc1.outputs['name'],
57+
mount_path='/data',
58+
)
59+
60+
delete_pvc1 = kubernetes.DeletePVC(
61+
pvc_name=pvc1.outputs['name']).after(task2)
62+
63+
if __name__ == '__main__':
64+
# execute only if run as a script
65+
from kfp import compiler
66+
compiler.Compiler().compile(
67+
pipeline_func=pipeline_with_volume,
68+
package_path='pipeline_with_volume.json')
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
# Copyright 2021 The Kubeflow Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
from __future__ import annotations
15+
16+
import unittest
17+
18+
from kfp.samples.test.utils import KfpTask
19+
from kfp.samples.test.utils import run_pipeline_func
20+
from kfp.samples.test.utils import TestCase
21+
import kfp_server_api
22+
23+
from .pipeline_with_volume import pipeline_with_volume_no_cache
24+
25+
26+
def verify(t: unittest.TestCase, run: kfp_server_api.ApiRun,
27+
tasks: dict[str, KfpTask], **kwargs):
28+
t.assertEqual(run.status, 'Succeeded')
29+
30+
31+
if __name__ == '__main__':
32+
run_pipeline_func([
33+
TestCase(
34+
pipeline_func=pipeline_with_volume_no_cache,
35+
verify_func=verify,
36+
),
37+
])

0 commit comments

Comments
 (0)