Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 30 additions & 5 deletions agents/src/stream/deferred_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ export class DeferredReadableStream<T> {
private transform: IdentityTransform<T>;
private writer: WritableStreamDefaultWriter<T>;
private sourceReader?: ReadableStreamDefaultReader<T>;
private writerClosed: boolean = false;

constructor() {
this.transform = new IdentityTransform<T>();
Expand All @@ -53,30 +54,46 @@ export class DeferredReadableStream<T> {

/**
* Call once the actual source is ready.
* Can be called again after detachSource() to attach a new source.
*/
setSource(source: ReadableStream<T>) {
if (this.isSourceSet) {
throw new Error('Stream source already set');
}

// If writer was closed from a previous source, recreate the transform
if (this.writerClosed) {
this.transform = new IdentityTransform<T>();
this.writer = this.transform.writable.getWriter();
this.writerClosed = false;
}

this.sourceReader = source.getReader();
this.pump();
}

private async pump() {
let sourceError: unknown;
let detached = false;

try {
while (true) {
const { done, value } = await this.sourceReader!.read();
// Check if source was detached (sourceReader set to undefined)
if (!this.sourceReader) {
detached = true;
break;
}
const { done, value } = await this.sourceReader.read();
if (done) break;
await this.writer.write(value);
}
} catch (e) {
// skip stream cleanup related errors
if (isStreamReaderReleaseError(e)) return;

sourceError = e;
// Skip stream cleanup related errors (happens when detachSource is called)
if (isStreamReaderReleaseError(e)) {
detached = true;
} else {
sourceError = e;
}
} finally {
// any other error from source will be propagated to the consumer
if (sourceError) {
Expand All @@ -85,6 +102,7 @@ export class DeferredReadableStream<T> {
} catch (e) {
// ignore if writer is already closed
}
this.writerClosed = true;
return;
}

Expand All @@ -98,19 +116,22 @@ export class DeferredReadableStream<T> {
// we only close the writable stream after done
try {
await this.transform.writable.close();
this.writerClosed = true;
// NOTE: we do not cancel this.transform.readable as there might be access to
// this.transform.readable.getReader() outside that blocks this cancellation
// hence, user is responsible for canceling reader on their own
} catch (e) {
// ignore TypeError: Invalid state: WritableStream is closed
// in case stream reader is already closed, this will throw
// but we ignore it as we are closing the stream anyway
this.writerClosed = true;
}
}
}

/**
* Detach the source stream and clean up resources.
* After calling this, setSource() can be called again with a new source.
*/
async detachSource() {
if (!this.isSourceSet) {
Expand All @@ -123,5 +144,9 @@ export class DeferredReadableStream<T> {
// using isStreamReaderReleaseError
// this will unblock any pending read() inside the async for loop
this.sourceReader!.releaseLock();

// Reset sourceReader so isSourceSet returns false
// This allows setSource() to be called again on reconnection
this.sourceReader = undefined;
}
}