From 73cd6b3f69d9a841a09805ae559e2d09f4e51580 Mon Sep 17 00:00:00 2001 From: Brian Yin Date: Fri, 6 Feb 2026 16:26:42 -0800 Subject: [PATCH 1/6] add multi input stream and refactor --- agents/src/stream/index.ts | 1 + agents/src/stream/multi_input_stream.test.ts | 552 +++++++++++++++++++ agents/src/stream/multi_input_stream.ts | 163 ++++++ agents/src/voice/io.ts | 7 +- agents/src/voice/recorder_io/recorder_io.ts | 2 +- agents/src/voice/room_io/_input.ts | 17 +- 6 files changed, 731 insertions(+), 11 deletions(-) create mode 100644 agents/src/stream/multi_input_stream.test.ts create mode 100644 agents/src/stream/multi_input_stream.ts diff --git a/agents/src/stream/index.ts b/agents/src/stream/index.ts index 13aeecd6f..26b3f97b3 100644 --- a/agents/src/stream/index.ts +++ b/agents/src/stream/index.ts @@ -4,4 +4,5 @@ export { DeferredReadableStream } from './deferred_stream.js'; export { IdentityTransform } from './identity_transform.js'; export { mergeReadableStreams } from './merge_readable_streams.js'; +export { MultiInputStream } from './multi_input_stream.js'; export { createStreamChannel, type StreamChannel } from './stream_channel.js'; diff --git a/agents/src/stream/multi_input_stream.test.ts b/agents/src/stream/multi_input_stream.test.ts new file mode 100644 index 000000000..4f9963194 --- /dev/null +++ b/agents/src/stream/multi_input_stream.test.ts @@ -0,0 +1,552 @@ +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { ReadableStream } from 'node:stream/web'; +import { describe, expect, it } from 'vitest'; +import { delay } from '../utils.js'; +import { MultiInputStream } from './multi_input_stream.js'; + +function streamFrom(values: T[]): ReadableStream { + return new ReadableStream({ + start(controller) { + for (const v of values) controller.enqueue(v); + controller.close(); + }, + }); +} + +async function collectAll(stream: ReadableStream): Promise { + const reader = stream.getReader(); + const results: T[] = []; + let result = await reader.read(); + while (!result.done) { + results.push(result.value); + result = await reader.read(); + } + reader.releaseLock(); + return results; +} + +describe('MultiInputStream', () => { + // --------------------------------------------------------------------------- + // Basic functionality + // --------------------------------------------------------------------------- + + it('should create a readable output stream', () => { + const multi = new MultiInputStream(); + expect(multi.stream).toBeInstanceOf(ReadableStream); + expect(multi.inputCount).toBe(0); + expect(multi.isClosed).toBe(false); + }); + + it('should read data from a single input stream', async () => { + const multi = new MultiInputStream(); + const reader = multi.stream.getReader(); + + multi.addInputStream(streamFrom(['a', 'b', 'c'])); + + const results: string[] = []; + // Read three values then close manually (output stays open after input ends). + for (let i = 0; i < 3; i++) { + const { value } = await reader.read(); + results.push(value!); + } + + expect(results).toEqual(['a', 'b', 'c']); + reader.releaseLock(); + await multi.close(); + }); + + it('should merge data from multiple input streams', async () => { + const multi = new MultiInputStream(); + const reader = multi.stream.getReader(); + + multi.addInputStream(streamFrom([1, 2])); + multi.addInputStream(streamFrom([3, 4])); + + const results: number[] = []; + for (let i = 0; i < 4; i++) { + const { value } = await reader.read(); + results.push(value!); + } + + // Order is non-deterministic but all values must arrive. + expect(results.sort()).toEqual([1, 2, 3, 4]); + reader.releaseLock(); + await multi.close(); + }); + + // --------------------------------------------------------------------------- + // Dynamic add / remove + // --------------------------------------------------------------------------- + + it('should allow adding inputs dynamically while reading', async () => { + const multi = new MultiInputStream(); + const reader = multi.stream.getReader(); + + multi.addInputStream(streamFrom(['first'])); + + const r1 = await reader.read(); + expect(r1.value).toBe('first'); + + // Add a second input after reading from the first. + multi.addInputStream(streamFrom(['second'])); + + const r2 = await reader.read(); + expect(r2.value).toBe('second'); + + reader.releaseLock(); + await multi.close(); + }); + + it('should continue reading from remaining inputs after removing one', async () => { + const multi = new MultiInputStream(); + const reader = multi.stream.getReader(); + + // A slow stream that emits over time. + const slowSource = new ReadableStream({ + async start(controller) { + controller.enqueue('slow-1'); + await delay(50); + controller.enqueue('slow-2'); + await delay(50); + controller.enqueue('slow-3'); + controller.close(); + }, + }); + + const slowId = multi.addInputStream(slowSource); + + // Read first value from slow source. + const r1 = await reader.read(); + expect(r1.value).toBe('slow-1'); + + // Remove the slow source and add a fast one. + await multi.removeInputStream(slowId); + + multi.addInputStream(streamFrom(['fast-1', 'fast-2'])); + + const r2 = await reader.read(); + expect(r2.value).toBe('fast-1'); + + const r3 = await reader.read(); + expect(r3.value).toBe('fast-2'); + + reader.releaseLock(); + await multi.close(); + }); + + it('should handle swapping inputs (remove then add)', async () => { + const multi = new MultiInputStream(); + const reader = multi.stream.getReader(); + + const id1 = multi.addInputStream(streamFrom(['from-A'])); + + const r1 = await reader.read(); + expect(r1.value).toBe('from-A'); + + await multi.removeInputStream(id1); + + const id2 = multi.addInputStream(streamFrom(['from-B'])); + + const r2 = await reader.read(); + expect(r2.value).toBe('from-B'); + + await multi.removeInputStream(id2); + reader.releaseLock(); + await multi.close(); + }); + + // --------------------------------------------------------------------------- + // Reading before any input is added + // --------------------------------------------------------------------------- + + it('should keep reader awaiting until an input is added', async () => { + const multi = new MultiInputStream(); + const reader = multi.stream.getReader(); + + let readCompleted = false; + const readPromise = reader.read().then((result) => { + readCompleted = true; + return result; + }); + + await delay(50); + expect(readCompleted).toBe(false); + + // Now add an input to unblock the read. + multi.addInputStream(streamFrom(['hello'])); + + const result = await readPromise; + expect(readCompleted).toBe(true); + expect(result.value).toBe('hello'); + + reader.releaseLock(); + await multi.close(); + }); + + // --------------------------------------------------------------------------- + // Empty input streams + // --------------------------------------------------------------------------- + + it('should handle empty input streams without closing the output', async () => { + const multi = new MultiInputStream(); + const reader = multi.stream.getReader(); + + // Add an empty stream — it should end immediately without affecting the output. + multi.addInputStream(streamFrom([])); + + await delay(20); + + // The output should still be open. Adding a real input should work. + multi.addInputStream(streamFrom(['data'])); + + const result = await reader.read(); + expect(result.value).toBe('data'); + + reader.releaseLock(); + await multi.close(); + }); + + // --------------------------------------------------------------------------- + // Error handling + // --------------------------------------------------------------------------- + + it('should remove errored input without killing the output', async () => { + const multi = new MultiInputStream(); + const reader = multi.stream.getReader(); + + // An input that errors after emitting one value. + const errorSource = new ReadableStream({ + async start(controller) { + controller.enqueue('before-error'); + await delay(20); + controller.error(new Error('boom')); + }, + }); + + multi.addInputStream(errorSource); + + const r1 = await reader.read(); + expect(r1.value).toBe('before-error'); + + // Wait for the error to propagate and the input to be removed. + await delay(50); + + expect(multi.inputCount).toBe(0); + + // The output is still alive — we can add another input. + multi.addInputStream(streamFrom(['after-error'])); + + const r2 = await reader.read(); + expect(r2.value).toBe('after-error'); + + reader.releaseLock(); + await multi.close(); + }); + + it('should keep other inputs alive when one errors', async () => { + const multi = new MultiInputStream(); + const reader = multi.stream.getReader(); + + const goodSource = new ReadableStream({ + async start(controller) { + await delay(60); + controller.enqueue('good'); + controller.close(); + }, + }); + + const badSource = new ReadableStream({ + async start(controller) { + controller.error(new Error('bad')); + }, + }); + + multi.addInputStream(goodSource); + multi.addInputStream(badSource); + + // Wait a bit for the bad source to error and be removed. + await delay(10); + + // The good source should still be pumping. + const result = await reader.read(); + expect(result.value).toBe('good'); + + reader.releaseLock(); + await multi.close(); + }); + + // --------------------------------------------------------------------------- + // Close semantics + // --------------------------------------------------------------------------- + + it('should end the output stream with done:true when close is called', async () => { + const multi = new MultiInputStream(); + const reader = multi.stream.getReader(); + + multi.addInputStream(streamFrom(['data'])); + + const r1 = await reader.read(); + expect(r1.value).toBe('data'); + + await multi.close(); + + const r2 = await reader.read(); + expect(r2.done).toBe(true); + expect(r2.value).toBeUndefined(); + + reader.releaseLock(); + }); + + it('should resolve pending reads as done when close is called', async () => { + const multi = new MultiInputStream(); + const reader = multi.stream.getReader(); + + // No inputs — read will be pending. + const readPromise = reader.read(); + + await delay(10); + await multi.close(); + + const result = await readPromise; + expect(result.done).toBe(true); + expect(result.value).toBeUndefined(); + + reader.releaseLock(); + }); + + it('should be idempotent for multiple close calls', async () => { + const multi = new MultiInputStream(); + + await multi.close(); + await multi.close(); + + expect(multi.isClosed).toBe(true); + }); + + it('should throw when adding input after close', async () => { + const multi = new MultiInputStream(); + await multi.close(); + + expect(() => multi.addInputStream(streamFrom(['x']))).toThrow('MultiInputStream is closed'); + }); + + // --------------------------------------------------------------------------- + // removeInputStream edge cases + // --------------------------------------------------------------------------- + + it('should no-op when removing a non-existent input', async () => { + const multi = new MultiInputStream(); + + // Should not throw. + await multi.removeInputStream('does-not-exist'); + + await multi.close(); + }); + + it('should release the source reader lock so the source can be reused', async () => { + const multi = new MultiInputStream(); + const reader = multi.stream.getReader(); + + const source = new ReadableStream({ + async start(controller) { + controller.enqueue('chunk-0'); + await delay(30); + controller.enqueue('chunk-1'); + controller.close(); + }, + }); + + const id = multi.addInputStream(source); + + const r1 = await reader.read(); + expect(r1.value).toBe('chunk-0'); + + await multi.removeInputStream(id); + + // The source's reader lock should be released — we can get a new reader. + const sourceReader = source.getReader(); + const sr = await sourceReader.read(); + expect(sr.value).toBe('chunk-1'); + sourceReader.releaseLock(); + + reader.releaseLock(); + await multi.close(); + }); + + // --------------------------------------------------------------------------- + // Input count tracking + // --------------------------------------------------------------------------- + + it('should track inputCount correctly through add / remove / natural end', async () => { + const multi = new MultiInputStream(); + + expect(multi.inputCount).toBe(0); + + const id1 = multi.addInputStream(streamFrom(['a'])); + const id2 = multi.addInputStream(streamFrom(['b'])); + + expect(multi.inputCount).toBe(2); + + await multi.removeInputStream(id1); + expect(multi.inputCount).toBeLessThanOrEqual(1); + + // Let the remaining stream finish. + await delay(20); + expect(multi.inputCount).toBe(0); + + await multi.removeInputStream(id2); // already gone, no-op + expect(multi.inputCount).toBe(0); + + await multi.close(); + }); + + // --------------------------------------------------------------------------- + // Concurrent reads and writes + // --------------------------------------------------------------------------- + + it('should handle concurrent reads and slow writes', async () => { + const multi = new MultiInputStream(); + const reader = multi.stream.getReader(); + + const chunks = ['a', 'b', 'c', 'd', 'e']; + let idx = 0; + + const source = new ReadableStream({ + start(controller) { + const writeNext = () => { + if (idx < chunks.length) { + controller.enqueue(chunks[idx++]); + setTimeout(writeNext, 5); + } else { + controller.close(); + } + }; + writeNext(); + }, + }); + + multi.addInputStream(source); + + const results: string[] = []; + for (let i = 0; i < chunks.length; i++) { + const { value } = await reader.read(); + results.push(value!); + } + + expect(results).toEqual(chunks); + + reader.releaseLock(); + await multi.close(); + }); + + // --------------------------------------------------------------------------- + // Backpressure + // --------------------------------------------------------------------------- + + it('should handle backpressure with large data', async () => { + const multi = new MultiInputStream(); + + const largeChunks = Array.from({ length: 1000 }, (_, i) => `chunk-${i}`); + multi.addInputStream(streamFrom(largeChunks)); + + const reader = multi.stream.getReader(); + const results: string[] = []; + + let result = await reader.read(); + while (!result.done) { + results.push(result.value); + // Check if we've collected all expected values before reading again, + // to avoid hanging on the output which stays open after input ends. + if (results.length === largeChunks.length) break; + result = await reader.read(); + } + + expect(results).toEqual(largeChunks); + + reader.releaseLock(); + await multi.close(); + }); + + // --------------------------------------------------------------------------- + // Multiple tee / concurrent consumers + // --------------------------------------------------------------------------- + + it('should support tee on the output stream', async () => { + const multi = new MultiInputStream(); + + const [s1, s2] = multi.stream.tee(); + const r1 = s1.getReader(); + const r2 = s2.getReader(); + + multi.addInputStream(streamFrom([10, 20])); + + const [a1, a2] = await Promise.all([r1.read(), r2.read()]); + expect(a1.value).toBe(10); + expect(a2.value).toBe(10); + + const [b1, b2] = await Promise.all([r1.read(), r2.read()]); + expect(b1.value).toBe(20); + expect(b2.value).toBe(20); + + r1.releaseLock(); + r2.releaseLock(); + await multi.close(); + }); + + // --------------------------------------------------------------------------- + // Return value of addInputStream + // --------------------------------------------------------------------------- + + it('should return unique IDs from addInputStream', () => { + const multi = new MultiInputStream(); + + const id1 = multi.addInputStream(streamFrom(['a'])); + const id2 = multi.addInputStream(streamFrom(['b'])); + const id3 = multi.addInputStream(streamFrom(['c'])); + + expect(id1).not.toBe(id2); + expect(id2).not.toBe(id3); + expect(id1).not.toBe(id3); + }); + + // --------------------------------------------------------------------------- + // close() while pumps are actively writing + // --------------------------------------------------------------------------- + + it('should cleanly close while pumps are actively writing', async () => { + const multi = new MultiInputStream(); + const reader = multi.stream.getReader(); + + // A source that never stops on its own. + const infiniteSource = new ReadableStream({ + async start(controller) { + let i = 0; + while (true) { + try { + controller.enqueue(`tick-${i++}`); + } catch { + // controller.enqueue throws after stream is canceled + break; + } + await delay(5); + } + }, + }); + + multi.addInputStream(infiniteSource); + + // Read a couple of values. + const r1 = await reader.read(); + expect(r1.done).toBe(false); + + // Close while the infinite source is still pumping. + await multi.close(); + + const r2 = await reader.read(); + expect(r2.done).toBe(true); + + reader.releaseLock(); + }); +}); diff --git a/agents/src/stream/multi_input_stream.ts b/agents/src/stream/multi_input_stream.ts new file mode 100644 index 000000000..f97758b00 --- /dev/null +++ b/agents/src/stream/multi_input_stream.ts @@ -0,0 +1,163 @@ +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import type { + ReadableStream, + ReadableStreamDefaultReader, + WritableStreamDefaultWriter, +} from 'node:stream/web'; +import { isStreamReaderReleaseError } from './deferred_stream.js'; +import { IdentityTransform } from './identity_transform.js'; + +/** + * A fan-in multiplexer that merges multiple {@link ReadableStream} inputs into + * a single output {@link ReadableStream}. Inputs can be dynamically added and + * removed at any time while the stream is open. + * + * Unlike {@link DeferredReadableStream} which supports a single readable source, + * `MultiInputStream` allows N concurrent input streams to pump data into one output. + * + * Key behaviors: + * - An error in one input removes that input but does **not** kill the output. + * - When all inputs end or are removed, the output stays open (waiting for new inputs). + * - The output only closes when {@link close} is called explicitly. + * - {@link removeInputStream} releases the reader lock so the source can be reused. + */ +export class MultiInputStream { + private transform: IdentityTransform; + private writer: WritableStreamDefaultWriter; + private inputs: Map> = new Map(); + private pumpPromises: Map> = new Map(); + private nextId = 0; + private _closed = false; + + constructor() { + this.transform = new IdentityTransform(); + this.writer = this.transform.writable.getWriter(); + } + + /** The single output stream that consumers read from. */ + get stream(): ReadableStream { + return this.transform.readable; + } + + /** Number of currently active input streams. */ + get inputCount(): number { + return this.inputs.size; + } + + /** Whether {@link close} has been called. */ + get isClosed(): boolean { + return this._closed; + } + + /** + * Add an input {@link ReadableStream} that will be pumped into the output. + * + * @returns A unique identifier that can be passed to {@link removeInputStream}. + * @throws If the stream has already been closed. + */ + addInputStream(source: ReadableStream): string { + if (this._closed) { + throw new Error('MultiInputStream is closed'); + } + + const id = `input-${this.nextId++}`; + const reader = source.getReader(); + this.inputs.set(id, reader); + + const pumpDone = this.pumpInput(id, reader); + this.pumpPromises.set(id, pumpDone); + + return id; + } + + /** + * Detach an input stream by its ID and release the reader lock so the + * source stream can be reused elsewhere. + * + * No-op if the ID does not exist (e.g. the input already ended or was removed). + */ + async removeInputStream(id: string): Promise { + const reader = this.inputs.get(id); + if (!reader) return; + + // Delete first so the pump's finally-block is a harmless no-op. + this.inputs.delete(id); + + // Releasing the lock causes any pending reader.read() inside pump to throw + // a TypeError, which is caught by isStreamReaderReleaseError. + reader.releaseLock(); + + // Wait for the pump to finish so the caller knows cleanup is complete. + const pump = this.pumpPromises.get(id); + if (pump) { + await pump; + this.pumpPromises.delete(id); + } + } + + /** + * Close the output stream and detach all inputs. + * + * Idempotent — calling more than once is a no-op. + */ + async close(): Promise { + if (this._closed) return; + this._closed = true; + + // Release every input reader to unblock pending reads inside pumps. + for (const reader of this.inputs.values()) { + reader.releaseLock(); + } + this.inputs.clear(); + + // Wait for every pump loop to finish before touching the writer. + await Promise.allSettled([...this.pumpPromises.values()]); + this.pumpPromises.clear(); + + // Close the output writer + writable side of the transform. + try { + this.writer.releaseLock(); + } catch { + // ignore if already released + } + + try { + await this.transform.writable.close(); + } catch { + // ignore if already closed + } + } + + private shouldStopPumping(id: string): boolean { + return this._closed || !this.inputs.has(id); + } + + private async pumpInput(id: string, reader: ReadableStreamDefaultReader): Promise { + try { + while (true) { + // If the stream was closed or the input was removed while we were + // awaiting the previous write, bail out immediately. + if (this.shouldStopPumping(id)) break; + + const { done, value } = await reader.read(); + if (done) break; + + // Double-check after the (potentially long) read. + if (this.shouldStopPumping(id)) break; + + await this.writer.write(value); + } + } catch (e) { + // TypeErrors from releaseLock() during removeInputStream / close are expected. + if (!isStreamReaderReleaseError(e)) { + // For any other error we silently remove the input — the output stays alive. + // (Contrast with DeferredReadableStream which propagates errors to the output.) + } + } finally { + this.inputs.delete(id); + this.pumpPromises.delete(id); + } + } +} diff --git a/agents/src/voice/io.ts b/agents/src/voice/io.ts index 02ec22ecd..6cdfe718e 100644 --- a/agents/src/voice/io.ts +++ b/agents/src/voice/io.ts @@ -8,7 +8,7 @@ import type { ChatContext } from '../llm/chat_context.js'; import type { ChatChunk } from '../llm/llm.js'; import type { ToolContext } from '../llm/tool_context.js'; import { log } from '../log.js'; -import { DeferredReadableStream } from '../stream/deferred_stream.js'; +import { MultiInputStream } from '../stream/multi_input_stream.js'; import type { SpeechEvent } from '../stt/stt.js'; import { Future } from '../utils.js'; import type { ModelSettings } from './agent.js'; @@ -84,11 +84,10 @@ export interface AudioOutputCapabilities { } export abstract class AudioInput { - protected deferredStream: DeferredReadableStream = - new DeferredReadableStream(); + protected multiStream: MultiInputStream = new MultiInputStream(); get stream(): ReadableStream { - return this.deferredStream.stream; + return this.multiStream.stream; } onAttached(): void {} diff --git a/agents/src/voice/recorder_io/recorder_io.ts b/agents/src/voice/recorder_io/recorder_io.ts index 8f478ff55..20131aee4 100644 --- a/agents/src/voice/recorder_io/recorder_io.ts +++ b/agents/src/voice/recorder_io/recorder_io.ts @@ -378,7 +378,7 @@ class RecorderAudioInput extends AudioInput { this.source = source; // Set up the intercepting stream - this.deferredStream.setSource(this.createInterceptingStream()); + this.multiStream.addInputStream(this.createInterceptingStream()); } /** diff --git a/agents/src/voice/room_io/_input.ts b/agents/src/voice/room_io/_input.ts index 6df90c4e6..1234249f9 100644 --- a/agents/src/voice/room_io/_input.ts +++ b/agents/src/voice/room_io/_input.ts @@ -1,9 +1,10 @@ // SPDX-FileCopyrightText: 2025 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 -import { type AudioFrame, FrameProcessor } from '@livekit/rtc-node'; import { + type AudioFrame, AudioStream, + FrameProcessor, type NoiseCancellationOptions, RemoteParticipant, type RemoteTrack, @@ -25,7 +26,9 @@ export class ParticipantAudioInputStream extends AudioInput { private frameProcessor?: FrameProcessor; private publication: RemoteTrackPublication | null = null; private participantIdentity: string | null = null; + private currentInputId: string | null = null; private logger = log(); + constructor({ room, sampleRate, @@ -119,8 +122,11 @@ export class ParticipantAudioInputStream extends AudioInput { }; private closeStream() { - if (this.deferredStream.isSourceSet) { - this.deferredStream.detachSource(); + if (this.currentInputId) { + this.multiStream.removeInputStream(this.currentInputId).catch((e) => { + this.logger.error({ error: e }, 'Error removing input stream'); + }); + this.currentInputId = null; } this.frameProcessor?.close(); @@ -143,7 +149,7 @@ export class ParticipantAudioInputStream extends AudioInput { } this.closeStream(); this.publication = publication; - this.deferredStream.setSource( + this.currentInputId = this.multiStream.addInputStream( resampleStream({ stream: this.createStream(track), outputRate: this.sampleRate, @@ -184,7 +190,6 @@ export class ParticipantAudioInputStream extends AudioInput { this.room.off(RoomEvent.TrackUnpublished, this.onTrackUnpublished); this.room.off(RoomEvent.TokenRefreshed, this.onTokenRefreshed); this.closeStream(); - // Ignore errors - stream may be locked by RecorderIO or already cancelled - await this.deferredStream.stream.cancel().catch(() => {}); + await this.multiStream.close(); } } From 746822bac0e0cbac8d7cf95ecb067153eaa55640 Mon Sep 17 00:00:00 2001 From: Brian Yin Date: Fri, 6 Feb 2026 16:39:23 -0800 Subject: [PATCH 2/6] explicitly close multi stream --- agents/src/voice/io.ts | 4 ++++ agents/src/voice/recorder_io/recorder_io.ts | 1 + agents/src/voice/room_io/_input.ts | 8 +++----- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/agents/src/voice/io.ts b/agents/src/voice/io.ts index 6cdfe718e..ff5d8a8b1 100644 --- a/agents/src/voice/io.ts +++ b/agents/src/voice/io.ts @@ -90,6 +90,10 @@ export abstract class AudioInput { return this.multiStream.stream; } + async close(): Promise { + await this.multiStream.close(); + } + onAttached(): void {} onDetached(): void {} diff --git a/agents/src/voice/recorder_io/recorder_io.ts b/agents/src/voice/recorder_io/recorder_io.ts index 20131aee4..72b363e90 100644 --- a/agents/src/voice/recorder_io/recorder_io.ts +++ b/agents/src/voice/recorder_io/recorder_io.ts @@ -105,6 +105,7 @@ export class RecorderIO { await this.outChan.close(); await this.closeFuture.await; await cancelAndWait([this.forwardTask!, this.encodeTask!]); + await this.inRecord?.close(); this.started = false; } finally { diff --git a/agents/src/voice/room_io/_input.ts b/agents/src/voice/room_io/_input.ts index 1234249f9..d6430d046 100644 --- a/agents/src/voice/room_io/_input.ts +++ b/agents/src/voice/room_io/_input.ts @@ -123,9 +123,7 @@ export class ParticipantAudioInputStream extends AudioInput { private closeStream() { if (this.currentInputId) { - this.multiStream.removeInputStream(this.currentInputId).catch((e) => { - this.logger.error({ error: e }, 'Error removing input stream'); - }); + void this.multiStream.removeInputStream(this.currentInputId); this.currentInputId = null; } @@ -185,11 +183,11 @@ export class ParticipantAudioInputStream extends AudioInput { }) as unknown as ReadableStream; } - async close() { + override async close() { this.room.off(RoomEvent.TrackSubscribed, this.onTrackSubscribed); this.room.off(RoomEvent.TrackUnpublished, this.onTrackUnpublished); this.room.off(RoomEvent.TokenRefreshed, this.onTokenRefreshed); this.closeStream(); - await this.multiStream.close(); + await super.close(); } } From cc7bd25786b84fa4a30e114bf3fb7d693826885c Mon Sep 17 00:00:00 2001 From: Brian Yin Date: Fri, 6 Feb 2026 16:41:49 -0800 Subject: [PATCH 3/6] Create neat-parents-develop.md --- .changeset/neat-parents-develop.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/neat-parents-develop.md diff --git a/.changeset/neat-parents-develop.md b/.changeset/neat-parents-develop.md new file mode 100644 index 000000000..ef419d1b4 --- /dev/null +++ b/.changeset/neat-parents-develop.md @@ -0,0 +1,5 @@ +--- +"@livekit/agents": patch +--- + +feat: Create MultiInputStream API primitive From 2cd5b8ca31eab841a1d1d1b5ceb20c67a24ed903 Mon Sep 17 00:00:00 2001 From: Brian Yin Date: Fri, 6 Feb 2026 16:46:14 -0800 Subject: [PATCH 4/6] Update multi_input_stream.test.ts --- agents/src/stream/multi_input_stream.test.ts | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/agents/src/stream/multi_input_stream.test.ts b/agents/src/stream/multi_input_stream.test.ts index 4f9963194..cda78b62b 100644 --- a/agents/src/stream/multi_input_stream.test.ts +++ b/agents/src/stream/multi_input_stream.test.ts @@ -15,18 +15,6 @@ function streamFrom(values: T[]): ReadableStream { }); } -async function collectAll(stream: ReadableStream): Promise { - const reader = stream.getReader(); - const results: T[] = []; - let result = await reader.read(); - while (!result.done) { - results.push(result.value); - result = await reader.read(); - } - reader.releaseLock(); - return results; -} - describe('MultiInputStream', () => { // --------------------------------------------------------------------------- // Basic functionality From 80b56beec599ea52dbce53c78386844a498c89c7 Mon Sep 17 00:00:00 2001 From: Brian Yin Date: Tue, 17 Feb 2026 13:22:21 -0800 Subject: [PATCH 5/6] Update multi_input_stream.ts --- agents/src/stream/multi_input_stream.ts | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/agents/src/stream/multi_input_stream.ts b/agents/src/stream/multi_input_stream.ts index f97758b00..fd4abcd0d 100644 --- a/agents/src/stream/multi_input_stream.ts +++ b/agents/src/stream/multi_input_stream.ts @@ -6,6 +6,7 @@ import type { ReadableStreamDefaultReader, WritableStreamDefaultWriter, } from 'node:stream/web'; +import { log } from '../log.js'; import { isStreamReaderReleaseError } from './deferred_stream.js'; import { IdentityTransform } from './identity_transform.js'; @@ -30,6 +31,7 @@ export class MultiInputStream { private pumpPromises: Map> = new Map(); private nextId = 0; private _closed = false; + private logger = log(); constructor() { this.transform = new IdentityTransform(); @@ -154,8 +156,17 @@ export class MultiInputStream { if (!isStreamReaderReleaseError(e)) { // For any other error we silently remove the input — the output stays alive. // (Contrast with DeferredReadableStream which propagates errors to the output.) + return; } + + this.logger.error({ error: e }, 'Error pumping input stream from MultiInputStream'); } finally { + try { + reader.releaseLock(); + } catch { + // ignore if already released + } + this.inputs.delete(id); this.pumpPromises.delete(id); } From 70f784097afb59c4c6fd9018d035027b09a460dd Mon Sep 17 00:00:00 2001 From: Brian Yin Date: Tue, 17 Feb 2026 13:31:52 -0800 Subject: [PATCH 6/6] Update multi_input_stream.ts --- agents/src/stream/multi_input_stream.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/agents/src/stream/multi_input_stream.ts b/agents/src/stream/multi_input_stream.ts index fd4abcd0d..99f03dfc4 100644 --- a/agents/src/stream/multi_input_stream.ts +++ b/agents/src/stream/multi_input_stream.ts @@ -153,9 +153,7 @@ export class MultiInputStream { } } catch (e) { // TypeErrors from releaseLock() during removeInputStream / close are expected. - if (!isStreamReaderReleaseError(e)) { - // For any other error we silently remove the input — the output stays alive. - // (Contrast with DeferredReadableStream which propagates errors to the output.) + if (isStreamReaderReleaseError(e)) { return; }