fix: handle null occurrence determination in UI and backend #1185
mihow wants to merge 48 commits into demo/integration from
Conversation
* fix: prevent NATS connection flooding and stale job task fetching

  - Add `connect_timeout=5`, `allow_reconnect=False` to NATS connections to prevent leaked reconnection loops from blocking Django's event loop
  - Guard the `/tasks` endpoint against terminal-status jobs (return empty tasks instead of attempting a NATS reserve)
  - `IncompleteJobFilter` now excludes jobs by top-level status in addition to progress JSON stages
  - Add stale worker cleanup to the integration test script

  Found during PSv2 integration testing, where stale ADC workers with default DataLoader parallelism overwhelmed the single uvicorn worker thread by flooding `/tasks` with concurrent NATS reserve requests.

  Co-Authored-By: Claude <noreply@anthropic.com>

* docs: PSv2 integration test session notes and NATS flooding findings

  Session notes from the 2026-02-16 integration test, including root-cause analysis of stale worker task competition and NATS connection issues. The findings doc tracks applied fixes and remaining TODOs with priorities.

* docs: update session notes with successful test run #3

  The PSv2 integration test passed end-to-end (job 1380, 20/20 images). Identified `ack_wait=300s` as the cause of roughly 5 minutes of idle time when GPU processes race for NATS tasks.

* fix: batch NATS task fetch to prevent HTTP timeouts

  Replace N×1 `reserve_task()` calls with a single `reserve_tasks()` batch fetch. The previous implementation created a new pull subscription per message (320 NATS round trips for batch=64), causing the `/tasks` endpoint to exceed HTTP client timeouts. The new approach uses one `psub.fetch()` call for the entire batch.
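The terminal-status guard from the flooding fix above can be sketched as a small pure function. This is a hedged illustration only: the status names and the `reserve_batch` callable are assumptions, not the project's actual API.

```python
# Assumed status names for illustration; the real Job model may differ.
TERMINAL_JOB_STATUSES = {"SUCCESS", "FAILURE", "CANCELED", "REVOKED"}

def tasks_for_job(job_status, reserve_batch):
    """Return tasks for a job, skipping the NATS reserve for terminal jobs.

    A job that can no longer run gets an empty task list immediately,
    avoiding a pointless (and potentially slow) NATS round trip.
    """
    if job_status in TERMINAL_JOB_STATUSES:
        return []
    return reserve_batch()
```

The point of the guard is that stale workers polling finished jobs never touch NATS at all, so they cannot flood the single uvicorn worker thread.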
* docs: add next session prompt

* feat: add pipeline__slug__in filter for multi-pipeline job queries

  Workers that handle multiple pipelines can now fetch jobs for all of them in a single request: `?pipeline__slug__in=slug1,slug2`

* chore: remove local-only docs and scripts from branch

  These files are session notes, planning docs, and test scripts that should stay local rather than be part of the PR.

* feat: set job dispatch_mode at creation time based on project feature flags

  ML jobs with a pipeline now get `dispatch_mode` set during `setup()` instead of waiting until `run()` is called by the Celery worker. This lets the UI show the correct mode immediately after job creation.

* fix: add timeouts to all JetStream operations and restore reconnect policy

  Add `NATS_JETSTREAM_TIMEOUT` (10s) to all JetStream metadata operations via `asyncio.wait_for()` so a hung NATS connection fails fast instead of blocking the caller's thread indefinitely. Also restore the intended reconnect policy (2 attempts, 1s wait) that was lost in a prior force push.

* fix: propagate NATS timeouts as 503 instead of swallowing them

  `asyncio.TimeoutError` from `_ensure_stream()` and `_ensure_consumer()` was caught by the broad `except Exception` in `reserve_tasks()`, silently returning `[]` and making NATS outages indistinguishable from empty queues. Workers would then poll immediately, recreating the flooding problem.
  - Add an explicit `except asyncio.TimeoutError: raise` in `reserve_tasks()`
  - Catch `TimeoutError` and `OSError` in the `/tasks` view, return 503
  - Restore `allow_reconnect=False` (fail fast on connection issues)
  - Add a return type annotation to `get_connection()`

* fix: address review comments (log level, fetch timeout, docstring)

  - Downgrade the `reserve_tasks` log to DEBUG when zero tasks are reserved (avoids log spam from frequent polling)
  - Pass `timeout=0.5` from the `/tasks` endpoint to avoid blocking the worker for 5s on empty queues
  - Fix docstring examples that used the string 'job123' for the int-typed `job_id`

* fix: catch nats.errors.Error in /tasks endpoint for proper 503 responses

  `NoServersError`, `ConnectionClosedError`, and other NATS exceptions inherit from `nats.errors.Error` (not `OSError`), so they escaped the handler and returned 500 instead of 503.
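The timeout-propagation behavior described above can be sketched with `asyncio.wait_for`. This is a minimal illustration, not the project's actual wrapper; the constant name mirrors the `NATS_JETSTREAM_TIMEOUT` setting mentioned in the commits.

```python
import asyncio

NATS_JETSTREAM_TIMEOUT = 10.0  # seconds; mirrors the Django setting described above

async def jetstream_call(coro, timeout: float = NATS_JETSTREAM_TIMEOUT):
    """Bound a JetStream metadata operation so a hung connection fails fast.

    asyncio.TimeoutError deliberately propagates to the caller (the /tasks
    view can turn it into a 503) instead of being swallowed as an empty
    result, which would make an outage look like an empty queue.
    """
    return await asyncio.wait_for(coro, timeout)
```

With this shape, a broad `except Exception` in the caller must still let `asyncio.TimeoutError` escape (or re-raise it explicitly), otherwise workers poll again immediately and recreate the flooding problem.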
…1142)

* feat: configurable NATS tuning and gunicorn worker management

  Rebase onto main after the #1135 merge. Keep only the additions unique to this branch:

  - Make `TASK_TTR` configurable via the `NATS_TASK_TTR` Django setting (default 30s)
  - Make `max_ack_pending` configurable via the `NATS_MAX_ACK_PENDING` setting (default 100)
  - Local dev: switch to gunicorn+UvicornWorker by default for production parity, with a `USE_UVICORN=1` escape hatch for raw uvicorn
  - Production: auto-detect `WEB_CONCURRENCY` from CPU cores (capped at 8) when not explicitly set in the environment

* fix: address PR review comments

  - Fix the `max_ack_pending` falsy-zero guard (use `is not None` instead of `or`)
  - Update the `TaskQueueManager` docstring with an Args section
  - Simplify the production `WEB_CONCURRENCY` fallback (just use `nproc`)

  Co-authored-by: Michael Bunsen <notbot@gmail.com>
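The falsy-zero guard and the worker-count fallback described above can be sketched as plain functions. These are illustrative shapes with assumed names, not the project's actual settings code.

```python
import os

def resolve_max_ack_pending(configured, default: int = 100):
    # `configured or default` would silently replace an explicit 0 with the
    # default; `is not None` preserves falsy-but-intentional values.
    return configured if configured is not None else default

def resolve_web_concurrency(env) -> int:
    # Honor an explicit WEB_CONCURRENCY; otherwise derive it from the CPU
    # count, capped at 8 so large hosts don't over-fork.
    explicit = env.get("WEB_CONCURRENCY")
    if explicit:
        return int(explicit)
    return min(os.cpu_count() or 1, 8)
```

The `is not None` distinction matters because `max_ack_pending=0` is a meaningful value (unlimited or disabled, depending on the consumer config), not an unset one.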
* fix: include pipeline_slug in MinimalJobSerializer (ids_only response)

  The ADC worker fetches jobs with `ids_only=1` and expects `pipeline_slug` in the response to know which pipeline to run. Without it, Pydantic validation fails and the worker skips the job.

* Update ami/jobs/serializers.py

  Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…1122)

Rename fields to better reflect the semantic difference between sync and async processing service status tracking:

- `last_checked` → `last_seen`
- `last_checked_live` → `last_seen_live`
- `last_checked_latency` → `last_seen_latency`

For sync services with endpoint URLs, the fields are updated by the periodic status checker. For async/pull-mode services, a new `mark_seen()` method is called when the service registers pipelines, recording that we heard from it. Also updates all references in serializers, the pipeline queryset, views, frontend models, columns, the dialog, and language strings.
Add structured queryset methods and a heartbeat mechanism so async
(pull-mode) processing services stay in sync with their actual liveness.
ProcessingService:
- New ProcessingServiceQuerySet with async_services() / sync_services()
methods — single canonical filter for endpoint_url null-or-empty, used
everywhere instead of ad-hoc Q expressions
- is_async property (derived from endpoint_url, no DB column)
- Docstrings reference Job.dispatch_mode ASYNC_API / SYNC_API for context
Liveness tracking:
- PROCESSING_SERVICE_LAST_SEEN_MAX = 60s constant (12× the worker's 5s
poll interval) — async services are considered offline after this
- check_processing_services_online task now handles both modes:
sync → active /readyz poll; async → bulk mark stale via async_services()
- _mark_pipeline_pull_services_seen() helper in jobs/views.py: single bulk
UPDATE via job.pipeline.processing_services.async_services(), called at
the top of both /jobs/{id}/tasks/ and /jobs/{id}/result/ so every worker
poll cycle refreshes last_seen without needing a separate registration
Async job cleanup (from carlosg/redisatomic):
- Rename _cleanup_job_if_needed → cleanup_async_job_if_needed and export
it so Job.cancel() can call it directly without a local import
- JobLogHandler: refresh_from_db before appending to avoid last-writer-
wins race across concurrent worker processes
- Job.logger: update existing handler's job reference instead of always
adding a new handler (process-level singleton leak fix)
Co-Authored-By: Claude <noreply@anthropic.com>
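The heartbeat staleness rule above can be sketched as a small pure function. This is a hedged sketch: the real check is a bulk queryset update, and the function name here is assumed.

```python
from datetime import datetime, timedelta, timezone

# 12x the worker's 5s poll interval, as described above
PROCESSING_SERVICE_LAST_SEEN_MAX = timedelta(seconds=60)

def is_async_service_live(last_seen, now=None) -> bool:
    """An async (pull-mode) service counts as live only if it was seen recently."""
    if last_seen is None:
        return False  # never heard from it: offline, not an error state
    now = now or datetime.now(timezone.utc)
    return (now - last_seen) <= PROCESSING_SERVICE_LAST_SEEN_MAX
```

Because every worker poll of `/tasks` or `/result` refreshes `last_seen`, an active worker keeps its service inside the 60s window with wide margin; silence for more than a minute marks it offline on the next beat cycle.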
- `PROCESSING_SERVICE_LAST_SEEN_MAX` = 60s constant (12x the worker's 5s poll interval), used by `check_processing_services_online` to expire stale async service heartbeats
- `get_status()` pull-mode branch: derives `server_live` from the staleness check, populates `pipelines_online` from registered pipelines, and uses `not self.endpoint_url` to also handle empty-string endpoints
- `endpointUrl` getter: returns `undefined` instead of the stringified "null" so async services show a blank cell in the endpoint column
- Fix ImportError: import `PROCESSING_SERVICE_LAST_SEEN_MAX` directly from `ami.ml.models.processing_service` (it is not re-exported from `ami.ml.models`)
- Fix null `last_seen` causing an epoch timestamp in the `processingServicesOnlineLastSeen` getter: filter out null values before `Math.max`
- Fix "Last seen undefined" rendered in the status column when `lastSeen` is undefined
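The null-filtering fix for the `last_seen` aggregation can be illustrated in Python (the actual getter is TypeScript, and the function name here is hypothetical):

```python
def latest_last_seen(timestamps):
    """Latest heartbeat across services, ignoring missing values.

    Feeding None/null into the max (as Math.max(..., null) does in JS,
    where null coerces to 0) would yield an epoch-era timestamp; filtering
    first makes "no heartbeat yet" an explicit None instead.
    """
    seen = [t for t in timestamps if t is not None]
    return max(seen) if seen else None
```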
Move the async service stale-check to the top of check_processing_services_online so it always runs, even if a slow sync service check hits the time limit. Reduce the per-request timeout for the beat task from 90s (designed for cold-start waits) to 8s — if a sync service is starting up it will recover on the next cycle. Raise soft_time_limit/time_limit accordingly to give the sync loop room to complete (worst case ~30s per service with retries). Co-Authored-By: Claude <noreply@anthropic.com>
Async services now derive liveness from heartbeats rather than returning an error message. Update assertions: server_live=False (not None) when no heartbeat has been received, and remove error message checks. Co-Authored-By: Claude <noreply@anthropic.com>
Filter async services by project when marking them as seen, preventing cross-project contamination when a pipeline is shared across projects. Clarify in the docstring that this still marks all async services on the pipeline within the project, not the individual caller, until application-token auth (PR #1117) is available. Co-Authored-By: Claude <noreply@anthropic.com>
Add is_async to ProcessingServiceSerializer and ProcessingServiceNestedSerializer so the frontend can distinguish pull-mode from push-mode services. Also normalize empty endpoint_url strings to undefined in the FE model for consistency with the backend. Co-Authored-By: Claude <noreply@anthropic.com>
…le copies

- Run the async stale-check before the sync loop so it always executes, regardless of how long sync checks take
- Reduce the per-request timeout for the beat task from 90s (designed for cold-start waits) to 8s; a slow or unreachable service just waits for the next 5-minute cycle
- Add `expires=240s` so copies queued during a worker outage are discarded when the worker returns; only the most recent firing runs
- Coerce `last_seen_live` null to false in the getter to match the boolean return type (the backend field is nullable)
- Use `translate(STRING.FIELD_LABEL_LAST_SEEN)` in the status column instead of a hardcoded English string, matching the pattern in the details dialog
…ogging

- Fix a potential UnboundLocalError in `select_processing_service()` when all online services have `last_seen_latency=None` (e.g. async/pull-mode services). Falls back to the first online service and logs that no latency data exists.
- Use `logger.exception()` instead of `logger.error()` in the beat task exception handler to preserve the full traceback for debugging.
- Fix the comment arithmetic for the worst-case timeout calculation.
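The latency-fallback behavior described above can be sketched as follows. This is an illustrative shape with assumed dict-based service records, not the actual Django model code.

```python
def select_processing_service(online_services):
    """Pick the lowest-latency online service.

    Async/pull-mode services never report last_seen_latency, so when no
    service has latency data we fall back to the first online one instead
    of leaving the selection variable unbound.
    """
    with_latency = [s for s in online_services if s["last_seen_latency"] is not None]
    if with_latency:
        return min(with_latency, key=lambda s: s["last_seen_latency"])
    return online_services[0]  # no latency data at all
```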
Async/pull-mode services (no endpoint URL) now show a gray "Unknown" status indicator instead of the misleading ONLINE/OFFLINE based on heartbeat data that applies to all services on the pipeline. The pipeline-level indicator in the dropdown continues to work as before. Co-Authored-By: Claude <noreply@anthropic.com>
Async/pull-mode services (no endpoint URL) are now treated as "online" in the pipeline selector since their status cannot be verified via active polling. This prevents the "Process Now" dropdown from incorrectly disabling async pipelines when no recent heartbeat exists. Co-Authored-By: Claude <noreply@anthropic.com>
…hecks Use the explicit is_async field from the backend serializer rather than inferring async status from the absence of endpoint_url. Co-Authored-By: Claude <noreply@anthropic.com>
- Return an empty `pipelines_online` list when an async service is offline, avoiding a contradictory payload (`server_live=False` plus non-empty pipelines)
- Add `?? false` to the `isAsync` getter to match the defensive pattern used by `lastSeenLive`
…s list

The write-only `project` field was declared on the serializer but not included in `Meta.fields`, causing an AssertionError at runtime. Missed during rebase conflict resolution.
…o demo/integration
Occurrences can have a NULL determination FK when pipeline result saving crashes before completion. The frontend Occurrence model and all consuming components now guard against this with null checks, optional chaining, and fallback display text using STRING.UNKNOWN. Co-Authored-By: Claude <noreply@anthropic.com>
Add an integrity check that finds occurrences with classifications but no determination set (from partial pipeline save failures) and re-runs `update_occurrence_determination` to fix them. Three call sites:

- management command: `manage.py check_data_integrity [--project N] [--job N]`
- post-save hook in pipeline `save_results` (scoped to affected occurrences)
- Celery task for periodic scheduling via django_celery_beat
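The core predicate of the integrity check can be illustrated with a pure function over plain records. The real implementation queries the Django ORM; the function and key names here are assumptions for illustration.

```python
def find_occurrences_missing_determination(occurrences):
    """Occurrences with classifications but a null determination FK.

    These are the artifacts of a pipeline save that crashed after creating
    occurrences but before the bulk_update that sets determinations; each
    one needs update_occurrence_determination re-run.
    """
    return [
        o["id"]
        for o in occurrences
        if o["determination_id"] is None and o["classification_count"] > 0
    ]
```

Occurrences with no classifications at all are deliberately excluded: with nothing to derive a determination from, re-running the update could not fix them.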
Closed in favor of #1188

Summary

- Frontend: handles a null `determination` FK gracefully. Shows "Unknown" (via the `STRING.UNKNOWN` translation) instead of crashing when the determination is missing. Hides agree/ID actions for undetermined occurrences.
- Backend: a `reconcile_missing_determinations()` function finds occurrences that have classifications but no determination set (from partial pipeline save failures) and re-runs `update_occurrence_determination` to fix them. Called from:
  - the `manage.py check_data_integrity` management command (supports `--project`, `--job`, `--dry-run`)
  - `pipeline.save_results()` (scoped to affected occurrence IDs)
  - the `ami.main.tasks.check_data_integrity` Celery task for periodic scheduling

Context
In the demo environment, at least one occurrence had a NULL determination FK, causing the frontend to crash (`null has no attr .id`). This happens when pipeline result saving crashes after creating occurrences but before the bulk_update that sets their determination.

Immediate fix for demo env
`docker compose exec django python manage.py check_data_integrity`

Or to preview first:
`docker compose exec django python manage.py check_data_integrity --dry-run`

Test plan
- Run `manage.py check_data_integrity --dry-run` to verify it finds affected occurrences
- Run `manage.py check_data_integrity` to verify it fixes them

🤖 Generated with Claude Code