26 changes: 26 additions & 0 deletions backends/advanced/init.py
@@ -662,9 +662,15 @@ def setup_langfuse(self):
self.config["LANGFUSE_PUBLIC_KEY"] = langfuse_pub
self.config["LANGFUSE_SECRET_KEY"] = langfuse_sec
self.config["LANGFUSE_BASE_URL"] = langfuse_host

# Derive browser-accessible URL for deep-links
public_url = getattr(self.args, 'langfuse_public_url', None) or "http://localhost:3002"
self._save_langfuse_public_url(public_url)

source = "external" if "langfuse-web" not in langfuse_host else "local"
self.console.print(f"[green][SUCCESS][/green] LangFuse auto-configured ({source})")
self.console.print(f"[blue][INFO][/blue] Host: {langfuse_host}")
self.console.print(f"[blue][INFO][/blue] Public URL: {public_url}")
self.console.print(f"[blue][INFO][/blue] Public key: {self.mask_api_key(langfuse_pub)}")
return
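
The hunk above separates the compose-internal Langfuse host from a browser-reachable URL. A minimal sketch of that derivation, assuming only the flag and defaults shown in this diff:

    # Minimal sketch of the URL derivation above. The compose-internal host
    # (http://langfuse-web:3000) is not reachable from a browser, so the
    # deep-link URL falls back to the published port on localhost.
    args_public_url = None  # stand-in for args.langfuse_public_url
    public_url = args_public_url or "http://localhost:3002"

    langfuse_host = "http://langfuse-web:3000"
    source = "external" if "langfuse-web" not in langfuse_host else "local"
    assert source == "local" and public_url == "http://localhost:3002"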

@@ -710,10 +716,28 @@ def setup_langfuse(self):
if secret_key:
self.config["LANGFUSE_SECRET_KEY"] = secret_key

# Browser-accessible URL for deep-links (stored in config.yml, not .env)
public_url = Prompt.ask(
"LangFuse browser URL (for dashboard links)",
default="http://localhost:3002",
)
if public_url:
self._save_langfuse_public_url(public_url)

self.console.print("[green][SUCCESS][/green] LangFuse configured")
else:
self.console.print("[blue][INFO][/blue] LangFuse disabled")

def _save_langfuse_public_url(self, public_url: str):
"""Save the Langfuse browser-accessible URL to config.yml."""
full_config = self.config_manager.get_full_config()
if "observability" not in full_config:
full_config["observability"] = {}
if "langfuse" not in full_config["observability"]:
full_config["observability"]["langfuse"] = {}
full_config["observability"]["langfuse"]["public_url"] = public_url
self.config_manager.save_full_config(full_config)
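
For orientation, this is the structure the helper leaves in config.yml, shown as the equivalent Python dict (key names taken from the code above; any sibling config keys are omitted):

    # Equivalent structure written to config.yml by _save_langfuse_public_url:
    full_config = {
        "observability": {
            "langfuse": {
                "public_url": "http://localhost:3002",
            },
        },
    }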

def setup_network(self):
"""Configure network settings"""
self.print_section("Network Configuration")
@@ -1038,6 +1062,8 @@ def main():
help="LangFuse project secret key (from langfuse init or external)")
parser.add_argument("--langfuse-host",
help="LangFuse host URL (default: http://langfuse-web:3000 for local)")
parser.add_argument("--langfuse-public-url",
help="LangFuse browser-accessible URL for deep-links (default: http://localhost:3002)")
parser.add_argument("--streaming-provider",
choices=["deepgram", "smallest", "qwen3-asr"],
help="Streaming provider when different from batch (enables batch re-transcription)")
@@ -283,6 +283,33 @@ async def get_auth_config():
}


async def get_observability_config():
"""Get observability configuration for frontend (Langfuse deep-links).

Returns non-secret data only (enabled status and browser URL).
"""
from advanced_omi_backend.openai_factory import is_langfuse_enabled

enabled = is_langfuse_enabled()
session_base_url = None

if enabled:
from advanced_omi_backend.config_loader import load_config

cfg = load_config()
public_url = cfg.get("observability", {}).get("langfuse", {}).get("public_url", "")
if public_url:
# Strip trailing slash and build session URL
session_base_url = f"{public_url.rstrip('/')}/project/chronicle/sessions"

return {
"langfuse": {
"enabled": enabled,
"session_base_url": session_base_url,
}
}
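
For reference, a hedged example of the payload this handler returns when Langfuse is enabled and a public URL is configured (the "chronicle" project slug is hard-coded above):

    # Illustrative response body for the handler above:
    example_response = {
        "langfuse": {
            "enabled": True,
            "session_base_url": "http://localhost:3002/project/chronicle/sessions",
        }
    }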


# Audio file processing functions moved to audio_controller.py


36 changes: 30 additions & 6 deletions backends/advanced/src/advanced_omi_backend/llm_client.py
@@ -11,7 +11,7 @@
from typing import Any, Dict, Optional

from advanced_omi_backend.model_registry import get_models_registry
from advanced_omi_backend.openai_factory import create_openai_client
from advanced_omi_backend.openai_factory import create_openai_client, is_langfuse_enabled
from advanced_omi_backend.services.memory.config import (
load_config_yml as _load_root_config,
)
@@ -78,7 +78,8 @@ def __init__(
raise

def generate(
self, prompt: str, model: str | None = None, temperature: float | None = None
self, prompt: str, model: str | None = None, temperature: float | None = None,
**langfuse_kwargs,
) -> str:
"""Generate text completion using OpenAI-compatible API."""
try:
@@ -90,6 +91,8 @@ def generate(
"messages": [{"role": "user", "content": prompt}],
"temperature": temp,
}
if is_langfuse_enabled():
params.update(langfuse_kwargs)

response = self.client.chat.completions.create(**params)
return response.choices[0].message.content.strip()
@@ -98,7 +101,8 @@ def generate(
raise

def chat_with_tools(
self, messages: list, tools: list | None = None, model: str | None = None, temperature: float | None = None
self, messages: list, tools: list | None = None, model: str | None = None,
temperature: float | None = None, **langfuse_kwargs,
):
"""Chat completion with tool/function calling support. Returns raw response object."""
model_name = model or self.model
@@ -109,6 +113,8 @@ def chat_with_tools(
}
if tools:
params["tools"] = tools
if is_langfuse_enabled():
params.update(langfuse_kwargs)
return self.client.chat.completions.create(**params)
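
Because the extra kwargs are forwarded to chat.completions.create only when Langfuse is enabled, call sites can pass tracing metadata unconditionally; a hedged usage sketch (the session ID is hypothetical):

    # The metadata kwarg below is dropped for plain OpenAI-compatible
    # backends and forwarded only when Langfuse is enabled.
    client = get_llm_client()
    text = client.generate(
        "Summarize the meeting.",
        metadata={"langfuse_session_id": "conv-123"},  # hypothetical session ID
    )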

def health_check(self) -> Dict:
@@ -190,39 +196,53 @@ def reset_llm_client():
_llm_client = None


def _langfuse_metadata(session_id: str | None) -> dict:
"""Return metadata dict with langfuse_session_id if Langfuse is enabled."""
if session_id and is_langfuse_enabled():
return {"langfuse_session_id": session_id}
return {}


# Async wrapper for blocking LLM operations
async def async_generate(
prompt: str,
model: str | None = None,
temperature: float | None = None,
operation: str | None = None,
langfuse_session_id: str | None = None,
) -> str:
"""Async wrapper for LLM text generation.
When ``operation`` is provided, parameters are resolved from the
``llm_operations`` config section via ``get_llm_operation()``.
The resolved config determines model, temperature, max_tokens, etc.
Explicit ``model``/``temperature`` kwargs still override the resolved values.
When ``langfuse_session_id`` is provided and Langfuse is enabled,
the session ID is set on the current Langfuse trace to group all
LLM calls for a conversation.
"""
if operation:
registry = get_models_registry()
if registry:
op = registry.get_llm_operation(operation)
client = op.get_client(is_async=True)
api_params = op.to_api_params()
# Allow explicit overrides
if temperature is not None:
api_params["temperature"] = temperature
if model is not None:
api_params["model"] = model
api_params["messages"] = [{"role": "user", "content": prompt}]
api_params["metadata"] = _langfuse_metadata(langfuse_session_id)
response = await client.chat.completions.create(**api_params)
return response.choices[0].message.content.strip()

# Fallback: use singleton client
client = get_llm_client()
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, client.generate, prompt, model, temperature)
return await loop.run_in_executor(
None, lambda: client.generate(prompt, model, temperature)
)
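
A hedged call-site sketch for the new parameter (the operation name and session ID are hypothetical; real operation names come from the llm_operations config section):

    # Inside an async context:
    summary = await async_generate(
        "Summarize this transcript: ...",
        operation="summarize",                   # hypothetical llm_operations entry
        langfuse_session_id="audio-session-42",  # groups traces per conversation
    )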


async def async_chat_with_tools(
@@ -231,6 +251,7 @@ async def async_chat_with_tools(
model: str | None = None,
temperature: float | None = None,
operation: str | None = None,
langfuse_session_id: str | None = None,
):
"""Async wrapper for chat completion with tool calling.
@@ -249,12 +270,15 @@ async def async_chat_with_tools(
api_params["messages"] = messages
if tools:
api_params["tools"] = tools
api_params["metadata"] = _langfuse_metadata(langfuse_session_id)
return await client.chat.completions.create(**api_params)

# Fallback: use singleton client
client = get_llm_client()
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, client.chat_with_tools, messages, tools, model, temperature)
return await loop.run_in_executor(
None, lambda: client.chat_with_tools(messages, tools, model, temperature)
)
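
The tool-calling wrapper threads the session ID the same way; a hedged sketch with an illustrative tool schema:

    # Inside an async context; the tool definition below is made up for illustration.
    tools = [{
        "type": "function",
        "function": {
            "name": "lookup_contact",
            "parameters": {"type": "object", "properties": {}},
        },
    }]
    response = await async_chat_with_tools(
        [{"role": "user", "content": "Who did I meet yesterday?"}],
        tools=tools,
        langfuse_session_id="audio-session-42",
    )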


async def async_health_check() -> Dict:
@@ -50,6 +50,16 @@ async def get_auth_config():
return await system_controller.get_auth_config()


@router.get("/observability")
async def get_observability_config():
"""Get observability configuration for frontend (Langfuse deep-links).

Returns non-secret data: enabled status and browser-accessible session URL.
No authentication required.
"""
return await system_controller.get_observability_config()
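
A hedged smoke test for the new route using FastAPI's TestClient; the app import and mount path are assumptions, so adjust them to wherever this router is included:

    from fastapi.testclient import TestClient

    from main import app  # assumed application entrypoint

    client = TestClient(app)
    resp = client.get("/api/observability")  # prefix depends on router mounting
    assert resp.status_code == 200
    assert "langfuse" in resp.json()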


@router.get("/diarization-settings")
async def get_diarization_settings(current_user: User = Depends(current_superuser)):
"""Get current diarization settings. Admin only."""
@@ -342,13 +342,15 @@ class LLMProviderBase(ABC):
@abstractmethod
async def extract_memories(
self, text: str, prompt: str, user_id: Optional[str] = None,
langfuse_session_id: Optional[str] = None,
) -> List[str]:
"""Extract meaningful fact memories from text using an LLM.

Args:
text: Input text to extract memories from
prompt: System prompt to guide the extraction process
user_id: Optional user ID for per-user prompt override resolution
langfuse_session_id: Optional session ID for Langfuse trace grouping

Returns:
List of extracted fact memory strings
Expand All @@ -373,6 +375,7 @@ async def propose_memory_actions(
retrieved_old_memory: List[Dict[str, str]],
new_facts: List[str],
custom_prompt: Optional[str] = None,
langfuse_session_id: Optional[str] = None,
) -> Dict[str, Any]:
"""Propose memory management actions based on existing and new information.

@@ -398,6 +401,7 @@ async def propose_reprocess_actions(
diff_context: str,
new_transcript: str,
custom_prompt: Optional[str] = None,
langfuse_session_id: Optional[str] = None,
) -> Dict[str, Any]:
"""Propose memory updates after transcript reprocessing (e.g., speaker changes).

@@ -156,6 +156,7 @@ async def add_memory(
fact_memories_text = await asyncio.wait_for(
self.llm_provider.extract_memories(
transcript, self.config.extraction_prompt, user_id=user_id,
langfuse_session_id=source_id,
),
timeout=self.config.timeout_seconds,
)
@@ -193,7 +194,8 @@ async def add_memory(
if allow_update and fact_memories_text:
memory_logger.info(f"🔍 Allowing update for {source_id}")
created_ids = await self._process_memory_updates(
fact_memories_text, embeddings, user_id, client_id, source_id, user_email
fact_memories_text, embeddings, user_id, client_id, source_id, user_email,
langfuse_session_id=source_id,
)
else:
memory_logger.info(f"🔍 Not allowing update for {source_id}")
@@ -578,6 +580,7 @@ async def reprocess_memory(
existing_memories=existing_memory_dicts,
diff_context=diff_text,
new_transcript=transcript,
langfuse_session_id=source_id,
)
memory_logger.info(
f"🔄 Reprocess LLM returned actions: {actions_obj}"
@@ -786,6 +789,7 @@ async def _process_memory_updates(
client_id: str,
source_id: str,
user_email: str,
langfuse_session_id: Optional[str] = None,
) -> List[str]:
"""Process memory updates using LLM-driven action proposals.

@@ -848,6 +852,7 @@ async def _process_memory_updates(
retrieved_old_memory=retrieved_old_memory,
new_facts=memories_text,
custom_prompt=None,
langfuse_session_id=langfuse_session_id,
)
memory_logger.info(f"📝 UpdateMemory LLM returned: {type(actions_obj)} - {actions_obj}")
except Exception as e_actions: