diff --git a/backends/advanced/init.py b/backends/advanced/init.py
index a1448876..4ea037b2 100644
--- a/backends/advanced/init.py
+++ b/backends/advanced/init.py
@@ -662,9 +662,15 @@ def setup_langfuse(self):
             self.config["LANGFUSE_PUBLIC_KEY"] = langfuse_pub
             self.config["LANGFUSE_SECRET_KEY"] = langfuse_sec
             self.config["LANGFUSE_BASE_URL"] = langfuse_host
+
+            # Derive browser-accessible URL for deep-links
+            public_url = getattr(self.args, 'langfuse_public_url', None) or "http://localhost:3002"
+            self._save_langfuse_public_url(public_url)
+
             source = "external" if "langfuse-web" not in langfuse_host else "local"
             self.console.print(f"[green][SUCCESS][/green] LangFuse auto-configured ({source})")
             self.console.print(f"[blue][INFO][/blue] Host: {langfuse_host}")
+            self.console.print(f"[blue][INFO][/blue] Public URL: {public_url}")
             self.console.print(f"[blue][INFO][/blue] Public key: {self.mask_api_key(langfuse_pub)}")
             return
@@ -710,10 +716,28 @@ def setup_langfuse(self):
             if secret_key:
                 self.config["LANGFUSE_SECRET_KEY"] = secret_key
 
+            # Browser-accessible URL for deep-links (stored in config.yml, not .env)
+            public_url = Prompt.ask(
+                "LangFuse browser URL (for dashboard links)",
+                default="http://localhost:3002",
+            )
+            if public_url:
+                self._save_langfuse_public_url(public_url)
+
             self.console.print("[green][SUCCESS][/green] LangFuse configured")
         else:
             self.console.print("[blue][INFO][/blue] LangFuse disabled")
 
+    def _save_langfuse_public_url(self, public_url: str):
+        """Save the Langfuse browser-accessible URL to config.yml."""
+        full_config = self.config_manager.get_full_config()
+        if "observability" not in full_config:
+            full_config["observability"] = {}
+        if "langfuse" not in full_config["observability"]:
+            full_config["observability"]["langfuse"] = {}
+        full_config["observability"]["langfuse"]["public_url"] = public_url
+        self.config_manager.save_full_config(full_config)
+
     def setup_network(self):
         """Configure network settings"""
         self.print_section("Network Configuration")
@@ -1038,6 +1062,8 @@ def main():
                         help="LangFuse project secret key (from langfuse init or external)")
     parser.add_argument("--langfuse-host",
                         help="LangFuse host URL (default: http://langfuse-web:3000 for local)")
+    parser.add_argument("--langfuse-public-url",
+                        help="LangFuse browser-accessible URL for deep-links (default: http://localhost:3002)")
     parser.add_argument("--streaming-provider", choices=["deepgram", "smallest", "qwen3-asr"],
                         help="Streaming provider when different from batch (enables batch re-transcription)")
diff --git a/backends/advanced/src/advanced_omi_backend/controllers/system_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/system_controller.py
index 263b806f..bf3ce1b1 100644
--- a/backends/advanced/src/advanced_omi_backend/controllers/system_controller.py
+++ b/backends/advanced/src/advanced_omi_backend/controllers/system_controller.py
@@ -283,6 +283,33 @@ async def get_auth_config():
     }
 
 
+async def get_observability_config():
+    """Get observability configuration for frontend (Langfuse deep-links).
+
+    Returns non-secret data only (enabled status and browser URL).
+    """
+    from advanced_omi_backend.openai_factory import is_langfuse_enabled
+
+    enabled = is_langfuse_enabled()
+    session_base_url = None
+
+    if enabled:
+        from advanced_omi_backend.config_loader import load_config
+
+        cfg = load_config()
+        public_url = cfg.get("observability", {}).get("langfuse", {}).get("public_url", "")
+        if public_url:
+            # Strip trailing slash and build session URL
+            session_base_url = f"{public_url.rstrip('/')}/project/chronicle/sessions"
+
+    return {
+        "langfuse": {
+            "enabled": enabled,
+            "session_base_url": session_base_url,
+        }
+    }
+
+
 # Audio file processing functions moved to audio_controller.py
