From b333bd02358d900788b5b393404bf7a59aa6d2da Mon Sep 17 00:00:00 2001 From: Long Chen Date: Tue, 25 Feb 2025 16:09:35 +0800 Subject: [PATCH 1/2] check data stream closed --- livekit-rtc/livekit/rtc/data_stream.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/livekit-rtc/livekit/rtc/data_stream.py b/livekit-rtc/livekit/rtc/data_stream.py index 9e994805..59f3fe53 100644 --- a/livekit-rtc/livekit/rtc/data_stream.py +++ b/livekit-rtc/livekit/rtc/data_stream.py @@ -166,6 +166,7 @@ def __init__( self._next_chunk_index: int = 0 self._destination_identities = destination_identities self._sender_identity = sender_identity or self._local_participant.identity + self._closed = False async def _send_header(self): req = proto_ffi.FfiRequest( @@ -191,6 +192,8 @@ async def _send_header(self): raise ConnectionError(cb.send_stream_header.error) async def _send_chunk(self, chunk: proto_DataStream.Chunk): + if self._closed: + raise RuntimeError(f"Cannot send chunk after stream is closed: {chunk}") req = proto_ffi.FfiRequest( send_stream_chunk=proto_room.SendStreamChunkRequest( chunk=chunk, @@ -214,6 +217,8 @@ async def _send_chunk(self, chunk: proto_DataStream.Chunk): raise ConnectionError(cb.send_stream_chunk.error) async def _send_trailer(self, trailer: proto_DataStream.Trailer): + if self._closed: + raise RuntimeError(f"Cannot send trailer after stream is closed: {trailer}") req = proto_ffi.FfiRequest( send_stream_trailer=proto_room.SendStreamTrailerRequest( trailer=trailer, @@ -235,6 +240,8 @@ async def _send_trailer(self, trailer: proto_DataStream.Trailer): if cb.send_stream_chunk.error: raise ConnectionError(cb.send_stream_trailer.error) + self._closed = True + async def aclose( self, *, reason: str = "", attributes: Optional[Dict[str, str]] = None ): From f8a4d93745430e5890c2b7ff8051a3db032459be Mon Sep 17 00:00:00 2001 From: Long Chen Date: Tue, 25 Feb 2025 19:07:41 +0800 Subject: [PATCH 2/2] check in aclose --- livekit-rtc/livekit/rtc/data_stream.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/livekit-rtc/livekit/rtc/data_stream.py b/livekit-rtc/livekit/rtc/data_stream.py index 59f3fe53..4e8054de 100644 --- a/livekit-rtc/livekit/rtc/data_stream.py +++ b/livekit-rtc/livekit/rtc/data_stream.py @@ -217,8 +217,6 @@ async def _send_chunk(self, chunk: proto_DataStream.Chunk): raise ConnectionError(cb.send_stream_chunk.error) async def _send_trailer(self, trailer: proto_DataStream.Trailer): - if self._closed: - raise RuntimeError(f"Cannot send trailer after stream is closed: {trailer}") req = proto_ffi.FfiRequest( send_stream_trailer=proto_room.SendStreamTrailerRequest( trailer=trailer, @@ -240,11 +238,12 @@ async def _send_trailer(self, trailer: proto_DataStream.Trailer): if cb.send_stream_chunk.error: raise ConnectionError(cb.send_stream_trailer.error) - self._closed = True - async def aclose( self, *, reason: str = "", attributes: Optional[Dict[str, str]] = None ): + if self._closed: + raise RuntimeError("Stream already closed") + self._closed = True await self._send_trailer( trailer=proto_DataStream.Trailer( stream_id=self._header.stream_id, reason=reason, attributes=attributes