Skip to content

Commit 09f7da1

Browse files
authored
optimize AudioSource captures (#425)
1 parent 346cc22 commit 09f7da1

18 files changed

+370
-216
lines changed

examples/Cargo.lock

Lines changed: 7 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

examples/wgpu_room/src/sine_track.rs

Lines changed: 7 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,7 @@ pub struct SineParameters {
1717

1818
impl Default for SineParameters {
1919
fn default() -> Self {
20-
Self {
21-
sample_rate: 48000,
22-
freq: 440.0,
23-
amplitude: 1.0,
24-
num_channels: 2,
25-
}
20+
Self { sample_rate: 48000, freq: 440.0, amplitude: 1.0, num_channels: 2 }
2621
}
2722
}
2823

@@ -46,7 +41,7 @@ impl SineTrack {
4641
AudioSourceOptions::default(),
4742
params.sample_rate,
4843
params.num_channels,
49-
None,
44+
1000,
5045
),
5146
params,
5247
room,
@@ -65,28 +60,18 @@ impl SineTrack {
6560
RtcAudioSource::Native(self.rtc_source.clone()),
6661
);
6762

68-
let task = tokio::spawn(Self::track_task(
69-
close_rx,
70-
self.rtc_source.clone(),
71-
self.params.clone(),
72-
));
63+
let task =
64+
tokio::spawn(Self::track_task(close_rx, self.rtc_source.clone(), self.params.clone()));
7365

7466
self.room
7567
.local_participant()
7668
.publish_track(
7769
LocalTrack::Audio(track.clone()),
78-
TrackPublishOptions {
79-
source: TrackSource::Microphone,
80-
..Default::default()
81-
},
70+
TrackPublishOptions { source: TrackSource::Microphone, ..Default::default() },
8271
)
8372
.await?;
8473

85-
let handle = TrackHandle {
86-
close_tx,
87-
track,
88-
task,
89-
};
74+
let handle = TrackHandle { close_tx, track, task };
9075

9176
self.handle = Some(handle);
9277
Ok(())
@@ -96,10 +81,7 @@ impl SineTrack {
9681
if let Some(handle) = self.handle.take() {
9782
handle.close_tx.send(()).ok();
9883
handle.task.await.ok();
99-
self.room
100-
.local_participant()
101-
.unpublish_track(&handle.track.sid())
102-
.await?;
84+
self.room.local_participant().unpublish_track(&handle.track.sid()).await?;
10385
}
10486

10587
Ok(())

libwebrtc/src/audio_source.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -63,18 +63,22 @@ pub mod native {
6363
options: AudioSourceOptions,
6464
sample_rate: u32,
6565
num_channels: u32,
66-
enable_queue: Option<bool>,
66+
queue_size_ms: u32,
6767
) -> NativeAudioSource {
6868
Self {
6969
handle: imp_as::NativeAudioSource::new(
7070
options,
7171
sample_rate,
7272
num_channels,
73-
enable_queue,
73+
queue_size_ms,
7474
),
7575
}
7676
}
7777

78+
pub fn clear_buffer(&self) {
79+
self.handle.clear_buffer()
80+
}
81+
7882
pub async fn capture_frame(&self, frame: &AudioFrame<'_>) -> Result<(), RtcError> {
7983
self.handle.capture_frame(frame).await
8084
}
@@ -94,9 +98,5 @@ pub mod native {
9498
pub fn num_channels(&self) -> u32 {
9599
self.handle.num_channels()
96100
}
97-
98-
pub fn enable_queue(&self) -> bool {
99-
self.handle.enable_queue()
100-
}
101101
}
102102
}

libwebrtc/src/native/audio_source.rs

Lines changed: 47 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -12,106 +12,43 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::{sync::Arc, time::Duration};
16-
1715
use cxx::SharedPtr;
18-
use livekit_runtime::interval;
19-
use tokio::sync::{
20-
mpsc::{self, error::TryRecvError},
21-
Mutex as AsyncMutex,
22-
};
16+
use tokio::sync::oneshot;
2317
use webrtc_sys::audio_track as sys_at;
2418

2519
use crate::{audio_frame::AudioFrame, audio_source::AudioSourceOptions, RtcError, RtcErrorType};
2620

27-
const BUFFER_SIZE_MS: usize = 50;
28-
2921
#[derive(Clone)]
3022
pub struct NativeAudioSource {
3123
sys_handle: SharedPtr<sys_at::ffi::AudioTrackSource>,
32-
inner: Arc<AsyncMutex<AudioSourceInner>>,
3324
sample_rate: u32,
3425
num_channels: u32,
35-
samples_10ms: usize,
36-
// whether to queue audio frames or send them immediately
37-
// defaults to true
38-
enable_queue: bool,
39-
po_tx: mpsc::Sender<Vec<i16>>,
40-
}
41-
42-
struct AudioSourceInner {
43-
buf: Box<[i16]>,
44-
45-
// Amount of data from the previous frame that hasn't been sent to the libwebrtc source
46-
// (because it requires 10ms of data)
47-
len: usize,
26+
queue_size_samples: u32,
4827
}
4928

5029
impl NativeAudioSource {
5130
pub fn new(
5231
options: AudioSourceOptions,
5332
sample_rate: u32,
5433
num_channels: u32,
55-
enable_queue: Option<bool>,
34+
queue_size_ms: u32,
5635
) -> NativeAudioSource {
57-
let samples_10ms = (sample_rate / 100 * num_channels) as usize;
58-
let (po_tx, mut po_rx) = mpsc::channel(BUFFER_SIZE_MS / 10);
59-
60-
let source = Self {
61-
sys_handle: sys_at::ffi::new_audio_track_source(options.into()),
62-
inner: Arc::new(AsyncMutex::new(AudioSourceInner {
63-
buf: vec![0; samples_10ms].into_boxed_slice(),
64-
len: 0,
65-
})),
66-
sample_rate,
67-
num_channels,
68-
samples_10ms,
69-
enable_queue: enable_queue.unwrap_or(true),
70-
po_tx,
71-
};
72-
73-
livekit_runtime::spawn({
74-
let source = source.clone();
75-
async move {
76-
let mut interval = interval(Duration::from_millis(10));
77-
interval.set_missed_tick_behavior(livekit_runtime::MissedTickBehavior::Delay);
78-
let blank_data = vec![0; samples_10ms];
79-
let enable_queue = source.enable_queue;
80-
81-
loop {
82-
if enable_queue {
83-
interval.tick().await;
84-
}
85-
86-
let frame = po_rx.try_recv();
87-
if let Err(TryRecvError::Disconnected) = frame {
88-
break;
89-
}
90-
91-
if let Err(TryRecvError::Empty) = frame {
92-
if enable_queue {
93-
source.sys_handle.on_captured_frame(
94-
&blank_data,
95-
sample_rate,
96-
num_channels,
97-
blank_data.len() / num_channels as usize,
98-
);
99-
}
100-
continue;
101-
}
102-
103-
let frame = frame.unwrap();
104-
source.sys_handle.on_captured_frame(
105-
&frame,
106-
sample_rate,
107-
num_channels,
108-
frame.len() / num_channels as usize,
109-
);
110-
}
111-
}
112-
});
113-
114-
source
36+
assert!(queue_size_ms % 10 == 0, "queue_size_ms must be a multiple of 10");
37+
38+
print!(
39+
"new audio source {} {} {} {}",
40+
sample_rate, num_channels, queue_size_ms, options.echo_cancellation
41+
);
42+
43+
let sys_handle = sys_at::ffi::new_audio_track_source(
44+
options.into(),
45+
sample_rate.try_into().unwrap(),
46+
num_channels.try_into().unwrap(),
47+
queue_size_ms.try_into().unwrap(),
48+
);
49+
50+
let queue_size_samples = (queue_size_ms * sample_rate / 1000) * num_channels;
51+
Self { sys_handle, sample_rate, num_channels, queue_size_samples }
11552
}
11653

11754
pub fn sys_handle(&self) -> SharedPtr<sys_at::ffi::AudioTrackSource> {
@@ -134,8 +71,8 @@ impl NativeAudioSource {
13471
self.num_channels
13572
}
13673

137-
pub fn enable_queue(&self) -> bool {
138-
self.enable_queue
74+
pub fn clear_buffer(&self) {
75+
self.sys_handle.clear_buffer();
13976
}
14077

14178
pub async fn capture_frame(&self, frame: &AudioFrame<'_>) -> Result<(), RtcError> {
@@ -146,38 +83,36 @@ impl NativeAudioSource {
14683
});
14784
}
14885

149-
let mut inner = self.inner.lock().await;
150-
let mut samples = 0;
151-
// split frames into 10ms chunks
152-
loop {
153-
let remaining_samples = frame.data.len() - samples;
154-
if remaining_samples == 0 {
155-
break;
156-
}
86+
extern "C" fn lk_audio_source_complete(userdata: *const sys_at::SourceContext) {
87+
let tx = unsafe { Box::from_raw(userdata as *mut oneshot::Sender<()>) };
88+
let _ = tx.send(());
89+
}
15790

158-
if (inner.len != 0 && remaining_samples > 0) || remaining_samples < self.samples_10ms {
159-
let missing_len = self.samples_10ms - inner.len;
160-
let to_add = missing_len.min(remaining_samples);
161-
let start = inner.len;
162-
inner.buf[start..start + to_add]
163-
.copy_from_slice(&frame.data[samples..samples + to_add]);
164-
inner.len += to_add;
165-
samples += to_add;
166-
167-
if inner.len == self.samples_10ms {
168-
let data = inner.buf.clone().to_vec();
169-
let _ = self.po_tx.send(data).await;
170-
inner.len = 0;
91+
// iterate over chunks of self._queue_size_samples
92+
for chunk in frame.data.chunks(self.queue_size_samples as usize) {
93+
let nb_frames = chunk.len() / self.num_channels as usize;
94+
let (tx, rx) = oneshot::channel::<()>();
95+
let ctx = Box::new(tx);
96+
let ctx_ptr = Box::into_raw(ctx) as *const sys_at::SourceContext;
97+
98+
unsafe {
99+
if !self.sys_handle.capture_frame(
100+
chunk,
101+
self.sample_rate,
102+
self.num_channels,
103+
nb_frames,
104+
ctx_ptr,
105+
sys_at::CompleteCallback(lk_audio_source_complete),
106+
) {
107+
return Err(RtcError {
108+
error_type: RtcErrorType::InvalidState,
109+
message: "failed to capture frame".to_owned(),
110+
});
171111
}
172-
continue;
173112
}
174113

175-
if remaining_samples >= self.samples_10ms {
176-
// TODO(theomonnom): avoid copying
177-
let data = frame.data[samples..samples + self.samples_10ms].to_vec();
178-
let _ = self.po_tx.send(data).await;
179-
samples += self.samples_10ms;
180-
}
114+
let _ = rx.await;
115+
println!("captured frame");
181116
}
182117

183118
Ok(())

livekit-ffi/protocol/audio_frame.proto

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ message NewAudioSourceRequest {
4646
optional AudioSourceOptions options = 2;
4747
uint32 sample_rate = 3;
4848
uint32 num_channels = 4;
49-
optional bool enable_queue = 5;
49+
uint32 queue_size_ms = 5;
5050
}
5151
message NewAudioSourceResponse { OwnedAudioSource source = 1; }
5252

@@ -64,6 +64,11 @@ message CaptureAudioFrameCallback {
6464
optional string error = 2;
6565
}
6666

67+
message ClearAudioBufferRequest {
68+
uint64 source_handle = 1;
69+
}
70+
message ClearAudioBufferResponse {}
71+
6772
// Create a new AudioResampler
6873
message NewAudioResamplerRequest {}
6974
message NewAudioResamplerResponse {

0 commit comments

Comments
 (0)