Skip to content

Commit

Permalink
DataStream sending support and recv fixes (#533)
Browse files Browse the repository at this point in the history
* sending support and recv fixes

* fix duplicated function

* fixes

* fix compilation errors

* add error callback

* fix forwarding issues

* fix optional params

* fix conversion

* fix optional fields

* required/optional changes

* remove totalchunks

* fix none
  • Loading branch information
lukasIO authored Jan 8, 2025
1 parent c6132e8 commit 5047dae
Show file tree
Hide file tree
Showing 10 changed files with 366 additions and 65 deletions.
11 changes: 11 additions & 0 deletions livekit-ffi/protocol/ffi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ message FfiRequest {
// Track Publication
EnableRemoteTrackPublicationRequest enable_remote_track_publication = 42;
UpdateRemoteTrackPublicationDimensionRequest update_remote_track_publication_dimension = 43;

// Data Streams
SendStreamHeaderRequest send_stream_header = 44;
SendStreamChunkRequest send_stream_chunk = 45;

}
}

Expand Down Expand Up @@ -168,6 +173,10 @@ message FfiResponse {
// Track Publication
EnableRemoteTrackPublicationResponse enable_remote_track_publication = 41;
UpdateRemoteTrackPublicationDimensionResponse update_remote_track_publication_dimension = 42;

// Data Streams
SendStreamHeaderResponse send_stream_header = 43;
SendStreamChunkResponse send_stream_chunk = 44;
}
}

Expand Down Expand Up @@ -199,6 +208,8 @@ message FfiEvent {
SendChatMessageCallback chat_message = 22;
PerformRpcCallback perform_rpc = 23;
RpcMethodInvocationEvent rpc_method_invocation = 24;
SendStreamHeaderCallback send_stream_header = 25;
SendStreamChunkCallback send_stream_chunk = 26;
}
}

Expand Down
66 changes: 53 additions & 13 deletions livekit-ffi/protocol/room.proto
Original file line number Diff line number Diff line change
Expand Up @@ -367,8 +367,8 @@ message RoomEvent {
DataPacketReceived data_packet_received = 27;
TranscriptionReceived transcription_received = 28;
ChatMessageReceived chat_message = 29;
DataStream.Header stream_header = 30;
DataStream.Chunk stream_chunk = 31;
DataStreamHeaderReceived stream_header_received = 30;
DataStreamChunkReceived stream_chunk_received = 31;
}
}

