From ea7021c80e1f24e3b46f72f52dc93bf2a48079fb Mon Sep 17 00:00:00 2001 From: beastoin Date: Fri, 6 Mar 2026 08:35:28 +0100 Subject: [PATCH 1/8] Add BackendTranscriptionService for /v4/listen WebSocket New service replacing direct Deepgram connection. Connects to backend /v4/listen with Bearer auth header, streams mono PCM16 audio at 16kHz, parses backend response format (segment arrays, ping heartbeats, events). Configurable source parameter for BLE device type propagation. Co-Authored-By: Claude Opus 4.6 --- .../Sources/BackendTranscriptionService.swift | 492 ++++++++++++++++++ 1 file changed, 492 insertions(+) create mode 100644 desktop/Desktop/Sources/BackendTranscriptionService.swift diff --git a/desktop/Desktop/Sources/BackendTranscriptionService.swift b/desktop/Desktop/Sources/BackendTranscriptionService.swift new file mode 100644 index 0000000000..12768fd583 --- /dev/null +++ b/desktop/Desktop/Sources/BackendTranscriptionService.swift @@ -0,0 +1,492 @@ +import Foundation + +/// Service for real-time speech-to-text transcription via the OMI backend. +/// Streams mono audio over WebSocket to /v4/listen and receives transcript segments. +/// This replaces direct Deepgram connections — the backend handles STT server-side. +class BackendTranscriptionService { + + // MARK: - Types + + /// Reuse the same TranscriptSegment type for compatibility with existing handlers + typealias TranscriptSegment = TranscriptionService.TranscriptSegment + typealias TranscriptHandler = (TranscriptSegment) -> Void + typealias ErrorHandler = (Error) -> Void + typealias ConnectionHandler = () -> Void + + enum BackendTranscriptionError: LocalizedError { + case notSignedIn + case connectionFailed(Error) + case invalidResponse + case webSocketError(String) + + var errorDescription: String? { + switch self { + case .notSignedIn: + return "Not signed in — cannot connect to backend" + case .connectionFailed(let error): + return "Connection failed: \(error.localizedDescription)" + case .invalidResponse: + return "Invalid response from backend" + case .webSocketError(let message): + return "WebSocket error: \(message)" + } + } + } + + // MARK: - Properties + + private var webSocketTask: URLSessionWebSocketTask? + private var urlSession: URLSession? + private var isConnected = false + private var shouldReconnect = false + + // Callbacks + private var onTranscript: TranscriptHandler? + private var onError: ErrorHandler? + private var onConnected: ConnectionHandler? + private var onDisconnected: ConnectionHandler? + + // Configuration + private let language: String + private let sampleRate = 16000 + private let codec = "pcm16" + private let channels = 1 // Always mono — backend handles diarization + private let source: String + private let conversationTimeout: Int + + // Reconnection + private var reconnectAttempts = 0 + private let maxReconnectAttempts = 10 + private var reconnectTask: Task? + + // Keepalive — send empty data periodically to prevent timeout + private var keepaliveTask: Task? + private let keepaliveInterval: TimeInterval = 8.0 + + // Watchdog: detect stale connections where WebSocket dies silently + private var watchdogTask: Task? + private var lastDataReceivedAt: Date? + private var lastKeepaliveSuccessAt: Date? 
+ private let watchdogInterval: TimeInterval = 30.0 + private let staleThreshold: TimeInterval = 60.0 + + // Audio buffering + private var audioBuffer = Data() + private let audioBufferSize = 3200 // ~100ms of 16kHz 16-bit mono (16000 * 2 * 0.1) + private let audioBufferLock = NSLock() + + // MARK: - Initialization + + /// Initialize the backend transcription service + /// - Parameters: + /// - language: Language code for transcription (e.g., "en", "multi") + /// - source: Audio source identifier for backend analytics (e.g., "desktop", "omi", "bee") + /// - conversationTimeout: Seconds of silence before the backend creates a memory + init(language: String = "en", source: String = "desktop", conversationTimeout: Int = 120) { + self.language = language + self.source = source + self.conversationTimeout = conversationTimeout + log("BackendTranscriptionService: Initialized with language=\(language), source=\(source)") + } + + // MARK: - Public Methods + + /// Start the transcription service + func start( + onTranscript: @escaping TranscriptHandler, + onError: ErrorHandler? = nil, + onConnected: ConnectionHandler? = nil, + onDisconnected: ConnectionHandler? = nil + ) { + self.onTranscript = onTranscript + self.onError = onError + self.onConnected = onConnected + self.onDisconnected = onDisconnected + self.shouldReconnect = true + self.reconnectAttempts = 0 + + connect() + } + + /// Stop the transcription service + func stop() { + shouldReconnect = false + reconnectTask?.cancel() + reconnectTask = nil + keepaliveTask?.cancel() + keepaliveTask = nil + watchdogTask?.cancel() + watchdogTask = nil + + flushAudioBuffer() + disconnect() + } + + /// Signal the backend that no more audio will be sent, but keep connection open + /// to receive final transcription results. Call stop() later to fully disconnect. + func finishStream() { + shouldReconnect = false + reconnectTask?.cancel() + reconnectTask = nil + keepaliveTask?.cancel() + keepaliveTask = nil + watchdogTask?.cancel() + watchdogTask = nil + + flushAudioBuffer() + + // Backend doesn't have a CloseStream message like Deepgram. + // The connection will be closed when stop() is called. 
+ log("BackendTranscriptionService: finishStream called, waiting for final results") + } + + /// Send audio data to the backend (buffered for efficiency) + func sendAudio(_ data: Data) { + guard isConnected else { return } + + audioBufferLock.lock() + audioBuffer.append(data) + + if audioBuffer.count >= audioBufferSize { + let chunk = audioBuffer + audioBuffer = Data() + audioBufferLock.unlock() + sendAudioChunk(chunk) + } else { + audioBufferLock.unlock() + } + } + + /// Flush any remaining audio in the buffer + private func flushAudioBuffer() { + audioBufferLock.lock() + let chunk = audioBuffer + audioBuffer = Data() + audioBufferLock.unlock() + + if !chunk.isEmpty { + sendAudioChunk(chunk) + } + } + + /// Actually send an audio chunk over the WebSocket + private func sendAudioChunk(_ data: Data) { + guard isConnected, let webSocketTask = webSocketTask else { return } + + let message = URLSessionWebSocketTask.Message.data(data) + webSocketTask.send(message) { [weak self] error in + if let error = error { + logError("BackendTranscriptionService: Send error", error: error) + self?.handleDisconnection() + } + } + } + + /// No-op for backend (Deepgram-specific Finalize message not needed) + func sendFinalize() { + // Backend handles segmentation server-side + } + + /// Public keepalive for VAD gate to call during extended silence + func sendKeepalivePublic() { + sendKeepalive() + } + + /// Check if connected + var connected: Bool { + return isConnected + } + + // MARK: - Connection + + private func connect() { + Task { + do { + let token = try await AuthService.shared.getIdToken() + let baseURL = await APIClient.shared.baseURL + self.connectWithToken(token, baseURL: baseURL) + } catch { + logError("BackendTranscriptionService: Failed to get auth token", error: error) + self.onError?(BackendTranscriptionError.notSignedIn) + } + } + } + + private func connectWithToken(_ token: String, baseURL: String) { + + // Convert http(s) to ws(s) + let wsBaseURL: String + if baseURL.hasPrefix("https://") { + wsBaseURL = "wss://" + baseURL.dropFirst("https://".count) + } else if baseURL.hasPrefix("http://") { + wsBaseURL = "ws://" + baseURL.dropFirst("http://".count) + } else { + wsBaseURL = "wss://" + baseURL + } + + // Strip trailing slash before appending path + let cleanBase = wsBaseURL.hasSuffix("/") ? String(wsBaseURL.dropLast()) : wsBaseURL + + var components = URLComponents(string: cleanBase + "/v4/listen")! 
+ components.queryItems = [ + URLQueryItem(name: "language", value: language), + URLQueryItem(name: "sample_rate", value: String(sampleRate)), + URLQueryItem(name: "codec", value: codec), + URLQueryItem(name: "channels", value: String(channels)), + URLQueryItem(name: "source", value: source), + URLQueryItem(name: "include_speech_profile", value: "true"), + URLQueryItem(name: "speaker_auto_assign", value: "enabled"), + URLQueryItem(name: "conversation_timeout", value: String(conversationTimeout)), + ] + + guard let url = components.url else { + onError?(BackendTranscriptionError.connectionFailed(NSError(domain: "Invalid URL", code: -1))) + return + } + + log("BackendTranscriptionService: Connecting to \(url.absoluteString)") + + // Create URL request with Bearer auth header (same as mobile app) + var request = URLRequest(url: url) + request.setValue("Bearer \(token)", forHTTPHeaderField: "Authorization") + + // Create URLSession and WebSocket task + let configuration = URLSessionConfiguration.default + configuration.timeoutIntervalForRequest = 30 + configuration.timeoutIntervalForResource = 0 // No resource timeout for long-lived WebSocket + urlSession = URLSession(configuration: configuration) + webSocketTask = urlSession?.webSocketTask(with: request) + + // Start the connection + webSocketTask?.resume() + + // Start receiving messages + receiveMessage() + + // Mark as connected after a short delay (backend doesn't send a connect confirmation) + DispatchQueue.main.asyncAfter(deadline: .now() + 0.5) { [weak self] in + guard let self = self, self.webSocketTask?.state == .running else { return } + self.isConnected = true + self.reconnectAttempts = 0 + self.lastDataReceivedAt = Date() + self.lastKeepaliveSuccessAt = Date() + log("BackendTranscriptionService: Connected") + self.startKeepalive() + self.startWatchdog() + self.onConnected?() + } + } + + // MARK: - Keepalive + + private func startKeepalive() { + keepaliveTask?.cancel() + keepaliveTask = Task { [weak self] in + while !Task.isCancelled { + try? await Task.sleep(nanoseconds: UInt64(self?.keepaliveInterval ?? 8.0) * 1_000_000_000) + guard !Task.isCancelled, let self = self, self.isConnected else { break } + self.sendKeepalive() + } + } + } + + private func sendKeepalive() { + guard isConnected, let webSocketTask = webSocketTask else { return } + + // Send a small chunk of silence as keepalive (2 bytes of zero = 1 silent sample) + let silence = Data(repeating: 0, count: 2) + let message = URLSessionWebSocketTask.Message.data(silence) + webSocketTask.send(message) { [weak self] error in + if let error = error { + logError("BackendTranscriptionService: Keepalive error", error: error) + self?.handleDisconnection() + } else { + self?.lastKeepaliveSuccessAt = Date() + } + } + } + + // MARK: - Watchdog + + private func startWatchdog() { + watchdogTask?.cancel() + watchdogTask = Task { [weak self] in + while !Task.isCancelled { + try? await Task.sleep(nanoseconds: UInt64(self?.watchdogInterval ?? 
30.0) * 1_000_000_000) + guard !Task.isCancelled, let self = self, self.isConnected else { break } + + if let lastData = self.lastDataReceivedAt, + Date().timeIntervalSince(lastData) > self.staleThreshold { + if let lastKeepalive = self.lastKeepaliveSuccessAt, + Date().timeIntervalSince(lastKeepalive) < self.staleThreshold { + continue + } + log("BackendTranscriptionService: Watchdog detected stale connection — forcing reconnect") + self.handleDisconnection() + } + } + } + } + + // MARK: - Disconnect / Reconnect + + private func disconnect() { + isConnected = false + keepaliveTask?.cancel() + keepaliveTask = nil + watchdogTask?.cancel() + watchdogTask = nil + webSocketTask?.cancel(with: .normalClosure, reason: nil) + webSocketTask = nil + urlSession?.invalidateAndCancel() + urlSession = nil + log("BackendTranscriptionService: Disconnected") + onDisconnected?() + } + + private func handleDisconnection() { + guard isConnected else { return } + + isConnected = false + keepaliveTask?.cancel() + keepaliveTask = nil + watchdogTask?.cancel() + watchdogTask = nil + webSocketTask = nil + urlSession?.invalidateAndCancel() + urlSession = nil + onDisconnected?() + + if shouldReconnect && reconnectAttempts < maxReconnectAttempts { + reconnectAttempts += 1 + let delay = min(pow(2.0, Double(reconnectAttempts)), 32.0) + log("BackendTranscriptionService: Reconnecting in \(delay)s (attempt \(reconnectAttempts))") + + reconnectTask = Task { + try? await Task.sleep(nanoseconds: UInt64(delay * 1_000_000_000)) + guard !Task.isCancelled, self.shouldReconnect else { return } + self.connect() + } + } else if reconnectAttempts >= maxReconnectAttempts { + log("BackendTranscriptionService: Max reconnect attempts reached") + onError?(BackendTranscriptionError.webSocketError("Max reconnect attempts reached")) + } + } + + // MARK: - Message Handling + + private func receiveMessage() { + webSocketTask?.receive { [weak self] result in + guard let self = self else { return } + + switch result { + case .success(let message): + self.handleMessage(message) + self.receiveMessage() + + case .failure(let error): + guard self.isConnected else { return } + logError("BackendTranscriptionService: Receive error", error: error) + self.handleDisconnection() + } + } + } + + private func handleMessage(_ message: URLSessionWebSocketTask.Message) { + lastDataReceivedAt = Date() + + switch message { + case .string(let text): + parseResponse(text) + case .data(let data): + if let text = String(data: data, encoding: .utf8) { + parseResponse(text) + } + @unknown default: + break + } + } + + private func parseResponse(_ text: String) { + // Handle heartbeat ping from backend + if text == "ping" { + return + } + + guard let data = text.data(using: .utf8) else { return } + + // Try parsing as array of transcript segments (main response format) + if let segments = try? JSONDecoder().decode([BackendSegment].self, from: data) { + for segment in segments { + // Map backend is_user to channel index: + // is_user=true → channelIndex=0 (mic/user) + // is_user=false → channelIndex=1 (system/others) + let channelIndex = segment.is_user ? 
0 : 1 + + let transcriptSegment = TranscriptSegment( + text: segment.text, + isFinal: true, + speechFinal: true, + confidence: 1.0, + words: [TranscriptSegment.Word( + word: segment.text, + start: segment.start, + end: segment.end, + confidence: 1.0, + speaker: segment.speaker_id, + punctuatedWord: segment.text + )], + channelIndex: channelIndex + ) + onTranscript?(transcriptSegment) + } + return + } + + // Try parsing as event object (memory_created, service_status, etc.) + if let event = try? JSONDecoder().decode(BackendEvent.self, from: data) { + switch event.type { + case "memory_created": + log("BackendTranscriptionService: Memory created") + case "service_status": + log("BackendTranscriptionService: Service status: \(event.status ?? "unknown")") + default: + log("BackendTranscriptionService: Event: \(event.type)") + } + return + } + + // Unknown message — log for debugging + log("BackendTranscriptionService: Unknown message: \(text.prefix(200))") + } +} + +// MARK: - Backend Response Models + +/// Transcript segment from the OMI backend +private struct BackendSegment: Decodable { + let text: String + let speaker: String? + let speaker_id: Int? + let is_user: Bool + let start: Double + let end: Double + let person_id: String? +} + +/// Event message from the OMI backend +private struct BackendEvent: Decodable { + let type: String + let status: String? + + enum CodingKeys: String, CodingKey { + case type + case status + } + + init(from decoder: Decoder) throws { + let container = try decoder.container(keyedBy: CodingKeys.self) + type = try container.decode(String.self, forKey: .type) + status = try container.decodeIfPresent(String.self, forKey: .status) + } +} From 8c28e580346db2773f99fea4aaab449d267a9297 Mon Sep 17 00:00:00 2001 From: beastoin Date: Fri, 6 Mar 2026 08:35:34 +0100 Subject: [PATCH 2/8] Add mono output mode and fix single-source operation in AudioMixer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add OutputMode enum (.stereo/.mono) with mono averaging both channels. Fix processBuffers() to work when only one source has data (e.g. system audio disabled by default) — previously min(mic, 0) = 0 blocked all output. Existing silence-padding handles the gap. Co-Authored-By: Claude Opus 4.6 --- desktop/Desktop/Sources/AudioMixer.swift | 75 ++++++++++++++++++++---- 1 file changed, 62 insertions(+), 13 deletions(-) diff --git a/desktop/Desktop/Sources/AudioMixer.swift b/desktop/Desktop/Sources/AudioMixer.swift index b35d9a58b7..88ac96178b 100644 --- a/desktop/Desktop/Sources/AudioMixer.swift +++ b/desktop/Desktop/Sources/AudioMixer.swift @@ -1,19 +1,26 @@ import Foundation -/// Mixes microphone and system audio into a stereo stream for multichannel transcription +/// Mixes microphone and system audio into a combined stream for transcription. +/// Supports stereo (interleaved mic+system) or mono (averaged) output. /// Channel 0 (left) = Microphone (user) /// Channel 1 (right) = System audio (others) class AudioMixer { // MARK: - Types - /// Callback for receiving stereo audio chunks + enum OutputMode { + case stereo // Interleaved [mic0, sys0, mic1, sys1, ...] — for Deepgram multichannel + case mono // Averaged (mic + system) / 2 — for backend /v4/listen + } + + /// Callback for receiving mixed audio chunks typealias StereoAudioHandler = (Data) -> Void // MARK: - Properties private var onStereoChunk: StereoAudioHandler? 
    private var isRunning = false
