diff --git a/.changeset/neat-parents-develop.md b/.changeset/neat-parents-develop.md new file mode 100644 index 000000000..ef419d1b4 --- /dev/null +++ b/.changeset/neat-parents-develop.md @@ -0,0 +1,5 @@ +--- +"@livekit/agents": patch +--- + +feat: Create MultiInputStream API primitive diff --git a/agents/src/stream/index.ts b/agents/src/stream/index.ts index 13aeecd6f..26b3f97b3 100644 --- a/agents/src/stream/index.ts +++ b/agents/src/stream/index.ts @@ -4,4 +4,5 @@ export { DeferredReadableStream } from './deferred_stream.js'; export { IdentityTransform } from './identity_transform.js'; export { mergeReadableStreams } from './merge_readable_streams.js'; +export { MultiInputStream } from './multi_input_stream.js'; export { createStreamChannel, type StreamChannel } from './stream_channel.js'; diff --git a/agents/src/stream/multi_input_stream.test.ts b/agents/src/stream/multi_input_stream.test.ts new file mode 100644 index 000000000..cda78b62b --- /dev/null +++ b/agents/src/stream/multi_input_stream.test.ts @@ -0,0 +1,540 @@ +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. 
+// +// SPDX-License-Identifier: Apache-2.0 +import { ReadableStream } from 'node:stream/web'; +import { describe, expect, it } from 'vitest'; +import { delay } from '../utils.js'; +import { MultiInputStream } from './multi_input_stream.js'; + +function streamFrom(values: T[]): ReadableStream { + return new ReadableStream({ + start(controller) { + for (const v of values) controller.enqueue(v); + controller.close(); + }, + }); +} + +describe('MultiInputStream', () => { + // --------------------------------------------------------------------------- + // Basic functionality + // --------------------------------------------------------------------------- + + it('should create a readable output stream', () => { + const multi = new MultiInputStream(); + expect(multi.stream).toBeInstanceOf(ReadableStream); + expect(multi.inputCount).toBe(0); + expect(multi.isClosed).toBe(false); + }); + + it('should read data from a single input stream', async () => { + const multi = new MultiInputStream(); + const reader = multi.stream.getReader(); + + multi.addInputStream(streamFrom(['a', 'b', 'c'])); + + const results: string[] = []; + // Read three values then close manually (output stays open after input ends). + for (let i = 0; i < 3; i++) { + const { value } = await reader.read(); + results.push(value!); + } + + expect(results).toEqual(['a', 'b', 'c']); + reader.releaseLock(); + await multi.close(); + }); + + it('should merge data from multiple input streams', async () => { + const multi = new MultiInputStream(); + const reader = multi.stream.getReader(); + + multi.addInputStream(streamFrom([1, 2])); + multi.addInputStream(streamFrom([3, 4])); + + const results: number[] = []; + for (let i = 0; i < 4; i++) { + const { value } = await reader.read(); + results.push(value!); + } + + // Order is non-deterministic but all values must arrive. 
+ expect(results.sort()).toEqual([1, 2, 3, 4]); + reader.releaseLock(); + await multi.close(); + }); + + // --------------------------------------------------------------------------- + // Dynamic add / remove + // --------------------------------------------------------------------------- + + it('should allow adding inputs dynamically while reading', async () => { + const multi = new MultiInputStream(); + const reader = multi.stream.getReader(); + + multi.addInputStream(streamFrom(['first'])); + + const r1 = await reader.read(); + expect(r1.value).toBe('first'); + + // Add a second input after reading from the first. + multi.addInputStream(streamFrom(['second'])); + + const r2 = await reader.read(); + expect(r2.value).toBe('second'); + + reader.releaseLock(); + await multi.close(); + }); + + it('should continue reading from remaining inputs after removing one', async () => { + const multi = new MultiInputStream(); + const reader = multi.stream.getReader(); + + // A slow stream that emits over time. + const slowSource = new ReadableStream({ + async start(controller) { + controller.enqueue('slow-1'); + await delay(50); + controller.enqueue('slow-2'); + await delay(50); + controller.enqueue('slow-3'); + controller.close(); + }, + }); + + const slowId = multi.addInputStream(slowSource); + + // Read first value from slow source. + const r1 = await reader.read(); + expect(r1.value).toBe('slow-1'); + + // Remove the slow source and add a fast one. 
+ await multi.removeInputStream(slowId); + + multi.addInputStream(streamFrom(['fast-1', 'fast-2'])); + + const r2 = await reader.read(); + expect(r2.value).toBe('fast-1'); + + const r3 = await reader.read(); + expect(r3.value).toBe('fast-2'); + + reader.releaseLock(); + await multi.close(); + }); + + it('should handle swapping inputs (remove then add)', async () => { + const multi = new MultiInputStream(); + const reader = multi.stream.getReader(); + + const id1 = multi.addInputStream(streamFrom(['from-A'])); + + const r1 = await reader.read(); + expect(r1.value).toBe('from-A'); + + await multi.removeInputStream(id1); + + const id2 = multi.addInputStream(streamFrom(['from-B'])); + + const r2 = await reader.read(); + expect(r2.value).toBe('from-B'); + + await multi.removeInputStream(id2); + reader.releaseLock(); + await multi.close(); + }); + + // --------------------------------------------------------------------------- + // Reading before any input is added + // --------------------------------------------------------------------------- + + it('should keep reader awaiting until an input is added', async () => { + const multi = new MultiInputStream(); + const reader = multi.stream.getReader(); + + let readCompleted = false; + const readPromise = reader.read().then((result) => { + readCompleted = true; + return result; + }); + + await delay(50); + expect(readCompleted).toBe(false); + + // Now add an input to unblock the read. 
+ multi.addInputStream(streamFrom(['hello'])); + + const result = await readPromise; + expect(readCompleted).toBe(true); + expect(result.value).toBe('hello'); + + reader.releaseLock(); + await multi.close(); + }); + + // --------------------------------------------------------------------------- + // Empty input streams + // --------------------------------------------------------------------------- + + it('should handle empty input streams without closing the output', async () => { + const multi = new MultiInputStream(); + const reader = multi.stream.getReader(); + + // Add an empty stream — it should end immediately without affecting the output. + multi.addInputStream(streamFrom([])); + + await delay(20); + + // The output should still be open. Adding a real input should work. + multi.addInputStream(streamFrom(['data'])); + + const result = await reader.read(); + expect(result.value).toBe('data'); + + reader.releaseLock(); + await multi.close(); + }); + + // --------------------------------------------------------------------------- + // Error handling + // --------------------------------------------------------------------------- + + it('should remove errored input without killing the output', async () => { + const multi = new MultiInputStream(); + const reader = multi.stream.getReader(); + + // An input that errors after emitting one value. + const errorSource = new ReadableStream({ + async start(controller) { + controller.enqueue('before-error'); + await delay(20); + controller.error(new Error('boom')); + }, + }); + + multi.addInputStream(errorSource); + + const r1 = await reader.read(); + expect(r1.value).toBe('before-error'); + + // Wait for the error to propagate and the input to be removed. + await delay(50); + + expect(multi.inputCount).toBe(0); + + // The output is still alive — we can add another input. 
+ multi.addInputStream(streamFrom(['after-error'])); + + const r2 = await reader.read(); + expect(r2.value).toBe('after-error'); + + reader.releaseLock(); + await multi.close(); + }); + + it('should keep other inputs alive when one errors', async () => { + const multi = new MultiInputStream(); + const reader = multi.stream.getReader(); + + const goodSource = new ReadableStream({ + async start(controller) { + await delay(60); + controller.enqueue('good'); + controller.close(); + }, + }); + + const badSource = new ReadableStream({ + async start(controller) { + controller.error(new Error('bad')); + }, + }); + + multi.addInputStream(goodSource); + multi.addInputStream(badSource); + + // Wait a bit for the bad source to error and be removed. + await delay(10); + + // The good source should still be pumping. + const result = await reader.read(); + expect(result.value).toBe('good'); + + reader.releaseLock(); + await multi.close(); + }); + + // --------------------------------------------------------------------------- + // Close semantics + // --------------------------------------------------------------------------- + + it('should end the output stream with done:true when close is called', async () => { + const multi = new MultiInputStream(); + const reader = multi.stream.getReader(); + + multi.addInputStream(streamFrom(['data'])); + + const r1 = await reader.read(); + expect(r1.value).toBe('data'); + + await multi.close(); + + const r2 = await reader.read(); + expect(r2.done).toBe(true); + expect(r2.value).toBeUndefined(); + + reader.releaseLock(); + }); + + it('should resolve pending reads as done when close is called', async () => { + const multi = new MultiInputStream(); + const reader = multi.stream.getReader(); + + // No inputs — read will be pending. 
+ const readPromise = reader.read(); + + await delay(10); + await multi.close(); + + const result = await readPromise; + expect(result.done).toBe(true); + expect(result.value).toBeUndefined(); + + reader.releaseLock(); + }); + + it('should be idempotent for multiple close calls', async () => { + const multi = new MultiInputStream(); + + await multi.close(); + await multi.close(); + + expect(multi.isClosed).toBe(true); + }); + + it('should throw when adding input after close', async () => { + const multi = new MultiInputStream(); + await multi.close(); + + expect(() => multi.addInputStream(streamFrom(['x']))).toThrow('MultiInputStream is closed'); + }); + + // --------------------------------------------------------------------------- + // removeInputStream edge cases + // --------------------------------------------------------------------------- + + it('should no-op when removing a non-existent input', async () => { + const multi = new MultiInputStream(); + + // Should not throw. + await multi.removeInputStream('does-not-exist'); + + await multi.close(); + }); + + it('should release the source reader lock so the source can be reused', async () => { + const multi = new MultiInputStream(); + const reader = multi.stream.getReader(); + + const source = new ReadableStream({ + async start(controller) { + controller.enqueue('chunk-0'); + await delay(30); + controller.enqueue('chunk-1'); + controller.close(); + }, + }); + + const id = multi.addInputStream(source); + + const r1 = await reader.read(); + expect(r1.value).toBe('chunk-0'); + + await multi.removeInputStream(id); + + // The source's reader lock should be released — we can get a new reader. 
+ const sourceReader = source.getReader(); + const sr = await sourceReader.read(); + expect(sr.value).toBe('chunk-1'); + sourceReader.releaseLock(); + + reader.releaseLock(); + await multi.close(); + }); + + // --------------------------------------------------------------------------- + // Input count tracking + // --------------------------------------------------------------------------- + + it('should track inputCount correctly through add / remove / natural end', async () => { + const multi = new MultiInputStream(); + + expect(multi.inputCount).toBe(0); + + const id1 = multi.addInputStream(streamFrom(['a'])); + const id2 = multi.addInputStream(streamFrom(['b'])); + + expect(multi.inputCount).toBe(2); + + await multi.removeInputStream(id1); + expect(multi.inputCount).toBeLessThanOrEqual(1); + + // Let the remaining stream finish. + await delay(20); + expect(multi.inputCount).toBe(0); + + await multi.removeInputStream(id2); // already gone, no-op + expect(multi.inputCount).toBe(0); + + await multi.close(); + }); + + // --------------------------------------------------------------------------- + // Concurrent reads and writes + // --------------------------------------------------------------------------- + + it('should handle concurrent reads and slow writes', async () => { + const multi = new MultiInputStream(); + const reader = multi.stream.getReader(); + + const chunks = ['a', 'b', 'c', 'd', 'e']; + let idx = 0; + + const source = new ReadableStream({ + start(controller) { + const writeNext = () => { + if (idx < chunks.length) { + controller.enqueue(chunks[idx++]); + setTimeout(writeNext, 5); + } else { + controller.close(); + } + }; + writeNext(); + }, + }); + + multi.addInputStream(source); + + const results: string[] = []; + for (let i = 0; i < chunks.length; i++) { + const { value } = await reader.read(); + results.push(value!); + } + + expect(results).toEqual(chunks); + + reader.releaseLock(); + await multi.close(); + }); + + // 
--------------------------------------------------------------------------- + // Backpressure + // --------------------------------------------------------------------------- + + it('should handle backpressure with large data', async () => { + const multi = new MultiInputStream(); + + const largeChunks = Array.from({ length: 1000 }, (_, i) => `chunk-${i}`); + multi.addInputStream(streamFrom(largeChunks)); + + const reader = multi.stream.getReader(); + const results: string[] = []; + + let result = await reader.read(); + while (!result.done) { + results.push(result.value); + // Check if we've collected all expected values before reading again, + // to avoid hanging on the output which stays open after input ends. + if (results.length === largeChunks.length) break; + result = await reader.read(); + } + + expect(results).toEqual(largeChunks); + + reader.releaseLock(); + await multi.close(); + }); + + // --------------------------------------------------------------------------- + // Multiple tee / concurrent consumers + // --------------------------------------------------------------------------- + + it('should support tee on the output stream', async () => { + const multi = new MultiInputStream(); + + const [s1, s2] = multi.stream.tee(); + const r1 = s1.getReader(); + const r2 = s2.getReader(); + + multi.addInputStream(streamFrom([10, 20])); + + const [a1, a2] = await Promise.all([r1.read(), r2.read()]); + expect(a1.value).toBe(10); + expect(a2.value).toBe(10); + + const [b1, b2] = await Promise.all([r1.read(), r2.read()]); + expect(b1.value).toBe(20); + expect(b2.value).toBe(20); + + r1.releaseLock(); + r2.releaseLock(); + await multi.close(); + }); + + // --------------------------------------------------------------------------- + // Return value of addInputStream + // --------------------------------------------------------------------------- + + it('should return unique IDs from addInputStream', () => { + const multi = new MultiInputStream(); + + const id1 = 
multi.addInputStream(streamFrom(['a'])); + const id2 = multi.addInputStream(streamFrom(['b'])); + const id3 = multi.addInputStream(streamFrom(['c'])); + + expect(id1).not.toBe(id2); + expect(id2).not.toBe(id3); + expect(id1).not.toBe(id3); + }); + + // --------------------------------------------------------------------------- + // close() while pumps are actively writing + // --------------------------------------------------------------------------- + + it('should cleanly close while pumps are actively writing', async () => { + const multi = new MultiInputStream(); + const reader = multi.stream.getReader(); + + // A source that never stops on its own. + const infiniteSource = new ReadableStream({ + async start(controller) { + let i = 0; + while (true) { + try { + controller.enqueue(`tick-${i++}`); + } catch { + // controller.enqueue throws after stream is canceled + break; + } + await delay(5); + } + }, + }); + + multi.addInputStream(infiniteSource); + + // Read a couple of values. + const r1 = await reader.read(); + expect(r1.done).toBe(false); + + // Close while the infinite source is still pumping. + await multi.close(); + + const r2 = await reader.read(); + expect(r2.done).toBe(true); + + reader.releaseLock(); + }); +}); diff --git a/agents/src/stream/multi_input_stream.ts b/agents/src/stream/multi_input_stream.ts new file mode 100644 index 000000000..99f03dfc4 --- /dev/null +++ b/agents/src/stream/multi_input_stream.ts @@ -0,0 +1,172 @@ +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import type { + ReadableStream, + ReadableStreamDefaultReader, + WritableStreamDefaultWriter, +} from 'node:stream/web'; +import { log } from '../log.js'; +import { isStreamReaderReleaseError } from './deferred_stream.js'; +import { IdentityTransform } from './identity_transform.js'; + +/** + * A fan-in multiplexer that merges multiple {@link ReadableStream} inputs into + * a single output {@link ReadableStream}. 
/**
 * A fan-in multiplexer that merges multiple {@link ReadableStream} inputs into
 * a single output {@link ReadableStream}. Inputs can be dynamically added and
 * removed at any time while the stream is open.
 *
 * Unlike {@link DeferredReadableStream} which supports a single readable source,
 * `MultiInputStream` allows N concurrent input streams to pump data into one output.
 *
 * Key behaviors:
 * - An error in one input removes that input but does **not** kill the output.
 * - When all inputs end or are removed, the output stays open (waiting for new inputs).
 * - The output only closes when {@link close} is called explicitly.
 * - {@link removeInputStream} releases the reader lock so the source can be reused.
 * - A chunk that a pump has already dequeued at the moment its input is removed
 *   (or the stream is closed) is dropped rather than forwarded to the output.
 *
 * @typeParam T - Type of the chunks carried by every input and by the output.
 */
