# Metrics Documentation

The Conductor Python SDK includes built-in metrics collection using Prometheus to monitor worker performance, API requests, and task execution.

## Table of Contents

- [Quick Reference](#quick-reference)
- [Configuration](#configuration)
- [Metric Types](#metric-types)
- [Examples](#examples)

## Quick Reference

| Metric Name | Type | Labels | Description |
|------------|------|--------|-------------|
| `api_request_time_seconds` | Timer (quantile gauge) | `method`, `uri`, `status`, `quantile` | API request latency to the Conductor server |
| `api_request_time_seconds_count` | Gauge | `method`, `uri`, `status` | Total number of API requests |
| `api_request_time_seconds_sum` | Gauge | `method`, `uri`, `status` | Total time spent in API requests |
| `task_poll_total` | Counter | `taskType` | Number of task poll attempts |
| `task_poll_time` | Gauge | `taskType` | Most recent poll duration (legacy) |
| `task_poll_time_seconds` | Timer (quantile gauge) | `taskType`, `status`, `quantile` | Task poll latency distribution |
| `task_poll_time_seconds_count` | Gauge | `taskType`, `status` | Total number of poll attempts by status |
| `task_poll_time_seconds_sum` | Gauge | `taskType`, `status` | Total time spent polling |
| `task_execute_time` | Gauge | `taskType` | Most recent execution duration (legacy) |
| `task_execute_time_seconds` | Timer (quantile gauge) | `taskType`, `status`, `quantile` | Task execution latency distribution |
| `task_execute_time_seconds_count` | Gauge | `taskType`, `status` | Total number of task executions by status |
| `task_execute_time_seconds_sum` | Gauge | `taskType`, `status` | Total time spent executing tasks |
| `task_execute_error_total` | Counter | `taskType`, `exception` | Number of task execution errors |
| `task_update_time_seconds` | Timer (quantile gauge) | `taskType`, `status`, `quantile` | Task update latency distribution |
| `task_update_time_seconds_count` | Gauge | `taskType`, `status` | Total number of task updates by status |
| `task_update_time_seconds_sum` | Gauge | `taskType`, `status` | Total time spent updating tasks |
| `task_update_error_total` | Counter | `taskType`, `exception` | Number of task update errors |
| `task_result_size` | Gauge | `taskType` | Size of task result payload (bytes) |
| `task_execution_queue_full_total` | Counter | `taskType` | Number of times the execution queue was full |
| `task_paused_total` | Counter | `taskType` | Number of polls made while the worker was paused |
| `external_payload_used_total` | Counter | `taskType`, `payloadType` | External payload storage usage count |
| `workflow_input_size` | Gauge | `workflowType`, `version` | Workflow input payload size (bytes) |
| `workflow_start_error_total` | Counter | `workflowType`, `exception` | Workflow start error count |

### Label Values

- **`status`** (task): `SUCCESS`, `FAILURE`
- **`status`** (HTTP): HTTP response code (`200`, `401`, `404`, `500`) or `error`
- **`method`**: `GET`, `POST`, `PUT`, `DELETE`
- **`uri`**: API endpoint path (e.g., `/tasks/poll/batch/{taskType}`, `/tasks/update-v2`)
- **`quantile`**: `0.5` (p50), `0.75` (p75), `0.9` (p90), `0.95` (p95), `0.99` (p99)
- **`payloadType`**: `input`, `output`
- **`exception`**: Exception type or error message

### Example Metrics Output

```prometheus
# API Request Metrics
api_request_time_seconds{method="GET",uri="/tasks/poll/batch/myTask",status="200",quantile="0.5"} 0.112
api_request_time_seconds{method="GET",uri="/tasks/poll/batch/myTask",status="200",quantile="0.99"} 0.245
api_request_time_seconds_count{method="GET",uri="/tasks/poll/batch/myTask",status="200"} 1000.0
api_request_time_seconds_sum{method="GET",uri="/tasks/poll/batch/myTask",status="200"} 114.5

# Task Poll Metrics
task_poll_total{taskType="myTask"} 10264.0
task_poll_time_seconds{taskType="myTask",status="SUCCESS",quantile="0.95"} 0.025
task_poll_time_seconds_count{taskType="myTask",status="SUCCESS"} 1000.0
task_poll_time_seconds_count{taskType="myTask",status="FAILURE"} 95.0

# Task Execution Metrics
task_execute_time_seconds{taskType="myTask",status="SUCCESS",quantile="0.99"} 0.017
task_execute_time_seconds_count{taskType="myTask",status="SUCCESS"} 120.0
task_execute_error_total{taskType="myTask",exception="TimeoutError"} 3.0

# Task Update Metrics
task_update_time_seconds{taskType="myTask",status="SUCCESS",quantile="0.95"} 0.096
task_update_time_seconds_count{taskType="myTask",status="SUCCESS"} 15.0
```
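
This text format is line-oriented and easy to inspect programmatically. As an illustration only (this helper is not part of the SDK), a minimal parser for simple `name{labels} value` lines like the ones above might look like:

```python
import re

# Matches simple `name{label="value",...} value` lines, as in the sample output.
# This sketch ignores HELP/TYPE comments and label-less metric lines.
LINE_RE = re.compile(r'^(\w+)\{(.*)\}\s+(\S+)$')

def parse_metric_line(line):
    """Return (name, labels, value) for a matching line, else None."""
    match = LINE_RE.match(line.strip())
    if not match:
        return None
    name, label_str, value = match.groups()
    labels = dict(re.findall(r'(\w+)="([^"]*)"', label_str))
    return name, labels, float(value)

name, labels, value = parse_metric_line('task_poll_total{taskType="myTask"} 10264.0')
print(name, labels["taskType"], value)  # → task_poll_total myTask 10264.0
```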

## Configuration

### Enabling Metrics

Metrics are enabled by providing a `MetricsSettings` object when creating a `TaskHandler`:

```python
from conductor.client.configuration.configuration import Configuration
from conductor.client.configuration.settings.metrics_settings import MetricsSettings
from conductor.client.automator.task_handler import TaskHandler

# Configure metrics
metrics_settings = MetricsSettings(
    directory='/path/to/metrics',        # Directory where the metrics file will be written
    file_name='conductor_metrics.prom',  # Metrics file name (default: 'conductor_metrics.prom')
    update_interval=10                   # Update interval in seconds (default: 10)
)

# Configure the Conductor connection
api_config = Configuration(
    server_api_url='http://localhost:8080/api',
    debug=False
)

# Create a task handler with metrics enabled
with TaskHandler(
    configuration=api_config,
    metrics_settings=metrics_settings,
    workers=[...]
) as task_handler:
    task_handler.start_processes()
```

### AsyncIO Workers

For AsyncIO-based workers:

```python
from conductor.client.automator.task_handler_asyncio import TaskHandlerAsyncIO

async with TaskHandlerAsyncIO(
    configuration=api_config,
    metrics_settings=metrics_settings,
    scan_for_annotated_workers=True,
    import_modules=['your_module']
) as task_handler:
    await task_handler.start()
```

### Metrics File Cleanup

For multiprocess workers using Prometheus multiprocess mode, clean the metrics directory on startup to avoid stale data:

```python
import os
import shutil

metrics_dir = '/path/to/metrics'
if os.path.exists(metrics_dir):
    shutil.rmtree(metrics_dir)
os.makedirs(metrics_dir, exist_ok=True)

metrics_settings = MetricsSettings(
    directory=metrics_dir,
    file_name='conductor_metrics.prom',
    update_interval=10
)
```

## Metric Types

### Quantile Gauges (Timers)

All timing metrics use quantile gauges to track latency distributions:

- **Quantile labels**: Each metric includes 5 quantiles (p50, p75, p90, p95, p99)
- **Count suffix**: `{metric_name}_count` tracks the total number of observations
- **Sum suffix**: `{metric_name}_sum` tracks the total time spent

**Example calculation (average):**
```
average = task_poll_time_seconds_sum / task_poll_time_seconds_count
average = 18.75 / 1000.0 = 0.01875 seconds
```

**Why quantiles instead of histograms?**
- More accurate percentile tracking with a sliding window (last 1000 observations)
- No need to pre-configure bucket boundaries
- Lower memory footprint
- Direct percentile values without interpolation

### Sliding Window

Quantile metrics use a sliding window of the last 1000 observations to calculate percentiles. This provides:
- Recent performance data (not cumulative)
- Accurate percentile estimation
- Bounded memory usage

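The behavior described above can be sketched in a few lines of Python. This is an illustration of the sliding-window quantile technique under stated assumptions (a `deque` bounded at 1000 entries and `statistics.quantiles` for the percentile cut points), not the SDK's internal implementation:

```python
import random
from collections import deque
from statistics import quantiles

class SlidingWindowTimer:
    """Illustrative sliding-window quantile tracker (not the SDK's internal code)."""

    def __init__(self, window_size=1000):
        # Old samples are evicted automatically once the deque is full, so
        # memory stays bounded and percentiles reflect recent behavior only.
        self.window = deque(maxlen=window_size)
        self.count = 0    # cumulative observation count (the `_count` series)
        self.total = 0.0  # cumulative sum of durations (the `_sum` series)

    def observe(self, seconds):
        self.window.append(seconds)
        self.count += 1
        self.total += seconds

    def quantile(self, q):
        # statistics.quantiles with n=100 returns the 1st..99th percentile cut points
        cuts = quantiles(self.window, n=100)
        return cuts[round(q * 100) - 1]

timer = SlidingWindowTimer(window_size=1000)
for _ in range(5000):
    timer.observe(random.uniform(0.01, 0.05))

# Only the last 1000 observations contribute to percentiles,
# while count/sum remain cumulative.
print(len(timer.window), timer.count)  # 1000 5000
print(timer.quantile(0.95))  # close to 0.048, the true p95 of Uniform(0.01, 0.05)
```

The split between the bounded window (for percentiles) and the cumulative `count`/`total` (for the `_count`/`_sum` series) mirrors the metric shapes listed in the Quick Reference.
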
## Examples

### Querying Metrics with PromQL

**Average API request latency:**
```promql
rate(api_request_time_seconds_sum[5m]) / rate(api_request_time_seconds_count[5m])
```

**API error rate:**
```promql
sum(rate(api_request_time_seconds_count{status=~"4..|5.."}[5m]))
/
sum(rate(api_request_time_seconds_count[5m]))
```

**Task poll success rate:**
```promql
sum(rate(task_poll_time_seconds_count{status="SUCCESS"}[5m]))
/
sum(rate(task_poll_time_seconds_count[5m]))
```

**p95 task execution time:**
```promql
task_execute_time_seconds{quantile="0.95"}
```

**Slowest API endpoints (p99):**
```promql
topk(10, api_request_time_seconds{quantile="0.99"})
```

### Complete Example

```python
import os
import shutil
from conductor.client.configuration.configuration import Configuration
from conductor.client.configuration.settings.metrics_settings import MetricsSettings
from conductor.client.automator.task_handler import TaskHandler
from conductor.client.worker.worker_interface import WorkerInterface

# Clean the metrics directory
metrics_dir = os.path.join(os.path.expanduser('~'), 'conductor_metrics')
if os.path.exists(metrics_dir):
    shutil.rmtree(metrics_dir)
os.makedirs(metrics_dir, exist_ok=True)

# Configure metrics
metrics_settings = MetricsSettings(
    directory=metrics_dir,
    file_name='conductor_metrics.prom',
    update_interval=10  # Update the file every 10 seconds
)

# Configure Conductor
api_config = Configuration(
    server_api_url='http://localhost:8080/api',
    debug=False
)

# Define a worker
class MyWorker(WorkerInterface):
    def execute(self, task):
        return {'status': 'completed'}

    def get_task_definition_name(self):
        return 'my_task'

# Start with metrics
with TaskHandler(
    configuration=api_config,
    metrics_settings=metrics_settings,
    workers=[MyWorker()]
) as task_handler:
    task_handler.start_processes()
```

### Scraping with Prometheus

Configure Prometheus to scrape the metrics once they are exposed over HTTP:

```yaml
# prometheus.yml
scrape_configs:
  - job_name: 'conductor-python-sdk'
    static_configs:
      - targets: ['localhost:8000']  # Use file_sd or a custom exporter
    metric_relabel_configs:
      - source_labels: [taskType]
        target_label: task_type
```

**Note:** Since metrics are written to a file, you'll need one of the following to expose them to Prometheus:
1. Use the Node Exporter's `textfile` collector
2. Run a simple HTTP server that serves the metrics file
3. Use a custom exporter to read and serve the file

### Example HTTP Metrics Server

```python
import os
from http.server import BaseHTTPRequestHandler, HTTPServer

class MetricsHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        if self.path == '/metrics':
            metrics_file = '/path/to/conductor_metrics.prom'
            if os.path.exists(metrics_file):
                with open(metrics_file, 'rb') as f:
                    content = f.read()
                self.send_response(200)
                self.send_header('Content-Type', 'text/plain; version=0.0.4')
                self.end_headers()
                self.wfile.write(content)
            else:
                self.send_response(404)
                self.end_headers()
        else:
            self.send_response(404)
            self.end_headers()

# Run the server
httpd = HTTPServer(('0.0.0.0', 8000), MetricsHandler)
httpd.serve_forever()
```

## Best Practices

1. **Clean the metrics directory on startup** to avoid stale multiprocess metrics
2. **Monitor disk space**, as metrics files can grow with many task types
3. **Use an appropriate `update_interval`** (10-60 seconds recommended)
4. **Set up alerts** on error rates and high latencies
5. **Monitor queue saturation** (`task_execution_queue_full_total`) to detect backpressure
6. **Track API errors** by status code to identify authentication or server issues
7. **Use p95/p99 latencies** for SLO monitoring rather than averages
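
Practices 4 and 7 can be made concrete with Prometheus alerting rules. The rule names and thresholds below are illustrative assumptions to adapt to your workload, not SDK or Prometheus defaults:

```yaml
# prometheus_rules.yml -- illustrative thresholds, tune for your workload
groups:
  - name: conductor-python-sdk
    rules:
      - alert: HighTaskExecutionErrorRate
        expr: sum(rate(task_execute_error_total[5m])) by (taskType) > 0.1
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Task {{ $labels.taskType }} is failing more than 0.1 times/sec"

      - alert: SlowTaskExecutionP99
        expr: task_execute_time_seconds{quantile="0.99"} > 5
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "p99 execution time for {{ $labels.taskType }} exceeds 5s"
```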

## Troubleshooting

### Metrics file is empty
- Ensure the `MetricsCollector` is registered as an event listener
- Check that workers are actually polling and executing tasks
- Verify that the metrics directory has write permissions

### Stale metrics after restart
- Clean the metrics directory on startup (see the Configuration section)
- Prometheus's `multiprocess` mode requires cleanup between runs

### High memory usage
- Reduce the sliding window size (default: 1000 observations)
- Increase `update_interval` to write less frequently
- Limit the number of unique label combinations

### Missing metrics
- Verify that `metrics_settings` is passed to `TaskHandler`/`TaskHandlerAsyncIO`
- Check that your SDK version supports the metric you're looking for
- Ensure workers are properly registered and running