Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 5 additions & 25 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,42 +1,22 @@
repos:
# Local hooks (project-specific checks)
- repo: local
hooks:
# Run Robot Framework endpoint tests before push
- id: robot-framework-tests
name: Robot Framework Tests (Endpoints)
entry: bash -c 'cd tests && make endpoints OUTPUTDIR=.pre-commit-results'
language: system
pass_filenames: false
stages: [push]
verbose: true

# Clean up test results after hook runs
- id: cleanup-test-results
name: Cleanup Test Results
entry: bash -c 'cd tests && rm -rf .pre-commit-results'
language: system
pass_filenames: false
stages: [push]
always_run: true

# Code formatting
- repo: https://github.com/psf/black
rev: 24.4.2
hooks:
- id: black
files: ^backends/advanced-backend/src/.*\.py$
exclude: \.venv/
- repo: https://github.com/PyCQA/isort
rev: 5.13.2
hooks:
- id: isort
files: ^backends/advanced-backend/src/.*\.py$
args: ["--profile", "black"]
exclude: \.venv/

# File hygiene
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.5.0
hooks:
- id: trailing-whitespace
files: ^backends/advanced-backend/src/.*
exclude: \.venv/
- id: end-of-file-fixer
files: ^backends/advanced-backend/src/.*
exclude: \.venv/
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,9 @@ help: ## Show detailed help for all targets
setup-dev: ## Setup development environment (git hooks, pre-commit)
@echo "🛠️ Setting up development environment..."
@echo ""
@bash scripts/check_uv.sh
@echo "📦 Installing pre-commit..."
@pip install pre-commit 2>/dev/null || pip3 install pre-commit
@uv tool install pre-commit
@echo ""
@echo "🔧 Installing git hooks..."
@pre-commit install --hook-type pre-push
Expand Down
6 changes: 6 additions & 0 deletions backends/advanced/.env.template
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ LANGFUSE_PUBLIC_KEY=
LANGFUSE_SECRET_KEY=
LANGFUSE_BASE_URL=http://langfuse-web:3000

# Galileo (OTEL-based LLM observability)
GALILEO_API_KEY=
GALILEO_PROJECT=chronicle
GALILEO_LOG_STREAM=default
# GALILEO_CONSOLE_URL=https://app.galileo.ai # Default; override for self-hosted

# Qwen3-ASR (offline ASR via vLLM)
# QWEN3_ASR_URL=host.docker.internal:8767
# QWEN3_ASR_STREAM_URL=host.docker.internal:8769
Expand Down
4 changes: 2 additions & 2 deletions backends/advanced/Docs/plugin-development-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ async def on_memory_processed(self, context: PluginContext):

**When**: OMI device button is pressed
**Context Data**:
- `state` (str): Button state (`SINGLE_TAP`, `DOUBLE_TAP`)
- `state` (str): Button state (`SINGLE_PRESS`, `DOUBLE_PRESS`)
- `timestamp` (float): Unix timestamp of the event
- `audio_uuid` (str): Current audio session UUID (may be None)
- `session_id` (str): Streaming session ID (for conversation close)
Expand All @@ -222,7 +222,7 @@ friend-lite-sdk (extras/friend-lite-sdk/)
→ parse_button_event() converts payload → ButtonState IntEnum
BLE Client (extras/local-wearable-client/ or mobile app)
→ Formats as Wyoming protocol: {"type": "button-event", "data": {"state": "SINGLE_TAP"}}
→ Formats as Wyoming protocol: {"type": "button-event", "data": {"state": "SINGLE_PRESS"}}
→ Sends over WebSocket
Backend (websocket_controller.py)
Expand Down
128 changes: 99 additions & 29 deletions backends/advanced/src/advanced_omi_backend/app_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,26 +71,48 @@ async def initialize_openmemory_user() -> None:

# Get configured user_id and server_url
openmemory_config = memory_provider_config.openmemory_config
user_id = openmemory_config.get("user_id", "openmemory") if openmemory_config else "openmemory"
server_url = openmemory_config.get("server_url", "http://host.docker.internal:8765") if openmemory_config else "http://host.docker.internal:8765"
client_name = openmemory_config.get("client_name", "chronicle") if openmemory_config else "chronicle"
user_id = (
openmemory_config.get("user_id", "openmemory")
if openmemory_config
else "openmemory"
)
server_url = (
openmemory_config.get("server_url", "http://host.docker.internal:8765")
if openmemory_config
else "http://host.docker.internal:8765"
)
client_name = (
openmemory_config.get("client_name", "chronicle")
if openmemory_config
else "chronicle"
)