export class MultiInputStream<T> {
  /** Pass-through transform; its writable side is fed by pumps, its readable side is the output. */
  private readonly transform: IdentityTransform<T>;
  /** Single writer shared by every pump loop. */
  private readonly writer: WritableStreamDefaultWriter<T>;
  /** Active inputs keyed by the ID handed out from {@link addInputStream}. */
  private readonly inputs: Map<string, ReadableStreamDefaultReader<T>> = new Map();
  /** Completion promise of each running pump loop, keyed by input ID. */
  private readonly pumpPromises: Map<string, Promise<void>> = new Map();
  /** Monotonic counter used to mint unique input IDs. */
  private nextId = 0;
  private _closed = false;
  private readonly logger = log();

  constructor() {
    this.transform = new IdentityTransform<T>();
    this.writer = this.transform.writable.getWriter();
  }

  /** The single output stream that consumers read from. */
  get stream(): ReadableStream<T> {
    return this.transform.readable;
  }

  /** Number of currently active input streams. */
  get inputCount(): number {
    return this.inputs.size;
  }

  /** Whether {@link close} has been called. */
  get isClosed(): boolean {
    return this._closed;
  }

  /**
   * Add an input {@link ReadableStream} that will be pumped into the output.
   *
   * The source is locked (via `getReader()`) until the input ends, errors, is
   * removed, or the multiplexer is closed.
   *
   * @param source - Stream whose chunks should be forwarded to the output.
   * @returns A unique identifier that can be passed to {@link removeInputStream}.
   * @throws If the stream has already been closed.
   */
  addInputStream(source: ReadableStream<T>): string {
    if (this._closed) {
      throw new Error('MultiInputStream is closed');
    }

    const id = `input-${this.nextId++}`;
    const reader = source.getReader();
    // Register the reader before starting the pump: the pump's first check
    // consults `inputs` to decide whether it is still wanted.
    this.inputs.set(id, reader);

    const pumpDone = this.pumpInput(id, reader);
    this.pumpPromises.set(id, pumpDone);

    return id;
  }

