Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions backends/advanced/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,6 @@ services:
timeout: 10s
retries: 5
start_period: 30s
profiles:
- obsidian
- knowledge-graph

# ollama:
# image: ollama/ollama:latest
Expand Down
129 changes: 43 additions & 86 deletions backends/advanced/init.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,119 +444,89 @@ def setup_optional_services(self):
self.config["TS_AUTHKEY"] = self.args.ts_authkey
self.console.print(f"[green][SUCCESS][/green] Tailscale auth key configured (Docker integration enabled)")

def setup_neo4j(self):
    """Configure Neo4j credentials (always required - used by Knowledge Graph)"""
    password = getattr(self.args, 'neo4j_password', None)

    if not password:
        # No wizard-provided password: fall back to an interactive prompt
        # (standalone init.py run).
        self.console.print()
        self.console.print("[bold cyan]Neo4j Configuration[/bold cyan]")
        self.console.print("Neo4j is used for Knowledge Graph (entity/relationship extraction)")
        self.console.print()
        password = self.prompt_password("Neo4j password (min 8 chars)")
    else:
        self.console.print("[green]✅[/green] Neo4j: password configured via wizard")

    # Host and user are fixed by the docker-compose service definition;
    # only the password varies per install.
    self.config["NEO4J_HOST"] = "neo4j"
    self.config["NEO4J_USER"] = "neo4j"
    self.config["NEO4J_PASSWORD"] = password
    self.console.print("[green][SUCCESS][/green] Neo4j credentials configured")

def setup_obsidian(self):
"""Configure Obsidian/Neo4j integration"""
# Check if enabled via command line
"""Configure Obsidian integration (optional feature flag only - Neo4j credentials handled by setup_neo4j)"""
if hasattr(self.args, 'enable_obsidian') and self.args.enable_obsidian:
enable_obsidian = True
neo4j_password = getattr(self.args, 'neo4j_password', None)

if not neo4j_password:
self.console.print("[yellow][WARNING][/yellow] --enable-obsidian provided but no password")
neo4j_password = self.prompt_password("Neo4j password (min 8 chars)")

self.console.print(f"[green]✅[/green] Obsidian/Neo4j: enabled (configured via wizard)")
self.console.print(f"[green]✅[/green] Obsidian: enabled (configured via wizard)")
else:
# Interactive prompt (fallback)
self.console.print()
self.console.print("[bold cyan]Obsidian/Neo4j Integration[/bold cyan]")
self.console.print("[bold cyan]Obsidian Integration (Optional)[/bold cyan]")
self.console.print("Enable graph-based knowledge management for Obsidian vault notes")
self.console.print()

try:
enable_obsidian = Confirm.ask("Enable Obsidian/Neo4j integration?", default=False)
enable_obsidian = Confirm.ask("Enable Obsidian integration?", default=False)
except EOFError:
self.console.print("Using default: No")
enable_obsidian = False

if enable_obsidian:
neo4j_password = self.prompt_password("Neo4j password (min 8 chars)")

if enable_obsidian:
# Update .env with credentials only (secrets, not feature flags)
self.config["NEO4J_HOST"] = "neo4j"
self.config["NEO4J_USER"] = "neo4j"
self.config["NEO4J_PASSWORD"] = neo4j_password

# Update config.yml with feature flag (source of truth) - auto-saves via ConfigManager
self.config_manager.update_memory_config({
"obsidian": {
"enabled": True,
"neo4j_host": "neo4j",
"timeout": 30
}
})

self.console.print("[green][SUCCESS][/green] Obsidian/Neo4j configured")
self.console.print("[blue][INFO][/blue] Neo4j will start automatically with --profile obsidian")
self.console.print("[green][SUCCESS][/green] Obsidian integration enabled")
else:
# Explicitly disable Obsidian in config.yml when not enabled
self.config_manager.update_memory_config({
"obsidian": {
"enabled": False,
"neo4j_host": "neo4j",
"timeout": 30
}
})
self.console.print("[blue][INFO][/blue] Obsidian/Neo4j integration disabled")
self.console.print("[blue][INFO][/blue] Obsidian integration disabled")

