Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion backends/advanced/docker-compose-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ services:
# Wait for audio queue to drain before timing out (test mode)
- WAIT_FOR_AUDIO_QUEUE_DRAIN=true
# Mock speaker recognition for tests (avoids resource-intensive ML service)
# To test with REAL speaker recognition: set to 'false' and start extras/speaker-recognition service
- USE_MOCK_SPEAKER_CLIENT=true
depends_on:
qdrant-test:
Expand Down Expand Up @@ -137,7 +138,7 @@ services:
context: ../../extras/speaker-recognition
dockerfile: Dockerfile
args:
PYTORCH_CUDA_VERSION: cpu
PYTORCH_CUDA_VERSION: cu12.6
image: speaker-recognition-test:latest
ports:
- "8086:8085" # Avoid conflict with dev speaker service on 8085
Expand All @@ -164,6 +165,32 @@ services:
profiles:
- speaker # Optional service - only start when explicitly enabled

mock-streaming-stt:
build:
context: ../..
dockerfile: tests/Dockerfile.mock-streaming-stt
ports:
- "9999:9999"
healthcheck:
test: ["CMD", "python", "-c", "import socket; s=socket.socket(); s.connect(('localhost',9999)); s.close()"]
interval: 10s
timeout: 5s
retries: 3
restart: unless-stopped

mock-llm:
build:
context: ../..
dockerfile: tests/Dockerfile.mock-llm
ports:
- "11435:11435"
healthcheck:
test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:11435/health').read()"]
interval: 10s
timeout: 5s
retries: 3
restart: unless-stopped

workers-test:
build:
context: .
Expand Down Expand Up @@ -212,6 +239,7 @@ services:
# Wait for audio queue to drain before timing out (test mode)
- WAIT_FOR_AUDIO_QUEUE_DRAIN=true
# Mock speaker recognition for tests (avoids resource-intensive ML service)
# To test with REAL speaker recognition: set to 'false' and start extras/speaker-recognition service
- USE_MOCK_SPEAKER_CLIENT=true
depends_on:
chronicle-backend-test:
Expand Down
49 changes: 49 additions & 0 deletions backends/advanced/src/advanced_omi_backend/app_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,52 @@
application_logger = logging.getLogger("audio_processing")


async def initialize_openmemory_user() -> None:
    """Initialize and register OpenMemory user if using OpenMemory MCP provider.

    This function:
    - Checks if OpenMemory MCP is configured as the memory provider
    - Registers the configured user with OpenMemory server
    - Creates a test memory and deletes it to trigger user creation
    - Logs success or warning if OpenMemory is not reachable

    Registration is best-effort and bounded by a short overall timeout so an
    unreachable or slow OpenMemory server cannot stall application startup.
    Any failure is logged rather than raised: the user is auto-created on the
    first real memory operation anyway.
    """
    import asyncio

    from advanced_omi_backend.services.memory.config import build_memory_config_from_env, MemoryProvider

    memory_provider_config = build_memory_config_from_env()

    # Nothing to do unless OpenMemory MCP is the active memory provider.
    if memory_provider_config.memory_provider != MemoryProvider.OPENMEMORY_MCP:
        return

    # Cap the entire registration round-trip (connect + add + delete) so the
    # awaiting startup path is blocked for at most this long, instead of the
    # client's per-call timeouts stacking up when the server is unreachable.
    _REGISTRATION_TIMEOUT_SECONDS = 10.0

    try:
        from advanced_omi_backend.services.memory.providers.mcp_client import MCPClient

        # Get configured user_id and server_url, falling back to defaults
        # when no openmemory_config section is present.
        openmemory_config = memory_provider_config.openmemory_config or {}
        user_id = openmemory_config.get("user_id", "openmemory")
        server_url = openmemory_config.get("server_url", "http://host.docker.internal:8765")
        client_name = openmemory_config.get("client_name", "chronicle")

        application_logger.info(f"Registering OpenMemory user: {user_id} at {server_url}")

        async def _register() -> None:
            # Make a lightweight registration call (create and delete dummy memory).
            async with MCPClient(server_url=server_url, client_name=client_name, user_id=user_id) as client:
                # Test connection first
                if await client.test_connection():
                    # Create and immediately delete a dummy memory to trigger user creation
                    memory_ids = await client.add_memories("Chronicle initialization - user registration test")
                    if memory_ids:
                        # Delete the test memory
                        await client.delete_memory(memory_ids[0])
                    application_logger.info(f"✅ Registered OpenMemory user: {user_id}")
                else:
                    application_logger.warning(f"⚠️ OpenMemory MCP not reachable at {server_url}")
                    application_logger.info("User will be auto-created on first memory operation")

        await asyncio.wait_for(_register(), timeout=_REGISTRATION_TIMEOUT_SECONDS)
    except asyncio.TimeoutError:
        # Timed out talking to OpenMemory — give up quietly so startup proceeds.
        application_logger.warning(
            f"⚠️ OpenMemory user registration timed out after {_REGISTRATION_TIMEOUT_SECONDS:.0f}s"
        )
        application_logger.info("User will be auto-created on first memory operation")
    except Exception as e:
        # Best-effort: a registration failure must never abort application startup.
        application_logger.warning(f"⚠️ Could not register OpenMemory user: {e}")
        application_logger.info("User will be auto-created on first memory operation")
