Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions apps/desktop/src/store/zustand/listener/general.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,25 @@ const hasSessionId = (
): payload is SessionEvent & { session_id: string } =>
"session_id" in payload && typeof payload.session_id === "string";

/**
 * Fine-grained progress marker for a live session that is starting up.
 *
 * The session-event handler in this file assigns these values:
 * - "idle": the initial value; also restored when the status becomes
 *   "running_active" or "inactive", and when the live state is cleared.
 * - "initializing_audio": set when an "initializingAudio" event arrives.
 * - "audio_ready": set when an "audioReady" event arrives.
 * - "connecting": set when a "connecting" event arrives.
 * - "connected": set when a "connected" event arrives.
 *
 * NOTE(review): "audioError" and "connectionError" events do not change the
 * phase in the visible handler — confirm that is intended.
 */
export type LoadingPhase =
| "idle"
| "initializing_audio"
| "audio_ready"
| "connecting"
| "connected";

/**
 * Store slice describing the current live (listening) session.
 *
 * Everything lives under `live`; the session-event handler and the reset
 * logic in this file mutate these fields.
 */
export type GeneralState = {
live: {
// Optional callback; cleared to `undefined` when the session goes inactive.
// Presumably unsubscribes from session events — TODO confirm against the code that sets it.
sessionEventUnlisten?: () => void;
// Coarse "start in progress" flag; set to false once the status becomes
// "running_active" or "inactive", and on an "audioError" event.
loading: boolean;
// Fine-grained startup phase; see `LoadingPhase`.
loadingPhase: LoadingPhase;
// Session lifecycle status ("inactive" and "running_active" are the values assigned in this file's visible code).
status: LiveSessionStatus;
// Per-source amplitude values; reset to { mic: 0, speaker: 0 } when the live state is cleared.
amplitude: { mic: number; speaker: number };
// Reset to 0 when the session becomes "running_active".
// Presumably elapsed seconds advanced by the timer behind `intervalId` — TODO confirm.
seconds: number;
// Timer handle stored when the session becomes "running_active".
intervalId?: NodeJS.Timeout;
// Id of the session being tracked; null while inactive.
sessionId: string | null;
// Mute flag; restored to the initial value when the live state is cleared.
muted: boolean;
// Error text from the most recent "audioError" / "connectionError" event;
// cleared on "initializingAudio", when the session goes inactive, and on reset.
lastError: string | null;
};
};

Expand All @@ -65,10 +74,12 @@ const initialState: GeneralState = {
live: {
status: "inactive",
loading: false,
loadingPhase: "idle",
amplitude: { mic: 0, speaker: 0 },
seconds: 0,
sessionId: null,
muted: false,
lastError: null,
},
};

Expand Down Expand Up @@ -155,6 +166,7 @@ export const createGeneralSlice = <
mutate(state, (draft) => {
draft.live.status = "running_active";
draft.live.loading = false;
draft.live.loadingPhase = "idle";
draft.live.seconds = 0;
draft.live.intervalId = intervalId;
draft.live.sessionId = targetSessionId;
Expand All @@ -181,15 +193,55 @@ export const createGeneralSlice = <
mutate(state, (draft) => {
draft.live.status = "inactive";
draft.live.loading = false;
draft.live.loadingPhase = "idle";
draft.live.sessionId = null;
draft.live.sessionEventUnlisten = undefined;
draft.live.lastError = null;
}),
);

get().resetTranscript();
} else if (payload.type === "streamResponse") {
const response = payload.response;
get().handleTranscriptResponse(response as unknown as StreamResponse);
} else if (payload.type === "initializingAudio") {
set((state) =>
mutate(state, (draft) => {
draft.live.loadingPhase = "initializing_audio";
draft.live.lastError = null;
}),
);
} else if (payload.type === "audioReady") {
set((state) =>
mutate(state, (draft) => {
draft.live.loadingPhase = "audio_ready";
}),
);
} else if (payload.type === "audioError") {
set((state) =>
mutate(state, (draft) => {
draft.live.lastError = payload.error;
draft.live.loading = false;
}),
);
} else if (payload.type === "connecting") {
set((state) =>
mutate(state, (draft) => {
draft.live.loadingPhase = "connecting";
}),
);
} else if (payload.type === "connected") {
set((state) =>
mutate(state, (draft) => {
draft.live.loadingPhase = "connected";
}),
);
} else if (payload.type === "connectionError") {
set((state) =>
mutate(state, (draft) => {
draft.live.lastError = payload.error;
}),
);
}
};

Expand Down Expand Up @@ -259,11 +311,13 @@ export const createGeneralSlice = <

