diff --git a/livekit/src/room/mod.rs b/livekit/src/room/mod.rs index 7e642c5c0..a4638e229 100644 --- a/livekit/src/room/mod.rs +++ b/livekit/src/room/mod.rs @@ -132,6 +132,7 @@ pub enum RoomEvent { }, DataReceived { payload: Arc>, + topic: Option, kind: DataPacketKind, participant: Option, }, @@ -557,10 +558,11 @@ impl RoomSession { EngineEvent::Disconnected { reason } => self.handle_disconnected(reason), EngineEvent::Data { payload, + topic, kind, participant_sid, } => { - self.handle_data(payload, kind, participant_sid); + self.handle_data(payload, topic, kind, participant_sid); } EngineEvent::SpeakersChanged { speakers } => self.handle_speakers_changed(speakers), EngineEvent::ConnectionQuality { updates } => { @@ -970,6 +972,7 @@ impl RoomSession { fn handle_data( &self, payload: Vec, + topic: Option, kind: DataPacketKind, participant_sid: Option, ) { @@ -985,6 +988,7 @@ impl RoomSession { self.dispatcher.dispatch(&RoomEvent::DataReceived { payload: Arc::new(payload), + topic, kind, participant, }); diff --git a/livekit/src/room/participant/local_participant.rs b/livekit/src/room/participant/local_participant.rs index 6595f04b3..8d31467ce 100644 --- a/livekit/src/room/participant/local_participant.rs +++ b/livekit/src/room/participant/local_participant.rs @@ -286,9 +286,19 @@ impl LocalParticipant { } } + pub async fn publish_data_only( + &self, + data: Vec, + kind: DataPacketKind, + destination_sids: Vec, + ) -> RoomResult<()> { + self.publish_data(data, None, kind, destination_sids).await + } + pub async fn publish_data( &self, data: Vec, + topic: Option, kind: DataPacketKind, destination_sids: Vec, ) -> RoomResult<()> { @@ -296,6 +306,7 @@ impl LocalParticipant { kind: kind as i32, value: Some(proto::data_packet::Value::User(proto::UserPacket { payload: data, + topic: topic, destination_sids: destination_sids.to_owned(), ..Default::default() })), diff --git a/livekit/src/rtc_engine/mod.rs b/livekit/src/rtc_engine/mod.rs index d3341c359..d48a39fe3 100644 --- a/livekit/src/rtc_engine/mod.rs +++ b/livekit/src/rtc_engine/mod.rs @@ -90,6 +90,7 @@ pub enum EngineEvent { Data { participant_sid: Option, payload: Vec, + topic: Option, kind: DataPacketKind, }, SpeakersChanged { @@ -383,11 +384,13 @@ impl EngineInner { SessionEvent::Data { participant_sid, payload, + topic, kind, } => { let _ = self.engine_tx.send(EngineEvent::Data { participant_sid, payload, + topic, kind, }); } diff --git a/livekit/src/rtc_engine/rtc_session.rs b/livekit/src/rtc_engine/rtc_session.rs index 49cec8cf6..f45ed028c 100644 --- a/livekit/src/rtc_engine/rtc_session.rs +++ b/livekit/src/rtc_engine/rtc_session.rs @@ -59,6 +59,7 @@ pub enum SessionEvent { // None when the data comes from the ServerSDK (So no real participant) participant_sid: Option, payload: Vec, + topic: Option, kind: DataPacketKind, }, MediaTrack { @@ -573,6 +574,7 @@ impl SessionInner { kind: data.kind().into(), participant_sid: participant_sid.map(|s| s.try_into().unwrap()), payload: user.payload.clone(), + topic: user.topic.clone(), }); } proto::data_packet::Value::Speaker(_) => {}