From ad6dcc51bc3e44a0f8d6af6b47d8fae7ffa12c92 Mon Sep 17 00:00:00 2001 From: Dario A Lencina-Talarico Date: Sat, 18 Nov 2023 08:54:30 -0500 Subject: [PATCH] Revert "Factor out connection, peer management, and packet handling into `VideoCallClient` (#137)" (#148) This reverts commit bb27fb4589a90c2f748b0cf7f73c5d42a45df9df. --- yew-ui/src/components/attendants.rs | 311 +++++++++++++--- yew-ui/src/components/host.rs | 38 +- yew-ui/src/model/client/mod.rs | 3 - yew-ui/src/model/client/video_call_client.rs | 335 ------------------ yew-ui/src/model/connection/connection.rs | 3 +- yew-ui/src/model/connection/task.rs | 1 - yew-ui/src/model/connection/webtransport.rs | 28 +- .../decode/hash_map_with_ordered_keys.rs | 77 ---- yew-ui/src/model/decode/mod.rs | 3 +- .../src/model/decode/peer_decode_manager.rs | 168 ++++----- yew-ui/src/model/decode/peer_decoder.rs | 3 +- .../model/decode/video_decoder_with_buffer.rs | 8 +- .../src/model/decode/video_decoder_wrapper.rs | 3 +- yew-ui/src/model/encode/camera_encoder.rs | 31 +- yew-ui/src/model/encode/microphone_encoder.rs | 22 +- yew-ui/src/model/encode/screen_encoder.rs | 22 +- yew-ui/src/model/mod.rs | 5 +- 17 files changed, 421 insertions(+), 640 deletions(-) delete mode 100644 yew-ui/src/model/client/mod.rs delete mode 100644 yew-ui/src/model/client/video_call_client.rs delete mode 100644 yew-ui/src/model/decode/hash_map_with_ordered_keys.rs diff --git a/yew-ui/src/components/attendants.rs b/yew-ui/src/components/attendants.rs index 9080845a..ef9c68c0 100644 --- a/yew-ui/src/components/attendants.rs +++ b/yew-ui/src/components/attendants.rs @@ -1,11 +1,25 @@ +use anyhow::{anyhow, Result}; +use std::collections::HashMap; +use std::rc::Rc; +use std::sync::Arc; + use super::icons::push_pin::PushPinIcon; use crate::constants::{USERS_ALLOWED_TO_STREAM, WEBTRANSPORT_HOST}; -use crate::model::client::{VideoCallClient, VideoCallClientOptions}; +use crate::crypto::aes::Aes128State; +use crate::crypto::rsa::RsaWrapper; +use crate::model::connection::{ConnectOptions, Connection}; +use crate::model::decode::PeerDecodeManager; use crate::model::media_devices::MediaDeviceAccess; use crate::{components::host::Host, constants::ACTIX_WEBSOCKET}; -use log::{error, warn}; -use std::rc::Rc; +use log::{debug, error, info, warn}; +use protobuf::Message; +use rsa::pkcs8::{DecodePublicKey, EncodePublicKey}; +use rsa::RsaPublicKey; +use types::protos::aes_packet::AesPacket; use types::protos::media_packet::media_packet::MediaType; +use types::protos::packet_wrapper::packet_wrapper::PacketType; +use types::protos::packet_wrapper::PacketWrapper; +use types::protos::rsa_packet::RsaPacket; use wasm_bindgen::JsCast; use wasm_bindgen::JsValue; use web_sys::*; @@ -15,7 +29,7 @@ use yew::{html, Component, Context, Html}; #[derive(Debug)] pub enum WsAction { - Connect, + Connect(bool), Connected, Lost(Option), RequestMediaPermissions, @@ -34,6 +48,8 @@ pub enum MeetingAction { pub enum Msg { WsAction(WsAction), MeetingAction(MeetingAction), + OnInboundMedia(PacketWrapper), + OnOutboundPacket(PacketWrapper), OnPeerAdded(String), OnFirstFrame((String, MediaType)), } @@ -64,46 +80,79 @@ pub struct AttendantsComponentProps { } pub struct AttendantsComponent { - pub client: VideoCallClient, + pub connection: Option, + pub peer_decode_manager: PeerDecodeManager, pub media_device_access: MediaDeviceAccess, + pub outbound_audio_buffer: [u8; 2000], pub share_screen: bool, + pub e2ee_enabled: bool, + pub webtransport_enabled: bool, pub mic_enabled: bool, pub video_enabled: bool, pub error: Option, + pub peer_keys: HashMap, + aes: Arc, + rsa: Arc, } impl AttendantsComponent { - fn create_video_call_client(ctx: &Context) -> VideoCallClient { + fn is_connected(&self) -> bool { + match &self.connection { + Some(connection) => connection.is_connected(), + None => false, + } + } + + fn send_public_key(&self, ctx: &Context) { + if !self.e2ee_enabled { + return; + } let email = ctx.props().email.clone(); - let id = ctx.props().id.clone(); - let opts = VideoCallClientOptions { - userid: email.clone(), - websocket_url: format!("{ACTIX_WEBSOCKET}/{email}/{id}"), - webtransport_url: format!("{WEBTRANSPORT_HOST}/{email}/{id}"), - enable_e2ee: ctx.props().e2ee_enabled, - enable_webtransport: ctx.props().webtransport_enabled, - on_connected: { - let link = ctx.link().clone(); - Callback::from(move |_| link.send_message(Msg::from(WsAction::Connected))) - }, - on_connection_lost: { - let link = ctx.link().clone(); - Callback::from(move |_| link.send_message(Msg::from(WsAction::Lost(None)))) - }, - on_peer_added: { - let link = ctx.link().clone(); - Callback::from(move |email| link.send_message(Msg::OnPeerAdded(email))) - }, - on_peer_first_frame: { - let link = ctx.link().clone(); - Callback::from(move |(email, media_type)| { - link.send_message(Msg::OnFirstFrame((email, media_type))) - }) - }, - get_peer_video_canvas_id: Callback::from(|email| email), - get_peer_screen_canvas_id: Callback::from(|email| format!("screen-share-{}", &email)), + let rsa = &*self.rsa; + match rsa.pub_key.to_public_key_der() { + Ok(public_key_der) => { + let packet = RsaPacket { + username: email.clone(), + public_key_der: public_key_der.to_vec(), + ..Default::default() + }; + match packet.write_to_bytes() { + Ok(data) => { + ctx.link() + .send_message(Msg::OnOutboundPacket(PacketWrapper { + packet_type: PacketType::RSA_PUB_KEY.into(), + email, + data, + ..Default::default() + })); + } + Err(e) => { + error!("Failed to serialize rsa packet: {}", e.to_string()) + } + }; + } + Err(e) => { + error!("Failed to export rsa public key to der: {}", e.to_string()) + } + } + } + + fn create_peer_decoder_manager(ctx: &Context) -> PeerDecodeManager { + let mut peer_decode_manager = PeerDecodeManager::new(); + peer_decode_manager.on_peer_added = { + let link = ctx.link().clone(); + Callback::from(move |email| link.send_message(Msg::OnPeerAdded(email))) + }; + peer_decode_manager.on_first_frame = { + let link = ctx.link().clone(); + Callback::from(move |(email, media_type)| { + link.send_message(Msg::OnFirstFrame((email, media_type))) + }) }; - VideoCallClient::new(opts) + peer_decode_manager.get_video_canvas_id = Callback::from(|email| email); + peer_decode_manager.get_screen_canvas_id = + Callback::from(|email| format!("screen-share-{}", &email)); + peer_decode_manager } fn create_media_device_access(ctx: &Context) -> MediaDeviceAccess { @@ -120,6 +169,22 @@ impl AttendantsComponent { }; media_device_access } + + fn serialize_aes_packet(&self) -> Result> { + AesPacket { + key: self.aes.key.to_vec(), + iv: self.aes.iv.to_vec(), + ..Default::default() + } + .write_to_bytes() + .map_err(|e| anyhow!("Failed to serialize aes packet: {}", e.to_string())) + } + + fn encrypt_aes_packet(&self, aes_packet: &[u8], pub_key: &RsaPublicKey) -> Result> { + self.rsa + .encrypt_with_key(aes_packet, pub_key) + .map_err(|e| anyhow!("Failed to encrypt aes packet: {}", e.to_string())) + } } impl Component for AttendantsComponent { @@ -128,12 +193,19 @@ impl Component for AttendantsComponent { fn create(ctx: &Context) -> Self { Self { - client: Self::create_video_call_client(ctx), + connection: None, + peer_decode_manager: Self::create_peer_decoder_manager(ctx), media_device_access: Self::create_media_device_access(ctx), + outbound_audio_buffer: [0; 2000], share_screen: false, mic_enabled: false, video_enabled: false, + e2ee_enabled: ctx.props().e2ee_enabled, + webtransport_enabled: ctx.props().webtransport_enabled, error: None, + peer_keys: HashMap::new(), + aes: Arc::new(Aes128State::new(ctx.props().e2ee_enabled)), + rsa: Arc::new(RsaWrapper::new(ctx.props().e2ee_enabled)), } } @@ -146,34 +218,62 @@ impl Component for AttendantsComponent { fn update(&mut self, ctx: &Context, msg: Self::Message) -> bool { match msg { Msg::WsAction(action) => match action { - WsAction::Connect => { - if self.client.is_connected() { + WsAction::Connect(webtransport) => { + if self.connection.is_some() { return false; } - if let Err(e) = self.client.connect() { - ctx.link() - .send_message(WsAction::Log(format!("Connection failed: {e}"))); + info!("webtransport connect = {}", webtransport); + info!("end to end encryption enabled = {}", self.e2ee_enabled); + let id = ctx.props().id.clone(); + let email = ctx.props().email.clone(); + let options = ConnectOptions { + userid: email.clone(), + websocket_url: format!("{ACTIX_WEBSOCKET}/{email}/{id}"), + webtransport_url: format!("{WEBTRANSPORT_HOST}/{email}/{id}"), + on_inbound_media: ctx.link().callback(Msg::OnInboundMedia), + on_connected: ctx.link().callback(|_| Msg::from(WsAction::Connected)), + on_connection_lost: ctx + .link() + .callback(|_| Msg::from(WsAction::Lost(None))), + }; + match Connection::connect(webtransport, options, self.aes.clone()) { + Ok(connection) => { + self.connection = Some(connection); + } + Err(e) => { + ctx.link() + .send_message(WsAction::Log(format!("Connection failed: {e}"))); + } + } + + true + } + WsAction::Connected => { + info!("Connected"); + if self.e2ee_enabled { + self.send_public_key(ctx); } true } - WsAction::Connected => true, WsAction::Log(msg) => { warn!("{}", msg); false } WsAction::Lost(_reason) => { warn!("Lost"); - ctx.link().send_message(WsAction::Connect); + self.connection = None; + ctx.link() + .send_message(WsAction::Connect(self.webtransport_enabled)); true } WsAction::RequestMediaPermissions => { self.media_device_access.request(); - ctx.link().send_message(WsAction::Connect); + ctx.link() + .send_message(WsAction::Connect(self.webtransport_enabled)); false } WsAction::MediaPermissionsGranted => { self.error = None; - ctx.link().send_message(WsAction::Connect); true } WsAction::MediaPermissionsError(error) => { @@ -181,8 +281,101 @@ impl Component for AttendantsComponent { true } }, - Msg::OnPeerAdded(_email) => true, + Msg::OnPeerAdded(_email) => { + debug!("New peer arrived."); + if self.e2ee_enabled { + self.send_public_key(ctx); + } + true + } Msg::OnFirstFrame((_email, media_type)) => matches!(media_type, MediaType::SCREEN), + Msg::OnInboundMedia(response) => { + match response.packet_type.enum_value() { + Ok(PacketType::AES_KEY) => { + if !self.e2ee_enabled { + return false; + } + debug!("Received AES_KEY {}", &response.email); + if let Ok(bytes) = self.rsa.decrypt(&response.data) { + match AesPacket::parse_from_bytes(&bytes) { + Ok(aes_packet) => { + self.peer_keys.insert( + response.email, + Aes128State::from_vecs( + aes_packet.key, + aes_packet.iv, + self.e2ee_enabled, + ), + ); + } + Err(e) => { + error!("Failed to parse aes packet: {}", e.to_string()) + } + } + } + return false; + } + Ok(PacketType::RSA_PUB_KEY) => { + if !self.e2ee_enabled { + return false; + } + debug!("Received RSA_PUB_KEY"); + let encrypted_aes_packet = parse_rsa_packet(&response.data) + .and_then(parse_public_key) + .and_then(|pub_key| { + self.serialize_aes_packet() + .map(|aes_packet| (aes_packet, pub_key)) + }) + .and_then(|(aes_packet, pub_key)| { + self.encrypt_aes_packet(&aes_packet, &pub_key) + }); + + match encrypted_aes_packet { + Ok(data) => { + ctx.link() + .send_message(Msg::OnOutboundPacket(PacketWrapper { + packet_type: PacketType::AES_KEY.into(), + email: ctx.props().email.clone(), + data, + ..Default::default() + })); + } + Err(e) => { + error!("Failed to send AES_KEY to peer: {}", e.to_string()); + } + } + } + Ok(PacketType::MEDIA) => { + let email = response.email.clone(); + if self.e2ee_enabled { + if let Some(key) = self.peer_keys.get(&email) { + if let Err(e) = + self.peer_decode_manager.decode(response, Some(*key)) + { + error!("error decoding packet: {}", e.to_string()); + self.peer_decode_manager.delete_peer(&email); + self.peer_keys.remove(&email); + } + } else { + debug!("No key found for peer"); + self.send_public_key(ctx) + } + } else if let Err(e) = self.peer_decode_manager.decode(response, None) { + error!("error decoding packet: {}", e.to_string()); + self.peer_decode_manager.delete_peer(&email); + self.peer_keys.remove(&email); + } + } + Err(_) => {} + } + false + } + Msg::OnOutboundPacket(media) => { + if let Some(connection) = &self.connection { + connection.send_packet(media); + } + false + } Msg::MeetingAction(action) => { match action { MeetingAction::ToggleScreenShare => { @@ -202,10 +395,11 @@ impl Component for AttendantsComponent { fn view(&self, ctx: &Context) -> Html { let email = ctx.props().email.clone(); + let on_packet = ctx.link().callback(Msg::OnOutboundPacket); let media_access_granted = self.media_device_access.is_granted(); let rows: Vec = self - .client - .sorted_peer_keys() + .peer_decode_manager + .sorted_keys() .iter() .map(|key| { if !USERS_ALLOWED_TO_STREAM.is_empty() @@ -213,7 +407,12 @@ impl Component for AttendantsComponent { { return html! {}; } - let screen_share_css = if self.client.is_awaiting_peer_screen_frame(key) { + let peer = match self.peer_decode_manager.get(key) { + Some(peer) => peer, + None => return html! {}, + }; + + let screen_share_css = if peer.screen.is_waiting_for_keyframe() { "grid-item hidden" } else { "grid-item" @@ -278,20 +477,20 @@ impl Component for AttendantsComponent { { if media_access_granted { - html! {} + html! {} } else { html! {<>} } }

{email}

- {if !self.client.is_connected() { + {if !self.is_connected() { html! {

{"Connecting"}

} } else { html! {

{"Connected"}

} }} - {if ctx.props().e2ee_enabled { + {if self.e2ee_enabled { html! {

{"End to End Encryption Enabled"}

} } else { html! {

{"End to End Encryption Disabled"}

} @@ -356,3 +555,13 @@ fn toggle_pinned_div(div_id: &str) { } } } + +fn parse_rsa_packet(response_data: &[u8]) -> Result { + RsaPacket::parse_from_bytes(response_data) + .map_err(|e| anyhow!("Failed to parse rsa packet: {}", e.to_string())) +} + +fn parse_public_key(rsa_packet: RsaPacket) -> Result { + RsaPublicKey::from_public_key_der(&rsa_packet.public_key_der) + .map_err(|e| anyhow!("Failed to parse rsa public key: {}", e.to_string())) +} diff --git a/yew-ui/src/components/host.rs b/yew-ui/src/components/host.rs index 5975cbad..d0eb6a75 100644 --- a/yew-ui/src/components/host.rs +++ b/yew-ui/src/components/host.rs @@ -1,11 +1,13 @@ -use crate::model::client::VideoCallClient; use gloo_timers::callback::Timeout; use log::debug; +use types::protos::packet_wrapper::PacketWrapper; use std::fmt::Debug; +use std::sync::Arc; use yew::prelude::*; use crate::components::device_selector::DeviceSelector; +use crate::crypto::aes::Aes128State; use crate::model::encode::CameraEncoder; use crate::model::encode::MicrophoneEncoder; use crate::model::encode::ScreenEncoder; @@ -38,13 +40,19 @@ pub struct MeetingProps { #[prop_or_default] pub id: String, - pub client: VideoCallClient, + #[prop_or_default] + pub on_packet: Callback, + + #[prop_or_default] + pub email: String, pub share_screen: bool, pub mic_enabled: bool, pub video_enabled: bool, + + pub aes: Arc, } impl Component for Host { @@ -52,11 +60,11 @@ impl Component for Host { type Properties = MeetingProps; fn create(ctx: &Context) -> Self { - let client = &ctx.props().client; + let aes = ctx.props().aes.clone(); Self { - camera: CameraEncoder::new(client.clone(), VIDEO_ELEMENT_ID), - microphone: MicrophoneEncoder::new(client.clone()), - screen: ScreenEncoder::new(client.clone()), + camera: CameraEncoder::new(aes.clone()), + microphone: MicrophoneEncoder::new(aes.clone()), + screen: ScreenEncoder::new(aes), share_screen: ctx.props().share_screen, mic_enabled: ctx.props().mic_enabled, video_enabled: ctx.props().video_enabled, @@ -99,7 +107,10 @@ impl Component for Host { fn update(&mut self, ctx: &Context, msg: Self::Message) -> bool { match msg { Msg::EnableScreenShare => { - self.screen.start(); + let on_frame = ctx.props().on_packet.clone(); + let email = ctx.props().email.clone(); + self.screen + .start(email, move |packet: PacketWrapper| on_frame.emit(packet)); true } Msg::DisableScreenShare => { @@ -111,7 +122,10 @@ impl Component for Host { if !should_enable { return true; } - self.microphone.start(); + let on_audio = ctx.props().on_packet.clone(); + let email = ctx.props().email.clone(); + self.microphone + .start(email, move |packet: PacketWrapper| on_audio.emit(packet)); true } Msg::DisableMicrophone => { @@ -123,7 +137,13 @@ impl Component for Host { return true; } - self.camera.start(); + let on_packet = ctx.props().on_packet.clone(); + let email = ctx.props().email.clone(); + self.camera.start( + email, + move |packet: PacketWrapper| on_packet.emit(packet), + VIDEO_ELEMENT_ID, + ); true } Msg::DisableVideo => { diff --git a/yew-ui/src/model/client/mod.rs b/yew-ui/src/model/client/mod.rs deleted file mode 100644 index 2b5a0cb9..00000000 --- a/yew-ui/src/model/client/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -mod video_call_client; - -pub use video_call_client::{VideoCallClient, VideoCallClientOptions}; diff --git a/yew-ui/src/model/client/video_call_client.rs b/yew-ui/src/model/client/video_call_client.rs deleted file mode 100644 index 77eecfcf..00000000 --- a/yew-ui/src/model/client/video_call_client.rs +++ /dev/null @@ -1,335 +0,0 @@ -use super::super::connection::{ConnectOptions, Connection}; -use super::super::decode::{PeerDecodeManager, PeerStatus}; -use crate::crypto::aes::Aes128State; -use crate::crypto::rsa::RsaWrapper; -use anyhow::{anyhow, Result}; -use log::{debug, error, info}; -use protobuf::Message; -use rsa::pkcs8::{DecodePublicKey, EncodePublicKey}; -use rsa::RsaPublicKey; -use std::cell::RefCell; -use std::rc::{Rc, Weak}; -use std::sync::Arc; -use types::protos::aes_packet::AesPacket; -use types::protos::media_packet::media_packet::MediaType; -use types::protos::packet_wrapper::packet_wrapper::PacketType; -use types::protos::packet_wrapper::PacketWrapper; -use types::protos::rsa_packet::RsaPacket; -use yew::prelude::Callback; -#[derive(Clone, Debug, PartialEq)] -pub struct VideoCallClientOptions { - pub enable_e2ee: bool, - pub enable_webtransport: bool, - pub on_peer_added: Callback, - pub on_peer_first_frame: Callback<(String, MediaType)>, - pub get_peer_video_canvas_id: Callback, - pub get_peer_screen_canvas_id: Callback, - pub userid: String, - pub websocket_url: String, - pub webtransport_url: String, - pub on_connected: Callback<()>, - pub on_connection_lost: Callback<()>, -} - -#[derive(Debug)] -struct InnerOptions { - enable_e2ee: bool, - userid: String, - on_peer_added: Callback, -} - -#[derive(Debug)] -struct Inner { - options: InnerOptions, - connection: Option, - aes: Arc, - rsa: Arc, - peer_decode_manager: PeerDecodeManager, -} - -#[derive(Clone, Debug)] -pub struct VideoCallClient { - options: VideoCallClientOptions, - inner: Rc>, - aes: Arc, -} - -impl PartialEq for VideoCallClient { - fn eq(&self, other: &Self) -> bool { - Rc::ptr_eq(&self.inner, &other.inner) && self.options == other.options - } -} - -impl VideoCallClient { - pub fn new(options: VideoCallClientOptions) -> Self { - let aes = Arc::new(Aes128State::new(options.enable_e2ee)); - let inner = Rc::new(RefCell::new(Inner { - options: InnerOptions { - enable_e2ee: options.enable_e2ee, - userid: options.userid.clone(), - on_peer_added: options.on_peer_added.clone(), - }, - connection: None, - aes: aes.clone(), - rsa: Arc::new(RsaWrapper::new(options.enable_e2ee)), - peer_decode_manager: Self::create_peer_decoder_manager(&options), - })); - Self { - options, - aes, - inner, - } - } - - pub fn connect(&mut self) -> anyhow::Result<()> { - let options = ConnectOptions { - userid: self.options.userid.clone(), - websocket_url: self.options.websocket_url.clone(), - webtransport_url: self.options.webtransport_url.clone(), - on_inbound_media: { - let inner = Rc::downgrade(&self.inner); - Callback::from(move |packet| { - if let Some(inner) = Weak::upgrade(&inner) { - match inner.try_borrow_mut() { - Ok(mut inner) => inner.on_inbound_media(packet), - Err(_) => { - error!( - "Unable to borrow inner -- dropping receive packet {:?}", - packet - ); - } - } - } - }) - }, - on_connected: { - let inner = Rc::downgrade(&self.inner); - let callback = self.options.on_connected.clone(); - Callback::from(move |_| { - if let Some(inner) = Weak::upgrade(&inner) { - match inner.try_borrow() { - Ok(inner) => inner.send_public_key(), - Err(_) => { - error!("Unable to borrow inner -- not sending public key"); - } - } - } - callback.emit(()); - }) - }, - on_connection_lost: self.options.on_connection_lost.clone(), - }; - info!( - "webtransport connect = {}", - self.options.enable_webtransport - ); - info!( - "end to end encryption enabled = {}", - self.options.enable_e2ee - ); - - let mut borrowed = self.inner.try_borrow_mut()?; - borrowed.connection.replace(Connection::connect( - self.options.enable_webtransport, - options, - self.aes.clone(), - )?); - Ok(()) - } - - fn create_peer_decoder_manager(opts: &VideoCallClientOptions) -> PeerDecodeManager { - let mut peer_decode_manager = PeerDecodeManager::new(); - peer_decode_manager.on_first_frame = opts.on_peer_first_frame.clone(); - peer_decode_manager.get_video_canvas_id = opts.get_peer_video_canvas_id.clone(); - peer_decode_manager.get_screen_canvas_id = opts.get_peer_screen_canvas_id.clone(); - peer_decode_manager - } - - pub fn send_packet(&self, media: PacketWrapper) { - match self.inner.try_borrow() { - Ok(inner) => inner.send_packet(media), - Err(_) => { - error!("Unable to borrow inner -- dropping send packet {:?}", media) - } - } - } - - pub fn is_connected(&self) -> bool { - if let Ok(inner) = self.inner.try_borrow() { - if let Some(connection) = &inner.connection { - return connection.is_connected(); - } - }; - false - } - - pub fn sorted_peer_keys(&self) -> Vec { - match self.inner.try_borrow() { - Ok(inner) => inner.peer_decode_manager.sorted_keys().to_vec(), - Err(_) => Vec::::new(), - } - } - - pub fn is_awaiting_peer_screen_frame(&self, key: &String) -> bool { - if let Ok(inner) = self.inner.try_borrow() { - if let Some(peer) = inner.peer_decode_manager.get(key) { - return peer.screen.is_waiting_for_keyframe(); - } - } - false - } - - pub fn aes(&self) -> Arc { - self.aes.clone() - } - - pub fn userid(&self) -> &String { - &self.options.userid - } -} - -impl Inner { - fn send_packet(&self, media: PacketWrapper) { - if let Some(connection) = &self.connection { - connection.send_packet(media); - } - } - - fn on_inbound_media(&mut self, response: PacketWrapper) { - debug!( - "<< Received {:?} from {}", - response.packet_type.enum_value(), - response.email - ); - let peer_status = self.peer_decode_manager.ensure_peer(&response.email); - match response.packet_type.enum_value() { - Ok(PacketType::AES_KEY) => { - if !self.options.enable_e2ee { - return; - } - if let Ok(bytes) = self.rsa.decrypt(&response.data) { - debug!("Decrypted AES_KEY from {}", response.email); - match AesPacket::parse_from_bytes(&bytes) { - Ok(aes_packet) => { - if let Err(e) = self.peer_decode_manager.set_peer_aes( - &response.email, - Aes128State::from_vecs( - aes_packet.key, - aes_packet.iv, - self.options.enable_e2ee, - ), - ) { - error!("Failed to set peer aes: {}", e.to_string()); - } - } - Err(e) => { - error!("Failed to parse aes packet: {}", e.to_string()); - } - } - } - } - Ok(PacketType::RSA_PUB_KEY) => { - if !self.options.enable_e2ee { - return; - } - let encrypted_aes_packet = parse_rsa_packet(&response.data) - .and_then(parse_public_key) - .and_then(|pub_key| { - self.serialize_aes_packet() - .map(|aes_packet| (aes_packet, pub_key)) - }) - .and_then(|(aes_packet, pub_key)| { - self.encrypt_aes_packet(&aes_packet, &pub_key) - }); - - match encrypted_aes_packet { - Ok(data) => { - debug!(">> {} sending AES key", self.options.userid); - self.send_packet(PacketWrapper { - packet_type: PacketType::AES_KEY.into(), - email: self.options.userid.clone(), - data, - ..Default::default() - }); - } - Err(e) => { - error!("Failed to send AES_KEY to peer: {}", e.to_string()); - } - } - } - Ok(PacketType::MEDIA) => { - let email = response.email.clone(); - if let Err(e) = self.peer_decode_manager.decode(response) { - error!("error decoding packet: {}", e.to_string()); - self.peer_decode_manager.delete_peer(&email); - } - } - Err(_) => {} - } - if let PeerStatus::Added(peer_userid) = peer_status { - debug!("added peer {}", peer_userid); - self.send_public_key(); - self.options.on_peer_added.emit(peer_userid); - } - } - - fn send_public_key(&self) { - if !self.options.enable_e2ee { - return; - } - let userid = self.options.userid.clone(); - let rsa = &*self.rsa; - match rsa.pub_key.to_public_key_der() { - Ok(public_key_der) => { - let packet = RsaPacket { - username: userid.clone(), - public_key_der: public_key_der.to_vec(), - ..Default::default() - }; - match packet.write_to_bytes() { - Ok(data) => { - debug!(">> {} sending public key", userid); - self.send_packet(PacketWrapper { - packet_type: PacketType::RSA_PUB_KEY.into(), - email: userid, - data, - ..Default::default() - }); - } - Err(e) => { - error!("Failed to serialize rsa packet: {}", e.to_string()); - } - } - } - Err(e) => { - error!("Failed to export rsa public key to der: {}", e.to_string()); - } - } - } - - fn serialize_aes_packet(&self) -> Result> { - AesPacket { - key: self.aes.key.to_vec(), - iv: self.aes.iv.to_vec(), - ..Default::default() - } - .write_to_bytes() - .map_err(|e| anyhow!("Failed to serialize aes packet: {}", e.to_string())) - } - - fn encrypt_aes_packet(&self, aes_packet: &[u8], pub_key: &RsaPublicKey) -> Result> { - self.rsa - .encrypt_with_key(aes_packet, pub_key) - .map_err(|e| anyhow!("Failed to encrypt aes packet: {}", e.to_string())) - } -} - -fn parse_rsa_packet(response_data: &[u8]) -> Result { - RsaPacket::parse_from_bytes(response_data) - .map_err(|e| anyhow!("Failed to parse rsa packet: {}", e.to_string())) -} - -fn parse_public_key(rsa_packet: RsaPacket) -> Result { - RsaPublicKey::from_public_key_der(&rsa_packet.public_key_der) - .map_err(|e| anyhow!("Failed to parse rsa public key: {}", e.to_string())) -} diff --git a/yew-ui/src/model/connection/connection.rs b/yew-ui/src/model/connection/connection.rs index 6df610e5..cecf688e 100644 --- a/yew-ui/src/model/connection/connection.rs +++ b/yew-ui/src/model/connection/connection.rs @@ -15,14 +15,13 @@ use types::protos::packet_wrapper::packet_wrapper::PacketType; use types::protos::packet_wrapper::PacketWrapper; use yew::prelude::Callback; -#[derive(Clone, Copy, Debug)] +#[derive(Clone, Copy)] enum Status { Connecting, Connected, Closed, } -#[derive(Debug)] pub struct Connection { task: Arc, heartbeat: Option, diff --git a/yew-ui/src/model/connection/task.rs b/yew-ui/src/model/connection/task.rs index 425f9c22..e34dc003 100644 --- a/yew-ui/src/model/connection/task.rs +++ b/yew-ui/src/model/connection/task.rs @@ -10,7 +10,6 @@ use yew_webtransport::webtransport::WebTransportTask; use super::webmedia::{ConnectOptions, WebMedia}; -#[derive(Debug)] pub(super) enum Task { WebSocket(WebSocketTask), WebTransport(WebTransportTask), diff --git a/yew-ui/src/model/connection/webtransport.rs b/yew-ui/src/model/connection/webtransport.rs index 8f35353f..c5bfb5f8 100644 --- a/yew-ui/src/model/connection/webtransport.rs +++ b/yew-ui/src/model/connection/webtransport.rs @@ -32,36 +32,32 @@ enum MessageType { impl WebMedia for WebTransportTask { fn connect(options: ConnectOptions) -> anyhow::Result { let on_datagram = { - let callback = options.on_inbound_media.clone(); + let on_inbound_media = options.on_inbound_media.clone(); Callback::from(move |bytes: Vec| { - emit_packet(bytes, MessageType::Datagram, callback.clone()) + emit_packet(bytes, MessageType::Datagram, on_inbound_media.clone()) }) }; let on_unidirectional_stream = { - let callback = options.on_inbound_media.clone(); + let on_inbound_media = options.on_inbound_media.clone(); Callback::from(move |stream: WebTransportReceiveStream| { - handle_unidirectional_stream(stream, callback.clone()) + handle_unidirectional_stream(stream, on_inbound_media.clone()) }) }; let on_bidirectional_stream = { - let callback = options.on_inbound_media.clone(); + let on_inbound_media = options.on_inbound_media.clone(); Callback::from(move |stream: WebTransportBidirectionalStream| { - handle_bidirectional_stream(stream, callback.clone()) + handle_bidirectional_stream(stream, on_inbound_media.clone()) }) }; - let notification = { - let connected_callback = options.on_connected.clone(); - let connection_lost_callback = options.on_connection_lost.clone(); - Callback::from(move |status| match status { - WebTransportStatus::Opened => connected_callback.emit(()), - WebTransportStatus::Closed(_error) | WebTransportStatus::Error(_error) => { - connection_lost_callback.emit(()) - } - }) - }; + let notification = Callback::from(move |status| match status { + WebTransportStatus::Opened => options.on_connected.emit(()), + WebTransportStatus::Closed(_error) | WebTransportStatus::Error(_error) => { + options.on_connection_lost.emit(()) + } + }); debug!("WebTransport connecting to {}", &options.webtransport_url); let task = WebTransportService::connect( &options.webtransport_url, diff --git a/yew-ui/src/model/decode/hash_map_with_ordered_keys.rs b/yew-ui/src/model/decode/hash_map_with_ordered_keys.rs deleted file mode 100644 index 064e19b2..00000000 --- a/yew-ui/src/model/decode/hash_map_with_ordered_keys.rs +++ /dev/null @@ -1,77 +0,0 @@ -use std::borrow::Borrow; -use std::cmp::Ord; -use std::collections::HashMap; -use std::hash::Hash; - -#[derive(Debug)] -pub struct HashMapWithOrderedKeys { - map: HashMap, - keys: Vec, -} - -// -// Only implementing the methods that are needed in peer_decode_manager -// -impl HashMapWithOrderedKeys { - pub fn new() -> Self { - Self { - map: HashMap::new(), - keys: vec![], - } - } - - // - // Delegated methods - // - - pub fn get(&self, k: &Q) -> Option<&V> - where - K: Borrow, - Q: Hash + Eq + ?Sized, - { - self.map.get(k) - } - - pub fn get_mut(&mut self, k: &Q) -> Option<&mut V> - where - K: Borrow, - Q: Hash + Eq + ?Sized, - { - self.map.get_mut(k) - } - - pub fn contains_key(&self, k: &Q) -> bool - where - K: Borrow, - Q: Hash + Eq + ?Sized, - { - self.map.contains_key(k) - } - - // - // Delegated methods with extra handling to maintain ordered keys - // - - pub fn insert(&mut self, k: K, v: V) -> Option { - self.map.insert(k.clone(), v).or_else(|| { - self.keys.push(k); - self.keys.sort(); - None - }) - } - - pub fn remove(&mut self, k: &K) -> Option { - if let Ok(index) = self.keys.binary_search(k) { - self.keys.remove(index); - } - self.map.remove(k) - } - - // - // New methods - // - - pub fn ordered_keys(&self) -> &Vec { - &self.keys - } -} diff --git a/yew-ui/src/model/decode/mod.rs b/yew-ui/src/model/decode/mod.rs index 23c1ad1b..74fa78eb 100644 --- a/yew-ui/src/model/decode/mod.rs +++ b/yew-ui/src/model/decode/mod.rs @@ -1,8 +1,7 @@ mod config; -mod hash_map_with_ordered_keys; mod peer_decode_manager; mod peer_decoder; mod video_decoder_with_buffer; mod video_decoder_wrapper; -pub use peer_decode_manager::{Peer, PeerDecodeError, PeerDecodeManager, PeerStatus}; +pub use peer_decode_manager::{MultiDecoder, MultiDecoderError, PeerDecodeManager}; diff --git a/yew-ui/src/model/decode/peer_decode_manager.rs b/yew-ui/src/model/decode/peer_decode_manager.rs index 66d3049d..6197230c 100644 --- a/yew-ui/src/model/decode/peer_decode_manager.rs +++ b/yew-ui/src/model/decode/peer_decode_manager.rs @@ -1,6 +1,6 @@ -use super::hash_map_with_ordered_keys::HashMapWithOrderedKeys; use log::debug; use protobuf::Message; +use std::collections::HashMap; use std::{fmt::Display, sync::Arc}; use types::protos::media_packet::MediaPacket; use types::protos::packet_wrapper::packet_wrapper::PacketType; @@ -12,136 +12,106 @@ use crate::crypto::aes::Aes128State; use super::peer_decoder::{AudioPeerDecoder, DecodeStatus, PeerDecode, VideoPeerDecoder}; #[derive(Debug)] -pub enum PeerDecodeError { +pub enum MultiDecoderError { AesDecryptError, IncorrectPacketType, AudioDecodeError, ScreenDecodeError, VideoDecodeError, - NoSuchPeer(String), - NoMediaType, - NoPacketType, - PacketParseError, + Other(String), } -#[derive(Debug)] -pub enum PeerStatus { - Added(String), - NoChange, -} - -impl Display for PeerDecodeError { +impl Display for MultiDecoderError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - PeerDecodeError::AesDecryptError => write!(f, "AesDecryptError"), - PeerDecodeError::IncorrectPacketType => write!(f, "IncorrectPacketType"), - PeerDecodeError::AudioDecodeError => write!(f, "AudioDecodeError"), - PeerDecodeError::ScreenDecodeError => write!(f, "ScreenDecodeError"), - PeerDecodeError::VideoDecodeError => write!(f, "VideoDecodeError"), - PeerDecodeError::NoSuchPeer(s) => write!(f, "Peer Not Found: {s}"), - PeerDecodeError::NoMediaType => write!(f, "No media_type"), - PeerDecodeError::NoPacketType => write!(f, "No packet_type"), - PeerDecodeError::PacketParseError => { - write!(f, "Failed to parse to protobuf MediaPacket") - } + MultiDecoderError::AesDecryptError => write!(f, "AesDecryptError"), + MultiDecoderError::IncorrectPacketType => write!(f, "IncorrectPacketType"), + MultiDecoderError::AudioDecodeError => write!(f, "AudioDecodeError"), + MultiDecoderError::ScreenDecodeError => write!(f, "ScreenDecodeError"), + MultiDecoderError::VideoDecodeError => write!(f, "VideoDecodeError"), + MultiDecoderError::Other(s) => write!(f, "Other: {s}"), } } } -#[derive(Debug)] -pub struct Peer { +pub struct MultiDecoder { pub audio: AudioPeerDecoder, pub video: VideoPeerDecoder, pub screen: VideoPeerDecoder, pub email: String, - pub video_canvas_id: String, - pub screen_canvas_id: String, pub aes: Option, } -impl Peer { +impl MultiDecoder { fn new( video_canvas_id: String, screen_canvas_id: String, email: String, aes: Option, ) -> Self { - let (audio, video, screen) = Self::new_decoders(&video_canvas_id, &screen_canvas_id); Self { - audio, - video, - screen, + audio: AudioPeerDecoder::new(), + video: VideoPeerDecoder::new(&video_canvas_id), + screen: VideoPeerDecoder::new(&screen_canvas_id), email, - video_canvas_id, - screen_canvas_id, aes, } } - fn new_decoders( - video_canvas_id: &str, - screen_canvas_id: &str, - ) -> (AudioPeerDecoder, VideoPeerDecoder, VideoPeerDecoder) { - ( - AudioPeerDecoder::new(), - VideoPeerDecoder::new(video_canvas_id), - VideoPeerDecoder::new(screen_canvas_id), - ) - } - - fn reset(&mut self) { - let (audio, video, screen) = - Self::new_decoders(&self.video_canvas_id, &self.screen_canvas_id); - self.audio = audio; - self.video = video; - self.screen = screen; - } - fn decode( &mut self, packet: &Arc, - ) -> Result<(MediaType, DecodeStatus), PeerDecodeError> { + ) -> Result<(MediaType, DecodeStatus), MultiDecoderError> { if packet .packet_type .enum_value() - .map_err(|_| PeerDecodeError::NoPacketType)? + .map_err(|_e| MultiDecoderError::Other(String::from("No packet_type")))? != PacketType::MEDIA { - return Err(PeerDecodeError::IncorrectPacketType); + return Err(MultiDecoderError::IncorrectPacketType); } - let packet = match self.aes { - Some(aes) => { + let packet = { + if let Some(aes) = self.aes { let data = aes .decrypt(&packet.data) - .map_err(|_| PeerDecodeError::AesDecryptError)?; - parse_media_packet(&data)? + .map_err(|_e| MultiDecoderError::AesDecryptError)?; + Arc::new(MediaPacket::parse_from_bytes(&data).map_err(|_e| { + MultiDecoderError::Other(String::from( + "Failed to parse to protobuf MediaPacket", + )) + })?) + } else { + Arc::new(MediaPacket::parse_from_bytes(&packet.data).map_err(|_e| { + MultiDecoderError::Other(String::from( + "Failed to parse to protobuf MediaPacket", + )) + })?) } - None => parse_media_packet(&packet.data)?, }; let media_type = packet .media_type .enum_value() - .map_err(|_| PeerDecodeError::NoMediaType)?; + .map_err(|_e| MultiDecoderError::Other(String::from("No media_type")))?; match media_type { MediaType::VIDEO => Ok(( media_type, self.video .decode(&packet) - .map_err(|_| PeerDecodeError::VideoDecodeError)?, + .map_err(|_e| MultiDecoderError::VideoDecodeError)?, )), MediaType::AUDIO => Ok(( media_type, self.audio .decode(&packet) - .map_err(|_| PeerDecodeError::AudioDecodeError)?, + .map_err(|_e| MultiDecoderError::AudioDecodeError)?, )), MediaType::SCREEN => Ok(( media_type, self.screen .decode(&packet) - .map_err(|_| PeerDecodeError::ScreenDecodeError)?, + .map_err(|_e| MultiDecoderError::ScreenDecodeError)?, )), MediaType::HEARTBEAT => Ok(( media_type, @@ -154,15 +124,10 @@ impl Peer { } } -fn parse_media_packet(data: &[u8]) -> Result, PeerDecodeError> { - Ok(Arc::new( - MediaPacket::parse_from_bytes(data).map_err(|_| PeerDecodeError::PacketParseError)?, - )) -} - -#[derive(Debug)] pub struct PeerDecodeManager { - connected_peers: HashMapWithOrderedKeys, + connected_peers: HashMap, + sorted_connected_peers_keys: Vec, + pub on_peer_added: Callback, pub on_first_frame: Callback<(String, MediaType)>, pub get_video_canvas_id: Callback, pub get_screen_canvas_id: Callback, @@ -171,7 +136,9 @@ pub struct PeerDecodeManager { impl PeerDecodeManager { pub fn new() -> Self { Self { - connected_peers: HashMapWithOrderedKeys::new(), + connected_peers: HashMap::new(), + sorted_connected_peers_keys: vec![], + on_peer_added: Callback::noop(), on_first_frame: Callback::noop(), get_video_canvas_id: Callback::from(|key| format!("video-{}", &key)), get_screen_canvas_id: Callback::from(|key| format!("screen-{}", &key)), @@ -179,16 +146,23 @@ impl PeerDecodeManager { } pub fn sorted_keys(&self) -> &Vec { - self.connected_peers.ordered_keys() + &self.sorted_connected_peers_keys } - pub fn get(&self, key: &String) -> Option<&Peer> { + pub fn get(&self, key: &String) -> Option<&MultiDecoder> { self.connected_peers.get(key) } - pub fn decode(&mut self, response: PacketWrapper) -> Result<(), PeerDecodeError> { + pub fn decode( + &mut self, + response: PacketWrapper, + aes: Option, + ) -> Result<(), MultiDecoderError> { let packet = Arc::new(response); let email = packet.email.clone(); + if !self.connected_peers.contains_key(&email) { + self.add_peer(&email, aes); + } if let Some(peer) = self.connected_peers.get_mut(&email) { match peer.decode(&packet) { Ok((media_type, decode_status)) => { @@ -198,52 +172,44 @@ impl PeerDecodeManager { Ok(()) } Err(e) => { - peer.reset(); + self.reset_peer(&email, aes); Err(e) } } } else { - Err(PeerDecodeError::NoSuchPeer(email.clone())) + Err(MultiDecoderError::Other(String::from("No peer found"))) } } fn add_peer(&mut self, email: &str, aes: Option) { debug!("Adding peer {}", email); + self.insert_peer(email, aes); + self.on_peer_added.emit(email.to_owned()) + } + + fn insert_peer(&mut self, email: &str, aes: Option) { self.connected_peers.insert( email.to_owned(), - Peer::new( + MultiDecoder::new( self.get_video_canvas_id.emit(email.to_owned()), self.get_screen_canvas_id.emit(email.to_owned()), email.to_owned(), aes, ), ); + self.sorted_connected_peers_keys.push(email.to_owned()); + self.sorted_connected_peers_keys.sort(); } pub fn delete_peer(&mut self, email: &String) { self.connected_peers.remove(email); - } - - pub fn ensure_peer(&mut self, email: &String) -> PeerStatus { - if self.connected_peers.contains_key(email) { - PeerStatus::NoChange - } else { - self.add_peer(email, None); - PeerStatus::Added(email.clone()) + if let Ok(index) = self.sorted_connected_peers_keys.binary_search(email) { + self.sorted_connected_peers_keys.remove(index); } } - pub fn set_peer_aes( - &mut self, - email: &String, - aes: Aes128State, - ) -> Result<(), PeerDecodeError> { - match self.connected_peers.get_mut(email) { - Some(peer) => { - peer.aes = Some(aes); - Ok(()) - } - None => Err(PeerDecodeError::NoSuchPeer(email.clone())), - } + fn reset_peer(&mut self, email: &String, aes: Option) { + self.delete_peer(email); + self.insert_peer(email, aes); } } diff --git a/yew-ui/src/model/decode/peer_decoder.rs b/yew-ui/src/model/decode/peer_decoder.rs index 460ba4ac..ba282398 100644 --- a/yew-ui/src/model/decode/peer_decoder.rs +++ b/yew-ui/src/model/decode/peer_decoder.rs @@ -10,7 +10,6 @@ // and each one's new() contains the type-specific creation/configuration code. // -use super::super::wrappers::EncodedVideoChunkTypeWrapper; use super::config::configure_audio_context; use super::video_decoder_with_buffer::VideoDecoderWithBuffer; use super::video_decoder_wrapper::VideoDecoderWrapper; @@ -18,6 +17,7 @@ use crate::constants::AUDIO_CHANNELS; use crate::constants::AUDIO_CODEC; use crate::constants::AUDIO_SAMPLE_RATE; use crate::constants::VIDEO_CODEC; +use crate::model::EncodedVideoChunkTypeWrapper; use log::error; use std::sync::Arc; use types::protos::media_packet::MediaPacket; @@ -43,7 +43,6 @@ pub struct DecodeStatus { // // Generic type for decoders captures common functionality. // -#[derive(Debug)] pub struct PeerDecoder { decoder: WebDecoder, waiting_for_keyframe: bool, diff --git a/yew-ui/src/model/decode/video_decoder_with_buffer.rs b/yew-ui/src/model/decode/video_decoder_with_buffer.rs index 5e795008..5e59dee9 100644 --- a/yew-ui/src/model/decode/video_decoder_with_buffer.rs +++ b/yew-ui/src/model/decode/video_decoder_with_buffer.rs @@ -1,15 +1,17 @@ -use super::super::wrappers::EncodedVideoChunkTypeWrapper; -use super::video_decoder_wrapper::VideoDecoderTrait; use std::{cmp::Ordering, collections::BTreeMap, sync::Arc}; + use types::protos::media_packet::MediaPacket; use wasm_bindgen::JsValue; use web_sys::{CodecState, EncodedVideoChunkType, VideoDecoderConfig, VideoDecoderInit}; +use crate::model::EncodedVideoChunkTypeWrapper; + +use super::video_decoder_wrapper::VideoDecoderTrait; + const MAX_BUFFER_SIZE: usize = 10; // This is a wrapper of the web-sys VideoDecoder which handles // frames being out of order and other issues. -#[derive(Debug)] pub struct VideoDecoderWithBuffer { video_decoder: A, cache: BTreeMap>, diff --git a/yew-ui/src/model/decode/video_decoder_wrapper.rs b/yew-ui/src/model/decode/video_decoder_wrapper.rs index 6a87c2d5..319143e3 100644 --- a/yew-ui/src/model/decode/video_decoder_wrapper.rs +++ b/yew-ui/src/model/decode/video_decoder_wrapper.rs @@ -1,4 +1,4 @@ -use super::super::wrappers::EncodedVideoChunkTypeWrapper; +use crate::model::EncodedVideoChunkTypeWrapper; use js_sys::Uint8Array; use std::sync::Arc; use types::protos::media_packet::MediaPacket; @@ -19,7 +19,6 @@ pub trait VideoDecoderTrait { } // Create a wrapper struct for the foreign struct -#[derive(Debug)] pub struct VideoDecoderWrapper(web_sys::VideoDecoder); // Implement the trait for the wrapper struct diff --git a/yew-ui/src/model/encode/camera_encoder.rs b/yew-ui/src/model/encode/camera_encoder.rs index 8fc09dc1..c31867ac 100644 --- a/yew-ui/src/model/encode/camera_encoder.rs +++ b/yew-ui/src/model/encode/camera_encoder.rs @@ -5,7 +5,7 @@ use js_sys::JsString; use js_sys::Reflect; use log::debug; use log::error; -use std::sync::atomic::Ordering; +use std::sync::{atomic::Ordering, Arc}; use types::protos::packet_wrapper::PacketWrapper; use wasm_bindgen::prelude::Closure; use wasm_bindgen::JsCast; @@ -26,25 +26,23 @@ use web_sys::VideoEncoderInit; use web_sys::VideoFrame; use web_sys::VideoTrack; -use super::super::client::VideoCallClient; use super::encoder_state::EncoderState; use super::transform::transform_video_chunk; use crate::constants::VIDEO_CODEC; use crate::constants::VIDEO_HEIGHT; use crate::constants::VIDEO_WIDTH; +use crate::crypto::aes::Aes128State; pub struct CameraEncoder { - client: VideoCallClient, - video_elem_id: String, + aes: Arc, state: EncoderState, } impl CameraEncoder { - pub fn new(client: VideoCallClient, video_elem_id: &str) -> Self { + pub fn new(aes: Arc) -> Self { Self { - client, - video_elem_id: video_elem_id.to_string(), + aes, state: EncoderState::new(), } } @@ -60,21 +58,28 @@ impl CameraEncoder { self.state.stop() } - pub fn start(&mut self) { + pub fn start( + &mut self, + userid: String, + on_frame: impl Fn(PacketWrapper) + 'static, + video_elem_id: &str, + ) { // 1. Query the first device with a camera and a mic attached. // 2. setup WebCodecs, in particular // 3. send encoded video frames and raw audio to the server. - let client = self.client.clone(); - let userid = client.userid().clone(); - let aes = client.aes(); - let video_elem_id = self.video_elem_id.clone(); + let on_frame = Box::new(on_frame); + let userid = Box::new(userid); + let video_elem_id = video_elem_id.to_string(); let EncoderState { destroy, enabled, switching, .. } = self.state.clone(); + let aes = self.aes.clone(); let video_output_handler = { + let userid = userid; + let on_frame = on_frame; let mut buffer: [u8; 100000] = [0; 100000]; let mut sequence_number = 0; Box::new(move |chunk: JsValue| { @@ -86,7 +91,7 @@ impl CameraEncoder { &userid, aes.clone(), ); - client.send_packet(packet); + on_frame(packet); sequence_number += 1; }) }; diff --git a/yew-ui/src/model/encode/microphone_encoder.rs b/yew-ui/src/model/encode/microphone_encoder.rs index 274f31a6..4555241e 100644 --- a/yew-ui/src/model/encode/microphone_encoder.rs +++ b/yew-ui/src/model/encode/microphone_encoder.rs @@ -4,7 +4,7 @@ use js_sys::Boolean; use js_sys::JsString; use js_sys::Reflect; use log::error; -use std::sync::atomic::Ordering; +use std::sync::{atomic::Ordering, Arc}; use types::protos::packet_wrapper::PacketWrapper; use wasm_bindgen::prelude::Closure; use wasm_bindgen::JsCast; @@ -22,7 +22,6 @@ use web_sys::MediaStreamTrackProcessor; use web_sys::MediaStreamTrackProcessorInit; use web_sys::ReadableStreamDefaultReader; -use super::super::client::VideoCallClient; use super::encoder_state::EncoderState; use super::transform::transform_audio_chunk; @@ -30,16 +29,17 @@ use crate::constants::AUDIO_BITRATE; use crate::constants::AUDIO_CHANNELS; use crate::constants::AUDIO_CODEC; use crate::constants::AUDIO_SAMPLE_RATE; +use crate::crypto::aes::Aes128State; pub struct MicrophoneEncoder { - client: VideoCallClient, + aes: Arc, state: EncoderState, } impl MicrophoneEncoder { - pub fn new(client: VideoCallClient) -> Self { + pub fn new(aes: Arc) -> Self { Self { - client, + aes, state: EncoderState::new(), } } @@ -55,23 +55,23 @@ impl MicrophoneEncoder { self.state.stop() } - pub fn start(&mut self) { + pub fn start(&mut self, userid: String, on_audio: impl Fn(PacketWrapper) + 'static) { let device_id = if let Some(mic) = &self.state.selected { mic.to_string() } else { return; }; - let client = self.client.clone(); - let userid = client.userid().clone(); - let aes = client.aes(); + let aes = self.aes.clone(); let audio_output_handler = { + let email = userid; + let on_audio = on_audio; let mut buffer: [u8; 100000] = [0; 100000]; let mut sequence = 0; Box::new(move |chunk: JsValue| { let chunk = web_sys::EncodedAudioChunk::from(chunk); let packet: PacketWrapper = - transform_audio_chunk(&chunk, &mut buffer, &userid, sequence, aes.clone()); - client.send_packet(packet); + transform_audio_chunk(&chunk, &mut buffer, &email, sequence, aes.clone()); + on_audio(packet); sequence += 1; }) }; diff --git a/yew-ui/src/model/encode/screen_encoder.rs b/yew-ui/src/model/encode/screen_encoder.rs index b62e0482..0a293221 100644 --- a/yew-ui/src/model/encode/screen_encoder.rs +++ b/yew-ui/src/model/encode/screen_encoder.rs @@ -3,7 +3,7 @@ use js_sys::Array; use js_sys::JsString; use js_sys::Reflect; use log::error; -use std::sync::atomic::Ordering; +use std::sync::{atomic::Ordering, Arc}; use types::protos::packet_wrapper::PacketWrapper; use wasm_bindgen::prelude::Closure; use wasm_bindgen::JsCast; @@ -22,23 +22,23 @@ use web_sys::VideoEncoderInit; use web_sys::VideoFrame; use web_sys::VideoTrack; -use super::super::client::VideoCallClient; use super::encoder_state::EncoderState; use super::transform::transform_screen_chunk; use crate::constants::SCREEN_HEIGHT; use crate::constants::SCREEN_WIDTH; use crate::constants::VIDEO_CODEC; +use crate::crypto::aes::Aes128State; pub struct ScreenEncoder { - client: VideoCallClient, + aes: Arc, state: EncoderState, } impl ScreenEncoder { - pub fn new(client: VideoCallClient) -> Self { + pub fn new(aes: Arc) -> Self { Self { - client, + aes, state: EncoderState::new(), } } @@ -51,14 +51,16 @@ impl ScreenEncoder { self.state.stop() } - pub fn start(&mut self) { + pub fn start(&mut self, userid: String, on_frame: impl Fn(PacketWrapper) + 'static) { let EncoderState { enabled, destroy, .. } = self.state.clone(); - let client = self.client.clone(); - let userid = client.userid().clone(); - let aes = client.aes(); + let on_frame = Box::new(on_frame); + let userid = Box::new(userid); + let aes = self.aes.clone(); let screen_output_handler = { + let userid = userid; + let on_frame = on_frame; let mut buffer: [u8; 150000] = [0; 150000]; let mut sequence_number = 0; Box::new(move |chunk: JsValue| { @@ -70,7 +72,7 @@ impl ScreenEncoder { &userid, aes.clone(), ); - client.send_packet(packet); + on_frame(packet); sequence_number += 1; }) }; diff --git a/yew-ui/src/model/mod.rs b/yew-ui/src/model/mod.rs index 290602a0..8bd8aff5 100644 --- a/yew-ui/src/model/mod.rs +++ b/yew-ui/src/model/mod.rs @@ -1,8 +1,9 @@ -pub mod client; pub mod connection; pub mod decode; pub mod encode; pub mod media_devices; pub mod wrappers; -pub use client::{VideoCallClient, VideoCallClientOptions}; +pub use wrappers::{ + AudioSampleFormatWrapper, EncodedAudioChunkTypeWrapper, EncodedVideoChunkTypeWrapper, +};