Comment on lines +45 to +88
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# Check MCPClient for timeout and test_connection behavior
rg -n "class MCPClient|def test_connection|timeout" backends/advanced/src/advanced_omi_backend/services/memory/providers/mcp_client.py -A 5

Repository: SimpleOpenSoftware/chronicle

Length of output: 1689


🏁 Script executed:

# Check where initialize_openmemory_user is called
rg -n "initialize_openmemory_user" backends/advanced/src/advanced_omi_backend/app_factory.py -B 5 -A 5

Repository: SimpleOpenSoftware/chronicle

Length of output: 1052


🏁 Script executed:

# Check if there's any background task or await pattern
grep -n "await initialize_openmemory_user\|asyncio.create_task\|asyncio.ensure_future" backends/advanced/src/advanced_omi_backend/app_factory.py

Repository: SimpleOpenSoftware/chronicle

Length of output: 114


🏁 Script executed:

# Get test_connection full implementation
rg -n "async def test_connection" backends/advanced/src/advanced_omi_backend/services/memory/providers/mcp_client.py -A 15

Repository: SimpleOpenSoftware/chronicle

Length of output: 851


🏁 Script executed:

# Check if MCPClient calls are sequential or concurrent
rg -n "test_connection\|add_memories\|delete_memory" backends/advanced/src/advanced_omi_backend/app_factory.py -B 2 -A 2

Repository: SimpleOpenSoftware/chronicle

Length of output: 54


🏁 Script executed:

# Get full test_connection implementation
rg -n "async def test_connection" backends/advanced/src/advanced_omi_backend/services/memory/providers/mcp_client.py -A 25

Repository: SimpleOpenSoftware/chronicle

Length of output: 1283


🏁 Script executed:

# Check the initialization sequence in app_factory
sed -n '45,95p' backends/advanced/src/advanced_omi_backend/app_factory.py

Repository: SimpleOpenSoftware/chronicle

Length of output: 2689


Move OpenMemory user registration to background task to prevent startup blocking.

This function is directly awaited during startup and makes sequential network calls with 30-second timeouts (up to 90 seconds if OpenMemory is unreachable). Use asyncio.create_task() or equivalent to defer this initialization, or implement a shorter timeout. This contradicts the stated intent in the preceding comment about avoiding startup blocking.

🧰 Tools
🪛 Ruff (0.14.14)

86-86: Do not catch blind exception: Exception

(BLE001)

🤖 Prompt for AI Agents
In `@backends/advanced/src/advanced_omi_backend/app_factory.py` around lines 45 -
88, The initialize_openmemory_user function performs blocking network operations
during startup; change its invocation so it runs in the background instead of
being awaited synchronously: locate the startup code that calls
initialize_openmemory_user and replace the direct await with scheduling it via
asyncio.create_task(initialize_openmemory_user()) (or use loop.create_task) so
the app doesn't block; alternatively, if you prefer time-bounding, add a short
per-call timeout around MCPClient.test_connection / add_memories (e.g.,
asyncio.wait_for) inside initialize_openmemory_user to limit how long MCPClient
(used in the async with block and methods test_connection, add_memories,
delete_memory) can hang. Ensure you keep build_memory_config_from_env and
MemoryProvider checks unchanged and preserve logging behavior when running as a
background task.



@asynccontextmanager
async def lifespan(app: FastAPI):
"""Manage application lifespan events."""
Expand Down Expand Up @@ -126,6 +172,9 @@ async def lifespan(app: FastAPI):
# Memory service will be lazily initialized when first used
application_logger.info("Memory service will be initialized on first use (lazy loading)")

# Register OpenMemory user if using openmemory_mcp provider
await initialize_openmemory_user()