Expand Down Expand Up @@ -541,10 +541,10 @@ message DataStream {
// header properties specific to text streams
message TextHeader {
required OperationType operation_type = 1;
required int32 version = 2; // Optional: Version for updates/edits
required string reply_to_stream_id = 3; // Optional: Reply to specific message
optional int32 version = 2; // Optional: Version for updates/edits
optional string reply_to_stream_id = 3; // Optional: Reply to specific message
repeated string attached_stream_ids = 4; // file attachments for text streams
required bool generated = 5; // true if the text has been generated by an agent from a participant's audio transcription
optional bool generated = 5; // true if the text has been generated by an agent from a participant's audio transcription

}

Expand All @@ -557,26 +557,66 @@ message DataStream {
message Header {
required string stream_id = 1; // unique identifier for this data stream
required int64 timestamp = 2; // using int64 for Unix timestamp
required string topic = 3;
required string mime_type = 4;
required string mime_type = 3;
required string topic = 4;
optional uint64 total_length = 5; // only populated for finite streams, if it's a stream of unknown size this stays empty
optional uint64 total_chunks = 6; // only populated for finite streams, if it's a stream of unknown size this stays empty
map<string, string> extensions = 7; // user defined extensions map that can carry additional info
map<string, string> extensions = 6; // user defined extensions map that can carry additional info

// oneof to choose between specific header types
oneof content_header {
TextHeader text_header = 8;
FileHeader file_header = 9;
TextHeader text_header = 7;
FileHeader file_header = 8;
}
}

message Chunk {
required string stream_id = 1; // unique identifier for this data stream to map it to the correct header
required uint64 chunk_index = 2;
required bytes content = 3; // content as binary (bytes)
required bool complete = 4; // true only if this is the last chunk of this stream - can also be sent with empty content
required int32 version = 5; // a version indicating that this chunk_index has been retroactively modified and the original one needs to be replaced
optional bool complete = 4; // true only if this is the last chunk of this stream - can also be sent with empty content
optional int32 version = 5; // a version indicating that this chunk_index has been retroactively modified and the original one needs to be replaced
optional bytes iv = 6; // optional, initialization vector for AES-GCM encryption
}
}

message DataStreamHeaderReceived {
required string participant_identity = 1;
required DataStream.Header header = 2;
}

message DataStreamChunkReceived {
required string participant_identity = 1;
required DataStream.Chunk chunk = 2;
}

message SendStreamHeaderRequest {
required uint64 local_participant_handle = 1;
required DataStream.Header header = 2;
repeated string destination_identities = 3;
optional string sender_identity = 4;
}

message SendStreamChunkRequest {
required uint64 local_participant_handle = 1;
required DataStream.Chunk chunk = 2;
repeated string destination_identities = 3;
optional string sender_identity = 4;
}

message SendStreamHeaderResponse {
required uint64 async_id = 1;
}

message SendStreamChunkResponse {
required uint64 async_id = 1;
}

message SendStreamHeaderCallback {
required uint64 async_id = 1;
optional string error = 2;
}

message SendStreamChunkCallback {
required uint64 async_id = 1;
optional string error = 2;
}
60 changes: 54 additions & 6 deletions livekit-ffi/src/conversion/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,10 +289,10 @@ impl From<livekit_protocol::data_stream::Header> for proto::data_stream::Header
Some(proto::data_stream::header::ContentHeader::TextHeader(
proto::data_stream::TextHeader {
operation_type: text_header.operation_type,
version: text_header.version,
reply_to_stream_id: text_header.reply_to_stream_id,
version: Some(text_header.version),
reply_to_stream_id: Some(text_header.reply_to_stream_id),
attached_stream_ids: text_header.attached_stream_ids,
generated: text_header.generated,
generated: Some(text_header.generated),
},
))
}
Expand All @@ -309,22 +309,70 @@ impl From<livekit_protocol::data_stream::Header> for proto::data_stream::Header
timestamp: msg.timestamp,
topic: msg.topic,
mime_type: msg.mime_type,
total_chunks: msg.total_chunks,
total_length: msg.total_length,
extensions: msg.extensions,
content_header,
}
}
}

impl From<proto::data_stream::Header> for livekit_protocol::data_stream::Header {
fn from(msg: proto::data_stream::Header) -> Self {
let content_header = match msg.content_header {
Some(proto::data_stream::header::ContentHeader::TextHeader(text_header)) => {
Some(livekit_protocol::data_stream::header::ContentHeader::TextHeader(
livekit_protocol::data_stream::TextHeader {
operation_type: text_header.operation_type,
version: text_header.version.unwrap_or_default(),
reply_to_stream_id: text_header.reply_to_stream_id.unwrap_or_default(),
attached_stream_ids: text_header.attached_stream_ids,
generated: text_header.generated.unwrap_or(false),
},
))
}
Some(proto::data_stream::header::ContentHeader::FileHeader(file_header)) => {
Some(livekit_protocol::data_stream::header::ContentHeader::FileHeader(
livekit_protocol::data_stream::FileHeader { file_name: file_header.file_name },
))
}
None => None,
};

livekit_protocol::data_stream::Header {
stream_id: msg.stream_id,
timestamp: msg.timestamp,
topic: msg.topic,
mime_type: msg.mime_type,
total_length: msg.total_length,
total_chunks: None,
extensions: msg.extensions,
content_header,
encryption_type: 0,
}
}
}

impl From<livekit_protocol::data_stream::Chunk> for proto::data_stream::Chunk {
fn from(msg: livekit_protocol::data_stream::Chunk) -> Self {
proto::data_stream::Chunk {
stream_id: msg.stream_id,
content: msg.content,
complete: msg.complete,
complete: Some(msg.complete),
chunk_index: msg.chunk_index,
version: Some(msg.version),
iv: msg.iv,
}
}
}

impl From<proto::data_stream::Chunk> for livekit_protocol::data_stream::Chunk {
fn from(msg: proto::data_stream::Chunk) -> Self {
livekit_protocol::data_stream::Chunk {
stream_id: msg.stream_id,
content: msg.content,
complete: msg.complete.unwrap_or(false),
chunk_index: msg.chunk_index,
version: msg.version,
version: msg.version.unwrap_or(0),
iv: msg.iv,
}
}
Expand Down
Loading

0 comments on commit 5047dae

Please sign in to comment.