From a4681c53aa9ddad672bbe7429fa8cd9909ec4253 Mon Sep 17 00:00:00 2001 From: Ankush Malaker <43288948+AnkushMalaker@users.noreply.github.com> Date: Mon, 19 Jan 2026 17:02:58 +0000 Subject: [PATCH 01/14] Refactor job status handling to align with RQ standards - Updated job status checks across various modules to use "started" and "finished" instead of "processing" and "completed" for consistency with RQ's naming conventions. - Adjusted related logging and response messages to reflect the new status terminology. - Simplified Docker Compose project name handling in test scripts to avoid conflicts and improve clarity in test environment setup. --- backends/advanced/docker-compose-test.yml | 2 + backends/advanced/run-test.sh | 12 +- .../controllers/audio_controller.py | 6 +- .../controllers/queue_controller.py | 76 +++++++-- .../controllers/session_controller.py | 40 ++--- .../routers/modules/obsidian_routes.py | 8 +- .../routers/modules/queue_routes.py | 154 ++++++++---------- .../utils/conversation_utils.py | 11 ++ .../advanced_omi_backend/utils/job_utils.py | 2 +- .../workers/audio_jobs.py | 2 +- .../workers/conversation_jobs.py | 2 +- .../workers/obsidian_jobs.py | 18 +- .../workers/transcription_jobs.py | 2 +- backends/advanced/upload_files.py | 8 +- backends/advanced/webui/src/pages/Queue.tsx | 86 +++++----- backends/advanced/webui/src/pages/Upload.tsx | 4 +- config/defaults.yml | 6 +- tests/Makefile | 44 +++-- tests/bin/rebuild-containers.sh | 44 +---- tests/bin/save-container-logs.sh | 14 +- tests/bin/start-containers.sh | 4 +- tests/bin/start-rebuild-containers.sh | 48 ++++++ tests/bin/status-containers.sh | 9 +- tests/endpoints/audio_upload_tests.robot | 10 +- tests/endpoints/client_queue_tests.robot | 4 +- tests/endpoints/conversation_tests.robot | 6 +- tests/endpoints/health_tests.robot | 4 +- tests/endpoints/plugin_tests.robot | 8 +- tests/endpoints/rq_queue_tests.robot | 14 +- tests/infrastructure/infra_tests.robot | 4 +- tests/integration/conversation_queue.robot | 8 +- tests/integration/integration_test.robot | 24 +-- tests/integration/sdk_tests.robot | 2 +- .../websocket_streaming_tests.robot | 10 +- .../websocket_transcription_e2e_test.robot | 41 ++--- tests/resources/audio_keywords.robot | 18 +- tests/resources/conversation_keywords.robot | 6 +- tests/resources/memory_keywords.robot | 12 +- tests/resources/queue_keywords.robot | 19 ++- tests/resources/websocket_keywords.robot | 4 +- tests/run-no-api-tests.sh | 9 +- tests/run-robot-tests.sh | 7 +- tests/setup-test-containers.sh | 9 +- tests/setup/test_manager_keywords.robot | 4 +- tests/teardown-test-containers.sh | 6 +- 45 files changed, 428 insertions(+), 403 deletions(-) create mode 100755 tests/bin/start-rebuild-containers.sh diff --git a/backends/advanced/docker-compose-test.yml b/backends/advanced/docker-compose-test.yml index 999b37a2..e89102f5 100644 --- a/backends/advanced/docker-compose-test.yml +++ b/backends/advanced/docker-compose-test.yml @@ -2,6 +2,8 @@ # Isolated test environment for integration tests # Uses different ports to avoid conflicts with development environment +name: backend-test + services: chronicle-backend-test: build: diff --git a/backends/advanced/run-test.sh b/backends/advanced/run-test.sh index c68a30ea..61fd7d55 100755 --- a/backends/advanced/run-test.sh +++ b/backends/advanced/run-test.sh @@ -219,17 +219,13 @@ if [ -d "./data/test_audio_chunks/" ] || [ -d "./data/test_data/" ] || [ -d "./d docker run --rm -v "$(pwd)/data:/data" alpine sh -c 'rm -rf /data/test_*' 2>/dev/null || true fi -# Use unique project name to avoid conflicts with development environment -export COMPOSE_PROJECT_NAME="advanced-backend-test" +# Note: Project name 'backend-test' is set in docker-compose-test.yml +# No need to export COMPOSE_PROJECT_NAME - it's handled by the compose file # Stop any existing test containers print_info "Stopping existing test containers..." -# Try cleanup with current project name docker compose -f docker-compose-test.yml down -v || true -# Also try cleanup with default project name (in case containers were started without COMPOSE_PROJECT_NAME) -COMPOSE_PROJECT_NAME=advanced docker compose -f docker-compose-test.yml down -v 2>/dev/null || true - # Run integration tests print_info "Running integration tests..." print_info "Using fresh mode (CACHED_MODE=False) for clean testing" @@ -268,8 +264,6 @@ else if [ "${CLEANUP_CONTAINERS:-true}" != "false" ]; then print_info "Cleaning up test containers after failure..." docker compose -f docker-compose-test.yml down -v || true - # Also cleanup with default project name - COMPOSE_PROJECT_NAME=advanced docker compose -f docker-compose-test.yml down -v 2>/dev/null || true docker system prune -f || true else print_warning "Skipping cleanup (CLEANUP_CONTAINERS=false) - containers left running for debugging" @@ -282,8 +276,6 @@ fi if [ "${CLEANUP_CONTAINERS:-true}" != "false" ]; then print_info "Cleaning up test containers..." docker compose -f docker-compose-test.yml down -v || true - # Also cleanup with default project name - COMPOSE_PROJECT_NAME=advanced docker compose -f docker-compose-test.yml down -v 2>/dev/null || true docker system prune -f || true else print_warning "Skipping cleanup (CLEANUP_CONTAINERS=false) - containers left running" diff --git a/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py index 041bd06b..d726a392 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py @@ -203,7 +203,7 @@ async def upload_and_process_audio_files( processed_files.append({ "filename": file.filename, - "status": "processing", + "status": "started", # RQ standard: job has been enqueued "conversation_id": conversation_id, "transcript_job_id": transcription_job.id, "speaker_job_id": job_ids['speaker_recognition'], @@ -233,7 +233,7 @@ async def upload_and_process_audio_files( "error": str(e), }) - successful_files = [f for f in processed_files if f.get("status") == "processing"] + successful_files = [f for f in processed_files if f.get("status") == "started"] failed_files = [f for f in processed_files if f.get("status") == "error"] return { @@ -242,7 +242,7 @@ async def upload_and_process_audio_files( "files": processed_files, "summary": { "total": len(files), - "processing": len(successful_files), + "started": len(successful_files), # RQ standard "failed": len(failed_files), }, } diff --git a/backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py index 9cd374e0..d804df95 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py @@ -17,7 +17,7 @@ import redis from rq import Queue, Worker -from rq.job import Job +from rq.job import Job, JobStatus from rq.registry import ScheduledJobRegistry, DeferredJobRegistry from advanced_omi_backend.models.job import JobPriority @@ -30,6 +30,52 @@ REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0") redis_conn = redis.from_url(REDIS_URL) + +def get_job_status_from_rq(job: Job) -> str: + """ + Get job status using RQ's native method. + + Uses job.get_status() which is the Redis Queue standard approach. + Returns RQ's standard status names. + + Returns one of: queued, started, finished, failed, deferred, scheduled, canceled, stopped + + Raises: + RuntimeError: If job status is unexpected (should never happen with RQ's method) + """ + rq_status = job.get_status() + + # RQ returns status as JobStatus enum or string + # Convert to string if it's an enum + if isinstance(rq_status, JobStatus): + status_str = rq_status.value + else: + status_str = str(rq_status) + + # Validate it's a known RQ status + valid_statuses = { + JobStatus.QUEUED.value, + JobStatus.STARTED.value, + JobStatus.FINISHED.value, + JobStatus.FAILED.value, + JobStatus.DEFERRED.value, + JobStatus.SCHEDULED.value, + JobStatus.CANCELED.value, + JobStatus.STOPPED.value, + } + + if status_str not in valid_statuses: + logger.error( + f"Job {job.id} has unexpected RQ status: {status_str}. " + f"This indicates RQ library added a new status we don't know about." + ) + raise RuntimeError( + f"Job {job.id} has unknown RQ status: {status_str}. " + f"Please update get_job_status_from_rq() to handle this new status." + ) + + return status_str + # Queue name constants TRANSCRIPTION_QUEUE = "transcription" MEMORY_QUEUE = "memory" @@ -61,34 +107,34 @@ def get_queue(queue_name: str = DEFAULT_QUEUE) -> Queue: def get_job_stats() -> Dict[str, Any]: - """Get statistics about jobs in all queues matching frontend expectations.""" + """Get statistics about jobs in all queues using RQ standard status names.""" total_jobs = 0 queued_jobs = 0 - processing_jobs = 0 - completed_jobs = 0 + started_jobs = 0 # RQ standard: "started" not "processing" + finished_jobs = 0 # RQ standard: "finished" not "completed" failed_jobs = 0 - cancelled_jobs = 0 + canceled_jobs = 0 # RQ standard: "canceled" not "cancelled" deferred_jobs = 0 # Jobs waiting for dependencies (depends_on) for queue_name in QUEUE_NAMES: queue = get_queue(queue_name) queued_jobs += len(queue) - processing_jobs += len(queue.started_job_registry) - completed_jobs += len(queue.finished_job_registry) + started_jobs += len(queue.started_job_registry) + finished_jobs += len(queue.finished_job_registry) failed_jobs += len(queue.failed_job_registry) - cancelled_jobs += len(queue.canceled_job_registry) + canceled_jobs += len(queue.canceled_job_registry) deferred_jobs += len(queue.deferred_job_registry) - total_jobs = queued_jobs + processing_jobs + completed_jobs + failed_jobs + cancelled_jobs + deferred_jobs + total_jobs = queued_jobs + started_jobs + finished_jobs + failed_jobs + canceled_jobs + deferred_jobs return { "total_jobs": total_jobs, "queued_jobs": queued_jobs, - "processing_jobs": processing_jobs, - "completed_jobs": completed_jobs, + "started_jobs": started_jobs, + "finished_jobs": finished_jobs, "failed_jobs": failed_jobs, - "cancelled_jobs": cancelled_jobs, + "canceled_jobs": canceled_jobs, "deferred_jobs": deferred_jobs, "timestamp": datetime.utcnow().isoformat() } @@ -124,11 +170,11 @@ def get_jobs( for qname in queues_to_check: queue = get_queue(qname) - # Collect jobs from all registries + # Collect jobs from all registries (using RQ standard status names) registries = [ (queue.job_ids, "queued"), - (queue.started_job_registry.get_job_ids(), "processing"), - (queue.finished_job_registry.get_job_ids(), "completed"), + (queue.started_job_registry.get_job_ids(), "started"), # RQ standard, not "processing" + (queue.finished_job_registry.get_job_ids(), "finished"), # RQ standard, not "completed" (queue.failed_job_registry.get_job_ids(), "failed"), (queue.deferred_job_registry.get_job_ids(), "deferred"), # Jobs waiting for dependencies ] diff --git a/backends/advanced/src/advanced_omi_backend/controllers/session_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/session_controller.py index d1a22695..165bb4c3 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/session_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/session_controller.py @@ -57,11 +57,11 @@ async def mark_session_complete( """ session_key = f"audio:session:{session_id}" await redis_client.hset(session_key, mapping={ - "status": "complete", + "status": "finished", "completed_at": str(time.time()), "completion_reason": reason }) - logger.info(f"✅ Session {session_id[:12]} marked complete: {reason}") + logger.info(f"✅ Session {session_id[:12]} marked finished: {reason}") async def get_session_info(redis_client, session_id: str) -> Optional[Dict]: @@ -231,15 +231,15 @@ async def get_streaming_status(request): # Check if all jobs are complete (including failed jobs) all_jobs_done = all_jobs_complete_for_session(session_id) - # Session is completed if: - # 1. Redis status says complete/finalized AND all jobs done, OR - # 2. All jobs are done (even if status isn't complete yet) - # This ensures sessions with failed jobs move to completed - if status in ["complete", "completed", "finalized"] or all_jobs_done: + # Session is finished if: + # 1. Redis status says finished AND all jobs done, OR + # 2. All jobs are done (even if status isn't finished yet) + # This ensures sessions with failed jobs move to finished + if status == "finished" or all_jobs_done: if all_jobs_done: - # All jobs complete - this is truly a completed session - # Update Redis status if it wasn't already marked complete - if status not in ["complete", "completed", "finalized"]: + # All jobs finished - this is truly a finished session + # Update Redis status if it wasn't already marked finished + if status != "finished": await mark_session_complete(redis_client, session_id, "all_jobs_complete") # Get additional session data for completed sessions @@ -251,7 +251,7 @@ async def get_streaming_status(request): "client_id": session_obj.get("client_id", ""), "conversation_id": session_data.get(b"conversation_id", b"").decode() if session_data and b"conversation_id" in session_data else None, "has_conversation": bool(session_data and session_data.get(b"conversation_id", b"")), - "action": session_data.get(b"action", b"complete").decode() if session_data and b"action" in session_data else "complete", + "action": session_data.get(b"action", b"finished").decode() if session_data and b"action" in session_data else "finished", "reason": session_data.get(b"reason", b"").decode() if session_data and b"reason" in session_data else "", "completed_at": session_obj.get("last_chunk_at", 0), "audio_file": session_data.get(b"audio_file", b"").decode() if session_data and b"audio_file" in session_data else "", @@ -450,26 +450,26 @@ async def get_streaming_status(request): rq_stats = { "transcription_queue": { "queued": transcription_queue.count, - "processing": len(transcription_queue.started_job_registry), - "completed": len(transcription_queue.finished_job_registry), + "started": len(transcription_queue.started_job_registry), + "finished": len(transcription_queue.finished_job_registry), "failed": len(transcription_queue.failed_job_registry), - "cancelled": len(transcription_queue.canceled_job_registry), + "canceled": len(transcription_queue.canceled_job_registry), "deferred": len(transcription_queue.deferred_job_registry) }, "memory_queue": { "queued": memory_queue.count, - "processing": len(memory_queue.started_job_registry), - "completed": len(memory_queue.finished_job_registry), + "started": len(memory_queue.started_job_registry), + "finished": len(memory_queue.finished_job_registry), "failed": len(memory_queue.failed_job_registry), - "cancelled": len(memory_queue.canceled_job_registry), + "canceled": len(memory_queue.canceled_job_registry), "deferred": len(memory_queue.deferred_job_registry) }, "default_queue": { "queued": default_queue.count, - "processing": len(default_queue.started_job_registry), - "completed": len(default_queue.finished_job_registry), + "started": len(default_queue.started_job_registry), + "finished": len(default_queue.finished_job_registry), "failed": len(default_queue.failed_job_registry), - "cancelled": len(default_queue.canceled_job_registry), + "canceled": len(default_queue.canceled_job_registry), "deferred": len(default_queue.deferred_job_registry) } } diff --git a/backends/advanced/src/advanced_omi_backend/routers/modules/obsidian_routes.py b/backends/advanced/src/advanced_omi_backend/routers/modules/obsidian_routes.py index e45c51de..f6a46a38 100644 --- a/backends/advanced/src/advanced_omi_backend/routers/modules/obsidian_routes.py +++ b/backends/advanced/src/advanced_omi_backend/routers/modules/obsidian_routes.py @@ -176,14 +176,12 @@ async def get_status(job_id: str, current_user: User = Depends(current_active_us status = job.get_status() if status == "started": status = "running" - if status == "canceled": - status = "cancelled" - + # Get metadata meta = job.meta or {} - + # If meta has status, prefer it (for granular updates) - if "status" in meta and meta["status"] in ("running", "completed", "failed", "cancelled"): + if "status" in meta and meta["status"] in ("running", "finished", "failed", "canceled"): status = meta["status"] total = meta.get("total_files", 0) diff --git a/backends/advanced/src/advanced_omi_backend/routers/modules/queue_routes.py b/backends/advanced/src/advanced_omi_backend/routers/modules/queue_routes.py index 8dd9e5f6..14c7ee0e 100644 --- a/backends/advanced/src/advanced_omi_backend/routers/modules/queue_routes.py +++ b/backends/advanced/src/advanced_omi_backend/routers/modules/queue_routes.py @@ -9,7 +9,7 @@ from typing import List, Optional from advanced_omi_backend.auth import current_active_user -from advanced_omi_backend.controllers.queue_controller import get_jobs, get_job_stats, redis_conn, QUEUE_NAMES +from advanced_omi_backend.controllers.queue_controller import get_jobs, get_job_stats, redis_conn, QUEUE_NAMES, get_job_status_from_rq from advanced_omi_backend.users import User from rq.job import Job import redis.asyncio as aioredis @@ -65,18 +65,12 @@ async def get_job_status( if job_user_id != str(current_user.user_id): raise HTTPException(status_code=403, detail="Access forbidden") - # Determine status from registries - status = "unknown" - if job.is_queued: - status = "queued" - elif job.is_started: - status = "processing" - elif job.is_finished: - status = "completed" - elif job.is_failed: - status = "failed" - elif job.is_deferred: - status = "deferred" + # Get status using RQ's native method + try: + status = get_job_status_from_rq(job) + except RuntimeError as e: + logger.error(f"Failed to determine status for job {job_id}: {e}") + raise HTTPException(status_code=500, detail=str(e)) return { "job_id": job.id, @@ -106,18 +100,12 @@ async def get_job( if job_user_id != str(current_user.user_id): raise HTTPException(status_code=403, detail="Access forbidden") - # Determine status from registries - status = "unknown" - if job.is_queued: - status = "queued" - elif job.is_started: - status = "processing" - elif job.is_finished: - status = "completed" - elif job.is_failed: - status = "failed" - elif job.is_deferred: - status = "deferred" + # Get status using RQ's native method + try: + status = get_job_status_from_rq(job) + except RuntimeError as e: + logger.error(f"Failed to determine status for job {job_id}: {e}") + raise HTTPException(status_code=500, detail=str(e)) return { "job_id": job.id, @@ -157,18 +145,18 @@ async def cancel_job( if job_user_id != str(current_user.user_id): raise HTTPException(status_code=403, detail="Access forbidden") - # Cancel if queued or processing, delete if completed/failed + # Cancel if queued or started, delete if finished/failed if job.is_queued or job.is_started or job.is_deferred or job.is_scheduled: # Cancel the job job.cancel() logger.info(f"Cancelled job {job_id}") return { "job_id": job_id, - "action": "cancelled", - "message": f"Job {job_id} has been cancelled" + "action": "canceled", + "message": f"Job {job_id} has been canceled" } else: - # Delete completed/failed jobs + # Delete finished/failed jobs job.delete() logger.info(f"Deleted job {job_id}") return { @@ -182,7 +170,7 @@ async def cancel_job( raise except Exception as e: logger.error(f"Failed to cancel/delete job {job_id}: {e}") - raise HTTPException(status_code=404, detail=f"Job not found or could not be cancelled: {str(e)}") + raise HTTPException(status_code=404, detail=f"Job not found or could not be canceled: {str(e)}") @router.get("/jobs/by-client/{client_id}") @@ -201,21 +189,14 @@ async def get_jobs_by_client( queues = QUEUE_NAMES def get_job_status(job, registries_map): - """Determine job status from registries.""" - if job.is_queued: - return "queued" - elif job.is_started: - return "processing" - elif job.is_finished: - return "completed" - elif job.is_failed: - return "failed" - elif job.is_deferred: - return "deferred" - elif job.is_scheduled: - return "waiting" - else: - return "unknown" + """Determine job status using RQ's native method.""" + try: + return get_job_status_from_rq(job) + except RuntimeError: + # In nested function, can't raise HTTP exception + # Log and re-raise to be handled by outer scope + logger.error(f"Job {job.id} status determination failed") + raise def process_job_and_dependents(job, queue_name, base_status): """Process a job and recursively find all its dependents.""" @@ -270,15 +251,15 @@ def process_job_and_dependents(job, queue_name, base_status): for queue_name in queues: queue = get_queue(queue_name) - # Check all registries + # Check all registries (using RQ standard status names) registries = [ ("queued", queue.job_ids), - ("processing", StartedJobRegistry(queue=queue).get_job_ids()), - ("completed", FinishedJobRegistry(queue=queue).get_job_ids()), + ("started", StartedJobRegistry(queue=queue).get_job_ids()), # RQ standard + ("finished", FinishedJobRegistry(queue=queue).get_job_ids()), # RQ standard ("failed", FailedJobRegistry(queue=queue).get_job_ids()), - ("cancelled", CanceledJobRegistry(queue=queue).get_job_ids()), - ("waiting", DeferredJobRegistry(queue=queue).get_job_ids()), - ("waiting", ScheduledJobRegistry(queue=queue).get_job_ids()) + ("canceled", CanceledJobRegistry(queue=queue).get_job_ids()), # RQ standard (US spelling) + ("deferred", DeferredJobRegistry(queue=queue).get_job_ids()), + ("scheduled", ScheduledJobRegistry(queue=queue).get_job_ids()) ] for status_name, job_ids in registries: @@ -329,7 +310,7 @@ async def get_queue_stats_endpoint( except Exception as e: logger.error(f"Failed to get queue stats: {e}") - return {"total_jobs": 0, "queued_jobs": 0, "processing_jobs": 0, "completed_jobs": 0, "failed_jobs": 0, "cancelled_jobs": 0, "deferred_jobs": 0} + return {"total_jobs": 0, "queued_jobs": 0, "started_jobs": 0, "finished_jobs": 0, "failed_jobs": 0, "canceled_jobs": 0, "deferred_jobs": 0} @router.get("/worker-details") @@ -480,13 +461,13 @@ async def get_stream_info(stream_key): class FlushJobsRequest(BaseModel): older_than_hours: int = 24 - statuses: List[str] = ["completed", "failed", "cancelled"] + statuses: List[str] = ["finished", "failed", "canceled"] # RQ standard status names class FlushAllJobsRequest(BaseModel): confirm: bool include_failed: bool = False # By default, preserve failed jobs for debugging - include_completed: bool = False # By default, preserve completed jobs for debugging + include_finished: bool = False # By default, preserve finished jobs for debugging @router.post("/flush") @@ -512,8 +493,8 @@ async def flush_jobs( for queue_name in queues: queue = get_queue(queue_name) - # Flush from appropriate registries based on requested statuses - if "completed" in request.statuses: + # Flush from appropriate registries based on requested statuses (RQ standard names) + if "finished" in request.statuses: # RQ standard, not "completed" registry = FinishedJobRegistry(queue=queue) for job_id in registry.get_job_ids(): try: @@ -535,7 +516,7 @@ async def flush_jobs( except Exception as e: logger.error(f"Error deleting job {job_id}: {e}") - if "cancelled" in request.statuses: + if "canceled" in request.statuses: # RQ standard (US spelling), not "cancelled" registry = CanceledJobRegistry(queue=queue) for job_id in registry.get_job_ids(): try: @@ -564,8 +545,8 @@ async def flush_all_jobs( ): """ Flush jobs from queues and registries. - By default preserves failed and completed jobs for debugging. - Set include_failed=true or include_completed=true to flush those as well. + By default preserves failed and finished jobs for debugging. + Set include_failed=true or include_finished=true to flush those as well. """ if not current_user.is_superuser: raise HTTPException(status_code=403, detail="Admin access required") @@ -607,7 +588,7 @@ async def flush_all_jobs( # Conditionally add failed and finished registries if request.include_failed: registries.append(("failed", FailedJobRegistry(queue=queue))) - if request.include_completed: + if request.include_finished: registries.append(("finished", FinishedJobRegistry(queue=queue))) for registry_name, registry in registries: @@ -691,8 +672,8 @@ async def flush_all_jobs( preserved = [] if not request.include_failed: preserved.append("failed jobs") - if not request.include_completed: - preserved.append("completed jobs") + if not request.include_finished: + preserved.append("finished jobs") preserved_msg = f" (preserved {', '.join(preserved)})" if preserved else "" logger.info(f"Flushed {total_removed} jobs and {deleted_keys} Redis keys from all queues{preserved_msg}") @@ -833,7 +814,7 @@ async def get_dashboard_data( """Get all data needed for the Queue dashboard in a single API call. Returns: - - Jobs grouped by status (queued, processing, completed, failed) + - Jobs grouped by status (queued, started, finished, failed) - Queue statistics - Streaming status - Client jobs for expanded clients @@ -858,12 +839,12 @@ async def fetch_jobs_by_status(status_name: str, limit: int = 100): for queue_name in queues: queue = get_queue(queue_name) - # Get job IDs based on status + # Get job IDs based on status (using RQ standard status names) if status_name == "queued": job_ids = queue.job_ids[:limit] - elif status_name == "processing": + elif status_name == "started": # RQ standard, not "processing" job_ids = list(StartedJobRegistry(queue=queue).get_job_ids())[:limit] - elif status_name == "completed": + elif status_name == "finished": # RQ standard, not "completed" job_ids = list(FinishedJobRegistry(queue=queue).get_job_ids())[:limit] elif status_name == "failed": job_ids = list(FailedJobRegistry(queue=queue).get_job_ids())[:limit] @@ -917,7 +898,7 @@ async def fetch_stats(): return get_job_stats() except Exception as e: logger.error(f"Error fetching stats: {e}") - return {"total_jobs": 0, "queued_jobs": 0, "processing_jobs": 0, "completed_jobs": 0, "failed_jobs": 0} + return {"total_jobs": 0, "queued_jobs": 0, "started_jobs": 0, "finished_jobs": 0, "failed_jobs": 0} async def fetch_streaming_status(): """Fetch streaming status.""" @@ -941,17 +922,12 @@ async def fetch_client_jobs(client_id: str): queues = QUEUE_NAMES def get_job_status(job): - if job.is_queued: - return "queued" - elif job.is_started: - return "processing" - elif job.is_finished: - return "completed" - elif job.is_failed: - return "failed" - elif job.is_deferred: - return "deferred" - else: + """Get job status using RQ's native method.""" + try: + return get_job_status_from_rq(job) + except RuntimeError: + logger.error(f"Job {job.id} status determination failed") + # Return unknown as fallback in dashboard context return "unknown" # Find all jobs for this session @@ -966,8 +942,8 @@ def get_job_status(job): registries = [ ("queued", queue.job_ids), - ("processing", StartedJobRegistry(queue=queue).get_job_ids()), - ("completed", FinishedJobRegistry(queue=queue).get_job_ids()), + ("started", StartedJobRegistry(queue=queue).get_job_ids()), # RQ standard + ("finished", FinishedJobRegistry(queue=queue).get_job_ids()), # RQ standard ("failed", FailedJobRegistry(queue=queue).get_job_ids()) ] @@ -1016,10 +992,10 @@ def get_job_status(job): logger.error(f"Error fetching jobs for client {client_id}: {e}") return {"client_id": client_id, "jobs": []} - # Execute all fetches in parallel + # Execute all fetches in parallel (using RQ standard status names) queued_jobs_task = fetch_jobs_by_status("queued", limit=100) - processing_jobs_task = fetch_jobs_by_status("processing", limit=100) - completed_jobs_task = fetch_jobs_by_status("completed", limit=50) + started_jobs_task = fetch_jobs_by_status("started", limit=100) # RQ standard, not "processing" + finished_jobs_task = fetch_jobs_by_status("finished", limit=50) # RQ standard, not "completed" failed_jobs_task = fetch_jobs_by_status("failed", limit=50) stats_task = fetch_stats() streaming_status_task = fetch_streaming_status() @@ -1027,8 +1003,8 @@ def get_job_status(job): results = await asyncio.gather( queued_jobs_task, - processing_jobs_task, - completed_jobs_task, + started_jobs_task, + finished_jobs_task, failed_jobs_task, stats_task, streaming_status_task, @@ -1037,8 +1013,8 @@ def get_job_status(job): ) queued_jobs = results[0] if not isinstance(results[0], Exception) else [] - processing_jobs = results[1] if not isinstance(results[1], Exception) else [] - completed_jobs = results[2] if not isinstance(results[2], Exception) else [] + started_jobs = results[1] if not isinstance(results[1], Exception) else [] # RQ standard + finished_jobs = results[2] if not isinstance(results[2], Exception) else [] # RQ standard failed_jobs = results[3] if not isinstance(results[3], Exception) else [] stats = results[4] if not isinstance(results[4], Exception) else {"total_jobs": 0} streaming_status = results[5] if not isinstance(results[5], Exception) else {"active_sessions": []} @@ -1066,8 +1042,8 @@ def get_job_status(job): return { "jobs": { "queued": queued_jobs, - "processing": processing_jobs, - "completed": completed_jobs, + "started": started_jobs, # RQ standard status name + "finished": finished_jobs, # RQ standard status name "failed": failed_jobs }, "stats": stats, diff --git a/backends/advanced/src/advanced_omi_backend/utils/conversation_utils.py b/backends/advanced/src/advanced_omi_backend/utils/conversation_utils.py index ca62372b..c0f92408 100644 --- a/backends/advanced/src/advanced_omi_backend/utils/conversation_utils.py +++ b/backends/advanced/src/advanced_omi_backend/utils/conversation_utils.py @@ -81,6 +81,10 @@ def analyze_speech(transcript_data: dict) -> dict: settings = get_speech_detection_settings() words = transcript_data.get("words", []) + logger.info(f"🔬 analyze_speech: words_list_length={len(words)}, settings={settings}") + if words and len(words) > 0: + logger.info(f"📝 First 3 words: {words[:3]}") + # Method 1: Word-level analysis (preferred - has confidence scores and timing) if words: # Filter by confidence threshold @@ -98,6 +102,12 @@ def analyze_speech(transcript_data: dict) -> dict: speech_end = valid_words[-1].get("end", 0) speech_duration = speech_end - speech_start + # Debug logging for timestamp investigation + logger.info( + f"🕐 Speech timing: start={speech_start:.2f}s, end={speech_end:.2f}s, " + f"duration={speech_duration:.2f}s (first_word={valid_words[0]}, last_word={valid_words[-1]})" + ) + # If no timing data (duration = 0), fall back to text-only analysis # This happens with some streaming transcription services if speech_duration == 0: @@ -106,6 +116,7 @@ def analyze_speech(transcript_data: dict) -> dict: else: # Check minimum duration threshold when we have timing data min_duration = settings.get("min_duration", 10.0) + logger.info(f"📏 Comparing duration {speech_duration:.1f}s vs threshold {min_duration:.1f}s") if speech_duration < min_duration: return { "has_speech": False, diff --git a/backends/advanced/src/advanced_omi_backend/utils/job_utils.py b/backends/advanced/src/advanced_omi_backend/utils/job_utils.py index ba9fcc74..c9028909 100644 --- a/backends/advanced/src/advanced_omi_backend/utils/job_utils.py +++ b/backends/advanced/src/advanced_omi_backend/utils/job_utils.py @@ -44,7 +44,7 @@ async def check_job_alive(redis_client, current_job, session_id: Optional[str] = if session_id: session_key = f"audio:session:{session_id}" session_status = await redis_client.hget(session_key, "status") - if session_status and session_status.decode() in ["finalizing", "complete", "closed"]: + if session_status and session_status.decode() in ["finalizing", "finished"]: # Session ended naturally - not a zombie, just natural cleanup logger.debug(f"📋 Job {current_job.id} ending naturally (session closed)") return False diff --git a/backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py b/backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py index 58acad62..8505d547 100644 --- a/backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py +++ b/backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py @@ -210,7 +210,7 @@ async def flush_pcm_buffer(): # Check if session is finalizing session_status = await redis_client.hget(session_key, "status") - if session_status and session_status.decode() in ["finalizing", "complete"]: + if session_status and session_status.decode() in ["finalizing", "finished"]: logger.info(f"🛑 Session finalizing detected, flushing final chunks...") await asyncio.sleep(0.5) # Brief wait for in-flight chunks diff --git a/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py b/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py index e458a7fe..8edc01e4 100644 --- a/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py +++ b/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py @@ -315,7 +315,7 @@ async def open_conversation_job( status = await redis_client.hget(session_key, "status") status_str = status.decode() if status else None - if status_str in ["finalizing", "complete"]: + if status_str in ["finalizing", "finished"]: finalize_received = True # Get completion reason (guaranteed to exist with unified API) diff --git a/backends/advanced/src/advanced_omi_backend/workers/obsidian_jobs.py b/backends/advanced/src/advanced_omi_backend/workers/obsidian_jobs.py index 1956f00b..8c67616d 100644 --- a/backends/advanced/src/advanced_omi_backend/workers/obsidian_jobs.py +++ b/backends/advanced/src/advanced_omi_backend/workers/obsidian_jobs.py @@ -34,7 +34,7 @@ async def ingest_obsidian_vault_job(job_id: str, vault_path: str, redis_client=N logger.info("Starting Obsidian ingestion job %s", job.id) # Initialize job meta - job.meta["status"] = "processing" + job.meta["status"] = "started" job.meta["processed"] = 0 job.meta["total_files"] = 0 job.meta["errors"] = [] @@ -74,10 +74,10 @@ async def ingest_obsidian_vault_job(job_id: str, vault_path: str, redis_client=N # Check for cancellation job.refresh() if job.get_status() == "canceled": - logger.info("Obsidian ingestion job %s cancelled by user", job.id) - job.meta["status"] = "cancelled" + logger.info("Obsidian ingestion job %s canceled by user", job.id) + job.meta["status"] = "canceled" job.save_meta() - return {"status": "cancelled"} + return {"status": "canceled"} try: note_data = obsidian_service.parse_obsidian_note(root, filename, vault_path) @@ -96,12 +96,12 @@ async def ingest_obsidian_vault_job(job_id: str, vault_path: str, redis_client=N job.meta["errors"] = errors job.save_meta() - job.meta["status"] = "completed" + job.meta["status"] = "finished" job.save_meta() - + return { - "status": "completed", - "processed": processed, - "total": total, + "status": "finished", + "processed": processed, + "total": total, "errors": errors } diff --git a/backends/advanced/src/advanced_omi_backend/workers/transcription_jobs.py b/backends/advanced/src/advanced_omi_backend/workers/transcription_jobs.py index 70935e1a..f492c910 100644 --- a/backends/advanced/src/advanced_omi_backend/workers/transcription_jobs.py +++ b/backends/advanced/src/advanced_omi_backend/workers/transcription_jobs.py @@ -612,7 +612,7 @@ async def stream_speech_detection_job( # Check if session has closed session_status = await redis_client.hget(session_key, "status") - session_closed = session_status and session_status.decode() in ["complete", "closed"] + session_closed = session_status and session_status.decode() in ["finalizing", "finished"] if session_closed and session_closed_at is None: # Session just closed - start grace period for final transcription diff --git a/backends/advanced/upload_files.py b/backends/advanced/upload_files.py index ead58e74..77a001f3 100755 --- a/backends/advanced/upload_files.py +++ b/backends/advanced/upload_files.py @@ -321,14 +321,14 @@ def poll_job_status(job_id: str, token: str, base_url: str, total_files: int) -> last_progress = progress last_current_file = current_file - # Check completion status - if status == "completed": + # Check completion status (RQ standard: "finished") + if status == "finished": elapsed = time.time() - start_time logger.info(f"🎉 Job completed successfully in {elapsed:.0f}s!") - + # Show final file status summary files = job_status.get("files", []) - completed = len([f for f in files if f.get("status") == "completed"]) + completed = len([f for f in files if f.get("status") == "finished"]) failed = len([f for f in files if f.get("status") == "failed"]) skipped = len([f for f in files if f.get("status") == "skipped"]) diff --git a/backends/advanced/webui/src/pages/Queue.tsx b/backends/advanced/webui/src/pages/Queue.tsx index 05b56fb6..b05f9374 100644 --- a/backends/advanced/webui/src/pages/Queue.tsx +++ b/backends/advanced/webui/src/pages/Queue.tsx @@ -27,10 +27,10 @@ import { queueApi } from '../services/api'; interface QueueStats { total_jobs: number; queued_jobs: number; - processing_jobs: number; - completed_jobs: number; + started_jobs: number; // RQ standard, not "processing_jobs" + finished_jobs: number; // RQ standard, not "completed_jobs" failed_jobs: number; - cancelled_jobs: number; + canceled_jobs: number; // RQ standard (US spelling), not "cancelled_jobs" deferred_jobs: number; timestamp: string; } @@ -131,10 +131,10 @@ const Queue: React.FC = () => { const [showFlushModal, setShowFlushModal] = useState(false); const [flushSettings, setFlushSettings] = useState({ older_than_hours: 24, - statuses: ['completed', 'failed'], + statuses: ['finished', 'failed'], // RQ standard status names flush_all: false, include_failed: false, // For flush_all mode - include_completed: false // For flush_all mode + include_completed: false // For flush_all mode (note: API expects include_completed for backward compat) }); const [flushing, setFlushing] = useState(false); const [expandedConversations, setExpandedConversations] = useState>(new Set()); @@ -186,19 +186,19 @@ const Queue: React.FC = () => { const response = await queueApi.getDashboard(expandedConversationIds); const dashboardData = response.data; - // Extract jobs from response + // Extract jobs from response (using RQ standard status names) const queuedJobs = dashboardData.jobs.queued || []; - const processingJobs = dashboardData.jobs.processing || []; - const completedJobs = dashboardData.jobs.completed || []; + const startedJobs = dashboardData.jobs.started || []; // RQ standard, not "processing" + const finishedJobs = dashboardData.jobs.finished || []; // RQ standard, not "completed" const failedJobs = dashboardData.jobs.failed || []; // Combine all jobs - const allFetchedJobs = [...queuedJobs, ...processingJobs, ...completedJobs, ...failedJobs]; + const allFetchedJobs = [...queuedJobs, ...startedJobs, ...finishedJobs, ...failedJobs]; console.log(`📊 Fetched ${allFetchedJobs.length} total jobs via consolidated endpoint`); console.log(` - Queued: ${queuedJobs.length}`); - console.log(` - Processing: ${processingJobs.length}`); - console.log(` - Completed: ${completedJobs.length}`); + console.log(` - Started: ${startedJobs.length}`); // RQ standard + console.log(` - Finished: ${finishedJobs.length}`); // RQ standard console.log(` - Failed: ${failedJobs.length}`); // Debug: Log open_conversation_job details @@ -266,7 +266,7 @@ const Queue: React.FC = () => { // Find all conversations with active open_conversation_job Object.entries(jobsByConversation).forEach(([_conversationId, jobs]) => { const openConvJob = jobs.find((j: any) => j.job_type === 'open_conversation_job'); - if (openConvJob && openConvJob.status === 'processing') { + if (openConvJob && openConvJob.status === 'started') { const conversationId = openConvJob.meta?.conversation_id; if (conversationId && !expandedConversations.has(conversationId)) { newExpanded.add(conversationId); @@ -441,12 +441,12 @@ const Queue: React.FC = () => { const getStatusIcon = (status: string) => { switch (status) { case 'queued': return ; - case 'processing': return ; - case 'completed': return ; + case 'started': return ; // RQ standard + case 'finished': return ; // RQ standard case 'failed': return ; - case 'cancelled': return ; + case 'canceled': return ; // RQ standard (US spelling) case 'deferred': return ; - case 'waiting': return ; + case 'scheduled': return ; // RQ standard, not "waiting" default: return ; } }; @@ -454,12 +454,12 @@ const Queue: React.FC = () => { const getStatusColor = (status: string) => { switch (status) { case 'queued': return 'text-yellow-600 bg-yellow-100'; - case 'processing': return 'text-blue-600 bg-blue-100'; - case 'completed': return 'text-green-600 bg-green-100'; + case 'started': return 'text-blue-600 bg-blue-100'; // RQ standard + case 'finished': return 'text-green-600 bg-green-100'; // RQ standard case 'failed': return 'text-red-600 bg-red-100'; - case 'cancelled': return 'text-gray-600 bg-gray-100'; + case 'canceled': return 'text-gray-600 bg-gray-100'; // RQ standard (US spelling) case 'deferred': return 'text-blue-600 bg-blue-100'; - case 'waiting': return 'text-blue-600 bg-blue-100'; + case 'scheduled': return 'text-blue-600 bg-blue-100'; // RQ standard, not "waiting" default: return 'text-gray-600 bg-gray-100'; } }; @@ -536,7 +536,7 @@ const Queue: React.FC = () => { borderColor = 'border-red-600'; } // Processing jobs - add pulse animation - else if (status === 'processing') { + else if (status === 'started') { bgColor = bgColor + ' animate-pulse'; } @@ -634,7 +634,7 @@ const Queue: React.FC = () => { // For failed/finished jobs, use completed_at or ended_at. For running jobs, use current time. const end = job.completed_at || job.ended_at ? new Date((job.completed_at || job.ended_at)!).getTime() - : (job.status === 'processing' ? Date.now() : start); // Don't show increasing time for failed jobs + : (job.status === 'started' ? Date.now() : start); // Don't show increasing time for failed jobs const durationMs = end - start; if (durationMs < 1000) return `${durationMs}ms`; @@ -758,10 +758,10 @@ const Queue: React.FC = () => {
- 0 ? 'animate-pulse' : ''}`} /> + 0 ? 'animate-pulse' : ''}`} />
-