application_logger.info(f"Registering OpenMemory user: {user_id} at {server_url}")
application_logger.info(
f"Registering OpenMemory user: {user_id} at {server_url}"
)

# Make a lightweight registration call (create and delete dummy memory)
async with MCPClient(server_url=server_url, client_name=client_name, user_id=user_id) as client:
async with MCPClient(
server_url=server_url, client_name=client_name, user_id=user_id
) as client:
# Test connection first
is_connected = await client.test_connection()
if is_connected:
# Create and immediately delete a dummy memory to trigger user creation
memory_ids = await client.add_memories("Chronicle initialization - user registration test")
memory_ids = await client.add_memories(
"Chronicle initialization - user registration test"
)
if memory_ids:
# Delete the test memory
await client.delete_memory(memory_ids[0])
application_logger.info(f"✅ Registered OpenMemory user: {user_id}")
else:
application_logger.warning(f"⚠️ OpenMemory MCP not reachable at {server_url}")
application_logger.info("User will be auto-created on first memory operation")
application_logger.warning(
f"⚠️ OpenMemory MCP not reachable at {server_url}"
)
application_logger.info(
"User will be auto-created on first memory operation"
)
except Exception as e:
application_logger.warning(f"⚠️ Could not register OpenMemory user: {e}")
application_logger.info("User will be auto-created on first memory operation")
Expand All @@ -116,7 +138,13 @@ async def lifespan(app: FastAPI):

await init_beanie(
database=config.db,
document_models=[User, Conversation, AudioChunkDocument, WaveformData, Annotation],
document_models=[
User,
Conversation,
AudioChunkDocument,
WaveformData,
Annotation,
],
)
application_logger.info("Beanie initialized for all document models")
except Exception as e:
Expand All @@ -133,12 +161,17 @@ async def lifespan(app: FastAPI):
# Initialize Redis connection for RQ
try:
from advanced_omi_backend.controllers.queue_controller import redis_conn

redis_conn.ping()
application_logger.info("Redis connection established for RQ")
application_logger.info("RQ workers can be started with: rq worker transcription memory default")
application_logger.info(
"RQ workers can be started with: rq worker transcription memory default"
)
except Exception as e:
application_logger.error(f"Failed to connect to Redis for RQ: {e}")
application_logger.warning("RQ queue system will not be available - check Redis connection")
application_logger.warning(
"RQ queue system will not be available - check Redis connection"
)

# Initialize BackgroundTaskManager (must happen before any code path uses it)
try:
Expand All @@ -153,6 +186,14 @@ async def lifespan(app: FastAPI):
get_client_manager()
application_logger.info("ClientManager initialized")

# Initialize OTEL/Galileo if configured (before LLM client so instrumentor patches OpenAI first)
try:
from advanced_omi_backend.observability.otel_setup import init_otel

init_otel()
except Exception as e:
application_logger.warning(f"OTEL initialization skipped: {e}")

# Initialize prompt registry with defaults; seed into LangFuse in background
try:
from advanced_omi_backend.prompt_defaults import register_all_defaults
Expand All @@ -176,6 +217,7 @@ async def _deferred_seed():
# Initialize LLM client eagerly (catch config errors at startup, not on first request)
try:
from advanced_omi_backend.llm_client import get_llm_client

get_llm_client()
application_logger.info("LLM client initialized from config.yml")
except Exception as e:
Expand All @@ -186,35 +228,47 @@ async def _deferred_seed():
audio_service = get_audio_stream_service()
await audio_service.connect()
application_logger.info("Audio stream service connected to Redis Streams")
application_logger.info("Audio stream workers can be started with: python -m advanced_omi_backend.workers.audio_stream_worker")
application_logger.info(
"Audio stream workers can be started with: python -m advanced_omi_backend.workers.audio_stream_worker"
)
except Exception as e:
application_logger.error(f"Failed to connect audio stream service: {e}")
application_logger.warning("Redis Streams audio processing will not be available")
application_logger.warning(
"Redis Streams audio processing will not be available"
)

