Test Engine Worker
The Test Engine Worker (TE Worker) is responsible for processing Test Run jobs that are placed into the Task Queue. These workers execute automated test cases based on predefined configurations and input data. The primary purpose of the Test Engine Worker is to ensure the automated execution of tests, facilitating continuous testing, validation, and verification of AI models.
graph LR
APIGW[API Gateway] -->|Add Task| TaskQueue
TaskQueue -->|Task| TE1[Test Engine Worker 1]
TaskQueue -->|Task| TE2[Test Engine Worker 2]
The Task Queue serves as a centralized system for managing and distributing test execution jobs to available Test Engine Workers. Its primary purpose is to decouple job submission from execution, ensuring efficient workload distribution, scalability, and fault tolerance. The Task Queue helps in balancing the load, enabling multiple workers to process jobs concurrently, and ensuring test jobs are executed in a reliable and orderly manner.
The Task Queue is implemented using Valkey, which provides a scalable and persistent stream-based messaging system.
The API Gateway (API-GW) sends Test Run tasks to the Test Engine Workers through the message queue using the XADD command. The stream is named TestRun. TestRun tasks are represented as JSON objects and must be serialized into a JSON string when added to the Message Queue.
XADD TestRun * task <TestRun Object in JSON string>
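A minimal sketch of how a producer might construct and serialize a task before adding it to the stream. The field values and the algorithm ID are illustrative, and the client call shown in the comment assumes a redis-py / valkey-py compatible client:

```python
import json
import uuid

def make_testrun_task(algorithm_id, test_dataset, test_dataset_hash,
                      algorithm_args, mode="upload", **extra):
    """Build a TestRun task payload with the required schema fields."""
    task = {
        "id": str(uuid.uuid4()),   # unique identifier for the task
        "mode": mode,
        "algorithmId": algorithm_id,
        "testDataset": test_dataset,
        "testDatasetHash": test_dataset_hash,
        "algorithmArgs": algorithm_args,
    }
    task.update(extra)  # optional fields, e.g. modelFile for "upload" mode
    return task

# Illustrative values only; "upload" mode also requires the model fields.
task = make_testrun_task(
    algorithm_id="aiverify.stock.fairness",   # hypothetical algorithm ID
    test_dataset="datasets/test.csv",
    test_dataset_hash="sha256-of-dataset",
    algorithm_args={"sensitive_feature": "gender"},
    modelFile="models/model.sav",
    modelFileHash="sha256-of-model",
)
payload = json.dumps(task)  # serialized to a JSON string before XADD
# client.xadd("TestRun", {"task": payload})  # XADD TestRun * task <JSON string>
```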
The JSON schema of the TestRun object is as follows:
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$id": "/aiverify.taskqueue.testrun.schema.json",
"title": "Test Run Task Schema",
"description": "Schema for Test Run tasks",
"type": "object",
"properties": {
"id": {
"description": "A generated unique identifier that is used to uniquely identify the task in the task queue.",
"type": "string"
},
"mode": {
"description": "Whether run model natively or use API based testing",
"type": "string",
"enum": [
"upload",
"api"
]
},
"algorithmId": {
"description": "Algorithm ID of the algorithm to run",
"type": "string"
},
"algorithmHash": {
"description": "Hash of the algorithm zip download",
"type": "string"
},
"testDataset": {
"description": "Data path to test dataset with labels",
"type": "string"
},
"testDatasetHash": {
"description": "Hash of the test dataset file or zip",
"type": "string"
},
"groundTruthDataset": {
"description": "Path to dataset with ground truth",
"type": "string"
},
"groundTruthDatasetHash": {
"description": "Hash of the ground truth dataset file or zip",
"type": "string"
},
"groundTruth": {
"description": "Name of column containing the labels",
"type": "string"
},
"modelFile": {
"description": "Model path to the saved offline model",
"type": "string"
},
"modelFileHash": {
"description": "Hash of the model file or zip",
"type": "string"
},
"apiSchema": {
"description": "Openapi V3 describing the API"
},
"apiConfig": {
"description": "Config file for API",
},
"modelType": {
"description": "The type of model",
"type": "string",
"enum": [
"classification",
"regression",
"uplift"
]
},
"algorithmArgs": {
"description": "JSON object in the format as described in the algorithm input schema."
}
},
"required": [
"id",
"mode",
"testDataset",
"testDatasetHash",
"algorithmId",
"algorithmArgs"
],
"if": {
"properties": {
"mode": {
"const": "upload"
}
}
},
"then": {
"required": [
"modelFile",
"modelFileHash"
]
},
"else": {
"required": [
"apiSchema",
"apiConfig"
]
}
}
The Test Engine Workers read jobs from the Task Queue using the XREADGROUP command in Valkey Streams, which allows workers to consume messages as part of a consumer group. This ensures that jobs are distributed among multiple workers while maintaining processing order and avoiding duplication. Each worker reads pending jobs from the queue, processes them, and acknowledges completion, enabling fault tolerance by allowing jobs to be reprocessed if a worker fails.
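The consumer-group pattern described above might be sketched as follows. The group and consumer names are assumptions, and `client` is assumed to be a redis-py / valkey-py compatible client; a worker main loop would call this repeatedly:

```python
import json

STREAM = "TestRun"
GROUP = "te-workers"    # assumed consumer group name
CONSUMER = "worker-1"   # unique name per worker instance

def process_pending(client, handle_task):
    """Consume one batch of TestRun tasks via XREADGROUP and XACK."""
    processed = 0
    # ">" requests messages never delivered to any consumer in the group.
    entries = client.xreadgroup(GROUP, CONSUMER, {STREAM: ">"},
                                count=10, block=5000)
    for _stream, messages in entries or []:
        for msg_id, fields in messages:
            task = json.loads(fields["task"])
            handle_task(task)
            # Acknowledge only after successful handling, so an unacked
            # message can be reclaimed if this worker crashes mid-task.
            client.xack(STREAM, GROUP, msg_id)
            processed += 1
    return processed
```

Acknowledging after processing (rather than on read) is what gives the reprocessing guarantee: unacknowledged entries stay in the group's pending list and can be claimed by another worker.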
The test execution consists of four main steps:
- Download the algorithm and all required model and data files.
- Build the algorithm.
- Execute the test.
- Post the successful result to the API Gateway.
These steps are handled sequentially by the Test Engine Worker, ensuring a structured and automated workflow:
- Download Algorithm and Required Files: The worker downloads the necessary test algorithm, model files, and data files from the API Gateway.
- Build the Algorithm: The algorithm is built using one of the available approaches to set up the test environment. The build module supports multiple approaches and is implemented as a loadable module, allowing flexibility based on the user environment. Supported build options:
  - venv (default): Uses Python's virtual environment for dependency management.
  - docker: Creates an isolated containerized environment for building.
- Execute the Test: The test is executed using the selected execution approach. Similar to the build process, execution is handled by a loadable module, dynamically determined based on the environment configuration. Supported execution options:
  - python (default): Runs the test directly using the Python interpreter.
  - docker run: Executes the test within a Docker container for isolation.
  - kubectl exec: Runs the test inside a Kubernetes pod, enabling cloud-native execution.
- Post Successful Result to API Gateway: Once the test execution completes successfully, the results are posted to the API Gateway, ensuring that they are accessible for further processing, reporting, and integration with other systems.
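The four steps above could be sketched as a single pipeline function. All the collaborator interfaces here (`api_client`, `builder`, `executor`) are hypothetical illustrations, not the worker's actual API:

```python
def run_test_task(task, api_client, builder, executor):
    """Sketch of the four-step workflow for one TestRun task."""
    # 1. Download the algorithm, model, and data files from the API Gateway.
    algo_dir = api_client.download_algorithm(task["algorithmId"],
                                             task.get("algorithmHash"))
    dataset = api_client.download_dataset(task["testDataset"],
                                          task["testDatasetHash"])
    model = None
    if task["mode"] == "upload":
        model = api_client.download_model(task["modelFile"],
                                          task["modelFileHash"])
    # 2. Build the algorithm (venv or docker, per configuration).
    env = builder.build(algo_dir)
    # 3. Execute the test (python, docker run, or kubectl exec).
    results = executor.run(env, dataset=dataset, model=model,
                           args=task["algorithmArgs"])
    # 4. Post successful results back to the API Gateway.
    api_client.upload_results(task["id"], results)
    return results
```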
The build and execution modules are designed as loadable modules, meaning they are dynamically loaded during startup based on environment configuration. This design allows for greater flexibility, enabling the system to adapt to different environments and test scenarios without code changes. The module to be used for both the build and execution phases is defined via environment variables, making it easy to switch between different approaches based on the test requirements.
Users can also develop their own custom build and execution modules to suit specific needs. These custom modules can be integrated seamlessly into the system by following the loadable module structure and specifying them via environment variables.
This approach provides a scalable and modular solution, allowing test workflows to run in diverse environments, from local development machines to cloud-based Kubernetes clusters.
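The environment-variable-driven module selection described above might look like the following sketch. The variable names (`TE_BUILD_MODULE`, `TE_EXEC_MODULE`) and the module paths in the comments are assumptions for illustration, not the worker's actual configuration keys:

```python
import importlib
import os

def load_module(kind, default):
    """Dynamically load a build or execution module at startup.

    The module's import path is taken from an environment variable
    (e.g. TE_BUILD_MODULE), falling back to a default. A custom module
    only needs to be importable under the configured path.
    """
    name = os.environ.get(f"TE_{kind.upper()}_MODULE", default)
    return importlib.import_module(name)

# Hypothetical usage at worker startup:
# builder = load_module("build", "worker.builders.venv_build")
# executor = load_module("exec", "worker.executors.python_run")
```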
The sequence diagram below shows the interactions between the Task Queue, Test Engine Worker and the API Gateway for Test Execution.
sequenceDiagram
participant TaskQueue as Task Queue
participant TEWorker as Test Engine Worker
participant APIGW as API Gateway
TaskQueue->>TEWorker: TestRun Task
TEWorker->>APIGW: Download Algorithm Zip
APIGW-->>TEWorker: Algorithm Zip
TEWorker->>APIGW: Download Model File
APIGW-->>TEWorker: Model File
TEWorker->>APIGW: Download Dataset File
APIGW-->>TEWorker: Dataset File
TEWorker->>TEWorker: Execute Test
TEWorker->>APIGW: Upload Test Results
APIGW->>TEWorker: Status
All downloads from the API-GW should first check whether the item already exists in the Test Engine Worker File Cache and matches the hash given in the Task object. If not, the worker downloads the required file or zip from the API Gateway and saves it in the File Cache together with its hash.
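The cache check could be sketched as below. The cache location, the SHA-256 choice, and the sidecar-file layout are illustrative assumptions; `download` stands in for the actual API Gateway request:

```python
import hashlib
from pathlib import Path

CACHE_DIR = Path("/var/cache/te-worker")   # assumed cache location

def fetch_with_cache(name, expected_hash, download):
    """Return a cached file if its hash matches the Task object; else download.

    `download` is a callable returning the raw bytes from the API Gateway.
    """
    path = CACHE_DIR / name
    if path.exists():
        digest = hashlib.sha256(path.read_bytes()).hexdigest()
        if digest == expected_hash:
            return path  # cache hit: content matches the expected hash
    # Cache miss or stale content: download and store alongside its hash.
    data = download()
    CACHE_DIR.mkdir(parents=True, exist_ok=True)
    path.write_bytes(data)
    path.with_suffix(path.suffix + ".sha256").write_text(expected_hash)
    return path
```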