Processing

-

{stats.processing_jobs}

+

Started

+

{stats.started_jobs}

@@ -770,8 +770,8 @@ const Queue: React.FC = () => {
-

Completed

-

{stats.completed_jobs}

+

Finished

+

{stats.finished_jobs}

@@ -790,8 +790,8 @@ const Queue: React.FC = () => {
-

Cancelled

-

{stats.cancelled_jobs}

+

Canceled

+

{stats.canceled_jobs}

@@ -890,7 +890,7 @@ const Queue: React.FC = () => { const allListenJobs = allJobs.filter((job: any) => job && job.job_type === 'stream_speech_detection_job' && job.meta?.client_id === clientId && - job.status !== 'completed' && + job.status !== 'finished' && job.status !== 'failed' ); @@ -1092,7 +1092,7 @@ const Queue: React.FC = () => { // Filter to only show conversations where at least one job is NOT completed const conversationMap = new Map(); allConversationJobs.forEach((jobs, conversationId) => { - const hasActiveJob = jobs.some(j => j.status !== 'completed' && j.status !== 'failed'); + const hasActiveJob = jobs.some(j => j.status !== 'finished' && j.status !== 'failed'); if (hasActiveJob) { conversationMap.set(conversationId, jobs); } @@ -1211,7 +1211,7 @@ const Queue: React.FC = () => { const startTime = new Date(job.started_at!).getTime(); const endTime = job.completed_at || job.ended_at ? new Date((job.completed_at || job.ended_at)!).getTime() - : (job.status === 'processing' ? Date.now() : startTime); + : (job.status === 'started' ? Date.now() : startTime); return { job, @@ -1299,7 +1299,7 @@ const Queue: React.FC = () => {
{/* Job Bar */}
{ // Filter to only show conversations where ALL jobs are completed or failed const conversationMap = new Map(); allConversationJobs.forEach((jobs, conversationId) => { - const allJobsComplete = jobs.every(j => j.status === 'completed' || j.status === 'failed'); + const allJobsComplete = jobs.every(j => j.status === 'finished' || j.status === 'failed'); if (allJobsComplete) { conversationMap.set(conversationId, jobs); } @@ -1604,7 +1604,7 @@ const Queue: React.FC = () => { const summary = transcriptionMeta.summary || null; // Check job statuses - const allComplete = jobs.every(j => j.status === 'completed'); + const allComplete = jobs.every(j => j.status === 'finished'); const hasFailedJob = jobs.some(j => j.status === 'failed'); const failedJobCount = jobs.filter(j => j.status === 'failed').length; @@ -1730,7 +1730,7 @@ const Queue: React.FC = () => { const startTime = new Date(job.started_at!).getTime(); const endTime = job.completed_at || job.ended_at ? new Date((job.completed_at || job.ended_at)!).getTime() - : (job.status === 'processing' ? Date.now() : startTime); + : (job.status === 'started' ? Date.now() : startTime); return { job, @@ -1818,7 +1818,7 @@ const Queue: React.FC = () => {
{/* Job Bar */}
{ > - - + + - +
@@ -2158,7 +2158,7 @@ const Queue: React.FC = () => { > - {(job.status === 'queued' || job.status === 'processing') && ( + {(job.status === 'queued' || job.status === 'started') && ( )} - {job.status === 'completed' && ( + {job.status === 'finished' && ( +