+    private(set) var outputMode: OutputMode = .stereo

     // Audio buffers (16kHz mono Int16 PCM)
     private var micBuffer = Data()
@@ -29,15 +36,18 @@ class AudioMixer {
     // MARK: - Public Methods

     /// Start the mixer
-    /// - Parameter onStereoChunk: Callback receiving interleaved stereo 16-bit PCM at 16kHz
-    func start(onStereoChunk: @escaping StereoAudioHandler) {
+    /// - Parameters:
+    ///   - outputMode: `.stereo` for interleaved multichannel, `.mono` for averaged single-channel
+    ///   - onStereoChunk: Callback receiving mixed 16-bit PCM at 16kHz
+    func start(outputMode: OutputMode = .stereo, onStereoChunk: @escaping StereoAudioHandler) {
         bufferLock.lock()
+        self.outputMode = outputMode
         self.onStereoChunk = onStereoChunk
         self.isRunning = true
         micBuffer = Data()
         systemBuffer = Data()
         bufferLock.unlock()
-        log("AudioMixer: Started")
+        log("AudioMixer: Started (output=\(outputMode))")
     }

     /// Stop the mixer and flush remaining audio
@@ -105,12 +115,17 @@ class AudioMixer {
         if flush {
             // When flushing, process whatever is available
             bytesToProcess = max(micBuffer.count, systemBuffer.count)
+        } else if micBuffer.count >= minBufferBytes && systemBuffer.count >= minBufferBytes {
+            // Both buffers have data — use shorter to stay in sync
+            bytesToProcess = (min(micBuffer.count, systemBuffer.count) / 2) * 2
+        } else if micBuffer.count >= minBufferBytes {
+            // Only mic has data (system audio disabled/unavailable) — pad system with silence
+            bytesToProcess = (micBuffer.count / 2) * 2
+        } else if systemBuffer.count >= minBufferBytes {
+            // Only system has data — pad mic with silence
+            bytesToProcess = (systemBuffer.count / 2) * 2
         } else {
-            // Normal operation: process when both have data
-            let minAvailable = min(micBuffer.count, systemBuffer.count)
-            guard minAvailable >= minBufferBytes else { return }
-            // Align to sample boundary (2 bytes per Int16 sample)
-            bytesToProcess = (minAvailable / 2) * 2
+            return
         }

         guard bytesToProcess >= 2 else { return }
@@ -137,11 +152,17 @@ class AudioMixer {
             systemBuffer = Data()
         }

-        // Interleave into stereo
-        let stereoData = interleave(mic: micData, system: sysData)
+        // Mix according to output mode
+        let mixedData: Data
+        switch outputMode {
+        case .stereo:
+            mixedData = interleave(mic: micData, system: sysData)
+        case .mono:
+            mixedData = mixToMono(mic: micData, system: sysData)
+        }

         // Send to callback
-        onStereoChunk?(stereoData)
+        onStereoChunk?(mixedData)
     }

     /// Interleave two mono Int16 streams into stereo
@@ -174,4 +195,32 @@ class AudioMixer {
             Data(buffer: buffer)
         }
     }
+
+    /// Average two mono Int16 streams into a single mono stream
+    /// Output format: [(mic0+sys0)/2, (mic1+sys1)/2, ...]
+    private func mixToMono(mic: Data, system: Data) -> Data {
+        let sampleCount = mic.count / 2
+
+        var monoSamples = [Int16]()
+        monoSamples.reserveCapacity(sampleCount)
+
+        mic.withUnsafeBytes { micPtr in
+            system.withUnsafeBytes { sysPtr in
+                let micSamples = micPtr.bindMemory(to: Int16.self)
+                let sysSamples = sysPtr.bindMemory(to: Int16.self)
+
+                for i in 0..<sampleCount {
+                    let mixed = (Int32(micSamples[i]) + Int32(sysSamples[i])) / 2
+                    monoSamples.append(Int16(clamping: mixed))
+                }
+            }
+        }
+
+        return monoSamples.withUnsafeBufferPointer { buffer in
+            Data(buffer: buffer)
+        }
+    }
 }