# Initialize Redis client for audio streaming producer (used by WebSocket handlers)
try:
app.state.redis_audio_stream = await redis.from_url(
config.redis_url,
encoding="utf-8",
decode_responses=False
config.redis_url, encoding="utf-8", decode_responses=False
)
from advanced_omi_backend.services.audio_stream import AudioStreamProducer
app.state.audio_stream_producer = AudioStreamProducer(app.state.redis_audio_stream)
application_logger.info("✅ Redis client for audio streaming producer initialized")

app.state.audio_stream_producer = AudioStreamProducer(
app.state.redis_audio_stream
)
application_logger.info(
"✅ Redis client for audio streaming producer initialized"
)

# Initialize ClientManager Redis for cross-container client→user mapping
from advanced_omi_backend.client_manager import (
initialize_redis_for_client_manager,
)

initialize_redis_for_client_manager(config.redis_url)

except Exception as e:
application_logger.error(f"Failed to initialize Redis client for audio streaming: {e}", exc_info=True)
application_logger.error(
f"Failed to initialize Redis client for audio streaming: {e}", exc_info=True
)
application_logger.warning("Audio streaming producer will not be available")

# Skip memory service pre-initialization to avoid blocking FastAPI startup
# Memory service will be lazily initialized when first used
application_logger.info("Memory service will be initialized on first use (lazy loading)")
application_logger.info(
"Memory service will be initialized on first use (lazy loading)"
)

# Register OpenMemory user if using openmemory_mcp provider
await initialize_openmemory_user()
Expand Down Expand Up @@ -264,12 +318,15 @@ async def _deferred_seed():
application_logger.info(f"✅ Plugin '{plugin_id}' initialized")
except Exception as e:
plugin_router.mark_plugin_failed(plugin_id, str(e))
application_logger.error(f"Failed to initialize plugin '{plugin_id}': {e}", exc_info=True)
application_logger.error(
f"Failed to initialize plugin '{plugin_id}': {e}",
exc_info=True,
)

health = plugin_router.get_health_summary()
application_logger.info(
f"Plugins initialized: {health['initialized']}/{health['total']} active"
+ (f", {health['failed']} failed" if health['failed'] else "")
+ (f", {health['failed']} failed" if health["failed"] else "")
)

# Store in app state for API access
Expand All @@ -281,10 +338,14 @@ async def _deferred_seed():
app.state.plugin_router = None

except Exception as e:
application_logger.error(f"Failed to initialize plugin system: {e}", exc_info=True)
application_logger.error(
f"Failed to initialize plugin system: {e}", exc_info=True
)
app.state.plugin_router = None

application_logger.info("Application ready - using application-level processing architecture.")
application_logger.info(
"Application ready - using application-level processing architecture."
)

logger.info("App ready")
try:
Expand All @@ -300,6 +361,7 @@ async def _deferred_seed():
from advanced_omi_backend.controllers.websocket_controller import (
cleanup_client_state,
)

await cleanup_client_state(client_id)
except Exception as e:
application_logger.error(f"Error cleaning up client {client_id}: {e}")
Expand Down Expand Up @@ -327,9 +389,14 @@ async def _deferred_seed():

# Close Redis client for audio streaming producer
try:
if hasattr(app.state, 'redis_audio_stream') and app.state.redis_audio_stream:
if (
hasattr(app.state, "redis_audio_stream")
and app.state.redis_audio_stream
):
await app.state.redis_audio_stream.close()
application_logger.info("Redis client for audio streaming producer closed")
application_logger.info(
"Redis client for audio streaming producer closed"
)
except Exception as e:
application_logger.error(f"Error closing Redis audio streaming client: {e}")

Expand All @@ -341,6 +408,7 @@ async def _deferred_seed():
from advanced_omi_backend.services.plugin_service import (
cleanup_plugin_router,
)

await cleanup_plugin_router()
application_logger.info("Plugins shut down")
except Exception as e:
Expand Down Expand Up @@ -380,7 +448,7 @@ def create_app() -> FastAPI:
# Add WebSocket router at root level (not under /api prefix)
app.include_router(websocket_router)

# Add authentication routers
# Add authentication routers
app.include_router(
fastapi_users.get_auth_router(cookie_backend),
prefix="/auth/cookie",
Expand All @@ -403,6 +471,8 @@ def create_app() -> FastAPI:
CHUNK_DIR = Path("/app/audio_chunks")
app.mount("/audio", StaticFiles(directory=CHUNK_DIR), name="audio")

logger.info("FastAPI application created with all routers and middleware configured")
logger.info(
"FastAPI application created with all routers and middleware configured"
)

return app
return app
Loading
Loading