diff --git a/agents/src/stream/deferred_stream.ts b/agents/src/stream/deferred_stream.ts index 71a10c7e8..06527987f 100644 --- a/agents/src/stream/deferred_stream.ts +++ b/agents/src/stream/deferred_stream.ts @@ -37,6 +37,7 @@ export class DeferredReadableStream { private transform: IdentityTransform; private writer: WritableStreamDefaultWriter; private sourceReader?: ReadableStreamDefaultReader; + private writerClosed: boolean = false; constructor() { this.transform = new IdentityTransform(); @@ -53,30 +54,46 @@ export class DeferredReadableStream { /** * Call once the actual source is ready. + * Can be called again after detachSource() to attach a new source. */ setSource(source: ReadableStream) { if (this.isSourceSet) { throw new Error('Stream source already set'); } + // If writer was closed from a previous source, recreate the transform + if (this.writerClosed) { + this.transform = new IdentityTransform(); + this.writer = this.transform.writable.getWriter(); + this.writerClosed = false; + } + this.sourceReader = source.getReader(); this.pump(); } private async pump() { let sourceError: unknown; + let detached = false; try { while (true) { - const { done, value } = await this.sourceReader!.read(); + // Check if source was detached (sourceReader set to undefined) + if (!this.sourceReader) { + detached = true; + break; + } + const { done, value } = await this.sourceReader.read(); if (done) break; await this.writer.write(value); } } catch (e) { - // skip stream cleanup related errors - if (isStreamReaderReleaseError(e)) return; - - sourceError = e; + // Skip stream cleanup related errors (happens when detachSource is called) + if (isStreamReaderReleaseError(e)) { + detached = true; + } else { + sourceError = e; + } } finally { // any other error from source will be propagated to the consumer if (sourceError) { @@ -85,6 +102,7 @@ export class DeferredReadableStream { } catch (e) { // ignore if writer is already closed } + this.writerClosed = true; return; } @@ -98,6 +116,7 @@ export class DeferredReadableStream { // we only close the writable stream after done try { await this.transform.writable.close(); + this.writerClosed = true; // NOTE: we do not cancel this.transform.readable as there might be access to // this.transform.readable.getReader() outside that blocks this cancellation // hence, user is responsible for canceling reader on their own @@ -105,12 +124,14 @@ export class DeferredReadableStream { // ignore TypeError: Invalid state: WritableStream is closed // in case stream reader is already closed, this will throw // but we ignore it as we are closing the stream anyway + this.writerClosed = true; } } } /** * Detach the source stream and clean up resources. + * After calling this, setSource() can be called again with a new source. */ async detachSource() { if (!this.isSourceSet) { @@ -123,5 +144,9 @@ export class DeferredReadableStream { // using isStreamReaderReleaseError // this will unblock any pending read() inside the async for loop this.sourceReader!.releaseLock(); + + // Reset sourceReader so isSourceSet returns false + // This allows setSource() to be called again on reconnection + this.sourceReader = undefined; } }