Skip to content

Commit e662f46

Browse files
committed
Refactor audio processing to utilize MongoDB chunks and enhance job handling
- Removed audio file path parameters from various functions, transitioning to audio data retrieval from MongoDB chunks.
- Updated the `start_post_conversation_jobs` function to reflect changes in audio handling, ensuring jobs reconstruct audio from database chunks.
- Enhanced the `transcribe_full_audio_job` and `recognise_speakers_job` to process audio directly from memory, eliminating the need for temporary files.
- Improved error handling and logging for audio data retrieval, ensuring better feedback during processing.
- Added a new utility function for converting PCM data to WAV format in memory, streamlining audio format handling.
1 parent 32e2e47 commit e662f46

File tree

9 files changed

+121
-146
lines changed

9 files changed

+121
-146
lines changed

backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,6 @@ async def upload_and_process_audio_files(
156156
job_ids = start_post_conversation_jobs(
157157
conversation_id=conversation_id,
158158
audio_uuid=audio_uuid,
159-
audio_file_path=None, # No file path - using MongoDB chunks
160159
user_id=user.user_id,
161160
post_transcription=True, # Run batch transcription for uploads
162161
client_id=client_id # Pass client_id for UI tracking

backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py

Lines changed: 12 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import logging
66
import time
7+
from datetime import datetime
78
from pathlib import Path
89

910
from fastapi.responses import JSONResponse
@@ -382,34 +383,20 @@ async def reprocess_transcript(conversation_id: str, user: User):
382383
if not user.is_superuser and conversation_model.user_id != str(user.user_id):
383384
return JSONResponse(status_code=403, content={"error": "Access forbidden. You can only reprocess your own conversations."})
384385

385-
# Get audio_uuid and file path from conversation
386+
# Get audio_uuid from conversation
386387
audio_uuid = conversation_model.audio_uuid
387-
audio_path = conversation_model.audio_path
388388

389-
if not audio_path:
390-
return JSONResponse(
391-
status_code=400, content={"error": "No audio file found for this conversation"}
392-
)
393-
394-
# Check if file exists - try multiple possible locations
395-
possible_paths = [
396-
Path("/app/audio_chunks") / audio_path,
397-
Path(audio_path), # fallback to relative path
398-
]
389+
# Validate audio chunks exist in MongoDB
390+
chunks = await AudioChunkDocument.find(
391+
AudioChunkDocument.conversation_id == conversation_id
392+
).to_list()
399393

400-
full_audio_path = None
401-
for path in possible_paths:
402-
if path.exists():
403-
full_audio_path = path
404-
break
405-
406-
if not full_audio_path:
394+
if not chunks:
407395
return JSONResponse(
408-
status_code=422,
396+
status_code=404,
409397
content={
410-
"error": "Audio file not found on disk",
411-
"details": f"Conversation exists but audio file '{audio_path}' is missing from expected locations",
412-
"searched_paths": [str(p) for p in possible_paths]
398+
"error": "No audio data found for this conversation",
399+
"details": f"Conversation '{conversation_id}' exists but has no audio chunks in MongoDB"
413400
}
414401
)
415402

@@ -430,12 +417,11 @@ async def reprocess_transcript(conversation_id: str, user: User):
430417
transcribe_full_audio_job,
431418
)
432419

433-
# Job 1: Transcribe audio to text
420+
# Job 1: Transcribe audio to text (reconstructs from MongoDB chunks)
434421
transcript_job = transcription_queue.enqueue(
435422
transcribe_full_audio_job,
436423
conversation_id,
437424
audio_uuid,
438-
str(full_audio_path),
439425
version_id,
440426
"reprocess",
441427
job_timeout=600,
@@ -446,14 +432,11 @@ async def reprocess_transcript(conversation_id: str, user: User):
446432
)
447433
logger.info(f"📥 RQ: Enqueued transcription job {transcript_job.id}")
448434

449-
# Job 2: Recognize speakers (depends on transcription)
435+
# Job 2: Recognize speakers (depends on transcription, reads data from DB)
450436
speaker_job = transcription_queue.enqueue(
451437
recognise_speakers_job,
452438
conversation_id,
453439
version_id,
454-
str(full_audio_path),
455-
"", # transcript_text - will be read from DB
456-
[], # words - will be read from DB
457440
depends_on=transcript_job,
458441
job_timeout=600,
459442
result_ttl=JOB_RESULT_TTL,

backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -366,7 +366,6 @@ def start_streaming_jobs(
366366
def start_post_conversation_jobs(
367367
conversation_id: str,
368368
audio_uuid: str,
369-
audio_file_path: str,
370369
user_id: str,
371370
post_transcription: bool = True,
372371
transcript_version_id: Optional[str] = None,
@@ -382,15 +381,17 @@ def start_post_conversation_jobs(
382381
3. Memory extraction job - Extracts memories from conversation (parallel)
383382
4. Title/summary generation job - Generates title and summary (parallel)
384383
384+
Note: Audio is reconstructed from MongoDB chunks, not files.
385+
385386
Args:
386387
conversation_id: Conversation identifier
387388
audio_uuid: Audio UUID for job tracking
388-
audio_file_path: Path to audio file
389389
user_id: User identifier
390390
post_transcription: If True, run batch transcription step (for uploads)
391391
If False, skip transcription (streaming already has it)
392392
transcript_version_id: Transcript version ID (auto-generated if None)
393393
depends_on_job: Optional job dependency for first job
394+
client_id: Client ID for UI tracking
394395
395396
Returns:
396397
Dict with job IDs (transcription will be None if post_transcription=False)
@@ -416,7 +417,6 @@ def start_post_conversation_jobs(
416417
transcribe_full_audio_job,
417418
conversation_id,
418419
audio_uuid,
419-
audio_file_path,
420420
version_id,
421421
"batch", # trigger
422422
job_timeout=1800, # 30 minutes
@@ -439,9 +439,6 @@ def start_post_conversation_jobs(
439439
recognise_speakers_job,
440440
conversation_id,
441441
version_id,
442-
audio_file_path,
443-
"", # transcript_text - will be read from DB
444-
[], # words - will be read from DB
445442
job_timeout=1200, # 20 minutes
446443
result_ttl=JOB_RESULT_TTL,
447444
depends_on=speaker_depends_on,

backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -896,7 +896,6 @@ async def _process_batch_audio_complete(
896896
job_ids = start_post_conversation_jobs(
897897
conversation_id=conversation_id,
898898
audio_uuid=audio_uuid,
899-
audio_file_path=None, # No file path - using MongoDB chunks
900899
user_id=None, # Will be read from conversation in DB by jobs
901900
post_transcription=True, # Run batch transcription for uploads
902901
client_id=client_id # Pass client_id for UI tracking

backends/advanced/src/advanced_omi_backend/utils/audio_utils.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,49 @@ async def process_audio_chunk(
258258
client_state.update_audio_received(chunk)
259259

260260

261+
def pcm_to_wav_bytes(
262+
pcm_data: bytes,
263+
sample_rate: int = 16000,
264+
channels: int = 1,
265+
sample_width: int = 2
266+
) -> bytes:
267+
"""
268+
Convert raw PCM audio data to WAV format in memory.
269+
270+
Args:
271+
pcm_data: Raw PCM audio bytes
272+
sample_rate: Sample rate in Hz (default: 16000)
273+
channels: Number of audio channels (default: 1 for mono)
274+
sample_width: Sample width in bytes (default: 2 for 16-bit)
275+
276+
Returns:
277+
WAV file data as bytes
278+
"""
279+
import wave
280+
import io
281+
282+
logger.debug(
283+
f"Converting PCM to WAV in memory: {len(pcm_data)} bytes "
284+
f"(rate={sample_rate}, channels={channels}, width={sample_width})"
285+
)
286+
287+
# Use BytesIO to create WAV in memory
288+
wav_buffer = io.BytesIO()
289+
290+
with wave.open(wav_buffer, 'wb') as wav_file:
291+
wav_file.setnchannels(channels)
292+
wav_file.setsampwidth(sample_width)
293+
wav_file.setframerate(sample_rate)
294+
wav_file.writeframes(pcm_data)
295+
296+
# Get the WAV bytes
297+
wav_bytes = wav_buffer.getvalue()
298+
299+
logger.debug(f"Created WAV in memory: {len(wav_bytes)} bytes")
300+
301+
return wav_bytes
302+
303+
261304
def write_pcm_to_wav(
262305
pcm_data: bytes,
263306
output_path: str,

backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -527,7 +527,6 @@ async def open_conversation_job(
527527
job_ids = start_post_conversation_jobs(
528528
conversation_id=conversation_id,
529529
audio_uuid=session_id,
530-
audio_file_path=file_path,
531530
user_id=user_id,
532531
post_transcription=True, # Run batch transcription for streaming audio
533532
client_id=client_id # Pass client_id for UI tracking

backends/advanced/src/advanced_omi_backend/workers/speaker_jobs.py

Lines changed: 38 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -121,26 +121,25 @@ async def check_enrolled_speakers_job(
121121
async def recognise_speakers_job(
122122
conversation_id: str,
123123
version_id: str,
124-
audio_path: str,
125-
transcript_text: str,
126-
words: list,
124+
transcript_text: str = "",
125+
words: list = None,
127126
*,
128127
redis_client=None
129128
) -> Dict[str, Any]:
130129
"""
131130
RQ job function for identifying speakers in a transcribed conversation.
132131
133132
This job runs after transcription and:
134-
1. Calls speaker recognition service to identify speakers
135-
2. Updates the transcript version with identified speaker labels
136-
3. Returns results for downstream jobs (memory)
133+
1. Reconstructs audio from MongoDB chunks
134+
2. Calls speaker recognition service to identify speakers
135+
3. Updates the transcript version with identified speaker labels
136+
4. Returns results for downstream jobs (memory)
137137
138138
Args:
139139
conversation_id: Conversation ID
140140
version_id: Transcript version ID to update
141-
audio_path: Path to audio file
142-
transcript_text: Transcript text from transcription job
143-
words: Word-level timing data from transcription job
141+
transcript_text: Transcript text from transcription job (optional, reads from DB if empty)
142+
words: Word-level timing data from transcription job (optional, reads from DB if empty)
144143
redis_client: Redis client (injected by decorator)
145144
146145
Returns:
@@ -186,77 +185,52 @@ async def recognise_speakers_job(
186185
}
187186

188187
# Reconstruct audio from MongoDB chunks
189-
import tempfile
190-
from pathlib import Path
191188
from advanced_omi_backend.utils.audio_chunk_utils import reconstruct_wav_from_conversation
192189

193190
logger.info(f"📦 Reconstructing audio from MongoDB chunks for conversation {conversation_id}")
194191

195192
# Call speaker recognition service
196193
try:
197-
# Reconstruct WAV from MongoDB chunks
194+
# Reconstruct WAV from MongoDB chunks (already in memory as bytes)
198195
wav_data = await reconstruct_wav_from_conversation(conversation_id)
199196

200-
# Write to temporary file for speaker recognition service
201-
temp_wav_file = tempfile.NamedTemporaryFile(
202-
suffix=".wav",
203-
delete=False,
204-
prefix=f"speaker_recog_{conversation_id[:8]}_"
197+
logger.info(
198+
f"📦 Reconstructed audio from MongoDB chunks: "
199+
f"{len(wav_data) / 1024 / 1024:.2f} MB"
205200
)
206201

207-
try:
208-
temp_wav_file.write(wav_data)
209-
temp_wav_file.flush()
210-
temp_wav_path = temp_wav_file.name
211-
temp_wav_file.close()
202+
# Read transcript text and words from the transcript version
203+
# (Parameters may be empty if called via job dependency)
204+
actual_transcript_text = transcript_text or transcript_version.transcript or ""
205+
actual_words = words if words else []
212206

213-
logger.info(
214-
f"📁 Created temporary WAV file for speaker recognition: {temp_wav_path} "
215-
f"({len(wav_data) / 1024 / 1024:.2f} MB)"
216-
)
207+
# If words not provided, we need to get them from metadata
208+
if not actual_words and transcript_version.metadata:
209+
actual_words = transcript_version.metadata.get("words", [])
217210

218-
# Read transcript text and words from the transcript version
219-
# (Parameters may be empty if called via job dependency)
220-
actual_transcript_text = transcript_text or transcript_version.transcript or ""
221-
actual_words = words if words else []
222-
223-
# If words not provided, we need to get them from metadata
224-
if not actual_words and transcript_version.metadata:
225-
actual_words = transcript_version.metadata.get("words", [])
226-
227-
if not actual_transcript_text:
228-
logger.warning(f"🎤 No transcript text found in version {version_id}")
229-
# Clean up temp file before returning
230-
Path(temp_wav_path).unlink(missing_ok=True)
231-
return {
232-
"success": False,
233-
"conversation_id": conversation_id,
234-
"version_id": version_id,
235-
"error": "No transcript text available",
236-
"processing_time_seconds": 0
237-
}
238-
239-
transcript_data = {
240-
"text": actual_transcript_text,
241-
"words": actual_words
211+
if not actual_transcript_text:
212+
logger.warning(f"🎤 No transcript text found in version {version_id}")
213+
return {
214+
"success": False,
215+
"conversation_id": conversation_id,
216+
"version_id": version_id,
217+
"error": "No transcript text available",
218+
"processing_time_seconds": 0
242219
}
243220

244-
logger.info(f"🎤 Calling speaker recognition service...")
221+
transcript_data = {
222+
"text": actual_transcript_text,
223+
"words": actual_words
224+
}
245225

246-
# Call speaker service with temporary file path
247-
speaker_result = await speaker_client.diarize_identify_match(
248-
audio_path=temp_wav_path,
249-
transcript_data=transcript_data,
250-
user_id=user_id
251-
)
226+
logger.info(f"🎤 Calling speaker recognition service...")
252227

253-
finally:
254-
# Clean up temporary file
255-
try:
256-
Path(temp_wav_path).unlink(missing_ok=True)
257-
logger.debug(f"🧹 Deleted temporary WAV file: {temp_wav_path}")
258-
except Exception as cleanup_error:
259-
logger.warning(f"Failed to delete temporary file {temp_wav_path}: {cleanup_error}")
228+
# Call speaker service with in-memory audio data (no temp file needed!)
229+
speaker_result = await speaker_client.diarize_identify_match(
230+
audio_data=wav_data, # Pass bytes directly, no disk I/O
231+
transcript_data=transcript_data,
232+
user_id=user_id
233+
)
260234

261235
except ValueError as e:
262236
# No chunks found for conversation

0 commit comments

Comments (0)