  /**
   * Detach an input stream by its ID and release the reader lock so the
   * source stream can be reused elsewhere.
   *
   * No-op if the ID does not exist (e.g. the input already ended or was removed).
   *
   * @param id - Identifier previously returned by {@link addInputStream}.
   * @returns A promise that resolves once the input's pump loop has finished.
   */
  async removeInputStream(id: string): Promise<void> {
    const reader = this.inputs.get(id);
    if (!reader) return;

    // Delete first so the pump's finally-block is a harmless no-op.
    this.inputs.delete(id);

    // Releasing the lock causes any pending reader.read() inside pump to throw
    // a TypeError, which is caught by isStreamReaderReleaseError.
    reader.releaseLock();

    // Wait for the pump to finish so the caller knows cleanup is complete.
    const pump = this.pumpPromises.get(id);
    if (pump) {
      await pump;
      this.pumpPromises.delete(id);
    }
  }

  /**
   * Close the output stream and detach all inputs.
   *
   * Idempotent — calling more than once is a no-op.
   *
   * @returns A promise that resolves after every pump loop has stopped and the
   *   writable side of the transform has been closed.
   */
  async close(): Promise<void> {
    if (this._closed) return;
    this._closed = true;

    // Release every input reader to unblock pending reads inside pumps.
    for (const reader of this.inputs.values()) {
      reader.releaseLock();
    }
    this.inputs.clear();

    // Wait for every pump loop to finish before touching the writer.
    await Promise.allSettled([...this.pumpPromises.values()]);
    this.pumpPromises.clear();

    // Close the output writer + writable side of the transform.
    try {
      this.writer.releaseLock();
    } catch {
      // ignore if already released
    }

    try {
      await this.transform.writable.close();
    } catch {
      // ignore if already closed
    }
  }

