Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .changeset/curvy-coins-boil.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
"@livekit/agents": patch
"@livekit/agents-plugin-google": patch
"@livekit/agents-plugin-livekit": patch
"@livekit/agents-plugin-openai": patch
"livekit-agents-examples": patch
---

Implement AgentTask feature
53 changes: 20 additions & 33 deletions agents/src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,16 @@ const runServer = async (args: CliArgs) => {
* ```
*/
export const runApp = (opts: ServerOptions) => {
const logLevelOption = (defaultLevel: string) =>
new Option('--log-level <level>', 'Set the logging level')
.choices(['trace', 'debug', 'info', 'warn', 'error', 'fatal'])
.default(defaultLevel)
.env('LOG_LEVEL');

const program = new Command()
.name('agents')
.description('LiveKit Agents CLI')
.version(version)
.addOption(
new Option('--log-level <level>', 'Set the logging level')
.choices(['trace', 'debug', 'info', 'warn', 'error', 'fatal'])
.default('info')
.env('LOG_LEVEL'),
)
.addOption(
new Option('--url <string>', 'LiveKit server or Cloud project websocket URL').env(
'LIVEKIT_URL',
Expand Down Expand Up @@ -120,13 +120,15 @@ export const runApp = (opts: ServerOptions) => {
program
.command('start')
.description('Start the worker in production mode')
.action(() => {
const options = program.optsWithGlobals();
opts.wsURL = options.url || opts.wsURL;
opts.apiKey = options.apiKey || opts.apiKey;
opts.apiSecret = options.apiSecret || opts.apiSecret;
opts.logLevel = options.logLevel || opts.logLevel;
opts.workerToken = options.workerToken || opts.workerToken;
.addOption(logLevelOption('info'))
.action((...[, command]) => {
const globalOptions = program.optsWithGlobals();
const commandOptions = command.opts();
opts.wsURL = globalOptions.url || opts.wsURL;
opts.apiKey = globalOptions.apiKey || opts.apiKey;
opts.apiSecret = globalOptions.apiSecret || opts.apiSecret;
opts.logLevel = commandOptions.logLevel;
opts.workerToken = globalOptions.workerToken || opts.workerToken;
runServer({
opts,
production: true,
Expand All @@ -137,19 +139,14 @@ export const runApp = (opts: ServerOptions) => {
program
.command('dev')
.description('Start the worker in development mode')
.addOption(
new Option('--log-level <level>', 'Set the logging level')
.choices(['trace', 'debug', 'info', 'warn', 'error', 'fatal'])
.default('debug')
.env('LOG_LEVEL'),
)
.addOption(logLevelOption('debug'))
.action((...[, command]) => {
const globalOptions = program.optsWithGlobals();
const commandOptions = command.opts();
opts.wsURL = globalOptions.url || opts.wsURL;
opts.apiKey = globalOptions.apiKey || opts.apiKey;
opts.apiSecret = globalOptions.apiSecret || opts.apiSecret;
opts.logLevel = commandOptions.logLevel || globalOptions.logLevel || opts.logLevel;
opts.logLevel = commandOptions.logLevel;
opts.workerToken = globalOptions.workerToken || opts.workerToken;
runServer({
opts,
Expand All @@ -163,19 +160,14 @@ export const runApp = (opts: ServerOptions) => {
.description('Connect to a specific room')
.requiredOption('--room <string>', 'Room name to connect to')
.option('--participant-identity <string>', 'Identity of user to listen to')
.addOption(
new Option('--log-level <level>', 'Set the logging level')
.choices(['trace', 'debug', 'info', 'warn', 'error', 'fatal'])
.default('debug')
.env('LOG_LEVEL'),
)
.addOption(logLevelOption('info'))
.action((...[, command]) => {
const globalOptions = program.optsWithGlobals();
const commandOptions = command.opts();
opts.wsURL = globalOptions.url || opts.wsURL;
opts.apiKey = globalOptions.apiKey || opts.apiKey;
opts.apiSecret = globalOptions.apiSecret || opts.apiSecret;
opts.logLevel = commandOptions.logLevel || globalOptions.logLevel || opts.logLevel;
opts.logLevel = commandOptions.logLevel;
opts.workerToken = globalOptions.workerToken || opts.workerToken;
runServer({
opts,
Expand All @@ -189,12 +181,7 @@ export const runApp = (opts: ServerOptions) => {
program
.command('download-files')
.description('Download plugin dependency files')
.addOption(
new Option('--log-level <level>', 'Set the logging level')
.choices(['trace', 'debug', 'info', 'warn', 'error', 'fatal'])
.default('debug')
.env('LOG_LEVEL'),
)
.addOption(logLevelOption('debug'))
.action((...[, command]) => {
const commandOptions = command.opts();
initializeLogger({ pretty: true, level: commandOptions.logLevel });
Expand Down
21 changes: 16 additions & 5 deletions agents/src/ipc/job_proc_lazy_main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ import type { IPCMessage } from './message.js';

const ORPHANED_TIMEOUT = 15 * 1000;

const safeSend = (msg: IPCMessage): boolean => {
if (process.connected && process.send) {
process.send(msg);
return true;
}
return false;
};

type JobTask = {
ctx: JobContext;
task: Promise<void>;
Expand Down Expand Up @@ -50,7 +58,10 @@ class InfClient implements InferenceExecutor {

async doInference(method: string, data: unknown): Promise<unknown> {
const requestId = shortuuid('inference_job_');
process.send!({ case: 'inferenceRequest', value: { requestId, method, data } });
if (!safeSend({ case: 'inferenceRequest', value: { requestId, method, data } })) {
throw new Error('IPC channel closed');
}

this.#requests[requestId] = new PendingInference();
const resp = await this.#requests[requestId]!.promise;
if (resp.error) {
Expand Down Expand Up @@ -117,7 +128,7 @@ const startJob = (
await once(closeEvent, 'close').then((close) => {
logger.debug('shutting down');
shutdown = true;
process.send!({ case: 'exiting', value: { reason: close[1] } });
safeSend({ case: 'exiting', value: { reason: close[1] } });
});

// Close the primary agent session if it exists
Expand All @@ -139,7 +150,7 @@ const startJob = (
logger.error({ error }, 'error while shutting down the job'),
);

process.send!({ case: 'done' });
safeSend({ case: 'done', value: undefined });
joinFuture.resolve();
})();

Expand Down Expand Up @@ -199,7 +210,7 @@ const startJob = (
logger.debug('initializing job runner');
await agent.prewarm(proc);
logger.debug('job runner initialized');
process.send({ case: 'initializeResponse' });
safeSend({ case: 'initializeResponse', value: undefined });

let job: JobTask | undefined = undefined;
const closeEvent = new EventEmitter();
Expand All @@ -213,7 +224,7 @@ const startJob = (
switch (msg.case) {
case 'pingRequest': {
orphanedTimeout.refresh();
process.send!({
safeSend({
case: 'pongResponse',
value: { lastTimestamp: msg.value.timestamp, timestamp: Date.now() },
});
Expand Down
35 changes: 35 additions & 0 deletions agents/src/llm/chat_context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,41 @@ export class ChatContext {
return new ChatContext(items);
}

merge(
other: ChatContext,
options: {
excludeFunctionCall?: boolean;
excludeInstructions?: boolean;
} = {},
): ChatContext {
const { excludeFunctionCall = false, excludeInstructions = false } = options;
const existingIds = new Set(this._items.map((item) => item.id));

for (const item of other.items) {
if (excludeFunctionCall && ['function_call', 'function_call_output'].includes(item.type)) {
continue;
}

if (
excludeInstructions &&
item.type === 'message' &&
(item.role === 'system' || item.role === 'developer')
) {
continue;
}

if (existingIds.has(item.id)) {
continue;
}

const idx = this.findInsertionIndex(item.createdAt);
this._items.splice(idx, 0, item);
existingIds.add(item.id);
}

return this;
}

truncate(maxItems: number): ChatContext {
if (maxItems <= 0) return this;

Expand Down
10 changes: 6 additions & 4 deletions agents/src/llm/provider_format/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,14 @@ class ChatItemGroup {
}

removeInvalidToolCalls() {
if (this.toolCalls.length === this.toolOutputs.length) {
return;
}

const toolCallIds = new Set(this.toolCalls.map((call) => call.callId));
const toolOutputIds = new Set(this.toolOutputs.map((output) => output.callId));
const sameIds =
toolCallIds.size === toolOutputIds.size &&
[...toolCallIds].every((id) => toolOutputIds.has(id));
if (this.toolCalls.length === this.toolOutputs.length && sameIds) {
return;
}

// intersection of tool call ids and tool output ids
const validCallIds = intersection(toolCallIds, toolOutputIds);
Expand Down
1 change: 1 addition & 0 deletions agents/src/llm/realtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ export interface RealtimeCapabilities {
userTranscription: boolean;
autoToolReplyGeneration: boolean;
audioOutput: boolean;
manualFunctionCalls: boolean;
}

export interface InputTranscriptionCompleted {
Expand Down
23 changes: 17 additions & 6 deletions agents/src/stream/deferred_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,17 @@ export class DeferredReadableStream<T> {
throw new Error('Stream source already set');
}

this.sourceReader = source.getReader();
this.pump();
const sourceReader = source.getReader();
this.sourceReader = sourceReader;
void this.pump(sourceReader);
}

private async pump() {
private async pump(sourceReader: ReadableStreamDefaultReader<T>) {
let sourceError: unknown;

try {
while (true) {
const { done, value } = await this.sourceReader!.read();
const { done, value } = await sourceReader.read();
if (done) break;
await this.writer.write(value);
}
Expand All @@ -81,7 +82,7 @@ export class DeferredReadableStream<T> {
// any other error from source will be propagated to the consumer
if (sourceError) {
try {
this.writer.abort(sourceError);
await this.writer.abort(sourceError);
} catch (e) {
// ignore if writer is already closed
}
Expand Down Expand Up @@ -118,10 +119,20 @@ export class DeferredReadableStream<T> {
return;
}

const sourceReader = this.sourceReader!;
// Clear source first so future setSource() calls can reattach cleanly.
this.sourceReader = undefined;

// release lock will make any pending read() throw TypeError
// which are expected, and we intentionally catch those error
// using isStreamReaderReleaseError
// this will unblock any pending read() inside the async for loop
this.sourceReader!.releaseLock();
try {
sourceReader.releaseLock();
} catch (e) {
if (!isStreamReaderReleaseError(e)) {
throw e;
}
}
}
}
87 changes: 87 additions & 0 deletions agents/src/utils.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,93 @@ describe('utils', () => {
expect((error as Error).name).toBe('TypeError');
}
});

it('should return undefined for Task.current outside task context', () => {
expect(Task.current()).toBeUndefined();
});

it('should preserve Task.current inside a task across awaits', async () => {
const task = Task.from(
async () => {
const currentAtStart = Task.current();
await delay(5);
const currentAfterAwait = Task.current();

expect(currentAtStart).toBeDefined();
expect(currentAfterAwait).toBe(currentAtStart);

return currentAtStart;
},
undefined,
'current-context-test',
);

const currentFromResult = await task.result;
expect(currentFromResult).toBe(task);
});

it('should isolate nested Task.current context and restore parent context', async () => {
const parentTask = Task.from(
async (controller) => {
const parentCurrent = Task.current();
expect(parentCurrent).toBeDefined();

const childTask = Task.from(
async () => {
const childCurrentStart = Task.current();
await delay(5);
const childCurrentAfterAwait = Task.current();

expect(childCurrentStart).toBeDefined();
expect(childCurrentAfterAwait).toBe(childCurrentStart);
expect(childCurrentStart).not.toBe(parentCurrent);

return childCurrentStart;
},
controller,
'child-current-context-test',
);

const childCurrent = await childTask.result;
const parentCurrentAfterChild = Task.current();

expect(parentCurrentAfterChild).toBe(parentCurrent);

return { parentCurrent, childCurrent };
},
undefined,
'parent-current-context-test',
);

const { parentCurrent, childCurrent } = await parentTask.result;
expect(parentCurrent).toBe(parentTask);
expect(childCurrent).not.toBe(parentCurrent);
expect(Task.current()).toBeUndefined();
});

it('should always expose Task.current for concurrent task callbacks', async () => {
const tasks = Array.from({ length: 25 }, (_, idx) =>
Task.from(
async () => {
const currentAtStart = Task.current();
await delay(1);
const currentAfterAwait = Task.current();

expect(currentAtStart).toBeDefined();
expect(currentAfterAwait).toBe(currentAtStart);

return currentAtStart;
},
undefined,
`current-context-stress-${idx}`,
),
);

const currentTasks = await Promise.all(tasks.map((task) => task.result));
currentTasks.forEach((currentTask, idx) => {
expect(currentTask).toBe(tasks[idx]);
});
});
});

describe('Event', () => {
Expand Down
Loading