Skip to content

Commit

Permalink
Cleanup media streams when finished (#411)
Browse files Browse the repository at this point in the history
* [WIP] Cleanup stream

* .

* .

* revert previous idea

* Handle dropped broadcaster

* fmt

* remove unused

* remove useless log

* handle based watching
  • Loading branch information
keepingitneil authored Sep 3, 2024
1 parent c1be3f2 commit 09503f4
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 18 deletions.
20 changes: 12 additions & 8 deletions livekit-ffi/src/server/audio_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub struct FfiAudioStream {
pub stream_type: proto::AudioStreamType,

#[allow(dead_code)]
close_tx: oneshot::Sender<()>, // Close the stream on drop
self_dropped_tx: oneshot::Sender<()>, // Close the stream on drop
}

impl FfiHandle for FfiAudioStream {}
Expand All @@ -44,18 +44,18 @@ impl FfiAudioStream {
) -> FfiResult<proto::OwnedAudioStream> {
let ffi_track = server.retrieve_handle::<FfiTrack>(new_stream.track_handle)?.clone();
let rtc_track = ffi_track.track.rtc_track();
let (self_dropped_tx, self_dropped_rx) = oneshot::channel();

let MediaStreamTrack::Audio(rtc_track) = rtc_track else {
return Err(FfiError::InvalidRequest("not an audio track".into()));
};

let (close_tx, close_rx) = oneshot::channel();
let stream_type = new_stream.r#type();
let handle_id = server.next_id();
let audio_stream = match stream_type {
#[cfg(not(target_arch = "wasm32"))]
proto::AudioStreamType::AudioStreamNative => {
let audio_stream = Self { handle_id, stream_type, close_tx };
let audio_stream = Self { handle_id, stream_type, self_dropped_tx };

let sample_rate =
if new_stream.sample_rate == 0 { 48000 } else { new_stream.sample_rate as i32 };
Expand All @@ -69,7 +69,8 @@ impl FfiAudioStream {
server,
handle_id,
native_stream,
close_rx,
self_dropped_rx,
server.watch_handle_dropped(new_stream.track_handle),
));
server.watch_panic(handle);
Ok::<FfiAudioStream, FfiError>(audio_stream)
Expand All @@ -91,11 +92,15 @@ impl FfiAudioStream {
server: &'static server::FfiServer,
stream_handle_id: FfiHandleId,
mut native_stream: NativeAudioStream,
mut close_rx: oneshot::Receiver<()>,
mut self_dropped_rx: oneshot::Receiver<()>,
mut handle_dropped_rx: oneshot::Receiver<()>,
) {
loop {
tokio::select! {
_ = &mut close_rx => {
_ = &mut self_dropped_rx => {
break;
}
_ = &mut handle_dropped_rx => {
break;
}
frame = native_stream.next() => {
Expand Down Expand Up @@ -126,14 +131,13 @@ impl FfiAudioStream {
}
}
}

if let Err(err) = server.send_event(proto::ffi_event::Message::AudioStreamEvent(
proto::AudioStreamEvent {
stream_handle: stream_handle_id,
message: Some(proto::audio_stream_event::Message::Eos(proto::AudioStreamEos {})),
},
)) {
log::warn!("failed to send audio EOS: {}", err);
log::warn!("failed to send audio eos: {}", err);
}
}
}
24 changes: 22 additions & 2 deletions livekit-ffi/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::{
collections::HashMap,
error::Error,
sync::{
atomic::{AtomicU64, Ordering},
Expand All @@ -26,7 +27,10 @@ use dashmap::{mapref::one::MappedRef, DashMap};
use downcast_rs::{impl_downcast, Downcast};
use livekit::webrtc::{native::audio_resampler::AudioResampler, prelude::*};
use parking_lot::{deadlock, Mutex};
use tokio::task::JoinHandle;
use tokio::{
sync::{broadcast, oneshot},
task::JoinHandle,
};

use crate::{proto, proto::FfiEvent, FfiError, FfiHandleId, FfiResult, INVALID_HANDLE};

Expand Down Expand Up @@ -74,6 +78,7 @@ pub struct FfiServer {
next_id: AtomicU64,
config: Mutex<Option<FfiConfig>>,
logger: &'static logger::FfiLogger,
handle_dropped_txs: DashMap<FfiHandleId, Vec<oneshot::Sender<()>>>,
}

impl Default for FfiServer {
Expand Down Expand Up @@ -111,6 +116,7 @@ impl Default for FfiServer {
async_runtime,
config: Default::default(),
logger,
handle_dropped_txs: Default::default(),
}
}
}
Expand Down Expand Up @@ -142,6 +148,7 @@ impl FfiServer {

// Drop all handles
self.ffi_handles.clear();
self.handle_dropped_txs.clear();
*self.config.lock() = None; // Invalidate the config
}

Expand Down Expand Up @@ -192,7 +199,20 @@ impl FfiServer {
}

pub fn drop_handle(&self, id: FfiHandleId) -> bool {
self.ffi_handles.remove(&id).is_some()
let existed = self.ffi_handles.remove(&id).is_some();
self.handle_dropped_txs.remove(&id);
return existed;
}

pub fn watch_handle_dropped(&self, handle: FfiHandleId) -> oneshot::Receiver<()> {
// Create vec if not exists
if self.handle_dropped_txs.get(&handle).is_none() {
self.handle_dropped_txs.insert(handle, Vec::new());
}
let (tx, rx) = oneshot::channel::<()>();
let mut tx_vec = self.handle_dropped_txs.get_mut(&handle).unwrap();
tx_vec.push(tx);
return rx;
}

pub fn send_panic(&self, err: Box<dyn Error>) {
Expand Down
14 changes: 13 additions & 1 deletion livekit-ffi/src/server/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::{collections::HashSet, slice, sync::Arc, time::Duration};

use livekit::participant;
use livekit::prelude::*;
use livekit::{participant, track};
use parking_lot::Mutex;
use tokio::sync::{broadcast, mpsc, oneshot, Mutex as AsyncMutex};
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -70,6 +71,8 @@ pub struct RoomInner {
pending_published_tracks: Mutex<HashSet<TrackSid>>,
// Used to wait for the LocalTrackUnpublished event
pending_unpublished_tracks: Mutex<HashSet<TrackSid>>,

track_handle_lookup: Arc<Mutex<HashMap<TrackSid, FfiHandleId>>>,
}

struct Handle {
Expand Down Expand Up @@ -145,6 +148,7 @@ impl FfiRoom {
dtmf_tx,
pending_published_tracks: Default::default(),
pending_unpublished_tracks: Default::default(),
track_handle_lookup: Default::default(),
});

let (local_info, remote_infos) =
Expand Down Expand Up @@ -815,10 +819,12 @@ async fn forward_event(
}
RoomEvent::TrackSubscribed { track, publication: _, participant } => {
let handle_id = server.next_id();
let track_sid = track.sid();
let ffi_track = FfiTrack { handle: handle_id, track: track.into() };

let track_info = proto::TrackInfo::from(&ffi_track);
server.store_handle(ffi_track.handle, ffi_track);
inner.track_handle_lookup.lock().insert(track_sid, handle_id);

let _ =
send_event(proto::room_event::Message::TrackSubscribed(proto::TrackSubscribed {
Expand All @@ -830,6 +836,12 @@ async fn forward_event(
}));
}
RoomEvent::TrackUnsubscribed { track, publication: _, participant } => {
let track_sid = track.sid();
if let Some(handle) = inner.track_handle_lookup.lock().remove(&track_sid) {
server.drop_handle(handle);
} else {
log::warn!("track {} was not found in the lookup table", track_sid);
}
let _ = send_event(proto::room_event::Message::TrackUnsubscribed(
proto::TrackUnsubscribed {
participant_identity: participant.identity().to_string(),
Expand Down
18 changes: 11 additions & 7 deletions livekit-ffi/src/server/video_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub struct FfiVideoStream {
pub stream_type: proto::VideoStreamType,

#[allow(dead_code)]
close_tx: oneshot::Sender<()>, // Close the stream on drop
self_dropped_tx: oneshot::Sender<()>, // Close the stream on drop
}

impl FfiHandle for FfiVideoStream {}
Expand All @@ -49,20 +49,21 @@ impl FfiVideoStream {
return Err(FfiError::InvalidRequest("not a video track".into()));
};

let (close_tx, close_rx) = oneshot::channel();
let (self_dropped_tx, self_dropped_rx) = oneshot::channel();
let stream_type = new_stream.r#type();
let handle_id = server.next_id();
let stream = match stream_type {
#[cfg(not(target_arch = "wasm32"))]
proto::VideoStreamType::VideoStreamNative => {
let video_stream = Self { handle_id, close_tx, stream_type };
let video_stream = Self { handle_id, self_dropped_tx, stream_type };
let handle = server.async_runtime.spawn(Self::native_video_stream_task(
server,
handle_id,
new_stream.format.and_then(|_| Some(new_stream.format())),
new_stream.normalize_stride,
NativeVideoStream::new(rtc_track),
close_rx,
self_dropped_rx,
server.watch_handle_dropped(new_stream.track_handle),
));
server.watch_panic(handle);
Ok::<FfiVideoStream, FfiError>(video_stream)
Expand All @@ -86,19 +87,22 @@ impl FfiVideoStream {
dst_type: Option<proto::VideoBufferType>,
normalize_stride: bool,
mut native_stream: NativeVideoStream,
mut close_rx: oneshot::Receiver<()>,
mut self_dropped_rx: oneshot::Receiver<()>,
mut handle_dropped_rx: oneshot::Receiver<()>,
) {
loop {
tokio::select! {
_ = &mut close_rx => {
_ = &mut self_dropped_rx => {
break;
}
_ = &mut handle_dropped_rx => {
break;
}
frame = native_stream.next() => {
let Some(frame) = frame else {
break;
};


let Ok((buffer, info)) = colorcvt::to_video_buffer_info(frame.buffer, dst_type, normalize_stride) else {
log::error!("video stream failed to convert video frame to {:?}", dst_type);
continue;
Expand Down

0 comments on commit 09503f4

Please sign in to comment.