From bf7059c9bf32471325bddea95d332c230e134ebb Mon Sep 17 00:00:00 2001 From: Yujong Lee Date: Wed, 24 Dec 2025 15:00:46 +0900 Subject: [PATCH] split session events to multiple enums --- .../components/main/body/sessions/index.tsx | 3 +- .../main/body/sessions/note-input/header.tsx | 4 +- .../note-input/transcript/shared/index.tsx | 2 +- .../body/sessions/outer-header/listen.tsx | 2 +- .../outer-header/overflow/listening.tsx | 2 +- .../components/main/body/sessions/shared.tsx | 5 +- apps/desktop/src/hooks/useAutoEnhance.ts | 3 +- apps/desktop/src/hooks/useEnhancedNotes.ts | 2 +- apps/desktop/src/routes/app/main/_layout.tsx | 2 +- .../store/zustand/listener/general.test.ts | 2 +- .../src/store/zustand/listener/general.ts | 182 ++++++++++++++---- plugins/listener/js/bindings.gen.ts | 15 +- plugins/listener/src/actors/listener.rs | 64 +++++- plugins/listener/src/actors/source.rs | 37 +++- plugins/listener/src/events.rs | 52 ++++- plugins/listener/src/ext.rs | 11 +- plugins/listener/src/fsm.rs | 4 +- plugins/listener/src/lib.rs | 9 +- 18 files changed, 321 insertions(+), 80 deletions(-) diff --git a/apps/desktop/src/components/main/body/sessions/index.tsx b/apps/desktop/src/components/main/body/sessions/index.tsx index ead6d0d606..f5002c9f5b 100644 --- a/apps/desktop/src/components/main/body/sessions/index.tsx +++ b/apps/desktop/src/components/main/body/sessions/index.tsx @@ -35,8 +35,7 @@ export const TabItemNote: TabItem> = ({ main.STORE_ID, ); const sessionMode = useListener((state) => state.getSessionMode(tab.id)); - const isActive = - sessionMode === "running_active" || sessionMode === "finalizing"; + const isActive = sessionMode === "active" || sessionMode === "finalizing"; return ( state.getSessionMode(sessionId)); const isBatchProcessing = sessionMode === "running_batch"; - const isLiveProcessing = sessionMode === "running_active"; + const isLiveProcessing = sessionMode === "active"; if (editorTabs.length === 1 && editorTabs[0].type === "raw") { return null; @@ -408,7 +408,7 @@ export function useEditorTabs({ main.STORE_ID, ); - if (sessionMode === "running_active" || sessionMode === "running_batch") { + if (sessionMode === "active" || sessionMode === "running_batch") { return [{ type: "raw" }, { type: "transcript" }]; } diff --git a/apps/desktop/src/components/main/body/sessions/note-input/transcript/shared/index.tsx b/apps/desktop/src/components/main/body/sessions/note-input/transcript/shared/index.tsx index 586e66c3c3..c777693fba 100644 --- a/apps/desktop/src/components/main/body/sessions/note-input/transcript/shared/index.tsx +++ b/apps/desktop/src/components/main/body/sessions/note-input/transcript/shared/index.tsx @@ -29,7 +29,7 @@ export function TranscriptContainer({ const sessionMode = useListener((state) => state.getSessionMode(sessionId)); const currentActive = - sessionMode === "running_active" || sessionMode === "finalizing"; + sessionMode === "active" || sessionMode === "finalizing"; const editable = sessionMode === "inactive" && Object.keys(operations ?? {}).length > 0; diff --git a/apps/desktop/src/components/main/body/sessions/outer-header/listen.tsx b/apps/desktop/src/components/main/body/sessions/outer-header/listen.tsx index 7a1a77a5c1..946fc0ff5e 100644 --- a/apps/desktop/src/components/main/body/sessions/outer-header/listen.tsx +++ b/apps/desktop/src/components/main/body/sessions/outer-header/listen.tsx @@ -229,7 +229,7 @@ function InMeetingIndicator({ sessionId }: { sessionId: string }) { muted: state.live.muted, })); - const active = mode === "running_active" || mode === "finalizing"; + const active = mode === "active" || mode === "finalizing"; const finalizing = mode === "finalizing"; if (!active) { diff --git a/apps/desktop/src/components/main/body/sessions/outer-header/overflow/listening.tsx b/apps/desktop/src/components/main/body/sessions/outer-header/overflow/listening.tsx index 36bd9ec623..6e443b80c3 100644 --- a/apps/desktop/src/components/main/body/sessions/outer-header/overflow/listening.tsx +++ b/apps/desktop/src/components/main/body/sessions/outer-header/overflow/listening.tsx @@ -10,7 +10,7 @@ export function Listening({ sessionId }: { sessionId: string }) { mode: state.getSessionMode(sessionId), stop: state.stop, })); - const isListening = mode === "running_active" || mode === "finalizing"; + const isListening = mode === "active" || mode === "finalizing"; const isFinalizing = mode === "finalizing"; const isBatching = mode === "running_batch"; const startListening = useStartListening(sessionId); diff --git a/apps/desktop/src/components/main/body/sessions/shared.tsx b/apps/desktop/src/components/main/body/sessions/shared.tsx index 152d243474..7d928c542c 100644 --- a/apps/desktop/src/components/main/body/sessions/shared.tsx +++ b/apps/desktop/src/components/main/body/sessions/shared.tsx @@ -26,7 +26,7 @@ export function useCurrentNoteTab( ): EditorView { const sessionMode = useListener((state) => state.getSessionMode(tab.id)); const isListenerActive = - sessionMode === "running_active" || sessionMode === "finalizing"; + sessionMode === "active" || sessionMode === "finalizing"; const enhancedNoteIds = main.UI.useSliceRowIds( main.INDEXES.enhancedNotesBySession, @@ -68,8 +68,7 @@ export function RecordingIcon({ disabled }: { disabled?: boolean }) { export function useListenButtonState(sessionId: string) { const sessionMode = useListener((state) => state.getSessionMode(sessionId)); - const active = - sessionMode === "running_active" || sessionMode === "finalizing"; + const active = sessionMode === "active" || sessionMode === "finalizing"; const batching = sessionMode === "running_batch"; const taskId = createTaskId(sessionId, "enhance"); diff --git a/apps/desktop/src/hooks/useAutoEnhance.ts b/apps/desktop/src/hooks/useAutoEnhance.ts index 5426531a6d..45e5978982 100644 --- a/apps/desktop/src/hooks/useAutoEnhance.ts +++ b/apps/desktop/src/hooks/useAutoEnhance.ts @@ -91,8 +91,7 @@ export function useAutoEnhance(tab: Extract) { useEffect(() => { const listenerJustStopped = - prevListenerStatus === "running_active" && - listenerStatus !== "running_active"; + prevListenerStatus === "active" && listenerStatus !== "active"; if (listenerJustStopped) { createAndStartEnhance(); diff --git a/apps/desktop/src/hooks/useEnhancedNotes.ts b/apps/desktop/src/hooks/useEnhancedNotes.ts index aed5cb1b37..784ddd7050 100644 --- a/apps/desktop/src/hooks/useEnhancedNotes.ts +++ b/apps/desktop/src/hooks/useEnhancedNotes.ts @@ -148,7 +148,7 @@ export function useEnsureDefaultSummary(sessionId: string) { useEffect(() => { if ( !hasTranscript || - sessionMode === "running_active" || + sessionMode === "active" || sessionMode === "running_batch" || sessionMode === "finalizing" || (enhancedNoteIds && enhancedNoteIds.length > 0) diff --git a/apps/desktop/src/routes/app/main/_layout.tsx b/apps/desktop/src/routes/app/main/_layout.tsx index 16d868753d..bbfb546961 100644 --- a/apps/desktop/src/routes/app/main/_layout.tsx +++ b/apps/desktop/src/routes/app/main/_layout.tsx @@ -48,7 +48,7 @@ function Component() { return true; } const mode = getSessionMode(tab.id); - return mode !== "running_active" && mode !== "finalizing"; + return mode !== "active" && mode !== "finalizing"; }); }, [registerCanClose, getSessionMode]); diff --git a/apps/desktop/src/store/zustand/listener/general.test.ts b/apps/desktop/src/store/zustand/listener/general.test.ts index 1dfdbd45ca..5f3ebbbd1d 100644 --- a/apps/desktop/src/store/zustand/listener/general.test.ts +++ b/apps/desktop/src/store/zustand/listener/general.test.ts @@ -16,7 +16,7 @@ describe("General Listener Slice", () => { expect(state.live.loading).toBe(false); expect(state.live.amplitude).toEqual({ mic: 0, speaker: 0 }); expect(state.live.seconds).toBe(0); - expect(state.live.sessionEventUnlisten).toBeUndefined(); + expect(state.live.eventUnlisteners).toBeUndefined(); expect(state.live.intervalId).toBeUndefined(); expect(state.batch).toEqual({}); }); diff --git a/apps/desktop/src/store/zustand/listener/general.ts b/apps/desktop/src/store/zustand/listener/general.ts index c24f9cf0a8..33c1789bff 100644 --- a/apps/desktop/src/store/zustand/listener/general.ts +++ b/apps/desktop/src/store/zustand/listener/general.ts @@ -8,8 +8,11 @@ import { commands as hooksCommands } from "@hypr/plugin-hooks"; import { commands as listenerCommands, events as listenerEvents, - type SessionEvent, + type SessionDataEvent, + type SessionErrorEvent, + type SessionLifecycleEvent, type SessionParams, + type SessionProgressEvent, type StreamResponse, } from "@hypr/plugin-listener"; import { @@ -23,27 +26,28 @@ import { fromResult } from "../../../effect"; import type { BatchActions, BatchState } from "./batch"; import type { HandlePersistCallback, TranscriptActions } from "./transcript"; -type LiveSessionStatus = Extract< - SessionEvent["type"], - "inactive" | "running_active" | "finalizing" ->; +type LiveSessionStatus = "inactive" | "active" | "finalizing"; export type SessionMode = LiveSessionStatus | "running_batch"; -const hasSessionId = ( - payload: SessionEvent, -): payload is SessionEvent & { session_id: string } => - "session_id" in payload && typeof payload.session_id === "string"; +export type LoadingPhase = + | "idle" + | "audio_initializing" + | "audio_ready" + | "connecting" + | "connected"; export type GeneralState = { live: { - sessionEventUnlisten?: () => void; + eventUnlisteners?: (() => void)[]; loading: boolean; + loadingPhase: LoadingPhase; status: LiveSessionStatus; amplitude: { mic: number; speaker: number }; seconds: number; intervalId?: NodeJS.Timeout; sessionId: string | null; muted: boolean; + lastError: string | null; }; }; @@ -65,19 +69,43 @@ const initialState: GeneralState = { live: { status: "inactive", loading: false, + loadingPhase: "idle", amplitude: { mic: 0, speaker: 0 }, seconds: 0, sessionId: null, muted: false, + lastError: null, }, }; -const listenToSessionEvents = ( - onEvent: (payload: SessionEvent) => void, -): Effect.Effect<() => void, unknown> => +type EventListeners = { + lifecycle: (payload: SessionLifecycleEvent) => void; + progress: (payload: SessionProgressEvent) => void; + error: (payload: SessionErrorEvent) => void; + data: (payload: SessionDataEvent) => void; +}; + +const listenToAllSessionEvents = ( + handlers: EventListeners, +): Effect.Effect<(() => void)[], unknown> => Effect.tryPromise({ - try: () => - listenerEvents.sessionEvent.listen(({ payload }) => onEvent(payload)), + try: async () => { + const unlisteners = await Promise.all([ + listenerEvents.sessionLifecycleEvent.listen(({ payload }) => + handlers.lifecycle(payload), + ), + listenerEvents.sessionProgressEvent.listen(({ payload }) => + handlers.progress(payload), + ), + listenerEvents.sessionErrorEvent.listen(({ payload }) => + handlers.error(payload), + ), + listenerEvents.sessionDataEvent.listen(({ payload }) => + handlers.data(payload), + ), + ]); + return unlisteners; + }, catch: (error) => error, }); @@ -123,21 +151,12 @@ export const createGeneralSlice = < get().setTranscriptPersist(options.handlePersist); } - const handleSessionEvent = (payload: SessionEvent) => { - if (!hasSessionId(payload) || payload.session_id !== targetSessionId) { + const handleLifecycleEvent = (payload: SessionLifecycleEvent) => { + if (payload.session_id !== targetSessionId) { return; } - if (payload.type === "audioAmplitude") { - set((state) => - mutate(state, (draft) => { - draft.live.amplitude = { - mic: payload.mic, - speaker: payload.speaker, - }; - }), - ); - } else if (payload.type === "running_active") { + if (payload.type === "active") { const currentState = get(); if (currentState.live.intervalId) { clearInterval(currentState.live.intervalId); @@ -153,8 +172,9 @@ export const createGeneralSlice = < set((state) => mutate(state, (draft) => { - draft.live.status = "running_active"; + draft.live.status = "active"; draft.live.loading = false; + draft.live.loadingPhase = "idle"; draft.live.seconds = 0; draft.live.intervalId = intervalId; draft.live.sessionId = targetSessionId; @@ -173,32 +193,118 @@ export const createGeneralSlice = < ); } else if (payload.type === "inactive") { const currentState = get(); - if (currentState.live.sessionEventUnlisten) { - currentState.live.sessionEventUnlisten(); + if (currentState.live.eventUnlisteners) { + currentState.live.eventUnlisteners.forEach((fn) => fn()); } set((state) => mutate(state, (draft) => { draft.live.status = "inactive"; draft.live.loading = false; + draft.live.loadingPhase = "idle"; draft.live.sessionId = null; - draft.live.sessionEventUnlisten = undefined; + draft.live.eventUnlisteners = undefined; + draft.live.lastError = null; }), ); get().resetTranscript(); - } else if (payload.type === "streamResponse") { + } + }; + + const handleProgressEvent = (payload: SessionProgressEvent) => { + if (payload.session_id !== targetSessionId) { + return; + } + + if (payload.type === "audio_initializing") { + set((state) => + mutate(state, (draft) => { + draft.live.loadingPhase = "audio_initializing"; + draft.live.lastError = null; + }), + ); + } else if (payload.type === "audio_ready") { + set((state) => + mutate(state, (draft) => { + draft.live.loadingPhase = "audio_ready"; + }), + ); + } else if (payload.type === "connecting") { + set((state) => + mutate(state, (draft) => { + draft.live.loadingPhase = "connecting"; + }), + ); + } else if (payload.type === "connected") { + set((state) => + mutate(state, (draft) => { + draft.live.loadingPhase = "connected"; + }), + ); + } + }; + + const handleErrorEvent = (payload: SessionErrorEvent) => { + if (payload.session_id !== targetSessionId) { + return; + } + + if (payload.type === "audio_error") { + set((state) => + mutate(state, (draft) => { + draft.live.lastError = payload.error; + if (payload.is_fatal) { + draft.live.loading = false; + } + }), + ); + } else if (payload.type === "connection_error") { + set((state) => + mutate(state, (draft) => { + draft.live.lastError = payload.error; + }), + ); + } + }; + + const handleDataEvent = (payload: SessionDataEvent) => { + if (payload.session_id !== targetSessionId) { + return; + } + + if (payload.type === "audio_amplitude") { + set((state) => + mutate(state, (draft) => { + draft.live.amplitude = { + mic: payload.mic, + speaker: payload.speaker, + }; + }), + ); + } else if (payload.type === "stream_response") { const response = payload.response; get().handleTranscriptResponse(response as unknown as StreamResponse); + } else if (payload.type === "mic_muted") { + set((state) => + mutate(state, (draft) => { + draft.live.muted = payload.value; + }), + ); } }; const program = Effect.gen(function* () { - const unlisten = yield* listenToSessionEvents(handleSessionEvent); + const unlisteners = yield* listenToAllSessionEvents({ + lifecycle: handleLifecycleEvent, + progress: handleProgressEvent, + error: handleErrorEvent, + data: handleDataEvent, + }); set((state) => mutate(state, (draft) => { - draft.live.sessionEventUnlisten = unlisten; + draft.live.eventUnlisteners = unlisteners; }), ); @@ -239,7 +345,7 @@ export const createGeneralSlice = < yield* startSessionEffect(params); set((state) => mutate(state, (draft) => { - draft.live.status = "running_active"; + draft.live.status = "active"; draft.live.loading = false; draft.live.sessionId = targetSessionId; }), @@ -257,13 +363,15 @@ export const createGeneralSlice = < draft.live.intervalId = undefined; } - draft.live.sessionEventUnlisten = undefined; + draft.live.eventUnlisteners = undefined; draft.live.loading = false; + draft.live.loadingPhase = "idle"; draft.live.status = "inactive"; draft.live.amplitude = { mic: 0, speaker: 0 }; draft.live.seconds = 0; draft.live.sessionId = null; draft.live.muted = initialState.live.muted; + draft.live.lastError = null; }), ); }, @@ -331,7 +439,7 @@ export const createGeneralSlice = < } const mode = get().getSessionMode(sessionId); - if (mode === "running_active" || mode === "finalizing") { + if (mode === "active" || mode === "finalizing") { console.warn( `[listener] cannot start batch processing while session ${sessionId} is live`, ); diff --git a/plugins/listener/js/bindings.gen.ts b/plugins/listener/js/bindings.gen.ts index fa09b18f41..00e0cdb70b 100644 --- a/plugins/listener/js/bindings.gen.ts +++ b/plugins/listener/js/bindings.gen.ts @@ -68,9 +68,15 @@ async getState() : Promise> { export const events = __makeEvents__<{ -sessionEvent: SessionEvent +sessionDataEvent: SessionDataEvent, +sessionErrorEvent: SessionErrorEvent, +sessionLifecycleEvent: SessionLifecycleEvent, +sessionProgressEvent: SessionProgressEvent }>({ -sessionEvent: "plugin:listener:session-event" +sessionDataEvent: "plugin:listener:session-data-event", +sessionErrorEvent: "plugin:listener:session-error-event", +sessionLifecycleEvent: "plugin:listener:session-lifecycle-event", +sessionProgressEvent: "plugin:listener:session-progress-event" }) /** user-defined constants **/ @@ -79,8 +85,11 @@ sessionEvent: "plugin:listener:session-event" /** user-defined types **/ -export type SessionEvent = { type: "inactive"; session_id: string } | { type: "running_active"; session_id: string } | { type: "finalizing"; session_id: string } | { type: "audioAmplitude"; session_id: string; mic: number; speaker: number } | { type: "micMuted"; session_id: string; value: boolean } | { type: "streamResponse"; session_id: string; response: StreamResponse } | { type: "ExitRequested" } +export type SessionDataEvent = { type: "audio_amplitude"; session_id: string; mic: number; speaker: number } | { type: "mic_muted"; session_id: string; value: boolean } | { type: "stream_response"; session_id: string; response: StreamResponse } +export type SessionErrorEvent = { type: "audio_error"; session_id: string; error: string; device: string | null; is_fatal: boolean } | { type: "connection_error"; session_id: string; error: string; is_retryable: boolean } +export type SessionLifecycleEvent = { type: "inactive"; session_id: string } | { type: "active"; session_id: string } | { type: "finalizing"; session_id: string } export type SessionParams = { session_id: string; languages: string[]; onboarding: boolean; record_enabled: boolean; model: string; base_url: string; api_key: string; keywords: string[] } +export type SessionProgressEvent = { type: "audio_initializing"; session_id: string } | { type: "audio_ready"; session_id: string } | { type: "connecting"; session_id: string } | { type: "connected"; session_id: string; adapter: string } export type StreamAlternatives = { transcript: string; words: StreamWord[]; confidence: number; languages?: string[] } export type StreamChannel = { alternatives: StreamAlternatives[] } export type StreamExtra = { started_unix_millis: number } diff --git a/plugins/listener/src/actors/listener.rs b/plugins/listener/src/actors/listener.rs index 14a897b1d2..6842dec621 100644 --- a/plugins/listener/src/actors/listener.rs +++ b/plugins/listener/src/actors/listener.rs @@ -13,7 +13,7 @@ use owhisper_interface::{ControlMessage, MixedMessage}; use ractor::{Actor, ActorName, ActorProcessingErr, ActorRef, SupervisionEvent}; use tauri_specta::Event; -use crate::SessionEvent; +use crate::{SessionDataEvent, SessionErrorEvent, SessionProgressEvent}; const LISTEN_STREAM_TIMEOUT: Duration = Duration::from_secs(15 * 60); const LISTEN_CONNECT_TIMEOUT: Duration = Duration::from_secs(5); @@ -88,7 +88,24 @@ impl Actor for ListenerActor { myself: ActorRef, args: Self::Arguments, ) -> Result { - let (tx, rx_task, shutdown_tx) = spawn_rx_task(args.clone(), myself).await?; + if let Err(error) = (SessionProgressEvent::Connecting { + session_id: args.session_id.clone(), + }) + .emit(&args.app) + { + tracing::error!(?error, "failed_to_emit_connecting"); + } + + let (tx, rx_task, shutdown_tx, adapter_name) = spawn_rx_task(args.clone(), myself).await?; + + if let Err(error) = (SessionProgressEvent::Connected { + session_id: args.session_id.clone(), + adapter: adapter_name, + }) + .emit(&args.app) + { + tracing::error!(?error, "failed_to_emit_connected"); + } let state = ListenerState { args, @@ -142,7 +159,7 @@ impl Actor for ListenerActor { crate::actors::ChannelMode::MicAndSpeaker => {} } - if let Err(error) = (SessionEvent::StreamResponse { + if let Err(error) = (SessionDataEvent::StreamResponse { session_id: state.args.session_id.clone(), response: Box::new(response), }) @@ -197,13 +214,24 @@ async fn spawn_rx_task( ChannelSender, tokio::task::JoinHandle<()>, tokio::sync::oneshot::Sender<()>, + String, // adapter name ), ActorProcessingErr, > { let adapter_kind = AdapterKind::from_url_and_languages(&args.base_url, &args.languages); let is_dual = matches!(args.mode, crate::actors::ChannelMode::MicAndSpeaker); - match (adapter_kind, is_dual) { + let adapter_name = match adapter_kind { + AdapterKind::Argmax => "Argmax", + AdapterKind::Soniox => "Soniox", + AdapterKind::Fireworks => "Fireworks", + AdapterKind::Deepgram => "Deepgram", + AdapterKind::AssemblyAI => "AssemblyAI", + AdapterKind::OpenAI => "OpenAI", + AdapterKind::Gladia => "Gladia", + }; + + let result = match (adapter_kind, is_dual) { (AdapterKind::Argmax, false) => { spawn_rx_task_single_with_adapter::(args, myself).await } @@ -246,7 +274,9 @@ async fn spawn_rx_task( (AdapterKind::Gladia, true) => { spawn_rx_task_dual_with_adapter::(args, myself).await } - } + }?; + + Ok((result.0, result.1, result.2, adapter_name.to_string())) } fn build_listen_params(args: &ListenerArgs) -> owhisper_interface::ListenParams { @@ -311,10 +341,22 @@ async fn spawn_rx_task_single_with_adapter( timeout_secs = LISTEN_CONNECT_TIMEOUT.as_secs_f32(), "listen_ws_connect_timeout(single)" ); + let _ = (SessionErrorEvent::ConnectionError { + session_id: args.session_id.clone(), + error: "listen_ws_connect_timeout".to_string(), + is_retryable: true, + }) + .emit(&args.app); return Err(actor_error("listen_ws_connect_timeout")); } Ok(Err(e)) => { tracing::error!(error = ?e, "listen_ws_connect_failed(single)"); + let _ = (SessionErrorEvent::ConnectionError { + session_id: args.session_id.clone(), + error: format!("listen_ws_connect_failed: {:?}", e), + is_retryable: true, + }) + .emit(&args.app); return Err(actor_error(format!("listen_ws_connect_failed: {:?}", e))); } Ok(Ok(res)) => res, @@ -371,10 +413,22 @@ async fn spawn_rx_task_dual_with_adapter( timeout_secs = LISTEN_CONNECT_TIMEOUT.as_secs_f32(), "listen_ws_connect_timeout(dual)" ); + let _ = (SessionErrorEvent::ConnectionError { + session_id: args.session_id.clone(), + error: "listen_ws_connect_timeout".to_string(), + is_retryable: true, + }) + .emit(&args.app); return Err(actor_error("listen_ws_connect_timeout")); } Ok(Err(e)) => { tracing::error!(error = ?e, "listen_ws_connect_failed(dual)"); + let _ = (SessionErrorEvent::ConnectionError { + session_id: args.session_id.clone(), + error: format!("listen_ws_connect_failed: {:?}", e), + is_retryable: true, + }) + .emit(&args.app); return Err(actor_error(format!("listen_ws_connect_failed: {:?}", e))); } Ok(Ok(res)) => res, diff --git a/plugins/listener/src/actors/source.rs b/plugins/listener/src/actors/source.rs index 914e4091f1..9384553fce 100644 --- a/plugins/listener/src/actors/source.rs +++ b/plugins/listener/src/actors/source.rs @@ -13,7 +13,7 @@ use ractor::{Actor, ActorName, ActorProcessingErr, ActorRef, RpcReplyPort, regis use tokio_util::sync::CancellationToken; use crate::{ - SessionEvent, + SessionDataEvent, SessionErrorEvent, SessionProgressEvent, actors::{AudioChunk, ChannelMode, ListenerActor, ListenerMsg, RecMsg, RecorderActor}, }; use hypr_aec::AEC; @@ -43,6 +43,7 @@ pub struct SourceArgs { } pub struct SourceState { + app: tauri::AppHandle, session_id: String, mic_device: Option, onboarding: bool, @@ -123,6 +124,14 @@ impl Actor for SourceActor { myself: ActorRef, args: Self::Arguments, ) -> Result { + if let Err(error) = (SessionProgressEvent::AudioInitializing { + session_id: args.session_id.clone(), + }) + .emit(&args.app) + { + tracing::error!(?error, "failed_to_emit_audio_initializing"); + } + let device_watcher = DeviceChangeWatcher::spawn(myself.clone()); let silence_stream_tx = Some(hypr_audio::AudioOutput::silence()); @@ -134,6 +143,7 @@ impl Actor for SourceActor { let pipeline = Pipeline::new(args.app.clone(), args.session_id.clone()); let mut st = SourceState { + app: args.app, session_id: args.session_id, mic_device, onboarding: args.onboarding, @@ -185,6 +195,13 @@ impl Actor for SourceActor { } SourceMsg::StreamFailed(reason) => { tracing::error!(%reason, "source_stream_failed_stopping"); + let _ = (SessionErrorEvent::AudioError { + session_id: st.session_id.clone(), + error: reason.clone(), + device: st.mic_device.clone(), + is_fatal: true, + }) + .emit(&st.app); myself.stop(Some(reason)); } } @@ -221,11 +238,23 @@ async fn start_source_loop( st.pipeline.reset(); - match new_mode { + let result = match new_mode { ChannelMode::MicOnly => start_source_loop_mic_only(myself, st).await, ChannelMode::SpeakerOnly => start_source_loop_speaker_only(myself, st).await, ChannelMode::MicAndSpeaker => start_source_loop_mic_and_speaker(myself, st).await, + }; + + if result.is_ok() { + if let Err(error) = (SessionProgressEvent::AudioReady { + session_id: st.session_id.clone(), + }) + .emit(&st.app) + { + tracing::error!(?error, "failed_to_emit_audio_ready"); + } } + + result } async fn start_source_loop_mic_only( @@ -728,14 +757,14 @@ impl AmplitudeEmitter { let mic_level = Self::amplitude_from_chunk(mic.as_ref()); let spk_level = Self::amplitude_from_chunk(spk.as_ref()); - if let Err(error) = (SessionEvent::AudioAmplitude { + if let Err(error) = (SessionDataEvent::AudioAmplitude { session_id: self.session_id.clone(), mic: mic_level, speaker: spk_level, }) .emit(&self.app) { - tracing::error!(error = ?error, "session_event_emit_failed"); + tracing::error!(error = ?error, "session_data_event_emit_failed"); } self.last_emit = Instant::now(); diff --git a/plugins/listener/src/events.rs b/plugins/listener/src/events.rs index 4a1cdab6cf..9b8b9b62e5 100644 --- a/plugins/listener/src/events.rs +++ b/plugins/listener/src/events.rs @@ -10,26 +10,64 @@ macro_rules! common_event_derives { common_event_derives! { #[serde(tag = "type")] - pub enum SessionEvent { + pub enum SessionLifecycleEvent { #[serde(rename = "inactive")] Inactive { session_id: String }, - #[serde(rename = "running_active")] - RunningActive { session_id: String }, + #[serde(rename = "active")] + Active { session_id: String }, #[serde(rename = "finalizing")] Finalizing { session_id: String }, - #[serde(rename = "audioAmplitude")] + } +} + +common_event_derives! { + #[serde(tag = "type")] + pub enum SessionProgressEvent { + #[serde(rename = "audio_initializing")] + AudioInitializing { session_id: String }, + #[serde(rename = "audio_ready")] + AudioReady { session_id: String }, + #[serde(rename = "connecting")] + Connecting { session_id: String }, + #[serde(rename = "connected")] + Connected { session_id: String, adapter: String }, + } +} + +common_event_derives! { + #[serde(tag = "type")] + pub enum SessionErrorEvent { + #[serde(rename = "audio_error")] + AudioError { + session_id: String, + error: String, + device: Option, + is_fatal: bool, + }, + #[serde(rename = "connection_error")] + ConnectionError { + session_id: String, + error: String, + is_retryable: bool, + }, + } +} + +common_event_derives! { + #[serde(tag = "type")] + pub enum SessionDataEvent { + #[serde(rename = "audio_amplitude")] AudioAmplitude { session_id: String, mic: u16, speaker: u16, }, - #[serde(rename = "micMuted")] + #[serde(rename = "mic_muted")] MicMuted { session_id: String, value: bool }, - #[serde(rename = "streamResponse")] + #[serde(rename = "stream_response")] StreamResponse { session_id: String, response: Box, }, - ExitRequested } } diff --git a/plugins/listener/src/ext.rs b/plugins/listener/src/ext.rs index a3c1f5c20c..b1519a8bd4 100644 --- a/plugins/listener/src/ext.rs +++ b/plugins/listener/src/ext.rs @@ -5,7 +5,7 @@ use tauri::{Manager, path::BaseDirectory}; use tauri_specta::Event; use crate::{ - SessionEvent, + SessionLifecycleEvent, actors::{SessionContext, SessionParams, SourceActor, SourceMsg, spawn_session_supervisor}, }; @@ -101,12 +101,12 @@ impl<'a, R: tauri::Runtime, M: tauri::Manager> Listener<'a, R, M> { guard.session_supervisor = Some(supervisor_ref); guard.supervisor_handle = Some(handle); - if let Err(error) = (SessionEvent::RunningActive { + if let Err(error) = (SessionLifecycleEvent::Active { session_id: params.session_id, }) .emit(&guard.app) { - tracing::error!(?error, "failed_to_emit_running_active"); + tracing::error!(?error, "failed_to_emit_active"); } tracing::info!("session_started"); @@ -133,7 +133,8 @@ impl<'a, R: tauri::Runtime, M: tauri::Manager> Listener<'a, R, M> { }; if let Some(session_id) = session_id.clone() { - if let Err(error) = (SessionEvent::Finalizing { session_id }).emit(&guard.app) { + if let Err(error) = (SessionLifecycleEvent::Finalizing { session_id }).emit(&guard.app) + { tracing::error!(?error, "failed_to_emit_finalizing"); } } @@ -152,7 +153,7 @@ impl<'a, R: tauri::Runtime, M: tauri::Manager> Listener<'a, R, M> { } if let Some(session_id) = session_id { - if let Err(error) = (SessionEvent::Inactive { session_id }).emit(&guard.app) { + if let Err(error) = (SessionLifecycleEvent::Inactive { session_id }).emit(&guard.app) { tracing::error!(?error, "failed_to_emit_inactive"); } } diff --git a/plugins/listener/src/fsm.rs b/plugins/listener/src/fsm.rs index 13047ee385..6caf367358 100644 --- a/plugins/listener/src/fsm.rs +++ b/plugins/listener/src/fsm.rs @@ -1,6 +1,6 @@ #[derive(Debug, Clone)] pub enum State { - RunningActive, + Active, Finalizing, Inactive, } @@ -12,7 +12,7 @@ impl serde::Serialize for State { { match self { State::Inactive => serializer.serialize_str("inactive"), - State::RunningActive => serializer.serialize_str("running_active"), + State::Active => serializer.serialize_str("active"), State::Finalizing => serializer.serialize_str("finalizing"), } } diff --git a/plugins/listener/src/lib.rs b/plugins/listener/src/lib.rs index 56572e64c5..50c8620165 100644 --- a/plugins/listener/src/lib.rs +++ b/plugins/listener/src/lib.rs @@ -25,7 +25,7 @@ pub struct State { impl State { pub fn get_state(&self) -> fsm::State { if self.session_supervisor.is_some() { - crate::fsm::State::RunningActive + crate::fsm::State::Active } else { crate::fsm::State::Inactive } @@ -44,7 +44,12 @@ fn make_specta_builder() -> tauri_specta::Builder { commands::stop_session::, commands::get_state::, ]) - .events(tauri_specta::collect_events![SessionEvent]) + .events(tauri_specta::collect_events![ + SessionLifecycleEvent, + SessionProgressEvent, + SessionErrorEvent, + SessionDataEvent + ]) .error_handling(tauri_specta::ErrorHandlingMode::Result) }