draft.live.sessionEventUnlisten = undefined;
draft.live.loading = false;
draft.live.loadingPhase = "idle";
draft.live.status = "inactive";
draft.live.amplitude = { mic: 0, speaker: 0 };
draft.live.seconds = 0;
draft.live.sessionId = null;
draft.live.muted = initialState.live.muted;
draft.live.lastError = null;
}),
);
},
Expand Down
60 changes: 57 additions & 3 deletions plugins/listener/src/actors/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,24 @@ impl Actor for ListenerActor {
myself: ActorRef<Self::Msg>,
args: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
let (tx, rx_task, shutdown_tx) = spawn_rx_task(args.clone(), myself).await?;
if let Err(error) = (SessionEvent::Connecting {
session_id: args.session_id.clone(),
})
.emit(&args.app)
{
tracing::error!(?error, "failed_to_emit_connecting");
}

let (tx, rx_task, shutdown_tx, adapter_name) = spawn_rx_task(args.clone(), myself).await?;

if let Err(error) = (SessionEvent::Connected {
session_id: args.session_id.clone(),
adapter: adapter_name,
})
.emit(&args.app)
{
tracing::error!(?error, "failed_to_emit_connected");
}

let state = ListenerState {
args,
Expand Down Expand Up @@ -197,13 +214,24 @@ async fn spawn_rx_task(
ChannelSender,
tokio::task::JoinHandle<()>,
tokio::sync::oneshot::Sender<()>,
String, // adapter name
),
ActorProcessingErr,
> {
let adapter_kind = AdapterKind::from_url_and_languages(&args.base_url, &args.languages);
let is_dual = matches!(args.mode, crate::actors::ChannelMode::MicAndSpeaker);

match (adapter_kind, is_dual) {
let adapter_name = match adapter_kind {
AdapterKind::Argmax => "Argmax",
AdapterKind::Soniox => "Soniox",
AdapterKind::Fireworks => "Fireworks",
AdapterKind::Deepgram => "Deepgram",
AdapterKind::AssemblyAI => "AssemblyAI",
AdapterKind::OpenAI => "OpenAI",
AdapterKind::Gladia => "Gladia",
};

let result = match (adapter_kind, is_dual) {
(AdapterKind::Argmax, false) => {
spawn_rx_task_single_with_adapter::<ArgmaxAdapter>(args, myself).await
}
Expand Down Expand Up @@ -246,7 +274,9 @@ async fn spawn_rx_task(
(AdapterKind::Gladia, true) => {
spawn_rx_task_dual_with_adapter::<GladiaAdapter>(args, myself).await
}
}
}?;

Ok((result.0, result.1, result.2, adapter_name.to_string()))
}

fn build_listen_params(args: &ListenerArgs) -> owhisper_interface::ListenParams {
Expand Down Expand Up @@ -311,10 +341,22 @@ async fn spawn_rx_task_single_with_adapter<A: RealtimeSttAdapter>(
timeout_secs = LISTEN_CONNECT_TIMEOUT.as_secs_f32(),
"listen_ws_connect_timeout(single)"
);
let _ = (SessionEvent::ConnectionError {
session_id: args.session_id.clone(),
error: "listen_ws_connect_timeout".to_string(),
is_retryable: true,
})
.emit(&args.app);
return Err(actor_error("listen_ws_connect_timeout"));
}
Ok(Err(e)) => {
tracing::error!(error = ?e, "listen_ws_connect_failed(single)");
let _ = (SessionEvent::ConnectionError {
session_id: args.session_id.clone(),
error: format!("listen_ws_connect_failed: {:?}", e),
is_retryable: true,
})
.emit(&args.app);
return Err(actor_error(format!("listen_ws_connect_failed: {:?}", e)));
}
Ok(Ok(res)) => res,
Expand Down Expand Up @@ -371,10 +413,22 @@ async fn spawn_rx_task_dual_with_adapter<A: RealtimeSttAdapter>(
timeout_secs = LISTEN_CONNECT_TIMEOUT.as_secs_f32(),
"listen_ws_connect_timeout(dual)"
);
let _ = (SessionEvent::ConnectionError {
session_id: args.session_id.clone(),
error: "listen_ws_connect_timeout".to_string(),
is_retryable: true,
})
.emit(&args.app);
return Err(actor_error("listen_ws_connect_timeout"));
}
Ok(Err(e)) => {
tracing::error!(error = ?e, "listen_ws_connect_failed(dual)");
let _ = (SessionEvent::ConnectionError {
session_id: args.session_id.clone(),
error: format!("listen_ws_connect_failed: {:?}", e),
is_retryable: true,
})
.emit(&args.app);
return Err(actor_error(format!("listen_ws_connect_failed: {:?}", e)));
}
Ok(Ok(res)) => res,
Expand Down
32 changes: 31 additions & 1 deletion plugins/listener/src/actors/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub struct SourceArgs {
}

pub struct SourceState {
app: tauri::AppHandle,
session_id: String,
mic_device: Option<String>,
onboarding: bool,
Expand Down Expand Up @@ -123,6 +124,14 @@ impl Actor for SourceActor {
myself: ActorRef<Self::Msg>,
args: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
if let Err(error) = (SessionEvent::InitializingAudio {
session_id: args.session_id.clone(),
})
.emit(&args.app)
{
tracing::error!(?error, "failed_to_emit_initializing_audio");
}

let device_watcher = DeviceChangeWatcher::spawn(myself.clone());

let silence_stream_tx = Some(hypr_audio::AudioOutput::silence());
Expand All @@ -134,6 +143,7 @@ impl Actor for SourceActor {
let pipeline = Pipeline::new(args.app.clone(), args.session_id.clone());

let mut st = SourceState {
app: args.app,
session_id: args.session_id,
mic_device,
onboarding: args.onboarding,
Expand Down Expand Up @@ -185,6 +195,12 @@ impl Actor for SourceActor {
}
SourceMsg::StreamFailed(reason) => {
tracing::error!(%reason, "source_stream_failed_stopping");
let _ = (SessionEvent::AudioError {
session_id: st.session_id.clone(),
error: reason.clone(),
device: st.mic_device.clone(),
})
.emit(&st.app);
myself.stop(Some(reason));
}
}
Expand Down Expand Up @@ -221,11 +237,25 @@ async fn start_source_loop(

st.pipeline.reset();

match new_mode {
let result = match new_mode {
ChannelMode::MicOnly => start_source_loop_mic_only(myself, st).await,
ChannelMode::SpeakerOnly => start_source_loop_speaker_only(myself, st).await,
ChannelMode::MicAndSpeaker => start_source_loop_mic_and_speaker(myself, st).await,
};

if result.is_ok() {
if let Err(error) = (SessionEvent::AudioReady {
session_id: st.session_id.clone(),
mode: st.current_mode.into(),
device: st.mic_device.clone(),
})
.emit(&st.app)
{
tracing::error!(?error, "failed_to_emit_audio_ready");
}
}

result
}

async fn start_source_loop_mic_only(
Expand Down
46 changes: 46 additions & 0 deletions plugins/listener/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,25 @@ macro_rules! common_event_derives {
};
}

/// Audio channel mode for the session.
///
/// Event-payload counterpart of `crate::actors::ChannelMode`; it is carried in
/// the `mode` field of `SessionEvent::AudioReady`.
///
/// The type is serialize-only (no `Deserialize` derive), and
/// `rename_all = "snake_case"` means the variants are written out as
/// `"mic_only"`, `"speaker_only"` and `"mic_and_speaker"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, specta::Type)]
#[serde(rename_all = "snake_case")]
pub enum AudioMode {
/// Serialized as `"mic_only"`.
MicOnly,
/// Serialized as `"speaker_only"`.
SpeakerOnly,
/// Serialized as `"mic_and_speaker"`.
MicAndSpeaker,
}

impl From<crate::actors::ChannelMode> for AudioMode {
fn from(mode: crate::actors::ChannelMode) -> Self {
match mode {
crate::actors::ChannelMode::MicOnly => AudioMode::MicOnly,
crate::actors::ChannelMode::SpeakerOnly => AudioMode::SpeakerOnly,
crate::actors::ChannelMode::MicAndSpeaker => AudioMode::MicAndSpeaker,
}
}
}

common_event_derives! {
#[serde(tag = "type")]
pub enum SessionEvent {
Expand All @@ -30,6 +49,33 @@ common_event_derives! {
session_id: String,
response: Box<StreamResponse>,
},
#[serde(rename = "initializingAudio")]
InitializingAudio { session_id: String },
#[serde(rename = "audioReady")]
AudioReady {
session_id: String,
mode: AudioMode,
device: Option<String>,
},
#[serde(rename = "audioError")]
AudioError {
session_id: String,
error: String,
device: Option<String>,
},
#[serde(rename = "connecting")]
Connecting { session_id: String },
#[serde(rename = "connected")]
Connected {
session_id: String,
adapter: String,
},
#[serde(rename = "connectionError")]
ConnectionError {
session_id: String,
error: String,
is_retryable: bool,
},
ExitRequested
}
}