Skip to content

Commit 0e73576

Browse files
committed
Implement miscellaneous settings management and enhance audio processing
- Introduced functions to retrieve and save miscellaneous settings, including `always_persist_enabled` and `use_provider_segments`, using OmegaConf.
- Updated the system controller and routes to handle new endpoints for managing miscellaneous settings, ensuring admin access control.
- Refactored audio processing jobs to read the `always_persist_enabled` setting from global configuration, improving audio persistence behavior.
- Enhanced the web UI to allow administrators to view and modify miscellaneous settings, providing better control over audio processing features.
- Added integration tests to verify the functionality of the new settings management, ensuring robust handling of audio persistence scenarios.
1 parent 98149f8 commit 0e73576

File tree

21 files changed

+884
-242
lines changed

21 files changed

+884
-242
lines changed

backends/advanced/src/advanced_omi_backend/config.py

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,4 +176,56 @@ def get_audio_storage_settings() -> dict:
176176
Dict with audio_base_path, audio_chunks_path
177177
"""
178178
cfg = get_backend_config('audio_storage')
179-
return OmegaConf.to_container(cfg, resolve=True)
179+
return OmegaConf.to_container(cfg, resolve=True)
180+
181+
182+
# ============================================================================
183+
# Miscellaneous Settings (OmegaConf-based)
184+
# ============================================================================
185+
186+
def get_misc_settings() -> dict:
    """
    Collect the miscellaneous backend flags into one plain dict.

    The 'audio' backend section supplies always_persist_enabled and the
    'transcription' backend section supplies use_provider_segments.

    Returns:
        Dict with always_persist_enabled and use_provider_segments; each
        value falls back to False when its section or key is absent.
    """
    def _section_as_dict(section_name):
        # A missing or empty section is treated as a section with no keys.
        section = get_backend_config(section_name)
        if not section:
            return {}
        return OmegaConf.to_container(section, resolve=True)

    audio = _section_as_dict('audio')
    transcription = _section_as_dict('transcription')

    misc = {}
    misc['always_persist_enabled'] = audio.get('always_persist_enabled', False)
    misc['use_provider_segments'] = transcription.get('use_provider_segments', False)
    return misc
205+
206+
207+
def save_misc_settings(settings: dict) -> bool:
    """
    Persist miscellaneous settings through save_config_section.

    Each recognised key is written to its own config section; keys that are
    not present in ``settings`` are left untouched.

    Args:
        settings: Dict with always_persist_enabled and/or use_provider_segments

    Returns:
        True if every attempted section save succeeded, False otherwise
    """
    # (setting key, config section it is stored under), in write order.
    targets = (
        ('always_persist_enabled', 'backend.audio'),
        ('use_provider_segments', 'backend.transcription'),
    )

    all_saved = True
    for key, section in targets:
        if key not in settings:
            continue
        # Keep going after a failure so the remaining section is still attempted.
        if not save_config_section(section, {key: settings[key]}):
            all_saved = False

    return all_saved

backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -325,8 +325,7 @@ def is_job_complete(job):
325325
def start_streaming_jobs(
326326
session_id: str,
327327
user_id: str,
328-
client_id: str,
329-
always_persist: bool = False
328+
client_id: str
330329
) -> Dict[str, str]:
331330
"""
332331
Enqueue jobs for streaming audio session (initial session setup).
@@ -339,12 +338,13 @@ def start_streaming_jobs(
339338
session_id: Stream session ID (equals client_id for streaming)
340339
user_id: User identifier
341340
client_id: Client identifier
342-
always_persist: Whether to create placeholder conversation immediately (default: False)
343341
344342
Returns:
345343
Dict with job IDs: {'speech_detection': job_id, 'audio_persistence': job_id}
346344
347-
Note: user_email is fetched from the database when needed.
345+
Note:
346+
- user_email is fetched from the database when needed.
347+
- always_persist setting is read from global config by the audio persistence job.
348348
"""
349349
from advanced_omi_backend.workers.transcription_jobs import stream_speech_detection_job
350350
from advanced_omi_backend.workers.audio_jobs import audio_streaming_persistence_job
@@ -383,12 +383,12 @@ def start_streaming_jobs(
383383
# Enqueue audio persistence job on dedicated audio queue
384384
# NOTE: This job handles file rotation for multiple conversations automatically
385385
# Runs for entire session, not tied to individual conversations
386+
# The job reads always_persist_enabled from global config internally
386387
audio_job = audio_queue.enqueue(
387388
audio_streaming_persistence_job,
388389
session_id,
389390
user_id,
390391
client_id,
391-
always_persist,
392392
job_timeout=86400, # 24 hours for all-day sessions
393393
ttl=None, # No pre-run expiry (job can wait indefinitely in queue)
394394
result_ttl=JOB_RESULT_TTL, # Cleanup AFTER completion

backends/advanced/src/advanced_omi_backend/controllers/system_controller.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515

1616
from advanced_omi_backend.config import (
1717
get_diarization_settings as load_diarization_settings,
18+
get_misc_settings as load_misc_settings,
19+
save_misc_settings,
1820
)
1921
from advanced_omi_backend.config import (
2022
save_diarization_settings,
@@ -333,6 +335,68 @@ async def save_diarization_settings_controller(settings: dict):
333335
raise e
334336

335337

338+
async def get_misc_settings():
    """Return the current miscellaneous settings wrapped in a status envelope."""
    try:
        # Settings are loaded through the OmegaConf-backed config helper.
        current = load_misc_settings()
    except Exception as e:
        logger.exception("Error getting misc settings")
        raise e

    return {"settings": current, "status": "success"}
350+
351+
352+
async def save_misc_settings_controller(settings: dict):
    """
    Validate and save miscellaneous settings.

    Unknown keys are ignored. Every recognised key must carry a boolean
    value, and at least one recognised key must be present.

    Args:
        settings: Mapping of setting name to new value.

    Returns:
        Dict with "message", "settings" (the values re-read after the save
        attempt) and "status" ("success" or "error").

    Raises:
        HTTPException: 400 when a recognised key has a non-boolean value or
            when no recognised key was supplied.
    """
    allowed_keys = {"always_persist_enabled", "use_provider_segments"}

    try:
        accepted = {}
        for key, value in settings.items():
            if key in allowed_keys:
                # Only real booleans are accepted.
                if not isinstance(value, bool):
                    raise HTTPException(status_code=400, detail=f"Invalid value for {key}: must be boolean")
                accepted[key] = value

        if len(accepted) == 0:
            raise HTTPException(status_code=400, detail="No valid misc settings provided")

        # Guard clause: report a failed save without raising.
        if not save_misc_settings(accepted):
            logger.warning("Settings save failed")
            return {
                "message": "Settings save failed",
                "settings": load_misc_settings(),
                "status": "error"
            }

        # Re-read so the response reflects what is actually stored.
        refreshed = load_misc_settings()
        logger.info(f"Updated and saved misc settings: {accepted}")
        return {
            "message": "Miscellaneous settings saved successfully",
            "settings": refreshed,
            "status": "success"
        }

    except HTTPException:
        # Validation errors pass through untouched.
        raise
    except Exception as e:
        logger.exception("Error saving misc settings")
        raise e
398+
399+
336400
async def get_cleanup_settings_controller(user: User) -> dict:
337401
"""
338402
Get current cleanup settings (admin only).

backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -451,14 +451,10 @@ async def _initialize_streaming_session(
451451
# Enqueue streaming jobs (speech detection + audio persistence)
452452
from advanced_omi_backend.controllers.queue_controller import start_streaming_jobs
453453

454-
# Get always_persist flag from client state
455-
always_persist_flag = getattr(client_state, 'always_persist', False)
456-
457454
job_ids = start_streaming_jobs(
458455
session_id=client_state.stream_session_id,
459456
user_id=user_id,
460-
client_id=client_id,
461-
always_persist=always_persist_flag
457+
client_id=client_id
462458
)
463459

464460
# Store job IDs in Redis session (not in ClientState)
@@ -468,8 +464,8 @@ async def _initialize_streaming_session(
468464
audio_persistence_job_id=job_ids['audio_persistence']
469465
)
470466

471-
# Note: Placeholder conversation creation (if always_persist=True) is now handled
472-
# by the audio persistence job itself, making it self-sufficient.
467+
# Note: Placeholder conversation creation is handled by the audio persistence job,
468+
# which reads the always_persist_enabled setting from global config.
473469

474470
# Launch interim results subscriber if WebSocket provided
475471
subscriber_task = None
@@ -822,11 +818,11 @@ async def _handle_audio_session_start(
822818
websocket: Optional[WebSocket] = None
823819
) -> tuple[bool, str]:
824820
"""
825-
Handle audio-start event - validate mode, set recording mode, and extract always_persist flag.
821+
Handle audio-start event - validate mode and set recording mode.
826822
827823
Args:
828824
client_state: Client state object
829-
audio_format: Audio format dict with mode and always_persist
825+
audio_format: Audio format dict with mode
830826
client_id: Client ID
831827
websocket: Optional WebSocket connection (for WebUI error messages)
832828
@@ -836,16 +832,14 @@ async def _handle_audio_session_start(
836832
from advanced_omi_backend.services.transcription import is_transcription_available
837833

838834
recording_mode = audio_format.get("mode", "batch")
839-
always_persist = audio_format.get("always_persist", False)
840835

841836
application_logger.info(
842837
f"🔴 BACKEND: Received audio-start for {client_id} - "
843-
f"mode={recording_mode}, always_persist={always_persist}, full format={audio_format}"
838+
f"mode={recording_mode}, full format={audio_format}"
844839
)
845840

846841
# Store on client state for later use
847842
client_state.recording_mode = recording_mode
848-
client_state.always_persist = always_persist
849843

850844
# VALIDATION: Check if streaming mode is available
851845
if recording_mode == "streaming":
@@ -898,8 +892,7 @@ async def _handle_audio_session_start(
898892
f"Format: {audio_format.get('rate')}Hz, "
899893
f"{audio_format.get('width')}bytes, "
900894
f"{audio_format.get('channels')}ch, "
901-
f"Mode: {recording_mode}, "
902-
f"Always Persist: {always_persist}"
895+
f"Mode: {recording_mode}"
903896
)
904897

905898
return True, recording_mode # Switch to audio streaming mode
@@ -1358,7 +1351,7 @@ async def handle_pcm_websocket(
13581351
websocket=ws # Pass websocket for WebUI error display
13591352
)
13601353

1361-
# Initialize streaming session (for always_persist and job setup)
1354+
# Initialize streaming session
13621355
if recording_mode == "streaming":
13631356
application_logger.info(f"🔴 BACKEND: Initializing streaming session for {client_id}")
13641357
interim_subscriber_task = await _initialize_streaming_session(

backends/advanced/src/advanced_omi_backend/routers/modules/queue_routes.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,11 +72,18 @@ async def get_job_status(
7272
logger.error(f"Failed to determine status for job {job_id}: {e}")
7373
raise HTTPException(status_code=500, detail=str(e))
7474

75-
return {
75+
response = {
7676
"job_id": job.id,
7777
"status": status
7878
}
7979

80+
# Include error information for failed jobs
81+
if status == "failed" and job.exc_info:
82+
response["error_message"] = str(job.exc_info)
83+
response["exc_info"] = str(job.exc_info)
84+
85+
return response
86+
8087
except HTTPException:
8188
# Re-raise HTTPException unchanged (e.g., 403 Forbidden)
8289
raise

backends/advanced/src/advanced_omi_backend/routers/modules/system_routes.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,21 @@ async def save_diarization_settings(
6363
return await system_controller.save_diarization_settings_controller(settings)
6464

6565

66+
@router.get("/misc-settings")
async def get_misc_settings(
    current_user: User = Depends(current_superuser)
):
    """Return the miscellaneous configuration settings. Admin only."""
    # current_user is not read here; it is only declared as a dependency.
    payload = await system_controller.get_misc_settings()
    return payload
70+
71+
72+
@router.post("/misc-settings")
async def save_misc_settings(
    settings: dict,
    current_user: User = Depends(current_superuser)
):
    """Store the miscellaneous configuration settings. Admin only."""
    # Validation and persistence are delegated to the system controller.
    outcome = await system_controller.save_misc_settings_controller(settings)
    return outcome
79+
80+
6681
@router.get("/cleanup-settings")
6782
async def get_cleanup_settings(
6883
current_user: User = Depends(current_superuser)

backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ async def audio_streaming_persistence_job(
2626
session_id: str,
2727
user_id: str,
2828
client_id: str,
29-
always_persist: bool = False,
3029
*,
3130
redis_client=None
3231
) -> Dict[str, Any]:
@@ -42,15 +41,22 @@ async def audio_streaming_persistence_job(
4241
session_id: Stream session ID
4342
user_id: User ID
4443
client_id: Client ID
45-
always_persist: Whether to create placeholder conversation immediately (default: False)
4644
redis_client: Redis client (injected by decorator)
4745
4846
Returns:
4947
Dict with chunk_count, total_bytes, compressed_bytes, duration_seconds
5048
51-
Note: Replaces disk-based WAV file storage with MongoDB chunk storage.
49+
Note:
50+
- Replaces disk-based WAV file storage with MongoDB chunk storage.
51+
- Reads always_persist_enabled from global config to determine whether to
52+
create placeholder conversations immediately.
5253
"""
53-
logger.info(f"🎵 Starting MongoDB audio persistence for session {session_id}")
54+
# Read always_persist setting from global config
55+
from advanced_omi_backend.config import get_misc_settings
56+
misc_settings = get_misc_settings()
57+
always_persist = misc_settings.get('always_persist_enabled', False)
58+
59+
logger.info(f"🎵 Starting MongoDB audio persistence for session {session_id} (always_persist={always_persist})")
5460

5561
# Setup audio persistence consumer group (separate from transcription consumer)
5662
audio_stream_name = f"audio:stream:{client_id}"

backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -691,10 +691,12 @@ async def open_conversation_job(
691691
# Enqueue post-conversation processing pipeline (no batch transcription needed - using streaming transcript)
692692
client_id = conversation.client_id if conversation else None
693693

694+
# Enqueue post-conversation jobs directly (no fallback dependency in success case)
694695
job_ids = start_post_conversation_jobs(
695696
conversation_id=conversation_id,
696697
user_id=user_id,
697698
transcript_version_id=version_id, # Pass the streaming transcript version ID
699+
depends_on_job=None, # No dependency - streaming already succeeded
698700
client_id=client_id, # Pass client_id for UI tracking
699701
end_reason=end_reason # Pass the determined end_reason (websocket_disconnect, inactivity_timeout, etc.)
700702
)

0 commit comments

Comments
 (0)