diff --git a/ami/jobs/management/commands/debug_jobs.py b/ami/jobs/management/commands/debug_jobs.py new file mode 100644 index 000000000..5d2ddce38 --- /dev/null +++ b/ami/jobs/management/commands/debug_jobs.py @@ -0,0 +1,537 @@ +""" +Management command for testing and troubleshooting jobs and Celery tasks. + +This command provides utilities to simulate various job/task issues for testing +the job status monitoring system. +""" + +from celery.result import AsyncResult +from django.core.management.base import BaseCommand, CommandError +from django.utils import timezone + +from ami.jobs.models import Job, JobState, MLJob +from ami.main.models import Project + + +class Command(BaseCommand): + help = """ + Utilities for testing and troubleshooting jobs and Celery tasks. + + Examples: + # Check a specific job's status + python manage.py debug_jobs --check-job 123 + + # Remove a job's task from Celery (simulate disappeared task) + python manage.py debug_jobs --remove-task 123 + + # Corrupt a job's task_id (simulate bad data) + python manage.py debug_jobs --corrupt-task 123 + + # Check all unfinished jobs (run periodic task) + python manage.py debug_jobs --check-all + + # List all unfinished jobs + python manage.py debug_jobs --list-unfinished + + # Get detailed info about a job + python manage.py debug_jobs --info 123 + + # Create and run a 5-minute test job for project 1 + python manage.py debug_jobs --create-test-job 1 + + # Create and run async (in background via Celery) + python manage.py debug_jobs --create-test-job 1 --async + """ + + def add_arguments(self, parser): + parser.add_argument( + "--check-job", + type=int, + metavar="JOB_ID", + help="Check status of a specific job by ID", + ) + + parser.add_argument( + "--remove-task", + type=int, + metavar="JOB_ID", + help="Remove a job's Celery task (simulates disappeared task)", + ) + + parser.add_argument( + "--corrupt-task", + type=int, + metavar="JOB_ID", + help="Corrupt a job's task_id (simulates bad data)", + ) + + parser.add_argument( + "--check-all", + action="store_true", + help="Check all unfinished jobs without making changes (dry-run mode)", + ) + + parser.add_argument( + "--update-all", + action="store_true", + help="Check and update all unfinished jobs (runs periodic task with save=True)", + ) + + parser.add_argument( + "--list-unfinished", + action="store_true", + help="List all unfinished jobs", + ) + + parser.add_argument( + "--info", + type=int, + metavar="JOB_ID", + help="Get detailed information about a job", + ) + + parser.add_argument( + "--create-test-job", + type=int, + metavar="PROJECT_ID", + help="Create and run a long-running test job (5+ minutes) for the specified project", + ) + + parser.add_argument( + "--async", + action="store_true", + dest="run_async", + help="Run the created test job asynchronously (default: synchronous)", + ) + + parser.add_argument( + "--no-retry", + action="store_true", + help="Disable automatic retry when checking status", + ) + + parser.add_argument( + "--force", + action="store_true", + help="Force check even if job is in final state", + ) + + def handle(self, *args, **options): + # Check which action to perform + actions_taken = 0 + + if options["check_job"]: + self.check_job_status(options["check_job"], options) + actions_taken += 1 + + if options["remove_task"]: + self.remove_task(options["remove_task"]) + actions_taken += 1 + + if options["corrupt_task"]: + self.corrupt_task(options["corrupt_task"]) + actions_taken += 1 + + if options["check_all"]: + 
self.check_all_jobs(dry_run=True) + actions_taken += 1 + + if options["update_all"]: + self.check_all_jobs(dry_run=False) + actions_taken += 1 + + if options["list_unfinished"]: + self.list_unfinished_jobs() + actions_taken += 1 + + if options["info"]: + self.show_job_info(options["info"]) + actions_taken += 1 + + if options["create_test_job"]: + self.create_test_job(options["create_test_job"], options) + actions_taken += 1 + + if actions_taken == 0: + self.stdout.write(self.style.WARNING("No action specified. Use --help to see available options.")) + + def get_job(self, job_id: int) -> Job: + """Get a job by ID or raise CommandError.""" + try: + return Job.objects.get(pk=job_id) + except Job.DoesNotExist: + raise CommandError(f"Job {job_id} does not exist") + + def check_job_status(self, job_id: int, options: dict): + """Check the status of a specific job.""" + job = self.get_job(job_id) + + self.stdout.write(f"\n{'='*60}") + self.stdout.write(f"Checking Job #{job_id}: {job.name}") + self.stdout.write(f"{'='*60}\n") + + self.stdout.write(f"Current Status: {self.style.WARNING(job.status)}") + self.stdout.write(f"Task ID: {job.task_id or '(none)'}") + self.stdout.write(f"Scheduled: {job.scheduled_at or '(never)'}") + self.stdout.write(f"Started: {job.started_at or '(never)'}") + self.stdout.write(f"Finished: {job.finished_at or '(never)'}") + self.stdout.write(f"Last Checked: {job.last_checked_at or '(never)'}") + + # Check Celery task if exists + if job.task_id: + self.stdout.write("\n--- Celery Task Info ---") + try: + task = AsyncResult(job.task_id) + self.stdout.write(f"Celery Status: {task.status}") + self.stdout.write(f"Task Ready: {task.ready()}") + self.stdout.write(f"Task Successful: {task.successful() if task.ready() else 'N/A'}") + if task.ready() and not task.successful(): + self.stdout.write(f"Task Error: {task.result}") + except Exception as e: + self.stdout.write(self.style.ERROR(f"Error querying Celery: {e}")) + + # Run check_status + self.stdout.write("\n--- Running check_status() ---") + try: + status_changed = job.check_status( + force=options.get("force", False), save=True, auto_retry=not options.get("no_retry", False) + ) + + job.refresh_from_db() + + if status_changed: + self.stdout.write(self.style.SUCCESS(f"✓ Status changed to: {job.status}")) + else: + self.stdout.write(self.style.WARNING(f"○ Status unchanged: {job.status}")) + + self.stdout.write(f"\nFinal State:") + self.stdout.write(f" Status: {job.status}") + self.stdout.write(f" Finished: {job.finished_at or '(still running)'}") + self.stdout.write(f" Last Checked: {job.last_checked_at}") + + except Exception as e: + self.stdout.write(self.style.ERROR(f"✗ Error checking status: {e}")) + raise + + def remove_task(self, job_id: int): + """Remove/revoke a job's Celery task to simulate disappeared task.""" + job = self.get_job(job_id) + + if not job.task_id: + raise CommandError(f"Job {job_id} has no task_id") + + self.stdout.write(f"\n{'='*60}") + self.stdout.write(f"Removing Task for Job #{job_id}: {job.name}") + self.stdout.write(f"{'='*60}\n") + + self.stdout.write(f"Current Status: {job.status}") + self.stdout.write(f"Task ID: {job.task_id}") + + try: + task = AsyncResult(job.task_id) + self.stdout.write(f"Celery Status (before): {task.status}") + + # Revoke the task + task.revoke(terminate=True) + self.stdout.write(self.style.SUCCESS("✓ Task revoked")) + + # Forget the task (removes from result backend) + task.forget() + self.stdout.write(self.style.SUCCESS("✓ Task forgotten (removed from result backend)")) + 
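+            # Note (behavioral assumption): once the task has been revoked and forgotten,
+            # the result backend has no record of this task id, so AsyncResult(job.task_id)
+            # is expected to report PENDING. Job.check_status() treats a PENDING result on a
+            # job that is not actually pending as a disappeared task.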
+ # Check status again + task = AsyncResult(job.task_id) + self.stdout.write(f"Celery Status (after): {task.status}") + + self.stdout.write( + self.style.WARNING( + f"\n⚠ Job status in database is still '{job.status}'. " f"Run --check-job {job_id} to update it." + ) + ) + + except Exception as e: + self.stdout.write(self.style.ERROR(f"✗ Error removing task: {e}")) + raise + + def corrupt_task(self, job_id: int): + """Corrupt a job's task_id to simulate bad data.""" + job = self.get_job(job_id) + + self.stdout.write(f"\n{'='*60}") + self.stdout.write(f"Corrupting Task ID for Job #{job_id}: {job.name}") + self.stdout.write(f"{'='*60}\n") + + old_task_id = job.task_id + self.stdout.write(f"Old Task ID: {old_task_id or '(none)'}") + + # Generate a fake task_id + import uuid + + fake_task_id = f"fake-task-{uuid.uuid4()}" + + job.task_id = fake_task_id + job.save(update_fields=["task_id"], update_progress=False) + + self.stdout.write(f"New Task ID: {self.style.WARNING(fake_task_id)}") + self.stdout.write(self.style.SUCCESS(f"✓ Task ID corrupted. This will simulate a disappeared task scenario.")) + + self.stdout.write(self.style.WARNING(f"\n⚠ Run --check-job {job_id} to see how the system handles this.")) + + def check_all_jobs(self, dry_run: bool = True): + """Check all unfinished jobs, optionally without saving changes.""" + mode_label = "Dry-Run Mode (no changes will be saved)" if dry_run else "Update Mode (changes will be saved)" + + self.stdout.write(f"\n{'='*60}") + self.stdout.write(f"Checking All Unfinished Jobs - {mode_label}") + self.stdout.write(f"{'='*60}\n") + + # Get all unfinished jobs, excluding CREATED jobs that have never been scheduled + unfinished_jobs = ( + Job.objects.filter(status__in=JobState.running_states()) + .exclude(status=JobState.CREATED.value, scheduled_at__isnull=True) + .order_by("-created_at") + ) + unfinished_count = unfinished_jobs.count() + + self.stdout.write(f"Found {unfinished_count} unfinished jobs\n") + + if unfinished_count == 0: + self.stdout.write(self.style.SUCCESS("✓ No unfinished jobs to check!")) + return + + checked = 0 + updated = 0 + errors = 0 + + for job in unfinished_jobs: + try: + self.stdout.write(f"Checking Job #{job.pk}: {job.name[:50]} ({job.status})") + + # Check status without saving in dry-run mode + status_changed = job.check_status(force=False, save=not dry_run, auto_retry=not dry_run) + + checked += 1 + + if status_changed: + job.refresh_from_db() + if dry_run: + self.stdout.write( + self.style.WARNING(f" → Would change to: {job.status} (dry-run, not saved)") + ) + else: + self.stdout.write(self.style.SUCCESS(f" → Changed to: {job.status}")) + updated += 1 + else: + self.stdout.write(" → No change needed") + + except Exception as e: + self.stdout.write(self.style.ERROR(f" ✗ Error: {e}")) + errors += 1 + + # Summary + self.stdout.write("\n--- Summary ---") + self.stdout.write(f"Total Unfinished: {unfinished_count}") + self.stdout.write(f"Checked: {checked}") + if dry_run: + self.stdout.write(self.style.WARNING(f"Would Update: {updated}")) + self.stdout.write(self.style.NOTICE("\n💡 Use --update-all to actually save the changes")) + else: + self.stdout.write(self.style.SUCCESS(f"Updated: {updated}")) + + if errors > 0: + self.stdout.write(self.style.ERROR(f"Errors: {errors}")) + + def list_unfinished_jobs(self): + """List all unfinished jobs with their details.""" + self.stdout.write(f"\n{'='*60}") + self.stdout.write("Unfinished Jobs") + self.stdout.write(f"{'='*60}\n") + + # Exclude CREATED jobs that have never been scheduled 
(nothing to check) + unfinished_jobs = ( + Job.objects.filter(status__in=JobState.running_states()) + .exclude(status=JobState.CREATED.value, scheduled_at__isnull=True) + .order_by("-created_at") + ) + + if not unfinished_jobs.exists(): + self.stdout.write(self.style.SUCCESS("✓ No unfinished jobs found!")) + return + + self.stdout.write(f"Found {unfinished_jobs.count()} unfinished jobs:\n") + + for job in unfinished_jobs: + status_color = self.style.WARNING + if job.status == JobState.STARTED.value: + status_color = self.style.HTTP_INFO + elif job.status in [JobState.PENDING.value, JobState.CREATED.value]: + status_color = self.style.NOTICE + + time_info = "" + if job.started_at: + elapsed = timezone.now() - job.started_at + hours = elapsed.total_seconds() / 3600 + time_info = f" (running {hours:.1f}h)" + elif job.scheduled_at: + elapsed = timezone.now() - job.scheduled_at + minutes = elapsed.total_seconds() / 60 + time_info = f" (scheduled {minutes:.1f}m ago)" + + task_status = "✓" if job.task_id else "✗" + + self.stdout.write( + f" [{task_status}] Job #{job.pk:4d}: " + f"{status_color(job.status):15s} " + f"{job.name[:40]:40s} " + f"{time_info}" + ) + + self.stdout.write("\n💡 Use --check-job to check a specific job") + self.stdout.write("💡 Use --check-all to check all without saving changes") + self.stdout.write("💡 Use --update-all to check and update all unfinished jobs") + + def show_job_info(self, job_id: int): + """Show detailed information about a job.""" + job = self.get_job(job_id) + + self.stdout.write(f"\n{'='*60}") + self.stdout.write(f"Job #{job_id} Detailed Information") + self.stdout.write(f"{'='*60}\n") + + # Basic info + self.stdout.write(f"Name: {job.name}") + self.stdout.write(f"Type: {job.job_type_key}") + self.stdout.write(f"Project: {job.project.name} (#{job.project.pk})") + + # Status info + self.stdout.write(f"\n--- Status ---") + status_color = self.style.SUCCESS if job.status in JobState.final_states() else self.style.WARNING + self.stdout.write(f"Status: {status_color(job.status)}") + self.stdout.write(f"Task ID: {job.task_id or '(none)'}") + + # Timestamps + self.stdout.write(f"\n--- Timestamps ---") + self.stdout.write(f"Created: {job.created_at}") + self.stdout.write(f"Updated: {job.updated_at}") + self.stdout.write(f"Scheduled: {job.scheduled_at or '(never)'}") + self.stdout.write(f"Started: {job.started_at or '(never)'}") + self.stdout.write(f"Finished: {job.finished_at or '(never)'}") + self.stdout.write(f"Last Checked: {job.last_checked_at or '(never)'}") + + # Duration + if job.started_at: + if job.finished_at: + duration = job.finished_at - job.started_at + self.stdout.write(f"Duration: {duration}") + else: + elapsed = timezone.now() - job.started_at + self.stdout.write(f"Running for: {elapsed}") + + # Progress + self.stdout.write(f"\n--- Progress ---") + self.stdout.write(f"Overall: {job.progress.summary.progress:.1%}") + self.stdout.write(f"Stages: {len(job.progress.stages)}") + for stage in job.progress.stages: + self.stdout.write(f" - {stage.name}: {stage.progress:.1%} ({stage.status})") + + # Celery task info + if job.task_id: + self.stdout.write(f"\n--- Celery Task ---") + try: + task = AsyncResult(job.task_id) + self.stdout.write(f"Status: {task.status}") + self.stdout.write(f"Ready: {task.ready()}") + if task.ready(): + self.stdout.write(f"Successful: {task.successful()}") + if not task.successful(): + self.stdout.write(f"Error: {task.result}") + except Exception as e: + self.stdout.write(self.style.ERROR(f"Error: {e}")) + + # Logs + if 
job.logs.stderr: + self.stdout.write(f"\n--- Recent Errors ---") + for error in job.logs.stderr[:5]: + self.stdout.write(self.style.ERROR(f" {error}")) + + if job.logs.stdout: + self.stdout.write(f"\n--- Recent Logs (last 5) ---") + for log in job.logs.stdout[:5]: + self.stdout.write(f" {log}") + + # Suggestions + self.stdout.write(f"\n--- Actions ---") + if job.status in JobState.running_states(): + self.stdout.write(f" --check-job {job_id} Check status") + if job.task_id: + self.stdout.write(f" --remove-task {job_id} Simulate disappeared task") + self.stdout.write(f" --corrupt-task {job_id} Corrupt task_id") + elif job.status in JobState.failed_states(): + self.stdout.write(f" Job is in final state ({job.status})") + self.stdout.write(f" Use --force to check anyway") + + def create_test_job(self, project_id: int, options: dict): + """Create and run a long-running test job.""" + try: + project = Project.objects.get(pk=project_id) + except Project.DoesNotExist: + raise CommandError(f"Project {project_id} does not exist") + + self.stdout.write(f"\n{'='*60}") + self.stdout.write(f"Creating Test Job for Project #{project_id}: {project.name}") + self.stdout.write(f"{'='*60}\n") + + # Create a job with 5 minutes of delay (300 seconds) + # This makes it easy to test various scenarios during its runtime + job = Job.objects.create( + job_type_key=MLJob.key, + project=project, + name=f"Test Job - Created at {timezone.now().strftime('%Y-%m-%d %H:%M:%S')}", + delay=300, # 5 minutes + ) + + self.stdout.write(f"Created Job #{job.pk}: {job.name}") + self.stdout.write(f"Duration: 5 minutes (300 seconds)") + self.stdout.write(f"Status: {job.status}") + + # Run the job + run_async = options.get("run_async", False) + + if run_async: + self.stdout.write("\n--- Running Asynchronously ---") + job.enqueue() + self.stdout.write(self.style.SUCCESS(f"✓ Job enqueued with task_id: {job.task_id}")) + self.stdout.write("\nThe job is now running in the background via Celery.") + else: + self.stdout.write("\n--- Running Synchronously ---") + self.stdout.write(self.style.WARNING("This will block for ~5 minutes. Press Ctrl+C to cancel.")) + self.stdout.write("") + + try: + job.run() + self.stdout.write(self.style.SUCCESS(f"\n✓ Job completed successfully!")) + except KeyboardInterrupt: + self.stdout.write(self.style.WARNING("\n\n⚠ Interrupted! 
Job may still be running.")) + except Exception as e: + self.stdout.write(self.style.ERROR(f"\n✗ Job failed: {e}")) + + job.refresh_from_db() + + self.stdout.write(f"\n--- Job Details ---") + self.stdout.write(f"Job ID: {job.pk}") + self.stdout.write(f"Status: {job.status}") + self.stdout.write(f"Task ID: {job.task_id or '(none)'}") + self.stdout.write(f"Progress: {job.progress.summary.progress:.1%}") + + self.stdout.write(f"\n--- Testing Ideas ---") + self.stdout.write(f" # While it's running, simulate a disappeared task:") + self.stdout.write(f" docker compose run --rm django python manage.py debug_jobs --remove-task {job.pk}") + self.stdout.write(f"") + self.stdout.write(f" # Then check if auto-retry kicks in:") + self.stdout.write(f" docker compose run --rm django python manage.py debug_jobs --check-job {job.pk}") + self.stdout.write(f"") + self.stdout.write(f" # Corrupt the task_id:") + self.stdout.write(f" docker compose run --rm django python manage.py debug_jobs --corrupt-task {job.pk}") + self.stdout.write(f"") + self.stdout.write(f" # Get detailed info:") + self.stdout.write(f" docker compose run --rm django python manage.py debug_jobs --info {job.pk}") diff --git a/ami/jobs/management/commands/update_stale_jobs.py b/ami/jobs/management/commands/update_stale_jobs.py index da3a53e3d..a440d2f93 100644 --- a/ami/jobs/management/commands/update_stale_jobs.py +++ b/ami/jobs/management/commands/update_stale_jobs.py @@ -1,5 +1,3 @@ -from celery import states -from celery.result import AsyncResult from django.core.management.base import BaseCommand from django.utils import timezone @@ -8,7 +6,10 @@ class Command(BaseCommand): help = ( - "Update the status of all jobs that are not in a final state " "and have not been updated in the last X hours." + "Update the status of all jobs that are not in a final state " + "and have not been updated in the last X hours. " + "\n\nNOTE: This is now handled automatically by the periodic task 'check_unfinished_jobs'. " + "This command is kept for manual intervention when needed." 
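+        # Illustrative usage (the flag values below are examples only):
+        "\n\nExample: python manage.py update_stale_jobs --hours 4 --no-retry"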
) # Add argument for the number of hours to consider a job stale @@ -19,6 +20,11 @@ def add_arguments(self, parser): default=Job.FAILED_CUTOFF_HOURS, help="Number of hours to consider a job stale", ) + parser.add_argument( + "--no-retry", + action="store_true", + help="Disable automatic retry of disappeared tasks", + ) def handle(self, *args, **options): stale_jobs = Job.objects.filter( @@ -26,13 +32,20 @@ def handle(self, *args, **options): updated_at__lt=timezone.now() - timezone.timedelta(hours=options["hours"]), ) + total = stale_jobs.count() + self.stdout.write(f"Found {total} stale jobs to check...") + + updated_count = 0 for job in stale_jobs: - task = AsyncResult(job.task_id) if job.task_id else None - if task: - job.update_status(task.state, save=False) - job.save() - self.stdout.write(self.style.SUCCESS(f"Updated status of job {job.pk} to {task.state}")) - else: - self.stdout.write(self.style.WARNING(f"Job {job.pk} has no associated task, setting status to FAILED")) - job.update_status(states.FAILURE, save=False) - job.save() + try: + status_changed = job.check_status(force=False, save=True, auto_retry=not options["no_retry"]) + if status_changed: + updated_count += 1 + self.stdout.write(self.style.SUCCESS(f"✓ Job {job.pk} status updated to {job.status}")) + else: + self.stdout.write(self.style.WARNING(f"○ Job {job.pk} status unchanged ({job.status})")) + except Exception as e: + self.stdout.write(self.style.ERROR(f"✗ Error checking job {job.pk}: {e}")) + + self.stdout.write("") + self.stdout.write(self.style.SUCCESS(f"Completed: {updated_count} of {total} jobs updated")) diff --git a/ami/jobs/migrations/0018_add_last_checked_at_and_periodic_task.py b/ami/jobs/migrations/0018_add_last_checked_at_and_periodic_task.py new file mode 100644 index 000000000..aa84c26da --- /dev/null +++ b/ami/jobs/migrations/0018_add_last_checked_at_and_periodic_task.py @@ -0,0 +1,44 @@ +from django.db import migrations, models +from django_celery_beat.models import PeriodicTask, IntervalSchedule + + +def create_periodic_task(apps, schema_editor): + """Create periodic task to check unfinished jobs every 3 minutes.""" + interval_schedule, _ = IntervalSchedule.objects.get_or_create( + every=3, + period=IntervalSchedule.MINUTES, + ) + + PeriodicTask.objects.get_or_create( + name="jobs.check_unfinished_jobs", + task="ami.jobs.tasks.check_unfinished_jobs", + interval=interval_schedule, + defaults={ + "enabled": True, + "description": "Check status of all unfinished jobs and update if tasks have disappeared", + }, + ) + + +def delete_periodic_task(apps, schema_editor): + """Delete the periodic task if rolling back.""" + PeriodicTask.objects.filter(name="jobs.check_unfinished_jobs").delete() + + +class Migration(migrations.Migration): + dependencies = [ + ("jobs", "0017_alter_job_logs_alter_job_progress"), + ] + + operations = [ + migrations.AddField( + model_name="job", + name="last_checked_at", + field=models.DateTimeField( + blank=True, + help_text="Last time job status was checked", + null=True, + ), + ), + migrations.RunPython(create_periodic_task, delete_periodic_task), + ] diff --git a/ami/jobs/models.py b/ami/jobs/models.py index ac0078d76..91ecd3bf0 100644 --- a/ami/jobs/models.py +++ b/ami/jobs/models.py @@ -1,8 +1,10 @@ import datetime +import functools import logging import random import time import typing +from contextlib import contextmanager from dataclasses import dataclass import pydantic @@ -10,6 +12,7 @@ from celery.result import AsyncResult from django.db import models, transaction from 
django.utils.text import slugify
+from django.utils.timesince import timesince
 from django_pydantic_field import SchemaField
 from guardian.shortcuts import get_perms

@@ -20,13 +23,141 @@
 from ami.ml.models import Pipeline
 from ami.ml.post_processing.registry import get_postprocessing_task
 from ami.utils.schemas import OrderedEnum
+from config import celery_app

 logger = logging.getLogger(__name__)


+# ==============================================================================
+# CONCURRENCY PROTECTION
+# ==============================================================================
+# This module implements row-level locking to prevent concurrent updates from
+# multiple Celery workers from overwriting each other's changes.
+#
+# Key components:
+# - atomic_job_update(): Context manager that locks a job row for safe updates
+# - JobLogHandler.emit(): Uses locking when writing logs
+# - Job.save(): Automatically uses locking and validates logs to prevent overwrites
+#
+# When multiple workers (1-10) are processing the same job, they all may need
+# to update logs, progress, and status fields. Without locking, last-write-wins
+# can cause lost updates. The locking approach ensures all updates are preserved.
+# ==============================================================================
+
+
+def check_celery_workers_available() -> tuple[bool, int]:
+    """
+    Check if any Celery workers are currently running and available.
+
+    Returns:
+        tuple: (workers_available: bool, worker_count: int)
+    """
+    try:
+        # Get active workers using Celery's inspect API
+        inspect = celery_app.control.inspect()
+        active_workers = inspect.active()
+
+        if active_workers is None:
+            # None means no workers responded (likely no workers running)
+            return False, 0
+
+        worker_count = len(active_workers)
+        return worker_count > 0, worker_count
+    except Exception as e:
+        logger.warning(f"Failed to check for Celery workers: {e}")
+        # If we can't check, assume workers might be available (fail open)
+        return True, 0
+
+
+@functools.lru_cache(maxsize=1)
+def check_celery_workers_available_cached(timestamp: int) -> tuple[bool, int]:
+    """
+    Cached version of check_celery_workers_available.
+
+    Cache is keyed by timestamp (current minute), so results are cached
+    for approximately 1 minute to avoid excessive worker checks when
+    processing many jobs.
+
+    Args:
+        timestamp: Current minute as an integer (int(time.time() / 60))
+
+    Returns:
+        tuple: (workers_available: bool, worker_count: int)
+    """
+    return check_celery_workers_available()
+
+
+@contextmanager
+def atomic_job_update(job_id: int, timeout: int | None = None):
+    """
+    Context manager for safely updating job fields with row-level locking.
+
+    This ensures that concurrent updates to the same job (from multiple
+    Celery workers or tasks) don't overwrite each other's changes. The job
+    row stays locked for the duration of the context; the caller is
+    responsible for saving the instance before the context exits.
+
+    Args:
+        job_id: The ID of the job to lock and update
+        timeout: Optional timeout in seconds to wait for the lock.
+            If None (default), waits indefinitely.
+
+    Yields:
+        Job: The locked job instance, safe to modify
+
+    Example:
+        with atomic_job_update(job.pk) as locked_job:
+            locked_job.logs.stdout.insert(0, "New log message")
+            locked_job.progress.update_stage("process", progress=0.5)
+            locked_job.save(update_fields=["logs", "progress"])  # save while the row lock is held
+
+    Raises:
+        Job.DoesNotExist: If the job doesn't exist
+        DatabaseError: If the lock cannot be acquired within timeout
+    """
+    # Import here to avoid circular import
+    from ami.jobs.models import Job
+
+    with transaction.atomic():
+        # Use select_for_update to lock the row
+        # nowait=False means we'll wait for the lock (don't lose data)
+        # skip_locked=False means we'll wait, not skip
+        query = Job.objects.select_for_update(nowait=False, skip_locked=False)
+
+        if timeout is not None:
+            # Set statement timeout for this transaction
+            from django.db import connection
+
+            with connection.cursor() as cursor:
+                cursor.execute(f"SET LOCAL statement_timeout = {timeout * 1000}")
+
+        job = query.get(pk=job_id)
+        yield job
+        # The caller is expected to save the locked instance while inside this context;
+        # the row lock is released when the transaction block exits.
+
+
 class JobState(str, OrderedEnum):
     """
     These come from Celery, except for CREATED, which is a custom state.
+
+    Future Enhancement: Consider implementing a state machine validator to enforce
+    valid state transitions. Example structure:
+
+        VALID_TRANSITIONS = {
+            JobState.CREATED: [JobState.PENDING, JobState.STARTED, JobState.FAILURE],
+            JobState.PENDING: [JobState.STARTED, JobState.RECEIVED, JobState.FAILURE],
+            JobState.STARTED: [JobState.SUCCESS, JobState.FAILURE, JobState.RETRY],
+            JobState.RETRY: [JobState.PENDING, JobState.STARTED, JobState.FAILURE],
+            JobState.CANCELING: [JobState.REVOKED, JobState.FAILURE],
+            # Final states generally shouldn't transition
+            JobState.SUCCESS: [],
+            JobState.FAILURE: [JobState.RETRY],  # Only allow retry from failure
+            JobState.REVOKED: [],
+        }
+
+    This would help catch invalid state transitions early and make the job
+    lifecycle more predictable.
     """

     # CREATED = "Created"
@@ -255,9 +386,12 @@ class JobLogs(pydantic.BaseModel):
 class JobLogHandler(logging.Handler):
     """
     Class for handling logs from a job and writing them to the job instance.
+
+    Uses row-level locking to prevent concurrent log writes from overwriting
+    each other when multiple Celery workers are updating the same job.
""" - max_log_length = 1000 + max_log_length = 10000 # Allow ~100 messages per batch × hundreds of batches def __init__(self, job: "Job", *args, **kwargs): self.job = job @@ -267,23 +401,29 @@ def emit(self, record: logging.LogRecord): # Log to the current app logger logger.log(record.levelno, self.format(record)) - # Write to the logs field on the job instance + # Write to the logs field on the job instance with atomic locking timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") msg = f"[{timestamp}] {record.levelname} {self.format(record)}" - if msg not in self.job.logs.stdout: - self.job.logs.stdout.insert(0, msg) - # Write a simpler copy of any errors to the errors field - if record.levelno >= logging.ERROR: - if record.message not in self.job.logs.stderr: - self.job.logs.stderr.insert(0, record.message) + try: + # Use atomic update to prevent race conditions + with atomic_job_update(self.job.pk) as job: + if msg not in job.logs.stdout: + job.logs.stdout.insert(0, msg) + + # Write a simpler copy of any errors to the errors field + if record.levelno >= logging.ERROR: + if record.message not in job.logs.stderr: + job.logs.stderr.insert(0, record.message) - if len(self.job.logs.stdout) > self.max_log_length: - self.job.logs.stdout = self.job.logs.stdout[: self.max_log_length] + if len(job.logs.stdout) > self.max_log_length: + job.logs.stdout = job.logs.stdout[: self.max_log_length] - # @TODO consider saving logs to the database periodically rather than on every log - try: - self.job.save(update_fields=["logs"], update_progress=False) + if len(job.logs.stderr) > self.max_log_length: + job.logs.stderr = job.logs.stderr[: self.max_log_length] + + # Save with only the logs field to minimize lock time + job.save(update_fields=["logs"]) except Exception as e: logger.error(f"Failed to save logs for job #{self.job.pk}: {e}") pass @@ -325,7 +465,7 @@ def run(cls, job: "Job"): job.update_status(JobState.STARTED) job.started_at = datetime.datetime.now() job.finished_at = None - job.save() + job.save(update_fields=["status", "progress", "started_at", "finished_at"]) # Keep track of sub-tasks for saving results, pair with batch number save_tasks: list[tuple[int, AsyncResult]] = [] @@ -345,7 +485,8 @@ def run(cls, job: "Job"): progress=i / job.delay, mood="😵‍💫", ) - job.save() + # Only save progress to avoid overwriting logs + job.save(update_fields=["progress"]) last_update = time.time() job.progress.update_stage( @@ -354,7 +495,8 @@ def run(cls, job: "Job"): progress=1, mood="🥳", ) - job.save() + # Only save progress to avoid overwriting logs + job.save(update_fields=["progress"]) if not job.pipeline: raise ValueError("No pipeline specified to process images in ML job") @@ -398,8 +540,8 @@ def run(cls, job: "Job"): progress=1, ) - # End image collection stage - job.save() + # End image collection stage - only save progress to avoid overwriting logs + job.save(update_fields=["progress"]) total_captures = 0 total_detections = 0 @@ -461,7 +603,8 @@ def run(cls, job: "Job"): detections=total_detections, classifications=total_classifications, ) - job.save() + # Only save progress field to avoid overwriting logs from JobLogHandler + job.save(update_fields=["progress"]) # Stop processing if any save tasks have failed # Otherwise, calculate the percent of images that have failed to save @@ -495,7 +638,8 @@ def run(cls, job: "Job"): FAILURE_THRESHOLD = 0.5 if image_count and (percent_successful < FAILURE_THRESHOLD): job.progress.update_stage("process", status=JobState.FAILURE) - job.save() + # 
Only save progress to avoid overwriting logs + job.save(update_fields=["progress"]) raise Exception(f"Failed to process more than {int(FAILURE_THRESHOLD * 100)}% of images") job.progress.update_stage( @@ -510,7 +654,8 @@ def run(cls, job: "Job"): ) job.update_status(JobState.SUCCESS, save=False) job.finished_at = datetime.datetime.now() - job.save() + # Save all final fields at once, excluding logs + job.save(update_fields=["status", "progress", "finished_at"]) class DataStorageSyncJob(JobType): @@ -530,7 +675,8 @@ def run(cls, job: "Job"): job.update_status(JobState.STARTED) job.started_at = datetime.datetime.now() job.finished_at = None - job.save() + # Only save specific fields to avoid overwriting logs + job.save(update_fields=["progress", "status", "started_at", "finished_at"]) if not job.deployment: raise ValueError("No deployment provided for data storage sync job") @@ -542,7 +688,8 @@ def run(cls, job: "Job"): progress=0, total_files=0, ) - job.save() + # Only save progress to avoid overwriting logs + job.save(update_fields=["progress"]) job.deployment.sync_captures(job=job) @@ -553,10 +700,12 @@ def run(cls, job: "Job"): progress=1, ) job.update_status(JobState.SUCCESS) - job.save() + # Save status and progress to avoid overwriting logs + job.save(update_fields=["status", "progress"]) job.finished_at = datetime.datetime.now() - job.save() + # Only save finished_at to avoid overwriting logs + job.save(update_fields=["finished_at"]) class SourceImageCollectionPopulateJob(JobType): @@ -575,7 +724,8 @@ def run(cls, job: "Job"): job.update_status(JobState.STARTED) job.started_at = datetime.datetime.now() job.finished_at = None - job.save() + # Only save specific fields to avoid overwriting logs + job.save(update_fields=["progress", "status", "started_at", "finished_at"]) if not job.source_image_collection: raise ValueError("No source image collection provided") @@ -590,11 +740,13 @@ def run(cls, job: "Job"): progress=0.10, captures_added=0, ) - job.save() + # Only save progress to avoid overwriting logs + job.save(update_fields=["progress", "status", "started_at", "finished_at"]) job.source_image_collection.populate_sample(job=job) job.logger.info(f"Finished populating source image collection {job.source_image_collection}") - job.save() + # Only save progress to avoid overwriting logs + job.save(update_fields=["progress"]) captures_added = job.source_image_collection.images.count() job.logger.info(f"Added {captures_added} captures to source image collection {job.source_image_collection}") @@ -607,7 +759,8 @@ def run(cls, job: "Job"): ) job.finished_at = datetime.datetime.now() job.update_status(JobState.SUCCESS, save=False) - job.save() + # Save final fields to avoid overwriting logs + job.save(update_fields=["progress", "status", "finished_at"]) class DataExportJob(JobType): @@ -630,7 +783,8 @@ def run(cls, job: "Job"): job.update_status(JobState.STARTED) job.started_at = datetime.datetime.now() job.finished_at = None - job.save() + # Only save specific fields to avoid overwriting logs + job.save(update_fields=["progress", "status", "started_at", "finished_at"]) job.logger.info(f"Starting export for project {job.project}") @@ -643,7 +797,9 @@ def run(cls, job: "Job"): job.progress.add_stage_param(stage.key, "File URL", f"{file_url}") job.progress.update_stage(stage.key, status=JobState.SUCCESS, progress=1) job.finished_at = datetime.datetime.now() - job.update_status(JobState.SUCCESS, save=True) + job.update_status(JobState.SUCCESS, save=False) + # Save final fields to avoid 
overwriting logs + job.save(update_fields=["progress", "status", "finished_at"]) class PostProcessingJob(JobType): @@ -719,14 +875,23 @@ def get_job_type_by_inferred_key(job: "Job") -> type[JobType] | None: class Job(BaseModel): """A job to be run by the scheduler""" - # Hide old failed jobs after 3 days - FAILED_CUTOFF_HOURS = 24 * 3 + # Hide old failed jobs after 30 days + FAILED_CUTOFF_HOURS = 24 * 30 + + # Job status check configuration thresholds + NO_TASK_ID_TIMEOUT_SECONDS = 300 # 5 minutes + DISAPPEARED_TASK_RETRY_THRESHOLD_SECONDS = 300 # 5 minutes + MAX_JOB_RUNTIME_SECONDS = 7 * 24 * 60 * 60 # 7 days + STUCK_PENDING_TIMEOUT_SECONDS = 600 # 10 minutes + STUCK_PENDING_NO_WORKERS_TIMEOUT_SECONDS = 3600 # 1 hour + PENDING_LOG_INTERVAL_SECONDS = 300 # 5 minutes name = models.CharField(max_length=255) queue = models.CharField(max_length=255, default="default") scheduled_at = models.DateTimeField(null=True, blank=True) started_at = models.DateTimeField(null=True, blank=True) finished_at = models.DateTimeField(null=True, blank=True) + last_checked_at = models.DateTimeField(null=True, blank=True, help_text="Last time job status was checked") # @TODO can we use an Enum or Pydantic model for status? status = models.CharField(max_length=255, default=JobState.CREATED.name, choices=JobState.choices()) progress: JobProgress = SchemaField(JobProgress, default=default_job_progress) @@ -861,7 +1026,7 @@ def retry(self, async_task=True): """ self.logger.info(f"Re-running job {self}") self.finished_at = None - self.progress.reset() + self.progress.reset(status=JobState.RETRY) self.status = JobState.RETRY self.save() if async_task: @@ -874,6 +1039,7 @@ def cancel(self): Terminate the celery task. """ self.status = JobState.CANCELING + self.progress.summary.status = JobState.CANCELING self.save() if self.task_id: task = run_job.AsyncResult(self.task_id) @@ -882,29 +1048,393 @@ def cancel(self): self.save() else: self.status = JobState.REVOKED + self.progress.summary.status = JobState.REVOKED self.save() - def update_status(self, status=None, save=True): + def _mark_as_failed(self, error_message: str, now: datetime.datetime) -> bool: """ - Update the status of the job based on the status of the celery task. - Or if a status is provided, update the status of the job to that value. + Mark the job as failed. + + Args: + error_message: The error message to log + now: The current datetime to use for finished_at + + Returns: + Always returns True since status is being changed to FAILURE """ - if not status and self.task_id: - task = run_job.AsyncResult(self.task_id) - status = task.status + if self.status == JobState.FAILURE: + return False # No change + self.logger.error(error_message) + self.status = JobState.FAILURE + self.progress.summary.status = JobState.FAILURE + self.finished_at = now + return True + + def _check_if_resurrected(self, now: datetime.datetime, save: bool) -> tuple[bool, bool]: + """ + Check if a failed job has been resurrected (Celery task is now running/completed). 
+ + Args: + now: Current datetime + save: Whether to save changes + + Returns: + Tuple of (should_continue_checks, status_changed) + """ + if self.status != JobState.FAILURE or not self.task_id: + return True, False + + try: + task = AsyncResult(self.task_id) + celery_status = task.status + # If the task is now running or succeeded, resurrect the job + if celery_status in [JobState.STARTED, JobState.SUCCESS]: + self.logger.warning( + f"Job {self.pk} was marked as FAILURE but task is now {celery_status}, resurrecting job" + ) + self.update_status(celery_status, save=False) + self.finished_at = None if celery_status == JobState.STARTED else self.finished_at + if save: + update_fields = ["last_checked_at", "status", "progress"] + if celery_status == JobState.STARTED: + update_fields.append("finished_at") + self.save(update_fields=update_fields) + return False, True + except Exception as e: + self.logger.debug(f"Could not check resurrection of failed job {self.pk}: {e}") + + return False, False + + def _check_missing_task_id(self, now: datetime.datetime, timeout_seconds: int) -> bool: + """ + Check if job was scheduled but never got a task_id. + + Args: + now: Current datetime + timeout_seconds: How long to wait before marking as failed + + Returns: + True if status changed, False otherwise + """ + if self.task_id or not self.scheduled_at: + return False + + time_since_scheduled = (now - self.scheduled_at).total_seconds() + if time_since_scheduled > timeout_seconds: + return self._mark_as_failed( + f"Job {self.pk} was scheduled {time_since_scheduled:.0f}s ago but never got a task_id", now + ) + return False + + def _check_disappeared_task( + self, + task: AsyncResult, + celery_status: str | None, + now: datetime.datetime, + auto_retry: bool, + retry_threshold_seconds: int, + save: bool, + ) -> tuple[bool, bool]: + """ + Check if task has disappeared from Celery backend and handle retry logic. 
+ + Args: + task: The Celery AsyncResult + celery_status: The task status (may be None) + now: Current datetime + auto_retry: Whether to auto-retry disappeared tasks + retry_threshold_seconds: Time threshold for auto-retry + save: Whether to save changes + + Returns: + Tuple of (should_return_early, status_changed) + """ + # Task not found or status unavailable + if celery_status is None or ( + celery_status == "PENDING" and self.status not in [JobState.CREATED, JobState.PENDING] + ): + self.logger.warning( + f"Job {self.pk} task {self.task_id} not found in Celery backend " + f"(current job status: {self.status})" + ) + + # Only retry if job was supposedly running + if self.status in JobState.running_states() and auto_retry: + if self.started_at: + time_since_start = (now - self.started_at).total_seconds() + if time_since_start < retry_threshold_seconds: + # Task disappeared shortly after starting - likely a worker crash + self.logger.info(f"Job {self.pk} task disappeared shortly after starting, attempting retry") + try: + self.retry(async_task=True) + if save: + self.save(update_fields=["last_checked_at"]) + return True, True + except Exception as retry_err: + self.logger.error(f"Failed to retry job {self.pk}: {retry_err}") + + # If we didn't retry or retry failed, mark as failed + status_changed = self._mark_as_failed( + f"Job {self.pk} task disappeared from Celery, marking as failed", now + ) + return True, status_changed # Return early - job is now failed + + return False, False + + def _check_status_mismatch(self, celery_status: str, now: datetime.datetime) -> bool: + """ + Check if job status doesn't match Celery task status and reconcile. + + Args: + celery_status: The status reported by Celery + now: Current datetime + + Returns: + True if status changed, False otherwise + """ + if celery_status == self.status: + return False + + self.logger.warning(f"Job {self.pk} status '{self.status}' doesn't match Celery task status '{celery_status}'") + + # Update to match Celery's status + old_status = self.status + self.update_status(celery_status, save=False) + + # If Celery says it failed but we thought it was running + if celery_status in JobState.failed_states() and old_status in JobState.running_states(): + self.finished_at = now + self.logger.error(f"Job {self.pk} task failed in Celery") + # If Celery says it succeeded but we thought it was running + elif celery_status == JobState.SUCCESS and old_status in JobState.running_states(): + self.finished_at = now + + return True + + def _check_if_stale(self, task: AsyncResult, now: datetime.datetime, max_runtime_seconds: int) -> bool: + """ + Check if job has been running for too long and should be marked as stale. 
+ + Args: + task: The Celery AsyncResult + now: Current datetime + max_runtime_seconds: Maximum allowed runtime + + Returns: + True if status changed, False otherwise + """ + if self.status not in JobState.running_states() or not self.started_at: + return False + + time_since_start = (now - self.started_at).total_seconds() + if time_since_start > max_runtime_seconds: + time_running = timesince(self.started_at, now) + max_runtime_hours = max_runtime_seconds / 3600 + status_changed = self._mark_as_failed( + f"Job {self.pk} has been running for {time_running}, " + f"marking as failed (max runtime: {max_runtime_hours:.0f} hours)", + now, + ) + + # Try to revoke the task if we just marked it as failed + if status_changed: + try: + task.revoke(terminate=True) + except Exception as e: + self.logger.error(f"Failed to revoke stale task {self.task_id}: {e}") + + return status_changed + + return False + + def _check_stuck_pending( + self, + celery_status: str, + now: datetime.datetime, + timeout_with_workers: int, + timeout_no_workers: int, + log_interval: int, + ) -> bool: + """ + Check if task is stuck in PENDING state for too long. + + Args: + celery_status: The status reported by Celery + now: Current datetime + timeout_with_workers: Timeout when workers are available + timeout_no_workers: Timeout when no workers are available + log_interval: How often to log waiting messages + + Returns: + True if status changed, False otherwise + """ + if celery_status != JobState.PENDING or not self.scheduled_at: + return False + + time_since_scheduled = (now - self.scheduled_at).total_seconds() + + # Check if workers are available (using cached check to avoid excessive queries) + current_minute = int(time.time() / 60) + workers_available, worker_count = check_celery_workers_available_cached(current_minute) + + # Determine timeout based on worker availability + timeout = timeout_with_workers if workers_available else timeout_no_workers + + # Log periodic waiting messages (approximately every log_interval seconds) + # Only log if we're past the interval and within a reasonable window + if time_since_scheduled > log_interval: + # Calculate how many intervals have passed + intervals_passed = int(time_since_scheduled / log_interval) + time_since_last_interval = time_since_scheduled - (intervals_passed * log_interval) + + # Log if we're within the first 60 seconds of a new interval + if time_since_last_interval < 60: + time_waiting = timesince(self.scheduled_at, now) + if workers_available: + self.logger.warning( + f"Job {self.pk} has been waiting for {time_waiting} " + f"with {worker_count} worker(s) available. Task may be queued behind other jobs." + ) + else: + self.logger.error( + f"Job {self.pk} has been waiting for {time_waiting}. " + f"NO WORKERS RUNNING - task cannot be picked up until workers start." + ) + + # Check if timeout exceeded + if time_since_scheduled > timeout: + time_waiting = timesince(self.scheduled_at, now) + if workers_available: + error_message = ( + f"Job {self.pk} has been pending for {time_waiting} " f"with workers available, marking as failed" + ) + else: + error_message = ( + f"Job {self.pk} has been pending for {time_waiting} " + f"with no workers detected, marking as failed" + ) + return self._mark_as_failed(error_message, now) + + return False + + def check_status(self, force: bool = False, save: bool = True, auto_retry: bool = True) -> bool: + """ + Check the status of the job by querying the underlying Celery task. 
+ + This method verifies that the job's task is still active and updates + the job status if inconsistencies are detected (e.g., task disappeared). + + Args: + force: If True, check even if job is in a final state + save: If True, save the job after checking + auto_retry: If True, automatically retry jobs with disappeared tasks + + Returns: + True if job status was updated, False otherwise + """ + now = datetime.datetime.now() + self.last_checked_at = now + + # Special case: check if a failed job has come back to life + if not force: + should_continue, status_changed = self._check_if_resurrected(now, save) + if not should_continue: + return status_changed + + # Don't check jobs that are already in final states unless forced + if not force and self.status in JobState.final_states(): + if save: + self.save(update_fields=["last_checked_at"]) + return False + + status_changed = False + + # Check if task_id exists + status_changed = self._check_missing_task_id(now, self.NO_TASK_ID_TIMEOUT_SECONDS) + if not self.task_id: + if save: + update_fields = ["last_checked_at"] + if status_changed: + update_fields.extend(["status", "progress", "finished_at"]) + self.save(update_fields=update_fields) + return status_changed + + # Query the Celery task + try: + task = AsyncResult(self.task_id) + + # Try to get task status + try: + celery_status = task.status + except Exception as status_err: + self.logger.warning(f"Job {self.pk} task {self.task_id} status could not be retrieved: {status_err}") + celery_status = None + + # Check if task has disappeared + should_return, disappeared_status_changed = self._check_disappeared_task( + task, celery_status, now, auto_retry, self.DISAPPEARED_TASK_RETRY_THRESHOLD_SECONDS, save + ) + status_changed = status_changed or disappeared_status_changed + if should_return: + return disappeared_status_changed + + # Check for status mismatches with Celery + if celery_status: + mismatch_changed = self._check_status_mismatch(celery_status, now) + status_changed = status_changed or mismatch_changed + + # Check if job is stale (running too long) + stale_changed = self._check_if_stale(task, now, self.MAX_JOB_RUNTIME_SECONDS) + status_changed = status_changed or stale_changed + + # Check if stuck in PENDING + pending_changed = self._check_stuck_pending( + celery_status, + now, + self.STUCK_PENDING_TIMEOUT_SECONDS, + self.STUCK_PENDING_NO_WORKERS_TIMEOUT_SECONDS, + self.PENDING_LOG_INTERVAL_SECONDS, + ) + status_changed = status_changed or pending_changed + except Exception as e: + self.logger.error(f"Error checking status for job {self.pk}: {e}") + + if save: + update_fields = ["last_checked_at"] + if status_changed: + update_fields.extend(["status", "progress", "finished_at"]) + self.save(update_fields=update_fields) + + return status_changed + + def update_status(self, status, save=True): + """ + Update the job status to a specific value. + + This is a simple setter for when you know the status you want to set. + For checking and syncing with Celery, use check_status() instead. 
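+
+        Example (illustrative; mirrors how the job types in this module call it):
+
+            job.update_status(JobState.SUCCESS, save=False)
+            job.finished_at = datetime.datetime.now()
+            job.save(update_fields=["status", "progress", "finished_at"])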
+ + Args: + status: The status to set (required) + save: Whether to save the job after updating + """ if not status: - self.logger.warning(f"Could not determine status of job {self.pk}") + self.logger.warning(f"Cannot update status for job {self.pk} - no status provided") return + status_changed = False if status != self.status: self.logger.info(f"Changing status of job {self.pk} from {self.status} to {status}") self.status = status - - self.progress.summary.status = status + self.progress.summary.status = status + status_changed = True if save: - self.save() + # Only update status and progress fields to avoid concurrency issues + update_fields = ["status", "progress"] if status_changed else [] + if update_fields: + self.save(update_fields=update_fields) def update_progress(self, save=True): """ @@ -929,22 +1459,129 @@ def update_progress(self, save=True): self.progress.summary.progress = total_progress if save: - self.save(update_progress=False) + # Only update progress field to avoid concurrency issues + self.save(update_fields=["progress"]) + + def _validate_log_lengths(self, update_fields: list[str] | None) -> None: + """ + Validate that logs aren't getting shorter due to stale in-memory data. + + This is a safety check to catch bugs where concurrent updates might + overwrite logs. If logs would get shorter, automatically refreshes + them from the database to prevent data loss. + + Args: + update_fields: List of fields being updated, or None for all fields + + Note: + This can be easily disabled by commenting out the call in save() + if the validation overhead becomes an issue. However, the check + is very cheap (just a length comparison) and provides valuable + protection against data loss. + """ + if self.pk is None or not update_fields or "logs" in update_fields: + # Skip validation for new jobs, when updating logs explicitly, + # or when not using update_fields + return + + try: + # Get current log lengths from database + current_job = Job.objects.only("logs").get(pk=self.pk) + current_stdout_len = len(current_job.logs.stdout) + current_stderr_len = len(current_job.logs.stderr) + new_stdout_len = len(self.logs.stdout) + new_stderr_len = len(self.logs.stderr) + + # If logs would get shorter, it means we have stale in-memory data + if new_stdout_len < current_stdout_len or new_stderr_len < current_stderr_len: + logger.error( + f"CRITICAL: Job #{self.pk} attempted to save with stale logs! " + f"stdout: {current_stdout_len} -> {new_stdout_len}, " + f"stderr: {current_stderr_len} -> {new_stderr_len}. " + f"update_fields={update_fields}. This would lose log data!" + ) + # Refresh logs from database to prevent data loss + self.logs = current_job.logs + logger.warning(f"Refreshed logs for job #{self.pk} from database to prevent data loss") + except Job.DoesNotExist: + # Job might have been deleted, let it fail naturally + pass + except Exception as e: + # Don't let validation break the save, but log it + logger.warning(f"Failed to validate log lengths for job #{self.pk}: {e}") def duration(self) -> datetime.timedelta | None: if self.started_at and self.finished_at: return self.finished_at - self.started_at return None - def save(self, update_progress=True, *args, **kwargs): + def save(self, update_progress=True, use_locking=True, *args, **kwargs): """ Create the job stages if they don't exist. + + This method automatically uses row-level locking when updating existing jobs + to prevent concurrent workers from overwriting each other's changes. 
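+
+        Example (illustrative; matches the pattern used by the job types above):
+
+            job.progress.update_stage("process", progress=0.5)
+            job.save(update_fields=["progress"])  # saves only progress so concurrent log writes are preserved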
+ + Args: + update_progress: Whether to recalculate progress summary (default: True) + use_locking: Whether to use SELECT FOR UPDATE locking for existing jobs (default: True) + *args, **kwargs: Additional arguments passed to Django's save() + + Special handling: + - If 'update_fields' is specified, only those fields will be saved + - If 'update_fields' includes 'logs', locking is automatically disabled to avoid + conflicts with JobLogHandler which manages its own locks + - For new jobs (pk=None), locking is skipped """ + is_new = self.pk is None + update_fields = kwargs.get("update_fields") + + # Don't use locking if explicitly disabled or if this is a new job + # or if we're only updating logs (JobLogHandler manages its own locks) + should_lock = use_locking and not is_new and not (update_fields and update_fields == ["logs"]) + + # Update progress/setup before saving if self.pk and self.progress.stages and update_progress: self.update_progress(save=False) else: self.setup(save=False) - super().save(*args, **kwargs) + + # Safety check: Ensure logs never get shorter (unless explicitly updating logs) + # This can be disabled by commenting out this line if needed + self._validate_log_lengths(update_fields) + + if should_lock: + # Refresh from database with row-level lock to prevent concurrent overwrites + # This ensures we have the latest data before saving our changes + try: + with atomic_job_update(self.pk) as locked_job: + # Copy our in-memory changes to the locked instance + if update_fields: + # Only update specified fields + for field_name in update_fields: + setattr(locked_job, field_name, getattr(self, field_name)) + else: + # Update all non-log fields to preserve concurrent log writes + for field in self._meta.fields: + if field.name != "logs": # Never overwrite logs unless explicitly specified + setattr(locked_job, field.name, getattr(self, field.name)) + + # Save the locked instance with our changes + super(Job, locked_job).save(*args, **kwargs) + + # Update our instance with the saved state + self.pk = locked_job.pk + for field in self._meta.fields: + setattr(self, field.name, getattr(locked_job, field.name)) + except Exception as e: + logger.error(f"Failed to save job #{self.pk} with locking: {e}") + # Fall back to normal save - better to save without lock than to fail completely + logger.warning(f"Falling back to unlocked save for job #{self.pk}") + super().save(*args, **kwargs) + else: + # New job or locking disabled - use normal save + super().save(*args, **kwargs) + logger.debug(f"Saved job {self}") if self.progress.summary.status != self.status: logger.warning(f"Job {self} status mismatches progress: {self.progress.summary.status} != {self.status}") diff --git a/ami/jobs/tasks.py b/ami/jobs/tasks.py index b12271178..572ae342e 100644 --- a/ami/jobs/tasks.py +++ b/ami/jobs/tasks.py @@ -2,11 +2,14 @@ from celery.result import AsyncResult from celery.signals import task_failure, task_postrun, task_prerun +from celery.utils.log import get_task_logger +from django.core.cache import cache from ami.tasks import default_soft_time_limit, default_time_limit from config import celery_app logger = logging.getLogger(__name__) +task_logger = get_task_logger(__name__) @celery_app.task(bind=True, soft_time_limit=default_soft_time_limit, time_limit=default_time_limit) @@ -50,7 +53,7 @@ def update_job_status(sender, task_id, task, *args, **kwargs): task = AsyncResult(task_id) # I'm not sure if this is reliable job.update_status(task.status, save=False) - job.save() + 
job.save(update_fields=["status", "progress"]) @task_failure.connect(sender=run_job, retry=False) @@ -62,4 +65,93 @@ def update_job_failure(sender, task_id, exception, *args, **kwargs): job.logger.error(f'Job #{job.pk} "{job.name}" failed: {exception}') - job.save() + job.save(update_fields=["status", "progress"]) + + +@celery_app.task(soft_time_limit=300, time_limit=360) +def check_unfinished_jobs(): + """ + Periodic task to check the status of all unfinished jobs. + + This task prevents duplicate execution using cache-based locking and + checks jobs that haven't been verified recently to ensure their Celery + tasks are still active and their statuses are accurate. + """ + import datetime + + from ami.jobs.models import Job, JobState + + # Configuration thresholds (TODO: make these configurable via settings) + LOCK_TIMEOUT_SECONDS = 300 # 5 minutes - how long the lock is held + MAX_JOBS_PER_RUN = 100 # Maximum number of jobs to check in one run + MIN_CHECK_INTERVAL_MINUTES = 2 # Minimum time between checks for the same job + + # Use cache-based locking to prevent duplicate checks + lock_id = "check_unfinished_jobs_lock" + + # Try to acquire lock + if not cache.add(lock_id, "locked", LOCK_TIMEOUT_SECONDS): + task_logger.info("check_unfinished_jobs is already running, skipping this execution") + return {"status": "skipped", "reason": "already_running"} + + try: + task_logger.info("Starting check_unfinished_jobs task") + + # Get all jobs that are not in final states + unfinished_jobs = Job.objects.filter(status__in=JobState.running_states()).order_by("scheduled_at") + + total_jobs = unfinished_jobs.count() + task_logger.info(f"Found {total_jobs} unfinished jobs to check") + + if total_jobs == 0: + return {"status": "success", "checked": 0, "updated": 0} + + # Avoid checking too many jobs at once + if total_jobs > MAX_JOBS_PER_RUN: + task_logger.warning(f"Limiting check to {MAX_JOBS_PER_RUN} jobs (out of {total_jobs})") + unfinished_jobs = unfinished_jobs[:MAX_JOBS_PER_RUN] + + # Only check jobs that haven't been checked recently + now = datetime.datetime.now() + min_check_interval = datetime.timedelta(minutes=MIN_CHECK_INTERVAL_MINUTES) + + jobs_to_check = [] + for job in unfinished_jobs: + if job.last_checked_at is None: + jobs_to_check.append(job) + else: + time_since_check = now - job.last_checked_at + if time_since_check >= min_check_interval: + jobs_to_check.append(job) + + task_logger.info(f"Checking {len(jobs_to_check)} jobs that need status verification") + + checked_count = 0 + updated_count = 0 + error_count = 0 + + for job in jobs_to_check: + try: + task_logger.debug(f"Checking job {job.pk}: {job.name} (status: {job.status})") + status_changed = job.check_status(force=False, save=True) + checked_count += 1 + if status_changed: + updated_count += 1 + task_logger.info(f"Updated job {job.pk} status to {job.status}") + except Exception as e: + error_count += 1 + task_logger.error(f"Error checking job {job.pk}: {e}", exc_info=True) + + result = { + "status": "success", + "total_unfinished": total_jobs, + "checked": checked_count, + "updated": updated_count, + "errors": error_count, + } + task_logger.info(f"Completed check_unfinished_jobs: {result}") + return result + + finally: + # Always release the lock + cache.delete(lock_id) diff --git a/ami/jobs/tests.py b/ami/jobs/tests.py index 64fbf23a2..85aee8ebb 100644 --- a/ami/jobs/tests.py +++ b/ami/jobs/tests.py @@ -1,7 +1,10 @@ # from rich import print +import datetime import logging +from unittest.mock import MagicMock, patch from 
django.test import TestCase +from django.utils import timezone from guardian.shortcuts import assign_perm from rest_framework.test import APIRequestFactory, APITestCase @@ -198,3 +201,529 @@ def test_cancel_job(self): # This cannot be tested until we have a way to cancel jobs # and a way to run async tasks in tests. pass + + +class TestJobStatusChecking(TestCase): + """ + Test the job status checking functionality. + """ + + def setUp(self): + self.project = Project.objects.create(name="Status Check Test Project") + self.pipeline = Pipeline.objects.create( + name="Test ML pipeline", + description="Test ML pipeline", + ) + self.pipeline.projects.add(self.project) + self.source_image_collection = SourceImageCollection.objects.create( + name="Test collection", + project=self.project, + ) + + def test_check_status_no_task_id_recently_scheduled(self): + """Test that recently scheduled jobs without task_id are not marked as failed.""" + job = Job.objects.create( + job_type_key=MLJob.key, + project=self.project, + name="Test job - no task_id", + scheduled_at=timezone.now(), + ) + + status_changed = job.check_status() + + self.assertFalse(status_changed) + self.assertEqual(job.status, JobState.CREATED.value) + self.assertIsNotNone(job.last_checked_at) + + def test_check_status_no_task_id_old_scheduled(self): + """Test that old scheduled jobs without task_id are marked as failed.""" + job = Job.objects.create( + job_type_key=MLJob.key, + project=self.project, + name="Test job - stale no task_id", + scheduled_at=timezone.now() - datetime.timedelta(minutes=10), + ) + + status_changed = job.check_status() + + self.assertTrue(status_changed) + self.assertEqual(job.status, JobState.FAILURE.value) + self.assertIsNotNone(job.finished_at) + self.assertIsNotNone(job.last_checked_at) + + @patch("ami.jobs.models.AsyncResult") + def test_check_status_with_matching_status(self, mock_async_result): + """Test that jobs with matching Celery status are not changed.""" + mock_task = MagicMock() + mock_task.status = JobState.STARTED.value + mock_async_result.return_value = mock_task + + job = Job.objects.create( + job_type_key=MLJob.key, + project=self.project, + name="Test job - matching status", + task_id="test-task-id-123", + status=JobState.STARTED.value, + started_at=timezone.now(), + ) + + status_changed = job.check_status() + + self.assertFalse(status_changed) + self.assertEqual(job.status, JobState.STARTED.value) + self.assertIsNotNone(job.last_checked_at) + + @patch("ami.jobs.models.AsyncResult") + def test_check_status_with_mismatched_status(self, mock_async_result): + """Test that jobs with mismatched Celery status are updated.""" + mock_task = MagicMock() + mock_task.status = JobState.FAILURE.value + mock_async_result.return_value = mock_task + + job = Job.objects.create( + job_type_key=MLJob.key, + project=self.project, + name="Test job - mismatched status", + task_id="test-task-id-456", + status=JobState.STARTED.value, + started_at=timezone.now(), + ) + + status_changed = job.check_status() + + self.assertTrue(status_changed) + self.assertEqual(job.status, JobState.FAILURE.value) + self.assertIsNotNone(job.finished_at) + self.assertIsNotNone(job.last_checked_at) + + @patch("ami.jobs.models.AsyncResult") + def test_check_status_stale_running_job(self, mock_async_result): + """Test that jobs running for too long are marked as failed.""" + mock_task = MagicMock() + mock_task.status = JobState.STARTED.value + mock_async_result.return_value = mock_task + + # Create job that started longer than 
MAX_JOB_RUNTIME_SECONDS ago + stale_time = datetime.timedelta(seconds=Job.MAX_JOB_RUNTIME_SECONDS + 3600) # 1 hour past limit + job = Job.objects.create( + job_type_key=MLJob.key, + project=self.project, + name="Test job - stale running", + task_id="test-task-id-789", + status=JobState.STARTED.value, + started_at=timezone.now() - stale_time, + ) + + status_changed = job.check_status() + + self.assertTrue(status_changed) + self.assertEqual(job.status, JobState.FAILURE.value) + self.assertIsNotNone(job.finished_at) + # Verify task was attempted to be revoked + mock_task.revoke.assert_called_once_with(terminate=True) + + @patch("ami.jobs.models.AsyncResult") + def test_check_status_stuck_pending(self, mock_async_result): + """Test that jobs stuck in PENDING for too long are marked as failed.""" + mock_task = MagicMock() + mock_task.status = JobState.PENDING.value + mock_async_result.return_value = mock_task + + job = Job.objects.create( + job_type_key=MLJob.key, + project=self.project, + name="Test job - stuck pending", + task_id="test-task-id-pending", + status=JobState.PENDING.value, + scheduled_at=timezone.now() - datetime.timedelta(minutes=15), + ) + + status_changed = job.check_status() + + self.assertTrue(status_changed) + self.assertEqual(job.status, JobState.FAILURE.value) + self.assertIsNotNone(job.finished_at) + + def test_check_status_does_not_check_completed_jobs(self): + """Test that completed jobs are not checked unless forced.""" + job = Job.objects.create( + job_type_key=MLJob.key, + project=self.project, + name="Test job - completed", + task_id="test-task-id-completed", + status=JobState.SUCCESS.value, + finished_at=timezone.now(), + ) + + status_changed = job.check_status(force=False) + + self.assertFalse(status_changed) + self.assertEqual(job.status, JobState.SUCCESS.value) + self.assertIsNotNone(job.last_checked_at) + + @patch("ami.jobs.models.AsyncResult") + def test_check_status_forces_check_on_completed_jobs(self, mock_async_result): + """Test that force=True checks even completed jobs.""" + mock_task = MagicMock() + mock_task.status = JobState.SUCCESS.value + mock_async_result.return_value = mock_task + + job = Job.objects.create( + job_type_key=MLJob.key, + project=self.project, + name="Test job - force check", + task_id="test-task-id-force", + status=JobState.SUCCESS.value, + finished_at=timezone.now(), + ) + + status_changed = job.check_status(force=True) + + # Status shouldn't change since Celery status matches + self.assertFalse(status_changed) + self.assertIsNotNone(job.last_checked_at) + + @patch("ami.jobs.tasks.cache") + @patch("ami.jobs.models.Job.objects") + def test_check_unfinished_jobs_with_lock(self, mock_job_objects, mock_cache): + """Test that check_unfinished_jobs uses locking to prevent duplicates.""" + from ami.jobs.tasks import check_unfinished_jobs + + # Simulate lock already acquired + mock_cache.add.return_value = False + + result = check_unfinished_jobs() + + self.assertEqual(result["status"], "skipped") + self.assertEqual(result["reason"], "already_running") + mock_cache.add.assert_called_once() + mock_cache.delete.assert_not_called() + + @patch("ami.jobs.tasks.cache") + def test_check_unfinished_jobs_processes_jobs(self, mock_cache): + """Test that check_unfinished_jobs processes unfinished jobs.""" + from ami.jobs.tasks import check_unfinished_jobs + + # Allow lock to be acquired + mock_cache.add.return_value = True + + # Create some unfinished jobs + job1 = Job.objects.create( + job_type_key=MLJob.key, + project=self.project, + 
name="Unfinished job 1", + status=JobState.STARTED.value, + task_id="test-task-1", + started_at=timezone.now(), + last_checked_at=timezone.now() - datetime.timedelta(minutes=5), + ) + + job2 = Job.objects.create( + job_type_key=MLJob.key, + project=self.project, + name="Unfinished job 2", + status=JobState.PENDING.value, + task_id="test-task-2", + scheduled_at=timezone.now() - datetime.timedelta(minutes=3), + ) + + # Create a completed job (should not be checked) + Job.objects.create( + job_type_key=MLJob.key, + project=self.project, + name="Completed job", + status=JobState.SUCCESS.value, + finished_at=timezone.now(), + ) + + with patch("ami.jobs.models.AsyncResult") as mock_async_result: + mock_task = MagicMock() + mock_task.status = JobState.STARTED.value + mock_async_result.return_value = mock_task + + result = check_unfinished_jobs() + + self.assertEqual(result["status"], "success") + self.assertEqual(result["total_unfinished"], 2) + self.assertGreaterEqual(result["checked"], 1) + + # Verify lock was released + mock_cache.delete.assert_called_once() + + # Verify jobs were checked + job1.refresh_from_db() + job2.refresh_from_db() + self.assertIsNotNone(job1.last_checked_at) + self.assertIsNotNone(job2.last_checked_at) + + @patch("ami.jobs.models.AsyncResult") + def test_check_status_task_disappeared_with_retry(self, mock_async_result): + """Test that jobs with disappeared tasks are retried if they just started.""" + mock_task = MagicMock() + mock_task.status = None # Task not found + mock_async_result.return_value = mock_task + + job = Job.objects.create( + job_type_key=MLJob.key, + project=self.project, + name="Test job - task disappeared", + task_id="test-task-disappeared", + status=JobState.STARTED.value, + started_at=timezone.now() - datetime.timedelta(minutes=2), # Started 2 mins ago + pipeline=self.pipeline, + source_image_collection=self.source_image_collection, + ) + + # Mock the retry method + with patch.object(job, "retry") as mock_retry: + status_changed = job.check_status(auto_retry=True) + + # Should attempt retry + mock_retry.assert_called_once_with(async_task=True) + self.assertTrue(status_changed) + + @patch("ami.jobs.models.AsyncResult") + def test_check_status_task_disappeared_no_retry_old_job(self, mock_async_result): + """Test that old jobs with disappeared tasks are marked failed, not retried.""" + mock_task = MagicMock() + mock_task.status = None # Task not found + mock_async_result.return_value = mock_task + + job = Job.objects.create( + job_type_key=MLJob.key, + project=self.project, + name="Test job - old disappeared task", + task_id="test-task-old-disappeared", + status=JobState.STARTED.value, + started_at=timezone.now() - datetime.timedelta(minutes=10), # Started 10 mins ago + ) + + status_changed = job.check_status(auto_retry=True) + + # Should not retry, just mark as failed + self.assertTrue(status_changed) + self.assertEqual(job.status, JobState.FAILURE.value) + self.assertIsNotNone(job.finished_at) + + @patch("ami.jobs.models.AsyncResult") + def test_check_status_task_disappeared_auto_retry_disabled(self, mock_async_result): + """Test that auto_retry=False prevents automatic retry.""" + mock_task = MagicMock() + mock_task.status = None # Task not found + mock_async_result.return_value = mock_task + + job = Job.objects.create( + job_type_key=MLJob.key, + project=self.project, + name="Test job - no auto retry", + task_id="test-task-no-retry", + status=JobState.STARTED.value, + started_at=timezone.now() - datetime.timedelta(minutes=2), + ) + + status_changed 
= job.check_status(auto_retry=False) + + # Should not retry, just mark as failed + self.assertTrue(status_changed) + self.assertEqual(job.status, JobState.FAILURE.value) + self.assertIsNotNone(job.finished_at) + + @patch("ami.jobs.models.AsyncResult") + def test_check_status_task_pending_but_job_running(self, mock_async_result): + """Test that PENDING status from Celery when job thinks it's running indicates disappeared task.""" + mock_task = MagicMock() + mock_task.status = "PENDING" # Celery returns PENDING for unknown tasks + mock_async_result.return_value = mock_task + + job = Job.objects.create( + job_type_key=MLJob.key, + project=self.project, + name="Test job - pending but should be running", + task_id="test-task-fake-pending", + status=JobState.STARTED.value, + started_at=timezone.now() - datetime.timedelta(minutes=2), + ) + + status_changed = job.check_status(auto_retry=False) + + # Should detect this as a disappeared task + self.assertTrue(status_changed) + self.assertEqual(job.status, JobState.FAILURE.value) + + +class TestJobConcurrency(TestCase): + """Test concurrent updates to jobs from multiple workers.""" + + def setUp(self): + self.project = Project.objects.create(name="Test project") + self.pipeline = Pipeline.objects.create( + name="Test ML pipeline", + description="Test ML pipeline", + ) + self.pipeline.projects.add(self.project) + + def test_atomic_job_update_context_manager(self): + """Test that atomic_job_update locks and updates the job.""" + from ami.jobs.models import atomic_job_update + + job = Job.objects.create( + job_type_key=MLJob.key, + project=self.project, + name="Test job - atomic update", + pipeline=self.pipeline, + ) + + # Use the context manager to update the job + with atomic_job_update(job.pk) as locked_job: + locked_job.logs.stdout.insert(0, "Test log message") + locked_job.save(update_fields=["logs"], update_progress=False) + + # Refresh from DB and verify the update persisted + job.refresh_from_db() + self.assertIn("Test log message", job.logs.stdout) + + def test_concurrent_log_writes(self): + """Test that concurrent log writes don't overwrite each other.""" + job = Job.objects.create( + job_type_key=MLJob.key, + project=self.project, + name="Test job - concurrent logs", + pipeline=self.pipeline, + ) + + # Simulate multiple workers adding logs + messages = [f"Log message {i}" for i in range(5)] + + for msg in messages: + # Use the logger which uses JobLogHandler with atomic updates + job.logger.info(msg) + + # Refresh from DB + job.refresh_from_db() + + # All messages should be present (no overwrites) + for msg in messages: + # Messages are formatted with timestamps and log levels + self.assertTrue(any(msg in log for log in job.logs.stdout), f"Message '{msg}' not found in logs") + + def test_log_handler_with_atomic_update(self): + """Test that JobLogHandler properly uses atomic updates.""" + job = Job.objects.create( + job_type_key=MLJob.key, + project=self.project, + name="Test job - log handler", + pipeline=self.pipeline, + ) + + # Get the logger (which adds JobLogHandler) + job_logger = job.logger + + # Add multiple log messages + job_logger.info("Info message") + job_logger.warning("Warning message") + job_logger.error("Error message") + + # Refresh from DB + job.refresh_from_db() + + # Verify all logs are present + self.assertTrue(any("Info message" in log for log in job.logs.stdout)) + self.assertTrue(any("Warning message" in log for log in job.logs.stdout)) + self.assertTrue(any("Error message" in log for log in job.logs.stdout)) + + # 
Verify error also appears in stderr + self.assertTrue(any("Error message" in err for err in job.logs.stderr)) + + def test_max_log_length_enforcement(self): + """Test that log length limits are enforced with atomic updates.""" + from ami.jobs.models import JobLogHandler + + job = Job.objects.create( + job_type_key=MLJob.key, + project=self.project, + name="Test job - max logs", + pipeline=self.pipeline, + ) + + # Stop propagation to the root logger to avoid spamming test output; + # JobLogHandler is attached directly to this logger, so it still receives records + job_logger = job.logger + original_propagate = job_logger.propagate + job_logger.propagate = False + + try: + # Add more logs than the max + max_logs = JobLogHandler.max_log_length + for i in range(max_logs + 10): + job.logger.info(f"Message {i}") + + # Refresh from DB + job.refresh_from_db() + + # Should not exceed max length + self.assertLessEqual(len(job.logs.stdout), max_logs) + self.assertLessEqual(len(job.logs.stderr), max_logs) + finally: + # Restore propagation + job_logger.propagate = original_propagate + + def test_log_length_never_decreases(self): + """Test that the save method prevents logs from getting shorter.""" + job = Job.objects.create( + job_type_key=MLJob.key, + project=self.project, + name="Test job - log safety", + pipeline=self.pipeline, + ) + + # Add some logs + job.logger.info("Log message 1") + job.logger.info("Log message 2") + job.logger.info("Log message 3") + + job.refresh_from_db() + initial_log_count = len(job.logs.stdout) + self.assertGreaterEqual(initial_log_count, 3) + + # Simulate stale in-memory job with fewer logs (like what happens with concurrent workers) + stale_job = Job.objects.get(pk=job.pk) + stale_job.logs.stdout = stale_job.logs.stdout[:1] # Artificially reduce logs to just 1 + + # Try to save with update_fields that doesn't include logs + # The safety check should prevent logs from being overwritten + stale_job.status = JobState.STARTED.value + stale_job.save(update_fields=["status", "progress"]) + + # Verify logs weren't reduced + stale_job.refresh_from_db() + final_log_count = len(stale_job.logs.stdout) + self.assertEqual( + final_log_count, + initial_log_count, + "Logs should never decrease in length when not explicitly updating logs", + ) + + def test_log_can_be_explicitly_updated(self): + """Test that logs CAN be updated when explicitly included in update_fields.""" + job = Job.objects.create( + job_type_key=MLJob.key, + project=self.project, + name="Test job - explicit log update", + pipeline=self.pipeline, + ) + + # Add initial logs + job.logger.info("Log message 1") + job.logger.info("Log message 2") + + job.refresh_from_db() + + # Explicitly update logs (like JobLogHandler does) + job.logs.stdout = ["New log only"] + job.save(update_fields=["logs"]) + + # Verify logs were updated as requested + job.refresh_from_db() + self.assertEqual(len(job.logs.stdout), 1) + self.assertEqual(job.logs.stdout[0], "New log only")
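
Note on atomic_job_update: the locking branch of Job.save() and the TestJobConcurrency tests rely on this context manager, but its implementation is not part of the hunks above. The sketch below only shows the shape it is assumed to have (a transaction.atomic() block around a select_for_update() fetch); the real helper in ami/jobs/models.py may differ in details such as lock waiting behavior or error handling.

# Minimal sketch only - assumes atomic_job_update() is a select_for_update()
# context manager; the actual implementation in ami/jobs/models.py may differ.
import contextlib

from django.db import transaction


@contextlib.contextmanager
def atomic_job_update(job_pk: int):
    """Lock the Job row for the duration of the block so concurrent saves are serialized."""
    from ami.jobs.models import Job  # imported lazily to avoid circular imports

    with transaction.atomic():
        # SELECT ... FOR UPDATE blocks other transactions on this row until commit
        locked_job = Job.objects.select_for_update().get(pk=job_pk)
        yield locked_job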
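
Note on scheduling: check_unfinished_jobs is documented as a periodic task, but no beat schedule entry appears in these hunks. The sketch below shows one way it could be registered, assuming the project reads Celery configuration from Django settings via the CELERY_ namespace; the schedule key and the 5-minute interval are illustrative assumptions, not part of this change.

# Sketch only - the schedule key and interval are assumptions; requires celery beat to be running.
CELERY_BEAT_SCHEDULE = {
    "check-unfinished-jobs": {
        "task": "ami.jobs.tasks.check_unfinished_jobs",  # default task name derived from the module path
        "schedule": 5 * 60,  # seconds between runs (assumed interval)
    },
}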