# SystemTracker is used for monitoring and debugging
application_logger.info("Using SystemTracker for monitoring and debugging")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ async def send_audio_start(
sample_rate: int = OMI_SAMPLE_RATE,
sample_width: int = OMI_SAMPLE_WIDTH,
channels: int = OMI_CHANNELS,
always_persist: bool = False,
) -> None:
"""Send Wyoming audio-start event.

Expand All @@ -131,6 +132,7 @@ async def send_audio_start(
sample_rate: Audio sample rate in Hz (default: 16000)
sample_width: Bytes per sample (default: 2 for 16-bit)
channels: Number of audio channels (default: 1)
always_persist: Save audio even if transcription fails (default: False)

Note:
The mode is inside the "data" dict, matching _handle_audio_session_start
Expand All @@ -146,11 +148,15 @@ async def send_audio_start(
"width": sample_width,
"channels": channels,
"mode": recording_mode,
"always_persist": always_persist,
},
"payload_length": None,
}
print(f"🔵 CLIENT: Sending audio-start message: {header}")
logger.info(f"🔵 CLIENT: Sending audio-start message: {header}")
await self.ws.send(json.dumps(header) + "\n")
logger.info(f"Sent audio-start with mode={recording_mode}")
print(f"✅ CLIENT: Sent audio-start with mode={recording_mode}, always_persist={always_persist}")
logger.info(f"✅ CLIENT: Sent audio-start with mode={recording_mode}, always_persist={always_persist}")

async def send_audio_chunk_wyoming(
self,
Expand Down Expand Up @@ -232,6 +238,7 @@ async def stream_wav_file(
use_wyoming: bool = True,
recording_mode: str = "streaming",
realtime_factor: float = 0.1,
always_persist: bool = False,
) -> int:
"""Stream a WAV file in chunks, simulating real-time audio.

