diff --git a/Cargo.lock b/Cargo.lock index 3604d8f865..c1c0336429 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10278,6 +10278,38 @@ dependencies = [ "rand_chacha 0.3.1", ] +[[package]] +name = "listener-core" +version = "0.1.0" +dependencies = [ + "aec", + "audio", + "audio-utils", + "bytes", + "device-monitor", + "futures-util", + "host", + "hound", + "language", + "owhisper-client", + "owhisper-interface", + "ractor", + "sentry", + "serde", + "serde_json", + "specta", + "supervisor", + "thiserror 2.0.18", + "tokio", + "tokio-stream", + "tokio-util", + "tracing", + "tracing-subscriber", + "uuid", + "vad-ext", + "vorbis_rs", +] + [[package]] name = "litemap" version = "0.7.5" @@ -12055,15 +12087,6 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" -[[package]] -name = "ordered-float" -version = "5.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f4779c6901a562440c3786d08192c6fbda7c1c2060edd10006b05ee35d10f2d" -dependencies = [ - "num-traits", -] - [[package]] name = "ordered-multimap" version = "0.7.3" @@ -18056,54 +18079,31 @@ dependencies = [ name = "tauri-plugin-listener" version = "0.1.0" dependencies = [ - "aec", "audio", - "audio-device", - "audio-utils", - "bytes", - "chrono", - "codes-iso-639", - "device-monitor", "dirs 6.0.0", - "futures-util", - "host", - "hound", "insta", - "intercept", "language", - "mac 0.1.0", - "ordered-float", + "listener-core", "owhisper-client", "owhisper-interface", "quickcheck", "quickcheck_macros", "ractor", "rodio", - "sentry", "serde", "serde_json", "specta", "specta-typescript", - "strum 0.27.2", - "supervisor", "tauri", "tauri-plugin", - "tauri-plugin-fs-sync", "tauri-plugin-hooks", "tauri-plugin-local-stt", "tauri-plugin-settings", "tauri-plugin-tray", "tauri-specta", "thiserror 2.0.18", - "tokio", - "tokio-stream", - "tokio-util", "tracing", - "transcript", - "url", "uuid", - "vad-ext", - "vorbis_rs", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 139f933451..d9b8a17aaa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -75,6 +75,7 @@ hypr-importer-core = { path = "crates/importer-core", package = "importer-core" hypr-intercept = { path = "crates/intercept", package = "intercept" } hypr-jina = { path = "crates/jina", package = "jina" } hypr-language = { path = "crates/language", package = "language" } +hypr-listener-core = { path = "crates/listener-core", package = "listener-core" } hypr-llm-cactus = { path = "crates/llm-cactus", package = "llm-cactus" } hypr-llm-proxy = { path = "crates/llm-proxy", package = "llm-proxy" } hypr-llm-types = { path = "crates/llm-types", package = "llm-types" } diff --git a/crates/listener-core/Cargo.toml b/crates/listener-core/Cargo.toml new file mode 100644 index 0000000000..e35a8789ee --- /dev/null +++ b/crates/listener-core/Cargo.toml @@ -0,0 +1,43 @@ +[package] +name = "listener-core" +version = "0.1.0" +edition = "2024" + +[features] +default = [] +specta = ["dep:specta"] + +[dependencies] +hypr-aec = { workspace = true } +hypr-audio = { workspace = true } +hypr-audio-utils = { workspace = true } +hypr-device-monitor = { workspace = true } +hypr-host = { workspace = true } +hypr-language = { workspace = true } +hypr-supervisor = { workspace = true } +hypr-vad-ext = { workspace = true } + +owhisper-client = { workspace = true } +owhisper-interface = { workspace = true } + +hound = { workspace = true } +vorbis_rs = { workspace = true } + +ractor = { workspace = true, features = ["async-trait"] } + +bytes = { workspace = true } +futures-util = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +specta = { workspace = true, optional = true } +thiserror = { workspace = true } +tokio = { workspace = true, features = ["rt-multi-thread", "macros", "signal"] } +tokio-stream = { workspace = true } +tokio-util = { workspace = true } +tracing = { workspace = true } +uuid = { workspace = true } + +sentry = { workspace = true } + +[dev-dependencies] +tracing-subscriber = { workspace = true } diff --git a/crates/listener-core/examples/cli.rs b/crates/listener-core/examples/cli.rs new file mode 100644 index 0000000000..bf39f38232 --- /dev/null +++ b/crates/listener-core/examples/cli.rs @@ -0,0 +1,161 @@ +use std::sync::Arc; + +use listener_core::{ + ListenerRuntime, SessionDataEvent, SessionErrorEvent, SessionLifecycleEvent, + SessionProgressEvent, + actors::{RootActor, RootArgs, RootMsg, SessionParams}, +}; +use ractor::Actor; + +struct CliRuntime { + sessions_dir: std::path::PathBuf, +} + +impl ListenerRuntime for CliRuntime { + fn sessions_dir(&self) -> Result { + Ok(self.sessions_dir.clone()) + } + + fn emit_lifecycle(&self, event: SessionLifecycleEvent) { + match &event { + SessionLifecycleEvent::Active { session_id, error } => { + if let Some(err) = error { + eprintln!("[lifecycle] active (degraded) session={session_id} error={err:?}"); + } else { + eprintln!("[lifecycle] active session={session_id}"); + } + } + SessionLifecycleEvent::Inactive { session_id, error } => { + eprintln!("[lifecycle] inactive session={session_id} error={error:?}"); + } + SessionLifecycleEvent::Finalizing { session_id } => { + eprintln!("[lifecycle] finalizing session={session_id}"); + } + } + } + + fn emit_progress(&self, event: SessionProgressEvent) { + match &event { + SessionProgressEvent::AudioInitializing { .. } => { + eprintln!("[progress] audio initializing..."); + } + SessionProgressEvent::AudioReady { device, .. } => { + eprintln!("[progress] audio ready device={device:?}"); + } + SessionProgressEvent::Connecting { .. } => { + eprintln!("[progress] connecting to STT..."); + } + SessionProgressEvent::Connected { adapter, .. } => { + eprintln!("[progress] connected via {adapter}"); + } + } + } + + fn emit_error(&self, event: SessionErrorEvent) { + match &event { + SessionErrorEvent::AudioError { error, device, .. } => { + eprintln!("[error] audio: {error} device={device:?}"); + } + SessionErrorEvent::ConnectionError { error, .. } => { + eprintln!("[error] connection: {error}"); + } + } + } + + fn emit_data(&self, event: SessionDataEvent) { + match &event { + SessionDataEvent::AudioAmplitude { mic, speaker, .. } => { + let mic_bar = "█".repeat((*mic as usize) / 50); + let spk_bar = "█".repeat((*speaker as usize) / 50); + eprint!("\r[audio] mic {mic_bar:<20} | spk {spk_bar:<20}"); + } + SessionDataEvent::StreamResponse { response, .. } => { + println!("{}", serde_json::to_string(&response).unwrap_or_default()); + } + SessionDataEvent::MicMuted { value, .. } => { + eprintln!("[data] mic muted={value}"); + } + } + } +} + +#[tokio::main] +async fn main() { + tracing_subscriber::fmt() + .with_writer(std::io::stderr) + .init(); + + let base_url = std::env::var("LISTENER_BASE_URL").unwrap_or_else(|_| { + eprintln!("Usage: LISTENER_BASE_URL=... LISTENER_API_KEY=... cargo run --example cli"); + eprintln!(); + eprintln!(" LISTENER_BASE_URL STT provider URL (required)"); + eprintln!(" LISTENER_API_KEY API key (default: empty)"); + eprintln!(" LISTENER_MODEL Model name (default: empty)"); + eprintln!(" LISTENER_LANGUAGE Language code (default: en)"); + eprintln!(" LISTENER_RECORD Enable WAV recording (default: false)"); + std::process::exit(1); + }); + + let api_key = std::env::var("LISTENER_API_KEY").unwrap_or_default(); + let model = std::env::var("LISTENER_MODEL").unwrap_or_default(); + let language = std::env::var("LISTENER_LANGUAGE").unwrap_or_else(|_| "en".into()); + let record_enabled = std::env::var("LISTENER_RECORD") + .map(|v| v == "1" || v == "true") + .unwrap_or(false); + + let languages = vec![ + language + .parse::() + .expect("invalid language code"), + ]; + + let session_id = uuid::Uuid::new_v4().to_string(); + let sessions_dir = std::env::temp_dir().join("listener-cli").join("sessions"); + + let runtime = Arc::new(CliRuntime { sessions_dir }); + + let (root_ref, _handle) = Actor::spawn( + Some(RootActor::name()), + RootActor, + RootArgs { + runtime: runtime.clone(), + }, + ) + .await + .expect("failed to spawn root actor"); + + eprintln!("Starting session {session_id}..."); + eprintln!("Press Ctrl+C to stop."); + eprintln!(); + + let params = SessionParams { + session_id: session_id.clone(), + languages, + onboarding: false, + record_enabled, + model, + base_url, + api_key, + keywords: vec![], + }; + + let started = ractor::call!(root_ref, RootMsg::StartSession, params) + .expect("failed to send start message"); + + if !started { + eprintln!("Failed to start session"); + std::process::exit(1); + } + + tokio::signal::ctrl_c() + .await + .expect("failed to listen for ctrl+c"); + + eprintln!(); + eprintln!("Stopping session..."); + + let _ = ractor::call!(root_ref, RootMsg::StopSession); + + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + eprintln!("Done."); +} diff --git a/plugins/listener/src/actors/listener/adapters.rs b/crates/listener-core/src/actors/listener/adapters.rs similarity index 95% rename from plugins/listener/src/actors/listener/adapters.rs rename to crates/listener-core/src/actors/listener/adapters.rs index 0f45ae5b5a..041f876386 100644 --- a/plugins/listener/src/actors/listener/adapters.rs +++ b/crates/listener-core/src/actors/listener/adapters.rs @@ -2,7 +2,6 @@ use std::time::{Duration, UNIX_EPOCH}; use bytes::Bytes; use ractor::{ActorProcessingErr, ActorRef}; -use tauri_specta::Event; use owhisper_client::{ AdapterKind, ArgmaxAdapter, AssemblyAIAdapter, CactusAdapter, DashScopeAdapter, @@ -181,20 +180,18 @@ async fn spawn_rx_task_single_with_adapter( timeout_secs = LISTEN_CONNECT_TIMEOUT.as_secs_f32(), "listen_ws_connect_timeout(single)" ); - let _ = (SessionErrorEvent::ConnectionError { + args.runtime.emit_error(SessionErrorEvent::ConnectionError { session_id: args.session_id.clone(), error: "listen_ws_connect_timeout".to_string(), - }) - .emit(&args.app); + }); return Err(actor_error("listen_ws_connect_timeout")); } Ok(Err(e)) => { tracing::error!(session_id = %args.session_id, error = ?e, "listen_ws_connect_failed(single)"); - let _ = (SessionErrorEvent::ConnectionError { + args.runtime.emit_error(SessionErrorEvent::ConnectionError { session_id: args.session_id.clone(), error: format!("listen_ws_connect_failed: {:?}", e), - }) - .emit(&args.app); + }); return Err(actor_error(format!("listen_ws_connect_failed: {:?}", e))); } Ok(Ok(res)) => res, @@ -253,20 +250,18 @@ async fn spawn_rx_task_dual_with_adapter( timeout_secs = LISTEN_CONNECT_TIMEOUT.as_secs_f32(), "listen_ws_connect_timeout(dual)" ); - let _ = (SessionErrorEvent::ConnectionError { + args.runtime.emit_error(SessionErrorEvent::ConnectionError { session_id: args.session_id.clone(), error: "listen_ws_connect_timeout".to_string(), - }) - .emit(&args.app); + }); return Err(actor_error("listen_ws_connect_timeout")); } Ok(Err(e)) => { tracing::error!(session_id = %args.session_id, error = ?e, "listen_ws_connect_failed(dual)"); - let _ = (SessionErrorEvent::ConnectionError { + args.runtime.emit_error(SessionErrorEvent::ConnectionError { session_id: args.session_id.clone(), error: format!("listen_ws_connect_failed: {:?}", e), - }) - .emit(&args.app); + }); return Err(actor_error(format!("listen_ws_connect_failed: {:?}", e))); } Ok(Ok(res)) => res, diff --git a/plugins/listener/src/actors/listener/mod.rs b/crates/listener-core/src/actors/listener/mod.rs similarity index 83% rename from plugins/listener/src/actors/listener/mod.rs rename to crates/listener-core/src/actors/listener/mod.rs index d1a3356688..5deb445b44 100644 --- a/plugins/listener/src/actors/listener/mod.rs +++ b/crates/listener-core/src/actors/listener/mod.rs @@ -1,11 +1,11 @@ mod adapters; mod stream; +use std::sync::Arc; use std::time::{Duration, Instant, SystemTime}; use bytes::Bytes; use ractor::{Actor, ActorName, ActorProcessingErr, ActorRef, SupervisionEvent}; -use tauri_specta::Event; use tokio::time::error::Elapsed; use tracing::Instrument; @@ -13,7 +13,9 @@ use owhisper_interface::stream::StreamResponse; use owhisper_interface::{ControlMessage, MixedMessage}; use super::session::session_span; -use crate::{DegradedError, SessionDataEvent, SessionErrorEvent, SessionProgressEvent}; +use crate::{ + DegradedError, ListenerRuntime, SessionDataEvent, SessionErrorEvent, SessionProgressEvent, +}; use adapters::spawn_rx_task; @@ -32,7 +34,7 @@ pub enum ListenerMsg { #[derive(Clone)] pub struct ListenerArgs { - pub app: tauri::AppHandle, + pub runtime: Arc, pub languages: Vec, pub onboarding: bool, pub model: String, @@ -95,25 +97,18 @@ impl Actor for ListenerActor { let span = session_span(&session_id); async { - if let Err(error) = (SessionProgressEvent::Connecting { - session_id: session_id.clone(), - }) - .emit(&args.app) - { - tracing::error!(?error, "failed_to_emit_connecting"); - } + args.runtime + .emit_progress(SessionProgressEvent::Connecting { + session_id: session_id.clone(), + }); let (tx, rx_task, shutdown_tx, adapter_name) = spawn_rx_task(args.clone(), myself).await?; - if let Err(error) = (SessionProgressEvent::Connected { + args.runtime.emit_progress(SessionProgressEvent::Connected { session_id: session_id.clone(), adapter: adapter_name, - }) - .emit(&args.app) - { - tracing::error!(?error, "failed_to_emit_connected"); - } + }); let state = ListenerState { args, @@ -175,18 +170,20 @@ impl Actor for ListenerActor { %provider, "stream_provider_error" ); - let _ = (SessionErrorEvent::ConnectionError { - session_id: state.args.session_id.clone(), - error: format!( - "[{}] {} (code: {})", - provider, - error_message, - error_code - .map(|c| c.to_string()) - .unwrap_or_else(|| "none".to_string()) - ), - }) - .emit(&state.args.app); + state + .args + .runtime + .emit_error(SessionErrorEvent::ConnectionError { + session_id: state.args.session_id.clone(), + error: format!( + "[{}] {} (code: {})", + provider, + error_message, + error_code + .map(|c| c.to_string()) + .unwrap_or_else(|| "none".to_string()) + ), + }); let degraded = match *error_code { Some(401) | Some(403) => DegradedError::AuthenticationFailed { provider: provider.clone(), @@ -209,14 +206,13 @@ impl Actor for ListenerActor { crate::actors::ChannelMode::MicAndSpeaker => {} } - if let Err(error) = (SessionDataEvent::StreamResponse { - session_id: state.args.session_id.clone(), - response: Box::new(response), - }) - .emit(&state.args.app) - { - tracing::error!(?error, "stream_response_emit_failed"); - } + state + .args + .runtime + .emit_data(SessionDataEvent::StreamResponse { + session_id: state.args.session_id.clone(), + response: Box::new(response), + }); } ListenerMsg::StreamError(error) => { diff --git a/plugins/listener/src/actors/listener/stream.rs b/crates/listener-core/src/actors/listener/stream.rs similarity index 100% rename from plugins/listener/src/actors/listener/stream.rs rename to crates/listener-core/src/actors/listener/stream.rs diff --git a/plugins/listener/src/actors/mod.rs b/crates/listener-core/src/actors/mod.rs similarity index 91% rename from plugins/listener/src/actors/mod.rs rename to crates/listener-core/src/actors/mod.rs index 6ef8a133b9..28dfdedcb6 100644 --- a/plugins/listener/src/actors/mod.rs +++ b/crates/listener-core/src/actors/mod.rs @@ -1,8 +1,8 @@ -mod listener; -mod recorder; -mod root; -mod session; -mod source; +pub mod listener; +pub mod recorder; +pub mod root; +pub mod session; +pub mod source; pub use listener::*; pub use recorder::*; diff --git a/plugins/listener/src/actors/recorder.rs b/crates/listener-core/src/actors/recorder.rs similarity index 89% rename from plugins/listener/src/actors/recorder.rs rename to crates/listener-core/src/actors/recorder.rs index 259333051a..47e60b64f5 100644 --- a/plugins/listener/src/actors/recorder.rs +++ b/crates/listener-core/src/actors/recorder.rs @@ -1,6 +1,6 @@ use std::fs::File; use std::io::BufWriter; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Instant; @@ -9,7 +9,6 @@ use hypr_audio_utils::{ ogg_has_identical_channels, }; use ractor::{Actor, ActorName, ActorProcessingErr, ActorRef}; -use tauri_plugin_fs_sync::find_session_dir; const FLUSH_INTERVAL: std::time::Duration = std::time::Duration::from_millis(1000); @@ -185,13 +184,46 @@ impl Actor for RecorderActor { } } +// Duplicated from plugins/fs-sync/src/session.rs to avoid Tauri plugin dependency. +pub fn find_session_dir(sessions_base: &Path, session_id: &str) -> PathBuf { + if let Some(found) = find_session_dir_recursive(sessions_base, session_id) { + return found; + } + sessions_base.join(session_id) +} + +fn find_session_dir_recursive(dir: &Path, session_id: &str) -> Option { + let entries = std::fs::read_dir(dir).ok()?; + + for entry in entries.flatten() { + let path = entry.path(); + if !path.is_dir() { + continue; + } + + let name = path.file_name()?.to_str()?; + + if name == session_id { + return Some(path); + } + + if uuid::Uuid::try_parse(name).is_err() + && let Some(found) = find_session_dir_recursive(&path, session_id) + { + return Some(found); + } + } + + None +} + fn into_actor_err(err: hypr_audio_utils::Error) -> ActorProcessingErr { Box::new(err) } fn is_debug_mode() -> bool { cfg!(debug_assertions) - || std::env::var("HYPRNOTE_DEBUG") + || std::env::var("LISTENER_DEBUG") .map(|v| !v.is_empty() && v != "0" && v != "false") .unwrap_or(false) } diff --git a/plugins/listener/src/actors/root.rs b/crates/listener-core/src/actors/root.rs similarity index 78% rename from plugins/listener/src/actors/root.rs rename to crates/listener-core/src/actors/root.rs index d15f15176c..5fc41cd677 100644 --- a/plugins/listener/src/actors/root.rs +++ b/crates/listener-core/src/actors/root.rs @@ -1,30 +1,29 @@ +use std::sync::Arc; use std::time::{Instant, SystemTime}; use ractor::{Actor, ActorCell, ActorProcessingErr, ActorRef, RpcReplyPort, SupervisionEvent}; -use tauri_plugin_settings::SettingsPluginExt; -use tauri_specta::Event; use tracing::Instrument; -use crate::SessionLifecycleEvent; use crate::actors::session::lifecycle::{ clear_sentry_session_context, configure_sentry_session_context, emit_session_ended, }; use crate::actors::{ SessionContext, SessionMsg, SessionParams, session_span, spawn_session_supervisor, }; +use crate::{ListenerRuntime, SessionLifecycleEvent, State}; pub enum RootMsg { StartSession(SessionParams, RpcReplyPort), StopSession(RpcReplyPort<()>), - GetState(RpcReplyPort), + GetState(RpcReplyPort), } pub struct RootArgs { - pub app: tauri::AppHandle, + pub runtime: Arc, } pub struct RootState { - app: tauri::AppHandle, + runtime: Arc, session_id: Option, supervisor: Option, finalizing: bool, @@ -50,7 +49,7 @@ impl Actor for RootActor { args: Self::Arguments, ) -> Result { Ok(RootState { - app: args.app, + runtime: args.runtime, session_id: None, supervisor: None, finalizing: false, @@ -74,11 +73,11 @@ impl Actor for RootActor { } RootMsg::GetState(reply) => { let fsm_state = if state.finalizing { - crate::State::Finalizing + State::Finalizing } else if state.supervisor.is_some() { - crate::State::Active + State::Active } else { - crate::State::Inactive + State::Inactive }; let _ = reply.send(fsm_state); } @@ -105,7 +104,7 @@ impl Actor for RootActor { state.supervisor = None; state.finalizing = false; - emit_session_ended(&state.app, &session_id, reason); + emit_session_ended(&*state.runtime, &session_id, reason); } } SupervisionEvent::ActorFailed(cell, error) => { @@ -118,7 +117,7 @@ impl Actor for RootActor { tracing::warn!(?error, "session_supervisor_failed"); state.supervisor = None; state.finalizing = false; - emit_session_ended(&state.app, &session_id, Some(format!("{:?}", error))); + emit_session_ended(&*state.runtime, &session_id, Some(format!("{:?}", error))); } } } @@ -142,22 +141,17 @@ async fn start_session_impl( configure_sentry_session_context(¶ms); - let app_dir = match state.app.settings().cached_vault_base() { - Ok(base) => base.join("sessions").into_std_path_buf(), + let app_dir = match state.runtime.sessions_dir() { + Ok(dir) => dir, Err(e) => { - tracing::error!(error = ?e, "failed_to_resolve_sessions_base_dir"); + tracing::error!(error = %e, "failed_to_resolve_sessions_dir"); clear_sentry_session_context(); return false; } }; - { - use tauri_plugin_tray::TrayPluginExt; - let _ = state.app.tray().set_start_disabled(true); - } - let ctx = SessionContext { - app: state.app.clone(), + runtime: state.runtime.clone(), params: params.clone(), app_dir, started_at_instant: Instant::now(), @@ -171,14 +165,10 @@ async fn start_session_impl( state.session_id = Some(params.session_id.clone()); state.supervisor = Some(supervisor_cell); - if let Err(error) = (SessionLifecycleEvent::Active { + state.runtime.emit_lifecycle(SessionLifecycleEvent::Active { session_id: params.session_id, error: None, - }) - .emit(&state.app) - { - tracing::error!(?error, "failed_to_emit_active"); - } + }); tracing::info!("session_started"); true @@ -186,9 +176,6 @@ async fn start_session_impl( Err(e) => { tracing::error!(error = ?e, "failed_to_start_session"); clear_sentry_session_context(); - - use tauri_plugin_tray::TrayPluginExt; - let _ = state.app.tray().set_start_disabled(false); false } } @@ -206,13 +193,11 @@ async fn stop_session_impl(state: &mut RootState) { let _guard = span.enter(); tracing::info!("session_finalizing"); - if let Err(error) = (SessionLifecycleEvent::Finalizing { - session_id: session_id.clone(), - }) - .emit(&state.app) - { - tracing::error!(?error, "failed_to_emit_finalizing"); - } + state + .runtime + .emit_lifecycle(SessionLifecycleEvent::Finalizing { + session_id: session_id.clone(), + }); } let session_ref: ActorRef = supervisor.clone().into(); diff --git a/plugins/listener/src/actors/session/lifecycle.rs b/crates/listener-core/src/actors/session/lifecycle.rs similarity index 85% rename from plugins/listener/src/actors/session/lifecycle.rs rename to crates/listener-core/src/actors/session/lifecycle.rs index bc29b226b2..6d989dad36 100644 --- a/plugins/listener/src/actors/session/lifecycle.rs +++ b/crates/listener-core/src/actors/session/lifecycle.rs @@ -1,10 +1,8 @@ use std::collections::BTreeMap; -use tauri_specta::Event; - use super::SessionParams; use super::session_span; -use crate::SessionLifecycleEvent; +use crate::{ListenerRuntime, SessionLifecycleEvent}; pub(crate) fn configure_sentry_session_context(params: &SessionParams) { sentry::configure_scope(|scope| { @@ -40,26 +38,17 @@ pub(crate) fn clear_sentry_session_context() { } pub(crate) fn emit_session_ended( - app: &tauri::AppHandle, + runtime: &dyn ListenerRuntime, session_id: &str, failure_reason: Option, ) { let span = session_span(session_id); let _guard = span.enter(); - { - use tauri_plugin_tray::TrayPluginExt; - let _ = app.tray().set_start_disabled(false); - } - - if let Err(error) = (SessionLifecycleEvent::Inactive { + runtime.emit_lifecycle(SessionLifecycleEvent::Inactive { session_id: session_id.to_string(), error: failure_reason.clone(), - }) - .emit(app) - { - tracing::error!(?error, "failed_to_emit_inactive"); - } + }); if let Some(reason) = failure_reason { tracing::info!(failure_reason = %reason, "session_stopped"); diff --git a/plugins/listener/src/actors/session/mod.rs b/crates/listener-core/src/actors/session/mod.rs similarity index 100% rename from plugins/listener/src/actors/session/mod.rs rename to crates/listener-core/src/actors/session/mod.rs diff --git a/plugins/listener/src/actors/session/supervisor.rs b/crates/listener-core/src/actors/session/supervisor.rs similarity index 92% rename from plugins/listener/src/actors/session/supervisor.rs rename to crates/listener-core/src/actors/session/supervisor.rs index 93138f6c39..75fc2439b1 100644 --- a/plugins/listener/src/actors/session/supervisor.rs +++ b/crates/listener-core/src/actors/session/supervisor.rs @@ -1,16 +1,14 @@ use hypr_supervisor::{RestartBudget, RestartTracker, RetryStrategy, spawn_with_retry}; use ractor::concurrency::Duration; use ractor::{Actor, ActorCell, ActorProcessingErr, ActorRef, SupervisionEvent}; -use tauri_specta::Event; use tracing::Instrument; -use crate::DegradedError; -use crate::SessionLifecycleEvent; use crate::actors::session::lifecycle; use crate::actors::session::types::{SessionContext, session_span, session_supervisor_name}; use crate::actors::{ ChannelMode, ListenerActor, ListenerArgs, RecArgs, RecorderActor, SourceActor, SourceArgs, }; +use crate::{DegradedError, SessionLifecycleEvent}; #[derive(Debug, Clone, Copy, PartialEq, Eq)] enum ChildKind { @@ -68,7 +66,7 @@ impl Actor for SessionActor { SourceArgs { mic_device: None, onboarding: ctx.params.onboarding, - app: ctx.app.clone(), + runtime: ctx.runtime.clone(), session_id: ctx.params.session_id.clone(), }, myself.get_cell(), @@ -120,7 +118,7 @@ impl Actor for SessionActor { Some(ListenerActor::name()), ListenerActor, ListenerArgs { - app: state.ctx.app.clone(), + runtime: state.ctx.runtime.clone(), languages: state.ctx.params.languages.clone(), onboarding: state.ctx.params.onboarding, model: state.ctx.params.model.clone(), @@ -145,11 +143,13 @@ impl Actor for SessionActor { let degraded = DegradedError::UpstreamUnavailable { message: classify_connection_failure(base_url), }; - let _ = (SessionLifecycleEvent::Active { - session_id: state.ctx.params.session_id.clone(), - error: Some(degraded), - }) - .emit(&state.ctx.app); + state + .ctx + .runtime + .emit_lifecycle(SessionLifecycleEvent::Active { + session_id: state.ctx.params.session_id.clone(), + error: Some(degraded), + }); } } Ok(()) @@ -212,11 +212,13 @@ impl Actor for SessionActor { let degraded = parse_degraded_reason(reason.as_ref()); state.listener_cell = None; - let _ = (SessionLifecycleEvent::Active { - session_id: state.ctx.params.session_id.clone(), - error: Some(degraded), - }) - .emit(&state.ctx.app); + state + .ctx + .runtime + .emit_lifecycle(SessionLifecycleEvent::Active { + session_id: state.ctx.params.session_id.clone(), + error: Some(degraded), + }); } Some(ChildKind::Source) => { tracing::info!(?reason, "source_terminated_attempting_restart"); @@ -248,11 +250,13 @@ impl Actor for SessionActor { }; state.listener_cell = None; - let _ = (SessionLifecycleEvent::Active { - session_id: state.ctx.params.session_id.clone(), - error: Some(degraded), - }) - .emit(&state.ctx.app); + state + .ctx + .runtime + .emit_lifecycle(SessionLifecycleEvent::Active { + session_id: state.ctx.params.session_id.clone(), + error: Some(degraded), + }); } Some(ChildKind::Source) => { tracing::warn!(?error, "source_failed_attempting_restart"); @@ -311,12 +315,12 @@ async fn try_restart_source(supervisor_cell: ActorCell, state: &mut SessionState let sup = supervisor_cell; let onboarding = state.ctx.params.onboarding; - let app = state.ctx.app.clone(); + let runtime = state.ctx.runtime.clone(); let session_id = state.ctx.params.session_id.clone(); let cell = spawn_with_retry(&RETRY_STRATEGY, || { let sup = sup.clone(); - let app = app.clone(); + let runtime = runtime.clone(); let session_id = session_id.clone(); async move { let (r, _) = Actor::spawn_linked( @@ -325,7 +329,7 @@ async fn try_restart_source(supervisor_cell: ActorCell, state: &mut SessionState SourceArgs { mic_device: None, onboarding, - app, + runtime, session_id, }, sup, @@ -406,10 +410,8 @@ async fn meltdown(myself: ActorRef, state: &mut SessionState) { fn classify_connection_failure(base_url: &str) -> String { if base_url.contains("localhost") || base_url.contains("127.0.0.1") { "Local transcription server is not running".to_string() - } else if !base_url.contains("hyprnote.com") { - format!("Cannot reach transcription server at {}", base_url) } else { - "Transcription service is temporarily unavailable".to_string() + format!("Cannot reach transcription server at {}", base_url) } } diff --git a/plugins/listener/src/actors/session/types.rs b/crates/listener-core/src/actors/session/types.rs similarity index 79% rename from plugins/listener/src/actors/session/types.rs rename to crates/listener-core/src/actors/session/types.rs index 9bf5df0b12..6602a0aee3 100644 --- a/plugins/listener/src/actors/session/types.rs +++ b/crates/listener-core/src/actors/session/types.rs @@ -1,13 +1,17 @@ use std::path::PathBuf; +use std::sync::Arc; use std::time::{Instant, SystemTime}; +use crate::ListenerRuntime; + pub const SESSION_SUPERVISOR_PREFIX: &str = "session_supervisor_"; pub fn session_span(session_id: &str) -> tracing::Span { tracing::info_span!("session", session_id = %session_id) } -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, specta::Type)] +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[cfg_attr(feature = "specta", derive(specta::Type))] pub struct SessionParams { pub session_id: String, pub languages: Vec, @@ -21,7 +25,7 @@ pub struct SessionParams { #[derive(Clone)] pub struct SessionContext { - pub app: tauri::AppHandle, + pub runtime: Arc, pub params: SessionParams, pub app_dir: PathBuf, pub started_at_instant: Instant, diff --git a/plugins/listener/src/actors/source/mod.rs b/crates/listener-core/src/actors/source/mod.rs similarity index 89% rename from plugins/listener/src/actors/source/mod.rs rename to crates/listener-core/src/actors/source/mod.rs index fc9fa50041..14664c8b51 100644 --- a/plugins/listener/src/actors/source/mod.rs +++ b/crates/listener-core/src/actors/source/mod.rs @@ -12,12 +12,11 @@ use tokio_util::sync::CancellationToken; use tracing::Instrument; use crate::{ - SessionErrorEvent, SessionProgressEvent, + ListenerRuntime, SessionErrorEvent, SessionProgressEvent, actors::session::session_span, actors::{AudioChunk, ChannelMode}, }; use hypr_audio::AudioInput; -use tauri_specta::Event; use pipeline::Pipeline; use stream::start_source_loop; @@ -36,12 +35,12 @@ pub enum SourceMsg { pub struct SourceArgs { pub mic_device: Option, pub onboarding: bool, - pub app: tauri::AppHandle, + pub runtime: Arc, pub session_id: String, } pub struct SourceState { - pub(super) app: tauri::AppHandle, + pub(super) runtime: Arc, pub(super) session_id: String, pub(super) mic_device: Option, pub(super) onboarding: bool, @@ -108,13 +107,10 @@ impl Actor for SourceActor { let span = session_span(&session_id); async { - if let Err(error) = (SessionProgressEvent::AudioInitializing { - session_id: session_id.clone(), - }) - .emit(&args.app) - { - tracing::error!(?error, "failed_to_emit_audio_initializing"); - } + args.runtime + .emit_progress(SessionProgressEvent::AudioInitializing { + session_id: session_id.clone(), + }); let device_watcher = DeviceChangeWatcher::spawn(myself.clone()); @@ -124,10 +120,10 @@ impl Actor for SourceActor { .or_else(|| Some(AudioInput::get_default_device_name())); tracing::info!(mic_device = ?mic_device); - let pipeline = Pipeline::new(args.app.clone(), args.session_id.clone()); + let pipeline = Pipeline::new(args.runtime.clone(), args.session_id.clone()); let mut st = SourceState { - app: args.app, + runtime: args.runtime, session_id: args.session_id, mic_device, onboarding: args.onboarding, @@ -180,13 +176,12 @@ impl Actor for SourceActor { } SourceMsg::StreamFailed(reason) => { tracing::error!(%reason, "source_stream_failed_stopping"); - let _ = (SessionErrorEvent::AudioError { + st.runtime.emit_error(SessionErrorEvent::AudioError { session_id: st.session_id.clone(), error: reason.clone(), device: st.mic_device.clone(), is_fatal: true, - }) - .emit(&st.app); + }); myself.stop(Some(reason)); } } diff --git a/plugins/listener/src/actors/source/pipeline.rs b/crates/listener-core/src/actors/source/pipeline.rs similarity index 90% rename from plugins/listener/src/actors/source/pipeline.rs rename to crates/listener-core/src/actors/source/pipeline.rs index 2f696aa25e..07f2676162 100644 --- a/plugins/listener/src/actors/source/pipeline.rs +++ b/crates/listener-core/src/actors/source/pipeline.rs @@ -5,10 +5,9 @@ use std::{ }; use ractor::{ActorRef, registry}; -use tauri_specta::Event; use crate::{ - SessionDataEvent, + ListenerRuntime, SessionDataEvent, actors::{AudioChunk, ChannelMode, ListenerActor, ListenerMsg, RecMsg, RecorderActor}, }; use hypr_aec::AEC; @@ -34,13 +33,13 @@ impl Pipeline { const BACKLOG_QUOTA_INCREMENT: f32 = 0.25; const MAX_BACKLOG_QUOTA: f32 = 2.0; - pub(super) fn new(app: tauri::AppHandle, session_id: String) -> Self { + pub(super) fn new(runtime: Arc, session_id: String) -> Self { Self { aec: AEC::new() .map_err(|e| tracing::warn!(error = ?e, "aec_init_failed")) .ok(), joiner: Joiner::new(), - amplitude: AmplitudeEmitter::new(app, session_id), + amplitude: AmplitudeEmitter::new(runtime, session_id), audio_buffer: AudioBuffer::new(MAX_BUFFER_CHUNKS), backlog_quota: 0.0, vad_mask: VadMask::default(), @@ -221,7 +220,7 @@ impl AudioBuffer { } struct AmplitudeEmitter { - app: tauri::AppHandle, + runtime: Arc, session_id: String, mic_smoothed: f32, spk_smoothed: f32, @@ -229,19 +228,13 @@ struct AmplitudeEmitter { } impl AmplitudeEmitter { - /// Smoothing factor for exponential moving average. - /// Lower values = more smoothing, slower response. - /// Higher values = less smoothing, faster response. const SMOOTHING_ALPHA: f32 = 0.7; - - /// Minimum dB level (silence floor) const MIN_DB: f32 = -60.0; - /// Maximum dB level (full scale) const MAX_DB: f32 = 0.0; - fn new(app: tauri::AppHandle, session_id: String) -> Self { + fn new(runtime: Arc, session_id: String) -> Self { Self { - app, + runtime, session_id, mic_smoothed: 0.0, spk_smoothed: 0.0, @@ -274,31 +267,23 @@ impl AmplitudeEmitter { return; } - // Convert smoothed [0, 1] to [0, 1000] for transmission as u16 let mic_level = (self.mic_smoothed * 1000.0) as u16; let spk_level = (self.spk_smoothed * 1000.0) as u16; - if let Err(error) = (SessionDataEvent::AudioAmplitude { + self.runtime.emit_data(SessionDataEvent::AudioAmplitude { session_id: self.session_id.clone(), mic: mic_level, speaker: spk_level, - }) - .emit(&self.app) - { - tracing::error!(error = ?error, "session_data_event_emit_failed"); - } + }); self.last_emit = Instant::now(); } - /// Computes amplitude from audio chunk using RMS and dB conversion. - /// Returns a normalized value in [0, 1] range. fn amplitude_from_chunk(chunk: &[f32]) -> f32 { if chunk.is_empty() { return 0.0; } - // Compute RMS (Root Mean Square) - represents perceived loudness better than peak let sum_squares: f32 = chunk.iter().filter(|x| x.is_finite()).map(|&x| x * x).sum(); let count = chunk.iter().filter(|x| x.is_finite()).count(); if count == 0 { @@ -306,14 +291,12 @@ impl AmplitudeEmitter { } let rms = (sum_squares / count as f32).sqrt(); - // Convert to dB (logarithmic scale matches human hearing) let db = if rms > 0.0 { 20.0 * rms.log10() } else { Self::MIN_DB }; - // Normalize to [0, 1] range ((db - Self::MIN_DB) / (Self::MAX_DB - Self::MIN_DB)).clamp(0.0, 1.0) } } diff --git a/plugins/listener/src/actors/source/stream.rs b/crates/listener-core/src/actors/source/stream.rs similarity index 95% rename from plugins/listener/src/actors/source/stream.rs rename to crates/listener-core/src/actors/source/stream.rs index 983478736e..4176d8c183 100644 --- a/plugins/listener/src/actors/source/stream.rs +++ b/crates/listener-core/src/actors/source/stream.rs @@ -13,7 +13,6 @@ use crate::{ }; use hypr_audio::AudioInput; use hypr_audio_utils::{ResampleExtDynamicNew, chunk_size_for_stt}; -use tauri_specta::Event; use super::{SourceMsg, SourceState}; @@ -32,14 +31,11 @@ pub(super) async fn start_source_loop( let result = start_streams(myself, st).await; - if result.is_ok() - && let Err(error) = (SessionProgressEvent::AudioReady { + if result.is_ok() { + st.runtime.emit_progress(SessionProgressEvent::AudioReady { session_id: st.session_id.clone(), device: st.mic_device.clone(), - }) - .emit(&st.app) - { - tracing::error!(?error, "failed_to_emit_audio_ready"); + }); } result @@ -109,7 +105,6 @@ async fn run_stream_loop(ctx: StreamContext, mode: ChannelMode) { None }; - // I believe this is not needed anymore since we do dynamic resampling with latest sample rate of device, but keep it just in case. if mode == ChannelMode::MicAndSpeaker { tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; } diff --git a/crates/listener-core/src/events.rs b/crates/listener-core/src/events.rs new file mode 100644 index 0000000000..f37f59d9b7 --- /dev/null +++ b/crates/listener-core/src/events.rs @@ -0,0 +1,69 @@ +use owhisper_interface::stream::StreamResponse; + +use crate::DegradedError; + +#[derive(serde::Serialize, serde::Deserialize, Clone)] +#[serde(tag = "type")] +pub enum SessionLifecycleEvent { + #[serde(rename = "inactive")] + Inactive { + session_id: String, + error: Option, + }, + #[serde(rename = "active")] + Active { + session_id: String, + #[serde(skip_serializing_if = "Option::is_none")] + error: Option, + }, + #[serde(rename = "finalizing")] + Finalizing { session_id: String }, +} + +#[derive(serde::Serialize, serde::Deserialize, Clone)] +#[serde(tag = "type")] +pub enum SessionProgressEvent { + #[serde(rename = "audio_initializing")] + AudioInitializing { session_id: String }, + #[serde(rename = "audio_ready")] + AudioReady { + session_id: String, + device: Option, + }, + #[serde(rename = "connecting")] + Connecting { session_id: String }, + #[serde(rename = "connected")] + Connected { session_id: String, adapter: String }, +} + +#[derive(serde::Serialize, serde::Deserialize, Clone)] +#[serde(tag = "type")] +pub enum SessionErrorEvent { + #[serde(rename = "audio_error")] + AudioError { + session_id: String, + error: String, + device: Option, + is_fatal: bool, + }, + #[serde(rename = "connection_error")] + ConnectionError { session_id: String, error: String }, +} + +#[derive(serde::Serialize, serde::Deserialize, Clone)] +#[serde(tag = "type")] +pub enum SessionDataEvent { + #[serde(rename = "audio_amplitude")] + AudioAmplitude { + session_id: String, + mic: u16, + speaker: u16, + }, + #[serde(rename = "mic_muted")] + MicMuted { session_id: String, value: bool }, + #[serde(rename = "stream_response")] + StreamResponse { + session_id: String, + response: Box, + }, +} diff --git a/crates/listener-core/src/lib.rs b/crates/listener-core/src/lib.rs new file mode 100644 index 0000000000..bc0976ffe6 --- /dev/null +++ b/crates/listener-core/src/lib.rs @@ -0,0 +1,29 @@ +pub mod actors; +mod events; +mod runtime; + +pub use events::*; +pub use runtime::*; + +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[cfg_attr(feature = "specta", derive(specta::Type))] +#[serde(rename_all = "camelCase")] +pub enum State { + Active, + Inactive, + Finalizing, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[cfg_attr(feature = "specta", derive(specta::Type))] +#[serde(tag = "type")] +pub enum DegradedError { + #[serde(rename = "authentication_failed")] + AuthenticationFailed { provider: String }, + #[serde(rename = "upstream_unavailable")] + UpstreamUnavailable { message: String }, + #[serde(rename = "connection_timeout")] + ConnectionTimeout, + #[serde(rename = "stream_error")] + StreamError { message: String }, +} diff --git a/crates/listener-core/src/runtime.rs b/crates/listener-core/src/runtime.rs new file mode 100644 index 0000000000..ce1d733159 --- /dev/null +++ b/crates/listener-core/src/runtime.rs @@ -0,0 +1,9 @@ +use crate::events::*; + +pub trait ListenerRuntime: Send + Sync + 'static { + fn sessions_dir(&self) -> Result; + fn emit_lifecycle(&self, event: SessionLifecycleEvent); + fn emit_progress(&self, event: SessionProgressEvent); + fn emit_error(&self, event: SessionErrorEvent); + fn emit_data(&self, event: SessionDataEvent); +} diff --git a/plugins/listener/Cargo.toml b/plugins/listener/Cargo.toml index 57ab253489..55f007736e 100644 --- a/plugins/listener/Cargo.toml +++ b/plugins/listener/Cargo.toml @@ -20,19 +20,10 @@ specta-typescript = { workspace = true } uuid = { workspace = true } [dependencies] -hypr-aec = { workspace = true } +hypr-listener-core = { workspace = true, features = ["specta"] } + hypr-audio = { workspace = true } -hypr-audio-device = { workspace = true } -hypr-audio-utils = { workspace = true } -hypr-device-monitor = { workspace = true } -hypr-host = { workspace = true } -hypr-intercept = { workspace = true } hypr-language = { workspace = true } -hypr-mac = { workspace = true } -hypr-vad-ext = { workspace = true } -tauri-plugin-fs-sync = { workspace = true } - -hypr-transcript = { workspace = true } owhisper-client = { workspace = true } owhisper-interface = { workspace = true } @@ -47,28 +38,10 @@ tauri = { workspace = true, features = ["specta", "test"] } specta = { workspace = true } tauri-specta = { workspace = true, features = ["derive", "typescript"] } -bytes = { workspace = true } -chrono = { workspace = true } -codes-iso-639 = { workspace = true } +ractor = { workspace = true, features = ["async-trait"] } + dirs = { workspace = true } -ordered-float = { version = "5", default-features = false } serde = { workspace = true } serde_json = { workspace = true } -strum = { workspace = true, features = ["derive"] } thiserror = { workspace = true } -url = { workspace = true } -uuid = { workspace = true, features = ["v4"] } - -hound = { workspace = true } -vorbis_rs = { workspace = true } - -hypr-supervisor = { workspace = true } -ractor = { workspace = true, features = ["async-trait"] } - -futures-util = { workspace = true } -tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } -tokio-stream = { workspace = true } -tokio-util = { workspace = true } tracing = { workspace = true } - -sentry = { workspace = true } diff --git a/plugins/listener/src/commands.rs b/plugins/listener/src/commands.rs index f85b403d5a..70f9d6bd0a 100644 --- a/plugins/listener/src/commands.rs +++ b/plugins/listener/src/commands.rs @@ -1,7 +1,8 @@ use owhisper_client::AdapterKind; use std::str::FromStr; -use crate::{ListenerPluginExt, actors::SessionParams}; +use crate::ListenerPluginExt; +use hypr_listener_core::actors::SessionParams; #[tauri::command] #[specta::specta] @@ -62,7 +63,7 @@ pub async fn stop_session(app: tauri::AppHandle) -> Result #[specta::specta] pub async fn get_state( app: tauri::AppHandle, -) -> Result { +) -> Result { Ok(app.listener().get_state().await) } diff --git a/plugins/listener/src/error.rs b/plugins/listener/src/error.rs index e21fc1467e..23c5212b8a 100644 --- a/plugins/listener/src/error.rs +++ b/plugins/listener/src/error.rs @@ -1,4 +1,6 @@ -use serde::{Deserialize, Serialize, ser::Serializer}; +use serde::{Serialize, ser::Serializer}; + +pub use hypr_listener_core::DegradedError; pub type Result = std::result::Result; @@ -30,16 +32,3 @@ impl Serialize for Error { serializer.serialize_str(self.to_string().as_ref()) } } - -#[derive(Debug, Clone, Serialize, Deserialize, specta::Type)] -#[serde(tag = "type")] -pub enum DegradedError { - #[serde(rename = "authentication_failed")] - AuthenticationFailed { provider: String }, - #[serde(rename = "upstream_unavailable")] - UpstreamUnavailable { message: String }, - #[serde(rename = "connection_timeout")] - ConnectionTimeout, - #[serde(rename = "stream_error")] - StreamError { message: String }, -} diff --git a/plugins/listener/src/events.rs b/plugins/listener/src/events.rs index a8449f75eb..d4c28251b7 100644 --- a/plugins/listener/src/events.rs +++ b/plugins/listener/src/events.rs @@ -1,5 +1,7 @@ use owhisper_interface::stream::StreamResponse; +use hypr_listener_core as core; + #[macro_export] macro_rules! common_event_derives { ($item:item) => { @@ -42,7 +44,10 @@ common_event_derives! { #[serde(rename = "connecting")] Connecting { session_id: String }, #[serde(rename = "connected")] - Connected { session_id: String, adapter: String }, + Connected { + session_id: String, + adapter: String, + }, } } @@ -82,3 +87,89 @@ common_event_derives! { }, } } + +impl From for SessionLifecycleEvent { + fn from(event: core::SessionLifecycleEvent) -> Self { + match event { + core::SessionLifecycleEvent::Inactive { session_id, error } => { + SessionLifecycleEvent::Inactive { session_id, error } + } + core::SessionLifecycleEvent::Active { session_id, error } => { + SessionLifecycleEvent::Active { session_id, error } + } + core::SessionLifecycleEvent::Finalizing { session_id } => { + SessionLifecycleEvent::Finalizing { session_id } + } + } + } +} + +impl From for SessionProgressEvent { + fn from(event: core::SessionProgressEvent) -> Self { + match event { + core::SessionProgressEvent::AudioInitializing { session_id } => { + SessionProgressEvent::AudioInitializing { session_id } + } + core::SessionProgressEvent::AudioReady { session_id, device } => { + SessionProgressEvent::AudioReady { session_id, device } + } + core::SessionProgressEvent::Connecting { session_id } => { + SessionProgressEvent::Connecting { session_id } + } + core::SessionProgressEvent::Connected { + session_id, + adapter, + } => SessionProgressEvent::Connected { + session_id, + adapter, + }, + } + } +} + +impl From for SessionErrorEvent { + fn from(event: core::SessionErrorEvent) -> Self { + match event { + core::SessionErrorEvent::AudioError { + session_id, + error, + device, + is_fatal, + } => SessionErrorEvent::AudioError { + session_id, + error, + device, + is_fatal, + }, + core::SessionErrorEvent::ConnectionError { session_id, error } => { + SessionErrorEvent::ConnectionError { session_id, error } + } + } + } +} + +impl From for SessionDataEvent { + fn from(event: core::SessionDataEvent) -> Self { + match event { + core::SessionDataEvent::AudioAmplitude { + session_id, + mic, + speaker, + } => SessionDataEvent::AudioAmplitude { + session_id, + mic, + speaker, + }, + core::SessionDataEvent::MicMuted { session_id, value } => { + SessionDataEvent::MicMuted { session_id, value } + } + core::SessionDataEvent::StreamResponse { + session_id, + response, + } => SessionDataEvent::StreamResponse { + session_id, + response, + }, + } + } +} diff --git a/plugins/listener/src/ext.rs b/plugins/listener/src/ext.rs index 4e614cefe8..6bbbc1b5e7 100644 --- a/plugins/listener/src/ext.rs +++ b/plugins/listener/src/ext.rs @@ -1,6 +1,6 @@ use ractor::{ActorRef, call_t, registry}; -use crate::actors::{RootActor, RootMsg, SessionParams, SourceActor, SourceMsg}; +use hypr_listener_core::actors::{RootActor, RootMsg, SessionParams, SourceActor, SourceMsg}; pub struct Listener<'a, R: tauri::Runtime, M: tauri::Manager> { #[allow(unused)] @@ -28,15 +28,15 @@ impl<'a, R: tauri::Runtime, M: tauri::Manager> Listener<'a, R, M> { } #[tracing::instrument(skip_all)] - pub async fn get_state(&self) -> crate::State { + pub async fn get_state(&self) -> hypr_listener_core::State { if let Some(cell) = registry::where_is(RootActor::name()) { let actor: ActorRef = cell.into(); match call_t!(actor, RootMsg::GetState, 100) { Ok(fsm_state) => fsm_state, - Err(_) => crate::State::Inactive, + Err(_) => hypr_listener_core::State::Inactive, } } else { - crate::State::Inactive + hypr_listener_core::State::Inactive } } diff --git a/plugins/listener/src/lib.rs b/plugins/listener/src/lib.rs index d056e61c0d..c7fcfb506f 100644 --- a/plugins/listener/src/lib.rs +++ b/plugins/listener/src/lib.rs @@ -1,29 +1,24 @@ +use std::sync::Arc; + use ractor::Actor; use tauri::Manager; -mod actors; mod commands; mod error; mod events; mod ext; +mod runtime; pub use error::{DegradedError, Error, Result}; pub use events::*; pub use ext::*; +pub use hypr_listener_core::State; -use actors::{RootActor, RootArgs}; +use hypr_listener_core::actors::{RootActor, RootArgs}; +use runtime::TauriRuntime; const PLUGIN_NAME: &str = "listener"; -#[derive(Debug, Clone, PartialEq, Eq, specta::Type, serde::Serialize, serde::Deserialize)] -#[serde(rename_all = "camelCase")] -pub enum State { - Active, - Inactive, - // Transitioning from Active to Inactive. For ex, waiting for `from_finalize=true` from upstream provider. - Finalizing, -} - fn make_specta_builder() -> tauri_specta::Builder { tauri_specta::Builder::::new() .plugin_name(PLUGIN_NAME) @@ -58,15 +53,15 @@ pub fn init() -> tauri::plugin::TauriPlugin { let app_handle = app.app_handle().clone(); + let runtime = Arc::new(TauriRuntime { + app: app_handle.clone(), + }); + tauri::async_runtime::spawn(async move { - Actor::spawn( - Some(RootActor::name()), - RootActor, - RootArgs { app: app_handle }, - ) - .await - .map(|_| tracing::info!("root_actor_spawned")) - .map_err(|e| tracing::error!(?e, "failed_to_spawn_root_actor")) + Actor::spawn(Some(RootActor::name()), RootActor, RootArgs { runtime }) + .await + .map(|_| tracing::info!("root_actor_spawned")) + .map_err(|e| tracing::error!(?e, "failed_to_spawn_root_actor")) }); Ok(()) diff --git a/plugins/listener/src/runtime.rs b/plugins/listener/src/runtime.rs new file mode 100644 index 0000000000..4d8ba5f50c --- /dev/null +++ b/plugins/listener/src/runtime.rs @@ -0,0 +1,56 @@ +use hypr_listener_core::ListenerRuntime; +use tauri_plugin_settings::SettingsPluginExt; +use tauri_specta::Event; + +pub struct TauriRuntime { + pub app: tauri::AppHandle, +} + +impl ListenerRuntime for TauriRuntime { + fn sessions_dir(&self) -> Result { + self.app + .settings() + .cached_vault_base() + .map(|base| base.join("sessions").into_std_path_buf()) + .map_err(|e| e.to_string()) + } + + fn emit_lifecycle(&self, event: hypr_listener_core::SessionLifecycleEvent) { + use tauri_plugin_tray::TrayPluginExt; + match &event { + hypr_listener_core::SessionLifecycleEvent::Active { .. } => { + let _ = self.app.tray().set_start_disabled(true); + } + hypr_listener_core::SessionLifecycleEvent::Inactive { .. } => { + let _ = self.app.tray().set_start_disabled(false); + } + hypr_listener_core::SessionLifecycleEvent::Finalizing { .. } => {} + } + + let plugin_event: crate::events::SessionLifecycleEvent = event.into(); + if let Err(error) = plugin_event.emit(&self.app) { + tracing::error!(?error, "failed_to_emit_lifecycle_event"); + } + } + + fn emit_progress(&self, event: hypr_listener_core::SessionProgressEvent) { + let plugin_event: crate::events::SessionProgressEvent = event.into(); + if let Err(error) = plugin_event.emit(&self.app) { + tracing::error!(?error, "failed_to_emit_progress_event"); + } + } + + fn emit_error(&self, event: hypr_listener_core::SessionErrorEvent) { + let plugin_event: crate::events::SessionErrorEvent = event.into(); + if let Err(error) = plugin_event.emit(&self.app) { + tracing::error!(?error, "failed_to_emit_error_event"); + } + } + + fn emit_data(&self, event: hypr_listener_core::SessionDataEvent) { + let plugin_event: crate::events::SessionDataEvent = event.into(); + if let Err(error) = plugin_event.emit(&self.app) { + tracing::error!(?error, "failed_to_emit_data_event"); + } + } +}