|
| 1 | +# Copyright 2021 The Kubeflow Authors |
| 2 | +# |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | +# you may not use this file except in compliance with the License. |
| 5 | +# You may obtain a copy of the License at |
| 6 | +# |
| 7 | +# http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | +# |
| 9 | +# Unless required by applicable law or agreed to in writing, software |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | +# See the License for the specific language governing permissions and |
| 13 | +# limitations under the License. |
| 14 | + |
| 15 | +# %% |
| 16 | +import os |
| 17 | +from typing import Dict, List |
| 18 | +import json |
| 19 | +import yaml |
| 20 | +from kubernetes import client as k8s_client |
| 21 | +import kfp.deprecated as kfp |
| 22 | + |
| 23 | +download_gcs_tgz = kfp.components.load_component_from_file( |
| 24 | + 'components/download_gcs_tgz.yaml') |
| 25 | +run_sample = kfp.components.load_component_from_file( |
| 26 | + 'components/run_sample.yaml') |
| 27 | +kaniko = kfp.components.load_component_from_file('components/kaniko.yaml') |
| 28 | +build_go = kfp.components.load_component_from_file('components/build_go.yaml') |
| 29 | + |
| 30 | +_MINUTE = 60 # seconds |
| 31 | + |
| 32 | + |
| 33 | +@kfp.dsl.pipeline(name='v2 sample test') |
| 34 | +def v2_sample_test( |
| 35 | + samples_config: List[Dict] = [ |
| 36 | + { # TODO(Bobgy): why is the default value needed to pass argo lint? |
| 37 | + 'name': 'example', |
| 38 | + 'path': 'samples.v2.hello_world_test' |
| 39 | + } |
| 40 | + ], |
| 41 | + context: 'URI' = 'gs://your-bucket/path/to/context.tar.gz', |
| 42 | + gcs_root: 'URI' = 'gs://ml-pipeline-test/v2', |
| 43 | + image_registry: 'URI' = 'gcr.io/ml-pipeline-test', |
| 44 | + kfp_host: 'URI' = 'http://ml-pipeline:8888', |
| 45 | + kfp_package_path: |
| 46 | + 'URI' = 'git+https://github.com/kubeflow/pipelines#egg=kfp&subdirectory=sdk/python' |
| 47 | +): |
| 48 | + download_src_op = download_gcs_tgz(gcs_path=context).set_cpu_limit( |
| 49 | + '0.5').set_memory_limit('500Mi').set_display_name('download_src') |
| 50 | + download_src_op.execution_options.caching_strategy.max_cache_staleness = "P0D" |
| 51 | + |
| 52 | + def build_image(name: str, dockerfile: str) -> kfp.dsl.ContainerOp: |
| 53 | + task: kfp.dsl.ContainerOp = kaniko( |
| 54 | + context_artifact=download_src_op.outputs['folder'], |
| 55 | + destination=f'{image_registry}/{name}', |
| 56 | + dockerfile=dockerfile, |
| 57 | + ) |
| 58 | + # CPU request/limit can be more flexible (request < limit), because being assigned to a node |
| 59 | + # with insufficient CPU resource will only slow the task down, but not fail. |
| 60 | + task.container.set_cpu_request('1').set_cpu_limit('2') |
| 61 | + # Memory request/limit needs to be more rigid (request == limit), because in a node without |
| 62 | + # enough memory, the task can hang indefinetely or OOM. |
| 63 | + task.container.set_memory_request('4Gi').set_memory_limit('4Gi') |
| 64 | + task.set_display_name(f'build-image-{name}') |
| 65 | + task.set_retry( |
| 66 | + 1, policy='Always' |
| 67 | + ) # Always -> retry on both system error and user code failure. |
| 68 | + return task |
| 69 | + |
| 70 | + # build v2 go images |
| 71 | + build_go_op = build_go( |
| 72 | + destination=f'{image_registry}/kfp-', |
| 73 | + context=download_src_op.outputs['folder'], |
| 74 | + ) |
| 75 | + build_go_op.set_retry(1, policy='Always') |
| 76 | + build_go_op.container.set_cpu_request('1').set_cpu_limit('2') |
| 77 | + build_go_op.container.set_memory_request('4Gi').set_memory_limit('4Gi') |
| 78 | + |
| 79 | + # build sample test image |
| 80 | + build_samples_image_op = build_image( |
| 81 | + name='v2-sample-test', |
| 82 | + dockerfile='backend/src/v2/test/Dockerfile', |
| 83 | + ) |
| 84 | + |
| 85 | + # run test samples in parallel |
| 86 | + with kfp.dsl.ParallelFor(samples_config) as sample: |
| 87 | + run_sample_op: kfp.dsl.ContainerOp = run_sample( |
| 88 | + name=sample.name, |
| 89 | + sample_path=sample.path, |
| 90 | + gcs_root=gcs_root, |
| 91 | + external_host=kfp_host, |
| 92 | + launcher_v2_image=build_go_op.outputs['digest_launcher_v2'], |
| 93 | + driver_image=build_go_op.outputs['digest_driver'], |
| 94 | + backend_compiler=build_go_op.outputs['backend_compiler'], |
| 95 | + ) |
| 96 | + run_sample_op.container.image = build_samples_image_op.outputs['digest'] |
| 97 | + run_sample_op.set_display_name(f'sample_{sample.name}') |
| 98 | + run_sample_op.set_retry(1, policy='Always') |
| 99 | + |
| 100 | + run_sample_op.container.add_env_variable( |
| 101 | + k8s_client.V1EnvVar( |
| 102 | + name='KFP_PACKAGE_PATH', value=kfp_package_path)) |
| 103 | + |
| 104 | + |
| 105 | +def main( |
| 106 | + context: str, |
| 107 | + gcr_root: str, |
| 108 | + gcs_root: str, |
| 109 | + experiment: str = 'v2_sample_test', |
| 110 | + timeout_mins: float = 40, |
| 111 | + kfp_package_path: |
| 112 | + str = 'git+https://github.com/kubeflow/pipelines#egg=kfp&subdirectory=sdk/python', |
| 113 | + samples_config: str = os.path.join('samples', 'test', 'config.yaml'), |
| 114 | +): |
| 115 | + REPO_ROOT = os.path.join('..', '..', '..', '..') |
| 116 | + samples_config_path = os.path.join(REPO_ROOT, samples_config) |
| 117 | + samples_config_content = None |
| 118 | + with open(samples_config_path, 'r') as stream: |
| 119 | + samples_config_content = yaml.safe_load(stream) |
| 120 | + |
| 121 | + client = kfp.Client() |
| 122 | + # TODO(Bobgy): avoid using private fields when getting loaded config |
| 123 | + host = client._existing_config.host |
| 124 | + client.create_experiment( |
| 125 | + name=experiment, |
| 126 | + description='An experiment with Kubeflow Pipelines v2 sample test runs.' |
| 127 | + ) |
| 128 | + conf = kfp.dsl.PipelineConf() |
| 129 | + conf.set_timeout( |
| 130 | + timeout_mins * _MINUTE |
| 131 | + ) # add timeout to avoid pipelines stuck in running leak indefinetely |
| 132 | + |
| 133 | + print('Using KFP package path: {}'.format(kfp_package_path)) |
| 134 | + run_result = client.create_run_from_pipeline_func( |
| 135 | + v2_sample_test, |
| 136 | + { |
| 137 | + 'samples_config': samples_config_content, |
| 138 | + 'context': context, |
| 139 | + 'image_registry': f'{gcr_root}/test', |
| 140 | + 'gcs_root': gcs_root, |
| 141 | + 'kfp_host': host, |
| 142 | + 'kfp_package_path': kfp_package_path, |
| 143 | + }, |
| 144 | + experiment_name=experiment, |
| 145 | + pipeline_conf=conf, |
| 146 | + ) |
| 147 | + print("Run details page URL:") |
| 148 | + print(f"{host}/#/runs/details/{run_result.run_id}") |
| 149 | + run_response = run_result.wait_for_run_completion(timeout_mins * _MINUTE) |
| 150 | + run = run_response.run |
| 151 | + from pprint import pprint |
| 152 | + # Hide verbose content |
| 153 | + run_response.run.pipeline_spec.workflow_manifest = None |
| 154 | + pprint(run_response.run) |
| 155 | + print("Run details page URL:") |
| 156 | + print(f"{host}/#/runs/details/{run_result.run_id}") |
| 157 | + assert run.status == 'Succeeded' |
| 158 | + # TODO(Bobgy): print debug info |
| 159 | + |
| 160 | + |
| 161 | +# %% |
| 162 | +if __name__ == "__main__": |
| 163 | + import fire |
| 164 | + fire.Fire(main) |
0 commit comments