Skip to content

Commit 8898b43

Browse files
committed
Refactor audio processing and conversation management for improved deduplication and tracking
1 parent 1d63565 commit 8898b43

20 files changed

+204
-300
lines changed

backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -85,14 +85,14 @@ async def upload_and_process_audio_files(
8585
content = await file.read()
8686

8787

88-
# Generate audio UUID and timestamp
88+
# Track external source for deduplication (Google Drive, etc.)
89+
external_source_id = None
90+
external_source_type = None
8991
if source == "gdrive":
90-
audio_uuid = getattr(file, "audio_uuid", None)
91-
if not audio_uuid:
92-
audio_logger.error(f"Missing audio_uuid for gdrive file: {file.filename}")
93-
audio_uuid = str(uuid.uuid4())
94-
else:
95-
audio_uuid = str(uuid.uuid4())
92+
external_source_id = getattr(file, "file_id", None) or getattr(file, "audio_uuid", None)
93+
external_source_type = "gdrive"
94+
if not external_source_id:
95+
audio_logger.warning(f"Missing file_id for gdrive file: {file.filename}")
9696
timestamp = int(time.time() * 1000)
9797

9898
# Validate and prepare audio (read format from WAV file)
@@ -121,11 +121,12 @@ async def upload_and_process_audio_files(
121121
title = file.filename.rsplit('.', 1)[0][:50] if file.filename else "Uploaded Audio"
122122

123123
conversation = create_conversation(
124-
audio_uuid=audio_uuid,
125124
user_id=user.user_id,
126125
client_id=client_id,
127126
title=title,
128-
summary="Processing uploaded audio file..."
127+
summary="Processing uploaded audio file...",
128+
external_source_id=external_source_id,
129+
external_source_type=external_source_type,
129130
)
130131
await conversation.insert()
131132
conversation_id = conversation.conversation_id # Get the auto-generated ID
@@ -184,22 +185,20 @@ async def upload_and_process_audio_files(
184185
transcription_job = transcription_queue.enqueue(
185186
transcribe_full_audio_job,
186187
conversation_id,
187-
audio_uuid,
188188
version_id,
189189
"batch", # trigger
190190
job_timeout=1800, # 30 minutes
191191
result_ttl=JOB_RESULT_TTL,
192192
job_id=transcribe_job_id,
193193
description=f"Transcribe uploaded file {conversation_id[:8]}",
194-
meta={'audio_uuid': audio_uuid, 'conversation_id': conversation_id, 'client_id': client_id}
194+
meta={'conversation_id': conversation_id, 'client_id': client_id}
195195
)
196196

197197
audio_logger.info(f"📥 Enqueued transcription job {transcription_job.id} for uploaded file")
198198

199199
# Enqueue post-conversation processing job chain (depends on transcription)
200200
job_ids = start_post_conversation_jobs(
201201
conversation_id=conversation_id,
202-
audio_uuid=audio_uuid,
203202
user_id=user.user_id,
204203
transcript_version_id=version_id, # Pass the version_id from transcription job
205204
depends_on_job=transcription_job, # Wait for transcription to complete
@@ -209,7 +208,6 @@ async def upload_and_process_audio_files(
209208
processed_files.append({
210209
"filename": file.filename,
211210
"status": "processing",
212-
"audio_uuid": audio_uuid,
213211
"conversation_id": conversation_id,
214212
"transcript_job_id": transcription_job.id,
215213
"speaker_job_id": job_ids['speaker_recognition'],

backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -109,10 +109,8 @@ async def get_conversation(conversation_id: str, user: User):
109109
# Build response with explicit curated fields
110110
response = {
111111
"conversation_id": conversation.conversation_id,
112-
"audio_uuid": conversation.audio_uuid,
113112
"user_id": conversation.user_id,
114113
"client_id": conversation.client_id,
115-
"audio_path": conversation.audio_path,
116114
"audio_chunks_count": conversation.audio_chunks_count,
117115
"audio_total_duration": conversation.audio_total_duration,
118116
"audio_compression_ratio": conversation.audio_compression_ratio,
@@ -175,10 +173,8 @@ async def get_conversations(user: User, include_deleted: bool = False):
175173
for conv in user_conversations:
176174
conversations.append({
177175
"conversation_id": conv.conversation_id,
178-
"audio_uuid": conv.audio_uuid,
179176
"user_id": conv.user_id,
180177
"client_id": conv.client_id,
181-
"audio_path": conv.audio_path,
182178
"audio_chunks_count": conv.audio_chunks_count,
183179
"audio_total_duration": conv.audio_total_duration,
184180
"audio_compression_ratio": conv.audio_compression_ratio,
@@ -248,7 +244,6 @@ async def _hard_delete_conversation(conversation: Conversation) -> JSONResponse:
248244
"""Permanently delete conversation and chunks (admin only)."""
249245
conversation_id = conversation.conversation_id
250246
client_id = conversation.client_id
251-
audio_uuid = conversation.audio_uuid
252247

253248
# Delete conversation document
254249
await conversation.delete()
@@ -268,8 +263,7 @@ async def _hard_delete_conversation(conversation: Conversation) -> JSONResponse:
268263
"message": f"Successfully permanently deleted conversation '{conversation_id}'",
269264
"deleted_chunks": deleted_chunks,
270265
"conversation_id": conversation_id,
271-
"client_id": client_id,
272-
"audio_uuid": audio_uuid
266+
"client_id": client_id
273267
}
274268
)
275269

@@ -411,8 +405,6 @@ async def reprocess_transcript(conversation_id: str, user: User):
411405
return JSONResponse(status_code=403, content={"error": "Access forbidden. You can only reprocess your own conversations."})
412406

413407
# Get audio_uuid from conversation
414-
audio_uuid = conversation_model.audio_uuid
415-
416408
# Validate audio chunks exist in MongoDB
417409
chunks = await AudioChunkDocument.find(
418410
AudioChunkDocument.conversation_id == conversation_id
@@ -439,14 +431,13 @@ async def reprocess_transcript(conversation_id: str, user: User):
439431
transcript_job = transcription_queue.enqueue(
440432
transcribe_full_audio_job,
441433
conversation_id,
442-
audio_uuid,
443434
version_id,
444435
"reprocess",
445436
job_timeout=600,
446437
result_ttl=JOB_RESULT_TTL,
447438
job_id=f"reprocess_{conversation_id[:8]}",
448439
description=f"Transcribe audio for {conversation_id[:8]}",
449-
meta={'audio_uuid': audio_uuid, 'conversation_id': conversation_id}
440+
meta={'conversation_id': conversation_id}
450441
)
451442
logger.info(f"📥 RQ: Enqueued transcription job {transcript_job.id}")
452443

@@ -468,7 +459,7 @@ async def reprocess_transcript(conversation_id: str, user: User):
468459
result_ttl=JOB_RESULT_TTL,
469460
job_id=f"speaker_{conversation_id[:8]}",
470461
description=f"Recognize speakers for {conversation_id[:8]}",
471-
meta={'audio_uuid': audio_uuid, 'conversation_id': conversation_id}
462+
meta={'conversation_id': conversation_id}
472463
)
473464
speaker_dependency = speaker_job # Chain for next job
474465
logger.info(f"📥 RQ: Enqueued speaker recognition job {speaker_job.id} (depends on {transcript_job.id})")
@@ -486,7 +477,7 @@ async def reprocess_transcript(conversation_id: str, user: User):
486477
result_ttl=JOB_RESULT_TTL,
487478
job_id=f"memory_{conversation_id[:8]}",
488479
description=f"Extract memories for {conversation_id[:8]}",
489-
meta={'audio_uuid': audio_uuid, 'conversation_id': conversation_id}
480+
meta={'conversation_id': conversation_id}
490481
)
491482
if speaker_job:
492483
logger.info(f"📥 RQ: Enqueued memory job {memory_job.id} (depends on speaker job {speaker_job.id})")

backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -211,15 +211,15 @@ def get_jobs(
211211
}
212212

213213

214-
def all_jobs_complete_for_session(session_id: str) -> bool:
214+
def all_jobs_complete_for_client(client_id: str) -> bool:
215215
"""
216-
Check if all jobs associated with a session are in terminal states.
216+
Check if all jobs associated with a client are in terminal states.
217217
218-
Only checks jobs with audio_uuid in job.meta (no backward compatibility).
218+
Checks jobs with client_id in job.meta.
219219
Traverses dependency chains to include dependent jobs.
220220
221221
Args:
222-
session_id: The audio_uuid (session ID) to check jobs for
222+
client_id: The client device identifier to check jobs for
223223
224224
Returns:
225225
True if all jobs are complete (or no jobs found), False if any job is still processing
@@ -248,7 +248,7 @@ def is_job_complete(job):
248248

249249
return True
250250

251-
# Find all jobs for this session
251+
# Find all jobs for this client
252252
all_queues = [transcription_queue, memory_queue, audio_queue, default_queue]
253253
for queue in all_queues:
254254
registries = [
@@ -266,8 +266,8 @@ def is_job_complete(job):
266266
try:
267267
job = Job.fetch(job_id, connection=redis_conn)
268268

269-
# Only check jobs with audio_uuid in meta
270-
if job.meta and job.meta.get('audio_uuid') == session_id:
269+
# Only check jobs with client_id in meta
270+
if job.meta and job.meta.get('client_id') == client_id:
271271
if not is_job_complete(job):
272272
return False
273273
except Exception as e:
@@ -289,7 +289,7 @@ def start_streaming_jobs(
289289
2. Audio persistence job - writes audio chunks to WAV file (file rotation per conversation)
290290
291291
Args:
292-
session_id: Stream session ID (audio_uuid)
292+
session_id: Stream session ID (equals client_id for streaming)
293293
user_id: User identifier
294294
client_id: Client identifier
295295
@@ -313,7 +313,7 @@ def start_streaming_jobs(
313313
failure_ttl=86400, # Cleanup failed jobs after 24h
314314
job_id=f"speech-detect_{session_id[:12]}",
315315
description=f"Listening for speech...",
316-
meta={'audio_uuid': session_id, 'client_id': client_id, 'session_level': True}
316+
meta={'client_id': client_id, 'session_level': True}
317317
)
318318
# Log job enqueue with TTL information for debugging
319319
actual_ttl = redis_conn.ttl(f"rq:job:{speech_job.id}")
@@ -346,7 +346,7 @@ def start_streaming_jobs(
346346
failure_ttl=86400, # Cleanup failed jobs after 24h
347347
job_id=f"audio-persist_{session_id[:12]}",
348348
description=f"Audio persistence for session {session_id[:12]}",
349-
meta={'audio_uuid': session_id, 'session_level': True} # Mark as session-level job
349+
meta={'client_id': client_id, 'session_level': True} # Mark as session-level job
350350
)
351351
# Log job enqueue with TTL information for debugging
352352
actual_ttl = redis_conn.ttl(f"rq:job:{audio_job.id}")
@@ -366,7 +366,6 @@ def start_streaming_jobs(
366366

367367
def start_post_conversation_jobs(
368368
conversation_id: str,
369-
audio_uuid: str,
370369
user_id: str,
371370
transcript_version_id: Optional[str] = None,
372371
depends_on_job = None,
@@ -386,7 +385,6 @@ def start_post_conversation_jobs(
386385
387386
Args:
388387
conversation_id: Conversation identifier
389-
audio_uuid: Audio UUID for job tracking
390388
user_id: User identifier
391389
transcript_version_id: Transcript version ID (auto-generated if None)
392390
depends_on_job: Optional job dependency for first job (e.g., transcription for file uploads)
@@ -402,7 +400,7 @@ def start_post_conversation_jobs(
402400
version_id = transcript_version_id or str(uuid.uuid4())
403401

404402
# Build job metadata (include client_id if provided for UI tracking)
405-
job_meta = {'audio_uuid': audio_uuid, 'conversation_id': conversation_id}
403+
job_meta = {'conversation_id': conversation_id}
406404
if client_id:
407405
job_meta['client_id'] = client_id
408406

@@ -416,7 +414,7 @@ def start_post_conversation_jobs(
416414

417415
if speaker_enabled:
418416
speaker_job_id = f"speaker_{conversation_id[:12]}"
419-
logger.info(f"🔍 DEBUG: Creating speaker job with job_id={speaker_job_id}, conversation_id={conversation_id[:12]}, audio_uuid={audio_uuid[:12]}")
417+
logger.info(f"🔍 DEBUG: Creating speaker job with job_id={speaker_job_id}, conversation_id={conversation_id[:12]}")
420418

421419
speaker_job = transcription_queue.enqueue(
422420
recognise_speakers_job,
@@ -440,7 +438,7 @@ def start_post_conversation_jobs(
440438
# Step 2: Memory extraction job
441439
# Depends on speaker job if it was created, otherwise depends on upstream (transcription or nothing)
442440
memory_job_id = f"memory_{conversation_id[:12]}"
443-
logger.info(f"🔍 DEBUG: Creating memory job with job_id={memory_job_id}, conversation_id={conversation_id[:12]}, audio_uuid={audio_uuid[:12]}")
441+
logger.info(f"🔍 DEBUG: Creating memory job with job_id={memory_job_id}, conversation_id={conversation_id[:12]}")
444442

445443
memory_job = memory_queue.enqueue(
446444
process_memory_job,
@@ -462,7 +460,7 @@ def start_post_conversation_jobs(
462460
# Step 3: Title/summary generation job
463461
# Depends on speaker job if enabled, otherwise on upstream dependency
464462
title_job_id = f"title_summary_{conversation_id[:12]}"
465-
logger.info(f"🔍 DEBUG: Creating title/summary job with job_id={title_job_id}, conversation_id={conversation_id[:12]}, audio_uuid={audio_uuid[:12]}")
463+
logger.info(f"🔍 DEBUG: Creating title/summary job with job_id={title_job_id}, conversation_id={conversation_id[:12]}")
466464

467465
title_summary_job = default_queue.enqueue(
468466
generate_title_summary_job,
@@ -484,14 +482,13 @@ def start_post_conversation_jobs(
484482
# Step 5: Dispatch conversation.complete event (runs after both memory and title/summary complete)
485483
# This ensures plugins receive the event after all processing is done
486484
event_job_id = f"event_complete_{conversation_id[:12]}"
487-
logger.info(f"🔍 DEBUG: Creating conversation complete event job with job_id={event_job_id}, conversation_id={conversation_id[:12]}, audio_uuid={audio_uuid[:12]}")
485+
logger.info(f"🔍 DEBUG: Creating conversation complete event job with job_id={event_job_id}, conversation_id={conversation_id[:12]}")
488486

489487
# Event job depends on both memory and title/summary jobs completing
490488
# Use RQ's depends_on list to wait for both
491489
event_dispatch_job = default_queue.enqueue(
492490
dispatch_conversation_complete_event_job,
493491
conversation_id,
494-
audio_uuid,
495492
client_id or "",
496493
user_id,
497494
job_timeout=120, # 2 minutes

backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -838,8 +838,7 @@ async def _process_batch_audio_complete(
838838
f"📦 Batch mode: Combined {len(client_state.batch_audio_chunks)} chunks into {len(complete_audio)} bytes"
839839
)
840840

841-
# Generate audio UUID and timestamp
842-
audio_uuid = str(uuid.uuid4())
841+
# Timestamp for logging
843842
timestamp = int(time.time() * 1000)
844843

845844
# Get audio format from batch metadata (set during audio-start)
@@ -859,7 +858,6 @@ async def _process_batch_audio_complete(
859858
version_id = str(uuid.uuid4())
860859

861860
conversation = create_conversation(
862-
audio_uuid=audio_uuid,
863861
user_id=user_id,
864862
client_id=client_id,
865863
title="Batch Recording",
@@ -904,22 +902,20 @@ async def _process_batch_audio_complete(
904902
transcription_job = transcription_queue.enqueue(
905903
transcribe_full_audio_job,
906904
conversation_id,
907-
audio_uuid,
908905
version_id,
909906
"batch", # trigger
910907
job_timeout=1800, # 30 minutes
911908
result_ttl=JOB_RESULT_TTL,
912909
job_id=transcribe_job_id,
913910
description=f"Transcribe batch audio {conversation_id[:8]}",
914-
meta={'audio_uuid': audio_uuid, 'conversation_id': conversation_id, 'client_id': client_id}
911+
meta={'conversation_id': conversation_id, 'client_id': client_id}
915912
)
916913

917914
application_logger.info(f"📥 Batch mode: Enqueued transcription job {transcription_job.id}")
918915

919916
# Enqueue post-conversation processing job chain (depends on transcription)
920917
job_ids = start_post_conversation_jobs(
921918
conversation_id=conversation_id,
922-
audio_uuid=audio_uuid,
923919
user_id=None, # Will be read from conversation in DB by jobs
924920
depends_on_job=transcription_job, # Wait for transcription to complete
925921
client_id=client_id # Pass client_id for UI tracking

0 commit comments

Comments
 (0)