
Commit e2c331b

Enhance StreamingTranscriptionConsumer and conversation job handling (#282)
- Removed cumulative audio offset tracking from StreamingTranscriptionConsumer, since Deepgram provides cumulative timestamps directly.
- Updated the store_final_result method to use Deepgram's cumulative timestamps without adjustment.
- Implemented completion signaling for transcription sessions in Redis, so conversation jobs wait for all results before processing.
- Improved error handling to signal completion even when an error occurs, preventing conversation jobs from hanging.
- Enhanced logging for better visibility into transcription completion and error states.
1 parent 1201e0e commit e2c331b
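
The core of this change is a small completion signal in Redis: when the streaming consumer finishes (or fails while) closing a session, it writes a short-lived per-session key that the conversation job checks before reading the transcript. A minimal sketch of the signaling side, assuming a `redis.asyncio` client; `signal_transcription_complete` is an illustrative name, not a method in this codebase:

```python
# Sketch only: the completion-signal protocol introduced by this commit.
# Assumes redis.asyncio; the helper name is made up for illustration.
import redis.asyncio as redis

COMPLETION_TTL_SECONDS = 300  # matches the 5 min TTL used in the diff below


async def signal_transcription_complete(client: redis.Redis, session_id: str, ok: bool = True) -> None:
    # One key per session: "1" means the stream ended cleanly, "error" means
    # teardown failed but the conversation job should still stop waiting.
    await client.set(
        f"transcription:complete:{session_id}",
        "1" if ok else "error",
        ex=COMPLETION_TTL_SECONDS,
    )
```

The TTL keeps stale keys from lingering if no conversation job ever reads them.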

2 files changed: +48, -48 lines

backends/advanced/src/advanced_omi_backend/services/transcription/streaming_consumer.py

Lines changed: 24 additions & 48 deletions
@@ -125,7 +125,6 @@ async def start_session_stream(self, session_id: str, sample_rate: int = 16000):
         self.active_sessions[session_id] = {
             "last_activity": time.time(),
             "sample_rate": sample_rate,
-            "audio_offset_seconds": 0.0  # Track cumulative audio duration for timestamp adjustment
         }
 
         logger.info(f"🎙️ Started Deepgram WebSocket stream for session: {session_id}")
@@ -164,10 +163,22 @@ async def end_session_stream(self, session_id: str):
             await self.trigger_plugins(session_id, final_result)
 
             self.active_sessions.pop(session_id, None)
-            logger.info(f"🛑 Ended Deepgram WebSocket stream for session: {session_id}")
+
+            # Signal that streaming transcription is complete for this session
+            # This allows conversation_jobs to wait for all results before reading transcript
+            completion_key = f"transcription:complete:{session_id}"
+            await self.redis_client.set(completion_key, "1", ex=300)  # 5 min TTL
+            logger.info(f"✅ Streaming transcription complete for {session_id} (signal set)")
 
         except Exception as e:
             logger.error(f"Error ending stream for {session_id}: {e}", exc_info=True)
+            # Still signal completion even on error so conversation job doesn't hang
+            try:
+                completion_key = f"transcription:complete:{session_id}"
+                await self.redis_client.set(completion_key, "error", ex=300)
+                logger.warning(f"⚠️ Set error completion signal for {session_id}")
+            except Exception:
+                pass  # Best effort
 
     async def process_audio_chunk(self, session_id: str, audio_chunk: bytes, chunk_id: str):
         """
@@ -250,11 +261,11 @@ async def publish_to_client(self, session_id: str, result: Dict, is_final: bool)
 
     async def store_final_result(self, session_id: str, result: Dict, chunk_id: str = None):
         """
-        Store final transcription result to Redis Stream with cumulative timestamp adjustment.
+        Store final transcription result to Redis Stream.
 
-        Transcription providers return word timestamps that reset to 0 for each chunk.
-        We maintain a running audio_offset_seconds to make timestamps cumulative across
-        the session, enabling accurate speech duration calculation for speech detection.
+        Note: Deepgram streaming WebSocket maintains state and returns cumulative
+        timestamps from the start of the stream. No offset adjustment is needed.
+        Previous code incorrectly assumed per-chunk timestamps starting at 0.
 
         Args:
             session_id: Session ID
@@ -264,38 +275,9 @@ async def store_final_result(self, session_id: str, result: Dict, chunk_id: str
         try:
             stream_name = f"transcription:results:{session_id}"
 
-            # Get cumulative audio offset for this session
-            audio_offset = 0.0
-            chunk_duration = 0.0
-            if session_id in self.active_sessions:
-                audio_offset = self.active_sessions[session_id].get("audio_offset_seconds", 0.0)
-
-            # Adjust word timestamps by cumulative offset
+            # Get words and segments directly - Deepgram returns cumulative timestamps
             words = result.get("words", [])
-            adjusted_words = []
-            if words:
-                for word in words:
-                    adjusted_word = word.copy()
-                    adjusted_word["start"] = word.get("start", 0.0) + audio_offset
-                    adjusted_word["end"] = word.get("end", 0.0) + audio_offset
-                    adjusted_words.append(adjusted_word)
-
-                # Calculate chunk duration from last word's end time
-                if adjusted_words:
-                    last_word_end = words[-1].get("end", 0.0)  # Use unadjusted for duration calc
-                    chunk_duration = last_word_end
-
-                logger.debug(f"➡️ [STREAMING] Adjusted {len(adjusted_words)} words by +{audio_offset:.1f}s (chunk_duration={chunk_duration:.1f}s)")
-
-            # Adjust segment timestamps too
             segments = result.get("segments", [])
-            adjusted_segments = []
-            if segments:
-                for seg in segments:
-                    adjusted_seg = seg.copy()
-                    adjusted_seg["start"] = seg.get("start", 0.0) + audio_offset
-                    adjusted_seg["end"] = seg.get("end", 0.0) + audio_offset
-                    adjusted_segments.append(adjusted_seg)
 
             # Prepare result entry - MUST match aggregator's expected schema
             # All keys and values must be bytes to match consumer.py format
@@ -308,23 +290,17 @@ async def store_final_result(self, session_id: str, result: Dict, chunk_id: str
                 b"timestamp": str(time.time()).encode(),
             }
 
-            # Add adjusted JSON fields
-            if adjusted_words:
-                entry[b"words"] = json.dumps(adjusted_words).encode()
+            # Add words and segments directly (already have cumulative timestamps from Deepgram)
+            if words:
+                entry[b"words"] = json.dumps(words).encode()
 
-            if adjusted_segments:
-                entry[b"segments"] = json.dumps(adjusted_segments).encode()
+            if segments:
+                entry[b"segments"] = json.dumps(segments).encode()
 
             # Write to Redis Stream
             await self.redis_client.xadd(stream_name, entry)
 
-            # Update cumulative offset for next chunk
-            if session_id in self.active_sessions and chunk_duration > 0:
-                self.active_sessions[session_id]["audio_offset_seconds"] += chunk_duration
-                new_offset = self.active_sessions[session_id]["audio_offset_seconds"]
-                logger.info(f"💾 Stored final result to {stream_name}: {result.get('text', '')[:50]}... (offset: {audio_offset:.1f}s → {new_offset:.1f}s)")
-            else:
-                logger.info(f"💾 Stored final result to {stream_name}: {result.get('text', '')[:50]}...")
+            logger.info(f"💾 Stored final result to {stream_name}: {result.get('text', '')[:50]}... ({len(words)} words)")
 
         except Exception as e:
             logger.error(f"Error storing final result for {session_id}: {e}", exc_info=True)

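Because the Deepgram streaming WebSocket returns cumulative (stream-absolute) timestamps, downstream code can derive speech duration straight from the stored words, with no offset bookkeeping. A small illustrative sketch; the helper name and sample data are made up, and the word dicts are assumed to have the same start/end shape as the entries written to the Redis stream above:

```python
from typing import Dict, List


def speech_duration_seconds(words: List[Dict]) -> float:
    # With cumulative timestamps, the speech span is simply the last word's end
    # minus the first word's start; no per-chunk offset is needed.
    if not words:
        return 0.0
    return words[-1].get("end", 0.0) - words[0].get("start", 0.0)


# Hypothetical words from two different final results of the same session:
# timestamps are already absolute within the stream.
words = [
    {"word": "hello", "start": 0.2, "end": 0.6},
    {"word": "again", "start": 12.4, "end": 12.9},
]
print(round(speech_duration_seconds(words), 2))  # 12.7
```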
backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py

Lines changed: 24 additions & 0 deletions
@@ -593,6 +593,30 @@ async def open_conversation_job(
     # to avoid false negatives from aggregated results lacking proper word-level data
     logger.info("✅ Conversation has meaningful speech (validated during streaming), proceeding with post-processing")
 
+    # Wait for streaming transcription consumer to complete before reading transcript
+    # This fixes the race condition where conversation job reads transcript before
+    # streaming consumer stores all final results (seen as 24+ second delay in logs)
+    completion_key = f"transcription:complete:{session_id}"
+    max_wait_streaming = 30  # seconds
+    waited_streaming = 0.0
+    while waited_streaming < max_wait_streaming:
+        completion_status = await redis_client.get(completion_key)
+        if completion_status:
+            status_str = completion_status.decode() if isinstance(completion_status, bytes) else completion_status
+            if status_str == "error":
+                logger.warning(f"⚠️ Streaming transcription ended with error for {session_id}, proceeding anyway")
+            else:
+                logger.info(f"✅ Streaming transcription confirmed complete for {session_id}")
+            break
+        await asyncio.sleep(0.5)
+        waited_streaming += 0.5
+
+    if waited_streaming >= max_wait_streaming:
+        logger.warning(
+            f"⚠️ Timed out waiting for streaming completion signal for {session_id} "
+            f"(waited {max_wait_streaming}s), proceeding with available transcript"
+        )
+
     # Wait for audio_streaming_persistence_job to complete and write MongoDB chunks
     from advanced_omi_backend.utils.audio_chunk_utils import wait_for_audio_chunks

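As a design note, the bounded polling loop added above could equivalently be expressed by wrapping an unbounded poll in `asyncio.wait_for`. A hedged alternative sketch, purely illustrative and not what this commit ships; `wait_for_streaming_completion` and `_poll_key` are invented names:

```python
import asyncio


async def _poll_key(redis_client, key: str, interval: float = 0.5) -> str:
    # Poll Redis until the key appears, then return its decoded value.
    while True:
        value = await redis_client.get(key)
        if value is not None:
            return value.decode() if isinstance(value, bytes) else value
        await asyncio.sleep(interval)


async def wait_for_streaming_completion(redis_client, session_id: str, timeout: float = 30.0) -> str | None:
    # Bound the poll with asyncio.wait_for instead of hand-counting elapsed time.
    try:
        return await asyncio.wait_for(
            _poll_key(redis_client, f"transcription:complete:{session_id}"), timeout
        )
    except asyncio.TimeoutError:
        return None  # caller logs a timeout warning and proceeds with the available transcript
```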