From 9767294e668856fa7a0202015390540db882fc72 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Fri, 30 May 2025 11:13:01 +0200 Subject: [PATCH 1/9] wip injectable stream --- agents/package.json | 1 + agents/src/stream/identity_transform.ts | 1 - agents/src/stream/injectable_stream.ts | 36 +++++++++++++++++++++++++ package.json | 3 ++- 4 files changed, 39 insertions(+), 2 deletions(-) create mode 100644 agents/src/stream/injectable_stream.ts diff --git a/agents/package.json b/agents/package.json index bec93945d..8268beace 100644 --- a/agents/package.json +++ b/agents/package.json @@ -41,6 +41,7 @@ "@livekit/mutex": "^1.1.1", "@livekit/protocol": "^1.29.1", "@livekit/typed-emitter": "^3.0.0", + "@std/streams": "jsr:^1.0.9", "commander": "^12.0.0", "livekit-server-sdk": "^2.9.2", "pino": "^8.19.0", diff --git a/agents/src/stream/identity_transform.ts b/agents/src/stream/identity_transform.ts index cb83f091f..c4f0abb2b 100644 --- a/agents/src/stream/identity_transform.ts +++ b/agents/src/stream/identity_transform.ts @@ -1,7 +1,6 @@ // SPDX-FileCopyrightText: 2025 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 -import { TransformStream } from 'node:stream/web'; export class IdentityTransform extends TransformStream { constructor() { diff --git a/agents/src/stream/injectable_stream.ts b/agents/src/stream/injectable_stream.ts new file mode 100644 index 000000000..1d5f1cf51 --- /dev/null +++ b/agents/src/stream/injectable_stream.ts @@ -0,0 +1,36 @@ +import { mergeReadableStreams } from '@std/streams'; +import { IdentityTransform } from './identity_transform.js'; + +export class InjectableStream { + private source: ReadableStream; + private identityStream: IdentityTransform; + private mergedStream: ReadableStream; + + constructor(source: ReadableStream) { + this.source = source; + this.identityStream = new IdentityTransform(); + this.mergedStream = mergeReadableStreams(this.source, this.identityStream.readable); + } + + async inject(value: T) { + // note this will still fail for parallel writes + // we can acquire the writer in the constructor but this will lead to the problem with multiple sync loops blocking when trying to write + this.identityStream.writable.getWriter().write(value); + } + + async close() { + // this will not cancel the source stream but instead keep the readable open until the source finishes + await this.identityStream.writable.close(); + } + + async cancel(reason?: any) { + await Promise.all([ + this.mergedStream.cancel(reason), + this.identityStream.writable.abort(reason), + ]); + } + + get readable() { + return this.mergedStream; + } +} diff --git a/package.json b/package.json index 839271015..94f85b0c1 100644 --- a/package.json +++ b/package.json @@ -21,6 +21,7 @@ "examples:minimal": "pnpm exec tsx examples/src/multimodal_agent.ts" }, "devDependencies": { + "@types/node": "^22.13.10", "@changesets/cli": "^2.27.1", "@livekit/changesets-changelog-github": "^0.0.4", "@rushstack/heft": "^0.66.0", @@ -44,5 +45,5 @@ "typescript": "^5.4.5", "vitest": "^1.6.0" }, - "packageManager": "pnpm@9.7.0" + "packageManager": "pnpm@10.11.0" } From 12127f1450f64d73b88ee60be2f2430991a9016f Mon Sep 17 00:00:00 2001 From: lukasIO Date: Fri, 30 May 2025 13:04:49 +0200 Subject: [PATCH 2/9] close writer --- agents/src/stream/injectable_stream.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/agents/src/stream/injectable_stream.ts b/agents/src/stream/injectable_stream.ts index 1d5f1cf51..3768a7535 100644 --- a/agents/src/stream/injectable_stream.ts +++ b/agents/src/stream/injectable_stream.ts @@ -15,7 +15,9 @@ export class InjectableStream { async inject(value: T) { // note this will still fail for parallel writes // we can acquire the writer in the constructor but this will lead to the problem with multiple sync loops blocking when trying to write - this.identityStream.writable.getWriter().write(value); + const writer = this.identityStream.writable.getWriter(); + await writer.write(value); + await writer.close(); } async close() { From 708b9ae04d93dc64b3694d580c6e56ad5bd1c62c Mon Sep 17 00:00:00 2001 From: lukasIO Date: Fri, 30 May 2025 13:06:43 +0200 Subject: [PATCH 3/9] mutex to avoid parallel writes --- agents/src/stream/injectable_stream.ts | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/agents/src/stream/injectable_stream.ts b/agents/src/stream/injectable_stream.ts index 3768a7535..0ce751d13 100644 --- a/agents/src/stream/injectable_stream.ts +++ b/agents/src/stream/injectable_stream.ts @@ -1,3 +1,4 @@ +import { Mutex } from '@livekit/mutex'; import { mergeReadableStreams } from '@std/streams'; import { IdentityTransform } from './identity_transform.js'; @@ -5,6 +6,7 @@ export class InjectableStream { private source: ReadableStream; private identityStream: IdentityTransform; private mergedStream: ReadableStream; + private injectMutex = new Mutex(); constructor(source: ReadableStream) { this.source = source; @@ -13,16 +15,24 @@ export class InjectableStream { } async inject(value: T) { - // note this will still fail for parallel writes - // we can acquire the writer in the constructor but this will lead to the problem with multiple sync loops blocking when trying to write - const writer = this.identityStream.writable.getWriter(); - await writer.write(value); - await writer.close(); + const unlock = await this.injectMutex.lock(); + try { + const writer = this.identityStream.writable.getWriter(); + await writer.write(value); + await writer.close(); + } finally { + unlock(); + } } async close() { - // this will not cancel the source stream but instead keep the readable open until the source finishes - await this.identityStream.writable.close(); + const unlock = await this.injectMutex.lock(); + try { + // this will not cancel the source stream but instead keep the readable open until the source finishes + await this.identityStream.writable.close(); + } finally { + unlock(); + } } async cancel(reason?: any) { From 7a523c5115d789aee5de09518aa8bafa086cdb50 Mon Sep 17 00:00:00 2001 From: Toubat Date: Wed, 4 Jun 2025 00:21:42 -0700 Subject: [PATCH 4/9] remove explicit ReadableStream import --- agents/src/stream/deferred_stream.ts | 1 - agents/src/stt/stt.ts | 1 - agents/src/vad.ts | 6 +- agents/src/voice/agent.ts | 3 +- agents/src/voice/agent_activity.ts | 1 - agents/src/voice/agent_session.ts | 1 - agents/src/voice/audio_recognition.ts | 1 - agents/src/voice/io.ts | 1 - agents/src/voice/room_io.ts | 1 - pnpm-lock.yaml | 248 ++++++++++++++++++++++---- 10 files changed, 211 insertions(+), 53 deletions(-) diff --git a/agents/src/stream/deferred_stream.ts b/agents/src/stream/deferred_stream.ts index 305f5efec..defcac149 100644 --- a/agents/src/stream/deferred_stream.ts +++ b/agents/src/stream/deferred_stream.ts @@ -1,7 +1,6 @@ // SPDX-FileCopyrightText: 2025 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 -import { type ReadableStream } from 'node:stream/web'; import { IdentityTransform } from './identity_transform.js'; export class DeferredReadableStream { diff --git a/agents/src/stt/stt.ts b/agents/src/stt/stt.ts index ed8a5a47b..e1723ab46 100644 --- a/agents/src/stt/stt.ts +++ b/agents/src/stt/stt.ts @@ -4,7 +4,6 @@ import type { AudioFrame } from '@livekit/rtc-node'; import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter'; import { EventEmitter } from 'node:events'; -import type { ReadableStream } from 'node:stream/web'; import { log } from '../log.js'; import type { STTMetrics } from '../metrics/base.js'; import { DeferredReadableStream } from '../stream/deferred_stream.js'; diff --git a/agents/src/vad.ts b/agents/src/vad.ts index 2e135df3a..a91976826 100644 --- a/agents/src/vad.ts +++ b/agents/src/vad.ts @@ -4,11 +4,7 @@ import type { AudioFrame } from '@livekit/rtc-node'; import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter'; import { EventEmitter } from 'node:events'; -import type { - ReadableStream, - ReadableStreamDefaultReader, - WritableStreamDefaultWriter, -} from 'node:stream/web'; + import { log } from './log.js'; import type { VADMetrics } from './metrics/base.js'; import { DeferredReadableStream } from './stream/deferred_stream.js'; diff --git a/agents/src/voice/agent.ts b/agents/src/voice/agent.ts index d29d8b35b..2e405b0d2 100644 --- a/agents/src/voice/agent.ts +++ b/agents/src/voice/agent.ts @@ -6,11 +6,10 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ import type { AudioFrame } from '@livekit/rtc-node'; -import { ReadableStream } from 'node:stream/web'; import type { ChatChunk, ChatMessage, LLM } from '../llm/index.js'; import { ChatContext } from '../llm/index.js'; -import { StreamAdapter as STTStreamAdapter } from '../stt/index.js'; import type { STT, SpeechEvent } from '../stt/index.js'; +import { StreamAdapter as STTStreamAdapter } from '../stt/index.js'; import type { TTS } from '../tts/index.js'; import type { VAD } from '../vad.js'; import type { AgentActivity } from './agent_activity.js'; diff --git a/agents/src/voice/agent_activity.ts b/agents/src/voice/agent_activity.ts index 89d913ea8..1c411e0f8 100644 --- a/agents/src/voice/agent_activity.ts +++ b/agents/src/voice/agent_activity.ts @@ -2,7 +2,6 @@ // // SPDX-License-Identifier: Apache-2.0 import type { AudioFrame } from '@livekit/rtc-node'; -import type { ReadableStream } from 'node:stream/web'; import { log } from '../log.js'; import type { STT, SpeechEvent } from '../stt/stt.js'; import type { VADEvent } from '../vad.js'; diff --git a/agents/src/voice/agent_session.ts b/agents/src/voice/agent_session.ts index b3dc43e7e..fb8ebcb6f 100644 --- a/agents/src/voice/agent_session.ts +++ b/agents/src/voice/agent_session.ts @@ -2,7 +2,6 @@ // // SPDX-License-Identifier: Apache-2.0 import type { AudioFrame, AudioSource, Room } from '@livekit/rtc-node'; -import type { ReadableStream } from 'node:stream/web'; import { log } from '../log.js'; import type { AgentState } from '../pipeline/index.js'; import type { STT } from '../stt/index.js'; diff --git a/agents/src/voice/audio_recognition.ts b/agents/src/voice/audio_recognition.ts index 88699e6bf..a99c77d49 100644 --- a/agents/src/voice/audio_recognition.ts +++ b/agents/src/voice/audio_recognition.ts @@ -2,7 +2,6 @@ // // SPDX-License-Identifier: Apache-2.0 import type { AudioFrame } from '@livekit/rtc-node'; -import { ReadableStream } from 'node:stream/web'; import { log } from '../log.js'; import { DeferredReadableStream } from '../stream/deferred_stream.js'; import { type SpeechEvent, SpeechEventType } from '../stt/stt.js'; diff --git a/agents/src/voice/io.ts b/agents/src/voice/io.ts index 0a79bf0ae..ca2b8c47e 100644 --- a/agents/src/voice/io.ts +++ b/agents/src/voice/io.ts @@ -2,7 +2,6 @@ // // SPDX-License-Identifier: Apache-2.0 import type { AudioFrame } from '@livekit/rtc-node'; -import type { ReadableStream } from 'node:stream/web'; import type { SpeechEvent } from '../stt/stt.js'; export type STTNode = ( diff --git a/agents/src/voice/room_io.ts b/agents/src/voice/room_io.ts index 5d60bbe0c..3b7baa5a3 100644 --- a/agents/src/voice/room_io.ts +++ b/agents/src/voice/room_io.ts @@ -13,7 +13,6 @@ import { TrackPublishOptions, TrackSource, } from '@livekit/rtc-node'; -import type { ReadableStream } from 'node:stream/web'; import { log } from '../log.js'; import { DeferredReadableStream } from '../stream/deferred_stream.js'; import type { AgentSession } from './agent_session.js'; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 881fedc0a..abd7451cb 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -16,10 +16,13 @@ importers: version: 0.0.4 '@rushstack/heft': specifier: ^0.66.0 - version: 0.66.9(@types/node@22.5.5) + version: 0.66.9(@types/node@22.15.29) '@trivago/prettier-plugin-sort-imports': specifier: ^4.3.0 version: 4.3.0(prettier@3.2.5) + '@types/node': + specifier: ^22.13.10 + version: 22.15.29 '@typescript-eslint/eslint-plugin': specifier: ^6.21.0 version: 6.21.0(@typescript-eslint/parser@6.21.0(eslint@8.57.0)(typescript@5.4.5))(eslint@8.57.0)(typescript@5.4.5) @@ -37,7 +40,7 @@ importers: version: 8.10.0(eslint@8.57.0) eslint-config-standard: specifier: ^17.1.0 - version: 17.1.0(eslint-plugin-import@2.29.1(@typescript-eslint/parser@6.21.0(eslint@8.57.0)(typescript@5.4.5))(eslint@8.57.0))(eslint-plugin-n@16.6.2(eslint@8.57.0))(eslint-plugin-promise@6.1.1(eslint@8.57.0))(eslint@8.57.0) + version: 17.1.0(eslint-plugin-import@2.29.1)(eslint-plugin-n@16.6.2(eslint@8.57.0))(eslint-plugin-promise@6.1.1(eslint@8.57.0))(eslint@8.57.0) eslint-config-turbo: specifier: ^1.12.2 version: 1.13.3(eslint@8.57.0) @@ -73,7 +76,7 @@ importers: version: 5.4.5 vitest: specifier: ^1.6.0 - version: 1.6.0(@types/node@22.5.5) + version: 1.6.0(@types/node@22.15.29) agents: dependencies: @@ -86,6 +89,9 @@ importers: '@livekit/typed-emitter': specifier: ^3.0.0 version: 3.0.0 + '@std/streams': + specifier: jsr:^1.0.9 + version: '@jsr/std__streams@1.0.9' commander: specifier: ^12.0.0 version: 12.0.0 @@ -184,13 +190,13 @@ importers: version: 0.13.13 '@microsoft/api-extractor': specifier: ^7.35.0 - version: 7.43.7(@types/node@22.5.5) + version: 7.43.7(@types/node@22.15.29) '@types/ws': specifier: ^8.5.10 version: 8.5.10 tsup: specifier: ^8.3.5 - version: 8.3.5(@microsoft/api-extractor@7.43.7(@types/node@22.5.5))(postcss@8.4.38)(tsx@4.19.2)(typescript@5.4.5) + version: 8.3.5(@microsoft/api-extractor@7.43.7(@types/node@22.15.29))(postcss@8.4.38)(tsx@4.19.2)(typescript@5.4.5) typescript: specifier: ^5.0.0 version: 5.4.5 @@ -215,13 +221,13 @@ importers: version: 0.13.13 '@microsoft/api-extractor': specifier: ^7.35.0 - version: 7.43.7(@types/node@22.5.5) + version: 7.43.7(@types/node@22.15.29) '@types/ws': specifier: ^8.5.10 version: 8.5.10 tsup: specifier: ^8.3.5 - version: 8.3.5(@microsoft/api-extractor@7.43.7(@types/node@22.5.5))(postcss@8.4.38)(tsx@4.19.2)(typescript@5.4.5) + version: 8.3.5(@microsoft/api-extractor@7.43.7(@types/node@22.15.29))(postcss@8.4.38)(tsx@4.19.2)(typescript@5.4.5) typescript: specifier: ^5.0.0 version: 5.4.5 @@ -246,13 +252,13 @@ importers: version: 0.13.13 '@microsoft/api-extractor': specifier: ^7.35.0 - version: 7.43.7(@types/node@22.5.5) + version: 7.43.7(@types/node@22.15.29) '@types/ws': specifier: ^8.5.10 version: 8.5.10 tsup: specifier: ^8.3.5 - version: 8.3.5(@microsoft/api-extractor@7.43.7(@types/node@22.5.5))(postcss@8.4.38)(tsx@4.19.2)(typescript@5.4.5) + version: 8.3.5(@microsoft/api-extractor@7.43.7(@types/node@22.15.29))(postcss@8.4.38)(tsx@4.19.2)(typescript@5.4.5) typescript: specifier: ^5.0.0 version: 5.4.5 @@ -271,13 +277,13 @@ importers: version: link:../../agents '@microsoft/api-extractor': specifier: ^7.35.0 - version: 7.43.7(@types/node@22.5.5) + version: 7.43.7(@types/node@22.15.29) onnxruntime-common: specifier: ^1.19.2 version: 1.19.2 tsup: specifier: ^8.3.5 - version: 8.3.5(@microsoft/api-extractor@7.43.7(@types/node@22.5.5))(postcss@8.4.38)(tsx@4.19.2)(typescript@5.4.5) + version: 8.3.5(@microsoft/api-extractor@7.43.7(@types/node@22.15.29))(postcss@8.4.38)(tsx@4.19.2)(typescript@5.4.5) typescript: specifier: ^5.0.0 version: 5.4.5 @@ -302,13 +308,13 @@ importers: version: 0.13.13 '@microsoft/api-extractor': specifier: ^7.35.0 - version: 7.43.7(@types/node@22.5.5) + version: 7.43.7(@types/node@22.15.29) '@types/ws': specifier: ^8.5.10 version: 8.5.10 tsup: specifier: ^8.3.5 - version: 8.3.5(@microsoft/api-extractor@7.43.7(@types/node@22.5.5))(postcss@8.4.38)(tsx@4.19.2)(typescript@5.4.5) + version: 8.3.5(@microsoft/api-extractor@7.43.7(@types/node@22.15.29))(postcss@8.4.38)(tsx@4.19.2)(typescript@5.4.5) typescript: specifier: ^5.0.0 version: 5.4.5 @@ -339,13 +345,13 @@ importers: version: 0.13.13 '@microsoft/api-extractor': specifier: ^7.35.0 - version: 7.43.7(@types/node@22.5.5) + version: 7.43.7(@types/node@22.15.29) '@types/ws': specifier: ^8.5.10 version: 8.5.10 tsup: specifier: ^8.3.5 - version: 8.3.5(@microsoft/api-extractor@7.43.7(@types/node@22.5.5))(postcss@8.4.38)(tsx@4.19.2)(typescript@5.4.5) + version: 8.3.5(@microsoft/api-extractor@7.43.7(@types/node@22.15.29))(postcss@8.4.38)(tsx@4.19.2)(typescript@5.4.5) typescript: specifier: ^5.0.0 version: 5.4.5 @@ -370,13 +376,13 @@ importers: version: 0.13.13 '@microsoft/api-extractor': specifier: ^7.35.0 - version: 7.43.7(@types/node@22.5.5) + version: 7.43.7(@types/node@22.15.29) '@types/ws': specifier: ^8.5.10 version: 8.5.10 tsup: specifier: ^8.3.5 - version: 8.3.5(@microsoft/api-extractor@7.43.7(@types/node@22.5.5))(postcss@8.4.38)(tsx@4.19.2)(typescript@5.4.5) + version: 8.3.5(@microsoft/api-extractor@7.43.7(@types/node@22.15.29))(postcss@8.4.38)(tsx@4.19.2)(typescript@5.4.5) typescript: specifier: ^5.0.0 version: 5.4.5 @@ -398,7 +404,7 @@ importers: version: 0.13.13 '@microsoft/api-extractor': specifier: ^7.35.0 - version: 7.43.7(@types/node@22.5.5) + version: 7.43.7(@types/node@22.15.29) '@types/ws': specifier: ^8.5.10 version: 8.5.10 @@ -407,7 +413,7 @@ importers: version: 1.19.2 tsup: specifier: ^8.3.5 - version: 8.3.5(@microsoft/api-extractor@7.43.7(@types/node@22.5.5))(postcss@8.4.38)(tsx@4.19.2)(typescript@5.4.5) + version: 8.3.5(@microsoft/api-extractor@7.43.7(@types/node@22.15.29))(postcss@8.4.38)(tsx@4.19.2)(typescript@5.4.5) typescript: specifier: ^5.0.0 version: 5.4.5 @@ -1320,6 +1326,12 @@ packages: '@jridgewell/trace-mapping@0.3.25': resolution: {integrity: sha512-vNk6aEwybGtawWmy/PzwnGDOjCkLWSD2wqvjGGAgOAwCGWySYXfYoxt00IJkTF+8Lb57DwOb3Aa0o9CApepiYQ==} + '@jsr/std__bytes@1.0.6': + resolution: {integrity: sha512-St6yKggjFGhxS52IFLJWvkchRFbAKg2Xh8UxA4S1EGz7GJ2Ui+ssDDldj/w2c8vCxvl6qgR0HaYbKeFJNqujmA==, tarball: https://npm.jsr.io/~/11/@jsr/std__bytes/1.0.6.tgz} + + '@jsr/std__streams@1.0.9': + resolution: {integrity: sha512-nPkEijnOPwLQt6ZtvOajE6F2hCmRILaq0kR6I0JFVvxSsvhdeyFMGc8F4DqypCpfq/U+8umLcE0RNMcYdx4NlQ==, tarball: https://npm.jsr.io/~/11/@jsr/std__streams/1.0.9.tgz} + '@livekit/changesets-changelog-github@0.0.4': resolution: {integrity: sha512-MXaiLYwgkYciZb8G2wkVtZ1pJJzZmVx5cM30Q+ClslrIYyAqQhRbPmZDM79/5CGxb1MTemR/tfOM25tgJgAK0g==} @@ -1792,6 +1804,9 @@ packages: '@types/node@18.19.64': resolution: {integrity: sha512-955mDqvO2vFf/oL7V3WiUtiz+BugyX8uVbaT2H8oj3+8dRyH2FLiNdowe7eNqRM7IOIZvzDH76EoAT+gwm6aIQ==} + '@types/node@22.15.29': + resolution: {integrity: sha512-LNdjOkUDlU1RZb8e1kOIUpN1qQUlzGkEtbVNo53vbrwDg5om6oduhm4SiUaPW5ASTXhAiP0jInWG8Qx9fVlOeQ==} + '@types/node@22.5.5': resolution: {integrity: sha512-Xjs4y5UPO/CLdzpgR6GirZJx36yScjh73+2NlLlkFRSoQN8B0DpfXPdZGnvVmLRLOsqDpOfTNv7D9trgGhmOIA==} @@ -4258,6 +4273,9 @@ packages: undici-types@6.19.8: resolution: {integrity: sha512-ve2KP6f/JnbPBFyobGHuerC9g1FYGn/F8n1LWTwNxCEzd6IfqTwUQcNXgEtmmQ6DlRrC1hrSrBnCZPokRrDHjw==} + undici-types@6.21.0: + resolution: {integrity: sha512-iwDZqg0QAGrg9Rav5H4n0M64c3mkR59cJ6wQp+7C4nI0gsmExaedaYLNO44eT4AtBBwjbTiGPMlt2Md0T9H9JQ==} + universalify@0.1.2: resolution: {integrity: sha512-rBJeI5CXAlmy1pV+617WB9J63U6XcazHHF2f2dbJix4XzpUF0RS3Zbj0FGIOCAva5P/d/GBOYaACQ1w+0azUkg==} engines: {node: '>= 4.0.0'} @@ -5166,6 +5184,12 @@ snapshots: '@jridgewell/resolve-uri': 3.1.2 '@jridgewell/sourcemap-codec': 1.4.15 + '@jsr/std__bytes@1.0.6': {} + + '@jsr/std__streams@1.0.9': + dependencies: + '@jsr/std__bytes': 1.0.6 + '@livekit/changesets-changelog-github@0.0.4': dependencies: '@changesets/get-github-info': 0.5.2 @@ -5227,6 +5251,14 @@ snapshots: globby: 11.1.0 read-yaml-file: 1.1.0 + '@microsoft/api-extractor-model@7.28.17(@types/node@22.15.29)': + dependencies: + '@microsoft/tsdoc': 0.14.2 + '@microsoft/tsdoc-config': 0.16.2 + '@rushstack/node-core-library': 4.3.0(@types/node@22.15.29) + transitivePeerDependencies: + - '@types/node' + '@microsoft/api-extractor-model@7.28.17(@types/node@22.5.5)': dependencies: '@microsoft/tsdoc': 0.14.2 @@ -5235,6 +5267,24 @@ snapshots: transitivePeerDependencies: - '@types/node' + '@microsoft/api-extractor@7.43.7(@types/node@22.15.29)': + dependencies: + '@microsoft/api-extractor-model': 7.28.17(@types/node@22.15.29) + '@microsoft/tsdoc': 0.14.2 + '@microsoft/tsdoc-config': 0.16.2 + '@rushstack/node-core-library': 4.3.0(@types/node@22.15.29) + '@rushstack/rig-package': 0.5.2 + '@rushstack/terminal': 0.11.0(@types/node@22.15.29) + '@rushstack/ts-command-line': 4.21.0(@types/node@22.15.29) + lodash: 4.17.21 + minimatch: 3.0.8 + resolve: 1.22.8 + semver: 7.5.4 + source-map: 0.6.1 + typescript: 5.4.2 + transitivePeerDependencies: + - '@types/node' + '@microsoft/api-extractor@7.43.7(@types/node@22.5.5)': dependencies: '@microsoft/api-extractor-model': 7.28.17(@types/node@22.5.5) @@ -5470,23 +5520,23 @@ snapshots: '@rushstack/eslint-patch@1.10.3': {} - '@rushstack/heft-config-file@0.14.19(@types/node@22.5.5)': + '@rushstack/heft-config-file@0.14.19(@types/node@22.15.29)': dependencies: - '@rushstack/node-core-library': 4.3.0(@types/node@22.5.5) + '@rushstack/node-core-library': 4.3.0(@types/node@22.15.29) '@rushstack/rig-package': 0.5.2 - '@rushstack/terminal': 0.11.0(@types/node@22.5.5) + '@rushstack/terminal': 0.11.0(@types/node@22.15.29) jsonpath-plus: 4.0.0 transitivePeerDependencies: - '@types/node' - '@rushstack/heft@0.66.9(@types/node@22.5.5)': + '@rushstack/heft@0.66.9(@types/node@22.15.29)': dependencies: - '@rushstack/heft-config-file': 0.14.19(@types/node@22.5.5) - '@rushstack/node-core-library': 4.3.0(@types/node@22.5.5) - '@rushstack/operation-graph': 0.2.19(@types/node@22.5.5) + '@rushstack/heft-config-file': 0.14.19(@types/node@22.15.29) + '@rushstack/node-core-library': 4.3.0(@types/node@22.15.29) + '@rushstack/operation-graph': 0.2.19(@types/node@22.15.29) '@rushstack/rig-package': 0.5.2 - '@rushstack/terminal': 0.11.0(@types/node@22.5.5) - '@rushstack/ts-command-line': 4.21.0(@types/node@22.5.5) + '@rushstack/terminal': 0.11.0(@types/node@22.15.29) + '@rushstack/ts-command-line': 4.21.0(@types/node@22.15.29) '@types/tapable': 1.0.6 fast-glob: 3.3.2 git-repo-info: 2.1.1 @@ -5497,6 +5547,17 @@ snapshots: transitivePeerDependencies: - '@types/node' + '@rushstack/node-core-library@4.3.0(@types/node@22.15.29)': + dependencies: + fs-extra: 7.0.1 + import-lazy: 4.0.0 + jju: 1.4.0 + resolve: 1.22.8 + semver: 7.5.4 + z-schema: 5.0.5 + optionalDependencies: + '@types/node': 22.15.29 + '@rushstack/node-core-library@4.3.0(@types/node@22.5.5)': dependencies: fs-extra: 7.0.1 @@ -5508,18 +5569,25 @@ snapshots: optionalDependencies: '@types/node': 22.5.5 - '@rushstack/operation-graph@0.2.19(@types/node@22.5.5)': + '@rushstack/operation-graph@0.2.19(@types/node@22.15.29)': dependencies: - '@rushstack/node-core-library': 4.3.0(@types/node@22.5.5) - '@rushstack/terminal': 0.11.0(@types/node@22.5.5) + '@rushstack/node-core-library': 4.3.0(@types/node@22.15.29) + '@rushstack/terminal': 0.11.0(@types/node@22.15.29) optionalDependencies: - '@types/node': 22.5.5 + '@types/node': 22.15.29 '@rushstack/rig-package@0.5.2': dependencies: resolve: 1.22.8 strip-json-comments: 3.1.1 + '@rushstack/terminal@0.11.0(@types/node@22.15.29)': + dependencies: + '@rushstack/node-core-library': 4.3.0(@types/node@22.15.29) + supports-color: 8.1.1 + optionalDependencies: + '@types/node': 22.15.29 + '@rushstack/terminal@0.11.0(@types/node@22.5.5)': dependencies: '@rushstack/node-core-library': 4.3.0(@types/node@22.5.5) @@ -5527,6 +5595,15 @@ snapshots: optionalDependencies: '@types/node': 22.5.5 + '@rushstack/ts-command-line@4.21.0(@types/node@22.15.29)': + dependencies: + '@rushstack/terminal': 0.11.0(@types/node@22.15.29) + '@types/argparse': 1.0.38 + argparse: 1.0.10 + string-argv: 0.3.2 + transitivePeerDependencies: + - '@types/node' + '@rushstack/ts-command-line@4.21.0(@types/node@22.5.5)': dependencies: '@rushstack/terminal': 0.11.0(@types/node@22.5.5) @@ -5566,7 +5643,7 @@ snapshots: '@types/node-fetch@2.6.11': dependencies: - '@types/node': 22.5.5 + '@types/node': 22.15.29 form-data: 4.0.1 '@types/node@12.20.55': {} @@ -5575,6 +5652,10 @@ snapshots: dependencies: undici-types: 5.26.5 + '@types/node@22.15.29': + dependencies: + undici-types: 6.21.0 + '@types/node@22.5.5': dependencies: undici-types: 6.19.8 @@ -5587,7 +5668,7 @@ snapshots: '@types/ws@8.5.10': dependencies: - '@types/node': 22.5.5 + '@types/node': 22.15.29 '@typescript-eslint/eslint-plugin@6.21.0(@typescript-eslint/parser@6.21.0(eslint@8.57.0)(typescript@5.4.5))(eslint@8.57.0)(typescript@5.4.5)': dependencies: @@ -6403,7 +6484,7 @@ snapshots: dependencies: eslint: 8.57.0 - eslint-config-standard@17.1.0(eslint-plugin-import@2.29.1(@typescript-eslint/parser@6.21.0(eslint@8.57.0)(typescript@5.4.5))(eslint@8.57.0))(eslint-plugin-n@16.6.2(eslint@8.57.0))(eslint-plugin-promise@6.1.1(eslint@8.57.0))(eslint@8.57.0): + eslint-config-standard@17.1.0(eslint-plugin-import@2.29.1)(eslint-plugin-n@16.6.2(eslint@8.57.0))(eslint-plugin-promise@6.1.1(eslint@8.57.0))(eslint@8.57.0): dependencies: eslint: 8.57.0 eslint-plugin-import: 2.29.1(@typescript-eslint/parser@6.21.0(eslint@8.57.0)(typescript@5.4.5))(eslint-import-resolver-typescript@3.6.1)(eslint@8.57.0) @@ -6428,7 +6509,7 @@ snapshots: debug: 4.3.4 enhanced-resolve: 5.16.1 eslint: 8.57.0 - eslint-module-utils: 2.8.1(@typescript-eslint/parser@6.21.0(eslint@8.57.0)(typescript@5.4.5))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.6.1(@typescript-eslint/parser@6.21.0(eslint@8.57.0)(typescript@5.4.5))(eslint-import-resolver-node@0.3.9)(eslint-plugin-import@2.29.1)(eslint@8.57.0))(eslint@8.57.0) + eslint-module-utils: 2.8.1(@typescript-eslint/parser@6.21.0(eslint@8.57.0)(typescript@5.4.5))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.6.1)(eslint@8.57.0) eslint-plugin-import: 2.29.1(@typescript-eslint/parser@6.21.0(eslint@8.57.0)(typescript@5.4.5))(eslint-import-resolver-typescript@3.6.1)(eslint@8.57.0) fast-glob: 3.3.2 get-tsconfig: 4.7.5 @@ -6440,7 +6521,7 @@ snapshots: - eslint-import-resolver-webpack - supports-color - eslint-module-utils@2.8.1(@typescript-eslint/parser@6.21.0(eslint@8.57.0)(typescript@5.4.5))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.6.1(@typescript-eslint/parser@6.21.0(eslint@8.57.0)(typescript@5.4.5))(eslint-import-resolver-node@0.3.9)(eslint-plugin-import@2.29.1)(eslint@8.57.0))(eslint@8.57.0): + eslint-module-utils@2.8.1(@typescript-eslint/parser@6.21.0(eslint@8.57.0)(typescript@5.4.5))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.6.1)(eslint@8.57.0): dependencies: debug: 3.2.7 optionalDependencies: @@ -6468,7 +6549,7 @@ snapshots: doctrine: 2.1.0 eslint: 8.57.0 eslint-import-resolver-node: 0.3.9 - eslint-module-utils: 2.8.1(@typescript-eslint/parser@6.21.0(eslint@8.57.0)(typescript@5.4.5))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.6.1(@typescript-eslint/parser@6.21.0(eslint@8.57.0)(typescript@5.4.5))(eslint-import-resolver-node@0.3.9)(eslint-plugin-import@2.29.1)(eslint@8.57.0))(eslint@8.57.0) + eslint-module-utils: 2.8.1(@typescript-eslint/parser@6.21.0(eslint@8.57.0)(typescript@5.4.5))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.6.1)(eslint@8.57.0) hasown: 2.0.2 is-core-module: 2.13.1 is-glob: 4.0.3 @@ -7658,7 +7739,7 @@ snapshots: '@protobufjs/path': 1.1.2 '@protobufjs/pool': 1.1.0 '@protobufjs/utf8': 1.1.0 - '@types/node': 22.5.5 + '@types/node': 22.15.29 long: 5.2.3 pseudomap@1.0.2: {} @@ -8219,6 +8300,34 @@ snapshots: tslib@2.6.2: {} + tsup@8.3.5(@microsoft/api-extractor@7.43.7(@types/node@22.15.29))(postcss@8.4.38)(tsx@4.19.2)(typescript@5.4.5): + dependencies: + bundle-require: 5.0.0(esbuild@0.24.0) + cac: 6.7.14 + chokidar: 4.0.1 + consola: 3.2.3 + debug: 4.3.7 + esbuild: 0.24.0 + joycon: 3.1.1 + picocolors: 1.1.1 + postcss-load-config: 6.0.1(postcss@8.4.38)(tsx@4.19.2) + resolve-from: 5.0.0 + rollup: 4.27.3 + source-map: 0.8.0-beta.0 + sucrase: 3.35.0 + tinyexec: 0.3.1 + tinyglobby: 0.2.10 + tree-kill: 1.2.2 + optionalDependencies: + '@microsoft/api-extractor': 7.43.7(@types/node@22.15.29) + postcss: 8.4.38 + typescript: 5.4.5 + transitivePeerDependencies: + - jiti + - supports-color + - tsx + - yaml + tsup@8.3.5(@microsoft/api-extractor@7.43.7(@types/node@22.5.5))(postcss@8.4.38)(tsx@4.19.2)(typescript@5.4.5): dependencies: bundle-require: 5.0.0(esbuild@0.24.0) @@ -8392,6 +8501,8 @@ snapshots: undici-types@6.19.8: {} + undici-types@6.21.0: {} + universalify@0.1.2: {} uri-js@4.4.1: @@ -8405,6 +8516,23 @@ snapshots: validator@13.12.0: {} + vite-node@1.6.0(@types/node@22.15.29): + dependencies: + cac: 6.7.14 + debug: 4.3.4 + pathe: 1.1.2 + picocolors: 1.0.1 + vite: 5.2.11(@types/node@22.15.29) + transitivePeerDependencies: + - '@types/node' + - less + - lightningcss + - sass + - stylus + - sugarss + - supports-color + - terser + vite-node@1.6.0(@types/node@22.5.5): dependencies: cac: 6.7.14 @@ -8422,6 +8550,15 @@ snapshots: - supports-color - terser + vite@5.2.11(@types/node@22.15.29): + dependencies: + esbuild: 0.20.2 + postcss: 8.4.38 + rollup: 4.17.2 + optionalDependencies: + '@types/node': 22.15.29 + fsevents: 2.3.3 + vite@5.2.11(@types/node@22.5.5): dependencies: esbuild: 0.20.2 @@ -8431,6 +8568,39 @@ snapshots: '@types/node': 22.5.5 fsevents: 2.3.3 + vitest@1.6.0(@types/node@22.15.29): + dependencies: + '@vitest/expect': 1.6.0 + '@vitest/runner': 1.6.0 + '@vitest/snapshot': 1.6.0 + '@vitest/spy': 1.6.0 + '@vitest/utils': 1.6.0 + acorn-walk: 8.3.2 + chai: 4.4.1 + debug: 4.3.4 + execa: 8.0.1 + local-pkg: 0.5.0 + magic-string: 0.30.10 + pathe: 1.1.2 + picocolors: 1.0.1 + std-env: 3.7.0 + strip-literal: 2.1.0 + tinybench: 2.8.0 + tinypool: 0.8.4 + vite: 5.2.11(@types/node@22.15.29) + vite-node: 1.6.0(@types/node@22.15.29) + why-is-node-running: 2.2.2 + optionalDependencies: + '@types/node': 22.15.29 + transitivePeerDependencies: + - less + - lightningcss + - sass + - stylus + - sugarss + - supports-color + - terser + vitest@1.6.0(@types/node@22.5.5): dependencies: '@vitest/expect': 1.6.0 From 257b467836990c3a0519b0a991a018ee32d151ae Mon Sep 17 00:00:00 2001 From: Toubat Date: Wed, 4 Jun 2025 00:25:05 -0700 Subject: [PATCH 5/9] rm imports --- agents/src/tts/tts.ts | 1 - agents/src/voice/generation.ts | 1 - 2 files changed, 2 deletions(-) diff --git a/agents/src/tts/tts.ts b/agents/src/tts/tts.ts index c37832731..58df55f97 100644 --- a/agents/src/tts/tts.ts +++ b/agents/src/tts/tts.ts @@ -4,7 +4,6 @@ import type { AudioFrame } from '@livekit/rtc-node'; import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter'; import { EventEmitter } from 'node:events'; -import type { ReadableStream } from 'node:stream/web'; import { log } from '../log.js'; import type { TTSMetrics } from '../metrics/base.js'; import { DeferredReadableStream } from '../stream/deferred_stream.js'; diff --git a/agents/src/voice/generation.ts b/agents/src/voice/generation.ts index 01fe11e4d..9ad8d3804 100644 --- a/agents/src/voice/generation.ts +++ b/agents/src/voice/generation.ts @@ -3,7 +3,6 @@ // SPDX-License-Identifier: Apache-2.0 import type { AudioFrame, AudioSource } from '@livekit/rtc-node'; import { randomUUID } from 'node:crypto'; -import type { ReadableStream } from 'stream/web'; import type { ChatContext } from '../llm/chat_context.js'; import { IdentityTransform } from '../stream/identity_transform.js'; import { Future } from '../utils.js'; From 4a62e54e5facdb5a7b166bbd98a3d6e5e481ada5 Mon Sep 17 00:00:00 2001 From: Toubat Date: Wed, 4 Jun 2025 15:05:46 -0700 Subject: [PATCH 6/9] fix partial --- agents/src/stream/injectable_stream.ts | 14 +- agents/src/voice/room_io.ts | 2 +- agents/tests/stream/injectable_stream.test.ts | 344 ++++++++++++++++++ agents/tsconfig.json | 1 + 4 files changed, 357 insertions(+), 4 deletions(-) create mode 100644 agents/tests/stream/injectable_stream.test.ts diff --git a/agents/src/stream/injectable_stream.ts b/agents/src/stream/injectable_stream.ts index 0ce751d13..9f34f170c 100644 --- a/agents/src/stream/injectable_stream.ts +++ b/agents/src/stream/injectable_stream.ts @@ -7,19 +7,25 @@ export class InjectableStream { private identityStream: IdentityTransform; private mergedStream: ReadableStream; private injectMutex = new Mutex(); + private writer: WritableStreamDefaultWriter; + private closed = false; constructor(source: ReadableStream) { this.source = source; this.identityStream = new IdentityTransform(); this.mergedStream = mergeReadableStreams(this.source, this.identityStream.readable); + this.writer = this.identityStream.writable.getWriter(); } async inject(value: T) { const unlock = await this.injectMutex.lock(); + + if (this.closed) { + throw new Error('Cannot inject into a closed stream'); + } + try { - const writer = this.identityStream.writable.getWriter(); - await writer.write(value); - await writer.close(); + await this.writer.write(value); } finally { unlock(); } @@ -29,7 +35,9 @@ export class InjectableStream { const unlock = await this.injectMutex.lock(); try { // this will not cancel the source stream but instead keep the readable open until the source finishes + this.writer.releaseLock(); await this.identityStream.writable.close(); + this.closed = true; } finally { unlock(); } diff --git a/agents/src/voice/room_io.ts b/agents/src/voice/room_io.ts index 7e94f20f3..626006297 100644 --- a/agents/src/voice/room_io.ts +++ b/agents/src/voice/room_io.ts @@ -56,7 +56,7 @@ export class RoomIO { // TODO(AJS-41) remove hardcoded sample rate sampleRate: 16000, numChannels: 1, - }), + }) as ReadableStream, ); } }; diff --git a/agents/tests/stream/injectable_stream.test.ts b/agents/tests/stream/injectable_stream.test.ts new file mode 100644 index 000000000..ec2608606 --- /dev/null +++ b/agents/tests/stream/injectable_stream.test.ts @@ -0,0 +1,344 @@ +import { describe, expect, it } from 'vitest'; +import { InjectableStream } from '../../src/stream/injectable_stream.js'; + +describe('InjectableStream', () => { + // Helper to create a readable stream from an array + function createReadableStream(items: T[]): ReadableStream { + return new ReadableStream({ + start(controller) { + for (const item of items) { + controller.enqueue(item); + } + controller.close(); + }, + }); + } + + // Helper to collect all values from a stream + async function collectStream(stream: InjectableStream): Promise { + const reader = stream.readable.getReader(); + const values: T[] = []; + + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + values.push(value); + } + } finally { + reader.releaseLock(); + } + + return values; + } + + // Helper to create a controlled source stream + function createControlledStream() { + let controller: ReadableStreamDefaultController; + const stream = new ReadableStream({ + start(c) { + controller = c; + }, + }); + + return { + stream, + enqueue: (value: T) => controller.enqueue(value), + close: () => controller.close(), + error: (e: any) => controller.error(e), + }; + } + + describe('Happy Path', () => { + it('should pass through source stream data without injection', async () => { + const sourceData = [1, 2, 3, 4, 5]; + const source = createReadableStream(sourceData); + const injectable = new InjectableStream(source); + injectable.close(); + + const result = await collectStream(injectable); + expect(result).toEqual(sourceData); + }); + + it('should handle empty source stream', async () => { + const source = createReadableStream([]); + const injectable = new InjectableStream(source); + injectable.close(); + + const result = await collectStream(injectable); + expect(result).toEqual([]); + }); + }); + + describe('Read/Write with Inject', () => { + it('should merge injected values with source stream', async () => { + const controlled = createControlledStream(); + const injectable = new InjectableStream(controlled.stream); + + // Start reading + const readPromise = collectStream(injectable); + + // Enqueue source values + controlled.enqueue('source1'); + controlled.enqueue('source2'); + + // Inject a value + await injectable.inject('injected1'); + + // More source values + controlled.enqueue('source3'); + + // Close source + controlled.close(); + injectable.close(); // close the injectable stream + + const result = await readPromise; + + // The order might vary due to merging, but all values should be present + expect(result).toHaveLength(4); + expect(result).toContain('source1'); + expect(result).toContain('source2'); + expect(result).toContain('source3'); + expect(result).toContain('injected1'); + }); + + it('should handle multiple injections (if supported)', async () => { + const controlled = createControlledStream(); + const injectable = new InjectableStream(controlled.stream); + + const readPromise = collectStream(injectable); + + // Multiple injections should now work + await injectable.inject(100); + await injectable.inject(200); + await injectable.inject(300); + + controlled.close(); + injectable.close(); + + const result = await readPromise; + expect(result).toEqual([100, 200, 300]); + }); + }); + + describe('After Close', () => { + it('should continue reading from source after close', async () => { + const controlled = createControlledStream(); + const injectable = new InjectableStream(controlled.stream); + + // Start reading + const reader = injectable.readable.getReader(); + const values: string[] = []; + + // Read first value + controlled.enqueue('before-close'); + let result = await reader.read(); + if (!result.done) values.push(result.value); + + // Close injectable + await injectable.close(); + + // Source should still work + controlled.enqueue('after-close-1'); + controlled.enqueue('after-close-2'); + controlled.close(); + + // Continue reading + while (true) { + result = await reader.read(); + if (result.done) break; + values.push(result.value); + } + + reader.releaseLock(); + + expect(values).toEqual(['before-close', 'after-close-1', 'after-close-2']); + }); + + it('should prevent injection after close', async () => { + const source = createReadableStream([1, 2, 3]); + const injectable = new InjectableStream(source); + + await injectable.close(); + + try { + await injectable.inject(999); + expect.fail('Expected inject to fail'); + } catch (e) { + expect(e).toBeInstanceOf(Error); + } + }); + }); + + describe('After Cancel', () => { + it('should cancel both streams when cancel is called', async () => { + const controlled = createControlledStream(); + const injectable = new InjectableStream(controlled.stream); + + const reader = injectable.readable.getReader(); + + // Cancel the stream + await injectable.cancel('test cancellation'); + + const { done } = await reader.read(); + expect(done).toBe(true); + + reader.releaseLock(); + }); + + it('should prevent injection after cancel', async () => { + const source = createReadableStream([1, 2, 3]); + const injectable = new InjectableStream(source); + + await injectable.cancel(); + + // Injection should fail after cancel + await expect(injectable.inject(999)).rejects.toThrow(); + }); + + it('should propagate cancel reason', async () => { + const controlled = createControlledStream(); + const injectable = new InjectableStream(controlled.stream); + + const reason = new Error('Custom cancel reason'); + + // Start reading to see if error propagates + const reader = injectable.readable.getReader(); + const readPromise = reader.read(); + + await injectable.cancel(reason); + + // The read should complete with done=true (cancel doesn't necessarily propagate as error to reader) + const result = await readPromise; + expect(result.done).toBe(true); + + reader.releaseLock(); + }); + }); + +// describe('Complex Cases', () => { +// it('should handle concurrent injections safely', async () => { +// const controlled = createControlledStream(); +// const injectable = new InjectableStream(controlled.stream); + +// const readPromise = collectStream(injectable); + +// // Try concurrent injections (mutex should serialize them) +// const injectPromises = [ +// injectable.inject(1), +// injectable.inject(2), +// injectable.inject(3), +// ]; + +// // Wait for all injections to complete (some might fail) +// await Promise.allSettled(injectPromises); + +// await injectable.close(); +// await controlled.close(); + +// const values = await readPromise; + +// // At least the first injection should succeed +// expect(values.sort()).toEqual([1, 2, 3]); +// }); + +// it('should handle backpressure correctly', async () => { +// const controlled = createControlledStream(); +// const injectable = new InjectableStream(controlled.stream); + +// // Create a slow reader to induce backpressure +// const reader = injectable.readable.getReader(); +// const values: number[] = []; + +// // Enqueue many values quickly +// for (let i = 0; i < 10; i++) { +// controlled.enqueue(i); +// } + +// // Try to inject while there's backpressure +// const injectPromise = injectable.inject(999); + +// // Slowly read values +// for (let i = 0; i < 5; i++) { +// const { done, value } = await reader.read(); +// if (!done) values.push(value); +// await new Promise(resolve => setTimeout(resolve, 10)); +// } + +// await injectPromise; +// controlled.close(); + +// // Read remaining values +// while (true) { +// const { done, value } = await reader.read(); +// if (done) break; +// values.push(value); +// } + +// reader.releaseLock(); + +// // All values should be present +// expect(values.length).toBeGreaterThan(5); +// expect(values).toContain(999); +// }); + +// it('should handle source stream errors', async () => { +// const controlled = createControlledStream(); +// const injectable = new InjectableStream(controlled.stream); + +// const reader = injectable.readable.getReader(); + +// controlled.enqueue('value1'); + +// // Error the source stream +// const error = new Error('Source stream error'); +// controlled.error(error); + +// // First read should succeed +// const result1 = await reader.read(); +// expect(result1.done).toBe(false); +// expect(result1.value).toBe('value1'); + +// // Next read should propagate the error +// await expect(reader.read()).rejects.toThrow('Source stream error'); + +// reader.releaseLock(); +// }); + +// it('should handle injection during active read', async () => { +// const controlled = createControlledStream(); +// const injectable = new InjectableStream(controlled.stream); + +// const reader = injectable.readable.getReader(); + +// // Start a read that will wait +// const readPromise = reader.read(); + +// // Inject while read is pending +// await injectable.inject('injected'); + +// // The read should resolve with the injected value (or source value if it comes first) +// const { done, value } = await readPromise; +// expect(done).toBe(false); +// expect(value).toBe('injected'); + +// controlled.close(); +// reader.releaseLock(); +// }); +// }); + +// describe('Implementation Issues', () => { +// it('multiple injections now work correctly', async () => { +// const source = createReadableStream([]); +// const injectable = new InjectableStream(source); + +// // All injections should work now +// await expect(injectable.inject('first')).resolves.not.toThrow(); +// await expect(injectable.inject('second')).resolves.not.toThrow(); +// await expect(injectable.inject('third')).resolves.not.toThrow(); + +// const result = await collectStream(injectable); +// expect(result).toEqual(['first', 'second', 'third']); +// }); +// }); +}); diff --git a/agents/tsconfig.json b/agents/tsconfig.json index 48b3ae203..9a6122d43 100644 --- a/agents/tsconfig.json +++ b/agents/tsconfig.json @@ -2,6 +2,7 @@ "extends": "../tsconfig.json", "compilerOptions": { + "types": ["node"], "rootDir": "./src", "declarationDir": "dist", "outDir": "dist" From 5165585cb3d8f18a2f46a02d28c41e0b1881edd1 Mon Sep 17 00:00:00 2001 From: Toubat Date: Wed, 4 Jun 2025 15:26:25 -0700 Subject: [PATCH 7/9] not fixed yet --- agents/src/stream/injectable_stream.ts | 6 ++++++ agents/tests/stream/injectable_stream.test.ts | 1 + 2 files changed, 7 insertions(+) diff --git a/agents/src/stream/injectable_stream.ts b/agents/src/stream/injectable_stream.ts index 9f34f170c..6768f72fa 100644 --- a/agents/src/stream/injectable_stream.ts +++ b/agents/src/stream/injectable_stream.ts @@ -33,6 +33,11 @@ export class InjectableStream { async close() { const unlock = await this.injectMutex.lock(); + + if (this.closed) { + return; + } + try { // this will not cancel the source stream but instead keep the readable open until the source finishes this.writer.releaseLock(); @@ -44,6 +49,7 @@ export class InjectableStream { } async cancel(reason?: any) { + await this.close(); await Promise.all([ this.mergedStream.cancel(reason), this.identityStream.writable.abort(reason), diff --git a/agents/tests/stream/injectable_stream.test.ts b/agents/tests/stream/injectable_stream.test.ts index ec2608606..79b4127f5 100644 --- a/agents/tests/stream/injectable_stream.test.ts +++ b/agents/tests/stream/injectable_stream.test.ts @@ -176,6 +176,7 @@ describe('InjectableStream', () => { const injectable = new InjectableStream(controlled.stream); const reader = injectable.readable.getReader(); + reader.releaseLock(); // Cancel the stream await injectable.cancel('test cancellation'); From e9111f7ea457abdedd949ca6314157d8420a7bb2 Mon Sep 17 00:00:00 2001 From: Toubat Date: Mon, 9 Jun 2025 12:35:09 -0700 Subject: [PATCH 8/9] revert typing --- agents/src/stream/identity_transform.ts | 4 ++- agents/src/stream/injectable_stream.ts | 36 +++++++++++++++++++ agents/src/stt/stt.ts | 1 + agents/src/tts/tts.ts | 1 + agents/src/vad.ts | 5 +++ agents/src/voice/agent.ts | 1 + agents/src/voice/agent_activity.ts | 1 + agents/src/voice/agent_session.ts | 1 + agents/src/voice/audio_recognition.ts | 1 + agents/src/voice/generation.ts | 1 + agents/src/voice/io.ts | 1 + agents/src/voice/room_io.ts | 3 +- agents/tests/stream/injectable_stream.test.ts | 1 + agents/tsconfig.json | 1 + 14 files changed, 56 insertions(+), 2 deletions(-) diff --git a/agents/src/stream/identity_transform.ts b/agents/src/stream/identity_transform.ts index c4f0abb2b..93aa940a8 100644 --- a/agents/src/stream/identity_transform.ts +++ b/agents/src/stream/identity_transform.ts @@ -2,10 +2,12 @@ // // SPDX-License-Identifier: Apache-2.0 +import { TransformStream } from 'node:stream/web'; + export class IdentityTransform extends TransformStream { constructor() { super({ transform: (chunk, controller) => controller.enqueue(chunk), }); } -} +} \ No newline at end of file diff --git a/agents/src/stream/injectable_stream.ts b/agents/src/stream/injectable_stream.ts index 6768f72fa..60277f6fb 100644 --- a/agents/src/stream/injectable_stream.ts +++ b/agents/src/stream/injectable_stream.ts @@ -1,5 +1,6 @@ import { Mutex } from '@livekit/mutex'; import { mergeReadableStreams } from '@std/streams'; +import type { ReadableStream } from 'node:stream/web'; import { IdentityTransform } from './identity_transform.js'; export class InjectableStream { @@ -60,3 +61,38 @@ export class InjectableStream { return this.mergedStream; } } + +// // Copied from @std/streams/merge-readable-streams.ts to avoid incompetible ReadableStream types +// export function mergeReadableStreams( +// ...streams: ReadableStream[] +// ): ReadableStream { +// const resolvePromises = streams.map(() => Promise.withResolvers()); +// return new ReadableStream({ +// start(controller) { +// let mustClose = false; +// Promise.all(resolvePromises.map(({ promise }) => promise)) +// .then(() => { +// controller.close(); +// }) +// .catch((error) => { +// mustClose = true; +// controller.error(error); +// }); +// for (const [index, stream] of streams.entries()) { +// (async () => { +// try { +// for await (const data of stream) { +// if (mustClose) { +// break; +// } +// controller.enqueue(data); +// } +// resolvePromises[index]!.resolve(); +// } catch (error) { +// resolvePromises[index]!.reject(error); +// } +// })(); +// } +// }, +// }); +// } diff --git a/agents/src/stt/stt.ts b/agents/src/stt/stt.ts index e1723ab46..ed8a5a47b 100644 --- a/agents/src/stt/stt.ts +++ b/agents/src/stt/stt.ts @@ -4,6 +4,7 @@ import type { AudioFrame } from '@livekit/rtc-node'; import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter'; import { EventEmitter } from 'node:events'; +import type { ReadableStream } from 'node:stream/web'; import { log } from '../log.js'; import type { STTMetrics } from '../metrics/base.js'; import { DeferredReadableStream } from '../stream/deferred_stream.js'; diff --git a/agents/src/tts/tts.ts b/agents/src/tts/tts.ts index 58df55f97..c37832731 100644 --- a/agents/src/tts/tts.ts +++ b/agents/src/tts/tts.ts @@ -4,6 +4,7 @@ import type { AudioFrame } from '@livekit/rtc-node'; import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter'; import { EventEmitter } from 'node:events'; +import type { ReadableStream } from 'node:stream/web'; import { log } from '../log.js'; import type { TTSMetrics } from '../metrics/base.js'; import { DeferredReadableStream } from '../stream/deferred_stream.js'; diff --git a/agents/src/vad.ts b/agents/src/vad.ts index a91976826..fb082e5e2 100644 --- a/agents/src/vad.ts +++ b/agents/src/vad.ts @@ -5,6 +5,11 @@ import type { AudioFrame } from '@livekit/rtc-node'; import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter'; import { EventEmitter } from 'node:events'; +import type { + ReadableStream, + ReadableStreamDefaultReader, + WritableStreamDefaultWriter, +} from 'node:stream/web'; import { log } from './log.js'; import type { VADMetrics } from './metrics/base.js'; import { DeferredReadableStream } from './stream/deferred_stream.js'; diff --git a/agents/src/voice/agent.ts b/agents/src/voice/agent.ts index 06cee3d96..5f41f368c 100644 --- a/agents/src/voice/agent.ts +++ b/agents/src/voice/agent.ts @@ -6,6 +6,7 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ import type { AudioFrame } from '@livekit/rtc-node'; +import { ReadableStream } from 'node:stream/web'; import type { ChatChunk, ChatMessage, LLM } from '../llm/index.js'; import { ChatContext } from '../llm/index.js'; import type { STT, SpeechEvent } from '../stt/index.js'; diff --git a/agents/src/voice/agent_activity.ts b/agents/src/voice/agent_activity.ts index 907370604..aa730c419 100644 --- a/agents/src/voice/agent_activity.ts +++ b/agents/src/voice/agent_activity.ts @@ -3,6 +3,7 @@ // SPDX-License-Identifier: Apache-2.0 import type { AudioFrame } from '@livekit/rtc-node'; import { Heap } from 'heap-js'; +import type { ReadableStream } from 'node:stream/web'; import { type ChatContext, ChatMessage, ChatRole } from '../llm/chat_context.js'; import type { LLM } from '../llm/index.js'; import { log } from '../log.js'; diff --git a/agents/src/voice/agent_session.ts b/agents/src/voice/agent_session.ts index 81a4552c2..eb9377f5f 100644 --- a/agents/src/voice/agent_session.ts +++ b/agents/src/voice/agent_session.ts @@ -2,6 +2,7 @@ // // SPDX-License-Identifier: Apache-2.0 import type { AudioFrame, AudioSource, Room } from '@livekit/rtc-node'; +import type { ReadableStream } from 'node:stream/web'; import type { ChatMessage } from '../llm/chat_context.js'; import { ChatContext } from '../llm/chat_context.js'; import type { LLM } from '../llm/index.js'; diff --git a/agents/src/voice/audio_recognition.ts b/agents/src/voice/audio_recognition.ts index 0d2fc0fb8..f3b4644ef 100644 --- a/agents/src/voice/audio_recognition.ts +++ b/agents/src/voice/audio_recognition.ts @@ -3,6 +3,7 @@ // SPDX-License-Identifier: Apache-2.0 import type { AudioFrame } from '@livekit/rtc-node'; import { delay } from '@std/async'; +import { ReadableStream } from 'node:stream/web'; import { type ChatContext, ChatRole } from '../llm/chat_context.js'; import { log } from '../log.js'; import { DeferredReadableStream } from '../stream/deferred_stream.js'; diff --git a/agents/src/voice/generation.ts b/agents/src/voice/generation.ts index 9ad8d3804..4e47673f1 100644 --- a/agents/src/voice/generation.ts +++ b/agents/src/voice/generation.ts @@ -3,6 +3,7 @@ // SPDX-License-Identifier: Apache-2.0 import type { AudioFrame, AudioSource } from '@livekit/rtc-node'; import { randomUUID } from 'node:crypto'; +import type { ReadableStream } from 'node:stream/web'; import type { ChatContext } from '../llm/chat_context.js'; import { IdentityTransform } from '../stream/identity_transform.js'; import { Future } from '../utils.js'; diff --git a/agents/src/voice/io.ts b/agents/src/voice/io.ts index bf82ab380..dd989f6d5 100644 --- a/agents/src/voice/io.ts +++ b/agents/src/voice/io.ts @@ -2,6 +2,7 @@ // // SPDX-License-Identifier: Apache-2.0 import type { AudioFrame } from '@livekit/rtc-node'; +import type { ReadableStream } from 'node:stream/web'; import type { ChatContext } from '../llm/chat_context.js'; import type { ChatChunk } from '../llm/llm.js'; import type { SpeechEvent } from '../stt/stt.js'; diff --git a/agents/src/voice/room_io.ts b/agents/src/voice/room_io.ts index 626006297..65e9b9638 100644 --- a/agents/src/voice/room_io.ts +++ b/agents/src/voice/room_io.ts @@ -13,6 +13,7 @@ import { TrackPublishOptions, TrackSource, } from '@livekit/rtc-node'; +import type { ReadableStream } from 'node:stream/web'; import { log } from '../log.js'; import { DeferredReadableStream } from '../stream/deferred_stream.js'; import type { AgentSession } from './agent_session.js'; @@ -56,7 +57,7 @@ export class RoomIO { // TODO(AJS-41) remove hardcoded sample rate sampleRate: 16000, numChannels: 1, - }) as ReadableStream, + }), ); } }; diff --git a/agents/tests/stream/injectable_stream.test.ts b/agents/tests/stream/injectable_stream.test.ts index 79b4127f5..42a3c8fe2 100644 --- a/agents/tests/stream/injectable_stream.test.ts +++ b/agents/tests/stream/injectable_stream.test.ts @@ -1,3 +1,4 @@ +import { ReadableStream } from 'node:stream/web'; import { describe, expect, it } from 'vitest'; import { InjectableStream } from '../../src/stream/injectable_stream.js'; diff --git a/agents/tsconfig.json b/agents/tsconfig.json index 9a6122d43..a3e6c2ca4 100644 --- a/agents/tsconfig.json +++ b/agents/tsconfig.json @@ -3,6 +3,7 @@ "compilerOptions": { "types": ["node"], + "lib": ["es2024"], "rootDir": "./src", "declarationDir": "dist", "outDir": "dist" From 141f4cf65345b43770048705a31b34b8d883123b Mon Sep 17 00:00:00 2001 From: Toubat Date: Mon, 9 Jun 2025 12:35:29 -0700 Subject: [PATCH 9/9] lint --- agents/src/stream/identity_transform.ts | 3 +-- agents/src/vad.ts | 1 - 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/agents/src/stream/identity_transform.ts b/agents/src/stream/identity_transform.ts index 93aa940a8..cb83f091f 100644 --- a/agents/src/stream/identity_transform.ts +++ b/agents/src/stream/identity_transform.ts @@ -1,7 +1,6 @@ // SPDX-FileCopyrightText: 2025 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 - import { TransformStream } from 'node:stream/web'; export class IdentityTransform extends TransformStream { @@ -10,4 +9,4 @@ export class IdentityTransform extends TransformStream { transform: (chunk, controller) => controller.enqueue(chunk), }); } -} \ No newline at end of file +} diff --git a/agents/src/vad.ts b/agents/src/vad.ts index fb082e5e2..2e135df3a 100644 --- a/agents/src/vad.ts +++ b/agents/src/vad.ts @@ -4,7 +4,6 @@ import type { AudioFrame } from '@livekit/rtc-node'; import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter'; import { EventEmitter } from 'node:events'; - import type { ReadableStream, ReadableStreamDefaultReader,