Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions plugins/listener2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ tauri-plugin-settings = { workspace = true }
hypr-audio-utils = { workspace = true }
hypr-host = { workspace = true }
hypr-language = { workspace = true }
hypr-pyannote-cloud = { workspace = true }

owhisper-client = { workspace = true, features = ["argmax"] }
owhisper-interface = { workspace = true }
Expand All @@ -29,8 +30,11 @@ camino = { workspace = true }
specta = { workspace = true }
tauri-specta = { workspace = true, features = ["derive", "typescript"] }

reqwest = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
uuid = { workspace = true }

ractor = { workspace = true, features = ["async-trait"] }

Expand Down
2 changes: 2 additions & 0 deletions plugins/listener2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ pub enum Error {
SpawnError(#[from] ractor::SpawnErr),
#[error("batch start failed: {0}")]
BatchStartFailed(String),
#[error("pyannote error: {0}")]
Pyannote(String),
}

impl Serialize for Error {
Expand Down
257 changes: 257 additions & 0 deletions plugins/listener2/src/ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub enum BatchProvider {
Soniox,
AssemblyAI,
Am,
Pyannote,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, specta::Type)]
Expand Down Expand Up @@ -92,6 +93,7 @@ impl<'a, R: tauri::Runtime, M: tauri::Manager<R>> Listener2<'a, R, M> {
)
.await
}
BatchProvider::Pyannote => run_batch_pyannote(app, params).await,
}
}

Expand Down Expand Up @@ -264,3 +266,258 @@ async fn run_batch_am(
.instrument(span)
.await
}

const PYANNOTE_POLL_INTERVAL: std::time::Duration = std::time::Duration::from_secs(2);
const PYANNOTE_POLL_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(600);

fn make_pyannote_client(
base_url: &str,
api_key: &str,
) -> Result<hypr_pyannote_cloud::Client, crate::Error> {
let mut headers = reqwest::header::HeaderMap::new();
let auth_value = reqwest::header::HeaderValue::from_str(&format!("Bearer {api_key}"))
.map_err(|e| crate::Error::Pyannote(format!("invalid api key: {e}")))?;
headers.insert(reqwest::header::AUTHORIZATION, auth_value);

let http_client = reqwest::Client::builder()
.default_headers(headers)
.build()
.map_err(|e| crate::Error::Pyannote(format!("failed to build http client: {e}")))?;

Ok(hypr_pyannote_cloud::Client::new_with_client(
base_url,
http_client,
))
}

async fn pyannote_upload_audio(
client: &hypr_pyannote_cloud::Client,
file_path: &str,
) -> Result<String, crate::Error> {
let media_key = format!("media://{}", uuid::Uuid::new_v4());
let media_url_parsed = media_key
.parse()
.map_err(|e| crate::Error::Pyannote(format!("failed to parse media url: {e}")))?;

let upload_body = hypr_pyannote_cloud::types::GetMediaUploadUrl {
url: media_url_parsed,
};

let upload_response = client
.get_media_upload_url(&upload_body)
.await
.map_err(|e| crate::Error::Pyannote(format!("failed to get upload url: {e}")))?;

let presigned_url = upload_response.into_inner().url;

let file_bytes = tokio::fs::read(file_path)
.await
.map_err(|e| crate::Error::Pyannote(format!("failed to read audio file: {e}")))?;

let http_client = reqwest::Client::new();
let put_response = http_client
.put(&presigned_url)
.body(file_bytes)
.send()
.await
.map_err(|e| crate::Error::Pyannote(format!("failed to upload audio: {e}")))?;

if !put_response.status().is_success() {
return Err(crate::Error::Pyannote(format!(
"audio upload failed with status: {}",
put_response.status()
)));
}

Ok(media_key)
}

async fn pyannote_poll_job(
client: &hypr_pyannote_cloud::Client,
job_id: &str,
) -> Result<hypr_pyannote_cloud::types::GetJobByIdResponse, crate::Error> {
let start = std::time::Instant::now();

loop {
if start.elapsed() > PYANNOTE_POLL_TIMEOUT {
return Err(crate::Error::Pyannote(format!(
"job {} timed out after {:?}",
job_id, PYANNOTE_POLL_TIMEOUT
)));
}

let response = client
.get_job_by_id(job_id)
.await
.map_err(|e| crate::Error::Pyannote(format!("failed to poll job {job_id}: {e}")))?;

let job = response.into_inner();

let status = match &job {
hypr_pyannote_cloud::types::GetJobByIdResponse::DiarizationJob(j) => j.status.clone(),
hypr_pyannote_cloud::types::GetJobByIdResponse::VoiceprintJob(j) => j.status.clone(),
hypr_pyannote_cloud::types::GetJobByIdResponse::IdentifyJob(j) => j.status.clone(),
};

match status {
Some(hypr_pyannote_cloud::types::JobStatus::Succeeded) => {
tracing::info!("pyannote job {job_id} succeeded");
return Ok(job);
}
Some(hypr_pyannote_cloud::types::JobStatus::Failed) => {
return Err(crate::Error::Pyannote(format!(
"pyannote job {job_id} failed"
)));
}
Some(hypr_pyannote_cloud::types::JobStatus::Canceled) => {
return Err(crate::Error::Pyannote(format!(
"pyannote job {job_id} was canceled"
)));
}
_ => {
tracing::debug!("pyannote job {job_id} status: {status:?}, polling again...");
tokio::time::sleep(PYANNOTE_POLL_INTERVAL).await;
}
}
}
}

