diff --git a/libwebrtc/src/audio_source.rs b/libwebrtc/src/audio_source.rs index 4c9f08769..b430f4fb0 100644 --- a/libwebrtc/src/audio_source.rs +++ b/libwebrtc/src/audio_source.rs @@ -21,7 +21,6 @@ pub struct AudioSourceOptions { pub echo_cancellation: bool, pub noise_suppression: bool, pub auto_gain_control: bool, - pub enable_queue: bool, } #[non_exhaustive] @@ -64,8 +63,16 @@ pub mod native { options: AudioSourceOptions, sample_rate: u32, num_channels: u32, + enable_queue: Option, ) -> NativeAudioSource { - Self { handle: imp_as::NativeAudioSource::new(options, sample_rate, num_channels) } + Self { + handle: imp_as::NativeAudioSource::new( + options, + sample_rate, + num_channels, + enable_queue, + ), + } } pub async fn capture_frame(&self, frame: &AudioFrame<'_>) -> Result<(), RtcError> { @@ -87,5 +94,9 @@ pub mod native { pub fn num_channels(&self) -> u32 { self.handle.num_channels() } + + pub fn enable_queue(&self) -> bool { + self.handle.enable_queue() + } } } diff --git a/libwebrtc/src/native/audio_source.rs b/libwebrtc/src/native/audio_source.rs index 53dee22a6..273853b10 100644 --- a/libwebrtc/src/native/audio_source.rs +++ b/libwebrtc/src/native/audio_source.rs @@ -33,6 +33,9 @@ pub struct NativeAudioSource { sample_rate: u32, num_channels: u32, samples_10ms: usize, + // whether to queue audio frames or send them immediately + // defaults to true + enable_queue: bool, po_tx: mpsc::Sender>, } @@ -49,6 +52,7 @@ impl NativeAudioSource { options: AudioSourceOptions, sample_rate: u32, num_channels: u32, + enable_queue: Option, ) -> NativeAudioSource { let samples_10ms = (sample_rate / 100 * num_channels) as usize; let (po_tx, mut po_rx) = mpsc::channel(BUFFER_SIZE_MS / 10); @@ -62,6 +66,7 @@ impl NativeAudioSource { sample_rate, num_channels, samples_10ms, + enable_queue: enable_queue.unwrap_or(true), po_tx, }; @@ -71,9 +76,12 @@ impl NativeAudioSource { let mut interval = interval(Duration::from_millis(10)); interval.set_missed_tick_behavior(livekit_runtime::MissedTickBehavior::Delay); let blank_data = vec![0; samples_10ms]; + let enable_queue = source.enable_queue; loop { - interval.tick().await; + if enable_queue { + interval.tick().await; + } let frame = po_rx.try_recv(); if let Err(TryRecvError::Disconnected) = frame { @@ -81,12 +89,14 @@ impl NativeAudioSource { } if let Err(TryRecvError::Empty) = frame { - source.sys_handle.on_captured_frame( - &blank_data, - sample_rate, - num_channels, - blank_data.len() / num_channels as usize, - ); + if enable_queue { + source.sys_handle.on_captured_frame( + &blank_data, + sample_rate, + num_channels, + blank_data.len() / num_channels as usize, + ); + } continue; } @@ -124,6 +134,10 @@ impl NativeAudioSource { self.num_channels } + pub fn enable_queue(&self) -> bool { + self.enable_queue + } + pub async fn capture_frame(&self, frame: &AudioFrame<'_>) -> Result<(), RtcError> { if self.sample_rate != frame.sample_rate || self.num_channels != frame.num_channels { return Err(RtcError { @@ -134,8 +148,6 @@ impl NativeAudioSource { let mut inner = self.inner.lock().await; let mut samples = 0; - let enable_queue = self.sys_handle.audio_options().enable_queue; - // split frames into 10ms chunks loop { let remaining_samples = frame.data.len() - samples; @@ -154,16 +166,7 @@ impl NativeAudioSource { if inner.len == self.samples_10ms { let data = inner.buf.clone().to_vec(); - if enable_queue { - let _ = self.po_tx.send(data).await; - } else { - self.sys_handle.on_captured_frame( - &data, - frame.sample_rate, - frame.num_channels, - data.len() / frame.num_channels as usize, - ); - } + let _ = self.po_tx.send(data).await; inner.len = 0; } continue; @@ -172,16 +175,7 @@ impl NativeAudioSource { if remaining_samples >= self.samples_10ms { // TODO(theomonnom): avoid copying let data = frame.data[samples..samples + self.samples_10ms].to_vec(); - if enable_queue { - let _ = self.po_tx.send(data).await; - } else { - self.sys_handle.on_captured_frame( - &data, - frame.sample_rate, - frame.num_channels, - data.len() / frame.num_channels as usize, - ); - } + let _ = self.po_tx.send(data).await; samples += self.samples_10ms; } } @@ -196,7 +190,6 @@ impl From for AudioSourceOptions { echo_cancellation: options.echo_cancellation, noise_suppression: options.noise_suppression, auto_gain_control: options.auto_gain_control, - enable_queue: options.enable_queue, } } } @@ -207,7 +200,6 @@ impl From for sys_at::ffi::AudioSourceOptions { echo_cancellation: options.echo_cancellation, noise_suppression: options.noise_suppression, auto_gain_control: options.auto_gain_control, - enable_queue: options.enable_queue, } } } diff --git a/livekit-ffi/protocol/audio_frame.proto b/livekit-ffi/protocol/audio_frame.proto index 191d0e259..dd7be86a2 100644 --- a/livekit-ffi/protocol/audio_frame.proto +++ b/livekit-ffi/protocol/audio_frame.proto @@ -33,6 +33,7 @@ message NewAudioSourceRequest { optional AudioSourceOptions options = 2; uint32 sample_rate = 3; uint32 num_channels = 4; + optional bool enable_queue = 5; } message NewAudioSourceResponse { OwnedAudioSource source = 1; } @@ -124,7 +125,6 @@ message AudioSourceOptions { bool echo_cancellation = 1; bool noise_suppression = 2; bool auto_gain_control = 3; - bool enable_queue = 4; } enum AudioSourceType { diff --git a/livekit-ffi/src/conversion/audio_frame.rs b/livekit-ffi/src/conversion/audio_frame.rs index 7110de833..c25ff415f 100644 --- a/livekit-ffi/src/conversion/audio_frame.rs +++ b/livekit-ffi/src/conversion/audio_frame.rs @@ -25,7 +25,6 @@ impl From for AudioSourceOptions { echo_cancellation: opts.echo_cancellation, auto_gain_control: opts.auto_gain_control, noise_suppression: opts.noise_suppression, - enable_queue: opts.enable_queue, } } } diff --git a/livekit-ffi/src/livekit.proto.rs b/livekit-ffi/src/livekit.proto.rs index 1048ed14d..cd63562cb 100644 --- a/livekit-ffi/src/livekit.proto.rs +++ b/livekit-ffi/src/livekit.proto.rs @@ -1,4 +1,5 @@ // @generated +// This file is @generated by prost-build. #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct FrameCryptor { @@ -2718,6 +2719,8 @@ pub struct NewAudioSourceRequest { pub sample_rate: u32, #[prost(uint32, tag="4")] pub num_channels: u32, + #[prost(bool, optional, tag="5")] + pub enable_queue: ::core::option::Option, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -2860,8 +2863,6 @@ pub struct AudioSourceOptions { pub noise_suppression: bool, #[prost(bool, tag="3")] pub auto_gain_control: bool, - #[prost(bool, tag="4")] - pub enable_queue: bool, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/livekit-ffi/src/server/audio_source.rs b/livekit-ffi/src/server/audio_source.rs index 845e15bcc..ba0f21f7a 100644 --- a/livekit-ffi/src/server/audio_source.rs +++ b/livekit-ffi/src/server/audio_source.rs @@ -43,6 +43,7 @@ impl FfiAudioSource { new_source.options.map(Into::into).unwrap_or_default(), new_source.sample_rate, new_source.num_channels, + new_source.enable_queue, ); RtcAudioSource::Native(audio_source) } diff --git a/webrtc-sys/src/audio_track.rs b/webrtc-sys/src/audio_track.rs index 8e6a315a2..ee32e475d 100644 --- a/webrtc-sys/src/audio_track.rs +++ b/webrtc-sys/src/audio_track.rs @@ -23,7 +23,6 @@ pub mod ffi { pub echo_cancellation: bool, pub noise_suppression: bool, pub auto_gain_control: bool, - pub enable_queue: bool, } extern "C++" {