Skip to content

Commit 0dfd900

Browse files
committed
Enhance conversation model to include word-level timestamps and improve transcript handling
- Added a new `words` field to the `Conversation` model for storing word-level timestamps.
- Updated methods to handle word data during transcript version creation, ensuring compatibility with speaker recognition.
- Refactored conversation job processing to utilize the new word structure, improving data integrity and access.
- Enhanced speaker recognition job to read words from the new standardized location, ensuring backward compatibility with legacy data.
1 parent bd1cd84 commit 0dfd900

File tree

4 files changed

+84
-65
lines changed

4 files changed

+84
-65
lines changed

backends/advanced/src/advanced_omi_backend/models/conversation.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,14 @@ class TranscriptVersion(BaseModel):
6363
"""Version of a transcript with processing metadata."""
6464
version_id: str = Field(description="Unique version identifier")
6565
transcript: Optional[str] = Field(None, description="Full transcript text")
66-
segments: List["Conversation.SpeakerSegment"] = Field(default_factory=list, description="Speaker segments")
66+
words: List["Conversation.Word"] = Field(
67+
default_factory=list,
68+
description="Word-level timestamps for entire transcript"
69+
)
70+
segments: List["Conversation.SpeakerSegment"] = Field(
71+
default_factory=list,
72+
description="Speaker segments (filled by speaker recognition)"
73+
)
6774
provider: Optional[str] = Field(None, description="Transcription provider used (deepgram, parakeet, etc.)")
6875
model: Optional[str] = Field(None, description="Model used (e.g., nova-3, parakeet)")
6976
created_at: datetime = Field(description="When this version was created")
@@ -253,8 +260,9 @@ def add_transcript_version(
253260
self,
254261
version_id: str,
255262
transcript: str,
256-
segments: List["Conversation.SpeakerSegment"],
257-
provider: str, # Provider name from config.yml (deepgram, parakeet, etc.)
263+
words: Optional[List["Conversation.Word"]] = None,
264+
segments: Optional[List["Conversation.SpeakerSegment"]] = None,
265+
provider: str = None, # Provider name from config.yml (deepgram, parakeet, etc.)
258266
model: Optional[str] = None,
259267
processing_time_seconds: Optional[float] = None,
260268
metadata: Optional[Dict[str, Any]] = None,
@@ -264,7 +272,8 @@ def add_transcript_version(
264272
new_version = Conversation.TranscriptVersion(
265273
version_id=version_id,
266274
transcript=transcript,
267-
segments=segments,
275+
words=words or [],
276+
segments=segments or [],
268277
provider=provider,
269278
model=model,
270279
created_at=datetime.now(),

backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py

Lines changed: 19 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -588,74 +588,39 @@ async def open_conversation_job(
588588
# Create transcript version from streaming results
589589
version_id = f"streaming_{session_id[:12]}"
590590
transcript_text = final_transcript.get("text", "")
591-
segments_data = final_transcript.get("segments", [])
592-
593-
# If streaming provider didn't provide segments (e.g., Deepgram streaming),
594-
# create segments from individual final results with word-level data
595-
if not segments_data:
596-
logger.info(f"📝 No segments in streaming results, creating from word-level data")
597-
results = await aggregator.get_session_results(session_id)
598-
599-
for result in results:
600-
words = result.get("words", [])
601-
text = result.get("text", "").strip()
602-
603-
# Skip empty results or results without timing data
604-
# WARNING: We don't support results without word-level timing data.
605-
# Ideally should error, but skipping for now to handle edge cases gracefully.
606-
if not words or not text:
607-
continue
608-
609-
# Create segment dict from this result chunk
610-
# Each "final" result becomes one segment with generic speaker label
611-
segment_dict = {
612-
"start": words[0]["start"],
613-
"end": words[-1]["end"],
614-
"text": text,
615-
"speaker": "SPEAKER_00", # Generic label, updated by speaker recognition
616-
"confidence": result.get("confidence"),
617-
"words": words # Already in correct format from aggregator
618-
}
619-
segments_data.append(segment_dict)
620-
621-
logger.info(f"✅ Created {len(segments_data)} segments from streaming results")
622-
623-
# Convert segments to SpeakerSegment format with word-level timestamps
624-
segments = [
625-
Conversation.SpeakerSegment(
626-
start=seg.get("start", 0.0),
627-
end=seg.get("end", 0.0),
628-
text=seg.get("text", ""),
629-
speaker=seg.get("speaker", "SPEAKER_00"),
630-
confidence=seg.get("confidence"),
631-
words=[
632-
Conversation.Word(
633-
word=w.get("word", ""),
634-
start=w.get("start", 0.0),
635-
end=w.get("end", 0.0),
636-
confidence=w.get("confidence")
637-
)
638-
for w in seg.get("words", [])
639-
]
591+
words_data = final_transcript.get("words", []) # All words from aggregator
592+
593+
# Convert words to Word objects
594+
words = [
595+
Conversation.Word(
596+
word=w.get("word", ""),
597+
start=w.get("start", 0.0),
598+
end=w.get("end", 0.0),
599+
confidence=w.get("confidence")
640600
)
641-
for seg in segments_data
601+
for w in words_data
642602
]
643603

604+
# Segments remain EMPTY until speaker recognition service processes them
605+
# Per Chronicle architecture: segments ONLY come from speaker service
606+
segments = []
607+
644608
# Determine provider from streaming results
645609
provider = final_transcript.get("provider", "deepgram")
646610

647-
# Add streaming transcript as the initial version
611+
# Add streaming transcript with words at version level
648612
conversation.add_transcript_version(
649613
version_id=version_id,
650614
transcript=transcript_text,
651-
segments=segments,
615+
words=words, # Store at version level
616+
segments=segments, # Empty - only speaker service creates segments
652617
provider=provider,
653618
model=provider, # Provider name as model
654619
processing_time_seconds=None, # Not applicable for streaming
655620
metadata={
656621
"source": "streaming",
657622
"chunk_count": final_transcript.get("chunk_count", 0),
658-
"word_count": len(final_transcript.get("words", []))
623+
"word_count": len(words),
659624
},
660625
set_as_active=True
661626
)
@@ -664,7 +629,7 @@ async def open_conversation_job(
664629
await conversation.save()
665630
logger.info(
666631
f"✅ Saved streaming transcript: {len(transcript_text)} chars, "
667-
f"{len(segments)} segments, {len(final_transcript.get('words', []))} words "
632+
f"{len(segments)} segments (empty until speaker recognition), {len(words)} words "
668633
f"for conversation {conversation_id[:12]}"
669634
)
670635

backends/advanced/src/advanced_omi_backend/workers/speaker_jobs.py

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -207,11 +207,36 @@ async def recognise_speakers_job(
207207
actual_transcript_text = transcript_text or transcript_version.transcript or ""
208208
actual_words = words if words else []
209209

210-
# If words not provided, read from transcript version metadata
211-
# (Transcription job stores words in metadata since segments are created by speaker service)
212-
if not actual_words and transcript_version.metadata:
210+
# If words not provided as parameter, read from version.words field (standardized location)
211+
if not actual_words and transcript_version.words:
212+
# Convert Word objects to dicts for speaker service API
213+
actual_words = [
214+
{
215+
"word": w.word,
216+
"start": w.start,
217+
"end": w.end,
218+
"confidence": w.confidence
219+
}
220+
for w in transcript_version.words
221+
]
222+
logger.info(f"🔤 Loaded {len(actual_words)} words from transcript version.words field")
223+
# Backward compatibility: Fall back to metadata if words field is empty (old data)
224+
elif not actual_words and transcript_version.metadata.get("words"):
213225
actual_words = transcript_version.metadata.get("words", [])
214-
logger.info(f"🔤 Loaded {len(actual_words)} words from transcript version metadata")
226+
logger.info(f"🔤 Loaded {len(actual_words)} words from transcript version metadata (legacy)")
227+
# Backward compatibility: Extract from segments if that's all we have (old streaming data)
228+
elif not actual_words and transcript_version.segments:
229+
for segment in transcript_version.segments:
230+
if segment.words:
231+
for w in segment.words:
232+
actual_words.append({
233+
"word": w.word,
234+
"start": w.start,
235+
"end": w.end,
236+
"confidence": w.confidence
237+
})
238+
if actual_words:
239+
logger.info(f"🔤 Extracted {len(actual_words)} words from segments (legacy)")
215240

216241
if not actual_transcript_text:
217242
logger.warning(f"🎤 No transcript text found in version {version_id}")
@@ -223,6 +248,16 @@ async def recognise_speakers_job(
223248
"processing_time_seconds": 0
224249
}
225250

251+
if not actual_words:
252+
logger.warning(f"🎤 No words found in version {version_id}")
253+
return {
254+
"success": False,
255+
"conversation_id": conversation_id,
256+
"version_id": version_id,
257+
"error": "No word-level timing data available",
258+
"processing_time_seconds": 0
259+
}
260+
226261
transcript_data = {
227262
"text": actual_transcript_text,
228263
"words": actual_words

backends/advanced/src/advanced_omi_backend/workers/transcription_jobs.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -365,20 +365,30 @@ async def transcribe_full_audio_job(
365365
# Add new transcript version
366366
provider_normalized = provider_name.lower() if provider_name else "unknown"
367367

368+
# Convert words to Word objects
369+
word_objects = [
370+
Conversation.Word(
371+
word=w.get("word", ""),
372+
start=w.get("start", 0.0),
373+
end=w.get("end", 0.0),
374+
confidence=w.get("confidence")
375+
)
376+
for w in words
377+
]
378+
368379
# Prepare metadata (transcription only - speaker service will add segments and metadata)
369-
# Store words in metadata so speaker job can access them
370380
metadata = {
371381
"trigger": trigger,
372382
"audio_file_size": len(wav_data),
373383
"word_count": len(words),
374384
"segments_created_by": "speaker_service", # Speaker service creates segments via diarization
375-
"words": words, # Store word-level timing data for speaker job
376385
}
377386

378387
conversation.add_transcript_version(
379388
version_id=version_id,
380389
transcript=transcript_text,
381-
segments=speaker_segments,
390+
words=word_objects, # Store at version level (not in metadata!)
391+
segments=speaker_segments, # Empty - will be filled by speaker recognition
382392
provider=provider_normalized, # Now just a string, no enum constructor needed
383393
model=provider.name,
384394
processing_time_seconds=processing_time,

0 commit comments

Comments
 (0)