From 394ce55d1ea44f7be72716e12088e356b3c96b2b Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Mon, 16 Feb 2026 22:43:10 -0800 Subject: [PATCH 01/12] fix: prevent NATS connection flooding and stale job task fetching - Add connect_timeout=5, allow_reconnect=False to NATS connections to prevent leaked reconnection loops from blocking Django's event loop - Guard /tasks endpoint against terminal-status jobs (return empty tasks instead of attempting NATS reserve) - IncompleteJobFilter now excludes jobs by top-level status in addition to progress JSON stages - Add stale worker cleanup to integration test script Found during PSv2 integration testing where stale ADC workers with default DataLoader parallelism overwhelmed the single uvicorn worker thread by flooding /tasks with concurrent NATS reserve requests. Co-Authored-By: Claude --- ami/jobs/views.py | 11 +- ami/ml/orchestration/nats_queue.py | 7 +- scripts/psv2_integration_test.sh | 292 +++++++++++++++++++++++++++++ 3 files changed, 306 insertions(+), 4 deletions(-) create mode 100755 scripts/psv2_integration_test.sh diff --git a/ami/jobs/views.py b/ami/jobs/views.py index dd8da01b2..3cd2347c0 100644 --- a/ami/jobs/views.py +++ b/ami/jobs/views.py @@ -55,11 +55,12 @@ def filter_queryset(self, request, queryset, view): incomplete_only = url_boolean_param(request, "incomplete_only", default=False) # Filter to incomplete jobs if requested (checks "results" stage status) if incomplete_only: - # Create filters for each final state to exclude + # Exclude jobs with a terminal top-level status + queryset = queryset.exclude(status__in=JobState.final_states()) + + # Also exclude jobs where the "results" stage has a final state status final_states = JobState.final_states() exclude_conditions = Q() - - # Exclude jobs where the "results" stage has a final state status for state in final_states: # JSON path query to check if results stage status is in final states # @TODO move to a QuerySet method on Job model if/when this needs to be reused elsewhere @@ -233,6 +234,10 @@ def tasks(self, request, pk=None): if job.dispatch_mode != JobDispatchMode.ASYNC_API: raise ValidationError("Only async_api jobs have fetchable tasks") + # Don't fetch tasks from completed/failed/revoked jobs + if job.status in JobState.final_states(): + return Response({"tasks": []}) + # Validate that the job has a pipeline if not job.pipeline: raise ValidationError("This job does not have a pipeline configured") diff --git a/ami/ml/orchestration/nats_queue.py b/ami/ml/orchestration/nats_queue.py index fa7188627..b87c0159b 100644 --- a/ami/ml/orchestration/nats_queue.py +++ b/ami/ml/orchestration/nats_queue.py @@ -24,7 +24,12 @@ async def get_connection(nats_url: str): - nc = await nats.connect(nats_url) + nc = await nats.connect( + nats_url, + connect_timeout=5, + allow_reconnect=False, + max_reconnect_attempts=0, + ) js = nc.jetstream() return nc, js diff --git a/scripts/psv2_integration_test.sh b/scripts/psv2_integration_test.sh new file mode 100755 index 000000000..e622db999 --- /dev/null +++ b/scripts/psv2_integration_test.sh @@ -0,0 +1,292 @@ +#!/usr/bin/env bash +# +# PSv2 Integration Test +# +# Creates a collection + async job on the local Antenna stack, +# starts the ADC worker, streams logs from both sides, and +# monitors for errors. +# +# Usage: ./scripts/psv2_integration_test.sh [NUM_IMAGES] +# +# Requirements: +# - Antenna stack running (docker compose up -d) +# - conda env ami-py311 with ami-data-companion installed +# - .env in ami-data-companion with AMI_ANTENNA_API_AUTH_TOKEN set +# +set -euo pipefail + +# --- Config --- +NUM_IMAGES="${1:-20}" +PROJECT_ID=18 +PIPELINE_ID=3 # Quebec & Vermont moths +API_BASE="http://localhost:8000/api/v2" +TOKEN="644a0e28e2c09bda87c9ab3f6a002516f3dfb7ff" +ADC_DIR="/home/michael/Projects/AMI/ami-data-companion" +CONDA_BASE="$HOME/miniforge3" +LOG_DIR="/tmp/psv2-integration-test-$(date +%Y%m%d-%H%M%S)" + +mkdir -p "$LOG_DIR" + +ANTENNA_LOG="$LOG_DIR/antenna.log" +WORKER_LOG="$LOG_DIR/worker.log" +SUMMARY_LOG="$LOG_DIR/summary.log" + +# Cleanup function +cleanup() { + echo "" + echo "=== Cleaning up ===" + # Kill background processes + if [[ -n "${WORKER_PID:-}" ]] && kill -0 "$WORKER_PID" 2>/dev/null; then + echo "Stopping ADC worker (PID $WORKER_PID)..." + kill "$WORKER_PID" 2>/dev/null || true + wait "$WORKER_PID" 2>/dev/null || true + fi + if [[ -n "${ANTENNA_LOG_PID:-}" ]] && kill -0 "$ANTENNA_LOG_PID" 2>/dev/null; then + kill "$ANTENNA_LOG_PID" 2>/dev/null || true + fi + echo "Logs saved to: $LOG_DIR" + echo " - Antenna logs: $ANTENNA_LOG" + echo " - Worker logs: $WORKER_LOG" + echo " - Register logs: $LOG_DIR/register.log" + echo " - Summary: $SUMMARY_LOG" +} +trap cleanup EXIT + +auth_header="Authorization: Token $TOKEN" + +api_get() { + curl -sf -H "$auth_header" "$API_BASE$1" +} + +api_post() { + curl -sf -H "$auth_header" -H "Content-Type: application/json" -X POST "$API_BASE$1" -d "$2" +} + +api_post_empty() { + curl -sf -H "$auth_header" -X POST "$API_BASE$1" +} + +log() { + local msg="[$(date '+%H:%M:%S')] $1" + echo "$msg" + echo "$msg" >> "$SUMMARY_LOG" +} + +# --- Preflight checks --- +log "PSv2 Integration Test - $NUM_IMAGES images, project $PROJECT_ID, pipeline $PIPELINE_ID" +log "Log directory: $LOG_DIR" +echo "" + +echo "Checking Antenna API..." +if ! curl -sf -o /dev/null "$API_BASE/"; then + echo "ERROR: Antenna API not reachable at $API_BASE" + echo "Run: docker compose up -d" + exit 1 +fi +echo " Antenna API: OK" + +echo "Checking Docker services..." +DJANGO_STATUS=$(docker compose ps --format json django 2>/dev/null | jq -r '.State // .status' 2>/dev/null || echo "unknown") +CELERY_STATUS=$(docker compose ps --format json celeryworker 2>/dev/null | jq -r '.State // .status' 2>/dev/null || echo "unknown") +NATS_STATUS=$(docker compose ps --format json nats 2>/dev/null | jq -r '.State // .status' 2>/dev/null || echo "unknown") +echo " django: $DJANGO_STATUS" +echo " celeryworker: $CELERY_STATUS" +echo " nats: $NATS_STATUS" + +if [[ "$NATS_STATUS" != "running" ]]; then + echo "WARNING: NATS does not appear to be running. async_api dispatch requires NATS." +fi +echo "" + +# --- Step 1: Stream Antenna logs --- +log "Starting Antenna log stream..." +docker compose logs -f django celeryworker nats --since 0s > "$ANTENNA_LOG" 2>&1 & +ANTENNA_LOG_PID=$! + +# --- Step 2: Register pipelines with Antenna --- +log "Registering pipelines with Antenna for project $PROJECT_ID..." +REGISTER_LOG="$LOG_DIR/register.log" +( + source "$CONDA_BASE/etc/profile.d/conda.sh" + conda activate ami-py311 + cd "$ADC_DIR" + ami worker register "PSv2 integration test" --project "$PROJECT_ID" 2>&1 +) > "$REGISTER_LOG" 2>&1 +REGISTER_EXIT=$? + +if [[ "$REGISTER_EXIT" -eq 0 ]]; then + log "Pipeline registration: OK" +else + log "WARNING: Pipeline registration failed (exit $REGISTER_EXIT)" + tail -10 "$REGISTER_LOG" | tee -a "$SUMMARY_LOG" +fi +cat "$REGISTER_LOG" >> "$SUMMARY_LOG" + +# Verify registration via API +PIPELINES_AFTER=$(api_get "/ml/pipelines/?projects=$PROJECT_ID" | jq '.count') +log "Pipelines available for project $PROJECT_ID: $PIPELINES_AFTER" + +# --- Step 3: Start ADC worker --- +# Kill any stale workers from previous test runs to avoid task competition +if pgrep -f "ami worker" > /dev/null 2>&1; then + log "Killing stale ADC worker processes..." + pkill -f "ami worker" 2>/dev/null || true + sleep 2 +fi +log "Starting ADC worker..." +( + source "$CONDA_BASE/etc/profile.d/conda.sh" + conda activate ami-py311 + cd "$ADC_DIR" + AMI_NUM_WORKERS=0 ami worker --pipeline quebec_vermont_moths_2023 2>&1 +) > "$WORKER_LOG" 2>&1 & +WORKER_PID=$! +sleep 3 + +if ! kill -0 "$WORKER_PID" 2>/dev/null; then + log "ERROR: ADC worker failed to start. Check $WORKER_LOG" + tail -20 "$WORKER_LOG" + exit 1 +fi +log "ADC worker started (PID $WORKER_PID)" + +# --- Step 4: Create collection --- +log "Creating collection with $NUM_IMAGES random images..." +COLLECTION_RESP=$(api_post "/captures/collections/" "{ + \"name\": \"PSv2 integration test $(date '+%H:%M:%S')\", + \"project\": $PROJECT_ID, + \"method\": \"random\", + \"kwargs\": {\"size\": $NUM_IMAGES} +}") +COLLECTION_ID=$(echo "$COLLECTION_RESP" | jq -r '.id') +log "Created collection $COLLECTION_ID" + +# --- Step 5: Populate collection --- +log "Populating collection $COLLECTION_ID..." +POPULATE_RESP=$(api_post_empty "/captures/collections/$COLLECTION_ID/populate/") +POPULATE_JOB_ID=$(echo "$POPULATE_RESP" | jq -r '.job_id // .id // empty') +log "Populate job: $POPULATE_JOB_ID" + +# Wait for population to finish +for i in $(seq 1 30); do + sleep 2 + COLL_INFO=$(api_get "/captures/collections/$COLLECTION_ID/") + IMG_COUNT=$(echo "$COLL_INFO" | jq -r '.source_images_count') + if [[ "$IMG_COUNT" -gt 0 ]]; then + log "Collection populated: $IMG_COUNT images" + break + fi + if [[ $i -eq 30 ]]; then + log "ERROR: Collection not populated after 60s" + exit 1 + fi +done + +# --- Step 6: Create and start the ML job --- +log "Creating async ML job..." +JOB_RESP=$(api_post "/jobs/?start_now=true" "{ + \"name\": \"PSv2 integration test $(date '+%H:%M:%S')\", + \"project_id\": $PROJECT_ID, + \"pipeline_id\": $PIPELINE_ID, + \"source_image_collection_id\": $COLLECTION_ID, + \"delay\": 0, + \"shuffle\": true +}") +JOB_ID=$(echo "$JOB_RESP" | jq -r '.id') +JOB_STATUS=$(echo "$JOB_RESP" | jq -r '.status') +log "Created job $JOB_ID (status: $JOB_STATUS)" + +# --- Step 7: Poll job progress --- +log "Polling job progress..." +PREV_STATUS="" +PREV_PROGRESS="" +POLL_INTERVAL=3 +MAX_POLLS=200 # ~10 minutes + +for i in $(seq 1 $MAX_POLLS); do + sleep "$POLL_INTERVAL" + + JOB_INFO=$(api_get "/jobs/$JOB_ID/") + STATUS=$(echo "$JOB_INFO" | jq -r '.status') + DISPATCH=$(echo "$JOB_INFO" | jq -r '.dispatch_mode') + PROGRESS_PCT=$(echo "$JOB_INFO" | jq -r '.progress.summary.progress // 0') + STAGE_INFO=$(echo "$JOB_INFO" | jq -r ' + [.progress.stages[]? | + (.params // [] | map({(.key): .value}) | add // {}) as $p | + "\(.name): \(.progress // 0 | . * 100 | round)% (processed=\($p.processed // "?"), remaining=\($p.remaining // "?"), failed=\($p.failed // "?"))"] + | join(" | ")') + + # Only print when something changes + if [[ "$STATUS" != "$PREV_STATUS" || "$PROGRESS_PCT" != "$PREV_PROGRESS" ]]; then + log " [$i] status=$STATUS dispatch=$DISPATCH progress=$PROGRESS_PCT $STAGE_INFO" + PREV_STATUS="$STATUS" + PREV_PROGRESS="$PROGRESS_PCT" + fi + + if [[ "$STATUS" == "SUCCESS" || "$STATUS" == "FAILURE" || "$STATUS" == "REVOKED" ]]; then + break + fi + + # Check worker is still alive + if ! kill -0 "$WORKER_PID" 2>/dev/null; then + log "WARNING: ADC worker died during job processing!" + log "Last 20 lines of worker log:" + tail -20 "$WORKER_LOG" | tee -a "$SUMMARY_LOG" + fi +done + +# --- Step 8: Final job state --- +echo "" +JOB_FINAL=$(api_get "/jobs/$JOB_ID/") +FINAL_STATUS=$(echo "$JOB_FINAL" | jq -r '.status') +FINAL_DISPATCH=$(echo "$JOB_FINAL" | jq -r '.dispatch_mode') +STARTED_AT=$(echo "$JOB_FINAL" | jq -r '.started_at // "N/A"') +FINISHED_AT=$(echo "$JOB_FINAL" | jq -r '.finished_at // "N/A"') + +log "=== Job $JOB_ID Final State ===" +log " Status: $FINAL_STATUS" +log " Dispatch: $FINAL_DISPATCH" +log " Started: $STARTED_AT" +log " Finished: $FINISHED_AT" + +# Print stage details +echo "$JOB_FINAL" | jq -r '.progress.stages[]? | (.params // [] | map({(.key): .value}) | add // {}) as $p | " Stage \(.name): progress=\(.progress // 0 | . * 100 | round)% processed=\($p.processed // "?") remaining=\($p.remaining // "?") failed=\($p.failed // "?")"' | while read -r line; do + log "$line" +done + +# --- Step 9: Scan logs for errors --- +echo "" +log "=== Log Analysis ===" + +ANTENNA_ERRORS=$(grep -ciE 'ERROR|Traceback|CRITICAL' "$ANTENNA_LOG" 2>/dev/null || echo 0) +ANTENNA_WARNINGS=$(grep -ciE 'WARNING' "$ANTENNA_LOG" 2>/dev/null || echo 0) +WORKER_ERRORS=$(grep -ciE 'ERROR|Traceback|CRITICAL' "$WORKER_LOG" 2>/dev/null || echo 0) +WORKER_WARNINGS=$(grep -ciE 'WARNING' "$WORKER_LOG" 2>/dev/null || echo 0) + +log "Antenna logs: $ANTENNA_ERRORS errors, $ANTENNA_WARNINGS warnings" +log "Worker logs: $WORKER_ERRORS errors, $WORKER_WARNINGS warnings" + +if [[ "$ANTENNA_ERRORS" -gt 0 ]]; then + echo "" + log "--- Antenna Errors ---" + grep -iE 'ERROR|Traceback|CRITICAL' "$ANTENNA_LOG" | grep -v "NoSuchKey\|image dimensions" | head -30 | tee -a "$SUMMARY_LOG" +fi + +if [[ "$WORKER_ERRORS" -gt 0 ]]; then + echo "" + log "--- Worker Errors ---" + grep -iE 'ERROR|Traceback|CRITICAL' "$WORKER_LOG" | head -30 | tee -a "$SUMMARY_LOG" +fi + +# --- Step 10: Final verdict --- +echo "" +if [[ "$FINAL_STATUS" == "SUCCESS" && "$WORKER_ERRORS" -eq 0 ]]; then + log "RESULT: PASS - Job completed successfully with no worker errors" + exit 0 +elif [[ "$FINAL_STATUS" == "SUCCESS" ]]; then + log "RESULT: WARN - Job completed but worker had $WORKER_ERRORS errors (check logs)" + exit 0 +else + log "RESULT: FAIL - Job status: $FINAL_STATUS" + exit 1 +fi From 07cc18ac5737c9bf510094755e00e9e0185ad4a4 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Mon, 16 Feb 2026 22:43:18 -0800 Subject: [PATCH 02/12] docs: PSv2 integration test session notes and NATS flooding findings Session notes from 2026-02-16 integration test including root cause analysis of stale worker task competition and NATS connection issues. Findings doc tracks applied fixes and remaining TODOs with priorities. Co-Authored-By: Claude --- .../planning/nats-flooding-prevention.md | 125 ++++++++++++++++++ .../2026-02-16-psv2-integration-test.md | 125 ++++++++++++++++++ 2 files changed, 250 insertions(+) create mode 100644 docs/claude/planning/nats-flooding-prevention.md create mode 100644 docs/claude/sessions/2026-02-16-psv2-integration-test.md diff --git a/docs/claude/planning/nats-flooding-prevention.md b/docs/claude/planning/nats-flooding-prevention.md new file mode 100644 index 000000000..8aabae177 --- /dev/null +++ b/docs/claude/planning/nats-flooding-prevention.md @@ -0,0 +1,125 @@ +# NATS Flooding Prevention & Event Loop Blocking + +**Date:** 2026-02-16 +**Context:** PSv2 integration test exposed Django becoming unresponsive due to NATS connection issues + +## Problem + +When NATS becomes temporarily unreachable or connections are interrupted, Django's entire HTTP server hangs. This was observed during integration testing when: + +1. A stale job (1365) in STARTED status continuously attempted to reserve NATS tasks +2. The ADC worker spawned 16 DataLoader subprocesses, all hammering `/jobs/1365/tasks?batch=64` +3. Each `/tasks` request opens a new NATS connection and blocks a uvicorn worker thread +4. NATS connections timed out, triggering the nats.py client's reconnection loop +5. The reconnection loop consumed Django's shared event loop, blocking ALL HTTP requests +6. Even endpoints that don't use NATS (like `/ml/pipelines/`) became unreachable + +## Root Causes + +### 1. `nats.connect()` uses default reconnection behavior +**File:** `ami/ml/orchestration/nats_queue.py:26-29` +```python +async def get_connection(nats_url: str): + nc = await nats.connect(nats_url) # No connect_timeout, allow_reconnect defaults to True + js = nc.jetstream() + return nc, js +``` + +**Fix (APPLIED):** Added `connect_timeout=5, allow_reconnect=False, max_reconnect_attempts=0` to `nats.connect()`. Since we create a new connection per operation via context manager, we never need the client's built-in reconnection. + +### 2. `/tasks` endpoint doesn't check job status +**File:** `ami/jobs/views.py:232-255` +The endpoint checked `dispatch_mode` but not job status. A FAILURE/SUCCESS job still tried to fetch from NATS. + +**Fix (APPLIED):** Added guard: `if job.status in JobState.final_states(): return Response({"tasks": []})`. + +### 3. `incomplete_only` filter only checked progress JSON, not top-level status +**File:** `ami/jobs/views.py:50-69` (`IncompleteJobFilter`) +The filter only checked the "results" stage status in the progress JSON. Jobs manually set to FAILURE (without updating progress stages) slipped through. + +**Fix (APPLIED):** Added `queryset.exclude(status__in=JobState.final_states())` before the progress JSON check. + +### 4. No timeout on stream/consumer operations +**File:** `ami/ml/orchestration/nats_queue.py:77-124` +`_ensure_stream()` and `_ensure_consumer()` call JetStream API without explicit timeouts. If NATS is slow, these block indefinitely. + +**Status:** TODO + +### 5. Leaked NATS connections from interrupted requests +When an HTTP request is interrupted (client disconnect, test script killed), the `TaskQueueManager.__aexit__` may not run, leaving a NATS connection open. With `allow_reconnect=True` (the old default), that connection's reconnection callbacks consumed the event loop. + +**Status:** Mitigated by `allow_reconnect=False` fix. + +### 6. `async_to_sync()` blocks Django worker threads +**Files:** `ami/jobs/views.py:253`, `ami/ml/orchestration/jobs.py:119`, `ami/jobs/tasks.py:191` + +Every NATS operation wraps async code with `async_to_sync()`, which creates or reuses a thread-local event loop. If the async operation hangs (stuck NATS connection), the Django worker thread is permanently blocked. + +**Status:** TODO — wrap with `asyncio.wait_for()` inside the async function. + +### 7. Stale ADC workers compete for tasks (test infrastructure issue) +The test script starts an ADC worker but doesn't kill stale workers from previous runs. With 2 GPUs, `mp.spawn(nprocs=2)` forks 2 child processes. If a previous worker is still running, its DataLoader subprocesses race with the new worker for NATS messages. In the 2026-02-16 test, 147 `/tasks` requests were logged — the stale worker consumed all 20 NATS messages, leaving 0 for the new worker. + +**Fix:** Add `pkill -f "ami worker"` cleanup before starting the worker in the test script. + +## Additional TODOs from Integration Testing + +### 7. `/tasks/` endpoint should support multiple pipelines +The endpoint should allow workers to pass in multiple pipeline slugs, or return all available tasks for projects the token has access to (no pipeline filter = all). + +**Status:** TODO + +### 8. ADC worker should use trailing slashes +The ADC worker requests `/api/v2/jobs/1365/tasks?batch=64` without trailing slash, causing 301 redirects. Each redirect doubles the request overhead. + +**Status:** TODO (ADC-side fix in `ami-data-companion`) + +### 9. `dispatch_mode` should be set on job init, not `run()` +Currently `dispatch_mode` is set when the job starts running. It should be set at job creation time so the API can filter by it before the job runs. + +**Status:** TODO + +### 10. Processing service online status (GitHub #1122) +Show online status of registered processing services. +**See:** https://github.com/RolnickLab/antenna/issues/1122 + +### 11. Show which workers pick up a job/task (GitHub #1112) +At minimum, log which worker processes each task. +**See:** https://github.com/RolnickLab/antenna/issues/1112 + +## Applied Changes Summary + +| File | Change | Status | +|------|--------|--------| +| `ami/ml/orchestration/nats_queue.py:26-32` | `connect_timeout=5, allow_reconnect=False` | APPLIED | +| `ami/jobs/views.py:237-238` | Guard `/tasks` for terminal status jobs | APPLIED | +| `ami/jobs/views.py:59` | `incomplete_only` also checks top-level status | APPLIED | + +## Remaining TODOs + +| Priority | Issue | Impact | +|----------|-------|--------| +| P1 | Timeout on JetStream stream/consumer ops | Prevents indefinite blocking | +| P1 | `async_to_sync()` timeout wrapper | Prevents thread exhaustion | +| P2 | `/tasks/` multi-pipeline support | Worker efficiency | +| P2 | ADC trailing slashes | Removes 301 overhead | +| P2 | `dispatch_mode` on job init | Correct filtering at creation time | +| P3 | Stale job auto-cleanup (Celery Beat) | Prevents future flooding | +| P3 | Circuit breaker for NATS failures | Graceful degradation | +| P3 | #1122: Processing service online status | UX | +| P3 | #1112: Worker tracking in logs | Observability | + +## Related Files + +| File | Lines | What | +|------|-------|------| +| `ami/ml/orchestration/nats_queue.py` | 26-32 | `get_connection()` — FIXED with timeouts | +| `ami/ml/orchestration/nats_queue.py` | 77-124 | Stream/consumer ops — needs timeouts | +| `ami/ml/orchestration/nats_queue.py` | 159-214 | `reserve_task()` — has timeout but connection may block | +| `ami/jobs/views.py` | 50-69 | `IncompleteJobFilter` — FIXED | +| `ami/jobs/views.py` | 237-238 | `/tasks` status guard — FIXED | +| `ami/jobs/views.py` | 243-256 | `/tasks/` endpoint — `async_to_sync()` blocks thread | +| `ami/ml/orchestration/jobs.py` | 119 | `queue_images_to_nats()` — `async_to_sync()` blocks thread | +| `ami/jobs/tasks.py` | 184-199 | `_ack_task_via_nats()` — per-ACK connection (expensive) | +| `docs/claude/debugging/nats-triage.md` | Full | Previous NATS debugging findings | +| `docs/claude/nats-todo.md` | Full | NATS infrastructure improvements tracker | diff --git a/docs/claude/sessions/2026-02-16-psv2-integration-test.md b/docs/claude/sessions/2026-02-16-psv2-integration-test.md new file mode 100644 index 000000000..b13802692 --- /dev/null +++ b/docs/claude/sessions/2026-02-16-psv2-integration-test.md @@ -0,0 +1,125 @@ +# PSv2 Integration Test Session - 2026-02-16 + +## Summary + +Attempted end-to-end PSv2 integration test on main. Discovered several blocking issues preventing completion. + +## Test Setup + +- Antenna: main branch, all services via docker compose +- ADC: main branch, conda env ami-py311, freshly installed +- Script: `scripts/psv2_integration_test.sh 20` + +## What Worked + +- Pipeline registration: OK (10 pipelines, all models loaded) +- Collection creation & population: OK +- Job creation with `start_now=true`: OK +- Image queuing to NATS: OK (20 images published) +- ADC worker task fetching from NATS: OK (tasks retrieved) + +## What Failed + +### 1. ADC worker reports "Done, detections: 0" — stale worker stole tasks +- Root cause: the stale ADC worker from test run #1 was never killed +- It found job 1377 and consumed all 20 NATS messages before the new worker +- Antenna logs show 147 `/tasks?batch=64` requests, ~35 distinct source ports +- The new worker (AMI_NUM_WORKERS=0) found an empty NATS queue → "No more tasks" +- The 39-second delay was caused by queuing behind stale worker requests on single uvicorn thread +- **Not an ADC code bug** — the stale worker race condition caused 0 results +- **Next test must kill ALL ADC worker processes before starting** +- **TODO (test script)**: Add `pkill -f "ami worker"` cleanup step before starting worker + +### 2. Django overloaded by concurrent /tasks requests +- uvicorn runs with 1 worker in dev mode (no `--workers` flag) +- ADC spawns 16 DataLoader subprocesses by default, all hit `/tasks` simultaneously +- Each request blocks on NATS connection + operation (up to 5s with our fix) +- 16 × 5s = 80s serialized wait → all requests timeout +- **Fix applied**: Set `AMI_NUM_WORKERS=0` in test script (1 subprocess) +- **TODO**: Add `--workers 4` to uvicorn dev config, or connection pooling + +### 3. Stale NATS connections block Django event loop +- `nats.connect()` defaulted to `allow_reconnect=True` with long timeouts +- Leaked connections from interrupted requests spawned reconnection loops +- These loops consumed the shared async event loop, blocking ALL requests +- **Fix applied**: `connect_timeout=5, allow_reconnect=False` in `get_connection()` + +### 4. Stale jobs not filtered by incomplete_only +- `IncompleteJobFilter` only checked progress JSON stages, not top-level `status` +- Jobs manually set to FAILURE without progress update slipped through +- ADC worker picked up stale jobs and hammered `/tasks` endlessly +- **Fix applied**: Also exclude `status__in=JobState.final_states()` + +### 5. /tasks endpoint doesn't guard against terminal jobs +- A FAILURE/SUCCESS/REVOKED job still tried to fetch from NATS +- **Fix applied**: Return empty `{"tasks": []}` for terminal status jobs + +### 6. RabbitMQ stale connections +- After days of uptime, Django's AMQP connection to RabbitMQ goes stale +- `ConnectionResetError: [Errno 104]` when enqueuing Celery tasks +- **Fix**: Restart Django/Celery or full `docker compose down && up` + +## Applied Code Changes + +### `ami/ml/orchestration/nats_queue.py:26-32` +```python +async def get_connection(nats_url: str): + nc = await nats.connect( + nats_url, + connect_timeout=5, + allow_reconnect=False, + max_reconnect_attempts=0, + ) +``` + +### `ami/jobs/views.py:59` +```python +# IncompleteJobFilter - also exclude by top-level status +queryset = queryset.exclude(status__in=JobState.final_states()) +``` + +### `ami/jobs/views.py:237-238` +```python +# /tasks endpoint - guard against terminal jobs +if job.status in JobState.final_states(): + return Response({"tasks": []}) +``` + +### `scripts/psv2_integration_test.sh:135` +```bash +AMI_NUM_WORKERS=0 ami worker --pipeline quebec_vermont_moths_2023 2>&1 +``` + +## Remaining TODOs + +Full list in `docs/claude/planning/nats-flooding-prevention.md`. + +Key items: +1. **Re-run integration test** — kill all stale ADC workers first, then test with clean state +2. **Test script: kill stale workers** — add `pkill -f "ami worker"` before starting +3. **NATS connection pooling** — PR #1130 attempted this but had issues +4. **uvicorn workers** — add `--workers 4` to dev config +5. **JetStream operation timeouts** — `_ensure_stream()`, `_ensure_consumer()` have no timeouts +6. **ADC trailing slashes** — causes 301 redirects on every request +7. **`dispatch_mode` on job init** — should be set at creation, not run() +8. **Multi-pipeline /tasks endpoint** — let workers request tasks for multiple pipelines +9. **GitHub #1122** — processing service online status +10. **GitHub #1112** — worker tracking in logs + +## Next Session Prompt + +``` +Continue PSv2 integration testing. Key files changed (uncommitted): +- ami/ml/orchestration/nats_queue.py:26-32 (NATS connection safety) +- ami/jobs/views.py:59,237-238 (stale job filtering + /tasks guard) +- scripts/psv2_integration_test.sh:135 (reduced worker count) + +The 0-detections issue was caused by a stale ADC worker from test run #1 +consuming all NATS messages. Not a code bug. Kill ALL "ami worker" processes +before re-running. + +Add pkill cleanup to test script, commit fixes, re-run test. + +Full findings: docs/claude/planning/nats-flooding-prevention.md +Session notes: docs/claude/sessions/2026-02-16-psv2-integration-test.md +``` From ddd560d01a2936f75edafa84ef4d66c29bec781b Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Mon, 16 Feb 2026 23:04:30 -0800 Subject: [PATCH 03/12] docs: update session notes with successful test run #3 PSv2 integration test passed end-to-end (job 1380, 20/20 images). Identified ack_wait=300s as cause of ~5min idle time when GPU processes race for NATS tasks. Co-Authored-By: Claude --- .../2026-02-16-psv2-integration-test.md | 34 ++++++++++++++----- 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/docs/claude/sessions/2026-02-16-psv2-integration-test.md b/docs/claude/sessions/2026-02-16-psv2-integration-test.md index b13802692..79f7da978 100644 --- a/docs/claude/sessions/2026-02-16-psv2-integration-test.md +++ b/docs/claude/sessions/2026-02-16-psv2-integration-test.md @@ -106,19 +106,37 @@ Key items: 9. **GitHub #1122** — processing service online status 10. **GitHub #1112** — worker tracking in logs +## Test Run #3 — SUCCESS (22:51 - 23:03) + +After restarting Django/Celery (stale AMQP connection), the test passed end-to-end: +- Job 1380: 20/20 images processed, status=SUCCESS +- Total time: ~11 minutes (including ~5 min NATS ack_wait delay) +- Detection + classification + result posting all worked + +**Performance issue found:** Both GPU processes race for tasks from the same NATS consumer. One gets all 20 messages, the other gets nothing. The unacked messages wait for `ack_wait=300s` to expire before redelivery. This added ~5 minutes of idle time. + +**Fix needed:** Either: +1. Reduce `ack_wait` from 300s to something smaller (30-60s) for dev +2. Ensure only one GPU process fetches tasks per batch (ADC-side coordination) +3. Use NATS NAK to immediately release unfetchable tasks + +**ADC trailing slashes:** Fixed on `fix/trailing-slashes` branch in ami-data-companion. + ## Next Session Prompt ``` -Continue PSv2 integration testing. Key files changed (uncommitted): -- ami/ml/orchestration/nats_queue.py:26-32 (NATS connection safety) -- ami/jobs/views.py:59,237-238 (stale job filtering + /tasks guard) -- scripts/psv2_integration_test.sh:135 (reduced worker count) +PSv2 integration test PASSED on main (job 1380, 20 images, SUCCESS). + +Committed fixes on branch fix/nats-connection-safety (PR #1135): +- NATS connection safety (connect_timeout, no reconnect) +- Stale job filtering + /tasks terminal status guard +- Test script stale worker cleanup -The 0-detections issue was caused by a stale ADC worker from test run #1 -consuming all NATS messages. Not a code bug. Kill ALL "ami worker" processes -before re-running. +Key remaining issue: NATS ack_wait=300s causes ~5min idle time when +GPU processes race for tasks. Consider reducing ack_wait or adding +NAK for unfetchable tasks. -Add pkill cleanup to test script, commit fixes, re-run test. +ADC trailing slashes fixed on fix/trailing-slashes branch (ami-data-companion). Full findings: docs/claude/planning/nats-flooding-prevention.md Session notes: docs/claude/sessions/2026-02-16-psv2-integration-test.md From 367343c1eea10b1b7e2ca8bc9c3a07fbdd8a4089 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Mon, 16 Feb 2026 23:17:52 -0800 Subject: [PATCH 04/12] fix: batch NATS task fetch to prevent HTTP timeouts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace N×1 reserve_task() calls with single reserve_tasks() batch fetch. The previous implementation created a new pull subscription per message (320 NATS round trips for batch=64), causing the /tasks endpoint to exceed HTTP client timeouts. The new approach uses one psub.fetch() call for the entire batch. Co-Authored-By: Claude --- ami/jobs/views.py | 7 +-- ami/ml/orchestration/nats_queue.py | 54 +++++++--------- ami/ml/orchestration/tests/test_nats_queue.py | 63 +++++++++++++------ 3 files changed, 68 insertions(+), 56 deletions(-) diff --git a/ami/jobs/views.py b/ami/jobs/views.py index 3cd2347c0..e29b367b5 100644 --- a/ami/jobs/views.py +++ b/ami/jobs/views.py @@ -246,13 +246,8 @@ def tasks(self, request, pk=None): from ami.ml.orchestration.nats_queue import TaskQueueManager async def get_tasks(): - tasks = [] async with TaskQueueManager() as manager: - for _ in range(batch): - task = await manager.reserve_task(job.pk, timeout=0.1) - if task: - tasks.append(task.dict()) - return tasks + return [task.dict() for task in await manager.reserve_tasks(job.pk, count=batch)] # Use async_to_sync to properly handle the async call tasks = async_to_sync(get_tasks)() diff --git a/ami/ml/orchestration/nats_queue.py b/ami/ml/orchestration/nats_queue.py index b87c0159b..96852bf5c 100644 --- a/ami/ml/orchestration/nats_queue.py +++ b/ami/ml/orchestration/nats_queue.py @@ -44,8 +44,8 @@ class TaskQueueManager: Use as an async context manager: async with TaskQueueManager() as manager: await manager.publish_task('job123', {'data': 'value'}) - task = await manager.reserve_task('job123') - await manager.acknowledge_task(task['reply_subject']) + tasks = await manager.reserve_tasks('job123', count=64) + await manager.acknowledge_task(tasks[0].reply_subject) """ def __init__(self, nats_url: str | None = None): @@ -161,62 +161,52 @@ async def publish_task(self, job_id: int, data: PipelineProcessingTask) -> bool: logger.error(f"Failed to publish task to stream for job '{job_id}': {e}") return False - async def reserve_task(self, job_id: int, timeout: float | None = None) -> PipelineProcessingTask | None: + async def reserve_tasks(self, job_id: int, count: int, timeout: float = 5) -> list[PipelineProcessingTask]: """ - Reserve a task from the specified stream. + Reserve up to `count` tasks from the specified stream in a single NATS fetch. Args: job_id: The job ID (integer primary key) to pull tasks from - timeout: Timeout in seconds for reservation (default: 5 seconds) + count: Maximum number of tasks to reserve + timeout: Timeout in seconds waiting for messages (default: 5 seconds) Returns: - PipelineProcessingTask with reply_subject set for acknowledgment, or None if no task available + List of PipelineProcessingTask objects with reply_subject set for acknowledgment. + May return fewer than `count` if the queue has fewer messages available. """ if self.js is None: raise RuntimeError("Connection is not open. Use TaskQueueManager as an async context manager.") - if timeout is None: - timeout = 5 - try: - # Ensure stream and consumer exist await self._ensure_stream(job_id) await self._ensure_consumer(job_id) consumer_name = self._get_consumer_name(job_id) subject = self._get_subject(job_id) - # Create ephemeral subscription for this pull psub = await self.js.pull_subscribe(subject, consumer_name) try: - # Fetch a single message - msgs = await psub.fetch(1, timeout=timeout) - - if msgs: - msg = msgs[0] - task_data = json.loads(msg.data.decode()) - metadata = msg.metadata - - # Parse the task data into PipelineProcessingTask - task = PipelineProcessingTask(**task_data) - # Set the reply_subject for acknowledgment - task.reply_subject = msg.reply - - logger.debug(f"Reserved task from stream for job '{job_id}', sequence {metadata.sequence.stream}") - return task - + msgs = await psub.fetch(count, timeout=timeout) except nats.errors.TimeoutError: - # No messages available logger.debug(f"No tasks available in stream for job '{job_id}'") - return None + return [] finally: - # Always unsubscribe await psub.unsubscribe() + tasks = [] + for msg in msgs: + task_data = json.loads(msg.data.decode()) + task = PipelineProcessingTask(**task_data) + task.reply_subject = msg.reply + tasks.append(task) + + logger.info(f"Reserved {len(tasks)} tasks from stream for job '{job_id}'") + return tasks + except Exception as e: - logger.error(f"Failed to reserve task from stream for job '{job_id}': {e}") - return None + logger.error(f"Failed to reserve tasks from stream for job '{job_id}': {e}") + return [] async def acknowledge_task(self, reply_subject: str) -> bool: """ diff --git a/ami/ml/orchestration/tests/test_nats_queue.py b/ami/ml/orchestration/tests/test_nats_queue.py index 0cd2c3bef..a7bd91b68 100644 --- a/ami/ml/orchestration/tests/test_nats_queue.py +++ b/ami/ml/orchestration/tests/test_nats_queue.py @@ -62,47 +62,74 @@ async def test_publish_task_creates_stream_and_consumer(self): self.assertIn("job_456", str(js.add_stream.call_args)) js.add_consumer.assert_called_once() - async def test_reserve_task_success(self): - """Test successful task reservation.""" + async def test_reserve_tasks_success(self): + """Test successful batch task reservation.""" nc, js = self._create_mock_nats_connection() sample_task = self._create_sample_task() - # Mock message with task data - mock_msg = MagicMock() - mock_msg.data = sample_task.json().encode() - mock_msg.reply = "reply.subject.123" - mock_msg.metadata = MagicMock(sequence=MagicMock(stream=1)) + # Mock messages with task data + mock_msg1 = MagicMock() + mock_msg1.data = sample_task.json().encode() + mock_msg1.reply = "reply.subject.1" + + mock_msg2 = MagicMock() + mock_msg2.data = sample_task.json().encode() + mock_msg2.reply = "reply.subject.2" mock_psub = MagicMock() - mock_psub.fetch = AsyncMock(return_value=[mock_msg]) + mock_psub.fetch = AsyncMock(return_value=[mock_msg1, mock_msg2]) mock_psub.unsubscribe = AsyncMock() js.pull_subscribe = AsyncMock(return_value=mock_psub) with patch("ami.ml.orchestration.nats_queue.get_connection", AsyncMock(return_value=(nc, js))): async with TaskQueueManager() as manager: - task = await manager.reserve_task(123) + tasks = await manager.reserve_tasks(123, count=5) - self.assertIsNotNone(task) - self.assertEqual(task.id, sample_task.id) - self.assertEqual(task.reply_subject, "reply.subject.123") + self.assertEqual(len(tasks), 2) + self.assertEqual(tasks[0].id, sample_task.id) + self.assertEqual(tasks[0].reply_subject, "reply.subject.1") + self.assertEqual(tasks[1].reply_subject, "reply.subject.2") + mock_psub.fetch.assert_called_once_with(5, timeout=5) mock_psub.unsubscribe.assert_called_once() - async def test_reserve_task_no_messages(self): - """Test reserve_task when no messages are available.""" + async def test_reserve_tasks_no_messages(self): + """Test reserve_tasks when no messages are available (timeout).""" nc, js = self._create_mock_nats_connection() + import nats.errors mock_psub = MagicMock() - mock_psub.fetch = AsyncMock(return_value=[]) + mock_psub.fetch = AsyncMock(side_effect=nats.errors.TimeoutError) mock_psub.unsubscribe = AsyncMock() js.pull_subscribe = AsyncMock(return_value=mock_psub) with patch("ami.ml.orchestration.nats_queue.get_connection", AsyncMock(return_value=(nc, js))): async with TaskQueueManager() as manager: - task = await manager.reserve_task(123) + tasks = await manager.reserve_tasks(123, count=5) - self.assertIsNone(task) + self.assertEqual(tasks, []) mock_psub.unsubscribe.assert_called_once() + async def test_reserve_tasks_single(self): + """Test reserving a single task.""" + nc, js = self._create_mock_nats_connection() + sample_task = self._create_sample_task() + + mock_msg = MagicMock() + mock_msg.data = sample_task.json().encode() + mock_msg.reply = "reply.subject.123" + + mock_psub = MagicMock() + mock_psub.fetch = AsyncMock(return_value=[mock_msg]) + mock_psub.unsubscribe = AsyncMock() + js.pull_subscribe = AsyncMock(return_value=mock_psub) + + with patch("ami.ml.orchestration.nats_queue.get_connection", AsyncMock(return_value=(nc, js))): + async with TaskQueueManager() as manager: + tasks = await manager.reserve_tasks(123, count=1) + + self.assertEqual(len(tasks), 1) + self.assertEqual(tasks[0].reply_subject, "reply.subject.123") + async def test_acknowledge_task_success(self): """Test successful task acknowledgment.""" nc, js = self._create_mock_nats_connection() @@ -144,7 +171,7 @@ async def test_operations_without_connection_raise_error(self): await manager.publish_task(123, sample_task) with self.assertRaisesRegex(RuntimeError, "Connection is not open"): - await manager.reserve_task(123) + await manager.reserve_tasks(123, count=1) with self.assertRaisesRegex(RuntimeError, "Connection is not open"): await manager.delete_stream(123) From 17d4c6b0ef91037471a1bbc9dcf1da9d553b720e Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Mon, 16 Feb 2026 23:20:26 -0800 Subject: [PATCH 05/12] docs: add next session prompt --- .../prompts/next-session-nats-batch-test.md | 76 +++++++++++++++++++ 1 file changed, 76 insertions(+) create mode 100644 docs/claude/prompts/next-session-nats-batch-test.md diff --git a/docs/claude/prompts/next-session-nats-batch-test.md b/docs/claude/prompts/next-session-nats-batch-test.md new file mode 100644 index 000000000..398c288d5 --- /dev/null +++ b/docs/claude/prompts/next-session-nats-batch-test.md @@ -0,0 +1,76 @@ +# Next Session: Integration Test with Batch NATS Fetch + +## Context + +PR #1135 (`fix/nats-connection-safety`) now has 4 commits: + +1. `394ce55d` — NATS connection safety (connect_timeout, no reconnect) + stale job filtering + /tasks terminal guard + test script cleanup +2. `07cc18ac` — Session notes +3. `ddd560d0` — Updated session notes with test run #3 results +4. `367343c1` — **Batch NATS task fetch** (`reserve_tasks()` replaces N×1 `reserve_task()`) + +Previous integration test (run #3) passed end-to-end (job 1380, 20/20 images, SUCCESS) but took ~11 minutes due to a 5-minute NATS `ack_wait` delay. Root cause: the `/tasks` endpoint made 320 NATS round trips for `batch=64`, exceeding the ADC's HTTP timeout. Tasks were consumed at the NATS level but lost at the HTTP level, requiring `ack_wait=300s` redelivery. + +The batch fetch fix should eliminate this delay entirely. + +## Task + +Run integration test #4 to verify: +1. The batch fetch eliminates the ~5-minute idle time +2. End-to-end PSv2 flow still works (detection + classification + result posting) +3. Total time should be ~6 minutes or less (down from ~11) + +## Steps + +1. Make sure the `fix/nats-connection-safety` branch is checked out +2. Restart services to pick up code changes: + ```bash + docker compose restart django celeryworker celerybeat + ``` +3. Kill any stale ADC workers: + ```bash + pkill -f "ami worker" || true + ``` +4. Run the integration test: + ```bash + cd ~/Projects/AMI/ami-data-companion + conda activate ami-py311 + bash ~/Projects/AMI/antenna/scripts/psv2_integration_test.sh 20 + ``` +5. Monitor results — look for: + - Tasks fetched quickly (no 5-min delay) + - 20/20 images processed + - Job status = SUCCESS + +## ADC Setup Note + +The ADC should be on the `fix/trailing-slashes` branch (committed `eac7481`) to avoid 301 redirects on `/tasks` and `/jobs` endpoints. + +```bash +cd ~/Projects/AMI/ami-data-companion +git checkout fix/trailing-slashes +conda activate ami-py311 +pip install -e . +``` + +## Remaining TODOs (from PR #1135) + +After the test passes, these items remain for future work: + +- [ ] NATS connection pooling (singleton per process instead of per-request) +- [ ] uvicorn `--workers 4` for dev config +- [ ] JetStream operation timeouts on `_ensure_stream()` / `_ensure_consumer()` +- [ ] `dispatch_mode` set at job creation instead of `run()` +- [ ] Multi-pipeline `/tasks` endpoint (workers request tasks for any pipeline they support) +- [ ] ADC pipeline params from `/info` endpoint +- [ ] GitHub #1122 — processing service online status +- [ ] GitHub #1112 — worker tracking in logs + +## Key Files + +- `ami/ml/orchestration/nats_queue.py` — `TaskQueueManager.reserve_tasks()` (batch fetch) +- `ami/jobs/views.py:217-255` — `/tasks` endpoint +- `ami/ml/orchestration/tests/test_nats_queue.py` — 9 unit tests (all passing) +- `scripts/psv2_integration_test.sh` — integration test script +- `docs/claude/sessions/2026-02-16-psv2-integration-test.md` — previous session findings +- `docs/claude/planning/nats-flooding-prevention.md` — full findings doc From 88656c488f5f94bde216961d121ed68b4de38fed Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Fri, 20 Feb 2026 16:55:55 -0800 Subject: [PATCH 06/12] feat: add pipeline__slug__in filter for multi-pipeline job queries Workers that handle multiple pipelines can now fetch jobs for all of them in a single request: ?pipeline__slug__in=slug1,slug2 Co-Authored-By: Claude --- ami/jobs/tests.py | 30 ++++++++++++++++++++++++++++++ ami/jobs/views.py | 1 + 2 files changed, 31 insertions(+) diff --git a/ami/jobs/tests.py b/ami/jobs/tests.py index 7902faeb1..e94b75b9d 100644 --- a/ami/jobs/tests.py +++ b/ami/jobs/tests.py @@ -384,6 +384,36 @@ def test_filter_by_pipeline_slug(self): self.assertEqual(data["count"], 1) self.assertEqual(data["results"][0]["id"], job_with_pipeline.pk) + def test_filter_by_pipeline_slug_in(self): + """Test filtering jobs by pipeline__slug__in (multiple slugs).""" + pipeline_a = self._create_pipeline("Pipeline A", "pipeline-a") + pipeline_b = Pipeline.objects.create(name="Pipeline B", slug="pipeline-b", description="B") + pipeline_b.projects.add(self.project) + pipeline_c = Pipeline.objects.create(name="Pipeline C", slug="pipeline-c", description="C") + pipeline_c.projects.add(self.project) + + job_a = self._create_ml_job("Job A", pipeline_a) + job_b = self._create_ml_job("Job B", pipeline_b) + job_c = self._create_ml_job("Job C", pipeline_c) + + self.client.force_authenticate(user=self.user) + + # Filter for two of the three pipelines + jobs_list_url = reverse_with_params( + "api:job-list", + params={"project_id": self.project.pk, "pipeline__slug__in": "pipeline-a,pipeline-b"}, + ) + resp = self.client.get(jobs_list_url) + + self.assertEqual(resp.status_code, 200) + data = resp.json() + returned_ids = {job["id"] for job in data["results"]} + self.assertIn(job_a.pk, returned_ids) + self.assertIn(job_b.pk, returned_ids) + self.assertNotIn(job_c.pk, returned_ids) + # Original setUp job (no pipeline) should also be excluded + self.assertNotIn(self.job.pk, returned_ids) + def test_search_jobs(self): """Test searching jobs by name and pipeline name.""" pipeline = self._create_pipeline("SearchablePipeline", "searchable-pipeline") diff --git a/ami/jobs/views.py b/ami/jobs/views.py index e29b367b5..a1c8ee09e 100644 --- a/ami/jobs/views.py +++ b/ami/jobs/views.py @@ -32,6 +32,7 @@ class JobFilterSet(filters.FilterSet): """Custom filterset to enable pipeline name filtering.""" pipeline__slug = filters.CharFilter(field_name="pipeline__slug", lookup_expr="exact") + pipeline__slug__in = filters.BaseInFilter(field_name="pipeline__slug", lookup_expr="in") class Meta: model = Job From bd9cb648240ac26743daf5f943d71a8253cef2ff Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Fri, 20 Feb 2026 16:59:36 -0800 Subject: [PATCH 07/12] chore: remove local-only docs and scripts from branch These files are session notes, planning docs, and test scripts that should stay local rather than be part of the PR. Co-Authored-By: Claude --- .../planning/nats-flooding-prevention.md | 125 -------- .../prompts/next-session-nats-batch-test.md | 76 ----- .../2026-02-16-psv2-integration-test.md | 143 --------- scripts/psv2_integration_test.sh | 292 ------------------ 4 files changed, 636 deletions(-) delete mode 100644 docs/claude/planning/nats-flooding-prevention.md delete mode 100644 docs/claude/prompts/next-session-nats-batch-test.md delete mode 100644 docs/claude/sessions/2026-02-16-psv2-integration-test.md delete mode 100755 scripts/psv2_integration_test.sh diff --git a/docs/claude/planning/nats-flooding-prevention.md b/docs/claude/planning/nats-flooding-prevention.md deleted file mode 100644 index 8aabae177..000000000 --- a/docs/claude/planning/nats-flooding-prevention.md +++ /dev/null @@ -1,125 +0,0 @@ -# NATS Flooding Prevention & Event Loop Blocking - -**Date:** 2026-02-16 -**Context:** PSv2 integration test exposed Django becoming unresponsive due to NATS connection issues - -## Problem - -When NATS becomes temporarily unreachable or connections are interrupted, Django's entire HTTP server hangs. This was observed during integration testing when: - -1. A stale job (1365) in STARTED status continuously attempted to reserve NATS tasks -2. The ADC worker spawned 16 DataLoader subprocesses, all hammering `/jobs/1365/tasks?batch=64` -3. Each `/tasks` request opens a new NATS connection and blocks a uvicorn worker thread -4. NATS connections timed out, triggering the nats.py client's reconnection loop -5. The reconnection loop consumed Django's shared event loop, blocking ALL HTTP requests -6. Even endpoints that don't use NATS (like `/ml/pipelines/`) became unreachable - -## Root Causes - -### 1. `nats.connect()` uses default reconnection behavior -**File:** `ami/ml/orchestration/nats_queue.py:26-29` -```python -async def get_connection(nats_url: str): - nc = await nats.connect(nats_url) # No connect_timeout, allow_reconnect defaults to True - js = nc.jetstream() - return nc, js -``` - -**Fix (APPLIED):** Added `connect_timeout=5, allow_reconnect=False, max_reconnect_attempts=0` to `nats.connect()`. Since we create a new connection per operation via context manager, we never need the client's built-in reconnection. - -### 2. `/tasks` endpoint doesn't check job status -**File:** `ami/jobs/views.py:232-255` -The endpoint checked `dispatch_mode` but not job status. A FAILURE/SUCCESS job still tried to fetch from NATS. - -**Fix (APPLIED):** Added guard: `if job.status in JobState.final_states(): return Response({"tasks": []})`. - -### 3. `incomplete_only` filter only checked progress JSON, not top-level status -**File:** `ami/jobs/views.py:50-69` (`IncompleteJobFilter`) -The filter only checked the "results" stage status in the progress JSON. Jobs manually set to FAILURE (without updating progress stages) slipped through. - -**Fix (APPLIED):** Added `queryset.exclude(status__in=JobState.final_states())` before the progress JSON check. - -### 4. No timeout on stream/consumer operations -**File:** `ami/ml/orchestration/nats_queue.py:77-124` -`_ensure_stream()` and `_ensure_consumer()` call JetStream API without explicit timeouts. If NATS is slow, these block indefinitely. - -**Status:** TODO - -### 5. Leaked NATS connections from interrupted requests -When an HTTP request is interrupted (client disconnect, test script killed), the `TaskQueueManager.__aexit__` may not run, leaving a NATS connection open. With `allow_reconnect=True` (the old default), that connection's reconnection callbacks consumed the event loop. - -**Status:** Mitigated by `allow_reconnect=False` fix. - -### 6. `async_to_sync()` blocks Django worker threads -**Files:** `ami/jobs/views.py:253`, `ami/ml/orchestration/jobs.py:119`, `ami/jobs/tasks.py:191` - -Every NATS operation wraps async code with `async_to_sync()`, which creates or reuses a thread-local event loop. If the async operation hangs (stuck NATS connection), the Django worker thread is permanently blocked. - -**Status:** TODO — wrap with `asyncio.wait_for()` inside the async function. - -### 7. Stale ADC workers compete for tasks (test infrastructure issue) -The test script starts an ADC worker but doesn't kill stale workers from previous runs. With 2 GPUs, `mp.spawn(nprocs=2)` forks 2 child processes. If a previous worker is still running, its DataLoader subprocesses race with the new worker for NATS messages. In the 2026-02-16 test, 147 `/tasks` requests were logged — the stale worker consumed all 20 NATS messages, leaving 0 for the new worker. - -**Fix:** Add `pkill -f "ami worker"` cleanup before starting the worker in the test script. - -## Additional TODOs from Integration Testing - -### 7. `/tasks/` endpoint should support multiple pipelines -The endpoint should allow workers to pass in multiple pipeline slugs, or return all available tasks for projects the token has access to (no pipeline filter = all). - -**Status:** TODO - -### 8. ADC worker should use trailing slashes -The ADC worker requests `/api/v2/jobs/1365/tasks?batch=64` without trailing slash, causing 301 redirects. Each redirect doubles the request overhead. - -**Status:** TODO (ADC-side fix in `ami-data-companion`) - -### 9. `dispatch_mode` should be set on job init, not `run()` -Currently `dispatch_mode` is set when the job starts running. It should be set at job creation time so the API can filter by it before the job runs. - -**Status:** TODO - -### 10. Processing service online status (GitHub #1122) -Show online status of registered processing services. -**See:** https://github.com/RolnickLab/antenna/issues/1122 - -### 11. Show which workers pick up a job/task (GitHub #1112) -At minimum, log which worker processes each task. -**See:** https://github.com/RolnickLab/antenna/issues/1112 - -## Applied Changes Summary - -| File | Change | Status | -|------|--------|--------| -| `ami/ml/orchestration/nats_queue.py:26-32` | `connect_timeout=5, allow_reconnect=False` | APPLIED | -| `ami/jobs/views.py:237-238` | Guard `/tasks` for terminal status jobs | APPLIED | -| `ami/jobs/views.py:59` | `incomplete_only` also checks top-level status | APPLIED | - -## Remaining TODOs - -| Priority | Issue | Impact | -|----------|-------|--------| -| P1 | Timeout on JetStream stream/consumer ops | Prevents indefinite blocking | -| P1 | `async_to_sync()` timeout wrapper | Prevents thread exhaustion | -| P2 | `/tasks/` multi-pipeline support | Worker efficiency | -| P2 | ADC trailing slashes | Removes 301 overhead | -| P2 | `dispatch_mode` on job init | Correct filtering at creation time | -| P3 | Stale job auto-cleanup (Celery Beat) | Prevents future flooding | -| P3 | Circuit breaker for NATS failures | Graceful degradation | -| P3 | #1122: Processing service online status | UX | -| P3 | #1112: Worker tracking in logs | Observability | - -## Related Files - -| File | Lines | What | -|------|-------|------| -| `ami/ml/orchestration/nats_queue.py` | 26-32 | `get_connection()` — FIXED with timeouts | -| `ami/ml/orchestration/nats_queue.py` | 77-124 | Stream/consumer ops — needs timeouts | -| `ami/ml/orchestration/nats_queue.py` | 159-214 | `reserve_task()` — has timeout but connection may block | -| `ami/jobs/views.py` | 50-69 | `IncompleteJobFilter` — FIXED | -| `ami/jobs/views.py` | 237-238 | `/tasks` status guard — FIXED | -| `ami/jobs/views.py` | 243-256 | `/tasks/` endpoint — `async_to_sync()` blocks thread | -| `ami/ml/orchestration/jobs.py` | 119 | `queue_images_to_nats()` — `async_to_sync()` blocks thread | -| `ami/jobs/tasks.py` | 184-199 | `_ack_task_via_nats()` — per-ACK connection (expensive) | -| `docs/claude/debugging/nats-triage.md` | Full | Previous NATS debugging findings | -| `docs/claude/nats-todo.md` | Full | NATS infrastructure improvements tracker | diff --git a/docs/claude/prompts/next-session-nats-batch-test.md b/docs/claude/prompts/next-session-nats-batch-test.md deleted file mode 100644 index 398c288d5..000000000 --- a/docs/claude/prompts/next-session-nats-batch-test.md +++ /dev/null @@ -1,76 +0,0 @@ -# Next Session: Integration Test with Batch NATS Fetch - -## Context - -PR #1135 (`fix/nats-connection-safety`) now has 4 commits: - -1. `394ce55d` — NATS connection safety (connect_timeout, no reconnect) + stale job filtering + /tasks terminal guard + test script cleanup -2. `07cc18ac` — Session notes -3. `ddd560d0` — Updated session notes with test run #3 results -4. `367343c1` — **Batch NATS task fetch** (`reserve_tasks()` replaces N×1 `reserve_task()`) - -Previous integration test (run #3) passed end-to-end (job 1380, 20/20 images, SUCCESS) but took ~11 minutes due to a 5-minute NATS `ack_wait` delay. Root cause: the `/tasks` endpoint made 320 NATS round trips for `batch=64`, exceeding the ADC's HTTP timeout. Tasks were consumed at the NATS level but lost at the HTTP level, requiring `ack_wait=300s` redelivery. - -The batch fetch fix should eliminate this delay entirely. - -## Task - -Run integration test #4 to verify: -1. The batch fetch eliminates the ~5-minute idle time -2. End-to-end PSv2 flow still works (detection + classification + result posting) -3. Total time should be ~6 minutes or less (down from ~11) - -## Steps - -1. Make sure the `fix/nats-connection-safety` branch is checked out -2. Restart services to pick up code changes: - ```bash - docker compose restart django celeryworker celerybeat - ``` -3. Kill any stale ADC workers: - ```bash - pkill -f "ami worker" || true - ``` -4. Run the integration test: - ```bash - cd ~/Projects/AMI/ami-data-companion - conda activate ami-py311 - bash ~/Projects/AMI/antenna/scripts/psv2_integration_test.sh 20 - ``` -5. Monitor results — look for: - - Tasks fetched quickly (no 5-min delay) - - 20/20 images processed - - Job status = SUCCESS - -## ADC Setup Note - -The ADC should be on the `fix/trailing-slashes` branch (committed `eac7481`) to avoid 301 redirects on `/tasks` and `/jobs` endpoints. - -```bash -cd ~/Projects/AMI/ami-data-companion -git checkout fix/trailing-slashes -conda activate ami-py311 -pip install -e . -``` - -## Remaining TODOs (from PR #1135) - -After the test passes, these items remain for future work: - -- [ ] NATS connection pooling (singleton per process instead of per-request) -- [ ] uvicorn `--workers 4` for dev config -- [ ] JetStream operation timeouts on `_ensure_stream()` / `_ensure_consumer()` -- [ ] `dispatch_mode` set at job creation instead of `run()` -- [ ] Multi-pipeline `/tasks` endpoint (workers request tasks for any pipeline they support) -- [ ] ADC pipeline params from `/info` endpoint -- [ ] GitHub #1122 — processing service online status -- [ ] GitHub #1112 — worker tracking in logs - -## Key Files - -- `ami/ml/orchestration/nats_queue.py` — `TaskQueueManager.reserve_tasks()` (batch fetch) -- `ami/jobs/views.py:217-255` — `/tasks` endpoint -- `ami/ml/orchestration/tests/test_nats_queue.py` — 9 unit tests (all passing) -- `scripts/psv2_integration_test.sh` — integration test script -- `docs/claude/sessions/2026-02-16-psv2-integration-test.md` — previous session findings -- `docs/claude/planning/nats-flooding-prevention.md` — full findings doc diff --git a/docs/claude/sessions/2026-02-16-psv2-integration-test.md b/docs/claude/sessions/2026-02-16-psv2-integration-test.md deleted file mode 100644 index 79f7da978..000000000 --- a/docs/claude/sessions/2026-02-16-psv2-integration-test.md +++ /dev/null @@ -1,143 +0,0 @@ -# PSv2 Integration Test Session - 2026-02-16 - -## Summary - -Attempted end-to-end PSv2 integration test on main. Discovered several blocking issues preventing completion. - -## Test Setup - -- Antenna: main branch, all services via docker compose -- ADC: main branch, conda env ami-py311, freshly installed -- Script: `scripts/psv2_integration_test.sh 20` - -## What Worked - -- Pipeline registration: OK (10 pipelines, all models loaded) -- Collection creation & population: OK -- Job creation with `start_now=true`: OK -- Image queuing to NATS: OK (20 images published) -- ADC worker task fetching from NATS: OK (tasks retrieved) - -## What Failed - -### 1. ADC worker reports "Done, detections: 0" — stale worker stole tasks -- Root cause: the stale ADC worker from test run #1 was never killed -- It found job 1377 and consumed all 20 NATS messages before the new worker -- Antenna logs show 147 `/tasks?batch=64` requests, ~35 distinct source ports -- The new worker (AMI_NUM_WORKERS=0) found an empty NATS queue → "No more tasks" -- The 39-second delay was caused by queuing behind stale worker requests on single uvicorn thread -- **Not an ADC code bug** — the stale worker race condition caused 0 results -- **Next test must kill ALL ADC worker processes before starting** -- **TODO (test script)**: Add `pkill -f "ami worker"` cleanup step before starting worker - -### 2. Django overloaded by concurrent /tasks requests -- uvicorn runs with 1 worker in dev mode (no `--workers` flag) -- ADC spawns 16 DataLoader subprocesses by default, all hit `/tasks` simultaneously -- Each request blocks on NATS connection + operation (up to 5s with our fix) -- 16 × 5s = 80s serialized wait → all requests timeout -- **Fix applied**: Set `AMI_NUM_WORKERS=0` in test script (1 subprocess) -- **TODO**: Add `--workers 4` to uvicorn dev config, or connection pooling - -### 3. Stale NATS connections block Django event loop -- `nats.connect()` defaulted to `allow_reconnect=True` with long timeouts -- Leaked connections from interrupted requests spawned reconnection loops -- These loops consumed the shared async event loop, blocking ALL requests -- **Fix applied**: `connect_timeout=5, allow_reconnect=False` in `get_connection()` - -### 4. Stale jobs not filtered by incomplete_only -- `IncompleteJobFilter` only checked progress JSON stages, not top-level `status` -- Jobs manually set to FAILURE without progress update slipped through -- ADC worker picked up stale jobs and hammered `/tasks` endlessly -- **Fix applied**: Also exclude `status__in=JobState.final_states()` - -### 5. /tasks endpoint doesn't guard against terminal jobs -- A FAILURE/SUCCESS/REVOKED job still tried to fetch from NATS -- **Fix applied**: Return empty `{"tasks": []}` for terminal status jobs - -### 6. RabbitMQ stale connections -- After days of uptime, Django's AMQP connection to RabbitMQ goes stale -- `ConnectionResetError: [Errno 104]` when enqueuing Celery tasks -- **Fix**: Restart Django/Celery or full `docker compose down && up` - -## Applied Code Changes - -### `ami/ml/orchestration/nats_queue.py:26-32` -```python -async def get_connection(nats_url: str): - nc = await nats.connect( - nats_url, - connect_timeout=5, - allow_reconnect=False, - max_reconnect_attempts=0, - ) -``` - -### `ami/jobs/views.py:59` -```python -# IncompleteJobFilter - also exclude by top-level status -queryset = queryset.exclude(status__in=JobState.final_states()) -``` - -### `ami/jobs/views.py:237-238` -```python -# /tasks endpoint - guard against terminal jobs -if job.status in JobState.final_states(): - return Response({"tasks": []}) -``` - -### `scripts/psv2_integration_test.sh:135` -```bash -AMI_NUM_WORKERS=0 ami worker --pipeline quebec_vermont_moths_2023 2>&1 -``` - -## Remaining TODOs - -Full list in `docs/claude/planning/nats-flooding-prevention.md`. - -Key items: -1. **Re-run integration test** — kill all stale ADC workers first, then test with clean state -2. **Test script: kill stale workers** — add `pkill -f "ami worker"` before starting -3. **NATS connection pooling** — PR #1130 attempted this but had issues -4. **uvicorn workers** — add `--workers 4` to dev config -5. **JetStream operation timeouts** — `_ensure_stream()`, `_ensure_consumer()` have no timeouts -6. **ADC trailing slashes** — causes 301 redirects on every request -7. **`dispatch_mode` on job init** — should be set at creation, not run() -8. **Multi-pipeline /tasks endpoint** — let workers request tasks for multiple pipelines -9. **GitHub #1122** — processing service online status -10. **GitHub #1112** — worker tracking in logs - -## Test Run #3 — SUCCESS (22:51 - 23:03) - -After restarting Django/Celery (stale AMQP connection), the test passed end-to-end: -- Job 1380: 20/20 images processed, status=SUCCESS -- Total time: ~11 minutes (including ~5 min NATS ack_wait delay) -- Detection + classification + result posting all worked - -**Performance issue found:** Both GPU processes race for tasks from the same NATS consumer. One gets all 20 messages, the other gets nothing. The unacked messages wait for `ack_wait=300s` to expire before redelivery. This added ~5 minutes of idle time. - -**Fix needed:** Either: -1. Reduce `ack_wait` from 300s to something smaller (30-60s) for dev -2. Ensure only one GPU process fetches tasks per batch (ADC-side coordination) -3. Use NATS NAK to immediately release unfetchable tasks - -**ADC trailing slashes:** Fixed on `fix/trailing-slashes` branch in ami-data-companion. - -## Next Session Prompt - -``` -PSv2 integration test PASSED on main (job 1380, 20 images, SUCCESS). - -Committed fixes on branch fix/nats-connection-safety (PR #1135): -- NATS connection safety (connect_timeout, no reconnect) -- Stale job filtering + /tasks terminal status guard -- Test script stale worker cleanup - -Key remaining issue: NATS ack_wait=300s causes ~5min idle time when -GPU processes race for tasks. Consider reducing ack_wait or adding -NAK for unfetchable tasks. - -ADC trailing slashes fixed on fix/trailing-slashes branch (ami-data-companion). - -Full findings: docs/claude/planning/nats-flooding-prevention.md -Session notes: docs/claude/sessions/2026-02-16-psv2-integration-test.md -``` diff --git a/scripts/psv2_integration_test.sh b/scripts/psv2_integration_test.sh deleted file mode 100755 index e622db999..000000000 --- a/scripts/psv2_integration_test.sh +++ /dev/null @@ -1,292 +0,0 @@ -#!/usr/bin/env bash -# -# PSv2 Integration Test -# -# Creates a collection + async job on the local Antenna stack, -# starts the ADC worker, streams logs from both sides, and -# monitors for errors. -# -# Usage: ./scripts/psv2_integration_test.sh [NUM_IMAGES] -# -# Requirements: -# - Antenna stack running (docker compose up -d) -# - conda env ami-py311 with ami-data-companion installed -# - .env in ami-data-companion with AMI_ANTENNA_API_AUTH_TOKEN set -# -set -euo pipefail - -# --- Config --- -NUM_IMAGES="${1:-20}" -PROJECT_ID=18 -PIPELINE_ID=3 # Quebec & Vermont moths -API_BASE="http://localhost:8000/api/v2" -TOKEN="644a0e28e2c09bda87c9ab3f6a002516f3dfb7ff" -ADC_DIR="/home/michael/Projects/AMI/ami-data-companion" -CONDA_BASE="$HOME/miniforge3" -LOG_DIR="/tmp/psv2-integration-test-$(date +%Y%m%d-%H%M%S)" - -mkdir -p "$LOG_DIR" - -ANTENNA_LOG="$LOG_DIR/antenna.log" -WORKER_LOG="$LOG_DIR/worker.log" -SUMMARY_LOG="$LOG_DIR/summary.log" - -# Cleanup function -cleanup() { - echo "" - echo "=== Cleaning up ===" - # Kill background processes - if [[ -n "${WORKER_PID:-}" ]] && kill -0 "$WORKER_PID" 2>/dev/null; then - echo "Stopping ADC worker (PID $WORKER_PID)..." - kill "$WORKER_PID" 2>/dev/null || true - wait "$WORKER_PID" 2>/dev/null || true - fi - if [[ -n "${ANTENNA_LOG_PID:-}" ]] && kill -0 "$ANTENNA_LOG_PID" 2>/dev/null; then - kill "$ANTENNA_LOG_PID" 2>/dev/null || true - fi - echo "Logs saved to: $LOG_DIR" - echo " - Antenna logs: $ANTENNA_LOG" - echo " - Worker logs: $WORKER_LOG" - echo " - Register logs: $LOG_DIR/register.log" - echo " - Summary: $SUMMARY_LOG" -} -trap cleanup EXIT - -auth_header="Authorization: Token $TOKEN" - -api_get() { - curl -sf -H "$auth_header" "$API_BASE$1" -} - -api_post() { - curl -sf -H "$auth_header" -H "Content-Type: application/json" -X POST "$API_BASE$1" -d "$2" -} - -api_post_empty() { - curl -sf -H "$auth_header" -X POST "$API_BASE$1" -} - -log() { - local msg="[$(date '+%H:%M:%S')] $1" - echo "$msg" - echo "$msg" >> "$SUMMARY_LOG" -} - -# --- Preflight checks --- -log "PSv2 Integration Test - $NUM_IMAGES images, project $PROJECT_ID, pipeline $PIPELINE_ID" -log "Log directory: $LOG_DIR" -echo "" - -echo "Checking Antenna API..." -if ! curl -sf -o /dev/null "$API_BASE/"; then - echo "ERROR: Antenna API not reachable at $API_BASE" - echo "Run: docker compose up -d" - exit 1 -fi -echo " Antenna API: OK" - -echo "Checking Docker services..." -DJANGO_STATUS=$(docker compose ps --format json django 2>/dev/null | jq -r '.State // .status' 2>/dev/null || echo "unknown") -CELERY_STATUS=$(docker compose ps --format json celeryworker 2>/dev/null | jq -r '.State // .status' 2>/dev/null || echo "unknown") -NATS_STATUS=$(docker compose ps --format json nats 2>/dev/null | jq -r '.State // .status' 2>/dev/null || echo "unknown") -echo " django: $DJANGO_STATUS" -echo " celeryworker: $CELERY_STATUS" -echo " nats: $NATS_STATUS" - -if [[ "$NATS_STATUS" != "running" ]]; then - echo "WARNING: NATS does not appear to be running. async_api dispatch requires NATS." -fi -echo "" - -# --- Step 1: Stream Antenna logs --- -log "Starting Antenna log stream..." -docker compose logs -f django celeryworker nats --since 0s > "$ANTENNA_LOG" 2>&1 & -ANTENNA_LOG_PID=$! - -# --- Step 2: Register pipelines with Antenna --- -log "Registering pipelines with Antenna for project $PROJECT_ID..." -REGISTER_LOG="$LOG_DIR/register.log" -( - source "$CONDA_BASE/etc/profile.d/conda.sh" - conda activate ami-py311 - cd "$ADC_DIR" - ami worker register "PSv2 integration test" --project "$PROJECT_ID" 2>&1 -) > "$REGISTER_LOG" 2>&1 -REGISTER_EXIT=$? - -if [[ "$REGISTER_EXIT" -eq 0 ]]; then - log "Pipeline registration: OK" -else - log "WARNING: Pipeline registration failed (exit $REGISTER_EXIT)" - tail -10 "$REGISTER_LOG" | tee -a "$SUMMARY_LOG" -fi -cat "$REGISTER_LOG" >> "$SUMMARY_LOG" - -# Verify registration via API -PIPELINES_AFTER=$(api_get "/ml/pipelines/?projects=$PROJECT_ID" | jq '.count') -log "Pipelines available for project $PROJECT_ID: $PIPELINES_AFTER" - -# --- Step 3: Start ADC worker --- -# Kill any stale workers from previous test runs to avoid task competition -if pgrep -f "ami worker" > /dev/null 2>&1; then - log "Killing stale ADC worker processes..." - pkill -f "ami worker" 2>/dev/null || true - sleep 2 -fi -log "Starting ADC worker..." -( - source "$CONDA_BASE/etc/profile.d/conda.sh" - conda activate ami-py311 - cd "$ADC_DIR" - AMI_NUM_WORKERS=0 ami worker --pipeline quebec_vermont_moths_2023 2>&1 -) > "$WORKER_LOG" 2>&1 & -WORKER_PID=$! -sleep 3 - -if ! kill -0 "$WORKER_PID" 2>/dev/null; then - log "ERROR: ADC worker failed to start. Check $WORKER_LOG" - tail -20 "$WORKER_LOG" - exit 1 -fi -log "ADC worker started (PID $WORKER_PID)" - -# --- Step 4: Create collection --- -log "Creating collection with $NUM_IMAGES random images..." -COLLECTION_RESP=$(api_post "/captures/collections/" "{ - \"name\": \"PSv2 integration test $(date '+%H:%M:%S')\", - \"project\": $PROJECT_ID, - \"method\": \"random\", - \"kwargs\": {\"size\": $NUM_IMAGES} -}") -COLLECTION_ID=$(echo "$COLLECTION_RESP" | jq -r '.id') -log "Created collection $COLLECTION_ID" - -# --- Step 5: Populate collection --- -log "Populating collection $COLLECTION_ID..." -POPULATE_RESP=$(api_post_empty "/captures/collections/$COLLECTION_ID/populate/") -POPULATE_JOB_ID=$(echo "$POPULATE_RESP" | jq -r '.job_id // .id // empty') -log "Populate job: $POPULATE_JOB_ID" - -# Wait for population to finish -for i in $(seq 1 30); do - sleep 2 - COLL_INFO=$(api_get "/captures/collections/$COLLECTION_ID/") - IMG_COUNT=$(echo "$COLL_INFO" | jq -r '.source_images_count') - if [[ "$IMG_COUNT" -gt 0 ]]; then - log "Collection populated: $IMG_COUNT images" - break - fi - if [[ $i -eq 30 ]]; then - log "ERROR: Collection not populated after 60s" - exit 1 - fi -done - -# --- Step 6: Create and start the ML job --- -log "Creating async ML job..." -JOB_RESP=$(api_post "/jobs/?start_now=true" "{ - \"name\": \"PSv2 integration test $(date '+%H:%M:%S')\", - \"project_id\": $PROJECT_ID, - \"pipeline_id\": $PIPELINE_ID, - \"source_image_collection_id\": $COLLECTION_ID, - \"delay\": 0, - \"shuffle\": true -}") -JOB_ID=$(echo "$JOB_RESP" | jq -r '.id') -JOB_STATUS=$(echo "$JOB_RESP" | jq -r '.status') -log "Created job $JOB_ID (status: $JOB_STATUS)" - -# --- Step 7: Poll job progress --- -log "Polling job progress..." -PREV_STATUS="" -PREV_PROGRESS="" -POLL_INTERVAL=3 -MAX_POLLS=200 # ~10 minutes - -for i in $(seq 1 $MAX_POLLS); do - sleep "$POLL_INTERVAL" - - JOB_INFO=$(api_get "/jobs/$JOB_ID/") - STATUS=$(echo "$JOB_INFO" | jq -r '.status') - DISPATCH=$(echo "$JOB_INFO" | jq -r '.dispatch_mode') - PROGRESS_PCT=$(echo "$JOB_INFO" | jq -r '.progress.summary.progress // 0') - STAGE_INFO=$(echo "$JOB_INFO" | jq -r ' - [.progress.stages[]? | - (.params // [] | map({(.key): .value}) | add // {}) as $p | - "\(.name): \(.progress // 0 | . * 100 | round)% (processed=\($p.processed // "?"), remaining=\($p.remaining // "?"), failed=\($p.failed // "?"))"] - | join(" | ")') - - # Only print when something changes - if [[ "$STATUS" != "$PREV_STATUS" || "$PROGRESS_PCT" != "$PREV_PROGRESS" ]]; then - log " [$i] status=$STATUS dispatch=$DISPATCH progress=$PROGRESS_PCT $STAGE_INFO" - PREV_STATUS="$STATUS" - PREV_PROGRESS="$PROGRESS_PCT" - fi - - if [[ "$STATUS" == "SUCCESS" || "$STATUS" == "FAILURE" || "$STATUS" == "REVOKED" ]]; then - break - fi - - # Check worker is still alive - if ! kill -0 "$WORKER_PID" 2>/dev/null; then - log "WARNING: ADC worker died during job processing!" - log "Last 20 lines of worker log:" - tail -20 "$WORKER_LOG" | tee -a "$SUMMARY_LOG" - fi -done - -# --- Step 8: Final job state --- -echo "" -JOB_FINAL=$(api_get "/jobs/$JOB_ID/") -FINAL_STATUS=$(echo "$JOB_FINAL" | jq -r '.status') -FINAL_DISPATCH=$(echo "$JOB_FINAL" | jq -r '.dispatch_mode') -STARTED_AT=$(echo "$JOB_FINAL" | jq -r '.started_at // "N/A"') -FINISHED_AT=$(echo "$JOB_FINAL" | jq -r '.finished_at // "N/A"') - -log "=== Job $JOB_ID Final State ===" -log " Status: $FINAL_STATUS" -log " Dispatch: $FINAL_DISPATCH" -log " Started: $STARTED_AT" -log " Finished: $FINISHED_AT" - -# Print stage details -echo "$JOB_FINAL" | jq -r '.progress.stages[]? | (.params // [] | map({(.key): .value}) | add // {}) as $p | " Stage \(.name): progress=\(.progress // 0 | . * 100 | round)% processed=\($p.processed // "?") remaining=\($p.remaining // "?") failed=\($p.failed // "?")"' | while read -r line; do - log "$line" -done - -# --- Step 9: Scan logs for errors --- -echo "" -log "=== Log Analysis ===" - -ANTENNA_ERRORS=$(grep -ciE 'ERROR|Traceback|CRITICAL' "$ANTENNA_LOG" 2>/dev/null || echo 0) -ANTENNA_WARNINGS=$(grep -ciE 'WARNING' "$ANTENNA_LOG" 2>/dev/null || echo 0) -WORKER_ERRORS=$(grep -ciE 'ERROR|Traceback|CRITICAL' "$WORKER_LOG" 2>/dev/null || echo 0) -WORKER_WARNINGS=$(grep -ciE 'WARNING' "$WORKER_LOG" 2>/dev/null || echo 0) - -log "Antenna logs: $ANTENNA_ERRORS errors, $ANTENNA_WARNINGS warnings" -log "Worker logs: $WORKER_ERRORS errors, $WORKER_WARNINGS warnings" - -if [[ "$ANTENNA_ERRORS" -gt 0 ]]; then - echo "" - log "--- Antenna Errors ---" - grep -iE 'ERROR|Traceback|CRITICAL' "$ANTENNA_LOG" | grep -v "NoSuchKey\|image dimensions" | head -30 | tee -a "$SUMMARY_LOG" -fi - -if [[ "$WORKER_ERRORS" -gt 0 ]]; then - echo "" - log "--- Worker Errors ---" - grep -iE 'ERROR|Traceback|CRITICAL' "$WORKER_LOG" | head -30 | tee -a "$SUMMARY_LOG" -fi - -# --- Step 10: Final verdict --- -echo "" -if [[ "$FINAL_STATUS" == "SUCCESS" && "$WORKER_ERRORS" -eq 0 ]]; then - log "RESULT: PASS - Job completed successfully with no worker errors" - exit 0 -elif [[ "$FINAL_STATUS" == "SUCCESS" ]]; then - log "RESULT: WARN - Job completed but worker had $WORKER_ERRORS errors (check logs)" - exit 0 -else - log "RESULT: FAIL - Job status: $FINAL_STATUS" - exit 1 -fi From a8f7b35d3ac9d12444d4b976ca0dc631b3adc3c9 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Fri, 20 Feb 2026 17:15:54 -0800 Subject: [PATCH 08/12] feat: set job dispatch_mode at creation time based on project feature flags ML jobs with a pipeline now get dispatch_mode set during setup() instead of waiting until run() is called by the Celery worker. This lets the UI show the correct mode immediately after job creation. Co-Authored-By: Claude --- ami/jobs/models.py | 15 ++++++++++----- ami/jobs/tests.py | 39 +++++++++++++++++++++++++++++++++++---- 2 files changed, 45 insertions(+), 9 deletions(-) diff --git a/ami/jobs/models.py b/ami/jobs/models.py index b4df41a04..be797dd4f 100644 --- a/ami/jobs/models.py +++ b/ami/jobs/models.py @@ -461,9 +461,7 @@ def run(cls, job: "Job"): # End image collection stage job.save() - if job.project.feature_flags.async_pipeline_workers: - job.dispatch_mode = JobDispatchMode.ASYNC_API - job.save(update_fields=["dispatch_mode"]) + if job.dispatch_mode == JobDispatchMode.ASYNC_API: queued = queue_images_to_nats(job, images) if not queued: job.logger.error("Aborting job %s because images could not be queued to NATS", job.pk) @@ -473,8 +471,6 @@ def run(cls, job: "Job"): job.save() return else: - job.dispatch_mode = JobDispatchMode.SYNC_API - job.save(update_fields=["dispatch_mode"]) cls.process_images(job, images) @classmethod @@ -919,6 +915,15 @@ def setup(self, save=True): self.progress.add_stage_param(delay_stage.key, "Mood", "😴") if self.pipeline: + # Set dispatch mode based on project feature flags at creation time + # so the UI can show the correct mode before the job runs. + # Only override if still at the default (INTERNAL), to allow explicit overrides. + if self.dispatch_mode == JobDispatchMode.INTERNAL: + if self.project and self.project.feature_flags.async_pipeline_workers: + self.dispatch_mode = JobDispatchMode.ASYNC_API + else: + self.dispatch_mode = JobDispatchMode.SYNC_API + collect_stage = self.progress.add_stage("Collect") self.progress.add_stage_param(collect_stage.key, "Total Images", "") diff --git a/ami/jobs/tests.py b/ami/jobs/tests.py index e94b75b9d..033a08b5c 100644 --- a/ami/jobs/tests.py +++ b/ami/jobs/tests.py @@ -601,13 +601,11 @@ def test_dispatch_mode_filtering(self): dispatch_mode=JobDispatchMode.ASYNC_API, ) - # Create a job with default dispatch_mode (should be "internal") + # Create a non-ML job without a pipeline (dispatch_mode stays "internal") internal_job = Job.objects.create( - job_type_key=MLJob.key, + job_type_key="data_storage_sync", project=self.project, name="Internal Job", - pipeline=self.pipeline, - source_image_collection=self.source_image_collection, ) self.client.force_authenticate(user=self.user) @@ -644,6 +642,39 @@ def test_dispatch_mode_filtering(self): expected_ids = {sync_job.pk, async_job.pk, internal_job.pk} self.assertEqual(returned_ids, expected_ids) + def test_ml_job_dispatch_mode_set_on_creation(self): + """Test that ML jobs get dispatch_mode set based on project feature flags at creation time.""" + # Without async flag, ML job should default to sync_api + sync_job = Job.objects.create( + job_type_key=MLJob.key, + project=self.project, + name="Auto Sync Job", + pipeline=self.pipeline, + source_image_collection=self.source_image_collection, + ) + self.assertEqual(sync_job.dispatch_mode, JobDispatchMode.SYNC_API) + + # Enable async flag on project + self.project.feature_flags.async_pipeline_workers = True + self.project.save() + + async_job = Job.objects.create( + job_type_key=MLJob.key, + project=self.project, + name="Auto Async Job", + pipeline=self.pipeline, + source_image_collection=self.source_image_collection, + ) + self.assertEqual(async_job.dispatch_mode, JobDispatchMode.ASYNC_API) + + # Non-pipeline job should stay internal regardless of feature flag + internal_job = Job.objects.create( + job_type_key="data_storage_sync", + project=self.project, + name="Internal Job", + ) + self.assertEqual(internal_job.dispatch_mode, JobDispatchMode.INTERNAL) + def test_tasks_endpoint_rejects_non_async_jobs(self): """Test that /tasks endpoint returns 400 for non-async_api jobs.""" from ami.base.serializers import reverse_with_params From 37c02101c49f04bec74f58262616d2bb2ffb240b Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Fri, 20 Feb 2026 17:24:44 -0800 Subject: [PATCH 09/12] fix: add timeouts to all JetStream operations and restore reconnect policy Add NATS_JETSTREAM_TIMEOUT (10s) to all JetStream metadata operations via asyncio.wait_for() so a hung NATS connection fails fast instead of blocking the caller's thread indefinitely. Also restore the intended reconnect policy (2 attempts, 1s wait) that was lost in a prior force push. Co-Authored-By: Claude --- ami/jobs/views.py | 1 - ami/ml/orchestration/nats_queue.py | 71 +++++++++++++++++++++--------- 2 files changed, 50 insertions(+), 22 deletions(-) diff --git a/ami/jobs/views.py b/ami/jobs/views.py index a1c8ee09e..1f3055560 100644 --- a/ami/jobs/views.py +++ b/ami/jobs/views.py @@ -250,7 +250,6 @@ async def get_tasks(): async with TaskQueueManager() as manager: return [task.dict() for task in await manager.reserve_tasks(job.pk, count=batch)] - # Use async_to_sync to properly handle the async call tasks = async_to_sync(get_tasks)() return Response({"tasks": tasks}) diff --git a/ami/ml/orchestration/nats_queue.py b/ami/ml/orchestration/nats_queue.py index 96852bf5c..73c6d7cb4 100644 --- a/ami/ml/orchestration/nats_queue.py +++ b/ami/ml/orchestration/nats_queue.py @@ -10,6 +10,7 @@ support the visibility timeout semantics we want or a disconnected mode of pulling and ACKing tasks. """ +import asyncio import json import logging @@ -22,13 +23,22 @@ logger = logging.getLogger(__name__) +# Timeout for individual JetStream metadata operations (create/check stream and consumer). +# These are lightweight NATS server operations that complete in milliseconds under normal +# conditions. stream_info() and add_stream() don't accept a native timeout parameter, so +# we use asyncio.wait_for() uniformly for all operations. Without these timeouts, a hung +# NATS connection blocks the caller's thread indefinitely — and when that caller is a +# Django worker (via async_to_sync), it makes the entire server unresponsive. +NATS_JETSTREAM_TIMEOUT = 10 # seconds + async def get_connection(nats_url: str): nc = await nats.connect( nats_url, connect_timeout=5, - allow_reconnect=False, - max_reconnect_attempts=0, + allow_reconnect=True, + max_reconnect_attempts=2, + reconnect_time_wait=1, ) js = nc.jetstream() return nc, js @@ -88,15 +98,20 @@ async def _ensure_stream(self, job_id: int): subject = self._get_subject(job_id) try: - await self.js.stream_info(stream_name) + await asyncio.wait_for(self.js.stream_info(stream_name), timeout=NATS_JETSTREAM_TIMEOUT) logger.debug(f"Stream {stream_name} already exists") + except asyncio.TimeoutError: + raise # NATS unreachable — let caller handle it rather than creating a stream blindly except Exception as e: logger.warning(f"Stream {stream_name} does not exist: {e}") # Stream doesn't exist, create it - await self.js.add_stream( - name=stream_name, - subjects=[subject], - max_age=86400, # 24 hours retention + await asyncio.wait_for( + self.js.add_stream( + name=stream_name, + subjects=[subject], + max_age=86400, # 24 hours retention + ), + timeout=NATS_JETSTREAM_TIMEOUT, ) logger.info(f"Created stream {stream_name}") @@ -110,21 +125,29 @@ async def _ensure_consumer(self, job_id: int): subject = self._get_subject(job_id) try: - info = await self.js.consumer_info(stream_name, consumer_name) + info = await asyncio.wait_for( + self.js.consumer_info(stream_name, consumer_name), + timeout=NATS_JETSTREAM_TIMEOUT, + ) logger.debug(f"Consumer {consumer_name} already exists: {info}") + except asyncio.TimeoutError: + raise # NATS unreachable — let caller handle it except Exception: # Consumer doesn't exist, create it - await self.js.add_consumer( - stream=stream_name, - config=ConsumerConfig( - durable_name=consumer_name, - ack_policy=AckPolicy.EXPLICIT, - ack_wait=TASK_TTR, # Visibility timeout (TTR) - max_deliver=5, # Max retry attempts - deliver_policy=DeliverPolicy.ALL, - max_ack_pending=100, # Max unacked messages - filter_subject=subject, + await asyncio.wait_for( + self.js.add_consumer( + stream=stream_name, + config=ConsumerConfig( + durable_name=consumer_name, + ack_policy=AckPolicy.EXPLICIT, + ack_wait=TASK_TTR, # Visibility timeout (TTR) + max_deliver=5, # Max retry attempts + deliver_policy=DeliverPolicy.ALL, + max_ack_pending=100, # Max unacked messages + filter_subject=subject, + ), ), + timeout=NATS_JETSTREAM_TIMEOUT, ) logger.info(f"Created consumer {consumer_name}") @@ -152,7 +175,7 @@ async def publish_task(self, job_id: int, data: PipelineProcessingTask) -> bool: task_data = json.dumps(data.dict()) # Publish to JetStream - ack = await self.js.publish(subject, task_data.encode()) + ack = await self.js.publish(subject, task_data.encode(), timeout=NATS_JETSTREAM_TIMEOUT) logger.info(f"Published task to stream for job '{job_id}', sequence {ack.seq}") return True @@ -246,7 +269,10 @@ async def delete_consumer(self, job_id: int) -> bool: stream_name = self._get_stream_name(job_id) consumer_name = self._get_consumer_name(job_id) - await self.js.delete_consumer(stream_name, consumer_name) + await asyncio.wait_for( + self.js.delete_consumer(stream_name, consumer_name), + timeout=NATS_JETSTREAM_TIMEOUT, + ) logger.info(f"Deleted consumer {consumer_name} for job '{job_id}'") return True except Exception as e: @@ -269,7 +295,10 @@ async def delete_stream(self, job_id: int) -> bool: try: stream_name = self._get_stream_name(job_id) - await self.js.delete_stream(stream_name) + await asyncio.wait_for( + self.js.delete_stream(stream_name), + timeout=NATS_JETSTREAM_TIMEOUT, + ) logger.info(f"Deleted stream {stream_name} for job '{job_id}'") return True except Exception as e: From 1d43c8fa2178c0ddcb116c8b2b474969e2fb7703 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Fri, 20 Feb 2026 18:01:10 -0800 Subject: [PATCH 10/12] fix: propagate NATS timeouts as 503 instead of swallowing them asyncio.TimeoutError from _ensure_stream() and _ensure_consumer() was caught by the broad `except Exception` in reserve_tasks(), silently returning [] and making NATS outages indistinguishable from empty queues. Workers would then poll immediately, recreating the flooding problem. - Add explicit `except asyncio.TimeoutError: raise` in reserve_tasks() - Catch TimeoutError and OSError in the /tasks view, return 503 - Restore allow_reconnect=False (fail-fast on connection issues) - Add return type annotation to get_connection() Co-Authored-By: Claude --- ami/jobs/views.py | 7 ++++++- ami/ml/orchestration/nats_queue.py | 8 ++++---- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/ami/jobs/views.py b/ami/jobs/views.py index 1f3055560..6d2028042 100644 --- a/ami/jobs/views.py +++ b/ami/jobs/views.py @@ -1,3 +1,4 @@ +import asyncio import logging import pydantic @@ -250,7 +251,11 @@ async def get_tasks(): async with TaskQueueManager() as manager: return [task.dict() for task in await manager.reserve_tasks(job.pk, count=batch)] - tasks = async_to_sync(get_tasks)() + try: + tasks = async_to_sync(get_tasks)() + except (asyncio.TimeoutError, OSError) as e: + logger.warning("NATS unavailable while fetching tasks for job %s: %s", job.pk, e) + return Response({"error": "Task queue temporarily unavailable"}, status=503) return Response({"tasks": tasks}) diff --git a/ami/ml/orchestration/nats_queue.py b/ami/ml/orchestration/nats_queue.py index 73c6d7cb4..e063cb4fc 100644 --- a/ami/ml/orchestration/nats_queue.py +++ b/ami/ml/orchestration/nats_queue.py @@ -32,13 +32,11 @@ NATS_JETSTREAM_TIMEOUT = 10 # seconds -async def get_connection(nats_url: str): +async def get_connection(nats_url: str) -> tuple[nats.NATS, JetStreamContext]: nc = await nats.connect( nats_url, connect_timeout=5, - allow_reconnect=True, - max_reconnect_attempts=2, - reconnect_time_wait=1, + allow_reconnect=False, ) js = nc.jetstream() return nc, js @@ -227,6 +225,8 @@ async def reserve_tasks(self, job_id: int, count: int, timeout: float = 5) -> li logger.info(f"Reserved {len(tasks)} tasks from stream for job '{job_id}'") return tasks + except asyncio.TimeoutError: + raise # NATS unreachable — propagate so the view can return an appropriate error except Exception as e: logger.error(f"Failed to reserve tasks from stream for job '{job_id}': {e}") return [] From 02081197aaa653735f0517d7365411bffc246455 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Fri, 20 Feb 2026 18:02:35 -0800 Subject: [PATCH 11/12] fix: address review comments (log level, fetch timeout, docstring) - Downgrade reserve_tasks log to DEBUG when zero tasks reserved (avoid log spam from frequent polling) - Pass timeout=0.5 from /tasks endpoint to avoid blocking the worker for 5s on empty queues - Fix docstring examples using string 'job123' for int-typed job_id Co-Authored-By: Claude --- ami/jobs/views.py | 2 +- ami/ml/orchestration/nats_queue.py | 9 ++++++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/ami/jobs/views.py b/ami/jobs/views.py index 6d2028042..01d9537ef 100644 --- a/ami/jobs/views.py +++ b/ami/jobs/views.py @@ -249,7 +249,7 @@ def tasks(self, request, pk=None): async def get_tasks(): async with TaskQueueManager() as manager: - return [task.dict() for task in await manager.reserve_tasks(job.pk, count=batch)] + return [task.dict() for task in await manager.reserve_tasks(job.pk, count=batch, timeout=0.5)] try: tasks = async_to_sync(get_tasks)() diff --git a/ami/ml/orchestration/nats_queue.py b/ami/ml/orchestration/nats_queue.py index e063cb4fc..65b6f6f72 100644 --- a/ami/ml/orchestration/nats_queue.py +++ b/ami/ml/orchestration/nats_queue.py @@ -51,8 +51,8 @@ class TaskQueueManager: Use as an async context manager: async with TaskQueueManager() as manager: - await manager.publish_task('job123', {'data': 'value'}) - tasks = await manager.reserve_tasks('job123', count=64) + await manager.publish_task(123, {'data': 'value'}) + tasks = await manager.reserve_tasks(123, count=64) await manager.acknowledge_task(tasks[0].reply_subject) """ @@ -222,7 +222,10 @@ async def reserve_tasks(self, job_id: int, count: int, timeout: float = 5) -> li task.reply_subject = msg.reply tasks.append(task) - logger.info(f"Reserved {len(tasks)} tasks from stream for job '{job_id}'") + if tasks: + logger.info(f"Reserved {len(tasks)} tasks from stream for job '{job_id}'") + else: + logger.debug(f"No tasks reserved from stream for job '{job_id}'") return tasks except asyncio.TimeoutError: From 6eb2854e50265e63a06fb868d115efaacb1b4af7 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Fri, 20 Feb 2026 18:11:28 -0800 Subject: [PATCH 12/12] fix: catch nats.errors.Error in /tasks endpoint for proper 503 responses NoServersError, ConnectionClosedError, and other NATS exceptions inherit from nats.errors.Error (not OSError), so they escaped the handler and returned 500 instead of 503. Co-Authored-By: Claude --- ami/jobs/views.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ami/jobs/views.py b/ami/jobs/views.py index 01d9537ef..ddc1e57a7 100644 --- a/ami/jobs/views.py +++ b/ami/jobs/views.py @@ -1,6 +1,7 @@ import asyncio import logging +import nats.errors import pydantic from asgiref.sync import async_to_sync from django.db.models import Q @@ -253,7 +254,7 @@ async def get_tasks(): try: tasks = async_to_sync(get_tasks)() - except (asyncio.TimeoutError, OSError) as e: + except (asyncio.TimeoutError, OSError, nats.errors.Error) as e: logger.warning("NATS unavailable while fetching tasks for job %s: %s", job.pk, e) return Response({"error": "Task queue temporarily unavailable"}, status=503)