def setup_knowledge_graph(self):
"""Configure Knowledge Graph (Neo4j-based entity/relationship extraction)"""
# Check if enabled via command line
"""Configure Knowledge Graph (Neo4j-based entity/relationship extraction - enabled by default)"""
if hasattr(self.args, 'enable_knowledge_graph') and self.args.enable_knowledge_graph:
enable_kg = True
neo4j_password = getattr(self.args, 'neo4j_password', None)

if not neo4j_password:
# Check if already set from obsidian setup
neo4j_password = self.config.get("NEO4J_PASSWORD")
if not neo4j_password:
self.console.print("[yellow][WARNING][/yellow] --enable-knowledge-graph provided but no password")
neo4j_password = self.prompt_password("Neo4j password (min 8 chars)")
else:
# Interactive prompt (fallback)
self.console.print()
self.console.print("[bold cyan]Knowledge Graph (Entity Extraction)[/bold cyan]")
self.console.print("Enable graph-based entity and relationship extraction from conversations")
self.console.print("Extracts: People, Places, Organizations, Events, Promises/Tasks")
self.console.print("Extract people, places, organizations, events, and tasks from conversations")
self.console.print()

try:
enable_kg = Confirm.ask("Enable Knowledge Graph?", default=False)
enable_kg = Confirm.ask("Enable Knowledge Graph?", default=True)
except EOFError:
self.console.print("Using default: No")
enable_kg = False

if enable_kg:
# Check if Neo4j password already set from obsidian setup
existing_password = self.config.get("NEO4J_PASSWORD")
if existing_password:
self.console.print("[blue][INFO][/blue] Using Neo4j password from Obsidian configuration")
neo4j_password = existing_password
else:
neo4j_password = self.prompt_password("Neo4j password (min 8 chars)")
self.console.print("Using default: Yes")
enable_kg = True

if enable_kg:
# Update .env with credentials only (secrets, not feature flags)
self.config["NEO4J_HOST"] = "neo4j"
self.config["NEO4J_USER"] = "neo4j"
if neo4j_password:
self.config["NEO4J_PASSWORD"] = neo4j_password

# Update config.yml with feature flag (source of truth) - auto-saves via ConfigManager
self.config_manager.update_memory_config({
"knowledge_graph": {
"enabled": True,
"neo4j_host": "neo4j",
"timeout": 30
}
})