  /** True once the multiplexer is closed or the given input is no longer registered. */
  private shouldStopPumping(id: string): boolean {
    return this._closed || !this.inputs.has(id);
  }

  /**
   * Forward chunks from one input reader to the shared writer until the input
   * ends, errors, is removed, or the multiplexer is closed.
   *
   * Never rejects: reader-release errors are swallowed, any other error is
   * logged. On exit the reader lock is released and the input is unregistered.
   *
   * @param id - ID under which the reader is registered in `inputs`.
   * @param reader - Reader obtained from the input source.
   */
  private async pumpInput(id: string, reader: ReadableStreamDefaultReader<T>): Promise<void> {
    try {
      while (true) {
        // If the stream was closed or the input was removed while we were
        // awaiting the previous write, bail out immediately.
        if (this.shouldStopPumping(id)) break;

        const { done, value } = await reader.read();
        if (done) break;

        // Double-check after the (potentially long) read. The chunk just read
        // is intentionally not written in that case.
        if (this.shouldStopPumping(id)) break;

        await this.writer.write(value);
      }
    } catch (e) {
      // TypeErrors from releaseLock() during removeInputStream / close are expected.
      if (isStreamReaderReleaseError(e)) {
        return;
      }

      this.logger.error({ error: e }, 'Error pumping input stream from MultiInputStream');
    } finally {
      try {
        reader.releaseLock();
      } catch {
        // ignore if already released
      }

      this.inputs.delete(id);
      this.pumpPromises.delete(id);
    }
  }
}
DeferredReadableStream(); + protected multiStream: MultiInputStream = new MultiInputStream(); get stream(): ReadableStream { - return this.deferredStream.stream; + return this.multiStream.stream; + } + + async close(): Promise { + await this.multiStream.close(); } onAttached(): void {} diff --git a/agents/src/voice/recorder_io/recorder_io.ts b/agents/src/voice/recorder_io/recorder_io.ts index 8f478ff55..72b363e90 100644 --- a/agents/src/voice/recorder_io/recorder_io.ts +++ b/agents/src/voice/recorder_io/recorder_io.ts @@ -105,6 +105,7 @@ export class RecorderIO { await this.outChan.close(); await this.closeFuture.await; await cancelAndWait([this.forwardTask!, this.encodeTask!]); + await this.inRecord?.close(); this.started = false; } finally { @@ -378,7 +379,7 @@ class RecorderAudioInput extends AudioInput { this.source = source; // Set up the intercepting stream - this.deferredStream.setSource(this.createInterceptingStream()); + this.multiStream.addInputStream(this.createInterceptingStream()); } /** diff --git a/agents/src/voice/room_io/_input.ts b/agents/src/voice/room_io/_input.ts index 28c353fe7..6ede89e2f 100644 --- a/agents/src/voice/room_io/_input.ts +++ b/agents/src/voice/room_io/_input.ts @@ -1,9 +1,10 @@ // SPDX-FileCopyrightText: 2025 LiveKit, Inc. 
// // SPDX-License-Identifier: Apache-2.0 -import { type AudioFrame, FrameProcessor } from '@livekit/rtc-node'; import { + type AudioFrame, AudioStream, + FrameProcessor, type NoiseCancellationOptions, RemoteParticipant, type RemoteTrack, @@ -25,7 +26,9 @@ export class ParticipantAudioInputStream extends AudioInput { private frameProcessor?: FrameProcessor; private publication: RemoteTrackPublication | null = null; private participantIdentity: string | null = null; + private currentInputId: string | null = null; private logger = log(); + constructor({ room, sampleRate, @@ -121,8 +124,9 @@ export class ParticipantAudioInputStream extends AudioInput { }; private closeStream() { - if (this.deferredStream.isSourceSet) { - this.deferredStream.detachSource(); + if (this.currentInputId) { + void this.multiStream.removeInputStream(this.currentInputId); + this.currentInputId = null; } this.publication = null; @@ -143,7 +147,7 @@ export class ParticipantAudioInputStream extends AudioInput { } this.closeStream(); this.publication = publication; - this.deferredStream.setSource( + this.currentInputId = this.multiStream.addInputStream( resampleStream({ stream: this.createStream(track), outputRate: this.sampleRate, @@ -179,14 +183,14 @@ export class ParticipantAudioInputStream extends AudioInput { }) as unknown as ReadableStream; } - async close() { + override async close() { this.room.off(RoomEvent.TrackSubscribed, this.onTrackSubscribed); this.room.off(RoomEvent.TrackUnpublished, this.onTrackUnpublished); this.room.off(RoomEvent.TokenRefreshed, this.onTokenRefreshed); this.closeStream(); + await super.close(); + this.frameProcessor?.close(); this.frameProcessor = undefined; - // Ignore errors - stream may be locked by RecorderIO or already cancelled - await this.deferredStream.stream.cancel().catch(() => {}); } }