fn pyannote_diarization_to_batch_response(
output: &hypr_pyannote_cloud::types::DiarizationJobOutput,
) -> owhisper_interface::batch::Response {
// Build a speaker label -> index map from diarization segments
let mut speaker_indices: std::collections::HashMap<String, usize> =
std::collections::HashMap::new();
for segment in &output.diarization {
let next_idx = speaker_indices.len();
speaker_indices
.entry(segment.speaker.clone())
.or_insert(next_idx);
}

// Prefer word-level transcription, fall back to turn-level
let segments = if !output.word_level_transcription.is_empty() {
&output.word_level_transcription
} else {
&output.turn_level_transcription
};

let words: Vec<owhisper_interface::batch::Word> = segments
.iter()
.map(|seg| {
let speaker_idx = speaker_indices.get(&seg.speaker).copied();
owhisper_interface::batch::Word {
word: seg.text.clone(),
start: seg.start,
end: seg.end,
confidence: 1.0,
speaker: speaker_idx,
punctuated_word: Some(seg.text.clone()),
}
})
.collect();

let transcript = words
.iter()
.map(|w| w.word.as_str())
.collect::<Vec<_>>()
.join(" ");

owhisper_interface::batch::Response {
metadata: serde_json::json!({
"provider": "pyannote",
"speakers": speaker_indices,
}),
results: owhisper_interface::batch::Results {
channels: vec![owhisper_interface::batch::Channel {
alternatives: vec![owhisper_interface::batch::Alternatives {
transcript,
confidence: 1.0,
words,
}],
}],
},
}
}

async fn run_batch_pyannote(
app: tauri::AppHandle,
params: BatchParams,
) -> Result<(), crate::Error> {
let span = session_span(&params.session_id);

async {
BatchEvent::BatchStarted {
session_id: params.session_id.clone(),
}
.emit(&app)
.map_err(|e| {
crate::Error::BatchStartFailed(format!("failed to emit BatchStarted event: {e}"))
})?;

let client = make_pyannote_client(&params.base_url, &params.api_key)?;

tracing::info!("pyannote: uploading audio file: {}", params.file_path);
let media_url = pyannote_upload_audio(&client, &params.file_path).await?;

tracing::info!("pyannote: submitting diarization job with url: {media_url}");
let diarize_request = hypr_pyannote_cloud::types::DiarizeRequest {
url: media_url,
transcription: true,
confidence: false,
exclusive: false,
max_speakers: None,
min_speakers: None,
model: None,
num_speakers: None,
transcription_config: None,
turn_level_confidence: None,
webhook: None,
webhook_status_only: false,
};

let job_created = client
.diarize(&diarize_request)
.await
.map_err(|e| crate::Error::Pyannote(format!("failed to submit diarize job: {e}")))?
.into_inner();

let job_id = job_created.job_id;
tracing::info!("pyannote: job created with id: {job_id}");

let job_result = pyannote_poll_job(&client, &job_id).await?;

let response = match job_result {
hypr_pyannote_cloud::types::GetJobByIdResponse::DiarizationJob(job) => {
match job.output {
Some(output) => pyannote_diarization_to_batch_response(&output),
None => {
return Err(crate::Error::Pyannote(
"diarization job succeeded but has no output".to_string(),
));
}
}
}
other => {
return Err(crate::Error::Pyannote(format!(
"expected diarization job response, got: {other:?}"
)));
}
};

tracing::info!("pyannote: batch transcription completed");

BatchEvent::BatchResponse {
session_id: params.session_id.clone(),
response,
}
.emit(&app)
.map_err(|e| {
crate::Error::BatchStartFailed(format!("failed to emit BatchResponse event: {e}"))
})?;

Ok(())
}
.instrument(span)
.await
}
Loading