From 5dadba0b9f9faf3b74c5d5d9db6682028f80766d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Tue, 29 Apr 2025 13:30:37 +0200 Subject: [PATCH 01/43] Add subscriber timeout --- Sources/LiveKit/Core/PreConnectAudioBuffer.swift | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift index 475d23d24..a5e0a6311 100644 --- a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift +++ b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift @@ -37,6 +37,11 @@ public final class PreConnectAudioBuffer: NSObject, Loggable { @objc public let recorder: LocalAudioTrackRecorder + /// The timeout for the remote participant to subscribe to the audio track. + /// If the remote participant does not subscribe to the audio track within this time, the audio buffer will be flushed. + @objc + public let timeout: TimeInterval + private let state = StateSync(State()) private struct State { var audioStream: LocalAudioTrackRecorder.Stream? @@ -53,10 +58,12 @@ public final class PreConnectAudioBuffer: NSObject, Loggable { format: .pcmFormatInt16, // supported by agent plugins sampleRate: 24000, // supported by agent plugins maxSize: 10 * 1024 * 1024 // arbitrary max recording size of 10MB - )) + ), + timeout: TimeInterval = 5) { self.room = room self.recorder = recorder + self.timeout = timeout super.init() } @@ -98,6 +105,9 @@ extension PreConnectAudioBuffer: RoomDelegate { public func roomDidConnect(_ room: Room) { Task { try? await setParticipantAttribute(room: room) + + try? await Task.sleep(nanoseconds: UInt64(timeout) * NSEC_PER_SEC) + stopRecording(flush: true) } } From e7b0fea7a5dd4d09891f863ed03a05ad5e89b4ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Wed, 30 Apr 2025 13:07:53 +0200 Subject: [PATCH 02/43] Send only to agents --- Sources/LiveKit/Core/PreConnectAudioBuffer.swift | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift index a5e0a6311..c2ee24445 100644 --- a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift +++ b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift @@ -140,12 +140,14 @@ extension PreConnectAudioBuffer: RoomDelegate { throw LiveKitError(.invalidState, message: "Audio stream is nil") } + let agentIdentities = room.remoteParticipants.filter { _, value in value.kind == .agent }.map(\.key) let streamOptions = StreamByteOptions( topic: topic, attributes: [ "sampleRate": "\(recorder.sampleRate)", "channels": "\(recorder.channels)", - ] + ], + destinationIdentities: agentIdentities ) let writer = try await room.localParticipant.streamBytes(options: streamOptions) try await writer.write(audioStream.collect()) From 5230985f47c1d2e87cb452006d20fccf7ca87d75 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Wed, 30 Apr 2025 13:08:52 +0200 Subject: [PATCH 03/43] Idempotence? --- Sources/LiveKit/Track/Recorders/LocalAudioTrackRecorder.swift | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Sources/LiveKit/Track/Recorders/LocalAudioTrackRecorder.swift b/Sources/LiveKit/Track/Recorders/LocalAudioTrackRecorder.swift index cd5ac1d67..28873d3c5 100644 --- a/Sources/LiveKit/Track/Recorders/LocalAudioTrackRecorder.swift +++ b/Sources/LiveKit/Track/Recorders/LocalAudioTrackRecorder.swift @@ -67,8 +67,8 @@ public final class LocalAudioTrackRecorder: NSObject, AudioRenderer { /// - Returns: A stream of audio data. /// - Throws: An error if the audio track cannot be started. public func start() async throws -> Stream { - guard _state.continuation == nil else { - throw LiveKitError(.invalidState, message: "Cannot start the recorder multiple times.") + if let continuation = _state.continuation { + continuation.finish() } try await track.startCapture() From 1198a886a20b5ae7b8956bbcd7c95b6b7e96d1a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Wed, 30 Apr 2025 13:25:52 +0200 Subject: [PATCH 04/43] Don't set participant attr --- .../LiveKit/Core/PreConnectAudioBuffer.swift | 26 +++++-------------- 1 file changed, 6 insertions(+), 20 deletions(-) diff --git a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift index c2ee24445..926d1310a 100644 --- a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift +++ b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift @@ -21,10 +21,6 @@ import Foundation /// and sends it on certain ``RoomDelegate`` events. @objc public final class PreConnectAudioBuffer: NSObject, Loggable { - /// The default participant attribute key used to indicate that the audio buffer is active. - @objc - public static let attributeKey = "lk.agent.pre-connect-audio" - /// The default data topic used to send the audio buffer. @objc public static let dataTopic = "lk.agent.pre-connect-audio-buffer" @@ -102,10 +98,8 @@ public final class PreConnectAudioBuffer: NSObject, Loggable { // MARK: - RoomDelegate extension PreConnectAudioBuffer: RoomDelegate { - public func roomDidConnect(_ room: Room) { + public func roomDidConnect(_: Room) { Task { - try? await setParticipantAttribute(room: room) - try? await Task.sleep(nanoseconds: UInt64(timeout) * NSEC_PER_SEC) stopRecording(flush: true) } @@ -114,22 +108,14 @@ extension PreConnectAudioBuffer: RoomDelegate { public func room(_ room: Room, participant _: LocalParticipant, remoteDidSubscribeTrack _: LocalTrackPublication) { stopRecording() Task { - try? await sendAudioData(to: room) + do { + try await sendAudioData(to: room) + } catch { + log("Unable to send audio: \(error)", .error) + } } } - /// Set the participant attribute to indicate that the audio buffer is active. - /// - Parameters: - /// - key: The key to set the attribute. - /// - room: The room instance to set the attribute. - @objc - public func setParticipantAttribute(key _: String = attributeKey, room: Room) async throws { - var attributes = room.localParticipant.attributes - attributes[Self.attributeKey] = "true" - try await room.localParticipant.set(attributes: attributes) - log("Set participant attribute", .info) - } - /// Send the audio data to the room. /// - Parameters: /// - room: The room instance to send the audio data. From 5801fa0b2c9f3afa4c241b7c3bad415189eea17f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Wed, 30 Apr 2025 15:33:23 +0200 Subject: [PATCH 05/43] WIP: Pass recorder's track --- .../LiveKit/Core/PreConnectAudioBuffer.swift | 19 +++++++------- Sources/LiveKit/Core/Room.swift | 23 +++++++++++------ .../Recorders/LocalAudioTrackRecorder.swift | 4 +++ .../Types/Options/AudioCaptureOptions.swift | 25 +++++++++++++++++-- 4 files changed, 52 insertions(+), 19 deletions(-) diff --git a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift index 926d1310a..879d3223c 100644 --- a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift +++ b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift @@ -48,17 +48,16 @@ public final class PreConnectAudioBuffer: NSObject, Loggable { /// - room: The room instance to listen for events. /// - recorder: The audio recorder to use for capturing. @objc - public init(room: Room?, - recorder: LocalAudioTrackRecorder = LocalAudioTrackRecorder( - track: LocalAudioTrack.createTrack(), - format: .pcmFormatInt16, // supported by agent plugins - sampleRate: 24000, // supported by agent plugins - maxSize: 10 * 1024 * 1024 // arbitrary max recording size of 10MB - ), - timeout: TimeInterval = 5) - { + public init(room: Room?, timeout: TimeInterval = 5) { self.room = room - self.recorder = recorder + let roomOptions = room?._state.roomOptions + recorder = LocalAudioTrackRecorder( + track: LocalAudioTrack.createTrack(options: roomOptions?.defaultAudioCaptureOptions.withPreConnect(), + reportStatistics: roomOptions?.reportRemoteTrackStatistics ?? false), + format: .pcmFormatInt16, // supported by agent plugins + sampleRate: 24000, // supported by agent plugins + maxSize: 10 * 1024 * 1024 // arbitrary max recording size of 10MB + ) self.timeout = timeout super.init() } diff --git a/Sources/LiveKit/Core/Room.swift b/Sources/LiveKit/Core/Room.swift index 8d7b994ce..e1dc3e592 100644 --- a/Sources/LiveKit/Core/Room.swift +++ b/Sources/LiveKit/Core/Room.swift @@ -354,13 +354,22 @@ public class Room: NSObject, @unchecked Sendable, ObservableObject, Loggable { let enableMicrophone = _state.connectOptions.enableMicrophone log("Concurrent enable microphone mode: \(enableMicrophone)") - let createMicrophoneTrackTask: Task? = enableMicrophone ? Task { - let localTrack = LocalAudioTrack.createTrack(options: _state.roomOptions.defaultAudioCaptureOptions, - reportStatistics: _state.roomOptions.reportRemoteTrackStatistics) - // Initializes AudioDeviceModule's recording - try await localTrack.start() - return localTrack - } : nil + let createMicrophoneTrackTask: Task? = + if preConnectBuffer.recorder.isRecording { + Task { + preConnectBuffer.recorder.track + } + } else if enableMicrophone { + Task { + let localTrack = LocalAudioTrack.createTrack(options: _state.roomOptions.defaultAudioCaptureOptions, + reportStatistics: _state.roomOptions.reportRemoteTrackStatistics) + // Initializes AudioDeviceModule's recording + try await localTrack.start() + return localTrack + } + } else { + nil + } do { try await fullConnectSequence(url, token) diff --git a/Sources/LiveKit/Track/Recorders/LocalAudioTrackRecorder.swift b/Sources/LiveKit/Track/Recorders/LocalAudioTrackRecorder.swift index 28873d3c5..141b6bf40 100644 --- a/Sources/LiveKit/Track/Recorders/LocalAudioTrackRecorder.swift +++ b/Sources/LiveKit/Track/Recorders/LocalAudioTrackRecorder.swift @@ -43,6 +43,10 @@ public final class LocalAudioTrackRecorder: NSObject, AudioRenderer { @objc public let maxSize: Int + var isRecording: Bool { + _state.continuation != nil + } + private let _state = StateSync(State()) private struct State { var continuation: Stream.Continuation? diff --git a/Sources/LiveKit/Types/Options/AudioCaptureOptions.swift b/Sources/LiveKit/Types/Options/AudioCaptureOptions.swift index d00b6bcf4..efac3602f 100644 --- a/Sources/LiveKit/Types/Options/AudioCaptureOptions.swift +++ b/Sources/LiveKit/Types/Options/AudioCaptureOptions.swift @@ -67,18 +67,23 @@ public final class AudioCaptureOptions: NSObject, CaptureOptions, Sendable { @objc public let typingNoiseDetection: Bool + @objc + public let preConnect: Bool + public init( echoCancellation: Bool = AudioCaptureOptions.defaultEchoCancellation, autoGainControl: Bool = AudioCaptureOptions.defaultAutoGainControl, noiseSuppression: Bool = AudioCaptureOptions.defaultNoiseSuppression, highpassFilter: Bool = false, - typingNoiseDetection: Bool = false + typingNoiseDetection: Bool = false, + preConnect: Bool = false ) { self.echoCancellation = echoCancellation self.noiseSuppression = noiseSuppression self.autoGainControl = autoGainControl self.typingNoiseDetection = typingNoiseDetection self.highpassFilter = highpassFilter + self.preConnect = preConnect } // MARK: - Equatable @@ -89,7 +94,8 @@ public final class AudioCaptureOptions: NSObject, CaptureOptions, Sendable { noiseSuppression == other.noiseSuppression && autoGainControl == other.autoGainControl && typingNoiseDetection == other.typingNoiseDetection && - highpassFilter == other.highpassFilter + highpassFilter == other.highpassFilter && + preConnect == other.preConnect } override public var hash: Int { @@ -99,6 +105,7 @@ public final class AudioCaptureOptions: NSObject, CaptureOptions, Sendable { hasher.combine(autoGainControl) hasher.combine(typingNoiseDetection) hasher.combine(highpassFilter) + hasher.combine(preConnect) return hasher.finalize() } } @@ -110,6 +117,20 @@ extension AudioCaptureOptions { echoCancellation ? .tfEchoCancellation : nil, noiseSuppression ? .tfNoiseSuppression : nil, autoGainControl ? .tfAutoGainControl : nil, + // TODO: Handle ].compactMap { $0 }) } } + +extension AudioCaptureOptions { + func withPreConnect() -> AudioCaptureOptions { + AudioCaptureOptions( + echoCancellation: echoCancellation, + autoGainControl: autoGainControl, + noiseSuppression: noiseSuppression, + highpassFilter: highpassFilter, + typingNoiseDetection: typingNoiseDetection, + preConnect: true + ) + } +} From d4ab70505b372b8a9c6dc09ec323f8c9fb415a15 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Mon, 5 May 2025 08:06:53 +0200 Subject: [PATCH 06/43] WIP: Pass trackId --- Sources/LiveKit/Core/PreConnectAudioBuffer.swift | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift index 879d3223c..c8a101c35 100644 --- a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift +++ b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift @@ -104,11 +104,11 @@ extension PreConnectAudioBuffer: RoomDelegate { } } - public func room(_ room: Room, participant _: LocalParticipant, remoteDidSubscribeTrack _: LocalTrackPublication) { + public func room(_ room: Room, participant _: LocalParticipant, remoteDidSubscribeTrack publication: LocalTrackPublication) { stopRecording() Task { do { - try await sendAudioData(to: room) + try await sendAudioData(to: room, track: publication.sid) } catch { log("Unable to send audio: \(error)", .error) } @@ -120,7 +120,7 @@ extension PreConnectAudioBuffer: RoomDelegate { /// - room: The room instance to send the audio data. /// - topic: The topic to send the audio data. @objc - public func sendAudioData(to room: Room, on topic: String = dataTopic) async throws { + public func sendAudioData(to room: Room, track: Track.Sid, on topic: String = dataTopic) async throws { guard let audioStream = state.audioStream else { throw LiveKitError(.invalidState, message: "Audio stream is nil") } @@ -131,6 +131,7 @@ extension PreConnectAudioBuffer: RoomDelegate { attributes: [ "sampleRate": "\(recorder.sampleRate)", "channels": "\(recorder.channels)", + "trackId": track.stringValue, ], destinationIdentities: agentIdentities ) From 7377a4bdb03a2a8fa585413fff832ca0c862157e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Mon, 5 May 2025 10:56:00 +0200 Subject: [PATCH 07/43] WIP: Move timeout, handle empty data --- .../LiveKit/Core/PreConnectAudioBuffer.swift | 32 +++++++++++-------- Sources/LiveKit/Core/Room+PreConnect.swift | 6 ++-- 2 files changed, 23 insertions(+), 15 deletions(-) diff --git a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift index c8a101c35..0b4af151a 100644 --- a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift +++ b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift @@ -33,22 +33,17 @@ public final class PreConnectAudioBuffer: NSObject, Loggable { @objc public let recorder: LocalAudioTrackRecorder - /// The timeout for the remote participant to subscribe to the audio track. - /// If the remote participant does not subscribe to the audio track within this time, the audio buffer will be flushed. - @objc - public let timeout: TimeInterval - private let state = StateSync(State()) private struct State { var audioStream: LocalAudioTrackRecorder.Stream? + var timeout: TimeInterval = 10 } /// Initialize the audio buffer with a room instance. /// - Parameters: /// - room: The room instance to listen for events. - /// - recorder: The audio recorder to use for capturing. @objc - public init(room: Room?, timeout: TimeInterval = 5) { + public init(room: Room?) { self.room = room let roomOptions = room?._state.roomOptions recorder = LocalAudioTrackRecorder( @@ -58,7 +53,6 @@ public final class PreConnectAudioBuffer: NSObject, Loggable { sampleRate: 24000, // supported by agent plugins maxSize: 10 * 1024 * 1024 // arbitrary max recording size of 10MB ) - self.timeout = timeout super.init() } @@ -68,9 +62,12 @@ public final class PreConnectAudioBuffer: NSObject, Loggable { } /// Start capturing audio and listening to ``RoomDelegate`` events. + /// - Parameters: + /// - timeout: The timeout for the remote participant to subscribe to the audio track. @objc - public func startRecording() async throws { + public func startRecording(timeout: TimeInterval = 10) async throws { room?.add(delegate: self) + state.mutate { $0.timeout = timeout } let stream = try await recorder.start() log("Started capturing audio", .info) @@ -84,6 +81,8 @@ public final class PreConnectAudioBuffer: NSObject, Loggable { /// - flush: If `true`, the audio stream will be flushed immediately without sending. @objc public func stopRecording(flush: Bool = false) { + guard recorder.isRecording else { return } + recorder.stop() log("Stopped capturing audio", .info) if flush, let stream = state.audioStream { @@ -99,7 +98,7 @@ public final class PreConnectAudioBuffer: NSObject, Loggable { extension PreConnectAudioBuffer: RoomDelegate { public func roomDidConnect(_: Room) { Task { - try? await Task.sleep(nanoseconds: UInt64(timeout) * NSEC_PER_SEC) + try? await Task.sleep(nanoseconds: UInt64(state.timeout) * NSEC_PER_SEC) stopRecording(flush: true) } } @@ -121,10 +120,19 @@ extension PreConnectAudioBuffer: RoomDelegate { /// - topic: The topic to send the audio data. @objc public func sendAudioData(to room: Room, track: Track.Sid, on topic: String = dataTopic) async throws { + defer { + room.remove(delegate: self) + } + guard let audioStream = state.audioStream else { throw LiveKitError(.invalidState, message: "Audio stream is nil") } + let audioData = try await audioStream.collect() + guard !audioData.isEmpty else { + throw LiveKitError(.invalidState, message: "No audio data to send") + } + let agentIdentities = room.remoteParticipants.filter { _, value in value.kind == .agent }.map(\.key) let streamOptions = StreamByteOptions( topic: topic, @@ -136,10 +144,8 @@ extension PreConnectAudioBuffer: RoomDelegate { destinationIdentities: agentIdentities ) let writer = try await room.localParticipant.streamBytes(options: streamOptions) - try await writer.write(audioStream.collect()) + try await writer.write(audioData) try await writer.close() log("Sent audio data", .info) - - room.remove(delegate: self) } } diff --git a/Sources/LiveKit/Core/Room+PreConnect.swift b/Sources/LiveKit/Core/Room+PreConnect.swift index 83344015c..ab14f707c 100644 --- a/Sources/LiveKit/Core/Room+PreConnect.swift +++ b/Sources/LiveKit/Core/Room+PreConnect.swift @@ -21,9 +21,11 @@ public extension Room { /// so that it's not lost when the connection is established. /// It will be automatically sent via data stream to the other participant /// using the `PreConnectAudioBuffer.dataTopic` when the local track is subscribed. + /// - Parameters: + /// - timeout: The timeout for the remote participant to subscribe to the audio track. /// - See: ``PreConnectAudioBuffer`` /// - Note: Use ``AudioManager/setRecordingAlwaysPreparedMode(_:)`` to request microphone permissions early. - func startCapturingBeforeConnecting() async throws { - try await preConnectBuffer.startRecording() + func startCapturingBeforeConnecting(timeout: TimeInterval = 10) async throws { + try await preConnectBuffer.startRecording(timeout: timeout) } } From 9c67eeb5aa1fbab0a4493f594b1a343f53d97733 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Mon, 5 May 2025 10:59:28 +0200 Subject: [PATCH 08/43] Proto v1.37.1 --- .../LiveKit/Protos/livekit_models.pb.swift | 7 + Sources/LiveKit/Protos/livekit_rtc.pb.swift | 342 ++++++++++++------ 2 files changed, 247 insertions(+), 102 deletions(-) diff --git a/Sources/LiveKit/Protos/livekit_models.pb.swift b/Sources/LiveKit/Protos/livekit_models.pb.swift index 9fa18fa56..02103a27f 100644 --- a/Sources/LiveKit/Protos/livekit_models.pb.swift +++ b/Sources/LiveKit/Protos/livekit_models.pb.swift @@ -611,6 +611,9 @@ enum Livekit_AudioTrackFeature: SwiftProtobuf.Enum, Swift.CaseIterable { case tfEchoCancellation // = 3 case tfNoiseSuppression // = 4 case tfEnhancedNoiseCancellation // = 5 + + /// client will buffer audio once available and send it to the server via bytes stream once connected + case tfPreconnectBuffer // = 6 case UNRECOGNIZED(Int) init() { @@ -625,6 +628,7 @@ enum Livekit_AudioTrackFeature: SwiftProtobuf.Enum, Swift.CaseIterable { case 3: self = .tfEchoCancellation case 4: self = .tfNoiseSuppression case 5: self = .tfEnhancedNoiseCancellation + case 6: self = .tfPreconnectBuffer default: self = .UNRECOGNIZED(rawValue) } } @@ -637,6 +641,7 @@ enum Livekit_AudioTrackFeature: SwiftProtobuf.Enum, Swift.CaseIterable { case .tfEchoCancellation: return 3 case .tfNoiseSuppression: return 4 case .tfEnhancedNoiseCancellation: return 5 + case .tfPreconnectBuffer: return 6 case .UNRECOGNIZED(let i): return i } } @@ -649,6 +654,7 @@ enum Livekit_AudioTrackFeature: SwiftProtobuf.Enum, Swift.CaseIterable { .tfEchoCancellation, .tfNoiseSuppression, .tfEnhancedNoiseCancellation, + .tfPreconnectBuffer, ] } @@ -2798,6 +2804,7 @@ extension Livekit_AudioTrackFeature: SwiftProtobuf._ProtoNameProviding { 3: .same(proto: "TF_ECHO_CANCELLATION"), 4: .same(proto: "TF_NOISE_SUPPRESSION"), 5: .same(proto: "TF_ENHANCED_NOISE_CANCELLATION"), + 6: .same(proto: "TF_PRECONNECT_BUFFER"), ] } diff --git a/Sources/LiveKit/Protos/livekit_rtc.pb.swift b/Sources/LiveKit/Protos/livekit_rtc.pb.swift index c98e47ab0..741d74a2c 100644 --- a/Sources/LiveKit/Protos/livekit_rtc.pb.swift +++ b/Sources/LiveKit/Protos/livekit_rtc.pb.swift @@ -618,54 +618,114 @@ struct Livekit_SimulcastCodec: Sendable { init() {} } -struct Livekit_AddTrackRequest: Sendable { +struct Livekit_AddTrackRequest: @unchecked Sendable { // SwiftProtobuf.Message conformance is added in an extension below. See the // `Message` and `Message+*Additions` files in the SwiftProtobuf library for // methods supported on all messages. /// client ID of track, to match it when RTC track is received - var cid: String = String() + var cid: String { + get {return _storage._cid} + set {_uniqueStorage()._cid = newValue} + } - var name: String = String() + var name: String { + get {return _storage._name} + set {_uniqueStorage()._name = newValue} + } - var type: Livekit_TrackType = .audio + var type: Livekit_TrackType { + get {return _storage._type} + set {_uniqueStorage()._type = newValue} + } /// to be deprecated in favor of layers - var width: UInt32 = 0 + var width: UInt32 { + get {return _storage._width} + set {_uniqueStorage()._width = newValue} + } - var height: UInt32 = 0 + var height: UInt32 { + get {return _storage._height} + set {_uniqueStorage()._height = newValue} + } /// true to add track and initialize to muted - var muted: Bool = false + var muted: Bool { + get {return _storage._muted} + set {_uniqueStorage()._muted = newValue} + } /// true if DTX (Discontinuous Transmission) is disabled for audio - var disableDtx: Bool = false + /// + /// NOTE: This field was marked as deprecated in the .proto file. + var disableDtx: Bool { + get {return _storage._disableDtx} + set {_uniqueStorage()._disableDtx = newValue} + } - var source: Livekit_TrackSource = .unknown + var source: Livekit_TrackSource { + get {return _storage._source} + set {_uniqueStorage()._source = newValue} + } - var layers: [Livekit_VideoLayer] = [] + var layers: [Livekit_VideoLayer] { + get {return _storage._layers} + set {_uniqueStorage()._layers = newValue} + } - var simulcastCodecs: [Livekit_SimulcastCodec] = [] + var simulcastCodecs: [Livekit_SimulcastCodec] { + get {return _storage._simulcastCodecs} + set {_uniqueStorage()._simulcastCodecs = newValue} + } /// server ID of track, publish new codec to exist track - var sid: String = String() + var sid: String { + get {return _storage._sid} + set {_uniqueStorage()._sid = newValue} + } - var stereo: Bool = false + /// deprecated in favor of audio_features + /// + /// NOTE: This field was marked as deprecated in the .proto file. + var stereo: Bool { + get {return _storage._stereo} + set {_uniqueStorage()._stereo = newValue} + } /// true if RED (Redundant Encoding) is disabled for audio - var disableRed: Bool = false + var disableRed: Bool { + get {return _storage._disableRed} + set {_uniqueStorage()._disableRed = newValue} + } - var encryption: Livekit_Encryption.TypeEnum = .none + var encryption: Livekit_Encryption.TypeEnum { + get {return _storage._encryption} + set {_uniqueStorage()._encryption = newValue} + } /// which stream the track belongs to, used to group tracks together. /// if not specified, server will infer it from track source to bundle camera/microphone, screenshare/audio together - var stream: String = String() + var stream: String { + get {return _storage._stream} + set {_uniqueStorage()._stream = newValue} + } + + var backupCodecPolicy: Livekit_BackupCodecPolicy { + get {return _storage._backupCodecPolicy} + set {_uniqueStorage()._backupCodecPolicy = newValue} + } - var backupCodecPolicy: Livekit_BackupCodecPolicy = .preferRegression + var audioFeatures: [Livekit_AudioTrackFeature] { + get {return _storage._audioFeatures} + set {_uniqueStorage()._audioFeatures = newValue} + } var unknownFields = SwiftProtobuf.UnknownStorage() init() {} + + fileprivate var _storage = _StorageClass.defaultInstance } struct Livekit_TrickleRequest: Sendable { @@ -2446,104 +2506,182 @@ extension Livekit_AddTrackRequest: SwiftProtobuf.Message, SwiftProtobuf._Message 14: .same(proto: "encryption"), 15: .same(proto: "stream"), 16: .standard(proto: "backup_codec_policy"), + 17: .standard(proto: "audio_features"), ] + fileprivate class _StorageClass { + var _cid: String = String() + var _name: String = String() + var _type: Livekit_TrackType = .audio + var _width: UInt32 = 0 + var _height: UInt32 = 0 + var _muted: Bool = false + var _disableDtx: Bool = false + var _source: Livekit_TrackSource = .unknown + var _layers: [Livekit_VideoLayer] = [] + var _simulcastCodecs: [Livekit_SimulcastCodec] = [] + var _sid: String = String() + var _stereo: Bool = false + var _disableRed: Bool = false + var _encryption: Livekit_Encryption.TypeEnum = .none + var _stream: String = String() + var _backupCodecPolicy: Livekit_BackupCodecPolicy = .preferRegression + var _audioFeatures: [Livekit_AudioTrackFeature] = [] + + #if swift(>=5.10) + // This property is used as the initial default value for new instances of the type. + // The type itself is protecting the reference to its storage via CoW semantics. + // This will force a copy to be made of this reference when the first mutation occurs; + // hence, it is safe to mark this as `nonisolated(unsafe)`. + static nonisolated(unsafe) let defaultInstance = _StorageClass() + #else + static let defaultInstance = _StorageClass() + #endif + + private init() {} + + init(copying source: _StorageClass) { + _cid = source._cid + _name = source._name + _type = source._type + _width = source._width + _height = source._height + _muted = source._muted + _disableDtx = source._disableDtx + _source = source._source + _layers = source._layers + _simulcastCodecs = source._simulcastCodecs + _sid = source._sid + _stereo = source._stereo + _disableRed = source._disableRed + _encryption = source._encryption + _stream = source._stream + _backupCodecPolicy = source._backupCodecPolicy + _audioFeatures = source._audioFeatures + } + } + + fileprivate mutating func _uniqueStorage() -> _StorageClass { + if !isKnownUniquelyReferenced(&_storage) { + _storage = _StorageClass(copying: _storage) + } + return _storage + } + mutating func decodeMessage(decoder: inout D) throws { - while let fieldNumber = try decoder.nextFieldNumber() { - // The use of inline closures is to circumvent an issue where the compiler - // allocates stack space for every case branch when no optimizations are - // enabled. https://github.com/apple/swift-protobuf/issues/1034 - switch fieldNumber { - case 1: try { try decoder.decodeSingularStringField(value: &self.cid) }() - case 2: try { try decoder.decodeSingularStringField(value: &self.name) }() - case 3: try { try decoder.decodeSingularEnumField(value: &self.type) }() - case 4: try { try decoder.decodeSingularUInt32Field(value: &self.width) }() - case 5: try { try decoder.decodeSingularUInt32Field(value: &self.height) }() - case 6: try { try decoder.decodeSingularBoolField(value: &self.muted) }() - case 7: try { try decoder.decodeSingularBoolField(value: &self.disableDtx) }() - case 8: try { try decoder.decodeSingularEnumField(value: &self.source) }() - case 9: try { try decoder.decodeRepeatedMessageField(value: &self.layers) }() - case 10: try { try decoder.decodeRepeatedMessageField(value: &self.simulcastCodecs) }() - case 11: try { try decoder.decodeSingularStringField(value: &self.sid) }() - case 12: try { try decoder.decodeSingularBoolField(value: &self.stereo) }() - case 13: try { try decoder.decodeSingularBoolField(value: &self.disableRed) }() - case 14: try { try decoder.decodeSingularEnumField(value: &self.encryption) }() - case 15: try { try decoder.decodeSingularStringField(value: &self.stream) }() - case 16: try { try decoder.decodeSingularEnumField(value: &self.backupCodecPolicy) }() - default: break + _ = _uniqueStorage() + try withExtendedLifetime(_storage) { (_storage: _StorageClass) in + while let fieldNumber = try decoder.nextFieldNumber() { + // The use of inline closures is to circumvent an issue where the compiler + // allocates stack space for every case branch when no optimizations are + // enabled. https://github.com/apple/swift-protobuf/issues/1034 + switch fieldNumber { + case 1: try { try decoder.decodeSingularStringField(value: &_storage._cid) }() + case 2: try { try decoder.decodeSingularStringField(value: &_storage._name) }() + case 3: try { try decoder.decodeSingularEnumField(value: &_storage._type) }() + case 4: try { try decoder.decodeSingularUInt32Field(value: &_storage._width) }() + case 5: try { try decoder.decodeSingularUInt32Field(value: &_storage._height) }() + case 6: try { try decoder.decodeSingularBoolField(value: &_storage._muted) }() + case 7: try { try decoder.decodeSingularBoolField(value: &_storage._disableDtx) }() + case 8: try { try decoder.decodeSingularEnumField(value: &_storage._source) }() + case 9: try { try decoder.decodeRepeatedMessageField(value: &_storage._layers) }() + case 10: try { try decoder.decodeRepeatedMessageField(value: &_storage._simulcastCodecs) }() + case 11: try { try decoder.decodeSingularStringField(value: &_storage._sid) }() + case 12: try { try decoder.decodeSingularBoolField(value: &_storage._stereo) }() + case 13: try { try decoder.decodeSingularBoolField(value: &_storage._disableRed) }() + case 14: try { try decoder.decodeSingularEnumField(value: &_storage._encryption) }() + case 15: try { try decoder.decodeSingularStringField(value: &_storage._stream) }() + case 16: try { try decoder.decodeSingularEnumField(value: &_storage._backupCodecPolicy) }() + case 17: try { try decoder.decodeRepeatedEnumField(value: &_storage._audioFeatures) }() + default: break + } } } } func traverse(visitor: inout V) throws { - if !self.cid.isEmpty { - try visitor.visitSingularStringField(value: self.cid, fieldNumber: 1) - } - if !self.name.isEmpty { - try visitor.visitSingularStringField(value: self.name, fieldNumber: 2) - } - if self.type != .audio { - try visitor.visitSingularEnumField(value: self.type, fieldNumber: 3) - } - if self.width != 0 { - try visitor.visitSingularUInt32Field(value: self.width, fieldNumber: 4) - } - if self.height != 0 { - try visitor.visitSingularUInt32Field(value: self.height, fieldNumber: 5) - } - if self.muted != false { - try visitor.visitSingularBoolField(value: self.muted, fieldNumber: 6) - } - if self.disableDtx != false { - try visitor.visitSingularBoolField(value: self.disableDtx, fieldNumber: 7) - } - if self.source != .unknown { - try visitor.visitSingularEnumField(value: self.source, fieldNumber: 8) - } - if !self.layers.isEmpty { - try visitor.visitRepeatedMessageField(value: self.layers, fieldNumber: 9) - } - if !self.simulcastCodecs.isEmpty { - try visitor.visitRepeatedMessageField(value: self.simulcastCodecs, fieldNumber: 10) - } - if !self.sid.isEmpty { - try visitor.visitSingularStringField(value: self.sid, fieldNumber: 11) - } - if self.stereo != false { - try visitor.visitSingularBoolField(value: self.stereo, fieldNumber: 12) - } - if self.disableRed != false { - try visitor.visitSingularBoolField(value: self.disableRed, fieldNumber: 13) - } - if self.encryption != .none { - try visitor.visitSingularEnumField(value: self.encryption, fieldNumber: 14) - } - if !self.stream.isEmpty { - try visitor.visitSingularStringField(value: self.stream, fieldNumber: 15) - } - if self.backupCodecPolicy != .preferRegression { - try visitor.visitSingularEnumField(value: self.backupCodecPolicy, fieldNumber: 16) + try withExtendedLifetime(_storage) { (_storage: _StorageClass) in + if !_storage._cid.isEmpty { + try visitor.visitSingularStringField(value: _storage._cid, fieldNumber: 1) + } + if !_storage._name.isEmpty { + try visitor.visitSingularStringField(value: _storage._name, fieldNumber: 2) + } + if _storage._type != .audio { + try visitor.visitSingularEnumField(value: _storage._type, fieldNumber: 3) + } + if _storage._width != 0 { + try visitor.visitSingularUInt32Field(value: _storage._width, fieldNumber: 4) + } + if _storage._height != 0 { + try visitor.visitSingularUInt32Field(value: _storage._height, fieldNumber: 5) + } + if _storage._muted != false { + try visitor.visitSingularBoolField(value: _storage._muted, fieldNumber: 6) + } + if _storage._disableDtx != false { + try visitor.visitSingularBoolField(value: _storage._disableDtx, fieldNumber: 7) + } + if _storage._source != .unknown { + try visitor.visitSingularEnumField(value: _storage._source, fieldNumber: 8) + } + if !_storage._layers.isEmpty { + try visitor.visitRepeatedMessageField(value: _storage._layers, fieldNumber: 9) + } + if !_storage._simulcastCodecs.isEmpty { + try visitor.visitRepeatedMessageField(value: _storage._simulcastCodecs, fieldNumber: 10) + } + if !_storage._sid.isEmpty { + try visitor.visitSingularStringField(value: _storage._sid, fieldNumber: 11) + } + if _storage._stereo != false { + try visitor.visitSingularBoolField(value: _storage._stereo, fieldNumber: 12) + } + if _storage._disableRed != false { + try visitor.visitSingularBoolField(value: _storage._disableRed, fieldNumber: 13) + } + if _storage._encryption != .none { + try visitor.visitSingularEnumField(value: _storage._encryption, fieldNumber: 14) + } + if !_storage._stream.isEmpty { + try visitor.visitSingularStringField(value: _storage._stream, fieldNumber: 15) + } + if _storage._backupCodecPolicy != .preferRegression { + try visitor.visitSingularEnumField(value: _storage._backupCodecPolicy, fieldNumber: 16) + } + if !_storage._audioFeatures.isEmpty { + try visitor.visitPackedEnumField(value: _storage._audioFeatures, fieldNumber: 17) + } } try unknownFields.traverse(visitor: &visitor) } static func ==(lhs: Livekit_AddTrackRequest, rhs: Livekit_AddTrackRequest) -> Bool { - if lhs.cid != rhs.cid {return false} - if lhs.name != rhs.name {return false} - if lhs.type != rhs.type {return false} - if lhs.width != rhs.width {return false} - if lhs.height != rhs.height {return false} - if lhs.muted != rhs.muted {return false} - if lhs.disableDtx != rhs.disableDtx {return false} - if lhs.source != rhs.source {return false} - if lhs.layers != rhs.layers {return false} - if lhs.simulcastCodecs != rhs.simulcastCodecs {return false} - if lhs.sid != rhs.sid {return false} - if lhs.stereo != rhs.stereo {return false} - if lhs.disableRed != rhs.disableRed {return false} - if lhs.encryption != rhs.encryption {return false} - if lhs.stream != rhs.stream {return false} - if lhs.backupCodecPolicy != rhs.backupCodecPolicy {return false} + if lhs._storage !== rhs._storage { + let storagesAreEqual: Bool = withExtendedLifetime((lhs._storage, rhs._storage)) { (_args: (_StorageClass, _StorageClass)) in + let _storage = _args.0 + let rhs_storage = _args.1 + if _storage._cid != rhs_storage._cid {return false} + if _storage._name != rhs_storage._name {return false} + if _storage._type != rhs_storage._type {return false} + if _storage._width != rhs_storage._width {return false} + if _storage._height != rhs_storage._height {return false} + if _storage._muted != rhs_storage._muted {return false} + if _storage._disableDtx != rhs_storage._disableDtx {return false} + if _storage._source != rhs_storage._source {return false} + if _storage._layers != rhs_storage._layers {return false} + if _storage._simulcastCodecs != rhs_storage._simulcastCodecs {return false} + if _storage._sid != rhs_storage._sid {return false} + if _storage._stereo != rhs_storage._stereo {return false} + if _storage._disableRed != rhs_storage._disableRed {return false} + if _storage._encryption != rhs_storage._encryption {return false} + if _storage._stream != rhs_storage._stream {return false} + if _storage._backupCodecPolicy != rhs_storage._backupCodecPolicy {return false} + if _storage._audioFeatures != rhs_storage._audioFeatures {return false} + return true + } + if !storagesAreEqual {return false} + } if lhs.unknownFields != rhs.unknownFields {return false} return true } From c0124c10a995a231f38b7c0106d773c875e5e518 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Mon, 5 May 2025 11:00:53 +0200 Subject: [PATCH 09/43] Track prop --- Sources/LiveKit/Types/Options/AudioCaptureOptions.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/LiveKit/Types/Options/AudioCaptureOptions.swift b/Sources/LiveKit/Types/Options/AudioCaptureOptions.swift index efac3602f..e24132f84 100644 --- a/Sources/LiveKit/Types/Options/AudioCaptureOptions.swift +++ b/Sources/LiveKit/Types/Options/AudioCaptureOptions.swift @@ -117,7 +117,7 @@ extension AudioCaptureOptions { echoCancellation ? .tfEchoCancellation : nil, noiseSuppression ? .tfNoiseSuppression : nil, autoGainControl ? .tfAutoGainControl : nil, - // TODO: Handle + preConnect ? .tfPreconnectBuffer : nil, ].compactMap { $0 }) } } From 2bc4f736f276e18ada4519982e8dbe15ca26bed5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Mon, 5 May 2025 12:28:31 +0200 Subject: [PATCH 10/43] Size --- Sources/LiveKit/Core/PreConnectAudioBuffer.swift | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift index 0b4af151a..d9df889e9 100644 --- a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift +++ b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift @@ -66,14 +66,15 @@ public final class PreConnectAudioBuffer: NSObject, Loggable { /// - timeout: The timeout for the remote participant to subscribe to the audio track. @objc public func startRecording(timeout: TimeInterval = 10) async throws { - room?.add(delegate: self) - state.mutate { $0.timeout = timeout } - let stream = try await recorder.start() log("Started capturing audio", .info) + state.mutate { state in state.audioStream = stream + state.timeout = timeout } + + room?.add(delegate: self) } /// Stop capturing audio. @@ -85,6 +86,7 @@ public final class PreConnectAudioBuffer: NSObject, Loggable { recorder.stop() log("Stopped capturing audio", .info) + if flush, let stream = state.audioStream { Task { for await _ in stream {} @@ -129,8 +131,8 @@ extension PreConnectAudioBuffer: RoomDelegate { } let audioData = try await audioStream.collect() - guard !audioData.isEmpty else { - throw LiveKitError(.invalidState, message: "No audio data to send") + guard audioData.count > 1024 else { + throw LiveKitError(.unknown, message: "Audio data size too small, nothing to send") } let agentIdentities = room.remoteParticipants.filter { _, value in value.kind == .agent }.map(\.key) From 39d1569c24a226a1f02f101a92cfbadbf574cdbf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Mon, 5 May 2025 12:35:28 +0200 Subject: [PATCH 11/43] Move methods and override mic state --- .../LiveKit/Participant/LocalParticipant.swift | 8 ++++++++ Sources/LiveKit/Participant/Participant.swift | 18 +++++++++--------- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/Sources/LiveKit/Participant/LocalParticipant.swift b/Sources/LiveKit/Participant/LocalParticipant.swift index 2be273978..68dba0f67 100644 --- a/Sources/LiveKit/Participant/LocalParticipant.swift +++ b/Sources/LiveKit/Participant/LocalParticipant.swift @@ -225,6 +225,14 @@ public class LocalParticipant: Participant, @unchecked Sendable { return didUpdate } + override public func isMicrophoneEnabled() -> Bool { + if let room = _room, room.preConnectBuffer.recorder.isRecording { + return true + } else { + return super.isMicrophoneEnabled() + } + } + // MARK: - Broadcast Activation #if os(iOS) diff --git a/Sources/LiveKit/Participant/Participant.swift b/Sources/LiveKit/Participant/Participant.swift index 1f5955480..c14f332e9 100644 --- a/Sources/LiveKit/Participant/Participant.swift +++ b/Sources/LiveKit/Participant/Participant.swift @@ -259,29 +259,29 @@ public class Participant: NSObject, @unchecked Sendable, ObservableObject, Logga return true } -} - -// MARK: - Simplified API -public extension Participant { - func isCameraEnabled() -> Bool { + public func isCameraEnabled() -> Bool { !(getTrackPublication(source: .camera)?.isMuted ?? true) } - func isMicrophoneEnabled() -> Bool { + public func isMicrophoneEnabled() -> Bool { !(getTrackPublication(source: .microphone)?.isMuted ?? true) } - func isScreenShareEnabled() -> Bool { + public func isScreenShareEnabled() -> Bool { !(getTrackPublication(source: .screenShareVideo)?.isMuted ?? true) } +} - internal func getTrackPublication(name: String) -> TrackPublication? { +// MARK: - Simplified API + +extension Participant { + func getTrackPublication(name: String) -> TrackPublication? { _state.trackPublications.values.first(where: { $0.name == name }) } /// find the first publication matching `source` or any compatible. - internal func getTrackPublication(source: Track.Source) -> TrackPublication? { + func getTrackPublication(source: Track.Source) -> TrackPublication? { // if source is unknown return nil guard source != .unknown else { return nil } // try to find a Publication with matching source From 21dcfcb8a0b9adc914a0e2a7c6930987b776045b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Mon, 5 May 2025 12:48:58 +0200 Subject: [PATCH 12/43] Old Swift, log --- Sources/LiveKit/Core/PreConnectAudioBuffer.swift | 4 ++-- Sources/LiveKit/Core/Room.swift | 9 +++++---- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift index d9df889e9..43bab01d6 100644 --- a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift +++ b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift @@ -85,7 +85,7 @@ public final class PreConnectAudioBuffer: NSObject, Loggable { guard recorder.isRecording else { return } recorder.stop() - log("Stopped capturing audio", .info) + log(flush ? "Flushing audio stream, no subscribers" : "Stopped capturing audio", .info) if flush, let stream = state.audioStream { Task { @@ -148,6 +148,6 @@ extension PreConnectAudioBuffer: RoomDelegate { let writer = try await room.localParticipant.streamBytes(options: streamOptions) try await writer.write(audioData) try await writer.close() - log("Sent audio data", .info) + log("Sent \(audioData.count / 1024) KB of audio data to \(agentIdentities.count) agent(s) ", .info) } } diff --git a/Sources/LiveKit/Core/Room.swift b/Sources/LiveKit/Core/Room.swift index e1dc3e592..9713e9b3e 100644 --- a/Sources/LiveKit/Core/Room.swift +++ b/Sources/LiveKit/Core/Room.swift @@ -354,13 +354,13 @@ public class Room: NSObject, @unchecked Sendable, ObservableObject, Loggable { let enableMicrophone = _state.connectOptions.enableMicrophone log("Concurrent enable microphone mode: \(enableMicrophone)") - let createMicrophoneTrackTask: Task? = + let createMicrophoneTrackTask: Task? = { if preConnectBuffer.recorder.isRecording { - Task { + return Task { preConnectBuffer.recorder.track } } else if enableMicrophone { - Task { + return Task { let localTrack = LocalAudioTrack.createTrack(options: _state.roomOptions.defaultAudioCaptureOptions, reportStatistics: _state.roomOptions.reportRemoteTrackStatistics) // Initializes AudioDeviceModule's recording @@ -368,8 +368,9 @@ public class Room: NSObject, @unchecked Sendable, ObservableObject, Loggable { return localTrack } } else { - nil + return nil } + }() do { try await fullConnectSequence(url, token) From 7d566b0b803fa9b36a44aa8b72f183627595ebbc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Mon, 5 May 2025 12:58:56 +0200 Subject: [PATCH 13/43] Move agent checks --- Sources/LiveKit/Core/PreConnectAudioBuffer.swift | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift index 43bab01d6..5683ffe08 100644 --- a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift +++ b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift @@ -122,9 +122,8 @@ extension PreConnectAudioBuffer: RoomDelegate { /// - topic: The topic to send the audio data. @objc public func sendAudioData(to room: Room, track: Track.Sid, on topic: String = dataTopic) async throws { - defer { - room.remove(delegate: self) - } + let agentIdentities = room.remoteParticipants.filter { _, value in value.kind == .agent }.map(\.key) + guard !agentIdentities.isEmpty else { return } guard let audioStream = state.audioStream else { throw LiveKitError(.invalidState, message: "Audio stream is nil") @@ -135,7 +134,10 @@ extension PreConnectAudioBuffer: RoomDelegate { throw LiveKitError(.unknown, message: "Audio data size too small, nothing to send") } - let agentIdentities = room.remoteParticipants.filter { _, value in value.kind == .agent }.map(\.key) + defer { + room.remove(delegate: self) + } + let streamOptions = StreamByteOptions( topic: topic, attributes: [ From 39002d3786b0e6e125cd89671c4293971e56c809 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Mon, 5 May 2025 13:01:05 +0200 Subject: [PATCH 14/43] Remove attr test --- .../PreConnectAudioBufferTests.swift | 34 ------------------- 1 file changed, 34 deletions(-) diff --git a/Tests/LiveKitTests/PreConnectAudioBufferTests.swift b/Tests/LiveKitTests/PreConnectAudioBufferTests.swift index fbdaa298a..866ac0145 100644 --- a/Tests/LiveKitTests/PreConnectAudioBufferTests.swift +++ b/Tests/LiveKitTests/PreConnectAudioBufferTests.swift @@ -18,40 +18,6 @@ import XCTest class PreConnectAudioBufferTests: LKTestCase { - func testRoomDidConnectSetsParticipantAttribute() async throws { - let attributeSetExpectation = expectation(description: "Participant attribute set") - - class AttributeDelegate: RoomDelegate, @unchecked Sendable { - let expectation: XCTestExpectation - var attributeValue: String? - - init(expectation: XCTestExpectation) { - self.expectation = expectation - } - - func room(_: Room, participant: Participant, didUpdateAttributes _: [String: String]) { - if let value = participant.attributes[PreConnectAudioBuffer.attributeKey] { - attributeValue = value - expectation.fulfill() - } - } - } - - let delegate = AttributeDelegate(expectation: attributeSetExpectation) - - try await withRooms([RoomTestingOptions(delegate: delegate)]) { rooms in - let room = rooms[0] - let buffer = PreConnectAudioBuffer(room: room) - - buffer.roomDidConnect(room) - - await self.fulfillment(of: [attributeSetExpectation], timeout: 5) - - XCTAssertEqual(delegate.attributeValue, "true") - XCTAssertEqual(room.localParticipant.attributes[PreConnectAudioBuffer.attributeKey], "true") - } - } - func testRemoteDidSubscribeTrackSendsAudioData() async throws { let receiveExpectation = expectation(description: "Receives audio data") From ec1888100b374a07e66929b73b2744b7fe0e8e8e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Mon, 5 May 2025 13:21:30 +0200 Subject: [PATCH 15/43] Move Room, Sendable --- .../LiveKit/Core/PreConnectAudioBuffer.swift | 9 ++++++--- .../Recorders/LocalAudioTrackRecorder.swift | 18 +++++++++--------- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift index 5683ffe08..6cedc22ce 100644 --- a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift +++ b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift @@ -20,14 +20,14 @@ import Foundation /// A buffer that captures audio before connecting to the server, /// and sends it on certain ``RoomDelegate`` events. @objc -public final class PreConnectAudioBuffer: NSObject, Loggable { +public final class PreConnectAudioBuffer: NSObject, Sendable, Loggable { /// The default data topic used to send the audio buffer. @objc public static let dataTopic = "lk.agent.pre-connect-audio-buffer" /// The room instance to listen for events. @objc - public let room: Room? + public var room: Room? { state.room } /// The audio recorder instance. @objc @@ -35,6 +35,7 @@ public final class PreConnectAudioBuffer: NSObject, Loggable { private let state = StateSync(State()) private struct State { + weak var room: Room? var audioStream: LocalAudioTrackRecorder.Stream? var timeout: TimeInterval = 10 } @@ -44,7 +45,8 @@ public final class PreConnectAudioBuffer: NSObject, Loggable { /// - room: The room instance to listen for events. @objc public init(room: Room?) { - self.room = room + state.mutate { $0.room = room } + let roomOptions = room?._state.roomOptions recorder = LocalAudioTrackRecorder( track: LocalAudioTrack.createTrack(options: roomOptions?.defaultAudioCaptureOptions.withPreConnect(), @@ -53,6 +55,7 @@ public final class PreConnectAudioBuffer: NSObject, Loggable { sampleRate: 24000, // supported by agent plugins maxSize: 10 * 1024 * 1024 // arbitrary max recording size of 10MB ) + super.init() } diff --git a/Sources/LiveKit/Track/Recorders/LocalAudioTrackRecorder.swift b/Sources/LiveKit/Track/Recorders/LocalAudioTrackRecorder.swift index 141b6bf40..18d95de12 100644 --- a/Sources/LiveKit/Track/Recorders/LocalAudioTrackRecorder.swift +++ b/Sources/LiveKit/Track/Recorders/LocalAudioTrackRecorder.swift @@ -20,7 +20,7 @@ import Foundation /// A class that captures audio from a local track and streams it as a data stream /// in a selected format that can be sent to other participants via ``ByteStreamWriter``. @objc -public final class LocalAudioTrackRecorder: NSObject, AudioRenderer { +public final class LocalAudioTrackRecorder: NSObject, Sendable, AudioRenderer { public typealias Stream = AsyncStream /// The local audio track to capture audio from. @@ -44,10 +44,10 @@ public final class LocalAudioTrackRecorder: NSObject, AudioRenderer { public let maxSize: Int var isRecording: Bool { - _state.continuation != nil + state.continuation != nil } - private let _state = StateSync(State()) + private let state = StateSync(State()) private struct State { var continuation: Stream.Continuation? } @@ -71,7 +71,7 @@ public final class LocalAudioTrackRecorder: NSObject, AudioRenderer { /// - Returns: A stream of audio data. /// - Throws: An error if the audio track cannot be started. public func start() async throws -> Stream { - if let continuation = _state.continuation { + if let continuation = state.continuation { continuation.finish() } @@ -80,14 +80,14 @@ public final class LocalAudioTrackRecorder: NSObject, AudioRenderer { let buffer: Stream.Continuation.BufferingPolicy = maxSize > 0 ? .bufferingNewest(maxSize) : .unbounded let stream = Stream(bufferingPolicy: buffer) { continuation in - self._state.mutate { + self.state.mutate { $0.continuation = continuation } } - _state.continuation?.onTermination = { @Sendable (_: Stream.Continuation.Termination) in + state.continuation?.onTermination = { @Sendable (_: Stream.Continuation.Termination) in self.track.remove(audioRenderer: self) - self._state.mutate { + self.state.mutate { $0.continuation = nil } } @@ -98,7 +98,7 @@ public final class LocalAudioTrackRecorder: NSObject, AudioRenderer { /// Stops capturing audio from the local track. @objc public func stop() { - _state.continuation?.finish() + state.continuation?.finish() } } @@ -111,7 +111,7 @@ public extension LocalAudioTrackRecorder { .convert(toCommonFormat: format)? .toData() { - _state.continuation?.yield(data) + state.continuation?.yield(data) } } } From 1dba3858d549f2164dbbdcf2ec760846329fcafd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Mon, 5 May 2025 13:21:37 +0200 Subject: [PATCH 16/43] Expose stop --- Sources/LiveKit/Core/Room+PreConnect.swift | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/Sources/LiveKit/Core/Room+PreConnect.swift b/Sources/LiveKit/Core/Room+PreConnect.swift index ab14f707c..708a07a4e 100644 --- a/Sources/LiveKit/Core/Room+PreConnect.swift +++ b/Sources/LiveKit/Core/Room+PreConnect.swift @@ -28,4 +28,12 @@ public extension Room { func startCapturingBeforeConnecting(timeout: TimeInterval = 10) async throws { try await preConnectBuffer.startRecording(timeout: timeout) } + + /// Explicitly stop capturing pre-connect audio. + /// - Parameters: + /// - flush: If `true`, the audio stream will be flushed immediately without sending. + /// - Note: Use ``Room/startCapturingBeforeConnecting(timeout:)`` with a timeout to automatically stop capturing. + func stopCapturingBeforeConnecting(flush: Bool = false) { + preConnectBuffer.stopRecording(flush: flush) + } } From f8f2bf39e9e180a56aa00cf244b8bf8e7329825d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Tue, 6 May 2025 08:18:30 +0200 Subject: [PATCH 17/43] Check track features --- Sources/LiveKit/Core/PreConnectAudioBuffer.swift | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift index 6cedc22ce..583fb86df 100644 --- a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift +++ b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift @@ -88,9 +88,10 @@ public final class PreConnectAudioBuffer: NSObject, Sendable, Loggable { guard recorder.isRecording else { return } recorder.stop() - log(flush ? "Flushing audio stream, no subscribers" : "Stopped capturing audio", .info) + log("Stopped capturing audio", .info) if flush, let stream = state.audioStream { + log("Flushing audio stream", .info) Task { for await _ in stream {} } @@ -109,6 +110,10 @@ extension PreConnectAudioBuffer: RoomDelegate { } public func room(_ room: Room, participant _: LocalParticipant, remoteDidSubscribeTrack publication: LocalTrackPublication) { + guard let trackFeatures = publication._state.audioTrackFeatures, trackFeatures.contains(.tfPreconnectBuffer) else { + log("No preconnectBuffer feature set for track: \(publication.sid)", .info) + return + } stopRecording() Task { do { From 9e1f56b1affe49fe41743bf5774e09743e7de189 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Tue, 6 May 2025 08:48:34 +0200 Subject: [PATCH 18/43] Alternative API --- Sources/LiveKit/Core/Room+PreConnect.swift | 44 ++++++++++++++++------ Sources/LiveKit/Core/Room.swift | 4 -- 2 files changed, 32 insertions(+), 16 deletions(-) diff --git a/Sources/LiveKit/Core/Room+PreConnect.swift b/Sources/LiveKit/Core/Room+PreConnect.swift index 708a07a4e..b05a3643f 100644 --- a/Sources/LiveKit/Core/Room+PreConnect.swift +++ b/Sources/LiveKit/Core/Room+PreConnect.swift @@ -17,23 +17,43 @@ import Foundation public extension Room { - /// Start capturing audio before connecting to the server, - /// so that it's not lost when the connection is established. - /// It will be automatically sent via data stream to the other participant - /// using the `PreConnectAudioBuffer.dataTopic` when the local track is subscribed. + /// Starts a pre-connect audio sequence that will automatically be cleaned up + /// when the operation fails. + /// /// - Parameters: /// - timeout: The timeout for the remote participant to subscribe to the audio track. + /// - operation: The operation to perform while audio is being captured. + /// - Returns: The result of the operation. + /// + /// - Example: + /// ```swift + /// try await room.withPreConnectAudio { + /// // Perform any other (async) setup... + /// guard let connectionDetails = try await tokenService.fetchConnectionDetails(roomName: roomName, participantName: participantName) else { + /// return + /// } + /// try await room.connect(url: connectionDetails.serverUrl, token: connectionDetails.participantToken) + /// } + /// ``` + /// /// - See: ``PreConnectAudioBuffer`` - /// - Note: Use ``AudioManager/setRecordingAlwaysPreparedMode(_:)`` to request microphone permissions early. - func startCapturingBeforeConnecting(timeout: TimeInterval = 10) async throws { + /// - Important: Call ``AudioManager/setRecordingAlwaysPreparedMode(_:)`` during app launch sequence to request microphone permissions early. + /// + func withPreConnectAudio(timeout: TimeInterval = 10, + _ operation: @Sendable @escaping () async throws -> T) async throws -> T + { try await preConnectBuffer.startRecording(timeout: timeout) + + do { + return try await operation() + } catch { + preConnectBuffer.stopRecording(flush: true) + throw error + } } - /// Explicitly stop capturing pre-connect audio. - /// - Parameters: - /// - flush: If `true`, the audio stream will be flushed immediately without sending. - /// - Note: Use ``Room/startCapturingBeforeConnecting(timeout:)`` with a timeout to automatically stop capturing. - func stopCapturingBeforeConnecting(flush: Bool = false) { - preConnectBuffer.stopRecording(flush: flush) + @available(*, deprecated, message: "Use withPreConnectAudio instead") + func startCapturingBeforeConnecting(timeout: TimeInterval = 10) async throws { + try await preConnectBuffer.startRecording(timeout: timeout) } } diff --git a/Sources/LiveKit/Core/Room.swift b/Sources/LiveKit/Core/Room.swift index 9713e9b3e..7a41e8744 100644 --- a/Sources/LiveKit/Core/Room.swift +++ b/Sources/LiveKit/Core/Room.swift @@ -447,10 +447,6 @@ extension Room { e2eeManager.cleanUp() } - if disconnectError != nil { - preConnectBuffer.stopRecording(flush: true) - } - // Reset state _state.mutate { // if isFullReconnect, keep connection related states From a4d0b2acfb2b375cdf4e201d51289c44b2e82111 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Tue, 6 May 2025 10:24:51 +0200 Subject: [PATCH 19/43] Log duration --- Sources/LiveKit/Core/PreConnectAudioBuffer.swift | 2 +- Sources/LiveKit/Extensions/AVAudioPCMBuffer.swift | 13 +++++++++++++ .../Track/Recorders/LocalAudioTrackRecorder.swift | 6 ++++++ 3 files changed, 20 insertions(+), 1 deletion(-) diff --git a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift index 583fb86df..ae5176907 100644 --- a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift +++ b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift @@ -158,6 +158,6 @@ extension PreConnectAudioBuffer: RoomDelegate { let writer = try await room.localParticipant.streamBytes(options: streamOptions) try await writer.write(audioData) try await writer.close() - log("Sent \(audioData.count / 1024) KB of audio data to \(agentIdentities.count) agent(s) ", .info) + log("Sent \(recorder.duration(audioData.count))s = \(audioData.count / 1024)KB of audio data to \(agentIdentities.count) agent(s) ", .info) } } diff --git a/Sources/LiveKit/Extensions/AVAudioPCMBuffer.swift b/Sources/LiveKit/Extensions/AVAudioPCMBuffer.swift index 933a44401..aadd03bc0 100644 --- a/Sources/LiveKit/Extensions/AVAudioPCMBuffer.swift +++ b/Sources/LiveKit/Extensions/AVAudioPCMBuffer.swift @@ -167,3 +167,16 @@ public extension AVAudioPCMBuffer { } } } + +extension AVAudioCommonFormat { + var bytesPerSample: Int { + switch self { + case .pcmFormatInt16: return 2 + case .pcmFormatInt32: return 4 + case .pcmFormatFloat32: return 4 + case .pcmFormatFloat64: return 8 + case .otherFormat: return 0 + @unknown default: return 0 + } + } +} diff --git a/Sources/LiveKit/Track/Recorders/LocalAudioTrackRecorder.swift b/Sources/LiveKit/Track/Recorders/LocalAudioTrackRecorder.swift index 18d95de12..67a01970e 100644 --- a/Sources/LiveKit/Track/Recorders/LocalAudioTrackRecorder.swift +++ b/Sources/LiveKit/Track/Recorders/LocalAudioTrackRecorder.swift @@ -100,6 +100,12 @@ public final class LocalAudioTrackRecorder: NSObject, Sendable, AudioRenderer { public func stop() { state.continuation?.finish() } + + func duration(_ dataSize: Int) -> TimeInterval { + let totalSamples = dataSize / format.bytesPerSample + let samplesPerChannel = totalSamples / channels + return Double(samplesPerChannel) / Double(sampleRate) + } } // MARK: - AudioRenderer From 2a9bd37eb379bc4656a358d03316bafdcf04f57e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Tue, 6 May 2025 10:38:08 +0200 Subject: [PATCH 20/43] Cmt --- Sources/LiveKit/Core/Room+PreConnect.swift | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Sources/LiveKit/Core/Room+PreConnect.swift b/Sources/LiveKit/Core/Room+PreConnect.swift index b05a3643f..dce9a569e 100644 --- a/Sources/LiveKit/Core/Room+PreConnect.swift +++ b/Sources/LiveKit/Core/Room+PreConnect.swift @@ -28,7 +28,8 @@ public extension Room { /// - Example: /// ```swift /// try await room.withPreConnectAudio { - /// // Perform any other (async) setup... + /// // Audio is being captured automatically + /// // Perform any other (async) setup here /// guard let connectionDetails = try await tokenService.fetchConnectionDetails(roomName: roomName, participantName: participantName) else { /// return /// } From c27b7bde00195726c949baa792a74b8dc15b03f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Tue, 6 May 2025 14:30:34 +0200 Subject: [PATCH 21/43] Move preConnect to publish options --- .../LiveKit/Core/PreConnectAudioBuffer.swift | 6 +---- Sources/LiveKit/Core/Room.swift | 2 +- .../Participant/LocalParticipant.swift | 5 ++-- .../Types/Options/AudioCaptureOptions.swift | 25 ++----------------- .../Types/Options/AudioPublishOptions.swift | 25 +++++++++++++++++-- 5 files changed, 30 insertions(+), 33 deletions(-) diff --git a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift index ae5176907..016eebb20 100644 --- a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift +++ b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift @@ -49,7 +49,7 @@ public final class PreConnectAudioBuffer: NSObject, Sendable, Loggable { let roomOptions = room?._state.roomOptions recorder = LocalAudioTrackRecorder( - track: LocalAudioTrack.createTrack(options: roomOptions?.defaultAudioCaptureOptions.withPreConnect(), + track: LocalAudioTrack.createTrack(options: roomOptions?.defaultAudioCaptureOptions, reportStatistics: roomOptions?.reportRemoteTrackStatistics ?? false), format: .pcmFormatInt16, // supported by agent plugins sampleRate: 24000, // supported by agent plugins @@ -110,10 +110,6 @@ extension PreConnectAudioBuffer: RoomDelegate { } public func room(_ room: Room, participant _: LocalParticipant, remoteDidSubscribeTrack publication: LocalTrackPublication) { - guard let trackFeatures = publication._state.audioTrackFeatures, trackFeatures.contains(.tfPreconnectBuffer) else { - log("No preconnectBuffer feature set for track: \(publication.sid)", .info) - return - } stopRecording() Task { do { diff --git a/Sources/LiveKit/Core/Room.swift b/Sources/LiveKit/Core/Room.swift index 7a41e8744..6bb00c048 100644 --- a/Sources/LiveKit/Core/Room.swift +++ b/Sources/LiveKit/Core/Room.swift @@ -377,7 +377,7 @@ public class Room: NSObject, @unchecked Sendable, ObservableObject, Loggable { if let createMicrophoneTrackTask, !createMicrophoneTrackTask.isCancelled { let track = try await createMicrophoneTrackTask.value - try await localParticipant._publish(track: track) + try await localParticipant._publish(track: track, options: _state.roomOptions.defaultAudioPublishOptions.withPreconnect()) } // Connect sequence successful diff --git a/Sources/LiveKit/Participant/LocalParticipant.swift b/Sources/LiveKit/Participant/LocalParticipant.swift index 68dba0f67..8ff634035 100644 --- a/Sources/LiveKit/Participant/LocalParticipant.swift +++ b/Sources/LiveKit/Participant/LocalParticipant.swift @@ -507,9 +507,9 @@ extension [Livekit_SubscribedQuality] { // MARK: - Private -private extension LocalParticipant { +extension LocalParticipant { @discardableResult - internal func _publish(track: LocalTrack, options: TrackPublishOptions? = nil) async throws -> LocalTrackPublication { + func _publish(track: LocalTrack, options: TrackPublishOptions? = nil) async throws -> LocalTrackPublication { log("[publish] \(track) options: \(String(describing: options ?? nil))...", .info) try checkPermissions(toPublish: track) @@ -618,6 +618,7 @@ private extension LocalParticipant { populatorFunc = { populator in populator.disableDtx = !audioPublishOptions.dtx populator.disableRed = !audioPublishOptions.red + populator.audioFeatures = Array(audioPublishOptions.toFeatures()) if let streamName = options?.streamName { // Set stream name if specified in options diff --git a/Sources/LiveKit/Types/Options/AudioCaptureOptions.swift b/Sources/LiveKit/Types/Options/AudioCaptureOptions.swift index e24132f84..d00b6bcf4 100644 --- a/Sources/LiveKit/Types/Options/AudioCaptureOptions.swift +++ b/Sources/LiveKit/Types/Options/AudioCaptureOptions.swift @@ -67,23 +67,18 @@ public final class AudioCaptureOptions: NSObject, CaptureOptions, Sendable { @objc public let typingNoiseDetection: Bool - @objc - public let preConnect: Bool - public init( echoCancellation: Bool = AudioCaptureOptions.defaultEchoCancellation, autoGainControl: Bool = AudioCaptureOptions.defaultAutoGainControl, noiseSuppression: Bool = AudioCaptureOptions.defaultNoiseSuppression, highpassFilter: Bool = false, - typingNoiseDetection: Bool = false, - preConnect: Bool = false + typingNoiseDetection: Bool = false ) { self.echoCancellation = echoCancellation self.noiseSuppression = noiseSuppression self.autoGainControl = autoGainControl self.typingNoiseDetection = typingNoiseDetection self.highpassFilter = highpassFilter - self.preConnect = preConnect } // MARK: - Equatable @@ -94,8 +89,7 @@ public final class AudioCaptureOptions: NSObject, CaptureOptions, Sendable { noiseSuppression == other.noiseSuppression && autoGainControl == other.autoGainControl && typingNoiseDetection == other.typingNoiseDetection && - highpassFilter == other.highpassFilter && - preConnect == other.preConnect + highpassFilter == other.highpassFilter } override public var hash: Int { @@ -105,7 +99,6 @@ public final class AudioCaptureOptions: NSObject, CaptureOptions, Sendable { hasher.combine(autoGainControl) hasher.combine(typingNoiseDetection) hasher.combine(highpassFilter) - hasher.combine(preConnect) return hasher.finalize() } } @@ -117,20 +110,6 @@ extension AudioCaptureOptions { echoCancellation ? .tfEchoCancellation : nil, noiseSuppression ? .tfNoiseSuppression : nil, autoGainControl ? .tfAutoGainControl : nil, - preConnect ? .tfPreconnectBuffer : nil, ].compactMap { $0 }) } } - -extension AudioCaptureOptions { - func withPreConnect() -> AudioCaptureOptions { - AudioCaptureOptions( - echoCancellation: echoCancellation, - autoGainControl: autoGainControl, - noiseSuppression: noiseSuppression, - highpassFilter: highpassFilter, - typingNoiseDetection: typingNoiseDetection, - preConnect: true - ) - } -} diff --git a/Sources/LiveKit/Types/Options/AudioPublishOptions.swift b/Sources/LiveKit/Types/Options/AudioPublishOptions.swift index 85f0a32f6..54e3c0509 100644 --- a/Sources/LiveKit/Types/Options/AudioPublishOptions.swift +++ b/Sources/LiveKit/Types/Options/AudioPublishOptions.swift @@ -34,17 +34,22 @@ public final class AudioPublishOptions: NSObject, TrackPublishOptions, Sendable @objc public let streamName: String? + @objc + public let preConnect: Bool + public init(name: String? = nil, encoding: AudioEncoding? = nil, dtx: Bool = true, red: Bool = true, - streamName: String? = nil) + streamName: String? = nil, + preConnect: Bool = false) { self.name = name self.encoding = encoding self.dtx = dtx self.red = red self.streamName = streamName + self.preConnect = preConnect } // MARK: - Equal @@ -55,7 +60,8 @@ public final class AudioPublishOptions: NSObject, TrackPublishOptions, Sendable encoding == other.encoding && dtx == other.dtx && red == other.red && - streamName == other.streamName + streamName == other.streamName && + preConnect == other.preConnect } override public var hash: Int { @@ -65,6 +71,7 @@ public final class AudioPublishOptions: NSObject, TrackPublishOptions, Sendable hasher.combine(dtx) hasher.combine(red) hasher.combine(streamName) + hasher.combine(preConnect) return hasher.finalize() } } @@ -74,6 +81,20 @@ extension AudioPublishOptions { func toFeatures() -> Set { Set([ !dtx ? .tfNoDtx : nil, + preConnect ? .tfPreconnectBuffer : nil, ].compactMap { $0 }) } } + +extension AudioPublishOptions { + func withPreconnect() -> AudioPublishOptions { + AudioPublishOptions( + name: name, + encoding: encoding, + dtx: dtx, + red: red, + streamName: streamName, + preConnect: true + ) + } +} From 9e26dbe90c78e0c2cf942967ae6abe6c29cb8cdc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Tue, 6 May 2025 15:50:27 +0200 Subject: [PATCH 22/43] Inject recorder --- Sources/LiveKit/Core/PreConnectAudioBuffer.swift | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift index 016eebb20..17d600387 100644 --- a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift +++ b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift @@ -43,12 +43,13 @@ public final class PreConnectAudioBuffer: NSObject, Sendable, Loggable { /// Initialize the audio buffer with a room instance. /// - Parameters: /// - room: The room instance to listen for events. + /// - recorder: The audio recorder instance to use. @objc - public init(room: Room?) { + public init(room: Room?, recorder: LocalAudioTrackRecorder? = nil) { state.mutate { $0.room = room } let roomOptions = room?._state.roomOptions - recorder = LocalAudioTrackRecorder( + self.recorder = recorder ?? LocalAudioTrackRecorder( track: LocalAudioTrack.createTrack(options: roomOptions?.defaultAudioCaptureOptions, reportStatistics: roomOptions?.reportRemoteTrackStatistics ?? false), format: .pcmFormatInt16, // supported by agent plugins From 05674b9f4a8fb1cb8025150d85f262d6091f9ef5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Thu, 8 May 2025 13:52:10 +0200 Subject: [PATCH 23/43] Enable only when recording --- Sources/LiveKit/Core/Room.swift | 2 +- Sources/LiveKit/Types/Options/AudioPublishOptions.swift | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Sources/LiveKit/Core/Room.swift b/Sources/LiveKit/Core/Room.swift index 6bb00c048..a39f641ab 100644 --- a/Sources/LiveKit/Core/Room.swift +++ b/Sources/LiveKit/Core/Room.swift @@ -377,7 +377,7 @@ public class Room: NSObject, @unchecked Sendable, ObservableObject, Loggable { if let createMicrophoneTrackTask, !createMicrophoneTrackTask.isCancelled { let track = try await createMicrophoneTrackTask.value - try await localParticipant._publish(track: track, options: _state.roomOptions.defaultAudioPublishOptions.withPreconnect()) + try await localParticipant._publish(track: track, options: _state.roomOptions.defaultAudioPublishOptions.withPreconnect(preConnectBuffer.recorder.isRecording)) } // Connect sequence successful diff --git a/Sources/LiveKit/Types/Options/AudioPublishOptions.swift b/Sources/LiveKit/Types/Options/AudioPublishOptions.swift index 54e3c0509..2d487689e 100644 --- a/Sources/LiveKit/Types/Options/AudioPublishOptions.swift +++ b/Sources/LiveKit/Types/Options/AudioPublishOptions.swift @@ -87,14 +87,14 @@ extension AudioPublishOptions { } extension AudioPublishOptions { - func withPreconnect() -> AudioPublishOptions { + func withPreconnect(_ enabled: Bool) -> AudioPublishOptions { AudioPublishOptions( name: name, encoding: encoding, dtx: dtx, red: red, streamName: streamName, - preConnect: true + preConnect: enabled ) } } From a121762e6dbce3c4e2f7686f8c5466bef958de76 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Fri, 9 May 2025 11:50:27 +0200 Subject: [PATCH 24/43] Don't persist track across recordings --- .../LiveKit/Core/PreConnectAudioBuffer.swift | 37 +++++++++++-------- Sources/LiveKit/Core/Room.swift | 6 +-- .../Participant/LocalParticipant.swift | 2 +- 3 files changed, 25 insertions(+), 20 deletions(-) diff --git a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift index 17d600387..d8431a103 100644 --- a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift +++ b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift @@ -31,11 +31,12 @@ public final class PreConnectAudioBuffer: NSObject, Sendable, Loggable { /// The audio recorder instance. @objc - public let recorder: LocalAudioTrackRecorder + public var recorder: LocalAudioTrackRecorder? { state.recorder } private let state = StateSync(State()) private struct State { weak var room: Room? + var recorder: LocalAudioTrackRecorder? var audioStream: LocalAudioTrackRecorder.Stream? var timeout: TimeInterval = 10 } @@ -43,20 +44,9 @@ public final class PreConnectAudioBuffer: NSObject, Sendable, Loggable { /// Initialize the audio buffer with a room instance. /// - Parameters: /// - room: The room instance to listen for events. - /// - recorder: The audio recorder instance to use. @objc - public init(room: Room?, recorder: LocalAudioTrackRecorder? = nil) { + public init(room: Room?) { state.mutate { $0.room = room } - - let roomOptions = room?._state.roomOptions - self.recorder = recorder ?? LocalAudioTrackRecorder( - track: LocalAudioTrack.createTrack(options: roomOptions?.defaultAudioCaptureOptions, - reportStatistics: roomOptions?.reportRemoteTrackStatistics ?? false), - format: .pcmFormatInt16, // supported by agent plugins - sampleRate: 24000, // supported by agent plugins - maxSize: 10 * 1024 * 1024 // arbitrary max recording size of 10MB - ) - super.init() } @@ -68,12 +58,23 @@ public final class PreConnectAudioBuffer: NSObject, Sendable, Loggable { /// Start capturing audio and listening to ``RoomDelegate`` events. /// - Parameters: /// - timeout: The timeout for the remote participant to subscribe to the audio track. + /// - recorder: Optional custom recorder instance. If not provided, a new one will be created. @objc - public func startRecording(timeout: TimeInterval = 10) async throws { - let stream = try await recorder.start() + public func startRecording(timeout: TimeInterval = 10, recorder: LocalAudioTrackRecorder? = nil) async throws { + let roomOptions = room?._state.roomOptions + let newRecorder = recorder ?? LocalAudioTrackRecorder( + track: LocalAudioTrack.createTrack(options: roomOptions?.defaultAudioCaptureOptions, + reportStatistics: roomOptions?.reportRemoteTrackStatistics ?? false), + format: .pcmFormatInt16, // supported by agent plugins + sampleRate: 24000, // supported by agent plugins + maxSize: 10 * 1024 * 1024 // arbitrary max recording size of 10MB + ) + + let stream = try await newRecorder.start() log("Started capturing audio", .info) state.mutate { state in + state.recorder = newRecorder state.audioStream = stream state.timeout = timeout } @@ -86,7 +87,7 @@ public final class PreConnectAudioBuffer: NSObject, Sendable, Loggable { /// - flush: If `true`, the audio stream will be flushed immediately without sending. @objc public func stopRecording(flush: Bool = false) { - guard recorder.isRecording else { return } + guard let recorder, recorder.isRecording else { return } recorder.stop() log("Stopped capturing audio", .info) @@ -130,6 +131,10 @@ extension PreConnectAudioBuffer: RoomDelegate { let agentIdentities = room.remoteParticipants.filter { _, value in value.kind == .agent }.map(\.key) guard !agentIdentities.isEmpty else { return } + guard let recorder else { + throw LiveKitError(.invalidState, message: "Recorder is nil") + } + guard let audioStream = state.audioStream else { throw LiveKitError(.invalidState, message: "Audio stream is nil") } diff --git a/Sources/LiveKit/Core/Room.swift b/Sources/LiveKit/Core/Room.swift index a39f641ab..6b69f84e7 100644 --- a/Sources/LiveKit/Core/Room.swift +++ b/Sources/LiveKit/Core/Room.swift @@ -355,9 +355,9 @@ public class Room: NSObject, @unchecked Sendable, ObservableObject, Loggable { log("Concurrent enable microphone mode: \(enableMicrophone)") let createMicrophoneTrackTask: Task? = { - if preConnectBuffer.recorder.isRecording { + if let recorder = preConnectBuffer.recorder, recorder.isRecording { return Task { - preConnectBuffer.recorder.track + recorder.track } } else if enableMicrophone { return Task { @@ -377,7 +377,7 @@ public class Room: NSObject, @unchecked Sendable, ObservableObject, Loggable { if let createMicrophoneTrackTask, !createMicrophoneTrackTask.isCancelled { let track = try await createMicrophoneTrackTask.value - try await localParticipant._publish(track: track, options: _state.roomOptions.defaultAudioPublishOptions.withPreconnect(preConnectBuffer.recorder.isRecording)) + try await localParticipant._publish(track: track, options: _state.roomOptions.defaultAudioPublishOptions.withPreconnect(preConnectBuffer.recorder?.isRecording ?? false)) } // Connect sequence successful diff --git a/Sources/LiveKit/Participant/LocalParticipant.swift b/Sources/LiveKit/Participant/LocalParticipant.swift index 8ff634035..dd2fbe096 100644 --- a/Sources/LiveKit/Participant/LocalParticipant.swift +++ b/Sources/LiveKit/Participant/LocalParticipant.swift @@ -226,7 +226,7 @@ public class LocalParticipant: Participant, @unchecked Sendable { } override public func isMicrophoneEnabled() -> Bool { - if let room = _room, room.preConnectBuffer.recorder.isRecording { + if let room = _room, let recorder = room.preConnectBuffer.recorder, recorder.isRecording { return true } else { return super.isMicrophoneEnabled() From 5d939e0aaed2cd11c61555f49c7e5e880a30b6f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Fri, 9 May 2025 12:48:51 +0200 Subject: [PATCH 25/43] FIx publish options set/send order --- Sources/LiveKit/Participant/LocalParticipant.swift | 6 +++--- Sources/LiveKit/TrackPublications/TrackPublication.swift | 5 ++--- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/Sources/LiveKit/Participant/LocalParticipant.swift b/Sources/LiveKit/Participant/LocalParticipant.swift index dd2fbe096..7d904169c 100644 --- a/Sources/LiveKit/Participant/LocalParticipant.swift +++ b/Sources/LiveKit/Participant/LocalParticipant.swift @@ -705,14 +705,14 @@ extension LocalParticipant { } } + // Store publishOptions used for this track... + track._state.mutate { $0.lastPublishOptions = options } + let publication = LocalTrackPublication(info: trackInfo, participant: self) await publication.set(track: track) add(publication: publication) - // Store publishOptions used for this track... - track._state.mutate { $0.lastPublishOptions = options } - // Notify didPublish delegates.notify(label: { "localParticipant.didPublish \(publication)" }) { $0.participant?(self, didPublishTrack: publication) diff --git a/Sources/LiveKit/TrackPublications/TrackPublication.swift b/Sources/LiveKit/TrackPublications/TrackPublication.swift index 31980b480..13eca7c4d 100644 --- a/Sources/LiveKit/TrackPublications/TrackPublication.swift +++ b/Sources/LiveKit/TrackPublications/TrackPublication.swift @@ -107,9 +107,8 @@ public class TrackPublication: NSObject, @unchecked Sendable, ObservableObject, dimensions: info.type == .video ? Dimensions(width: Int32(info.width), height: Int32(info.height)) : nil, isMetadataMuted: info.muted, encryptionType: info.encryption.toLKType(), - - // store the whole info - latestInfo: info + latestInfo: info, + audioTrackFeatures: Set(info.audioFeatures) )) self.participant = participant From 4bb7100705d09e7f7b2d7fe205c28541a69fd314 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Fri, 9 May 2025 14:55:31 +0200 Subject: [PATCH 26/43] Total size --- Sources/LiveKit/Core/PreConnectAudioBuffer.swift | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift index d8431a103..1e7b4970a 100644 --- a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift +++ b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift @@ -155,7 +155,8 @@ extension PreConnectAudioBuffer: RoomDelegate { "channels": "\(recorder.channels)", "trackId": track.stringValue, ], - destinationIdentities: agentIdentities + destinationIdentities: agentIdentities, + totalSize: audioData.count ) let writer = try await room.localParticipant.streamBytes(options: streamOptions) try await writer.write(audioData) From 8e4cfa973d23887ee0f8dc57b189afd9d33a88d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Fri, 9 May 2025 16:03:55 +0200 Subject: [PATCH 27/43] WIP: Move from track subscription to active participant state --- .../LiveKit/Core/PreConnectAudioBuffer.swift | 50 ++++++------------- .../Core/Room+SignalClientDelegate.swift | 12 +++++ 2 files changed, 28 insertions(+), 34 deletions(-) diff --git a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift index 1e7b4970a..494743765 100644 --- a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift +++ b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift @@ -39,6 +39,7 @@ public final class PreConnectAudioBuffer: NSObject, Sendable, Loggable { var recorder: LocalAudioTrackRecorder? var audioStream: LocalAudioTrackRecorder.Stream? var timeout: TimeInterval = 10 + var sent: Bool = false } /// Initialize the audio buffer with a room instance. @@ -52,7 +53,6 @@ public final class PreConnectAudioBuffer: NSObject, Sendable, Loggable { deinit { stopRecording() - room?.remove(delegate: self) } /// Start capturing audio and listening to ``RoomDelegate`` events. @@ -77,9 +77,13 @@ public final class PreConnectAudioBuffer: NSObject, Sendable, Loggable { state.recorder = newRecorder state.audioStream = stream state.timeout = timeout + state.sent = false } - room?.add(delegate: self) + Task { + try? await Task.sleep(nanoseconds: UInt64(state.timeout) * NSEC_PER_SEC) + stopRecording(flush: true) + } } /// Stop capturing audio. @@ -99,37 +103,19 @@ public final class PreConnectAudioBuffer: NSObject, Sendable, Loggable { } } } -} - -// MARK: - RoomDelegate - -extension PreConnectAudioBuffer: RoomDelegate { - public func roomDidConnect(_: Room) { - Task { - try? await Task.sleep(nanoseconds: UInt64(state.timeout) * NSEC_PER_SEC) - stopRecording(flush: true) - } - } - - public func room(_ room: Room, participant _: LocalParticipant, remoteDidSubscribeTrack publication: LocalTrackPublication) { - stopRecording() - Task { - do { - try await sendAudioData(to: room, track: publication.sid) - } catch { - log("Unable to send audio: \(error)", .error) - } - } - } /// Send the audio data to the room. /// - Parameters: /// - room: The room instance to send the audio data. /// - topic: The topic to send the audio data. @objc - public func sendAudioData(to room: Room, track: Track.Sid, on topic: String = dataTopic) async throws { - let agentIdentities = room.remoteParticipants.filter { _, value in value.kind == .agent }.map(\.key) - guard !agentIdentities.isEmpty else { return } + public func sendAudioData(to room: Room, agents: [Participant.Identity], on topic: String = dataTopic) async throws { + guard !agents.isEmpty else { return } + + guard !state.sent else { return } + state.mutate { $0.sent = true } + + stopRecording() guard let recorder else { throw LiveKitError(.invalidState, message: "Recorder is nil") @@ -144,23 +130,19 @@ extension PreConnectAudioBuffer: RoomDelegate { throw LiveKitError(.unknown, message: "Audio data size too small, nothing to send") } - defer { - room.remove(delegate: self) - } - let streamOptions = StreamByteOptions( topic: topic, attributes: [ "sampleRate": "\(recorder.sampleRate)", "channels": "\(recorder.channels)", - "trackId": track.stringValue, + "trackId": recorder.track.sid?.stringValue ?? "", ], - destinationIdentities: agentIdentities, + destinationIdentities: agents, totalSize: audioData.count ) let writer = try await room.localParticipant.streamBytes(options: streamOptions) try await writer.write(audioData) try await writer.close() - log("Sent \(recorder.duration(audioData.count))s = \(audioData.count / 1024)KB of audio data to \(agentIdentities.count) agent(s) ", .info) + log("Sent \(recorder.duration(audioData.count))s = \(audioData.count / 1024)KB of audio data to \(agents.count) agent(s) ", .info) } } diff --git a/Sources/LiveKit/Core/Room+SignalClientDelegate.swift b/Sources/LiveKit/Core/Room+SignalClientDelegate.swift index 750ef6bf3..d3ecba0e0 100644 --- a/Sources/LiveKit/Core/Room+SignalClientDelegate.swift +++ b/Sources/LiveKit/Core/Room+SignalClientDelegate.swift @@ -243,6 +243,18 @@ extension Room: SignalClientDelegate { func signalClient(_: SignalClient, didUpdateParticipants participants: [Livekit_ParticipantInfo]) async { log("participants: \(participants)") + let activeAgents = participants.filter { $0.kind == .agent && $0.state == .active } + .map(\.identity) + .map(Participant.Identity.init) + + Task { + do { + try await preConnectBuffer.sendAudioData(to: self, agents: activeAgents) + } catch { + log("Unable to send preconnect audio: \(error)", .error) + } + } + var disconnectedParticipantIdentities = [Participant.Identity]() var newParticipants = [RemoteParticipant]() From 8f479307bdfbb5d015c8b6e464cc131aa25a894e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Mon, 12 May 2025 09:10:37 +0200 Subject: [PATCH 28/43] Room cmts --- Sources/LiveKit/Core/PreConnectAudioBuffer.swift | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift index 494743765..f30688d43 100644 --- a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift +++ b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift @@ -17,15 +17,14 @@ import AVFAudio import Foundation -/// A buffer that captures audio before connecting to the server, -/// and sends it on certain ``RoomDelegate`` events. +/// A buffer that captures audio before connecting to the server. @objc public final class PreConnectAudioBuffer: NSObject, Sendable, Loggable { /// The default data topic used to send the audio buffer. @objc public static let dataTopic = "lk.agent.pre-connect-audio-buffer" - /// The room instance to listen for events. + /// The room instance to send the audio buffer to. @objc public var room: Room? { state.room } @@ -44,7 +43,7 @@ public final class PreConnectAudioBuffer: NSObject, Sendable, Loggable { /// Initialize the audio buffer with a room instance. /// - Parameters: - /// - room: The room instance to listen for events. + /// - room: The room instance to send the audio buffer to. @objc public init(room: Room?) { state.mutate { $0.room = room } @@ -55,7 +54,7 @@ public final class PreConnectAudioBuffer: NSObject, Sendable, Loggable { stopRecording() } - /// Start capturing audio and listening to ``RoomDelegate`` events. + /// Start capturing audio. /// - Parameters: /// - timeout: The timeout for the remote participant to subscribe to the audio track. /// - recorder: Optional custom recorder instance. If not provided, a new one will be created. @@ -81,7 +80,7 @@ public final class PreConnectAudioBuffer: NSObject, Sendable, Loggable { } Task { - try? await Task.sleep(nanoseconds: UInt64(state.timeout) * NSEC_PER_SEC) + try await Task.sleep(nanoseconds: UInt64(state.timeout) * NSEC_PER_SEC) stopRecording(flush: true) } } @@ -107,6 +106,7 @@ public final class PreConnectAudioBuffer: NSObject, Sendable, Loggable { /// Send the audio data to the room. /// - Parameters: /// - room: The room instance to send the audio data. + /// - agents: The agents to send the audio data to. /// - topic: The topic to send the audio data. @objc public func sendAudioData(to room: Room, agents: [Participant.Identity], on topic: String = dataTopic) async throws { From 8f06f407985a752a86e81f9d7577b90263725e28 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Mon, 12 May 2025 09:20:33 +0200 Subject: [PATCH 29/43] Handle no recorder case --- Sources/LiveKit/Core/PreConnectAudioBuffer.swift | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift index f30688d43..21d10edf5 100644 --- a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift +++ b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift @@ -115,12 +115,13 @@ public final class PreConnectAudioBuffer: NSObject, Sendable, Loggable { guard !state.sent else { return } state.mutate { $0.sent = true } - stopRecording() - guard let recorder else { - throw LiveKitError(.invalidState, message: "Recorder is nil") + log("Skipping preconnect audio, recorder is nil", .info) + return } + stopRecording() + guard let audioStream = state.audioStream else { throw LiveKitError(.invalidState, message: "Audio stream is nil") } From bbb8c08f790f1d95603bfbacd4d8f4f8f0b3969b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Mon, 12 May 2025 09:28:07 +0200 Subject: [PATCH 30/43] Handle conversion fails --- Sources/LiveKit/Track/Recorders/LocalAudioTrackRecorder.swift | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Sources/LiveKit/Track/Recorders/LocalAudioTrackRecorder.swift b/Sources/LiveKit/Track/Recorders/LocalAudioTrackRecorder.swift index 67a01970e..19a41cdb0 100644 --- a/Sources/LiveKit/Track/Recorders/LocalAudioTrackRecorder.swift +++ b/Sources/LiveKit/Track/Recorders/LocalAudioTrackRecorder.swift @@ -118,6 +118,8 @@ public extension LocalAudioTrackRecorder { .toData() { state.continuation?.yield(data) + } else { + assertionFailure("Failed to convert PCM buffer to data") } } } From cbd7dde517c3c2486d703b5954fa5a89a21c4778 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Mon, 12 May 2025 09:59:04 +0200 Subject: [PATCH 31/43] Move timeout --- Sources/LiveKit/Core/PreConnectAudioBuffer.swift | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift index 21d10edf5..a0ffba396 100644 --- a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift +++ b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift @@ -37,7 +37,7 @@ public final class PreConnectAudioBuffer: NSObject, Sendable, Loggable { weak var room: Room? var recorder: LocalAudioTrackRecorder? var audioStream: LocalAudioTrackRecorder.Stream? - var timeout: TimeInterval = 10 + var timeoutTask: Task? var sent: Bool = false } @@ -75,14 +75,12 @@ public final class PreConnectAudioBuffer: NSObject, Sendable, Loggable { state.mutate { state in state.recorder = newRecorder state.audioStream = stream - state.timeout = timeout + state.timeoutTask = Task { [weak self] in + try? await Task.sleep(nanoseconds: UInt64(timeout) * NSEC_PER_SEC) + self?.stopRecording(flush: true) + } state.sent = false } - - Task { - try await Task.sleep(nanoseconds: UInt64(state.timeout) * NSEC_PER_SEC) - stopRecording(flush: true) - } } /// Stop capturing audio. From d090956c64b6a21f70c0d63149bd407cc5be7360 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Mon, 12 May 2025 11:50:20 +0200 Subject: [PATCH 32/43] Cancel timeout --- Sources/LiveKit/Core/PreConnectAudioBuffer.swift | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift index a0ffba396..f5d963263 100644 --- a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift +++ b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift @@ -37,7 +37,7 @@ public final class PreConnectAudioBuffer: NSObject, Sendable, Loggable { weak var room: Room? var recorder: LocalAudioTrackRecorder? var audioStream: LocalAudioTrackRecorder.Stream? - var timeoutTask: Task? + var timeoutTask: Task? var sent: Bool = false } @@ -72,11 +72,13 @@ public final class PreConnectAudioBuffer: NSObject, Sendable, Loggable { let stream = try await newRecorder.start() log("Started capturing audio", .info) + state.timeoutTask?.cancel() state.mutate { state in state.recorder = newRecorder state.audioStream = stream state.timeoutTask = Task { [weak self] in - try? await Task.sleep(nanoseconds: UInt64(timeout) * NSEC_PER_SEC) + try await Task.sleep(nanoseconds: UInt64(timeout) * NSEC_PER_SEC) + try Task.checkCancellation() self?.stopRecording(flush: true) } state.sent = false From a8d0c3f65fc5ad722f909a8b7dea4d3a79d62b3e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Tue, 13 May 2025 10:03:01 +0200 Subject: [PATCH 33/43] Expose participant state --- Sources/LiveKit/Participant/Participant.swift | 16 ++++ .../Protocols/ParticipantDelegate.swift | 5 + Sources/LiveKit/Protocols/RoomDelegate.swift | 9 +- Sources/LiveKit/Types/ParticipantState.swift | 93 +++++++++++++++++++ 4 files changed, 118 insertions(+), 5 deletions(-) create mode 100644 Sources/LiveKit/Types/ParticipantState.swift diff --git a/Sources/LiveKit/Participant/Participant.swift b/Sources/LiveKit/Participant/Participant.swift index c14f332e9..b6fd0a60f 100644 --- a/Sources/LiveKit/Participant/Participant.swift +++ b/Sources/LiveKit/Participant/Participant.swift @@ -49,6 +49,9 @@ public class Participant: NSObject, @unchecked Sendable, ObservableObject, Logga @objc public var attributes: [String: String] { _state.attributes } + @objc + public var state: ParticipantState { _state.state } + @objc public var connectionQuality: ConnectionQuality { _state.connectionQuality } @@ -99,6 +102,7 @@ public class Participant: NSObject, @unchecked Sendable, ObservableObject, Logga var metadata: String? var joinedAt: Date? var kind: Kind = .unknown + var state: ParticipantState = .unknown var connectionQuality: ConnectionQuality = .unknown var permissions = ParticipantPermissions() var trackPublications = [Track.Sid: TrackPublication]() @@ -176,6 +180,17 @@ public class Participant: NSObject, @unchecked Sendable, ObservableObject, Logga } } + // state updated + if newState.state != oldState.state { + self.delegates.notify(label: { "participant.didUpdate state: \(newState.state)" }) { + $0.participant?(self, didUpdateState: newState.state) + } + room.delegates.notify(label: { "room.didUpdate state: \(newState.state)" }) { + $0.room?(room, participant: self, didUpdateState: newState.state) + } + } + + // connection quality updated if newState.connectionQuality != oldState.connectionQuality { self.delegates.notify(label: { "participant.didUpdate connectionQuality: \(self.connectionQuality)" }) { $0.participant?(self, didUpdateConnectionQuality: self.connectionQuality) @@ -228,6 +243,7 @@ public class Participant: NSObject, @unchecked Sendable, ObservableObject, Logga $0.metadata = info.metadata $0.kind = info.kind.toLKType() $0.attributes = info.attributes + $0.state = info.state.toLKType() // Attempt to get millisecond precision. if info.joinedAtMs != 0 { diff --git a/Sources/LiveKit/Protocols/ParticipantDelegate.swift b/Sources/LiveKit/Protocols/ParticipantDelegate.swift index 9a705f2d8..77285a46f 100644 --- a/Sources/LiveKit/Protocols/ParticipantDelegate.swift +++ b/Sources/LiveKit/Protocols/ParticipantDelegate.swift @@ -43,6 +43,11 @@ public protocol ParticipantDelegate: AnyObject, Sendable { @objc optional func participant(_ participant: Participant, didUpdateIsSpeaking isSpeaking: Bool) + /// The state of a ``Participant`` has updated. + /// `participant` Can be a ``LocalParticipant`` or a ``RemoteParticipant``. + @objc optional + func participant(_ participant: Participant, didUpdateState state: ParticipantState) + /// The connection quality of a ``Participant`` has updated. /// `participant` Can be a ``LocalParticipant`` or a ``RemoteParticipant``. @objc optional diff --git a/Sources/LiveKit/Protocols/RoomDelegate.swift b/Sources/LiveKit/Protocols/RoomDelegate.swift index 76505a34e..157cbb279 100644 --- a/Sources/LiveKit/Protocols/RoomDelegate.swift +++ b/Sources/LiveKit/Protocols/RoomDelegate.swift @@ -97,6 +97,10 @@ public protocol RoomDelegate: AnyObject, Sendable { @objc optional func room(_ room: Room, participant: Participant, didUpdateName name: String) + /// ``Participant/state`` has updated. + @objc optional + func room(_ room: Room, participant: Participant, didUpdateState state: ParticipantState) + /// ``Participant/connectionQuality`` has updated. @objc optional func room(_ room: Room, participant: Participant, didUpdateConnectionQuality quality: ConnectionQuality) @@ -216,11 +220,6 @@ public protocol RoomDelegate: AnyObject, Sendable { @objc(room:participant:didUpdateMetadata_:) optional func room(_ room: Room, participant: Participant, didUpdate metadata: String?) - // Renamed to ``RoomDelegate/room(_:participant:didUpdateName:)``. - // @available(*, unavailable, renamed: "room(_:participant:didUpdateName:)") - // @objc(room:participant:didUpdateName_:) optional - // func room(_ room: Room, participant: Participant, didUpdateName: String) - /// Renamed to ``RoomDelegate/room(_:participant:didUpdateConnectionQuality:)``. @available(*, unavailable, renamed: "room(_:participant:didUpdateConnectionQuality:)") @objc(room:participant:didUpdateConnectionQuality_:) optional diff --git a/Sources/LiveKit/Types/ParticipantState.swift b/Sources/LiveKit/Types/ParticipantState.swift new file mode 100644 index 000000000..f3b0c93df --- /dev/null +++ b/Sources/LiveKit/Types/ParticipantState.swift @@ -0,0 +1,93 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Foundation + +/// Describes the state of a ``Participant``'s connection to the LiveKit server. +@objc +public enum ParticipantState: Int, Sendable, CaseIterable { + /// Websocket is connected, but no offer has been sent yet + case joining = 0 + + /// Server has received the client's offer + case joined = 1 + + /// ICE connectivity has been established + case active = 2 + + /// Websocket has disconnected + case disconnected = 3 + + /// Unknown state + case unknown = 999 +} + +// MARK: - Conversions from/to protobuf types + +extension ParticipantState { + init(from protoState: Livekit_ParticipantInfo.State) { + switch protoState { + case .joining: + self = .joining + case .joined: + self = .joined + case .active: + self = .active + case .disconnected: + self = .disconnected + case .UNRECOGNIZED: + self = .unknown + } + } + + var protoState: Livekit_ParticipantInfo.State { + switch self { + case .joining: + return .joining + case .joined: + return .joined + case .active: + return .active + case .disconnected: + return .disconnected + case .unknown: + return .joining // Default to joining for unknown state + } + } +} + +extension Livekit_ParticipantInfo.State { + func toLKType() -> ParticipantState { + ParticipantState(from: self) + } +} + +extension ParticipantState: CustomStringConvertible { + public var description: String { + switch self { + case .joining: + return "joining" + case .joined: + return "joined" + case .active: + return "active" + case .disconnected: + return "disconnected" + case .unknown: + return "unknown" + } + } +} From 333fb14a4ef0802cdf24ff78c1ff17f49fa9849d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Tue, 13 May 2025 10:24:44 +0200 Subject: [PATCH 34/43] Bring back RoomDelegate --- .../LiveKit/Core/PreConnectAudioBuffer.swift | 48 ++++++++++++++++--- Sources/LiveKit/Core/Room+PreConnect.swift | 4 +- .../Core/Room+SignalClientDelegate.swift | 12 ----- 3 files changed, 44 insertions(+), 20 deletions(-) diff --git a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift index f5d963263..64a936168 100644 --- a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift +++ b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift @@ -20,6 +20,8 @@ import Foundation /// A buffer that captures audio before connecting to the server. @objc public final class PreConnectAudioBuffer: NSObject, Sendable, Loggable { + public typealias OnError = @Sendable (Error) -> Void + /// The default data topic used to send the audio buffer. @objc public static let dataTopic = "lk.agent.pre-connect-audio-buffer" @@ -39,14 +41,19 @@ public final class PreConnectAudioBuffer: NSObject, Sendable, Loggable { var audioStream: LocalAudioTrackRecorder.Stream? var timeoutTask: Task? var sent: Bool = false + var onError: OnError? = nil } /// Initialize the audio buffer with a room instance. /// - Parameters: /// - room: The room instance to send the audio buffer to. + /// - onError: The error handler to call when an error occurs while sending the audio buffer. @objc - public init(room: Room?) { - state.mutate { $0.room = room } + public init(room: Room?, onError: OnError? = nil) { + state.mutate { + $0.room = room + $0.onError = onError + } super.init() } @@ -54,12 +61,19 @@ public final class PreConnectAudioBuffer: NSObject, Sendable, Loggable { stopRecording() } + @objc + public func setErrorHandler(_ onError: OnError?) { + state.mutate { $0.onError = onError } + } + /// Start capturing audio. /// - Parameters: /// - timeout: The timeout for the remote participant to subscribe to the audio track. /// - recorder: Optional custom recorder instance. If not provided, a new one will be created. @objc public func startRecording(timeout: TimeInterval = 10, recorder: LocalAudioTrackRecorder? = nil) async throws { + room?.add(delegate: self) + let roomOptions = room?._state.roomOptions let newRecorder = recorder ?? LocalAudioTrackRecorder( track: LocalAudioTrack.createTrack(options: roomOptions?.defaultAudioCaptureOptions, @@ -100,6 +114,7 @@ public final class PreConnectAudioBuffer: NSObject, Sendable, Loggable { Task { for await _ in stream {} } + room?.remove(delegate: self) } } @@ -116,12 +131,9 @@ public final class PreConnectAudioBuffer: NSObject, Sendable, Loggable { state.mutate { $0.sent = true } guard let recorder else { - log("Skipping preconnect audio, recorder is nil", .info) - return + throw LiveKitError(.invalidState, message: "Recorder is nil") } - stopRecording() - guard let audioStream = state.audioStream else { throw LiveKitError(.invalidState, message: "Audio stream is nil") } @@ -144,6 +156,28 @@ public final class PreConnectAudioBuffer: NSObject, Sendable, Loggable { let writer = try await room.localParticipant.streamBytes(options: streamOptions) try await writer.write(audioData) try await writer.close() - log("Sent \(recorder.duration(audioData.count))s = \(audioData.count / 1024)KB of audio data to \(agents.count) agent(s) ", .info) + log("Sent \(recorder.duration(audioData.count))s = \(audioData.count / 1024)KB of audio data to \(agents.count) agent(s) \(agents)", .info) + } +} + +extension PreConnectAudioBuffer: RoomDelegate { + public func room(_ room: Room, participant: Participant, didUpdateState state: ParticipantState) { + print("bp", room, participant.kind, state) + + guard participant.kind == .agent, state == .active, let agent = participant.identity else { return } + log("Detected an active agent participant: \(agent), sending audio", .info) + + stopRecording() + + Task { + do { + try await sendAudioData(to: room, agents: [agent]) + } catch { + log("Unable to send preconnect audio: \(error)", .error) + self.state.onError?(error) + } + } + + room.remove(delegate: self) } } diff --git a/Sources/LiveKit/Core/Room+PreConnect.swift b/Sources/LiveKit/Core/Room+PreConnect.swift index dce9a569e..9cd60a4da 100644 --- a/Sources/LiveKit/Core/Room+PreConnect.swift +++ b/Sources/LiveKit/Core/Room+PreConnect.swift @@ -23,6 +23,7 @@ public extension Room { /// - Parameters: /// - timeout: The timeout for the remote participant to subscribe to the audio track. /// - operation: The operation to perform while audio is being captured. + /// - onError: The error handler to call when an error occurs while sending the audio buffer. /// - Returns: The result of the operation. /// /// - Example: @@ -41,7 +42,8 @@ public extension Room { /// - Important: Call ``AudioManager/setRecordingAlwaysPreparedMode(_:)`` during app launch sequence to request microphone permissions early. /// func withPreConnectAudio(timeout: TimeInterval = 10, - _ operation: @Sendable @escaping () async throws -> T) async throws -> T + _ operation: @Sendable @escaping () async throws -> T, + onError _: PreConnectAudioBuffer.OnError? = nil) async throws -> T { try await preConnectBuffer.startRecording(timeout: timeout) diff --git a/Sources/LiveKit/Core/Room+SignalClientDelegate.swift b/Sources/LiveKit/Core/Room+SignalClientDelegate.swift index d3ecba0e0..750ef6bf3 100644 --- a/Sources/LiveKit/Core/Room+SignalClientDelegate.swift +++ b/Sources/LiveKit/Core/Room+SignalClientDelegate.swift @@ -243,18 +243,6 @@ extension Room: SignalClientDelegate { func signalClient(_: SignalClient, didUpdateParticipants participants: [Livekit_ParticipantInfo]) async { log("participants: \(participants)") - let activeAgents = participants.filter { $0.kind == .agent && $0.state == .active } - .map(\.identity) - .map(Participant.Identity.init) - - Task { - do { - try await preConnectBuffer.sendAudioData(to: self, agents: activeAgents) - } catch { - log("Unable to send preconnect audio: \(error)", .error) - } - } - var disconnectedParticipantIdentities = [Participant.Identity]() var newParticipants = [RemoteParticipant]() From 14b811bd1a46e2e699381b279dccc4fce031868b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Tue, 13 May 2025 11:18:21 +0200 Subject: [PATCH 35/43] Fix test --- Tests/LiveKitTests/PreConnectAudioBufferTests.swift | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Tests/LiveKitTests/PreConnectAudioBufferTests.swift b/Tests/LiveKitTests/PreConnectAudioBufferTests.swift index 866ac0145..79cebee85 100644 --- a/Tests/LiveKitTests/PreConnectAudioBufferTests.swift +++ b/Tests/LiveKitTests/PreConnectAudioBufferTests.swift @@ -18,7 +18,7 @@ import XCTest class PreConnectAudioBufferTests: LKTestCase { - func testRemoteDidSubscribeTrackSendsAudioData() async throws { + func testParticipantActiveStateSendsAudioData() async throws { let receiveExpectation = expectation(description: "Receives audio data") try await withRooms([RoomTestingOptions(canSubscribe: true), RoomTestingOptions(canPublish: true, canPublishData: true)]) { rooms in @@ -41,8 +41,8 @@ class PreConnectAudioBufferTests: LKTestCase { try await buffer.startRecording() try await Task.sleep(nanoseconds: NSEC_PER_SEC / 2) - let publication = LocalTrackPublication(info: Livekit_TrackInfo(), participant: rooms[0].localParticipant) - buffer.room(publisherRoom, participant: publisherRoom.localParticipant, remoteDidSubscribeTrack: publication) + subscriberRoom.localParticipant._state.mutate { $0.kind = .agent } // override kind + buffer.room(publisherRoom, participant: subscriberRoom.localParticipant, didUpdateState: .active) await self.fulfillment(of: [receiveExpectation], timeout: 10) } From 51ceff149578d4bdfb0ea167df20eccdb7591c27 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Tue, 13 May 2025 11:29:48 +0200 Subject: [PATCH 36/43] Cmt --- Sources/LiveKit/Core/Room+PreConnect.swift | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Sources/LiveKit/Core/Room+PreConnect.swift b/Sources/LiveKit/Core/Room+PreConnect.swift index 9cd60a4da..662e38634 100644 --- a/Sources/LiveKit/Core/Room+PreConnect.swift +++ b/Sources/LiveKit/Core/Room+PreConnect.swift @@ -35,6 +35,8 @@ public extension Room { /// return /// } /// try await room.connect(url: connectionDetails.serverUrl, token: connectionDetails.participantToken) + /// } onError: { error in + /// print("Error sending audio buffer: \(error)") /// } /// ``` /// From 960b04c5feff6646f607e390dc5e9bc11f0a6cca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Tue, 13 May 2025 15:37:48 +0200 Subject: [PATCH 37/43] Use subscription to stop audio --- Sources/LiveKit/Core/PreConnectAudioBuffer.swift | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift index 64a936168..14771830b 100644 --- a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift +++ b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift @@ -161,13 +161,14 @@ public final class PreConnectAudioBuffer: NSObject, Sendable, Loggable { } extension PreConnectAudioBuffer: RoomDelegate { - public func room(_ room: Room, participant: Participant, didUpdateState state: ParticipantState) { - print("bp", room, participant.kind, state) + public func room(_: Room, participant _: LocalParticipant, remoteDidSubscribeTrack _: LocalTrackPublication) { + log("Subscribed by remote participant", .info) + stopRecording() + } + public func room(_ room: Room, participant: Participant, didUpdateState state: ParticipantState) { guard participant.kind == .agent, state == .active, let agent = participant.identity else { return } - log("Detected an active agent participant: \(agent), sending audio", .info) - - stopRecording() + log("Detected active agent participant: \(agent), sending audio", .info) Task { do { @@ -177,7 +178,5 @@ extension PreConnectAudioBuffer: RoomDelegate { self.state.onError?(error) } } - - room.remove(delegate: self) } } From b958cf1fc4063c43f53310124a146d0609bd9b7c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Tue, 13 May 2025 15:41:08 +0200 Subject: [PATCH 38/43] Stop --- Sources/LiveKit/Core/PreConnectAudioBuffer.swift | 1 + 1 file changed, 1 insertion(+) diff --git a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift index 14771830b..863b0e4b4 100644 --- a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift +++ b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift @@ -169,6 +169,7 @@ extension PreConnectAudioBuffer: RoomDelegate { public func room(_ room: Room, participant: Participant, didUpdateState state: ParticipantState) { guard participant.kind == .agent, state == .active, let agent = participant.identity else { return } log("Detected active agent participant: \(agent), sending audio", .info) + stopRecording() Task { do { From 05850683cf7e17c0adba8c4a1501ea7882643736 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Tue, 13 May 2025 16:36:25 +0200 Subject: [PATCH 39/43] Constants --- Sources/LiveKit/Core/PreConnectAudioBuffer.swift | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift index 863b0e4b4..725df0fec 100644 --- a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift +++ b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift @@ -22,6 +22,12 @@ import Foundation public final class PreConnectAudioBuffer: NSObject, Sendable, Loggable { public typealias OnError = @Sendable (Error) -> Void + public enum Constants { + public static let maxSize = 10 * 1024 * 1024 // 10MB + public static let sampleRate = 24000 + public static let timeout: TimeInterval = 10 + } + /// The default data topic used to send the audio buffer. @objc public static let dataTopic = "lk.agent.pre-connect-audio-buffer" @@ -71,16 +77,16 @@ public final class PreConnectAudioBuffer: NSObject, Sendable, Loggable { /// - timeout: The timeout for the remote participant to subscribe to the audio track. /// - recorder: Optional custom recorder instance. If not provided, a new one will be created. @objc - public func startRecording(timeout: TimeInterval = 10, recorder: LocalAudioTrackRecorder? = nil) async throws { + public func startRecording(timeout: TimeInterval = Constants.timeout, recorder: LocalAudioTrackRecorder? = nil) async throws { room?.add(delegate: self) let roomOptions = room?._state.roomOptions let newRecorder = recorder ?? LocalAudioTrackRecorder( track: LocalAudioTrack.createTrack(options: roomOptions?.defaultAudioCaptureOptions, reportStatistics: roomOptions?.reportRemoteTrackStatistics ?? false), - format: .pcmFormatInt16, // supported by agent plugins - sampleRate: 24000, // supported by agent plugins - maxSize: 10 * 1024 * 1024 // arbitrary max recording size of 10MB + format: .pcmFormatInt16, + sampleRate: Constants.sampleRate, + maxSize: Constants.maxSize ) let stream = try await newRecorder.start() From a65ac15449b04cc44e15df7ec45f764c4f14da28 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Tue, 13 May 2025 16:40:45 +0200 Subject: [PATCH 40/43] Timeout comments --- Sources/LiveKit/Core/PreConnectAudioBuffer.swift | 2 ++ Sources/LiveKit/Core/Room+PreConnect.swift | 2 ++ 2 files changed, 4 insertions(+) diff --git a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift index 725df0fec..27267319d 100644 --- a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift +++ b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift @@ -75,6 +75,8 @@ public final class PreConnectAudioBuffer: NSObject, Sendable, Loggable { /// Start capturing audio. /// - Parameters: /// - timeout: The timeout for the remote participant to subscribe to the audio track. + /// The room connection needs to be established and the remote participant needs to subscribe to the audio track + /// before the timeout is reached. Otherwise, the audio stream will be flushed without sending. /// - recorder: Optional custom recorder instance. If not provided, a new one will be created. @objc public func startRecording(timeout: TimeInterval = Constants.timeout, recorder: LocalAudioTrackRecorder? = nil) async throws { diff --git a/Sources/LiveKit/Core/Room+PreConnect.swift b/Sources/LiveKit/Core/Room+PreConnect.swift index 662e38634..1a9500cd7 100644 --- a/Sources/LiveKit/Core/Room+PreConnect.swift +++ b/Sources/LiveKit/Core/Room+PreConnect.swift @@ -22,6 +22,8 @@ public extension Room { /// /// - Parameters: /// - timeout: The timeout for the remote participant to subscribe to the audio track. + /// The room connection needs to be established and the remote participant needs to subscribe to the audio track + /// before the timeout is reached. Otherwise, the audio stream will be flushed without sending. /// - operation: The operation to perform while audio is being captured. /// - onError: The error handler to call when an error occurs while sending the audio buffer. /// - Returns: The result of the operation. From 6351d7040c1e089c91e967eb2e79ddda39f5c72d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Tue, 13 May 2025 16:58:06 +0200 Subject: [PATCH 41/43] Fix error handler --- Sources/LiveKit/Core/PreConnectAudioBuffer.swift | 2 +- Sources/LiveKit/Core/Room+PreConnect.swift | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift index 27267319d..91c4067aa 100644 --- a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift +++ b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift @@ -170,7 +170,7 @@ public final class PreConnectAudioBuffer: NSObject, Sendable, Loggable { extension PreConnectAudioBuffer: RoomDelegate { public func room(_: Room, participant _: LocalParticipant, remoteDidSubscribeTrack _: LocalTrackPublication) { - log("Subscribed by remote participant", .info) + log("Subscribed by remote participant, stopping audio", .info) stopRecording() } diff --git a/Sources/LiveKit/Core/Room+PreConnect.swift b/Sources/LiveKit/Core/Room+PreConnect.swift index 1a9500cd7..b67147219 100644 --- a/Sources/LiveKit/Core/Room+PreConnect.swift +++ b/Sources/LiveKit/Core/Room+PreConnect.swift @@ -47,8 +47,9 @@ public extension Room { /// func withPreConnectAudio(timeout: TimeInterval = 10, _ operation: @Sendable @escaping () async throws -> T, - onError _: PreConnectAudioBuffer.OnError? = nil) async throws -> T + onError: PreConnectAudioBuffer.OnError? = nil) async throws -> T { + preConnectBuffer.setErrorHandler(onError) try await preConnectBuffer.startRecording(timeout: timeout) do { From 6101c5265e3b386624bc566b5700fe8498195dbb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Tue, 13 May 2025 17:22:37 +0200 Subject: [PATCH 42/43] Revert "Stop" This reverts commit b958cf1fc4063c43f53310124a146d0609bd9b7c. --- Sources/LiveKit/Core/PreConnectAudioBuffer.swift | 1 - 1 file changed, 1 deletion(-) diff --git a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift index 91c4067aa..89ba9d609 100644 --- a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift +++ b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift @@ -177,7 +177,6 @@ extension PreConnectAudioBuffer: RoomDelegate { public func room(_ room: Room, participant: Participant, didUpdateState state: ParticipantState) { guard participant.kind == .agent, state == .active, let agent = participant.identity else { return } log("Detected active agent participant: \(agent), sending audio", .info) - stopRecording() Task { do { From 90b1b290b848cbe42d6f86b954a9940f7d861a78 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Tue, 13 May 2025 17:33:25 +0200 Subject: [PATCH 43/43] Simplify --- Sources/LiveKit/Track/Recorders/LocalAudioTrackRecorder.swift | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/Sources/LiveKit/Track/Recorders/LocalAudioTrackRecorder.swift b/Sources/LiveKit/Track/Recorders/LocalAudioTrackRecorder.swift index 19a41cdb0..5e002e9a6 100644 --- a/Sources/LiveKit/Track/Recorders/LocalAudioTrackRecorder.swift +++ b/Sources/LiveKit/Track/Recorders/LocalAudioTrackRecorder.swift @@ -71,9 +71,7 @@ public final class LocalAudioTrackRecorder: NSObject, Sendable, AudioRenderer { /// - Returns: A stream of audio data. /// - Throws: An error if the audio track cannot be started. public func start() async throws -> Stream { - if let continuation = state.continuation { - continuation.finish() - } + stop() try await track.startCapture() track.add(audioRenderer: self)