Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: rtc stats #218

Merged
merged 14 commits into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions examples/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions examples/wgpu_room/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,12 @@ impl LkApp {
let _ = self.service.send(AsyncCmd::ToggleSine);
}
});

ui.menu_button("Debug", |ui| {
if ui.button("Refresh stats").clicked() {
// TODO
}
});
});
}

Expand Down
2 changes: 2 additions & 0 deletions libwebrtc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ repository = "https://github.com/livekit/client-sdk-rust"
[dependencies]
livekit-protocol = { path = "../livekit-protocol", version = "0.2.0" }
log = "0.4"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"

[target.'cfg(target_os = "android")'.dependencies]
Expand Down
10 changes: 6 additions & 4 deletions libwebrtc/src/data_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use crate::{imp::data_channel as dc_imp, rtp_parameters::Priority};
use serde::Deserialize;
use std::{fmt::Debug, str::Utf8Error};
use thiserror::Error;

Expand Down Expand Up @@ -49,8 +50,9 @@ pub enum DataChannelError {
Utf8(#[from] Utf8Error),
}

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum DataState {
#[derive(Debug, Copy, Clone, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum DataChannelState {
Connecting,
Open,
Closing,
Expand All @@ -63,7 +65,7 @@ pub struct DataBuffer<'a> {
pub binary: bool,
}

pub type OnStateChange = Box<dyn FnMut(DataState) + Send + Sync>;
pub type OnStateChange = Box<dyn FnMut(DataChannelState) + Send + Sync>;
pub type OnMessage = Box<dyn FnMut(DataBuffer) + Send + Sync>;
pub type OnBufferedAmountChange = Box<dyn FnMut(u64) + Send + Sync>;

Expand All @@ -85,7 +87,7 @@ impl DataChannel {
self.handle.label()
}

pub fn state(&self) -> DataState {
pub fn state(&self) -> DataChannelState {
self.handle.state()
}

Expand Down
1 change: 1 addition & 0 deletions libwebrtc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pub mod rtp_receiver;
pub mod rtp_sender;
pub mod rtp_transceiver;
pub mod session_description;
pub mod stats;
pub mod video_frame;
pub mod video_source;
pub mod video_stream;
Expand Down
8 changes: 4 additions & 4 deletions libwebrtc/src/native/data_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@
// limitations under the License.

use crate::data_channel::{
DataBuffer, DataChannelError, DataChannelInit, DataState, OnBufferedAmountChange, OnMessage,
OnStateChange,
DataBuffer, DataChannelError, DataChannelInit, DataChannelState, OnBufferedAmountChange,
OnMessage, OnStateChange,
};
use cxx::SharedPtr;
use parking_lot::Mutex;
use std::str;
use std::sync::Arc;
use webrtc_sys::data_channel as sys_dc;

impl From<sys_dc::ffi::DataState> for DataState {
impl From<sys_dc::ffi::DataState> for DataChannelState {
fn from(value: sys_dc::ffi::DataState) -> Self {
match value {
sys_dc::ffi::DataState::Connecting => Self::Connecting,
Expand Down Expand Up @@ -95,7 +95,7 @@ impl DataChannel {
self.sys_handle.label()
}

pub fn state(&self) -> DataState {
pub fn state(&self) -> DataChannelState {
self.sys_handle.state().into()
}

Expand Down
37 changes: 30 additions & 7 deletions libwebrtc/src/native/peer_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use crate::rtp_receiver::RtpReceiver;
use crate::rtp_sender::RtpSender;
use crate::rtp_transceiver::RtpTransceiver;
use crate::rtp_transceiver::RtpTransceiverInit;
use crate::stats::RtcStats;
use crate::MediaType;
use crate::RtcErrorType;
use crate::{session_description::SessionDescription, RtcError};
Expand All @@ -49,6 +50,7 @@ use tokio::sync::oneshot;
use webrtc_sys::data_channel as sys_dc;
use webrtc_sys::jsep as sys_jsep;
use webrtc_sys::peer_connection as sys_pc;
use webrtc_sys::peer_connection_factory as sys_pcf;
use webrtc_sys::rtc_error as sys_err;

impl From<OfferOptions> for sys_pc::ffi::RtcOfferAnswerOptions {
Expand Down Expand Up @@ -203,7 +205,7 @@ impl PeerConnection {
options: OfferOptions,
) -> Result<SessionDescription, RtcError> {
let (tx, mut rx) = mpsc::channel::<Result<SessionDescription, RtcError>>(1);
let ctx = Box::new(sys_pc::AsyncContext(Box::new(tx)));
let ctx = Box::new(sys_pc::PeerContext(Box::new(tx)));
type CtxType = mpsc::Sender<Result<SessionDescription, RtcError>>;

self.sys_handle.create_offer(
Expand All @@ -229,7 +231,7 @@ impl PeerConnection {
options: AnswerOptions,
) -> Result<SessionDescription, RtcError> {
let (tx, mut rx) = mpsc::channel::<Result<SessionDescription, RtcError>>(1);
let ctx = Box::new(sys_pc::AsyncContext(Box::new(tx)));
let ctx = Box::new(sys_pc::PeerContext(Box::new(tx)));
type CtxType = mpsc::Sender<Result<SessionDescription, RtcError>>;

self.sys_handle.create_answer(
Expand All @@ -252,7 +254,7 @@ impl PeerConnection {

pub async fn set_local_description(&self, desc: SessionDescription) -> Result<(), RtcError> {
let (tx, rx) = oneshot::channel::<Result<(), RtcError>>();
let ctx = Box::new(sys_pc::AsyncContext(Box::new(tx)));
let ctx = Box::new(sys_pc::PeerContext(Box::new(tx)));

self.sys_handle
.set_local_description(desc.handle.sys_handle, ctx, |ctx, err| {
Expand All @@ -273,7 +275,7 @@ impl PeerConnection {

pub async fn set_remote_description(&self, desc: SessionDescription) -> Result<(), RtcError> {
let (tx, rx) = oneshot::channel::<Result<(), RtcError>>();
let ctx = Box::new(sys_pc::AsyncContext(Box::new(tx)));
let ctx = Box::new(sys_pc::PeerContext(Box::new(tx)));

self.sys_handle
.set_remote_description(desc.handle.sys_handle, ctx, |ctx, err| {
Expand All @@ -297,7 +299,7 @@ impl PeerConnection {

pub async fn add_ice_candidate(&self, candidate: IceCandidate) -> Result<(), RtcError> {
let (tx, rx) = oneshot::channel::<Result<(), RtcError>>();
let ctx = Box::new(sys_pc::AsyncContext(Box::new(tx)));
let ctx = Box::new(sys_pc::PeerContext(Box::new(tx)));

self.sys_handle
.add_ice_candidate(candidate.handle.sys_handle, ctx, |ctx, err| {
Expand Down Expand Up @@ -440,6 +442,27 @@ impl PeerConnection {
.map_err(|e| unsafe { sys_err::ffi::RtcError::from(e.what()).into() })
}

pub async fn get_stats(&self) -> Result<Vec<RtcStats>, RtcError> {
let (tx, rx) = oneshot::channel::<Result<Vec<RtcStats>, RtcError>>();
let ctx = Box::new(sys_pc::PeerContext(Box::new(tx)));

self.sys_handle.get_stats(ctx, |ctx, stats| {
let tx = ctx
.0
.downcast::<oneshot::Sender<Result<Vec<RtcStats>, RtcError>>>()
.unwrap();

// Unwrap because it should not happens
let vec = serde_json::from_str(&stats).unwrap();
let _ = tx.send(Ok(vec));
});

rx.await.map_err(|_| RtcError {
error_type: RtcErrorType::Internal,
message: "get_stats cancelled".to_owned(),
})?
}

pub fn senders(&self) -> Vec<RtpSender> {
self.sys_handle
.get_senders()
Expand Down Expand Up @@ -526,7 +549,7 @@ pub struct PeerObserver {
pub track_handler: Mutex<Option<OnTrack>>,
}

impl sys_pc::PeerConnectionObserver for PeerObserver {
impl sys_pcf::PeerConnectionObserver for PeerObserver {
fn on_signaling_change(&self, new_state: sys_pc::ffi::SignalingState) {
if let Some(f) = self.signaling_change_handler.lock().as_mut() {
f(new_state.into());
Expand Down Expand Up @@ -612,7 +635,7 @@ impl sys_pc::PeerConnectionObserver for PeerObserver {

fn on_ice_selected_candidate_pair_changed(
&self,
_event: sys_pc::ffi::CandidatePairChangeEvent,
_event: sys_pcf::ffi::CandidatePairChangeEvent,
) {
}

Expand Down
14 changes: 6 additions & 8 deletions libwebrtc/src/native/peer_connection_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use cxx::UniquePtr;
use lazy_static::lazy_static;
use parking_lot::Mutex;
use std::sync::Arc;
use webrtc_sys::peer_connection as sys_pc;
use webrtc_sys::peer_connection_factory as sys_pcf;
use webrtc_sys::rtc_error as sys_err;
use webrtc_sys::webrtc as sys_rtc;
Expand Down Expand Up @@ -78,13 +77,12 @@ impl PeerConnectionFactory {
config: RtcConfiguration,
) -> Result<PeerConnection, RtcError> {
let observer = Arc::new(imp_pc::PeerObserver::default());
let native_observer = sys_pc::ffi::create_native_peer_connection_observer(Box::new(
sys_pc::PeerConnectionObserverWrapper::new(observer.clone()),
));

let res = self
.sys_handle
.create_peer_connection(config.into(), native_observer);
let res = self.sys_handle.create_peer_connection(
config.into(),
Box::new(sys_pcf::PeerConnectionObserverWrapper::new(
observer.clone(),
)),
);

match res {
Ok(sys_handle) => Ok(PeerConnection {
Expand Down
24 changes: 24 additions & 0 deletions libwebrtc/src/native/rtp_receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
use crate::imp::media_stream_track::new_media_stream_track;
use crate::media_stream_track::MediaStreamTrack;
use crate::rtp_parameters::RtpParameters;
use crate::stats::RtcStats;
use crate::{RtcError, RtcErrorType};
use cxx::SharedPtr;
use tokio::sync::oneshot;
use webrtc_sys::rtp_receiver as sys_rr;

#[derive(Clone)]
Expand All @@ -33,6 +36,27 @@ impl RtpReceiver {
Some(new_media_stream_track(track_handle))
}

pub async fn get_stats(&self) -> Result<Vec<RtcStats>, RtcError> {
let (tx, rx) = oneshot::channel::<Result<Vec<RtcStats>, RtcError>>();
let ctx = Box::new(sys_rr::ReceiverContext(Box::new(tx)));

self.sys_handle.get_stats(ctx, |ctx, stats| {
let tx = ctx
.0
.downcast::<oneshot::Sender<Result<Vec<RtcStats>, RtcError>>>()
.unwrap();

// Unwrap because it should not happens
let vec = serde_json::from_str(&stats).unwrap();
let _ = tx.send(Ok(vec));
});

rx.await.map_err(|_| RtcError {
error_type: RtcErrorType::Internal,
message: "get_stats cancelled".to_owned(),
})?
}

pub fn parameters(&self) -> RtpParameters {
self.sys_handle.get_parameters().into()
}
Expand Down
23 changes: 23 additions & 0 deletions libwebrtc/src/native/rtp_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@

use super::media_stream_track::new_media_stream_track;
use crate::media_stream_track::MediaStreamTrack;
use crate::stats::RtcStats;
use crate::{rtp_parameters::RtpParameters, RtcError, RtcErrorType};
use cxx::SharedPtr;
use tokio::sync::oneshot;
use webrtc_sys::rtc_error as sys_err;
use webrtc_sys::rtp_sender as sys_rs;

Expand All @@ -34,6 +36,27 @@ impl RtpSender {
Some(new_media_stream_track(track_handle))
}

pub async fn get_stats(&self) -> Result<Vec<RtcStats>, RtcError> {
let (tx, rx) = oneshot::channel::<Result<Vec<RtcStats>, RtcError>>();
let ctx = Box::new(sys_rs::SenderContext(Box::new(tx)));

self.sys_handle.get_stats(ctx, |ctx, stats| {
let tx = ctx
.0
.downcast::<oneshot::Sender<Result<Vec<RtcStats>, RtcError>>>()
.unwrap();

// Unwrap because it should not happens
let vec = serde_json::from_str(&stats).unwrap();
let _ = tx.send(Ok(vec));
});

rx.await.map_err(|_| RtcError {
error_type: RtcErrorType::Internal,
message: "get_stats cancelled".to_owned(),
})?
}

pub fn set_track(&self, track: Option<MediaStreamTrack>) -> Result<(), RtcError> {
if !self
.sys_handle
Expand Down
6 changes: 6 additions & 0 deletions libwebrtc/src/peer_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::rtp_receiver::RtpReceiver;
use crate::rtp_sender::RtpSender;
use crate::rtp_transceiver::{RtpTransceiver, RtpTransceiverInit};
use crate::session_description::SessionDescription;
use crate::stats::RtcStats;
use crate::{MediaType, RtcError};
use std::fmt::Debug;

Expand Down Expand Up @@ -157,6 +158,10 @@ impl PeerConnection {
self.handle.remove_track(sender)
}

pub async fn get_stats(&self) -> Result<Vec<RtcStats>, RtcError> {
self.handle.get_stats().await
}

pub fn add_transceiver(
&self,
track: MediaStreamTrack,
Expand All @@ -172,6 +177,7 @@ impl PeerConnection {
) -> Result<RtpTransceiver, RtcError> {
self.handle.add_transceiver_for_media(media_type, init)
}

pub fn close(&self) {
self.handle.close()
}
Expand Down
2 changes: 1 addition & 1 deletion libwebrtc/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub use crate::audio_frame::AudioFrame;
pub use crate::audio_source::{AudioSourceOptions, RtcAudioSource};
pub use crate::audio_track::RtcAudioTrack;
pub use crate::data_channel::{
DataBuffer, DataChannel, DataChannelError, DataChannelInit, DataState,
DataBuffer, DataChannel, DataChannelError, DataChannelInit, DataChannelState,
};
pub use crate::ice_candidate::IceCandidate;
pub use crate::media_stream::MediaStream;
Expand Down
6 changes: 5 additions & 1 deletion libwebrtc/src/rtp_receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::fmt::Debug;

use crate::{
imp::rtp_receiver as imp_rr, media_stream_track::MediaStreamTrack,
rtp_parameters::RtpParameters,
rtp_parameters::RtpParameters, stats::RtcStats, RtcError,
};

#[derive(Clone)]
Expand All @@ -29,6 +29,10 @@ impl RtpReceiver {
self.handle.track()
}

pub async fn get_stats(&self) -> Result<Vec<RtcStats>, RtcError> {
self.handle.get_stats().await
}

pub fn parameters(&self) -> RtpParameters {
self.handle.parameters()
}
Expand Down
Loading
Loading