Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add DataStream Trailer for livekit-ffi #547

Merged
merged 8 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .nanpa/data-stream-trailer.kdl
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
patch type="changed" package="livekit-protocol" "Update protocol version to v1.31.0"
patch type="added" package="livekit-ffi" "Add DataStream.Trailer support"
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions livekit-api/src/services/ingress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ impl IngressClient {
bypass_transcoding: options.bypass_transcoding,
enable_transcoding: options.enable_transcoding,
url: options.url,
enabled: Default::default(), // TODO: support this attribute
},
self.base
.auth_header(VideoGrants { ingress_admin: true, ..Default::default() }, None)?,
Expand Down Expand Up @@ -121,6 +122,7 @@ impl IngressClient {
video: Some(options.video),
bypass_transcoding: options.bypass_transcoding,
enable_transcoding: options.enable_transcoding,
enabled: Default::default(), // TODO: support this attribute
},
self.base
.auth_header(VideoGrants { ingress_admin: true, ..Default::default() }, None)?,
Expand Down
39 changes: 35 additions & 4 deletions livekit-api/src/services/sip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ impl SIPClient {
krisp_enabled: options.krisp_enabled.unwrap_or(false),
max_call_duration: Self::duration_to_proto(options.max_call_duration),
ringing_timeout: Self::duration_to_proto(options.ringing_timeout),

// TODO: support these attributes
include_headers: Default::default(),
media_encryption: Default::default(),
}),
},
self.base.auth_header(
Expand Down Expand Up @@ -228,6 +232,10 @@ impl SIPClient {
headers: options.headers.unwrap_or_default(),
headers_to_attributes: options.headers_to_attributes.unwrap_or_default(),
attributes_to_headers: options.attributes_to_headers.unwrap_or_default(),

// TODO: support these attributes
include_headers: Default::default(),
media_encryption: Default::default(),
}),
},
self.base.auth_header(
Expand Down Expand Up @@ -269,7 +277,11 @@ impl SIPClient {
.request(
SVC,
"ListSIPInboundTrunk",
proto::ListSipInboundTrunkRequest {},
proto::ListSipInboundTrunkRequest {
// TODO: support these attributes
trunk_ids: Default::default(),
numbers: Default::default(),
},
self.base.auth_header(
Default::default(),
Some(SIPGrants { admin: true, ..Default::default() }),
Expand All @@ -289,7 +301,11 @@ impl SIPClient {
.request(
SVC,
"ListSIPOutboundTrunk",
proto::ListSipOutboundTrunkRequest {},
proto::ListSipOutboundTrunkRequest {
// TODO: support these attributes
trunk_ids: Default::default(),
numbers: Default::default(),
},
self.base.auth_header(
Default::default(),
Some(SIPGrants { admin: true, ..Default::default() }),
Expand Down Expand Up @@ -332,6 +348,10 @@ impl SIPClient {
inbound_numbers: options.allowed_numbers.to_owned(),
hide_phone_number: options.hide_phone_number,
rule: Some(proto::SipDispatchRule { rule: Some(rule.to_owned()) }),

// TODO: support these attributes
room_preset: Default::default(),
room_config: Default::default(),
},
self.base.auth_header(
Default::default(),
Expand All @@ -351,7 +371,11 @@ impl SIPClient {
.request(
SVC,
"ListSIPDispatchRule",
proto::ListSipDispatchRuleRequest {},
proto::ListSipDispatchRuleRequest {
// TODO: support these attributes
dispatch_rule_ids: Default::default(),
trunk_ids: Default::default(),
},
self.base.auth_header(
Default::default(),
Some(SIPGrants { admin: true, ..Default::default() }),
Expand Down Expand Up @@ -414,7 +438,14 @@ impl SIPClient {
hide_phone_number: options.hide_phone_number.unwrap_or(false),
max_call_duration: Self::duration_to_proto(options.max_call_duration),
ringing_timeout: Self::duration_to_proto(options.ringing_timeout),
enable_krisp: options.enable_krisp.unwrap_or(false),

// TODO: rename local proto as well
krisp_enabled: options.enable_krisp.unwrap_or(false),

// TODO: support these attributes
headers: Default::default(),
include_headers: Default::default(),
media_encryption: Default::default(),
},
self.base.auth_header(
Default::default(),
Expand Down
10 changes: 6 additions & 4 deletions livekit-ffi/protocol/ffi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ message FfiRequest {
// Data Streams
SendStreamHeaderRequest send_stream_header = 44;
SendStreamChunkRequest send_stream_chunk = 45;

SendStreamTrailerRequest send_stream_trailer = 46;
}
}

Expand Down Expand Up @@ -174,9 +174,10 @@ message FfiResponse {
EnableRemoteTrackPublicationResponse enable_remote_track_publication = 41;
UpdateRemoteTrackPublicationDimensionResponse update_remote_track_publication_dimension = 42;

// Data Streams
SendStreamHeaderResponse send_stream_header = 43;
SendStreamChunkResponse send_stream_chunk = 44;
// Data Streams
SendStreamHeaderResponse send_stream_header = 43;
SendStreamChunkResponse send_stream_chunk = 44;
SendStreamTrailerResponse send_stream_trailer = 45;
}
}

Expand Down Expand Up @@ -210,6 +211,7 @@ message FfiEvent {
RpcMethodInvocationEvent rpc_method_invocation = 24;
SendStreamHeaderCallback send_stream_header = 25;
SendStreamChunkCallback send_stream_chunk = 26;
SendStreamTrailerCallback send_stream_trailer = 27;
}
}

Expand Down
33 changes: 27 additions & 6 deletions livekit-ffi/protocol/room.proto
Original file line number Diff line number Diff line change
Expand Up @@ -573,9 +573,14 @@ message DataStream {
required string stream_id = 1; // unique identifier for this data stream to map it to the correct header
required uint64 chunk_index = 2;
required bytes content = 3; // content as binary (bytes)
optional bool complete = 4; // true only if this is the last chunk of this stream - can also be sent with empty content
optional int32 version = 5; // a version indicating that this chunk_index has been retroactively modified and the original one needs to be replaced
optional bytes iv = 6; // optional, initialization vector for AES-GCM encryption
optional int32 version = 4; // a version indicating that this chunk_index has been retroactively modified and the original one needs to be replaced
optional bytes iv = 5; // optional, initialization vector for AES-GCM encryption
}

message Trailer {
required string stream_id = 1; // unique identifier for this data stream
required string reason = 2; // reason why the stream was closed (could contain "error" / "interrupted" / empty for expected end)
map<string, string> extensions = 3; // finalizing updates for the stream, can also include additional insights for errors or endTime for transcription
}
}

Expand All @@ -593,14 +598,21 @@ message SendStreamHeaderRequest {
required uint64 local_participant_handle = 1;
required DataStream.Header header = 2;
repeated string destination_identities = 3;
optional string sender_identity = 4;
required string sender_identity = 4;
}

message SendStreamChunkRequest {
required uint64 local_participant_handle = 1;
required DataStream.Chunk chunk = 2;
repeated string destination_identities = 3;
optional string sender_identity = 4;
required string sender_identity = 4;
}

message SendStreamTrailerRequest {
required uint64 local_participant_handle = 1;
required DataStream.Trailer trailer = 2;
repeated string destination_identities = 3;
required string sender_identity = 4;
}

message SendStreamHeaderResponse {
Expand All @@ -611,6 +623,10 @@ message SendStreamChunkResponse {
required uint64 async_id = 1;
}

message SendStreamTrailerResponse {
required uint64 async_id = 1;
}

message SendStreamHeaderCallback {
required uint64 async_id = 1;
optional string error = 2;
Expand All @@ -619,4 +635,9 @@ message SendStreamHeaderCallback {
message SendStreamChunkCallback {
required uint64 async_id = 1;
optional string error = 2;
}
}

message SendStreamTrailerCallback {
required uint64 async_id = 1;
optional string error = 2;
}
15 changes: 12 additions & 3 deletions livekit-ffi/src/conversion/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,6 @@ impl From<proto::data_stream::Header> for livekit_protocol::data_stream::Header
topic: msg.topic,
mime_type: msg.mime_type,
total_length: msg.total_length,
total_chunks: None,
extensions: msg.extensions,
content_header,
encryption_type: 0,
Expand All @@ -357,7 +356,6 @@ impl From<livekit_protocol::data_stream::Chunk> for proto::data_stream::Chunk {
proto::data_stream::Chunk {
stream_id: msg.stream_id,
content: msg.content,
complete: Some(msg.complete),
chunk_index: msg.chunk_index,
version: Some(msg.version),
iv: msg.iv,
Expand All @@ -370,10 +368,21 @@ impl From<proto::data_stream::Chunk> for livekit_protocol::data_stream::Chunk {
livekit_protocol::data_stream::Chunk {
stream_id: msg.stream_id,
content: msg.content,
complete: msg.complete.unwrap_or(false),
chunk_index: msg.chunk_index,
version: msg.version.unwrap_or(0),
iv: msg.iv,
}
}
}

impl From<livekit_protocol::data_stream::Trailer> for proto::data_stream::Trailer {
fn from(msg: livekit_protocol::data_stream::Trailer) -> Self {
Self { stream_id: msg.stream_id, reason: msg.reason, extensions: msg.extensions }
}
}

impl From<proto::data_stream::Trailer> for livekit_protocol::data_stream::Trailer {
fn from(msg: proto::data_stream::Trailer) -> Self {
Self { stream_id: msg.stream_id, reason: msg.reason, extensions: msg.extensions }
}
}
Loading
Loading