diff --git a/examples/basic_room/src/main.rs b/examples/basic_room/src/main.rs index fb8875658..d83c04402 100644 --- a/examples/basic_room/src/main.rs +++ b/examples/basic_room/src/main.rs @@ -1,5 +1,5 @@ -use livekit_api::access_token; use livekit::prelude::*; +use livekit_api::access_token; use std::env; // Connect to a room using the specified env variables @@ -24,18 +24,17 @@ async fn main() { .to_jwt() .unwrap(); - let (room, mut rx) = Room::connect(&url, &token, RoomOptions::default()) .await .unwrap(); log::info!("Connected to room: {} - {}", room.name(), room.sid()); room.local_participant() - .publish_data( - "Hello world".to_owned().into_bytes(), - DataPacketKind::Reliable, - Default::default(), - ) + .publish_data(DataPacket { + payload: "Hello world".to_owned().into_bytes(), + kind: DataPacketKind::Reliable, + ..Default::default() + }) .await .unwrap(); diff --git a/livekit-ffi/protocol/room.proto b/livekit-ffi/protocol/room.proto index 7d341faca..64f61444e 100644 --- a/livekit-ffi/protocol/room.proto +++ b/livekit-ffi/protocol/room.proto @@ -89,6 +89,7 @@ message PublishDataRequest { uint64 data_len = 3; DataPacketKind kind = 4; repeated string destination_sids = 5; // destination + optional string topic = 6; } message PublishDataResponse { uint64 async_id = 1; @@ -343,6 +344,7 @@ message DataReceived { OwnedBuffer data = 1; optional string participant_sid = 2; // Can be empty if the data is sent a server SDK DataPacketKind kind = 3; + optional string topic = 4; } message ConnectionStateChanged { ConnectionState state = 1; } diff --git a/livekit-ffi/src/livekit.proto.rs b/livekit-ffi/src/livekit.proto.rs index 4d318a76e..bf5fa74e9 100644 --- a/livekit-ffi/src/livekit.proto.rs +++ b/livekit-ffi/src/livekit.proto.rs @@ -2157,6 +2157,8 @@ pub struct PublishDataRequest { /// destination #[prost(string, repeated, tag="5")] pub destination_sids: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + #[prost(string, optional, tag="6")] + pub topic: ::core::option::Option<::prost::alloc::string::String>, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -2541,6 +2543,8 @@ pub struct DataReceived { pub participant_sid: ::core::option::Option<::prost::alloc::string::String>, #[prost(enumeration="DataPacketKind", tag="3")] pub kind: i32, + #[prost(string, optional, tag="4")] + pub topic: ::core::option::Option<::prost::alloc::string::String>, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/livekit-ffi/src/server/room.rs b/livekit-ffi/src/server/room.rs index 123b2c467..17d481824 100644 --- a/livekit-ffi/src/server/room.rs +++ b/livekit-ffi/src/server/room.rs @@ -59,7 +59,7 @@ pub struct FfiRoom { pub struct RoomInner { pub room: Room, handle_id: FfiHandleId, - data_tx: mpsc::UnboundedSender, + data_tx: mpsc::UnboundedSender, // local tracks just published, it is used to synchronize the publish events: // - make sure LocalTrackPublised is sent *after* the PublishTrack callback) @@ -74,10 +74,8 @@ struct Handle { close_tx: broadcast::Sender<()>, } -struct DataPacket { - data: Vec, - kind: DataPacketKind, - destination_sids: Vec, +struct FfiDataPacket { + payload: DataPacket, async_id: u64, } @@ -99,9 +97,12 @@ impl FfiRoom { Ok((room, mut events)) => { // Successfully connected to the room // Forward the initial state for the FfiClient - let Some(RoomEvent::Connected { participants_with_tracks}) = events.recv().await else { - unreachable!("Connected event should always be the first event"); - }; + let Some(RoomEvent::Connected { + participants_with_tracks, + }) = events.recv().await + else { + unreachable!("Connected event should always be the first event"); + }; let (data_tx, data_rx) = mpsc::unbounded_channel(); let (close_tx, close_rx) = broadcast::channel(1); @@ -201,14 +202,20 @@ impl RoomInner { slice::from_raw_parts(publish.data_ptr as *const u8, publish.data_len as usize) }; let kind = publish.kind(); - let destination_sids: Vec = publish.destination_sids; + let destination_sids = publish.destination_sids; let async_id = server.next_id(); self.data_tx - .send(DataPacket { - data: data.to_vec(), // Avoid copy? - kind: kind.into(), - destination_sids, + .send(FfiDataPacket { + payload: DataPacket { + payload: data.to_vec(), // Avoid copy? + kind: kind.into(), + topic: publish.topic, + destination_sids: destination_sids + .into_iter() + .map(|str| str.try_into().unwrap()) + .collect(), + }, async_id, }) .map_err(|_| FfiError::InvalidRequest("failed to send data packet".into()))?; @@ -384,17 +391,13 @@ impl RoomInner { async fn data_task( server: &'static FfiServer, inner: Arc, - mut data_rx: mpsc::UnboundedReceiver, + mut data_rx: mpsc::UnboundedReceiver, mut close_rx: broadcast::Receiver<()>, ) { loop { tokio::select! { Some(event) = data_rx.recv() => { - let res = inner.room.local_participant().publish_data( - event.data, - event.kind, - event.destination_sids, - ).await; + let res = inner.room.local_participant().publish_data(event.payload).await; let cb = proto::PublishDataCallback { async_id: event.async_id, @@ -727,6 +730,7 @@ async fn forward_event( payload, kind, participant, + topic, } => { let handle_id = server.next_id(); let buffer_info = proto::BufferInfo { @@ -749,6 +753,7 @@ async fn forward_event( }), participant_sid: participant.map(|p| p.sid().to_string()), kind: proto::DataPacketKind::from(kind).into(), + topic, }, )) .await; diff --git a/livekit/src/prelude.rs b/livekit/src/prelude.rs index 96ab1c67b..0668b9de8 100644 --- a/livekit/src/prelude.rs +++ b/livekit/src/prelude.rs @@ -15,7 +15,8 @@ pub use crate::participant::{ConnectionQuality, LocalParticipant, Participant, RemoteParticipant}; pub use crate::{ - ConnectionState, DataPacketKind, Room, RoomError, RoomEvent, RoomOptions, RoomResult, + ConnectionState, DataPacket, DataPacketKind, Room, RoomError, RoomEvent, RoomOptions, + RoomResult, }; pub use crate::publication::{LocalTrackPublication, RemoteTrackPublication, TrackPublication}; diff --git a/livekit/src/room/mod.rs b/livekit/src/room/mod.rs index a4638e229..a120de502 100644 --- a/livekit/src/room/mod.rs +++ b/livekit/src/room/mod.rs @@ -166,6 +166,25 @@ pub enum DataPacketKind { Reliable, } +#[derive(Debug, Clone)] +pub struct DataPacket { + pub payload: Vec, + pub topic: Option, + pub kind: DataPacketKind, + pub destination_sids: Vec, +} + +impl Default for DataPacket { + fn default() -> Self { + Self { + payload: Vec::new(), + topic: None, + kind: DataPacketKind::Reliable, + destination_sids: Vec::new(), + } + } +} + #[derive(Clone)] pub struct RoomOptions { pub auto_subscribe: bool, @@ -428,6 +447,10 @@ impl Room { } } + pub async fn simulate_scenario(&self, scenario: SimulateScenario) -> EngineResult<()> { + self.inner.rtc_engine.simulate_scenario(scenario).await + } + pub fn subscribe(&self) -> mpsc::UnboundedReceiver { self.inner.dispatcher.register() } @@ -456,10 +479,6 @@ impl Room { self.inner.participants.read().0.clone() } - pub async fn simulate_scenario(&self, scenario: SimulateScenario) -> EngineResult<()> { - self.inner.rtc_engine.simulate_scenario(scenario).await - } - pub fn e2ee_manager(&self) -> &E2eeManager { &self.inner.e2ee_manager } diff --git a/livekit/src/room/participant/local_participant.rs b/livekit/src/room/participant/local_participant.rs index 8d31467ce..9d76367ac 100644 --- a/livekit/src/room/participant/local_participant.rs +++ b/livekit/src/room/participant/local_participant.rs @@ -21,6 +21,7 @@ use crate::options::video_layers_from_encodings; use crate::options::TrackPublishOptions; use crate::prelude::*; use crate::rtc_engine::RtcEngine; +use crate::DataPacket; use crate::DataPacketKind; use libwebrtc::rtp_parameters::RtpEncodingParameters; use livekit_protocol as proto; @@ -286,35 +287,24 @@ impl LocalParticipant { } } - pub async fn publish_data_only( - &self, - data: Vec, - kind: DataPacketKind, - destination_sids: Vec, - ) -> RoomResult<()> { - self.publish_data(data, None, kind, destination_sids).await - } - - pub async fn publish_data( - &self, - data: Vec, - topic: Option, - kind: DataPacketKind, - destination_sids: Vec, - ) -> RoomResult<()> { + pub async fn publish_data(&self, packet: DataPacket) -> RoomResult<()> { let data = proto::DataPacket { - kind: kind as i32, + kind: DataPacketKind::from(packet.kind) as i32, value: Some(proto::data_packet::Value::User(proto::UserPacket { - payload: data, - topic: topic, - destination_sids: destination_sids.to_owned(), + payload: packet.payload, + topic: packet.topic, + destination_sids: packet + .destination_sids + .into_iter() + .map(Into::into) + .collect(), ..Default::default() })), }; self.inner .rtc_engine - .publish_data(&data, kind) + .publish_data(&data, packet.kind) .await .map_err(Into::into) }