self.console.print("[green][SUCCESS][/green] Knowledge Graph configured")
self.console.print("[blue][INFO][/blue] Neo4j will start automatically with --profile knowledge-graph")
self.console.print("[green][SUCCESS][/green] Knowledge Graph enabled")
self.console.print("[blue][INFO][/blue] Entities and relationships will be extracted from conversations")
else:
# Explicitly disable Knowledge Graph in config.yml when not enabled
self.config_manager.update_memory_config({
"knowledge_graph": {
"enabled": False,
Expand All @@ -577,12 +547,14 @@ def setup_langfuse(self):

if langfuse_pub and langfuse_sec:
# Auto-configure from wizard — no prompts needed
self.config["LANGFUSE_HOST"] = "http://langfuse-web:3000"
langfuse_host = getattr(self.args, 'langfuse_host', None) or "http://langfuse-web:3000"
self.config["LANGFUSE_HOST"] = langfuse_host
self.config["LANGFUSE_PUBLIC_KEY"] = langfuse_pub
self.config["LANGFUSE_SECRET_KEY"] = langfuse_sec
self.config["LANGFUSE_BASE_URL"] = "http://langfuse-web:3000"
self.console.print("[green][SUCCESS][/green] LangFuse auto-configured from wizard")
self.console.print(f"[blue][INFO][/blue] Host: http://langfuse-web:3000")
self.config["LANGFUSE_BASE_URL"] = langfuse_host
source = "external" if "langfuse-web" not in langfuse_host else "local"
self.console.print(f"[green][SUCCESS][/green] LangFuse auto-configured ({source})")
self.console.print(f"[blue][INFO][/blue] Host: {langfuse_host}")
self.console.print(f"[blue][INFO][/blue] Public key: {self.mask_api_key(langfuse_pub)}")
return

Expand Down Expand Up @@ -842,25 +814,7 @@ def show_next_steps(self):
config_yml = self.config_manager.get_full_config()

self.console.print("1. Start the main services:")
# Include --profile obsidian/knowledge-graph if enabled (read from config.yml)
obsidian_enabled = config_yml.get("memory", {}).get("obsidian", {}).get("enabled", False)
kg_enabled = config_yml.get("memory", {}).get("knowledge_graph", {}).get("enabled", False)

profiles = []
profile_notes = []
if obsidian_enabled:
profiles.append("obsidian")
profile_notes.append("Obsidian integration")
if kg_enabled:
profiles.append("knowledge-graph")
profile_notes.append("Knowledge Graph")

if profiles:
profile_args = " ".join([f"--profile {p}" for p in profiles])
self.console.print(f" [cyan]docker compose {profile_args} up --build -d[/cyan]")
self.console.print(f" [dim](Includes Neo4j for: {', '.join(profile_notes)})[/dim]")
else:
self.console.print(" [cyan]docker compose up --build -d[/cyan]")
self.console.print(" [cyan]docker compose up --build -d[/cyan]")
self.console.print()

# Auto-determine URLs for next steps
Expand Down Expand Up @@ -908,6 +862,7 @@ def run(self):
self.setup_llm()
self.setup_memory()
self.setup_optional_services()
self.setup_neo4j()
self.setup_obsidian()
self.setup_knowledge_graph()
self.setup_langfuse()
Expand Down Expand Up @@ -967,9 +922,11 @@ def main():
parser.add_argument("--ts-authkey",
help="Tailscale auth key for Docker integration (default: prompt user)")
parser.add_argument("--langfuse-public-key",
help="LangFuse project public key (from langfuse init)")
help="LangFuse project public key (from langfuse init or external)")
parser.add_argument("--langfuse-secret-key",
help="LangFuse project secret key (from langfuse init)")
help="LangFuse project secret key (from langfuse init or external)")
parser.add_argument("--langfuse-host",
help="LangFuse host URL (default: http://langfuse-web:3000 for local)")

args = parser.parse_args()

Expand Down
1 change: 1 addition & 0 deletions backends/advanced/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ dependencies = [
"google-auth-oauthlib>=1.0.0",
"google-auth-httplib2>=0.2.0",
"websockets>=12.0",
"croniter>=1.3.0",
]

[project.optional-dependencies]
Expand Down
27 changes: 27 additions & 0 deletions backends/advanced/src/advanced_omi_backend/app_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,23 @@ async def lifespan(app: FastAPI):
# Register OpenMemory user if using openmemory_mcp provider
await initialize_openmemory_user()

# Start cron scheduler (requires Redis to be available)
try:
from advanced_omi_backend.cron_scheduler import get_scheduler, register_cron_job
from advanced_omi_backend.workers.finetuning_jobs import (
run_asr_jargon_extraction_job,
run_speaker_finetuning_job,
)

register_cron_job("speaker_finetuning", run_speaker_finetuning_job)
register_cron_job("asr_jargon_extraction", run_asr_jargon_extraction_job)

scheduler = get_scheduler()
await scheduler.start()
application_logger.info("Cron scheduler started")
except Exception as e:
application_logger.warning(f"Cron scheduler failed to start: {e}")

# SystemTracker is used for monitoring and debugging
application_logger.info("Using SystemTracker for monitoring and debugging")

Expand Down Expand Up @@ -319,6 +336,16 @@ async def lifespan(app: FastAPI):
except Exception as e:
application_logger.error(f"Error shutting down plugins: {e}")

# Shutdown cron scheduler
try:
from advanced_omi_backend.cron_scheduler import get_scheduler

scheduler = get_scheduler()
await scheduler.stop()
application_logger.info("Cron scheduler stopped")
except Exception as e:
application_logger.error(f"Error stopping cron scheduler: {e}")

# Shutdown memory service and speaker service
shutdown_memory_service()
application_logger.info("Memory and speaker services shut down.")
Expand Down
13 changes: 12 additions & 1 deletion backends/advanced/src/advanced_omi_backend/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,14 @@ def get_misc_settings() -> dict:
transcription_cfg = get_backend_config('transcription')
transcription_settings = OmegaConf.to_container(transcription_cfg, resolve=True) if transcription_cfg else {}

# Get speaker recognition settings for per_segment_speaker_id
speaker_cfg = get_backend_config('speaker_recognition')
speaker_settings = OmegaConf.to_container(speaker_cfg, resolve=True) if speaker_cfg else {}

return {
'always_persist_enabled': audio_settings.get('always_persist_enabled', False),
'use_provider_segments': transcription_settings.get('use_provider_segments', False)
'use_provider_segments': transcription_settings.get('use_provider_segments', False),
'per_segment_speaker_id': speaker_settings.get('per_segment_speaker_id', False),
}


Expand Down Expand Up @@ -228,4 +233,10 @@ def save_misc_settings(settings: dict) -> bool:
if not save_config_section('backend.transcription', transcription_settings):
success = False

# Save speaker recognition settings if per_segment_speaker_id is provided
if 'per_segment_speaker_id' in settings:
speaker_settings = {'per_segment_speaker_id': settings['per_segment_speaker_id']}
if not save_config_section('backend.speaker_recognition', speaker_settings):
success = False

return success
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ async def upload_and_process_audio_files(
conversation_id,
version_id,
"batch", # trigger
job_timeout=1800, # 30 minutes
job_timeout=900, # 15 minutes
result_ttl=JOB_RESULT_TTL,
job_id=transcribe_job_id,
description=f"Transcribe uploaded file {conversation_id[:8]}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ async def reprocess_transcript(conversation_id: str, user: User):
conversation_id,
version_id,
"reprocess",
job_timeout=600,
job_timeout=900, # 15 minutes
result_ttl=JOB_RESULT_TTL,
job_id=f"reprocess_{conversation_id[:8]}",
description=f"Transcribe audio for {conversation_id[:8]}",
Expand Down Expand Up @@ -722,14 +722,24 @@ async def reprocess_speakers(conversation_id: str, transcript_version_id: str, u
provider_capabilities.get("diarization", False)
or source_version.diarization_source == "provider"
)
has_words = bool(source_version.words)
has_segments = bool(source_version.segments)

if not source_version.words and not (provider_has_diarization and source_version.segments):
if not has_words and not has_segments:
return JSONResponse(
status_code=400,
content={
"error": "Cannot re-diarize transcript without word timings. Words are required for diarization."
"error": (
"Cannot re-diarize transcript without word timings or segments. "
"Word timestamps or provider segments are required."
)
},
)
if not has_words and has_segments and not provider_has_diarization:
logger.warning(
"Reprocessing speakers without word timings; "
"falling back to segment-based identification only."
)

# 5. Check if speaker recognition is enabled
speaker_config = get_service_config("speaker_recognition")
Expand All @@ -752,10 +762,13 @@ async def reprocess_speakers(conversation_id: str, transcript_version_id: str, u
"reprocessing_type": "speaker_diarization",
"source_version_id": source_version_id,
"trigger": "manual_reprocess",
"provider_capabilities": provider_capabilities,
}
if provider_has_diarization:
use_segments = provider_has_diarization or not has_words
if use_segments:
new_segments = source_version.segments # COPY provider segments
new_metadata["provider_capabilities"] = provider_capabilities
if not has_words and not provider_has_diarization:
new_metadata["segments_only"] = True
else:
new_segments = [] # Empty - will be populated by speaker job

Expand All @@ -772,7 +785,7 @@ async def reprocess_speakers(conversation_id: str, transcript_version_id: str, u
)

# Carry over diarization_source so speaker job knows to use segment identification
if provider_has_diarization:
if provider_has_diarization or (not has_words and has_segments):
new_version.diarization_source = "provider"

# Save conversation with new version
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1021,7 +1021,7 @@ async def _process_rolling_batch(
conversation_id,
version_id,
f"rolling_batch_{batch_number}", # trigger
job_timeout=1800, # 30 minutes
job_timeout=900, # 15 minutes
result_ttl=JOB_RESULT_TTL,
job_id=transcribe_job_id,
description=f"Transcribe rolling batch #{batch_number} {conversation_id[:8]}",
Expand Down Expand Up @@ -1137,7 +1137,7 @@ async def _process_batch_audio_complete(
conversation_id,
version_id,
"batch", # trigger
job_timeout=1800, # 30 minutes
job_timeout=900, # 15 minutes
result_ttl=JOB_RESULT_TTL,
job_id=transcribe_job_id,
description=f"Transcribe batch audio {conversation_id[:8]}",
Expand Down
Loading
Loading