From 5047dae4066e219e4cc33699fbaba5abd52bda28 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Wed, 8 Jan 2025 09:25:27 +0100 Subject: [PATCH] DataStream sending support and recv fixes (#533) * sending support and recv fixes * fix duplicated function * fixes * fix compilation errors * add error callback * fix forwarding issues * fix optional params * fix conversion * fix optional fields * required/optional changes * remove totalchunks * fix none --- livekit-ffi/protocol/ffi.proto | 11 ++ livekit-ffi/protocol/room.proto | 66 +++++++-- livekit-ffi/src/conversion/room.rs | 60 ++++++++- livekit-ffi/src/livekit.proto.rs | 127 ++++++++++++++---- livekit-ffi/src/server/requests.rs | 28 ++++ livekit-ffi/src/server/room.rs | 70 +++++++++- livekit/src/room/mod.rs | 26 ++-- .../src/room/participant/local_participant.rs | 13 ++ livekit/src/rtc_engine/mod.rs | 14 +- livekit/src/rtc_engine/rtc_session.rs | 16 ++- 10 files changed, 366 insertions(+), 65 deletions(-) diff --git a/livekit-ffi/protocol/ffi.proto b/livekit-ffi/protocol/ffi.proto index 4ff6df53c..8c715bbf1 100644 --- a/livekit-ffi/protocol/ffi.proto +++ b/livekit-ffi/protocol/ffi.proto @@ -111,6 +111,11 @@ message FfiRequest { // Track Publication EnableRemoteTrackPublicationRequest enable_remote_track_publication = 42; UpdateRemoteTrackPublicationDimensionRequest update_remote_track_publication_dimension = 43; + + // Data Streams + SendStreamHeaderRequest send_stream_header = 44; + SendStreamChunkRequest send_stream_chunk = 45; + } } @@ -168,6 +173,10 @@ message FfiResponse { // Track Publication EnableRemoteTrackPublicationResponse enable_remote_track_publication = 41; UpdateRemoteTrackPublicationDimensionResponse update_remote_track_publication_dimension = 42; + + // Data Streams + SendStreamHeaderResponse send_stream_header = 43; + SendStreamChunkResponse send_stream_chunk = 44; } } @@ -199,6 +208,8 @@ message FfiEvent { SendChatMessageCallback chat_message = 22; PerformRpcCallback perform_rpc = 23; RpcMethodInvocationEvent rpc_method_invocation = 24; + SendStreamHeaderCallback send_stream_header = 25; + SendStreamChunkCallback send_stream_chunk = 26; } } diff --git a/livekit-ffi/protocol/room.proto b/livekit-ffi/protocol/room.proto index c6f595699..90f14089b 100644 --- a/livekit-ffi/protocol/room.proto +++ b/livekit-ffi/protocol/room.proto @@ -367,8 +367,8 @@ message RoomEvent { DataPacketReceived data_packet_received = 27; TranscriptionReceived transcription_received = 28; ChatMessageReceived chat_message = 29; - DataStream.Header stream_header = 30; - DataStream.Chunk stream_chunk = 31; + DataStreamHeaderReceived stream_header_received = 30; + DataStreamChunkReceived stream_chunk_received = 31; } } @@ -541,10 +541,10 @@ message DataStream { // header properties specific to text streams message TextHeader { required OperationType operation_type = 1; - required int32 version = 2; // Optional: Version for updates/edits - required string reply_to_stream_id = 3; // Optional: Reply to specific message + optional int32 version = 2; // Optional: Version for updates/edits + optional string reply_to_stream_id = 3; // Optional: Reply to specific message repeated string attached_stream_ids = 4; // file attachments for text streams - required bool generated = 5; // true if the text has been generated by an agent from a participant's audio transcription + optional bool generated = 5; // true if the text has been generated by an agent from a participant's audio transcription } @@ -557,16 +557,15 @@ message DataStream { message Header { required string stream_id = 1; // unique identifier for this data stream required int64 timestamp = 2; // using int64 for Unix timestamp - required string topic = 3; - required string mime_type = 4; + required string mime_type = 3; + required string topic = 4; optional uint64 total_length = 5; // only populated for finite streams, if it's a stream of unknown size this stays empty - optional uint64 total_chunks = 6; // only populated for finite streams, if it's a stream of unknown size this stays empty - map extensions = 7; // user defined extensions map that can carry additional info + map extensions = 6; // user defined extensions map that can carry additional info // oneof to choose between specific header types oneof content_header { - TextHeader text_header = 8; - FileHeader file_header = 9; + TextHeader text_header = 7; + FileHeader file_header = 8; } } @@ -574,9 +573,50 @@ message DataStream { required string stream_id = 1; // unique identifier for this data stream to map it to the correct header required uint64 chunk_index = 2; required bytes content = 3; // content as binary (bytes) - required bool complete = 4; // true only if this is the last chunk of this stream - can also be sent with empty content - required int32 version = 5; // a version indicating that this chunk_index has been retroactively modified and the original one needs to be replaced + optional bool complete = 4; // true only if this is the last chunk of this stream - can also be sent with empty content + optional int32 version = 5; // a version indicating that this chunk_index has been retroactively modified and the original one needs to be replaced optional bytes iv = 6; // optional, initialization vector for AES-GCM encryption } } +message DataStreamHeaderReceived { + required string participant_identity = 1; + required DataStream.Header header = 2; +} + +message DataStreamChunkReceived { + required string participant_identity = 1; + required DataStream.Chunk chunk = 2; +} + +message SendStreamHeaderRequest { + required uint64 local_participant_handle = 1; + required DataStream.Header header = 2; + repeated string destination_identities = 3; + optional string sender_identity = 4; +} + +message SendStreamChunkRequest { + required uint64 local_participant_handle = 1; + required DataStream.Chunk chunk = 2; + repeated string destination_identities = 3; + optional string sender_identity = 4; +} + +message SendStreamHeaderResponse { + required uint64 async_id = 1; +} + +message SendStreamChunkResponse { + required uint64 async_id = 1; +} + +message SendStreamHeaderCallback { + required uint64 async_id = 1; + optional string error = 2; +} + +message SendStreamChunkCallback { + required uint64 async_id = 1; + optional string error = 2; +} \ No newline at end of file diff --git a/livekit-ffi/src/conversion/room.rs b/livekit-ffi/src/conversion/room.rs index 521c5c8c5..42479342f 100644 --- a/livekit-ffi/src/conversion/room.rs +++ b/livekit-ffi/src/conversion/room.rs @@ -289,10 +289,10 @@ impl From for proto::data_stream::Header Some(proto::data_stream::header::ContentHeader::TextHeader( proto::data_stream::TextHeader { operation_type: text_header.operation_type, - version: text_header.version, - reply_to_stream_id: text_header.reply_to_stream_id, + version: Some(text_header.version), + reply_to_stream_id: Some(text_header.reply_to_stream_id), attached_stream_ids: text_header.attached_stream_ids, - generated: text_header.generated, + generated: Some(text_header.generated), }, )) } @@ -309,7 +309,6 @@ impl From for proto::data_stream::Header timestamp: msg.timestamp, topic: msg.topic, mime_type: msg.mime_type, - total_chunks: msg.total_chunks, total_length: msg.total_length, extensions: msg.extensions, content_header, @@ -317,14 +316,63 @@ impl From for proto::data_stream::Header } } +impl From for livekit_protocol::data_stream::Header { + fn from(msg: proto::data_stream::Header) -> Self { + let content_header = match msg.content_header { + Some(proto::data_stream::header::ContentHeader::TextHeader(text_header)) => { + Some(livekit_protocol::data_stream::header::ContentHeader::TextHeader( + livekit_protocol::data_stream::TextHeader { + operation_type: text_header.operation_type, + version: text_header.version.unwrap_or_default(), + reply_to_stream_id: text_header.reply_to_stream_id.unwrap_or_default(), + attached_stream_ids: text_header.attached_stream_ids, + generated: text_header.generated.unwrap_or(false), + }, + )) + } + Some(proto::data_stream::header::ContentHeader::FileHeader(file_header)) => { + Some(livekit_protocol::data_stream::header::ContentHeader::FileHeader( + livekit_protocol::data_stream::FileHeader { file_name: file_header.file_name }, + )) + } + None => None, + }; + + livekit_protocol::data_stream::Header { + stream_id: msg.stream_id, + timestamp: msg.timestamp, + topic: msg.topic, + mime_type: msg.mime_type, + total_length: msg.total_length, + total_chunks: None, + extensions: msg.extensions, + content_header, + encryption_type: 0, + } + } +} + impl From for proto::data_stream::Chunk { fn from(msg: livekit_protocol::data_stream::Chunk) -> Self { proto::data_stream::Chunk { stream_id: msg.stream_id, content: msg.content, - complete: msg.complete, + complete: Some(msg.complete), + chunk_index: msg.chunk_index, + version: Some(msg.version), + iv: msg.iv, + } + } +} + +impl From for livekit_protocol::data_stream::Chunk { + fn from(msg: proto::data_stream::Chunk) -> Self { + livekit_protocol::data_stream::Chunk { + stream_id: msg.stream_id, + content: msg.content, + complete: msg.complete.unwrap_or(false), chunk_index: msg.chunk_index, - version: msg.version, + version: msg.version.unwrap_or(0), iv: msg.iv, } } diff --git a/livekit-ffi/src/livekit.proto.rs b/livekit-ffi/src/livekit.proto.rs index c45f678c7..13b495481 100644 --- a/livekit-ffi/src/livekit.proto.rs +++ b/livekit-ffi/src/livekit.proto.rs @@ -2702,9 +2702,9 @@ pub mod room_event { #[prost(message, tag="29")] ChatMessage(super::ChatMessageReceived), #[prost(message, tag="30")] - StreamHeader(super::data_stream::Header), + StreamHeaderReceived(super::DataStreamHeaderReceived), #[prost(message, tag="31")] - StreamChunk(super::data_stream::Chunk), + StreamChunkReceived(super::DataStreamChunkReceived), } } #[allow(clippy::derive_partial_eq_without_eq)] @@ -2992,17 +2992,17 @@ pub mod data_stream { #[prost(enumeration="OperationType", required, tag="1")] pub operation_type: i32, /// Optional: Version for updates/edits - #[prost(int32, required, tag="2")] - pub version: i32, + #[prost(int32, optional, tag="2")] + pub version: ::core::option::Option, /// Optional: Reply to specific message - #[prost(string, required, tag="3")] - pub reply_to_stream_id: ::prost::alloc::string::String, + #[prost(string, optional, tag="3")] + pub reply_to_stream_id: ::core::option::Option<::prost::alloc::string::String>, /// file attachments for text streams #[prost(string, repeated, tag="4")] pub attached_stream_ids: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, /// true if the text has been generated by an agent from a participant's audio transcription - #[prost(bool, required, tag="5")] - pub generated: bool, + #[prost(bool, optional, tag="5")] + pub generated: ::core::option::Option, } /// header properties specific to file or image streams #[allow(clippy::derive_partial_eq_without_eq)] @@ -3023,20 +3023,17 @@ pub mod data_stream { #[prost(int64, required, tag="2")] pub timestamp: i64, #[prost(string, required, tag="3")] - pub topic: ::prost::alloc::string::String, - #[prost(string, required, tag="4")] pub mime_type: ::prost::alloc::string::String, + #[prost(string, required, tag="4")] + pub topic: ::prost::alloc::string::String, /// only populated for finite streams, if it's a stream of unknown size this stays empty #[prost(uint64, optional, tag="5")] pub total_length: ::core::option::Option, - /// only populated for finite streams, if it's a stream of unknown size this stays empty - #[prost(uint64, optional, tag="6")] - pub total_chunks: ::core::option::Option, /// user defined extensions map that can carry additional info - #[prost(map="string, string", tag="7")] + #[prost(map="string, string", tag="6")] pub extensions: ::std::collections::HashMap<::prost::alloc::string::String, ::prost::alloc::string::String>, /// oneof to choose between specific header types - #[prost(oneof="header::ContentHeader", tags="8, 9")] + #[prost(oneof="header::ContentHeader", tags="7, 8")] pub content_header: ::core::option::Option, } /// Nested message and enum types in `Header`. @@ -3045,9 +3042,9 @@ pub mod data_stream { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Oneof)] pub enum ContentHeader { - #[prost(message, tag="8")] + #[prost(message, tag="7")] TextHeader(super::TextHeader), - #[prost(message, tag="9")] + #[prost(message, tag="8")] FileHeader(super::FileHeader), } } @@ -3063,11 +3060,11 @@ pub mod data_stream { #[prost(bytes="vec", required, tag="3")] pub content: ::prost::alloc::vec::Vec, /// true only if this is the last chunk of this stream - can also be sent with empty content - #[prost(bool, required, tag="4")] - pub complete: bool, + #[prost(bool, optional, tag="4")] + pub complete: ::core::option::Option, /// a version indicating that this chunk_index has been retroactively modified and the original one needs to be replaced - #[prost(int32, required, tag="5")] - pub version: i32, + #[prost(int32, optional, tag="5")] + pub version: ::core::option::Option, /// optional, initialization vector for AES-GCM encryption #[prost(bytes="vec", optional, tag="6")] pub iv: ::core::option::Option<::prost::alloc::vec::Vec>, @@ -3106,6 +3103,74 @@ pub mod data_stream { } } } +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct DataStreamHeaderReceived { + #[prost(string, required, tag="1")] + pub participant_identity: ::prost::alloc::string::String, + #[prost(message, required, tag="2")] + pub header: data_stream::Header, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct DataStreamChunkReceived { + #[prost(string, required, tag="1")] + pub participant_identity: ::prost::alloc::string::String, + #[prost(message, required, tag="2")] + pub chunk: data_stream::Chunk, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct SendStreamHeaderRequest { + #[prost(uint64, required, tag="1")] + pub local_participant_handle: u64, + #[prost(message, required, tag="2")] + pub header: data_stream::Header, + #[prost(string, repeated, tag="3")] + pub destination_identities: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + #[prost(string, optional, tag="4")] + pub sender_identity: ::core::option::Option<::prost::alloc::string::String>, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct SendStreamChunkRequest { + #[prost(uint64, required, tag="1")] + pub local_participant_handle: u64, + #[prost(message, required, tag="2")] + pub chunk: data_stream::Chunk, + #[prost(string, repeated, tag="3")] + pub destination_identities: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + #[prost(string, optional, tag="4")] + pub sender_identity: ::core::option::Option<::prost::alloc::string::String>, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct SendStreamHeaderResponse { + #[prost(uint64, required, tag="1")] + pub async_id: u64, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct SendStreamChunkResponse { + #[prost(uint64, required, tag="1")] + pub async_id: u64, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct SendStreamHeaderCallback { + #[prost(uint64, required, tag="1")] + pub async_id: u64, + #[prost(string, optional, tag="2")] + pub error: ::core::option::Option<::prost::alloc::string::String>, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct SendStreamChunkCallback { + #[prost(uint64, required, tag="1")] + pub async_id: u64, + #[prost(string, optional, tag="2")] + pub error: ::core::option::Option<::prost::alloc::string::String>, +} #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] pub enum IceTransportType { @@ -3878,7 +3943,7 @@ pub struct RpcMethodInvocationEvent { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct FfiRequest { - #[prost(oneof="ffi_request::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43")] + #[prost(oneof="ffi_request::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45")] pub message: ::core::option::Option, } /// Nested message and enum types in `FfiRequest`. @@ -3976,13 +4041,18 @@ pub mod ffi_request { EnableRemoteTrackPublication(super::EnableRemoteTrackPublicationRequest), #[prost(message, tag="43")] UpdateRemoteTrackPublicationDimension(super::UpdateRemoteTrackPublicationDimensionRequest), + /// Data Streams + #[prost(message, tag="44")] + SendStreamHeader(super::SendStreamHeaderRequest), + #[prost(message, tag="45")] + SendStreamChunk(super::SendStreamChunkRequest), } } /// This is the output of livekit_ffi_request function. #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct FfiResponse { - #[prost(oneof="ffi_response::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42")] + #[prost(oneof="ffi_response::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44")] pub message: ::core::option::Option, } /// Nested message and enum types in `FfiResponse`. @@ -4078,6 +4148,11 @@ pub mod ffi_response { EnableRemoteTrackPublication(super::EnableRemoteTrackPublicationResponse), #[prost(message, tag="42")] UpdateRemoteTrackPublicationDimension(super::UpdateRemoteTrackPublicationDimensionResponse), + /// Data Streams + #[prost(message, tag="43")] + SendStreamHeader(super::SendStreamHeaderResponse), + #[prost(message, tag="44")] + SendStreamChunk(super::SendStreamChunkResponse), } } /// To minimize complexity, participant events are not included in the protocol. @@ -4086,7 +4161,7 @@ pub mod ffi_response { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct FfiEvent { - #[prost(oneof="ffi_event::Message", tags="1, 2, 3, 4, 5, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24")] + #[prost(oneof="ffi_event::Message", tags="1, 2, 3, 4, 5, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26")] pub message: ::core::option::Option, } /// Nested message and enum types in `FfiEvent`. @@ -4140,6 +4215,10 @@ pub mod ffi_event { PerformRpc(super::PerformRpcCallback), #[prost(message, tag="24")] RpcMethodInvocation(super::RpcMethodInvocationEvent), + #[prost(message, tag="25")] + SendStreamHeader(super::SendStreamHeaderCallback), + #[prost(message, tag="26")] + SendStreamChunk(super::SendStreamChunkCallback), } } /// Stop all rooms synchronously (Do we need async here?). diff --git a/livekit-ffi/src/server/requests.rs b/livekit-ffi/src/server/requests.rs index 327402893..16739361c 100644 --- a/livekit-ffi/src/server/requests.rs +++ b/livekit-ffi/src/server/requests.rs @@ -236,6 +236,28 @@ fn on_edit_chat_message( Ok(ffi_participant.room.edit_chat_message(server, edit_chat_message)) } +fn on_send_stream_header( + server: &'static FfiServer, + stream_header_message: proto::SendStreamHeaderRequest, +) -> FfiResult { + let ffi_participant = server + .retrieve_handle::(stream_header_message.local_participant_handle)? + .clone(); + + Ok(ffi_participant.room.send_stream_header(server, stream_header_message)) +} + +fn on_send_stream_chunk( + server: &'static FfiServer, + stream_chunk_message: proto::SendStreamChunkRequest, +) -> FfiResult { + let ffi_participant = server + .retrieve_handle::(stream_chunk_message.local_participant_handle)? + .clone(); + + Ok(ffi_participant.room.send_stream_chunk(server, stream_chunk_message)) +} + /// Create a new video track from a source fn on_create_video_track( server: &'static FfiServer, @@ -1035,6 +1057,12 @@ pub fn handle_request( on_update_remote_track_publication_dimension(server, request)?, ) } + proto::ffi_request::Message::SendStreamHeader(request) => { + proto::ffi_response::Message::SendStreamHeader(on_send_stream_header(server, request)?) + } + proto::ffi_request::Message::SendStreamChunk(request) => { + proto::ffi_response::Message::SendStreamChunk(on_send_stream_chunk(server, request)?) + } }); Ok(res) diff --git a/livekit-ffi/src/server/room.rs b/livekit-ffi/src/server/room.rs index fe6be4110..75fbfa769 100644 --- a/livekit-ffi/src/server/room.rs +++ b/livekit-ffi/src/server/room.rs @@ -18,6 +18,7 @@ use std::{collections::HashSet, slice, sync::Arc}; use livekit::prelude::*; use livekit::ChatMessage; +use livekit_protocol as lk_proto; use parking_lot::Mutex; use tokio::sync::{broadcast, mpsc, oneshot, Mutex as AsyncMutex}; use tokio::task::JoinHandle; @@ -674,6 +675,63 @@ impl RoomInner { proto::SendChatMessageResponse { async_id } } + pub fn send_stream_header( + self: &Arc, + server: &'static FfiServer, + send_stream_header: proto::SendStreamHeaderRequest, + ) -> proto::SendStreamHeaderResponse { + let packet = lk_proto::DataPacket { + kind: proto::DataPacketKind::KindReliable.into(), + participant_identity: send_stream_header.sender_identity.unwrap(), + destination_identities: send_stream_header.destination_identities, + value: livekit_protocol::data_packet::Value::StreamHeader( + send_stream_header.header.into(), + ) + .into(), + }; + let async_id = server.next_id(); + let inner = self.clone(); + let handle = server.async_runtime.spawn(async move { + let res = inner.room.local_participant().publish_raw_data(packet, true).await; + let cb = proto::SendStreamHeaderCallback { + async_id, + error: res.err().map(|e| e.to_string()), + }; + let _ = server.send_event(proto::ffi_event::Message::SendStreamHeader(cb)); + }); + server.watch_panic(handle); + proto::SendStreamHeaderResponse { async_id } + } + + pub fn send_stream_chunk( + self: &Arc, + server: &'static FfiServer, + send_stream_chunk: proto::SendStreamChunkRequest, + ) -> proto::SendStreamChunkResponse { + let packet = lk_proto::DataPacket { + kind: proto::DataPacketKind::KindReliable.into(), + participant_identity: send_stream_chunk.sender_identity.unwrap(), + destination_identities: send_stream_chunk.destination_identities, + value: livekit_protocol::data_packet::Value::StreamChunk( + send_stream_chunk.chunk.into(), + ) + .into(), + }; + let async_id = server.next_id(); + let inner = self.clone(); + let handle = server.async_runtime.spawn(async move { + let res: Result<(), RoomError> = + inner.room.local_participant().publish_raw_data(packet, true).await; + let cb = proto::SendStreamChunkCallback { + async_id, + error: res.err().map(|e| e.to_string()), + }; + let _ = server.send_event(proto::ffi_event::Message::SendStreamChunk(cb)); + }); + server.watch_panic(handle); + proto::SendStreamChunkResponse { async_id } + } + pub fn store_rpc_method_invocation_waiter( &self, invocation_id: u64, @@ -1145,11 +1203,15 @@ async fn forward_event( state: proto::EncryptionState::from(state).into(), })); } - RoomEvent::StreamHeaderReceived { header } => { - let _ = send_event(proto::room_event::Message::StreamHeader(header.into())); + RoomEvent::StreamHeaderReceived { header, participant_identity } => { + let _ = send_event(proto::room_event::Message::StreamHeaderReceived( + proto::DataStreamHeaderReceived { header: header.into(), participant_identity }, + )); } - RoomEvent::StreamChunkReceived { chunk } => { - let _ = send_event(proto::room_event::Message::StreamChunk(chunk.into())); + RoomEvent::StreamChunkReceived { chunk, participant_identity } => { + let _ = send_event(proto::room_event::Message::StreamChunkReceived( + proto::DataStreamChunkReceived { chunk: chunk.into(), participant_identity }, + )); } _ => { log::warn!("unhandled room event: {:?}", event); diff --git a/livekit/src/room/mod.rs b/livekit/src/room/mod.rs index 5653002a1..051b56f05 100644 --- a/livekit/src/room/mod.rs +++ b/livekit/src/room/mod.rs @@ -170,9 +170,11 @@ pub enum RoomEvent { }, StreamHeaderReceived { header: proto::data_stream::Header, + participant_identity: String, }, StreamChunkReceived { chunk: proto::data_stream::Chunk, + participant_identity: String, }, E2eeStateChanged { participant: Participant, @@ -726,11 +728,11 @@ impl RoomSession { EngineEvent::LocalTrackSubscribed { track_sid } => { self.handle_track_subscribed(track_sid) } - EngineEvent::DataStreamHeader { header } => { - self.handle_data_stream_header(header); + EngineEvent::DataStreamHeader { header, participant_identity } => { + self.handle_data_stream_header(header, participant_identity); } - EngineEvent::DataStreamChunk { chunk } => { - self.handle_data_stream_chunk(chunk); + EngineEvent::DataStreamChunk { chunk, participant_identity } => { + self.handle_data_stream_chunk(chunk, participant_identity); } _ => {} } @@ -1242,13 +1244,21 @@ impl RoomSession { }); } - fn handle_data_stream_header(&self, header: proto::data_stream::Header) { - let event = RoomEvent::StreamHeaderReceived { header }; + fn handle_data_stream_header( + &self, + header: proto::data_stream::Header, + participant_identity: String, + ) { + let event = RoomEvent::StreamHeaderReceived { header, participant_identity }; self.dispatcher.dispatch(&event); } - fn handle_data_stream_chunk(&self, chunk: proto::data_stream::Chunk) { - let event = RoomEvent::StreamChunkReceived { chunk }; + fn handle_data_stream_chunk( + &self, + chunk: proto::data_stream::Chunk, + participant_identity: String, + ) { + let event = RoomEvent::StreamChunkReceived { chunk, participant_identity }; self.dispatcher.dispatch(&event); } diff --git a/livekit/src/room/participant/local_participant.rs b/livekit/src/room/participant/local_participant.rs index 71ff4d775..bbf269c50 100644 --- a/livekit/src/room/participant/local_participant.rs +++ b/livekit/src/room/participant/local_participant.rs @@ -437,6 +437,19 @@ impl LocalParticipant { } } + /** internal */ + pub async fn publish_raw_data( + self, + packet: proto::DataPacket, + reliable: bool, + ) -> RoomResult<()> { + let kind = match reliable { + true => DataPacketKind::Reliable, + false => DataPacketKind::Lossy, + }; + self.inner.rtc_engine.publish_data(&packet, kind).await.map_err(Into::into) + } + pub async fn publish_data(&self, packet: DataPacket) -> RoomResult<()> { let kind = match packet.reliable { true => DataPacketKind::Reliable, diff --git a/livekit/src/rtc_engine/mod.rs b/livekit/src/rtc_engine/mod.rs index d4f5dec97..2cd545215 100644 --- a/livekit/src/rtc_engine/mod.rs +++ b/livekit/src/rtc_engine/mod.rs @@ -161,9 +161,11 @@ pub enum EngineEvent { }, DataStreamHeader { header: proto::data_stream::Header, + participant_identity: String, }, DataStreamChunk { chunk: proto::data_stream::Chunk, + participant_identity: String, }, } @@ -530,11 +532,15 @@ impl EngineInner { SessionEvent::LocalTrackSubscribed { track_sid } => { let _ = self.engine_tx.send(EngineEvent::LocalTrackSubscribed { track_sid }); } - SessionEvent::DataStreamHeader { header } => { - let _ = self.engine_tx.send(EngineEvent::DataStreamHeader { header }); + SessionEvent::DataStreamHeader { header, participant_identity } => { + let _ = self + .engine_tx + .send(EngineEvent::DataStreamHeader { header, participant_identity }); } - SessionEvent::DataStreamChunk { chunk } => { - let _ = self.engine_tx.send(EngineEvent::DataStreamChunk { chunk }); + SessionEvent::DataStreamChunk { chunk, participant_identity } => { + let _ = self + .engine_tx + .send(EngineEvent::DataStreamChunk { chunk, participant_identity }); } } Ok(()) diff --git a/livekit/src/rtc_engine/rtc_session.rs b/livekit/src/rtc_engine/rtc_session.rs index b3cb4f094..91dcbfddb 100644 --- a/livekit/src/rtc_engine/rtc_session.rs +++ b/livekit/src/rtc_engine/rtc_session.rs @@ -137,9 +137,11 @@ pub enum SessionEvent { }, DataStreamHeader { header: proto::data_stream::Header, + participant_identity: String, }, DataStreamChunk { chunk: proto::data_stream::Chunk, + participant_identity: String, }, } @@ -730,14 +732,16 @@ impl SessionInner { }); } proto::data_packet::Value::StreamHeader(message) => { - let _ = self - .emitter - .send(SessionEvent::DataStreamHeader { header: message.clone() }); + let _ = self.emitter.send(SessionEvent::DataStreamHeader { + header: message.clone(), + participant_identity: data.participant_identity.clone(), + }); } proto::data_packet::Value::StreamChunk(message) => { - let _ = self - .emitter - .send(SessionEvent::DataStreamChunk { chunk: message.clone() }); + let _ = self.emitter.send(SessionEvent::DataStreamChunk { + chunk: message.clone(), + participant_identity: data.participant_identity.clone(), + }); } _ => {} }