diff --git a/backends/advanced/src/advanced_omi_backend/llm_client.py b/backends/advanced/src/advanced_omi_backend/llm_client.py
index ca640640..96ccc77b 100644
--- a/backends/advanced/src/advanced_omi_backend/llm_client.py
+++ b/backends/advanced/src/advanced_omi_backend/llm_client.py
@@ -11,7 +11,7 @@
 from typing import Any, Dict, Optional
 
 from advanced_omi_backend.model_registry import get_models_registry
-from advanced_omi_backend.openai_factory import create_openai_client
+from advanced_omi_backend.openai_factory import create_openai_client, is_langfuse_enabled
 from advanced_omi_backend.services.memory.config import (
     load_config_yml as _load_root_config,
 )
@@ -78,7 +78,8 @@ def __init__(
             raise
 
     def generate(
-        self, prompt: str, model: str | None = None, temperature: float | None = None
+        self, prompt: str, model: str | None = None, temperature: float | None = None,
+        **langfuse_kwargs,
     ) -> str:
         """Generate text completion using OpenAI-compatible API."""
         try:
@@ -90,6 +91,8 @@ def generate(
                 "messages": [{"role": "user", "content": prompt}],
                 "temperature": temp,
             }
+            if is_langfuse_enabled():
+                params.update(langfuse_kwargs)
             response = self.client.chat.completions.create(**params)
             return response.choices[0].message.content.strip()
@@ -98,7 +101,8 @@ def generate(
             raise
 
     def chat_with_tools(
-        self, messages: list, tools: list | None = None, model: str | None = None, temperature: float | None = None
+        self, messages: list, tools: list | None = None, model: str | None = None,
+        temperature: float | None = None, **langfuse_kwargs,
     ):
         """Chat completion with tool/function calling support. Returns raw response object."""
         model_name = model or self.model
@@ -109,6 +113,8 @@ def chat_with_tools(
         }
         if tools:
             params["tools"] = tools
+        if is_langfuse_enabled():
+            params.update(langfuse_kwargs)
         return self.client.chat.completions.create(**params)
 
     def health_check(self) -> Dict:
@@ -190,12 +196,20 @@ def reset_llm_client():
     _llm_client = None
 
 
+def _langfuse_metadata(session_id: str | None) -> dict:
+    """Return metadata dict with langfuse_session_id if Langfuse is enabled."""
+    if session_id and is_langfuse_enabled():
+        return {"langfuse_session_id": session_id}
+    return {}
+
+
 # Async wrapper for blocking LLM operations
 async def async_generate(
     prompt: str,
     model: str | None = None,
     temperature: float | None = None,
     operation: str | None = None,
+    langfuse_session_id: str | None = None,
 ) -> str:
     """Async wrapper for LLM text generation.
 
@@ -203,6 +217,10 @@ async def async_generate(
     ``llm_operations`` config section via ``get_llm_operation()``. The
     resolved config determines model, temperature, max_tokens, etc. Explicit
     ``model``/``temperature`` kwargs still override the resolved values.
+
+    When ``langfuse_session_id`` is provided and Langfuse is enabled,
+    the session ID is set on the current Langfuse trace to group all
+    LLM calls for a conversation.
     """
     if operation:
         registry = get_models_registry()
@@ -210,19 +228,21 @@
         op = registry.get_llm_operation(operation)
         client = op.get_client(is_async=True)
         api_params = op.to_api_params()
-        # Allow explicit overrides
         if temperature is not None:
            api_params["temperature"] = temperature
         if model is not None:
            api_params["model"] = model
         api_params["messages"] = [{"role": "user", "content": prompt}]
+        api_params["metadata"] = _langfuse_metadata(langfuse_session_id)
         response = await client.chat.completions.create(**api_params)
         return response.choices[0].message.content.strip()
 
     # Fallback: use singleton client
     client = get_llm_client()
     loop = asyncio.get_running_loop()
-    return await loop.run_in_executor(None, client.generate, prompt, model, temperature)
+    return await loop.run_in_executor(
+        None, lambda: client.generate(prompt, model, temperature)
+    )
 
 
 async def async_chat_with_tools(
@@ -231,6 +251,7 @@
     model: str | None = None,
     temperature: float | None = None,
     operation: str | None = None,
+    langfuse_session_id: str | None = None,
 ):
     """Async wrapper for chat completion with tool calling.
 
@@ -249,12 +270,15 @@
         api_params["messages"] = messages
         if tools:
             api_params["tools"] = tools
+        api_params["metadata"] = _langfuse_metadata(langfuse_session_id)
         return await client.chat.completions.create(**api_params)
 
     # Fallback: use singleton client
     client = get_llm_client()
     loop = asyncio.get_running_loop()
-    return await loop.run_in_executor(None, client.chat_with_tools, messages, tools, model, temperature)
+    return await loop.run_in_executor(
+        None, lambda: client.chat_with_tools(messages, tools, model, temperature)
+    )
 
 
 async def async_health_check() -> Dict:
diff --git a/backends/advanced/src/advanced_omi_backend/routers/modules/system_routes.py b/backends/advanced/src/advanced_omi_backend/routers/modules/system_routes.py
index 44d44c1a..277d7dc1 100644
--- a/backends/advanced/src/advanced_omi_backend/routers/modules/system_routes.py
+++ b/backends/advanced/src/advanced_omi_backend/routers/modules/system_routes.py
@@ -50,6 +50,16 @@ async def get_auth_config():
     return await system_controller.get_auth_config()
 
 
+@router.get("/observability")
+async def get_observability_config():
+    """Get observability configuration for frontend (Langfuse deep-links).
+
+    Returns non-secret data: enabled status and browser-accessible session URL.
+    No authentication required.
+    """
+    return await system_controller.get_observability_config()
+
+
 @router.get("/diarization-settings")
 async def get_diarization_settings(current_user: User = Depends(current_superuser)):
     """Get current diarization settings. Admin only."""
diff --git a/backends/advanced/src/advanced_omi_backend/services/memory/base.py b/backends/advanced/src/advanced_omi_backend/services/memory/base.py
index ce0fe22b..bae18e56 100644
--- a/backends/advanced/src/advanced_omi_backend/services/memory/base.py
+++ b/backends/advanced/src/advanced_omi_backend/services/memory/base.py
@@ -342,6 +342,7 @@ class LLMProviderBase(ABC):
     @abstractmethod
     async def extract_memories(
         self, text: str, prompt: str, user_id: Optional[str] = None,
+        langfuse_session_id: Optional[str] = None,
     ) -> List[str]:
         """Extract meaningful fact memories from text using an LLM.
@@ -349,6 +350,7 @@ async def extract_memories(
             text: Input text to extract memories from
             prompt: System prompt to guide the extraction process
             user_id: Optional user ID for per-user prompt override resolution
+            langfuse_session_id: Optional session ID for Langfuse trace grouping
 
         Returns:
             List of extracted fact memory strings
@@ -373,6 +375,7 @@ async def propose_memory_actions(
         retrieved_old_memory: List[Dict[str, str]],
         new_facts: List[str],
         custom_prompt: Optional[str] = None,
+        langfuse_session_id: Optional[str] = None,
     ) -> Dict[str, Any]:
         """Propose memory management actions based on existing and new information.
 
@@ -398,6 +401,7 @@ async def propose_reprocess_actions(
         diff_context: str,
         new_transcript: str,
         custom_prompt: Optional[str] = None,
+        langfuse_session_id: Optional[str] = None,
     ) -> Dict[str, Any]:
         """Propose memory updates after transcript reprocessing (e.g., speaker changes).
diff --git a/backends/advanced/src/advanced_omi_backend/services/memory/providers/chronicle.py b/backends/advanced/src/advanced_omi_backend/services/memory/providers/chronicle.py
index abfb7bb5..d1f51775 100644
--- a/backends/advanced/src/advanced_omi_backend/services/memory/providers/chronicle.py
+++ b/backends/advanced/src/advanced_omi_backend/services/memory/providers/chronicle.py
@@ -156,6 +156,7 @@ async def add_memory(
             fact_memories_text = await asyncio.wait_for(
                 self.llm_provider.extract_memories(
                     transcript, self.config.extraction_prompt, user_id=user_id,
+                    langfuse_session_id=source_id,
                 ),
                 timeout=self.config.timeout_seconds,
             )
@@ -193,7 +194,8 @@ async def add_memory(
             if allow_update and fact_memories_text:
                 memory_logger.info(f"🔍 Allowing update for {source_id}")
                 created_ids = await self._process_memory_updates(
-                    fact_memories_text, embeddings, user_id, client_id, source_id, user_email
+                    fact_memories_text, embeddings, user_id, client_id, source_id, user_email,
+                    langfuse_session_id=source_id,
                 )
             else:
                 memory_logger.info(f"🔍 Not allowing update for {source_id}")
@@ -578,6 +580,7 @@ async def reprocess_memory(
                     existing_memories=existing_memory_dicts,
                     diff_context=diff_text,
                     new_transcript=transcript,
+                    langfuse_session_id=source_id,
                 )
                 memory_logger.info(
                     f"🔄 Reprocess LLM returned actions: {actions_obj}"
                 )
@@ -786,6 +789,7 @@ async def _process_memory_updates(
         client_id: str,
         source_id: str,
         user_email: str,
+        langfuse_session_id: Optional[str] = None,
     ) -> List[str]:
         """Process memory updates using LLM-driven action proposals.
@@ -848,6 +852,7 @@ async def _process_memory_updates(
                 retrieved_old_memory=retrieved_old_memory,
                 new_facts=memories_text,
                 custom_prompt=None,
+                langfuse_session_id=langfuse_session_id,
             )
             memory_logger.info(f"📝 UpdateMemory LLM returned: {type(actions_obj)} - {actions_obj}")
         except Exception as e_actions:
diff --git a/backends/advanced/src/advanced_omi_backend/services/memory/providers/llm_providers.py b/backends/advanced/src/advanced_omi_backend/services/memory/providers/llm_providers.py
index 25afc38a..2d83d24c 100644
--- a/backends/advanced/src/advanced_omi_backend/services/memory/providers/llm_providers.py
+++ b/backends/advanced/src/advanced_omi_backend/services/memory/providers/llm_providers.py
@@ -15,7 +15,7 @@
 from typing import Any, Dict, List, Optional
 
 from advanced_omi_backend.model_registry import ModelDef, get_models_registry
-from advanced_omi_backend.openai_factory import create_openai_client
+from advanced_omi_backend.openai_factory import create_openai_client, is_langfuse_enabled
 from advanced_omi_backend.prompt_registry import get_prompt_registry
 
 from ..base import LLMProviderBase
@@ -38,6 +38,13 @@
 memory_logger = logging.getLogger("memory_service")
 
 
+def _langfuse_metadata(session_id: str | None) -> dict:
+    """Return metadata dict with langfuse_session_id if Langfuse is enabled."""
+    if session_id and is_langfuse_enabled():
+        return {"langfuse_session_id": session_id}
+    return {}
+
+
 def _get_openai_client(api_key: str, base_url: str, is_async: bool = False):
     """Get OpenAI client with optional Langfuse tracing.
 
@@ -186,6 +193,7 @@ def __init__(self, config: Dict[str, Any]):
     async def extract_memories(
         self, text: str, prompt: str, user_id: Optional[str] = None,
+        langfuse_session_id: Optional[str] = None,
     ) -> List[str]:
         """Extract memories using OpenAI API with the enhanced fact retrieval prompt.
 
@@ -193,6 +201,7 @@ async def extract_memories(
             text: Input text to extract memories from
             prompt: System prompt to guide extraction (uses default if empty)
             user_id: Optional user ID for per-user prompt override resolution
+            langfuse_session_id: Optional session ID for Langfuse trace grouping
 
         Returns:
             List of extracted memory strings
@@ -209,12 +218,12 @@ async def extract_memories(
                 user_id,
                 current_date=datetime.now().strftime("%Y-%m-%d"),
             )
-
+
             # local models can only handle small chunks of input text
             text_chunks = chunk_text_with_spacy(text)
-
+
             # Process all chunks in sequence, not concurrently
-            results = [await self._process_chunk(system_prompt, chunk, i) for i, chunk in enumerate(text_chunks)]
+            results = [await self._process_chunk(system_prompt, chunk, i, langfuse_session_id=langfuse_session_id) for i, chunk in enumerate(text_chunks)]
 
             # Spread list of list of facts into a single list of facts
             cleaned_facts = []
@@ -228,23 +237,26 @@ async def extract_memories(
             memory_logger.error(f"OpenAI memory extraction failed: {e}")
             return []
 
-    async def _process_chunk(self, system_prompt: str, chunk: str, index: int) -> List[str]:
+    async def _process_chunk(
+        self, system_prompt: str, chunk: str, index: int,
+        langfuse_session_id: Optional[str] = None,
+    ) -> List[str]:
         """Process a single text chunk to extract memories using OpenAI API.
-
+
         This private method handles the LLM interaction for a single chunk
         of text, sending it to OpenAI's chat completion API with the specified
         system prompt to extract structured memory facts.
-
+
         Args:
-            client: OpenAI async client instance for API communication
             system_prompt: System prompt that guides the memory extraction behavior
             chunk: Individual text chunk to process for memory extraction
             index: Index of the chunk for logging and error tracking purposes
-
+            langfuse_session_id: Optional session ID for Langfuse trace grouping
+
         Returns:
             List of extracted memory fact strings from the chunk.
             Returns empty list if no facts are found or if an error occurs
             during processing.
-
+
         Note:
             Errors are logged but don't propagate to avoid failing the
             entire memory extraction process.
@@ -258,6 +270,7 @@ async def _process_chunk(self, system_prompt: str, chunk: str, index: int) -> Li
                     {"role": "system", "content": system_prompt},
                     {"role": "user", "content": chunk},
                 ],
+                metadata=_langfuse_metadata(langfuse_session_id),
             )
             facts = (response.choices[0].message.content or "").strip()
             if not facts:
@@ -314,6 +327,7 @@ async def propose_memory_actions(
         retrieved_old_memory: List[Dict[str, str]] | List[str],
         new_facts: List[str],
         custom_prompt: Optional[str] = None,
+        langfuse_session_id: Optional[str] = None,
     ) -> Dict[str, Any]:
         """Use OpenAI chat completion with enhanced prompt to propose memory actions.
 
@@ -321,6 +335,7 @@ async def propose_memory_actions(
             retrieved_old_memory: List of existing memories for context
             new_facts: List of new facts to process
             custom_prompt: Optional custom prompt to override default
+            langfuse_session_id: Optional session ID for Langfuse trace grouping
 
         Returns:
             Dictionary containing proposed memory actions
@@ -340,6 +355,7 @@ async def propose_memory_actions(
             response = await client.chat.completions.create(
                 **op.to_api_params(),
                 messages=update_memory_messages,
+                metadata=_langfuse_metadata(langfuse_session_id),
             )
             content = (response.choices[0].message.content or "").strip()
             if not content:
@@ -365,6 +381,7 @@ async def propose_reprocess_actions(
         diff_context: str,
         new_transcript: str,
         custom_prompt: Optional[str] = None,
+        langfuse_session_id: Optional[str] = None,
     ) -> Dict[str, Any]:
         """Propose memory updates after speaker re-identification.
 
@@ -427,6 +444,7 @@ async def propose_reprocess_actions(
             response = await client.chat.completions.create(
                 **op.to_api_params(),
                 messages=messages,
+                metadata=_langfuse_metadata(langfuse_session_id),
             )
             content = (response.choices[0].message.content or "").strip()
diff --git a/backends/advanced/src/advanced_omi_backend/utils/conversation_utils.py b/backends/advanced/src/advanced_omi_backend/utils/conversation_utils.py
index a199d5fa..63036ce1 100644
--- a/backends/advanced/src/advanced_omi_backend/utils/conversation_utils.py
+++ b/backends/advanced/src/advanced_omi_backend/utils/conversation_utils.py
@@ -164,6 +164,7 @@ async def generate_title_and_summary(
     text: str,
     segments: Optional[list] = None,
     user_id: Optional[str] = None,
+    langfuse_session_id: Optional[str] = None,
 ) -> tuple[str, str]:
     """
     Generate title and short summary in a single LLM call using full conversation context.
@@ -221,7 +222,7 @@ async def generate_title_and_summary(
     "{conversation_text}"
     """
 
-    response = await async_generate(prompt, operation="title_summary")
+    response = await async_generate(prompt, operation="title_summary", langfuse_session_id=langfuse_session_id)
 
     # Parse response for Title: and Summary: lines
     title = None
@@ -253,6 +254,7 @@ async def generate_detailed_summary(
     text: str,
     segments: Optional[list] = None,
     memory_context: Optional[str] = None,
+    langfuse_session_id: Optional[str] = None,
 ) -> str:
     """
     Generate a comprehensive, detailed summary of the conversation.
@@ -328,7 +330,7 @@ async def generate_detailed_summary(
         "{conversation_text}"
         """
 
-        summary = await async_generate(prompt, operation="detailed_summary")
+        summary = await async_generate(prompt, operation="detailed_summary", langfuse_session_id=langfuse_session_id)
 
         return summary.strip().strip('"').strip("'") or "No meaningful content to summarize"
     except Exception as e:
diff --git a/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py b/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py
index 021bc9da..b1bbb8cd 100644
--- a/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py
+++ b/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py
@@ -1025,10 +1025,12 @@ async def generate_title_summary_job(conversation_id: str, *, redis_client=None)
         (title, short_summary), detailed_summary = await asyncio.gather(
             generate_title_and_summary(
-                transcript_text, segments=segments, user_id=conversation.user_id
+                transcript_text, segments=segments, user_id=conversation.user_id,
+                langfuse_session_id=conversation_id,
             ),
             generate_detailed_summary(
-                transcript_text, segments=segments, memory_context=memory_context
+                transcript_text, segments=segments, memory_context=memory_context,
+                langfuse_session_id=conversation_id,
             ),
         )
diff --git a/backends/advanced/webui/src/components/SpeakerNameDropdown.tsx b/backends/advanced/webui/src/components/SpeakerNameDropdown.tsx
index 45b0ff07..b0553fc7 100644
--- a/backends/advanced/webui/src/components/SpeakerNameDropdown.tsx
+++ b/backends/advanced/webui/src/components/SpeakerNameDropdown.tsx
@@ -1,7 +1,9 @@
 import { useState, useRef, useEffect } from 'react'
-import { Check, Plus } from 'lucide-react'
+import { Check, Plus, UserX } from 'lucide-react'
 import { useSortedSpeakers } from '../hooks/useSortedSpeakers'
 
+const UNKNOWN_SPEAKER = 'Unknown Speaker'
+
 interface SpeakerNameDropdownProps {
   currentSpeaker: string
   enrolledSpeakers: Array<{ speaker_id: string; name: string }>
@@ -112,6 +114,24 @@ export default function SpeakerNameDropdown({
           />
 
+          {/* Unknown Speaker option */}
+          {(!searchQuery || UNKNOWN_SPEAKER.toLowerCase().includes(searchQuery.toLowerCase())) && (
+
+
+
+          )}
+
           {/* Speaker list */}
           {hasResults ? (
diff --git a/backends/advanced/webui/src/pages/ConversationDetail.tsx b/backends/advanced/webui/src/pages/ConversationDetail.tsx
index bfbe0022..a7ec7fa4 100644
--- a/backends/advanced/webui/src/pages/ConversationDetail.tsx
+++ b/backends/advanced/webui/src/pages/ConversationDetail.tsx
@@ -4,9 +4,9 @@
 import { useQueryClient } from '@tanstack/react-query'
 import {
   ArrowLeft, Calendar, User, Trash2, RefreshCw, MoreVertical, RotateCcw, Zap, Play, Pause,
-  Save, X, Pencil, Brain, Clock, Database, Layers, Star
+  Save, X, Pencil, Brain, Clock, Database, Layers, Star, BarChart3
 } from 'lucide-react'
-import { annotationsApi, speakerApi, BACKEND_URL } from '../services/api'
+import { annotationsApi, speakerApi, systemApi, BACKEND_URL } from '../services/api'
 import {
   useConversationDetail, useConversationMemories, useDeleteConversation,
   useReprocessTranscript, useReprocessMemory, useReprocessSpeakers, useToggleStar
@@ -88,6 +88,17 @@ export default function ConversationDetail() {
   // Dropdown menu state
   const [openDropdown, setOpenDropdown] = useState(false)
 
+  // Langfuse observability link
+  const [langfuseSessionUrl, setLangfuseSessionUrl] = useState<string | null>(null)
+  useEffect(() => {
+    systemApi.getObservabilityConfig().then(res => {
+      const cfg = res.data?.langfuse
+      if (cfg?.enabled && cfg?.session_base_url) {
+        setLangfuseSessionUrl(cfg.session_base_url)
+      }
+    }).catch(() => {})
+  }, [])
+
   // Reprocessing state
   const [reprocessingTranscript, setReprocessingTranscript] = useState(false)
   const [reprocessingMemory, setReprocessingMemory] = useState(false)
@@ -455,6 +466,7 @@ export default function ConversationDetail() {
       })
     }
     setEnrolledSpeakers(prev => {
+      if (newSpeaker === 'Unknown Speaker') return prev
       if (prev.some(s => s.name === newSpeaker)) return prev
       return [...prev, { speaker_id: `temp_${Date.now()}_${newSpeaker}`, name: newSpeaker }]
     })
@@ -589,6 +601,17 @@ export default function ConversationDetail() {
+              {langfuseSessionUrl && conversation.conversation_id && (
+
+
+
+              )}