From a5568630d8d2470663e3f5458a3692aae30cffad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9o=20Monnom?= Date: Thu, 24 Oct 2024 17:36:43 -0700 Subject: [PATCH] ffi: be more flexible with optional options (#474) --- livekit-ffi/protocol/audio_frame.proto | 12 ++-- livekit-ffi/protocol/room.proto | 20 +++--- livekit-ffi/protocol/video_frame.proto | 6 +- livekit-ffi/src/conversion/room.rs | 37 +++++++---- livekit-ffi/src/livekit.proto.rs | 76 +++++++++++----------- livekit-ffi/src/server/audio_source.rs | 2 +- livekit-ffi/src/server/audio_stream.rs | 19 ++---- livekit-ffi/src/server/colorcvt/cvtimpl.rs | 19 ++++++ livekit-ffi/src/server/requests.rs | 11 ++-- livekit-ffi/src/server/video_stream.rs | 6 +- 10 files changed, 115 insertions(+), 93 deletions(-) diff --git a/livekit-ffi/protocol/audio_frame.proto b/livekit-ffi/protocol/audio_frame.proto index d10f3a382..4d70c310f 100644 --- a/livekit-ffi/protocol/audio_frame.proto +++ b/livekit-ffi/protocol/audio_frame.proto @@ -25,8 +25,8 @@ import "track.proto"; message NewAudioStreamRequest { required uint64 track_handle = 1; required AudioStreamType type = 2; - required uint32 sample_rate = 3; - required uint32 num_channels = 4; + optional uint32 sample_rate = 3; + optional uint32 num_channels = 4; } message NewAudioStreamResponse { required OwnedAudioStream stream = 1; } @@ -34,8 +34,8 @@ message AudioStreamFromParticipantRequest { required uint64 participant_handle = 1; required AudioStreamType type = 2; optional TrackSource track_source = 3; - required uint32 sample_rate = 5; - required uint32 num_channels = 6; + optional uint32 sample_rate = 5; + optional uint32 num_channels = 6; } message AudioStreamFromParticipantResponse { required OwnedAudioStream stream = 1; } @@ -46,7 +46,7 @@ message NewAudioSourceRequest { optional AudioSourceOptions options = 2; required uint32 sample_rate = 3; required uint32 num_channels = 4; - required uint32 queue_size_ms = 5; + optional uint32 queue_size_ms = 5; } message NewAudioSourceResponse { required OwnedAudioSource source = 1; } @@ -97,7 +97,7 @@ message NewSoxResamplerRequest { required SoxResamplerDataType input_data_type = 4; required SoxResamplerDataType output_data_type = 5; required SoxQualityRecipe quality_recipe = 6; - required uint32 flags = 7; + optional uint32 flags = 7; } message NewSoxResamplerResponse { oneof message { diff --git a/livekit-ffi/protocol/room.proto b/livekit-ffi/protocol/room.proto index b2b295d08..edae5c007 100644 --- a/livekit-ffi/protocol/room.proto +++ b/livekit-ffi/protocol/room.proto @@ -254,12 +254,12 @@ message TrackPublishOptions { // encodings are optional optional VideoEncoding video_encoding = 1; optional AudioEncoding audio_encoding = 2; - required VideoCodec video_codec = 3; - required bool dtx = 4; - required bool red = 5; - required bool simulcast = 6; - required TrackSource source = 7; - required string stream = 8; + optional VideoCodec video_codec = 3; + optional bool dtx = 4; + optional bool red = 5; + optional bool simulcast = 6; + optional TrackSource source = 7; + optional string stream = 8; } enum IceTransportType { @@ -286,12 +286,12 @@ message RtcConfig { } message RoomOptions { - required bool auto_subscribe = 1; - required bool adaptive_stream = 2; - required bool dynacast = 3; + optional bool auto_subscribe = 1; + optional bool adaptive_stream = 2; + optional bool dynacast = 3; optional E2eeOptions e2ee = 4; optional RtcConfig rtc_config = 5; // allow to setup a custom RtcConfiguration - required uint32 join_retries = 6; + optional uint32 join_retries = 6; } // diff --git a/livekit-ffi/protocol/video_frame.proto b/livekit-ffi/protocol/video_frame.proto index 1291ccc48..6ce0a2bc8 100644 --- a/livekit-ffi/protocol/video_frame.proto +++ b/livekit-ffi/protocol/video_frame.proto @@ -27,7 +27,7 @@ message NewVideoStreamRequest { required VideoStreamType type = 2; // Get the frame on a specific format optional VideoBufferType format = 3; - required bool normalize_stride = 4; // if true, stride will be set to width/chroma_width + optional bool normalize_stride = 4; // if true, stride will be set to width/chroma_width } message NewVideoStreamResponse { required OwnedVideoStream stream = 1; } @@ -37,7 +37,7 @@ message VideoStreamFromParticipantRequest { required VideoStreamType type = 2; required TrackSource track_source = 3; optional VideoBufferType format = 4; - required bool normalize_stride = 5; + optional bool normalize_stride = 5; } message VideoStreamFromParticipantResponse { required OwnedVideoStream stream = 1;} @@ -63,7 +63,7 @@ message CaptureVideoFrameRequest { message CaptureVideoFrameResponse {} message VideoConvertRequest { - required bool flip_y = 1; + optional bool flip_y = 1; required VideoBufferInfo buffer = 2; required VideoBufferType dst_type = 3; } diff --git a/livekit-ffi/src/conversion/room.rs b/livekit-ffi/src/conversion/room.rs index b5295f1b3..2dc32170d 100644 --- a/livekit-ffi/src/conversion/room.rs +++ b/livekit-ffi/src/conversion/room.rs @@ -177,13 +177,12 @@ impl From for RoomOptions { value.rtc_config.map(Into::into).unwrap_or(RoomOptions::default().rtc_config); let mut options = RoomOptions::default(); - options.adaptive_stream = value.adaptive_stream; - options.auto_subscribe = value.auto_subscribe; - options.dynacast = value.dynacast; - options.e2ee = e2ee; + options.adaptive_stream = value.adaptive_stream.unwrap_or(options.adaptive_stream); + options.auto_subscribe = value.auto_subscribe.unwrap_or(options.auto_subscribe); + options.dynacast = value.dynacast.unwrap_or(options.dynacast); options.rtc_config = rtc_config; - options.join_retries = value.join_retries; - options.sdk_options = RoomSdkOptions::default(); + options.join_retries = value.join_retries.unwrap_or(options.join_retries); + options.e2ee = e2ee; options } } @@ -208,15 +207,25 @@ impl From for proto::DataPacketKind { impl From for TrackPublishOptions { fn from(opts: proto::TrackPublishOptions) -> Self { + let default_publish_options = TrackPublishOptions::default(); + let video_codec = opts.video_codec.map(|x| proto::VideoCodec::try_from(x).ok()).flatten(); + let source = opts.source.map(|x| proto::TrackSource::try_from(x).ok()).flatten(); + Self { - video_codec: opts.video_codec().into(), - source: opts.source().into(), - video_encoding: opts.video_encoding.map(Into::into), - audio_encoding: opts.audio_encoding.map(Into::into), - dtx: opts.dtx, - red: opts.red, - simulcast: opts.simulcast, - stream: opts.stream, + video_codec: video_codec.map(Into::into).unwrap_or(default_publish_options.video_codec), + source: source.map(Into::into).unwrap_or(default_publish_options.source), + video_encoding: opts + .video_encoding + .map(Into::into) + .or(default_publish_options.video_encoding), + audio_encoding: opts + .audio_encoding + .map(Into::into) + .or(default_publish_options.audio_encoding), + dtx: opts.dtx.unwrap_or(default_publish_options.dtx), + red: opts.red.unwrap_or(default_publish_options.red), + simulcast: opts.simulcast.unwrap_or(default_publish_options.simulcast), + stream: opts.stream.unwrap_or(default_publish_options.stream), } } } diff --git a/livekit-ffi/src/livekit.proto.rs b/livekit-ffi/src/livekit.proto.rs index a068e502d..620588fcd 100644 --- a/livekit-ffi/src/livekit.proto.rs +++ b/livekit-ffi/src/livekit.proto.rs @@ -1601,8 +1601,8 @@ pub struct NewVideoStreamRequest { #[prost(enumeration="VideoBufferType", optional, tag="3")] pub format: ::core::option::Option, /// if true, stride will be set to width/chroma_width - #[prost(bool, required, tag="4")] - pub normalize_stride: bool, + #[prost(bool, optional, tag="4")] + pub normalize_stride: ::core::option::Option, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -1622,8 +1622,8 @@ pub struct VideoStreamFromParticipantRequest { pub track_source: i32, #[prost(enumeration="VideoBufferType", optional, tag="4")] pub format: ::core::option::Option, - #[prost(bool, required, tag="5")] - pub normalize_stride: bool, + #[prost(bool, optional, tag="5")] + pub normalize_stride: ::core::option::Option, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -1670,8 +1670,8 @@ pub struct CaptureVideoFrameResponse { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct VideoConvertRequest { - #[prost(bool, required, tag="1")] - pub flip_y: bool, + #[prost(bool, optional, tag="1")] + pub flip_y: ::core::option::Option, #[prost(message, required, tag="2")] pub buffer: VideoBufferInfo, #[prost(enumeration="VideoBufferType", required, tag="3")] @@ -2423,18 +2423,18 @@ pub struct TrackPublishOptions { pub video_encoding: ::core::option::Option, #[prost(message, optional, tag="2")] pub audio_encoding: ::core::option::Option, - #[prost(enumeration="VideoCodec", required, tag="3")] - pub video_codec: i32, - #[prost(bool, required, tag="4")] - pub dtx: bool, - #[prost(bool, required, tag="5")] - pub red: bool, - #[prost(bool, required, tag="6")] - pub simulcast: bool, - #[prost(enumeration="TrackSource", required, tag="7")] - pub source: i32, - #[prost(string, required, tag="8")] - pub stream: ::prost::alloc::string::String, + #[prost(enumeration="VideoCodec", optional, tag="3")] + pub video_codec: ::core::option::Option, + #[prost(bool, optional, tag="4")] + pub dtx: ::core::option::Option, + #[prost(bool, optional, tag="5")] + pub red: ::core::option::Option, + #[prost(bool, optional, tag="6")] + pub simulcast: ::core::option::Option, + #[prost(enumeration="TrackSource", optional, tag="7")] + pub source: ::core::option::Option, + #[prost(string, optional, tag="8")] + pub stream: ::core::option::Option<::prost::alloc::string::String>, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -2460,19 +2460,19 @@ pub struct RtcConfig { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct RoomOptions { - #[prost(bool, required, tag="1")] - pub auto_subscribe: bool, - #[prost(bool, required, tag="2")] - pub adaptive_stream: bool, - #[prost(bool, required, tag="3")] - pub dynacast: bool, + #[prost(bool, optional, tag="1")] + pub auto_subscribe: ::core::option::Option, + #[prost(bool, optional, tag="2")] + pub adaptive_stream: ::core::option::Option, + #[prost(bool, optional, tag="3")] + pub dynacast: ::core::option::Option, #[prost(message, optional, tag="4")] pub e2ee: ::core::option::Option, /// allow to setup a custom RtcConfiguration #[prost(message, optional, tag="5")] pub rtc_config: ::core::option::Option, - #[prost(uint32, required, tag="6")] - pub join_retries: u32, + #[prost(uint32, optional, tag="6")] + pub join_retries: ::core::option::Option, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -3069,10 +3069,10 @@ pub struct NewAudioStreamRequest { pub track_handle: u64, #[prost(enumeration="AudioStreamType", required, tag="2")] pub r#type: i32, - #[prost(uint32, required, tag="3")] - pub sample_rate: u32, - #[prost(uint32, required, tag="4")] - pub num_channels: u32, + #[prost(uint32, optional, tag="3")] + pub sample_rate: ::core::option::Option, + #[prost(uint32, optional, tag="4")] + pub num_channels: ::core::option::Option, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -3089,10 +3089,10 @@ pub struct AudioStreamFromParticipantRequest { pub r#type: i32, #[prost(enumeration="TrackSource", optional, tag="3")] pub track_source: ::core::option::Option, - #[prost(uint32, required, tag="5")] - pub sample_rate: u32, - #[prost(uint32, required, tag="6")] - pub num_channels: u32, + #[prost(uint32, optional, tag="5")] + pub sample_rate: ::core::option::Option, + #[prost(uint32, optional, tag="6")] + pub num_channels: ::core::option::Option, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -3112,8 +3112,8 @@ pub struct NewAudioSourceRequest { pub sample_rate: u32, #[prost(uint32, required, tag="4")] pub num_channels: u32, - #[prost(uint32, required, tag="5")] - pub queue_size_ms: u32, + #[prost(uint32, optional, tag="5")] + pub queue_size_ms: ::core::option::Option, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -3202,8 +3202,8 @@ pub struct NewSoxResamplerRequest { pub output_data_type: i32, #[prost(enumeration="SoxQualityRecipe", required, tag="6")] pub quality_recipe: i32, - #[prost(uint32, required, tag="7")] - pub flags: u32, + #[prost(uint32, optional, tag="7")] + pub flags: ::core::option::Option, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/livekit-ffi/src/server/audio_source.rs b/livekit-ffi/src/server/audio_source.rs index 703f9e668..115792710 100644 --- a/livekit-ffi/src/server/audio_source.rs +++ b/livekit-ffi/src/server/audio_source.rs @@ -43,7 +43,7 @@ impl FfiAudioSource { new_source.options.map(Into::into).unwrap_or_default(), new_source.sample_rate, new_source.num_channels, - new_source.queue_size_ms, + new_source.queue_size_ms.unwrap_or(1000), ); RtcAudioSource::Native(audio_source) } diff --git a/livekit-ffi/src/server/audio_stream.rs b/livekit-ffi/src/server/audio_stream.rs index bde45053d..782969613 100644 --- a/livekit-ffi/src/server/audio_stream.rs +++ b/livekit-ffi/src/server/audio_stream.rs @@ -58,12 +58,8 @@ impl FfiAudioStream { #[cfg(not(target_arch = "wasm32"))] proto::AudioStreamType::AudioStreamNative => { let audio_stream = Self { handle_id, stream_type, self_dropped_tx }; - - let sample_rate = - if new_stream.sample_rate == 0 { 48000 } else { new_stream.sample_rate as i32 }; - - let num_channels = - if new_stream.num_channels == 0 { 1 } else { new_stream.num_channels as i32 }; + let sample_rate = new_stream.sample_rate.unwrap_or(48000); + let num_channels = new_stream.num_channels.unwrap_or(1); let native_stream = NativeAudioStream::new(rtc_track, sample_rate as i32, num_channels as i32); @@ -85,7 +81,7 @@ impl FfiAudioStream { let info = proto::AudioStreamInfo::from(&audio_stream); server.store_handle(handle_id, audio_stream); - Ok(proto::OwnedAudioStream { handle: proto::FfiOwnedHandle { id: handle_id }, info: info }) + Ok(proto::OwnedAudioStream { handle: proto::FfiOwnedHandle { id: handle_id }, info }) } pub fn from_participant( @@ -116,7 +112,7 @@ impl FfiAudioStream { let info = proto::AudioStreamInfo::from(&audio_stream); server.store_handle(handle_id, audio_stream); - Ok(proto::OwnedAudioStream { handle: proto::FfiOwnedHandle { id: handle_id }, info: info }) + Ok(proto::OwnedAudioStream { handle: proto::FfiOwnedHandle { id: handle_id }, info }) } async fn participant_audio_stream_task( @@ -156,11 +152,8 @@ impl FfiAudioStream { let (c_tx, c_rx) = oneshot::channel::<()>(); let (handle_dropped_tx, handle_dropped_rx) = oneshot::channel::<()>(); let (done_tx, mut done_rx) = oneshot::channel::<()>(); - let sample_rate = - if request.sample_rate == 0 { 48000 } else { request.sample_rate as i32 }; - - let num_channels = - if request.num_channels == 0 { 1 } else { request.num_channels as i32 }; + let sample_rate = request.sample_rate.unwrap_or(48000) as i32; + let num_channels = request.num_channels.unwrap_or(1) as i32; let mut track_finished_rx = track_finished_tx.subscribe(); server.async_runtime.spawn(async move { diff --git a/livekit-ffi/src/server/colorcvt/cvtimpl.rs b/livekit-ffi/src/server/colorcvt/cvtimpl.rs index 0ddc6f211..b6ebcb631 100644 --- a/livekit-ffi/src/server/colorcvt/cvtimpl.rs +++ b/livekit-ffi/src/server/colorcvt/cvtimpl.rs @@ -63,6 +63,15 @@ pub unsafe fn cvt_rgba( ); Ok((dst, info)) } + proto::VideoBufferType::Bgra => { + let mut dst = vec![0u8; (width * height * 4) as usize].into_boxed_slice(); + let stride = width * 4; + + colorcvt::abgr_to_argb(data, stride, &mut dst, stride, width, height, flip_y); + + let info = rgba_info(dst.as_ptr(), dst_type, width, height); + Ok((dst, info)) + } _ => { Err(FfiError::InvalidRequest(format!("rgba to {:?} is not supported", dst_type).into())) } @@ -197,6 +206,16 @@ pub unsafe fn cvt_bgra( chroma_w, chroma_w, ); + + Ok((dst, info)) + } + proto::VideoBufferType::Rgba => { + let mut dst = vec![0u8; (width * height * 4) as usize].into_boxed_slice(); + let stride = width * 4; + + colorcvt::argb_to_abgr(data, stride, &mut dst, stride, width, height, flip_y); + + let info = rgba_info(dst.as_ptr(), dst_type, width, height); Ok((dst, info)) } _ => { diff --git a/livekit-ffi/src/server/requests.rs b/livekit-ffi/src/server/requests.rs index cb3b6a715..6af1fd665 100644 --- a/livekit-ffi/src/server/requests.rs +++ b/livekit-ffi/src/server/requests.rs @@ -396,12 +396,11 @@ unsafe fn on_video_convert( let ref buffer = video_convert.buffer; let flip_y = video_convert.flip_y; let dst_type = video_convert.dst_type(); - match cvtimpl::cvt(buffer.clone(), dst_type, flip_y) { + match cvtimpl::cvt(buffer.clone(), dst_type, flip_y.unwrap_or(false)) { Ok((buffer, info)) => { let id = server.next_id(); server.store_handle(id, buffer); - let owned_info = - proto::OwnedVideoBuffer { handle: proto::FfiOwnedHandle { id }, info: info }; + let owned_info = proto::OwnedVideoBuffer { handle: proto::FfiOwnedHandle { id }, info }; Ok(proto::VideoConvertResponse { message: Some(proto::video_convert_response::Message::Buffer(owned_info)), }) @@ -693,8 +692,10 @@ fn on_new_sox_resampler( output_type: new_soxr.output_data_type(), }; - let quality_spec = - resampler::QualitySpec { quality: new_soxr.quality_recipe(), flags: new_soxr.flags }; + let quality_spec = resampler::QualitySpec { + quality: new_soxr.quality_recipe(), + flags: new_soxr.flags.unwrap_or(0), + }; let runtime_spec = resampler::RuntimeSpec { num_threads: 1 }; diff --git a/livekit-ffi/src/server/video_stream.rs b/livekit-ffi/src/server/video_stream.rs index 94d32688d..4e9f2ceb4 100644 --- a/livekit-ffi/src/server/video_stream.rs +++ b/livekit-ffi/src/server/video_stream.rs @@ -64,7 +64,7 @@ impl FfiVideoStream { server, handle_id, new_stream.format.and_then(|_| Some(new_stream.format())), - new_stream.normalize_stride, + new_stream.normalize_stride.unwrap_or(true), NativeVideoStream::new(rtc_track), self_dropped_rx, server.watch_handle_dropped(new_stream.track_handle), @@ -80,7 +80,7 @@ impl FfiVideoStream { let info = proto::VideoStreamInfo::from(&stream); server.store_handle(stream.handle_id, stream); - Ok(proto::OwnedVideoStream { handle: proto::FfiOwnedHandle { id: handle_id }, info: info }) + Ok(proto::OwnedVideoStream { handle: proto::FfiOwnedHandle { id: handle_id }, info }) } pub fn from_participant( @@ -242,7 +242,7 @@ impl FfiVideoStream { server, stream_handle, dst_type, - request.normalize_stride, + request.normalize_stride.unwrap_or(true), NativeVideoStream::new(rtc_track), c_rx, handle_dropped_rx,