Skip to content

Commit

Permalink
Fix enable_caching issues when handling PVC creation/deletion
Browse files Browse the repository at this point in the history
Signed-off-by: Ricardo M. Oliveira <rmartine@redhat.com>
  • Loading branch information
rimolive committed Jan 15, 2025
1 parent 60e4163 commit ac2c533
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 9 deletions.
16 changes: 10 additions & 6 deletions backend/src/v2/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1793,9 +1793,11 @@ func createPVC(
return "", createdExecution, pb.Execution_FAILED, fmt.Errorf("failed to get id from createdExecution")
}
*/
err = createCache(ctx, createdExecution, opts, taskStartedTime, fingerPrint, cacheClient)
if err != nil {
return "", createdExecution, pb.Execution_FAILED, fmt.Errorf("failed to create cache entrty for create pvc: %w", err)
if opts.Task.GetCachingOptions().GetEnableCache() {
err = createCache(ctx, createdExecution, opts, taskStartedTime, fingerPrint, cacheClient)
if err != nil {
return "", createdExecution, pb.Execution_FAILED, fmt.Errorf("failed to create cache entry for create pvc: %w", err)
}
}

return createdPVC.ObjectMeta.Name, createdExecution, pb.Execution_COMPLETE, nil
Expand Down Expand Up @@ -1902,9 +1904,11 @@ func deletePVC(
return createdExecution, pb.Execution_FAILED, fmt.Errorf("failed to get id from createdExecution")
}
*/
err = createCache(ctx, createdExecution, opts, taskStartedTime, fingerPrint, cacheClient)
if err != nil {
return createdExecution, pb.Execution_FAILED, fmt.Errorf("failed to create cache entrty for delete pvc: %w", err)
if opts.Task.GetCachingOptions().GetEnableCache() && ecfg.CachedMLMDExecutionID != "" {
err = createCache(ctx, createdExecution, opts, taskStartedTime, fingerPrint, cacheClient)
if err != nil {
return createdExecution, pb.Execution_FAILED, fmt.Errorf("failed to create cache entry for delete pvc: %w", err)
}
}

return createdExecution, pb.Execution_COMPLETE, nil
Expand Down
6 changes: 3 additions & 3 deletions samples/v2/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@
.PHONY: all
all:
@for f in $$(find . -name "*.py"); do \
compiled="$${f%.*}_pipeline.json"; \
compiled="$${f%.*}_pipeline.yaml"; \
echo "compiling $${f} to $${compiled}"; \
dsl-compile-v2 --py "$${f}" --out $${compiled}""; \
kfp dsl compile --py "$${f}" --output $${compiled}""; \
done

# clean up all genereated pipeline job JSON files
.PHONY: clean
clean:
@find . -name "*.json" -exec rm {} \;
@find . -name "*.yaml" -exec rm {} \;
68 changes: 68 additions & 0 deletions samples/v2/pipeline_with_volume_no_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# Copyright 2023 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Pipeline with no caching on volume creation, mount and deletion in v2 engine pipeline."""
from kfp import dsl
from kfp import kubernetes


@dsl.component
def producer() -> str:
with open('/data/file.txt', 'w') as file:
file.write('Hello world')
with open('/data/file.txt', 'r') as file:
content = file.read()
print(content)
return content


@dsl.component
def consumer() -> str:
with open('/data/file.txt', 'r') as file:
content = file.read()
print(content)
return content


@dsl.pipeline
def pipeline_with_volume_no_cache():
pvc1 = kubernetes.CreatePVC(
pvc_name_suffix='-my-pvc',
access_modes=['ReadWriteOnce'],
size='5Mi',
storage_class_name='standard',
).set_caching_options(False)

task1 = producer()
task2 = consumer().after(task1)

kubernetes.mount_pvc(
task1,
pvc_name=pvc1.outputs['name'],
mount_path='/data',
)
kubernetes.mount_pvc(
task2,
pvc_name=pvc1.outputs['name'],
mount_path='/data',
)

delete_pvc1 = kubernetes.DeletePVC(
pvc_name=pvc1.outputs['name']).after(task2).set_caching_options(False)

if __name__ == '__main__':
# execute only if run as a script
from kfp import compiler
compiler.Compiler().compile(
pipeline_func=pipeline_with_volume,
package_path='pipeline_with_volume.json')
37 changes: 37 additions & 0 deletions samples/v2/pipeline_with_volume_no_cache_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Copyright 2021 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations

import unittest

from kfp.samples.test.utils import KfpTask
from kfp.samples.test.utils import run_pipeline_func
from kfp.samples.test.utils import TestCase
import kfp_server_api

from .pipeline_with_volume import pipeline_with_volume_no_cache


def verify(t: unittest.TestCase, run: kfp_server_api.ApiRun,
tasks: dict[str, KfpTask], **kwargs):
t.assertEqual(run.status, 'Succeeded')


if __name__ == '__main__':
run_pipeline_func([
TestCase(
pipeline_func=pipeline_with_volume_no_cache,
verify_func=verify,
),
])

0 comments on commit ac2c533

Please sign in to comment.