Expand All @@ -241,6 +248,7 @@ async def stream_wav_file(
use_wyoming: If True, use Wyoming protocol; if False, send raw binary
recording_mode: "streaming" or "batch"
realtime_factor: Fraction of real-time to simulate (0.1 = 10x speed)
always_persist: Save audio even if transcription fails (default: False)

Returns:
Number of chunks sent
Expand Down Expand Up @@ -268,6 +276,7 @@ async def stream_wav_file(
sample_rate=sample_rate,
sample_width=sample_width,
channels=channels,
always_persist=always_persist,
)

# Reset counters
Expand Down Expand Up @@ -335,6 +344,7 @@ def stream_audio_file(
device_name: str = "robot-test",
recording_mode: str = "streaming",
use_wyoming: bool = True,
always_persist: bool = False,
) -> int:
"""Synchronous wrapper for streaming audio file.

Expand All @@ -348,6 +358,7 @@ def stream_audio_file(
device_name: Device name for client identification
recording_mode: "streaming" or "batch"
use_wyoming: If True, use Wyoming protocol
always_persist: Save audio even if transcription fails (default: False)

Returns:
Number of chunks sent
Expand All @@ -359,6 +370,7 @@ async def _run() -> int:
wav_path,
use_wyoming=use_wyoming,
recording_mode=recording_mode,
always_persist=always_persist,
)

return asyncio.run(_run())
Expand Down Expand Up @@ -407,6 +419,7 @@ def start_stream(
token: str,
device_name: str = "robot-test",
recording_mode: str = "streaming",
always_persist: bool = False,
) -> str:
"""Start a new audio stream (non-blocking).

Expand All @@ -415,6 +428,7 @@ def start_stream(
token: JWT token
device_name: Device name for client ID
recording_mode: "streaming" or "batch"
always_persist: Save audio even if transcription fails (default: False)

Returns:
stream_id: Unique ID for this stream session
Expand All @@ -440,14 +454,16 @@ def run_loop():
# Connect and send audio-start
async def _connect_and_start():
try:
logger.info(f"🔵 CLIENT: Stream {stream_id} connecting for {device_name}...")
await client.connect()
session.connected = True
await client.send_audio_start(recording_mode=recording_mode)
logger.info(f"✅ CLIENT: Stream {stream_id} connected, sending audio-start...")
await client.send_audio_start(recording_mode=recording_mode, always_persist=always_persist)
session.audio_started = True
logger.info(f"Stream {stream_id} started for {device_name}")
logger.info(f"✅ CLIENT: Stream {stream_id} started for {device_name}")
except Exception as e:
session.error = str(e)
logger.error(f"Stream {stream_id} failed to start: {e}")
logger.error(f"❌ CLIENT: Stream {stream_id} failed to start: {e}")

future = asyncio.run_coroutine_threadsafe(_connect_and_start(), loop)
future.result(timeout=10) # Wait for connection
Expand Down
54 changes: 53 additions & 1 deletion backends/advanced/src/advanced_omi_backend/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,4 +176,56 @@ def get_audio_storage_settings() -> dict:
Dict with audio_base_path, audio_chunks_path
"""
cfg = get_backend_config('audio_storage')
return OmegaConf.to_container(cfg, resolve=True)
return OmegaConf.to_container(cfg, resolve=True)


# ============================================================================
# Miscellaneous Settings (OmegaConf-based)
# ============================================================================

def get_misc_settings() -> dict:
    """
    Get miscellaneous configuration settings using OmegaConf.

    Returns:
        Dict with always_persist_enabled and use_provider_segments
    """
    def _section(name: str) -> dict:
        # Resolve one backend config section to a plain dict; empty if the
        # section is missing from config.yml.
        cfg = get_backend_config(name)
        return OmegaConf.to_container(cfg, resolve=True) if cfg else {}

    # always_persist_enabled lives under 'audio'; use_provider_segments
    # lives under 'transcription'. Both default to False when unset.
    audio = _section('audio')
    transcription = _section('transcription')

    return {
        'always_persist_enabled': audio.get('always_persist_enabled', False),
        'use_provider_segments': transcription.get('use_provider_segments', False),
    }


def save_misc_settings(settings: dict) -> bool:
    """
    Save miscellaneous settings to config.yml using OmegaConf.

    Args:
        settings: Dict with always_persist_enabled and/or use_provider_segments

    Returns:
        True if saved successfully, False otherwise
    """
    # Each recognized setting maps to the config.yml section that owns it.
    section_for_key = {
        'always_persist_enabled': 'backend.audio',
        'use_provider_segments': 'backend.transcription',
    }

    ok = True
    for key, section in section_for_key.items():
        # Only persist keys the caller actually supplied; a failed write for
        # either section flips the overall result to False.
        if key in settings:
            if not save_config_section(section, {key: settings[key]}):
                ok = False
    return ok
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ async def get_conversation(conversation_id: str, user: User):
"deleted": conversation.deleted,
"deletion_reason": conversation.deletion_reason,
"deleted_at": conversation.deleted_at.isoformat() if conversation.deleted_at else None,
"processing_status": conversation.processing_status,
"always_persist": conversation.always_persist,
"end_reason": conversation.end_reason.value if conversation.end_reason else None,
"completed_at": conversation.completed_at.isoformat() if conversation.completed_at else None,
"title": conversation.title,
Expand All @@ -133,6 +135,8 @@ async def get_conversation(conversation_id: str, user: User):
"active_memory_version": conversation.active_memory_version,
"transcript_version_count": conversation.transcript_version_count,
"memory_version_count": conversation.memory_version_count,
"active_transcript_version_number": conversation.active_transcript_version_number,
"active_memory_version_number": conversation.active_memory_version_number,
}

return {"conversation": response}
Expand Down Expand Up @@ -182,6 +186,8 @@ async def get_conversations(user: User, include_deleted: bool = False):
"deleted": conv.deleted,
"deletion_reason": conv.deletion_reason,
"deleted_at": conv.deleted_at.isoformat() if conv.deleted_at else None,
"processing_status": conv.processing_status,
"always_persist": conv.always_persist,
"title": conv.title,
"summary": conv.summary,
"detailed_summary": conv.detailed_summary,
Expand All @@ -193,6 +199,8 @@ async def get_conversations(user: User, include_deleted: bool = False):
"memory_count": conv.memory_count,
"transcript_version_count": conv.transcript_version_count,
"memory_version_count": conv.memory_version_count,
"active_transcript_version_number": conv.active_transcript_version_number,
"active_memory_version_number": conv.active_memory_version_number,
})

return {"conversations": conversations}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,9 @@ def start_streaming_jobs(
Returns:
Dict with job IDs: {'speech_detection': job_id, 'audio_persistence': job_id}

Note: user_email is fetched from the database when needed.
Note:
- user_email is fetched from the database when needed.
- always_persist setting is read from global config by the audio persistence job.
"""
from advanced_omi_backend.workers.transcription_jobs import stream_speech_detection_job
from advanced_omi_backend.workers.audio_jobs import audio_streaming_persistence_job
Expand Down Expand Up @@ -381,6 +383,7 @@ def start_streaming_jobs(
# Enqueue audio persistence job on dedicated audio queue
# NOTE: This job handles file rotation for multiple conversations automatically
# Runs for entire session, not tied to individual conversations
# The job reads always_persist_enabled from global config internally
audio_job = audio_queue.enqueue(
audio_streaming_persistence_job,
session_id,
Expand Down
Loading
Loading