From c5546a0f0e192e76da0bb55edcee5a470d692ffc Mon Sep 17 00:00:00 2001 From: Jiri Petrlik Date: Tue, 19 Nov 2024 12:55:32 +0100 Subject: [PATCH] RHOAIENG-12192 - Extend DSP e2e tests --- .../v2/cache-disabled/ray_integration.py | 6 +- .../ray_integration_compiled.yaml | 10 +- .../v2/cache-disabled/ray_job_integration.py | 377 ++++++++++++++++++ .../ray_job_integration_compiled.yaml | 262 ++++++++++++ .../test-pipelines-integration.robot | 127 ++++++ .../1101__data-science-pipelines-kfp.robot | 21 - 6 files changed, 774 insertions(+), 29 deletions(-) create mode 100644 ods_ci/tests/Resources/Files/pipeline-samples/v2/cache-disabled/ray_job_integration.py create mode 100644 ods_ci/tests/Resources/Files/pipeline-samples/v2/cache-disabled/ray_job_integration_compiled.yaml create mode 100644 ods_ci/tests/Tests/0600__distributed_workloads/0601__workloads_orchestration/test-pipelines-integration.robot diff --git a/ods_ci/tests/Resources/Files/pipeline-samples/v2/cache-disabled/ray_integration.py b/ods_ci/tests/Resources/Files/pipeline-samples/v2/cache-disabled/ray_integration.py index d59b619a9..08bb09a24 100644 --- a/ods_ci/tests/Resources/Files/pipeline-samples/v2/cache-disabled/ray_integration.py +++ b/ods_ci/tests/Resources/Files/pipeline-samples/v2/cache-disabled/ray_integration.py @@ -6,11 +6,11 @@ # image and the sdk has a fixed value because the version matters -@dsl.component(packages_to_install=["codeflare-sdk==0.21.1"], base_image=common_base_image) +@dsl.component(packages_to_install=["codeflare-sdk==v0.24.0"], base_image=common_base_image) def ray_fn() -> int: import ray # noqa: PLC0415 from codeflare_sdk import generate_cert # noqa: PLC0415 - from codeflare_sdk.cluster.cluster import Cluster, ClusterConfiguration # noqa: PLC0415 + from codeflare_sdk.ray.cluster import Cluster, ClusterConfiguration # noqa: PLC0415 cluster = Cluster( ClusterConfiguration( @@ -24,7 +24,7 @@ def ray_fn() -> int: worker_cpu_limits=1, worker_memory_requests=1, worker_memory_limits=2, - image="quay.io/modh/ray:2.35.0-py39-cu121", + image="quay.io/modh/ray@sha256:0d715f92570a2997381b7cafc0e224cfa25323f18b9545acfd23bc2b71576d06", verify_tls=False ) ) diff --git a/ods_ci/tests/Resources/Files/pipeline-samples/v2/cache-disabled/ray_integration_compiled.yaml b/ods_ci/tests/Resources/Files/pipeline-samples/v2/cache-disabled/ray_integration_compiled.yaml index 42bf5f227..1e5d8a03d 100644 --- a/ods_ci/tests/Resources/Files/pipeline-samples/v2/cache-disabled/ray_integration_compiled.yaml +++ b/ods_ci/tests/Resources/Files/pipeline-samples/v2/cache-disabled/ray_integration_compiled.yaml @@ -22,9 +22,9 @@ deploymentSpec: - -c - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.10.1'\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.11.0'\ \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ - \ python3 -m pip install --quiet --no-warn-script-location 'codeflare-sdk==0.21.1'\ + \ python3 -m pip install --quiet --no-warn-script-location 'codeflare-sdk==v0.24.0'\ \ && \"$0\" \"$@\"\n" - sh - -ec @@ -38,14 +38,14 @@ deploymentSpec: ' - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ \ *\n\ndef ray_fn() -> int:\n import ray # noqa: PLC0415\n from codeflare_sdk\ - \ import generate_cert # noqa: PLC0415\n from codeflare_sdk.cluster.cluster\ + \ import generate_cert # noqa: PLC0415\n from codeflare_sdk.ray.cluster\ \ import Cluster, ClusterConfiguration # noqa: PLC0415\n\n cluster =\ \ Cluster(\n ClusterConfiguration(\n name=\"raytest\"\ ,\n num_workers=1,\n head_cpu_requests=1,\n \ \ head_cpu_limits=1,\n head_memory_requests=4,\n \ \ head_memory_limits=4,\n worker_cpu_requests=1,\n \ \ worker_cpu_limits=1,\n worker_memory_requests=1,\n \ - \ worker_memory_limits=2,\n image=\"quay.io/modh/ray:2.35.0-py39-cu121\"\ + \ worker_memory_limits=2,\n image=\"quay.io/modh/ray@sha256:0d715f92570a2997381b7cafc0e224cfa25323f18b9545acfd23bc2b71576d06\"\ ,\n verify_tls=False\n )\n )\n\n # always clean\ \ the resources\n cluster.down()\n print(cluster.status())\n cluster.up()\n\ \ cluster.wait_ready()\n print(cluster.status())\n print(cluster.details())\n\ @@ -76,4 +76,4 @@ root: taskInfo: name: ray-fn schemaVersion: 2.1.0 -sdkVersion: kfp-2.10.1 +sdkVersion: kfp-2.11.0 diff --git a/ods_ci/tests/Resources/Files/pipeline-samples/v2/cache-disabled/ray_job_integration.py b/ods_ci/tests/Resources/Files/pipeline-samples/v2/cache-disabled/ray_job_integration.py new file mode 100644 index 000000000..bb6ce5935 --- /dev/null +++ b/ods_ci/tests/Resources/Files/pipeline-samples/v2/cache-disabled/ray_job_integration.py @@ -0,0 +1,377 @@ +from kfp import compiler, dsl + +common_base_image = ( + "registry.redhat.io/ubi8/python-39@sha256:3523b184212e1f2243e76d8094ab52b01ea3015471471290d011625e1763af61" +) + +# image and the sdk has a fixed value because the version matters +@dsl.component(packages_to_install=["codeflare-sdk==v0.24.0"], base_image=common_base_image) +def ray_fn( + AWS_DEFAULT_ENDPOINT: str, + AWS_STORAGE_BUCKET: str, + AWS_ACCESS_KEY_ID: str, + AWS_SECRET_ACCESS_KEY: str, + AWS_STORAGE_BUCKET_MNIST_DIR: str +) -> None: + import openshift + import subprocess + import ray # noqa: PLC0415 + import tempfile + from codeflare_sdk import generate_cert # noqa: PLC0415 + from codeflare_sdk.ray.cluster import Cluster, ClusterConfiguration # noqa: PLC0415 + from codeflare_sdk.ray.client import RayJobClient + from time import sleep + + training_script = """ +import os + +import torch +import requests +from pytorch_lightning import LightningModule, Trainer +from pytorch_lightning.callbacks.progress import TQDMProgressBar +from torch import nn +from torch.nn import functional as F +from torch.utils.data import DataLoader, random_split, RandomSampler +from torchmetrics import Accuracy +from torchvision import transforms +from torchvision.datasets import MNIST +import gzip +import shutil +from minio import Minio + +PATH_DATASETS = os.environ.get("PATH_DATASETS", ".") +BATCH_SIZE = 256 if torch.cuda.is_available() else 64 + +local_mnist_path = os.path.dirname(os.path.abspath(__file__)) + +print("prior to running the trainer") +print("MASTER_ADDR: is ", os.getenv("MASTER_ADDR")) +print("MASTER_PORT: is ", os.getenv("MASTER_PORT")) + +STORAGE_BUCKET_EXISTS = "AWS_DEFAULT_ENDPOINT" in os.environ +print("STORAGE_BUCKET_EXISTS: ", STORAGE_BUCKET_EXISTS) + +print(f'Storage_Bucket_Default_Endpoint : is {os.environ.get("AWS_DEFAULT_ENDPOINT")}' if "AWS_DEFAULT_ENDPOINT" in os.environ else "") +print(f'Storage_Bucket_Name : is {os.environ.get("AWS_STORAGE_BUCKET")}' if "AWS_STORAGE_BUCKET" in os.environ else "") +print(f'Storage_Bucket_Mnist_Directory : is {os.environ.get("AWS_STORAGE_BUCKET_MNIST_DIR")}' if "AWS_STORAGE_BUCKET_MNIST_DIR" in os.environ else "") + + +class LitMNIST(LightningModule): + def __init__(self, data_dir=PATH_DATASETS, hidden_size=64, learning_rate=2e-4): + super().__init__() + + # Set our init args as class attributes + self.data_dir = data_dir + self.hidden_size = hidden_size + self.learning_rate = learning_rate + + # Hardcode some dataset specific attributes + self.num_classes = 10 + self.dims = (1, 28, 28) + channels, width, height = self.dims + self.transform = transforms.Compose( + [ + transforms.ToTensor(), + transforms.Normalize((0.1307,), (0.3081,)), + ] + ) + + # Define PyTorch model + self.model = nn.Sequential( + nn.Flatten(), + nn.Linear(channels * width * height, hidden_size), + nn.ReLU(), + nn.Dropout(0.1), + nn.Linear(hidden_size, hidden_size), + nn.ReLU(), + nn.Dropout(0.1), + nn.Linear(hidden_size, self.num_classes), + ) + + self.val_accuracy = Accuracy(task="multiclass", num_classes=10) + self.test_accuracy = Accuracy(task="multiclass", num_classes=10) + + def forward(self, x): + x = self.model(x) + return F.log_softmax(x, dim=1) + + def training_step(self, batch, batch_idx): + x, y = batch + logits = self(x) + loss = F.nll_loss(logits, y) + return loss + + def validation_step(self, batch, batch_idx): + x, y = batch + logits = self(x) + loss = F.nll_loss(logits, y) + preds = torch.argmax(logits, dim=1) + self.val_accuracy.update(preds, y) + + # Calling self.log will surface up scalars for you in TensorBoard + self.log("val_loss", loss, prog_bar=True) + self.log("val_acc", self.val_accuracy, prog_bar=True) + + def test_step(self, batch, batch_idx): + x, y = batch + logits = self(x) + loss = F.nll_loss(logits, y) + preds = torch.argmax(logits, dim=1) + self.test_accuracy.update(preds, y) + + # Calling self.log will surface up scalars for you in TensorBoard + self.log("test_loss", loss, prog_bar=True) + self.log("test_acc", self.test_accuracy, prog_bar=True) + + def configure_optimizers(self): + optimizer = torch.optim.Adam(self.parameters(), lr=self.learning_rate) + return optimizer + + #################### + # DATA RELATED HOOKS + #################### + + def prepare_data(self): + # download + print("Downloading MNIST dataset...") + + if ( + STORAGE_BUCKET_EXISTS + and os.environ.get("AWS_DEFAULT_ENDPOINT") != "" + and os.environ.get("AWS_DEFAULT_ENDPOINT") != None + ): + print("Using storage bucket to download datasets...") + + dataset_dir = os.path.join(self.data_dir, "MNIST/raw") + endpoint = os.environ.get("AWS_DEFAULT_ENDPOINT") + access_key = os.environ.get("AWS_ACCESS_KEY_ID") + secret_key = os.environ.get("AWS_SECRET_ACCESS_KEY") + bucket_name = os.environ.get("AWS_STORAGE_BUCKET") + + # remove prefix if specified in storage bucket endpoint url + secure = True + if endpoint.startswith("https://"): + endpoint = endpoint[len("https://") :] + elif endpoint.startswith("http://"): + endpoint = endpoint[len("http://") :] + secure = False + + client = Minio( + endpoint, + access_key=access_key, + secret_key=secret_key, + cert_check=False, + secure=secure, + ) + + if not os.path.exists(dataset_dir): + os.makedirs(dataset_dir) + else: + print(f"Directory '{dataset_dir}' already exists") + + # To download datasets from storage bucket's specific directory, use prefix to provide directory name + prefix = os.environ.get("AWS_STORAGE_BUCKET_MNIST_DIR") + # download all files from prefix folder of storage bucket recursively + for item in client.list_objects(bucket_name, prefix=prefix, recursive=True): + file_name = item.object_name[len(prefix) + 1 :] + dataset_file_path = os.path.join(dataset_dir, file_name) + if not os.path.exists(dataset_file_path): + client.fget_object(bucket_name, item.object_name, dataset_file_path) + else: + print(f"File-path '{dataset_file_path}' already exists") + # Unzip files + with gzip.open(dataset_file_path, "rb") as f_in: + with open(dataset_file_path.split(".")[:-1][0], "wb") as f_out: + shutil.copyfileobj(f_in, f_out) + # delete zip file + os.remove(dataset_file_path) + unzipped_filepath = dataset_file_path.split(".")[0] + if os.path.exists(unzipped_filepath): + print( + f"Unzipped and saved dataset file to path - {unzipped_filepath}" + ) + download_datasets = False + + else: + print("Using default MNIST mirror reference to download datasets...") + download_datasets = True + + MNIST(self.data_dir, train=True, download=download_datasets) + MNIST(self.data_dir, train=False, download=download_datasets) + + def setup(self, stage=None): + # Assign train/val datasets for use in dataloaders + if stage == "fit" or stage is None: + mnist_full = MNIST( + self.data_dir, train=True, transform=self.transform, download=False + ) + self.mnist_train, self.mnist_val = random_split(mnist_full, [55000, 5000]) + + # Assign test dataset for use in dataloader(s) + if stage == "test" or stage is None: + self.mnist_test = MNIST( + self.data_dir, train=False, transform=self.transform, download=False + ) + + def train_dataloader(self): + return DataLoader( + self.mnist_train, + batch_size=BATCH_SIZE, + sampler=RandomSampler(self.mnist_train, num_samples=1000), + ) + + def val_dataloader(self): + return DataLoader(self.mnist_val, batch_size=BATCH_SIZE) + + def test_dataloader(self): + return DataLoader(self.mnist_test, batch_size=BATCH_SIZE) + +# Init DataLoader from MNIST Dataset + +model = LitMNIST(data_dir=local_mnist_path) + +print("GROUP: ", int(os.environ.get("GROUP_WORLD_SIZE", 1))) +print("LOCAL: ", int(os.environ.get("LOCAL_WORLD_SIZE", 1))) + +# Initialize a trainer +trainer = Trainer( + # devices=1 if torch.cuda.is_available() else None, # limiting got iPython runs + max_epochs=3, + callbacks=[TQDMProgressBar(refresh_rate=20)], + num_nodes=int(os.environ.get("GROUP_WORLD_SIZE", 1)), + devices=int(os.environ.get("LOCAL_WORLD_SIZE", 1)), + strategy="ddp", +) + +# Train the model +trainer.fit(model) +""" + + pip_requirements = """ +pytorch_lightning==2.4.0 +torchmetrics==1.6.0 +torchvision==0.20.1 +minio +""" + + def assert_job_completion(status): + if status == "SUCCEEDED": + print(f"Job has completed: '{status}'") + assert True + else: + print(f"Job has completed: '{status}'") + assert False + + def assert_jobsubmit_withlogin(cluster, mnist_directory): + with open("/run/secrets/kubernetes.io/serviceaccount/token") as token_file: + auth_token = token_file.read() + print("Auth token: " + auth_token) + ray_dashboard = cluster.cluster_dashboard_uri() + header = {"Authorization": f"Bearer {auth_token}"} + client = RayJobClient(address=ray_dashboard, headers=header, verify=False) + + submission_id = client.submit_job( + entrypoint="python mnist.py", + runtime_env={ + "working_dir": mnist_directory, + "pip": mnist_directory + "/mnist_pip_requirements.txt", + "env_vars": { + "AWS_DEFAULT_ENDPOINT": AWS_DEFAULT_ENDPOINT, + "AWS_STORAGE_BUCKET": AWS_STORAGE_BUCKET, + "AWS_ACCESS_KEY_ID": AWS_ACCESS_KEY_ID, + "AWS_SECRET_ACCESS_KEY": AWS_SECRET_ACCESS_KEY, + "AWS_STORAGE_BUCKET_MNIST_DIR": AWS_STORAGE_BUCKET_MNIST_DIR + }, + }, + entrypoint_num_cpus=1, + ) + print(f"Submitted job with ID: {submission_id}") + done = False + time = 0 + timeout = 900 + while not done: + status = client.get_job_status(submission_id) + if status.is_terminal(): + break + if not done: + print(status) + if timeout and time >= timeout: + raise TimeoutError(f"job has timed out after waiting {timeout}s") + sleep(5) + time += 5 + + logs = client.get_job_logs(submission_id) + print(logs) + + assert_job_completion(status) + + client.delete_job(submission_id) + + cluster.down() + + cluster = Cluster( + ClusterConfiguration( + name="raytest", + num_workers=1, + head_cpu_requests=1, + head_cpu_limits=1, + head_memory_requests=4, + head_memory_limits=4, + worker_cpu_requests=1, + worker_cpu_limits=1, + worker_memory_requests=1, + worker_memory_limits=2, + image="quay.io/modh/ray@sha256:db667df1bc437a7b0965e8031e905d3ab04b86390d764d120e05ea5a5c18d1b4", + verify_tls=False + ) + ) + + # always clean the resources + cluster.down() + print(cluster.status()) + cluster.up() + cluster.wait_ready() + print(cluster.status()) + print(cluster.details()) + + ray_dashboard_uri = cluster.cluster_dashboard_uri() + ray_cluster_uri = cluster.cluster_uri() + print(ray_dashboard_uri) + print(ray_cluster_uri) + + # before proceeding make sure the cluster exists and the uri is not empty + assert ray_cluster_uri, "Ray cluster needs to be started and set before proceeding" + assert ray_dashboard_uri, "Ray dashboard needs to be started and set before proceeding" + + mnist_directory = tempfile.mkdtemp(prefix="mnist-dir") + with open(mnist_directory + "/mnist.py", "w") as mnist_file: + mnist_file.write(training_script) + with open(mnist_directory + "/mnist_pip_requirements.txt", "w") as pip_requirements_file: + pip_requirements_file.write(pip_requirements) + + assert_jobsubmit_withlogin(cluster, mnist_directory) + + cluster.down() + +@dsl.pipeline( + name="Ray Integration Test", + description="Ray Integration Test", +) +def ray_job_integration( + AWS_DEFAULT_ENDPOINT: str, + AWS_STORAGE_BUCKET: str, + AWS_ACCESS_KEY_ID: str, + AWS_SECRET_ACCESS_KEY: str, + AWS_STORAGE_BUCKET_MNIST_DIR: str +): + ray_fn( + AWS_DEFAULT_ENDPOINT=AWS_DEFAULT_ENDPOINT, + AWS_STORAGE_BUCKET=AWS_STORAGE_BUCKET, + AWS_ACCESS_KEY_ID=AWS_ACCESS_KEY_ID, + AWS_SECRET_ACCESS_KEY=AWS_SECRET_ACCESS_KEY, + AWS_STORAGE_BUCKET_MNIST_DIR=AWS_STORAGE_BUCKET_MNIST_DIR + ).set_caching_options(False) + + +if __name__ == "__main__": + compiler.Compiler().compile(ray_job_integration, package_path=__file__.replace(".py", "_compiled.yaml")) diff --git a/ods_ci/tests/Resources/Files/pipeline-samples/v2/cache-disabled/ray_job_integration_compiled.yaml b/ods_ci/tests/Resources/Files/pipeline-samples/v2/cache-disabled/ray_job_integration_compiled.yaml new file mode 100644 index 000000000..408345cea --- /dev/null +++ b/ods_ci/tests/Resources/Files/pipeline-samples/v2/cache-disabled/ray_job_integration_compiled.yaml @@ -0,0 +1,262 @@ +# PIPELINE DEFINITION +# Name: ray-integration-test +# Description: Ray Integration Test +# Inputs: +# AWS_ACCESS_KEY_ID: str +# AWS_DEFAULT_ENDPOINT: str +# AWS_SECRET_ACCESS_KEY: str +# AWS_STORAGE_BUCKET: str +# AWS_STORAGE_BUCKET_MNIST_DIR: str +components: + comp-ray-fn: + executorLabel: exec-ray-fn + inputDefinitions: + parameters: + AWS_ACCESS_KEY_ID: + parameterType: STRING + AWS_DEFAULT_ENDPOINT: + parameterType: STRING + AWS_SECRET_ACCESS_KEY: + parameterType: STRING + AWS_STORAGE_BUCKET: + parameterType: STRING + AWS_STORAGE_BUCKET_MNIST_DIR: + parameterType: STRING +deploymentSpec: + executors: + exec-ray-fn: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - ray_fn + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.11.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'codeflare-sdk==v0.24.0'\ + \ && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef ray_fn(\n AWS_DEFAULT_ENDPOINT: str,\n AWS_STORAGE_BUCKET:\ + \ str,\n AWS_ACCESS_KEY_ID: str,\n AWS_SECRET_ACCESS_KEY: str,\n \ + \ AWS_STORAGE_BUCKET_MNIST_DIR: str\n) -> None:\n import openshift\n\ + \ import subprocess\n import ray # noqa: PLC0415\n import tempfile\n\ + \ from codeflare_sdk import generate_cert # noqa: PLC0415\n from\ + \ codeflare_sdk.ray.cluster import Cluster, ClusterConfiguration # noqa:\ + \ PLC0415\n from codeflare_sdk.ray.client import RayJobClient\n from\ + \ time import sleep\n\n training_script = \"\"\"\nimport os\n\nimport\ + \ torch\nimport requests\nfrom pytorch_lightning import LightningModule,\ + \ Trainer\nfrom pytorch_lightning.callbacks.progress import TQDMProgressBar\n\ + from torch import nn\nfrom torch.nn import functional as F\nfrom torch.utils.data\ + \ import DataLoader, random_split, RandomSampler\nfrom torchmetrics import\ + \ Accuracy\nfrom torchvision import transforms\nfrom torchvision.datasets\ + \ import MNIST\nimport gzip\nimport shutil\nfrom minio import Minio\n\n\ + PATH_DATASETS = os.environ.get(\"PATH_DATASETS\", \".\")\nBATCH_SIZE = 256\ + \ if torch.cuda.is_available() else 64\n\nlocal_mnist_path = os.path.dirname(os.path.abspath(__file__))\n\ + \nprint(\"prior to running the trainer\")\nprint(\"MASTER_ADDR: is \", os.getenv(\"\ + MASTER_ADDR\"))\nprint(\"MASTER_PORT: is \", os.getenv(\"MASTER_PORT\"))\n\ + \nSTORAGE_BUCKET_EXISTS = \"AWS_DEFAULT_ENDPOINT\" in os.environ\nprint(\"\ + STORAGE_BUCKET_EXISTS: \", STORAGE_BUCKET_EXISTS)\n\nprint(f'Storage_Bucket_Default_Endpoint\ + \ : is {os.environ.get(\"AWS_DEFAULT_ENDPOINT\")}' if \"AWS_DEFAULT_ENDPOINT\"\ + \ in os.environ else \"\")\nprint(f'Storage_Bucket_Name : is {os.environ.get(\"\ + AWS_STORAGE_BUCKET\")}' if \"AWS_STORAGE_BUCKET\" in os.environ else \"\"\ + )\nprint(f'Storage_Bucket_Mnist_Directory : is {os.environ.get(\"AWS_STORAGE_BUCKET_MNIST_DIR\"\ + )}' if \"AWS_STORAGE_BUCKET_MNIST_DIR\" in os.environ else \"\")\n\n\nclass\ + \ LitMNIST(LightningModule):\n def __init__(self, data_dir=PATH_DATASETS,\ + \ hidden_size=64, learning_rate=2e-4):\n super().__init__()\n\n \ + \ # Set our init args as class attributes\n self.data_dir =\ + \ data_dir\n self.hidden_size = hidden_size\n self.learning_rate\ + \ = learning_rate\n\n # Hardcode some dataset specific attributes\n\ + \ self.num_classes = 10\n self.dims = (1, 28, 28)\n \ + \ channels, width, height = self.dims\n self.transform = transforms.Compose(\n\ + \ [\n transforms.ToTensor(),\n \ + \ transforms.Normalize((0.1307,), (0.3081,)),\n ]\n )\n\ + \n # Define PyTorch model\n self.model = nn.Sequential(\n\ + \ nn.Flatten(),\n nn.Linear(channels * width * height,\ + \ hidden_size),\n nn.ReLU(),\n nn.Dropout(0.1),\n\ + \ nn.Linear(hidden_size, hidden_size),\n nn.ReLU(),\n\ + \ nn.Dropout(0.1),\n nn.Linear(hidden_size, self.num_classes),\n\ + \ )\n\n self.val_accuracy = Accuracy(task=\"multiclass\",\ + \ num_classes=10)\n self.test_accuracy = Accuracy(task=\"multiclass\"\ + , num_classes=10)\n\n def forward(self, x):\n x = self.model(x)\n\ + \ return F.log_softmax(x, dim=1)\n\n def training_step(self, batch,\ + \ batch_idx):\n x, y = batch\n logits = self(x)\n loss\ + \ = F.nll_loss(logits, y)\n return loss\n\n def validation_step(self,\ + \ batch, batch_idx):\n x, y = batch\n logits = self(x)\n \ + \ loss = F.nll_loss(logits, y)\n preds = torch.argmax(logits,\ + \ dim=1)\n self.val_accuracy.update(preds, y)\n\n # Calling\ + \ self.log will surface up scalars for you in TensorBoard\n self.log(\"\ + val_loss\", loss, prog_bar=True)\n self.log(\"val_acc\", self.val_accuracy,\ + \ prog_bar=True)\n\n def test_step(self, batch, batch_idx):\n \ + \ x, y = batch\n logits = self(x)\n loss = F.nll_loss(logits,\ + \ y)\n preds = torch.argmax(logits, dim=1)\n self.test_accuracy.update(preds,\ + \ y)\n\n # Calling self.log will surface up scalars for you in TensorBoard\n\ + \ self.log(\"test_loss\", loss, prog_bar=True)\n self.log(\"\ + test_acc\", self.test_accuracy, prog_bar=True)\n\n def configure_optimizers(self):\n\ + \ optimizer = torch.optim.Adam(self.parameters(), lr=self.learning_rate)\n\ + \ return optimizer\n\n ####################\n # DATA RELATED\ + \ HOOKS\n ####################\n\n def prepare_data(self):\n \ + \ # download\n print(\"Downloading MNIST dataset...\")\n\n \ + \ if (\n STORAGE_BUCKET_EXISTS\n and os.environ.get(\"\ + AWS_DEFAULT_ENDPOINT\") != \"\"\n and os.environ.get(\"AWS_DEFAULT_ENDPOINT\"\ + ) != None\n ):\n print(\"Using storage bucket to download\ + \ datasets...\")\n\n dataset_dir = os.path.join(self.data_dir,\ + \ \"MNIST/raw\")\n endpoint = os.environ.get(\"AWS_DEFAULT_ENDPOINT\"\ + )\n access_key = os.environ.get(\"AWS_ACCESS_KEY_ID\")\n \ + \ secret_key = os.environ.get(\"AWS_SECRET_ACCESS_KEY\")\n \ + \ bucket_name = os.environ.get(\"AWS_STORAGE_BUCKET\")\n\n \ + \ # remove prefix if specified in storage bucket endpoint url\n \ + \ secure = True\n if endpoint.startswith(\"https://\"):\n\ + \ endpoint = endpoint[len(\"https://\") :]\n elif\ + \ endpoint.startswith(\"http://\"):\n endpoint = endpoint[len(\"\ + http://\") :]\n secure = False\n\n client = Minio(\n\ + \ endpoint,\n access_key=access_key,\n \ + \ secret_key=secret_key,\n cert_check=False,\n\ + \ secure=secure,\n )\n\n if not os.path.exists(dataset_dir):\n\ + \ os.makedirs(dataset_dir)\n else:\n \ + \ print(f\"Directory '{dataset_dir}' already exists\")\n\n \ + \ # To download datasets from storage bucket's specific directory, use\ + \ prefix to provide directory name\n prefix = os.environ.get(\"\ + AWS_STORAGE_BUCKET_MNIST_DIR\")\n # download all files from prefix\ + \ folder of storage bucket recursively\n for item in client.list_objects(bucket_name,\ + \ prefix=prefix, recursive=True):\n file_name = item.object_name[len(prefix)\ + \ + 1 :]\n dataset_file_path = os.path.join(dataset_dir,\ + \ file_name)\n if not os.path.exists(dataset_file_path):\n\ + \ client.fget_object(bucket_name, item.object_name, dataset_file_path)\n\ + \ else:\n print(f\"File-path '{dataset_file_path}'\ + \ already exists\")\n # Unzip files\n with\ + \ gzip.open(dataset_file_path, \"rb\") as f_in:\n with\ + \ open(dataset_file_path.split(\".\")[:-1][0], \"wb\") as f_out:\n \ + \ shutil.copyfileobj(f_in, f_out)\n # delete\ + \ zip file\n os.remove(dataset_file_path)\n \ + \ unzipped_filepath = dataset_file_path.split(\".\")[0]\n \ + \ if os.path.exists(unzipped_filepath):\n print(\n\ + \ f\"Unzipped and saved dataset file to path - {unzipped_filepath}\"\ + \n )\n download_datasets = False\n\n \ + \ else:\n print(\"Using default MNIST mirror reference to download\ + \ datasets...\")\n download_datasets = True\n\n MNIST(self.data_dir,\ + \ train=True, download=download_datasets)\n MNIST(self.data_dir,\ + \ train=False, download=download_datasets)\n\n def setup(self, stage=None):\n\ + \ # Assign train/val datasets for use in dataloaders\n if\ + \ stage == \"fit\" or stage is None:\n mnist_full = MNIST(\n\ + \ self.data_dir, train=True, transform=self.transform, download=False\n\ + \ )\n self.mnist_train, self.mnist_val = random_split(mnist_full,\ + \ [55000, 5000])\n\n # Assign test dataset for use in dataloader(s)\n\ + \ if stage == \"test\" or stage is None:\n self.mnist_test\ + \ = MNIST(\n self.data_dir, train=False, transform=self.transform,\ + \ download=False\n )\n\n def train_dataloader(self):\n \ + \ return DataLoader(\n self.mnist_train,\n batch_size=BATCH_SIZE,\n\ + \ sampler=RandomSampler(self.mnist_train, num_samples=1000),\n\ + \ )\n\n def val_dataloader(self):\n return DataLoader(self.mnist_val,\ + \ batch_size=BATCH_SIZE)\n\n def test_dataloader(self):\n return\ + \ DataLoader(self.mnist_test, batch_size=BATCH_SIZE)\n\n# Init DataLoader\ + \ from MNIST Dataset\n\nmodel = LitMNIST(data_dir=local_mnist_path)\n\n\ + print(\"GROUP: \", int(os.environ.get(\"GROUP_WORLD_SIZE\", 1)))\nprint(\"\ + LOCAL: \", int(os.environ.get(\"LOCAL_WORLD_SIZE\", 1)))\n\n# Initialize\ + \ a trainer\ntrainer = Trainer(\n # devices=1 if torch.cuda.is_available()\ + \ else None, # limiting got iPython runs\n max_epochs=3,\n callbacks=[TQDMProgressBar(refresh_rate=20)],\n\ + \ num_nodes=int(os.environ.get(\"GROUP_WORLD_SIZE\", 1)),\n devices=int(os.environ.get(\"\ + LOCAL_WORLD_SIZE\", 1)),\n strategy=\"ddp\",\n)\n\n# Train the model\n\ + trainer.fit(model)\n\"\"\"\n\n pip_requirements = \"\"\"\npytorch_lightning==2.4.0\n\ + torchmetrics==1.6.0\ntorchvision==0.20.1\nminio\n\"\"\"\n\n def assert_job_completion(status):\n\ + \ if status == \"SUCCEEDED\":\n print(f\"Job has completed:\ + \ '{status}'\")\n assert True\n else:\n print(f\"\ + Job has completed: '{status}'\")\n assert False\n\n def assert_jobsubmit_withlogin(cluster,\ + \ mnist_directory):\n with open(\"/run/secrets/kubernetes.io/serviceaccount/token\"\ + ) as token_file:\n auth_token = token_file.read()\n print(\"\ + Auth token: \" + auth_token)\n ray_dashboard = cluster.cluster_dashboard_uri()\n\ + \ header = {\"Authorization\": f\"Bearer {auth_token}\"}\n \ + \ client = RayJobClient(address=ray_dashboard, headers=header, verify=False)\n\ + \n submission_id = client.submit_job(\n entrypoint=\"\ + python mnist.py\",\n runtime_env={\n \"working_dir\"\ + : mnist_directory,\n \"pip\": mnist_directory + \"/mnist_pip_requirements.txt\"\ + ,\n \"env_vars\": {\n \"AWS_DEFAULT_ENDPOINT\"\ + : AWS_DEFAULT_ENDPOINT,\n \"AWS_STORAGE_BUCKET\": AWS_STORAGE_BUCKET,\n\ + \ \"AWS_ACCESS_KEY_ID\": AWS_ACCESS_KEY_ID,\n \ + \ \"AWS_SECRET_ACCESS_KEY\": AWS_SECRET_ACCESS_KEY,\n \ + \ \"AWS_STORAGE_BUCKET_MNIST_DIR\": AWS_STORAGE_BUCKET_MNIST_DIR\n\ + \ },\n },\n entrypoint_num_cpus=1,\n\ + \ )\n print(f\"Submitted job with ID: {submission_id}\")\n\ + \ done = False\n time = 0\n timeout = 900\n \ + \ while not done:\n status = client.get_job_status(submission_id)\n\ + \ if status.is_terminal():\n break\n \ + \ if not done:\n print(status)\n if timeout\ + \ and time >= timeout:\n raise TimeoutError(f\"job has\ + \ timed out after waiting {timeout}s\")\n sleep(5)\n \ + \ time += 5\n\n logs = client.get_job_logs(submission_id)\n\ + \ print(logs)\n\n assert_job_completion(status)\n\n \ + \ client.delete_job(submission_id)\n\n cluster.down()\n\n cluster\ + \ = Cluster(\n ClusterConfiguration(\n name=\"raytest\"\ + ,\n num_workers=1,\n head_cpu_requests=1,\n \ + \ head_cpu_limits=1,\n head_memory_requests=4,\n \ + \ head_memory_limits=4,\n worker_cpu_requests=1,\n \ + \ worker_cpu_limits=1,\n worker_memory_requests=1,\n \ + \ worker_memory_limits=2,\n image=\"quay.io/modh/ray@sha256:db667df1bc437a7b0965e8031e905d3ab04b86390d764d120e05ea5a5c18d1b4\"\ + ,\n verify_tls=False\n )\n )\n\n # always clean\ + \ the resources\n cluster.down()\n print(cluster.status())\n cluster.up()\n\ + \ cluster.wait_ready()\n print(cluster.status())\n print(cluster.details())\n\ + \n ray_dashboard_uri = cluster.cluster_dashboard_uri()\n ray_cluster_uri\ + \ = cluster.cluster_uri()\n print(ray_dashboard_uri)\n print(ray_cluster_uri)\n\ + \n # before proceeding make sure the cluster exists and the uri is not\ + \ empty\n assert ray_cluster_uri, \"Ray cluster needs to be started and\ + \ set before proceeding\"\n assert ray_dashboard_uri, \"Ray dashboard\ + \ needs to be started and set before proceeding\"\n\n mnist_directory\ + \ = tempfile.mkdtemp(prefix=\"mnist-dir\")\n with open(mnist_directory\ + \ + \"/mnist.py\", \"w\") as mnist_file:\n mnist_file.write(training_script)\n\ + \ with open(mnist_directory + \"/mnist_pip_requirements.txt\", \"w\"\ + ) as pip_requirements_file:\n pip_requirements_file.write(pip_requirements)\n\ + \n assert_jobsubmit_withlogin(cluster, mnist_directory)\n\n cluster.down()\n\ + \n" + image: registry.redhat.io/ubi8/python-39@sha256:3523b184212e1f2243e76d8094ab52b01ea3015471471290d011625e1763af61 +pipelineInfo: + description: Ray Integration Test + name: ray-integration-test +root: + dag: + tasks: + ray-fn: + cachingOptions: {} + componentRef: + name: comp-ray-fn + inputs: + parameters: + AWS_ACCESS_KEY_ID: + componentInputParameter: AWS_ACCESS_KEY_ID + AWS_DEFAULT_ENDPOINT: + componentInputParameter: AWS_DEFAULT_ENDPOINT + AWS_SECRET_ACCESS_KEY: + componentInputParameter: AWS_SECRET_ACCESS_KEY + AWS_STORAGE_BUCKET: + componentInputParameter: AWS_STORAGE_BUCKET + AWS_STORAGE_BUCKET_MNIST_DIR: + componentInputParameter: AWS_STORAGE_BUCKET_MNIST_DIR + taskInfo: + name: ray-fn + inputDefinitions: + parameters: + AWS_ACCESS_KEY_ID: + parameterType: STRING + AWS_DEFAULT_ENDPOINT: + parameterType: STRING + AWS_SECRET_ACCESS_KEY: + parameterType: STRING + AWS_STORAGE_BUCKET: + parameterType: STRING + AWS_STORAGE_BUCKET_MNIST_DIR: + parameterType: STRING +schemaVersion: 2.1.0 +sdkVersion: kfp-2.11.0 diff --git a/ods_ci/tests/Tests/0600__distributed_workloads/0601__workloads_orchestration/test-pipelines-integration.robot b/ods_ci/tests/Tests/0600__distributed_workloads/0601__workloads_orchestration/test-pipelines-integration.robot new file mode 100644 index 000000000..9a60d797a --- /dev/null +++ b/ods_ci/tests/Tests/0600__distributed_workloads/0601__workloads_orchestration/test-pipelines-integration.robot @@ -0,0 +1,127 @@ +*** Settings *** +Documentation Test suite for OpenShift Pipeline using kfp python package + +Resource ../../../Resources/RHOSi.resource +Resource ../../../Resources/ODS.robot +Resource ../../../Resources/Common.robot +Resource ../../../Resources/Page/ODH/ODHDashboard/ODHDashboard.robot +Resource ../../../Resources/Page/ODH/ODHDashboard/ODHDataSciencePipelines.resource +Resource ../../../Resources/Page/ODH/ODHDashboard/ODHDataScienceProject/Permissions.resource +Resource ../../../Resources/Page/ODH/ODHDashboard/ODHDataScienceProject/Projects.resource +Resource ../../../Resources/CLI/DataSciencePipelines/DataSciencePipelinesBackend.resource +Resource ../../../Resources/Page/DistributedWorkloads/DistributedWorkloads.resource +Library DateTime +Library ../../../../libs/DataSciencePipelinesAPI.py +Library ../../../../libs/DataSciencePipelinesKfp.py +Test Tags DataSciencePipelines-Backend +Suite Setup Data Science Pipelines Suite Setup +Suite Teardown RHOSi Teardown + + +*** Variables *** +${PROJECT_NAME}= dw-pipelines +${KUEUE_RESOURCES_SETUP_FILEPATH}= tests/Resources/Page/DistributedWorkloads/kueue_resources_setup.sh + + +*** Test Cases *** +Verify Ods Users Can Create And Run A Data Science Pipeline With Ray Using The kfp Python Package + [Documentation] Creates, runs pipelines with regular user. Double check the pipeline result and clean + ... the pipeline resources. + ... AutomationBugOnDisconnected: RHOAIENG-12514 + [Tags] Tier1 + ... AutomationBugOnDisconnected + ... DistributedWorkloads + ... WorkloadsOrchestration + ... DataSciencePipelines-DistributedWorkloads + ${params_dict}= Create Dictionary + ... AWS_DEFAULT_ENDPOINT=${AWS_DEFAULT_ENDPOINT} + ... AWS_STORAGE_BUCKET=${AWS_STORAGE_BUCKET} + ... AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} + ... AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} + ... AWS_STORAGE_BUCKET_MNIST_DIR=${AWS_STORAGE_BUCKET_MNIST_DIR} + End To End Pipeline Workflow Using Kfp + ... admin_username=${TEST_USER.USERNAME} + ... admin_password=${TEST_USER.PASSWORD} + ... username=${TEST_USER_3.USERNAME} + ... password=${TEST_USER_3.PASSWORD} + ... project=${PROJECT_NAME} + ... python_file=cache-disabled/ray_job_integration.py + ... method_name=ray_job_integration + ... status_check_timeout=600 + ... pipeline_params=${params_dict} + ... ray=${TRUE} + [Teardown] Projects.Delete Project Via CLI By Display Name ${PROJECT_NAME} + +Verify Ods Users Can Create And Run A Data Science Pipeline With Ray Job Using The kfp Python Package + [Documentation] Creates, runs pipelines with regular user. Double check the pipeline result and clean + ... the pipeline resources. + ... AutomationBugOnDisconnected: RHOAIENG-12514 + [Tags] Tier1 + ... AutomationBugOnDisconnected + ... DistributedWorkloads + ... WorkloadsOrchestration + ... DataSciencePipelines-DistributedWorkloads + ${ray_dict}= Create Dictionary + End To End Pipeline Workflow Using Kfp + ... admin_username=${TEST_USER.USERNAME} + ... admin_password=${TEST_USER.PASSWORD} + ... username=${TEST_USER_3.USERNAME} + ... password=${TEST_USER_3.PASSWORD} + ... project=${PROJECT_NAME} + ... python_file=cache-disabled/ray_integration.py + ... method_name=ray_integration + ... status_check_timeout=600 + ... pipeline_params=${ray_dict} + ... ray=${TRUE} + [Teardown] Projects.Delete Project Via CLI By Display Name ${PROJECT_NAME} + + +*** Keywords *** +# robocop: disable:line-too-long +End To End Pipeline Workflow Using Kfp + [Documentation] Create, run and double check the pipeline result using Kfp python package. In the end, + ... clean the pipeline resources. + [Arguments] ${username} ${password} ${admin_username} ${admin_password} ${project} ${python_file} + ... ${method_name} ${pipeline_params} ${status_check_timeout}=160 ${ray}=${FALSE} + + Projects.Delete Project Via CLI By Display Name ${project} + Projects.Create Data Science Project From CLI name=${project} + + DataSciencePipelinesBackend.Create PipelineServer Using Custom DSPA ${project} + + ${status} Login And Wait Dsp Route ${admin_username} ${admin_password} ${project} + Should Be True ${status} == 200 Could not login to the Data Science Pipelines Rest API OR DSP routing is not working + # we remove and add a new project for sanity. LocalQueue is per namespace + IF ${ray} == ${TRUE} + Setup Kueue Resources ${project} cluster-queue-user resource-flavor-user local-queue-user + END + # The run_robot_test.sh is sending the --variablefile ${TEST_VARIABLES_FILE} which may contain the `PIP_INDEX_URL` + # and `PIP_TRUSTED_HOST` variables, e.g. for disconnected testing. + Launch Data Science Project Main Page username=${admin_username} password=${admin_password} + Assign Contributor Permissions To User ${username} in Project ${project} + ${pip_index_url} = Get Variable Value ${PIP_INDEX_URL} ${NONE} + ${pip_trusted_host} = Get Variable Value ${PIP_TRUSTED_HOST} ${NONE} + Log pip_index_url = ${pip_index_url} / pip_trusted_host = ${pip_trusted_host} + ${run_id} Create Run From Pipeline Func ${username} ${password} ${project} + ... ${python_file} ${method_name} pipeline_params=${pipeline_params} pip_index_url=${pip_index_url} + ... pip_trusted_host=${pip_trusted_host} + ${run_status} Check Run Status ${run_id} timeout=${status_check_timeout} + Should Be Equal As Strings ${run_status} SUCCEEDED Pipeline run doesn't have a status that means success. Check the logs + +Data Science Pipelines Suite Setup + [Documentation] Data Science Pipelines Suite Setup + Set Library Search Order SeleniumLibrary + RHOSi Setup + +Setup Kueue Resources + [Documentation] Setup the kueue resources for the project + [Arguments] ${project_name} ${cluster_queue_name} ${resource_flavor_name} ${local_queue_name} + # Easy for debug + Log sh ${KUEUE_RESOURCES_SETUP_FILEPATH} ${cluster_queue_name} ${resource_flavor_name} ${local_queue_name} ${project_name} "2" "8" + ${result} = Run Process sh ${KUEUE_RESOURCES_SETUP_FILEPATH} ${cluster_queue_name} ${resource_flavor_name} ${local_queue_name} ${project_name} "2" "8" + ... shell=true + ... stderr=STDOUT + Log ${result.stdout} + IF ${result.rc} != 0 + FAIL Failed to setup kueue resources + END diff --git a/ods_ci/tests/Tests/1100__data_science_pipelines/1101__data-science-pipelines-kfp.robot b/ods_ci/tests/Tests/1100__data_science_pipelines/1101__data-science-pipelines-kfp.robot index ed16dcad7..455bb3c62 100644 --- a/ods_ci/tests/Tests/1100__data_science_pipelines/1101__data-science-pipelines-kfp.robot +++ b/ods_ci/tests/Tests/1100__data_science_pipelines/1101__data-science-pipelines-kfp.robot @@ -76,27 +76,6 @@ Verify Upload Download In Data Science Pipelines Using The kfp Python Package ... pipeline_params=${upload_download_dict} [Teardown] Projects.Delete Project Via CLI By Display Name ${PROJECT_NAME} -Verify Ods Users Can Create And Run A Data Science Pipeline With Ray Using The kfp Python Package - [Documentation] Creates, runs pipelines with regular user. Double check the pipeline result and clean - ... the pipeline resources. - ... AutomationBugOnDisconnected: RHOAIENG-12514 - [Tags] Tier1 AutomationBugOnDisconnected - Skip If Component Is Not Enabled ray - Skip If Component Is Not Enabled codeflare - ${ray_dict}= Create Dictionary - End To End Pipeline Workflow Using Kfp - ... admin_username=${TEST_USER.USERNAME} - ... admin_password=${TEST_USER.PASSWORD} - ... username=${TEST_USER_3.USERNAME} - ... password=${TEST_USER_3.PASSWORD} - ... project=${PROJECT_NAME} - ... python_file=cache-disabled/ray_integration.py - ... method_name=ray_integration - ... status_check_timeout=600 - ... pipeline_params=${ray_dict} - ... ray=${TRUE} - [Teardown] Projects.Delete Project Via CLI By Display Name ${PROJECT_NAME} - *** Keywords *** # robocop: disable:line-too-long