From 47841d6f92d74d59535e5978226097c452d01454 Mon Sep 17 00:00:00 2001 From: David Gardner <96306125+dagardner-nv@users.noreply.github.com> Date: Fri, 18 Oct 2024 16:31:29 -0700 Subject: [PATCH 1/2] Update documentation to reflect CPU-only execution mode (#1924) * Documents writing a stage that supports CPU execution mode * Updates `docs/source/developer_guide/contributing.md` cleaning up build and troubleshooting sections. Requires PRs #1851 & #1906 to be merged first Closes [#1737](https://github.com/nv-morpheus/Morpheus/issues/1737) ## By Submitting this PR I confirm: - I am familiar with the [Contributing Guidelines](https://github.com/nv-morpheus/Morpheus/blob/main/docs/source/developer_guide/contributing.md). - When the PR is ready for review, new or existing tests cover these changes. - When the PR is ready for review, the documentation is up to date with these changes. Authors: - David Gardner (https://github.com/dagardner-nv) - Yuchen Zhang (https://github.com/yczhang-nv) Approvers: - Michael Demoret (https://github.com/mdemoret-nv) URL: https://github.com/nv-morpheus/Morpheus/pull/1924 --- .../examples_cuda-125_arch-x86_64.yaml | 1 + dependencies.yaml | 7 + docs/README.md | 10 +- docs/source/basics/building_a_pipeline.md | 10 +- docs/source/basics/overview.rst | 15 +- docs/source/cloud_deployment_guide.md | 14 -- docs/source/developer_guide/contributing.md | 91 ++++++++---- ...modular_pipeline_digital_fingerprinting.md | 6 - .../guides/1_simple_python_stage.md | 39 ++++-- .../guides/2_real_world_phishing.md | 66 +++++---- .../guides/3_simple_cpp_stage.md | 23 ++- .../guides/4_source_cpp_stage.md | 7 +- .../guides/5_digital_fingerprinting.md | 7 +- .../6_digital_fingerprinting_reference.md | 3 - .../guides/9_control_messages.md | 36 ++--- examples/abp_nvsmi_detection/README.md | 5 +- examples/cpu_only/README.md | 72 ++++++++++ .../1_simple_python_stage/pass_thru_deco.py | 2 +- .../developer_guide/2_2_rabbitmq/README.md | 1 + .../4_rabbitmq_cpp_stage/README.md | 2 - .../rabbitmq_source_stage.py | 6 +- .../gnn_fraud_detection_pipeline/README.md | 6 +- examples/llm/agents/README.md | 6 +- examples/llm/agents/run.py | 2 +- examples/llm/completion/README.md | 5 +- examples/llm/completion/run.py | 2 +- examples/ransomware_detection/README.md | 1 - examples/root_cause_analysis/README.md | 4 +- python/morpheus/morpheus/io/deserializers.py | 2 +- .../morpheus/morpheus/utils/type_aliases.py | 12 +- python/morpheus/morpheus/utils/type_utils.py | 132 +++++++++++++++++- 31 files changed, 426 insertions(+), 169 deletions(-) create mode 100644 examples/cpu_only/README.md diff --git a/conda/environments/examples_cuda-125_arch-x86_64.yaml b/conda/environments/examples_cuda-125_arch-x86_64.yaml index ffcae28e4a..e387e2c9bf 100644 --- a/conda/environments/examples_cuda-125_arch-x86_64.yaml +++ b/conda/environments/examples_cuda-125_arch-x86_64.yaml @@ -42,6 +42,7 @@ dependencies: - pip - pluggy=1.3 - pydantic +- pynvml=11.4 - pypdf=3.17.4 - pypdfium2=4.30 - python-confluent-kafka>=1.9.2,<1.10.0a0 diff --git a/dependencies.yaml b/dependencies.yaml index 95809bb0ee..05393f209c 100644 --- a/dependencies.yaml +++ b/dependencies.yaml @@ -150,6 +150,7 @@ files: arch: [x86_64] includes: - cve-mitigation + - example-abp-nvsmi - example-dfp-prod - example-gnn - example-llms @@ -442,6 +443,12 @@ dependencies: - dgl==2.0.0 - dglgo + example-abp-nvsmi: + common: + - output_types: [conda] + packages: + - pynvml=11.4 + example-llms: common: - output_types: [conda] diff --git a/docs/README.md b/docs/README.md 
index 4fe4c43e58..b0a3162a6a 100644 --- a/docs/README.md +++ b/docs/README.md @@ -17,18 +17,10 @@ # Building Documentation -Additional packages required for building the documentation are defined in `./conda_docs.yml`. - -## Install Additional Dependencies -From the root of the Morpheus repo: -```bash -conda env update --solver=libmamba -n morpheus --file conda/environments/dev_cuda-125_arch-x86_64.yaml --prune -``` - ## Build Morpheus and Documentation ``` CMAKE_CONFIGURE_EXTRA_ARGS="-DMORPHEUS_BUILD_DOCS=ON" ./scripts/compile.sh --target morpheus_docs ``` Outputs to `build/docs/html` - + If the documentation build is unsuccessful, refer to the **Out of Date Build Cache** section in [Troubleshooting](./source/extra_info/troubleshooting.md) to troubleshoot. diff --git a/docs/source/basics/building_a_pipeline.md b/docs/source/basics/building_a_pipeline.md index 65fadb0cf6..06985d5ef6 100644 --- a/docs/source/basics/building_a_pipeline.md +++ b/docs/source/basics/building_a_pipeline.md @@ -107,7 +107,7 @@ morpheus --log_level=DEBUG run pipeline-other \ Then the following error displays: ``` -RuntimeError: The to-file stage cannot handle input of <class 'morpheus.messages.ControlMessage'>. Accepted input types: (<class 'morpheus.messages.message_meta.MessageMeta'>,) +RuntimeError: The to-file stage cannot handle input of <class 'morpheus.messages.ControlMessage'>. Accepted input types: (<class 'morpheus.messages.message_meta.MessageMeta'>,) ``` This indicates that the ``to-file`` stage cannot accept the input type of `morpheus.messages.ControlMessage`. This is because the ``to-file`` stage has no idea how to write that class to a file; it only knows how to write instances of `morpheus.messages.message_meta.MessageMeta`. To ensure you have a valid pipeline, examine the `Accepted input types: (<class 'morpheus.messages.message_meta.MessageMeta'>,)` portion of the message. This indicates you need a stage that converts from the output type of the `deserialize` stage, `ControlMessage`, to `MessageMeta`, which is exactly what the `serialize` stage does.
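For reference, the same corrected pipeline can be built with the Python API. The following is an illustrative sketch (not part of the original guide); the output path is a placeholder:

```python
from morpheus.config import Config
from morpheus.pipeline import LinearPipeline
from morpheus.stages.input.file_source_stage import FileSourceStage
from morpheus.stages.preprocess.deserialize_stage import DeserializeStage
from morpheus.stages.postprocess.serialize_stage import SerializeStage
from morpheus.stages.output.write_to_file_stage import WriteToFileStage

config = Config()

pipeline = LinearPipeline(config)
pipeline.set_source(FileSourceStage(config, filename="examples/data/pcap_dump.jsonlines"))
pipeline.add_stage(DeserializeStage(config))  # emits ControlMessage
pipeline.add_stage(SerializeStage(config))    # converts ControlMessage back to MessageMeta
pipeline.add_stage(WriteToFileStage(config, filename=".tmp/output.jsonlines", overwrite=True))
pipeline.run()
```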
@@ -207,7 +207,7 @@ This example shows an NLP Pipeline which uses several stages available in Morphe #### Launching Triton Run the following to launch Triton and load the `sid-minibert` model: ```bash -docker run --rm -ti --gpus=all -p8000:8000 -p8001:8001 -p8002:8002 nvcr.io/nvidia/morpheus/morpheus-tritonserver-models:24.10 --model-repository=/models/triton-model-repo --exit-on-error=false --model-control-mode=explicit --load-model sid-minibert-onnx +docker run --rm -ti --gpus=all -p8000:8000 -p8001:8001 -p8002:8002 nvcr.io/nvidia/morpheus/morpheus-tritonserver-models:24.10 tritonserver --model-repository=/models/triton-model-repo --exit-on-error=false --model-control-mode=explicit --load-model sid-minibert-onnx ``` #### Launching Kafka @@ -216,15 +216,15 @@ Follow steps 1-8 in [Quick Launch Kafka Cluster](../developer_guide/contributing ![../img/nlp_kitchen_sink.png](../img/nlp_kitchen_sink.png) ```bash -morpheus --log_level=INFO run --num_threads=8 --pipeline_batch_size=1024 --model_max_batch_size=32 \ +morpheus --log_level=INFO run --pipeline_batch_size=1024 --model_max_batch_size=32 \ pipeline-nlp --viz_file=.tmp/nlp_kitchen_sink.png \ from-file --filename examples/data/pcap_dump.jsonlines \ deserialize \ preprocess \ - inf-triton --model_name=sid-minibert-onnx --server_url=localhost:8001 \ + inf-triton --model_name=sid-minibert-onnx --server_url=localhost:8000 \ monitor --description "Inference Rate" --smoothing=0.001 --unit "inf" \ add-class \ - filter --threshold=0.8 \ + filter --filter_source=TENSOR --threshold=0.8 \ serialize --include 'timestamp' --exclude '^_ts_' \ to-kafka --bootstrap_servers localhost:9092 --output_topic "inference_output" \ monitor --description "ToKafka Rate" --smoothing=0.001 --unit "msg" diff --git a/docs/source/basics/overview.rst b/docs/source/basics/overview.rst index ca1f8b6981..4b00a59062 100644 --- a/docs/source/basics/overview.rst +++ b/docs/source/basics/overview.rst @@ -39,16 +39,22 @@ run: $ morpheus run --help Usage: morpheus run [OPTIONS] COMMAND [ARGS]... + Run subcommand, used for running a pipeline + Options: - --num_threads INTEGER RANGE Number of internal pipeline threads to use [default: 12; x>=1] + --num_threads INTEGER RANGE Number of internal pipeline threads to use [default: 64; x>=1] --pipeline_batch_size INTEGER RANGE Internal batch size for the pipeline. Can be much larger than the model batch size. Also used for Kafka consumers [default: 256; x>=1] --model_max_batch_size INTEGER RANGE Max batch size to use for the model [default: 8; x>=1] --edge_buffer_size INTEGER RANGE - The size of buffered channels to use between nodes in a pipeline. Larger values reduce backpressure at the cost of memory. Smaller values will push - messages through the pipeline quicker. Must be greater than 1 and a power of 2 (i.e. 2, 4, 8, 16, etc.) [default: 128; x>=2] - --use_cpp BOOLEAN Whether or not to use C++ node and message types or to prefer python. Only use as a last resort if bugs are encountered [default: True] + The size of buffered channels to use between nodes in a pipeline. Larger values reduce backpressure at the cost of memory. Smaller + values will push messages through the pipeline quicker. Must be greater than 1 and a power of 2 (i.e. 2, 4, 8, 16, etc.) [default: + 128; x>=2] + --use_cpp BOOLEAN [Deprecated] Whether or not to use C++ node and message types or to prefer python. Only use as a last resort if bugs are encountered. 
+ Cannot be used with --use_cpu_only [default: True] + --use_cpu_only Whether or not to run in CPU only mode, setting this to True will disable C++ mode. Cannot be used with --use_cpp + --manual_seed INTEGER RANGE Manually seed the random number generators used by Morpheus, useful for testing. [x>=1] --help Show this message and exit. Commands: @@ -57,6 +63,7 @@ run: pipeline-nlp Run the inference pipeline with a NLP model pipeline-other Run a custom inference pipeline without a specific model type + Currently, Morpheus pipeline can be operated in four different modes. * ``pipeline-ae`` diff --git a/docs/source/cloud_deployment_guide.md b/docs/source/cloud_deployment_guide.md index 1dac95c9ae..060e85a452 100644 --- a/docs/source/cloud_deployment_guide.md +++ b/docs/source/cloud_deployment_guide.md @@ -434,11 +434,9 @@ Inference and training based on a user ID (`user123`). The model is trained once ```bash helm install --set ngc.apiKey="$API_KEY" \ --set sdk.args="morpheus --log_level=DEBUG run \ - --num_threads=2 \ --edge_buffer_size=4 \ --pipeline_batch_size=1024 \ --model_max_batch_size=1024 \ - --use_cpp=False \ pipeline-ae \ --columns_file=data/columns_ae_cloudtrail.txt \ --userid_filter=user123 \ @@ -480,11 +478,9 @@ Pipeline example to read data from a file, run inference using a `phishing-bert- ```bash helm install --set ngc.apiKey="$API_KEY" \ --set sdk.args="morpheus --log_level=DEBUG run \ - --num_threads=2 \ --edge_buffer_size=4 \ --pipeline_batch_size=1024 \ --model_max_batch_size=32 \ - --use_cpp=True \ pipeline-nlp \ --model_seq_length=128 \ --labels_file=data/labels_phishing.txt \ @@ -510,11 +506,9 @@ Pipeline example to read messages from an input Kafka topic, run inference using ```bash helm install --set ngc.apiKey="$API_KEY" \ --set sdk.args="morpheus --log_level=DEBUG run \ - --num_threads=2 \ --edge_buffer_size=4 \ --pipeline_batch_size=1024 \ --model_max_batch_size=32 \ - --use_cpp=True \ pipeline-nlp \ --model_seq_length=128 \ --labels_file=data/labels_phishing.txt \ @@ -557,9 +551,7 @@ Pipeline example to read data from a file, run inference using a `sid-minibert-o ```bash helm install --set ngc.apiKey="$API_KEY" \ --set sdk.args="morpheus --log_level=DEBUG run \ - --num_threads=3 \ --edge_buffer_size=4 \ - --use_cpp=True \ --pipeline_batch_size=1024 \ --model_max_batch_size=32 \ pipeline-nlp \ @@ -586,9 +578,7 @@ Pipeline example to read messages from an input Kafka topic, run inference using ```bash helm install --set ngc.apiKey="$API_KEY" \ --set sdk.args="morpheus --log_level=DEBUG run \ - --num_threads=3 \ --edge_buffer_size=4 \ - --use_cpp=True \ --pipeline_batch_size=1024 \ --model_max_batch_size=32 \ pipeline-nlp \ @@ -631,11 +621,9 @@ Pipeline example to read data from a file, run inference using an `abp-nvsmi-xgb ```bash helm install --set ngc.apiKey="$API_KEY" \ --set sdk.args="morpheus --log_level=DEBUG run \ - --num_threads=3 \ --edge_buffer_size=4 \ --pipeline_batch_size=1024 \ --model_max_batch_size=64 \ - --use_cpp=True \ pipeline-fil --columns_file=data/columns_fil.txt \ from-file --filename=./examples/data/nvsmi.jsonlines \ monitor --description 'FromFile Rate' --smoothing=0.001 \ @@ -657,10 +645,8 @@ Pipeline example to read messages from an input Kafka topic, run inference using ```bash helm install --set ngc.apiKey="$API_KEY" \ --set sdk.args="morpheus --log_level=DEBUG run \ - --num_threads=3 \ --pipeline_batch_size=1024 \ --model_max_batch_size=64 \ - --use_cpp=True \ pipeline-fil --columns_file=data/columns_fil.txt \ from-kafka --input_topic 
--bootstrap_servers broker:9092 \ monitor --description 'FromKafka Rate' --smoothing=0.001 \ diff --git a/docs/source/developer_guide/contributing.md b/docs/source/developer_guide/contributing.md index b31edbfc64..aa52cccda7 100644 --- a/docs/source/developer_guide/contributing.md +++ b/docs/source/developer_guide/contributing.md @@ -153,14 +153,12 @@ This workflow utilizes a Docker container to set up most dependencies ensuring a If a Conda environment on the host machine is preferred over Docker, it is relatively easy to install the necessary dependencies (In reality, the Docker workflow creates a Conda environment inside the container). -Note: These instructions assume the user is using `mamba` instead of `conda` since its improved solver speed is very helpful when working with a large number of dependencies. If you are not familiar with `mamba` you can install it with `conda install -n base -c conda-forge mamba` (Make sure to only install into the base environment). `mamba` is a drop in replacement for `conda` and all Conda commands are compatible between the two. - #### Prerequisites - Volta architecture GPU or better - [CUDA 12.1](https://developer.nvidia.com/cuda-12-1-0-download-archive) -- `conda` and `mamba` - - If `conda` and `mamba` are not installed, we recommend using the MiniForge install guide which is located [here](https://github.com/conda-forge/miniforge). This will install both `conda` and `mamba` and set the channel default to use `conda-forge`. +- `conda` + - If `conda` is not installed, we recommend using the [MiniForge install guide](https://github.com/conda-forge/miniforge). This will install `conda` and set the channel default to use `conda-forge`. 1. Set up environment variables and clone the repo: ```bash export MORPHEUS_ROOT=$(pwd)/morpheus git clone https://github.com/nv-morpheus/Morpheus.git $MORPHEUS_ROOT cd $MORPHEUS_ROOT ``` - -2. Ensure all submodules are checked out: - -```bash -git submodule update --init --recursive -``` - +1. Ensure all submodules are checked out: + ```bash + git submodule update --init --recursive + ``` 1. Create the Morpheus Conda environment ```bash conda env create --solver=libmamba -n morpheus --file conda/environments/dev_cuda-125_arch-x86_64.yaml @@ -182,19 +177,18 @@ git submodule update --init --recursive ``` This creates a new environment named `morpheus`, and activates that environment. -1. Build Morpheus + + > **Note**: The `dev_cuda-125_arch-x86_64.yaml` Conda environment file specifies all of the dependencies required to build and run Morpheus. However, many of the examples and optional packages, such as `morpheus_llm`, require additional dependencies. Alternatively, the following command can be used to create the Conda environment: ```bash - ./scripts/compile.sh + conda env create --solver=libmamba -n morpheus --file conda/environments/all_cuda-125_arch-x86_64.yaml + conda activate morpheus ``` - This script will run both CMake Configure with default options and CMake build. -1. Install Morpheus +1. Build Morpheus ```bash - pip install -e ${MORPHEUS_ROOT}/python/morpheus - pip install -e ${MORPHEUS_ROOT}/python/morpheus_llm - pip install -e ${MORPHEUS_ROOT}/python/morpheus_dfp + ./scripts/compile.sh ``` - Once Morpheus has been built, it can be installed into the current virtual environment. -1. Test the build (Note: some tests will be skipped)\ + This script will build and install Morpheus into the Conda environment. +1.
Test the build (Note: some tests will be skipped) Some of the tests will rely on external data sets. ```bash MORPHEUS_ROOT=${PWD} @@ -213,15 +207,26 @@ git submodule update --init --recursive npm install -g camouflage-server@0.15 ``` - Run all tests: - ```bash - pytest --run_slow - ``` -1. Optional: Install cuML - - Many users may wish to install cuML. Due to the complex dependency structure and versioning requirements, we need to specify exact versions of each package. The command to accomplish this is: + - Run end-to-end (aka slow) tests: + ```bash + pytest --run_slow + ``` +1. Optional: Run Kafka and Milvus tests - Download Kafka: ```bash - mamba install -c rapidsai -c nvidia -c conda-forge cuml=23.06 + python ./ci/scripts/download_kafka.py ``` + - Run all tests (this will skip over tests that require optional dependencies which are not installed): + ```bash + pytest --run_slow --run_kafka --run_milvus + ``` + - Run all tests including those that require optional dependencies: + ```bash + pytest --fail_missing --run_slow --run_kafka --run_milvus + ``` + 1. Run Morpheus ```bash morpheus run pipeline-nlp ... @@ -372,6 +377,36 @@ Due to the large number of dependencies, it's common to run into build issues. T - Message indicating `git apply ...` failed - Many of the dependencies require small patches to make them work. These patches must be applied once and only once. If this error displays, try deleting the offending package from the `build/_deps/` directory or from `.cache/cpm/`. - If all else fails, delete the entire `build/` directory and `.cache/` directory. + - Older build artifacts when performing an in-place build. + - When built with `MORPHEUS_PYTHON_INPLACE_BUILD=ON`, compiled libraries are deployed in-place in the source tree, and older build artifacts may be left behind. Remove these with: + ```bash + find ./python -name "*.so" -delete + find ./examples -name "*.so" -delete + ``` + - Issues building documentation + - Intermediate documentation build artifacts can cause errors for Sphinx. To remove these, run: + ```bash + rm -rf build/docs/ docs/source/_modules docs/source/_lib + ``` + - CI Issues + - To run CI locally, the `ci/scripts/run_ci_local.sh` script can be used. For example, to run a local CI build: + ```bash + ci/scripts/run_ci_local.sh build + ``` + - Build artifacts resulting from a local CI run can be found in the `.tmp/local_ci_tmp/` directory. + - To troubleshoot a particular CI stage, it can be helpful to run: + ```bash + ci/scripts/run_ci_local.sh bash + ``` This will open a bash shell inside the CI container with all of the environment variables typically set during a CI run. From here you can run the commands that would typically be run by one of the CI scripts in `ci/scripts/github`. To run a CI stage requiring a GPU (ex: `test`), set the `USE_GPU` environment variable to `1`: + ```bash + USE_GPU=1 ci/scripts/run_ci_local.sh bash + ``` + +Refer to the [troubleshooting guide](../extra_info/troubleshooting.md) for more information on common issues and how to resolve them. ## Licensing Morpheus is licensed under the Apache v2.0 license. All new source files including CMake and other build scripts should contain the Apache v2.0 license header. Any edits to existing source code should update the date range of the copyright to the current year.
The format for the license header is: @@ -401,7 +436,7 @@ Third-party code included in the source tree (that is not pulled in as an extern Ex: ``` /** - * SPDX-FileCopyrightText: Copyright (c) 2018-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-FileCopyrightText: Copyright (c) <year>, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * SPDX-License-Identifier: Apache-2.0 * * Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/docs/source/developer_guide/guides/10_modular_pipeline_digital_fingerprinting.md b/docs/source/developer_guide/guides/10_modular_pipeline_digital_fingerprinting.md index e48b8c6df2..74ddb500cb 100644 --- a/docs/source/developer_guide/guides/10_modular_pipeline_digital_fingerprinting.md +++ b/docs/source/developer_guide/guides/10_modular_pipeline_digital_fingerprinting.md @@ -539,7 +539,6 @@ To run the DFP pipelines with the example datasets within the container, run the ```bash python dfp_integrated_training_batch_pipeline.py \ --log_level DEBUG \ - --use_cpp=true \ --source duo \ --start_time "2022-08-01" \ --duration "60d" \ @@ -551,7 +550,6 @@ To run the DFP pipelines with the example datasets within the container, run the ```bash python dfp_integrated_training_batch_pipeline.py \ --log_level DEBUG \ - --use_cpp=true \ --source duo \ --start_time "2022-08-30" \ --input_file "./control_messages/duo_payload_inference.json" @@ -561,7 +559,6 @@ To run the DFP pipelines with the example datasets within the container, run the ```bash python dfp_integrated_training_batch_pipeline.py \ --log_level DEBUG \ - --use_cpp=true \ --source duo \ --start_time "2022-08-01" \ --duration "60d" \ @@ -573,7 +570,6 @@ To run the DFP pipelines with the example datasets within the container, run the ```bash python dfp_integrated_training_batch_pipeline.py \ --log_level DEBUG \ - --use_cpp=true \ --source azure \ --start_time "2022-08-01" \ --duration "60d" \ @@ -585,7 +581,6 @@ To run the DFP pipelines with the example datasets within the container, run the ```bash python dfp_integrated_training_batch_pipeline.py \ --log_level DEBUG \ - --use_cpp=true \ --source azure \ --start_time "2022-08-30" \ --input_file "./control_messages/azure_payload_inference.json" @@ -595,7 +590,6 @@ To run the DFP pipelines with the example datasets within the container, run the ```bash python dfp_integrated_training_batch_pipeline.py \ --log_level DEBUG \ - --use_cpp=true \ --source azure \ --start_time "2022-08-01" \ --duration "60d" \ diff --git a/docs/source/developer_guide/guides/1_simple_python_stage.md b/docs/source/developer_guide/guides/1_simple_python_stage.md index 0ed1a08d59..27586de578 100644 --- a/docs/source/developer_guide/guides/1_simple_python_stage.md +++ b/docs/source/developer_guide/guides/1_simple_python_stage.md @@ -29,7 +29,7 @@ To start, we will implement a single stage that could be included in a pipeline. ### Stand-alone Function -The stand-alone function approach is the simplest way to define a stage. The function should accept a single argument, which will be the input message, and return a single value, which will be the output message. The function should be decorated with the `morpheus.pipeline.stage_decorator.stage` decorator. +The stand-alone function approach is the simplest way to define a stage. The function should accept a single argument, which will be the input message, and return a single value, which will be the output message.
The function should be decorated with the {py:func}`~morpheus.pipeline.stage_decorator.stage` decorator. ```python import typing from morpheus.pipeline.stage_decorator import stage @stage def pass_thru_stage(message: typing.Any) -> typing.Any: # Return the message for the next stage return message ``` +By default, Morpheus stages are assumed to require a GPU. However, since this stage doesn't perform any GPU-specific operations, we can indicate that the stage does not require a GPU by passing a tuple of supported execution modes to the decorator as follows: +```python +import typing + +from morpheus.config import ExecutionMode +from morpheus.pipeline.stage_decorator import stage + + +@stage(name="pass-thru", execution_modes=(ExecutionMode.GPU, ExecutionMode.CPU)) +def pass_thru_stage(message: typing.Any) -> typing.Any: + # Return the message for the next stage + return message +``` + We can then add our stage to a pipeline as follows: ```python config = Config() pipeline = LinearPipeline(config) pipeline.add_stage(pass_thru_stage(config)) ``` -It is possible to provide additional keyword arguments to the function. Consider the following example: +It is also possible to provide additional keyword arguments to the function. Consider the following example: ```python @stage def multiplier(message: MessageMeta, *, column: str, value: int | float = 2.0) -> MessageMeta: @@ -76,11 +90,13 @@ pipe.add_stage(multiplier(config, column='probs', value=5)) ### Stage Class -The class based approach to defining a stage offers a bit more flexibility, specifically the ability to validate constructor arguments, and perform any needed setup prior to being invoked in a pipeline. Defining this stage requires us to specify the stage type. Morpheus stages which contain a single input and a single output typically inherit from `SinglePortStage`. Stages that act as sources of data, in that they do not take an input from a prior stage but rather produce data from a source such as a file, Kafka service, or other external sources, will need to inherit from the `SingleOutputSource` base class. +The class based approach to defining a stage offers a bit more flexibility, specifically the ability to validate constructor arguments, and perform any needed setup prior to being invoked in a pipeline. Defining this stage requires us to specify the stage type. Morpheus stages which contain a single input and a single output typically inherit from {py:class}`~morpheus.pipeline.single_port_stage.SinglePortStage`. Stages that act as sources of data, in that they do not take an input from a prior stage but rather produce data from a source such as a file, Kafka service, or other external sources, will need to inherit from the {py:class}`~morpheus.pipeline.single_output_source.SingleOutputSource` base class. +Stages in Morpheus define what types of data they accept, and the type of data that they emit. In this example we are emitting messages of the same type that is received; this is actually quite common, and Morpheus provides a mixin class, {py:class}`~morpheus.pipeline.pass_thru_type_mixin.PassThruTypeMixin`, to simplify this. -Stages in Morpheus define what types of data they accept, and the type of data that they emit. In this example we are emitting messages of the same type that is received, this is actually quite common and Morpheus provides a mixin class, `PassThruTypeMixin`, to simplify this.
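As a quick illustration (a sketch, not part of the original guide), a pipeline containing the decorated stage above can opt into CPU-only execution by setting the execution mode on the `Config` object:

```python
from morpheus.config import Config
from morpheus.config import ExecutionMode
from morpheus.pipeline import LinearPipeline

# Sketch: run the decorated pass_thru_stage without requiring a GPU.
config = Config()
config.execution_mode = ExecutionMode.CPU

pipeline = LinearPipeline(config)
# ... set a source stage here ...
pipeline.add_stage(pass_thru_stage(config))
```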
+Similar to the function based stage, the class based stage will not require a GPU, and we will indicate that it is able to be used in both GPU and CPU execution modes by utilizing the {py:class}`~morpheus.pipeline.execution_mode_mixins.GpuAndCpuMixin`. -Optionally, stages can be registered as a command with the Morpheus CLI using the `register_stage` decorator. This allows for pipelines to be constructed from both pre-built stages and custom user stages via the command line. Any constructor arguments will be introspected using [`numpydoc`](https://numpydoc.readthedocs.io/en/latest/) and exposed as command line flags. Similarly, the class's docstrings will be exposed in the help string of the stage on the command line. +Optionally, stages can be registered as a command with the Morpheus CLI using the {py:func}`~morpheus.cli.register_stage.register_stage` decorator. This allows for pipelines to be constructed from both pre-built stages and custom user stages via the command line. Any constructor arguments will be introspected using [`numpydoc`](https://numpydoc.readthedocs.io/en/latest/) and exposed as command line flags. Similarly, the class's docstrings will be exposed in the help string of the stage on the command line. We start our class definition with a few basic imports: ```python import typing import mrc from mrc.core import operators as ops from morpheus.cli.register_stage import register_stage +from morpheus.pipeline.execution_mode_mixins import GpuAndCpuMixin from morpheus.pipeline.pass_thru_type_mixin import PassThruTypeMixin from morpheus.pipeline.single_port_stage import SinglePortStage @register_stage("pass-thru") -class PassThruStage(PassThruTypeMixin, SinglePortStage): +class PassThruStage(PassThruTypeMixin, GpuAndCpuMixin, SinglePortStage): ``` There are five methods that need to be defined in our new subclass to implement the stage interface: `name`, `accepted_types`, `compute_schema`, `supports_cpp_node`, and `_build_single`. In practice, it is often necessary to define at least one more method which will perform the actual work of the stage; by convention, this method is typically named `on_data`, which we will define in our examples. @@ -108,7 +125,7 @@ There are four methods that need to be defined in our new subclass to implement return "pass-thru" ``` -The `accepted_types` method returns a tuple of message classes that this stage is able to accept as input. Morpheus uses this to validate that the parent of this stage emits a message that this stage can accept. Since our stage is a pass through, we will declare that we can accept any incoming message type. Note that production stages will often declare only a single Morpheus message class such as `MessageMeta` or `ControlMessage` (refer to the message classes defined in `morpheus.messages` for a complete list). +The `accepted_types` method returns a tuple of message classes that this stage is able to accept as input. Morpheus uses this to validate that the parent of this stage emits a message that this stage can accept. Since our stage is a pass through, we will declare that we can accept any incoming message type. Note that production stages will often declare only a single Morpheus message class such as {py:class}`~morpheus.messages.MessageMeta` or {py:class}`~morpheus.messages.ControlMessage` (refer to the message classes defined in {py:mod}`~morpheus.messages` for a complete list).
```python def accepted_types(self) -> tuple: return (typing.Any,) @@ -171,12 +188,13 @@ import mrc from mrc.core import operators as ops from morpheus.cli.register_stage import register_stage +from morpheus.pipeline.execution_mode_mixins import GpuAndCpuMixin from morpheus.pipeline.pass_thru_type_mixin import PassThruTypeMixin from morpheus.pipeline.single_port_stage import SinglePortStage @register_stage("pass-thru") -class PassThruStage(PassThruTypeMixin, SinglePortStage): +class PassThruStage(PassThruTypeMixin, GpuAndCpuMixin, SinglePortStage): """ A Simple Pass Through Stage """ @@ -191,12 +209,11 @@ class PassThruStage(PassThruTypeMixin, SinglePortStage): def supports_cpp_node(self) -> bool: return False - def on_data(self, message: typing.Any): + def on_data(self, message: typing.Any) -> typing.Any: # Return the message for the next stage return message - def _build_single(self, builder: mrc.Builder, - input_node: mrc.SegmentObject) -> mrc.SegmentObject: + def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject: node = builder.make_node(self.unique_name, ops.map(self.on_data)) builder.make_edge(input_node, node) diff --git a/docs/source/developer_guide/guides/2_real_world_phishing.md b/docs/source/developer_guide/guides/2_real_world_phishing.md index 0d27f0de98..b1ae038f1a 100644 --- a/docs/source/developer_guide/guides/2_real_world_phishing.md +++ b/docs/source/developer_guide/guides/2_real_world_phishing.md @@ -29,7 +29,7 @@ For this task, we'll need to define a new stage, which we will call our `Recipie 1. Count the number of recipients in the email's metadata. 1. Emit a Morpheus `MessageMeta` object that will contain the record content along with the augmented metadata. -For this stage, the code will be similar to the previous example with a few notable changes. We will be working with the `MessageMeta` class. This is a Morpheus message containing a [cuDF](https://docs.rapids.ai/api/cudf/stable/) [DataFrame](https://docs.rapids.ai/api/cudf/stable/user_guide/api_docs/dataframe/). Since we will expect our new stage to operate on `MessageMeta` types, our new `accepted_types` method is defined as: +For this stage, the code will be similar to the previous example with a few notable changes. We will be working with the {py:class}`~morpheus.messages.MessageMeta` class. This is a Morpheus message containing a [cuDF](https://docs.rapids.ai/api/cudf/stable/) [DataFrame](https://docs.rapids.ai/api/cudf/stable/user_guide/api_docs/dataframe/). Since we will expect our new stage to operate on `MessageMeta` types, our new `accepted_types` method is defined as: ```python def accepted_types(self) -> tuple: @@ -99,13 +99,13 @@ def __init__(self, config: Config): Refer to the [Stage Constructors](#stage-constructors) section for more details. -Since the purpose of this stage is specifically tied to pre-processing text data for an NLP pipeline, when we register the stage, we will explicitly limit the stage to NLP pipelines: +Since the purpose of this stage is specifically tied to pre-processing text data for an NLP pipeline, when we register the stage, we will explicitly limit the stage to NLP pipelines. 
In addition, since the pipeline our stage is operating in is a GPU pipeline, we will not be utilizing the `GpuAndCpuMixin` from the previous example: ```python @register_stage("recipient-features", modes=[PipelineModes.NLP]) class RecipientFeaturesStage(PassThruTypeMixin, SinglePortStage): ``` -Our `_build_single` method remains unchanged from the previous example; even though we are modifying the incoming messages, our input and output types remain the same and we continue to make use of the `PassThruTypeMixin`. +Our `_build_single` method remains unchanged from the previous example; even though we are modifying the incoming messages, our input and output types remain the same and we continue to make use of the {py:class}`~morpheus.pipeline.pass_thru_type_mixin.PassThruTypeMixin`. ### The Completed Preprocessing Stage @@ -540,7 +540,7 @@ MORPHEUS_ROOT = os.environ['MORPHEUS_ROOT'] default="phishing-bert-onnx", help="The name of the model that is deployed on Tritonserver.", ) -@click.option("--server_url", default='localhost:8001', help="Tritonserver url.") +@click.option("--server_url", default='localhost:8000', help="Tritonserver url.") @click.option( "--output_file", default=os.path.join(tempfile.gettempdir(), "detections.jsonlines"), @@ -630,7 +630,7 @@ morpheus --log_level=debug --plugin examples/developer_guide/2_1_real_world_phis recipient-features \ deserialize \ preprocess --vocab_hash_file=data/bert-base-uncased-hash.txt --truncation=true --do_lower_case=true --add_special_tokens=false \ - inf-triton --model_name=phishing-bert-onnx --server_url=localhost:8001 --force_convert_inputs=true \ + inf-triton --model_name=phishing-bert-onnx --server_url=localhost:8000 --force_convert_inputs=true \ monitor --description="Inference Rate" --smoothing=0.001 --unit=inf \ add-scores --label=is_phishing \ serialize \ @@ -639,7 +639,7 @@ morpheus --log_level=debug --plugin examples/developer_guide/2_1_real_world_phis ## Stage Constructors -In our `RecipientFeaturesStage` example we added a constructor to our stage, however we didn't go into much detail on the implementation. Every stage constructor must receive an instance of a `morpheus.config.Config` object as its first argument and is then free to define additional stage-specific arguments after that. The Morpheus configuration object will contain configuration parameters needed by multiple stages in the pipeline, and the constructor in each Morpheus stage is free to inspect these. In contrast, parameters specific to a single stage are typically defined as constructor arguments. It is a best practice to perform any necessary validation checks in the constructor, and raising an exception in the case of mis-configuration. This allows us to fail early rather than after the pipeline has started. +In our `RecipientFeaturesStage` example we added a constructor to our stage; however, we didn't go into much detail on the implementation. Every stage constructor must receive an instance of a {py:class}`~morpheus.config.Config` object as its first argument and is then free to define additional stage-specific arguments after that. The Morpheus configuration object will contain configuration parameters needed by multiple stages in the pipeline, and the constructor in each Morpheus stage is free to inspect these. In contrast, parameters specific to a single stage are typically defined as constructor arguments.
It is a best practice to perform any necessary validation checks in the constructor and to raise an exception in the case of misconfiguration. This allows us to fail early rather than after the pipeline has started. In our `RecipientFeaturesStage` example, we hard-coded the Bert separator token. Let's instead refactor the code to receive that as a constructor argument. This new constructor argument is documented following the [`numpydoc`](https://numpydoc.readthedocs.io/en/latest/format.html#parameters) formatting style allowing it to be documented properly for both API and CLI users. Let's also take the opportunity to verify that the pipeline mode is set to `morpheus.config.PipelineModes.NLP`. @@ -742,12 +742,18 @@ Options: ### Class Based Approach -Creating a new source stage is similar to defining any other stage with a few differences. First, we will be subclassing `SingleOutputSource` including the `PreallocatorMixin`. Second, the required methods are the `name` property, `_build_source`, `compute_schema` and `supports_cpp_node` methods. +Creating a new source stage is similar to defining any other stage with a few differences. First, we will be subclassing {py:class}`~morpheus.pipeline.single_output_source.SingleOutputSource` and including the `PreallocatorMixin`. Second, the required methods are the `name` property, `_build_source`, `compute_schema` and `supports_cpp_node` methods. In this example, we will create a source that reads messages from a [RabbitMQ](https://www.rabbitmq.com/) queue using the [pika](https://pika.readthedocs.io/en/stable/#) client for Python. For simplicity, we will assume that authentication is not required for our RabbitMQ exchange and that the body of the RabbitMQ messages will be JSON formatted. Both authentication and support for other formats could be easily added later. The `PreallocatorMixin`, when added to a stage class (typically a source stage), indicates that the stage emits newly constructed DataFrames either directly or contained in a `MessageMeta` instance into the pipeline. Adding this mixin allows any columns needed by other stages to be inserted into the DataFrame. +Similar to the pass through stage, this new source stage should be able to operate in both GPU and CPU execution modes; as such, we will be using the `GpuAndCpuMixin` mixin. One thing to note is that the DataFrame payload of a `MessageMeta` object is always a `cudf.DataFrame` when running in GPU mode and a `pandas.DataFrame` when running in CPU mode. When supporting both GPU and CPU execution modes, care must be taken to avoid directly importing `cudf` (or any other package requiring a GPU) when running in CPU mode on a system without a GPU, which would result in an error. Stages are able to examine the execution mode with the `morpheus.config.Config.execution_mode` attribute. The {py:func}`~morpheus.utils.type_utils.get_df_pkg` helper method is used to import the appropriate DataFrame package based on the execution mode in the constructor: +```python + # This will return either cudf.DataFrame or pandas.DataFrame depending on the execution mode + self._df_pkg = get_df_pkg(config.execution_mode) +``` + The `compute_schema` method allows us to define our output type of `MessageMeta`; we do so by calling the `set_type` method of the `output_schema` attribute of the `StageSchema` object passed into the method. Of note here is that it is perfectly valid for a stage to determine its output type based upon configuration arguments passed into the constructor.
However, the stage must document a single output type per output port. If a stage emits multiple output types, the types must share a common base class which would serve as the stage's output type. ```python def compute_schema(self, schema: StageSchema): schema.output_schema.set_type(MessageMeta) ``` @@ -771,7 +777,7 @@ def source_generator(self, subscription: mrc.Subscription) -> collections.abc.It if method_frame is not None: try: buffer = StringIO(body.decode("utf-8")) - df = cudf.io.read_json(buffer, orient='records', lines=True) + df = self._df_pkg.read_json(buffer, orient='records', lines=True) yield MessageMeta(df=df) except Exception as ex: logger.exception("Error occurred converting RabbitMQ message to Dataframe: %s", ex) @@ -799,20 +805,20 @@ import mrc import pandas as pd import pika -import cudf - from morpheus.cli.register_stage import register_stage from morpheus.config import Config from morpheus.messages.message_meta import MessageMeta +from morpheus.pipeline.execution_mode_mixins import GpuAndCpuMixin from morpheus.pipeline.preallocator_mixin import PreallocatorMixin from morpheus.pipeline.single_output_source import SingleOutputSource from morpheus.pipeline.stage_schema import StageSchema +from morpheus.utils.type_utils import get_df_pkg logger = logging.getLogger(__name__) @register_stage("from-rabbitmq") -class RabbitMQSourceStage(PreallocatorMixin, SingleOutputSource): +class RabbitMQSourceStage(PreallocatorMixin, GpuAndCpuMixin, SingleOutputSource): """ Source stage used to load messages from a RabbitMQ queue. @@ -854,6 +860,9 @@ class RabbitMQSourceStage(PreallocatorMixin, SingleOutputSource): self._poll_interval = pd.Timedelta(poll_interval) + # This will return either cudf.DataFrame or pandas.DataFrame depending on the execution mode + self._df_pkg = get_df_pkg(config.execution_mode) + @property def name(self) -> str: return "from-rabbitmq" @@ -874,7 +883,7 @@ class RabbitMQSourceStage(PreallocatorMixin, SingleOutputSource): if method_frame is not None: try: buffer = StringIO(body.decode("utf-8")) - df = cudf.io.read_json(buffer, orient='records', lines=True) + df = self._df_pkg.read_json(buffer, orient='records', lines=True) yield MessageMeta(df=df) except Exception as ex: logger.exception("Error occurred converting RabbitMQ message to Dataframe: %s", ex) @@ -889,7 +898,7 @@ class RabbitMQSourceStage(PreallocatorMixin, SingleOutputSource): ``` ### Function Based Approach -Similar to the `stage` decorator used in previous examples Morpheus provides a `source` decorator which wraps a generator function to be used as a source stage. In the class based approach we explicitly added the `PreallocatorMixin`, when using the `source` decorator the return type annotation will be inspected and a stage will be created with the `PreallocatorMixin` if the return type is a `DataFrame` type or a message which contains a `DataFrame` (`MessageMeta` and `ControlMessage`). +Similar to the `stage` decorator used in previous examples, Morpheus provides a {py:func}`~morpheus.pipeline.stage_decorator.source` decorator which wraps a generator function to be used as a source stage. In the class based approach we explicitly added the `PreallocatorMixin`; when using the `source` decorator, the return type annotation will be inspected and a stage will be created with the `PreallocatorMixin` if the return type is a `DataFrame` type or a message which contains a `DataFrame` (`MessageMeta` and `ControlMessage`).
We will also indicate which execution modes are supported by the stage by setting the `execution_modes` argument to the decorator. The code for the function will first perform the same setup as was used in the class constructor, then enter a nearly identical loop to the one in the `source_generator` method. @@ -903,15 +912,15 @@ import mrc import pandas as pd import pika -import cudf - +from morpheus.config import ExecutionMode from morpheus.messages.message_meta import MessageMeta from morpheus.pipeline.stage_decorator import source +from morpheus.utils.type_utils import get_df_pkg logger = logging.getLogger(__name__) -@source(name="from-rabbitmq") +@source(name="from-rabbitmq", execution_modes=(ExecutionMode.GPU, ExecutionMode.CPU)) def rabbitmq_source(subscription: mrc.Subscription, host: str, exchange: str, @@ -950,13 +959,15 @@ def rabbitmq_source(subscription: mrc.Subscription, poll_interval = pd.Timedelta(poll_interval) + df_pkg = get_df_pkg() + try: while subscription.is_subscribed(): (method_frame, _, body) = channel.basic_get(queue_name) if method_frame is not None: try: buffer = StringIO(body.decode("utf-8")) - df = cudf.io.read_json(buffer, orient='records', lines=True) + df = df_pkg.read_json(buffer, orient='records', lines=True) yield MessageMeta(df=df) except Exception as ex: logger.exception("Error occurred converting RabbitMQ message to Dataframe: %s", ex) @@ -995,16 +1006,21 @@ def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> return node ``` -Similar to our previous examples, most of the actual business logic of the stage is contained in the `on_data` method. In this case, we grab a reference to the [cuDF](https://docs.rapids.ai/api/cudf/stable/) [DataFrame](https://docs.rapids.ai/api/cudf/stable/user_guide/api_docs/dataframe/) attached to the incoming message. We then serialize to an [`io.StringIO`](https://docs.python.org/3.10/library/io.html?highlight=stringio#io.StringIO) buffer, which is then sent to RabbitMQ. +Similar to our previous examples, most of the actual business logic of the stage is contained in the `on_data` method. In this case, we grab a reference to the DataFrame attached to the incoming message. We then serialize to an [`io.StringIO`](https://docs.python.org/3.10/library/io.html?highlight=stringio#io.StringIO) buffer, which is then sent to RabbitMQ. + +> **Note**: This stage supports both GPU and CPU execution modes. When running in GPU mode, the payload of a `MessageMeta` object is always a [cuDF](https://docs.rapids.ai/api/cudf/stable/) [DataFrame](https://docs.rapids.ai/api/cudf/stable/user_guide/api_docs/dataframe/). When running in CPU mode, the payload is always a [pandas](https://pandas.pydata.org/) [DataFrame](https://pandas.pydata.org/docs/reference/frame.html). In many cases the two will be API compatible without requiring any changes to the code. In some cases, however, the API may differ slightly and there is a need to know the payload type; in these situations, care must be taken not to directly import `cudf` or any other package requiring a GPU when running in CPU mode on a system without a GPU. Morpheus provides some helper methods to assist with this in the {py:mod}`~morpheus.utils.type_utils` module, such as {py:func}`~morpheus.utils.type_utils.is_cudf_type` and {py:func}`~morpheus.utils.type_utils.get_df_pkg_from_obj`.
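The following sketch (a hypothetical helper, not part of the example stage) illustrates how these utilities can branch on the payload type without importing `cudf` at module scope:

```python
from morpheus.messages import MessageMeta
from morpheus.utils.type_utils import get_df_pkg_from_obj
from morpheus.utils.type_utils import is_cudf_type

def describe_payload(message: MessageMeta) -> str:
    """Hypothetical helper: inspect a payload without importing cudf directly."""
    df = message.df
    df_pkg = get_df_pkg_from_obj(df)  # cudf in GPU mode, pandas in CPU mode
    mode = "GPU" if is_cudf_type(df) else "CPU"
    return f"{mode} payload backed by {df_pkg.__name__} with {len(df)} rows"
```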
```python -def on_data(self, message: MessageMeta): - df = message.df - buffer = StringIO() - df.to_json(buffer, orient='records', lines=True) - body = buffer.getvalue().strip() - self._channel.basic_publish(exchange=self._exchange, routing_key=self._routing_key, body=body) - return message + def on_data(self, message: MessageMeta) -> MessageMeta: + df = message.df + + buffer = StringIO() + df.to_json(buffer, orient='records', lines=True) + body = buffer.getvalue().strip() + + self._channel.basic_publish(exchange=self._exchange, routing_key=self._routing_key, body=body) + + return message ``` The two new methods introduced in this example are the `on_error` and `on_complete` methods. For both methods, we want to make sure the [connection](https://pika.readthedocs.io/en/stable/modules/connection.html) object is properly closed. diff --git a/docs/source/developer_guide/guides/3_simple_cpp_stage.md b/docs/source/developer_guide/guides/3_simple_cpp_stage.md index f21317475d..676a2df8a6 100644 --- a/docs/source/developer_guide/guides/3_simple_cpp_stage.md +++ b/docs/source/developer_guide/guides/3_simple_cpp_stage.md @@ -34,18 +34,13 @@ pip install ./ ## Overview Morpheus offers the choice of writing pipeline stages in either Python or C++. For many use cases, a Python stage is perfectly fine. However, in the event that a Python stage becomes a bottleneck for the pipeline, then writing a C++ implementation for the stage becomes advantageous. The C++ implementations of Morpheus stages and messages utilize the [pybind11](https://pybind11.readthedocs.io/en/stable/index.html) library to provide Python bindings. -We have been defining our stages in Python up to this point, the option of defining a C++ implementation is only available to stages implemented as classes. Many of the stages included with Morpheus have both a Python and a C++ implementation, and Morpheus will use the C++ implementations by default. You can explicitly disable the use of C++ stage implementations by calling `morpheus.config.CppConfig.set_should_use_cpp(False)`: +We have been defining our stages in Python up to this point; the option of defining a C++ implementation is only available to stages implemented as classes. Many of the stages included with Morpheus have both a Python and a C++ implementation, and Morpheus will use the C++ implementations by default when running in the GPU execution mode. When running in the CPU execution mode, Morpheus will always use the Python implementation. -```python -from morpheus.config import CppConfig -CppConfig.set_should_use_cpp(False) -``` +If a stage does not have a C++ implementation, Morpheus will fall back to the Python implementation without any additional configuration. Morpheus stages which only contain a C++ implementation still require a Python class to register the stage and provide the stage's configuration. -If a stage does not have a C++ implementation, Morpheus will fall back to the Python implementation without any additional configuration and operate in a hybrid execution mode. +In addition to C++ accelerated stage implementations, Morpheus also provides a C++ implementation for message primitives. When using the GPU execution mode (the default), constructing one of the Python message classes defined under {py:mod}`~morpheus.messages` will return a Python object with bindings to the underlying C++ implementation. -In addition to C++ accelerated stage implementations, Morpheus also provides a C++ implementation for message primitives.
When C++ execution is enabled, constructing one of the Python message classes defined under `morpheus.messages` will return a Python object with bindings to the underlying C++ implementation. -Since we are defining our stages in Python, it becomes the responsibility of the Python stage to build a C++ accelerated node. This happens in the `_build_source` and `_build_single` methods. Ultimately it is the decision of a Python stage to build a Python node or a C++ node. It is perfectly acceptable to build a Python node when `morpheus.config.CppConfig.get_should_use_cpp()` is configured to `True`. It is not acceptable, however, to build a C++ node when `morpheus.config.CppConfig.get_should_use_cpp() == False`. The reason is the C++ implementations of Morpheus' messages can be consumed by Python and C++ stage implementations alike. However when `morpheus.config.CppConfig.get_should_use_cpp() == False`, the Python implementations of each message type will be used which cannot be consumed by the C++ implementations of stages. +Since we are defining our stages in Python, it becomes the responsibility of the Python stage to build a C++ accelerated node. This happens in the `_build_source` and `_build_single` methods. The Python stage should call `self._build_cpp_node()` to determine if a C++ node should be built, and ultimately it is the decision of a Python stage to build a Python node or a C++ node. It is perfectly acceptable to build a Python node when `self._build_cpp_node()` returns `True`. It is not acceptable, however, to build a C++ node when `self._build_cpp_node()` returns `False`. The reason is that the C++ implementations of Morpheus messages can be consumed by Python and C++ stage implementations alike. However, the Python implementations of Morpheus messages cannot be consumed by the C++ implementations of stages. Python stages which have a C++ implementation must advertise this functionality by returning a value of `True` from the `supports_cpp_node` method: @@ -84,7 +79,7 @@ Both the `PythonSource` and `PythonNode` classes are defined in the `pymrc/node. As in our Python guide, we will start with a simple pass through stage which can be used as a starting point for future development of other stages. Note that by convention, C++ classes in Morpheus have the same name as their corresponding Python classes and are located under a directory named `_lib`. We will be following that convention. To start, we will create a `_lib` directory and a new empty `__init__.py` file. -While our Python implementation accepts messages of any type (in the form of Python objects), on the C++ side we don't have that flexibility since our node is subject to C++ static typing rules. In practice, this isn't a limitation as we usually know which specific message types we need to work with. For this example we will be working with the `ControlMessage` as our input and output type, it is also a common base type for many other Morpheus message classes. This means that at build time our Python stage implementation is able to build a C++ node when the incoming type is `ControlMessage`, while falling back to the existing Python implementation otherwise. +While our Python implementation accepts messages of any type (in the form of Python objects), on the C++ side we don't have that flexibility since our node is subject to C++ static typing rules. In practice, this isn't a limitation as we usually know which specific message types we need to work with.
For this example we will be working with the `ControlMessage` as our input and output type. This means that at build time our Python stage implementation is able to build a C++ node when the incoming type is `ControlMessage`, while falling back to the existing Python implementation otherwise. To start with, we have our Morpheus and MRC-specific includes: @@ -371,7 +366,7 @@ def compute_schema(self, schema: StageSchema): ``` > **Note**: We are still using the `PassThruTypeMixin` to handle the requirements of setting the output type. -As mentioned in the previous section, our `_build_single` method needs to be updated to build a C++ node when the input type is `ControlMessage` and when `morpheus.config.CppConfig.get_should_use_cpp()` is `True` using the `self._build_cpp_node()` method. The `_build_cpp_node()` method compares both `morpheus.config.CppConfig.get_should_use_cpp()` and `supports_cpp_node()` and returns `True` only when both methods return `True`. +As mentioned in the previous section, our `_build_single` method needs to be updated to build a C++ node when the input type is `ControlMessage` and when `self._build_cpp_node()` returns `True`. ```python def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject: @@ -398,13 +393,14 @@ from mrc.core import operators as ops from morpheus.cli.register_stage import register_stage from morpheus.config import Config from morpheus.messages import ControlMessage +from morpheus.pipeline.execution_mode_mixins import GpuAndCpuMixin from morpheus.pipeline.pass_thru_type_mixin import PassThruTypeMixin from morpheus.pipeline.single_port_stage import SinglePortStage from morpheus.pipeline.stage_schema import StageSchema @register_stage("pass-thru") -class PassThruStage(PassThruTypeMixin, SinglePortStage): +class PassThruStage(PassThruTypeMixin, GpuAndCpuMixin, SinglePortStage): def __init__(self, config: Config): super().__init__(config) @@ -438,11 +434,10 @@ class PassThruStage(PassThruTypeMixin, SinglePortStage): builder.make_edge(input_node, node) return node - ``` ## Testing the Stage -To test the updated stage we will build a simple pipeline using the Morpheus command line tool. In order to illustrate the stage building a C++ node only when the input type is a `ControlMessage` we will insert the `pass-thru` stage in twice in the pipeline. In the first instance the input type will be `MessageMeta` and the stage will fallback to using a Python node, and in the second instance the input type will be a `ControlMessage` and the stage will build a C++ node. +To test the updated stage we will build a simple pipeline using the Morpheus command line tool. In order to illustrate the stage building a C++ node only when the input type is a `ControlMessage`, we will insert the `pass-thru` stage twice in the pipeline. In the first instance the input type will be `MessageMeta` and the stage will fall back to using a Python node, and in the second instance the input type will be a `ControlMessage` and the stage will build a C++ node.
```bash PYTHONPATH="examples/developer_guide/3_simple_cpp_stage/src" \ diff --git a/docs/source/developer_guide/guides/4_source_cpp_stage.md b/docs/source/developer_guide/guides/4_source_cpp_stage.md index 49beda09c2..02049e595b 100644 --- a/docs/source/developer_guide/guides/4_source_cpp_stage.md +++ b/docs/source/developer_guide/guides/4_source_cpp_stage.md @@ -36,6 +36,8 @@ For this example, we are going to add a C++ implementation for the `RabbitMQSour For communicating with [RabbitMQ](https://www.rabbitmq.com/) we will be using the [SimpleAmqpClient](https://github.com/alanxz/SimpleAmqpClient) library, and [libcudf](https://docs.rapids.ai/api/libcudf/stable/index.html) for constructing the `DataFrame`. +> **Note**: Since the C++ implementation will only be used when the execution mode is set to GPU, it is safe to assume the C++ implementation will always interact with cuDF DataFrames, and the Python implementation will always interact with pandas DataFrames. + ## Header Definition Our includes: @@ -477,7 +479,8 @@ PYBIND11_MODULE(rabbitmq_cpp_stage, m) ## Python Changes -Previously, our stage connected to the RabbitMQ server in the constructor. This is no longer advantageous to us when C++ execution is enabled. Instead, we will record our constructor arguments and move the connection code to a new `connect` method. Our new constructor and `connect` methods are updated to: +Previously, our stage connected to the RabbitMQ server in the constructor. This is no longer advantageous to us when C++ execution is enabled. Instead, we will record our constructor arguments and move the connection code to a new `connect` method. Since this stage's C++ implementation will always be used when running in GPU mode, we can assume the Python implementation will always interact with pandas DataFrames. +Our new constructor and `connect` methods are updated to: ```python def __init__(self, @@ -513,7 +516,7 @@ def connect(self): self._channel.queue_bind(exchange=self._exchange, queue=self._queue_name) ``` -Lastly, our `_build_source` method needs to be updated to build a C++ node when `morpheus.config.CppConfig.get_should_use_cpp()` is configured to `True` by using the `self._build_cpp_node()` method. +Lastly, our `_build_source` method needs to be updated to build a C++ node when `self._build_cpp_node()` returns `True`. ```python def _build_source(self, builder: mrc.Builder) -> mrc.SegmentObject: diff --git a/docs/source/developer_guide/guides/5_digital_fingerprinting.md b/docs/source/developer_guide/guides/5_digital_fingerprinting.md index e64b8a91d4..4ad65fa6d2 100644 --- a/docs/source/developer_guide/guides/5_digital_fingerprinting.md +++ b/docs/source/developer_guide/guides/5_digital_fingerprinting.md @@ -186,11 +186,14 @@ docker compose build > This is most likely due to using an older version of the `docker-compose` command, instead re-run the build with `docker compose`. Refer to [Migrate to Compose V2](https://docs.docker.com/compose/migrate/) for more information. #### Downloading the example datasets -First, we will need to install `s3fs` and then run the `examples/digital_fingerprinting/fetch_example_data.py` script. This will download the example data into the `examples/data/dfp` dir. +First, we will need to install additional requirements into the Conda environment. Then run the `examples/digital_fingerprinting/fetch_example_data.py` script. This will download the example data into the `examples/data/dfp` dir.
From the Morpheus repo, run:
 ```bash
-pip install s3fs
+conda env update --solver=libmamba \
+  -n ${CONDA_DEFAULT_ENV} \
+  --file ./conda/environments/examples_cuda-125_arch-x86_64.yaml
+
 python examples/digital_fingerprinting/fetch_example_data.py all
 ```
diff --git a/docs/source/developer_guide/guides/6_digital_fingerprinting_reference.md b/docs/source/developer_guide/guides/6_digital_fingerprinting_reference.md
index b9a2e3a786..d60f64f19e 100644
--- a/docs/source/developer_guide/guides/6_digital_fingerprinting_reference.md
+++ b/docs/source/developer_guide/guides/6_digital_fingerprinting_reference.md
@@ -33,13 +33,10 @@ import os
 
 from morpheus.config import Config
 from morpheus.config import ConfigAutoEncoder
-from morpheus.config import CppConfig
 from morpheus.cli.utils import get_package_relative_file
 from morpheus.utils.file_utils import load_labels_file
 ```
 
 ```python
-CppConfig.set_should_use_cpp(False)
-
 config = Config()
 config.num_threads = len(os.sched_getaffinity(0))
 config.ae = ConfigAutoEncoder()
diff --git a/docs/source/developer_guide/guides/9_control_messages.md b/docs/source/developer_guide/guides/9_control_messages.md
index 2be63e2081..f852f2cc86 100644
--- a/docs/source/developer_guide/guides/9_control_messages.md
+++ b/docs/source/developer_guide/guides/9_control_messages.md
@@ -32,13 +32,13 @@ Control Messages are straightforward objects that contain `tasks`, `metadata`, a
 Control Messages can handle tasks such as `training`, `inference`, and a catchall category `other`. Tasks can be added, checked for existence, or removed from the Control Message using methods like `add_task`, `has_task`, and `remove_task`.
 
 ```python
-import morpheus._lib.messages as messages
+from morpheus.messages import ControlMessage
 
 task_data = {
     "....": "...."
 }
 
-msg = messages.ControlMessage()
+msg = ControlMessage()
 msg.add_task("training", task_data)
 if msg.has_task("training"):
     task = msg.remove_task("training")
@@ -49,9 +49,9 @@ if msg.has_task("training"):
 Metadata is a set of key-value pairs that offer supplementary information about the Control Message and must be JSON serializable. You can set, check, and retrieve metadata values using the `set_metadata`, `has_metadata`, and `get_metadata` methods, respectively.
 
 ```python
-import morpheus._lib.messages as messages
+from morpheus.messages import ControlMessage
 
-msg = messages.ControlMessage()
+msg = ControlMessage()
 msg.set_metadata("description", "This is a sample control message.")
 if msg.has_metadata("description"):
     description = msg.get_metadata("description")
@@ -63,12 +63,13 @@ The payload of a Control Message is a Morpheus `MessageMeta` object that can car
 ```python
 import cudf
 
-import morpheus._lib.messages as messages
+from morpheus.messages import ControlMessage
+from morpheus.messages import MessageMeta
 
 data = cudf.DataFrame() # some data
 
-msg_meta = messages.MessageMeta(data)
-msg = messages.ControlMessage()
+msg_meta = MessageMeta(data)
+msg = ControlMessage()
 msg.payload(msg_meta)
 
@@ -82,25 +83,18 @@ msg_meta == retrieved_payload # True
 
 **The `MultiMessage` type was deprecated in 24.06 and has been completely removed in version 24.10.** When upgrading to 24.10, all uses of `MultiMessage` need to be converted to `ControlMessage`. Each `MultiMessage` functionality has a corresponding equivalent in `ControlMessage`, as illustrated below.
-```python
-import cudf
-from morpheus.messages import MultiMessage, ControlMessage
-
-data = cudf.DataFrame()
-msg_meta = MessageMeta(data)
-```
 
 | **Functionality** | **MultiMessage** | **ControlMessage** |
 | -------------------------------------------------------------- | ------------------------------------------ | ------------------------------------------------------------------- |
 | Initialization | `multi_msg = MultiMessage(msg_meta)` | `control_msg = ControlMessage()` <br> `control_msg.payload(msg_meta)` |
-| Get `cudf.DataFrame` | `multi_msg.get_meta()` | `control_msg.payload().get_data()` |
-| Get columns from `cudf.DataFrame` | `multi_msg.get_meta(col_name)` | `control_msg.payload().get_data(col_name)` |
-| Set columns values to `cudf.DataFrame` | `multi_msg.set_meta(col_name, value)` | `control_msg.payload().set_data(col_name, value)` |
-| Get sliced `cudf.DataFrame` for given start and stop positions | `multi_msg.get_slice(start, stop)` | `control_msg.payload().get_slice(start, stop)` |
-| Copy the `cudf.DataFrame` for given ranges of rows | `multi_msg.copy_ranges(ranges)` | `control_msg.payload().copy_ranges(ranges)` |
+| Get `DataFrame` | `multi_msg.get_meta()` | `control_msg.payload().get_data()` |
+| Get columns from `DataFrame` | `multi_msg.get_meta(col_name)` | `control_msg.payload().get_data(col_name)` |
+| Set column values in the `DataFrame` | `multi_msg.set_meta(col_name, value)` | `control_msg.payload().set_data(col_name, value)` |
+| Get sliced `DataFrame` for given start and stop positions | `multi_msg.get_slice(start, stop)` | `control_msg.payload().get_slice(start, stop)` |
+| Copy the `DataFrame` for given ranges of rows | `multi_msg.copy_ranges(ranges)` | `control_msg.payload().copy_ranges(ranges)` |
 | | **MultiTensorMessage** | **ControlMessage** |
-| Get the inference tensor `cupy.ndarray` | `multi_tensor_msg.tensor()` | `control_msg.tensors()` |
+| Get the inference tensor `ndarray` | `multi_tensor_msg.tensor()` | `control_msg.tensors()` |
 | Get a specific inference tensor | `multi_tensor_msg.get_tensor(tensor_name)` | `control_msg.tensors().get_tensor(tensor_name)` |
 
-Note that the `get_slice()` and `copy_ranges()` functions in `ControlMessage` return the `MessageMeta` after slicing, whereas these functions in `MultiMessage` return a new `MultiMessage` instance.
+Note that in the `ControlMessage` column, the `get_slice()` and `copy_ranges()` methods are called on the `MessageMeta` payload and thus return a `MessageMeta` after slicing, whereas these functions in `MultiMessage` return a new `MultiMessage` instance.
 
diff --git a/examples/abp_nvsmi_detection/README.md b/examples/abp_nvsmi_detection/README.md
index eab83358e2..b29ad6bb84 100644
--- a/examples/abp_nvsmi_detection/README.md
+++ b/examples/abp_nvsmi_detection/README.md
@@ -61,7 +61,10 @@ In this example we will be using the `examples/data/nvsmi.jsonlines` dataset tha
 This example can be easily applied to datasets generated from your own NVIDIA GPU devices. If NetQ is not deployed in your environment, the `nvsmi_data_extract.py` script is provided which uses [pyNVML](https://pypi.org/project/nvidia-ml-py/) and [pandas](https://pandas.pydata.org/) to generate data similar to NetQ. `pyNVML` contains the Python bindings for NVIDIA Management Library (NVML), the same library used by `nvidia-smi`.
 
-`pyNVML` and `pandas` come already installed on the Morpheus release and development Docker images. Otherwise, they will need to be installed before running the script.
+pyNVML is not installed by default; use the following command to install it:
+```bash
+conda env update --solver=libmamba -n morpheus --file conda/environments/examples_cuda-125_arch-x86_64.yaml
+```
 
 Run the following to start generating your dataset:
 ```
diff --git a/examples/cpu_only/README.md b/examples/cpu_only/README.md
new file mode 100644
index 0000000000..feac382a3f
--- /dev/null
+++ b/examples/cpu_only/README.md
@@ -0,0 +1,72 @@
+
+
+# CPU Only Example Using Morpheus
+
+## Supported Environments
+| Environment | Supported | Notes |
+|-------------|-----------|-------|
+| Conda | ✔ | |
+| Morpheus Docker Container | ✔ | |
+| Morpheus Release Container | ✔ | |
+| Dev Container | ✔ | |
+
+## CPU Only Pipeline
+This example demonstrates a simple Morpheus pipeline which is able to operate on a host without access to a GPU.
+
+> **Note**: A more complex example of a pipeline that can execute without a GPU is also available at `examples/llm/completion/README.md`.
+
+From the root of the Morpheus repo, run:
+```bash
+python examples/cpu_only/run.py --help
+```
+
+Output:
+```
+Usage: run.py [OPTIONS]
+
+Options:
+  --use_cpu_only                  Whether or not to run in CPU only mode,
+                                  setting this to True will disable C++ mode.
+  --log_level [CRITICAL|FATAL|ERROR|WARN|WARNING|INFO|DEBUG]
+                                  Specify the logging level to use.  [default:
+                                  DEBUG]
+  --in_file PATH                  Input file  [required]
+  --out_file FILE                 Output file  [required]
+  --help                          Show this message and exit.
+```
+
+To launch the configured Morpheus pipeline with the sample data that is provided in `examples/data`, run the following:
+
+```bash
+python examples/cpu_only/run.py --use_cpu_only --in_file=examples/data/email.jsonlines --out_file=.tmp/out.jsonlines
+```
+
+### CLI Example
+
+From the root of the Morpheus repo, run:
+```bash
+morpheus --log_level INFO \
+  run --use_cpu_only \
+  pipeline-other \
+  from-file --filename=examples/data/email.jsonlines \
+  monitor --description "source" \
+  deserialize \
+  monitor --description "deserialize" \
+  serialize \
+  to-file --filename=.tmp/out.jsonlines --overwrite
+```
diff --git a/examples/developer_guide/1_simple_python_stage/pass_thru_deco.py b/examples/developer_guide/1_simple_python_stage/pass_thru_deco.py
index 9755f63765..cd71e83b63 100644
--- a/examples/developer_guide/1_simple_python_stage/pass_thru_deco.py
+++ b/examples/developer_guide/1_simple_python_stage/pass_thru_deco.py
@@ -19,7 +19,7 @@
 from morpheus.pipeline.stage_decorator import stage
 
 
-@stage(execution_modes=(ExecutionMode.GPU, ExecutionMode.CPU))
+@stage(name="pass-thru", execution_modes=(ExecutionMode.GPU, ExecutionMode.CPU))
 def pass_thru_stage(message: typing.Any) -> typing.Any:
     # Return the message for the next stage
     return message
diff --git a/examples/developer_guide/2_2_rabbitmq/README.md b/examples/developer_guide/2_2_rabbitmq/README.md
index 5b657b580f..db9465a31e 100644
--- a/examples/developer_guide/2_2_rabbitmq/README.md
+++ b/examples/developer_guide/2_2_rabbitmq/README.md
@@ -62,6 +62,7 @@ This will read JSON data from the `examples/data/email.jsonlines` file and publi
 The `write_simple.py` script will exit as soon as the message is written to the queue.
 
 The `read_simple.py` script will continue reading from the queue until explicitly shut down with a control-C.
+> **Note**: Both the `read_simple.py` and `write_simple.py` scripts will launch independent Morpheus pipelines, both of which can optionally execute in CPU-only mode by setting the `--use_cpu_only` flag.
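For readers wondering what the `--use_cpu_only` flag does inside these scripts, the following is a minimal sketch of the typical wiring. It assumes the `Config.execution_mode` / `ExecutionMode` API referenced elsewhere in this changeset and is not the literal body of either script:

```python
import click

from morpheus.config import Config
from morpheus.config import ExecutionMode


@click.command()
@click.option('--use_cpu_only', default=False, type=bool, is_flag=True, help="Run in CPU only mode")
def run(use_cpu_only: bool):
    # Map the CLI flag onto the pipeline's execution mode. Stages that declare
    # CPU support (via GpuAndCpuMixin or the @stage decorator's execution_modes
    # argument) will then operate on pandas/NumPy objects instead of cuDF/CuPy.
    config = Config()
    config.execution_mode = ExecutionMode.CPU if use_cpu_only else ExecutionMode.GPU
```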
## Alternate Morpheus CLI usage
In the above examples we defined the pipeline using the Python API in the `read_simple.py` and `write_simple.py` scripts. Alternately, we could have defined the same pipelines using the Morpheus CLI tool.
diff --git a/examples/developer_guide/4_rabbitmq_cpp_stage/README.md b/examples/developer_guide/4_rabbitmq_cpp_stage/README.md
index 988381e1c6..33db31f443 100644
--- a/examples/developer_guide/4_rabbitmq_cpp_stage/README.md
+++ b/examples/developer_guide/4_rabbitmq_cpp_stage/README.md
@@ -18,8 +18,6 @@ limitations under the License.
 # Example RabbitMQ stages
 This example builds upon the `examples/developer_guide/2_2_rabbitmq` example adding a C++ implementation for the `RabbitMQSourceStage` along with adding package install scripts.
 
-This example adds two flags to the `read_simple.py` script. A `--use_cpp` flag which defaults to `True` and a `--num_threads` flag which defaults to the number of cores on the system as returned by `len(os.sched_getaffinity(0))`.
-
 ## Supported Environments
 | Environment | Supported | Notes |
 |-------------|-----------|-------|
diff --git a/examples/developer_guide/4_rabbitmq_cpp_stage/src/rabbitmq_cpp_stage/rabbitmq_source_stage.py b/examples/developer_guide/4_rabbitmq_cpp_stage/src/rabbitmq_cpp_stage/rabbitmq_source_stage.py
index 752ee0fb01..a408ca0b49 100755
--- a/examples/developer_guide/4_rabbitmq_cpp_stage/src/rabbitmq_cpp_stage/rabbitmq_source_stage.py
+++ b/examples/developer_guide/4_rabbitmq_cpp_stage/src/rabbitmq_cpp_stage/rabbitmq_source_stage.py
@@ -28,7 +28,6 @@ from morpheus.pipeline.preallocator_mixin import PreallocatorMixin
 from morpheus.pipeline.single_output_source import SingleOutputSource
 from morpheus.pipeline.stage_schema import StageSchema
-from morpheus.utils.type_utils import get_df_pkg
 
 logger = logging.getLogger(__name__)
 
@@ -72,9 +71,6 @@ def __init__(self,
 
         self._poll_interval = pd.Timedelta(poll_interval)
 
-        # This will return either cudf.DataFrame or pandas.DataFrame depending on the execution mode
-        self._df_pkg = get_df_pkg(config.execution_mode)
-
     @property
     def name(self) -> str:
         return "from-rabbitmq"
@@ -122,7 +118,7 @@ def source_generator(self, subscription: mrc.Subscription):
             if method_frame is not None:
                 try:
                     buffer = StringIO(body.decode("utf-8"))
-                    df = self._df_pkg.read_json(buffer, orient='records', lines=True)
+                    df = pd.read_json(buffer, orient='records', lines=True)
                     yield MessageMeta(df=df)
                 except Exception as ex:
                     logger.exception("Error occurred converting RabbitMQ message to Dataframe: %s", ex)
diff --git a/examples/gnn_fraud_detection_pipeline/README.md b/examples/gnn_fraud_detection_pipeline/README.md
index 3945eced97..c7206787a6 100644
--- a/examples/gnn_fraud_detection_pipeline/README.md
+++ b/examples/gnn_fraud_detection_pipeline/README.md
@@ -27,10 +27,10 @@ All environments require additional Conda packages which can be installed with e
 
 ## Requirements
 
-Prior to running the GNN fraud detection pipeline, additional requirements must be installed in to your Conda environment. A supplemental requirements file has been provided in this example directory.
+Prior to running the GNN fraud detection pipeline, additional requirements must be installed into your Conda environment.
```bash -mamba env update \ +conda env update --solver=libmamba \ -n ${CONDA_DEFAULT_ENV} \ --file ./conda/environments/examples_cuda-125_arch-x86_64.yaml ``` @@ -117,7 +117,7 @@ From the root of the Morpheus repo, run: PYTHONPATH="examples" \ morpheus --log_level INFO \ --plugin "gnn_fraud_detection_pipeline" \ - run --use_cpp False --pipeline_batch_size 1024 --model_max_batch_size 32 --edge_buffer_size 4 \ + run --pipeline_batch_size 1024 --model_max_batch_size 32 --edge_buffer_size 4 \ pipeline-other --model_fea_length 70 --label=probs \ from-file --filename examples/gnn_fraud_detection_pipeline/validation.csv --filter_null False \ deserialize \ diff --git a/examples/llm/agents/README.md b/examples/llm/agents/README.md index 2721452a93..bb5c1d9bc3 100644 --- a/examples/llm/agents/README.md +++ b/examples/llm/agents/README.md @@ -104,7 +104,7 @@ export SERPAPI_API_KEY="" Install the required dependencies. ```bash -mamba env update \ +conda env update --solver=libmamba \ -n ${CONDA_DEFAULT_ENV} \ --file ./conda/environments/examples_cuda-125_arch-x86_64.yaml ``` @@ -131,6 +131,10 @@ python examples/llm/main.py agents simple [OPTIONS] ``` ### Options: +- `--use_cpu_only` + - **Description**: Run in CPU only mode + - **Default**: `False` + - `--num_threads INTEGER RANGE` - **Description**: Number of internal pipeline threads to use. - **Default**: `12` diff --git a/examples/llm/agents/run.py b/examples/llm/agents/run.py index a97bce4b78..60d85eac84 100644 --- a/examples/llm/agents/run.py +++ b/examples/llm/agents/run.py @@ -25,7 +25,7 @@ def run(): @run.command(help="Runs a simple finite pipeline with a single execution of a LangChain agent from a fixed input") -@click.option('--use_cpu_only', default=False, type=bool, is_flag=True, help=("Whether or not to run in CPU only mode")) +@click.option('--use_cpu_only', default=False, type=bool, is_flag=True, help="Run in CPU only mode") @click.option( "--num_threads", default=len(os.sched_getaffinity(0)), diff --git a/examples/llm/completion/README.md b/examples/llm/completion/README.md index e72ffe1ce6..562e1a1020 100644 --- a/examples/llm/completion/README.md +++ b/examples/llm/completion/README.md @@ -78,7 +78,7 @@ Before running the pipeline, ensure that the `NGC_API_KEY` environment variable Install the required dependencies. ```bash -mamba env update \ +conda env update --solver=libmamba \ -n ${CONDA_DEFAULT_ENV} \ --file ./conda/environments/examples_cuda-125_arch-x86_64.yaml ``` @@ -114,6 +114,9 @@ python examples/llm/main.py completion [OPTIONS] COMMAND [ARGS]... - `pipeline` ##### Options: +- `--use_cpu_only` + - **Description**: Run in CPU only mode + - **Default**: `False` - `--num_threads INTEGER RANGE` - **Description**: Number of internal pipeline threads to use. 
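Both the agents and completion `run.py` scripts declare these options with Click. As a self-contained illustration of the pattern (the command body and the `IntRange` bound are assumptions based on the help text above, not the scripts' exact code):

```python
import os

import click


@click.command()
@click.option('--use_cpu_only', default=False, type=bool, is_flag=True, help="Run in CPU only mode")
@click.option("--num_threads",
              default=len(os.sched_getaffinity(0)),
              type=click.IntRange(min=1),
              help="Number of internal pipeline threads to use.")
def pipeline(use_cpu_only: bool, num_threads: int):
    # Hypothetical body: build the Config here and hand it to the pipeline.
    print(f"CPU only: {use_cpu_only}, threads: {num_threads}")


if __name__ == "__main__":
    pipeline()
```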
diff --git a/examples/llm/completion/run.py b/examples/llm/completion/run.py index 4ba702c700..ed2e8a6c3d 100644 --- a/examples/llm/completion/run.py +++ b/examples/llm/completion/run.py @@ -26,7 +26,7 @@ def run(): @run.command() -@click.option('--use_cpu_only', default=False, type=bool, is_flag=True, help=("Whether or not to run in CPU only mode")) +@click.option('--use_cpu_only', default=False, type=bool, is_flag=True, help="Run in CPU only mode") @click.option( "--num_threads", default=len(os.sched_getaffinity(0)), diff --git a/examples/ransomware_detection/README.md b/examples/ransomware_detection/README.md index 0619af26ec..4b15a30b71 100644 --- a/examples/ransomware_detection/README.md +++ b/examples/ransomware_detection/README.md @@ -88,7 +88,6 @@ Usage: run.py [OPTIONS] Options: --debug BOOLEAN - --use_cpp BOOLEAN --num_threads INTEGER RANGE Number of internal pipeline threads to use [x>=1] --n_dask_workers INTEGER RANGE Number of dask workers [x>=2] diff --git a/examples/root_cause_analysis/README.md b/examples/root_cause_analysis/README.md index 5d038fa959..943c00fad2 100644 --- a/examples/root_cause_analysis/README.md +++ b/examples/root_cause_analysis/README.md @@ -105,8 +105,8 @@ From the Morpheus repo root directory, run: ```bash export MORPHEUS_ROOT=$(pwd) morpheus --log_level=DEBUG \ -`# Run a pipeline with 5 threads and a model batch size of 32 (Must match Triton config)` \ -run --num_threads=8 --edge_buffer_size=4 --use_cpp=True --pipeline_batch_size=1024 --model_max_batch_size=32 \ +`# Run a pipeline with 8 threads and a model batch size of 32 (Must match Triton config)` \ +run --num_threads=8 --edge_buffer_size=4 --pipeline_batch_size=1024 --model_max_batch_size=32 \ `# Specify a NLP pipeline with 128 sequence length (Must match Triton config)` \ pipeline-nlp --model_seq_length=128 --label=not_root_cause --label=is_root_cause \ `# 1st Stage: Read from file` \ diff --git a/python/morpheus/morpheus/io/deserializers.py b/python/morpheus/morpheus/io/deserializers.py index 34703ac50e..15867664f7 100644 --- a/python/morpheus/morpheus/io/deserializers.py +++ b/python/morpheus/morpheus/io/deserializers.py @@ -107,7 +107,7 @@ def read_file_to_df(file_name: typing.Union[str, io.IOBase], Whether to filter null rows after loading, by default True. filter_null_columns : list[str]|str, default = 'data' Column or columns to filter null values from. Ignored when `filter_null` is False. - df_type : typing.Literal[, optional + df_type : typing.Literal["cudf", "pandas"], optional What type of parser to use. Options are 'cudf' and 'pandas', by default "pandas". Returns diff --git a/python/morpheus/morpheus/utils/type_aliases.py b/python/morpheus/morpheus/utils/type_aliases.py index 5e977918c7..0028c076fe 100644 --- a/python/morpheus/morpheus/utils/type_aliases.py +++ b/python/morpheus/morpheus/utils/type_aliases.py @@ -23,8 +23,18 @@ import cudf DataFrameModule = typing.Literal["cudf", "pandas"] +"""Valid DataFrame modules.""" + DataFrameType = typing.Union["pandas.DataFrame", "cudf.DataFrame"] +"""Alias for pandas and cuDF DataFrame types.""" + SeriesType = typing.Union["pandas.Series", "cudf.Series"] +"""Alias for pandas and cuDF Series types.""" NDArrayType = typing.Union["numpy.ndarray", "cupy.ndarray"] -TensorMapType = dict[str, NDArrayType] +"""Alias for NumPy and CuPy ndarray types.""" + +# Intentionally using `typing.Dict` instead of `dict` to avoid a Sphinx build error. 
+# https://github.com/nv-morpheus/Morpheus/issues/1956 +TensorMapType = typing.Dict[str, NDArrayType] +"""Alias for a dictionary of tensor names to tensors represented as either a NumPy or CuPy ndarray.""" diff --git a/python/morpheus/morpheus/utils/type_utils.py b/python/morpheus/morpheus/utils/type_utils.py index 7dd629b687..95c870e30f 100644 --- a/python/morpheus/morpheus/utils/type_utils.py +++ b/python/morpheus/morpheus/utils/type_utils.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +"""Utility functions for working with types.""" import inspect import types @@ -175,6 +176,16 @@ def get_full_qualname(klass: type) -> str: def df_type_str_to_exec_mode(df_type_str: DataFrameModule) -> ExecutionMode: """ Return the appropriate execution mode based on the DataFrame type string. + + Parameters + ---------- + df_type_str : `morpheus.utils.type_aliases.DataFrameModule` + The DataFrame type string. + + Returns + ------- + `morpheus.config.ExecutionMode` + The associated execution mode based on the DataFrame type string. """ if df_type_str == "cudf": return ExecutionMode.GPU @@ -186,6 +197,19 @@ def df_type_str_to_exec_mode(df_type_str: DataFrameModule) -> ExecutionMode: def exec_mode_to_df_type_str(execution_mode: ExecutionMode) -> DataFrameModule: + """ + Return the appropriate DataFrame type string based on the execution mode. + + Parameters + ---------- + execution_mode : `morpheus.config.ExecutionMode` + The execution mode. + + Returns + ------- + `morpheus.utils.type_aliases.DataFrameModule` + The associated DataFrame type string based on the execution mode. + """ if execution_mode == ExecutionMode.GPU: return "cudf" @@ -193,6 +217,14 @@ def exec_mode_to_df_type_str(execution_mode: ExecutionMode) -> DataFrameModule: def cpp_mode_to_exec_mode() -> ExecutionMode: + """ + Return the execution mode based on the configuration of the global `morpheus.config.CppConfig` singleton. + + Returns + ------- + `morpheus.config.ExecutionMode` + The execution mode. + """ if CppConfig.get_should_use_cpp(): return ExecutionMode.GPU return ExecutionMode.CPU @@ -200,7 +232,17 @@ def cpp_mode_to_exec_mode() -> ExecutionMode: def df_type_str_to_pkg(df_type_str: DataFrameModule) -> types.ModuleType: """ - Return the appropriate DataFrame package based on the DataFrame type string. + Import and return the appropriate DataFrame package based on the DataFrame type string. + + Parameters + ---------- + df_type_str : `morpheus.utils.type_aliases.DataFrameModule` + The DataFrame type string. + + Returns + ------- + `types.ModuleType` + The associated DataFrame package based on the DataFrame type string. """ if df_type_str == "cudf": import cudf @@ -224,7 +266,28 @@ def get_df_pkg(selector: ExecutionMode = None) -> types.ModuleType: def get_df_pkg(selector: ExecutionMode | DataFrameModule = None) -> types.ModuleType: """ - Return the appropriate DataFrame package based on the execution mode. + Return the appropriate DataFrame package based on `selector` which can be either an `ExecutionMode` instance, a + DataFrame type string, or `None`. + + When `None` the execution mode is determined by the global `morpheus.config.CppConfig` singleton. + + This method is best used within code which needs to operate in both CPU and GPU modes, where simply importing `cudf` + would cause an import error if the user is not using a GPU. 
+    Example usage::
+
+        from morpheus.utils.type_utils import get_df_pkg
+        df_pkg = get_df_pkg()
+        ser = df_pkg.Series([1,2,3])
+
+    Parameters
+    ----------
+    selector : `morpheus.utils.type_aliases.DataFrameModule` | `morpheus.config.ExecutionMode` | None, optional
+        The selector to determine the DataFrame package, by default None.
+
+    Returns
+    -------
+    `types.ModuleType`
+        The associated DataFrame package based on the selector.
     """
     if selector is None:
         execution_mode = cpp_mode_to_exec_mode()
@@ -252,7 +315,26 @@ def get_df_class(selector: ExecutionMode = None) -> type[DataFrameType]:
 
 def get_df_class(selector: ExecutionMode | DataFrameModule = None) -> type[DataFrameType]:
     """
-    Return the appropriate DataFrame class based on the execution mode.
+    Return the appropriate DataFrame class based on `selector`, which can be either an `ExecutionMode` instance, a
+    DataFrame type string, or `None`.
+
+    When `None` the execution mode is determined by the global `morpheus.config.CppConfig` singleton.
+
+    This method is best used within code which needs to construct a DataFrame in both CPU and GPU modes.
+    Example usage::
+
+        from morpheus.utils.type_utils import get_df_class
+        df_class = get_df_class()
+        df = df_class({"a": [1, 2, 3], "b": [4, 5, 6], "c": [7, 8, 9]})
+
+    Parameters
+    ----------
+    selector : `morpheus.utils.type_aliases.DataFrameModule` | `morpheus.config.ExecutionMode` | None, optional
+        The selector to determine the DataFrame class, by default None.
+
+    Returns
+    -------
+    `type[DataFrameType]`
     """
     df_pkg = get_df_pkg(selector)
     return df_pkg.DataFrame
@@ -261,13 +343,33 @@ def get_df_class(selector: ExecutionMode | DataFrameModule = None) -> type[DataF
 def is_cudf_type(obj: typing.Any) -> bool:
     """
     Check if a given object (DataFrame, Series, RangeIndex etc...) is a cuDF type.
+
+    Parameters
+    ----------
+    obj : `typing.Any`
+        The object to check.
+
+    Returns
+    -------
+    `bool`
+        `True` if the object is a cuDF type, `False` otherwise.
     """
     return "cudf" in str(type(obj))
 
 
 def get_df_pkg_from_obj(obj: typing.Any) -> types.ModuleType:
     """
-    Return the appropriate DataFrame package based on the DataFrame object.
+    Return the appropriate DataFrame package based on a given object (DataFrame, Series, RangeIndex etc...).
+
+    Parameters
+    ----------
+    obj : `typing.Any`
+        The object to check.
+
+    Returns
+    -------
+    `types.ModuleType`
+        The associated DataFrame package based on the object.
     """
     if is_cudf_type(obj):
         import cudf
@@ -279,6 +381,16 @@ def get_df_pkg_from_obj(obj: typing.Any) -> types.ModuleType:
 def is_dataframe(obj: typing.Any) -> bool:
     """
     Check if a given object is a pandas or cudf DataFrame.
+
+    Parameters
+    ----------
+    obj : `typing.Any`
+        The object to check.
+
+    Returns
+    -------
+    `bool`
+        `True` if the object is a DataFrame, `False` otherwise.
     """
     df_pkg = get_df_pkg_from_obj(obj)
     return isinstance(obj, df_pkg.DataFrame)
@@ -287,6 +399,18 @@ def is_dataframe(obj: typing.Any) -> bool:
 def get_array_pkg(execution_mode: ExecutionMode = None) -> types.ModuleType:
     """
     Return the appropriate array package (CuPy for GPU, NumPy for CPU) based on the execution mode.
+
+    When `None` the execution mode is determined by the global `morpheus.config.CppConfig` singleton.
+
+    Parameters
+    ----------
+    execution_mode : `morpheus.config.ExecutionMode`, optional
+        The execution mode, by default `None`.
+
+    Returns
+    -------
+    `types.ModuleType`
+        The associated array package based on the execution mode.
""" if execution_mode is None: execution_mode = cpp_mode_to_exec_mode() From 5a2d71fa43de0a79cbeb408ec7fc4db21e6e60be Mon Sep 17 00:00:00 2001 From: David Gardner <96306125+dagardner-nv@users.noreply.github.com> Date: Fri, 18 Oct 2024 18:16:53 -0700 Subject: [PATCH 2/2] Provide a timeout to the queue.get call in `HttpServerSourceStage` to avoid spinlocking (#1928) * Only impacts the Python impl of this stage Closes [#1910](https://github.com/nv-morpheus/Morpheus/issues/1910) ## By Submitting this PR I confirm: - I am familiar with the [Contributing Guidelines](https://github.com/nv-morpheus/Morpheus/blob/main/docs/source/developer_guide/contributing.md). - When the PR is ready for review, new or existing tests cover these changes. - When the PR is ready for review, the documentation is up to date with these changes. Authors: - David Gardner (https://github.com/dagardner-nv) Approvers: - Michael Demoret (https://github.com/mdemoret-nv) URL: https://github.com/nv-morpheus/Morpheus/pull/1928 --- .../morpheus/morpheus/stages/input/http_server_source_stage.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/morpheus/morpheus/stages/input/http_server_source_stage.py b/python/morpheus/morpheus/stages/input/http_server_source_stage.py index 0459a458e4..c286c2dcd2 100644 --- a/python/morpheus/morpheus/stages/input/http_server_source_stage.py +++ b/python/morpheus/morpheus/stages/input/http_server_source_stage.py @@ -248,7 +248,8 @@ def _generate_frames(self, subscription: mrc.Subscription) -> typing.Iterator[Me # shutdown since we already returned an OK response to the client. df = None try: - df = self._queue.get(block=False) + # Intentionally not using self._queue_timeout here since that value is rather high + df = self._queue.get(block=False, timeout=0.1) self._queue_size -= 1 except queue.Empty: if (not self._http_server.is_running() or self.is_stop_requested()