From: beastoin
Date: Fri, 6 Mar 2026 08:35:38 +0100
Subject: [PATCH 3/8] Switch BleAudioService to closure-based audio sink

Replace concrete TranscriptionService parameter with audioSink closure
for decoupled audio routing. Callers provide destination closure instead
of coupling to a specific transcription type.
Co-Authored-By: Claude Opus 4.6
---
 .../Sources/Audio/BleAudioService.swift | 38 ++++---------------
 1 file changed, 7 insertions(+), 31 deletions(-)

diff --git a/desktop/Desktop/Sources/Audio/BleAudioService.swift b/desktop/Desktop/Sources/Audio/BleAudioService.swift
index 0cb9bb527f..9078653668 100644
--- a/desktop/Desktop/Sources/Audio/BleAudioService.swift
+++ b/desktop/Desktop/Sources/Audio/BleAudioService.swift
@@ -27,7 +27,7 @@ final class BleAudioService: ObservableObject {
     private var cancellables = Set<AnyCancellable>()

     // Audio delivery
-    private var transcriptionService: TranscriptionService?
+    private var audioSink: ((Data) -> Void)?
     private var audioDataHandler: ((Data) -> Void)?
     private var rawFrameHandler: ((Data) -> Void)?

@@ -44,12 +44,12 @@ final class BleAudioService: ObservableObject {
     /// Start processing audio from a device connection
     /// - Parameters:
     ///   - connection: The device connection to get audio from
-    ///   - transcriptionService: Optional transcription service to send audio to
+    ///   - audioSink: Optional closure to receive decoded mono PCM audio (e.g., send to transcription service)
     ///   - audioDataHandler: Optional handler for decoded PCM data (alternative to transcription)
     ///   - rawFrameHandler: Optional handler for raw encoded frames (for WAL recording)
     func startProcessing(
         from connection: DeviceConnection,
-        transcriptionService: TranscriptionService? = nil,
+        audioSink: ((Data) -> Void)? = nil,
         audioDataHandler: ((Data) -> Void)? = nil,
         rawFrameHandler: ((Data) -> Void)? = nil
     ) async {
@@ -58,7 +58,7 @@
             return
         }

-        self.transcriptionService = transcriptionService
+        self.audioSink = audioSink
         self.audioDataHandler = audioDataHandler
         self.rawFrameHandler = rawFrameHandler

@@ -126,7 +126,7 @@
         cancellables.removeAll()
         isProcessing = false

-        transcriptionService = nil
+        audioSink = nil
         audioDataHandler = nil
         rawFrameHandler = nil

@@ -194,37 +194,13 @@
         // Calculate audio level
         updateAudioLevel(from: pcmData)

-        // Send to transcription service (mono channel)
-        if let transcription = transcriptionService {
-            // TranscriptionService expects stereo (2 channels) for multichannel transcription
-            // For BLE device audio, we duplicate to both channels (device is the "user")
-            let stereoData = convertToStereo(pcmData)
-            transcription.sendAudio(stereoData)
-        }
+        // Send decoded mono PCM to audio sink (e.g., transcription service)
+        audioSink?(pcmData)

         // Send to custom handler
         audioDataHandler?(pcmData)
     }

-    /// Convert mono PCM to stereo (duplicate to both channels)
-    private func convertToStereo(_ monoData: Data) -> Data {
-        // Mono: [S0, S1, S2, ...]
-        // Stereo: [S0, S0, S1, S1, S2, S2, ...] (interleaved)
-        var stereoData = Data(capacity: monoData.count * 2)
-
-        monoData.withUnsafeBytes { bytes in
-            let samples = bytes.bindMemory(to: Int16.self)
-            for i in 0..<samples.count {
-                let sample = samples[i]
-                withUnsafeBytes(of: sample) { sampleBytes in
-                    stereoData.append(contentsOf: sampleBytes)
-                    stereoData.append(contentsOf: sampleBytes)
-                }
-            }
-        }
-
-        return stereoData
-    }

From: beastoin
Date: Fri, 6 Mar 2026 08:35:43 +0100
Subject: [PATCH 4/8] Route desktop STT through backend /v4/listen in AppState

Replace direct Deepgram with BackendTranscriptionService. Force streaming
mode, set AudioMixer to mono. Add backendOwnsConversation flag to skip
createConversationFromSegments() (backend creates conversations via
lifecycle manager). Pass correct source for BLE devices. Remove
DEEPGRAM_API_KEY check.
Co-Authored-By: Claude Opus 4.6 --- desktop/Desktop/Sources/AppState.swift | 316 ++++++++++++------------- 1 file changed, 148 insertions(+), 168 deletions(-) diff --git a/desktop/Desktop/Sources/AppState.swift b/desktop/Desktop/Sources/AppState.swift index 862bfaf070..474fec6ceb 100644 --- a/desktop/Desktop/Sources/AppState.swift +++ b/desktop/Desktop/Sources/AppState.swift @@ -135,13 +135,16 @@ class AppState: ObservableObject { // Transcription services private var audioCaptureService: AudioCaptureService? - private var transcriptionService: TranscriptionService? + private var transcriptionService: BackendTranscriptionService? private var systemAudioCaptureService: Any? // SystemAudioCaptureService (macOS 14.4+) private var audioMixer: AudioMixer? private var vadGateService: VADGateService? - // Batch transcription mode + // Batch transcription mode (disabled — backend handles everything via /v4/listen) private var useBatchTranscription: Bool = false + // When true, backend owns conversation creation via /v4/listen lifecycle manager. + // Desktop skips createConversationFromSegments() to avoid duplicates. + private var backendOwnsConversation: Bool = false private var recordingStartCATime: Double = 0 // CACurrentMediaTime at recording start // Speaker segments for diarized transcription (sliding window — older segments are in SQLite) @@ -430,12 +433,7 @@ class AppState: ObservableObject { } } - // Log final state of important keys - if getenv("DEEPGRAM_API_KEY") != nil { - log("DEEPGRAM_API_KEY is set") - } else { - log("WARNING: DEEPGRAM_API_KEY is NOT set") - } + // DEEPGRAM_API_KEY no longer needed — STT routed through backend /v4/listen } private func shouldSkipBundledAnthropicKey(key: String, sourcePath: String, bundledEnvPath: String?) -> Bool { @@ -1152,165 +1150,143 @@ class AppState: ObservableObject { } } - do { - // Get effective language from settings (handles auto-detect vs single language) - let effectiveLanguage = AssistantSettings.shared.effectiveTranscriptionLanguage - let vocabulary = AssistantSettings.shared.effectiveVocabulary - log("Transcription: Using language=\(effectiveLanguage) (autoDetect=\(AssistantSettings.shared.transcriptionAutoDetect), selected=\(AssistantSettings.shared.transcriptionLanguage))") - log("Transcription: Custom vocabulary: \(vocabulary.joined(separator: ", "))") - - // Determine transcription mode - useBatchTranscription = AssistantSettings.shared.batchTranscriptionEnabled && effectiveSource == .microphone - - if !useBatchTranscription { - // Streaming mode: initialize WebSocket transcription service - transcriptionService = try TranscriptionService(language: effectiveLanguage, vocabulary: vocabulary) + // Get effective language from settings (handles auto-detect vs single language) + let effectiveLanguage = AssistantSettings.shared.effectiveTranscriptionLanguage + let vocabulary = AssistantSettings.shared.effectiveVocabulary + log("Transcription: Using language=\(effectiveLanguage) (autoDetect=\(AssistantSettings.shared.transcriptionAutoDetect), selected=\(AssistantSettings.shared.transcriptionLanguage))") + log("Transcription: Custom vocabulary: \(vocabulary.joined(separator: ", "))") + + // Always use streaming mode through the backend — batch mode not needed + // (backend handles STT, diarization, and memory creation server-side) + useBatchTranscription = false + // Backend owns conversation creation via /v4/listen lifecycle manager + backendOwnsConversation = true + + // Set conversation source based on audio source + let sourceValue: 
String + if effectiveSource == .bleDevice, let device = DeviceProvider.shared.connectedDevice { + currentConversationSource = ConversationSource.from(deviceType: device.type) + recordingInputDeviceName = device.displayName + sourceValue = currentConversationSource.rawValue + } else { + currentConversationSource = .desktop + recordingInputDeviceName = AudioCaptureService.getCurrentMicrophoneName() + sourceValue = "desktop" + } + + transcriptionService = BackendTranscriptionService(language: effectiveLanguage, source: sourceValue) + + // Initialize audio services based on source + if effectiveSource == .microphone { + // Initialize audio capture service + audioCaptureService = AudioCaptureService() + + // Initialize audio mixer for combining mic and system audio + audioMixer = AudioMixer() + + // VAD gate is optional for streaming mode (silence gating) + if AssistantSettings.shared.vadGateEnabled { + let gate = VADGateService() + vadGateService = gate + log("Transcription: VAD gate enabled") } else { - log("Transcription: Batch mode enabled — skipping WebSocket") + vadGateService = nil + } + + // Initialize system audio capture if supported (macOS 14.4+) + // Can be disabled via: defaults write com.omi.desktop-dev disableSystemAudioCapture -bool true + // or: defaults write com.omi.computer-macos disableSystemAudioCapture -bool true + let systemAudioDisabled = UserDefaults.standard.bool(forKey: "disableSystemAudioCapture") + if systemAudioDisabled { + log("Transcription: System audio capture DISABLED by user preference (disableSystemAudioCapture)") + } else if #available(macOS 14.4, *) { + systemAudioCaptureService = SystemAudioCaptureService() + log("Transcription: System audio capture initialized (macOS 14.4+)") + } else { + log("Transcription: System audio capture not available (requires macOS 14.4+)") } + } + // For BLE device, BleAudioService will be used in startAudioCapture - // Set conversation source based on audio source - if effectiveSource == .bleDevice, let device = DeviceProvider.shared.connectedDevice { - currentConversationSource = ConversationSource.from(deviceType: device.type) - recordingInputDeviceName = device.displayName - } else { - currentConversationSource = .desktop - recordingInputDeviceName = AudioCaptureService.getCurrentMicrophoneName() - } - - // Initialize audio services based on source - if effectiveSource == .microphone { - // Initialize audio capture service - audioCaptureService = AudioCaptureService() - - // Initialize audio mixer for combining mic and system audio - audioMixer = AudioMixer() - - // VAD gate is always needed for batch mode (chunk boundaries), - // and optional for streaming mode (silence gating) - if useBatchTranscription || AssistantSettings.shared.vadGateEnabled { - let gate = VADGateService() - if useBatchTranscription && !gate.modelAvailable { - // Batch mode requires working VAD — fall back to streaming - log("Transcription: VAD models unavailable, falling back from batch to streaming mode") - useBatchTranscription = false - vadGateService = nil - transcriptionService = try TranscriptionService(language: effectiveLanguage, vocabulary: vocabulary) - } else { - vadGateService = gate - log("Transcription: VAD gate enabled\(useBatchTranscription ? 
" (batch mode)" : "")") - } - } else { - vadGateService = nil + // Start backend transcription service, then audio on connect + transcriptionService?.start( + onTranscript: { [weak self] segment in + Task { @MainActor in + self?.handleTranscriptSegment(segment) } - - // Initialize system audio capture if supported (macOS 14.4+) - // Can be disabled via: defaults write com.omi.desktop-dev disableSystemAudioCapture -bool true - // or: defaults write com.omi.computer-macos disableSystemAudioCapture -bool true - let systemAudioDisabled = UserDefaults.standard.bool(forKey: "disableSystemAudioCapture") - if systemAudioDisabled { - log("Transcription: System audio capture DISABLED by user preference (disableSystemAudioCapture)") - } else if #available(macOS 14.4, *) { - systemAudioCaptureService = SystemAudioCaptureService() - log("Transcription: System audio capture initialized (macOS 14.4+)") - } else { - log("Transcription: System audio capture not available (requires macOS 14.4+)") + }, + onError: { [weak self] error in + Task { @MainActor in + logError("Transcription error", error: error) + AnalyticsManager.shared.recordingError(error: error.localizedDescription) + self?.stopTranscription() } - } - // For BLE device, BleAudioService will be used in startAudioCapture - - if useBatchTranscription { - // Batch mode: start audio capture directly (no WebSocket to wait for) - recordingStartCATime = CACurrentMediaTime() - Task { @MainActor [weak self] in + }, + onConnected: { [weak self] in + Task { @MainActor in + log("Transcription: Connected to backend") + // Start audio capture once connected await self?.startAudioCapture(source: effectiveSource) } - } else { - // Streaming mode: start transcription service first, then audio on connect - transcriptionService?.start( - onTranscript: { [weak self] segment in - Task { @MainActor in - self?.handleTranscriptSegment(segment) - } - }, - onError: { [weak self] error in - Task { @MainActor in - logError("Transcription error", error: error) - AnalyticsManager.shared.recordingError(error: error.localizedDescription) - self?.stopTranscription() - } - }, - onConnected: { [weak self] in - Task { @MainActor in - log("Transcription: Connected to DeepGram") - // Start audio capture once connected - await self?.startAudioCapture(source: effectiveSource) - } - }, - onDisconnected: { - log("Transcription: Disconnected from DeepGram") - } - ) + }, + onDisconnected: { + log("Transcription: Disconnected from backend") } + ) - isTranscribing = true - AssistantSettings.shared.transcriptionEnabled = true - audioSource = effectiveSource - currentTranscript = "" - speakerSegments = [] - totalSegmentCount = 0 - totalWordCount = 0 - liveSpeakerPersonMap = [:] - LiveTranscriptMonitor.shared.clear() - recordingStartTime = Date() - AudioLevelMonitor.shared.reset() - RecordingTimer.shared.start() + isTranscribing = true + AssistantSettings.shared.transcriptionEnabled = true + audioSource = effectiveSource + currentTranscript = "" + speakerSegments = [] + totalSegmentCount = 0 + totalWordCount = 0 + liveSpeakerPersonMap = [:] + LiveTranscriptMonitor.shared.clear() + recordingStartTime = Date() + AudioLevelMonitor.shared.reset() + RecordingTimer.shared.start() - log("Transcription: Using source: \(effectiveSource.rawValue), device: \(recordingInputDeviceName ?? "Unknown")") + log("Transcription: Using source: \(effectiveSource.rawValue), device: \(recordingInputDeviceName ?? 
"Unknown")") - // Create crash-safe DB session for persistence - Task { - do { - let sessionId = try await TranscriptionStorage.shared.startSession( - source: currentConversationSource.rawValue, - language: effectiveLanguage, - timezone: TimeZone.current.identifier, - inputDeviceName: recordingInputDeviceName - ) - await MainActor.run { - self.currentSessionId = sessionId - // Start live notes session - LiveNotesMonitor.shared.startSession(sessionId: sessionId) - } - log("Transcription: Created DB session \(sessionId)") - } catch { - logError("Transcription: Failed to create DB session", error: error) - // Non-fatal - continue recording even if DB fails + // Create crash-safe DB session for persistence + Task { + do { + let sessionId = try await TranscriptionStorage.shared.startSession( + source: currentConversationSource.rawValue, + language: effectiveLanguage, + timezone: TimeZone.current.identifier, + inputDeviceName: recordingInputDeviceName + ) + await MainActor.run { + self.currentSessionId = sessionId + // Start live notes session + LiveNotesMonitor.shared.startSession(sessionId: sessionId) } + log("Transcription: Created DB session \(sessionId)") + } catch { + logError("Transcription: Failed to create DB session", error: error) + // Non-fatal - continue recording even if DB fails } + } - // Start 4-hour max recording timer - maxRecordingTimer = Timer.scheduledTimer(withTimeInterval: maxRecordingDuration, repeats: false) { [weak self] _ in - Task { @MainActor in - guard let self = self, self.isTranscribing else { return } - log("Transcription: 4-hour limit reached - finalizing conversation") - _ = await self.finalizeConversation() - // Start a new recording session automatically - self.stopAudioCapture() - self.clearTranscriptionState() - self.startTranscription() - } + // Start 4-hour max recording timer + maxRecordingTimer = Timer.scheduledTimer(withTimeInterval: maxRecordingDuration, repeats: false) { [weak self] _ in + Task { @MainActor in + guard let self = self, self.isTranscribing else { return } + log("Transcription: 4-hour limit reached - finalizing conversation") + _ = await self.finalizeConversation() + // Start a new recording session automatically + self.stopAudioCapture() + self.clearTranscriptionState() + self.startTranscription() } + } - // Track transcription started - AnalyticsManager.shared.transcriptionStarted() - - log("Transcription: Starting...") + // Track transcription started + AnalyticsManager.shared.transcriptionStarted() - } catch { - AnalyticsManager.shared.recordingError(error: error.localizedDescription) - showAlert(title: "Transcription Error", message: error.localizedDescription) - } + log("Transcription: Starting...") } /// Start audio capture and pipe to transcription service @@ -1330,23 +1306,12 @@ class AppState: ObservableObject { guard let audioCaptureService = audioCaptureService, let audioMixer = audioMixer else { return } - // Start the audio mixer - it will send stereo audio to transcription service - // Branch on batch vs streaming mode - audioMixer.start { [weak self] stereoData in + // Start the audio mixer in mono mode — backend handles diarization server-side + audioMixer.start(outputMode: .mono) { [weak self] monoData in guard let self = self else { return } - if self.useBatchTranscription { - // Batch mode: accumulate audio in VAD gate, transcribe on silence - guard let gate = self.vadGateService else { return } - let output = gate.processAudioBatch(stereoData) - if output.isComplete, let audioBuffer = output.audioBuffer { - let 
wallStartTime = output.speechStartWallTime - Task { @MainActor [weak self] in - await self?.batchTranscribeChunk(audioBuffer: audioBuffer, wallStartTime: wallStartTime) - } - } - } else if let gate = self.vadGateService { + if let gate = self.vadGateService { // Streaming mode with VAD gate - let output = gate.processAudio(stereoData) + let output = gate.processAudio(monoData) if !output.audioToSend.isEmpty { self.transcriptionService?.sendAudio(output.audioToSend) } else if gate.needsKeepalive() { @@ -1357,7 +1322,7 @@ class AppState: ObservableObject { } } else { // Streaming mode without VAD gate - self.transcriptionService?.sendAudio(stereoData) + self.transcriptionService?.sendAudio(monoData) } } @@ -1411,10 +1376,12 @@ class AppState: ObservableObject { return } - // Start BLE audio processing and pipe directly to transcription + // Start BLE audio processing and pipe mono PCM directly to backend transcription await BleAudioService.shared.startProcessing( from: connection, - transcriptionService: transcriptionService, + audioSink: { [weak transcriptionService] pcmData in + transcriptionService?.sendAudio(pcmData) + }, audioDataHandler: { _ in // Audio level is updated by BleAudioService Task { @MainActor in @@ -2095,6 +2062,19 @@ class AppState: ObservableObject { log("Transcription: Finalizing conversation with \(segmentsToUpload.count) segments") + // When backend owns conversation creation (via /v4/listen lifecycle manager), + // skip client-side createConversationFromSegments() to avoid duplicates. + // The backend already has all segments from the live stream and will process + // the conversation on timeout or next connection. + if backendOwnsConversation { + log("Transcription: Backend owns conversation — skipping client-side upload (\(segmentsToUpload.count) segments streamed)") + if let sessionId = sessionId { + // Mark session as completed — no retry needed since backend has the data + try? await TranscriptionStorage.shared.markSessionCompleted(id: sessionId, backendId: "backend-owned") + } + return .saved + } + // Convert SpeakerSegment to API request format (include person_id from live naming) let speakerPersonMap = liveSpeakerPersonMap let apiSegments = segmentsToUpload.map { segment in From e80ec706e62f2d04883e7b7ca5a1bdbea078da2f Mon Sep 17 00:00:00 2001 From: beastoin Date: Fri, 6 Mar 2026 08:35:56 +0100 Subject: [PATCH 5/8] Use BackendTranscriptionService for push-to-talk MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace direct Deepgram with backend service for live PTT. Remove batch transcription path entirely — backend handles STT server-side. Co-Authored-By: Claude Opus 4.6 --- .../PushToTalkManager.swift | 136 +++++------------- 1 file changed, 37 insertions(+), 99 deletions(-) diff --git a/desktop/Desktop/Sources/FloatingControlBar/PushToTalkManager.swift b/desktop/Desktop/Sources/FloatingControlBar/PushToTalkManager.swift index aee2f956d7..4156578928 100644 --- a/desktop/Desktop/Sources/FloatingControlBar/PushToTalkManager.swift +++ b/desktop/Desktop/Sources/FloatingControlBar/PushToTalkManager.swift @@ -34,7 +34,7 @@ class PushToTalkManager: ObservableObject { private let doubleTapThreshold: TimeInterval = 0.4 // Transcription - private var transcriptionService: TranscriptionService? + private var transcriptionService: BackendTranscriptionService? private var audioCaptureService: AudioCaptureService? 
private var transcriptSegments: [String] = [] private var lastInterimText: String = "" @@ -302,58 +302,20 @@ class PushToTalkManager: ObservableObject { sound?.play() } - let isBatchMode = ShortcutSettings.shared.pttTranscriptionMode == .batch + // Flush remaining audio and wait for final transcript from backend + transcriptionService?.finishStream() + log("PushToTalkManager: finalizing — mic stopped, waiting for final transcript") - if isBatchMode { - // Batch mode: send accumulated audio to pre-recorded API - log("PushToTalkManager: finalizing (batch) — mic stopped, transcribing recorded audio") - batchAudioLock.lock() - let audioData = batchAudioBuffer - batchAudioBuffer = Data() - batchAudioLock.unlock() - - // Stop streaming service (was not used in batch mode, but clean up) - stopAudioTranscription() - - guard !audioData.isEmpty else { - log("PushToTalkManager: batch mode — no audio recorded") - sendTranscript() - return - } - - barState?.voiceTranscript = "Transcribing..." - - Task { - do { - let language = AssistantSettings.shared.effectiveTranscriptionLanguage - let transcript = try await TranscriptionService.batchTranscribe( - audioData: audioData, - language: language - ) - if let transcript, !transcript.isEmpty { - self.transcriptSegments = [transcript] - } - } catch { - logError("PushToTalkManager: batch transcription failed", error: error) - } + // Safety timeout: if backend doesn't send a final segment within 3s, send what we have + let timeout = DispatchWorkItem { [weak self] in + Task { @MainActor in + guard let self, self.state == .finalizing else { return } + log("PushToTalkManager: finalization timeout — sending transcript") self.sendTranscript() } - } else { - // Live mode: flush remaining audio and wait for final transcript from Deepgram - transcriptionService?.finishStream() - log("PushToTalkManager: finalizing (live) — mic stopped, waiting for final transcript") - - // Safety timeout: if Deepgram doesn't send a final segment within 3s, send what we have - let timeout = DispatchWorkItem { [weak self] in - Task { @MainActor in - guard let self, self.state == .finalizing else { return } - log("PushToTalkManager: live finalization timeout — sending transcript") - self.sendTranscript() - } - } - liveFinalizationTimeout = timeout - DispatchQueue.main.asyncAfter(deadline: .now() + 3.0, execute: timeout) } + liveFinalizationTimeout = timeout + DispatchQueue.main.asyncAfter(deadline: .now() + 3.0, execute: timeout) } private func sendTranscript() { @@ -421,50 +383,34 @@ class PushToTalkManager: ObservableObject { return } - let isBatchMode = ShortcutSettings.shared.pttTranscriptionMode == .batch + // Always use live streaming through the backend (no client-side batch mode) + startMicCapture() - if isBatchMode { - // Batch mode: just capture audio into buffer, no streaming connection - batchAudioLock.lock() - batchAudioBuffer = Data() - batchAudioLock.unlock() - startMicCapture(batchMode: true) - log("PushToTalkManager: started audio capture (batch mode)") - } else { - // Live mode: start mic capture and stream to Deepgram - startMicCapture() + let language = AssistantSettings.shared.effectiveTranscriptionLanguage + let service = BackendTranscriptionService(language: language) + transcriptionService = service - do { - let language = AssistantSettings.shared.effectiveTranscriptionLanguage - let service = try TranscriptionService(language: language, channels: 1) - transcriptionService = service - - service.start( - onTranscript: { [weak self] segment in - Task { @MainActor in - 
self?.handleTranscript(segment) - } - }, - onError: { [weak self] error in - Task { @MainActor in - logError("PushToTalkManager: transcription error", error: error) - self?.stopListening() - } - }, - onConnected: { - Task { @MainActor in - log("PushToTalkManager: DeepGram connected") - } - } - ) - } catch { - logError("PushToTalkManager: failed to create TranscriptionService", error: error) - stopListening() + service.start( + onTranscript: { [weak self] segment in + Task { @MainActor in + self?.handleTranscript(segment) + } + }, + onError: { [weak self] error in + Task { @MainActor in + logError("PushToTalkManager: transcription error", error: error) + self?.stopListening() + } + }, + onConnected: { + Task { @MainActor in + log("PushToTalkManager: backend connected") + } } - } + ) } - private func startMicCapture(batchMode: Bool = false) { + private func startMicCapture() { if audioCaptureService == nil { audioCaptureService = AudioCaptureService() } @@ -475,20 +421,12 @@ class PushToTalkManager: ObservableObject { do { try await capture.startCapture( onAudioChunk: { [weak self] audioData in - guard let self else { return } - if batchMode { - // Batch mode: accumulate audio in buffer - self.batchAudioLock.lock() - self.batchAudioBuffer.append(audioData) - self.batchAudioLock.unlock() - } else { - // Live mode: stream to Deepgram - self.transcriptionService?.sendAudio(audioData) - } + // Stream mono audio to backend + self?.transcriptionService?.sendAudio(audioData) }, onAudioLevel: { _ in } ) - log("PushToTalkManager: mic capture started (batch=\(batchMode))") + log("PushToTalkManager: mic capture started") } catch { logError("PushToTalkManager: mic capture failed", error: error) self.stopListening() From 3824c4a2bc7cee958dfb918659ed78d906bbd9be Mon Sep 17 00:00:00 2001 From: beastoin Date: Fri, 6 Mar 2026 08:36:00 +0100 Subject: [PATCH 6/8] Remove old transcriptionService parameter from AudioSourceManager Co-Authored-By: Claude Opus 4.6 --- desktop/Desktop/Sources/Audio/AudioSourceManager.swift | 1 - 1 file changed, 1 deletion(-) diff --git a/desktop/Desktop/Sources/Audio/AudioSourceManager.swift b/desktop/Desktop/Sources/Audio/AudioSourceManager.swift index 9444be7d99..3247adc62c 100644 --- a/desktop/Desktop/Sources/Audio/AudioSourceManager.swift +++ b/desktop/Desktop/Sources/Audio/AudioSourceManager.swift @@ -301,7 +301,6 @@ final class AudioSourceManager: ObservableObject { // Start BLE audio processing with direct audio callback and WAL recording await bleAudioService.startProcessing( from: connection, - transcriptionService: nil, // We'll handle routing ourselves audioDataHandler: { [weak self] pcmData in // Convert decoded PCM mono to stereo and forward self?.handleBleAudio(pcmData) From 2152e699876b3281769236eb50e68d906b6c13ce Mon Sep 17 00:00:00 2001 From: beastoin Date: Fri, 6 Mar 2026 08:36:03 +0100 Subject: [PATCH 7/8] Remove DEEPGRAM_API_KEY from desktop .env.example MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit No longer needed — STT now routes through backend /v4/listen. 
Co-Authored-By: Claude Opus 4.6 --- desktop/.env.example | 3 --- 1 file changed, 3 deletions(-) diff --git a/desktop/.env.example b/desktop/.env.example index 87d0a94fb7..6c25ff1689 100644 --- a/desktop/.env.example +++ b/desktop/.env.example @@ -17,9 +17,6 @@ # Production: https://api.omi.me OMI_API_URL=http://localhost:8080 -# DeepGram API key — required for real-time transcription -DEEPGRAM_API_KEY= - # ─── AI (optional) ────────────────────────────────────────────────── # Gemini API key for proactive assistants and embeddings # Falls back to backend-side processing if not set From e2a88573499ae071b37ed3a095875de6c11fd68c Mon Sep 17 00:00:00 2001 From: beastoin Date: Fri, 6 Mar 2026 08:36:06 +0100 Subject: [PATCH 8/8] Add changelog entry for backend STT migration Co-Authored-By: Claude Opus 4.6 --- desktop/CHANGELOG.json | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/desktop/CHANGELOG.json b/desktop/CHANGELOG.json index 614cda711d..e9ad191f41 100644 --- a/desktop/CHANGELOG.json +++ b/desktop/CHANGELOG.json @@ -1,5 +1,7 @@ { - "unreleased": [], + "unreleased": [ + "Removed client-side Deepgram API key — transcription now routes securely through the Omi backend" + ], "releases": [ { "version": "0.11.90",