From 112ee0182f9965bc1867787ad6df26bfeb8c911b Mon Sep 17 00:00:00 2001 From: Ankush Malaker <43288948+AnkushMalaker@users.noreply.github.com> Date: Sat, 7 Feb 2026 07:59:54 +0530 Subject: [PATCH 1/6] Enhance ASR service descriptions and provider feedback in wizard.py (#290) - Updated the description for the 'asr-services' to remove the specific mention of 'Parakeet', making it more general. - Improved the console output for auto-selected services to include the transcription provider label, enhancing user feedback during service selection. --- wizard.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/wizard.py b/wizard.py index 2ede3a6a..e3beb37a 100755 --- a/wizard.py +++ b/wizard.py @@ -42,7 +42,7 @@ 'asr-services': { 'path': 'extras/asr-services', 'cmd': ['uv', 'run', '--with-requirements', '../../setup-requirements.txt', 'python', 'init.py'], - 'description': 'Offline speech-to-text (Parakeet)' + 'description': 'Offline speech-to-text' }, 'openmemory-mcp': { 'path': 'extras/openmemory-mcp', @@ -131,7 +131,8 @@ def select_services(transcription_provider=None): for service_name, service_config in SERVICES['extras'].items(): # Skip services that will be auto-added based on earlier choices if service_name in auto_added: - console.print(f" ✅ {service_config['description']} [dim](auto-selected for {transcription_provider})[/dim]") + provider_label = {"vibevoice": "VibeVoice", "parakeet": "Parakeet"}.get(transcription_provider, transcription_provider) + console.print(f" ✅ {service_config['description']} ({provider_label}) [dim](auto-selected)[/dim]") continue # Check if service exists From cfba303b831943823b86d7463040ec48765bb5c2 Mon Sep 17 00:00:00 2001 From: Ankush Malaker <43288948+AnkushMalaker@users.noreply.github.com> Date: Sat, 7 Feb 2026 09:33:43 +0000 Subject: [PATCH 2/6] Refactor Obsidian and Knowledge Graph integration in services and setup - Removed redundant Obsidian and Knowledge Graph configuration checks from services.py, 
streamlining the command execution process. - Updated wizard.py to enhance user experience by setting default options for speaker recognition during service selection. - Improved Neo4j password handling in setup processes, ensuring consistent configuration prompts and feedback. - Introduced a new cron scheduler for managing scheduled tasks, enhancing the backend's automation capabilities. - Added new entity annotation features, allowing for corrections and updates to knowledge graph entities directly through the API. --- backends/advanced/docker-compose.yml | 3 - backends/advanced/init.py | 113 ++--- backends/advanced/pyproject.toml | 1 + .../src/advanced_omi_backend/app_factory.py | 27 ++ .../advanced_omi_backend/cron_scheduler.py | 277 +++++++++++ .../advanced_omi_backend/models/annotation.py | 28 +- .../advanced_omi_backend/prompt_defaults.py | 36 ++ .../routers/modules/annotation_routes.py | 128 +++++ .../routers/modules/finetuning_routes.py | 298 ++++++++++-- .../routers/modules/knowledge_graph_routes.py | 84 ++++ .../services/knowledge_graph/service.py | 38 ++ .../memory/providers/vector_stores.py | 50 ++ .../services/transcription/__init__.py | 59 ++- .../services/transcription/base.py | 4 +- .../services/transcription/context.py | 90 ++++ .../workers/finetuning_jobs.py | 235 +++++++++ .../workers/transcription_jobs.py | 22 +- backends/advanced/webui/package-lock.json | 10 + backends/advanced/webui/package.json | 1 + .../components/knowledge-graph/EntityCard.tsx | 139 +++++- .../components/knowledge-graph/EntityList.tsx | 11 + .../advanced/webui/src/pages/Finetuning.tsx | 458 +++++++++++++++--- backends/advanced/webui/src/services/api.ts | 18 + extras/asr-services/common/base_service.py | 25 +- .../providers/faster_whisper/service.py | 8 +- extras/asr-services/providers/nemo/service.py | 8 +- .../providers/transformers/service.py | 9 +- .../providers/vibevoice/service.py | 12 +- .../providers/vibevoice/transcriber.py | 53 +- 
extras/asr-services/pyproject.toml | 1 + extras/asr-services/scripts/convert_to_ct2.py | 122 +++++ extras/langfuse/docker-compose.yml | 16 +- services.py | 50 -- wizard.py | 101 ++-- 34 files changed, 2166 insertions(+), 369 deletions(-) create mode 100644 backends/advanced/src/advanced_omi_backend/cron_scheduler.py create mode 100644 backends/advanced/src/advanced_omi_backend/services/transcription/context.py create mode 100644 backends/advanced/src/advanced_omi_backend/workers/finetuning_jobs.py create mode 100644 extras/asr-services/scripts/convert_to_ct2.py diff --git a/backends/advanced/docker-compose.yml b/backends/advanced/docker-compose.yml index 95cc4cab..3eb7e108 100644 --- a/backends/advanced/docker-compose.yml +++ b/backends/advanced/docker-compose.yml @@ -267,9 +267,6 @@ services: timeout: 10s retries: 5 start_period: 30s - profiles: - - obsidian - - knowledge-graph # ollama: # image: ollama/ollama:latest diff --git a/backends/advanced/init.py b/backends/advanced/init.py index ff57242d..0b536bde 100644 --- a/backends/advanced/init.py +++ b/backends/advanced/init.py @@ -444,41 +444,44 @@ def setup_optional_services(self): self.config["TS_AUTHKEY"] = self.args.ts_authkey self.console.print(f"[green][SUCCESS][/green] Tailscale auth key configured (Docker integration enabled)") + def setup_neo4j(self): + """Configure Neo4j credentials (always required - used by Knowledge Graph)""" + neo4j_password = getattr(self.args, 'neo4j_password', None) + + if neo4j_password: + self.console.print(f"[green]✅[/green] Neo4j: password configured via wizard") + else: + # Interactive prompt (standalone init.py run) + self.console.print() + self.console.print("[bold cyan]Neo4j Configuration[/bold cyan]") + self.console.print("Neo4j is used for Knowledge Graph (entity/relationship extraction)") + self.console.print() + neo4j_password = self.prompt_password("Neo4j password (min 8 chars)") + + self.config["NEO4J_HOST"] = "neo4j" + self.config["NEO4J_USER"] = "neo4j" + 
self.config["NEO4J_PASSWORD"] = neo4j_password + self.console.print("[green][SUCCESS][/green] Neo4j credentials configured") + def setup_obsidian(self): - """Configure Obsidian/Neo4j integration""" - # Check if enabled via command line + """Configure Obsidian integration (optional feature flag only - Neo4j credentials handled by setup_neo4j)""" if hasattr(self.args, 'enable_obsidian') and self.args.enable_obsidian: enable_obsidian = True - neo4j_password = getattr(self.args, 'neo4j_password', None) - - if not neo4j_password: - self.console.print("[yellow][WARNING][/yellow] --enable-obsidian provided but no password") - neo4j_password = self.prompt_password("Neo4j password (min 8 chars)") - - self.console.print(f"[green]✅[/green] Obsidian/Neo4j: enabled (configured via wizard)") + self.console.print(f"[green]✅[/green] Obsidian: enabled (configured via wizard)") else: # Interactive prompt (fallback) self.console.print() - self.console.print("[bold cyan]Obsidian/Neo4j Integration[/bold cyan]") + self.console.print("[bold cyan]Obsidian Integration (Optional)[/bold cyan]") self.console.print("Enable graph-based knowledge management for Obsidian vault notes") self.console.print() try: - enable_obsidian = Confirm.ask("Enable Obsidian/Neo4j integration?", default=False) + enable_obsidian = Confirm.ask("Enable Obsidian integration?", default=False) except EOFError: self.console.print("Using default: No") enable_obsidian = False - if enable_obsidian: - neo4j_password = self.prompt_password("Neo4j password (min 8 chars)") - if enable_obsidian: - # Update .env with credentials only (secrets, not feature flags) - self.config["NEO4J_HOST"] = "neo4j" - self.config["NEO4J_USER"] = "neo4j" - self.config["NEO4J_PASSWORD"] = neo4j_password - - # Update config.yml with feature flag (source of truth) - auto-saves via ConfigManager self.config_manager.update_memory_config({ "obsidian": { "enabled": True, @@ -486,11 +489,8 @@ def setup_obsidian(self): "timeout": 30 } }) - - 
self.console.print("[green][SUCCESS][/green] Obsidian/Neo4j configured") - self.console.print("[blue][INFO][/blue] Neo4j will start automatically with --profile obsidian") + self.console.print("[green][SUCCESS][/green] Obsidian integration enabled") else: - # Explicitly disable Obsidian in config.yml when not enabled self.config_manager.update_memory_config({ "obsidian": { "enabled": False, @@ -498,52 +498,25 @@ def setup_obsidian(self): "timeout": 30 } }) - self.console.print("[blue][INFO][/blue] Obsidian/Neo4j integration disabled") + self.console.print("[blue][INFO][/blue] Obsidian integration disabled") def setup_knowledge_graph(self): - """Configure Knowledge Graph (Neo4j-based entity/relationship extraction)""" - # Check if enabled via command line + """Configure Knowledge Graph (Neo4j-based entity/relationship extraction - enabled by default)""" if hasattr(self.args, 'enable_knowledge_graph') and self.args.enable_knowledge_graph: enable_kg = True - neo4j_password = getattr(self.args, 'neo4j_password', None) - - if not neo4j_password: - # Check if already set from obsidian setup - neo4j_password = self.config.get("NEO4J_PASSWORD") - if not neo4j_password: - self.console.print("[yellow][WARNING][/yellow] --enable-knowledge-graph provided but no password") - neo4j_password = self.prompt_password("Neo4j password (min 8 chars)") else: - # Interactive prompt (fallback) self.console.print() self.console.print("[bold cyan]Knowledge Graph (Entity Extraction)[/bold cyan]") - self.console.print("Enable graph-based entity and relationship extraction from conversations") - self.console.print("Extracts: People, Places, Organizations, Events, Promises/Tasks") + self.console.print("Extract people, places, organizations, events, and tasks from conversations") self.console.print() try: - enable_kg = Confirm.ask("Enable Knowledge Graph?", default=False) + enable_kg = Confirm.ask("Enable Knowledge Graph?", default=True) except EOFError: - self.console.print("Using default: No") 
- enable_kg = False - - if enable_kg: - # Check if Neo4j password already set from obsidian setup - existing_password = self.config.get("NEO4J_PASSWORD") - if existing_password: - self.console.print("[blue][INFO][/blue] Using Neo4j password from Obsidian configuration") - neo4j_password = existing_password - else: - neo4j_password = self.prompt_password("Neo4j password (min 8 chars)") + self.console.print("Using default: Yes") + enable_kg = True if enable_kg: - # Update .env with credentials only (secrets, not feature flags) - self.config["NEO4J_HOST"] = "neo4j" - self.config["NEO4J_USER"] = "neo4j" - if neo4j_password: - self.config["NEO4J_PASSWORD"] = neo4j_password - - # Update config.yml with feature flag (source of truth) - auto-saves via ConfigManager self.config_manager.update_memory_config({ "knowledge_graph": { "enabled": True, @@ -551,12 +524,9 @@ def setup_knowledge_graph(self): "timeout": 30 } }) - - self.console.print("[green][SUCCESS][/green] Knowledge Graph configured") - self.console.print("[blue][INFO][/blue] Neo4j will start automatically with --profile knowledge-graph") + self.console.print("[green][SUCCESS][/green] Knowledge Graph enabled") self.console.print("[blue][INFO][/blue] Entities and relationships will be extracted from conversations") else: - # Explicitly disable Knowledge Graph in config.yml when not enabled self.config_manager.update_memory_config({ "knowledge_graph": { "enabled": False, @@ -842,25 +812,7 @@ def show_next_steps(self): config_yml = self.config_manager.get_full_config() self.console.print("1. 
Start the main services:") - # Include --profile obsidian/knowledge-graph if enabled (read from config.yml) - obsidian_enabled = config_yml.get("memory", {}).get("obsidian", {}).get("enabled", False) - kg_enabled = config_yml.get("memory", {}).get("knowledge_graph", {}).get("enabled", False) - - profiles = [] - profile_notes = [] - if obsidian_enabled: - profiles.append("obsidian") - profile_notes.append("Obsidian integration") - if kg_enabled: - profiles.append("knowledge-graph") - profile_notes.append("Knowledge Graph") - - if profiles: - profile_args = " ".join([f"--profile {p}" for p in profiles]) - self.console.print(f" [cyan]docker compose {profile_args} up --build -d[/cyan]") - self.console.print(f" [dim](Includes Neo4j for: {', '.join(profile_notes)})[/dim]") - else: - self.console.print(" [cyan]docker compose up --build -d[/cyan]") + self.console.print(" [cyan]docker compose up --build -d[/cyan]") self.console.print() # Auto-determine URLs for next steps @@ -908,6 +860,7 @@ def run(self): self.setup_llm() self.setup_memory() self.setup_optional_services() + self.setup_neo4j() self.setup_obsidian() self.setup_knowledge_graph() self.setup_langfuse() diff --git a/backends/advanced/pyproject.toml b/backends/advanced/pyproject.toml index 23c736d7..e0e964c0 100644 --- a/backends/advanced/pyproject.toml +++ b/backends/advanced/pyproject.toml @@ -32,6 +32,7 @@ dependencies = [ "google-auth-oauthlib>=1.0.0", "google-auth-httplib2>=0.2.0", "websockets>=12.0", + "croniter>=1.3.0", ] [project.optional-dependencies] diff --git a/backends/advanced/src/advanced_omi_backend/app_factory.py b/backends/advanced/src/advanced_omi_backend/app_factory.py index 3c0417eb..6a99c841 100644 --- a/backends/advanced/src/advanced_omi_backend/app_factory.py +++ b/backends/advanced/src/advanced_omi_backend/app_factory.py @@ -221,6 +221,23 @@ async def lifespan(app: FastAPI): # Register OpenMemory user if using openmemory_mcp provider await initialize_openmemory_user() + # Start cron 
scheduler (requires Redis to be available) + try: + from advanced_omi_backend.cron_scheduler import get_scheduler, register_cron_job + from advanced_omi_backend.workers.finetuning_jobs import ( + run_asr_jargon_extraction_job, + run_speaker_finetuning_job, + ) + + register_cron_job("speaker_finetuning", run_speaker_finetuning_job) + register_cron_job("asr_jargon_extraction", run_asr_jargon_extraction_job) + + scheduler = get_scheduler() + await scheduler.start() + application_logger.info("Cron scheduler started") + except Exception as e: + application_logger.warning(f"Cron scheduler failed to start: {e}") + # SystemTracker is used for monitoring and debugging application_logger.info("Using SystemTracker for monitoring and debugging") @@ -319,6 +336,16 @@ async def lifespan(app: FastAPI): except Exception as e: application_logger.error(f"Error shutting down plugins: {e}") + # Shutdown cron scheduler + try: + from advanced_omi_backend.cron_scheduler import get_scheduler + + scheduler = get_scheduler() + await scheduler.stop() + application_logger.info("Cron scheduler stopped") + except Exception as e: + application_logger.error(f"Error stopping cron scheduler: {e}") + # Shutdown memory service and speaker service shutdown_memory_service() application_logger.info("Memory and speaker services shut down.") diff --git a/backends/advanced/src/advanced_omi_backend/cron_scheduler.py b/backends/advanced/src/advanced_omi_backend/cron_scheduler.py new file mode 100644 index 00000000..b79effbf --- /dev/null +++ b/backends/advanced/src/advanced_omi_backend/cron_scheduler.py @@ -0,0 +1,277 @@ +""" +Config-driven asyncio cron scheduler for Chronicle. + +Reads job definitions from config.yml ``cron_jobs`` section, uses ``croniter`` +to compute next-run times, and dispatches registered job functions. State +(last_run / next_run) is persisted in Redis so it survives restarts. 
+ +Usage: + scheduler = get_scheduler() + await scheduler.start() # call during FastAPI lifespan startup + await scheduler.stop() # call during shutdown +""" + +import asyncio +import logging +from dataclasses import dataclass, field +from datetime import datetime, timezone +from typing import Any, Callable, Coroutine, Dict, List, Optional + +import redis.asyncio as aioredis +from croniter import croniter + +from advanced_omi_backend.config_loader import load_config, save_config_section + +logger = logging.getLogger(__name__) + +# Redis key prefixes +_LAST_RUN_KEY = "cron:last_run:{job_id}" +_NEXT_RUN_KEY = "cron:next_run:{job_id}" + +# --------------------------------------------------------------------------- +# Data classes +# --------------------------------------------------------------------------- + +@dataclass +class CronJobConfig: + job_id: str + enabled: bool + schedule: str + description: str + next_run: Optional[datetime] = None + last_run: Optional[datetime] = None + running: bool = False + last_error: Optional[str] = None + + +# --------------------------------------------------------------------------- +# Job registry – maps job_id → async callable +# --------------------------------------------------------------------------- + +JobFunc = Callable[[], Coroutine[Any, Any, dict]] + +_JOB_REGISTRY: Dict[str, JobFunc] = {} + + +def register_cron_job(job_id: str, func: JobFunc) -> None: + """Register a job function so the scheduler can dispatch it.""" + _JOB_REGISTRY[job_id] = func + + +def _get_job_func(job_id: str) -> Optional[JobFunc]: + return _JOB_REGISTRY.get(job_id) + + +# --------------------------------------------------------------------------- +# Scheduler +# --------------------------------------------------------------------------- + +class CronScheduler: + def __init__(self) -> None: + self.jobs: Dict[str, CronJobConfig] = {} + self._running = False + self._task: Optional[asyncio.Task] = None + self._redis: Optional[aioredis.Redis] = None + 
+ # -- lifecycle ----------------------------------------------------------- + + async def start(self) -> None: + """Load config, restore state from Redis, and start the scheduler loop.""" + import os + redis_url = os.getenv("REDIS_URL", "redis://localhost:6379/0") + self._redis = aioredis.from_url(redis_url, decode_responses=True) + + self._load_jobs_from_config() + await self._restore_state() + + self._running = True + self._task = asyncio.create_task(self._loop()) + logger.info(f"Cron scheduler started with {len(self.jobs)} jobs") + + async def stop(self) -> None: + """Cancel the scheduler loop and close Redis.""" + self._running = False + if self._task and not self._task.done(): + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass + if self._redis: + await self._redis.close() + logger.info("Cron scheduler stopped") + + # -- public API ---------------------------------------------------------- + + async def run_job_now(self, job_id: str) -> dict: + """Manually trigger a job regardless of schedule.""" + if job_id not in self.jobs: + raise ValueError(f"Unknown cron job: {job_id}") + return await self._execute_job(job_id) + + async def update_job( + self, + job_id: str, + enabled: Optional[bool] = None, + schedule: Optional[str] = None, + ) -> None: + """Update a job's config and persist to config.yml.""" + if job_id not in self.jobs: + raise ValueError(f"Unknown cron job: {job_id}") + + cfg = self.jobs[job_id] + + if schedule is not None: + # Validate cron expression + if not croniter.is_valid(schedule): + raise ValueError(f"Invalid cron expression: {schedule}") + cfg.schedule = schedule + cfg.next_run = croniter(schedule, datetime.now(timezone.utc)).get_next(datetime) + + if enabled is not None: + cfg.enabled = enabled + + # Persist changes to config.yml + save_config_section( + f"cron_jobs.{job_id}", + {"enabled": cfg.enabled, "schedule": cfg.schedule, "description": cfg.description}, + ) + + # Update next_run in Redis + if 
self._redis and cfg.next_run: + await self._redis.set( + _NEXT_RUN_KEY.format(job_id=job_id), + cfg.next_run.isoformat(), + ) + + logger.info(f"Updated cron job '{job_id}': enabled={cfg.enabled}, schedule={cfg.schedule}") + + async def get_all_jobs_status(self) -> List[dict]: + """Return status of all registered cron jobs.""" + result = [] + for job_id, cfg in self.jobs.items(): + result.append({ + "job_id": job_id, + "enabled": cfg.enabled, + "schedule": cfg.schedule, + "description": cfg.description, + "last_run": cfg.last_run.isoformat() if cfg.last_run else None, + "next_run": cfg.next_run.isoformat() if cfg.next_run else None, + "running": cfg.running, + "last_error": cfg.last_error, + }) + return result + + # -- internals ----------------------------------------------------------- + + def _load_jobs_from_config(self) -> None: + """Read cron_jobs section from config.yml.""" + cfg = load_config() + cron_section = cfg.get("cron_jobs", {}) + + for job_id, job_cfg in cron_section.items(): + schedule = str(job_cfg.get("schedule", "0 * * * *")) + now = datetime.now(timezone.utc) + self.jobs[job_id] = CronJobConfig( + job_id=job_id, + enabled=bool(job_cfg.get("enabled", False)), + schedule=schedule, + description=str(job_cfg.get("description", "")), + next_run=croniter(schedule, now).get_next(datetime), + ) + + async def _restore_state(self) -> None: + """Restore last_run / next_run from Redis.""" + if not self._redis: + return + for job_id, cfg in self.jobs.items(): + try: + lr = await self._redis.get(_LAST_RUN_KEY.format(job_id=job_id)) + if lr: + cfg.last_run = datetime.fromisoformat(lr) + nr = await self._redis.get(_NEXT_RUN_KEY.format(job_id=job_id)) + if nr: + cfg.next_run = datetime.fromisoformat(nr) + except Exception as e: + logger.warning(f"Failed to restore state for job '{job_id}': {e}") + + async def _persist_state(self, job_id: str) -> None: + """Write last_run / next_run to Redis.""" + if not self._redis: + return + cfg = self.jobs[job_id] + try: + if 
cfg.last_run: + await self._redis.set( + _LAST_RUN_KEY.format(job_id=job_id), + cfg.last_run.isoformat(), + ) + if cfg.next_run: + await self._redis.set( + _NEXT_RUN_KEY.format(job_id=job_id), + cfg.next_run.isoformat(), + ) + except Exception as e: + logger.warning(f"Failed to persist state for job '{job_id}': {e}") + + async def _execute_job(self, job_id: str) -> dict: + """Run the job function and update state.""" + cfg = self.jobs[job_id] + func = _get_job_func(job_id) + if func is None: + msg = f"No function registered for cron job '{job_id}'" + logger.error(msg) + cfg.last_error = msg + return {"error": msg} + + cfg.running = True + cfg.last_error = None + now = datetime.now(timezone.utc) + logger.info(f"Executing cron job '{job_id}'") + + try: + result = await func() + cfg.last_run = now + cfg.next_run = croniter(cfg.schedule, now).get_next(datetime) + await self._persist_state(job_id) + logger.info(f"Cron job '{job_id}' completed: {result}") + return result or {} + except Exception as e: + cfg.last_error = str(e) + logger.error(f"Cron job '{job_id}' failed: {e}", exc_info=True) + # Still advance next_run so we don't spin on failures + cfg.last_run = now + cfg.next_run = croniter(cfg.schedule, now).get_next(datetime) + await self._persist_state(job_id) + return {"error": str(e)} + finally: + cfg.running = False + + async def _loop(self) -> None: + """Main scheduler loop – checks every 30s for due jobs.""" + while self._running: + try: + now = datetime.now(timezone.utc) + for job_id, cfg in self.jobs.items(): + if not cfg.enabled or cfg.running: + continue + if cfg.next_run and now >= cfg.next_run: + asyncio.create_task(self._execute_job(job_id)) + except Exception as e: + logger.error(f"Error in cron scheduler loop: {e}", exc_info=True) + await asyncio.sleep(30) + + +# --------------------------------------------------------------------------- +# Singleton +# --------------------------------------------------------------------------- + +_scheduler: 
Optional[CronScheduler] = None + + +def get_scheduler() -> CronScheduler: + """Get (or create) the global CronScheduler singleton.""" + global _scheduler + if _scheduler is None: + _scheduler = CronScheduler() + return _scheduler diff --git a/backends/advanced/src/advanced_omi_backend/models/annotation.py b/backends/advanced/src/advanced_omi_backend/models/annotation.py index ac8ceefe..8eecf81a 100644 --- a/backends/advanced/src/advanced_omi_backend/models/annotation.py +++ b/backends/advanced/src/advanced_omi_backend/models/annotation.py @@ -19,6 +19,7 @@ class AnnotationType(str, Enum): MEMORY = "memory" TRANSCRIPT = "transcript" DIARIZATION = "diarization" # Speaker identification corrections + ENTITY = "entity" # Knowledge graph entity corrections (name/details edits) class AnnotationSource(str, Enum): @@ -70,6 +71,12 @@ class Annotation(Document): corrected_speaker: Optional[str] = None # Speaker label after correction segment_start_time: Optional[float] = None # Time offset for reference + # For ENTITY annotations: + # Dual purpose: feeds both the jargon pipeline (entity name corrections = domain vocabulary + # the ASR should know) and the entity extraction pipeline (corrections improve future accuracy). 
+ entity_id: Optional[str] = None # Neo4j entity ID + entity_field: Optional[str] = None # Which field was changed ("name" or "details") + # Processed tracking (applies to ALL annotation types) processed: bool = Field(default=False) # Whether annotation has been applied/sent to training processed_at: Optional[datetime] = None # When annotation was processed @@ -88,11 +95,12 @@ class Settings: # Create indexes on commonly queried fields # Note: Enum fields and Optional fields don't use Indexed() wrapper indexes = [ - "annotation_type", # Query by type (memory vs transcript vs diarization) + "annotation_type", # Query by type (memory vs transcript vs diarization vs entity) "user_id", # User-scoped queries "status", # Filter by status (pending/accepted/rejected) "memory_id", # Lookup annotations for specific memory "conversation_id", # Lookup annotations for specific conversation + "entity_id", # Lookup annotations for specific entity "processed", # Query unprocessed annotations ] @@ -108,6 +116,10 @@ def is_diarization_annotation(self) -> bool: """Check if this is a diarization annotation.""" return self.annotation_type == AnnotationType.DIARIZATION + def is_entity_annotation(self) -> bool: + """Check if this is an entity annotation.""" + return self.annotation_type == AnnotationType.ENTITY + def is_pending_suggestion(self) -> bool: """Check if this is a pending AI suggestion.""" return ( @@ -151,6 +163,18 @@ class DiarizationAnnotationCreate(BaseModel): status: AnnotationStatus = AnnotationStatus.ACCEPTED +class EntityAnnotationCreate(BaseModel): + """Create entity annotation request. + + Dual purpose: feeds both the jargon pipeline (entity name corrections = domain vocabulary + the ASR should know) and the entity extraction pipeline (corrections improve future accuracy). 
+ """ + entity_id: str + entity_field: str # "name" or "details" + original_text: str + corrected_text: str + + class AnnotationResponse(BaseModel): """Annotation response for API.""" id: str @@ -164,6 +188,8 @@ class AnnotationResponse(BaseModel): original_speaker: Optional[str] = None corrected_speaker: Optional[str] = None segment_start_time: Optional[float] = None + entity_id: Optional[str] = None + entity_field: Optional[str] = None processed: bool = False processed_at: Optional[datetime] = None processed_by: Optional[str] = None diff --git a/backends/advanced/src/advanced_omi_backend/prompt_defaults.py b/backends/advanced/src/advanced_omi_backend/prompt_defaults.py index eca71cfc..44ade563 100644 --- a/backends/advanced/src/advanced_omi_backend/prompt_defaults.py +++ b/backends/advanced/src/advanced_omi_backend/prompt_defaults.py @@ -486,6 +486,42 @@ def register_all_defaults(registry: PromptRegistry) -> None: category="knowledge_graph", ) + # ------------------------------------------------------------------ + # asr.hot_words + # ------------------------------------------------------------------ + registry.register_default( + "asr.hot_words", + template="hey vivi, chronicle, omi", + name="ASR Hot Words", + description="Comma-separated hot words for speech recognition. " + "For Deepgram: boosts keyword recognition via keyterm. " + "For VibeVoice: passed as context_info to guide the LLM backbone. " + "Supports names, technical terms, and domain-specific vocabulary.", + category="asr", + ) + + # ------------------------------------------------------------------ + # asr.jargon_extraction + # ------------------------------------------------------------------ + registry.register_default( + "asr.jargon_extraction", + template="""\ +Extract up to 20 key jargon terms, names, and technical vocabulary from these memory facts. +Return ONLY a comma-separated list of words or short phrases (1-3 words each). 
+Focus on: proper nouns, technical terms, domain-specific vocabulary, names of people/places/products. +Skip generic everyday words. + +Memory facts: +{{memories}} + +Jargon:""", + name="ASR Jargon Extraction", + description="Extracts key jargon terms from user memories for ASR context boosting.", + category="asr", + variables=["memories"], + is_dynamic=True, + ) + # ------------------------------------------------------------------ # transcription.title_summary # ------------------------------------------------------------------ diff --git a/backends/advanced/src/advanced_omi_backend/routers/modules/annotation_routes.py b/backends/advanced/src/advanced_omi_backend/routers/modules/annotation_routes.py index f85a99ed..c4e49ce1 100644 --- a/backends/advanced/src/advanced_omi_backend/routers/modules/annotation_routes.py +++ b/backends/advanced/src/advanced_omi_backend/routers/modules/annotation_routes.py @@ -19,10 +19,12 @@ AnnotationStatus, AnnotationType, DiarizationAnnotationCreate, + EntityAnnotationCreate, MemoryAnnotationCreate, TranscriptAnnotationCreate, ) from advanced_omi_backend.models.conversation import Conversation +from advanced_omi_backend.services.knowledge_graph import get_knowledge_graph_service from advanced_omi_backend.services.memory import get_memory_service from advanced_omi_backend.users import User @@ -266,6 +268,25 @@ async def update_annotation_status( except Exception as e: logger.error(f"Error applying transcript suggestion: {e}") # Don't fail the status update if segment update fails + elif annotation.is_entity_annotation(): + # Update entity in Neo4j + try: + kg_service = get_knowledge_graph_service() + update_kwargs = {} + if annotation.entity_field == "name": + update_kwargs["name"] = annotation.corrected_text + elif annotation.entity_field == "details": + update_kwargs["details"] = annotation.corrected_text + if update_kwargs: + await kg_service.update_entity( + entity_id=annotation.entity_id, + user_id=annotation.user_id, + 
# === Entity Annotation Routes ===


@router.post("/entity", response_model=AnnotationResponse)
async def create_entity_annotation(
    annotation_data: EntityAnnotationCreate,
    current_user: User = Depends(current_active_user),
):
    """
    Create an annotation for an entity edit (name or details correction).

    Validates that the caller owns the entity, stores the annotation with
    ``processed=False`` so the downstream jargon/finetuning cron can consume
    it later, and immediately applies the correction to Neo4j as a
    best-effort side effect.

    Dual purpose: entity name corrections feed both the jargon pipeline
    (domain vocabulary for ASR) and the entity extraction pipeline
    (improving future extraction accuracy).
    """
    try:
        # Only these two text fields participate in the correction pipelines.
        if annotation_data.entity_field not in ("name", "details"):
            raise HTTPException(
                status_code=400,
                detail="entity_field must be 'name' or 'details'",
            )

        # Ownership check: the entity must exist under this user's graph.
        kg_service = get_knowledge_graph_service()
        owned_entity = await kg_service.get_entity(
            entity_id=annotation_data.entity_id,
            user_id=current_user.user_id,
        )
        if not owned_entity:
            raise HTTPException(status_code=404, detail="Entity not found")

        annotation = Annotation(
            annotation_type=AnnotationType.ENTITY,
            user_id=current_user.user_id,
            entity_id=annotation_data.entity_id,
            entity_field=annotation_data.entity_field,
            original_text=annotation_data.original_text,
            corrected_text=annotation_data.corrected_text,
            status=AnnotationStatus.ACCEPTED,
            processed=False,  # jargon/finetuning cron consumes unprocessed records
        )
        await annotation.save()
        logger.info(
            f"Created entity annotation {annotation.id} for entity {annotation_data.entity_id} "
            f"field={annotation_data.entity_field}"
        )

        # Best-effort immediate sync to Neo4j; the annotation survives either way.
        # entity_field was validated above, so a single-key splat is safe here.
        try:
            await kg_service.update_entity(
                entity_id=annotation_data.entity_id,
                user_id=current_user.user_id,
                **{annotation_data.entity_field: annotation_data.corrected_text},
            )
            logger.info(f"Applied entity correction to Neo4j for entity {annotation_data.entity_id}")
        except Exception as e:
            logger.error(f"Error applying entity correction to Neo4j: {e}")

        return AnnotationResponse.model_validate(annotation)

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error creating entity annotation: {e}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Failed to create entity annotation: {str(e)}",
        )


@router.get("/entity/{entity_id}", response_model=List[AnnotationResponse])
async def get_entity_annotations(
    entity_id: str,
    current_user: User = Depends(current_active_user),
):
    """Return every annotation recorded against the given entity for this user."""
    try:
        records = await Annotation.find(
            Annotation.annotation_type == AnnotationType.ENTITY,
            Annotation.entity_id == entity_id,
            Annotation.user_id == current_user.user_id,
        ).to_list()

        return [AnnotationResponse.model_validate(r) for r in records]

    except Exception as e:
        logger.error(f"Error fetching entity annotations: {e}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Failed to fetch entity annotations: {str(e)}",
        )
""" import logging @@ -10,6 +11,7 @@ from fastapi import APIRouter, Depends, HTTPException, Query from fastapi.responses import JSONResponse +from pydantic import BaseModel from advanced_omi_backend.auth import current_active_user from advanced_omi_backend.models.annotation import Annotation, AnnotationType @@ -56,7 +58,7 @@ async def process_annotations_for_training( # Filter out already trained annotations (processed_by contains "training") ready_for_training = [ a for a in annotations - if a.processed_by and "training" not in a.processed_by + if not a.processed_by or "training" not in a.processed_by ] if not ready_for_training: @@ -96,16 +98,14 @@ async def process_annotations_for_training( conversation = await Conversation.find_one( Conversation.conversation_id == annotation.conversation_id ) - + if not conversation or not conversation.active_transcript: - logger.warning(f"Conversation {annotation.conversation_id} not found or has no transcript") failed_count += 1 errors.append(f"Conversation {annotation.conversation_id[:8]} not found") continue # Validate segment index if annotation.segment_index >= len(conversation.active_transcript.segments): - logger.warning(f"Invalid segment index {annotation.segment_index} for conversation {annotation.conversation_id}") failed_count += 1 errors.append(f"Invalid segment index {annotation.segment_index}") continue @@ -198,7 +198,7 @@ async def process_annotations_for_training( "appended_to_existing": appended_count, "total_processed": total_processed, "failed_count": failed_count, - "errors": errors[:10] if errors else [], # Limit error list + "errors": errors[:10] if errors else [], "status": "success" if total_processed > 0 else "partial_failure" }) @@ -227,48 +227,111 @@ async def get_finetuning_status( - cron_status: Cron job schedule and last run info """ try: - # Count annotations by status - pending_count = await Annotation.find( - Annotation.annotation_type == AnnotationType.DIARIZATION, - Annotation.processed == 
False, - ).count() - - # Get all processed annotations - all_processed = await Annotation.find( - Annotation.annotation_type == AnnotationType.DIARIZATION, - Annotation.processed == True, - ).to_list() - - # Split into trained vs not-yet-trained - trained_annotations = [ - a for a in all_processed - if a.processed_by and "training" in a.processed_by - ] - applied_not_trained = [ - a for a in all_processed - if not a.processed_by or "training" not in a.processed_by - ] - - applied_count = len(applied_not_trained) - trained_count = len(trained_annotations) + # ------------------------------------------------------------------ + # Per-type annotation counts (with orphan detection) + # ------------------------------------------------------------------ + from advanced_omi_backend.models.conversation import Conversation - # Get last training run timestamp + annotation_counts: dict[str, dict] = {} + trained_diarization_list: list = [] + + # Collect all annotations to batch-check for orphans + all_annotations_by_type: dict[AnnotationType, list] = {} + for ann_type in AnnotationType: + all_annotations_by_type[ann_type] = await Annotation.find( + Annotation.annotation_type == ann_type, + ).to_list() + + # Batch-check which conversation_ids still exist + conv_annotation_types = {AnnotationType.DIARIZATION, AnnotationType.TRANSCRIPT} + all_conv_ids: set[str] = set() + for ann_type in conv_annotation_types: + for a in all_annotations_by_type.get(ann_type, []): + if a.conversation_id: + all_conv_ids.add(a.conversation_id) + + existing_conv_ids: set[str] = set() + if all_conv_ids: + existing_convs = await Conversation.find( + {"conversation_id": {"$in": list(all_conv_ids)}}, + ).to_list() + existing_conv_ids = {c.conversation_id for c in existing_convs} + + orphaned_conv_ids = all_conv_ids - existing_conv_ids + + total_orphaned = 0 + for ann_type in AnnotationType: + annotations = all_annotations_by_type[ann_type] + + # Identify orphaned annotations for conversation-based types + 
if ann_type in conv_annotation_types: + orphaned = [a for a in annotations if a.conversation_id in orphaned_conv_ids] + non_orphaned = [a for a in annotations if a.conversation_id not in orphaned_conv_ids] + else: + # Memory/entity orphan detection is placeholder for now + orphaned = [] + non_orphaned = annotations + + pending = [a for a in non_orphaned if not a.processed] + processed = [a for a in non_orphaned if a.processed] + trained = [a for a in processed if a.processed_by and "training" in a.processed_by] + applied_not_trained = [ + a for a in processed + if not a.processed_by or "training" not in a.processed_by + ] + + orphan_count = len(orphaned) + total_orphaned += orphan_count + + annotation_counts[ann_type.value] = { + "total": len(non_orphaned), + "pending": len(pending), + "applied": len(applied_not_trained), + "trained": len(trained), + "orphaned": orphan_count, + } + + if ann_type == AnnotationType.DIARIZATION: + trained_diarization_list = trained + + # ------------------------------------------------------------------ + # Diarization-specific fields (backward compat) + # ------------------------------------------------------------------ + diarization = annotation_counts.get("diarization", {}) + pending_count = diarization.get("pending", 0) + applied_count = diarization.get("applied", 0) + trained_count = diarization.get("trained", 0) + + # Get last training run timestamp from diarization annotations last_training_run = None - if trained_annotations: - # Find most recent trained annotation + if trained_diarization_list: latest_trained = max( - trained_annotations, + trained_diarization_list, key=lambda a: a.updated_at if a.updated_at else datetime.min.replace(tzinfo=timezone.utc) ) last_training_run = latest_trained.updated_at.isoformat() if latest_trained.updated_at else None - # TODO: Get cron job status from scheduler - cron_status = { - "enabled": False, # Placeholder - "schedule": "0 2 * * *", # Example: daily at 2 AM - "last_run": None, - 
# ---------------------------------------------------------------------------
# Orphaned Annotation Management Endpoints
# ---------------------------------------------------------------------------


@router.delete("/orphaned-annotations")
async def delete_orphaned_annotations(
    current_user: User = Depends(current_active_user),
    annotation_type: Optional[str] = Query(None, description="Filter by annotation type (e.g. 'diarization')"),
):
    """
    Delete annotations whose referenced conversation no longer exists.

    Admin-only. Orphan detection is only meaningful for conversation-based
    annotation types (diarization, transcript); other types are rejected
    with an explanatory message.
    """
    if not current_user.is_superuser:
        raise HTTPException(status_code=403, detail="Admin access required")

    from advanced_omi_backend.models.conversation import Conversation

    conversation_types = {AnnotationType.DIARIZATION, AnnotationType.TRANSCRIPT}

    # Narrow to a single requested type, if the caller asked for one.
    if annotation_type:
        try:
            requested = AnnotationType(annotation_type)
        except ValueError:
            raise HTTPException(status_code=400, detail=f"Unknown annotation type: {annotation_type}")
        if requested not in conversation_types:
            return JSONResponse(content={"deleted_count": 0, "by_type": {}, "message": "Orphan detection not supported for this type"})
        target_types = {requested}
    else:
        target_types = conversation_types

    # Gather every conversation_id referenced by the targeted annotation types.
    referenced_ids: set[str] = set()
    found_by_type: dict[AnnotationType, list] = {}
    for ann_type in target_types:
        found = await Annotation.find(
            Annotation.annotation_type == ann_type,
        ).to_list()
        found_by_type[ann_type] = found
        referenced_ids.update(a.conversation_id for a in found if a.conversation_id)

    if not referenced_ids:
        return JSONResponse(content={"deleted_count": 0, "by_type": {}})

    # One batched query tells us which conversations survive.
    surviving = await Conversation.find(
        {"conversation_id": {"$in": list(referenced_ids)}},
    ).to_list()
    surviving_ids = {c.conversation_id for c in surviving}
    missing_ids = referenced_ids - surviving_ids

    if not missing_ids:
        return JSONResponse(content={"deleted_count": 0, "by_type": {}})

    # Remove every annotation pointing at a vanished conversation.
    deleted_by_type: dict[str, int] = {}
    total_deleted = 0
    for ann_type, found in found_by_type.items():
        doomed = [a for a in found if a.conversation_id in missing_ids]
        for a in doomed:
            await a.delete()
        if doomed:
            deleted_by_type[ann_type.value] = len(doomed)
            total_deleted += len(doomed)

    logger.info(f"Deleted {total_deleted} orphaned annotations: {deleted_by_type}")
    return JSONResponse(content={
        "deleted_count": total_deleted,
        "by_type": deleted_by_type,
    })


@router.post("/orphaned-annotations/reattach")
async def reattach_orphaned_annotations(
    current_user: User = Depends(current_active_user),
):
    """Placeholder for reattaching orphaned annotations to a different conversation."""
    if not current_user.is_superuser:
        raise HTTPException(status_code=403, detail="Admin access required")

    raise HTTPException(status_code=501, detail="Reattach functionality coming soon")


# ---------------------------------------------------------------------------
# Cron Job Management Endpoints
# ---------------------------------------------------------------------------


class CronJobUpdate(BaseModel):
    """Partial update payload: toggle a job and/or change its schedule."""
    enabled: Optional[bool] = None
    schedule: Optional[str] = None


@router.get("/cron-jobs")
async def get_cron_jobs(current_user: User = Depends(current_active_user)):
    """List all cron jobs with status, schedule, and last/next run times (admin-only)."""
    if not current_user.is_superuser:
        raise HTTPException(status_code=403, detail="Admin access required")

    from advanced_omi_backend.cron_scheduler import get_scheduler

    return await get_scheduler().get_all_jobs_status()


@router.put("/cron-jobs/{job_id}")
async def update_cron_job(
    job_id: str,
    body: CronJobUpdate,
    current_user: User = Depends(current_active_user),
):
    """Update a cron job's schedule or enabled state (admin-only)."""
    if not current_user.is_superuser:
        raise HTTPException(status_code=403, detail="Admin access required")

    from advanced_omi_backend.cron_scheduler import get_scheduler

    try:
        await get_scheduler().update_job(job_id, enabled=body.enabled, schedule=body.schedule)
    except ValueError as e:
        # Scheduler signals bad schedule strings / unknown fields via ValueError.
        raise HTTPException(status_code=400, detail=str(e))

    return {"message": f"Job '{job_id}' updated", "job_id": job_id}


@router.post("/cron-jobs/{job_id}/run")
async def run_cron_job_now(
    job_id: str,
    current_user: User = Depends(current_active_user),
):
    """Manually trigger a cron job (admin-only)."""
    if not current_user.is_superuser:
        raise HTTPException(status_code=403, detail="Admin access required")

    from advanced_omi_backend.cron_scheduler import get_scheduler

    try:
        result = await get_scheduler().run_job_now(job_id)
    except ValueError as e:
        # Unknown job_id → 404 rather than 400, matching the lookup semantics.
        raise HTTPException(status_code=404, detail=str(e))

    return result
+ + Also creates entity annotations as a side effect for each changed field. + These annotations feed the jargon and entity extraction pipelines. + """ + try: + if not request.name and not request.details and not request.icon: + raise HTTPException( + status_code=400, + detail="At least one field (name, details, icon) must be provided", + ) + + service = get_knowledge_graph_service() + + # Get current entity for annotation original values + existing = await service.get_entity( + entity_id=entity_id, + user_id=str(current_user.id), + ) + if not existing: + raise HTTPException(status_code=404, detail="Entity not found") + + # Apply update to Neo4j + updated = await service.update_entity( + entity_id=entity_id, + user_id=str(current_user.id), + name=request.name, + details=request.details, + icon=request.icon, + ) + if not updated: + raise HTTPException(status_code=404, detail="Entity not found") + + # Create annotations for changed text fields (name, details) + # These feed the jargon pipeline and entity extraction pipeline. + # Icon changes don't create annotations (not text corrections). 
+ for field in ("name", "details"): + new_value = getattr(request, field) + if new_value is not None: + old_value = getattr(existing, field) or "" + annotation = Annotation( + annotation_type=AnnotationType.ENTITY, + user_id=str(current_user.id), + entity_id=entity_id, + entity_field=field, + original_text=old_value, + corrected_text=new_value, + status=AnnotationStatus.ACCEPTED, + processed=False, + ) + await annotation.save() + logger.info( + f"Created entity annotation for {field} change on entity {entity_id}" + ) + + return {"entity": updated.to_dict()} + except HTTPException: + raise + except Exception as e: + logger.error(f"Error updating entity {entity_id}: {e}") + return JSONResponse( + status_code=500, + content={"message": f"Error updating entity: {str(e)}"}, + ) + + @router.delete("/entities/{entity_id}") async def delete_entity( entity_id: str, diff --git a/backends/advanced/src/advanced_omi_backend/services/knowledge_graph/service.py b/backends/advanced/src/advanced_omi_backend/services/knowledge_graph/service.py index 6562dccc..5f13508d 100644 --- a/backends/advanced/src/advanced_omi_backend/services/knowledge_graph/service.py +++ b/backends/advanced/src/advanced_omi_backend/services/knowledge_graph/service.py @@ -535,6 +535,44 @@ async def search_entities( return entities + async def update_entity( + self, + entity_id: str, + user_id: str, + name: str = None, + details: str = None, + icon: str = None, + ) -> Optional[Entity]: + """Update an entity's fields (partial update via COALESCE). 
+ + Args: + entity_id: Entity UUID + user_id: User ID for permission check + name: New name (None keeps existing) + details: New details (None keeps existing) + icon: New icon (None keeps existing) + + Returns: + Updated Entity object or None if not found + """ + self._ensure_initialized() + + results = self._write.run( + queries.UPDATE_ENTITY, + id=entity_id, + user_id=user_id, + name=name, + details=details, + icon=icon, + metadata=None, + ) + + if not results: + return None + + entity_data = dict(results[0]["e"]) + return self._row_to_entity(entity_data) + async def delete_entity( self, entity_id: str, diff --git a/backends/advanced/src/advanced_omi_backend/services/memory/providers/vector_stores.py b/backends/advanced/src/advanced_omi_backend/services/memory/providers/vector_stores.py index 9fed0126..6e49578d 100644 --- a/backends/advanced/src/advanced_omi_backend/services/memory/providers/vector_stores.py +++ b/backends/advanced/src/advanced_omi_backend/services/memory/providers/vector_stores.py @@ -19,6 +19,7 @@ FilterSelector, MatchValue, PointStruct, + Range, VectorParams, ) @@ -445,7 +446,56 @@ async def get_memory(self, memory_id: str, user_id: Optional[str] = None) -> Opt memory_logger.error(f"Qdrant get memory failed for {memory_id}: {e}") return None + async def get_recent_memories( + self, user_id: str, since_timestamp: int, limit: int = 100 + ) -> List[MemoryEntry]: + """Get memories created after a given unix timestamp for a user. 
+ Args: + user_id: User identifier + since_timestamp: Unix timestamp; only memories at or after this time are returned + limit: Maximum number of memories to return + + Returns: + List of MemoryEntry objects + """ + try: + search_filter = Filter( + must=[ + FieldCondition( + key="metadata.user_id", + match=MatchValue(value=user_id), + ), + FieldCondition( + key="metadata.timestamp", + range=Range(gte=since_timestamp), + ), + ] + ) + + results = await self.client.scroll( + collection_name=self.collection_name, + scroll_filter=search_filter, + limit=limit, + ) + memories = [] + for point in results[0]: + memory = MemoryEntry( + id=str(point.id), + content=point.payload.get("content", ""), + metadata=point.payload.get("metadata", {}), + created_at=point.payload.get("created_at"), + updated_at=point.payload.get("updated_at"), + ) + memories.append(memory) + memory_logger.info( + f"Found {len(memories)} recent memories since {since_timestamp} for user {user_id}" + ) + return memories + + except Exception as e: + memory_logger.error(f"Qdrant get recent memories failed: {e}") + return [] diff --git a/backends/advanced/src/advanced_omi_backend/services/transcription/__init__.py b/backends/advanced/src/advanced_omi_backend/services/transcription/__init__.py index 71b213b8..9c7f1d21 100644 --- a/backends/advanced/src/advanced_omi_backend/services/transcription/__init__.py +++ b/backends/advanced/src/advanced_omi_backend/services/transcription/__init__.py @@ -17,6 +17,7 @@ from advanced_omi_backend.config_loader import get_backend_config from advanced_omi_backend.model_registry import get_models_registry +from advanced_omi_backend.prompt_registry import get_prompt_registry from .base import ( BaseTranscriptionProvider, @@ -27,6 +28,26 @@ logger = logging.getLogger(__name__) +def _parse_hot_words_to_keyterm(hot_words_str: str) -> str: + """Convert comma-separated hot words to Deepgram keyterm format. 
+ + Input: "hey vivi, chronicle, omi" + Output: "hey vivi Hey Vivi chronicle Chronicle omi Omi" + """ + if not hot_words_str or not hot_words_str.strip(): + return "" + terms = [] + for word in hot_words_str.split(","): + word = word.strip() + if not word: + continue + terms.append(word) + capitalized = word.title() + if capitalized != word: + terms.append(capitalized) + return " ".join(terms) + + def _dotted_get(d: dict | list | None, dotted: Optional[str]): """Safely extract a value from nested dict/list using dotted paths. @@ -99,7 +120,7 @@ def get_capabilities_dict(self) -> dict: """ return {cap: True for cap in self._capabilities} - async def transcribe(self, audio_data: bytes, sample_rate: int, diarize: bool = False) -> dict: + async def transcribe(self, audio_data: bytes, sample_rate: int, diarize: bool = False, context_info: Optional[str] = None, **kwargs) -> dict: # Special handling for mock provider (no HTTP server needed) if self.model.model_provider == "mock": from .mock_provider import MockTranscriptionProvider @@ -148,14 +169,34 @@ async def transcribe(self, audio_data: bytes, sample_rate: int, diarize: bool = if "diarize" in query: query["diarize"] = "true" if diarize else "false" + # Use caller-provided context or fall back to LangFuse prompt store + if context_info: + hot_words_str = context_info + else: + hot_words_str = "" + try: + registry = get_prompt_registry() + hot_words_str = await registry.get_prompt("asr.hot_words") + except Exception as e: + logger.debug(f"Failed to fetch asr.hot_words prompt: {e}") + + # For Deepgram: inject as keyterm query param + if self.model.model_provider == "deepgram" and hot_words_str.strip(): + keyterm = _parse_hot_words_to_keyterm(hot_words_str) + if keyterm: + query["keyterm"] = keyterm + timeout = op.get("timeout", 300) try: async with httpx.AsyncClient(timeout=timeout) as client: if method == "POST": if use_multipart: - # Send as multipart file upload (for Parakeet) + # Send as multipart file upload (for 
Parakeet/VibeVoice) files = {"file": ("audio.wav", audio_data, "audio/wav")} - resp = await client.post(url, headers=headers, params=query, files=files) + data = {} + if hot_words_str and hot_words_str.strip(): + data["context_info"] = hot_words_str.strip() + resp = await client.post(url, headers=headers, params=query, files=files, data=data) else: # Send as raw audio data (for Deepgram) resp = await client.post(url, headers=headers, params=query, content=audio_data) @@ -240,6 +281,18 @@ async def start_stream(self, client_id: str, sample_rate: int = 16000, diarize: if diarize and "diarize" in query_dict: query_dict["diarize"] = "true" + # Inject hot words for streaming (Deepgram only) + if self.model.model_provider == "deepgram": + try: + registry = get_prompt_registry() + hot_words_str = await registry.get_prompt("asr.hot_words") + if hot_words_str and hot_words_str.strip(): + keyterm = _parse_hot_words_to_keyterm(hot_words_str) + if keyterm: + query_dict["keyterm"] = keyterm + except Exception as e: + logger.debug(f"Failed to fetch asr.hot_words for streaming: {e}") + # Normalize boolean values to lowercase strings (Deepgram expects "true"/"false", not "True"/"False") normalized_query = {} for k, v in query_dict.items(): diff --git a/backends/advanced/src/advanced_omi_backend/services/transcription/base.py b/backends/advanced/src/advanced_omi_backend/services/transcription/base.py index 7d0f2306..bc5cf6f7 100644 --- a/backends/advanced/src/advanced_omi_backend/services/transcription/base.py +++ b/backends/advanced/src/advanced_omi_backend/services/transcription/base.py @@ -122,12 +122,14 @@ def mode(self) -> str: return "batch" @abc.abstractmethod - async def transcribe(self, audio_data: bytes, sample_rate: int, diarize: bool = False) -> dict: + async def transcribe(self, audio_data: bytes, sample_rate: int, diarize: bool = False, context_info: Optional[str] = None, **kwargs) -> dict: """Transcribe audio data. 
@dataclass
class TranscriptionContext:
    """Context assembled before transcription.

    Keeps the static hot words and per-user jargon as separate components so
    callers can inspect each one and Langfuse spans can record them as
    structured metadata.
    """

    hot_words: str = ""
    user_jargon: str = ""
    user_id: Optional[str] = None

    @property
    def combined(self) -> str:
        """Comma-separated string suitable for passing to ASR providers."""
        pieces = []
        for raw in (self.hot_words, self.user_jargon):
            if raw and raw.strip():
                pieces.append(raw.strip())
        return ", ".join(pieces)

    def to_metadata(self) -> dict:
        """Return a truncated, structured summary for Langfuse span metadata."""
        return {
            "hot_words": (self.hot_words or "")[:200],
            "user_jargon": (self.user_jargon or "")[:200],
            "user_id": self.user_id,
            "combined_length": len(self.combined),
        }


async def gather_transcription_context(user_id: Optional[str] = None) -> TranscriptionContext:
    """Build structured transcription context: static hot words + cached user jargon.

    Args:
        user_id: If provided, also look up per-user jargon from Redis.

    Returns:
        TranscriptionContext with individual components.
    """
    from advanced_omi_backend.prompt_registry import get_prompt_registry

    static_hot_words = await get_prompt_registry().get_prompt("asr.hot_words")

    jargon = ""
    if user_id:
        try:
            client = aioredis.from_url(REDIS_URL, decode_responses=True)
            try:
                jargon = await client.get(f"asr:jargon:{user_id}") or ""
            finally:
                # NOTE(review): redis-py >= 5 prefers aclose(); close() still
                # works but is deprecated — confirm the pinned client version.
                await client.close()
        except Exception:
            pass  # Redis unavailable → skip dynamic jargon

    return TranscriptionContext(
        hot_words=static_hot_words or "",
        user_jargon=jargon,
        user_id=user_id,
    )
async def run_speaker_finetuning_job() -> dict:
    """Send applied diarization annotations to the speaker recognition service.

    Mirrors ``finetuning_routes.process_annotations_for_training`` so the cron
    scheduler can trigger training without an HTTP request. Annotations whose
    conversation vanished (or whose segment index is out of range) are treated
    as orphans and deleted along the way.

    Returns:
        Summary dict with enrolled/appended/failed/cleaned/processed counts.
    """
    from advanced_omi_backend.models.annotation import Annotation, AnnotationType
    from advanced_omi_backend.models.conversation import Conversation
    from advanced_omi_backend.speaker_recognition_client import SpeakerRecognitionClient
    from advanced_omi_backend.utils.audio_chunk_utils import reconstruct_audio_segment

    # Applied-but-not-yet-trained diarization annotations are the work queue.
    processed_annotations = await Annotation.find(
        Annotation.annotation_type == AnnotationType.DIARIZATION,
        Annotation.processed == True,  # noqa: E712 — Beanie query syntax
    ).to_list()

    pending_training = [
        a for a in processed_annotations
        if not a.processed_by or "training" not in a.processed_by
    ]

    if not pending_training:
        logger.info("Speaker finetuning: no annotations ready for training")
        return {"processed": 0, "message": "No annotations ready for training"}

    speaker_client = SpeakerRecognitionClient()
    if not speaker_client.enabled:
        logger.warning("Speaker finetuning: speaker recognition service is not enabled")
        return {"processed": 0, "message": "Speaker recognition service not enabled"}

    enrolled = appended = failed = cleaned = 0

    for annotation in pending_training:
        try:
            conversation = await Conversation.find_one(
                Conversation.conversation_id == annotation.conversation_id
            )
            if not conversation or not conversation.active_transcript:
                logger.warning(
                    f"Conversation {annotation.conversation_id} not found — "
                    f"deleting orphaned annotation {annotation.id}"
                )
                await annotation.delete()
                cleaned += 1
                continue

            segments = conversation.active_transcript.segments
            if annotation.segment_index >= len(segments):
                logger.warning(
                    f"Invalid segment index {annotation.segment_index} for "
                    f"conversation {annotation.conversation_id} — "
                    f"deleting orphaned annotation {annotation.id}"
                )
                await annotation.delete()
                cleaned += 1
                continue

            segment = segments[annotation.segment_index]

            wav_bytes = await reconstruct_audio_segment(
                conversation_id=annotation.conversation_id,
                start_time=segment.start,
                end_time=segment.end,
            )
            if not wav_bytes:
                failed += 1
                continue

            # NOTE(review): user_id is hard-coded to 1 — presumably the speaker
            # service's single-tenant default; confirm against the HTTP route.
            existing_speaker = await speaker_client.get_speaker_by_name(
                speaker_name=annotation.corrected_speaker,
                user_id=1,
            )

            if existing_speaker:
                result = await speaker_client.append_to_speaker(
                    speaker_id=existing_speaker["id"], audio_data=wav_bytes
                )
                if "error" in result:
                    failed += 1
                    continue
                appended += 1
            else:
                result = await speaker_client.enroll_new_speaker(
                    speaker_name=annotation.corrected_speaker,
                    audio_data=wav_bytes,
                    user_id=1,
                )
                if "error" in result:
                    failed += 1
                    continue
                enrolled += 1

            # Record that training consumed this annotation.
            annotation.processed_by = (
                f"{annotation.processed_by},training" if annotation.processed_by else "training"
            )
            annotation.updated_at = datetime.now(timezone.utc)
            await annotation.save()

        except Exception as e:
            logger.error(f"Speaker finetuning: error processing annotation {annotation.id}: {e}")
            failed += 1

    total = enrolled + appended
    logger.info(
        f"Speaker finetuning complete: {total} processed "
        f"({enrolled} new, {appended} appended, {failed} failed, {cleaned} orphaned cleaned)"
    )
    return {"enrolled": enrolled, "appended": appended, "failed": failed, "cleaned": cleaned, "processed": total}
await redis_client.set(f"asr:jargon:{user_id}", jargon, ex=JARGON_CACHE_TTL) + processed += 1 + logger.debug(f"Cached jargon for user {user_id}: {jargon[:80]}...") + else: + skipped += 1 + except Exception as e: + logger.error(f"Jargon extraction failed for user {user_id}: {e}") + errors += 1 + finally: + await redis_client.close() + + logger.info( + f"ASR jargon extraction complete: {processed} users processed, " + f"{skipped} skipped, {errors} errors" + ) + return {"users_processed": processed, "skipped": skipped, "errors": errors} + + +async def _extract_jargon_for_user(user_id: str) -> Optional[str]: + """Pull recent memories from Qdrant, call LLM to extract jargon terms. + + Returns a comma-separated string of jargon terms, or None if nothing found. + """ + from advanced_omi_backend.services.memory import get_memory_service + from advanced_omi_backend.services.memory.providers.chronicle import MemoryService + + memory_service = get_memory_service() + + # Only works with Chronicle provider (has Qdrant vector store) + if not isinstance(memory_service, MemoryService): + logger.debug("Jargon extraction requires Chronicle memory provider, skipping") + return None + + if memory_service.vector_store is None: + return None + + since_ts = int(time.time()) - MEMORY_LOOKBACK_SECONDS + + memories = await memory_service.vector_store.get_recent_memories( + user_id=user_id, + since_timestamp=since_ts, + limit=MAX_RECENT_MEMORIES, + ) + + if not memories: + return None + + # Concatenate memory content + memory_text = "\n".join(m.content for m in memories if m.content) + if not memory_text.strip(): + return None + + # Use LLM to extract jargon + registry = get_prompt_registry() + prompt_template = await registry.get_prompt("asr.jargon_extraction", memories=memory_text) + + result = await async_generate(prompt_template) + + # Clean up: strip whitespace, remove empty items + if result: + terms = [t.strip() for t in result.split(",") if t.strip()] + if terms: + return ", 
".join(terms) + + return None diff --git a/backends/advanced/src/advanced_omi_backend/workers/transcription_jobs.py b/backends/advanced/src/advanced_omi_backend/workers/transcription_jobs.py index 19483f55..e2cd37e3 100644 --- a/backends/advanced/src/advanced_omi_backend/workers/transcription_jobs.py +++ b/backends/advanced/src/advanced_omi_backend/workers/transcription_jobs.py @@ -215,13 +215,25 @@ async def transcribe_full_audio_job( logger.error(f"Failed to reconstruct audio from MongoDB: {e}", exc_info=True) raise RuntimeError(f"Audio reconstruction failed: {e}") + # Build ASR context (static hot words + per-user cached jargon) + try: + from advanced_omi_backend.services.transcription.context import get_asr_context + + context_info = await get_asr_context(user_id=user_id) + except Exception as e: + logger.warning(f"Failed to build ASR context: {e}") + context_info = None + try: # Transcribe the audio directly from memory (no disk I/O needed) - transcription_result = await provider.transcribe( - audio_data=wav_data, # Pass bytes directly, already in memory - sample_rate=16000, - diarize=True, - ) + transcribe_kwargs: Dict[str, Any] = { + "audio_data": wav_data, + "sample_rate": 16000, + "diarize": True, + } + if context_info: + transcribe_kwargs["context_info"] = context_info + transcription_result = await provider.transcribe(**transcribe_kwargs) except ConnectionError as e: logger.exception(f"Transcription service unreachable for {conversation_id}") raise RuntimeError(str(e)) diff --git a/backends/advanced/webui/package-lock.json b/backends/advanced/webui/package-lock.json index ead72812..7fe0c6d6 100644 --- a/backends/advanced/webui/package-lock.json +++ b/backends/advanced/webui/package-lock.json @@ -10,6 +10,7 @@ "dependencies": { "axios": "^1.6.2", "clsx": "^2.0.0", + "cronstrue": "^2.50.0", "d3": "^7.8.5", "frappe-gantt": "^1.0.4", "lucide-react": "^0.294.0", @@ -2719,6 +2720,15 @@ "dev": true, "license": "MIT" }, + "node_modules/cronstrue": { + "version": 
"2.59.0", + "resolved": "https://registry.npmjs.org/cronstrue/-/cronstrue-2.59.0.tgz", + "integrity": "sha512-YKGmAy84hKH+hHIIER07VCAHf9u0Ldelx1uU6EBxsRPDXIA1m5fsKmJfyC3xBhw6cVC/1i83VdbL4PvepTrt8A==", + "license": "MIT", + "bin": { + "cronstrue": "bin/cli.js" + } + }, "node_modules/cross-spawn": { "version": "7.0.6", "resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-7.0.6.tgz", diff --git a/backends/advanced/webui/package.json b/backends/advanced/webui/package.json index b933d8db..c6c1f771 100644 --- a/backends/advanced/webui/package.json +++ b/backends/advanced/webui/package.json @@ -11,6 +11,7 @@ }, "dependencies": { "axios": "^1.6.2", + "cronstrue": "^2.50.0", "clsx": "^2.0.0", "d3": "^7.8.5", "frappe-gantt": "^1.0.4", diff --git a/backends/advanced/webui/src/components/knowledge-graph/EntityCard.tsx b/backends/advanced/webui/src/components/knowledge-graph/EntityCard.tsx index 76d28cf9..bb48fef1 100644 --- a/backends/advanced/webui/src/components/knowledge-graph/EntityCard.tsx +++ b/backends/advanced/webui/src/components/knowledge-graph/EntityCard.tsx @@ -1,4 +1,6 @@ -import { User, MapPin, Building, Calendar, Package, Link2 } from 'lucide-react' +import { useState } from 'react' +import { User, MapPin, Building, Calendar, Package, Link2, Pencil, Check, X } from 'lucide-react' +import { knowledgeGraphApi } from '../../services/api' export interface Entity { id: string @@ -20,6 +22,7 @@ export interface Entity { interface EntityCardProps { entity: Entity onClick?: (entity: Entity) => void + onEntityUpdated?: (entity: Entity) => void compact?: boolean } @@ -39,7 +42,12 @@ const typeColors: Record = { thing: 'bg-gray-100 text-gray-800 dark:bg-gray-700 dark:text-gray-300', } -export default function EntityCard({ entity, onClick, compact = false }: EntityCardProps) { +export default function EntityCard({ entity, onClick, onEntityUpdated, compact = false }: EntityCardProps) { + const [isEditing, setIsEditing] = useState(false) + const [editName, 
setEditName] = useState(entity.name) + const [editDetails, setEditDetails] = useState(entity.details || '') + const [saving, setSaving] = useState(false) + const icon = typeIcons[entity.type] || const colorClass = typeColors[entity.type] || typeColors.thing @@ -52,6 +60,41 @@ export default function EntityCard({ entity, onClick, compact = false }: EntityC } } + const handleEditClick = (e: React.MouseEvent) => { + e.stopPropagation() + setEditName(entity.name) + setEditDetails(entity.details || '') + setIsEditing(true) + } + + const handleCancel = (e: React.MouseEvent) => { + e.stopPropagation() + setIsEditing(false) + } + + const handleSave = async (e: React.MouseEvent) => { + e.stopPropagation() + const updates: { name?: string; details?: string } = {} + if (editName.trim() !== entity.name) updates.name = editName.trim() + if (editDetails.trim() !== (entity.details || '')) updates.details = editDetails.trim() + + if (Object.keys(updates).length === 0) { + setIsEditing(false) + return + } + + try { + setSaving(true) + const response = await knowledgeGraphApi.updateEntity(entity.id, updates) + setIsEditing(false) + onEntityUpdated?.(response.data.entity) + } catch (err) { + console.error('Failed to update entity:', err) + } finally { + setSaving(false) + } + } + if (compact) { return (
onClick?.(entity)} - className="bg-white dark:bg-gray-800 rounded-lg border border-gray-200 dark:border-gray-700 p-4 hover:border-blue-400 dark:hover:border-blue-500 transition-colors cursor-pointer group" + onClick={() => !isEditing && onClick?.(entity)} + className={`bg-white dark:bg-gray-800 rounded-lg border border-gray-200 dark:border-gray-700 p-4 hover:border-blue-400 dark:hover:border-blue-500 transition-colors ${isEditing ? '' : 'cursor-pointer'} group`} >
-
-
+
+
{entity.icon ? ( {entity.icon} ) : ( icon )}
-
-

- {entity.name} -

+
+ {isEditing ? ( + setEditName(e.target.value)} + onClick={(e) => e.stopPropagation()} + className="w-full px-2 py-1 text-sm font-semibold border border-gray-300 dark:border-gray-600 rounded bg-white dark:bg-gray-700 text-gray-900 dark:text-gray-100 focus:outline-none focus:ring-2 focus:ring-blue-500" + autoFocus + /> + ) : ( +

+ {entity.name} +

+ )} {entity.type}
- {entity.relationship_count != null && entity.relationship_count > 0 && ( -
- - {entity.relationship_count} -
- )} +
+ {isEditing ? ( + <> + + + + ) : ( + <> + + {entity.relationship_count != null && entity.relationship_count > 0 && ( +
+ + {entity.relationship_count} +
+ )} + + )} +
- {entity.details && ( -

- {entity.details} -

+ {isEditing ? ( +