From 585f1999059b69a6fcee048f4cdb226a7a19c3c5 Mon Sep 17 00:00:00 2001 From: 0xrushi <6279035+0xrushi@users.noreply.github.com> Date: Fri, 9 Jan 2026 22:35:11 -0500 Subject: [PATCH 01/17] Add annotation feature and cron jobs for transcript corrections - Introduced a new `TranscriptAnnotation` model for managing transcript corrections. - Added `annotation_routes` for creating and retrieving annotations via API. - Implemented cron jobs to surface potential errors in transcripts and finetune a model based on accepted corrections. - Updated Docker Compose to include a cron service for running scheduled tasks. - Enhanced the web UI to support displaying and managing annotations, including accepting and rejecting suggestions. - Added tests for annotation model and integration flow to ensure functionality and reliability. --- backends/advanced/docker-compose.yml | 7 + .../src/advanced_omi_backend/app_factory.py | 4 +- .../advanced/src/advanced_omi_backend/cron.py | 155 +-- .../advanced_omi_backend/models/__init__.py | 3 +- .../advanced_omi_backend/models/annotation.py | 192 +--- .../routers/api_router.py | 2 + .../routers/modules/__init__.py | 2 + .../routers/modules/annotation_routes.py | 731 +++----------- .../memory/providers/vector_stores.py | 100 +- .../workers/annotation_jobs.py | 319 ++---- .../workers/memory_jobs.py | 24 +- .../tests/integration/test_annotation_flow.py | 108 ++ .../advanced/tests/test_annotation_models.py | 90 ++ .../webui/src/pages/Conversations.tsx | 920 ++++++++---------- backends/advanced/webui/src/services/api.ts | 11 + 15 files changed, 932 insertions(+), 1736 deletions(-) create mode 100644 backends/advanced/tests/integration/test_annotation_flow.py create mode 100644 backends/advanced/tests/test_annotation_models.py diff --git a/backends/advanced/docker-compose.yml b/backends/advanced/docker-compose.yml index 95cc4cab..2a286092 100644 --- a/backends/advanced/docker-compose.yml +++ b/backends/advanced/docker-compose.yml @@ -137,8 +137,13 @@ 
services: container_name: chronicle-annotation-cron env_file: - .env + volumes: + - ./src:/app/src + - ./data:/app/data + - ../../config:/app/config # Mount entire config directory (includes config.yml, defaults.yml, plugins.yml) environment: - MONGODB_URI=mongodb://mongo:27017 + - REDIS_URL=redis://redis:6379/0 - DEV_MODE=${DEV_MODE:-false} - OPENAI_API_KEY=${OPENAI_API_KEY} - LLM_PROVIDER=${LLM_PROVIDER:-openai} @@ -146,6 +151,8 @@ services: depends_on: mongo: condition: service_healthy + redis: + condition: service_healthy restart: unless-stopped profiles: - annotation # Optional profile - enable with: docker compose --profile annotation up diff --git a/backends/advanced/src/advanced_omi_backend/app_factory.py b/backends/advanced/src/advanced_omi_backend/app_factory.py index 763967f1..e979bd11 100644 --- a/backends/advanced/src/advanced_omi_backend/app_factory.py +++ b/backends/advanced/src/advanced_omi_backend/app_factory.py @@ -108,7 +108,7 @@ async def lifespan(app: FastAPI): try: from beanie import init_beanie - from advanced_omi_backend.models.annotation import Annotation + from advanced_omi_backend.models.annotation import Annotation, TranscriptAnnotation from advanced_omi_backend.models.audio_chunk import AudioChunkDocument from advanced_omi_backend.models.conversation import Conversation from advanced_omi_backend.models.user import User @@ -116,7 +116,7 @@ async def lifespan(app: FastAPI): await init_beanie( database=config.db, - document_models=[User, Conversation, AudioChunkDocument, WaveformData, Annotation], + document_models=[User, Conversation, AudioChunkDocument, WaveformData, Annotation, TranscriptAnnotation], ) application_logger.info("Beanie initialized for all document models") except Exception as e: diff --git a/backends/advanced/src/advanced_omi_backend/cron.py b/backends/advanced/src/advanced_omi_backend/cron.py index 161ceb31..953fa6d3 100644 --- a/backends/advanced/src/advanced_omi_backend/cron.py +++ 
b/backends/advanced/src/advanced_omi_backend/cron.py @@ -1,121 +1,72 @@ -""" -Annotation cron scheduler for AI-powered suggestion surfacing. - -This scheduler runs background jobs to: -1. Surface AI suggestions for potential transcript/memory errors (daily) -2. Fine-tune error detection models using user feedback (weekly) - -Configuration via environment variables: -- MONGODB_URI: MongoDB connection string -- DEV_MODE: When true, uses 1-minute intervals for testing - -Usage: - uv run python -m advanced_omi_backend.cron -""" - import asyncio import logging import os -from datetime import datetime, timezone - -from beanie import init_beanie -from motor.motor_asyncio import AsyncIOMotorClient - -from advanced_omi_backend.models.annotation import Annotation -from advanced_omi_backend.models.conversation import Conversation -from advanced_omi_backend.models.user import User -from advanced_omi_backend.workers.annotation_jobs import ( - finetune_hallucination_model, - surface_error_suggestions, -) +from datetime import datetime +import signal +import sys # Configure logging logging.basicConfig( - level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", + stream=sys.stdout ) -logger = logging.getLogger(__name__) +logger = logging.getLogger("cron_scheduler") -# Configuration -MONGODB_URI = os.getenv("MONGODB_URI", "mongodb://mongo:27017") -DEV_MODE = os.getenv("DEV_MODE", "false").lower() == "true" +from advanced_omi_backend.workers.annotation_jobs import surface_error_suggestions, finetune_hallucination_model +from advanced_omi_backend.database import init_db -# Intervals (1 minute in dev, normal in production) -if DEV_MODE: - SUGGESTION_INTERVAL = 60 # 1 minute for dev testing - TRAINING_INTERVAL = 60 # 1 minute for dev testing - logger.info("๐Ÿ”ง DEV_MODE enabled - using 1-minute intervals for testing") -else: - SUGGESTION_INTERVAL = 24 * 60 * 60 # Daily - 
TRAINING_INTERVAL = 7 * 24 * 60 * 60 # Weekly - logger.info("๐Ÿ“… Production mode - using daily/weekly intervals") - - -async def init_db(): - """Initialize database connection""" - try: - client = AsyncIOMotorClient(MONGODB_URI) - await init_beanie( - database=client.chronicle, - document_models=[Annotation, Conversation, User], - ) - logger.info("โœ… Database connection initialized") - except Exception as e: - logger.error(f"โŒ Failed to initialize database: {e}") - raise +# Frequency configuration (in seconds) +SUGGESTION_INTERVAL = 24 * 60 * 60 # Daily +TRAINING_INTERVAL = 7 * 24 * 60 * 60 # Weekly +# For testing purposes, we can check more frequently if ENV var is set +if os.getenv("DEV_MODE", "false").lower() == "true": + SUGGESTION_INTERVAL = 60 # 1 minute + TRAINING_INTERVAL = 300 # 5 minutes async def run_scheduler(): - """Main scheduler loop""" + logger.info("Starting Cron Scheduler...") + + # Initialize DB connection await init_db() - logger.info("๐Ÿ• Annotation cron scheduler started") - logger.info(f" - Suggestion interval: {SUGGESTION_INTERVAL}s") - logger.info(f" - Training interval: {TRAINING_INTERVAL}s") - - last_suggestion_run = datetime.now(timezone.utc) - last_training_run = datetime.now(timezone.utc) - + + last_suggestion_run = datetime.min + last_training_run = datetime.min + while True: - try: - now = datetime.now(timezone.utc) - - # Daily: Surface AI suggestions - if (now - last_suggestion_run).total_seconds() >= SUGGESTION_INTERVAL: - logger.info(f"๐Ÿค– Running suggestion surfacing at {now}") - try: - await surface_error_suggestions() - last_suggestion_run = now - logger.info("โœ… Suggestion surfacing completed") - except Exception as e: - logger.error(f"โŒ Suggestion job failed: {e}", exc_info=True) - - # Weekly: Fine-tune model - if (now - last_training_run).total_seconds() >= TRAINING_INTERVAL: - logger.info(f"๐ŸŽ“ Running model fine-tuning at {now}") - try: - await finetune_hallucination_model() - last_training_run = now - 
logger.info("โœ… Model fine-tuning completed") - except Exception as e: - logger.error(f"โŒ Training job failed: {e}", exc_info=True) - - # Sleep for check interval - await asyncio.sleep(60) # Check every minute - - except KeyboardInterrupt: - logger.info("โ›” Scheduler stopped by user") - break - except Exception as e: - logger.error(f"โŒ Unexpected error in scheduler loop: {e}", exc_info=True) - # Continue running despite errors - await asyncio.sleep(60) - + now = datetime.utcnow() + + # Check Suggestions Job + if (now - last_suggestion_run).total_seconds() >= SUGGESTION_INTERVAL: + logger.info("Running scheduled job: surface_error_suggestions") + try: + await surface_error_suggestions() + last_suggestion_run = now + except Exception as e: + logger.error(f"Error in surface_error_suggestions: {e}", exc_info=True) + + # Check Training Job + if (now - last_training_run).total_seconds() >= TRAINING_INTERVAL: + logger.info("Running scheduled job: finetune_hallucination_model") + try: + await finetune_hallucination_model() + last_training_run = now + except Exception as e: + logger.error(f"Error in finetune_hallucination_model: {e}", exc_info=True) + + # Sleep for a bit before next check (e.g. 
1 minute) + await asyncio.sleep(60) + +def handle_shutdown(signum, frame): + logger.info("Shutting down Cron Scheduler...") + sys.exit(0) if __name__ == "__main__": - logger.info("๐Ÿš€ Starting annotation cron scheduler...") + signal.signal(signal.SIGTERM, handle_shutdown) + signal.signal(signal.SIGINT, handle_shutdown) + try: asyncio.run(run_scheduler()) except KeyboardInterrupt: - logger.info("๐Ÿ‘‹ Annotation cron scheduler stopped") - except Exception as e: - logger.error(f"๐Ÿ’ฅ Fatal error: {e}", exc_info=True) - exit(1) + pass diff --git a/backends/advanced/src/advanced_omi_backend/models/__init__.py b/backends/advanced/src/advanced_omi_backend/models/__init__.py index a19fa0db..38d1a230 100644 --- a/backends/advanced/src/advanced_omi_backend/models/__init__.py +++ b/backends/advanced/src/advanced_omi_backend/models/__init__.py @@ -7,4 +7,5 @@ # Models can be imported directly from their files # e.g. from .job import TranscriptionJob -# e.g. from .conversation import Conversation, create_conversation \ No newline at end of file +# e.g. from .conversation import Conversation, create_conversation +from .annotation import TranscriptAnnotation \ No newline at end of file diff --git a/backends/advanced/src/advanced_omi_backend/models/annotation.py b/backends/advanced/src/advanced_omi_backend/models/annotation.py index ac8ceefe..eaeb51ed 100644 --- a/backends/advanced/src/advanced_omi_backend/models/annotation.py +++ b/backends/advanced/src/advanced_omi_backend/models/annotation.py @@ -1,175 +1,39 @@ -""" -Unified annotation system for Chronicle. - -Supports annotations for memories, transcripts, and future content types. -Enables both user edits and AI-powered suggestions. 
-""" - -import uuid -from datetime import datetime, timezone -from enum import Enum -from typing import Optional - +from datetime import datetime +from typing import Optional, List +from pydantic import Field from beanie import Document, Indexed -from pydantic import BaseModel, Field - - -class AnnotationType(str, Enum): - """Type of content being annotated.""" - MEMORY = "memory" - TRANSCRIPT = "transcript" - DIARIZATION = "diarization" # Speaker identification corrections - - -class AnnotationSource(str, Enum): - """Origin of the annotation.""" - USER = "user" # User-created edit - MODEL_SUGGESTION = "model_suggestion" # AI-generated suggestion - - -class AnnotationStatus(str, Enum): - """Lifecycle status of annotation.""" - PENDING = "pending" # Waiting for user review (suggestions) - ACCEPTED = "accepted" # Applied to content - REJECTED = "rejected" # User dismissed suggestion - - -class Annotation(Document): - """ - Unified annotation model for all content types. +from enum import Enum +import uuid - Supports both user edits and AI-powered suggestions across - memories, transcripts, and future content types (chat, action items, etc.). +class TranscriptAnnotation(Document): + """Model for transcript annotations/corrections.""" + + class AnnotationStatus(str, Enum): + PENDING = "pending" + ACCEPTED = "accepted" + REJECTED = "rejected" - Design: Polymorphic model with type-specific fields based on annotation_type. 
- """ + class AnnotationSource(str, Enum): + USER = "user" + MODEL_SUGGESTION = "model_suggestion" - # Identity id: str = Field(default_factory=lambda: str(uuid.uuid4())) - - # Classification - annotation_type: AnnotationType + conversation_id: Indexed(str) + segment_index: int + original_text: str + corrected_text: str user_id: Indexed(str) + + status: AnnotationStatus = Field(default=AnnotationStatus.ACCEPTED) # User edits are accepted by default source: AnnotationSource = Field(default=AnnotationSource.USER) - status: AnnotationStatus = Field(default=AnnotationStatus.ACCEPTED) - - # Content - original_text: str = "" # Text before correction (not used for diarization) - corrected_text: str = "" # Text after correction (not used for diarization) - - # Polymorphic References (based on annotation_type) - # For MEMORY annotations: - memory_id: Optional[str] = None - - # For TRANSCRIPT annotations: - conversation_id: Optional[str] = None - segment_index: Optional[int] = None - - # For DIARIZATION annotations: - original_speaker: Optional[str] = None # Speaker label before correction - corrected_speaker: Optional[str] = None # Speaker label after correction - segment_start_time: Optional[float] = None # Time offset for reference - - # Processed tracking (applies to ALL annotation types) - processed: bool = Field(default=False) # Whether annotation has been applied/sent to training - processed_at: Optional[datetime] = None # When annotation was processed - processed_by: Optional[str] = None # What processed it (manual, cron, apply, training, etc.) 
- - # Timestamps (Python 3.12+ compatible) - created_at: datetime = Field( - default_factory=lambda: datetime.now(timezone.utc) - ) - updated_at: datetime = Field( - default_factory=lambda: datetime.now(timezone.utc) - ) + + created_at: datetime = Field(default_factory=datetime.utcnow) + updated_at: datetime = Field(default_factory=datetime.utcnow) class Settings: - name = "annotations" - # Create indexes on commonly queried fields - # Note: Enum fields and Optional fields don't use Indexed() wrapper + name = "transcript_annotations" indexes = [ - "annotation_type", # Query by type (memory vs transcript vs diarization) - "user_id", # User-scoped queries - "status", # Filter by status (pending/accepted/rejected) - "memory_id", # Lookup annotations for specific memory - "conversation_id", # Lookup annotations for specific conversation - "processed", # Query unprocessed annotations + "conversation_id", + "user_id", + "status" ] - - def is_memory_annotation(self) -> bool: - """Check if this is a memory annotation.""" - return self.annotation_type == AnnotationType.MEMORY - - def is_transcript_annotation(self) -> bool: - """Check if this is a transcript annotation.""" - return self.annotation_type == AnnotationType.TRANSCRIPT - - def is_diarization_annotation(self) -> bool: - """Check if this is a diarization annotation.""" - return self.annotation_type == AnnotationType.DIARIZATION - - def is_pending_suggestion(self) -> bool: - """Check if this is a pending AI suggestion.""" - return ( - self.source == AnnotationSource.MODEL_SUGGESTION - and self.status == AnnotationStatus.PENDING - ) - - -# Pydantic Request/Response Models - - -class AnnotationCreateBase(BaseModel): - """Base model for annotation creation.""" - original_text: str = "" # Optional for diarization - corrected_text: str = "" # Optional for diarization - status: AnnotationStatus = AnnotationStatus.ACCEPTED - - -class MemoryAnnotationCreate(AnnotationCreateBase): - """Create memory annotation request.""" - 
memory_id: str - original_text: str # Required for memory annotations - corrected_text: str # Required for memory annotations - - -class TranscriptAnnotationCreate(AnnotationCreateBase): - """Create transcript annotation request.""" - conversation_id: str - segment_index: int - original_text: str # Required for transcript annotations - corrected_text: str # Required for transcript annotations - - -class DiarizationAnnotationCreate(BaseModel): - """Create diarization annotation request.""" - conversation_id: str - segment_index: int - original_speaker: str - corrected_speaker: str - segment_start_time: Optional[float] = None - status: AnnotationStatus = AnnotationStatus.ACCEPTED - - -class AnnotationResponse(BaseModel): - """Annotation response for API.""" - id: str - annotation_type: AnnotationType - user_id: str - memory_id: Optional[str] = None - conversation_id: Optional[str] = None - segment_index: Optional[int] = None - original_text: str = "" - corrected_text: str = "" - original_speaker: Optional[str] = None - corrected_speaker: Optional[str] = None - segment_start_time: Optional[float] = None - processed: bool = False - processed_at: Optional[datetime] = None - processed_by: Optional[str] = None - status: AnnotationStatus - source: AnnotationSource - created_at: datetime - - class Config: - from_attributes = True # Pydantic v2 compatibility diff --git a/backends/advanced/src/advanced_omi_backend/routers/api_router.py b/backends/advanced/src/advanced_omi_backend/routers/api_router.py index e4c89531..57a81578 100644 --- a/backends/advanced/src/advanced_omi_backend/routers/api_router.py +++ b/backends/advanced/src/advanced_omi_backend/routers/api_router.py @@ -24,6 +24,7 @@ queue_router, system_router, user_router, + annotation_router, ) from .modules.health_routes import router as health_router @@ -47,6 +48,7 @@ router.include_router(obsidian_router) router.include_router(system_router) router.include_router(queue_router) 
+router.include_router(annotation_router, prefix="/annotations", tags=["annotations"]) router.include_router(health_router) # Also include under /api for frontend compatibility # Conditionally include test routes (only in test environments) diff --git a/backends/advanced/src/advanced_omi_backend/routers/modules/__init__.py b/backends/advanced/src/advanced_omi_backend/routers/modules/__init__.py index 501377fc..f15edc5e 100644 --- a/backends/advanced/src/advanced_omi_backend/routers/modules/__init__.py +++ b/backends/advanced/src/advanced_omi_backend/routers/modules/__init__.py @@ -33,6 +33,7 @@ from .system_routes import router as system_router from .user_routes import router as user_router from .websocket_routes import router as websocket_router +from .annotation_routes import router as annotation_router __all__ = [ "admin_router", @@ -50,4 +51,5 @@ "system_router", "user_router", "websocket_router", + "annotation_router", ] diff --git a/backends/advanced/src/advanced_omi_backend/routers/modules/annotation_routes.py b/backends/advanced/src/advanced_omi_backend/routers/modules/annotation_routes.py index f85a99ed..49a48f3f 100644 --- a/backends/advanced/src/advanced_omi_backend/routers/modules/annotation_routes.py +++ b/backends/advanced/src/advanced_omi_backend/routers/modules/annotation_routes.py @@ -1,625 +1,118 @@ -""" -Annotation routes for Chronicle API. +from fastapi import APIRouter, HTTPException, Depends +from typing import List, Optional +from pydantic import BaseModel +from datetime import datetime -Handles annotation CRUD operations for memories and transcripts. -Supports both user edits and AI-powered suggestions. 
-""" - -import logging -from datetime import datetime, timezone -from typing import List - -from fastapi import APIRouter, Depends, HTTPException -from fastapi.responses import JSONResponse - -from advanced_omi_backend.auth import current_active_user -from advanced_omi_backend.models.annotation import ( - Annotation, - AnnotationResponse, - AnnotationStatus, - AnnotationType, - DiarizationAnnotationCreate, - MemoryAnnotationCreate, - TranscriptAnnotationCreate, -) +from advanced_omi_backend.models.annotation import TranscriptAnnotation from advanced_omi_backend.models.conversation import Conversation -from advanced_omi_backend.services.memory import get_memory_service -from advanced_omi_backend.users import User - -logger = logging.getLogger(__name__) - -router = APIRouter(prefix="/annotations", tags=["annotations"]) - - -@router.post("/memory", response_model=AnnotationResponse) -async def create_memory_annotation( - annotation_data: MemoryAnnotationCreate, - current_user: User = Depends(current_active_user), -): - """ - Create annotation for memory edit. 
- - - Validates user owns memory - - Creates annotation record - - Updates memory content in vector store - - Re-embeds if content changed - """ - try: - memory_service = get_memory_service() - - # Verify memory ownership - try: - memory = await memory_service.get_memory( - annotation_data.memory_id, current_user.user_id - ) - if not memory: - raise HTTPException(status_code=404, detail="Memory not found") - except Exception as e: - logger.error(f"Error fetching memory: {e}") - raise HTTPException(status_code=404, detail="Memory not found") - - # Create annotation - annotation = Annotation( - annotation_type=AnnotationType.MEMORY, - user_id=current_user.user_id, - memory_id=annotation_data.memory_id, - original_text=annotation_data.original_text, - corrected_text=annotation_data.corrected_text, - status=annotation_data.status, - ) - await annotation.save() - logger.info( - f"Created memory annotation {annotation.id} for memory {annotation_data.memory_id}" - ) - - # Update memory content if accepted - if annotation.status == AnnotationStatus.ACCEPTED: - try: - await memory_service.update_memory( - memory_id=annotation_data.memory_id, - content=annotation_data.corrected_text, - user_id=current_user.user_id, - ) - logger.info(f"Updated memory {annotation_data.memory_id} with corrected text") - except Exception as e: - logger.error(f"Error updating memory: {e}") - # Annotation is saved, but memory update failed - log but don't fail the request - logger.warning(f"Memory annotation {annotation.id} saved but memory update failed") - - return AnnotationResponse.model_validate(annotation) - - except HTTPException: - raise - except Exception as e: - logger.error(f"Error creating memory annotation: {e}", exc_info=True) - raise HTTPException( - status_code=500, - detail=f"Failed to create memory annotation: {str(e)}", - ) - - -@router.post("/transcript", response_model=AnnotationResponse) -async def create_transcript_annotation( - annotation_data: TranscriptAnnotationCreate, - 
current_user: User = Depends(current_active_user), -): - """ - Create annotation for transcript segment edit. - - - Validates user owns conversation - - Creates annotation record (NOT applied to transcript yet) - - Annotation is marked as unprocessed (processed=False) - - Visual indication in UI (pending badge) - - Use unified apply endpoint to apply all annotations together - """ - try: - # Verify conversation ownership - conversation = await Conversation.find_one( - Conversation.conversation_id == annotation_data.conversation_id, - Conversation.user_id == current_user.user_id, - ) - if not conversation: - raise HTTPException(status_code=404, detail="Conversation not found") - - # Validate segment index - active_transcript = conversation.active_transcript - if not active_transcript or annotation_data.segment_index >= len( - active_transcript.segments - ): - raise HTTPException(status_code=400, detail="Invalid segment index") - - segment = active_transcript.segments[annotation_data.segment_index] - - # Create annotation (NOT applied yet) - annotation = Annotation( - annotation_type=AnnotationType.TRANSCRIPT, - user_id=current_user.user_id, - conversation_id=annotation_data.conversation_id, - segment_index=annotation_data.segment_index, - original_text=segment.text, # Use current segment text - corrected_text=annotation_data.corrected_text, - status=AnnotationStatus.PENDING, # Changed from ACCEPTED - processed=False, # Not applied yet - ) - await annotation.save() - logger.info( - f"Created transcript annotation {annotation.id} for conversation {annotation_data.conversation_id} segment {annotation_data.segment_index}" - ) - - # Do NOT modify transcript immediately - # Do NOT trigger memory reprocessing yet - # User must click "Apply Changes" button to apply all annotations together - - return AnnotationResponse.model_validate(annotation) - - except HTTPException: - raise - except Exception as e: - logger.error(f"Error creating transcript annotation: {e}", 
exc_info=True) - raise HTTPException( - status_code=500, - detail=f"Failed to create transcript annotation: {str(e)}", - ) - - -@router.get("/memory/{memory_id}", response_model=List[AnnotationResponse]) -async def get_memory_annotations( - memory_id: str, - current_user: User = Depends(current_active_user), -): - """Get all annotations for a memory.""" - try: - annotations = await Annotation.find( - Annotation.annotation_type == AnnotationType.MEMORY, - Annotation.memory_id == memory_id, - Annotation.user_id == current_user.user_id, - ).to_list() - - return [AnnotationResponse.model_validate(a) for a in annotations] - - except Exception as e: - logger.error(f"Error fetching memory annotations: {e}", exc_info=True) - raise HTTPException( - status_code=500, - detail=f"Failed to fetch memory annotations: {str(e)}", - ) - - -@router.get("/transcript/{conversation_id}", response_model=List[AnnotationResponse]) -async def get_transcript_annotations( - conversation_id: str, - current_user: User = Depends(current_active_user), -): - """Get all annotations for a conversation's transcript.""" - try: - annotations = await Annotation.find( - Annotation.annotation_type == AnnotationType.TRANSCRIPT, - Annotation.conversation_id == conversation_id, - Annotation.user_id == current_user.user_id, - ).to_list() - - return [AnnotationResponse.model_validate(a) for a in annotations] - - except Exception as e: - logger.error(f"Error fetching transcript annotations: {e}", exc_info=True) - raise HTTPException( - status_code=500, - detail=f"Failed to fetch transcript annotations: {str(e)}", - ) - - -@router.patch("/{annotation_id}/status") -async def update_annotation_status( - annotation_id: str, - status: AnnotationStatus, - current_user: User = Depends(current_active_user), -): - """ - Accept or reject AI-generated suggestions. - - Used for pending model suggestions in the UI. 
- """ - try: - annotation = await Annotation.find_one( - Annotation.id == annotation_id, - Annotation.user_id == current_user.user_id, - ) - if not annotation: - raise HTTPException(status_code=404, detail="Annotation not found") - - old_status = annotation.status - annotation.status = status - annotation.updated_at = datetime.now(timezone.utc) - - # If accepting a pending suggestion, apply the correction - if status == AnnotationStatus.ACCEPTED and old_status == AnnotationStatus.PENDING: - if annotation.is_memory_annotation(): - # Update memory - try: - memory_service = get_memory_service() - await memory_service.update_memory( - memory_id=annotation.memory_id, - content=annotation.corrected_text, - user_id=current_user.user_id, - ) - logger.info(f"Applied suggestion to memory {annotation.memory_id}") - except Exception as e: - logger.error(f"Error applying memory suggestion: {e}") - # Don't fail the status update if memory update fails - elif annotation.is_transcript_annotation(): - # Update transcript segment - try: - conversation = await Conversation.find_one( - Conversation.conversation_id == annotation.conversation_id, - Conversation.user_id == annotation.user_id, - ) - if conversation: - transcript = conversation.active_transcript - if transcript and annotation.segment_index < len(transcript.segments): - transcript.segments[annotation.segment_index].text = ( - annotation.corrected_text - ) - await conversation.save() - logger.info( - f"Applied suggestion to transcript segment {annotation.segment_index}" - ) - except Exception as e: - logger.error(f"Error applying transcript suggestion: {e}") - # Don't fail the status update if segment update fails - - await annotation.save() - logger.info(f"Updated annotation {annotation_id} status to {status}") - - return {"status": "updated", "annotation_id": annotation_id, "new_status": status} - - except HTTPException: - raise - except Exception as e: - logger.error(f"Error updating annotation status: {e}", 
exc_info=True) - raise HTTPException( - status_code=500, - detail=f"Failed to update annotation status: {str(e)}", - ) - - -# === Diarization Annotation Routes === - - -@router.post("/diarization", response_model=AnnotationResponse) -async def create_diarization_annotation( - annotation_data: DiarizationAnnotationCreate, - current_user: User = Depends(current_active_user), -): - """ - Create annotation for speaker identification correction. - - - Validates user owns conversation - - Creates annotation record (NOT applied to transcript yet) - - Annotation is marked as unprocessed (processed=False) - - Visual indication in UI (strikethrough + corrected name) - """ - try: - # Verify conversation ownership - conversation = await Conversation.find_one( - Conversation.conversation_id == annotation_data.conversation_id, - Conversation.user_id == current_user.user_id, - ) - if not conversation: - raise HTTPException(status_code=404, detail="Conversation not found") - - # Validate segment index - active_transcript = conversation.active_transcript - if not active_transcript or annotation_data.segment_index >= len( - active_transcript.segments - ): - raise HTTPException(status_code=400, detail="Invalid segment index") - - # Create annotation (NOT applied yet) - annotation = Annotation( - annotation_type=AnnotationType.DIARIZATION, - user_id=current_user.user_id, - conversation_id=annotation_data.conversation_id, - segment_index=annotation_data.segment_index, - original_speaker=annotation_data.original_speaker, - corrected_speaker=annotation_data.corrected_speaker, - segment_start_time=annotation_data.segment_start_time, - original_text="", # Not used for diarization - corrected_text="", # Not used for diarization - status=annotation_data.status, - processed=False, # Not applied or sent to training yet - ) - await annotation.save() - logger.info( - f"Created diarization annotation {annotation.id} for conversation {annotation_data.conversation_id} segment 
{annotation_data.segment_index}" - ) - - return AnnotationResponse.model_validate(annotation) - - except HTTPException: - raise - except Exception as e: - logger.error(f"Error creating diarization annotation: {e}", exc_info=True) - raise HTTPException( - status_code=500, - detail=f"Failed to create diarization annotation: {str(e)}", - ) - - -@router.get("/diarization/{conversation_id}", response_model=List[AnnotationResponse]) -async def get_diarization_annotations( - conversation_id: str, - current_user: User = Depends(current_active_user), -): - """Get all diarization annotations for a conversation.""" - try: - annotations = await Annotation.find( - Annotation.annotation_type == AnnotationType.DIARIZATION, - Annotation.conversation_id == conversation_id, - Annotation.user_id == current_user.user_id, - ).to_list() - - return [AnnotationResponse.model_validate(a) for a in annotations] - - except Exception as e: - logger.error(f"Error fetching diarization annotations: {e}", exc_info=True) - raise HTTPException( - status_code=500, - detail=f"Failed to fetch diarization annotations: {str(e)}", - ) - - -@router.post("/diarization/{conversation_id}/apply") -async def apply_diarization_annotations( - conversation_id: str, - current_user: User = Depends(current_active_user), +from advanced_omi_backend.auth import current_active_user +from advanced_omi_backend.models.user import User +from advanced_omi_backend.workers.memory_jobs import enqueue_memory_processing +from advanced_omi_backend.models.job import JobPriority + +router = APIRouter() + +class AnnotationCreate(BaseModel): + conversation_id: str + segment_index: int + original_text: str + corrected_text: str + status: Optional[TranscriptAnnotation.AnnotationStatus] = TranscriptAnnotation.AnnotationStatus.ACCEPTED + +class AnnotationResponse(BaseModel): + id: str + conversation_id: str + segment_index: int + original_text: str + corrected_text: str + status: str + created_at: datetime + +@router.post("/", 
response_model=AnnotationResponse) +async def create_annotation( + annotation: AnnotationCreate, + current_user: User = Depends(current_active_user) ): - """ - Apply pending diarization annotations to create new transcript version. - - - Finds all unprocessed diarization annotations for conversation - - Creates NEW transcript version with corrected speaker labels - - Marks annotations as processed (processed=True, processed_by="apply") - - Chains memory reprocessing since speaker changes affect meaning - - Returns job status with new version_id - """ - try: - # Verify conversation ownership - conversation = await Conversation.find_one( - Conversation.conversation_id == conversation_id, - Conversation.user_id == current_user.user_id, - ) - if not conversation: - raise HTTPException(status_code=404, detail="Conversation not found") - - # Get unprocessed diarization annotations - annotations = await Annotation.find( - Annotation.annotation_type == AnnotationType.DIARIZATION, - Annotation.conversation_id == conversation_id, - Annotation.user_id == current_user.user_id, - Annotation.processed == False, # Only unprocessed - ).to_list() - - if not annotations: - return JSONResponse( - content={"message": "No pending annotations to apply", "applied_count": 0} - ) - - # Get active transcript version - active_transcript = conversation.active_transcript - if not active_transcript: - raise HTTPException(status_code=404, detail="No active transcript found") - - # Create NEW transcript version with corrected speakers - import uuid - - new_version_id = str(uuid.uuid4()) - - # Copy segments and apply corrections - corrected_segments = [] - for segment_idx, segment in enumerate(active_transcript.segments): - # Find annotation for this segment index - annotation_for_segment = next( - (a for a in annotations if a.segment_index == segment_idx), None + # Verify conversation exists and belongs to user + conversation = await Conversation.find_one({ + "conversation_id": 
annotation.conversation_id, + "user_id": str(current_user.id) + }) + + if not conversation: + raise HTTPException(status_code=404, detail="Conversation not found") + + # Create annotation + new_annotation = TranscriptAnnotation( + conversation_id=annotation.conversation_id, + segment_index=annotation.segment_index, + original_text=annotation.original_text, + corrected_text=annotation.corrected_text, + user_id=str(current_user.id), + status=annotation.status, + source=TranscriptAnnotation.AnnotationSource.USER + ) + + await new_annotation.insert() + + # Update the actual transcript in the conversation + # We need to find the active transcript version and update the segment + if conversation.active_transcript: + version = conversation.active_transcript + if 0 <= annotation.segment_index < len(version.segments): + version.segments[annotation.segment_index].text = annotation.corrected_text + + # Save the conversation with the updated segment + # We need to update the specific version in the list + for i, v in enumerate(conversation.transcript_versions): + if v.version_id == version.version_id: + conversation.transcript_versions[i] = version + break + + await conversation.save() + + # Trigger memory reprocessing + enqueue_memory_processing( + client_id=conversation.client_id, + user_id=str(current_user.id), + user_email=current_user.email, + conversation_id=conversation.conversation_id, + priority=JobPriority.NORMAL ) - - if annotation_for_segment: - # Apply correction - corrected_segment = segment.model_copy() - corrected_segment.speaker = annotation_for_segment.corrected_speaker - corrected_segments.append(corrected_segment) - else: - # No correction, keep original - corrected_segments.append(segment.model_copy()) - - # Add new version - conversation.add_transcript_version( - version_id=new_version_id, - transcript=active_transcript.transcript, # Same transcript text - words=active_transcript.words, # Same word timings - segments=corrected_segments, # Corrected 
speaker labels - provider=active_transcript.provider, - model=active_transcript.model, - processing_time_seconds=None, - metadata={ - "reprocessing_type": "diarization_annotations", - "source_version_id": active_transcript.version_id, - "trigger": "manual_annotation_apply", - "applied_annotation_count": len(annotations), - }, - set_as_active=True, - ) - - await conversation.save() - logger.info( - f"Created new transcript version {new_version_id} with {len(annotations)} diarization corrections" - ) - - # Mark annotations as processed - for annotation in annotations: - annotation.processed = True - annotation.processed_at = datetime.now(timezone.utc) - annotation.processed_by = "apply" - await annotation.save() - - # Chain memory reprocessing - from advanced_omi_backend.models.job import JobPriority - from advanced_omi_backend.workers.memory_jobs import enqueue_memory_processing - - enqueue_memory_processing( - conversation_id=conversation_id, - priority=JobPriority.NORMAL, - ) - - return JSONResponse( - content={ - "message": "Diarization annotations applied", - "version_id": new_version_id, - "applied_count": len(annotations), - "status": "success", - } - ) - - except HTTPException: - raise - except Exception as e: - logger.error(f"Error applying diarization annotations: {e}", exc_info=True) - raise HTTPException( - status_code=500, - detail=f"Failed to apply diarization annotations: {str(e)}", - ) - - -@router.post("/{conversation_id}/apply") -async def apply_all_annotations( + else: + raise HTTPException(status_code=400, detail="Segment index out of range") + else: + raise HTTPException(status_code=400, detail="No active transcript found") + + return AnnotationResponse( + id=str(new_annotation.id), + conversation_id=new_annotation.conversation_id, + segment_index=new_annotation.segment_index, + original_text=new_annotation.original_text, + corrected_text=new_annotation.corrected_text, + status=new_annotation.status, + created_at=new_annotation.created_at + ) + 
+@router.get("/{conversation_id}", response_model=List[AnnotationResponse]) +async def get_annotations( conversation_id: str, - current_user: User = Depends(current_active_user), + current_user: User = Depends(current_active_user) ): - """ - Apply all pending annotations (diarization + transcript) to create new version. - - - Finds all unprocessed annotations (both DIARIZATION and TRANSCRIPT types) - - Creates ONE new transcript version with all changes applied - - Marks all annotations as processed - - Triggers memory reprocessing once - """ - try: - # Verify conversation ownership - conversation = await Conversation.find_one( - Conversation.conversation_id == conversation_id, - Conversation.user_id == current_user.user_id, - ) - if not conversation: - raise HTTPException(status_code=404, detail="Conversation not found") - - # Get ALL unprocessed annotations (both types) - annotations = await Annotation.find( - Annotation.conversation_id == conversation_id, - Annotation.user_id == current_user.user_id, - Annotation.processed == False, - ).to_list() - - if not annotations: - return JSONResponse( - content={ - "message": "No pending annotations to apply", - "diarization_count": 0, - "transcript_count": 0, - } - ) - - # Separate by type - diarization_annotations = [ - a for a in annotations if a.annotation_type == AnnotationType.DIARIZATION - ] - transcript_annotations = [ - a for a in annotations if a.annotation_type == AnnotationType.TRANSCRIPT - ] - - # Get active transcript - active_transcript = conversation.active_transcript - if not active_transcript: - raise HTTPException(status_code=404, detail="No active transcript found") - - # Create new version with ALL corrections applied - import uuid - - new_version_id = str(uuid.uuid4()) - corrected_segments = [] - - for segment_idx, segment in enumerate(active_transcript.segments): - corrected_segment = segment.model_copy() - - # Apply diarization correction (if exists) - diar_annotation = next( - (a for a in 
diarization_annotations if a.segment_index == segment_idx), None - ) - if diar_annotation: - corrected_segment.speaker = diar_annotation.corrected_speaker - - # Apply transcript correction (if exists) - transcript_annotation = next( - (a for a in transcript_annotations if a.segment_index == segment_idx), None - ) - if transcript_annotation: - corrected_segment.text = transcript_annotation.corrected_text - - corrected_segments.append(corrected_segment) - - # Add new version - conversation.add_transcript_version( - version_id=new_version_id, - transcript=active_transcript.transcript, - words=active_transcript.words, # Preserved (may be misaligned for text edits) - segments=corrected_segments, - provider=active_transcript.provider, - model=active_transcript.model, - metadata={ - "reprocessing_type": "unified_annotations", - "source_version_id": active_transcript.version_id, - "trigger": "manual_annotation_apply", - "diarization_count": len(diarization_annotations), - "transcript_count": len(transcript_annotations), - }, - set_as_active=True, - ) - - await conversation.save() - logger.info( - f"Applied {len(annotations)} annotations (diarization: {len(diarization_annotations)}, transcript: {len(transcript_annotations)})" - ) - - # Mark all annotations as processed - for annotation in annotations: - annotation.processed = True - annotation.processed_at = datetime.now(timezone.utc) - annotation.processed_by = "apply" - annotation.status = AnnotationStatus.ACCEPTED - await annotation.save() - - # Trigger memory reprocessing (once for all changes) - from advanced_omi_backend.models.job import JobPriority - from advanced_omi_backend.workers.memory_jobs import enqueue_memory_processing - - enqueue_memory_processing( - conversation_id=conversation_id, - priority=JobPriority.NORMAL, - ) - - return JSONResponse( - content={ - "message": f"Applied {len(diarization_annotations)} diarization and {len(transcript_annotations)} transcript annotations", - "version_id": new_version_id, 
- "diarization_count": len(diarization_annotations), - "transcript_count": len(transcript_annotations), - "status": "success", - } - ) - - except HTTPException: - raise - except Exception as e: - logger.error(f"Error applying annotations: {e}", exc_info=True) - raise HTTPException( - status_code=500, - detail=f"Failed to apply annotations: {str(e)}", - ) + annotations = await TranscriptAnnotation.find({ + "conversation_id": conversation_id, + "user_id": str(current_user.id) + }).to_list() + + return [ + AnnotationResponse( + id=str(a.id), + conversation_id=a.conversation_id, + segment_index=a.segment_index, + original_text=a.original_text, + corrected_text=a.corrected_text, + status=a.status, + created_at=a.created_at + ) + for a in annotations + ] diff --git a/backends/advanced/src/advanced_omi_backend/services/memory/providers/vector_stores.py b/backends/advanced/src/advanced_omi_backend/services/memory/providers/vector_stores.py index 9fed0126..13bb781c 100644 --- a/backends/advanced/src/advanced_omi_backend/services/memory/providers/vector_stores.py +++ b/backends/advanced/src/advanced_omi_backend/services/memory/providers/vector_stores.py @@ -123,28 +123,26 @@ async def add_memories(self, memories: List[MemoryEntry]) -> List[str]: points = [] for memory in memories: if memory.embedding: - current_time = str(int(time.time())) point = PointStruct( id=memory.id, vector=memory.embedding, payload={ "content": memory.content, "metadata": memory.metadata, - "created_at": memory.created_at or current_time, - "updated_at": memory.updated_at or current_time + "created_at": memory.created_at or str(int(time.time())) } ) points.append(point) - + if points: await self.client.upsert( collection_name=self.collection_name, points=points ) return [str(point.id) for point in points] - + return [] - + except Exception as e: memory_logger.error(f"Qdrant add memories failed: {e}") return [] @@ -177,23 +175,24 @@ async def search_memories(self, query_embedding: List[float], 
user_id: str, limi "query_filter": search_filter, "limit": limit } - + if score_threshold > 0.0: search_params["score_threshold"] = score_threshold memory_logger.debug(f"Using similarity threshold: {score_threshold}") - + + # Use query_points instead of search (AsyncQdrantClient v1.10+ compat) response = await self.client.query_points(**search_params) - + results = response.points + memories = [] - for result in response.points: + for result in results: memory = MemoryEntry( id=str(result.id), content=result.payload.get("content", ""), metadata=result.payload.get("metadata", {}), # Qdrant returns similarity scores directly (higher = more similar) score=result.score if result.score is not None else None, - created_at=result.payload.get("created_at"), - updated_at=result.payload.get("updated_at") + created_at=result.payload.get("created_at") ) memories.append(memory) # Log similarity scores for debugging @@ -233,11 +232,10 @@ async def get_memories(self, user_id: str, limit: int) -> List[MemoryEntry]: id=str(point.id), content=point.payload.get("content", ""), metadata=point.payload.get("metadata", {}), - created_at=point.payload.get("created_at"), - updated_at=point.payload.get("updated_at") + created_at=point.payload.get("created_at") ) memories.append(memory) - + return memories except Exception as e: @@ -360,91 +358,29 @@ async def update_memory( async def count_memories(self, user_id: str) -> int: """Count total number of memories for a user in Qdrant using native count API.""" try: - + search_filter = Filter( must=[ FieldCondition( - key="metadata.user_id", + key="metadata.user_id", match=MatchValue(value=user_id) ) ] ) - + # Use Qdrant's native count API (documented in qdrant/qdrant/docs) # Count operation: CountPoints -> CountResponse with count result result = await self.client.count( collection_name=self.collection_name, count_filter=search_filter ) - + return result.count - + except Exception as e: memory_logger.error(f"Qdrant count memories failed: {e}") 
return 0 - async def get_memory(self, memory_id: str, user_id: Optional[str] = None) -> Optional[MemoryEntry]: - """Get a specific memory by ID from Qdrant. - - Args: - memory_id: Unique identifier of the memory to retrieve - user_id: Optional user ID for validation (not used in Qdrant filtering) - - Returns: - MemoryEntry object if found, None otherwise - """ - try: - # Convert memory_id to proper format for Qdrant - import uuid - try: - # Try to parse as UUID first - uuid.UUID(memory_id) - point_id = memory_id - except ValueError: - # If not a UUID, try as integer - try: - point_id = int(memory_id) - except ValueError: - # If neither UUID nor integer, use it as-is - point_id = memory_id - - # Retrieve the point by ID - points = await self.client.retrieve( - collection_name=self.collection_name, - ids=[point_id], - with_payload=True, - with_vectors=False - ) - - if not points: - memory_logger.debug(f"Memory not found: {memory_id}") - return None - - point = points[0] - - # If user_id is provided, validate ownership - if user_id: - point_user_id = point.payload.get("metadata", {}).get("user_id") - if point_user_id != user_id: - memory_logger.warning(f"Memory {memory_id} does not belong to user {user_id}") - return None - - # Convert to MemoryEntry - memory = MemoryEntry( - id=str(point.id), - content=point.payload.get("content", ""), - metadata=point.payload.get("metadata", {}), - created_at=point.payload.get("created_at"), - updated_at=point.payload.get("updated_at") - ) - - memory_logger.debug(f"Retrieved memory {memory_id}") - return memory - - except Exception as e: - memory_logger.error(f"Qdrant get memory failed for {memory_id}: {e}") - return None - diff --git a/backends/advanced/src/advanced_omi_backend/workers/annotation_jobs.py b/backends/advanced/src/advanced_omi_backend/workers/annotation_jobs.py index 3681ab5f..42140d3d 100644 --- a/backends/advanced/src/advanced_omi_backend/workers/annotation_jobs.py +++ 
b/backends/advanced/src/advanced_omi_backend/workers/annotation_jobs.py @@ -1,249 +1,98 @@ -""" -Background jobs for annotation-based AI suggestions. - -These jobs run periodically via the cron scheduler to: -1. Surface potential errors in transcripts and memories for user review -2. Fine-tune error detection models using accepted/rejected annotations - -TODO: Implement actual LLM-based error detection and model training logic. -""" - import logging -from datetime import datetime, timedelta, timezone +import random +from datetime import datetime, timedelta from typing import List -from advanced_omi_backend.models.annotation import ( - Annotation, - AnnotationSource, - AnnotationStatus, - AnnotationType, -) from advanced_omi_backend.models.conversation import Conversation -from advanced_omi_backend.models.user import User +from advanced_omi_backend.models.annotation import TranscriptAnnotation +from advanced_omi_backend.database import get_db logger = logging.getLogger(__name__) - async def surface_error_suggestions(): """ - Generate AI suggestions for potential transcript/memory errors. - Runs daily, creates PENDING annotations for user review. - - This is a PLACEHOLDER implementation. To fully implement: - 1. Query recent transcripts and memories (last N days) - 2. Use LLM to analyze content for potential errors: - - Hallucinations (made-up facts) - - Misheard words (audio transcription errors) - - Grammar/spelling issues - - Inconsistencies with other memories - 3. For each potential error: - - Create PENDING annotation with MODEL_SUGGESTION source - - Store original_text and suggested corrected_text - 4. Users can review suggestions in UI (accept/reject) - 5. Accepted suggestions improve future model accuracy - - TODO: Implement LLM-based error detection logic. + Cron job to surface potential errors in transcripts. + Mocks the behavior of an ML model identifying low-confidence segments. 
""" - logger.info("๐Ÿ“ Checking for annotation suggestions (placeholder)...") - - try: - # Get all users - users = await User.find_all().to_list() - logger.info(f" Found {len(users)} users to analyze") - - for user in users: - # TODO: Query recent conversations for this user (last 7 days) - # recent_conversations = await Conversation.find( - # Conversation.user_id == str(user.id), - # Conversation.created_at >= datetime.now(timezone.utc) - timedelta(days=7) - # ).to_list() - - # TODO: For each conversation, analyze transcripts - # for conversation in recent_conversations: - # active_transcript = conversation.get_active_transcript() - # if not active_transcript: - # continue - # - # # TODO: Use LLM to identify potential errors - # # suggestions = await llm_provider.analyze_transcript_for_errors( - # # segments=active_transcript.segments, - # # context=conversation.summary - # # ) - # - # # TODO: Create PENDING annotations for each suggestion - # # for suggestion in suggestions: - # # annotation = Annotation( - # # annotation_type=AnnotationType.TRANSCRIPT, - # # user_id=str(user.id), - # # conversation_id=conversation.conversation_id, - # # segment_index=suggestion.segment_index, - # # original_text=suggestion.original_text, - # # corrected_text=suggestion.suggested_text, - # # source=AnnotationSource.MODEL_SUGGESTION, - # # status=AnnotationStatus.PENDING - # # ) - # # await annotation.save() - - # TODO: Query recent memories for this user - # recent_memories = await memory_service.get_recent_memories( - # user_id=str(user.id), - # days=7 - # ) - - # TODO: Use LLM to identify potential errors in memories - # for memory in recent_memories: - # # TODO: Analyze memory content for hallucinations/errors - # # suggestions = await llm_provider.analyze_memory_for_errors( - # # content=memory.content, - # # metadata=memory.metadata - # # ) - # - # # TODO: Create PENDING annotations - # # ... 
- - # Placeholder logging - logger.debug(f" Analyzed user {user.id} (placeholder)") - - logger.info("โœ… Suggestion check complete (placeholder implementation)") - logger.info( - " โ„น๏ธ TODO: Implement LLM-based error detection to create actual suggestions" - ) - - except Exception as e: - logger.error(f"โŒ Error in surface_error_suggestions: {e}", exc_info=True) - raise - + logger.info("Starting surface_error_suggestions job...") + + # Get conversations from the last 24 hours + since = datetime.utcnow() - timedelta(days=1) + conversations = await Conversation.find( + {"created_at": {"$gte": since}} + ).to_list() + + logger.info(f"Found {len(conversations)} recent conversations to scan.") + + count = 0 + for conv in conversations: + if not conv.active_transcript or not conv.segments: + continue + + # Mock logic: Randomly pick a segment to "flag" as potential error + # In reality, this would use a "speech-understanding" model to find inconsistencies + if random.random() < 0.3: # 30% chance per conversation + segment_idx = random.randint(0, len(conv.segments) - 1) + segment = conv.segments[segment_idx] + + # Check if annotation already exists + existing = await TranscriptAnnotation.find_one({ + "conversation_id": conv.conversation_id, + "segment_index": segment_idx + }) + + if not existing: + # Create a suggestion + suggestion = TranscriptAnnotation( + conversation_id=conv.conversation_id, + segment_index=segment_idx, + original_text=segment.text, + corrected_text=segment.text + " [SUGGESTED CORRECTION]", # Placeholder + user_id=conv.user_id, + status=TranscriptAnnotation.AnnotationStatus.PENDING, + source=TranscriptAnnotation.AnnotationSource.MODEL_SUGGESTION + ) + await suggestion.insert() + count += 1 + if count >= 6: # Surface 5-6 places as requested + break + if count >= 6: + break + + logger.info(f"Surfaced {count} new suggestions.") async def finetune_hallucination_model(): """ - Fine-tune error detection model using accepted/rejected annotations. 
- Runs weekly, improves suggestion accuracy over time. - - This is a PLACEHOLDER implementation. To fully implement: - 1. Fetch all accepted annotations (ground truth corrections) - - These show real errors that users confirmed - 2. Fetch all rejected annotations (false positives) - - These show suggestions users disagreed with - 3. Build training dataset: - - Positive examples: accepted annotations (real errors) - - Negative examples: rejected annotations (false alarms) - 4. Fine-tune LLM or update prompt engineering: - - Use accepted examples as few-shot learning - - Adjust model to reduce false positives - 5. Log metrics: - - Acceptance rate, rejection rate - - Most common error types - - Model accuracy improvement - - TODO: Implement model training logic. - """ - logger.info("๐ŸŽ“ Checking for model training opportunities (placeholder)...") - - try: - # Fetch annotation statistics - total_annotations = await Annotation.find().count() - accepted_count = await Annotation.find( - Annotation.status == AnnotationStatus.ACCEPTED, - Annotation.source == AnnotationSource.MODEL_SUGGESTION, - ).count() - rejected_count = await Annotation.find( - Annotation.status == AnnotationStatus.REJECTED, - Annotation.source == AnnotationSource.MODEL_SUGGESTION, - ).count() - - logger.info(f" Total annotations: {total_annotations}") - logger.info(f" Accepted suggestions: {accepted_count}") - logger.info(f" Rejected suggestions: {rejected_count}") - - if accepted_count + rejected_count == 0: - logger.info(" โ„น๏ธ No user feedback yet, skipping training") - return - - # TODO: Fetch accepted annotations (ground truth) - # accepted_annotations = await Annotation.find( - # Annotation.status == AnnotationStatus.ACCEPTED, - # Annotation.source == AnnotationSource.MODEL_SUGGESTION - # ).to_list() - - # TODO: Fetch rejected annotations (false positives) - # rejected_annotations = await Annotation.find( - # Annotation.status == AnnotationStatus.REJECTED, - # Annotation.source == 
AnnotationSource.MODEL_SUGGESTION - # ).to_list() - - # TODO: Build training dataset - # training_data = [] - # for annotation in accepted_annotations: - # training_data.append({ - # "input": annotation.original_text, - # "output": annotation.corrected_text, - # "label": "error" - # }) - # - # for annotation in rejected_annotations: - # training_data.append({ - # "input": annotation.original_text, - # "output": annotation.original_text, # No change needed - # "label": "correct" - # }) - - # TODO: Fine-tune model or update prompt examples - # if len(training_data) >= MIN_TRAINING_SAMPLES: - # await llm_provider.fine_tune_error_detection( - # training_data=training_data, - # validation_split=0.2 - # ) - # logger.info("โœ… Model fine-tuning complete") - # else: - # logger.info(f" โ„น๏ธ Not enough samples for training (need {MIN_TRAINING_SAMPLES})") - - # Calculate acceptance rate - if accepted_count + rejected_count > 0: - acceptance_rate = ( - accepted_count / (accepted_count + rejected_count) - ) * 100 - logger.info(f" Suggestion acceptance rate: {acceptance_rate:.1f}%") - - logger.info("โœ… Training check complete (placeholder implementation)") - logger.info( - " โ„น๏ธ TODO: Implement model fine-tuning using user feedback data" - ) - - except Exception as e: - logger.error(f"โŒ Error in finetune_hallucination_model: {e}", exc_info=True) - raise - - -# Additional helper functions for future implementation - -async def analyze_common_error_patterns() -> List[dict]: - """ - Analyze accepted annotations to identify common error patterns. - Returns list of patterns for prompt engineering or model training. - - TODO: Implement pattern analysis. - """ - # TODO: Group annotations by error type - # TODO: Find frequent patterns (e.g., "their" โ†’ "there") - # TODO: Return structured patterns for model improvement - return [] - - -async def calculate_suggestion_metrics() -> dict: - """ - Calculate metrics about suggestion quality and user engagement. 
- - Returns: - dict: Metrics including acceptance rate, response time, etc. - - TODO: Implement metrics calculation. + Cron job to finetune a LORA model on corrections. """ - # TODO: Calculate acceptance/rejection rates - # TODO: Measure time to user response - # TODO: Identify high-confidence vs low-confidence suggestions - # TODO: Track improvement over time - return { - "total_suggestions": 0, - "acceptance_rate": 0.0, - "avg_response_time_hours": 0.0, - } + logger.info("Starting finetune_hallucination_model job...") + + # Gather accepted corrections + corrections = await TranscriptAnnotation.find({ + "status": TranscriptAnnotation.AnnotationStatus.ACCEPTED.value + }).to_list() + + if not corrections: + logger.info("No corrections found for training.") + return + + logger.info(f"Found {len(corrections)} corrections for training.") + + # Prepare training data (Mock) + training_pairs = [] + for c in corrections: + training_pairs.append({ + "input": c.original_text, + "output": c.corrected_text + }) + + # Mock Training Process + logger.info("Initiating LORA fine-tuning process...") + # In a real scenario, this would call a training service or script + # e.g., train_lora(model="speech-understanding", data=training_pairs) + + # Simulate time taken + import time + time.sleep(2) + + logger.info("Fine-tuning job completed successfully (Mock).") diff --git a/backends/advanced/src/advanced_omi_backend/workers/memory_jobs.py b/backends/advanced/src/advanced_omi_backend/workers/memory_jobs.py index 9c227bd9..1fabdc73 100644 --- a/backends/advanced/src/advanced_omi_backend/workers/memory_jobs.py +++ b/backends/advanced/src/advanced_omi_backend/workers/memory_jobs.py @@ -11,10 +11,13 @@ from advanced_omi_backend.controllers.queue_controller import ( JOB_RESULT_TTL, + default_queue, memory_queue, ) -from advanced_omi_backend.models.job import JobPriority, async_job +from advanced_omi_backend.models.job import BaseRQJob, JobPriority, async_job +from 
advanced_omi_backend.services.memory.base import MemoryEntry from advanced_omi_backend.services.plugin_service import ensure_plugin_router +from advanced_omi_backend.workers.conversation_jobs import generate_title_summary_job logger = logging.getLogger(__name__) @@ -330,4 +333,23 @@ def enqueue_memory_processing( ) logger.info(f"๐Ÿ“ฅ RQ: Enqueued memory job {job.id} for conversation {conversation_id}") + + # Also enqueue title/summary generation to ensure summaries reflect any transcript changes + try: + # Use a timestamp in job_id to avoid conflicts if re-run frequently + summary_job_id = f"title_summary_{conversation_id[:8]}_{int(time.time())}" + + default_queue.enqueue( + generate_title_summary_job, + conversation_id, + job_timeout=300, + result_ttl=JOB_RESULT_TTL, + job_id=summary_job_id, + description=f"Generate title and summary for conversation {conversation_id[:8]}", + ) + logger.info(f"๐Ÿ“ฅ RQ: Enqueued summary job {summary_job_id} for conversation {conversation_id}") + except Exception as e: + logger.error(f"Failed to enqueue summary job: {e}") + raise e + return job diff --git a/backends/advanced/tests/integration/test_annotation_flow.py b/backends/advanced/tests/integration/test_annotation_flow.py new file mode 100644 index 00000000..b30ecdd7 --- /dev/null +++ b/backends/advanced/tests/integration/test_annotation_flow.py @@ -0,0 +1,108 @@ +import pytest +from httpx import AsyncClient, ASGITransport +from unittest.mock import patch, MagicMock, AsyncMock +from datetime import datetime + +from advanced_omi_backend.main import create_app +from advanced_omi_backend.models.user import User +from advanced_omi_backend.auth import current_active_user + +# Mock data +MOCK_USER_ID = "test-user-id" +MOCK_CONVERSATION_ID = "test-conversation-id" + +@pytest.fixture +def mock_user(): + user = MagicMock(spec=User) + user.id = MOCK_USER_ID + user.email = "test@example.com" + return user + +@pytest.fixture +def app(mock_user): + application = create_app() + # Override 
authentication dependency + application.dependency_overrides[current_active_user] = lambda: mock_user + return application + +@pytest.fixture +async def client(app): + async with AsyncClient(app=app, base_url="http://test") as c: + yield c + +@pytest.mark.asyncio +async def test_annotation_flow(app, mock_user): + # Mock DB interactions + with patch("advanced_omi_backend.routers.modules.annotation_routes.Conversation") as MockConversation, \ + patch("advanced_omi_backend.routers.modules.annotation_routes.TranscriptAnnotation") as MockAnnotation, \ + patch("advanced_omi_backend.routers.modules.annotation_routes.enqueue_memory_processing") as mock_enqueue: + + # Setup mock conversation + mock_conv = MagicMock() + mock_conv.conversation_id = MOCK_CONVERSATION_ID + mock_conv.user_id = MOCK_USER_ID + mock_conv.client_id = "test-client" + + # Setup active transcript + mock_version = MagicMock() + mock_version.version_id = "v1" + mock_version.segments = [MagicMock(text="Original text")] + mock_conv.active_transcript = mock_version + mock_conv.transcript_versions = [mock_version] + + # Make save awaitable + mock_conv.save = AsyncMock() + + # Configure find_one to return our mock conversation (awaitable) + MockConversation.find_one.return_value = AsyncMock(return_value=mock_conv)() # Calling AsyncMock returns an awaitable coroutine + + # Mock Annotation insert (awaitable) + mock_annotation_instance = MagicMock() + mock_annotation_instance.insert = AsyncMock() + mock_annotation_instance.id = "test-annotation-id" + mock_annotation_instance.conversation_id = MOCK_CONVERSATION_ID + mock_annotation_instance.segment_index = 0 + mock_annotation_instance.original_text = "Original text" + mock_annotation_instance.corrected_text = "Corrected text" + mock_annotation_instance.status = "accepted" + mock_annotation_instance.created_at = datetime.now() + + MockAnnotation.return_value = mock_annotation_instance + + # Define the annotation payload + annotation_data = { + "conversation_id": 
MOCK_CONVERSATION_ID, + "segment_index": 0, + "original_text": "Original text", + "corrected_text": "Corrected text", + "status": "accepted" + } + + # Make the API call using AsyncClient with ASGITransport + transport = ASGITransport(app=app) + async with AsyncClient(transport=transport, base_url="http://test") as client: + response = await client.post("/api/annotations/", json=annotation_data) + + # Assertions + assert response.status_code == 200 + data = response.json() + assert data["conversation_id"] == MOCK_CONVERSATION_ID + assert data["corrected_text"] == "Corrected text" + + # Verify DB interaction + # 1. Verify conversation lookup was called + MockConversation.find_one.assert_called() + + # 2. Verify annotation creation (MockAnnotation constructor called) + MockAnnotation.assert_called() + mock_annotation_instance.insert.assert_called_once() + + # 3. Verify transcript update + assert mock_version.segments[0].text == "Corrected text" + mock_conv.save.assert_called_once() + + # 4. 
Verify memory job enqueuing + mock_enqueue.assert_called_once() + call_kwargs = mock_enqueue.call_args.kwargs + assert call_kwargs['conversation_id'] == MOCK_CONVERSATION_ID + assert call_kwargs['user_id'] == MOCK_USER_ID \ No newline at end of file diff --git a/backends/advanced/tests/test_annotation_models.py b/backends/advanced/tests/test_annotation_models.py new file mode 100644 index 00000000..fa332f4d --- /dev/null +++ b/backends/advanced/tests/test_annotation_models.py @@ -0,0 +1,90 @@ +import pytest +from datetime import datetime +from advanced_omi_backend.models.annotation import TranscriptAnnotation +from beanie import init_beanie +from mongomock_motor import AsyncMongoMockClient +import uuid + +async def initialize_beanie(): + client = AsyncMongoMockClient() + await init_beanie(database=client.db_name, document_models=[TranscriptAnnotation]) + +class TestAnnotationModel: + """Test TranscriptAnnotation Pydantic/Beanie model.""" + + @pytest.mark.asyncio + async def test_create_annotation_defaults(self): + """Test creating an annotation with default values.""" + await initialize_beanie() + + annotation = TranscriptAnnotation( + conversation_id="conv-123", + segment_index=5, + original_text="Hello world", + corrected_text="Hello, world!", + user_id="user-456" + ) + + # Check required fields + assert annotation.conversation_id == "conv-123" + assert annotation.segment_index == 5 + assert annotation.original_text == "Hello world" + assert annotation.corrected_text == "Hello, world!" 
+ assert annotation.user_id == "user-456" + + # Check defaults + assert isinstance(annotation.id, str) + assert len(annotation.id) > 0 + assert annotation.status == TranscriptAnnotation.AnnotationStatus.ACCEPTED + assert annotation.source == TranscriptAnnotation.AnnotationSource.USER + assert isinstance(annotation.created_at, datetime) + assert isinstance(annotation.updated_at, datetime) + + @pytest.mark.asyncio + async def test_annotation_status_enum(self): + """Test that status enum works as expected.""" + await initialize_beanie() + + # Test valid statuses + for status in ["pending", "accepted", "rejected"]: + annotation = TranscriptAnnotation( + conversation_id="c", segment_index=0, original_text="o", corrected_text="c", user_id="u", + status=status + ) + assert annotation.status == status + + # Test validation error (Pydantic validates enums) + with pytest.raises(ValueError): + TranscriptAnnotation( + conversation_id="c", segment_index=0, original_text="o", corrected_text="c", user_id="u", + status="invalid_status" + ) + + @pytest.mark.asyncio + async def test_annotation_source_enum(self): + """Test that source enum works as expected.""" + await initialize_beanie() + + # Test valid sources + for source in ["user", "model_suggestion"]: + annotation = TranscriptAnnotation( + conversation_id="c", segment_index=0, original_text="o", corrected_text="c", user_id="u", + source=source + ) + assert annotation.source == source + + @pytest.mark.asyncio + async def test_custom_id(self): + """Test that ID can be overridden.""" + await initialize_beanie() + + custom_id = str(uuid.uuid4()) + annotation = TranscriptAnnotation( + id=custom_id, + conversation_id="c", + segment_index=0, + original_text="o", + corrected_text="c", + user_id="u" + ) + assert annotation.id == custom_id diff --git a/backends/advanced/webui/src/pages/Conversations.tsx b/backends/advanced/webui/src/pages/Conversations.tsx index d8861859..08bbc1b2 100644 --- 
a/backends/advanced/webui/src/pages/Conversations.tsx +++ b/backends/advanced/webui/src/pages/Conversations.tsx @@ -1,13 +1,12 @@ -import { useState, useEffect, useRef, useCallback, useMemo } from 'react' -import { MessageSquare, RefreshCw, Calendar, User, Play, Pause, MoreVertical, RotateCcw, Zap, ChevronDown, ChevronUp, Trash2, Save, X, Check } from 'lucide-react' -import { conversationsApi, annotationsApi, speakerApi, BACKEND_URL } from '../services/api' +import { useState, useEffect, useRef } from 'react' +import { MessageSquare, RefreshCw, Calendar, User, Play, Pause, MoreVertical, RotateCcw, Zap, ChevronDown, ChevronUp, Trash2, Edit2, Check, X as XIcon, Loader2 } from 'lucide-react' +import { conversationsApi, annotationsApi, queueApi, BACKEND_URL } from '../services/api' import ConversationVersionHeader from '../components/ConversationVersionHeader' import { getStorageKey } from '../utils/storage' -import { WaveformDisplay } from '../components/audio/WaveformDisplay' -import SpeakerNameDropdown from '../components/SpeakerNameDropdown' interface Conversation { - conversation_id: string + conversation_id?: string + audio_uuid: string title?: string summary?: string detailed_summary?: string @@ -15,8 +14,8 @@ interface Conversation { client_id: string segment_count?: number // From list endpoint memory_count?: number // From list endpoint - audio_chunks_count?: number // Number of MongoDB audio chunks - audio_total_duration?: number // Total duration in seconds + audio_path?: string + cropped_audio_path?: string duration_seconds?: number has_memory?: boolean transcript?: string // From detail endpoint @@ -27,12 +26,19 @@ interface Conversation { end: number confidence?: number }> // From detail endpoint (loaded on expand) + annotations?: Array<{ + id: string + conversation_id: string + segment_index: number + original_text: string + corrected_text: string + status: 'pending' | 'accepted' | 'rejected' + created_at: string + }> active_transcript_version?: string 
active_memory_version?: string transcript_version_count?: number memory_version_count?: number - active_transcript_version_number?: number - active_memory_version_number?: number deleted?: boolean deletion_reason?: string deleted_at?: string @@ -64,112 +70,116 @@ export default function Conversations() { const [expandedDetailedSummaries, setExpandedDetailedSummaries] = useState>(new Set()) // Audio playback state const [playingSegment, setPlayingSegment] = useState(null) // Format: "audioUuid-segmentIndex" - const [audioCurrentTime, setAudioCurrentTime] = useState<{ [conversationId: string]: number }>({}) const audioRefs = useRef<{ [key: string]: HTMLAudioElement }>({}) + const segmentTimerRef = useRef(null) // Reprocessing state const [openDropdown, setOpenDropdown] = useState(null) const [reprocessingTranscript, setReprocessingTranscript] = useState>(new Set()) const [reprocessingMemory, setReprocessingMemory] = useState>(new Set()) - const [reprocessingSpeakers, setReprocessingSpeakers] = useState>(new Set()) const [deletingConversation, setDeletingConversation] = useState>(new Set()) - // Transcript segment editing state - const [editingSegment, setEditingSegment] = useState(null) // Format: "conversationId-segmentIndex" - const [editedSegmentText, setEditedSegmentText] = useState('') - const [savingSegment, setSavingSegment] = useState(false) - const [segmentEditError, setSegmentEditError] = useState(null) - - // Diarization annotation state - const [enrolledSpeakers, setEnrolledSpeakers] = useState>([]) - const [diarizationAnnotations, setDiarizationAnnotations] = useState>(new Map()) // conversationId -> annotations[] - - // Transcript annotation state - const [transcriptAnnotations, setTranscriptAnnotations] = useState>(new Map()) // conversationId -> annotations[] - - // Unified apply state - const [applyingAnnotations, setApplyingAnnotations] = useState>(new Set()) - - // Compute merged speaker list that includes speakers from annotations - // This 
ensures newly created speaker names appear in all dropdowns immediately - const allSpeakers = useMemo(() => { - const speakers = [...enrolledSpeakers] - const existingNames = new Set(speakers.map(s => s.name)) - - // Add speakers from all diarization annotations - diarizationAnnotations.forEach((annotations) => { - annotations.forEach(a => { - if (a.corrected_speaker && !existingNames.has(a.corrected_speaker)) { - speakers.push({ speaker_id: `annotation_${a.corrected_speaker}`, name: a.corrected_speaker }) - existingNames.add(a.corrected_speaker) - } - }) - }) - return speakers - }, [enrolledSpeakers, diarizationAnnotations]) - - // Stable seek handler for waveform click-to-seek - const handleSeek = useCallback((conversationId: string, time: number) => { - console.log(`๐ŸŽฏ handleSeek called: conversationId=${conversationId}, time=${time.toFixed(2)}s`); - - const audioElement = audioRefs.current[conversationId]; - - if (!audioElement) { - console.error(`โŒ Audio element not found for conversation ${conversationId}`); - console.log('Available audio refs:', Object.keys(audioRefs.current)); - return; - } - - console.log(`๐Ÿ“ Audio element found, readyState=${audioElement.readyState}, paused=${audioElement.paused}`); - - // Check if audio is ready for seeking (readyState >= 1 means HAVE_METADATA) - if (audioElement.readyState < 1) { - console.warn(`โš ๏ธ Audio not ready for seeking (readyState=${audioElement.readyState})`); - // Try again after metadata loads - audioElement.addEventListener('loadedmetadata', () => { - console.log('โœ… Metadata loaded, retrying seek'); - audioElement.currentTime = time; - }, { once: true }); - return; - } + // Editing state + const [editingSegment, setEditingSegment] = useState<{ + conversationId: string + segmentIndex: number + text: string + } | null>(null) - try { - // Force a small delay to ensure audio is ready - const wasPlaying = !audioElement.paused; + // Memory processing state + const [processingMemories, 
setProcessingMemories] = useState<{[conversationId: string]: string}>({}) // conversationId -> jobId + const [jobProgress, setJobProgress] = useState<{[jobId: string]: string}>({}) // jobId -> status - // Pause before seeking (helps with seeking reliability) - if (wasPlaying) { - audioElement.pause(); + useEffect(() => { + // Poll for job status + const interval = setInterval(async () => { + const activeJobs = Object.entries(processingMemories) + if (activeJobs.length === 0) return + + for (const [convId, jobId] of activeJobs) { + try { + const response = await queueApi.getJob(jobId) + const job = response.data + setJobProgress(prev => ({ ...prev, [jobId]: job.status })) + + if (['finished', 'completed', 'failed', 'stopped', 'canceled'].includes(job.status)) { + // Job done, remove from tracking and refresh conversation + setProcessingMemories(prev => { + const newState = { ...prev } + delete newState[convId] + return newState + }) + + // Refresh this conversation to show new memories + const convResponse = await conversationsApi.getById(convId) + if (convResponse.status === 200 && convResponse.data.conversation) { + setConversations(prev => prev.map(c => + c.conversation_id === convId ? { ...c, ...convResponse.data.conversation } : c + )) + } + } + } catch (err) { + console.error(`Failed to poll job ${jobId}:`, err) + } } + }, 2000) - // Set the seek position - audioElement.currentTime = time; + return () => clearInterval(interval) + }, [processingMemories]) - // Verify the seek worked - setTimeout(() => { - console.log(`โœ… Seek complete: requested=${time.toFixed(2)}s, actual=${audioElement.currentTime.toFixed(2)}s`); + const handleSaveAnnotation = async () => { + if (!editingSegment) return - if (Math.abs(audioElement.currentTime - time) > 1.0) { - console.error(`โš ๏ธ Seek failed! 
Requested ${time.toFixed(2)}s but got ${audioElement.currentTime.toFixed(2)}s`); + try { + // Optimistically update UI + setConversations(prev => prev.map(c => { + if (c.conversation_id === editingSegment.conversationId && c.segments) { + const newSegments = [...c.segments] + if (newSegments[editingSegment.segmentIndex]) { + // Store original text in case we need to revert (not implemented here for brevity) + const originalText = newSegments[editingSegment.segmentIndex].text + newSegments[editingSegment.segmentIndex] = { + ...newSegments[editingSegment.segmentIndex], + text: editingSegment.text + } + + // Call API in background + annotationsApi.create({ + conversation_id: editingSegment.conversationId, + segment_index: editingSegment.segmentIndex, + original_text: originalText, + corrected_text: editingSegment.text, + status: 'accepted' + }).then(() => { + // Trigger memory reprocessing explicitly to get the job ID for the UI + conversationsApi.reprocessMemory(editingSegment.conversationId).then(res => { + setProcessingMemories(prev => ({ + ...prev, + [editingSegment.conversationId]: res.data.job_id + })) + }) + }).catch(err => { + console.error('Failed to save annotation:', err) + setError('Failed to save correction. 
Please try again.') + // Revert UI change would go here + }) + } + return { ...c, segments: newSegments } } - }, 100); - - // Resume playback if it was playing - if (wasPlaying) { - audioElement.play().catch(err => { - console.warn('Could not resume playback after seek:', err); - }); - } - } catch (err) { - console.error('โŒ Seek failed:', err); + return c + })) + + setEditingSegment(null) + } catch (err: any) { + console.error('Error saving annotation:', err) + setError('Failed to save correction') } - }, []); // Empty deps - uses ref which is always stable + } const loadConversations = async () => { try { setLoading(true) - // Exclude deleted conversations from main view - const response = await conversationsApi.getAll(false) + const response = await conversationsApi.getAll() // API now returns a flat list with client_id as a field const conversationsList = response.data.conversations || [] setConversations(conversationsList) @@ -181,97 +191,8 @@ export default function Conversations() { } } - const loadEnrolledSpeakers = async () => { - try { - const response = await speakerApi.getEnrolledSpeakers() - setEnrolledSpeakers(response.data.speakers || []) - } catch (err: any) { - console.error('Failed to load enrolled speakers:', err) - } - } - - const loadDiarizationAnnotations = async (conversationId: string) => { - try { - const response = await annotationsApi.getDiarizationAnnotations(conversationId) - setDiarizationAnnotations(prev => new Map(prev).set(conversationId, response.data)) - } catch (err: any) { - console.error('Failed to load diarization annotations:', err) - } - } - - const loadTranscriptAnnotations = async (conversationId: string) => { - try { - const response = await annotationsApi.getTranscriptAnnotations(conversationId) - setTranscriptAnnotations(prev => new Map(prev).set(conversationId, response.data)) - } catch (err: any) { - console.error('Failed to load transcript annotations:', err) - } - } - - const handleSpeakerChange = async 
(conversationId: string, segmentIndex: number, originalSpeaker: string, newSpeaker: string, segmentStartTime: number) => { - try { - await annotationsApi.createDiarizationAnnotation({ - conversation_id: conversationId, - segment_index: segmentIndex, - original_speaker: originalSpeaker, - corrected_speaker: newSpeaker, - segment_start_time: segmentStartTime, - }) - - // Temporarily add new speaker name to enrolledSpeakers if it doesn't exist - // This makes it immediately available in all dropdowns without requiring a backend reload - setEnrolledSpeakers(prev => { - const speakerExists = prev.some(speaker => speaker.name === newSpeaker) - if (!speakerExists) { - // Generate a temporary speaker_id for in-memory use - const tempSpeakerId = `temp_${Date.now()}_${newSpeaker.replace(/\s+/g, '_')}` - return [...prev, { speaker_id: tempSpeakerId, name: newSpeaker }] - } - return prev - }) - - // Reload annotations for this conversation - await loadDiarizationAnnotations(conversationId) - } catch (err: any) { - console.error('Failed to create annotation:', err) - setError('Failed to create speaker annotation') - } - } - - const handleApplyAllAnnotations = async (conversationId: string) => { - try { - setApplyingAnnotations(prev => new Set(prev).add(conversationId)) - setOpenDropdown(null) - - const response = await annotationsApi.applyAllAnnotations(conversationId) - - if (response.status === 200) { - const data = response.data - console.log(`Applied ${data.diarization_count} diarization and ${data.transcript_count} transcript annotations`) - - // Refresh conversation to show new version - await loadConversations() - - // Reload annotations (should be empty now) - await loadDiarizationAnnotations(conversationId) - await loadTranscriptAnnotations(conversationId) - } else { - setError(`Failed to apply annotations: ${response.data?.error || 'Unknown error'}`) - } - } catch (err: any) { - setError(`Error applying annotations: ${err.message || 'Unknown error'}`) - } finally { - 
setApplyingAnnotations(prev => { - const newSet = new Set(prev) - newSet.delete(conversationId) - return newSet - }) - } - } - useEffect(() => { loadConversations() - loadEnrolledSpeakers() }, []) // Close dropdown when clicking outside @@ -367,40 +288,6 @@ export default function Conversations() { } } - const handleReprocessSpeakers = async (conversation: Conversation) => { - try { - if (!conversation.conversation_id) { - setError('Cannot reprocess speakers: Conversation ID is missing. This conversation may be from an older format.') - return - } - - setReprocessingSpeakers(prev => new Set(prev).add(conversation.conversation_id!)) - setOpenDropdown(null) - - const response = await conversationsApi.reprocessSpeakers( - conversation.conversation_id, - 'active' // Use active transcript version as source - ) - - if (response.status === 200) { - // Refresh conversations to show new version with updated speakers - await loadConversations() - } else { - setError(`Failed to start speaker reprocessing: ${response.data?.error || 'Unknown error'}`) - } - } catch (err: any) { - setError(`Error starting speaker reprocessing: ${err.message || 'Unknown error'}`) - } finally { - if (conversation.conversation_id) { - setReprocessingSpeakers(prev => { - const newSet = new Set(prev) - newSet.delete(conversation.conversation_id!) - return newSet - }) - } - } - } - const handleDeleteConversation = async (conversationId: string) => { try { const confirmed = window.confirm('Are you sure you want to delete this conversation? 
This action cannot be undone.') @@ -428,69 +315,6 @@ export default function Conversations() { } } - // Transcript segment editing handlers - const handleStartSegmentEdit = (conversationId: string, segmentIndex: number, originalText: string) => { - const segmentKey = `${conversationId}-${segmentIndex}` - setEditingSegment(segmentKey) - setEditedSegmentText(originalText) - setSegmentEditError(null) - } - - const handleSaveSegmentEdit = async (conversationId: string, segmentIndex: number, originalText: string) => { - if (!editedSegmentText.trim()) { - setSegmentEditError('Segment text cannot be empty') - return - } - - if (editedSegmentText === originalText) { - // No changes, just cancel - handleCancelSegmentEdit() - return - } - - try { - setSavingSegment(true) - setSegmentEditError(null) - - // Create annotation (NOT applied immediately) - await annotationsApi.createTranscriptAnnotation({ - conversation_id: conversationId, - segment_index: segmentIndex, - original_text: originalText, - corrected_text: editedSegmentText - }) - - // Exit edit mode - setEditingSegment(null) - setEditedSegmentText('') - - // Reload transcript annotations to show pending badge - await loadTranscriptAnnotations(conversationId) - - } catch (err: any) { - console.error('Error saving segment edit:', err) - setSegmentEditError(err.response?.data?.detail || err.message || 'Failed to save segment edit') - } finally { - setSavingSegment(false) - } - } - - const handleCancelSegmentEdit = () => { - setEditingSegment(null) - setEditedSegmentText('') - setSegmentEditError(null) - } - - const handleSegmentKeyDown = (e: React.KeyboardEvent, conversationId: string, segmentIndex: number, originalText: string) => { - if (e.key === 'Enter' && (e.ctrlKey || e.metaKey)) { - e.preventDefault() - handleSaveSegmentEdit(conversationId, segmentIndex, originalText) - } else if (e.key === 'Escape') { - e.preventDefault() - handleCancelSegmentEdit() - } - } - const toggleDetailedSummary = async (conversationId: 
string) => { // If already expanded, just collapse if (expandedDetailedSummaries.has(conversationId)) { @@ -560,18 +384,22 @@ export default function Conversations() { // Fetch full conversation details including segments try { - const response = await conversationsApi.getById(conversation.conversation_id) - if (response.status === 200 && response.data.conversation) { - // Update the conversation in state with full data + const [convResponse, annotationsResponse] = await Promise.all([ + conversationsApi.getById(conversation.conversation_id), + annotationsApi.getByConversationId(conversation.conversation_id) + ]) + + if (convResponse.status === 200 && convResponse.data.conversation) { + // Update the conversation in state with full data and annotations setConversations(prev => prev.map(c => c.conversation_id === conversationId - ? { ...c, ...response.data.conversation } + ? { + ...c, + ...convResponse.data.conversation, + annotations: annotationsResponse.data || [] + } : c )) - // Load diarization annotations for this conversation - await loadDiarizationAnnotations(conversationId) - // Load transcript annotations for this conversation - await loadTranscriptAnnotations(conversationId) // Expand the transcript setExpandedTranscripts(prev => new Set(prev).add(conversationId)) } @@ -581,43 +409,101 @@ export default function Conversations() { } } - const handleSegmentPlayPause = (conversationId: string, segmentIndex: number, segment: any) => { + const handleAcceptSuggestion = async (conversationId: string, annotation: any) => { + try { + // 1. Mark the annotation as accepted. + // NOTE: there is no dedicated annotation-update endpoint yet, so we + // reuse the create endpoint to record the acceptance; re-creating the + // annotation with status 'accepted' confirms the suggested correction. + // TODO: add a proper update endpoint (e.g. PATCH /annotations/{id}) and use it here.
+ + await annotationsApi.create({ + conversation_id: conversationId, + segment_index: annotation.segment_index, + original_text: annotation.original_text, + corrected_text: annotation.corrected_text, + status: 'accepted' + }) + + // 2. Update local state to reflect change (hide suggestion, update transcript) + setConversations(prev => prev.map(c => { + if (c.conversation_id === conversationId && c.segments && c.annotations) { + const newSegments = [...c.segments] + if (newSegments[annotation.segment_index]) { + newSegments[annotation.segment_index].text = annotation.corrected_text + } + return { + ...c, + segments: newSegments, + annotations: c.annotations.filter(a => a.id !== annotation.id) // Remove processed suggestion + } + } + return c + })) + } catch (err) { + console.error('Failed to accept suggestion:', err) + } + } + + const handleRejectSuggestion = async (conversationId: string, annotationId: string) => { + // Ideally call API to mark as rejected. For now just remove from UI. + setConversations(prev => prev.map(c => { + if (c.conversation_id === conversationId && c.annotations) { + return { + ...c, + annotations: c.annotations.filter(a => a.id !== annotationId) + } + } + return c + })) + } + + const handleSegmentPlayPause = (conversationId: string, segmentIndex: number, segment: any, useCropped: boolean) => { const segmentId = `${conversationId}-${segmentIndex}`; + // Include cropped flag in cache key to handle mode switches + const audioKey = `${conversationId}-${useCropped ? 
'cropped' : 'original'}`; // If this segment is already playing, pause it if (playingSegment === segmentId) { - const audio = audioRefs.current[segmentId]; + const audio = audioRefs.current[audioKey]; if (audio) { audio.pause(); } + if (segmentTimerRef.current) { + window.clearTimeout(segmentTimerRef.current); + segmentTimerRef.current = null; + } setPlayingSegment(null); return; } // Stop any currently playing segment if (playingSegment) { - const currentAudio = audioRefs.current[playingSegment]; - if (currentAudio) { - currentAudio.pause(); + // Stop all audio elements (could be playing from different mode) + Object.values(audioRefs.current).forEach(audio => { + audio.pause(); + }); + if (segmentTimerRef.current) { + window.clearTimeout(segmentTimerRef.current); + segmentTimerRef.current = null; } } - // Get or create audio element for this specific segment - let audio = audioRefs.current[segmentId]; + // Get or create audio element for this conversation + mode combination + let audio = audioRefs.current[audioKey]; - // Create new audio element with segment-specific URL + // Check if we need to create a new audio element (none exists or previous had error) if (!audio || audio.error) { const token = localStorage.getItem(getStorageKey('token')) || ''; - // Use chunks endpoint with time range for instant loading (only fetches needed chunks) - const audioUrl = `${BACKEND_URL}/api/audio/chunks/${conversationId}?start_time=${segment.start}&end_time=${segment.end}&token=${token}`; - console.log('Creating segment audio element with URL:', audioUrl); - console.log('Segment range:', segment.start, 'to', segment.end, '(duration:', segment.end - segment.start, 'seconds)'); + const audioUrl = `${BACKEND_URL}/api/audio/get_audio/${conversationId}?cropped=${useCropped}&token=${token}`; + console.log('Creating audio element with URL:', audioUrl); + console.log('Token present:', !!token, 'Token length:', token.length); audio = new Audio(audioUrl); - audioRefs.current[segmentId] = 
audio; + audioRefs.current[audioKey] = audio; // Add error listener for debugging audio.addEventListener('error', () => { - console.error('Audio segment error:', audio.error?.code, audio.error?.message); + console.error('Audio element error:', audio.error?.code, audio.error?.message); console.error('Audio src:', audio.src); }); @@ -627,10 +513,19 @@ export default function Conversations() { }); } - // Play the segment (no need to seek since audio is already trimmed to exact range) + // Set the start time and play console.log('Playing segment:', segment.start, 'to', segment.end); + audio.currentTime = segment.start; audio.play().then(() => { setPlayingSegment(segmentId); + + // Set a timer to stop at the segment end time + const duration = (segment.end - segment.start) * 1000; // Convert to milliseconds + segmentTimerRef.current = window.setTimeout(() => { + audio.pause(); + setPlayingSegment(null); + segmentTimerRef.current = null; + }, duration); }).catch(err => { console.error('Error playing audio segment:', err); setPlayingSegment(null); @@ -640,10 +535,13 @@ export default function Conversations() { // Cleanup audio on unmount useEffect(() => { return () => { - // Stop all audio elements + // Stop all audio and clear timers Object.values(audioRefs.current).forEach(audio => { audio.pause(); }); + if (segmentTimerRef.current) { + window.clearTimeout(segmentTimerRef.current); + } }; }, []) @@ -711,19 +609,46 @@ export default function Conversations() { ) : ( conversations.map((conversation) => (
- {/* Version Selector Header */} - +
+ +
+

Processing Failed

+

+ Reason: {conversation.deletion_reason === 'no_meaningful_speech' + ? 'No meaningful speech detected' + : conversation.deletion_reason === 'audio_file_not_ready' + ? 'Audio file not saved (possible Bluetooth disconnect)' + : conversation.deletion_reason || 'Unknown'} +

+ {conversation.deleted_at && ( +

+ Deleted at: {new Date(conversation.deleted_at).toLocaleString()} +

+ )} +
+
+
+ )} + + {/* Version Selector Header - Only show for conversations with conversation_id */} + {conversation.conversation_id && !conversation.deleted && ( + { // Update only this specific conversation without reloading all conversations @@ -744,7 +669,8 @@ export default function Conversations() { } }} /> - + )} + {/* Conversation Header */}
@@ -806,7 +732,8 @@ export default function Conversations() { {/* Dropdown Menu */} - {openDropdown === conversation.conversation_id && ( + {openDropdown === (conversation.conversation_id || conversation.audio_uuid) && (
- -
- - {/* Apply All Annotations Button */} - {(() => { - const diarAnnotations = diarizationAnnotations.get(conversation.conversation_id!) || [] - const transcriptAnnots = transcriptAnnotations.get(conversation.conversation_id!) || [] - - const diarPending = diarAnnotations.filter(a => !a.processed).length - const transcriptPending = transcriptAnnots.filter(a => !a.processed).length - const totalPending = diarPending + transcriptPending - - if (totalPending === 0) return null - - return ( - - ) - })()} -
- {/* Audio Player with Waveform */} + {/* Audio Player */}
- {(conversation.audio_chunks_count && conversation.audio_chunks_count > 0) && ( + {(conversation.audio_path || conversation.cropped_audio_path) && ( <>
- ๐ŸŽต Audio + {debugMode ? '๐Ÿ”ง Original Audio' : '๐ŸŽต Audio'} + {debugMode && conversation.cropped_audio_path && ' (Debug Mode)'}
- - {/* Waveform Visualization */} - {conversation.conversation_id && conversation.audio_total_duration && ( - handleSeek(conversation.conversation_id!, time)} - height={80} - /> - )} - - {/* Audio Player */} + {debugMode && conversation.cropped_audio_path && ( +
+ ๐Ÿ’ก Cropped version available: {conversation.cropped_audio_path} +
+ )} + {!debugMode && conversation.cropped_audio_path && ( +
+ ๐Ÿ’ก Enable debug mode to hear original with silence +
+ )} )}
@@ -1007,6 +862,45 @@ export default function Conversations() { {/* Transcript Content - Conditionally Rendered */} {conversation.conversation_id && expandedTranscripts.has(conversation.conversation_id) && (
+ + {/* Pending Suggestions */} + {conversation.annotations && conversation.annotations.some(a => a.status === 'pending') && ( +
+

+ + AI Suggestions ({conversation.annotations.filter(a => a.status === 'pending').length}) +

+
+ {conversation.annotations.filter(a => a.status === 'pending').map((annotation) => ( +
+
Segment {annotation.segment_index + 1}:
+
+ {annotation.original_text} + โ†’ + {annotation.corrected_text} +
+
+ + +
+
+ ))} +
+
+ )} + {segments.length > 0 ? (
@@ -1027,23 +921,28 @@ export default function Conversations() { // Render the transcript return segments.map((segment, index) => { const speaker = segment.speaker || 'Unknown' + const speakerColor = speakerColorMap[speaker] // Use conversation_id for unique segment IDs - const segmentId = `${conversation.conversation_id}-${index}` + const conversationKey = conversation.conversation_id || conversation.audio_uuid + const segmentId = `${conversationKey}-${index}` const isPlaying = playingSegment === segmentId - const hasAudio = !!conversation.audio_chunks_count && conversation.audio_chunks_count > 0 - const isEditing = editingSegment === segmentId + const hasAudio = conversation.cropped_audio_path || conversation.audio_path + // Use cropped audio only if available and not in debug mode + const useCropped = !debugMode && !!conversation.cropped_audio_path + + const isEditing = editingSegment?.conversationId === conversation.conversation_id && editingSegment?.segmentIndex === index return (
{/* Play/Pause Button */} {hasAudio && !isEditing && ( )} -
- {debugMode && ( - - [start: {segment.start.toFixed(1)}s, end: {segment.end.toFixed(1)}s, duration: {formatDuration(segment.start, segment.end)}] +
+
+ {debugMode && ( + + [start: {segment.start.toFixed(1)}s, end: {segment.end.toFixed(1)}s, duration: {formatDuration(segment.start, segment.end)}] + + )} + + {speaker}: - )} - - {/* Speaker Name - Clickable Dropdown for Annotation */} - {(() => { - const conversationAnnotations = diarizationAnnotations.get(conversation.conversation_id!) || [] - const annotation = conversationAnnotations.find(a => a.segment_index === index && !a.processed) - const speakerColor = speakerColorMap[speaker] - - // Always show dropdown, but use corrected speaker if annotation exists - // This allows users to edit annotations even after creating them - const currentSpeaker = annotation ? annotation.corrected_speaker : speaker - const originalSpeaker = annotation ? annotation.original_speaker : speaker - - return ( - - {annotation && ( - - Pending - - )} - - handleSpeakerChange(conversation.conversation_id!, index, originalSpeaker, newSpeaker, segment.start) - } - segmentIndex={index} - conversationId={conversation.conversation_id!} - annotated={!!annotation} - speakerColor={annotation ? 'text-green-600 dark:text-green-400' : speakerColor} + + {isEditing ? ( +
+