From fb9921dc95ff9e1c6890153129f918902a61ab93 Mon Sep 17 00:00:00 2001
From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Date: Sun, 22 Feb 2026 14:57:06 +0000
Subject: [PATCH] feat: integrate pyannote-cloud into listener2 plugin

- Add hypr-pyannote-cloud dependency to listener2
- Add BatchProvider::Pyannote variant
- Implement run_batch_pyannote with media upload, diarization job submission, polling, and response mapping
- Add Pyannote error variant to error.rs
- Map pyannote TranscriptionSegments to owhisper batch::Response format with speaker indices

Co-Authored-By: yujonglee
---
Notes (commentary only; text between the "---" line and the first diff is
not part of the commit message and is not applied):
* run_batch_pyannote emits BatchStarted, uploads the audio file, submits a
  diarization job with transcription enabled, polls it, and emits
  BatchResponse with the mapped result.
* Polling sleeps PYANNOTE_POLL_INTERVAL (2 s) between requests and gives up
  once PYANNOTE_POLL_TIMEOUT (600 s) has elapsed; Failed and Canceled job
  statuses are returned as Error::Pyannote.
* The audio file is read fully into memory (tokio::fs::read) and PUT to the
  upload URL with a fresh reqwest client that does not carry the bearer
  header configured in make_pyannote_client.
* Speaker indices are assigned in first-seen order over output.diarization;
  a transcription segment whose speaker is not in that map gets speaker None.
* Word and alternative confidence are hard-coded to 1.0. When
  word_level_transcription is empty, each turn-level segment is emitted as a
  single Word.
* A failure to emit BatchResponse is reported as Error::BatchStartFailed.

 Cargo.lock                     |   4 +
 plugins/listener2/Cargo.toml   |   4 +
 plugins/listener2/src/error.rs |   2 +
 plugins/listener2/src/ext.rs   | 257 +++++++++++++++++++++++++++++++++
 4 files changed, 267 insertions(+)

diff --git a/Cargo.lock b/Cargo.lock
index c8326dae28..2bb38f262e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -18383,8 +18383,11 @@ dependencies = [
  "language",
  "owhisper-client",
  "owhisper-interface",
+ "pyannote-cloud",
  "ractor",
+ "reqwest 0.13.2",
  "serde",
+ "serde_json",
  "specta",
  "specta-typescript",
  "tauri",
@@ -18395,6 +18398,7 @@ dependencies = [
  "tokio",
  "tokio-stream",
  "tracing",
+ "uuid",
 ]
 
 [[package]]
diff --git a/plugins/listener2/Cargo.toml b/plugins/listener2/Cargo.toml
index fc336f6c25..8bfeaf8a0b 100644
--- a/plugins/listener2/Cargo.toml
+++ b/plugins/listener2/Cargo.toml
@@ -19,6 +19,7 @@ tauri-plugin-settings = { workspace = true }
 hypr-audio-utils = { workspace = true }
 hypr-host = { workspace = true }
 hypr-language = { workspace = true }
+hypr-pyannote-cloud = { workspace = true }
 
 owhisper-client = { workspace = true, features = ["argmax"] }
 owhisper-interface = { workspace = true }
@@ -29,8 +30,11 @@ camino = { workspace = true }
 specta = { workspace = true }
 tauri-specta = { workspace = true, features = ["derive", "typescript"] }
 
+reqwest = { workspace = true }
 serde = { workspace = true }
+serde_json = { workspace = true }
 thiserror = { workspace = true }
+uuid = { workspace = true }
 
 ractor = { workspace = true, features = ["async-trait"] }
 
diff --git a/plugins/listener2/src/error.rs b/plugins/listener2/src/error.rs
index 0ee00641a7..51a03f58ad 100644
--- a/plugins/listener2/src/error.rs
+++ b/plugins/listener2/src/error.rs
@@ -12,6 +12,8 @@ pub enum Error {
     SpawnError(#[from] ractor::SpawnErr),
     #[error("batch start failed: {0}")]
     BatchStartFailed(String),
+    #[error("pyannote error: {0}")]
+    Pyannote(String),
 }
 
 impl Serialize for Error {
diff --git a/plugins/listener2/src/ext.rs b/plugins/listener2/src/ext.rs
index 4c74912d60..1da6c15053 100644
--- a/plugins/listener2/src/ext.rs
+++ b/plugins/listener2/src/ext.rs
@@ -19,6 +19,7 @@ pub enum BatchProvider {
     Soniox,
     AssemblyAI,
     Am,
+    Pyannote,
 }
 
 #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, specta::Type)]
@@ -92,6 +93,7 @@ impl<'a, R: tauri::Runtime, M: tauri::Manager<R>> Listener2<'a, R, M> {
                 )
                 .await
             }
+            BatchProvider::Pyannote => run_batch_pyannote(app, params).await,
         }
     }
 
