From b35dbda633af25f7402dd1fab64566cfba13a89d Mon Sep 17 00:00:00 2001 From: Xiangwei Wang Date: Sun, 19 Nov 2023 13:34:42 +0800 Subject: [PATCH 1/9] fix --- .../parallel-run/Code/digit_identification.py | 10 ++++++---- .../file-dataset-image-inference-mnist.ipynb | 14 +++++++------- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/how-to-use-azureml/machine-learning-pipelines/parallel-run/Code/digit_identification.py b/how-to-use-azureml/machine-learning-pipelines/parallel-run/Code/digit_identification.py index ff187551a..476173126 100644 --- a/how-to-use-azureml/machine-learning-pipelines/parallel-run/Code/digit_identification.py +++ b/how-to-use-azureml/machine-learning-pipelines/parallel-run/Code/digit_identification.py @@ -8,6 +8,8 @@ from PIL import Image from azureml.core import Model +# Disable eager execution +tf.compat.v1.disable_eager_execution() def init(): global g_tf_sess @@ -16,9 +18,9 @@ def init(): model_path = Model.get_model_path("mnist-prs") # contruct graph to execute - tf.reset_default_graph() - saver = tf.train.import_meta_graph(os.path.join(model_path, 'mnist-tf.model.meta')) - g_tf_sess = tf.Session(config=tf.ConfigProto(device_count={'GPU': 0})) + tf.compat.v1.reset_default_graph() + saver = tf.compat.v1.train.import_meta_graph(os.path.join(model_path, 'mnist-tf.model.meta')) + g_tf_sess = tf.compat.v1.Session(config=tf.compat.v1.ConfigProto(device_count={'GPU': 0})) saver.restore(g_tf_sess, os.path.join(model_path, 'mnist-tf.model')) @@ -33,7 +35,7 @@ def run(mini_batch): data = Image.open(image) np_im = np.array(data).reshape((1, 784)) # perform inference - inference_result = output.eval(feed_dict={in_tensor: np_im}, session=g_tf_sess) + inference_result = g_tf_sess.run(output, feed_dict={in_tensor: np_im}) # find best probability, and add to result list best_result = np.argmax(inference_result) resultList.append("{}: {}".format(os.path.basename(image), best_result)) diff --git a/how-to-use-azureml/machine-learning-pipelines/parallel-run/file-dataset-image-inference-mnist.ipynb b/how-to-use-azureml/machine-learning-pipelines/parallel-run/file-dataset-image-inference-mnist.ipynb index 267d97268..e62ed61bc 100644 --- a/how-to-use-azureml/machine-learning-pipelines/parallel-run/file-dataset-image-inference-mnist.ipynb +++ b/how-to-use-azureml/machine-learning-pipelines/parallel-run/file-dataset-image-inference-mnist.ipynb @@ -306,7 +306,7 @@ "#### An entry script\n", "This script accepts requests, scores the requests by using the model, and returns the results.\n", "- __init()__ - Typically this function loads the model into a global object. This function is run only once at the start of batch processing per worker node/process. Init method can make use of following environment variables (ParallelRunStep input):\n", - " 1.\tAZUREML_BI_OUTPUT_PATH \u00e2\u20ac\u201c output folder path\n", + " 1.\tAZUREML_BI_OUTPUT_PATH – output folder path\n", "- __run(mini_batch)__ - The method to be parallelized. Each invocation will have one minibatch.
\n", "__mini_batch__: Batch inference will invoke run method and pass either a list or Pandas DataFrame as an argument to the method. Each entry in min_batch will be - a filepath if input is a FileDataset, a Pandas DataFrame if input is a TabularDataset.
\n", "__run__ method response: run() method should return a Pandas DataFrame or an array. For append_row output_action, these returned elements are appended into the common output file. For summary_only, the contents of the elements are ignored. For all output actions, each returned output element indicates one successful inference of input element in the input mini-batch.\n", @@ -359,9 +359,9 @@ "from azureml.core import Environment\n", "from azureml.core.runconfig import CondaDependencies, DEFAULT_CPU_IMAGE\n", "\n", - "batch_conda_deps = CondaDependencies.create(python_version=\"3.7\",\n", + "batch_conda_deps = CondaDependencies.create(python_version=\"3.8\",\n", " conda_packages=['pip==20.2.4'],\n", - " pip_packages=[\"tensorflow==1.15.2\", \"pillow\", \"protobuf==3.20.1\",\n", + " pip_packages=[\"tensorflow==2.13.0\", \"pillow\", \"protobuf==4.23.3\",\n", " \"azureml-core\", \"azureml-dataset-runtime[fuse]\"])\n", "batch_env = Environment(name=\"batch_environment\")\n", "batch_env.python.conda_dependencies = batch_conda_deps\n", @@ -601,9 +601,9 @@ "friendly_name": "MNIST data inferencing using ParallelRunStep", "index_order": 1, "kernelspec": { - "display_name": "Python 3.8 - AzureML", + "display_name": "prs_v1", "language": "python", - "name": "python38-azureml" + "name": "python3" }, "language_info": { "codemirror_mode": { @@ -615,7 +615,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.6.9" + "version": "3.8.16" }, "tags": [ "Batch Inferencing", @@ -625,4 +625,4 @@ }, "nbformat": 4, "nbformat_minor": 2 -} \ No newline at end of file +} From dbfbf9743bbeae870426eac2ca9bc1ef09af5606 Mon Sep 17 00:00:00 2001 From: Xiangwei Wang Date: Sun, 19 Nov 2023 13:54:19 +0800 Subject: [PATCH 2/9] fix unicode --- .../parallel-run/file-dataset-image-inference-mnist.ipynb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/how-to-use-azureml/machine-learning-pipelines/parallel-run/file-dataset-image-inference-mnist.ipynb b/how-to-use-azureml/machine-learning-pipelines/parallel-run/file-dataset-image-inference-mnist.ipynb index e62ed61bc..08dfb200c 100644 --- a/how-to-use-azureml/machine-learning-pipelines/parallel-run/file-dataset-image-inference-mnist.ipynb +++ b/how-to-use-azureml/machine-learning-pipelines/parallel-run/file-dataset-image-inference-mnist.ipynb @@ -306,7 +306,7 @@ "#### An entry script\n", "This script accepts requests, scores the requests by using the model, and returns the results.\n", "- __init()__ - Typically this function loads the model into a global object. This function is run only once at the start of batch processing per worker node/process. Init method can make use of following environment variables (ParallelRunStep input):\n", - " 1.\tAZUREML_BI_OUTPUT_PATH – output folder path\n", + " 1.\tAZUREML_BI_OUTPUT_PATH - output folder path\n", "- __run(mini_batch)__ - The method to be parallelized. Each invocation will have one minibatch.
\n", "__mini_batch__: Batch inference will invoke run method and pass either a list or Pandas DataFrame as an argument to the method. Each entry in min_batch will be - a filepath if input is a FileDataset, a Pandas DataFrame if input is a TabularDataset.
\n", "__run__ method response: run() method should return a Pandas DataFrame or an array. For append_row output_action, these returned elements are appended into the common output file. For summary_only, the contents of the elements are ignored. For all output actions, each returned output element indicates one successful inference of input element in the input mini-batch.\n", From 1a326a3eec548fd0e934cf505a6b090890af7435 Mon Sep 17 00:00:00 2001 From: Xiangwei Wang Date: Mon, 20 Nov 2023 14:40:34 +0800 Subject: [PATCH 3/9] fix version issue --- .../parallel-run/Code/iris_score.py | 6 +++--- .../parallel-run/tabular-dataset-inference-iris.ipynb | 10 +++++----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/how-to-use-azureml/machine-learning-pipelines/parallel-run/Code/iris_score.py b/how-to-use-azureml/machine-learning-pipelines/parallel-run/Code/iris_score.py index 5b1b89c05..ffe25dcd1 100644 --- a/how-to-use-azureml/machine-learning-pipelines/parallel-run/Code/iris_score.py +++ b/how-to-use-azureml/machine-learning-pipelines/parallel-run/Code/iris_score.py @@ -1,10 +1,10 @@ -import io +# import io import pickle import argparse -import numpy as np +# import numpy as np from azureml.core.model import Model -from sklearn.linear_model import LogisticRegression +# from sklearn.linear_model import LogisticRegression from azureml_user.parallel_run import EntryScript diff --git a/how-to-use-azureml/machine-learning-pipelines/parallel-run/tabular-dataset-inference-iris.ipynb b/how-to-use-azureml/machine-learning-pipelines/parallel-run/tabular-dataset-inference-iris.ipynb index 103fd61a8..bafa19e52 100644 --- a/how-to-use-azureml/machine-learning-pipelines/parallel-run/tabular-dataset-inference-iris.ipynb +++ b/how-to-use-azureml/machine-learning-pipelines/parallel-run/tabular-dataset-inference-iris.ipynb @@ -252,7 +252,7 @@ "#### An entry script\n", "This script accepts requests, scores the requests by using the model, and returns the results.\n", "- __init()__ - Typically this function loads the model into a global object. This function is run only once at the start of batch processing per worker node/process. init method can make use of following environment variables (ParallelRunStep input):\n", - " 1.\tAZUREML_BI_OUTPUT_PATH \u00e2\u20ac\u201c output folder path\n", + " 1.\tAZUREML_BI_OUTPUT_PATH - output folder path\n", "- __run(mini_batch)__ - The method to be parallelized. Each invocation will have one minibatch.
\n", "__mini_batch__: Batch inference will invoke run method and pass either a list or Pandas DataFrame as an argument to the method. Each entry in min_batch will be - a filepath if input is a FileDataset, a Pandas DataFrame if input is a TabularDataset.
\n", "__run__ method response: run() method should return a Pandas DataFrame or an array. For append_row output_action, these returned elements are appended into the common output file. For summary_only, the contents of the elements are ignored. For all output actions, each returned output element indicates one successful inference of input element in the input mini-batch.\n", @@ -308,10 +308,10 @@ "from azureml.core import Environment\n", "from azureml.core.runconfig import CondaDependencies\n", "\n", - "predict_conda_deps = CondaDependencies.create(python_version=\"3.7\", \n", + "predict_conda_deps = CondaDependencies.create(python_version=\"3.8\", \n", " conda_packages=['pip==20.2.4'],\n", - " pip_packages=[\"scikit-learn==0.20.3\",\n", - " \"azureml-core\", \"azureml-dataset-runtime[pandas,fuse]\"])\n", + " pip_packages=[\"numpy==1.19.5\", \"pandas==1.4.4\", \"scikit-learn==0.22.2\",\n", + " \"azureml-core\", \"azureml-dataset-runtime[fuse]\"])\n", "\n", "predict_env = Environment(name=\"predict_environment\")\n", "predict_env.python.conda_dependencies = predict_conda_deps\n", @@ -531,4 +531,4 @@ }, "nbformat": 4, "nbformat_minor": 2 -} \ No newline at end of file +} From 77ab1f11d7075a74ad835d319aeb8fba7ab2a00c Mon Sep 17 00:00:00 2001 From: Xiangwei Wang Date: Mon, 20 Nov 2023 14:45:55 +0800 Subject: [PATCH 4/9] minor fix --- .../file-dataset-image-inference-mnist.ipynb | 1250 ++++++++--------- .../file-dataset-partition-per-folder.ipynb | 2 +- ...tabular-dataset-partition-per-column.ipynb | 4 +- 3 files changed, 628 insertions(+), 628 deletions(-) diff --git a/how-to-use-azureml/machine-learning-pipelines/parallel-run/file-dataset-image-inference-mnist.ipynb b/how-to-use-azureml/machine-learning-pipelines/parallel-run/file-dataset-image-inference-mnist.ipynb index 08dfb200c..70f8475db 100644 --- a/how-to-use-azureml/machine-learning-pipelines/parallel-run/file-dataset-image-inference-mnist.ipynb +++ b/how-to-use-azureml/machine-learning-pipelines/parallel-run/file-dataset-image-inference-mnist.ipynb @@ -1,628 +1,628 @@ { - "cells": [ - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Copyright (c) Microsoft Corporation. All rights reserved. \n", - "Licensed under the MIT License." - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "![Impressions](https://PixelServer20190423114238.azurewebsites.net/api/impressions/MachineLearningNotebooks/how-to-use-azureml/machine-learning-pipelines/parallel-run/file-dataset-image-inference-mnist.png)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Using Azure Machine Learning Pipelines for Batch Inference\n", - "\n", - "In this notebook, we will demonstrate how to make predictions on large quantities of data asynchronously using the ML pipelines with Azure Machine Learning. Batch inference (or batch scoring) provides cost-effective inference, with unparalleled throughput for asynchronous applications. Batch prediction pipelines can scale to perform inference on terabytes of production data. 
Batch prediction is optimized for high throughput, fire-and-forget predictions for a large collection of data.\n", - "\n", - "> **Tip**\n", - "If your system requires low-latency processing (to process a single document or small set of documents quickly), use [real-time scoring](https://docs.microsoft.com/azure/machine-learning/v1/how-to-consume-web-service) instead of batch prediction.\n", - "\n", - "In this example will be take a digit identification model already-trained on MNIST dataset using the [AzureML training with deep learning example notebook](https://github.com/Azure/MachineLearningNotebooks/blob/master/how-to-use-azureml/ml-frameworks/keras/train-hyperparameter-tune-deploy-with-keras/train-hyperparameter-tune-deploy-with-keras.ipynb), and run that trained model on some of the MNIST test images in batch. \n", - "\n", - "The input dataset used for this notebook differs from a standard MNIST dataset in that it has been converted to PNG images to demonstrate use of files as inputs to Batch Inference. A sample of PNG-converted images of the MNIST dataset were take from [this repository](https://github.com/myleott/mnist_png). \n", - "\n", - "The outline of this notebook is as follows:\n", - "\n", - "- Create a DataStore referencing MNIST images stored in a blob container.\n", - "- Register the pretrained MNIST model into the model registry. \n", - "- Use the registered model to do batch inference on the images in the data blob container.\n", - "\n", - "## Prerequisites\n", - "If you are using an Azure Machine Learning Notebook VM, you are all set. Otherwise, make sure you go through the configuration Notebook located at https://github.com/Azure/MachineLearningNotebooks first. This sets you up with a working config file that has information on your workspace, subscription id, etc. " - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Connect to workspace\n", - "Create a workspace object from the existing workspace. Workspace.from_config() reads the file config.json and loads the details into an object named ws." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "tags": [] - }, - "outputs": [], - "source": [ - "# Check core SDK version number\n", - "import azureml.core\n", - "\n", - "print(\"SDK version:\", azureml.core.VERSION)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "tags": [] - }, - "outputs": [], - "source": [ - "from azureml.core import Workspace\n", - "\n", - "ws = Workspace.from_config()\n", - "print('Workspace name: ' + ws.name, \n", - " 'Azure region: ' + ws.location, \n", - " 'Subscription id: ' + ws.subscription_id, \n", - " 'Resource group: ' + ws.resource_group, sep = '\\n')" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Create or Attach existing compute resource\n", - "By using Azure Machine Learning Compute, a managed service, data scientists can train machine learning models on clusters of Azure virtual machines. Examples include VMs with GPU support. In this tutorial, you create Azure Machine Learning Compute as your training environment. The code below creates the compute clusters for you if they don't already exist in your workspace.\n", - "\n", - "> Note that if you have an AzureML Data Scientist role, you will not have permission to create compute resources. 
Talk to your workspace or IT admin to create the compute targets described in this section, if they do not already exist.\n", - "\n", - "**Creation of compute takes approximately 5 minutes. If the AmlCompute with that name is already in your workspace the code will skip the creation process.**" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import os\n", - "from azureml.core.compute import AmlCompute, ComputeTarget\n", - "\n", - "# choose a name for your cluster\n", - "compute_name = os.environ.get(\"AML_COMPUTE_CLUSTER_NAME\", \"cpu-cluster\")\n", - "compute_min_nodes = os.environ.get(\"AML_COMPUTE_CLUSTER_MIN_NODES\", 0)\n", - "compute_max_nodes = os.environ.get(\"AML_COMPUTE_CLUSTER_MAX_NODES\", 4)\n", - "\n", - "# This example uses CPU VM. For using GPU VM, set SKU to Standard_NC6s_v3\n", - "vm_size = os.environ.get(\"AML_COMPUTE_CLUSTER_SKU\", \"STANDARD_D2_V2\")\n", - "\n", - "\n", - "if compute_name in ws.compute_targets:\n", - " compute_target = ws.compute_targets[compute_name]\n", - " if compute_target and type(compute_target) is AmlCompute:\n", - " print('found compute target. just use it. ' + compute_name)\n", - "else:\n", - " print('creating a new compute target...')\n", - " provisioning_config = AmlCompute.provisioning_configuration(vm_size = vm_size,\n", - " min_nodes = compute_min_nodes, \n", - " max_nodes = compute_max_nodes)\n", - "\n", - " # create the cluster\n", - " compute_target = ComputeTarget.create(ws, compute_name, provisioning_config)\n", - " \n", - " # can poll for a minimum number of nodes and for a specific timeout. \n", - " # if no min node count is provided it will use the scale settings for the cluster\n", - " compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)\n", - " \n", - " # For a more detailed view of current AmlCompute status, use get_status()\n", - " print(compute_target.get_status().serialize())" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Create a datastore containing sample images\n", - "The input dataset used for this notebook differs from a standard MNIST dataset in that it has been converted to PNG images to demonstrate use of files as inputs to Batch Inference. A sample of PNG-converted images of the MNIST dataset were take from [this repository](https://github.com/myleott/mnist_png).\n", - "\n", - "We have created a public blob container `sampledata` on an account named `pipelinedata`, containing these images from the MNIST dataset. In the next step, we create a datastore with the name `images_datastore`, which points to this blob container. In the call to `register_azure_blob_container` below, setting the `overwrite` flag to `True` overwrites any datastore that was created previously with that name. \n", - "\n", - "This step can be changed to point to your blob container by providing your own `datastore_name`, `container_name`, and `account_name`." 
- ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from azureml.core.datastore import Datastore\n", - "\n", - "account_name = \"pipelinedata\"\n", - "datastore_name = \"mnist_datastore\"\n", - "container_name = \"sampledata\"\n", - "\n", - "mnist_data = Datastore.register_azure_blob_container(ws, \n", - " datastore_name=datastore_name, \n", - " container_name=container_name, \n", - " account_name=account_name,\n", - " overwrite=True)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Next, let's specify the default datastore for the outputs." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "def_data_store = ws.get_default_datastore()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Create a FileDataset\n", - "A [FileDataset](https://docs.microsoft.com/python/api/azureml-core/azureml.data.filedataset?view=azure-ml-py) references single or multiple files in your datastores or public urls. The files can be of any format. FileDataset provides you with the ability to download or mount the files to your compute. By creating a dataset, you create a reference to the data source location. If you applied any subsetting transformations to the dataset, they will be stored in the dataset as well. The data remains in its existing location, so no extra storage cost is incurred.\n", - "You can use dataset objects as inputs. Register the datasets to the workspace if you want to reuse them later." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from azureml.core.dataset import Dataset\n", - "\n", - "mnist_ds_name = 'mnist_sample_data'\n", - "\n", - "path_on_datastore = mnist_data.path('mnist')\n", - "input_mnist_ds = Dataset.File.from_files(path=path_on_datastore, validate=False)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "The input dataset can be specified as a pipeline parameter, so that you can pass in new data when rerun the PRS pipeline." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from azureml.data.dataset_consumption_config import DatasetConsumptionConfig\n", - "from azureml.pipeline.core import PipelineParameter\n", - "\n", - "pipeline_param = PipelineParameter(name=\"mnist_param\", default_value=input_mnist_ds)\n", - "input_mnist_ds_consumption = DatasetConsumptionConfig(\"minist_param_config\", pipeline_param).as_mount()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Intermediate/Output Data\n", - "Intermediate data (or output of a Step) is represented by [PipelineData](https://docs.microsoft.com/python/api/azureml-pipeline-core/azureml.pipeline.core.pipelinedata?view=azure-ml-py) object. PipelineData can be produced by one step and consumed in another step by providing the PipelineData object as an output of one step and the input of one or more steps." 
- ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from azureml.pipeline.core import Pipeline, PipelineData\n", - "\n", - "output_dir = PipelineData(name=\"inferences\", datastore=def_data_store)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Download the Model\n", - "\n", - "Download and extract the model from https://pipelinedata.blob.core.windows.net/mnist-model/mnist-tf.tar.gz to \"models\" directory" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import tarfile\n", - "import urllib.request\n", - "\n", - "# create directory for model\n", - "model_dir = 'models'\n", - "if not os.path.isdir(model_dir):\n", - " os.mkdir(model_dir)\n", - "\n", - "url=\"https://pipelinedata.blob.core.windows.net/mnist-model/mnist-tf.tar.gz\"\n", - "response = urllib.request.urlretrieve(url, \"model.tar.gz\")\n", - "tar = tarfile.open(\"model.tar.gz\", \"r:gz\")\n", - "tar.extractall(model_dir)\n", - "\n", - "os.listdir(model_dir)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Register the model with Workspace\n", - "A registered model is a logical container for one or more files that make up your model. For example, if you have a model that's stored in multiple files, you can register them as a single model in the workspace. After you register the files, you can then download or deploy the registered model and receive all the files that you registered.\n", - "\n", - "Using tags, you can track useful information such as the name and version of the machine learning library used to train the model. Note that tags must be alphanumeric. Learn more about registering models [here](https://docs.microsoft.com/azure/machine-learning/v1/how-to-deploy-and-where#registermodel) " - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from azureml.core.model import Model\n", - "\n", - "# register downloaded model \n", - "model = Model.register(model_path=\"models/\",\n", - " model_name=\"mnist-prs\", # this is the name the model is registered as\n", - " tags={'pretrained': \"mnist\"},\n", - " description=\"Mnist trained tensorflow model\",\n", - " workspace=ws)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Using your model to make batch predictions\n", - "To use the model to make batch predictions, you need an **entry script** and a list of **dependencies**:\n", - "\n", - "#### An entry script\n", - "This script accepts requests, scores the requests by using the model, and returns the results.\n", - "- __init()__ - Typically this function loads the model into a global object. This function is run only once at the start of batch processing per worker node/process. Init method can make use of following environment variables (ParallelRunStep input):\n", - " 1.\tAZUREML_BI_OUTPUT_PATH - output folder path\n", - "- __run(mini_batch)__ - The method to be parallelized. Each invocation will have one minibatch.
\n", - "__mini_batch__: Batch inference will invoke run method and pass either a list or Pandas DataFrame as an argument to the method. Each entry in min_batch will be - a filepath if input is a FileDataset, a Pandas DataFrame if input is a TabularDataset.
\n", - "__run__ method response: run() method should return a Pandas DataFrame or an array. For append_row output_action, these returned elements are appended into the common output file. For summary_only, the contents of the elements are ignored. For all output actions, each returned output element indicates one successful inference of input element in the input mini-batch.\n", - " User should make sure that enough data is included in inference result to map input to inference. Inference output will be written in output file and not guaranteed to be in order, user should use some key in the output to map it to input.\n", - " \n", - "\n", - "#### Dependencies\n", - "Helper scripts or Python/Conda packages required to run the entry script." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "scripts_folder = \"Code\"\n", - "script_file = \"digit_identification.py\"\n", - "\n", - "# peek at contents\n", - "with open(os.path.join(scripts_folder, script_file)) as inference_file:\n", - " print(inference_file.read())" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Build and run the batch inference pipeline\n", - "The data, models, and compute resource are now available. Let's put all these together in a pipeline." - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Specify the environment to run the script\n", - "Specify the conda dependencies for your script. This will allow us to install pip packages as well as configure the inference environment.\n", - "* Always include **azureml-core** and **azureml-dataset-runtime\\[fuse\\]** in the pip package list to make ParallelRunStep run properly.\n", - "\n", - "If you're using custom image (`batch_env.python.user_managed_dependencies = True`), you need to install the package to your image." 
- ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from azureml.core import Environment\n", - "from azureml.core.runconfig import CondaDependencies, DEFAULT_CPU_IMAGE\n", - "\n", - "batch_conda_deps = CondaDependencies.create(python_version=\"3.8\",\n", - " conda_packages=['pip==20.2.4'],\n", - " pip_packages=[\"tensorflow==2.13.0\", \"pillow\", \"protobuf==4.23.3\",\n", - " \"azureml-core\", \"azureml-dataset-runtime[fuse]\"])\n", - "batch_env = Environment(name=\"batch_environment\")\n", - "batch_env.python.conda_dependencies = batch_conda_deps\n", - "batch_env.docker.base_image = DEFAULT_CPU_IMAGE" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Create the configuration to wrap the inference script" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig\n", - "\n", - "parallel_run_config = ParallelRunConfig(\n", - " source_directory=scripts_folder,\n", - " entry_script=script_file,\n", - " mini_batch_size=PipelineParameter(name=\"batch_size_param\", default_value=\"5\"),\n", - " error_threshold=10,\n", - " output_action=\"append_row\",\n", - " append_row_file_name=\"mnist_outputs.txt\",\n", - " environment=batch_env,\n", - " compute_target=compute_target,\n", - " process_count_per_node=PipelineParameter(name=\"process_count_param\", default_value=2),\n", - " node_count=2\n", - ")" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Create the pipeline step\n", - "Create the pipeline step using the script, environment configuration, and parameters. Specify the compute target you already attached to your workspace as the target of execution of the script. We will use ParallelRunStep to create the pipeline step." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "parallelrun_step = ParallelRunStep(\n", - " name=\"predict-digits-mnist\",\n", - " parallel_run_config=parallel_run_config,\n", - " inputs=[ input_mnist_ds_consumption ],\n", - " output=output_dir,\n", - " allow_reuse=False\n", - ")" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Run the pipeline\n", - "At this point you can run the pipeline and examine the output it produced. The Experiment object is used to track the run of the pipeline" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from azureml.core import Experiment\n", - "\n", - "pipeline = Pipeline(workspace=ws, steps=[parallelrun_step])\n", - "experiment = Experiment(ws, 'digit_identification')\n", - "pipeline_run = experiment.submit(pipeline)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Monitor the run\n", - "\n", - "The pipeline run status could be checked in Azure Machine Learning portal (https://ml.azure.com). The link to the pipeline run could be retrieved by inspecting the `pipeline_run` object." 
- ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# This will output information of the pipeline run, including the link to the details page of portal.\n", - "pipeline_run" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Optional: View detailed logs (streaming) " - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Wait the run for completion and show output log to console\n", - "pipeline_run.wait_for_completion(show_output=True)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### View the prediction results per input image\n", - "In the digit_identification.py file above you can see that the ResultList with the filename and the prediction result gets returned. These are written to the DataStore specified in the PipelineData object as the output data, which in this case is called *inferences*. This containers the outputs from all of the worker nodes used in the compute cluster. You can download this data to view the results ... below just filters to the first 10 rows" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import pandas as pd\n", - "import tempfile\n", - "\n", - "batch_run = pipeline_run.find_step_run(parallelrun_step.name)[0]\n", - "batch_output = batch_run.get_output_data(output_dir.name)\n", - "\n", - "target_dir = tempfile.mkdtemp()\n", - "batch_output.download(local_path=target_dir)\n", - "result_file = os.path.join(target_dir, batch_output.path_on_datastore, parallel_run_config.append_row_file_name)\n", - "\n", - "df = pd.read_csv(result_file, delimiter=\":\", header=None)\n", - "df.columns = [\"Filename\", \"Prediction\"]\n", - "print(\"Prediction has \", df.shape[0], \" rows\")\n", - "df.head(10) " - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Resubmit a with different dataset\n", - "Since we made the input a `PipelineParameter`, we can resubmit with a different dataset without having to create an entirely new experiment. We'll use the same datastore but use only a single image." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "path_on_datastore = mnist_data.path('mnist/0.png')\n", - "single_image_ds = Dataset.File.from_files(path=path_on_datastore, validate=False)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "pipeline_run_2 = experiment.submit(pipeline, \n", - " pipeline_parameters={\"mnist_param\": single_image_ds, \n", - " \"batch_size_param\": \"1\",\n", - " \"process_count_param\": 1}\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# This will output information of the pipeline run, including the link to the details page of portal.\n", - "pipeline_run_2" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Wait the run for completion and show output log to console\n", - "pipeline_run_2.wait_for_completion(show_output=True)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Cleanup Compute resources\n", - "\n", - "For re-occurring jobs, it may be wise to keep compute the compute resources and allow compute nodes to scale down to 0. 
However, since this is just a single-run job, we are free to release the allocated compute resources." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# uncomment below and run if compute resources are no longer needed \n", - "# compute_target.delete() " - ] - } + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Copyright (c) Microsoft Corporation. All rights reserved. \n", + "Licensed under the MIT License." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "![Impressions](https://PixelServer20190423114238.azurewebsites.net/api/impressions/MachineLearningNotebooks/how-to-use-azureml/machine-learning-pipelines/parallel-run/file-dataset-image-inference-mnist.png)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Using Azure Machine Learning Pipelines for Batch Inference\n", + "\n", + "In this notebook, we will demonstrate how to make predictions on large quantities of data asynchronously using the ML pipelines with Azure Machine Learning. Batch inference (or batch scoring) provides cost-effective inference, with unparalleled throughput for asynchronous applications. Batch prediction pipelines can scale to perform inference on terabytes of production data. Batch prediction is optimized for high throughput, fire-and-forget predictions for a large collection of data.\n", + "\n", + "> **Tip**\n", + "If your system requires low-latency processing (to process a single document or small set of documents quickly), use [real-time scoring](https://docs.microsoft.com/azure/machine-learning/v1/how-to-consume-web-service) instead of batch prediction.\n", + "\n", + "In this example will be take a digit identification model already-trained on MNIST dataset using the [AzureML training with deep learning example notebook](https://github.com/Azure/MachineLearningNotebooks/blob/master/how-to-use-azureml/ml-frameworks/keras/train-hyperparameter-tune-deploy-with-keras/train-hyperparameter-tune-deploy-with-keras.ipynb), and run that trained model on some of the MNIST test images in batch. \n", + "\n", + "The input dataset used for this notebook differs from a standard MNIST dataset in that it has been converted to PNG images to demonstrate use of files as inputs to Batch Inference. A sample of PNG-converted images of the MNIST dataset were take from [this repository](https://github.com/myleott/mnist_png). \n", + "\n", + "The outline of this notebook is as follows:\n", + "\n", + "- Create a DataStore referencing MNIST images stored in a blob container.\n", + "- Register the pretrained MNIST model into the model registry. \n", + "- Use the registered model to do batch inference on the images in the data blob container.\n", + "\n", + "## Prerequisites\n", + "If you are using an Azure Machine Learning Notebook VM, you are all set. Otherwise, make sure you go through the configuration Notebook located at https://github.com/Azure/MachineLearningNotebooks first. This sets you up with a working config file that has information on your workspace, subscription id, etc. " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Connect to workspace\n", + "Create a workspace object from the existing workspace. Workspace.from_config() reads the file config.json and loads the details into an object named ws." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# Check core SDK version number\n", + "import azureml.core\n", + "\n", + "print(\"SDK version:\", azureml.core.VERSION)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "from azureml.core import Workspace\n", + "\n", + "ws = Workspace.from_config()\n", + "print('Workspace name: ' + ws.name, \n", + " 'Azure region: ' + ws.location, \n", + " 'Subscription id: ' + ws.subscription_id, \n", + " 'Resource group: ' + ws.resource_group, sep = '\\n')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Create or Attach existing compute resource\n", + "By using Azure Machine Learning Compute, a managed service, data scientists can train machine learning models on clusters of Azure virtual machines. Examples include VMs with GPU support. In this tutorial, you create Azure Machine Learning Compute as your training environment. The code below creates the compute clusters for you if they don't already exist in your workspace.\n", + "\n", + "> Note that if you have an AzureML Data Scientist role, you will not have permission to create compute resources. Talk to your workspace or IT admin to create the compute targets described in this section, if they do not already exist.\n", + "\n", + "**Creation of compute takes approximately 5 minutes. If the AmlCompute with that name is already in your workspace the code will skip the creation process.**" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "from azureml.core.compute import AmlCompute, ComputeTarget\n", + "\n", + "# choose a name for your cluster\n", + "compute_name = os.environ.get(\"AML_COMPUTE_CLUSTER_NAME\", \"cpu-cluster\")\n", + "compute_min_nodes = os.environ.get(\"AML_COMPUTE_CLUSTER_MIN_NODES\", 0)\n", + "compute_max_nodes = os.environ.get(\"AML_COMPUTE_CLUSTER_MAX_NODES\", 4)\n", + "\n", + "# This example uses CPU VM. For using GPU VM, set SKU to Standard_NC6s_v3\n", + "vm_size = os.environ.get(\"AML_COMPUTE_CLUSTER_SKU\", \"STANDARD_D2_V2\")\n", + "\n", + "\n", + "if compute_name in ws.compute_targets:\n", + " compute_target = ws.compute_targets[compute_name]\n", + " if compute_target and type(compute_target) is AmlCompute:\n", + " print('found compute target. just use it. ' + compute_name)\n", + "else:\n", + " print('creating a new compute target...')\n", + " provisioning_config = AmlCompute.provisioning_configuration(vm_size = vm_size,\n", + " min_nodes = compute_min_nodes, \n", + " max_nodes = compute_max_nodes)\n", + "\n", + " # create the cluster\n", + " compute_target = ComputeTarget.create(ws, compute_name, provisioning_config)\n", + " \n", + " # can poll for a minimum number of nodes and for a specific timeout. 
\n", + " # if no min node count is provided it will use the scale settings for the cluster\n", + " compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)\n", + " \n", + " # For a more detailed view of current AmlCompute status, use get_status()\n", + " print(compute_target.get_status().serialize())" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Create a datastore containing sample images\n", + "The input dataset used for this notebook differs from a standard MNIST dataset in that it has been converted to PNG images to demonstrate use of files as inputs to Batch Inference. A sample of PNG-converted images of the MNIST dataset were take from [this repository](https://github.com/myleott/mnist_png).\n", + "\n", + "We have created a public blob container `sampledata` on an account named `pipelinedata`, containing these images from the MNIST dataset. In the next step, we create a datastore with the name `images_datastore`, which points to this blob container. In the call to `register_azure_blob_container` below, setting the `overwrite` flag to `True` overwrites any datastore that was created previously with that name. \n", + "\n", + "This step can be changed to point to your blob container by providing your own `datastore_name`, `container_name`, and `account_name`." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from azureml.core.datastore import Datastore\n", + "\n", + "account_name = \"pipelinedata\"\n", + "datastore_name = \"mnist_datastore\"\n", + "container_name = \"sampledata\"\n", + "\n", + "mnist_data = Datastore.register_azure_blob_container(ws, \n", + " datastore_name=datastore_name, \n", + " container_name=container_name, \n", + " account_name=account_name,\n", + " overwrite=True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Next, let's specify the default datastore for the outputs." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def_data_store = ws.get_default_datastore()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Create a FileDataset\n", + "A [FileDataset](https://docs.microsoft.com/python/api/azureml-core/azureml.data.filedataset?view=azure-ml-py) references single or multiple files in your datastores or public urls. The files can be of any format. FileDataset provides you with the ability to download or mount the files to your compute. By creating a dataset, you create a reference to the data source location. If you applied any subsetting transformations to the dataset, they will be stored in the dataset as well. The data remains in its existing location, so no extra storage cost is incurred.\n", + "You can use dataset objects as inputs. Register the datasets to the workspace if you want to reuse them later." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from azureml.core.dataset import Dataset\n", + "\n", + "mnist_ds_name = 'mnist_sample_data'\n", + "\n", + "path_on_datastore = mnist_data.path('mnist')\n", + "input_mnist_ds = Dataset.File.from_files(path=path_on_datastore, validate=False)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The input dataset can be specified as a pipeline parameter, so that you can pass in new data when rerun the PRS pipeline." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from azureml.data.dataset_consumption_config import DatasetConsumptionConfig\n", + "from azureml.pipeline.core import PipelineParameter\n", + "\n", + "pipeline_param = PipelineParameter(name=\"mnist_param\", default_value=input_mnist_ds)\n", + "input_mnist_ds_consumption = DatasetConsumptionConfig(\"minist_param_config\", pipeline_param).as_mount()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Intermediate/Output Data\n", + "Intermediate data (or output of a Step) is represented by [PipelineData](https://docs.microsoft.com/python/api/azureml-pipeline-core/azureml.pipeline.core.pipelinedata?view=azure-ml-py) object. PipelineData can be produced by one step and consumed in another step by providing the PipelineData object as an output of one step and the input of one or more steps." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from azureml.pipeline.core import Pipeline, PipelineData\n", + "\n", + "output_dir = PipelineData(name=\"inferences\", datastore=def_data_store)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Download the Model\n", + "\n", + "Download and extract the model from https://pipelinedata.blob.core.windows.net/mnist-model/mnist-tf.tar.gz to \"models\" directory" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import tarfile\n", + "import urllib.request\n", + "\n", + "# create directory for model\n", + "model_dir = 'models'\n", + "if not os.path.isdir(model_dir):\n", + " os.mkdir(model_dir)\n", + "\n", + "url=\"https://pipelinedata.blob.core.windows.net/mnist-model/mnist-tf.tar.gz\"\n", + "response = urllib.request.urlretrieve(url, \"model.tar.gz\")\n", + "tar = tarfile.open(\"model.tar.gz\", \"r:gz\")\n", + "tar.extractall(model_dir)\n", + "\n", + "os.listdir(model_dir)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Register the model with Workspace\n", + "A registered model is a logical container for one or more files that make up your model. For example, if you have a model that's stored in multiple files, you can register them as a single model in the workspace. After you register the files, you can then download or deploy the registered model and receive all the files that you registered.\n", + "\n", + "Using tags, you can track useful information such as the name and version of the machine learning library used to train the model. Note that tags must be alphanumeric. 
Learn more about registering models [here](https://docs.microsoft.com/azure/machine-learning/v1/how-to-deploy-and-where#registermodel) " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from azureml.core.model import Model\n", + "\n", + "# register downloaded model \n", + "model = Model.register(model_path=\"models/\",\n", + " model_name=\"mnist-prs\", # this is the name the model is registered as\n", + " tags={'pretrained': \"mnist\"},\n", + " description=\"Mnist trained tensorflow model\",\n", + " workspace=ws)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Using your model to make batch predictions\n", + "To use the model to make batch predictions, you need an **entry script** and a list of **dependencies**:\n", + "\n", + "#### An entry script\n", + "This script accepts requests, scores the requests by using the model, and returns the results.\n", + "- __init()__ - Typically this function loads the model into a global object. This function is run only once at the start of batch processing per worker node/process. Init method can make use of following environment variables (ParallelRunStep input):\n", + " 1.\tAZUREML_BI_OUTPUT_PATH - output folder path\n", + "- __run(mini_batch)__ - The method to be parallelized. Each invocation will have one minibatch.
\n", + "__mini_batch__: Batch inference will invoke run method and pass either a list or Pandas DataFrame as an argument to the method. Each entry in min_batch will be - a filepath if input is a FileDataset, a Pandas DataFrame if input is a TabularDataset.
\n", + "__run__ method response: run() method should return a Pandas DataFrame or an array. For append_row output_action, these returned elements are appended into the common output file. For summary_only, the contents of the elements are ignored. For all output actions, each returned output element indicates one successful inference of input element in the input mini-batch.\n", + " User should make sure that enough data is included in inference result to map input to inference. Inference output will be written in output file and not guaranteed to be in order, user should use some key in the output to map it to input.\n", + " \n", + "\n", + "#### Dependencies\n", + "Helper scripts or Python/Conda packages required to run the entry script." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "scripts_folder = \"Code\"\n", + "script_file = \"digit_identification.py\"\n", + "\n", + "# peek at contents\n", + "with open(os.path.join(scripts_folder, script_file)) as inference_file:\n", + " print(inference_file.read())" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Build and run the batch inference pipeline\n", + "The data, models, and compute resource are now available. Let's put all these together in a pipeline." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Specify the environment to run the script\n", + "Specify the conda dependencies for your script. This will allow us to install pip packages as well as configure the inference environment.\n", + "* Always include **azureml-core** and **azureml-dataset-runtime\\[fuse\\]** in the pip package list to make ParallelRunStep run properly.\n", + "\n", + "If you're using custom image (`batch_env.python.user_managed_dependencies = True`), you need to install the package to your image." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from azureml.core import Environment\n", + "from azureml.core.runconfig import CondaDependencies, DEFAULT_CPU_IMAGE\n", + "\n", + "batch_conda_deps = CondaDependencies.create(python_version=\"3.8\",\n", + " conda_packages=['pip==20.2.4'],\n", + " pip_packages=[\"tensorflow==2.13.0\", \"pillow\", \"protobuf==4.23.3\",\n", + " \"azureml-core\", \"azureml-dataset-runtime[fuse]\"])\n", + "batch_env = Environment(name=\"batch_environment\")\n", + "batch_env.python.conda_dependencies = batch_conda_deps\n", + "batch_env.docker.base_image = DEFAULT_CPU_IMAGE" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Create the configuration to wrap the inference script" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig\n", + "\n", + "parallel_run_config = ParallelRunConfig(\n", + " source_directory=scripts_folder,\n", + " entry_script=script_file,\n", + " mini_batch_size=PipelineParameter(name=\"batch_size_param\", default_value=\"5\"),\n", + " error_threshold=10,\n", + " output_action=\"append_row\",\n", + " append_row_file_name=\"mnist_outputs.txt\",\n", + " environment=batch_env,\n", + " compute_target=compute_target,\n", + " process_count_per_node=PipelineParameter(name=\"process_count_param\", default_value=2),\n", + " node_count=2\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Create the pipeline step\n", + "Create the pipeline step using the script, environment configuration, and parameters. Specify the compute target you already attached to your workspace as the target of execution of the script. We will use ParallelRunStep to create the pipeline step." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "parallelrun_step = ParallelRunStep(\n", + " name=\"predict-digits-mnist\",\n", + " parallel_run_config=parallel_run_config,\n", + " inputs=[ input_mnist_ds_consumption ],\n", + " output=output_dir,\n", + " allow_reuse=False\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Run the pipeline\n", + "At this point you can run the pipeline and examine the output it produced. The Experiment object is used to track the run of the pipeline" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from azureml.core import Experiment\n", + "\n", + "pipeline = Pipeline(workspace=ws, steps=[parallelrun_step])\n", + "experiment = Experiment(ws, 'digit_identification')\n", + "pipeline_run = experiment.submit(pipeline)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Monitor the run\n", + "\n", + "The pipeline run status could be checked in Azure Machine Learning portal (https://ml.azure.com). The link to the pipeline run could be retrieved by inspecting the `pipeline_run` object." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# This will output information of the pipeline run, including the link to the details page of portal.\n", + "pipeline_run" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Optional: View detailed logs (streaming) " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Wait the run for completion and show output log to console\n", + "pipeline_run.wait_for_completion(show_output=True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### View the prediction results per input image\n", + "In the digit_identification.py file above you can see that the ResultList with the filename and the prediction result gets returned. These are written to the DataStore specified in the PipelineData object as the output data, which in this case is called *inferences*. This containers the outputs from all of the worker nodes used in the compute cluster. You can download this data to view the results ... below just filters to the first 10 rows" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import pandas as pd\n", + "import tempfile\n", + "\n", + "batch_run = pipeline_run.find_step_run(parallelrun_step.name)[0]\n", + "batch_output = batch_run.get_output_data(output_dir.name)\n", + "\n", + "target_dir = tempfile.mkdtemp()\n", + "batch_output.download(local_path=target_dir)\n", + "result_file = os.path.join(target_dir, batch_output.path_on_datastore, parallel_run_config.append_row_file_name)\n", + "\n", + "df = pd.read_csv(result_file, delimiter=\":\", header=None)\n", + "df.columns = [\"Filename\", \"Prediction\"]\n", + "print(\"Prediction has \", df.shape[0], \" rows\")\n", + "df.head(10) " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Resubmit a with different dataset\n", + "Since we made the input a `PipelineParameter`, we can resubmit with a different dataset without having to create an entirely new experiment. We'll use the same datastore but use only a single image." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "path_on_datastore = mnist_data.path('mnist/0.png')\n", + "single_image_ds = Dataset.File.from_files(path=path_on_datastore, validate=False)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "pipeline_run_2 = experiment.submit(pipeline, \n", + " pipeline_parameters={\"mnist_param\": single_image_ds, \n", + " \"batch_size_param\": \"1\",\n", + " \"process_count_param\": 1}\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# This will output information of the pipeline run, including the link to the details page of portal.\n", + "pipeline_run_2" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Wait the run for completion and show output log to console\n", + "pipeline_run_2.wait_for_completion(show_output=True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Cleanup Compute resources\n", + "\n", + "For re-occurring jobs, it may be wise to keep compute the compute resources and allow compute nodes to scale down to 0. 
However, since this is just a single-run job, we are free to release the allocated compute resources." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# uncomment below and run if compute resources are no longer needed \n", + "# compute_target.delete() " + ] + } + ], + "metadata": { + "authors": [ + { + "name": "prsbjdev" + } + ], + "category": "Other notebooks", + "compute": [ + "AML Compute" + ], + "datasets": [ + "MNIST" + ], + "deployment": [ + "None" + ], + "exclude_from_index": false, + "framework": [ + "None" + ], + "friendly_name": "MNIST data inferencing using ParallelRunStep", + "index_order": 1, + "kernelspec": { + "display_name": "Python 3.8 - AzureML", + "language": "python", + "name": "python38-azureml" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.16" + }, + "tags": [ + "Batch Inferencing", + "Pipeline" ], - "metadata": { - "authors": [ - { - "name": "prsbjdev" - } - ], - "category": "Other notebooks", - "compute": [ - "AML Compute" - ], - "datasets": [ - "MNIST" - ], - "deployment": [ - "None" - ], - "exclude_from_index": false, - "framework": [ - "None" - ], - "friendly_name": "MNIST data inferencing using ParallelRunStep", - "index_order": 1, - "kernelspec": { - "display_name": "prs_v1", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.8.16" - }, - "tags": [ - "Batch Inferencing", - "Pipeline" - ], - "task": "Digit identification" - }, - "nbformat": 4, - "nbformat_minor": 2 + "task": "Digit identification" + }, + "nbformat": 4, + "nbformat_minor": 2 } diff --git a/how-to-use-azureml/machine-learning-pipelines/parallel-run/file-dataset-partition-per-folder.ipynb b/how-to-use-azureml/machine-learning-pipelines/parallel-run/file-dataset-partition-per-folder.ipynb index 98922dc82..cc793a0d8 100644 --- a/how-to-use-azureml/machine-learning-pipelines/parallel-run/file-dataset-partition-per-folder.ipynb +++ b/how-to-use-azureml/machine-learning-pipelines/parallel-run/file-dataset-partition-per-folder.ipynb @@ -390,7 +390,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.6.9" + "version": "3.8.16" } }, "nbformat": 4, diff --git a/how-to-use-azureml/machine-learning-pipelines/parallel-run/tabular-dataset-partition-per-column.ipynb b/how-to-use-azureml/machine-learning-pipelines/parallel-run/tabular-dataset-partition-per-column.ipynb index 79e355122..224f3d812 100644 --- a/how-to-use-azureml/machine-learning-pipelines/parallel-run/tabular-dataset-partition-per-column.ipynb +++ b/how-to-use-azureml/machine-learning-pipelines/parallel-run/tabular-dataset-partition-per-column.ipynb @@ -413,9 +413,9 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.8.13" + "version": "3.8.16" } }, "nbformat": 4, "nbformat_minor": 4 -} \ No newline at end of file +} From 1fa62a0ce8ad17277112a96b70e067d0984751c5 Mon Sep 17 00:00:00 2001 From: Xiangwei Wang Date: Mon, 20 Nov 2023 14:58:51 +0800 Subject: [PATCH 5/9] recover indent --- .../parallel-run/tabular-dataset-inference-iris.ipynb | 2 +- 1 
file changed, 1 insertion(+), 1 deletion(-)

diff --git a/how-to-use-azureml/machine-learning-pipelines/parallel-run/tabular-dataset-inference-iris.ipynb b/how-to-use-azureml/machine-learning-pipelines/parallel-run/tabular-dataset-inference-iris.ipynb
index bafa19e52..df8bb0f9d 100644
--- a/how-to-use-azureml/machine-learning-pipelines/parallel-run/tabular-dataset-inference-iris.ipynb
+++ b/how-to-use-azureml/machine-learning-pipelines/parallel-run/tabular-dataset-inference-iris.ipynb
@@ -1,7 +1,7 @@
 {
 "cells": [
 {
- "cell_type": "markdown",
+ "cell_type": "raw",
 "metadata": {},
 "source": [
 "Copyright (c) Microsoft Corporation. All rights reserved.\n",

From e305439944826ca83c3b77938d81d72c29c05fb9 Mon Sep 17 00:00:00 2001
From: Xiangwei Wang
Date: Mon, 20 Nov 2023 15:00:36 +0800
Subject: [PATCH 6/9] recover indent

---
 .../file-dataset-image-inference-mnist.ipynb  | 1252 ++++++++---------
 1 file changed, 626 insertions(+), 626 deletions(-)

diff --git a/how-to-use-azureml/machine-learning-pipelines/parallel-run/file-dataset-image-inference-mnist.ipynb b/how-to-use-azureml/machine-learning-pipelines/parallel-run/file-dataset-image-inference-mnist.ipynb
index 70f8475db..42142b165 100644
--- a/how-to-use-azureml/machine-learning-pipelines/parallel-run/file-dataset-image-inference-mnist.ipynb
+++ b/how-to-use-azureml/machine-learning-pipelines/parallel-run/file-dataset-image-inference-mnist.ipynb
@@ -1,628 +1,628 @@
 {
- "cells": [
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "Copyright (c) Microsoft Corporation. All rights reserved. \n",
- "Licensed under the MIT License."
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "![Impressions](https://PixelServer20190423114238.azurewebsites.net/api/impressions/MachineLearningNotebooks/how-to-use-azureml/machine-learning-pipelines/parallel-run/file-dataset-image-inference-mnist.png)"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "# Using Azure Machine Learning Pipelines for Batch Inference\n",
- "\n",
- "In this notebook, we will demonstrate how to make predictions on large quantities of data asynchronously using ML pipelines with Azure Machine Learning. Batch inference (or batch scoring) provides cost-effective inference, with unparalleled throughput for asynchronous applications. Batch prediction pipelines can scale to perform inference on terabytes of production data. Batch prediction is optimized for high throughput, fire-and-forget predictions for a large collection of data.\n",
- "\n",
- "> **Tip**\n",
- "If your system requires low-latency processing (to process a single document or small set of documents quickly), use [real-time scoring](https://docs.microsoft.com/azure/machine-learning/v1/how-to-consume-web-service) instead of batch prediction.\n",
- "\n",
- "In this example we take a digit identification model already trained on the MNIST dataset using the [AzureML training with deep learning example notebook](https://github.com/Azure/MachineLearningNotebooks/blob/master/how-to-use-azureml/ml-frameworks/keras/train-hyperparameter-tune-deploy-with-keras/train-hyperparameter-tune-deploy-with-keras.ipynb), and run it on some of the MNIST test images in batch. \n",
- "\n",
- "The input dataset used for this notebook differs from a standard MNIST dataset in that it has been converted to PNG images to demonstrate the use of files as inputs to Batch Inference. A sample of PNG-converted images of the MNIST dataset was taken from [this repository](https://github.com/myleott/mnist_png). \n",
- "\n",
- "The outline of this notebook is as follows:\n",
- "\n",
- "- Create a DataStore referencing MNIST images stored in a blob container.\n",
- "- Register the pretrained MNIST model into the model registry. \n",
- "- Use the registered model to do batch inference on the images in the data blob container.\n",
- "\n",
- "## Prerequisites\n",
- "If you are using an Azure Machine Learning Notebook VM, you are all set. Otherwise, make sure you go through the configuration Notebook located at https://github.com/Azure/MachineLearningNotebooks first. This sets you up with a working config file that has information on your workspace, subscription id, etc. "
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "### Connect to workspace\n",
- "Create a workspace object from the existing workspace. Workspace.from_config() reads the file config.json and loads the details into an object named ws."
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {
- "tags": []
- },
- "outputs": [],
- "source": [
- "# Check core SDK version number\n",
- "import azureml.core\n",
- "\n",
- "print(\"SDK version:\", azureml.core.VERSION)"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {
- "tags": []
- },
- "outputs": [],
- "source": [
- "from azureml.core import Workspace\n",
- "\n",
- "ws = Workspace.from_config()\n",
- "print('Workspace name: ' + ws.name, \n",
- "      'Azure region: ' + ws.location, \n",
- "      'Subscription id: ' + ws.subscription_id, \n",
- "      'Resource group: ' + ws.resource_group, sep = '\\n')"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "### Create or Attach existing compute resource\n",
- "By using Azure Machine Learning Compute, a managed service, data scientists can train machine learning models on clusters of Azure virtual machines. Examples include VMs with GPU support. In this tutorial, you create Azure Machine Learning Compute as your training environment. The code below creates the compute clusters for you if they don't already exist in your workspace.\n",
- "\n",
- "> Note that if you have an AzureML Data Scientist role, you will not have permission to create compute resources. Talk to your workspace or IT admin to create the compute targets described in this section, if they do not already exist.\n",
- "\n",
- "**Creation of compute takes approximately 5 minutes. If the AmlCompute with that name is already in your workspace, the code will skip the creation process.**"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "import os\n",
- "from azureml.core.compute import AmlCompute, ComputeTarget\n",
- "\n",
- "# choose a name for your cluster\n",
- "compute_name = os.environ.get(\"AML_COMPUTE_CLUSTER_NAME\", \"cpu-cluster\")\n",
- "compute_min_nodes = os.environ.get(\"AML_COMPUTE_CLUSTER_MIN_NODES\", 0)\n",
- "compute_max_nodes = os.environ.get(\"AML_COMPUTE_CLUSTER_MAX_NODES\", 4)\n",
- "\n",
- "# This example uses a CPU VM. To use a GPU VM, set the SKU to Standard_NC6s_v3\n",
- "vm_size = os.environ.get(\"AML_COMPUTE_CLUSTER_SKU\", \"STANDARD_D2_V2\")\n",
- "\n",
- "\n",
- "if compute_name in ws.compute_targets:\n",
- "    compute_target = ws.compute_targets[compute_name]\n",
- "    if compute_target and type(compute_target) is AmlCompute:\n",
- "        print('Found existing compute target: ' + compute_name)\n",
- "else:\n",
- "    print('creating a new compute target...')\n",
- "    provisioning_config = AmlCompute.provisioning_configuration(vm_size = vm_size,\n",
- "                                                                min_nodes = compute_min_nodes, \n",
- "                                                                max_nodes = compute_max_nodes)\n",
- "\n",
- "    # create the cluster\n",
- "    compute_target = ComputeTarget.create(ws, compute_name, provisioning_config)\n",
- "    \n",
- "    # can poll for a minimum number of nodes and for a specific timeout. \n",
- "    # if no min node count is provided it will use the scale settings for the cluster\n",
- "    compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)\n",
- "    \n",
- "    # For a more detailed view of current AmlCompute status, use get_status()\n",
- "    print(compute_target.get_status().serialize())"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "### Create a datastore containing sample images\n",
- "The input dataset used for this notebook differs from a standard MNIST dataset in that it has been converted to PNG images to demonstrate the use of files as inputs to Batch Inference. A sample of PNG-converted images of the MNIST dataset was taken from [this repository](https://github.com/myleott/mnist_png).\n",
- "\n",
- "We have created a public blob container `sampledata` on an account named `pipelinedata`, containing these images from the MNIST dataset. In the next step, we create a datastore with the name `mnist_datastore`, which points to this blob container. In the call to `register_azure_blob_container` below, setting the `overwrite` flag to `True` overwrites any datastore that was created previously with that name. \n",
- "\n",
- "This step can be changed to point to your blob container by providing your own `datastore_name`, `container_name`, and `account_name`."
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "from azureml.core.datastore import Datastore\n",
- "\n",
- "account_name = \"pipelinedata\"\n",
- "datastore_name = \"mnist_datastore\"\n",
- "container_name = \"sampledata\"\n",
- "\n",
- "mnist_data = Datastore.register_azure_blob_container(ws, \n",
- "                      datastore_name=datastore_name, \n",
- "                      container_name=container_name, \n",
- "                      account_name=account_name,\n",
- "                      overwrite=True)"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "Next, let's specify the default datastore for the outputs."
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "def_data_store = ws.get_default_datastore()"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "### Create a FileDataset\n",
- "A [FileDataset](https://docs.microsoft.com/python/api/azureml-core/azureml.data.filedataset?view=azure-ml-py) references single or multiple files in your datastores or at public URLs. The files can be of any format. FileDataset provides you with the ability to download or mount the files to your compute. By creating a dataset, you create a reference to the data source location. If you applied any subsetting transformations to the dataset, they will be stored in the dataset as well. The data remains in its existing location, so no extra storage cost is incurred.\n",
- "You can use dataset objects as inputs. Register the datasets to the workspace if you want to reuse them later."
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "from azureml.core.dataset import Dataset\n",
- "\n",
- "mnist_ds_name = 'mnist_sample_data'\n",
- "\n",
- "path_on_datastore = mnist_data.path('mnist')\n",
- "input_mnist_ds = Dataset.File.from_files(path=path_on_datastore, validate=False)"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "The input dataset can be specified as a pipeline parameter, so that you can pass in new data when you rerun the PRS pipeline."
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "from azureml.data.dataset_consumption_config import DatasetConsumptionConfig\n",
- "from azureml.pipeline.core import PipelineParameter\n",
- "\n",
- "pipeline_param = PipelineParameter(name=\"mnist_param\", default_value=input_mnist_ds)\n",
- "input_mnist_ds_consumption = DatasetConsumptionConfig(\"minist_param_config\", pipeline_param).as_mount()"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "### Intermediate/Output Data\n",
- "Intermediate data (or the output of a Step) is represented by a [PipelineData](https://docs.microsoft.com/python/api/azureml-pipeline-core/azureml.pipeline.core.pipelinedata?view=azure-ml-py) object. PipelineData can be produced by one step and consumed in another step by providing the PipelineData object as an output of one step and the input of one or more steps."
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "from azureml.pipeline.core import Pipeline, PipelineData\n",
- "\n",
- "output_dir = PipelineData(name=\"inferences\", datastore=def_data_store)"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "### Download the Model\n",
- "\n",
- "Download and extract the model from https://pipelinedata.blob.core.windows.net/mnist-model/mnist-tf.tar.gz to the \"models\" directory"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "import tarfile\n",
- "import urllib.request\n",
- "\n",
- "# create directory for model\n",
- "model_dir = 'models'\n",
- "if not os.path.isdir(model_dir):\n",
- "    os.mkdir(model_dir)\n",
- "\n",
- "url=\"https://pipelinedata.blob.core.windows.net/mnist-model/mnist-tf.tar.gz\"\n",
- "response = urllib.request.urlretrieve(url, \"model.tar.gz\")\n",
- "tar = tarfile.open(\"model.tar.gz\", \"r:gz\")\n",
- "tar.extractall(model_dir)\n",
- "\n",
- "os.listdir(model_dir)"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "### Register the model with Workspace\n",
- "A registered model is a logical container for one or more files that make up your model. For example, if you have a model that's stored in multiple files, you can register them as a single model in the workspace. After you register the files, you can then download or deploy the registered model and receive all the files that you registered.\n",
- "\n",
- "Using tags, you can track useful information such as the name and version of the machine learning library used to train the model. Note that tags must be alphanumeric. Learn more about registering models [here](https://docs.microsoft.com/azure/machine-learning/v1/how-to-deploy-and-where#registermodel) "
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "from azureml.core.model import Model\n",
- "\n",
- "# register downloaded model \n",
- "model = Model.register(model_path=\"models/\",\n",
- "                       model_name=\"mnist-prs\",  # this is the name the model is registered as\n",
- "                       tags={'pretrained': \"mnist\"},\n",
- "                       description=\"Mnist trained tensorflow model\",\n",
- "                       workspace=ws)"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "### Using your model to make batch predictions\n",
- "To use the model to make batch predictions, you need an **entry script** and a list of **dependencies**:\n",
- "\n",
- "#### An entry script\n",
- "This script accepts requests, scores the requests by using the model, and returns the results.\n",
- "- __init()__ - Typically this function loads the model into a global object. This function is run only once at the start of batch processing per worker node/process. The init method can make use of the following environment variables (ParallelRunStep input):\n",
- "    1.\tAZUREML_BI_OUTPUT_PATH - output folder path\n",
- "- __run(mini_batch)__ - The method to be parallelized. Each invocation will receive one mini-batch.<br>\n",
- "__mini_batch__: Batch inference will invoke the run method and pass either a list or a Pandas DataFrame as an argument to the method. Each entry in mini_batch will be a file path if the input is a FileDataset, or a Pandas DataFrame if the input is a TabularDataset.<br>\n",
- "__run__ method response: the run() method should return a Pandas DataFrame or an array. For the append_row output_action, these returned elements are appended into the common output file. For summary_only, the contents of the elements are ignored. For all output actions, each returned output element indicates one successful inference of an input element in the input mini-batch.\n",
- "    Make sure that enough data is included in the inference result to map each input to its inference. Inference output is written to the output file and is not guaranteed to be in order; use some key in the output to map it back to the input.\n",
- "    \n",
- "\n",
- "#### Dependencies\n",
- "Helper scripts or Python/Conda packages required to run the entry script."
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "scripts_folder = \"Code\"\n",
- "script_file = \"digit_identification.py\"\n",
- "\n",
- "# peek at contents\n",
- "with open(os.path.join(scripts_folder, script_file)) as inference_file:\n",
- "    print(inference_file.read())"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "## Build and run the batch inference pipeline\n",
- "The data, models, and compute resource are now available. Let's put all these together in a pipeline."
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "### Specify the environment to run the script\n",
- "Specify the conda dependencies for your script. This will allow us to install pip packages as well as configure the inference environment.\n",
- "* Always include **azureml-core** and **azureml-dataset-runtime\\[fuse\\]** in the pip package list to make ParallelRunStep run properly.\n",
- "\n",
- "If you're using a custom image (`batch_env.python.user_managed_dependencies = True`), you need to install these packages in your image."
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "from azureml.core import Environment\n",
- "from azureml.core.runconfig import CondaDependencies, DEFAULT_CPU_IMAGE\n",
- "\n",
- "batch_conda_deps = CondaDependencies.create(python_version=\"3.8\",\n",
- "                                            conda_packages=['pip==20.2.4'],\n",
- "                                            pip_packages=[\"tensorflow==2.13.0\", \"pillow\", \"protobuf==4.23.3\",\n",
- "                                                          \"azureml-core\", \"azureml-dataset-runtime[fuse]\"])\n",
- "batch_env = Environment(name=\"batch_environment\")\n",
- "batch_env.python.conda_dependencies = batch_conda_deps\n",
- "batch_env.docker.base_image = DEFAULT_CPU_IMAGE"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "### Create the configuration to wrap the inference script"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig\n",
- "\n",
- "parallel_run_config = ParallelRunConfig(\n",
- "    source_directory=scripts_folder,\n",
- "    entry_script=script_file,\n",
- "    mini_batch_size=PipelineParameter(name=\"batch_size_param\", default_value=\"5\"),\n",
- "    error_threshold=10,\n",
- "    output_action=\"append_row\",\n",
- "    append_row_file_name=\"mnist_outputs.txt\",\n",
- "    environment=batch_env,\n",
- "    compute_target=compute_target,\n",
- "    process_count_per_node=PipelineParameter(name=\"process_count_param\", default_value=2),\n",
- "    node_count=2\n",
- ")"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "### Create the pipeline step\n",
- "Create the pipeline step using the script, environment configuration, and parameters. Specify the compute target you already attached to your workspace as the target of execution of the script. We will use ParallelRunStep to create the pipeline step."
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "parallelrun_step = ParallelRunStep(\n",
- "    name=\"predict-digits-mnist\",\n",
- "    parallel_run_config=parallel_run_config,\n",
- "    inputs=[ input_mnist_ds_consumption ],\n",
- "    output=output_dir,\n",
- "    allow_reuse=False\n",
- ")"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "### Run the pipeline\n",
- "At this point you can run the pipeline and examine the output it produced. The Experiment object is used to track the run of the pipeline."
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "from azureml.core import Experiment\n",
- "\n",
- "pipeline = Pipeline(workspace=ws, steps=[parallelrun_step])\n",
- "experiment = Experiment(ws, 'digit_identification')\n",
- "pipeline_run = experiment.submit(pipeline)"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "### Monitor the run\n",
- "\n",
- "The pipeline run status can be checked in the Azure Machine Learning portal (https://ml.azure.com). The link to the pipeline run can be retrieved by inspecting the `pipeline_run` object."
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "# This will output information of the pipeline run, including the link to the details page of the portal.\n",
- "pipeline_run"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "### Optional: View detailed logs (streaming) "
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "# Wait for the run to complete and stream the output log to the console\n",
- "pipeline_run.wait_for_completion(show_output=True)"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "### View the prediction results per input image\n",
- "In the digit_identification.py file above you can see that the resultList with the filename and the prediction result gets returned. These are written to the DataStore specified in the PipelineData object as the output data, which in this case is called *inferences*. This contains the outputs from all of the worker nodes used in the compute cluster. You can download this data to view the results; the cell below shows just the first 10 rows."
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "import pandas as pd\n",
- "import tempfile\n",
- "\n",
- "batch_run = pipeline_run.find_step_run(parallelrun_step.name)[0]\n",
- "batch_output = batch_run.get_output_data(output_dir.name)\n",
- "\n",
- "target_dir = tempfile.mkdtemp()\n",
- "batch_output.download(local_path=target_dir)\n",
- "result_file = os.path.join(target_dir, batch_output.path_on_datastore, parallel_run_config.append_row_file_name)\n",
- "\n",
- "df = pd.read_csv(result_file, delimiter=\":\", header=None)\n",
- "df.columns = [\"Filename\", \"Prediction\"]\n",
- "print(\"Prediction has \", df.shape[0], \" rows\")\n",
- "df.head(10) "
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "### Resubmit with a different dataset\n",
- "Since we made the input a `PipelineParameter`, we can resubmit with a different dataset without having to create an entirely new experiment. We'll use the same datastore but use only a single image."
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "path_on_datastore = mnist_data.path('mnist/0.png')\n",
- "single_image_ds = Dataset.File.from_files(path=path_on_datastore, validate=False)"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "pipeline_run_2 = experiment.submit(pipeline, \n",
- "                                   pipeline_parameters={\"mnist_param\": single_image_ds, \n",
- "                                                        \"batch_size_param\": \"1\",\n",
- "                                                        \"process_count_param\": 1}\n",
- ")"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "# This will output information of the pipeline run, including the link to the details page of the portal.\n",
- "pipeline_run_2"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "# Wait for the run to complete and stream the output log to the console\n",
- "pipeline_run_2.wait_for_completion(show_output=True)"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "## Clean up compute resources\n",
- "\n",
- "For recurring jobs, it may be wise to keep the compute resources and allow the compute nodes to scale down to 0. However, since this is just a single-run job, we are free to release the allocated compute resources."
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "# uncomment below and run if compute resources are no longer needed \n",
- "# compute_target.delete() "
- ]
- }
- ],
- "metadata": {
- "authors": [
- {
- "name": "prsbjdev"
- }
- ],
- "category": "Other notebooks",
- "compute": [
- "AML Compute"
- ],
- "datasets": [
- "MNIST"
- ],
- "deployment": [
- "None"
- ],
- "exclude_from_index": false,
- "framework": [
- "None"
- ],
- "friendly_name": "MNIST data inferencing using ParallelRunStep",
- "index_order": 1,
- "kernelspec": {
- "display_name": "Python 3.8 - AzureML",
- "language": "python",
- "name": "python38-azureml"
- },
- "language_info": {
- "codemirror_mode": {
- "name": "ipython",
- "version": 3
- },
- "file_extension": ".py",
- "mimetype": "text/x-python",
- "name": "python",
- "nbconvert_exporter": "python",
- "pygments_lexer": "ipython3",
- "version": "3.8.16"
- },
- "tags": [
- "Batch Inferencing",
- "Pipeline"
- ],
- "task": "Digit identification"
- },
- "nbformat": 4,
- "nbformat_minor": 2
-}
{
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Copyright (c) Microsoft Corporation. All rights reserved. \n",
+ "Licensed under the MIT License."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "![Impressions](https://PixelServer20190423114238.azurewebsites.net/api/impressions/MachineLearningNotebooks/how-to-use-azureml/machine-learning-pipelines/parallel-run/file-dataset-image-inference-mnist.png)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "# Using Azure Machine Learning Pipelines for Batch Inference\n",
+ "\n",
+ "In this notebook, we will demonstrate how to make predictions on large quantities of data asynchronously using ML pipelines with Azure Machine Learning. Batch inference (or batch scoring) provides cost-effective inference, with unparalleled throughput for asynchronous applications. Batch prediction pipelines can scale to perform inference on terabytes of production data. Batch prediction is optimized for high throughput, fire-and-forget predictions for a large collection of data.\n",
+ "\n",
+ "> **Tip**\n",
+ "If your system requires low-latency processing (to process a single document or small set of documents quickly), use [real-time scoring](https://docs.microsoft.com/azure/machine-learning/v1/how-to-consume-web-service) instead of batch prediction.\n",
+ "\n",
+ "In this example we take a digit identification model already trained on the MNIST dataset using the [AzureML training with deep learning example notebook](https://github.com/Azure/MachineLearningNotebooks/blob/master/how-to-use-azureml/ml-frameworks/keras/train-hyperparameter-tune-deploy-with-keras/train-hyperparameter-tune-deploy-with-keras.ipynb), and run it on some of the MNIST test images in batch. \n",
+ "\n",
+ "The input dataset used for this notebook differs from a standard MNIST dataset in that it has been converted to PNG images to demonstrate the use of files as inputs to Batch Inference. A sample of PNG-converted images of the MNIST dataset was taken from [this repository](https://github.com/myleott/mnist_png). \n",
+ "\n",
+ "The outline of this notebook is as follows:\n",
+ "\n",
+ "- Create a DataStore referencing MNIST images stored in a blob container.\n",
+ "- Register the pretrained MNIST model into the model registry. 
\n", + "- Use the registered model to do batch inference on the images in the data blob container.\n", + "\n", + "## Prerequisites\n", + "If you are using an Azure Machine Learning Notebook VM, you are all set. Otherwise, make sure you go through the configuration Notebook located at https://github.com/Azure/MachineLearningNotebooks first. This sets you up with a working config file that has information on your workspace, subscription id, etc. " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Connect to workspace\n", + "Create a workspace object from the existing workspace. Workspace.from_config() reads the file config.json and loads the details into an object named ws." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# Check core SDK version number\n", + "import azureml.core\n", + "\n", + "print(\"SDK version:\", azureml.core.VERSION)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "from azureml.core import Workspace\n", + "\n", + "ws = Workspace.from_config()\n", + "print('Workspace name: ' + ws.name, \n", + " 'Azure region: ' + ws.location, \n", + " 'Subscription id: ' + ws.subscription_id, \n", + " 'Resource group: ' + ws.resource_group, sep = '\\n')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Create or Attach existing compute resource\n", + "By using Azure Machine Learning Compute, a managed service, data scientists can train machine learning models on clusters of Azure virtual machines. Examples include VMs with GPU support. In this tutorial, you create Azure Machine Learning Compute as your training environment. The code below creates the compute clusters for you if they don't already exist in your workspace.\n", + "\n", + "> Note that if you have an AzureML Data Scientist role, you will not have permission to create compute resources. Talk to your workspace or IT admin to create the compute targets described in this section, if they do not already exist.\n", + "\n", + "**Creation of compute takes approximately 5 minutes. If the AmlCompute with that name is already in your workspace the code will skip the creation process.**" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "from azureml.core.compute import AmlCompute, ComputeTarget\n", + "\n", + "# choose a name for your cluster\n", + "compute_name = os.environ.get(\"AML_COMPUTE_CLUSTER_NAME\", \"cpu-cluster\")\n", + "compute_min_nodes = os.environ.get(\"AML_COMPUTE_CLUSTER_MIN_NODES\", 0)\n", + "compute_max_nodes = os.environ.get(\"AML_COMPUTE_CLUSTER_MAX_NODES\", 4)\n", + "\n", + "# This example uses CPU VM. For using GPU VM, set SKU to Standard_NC6s_v3\n", + "vm_size = os.environ.get(\"AML_COMPUTE_CLUSTER_SKU\", \"STANDARD_D2_V2\")\n", + "\n", + "\n", + "if compute_name in ws.compute_targets:\n", + " compute_target = ws.compute_targets[compute_name]\n", + " if compute_target and type(compute_target) is AmlCompute:\n", + " print('found compute target. just use it. 
' + compute_name)\n", + "else:\n", + " print('creating a new compute target...')\n", + " provisioning_config = AmlCompute.provisioning_configuration(vm_size = vm_size,\n", + " min_nodes = compute_min_nodes, \n", + " max_nodes = compute_max_nodes)\n", + "\n", + " # create the cluster\n", + " compute_target = ComputeTarget.create(ws, compute_name, provisioning_config)\n", + " \n", + " # can poll for a minimum number of nodes and for a specific timeout. \n", + " # if no min node count is provided it will use the scale settings for the cluster\n", + " compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)\n", + " \n", + " # For a more detailed view of current AmlCompute status, use get_status()\n", + " print(compute_target.get_status().serialize())" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Create a datastore containing sample images\n", + "The input dataset used for this notebook differs from a standard MNIST dataset in that it has been converted to PNG images to demonstrate use of files as inputs to Batch Inference. A sample of PNG-converted images of the MNIST dataset were take from [this repository](https://github.com/myleott/mnist_png).\n", + "\n", + "We have created a public blob container `sampledata` on an account named `pipelinedata`, containing these images from the MNIST dataset. In the next step, we create a datastore with the name `images_datastore`, which points to this blob container. In the call to `register_azure_blob_container` below, setting the `overwrite` flag to `True` overwrites any datastore that was created previously with that name. \n", + "\n", + "This step can be changed to point to your blob container by providing your own `datastore_name`, `container_name`, and `account_name`." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from azureml.core.datastore import Datastore\n", + "\n", + "account_name = \"pipelinedata\"\n", + "datastore_name = \"mnist_datastore\"\n", + "container_name = \"sampledata\"\n", + "\n", + "mnist_data = Datastore.register_azure_blob_container(ws, \n", + " datastore_name=datastore_name, \n", + " container_name=container_name, \n", + " account_name=account_name,\n", + " overwrite=True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Next, let's specify the default datastore for the outputs." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def_data_store = ws.get_default_datastore()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Create a FileDataset\n", + "A [FileDataset](https://docs.microsoft.com/python/api/azureml-core/azureml.data.filedataset?view=azure-ml-py) references single or multiple files in your datastores or public urls. The files can be of any format. FileDataset provides you with the ability to download or mount the files to your compute. By creating a dataset, you create a reference to the data source location. If you applied any subsetting transformations to the dataset, they will be stored in the dataset as well. The data remains in its existing location, so no extra storage cost is incurred.\n", + "You can use dataset objects as inputs. Register the datasets to the workspace if you want to reuse them later." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from azureml.core.dataset import Dataset\n", + "\n", + "mnist_ds_name = 'mnist_sample_data'\n", + "\n", + "path_on_datastore = mnist_data.path('mnist')\n", + "input_mnist_ds = Dataset.File.from_files(path=path_on_datastore, validate=False)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The input dataset can be specified as a pipeline parameter, so that you can pass in new data when rerun the PRS pipeline." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from azureml.data.dataset_consumption_config import DatasetConsumptionConfig\n", + "from azureml.pipeline.core import PipelineParameter\n", + "\n", + "pipeline_param = PipelineParameter(name=\"mnist_param\", default_value=input_mnist_ds)\n", + "input_mnist_ds_consumption = DatasetConsumptionConfig(\"minist_param_config\", pipeline_param).as_mount()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Intermediate/Output Data\n", + "Intermediate data (or output of a Step) is represented by [PipelineData](https://docs.microsoft.com/python/api/azureml-pipeline-core/azureml.pipeline.core.pipelinedata?view=azure-ml-py) object. PipelineData can be produced by one step and consumed in another step by providing the PipelineData object as an output of one step and the input of one or more steps." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from azureml.pipeline.core import Pipeline, PipelineData\n", + "\n", + "output_dir = PipelineData(name=\"inferences\", datastore=def_data_store)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Download the Model\n", + "\n", + "Download and extract the model from https://pipelinedata.blob.core.windows.net/mnist-model/mnist-tf.tar.gz to \"models\" directory" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import tarfile\n", + "import urllib.request\n", + "\n", + "# create directory for model\n", + "model_dir = 'models'\n", + "if not os.path.isdir(model_dir):\n", + " os.mkdir(model_dir)\n", + "\n", + "url=\"https://pipelinedata.blob.core.windows.net/mnist-model/mnist-tf.tar.gz\"\n", + "response = urllib.request.urlretrieve(url, \"model.tar.gz\")\n", + "tar = tarfile.open(\"model.tar.gz\", \"r:gz\")\n", + "tar.extractall(model_dir)\n", + "\n", + "os.listdir(model_dir)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Register the model with Workspace\n", + "A registered model is a logical container for one or more files that make up your model. For example, if you have a model that's stored in multiple files, you can register them as a single model in the workspace. After you register the files, you can then download or deploy the registered model and receive all the files that you registered.\n", + "\n", + "Using tags, you can track useful information such as the name and version of the machine learning library used to train the model. Note that tags must be alphanumeric. 
Learn more about registering models [here](https://docs.microsoft.com/azure/machine-learning/v1/how-to-deploy-and-where#registermodel) "
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "from azureml.core.model import Model\n",
+ "\n",
+ "# register downloaded model \n",
+ "model = Model.register(model_path=\"models/\",\n",
+ "                       model_name=\"mnist-prs\",  # this is the name the model is registered as\n",
+ "                       tags={'pretrained': \"mnist\"},\n",
+ "                       description=\"Mnist trained tensorflow model\",\n",
+ "                       workspace=ws)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### Using your model to make batch predictions\n",
+ "To use the model to make batch predictions, you need an **entry script** and a list of **dependencies**:\n",
+ "\n",
+ "#### An entry script\n",
+ "This script accepts requests, scores the requests by using the model, and returns the results.\n",
+ "- __init()__ - Typically this function loads the model into a global object. This function is run only once at the start of batch processing per worker node/process. The init method can make use of the following environment variables (ParallelRunStep input):\n",
+ "    1.\tAZUREML_BI_OUTPUT_PATH - output folder path\n",
+ "- __run(mini_batch)__ - The method to be parallelized. Each invocation will receive one mini-batch.<br>\n",
+ "__mini_batch__: Batch inference will invoke the run method and pass either a list or a Pandas DataFrame as an argument to the method. Each entry in mini_batch will be a file path if the input is a FileDataset, or a Pandas DataFrame if the input is a TabularDataset.<br>\n",
+ "__run__ method response: the run() method should return a Pandas DataFrame or an array. For the append_row output_action, these returned elements are appended into the common output file. For summary_only, the contents of the elements are ignored. For all output actions, each returned output element indicates one successful inference of an input element in the input mini-batch.\n",
+ "    Make sure that enough data is included in the inference result to map each input to its inference. Inference output is written to the output file and is not guaranteed to be in order; use some key in the output to map it back to the input.\n",
+ "    \n",
+ "\n",
+ "#### Dependencies\n",
+ "Helper scripts or Python/Conda packages required to run the entry script."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "scripts_folder = \"Code\"\n",
+ "script_file = \"digit_identification.py\"\n",
+ "\n",
+ "# peek at contents\n",
+ "with open(os.path.join(scripts_folder, script_file)) as inference_file:\n",
+ "    print(inference_file.read())"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Build and run the batch inference pipeline\n",
+ "The data, models, and compute resource are now available. Let's put all these together in a pipeline."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### Specify the environment to run the script\n",
+ "Specify the conda dependencies for your script. This will allow us to install pip packages as well as configure the inference environment.\n",
+ "* Always include **azureml-core** and **azureml-dataset-runtime\\[fuse\\]** in the pip package list to make ParallelRunStep run properly.\n",
+ "\n",
+ "If you're using a custom image (`batch_env.python.user_managed_dependencies = True`), you need to install these packages in your image."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "from azureml.core import Environment\n",
+ "from azureml.core.runconfig import CondaDependencies, DEFAULT_CPU_IMAGE\n",
+ "\n",
+ "batch_conda_deps = CondaDependencies.create(python_version=\"3.8\",\n",
+ "                                            conda_packages=['pip==20.2.4'],\n",
+ "                                            pip_packages=[\"tensorflow==2.13.0\", \"pillow\", \"protobuf==4.23.3\",\n",
+ "                                                          \"azureml-core\", \"azureml-dataset-runtime[fuse]\"])\n",
+ "batch_env = Environment(name=\"batch_environment\")\n",
+ "batch_env.python.conda_dependencies = batch_conda_deps\n",
+ "batch_env.docker.base_image = DEFAULT_CPU_IMAGE"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### Create the configuration to wrap the inference script"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig\n",
+ "\n",
+ "parallel_run_config = ParallelRunConfig(\n",
+ "    source_directory=scripts_folder,\n",
+ "    entry_script=script_file,\n",
+ "    mini_batch_size=PipelineParameter(name=\"batch_size_param\", default_value=\"5\"),\n",
+ "    error_threshold=10,\n",
+ "    output_action=\"append_row\",\n",
+ "    append_row_file_name=\"mnist_outputs.txt\",\n",
+ "    environment=batch_env,\n",
+ "    compute_target=compute_target,\n",
+ "    process_count_per_node=PipelineParameter(name=\"process_count_param\", default_value=2),\n",
+ "    node_count=2\n",
+ ")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### Create the pipeline step\n",
+ "Create the pipeline step using the script, environment configuration, and parameters. Specify the compute target you already attached to your workspace as the target of execution of the script. We will use ParallelRunStep to create the pipeline step."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "parallelrun_step = ParallelRunStep(\n",
+ "    name=\"predict-digits-mnist\",\n",
+ "    parallel_run_config=parallel_run_config,\n",
+ "    inputs=[ input_mnist_ds_consumption ],\n",
+ "    output=output_dir,\n",
+ "    allow_reuse=False\n",
+ ")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### Run the pipeline\n",
+ "At this point you can run the pipeline and examine the output it produced. The Experiment object is used to track the run of the pipeline."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "from azureml.core import Experiment\n",
+ "\n",
+ "pipeline = Pipeline(workspace=ws, steps=[parallelrun_step])\n",
+ "experiment = Experiment(ws, 'digit_identification')\n",
+ "pipeline_run = experiment.submit(pipeline)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### Monitor the run\n",
+ "\n",
+ "The pipeline run status can be checked in the Azure Machine Learning portal (https://ml.azure.com). The link to the pipeline run can be retrieved by inspecting the `pipeline_run` object."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# This will output information of the pipeline run, including the link to the details page of the portal.\n",
+ "pipeline_run"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### Optional: View detailed logs (streaming) "
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# Wait for the run to complete and stream the output log to the console\n",
+ "pipeline_run.wait_for_completion(show_output=True)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### View the prediction results per input image\n",
+ "In the digit_identification.py file above you can see that the resultList with the filename and the prediction result gets returned. These are written to the DataStore specified in the PipelineData object as the output data, which in this case is called *inferences*. This contains the outputs from all of the worker nodes used in the compute cluster. You can download this data to view the results; the cell below shows just the first 10 rows."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "import pandas as pd\n",
+ "import tempfile\n",
+ "\n",
+ "batch_run = pipeline_run.find_step_run(parallelrun_step.name)[0]\n",
+ "batch_output = batch_run.get_output_data(output_dir.name)\n",
+ "\n",
+ "target_dir = tempfile.mkdtemp()\n",
+ "batch_output.download(local_path=target_dir)\n",
+ "result_file = os.path.join(target_dir, batch_output.path_on_datastore, parallel_run_config.append_row_file_name)\n",
+ "\n",
+ "df = pd.read_csv(result_file, delimiter=\":\", header=None)\n",
+ "df.columns = [\"Filename\", \"Prediction\"]\n",
+ "print(\"Prediction has \", df.shape[0], \" rows\")\n",
+ "df.head(10) "
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### Resubmit with a different dataset\n",
+ "Since we made the input a `PipelineParameter`, we can resubmit with a different dataset without having to create an entirely new experiment. We'll use the same datastore but use only a single image."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "path_on_datastore = mnist_data.path('mnist/0.png')\n",
+ "single_image_ds = Dataset.File.from_files(path=path_on_datastore, validate=False)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "pipeline_run_2 = experiment.submit(pipeline, \n",
+ "                                   pipeline_parameters={\"mnist_param\": single_image_ds, \n",
+ "                                                        \"batch_size_param\": \"1\",\n",
+ "                                                        \"process_count_param\": 1}\n",
+ ")"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# This will output information of the pipeline run, including the link to the details page of the portal.\n",
+ "pipeline_run_2"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# Wait for the run to complete and stream the output log to the console\n",
+ "pipeline_run_2.wait_for_completion(show_output=True)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Clean up compute resources\n",
+ "\n",
+ "For recurring jobs, it may be wise to keep the compute resources and allow the compute nodes to scale down to 0. 
However, since this is just a single-run job, we are free to release the allocated compute resources." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# uncomment below and run if compute resources are no longer needed \n", + "# compute_target.delete() " + ] + } ], - "task": "Digit identification" - }, - "nbformat": 4, - "nbformat_minor": 2 -} + "metadata": { + "authors": [ + { + "name": "prsbjdev" + } + ], + "category": "Other notebooks", + "compute": [ + "AML Compute" + ], + "datasets": [ + "MNIST" + ], + "deployment": [ + "None" + ], + "exclude_from_index": false, + "framework": [ + "None" + ], + "friendly_name": "MNIST data inferencing using ParallelRunStep", + "index_order": 1, + "kernelspec": { + "display_name": "Python 3.8 - AzureML", + "language": "python", + "name": "python38-azureml" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.16" + }, + "tags": [ + "Batch Inferencing", + "Pipeline" + ], + "task": "Digit identification" + }, + "nbformat": 4, + "nbformat_minor": 2 +} \ No newline at end of file From dcf6716d914b8fbdb66a99a56f2985f97d49da8a Mon Sep 17 00:00:00 2001 From: Xiangwei Wang Date: Mon, 20 Nov 2023 15:02:54 +0800 Subject: [PATCH 7/9] recover format --- .../parallel-run/tabular-dataset-inference-iris.ipynb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/how-to-use-azureml/machine-learning-pipelines/parallel-run/tabular-dataset-inference-iris.ipynb b/how-to-use-azureml/machine-learning-pipelines/parallel-run/tabular-dataset-inference-iris.ipynb index df8bb0f9d..bafa19e52 100644 --- a/how-to-use-azureml/machine-learning-pipelines/parallel-run/tabular-dataset-inference-iris.ipynb +++ b/how-to-use-azureml/machine-learning-pipelines/parallel-run/tabular-dataset-inference-iris.ipynb @@ -1,7 +1,7 @@ { "cells": [ { - "cell_type": "raw", + "cell_type": "markdown", "metadata": {}, "source": [ "Copyright (c) Microsoft Corporation. 
All rights reserved.\n", From 50ba7cd05bb40e9d5b11f647e6a5fbbc16e63e0a Mon Sep 17 00:00:00 2001 From: Xiangwei Wang Date: Mon, 20 Nov 2023 17:15:58 +0800 Subject: [PATCH 8/9] del unused lib --- .../parallel-run/Code/iris_score.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/how-to-use-azureml/machine-learning-pipelines/parallel-run/Code/iris_score.py b/how-to-use-azureml/machine-learning-pipelines/parallel-run/Code/iris_score.py index ffe25dcd1..6dc0c7cad 100644 --- a/how-to-use-azureml/machine-learning-pipelines/parallel-run/Code/iris_score.py +++ b/how-to-use-azureml/machine-learning-pipelines/parallel-run/Code/iris_score.py @@ -1,10 +1,6 @@ -# import io import pickle import argparse -# import numpy as np - from azureml.core.model import Model -# from sklearn.linear_model import LogisticRegression from azureml_user.parallel_run import EntryScript From d9479b98757c360a6d60e3e40430f8ace4fb3abc Mon Sep 17 00:00:00 2001 From: Xiangwei Wang Date: Mon, 20 Nov 2023 17:21:51 +0800 Subject: [PATCH 9/9] move one line --- .../parallel-run/Code/digit_identification.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/how-to-use-azureml/machine-learning-pipelines/parallel-run/Code/digit_identification.py b/how-to-use-azureml/machine-learning-pipelines/parallel-run/Code/digit_identification.py index 476173126..280f77cc1 100644 --- a/how-to-use-azureml/machine-learning-pipelines/parallel-run/Code/digit_identification.py +++ b/how-to-use-azureml/machine-learning-pipelines/parallel-run/Code/digit_identification.py @@ -8,11 +8,12 @@ from PIL import Image from azureml.core import Model -# Disable eager execution -tf.compat.v1.disable_eager_execution() def init(): global g_tf_sess + + # Disable eager execution + tf.compat.v1.disable_eager_execution() # pull down model from workspace model_path = Model.get_model_path("mnist-prs")
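
Addendum (editorial, not part of the patch series): taken together, PATCH 1/9 and PATCH 9/9 leave the entry script running its legacy TF1-style graph through TensorFlow 2.x's compat layer, with eager execution disabled inside init() rather than at import time -- presumably so the switch happens once per ParallelRunStep worker process, right before the checkpoint is loaded. A minimal sketch of the resulting init() is shown below. The graph-restore lines follow the standard tf.compat.v1 checkpoint pattern and are an assumption here; only the disable_eager_execution move and the model lookup are visible in the hunks above.

import os

import tensorflow as tf
from azureml.core import Model


def init():
    """Called once per ParallelRunStep worker process, before any mini-batch is scored."""
    global g_tf_sess

    # Disable eager execution so the TF1-style graph below can run under TF 2.x.
    # Calling this in init() (rather than at module import) ties the switch to
    # worker startup, which is when ParallelRunStep loads the model.
    tf.compat.v1.disable_eager_execution()

    # pull down the registered model from the workspace
    model_path = Model.get_model_path("mnist-prs")

    # Standard tf.compat.v1 pattern for restoring a TF1 meta-graph checkpoint
    # (assumed; the exact restore code is not shown in these hunks).
    tf.compat.v1.reset_default_graph()
    saver = tf.compat.v1.train.import_meta_graph(
        os.path.join(model_path, "mnist-tf.model.meta"))
    g_tf_sess = tf.compat.v1.Session(
        config=tf.compat.v1.ConfigProto(device_count={"GPU": 0}))
    saver.restore(g_tf_sess, os.path.join(model_path, "mnist-tf.model"))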