Skip to content

Commit 42eb911

Browse files
committed
Refactor audio processing and enhance error handling
- Updated `worker_orchestrator.py` to use `logger.exception` for improved error logging. - Changed default MongoDB database name from "friend-lite" to "chronicle" in multiple files for consistency. - Added a new method `close_stream_without_stop` in `audio_stream_client.py` to handle abrupt WebSocket disconnections. - Enhanced audio validation in `audio_utils.py` to support automatic resampling of audio data if sample rates do not match. - Improved logging in various modules to provide clearer insights during audio processing and event dispatching.
1 parent 8b4d783 commit 42eb911

File tree

27 files changed

+601
-142
lines changed

27 files changed

+601
-142
lines changed

backends/advanced/src/advanced_omi_backend/app_config.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,7 @@ class AppConfig:
2929
def __init__(self):
3030
# MongoDB Configuration
3131
self.mongodb_uri = os.getenv("MONGODB_URI", "mongodb://mongo:27017")
32-
# default to legacy value to avoid breaking peoples .env
33-
self.mongodb_database = os.getenv("MONGODB_DATABASE", "friend-lite")
32+
self.mongodb_database = os.getenv("MONGODB_DATABASE", "chronicle")
3433
self.mongo_client = AsyncIOMotorClient(self.mongodb_uri)
3534
self.db = self.mongo_client.get_default_database(self.mongodb_database)
3635
self.users_col = self.db["users"]

backends/advanced/src/advanced_omi_backend/clients/audio_stream_client.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -555,6 +555,39 @@ async def _stop():
555555
logger.info(f"Stream {stream_id} stopped, sent {total_chunks} chunks")
556556
return total_chunks
557557

558+
def close_stream_without_stop(self, stream_id: str) -> int:
559+
"""Close WebSocket connection without sending audio-stop event.
560+
561+
This simulates abrupt disconnection (network failure, client crash)
562+
and should trigger websocket_disconnect end_reason.
563+
564+
Args:
565+
stream_id: Stream session ID
566+
567+
Returns:
568+
Total chunks sent during this session
569+
"""
570+
session = self._sessions.get(stream_id)
571+
if not session:
572+
raise ValueError(f"Unknown stream_id: {stream_id}")
573+
574+
async def _close_abruptly():
575+
# Just close the connection without audio-stop
576+
await session.client.close()
577+
578+
future = asyncio.run_coroutine_threadsafe(_close_abruptly(), session.loop)
579+
future.result(timeout=10)
580+
581+
# Stop the event loop
582+
session.loop.call_soon_threadsafe(session.loop.stop)
583+
session.thread.join(timeout=5)
584+
585+
total_chunks = session.chunk_count
586+
del self._sessions[stream_id]
587+
588+
logger.info(f"Stream {stream_id} closed abruptly (no audio-stop), sent {total_chunks} chunks")
589+
return total_chunks
590+
558591
def get_session(self, stream_id: str) -> Optional[StreamSession]:
559592
"""Get session info for a stream."""
560593
return self._sessions.get(stream_id)

backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,8 @@ async def upload_and_process_audio_files(
9494
audio_data, sample_rate, sample_width, channels, duration = await validate_and_prepare_audio(
9595
audio_data=content,
9696
expected_sample_rate=16000, # Expecting 16kHz
97-
convert_to_mono=True # Convert stereo to mono
97+
convert_to_mono=True, # Convert stereo to mono
98+
auto_resample=True # Auto-resample if sample rate doesn't match
9899
)
99100
except AudioValidationError as e:
100101
processed_files.append({

backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -537,6 +537,7 @@ def start_post_conversation_jobs(
537537
conversation_id,
538538
client_id or "",
539539
user_id,
540+
"file_upload", # Explicit end_reason for file upload processing
540541
job_timeout=120, # 2 minutes
541542
result_ttl=JOB_RESULT_TTL,
542543
depends_on=[memory_job, title_summary_job], # Wait for both parallel jobs

backends/advanced/src/advanced_omi_backend/controllers/session_controller.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,12 +56,13 @@ async def mark_session_complete(
5656
await mark_session_complete(redis, session_id, "all_jobs_complete")
5757
"""
5858
session_key = f"audio:session:{session_id}"
59+
mark_time = time.time()
5960
await redis_client.hset(session_key, mapping={
6061
"status": "finished",
61-
"completed_at": str(time.time()),
62+
"completed_at": str(mark_time),
6263
"completion_reason": reason
6364
})
64-
logger.info(f"✅ Session {session_id[:12]} marked finished: {reason}")
65+
logger.info(f"✅ Session {session_id[:12]} marked finished: {reason} [TIME: {mark_time:.3f}]")
6566

6667

6768
async def get_session_info(redis_client, session_id: str) -> Optional[Dict]:

backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,10 @@ async def cleanup_client_state(client_id: str):
238238
redis_url = os.getenv("REDIS_URL", "redis://localhost:6379/0")
239239
async_redis = redis.from_url(redis_url, decode_responses=False)
240240

241+
# Get audio stream producer for finalization
242+
from advanced_omi_backend.services.audio_stream.producer import get_audio_stream_producer
243+
audio_stream_producer = get_audio_stream_producer()
244+
241245
# Find all session keys for this client and mark them complete
242246
pattern = f"audio:session:*"
243247
cursor = 0
@@ -250,8 +254,18 @@ async def cleanup_client_state(client_id: str):
250254
# Check if this session belongs to this client
251255
client_id_bytes = await async_redis.hget(key, "client_id")
252256
if client_id_bytes and client_id_bytes.decode() == client_id:
253-
# Mark session as complete (WebSocket disconnected)
254257
session_id = key.decode().replace("audio:session:", "")
258+
259+
# Check session status
260+
status_bytes = await async_redis.hget(key, "status")
261+
status = status_bytes.decode() if status_bytes else None
262+
263+
# If session is still active, finalize it first (sets status + completion_reason atomically)
264+
if status in ["active", None]:
265+
logger.info(f"📊 Finalizing active session {session_id[:12]} due to WebSocket disconnect")
266+
await audio_stream_producer.finalize_session(session_id, completion_reason="websocket_disconnect")
267+
268+
# Mark session as complete (WebSocket disconnected)
255269
await mark_session_complete(async_redis, session_id, "websocket_disconnect")
256270
sessions_closed += 1
257271

@@ -485,8 +499,8 @@ async def _finalize_streaming_session(
485499
# Send end-of-session signal to workers
486500
await audio_stream_producer.send_session_end_signal(session_id)
487501

488-
# Mark session as finalizing
489-
await audio_stream_producer.finalize_session(session_id)
502+
# Mark session as finalizing with user_stopped reason (audio-stop event)
503+
await audio_stream_producer.finalize_session(session_id, completion_reason="user_stopped")
490504

491505
# NOTE: Finalize job disabled - open_conversation_job now handles everything
492506
# The open_conversation_job will:

backends/advanced/src/advanced_omi_backend/database.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
# MongoDB Configuration
1616
MONGODB_URI = os.getenv("MONGODB_URI", "mongodb://mongo:27017")
17-
MONGODB_DATABASE = os.getenv("MONGODB_DATABASE", "friend-lite")
17+
MONGODB_DATABASE = os.getenv("MONGODB_DATABASE", "chronicle")
1818

1919
mongo_client = AsyncIOMotorClient(
2020
MONGODB_URI,

backends/advanced/src/advanced_omi_backend/middleware/app_middleware.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ class RequestLoggingMiddleware(BaseHTTPMiddleware):
6060
"/health",
6161
"/auth/health",
6262
"/readiness",
63+
"/api/queue/dashboard", # Auto-refresh endpoint, too noisy
6364
}
6465

6566
# Binary content types to exclude

backends/advanced/src/advanced_omi_backend/models/job.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ async def _ensure_beanie_initialized():
4444
mongodb_uri = os.getenv("MONGODB_URI", "mongodb://localhost:27017")
4545

4646
# Create MongoDB client
47-
mongodb_database = os.getenv("MONGODB_DATABASE", "friend-lite")
47+
mongodb_database = os.getenv("MONGODB_DATABASE", "chronicle")
4848
client = AsyncIOMotorClient(mongodb_uri)
4949
try:
5050
database = client.get_default_database(mongodb_database)

backends/advanced/src/advanced_omi_backend/plugins/router.py

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,23 +122,37 @@ async def dispatch_event(
122122
Returns:
123123
List of plugin results
124124
"""
125+
# Add at start
126+
logger.info(f"🔌 ROUTER: Dispatching '{event}' event (user={user_id})")
127+
125128
results = []
126129

127130
# Get plugins subscribed to this event
128131
plugin_ids = self._plugins_by_event.get(event, [])
129132

133+
# Add subscription check
134+
if not plugin_ids:
135+
logger.warning(f"🔌 ROUTER: No plugins subscribed to event '{event}'")
136+
return results
137+
138+
logger.info(f"🔌 ROUTER: Found {len(plugin_ids)} subscribed plugin(s): {plugin_ids}")
139+
130140
for plugin_id in plugin_ids:
131141
plugin = self.plugins[plugin_id]
132142

133143
if not plugin.enabled:
144+
logger.info(f" ⊘ Skipping '{plugin_id}': disabled")
134145
continue
135146

136147
# Check execution condition (wake_word, etc.)
148+
logger.info(f" → Checking execution condition for '{plugin_id}'")
137149
if not await self._should_execute(plugin, data):
150+
logger.info(f" ⊘ Skipping '{plugin_id}': condition not met")
138151
continue
139152

140153
# Execute plugin
141154
try:
155+
logger.info(f" ▶ Executing '{plugin_id}' for event '{event}'")
142156
context = PluginContext(
143157
user_id=user_id,
144158
event=event,
@@ -149,15 +163,30 @@ async def dispatch_event(
149163
result = await self._execute_plugin(plugin, event, context)
150164

151165
if result:
166+
status_icon = "✓" if result.success else "✗"
167+
logger.info(
168+
f" {status_icon} Plugin '{plugin_id}' completed: "
169+
f"success={result.success}, message={result.message}"
170+
)
152171
results.append(result)
153172

154173
# If plugin says stop processing, break
155174
if not result.should_continue:
156-
logger.info(f"Plugin '{plugin_id}' stopped further processing")
175+
logger.info(f"Plugin '{plugin_id}' stopped further processing")
157176
break
158177

159178
except Exception as e:
160-
logger.error(f"Error executing plugin '{plugin_id}': {e}", exc_info=True)
179+
# CRITICAL: Log exception details
180+
logger.error(
181+
f" ✗ Plugin '{plugin_id}' FAILED with exception: {e}",
182+
exc_info=True
183+
)
184+
185+
# Add at end
186+
logger.info(
187+
f"🔌 ROUTER: Dispatch complete for '{event}': "
188+
f"{len(results)} plugin(s) executed successfully"
189+
)
161190

162191
return results
163192

0 commit comments

Comments
 (0)