diff --git a/backends/advanced/docker-compose-test.yml b/backends/advanced/docker-compose-test.yml index 999b37a2..7e0ad0b6 100644 --- a/backends/advanced/docker-compose-test.yml +++ b/backends/advanced/docker-compose-test.yml @@ -2,6 +2,8 @@ # Isolated test environment for integration tests # Uses different ports to avoid conflicts with development environment +name: backend-test + services: chronicle-backend-test: build: @@ -16,7 +18,8 @@ services: - ./data/test_audio_chunks:/app/audio_chunks - ./data/test_debug_dir:/app/debug # Fixed: mount to /app/debug for plugin database - ./data/test_data:/app/data - - ../../config:/app/config # Mount config directory with defaults.yml and config.yml + - ../../config:/app/config # Mount config directory with defaults.yml + - ../../tests/configs:/app/test-configs:ro # Mount test-specific configs - ${PLUGINS_CONFIG:-../../tests/config/plugins.test.yml}:/app/config/plugins.yml # Mount test plugins config to correct location environment: # Override with test-specific settings @@ -25,6 +28,8 @@ services: - QDRANT_PORT=6333 - REDIS_URL=redis://redis-test:6379/0 - DEBUG_DIR=/app/debug # Fixed: match plugin database mount path + # Test configuration file + - CONFIG_FILE=${TEST_CONFIG_FILE:-/app/test-configs/deepgram-openai.yml} # Import API keys from environment - DEEPGRAM_API_KEY=${DEEPGRAM_API_KEY} - OPENAI_API_KEY=${OPENAI_API_KEY} @@ -45,13 +50,16 @@ services: # Speaker recognition controlled by config.yml (disabled in test config for CI performance) - SPEAKER_SERVICE_URL=http://speaker-service-test:8085 - CORS_ORIGINS=http://localhost:3001,http://localhost:8001,https://localhost:3001,https://localhost:8001 - # Set low inactivity timeout for tests (2 seconds instead of 60) - - SPEECH_INACTIVITY_THRESHOLD_SECONDS=2 + # Set inactivity timeout for tests (20 seconds of audio time) + # This is audio duration, not wall-clock time + - SPEECH_INACTIVITY_THRESHOLD_SECONDS=20 # Set low speech detection thresholds for tests - SPEECH_DETECTION_MIN_DURATION=2.0 # 2 seconds instead of 10 - SPEECH_DETECTION_MIN_WORDS=5 # 5 words instead of 10 # Wait for audio queue to drain before timing out (test mode) - WAIT_FOR_AUDIO_QUEUE_DRAIN=true + # Mock speaker recognition for tests (avoids resource-intensive ML service) + - USE_MOCK_SPEAKER_CLIENT=true depends_on: qdrant-test: condition: service_started @@ -168,7 +176,8 @@ services: - ./data/test_audio_chunks:/app/audio_chunks - ./data/test_debug_dir:/app/debug # Fixed: mount to /app/debug for plugin database - ./data/test_data:/app/data - - ../../config:/app/config # Mount config directory with defaults.yml and config.yml + - ../../config:/app/config # Mount config directory with defaults.yml + - ../../tests/configs:/app/test-configs:ro # Mount test-specific configs - ${PLUGINS_CONFIG:-../../tests/config/plugins.test.yml}:/app/config/plugins.yml # Mount test plugins config to correct location environment: # Same environment as backend @@ -177,6 +186,8 @@ services: - QDRANT_PORT=6333 - REDIS_URL=redis://redis-test:6379/0 - DEBUG_DIR=/app/debug # Fixed: match plugin database mount path + # Test configuration file + - CONFIG_FILE=${TEST_CONFIG_FILE:-/app/test-configs/deepgram-openai.yml} - DEEPGRAM_API_KEY=${DEEPGRAM_API_KEY} - OPENAI_API_KEY=${OPENAI_API_KEY} - GROQ_API_KEY=${GROQ_API_KEY} @@ -192,13 +203,16 @@ services: - MYCELIA_DB=mycelia_test # Speaker recognition controlled by config.yml (disabled in test config for CI performance) - SPEAKER_SERVICE_URL=http://speaker-service-test:8085 - # Set low inactivity timeout for tests (2 seconds instead of 60) - - SPEECH_INACTIVITY_THRESHOLD_SECONDS=2 + # Set inactivity timeout for tests (20 seconds of audio time) + # This is audio duration, not wall-clock time + - SPEECH_INACTIVITY_THRESHOLD_SECONDS=20 # Set low speech detection thresholds for tests - SPEECH_DETECTION_MIN_DURATION=2.0 # 2 seconds instead of 10 - SPEECH_DETECTION_MIN_WORDS=5 # 5 words instead of 10 # Wait for audio queue to drain before timing out (test mode) - WAIT_FOR_AUDIO_QUEUE_DRAIN=true + # Mock speaker recognition for tests (avoids resource-intensive ML service) + - USE_MOCK_SPEAKER_CLIENT=true depends_on: chronicle-backend-test: condition: service_healthy diff --git a/backends/advanced/run-test.sh b/backends/advanced/run-test.sh index c68a30ea..61fd7d55 100755 --- a/backends/advanced/run-test.sh +++ b/backends/advanced/run-test.sh @@ -219,17 +219,13 @@ if [ -d "./data/test_audio_chunks/" ] || [ -d "./data/test_data/" ] || [ -d "./d docker run --rm -v "$(pwd)/data:/data" alpine sh -c 'rm -rf /data/test_*' 2>/dev/null || true fi -# Use unique project name to avoid conflicts with development environment -export COMPOSE_PROJECT_NAME="advanced-backend-test" +# Note: Project name 'backend-test' is set in docker-compose-test.yml +# No need to export COMPOSE_PROJECT_NAME - it's handled by the compose file # Stop any existing test containers print_info "Stopping existing test containers..." -# Try cleanup with current project name docker compose -f docker-compose-test.yml down -v || true -# Also try cleanup with default project name (in case containers were started without COMPOSE_PROJECT_NAME) -COMPOSE_PROJECT_NAME=advanced docker compose -f docker-compose-test.yml down -v 2>/dev/null || true - # Run integration tests print_info "Running integration tests..." print_info "Using fresh mode (CACHED_MODE=False) for clean testing" @@ -268,8 +264,6 @@ else if [ "${CLEANUP_CONTAINERS:-true}" != "false" ]; then print_info "Cleaning up test containers after failure..." docker compose -f docker-compose-test.yml down -v || true - # Also cleanup with default project name - COMPOSE_PROJECT_NAME=advanced docker compose -f docker-compose-test.yml down -v 2>/dev/null || true docker system prune -f || true else print_warning "Skipping cleanup (CLEANUP_CONTAINERS=false) - containers left running for debugging" @@ -282,8 +276,6 @@ fi if [ "${CLEANUP_CONTAINERS:-true}" != "false" ]; then print_info "Cleaning up test containers..." docker compose -f docker-compose-test.yml down -v || true - # Also cleanup with default project name - COMPOSE_PROJECT_NAME=advanced docker compose -f docker-compose-test.yml down -v 2>/dev/null || true docker system prune -f || true else print_warning "Skipping cleanup (CLEANUP_CONTAINERS=false) - containers left running" diff --git a/backends/advanced/src/advanced_omi_backend/app_config.py b/backends/advanced/src/advanced_omi_backend/app_config.py index 15e825ec..c87398f3 100644 --- a/backends/advanced/src/advanced_omi_backend/app_config.py +++ b/backends/advanced/src/advanced_omi_backend/app_config.py @@ -29,8 +29,7 @@ class AppConfig: def __init__(self): # MongoDB Configuration self.mongodb_uri = os.getenv("MONGODB_URI", "mongodb://mongo:27017") - # default to legacy value to avoid breaking peoples .env - self.mongodb_database = os.getenv("MONGODB_DATABASE", "friend-lite") + self.mongodb_database = os.getenv("MONGODB_DATABASE", "chronicle") self.mongo_client = AsyncIOMotorClient(self.mongodb_uri) self.db = self.mongo_client.get_default_database(self.mongodb_database) self.users_col = self.db["users"] diff --git a/backends/advanced/src/advanced_omi_backend/clients/audio_stream_client.py b/backends/advanced/src/advanced_omi_backend/clients/audio_stream_client.py index 07a401a4..ee33b86c 100644 --- a/backends/advanced/src/advanced_omi_backend/clients/audio_stream_client.py +++ b/backends/advanced/src/advanced_omi_backend/clients/audio_stream_client.py @@ -555,6 +555,39 @@ async def _stop(): logger.info(f"Stream {stream_id} stopped, sent {total_chunks} chunks") return total_chunks + def close_stream_without_stop(self, stream_id: str) -> int: + """Close WebSocket connection without sending audio-stop event. + + This simulates abrupt disconnection (network failure, client crash) + and should trigger websocket_disconnect end_reason. + + Args: + stream_id: Stream session ID + + Returns: + Total chunks sent during this session + """ + session = self._sessions.get(stream_id) + if not session: + raise ValueError(f"Unknown stream_id: {stream_id}") + + async def _close_abruptly(): + # Just close the connection without audio-stop + await session.client.close() + + future = asyncio.run_coroutine_threadsafe(_close_abruptly(), session.loop) + future.result(timeout=10) + + # Stop the event loop + session.loop.call_soon_threadsafe(session.loop.stop) + session.thread.join(timeout=5) + + total_chunks = session.chunk_count + del self._sessions[stream_id] + + logger.info(f"Stream {stream_id} closed abruptly (no audio-stop), sent {total_chunks} chunks") + return total_chunks + def get_session(self, stream_id: str) -> Optional[StreamSession]: """Get session info for a stream.""" return self._sessions.get(stream_id) diff --git a/backends/advanced/src/advanced_omi_backend/config_loader.py b/backends/advanced/src/advanced_omi_backend/config_loader.py index 6b504c79..5d25debd 100644 --- a/backends/advanced/src/advanced_omi_backend/config_loader.py +++ b/backends/advanced/src/advanced_omi_backend/config_loader.py @@ -55,7 +55,14 @@ def load_config(force_reload: bool = False) -> DictConfig: config_dir = get_config_dir() defaults_path = config_dir / "defaults.yml" - config_path = config_dir / "config.yml" + + # Support CONFIG_FILE env var for test configurations + config_file = os.getenv("CONFIG_FILE", "config.yml") + # Handle both absolute paths and relative filenames + if os.path.isabs(config_file): + config_path = Path(config_file) + else: + config_path = config_dir / config_file # Load defaults defaults = {} diff --git a/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py index 041bd06b..734df6ed 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py @@ -14,13 +14,22 @@ from fastapi import UploadFile from fastapi.responses import JSONResponse +from advanced_omi_backend.controllers.queue_controller import ( + JOB_RESULT_TTL, + start_post_conversation_jobs, + transcription_queue, +) from advanced_omi_backend.models.conversation import create_conversation from advanced_omi_backend.models.user import User +from advanced_omi_backend.services.transcription import is_transcription_available from advanced_omi_backend.utils.audio_chunk_utils import convert_audio_to_chunks from advanced_omi_backend.utils.audio_utils import ( AudioValidationError, validate_and_prepare_audio, ) +from advanced_omi_backend.workers.transcription_jobs import ( + transcribe_full_audio_job, +) logger = logging.getLogger(__name__) audio_logger = logging.getLogger("audio_processing") @@ -94,7 +103,8 @@ async def upload_and_process_audio_files( audio_data, sample_rate, sample_width, channels, duration = await validate_and_prepare_audio( audio_data=content, expected_sample_rate=16000, # Expecting 16kHz - convert_to_mono=True # Convert stereo to mono + convert_to_mono=True, # Convert stereo to mono + auto_resample=True # Auto-resample if sample rate doesn't match ) except AudioValidationError as e: processed_files.append({ @@ -166,54 +176,61 @@ async def upload_and_process_audio_files( continue # Enqueue batch transcription job first (file uploads need transcription) - from advanced_omi_backend.controllers.queue_controller import ( - JOB_RESULT_TTL, - start_post_conversation_jobs, - transcription_queue, - ) - from advanced_omi_backend.workers.transcription_jobs import ( - transcribe_full_audio_job, - ) - version_id = str(uuid.uuid4()) transcribe_job_id = f"transcribe_{conversation_id[:12]}" - transcription_job = transcription_queue.enqueue( - transcribe_full_audio_job, - conversation_id, - version_id, - "batch", # trigger - job_timeout=1800, # 30 minutes - result_ttl=JOB_RESULT_TTL, - job_id=transcribe_job_id, - description=f"Transcribe uploaded file {conversation_id[:8]}", - meta={'conversation_id': conversation_id, 'client_id': client_id} - ) - - audio_logger.info(f"๐Ÿ“ฅ Enqueued transcription job {transcription_job.id} for uploaded file") + # Check if transcription provider is available before enqueueing + transcription_job = None + if is_transcription_available(mode="batch"): + transcription_job = transcription_queue.enqueue( + transcribe_full_audio_job, + conversation_id, + version_id, + "batch", # trigger + job_timeout=1800, # 30 minutes + result_ttl=JOB_RESULT_TTL, + job_id=transcribe_job_id, + description=f"Transcribe uploaded file {conversation_id[:8]}", + meta={'conversation_id': conversation_id, 'client_id': client_id} + ) + audio_logger.info(f"๐Ÿ“ฅ Enqueued transcription job {transcription_job.id} for uploaded file") + else: + audio_logger.warning( + f"โš ๏ธ Skipping transcription for conversation {conversation_id}: " + "No transcription provider configured" + ) # Enqueue post-conversation processing job chain (depends on transcription) job_ids = start_post_conversation_jobs( conversation_id=conversation_id, user_id=user.user_id, transcript_version_id=version_id, # Pass the version_id from transcription job - depends_on_job=transcription_job, # Wait for transcription to complete + depends_on_job=transcription_job, # Wait for transcription to complete (or None) client_id=client_id # Pass client_id for UI tracking ) processed_files.append({ "filename": file.filename, - "status": "processing", + "status": "started", # RQ standard: job has been enqueued "conversation_id": conversation_id, - "transcript_job_id": transcription_job.id, + "transcript_job_id": transcription_job.id if transcription_job else None, "speaker_job_id": job_ids['speaker_recognition'], "memory_job_id": job_ids['memory'], "duration_seconds": round(duration, 2), }) + # Build job chain description + job_chain = [] + if transcription_job: + job_chain.append(transcription_job.id) + if job_ids['speaker_recognition']: + job_chain.append(job_ids['speaker_recognition']) + if job_ids['memory']: + job_chain.append(job_ids['memory']) + audio_logger.info( f"โœ… Processed {file.filename} โ†’ conversation {conversation_id}, " - f"jobs: {transcription_job.id} โ†’ {job_ids['speaker_recognition']} โ†’ {job_ids['memory']}" + f"jobs: {' โ†’ '.join(job_chain) if job_chain else 'none'}" ) except (OSError, IOError) as e: @@ -233,20 +250,33 @@ async def upload_and_process_audio_files( "error": str(e), }) - successful_files = [f for f in processed_files if f.get("status") == "processing"] + successful_files = [f for f in processed_files if f.get("status") == "started"] failed_files = [f for f in processed_files if f.get("status") == "error"] - return { + response_body = { "message": f"Uploaded and processing {len(successful_files)} file(s)", "client_id": client_id, "files": processed_files, "summary": { "total": len(files), - "processing": len(successful_files), + "started": len(successful_files), # RQ standard "failed": len(failed_files), }, } + # Return appropriate HTTP status code based on results + if len(failed_files) == len(files): + # ALL files failed - return 400 Bad Request + audio_logger.error(f"All {len(files)} file(s) failed to upload") + return JSONResponse(status_code=400, content=response_body) + elif len(failed_files) > 0: + # SOME files failed (partial success) - return 207 Multi-Status + audio_logger.warning(f"Partial upload: {len(successful_files)} succeeded, {len(failed_files)} failed") + return JSONResponse(status_code=207, content=response_body) + else: + # All files succeeded - return 200 OK + return response_body + except (OSError, IOError) as e: # File system errors during upload handling audio_logger.exception("File I/O error in upload_and_process_audio_files") diff --git a/backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py index f8afaf9d..e6aac85a 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py @@ -561,6 +561,172 @@ async def reprocess_memory(conversation_id: str, transcript_version_id: str, use return JSONResponse(status_code=500, content={"error": "Error starting memory reprocessing"}) +async def reprocess_speakers( + conversation_id: str, + transcript_version_id: str, + user: User +): + """ + Reprocess speaker identification for a specific transcript version. + Users can only reprocess their own conversations. + + Creates NEW transcript version with same text/words but re-identified speakers. + Automatically chains memory reprocessing since speaker attribution affects meaning. + """ + try: + # 1. Find conversation and validate ownership + conversation_model = await Conversation.find_one( + Conversation.conversation_id == conversation_id + ) + if not conversation_model: + return JSONResponse( + status_code=404, + content={"error": "Conversation not found"} + ) + + # Check ownership for non-admin users + if not user.is_superuser and conversation_model.user_id != str(user.user_id): + return JSONResponse( + status_code=403, + content={"error": "Access forbidden. You can only reprocess your own conversations."} + ) + + # 2. Resolve source transcript version ID (handle "active" special case) + source_version_id = transcript_version_id + if source_version_id == "active": + active_version_id = conversation_model.active_transcript_version + if not active_version_id: + return JSONResponse( + status_code=404, + content={"error": "No active transcript version found"} + ) + source_version_id = active_version_id + + # 3. Find and validate the source transcript version + source_version = None + for version in conversation_model.transcript_versions: + if version.version_id == source_version_id: + source_version = version + break + + if not source_version: + return JSONResponse( + status_code=404, + content={"error": f"Transcript version '{source_version_id}' not found"} + ) + + # 4. Validate transcript has content and words + if not source_version.transcript: + return JSONResponse( + status_code=400, + content={"error": "Cannot re-diarize empty transcript. Transcript version has no text."} + ) + + if not source_version.words: + return JSONResponse( + status_code=400, + content={"error": "Cannot re-diarize transcript without word timings. Words are required for diarization."} + ) + + # 5. Check if speaker recognition is enabled + speaker_config = get_service_config('speaker_recognition') + if not speaker_config.get('enabled', True): + return JSONResponse( + status_code=400, + content={ + "error": "Speaker recognition is disabled", + "details": "Enable speaker service in config to use this feature" + } + ) + + # 6. Create NEW transcript version (copy text/words, empty segments) + new_version_id = str(uuid.uuid4()) + + # Add new version with copied text/words but empty segments + # Speaker job will populate segments with re-identified speakers + conversation_model.add_transcript_version( + version_id=new_version_id, + transcript=source_version.transcript, # COPY transcript text + words=source_version.words, # COPY word timings + segments=[], # Empty - will be populated by speaker job + provider=source_version.provider, + model=source_version.model, + processing_time_seconds=None, # Will be updated by job + metadata={ + "reprocessing_type": "speaker_diarization", + "source_version_id": source_version_id, + "trigger": "manual_reprocess" + }, + set_as_active=True # Set new version as active + ) + + # Save conversation with new version + await conversation_model.save() + + logger.info( + f"Created new transcript version {new_version_id} from source {source_version_id} " + f"for conversation {conversation_id}" + ) + + # 7. Enqueue speaker recognition job with NEW version_id + speaker_job = transcription_queue.enqueue( + recognise_speakers_job, + conversation_id, + new_version_id, # NEW version (not source) + job_timeout=1200, # 20 minutes + result_ttl=JOB_RESULT_TTL, + job_id=f"reprocess_speaker_{conversation_id[:12]}", + description=f"Re-diarize speakers for {conversation_id[:8]}", + meta={ + 'conversation_id': conversation_id, + 'version_id': new_version_id, + 'source_version_id': source_version_id, + 'trigger': 'reprocess' + } + ) + + logger.info( + f"Enqueued speaker reprocessing job {speaker_job.id} " + f"for new version {new_version_id}" + ) + + # 8. Chain memory reprocessing (speaker changes affect memory context) + memory_job = memory_queue.enqueue( + process_memory_job, + conversation_id, + depends_on=speaker_job, + job_timeout=1800, # 30 minutes + result_ttl=JOB_RESULT_TTL, + job_id=f"memory_{conversation_id[:12]}", + description=f"Extract memories for {conversation_id[:8]}", + meta={ + 'conversation_id': conversation_id, + 'trigger': 'reprocess_after_speaker' + } + ) + + logger.info( + f"Chained memory reprocessing job {memory_job.id} " + f"after speaker job {speaker_job.id}" + ) + + # 9. Return job information + return JSONResponse(content={ + "message": "Speaker reprocessing started", + "job_id": speaker_job.id, + "version_id": new_version_id, # NEW version ID + "source_version_id": source_version_id, # Original version used as source + "status": "queued" + }) + + except Exception as e: + logger.error(f"Error starting speaker reprocessing: {e}") + return JSONResponse( + status_code=500, + content={"error": "Error starting speaker reprocessing"} + ) + + async def activate_transcript_version(conversation_id: str, version_id: str, user: User): """Activate a specific transcript version. Users can only modify their own conversations.""" try: diff --git a/backends/advanced/src/advanced_omi_backend/controllers/memory_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/memory_controller.py index f52167de..5abf4b36 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/memory_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/memory_controller.py @@ -139,33 +139,6 @@ async def delete_memory(memory_id: str, user: User): ) -async def get_memories_unfiltered(user: User, limit: int, user_id: Optional[str] = None): - """Get all memories including fallback transcript memories (for debugging). Users see only their own memories, admins can see all or filter by user.""" - try: - memory_service = get_memory_service() - - # Determine which user's memories to fetch - target_user_id = user.user_id - if user.is_superuser and user_id: - target_user_id = user_id - - # Execute memory retrieval directly (now async) - memories = await memory_service.get_all_memories_unfiltered(target_user_id, limit) - - return { - "memories": memories, - "count": len(memories), - "user_id": target_user_id, - "includes_fallback": True, - } - - except Exception as e: - audio_logger.error(f"Error fetching unfiltered memories: {e}", exc_info=True) - return JSONResponse( - status_code=500, content={"message": f"Error fetching unfiltered memories: {str(e)}"} - ) - - async def add_memory(content: str, user: User, source_id: Optional[str] = None): """Add a memory directly from content text. Extracts structured memories from the provided content.""" try: diff --git a/backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py index 9cd374e0..70b9b336 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py @@ -17,7 +17,7 @@ import redis from rq import Queue, Worker -from rq.job import Job +from rq.job import Job, JobStatus from rq.registry import ScheduledJobRegistry, DeferredJobRegistry from advanced_omi_backend.models.job import JobPriority @@ -30,6 +30,52 @@ REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0") redis_conn = redis.from_url(REDIS_URL) + +def get_job_status_from_rq(job: Job) -> str: + """ + Get job status using RQ's native method. + + Uses job.get_status() which is the Redis Queue standard approach. + Returns RQ's standard status names. + + Returns one of: queued, started, finished, failed, deferred, scheduled, canceled, stopped + + Raises: + RuntimeError: If job status is unexpected (should never happen with RQ's method) + """ + rq_status = job.get_status() + + # RQ returns status as JobStatus enum or string + # Convert to string if it's an enum + if isinstance(rq_status, JobStatus): + status_str = rq_status.value + else: + status_str = str(rq_status) + + # Validate it's a known RQ status + valid_statuses = { + JobStatus.QUEUED.value, + JobStatus.STARTED.value, + JobStatus.FINISHED.value, + JobStatus.FAILED.value, + JobStatus.DEFERRED.value, + JobStatus.SCHEDULED.value, + JobStatus.CANCELED.value, + JobStatus.STOPPED.value, + } + + if status_str not in valid_statuses: + logger.error( + f"Job {job.id} has unexpected RQ status: {status_str}. " + f"This indicates RQ library added a new status we don't know about." + ) + raise RuntimeError( + f"Job {job.id} has unknown RQ status: {status_str}. " + f"Please update get_job_status_from_rq() to handle this new status." + ) + + return status_str + # Queue name constants TRANSCRIPTION_QUEUE = "transcription" MEMORY_QUEUE = "memory" @@ -61,34 +107,34 @@ def get_queue(queue_name: str = DEFAULT_QUEUE) -> Queue: def get_job_stats() -> Dict[str, Any]: - """Get statistics about jobs in all queues matching frontend expectations.""" + """Get statistics about jobs in all queues using RQ standard status names.""" total_jobs = 0 queued_jobs = 0 - processing_jobs = 0 - completed_jobs = 0 + started_jobs = 0 # RQ standard: "started" not "processing" + finished_jobs = 0 # RQ standard: "finished" not "completed" failed_jobs = 0 - cancelled_jobs = 0 + canceled_jobs = 0 # RQ standard: "canceled" not "cancelled" deferred_jobs = 0 # Jobs waiting for dependencies (depends_on) for queue_name in QUEUE_NAMES: queue = get_queue(queue_name) queued_jobs += len(queue) - processing_jobs += len(queue.started_job_registry) - completed_jobs += len(queue.finished_job_registry) + started_jobs += len(queue.started_job_registry) + finished_jobs += len(queue.finished_job_registry) failed_jobs += len(queue.failed_job_registry) - cancelled_jobs += len(queue.canceled_job_registry) + canceled_jobs += len(queue.canceled_job_registry) deferred_jobs += len(queue.deferred_job_registry) - total_jobs = queued_jobs + processing_jobs + completed_jobs + failed_jobs + cancelled_jobs + deferred_jobs + total_jobs = queued_jobs + started_jobs + finished_jobs + failed_jobs + canceled_jobs + deferred_jobs return { "total_jobs": total_jobs, "queued_jobs": queued_jobs, - "processing_jobs": processing_jobs, - "completed_jobs": completed_jobs, + "started_jobs": started_jobs, + "finished_jobs": finished_jobs, "failed_jobs": failed_jobs, - "cancelled_jobs": cancelled_jobs, + "canceled_jobs": canceled_jobs, "deferred_jobs": deferred_jobs, "timestamp": datetime.utcnow().isoformat() } @@ -124,11 +170,11 @@ def get_jobs( for qname in queues_to_check: queue = get_queue(qname) - # Collect jobs from all registries + # Collect jobs from all registries (using RQ standard status names) registries = [ (queue.job_ids, "queued"), - (queue.started_job_registry.get_job_ids(), "processing"), - (queue.finished_job_registry.get_job_ids(), "completed"), + (queue.started_job_registry.get_job_ids(), "started"), # RQ standard, not "processing" + (queue.finished_job_registry.get_job_ids(), "finished"), # RQ standard, not "completed" (queue.failed_job_registry.get_job_ids(), "failed"), (queue.deferred_job_registry.get_job_ids(), "deferred"), # Jobs waiting for dependencies ] @@ -369,7 +415,8 @@ def start_post_conversation_jobs( user_id: str, transcript_version_id: Optional[str] = None, depends_on_job = None, - client_id: Optional[str] = None + client_id: Optional[str] = None, + end_reason: str = "file_upload" ) -> Dict[str, str]: """ Start post-conversation processing jobs after conversation is created. @@ -389,6 +436,7 @@ def start_post_conversation_jobs( transcript_version_id: Transcript version ID (auto-generated if None) depends_on_job: Optional job dependency for first job (e.g., transcription for file uploads) client_id: Client ID for UI tracking + end_reason: Reason conversation ended (e.g., 'file_upload', 'websocket_disconnect', 'user_stopped') Returns: Dict with job IDs for speaker_recognition, memory, title_summary, event_dispatch @@ -435,27 +483,35 @@ def start_post_conversation_jobs( else: logger.info(f"โญ๏ธ Speaker recognition disabled, skipping speaker job for conversation {conversation_id[:8]}") - # Step 2: Memory extraction job - # Depends on speaker job if it was created, otherwise depends on upstream (transcription or nothing) - memory_job_id = f"memory_{conversation_id[:12]}" - logger.info(f"๐Ÿ” DEBUG: Creating memory job with job_id={memory_job_id}, conversation_id={conversation_id[:12]}") + # Step 2: Memory extraction job (conditional - only if enabled) + # Check if memory extraction is enabled + memory_config = get_service_config('memory.extraction') + memory_enabled = memory_config.get('enabled', True) # Default to True for backward compatibility - memory_job = memory_queue.enqueue( - process_memory_job, - conversation_id, - job_timeout=900, # 15 minutes - result_ttl=JOB_RESULT_TTL, - depends_on=speaker_dependency, # Either speaker_job or upstream dependency - job_id=memory_job_id, - description=f"Memory extraction for conversation {conversation_id[:8]}", - meta=job_meta - ) - if speaker_job: - logger.info(f"๐Ÿ“ฅ RQ: Enqueued memory extraction job {memory_job.id}, meta={memory_job.meta} (depends on speaker job {speaker_job.id})") - elif depends_on_job: - logger.info(f"๐Ÿ“ฅ RQ: Enqueued memory extraction job {memory_job.id}, meta={memory_job.meta} (depends on {depends_on_job.id})") + memory_job = None + if memory_enabled: + # Depends on speaker job if it was created, otherwise depends on upstream (transcription or nothing) + memory_job_id = f"memory_{conversation_id[:12]}" + logger.info(f"๐Ÿ” DEBUG: Creating memory job with job_id={memory_job_id}, conversation_id={conversation_id[:12]}") + + memory_job = memory_queue.enqueue( + process_memory_job, + conversation_id, + job_timeout=900, # 15 minutes + result_ttl=JOB_RESULT_TTL, + depends_on=speaker_dependency, # Either speaker_job or upstream dependency + job_id=memory_job_id, + description=f"Memory extraction for conversation {conversation_id[:8]}", + meta=job_meta + ) + if speaker_job: + logger.info(f"๐Ÿ“ฅ RQ: Enqueued memory extraction job {memory_job.id}, meta={memory_job.meta} (depends on speaker job {speaker_job.id})") + elif depends_on_job: + logger.info(f"๐Ÿ“ฅ RQ: Enqueued memory extraction job {memory_job.id}, meta={memory_job.meta} (depends on {depends_on_job.id})") + else: + logger.info(f"๐Ÿ“ฅ RQ: Enqueued memory extraction job {memory_job.id}, meta={memory_job.meta} (no dependencies, starts immediately)") else: - logger.info(f"๐Ÿ“ฅ RQ: Enqueued memory extraction job {memory_job.id}, meta={memory_job.meta} (no dependencies, starts immediately)") + logger.info(f"โญ๏ธ Memory extraction disabled, skipping memory job for conversation {conversation_id[:8]}") # Step 3: Title/summary generation job # Depends on speaker job if enabled, otherwise on upstream dependency @@ -484,25 +540,39 @@ def start_post_conversation_jobs( event_job_id = f"event_complete_{conversation_id[:12]}" logger.info(f"๐Ÿ” DEBUG: Creating conversation complete event job with job_id={event_job_id}, conversation_id={conversation_id[:12]}") - # Event job depends on both memory and title/summary jobs completing - # Use RQ's depends_on list to wait for both + # Event job depends on memory and title/summary jobs that were actually enqueued + # Build dependency list excluding None values + event_dependencies = [] + if memory_job: + event_dependencies.append(memory_job) + if title_summary_job: + event_dependencies.append(title_summary_job) + + # Enqueue event dispatch job (may have no dependencies if all jobs were skipped) event_dispatch_job = default_queue.enqueue( dispatch_conversation_complete_event_job, conversation_id, client_id or "", user_id, + end_reason, # Use the end_reason parameter (defaults to 'file_upload' for backward compatibility) job_timeout=120, # 2 minutes result_ttl=JOB_RESULT_TTL, - depends_on=[memory_job, title_summary_job], # Wait for both parallel jobs + depends_on=event_dependencies if event_dependencies else None, # Wait for jobs that were enqueued job_id=event_job_id, - description=f"Dispatch conversation complete event for {conversation_id[:8]}", + description=f"Dispatch conversation complete event ({end_reason}) for {conversation_id[:8]}", meta=job_meta ) - logger.info(f"๐Ÿ“ฅ RQ: Enqueued conversation complete event job {event_dispatch_job.id}, meta={event_dispatch_job.meta} (depends on {memory_job.id} and {title_summary_job.id})") + + # Log event dispatch dependencies + if event_dependencies: + dep_ids = [job.id for job in event_dependencies] + logger.info(f"๐Ÿ“ฅ RQ: Enqueued conversation complete event job {event_dispatch_job.id}, meta={event_dispatch_job.meta} (depends on {', '.join(dep_ids)})") + else: + logger.info(f"๐Ÿ“ฅ RQ: Enqueued conversation complete event job {event_dispatch_job.id}, meta={event_dispatch_job.meta} (no dependencies, starts immediately)") return { - 'speaker_recognition': speaker_job.id, - 'memory': memory_job.id, + 'speaker_recognition': speaker_job.id if speaker_job else None, + 'memory': memory_job.id if memory_job else None, 'title_summary': title_summary_job.id, 'event_dispatch': event_dispatch_job.id } diff --git a/backends/advanced/src/advanced_omi_backend/controllers/session_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/session_controller.py index d1a22695..fe9b87cd 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/session_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/session_controller.py @@ -56,12 +56,13 @@ async def mark_session_complete( await mark_session_complete(redis, session_id, "all_jobs_complete") """ session_key = f"audio:session:{session_id}" + mark_time = time.time() await redis_client.hset(session_key, mapping={ - "status": "complete", - "completed_at": str(time.time()), + "status": "finished", + "completed_at": str(mark_time), "completion_reason": reason }) - logger.info(f"โœ… Session {session_id[:12]} marked complete: {reason}") + logger.info(f"โœ… Session {session_id[:12]} marked finished: {reason} [TIME: {mark_time:.3f}]") async def get_session_info(redis_client, session_id: str) -> Optional[Dict]: @@ -199,7 +200,7 @@ async def get_streaming_status(request): transcription_queue, memory_queue, default_queue, - all_jobs_complete_for_session + all_jobs_complete_for_client ) try: @@ -229,17 +230,18 @@ async def get_streaming_status(request): # Separate active and completed sessions # Check if all jobs are complete (including failed jobs) - all_jobs_done = all_jobs_complete_for_session(session_id) - - # Session is completed if: - # 1. Redis status says complete/finalized AND all jobs done, OR - # 2. All jobs are done (even if status isn't complete yet) - # This ensures sessions with failed jobs move to completed - if status in ["complete", "completed", "finalized"] or all_jobs_done: + # Note: session_id == client_id in streaming context, but using client_id explicitly + all_jobs_done = all_jobs_complete_for_client(session_obj.get("client_id")) + + # Session is finished if: + # 1. Redis status says finished AND all jobs done, OR + # 2. All jobs are done (even if status isn't finished yet) + # This ensures sessions with failed jobs move to finished + if status == "finished" or all_jobs_done: if all_jobs_done: - # All jobs complete - this is truly a completed session - # Update Redis status if it wasn't already marked complete - if status not in ["complete", "completed", "finalized"]: + # All jobs finished - this is truly a finished session + # Update Redis status if it wasn't already marked finished + if status != "finished": await mark_session_complete(redis_client, session_id, "all_jobs_complete") # Get additional session data for completed sessions @@ -251,7 +253,7 @@ async def get_streaming_status(request): "client_id": session_obj.get("client_id", ""), "conversation_id": session_data.get(b"conversation_id", b"").decode() if session_data and b"conversation_id" in session_data else None, "has_conversation": bool(session_data and session_data.get(b"conversation_id", b"")), - "action": session_data.get(b"action", b"complete").decode() if session_data and b"action" in session_data else "complete", + "action": session_data.get(b"action", b"finished").decode() if session_data and b"action" in session_data else "finished", "reason": session_data.get(b"reason", b"").decode() if session_data and b"reason" in session_data else "", "completed_at": session_obj.get("last_chunk_at", 0), "audio_file": session_data.get(b"audio_file", b"").decode() if session_data and b"audio_file" in session_data else "", @@ -450,26 +452,26 @@ async def get_streaming_status(request): rq_stats = { "transcription_queue": { "queued": transcription_queue.count, - "processing": len(transcription_queue.started_job_registry), - "completed": len(transcription_queue.finished_job_registry), + "started": len(transcription_queue.started_job_registry), + "finished": len(transcription_queue.finished_job_registry), "failed": len(transcription_queue.failed_job_registry), - "cancelled": len(transcription_queue.canceled_job_registry), + "canceled": len(transcription_queue.canceled_job_registry), "deferred": len(transcription_queue.deferred_job_registry) }, "memory_queue": { "queued": memory_queue.count, - "processing": len(memory_queue.started_job_registry), - "completed": len(memory_queue.finished_job_registry), + "started": len(memory_queue.started_job_registry), + "finished": len(memory_queue.finished_job_registry), "failed": len(memory_queue.failed_job_registry), - "cancelled": len(memory_queue.canceled_job_registry), + "canceled": len(memory_queue.canceled_job_registry), "deferred": len(memory_queue.deferred_job_registry) }, "default_queue": { "queued": default_queue.count, - "processing": len(default_queue.started_job_registry), - "completed": len(default_queue.finished_job_registry), + "started": len(default_queue.started_job_registry), + "finished": len(default_queue.finished_job_registry), "failed": len(default_queue.failed_job_registry), - "cancelled": len(default_queue.canceled_job_registry), + "canceled": len(default_queue.canceled_job_registry), "deferred": len(default_queue.deferred_job_registry) } } diff --git a/backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py index 79bb56fc..8cd3319b 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py @@ -202,35 +202,22 @@ async def create_client_state(client_id: str, user, device_name: Optional[str] = async def cleanup_client_state(client_id: str): - """Clean up and remove client state, including cancelling speech detection job and marking session complete.""" - # Cancel the speech detection job for this client - from advanced_omi_backend.controllers.queue_controller import redis_conn - from rq.job import Job + """ + Clean up and remove client state, marking session complete. + + Note: We do NOT cancel the speech detection job here because: + 1. The job needs to process all audio data that was already sent + 2. If speech was detected, it should create a conversation + 3. The job will complete naturally when it sees session status = "finalizing" + 4. The job has a grace period (15s) to wait for final transcription + 5. RQ's job_timeout (24h) prevents jobs from hanging forever + """ + # Note: Previously we cancelled the speech detection job here, but this prevented + # conversations from being created when WebSocket disconnects mid-recording. + # The speech detection job now monitors session status and completes naturally. import redis.asyncio as redis - try: - job_id_key = f"speech_detection_job:{client_id}" - job_id_bytes = redis_conn.get(job_id_key) - - if job_id_bytes: - job_id = job_id_bytes.decode() - logger.info(f"๐Ÿ›‘ Cancelling speech detection job {job_id} for client {client_id}") - - try: - # Fetch and cancel the job - job = Job.fetch(job_id, connection=redis_conn) - job.cancel() - logger.info(f"โœ… Successfully cancelled speech detection job {job_id}") - except Exception as job_error: - logger.warning(f"โš ๏ธ Failed to cancel job {job_id}: {job_error}") - - # Clean up the tracking key - redis_conn.delete(job_id_key) - logger.info(f"๐Ÿงน Cleaned up job tracking key for client {client_id}") - else: - logger.debug(f"No speech detection job found for client {client_id}") - except Exception as e: - logger.warning(f"โš ๏ธ Error during job cancellation for client {client_id}: {e}") + logger.info(f"๐Ÿ”„ Letting speech detection job complete naturally for client {client_id} (if running)") # Mark all active sessions for this client as complete AND delete Redis streams try: @@ -238,6 +225,10 @@ async def cleanup_client_state(client_id: str): redis_url = os.getenv("REDIS_URL", "redis://localhost:6379/0") async_redis = redis.from_url(redis_url, decode_responses=False) + # Get audio stream producer for finalization + from advanced_omi_backend.services.audio_stream.producer import get_audio_stream_producer + audio_stream_producer = get_audio_stream_producer() + # Find all session keys for this client and mark them complete pattern = f"audio:session:*" cursor = 0 @@ -250,8 +241,18 @@ async def cleanup_client_state(client_id: str): # Check if this session belongs to this client client_id_bytes = await async_redis.hget(key, "client_id") if client_id_bytes and client_id_bytes.decode() == client_id: - # Mark session as complete (WebSocket disconnected) session_id = key.decode().replace("audio:session:", "") + + # Check session status + status_bytes = await async_redis.hget(key, "status") + status = status_bytes.decode() if status_bytes else None + + # If session is still active, finalize it first (sets status + completion_reason atomically) + if status in ["active", None]: + logger.info(f"๐Ÿ“Š Finalizing active session {session_id[:12]} due to WebSocket disconnect") + await audio_stream_producer.finalize_session(session_id, completion_reason="websocket_disconnect") + + # Mark session as complete (WebSocket disconnected) await mark_session_complete(async_redis, session_id, "websocket_disconnect") sessions_closed += 1 @@ -485,8 +486,8 @@ async def _finalize_streaming_session( # Send end-of-session signal to workers await audio_stream_producer.send_session_end_signal(session_id) - # Mark session as finalizing - await audio_stream_producer.finalize_session(session_id) + # Mark session as finalizing with user_stopped reason (audio-stop event) + await audio_stream_producer.finalize_session(session_id, completion_reason="user_stopped") # NOTE: Finalize job disabled - open_conversation_job now handles everything # The open_conversation_job will: diff --git a/backends/advanced/src/advanced_omi_backend/database.py b/backends/advanced/src/advanced_omi_backend/database.py index ae7650b0..1b214b6d 100644 --- a/backends/advanced/src/advanced_omi_backend/database.py +++ b/backends/advanced/src/advanced_omi_backend/database.py @@ -14,7 +14,7 @@ # MongoDB Configuration MONGODB_URI = os.getenv("MONGODB_URI", "mongodb://mongo:27017") -MONGODB_DATABASE = os.getenv("MONGODB_DATABASE", "friend-lite") +MONGODB_DATABASE = os.getenv("MONGODB_DATABASE", "chronicle") mongo_client = AsyncIOMotorClient( MONGODB_URI, diff --git a/backends/advanced/src/advanced_omi_backend/middleware/app_middleware.py b/backends/advanced/src/advanced_omi_backend/middleware/app_middleware.py index 4cff21eb..069d5239 100644 --- a/backends/advanced/src/advanced_omi_backend/middleware/app_middleware.py +++ b/backends/advanced/src/advanced_omi_backend/middleware/app_middleware.py @@ -60,6 +60,7 @@ class RequestLoggingMiddleware(BaseHTTPMiddleware): "/health", "/auth/health", "/readiness", + "/api/queue/dashboard", # Auto-refresh endpoint, too noisy } # Binary content types to exclude diff --git a/backends/advanced/src/advanced_omi_backend/models/conversation.py b/backends/advanced/src/advanced_omi_backend/models/conversation.py index 28a2f0ec..1ee9a57a 100644 --- a/backends/advanced/src/advanced_omi_backend/models/conversation.py +++ b/backends/advanced/src/advanced_omi_backend/models/conversation.py @@ -63,7 +63,14 @@ class TranscriptVersion(BaseModel): """Version of a transcript with processing metadata.""" version_id: str = Field(description="Unique version identifier") transcript: Optional[str] = Field(None, description="Full transcript text") - segments: List["Conversation.SpeakerSegment"] = Field(default_factory=list, description="Speaker segments") + words: List["Conversation.Word"] = Field( + default_factory=list, + description="Word-level timestamps for entire transcript" + ) + segments: List["Conversation.SpeakerSegment"] = Field( + default_factory=list, + description="Speaker segments (filled by speaker recognition)" + ) provider: Optional[str] = Field(None, description="Transcription provider used (deepgram, parakeet, etc.)") model: Optional[str] = Field(None, description="Model used (e.g., nova-3, parakeet)") created_at: datetime = Field(description="When this version was created") @@ -253,8 +260,9 @@ def add_transcript_version( self, version_id: str, transcript: str, - segments: List["Conversation.SpeakerSegment"], - provider: str, # Provider name from config.yml (deepgram, parakeet, etc.) + words: Optional[List["Conversation.Word"]] = None, + segments: Optional[List["Conversation.SpeakerSegment"]] = None, + provider: str = None, # Provider name from config.yml (deepgram, parakeet, etc.) model: Optional[str] = None, processing_time_seconds: Optional[float] = None, metadata: Optional[Dict[str, Any]] = None, @@ -264,7 +272,8 @@ def add_transcript_version( new_version = Conversation.TranscriptVersion( version_id=version_id, transcript=transcript, - segments=segments, + words=words or [], + segments=segments or [], provider=provider, model=model, created_at=datetime.now(), diff --git a/backends/advanced/src/advanced_omi_backend/models/job.py b/backends/advanced/src/advanced_omi_backend/models/job.py index f2d85add..5d906865 100644 --- a/backends/advanced/src/advanced_omi_backend/models/job.py +++ b/backends/advanced/src/advanced_omi_backend/models/job.py @@ -44,7 +44,7 @@ async def _ensure_beanie_initialized(): mongodb_uri = os.getenv("MONGODB_URI", "mongodb://localhost:27017") # Create MongoDB client - mongodb_database = os.getenv("MONGODB_DATABASE", "friend-lite") + mongodb_database = os.getenv("MONGODB_DATABASE", "chronicle") client = AsyncIOMotorClient(mongodb_uri) try: database = client.get_default_database(mongodb_database) diff --git a/backends/advanced/src/advanced_omi_backend/plugins/router.py b/backends/advanced/src/advanced_omi_backend/plugins/router.py index f046520c..523fe3ed 100644 --- a/backends/advanced/src/advanced_omi_backend/plugins/router.py +++ b/backends/advanced/src/advanced_omi_backend/plugins/router.py @@ -122,23 +122,37 @@ async def dispatch_event( Returns: List of plugin results """ + # Add at start + logger.info(f"๐Ÿ”Œ ROUTER: Dispatching '{event}' event (user={user_id})") + results = [] # Get plugins subscribed to this event plugin_ids = self._plugins_by_event.get(event, []) + # Add subscription check + if not plugin_ids: + logger.warning(f"๐Ÿ”Œ ROUTER: No plugins subscribed to event '{event}'") + return results + + logger.info(f"๐Ÿ”Œ ROUTER: Found {len(plugin_ids)} subscribed plugin(s): {plugin_ids}") + for plugin_id in plugin_ids: plugin = self.plugins[plugin_id] if not plugin.enabled: + logger.info(f" โŠ˜ Skipping '{plugin_id}': disabled") continue # Check execution condition (wake_word, etc.) + logger.info(f" โ†’ Checking execution condition for '{plugin_id}'") if not await self._should_execute(plugin, data): + logger.info(f" โŠ˜ Skipping '{plugin_id}': condition not met") continue # Execute plugin try: + logger.info(f" โ–ถ Executing '{plugin_id}' for event '{event}'") context = PluginContext( user_id=user_id, event=event, @@ -149,15 +163,30 @@ async def dispatch_event( result = await self._execute_plugin(plugin, event, context) if result: + status_icon = "โœ“" if result.success else "โœ—" + logger.info( + f" {status_icon} Plugin '{plugin_id}' completed: " + f"success={result.success}, message={result.message}" + ) results.append(result) # If plugin says stop processing, break if not result.should_continue: - logger.info(f"Plugin '{plugin_id}' stopped further processing") + logger.info(f" โŠ— Plugin '{plugin_id}' stopped further processing") break except Exception as e: - logger.error(f"Error executing plugin '{plugin_id}': {e}", exc_info=True) + # CRITICAL: Log exception details + logger.error( + f" โœ— Plugin '{plugin_id}' FAILED with exception: {e}", + exc_info=True + ) + + # Add at end + logger.info( + f"๐Ÿ”Œ ROUTER: Dispatch complete for '{event}': " + f"{len(results)} plugin(s) executed successfully" + ) return results diff --git a/backends/advanced/src/advanced_omi_backend/plugins/test_event/event_storage.py b/backends/advanced/src/advanced_omi_backend/plugins/test_event/event_storage.py index 00bc674d..4fb618f9 100644 --- a/backends/advanced/src/advanced_omi_backend/plugins/test_event/event_storage.py +++ b/backends/advanced/src/advanced_omi_backend/plugins/test_event/event_storage.py @@ -118,29 +118,55 @@ async def log_event( Returns: Row ID of inserted event """ + # Add at start + logger.debug(f"๐Ÿ’พ STORAGE: Logging event '{event}' for user {user_id}") + if not self.db: + logger.error("๐Ÿ’พ STORAGE: Database connection not initialized!") raise RuntimeError("Event storage not initialized") timestamp = datetime.utcnow().isoformat() - data_json = json.dumps(data) - metadata_json = json.dumps(metadata) if metadata else None - cursor = await self.db.execute( - """ - INSERT INTO plugin_events (timestamp, event, user_id, data, metadata) - VALUES (?, ?, ?, ?, ?) - """, - (timestamp, event, user_id, data_json, metadata_json) - ) + # Add before serialization + logger.debug(f"๐Ÿ’พ STORAGE: Serializing event data...") + try: + data_json = json.dumps(data) + metadata_json = json.dumps(metadata) if metadata else None + except Exception as e: + logger.error( + f"๐Ÿ’พ STORAGE: JSON serialization failed for event '{event}': {e}", + exc_info=True + ) + raise - await self.db.commit() - row_id = cursor.lastrowid + # Add before database operation + logger.debug(f"๐Ÿ’พ STORAGE: Inserting into plugin_events table...") - logger.debug( - f"Logged event: {event} for user {user_id} (row_id={row_id})" - ) + try: + cursor = await self.db.execute( + """ + INSERT INTO plugin_events (timestamp, event, user_id, data, metadata) + VALUES (?, ?, ?, ?, ?) + """, + (timestamp, event, user_id, data_json, metadata_json) + ) + + await self.db.commit() + row_id = cursor.lastrowid + + # Add success log + logger.info( + f"๐Ÿ’พ STORAGE: Event '{event}' inserted successfully (row_id={row_id})" + ) + + return row_id - return row_id + except Exception as e: + logger.error( + f"๐Ÿ’พ STORAGE: Database operation failed for event '{event}': {e}", + exc_info=True + ) + raise async def get_events_by_type(self, event: str) -> List[Dict[str, Any]]: """ diff --git a/backends/advanced/src/advanced_omi_backend/plugins/test_event/plugin.py b/backends/advanced/src/advanced_omi_backend/plugins/test_event/plugin.py index 6b96e078..59dd652e 100644 --- a/backends/advanced/src/advanced_omi_backend/plugins/test_event/plugin.py +++ b/backends/advanced/src/advanced_omi_backend/plugins/test_event/plugin.py @@ -115,11 +115,21 @@ async def on_conversation_complete(self, context: PluginContext) -> Optional[Plu Returns: PluginResult indicating success """ + conversation_id = context.data.get('conversation_id', 'unknown') + duration = context.data.get('duration', 0) + + # Add at start + logger.info( + f"๐Ÿ“ HANDLER: on_conversation_complete called for {conversation_id[:12]}" + ) + logger.debug(f" Event: {context.event}") + logger.debug(f" Metadata: {context.metadata}") + logger.debug(f" Duration: {duration}s") + try: - conversation_id = context.data.get('conversation_id', 'unknown') - duration = context.data.get('duration', 0) + # Add before storage + logger.info(f" ๐Ÿ’พ Storing event to SQLite database...") - # Log to storage row_id = await self.storage.log_event( event=context.event, # 'conversation.complete' user_id=context.user_id, @@ -127,27 +137,28 @@ async def on_conversation_complete(self, context: PluginContext) -> Optional[Plu metadata=context.metadata ) - self.event_count += 1 + # Add after storage + logger.info(f" โœ“ Event stored successfully (row_id={row_id})") - logger.info( - f"๐Ÿ“ Logged conversation.complete event (row_id={row_id}): " - f"user={context.user_id}, " - f"conversation={conversation_id}, " - f"duration={duration:.2f}s" - ) + self.event_count += 1 return PluginResult( success=True, message=f"Conversation event logged (row_id={row_id})", - should_continue=True + data={"row_id": row_id}, + should_continue=True, ) except Exception as e: - logger.error(f"Error logging conversation event: {e}", exc_info=True) + # Enhance error logging + logger.error( + f" โœ— Storage FAILED for {conversation_id[:12]}: {e}", + exc_info=True + ) return PluginResult( success=False, message=f"Failed to log conversation event: {e}", - should_continue=True + should_continue=True, ) async def on_memory_processed(self, context: PluginContext) -> Optional[PluginResult]: diff --git a/backends/advanced/src/advanced_omi_backend/routers/modules/annotation_routes.py b/backends/advanced/src/advanced_omi_backend/routers/modules/annotation_routes.py index dc2bbd3f..e8d36d1e 100644 --- a/backends/advanced/src/advanced_omi_backend/routers/modules/annotation_routes.py +++ b/backends/advanced/src/advanced_omi_backend/routers/modules/annotation_routes.py @@ -124,7 +124,7 @@ async def create_transcript_annotation( raise HTTPException(status_code=404, detail="Conversation not found") # Validate segment index - active_transcript = conversation.get_active_transcript() + active_transcript = conversation.active_transcript if ( not active_transcript or annotation_data.segment_index >= len(active_transcript.segments) @@ -288,7 +288,7 @@ async def update_annotation_status( Conversation.user_id == annotation.user_id ) if conversation: - transcript = conversation.get_active_transcript() + transcript = conversation.active_transcript if ( transcript and annotation.segment_index < len(transcript.segments) diff --git a/backends/advanced/src/advanced_omi_backend/routers/modules/audio_routes.py b/backends/advanced/src/advanced_omi_backend/routers/modules/audio_routes.py index a9f09ac2..7cef955a 100644 --- a/backends/advanced/src/advanced_omi_backend/routers/modules/audio_routes.py +++ b/backends/advanced/src/advanced_omi_backend/routers/modules/audio_routes.py @@ -307,6 +307,10 @@ async def get_audio_chunk_range( 401: If not authenticated 400: If time range is invalid """ + import logging + logger = logging.getLogger(__name__) + logger.info(f"๐ŸŽต Audio chunk request: conversation={conversation_id[:8]}..., start={start_time:.2f}s, end={end_time:.2f}s") + # Try token param if header auth failed if not current_user and token: current_user = await get_user_from_token_param(token) @@ -333,52 +337,17 @@ async def get_audio_chunk_range( if conversation.audio_total_duration and end_time > conversation.audio_total_duration: end_time = conversation.audio_total_duration - # Calculate which chunks are needed (each chunk is 10 seconds) - CHUNK_DURATION = 10.0 - start_chunk = int(start_time / CHUNK_DURATION) - end_chunk = int(end_time / CHUNK_DURATION) - num_chunks = end_chunk - start_chunk + 1 - - # Retrieve only needed chunks - chunks = await retrieve_audio_chunks( - conversation_id=conversation_id, - start_index=start_chunk, - limit=num_chunks - ) - - if not chunks: - raise HTTPException( - status_code=404, - detail=f"No audio data in requested range ({start_time}s-{end_time}s)" - ) - - # Decode chunks and concatenate - pcm_buffer = await concatenate_chunks_to_pcm(chunks) - - # Trim to exact time range within the chunks - SAMPLE_RATE = 16000 - SAMPLE_WIDTH = 2 # 16-bit - CHANNELS = 1 - bytes_per_second = SAMPLE_RATE * SAMPLE_WIDTH * CHANNELS - - # Calculate byte offsets within concatenated buffer - start_offset_in_chunk = start_time - (start_chunk * CHUNK_DURATION) - end_offset_in_chunk = end_time - (end_chunk * CHUNK_DURATION) + # Use the dedicated segment reconstruction function + from advanced_omi_backend.utils.audio_chunk_utils import reconstruct_audio_segment - start_byte = int(start_offset_in_chunk * bytes_per_second) - # Calculate end byte from the end of buffer - bytes_from_end = int((CHUNK_DURATION - end_offset_in_chunk) * bytes_per_second) - end_byte = len(pcm_buffer) - bytes_from_end - - # Trim PCM data - trimmed_pcm = pcm_buffer[start_byte:end_byte] - - # Build WAV file with trimmed audio - wav_data = await build_wav_from_pcm( - pcm_data=trimmed_pcm, - sample_rate=SAMPLE_RATE, - channels=CHANNELS - ) + try: + wav_data = await reconstruct_audio_segment(conversation_id, start_time, end_time) + logger.info(f"โœ… Returning WAV: {len(wav_data)} bytes for range {start_time:.2f}s - {end_time:.2f}s") + except ValueError as e: + raise HTTPException(status_code=404, detail=str(e)) + except Exception as e: + logger.error(f"Failed to reconstruct audio segment: {e}") + raise HTTPException(status_code=500, detail=f"Failed to reconstruct audio: {str(e)}") return StreamingResponse( io.BytesIO(wav_data), diff --git a/backends/advanced/src/advanced_omi_backend/routers/modules/conversation_routes.py b/backends/advanced/src/advanced_omi_backend/routers/modules/conversation_routes.py index be2b3bc4..25f494ce 100644 --- a/backends/advanced/src/advanced_omi_backend/routers/modules/conversation_routes.py +++ b/backends/advanced/src/advanced_omi_backend/routers/modules/conversation_routes.py @@ -7,12 +7,13 @@ import logging from typing import Optional -from fastapi import APIRouter, Depends, Query, HTTPException, Response +from fastapi import APIRouter, Depends, HTTPException, Query, Response from advanced_omi_backend.auth import current_active_user -from advanced_omi_backend.controllers import conversation_controller, audio_controller -from advanced_omi_backend.users import User +from advanced_omi_backend.controllers import conversation_controller from advanced_omi_backend.models.conversation import Conversation +from advanced_omi_backend.users import User +from advanced_omi_backend.utils.audio_chunk_utils import reconstruct_audio_segment logger = logging.getLogger(__name__) @@ -65,6 +66,32 @@ async def reprocess_memory( return await conversation_controller.reprocess_memory(conversation_id, transcript_version_id, current_user) +@router.post("/{conversation_id}/reprocess-speakers") +async def reprocess_speakers( + conversation_id: str, + current_user: User = Depends(current_active_user), + transcript_version_id: str = Query(default="active") +): + """ + Re-run speaker identification/diarization on existing transcript. + + Creates a NEW transcript version with same text/words but re-identified speakers. + Automatically chains memory reprocessing since speaker changes affect memory context. + + Args: + conversation_id: Conversation to reprocess + transcript_version_id: Which transcript version to use as source (default: "active") + + Returns: + Job status with job_id and new version_id + """ + return await conversation_controller.reprocess_speakers( + conversation_id, + transcript_version_id, + current_user + ) + + @router.post("/{conversation_id}/activate-transcript/{version_id}") async def activate_transcript_version( conversation_id: str, @@ -116,6 +143,7 @@ async def get_conversation_waveform( - duration_seconds: float - Total audio duration """ from fastapi import HTTPException + from advanced_omi_backend.models.conversation import Conversation from advanced_omi_backend.models.waveform import WaveformData from advanced_omi_backend.workers.waveform_jobs import generate_waveform_data @@ -227,7 +255,6 @@ async def get_audio_segment( Returns: WAV audio bytes (16kHz, mono) for the requested time range """ - from advanced_omi_backend.utils.audio_chunk_utils import reconstruct_audio_segment # Verify conversation exists and user has access conversation = await Conversation.find_one( diff --git a/backends/advanced/src/advanced_omi_backend/routers/modules/memory_routes.py b/backends/advanced/src/advanced_omi_backend/routers/modules/memory_routes.py index d0be9528..185f55ec 100644 --- a/backends/advanced/src/advanced_omi_backend/routers/modules/memory_routes.py +++ b/backends/advanced/src/advanced_omi_backend/routers/modules/memory_routes.py @@ -72,16 +72,6 @@ async def delete_memory(memory_id: str, current_user: User = Depends(current_act return await memory_controller.delete_memory(memory_id, current_user) -@router.get("/unfiltered") -async def get_memories_unfiltered( - current_user: User = Depends(current_active_user), - limit: int = Query(default=50, ge=1, le=1000), - user_id: Optional[str] = Query(default=None, description="User ID filter (admin only)"), -): - """Get all memories including fallback transcript memories (for debugging). Users see only their own memories, admins can see all or filter by user.""" - return await memory_controller.get_memories_unfiltered(current_user, limit, user_id) - - @router.get("/admin") async def get_all_memories_admin(current_user: User = Depends(current_superuser), limit: int = 200): """Get all memories across all users for admin review. Admin only.""" diff --git a/backends/advanced/src/advanced_omi_backend/routers/modules/obsidian_routes.py b/backends/advanced/src/advanced_omi_backend/routers/modules/obsidian_routes.py index e45c51de..f6a46a38 100644 --- a/backends/advanced/src/advanced_omi_backend/routers/modules/obsidian_routes.py +++ b/backends/advanced/src/advanced_omi_backend/routers/modules/obsidian_routes.py @@ -176,14 +176,12 @@ async def get_status(job_id: str, current_user: User = Depends(current_active_us status = job.get_status() if status == "started": status = "running" - if status == "canceled": - status = "cancelled" - + # Get metadata meta = job.meta or {} - + # If meta has status, prefer it (for granular updates) - if "status" in meta and meta["status"] in ("running", "completed", "failed", "cancelled"): + if "status" in meta and meta["status"] in ("running", "finished", "failed", "canceled"): status = meta["status"] total = meta.get("total_files", 0) diff --git a/backends/advanced/src/advanced_omi_backend/routers/modules/queue_routes.py b/backends/advanced/src/advanced_omi_backend/routers/modules/queue_routes.py index 8dd9e5f6..14c7ee0e 100644 --- a/backends/advanced/src/advanced_omi_backend/routers/modules/queue_routes.py +++ b/backends/advanced/src/advanced_omi_backend/routers/modules/queue_routes.py @@ -9,7 +9,7 @@ from typing import List, Optional from advanced_omi_backend.auth import current_active_user -from advanced_omi_backend.controllers.queue_controller import get_jobs, get_job_stats, redis_conn, QUEUE_NAMES +from advanced_omi_backend.controllers.queue_controller import get_jobs, get_job_stats, redis_conn, QUEUE_NAMES, get_job_status_from_rq from advanced_omi_backend.users import User from rq.job import Job import redis.asyncio as aioredis @@ -65,18 +65,12 @@ async def get_job_status( if job_user_id != str(current_user.user_id): raise HTTPException(status_code=403, detail="Access forbidden") - # Determine status from registries - status = "unknown" - if job.is_queued: - status = "queued" - elif job.is_started: - status = "processing" - elif job.is_finished: - status = "completed" - elif job.is_failed: - status = "failed" - elif job.is_deferred: - status = "deferred" + # Get status using RQ's native method + try: + status = get_job_status_from_rq(job) + except RuntimeError as e: + logger.error(f"Failed to determine status for job {job_id}: {e}") + raise HTTPException(status_code=500, detail=str(e)) return { "job_id": job.id, @@ -106,18 +100,12 @@ async def get_job( if job_user_id != str(current_user.user_id): raise HTTPException(status_code=403, detail="Access forbidden") - # Determine status from registries - status = "unknown" - if job.is_queued: - status = "queued" - elif job.is_started: - status = "processing" - elif job.is_finished: - status = "completed" - elif job.is_failed: - status = "failed" - elif job.is_deferred: - status = "deferred" + # Get status using RQ's native method + try: + status = get_job_status_from_rq(job) + except RuntimeError as e: + logger.error(f"Failed to determine status for job {job_id}: {e}") + raise HTTPException(status_code=500, detail=str(e)) return { "job_id": job.id, @@ -157,18 +145,18 @@ async def cancel_job( if job_user_id != str(current_user.user_id): raise HTTPException(status_code=403, detail="Access forbidden") - # Cancel if queued or processing, delete if completed/failed + # Cancel if queued or started, delete if finished/failed if job.is_queued or job.is_started or job.is_deferred or job.is_scheduled: # Cancel the job job.cancel() logger.info(f"Cancelled job {job_id}") return { "job_id": job_id, - "action": "cancelled", - "message": f"Job {job_id} has been cancelled" + "action": "canceled", + "message": f"Job {job_id} has been canceled" } else: - # Delete completed/failed jobs + # Delete finished/failed jobs job.delete() logger.info(f"Deleted job {job_id}") return { @@ -182,7 +170,7 @@ async def cancel_job( raise except Exception as e: logger.error(f"Failed to cancel/delete job {job_id}: {e}") - raise HTTPException(status_code=404, detail=f"Job not found or could not be cancelled: {str(e)}") + raise HTTPException(status_code=404, detail=f"Job not found or could not be canceled: {str(e)}") @router.get("/jobs/by-client/{client_id}") @@ -201,21 +189,14 @@ async def get_jobs_by_client( queues = QUEUE_NAMES def get_job_status(job, registries_map): - """Determine job status from registries.""" - if job.is_queued: - return "queued" - elif job.is_started: - return "processing" - elif job.is_finished: - return "completed" - elif job.is_failed: - return "failed" - elif job.is_deferred: - return "deferred" - elif job.is_scheduled: - return "waiting" - else: - return "unknown" + """Determine job status using RQ's native method.""" + try: + return get_job_status_from_rq(job) + except RuntimeError: + # In nested function, can't raise HTTP exception + # Log and re-raise to be handled by outer scope + logger.error(f"Job {job.id} status determination failed") + raise def process_job_and_dependents(job, queue_name, base_status): """Process a job and recursively find all its dependents.""" @@ -270,15 +251,15 @@ def process_job_and_dependents(job, queue_name, base_status): for queue_name in queues: queue = get_queue(queue_name) - # Check all registries + # Check all registries (using RQ standard status names) registries = [ ("queued", queue.job_ids), - ("processing", StartedJobRegistry(queue=queue).get_job_ids()), - ("completed", FinishedJobRegistry(queue=queue).get_job_ids()), + ("started", StartedJobRegistry(queue=queue).get_job_ids()), # RQ standard + ("finished", FinishedJobRegistry(queue=queue).get_job_ids()), # RQ standard ("failed", FailedJobRegistry(queue=queue).get_job_ids()), - ("cancelled", CanceledJobRegistry(queue=queue).get_job_ids()), - ("waiting", DeferredJobRegistry(queue=queue).get_job_ids()), - ("waiting", ScheduledJobRegistry(queue=queue).get_job_ids()) + ("canceled", CanceledJobRegistry(queue=queue).get_job_ids()), # RQ standard (US spelling) + ("deferred", DeferredJobRegistry(queue=queue).get_job_ids()), + ("scheduled", ScheduledJobRegistry(queue=queue).get_job_ids()) ] for status_name, job_ids in registries: @@ -329,7 +310,7 @@ async def get_queue_stats_endpoint( except Exception as e: logger.error(f"Failed to get queue stats: {e}") - return {"total_jobs": 0, "queued_jobs": 0, "processing_jobs": 0, "completed_jobs": 0, "failed_jobs": 0, "cancelled_jobs": 0, "deferred_jobs": 0} + return {"total_jobs": 0, "queued_jobs": 0, "started_jobs": 0, "finished_jobs": 0, "failed_jobs": 0, "canceled_jobs": 0, "deferred_jobs": 0} @router.get("/worker-details") @@ -480,13 +461,13 @@ async def get_stream_info(stream_key): class FlushJobsRequest(BaseModel): older_than_hours: int = 24 - statuses: List[str] = ["completed", "failed", "cancelled"] + statuses: List[str] = ["finished", "failed", "canceled"] # RQ standard status names class FlushAllJobsRequest(BaseModel): confirm: bool include_failed: bool = False # By default, preserve failed jobs for debugging - include_completed: bool = False # By default, preserve completed jobs for debugging + include_finished: bool = False # By default, preserve finished jobs for debugging @router.post("/flush") @@ -512,8 +493,8 @@ async def flush_jobs( for queue_name in queues: queue = get_queue(queue_name) - # Flush from appropriate registries based on requested statuses - if "completed" in request.statuses: + # Flush from appropriate registries based on requested statuses (RQ standard names) + if "finished" in request.statuses: # RQ standard, not "completed" registry = FinishedJobRegistry(queue=queue) for job_id in registry.get_job_ids(): try: @@ -535,7 +516,7 @@ async def flush_jobs( except Exception as e: logger.error(f"Error deleting job {job_id}: {e}") - if "cancelled" in request.statuses: + if "canceled" in request.statuses: # RQ standard (US spelling), not "cancelled" registry = CanceledJobRegistry(queue=queue) for job_id in registry.get_job_ids(): try: @@ -564,8 +545,8 @@ async def flush_all_jobs( ): """ Flush jobs from queues and registries. - By default preserves failed and completed jobs for debugging. - Set include_failed=true or include_completed=true to flush those as well. + By default preserves failed and finished jobs for debugging. + Set include_failed=true or include_finished=true to flush those as well. """ if not current_user.is_superuser: raise HTTPException(status_code=403, detail="Admin access required") @@ -607,7 +588,7 @@ async def flush_all_jobs( # Conditionally add failed and finished registries if request.include_failed: registries.append(("failed", FailedJobRegistry(queue=queue))) - if request.include_completed: + if request.include_finished: registries.append(("finished", FinishedJobRegistry(queue=queue))) for registry_name, registry in registries: @@ -691,8 +672,8 @@ async def flush_all_jobs( preserved = [] if not request.include_failed: preserved.append("failed jobs") - if not request.include_completed: - preserved.append("completed jobs") + if not request.include_finished: + preserved.append("finished jobs") preserved_msg = f" (preserved {', '.join(preserved)})" if preserved else "" logger.info(f"Flushed {total_removed} jobs and {deleted_keys} Redis keys from all queues{preserved_msg}") @@ -833,7 +814,7 @@ async def get_dashboard_data( """Get all data needed for the Queue dashboard in a single API call. Returns: - - Jobs grouped by status (queued, processing, completed, failed) + - Jobs grouped by status (queued, started, finished, failed) - Queue statistics - Streaming status - Client jobs for expanded clients @@ -858,12 +839,12 @@ async def fetch_jobs_by_status(status_name: str, limit: int = 100): for queue_name in queues: queue = get_queue(queue_name) - # Get job IDs based on status + # Get job IDs based on status (using RQ standard status names) if status_name == "queued": job_ids = queue.job_ids[:limit] - elif status_name == "processing": + elif status_name == "started": # RQ standard, not "processing" job_ids = list(StartedJobRegistry(queue=queue).get_job_ids())[:limit] - elif status_name == "completed": + elif status_name == "finished": # RQ standard, not "completed" job_ids = list(FinishedJobRegistry(queue=queue).get_job_ids())[:limit] elif status_name == "failed": job_ids = list(FailedJobRegistry(queue=queue).get_job_ids())[:limit] @@ -917,7 +898,7 @@ async def fetch_stats(): return get_job_stats() except Exception as e: logger.error(f"Error fetching stats: {e}") - return {"total_jobs": 0, "queued_jobs": 0, "processing_jobs": 0, "completed_jobs": 0, "failed_jobs": 0} + return {"total_jobs": 0, "queued_jobs": 0, "started_jobs": 0, "finished_jobs": 0, "failed_jobs": 0} async def fetch_streaming_status(): """Fetch streaming status.""" @@ -941,17 +922,12 @@ async def fetch_client_jobs(client_id: str): queues = QUEUE_NAMES def get_job_status(job): - if job.is_queued: - return "queued" - elif job.is_started: - return "processing" - elif job.is_finished: - return "completed" - elif job.is_failed: - return "failed" - elif job.is_deferred: - return "deferred" - else: + """Get job status using RQ's native method.""" + try: + return get_job_status_from_rq(job) + except RuntimeError: + logger.error(f"Job {job.id} status determination failed") + # Return unknown as fallback in dashboard context return "unknown" # Find all jobs for this session @@ -966,8 +942,8 @@ def get_job_status(job): registries = [ ("queued", queue.job_ids), - ("processing", StartedJobRegistry(queue=queue).get_job_ids()), - ("completed", FinishedJobRegistry(queue=queue).get_job_ids()), + ("started", StartedJobRegistry(queue=queue).get_job_ids()), # RQ standard + ("finished", FinishedJobRegistry(queue=queue).get_job_ids()), # RQ standard ("failed", FailedJobRegistry(queue=queue).get_job_ids()) ] @@ -1016,10 +992,10 @@ def get_job_status(job): logger.error(f"Error fetching jobs for client {client_id}: {e}") return {"client_id": client_id, "jobs": []} - # Execute all fetches in parallel + # Execute all fetches in parallel (using RQ standard status names) queued_jobs_task = fetch_jobs_by_status("queued", limit=100) - processing_jobs_task = fetch_jobs_by_status("processing", limit=100) - completed_jobs_task = fetch_jobs_by_status("completed", limit=50) + started_jobs_task = fetch_jobs_by_status("started", limit=100) # RQ standard, not "processing" + finished_jobs_task = fetch_jobs_by_status("finished", limit=50) # RQ standard, not "completed" failed_jobs_task = fetch_jobs_by_status("failed", limit=50) stats_task = fetch_stats() streaming_status_task = fetch_streaming_status() @@ -1027,8 +1003,8 @@ def get_job_status(job): results = await asyncio.gather( queued_jobs_task, - processing_jobs_task, - completed_jobs_task, + started_jobs_task, + finished_jobs_task, failed_jobs_task, stats_task, streaming_status_task, @@ -1037,8 +1013,8 @@ def get_job_status(job): ) queued_jobs = results[0] if not isinstance(results[0], Exception) else [] - processing_jobs = results[1] if not isinstance(results[1], Exception) else [] - completed_jobs = results[2] if not isinstance(results[2], Exception) else [] + started_jobs = results[1] if not isinstance(results[1], Exception) else [] # RQ standard + finished_jobs = results[2] if not isinstance(results[2], Exception) else [] # RQ standard failed_jobs = results[3] if not isinstance(results[3], Exception) else [] stats = results[4] if not isinstance(results[4], Exception) else {"total_jobs": 0} streaming_status = results[5] if not isinstance(results[5], Exception) else {"active_sessions": []} @@ -1066,8 +1042,8 @@ def get_job_status(job): return { "jobs": { "queued": queued_jobs, - "processing": processing_jobs, - "completed": completed_jobs, + "started": started_jobs, # RQ standard status name + "finished": finished_jobs, # RQ standard status name "failed": failed_jobs }, "stats": stats, diff --git a/backends/advanced/src/advanced_omi_backend/services/audio_stream/producer.py b/backends/advanced/src/advanced_omi_backend/services/audio_stream/producer.py index 6ec4fad4..224d69f4 100644 --- a/backends/advanced/src/advanced_omi_backend/services/audio_stream/producer.py +++ b/backends/advanced/src/advanced_omi_backend/services/audio_stream/producer.py @@ -216,19 +216,29 @@ async def mark_websocket_disconnected(self, session_id: str): }) logger.info(f"๐Ÿ”Œ Marked websocket disconnected for session {session_id}") - async def finalize_session(self, session_id: str): + async def finalize_session(self, session_id: str, completion_reason: str = None): """ Mark session as finalizing, send end marker, and clean up buffer. Args: session_id: Session identifier + completion_reason: Optional reason for session completion (e.g., "websocket_disconnect", "user_stopped") + This is set atomically with status to avoid race conditions. """ session_key = f"audio:session:{session_id}" - await self.redis_client.hset(session_key, mapping={ + # Build mapping with status and optional completion_reason + mapping = { "status": "finalizing", "finalized_at": str(time.time()) - }) + } + + # Set completion_reason atomically with status to prevent race conditions + if completion_reason: + mapping["completion_reason"] = completion_reason + logger.info(f"๐Ÿ“Š Finalizing session {session_id} with reason: {completion_reason}") + + await self.redis_client.hset(session_key, mapping=mapping) # Send end_marker to Redis stream so streaming consumer can close the connection if session_id in self.session_buffers: diff --git a/backends/advanced/src/advanced_omi_backend/services/transcription/__init__.py b/backends/advanced/src/advanced_omi_backend/services/transcription/__init__.py index 45773432..4bfae311 100644 --- a/backends/advanced/src/advanced_omi_backend/services/transcription/__init__.py +++ b/backends/advanced/src/advanced_omi_backend/services/transcription/__init__.py @@ -142,43 +142,9 @@ async def transcribe(self, audio_data: bytes, sample_rate: int, diarize: bool = words = _dotted_get(data, extract.get("words")) or [] segments = _dotted_get(data, extract.get("segments")) or [] - # DEBUG: Log what we extracted - logger.debug(f"DEBUG Registry: Extracted {len(segments)} segments from response") - if segments and len(segments) > 0: - logger.debug(f"DEBUG Registry: First segment keys: {list(segments[0].keys()) if isinstance(segments[0], dict) else 'not a dict'}") - logger.debug(f"DEBUG Registry: First segment: {segments[0]}") - - # FIX: Normalize Deepgram segment structure - provider = self.model.model_provider.lower() if self.model.model_provider else "" - if provider == "deepgram" and segments: - normalized_segments = [] - for seg in segments: - # Deepgram segments may have nested structure - # Extract text from either 'text' or 'transcript' or 'sentences' - text_content = seg.get("text") or seg.get("transcript") or "" - - # Handle nested sentences structure - if not text_content and "sentences" in seg: - sentences = seg.get("sentences", []) - text_content = " ".join([s.get("text", "") for s in sentences if s.get("text")]) - - # Skip empty segments - if not text_content or not text_content.strip(): - logger.debug(f"Skipping empty Deepgram segment: {seg}") - continue - - # Build normalized segment - normalized_seg = { - "text": text_content.strip(), - "start": seg.get("start", 0.0), - "end": seg.get("end", 0.0), - "speaker": seg.get("speaker", "SPEAKER_00"), - "confidence": seg.get("confidence", 1.0) - } - normalized_segments.append(normalized_seg) - - segments = normalized_segments - logger.debug(f"Normalized {len(segments)} Deepgram segments") + # Ignore segments from all providers - speaker service creates them via diarization + segments = [] + logger.debug(f"Transcription: Extracted {len(words)} words, ignoring provider segments (speaker service will create them)") return {"text": text, "words": words, "segments": segments} @@ -393,8 +359,22 @@ def get_transcription_provider(provider_name: Optional[str] = None, mode: Option return RegistryBatchTranscriptionProvider() +def is_transcription_available(mode: str = "batch") -> bool: + """Check if transcription provider is available for given mode. + + Args: + mode: Either "batch" or "streaming" + + Returns: + True if a transcription provider is configured and available, False otherwise + """ + provider = get_transcription_provider(mode=mode) + return provider is not None + + __all__ = [ "get_transcription_provider", + "is_transcription_available", "RegistryBatchTranscriptionProvider", "RegistryStreamingTranscriptionProvider", "BaseTranscriptionProvider", diff --git a/backends/advanced/src/advanced_omi_backend/speaker_recognition_client.py b/backends/advanced/src/advanced_omi_backend/speaker_recognition_client.py index 8dbada0b..772ae33c 100644 --- a/backends/advanced/src/advanced_omi_backend/speaker_recognition_client.py +++ b/backends/advanced/src/advanced_omi_backend/speaker_recognition_client.py @@ -36,6 +36,21 @@ def __init__(self, service_url: Optional[str] = None): service_url: URL of the speaker recognition service (e.g., http://speaker-service:8085) If not provided, uses config.yml service_url or SPEAKER_SERVICE_URL env var """ + # Check if we should use mock client (for testing) + if os.getenv("USE_MOCK_SPEAKER_CLIENT") == "true": + try: + # Import mock client from testing module + from advanced_omi_backend.testing.mock_speaker_client import MockSpeakerRecognitionClient + + self._mock_client = MockSpeakerRecognitionClient() + self.enabled = True + self.service_url = "mock://speaker-service" + logger.info("๐ŸŽค Using MOCK speaker recognition client for tests") + return + except ImportError as e: + logger.error(f"Failed to import mock speaker client: {e}") + # Fall through to normal initialization + # Load speaker recognition config from config.yml registry = get_models_registry() if not registry or not registry.speaker_recognition: @@ -66,6 +81,35 @@ def __init__(self, service_url: Optional[str] = None): else: logger.info("Speaker recognition client disabled (no service URL configured)") + def calculate_timeout(self, audio_duration: Optional[float]) -> float: + """ + Calculate proportional timeout based on audio duration. + + Uses the formula: timeout = min(MAX_TIMEOUT, audio_duration * MULTIPLIER + BASE_TIMEOUT) + + Args: + audio_duration: Duration of audio in seconds + + Returns: + Calculated timeout in seconds + """ + BASE_TIMEOUT = 30.0 # Minimum timeout for short files + TIMEOUT_MULTIPLIER = 8.0 # Processing speed ratio (e.g., 1 min audio = 8 min timeout) + MAX_TIMEOUT = 600.0 # 10 minute cap for very long files + + if audio_duration is None or audio_duration <= 0: + logger.warning("Audio duration unknown or invalid, using base timeout") + return BASE_TIMEOUT + + calculated_timeout = audio_duration * TIMEOUT_MULTIPLIER + BASE_TIMEOUT + timeout = min(MAX_TIMEOUT, calculated_timeout) + + logger.info( + f"๐Ÿ• Calculated timeout: audio_duration={audio_duration:.1f}s โ†’ " + f"timeout={timeout:.1f}s (base={BASE_TIMEOUT}, multiplier={TIMEOUT_MULTIPLIER}, max={MAX_TIMEOUT})" + ) + return timeout + async def diarize_identify_match( self, conversation_id: str, @@ -88,10 +132,24 @@ async def diarize_identify_match( Returns: Dictionary containing segments with matched text and speaker identification """ + # Use mock client if configured + if hasattr(self, '_mock_client'): + return await self._mock_client.diarize_identify_match( + conversation_id, backend_token, transcript_data, user_id + ) + if not self.enabled: logger.info(f"๐ŸŽค Speaker recognition disabled, returning empty result") return {"segments": []} + # Fetch conversation to get audio duration for timeout calculation + from advanced_omi_backend.models.conversation import Conversation + conversation = await Conversation.find_one(Conversation.conversation_id == conversation_id) + audio_duration = conversation.audio_total_duration if conversation else None + + # Calculate proportional timeout based on audio duration + timeout = self.calculate_timeout(audio_duration) + try: logger.info(f"๐ŸŽค Calling speaker service with conversation_id: {conversation_id[:12]}...") @@ -151,7 +209,7 @@ async def diarize_identify_match( async with session.post( request_url, data=form_data, - timeout=aiohttp.ClientTimeout(total=120), + timeout=aiohttp.ClientTimeout(total=timeout), ) as response: logger.info(f"๐ŸŽค Speaker service response status: {response.status}") @@ -210,6 +268,11 @@ async def diarize_and_identify( f"({len(audio_data) / 1024 / 1024:.2f} MB)" ) + # Estimate audio duration from data size (assuming 16kHz, 16-bit PCM) + # WAV header is typically 44 bytes + estimated_duration = (len(audio_data) - 44) / 32000 # 16000 Hz * 2 bytes per sample + timeout = self.calculate_timeout(estimated_duration) + # Call the speaker recognition service async with aiohttp.ClientSession() as session: # Prepare the audio data for upload (no disk I/O!) @@ -256,7 +319,7 @@ async def diarize_and_identify( async with session.post( endpoint_url, data=form_data, - timeout=aiohttp.ClientTimeout(total=120), + timeout=aiohttp.ClientTimeout(total=timeout), ) as response: logger.info(f"๐ŸŽค [DIARIZE] Response status: {response.status}") @@ -325,6 +388,20 @@ async def identify_speakers(self, audio_path: str, segments: List[Dict]) -> Dict logger.info(f"Identifying {len(unique_speakers)} speakers in {audio_path}") + # Get audio duration for timeout calculation + import wave + try: + with wave.open(audio_path, "rb") as wav_file: + frame_count = wav_file.getnframes() + sample_rate = wav_file.getframerate() + audio_duration = frame_count / sample_rate if sample_rate > 0 else None + except Exception as e: + logger.warning(f"Failed to get audio duration from {audio_path}: {e}") + audio_duration = None + + # Calculate proportional timeout based on audio duration + timeout = self.calculate_timeout(audio_duration) + # Call the speaker recognition service async with aiohttp.ClientSession() as session: # Prepare the audio file for upload @@ -353,7 +430,7 @@ async def identify_speakers(self, audio_path: str, segments: List[Dict]) -> Dict async with session.post( f"{self.service_url}/diarize-and-identify", data=form_data, - timeout=aiohttp.ClientTimeout(total=120), + timeout=aiohttp.ClientTimeout(total=timeout), ) as response: if response.status != 200: logger.warning( diff --git a/backends/advanced/src/advanced_omi_backend/testing/__init__.py b/backends/advanced/src/advanced_omi_backend/testing/__init__.py new file mode 100644 index 00000000..8d430cdf --- /dev/null +++ b/backends/advanced/src/advanced_omi_backend/testing/__init__.py @@ -0,0 +1 @@ +"""Testing utilities and mocks for Chronicle backend.""" diff --git a/backends/advanced/src/advanced_omi_backend/testing/mock_speaker_client.py b/backends/advanced/src/advanced_omi_backend/testing/mock_speaker_client.py new file mode 100644 index 00000000..e53a556e --- /dev/null +++ b/backends/advanced/src/advanced_omi_backend/testing/mock_speaker_client.py @@ -0,0 +1,160 @@ +"""Mock speaker recognition client for testing without heavy ML dependencies.""" + +import logging +from typing import Dict, Optional + +logger = logging.getLogger(__name__) + + +class MockSpeakerRecognitionClient: + """ + Mock speaker recognition client that returns pre-computed segments. + + Used in test environments to avoid running resource-intensive speaker + recognition service. Segments are based on test_data.py expectations. + """ + + # Map audio filenames to mock segment data + # Segments follow the structure expected by the backend: + # { + # "start": float, # Start time in seconds + # "end": float, # End time in seconds + # "text": str, # Transcript text for this segment + # "speaker": int, # Speaker label (0, 1, 2, etc.) + # "identified_as": str, # Speaker name or "Unknown" + # "confidence": float # Optional confidence score + # } + + MOCK_SEGMENTS = { + "DIY_Experts_Glass_Blowing_16khz_mono_1min.wav": [ + { + "start": 0.0, + "end": 10.08, + "speaker": 0, + "identified_as": "Unknown", + "text": "The pumpkin that'll last for forever. Finally. Does it count? Today, we're taking a glass blowing class.", + "confidence": 0.95 + }, + { + "start": 10.28, + "end": 20.255, + "speaker": 0, + "identified_as": "Unknown", + "text": "I'm sweating already. We've worked with a lot of materials before, but we've only scratched the surface", + "confidence": 0.93 + }, + { + "start": 20.455, + "end": 21.895, + "speaker": 1, + "identified_as": "Unknown", + "text": "when it comes to glass", + "confidence": 0.91 + }, + { + "start": 22.095, + "end": 23.615, + "speaker": 0, + "identified_as": "Unknown", + "text": "and that's because", + "confidence": 0.94 + }, + { + "start": 23.815, + "end": 28.135, + "speaker": 1, + "identified_as": "Unknown", + "text": "a little intimidating. We've got about 400 pounds", + "confidence": 0.92 + }, + { + "start": 28.335, + "end": 43.08, + "speaker": 0, + "identified_as": "Unknown", + "text": "of liquid glass in this furnace right here. Nick's gonna really help us out. Nick, I'm excited and nervous. Me too.", + "confidence": 0.96 + }, + { + "start": 43.28, + "end": 44.48, + "speaker": 1, + "identified_as": "Unknown", + "text": "So we're gonna", + "confidence": 0.90 + }, + { + "start": 44.68, + "end": 46.76, + "speaker": 0, + "identified_as": "Unknown", + "text": "make what's called a trumpet", + "confidence": 0.95 + }, + { + "start": 46.96, + "end": 50.24, + "speaker": 0, + "identified_as": "Unknown", + "text": "flower. We're using gravity as a tool.", + "confidence": 0.93 + } + ] + } + + def __init__(self): + """Initialize mock client.""" + logger.info("๐ŸŽค Mock speaker recognition client initialized") + + async def diarize_identify_match( + self, + conversation_id: str, + backend_token: str, + transcript_data: Dict, + user_id: Optional[str] = None + ) -> Dict: + """ + Return pre-computed mock segments for known test audio files. + + Args: + conversation_id: Not used in mock (audio filename derived from transcript) + backend_token: Not used in mock + transcript_data: Dict with 'text' and 'words' - used to identify audio file + user_id: Not used in mock + + Returns: + Dictionary with 'segments' array matching speaker service format + """ + logger.info(f"๐ŸŽค Mock speaker client processing conversation: {conversation_id[:12]}...") + + # Try to identify which test audio this is from the transcript + transcript_text = transcript_data.get("text", "").lower() + + # Match by transcript content + if "glass blowing" in transcript_text or "glass" in transcript_text: + filename = "DIY_Experts_Glass_Blowing_16khz_mono_1min.wav" + if filename in self.MOCK_SEGMENTS: + segments = self.MOCK_SEGMENTS[filename] + logger.info(f"๐ŸŽค Mock returning {len(segments)} segments for DIY Glass Blowing audio") + return {"segments": segments} + + # Fallback: Create single generic segment + logger.warning(f"๐ŸŽค Mock: No pre-computed segments found, creating generic segment") + + # Get duration from words if available + words = transcript_data.get("words", []) + if words: + duration = words[-1].get("end", 60.0) + else: + duration = 60.0 + + return { + "segments": [{ + "start": 0.0, + "end": duration, + "speaker": 0, + "identified_as": "Unknown", + "text": transcript_data.get("text", ""), + "confidence": 0.85 + }] + } diff --git a/backends/advanced/src/advanced_omi_backend/utils/audio_chunk_utils.py b/backends/advanced/src/advanced_omi_backend/utils/audio_chunk_utils.py index 406389a7..28581d09 100644 --- a/backends/advanced/src/advanced_omi_backend/utils/audio_chunk_utils.py +++ b/backends/advanced/src/advanced_omi_backend/utils/audio_chunk_utils.py @@ -587,20 +587,63 @@ async def reconstruct_audio_segment( channels=channels, ) - # Decode and concatenate chunks - pcm_data = await concatenate_chunks_to_pcm(chunks) + # Decode each chunk and clip to exact time boundaries for precise segment extraction + pcm_buffer = bytearray() + bytes_per_second = sample_rate * channels * 2 # 16-bit = 2 bytes per sample - # Build WAV file for this segment + for chunk in chunks: + # Decode this chunk to PCM + pcm_data = await decode_opus_to_pcm( + opus_data=chunk.audio_data, + sample_rate=chunk.sample_rate, + channels=chunk.channels, + ) + + # Calculate clip boundaries for this chunk + clip_start_byte = 0 + clip_end_byte = len(pcm_data) + + # Trim start if chunk begins before requested start_time + if chunk.start_time < start_time: + offset_seconds = start_time - chunk.start_time + offset_bytes = int(offset_seconds * bytes_per_second) + # Align to sample boundary (2 bytes for 16-bit audio) + clip_start_byte = (offset_bytes // 2) * 2 + + # Trim end if chunk extends past requested end_time + if chunk.end_time > end_time: + # Calculate duration from chunk start to requested end + duration_seconds = end_time - chunk.start_time + duration_bytes = int(duration_seconds * bytes_per_second) + # Align to sample boundary + clip_end_byte = (duration_bytes // 2) * 2 + + # Append only the clipped portion to buffer + if clip_start_byte < clip_end_byte: + clipped_pcm = pcm_data[clip_start_byte:clip_end_byte] + pcm_buffer.extend(clipped_pcm) + + logger.debug( + f"Chunk {chunk.chunk_index}: [{chunk.start_time:.1f}s - {chunk.end_time:.1f}s] " + f"โ†’ clipped [{max(chunk.start_time, start_time):.1f}s - {min(chunk.end_time, end_time):.1f}s] " + f"({len(clipped_pcm)} bytes)" + ) + + # Build WAV file from precisely trimmed PCM data wav_bytes = await build_wav_from_pcm( - pcm_data=pcm_data, + pcm_data=bytes(pcm_buffer), sample_rate=sample_rate, channels=channels, ) + actual_duration = len(pcm_buffer) / bytes_per_second + expected_duration = end_time - start_time + logger.info( f"Reconstructed audio segment for {conversation_id[:8]}...: " f"{start_time:.1f}s - {end_time:.1f}s " - f"({len(chunks)} chunks, {len(wav_bytes)} bytes)" + f"({len(chunks)} chunks, {len(wav_bytes)} bytes WAV, " + f"actual duration: {actual_duration:.2f}s, expected: {expected_duration:.2f}s)" ) return wav_bytes diff --git a/backends/advanced/src/advanced_omi_backend/utils/audio_utils.py b/backends/advanced/src/advanced_omi_backend/utils/audio_utils.py index ce81bbb8..f8ba07a3 100644 --- a/backends/advanced/src/advanced_omi_backend/utils/audio_utils.py +++ b/backends/advanced/src/advanced_omi_backend/utils/audio_utils.py @@ -3,15 +3,18 @@ ############################################################################### import asyncio +import io import logging import os import time import uuid as uuid_lib +import wave from pathlib import Path # Type import to avoid circular imports from typing import TYPE_CHECKING, Optional +import numpy as np from wyoming.audio import AudioChunk if TYPE_CHECKING: @@ -30,10 +33,85 @@ class AudioValidationError(Exception): pass +async def resample_audio_with_ffmpeg( + audio_data: bytes, + input_sample_rate: int, + input_channels: int, + input_sample_width: int, + target_sample_rate: int, + target_channels: int = 1 +) -> bytes: + """ + Resample audio using FFmpeg with stdin/stdout pipes (no disk I/O). + + Args: + audio_data: Raw PCM audio bytes + input_sample_rate: Input sample rate in Hz + input_channels: Number of input channels + input_sample_width: Input sample width in bytes (2 for 16-bit, 4 for 32-bit) + target_sample_rate: Target sample rate in Hz + target_channels: Target number of channels (default: 1 for mono) + + Returns: + Resampled PCM audio bytes (16-bit signed little-endian) + + Raises: + RuntimeError: If FFmpeg resampling fails + """ + # Determine FFmpeg format based on sample width + if input_sample_width == 2: + input_format = "s16le" # 16-bit signed little-endian + elif input_sample_width == 4: + input_format = "s32le" # 32-bit signed little-endian + else: + raise AudioValidationError( + f"Unsupported sample width: {input_sample_width} bytes (only 2 or 4 supported)" + ) + + # FFmpeg command for resampling via pipes + # pipe:0 = stdin, pipe:1 = stdout + cmd = [ + "ffmpeg", + "-f", input_format, + "-ar", str(input_sample_rate), + "-ac", str(input_channels), + "-i", "pipe:0", # Read from stdin + "-ar", str(target_sample_rate), + "-ac", str(target_channels), + "-f", "s16le", # Always output 16-bit + "pipe:1", # Write to stdout + ] + + # Run FFmpeg with piped I/O + process = await asyncio.create_subprocess_exec( + *cmd, + stdin=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + # Send input data and get output + stdout, stderr = await process.communicate(input=audio_data) + + if process.returncode != 0: + error_msg = stderr.decode() if stderr else "Unknown error" + audio_logger.error(f"FFmpeg resampling failed: {error_msg}") + raise RuntimeError(f"Audio resampling failed: {error_msg}") + + audio_logger.info( + f"Resampled audio: {input_sample_rate}Hz/{input_channels}ch โ†’ " + f"{target_sample_rate}Hz/{target_channels}ch " + f"({len(audio_data)} โ†’ {len(stdout)} bytes)" + ) + + return stdout + + async def validate_and_prepare_audio( audio_data: bytes, expected_sample_rate: int = 16000, - convert_to_mono: bool = True + convert_to_mono: bool = True, + auto_resample: bool = False ) -> tuple[bytes, int, int, int, float]: """ Validate WAV audio data and prepare it for processing. @@ -42,6 +120,7 @@ async def validate_and_prepare_audio( audio_data: Raw WAV file bytes expected_sample_rate: Expected sample rate (default: 16000 Hz) convert_to_mono: Whether to convert stereo to mono (default: True) + auto_resample: Whether to automatically resample audio if sample rate doesn't match (default: False) Returns: Tuple of (processed_audio_data, sample_rate, sample_width, channels, duration) @@ -49,10 +128,6 @@ async def validate_and_prepare_audio( Raises: AudioValidationError: If audio validation fails """ - import io - import wave - import numpy as np - try: # Parse WAV file with wave.open(io.BytesIO(audio_data), "rb") as wav_file: @@ -68,13 +143,36 @@ async def validate_and_prepare_audio( except Exception as e: raise AudioValidationError(f"Invalid WAV file: {str(e)}") - # Validate sample rate + # Handle sample rate mismatch if sample_rate != expected_sample_rate: - raise AudioValidationError( - f"Sample rate must be {expected_sample_rate}Hz, got {sample_rate}Hz" - ) + if auto_resample: + audio_logger.info( + f"Auto-resampling audio from {sample_rate}Hz to {expected_sample_rate}Hz" + ) + # Resample audio using FFmpeg (with pipes, no disk I/O) + processed_audio = await resample_audio_with_ffmpeg( + audio_data=processed_audio, + input_sample_rate=sample_rate, + input_channels=channels, + input_sample_width=sample_width, + target_sample_rate=expected_sample_rate, + target_channels=1 if convert_to_mono else channels + ) + # Update metadata after resampling + sample_rate = expected_sample_rate + sample_width = 2 # FFmpeg outputs 16-bit + if convert_to_mono: + channels = 1 + # Recalculate duration + duration = len(processed_audio) / (sample_rate * sample_width * channels) + # Skip stereo-to-mono conversion since resampling already handled it + convert_to_mono = False + else: + raise AudioValidationError( + f"Sample rate must be {expected_sample_rate}Hz, got {sample_rate}Hz" + ) - # Convert stereo to mono if requested + # Convert stereo to mono if requested and not already done if convert_to_mono and channels == 2: audio_logger.info(f"Converting stereo audio to mono") diff --git a/backends/advanced/src/advanced_omi_backend/utils/conversation_utils.py b/backends/advanced/src/advanced_omi_backend/utils/conversation_utils.py index ca62372b..27af4bfa 100644 --- a/backends/advanced/src/advanced_omi_backend/utils/conversation_utils.py +++ b/backends/advanced/src/advanced_omi_backend/utils/conversation_utils.py @@ -81,6 +81,10 @@ def analyze_speech(transcript_data: dict) -> dict: settings = get_speech_detection_settings() words = transcript_data.get("words", []) + logger.info(f"๐Ÿ”ฌ analyze_speech: words_list_length={len(words)}, settings={settings}") + if words and len(words) > 0: + logger.info(f"๐Ÿ“ First 3 words: {words[:3]}") + # Method 1: Word-level analysis (preferred - has confidence scores and timing) if words: # Filter by confidence threshold @@ -98,6 +102,12 @@ def analyze_speech(transcript_data: dict) -> dict: speech_end = valid_words[-1].get("end", 0) speech_duration = speech_end - speech_start + # Debug logging for timestamp investigation + logger.info( + f"๐Ÿ• Speech timing: start={speech_start:.2f}s, end={speech_end:.2f}s, " + f"duration={speech_duration:.2f}s (first_word={valid_words[0]}, last_word={valid_words[-1]})" + ) + # If no timing data (duration = 0), fall back to text-only analysis # This happens with some streaming transcription services if speech_duration == 0: @@ -106,6 +116,7 @@ def analyze_speech(transcript_data: dict) -> dict: else: # Check minimum duration threshold when we have timing data min_duration = settings.get("min_duration", 10.0) + logger.info(f"๐Ÿ“ Comparing duration {speech_duration:.1f}s vs threshold {min_duration:.1f}s") if speech_duration < min_duration: return { "has_speech": False, @@ -427,23 +438,46 @@ async def track_speech_activity( speech_analysis: Dict[str, Any], last_word_count: int, conversation_id: str, redis_client ) -> tuple[float, int]: """ - Track new speech activity and update last speech timestamp. + Track new speech activity and update last speech timestamp using audio timestamps. - Uses word count instead of chunk count to avoid false positives from noise/silence. + Uses word count to detect new speech, and audio timestamps (speech_end) to track + when the last speech occurred in the audio stream (not wall-clock time). Args: - speech_analysis: Speech analysis results from analyze_speech() + speech_analysis: Speech analysis results from analyze_speech() with: + - word_count: Number of words detected + - speech_end: Audio timestamp of last word (if available) + - fallback: True if using text-only analysis without timing last_word_count: Previous word count conversation_id: Conversation ID for Redis key redis_client: Redis client instance Returns: Tuple of (last_meaningful_speech_time, new_word_count) + Note: last_meaningful_speech_time is audio timestamp, NOT wall-clock time """ current_word_count = speech_analysis.get("word_count", 0) if current_word_count > last_word_count: - last_meaningful_speech_time = time.time() + # Use audio timestamp (speech_end) when available + speech_end = speech_analysis.get("speech_end") + is_fallback = speech_analysis.get("fallback", False) + + if speech_end is not None and speech_end > 0: + # Preferred: Use audio timestamp from word-level timing + last_meaningful_speech_time = speech_end + logger.debug( + f"๐Ÿ—ฃ๏ธ New speech detected (word count: {current_word_count}), " + f"audio timestamp: {speech_end:.2f}s" + ) + else: + # Fallback: Use wall-clock time when word-level timing unavailable + # This happens with text-only transcription or missing timing data + last_meaningful_speech_time = time.time() + logger.warning( + f"โš ๏ธ Using wall-clock time for speech tracking (no audio timestamps available). " + f"Word count: {current_word_count}, fallback={is_fallback}" + ) # Store timestamp in Redis for visibility/debugging await redis_client.set( @@ -451,9 +485,6 @@ async def track_speech_activity( last_meaningful_speech_time, ex=86400, # 24 hour TTL ) - logger.debug( - f"๐Ÿ—ฃ๏ธ New speech detected (word count: {current_word_count}), updated last_speech timestamp" - ) return last_meaningful_speech_time, current_word_count @@ -494,6 +525,15 @@ async def update_job_progress_metadata( if "created_at" not in current_job.meta: current_job.meta["created_at"] = datetime.now().isoformat() + # Calculate inactivity based on audio-relative timestamps + # Both current_audio_time and last_meaningful_speech_time are seconds into the audio stream + current_audio_time = speech_analysis.get("speech_end", 0.0) + inactivity_seconds = ( + current_audio_time - last_meaningful_speech_time + if current_audio_time > 0 and last_meaningful_speech_time > 0 + else 0 + ) + current_job.meta.update( { "conversation_id": conversation_id, @@ -507,7 +547,7 @@ async def update_job_progress_metadata( "duration_seconds": speech_analysis.get("duration", 0), "has_speech": speech_analysis.get("has_speech", False), "last_update": datetime.now().isoformat(), - "inactivity_seconds": time.time() - last_meaningful_speech_time, + "inactivity_seconds": inactivity_seconds, "chunks_processed": combined["chunk_count"], } ) diff --git a/backends/advanced/src/advanced_omi_backend/utils/job_utils.py b/backends/advanced/src/advanced_omi_backend/utils/job_utils.py index ba9fcc74..c9028909 100644 --- a/backends/advanced/src/advanced_omi_backend/utils/job_utils.py +++ b/backends/advanced/src/advanced_omi_backend/utils/job_utils.py @@ -44,7 +44,7 @@ async def check_job_alive(redis_client, current_job, session_id: Optional[str] = if session_id: session_key = f"audio:session:{session_id}" session_status = await redis_client.hget(session_key, "status") - if session_status and session_status.decode() in ["finalizing", "complete", "closed"]: + if session_status and session_status.decode() in ["finalizing", "finished"]: # Session ended naturally - not a zombie, just natural cleanup logger.debug(f"๐Ÿ“‹ Job {current_job.id} ending naturally (session closed)") return False diff --git a/backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py b/backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py index 58acad62..8505d547 100644 --- a/backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py +++ b/backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py @@ -210,7 +210,7 @@ async def flush_pcm_buffer(): # Check if session is finalizing session_status = await redis_client.hget(session_key, "status") - if session_status and session_status.decode() in ["finalizing", "complete"]: + if session_status and session_status.decode() in ["finalizing", "finished"]: logger.info(f"๐Ÿ›‘ Session finalizing detected, flushing final chunks...") await asyncio.sleep(0.5) # Brief wait for in-flight chunks diff --git a/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py b/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py index e458a7fe..febdfbd8 100644 --- a/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py +++ b/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py @@ -8,7 +8,7 @@ import logging import time, os from datetime import datetime -from typing import Dict, Any +from typing import Dict, Any, Optional from rq.job import Job from rq.exceptions import NoSuchJobError @@ -16,6 +16,7 @@ from advanced_omi_backend.controllers.queue_controller import redis_conn from advanced_omi_backend.controllers.session_controller import mark_session_complete from advanced_omi_backend.services.plugin_service import get_plugin_router, init_plugin_router +from datetime import datetime from advanced_omi_backend.utils.conversation_utils import ( analyze_speech, @@ -289,9 +290,9 @@ async def open_conversation_job( # Inactivity timeout configuration inactivity_timeout_seconds = float(os.getenv("SPEECH_INACTIVITY_THRESHOLD_SECONDS", "60")) inactivity_timeout_minutes = inactivity_timeout_seconds / 60 - last_meaningful_speech_time = time.time() # Initialize with conversation start + last_meaningful_speech_time = 0.0 # Initialize with audio time 0 (will be updated with first speech) timeout_triggered = False # Track if closure was due to timeout - last_inactivity_log_time = time.time() # Track when we last logged inactivity + last_inactivity_log_time = time.time() # Track when we last logged inactivity (wall-clock for logging) last_word_count = 0 # Track word count to detect actual new speech # Test mode: wait for audio queue to drain before timing out @@ -315,7 +316,7 @@ async def open_conversation_job( status = await redis_client.hget(session_key, "status") status_str = status.decode() if status else None - if status_str in ["finalizing", "complete"]: + if status_str in ["finalizing", "finished"]: finalize_received = True # Get completion reason (guaranteed to exist with unified API) @@ -407,8 +408,21 @@ async def open_conversation_job( last_meaningful_speech_time=last_meaningful_speech_time, ) - # Check inactivity timeout and log every 10 seconds - inactivity_duration = time.time() - last_meaningful_speech_time + # Check inactivity timeout using audio time (not wall-clock time) + # Get current audio time from latest transcription + current_audio_time = speech_analysis.get("speech_end", 0.0) + + # Calculate inactivity based on audio timestamps + # Only check if we have valid audio timing data + if current_audio_time > 0 and last_meaningful_speech_time > 0: + inactivity_duration = current_audio_time - last_meaningful_speech_time + else: + # Fallback: No audio timestamps available (text-only transcription) + # Can't reliably detect inactivity, so skip timeout check this iteration + inactivity_duration = 0 + if speech_analysis.get("fallback", False): + logger.debug("โš ๏ธ Skipping inactivity check (no audio timestamps available)") + current_time = time.time() # Log inactivity every 10 seconds @@ -469,6 +483,11 @@ async def open_conversation_job( 'word_count': speech_analysis.get('word_count', 0), } + logger.info( + f"๐Ÿ”Œ DISPATCH: transcript.streaming event " + f"(conversation={conversation_id[:12]}, segment_id={session_id}_{current_count})" + ) + plugin_results = await plugin_router.dispatch_event( event='transcript.streaming', user_id=user_id, @@ -476,6 +495,10 @@ async def open_conversation_job( metadata={'client_id': client_id} ) + logger.info( + f"๐Ÿ”Œ RESULT: transcript.streaming dispatched to {len(plugin_results) if plugin_results else 0} plugins" + ) + if plugin_results: logger.info(f"๐Ÿ“Œ Triggered {len(plugin_results)} streaming transcript plugins") for result in plugin_results: @@ -496,17 +519,18 @@ async def open_conversation_job( ) # Determine end reason based on how we exited the loop - # Check session completion_reason from Redis (set by WebSocket controller on disconnect) + # Check session completion_reason from Redis (set atomically with status by finalize_session) completion_reason = await redis_client.hget(session_key, "completion_reason") completion_reason_str = completion_reason.decode() if completion_reason else None # Determine end_reason with proper precedence: - # 1. websocket_disconnect (explicit disconnect from client) + # 1. completion_reason from Redis (set by WebSocket controller: websocket_disconnect, user_stopped) # 2. inactivity_timeout (no speech for SPEECH_INACTIVITY_THRESHOLD_SECONDS) # 3. max_duration (conversation exceeded max runtime) - # 4. user_stopped (user manually stopped recording) - if completion_reason_str == "websocket_disconnect": - end_reason = "websocket_disconnect" + # 4. user_stopped (fallback for any other exit condition) + if completion_reason_str: + end_reason = completion_reason_str + logger.info(f"๐Ÿ“Š Using completion_reason from session: {end_reason}") elif timeout_triggered: end_reason = "inactivity_timeout" elif time.time() - start_time > max_runtime: @@ -564,74 +588,39 @@ async def open_conversation_job( # Create transcript version from streaming results version_id = f"streaming_{session_id[:12]}" transcript_text = final_transcript.get("text", "") - segments_data = final_transcript.get("segments", []) - - # If streaming provider didn't provide segments (e.g., Deepgram streaming), - # create segments from individual final results with word-level data - if not segments_data: - logger.info(f"๐Ÿ“ No segments in streaming results, creating from word-level data") - results = await aggregator.get_session_results(session_id) - - for result in results: - words = result.get("words", []) - text = result.get("text", "").strip() - - # Skip empty results or results without timing data - # WARNING: We don't support results without word-level timing data. - # Ideally should error, but skipping for now to handle edge cases gracefully. - if not words or not text: - continue - - # Create segment dict from this result chunk - # Each "final" result becomes one segment with generic speaker label - segment_dict = { - "start": words[0]["start"], - "end": words[-1]["end"], - "text": text, - "speaker": "SPEAKER_00", # Generic label, updated by speaker recognition - "confidence": result.get("confidence"), - "words": words # Already in correct format from aggregator - } - segments_data.append(segment_dict) - - logger.info(f"โœ… Created {len(segments_data)} segments from streaming results") - - # Convert segments to SpeakerSegment format with word-level timestamps - segments = [ - Conversation.SpeakerSegment( - start=seg.get("start", 0.0), - end=seg.get("end", 0.0), - text=seg.get("text", ""), - speaker=seg.get("speaker", "SPEAKER_00"), - confidence=seg.get("confidence"), - words=[ - Conversation.Word( - word=w.get("word", ""), - start=w.get("start", 0.0), - end=w.get("end", 0.0), - confidence=w.get("confidence") - ) - for w in seg.get("words", []) - ] + words_data = final_transcript.get("words", []) # All words from aggregator + + # Convert words to Word objects + words = [ + Conversation.Word( + word=w.get("word", ""), + start=w.get("start", 0.0), + end=w.get("end", 0.0), + confidence=w.get("confidence") ) - for seg in segments_data + for w in words_data ] + # Segments remain EMPTY until speaker recognition service processes them + # Per Chronicle architecture: segments ONLY come from speaker service + segments = [] + # Determine provider from streaming results provider = final_transcript.get("provider", "deepgram") - # Add streaming transcript as the initial version + # Add streaming transcript with words at version level conversation.add_transcript_version( version_id=version_id, transcript=transcript_text, - segments=segments, + words=words, # Store at version level + segments=segments, # Empty - only speaker service creates segments provider=provider, model=provider, # Provider name as model processing_time_seconds=None, # Not applicable for streaming metadata={ "source": "streaming", "chunk_count": final_transcript.get("chunk_count", 0), - "word_count": len(final_transcript.get("words", [])) + "word_count": len(words), }, set_as_active=True ) @@ -640,7 +629,7 @@ async def open_conversation_job( await conversation.save() logger.info( f"โœ… Saved streaming transcript: {len(transcript_text)} chars, " - f"{len(segments)} segments, {len(final_transcript.get('words', []))} words " + f"{len(segments)} segments (empty until speaker recognition), {len(words)} words " f"for conversation {conversation_id[:12]}" ) @@ -651,7 +640,8 @@ async def open_conversation_job( conversation_id=conversation_id, user_id=user_id, transcript_version_id=version_id, # Pass the streaming transcript version ID - client_id=client_id # Pass client_id for UI tracking + client_id=client_id, # Pass client_id for UI tracking + end_reason=end_reason # Pass the determined end_reason (websocket_disconnect, inactivity_timeout, etc.) ) logger.info( @@ -663,41 +653,11 @@ async def open_conversation_job( # Wait a moment to ensure jobs are registered in RQ await asyncio.sleep(0.5) - # Trigger conversation-level plugins - try: - plugin_router = get_plugin_router() - if plugin_router: - # Get conversation data for plugin context - conversation_model = await Conversation.find_one( - Conversation.conversation_id == conversation_id - ) - - plugin_data = { - 'conversation': { - 'conversation_id': conversation_id, - 'client_id': client_id, - 'user_id': user_id, - }, - 'transcript': conversation_model.transcript if conversation_model else "", - 'duration': time.time() - start_time, - 'conversation_id': conversation_id, - } - - plugin_results = await plugin_router.dispatch_event( - event='conversation.complete', - user_id=user_id, - data=plugin_data, - metadata={'end_reason': end_reason} - ) - - if plugin_results: - logger.info(f"๐Ÿ“Œ Triggered {len(plugin_results)} conversation-level plugins") - for result in plugin_results: - if result.message: - logger.info(f" Plugin result: {result.message}") - - except Exception as e: - logger.warning(f"โš ๏ธ Error triggering conversation-level plugins: {e}") + # Note: conversation.complete event dispatch job is already enqueued by start_post_conversation_jobs + # It runs after memory and title/summary jobs complete, ensuring all data is ready + logger.info( + f"โœ… Post-conversation pipeline started with event dispatch job (end_reason={end_reason})" + ) # Call shared cleanup/restart logic return await handle_end_of_conversation( @@ -845,20 +805,23 @@ async def dispatch_conversation_complete_event_job( conversation_id: str, client_id: str, user_id: str, + end_reason: Optional[str] = None, *, redis_client=None ) -> Dict[str, Any]: """ - Dispatch conversation.complete plugin event for file upload processing. + Dispatch conversation.complete plugin event for all conversation sources. - This job runs at the end of the post-conversation job chain to ensure - plugins receive the conversation.complete event for uploaded audio files. - WebSocket streaming dispatches this event in open_conversation_job instead. + This job runs at the end of conversation processing to ensure plugins + receive the conversation.complete event with the correct end_reason. + Used by both file upload and WebSocket streaming paths. Args: conversation_id: Conversation ID client_id: Client ID user_id: User ID + end_reason: Reason the conversation ended (e.g., 'file_upload', 'websocket_disconnect', 'user_stopped') + Defaults to 'file_upload' for backward compatibility redis_client: Redis client (injected by decorator) Returns: @@ -876,6 +839,21 @@ async def dispatch_conversation_complete_event_job( logger.error(f"Conversation {conversation_id} not found") return {"success": False, "error": "Conversation not found"} + # Save end_reason and completed_at to database if not already set + # This ensures end_reason is persisted before plugins receive conversation.complete event + if end_reason and conversation.end_reason is None: + try: + conversation.end_reason = Conversation.EndReason(end_reason) + except ValueError: + logger.warning(f"โš ๏ธ Invalid end_reason '{end_reason}', using UNKNOWN") + conversation.end_reason = Conversation.EndReason.UNKNOWN + + if conversation.completed_at is None: + conversation.completed_at = datetime.utcnow() + + await conversation.save() + logger.info(f"๐Ÿ’พ Saved end_reason={conversation.end_reason} to conversation {conversation_id[:12]} in event dispatch job") + # Get user email for event data from advanced_omi_backend.models.user import User user = await User.get(user_id) @@ -885,22 +863,40 @@ async def dispatch_conversation_complete_event_job( try: # Get or initialize plugin router (same pattern as transcription_jobs.py) plugin_router = get_plugin_router() + if not plugin_router: - logger.info("๐Ÿ”ง Initializing plugin router in worker process...") + logger.warning("๐Ÿ”ง Plugin router not found in worker process - attempting initialization...") plugin_router = init_plugin_router() - # Initialize all plugins asynchronously (same as app_factory.py) if plugin_router: + logger.info(f"๐Ÿ”ง Plugin router initialized with {len(plugin_router.plugins)} plugin(s)") + + # Initialize all plugins for plugin_id, plugin in plugin_router.plugins.items(): try: + logger.info(f" Initializing plugin '{plugin_id}'...") await plugin.initialize() - logger.info(f"โœ… Plugin '{plugin_id}' initialized") + logger.info(f" โœ“ Plugin '{plugin_id}' initialized") except Exception as e: - logger.error(f"Failed to initialize plugin '{plugin_id}': {e}") + logger.error(f" โœ— Failed to initialize plugin '{plugin_id}': {e}", exc_info=True) + else: + logger.error("๐Ÿ”ง Plugin router initialization FAILED - router is None") + # CRITICAL CHECK: Fail loudly if no router if not plugin_router: - logger.warning("โš ๏ธ Plugin router could not be initialized, skipping event dispatch") - return {"success": True, "skipped": True, "reason": "No plugin router"} + error_msg = ( + f"โŒ Plugin router could not be initialized in worker process. " + f"conversation.complete event for {conversation_id[:12]} will NOT be dispatched!" + ) + logger.error(error_msg) + + return { + "success": False, + "skipped": True, + "reason": "No plugin router", + "conversation_id": conversation_id, + "error": error_msg + } plugin_data = { 'conversation': { @@ -912,16 +908,28 @@ async def dispatch_conversation_complete_event_job( 'conversation_id': conversation_id, } + # Use provided end_reason or default to 'file_upload' for backward compatibility + actual_end_reason = end_reason or 'file_upload' + + logger.info( + f"๐Ÿ”Œ DISPATCH: conversation.complete event for {conversation_id[:12]} " + f"(end_reason={actual_end_reason}, user={user_id}, client={client_id})" + ) + plugin_results = await plugin_router.dispatch_event( event='conversation.complete', user_id=user_id, data=plugin_data, - metadata={'end_reason': 'file_upload'} + metadata={'end_reason': actual_end_reason} ) + logger.info( + f"๐Ÿ”Œ RESULT: conversation.complete dispatched to {len(plugin_results) if plugin_results else 0} plugins" + ) if plugin_results: logger.info(f"๐Ÿ“Œ Triggered {len(plugin_results)} conversation-level plugins") for result in plugin_results: + logger.info(f" Plugin result: success={result.success}, message={result.message}") if result.message: logger.info(f" Plugin result: {result.message}") diff --git a/backends/advanced/src/advanced_omi_backend/workers/memory_jobs.py b/backends/advanced/src/advanced_omi_backend/workers/memory_jobs.py index ee02b065..1a5700bd 100644 --- a/backends/advanced/src/advanced_omi_backend/workers/memory_jobs.py +++ b/backends/advanced/src/advanced_omi_backend/workers/memory_jobs.py @@ -271,6 +271,11 @@ async def process_memory_job(conversation_id: str, *, redis_client=None) -> Dict 'conversation_id': conversation_id, } + logger.info( + f"๐Ÿ”Œ DISPATCH: memory.processed event " + f"(conversation={conversation_id[:12]}, memories={len(created_memory_ids)})" + ) + plugin_results = await plugin_router.dispatch_event( event='memory.processed', user_id=user_id, @@ -281,6 +286,10 @@ async def process_memory_job(conversation_id: str, *, redis_client=None) -> Dict } ) + logger.info( + f"๐Ÿ”Œ RESULT: memory.processed dispatched to {len(plugin_results) if plugin_results else 0} plugins" + ) + if plugin_results: logger.info(f"๐Ÿ“Œ Triggered {len(plugin_results)} memory-level plugins") for result in plugin_results: diff --git a/backends/advanced/src/advanced_omi_backend/workers/obsidian_jobs.py b/backends/advanced/src/advanced_omi_backend/workers/obsidian_jobs.py index 1956f00b..8c67616d 100644 --- a/backends/advanced/src/advanced_omi_backend/workers/obsidian_jobs.py +++ b/backends/advanced/src/advanced_omi_backend/workers/obsidian_jobs.py @@ -34,7 +34,7 @@ async def ingest_obsidian_vault_job(job_id: str, vault_path: str, redis_client=N logger.info("Starting Obsidian ingestion job %s", job.id) # Initialize job meta - job.meta["status"] = "processing" + job.meta["status"] = "started" job.meta["processed"] = 0 job.meta["total_files"] = 0 job.meta["errors"] = [] @@ -74,10 +74,10 @@ async def ingest_obsidian_vault_job(job_id: str, vault_path: str, redis_client=N # Check for cancellation job.refresh() if job.get_status() == "canceled": - logger.info("Obsidian ingestion job %s cancelled by user", job.id) - job.meta["status"] = "cancelled" + logger.info("Obsidian ingestion job %s canceled by user", job.id) + job.meta["status"] = "canceled" job.save_meta() - return {"status": "cancelled"} + return {"status": "canceled"} try: note_data = obsidian_service.parse_obsidian_note(root, filename, vault_path) @@ -96,12 +96,12 @@ async def ingest_obsidian_vault_job(job_id: str, vault_path: str, redis_client=N job.meta["errors"] = errors job.save_meta() - job.meta["status"] = "completed" + job.meta["status"] = "finished" job.save_meta() - + return { - "status": "completed", - "processed": processed, - "total": total, + "status": "finished", + "processed": processed, + "total": total, "errors": errors } diff --git a/backends/advanced/src/advanced_omi_backend/workers/orchestrator/health_monitor.py b/backends/advanced/src/advanced_omi_backend/workers/orchestrator/health_monitor.py index 9b1149e2..310999c6 100644 --- a/backends/advanced/src/advanced_omi_backend/workers/orchestrator/health_monitor.py +++ b/backends/advanced/src/advanced_omi_backend/workers/orchestrator/health_monitor.py @@ -236,7 +236,7 @@ def _handle_registration_loss(self): "(replicating old start-workers.sh behavior)" ) - # Restart all RQ workers + # Restart all RQ workers (this method now handles timestamp update internally) success = self._restart_all_rq_workers() if success: @@ -244,12 +244,9 @@ def _handle_registration_loss(self): else: logger.error("โŒ Bulk restart encountered errors - check individual worker logs") - # Update recovery timestamp to start cooldown - self.last_registration_recovery = current_time - def _restart_all_rq_workers(self) -> bool: """ - Restart all RQ workers (bulk restart). + Restart all RQ workers (bulk restart) with timing measurements. This matches the old bash script's recovery mechanism: - Kill all RQ workers @@ -269,19 +266,66 @@ def _restart_all_rq_workers(self) -> bool: logger.warning("No RQ workers found to restart") return False - logger.info(f"Restarting {len(rq_workers)} RQ workers...") + # START TIMING + bulk_restart_start = time.time() + logger.warning( + f"โš ๏ธ RQ worker registration lost! " + f"Starting bulk restart of {len(rq_workers)} workers at {time.strftime('%H:%M:%S')}" + ) all_success = True - for worker in rq_workers: - logger.info(f" โ†ป Restarting {worker.name}...") + worker_times = [] # Track individual worker restart times + + for i, worker in enumerate(rq_workers, 1): + worker_start = time.time() + logger.info( + f" [{i}/{len(rq_workers)}] โ†ป Restarting {worker.name} at {time.strftime('%H:%M:%S')}..." + ) + success = self.process_manager.restart_worker(worker.name) + worker_duration = time.time() - worker_start + worker_times.append((worker.name, worker_duration)) + if success: - logger.info(f" โœ“ {worker.name} restarted successfully") + logger.info( + f" [{i}/{len(rq_workers)}] โœ“ {worker.name} restarted in {worker_duration:.2f}s" + ) else: - logger.error(f" โœ— {worker.name} restart failed") + logger.error( + f" [{i}/{len(rq_workers)}] โœ— {worker.name} restart failed after {worker_duration:.2f}s" + ) all_success = False + # END TIMING + total_duration = time.time() - bulk_restart_start + + # Log timing summary + logger.info(f"\nโฑ๏ธ Bulk Restart Timing Summary:") + logger.info(f" Total workers: {len(rq_workers)}") + logger.info( + f" Total time: {total_duration:.2f}s ({total_duration/60:.1f} minutes)" + ) + logger.info(f" Average per worker: {total_duration/len(rq_workers):.2f}s") + + if worker_times: + slowest = max(worker_times, key=lambda x: x[1]) + fastest = min(worker_times, key=lambda x: x[1]) + logger.info(f" Slowest worker: {slowest[0]} ({slowest[1]:.2f}s)") + logger.info(f" Fastest worker: {fastest[0]} ({fastest[1]:.2f}s)") + + # Update recovery timestamp (moved here from _handle_registration_loss) + self.last_registration_recovery = time.time() + + if all_success: + logger.info( + f"โœ… Successfully restarted all {len(rq_workers)} RQ workers in {total_duration:.2f}s" + ) + else: + logger.warning( + f"โš ๏ธ Some workers failed to restart (took {total_duration:.2f}s total)" + ) + return all_success def get_health_status(self) -> dict: diff --git a/backends/advanced/src/advanced_omi_backend/workers/orchestrator/process_manager.py b/backends/advanced/src/advanced_omi_backend/workers/orchestrator/process_manager.py index 21b7f23e..5448b96f 100644 --- a/backends/advanced/src/advanced_omi_backend/workers/orchestrator/process_manager.py +++ b/backends/advanced/src/advanced_omi_backend/workers/orchestrator/process_manager.py @@ -240,7 +240,7 @@ def stop_all(self, timeout: int = 30) -> bool: def restart_worker(self, name: str, timeout: int = 30) -> bool: """ - Restart a specific worker. + Restart a specific worker with timing measurements. Args: name: Worker name @@ -254,23 +254,44 @@ def restart_worker(self, name: str, timeout: int = 30) -> bool: logger.error(f"Worker '{name}' not found") return False - logger.info(f"Restarting worker: {name}") + restart_start = time.time() + logger.info(f"{name}: Starting restart at {time.strftime('%H:%M:%S')}") - # Ensure worker is fully stopped before attempting restart + # STOP phase with timing + stop_start = time.time() stop_success = worker.stop(timeout=timeout) + stop_duration = time.time() - stop_start + if not stop_success: - logger.error(f"{name}: Failed to stop cleanly, restart aborted") + logger.error( + f"{name}: Failed to stop cleanly after {stop_duration:.2f}s " + f"(timeout was {timeout}s), restart aborted" + ) worker.state = WorkerState.FAILED return False - # Attempt to start the worker + logger.info( + f"{name}: Stopped in {stop_duration:.2f}s (timeout was {timeout}s)" + ) + + # START phase with timing + start_start = time.time() success = worker.start() + start_duration = time.time() - start_start + + total_restart_time = time.time() - restart_start if success: worker.restart_count += 1 - logger.info(f"{name}: Restart #{worker.restart_count} successful") + logger.info( + f"{name}: Restart #{worker.restart_count} successful " + f"(stop: {stop_duration:.2f}s, start: {start_duration:.2f}s, total: {total_restart_time:.2f}s)" + ) else: - logger.error(f"{name}: Restart failed") + logger.error( + f"{name}: Restart failed after {total_restart_time:.2f}s " + f"(stop: {stop_duration:.2f}s, start attempt: {start_duration:.2f}s)" + ) return success diff --git a/backends/advanced/src/advanced_omi_backend/workers/speaker_jobs.py b/backends/advanced/src/advanced_omi_backend/workers/speaker_jobs.py index f7379108..d9adbada 100644 --- a/backends/advanced/src/advanced_omi_backend/workers/speaker_jobs.py +++ b/backends/advanced/src/advanced_omi_backend/workers/speaker_jobs.py @@ -207,9 +207,36 @@ async def recognise_speakers_job( actual_transcript_text = transcript_text or transcript_version.transcript or "" actual_words = words if words else [] - # If words not provided, we need to get them from metadata - if not actual_words and transcript_version.metadata: + # If words not provided as parameter, read from version.words field (standardized location) + if not actual_words and transcript_version.words: + # Convert Word objects to dicts for speaker service API + actual_words = [ + { + "word": w.word, + "start": w.start, + "end": w.end, + "confidence": w.confidence + } + for w in transcript_version.words + ] + logger.info(f"๐Ÿ”ค Loaded {len(actual_words)} words from transcript version.words field") + # Backward compatibility: Fall back to metadata if words field is empty (old data) + elif not actual_words and transcript_version.metadata.get("words"): actual_words = transcript_version.metadata.get("words", []) + logger.info(f"๐Ÿ”ค Loaded {len(actual_words)} words from transcript version metadata (legacy)") + # Backward compatibility: Extract from segments if that's all we have (old streaming data) + elif not actual_words and transcript_version.segments: + for segment in transcript_version.segments: + if segment.words: + for w in segment.words: + actual_words.append({ + "word": w.word, + "start": w.start, + "end": w.end, + "confidence": w.confidence + }) + if actual_words: + logger.info(f"๐Ÿ”ค Extracted {len(actual_words)} words from segments (legacy)") if not actual_transcript_text: logger.warning(f"๐ŸŽค No transcript text found in version {version_id}") @@ -221,6 +248,16 @@ async def recognise_speakers_job( "processing_time_seconds": 0 } + if not actual_words: + logger.warning(f"๐ŸŽค No words found in version {version_id}") + return { + "success": False, + "conversation_id": conversation_id, + "version_id": version_id, + "error": "No word-level timing data available", + "processing_time_seconds": 0 + } + transcript_data = { "text": actual_transcript_text, "words": actual_words @@ -278,33 +315,61 @@ async def recognise_speakers_job( error_message = speaker_result.get("message", "Unknown error") logger.error(f"๐ŸŽค Speaker recognition service error: {error_type} - {error_message}") - # For connection failures, skip speaker recognition but allow downstream jobs to proceed - # Speaker recognition is optional - memory extraction and other jobs should still run + # Connection/timeout errors โ†’ skip gracefully (existing behavior) if error_type in ("connection_failed", "timeout", "client_error"): logger.warning( f"โš ๏ธ Speaker service unavailable ({error_type}), skipping speaker recognition. " f"Downstream jobs (memory, title/summary, events) will proceed normally." ) return { - "success": True, + "success": True, # Allow pipeline to continue "conversation_id": conversation_id, "version_id": version_id, "speaker_recognition_enabled": True, "speaker_service_unavailable": True, "identified_speakers": [], "skip_reason": f"Speaker service unavailable: {error_type}", + "error_type": error_type, "processing_time_seconds": time.time() - start_time } - # For other errors (e.g., processing errors), return error dict without failing - return { - "success": False, - "conversation_id": conversation_id, - "version_id": version_id, - "error": f"Speaker recognition failed: {error_type}", - "error_details": error_message, - "processing_time_seconds": time.time() - start_time - } + # Validation errors โ†’ fail job, don't retry + elif error_type == "validation_error": + logger.error(f"โŒ Speaker service validation error: {error_message}") + return { + "success": False, + "conversation_id": conversation_id, + "version_id": version_id, + "error": f"Validation error: {error_message}", + "error_type": error_type, + "retryable": False, # Don't retry validation errors + "processing_time_seconds": time.time() - start_time + } + + # Resource errors โ†’ fail job, can retry later + elif error_type == "resource_error": + logger.error(f"โŒ Speaker service resource error: {error_message}") + return { + "success": False, + "conversation_id": conversation_id, + "version_id": version_id, + "error": f"Resource error: {error_message}", + "error_type": error_type, + "retryable": True, # Can retry later when resources available + "processing_time_seconds": time.time() - start_time + } + + # Unknown errors โ†’ fail job + else: + return { + "success": False, + "conversation_id": conversation_id, + "version_id": version_id, + "error": f"Speaker recognition failed: {error_type}", + "error_details": error_message, + "error_type": error_type, + "processing_time_seconds": time.time() - start_time + } # Service worked but found no segments (legitimate empty result) if not speaker_result or "segments" not in speaker_result or not speaker_result["segments"]: @@ -342,13 +407,27 @@ async def recognise_speakers_job( continue speaker_name = seg.get("identified_as") or seg.get("speaker", "Unknown") + + # Extract words from speaker service response (already matched to this segment) + words_data = seg.get("words", []) + segment_words = [ + Conversation.Word( + word=w.get("word", ""), + start=w.get("start", 0.0), + end=w.get("end", 0.0), + confidence=w.get("confidence") + ) + for w in words_data + ] + updated_segments.append( Conversation.SpeakerSegment( start=seg.get("start", 0), end=seg.get("end", 0), text=text, speaker=speaker_name, - confidence=seg.get("confidence") + confidence=seg.get("confidence"), + words=segment_words # Use words from speaker service ) ) @@ -392,6 +471,30 @@ async def recognise_speakers_job( "processing_time_seconds": processing_time } + except asyncio.TimeoutError as e: + logger.error(f"โŒ Speaker recognition timeout: {e}") + + # Add timeout metadata to job + from rq import get_current_job + current_job = get_current_job() + if current_job: + current_job.meta.update({ + "error_type": "timeout", + "audio_duration": conversation.audio_total_duration if conversation else None, + "timeout_occurred_at": time.time() + }) + current_job.save_meta() + + return { + "success": False, + "conversation_id": conversation_id, + "version_id": version_id, + "error": "Speaker recognition timeout", + "error_type": "timeout", + "audio_duration": conversation.audio_total_duration if conversation else None, + "processing_time_seconds": time.time() - start_time + } + except Exception as speaker_error: logger.error(f"โŒ Speaker recognition failed: {speaker_error}") import traceback diff --git a/backends/advanced/src/advanced_omi_backend/workers/transcription_jobs.py b/backends/advanced/src/advanced_omi_backend/workers/transcription_jobs.py index 70935e1a..a3676383 100644 --- a/backends/advanced/src/advanced_omi_backend/workers/transcription_jobs.py +++ b/backends/advanced/src/advanced_omi_backend/workers/transcription_jobs.py @@ -252,14 +252,21 @@ async def transcribe_full_audio_job( 'word_count': len(words), } - logger.info(f"๐Ÿ” DEBUG: Dispatching transcript.batch event with user_id={user_id}, client_id={client_id}") + logger.info( + f"๐Ÿ”Œ DISPATCH: transcript.batch event " + f"(conversation={conversation_id[:12]}, words={len(words)})" + ) + plugin_results = await plugin_router.dispatch_event( event='transcript.batch', user_id=user_id, data=plugin_data, metadata={'client_id': client_id} ) - logger.info(f"๐Ÿ” DEBUG: Event dispatch returned {len(plugin_results) if plugin_results else 0} results") + + logger.info( + f"๐Ÿ”Œ RESULT: transcript.batch dispatched to {len(plugin_results) if plugin_results else 0} plugins" + ) if plugin_results: logger.info(f"โœ… Triggered {len(plugin_results)} transcript plugins in batch mode") @@ -350,78 +357,38 @@ async def transcribe_full_audio_job( # Calculate processing time (transcription only) processing_time = time.time() - start_time - # Convert segments to SpeakerSegment objects + # Transcription only provides text + words with timestamps + # Speaker service will create segments via diarization speaker_segments = [] - - if segments: - # Use provided segments - for seg in segments: - # Use identified_as if available (from speaker recognition), otherwise use speaker label - speaker_id = seg.get("identified_as") or seg.get("speaker", "Unknown") - # Convert speaker ID to string if it's an integer (some providers return int speaker IDs) - speaker_name = f"Speaker {speaker_id}" if isinstance(speaker_id, int) else speaker_id - - speaker_segments.append( - Conversation.SpeakerSegment( - start=seg.get("start", 0), - end=seg.get("end", 0), - text=seg.get("text", ""), - speaker=speaker_name, - confidence=seg.get("confidence"), - ) - ) - elif transcript_text: - # Fallback: If no segments but we have text, create a single segment from the full transcript - # This handles providers that don't support segmentation - # Calculate duration from words if available, otherwise estimate from audio - start_time_seg = 0.0 - end_time_seg = 0.0 - - if words: - # Use word timestamps if available - start_times = [w.get("start", 0) for w in words if "start" in w] - end_times = [w.get("end", 0) for w in words if "end" in w] - if start_times: - start_time_seg = min(start_times) - if end_times: - end_time_seg = max(end_times) - else: - # Estimate duration: assume ~150 words per minute, or use audio file duration - # For now, use a default duration if we can't calculate it - end_time_seg = len(transcript_text.split()) * 0.4 # Rough estimate: 0.4s per word - - speaker_segments.append( - Conversation.SpeakerSegment( - start=start_time_seg, - end=end_time_seg if end_time_seg > start_time_seg else start_time_seg + 1.0, - text=transcript_text, - speaker="Unknown", - confidence=None, - ) - ) - logger.info( - f"๐Ÿ“Š Created single segment from transcript text (no segments returned by provider)" - ) - - logger.info(f"๐Ÿ“Š Created {len(speaker_segments)} speaker segments") + logger.info(f"๐Ÿ“Š Transcription complete: {len(words)} words (segments will be created by speaker service)") # Add new transcript version provider_normalized = provider_name.lower() if provider_name else "unknown" - # Prepare metadata (transcription only - speaker recognition will add its own metadata) + # Convert words to Word objects + word_objects = [ + Conversation.Word( + word=w.get("word", ""), + start=w.get("start", 0.0), + end=w.get("end", 0.0), + confidence=w.get("confidence") + ) + for w in words + ] + + # Prepare metadata (transcription only - speaker service will add segments and metadata) metadata = { "trigger": trigger, "audio_file_size": len(wav_data), - "segment_count": len(segments), "word_count": len(words), - "words": words, # Store words for speaker recognition job to read - "speaker_recognition": {"enabled": False, "reason": "handled_by_separate_job"}, + "segments_created_by": "speaker_service", # Speaker service creates segments via diarization } conversation.add_transcript_version( version_id=version_id, transcript=transcript_text, - segments=speaker_segments, + words=word_objects, # Store at version level (not in metadata!) + segments=speaker_segments, # Empty - will be filled by speaker recognition provider=provider_normalized, # Now just a string, no enum constructor needed model=provider.name, processing_time_seconds=processing_time, @@ -612,7 +579,7 @@ async def stream_speech_detection_job( # Check if session has closed session_status = await redis_client.hget(session_key, "status") - session_closed = session_status and session_status.decode() in ["complete", "closed"] + session_closed = session_status and session_status.decode() in ["finalizing", "finished"] if session_closed and session_closed_at is None: # Session just closed - start grace period for final transcription diff --git a/backends/advanced/upload_files.py b/backends/advanced/upload_files.py index ead58e74..77a001f3 100755 --- a/backends/advanced/upload_files.py +++ b/backends/advanced/upload_files.py @@ -321,14 +321,14 @@ def poll_job_status(job_id: str, token: str, base_url: str, total_files: int) -> last_progress = progress last_current_file = current_file - # Check completion status - if status == "completed": + # Check completion status (RQ standard: "finished") + if status == "finished": elapsed = time.time() - start_time logger.info(f"๐ŸŽ‰ Job completed successfully in {elapsed:.0f}s!") - + # Show final file status summary files = job_status.get("files", []) - completed = len([f for f in files if f.get("status") == "completed"]) + completed = len([f for f in files if f.get("status") == "finished"]) failed = len([f for f in files if f.get("status") == "failed"]) skipped = len([f for f in files if f.get("status") == "skipped"]) diff --git a/backends/advanced/webui/src/pages/Conversations.tsx b/backends/advanced/webui/src/pages/Conversations.tsx index 9eb27f94..ca613702 100644 --- a/backends/advanced/webui/src/pages/Conversations.tsx +++ b/backends/advanced/webui/src/pages/Conversations.tsx @@ -63,12 +63,12 @@ export default function Conversations() { const [playingSegment, setPlayingSegment] = useState(null) // Format: "audioUuid-segmentIndex" const [audioCurrentTime, setAudioCurrentTime] = useState<{ [conversationId: string]: number }>({}) const audioRefs = useRef<{ [key: string]: HTMLAudioElement }>({}) - const segmentTimerRef = useRef(null) // Reprocessing state const [openDropdown, setOpenDropdown] = useState(null) const [reprocessingTranscript, setReprocessingTranscript] = useState>(new Set()) const [reprocessingMemory, setReprocessingMemory] = useState>(new Set()) + const [reprocessingSpeakers, setReprocessingSpeakers] = useState>(new Set()) const [deletingConversation, setDeletingConversation] = useState>(new Set()) // Transcript segment editing state @@ -247,6 +247,40 @@ export default function Conversations() { } } + const handleReprocessSpeakers = async (conversation: Conversation) => { + try { + if (!conversation.conversation_id) { + setError('Cannot reprocess speakers: Conversation ID is missing. This conversation may be from an older format.') + return + } + + setReprocessingSpeakers(prev => new Set(prev).add(conversation.conversation_id!)) + setOpenDropdown(null) + + const response = await conversationsApi.reprocessSpeakers( + conversation.conversation_id, + 'active' // Use active transcript version as source + ) + + if (response.status === 200) { + // Refresh conversations to show new version with updated speakers + await loadConversations() + } else { + setError(`Failed to start speaker reprocessing: ${response.data?.error || 'Unknown error'}`) + } + } catch (err: any) { + setError(`Error starting speaker reprocessing: ${err.message || 'Unknown error'}`) + } finally { + if (conversation.conversation_id) { + setReprocessingSpeakers(prev => { + const newSet = new Set(prev) + newSet.delete(conversation.conversation_id!) + return newSet + }) + } + } + } + const handleDeleteConversation = async (conversationId: string) => { try { const confirmed = window.confirm('Are you sure you want to delete this conversation? This action cannot be undone.') @@ -434,49 +468,41 @@ export default function Conversations() { const handleSegmentPlayPause = (conversationId: string, segmentIndex: number, segment: any) => { const segmentId = `${conversationId}-${segmentIndex}`; - const audioKey = conversationId; // Use conversation ID as cache key // If this segment is already playing, pause it if (playingSegment === segmentId) { - const audio = audioRefs.current[audioKey]; + const audio = audioRefs.current[segmentId]; if (audio) { audio.pause(); } - if (segmentTimerRef.current) { - window.clearTimeout(segmentTimerRef.current); - segmentTimerRef.current = null; - } setPlayingSegment(null); return; } // Stop any currently playing segment if (playingSegment) { - // Stop all audio elements - Object.values(audioRefs.current).forEach(audio => { - audio.pause(); - }); - if (segmentTimerRef.current) { - window.clearTimeout(segmentTimerRef.current); - segmentTimerRef.current = null; + const currentAudio = audioRefs.current[playingSegment]; + if (currentAudio) { + currentAudio.pause(); } } - // Get or create audio element for this conversation - let audio = audioRefs.current[audioKey]; + // Get or create audio element for this specific segment + let audio = audioRefs.current[segmentId]; - // Check if we need to create a new audio element (none exists or previous had error) + // Create new audio element with segment-specific URL if (!audio || audio.error) { const token = localStorage.getItem(getStorageKey('token')) || ''; - const audioUrl = `${BACKEND_URL}/api/audio/get_audio/${conversationId}?token=${token}`; - console.log('Creating audio element with URL:', audioUrl); - console.log('Token present:', !!token, 'Token length:', token.length); + // Use chunks endpoint with time range for instant loading (only fetches needed chunks) + const audioUrl = `${BACKEND_URL}/api/audio/chunks/${conversationId}?start_time=${segment.start}&end_time=${segment.end}&token=${token}`; + console.log('Creating segment audio element with URL:', audioUrl); + console.log('Segment range:', segment.start, 'to', segment.end, '(duration:', segment.end - segment.start, 'seconds)'); audio = new Audio(audioUrl); - audioRefs.current[audioKey] = audio; + audioRefs.current[segmentId] = audio; // Add error listener for debugging audio.addEventListener('error', () => { - console.error('Audio element error:', audio.error?.code, audio.error?.message); + console.error('Audio segment error:', audio.error?.code, audio.error?.message); console.error('Audio src:', audio.src); }); @@ -486,19 +512,10 @@ export default function Conversations() { }); } - // Set the start time and play + // Play the segment (no need to seek since audio is already trimmed to exact range) console.log('Playing segment:', segment.start, 'to', segment.end); - audio.currentTime = segment.start; audio.play().then(() => { setPlayingSegment(segmentId); - - // Set a timer to stop at the segment end time - const duration = (segment.end - segment.start) * 1000; // Convert to milliseconds - segmentTimerRef.current = window.setTimeout(() => { - audio.pause(); - setPlayingSegment(null); - segmentTimerRef.current = null; - }, duration); }).catch(err => { console.error('Error playing audio segment:', err); setPlayingSegment(null); @@ -508,13 +525,10 @@ export default function Conversations() { // Cleanup audio on unmount useEffect(() => { return () => { - // Stop all audio and clear timers + // Stop all audio elements Object.values(audioRefs.current).forEach(audio => { audio.pause(); }); - if (segmentTimerRef.current) { - window.clearTimeout(segmentTimerRef.current); - } }; }, []) @@ -716,6 +730,22 @@ export default function Conversations() { (ID missing) )} +
- {(job.status === 'queued' || job.status === 'processing') && ( + {(job.status === 'queued' || job.status === 'started') && ( )} - {job.status === 'completed' && ( + {job.status === 'finished' && (