diff --git a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift index 475d23d24..89ba9d609 100644 --- a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift +++ b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift @@ -17,63 +17,93 @@ import AVFAudio import Foundation -/// A buffer that captures audio before connecting to the server, -/// and sends it on certain ``RoomDelegate`` events. +/// A buffer that captures audio before connecting to the server. @objc -public final class PreConnectAudioBuffer: NSObject, Loggable { - /// The default participant attribute key used to indicate that the audio buffer is active. - @objc - public static let attributeKey = "lk.agent.pre-connect-audio" +public final class PreConnectAudioBuffer: NSObject, Sendable, Loggable { + public typealias OnError = @Sendable (Error) -> Void + + public enum Constants { + public static let maxSize = 10 * 1024 * 1024 // 10MB + public static let sampleRate = 24000 + public static let timeout: TimeInterval = 10 + } /// The default data topic used to send the audio buffer. @objc public static let dataTopic = "lk.agent.pre-connect-audio-buffer" - /// The room instance to listen for events. + /// The room instance to send the audio buffer to. @objc - public let room: Room? + public var room: Room? { state.room } /// The audio recorder instance. @objc - public let recorder: LocalAudioTrackRecorder + public var recorder: LocalAudioTrackRecorder? { state.recorder } private let state = StateSync(State()) private struct State { + weak var room: Room? + var recorder: LocalAudioTrackRecorder? var audioStream: LocalAudioTrackRecorder.Stream? + var timeoutTask: Task? + var sent: Bool = false + var onError: OnError? = nil } /// Initialize the audio buffer with a room instance. /// - Parameters: - /// - room: The room instance to listen for events. - /// - recorder: The audio recorder to use for capturing. + /// - room: The room instance to send the audio buffer to. + /// - onError: The error handler to call when an error occurs while sending the audio buffer. @objc - public init(room: Room?, - recorder: LocalAudioTrackRecorder = LocalAudioTrackRecorder( - track: LocalAudioTrack.createTrack(), - format: .pcmFormatInt16, // supported by agent plugins - sampleRate: 24000, // supported by agent plugins - maxSize: 10 * 1024 * 1024 // arbitrary max recording size of 10MB - )) - { - self.room = room - self.recorder = recorder + public init(room: Room?, onError: OnError? = nil) { + state.mutate { + $0.room = room + $0.onError = onError + } super.init() } deinit { stopRecording() - room?.remove(delegate: self) } - /// Start capturing audio and listening to ``RoomDelegate`` events. @objc - public func startRecording() async throws { + public func setErrorHandler(_ onError: OnError?) { + state.mutate { $0.onError = onError } + } + + /// Start capturing audio. + /// - Parameters: + /// - timeout: The timeout for the remote participant to subscribe to the audio track. + /// The room connection needs to be established and the remote participant needs to subscribe to the audio track + /// before the timeout is reached. Otherwise, the audio stream will be flushed without sending. + /// - recorder: Optional custom recorder instance. If not provided, a new one will be created. + @objc + public func startRecording(timeout: TimeInterval = Constants.timeout, recorder: LocalAudioTrackRecorder? = nil) async throws { room?.add(delegate: self) - let stream = try await recorder.start() + let roomOptions = room?._state.roomOptions + let newRecorder = recorder ?? LocalAudioTrackRecorder( + track: LocalAudioTrack.createTrack(options: roomOptions?.defaultAudioCaptureOptions, + reportStatistics: roomOptions?.reportRemoteTrackStatistics ?? false), + format: .pcmFormatInt16, + sampleRate: Constants.sampleRate, + maxSize: Constants.maxSize + ) + + let stream = try await newRecorder.start() log("Started capturing audio", .info) + + state.timeoutTask?.cancel() state.mutate { state in + state.recorder = newRecorder state.audioStream = stream + state.timeoutTask = Task { [weak self] in + try await Task.sleep(nanoseconds: UInt64(timeout) * NSEC_PER_SEC) + try Task.checkCancellation() + self?.stopRecording(flush: true) + } + state.sent = false } } @@ -82,66 +112,79 @@ public final class PreConnectAudioBuffer: NSObject, Loggable { /// - flush: If `true`, the audio stream will be flushed immediately without sending. @objc public func stopRecording(flush: Bool = false) { + guard let recorder, recorder.isRecording else { return } + recorder.stop() log("Stopped capturing audio", .info) + if flush, let stream = state.audioStream { + log("Flushing audio stream", .info) Task { for await _ in stream {} } + room?.remove(delegate: self) } } -} - -// MARK: - RoomDelegate - -extension PreConnectAudioBuffer: RoomDelegate { - public func roomDidConnect(_ room: Room) { - Task { - try? await setParticipantAttribute(room: room) - } - } - - public func room(_ room: Room, participant _: LocalParticipant, remoteDidSubscribeTrack _: LocalTrackPublication) { - stopRecording() - Task { - try? await sendAudioData(to: room) - } - } - - /// Set the participant attribute to indicate that the audio buffer is active. - /// - Parameters: - /// - key: The key to set the attribute. - /// - room: The room instance to set the attribute. - @objc - public func setParticipantAttribute(key _: String = attributeKey, room: Room) async throws { - var attributes = room.localParticipant.attributes - attributes[Self.attributeKey] = "true" - try await room.localParticipant.set(attributes: attributes) - log("Set participant attribute", .info) - } /// Send the audio data to the room. /// - Parameters: /// - room: The room instance to send the audio data. + /// - agents: The agents to send the audio data to. /// - topic: The topic to send the audio data. @objc - public func sendAudioData(to room: Room, on topic: String = dataTopic) async throws { + public func sendAudioData(to room: Room, agents: [Participant.Identity], on topic: String = dataTopic) async throws { + guard !agents.isEmpty else { return } + + guard !state.sent else { return } + state.mutate { $0.sent = true } + + guard let recorder else { + throw LiveKitError(.invalidState, message: "Recorder is nil") + } + guard let audioStream = state.audioStream else { throw LiveKitError(.invalidState, message: "Audio stream is nil") } + let audioData = try await audioStream.collect() + guard audioData.count > 1024 else { + throw LiveKitError(.unknown, message: "Audio data size too small, nothing to send") + } + let streamOptions = StreamByteOptions( topic: topic, attributes: [ "sampleRate": "\(recorder.sampleRate)", "channels": "\(recorder.channels)", - ] + "trackId": recorder.track.sid?.stringValue ?? "", + ], + destinationIdentities: agents, + totalSize: audioData.count ) let writer = try await room.localParticipant.streamBytes(options: streamOptions) - try await writer.write(audioStream.collect()) + try await writer.write(audioData) try await writer.close() - log("Sent audio data", .info) + log("Sent \(recorder.duration(audioData.count))s = \(audioData.count / 1024)KB of audio data to \(agents.count) agent(s) \(agents)", .info) + } +} - room.remove(delegate: self) +extension PreConnectAudioBuffer: RoomDelegate { + public func room(_: Room, participant _: LocalParticipant, remoteDidSubscribeTrack _: LocalTrackPublication) { + log("Subscribed by remote participant, stopping audio", .info) + stopRecording() + } + + public func room(_ room: Room, participant: Participant, didUpdateState state: ParticipantState) { + guard participant.kind == .agent, state == .active, let agent = participant.identity else { return } + log("Detected active agent participant: \(agent), sending audio", .info) + + Task { + do { + try await sendAudioData(to: room, agents: [agent]) + } catch { + log("Unable to send preconnect audio: \(error)", .error) + self.state.onError?(error) + } + } } } diff --git a/Sources/LiveKit/Core/Room+PreConnect.swift b/Sources/LiveKit/Core/Room+PreConnect.swift index 83344015c..b67147219 100644 --- a/Sources/LiveKit/Core/Room+PreConnect.swift +++ b/Sources/LiveKit/Core/Room+PreConnect.swift @@ -17,13 +17,51 @@ import Foundation public extension Room { - /// Start capturing audio before connecting to the server, - /// so that it's not lost when the connection is established. - /// It will be automatically sent via data stream to the other participant - /// using the `PreConnectAudioBuffer.dataTopic` when the local track is subscribed. + /// Starts a pre-connect audio sequence that will automatically be cleaned up + /// when the operation fails. + /// + /// - Parameters: + /// - timeout: The timeout for the remote participant to subscribe to the audio track. + /// The room connection needs to be established and the remote participant needs to subscribe to the audio track + /// before the timeout is reached. Otherwise, the audio stream will be flushed without sending. + /// - operation: The operation to perform while audio is being captured. + /// - onError: The error handler to call when an error occurs while sending the audio buffer. + /// - Returns: The result of the operation. + /// + /// - Example: + /// ```swift + /// try await room.withPreConnectAudio { + /// // Audio is being captured automatically + /// // Perform any other (async) setup here + /// guard let connectionDetails = try await tokenService.fetchConnectionDetails(roomName: roomName, participantName: participantName) else { + /// return + /// } + /// try await room.connect(url: connectionDetails.serverUrl, token: connectionDetails.participantToken) + /// } onError: { error in + /// print("Error sending audio buffer: \(error)") + /// } + /// ``` + /// /// - See: ``PreConnectAudioBuffer`` - /// - Note: Use ``AudioManager/setRecordingAlwaysPreparedMode(_:)`` to request microphone permissions early. - func startCapturingBeforeConnecting() async throws { - try await preConnectBuffer.startRecording() + /// - Important: Call ``AudioManager/setRecordingAlwaysPreparedMode(_:)`` during app launch sequence to request microphone permissions early. + /// + func withPreConnectAudio(timeout: TimeInterval = 10, + _ operation: @Sendable @escaping () async throws -> T, + onError: PreConnectAudioBuffer.OnError? = nil) async throws -> T + { + preConnectBuffer.setErrorHandler(onError) + try await preConnectBuffer.startRecording(timeout: timeout) + + do { + return try await operation() + } catch { + preConnectBuffer.stopRecording(flush: true) + throw error + } + } + + @available(*, deprecated, message: "Use withPreConnectAudio instead") + func startCapturingBeforeConnecting(timeout: TimeInterval = 10) async throws { + try await preConnectBuffer.startRecording(timeout: timeout) } } diff --git a/Sources/LiveKit/Core/Room.swift b/Sources/LiveKit/Core/Room.swift index 8d7b994ce..6b69f84e7 100644 --- a/Sources/LiveKit/Core/Room.swift +++ b/Sources/LiveKit/Core/Room.swift @@ -354,20 +354,30 @@ public class Room: NSObject, @unchecked Sendable, ObservableObject, Loggable { let enableMicrophone = _state.connectOptions.enableMicrophone log("Concurrent enable microphone mode: \(enableMicrophone)") - let createMicrophoneTrackTask: Task? = enableMicrophone ? Task { - let localTrack = LocalAudioTrack.createTrack(options: _state.roomOptions.defaultAudioCaptureOptions, - reportStatistics: _state.roomOptions.reportRemoteTrackStatistics) - // Initializes AudioDeviceModule's recording - try await localTrack.start() - return localTrack - } : nil + let createMicrophoneTrackTask: Task? = { + if let recorder = preConnectBuffer.recorder, recorder.isRecording { + return Task { + recorder.track + } + } else if enableMicrophone { + return Task { + let localTrack = LocalAudioTrack.createTrack(options: _state.roomOptions.defaultAudioCaptureOptions, + reportStatistics: _state.roomOptions.reportRemoteTrackStatistics) + // Initializes AudioDeviceModule's recording + try await localTrack.start() + return localTrack + } + } else { + return nil + } + }() do { try await fullConnectSequence(url, token) if let createMicrophoneTrackTask, !createMicrophoneTrackTask.isCancelled { let track = try await createMicrophoneTrackTask.value - try await localParticipant._publish(track: track) + try await localParticipant._publish(track: track, options: _state.roomOptions.defaultAudioPublishOptions.withPreconnect(preConnectBuffer.recorder?.isRecording ?? false)) } // Connect sequence successful @@ -437,10 +447,6 @@ extension Room { e2eeManager.cleanUp() } - if disconnectError != nil { - preConnectBuffer.stopRecording(flush: true) - } - // Reset state _state.mutate { // if isFullReconnect, keep connection related states diff --git a/Sources/LiveKit/Extensions/AVAudioPCMBuffer.swift b/Sources/LiveKit/Extensions/AVAudioPCMBuffer.swift index 933a44401..aadd03bc0 100644 --- a/Sources/LiveKit/Extensions/AVAudioPCMBuffer.swift +++ b/Sources/LiveKit/Extensions/AVAudioPCMBuffer.swift @@ -167,3 +167,16 @@ public extension AVAudioPCMBuffer { } } } + +extension AVAudioCommonFormat { + var bytesPerSample: Int { + switch self { + case .pcmFormatInt16: return 2 + case .pcmFormatInt32: return 4 + case .pcmFormatFloat32: return 4 + case .pcmFormatFloat64: return 8 + case .otherFormat: return 0 + @unknown default: return 0 + } + } +} diff --git a/Sources/LiveKit/Participant/LocalParticipant.swift b/Sources/LiveKit/Participant/LocalParticipant.swift index 2be273978..7d904169c 100644 --- a/Sources/LiveKit/Participant/LocalParticipant.swift +++ b/Sources/LiveKit/Participant/LocalParticipant.swift @@ -225,6 +225,14 @@ public class LocalParticipant: Participant, @unchecked Sendable { return didUpdate } + override public func isMicrophoneEnabled() -> Bool { + if let room = _room, let recorder = room.preConnectBuffer.recorder, recorder.isRecording { + return true + } else { + return super.isMicrophoneEnabled() + } + } + // MARK: - Broadcast Activation #if os(iOS) @@ -499,9 +507,9 @@ extension [Livekit_SubscribedQuality] { // MARK: - Private -private extension LocalParticipant { +extension LocalParticipant { @discardableResult - internal func _publish(track: LocalTrack, options: TrackPublishOptions? = nil) async throws -> LocalTrackPublication { + func _publish(track: LocalTrack, options: TrackPublishOptions? = nil) async throws -> LocalTrackPublication { log("[publish] \(track) options: \(String(describing: options ?? nil))...", .info) try checkPermissions(toPublish: track) @@ -610,6 +618,7 @@ private extension LocalParticipant { populatorFunc = { populator in populator.disableDtx = !audioPublishOptions.dtx populator.disableRed = !audioPublishOptions.red + populator.audioFeatures = Array(audioPublishOptions.toFeatures()) if let streamName = options?.streamName { // Set stream name if specified in options @@ -696,14 +705,14 @@ private extension LocalParticipant { } } + // Store publishOptions used for this track... + track._state.mutate { $0.lastPublishOptions = options } + let publication = LocalTrackPublication(info: trackInfo, participant: self) await publication.set(track: track) add(publication: publication) - // Store publishOptions used for this track... - track._state.mutate { $0.lastPublishOptions = options } - // Notify didPublish delegates.notify(label: { "localParticipant.didPublish \(publication)" }) { $0.participant?(self, didPublishTrack: publication) diff --git a/Sources/LiveKit/Participant/Participant.swift b/Sources/LiveKit/Participant/Participant.swift index 1f5955480..b6fd0a60f 100644 --- a/Sources/LiveKit/Participant/Participant.swift +++ b/Sources/LiveKit/Participant/Participant.swift @@ -49,6 +49,9 @@ public class Participant: NSObject, @unchecked Sendable, ObservableObject, Logga @objc public var attributes: [String: String] { _state.attributes } + @objc + public var state: ParticipantState { _state.state } + @objc public var connectionQuality: ConnectionQuality { _state.connectionQuality } @@ -99,6 +102,7 @@ public class Participant: NSObject, @unchecked Sendable, ObservableObject, Logga var metadata: String? var joinedAt: Date? var kind: Kind = .unknown + var state: ParticipantState = .unknown var connectionQuality: ConnectionQuality = .unknown var permissions = ParticipantPermissions() var trackPublications = [Track.Sid: TrackPublication]() @@ -176,6 +180,17 @@ public class Participant: NSObject, @unchecked Sendable, ObservableObject, Logga } } + // state updated + if newState.state != oldState.state { + self.delegates.notify(label: { "participant.didUpdate state: \(newState.state)" }) { + $0.participant?(self, didUpdateState: newState.state) + } + room.delegates.notify(label: { "room.didUpdate state: \(newState.state)" }) { + $0.room?(room, participant: self, didUpdateState: newState.state) + } + } + + // connection quality updated if newState.connectionQuality != oldState.connectionQuality { self.delegates.notify(label: { "participant.didUpdate connectionQuality: \(self.connectionQuality)" }) { $0.participant?(self, didUpdateConnectionQuality: self.connectionQuality) @@ -228,6 +243,7 @@ public class Participant: NSObject, @unchecked Sendable, ObservableObject, Logga $0.metadata = info.metadata $0.kind = info.kind.toLKType() $0.attributes = info.attributes + $0.state = info.state.toLKType() // Attempt to get millisecond precision. if info.joinedAtMs != 0 { @@ -259,29 +275,29 @@ public class Participant: NSObject, @unchecked Sendable, ObservableObject, Logga return true } -} -// MARK: - Simplified API - -public extension Participant { - func isCameraEnabled() -> Bool { + public func isCameraEnabled() -> Bool { !(getTrackPublication(source: .camera)?.isMuted ?? true) } - func isMicrophoneEnabled() -> Bool { + public func isMicrophoneEnabled() -> Bool { !(getTrackPublication(source: .microphone)?.isMuted ?? true) } - func isScreenShareEnabled() -> Bool { + public func isScreenShareEnabled() -> Bool { !(getTrackPublication(source: .screenShareVideo)?.isMuted ?? true) } +} - internal func getTrackPublication(name: String) -> TrackPublication? { +// MARK: - Simplified API + +extension Participant { + func getTrackPublication(name: String) -> TrackPublication? { _state.trackPublications.values.first(where: { $0.name == name }) } /// find the first publication matching `source` or any compatible. - internal func getTrackPublication(source: Track.Source) -> TrackPublication? { + func getTrackPublication(source: Track.Source) -> TrackPublication? { // if source is unknown return nil guard source != .unknown else { return nil } // try to find a Publication with matching source diff --git a/Sources/LiveKit/Protocols/ParticipantDelegate.swift b/Sources/LiveKit/Protocols/ParticipantDelegate.swift index 9a705f2d8..77285a46f 100644 --- a/Sources/LiveKit/Protocols/ParticipantDelegate.swift +++ b/Sources/LiveKit/Protocols/ParticipantDelegate.swift @@ -43,6 +43,11 @@ public protocol ParticipantDelegate: AnyObject, Sendable { @objc optional func participant(_ participant: Participant, didUpdateIsSpeaking isSpeaking: Bool) + /// The state of a ``Participant`` has updated. + /// `participant` Can be a ``LocalParticipant`` or a ``RemoteParticipant``. + @objc optional + func participant(_ participant: Participant, didUpdateState state: ParticipantState) + /// The connection quality of a ``Participant`` has updated. /// `participant` Can be a ``LocalParticipant`` or a ``RemoteParticipant``. @objc optional diff --git a/Sources/LiveKit/Protocols/RoomDelegate.swift b/Sources/LiveKit/Protocols/RoomDelegate.swift index 76505a34e..157cbb279 100644 --- a/Sources/LiveKit/Protocols/RoomDelegate.swift +++ b/Sources/LiveKit/Protocols/RoomDelegate.swift @@ -97,6 +97,10 @@ public protocol RoomDelegate: AnyObject, Sendable { @objc optional func room(_ room: Room, participant: Participant, didUpdateName name: String) + /// ``Participant/state`` has updated. + @objc optional + func room(_ room: Room, participant: Participant, didUpdateState state: ParticipantState) + /// ``Participant/connectionQuality`` has updated. @objc optional func room(_ room: Room, participant: Participant, didUpdateConnectionQuality quality: ConnectionQuality) @@ -216,11 +220,6 @@ public protocol RoomDelegate: AnyObject, Sendable { @objc(room:participant:didUpdateMetadata_:) optional func room(_ room: Room, participant: Participant, didUpdate metadata: String?) - // Renamed to ``RoomDelegate/room(_:participant:didUpdateName:)``. - // @available(*, unavailable, renamed: "room(_:participant:didUpdateName:)") - // @objc(room:participant:didUpdateName_:) optional - // func room(_ room: Room, participant: Participant, didUpdateName: String) - /// Renamed to ``RoomDelegate/room(_:participant:didUpdateConnectionQuality:)``. @available(*, unavailable, renamed: "room(_:participant:didUpdateConnectionQuality:)") @objc(room:participant:didUpdateConnectionQuality_:) optional diff --git a/Sources/LiveKit/Protos/livekit_models.pb.swift b/Sources/LiveKit/Protos/livekit_models.pb.swift index 9fa18fa56..02103a27f 100644 --- a/Sources/LiveKit/Protos/livekit_models.pb.swift +++ b/Sources/LiveKit/Protos/livekit_models.pb.swift @@ -611,6 +611,9 @@ enum Livekit_AudioTrackFeature: SwiftProtobuf.Enum, Swift.CaseIterable { case tfEchoCancellation // = 3 case tfNoiseSuppression // = 4 case tfEnhancedNoiseCancellation // = 5 + + /// client will buffer audio once available and send it to the server via bytes stream once connected + case tfPreconnectBuffer // = 6 case UNRECOGNIZED(Int) init() { @@ -625,6 +628,7 @@ enum Livekit_AudioTrackFeature: SwiftProtobuf.Enum, Swift.CaseIterable { case 3: self = .tfEchoCancellation case 4: self = .tfNoiseSuppression case 5: self = .tfEnhancedNoiseCancellation + case 6: self = .tfPreconnectBuffer default: self = .UNRECOGNIZED(rawValue) } } @@ -637,6 +641,7 @@ enum Livekit_AudioTrackFeature: SwiftProtobuf.Enum, Swift.CaseIterable { case .tfEchoCancellation: return 3 case .tfNoiseSuppression: return 4 case .tfEnhancedNoiseCancellation: return 5 + case .tfPreconnectBuffer: return 6 case .UNRECOGNIZED(let i): return i } } @@ -649,6 +654,7 @@ enum Livekit_AudioTrackFeature: SwiftProtobuf.Enum, Swift.CaseIterable { .tfEchoCancellation, .tfNoiseSuppression, .tfEnhancedNoiseCancellation, + .tfPreconnectBuffer, ] } @@ -2798,6 +2804,7 @@ extension Livekit_AudioTrackFeature: SwiftProtobuf._ProtoNameProviding { 3: .same(proto: "TF_ECHO_CANCELLATION"), 4: .same(proto: "TF_NOISE_SUPPRESSION"), 5: .same(proto: "TF_ENHANCED_NOISE_CANCELLATION"), + 6: .same(proto: "TF_PRECONNECT_BUFFER"), ] } diff --git a/Sources/LiveKit/Protos/livekit_rtc.pb.swift b/Sources/LiveKit/Protos/livekit_rtc.pb.swift index c98e47ab0..741d74a2c 100644 --- a/Sources/LiveKit/Protos/livekit_rtc.pb.swift +++ b/Sources/LiveKit/Protos/livekit_rtc.pb.swift @@ -618,54 +618,114 @@ struct Livekit_SimulcastCodec: Sendable { init() {} } -struct Livekit_AddTrackRequest: Sendable { +struct Livekit_AddTrackRequest: @unchecked Sendable { // SwiftProtobuf.Message conformance is added in an extension below. See the // `Message` and `Message+*Additions` files in the SwiftProtobuf library for // methods supported on all messages. /// client ID of track, to match it when RTC track is received - var cid: String = String() + var cid: String { + get {return _storage._cid} + set {_uniqueStorage()._cid = newValue} + } - var name: String = String() + var name: String { + get {return _storage._name} + set {_uniqueStorage()._name = newValue} + } - var type: Livekit_TrackType = .audio + var type: Livekit_TrackType { + get {return _storage._type} + set {_uniqueStorage()._type = newValue} + } /// to be deprecated in favor of layers - var width: UInt32 = 0 + var width: UInt32 { + get {return _storage._width} + set {_uniqueStorage()._width = newValue} + } - var height: UInt32 = 0 + var height: UInt32 { + get {return _storage._height} + set {_uniqueStorage()._height = newValue} + } /// true to add track and initialize to muted - var muted: Bool = false + var muted: Bool { + get {return _storage._muted} + set {_uniqueStorage()._muted = newValue} + } /// true if DTX (Discontinuous Transmission) is disabled for audio - var disableDtx: Bool = false + /// + /// NOTE: This field was marked as deprecated in the .proto file. + var disableDtx: Bool { + get {return _storage._disableDtx} + set {_uniqueStorage()._disableDtx = newValue} + } - var source: Livekit_TrackSource = .unknown + var source: Livekit_TrackSource { + get {return _storage._source} + set {_uniqueStorage()._source = newValue} + } - var layers: [Livekit_VideoLayer] = [] + var layers: [Livekit_VideoLayer] { + get {return _storage._layers} + set {_uniqueStorage()._layers = newValue} + } - var simulcastCodecs: [Livekit_SimulcastCodec] = [] + var simulcastCodecs: [Livekit_SimulcastCodec] { + get {return _storage._simulcastCodecs} + set {_uniqueStorage()._simulcastCodecs = newValue} + } /// server ID of track, publish new codec to exist track - var sid: String = String() + var sid: String { + get {return _storage._sid} + set {_uniqueStorage()._sid = newValue} + } - var stereo: Bool = false + /// deprecated in favor of audio_features + /// + /// NOTE: This field was marked as deprecated in the .proto file. + var stereo: Bool { + get {return _storage._stereo} + set {_uniqueStorage()._stereo = newValue} + } /// true if RED (Redundant Encoding) is disabled for audio - var disableRed: Bool = false + var disableRed: Bool { + get {return _storage._disableRed} + set {_uniqueStorage()._disableRed = newValue} + } - var encryption: Livekit_Encryption.TypeEnum = .none + var encryption: Livekit_Encryption.TypeEnum { + get {return _storage._encryption} + set {_uniqueStorage()._encryption = newValue} + } /// which stream the track belongs to, used to group tracks together. /// if not specified, server will infer it from track source to bundle camera/microphone, screenshare/audio together - var stream: String = String() + var stream: String { + get {return _storage._stream} + set {_uniqueStorage()._stream = newValue} + } + + var backupCodecPolicy: Livekit_BackupCodecPolicy { + get {return _storage._backupCodecPolicy} + set {_uniqueStorage()._backupCodecPolicy = newValue} + } - var backupCodecPolicy: Livekit_BackupCodecPolicy = .preferRegression + var audioFeatures: [Livekit_AudioTrackFeature] { + get {return _storage._audioFeatures} + set {_uniqueStorage()._audioFeatures = newValue} + } var unknownFields = SwiftProtobuf.UnknownStorage() init() {} + + fileprivate var _storage = _StorageClass.defaultInstance } struct Livekit_TrickleRequest: Sendable { @@ -2446,104 +2506,182 @@ extension Livekit_AddTrackRequest: SwiftProtobuf.Message, SwiftProtobuf._Message 14: .same(proto: "encryption"), 15: .same(proto: "stream"), 16: .standard(proto: "backup_codec_policy"), + 17: .standard(proto: "audio_features"), ] + fileprivate class _StorageClass { + var _cid: String = String() + var _name: String = String() + var _type: Livekit_TrackType = .audio + var _width: UInt32 = 0 + var _height: UInt32 = 0 + var _muted: Bool = false + var _disableDtx: Bool = false + var _source: Livekit_TrackSource = .unknown + var _layers: [Livekit_VideoLayer] = [] + var _simulcastCodecs: [Livekit_SimulcastCodec] = [] + var _sid: String = String() + var _stereo: Bool = false + var _disableRed: Bool = false + var _encryption: Livekit_Encryption.TypeEnum = .none + var _stream: String = String() + var _backupCodecPolicy: Livekit_BackupCodecPolicy = .preferRegression + var _audioFeatures: [Livekit_AudioTrackFeature] = [] + + #if swift(>=5.10) + // This property is used as the initial default value for new instances of the type. + // The type itself is protecting the reference to its storage via CoW semantics. + // This will force a copy to be made of this reference when the first mutation occurs; + // hence, it is safe to mark this as `nonisolated(unsafe)`. + static nonisolated(unsafe) let defaultInstance = _StorageClass() + #else + static let defaultInstance = _StorageClass() + #endif + + private init() {} + + init(copying source: _StorageClass) { + _cid = source._cid + _name = source._name + _type = source._type + _width = source._width + _height = source._height + _muted = source._muted + _disableDtx = source._disableDtx + _source = source._source + _layers = source._layers + _simulcastCodecs = source._simulcastCodecs + _sid = source._sid + _stereo = source._stereo + _disableRed = source._disableRed + _encryption = source._encryption + _stream = source._stream + _backupCodecPolicy = source._backupCodecPolicy + _audioFeatures = source._audioFeatures + } + } + + fileprivate mutating func _uniqueStorage() -> _StorageClass { + if !isKnownUniquelyReferenced(&_storage) { + _storage = _StorageClass(copying: _storage) + } + return _storage + } + mutating func decodeMessage(decoder: inout D) throws { - while let fieldNumber = try decoder.nextFieldNumber() { - // The use of inline closures is to circumvent an issue where the compiler - // allocates stack space for every case branch when no optimizations are - // enabled. https://github.com/apple/swift-protobuf/issues/1034 - switch fieldNumber { - case 1: try { try decoder.decodeSingularStringField(value: &self.cid) }() - case 2: try { try decoder.decodeSingularStringField(value: &self.name) }() - case 3: try { try decoder.decodeSingularEnumField(value: &self.type) }() - case 4: try { try decoder.decodeSingularUInt32Field(value: &self.width) }() - case 5: try { try decoder.decodeSingularUInt32Field(value: &self.height) }() - case 6: try { try decoder.decodeSingularBoolField(value: &self.muted) }() - case 7: try { try decoder.decodeSingularBoolField(value: &self.disableDtx) }() - case 8: try { try decoder.decodeSingularEnumField(value: &self.source) }() - case 9: try { try decoder.decodeRepeatedMessageField(value: &self.layers) }() - case 10: try { try decoder.decodeRepeatedMessageField(value: &self.simulcastCodecs) }() - case 11: try { try decoder.decodeSingularStringField(value: &self.sid) }() - case 12: try { try decoder.decodeSingularBoolField(value: &self.stereo) }() - case 13: try { try decoder.decodeSingularBoolField(value: &self.disableRed) }() - case 14: try { try decoder.decodeSingularEnumField(value: &self.encryption) }() - case 15: try { try decoder.decodeSingularStringField(value: &self.stream) }() - case 16: try { try decoder.decodeSingularEnumField(value: &self.backupCodecPolicy) }() - default: break + _ = _uniqueStorage() + try withExtendedLifetime(_storage) { (_storage: _StorageClass) in + while let fieldNumber = try decoder.nextFieldNumber() { + // The use of inline closures is to circumvent an issue where the compiler + // allocates stack space for every case branch when no optimizations are + // enabled. https://github.com/apple/swift-protobuf/issues/1034 + switch fieldNumber { + case 1: try { try decoder.decodeSingularStringField(value: &_storage._cid) }() + case 2: try { try decoder.decodeSingularStringField(value: &_storage._name) }() + case 3: try { try decoder.decodeSingularEnumField(value: &_storage._type) }() + case 4: try { try decoder.decodeSingularUInt32Field(value: &_storage._width) }() + case 5: try { try decoder.decodeSingularUInt32Field(value: &_storage._height) }() + case 6: try { try decoder.decodeSingularBoolField(value: &_storage._muted) }() + case 7: try { try decoder.decodeSingularBoolField(value: &_storage._disableDtx) }() + case 8: try { try decoder.decodeSingularEnumField(value: &_storage._source) }() + case 9: try { try decoder.decodeRepeatedMessageField(value: &_storage._layers) }() + case 10: try { try decoder.decodeRepeatedMessageField(value: &_storage._simulcastCodecs) }() + case 11: try { try decoder.decodeSingularStringField(value: &_storage._sid) }() + case 12: try { try decoder.decodeSingularBoolField(value: &_storage._stereo) }() + case 13: try { try decoder.decodeSingularBoolField(value: &_storage._disableRed) }() + case 14: try { try decoder.decodeSingularEnumField(value: &_storage._encryption) }() + case 15: try { try decoder.decodeSingularStringField(value: &_storage._stream) }() + case 16: try { try decoder.decodeSingularEnumField(value: &_storage._backupCodecPolicy) }() + case 17: try { try decoder.decodeRepeatedEnumField(value: &_storage._audioFeatures) }() + default: break + } } } } func traverse(visitor: inout V) throws { - if !self.cid.isEmpty { - try visitor.visitSingularStringField(value: self.cid, fieldNumber: 1) - } - if !self.name.isEmpty { - try visitor.visitSingularStringField(value: self.name, fieldNumber: 2) - } - if self.type != .audio { - try visitor.visitSingularEnumField(value: self.type, fieldNumber: 3) - } - if self.width != 0 { - try visitor.visitSingularUInt32Field(value: self.width, fieldNumber: 4) - } - if self.height != 0 { - try visitor.visitSingularUInt32Field(value: self.height, fieldNumber: 5) - } - if self.muted != false { - try visitor.visitSingularBoolField(value: self.muted, fieldNumber: 6) - } - if self.disableDtx != false { - try visitor.visitSingularBoolField(value: self.disableDtx, fieldNumber: 7) - } - if self.source != .unknown { - try visitor.visitSingularEnumField(value: self.source, fieldNumber: 8) - } - if !self.layers.isEmpty { - try visitor.visitRepeatedMessageField(value: self.layers, fieldNumber: 9) - } - if !self.simulcastCodecs.isEmpty { - try visitor.visitRepeatedMessageField(value: self.simulcastCodecs, fieldNumber: 10) - } - if !self.sid.isEmpty { - try visitor.visitSingularStringField(value: self.sid, fieldNumber: 11) - } - if self.stereo != false { - try visitor.visitSingularBoolField(value: self.stereo, fieldNumber: 12) - } - if self.disableRed != false { - try visitor.visitSingularBoolField(value: self.disableRed, fieldNumber: 13) - } - if self.encryption != .none { - try visitor.visitSingularEnumField(value: self.encryption, fieldNumber: 14) - } - if !self.stream.isEmpty { - try visitor.visitSingularStringField(value: self.stream, fieldNumber: 15) - } - if self.backupCodecPolicy != .preferRegression { - try visitor.visitSingularEnumField(value: self.backupCodecPolicy, fieldNumber: 16) + try withExtendedLifetime(_storage) { (_storage: _StorageClass) in + if !_storage._cid.isEmpty { + try visitor.visitSingularStringField(value: _storage._cid, fieldNumber: 1) + } + if !_storage._name.isEmpty { + try visitor.visitSingularStringField(value: _storage._name, fieldNumber: 2) + } + if _storage._type != .audio { + try visitor.visitSingularEnumField(value: _storage._type, fieldNumber: 3) + } + if _storage._width != 0 { + try visitor.visitSingularUInt32Field(value: _storage._width, fieldNumber: 4) + } + if _storage._height != 0 { + try visitor.visitSingularUInt32Field(value: _storage._height, fieldNumber: 5) + } + if _storage._muted != false { + try visitor.visitSingularBoolField(value: _storage._muted, fieldNumber: 6) + } + if _storage._disableDtx != false { + try visitor.visitSingularBoolField(value: _storage._disableDtx, fieldNumber: 7) + } + if _storage._source != .unknown { + try visitor.visitSingularEnumField(value: _storage._source, fieldNumber: 8) + } + if !_storage._layers.isEmpty { + try visitor.visitRepeatedMessageField(value: _storage._layers, fieldNumber: 9) + } + if !_storage._simulcastCodecs.isEmpty { + try visitor.visitRepeatedMessageField(value: _storage._simulcastCodecs, fieldNumber: 10) + } + if !_storage._sid.isEmpty { + try visitor.visitSingularStringField(value: _storage._sid, fieldNumber: 11) + } + if _storage._stereo != false { + try visitor.visitSingularBoolField(value: _storage._stereo, fieldNumber: 12) + } + if _storage._disableRed != false { + try visitor.visitSingularBoolField(value: _storage._disableRed, fieldNumber: 13) + } + if _storage._encryption != .none { + try visitor.visitSingularEnumField(value: _storage._encryption, fieldNumber: 14) + } + if !_storage._stream.isEmpty { + try visitor.visitSingularStringField(value: _storage._stream, fieldNumber: 15) + } + if _storage._backupCodecPolicy != .preferRegression { + try visitor.visitSingularEnumField(value: _storage._backupCodecPolicy, fieldNumber: 16) + } + if !_storage._audioFeatures.isEmpty { + try visitor.visitPackedEnumField(value: _storage._audioFeatures, fieldNumber: 17) + } } try unknownFields.traverse(visitor: &visitor) } static func ==(lhs: Livekit_AddTrackRequest, rhs: Livekit_AddTrackRequest) -> Bool { - if lhs.cid != rhs.cid {return false} - if lhs.name != rhs.name {return false} - if lhs.type != rhs.type {return false} - if lhs.width != rhs.width {return false} - if lhs.height != rhs.height {return false} - if lhs.muted != rhs.muted {return false} - if lhs.disableDtx != rhs.disableDtx {return false} - if lhs.source != rhs.source {return false} - if lhs.layers != rhs.layers {return false} - if lhs.simulcastCodecs != rhs.simulcastCodecs {return false} - if lhs.sid != rhs.sid {return false} - if lhs.stereo != rhs.stereo {return false} - if lhs.disableRed != rhs.disableRed {return false} - if lhs.encryption != rhs.encryption {return false} - if lhs.stream != rhs.stream {return false} - if lhs.backupCodecPolicy != rhs.backupCodecPolicy {return false} + if lhs._storage !== rhs._storage { + let storagesAreEqual: Bool = withExtendedLifetime((lhs._storage, rhs._storage)) { (_args: (_StorageClass, _StorageClass)) in + let _storage = _args.0 + let rhs_storage = _args.1 + if _storage._cid != rhs_storage._cid {return false} + if _storage._name != rhs_storage._name {return false} + if _storage._type != rhs_storage._type {return false} + if _storage._width != rhs_storage._width {return false} + if _storage._height != rhs_storage._height {return false} + if _storage._muted != rhs_storage._muted {return false} + if _storage._disableDtx != rhs_storage._disableDtx {return false} + if _storage._source != rhs_storage._source {return false} + if _storage._layers != rhs_storage._layers {return false} + if _storage._simulcastCodecs != rhs_storage._simulcastCodecs {return false} + if _storage._sid != rhs_storage._sid {return false} + if _storage._stereo != rhs_storage._stereo {return false} + if _storage._disableRed != rhs_storage._disableRed {return false} + if _storage._encryption != rhs_storage._encryption {return false} + if _storage._stream != rhs_storage._stream {return false} + if _storage._backupCodecPolicy != rhs_storage._backupCodecPolicy {return false} + if _storage._audioFeatures != rhs_storage._audioFeatures {return false} + return true + } + if !storagesAreEqual {return false} + } if lhs.unknownFields != rhs.unknownFields {return false} return true } diff --git a/Sources/LiveKit/Track/Recorders/LocalAudioTrackRecorder.swift b/Sources/LiveKit/Track/Recorders/LocalAudioTrackRecorder.swift index cd5ac1d67..5e002e9a6 100644 --- a/Sources/LiveKit/Track/Recorders/LocalAudioTrackRecorder.swift +++ b/Sources/LiveKit/Track/Recorders/LocalAudioTrackRecorder.swift @@ -20,7 +20,7 @@ import Foundation /// A class that captures audio from a local track and streams it as a data stream /// in a selected format that can be sent to other participants via ``ByteStreamWriter``. @objc -public final class LocalAudioTrackRecorder: NSObject, AudioRenderer { +public final class LocalAudioTrackRecorder: NSObject, Sendable, AudioRenderer { public typealias Stream = AsyncStream /// The local audio track to capture audio from. @@ -43,7 +43,11 @@ public final class LocalAudioTrackRecorder: NSObject, AudioRenderer { @objc public let maxSize: Int - private let _state = StateSync(State()) + var isRecording: Bool { + state.continuation != nil + } + + private let state = StateSync(State()) private struct State { var continuation: Stream.Continuation? } @@ -67,23 +71,21 @@ public final class LocalAudioTrackRecorder: NSObject, AudioRenderer { /// - Returns: A stream of audio data. /// - Throws: An error if the audio track cannot be started. public func start() async throws -> Stream { - guard _state.continuation == nil else { - throw LiveKitError(.invalidState, message: "Cannot start the recorder multiple times.") - } + stop() try await track.startCapture() track.add(audioRenderer: self) let buffer: Stream.Continuation.BufferingPolicy = maxSize > 0 ? .bufferingNewest(maxSize) : .unbounded let stream = Stream(bufferingPolicy: buffer) { continuation in - self._state.mutate { + self.state.mutate { $0.continuation = continuation } } - _state.continuation?.onTermination = { @Sendable (_: Stream.Continuation.Termination) in + state.continuation?.onTermination = { @Sendable (_: Stream.Continuation.Termination) in self.track.remove(audioRenderer: self) - self._state.mutate { + self.state.mutate { $0.continuation = nil } } @@ -94,7 +96,13 @@ public final class LocalAudioTrackRecorder: NSObject, AudioRenderer { /// Stops capturing audio from the local track. @objc public func stop() { - _state.continuation?.finish() + state.continuation?.finish() + } + + func duration(_ dataSize: Int) -> TimeInterval { + let totalSamples = dataSize / format.bytesPerSample + let samplesPerChannel = totalSamples / channels + return Double(samplesPerChannel) / Double(sampleRate) } } @@ -107,7 +115,9 @@ public extension LocalAudioTrackRecorder { .convert(toCommonFormat: format)? .toData() { - _state.continuation?.yield(data) + state.continuation?.yield(data) + } else { + assertionFailure("Failed to convert PCM buffer to data") } } } diff --git a/Sources/LiveKit/TrackPublications/TrackPublication.swift b/Sources/LiveKit/TrackPublications/TrackPublication.swift index 31980b480..13eca7c4d 100644 --- a/Sources/LiveKit/TrackPublications/TrackPublication.swift +++ b/Sources/LiveKit/TrackPublications/TrackPublication.swift @@ -107,9 +107,8 @@ public class TrackPublication: NSObject, @unchecked Sendable, ObservableObject, dimensions: info.type == .video ? Dimensions(width: Int32(info.width), height: Int32(info.height)) : nil, isMetadataMuted: info.muted, encryptionType: info.encryption.toLKType(), - - // store the whole info - latestInfo: info + latestInfo: info, + audioTrackFeatures: Set(info.audioFeatures) )) self.participant = participant diff --git a/Sources/LiveKit/Types/Options/AudioPublishOptions.swift b/Sources/LiveKit/Types/Options/AudioPublishOptions.swift index 85f0a32f6..2d487689e 100644 --- a/Sources/LiveKit/Types/Options/AudioPublishOptions.swift +++ b/Sources/LiveKit/Types/Options/AudioPublishOptions.swift @@ -34,17 +34,22 @@ public final class AudioPublishOptions: NSObject, TrackPublishOptions, Sendable @objc public let streamName: String? + @objc + public let preConnect: Bool + public init(name: String? = nil, encoding: AudioEncoding? = nil, dtx: Bool = true, red: Bool = true, - streamName: String? = nil) + streamName: String? = nil, + preConnect: Bool = false) { self.name = name self.encoding = encoding self.dtx = dtx self.red = red self.streamName = streamName + self.preConnect = preConnect } // MARK: - Equal @@ -55,7 +60,8 @@ public final class AudioPublishOptions: NSObject, TrackPublishOptions, Sendable encoding == other.encoding && dtx == other.dtx && red == other.red && - streamName == other.streamName + streamName == other.streamName && + preConnect == other.preConnect } override public var hash: Int { @@ -65,6 +71,7 @@ public final class AudioPublishOptions: NSObject, TrackPublishOptions, Sendable hasher.combine(dtx) hasher.combine(red) hasher.combine(streamName) + hasher.combine(preConnect) return hasher.finalize() } } @@ -74,6 +81,20 @@ extension AudioPublishOptions { func toFeatures() -> Set { Set([ !dtx ? .tfNoDtx : nil, + preConnect ? .tfPreconnectBuffer : nil, ].compactMap { $0 }) } } + +extension AudioPublishOptions { + func withPreconnect(_ enabled: Bool) -> AudioPublishOptions { + AudioPublishOptions( + name: name, + encoding: encoding, + dtx: dtx, + red: red, + streamName: streamName, + preConnect: enabled + ) + } +} diff --git a/Sources/LiveKit/Types/ParticipantState.swift b/Sources/LiveKit/Types/ParticipantState.swift new file mode 100644 index 000000000..f3b0c93df --- /dev/null +++ b/Sources/LiveKit/Types/ParticipantState.swift @@ -0,0 +1,93 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Foundation + +/// Describes the state of a ``Participant``'s connection to the LiveKit server. +@objc +public enum ParticipantState: Int, Sendable, CaseIterable { + /// Websocket is connected, but no offer has been sent yet + case joining = 0 + + /// Server has received the client's offer + case joined = 1 + + /// ICE connectivity has been established + case active = 2 + + /// Websocket has disconnected + case disconnected = 3 + + /// Unknown state + case unknown = 999 +} + +// MARK: - Conversions from/to protobuf types + +extension ParticipantState { + init(from protoState: Livekit_ParticipantInfo.State) { + switch protoState { + case .joining: + self = .joining + case .joined: + self = .joined + case .active: + self = .active + case .disconnected: + self = .disconnected + case .UNRECOGNIZED: + self = .unknown + } + } + + var protoState: Livekit_ParticipantInfo.State { + switch self { + case .joining: + return .joining + case .joined: + return .joined + case .active: + return .active + case .disconnected: + return .disconnected + case .unknown: + return .joining // Default to joining for unknown state + } + } +} + +extension Livekit_ParticipantInfo.State { + func toLKType() -> ParticipantState { + ParticipantState(from: self) + } +} + +extension ParticipantState: CustomStringConvertible { + public var description: String { + switch self { + case .joining: + return "joining" + case .joined: + return "joined" + case .active: + return "active" + case .disconnected: + return "disconnected" + case .unknown: + return "unknown" + } + } +} diff --git a/Tests/LiveKitTests/PreConnectAudioBufferTests.swift b/Tests/LiveKitTests/PreConnectAudioBufferTests.swift index fbdaa298a..79cebee85 100644 --- a/Tests/LiveKitTests/PreConnectAudioBufferTests.swift +++ b/Tests/LiveKitTests/PreConnectAudioBufferTests.swift @@ -18,41 +18,7 @@ import XCTest class PreConnectAudioBufferTests: LKTestCase { - func testRoomDidConnectSetsParticipantAttribute() async throws { - let attributeSetExpectation = expectation(description: "Participant attribute set") - - class AttributeDelegate: RoomDelegate, @unchecked Sendable { - let expectation: XCTestExpectation - var attributeValue: String? - - init(expectation: XCTestExpectation) { - self.expectation = expectation - } - - func room(_: Room, participant: Participant, didUpdateAttributes _: [String: String]) { - if let value = participant.attributes[PreConnectAudioBuffer.attributeKey] { - attributeValue = value - expectation.fulfill() - } - } - } - - let delegate = AttributeDelegate(expectation: attributeSetExpectation) - - try await withRooms([RoomTestingOptions(delegate: delegate)]) { rooms in - let room = rooms[0] - let buffer = PreConnectAudioBuffer(room: room) - - buffer.roomDidConnect(room) - - await self.fulfillment(of: [attributeSetExpectation], timeout: 5) - - XCTAssertEqual(delegate.attributeValue, "true") - XCTAssertEqual(room.localParticipant.attributes[PreConnectAudioBuffer.attributeKey], "true") - } - } - - func testRemoteDidSubscribeTrackSendsAudioData() async throws { + func testParticipantActiveStateSendsAudioData() async throws { let receiveExpectation = expectation(description: "Receives audio data") try await withRooms([RoomTestingOptions(canSubscribe: true), RoomTestingOptions(canPublish: true, canPublishData: true)]) { rooms in @@ -75,8 +41,8 @@ class PreConnectAudioBufferTests: LKTestCase { try await buffer.startRecording() try await Task.sleep(nanoseconds: NSEC_PER_SEC / 2) - let publication = LocalTrackPublication(info: Livekit_TrackInfo(), participant: rooms[0].localParticipant) - buffer.room(publisherRoom, participant: publisherRoom.localParticipant, remoteDidSubscribeTrack: publication) + subscriberRoom.localParticipant._state.mutate { $0.kind = .agent } // override kind + buffer.room(publisherRoom, participant: subscriberRoom.localParticipant, didUpdateState: .active) await self.fulfillment(of: [receiveExpectation], timeout: 10) }