diff --git a/backend/src/v2/driver/driver.go b/backend/src/v2/driver/driver.go
index c701aadc2ee..0be993d5545 100644
--- a/backend/src/v2/driver/driver.go
+++ b/backend/src/v2/driver/driver.go
@@ -1793,9 +1793,11 @@ func createPVC(
 			return "", createdExecution, pb.Execution_FAILED, fmt.Errorf("failed to get id from createdExecution")
 		}
 	*/
-	err = createCache(ctx, createdExecution, opts, taskStartedTime, fingerPrint, cacheClient)
-	if err != nil {
-		return "", createdExecution, pb.Execution_FAILED, fmt.Errorf("failed to create cache entrty for create pvc: %w", err)
+	if opts.Task.GetCachingOptions().GetEnableCache() {
+		err = createCache(ctx, createdExecution, opts, taskStartedTime, fingerPrint, cacheClient)
+		if err != nil {
+			return "", createdExecution, pb.Execution_FAILED, fmt.Errorf("failed to create cache entry for create pvc: %w", err)
+		}
 	}
 
 	return createdPVC.ObjectMeta.Name, createdExecution, pb.Execution_COMPLETE, nil
@@ -1902,9 +1904,11 @@ func deletePVC(
 			return createdExecution, pb.Execution_FAILED, fmt.Errorf("failed to get id from createdExecution")
 		}
 	*/
-	err = createCache(ctx, createdExecution, opts, taskStartedTime, fingerPrint, cacheClient)
-	if err != nil {
-		return createdExecution, pb.Execution_FAILED, fmt.Errorf("failed to create cache entrty for delete pvc: %w", err)
+	if opts.Task.GetCachingOptions().GetEnableCache() && ecfg.CachedMLMDExecutionID != "" {
+		err = createCache(ctx, createdExecution, opts, taskStartedTime, fingerPrint, cacheClient)
+		if err != nil {
+			return createdExecution, pb.Execution_FAILED, fmt.Errorf("failed to create cache entry for delete pvc: %w", err)
+		}
 	}
 
 	return createdExecution, pb.Execution_COMPLETE, nil
diff --git a/samples/v2/Makefile b/samples/v2/Makefile
index e348b64fc32..72015474d2b 100644
--- a/samples/v2/Makefile
+++ b/samples/v2/Makefile
@@ -16,12 +16,12 @@
 .PHONY: all
 all:
 	@for f in $$(find . -name "*.py"); do \
-	    compiled="$${f%.*}_pipeline.json"; \
+	    compiled="$${f%.*}_pipeline.yaml"; \
 	    echo "compiling $${f} to $${compiled}"; \
-	    dsl-compile-v2 --py "$${f}" --out $${compiled}""; \
+	    kfp dsl compile --py "$${f}" --output $${compiled}""; \
 	done
 
 # clean up all genereated pipeline job JSON files
 .PHONY: clean
 clean:
-	@find . -name "*.json" -exec rm {} \;
+	@find . -name "*.yaml" -exec rm {} \;
diff --git a/samples/v2/pipeline_with_volume_no_cache.py b/samples/v2/pipeline_with_volume_no_cache.py
new file mode 100644
index 00000000000..762d8efa66d
--- /dev/null
+++ b/samples/v2/pipeline_with_volume_no_cache.py
@@ -0,0 +1,68 @@
+# Copyright 2023 The Kubeflow Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Pipeline with no caching on volume creation, mount and deletion in v2 engine pipeline."""
+from kfp import dsl
+from kfp import kubernetes
+
+
+@dsl.component
+def producer() -> str:
+    with open('/data/file.txt', 'w') as file:
+        file.write('Hello world')
+    with open('/data/file.txt', 'r') as file:
+        content = file.read()
+    print(content)
+    return content
+
+
+@dsl.component
+def consumer() -> str:
+    with open('/data/file.txt', 'r') as file:
+        content = file.read()
+    print(content)
+    return content
+
+
+@dsl.pipeline
+def pipeline_with_volume_no_cache():
+    pvc1 = kubernetes.CreatePVC(
+        pvc_name_suffix='-my-pvc',
+        access_modes=['ReadWriteOnce'],
+        size='5Mi',
+        storage_class_name='standard',
+    ).set_caching_options(False)
+
+    task1 = producer()
+    task2 = consumer().after(task1)
+
+    kubernetes.mount_pvc(
+        task1,
+        pvc_name=pvc1.outputs['name'],
+        mount_path='/data',
+    )
+    kubernetes.mount_pvc(
+        task2,
+        pvc_name=pvc1.outputs['name'],
+        mount_path='/data',
+    )
+
+    delete_pvc1 = kubernetes.DeletePVC(
+        pvc_name=pvc1.outputs['name']).after(task2).set_caching_options(False)
+
+if __name__ == '__main__':
+    # execute only if run as a script
+    from kfp import compiler
+    compiler.Compiler().compile(
+        pipeline_func=pipeline_with_volume_no_cache,
+        package_path='pipeline_with_volume_no_cache.yaml')
\ No newline at end of file
diff --git a/samples/v2/pipeline_with_volume_no_cache_test.py b/samples/v2/pipeline_with_volume_no_cache_test.py
new file mode 100644
index 00000000000..b14caf73c4f
--- /dev/null
+++ b/samples/v2/pipeline_with_volume_no_cache_test.py
@@ -0,0 +1,37 @@
+# Copyright 2021 The Kubeflow Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from __future__ import annotations
+
+import unittest
+
+from kfp.samples.test.utils import KfpTask
+from kfp.samples.test.utils import run_pipeline_func
+from kfp.samples.test.utils import TestCase
+import kfp_server_api
+
+from .pipeline_with_volume_no_cache import pipeline_with_volume_no_cache
+
+
+def verify(t: unittest.TestCase, run: kfp_server_api.ApiRun,
+           tasks: dict[str, KfpTask], **kwargs):
+    t.assertEqual(run.status, 'Succeeded')
+
+
+if __name__ == '__main__':
+    run_pipeline_func([
+        TestCase(
+            pipeline_func=pipeline_with_volume_no_cache,
+            verify_func=verify,
+        ),
+    ])