@@ -264,3 +266,258 @@ async fn run_batch_am(
     .instrument(span)
     .await
 }
+
+const PYANNOTE_POLL_INTERVAL: std::time::Duration = std::time::Duration::from_secs(2);
+const PYANNOTE_POLL_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(600);
+
+fn make_pyannote_client(
+    base_url: &str,
+    api_key: &str,
+) -> Result<hypr_pyannote_cloud::Client, crate::Error> {
+    let mut headers = reqwest::header::HeaderMap::new();
+    let auth_value = reqwest::header::HeaderValue::from_str(&format!("Bearer {api_key}"))
+        .map_err(|e| crate::Error::Pyannote(format!("invalid api key: {e}")))?;
+    headers.insert(reqwest::header::AUTHORIZATION, auth_value);
+
+    let http_client = reqwest::Client::builder()
+        .default_headers(headers)
+        .build()
+        .map_err(|e| crate::Error::Pyannote(format!("failed to build http client: {e}")))?;
+
+    Ok(hypr_pyannote_cloud::Client::new_with_client(
+        base_url,
+        http_client,
+    ))
+}
+
+async fn pyannote_upload_audio(
+    client: &hypr_pyannote_cloud::Client,
+    file_path: &str,
+) -> Result<String, crate::Error> {
+    let media_key = format!("media://{}", uuid::Uuid::new_v4());
+    let media_url_parsed = media_key
+        .parse()
+        .map_err(|e| crate::Error::Pyannote(format!("failed to parse media url: {e}")))?;
+
+    let upload_body = hypr_pyannote_cloud::types::GetMediaUploadUrl {
+        url: media_url_parsed,
+    };
+
+    let upload_response = client
+        .get_media_upload_url(&upload_body)
+        .await
+        .map_err(|e| crate::Error::Pyannote(format!("failed to get upload url: {e}")))?;
+
+    let presigned_url = upload_response.into_inner().url;
+
+    let file_bytes = tokio::fs::read(file_path)
+        .await
+        .map_err(|e| crate::Error::Pyannote(format!("failed to read audio file: {e}")))?;
+
+    let http_client = reqwest::Client::new();
+    let put_response = http_client
+        .put(&presigned_url)
+        .body(file_bytes)
+        .send()
+        .await
+        .map_err(|e| crate::Error::Pyannote(format!("failed to upload audio: {e}")))?;
+
+    if !put_response.status().is_success() {
+        return Err(crate::Error::Pyannote(format!(
+            "audio upload failed with status: {}",
+            put_response.status()
+        )));
+    }
+
+    Ok(media_key)
+}
+
+async fn pyannote_poll_job(
+    client: &hypr_pyannote_cloud::Client,
+    job_id: &str,
+) -> Result<hypr_pyannote_cloud::types::GetJobByIdResponse, crate::Error> {
+    let start = std::time::Instant::now();
+
+    loop {
+        if start.elapsed() > PYANNOTE_POLL_TIMEOUT {
+            return Err(crate::Error::Pyannote(format!(
+                "job {} timed out after {:?}",
+                job_id, PYANNOTE_POLL_TIMEOUT
+            )));
+        }
+
+        let response = client
+            .get_job_by_id(job_id)
+            .await
+            .map_err(|e| crate::Error::Pyannote(format!("failed to poll job {job_id}: {e}")))?;
+
+        let job = response.into_inner();
+
+        let status = match &job {
+            hypr_pyannote_cloud::types::GetJobByIdResponse::DiarizationJob(j) => j.status.clone(),
+            hypr_pyannote_cloud::types::GetJobByIdResponse::VoiceprintJob(j) => j.status.clone(),
+            hypr_pyannote_cloud::types::GetJobByIdResponse::IdentifyJob(j) => j.status.clone(),
+        };
+
+        match status {
+            Some(hypr_pyannote_cloud::types::JobStatus::Succeeded) => {
+                tracing::info!("pyannote job {job_id} succeeded");
+                return Ok(job);
+            }
+            Some(hypr_pyannote_cloud::types::JobStatus::Failed) => {
+                return Err(crate::Error::Pyannote(format!(
+                    "pyannote job {job_id} failed"
+                )));
+            }
+            Some(hypr_pyannote_cloud::types::JobStatus::Canceled) => {
+                return Err(crate::Error::Pyannote(format!(
+                    "pyannote job {job_id} was canceled"
+                )));
+            }
+            _ => {
+                tracing::debug!("pyannote job {job_id} status: {status:?}, polling again...");
+                tokio::time::sleep(PYANNOTE_POLL_INTERVAL).await;
+            }
+        }
+    }
+}
+
+fn pyannote_diarization_to_batch_response(
+    output: &hypr_pyannote_cloud::types::DiarizationJobOutput,
+) -> owhisper_interface::batch::Response {
+    // Build a speaker label -> index map from diarization segments
+    let mut speaker_indices: std::collections::HashMap<String, usize> =
+        std::collections::HashMap::new();
+    for segment in &output.diarization {
+        let next_idx = speaker_indices.len();
+        speaker_indices
+            .entry(segment.speaker.clone())
+            .or_insert(next_idx);
+    }
+
+    // Prefer word-level transcription, fall back to turn-level
+    let segments = if !output.word_level_transcription.is_empty() {
+        &output.word_level_transcription
+    } else {
+        &output.turn_level_transcription
+    };
+
+    let words: Vec<owhisper_interface::batch::Word> = segments
+        .iter()
+        .map(|seg| {
+            let speaker_idx = speaker_indices.get(&seg.speaker).copied();
+            owhisper_interface::batch::Word {
+                word: seg.text.clone(),
+                start: seg.start,
+                end: seg.end,
+                confidence: 1.0,
+                speaker: speaker_idx,
+                punctuated_word: Some(seg.text.clone()),
+            }
+        })
+        .collect();
+
+    let transcript = words
+        .iter()
+        .map(|w| w.word.as_str())
+        .collect::<Vec<_>>()
+        .join(" ");
+
+    owhisper_interface::batch::Response {
+        metadata: serde_json::json!({
+            "provider": "pyannote",
+            "speakers": speaker_indices,
+        }),
+        results: owhisper_interface::batch::Results {
+            channels: vec![owhisper_interface::batch::Channel {
+                alternatives: vec![owhisper_interface::batch::Alternatives {
+                    transcript,
+                    confidence: 1.0,
+                    words,
+                }],
+            }],
+        },
+    }
+}
+
+async fn run_batch_pyannote<R: tauri::Runtime>(
+    app: tauri::AppHandle<R>,
+    params: BatchParams,
+) -> Result<(), crate::Error> {
+    let span = session_span(&params.session_id);
+
+    async {
+        BatchEvent::BatchStarted {
+            session_id: params.session_id.clone(),
+        }
+        .emit(&app)
+        .map_err(|e| {
+            crate::Error::BatchStartFailed(format!("failed to emit BatchStarted event: {e}"))
+        })?;
+
+        let client = make_pyannote_client(&params.base_url, &params.api_key)?;
+
+        tracing::info!("pyannote: uploading audio file: {}", params.file_path);
+        let media_url = pyannote_upload_audio(&client, &params.file_path).await?;
+
+        tracing::info!("pyannote: submitting diarization job with url: {media_url}");
+        let diarize_request = hypr_pyannote_cloud::types::DiarizeRequest {
+            url: media_url,
+            transcription: true,
+            confidence: false,
+            exclusive: false,
+            max_speakers: None,
+            min_speakers: None,
+            model: None,
+            num_speakers: None,
+            transcription_config: None,
+            turn_level_confidence: None,
+            webhook: None,
+            webhook_status_only: false,
+        };
+
+        let job_created = client
+            .diarize(&diarize_request)
+            .await
+            .map_err(|e| crate::Error::Pyannote(format!("failed to submit diarize job: {e}")))?
+            .into_inner();
+
+        let job_id = job_created.job_id;
+        tracing::info!("pyannote: job created with id: {job_id}");
+
+        let job_result = pyannote_poll_job(&client, &job_id).await?;
+
+        let response = match job_result {
+            hypr_pyannote_cloud::types::GetJobByIdResponse::DiarizationJob(job) => {
+                match job.output {
+                    Some(output) => pyannote_diarization_to_batch_response(&output),
+                    None => {
+                        return Err(crate::Error::Pyannote(
+                            "diarization job succeeded but has no output".to_string(),
+                        ));
+                    }
+                }
+            }
+            other => {
+                return Err(crate::Error::Pyannote(format!(
+                    "expected diarization job response, got: {other:?}"
+                )));
+            }
+        };
+
+        tracing::info!("pyannote: batch transcription completed");
+
+        BatchEvent::BatchResponse {
+            session_id: params.session_id.clone(),
+            response,
+        }
+        .emit(&app)
+        .map_err(|e| {
+            crate::Error::BatchStartFailed(format!("failed to emit BatchResponse event: {e}"))
+        })?;
+
+        Ok(())
+    }
+    .instrument(span)
+    .await
+}