From 22eedde34817c5a023b6e8699b6e7062cdccf497 Mon Sep 17 00:00:00 2001 From: dberkin1 Date: Thu, 6 Nov 2025 21:33:14 +0300 Subject: [PATCH 01/13] update benchmarks.md --- BENCHMARKS.md | 60 +++++++++++++++++++++++++++++++++++---------------- 1 file changed, 41 insertions(+), 19 deletions(-) diff --git a/BENCHMARKS.md b/BENCHMARKS.md index 58e69b0..c7b1232 100644 --- a/BENCHMARKS.md +++ b/BENCHMARKS.md @@ -479,8 +479,8 @@ ### OpenAI - **Latest Run:** `2025-11-06` -- **Model Version:** `whisper-1` (also known as `large-v2`) -- **Configuration:** OpenAI's Whisper API for transcription with keyword prompting. See [OpenAI Speech to Text: Prompting](https://platform.openai.com/docs/guides/speech-to-text#prompting) for more details. +- **Model Version:** `whisper-1` +- **Configuration:** OpenAI's Whisper API for transcription with prompt-based keyword boosting. See [OpenAI Whisper API](https://platform.openai.com/docs/guides/speech-to-text) for more details. - **Code Reference:** [openbench/pipeline/transcription/transcription_openai.py](https://github.com/argmaxinc/OpenBench/blob/main/src/openbench/pipeline/transcription/transcription_openai.py) - **Hardware**: Unknown (Cloud API) @@ -498,7 +498,14 @@ - **Code Reference:** [openbench/pipeline/transcription/transcription_oss_whisper.py](https://github.com/argmaxinc/OpenBench/blob/main/src/openbench/pipeline/transcription/transcription_oss_whisper.py) - **Hardware**: M2 Ultra Mac Studio -### Argmax +### Argmax (Parakeet V2) +- **Latest Run:** `2025-11-06` +- **Model Version:** `parakeet-v2` +- **Configuration:** Argmax WhisperKit Pro with compressed Parakeet V2 model (i.e. `parakeet-v2_476MB`) with custom vocabulary support. +- **Code Reference:** [openbench/pipeline/transcription/transcription_whisperkitpro.py](https://github.com/argmaxinc/OpenBench/blob/main/src/openbench/pipeline/transcription/transcription_whisperkitpro.py) +- **Hardware**: M2 Ultra Mac Studio + +### Argmax (Parakeet V3) - **Latest Run:** `2025-11-06` - **Model Version:** `parakeet-v3` - **Configuration:** Argmax SDK WhisperKit Pro framework with compressed Parakeet V3 model (i.e. `parakeet-v3_494MB`) and Custom Vocabulary feature enabled. See [Argmax Custom Vocabulary](https://app.argmaxinc.com/docs/examples/custom-vocabulary) for more details. @@ -524,6 +531,21 @@
+## Keywords + +
+Click to expand + +The dataset consists of trimmed audio samples extracted from 1-2 hour earnings call recordings. Custom vocabulary evaluation uses two keyword categories to assess system performance under different vocabulary conditions: + +**Chunk keywords** include only the keywords that actually appear in the trimmed audio chunk being evaluated. This represents a focused vocabulary set that matches the content of the specific audio sample. + +**File keywords** include all keywords from the entire source file (1-2 hours) that the audio chunk was trimmed from. This means the system is provided with a comprehensive vocabulary list that may contain keywords not present in the actual audio chunk being transcribed. This tests a system's ability to handle extensive custom vocabularies without introducing false positives from irrelevant keywords. + +
+ +
+ ## Word Error Rate (WER)
@@ -537,10 +559,10 @@
-| Dataset | Deepgram<br>(nova-3) | OpenAI<br>(whisper-1) | AssemblyAI | Whisper OSS<br>(large-v3-turbo) | Argmax<br>(parakeet-v3) |
-|----------------------:|:---------------------:|:----------------------:|:----------:|:----------------------:|:------------------------:|
-| earnings22-keywords<br>(chunk-keywords) | 9.32 | 9.74 | 9.68 | 10 | 9.21 |
-| earnings22-keywords<br>(file-keywords) | 9.81 | 9.57 | 9.63 | 10.3 | 9.63 |
+| Dataset | Keywords | Deepgram<br>(nova-3) | OpenAI<br>(whisper-1) | AssemblyAI | Whisper OSS<br>(large-v3-turbo) | Argmax<br>(Parakeet V2) | Argmax<br>(Parakeet V3) |
+|----------------------|:---------------------:|:---------------------:|:----------------------:|:----------:|:----------------------:|:------------------------:|:------------------------:|
+| earnings22-keywords | Chunk | 9.32 | 9.74 | 9.68 | 10 | 9.5 | 9.21 |
+| earnings22-keywords | File | 9.81 | 9.57 | 9.63 | 10.3 | 10 | 9.63 |

@@ -563,10 +585,10 @@ If the model predicts 20 keywords and 15 of them match the ground truth, precisi

-| Dataset | Deepgram<br>(nova-3) | OpenAI<br>(whisper-1) | AssemblyAI | Whisper OSS<br>(large-v3-turbo) | Argmax<br>(parakeet-v3) |
-|----------------------|:---------------------:|:----------------------:|:----------:|:----------------------:|:------------------------:|
-| earnings22-keywords<br>(chunk-keywords) |0.98 | 0.98 | 0.98 | 0.96 | 0.96 |
-| earnings22-keywords<br>(file-keywords) |0.94 | 0.9 | 0.94 | 0.95 | 0.87 |
+| Dataset | Keywords | Deepgram<br>(nova-3) | OpenAI<br>(whisper-1) | AssemblyAI | Whisper OSS<br>(large-v3-turbo) | Argmax<br>(Parakeet V2) | Argmax<br>(Parakeet V3) |
+|----------------------|:---------------------:|:---------------------:|:----------------------:|:----------:|:----------------------:|:------------------------:|:------------------------:|
+| earnings22-keywords | Chunk |0.98 | 0.98 | 0.98 | 0.96 | 0.96 | 0.96 |
+| earnings22-keywords | File |0.94 | 0.9 | 0.94 | 0.95 | 0.87 | 0.87 |

@@ -589,10 +611,10 @@ If the ground-truth transcript has 25 keywords and the model correctly finds 15,

-| Dataset | Deepgram<br>(nova-3) | OpenAI<br>(whisper-1) | AssemblyAI | Whisper OSS<br>(large-v3-turbo) | Argmax<br>(parakeet-v3) |
-|----------------------|:---------------------:|:----------------------:|:----------:|:----------------------:|:------------------------:|
-| earnings22-keywords<br>(chunk-keywords) | 0.89 | 0.69 | 0.7 | 0.81 | 0.88 |
-| earnings22-keywords<br>(file-keywords) | 0.83 | 0.79 | 0.68 | 0.76 | 0.85 |
+| Dataset | Keywords | Deepgram<br>(nova-3) | OpenAI<br>(whisper-1) | AssemblyAI | Whisper OSS<br>(large-v3-turbo) | Argmax<br>(Parakeet V2) | Argmax<br>(Parakeet V3) |
+|----------------------|:---------------------:|:---------------------:|:----------------------:|:----------:|:----------------------:|:------------------------:|:------------------------:|
+| earnings22-keywords | Chunk | 0.89 | 0.69 | 0.7 | 0.81 | 0.89 | 0.88 |
+| earnings22-keywords | File | 0.83 | 0.79 | 0.68 | 0.76 | 0.86 | 0.85 |

@@ -616,10 +638,10 @@ F1 = 2 × (0.75 × 0.6) / (0.75 + 0.6) = **66.7%**, reflecting the model's overa

-| Dataset | Deepgram<br>(nova-3) | OpenAI<br>(whisper-1) | AssemblyAI | Whisper OSS<br>(large-v3-turbo) | Argmax<br>(parakeet-v3) |
-|----------------------:|:---------------------:|:----------------------:|:----------:|:----------------------:|:------------------------:|
-| earnings22-keywords<br>(chunk-keywords) | 0.93 | 0.84 | 0.82 | 0.85 | 0.92 | 0.92 |
-| earnings22-keywords<br>(file-keywords) | 0.88 | 0.81 | 0.79 | 0.86 | 0.87 | 0.86 |
+| Dataset | Keywords | Deepgram<br>(nova-3) | OpenAI<br>(whisper-1) | AssemblyAI | Whisper OSS<br>(large-v3-turbo) | Argmax<br>(Parakeet V2) | Argmax<br>(Parakeet V3) |
+|----------------------|:---------------------:|:---------------------:|:----------------------:|:----------:|:----------------------:|:------------------------:|:------------------------:|
+| earnings22-keywords | Chunk | 0.93 | 0.84 | 0.82 | 0.85 | 0.92 | 0.92 |
+| earnings22-keywords | File | 0.88 | 0.81 | 0.79 | 0.86 | 0.87 | 0.86 |

From 9eaa8a797cb23ad9c84a1442f9018dad864365d1 Mon Sep 17 00:00:00 2001 From: dberkin1 Date: Thu, 6 Nov 2025 21:34:54 +0300 Subject: [PATCH 02/13] Refactor --- BENCHMARKS.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/BENCHMARKS.md b/BENCHMARKS.md index c7b1232..bf62298 100644 --- a/BENCHMARKS.md +++ b/BENCHMARKS.md @@ -538,9 +538,9 @@ The dataset consists of trimmed audio samples extracted from 1-2 hour earnings call recordings. Custom vocabulary evaluation uses two keyword categories to assess system performance under different vocabulary conditions: -**Chunk keywords** include only the keywords that actually appear in the trimmed audio chunk being evaluated. This represents a focused vocabulary set that matches the content of the specific audio sample. +**Chunk keywords**: include only the keywords that actually appear in the trimmed audio chunk being evaluated. This represents a focused vocabulary set that matches the content of the specific audio sample. -**File keywords** include all keywords from the entire source file (1-2 hours) that the audio chunk was trimmed from. This means the system is provided with a comprehensive vocabulary list that may contain keywords not present in the actual audio chunk being transcribed. This tests a system's ability to handle extensive custom vocabularies without introducing false positives from irrelevant keywords. +**File keywords**: include all keywords from the entire source file (1-2 hours) that the audio chunk was trimmed from. This means the system is provided with a comprehensive vocabulary list that may contain keywords not present in the actual audio chunk being transcribed. This tests a system's ability to handle extensive custom vocabularies without introducing false positives from irrelevant keywords. From d3c92cd123f6b5c853468200e0b3de4c96541937 Mon Sep 17 00:00:00 2001 From: dberkin1 Date: Thu, 6 Nov 2025 22:41:28 +0300 Subject: [PATCH 03/13] Refactor --- BENCHMARKS.md | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/BENCHMARKS.md b/BENCHMARKS.md index bf62298..abdece8 100644 --- a/BENCHMARKS.md +++ b/BENCHMARKS.md @@ -559,10 +559,10 @@ The dataset consists of trimmed audio samples extracted from 1-2 hour earnings c -| Dataset | Keywords | Deepgram
(nova-3) | OpenAI
(whisper-1) | AssemblyAI | Whisper OSS
(large-v3-turbo) | Argmax
(Parakeet V2) | Argmax
(Parakeet V3) | -|----------------------|:---------------------:|:---------------------:|:----------------------:|:----------:|:----------------------:|:------------------------:|:------------------------:| -| earnings22-keywords | Chunk | 9.32 | 9.74 | 9.68 | 10 | 9.5 | 9.21 | -| earnings22-keywords | File | 9.81 | 9.57 | 9.63 | 10.3 | 10 | 9.63 | +| Dataset | Deepgram
(nova-3) | OpenAI
(whisper-1) | AssemblyAI | Whisper OSS
(large-v3-turbo) | Argmax
(Parakeet V2) | Argmax
(Parakeet V3) | +|----------------------|:---------------------:|:----------------------:|:----------:|:----------------------:|:------------------------:|:------------------------:| +| earnings22-keywords (chunk-level) | 9.32 | 9.74 | 9.68 | 10 | 9.5 | 9.21 | +| earnings22-keywords (file-level) | 9.81 | 9.57 | 9.63 | 10.3 | 10 | 9.63 |

@@ -585,10 +585,10 @@ If the model predicts 20 keywords and 15 of them match the ground truth, precisi

-| Dataset | Keywords | Deepgram<br>(nova-3) | OpenAI<br>(whisper-1) | AssemblyAI | Whisper OSS<br>(large-v3-turbo) | Argmax<br>(Parakeet V2) | Argmax<br>(Parakeet V3) |
-|----------------------|:---------------------:|:---------------------:|:----------------------:|:----------:|:----------------------:|:------------------------:|:------------------------:|
-| earnings22-keywords | Chunk |0.98 | 0.98 | 0.98 | 0.96 | 0.96 | 0.96 |
-| earnings22-keywords | File |0.94 | 0.9 | 0.94 | 0.95 | 0.87 | 0.87 |
+| Dataset | Deepgram<br>(nova-3) | OpenAI<br>(whisper-1) | AssemblyAI | Whisper OSS<br>(large-v3-turbo) | Argmax<br>(Parakeet V2) | Argmax<br>(Parakeet V3) |
+|----------------------|:---------------------:|:----------------------:|:----------:|:----------------------:|:------------------------:|:------------------------:|
+| earnings22-keywords (chunk-level) | 0.98 | 0.98 | 0.98 | 0.96 | 0.96 | 0.96 |
+| earnings22-keywords (file-level) | 0.94 | 0.9 | 0.94 | 0.95 | 0.87 | 0.87 |

@@ -611,10 +611,10 @@ If the ground-truth transcript has 25 keywords and the model correctly finds 15,

-| Dataset | Keywords | Deepgram<br>(nova-3) | OpenAI<br>(whisper-1) | AssemblyAI | Whisper OSS<br>(large-v3-turbo) | Argmax<br>(Parakeet V2) | Argmax<br>(Parakeet V3) |
-|----------------------|:---------------------:|:---------------------:|:----------------------:|:----------:|:----------------------:|:------------------------:|:------------------------:|
-| earnings22-keywords | Chunk | 0.89 | 0.69 | 0.7 | 0.81 | 0.89 | 0.88 |
-| earnings22-keywords | File | 0.83 | 0.79 | 0.68 | 0.76 | 0.86 | 0.85 |
+| Dataset | Deepgram<br>(nova-3) | OpenAI<br>(whisper-1) | AssemblyAI | Whisper OSS<br>(large-v3-turbo) | Argmax<br>(Parakeet V2) | Argmax<br>(Parakeet V3) |
+|----------------------|:---------------------:|:----------------------:|:----------:|:----------------------:|:------------------------:|:------------------------:|
+| earnings22-keywords (chunk-level) | 0.89 | 0.69 | 0.7 | 0.81 | 0.89 | 0.88 |
+| earnings22-keywords (file-level) | 0.83 | 0.79 | 0.68 | 0.76 | 0.86 | 0.85 |

@@ -638,10 +638,10 @@ F1 = 2 × (0.75 × 0.6) / (0.75 + 0.6) = **66.7%**, reflecting the model's overa

-| Dataset | Keywords | Deepgram<br>(nova-3) | OpenAI<br>(whisper-1) | AssemblyAI | Whisper OSS<br>(large-v3-turbo) | Argmax<br>(Parakeet V2) | Argmax<br>(Parakeet V3) |
-|----------------------|:---------------------:|:---------------------:|:----------------------:|:----------:|:----------------------:|:------------------------:|:------------------------:|
-| earnings22-keywords | Chunk | 0.93 | 0.84 | 0.82 | 0.85 | 0.92 | 0.92 |
-| earnings22-keywords | File | 0.88 | 0.81 | 0.79 | 0.86 | 0.87 | 0.86 |
+| Dataset | Deepgram<br>(nova-3) | OpenAI<br>(whisper-1) | AssemblyAI | Whisper OSS<br>(large-v3-turbo) | Argmax<br>(Parakeet V2) | Argmax<br>(Parakeet V3) |
+|----------------------|:---------------------:|:----------------------:|:----------:|:----------------------:|:------------------------:|:------------------------:|
+| earnings22-keywords (chunk-level) | 0.93 | 0.84 | 0.82 | 0.85 | 0.92 | 0.92 |
+| earnings22-keywords (file-level) | 0.88 | 0.81 | 0.79 | 0.86 | 0.87 | 0.86 |

From f0e2dec7ac629f9594d4b2d94bbf344c816f4a98 Mon Sep 17 00:00:00 2001 From: dberkin1 Date: Sat, 15 Nov 2025 01:41:56 +0300 Subject: [PATCH 04/13] add deepgram streaming diarization pipeline --- .../datasets/callhome_english.yaml | 6 + config/benchmark_config/metrics/cpwer.yaml | 3 + .../DeepgramStreamingDiarizationPipeline.yaml | 8 + src/openbench/dataset/dataset_aliases.py | 1 + src/openbench/dataset/dataset_registry.py | 7 +- src/openbench/metric/registry.py | 6 +- .../word_error_metrics/word_error_metrics.py | 10 +- src/openbench/pipeline/__init__.py | 1 + .../streaming_diarization/__init__.py | 13 + .../pipeline/streaming_diarization/common.py | 13 + .../streaming_diarization/deepgram.py | 316 ++++++++++++++++++ src/openbench/pipeline_prediction.py | 56 ++++ src/openbench/runner/benchmark.py | 10 +- src/openbench/types.py | 1 + 14 files changed, 445 insertions(+), 6 deletions(-) create mode 100644 config/benchmark_config/datasets/callhome_english.yaml create mode 100644 config/benchmark_config/metrics/cpwer.yaml create mode 100644 config/pipeline_configs/DeepgramStreamingDiarizationPipeline.yaml create mode 100644 src/openbench/pipeline/streaming_diarization/__init__.py create mode 100644 src/openbench/pipeline/streaming_diarization/common.py create mode 100644 src/openbench/pipeline/streaming_diarization/deepgram.py diff --git a/config/benchmark_config/datasets/callhome_english.yaml b/config/benchmark_config/datasets/callhome_english.yaml new file mode 100644 index 0000000..c6315a3 --- /dev/null +++ b/config/benchmark_config/datasets/callhome_english.yaml @@ -0,0 +1,6 @@ +callhome_english: + dataset_id: argmaxinc/callhome-english + split: test + + + diff --git a/config/benchmark_config/metrics/cpwer.yaml b/config/benchmark_config/metrics/cpwer.yaml new file mode 100644 index 0000000..c10d6c9 --- /dev/null +++ b/config/benchmark_config/metrics/cpwer.yaml @@ -0,0 +1,3 @@ +# Dummy argument to make the config file valid +cpwer: + skip_overlap: false \ No newline at end of file diff --git a/config/pipeline_configs/DeepgramStreamingDiarizationPipeline.yaml b/config/pipeline_configs/DeepgramStreamingDiarizationPipeline.yaml new file mode 100644 index 0000000..3f458e5 --- /dev/null +++ b/config/pipeline_configs/DeepgramStreamingDiarizationPipeline.yaml @@ -0,0 +1,8 @@ +DeepgramStreamingDiarizationPipeline: + config: + endpoint_url: "wss://api.deepgram.com" + sample_rate: 16000 + channels: 1 + sample_width: 2 + realtime_resolution: 0.020 + model_version: "nova-3" \ No newline at end of file diff --git a/src/openbench/dataset/dataset_aliases.py b/src/openbench/dataset/dataset_aliases.py index 14d7ac9..99bab3a 100644 --- a/src/openbench/dataset/dataset_aliases.py +++ b/src/openbench/dataset/dataset_aliases.py @@ -172,6 +172,7 @@ def register_dataset_aliases() -> None: supported_pipeline_types={ PipelineType.TRANSCRIPTION, PipelineType.ORCHESTRATION, + PipelineType.STREAMING_DIARIZATION, }, description=( "Callhome English dataset for transcription and orchestration evaluation. 
" diff --git a/src/openbench/dataset/dataset_registry.py b/src/openbench/dataset/dataset_registry.py index f73ae17..1fdfef7 100644 --- a/src/openbench/dataset/dataset_registry.py +++ b/src/openbench/dataset/dataset_registry.py @@ -137,5 +137,10 @@ def has_alias(cls, alias: str) -> bool: # Register all datasets DatasetRegistry.register(PipelineType.DIARIZATION, DiarizationDataset) DatasetRegistry.register(PipelineType.ORCHESTRATION, OrchestrationDataset) -DatasetRegistry.register(PipelineType.STREAMING_TRANSCRIPTION, StreamingDataset) +DatasetRegistry.register( + PipelineType.STREAMING_TRANSCRIPTION, StreamingDataset +) +DatasetRegistry.register( + PipelineType.STREAMING_DIARIZATION, OrchestrationDataset +) DatasetRegistry.register(PipelineType.TRANSCRIPTION, TranscriptionDataset) diff --git a/src/openbench/metric/registry.py b/src/openbench/metric/registry.py index 99e991b..d8eec6b 100644 --- a/src/openbench/metric/registry.py +++ b/src/openbench/metric/registry.py @@ -135,7 +135,11 @@ def get_available_metrics(cls, pipeline_type: PipelineType) -> list[MetricOption # Register all existing and interesting metrics from pyannote.metrics # Custom metrics will be registered in their own files -MetricRegistry.register(DiarizationErrorRate, PipelineType.DIARIZATION, MetricOptions.DER) +MetricRegistry.register( + DiarizationErrorRate, + (PipelineType.DIARIZATION, PipelineType.STREAMING_DIARIZATION), + MetricOptions.DER, +) MetricRegistry.register(JaccardErrorRate, PipelineType.DIARIZATION, MetricOptions.JER) MetricRegistry.register(DiarizationPurity, PipelineType.DIARIZATION, MetricOptions.DIARIZATION_PURITY) MetricRegistry.register( diff --git a/src/openbench/metric/word_error_metrics/word_error_metrics.py b/src/openbench/metric/word_error_metrics/word_error_metrics.py index c4e7fe2..c3719ae 100644 --- a/src/openbench/metric/word_error_metrics/word_error_metrics.py +++ b/src/openbench/metric/word_error_metrics/word_error_metrics.py @@ -90,7 +90,9 @@ def _get_word_error_metrics( return result.ops[0], (ref_words, hyp_words), (ref_speakers, hyp_speakers) -@MetricRegistry.register_metric(PipelineType.ORCHESTRATION, MetricOptions.WDER) +@MetricRegistry.register_metric( + (PipelineType.ORCHESTRATION, PipelineType.STREAMING_DIARIZATION), MetricOptions.WDER +) class WordDiarizationErrorRate(BaseWordErrorMetric): """Word Diarization Error Rate (WDER) implementation. @@ -222,6 +224,7 @@ def compute_metric(self, detail: Details) -> float: PipelineType.TRANSCRIPTION, PipelineType.ORCHESTRATION, PipelineType.STREAMING_TRANSCRIPTION, + PipelineType.STREAMING_DIARIZATION, ), MetricOptions.WER, ) @@ -296,7 +299,10 @@ def compute_metric(self, detail: Details) -> float: return (S + D + I) / N if N > 0 else 0.0 -@MetricRegistry.register_metric(PipelineType.ORCHESTRATION, MetricOptions.CPWER) +@MetricRegistry.register_metric( + (PipelineType.ORCHESTRATION, PipelineType.STREAMING_DIARIZATION), + MetricOptions.CPWER, +) class ConcatenatedMinimumPermutationWER(BaseWordErrorMetric): """Concatenated minimum-Permutation Word Error Rate (cpWER) implementation. 
diff --git a/src/openbench/pipeline/__init__.py b/src/openbench/pipeline/__init__.py index 598328b..bc405bd 100644 --- a/src/openbench/pipeline/__init__.py +++ b/src/openbench/pipeline/__init__.py @@ -6,6 +6,7 @@ from .diarization import * from .orchestration import * from .pipeline_registry import PipelineRegistry +from .streaming_diarization import * from .streaming_transcription import * from .transcription import * diff --git a/src/openbench/pipeline/streaming_diarization/__init__.py b/src/openbench/pipeline/streaming_diarization/__init__.py new file mode 100644 index 0000000..670f0b9 --- /dev/null +++ b/src/openbench/pipeline/streaming_diarization/__init__.py @@ -0,0 +1,13 @@ +# For licensing see accompanying LICENSE.md file. +# Copyright (C) 2025 Argmax, Inc. All Rights Reserved. + +from .common import StreamingDiarizationConfig, StreamingDiarizationOutput +from .deepgram import DeepgramStreamingDiarizationPipeline, DeepgramStreamingDiarizationPipelineConfig + +__all__ = [ + "StreamingDiarizationConfig", + "StreamingDiarizationOutput", + "DeepgramStreamingDiarizationPipeline", + "DeepgramStreamingDiarizationPipelineConfig", +] + diff --git a/src/openbench/pipeline/streaming_diarization/common.py b/src/openbench/pipeline/streaming_diarization/common.py new file mode 100644 index 0000000..2704d9c --- /dev/null +++ b/src/openbench/pipeline/streaming_diarization/common.py @@ -0,0 +1,13 @@ +# For licensing see accompanying LICENSE.md file. +# Copyright (C) 2025 Argmax, Inc. All Rights Reserved. + +from ...pipeline_prediction import Transcript +from ..base import PipelineConfig, PipelineOutput + + +class StreamingDiarizationConfig(PipelineConfig): + endpoint_url: str + + +class StreamingDiarizationOutput(PipelineOutput[Transcript]): ... + diff --git a/src/openbench/pipeline/streaming_diarization/deepgram.py b/src/openbench/pipeline/streaming_diarization/deepgram.py new file mode 100644 index 0000000..9bc045d --- /dev/null +++ b/src/openbench/pipeline/streaming_diarization/deepgram.py @@ -0,0 +1,316 @@ +# For licensing see accompanying LICENSE.md file. +# Copyright (C) 2025 Argmax, Inc. All Rights Reserved. + +import asyncio +import json +import os + +import numpy as np +import websockets +from argmaxtools.utils import get_logger +from pyannote.core import Segment +from pydantic import Field + +from openbench.dataset import OrchestrationSample + +from ...pipeline import Pipeline, register_pipeline +from ...pipeline_prediction import ( + DiarizationAnnotation, + StreamingDiarization, + Transcript, + Word, +) +from ...types import PipelineType +from .common import StreamingDiarizationConfig, StreamingDiarizationOutput + + +logger = get_logger(__name__) + +# Some parts of this code are adapted from the Deepgram streaming example at: +# https://developers.deepgram.com/docs/measuring-streaming-latency + + +class DeepgramStreamingDiarizationApi: + def __init__(self, cfg) -> None: + self.realtime_resolution = 0.020 + self.model_version = "nova-3" + self.api_key = "38c68bfb62405f1ae17a840777ef531060018c3d" + assert ( + self.api_key is not None + ), "Please set DEEPGRAM_API_KEY in environment" + self.channels = cfg.channels + self.sample_width = cfg.sample_width + self.sample_rate = cfg.sample_rate + self.host_url = os.getenv( + "DEEPGRAM_HOST_URL", "wss://api.deepgram.com" + ) + + async def run(self, data, key, channels, sample_width, sample_rate): + """Connect to Deepgram real-time endpoint with diarization. + + This streams audio data in real-time and collects diarization results. 
+ """ + # How many bytes are contained in one second of audio. + byte_rate = sample_width * sample_rate * channels + + # Variables for collecting results + audio_cursor = 0.0 + audio_cursor_l = [] + interim_annotations_l = [] + confirmed_audio_cursor_l = [] + confirmed_interim_annotations_l = [] + final_annotation = DiarizationAnnotation() + transcript_text = "" + words_with_speakers = [] + + # Connect to the real-time streaming endpoint with diarization + url = ( + f"{self.host_url}/v1/listen?" + f"model={self.model_version}&" + f"channels={channels}&" + f"sample_rate={sample_rate}&" + f"encoding=linear16&" + f"interim_results=true&" + f"diarize=true" + ) + async with websockets.connect( + url, + additional_headers={ + "Authorization": "Token {}".format(key), + }, + ) as ws: + + async def sender(ws): + """Sends the data, mimicking real-time connection.""" + nonlocal data, audio_cursor + try: + while len(data): + # Bytes in `REALTIME_RESOLUTION` seconds + i = int(byte_rate * self.realtime_resolution) + chunk, data = data[:i], data[i:] + # Send the data + await ws.send(chunk) + # Move the audio cursor + audio_cursor += self.realtime_resolution + # Mimic real-time by waiting + await asyncio.sleep(self.realtime_resolution) + + # A CloseStream message tells Deepgram that no more audio + # will be sent. Deepgram will close the connection once all + # audio has finished processing. + await ws.send(json.dumps({"type": "CloseStream"})) + except Exception as e: + logger.error(f"Error while sending: {e}") + raise + + async def receiver(ws): + """Collect diarization results from the server.""" + nonlocal audio_cursor + nonlocal interim_annotations_l + nonlocal audio_cursor_l + nonlocal confirmed_interim_annotations_l + nonlocal confirmed_audio_cursor_l + nonlocal final_annotation + nonlocal transcript_text + nonlocal words_with_speakers + + async for msg in ws: + msg = json.loads(msg) + + if "request_id" in msg: + # This is the final metadata message + continue + + # Process words with speaker information + if "channel" in msg and "alternatives" in msg["channel"]: + alternatives = msg["channel"]["alternatives"] + if ( + len(alternatives) > 0 + and "words" in alternatives[0] + ): + words = alternatives[0]["words"] + + # Create annotation from words + annotation = DiarizationAnnotation() + for word_info in words: + if ( + "speaker" in word_info + and "start" in word_info + and "end" in word_info + ): + speaker = word_info["speaker"] + start = word_info["start"] + end = word_info["end"] + segment = Segment(start, end) + annotation[segment] = ( + f"SPEAKER_{speaker}" + ) + + if len(annotation) > 0: + if not msg.get("is_final", False): + # Interim result + audio_cursor_l.append(audio_cursor) + interim_annotations_l.append( + annotation + ) + logger.debug( + f"Interim annotation with " + f"{len(annotation)} segments" + ) + else: + # Confirmed/final result + confirmed_audio_cursor_l.append( + audio_cursor + ) + confirmed_interim_annotations_l.append( + annotation + ) + + # Merge into final annotation + for ( + segment, + _, + speaker, + ) in annotation.itertracks( + yield_label=True + ): + final_annotation[segment] = speaker + + # Collect final transcript and words + for word_info in words: + if ( + "word" in word_info + and "speaker" in word_info + ): + speaker_label = ( + f"SPEAKER_" + f"{word_info['speaker']}" + ) + words_with_speakers.append({ + "word": word_info["word"], + "speaker": speaker_label, + "start": ( + word_info.get("start", 0) + ), + "end": ( + word_info.get("end", 0) + ), + }) + + # Build 
full transcript with tags + if words_with_speakers: + current_speaker = None + transcript_parts = [] + for w in words_with_speakers: + spk = w["speaker"] + if spk != current_speaker: + if ( + current_speaker + is not None + ): + transcript_parts.append( + "" + ) + transcript_parts.append( + f"[{spk}]" + ) + current_speaker = spk + transcript_parts.append( + w["word"] + ) + transcript_text = " ".join( + transcript_parts + ) + + logger.debug( + f"Confirmed annotation with " + f"{len(annotation)} segments" + ) + + await asyncio.wait([ + asyncio.ensure_future(sender(ws)), + asyncio.ensure_future(receiver(ws)) + ]) + + return ( + final_annotation, + interim_annotations_l, + audio_cursor_l, + confirmed_interim_annotations_l, + confirmed_audio_cursor_l, + transcript_text, + words_with_speakers, + ) + + def __call__(self, sample): + # Sample must be in bytes + ( + final_annotation, + interim_annotations, + audio_cursor_l, + confirmed_interim_annotations, + confirmed_audio_cursor_l, + transcript_text, + words_with_speakers, + ) = asyncio.get_event_loop().run_until_complete( + self.run( + sample, + self.api_key, + self.channels, + self.sample_width, + self.sample_rate, + ) + ) + + return { + "annotation": final_annotation, + "interim_annotations": interim_annotations, + "audio_cursor": audio_cursor_l, + "confirmed_interim_annotations": confirmed_interim_annotations, + "confirmed_audio_cursor": confirmed_audio_cursor_l, + "transcript_text": transcript_text, + "words": words_with_speakers, + } + + +class DeepgramStreamingDiarizationPipelineConfig(StreamingDiarizationConfig): + sample_rate: int + channels: int + sample_width: int + realtime_resolution: float + model_version: str = Field( + default="nova-3", + description="The model to use for real-time diarization" + ) + + +@register_pipeline +class DeepgramStreamingDiarizationPipeline(Pipeline): + _config_class = DeepgramStreamingDiarizationPipelineConfig + pipeline_type = PipelineType.STREAMING_DIARIZATION + + def parse_input(self, input_sample: OrchestrationSample): + y = input_sample.waveform + y_int16 = (y * 32767).astype(np.int16) + audio_data_byte = y_int16.T.tobytes() + return audio_data_byte + + def parse_output(self, output) -> StreamingDiarizationOutput: + # Create Transcript from words with speakers + # For cpWER/WDER, we return transcript as the main prediction + words = [ + Word( + word=w["word"], + start=w.get("start"), + end=w.get("end"), + speaker=w.get("speaker"), + ) + for w in output["words"] + ] + transcript = Transcript(words=words) + + return StreamingDiarizationOutput(prediction=transcript) + + def build_pipeline(self): + pipeline = DeepgramStreamingDiarizationApi(self.config) + return pipeline diff --git a/src/openbench/pipeline_prediction.py b/src/openbench/pipeline_prediction.py index 030d6d5..fc9ddb0 100644 --- a/src/openbench/pipeline_prediction.py +++ b/src/openbench/pipeline_prediction.py @@ -274,3 +274,59 @@ def to_annotation_file(self, output_dir: str, filename: str) -> str: json.dump(data, f, indent=2) return path + + +# Streaming Diarization Prediction +class StreamingDiarization(BaseModel): + """Streaming diarization output combining real-time and diarization.""" + + annotation: DiarizationAnnotation = Field( + ..., description="The final diarization annotation" + ) + transcript: "Transcript | None" = Field( + None, description="The transcript with speaker information" + ) + audio_cursor: list[float] | None = Field( + None, description="The audio cursor in seconds for interim results" + ) + interim_annotations: 
list[DiarizationAnnotation] | None = Field( + None, description="Interim diarization annotations" + ) + confirmed_audio_cursor: list[float] | None = Field( + None, description="The confirmed audio cursor in seconds" + ) + confirmed_interim_annotations: list[DiarizationAnnotation] | None = Field( + None, description="The confirmed interim diarization annotations" + ) + + class Config: + arbitrary_types_allowed = True + + def to_annotation_file(self, output_dir: str, filename: str) -> str: + """Save both the final annotation and streaming metadata.""" + # Save final annotation as RTTM + rttm_path = os.path.join(output_dir, f"{filename}.rttm") + with open(rttm_path, "w") as f: + self.annotation.write_rttm(f) + + # Save streaming metadata as JSON + json_path = os.path.join(output_dir, f"{filename}_streaming.json") + data = { + "audio_cursor": self.audio_cursor, + "confirmed_audio_cursor": self.confirmed_audio_cursor, + "num_interim_annotations": ( + len(self.interim_annotations) + if self.interim_annotations + else 0 + ), + "num_confirmed_interim_annotations": ( + len(self.confirmed_interim_annotations) + if self.confirmed_interim_annotations + else 0 + ), + } + + with open(json_path, "w") as f: + json.dump(data, f, indent=2) + + return rttm_path diff --git a/src/openbench/runner/benchmark.py b/src/openbench/runner/benchmark.py index e2c055f..c2d0f3f 100644 --- a/src/openbench/runner/benchmark.py +++ b/src/openbench/runner/benchmark.py @@ -33,6 +33,7 @@ PipelineType.TRANSCRIPTION: TranscriptionSampleResult, PipelineType.ORCHESTRATION: TranscriptionSampleResult, PipelineType.STREAMING_TRANSCRIPTION: TranscriptionSampleResult, + PipelineType.STREAMING_DIARIZATION: TranscriptionSampleResult, } @@ -64,6 +65,7 @@ def __init__(self, config: BenchmarkConfig, pipelines: list[Pipeline]): PipelineType.TRANSCRIPTION: TranscriptionWandbLogger, PipelineType.ORCHESTRATION: TranscriptionWandbLogger, PipelineType.STREAMING_TRANSCRIPTION: TranscriptionWandbLogger, + PipelineType.STREAMING_DIARIZATION: TranscriptionWandbLogger, } def _get_metrics(self, pipeline: Pipeline) -> dict[str, BaseMetric]: @@ -107,8 +109,12 @@ def _process_single_sample( ) if pipeline.pipeline_type == PipelineType.DIARIZATION: - sample_results_attributes["num_speakers_predicted"] = output.prediction.num_speakers - sample_results_attributes["num_speakers_reference"] = sample.reference.num_speakers + sample_results_attributes["num_speakers_predicted"] = ( + output.prediction.num_speakers + ) + sample_results_attributes["num_speakers_reference"] = ( + sample.reference.num_speakers + ) sample_result = sample_result_class(**sample_results_attributes) diff --git a/src/openbench/types.py b/src/openbench/types.py index b4bbeaa..8fa1398 100644 --- a/src/openbench/types.py +++ b/src/openbench/types.py @@ -12,6 +12,7 @@ class PipelineType(Enum): TRANSCRIPTION = "transcription" ORCHESTRATION = "orchestration" STREAMING_TRANSCRIPTION = "streaming_transcription" + STREAMING_DIARIZATION = "streaming_diarization" # All prediction classes that we output should conform to this From 8037e294e365923fa231e0afb0900ca90ce9176c Mon Sep 17 00:00:00 2001 From: dberkin1 Date: Sat, 15 Nov 2025 01:45:42 +0300 Subject: [PATCH 05/13] update benchmarks --- BENCHMARKS.md | 61 ++++++++++++++++----------------------------------- 1 file changed, 19 insertions(+), 42 deletions(-) diff --git a/BENCHMARKS.md b/BENCHMARKS.md index abdece8..1866ec8 100644 --- a/BENCHMARKS.md +++ b/BENCHMARKS.md @@ -479,8 +479,8 @@ ### OpenAI - **Latest Run:** `2025-11-06` -- **Model Version:** 
`whisper-1` -- **Configuration:** OpenAI's Whisper API for transcription with prompt-based keyword boosting. See [OpenAI Whisper API](https://platform.openai.com/docs/guides/speech-to-text) for more details. +- **Model Version:** `whisper-1` (also known as `large-v2`) +- **Configuration:** OpenAI's Whisper API for transcription with keyword prompting. See [OpenAI Speech to Text: Prompting](https://platform.openai.com/docs/guides/speech-to-text#prompting) for more details. - **Code Reference:** [openbench/pipeline/transcription/transcription_openai.py](https://github.com/argmaxinc/OpenBench/blob/main/src/openbench/pipeline/transcription/transcription_openai.py) - **Hardware**: Unknown (Cloud API) @@ -498,14 +498,7 @@ - **Code Reference:** [openbench/pipeline/transcription/transcription_oss_whisper.py](https://github.com/argmaxinc/OpenBench/blob/main/src/openbench/pipeline/transcription/transcription_oss_whisper.py) - **Hardware**: M2 Ultra Mac Studio -### Argmax (Parakeet V2) -- **Latest Run:** `2025-11-06` -- **Model Version:** `parakeet-v2` -- **Configuration:** Argmax WhisperKit Pro with compressed Parakeet V2 model (i.e. `parakeet-v2_476MB`) with custom vocabulary support. -- **Code Reference:** [openbench/pipeline/transcription/transcription_whisperkitpro.py](https://github.com/argmaxinc/OpenBench/blob/main/src/openbench/pipeline/transcription/transcription_whisperkitpro.py) -- **Hardware**: M2 Ultra Mac Studio - -### Argmax (Parakeet V3) +### Argmax - **Latest Run:** `2025-11-06` - **Model Version:** `parakeet-v3` - **Configuration:** Argmax SDK WhisperKit Pro framework with compressed Parakeet V3 model (i.e. `parakeet-v3_494MB`) and Custom Vocabulary feature enabled. See [Argmax Custom Vocabulary](https://app.argmaxinc.com/docs/examples/custom-vocabulary) for more details. @@ -531,21 +524,6 @@
-## Keywords - -
-Click to expand - -The dataset consists of trimmed audio samples extracted from 1-2 hour earnings call recordings. Custom vocabulary evaluation uses two keyword categories to assess system performance under different vocabulary conditions: - -**Chunk keywords**: include only the keywords that actually appear in the trimmed audio chunk being evaluated. This represents a focused vocabulary set that matches the content of the specific audio sample. - -**File keywords**: include all keywords from the entire source file (1-2 hours) that the audio chunk was trimmed from. This means the system is provided with a comprehensive vocabulary list that may contain keywords not present in the actual audio chunk being transcribed. This tests a system's ability to handle extensive custom vocabularies without introducing false positives from irrelevant keywords. - -
- -
- ## Word Error Rate (WER)
@@ -559,10 +537,10 @@ The dataset consists of trimmed audio samples extracted from 1-2 hour earnings c
-| Dataset | Deepgram<br>(nova-3) | OpenAI<br>(whisper-1) | AssemblyAI | Whisper OSS<br>(large-v3-turbo) | Argmax<br>(Parakeet V2) | Argmax<br>(Parakeet V3) |
-|----------------------|:---------------------:|:----------------------:|:----------:|:----------------------:|:------------------------:|:------------------------:|
-| earnings22-keywords (chunk-level) | 9.32 | 9.74 | 9.68 | 10 | 9.5 | 9.21 |
-| earnings22-keywords (file-level) | 9.81 | 9.57 | 9.63 | 10.3 | 10 | 9.63 |
+| Dataset | Deepgram<br>(nova-3) | OpenAI<br>(whisper-1) | AssemblyAI | Whisper OSS<br>(large-v3-turbo) | Argmax<br>(parakeet-v3) |
+|----------------------:|:---------------------:|:----------------------:|:----------:|:----------------------:|:------------------------:|
+| earnings22-keywords<br>(chunk-keywords) | 9.32 | 9.74 | 9.68 | 10 | 9.21 |
+| earnings22-keywords<br>(file-keywords) | 9.81 | 9.57 | 9.63 | 10.3 | 9.63 |

@@ -585,10 +563,10 @@ If the model predicts 20 keywords and 15 of them match the ground truth, precisi

-| Dataset | Deepgram<br>(nova-3) | OpenAI<br>(whisper-1) | AssemblyAI | Whisper OSS<br>(large-v3-turbo) | Argmax<br>(Parakeet V2) | Argmax<br>(Parakeet V3) |
-|----------------------|:---------------------:|:----------------------:|:----------:|:----------------------:|:------------------------:|:------------------------:|
-| earnings22-keywords (chunk-level) | 0.98 | 0.98 | 0.98 | 0.96 | 0.96 | 0.96 |
-| earnings22-keywords (file-level) | 0.94 | 0.9 | 0.94 | 0.95 | 0.87 | 0.87 |
+| Dataset | Deepgram<br>(nova-3) | OpenAI<br>(whisper-1) | AssemblyAI | Whisper OSS<br>(large-v3-turbo) | Argmax<br>(parakeet-v3) |
+|----------------------|:---------------------:|:----------------------:|:----------:|:----------------------:|:------------------------:|
+| earnings22-keywords<br>(chunk-keywords) |0.98 | 0.98 | 0.98 | 0.96 | 0.96 |
+| earnings22-keywords<br>(file-keywords) |0.94 | 0.9 | 0.94 | 0.95 | 0.87 |

@@ -611,10 +589,10 @@ If the ground-truth transcript has 25 keywords and the model correctly finds 15,

-| Dataset | Deepgram<br>(nova-3) | OpenAI<br>(whisper-1) | AssemblyAI | Whisper OSS<br>(large-v3-turbo) | Argmax<br>(Parakeet V2) | Argmax<br>(Parakeet V3) |
-|----------------------|:---------------------:|:----------------------:|:----------:|:----------------------:|:------------------------:|:------------------------:|
-| earnings22-keywords (chunk-level) | 0.89 | 0.69 | 0.7 | 0.81 | 0.89 | 0.88 |
-| earnings22-keywords (file-level) | 0.83 | 0.79 | 0.68 | 0.76 | 0.86 | 0.85 |
+| Dataset | Deepgram<br>(nova-3) | OpenAI<br>(whisper-1) | AssemblyAI | Whisper OSS<br>(large-v3-turbo) | Argmax<br>(parakeet-v3) |
+|----------------------|:---------------------:|:----------------------:|:----------:|:----------------------:|:------------------------:|
+| earnings22-keywords<br>(chunk-keywords) | 0.89 | 0.69 | 0.7 | 0.81 | 0.88 |
+| earnings22-keywords<br>(file-keywords) | 0.83 | 0.79 | 0.68 | 0.76 | 0.85 |

@@ -638,10 +616,9 @@ F1 = 2 × (0.75 × 0.6) / (0.75 + 0.6) = **66.7%**, reflecting the model's overa -| Dataset | Deepgram
(nova-3) | OpenAI
(whisper-1) | AssemblyAI | Whisper OSS
(large-v3-turbo) | Argmax
(Parakeet V2) | Argmax
(Parakeet V3) | -|----------------------|:---------------------:|:----------------------:|:----------:|:----------------------:|:------------------------:|:------------------------:| -| earnings22-keywords (chunk-level) | 0.93 | 0.84 | 0.82 | 0.85 | 0.92 | 0.92 | -| earnings22-keywords (file-level) | 0.88 | 0.81 | 0.79 | 0.86 | 0.87 | 0.86 | +| Dataset | Deepgram
(nova-3) | OpenAI
(whisper-1) | AssemblyAI | Whisper OSS
(large-v3-turbo) | Argmax
(parakeet-v3) | +|----------------------:|:---------------------:|:----------------------:|:----------:|:----------------------:|:------------------------:| +| earnings22-keywords
(chunk-keywords) | 0.93 | 0.84 | 0.82 | 0.85 | 0.92 | 0.92 | +| earnings22-keywords
(file-keywords) | 0.88 | 0.81 | 0.79 | 0.86 | 0.87 | 0.86 |

- From adc13feb35a7ea0209674c0ee621b79776650ea4 Mon Sep 17 00:00:00 2001 From: dberkin1 Date: Sat, 15 Nov 2025 01:46:22 +0300 Subject: [PATCH 06/13] Refactor --- BENCHMARKS.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/BENCHMARKS.md b/BENCHMARKS.md index 1866ec8..de5aedf 100644 --- a/BENCHMARKS.md +++ b/BENCHMARKS.md @@ -621,4 +621,4 @@ F1 = 2 × (0.75 × 0.6) / (0.75 + 0.6) = **66.7%**, reflecting the model's overa | earnings22-keywords
(chunk-keywords) | 0.93 | 0.84 | 0.82 | 0.85 | 0.92 | 0.92 | | earnings22-keywords
(file-keywords) | 0.88 | 0.81 | 0.79 | 0.86 | 0.87 | 0.86 | -

+

\ No newline at end of file From 3bda1d5204a27af0f54695c8002b6376597c15b9 Mon Sep 17 00:00:00 2001 From: dberkin1 Date: Sat, 15 Nov 2025 01:46:56 +0300 Subject: [PATCH 07/13] refactor --- BENCHMARKS.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/BENCHMARKS.md b/BENCHMARKS.md index de5aedf..1866ec8 100644 --- a/BENCHMARKS.md +++ b/BENCHMARKS.md @@ -621,4 +621,4 @@ F1 = 2 × (0.75 × 0.6) / (0.75 + 0.6) = **66.7%**, reflecting the model's overa | earnings22-keywords
(chunk-keywords) | 0.93 | 0.84 | 0.82 | 0.85 | 0.92 | 0.92 | | earnings22-keywords
(file-keywords) | 0.88 | 0.81 | 0.79 | 0.86 | 0.87 | 0.86 | -

\ No newline at end of file +

From e9b314cf14508ee98b931c6c48538e2b6d74b9d9 Mon Sep 17 00:00:00 2001 From: dberkin1 Date: Sat, 15 Nov 2025 01:47:18 +0300 Subject: [PATCH 08/13] . --- BENCHMARKS.md | 1 + 1 file changed, 1 insertion(+) diff --git a/BENCHMARKS.md b/BENCHMARKS.md index 1866ec8..58e69b0 100644 --- a/BENCHMARKS.md +++ b/BENCHMARKS.md @@ -622,3 +622,4 @@ F1 = 2 × (0.75 × 0.6) / (0.75 + 0.6) = **66.7%**, reflecting the model's overa | earnings22-keywords
(file-keywords) | 0.88 | 0.81 | 0.79 | 0.86 | 0.87 | 0.86 |

+ From 1dbc5fe91bf38128428e38a1e8f277a9f79af55b Mon Sep 17 00:00:00 2001 From: dberkin1 Date: Sat, 15 Nov 2025 01:51:06 +0300 Subject: [PATCH 09/13] fix api key --- src/openbench/pipeline/streaming_diarization/deepgram.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/openbench/pipeline/streaming_diarization/deepgram.py b/src/openbench/pipeline/streaming_diarization/deepgram.py index 9bc045d..12e41fd 100644 --- a/src/openbench/pipeline/streaming_diarization/deepgram.py +++ b/src/openbench/pipeline/streaming_diarization/deepgram.py @@ -34,7 +34,7 @@ class DeepgramStreamingDiarizationApi: def __init__(self, cfg) -> None: self.realtime_resolution = 0.020 self.model_version = "nova-3" - self.api_key = "38c68bfb62405f1ae17a840777ef531060018c3d" + self.api_key = os.getenv("DEEPGRAM_API_KEY") assert ( self.api_key is not None ), "Please set DEEPGRAM_API_KEY in environment" From 28da8ec581cf00cfbe303541fc292c5cb64ee55a Mon Sep 17 00:00:00 2001 From: dberkin1 Date: Wed, 19 Nov 2025 19:12:08 +0300 Subject: [PATCH 10/13] Add Deepgram streaming orchestration pipeline --- .../DeepgramStreamingDiarizationPipeline.yaml | 8 - ...eepgramStreamingOrchestrationPipeline.yaml | 9 + .../pipeline/orchestration/__init__.py | 6 + .../orchestration_deepgram_streaming.py | 113 +++++++ src/openbench/pipeline/pipeline_aliases.py | 17 + .../streaming_diarization/__init__.py | 13 - .../pipeline/streaming_diarization/common.py | 13 - .../streaming_diarization/deepgram.py | 316 ------------------ .../streaming_transcription/deepgram.py | 155 +++++++-- 9 files changed, 263 insertions(+), 387 deletions(-) delete mode 100644 config/pipeline_configs/DeepgramStreamingDiarizationPipeline.yaml create mode 100644 config/pipeline_configs/DeepgramStreamingOrchestrationPipeline.yaml create mode 100644 src/openbench/pipeline/orchestration/orchestration_deepgram_streaming.py delete mode 100644 src/openbench/pipeline/streaming_diarization/__init__.py delete mode 100644 src/openbench/pipeline/streaming_diarization/common.py delete mode 100644 src/openbench/pipeline/streaming_diarization/deepgram.py diff --git a/config/pipeline_configs/DeepgramStreamingDiarizationPipeline.yaml b/config/pipeline_configs/DeepgramStreamingDiarizationPipeline.yaml deleted file mode 100644 index 3f458e5..0000000 --- a/config/pipeline_configs/DeepgramStreamingDiarizationPipeline.yaml +++ /dev/null @@ -1,8 +0,0 @@ -DeepgramStreamingDiarizationPipeline: - config: - endpoint_url: "wss://api.deepgram.com" - sample_rate: 16000 - channels: 1 - sample_width: 2 - realtime_resolution: 0.020 - model_version: "nova-3" \ No newline at end of file diff --git a/config/pipeline_configs/DeepgramStreamingOrchestrationPipeline.yaml b/config/pipeline_configs/DeepgramStreamingOrchestrationPipeline.yaml new file mode 100644 index 0000000..6d3eae2 --- /dev/null +++ b/config/pipeline_configs/DeepgramStreamingOrchestrationPipeline.yaml @@ -0,0 +1,9 @@ +DeepgramStreamingOrchestrationPipeline: + pipeline_config: + sample_rate: 16000 + channels: 1 + sample_width: 2 + realtime_resolution: 0.020 + model_version: "nova-3" + enable_diarization: true + diff --git a/src/openbench/pipeline/orchestration/__init__.py b/src/openbench/pipeline/orchestration/__init__.py index 5298603..c450180 100644 --- a/src/openbench/pipeline/orchestration/__init__.py +++ b/src/openbench/pipeline/orchestration/__init__.py @@ -2,6 +2,10 @@ # Copyright (C) 2025 Argmax, Inc. All Rights Reserved. 
from .orchestration_deepgram import DeepgramOrchestrationPipeline, DeepgramOrchestrationPipelineConfig +from .orchestration_deepgram_streaming import ( + DeepgramStreamingOrchestrationPipeline, + DeepgramStreamingOrchestrationPipelineConfig, +) from .orchestration_openai import OpenAIOrchestrationPipeline, OpenAIOrchestrationPipelineConfig from .orchestration_whisperkitpro import WhisperKitProOrchestrationConfig, WhisperKitProOrchestrationPipeline from .whisperx import WhisperXPipeline, WhisperXPipelineConfig @@ -10,6 +14,8 @@ __all__ = [ "DeepgramOrchestrationPipeline", "DeepgramOrchestrationPipelineConfig", + "DeepgramStreamingOrchestrationPipeline", + "DeepgramStreamingOrchestrationPipelineConfig", "WhisperXPipeline", "WhisperXPipelineConfig", "WhisperKitProOrchestrationPipeline", diff --git a/src/openbench/pipeline/orchestration/orchestration_deepgram_streaming.py b/src/openbench/pipeline/orchestration/orchestration_deepgram_streaming.py new file mode 100644 index 0000000..4ab01d8 --- /dev/null +++ b/src/openbench/pipeline/orchestration/orchestration_deepgram_streaming.py @@ -0,0 +1,113 @@ +# For licensing see accompanying LICENSE.md file. +# Copyright (C) 2025 Argmax, Inc. All Rights Reserved. + +import numpy as np +from pydantic import Field + +from ...dataset import OrchestrationSample +from ...pipeline import Pipeline, PipelineConfig, register_pipeline +from ...pipeline_prediction import Transcript, Word +from ...types import PipelineType +from ..streaming_transcription.deepgram import DeepgramApi +from .common import OrchestrationOutput + + +class DeepgramStreamingOrchestrationPipelineConfig(PipelineConfig): + sample_rate: int = Field( + default=16000, + description="Sample rate of the audio" + ) + channels: int = Field( + default=1, + description="Number of audio channels" + ) + sample_width: int = Field( + default=2, + description="Sample width in bytes" + ) + realtime_resolution: float = Field( + default=0.020, + description="Real-time resolution for streaming" + ) + model_version: str = Field( + default="nova-3", + description=( + "The model to use for real-time transcription " + "with diarization" + ) + ) + enable_diarization: bool = Field( + default=True, + description="Whether to enable speaker diarization" + ) + + +@register_pipeline +class DeepgramStreamingOrchestrationPipeline(Pipeline): + _config_class = DeepgramStreamingOrchestrationPipelineConfig + pipeline_type = PipelineType.ORCHESTRATION + + def build_pipeline(self): + """Build Deepgram streaming API with diarization enabled.""" + # Create a modified config for the streaming API + from types import SimpleNamespace + + api_config = SimpleNamespace( + channels=self.config.channels, + sample_width=self.config.sample_width, + sample_rate=self.config.sample_rate, + realtime_resolution=self.config.realtime_resolution, + model_version=self.config.model_version, + enable_diarization=self.config.enable_diarization, + ) + + pipeline = DeepgramApi(api_config) + return pipeline + + def parse_input(self, input_sample: OrchestrationSample): + """Convert audio waveform to bytes for streaming.""" + y = input_sample.waveform + y_int16 = (y * 32767).astype(np.int16) + audio_data_byte = y_int16.T.tobytes() + return audio_data_byte + + def parse_output(self, output) -> OrchestrationOutput: + """Parse output to extract transcription and diarization.""" + # Extract words with speaker info if diarization enabled + words = [] + + if ( + "words_with_speakers" in output and + output["words_with_speakers"] + ): + # This comes from 
diarization-enabled streaming + for word_info in output["words_with_speakers"]: + words.append(Word( + word=word_info.get("word", ""), + start=word_info.get("start"), + end=word_info.get("end"), + speaker=word_info.get("speaker"), + )) + elif ( + "model_timestamps_confirmed" in output and + output["model_timestamps_confirmed"] + ): + # Fallback to regular transcription without speaker + for timestamp_group in output["model_timestamps_confirmed"]: + for word_info in timestamp_group: + if "word" in word_info: + words.append(Word( + word=word_info.get("word", ""), + start=word_info.get("start"), + end=word_info.get("end"), + speaker=None, + )) + + # Create final transcript with speaker-attributed words + transcript = Transcript(words=words) + + return OrchestrationOutput( + prediction=transcript, + transcription_output=None, + diarization_output=None, + ) diff --git a/src/openbench/pipeline/pipeline_aliases.py b/src/openbench/pipeline/pipeline_aliases.py index b02f759..10c22c5 100644 --- a/src/openbench/pipeline/pipeline_aliases.py +++ b/src/openbench/pipeline/pipeline_aliases.py @@ -16,6 +16,7 @@ ) from .orchestration import ( DeepgramOrchestrationPipeline, + DeepgramStreamingOrchestrationPipeline, OpenAIOrchestrationPipeline, WhisperKitProOrchestrationPipeline, WhisperXPipeline, @@ -170,6 +171,22 @@ def register_pipeline_aliases() -> None: description="Deepgram orchestration pipeline. Requires API key from https://www.deepgram.com/. Set `DEEPGRAM_API_KEY` env var.", ) + PipelineRegistry.register_alias( + "deepgram-streaming-orchestration", + DeepgramStreamingOrchestrationPipeline, + default_config={ + "sample_rate": 16000, + "channels": 1, + "sample_width": 2, + "realtime_resolution": 0.020, + "model_version": "nova-3", + "enable_diarization": True, + }, + description=( + "Deepgram streaming orchestration pipeline with diarization enabled." + ), + ) + PipelineRegistry.register_alias( "whisperkitpro-orchestration-tiny", WhisperKitProOrchestrationPipeline, diff --git a/src/openbench/pipeline/streaming_diarization/__init__.py b/src/openbench/pipeline/streaming_diarization/__init__.py deleted file mode 100644 index 670f0b9..0000000 --- a/src/openbench/pipeline/streaming_diarization/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -# For licensing see accompanying LICENSE.md file. -# Copyright (C) 2025 Argmax, Inc. All Rights Reserved. - -from .common import StreamingDiarizationConfig, StreamingDiarizationOutput -from .deepgram import DeepgramStreamingDiarizationPipeline, DeepgramStreamingDiarizationPipelineConfig - -__all__ = [ - "StreamingDiarizationConfig", - "StreamingDiarizationOutput", - "DeepgramStreamingDiarizationPipeline", - "DeepgramStreamingDiarizationPipelineConfig", -] - diff --git a/src/openbench/pipeline/streaming_diarization/common.py b/src/openbench/pipeline/streaming_diarization/common.py deleted file mode 100644 index 2704d9c..0000000 --- a/src/openbench/pipeline/streaming_diarization/common.py +++ /dev/null @@ -1,13 +0,0 @@ -# For licensing see accompanying LICENSE.md file. -# Copyright (C) 2025 Argmax, Inc. All Rights Reserved. - -from ...pipeline_prediction import Transcript -from ..base import PipelineConfig, PipelineOutput - - -class StreamingDiarizationConfig(PipelineConfig): - endpoint_url: str - - -class StreamingDiarizationOutput(PipelineOutput[Transcript]): ... 
- diff --git a/src/openbench/pipeline/streaming_diarization/deepgram.py b/src/openbench/pipeline/streaming_diarization/deepgram.py deleted file mode 100644 index 12e41fd..0000000 --- a/src/openbench/pipeline/streaming_diarization/deepgram.py +++ /dev/null @@ -1,316 +0,0 @@ -# For licensing see accompanying LICENSE.md file. -# Copyright (C) 2025 Argmax, Inc. All Rights Reserved. - -import asyncio -import json -import os - -import numpy as np -import websockets -from argmaxtools.utils import get_logger -from pyannote.core import Segment -from pydantic import Field - -from openbench.dataset import OrchestrationSample - -from ...pipeline import Pipeline, register_pipeline -from ...pipeline_prediction import ( - DiarizationAnnotation, - StreamingDiarization, - Transcript, - Word, -) -from ...types import PipelineType -from .common import StreamingDiarizationConfig, StreamingDiarizationOutput - - -logger = get_logger(__name__) - -# Some parts of this code are adapted from the Deepgram streaming example at: -# https://developers.deepgram.com/docs/measuring-streaming-latency - - -class DeepgramStreamingDiarizationApi: - def __init__(self, cfg) -> None: - self.realtime_resolution = 0.020 - self.model_version = "nova-3" - self.api_key = os.getenv("DEEPGRAM_API_KEY") - assert ( - self.api_key is not None - ), "Please set DEEPGRAM_API_KEY in environment" - self.channels = cfg.channels - self.sample_width = cfg.sample_width - self.sample_rate = cfg.sample_rate - self.host_url = os.getenv( - "DEEPGRAM_HOST_URL", "wss://api.deepgram.com" - ) - - async def run(self, data, key, channels, sample_width, sample_rate): - """Connect to Deepgram real-time endpoint with diarization. - - This streams audio data in real-time and collects diarization results. - """ - # How many bytes are contained in one second of audio. - byte_rate = sample_width * sample_rate * channels - - # Variables for collecting results - audio_cursor = 0.0 - audio_cursor_l = [] - interim_annotations_l = [] - confirmed_audio_cursor_l = [] - confirmed_interim_annotations_l = [] - final_annotation = DiarizationAnnotation() - transcript_text = "" - words_with_speakers = [] - - # Connect to the real-time streaming endpoint with diarization - url = ( - f"{self.host_url}/v1/listen?" - f"model={self.model_version}&" - f"channels={channels}&" - f"sample_rate={sample_rate}&" - f"encoding=linear16&" - f"interim_results=true&" - f"diarize=true" - ) - async with websockets.connect( - url, - additional_headers={ - "Authorization": "Token {}".format(key), - }, - ) as ws: - - async def sender(ws): - """Sends the data, mimicking real-time connection.""" - nonlocal data, audio_cursor - try: - while len(data): - # Bytes in `REALTIME_RESOLUTION` seconds - i = int(byte_rate * self.realtime_resolution) - chunk, data = data[:i], data[i:] - # Send the data - await ws.send(chunk) - # Move the audio cursor - audio_cursor += self.realtime_resolution - # Mimic real-time by waiting - await asyncio.sleep(self.realtime_resolution) - - # A CloseStream message tells Deepgram that no more audio - # will be sent. Deepgram will close the connection once all - # audio has finished processing. 
- await ws.send(json.dumps({"type": "CloseStream"})) - except Exception as e: - logger.error(f"Error while sending: {e}") - raise - - async def receiver(ws): - """Collect diarization results from the server.""" - nonlocal audio_cursor - nonlocal interim_annotations_l - nonlocal audio_cursor_l - nonlocal confirmed_interim_annotations_l - nonlocal confirmed_audio_cursor_l - nonlocal final_annotation - nonlocal transcript_text - nonlocal words_with_speakers - - async for msg in ws: - msg = json.loads(msg) - - if "request_id" in msg: - # This is the final metadata message - continue - - # Process words with speaker information - if "channel" in msg and "alternatives" in msg["channel"]: - alternatives = msg["channel"]["alternatives"] - if ( - len(alternatives) > 0 - and "words" in alternatives[0] - ): - words = alternatives[0]["words"] - - # Create annotation from words - annotation = DiarizationAnnotation() - for word_info in words: - if ( - "speaker" in word_info - and "start" in word_info - and "end" in word_info - ): - speaker = word_info["speaker"] - start = word_info["start"] - end = word_info["end"] - segment = Segment(start, end) - annotation[segment] = ( - f"SPEAKER_{speaker}" - ) - - if len(annotation) > 0: - if not msg.get("is_final", False): - # Interim result - audio_cursor_l.append(audio_cursor) - interim_annotations_l.append( - annotation - ) - logger.debug( - f"Interim annotation with " - f"{len(annotation)} segments" - ) - else: - # Confirmed/final result - confirmed_audio_cursor_l.append( - audio_cursor - ) - confirmed_interim_annotations_l.append( - annotation - ) - - # Merge into final annotation - for ( - segment, - _, - speaker, - ) in annotation.itertracks( - yield_label=True - ): - final_annotation[segment] = speaker - - # Collect final transcript and words - for word_info in words: - if ( - "word" in word_info - and "speaker" in word_info - ): - speaker_label = ( - f"SPEAKER_" - f"{word_info['speaker']}" - ) - words_with_speakers.append({ - "word": word_info["word"], - "speaker": speaker_label, - "start": ( - word_info.get("start", 0) - ), - "end": ( - word_info.get("end", 0) - ), - }) - - # Build full transcript with tags - if words_with_speakers: - current_speaker = None - transcript_parts = [] - for w in words_with_speakers: - spk = w["speaker"] - if spk != current_speaker: - if ( - current_speaker - is not None - ): - transcript_parts.append( - "" - ) - transcript_parts.append( - f"[{spk}]" - ) - current_speaker = spk - transcript_parts.append( - w["word"] - ) - transcript_text = " ".join( - transcript_parts - ) - - logger.debug( - f"Confirmed annotation with " - f"{len(annotation)} segments" - ) - - await asyncio.wait([ - asyncio.ensure_future(sender(ws)), - asyncio.ensure_future(receiver(ws)) - ]) - - return ( - final_annotation, - interim_annotations_l, - audio_cursor_l, - confirmed_interim_annotations_l, - confirmed_audio_cursor_l, - transcript_text, - words_with_speakers, - ) - - def __call__(self, sample): - # Sample must be in bytes - ( - final_annotation, - interim_annotations, - audio_cursor_l, - confirmed_interim_annotations, - confirmed_audio_cursor_l, - transcript_text, - words_with_speakers, - ) = asyncio.get_event_loop().run_until_complete( - self.run( - sample, - self.api_key, - self.channels, - self.sample_width, - self.sample_rate, - ) - ) - - return { - "annotation": final_annotation, - "interim_annotations": interim_annotations, - "audio_cursor": audio_cursor_l, - "confirmed_interim_annotations": confirmed_interim_annotations, - 
"confirmed_audio_cursor": confirmed_audio_cursor_l, - "transcript_text": transcript_text, - "words": words_with_speakers, - } - - -class DeepgramStreamingDiarizationPipelineConfig(StreamingDiarizationConfig): - sample_rate: int - channels: int - sample_width: int - realtime_resolution: float - model_version: str = Field( - default="nova-3", - description="The model to use for real-time diarization" - ) - - -@register_pipeline -class DeepgramStreamingDiarizationPipeline(Pipeline): - _config_class = DeepgramStreamingDiarizationPipelineConfig - pipeline_type = PipelineType.STREAMING_DIARIZATION - - def parse_input(self, input_sample: OrchestrationSample): - y = input_sample.waveform - y_int16 = (y * 32767).astype(np.int16) - audio_data_byte = y_int16.T.tobytes() - return audio_data_byte - - def parse_output(self, output) -> StreamingDiarizationOutput: - # Create Transcript from words with speakers - # For cpWER/WDER, we return transcript as the main prediction - words = [ - Word( - word=w["word"], - start=w.get("start"), - end=w.get("end"), - speaker=w.get("speaker"), - ) - for w in output["words"] - ] - transcript = Transcript(words=words) - - return StreamingDiarizationOutput(prediction=transcript) - - def build_pipeline(self): - pipeline = DeepgramStreamingDiarizationApi(self.config) - return pipeline diff --git a/src/openbench/pipeline/streaming_transcription/deepgram.py b/src/openbench/pipeline/streaming_transcription/deepgram.py index 56bc618..ff6e9fd 100644 --- a/src/openbench/pipeline/streaming_transcription/deepgram.py +++ b/src/openbench/pipeline/streaming_transcription/deepgram.py @@ -26,18 +26,28 @@ class DeepgramApi: def __init__(self, cfg) -> None: - self.realtime_resolution = 0.020 - self.model_version = "nova-3" + self.realtime_resolution = getattr( + cfg, 'realtime_resolution', 0.020 + ) + self.model_version = getattr(cfg, 'model_version', "nova-3") self.api_key = os.getenv("DEEPGRAM_API_KEY") - assert self.api_key is not None, "Please set API key in environment" + assert ( + self.api_key is not None + ), "Please set API key in environment" self.channels = cfg.channels self.sample_width = cfg.sample_width self.sample_rate = cfg.sample_rate - self.host_url = os.getenv("DEEPGRAM_HOST_URL", "wss://api.deepgram.com") + self.host_url = os.getenv( + "DEEPGRAM_HOST_URL", "wss://api.deepgram.com" + ) + self.enable_diarization = getattr( + cfg, 'enable_diarization', False + ) async def run(self, data, key, channels, sample_width, sample_rate): - """Connect to the Deepgram real-time streaming endpoint, stream the data - in real-time, and print out the responses from the server. + """Connect to Deepgram real-time streaming endpoint. + + Stream the data in real-time and print responses from server. This uses a pre-recorded file as an example. It mimics a real-time connection by sending `REALTIME_RESOLUTION` seconds of audio every @@ -62,9 +72,23 @@ async def run(self, data, key, channels, sample_width, sample_rate): confirmed_interim_transcripts = [] model_timestamps_hypothesis = [] model_timestamps_confirmed = [] - # Connect to the real-time streaming endpoint, attaching our API key. + words_with_speakers = [] + + # Build connection URL with optional diarization + url = ( + f"{self.host_url}/v1/listen?" 
+ f"model={self.model_version}&" + f"channels={channels}&" + f"sample_rate={sample_rate}&" + f"encoding=linear16&" + f"interim_results=true" + ) + if self.enable_diarization: + url += "&diarize=true" + + # Connect to the real-time streaming endpoint async with websockets.connect( - f"{self.host_url}/v1/listen?model={self.model_version}&channels={channels}&sample_rate={sample_rate}&encoding=linear16&interim_results=true", + url, additional_headers={ "Authorization": "Token {}".format(key), }, @@ -75,28 +99,27 @@ async def sender(ws): nonlocal data, audio_cursor try: while len(data): - # How many bytes are in `REALTIME_RESOLUTION` seconds of audio + # How many bytes in `REALTIME_RESOLUTION` seconds i = int(byte_rate * self.realtime_resolution) chunk, data = data[:i], data[i:] # Send the data await ws.send(chunk) # Move the audio cursor audio_cursor += self.realtime_resolution - # Mimic real-time by waiting `REALTIME_RESOLUTION` seconds - # before the next packet. + # Mimic real-time by waiting await asyncio.sleep(self.realtime_resolution) - # A CloseStream message tells Deepgram that no more audio - # will be sent. Deepgram will close the connection once all - # audio has finished processing. + # A CloseStream message tells Deepgram that no more + # audio will be sent. Deepgram will close connection + # once all audio has finished processing. await ws.send(json.dumps({"type": "CloseStream"})) except Exception as e: print(f"Error while sending: {e}") raise async def receiver(ws): - """Print out the messages received from the server.""" - nonlocal audio_cursor + """Print out messages received from the server.""" + nonlocal audio_cursor, words_with_speakers global transcript global interim_transcripts global audio_cursor_l @@ -109,28 +132,58 @@ async def receiver(ws): async for msg in ws: msg = json.loads(msg) if "request_id" in msg: - # This is the final metadata message. It gets sent as the - # very last message by Deepgram during a clean shutdown. + # This is the final metadata message. # There is no transcript in it. 
continue - if msg["channel"]["alternatives"][0]["transcript"] != "": + alternatives = msg["channel"]["alternatives"][0] + if alternatives["transcript"] != "": if not msg["is_final"]: audio_cursor_l.append(audio_cursor) - model_timestamps_hypothesis.append(msg["channel"]["alternatives"][0]["words"]) + model_timestamps_hypothesis.append( + alternatives["words"] + ) interim_transcripts.append( - transcript + " " + msg["channel"]["alternatives"][0]["transcript"] + transcript + " " + alternatives["transcript"] ) logger.debug( - "\n" + "Transcription: " + transcript + msg["channel"]["alternatives"][0]["transcript"] + "\n" + "Transcription: " + transcript + + alternatives["transcript"] ) elif msg["is_final"]: confirmed_audio_cursor_l.append(audio_cursor) - transcript = transcript + " " + msg["channel"]["alternatives"][0]["transcript"] - confirmed_interim_transcripts.append(transcript) - model_timestamps_confirmed.append(msg["channel"]["alternatives"][0]["words"]) + transcript = ( + transcript + " " + alternatives["transcript"] + ) + confirmed_interim_transcripts.append( + transcript + ) + words = alternatives["words"] + model_timestamps_confirmed.append(words) + + # Collect speaker info if diarization enabled + if self.enable_diarization: + for word_info in words: + if "speaker" in word_info: + speaker_label = ( + f"SPEAKER_" + f"{word_info['speaker']}" + ) + words_with_speakers.append({ + "word": word_info.get( + "word", "" + ), + "speaker": speaker_label, + "start": word_info.get( + "start", 0 + ), + "end": word_info.get("end", 0), + }) - await asyncio.wait([asyncio.ensure_future(sender(ws)), asyncio.ensure_future(receiver(ws))]) + await asyncio.wait([ + asyncio.ensure_future(sender(ws)), + asyncio.ensure_future(receiver(ws)) + ]) return ( transcript, interim_transcripts, @@ -139,6 +192,7 @@ async def receiver(ws): confirmed_audio_cursor_l, model_timestamps_hypothesis, model_timestamps_confirmed, + words_with_speakers, ) def __call__(self, sample): @@ -151,17 +205,28 @@ def __call__(self, sample): confirmed_audio_cursor_l, model_timestamps_hypothesis, model_timestamps_confirmed, + words_with_speakers, ) = asyncio.get_event_loop().run_until_complete( - self.run(sample, self.api_key, self.channels, self.sample_width, self.sample_rate) + self.run( + sample, self.api_key, self.channels, + self.sample_width, self.sample_rate + ) ) return { "transcript": transcript, "interim_transcripts": interim_transcripts, "audio_cursor": audio_cursor_l, - "confirmed_interim_transcripts": confirmed_interim_transcripts, + "confirmed_interim_transcripts": ( + confirmed_interim_transcripts + ), "confirmed_audio_cursor": confirmed_audio_cursor_l, - "model_timestamps_hypothesis": model_timestamps_hypothesis, - "model_timestamps_confirmed": model_timestamps_confirmed, + "model_timestamps_hypothesis": ( + model_timestamps_hypothesis + ), + "model_timestamps_confirmed": ( + model_timestamps_confirmed + ), + "words_with_speakers": words_with_speakers, } @@ -170,7 +235,9 @@ class DeepgramStreamingPipelineConfig(StreamingTranscriptionConfig): channels: int sample_width: int realtime_resolution: float - model_version: str = Field(..., description="The model to use for real-time transcription") + model_version: str = Field( + ..., description="The model to use for real-time transcription" + ) @register_pipeline @@ -184,19 +251,31 @@ def parse_input(self, input_sample: StreamingSample): audio_data_byte = y_int16.T.tobytes() return audio_data_byte - def parse_output(self, output) -> StreamingTranscriptionOutput: - 
model_timestamps_hypothesis = output["model_timestamps_hypothesis"] - model_timestamps_confirmed = output["model_timestamps_confirmed"] + def parse_output( + self, output + ) -> StreamingTranscriptionOutput: + model_timestamps_hypothesis = ( + output["model_timestamps_hypothesis"] + ) + model_timestamps_confirmed = ( + output["model_timestamps_confirmed"] + ) if model_timestamps_hypothesis is not None: model_timestamps_hypothesis = [ - [{"start": word["start"], "end": word["end"]} for word in interim_result_words] + [ + {"start": word["start"], "end": word["end"]} + for word in interim_result_words + ] for interim_result_words in model_timestamps_hypothesis ] if model_timestamps_confirmed is not None: model_timestamps_confirmed = [ - [{"start": word["start"], "end": word["end"]} for word in interim_result_words] + [ + {"start": word["start"], "end": word["end"]} + for word in interim_result_words + ] for interim_result_words in model_timestamps_confirmed ] @@ -205,7 +284,9 @@ def parse_output(self, output) -> StreamingTranscriptionOutput: audio_cursor=output["audio_cursor"], interim_results=output["interim_transcripts"], confirmed_audio_cursor=output["confirmed_audio_cursor"], - confirmed_interim_results=output["confirmed_interim_transcripts"], + confirmed_interim_results=( + output["confirmed_interim_transcripts"] + ), model_timestamps_hypothesis=model_timestamps_hypothesis, model_timestamps_confirmed=model_timestamps_confirmed, ) From 6c6a394d1fe266d147eaebcab6fcd29173cafeff Mon Sep 17 00:00:00 2001 From: dberkin1 Date: Wed, 19 Nov 2025 19:40:56 +0300 Subject: [PATCH 11/13] remove streaming diarization --- src/openbench/dataset/dataset_aliases.py | 1 - src/openbench/dataset/dataset_registry.py | 7 +-- src/openbench/metric/registry.py | 6 +- .../word_error_metrics/word_error_metrics.py | 5 +- src/openbench/pipeline/__init__.py | 1 - src/openbench/pipeline_prediction.py | 56 ------------------- src/openbench/runner/benchmark.py | 2 - src/openbench/types.py | 1 - 8 files changed, 3 insertions(+), 76 deletions(-) diff --git a/src/openbench/dataset/dataset_aliases.py b/src/openbench/dataset/dataset_aliases.py index 99bab3a..14d7ac9 100644 --- a/src/openbench/dataset/dataset_aliases.py +++ b/src/openbench/dataset/dataset_aliases.py @@ -172,7 +172,6 @@ def register_dataset_aliases() -> None: supported_pipeline_types={ PipelineType.TRANSCRIPTION, PipelineType.ORCHESTRATION, - PipelineType.STREAMING_DIARIZATION, }, description=( "Callhome English dataset for transcription and orchestration evaluation. 
" diff --git a/src/openbench/dataset/dataset_registry.py b/src/openbench/dataset/dataset_registry.py index 1fdfef7..f73ae17 100644 --- a/src/openbench/dataset/dataset_registry.py +++ b/src/openbench/dataset/dataset_registry.py @@ -137,10 +137,5 @@ def has_alias(cls, alias: str) -> bool: # Register all datasets DatasetRegistry.register(PipelineType.DIARIZATION, DiarizationDataset) DatasetRegistry.register(PipelineType.ORCHESTRATION, OrchestrationDataset) -DatasetRegistry.register( - PipelineType.STREAMING_TRANSCRIPTION, StreamingDataset -) -DatasetRegistry.register( - PipelineType.STREAMING_DIARIZATION, OrchestrationDataset -) +DatasetRegistry.register(PipelineType.STREAMING_TRANSCRIPTION, StreamingDataset) DatasetRegistry.register(PipelineType.TRANSCRIPTION, TranscriptionDataset) diff --git a/src/openbench/metric/registry.py b/src/openbench/metric/registry.py index d8eec6b..99e991b 100644 --- a/src/openbench/metric/registry.py +++ b/src/openbench/metric/registry.py @@ -135,11 +135,7 @@ def get_available_metrics(cls, pipeline_type: PipelineType) -> list[MetricOption # Register all existing and interesting metrics from pyannote.metrics # Custom metrics will be registered in their own files -MetricRegistry.register( - DiarizationErrorRate, - (PipelineType.DIARIZATION, PipelineType.STREAMING_DIARIZATION), - MetricOptions.DER, -) +MetricRegistry.register(DiarizationErrorRate, PipelineType.DIARIZATION, MetricOptions.DER) MetricRegistry.register(JaccardErrorRate, PipelineType.DIARIZATION, MetricOptions.JER) MetricRegistry.register(DiarizationPurity, PipelineType.DIARIZATION, MetricOptions.DIARIZATION_PURITY) MetricRegistry.register( diff --git a/src/openbench/metric/word_error_metrics/word_error_metrics.py b/src/openbench/metric/word_error_metrics/word_error_metrics.py index c3719ae..5b45d75 100644 --- a/src/openbench/metric/word_error_metrics/word_error_metrics.py +++ b/src/openbench/metric/word_error_metrics/word_error_metrics.py @@ -90,9 +90,7 @@ def _get_word_error_metrics( return result.ops[0], (ref_words, hyp_words), (ref_speakers, hyp_speakers) -@MetricRegistry.register_metric( - (PipelineType.ORCHESTRATION, PipelineType.STREAMING_DIARIZATION), MetricOptions.WDER -) +@MetricRegistry.register_metric(PipelineType.ORCHESTRATION, MetricOptions.WDER) class WordDiarizationErrorRate(BaseWordErrorMetric): """Word Diarization Error Rate (WDER) implementation. 
@@ -224,7 +222,6 @@ def compute_metric(self, detail: Details) -> float: PipelineType.TRANSCRIPTION, PipelineType.ORCHESTRATION, PipelineType.STREAMING_TRANSCRIPTION, - PipelineType.STREAMING_DIARIZATION, ), MetricOptions.WER, ) diff --git a/src/openbench/pipeline/__init__.py b/src/openbench/pipeline/__init__.py index bc405bd..598328b 100644 --- a/src/openbench/pipeline/__init__.py +++ b/src/openbench/pipeline/__init__.py @@ -6,7 +6,6 @@ from .diarization import * from .orchestration import * from .pipeline_registry import PipelineRegistry -from .streaming_diarization import * from .streaming_transcription import * from .transcription import * diff --git a/src/openbench/pipeline_prediction.py b/src/openbench/pipeline_prediction.py index fc9ddb0..030d6d5 100644 --- a/src/openbench/pipeline_prediction.py +++ b/src/openbench/pipeline_prediction.py @@ -274,59 +274,3 @@ def to_annotation_file(self, output_dir: str, filename: str) -> str: json.dump(data, f, indent=2) return path - - -# Streaming Diarization Prediction -class StreamingDiarization(BaseModel): - """Streaming diarization output combining real-time and diarization.""" - - annotation: DiarizationAnnotation = Field( - ..., description="The final diarization annotation" - ) - transcript: "Transcript | None" = Field( - None, description="The transcript with speaker information" - ) - audio_cursor: list[float] | None = Field( - None, description="The audio cursor in seconds for interim results" - ) - interim_annotations: list[DiarizationAnnotation] | None = Field( - None, description="Interim diarization annotations" - ) - confirmed_audio_cursor: list[float] | None = Field( - None, description="The confirmed audio cursor in seconds" - ) - confirmed_interim_annotations: list[DiarizationAnnotation] | None = Field( - None, description="The confirmed interim diarization annotations" - ) - - class Config: - arbitrary_types_allowed = True - - def to_annotation_file(self, output_dir: str, filename: str) -> str: - """Save both the final annotation and streaming metadata.""" - # Save final annotation as RTTM - rttm_path = os.path.join(output_dir, f"{filename}.rttm") - with open(rttm_path, "w") as f: - self.annotation.write_rttm(f) - - # Save streaming metadata as JSON - json_path = os.path.join(output_dir, f"{filename}_streaming.json") - data = { - "audio_cursor": self.audio_cursor, - "confirmed_audio_cursor": self.confirmed_audio_cursor, - "num_interim_annotations": ( - len(self.interim_annotations) - if self.interim_annotations - else 0 - ), - "num_confirmed_interim_annotations": ( - len(self.confirmed_interim_annotations) - if self.confirmed_interim_annotations - else 0 - ), - } - - with open(json_path, "w") as f: - json.dump(data, f, indent=2) - - return rttm_path diff --git a/src/openbench/runner/benchmark.py b/src/openbench/runner/benchmark.py index c2d0f3f..b923fa3 100644 --- a/src/openbench/runner/benchmark.py +++ b/src/openbench/runner/benchmark.py @@ -33,7 +33,6 @@ PipelineType.TRANSCRIPTION: TranscriptionSampleResult, PipelineType.ORCHESTRATION: TranscriptionSampleResult, PipelineType.STREAMING_TRANSCRIPTION: TranscriptionSampleResult, - PipelineType.STREAMING_DIARIZATION: TranscriptionSampleResult, } @@ -65,7 +64,6 @@ def __init__(self, config: BenchmarkConfig, pipelines: list[Pipeline]): PipelineType.TRANSCRIPTION: TranscriptionWandbLogger, PipelineType.ORCHESTRATION: TranscriptionWandbLogger, PipelineType.STREAMING_TRANSCRIPTION: TranscriptionWandbLogger, - PipelineType.STREAMING_DIARIZATION: TranscriptionWandbLogger, } def 
_get_metrics(self, pipeline: Pipeline) -> dict[str, BaseMetric]: diff --git a/src/openbench/types.py b/src/openbench/types.py index 8fa1398..b4bbeaa 100644 --- a/src/openbench/types.py +++ b/src/openbench/types.py @@ -12,7 +12,6 @@ class PipelineType(Enum): TRANSCRIPTION = "transcription" ORCHESTRATION = "orchestration" STREAMING_TRANSCRIPTION = "streaming_transcription" - STREAMING_DIARIZATION = "streaming_diarization" # All prediction classes that we output should conform to this From 6b6508bbd1ede2c9ad15a4191d89e65a308c6171 Mon Sep 17 00:00:00 2001 From: Berkin Durmus Date: Fri, 21 Nov 2025 22:57:21 +0300 Subject: [PATCH 12/13] Add Speechmatics Realtime Transcription & Diarization (#76) * remove streaming diarization cpWER * Add speechmatics realtime transcription and diarization --------- Co-authored-by: dberkin1 --- ...hmaticsStreamingOrchestrationPipeline.yaml | 9 + .../SpeechmaticsStreamingPipeline.yaml | 9 + pyproject.toml | 1 + .../word_error_metrics/word_error_metrics.py | 6 +- .../pipeline/orchestration/__init__.py | 21 +- .../orchestration_speechmatics_streaming.py | 115 ++++++++ src/openbench/pipeline/pipeline_aliases.py | 39 +++ .../streaming_transcription/__init__.py | 29 +- .../streaming_transcription/speechmatics.py | 271 ++++++++++++++++++ uv.lock | 78 ++++- 10 files changed, 563 insertions(+), 15 deletions(-) create mode 100644 config/pipeline_configs/SpeechmaticsStreamingOrchestrationPipeline.yaml create mode 100644 config/pipeline_configs/SpeechmaticsStreamingPipeline.yaml create mode 100644 src/openbench/pipeline/orchestration/orchestration_speechmatics_streaming.py create mode 100644 src/openbench/pipeline/streaming_transcription/speechmatics.py diff --git a/config/pipeline_configs/SpeechmaticsStreamingOrchestrationPipeline.yaml b/config/pipeline_configs/SpeechmaticsStreamingOrchestrationPipeline.yaml new file mode 100644 index 0000000..32ae005 --- /dev/null +++ b/config/pipeline_configs/SpeechmaticsStreamingOrchestrationPipeline.yaml @@ -0,0 +1,9 @@ +SpeechmaticsStreamingOrchestrationPipeline: + pipeline_config: + sample_rate: 16000 + language: "en" + operating_point: "enhanced" + max_delay: 1 + enable_partials: true + enable_diarization: true + diff --git a/config/pipeline_configs/SpeechmaticsStreamingPipeline.yaml b/config/pipeline_configs/SpeechmaticsStreamingPipeline.yaml new file mode 100644 index 0000000..929ca2f --- /dev/null +++ b/config/pipeline_configs/SpeechmaticsStreamingPipeline.yaml @@ -0,0 +1,9 @@ +SpeechmaticsStreamingPipeline: + pipeline_config: + sample_rate: 16000 + language: "en" + operating_point: "enhanced" + max_delay: 1 + enable_partials: true + endpoint_url: "wss://eu2.rt.speechmatics.com/v2" + diff --git a/pyproject.toml b/pyproject.toml index 4719300..e772936 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,6 +38,7 @@ dependencies = [ "texterrors==0.5.1", "nemo-toolkit[asr]>=2.5.0", "openai>=2.7.1", + "speechmatics-python>=5.0.0", ] [project.scripts] diff --git a/src/openbench/metric/word_error_metrics/word_error_metrics.py b/src/openbench/metric/word_error_metrics/word_error_metrics.py index 5b45d75..9935d47 100644 --- a/src/openbench/metric/word_error_metrics/word_error_metrics.py +++ b/src/openbench/metric/word_error_metrics/word_error_metrics.py @@ -296,10 +296,8 @@ def compute_metric(self, detail: Details) -> float: return (S + D + I) / N if N > 0 else 0.0 -@MetricRegistry.register_metric( - (PipelineType.ORCHESTRATION, PipelineType.STREAMING_DIARIZATION), - MetricOptions.CPWER, -) 
+@MetricRegistry.register_metric(PipelineType.ORCHESTRATION, + MetricOptions.CPWER) class ConcatenatedMinimumPermutationWER(BaseWordErrorMetric): """Concatenated minimum-Permutation Word Error Rate (cpWER) implementation. diff --git a/src/openbench/pipeline/orchestration/__init__.py b/src/openbench/pipeline/orchestration/__init__.py index c450180..7150196 100644 --- a/src/openbench/pipeline/orchestration/__init__.py +++ b/src/openbench/pipeline/orchestration/__init__.py @@ -1,13 +1,26 @@ # For licensing see accompanying LICENSE.md file. # Copyright (C) 2025 Argmax, Inc. All Rights Reserved. -from .orchestration_deepgram import DeepgramOrchestrationPipeline, DeepgramOrchestrationPipelineConfig +from .orchestration_deepgram import ( + DeepgramOrchestrationPipeline, + DeepgramOrchestrationPipelineConfig, +) from .orchestration_deepgram_streaming import ( DeepgramStreamingOrchestrationPipeline, DeepgramStreamingOrchestrationPipelineConfig, ) -from .orchestration_openai import OpenAIOrchestrationPipeline, OpenAIOrchestrationPipelineConfig -from .orchestration_whisperkitpro import WhisperKitProOrchestrationConfig, WhisperKitProOrchestrationPipeline +from .orchestration_openai import ( + OpenAIOrchestrationPipeline, + OpenAIOrchestrationPipelineConfig, +) +from .orchestration_speechmatics_streaming import ( + SpeechmaticsStreamingOrchestrationPipeline, + SpeechmaticsStreamingOrchestrationPipelineConfig, +) +from .orchestration_whisperkitpro import ( + WhisperKitProOrchestrationConfig, + WhisperKitProOrchestrationPipeline, +) from .whisperx import WhisperXPipeline, WhisperXPipelineConfig @@ -16,6 +29,8 @@ "DeepgramOrchestrationPipelineConfig", "DeepgramStreamingOrchestrationPipeline", "DeepgramStreamingOrchestrationPipelineConfig", + "SpeechmaticsStreamingOrchestrationPipeline", + "SpeechmaticsStreamingOrchestrationPipelineConfig", "WhisperXPipeline", "WhisperXPipelineConfig", "WhisperKitProOrchestrationPipeline", diff --git a/src/openbench/pipeline/orchestration/orchestration_speechmatics_streaming.py b/src/openbench/pipeline/orchestration/orchestration_speechmatics_streaming.py new file mode 100644 index 0000000..bf8c58d --- /dev/null +++ b/src/openbench/pipeline/orchestration/orchestration_speechmatics_streaming.py @@ -0,0 +1,115 @@ +# For licensing see accompanying LICENSE.md file. +# Copyright (C) 2025 Argmax, Inc. All Rights Reserved. 
+ +import numpy as np +from pydantic import Field + +from ...dataset import OrchestrationSample +from ...pipeline import Pipeline, PipelineConfig, register_pipeline +from ...pipeline_prediction import Transcript, Word +from ...types import PipelineType +from ..streaming_transcription.speechmatics import SpeechmaticsApi +from .common import OrchestrationOutput + + +class SpeechmaticsStreamingOrchestrationPipelineConfig(PipelineConfig): + sample_rate: int = Field( + default=16000, + description="Sample rate of the audio" + ) + language: str = Field( + default="en", + description="Language code for transcription" + ) + operating_point: str = Field( + default="enhanced", + description="Operating point (standard or enhanced)" + ) + max_delay: int = Field( + default=1, + description="Maximum delay in seconds" + ) + enable_partials: bool = Field( + default=True, + description="Enable partial transcripts" + ) + enable_diarization: bool = Field( + default=True, + description="Whether to enable speaker diarization" + ) + + +@register_pipeline +class SpeechmaticsStreamingOrchestrationPipeline(Pipeline): + _config_class = SpeechmaticsStreamingOrchestrationPipelineConfig + pipeline_type = PipelineType.ORCHESTRATION + + def build_pipeline(self): + """Build Speechmatics streaming API with diarization.""" + # Create a modified config for the streaming API + from types import SimpleNamespace + + api_config = SimpleNamespace( + sample_rate=self.config.sample_rate, + language=self.config.language, + operating_point=self.config.operating_point, + max_delay=self.config.max_delay, + enable_partials=self.config.enable_partials, + enable_diarization=self.config.enable_diarization, + ) + + pipeline = SpeechmaticsApi(api_config) + return pipeline + + def parse_input(self, input_sample: OrchestrationSample): + """Convert audio waveform to bytes for streaming.""" + y = input_sample.waveform + y_int16 = (y * 32767).astype(np.int16) + audio_data_byte = y_int16.tobytes() + return audio_data_byte + + def parse_output(self, output) -> OrchestrationOutput: + """Parse output to extract transcription and diarization.""" + # Extract words with speaker info if diarization enabled + words = [] + + if ( + "words_with_speakers" in output and + output["words_with_speakers"] + ): + # This comes from diarization-enabled streaming + for word_info in output["words_with_speakers"]: + words.append(Word( + word=word_info.get("word", ""), + start=word_info.get("start"), + end=word_info.get("end"), + speaker=word_info.get("speaker"), + )) + elif ( + "model_timestamps_confirmed" in output and + output["model_timestamps_confirmed"] + ): + # Fallback to regular transcription without speaker + transcript_words = output.get("transcript", "").split() + timestamp_idx = 0 + + for timestamp_group in output["model_timestamps_confirmed"]: + for word_info in timestamp_group: + if timestamp_idx < len(transcript_words): + words.append(Word( + word=transcript_words[timestamp_idx], + start=word_info.get("start"), + end=word_info.get("end"), + speaker=None, + )) + timestamp_idx += 1 + + # Create final transcript with speaker-attributed words + transcript = Transcript(words=words) + + return OrchestrationOutput( + prediction=transcript, + transcription_output=None, + diarization_output=None, + ) + diff --git a/src/openbench/pipeline/pipeline_aliases.py b/src/openbench/pipeline/pipeline_aliases.py index 10c22c5..a098584 100644 --- a/src/openbench/pipeline/pipeline_aliases.py +++ b/src/openbench/pipeline/pipeline_aliases.py @@ -18,6 +18,7 @@ 
DeepgramOrchestrationPipeline, DeepgramStreamingOrchestrationPipeline, OpenAIOrchestrationPipeline, + SpeechmaticsStreamingOrchestrationPipeline, WhisperKitProOrchestrationPipeline, WhisperXPipeline, ) @@ -28,6 +29,7 @@ FireworksStreamingPipeline, GladiaStreamingPipeline, OpenAIStreamingPipeline, + SpeechmaticsStreamingPipeline, ) from .transcription import ( AssemblyAITranscriptionPipeline, @@ -186,6 +188,25 @@ def register_pipeline_aliases() -> None: "Deepgram streaming orchestration pipeline with diarization enabled." ), ) + + PipelineRegistry.register_alias( + "speechmatics-streaming-orchestration", + SpeechmaticsStreamingOrchestrationPipeline, + default_config={ + "sample_rate": 16000, + "language": "en", + "operating_point": "enhanced", + "max_delay": 1, + "enable_partials": True, + "enable_diarization": True, + }, + description=( + "Speechmatics streaming orchestration pipeline with " + "diarization. Requires API key from " + "https://www.speechmatics.com/. Set " + "`SPEECHMATICS_API_KEY` env var." + ), + ) PipelineRegistry.register_alias( "whisperkitpro-orchestration-tiny", @@ -684,5 +705,23 @@ def register_pipeline_aliases() -> None: description="AssemblyAI streaming transcription pipeline. Requires API key from https://www.assemblyai.com/. Set `ASSEMBLYAI_API_KEY` env var.", ) + PipelineRegistry.register_alias( + "speechmatics-streaming", + SpeechmaticsStreamingPipeline, + default_config={ + "sample_rate": 16000, + "language": "en", + "operating_point": "enhanced", + "max_delay": 1, + "enable_partials": True, + "endpoint_url": "wss://eu2.rt.speechmatics.com/v2", + }, + description=( + "Speechmatics streaming transcription pipeline. " + "Requires API key from https://www.speechmatics.com/. " + "Set `SPEECHMATICS_API_KEY` env var." + ), + ) + register_pipeline_aliases() diff --git a/src/openbench/pipeline/streaming_transcription/__init__.py b/src/openbench/pipeline/streaming_transcription/__init__.py index c152587..d8b5dc7 100644 --- a/src/openbench/pipeline/streaming_transcription/__init__.py +++ b/src/openbench/pipeline/streaming_transcription/__init__.py @@ -1,8 +1,27 @@ # For licensing see accompanying LICENSE.md file. # Copyright (C) 2025 Argmax, Inc. All Rights Reserved. -from .assemblyai import AssemblyAIStreamingPipeline, AssemblyAIStreamingPipelineConfig -from .deepgram import DeepgramStreamingPipeline, DeepgramStreamingPipelineConfig -from .fireworks import FireworksStreamingPipeline, FireworksStreamingPipelineConfig -from .gladia import GladiaStreamingPipeline, GladiaStreamingPipelineConfig -from .openai import OpenAIStreamingPipeline, OpenAIStreamingPipelineConfig +from .assemblyai import ( + AssemblyAIStreamingPipeline, + AssemblyAIStreamingPipelineConfig, +) +from .deepgram import ( + DeepgramStreamingPipeline, + DeepgramStreamingPipelineConfig, +) +from .fireworks import ( + FireworksStreamingPipeline, + FireworksStreamingPipelineConfig, +) +from .gladia import ( + GladiaStreamingPipeline, + GladiaStreamingPipelineConfig, +) +from .openai import ( + OpenAIStreamingPipeline, + OpenAIStreamingPipelineConfig, +) +from .speechmatics import ( + SpeechmaticsStreamingPipeline, + SpeechmaticsStreamingPipelineConfig, +) diff --git a/src/openbench/pipeline/streaming_transcription/speechmatics.py b/src/openbench/pipeline/streaming_transcription/speechmatics.py new file mode 100644 index 0000000..b513796 --- /dev/null +++ b/src/openbench/pipeline/streaming_transcription/speechmatics.py @@ -0,0 +1,271 @@ +# For licensing see accompanying LICENSE.md file. 
+# Copyright (C) 2025 Argmax, Inc. All Rights Reserved. + +import io +import os + +import numpy as np +import speechmatics +from argmaxtools.utils import get_logger +from pydantic import Field +from speechmatics.models import ServerMessageType + +from openbench.dataset import StreamingSample + +from ...pipeline import Pipeline, register_pipeline +from ...pipeline_prediction import StreamingTranscript +from ...types import PipelineType +from .common import StreamingTranscriptionConfig, StreamingTranscriptionOutput + + +logger = get_logger(__name__) + + +class SpeechmaticsApi: + def __init__(self, cfg) -> None: + self.api_key = os.getenv("SPEECHMATICS_API_KEY") + assert ( + self.api_key is not None + ), "Please set SPEECHMATICS_API_KEY in environment" + self.language = getattr(cfg, 'language', 'en') + self.operating_point = getattr(cfg, 'operating_point', 'enhanced') + self.max_delay = getattr(cfg, 'max_delay', 1) + self.enable_partials = getattr(cfg, 'enable_partials', True) + self.sample_rate = cfg.sample_rate + self.connection_url = os.getenv( + "SPEECHMATICS_URL", "wss://eu2.rt.speechmatics.com/v2" + ) + self.enable_diarization = getattr( + cfg, 'enable_diarization', False + ) + + def __call__(self, sample): + # Sample must be in bytes (raw audio data) + transcript = "" + interim_transcripts = [] + audio_cursor_l = [] + confirmed_interim_transcripts = [] + confirmed_audio_cursor_l = [] + model_timestamps_hypothesis = [] + model_timestamps_confirmed = [] + words_with_speakers = [] + + # Create audio cursor tracker + audio_cursor = [0.0] + + # Create a transcription client + ws = speechmatics.client.WebsocketClient( + speechmatics.models.ConnectionSettings( + url=self.connection_url, + auth_token=self.api_key, + ) + ) + + # Define event handler for partial transcripts + def handle_partial_transcript(msg): + nonlocal interim_transcripts, audio_cursor_l + nonlocal model_timestamps_hypothesis + + metadata = msg.get('metadata', {}) + partial_transcript = metadata.get('transcript', '') + + if partial_transcript: + audio_cursor_l.append(audio_cursor[0]) + interim_transcripts.append( + transcript + " " + partial_transcript + ) + + # Collect word timestamps if available + results = msg.get('results', []) + if results: + words = [] + for result in results: + if result.get('type') == 'word': + words.append({ + 'start': result.get('start_time', 0), + 'end': result.get('end_time', 0), + }) + if words: + model_timestamps_hypothesis.append(words) + + logger.debug(f"[partial] {partial_transcript}") + + # Define event handler for full transcripts + def handle_transcript(msg): + nonlocal transcript, confirmed_interim_transcripts + nonlocal confirmed_audio_cursor_l + nonlocal model_timestamps_confirmed, words_with_speakers + + metadata = msg.get('metadata', {}) + full_transcript = metadata.get('transcript', '') + + if full_transcript: + confirmed_audio_cursor_l.append(audio_cursor[0]) + transcript = transcript + " " + full_transcript + confirmed_interim_transcripts.append(transcript) + + # Collect word timestamps and speaker info + results = msg.get('results', []) + if results: + words = [] + for result in results: + if result.get('type') == 'word': + # Get alternatives array + alternatives = result.get('alternatives', []) + if alternatives: + # Take first alternative + alternative = alternatives[0] + + word_data = { + 'start': result.get('start_time', 0), + 'end': result.get('end_time', 0), + } + words.append(word_data) + + # Collect speaker info if diarization + if self.enable_diarization: + speaker_info 
= alternative.get( + 'speaker', None + ) + word_content = alternative.get( + 'content', '' + ) + if speaker_info is not None: + words_with_speakers.append({ + 'word': word_content, + 'speaker': ( + f"SPEAKER_{speaker_info}" + ), + 'start': result.get( + 'start_time', 0 + ), + 'end': result.get( + 'end_time', 0 + ), + }) + + if words: + model_timestamps_confirmed.append(words) + + logger.debug(f"[FULL] {full_transcript}") + + # Register event handlers + ws.add_event_handler( + event_name=ServerMessageType.AddPartialTranscript, + event_handler=handle_partial_transcript, + ) + + ws.add_event_handler( + event_name=ServerMessageType.AddTranscript, + event_handler=handle_transcript, + ) + + # Audio settings + settings = speechmatics.models.AudioSettings( + sample_rate=self.sample_rate, + encoding='pcm_s16le', + ) + + # Transcription config + conf_dict = { + 'operating_point': self.operating_point, + 'language': self.language, + 'enable_partials': self.enable_partials, + 'max_delay': self.max_delay, + } + + # Enable diarization if requested + if self.enable_diarization: + conf_dict['diarization'] = 'speaker' + + conf = speechmatics.models.TranscriptionConfig(**conf_dict) + + # Create a BytesIO stream from the audio data + audio_stream = io.BytesIO(sample) + + try: + # Run transcription synchronously + ws.run_synchronously(audio_stream, conf, settings) + except Exception as e: + logger.error(f"Speechmatics transcription error: {e}") + raise + + return { + "transcript": transcript.strip(), + "interim_transcripts": interim_transcripts, + "audio_cursor": audio_cursor_l, + "confirmed_interim_transcripts": ( + confirmed_interim_transcripts + ), + "confirmed_audio_cursor": confirmed_audio_cursor_l, + "model_timestamps_hypothesis": ( + model_timestamps_hypothesis + ), + "model_timestamps_confirmed": ( + model_timestamps_confirmed + ), + "words_with_speakers": words_with_speakers, + } + + +class SpeechmaticsStreamingPipelineConfig(StreamingTranscriptionConfig): + sample_rate: int = Field( + default=16000, + description="Sample rate of the audio" + ) + language: str = Field( + default="en", + description="Language code for transcription" + ) + operating_point: str = Field( + default="enhanced", + description="Operating point (standard or enhanced)" + ) + max_delay: int = Field( + default=1, + description="Maximum delay in seconds" + ) + enable_partials: bool = Field( + default=True, + description="Enable partial transcripts" + ) + + +@register_pipeline +class SpeechmaticsStreamingPipeline(Pipeline): + _config_class = SpeechmaticsStreamingPipelineConfig + pipeline_type = PipelineType.STREAMING_TRANSCRIPTION + + def parse_input(self, input_sample: StreamingSample): + y = input_sample.waveform + y_int16 = (y * 32767).astype(np.int16) + audio_data_byte = y_int16.tobytes() + return audio_data_byte + + def parse_output( + self, output + ) -> StreamingTranscriptionOutput: + model_timestamps_hypothesis = ( + output["model_timestamps_hypothesis"] + ) + model_timestamps_confirmed = ( + output["model_timestamps_confirmed"] + ) + + prediction = StreamingTranscript( + transcript=output["transcript"], + audio_cursor=output["audio_cursor"], + interim_results=output["interim_transcripts"], + confirmed_audio_cursor=output["confirmed_audio_cursor"], + confirmed_interim_results=( + output["confirmed_interim_transcripts"] + ), + model_timestamps_hypothesis=model_timestamps_hypothesis, + model_timestamps_confirmed=model_timestamps_confirmed, + ) + + return StreamingTranscriptionOutput(prediction=prediction) + + def 
build_pipeline(self): + pipeline = SpeechmaticsApi(self.config) + return pipeline diff --git a/uv.lock b/uv.lock index b1124be..7e7dc3a 100644 --- a/uv.lock +++ b/uv.lock @@ -1346,6 +1346,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/04/4b/29cac41a4d98d144bf5f6d33995617b185d14b22401f75ca86f384e87ff1/h11-0.16.0-py3-none-any.whl", hash = "sha256:63cf8bbe7522de3bf65932fda1d9c2772064ffb3dae62d55932da54b31cb6c86", size = 37515, upload-time = "2025-04-24T03:35:24.344Z" }, ] +[[package]] +name = "h2" +version = "4.3.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "hpack" }, + { name = "hyperframe" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/1d/17/afa56379f94ad0fe8defd37d6eb3f89a25404ffc71d4d848893d270325fc/h2-4.3.0.tar.gz", hash = "sha256:6c59efe4323fa18b47a632221a1888bd7fde6249819beda254aeca909f221bf1", size = 2152026, upload-time = "2025-08-23T18:12:19.778Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/69/b2/119f6e6dcbd96f9069ce9a2665e0146588dc9f88f29549711853645e736a/h2-4.3.0-py3-none-any.whl", hash = "sha256:c438f029a25f7945c69e0ccf0fb951dc3f73a5f6412981daee861431b70e2bdd", size = 61779, upload-time = "2025-08-23T18:12:17.779Z" }, +] + [[package]] name = "hdbscan" version = "0.8.40" @@ -1387,6 +1400,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/9e/d3/0aaf279f4f3dea58e99401b92c31c0f752924ba0e6c7d7bb07b1dbd7f35e/hf_xet-1.1.8-cp37-abi3-win_amd64.whl", hash = "sha256:4171f31d87b13da4af1ed86c98cf763292e4720c088b4957cf9d564f92904ca9", size = 2801689, upload-time = "2025-08-18T22:01:04.81Z" }, ] +[[package]] +name = "hpack" +version = "4.1.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/2c/48/71de9ed269fdae9c8057e5a4c0aa7402e8bb16f2c6e90b3aa53327b113f8/hpack-4.1.0.tar.gz", hash = "sha256:ec5eca154f7056aa06f196a557655c5b009b382873ac8d1e66e79e87535f1dca", size = 51276, upload-time = "2025-01-22T21:44:58.347Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/07/c6/80c95b1b2b94682a72cbdbfb85b81ae2daffa4291fbfa1b1464502ede10d/hpack-4.1.0-py3-none-any.whl", hash = "sha256:157ac792668d995c657d93111f46b4535ed114f0c9c8d672271bbec7eae1b496", size = 34357, upload-time = "2025-01-22T21:44:56.92Z" }, +] + [[package]] name = "httpcore" version = "1.0.9" @@ -1415,6 +1437,11 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/2a/39/e50c7c3a983047577ee07d2a9e53faf5a69493943ec3f6a384bdc792deb2/httpx-0.28.1-py3-none-any.whl", hash = "sha256:d909fcccc110f8c7faf814ca82a9a4d816bc5a6dbfea25d6591d6985b8ba59ad", size = 73517, upload-time = "2024-12-06T15:37:21.509Z" }, ] +[package.optional-dependencies] +http2 = [ + { name = "h2" }, +] + [[package]] name = "huggingface-hub" version = "0.34.4" @@ -1460,6 +1487,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/c6/50/e0edd38dcd63fb26a8547f13d28f7a008bc4a3fd4eb4ff030673f22ad41a/hydra_core-1.3.2-py3-none-any.whl", hash = "sha256:fa0238a9e31df3373b35b0bfb672c34cc92718d21f81311d8996a16de1141d8b", size = 154547, upload-time = "2023-02-23T18:33:40.801Z" }, ] +[[package]] +name = "hyperframe" +version = "6.1.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/02/e7/94f8232d4a74cc99514c13a9f995811485a6903d48e5d952771ef6322e30/hyperframe-6.1.0.tar.gz", hash = "sha256:f630908a00854a7adeabd6382b43923a4c4cd4b821fcb527e6ab9e15382a3b08", size = 26566, upload-time = "2025-01-22T21:41:49.302Z" } +wheels = [ + { url 
= "https://files.pythonhosted.org/packages/48/30/47d0bf6072f7252e6521f3447ccfa40b421b6824517f82854703d0f5a98b/hyperframe-6.1.0-py3-none-any.whl", hash = "sha256:b03380493a519fce58ea5af42e4a42317bf9bd425596f7a0835ffce80f1a42e5", size = 13007, upload-time = "2025-01-22T21:41:47.295Z" }, +] + [[package]] name = "hyperpyyaml" version = "1.2.2" @@ -3008,6 +3044,7 @@ dependencies = [ { name = "rich" }, { name = "scikit-learn" }, { name = "speechbrain" }, + { name = "speechmatics-python" }, { name = "texterrors" }, { name = "torch" }, { name = "typer" }, @@ -3059,6 +3096,7 @@ requires-dist = [ { name = "rich", specifier = ">=13.0.0,<14" }, { name = "scikit-learn", specifier = "==1.5.1" }, { name = "speechbrain", specifier = "==1.0.2" }, + { name = "speechmatics-python", specifier = ">=5.0.0" }, { name = "texterrors", specifier = "==0.5.1" }, { name = "torch", specifier = "==2.8" }, { name = "typer", specifier = ">=0.16.0" }, @@ -3291,6 +3329,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/54/20/4d324d65cc6d9205fabedc306948156824eb9f0ee1633355a8f7ec5c66bf/pluggy-1.6.0-py3-none-any.whl", hash = "sha256:e920276dd6813095e9377c0bc5566d94c932c33b27a3e3945d8389c374dd4746", size = 20538, upload-time = "2025-05-15T12:30:06.134Z" }, ] +[[package]] +name = "polling2" +version = "0.5.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/04/9d/6a560ab95e1b92dfce97321d8ffc9f20d352fa4b12a91525d4c575df1c74/polling2-0.5.0.tar.gz", hash = "sha256:90b7da82cf7adbb48029724d3546af93f21ab6e592ec37c8c4619aedd010e342", size = 6549, upload-time = "2021-07-19T18:06:54.951Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/a3/de/e5bf2556ebd6db12590788207575c7c75b1de62f5ddc8b4916b668e04e6b/polling2-0.5.0-py2.py3-none-any.whl", hash = "sha256:ad86d56fbd7502f0856cac2d0109d595c18fa6c7fb12c88cee5e5d16c17286c1", size = 6431, upload-time = "2021-07-19T18:06:53.681Z" }, +] + [[package]] name = "pooch" version = "1.8.2" @@ -4583,6 +4630,22 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/26/74/b63579a8f2bd0934a53ae13c10cc20539473cb29dbb911eefce88b59b43d/speechbrain-1.0.2-py3-none-any.whl", hash = "sha256:fe5328554c28bc8fe8bfef355144ee9de5cf569b9706cee2267e19c99b092578", size = 824842, upload-time = "2024-10-30T18:31:32.191Z" }, ] +[[package]] +name = "speechmatics-python" +version = "5.0.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "httpx", extra = ["http2"] }, + { name = "polling2" }, + { name = "tenacity" }, + { name = "toml" }, + { name = "websockets" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/61/fe/baa7da879f0b43d941cb9092bf68a8d92dbb9325a7aa614715ae8f0b0b32/speechmatics_python-5.0.0.tar.gz", hash = "sha256:bc8724aff604b13b00a83f7b4bd3ebc68e9f6c4ab713b3fb675b1d5bb0870053", size = 133087, upload-time = "2025-08-14T15:15:33.334Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/2f/80/7e2a698437effd75be73e442b0e0a669e66c48d302bc318406a5bcef5345/speechmatics_python-5.0.0-py3-none-any.whl", hash = "sha256:9d42bf40e452dc20c1f15c9a16b712b2ab2e6c0585ed6943c2d0753269faa0da", size = 132185, upload-time = "2025-08-14T15:15:32.027Z" }, +] + [[package]] name = "sqlalchemy" version = "2.0.43" @@ -4657,11 +4720,11 @@ wheels = [ [[package]] name = "tenacity" -version = "9.1.2" +version = "8.2.3" source = { registry = "https://pypi.org/simple" } -sdist = { url = 
"https://files.pythonhosted.org/packages/0a/d4/2b0cd0fe285e14b36db076e78c93766ff1d529d70408bd1d2a5a84f1d929/tenacity-9.1.2.tar.gz", hash = "sha256:1169d376c297e7de388d18b4481760d478b0e99a777cad3a9c86e556f4b697cb", size = 48036, upload-time = "2025-04-02T08:25:09.966Z" } +sdist = { url = "https://files.pythonhosted.org/packages/89/3c/253e1627262373784bf9355db9d6f20d2d8831d79f91e9cca48050cddcc2/tenacity-8.2.3.tar.gz", hash = "sha256:5398ef0d78e63f40007c1fb4c0bff96e1911394d2fa8d194f77619c05ff6cc8a", size = 40651, upload-time = "2023-08-14T13:22:50.869Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/e5/30/643397144bfbfec6f6ef821f36f33e57d35946c44a2352d3c9f0ae847619/tenacity-9.1.2-py3-none-any.whl", hash = "sha256:f77bf36710d8b73a50b2dd155c97b870017ad21afe6ab300326b0371b3b05138", size = 28248, upload-time = "2025-04-02T08:25:07.678Z" }, + { url = "https://files.pythonhosted.org/packages/f4/f1/990741d5bb2487d529d20a433210ffa136a367751e454214013b441c4575/tenacity-8.2.3-py3-none-any.whl", hash = "sha256:ce510e327a630c9e1beaf17d42e6ffacc88185044ad85cf74c0a8887c6a0f88c", size = 24401, upload-time = "2023-08-14T13:22:49.265Z" }, ] [[package]] @@ -4814,6 +4877,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/41/f2/fd673d979185f5dcbac4be7d09461cbb99751554ffb6718d0013af8604cb/tokenizers-0.21.4-cp39-abi3-win_amd64.whl", hash = "sha256:475d807a5c3eb72c59ad9b5fcdb254f6e17f53dfcbb9903233b0dfa9c943b597", size = 2507568, upload-time = "2025-07-28T15:48:55.456Z" }, ] +[[package]] +name = "toml" +version = "0.10.2" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/be/ba/1f744cdc819428fc6b5084ec34d9b30660f6f9daaf70eead706e3203ec3c/toml-0.10.2.tar.gz", hash = "sha256:b3bda1d108d5dd99f4a20d24d9c348e91c4db7ab1b749200bded2f839ccbe68f", size = 22253, upload-time = "2020-11-01T01:40:22.204Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/44/6f/7120676b6d73228c96e17f1f794d8ab046fc910d781c8d151120c3f1569e/toml-0.10.2-py2.py3-none-any.whl", hash = "sha256:806143ae5bfb6a3c6e736a764057db0e6a0e05e338b5630894a5f779cabb4f9b", size = 16588, upload-time = "2020-11-01T01:40:20.672Z" }, +] + [[package]] name = "tomli" version = "2.2.1" From 6fd5da6de489de8ec39914bc4ff75d40a74f83a2 Mon Sep 17 00:00:00 2001 From: Eduardo Pacheco Date: Fri, 21 Nov 2025 16:58:50 -0300 Subject: [PATCH 13/13] fix: format --- .../word_error_metrics/word_error_metrics.py | 3 +- .../orchestration_deepgram_streaming.py | 69 +++----- .../orchestration_speechmatics_streaming.py | 69 +++----- src/openbench/pipeline/pipeline_aliases.py | 6 +- .../streaming_transcription/deepgram.py | 114 ++++---------- .../streaming_transcription/speechmatics.py | 149 +++++++----------- src/openbench/runner/benchmark.py | 8 +- 7 files changed, 137 insertions(+), 281 deletions(-) diff --git a/src/openbench/metric/word_error_metrics/word_error_metrics.py b/src/openbench/metric/word_error_metrics/word_error_metrics.py index 63d7dab..4cef2a1 100644 --- a/src/openbench/metric/word_error_metrics/word_error_metrics.py +++ b/src/openbench/metric/word_error_metrics/word_error_metrics.py @@ -297,8 +297,7 @@ def compute_metric(self, detail: Details) -> float: return (S + D + I) / N if N > 0 else 0.0 -@MetricRegistry.register_metric(PipelineType.ORCHESTRATION, - MetricOptions.CPWER) +@MetricRegistry.register_metric(PipelineType.ORCHESTRATION, MetricOptions.CPWER) class ConcatenatedMinimumPermutationWER(BaseWordErrorMetric): """Concatenated minimum-Permutation Word Error Rate 
(cpWER) implementation. diff --git a/src/openbench/pipeline/orchestration/orchestration_deepgram_streaming.py b/src/openbench/pipeline/orchestration/orchestration_deepgram_streaming.py index 4ab01d8..ce0240f 100644 --- a/src/openbench/pipeline/orchestration/orchestration_deepgram_streaming.py +++ b/src/openbench/pipeline/orchestration/orchestration_deepgram_streaming.py @@ -13,33 +13,14 @@ class DeepgramStreamingOrchestrationPipelineConfig(PipelineConfig): - sample_rate: int = Field( - default=16000, - description="Sample rate of the audio" - ) - channels: int = Field( - default=1, - description="Number of audio channels" - ) - sample_width: int = Field( - default=2, - description="Sample width in bytes" - ) - realtime_resolution: float = Field( - default=0.020, - description="Real-time resolution for streaming" - ) + sample_rate: int = Field(default=16000, description="Sample rate of the audio") + channels: int = Field(default=1, description="Number of audio channels") + sample_width: int = Field(default=2, description="Sample width in bytes") + realtime_resolution: float = Field(default=0.020, description="Real-time resolution for streaming") model_version: str = Field( - default="nova-3", - description=( - "The model to use for real-time transcription " - "with diarization" - ) - ) - enable_diarization: bool = Field( - default=True, - description="Whether to enable speaker diarization" + default="nova-3", description=("The model to use for real-time transcription with diarization") ) + enable_diarization: bool = Field(default=True, description="Whether to enable speaker diarization") @register_pipeline @@ -76,32 +57,30 @@ def parse_output(self, output) -> OrchestrationOutput: # Extract words with speaker info if diarization enabled words = [] - if ( - "words_with_speakers" in output and - output["words_with_speakers"] - ): + if "words_with_speakers" in output and output["words_with_speakers"]: # This comes from diarization-enabled streaming for word_info in output["words_with_speakers"]: - words.append(Word( - word=word_info.get("word", ""), - start=word_info.get("start"), - end=word_info.get("end"), - speaker=word_info.get("speaker"), - )) - elif ( - "model_timestamps_confirmed" in output and - output["model_timestamps_confirmed"] - ): + words.append( + Word( + word=word_info.get("word", ""), + start=word_info.get("start"), + end=word_info.get("end"), + speaker=word_info.get("speaker"), + ) + ) + elif "model_timestamps_confirmed" in output and output["model_timestamps_confirmed"]: # Fallback to regular transcription without speaker for timestamp_group in output["model_timestamps_confirmed"]: for word_info in timestamp_group: if "word" in word_info: - words.append(Word( - word=word_info.get("word", ""), - start=word_info.get("start"), - end=word_info.get("end"), - speaker=None, - )) + words.append( + Word( + word=word_info.get("word", ""), + start=word_info.get("start"), + end=word_info.get("end"), + speaker=None, + ) + ) # Create final transcript with speaker-attributed words transcript = Transcript(words=words) diff --git a/src/openbench/pipeline/orchestration/orchestration_speechmatics_streaming.py b/src/openbench/pipeline/orchestration/orchestration_speechmatics_streaming.py index bf8c58d..32e24e4 100644 --- a/src/openbench/pipeline/orchestration/orchestration_speechmatics_streaming.py +++ b/src/openbench/pipeline/orchestration/orchestration_speechmatics_streaming.py @@ -13,30 +13,12 @@ class SpeechmaticsStreamingOrchestrationPipelineConfig(PipelineConfig): - sample_rate: int = 
Field(
-        default=16000,
-        description="Sample rate of the audio"
-    )
-    language: str = Field(
-        default="en",
-        description="Language code for transcription"
-    )
-    operating_point: str = Field(
-        default="enhanced",
-        description="Operating point (standard or enhanced)"
-    )
-    max_delay: int = Field(
-        default=1,
-        description="Maximum delay in seconds"
-    )
-    enable_partials: bool = Field(
-        default=True,
-        description="Enable partial transcripts"
-    )
-    enable_diarization: bool = Field(
-        default=True,
-        description="Whether to enable speaker diarization"
-    )
+    sample_rate: int = Field(default=16000, description="Sample rate of the audio")
+    language: str = Field(default="en", description="Language code for transcription")
+    operating_point: str = Field(default="enhanced", description="Operating point (standard or enhanced)")
+    max_delay: int = Field(default=1, description="Maximum delay in seconds")
+    enable_partials: bool = Field(default=True, description="Enable partial transcripts")
+    enable_diarization: bool = Field(default=True, description="Whether to enable speaker diarization")
 
 
 @register_pipeline
@@ -73,22 +55,18 @@ def parse_output(self, output) -> OrchestrationOutput:
         # Extract words with speaker info if diarization enabled
         words = []
-        if (
-            "words_with_speakers" in output and
-            output["words_with_speakers"]
-        ):
+        if "words_with_speakers" in output and output["words_with_speakers"]:
             # This comes from diarization-enabled streaming
             for word_info in output["words_with_speakers"]:
-                words.append(Word(
-                    word=word_info.get("word", ""),
-                    start=word_info.get("start"),
-                    end=word_info.get("end"),
-                    speaker=word_info.get("speaker"),
-                ))
-        elif (
-            "model_timestamps_confirmed" in output and
-            output["model_timestamps_confirmed"]
-        ):
+                words.append(
+                    Word(
+                        word=word_info.get("word", ""),
+                        start=word_info.get("start"),
+                        end=word_info.get("end"),
+                        speaker=word_info.get("speaker"),
+                    )
+                )
+        elif "model_timestamps_confirmed" in output and output["model_timestamps_confirmed"]:
             # Fallback to regular transcription without speaker
             transcript_words = output.get("transcript", "").split()
             timestamp_idx = 0
 
@@ -96,12 +74,14 @@ def parse_output(self, output) -> OrchestrationOutput:
             for timestamp_group in output["model_timestamps_confirmed"]:
                 for word_info in timestamp_group:
                     if timestamp_idx < len(transcript_words):
-                        words.append(Word(
-                            word=transcript_words[timestamp_idx],
-                            start=word_info.get("start"),
-                            end=word_info.get("end"),
-                            speaker=None,
-                        ))
+                        words.append(
+                            Word(
+                                word=transcript_words[timestamp_idx],
+                                start=word_info.get("start"),
+                                end=word_info.get("end"),
+                                speaker=None,
+                            )
+                        )
                         timestamp_idx += 1
 
         # Create final transcript with speaker-attributed words
@@ -112,4 +92,3 @@ def parse_output(self, output) -> OrchestrationOutput:
             transcription_output=None,
             diarization_output=None,
         )
-
diff --git a/src/openbench/pipeline/pipeline_aliases.py b/src/openbench/pipeline/pipeline_aliases.py
index a098584..f2ac84f 100644
--- a/src/openbench/pipeline/pipeline_aliases.py
+++ b/src/openbench/pipeline/pipeline_aliases.py
@@ -184,11 +184,9 @@ def register_pipeline_aliases() -> None:
             "model_version": "nova-3",
             "enable_diarization": True,
         },
-        description=(
-            "Deepgram streaming orchestration pipeline with diarization enabled."
-        ),
+        description=("Deepgram streaming orchestration pipeline with diarization enabled."),
     )
-    
+
     PipelineRegistry.register_alias(
         "speechmatics-streaming-orchestration",
         SpeechmaticsStreamingOrchestrationPipeline,
diff --git a/src/openbench/pipeline/streaming_transcription/deepgram.py b/src/openbench/pipeline/streaming_transcription/deepgram.py
index ff6e9fd..3d10f41 100644
--- a/src/openbench/pipeline/streaming_transcription/deepgram.py
+++ b/src/openbench/pipeline/streaming_transcription/deepgram.py
@@ -26,23 +26,15 @@ class DeepgramApi:
     def __init__(self, cfg) -> None:
-        self.realtime_resolution = getattr(
-            cfg, 'realtime_resolution', 0.020
-        )
-        self.model_version = getattr(cfg, 'model_version', "nova-3")
+        self.realtime_resolution = getattr(cfg, "realtime_resolution", 0.020)
+        self.model_version = getattr(cfg, "model_version", "nova-3")
         self.api_key = os.getenv("DEEPGRAM_API_KEY")
-        assert (
-            self.api_key is not None
-        ), "Please set API key in environment"
+        assert self.api_key is not None, "Please set API key in environment"
         self.channels = cfg.channels
         self.sample_width = cfg.sample_width
         self.sample_rate = cfg.sample_rate
-        self.host_url = os.getenv(
-            "DEEPGRAM_HOST_URL", "wss://api.deepgram.com"
-        )
-        self.enable_diarization = getattr(
-            cfg, 'enable_diarization', False
-        )
+        self.host_url = os.getenv("DEEPGRAM_HOST_URL", "wss://api.deepgram.com")
+        self.enable_diarization = getattr(cfg, "enable_diarization", False)
 
     async def run(self, data, key, channels, sample_width, sample_rate):
         """Connect to Deepgram real-time streaming endpoint.
@@ -139,25 +131,14 @@ async def receiver(ws):
                 if alternatives["transcript"] != "":
                     if not msg["is_final"]:
                         audio_cursor_l.append(audio_cursor)
-                        model_timestamps_hypothesis.append(
-                            alternatives["words"]
-                        )
-                        interim_transcripts.append(
-                            transcript + " " + alternatives["transcript"]
-                        )
-                        logger.debug(
-                            "\n" + "Transcription: " + transcript +
-                            alternatives["transcript"]
-                        )
+                        model_timestamps_hypothesis.append(alternatives["words"])
+                        interim_transcripts.append(transcript + " " + alternatives["transcript"])
+                        logger.debug("\n" + "Transcription: " + transcript + alternatives["transcript"])
                     elif msg["is_final"]:
                         confirmed_audio_cursor_l.append(audio_cursor)
-                        transcript = (
-                            transcript + " " + alternatives["transcript"]
-                        )
-                        confirmed_interim_transcripts.append(
-                            transcript
-                        )
+                        transcript = transcript + " " + alternatives["transcript"]
+                        confirmed_interim_transcripts.append(transcript)
                         words = alternatives["words"]
                         model_timestamps_confirmed.append(words)
 
@@ -165,25 +146,17 @@ async def receiver(ws):
                         if self.enable_diarization:
                             for word_info in words:
                                 if "speaker" in word_info:
-                                    speaker_label = (
-                                        f"SPEAKER_"
-                                        f"{word_info['speaker']}"
+                                    speaker_label = f"SPEAKER_{word_info['speaker']}"
+                                    words_with_speakers.append(
+                                        {
+                                            "word": word_info.get("word", ""),
+                                            "speaker": speaker_label,
+                                            "start": word_info.get("start", 0),
+                                            "end": word_info.get("end", 0),
+                                        }
                                     )
-                                    words_with_speakers.append({
-                                        "word": word_info.get(
-                                            "word", ""
-                                        ),
-                                        "speaker": speaker_label,
-                                        "start": word_info.get(
-                                            "start", 0
-                                        ),
-                                        "end": word_info.get("end", 0),
-                                    })
-
-        await asyncio.wait([
-            asyncio.ensure_future(sender(ws)),
-            asyncio.ensure_future(receiver(ws))
-        ])
+
+        await asyncio.wait([asyncio.ensure_future(sender(ws)), asyncio.ensure_future(receiver(ws))])
         return (
             transcript,
             interim_transcripts,
@@ -207,25 +180,16 @@ def __call__(self, sample):
             model_timestamps_confirmed,
             words_with_speakers,
         ) = asyncio.get_event_loop().run_until_complete(
-            self.run(
-                sample, self.api_key, self.channels,
-                self.sample_width, self.sample_rate
-            )
+            self.run(sample, self.api_key, self.channels, self.sample_width, self.sample_rate)
         )
         return {
             "transcript": transcript,
             "interim_transcripts": interim_transcripts,
             "audio_cursor": audio_cursor_l,
-            "confirmed_interim_transcripts": (
-                confirmed_interim_transcripts
-            ),
+            "confirmed_interim_transcripts": (confirmed_interim_transcripts),
             "confirmed_audio_cursor": confirmed_audio_cursor_l,
-            "model_timestamps_hypothesis": (
-                model_timestamps_hypothesis
-            ),
-            "model_timestamps_confirmed": (
-                model_timestamps_confirmed
-            ),
+            "model_timestamps_hypothesis": (model_timestamps_hypothesis),
+            "model_timestamps_confirmed": (model_timestamps_confirmed),
             "words_with_speakers": words_with_speakers,
         }
 
@@ -235,9 +199,7 @@ class DeepgramStreamingPipelineConfig(StreamingTranscriptionConfig):
     channels: int
     sample_width: int
     realtime_resolution: float
-    model_version: str = Field(
-        ..., description="The model to use for real-time transcription"
-    )
+    model_version: str = Field(..., description="The model to use for real-time transcription")
 
 
 @register_pipeline
@@ -251,31 +213,19 @@ def parse_input(self, input_sample: StreamingSample):
         audio_data_byte = y_int16.T.tobytes()
         return audio_data_byte
 
-    def parse_output(
-        self, output
-    ) -> StreamingTranscriptionOutput:
-        model_timestamps_hypothesis = (
-            output["model_timestamps_hypothesis"]
-        )
-        model_timestamps_confirmed = (
-            output["model_timestamps_confirmed"]
-        )
+    def parse_output(self, output) -> StreamingTranscriptionOutput:
+        model_timestamps_hypothesis = output["model_timestamps_hypothesis"]
+        model_timestamps_confirmed = output["model_timestamps_confirmed"]
 
         if model_timestamps_hypothesis is not None:
             model_timestamps_hypothesis = [
-                [
-                    {"start": word["start"], "end": word["end"]}
-                    for word in interim_result_words
-                ]
+                [{"start": word["start"], "end": word["end"]} for word in interim_result_words]
                 for interim_result_words in model_timestamps_hypothesis
             ]
 
         if model_timestamps_confirmed is not None:
             model_timestamps_confirmed = [
-                [
-                    {"start": word["start"], "end": word["end"]}
-                    for word in interim_result_words
-                ]
+                [{"start": word["start"], "end": word["end"]} for word in interim_result_words]
                 for interim_result_words in model_timestamps_confirmed
             ]
 
@@ -284,9 +234,7 @@ def parse_output(
             audio_cursor=output["audio_cursor"],
             interim_results=output["interim_transcripts"],
             confirmed_audio_cursor=output["confirmed_audio_cursor"],
-            confirmed_interim_results=(
-                output["confirmed_interim_transcripts"]
-            ),
+            confirmed_interim_results=(output["confirmed_interim_transcripts"]),
             model_timestamps_hypothesis=model_timestamps_hypothesis,
             model_timestamps_confirmed=model_timestamps_confirmed,
         )
diff --git a/src/openbench/pipeline/streaming_transcription/speechmatics.py b/src/openbench/pipeline/streaming_transcription/speechmatics.py
index b513796..db29ffe 100644
--- a/src/openbench/pipeline/streaming_transcription/speechmatics.py
+++ b/src/openbench/pipeline/streaming_transcription/speechmatics.py
@@ -24,20 +24,14 @@ class SpeechmaticsApi:
     def __init__(self, cfg) -> None:
         self.api_key = os.getenv("SPEECHMATICS_API_KEY")
-        assert (
-            self.api_key is not None
-        ), "Please set SPEECHMATICS_API_KEY in environment"
-        self.language = getattr(cfg, 'language', 'en')
-        self.operating_point = getattr(cfg, 'operating_point', 'enhanced')
-        self.max_delay = getattr(cfg, 'max_delay', 1)
-        self.enable_partials = getattr(cfg, 'enable_partials', True)
+        assert self.api_key is not None, "Please set SPEECHMATICS_API_KEY in environment"
+        self.language = getattr(cfg, "language", "en")
+        self.operating_point = getattr(cfg, "operating_point", "enhanced")
+        self.max_delay = getattr(cfg, "max_delay", 1)
+        self.enable_partials = getattr(cfg, "enable_partials", True)
         self.sample_rate = cfg.sample_rate
-        self.connection_url = os.getenv(
-            "SPEECHMATICS_URL", "wss://eu2.rt.speechmatics.com/v2"
-        )
-        self.enable_diarization = getattr(
-            cfg, 'enable_diarization', False
-        )
+        self.connection_url = os.getenv("SPEECHMATICS_URL", "wss://eu2.rt.speechmatics.com/v2")
+        self.enable_diarization = getattr(cfg, "enable_diarization", False)
 
     def __call__(self, sample):
         # Sample must be in bytes (raw audio data)
@@ -66,25 +60,25 @@ def handle_partial_transcript(msg):
             nonlocal interim_transcripts, audio_cursor_l
             nonlocal model_timestamps_hypothesis
 
-            metadata = msg.get('metadata', {})
-            partial_transcript = metadata.get('transcript', '')
+            metadata = msg.get("metadata", {})
+            partial_transcript = metadata.get("transcript", "")
 
             if partial_transcript:
                 audio_cursor_l.append(audio_cursor[0])
-                interim_transcripts.append(
-                    transcript + " " + partial_transcript
-                )
+                interim_transcripts.append(transcript + " " + partial_transcript)
 
                 # Collect word timestamps if available
-                results = msg.get('results', [])
+                results = msg.get("results", [])
                 if results:
                     words = []
                     for result in results:
-                        if result.get('type') == 'word':
-                            words.append({
-                                'start': result.get('start_time', 0),
-                                'end': result.get('end_time', 0),
-                            })
+                        if result.get("type") == "word":
+                            words.append(
+                                {
+                                    "start": result.get("start_time", 0),
+                                    "end": result.get("end_time", 0),
+                                }
+                            )
 
                     if words:
                         model_timestamps_hypothesis.append(words)
@@ -96,8 +90,8 @@ def handle_transcript(msg):
             nonlocal confirmed_audio_cursor_l
             nonlocal model_timestamps_confirmed, words_with_speakers
 
-            metadata = msg.get('metadata', {})
-            full_transcript = metadata.get('transcript', '')
+            metadata = msg.get("metadata", {})
+            full_transcript = metadata.get("transcript", "")
 
             if full_transcript:
                 confirmed_audio_cursor_l.append(audio_cursor[0])
@@ -105,44 +99,36 @@ def handle_transcript(msg):
                 confirmed_interim_transcripts.append(transcript)
 
                 # Collect word timestamps and speaker info
-                results = msg.get('results', [])
+                results = msg.get("results", [])
                 if results:
                     words = []
                     for result in results:
-                        if result.get('type') == 'word':
+                        if result.get("type") == "word":
                             # Get alternatives array
-                            alternatives = result.get('alternatives', [])
+                            alternatives = result.get("alternatives", [])
                             if alternatives:
                                 # Take first alternative
                                 alternative = alternatives[0]
                                 word_data = {
-                                    'start': result.get('start_time', 0),
-                                    'end': result.get('end_time', 0),
+                                    "start": result.get("start_time", 0),
+                                    "end": result.get("end_time", 0),
                                 }
                                 words.append(word_data)
 
                                 # Collect speaker info if diarization
                                 if self.enable_diarization:
-                                    speaker_info = alternative.get(
-                                        'speaker', None
-                                    )
-                                    word_content = alternative.get(
-                                        'content', ''
-                                    )
+                                    speaker_info = alternative.get("speaker", None)
+                                    word_content = alternative.get("content", "")
 
                                     if speaker_info is not None:
-                                        words_with_speakers.append({
-                                            'word': word_content,
-                                            'speaker': (
-                                                f"SPEAKER_{speaker_info}"
-                                            ),
-                                            'start': result.get(
-                                                'start_time', 0
-                                            ),
-                                            'end': result.get(
-                                                'end_time', 0
-                                            ),
-                                        })
+                                        words_with_speakers.append(
+                                            {
+                                                "word": word_content,
+                                                "speaker": (f"SPEAKER_{speaker_info}"),
+                                                "start": result.get("start_time", 0),
+                                                "end": result.get("end_time", 0),
+                                            }
+                                        )
 
                     if words:
                         model_timestamps_confirmed.append(words)
@@ -163,20 +149,20 @@ def handle_transcript(msg):
         # Audio settings
         settings = speechmatics.models.AudioSettings(
             sample_rate=self.sample_rate,
-            encoding='pcm_s16le',
+            encoding="pcm_s16le",
         )
 
         # Transcription config
         conf_dict = {
-            'operating_point': self.operating_point,
-            'language': self.language,
-            'enable_partials': self.enable_partials,
-            'max_delay': self.max_delay,
+            "operating_point": self.operating_point,
+            "language": self.language,
+            "enable_partials": self.enable_partials,
+            "max_delay": self.max_delay,
         }
 
         # Enable diarization if requested
         if self.enable_diarization:
-            conf_dict['diarization'] = 'speaker'
+            conf_dict["diarization"] = "speaker"
 
         conf = speechmatics.models.TranscriptionConfig(**conf_dict)
 
@@ -194,41 +180,20 @@ def handle_transcript(msg):
             "transcript": transcript.strip(),
             "interim_transcripts": interim_transcripts,
             "audio_cursor": audio_cursor_l,
-            "confirmed_interim_transcripts": (
-                confirmed_interim_transcripts
-            ),
+            "confirmed_interim_transcripts": (confirmed_interim_transcripts),
             "confirmed_audio_cursor": confirmed_audio_cursor_l,
-            "model_timestamps_hypothesis": (
-                model_timestamps_hypothesis
-            ),
-            "model_timestamps_confirmed": (
-                model_timestamps_confirmed
-            ),
+            "model_timestamps_hypothesis": (model_timestamps_hypothesis),
+            "model_timestamps_confirmed": (model_timestamps_confirmed),
             "words_with_speakers": words_with_speakers,
         }
 
 
 class SpeechmaticsStreamingPipelineConfig(StreamingTranscriptionConfig):
-    sample_rate: int = Field(
-        default=16000,
-        description="Sample rate of the audio"
-    )
-    language: str = Field(
-        default="en",
-        description="Language code for transcription"
-    )
-    operating_point: str = Field(
-        default="enhanced",
-        description="Operating point (standard or enhanced)"
-    )
-    max_delay: int = Field(
-        default=1,
-        description="Maximum delay in seconds"
-    )
-    enable_partials: bool = Field(
-        default=True,
-        description="Enable partial transcripts"
-    )
+    sample_rate: int = Field(default=16000, description="Sample rate of the audio")
+    language: str = Field(default="en", description="Language code for transcription")
+    operating_point: str = Field(default="enhanced", description="Operating point (standard or enhanced)")
+    max_delay: int = Field(default=1, description="Maximum delay in seconds")
+    enable_partials: bool = Field(default=True, description="Enable partial transcripts")
 
 
 @register_pipeline
@@ -242,24 +207,16 @@ def parse_input(self, input_sample: StreamingSample):
         audio_data_byte = y_int16.tobytes()
         return audio_data_byte
 
-    def parse_output(
-        self, output
-    ) -> StreamingTranscriptionOutput:
-        model_timestamps_hypothesis = (
-            output["model_timestamps_hypothesis"]
-        )
-        model_timestamps_confirmed = (
-            output["model_timestamps_confirmed"]
-        )
+    def parse_output(self, output) -> StreamingTranscriptionOutput:
+        model_timestamps_hypothesis = output["model_timestamps_hypothesis"]
+        model_timestamps_confirmed = output["model_timestamps_confirmed"]
 
         prediction = StreamingTranscript(
             transcript=output["transcript"],
             audio_cursor=output["audio_cursor"],
            interim_results=output["interim_transcripts"],
             confirmed_audio_cursor=output["confirmed_audio_cursor"],
-            confirmed_interim_results=(
-                output["confirmed_interim_transcripts"]
-            ),
+            confirmed_interim_results=(output["confirmed_interim_transcripts"]),
             model_timestamps_hypothesis=model_timestamps_hypothesis,
             model_timestamps_confirmed=model_timestamps_confirmed,
         )
diff --git a/src/openbench/runner/benchmark.py b/src/openbench/runner/benchmark.py
index b923fa3..e2c055f 100644
--- a/src/openbench/runner/benchmark.py
+++ b/src/openbench/runner/benchmark.py
@@ -107,12 +107,8 @@ def _process_single_sample(
         )
 
         if pipeline.pipeline_type == PipelineType.DIARIZATION:
-            sample_results_attributes["num_speakers_predicted"] = (
-                output.prediction.num_speakers
-            )
-            sample_results_attributes["num_speakers_reference"] = (
-                sample.reference.num_speakers
-            )
+            sample_results_attributes["num_speakers_predicted"] = output.prediction.num_speakers
+            sample_results_attributes["num_speakers_reference"] = sample.reference.num_speakers
 
         sample_result = sample_result_class(**sample_results_attributes)
 
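For quick reference, a minimal sketch (not part of the patch) of how the Speechmatics streaming configuration reformatted above might be constructed. The import path is taken from the diff header; the snippet assumes the package is installed and that `StreamingTranscriptionConfig` declares no additional required fields, which this patch does not show.

```python
# Illustrative sketch only, not part of the patch. Assumes openbench is installed,
# the import path matches the diff header above, and StreamingTranscriptionConfig
# adds no further required fields (not shown in this patch).
import os

from openbench.pipeline.streaming_transcription.speechmatics import (
    SpeechmaticsStreamingPipelineConfig,
)

# SpeechmaticsApi.__init__ above reads these environment variables.
os.environ.setdefault("SPEECHMATICS_API_KEY", "<your-api-key>")
os.environ.setdefault("SPEECHMATICS_URL", "wss://eu2.rt.speechmatics.com/v2")

# Field names and defaults mirror the reformatted config class above.
config = SpeechmaticsStreamingPipelineConfig(
    sample_rate=16000,
    language="en",
    operating_point="enhanced",
    max_delay=1,
    enable_partials=True,
)
print(config)
```

The Deepgram pipeline in this patch is configured analogously, reading `DEEPGRAM_API_KEY` and optionally `DEEPGRAM_HOST_URL` from the environment.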