diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000..08b67a5 Binary files /dev/null and b/.DS_Store differ diff --git a/downloads/fraud_detection.yaml b/downloads/fraud_detection.yaml new file mode 100644 index 0000000..b0ce1cd --- /dev/null +++ b/downloads/fraud_detection.yaml @@ -0,0 +1,638 @@ +# PIPELINE DEFINITION +# Name: fraud-detection-training-pipeline +# Description: Trains the fraud detection model. +# Inputs: +# datastore: dict +# hyperparameters: dict +# Outputs: +# evaluate-keras-model-performance-classification_metrics: system.ClassificationMetrics +# evaluate-keras-model-performance-metrics: system.Metrics +components: + comp-convert-keras-to-onnx: + executorLabel: exec-convert-keras-to-onnx + inputDefinitions: + artifacts: + keras_model: + artifactType: + schemaTitle: system.Model + schemaVersion: 0.0.1 + outputDefinitions: + artifacts: + onnx_model: + artifactType: + schemaTitle: system.Model + schemaVersion: 0.0.1 + comp-evaluate-keras-model-performance: + executorLabel: exec-evaluate-keras-model-performance + inputDefinitions: + artifacts: + model: + artifactType: + schemaTitle: system.Model + schemaVersion: 0.0.1 + scaler: + artifactType: + schemaTitle: system.Model + schemaVersion: 0.0.1 + test_data: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + parameters: + previous_model_metrics: + parameterType: STRUCT + outputDefinitions: + artifacts: + classification_metrics: + artifactType: + schemaTitle: system.ClassificationMetrics + schemaVersion: 0.0.1 + markdown: + artifactType: + schemaTitle: system.Markdown + schemaVersion: 0.0.1 + metrics: + artifactType: + schemaTitle: system.Metrics + schemaVersion: 0.0.1 + comp-fetch-transactionsdb-data: + executorLabel: exec-fetch-transactionsdb-data + inputDefinitions: + parameters: + datastore: + parameterType: STRUCT + outputDefinitions: + artifacts: + dataset: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + comp-preprocess-transactiondb-data: + executorLabel: exec-preprocess-transactiondb-data + inputDefinitions: + artifacts: + in_data: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + outputDefinitions: + artifacts: + scaler: + artifactType: + schemaTitle: system.Model + schemaVersion: 0.0.1 + test_data: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + train_data: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + val_data: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + parameters: + class_weights: + parameterType: STRUCT + comp-train-fraud-model: + executorLabel: exec-train-fraud-model + inputDefinitions: + artifacts: + scaler: + artifactType: + schemaTitle: system.Model + schemaVersion: 0.0.1 + train_data: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + val_data: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + parameters: + class_weights: + parameterType: STRUCT + hyperparameters: + parameterType: STRUCT + outputDefinitions: + artifacts: + trained_model: + artifactType: + schemaTitle: system.Model + schemaVersion: 0.0.1 + comp-validate-onnx-model: + executorLabel: exec-validate-onnx-model + inputDefinitions: + artifacts: + keras_model: + artifactType: + schemaTitle: system.Model + schemaVersion: 0.0.1 + onnx_model: + artifactType: + schemaTitle: system.Model + schemaVersion: 0.0.1 + test_data: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + comp-validate-transactiondb-data: + executorLabel: 
exec-validate-transactiondb-data + inputDefinitions: + artifacts: + dataset: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + outputDefinitions: + parameters: + Output: + parameterType: BOOLEAN +deploymentSpec: + executors: + exec-convert-keras-to-onnx: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - convert_keras_to_onnx + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'tf2onnx' 'onnx'\ + \ 'pandas' 'scikit-learn' && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef convert_keras_to_onnx(\n keras_model: Input[Model],\n onnx_model:\ + \ Output[Model],\n):\n import tf2onnx, onnx\n import keras\n import\ + \ tensorflow as tf\n\n trained_keras_model = keras.saving.load_model(keras_model.path)\n\ + \ input_signature = [tf.TensorSpec(trained_keras_model.inputs[0].shape,\ + \ trained_keras_model.inputs[0].dtype, name='input')]\n trained_keras_model.output_names\ + \ = ['output']\n onnx_model_proto, _ = tf2onnx.convert.from_keras(trained_keras_model,\ + \ input_signature)\n\n onnx_model.path += \".onnx\"\n onnx.save(onnx_model_proto,\ + \ onnx_model.path)\n\n" + image: quay.io/hukhan/tensorflow:2.17.0 + exec-evaluate-keras-model-performance: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - evaluate_keras_model_performance + command: + - sh + - -c + - "\nif ! 
[ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'tf2onnx' 'onnx'\ + \ 'pandas' 'scikit-learn' && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef evaluate_keras_model_performance(\n model: Input[Model],\n\ + \ test_data: Input[Dataset],\n scaler: Input[Model],\n previous_model_metrics:\ + \ dict,\n metrics: Output[Metrics],\n classification_metrics: Output[ClassificationMetrics],\n\ + \ markdown: Output[Markdown]\n):\n import keras\n import pandas\ + \ as pd\n from sklearn.metrics import confusion_matrix\n import numpy\ + \ as np\n\n trained_model = keras.saving.load_model(model.path)\n \ + \ with open(test_data.path, 'rb') as pickle_file:\n X_test, y_test\ + \ = pd.read_pickle(pickle_file)\n with open(scaler.path, 'rb') as pickle_file:\n\ + \ st_scaler = pd.read_pickle(pickle_file)\n\n y_pred_temp = trained_model.predict(st_scaler.transform(X_test.values))\n\ + \ y_pred_temp = np.asarray(np.squeeze(y_pred_temp))\n threshold =\ + \ 0.95\n y_pred = np.where(y_pred_temp > threshold, 1,0)\n accuracy\ + \ = np.sum(np.asarray(y_test) == y_pred) / len(y_pred)\n\n metrics.log_metric(\"\ + Accuracy\", accuracy)\n metrics.log_metric(\"Prev Model Accuracy\", previous_model_metrics[\"\ + accuracy\"])\n\n cmatrix = confusion_matrix(np.asarray(y_test), y_pred)\n\ + \ cmatrix = cmatrix.tolist()\n targets = [\"0\", \"1\"] #TODO: Replace\ + \ with info from schema\n classification_metrics.log_confusion_matrix(targets,\ + \ cmatrix)\n\n with open(markdown.path, 'w') as f:\n f.write(\"\ + ### Accuracy\\n\")\n f.write(f'Accuracy: {accuracy:.2f}\\n')\n \ + \ f.write(\"### Previous Model Accuracy\\n\")\n f.write(f'Accuracy:\ + \ {previous_model_metrics[\"accuracy\"]:.2f}\\n')\n\n if accuracy <=\ + \ previous_model_metrics[\"accuracy\"]:\n raise Exception(\"Accuracy\ + \ is lower than the previous models\")\n\n" + image: quay.io/hukhan/tensorflow:2.17.0 + exec-fetch-transactionsdb-data: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - fetch_transactionsdb_data + command: + - sh + - -c + - "\nif ! 
[ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'psycopg2' 'pandas'\ + \ && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef fetch_transactionsdb_data(\n datastore: dict,\n dataset:\ + \ Output[Dataset]\n):\n \"\"\"\n Fetches data from the transactionsdb\ + \ datastore\n \"\"\"\n import urllib.request\n print(\"starting\ + \ download...\")\n url = datastore['url']\n urllib.request.urlretrieve(url,\ + \ dataset.path)\n print(\"done\")\n\n" + image: quay.io/opendatahub/ds-pipelines-sample-base:v1.0 + exec-preprocess-transactiondb-data: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - preprocess_transactiondb_data + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'pandas' 'scikit-learn'\ + \ && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef preprocess_transactiondb_data(\n in_data: Input[Dataset],\n\ + \ train_data: Output[Dataset],\n val_data: Output[Dataset],\n test_data:\ + \ Output[Dataset],\n scaler: Output[Model],\n) -> NamedTuple('outputs',\ + \ class_weights=dict):\n \"\"\"\n Takes the dataset and preprocesses\ + \ it to better train on the fraud detection model.\n The preprocessing\ + \ consists of:\n 1. Splitting the dataset into training, validation,\ + \ and testing.\n 2. Creating a scaler which scales down the training\ + \ dataset. This scaler is saved as an artifact.\n 3. 
Calculates the class\ + \ weights, which will later be used during the training.\n \"\"\"\n\n\ + \ from sklearn.model_selection import train_test_split\n from sklearn.preprocessing\ + \ import StandardScaler\n from sklearn.utils import class_weight\n \ + \ import pandas as pd\n import pickle\n import numpy as np\n from\ + \ typing import NamedTuple\n\n df = pd.read_csv(in_data.path)\n print(df.head())\n\ + \ X = df.drop(columns = ['repeat_retailer','distance_from_home', 'fraud'])\n\ + \ y = df['fraud']\n\n # Split the data into training and testing sets\ + \ so you have something to test the trained model with.\n\n # X_train,\ + \ X_test, y_train, y_test = train_test_split(X,y, test_size = 0.2, stratify\ + \ = y)\n X_train, X_test, y_train, y_test = train_test_split(X,y, test_size\ + \ = 0.2, shuffle = False)\n\n X_train, X_val, y_train, y_val = train_test_split(X_train,y_train,\ + \ test_size = 0.2, stratify = y_train)\n\n # Scale the data to remove\ + \ mean and have unit variance. The data will be between -1 and 1, which\ + \ makes it a lot easier for the model to learn than random (and potentially\ + \ large) values.\n # It is important to only fit the scaler to the training\ + \ data, otherwise you are leaking information about the global distribution\ + \ of variables (which is influenced by the test set) into the training set.\n\ + \n st_scaler = StandardScaler()\n\n X_train = st_scaler.fit_transform(X_train.values)\n\ + \n train_data.path += \".pkl\"\n val_data.path += \".pkl\"\n test_data.path\ + \ += \".pkl\"\n scaler.path += \".pkl\"\n\n with open(train_data.path,\ + \ \"wb\") as handle:\n pickle.dump((X_train, y_train), handle)\n\ + \ with open(val_data.path, \"wb\") as handle:\n pickle.dump((X_val,\ + \ y_val), handle)\n with open(test_data.path, \"wb\") as handle:\n \ + \ pickle.dump((X_test, y_test), handle)\n with open(scaler.path,\ + \ \"wb\") as handle:\n pickle.dump(st_scaler, handle)\n\n # Since\ + \ the dataset is unbalanced (it has many more non-fraud transactions than\ + \ fraudulent ones), set a class weight to weight the few fraudulent transactions\ + \ higher than the many non-fraud transactions.\n\n class_weights = class_weight.compute_class_weight('balanced',classes\ + \ = np.unique(y_train),y = y_train)\n class_weights = {i : class_weights[i]\ + \ for i in range(len(class_weights))}\n\n outputs = NamedTuple('outputs',\ + \ class_weights=dict)\n return outputs(class_weights)\n\n" + image: quay.io/opendatahub/ds-pipelines-sample-base:v1.0 + exec-train-fraud-model: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - train_fraud_model + command: + - sh + - -c + - "\nif ! 
[ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'pandas' 'scikit-learn'\ + \ && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef train_fraud_model(\n train_data: Input[Dataset],\n val_data:\ + \ Input[Dataset],\n scaler: Input[Model],\n class_weights: dict,\n\ + \ hyperparameters: dict,\n trained_model: Output[Model]\n):\n \"\ + \"\"\n Trains a dense tensorflow model.\n \"\"\"\n\n from keras.models\ + \ import Sequential\n from keras.layers import Dense, Dropout, BatchNormalization,\ + \ Activation\n import pickle\n import pandas as pd\n import sklearn\n\ + \n with open(train_data.path, 'rb') as pickle_file:\n X_train,\ + \ y_train = pd.read_pickle(pickle_file)\n with open(val_data.path, 'rb')\ + \ as pickle_file:\n X_val, y_val = pd.read_pickle(pickle_file)\n\ + \ with open(scaler.path, 'rb') as pickle_file:\n st_scaler = pd.read_pickle(pickle_file)\n\ + \n y_train = y_train.to_numpy()\n y_val = y_val.to_numpy()\n\n \ + \ model = Sequential()\n model.add(Dense(32, activation = 'relu', input_dim\ + \ = X_train.shape[1]))\n model.add(Dropout(0.2))\n model.add(Dense(32))\n\ + \ model.add(BatchNormalization())\n model.add(Activation('relu'))\n\ + \ model.add(Dropout(0.2))\n model.add(Dense(32))\n model.add(BatchNormalization())\n\ + \ model.add(Activation('relu'))\n model.add(Dropout(0.2))\n model.add(Dense(1,\ + \ activation = 'sigmoid'))\n model.compile(optimizer='adam',loss='binary_crossentropy',metrics=['accuracy'])\n\ + \ model.summary()\n\n epochs = hyperparameters[\"epochs\"]\n history\ + \ = model.fit(X_train, y_train, epochs=epochs, \\\n \ + \ validation_data=(st_scaler.transform(X_val.values),y_val), \\\n \ + \ verbose = True, class_weight = class_weights)\n \ + \ print(\"Training of model is complete\")\n\n trained_model.path +=\ + \ \".keras\"\n model.save(trained_model.path)\n\n" + image: quay.io/hukhan/tensorflow:2.17.0 + exec-validate-onnx-model: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - validate_onnx_model + command: + - sh + - -c + - "\nif ! 
[ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'onnxruntime'\ + \ 'pandas' && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef validate_onnx_model(\n onnx_model: Input[Model],\n keras_model:\ + \ Input[Model],\n test_data: Input[Dataset],\n):\n import onnxruntime\ + \ as rt\n import pandas as pd\n import numpy as np\n import keras\n\ + \n with open(test_data.path, 'rb') as pickle_file:\n X_test, _\ + \ = pd.read_pickle(pickle_file) \n _keras_model = keras.saving.load_model(keras_model.path)\n\ + \ onnx_session = rt.InferenceSession(onnx_model.path, providers=rt.get_available_providers())\n\ + \n onnx_input_name = onnx_session.get_inputs()[0].name\n onnx_output_name\ + \ = onnx_session.get_outputs()[0].name\n onnx_pred = onnx_session.run([onnx_output_name],\ + \ {onnx_input_name: X_test.values.astype(np.float32)})\n\n keras_pred\ + \ = _keras_model(X_test.values)\n\n print(\"Keras Pred: \", keras_pred)\n\ + \ print(\"ONNX Pred: \", onnx_pred[0])\n\n for rt_res, keras_res in\ + \ zip(onnx_pred[0], keras_pred):\n np.testing.assert_allclose(rt_res,\ + \ keras_res, rtol=1e-5, atol=1e-5)\n\n print(\"Results match\")\n\n" + image: quay.io/hukhan/tensorflow:2.17.0 + exec-validate-transactiondb-data: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - validate_transactiondb_data + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef validate_transactiondb_data(\n dataset: Input[Dataset]\n)\ + \ -> bool:\n \"\"\"\n Validates if the data schema is correct and\ + \ if the values are reasonable.\n \"\"\"\n\n if not dataset.path:\n\ + \ raise Exception(\"dataset not found\")\n return True\n\n" + image: quay.io/opendatahub/ds-pipelines-sample-base:v1.0 +pipelineInfo: + description: Trains the fraud detection model. 
+ name: fraud-detection-training-pipeline +root: + dag: + outputs: + artifacts: + evaluate-keras-model-performance-classification_metrics: + artifactSelectors: + - outputArtifactKey: classification_metrics + producerSubtask: evaluate-keras-model-performance + evaluate-keras-model-performance-metrics: + artifactSelectors: + - outputArtifactKey: metrics + producerSubtask: evaluate-keras-model-performance + tasks: + convert-keras-to-onnx: + cachingOptions: + enableCache: true + componentRef: + name: comp-convert-keras-to-onnx + dependentTasks: + - train-fraud-model + inputs: + artifacts: + keras_model: + taskOutputArtifact: + outputArtifactKey: trained_model + producerTask: train-fraud-model + taskInfo: + name: convert-keras-to-onnx + evaluate-keras-model-performance: + cachingOptions: + enableCache: true + componentRef: + name: comp-evaluate-keras-model-performance + dependentTasks: + - preprocess-transactiondb-data + - train-fraud-model + inputs: + artifacts: + model: + taskOutputArtifact: + outputArtifactKey: trained_model + producerTask: train-fraud-model + scaler: + taskOutputArtifact: + outputArtifactKey: scaler + producerTask: preprocess-transactiondb-data + test_data: + taskOutputArtifact: + outputArtifactKey: test_data + producerTask: preprocess-transactiondb-data + parameters: + previous_model_metrics: + runtimeValue: + constant: + accuracy: 0.85 + taskInfo: + name: evaluate-keras-model-performance + fetch-transactionsdb-data: + cachingOptions: + enableCache: true + componentRef: + name: comp-fetch-transactionsdb-data + inputs: + parameters: + datastore: + componentInputParameter: datastore + taskInfo: + name: fetch-transactionsdb-data + preprocess-transactiondb-data: + cachingOptions: + enableCache: true + componentRef: + name: comp-preprocess-transactiondb-data + dependentTasks: + - fetch-transactionsdb-data + inputs: + artifacts: + in_data: + taskOutputArtifact: + outputArtifactKey: dataset + producerTask: fetch-transactionsdb-data + taskInfo: + name: preprocess-transactiondb-data + train-fraud-model: + cachingOptions: + enableCache: true + componentRef: + name: comp-train-fraud-model + dependentTasks: + - preprocess-transactiondb-data + inputs: + artifacts: + scaler: + taskOutputArtifact: + outputArtifactKey: scaler + producerTask: preprocess-transactiondb-data + train_data: + taskOutputArtifact: + outputArtifactKey: train_data + producerTask: preprocess-transactiondb-data + val_data: + taskOutputArtifact: + outputArtifactKey: val_data + producerTask: preprocess-transactiondb-data + parameters: + class_weights: + taskOutputParameter: + outputParameterKey: class_weights + producerTask: preprocess-transactiondb-data + hyperparameters: + componentInputParameter: hyperparameters + taskInfo: + name: train-fraud-model + validate-onnx-model: + cachingOptions: + enableCache: true + componentRef: + name: comp-validate-onnx-model + dependentTasks: + - convert-keras-to-onnx + - preprocess-transactiondb-data + - train-fraud-model + inputs: + artifacts: + keras_model: + taskOutputArtifact: + outputArtifactKey: trained_model + producerTask: train-fraud-model + onnx_model: + taskOutputArtifact: + outputArtifactKey: onnx_model + producerTask: convert-keras-to-onnx + test_data: + taskOutputArtifact: + outputArtifactKey: test_data + producerTask: preprocess-transactiondb-data + taskInfo: + name: validate-onnx-model + validate-transactiondb-data: + cachingOptions: + enableCache: true + componentRef: + name: comp-validate-transactiondb-data + dependentTasks: + - fetch-transactionsdb-data + inputs: 
+ artifacts: + dataset: + taskOutputArtifact: + outputArtifactKey: dataset + producerTask: fetch-transactionsdb-data + taskInfo: + name: validate-transactiondb-data + inputDefinitions: + parameters: + datastore: + parameterType: STRUCT + hyperparameters: + parameterType: STRUCT + outputDefinitions: + artifacts: + evaluate-keras-model-performance-classification_metrics: + artifactType: + schemaTitle: system.ClassificationMetrics + schemaVersion: 0.0.1 + evaluate-keras-model-performance-metrics: + artifactType: + schemaTitle: system.Metrics + schemaVersion: 0.0.1 +schemaVersion: 2.1.0 +sdkVersion: kfp-2.8.0 diff --git a/downloads/get_data_train_upload.yaml b/downloads/get_data_train_upload.yaml new file mode 100644 index 0000000..4927057 --- /dev/null +++ b/downloads/get_data_train_upload.yaml @@ -0,0 +1,255 @@ +# PIPELINE DEFINITION +# Name: 7-get-data-train-upload +components: + comp-get-data: + executorLabel: exec-get-data + outputDefinitions: + artifacts: + data_output_path: + artifactType: + schemaTitle: system.Artifact + schemaVersion: 0.0.1 + comp-train-model: + executorLabel: exec-train-model + inputDefinitions: + artifacts: + data_input_path: + artifactType: + schemaTitle: system.Artifact + schemaVersion: 0.0.1 + outputDefinitions: + artifacts: + model_output_path: + artifactType: + schemaTitle: system.Artifact + schemaVersion: 0.0.1 + comp-upload-model: + executorLabel: exec-upload-model + inputDefinitions: + artifacts: + input_model_path: + artifactType: + schemaTitle: system.Artifact + schemaVersion: 0.0.1 +deploymentSpec: + executors: + exec-get-data: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - get_data + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.5.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef get_data(data_output_path: OutputPath()):\n import urllib.request\n\ + \ print(\"starting download...\")\n url = \"https://raw.githubusercontent.com/rh-aiservices-bu/fraud-detection/main/data/card_transdata.csv\"\ + \n urllib.request.urlretrieve(url, data_output_path)\n print(\"done\"\ + )\n\n" + image: quay.io/modh/runtime-images:runtime-cuda-tensorflow-ubi9-python-3.9-2023b-20240301 + exec-train-model: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - train_model + command: + - sh + - -c + - "\nif ! 
[ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.5.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'tf2onnx' 'seaborn'\ + \ && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef train_model(data_input_path: InputPath(), model_output_path:\ + \ OutputPath()):\n import numpy as np\n import pandas as pd\n from\ + \ keras.models import Sequential\n from keras.layers import Dense, Dropout,\ + \ BatchNormalization, Activation\n from sklearn.model_selection import\ + \ train_test_split\n from sklearn.preprocessing import StandardScaler\n\ + \ from sklearn.utils import class_weight\n import tf2onnx\n import\ + \ onnx\n import pickle\n from pathlib import Path\n\n # Load the\ + \ CSV data which we will use to train the model.\n # It contains the\ + \ following fields:\n # distancefromhome - The distance from home where\ + \ the transaction happened.\n # distancefromlast_transaction - The\ + \ distance from last transaction happened.\n # ratiotomedianpurchaseprice\ + \ - Ratio of purchased price compared to median purchase price.\n # \ + \ repeat_retailer - If it's from a retailer that already has been purchased\ + \ from before.\n # used_chip - If the (credit card) chip was used.\n\ + \ # usedpinnumber - If the PIN number was used.\n # online_order\ + \ - If it was an online order.\n # fraud - If the transaction is fraudulent.\n\ + \ Data = pd.read_csv(data_input_path)\n\n # Set the input (X) and\ + \ output (Y) data.\n # The only output data we have is if it's fraudulent\ + \ or not, and all other fields go as inputs to the model.\n\n X = Data.drop(columns\ + \ = ['repeat_retailer','distance_from_home', 'fraud'])\n y = Data['fraud']\n\ + \n # Split the data into training and testing sets so we have something\ + \ to test the trained model with.\n\n # X_train, X_test, y_train, y_test\ + \ = train_test_split(X,y, test_size = 0.2, stratify = y)\n X_train, X_test,\ + \ y_train, y_test = train_test_split(X,y, test_size = 0.2, shuffle = False)\n\ + \n X_train, X_val, y_train, y_val = train_test_split(X_train,y_train,\ + \ test_size = 0.2, stratify = y_train)\n\n # Scale the data to remove\ + \ mean and have unit variance. 
This means that the data will be between\ + \ -1 and 1, which makes it a lot easier for the model to learn than random\ + \ potentially large values.\n # It is important to only fit the scaler\ + \ to the training data, otherwise you are leaking information about the\ + \ global distribution of variables (which is influenced by the test set)\ + \ into the training set.\n\n scaler = StandardScaler()\n\n X_train\ + \ = scaler.fit_transform(X_train.values)\n\n Path(\"artifact\").mkdir(parents=True,\ + \ exist_ok=True)\n with open(\"artifact/test_data.pkl\", \"wb\") as handle:\n\ + \ pickle.dump((X_test, y_test), handle)\n with open(\"artifact/scaler.pkl\"\ + , \"wb\") as handle:\n pickle.dump(scaler, handle)\n\n # Since\ + \ the dataset is unbalanced (it has many more non-fraud transactions than\ + \ fraudulent ones), we set a class weight to weight the few fraudulent transactions\ + \ higher than the many non-fraud transactions.\n\n class_weights = class_weight.compute_class_weight('balanced',classes\ + \ = np.unique(y_train),y = y_train)\n class_weights = {i : class_weights[i]\ + \ for i in range(len(class_weights))}\n\n\n # Build the model, the model\ + \ we build here is a simple fully connected deep neural network, containing\ + \ 3 hidden layers and one output layer.\n\n model = Sequential()\n \ + \ model.add(Dense(32, activation = 'relu', input_dim = len(X.columns)))\n\ + \ model.add(Dropout(0.2))\n model.add(Dense(32))\n model.add(BatchNormalization())\n\ + \ model.add(Activation('relu'))\n model.add(Dropout(0.2))\n model.add(Dense(32))\n\ + \ model.add(BatchNormalization())\n model.add(Activation('relu'))\n\ + \ model.add(Dropout(0.2))\n model.add(Dense(1, activation = 'sigmoid'))\n\ + \ model.compile(optimizer='adam',loss='binary_crossentropy',metrics=['accuracy'])\n\ + \ model.summary()\n\n\n # Train the model and get performance\n\n\ + \ epochs = 2\n history = model.fit(X_train, y_train, epochs=epochs,\ + \ \\\n validation_data=(scaler.transform(X_val.values),y_val),\ + \ \\\n verbose = True, class_weight = class_weights)\n\ + \n # Save the model as ONNX for easy use of ModelMesh\n\n model_proto,\ + \ _ = tf2onnx.convert.from_keras(model)\n print(model_output_path)\n\ + \ onnx.save(model_proto, model_output_path)\n\n" + image: quay.io/modh/runtime-images:runtime-cuda-tensorflow-ubi9-python-3.9-2023b-20240301 + exec-upload-model: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - upload_model + command: + - sh + - -c + - "\nif ! 
[ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.5.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'boto3' 'botocore'\ + \ && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef upload_model(input_model_path: InputPath()):\n import os\n\ + \ import boto3\n import botocore\n\n aws_access_key_id = os.environ.get('AWS_ACCESS_KEY_ID')\n\ + \ aws_secret_access_key = os.environ.get('AWS_SECRET_ACCESS_KEY')\n \ + \ endpoint_url = os.environ.get('AWS_S3_ENDPOINT')\n region_name =\ + \ os.environ.get('AWS_DEFAULT_REGION')\n bucket_name = os.environ.get('AWS_S3_BUCKET')\n\ + \n s3_key = os.environ.get(\"S3_KEY\")\n\n session = boto3.session.Session(aws_access_key_id=aws_access_key_id,\n\ + \ aws_secret_access_key=aws_secret_access_key)\n\ + \n s3_resource = session.resource(\n 's3',\n config=botocore.client.Config(signature_version='s3v4'),\n\ + \ endpoint_url=endpoint_url,\n region_name=region_name)\n\n\ + \ bucket = s3_resource.Bucket(bucket_name)\n\n print(f\"Uploading\ + \ {s3_key}\")\n bucket.upload_file(input_model_path, s3_key)\n\n" + env: + - name: S3_KEY + value: models/fraud/1/model.onnx + image: quay.io/modh/runtime-images:runtime-cuda-tensorflow-ubi9-python-3.9-2023b-20240301 +pipelineInfo: + name: 7-get-data-train-upload +root: + dag: + tasks: + get-data: + cachingOptions: + enableCache: true + componentRef: + name: comp-get-data + taskInfo: + name: get-data + train-model: + cachingOptions: + enableCache: true + componentRef: + name: comp-train-model + dependentTasks: + - get-data + inputs: + artifacts: + data_input_path: + taskOutputArtifact: + outputArtifactKey: data_output_path + producerTask: get-data + taskInfo: + name: train-model + upload-model: + cachingOptions: + enableCache: true + componentRef: + name: comp-upload-model + dependentTasks: + - train-model + inputs: + artifacts: + input_model_path: + taskOutputArtifact: + outputArtifactKey: model_output_path + producerTask: train-model + taskInfo: + name: upload-model +schemaVersion: 2.1.0 +sdkVersion: kfp-2.5.0 +--- +platforms: + kubernetes: + deploymentSpec: + executors: + exec-upload-model: + secretAsEnv: + - keyToEnv: + - envVar: AWS_ACCESS_KEY_ID + secretKey: AWS_ACCESS_KEY_ID + - envVar: AWS_SECRET_ACCESS_KEY + secretKey: AWS_SECRET_ACCESS_KEY + - envVar: AWS_DEFAULT_REGION + secretKey: AWS_DEFAULT_REGION + - envVar: AWS_S3_BUCKET + secretKey: AWS_S3_BUCKET + - envVar: AWS_S3_ENDPOINT + secretKey: AWS_S3_ENDPOINT + secretName: aws-connection-my-storage diff --git a/modules/LABENV/pages/index.adoc b/modules/LABENV/pages/index.adoc index 50bec64..68d9919 100644 --- a/modules/LABENV/pages/index.adoc +++ b/modules/LABENV/pages/index.adoc @@ -31,9 +31,9 @@ Red Hat OpenShift Operators automate the creation, configuration, and management Included in Red Hat OpenShift is the Embedded OperatorHub, a registry of certified Operators from software vendors and open source projects. 
Within the Embedded OperatorHub you can browse and install a library of Operators that have been verified to work with Red Hat OpenShift and that have been packaged for easy lifecycle management. -== Lab: Installation of Red Hat OpenShift AI +== Lab Exercise: Installation of Red Hat OpenShift AI -This section will discuss the process for installing the dependent operators using the OpenShift Web Console. +This section will discuss the process for installing the dependent operators using the OpenShift Web Console. ( ~15 minutes ) IMPORTANT: The installation requires a user with the _cluster-admin_ role @@ -41,7 +41,7 @@ This exercise uses the Red Hat Demo Platform; specifically the OpenShift Contain . Login to the Red Hat OpenShift using a user which has the _cluster-admin_ role assigned. -. Navigate to **Operators** -> **OperatorHub** and search for each of the following Operators individually. Click on the button or tile for each. In the pop up window that opens, ensure you select the latest version in the *stable* channel and click on **Install** to open the operator's installation view. For this lab you can skip the installation of the optional operators. +. Navigate to **Operators** -> **OperatorHub** and search for each of the following Operators individually. For this lab you can skip the installation of the optional operators. [*] You do not have to wait for the previous Operator to complete before installing the next. For this lab you can skip the installation of the optional operators as there is no accelerator required. // Should this be a note? diff --git a/modules/LABENV/pages/minio-install.adoc b/modules/LABENV/pages/minio-install.adoc index 32ace95..913b871 100644 --- a/modules/LABENV/pages/minio-install.adoc +++ b/modules/LABENV/pages/minio-install.adoc @@ -200,7 +200,7 @@ image::minio_setup.gif[width=600] From the OCP Dashboard: - . Select Networking / Routes from the navigation menu. + . Select Networking / Routes from the navigation menu. . This will display two routes, one for the UI & another for the API. (if the routes are not visible, make sure you have the project selected that matches your data sicence project created earlier) @@ -223,7 +223,7 @@ Once logged into the MinIO Console: .. *pipelines* - .. *storage* + .. *my-storage* .. *models* (optional) diff --git a/modules/ROOT/pages/index.adoc b/modules/ROOT/pages/index.adoc index 06c777a..9c57b6e 100644 --- a/modules/ROOT/pages/index.adoc +++ b/modules/ROOT/pages/index.adoc @@ -5,7 +5,7 @@ Data science pipelines can be a game-changer for AI model development. By breaking down complex tasks into smaller, manageable steps, we can optimize each part of the process, ensuring that our models are trained and validated. Additionally, pipelines can help us maintain consistent results by versioning inputs and outputs, allowing us to track changes and identify potential issues. -This course is tailored for infrastructure solution architects and engineers who are tasked with deploying and managing data science pipelines on the OpenShift AI platform. By the end of this course, learners will have a solid understanding of how to deploy and support data scientists who will use RHOAI to design, build, and maintain efficient and effective data science pipelines in an OpenShift AI environment. +This course is tailored for infrastructure solution architects and engineers who are tasked with deploying and managing data science pipelines on the OpenShift AI platform. 
By the end of this course, learners will have a solid understanding of how to deploy resources and support data scientists who will use RHOAI to design, build, and maintain efficient and effective data science pipelines in an OpenShift AI environment. Let's explore how pipelines can help us optimize training tasks, manage caching steps, and create more maintainable and reusable workloads. diff --git a/modules/chapter1/pages/dsp-concepts.adoc b/modules/chapter1/pages/dsp-concepts.adoc index 09444ca..7cfee39 100644 --- a/modules/chapter1/pages/dsp-concepts.adoc +++ b/modules/chapter1/pages/dsp-concepts.adoc @@ -46,11 +46,11 @@ Data science pipelines may consists of several key activities that are performed A single pipeline may include the ability to train multiple models, complete complex hyperparameter searches, or more. Data Scientists can use a well crafted pipeline to quickly iterate on a model, adjust how data is transformed, test different algorithms, and more. While the steps described above describe a common pattern for model training, different use cases and projects may have vastly different requirements and the tools and framework selected for creating a data science pipeline should help to enable a flexible design. -=== Technical Knowledge +=== RHOAI Data Science Pipeline Engine OpenShift AI uses Kubeflow pipelines with Argo workflows as the engine. Kubeflow provides a rich set of tools for managing ML workloads, while Argo workflows offer powerful automation capabilities. Together, they enable us to create robust, scalable, and manageable pipelines for AI model development and serving. -Pipelines can include various components, such as data ingestion, data preprocessing, model training, evaluation, and deployment. These components can be configured to run in a specific order, and the pipeline can be executed multiple times to produce different versions of models or artifacts. +Pipelines can include various components, such as data ingestion, data preprocessing, model training, evaluation, and deployment. _These components can be configured to run in a specific order, and the pipeline can be executed multiple times to produce different versions of models or artifacts._ Additionally, pipelines can support control flows to handle complex dependencies between tasks. Once a pipeline is defined, executing it becomes a simple RUN command, and the status of each execution can be tracked and monitored, ensuring that the desired outputs are produced successfully. diff --git a/modules/chapter1/pages/dsp-intro.adoc b/modules/chapter1/pages/dsp-intro.adoc index 95fb1bc..e57f991 100644 --- a/modules/chapter1/pages/dsp-intro.adoc +++ b/modules/chapter1/pages/dsp-intro.adoc @@ -2,26 +2,26 @@ == What is a machine learning pipeline -A machine learning pipeline is a crucial component in the development and productionization of machine learning systems, helping data scientists and data engineers manage the complexity of the end-to-end machine learning process and helping them to develop accurate and scalable solutions for a wide range of applications. - Machine learning (ML) pipelines are a key part of the data science process, helping data scientists to streamline their work and automate tasks. They can make the model development process more efficient and reproducible, while also reducing the risk of errors. 
+Enabling data scientists and data engineers manage the complexity of the end-to-end machine learning process and helping them to develop accurate and scalable solutions for a wide range of applications. + === Data science pipeline benefits: . *Modularization:* Pipelines enable you to break down the machine learning process into modular, well-defined steps. Each step can be developed, tested and optimized independently, making it easier to manage and maintain the workflow. . *Reproducibility:* Machine learning pipelines make it easier to reproduce experiments. By defining the sequence of steps and their parameters in a pipeline, you can recreate the entire process exactly, ensuring consistent results. If a step fails or a model's performance deteriorates, the pipeline can be configured to raise alerts or take corrective actions. -*Experimentation:* You can experiment with different data preprocessing techniques, feature selections, and models by modifying individual steps within the pipeline. This flexibility enables for rapid iteration and optimization. + . *Experimentation:* You can experiment with different data preprocessing techniques, feature selections, and models by modifying individual steps within the pipeline. This flexibility enables for rapid iteration and optimization. -*Collaboration:* Pipelines make it easier for teams of data scientists and engineers to collaborate. Since the workflow is structured and documented, it's easier for team members to understand and contribute to the project. + . *Collaboration:* Pipelines make it easier for teams of data scientists and engineers to collaborate. Since the workflow is structured and documented, it's easier for team members to understand and contribute to the project. -*Version control and documentation:* You can use version control systems to track changes in your pipeline's code and configuration, ensuring that you can roll back to previous versions if needed. A well-structured pipeline encourages better documentation of each step. + . *Version control and documentation:* You can use version control systems to track changes in your pipeline's code and configuration, ensuring that you can roll back to previous versions if needed. A well-structured pipeline encourages better documentation of each step. === Machine learning lifecycles & DevOps Machine learning lifecycles can vary in complexity and may involve additional steps depending on the use case, such as hyperparameter optimization, cross-validation, and feature selection. The goal of a machine learning pipeline is to automate and standardize these processes, making it easier to develop and maintain ML models for various applications. -Integration with DevOps (2010s): Machine learning pipelines started to be integrated with DevOps practices to enable continuous integration and deployment (CI/CD) of machine learning models. This integration emphasized the need for reproducibility, version control and monitoring in ML pipelines. This integration is referred to as machine learning operations, or MLOps, which helps data science teams effectively manage the complexity of managing ML orchestration. In a real-time deployment, the pipeline replies to a request within milliseconds of the request. +Machine learning pipelines started to be integrated with DevOps practices to enable continuous integration and deployment (CI/CD) of machine learning models. This integration emphasized the need for reproducibility, version control and monitoring in ML pipelines. 
This integration is referred to as machine learning operations, or *MLOps*, which helps data science teams effectively manage the complexity of managing ML orchestration. In a real-time deployment, the pipeline replies to a request within milliseconds of the request. diff --git a/modules/chapter2/images/clone_repo_jupyter.gif b/modules/chapter2/images/clone_repo_jupyter.gif new file mode 100644 index 0000000..4e2b2a3 Binary files /dev/null and b/modules/chapter2/images/clone_repo_jupyter.gif differ diff --git a/modules/chapter2/images/logon_jupyterlab.gif b/modules/chapter2/images/logon_jupyterlab.gif new file mode 100644 index 0000000..d9e282d Binary files /dev/null and b/modules/chapter2/images/logon_jupyterlab.gif differ diff --git a/modules/chapter2/nav.adoc b/modules/chapter2/nav.adoc index 2ccbe53..8b69cac 100644 --- a/modules/chapter2/nav.adoc +++ b/modules/chapter2/nav.adoc @@ -1,4 +1,4 @@ * xref:index.adoc[] -** xref:managing-dsp-pipelines.adoc[] +//** xref:managing-dsp-pipelines.adoc[] ** xref:data-science-pipeline-app.adoc[] ** xref:rhoai-resources.adoc[] \ No newline at end of file diff --git a/modules/chapter2/pages/data-science-pipeline-app.adoc b/modules/chapter2/pages/data-science-pipeline-app.adoc index f1c42d9..cfbb5b4 100644 --- a/modules/chapter2/pages/data-science-pipeline-app.adoc +++ b/modules/chapter2/pages/data-science-pipeline-app.adoc @@ -1,6 +1,6 @@ = Data Science Pipeline Applications -The *DataSciencePipelineApplication* (dspa) custom resource creates several pods that are necessary to utilize the tools. This includes the creation of an API endpoint and a database where metadata is stored. The API endpoint is used by the OpenShift AI Dashboard, as well as tools like *Elyra* and the *kfp* package to manage and execute pipelines. +The *DataSciencePipelineApplication* (dspa) custom resource creates several pods that are necessary to utilize the tooling. This includes the creation of an API endpoint and a database where metadata is stored. The API endpoint is used by the OpenShift AI Dashboard, as well as tools like *Elyra* and the *kfp* package to manage and execute pipelines. image::dpsa_services.gif[width=600] @@ -30,7 +30,7 @@ While a *DataSciencePipelineApplication* is a namespace scoped object, workbench Only one dpsa deployment can exist per data science project. (nampespace) -== Exercise: Create a Data Science Pipeline Instance +== Lab Exercise: Create a Data Science Pipeline Instance To begin we will use the *Minio* instance created with the lab environment to act as the S3 artifact storage for the *DataSciencePipelineApplication*. @@ -55,7 +55,7 @@ Bucket: pipelines ``` + ``` -Name: storage +Name: my-storage Access key: minio Secret key: minio321! Endpoint: http://minio-service.pipelines-example.svc:9000 @@ -84,11 +84,11 @@ image::pipeline_server_setup.gif[width=600] //+ //image::create-dspa-create-pipeline-server.png[] -. Click the key icon in the right side of the `Access Key` field, and select the `data-science-pipelines` data connection. The fields in the form are automatically populated. +. Click the key icon in the right side of the `Access Key` field, and select the `pipelines` data connection. The fields in the form are automatically populated. //+ //image::create-dspa-configure-pipeline-server.png[] -. Click `Configure pipeline server`. After several seconds, the loading icon should complete and the `Pipelines` section will now show an option to `Import pipeline` along with a message that says `No pipelines`. +. 
Click `Configure pipeline server`. After several seconds, the loading icon should complete and the `Pipelines` section will now show an option to `Import pipeline`. //+ //image::create-dspa-verify-pipeline-server.png[] diff --git a/modules/chapter2/pages/index.adoc b/modules/chapter2/pages/index.adoc index 8f3e8b4..d4dc39f 100644 --- a/modules/chapter2/pages/index.adoc +++ b/modules/chapter2/pages/index.adoc @@ -1,47 +1,17 @@ = Data Science Pipelines -== Why Pipelines - -Help break complex AI tasks into multi step workloads that is more maintainable and reusable -Each step can be optimized for specific accelerators, configured and automated individually. -Complex training tasks that require optimizations, validations, scoring, data analysis, parallel training. -caching steps -Track and Versions -Different algorithms models and architecture to select the best combination -Keep track of variables and configurations. -Compare experiments to select the best one -Gain insights between inputs and results produced -Version the changes across the experiments - track versions, changes, outcomes - -=== How do we do it? - -DSP - helps data scientists track, automate, and version - -Data scientist can Configure a pipeline -Which is sequence of components or tasks -Represented by a directed acyclic graph. DAG - -Visualization of the sequence of tasks, also track outputs or artifcacts produced - -Pipeline control flows such as loops - -Pipeline is only a definition. - different versions of pipelines. -Executing a pipeline definition become a RUN -Understand the status of each of the executions -Were the artifacts produced successfully. - -=== Two basic principles: -Automate - Automated reduced human error -Track and version inputs & outputs of your model lifecycle to improve AI solution quality. - -Automation - Separated by Role -Inner loop - Model Development -Outer Loop - Model serving & availability -Monitoring and metrics -Tools of the data scientist: -Choice is notebooks & python for AI Model Development -Create notebooks that can automate the whole lifecycle process -Package model - +== Managing Data Science Pipelines 2.0 + +=== Configuring a pipeline server + +Before you can successfully create a pipeline in OpenShift AI, you must configure a pipeline server. This task includes configuring where your pipeline artifacts and data are stored. + + * You have an existing S3-compatible object storage bucket and you have configured write access to your S3 bucket on your storage account. + * You have created a data science project that you can add a pipeline server to. + * By default in RHOAI when a pipeline server is created it deploys a namespace mariaDB. + * If you are configuring a pipeline server with an external database. + ** Red Hat recommends that you use MySQL version 8.x. + ** Red Hat recommends that you use at least MariaDB version 10.5. diff --git a/modules/chapter2/pages/kubeflow.adoc b/modules/chapter2/pages/kubeflow.adoc index 7bc1823..f651b56 100644 --- a/modules/chapter2/pages/kubeflow.adoc +++ b/modules/chapter2/pages/kubeflow.adoc @@ -1 +1,44 @@ -= kubeflow \ No newline at end of file += kubeflow + +== Why Pipelines + +Help break complex AI tasks into multi step workloads that is more maintainable and reusable +Each step can be optimized for specific accelerators, configured and automated individually. +Complex training tasks that require optimizations, validations, scoring, data analysis, parallel training. 
+caching steps +Track and Versions +Different algorithms models and architecture to select the best combination +Keep track of variables and configurations. +Compare experiments to select the best one +Gain insights between inputs and results produced +Version the changes across the experiments - track versions, changes, outcomes + +=== How do we do it? + +DSP - helps data scientists track, automate, and version + +Data scientist can Configure a pipeline +Which is sequence of components or tasks +Represented by a directed acyclic graph. DAG + +Visualization of the sequence of tasks, also track outputs or artifcacts produced + +Pipeline control flows such as loops + +Pipeline is only a definition. - different versions of pipelines. +Executing a pipeline definition become a RUN +Understand the status of each of the executions +Were the artifacts produced successfully. + +=== Two basic principles: +Automate - Automated reduced human error +Track and version inputs & outputs of your model lifecycle to improve AI solution quality. + +Automation - Separated by Role +Inner loop - Model Development +Outer Loop - Model serving & availability +Monitoring and metrics +Tools of the data scientist: +Choice is notebooks & python for AI Model Development +Create notebooks that can automate the whole lifecycle process +Package model - \ No newline at end of file diff --git a/modules/chapter2/pages/managing-dsp-pipelines.adoc b/modules/chapter2/pages/managing-dsp-pipelines.adoc index 5c10b49..1f22c58 100644 --- a/modules/chapter2/pages/managing-dsp-pipelines.adoc +++ b/modules/chapter2/pages/managing-dsp-pipelines.adoc @@ -6,9 +6,10 @@ Before you can successfully create a pipeline in OpenShift AI, you must configure a pipeline server. This task includes configuring where your pipeline artifacts and data are stored. - * You have an existing S3-compatible object storage bucket and you have configured write access to your S3 bucket on your storage account. + * You have an existing S3-compatible object storage bucket and you have configured write access to your S3 bucket on your storage account. * You have created a data science project that you can add a pipeline server to. - * If you are configuring a pipeline server with an external database + * By default in RHOAI when a pipeline server is created it deploys a namespace mariaDB. + * If you are configuring a pipeline server with an external database. ** Red Hat recommends that you use MySQL version 8.x. ** Red Hat recommends that you use at least MariaDB version 10.5. diff --git a/modules/chapter2/pages/rhoai-resources.adoc b/modules/chapter2/pages/rhoai-resources.adoc index 42b6891..511b898 100644 --- a/modules/chapter2/pages/rhoai-resources.adoc +++ b/modules/chapter2/pages/rhoai-resources.adoc @@ -1,10 +1,10 @@ = OpenShift AI Resources -== Creating a WorkBench +== Lab Exercise: Creating a WorkBench image::dsp_workbench.gif[width=600] -Navigate to the Data Science Project section of the OpenShift AI Console /Dashboard. Select the fraud-detection project. +Navigate to the Data Science Project section of the OpenShift AI Console / Dashboard. Select the fraud-detection project. 
//image::create_workbench.png[width=640] @@ -27,21 +27,18 @@ Depending on the notebook image selected and the deployment size, it can take be -== Jupyter Notebooks +== Lab Exercise: Access JupyterLab -// video::llm_jupyter_v3.mp4[width=640] - -== Open JupyterLab JupyterLab enables you to work with documents and activities such as Jupyter notebooks, text editors, terminals, and custom components in a flexible, integrated, and extensible manner. For a demonstration of JupyterLab and its features, https://jupyterlab.readthedocs.io/en/stable/getting_started/overview.html#what-will-happen-to-the-classic-notebook[you can view this video., window=_blank] -Return to the fraud-detection workbench dashboard in the OpenShift AI console. +Return to the `fraud-detection` workbench dashboard in the OpenShift AI console. + +image::logon_jupyterlab.gif[width=600] . Select the *Open* link to the right of the status section of the fraud-detection workbench + -image::oai_open_jupyter.png[width=640] - . When the new window opens, use the OpenShift admin user & password to login to JupyterLab. . A landing page will prompt for Access Authorization. Make sure the boxes are checked for: @@ -54,16 +51,18 @@ image::oai_open_jupyter.png[width=640] If the *OPEN* link for the notebook is grayed out, the notebook container is still starting. This process can take a few minutes & up to 20+ minutes depending on the notebook image / resources we opted to choose. -== Inside JupyterLab +== Lab Exercise: Working with files in JupyterLab This takes us to the JupyterLab screen where we can select multiple options / tools / to work to begin our data science experimentation. -Our first action is to clone a git repository that contains a notebooks including an example notebook to familize yourself with the Jupiter notebook environment. +Our first action is to clone a git repository that contains notebooks including an sample notebook to familize yourself with the Jupiter notebook environment. + +image::clone_repo_jupyter.gif[width=600] -[NOTE} -==== -Add github repo here -==== +[NOTE] +```yaml +https://github.com/rh-aiservices-bu/fraud-detection.git +``` . Copy the URL link above @@ -74,10 +73,11 @@ image::clone_a_repo.png[width=640] . Paste the link into the *clone a repo* pop up, make sure the *included submodules are checked*, then click the clone. - . Navigate to the XYZ_ADD_CORRECT_FOLDER_HERE folder: + . Navigate to the `fraud-detection` folder: - . Then open the file: ABC_ADD_CORRECT_FILE -+ -image::navigate_ollama_notebook.png[width=640] + . Then open the file: 0_sandbox.ipynb - . Explore the notebook, and then continue. + . Follow the directions in the notebook to explore Jupyter. + + +Continue onto the next section: Creating Pipelines with Elyra. 
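If you prefer working from the JupyterLab terminal instead of the Git extension dialog described above, the same clone can be done with a single command. This is only an equivalent sketch of the UI steps: the repository URL is the one shown earlier, and `--recurse-submodules` mirrors the *included submodules* checkbox in the clone pop up.

```sh
# Clone the fraud-detection lab repository together with any submodules
# (equivalent to checking "included submodules" in the JupyterLab Git dialog).
git clone --recurse-submodules https://github.com/rh-aiservices-bu/fraud-detection.git
```

Either way you end up with a `fraud-detection` folder in the file browser that contains `0_sandbox.ipynb` and the other notebooks used in the following sections.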
diff --git a/modules/chapter3/images/elyra_pipeline_submit.gif b/modules/chapter3/images/elyra_pipeline_submit.gif new file mode 100644 index 0000000..107129e Binary files /dev/null and b/modules/chapter3/images/elyra_pipeline_submit.gif differ diff --git a/modules/chapter3/images/pipeline_storage.gif b/modules/chapter3/images/pipeline_storage.gif new file mode 100644 index 0000000..ddbdf6e Binary files /dev/null and b/modules/chapter3/images/pipeline_storage.gif differ diff --git a/modules/chapter3/images/save_model_storage.gif b/modules/chapter3/images/save_model_storage.gif new file mode 100644 index 0000000..cc4a71c Binary files /dev/null and b/modules/chapter3/images/save_model_storage.gif differ diff --git a/modules/chapter3/pages/elyra-pipelines.adoc b/modules/chapter3/pages/elyra-pipelines.adoc index a9f4f79..ac55f78 100644 --- a/modules/chapter3/pages/elyra-pipelines.adoc +++ b/modules/chapter3/pages/elyra-pipelines.adoc @@ -45,39 +45,41 @@ In order to create Elyra pipelines with the visual pipeline editor: * Once the pipeline is complete, you can submit it to the Data Science Pipelines engine. -== Exercise: Offline scoring for fraud detection +== Lab Exercise: Fraud Detection Pipeline === Working with Elyra -==== Exploring the Code +Let's now use Elyra to package the code into a pipeline and submit it to the Data Science Pipelines backend in order to: + +* Rely on the pipeline scheduler to manage the pipeline execution without having to depend on your workbench session. +* Keep track of the pipeline execution along with the previous executions. +* Be able to control resource usage of individual pipeline tasks in a fine-grained manner. + +==== Review opening JupyterLab Once the `fraud-detection` workbench has successfully started, we will begin the process of exploring and building our pipeline. . Ensure that the `fraud-detection` workbench is in the `Running` state. Click the `Open` link on the far right of the workbench menu. Log in to the workbench as the `admin` user. If you are running the workbench for the first time, click `Allow selected permissions` in the `Authorize Access` page to open the Jupyter Notebook interface. -== In Jupyter Clone Repository for Fraud-Detection +=== Clone the Fraud-Detection Repository in Jupyter . If you haven't already, clone the git repository below in the Jupyter notebook: + ``` https://github.com/rh-aiservices-bu/fraud-detection.git ``` -. Double click or open the fraud-detection folder in the explorer window - -image::elyra_pipeline_nodes.gif[width=600] +. Double-click or open the *fraud-detection* folder in the explorer window -=== Building the Pipeline - -Let's now use Elyra to package the code into a pipeline and submit it to the Data Science Pipelines backend in order to: +== Lab Exercise: Building the Pipeline -* Rely on the pipeline scheduler to manage the pipeline execution without having to depend on my workbench session. -* Keep track of the pipeline execution along with the previous executions. -* Be able to control resource usage of individual pipeline tasks in a fine-grained manner. +image::elyra_pipeline_nodes.gif[width=600] . Click on the `Pipeline Editor` tile in the launcher menu. This opens up Elyra's visual pipeline editor. You will use the visual pipeline editor to drag-and-drop files from the file browser onto the canvas area. These files then define the individual tasks of your pipeline. +. Rename the pipeline file to `fraud-detection-elyra.pipeline` and hit `Save Pipeline` in the top toolbar. + +. 
Drag the `experiment_train.ipynb` notebook onto the empty canvas. This will allow the pipeline to ingest the data we want to classify, pre-process the data, train a model, and run a sample test to validate that the model is working as intended. + //image::pipeline-1.png[] @@ -114,13 +116,15 @@ Before we can submit our pipeline, we have to configure the pipeline to specify: .. Select the `PIPELINE PROPERTIES` tab of the settings menu. Configurations in this section apply defaults to all nodes in the pipeline. -.. Scroll down to `Generic Node Defaults` and click on the drop down menu of `Runtime Image`. Select the `fraud detection runtime` that we previously defined. +.. Scroll down to `Generic Node Defaults` and click on the drop down menu of `Runtime Image`. Select the `TensorFlow with Cuda and Python 3.9 (UBI)` runtime image. + image::experiment_node_config.gif[width=600] + NOTE: Do not select any of the nodes in the canvas when you open the panel. You will see the `PIPELINE PROPERTIES` tab only when none of the nodes are selected. Click anywhere on the canvas and then open the panel. -. Next we will configure the data connection to the `my-storage` bucket as a Kubernetes secret. In the `PIPELINE PROPERTIES` section, click `Add` beneath the `Kubernetes Secrets` section and add the following five entries: +. Next we will configure the data connection to the `my-storage` bucket as a Kubernetes secret. By default these values are created as environment variables in the pipeline properties, but they need to be defined as Kubernetes secrets to be used by the pipeline. Copy the entries from the environment variables section and add them as Kubernetes secrets for the save_model (node 2) task in Elyra. + +. In the `PIPELINE PROPERTIES` section, click `Add` beneath the `Kubernetes Secrets` section and add the following five entries: + -- * `AWS_ACCESS_KEY_ID` @@ -133,32 +137,33 @@ Each parameter will include the following options: + -- -* `Environment Variable`: the parameter name +* `Environment Variable`: *the parameter name* * `Secret Name`: `aws-connection-my-storage` (the name of the Kubernetes secret belonging to the data connection) -* `Secret Key`: the parameter name +* `Secret Key`: *the parameter name* -- + -//image::pipeline-config-3.png[] -+ -[NOTE] -==== -A data connection in OpenShift AI is a standard Kubernetes secret that adheres to a specific format. A data connection name is always pre-pended with `aws-connection-`. To explore the data connection you can find the secret in the `Workloads` -> `Secrets` menu in the OpenShift Web Console. -==== +image::save_model_storage.gif[width=600] + [NOTE] ==== The AWS default region is another parameter in the data connection, which is used for AWS S3-based connections. ==== -. Next we will configure the data to be passed between the nodes. Click on the `model_loading.py` node. If you're still in the configuration menu, you should now see the `NODE PROPERTIES` tab. If not, right-click on the node and select `Open Properties`. +. Next we will configure the data to be passed between the nodes. Click on the `experiment_train` node. If you're still in the configuration menu, you should now see the `NODE PROPERTIES` tab. If not, right-click on the node and select `Open Properties`. + //image::pipeline-config-4.png[] . Under `Runtime Image` and `Kubernetes Secrets`, you can see that the global pipeline settings are used by default. +image::experiment_node_config_2.gif[width=600] + +. 
In the `File Dependencies` section, you can declare one or more _input files_. These input files are consumed by this pipeline task as the data needed to train the model. + +.. Under file dependencies, click *Add*, then select *Browse* and choose the `data/card_transdata.csv` file, which provides a sampling of credit card transactions to be used for training. + . In the `Outputs` section, you can declare one or more _output files_. These output files are created by this pipeline task and are made available to all subsequent tasks. -. Click `Add` in the `Outputs` section and input `models/fraud/1/model.onnx`. This ensures that the downloaded model artifact is available to downstream tasks, including the `save_models` task. +.. Click `Add` in the `Outputs` section and input `models/fraud/1/model.onnx`. This ensures that the model artifact is available to downstream tasks, including the `save_models` task. + //image::pipeline-config-5.png[] + @@ -178,29 +183,28 @@ Output files are automatically managed by Data Science Pipelines, and stored in `Mount Volumes` can be helpful when a large number of files or a large dataset is required to be stored. `Mount Volumes` also have the ability to persist data between runs of a pipeline, which can allow a volume to act as a cache for files between executions. ==== -+ + [NOTE] ==== We could have declared the data volume as a global pipeline property for simplicity. However, this would have prevented parallel execution of model loading and data ingestion/preprocessing since data volumes can only be used by a single task by default. ==== -. Rename the pipeline file to `fraud-detection-elyra.pipeline` and hit `Save Pipeline` in the top toolbar. -+ -//image::pipeline-config-7.png[] ==== Running the pipeline We have now fully created and configured the pipeline, so let's now see it in action! . In the visual editor, click on the *Play* icon (`Run Pipeline`). Leave the default values and hit `OK`. -+ + +image::elyra_pipeline_submit.gif[width=600] + [TIP] ==== *Data Science Pipelines* should be selected as the default execution environment automatically when starting the pipeline run. OpenShift AI will automatically configure and select the *DataSciencePipelinesApplication* instance we created previously as the default execution environment. This will happen provided the *DataSciencePipelinesApplication* was created before the workbench was started and it is located in the same namespace as the workbench. If you wish to use a *DataSciencePipelinesApplication* that is located in a different namespace from your workbench, you can manually configure an execution environment. ==== -+ + [WARNING] ==== If you configure the pipeline server after you have created a workbench and specified a notebook image within the workbench, you will not be able to execute the pipeline, even after restarting the notebook. @@ -235,7 +239,7 @@ Save your changes. . In the `Scheduled` tab you're able to schedule runs of the offline scoring pipeline according to a predefined schedule such as daily or according to a Cron statement. + //image::pipeline-scheduled.png[] -+ + [WARNING] ==== Pipeline versioning implemented in Data Science Pipelines. @@ -254,7 +258,7 @@ Let's finally peek behind the scenes and inspect the S3 bucket that Elyra and Da * `artifacts`: A folder used by Data Science Pipelines to store the metadata of each pipeline task for each pipeline run. * One folder for each pipeline run with name `[pipeline-name]-[timestamp]`. 
These folders are managed by Elyra and contain all file dependencies, log files, and output files of each task. -- -+ + [NOTE] ==== The logs from the Pipeline submitted from Elyra will show generic task information and logs, including showing the execution of our python files as a subtask. Log details from our code is not recorded in the pipeline logs. diff --git a/modules/chapter4/attachments/fraud_detection.yaml b/modules/chapter4/attachments/fraud_detection.yaml new file mode 100644 index 0000000..b0ce1cd --- /dev/null +++ b/modules/chapter4/attachments/fraud_detection.yaml @@ -0,0 +1,638 @@ +# PIPELINE DEFINITION +# Name: fraud-detection-training-pipeline +# Description: Trains the fraud detection model. +# Inputs: +# datastore: dict +# hyperparameters: dict +# Outputs: +# evaluate-keras-model-performance-classification_metrics: system.ClassificationMetrics +# evaluate-keras-model-performance-metrics: system.Metrics +components: + comp-convert-keras-to-onnx: + executorLabel: exec-convert-keras-to-onnx + inputDefinitions: + artifacts: + keras_model: + artifactType: + schemaTitle: system.Model + schemaVersion: 0.0.1 + outputDefinitions: + artifacts: + onnx_model: + artifactType: + schemaTitle: system.Model + schemaVersion: 0.0.1 + comp-evaluate-keras-model-performance: + executorLabel: exec-evaluate-keras-model-performance + inputDefinitions: + artifacts: + model: + artifactType: + schemaTitle: system.Model + schemaVersion: 0.0.1 + scaler: + artifactType: + schemaTitle: system.Model + schemaVersion: 0.0.1 + test_data: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + parameters: + previous_model_metrics: + parameterType: STRUCT + outputDefinitions: + artifacts: + classification_metrics: + artifactType: + schemaTitle: system.ClassificationMetrics + schemaVersion: 0.0.1 + markdown: + artifactType: + schemaTitle: system.Markdown + schemaVersion: 0.0.1 + metrics: + artifactType: + schemaTitle: system.Metrics + schemaVersion: 0.0.1 + comp-fetch-transactionsdb-data: + executorLabel: exec-fetch-transactionsdb-data + inputDefinitions: + parameters: + datastore: + parameterType: STRUCT + outputDefinitions: + artifacts: + dataset: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + comp-preprocess-transactiondb-data: + executorLabel: exec-preprocess-transactiondb-data + inputDefinitions: + artifacts: + in_data: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + outputDefinitions: + artifacts: + scaler: + artifactType: + schemaTitle: system.Model + schemaVersion: 0.0.1 + test_data: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + train_data: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + val_data: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + parameters: + class_weights: + parameterType: STRUCT + comp-train-fraud-model: + executorLabel: exec-train-fraud-model + inputDefinitions: + artifacts: + scaler: + artifactType: + schemaTitle: system.Model + schemaVersion: 0.0.1 + train_data: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + val_data: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + parameters: + class_weights: + parameterType: STRUCT + hyperparameters: + parameterType: STRUCT + outputDefinitions: + artifacts: + trained_model: + artifactType: + schemaTitle: system.Model + schemaVersion: 0.0.1 + comp-validate-onnx-model: + executorLabel: exec-validate-onnx-model + inputDefinitions: + artifacts: + keras_model: + 
artifactType: + schemaTitle: system.Model + schemaVersion: 0.0.1 + onnx_model: + artifactType: + schemaTitle: system.Model + schemaVersion: 0.0.1 + test_data: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + comp-validate-transactiondb-data: + executorLabel: exec-validate-transactiondb-data + inputDefinitions: + artifacts: + dataset: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + outputDefinitions: + parameters: + Output: + parameterType: BOOLEAN +deploymentSpec: + executors: + exec-convert-keras-to-onnx: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - convert_keras_to_onnx + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'tf2onnx' 'onnx'\ + \ 'pandas' 'scikit-learn' && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef convert_keras_to_onnx(\n keras_model: Input[Model],\n onnx_model:\ + \ Output[Model],\n):\n import tf2onnx, onnx\n import keras\n import\ + \ tensorflow as tf\n\n trained_keras_model = keras.saving.load_model(keras_model.path)\n\ + \ input_signature = [tf.TensorSpec(trained_keras_model.inputs[0].shape,\ + \ trained_keras_model.inputs[0].dtype, name='input')]\n trained_keras_model.output_names\ + \ = ['output']\n onnx_model_proto, _ = tf2onnx.convert.from_keras(trained_keras_model,\ + \ input_signature)\n\n onnx_model.path += \".onnx\"\n onnx.save(onnx_model_proto,\ + \ onnx_model.path)\n\n" + image: quay.io/hukhan/tensorflow:2.17.0 + exec-evaluate-keras-model-performance: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - evaluate_keras_model_performance + command: + - sh + - -c + - "\nif ! 
[ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'tf2onnx' 'onnx'\ + \ 'pandas' 'scikit-learn' && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef evaluate_keras_model_performance(\n model: Input[Model],\n\ + \ test_data: Input[Dataset],\n scaler: Input[Model],\n previous_model_metrics:\ + \ dict,\n metrics: Output[Metrics],\n classification_metrics: Output[ClassificationMetrics],\n\ + \ markdown: Output[Markdown]\n):\n import keras\n import pandas\ + \ as pd\n from sklearn.metrics import confusion_matrix\n import numpy\ + \ as np\n\n trained_model = keras.saving.load_model(model.path)\n \ + \ with open(test_data.path, 'rb') as pickle_file:\n X_test, y_test\ + \ = pd.read_pickle(pickle_file)\n with open(scaler.path, 'rb') as pickle_file:\n\ + \ st_scaler = pd.read_pickle(pickle_file)\n\n y_pred_temp = trained_model.predict(st_scaler.transform(X_test.values))\n\ + \ y_pred_temp = np.asarray(np.squeeze(y_pred_temp))\n threshold =\ + \ 0.95\n y_pred = np.where(y_pred_temp > threshold, 1,0)\n accuracy\ + \ = np.sum(np.asarray(y_test) == y_pred) / len(y_pred)\n\n metrics.log_metric(\"\ + Accuracy\", accuracy)\n metrics.log_metric(\"Prev Model Accuracy\", previous_model_metrics[\"\ + accuracy\"])\n\n cmatrix = confusion_matrix(np.asarray(y_test), y_pred)\n\ + \ cmatrix = cmatrix.tolist()\n targets = [\"0\", \"1\"] #TODO: Replace\ + \ with info from schema\n classification_metrics.log_confusion_matrix(targets,\ + \ cmatrix)\n\n with open(markdown.path, 'w') as f:\n f.write(\"\ + ### Accuracy\\n\")\n f.write(f'Accuracy: {accuracy:.2f}\\n')\n \ + \ f.write(\"### Previous Model Accuracy\\n\")\n f.write(f'Accuracy:\ + \ {previous_model_metrics[\"accuracy\"]:.2f}\\n')\n\n if accuracy <=\ + \ previous_model_metrics[\"accuracy\"]:\n raise Exception(\"Accuracy\ + \ is lower than the previous models\")\n\n" + image: quay.io/hukhan/tensorflow:2.17.0 + exec-fetch-transactionsdb-data: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - fetch_transactionsdb_data + command: + - sh + - -c + - "\nif ! 
[ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'psycopg2' 'pandas'\ + \ && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef fetch_transactionsdb_data(\n datastore: dict,\n dataset:\ + \ Output[Dataset]\n):\n \"\"\"\n Fetches data from the transactionsdb\ + \ datastore\n \"\"\"\n import urllib.request\n print(\"starting\ + \ download...\")\n url = datastore['url']\n urllib.request.urlretrieve(url,\ + \ dataset.path)\n print(\"done\")\n\n" + image: quay.io/opendatahub/ds-pipelines-sample-base:v1.0 + exec-preprocess-transactiondb-data: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - preprocess_transactiondb_data + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'pandas' 'scikit-learn'\ + \ && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef preprocess_transactiondb_data(\n in_data: Input[Dataset],\n\ + \ train_data: Output[Dataset],\n val_data: Output[Dataset],\n test_data:\ + \ Output[Dataset],\n scaler: Output[Model],\n) -> NamedTuple('outputs',\ + \ class_weights=dict):\n \"\"\"\n Takes the dataset and preprocesses\ + \ it to better train on the fraud detection model.\n The preprocessing\ + \ consists of:\n 1. Splitting the dataset into training, validation,\ + \ and testing.\n 2. Creating a scaler which scales down the training\ + \ dataset. This scaler is saved as an artifact.\n 3. 
Calculates the class\ + \ weights, which will later be used during the training.\n \"\"\"\n\n\ + \ from sklearn.model_selection import train_test_split\n from sklearn.preprocessing\ + \ import StandardScaler\n from sklearn.utils import class_weight\n \ + \ import pandas as pd\n import pickle\n import numpy as np\n from\ + \ typing import NamedTuple\n\n df = pd.read_csv(in_data.path)\n print(df.head())\n\ + \ X = df.drop(columns = ['repeat_retailer','distance_from_home', 'fraud'])\n\ + \ y = df['fraud']\n\n # Split the data into training and testing sets\ + \ so you have something to test the trained model with.\n\n # X_train,\ + \ X_test, y_train, y_test = train_test_split(X,y, test_size = 0.2, stratify\ + \ = y)\n X_train, X_test, y_train, y_test = train_test_split(X,y, test_size\ + \ = 0.2, shuffle = False)\n\n X_train, X_val, y_train, y_val = train_test_split(X_train,y_train,\ + \ test_size = 0.2, stratify = y_train)\n\n # Scale the data to remove\ + \ mean and have unit variance. The data will be between -1 and 1, which\ + \ makes it a lot easier for the model to learn than random (and potentially\ + \ large) values.\n # It is important to only fit the scaler to the training\ + \ data, otherwise you are leaking information about the global distribution\ + \ of variables (which is influenced by the test set) into the training set.\n\ + \n st_scaler = StandardScaler()\n\n X_train = st_scaler.fit_transform(X_train.values)\n\ + \n train_data.path += \".pkl\"\n val_data.path += \".pkl\"\n test_data.path\ + \ += \".pkl\"\n scaler.path += \".pkl\"\n\n with open(train_data.path,\ + \ \"wb\") as handle:\n pickle.dump((X_train, y_train), handle)\n\ + \ with open(val_data.path, \"wb\") as handle:\n pickle.dump((X_val,\ + \ y_val), handle)\n with open(test_data.path, \"wb\") as handle:\n \ + \ pickle.dump((X_test, y_test), handle)\n with open(scaler.path,\ + \ \"wb\") as handle:\n pickle.dump(st_scaler, handle)\n\n # Since\ + \ the dataset is unbalanced (it has many more non-fraud transactions than\ + \ fraudulent ones), set a class weight to weight the few fraudulent transactions\ + \ higher than the many non-fraud transactions.\n\n class_weights = class_weight.compute_class_weight('balanced',classes\ + \ = np.unique(y_train),y = y_train)\n class_weights = {i : class_weights[i]\ + \ for i in range(len(class_weights))}\n\n outputs = NamedTuple('outputs',\ + \ class_weights=dict)\n return outputs(class_weights)\n\n" + image: quay.io/opendatahub/ds-pipelines-sample-base:v1.0 + exec-train-fraud-model: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - train_fraud_model + command: + - sh + - -c + - "\nif ! 
[ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'pandas' 'scikit-learn'\ + \ && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef train_fraud_model(\n train_data: Input[Dataset],\n val_data:\ + \ Input[Dataset],\n scaler: Input[Model],\n class_weights: dict,\n\ + \ hyperparameters: dict,\n trained_model: Output[Model]\n):\n \"\ + \"\"\n Trains a dense tensorflow model.\n \"\"\"\n\n from keras.models\ + \ import Sequential\n from keras.layers import Dense, Dropout, BatchNormalization,\ + \ Activation\n import pickle\n import pandas as pd\n import sklearn\n\ + \n with open(train_data.path, 'rb') as pickle_file:\n X_train,\ + \ y_train = pd.read_pickle(pickle_file)\n with open(val_data.path, 'rb')\ + \ as pickle_file:\n X_val, y_val = pd.read_pickle(pickle_file)\n\ + \ with open(scaler.path, 'rb') as pickle_file:\n st_scaler = pd.read_pickle(pickle_file)\n\ + \n y_train = y_train.to_numpy()\n y_val = y_val.to_numpy()\n\n \ + \ model = Sequential()\n model.add(Dense(32, activation = 'relu', input_dim\ + \ = X_train.shape[1]))\n model.add(Dropout(0.2))\n model.add(Dense(32))\n\ + \ model.add(BatchNormalization())\n model.add(Activation('relu'))\n\ + \ model.add(Dropout(0.2))\n model.add(Dense(32))\n model.add(BatchNormalization())\n\ + \ model.add(Activation('relu'))\n model.add(Dropout(0.2))\n model.add(Dense(1,\ + \ activation = 'sigmoid'))\n model.compile(optimizer='adam',loss='binary_crossentropy',metrics=['accuracy'])\n\ + \ model.summary()\n\n epochs = hyperparameters[\"epochs\"]\n history\ + \ = model.fit(X_train, y_train, epochs=epochs, \\\n \ + \ validation_data=(st_scaler.transform(X_val.values),y_val), \\\n \ + \ verbose = True, class_weight = class_weights)\n \ + \ print(\"Training of model is complete\")\n\n trained_model.path +=\ + \ \".keras\"\n model.save(trained_model.path)\n\n" + image: quay.io/hukhan/tensorflow:2.17.0 + exec-validate-onnx-model: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - validate_onnx_model + command: + - sh + - -c + - "\nif ! 
[ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'onnxruntime'\ + \ 'pandas' && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef validate_onnx_model(\n onnx_model: Input[Model],\n keras_model:\ + \ Input[Model],\n test_data: Input[Dataset],\n):\n import onnxruntime\ + \ as rt\n import pandas as pd\n import numpy as np\n import keras\n\ + \n with open(test_data.path, 'rb') as pickle_file:\n X_test, _\ + \ = pd.read_pickle(pickle_file) \n _keras_model = keras.saving.load_model(keras_model.path)\n\ + \ onnx_session = rt.InferenceSession(onnx_model.path, providers=rt.get_available_providers())\n\ + \n onnx_input_name = onnx_session.get_inputs()[0].name\n onnx_output_name\ + \ = onnx_session.get_outputs()[0].name\n onnx_pred = onnx_session.run([onnx_output_name],\ + \ {onnx_input_name: X_test.values.astype(np.float32)})\n\n keras_pred\ + \ = _keras_model(X_test.values)\n\n print(\"Keras Pred: \", keras_pred)\n\ + \ print(\"ONNX Pred: \", onnx_pred[0])\n\n for rt_res, keras_res in\ + \ zip(onnx_pred[0], keras_pred):\n np.testing.assert_allclose(rt_res,\ + \ keras_res, rtol=1e-5, atol=1e-5)\n\n print(\"Results match\")\n\n" + image: quay.io/hukhan/tensorflow:2.17.0 + exec-validate-transactiondb-data: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - validate_transactiondb_data + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef validate_transactiondb_data(\n dataset: Input[Dataset]\n)\ + \ -> bool:\n \"\"\"\n Validates if the data schema is correct and\ + \ if the values are reasonable.\n \"\"\"\n\n if not dataset.path:\n\ + \ raise Exception(\"dataset not found\")\n return True\n\n" + image: quay.io/opendatahub/ds-pipelines-sample-base:v1.0 +pipelineInfo: + description: Trains the fraud detection model. 
+ name: fraud-detection-training-pipeline +root: + dag: + outputs: + artifacts: + evaluate-keras-model-performance-classification_metrics: + artifactSelectors: + - outputArtifactKey: classification_metrics + producerSubtask: evaluate-keras-model-performance + evaluate-keras-model-performance-metrics: + artifactSelectors: + - outputArtifactKey: metrics + producerSubtask: evaluate-keras-model-performance + tasks: + convert-keras-to-onnx: + cachingOptions: + enableCache: true + componentRef: + name: comp-convert-keras-to-onnx + dependentTasks: + - train-fraud-model + inputs: + artifacts: + keras_model: + taskOutputArtifact: + outputArtifactKey: trained_model + producerTask: train-fraud-model + taskInfo: + name: convert-keras-to-onnx + evaluate-keras-model-performance: + cachingOptions: + enableCache: true + componentRef: + name: comp-evaluate-keras-model-performance + dependentTasks: + - preprocess-transactiondb-data + - train-fraud-model + inputs: + artifacts: + model: + taskOutputArtifact: + outputArtifactKey: trained_model + producerTask: train-fraud-model + scaler: + taskOutputArtifact: + outputArtifactKey: scaler + producerTask: preprocess-transactiondb-data + test_data: + taskOutputArtifact: + outputArtifactKey: test_data + producerTask: preprocess-transactiondb-data + parameters: + previous_model_metrics: + runtimeValue: + constant: + accuracy: 0.85 + taskInfo: + name: evaluate-keras-model-performance + fetch-transactionsdb-data: + cachingOptions: + enableCache: true + componentRef: + name: comp-fetch-transactionsdb-data + inputs: + parameters: + datastore: + componentInputParameter: datastore + taskInfo: + name: fetch-transactionsdb-data + preprocess-transactiondb-data: + cachingOptions: + enableCache: true + componentRef: + name: comp-preprocess-transactiondb-data + dependentTasks: + - fetch-transactionsdb-data + inputs: + artifacts: + in_data: + taskOutputArtifact: + outputArtifactKey: dataset + producerTask: fetch-transactionsdb-data + taskInfo: + name: preprocess-transactiondb-data + train-fraud-model: + cachingOptions: + enableCache: true + componentRef: + name: comp-train-fraud-model + dependentTasks: + - preprocess-transactiondb-data + inputs: + artifacts: + scaler: + taskOutputArtifact: + outputArtifactKey: scaler + producerTask: preprocess-transactiondb-data + train_data: + taskOutputArtifact: + outputArtifactKey: train_data + producerTask: preprocess-transactiondb-data + val_data: + taskOutputArtifact: + outputArtifactKey: val_data + producerTask: preprocess-transactiondb-data + parameters: + class_weights: + taskOutputParameter: + outputParameterKey: class_weights + producerTask: preprocess-transactiondb-data + hyperparameters: + componentInputParameter: hyperparameters + taskInfo: + name: train-fraud-model + validate-onnx-model: + cachingOptions: + enableCache: true + componentRef: + name: comp-validate-onnx-model + dependentTasks: + - convert-keras-to-onnx + - preprocess-transactiondb-data + - train-fraud-model + inputs: + artifacts: + keras_model: + taskOutputArtifact: + outputArtifactKey: trained_model + producerTask: train-fraud-model + onnx_model: + taskOutputArtifact: + outputArtifactKey: onnx_model + producerTask: convert-keras-to-onnx + test_data: + taskOutputArtifact: + outputArtifactKey: test_data + producerTask: preprocess-transactiondb-data + taskInfo: + name: validate-onnx-model + validate-transactiondb-data: + cachingOptions: + enableCache: true + componentRef: + name: comp-validate-transactiondb-data + dependentTasks: + - fetch-transactionsdb-data + inputs: 
+ artifacts: + dataset: + taskOutputArtifact: + outputArtifactKey: dataset + producerTask: fetch-transactionsdb-data + taskInfo: + name: validate-transactiondb-data + inputDefinitions: + parameters: + datastore: + parameterType: STRUCT + hyperparameters: + parameterType: STRUCT + outputDefinitions: + artifacts: + evaluate-keras-model-performance-classification_metrics: + artifactType: + schemaTitle: system.ClassificationMetrics + schemaVersion: 0.0.1 + evaluate-keras-model-performance-metrics: + artifactType: + schemaTitle: system.Metrics + schemaVersion: 0.0.1 +schemaVersion: 2.1.0 +sdkVersion: kfp-2.8.0 diff --git a/modules/chapter4/attachments/get_data_train_upload.yaml b/modules/chapter4/attachments/get_data_train_upload.yaml new file mode 100644 index 0000000..4927057 --- /dev/null +++ b/modules/chapter4/attachments/get_data_train_upload.yaml @@ -0,0 +1,255 @@ +# PIPELINE DEFINITION +# Name: 7-get-data-train-upload +components: + comp-get-data: + executorLabel: exec-get-data + outputDefinitions: + artifacts: + data_output_path: + artifactType: + schemaTitle: system.Artifact + schemaVersion: 0.0.1 + comp-train-model: + executorLabel: exec-train-model + inputDefinitions: + artifacts: + data_input_path: + artifactType: + schemaTitle: system.Artifact + schemaVersion: 0.0.1 + outputDefinitions: + artifacts: + model_output_path: + artifactType: + schemaTitle: system.Artifact + schemaVersion: 0.0.1 + comp-upload-model: + executorLabel: exec-upload-model + inputDefinitions: + artifacts: + input_model_path: + artifactType: + schemaTitle: system.Artifact + schemaVersion: 0.0.1 +deploymentSpec: + executors: + exec-get-data: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - get_data + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.5.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef get_data(data_output_path: OutputPath()):\n import urllib.request\n\ + \ print(\"starting download...\")\n url = \"https://raw.githubusercontent.com/rh-aiservices-bu/fraud-detection/main/data/card_transdata.csv\"\ + \n urllib.request.urlretrieve(url, data_output_path)\n print(\"done\"\ + )\n\n" + image: quay.io/modh/runtime-images:runtime-cuda-tensorflow-ubi9-python-3.9-2023b-20240301 + exec-train-model: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - train_model + command: + - sh + - -c + - "\nif ! 
[ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.5.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'tf2onnx' 'seaborn'\ + \ && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef train_model(data_input_path: InputPath(), model_output_path:\ + \ OutputPath()):\n import numpy as np\n import pandas as pd\n from\ + \ keras.models import Sequential\n from keras.layers import Dense, Dropout,\ + \ BatchNormalization, Activation\n from sklearn.model_selection import\ + \ train_test_split\n from sklearn.preprocessing import StandardScaler\n\ + \ from sklearn.utils import class_weight\n import tf2onnx\n import\ + \ onnx\n import pickle\n from pathlib import Path\n\n # Load the\ + \ CSV data which we will use to train the model.\n # It contains the\ + \ following fields:\n # distancefromhome - The distance from home where\ + \ the transaction happened.\n # distancefromlast_transaction - The\ + \ distance from last transaction happened.\n # ratiotomedianpurchaseprice\ + \ - Ratio of purchased price compared to median purchase price.\n # \ + \ repeat_retailer - If it's from a retailer that already has been purchased\ + \ from before.\n # used_chip - If the (credit card) chip was used.\n\ + \ # usedpinnumber - If the PIN number was used.\n # online_order\ + \ - If it was an online order.\n # fraud - If the transaction is fraudulent.\n\ + \ Data = pd.read_csv(data_input_path)\n\n # Set the input (X) and\ + \ output (Y) data.\n # The only output data we have is if it's fraudulent\ + \ or not, and all other fields go as inputs to the model.\n\n X = Data.drop(columns\ + \ = ['repeat_retailer','distance_from_home', 'fraud'])\n y = Data['fraud']\n\ + \n # Split the data into training and testing sets so we have something\ + \ to test the trained model with.\n\n # X_train, X_test, y_train, y_test\ + \ = train_test_split(X,y, test_size = 0.2, stratify = y)\n X_train, X_test,\ + \ y_train, y_test = train_test_split(X,y, test_size = 0.2, shuffle = False)\n\ + \n X_train, X_val, y_train, y_val = train_test_split(X_train,y_train,\ + \ test_size = 0.2, stratify = y_train)\n\n # Scale the data to remove\ + \ mean and have unit variance. 
This means that the data will be between\ + \ -1 and 1, which makes it a lot easier for the model to learn than random\ + \ potentially large values.\n # It is important to only fit the scaler\ + \ to the training data, otherwise you are leaking information about the\ + \ global distribution of variables (which is influenced by the test set)\ + \ into the training set.\n\n scaler = StandardScaler()\n\n X_train\ + \ = scaler.fit_transform(X_train.values)\n\n Path(\"artifact\").mkdir(parents=True,\ + \ exist_ok=True)\n with open(\"artifact/test_data.pkl\", \"wb\") as handle:\n\ + \ pickle.dump((X_test, y_test), handle)\n with open(\"artifact/scaler.pkl\"\ + , \"wb\") as handle:\n pickle.dump(scaler, handle)\n\n # Since\ + \ the dataset is unbalanced (it has many more non-fraud transactions than\ + \ fraudulent ones), we set a class weight to weight the few fraudulent transactions\ + \ higher than the many non-fraud transactions.\n\n class_weights = class_weight.compute_class_weight('balanced',classes\ + \ = np.unique(y_train),y = y_train)\n class_weights = {i : class_weights[i]\ + \ for i in range(len(class_weights))}\n\n\n # Build the model, the model\ + \ we build here is a simple fully connected deep neural network, containing\ + \ 3 hidden layers and one output layer.\n\n model = Sequential()\n \ + \ model.add(Dense(32, activation = 'relu', input_dim = len(X.columns)))\n\ + \ model.add(Dropout(0.2))\n model.add(Dense(32))\n model.add(BatchNormalization())\n\ + \ model.add(Activation('relu'))\n model.add(Dropout(0.2))\n model.add(Dense(32))\n\ + \ model.add(BatchNormalization())\n model.add(Activation('relu'))\n\ + \ model.add(Dropout(0.2))\n model.add(Dense(1, activation = 'sigmoid'))\n\ + \ model.compile(optimizer='adam',loss='binary_crossentropy',metrics=['accuracy'])\n\ + \ model.summary()\n\n\n # Train the model and get performance\n\n\ + \ epochs = 2\n history = model.fit(X_train, y_train, epochs=epochs,\ + \ \\\n validation_data=(scaler.transform(X_val.values),y_val),\ + \ \\\n verbose = True, class_weight = class_weights)\n\ + \n # Save the model as ONNX for easy use of ModelMesh\n\n model_proto,\ + \ _ = tf2onnx.convert.from_keras(model)\n print(model_output_path)\n\ + \ onnx.save(model_proto, model_output_path)\n\n" + image: quay.io/modh/runtime-images:runtime-cuda-tensorflow-ubi9-python-3.9-2023b-20240301 + exec-upload-model: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - upload_model + command: + - sh + - -c + - "\nif ! 
[ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.5.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'boto3' 'botocore'\ + \ && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef upload_model(input_model_path: InputPath()):\n import os\n\ + \ import boto3\n import botocore\n\n aws_access_key_id = os.environ.get('AWS_ACCESS_KEY_ID')\n\ + \ aws_secret_access_key = os.environ.get('AWS_SECRET_ACCESS_KEY')\n \ + \ endpoint_url = os.environ.get('AWS_S3_ENDPOINT')\n region_name =\ + \ os.environ.get('AWS_DEFAULT_REGION')\n bucket_name = os.environ.get('AWS_S3_BUCKET')\n\ + \n s3_key = os.environ.get(\"S3_KEY\")\n\n session = boto3.session.Session(aws_access_key_id=aws_access_key_id,\n\ + \ aws_secret_access_key=aws_secret_access_key)\n\ + \n s3_resource = session.resource(\n 's3',\n config=botocore.client.Config(signature_version='s3v4'),\n\ + \ endpoint_url=endpoint_url,\n region_name=region_name)\n\n\ + \ bucket = s3_resource.Bucket(bucket_name)\n\n print(f\"Uploading\ + \ {s3_key}\")\n bucket.upload_file(input_model_path, s3_key)\n\n" + env: + - name: S3_KEY + value: models/fraud/1/model.onnx + image: quay.io/modh/runtime-images:runtime-cuda-tensorflow-ubi9-python-3.9-2023b-20240301 +pipelineInfo: + name: 7-get-data-train-upload +root: + dag: + tasks: + get-data: + cachingOptions: + enableCache: true + componentRef: + name: comp-get-data + taskInfo: + name: get-data + train-model: + cachingOptions: + enableCache: true + componentRef: + name: comp-train-model + dependentTasks: + - get-data + inputs: + artifacts: + data_input_path: + taskOutputArtifact: + outputArtifactKey: data_output_path + producerTask: get-data + taskInfo: + name: train-model + upload-model: + cachingOptions: + enableCache: true + componentRef: + name: comp-upload-model + dependentTasks: + - train-model + inputs: + artifacts: + input_model_path: + taskOutputArtifact: + outputArtifactKey: model_output_path + producerTask: train-model + taskInfo: + name: upload-model +schemaVersion: 2.1.0 +sdkVersion: kfp-2.5.0 +--- +platforms: + kubernetes: + deploymentSpec: + executors: + exec-upload-model: + secretAsEnv: + - keyToEnv: + - envVar: AWS_ACCESS_KEY_ID + secretKey: AWS_ACCESS_KEY_ID + - envVar: AWS_SECRET_ACCESS_KEY + secretKey: AWS_SECRET_ACCESS_KEY + - envVar: AWS_DEFAULT_REGION + secretKey: AWS_DEFAULT_REGION + - envVar: AWS_S3_BUCKET + secretKey: AWS_S3_BUCKET + - envVar: AWS_S3_ENDPOINT + secretKey: AWS_S3_ENDPOINT + secretName: aws-connection-my-storage diff --git a/modules/chapter4/pages/index.adoc b/modules/chapter4/pages/index.adoc index d2cf798..4f80e72 100644 --- a/modules/chapter4/pages/index.adoc +++ b/modules/chapter4/pages/index.adoc @@ -1,3 +1,11 @@ = KFP SDK -write imported pipelines section. \ No newline at end of file +Red Hat OpenShift AI offers two out-of-the-box mechanisms to work with Data Science Pipelines in terms of building and running machine learning pipelines. 
+ +The first mechanism is the *Elyra Pipelines* JupyterLab extension, which provides a visual editor for creating pipelines based on Jupyter notebooks as well as `Python` or `R` scripts. + +The second mechanism, and the one discussed here, is based on the *Kubeflow Pipelines SDK*. With the SDK, pipelines are built using `Python` scripts and submitted to the Data Science Pipelines runtime to be scheduled for execution. + +While the Elyra extension offers an easy-to-use visual editor to compose pipelines, and is generally used for simple workflows, the Kubeflow Pipelines SDK (*kfp*) offers a flexible Python Domain Specific Language (DSL) API to create pipelines from Python code. This approach offers you flexibility in composing complex workflows and has the added benefit of offering all the Python tooling, frameworks, and developer experience that come with writing Python code. + +OpenShift AI uses the *_Argo Workflows_* runtime to execute pipelines, which is why your Kubeflow pipeline containing Python code needs to be compiled into a YAML definition before it can be submitted to the runtime. Steps in the pipeline are executed as ephemeral pods (one per step). \ No newline at end of file diff --git a/modules/chapter4/pages/kfp-import.adoc b/modules/chapter4/pages/kfp-import.adoc index 3c90781..e7e5dc0 100644 --- a/modules/chapter4/pages/kfp-import.adoc +++ b/modules/chapter4/pages/kfp-import.adoc @@ -1,36 +1,25 @@ = Kubeflow Pipelines SDK -Red Hat OpenShift AI offers two out-of-the-box mechanisms to work with Data Science Pipelines in terms of building and running machine learning pipelines. +In the previous section, we created a two-task pipeline by using the Elyra GUI pipeline editor. It’s often desirable to create pipelines by using code that can be version-controlled and shared with others. The kfp SDK provides a Python API for creating pipelines. The SDK is available as a Python package that you can install by using the `pip install kfp` command. With this package, you can use Python code to create a pipeline and then compile it to YAML format. Then we can import the YAML code into OpenShift AI. -The first mechanism is the *Elyra Pipelines* JupyterLab extension, which provides a visual editor for creating pipelines based on Jupyter notebooks as well as `Python` or `R` scripts. +This course does not delve into the details of how to use the SDK. Instead, it provides the files for you to view and upload. -The second mechanism, and the one discussed here is based on the *Kubeflow Pipelines SDK*. With the SDK, pipelines are built using `Python` scripts and submitted to the Data Science Pipelines runtime to be scheduled for execution. +//[NOTE] +//==== +//Data Science Pipelines in Red Hat OpenShift AI are managed by the *data-science-pipelines-operator-controller-manager* operator in the *redhat-ods-applications* namespace. The Custom Resource (CR) is an instance of *datasciencepipelinesapplications.datasciencepipelinesapplications.opendatahub.io*. //==== -While the Elyra extension offers an easy to use visual editor to compose pipelines, and is generally used for simple workflows, the Kubeflow Pipelines SDK (*kfp*) offers a flexible Python Domain Specific Language (DSL) API to create pipelines from Python code. This approach offers you flexibility in composing complex workflows and has the added benefit of offering all the Python tooling, frameworks and developer experience that comes with writing Python code. 
- -OpenShift AI uses the *_Argo Wotkflows_* runtime to execute pipelines, which is why your Kubeflow pipeline containing Python code needs to be compiled into a yaml definition before it can be submitted to the runtime. Steps in the pipeline are executed as ephemeral pods (one per step). - -[NOTE] -==== -Data Science Pipelines in Red Hat OpenShift AI are managed by the *data-science-pipelines-operator-controller-manager* operator in the *redhat-ods-applications* namespace. The Custom Resource (CR) is an instance of *datasciencepipelinesapplications.datasciencepipelinesapplications.opendatahub.io*. -==== - -== Exercise: Creating a Simple Data Science Pipeline with the KFP SDK +== Exercise: Importing a Data Science Pipeline === Prerequisites -* Continue to use the `fraud-detection` Data Science Project that you created in the previous section. Ensure you complete the exercises in the previous section on Elyra. You will reuse several components from the previous exercise. - +* Continue to use the `fraud-detection` Data Science Project that you created in the previous section. We won't need a workbench in this section, but you should have completed all exercises through the Data Science Pipelines section of the course. . In the RHOAI side navigation menu, click `Data Science Pipelines > Pipelines`. It's possible to click on the graph nodes to reveal information about the steps. -+ -// image::post-pipeline-run.png[title=Pipeline execution graphical view] . Once the pipeline has completed, it is possible to access the output and pipeline artifacts (if used) in the object storage browser of the Minio web console. Open the Minio web console (you installed and configured Minio in the previous exercise on Elyra pipelines). -+ -In the Minio web console, click `Object Browser > data-science-pipelines > artifacts > PIPELINE_NAME-XXX`, where `xxxxx` is a randomly generated number for the pipeline run. You should the output artifacts generated by the pipeline. -+ +//In the Minio web console, click `Object Browser > data-science-pipelines > artifacts > PIPELINE_NAME-XXX`, where `xxxxx` is a randomly generated number for the pipeline run. You should see the output artifacts generated by the pipeline. + // image::object-store-after-run.png[] === Experiments And Runs
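The exercise above imports the compiled YAML through the OpenShift AI dashboard. For completeness, the same package can also be uploaded, and a run started, programmatically with the `kfp` client; runs created this way appear in the run views covered in this section. Treat the snippet as a hedged sketch: the route URL and the token value are placeholders for your cluster, and self-signed certificates may require additional TLS options on the client.

```python
# Sketch: upload and run a compiled pipeline package against the Data Science
# Pipelines API route. Host and token below are placeholders, not course values.
import kfp

client = kfp.Client(
    host="https://ds-pipeline-dspa-fraud-detection.apps.example.com",  # DSP route (placeholder)
    existing_token="sha256~REPLACE_ME",  # e.g. the output of `oc whoami --show-token`
)

# Register the pipeline so it appears under Data Science Pipelines > Pipelines.
client.upload_pipeline(
    pipeline_package_path="get_data_train_upload.yaml",
    pipeline_name="7-get-data-train-upload",
)

# Alternatively, start a run directly from the package.
run = client.create_run_from_pipeline_package(
    "get_data_train_upload.yaml",
    arguments={},
    run_name="get-data-train-upload-sdk",
)
print(run.run_id)
```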