@@ -166,6 +166,7 @@ def __init__(
166166 self._next_chunk_index: int = 0
167167 self._destination_identities = destination_identities
168168 self._sender_identity = sender_identity or self._local_participant.identity
169+ self._closed = False
169170
170171 async def _send_header(self):
171172 req = proto_ffi.FfiRequest(
@@ -191,6 +192,8 @@ async def _send_header(self):
191192 raise ConnectionError(cb.send_stream_header.error)
192193
193194 async def _send_chunk(self, chunk: proto_DataStream.Chunk):
195+ if self._closed:
196+ raise RuntimeError(f"Cannot send chunk after stream is closed: {chunk}")
194197 req = proto_ffi.FfiRequest(
195198 send_stream_chunk=proto_room.SendStreamChunkRequest(
196199 chunk=chunk,
@@ -214,6 +217,8 @@ async def _send_chunk(self, chunk: proto_DataStream.Chunk):
214217 raise ConnectionError(cb.send_stream_chunk.error)
215218
216219 async def _send_trailer(self, trailer: proto_DataStream.Trailer):
220+ if self._closed:
221+ raise RuntimeError(f"Cannot send trailer after stream is closed: {trailer}")
217222 req = proto_ffi.FfiRequest(
218223 send_stream_trailer=proto_room.SendStreamTrailerRequest(
219224 trailer=trailer,
@@ -235,6 +240,8 @@ async def _send_trailer(self, trailer: proto_DataStream.Trailer):
235240 if cb.send_stream_chunk.error:
236241 raise ConnectionError(cb.send_stream_trailer.error)
237242
243+ self._closed = True
244+
238245 async def aclose(
239246 self, *, reason: str = "", attributes: Optional[Dict[str, str]] = None
240247 ):
0 commit comments