diff --git a/.changeset/lucky-grapes-care.md b/.changeset/lucky-grapes-care.md new file mode 100644 index 000000000..d14ff4227 --- /dev/null +++ b/.changeset/lucky-grapes-care.md @@ -0,0 +1,10 @@ +--- +"@livekit/agents": patch +"@livekit/agents-plugin-cartesia": patch +"@livekit/agents-plugin-deepgram": patch +"@livekit/agents-plugin-google": patch +"@livekit/agents-plugin-openai": patch +"livekit-agents-examples": patch +--- + +Add granular session models usage stats diff --git a/agents/src/inference/interruption/defaults.ts b/agents/src/inference/interruption/defaults.ts index 66f2fe85f..ffea25bbd 100644 --- a/agents/src/inference/interruption/defaults.ts +++ b/agents/src/inference/interruption/defaults.ts @@ -14,37 +14,6 @@ export const SAMPLE_RATE = 16000; export const FRAMES_PER_SECOND = 40; export const FRAME_DURATION_IN_S = 0.025; // 25ms per frame -/** Default production inference URL */ -export const DEFAULT_BASE_URL = 'https://agent-gateway.livekit.cloud/v1'; - -/** Staging inference URL */ -export const STAGING_BASE_URL = 'https://agent-gateway-staging.livekit.cloud/v1'; - -/** - * Get the default inference URL based on the environment. - * - * Priority: - * 1. LIVEKIT_INFERENCE_URL if set - * 2. If LIVEKIT_URL contains '.staging.livekit.cloud', use staging gateway - * 3. Otherwise, use production gateway - */ -export function getDefaultInferenceUrl(): string { - // Priority 1: LIVEKIT_INFERENCE_URL - const inferenceUrl = process.env.LIVEKIT_INFERENCE_URL; - if (inferenceUrl) { - return inferenceUrl; - } - - // Priority 2: Check LIVEKIT_URL for staging (exact match to Python) - const livekitUrl = process.env.LIVEKIT_URL || ''; - if (livekitUrl.includes('.staging.livekit.cloud')) { - return STAGING_BASE_URL; - } - - // Priority 3: Default to production - return DEFAULT_BASE_URL; -} - export const apiConnectDefaults: ApiConnectOptions = { maxRetries: 3, retryInterval: 2_000, diff --git a/agents/src/inference/interruption/interruption_detector.ts b/agents/src/inference/interruption/interruption_detector.ts index a5a457072..43f8eb4ff 100644 --- a/agents/src/inference/interruption/interruption_detector.ts +++ b/agents/src/inference/interruption/interruption_detector.ts @@ -4,14 +4,8 @@ import type { TypedEventEmitter } from '@livekit/typed-emitter'; import EventEmitter from 'events'; import { log } from '../../log.js'; -import { - DEFAULT_BASE_URL, - FRAMES_PER_SECOND, - SAMPLE_RATE, - STAGING_BASE_URL, - getDefaultInferenceUrl, - interruptionOptionDefaults, -} from './defaults.js'; +import { DEFAULT_INFERENCE_URL, STAGING_INFERENCE_URL, getDefaultInferenceUrl } from '../utils.js'; +import { FRAMES_PER_SECOND, SAMPLE_RATE, interruptionOptionDefaults } from './defaults.js'; import type { InterruptionDetectionError } from './errors.js'; import { InterruptionStreamBase } from './interruption_stream.js'; import type { InterruptionEvent, InterruptionOptions } from './types.js'; @@ -56,7 +50,8 @@ export class AdaptiveInterruptionDetector extends (EventEmitter as new () => Typ let useProxy: boolean; // Use LiveKit credentials if using the inference service (production or staging) - const isInferenceUrl = lkBaseUrl === DEFAULT_BASE_URL || lkBaseUrl === STAGING_BASE_URL; + const isInferenceUrl = + lkBaseUrl === DEFAULT_INFERENCE_URL || lkBaseUrl === STAGING_INFERENCE_URL; if (isInferenceUrl) { lkApiKey = apiKey ?? process.env.LIVEKIT_INFERENCE_API_KEY ?? process.env.LIVEKIT_API_KEY ?? ''; diff --git a/agents/src/inference/llm.ts b/agents/src/inference/llm.ts index c612b1654..8c700878a 100644 --- a/agents/src/inference/llm.ts +++ b/agents/src/inference/llm.ts @@ -12,9 +12,7 @@ import { } from '../index.js'; import * as llm from '../llm/index.js'; import type { APIConnectOptions } from '../types.js'; -import { type AnyString, createAccessToken } from './utils.js'; - -const DEFAULT_BASE_URL = 'https://agent-gateway.livekit.cloud/v1'; +import { type AnyString, createAccessToken, getDefaultInferenceUrl } from './utils.js'; export type OpenAIModels = | 'openai/gpt-5.2' @@ -127,7 +125,7 @@ export class LLM extends llm.LLM { strictToolSchema = false, } = opts; - const lkBaseURL = baseURL || process.env.LIVEKIT_INFERENCE_URL || DEFAULT_BASE_URL; + const lkBaseURL = baseURL || getDefaultInferenceUrl(); const lkApiKey = apiKey || process.env.LIVEKIT_INFERENCE_API_KEY || process.env.LIVEKIT_API_KEY; if (!lkApiKey) { throw new Error('apiKey is required: pass apiKey or set LIVEKIT_API_KEY'); @@ -163,6 +161,10 @@ export class LLM extends llm.LLM { return this.opts.model; } + get provider(): string { + return 'livekit'; + } + static fromModelString(modelString: string): LLM { return new LLM({ model: modelString }); } diff --git a/agents/src/inference/stt.ts b/agents/src/inference/stt.ts index f3bf3fb13..26b947ec4 100644 --- a/agents/src/inference/stt.ts +++ b/agents/src/inference/stt.ts @@ -22,7 +22,7 @@ import { type SttTranscriptEvent, sttServerEventSchema, } from './api_protos.js'; -import { type AnyString, connectWs, createAccessToken } from './utils.js'; +import { type AnyString, connectWs, createAccessToken, getDefaultInferenceUrl } from './utils.js'; export type DeepgramModels = | 'deepgram/flux-general' @@ -97,7 +97,6 @@ export type STTEncoding = 'pcm_s16le'; const DEFAULT_ENCODING: STTEncoding = 'pcm_s16le'; const DEFAULT_SAMPLE_RATE = 16000; -const DEFAULT_BASE_URL = 'wss://agent-gateway.livekit.cloud/v1'; const DEFAULT_CANCEL_TIMEOUT = 5000; export interface InferenceSTTOptions { @@ -143,7 +142,7 @@ export class STT extends BaseSTT { modelOptions = {} as STTOptions, } = opts || {}; - const lkBaseURL = baseURL || process.env.LIVEKIT_INFERENCE_URL || DEFAULT_BASE_URL; + const lkBaseURL = baseURL || getDefaultInferenceUrl(); const lkApiKey = apiKey || process.env.LIVEKIT_INFERENCE_API_KEY || process.env.LIVEKIT_API_KEY; if (!lkApiKey) { throw new Error('apiKey is required: pass apiKey or set LIVEKIT_API_KEY'); @@ -171,6 +170,14 @@ export class STT extends BaseSTT { return 'inference.STT'; } + get model(): string { + return this.opts.model ?? 'auto'; + } + + get provider(): string { + return 'livekit'; + } + static fromModelString(modelString: string): STT { if (modelString.includes(':')) { const [model, language] = modelString.split(':') as [AnyString, STTLanguages]; diff --git a/agents/src/inference/tts.ts b/agents/src/inference/tts.ts index d9c57a76f..ee727186a 100644 --- a/agents/src/inference/tts.ts +++ b/agents/src/inference/tts.ts @@ -20,7 +20,7 @@ import { ttsClientEventSchema, ttsServerEventSchema, } from './api_protos.js'; -import { type AnyString, connectWs, createAccessToken } from './utils.js'; +import { type AnyString, connectWs, createAccessToken, getDefaultInferenceUrl } from './utils.js'; export type CartesiaModels = | 'cartesia/sonic-3' @@ -94,7 +94,6 @@ type TTSEncoding = 'pcm_s16le'; const DEFAULT_ENCODING: TTSEncoding = 'pcm_s16le'; const DEFAULT_SAMPLE_RATE = 16000; -const DEFAULT_BASE_URL = 'https://agent-gateway.livekit.cloud/v1'; const NUM_CHANNELS = 1; const DEFAULT_LANGUAGE = 'en'; @@ -145,7 +144,7 @@ export class TTS extends BaseTTS { modelOptions = {} as TTSOptions, } = opts || {}; - const lkBaseURL = baseURL || process.env.LIVEKIT_INFERENCE_URL || DEFAULT_BASE_URL; + const lkBaseURL = baseURL || getDefaultInferenceUrl(); const lkApiKey = apiKey || process.env.LIVEKIT_INFERENCE_API_KEY || process.env.LIVEKIT_API_KEY; if (!lkApiKey) { throw new Error('apiKey is required: pass apiKey or set LIVEKIT_API_KEY'); @@ -202,6 +201,14 @@ export class TTS extends BaseTTS { return 'inference.TTS'; } + get model(): string { + return this.opts.model ?? 'unknown'; + } + + get provider(): string { + return 'livekit'; + } + static fromModelString(modelString: string): TTS { if (modelString.includes(':')) { const [model, voice] = modelString.split(':') as [TTSModels, string]; diff --git a/agents/src/inference/utils.ts b/agents/src/inference/utils.ts index b3b772ef6..518ad3d3a 100644 --- a/agents/src/inference/utils.ts +++ b/agents/src/inference/utils.ts @@ -7,6 +7,34 @@ import { APIConnectionError, APIStatusError } from '../index.js'; export type AnyString = string & NonNullable; +/** Default production inference URL */ +export const DEFAULT_INFERENCE_URL = 'https://agent-gateway.livekit.cloud/v1'; + +/** Staging inference URL */ +export const STAGING_INFERENCE_URL = 'https://agent-gateway.staging.livekit.cloud/v1'; + +/** + * Get the default inference URL based on the environment. + * + * Priority: + * 1. LIVEKIT_INFERENCE_URL if set + * 2. If LIVEKIT_URL contains '.staging.livekit.cloud', use staging gateway + * 3. Otherwise, use production gateway + */ +export function getDefaultInferenceUrl(): string { + const inferenceUrl = process.env.LIVEKIT_INFERENCE_URL; + if (inferenceUrl) { + return inferenceUrl; + } + + const livekitUrl = process.env.LIVEKIT_URL || ''; + if (livekitUrl.includes('.staging.livekit.cloud')) { + return STAGING_INFERENCE_URL; + } + + return DEFAULT_INFERENCE_URL; +} + export async function createAccessToken( apiKey: string, apiSecret: string, diff --git a/agents/src/llm/llm.ts b/agents/src/llm/llm.ts index 0ab158e6b..40055bd5c 100644 --- a/agents/src/llm/llm.ts +++ b/agents/src/llm/llm.ts @@ -65,6 +65,18 @@ export abstract class LLM extends (EventEmitter as new () => TypedEmitter { } return (usage?.completionTokens || 0) / (durationMs / 1000); })(), + metadata: { + modelProvider: this.#llm.provider, + modelName: this.#llm.model, + }, }; if (this.#llmRequestSpan) { diff --git a/agents/src/llm/realtime.ts b/agents/src/llm/realtime.ts index bebeffcf4..c47dd7a41 100644 --- a/agents/src/llm/realtime.ts +++ b/agents/src/llm/realtime.ts @@ -72,6 +72,10 @@ export abstract class RealtimeModel { /** The model name/identifier used by this realtime model */ abstract get model(): string; + get provider(): string { + return 'unknown'; + } + abstract session(): RealtimeSession; abstract close(): Promise; diff --git a/agents/src/metrics/base.ts b/agents/src/metrics/base.ts index 7f6d6a0cc..3c533b949 100644 --- a/agents/src/metrics/base.ts +++ b/agents/src/metrics/base.ts @@ -2,6 +2,13 @@ // // SPDX-License-Identifier: Apache-2.0 +export type MetricsMetadata = { + /** The provider name (e.g., 'openai', 'anthropic'). */ + modelProvider?: string; + /** The model name (e.g., 'gpt-4o', 'claude-3-5-sonnet'). */ + modelName?: string; +}; + export type AgentMetrics = | STTMetrics | LLMMetrics @@ -26,6 +33,8 @@ export type LLMMetrics = { totalTokens: number; tokensPerSecond: number; speechId?: string; + /** Metadata for model provider and name tracking. */ + metadata?: MetricsMetadata; }; export type STTMetrics = { @@ -41,10 +50,16 @@ export type STTMetrics = { * The duration of the pushed audio in milliseconds. */ audioDurationMs: number; + /** Input audio tokens (for token-based billing). */ + inputTokens?: number; + /** Output text tokens (for token-based billing). */ + outputTokens?: number; /** * Whether the STT is streaming (e.g using websocket). */ streamed: boolean; + /** Metadata for model provider and name tracking. */ + metadata?: MetricsMetadata; }; export type TTSMetrics = { @@ -59,10 +74,17 @@ export type TTSMetrics = { /** Generated audio duration in milliseconds. */ audioDurationMs: number; cancelled: boolean; + /** Number of characters synthesized (for character-based billing). */ charactersCount: number; + /** Input text tokens (for token-based billing, e.g., OpenAI TTS). */ + inputTokens?: number; + /** Output audio tokens (for token-based billing, e.g., OpenAI TTS). */ + outputTokens?: number; streamed: boolean; segmentId?: string; speechId?: string; + /** Metadata for model provider and name tracking. */ + metadata?: MetricsMetadata; }; export type VADMetrics = { @@ -133,6 +155,10 @@ export type RealtimeModelMetrics = { * The duration of the response from created to done in milliseconds. */ durationMs: number; + /** + * The duration of the session connection in milliseconds (for session-based billing like xAI). + */ + sessionDurationMs?: number; /** * Time to first audio token in milliseconds. -1 if no audio token was sent. */ @@ -165,4 +191,6 @@ export type RealtimeModelMetrics = { * Details about the output tokens used in the Response. */ outputTokenDetails: RealtimeModelMetricsOutputTokenDetails; + /** Metadata for model provider and name tracking. */ + metadata?: MetricsMetadata; }; diff --git a/agents/src/metrics/index.ts b/agents/src/metrics/index.ts index f400a9638..c83a9fbff 100644 --- a/agents/src/metrics/index.ts +++ b/agents/src/metrics/index.ts @@ -6,10 +6,19 @@ export type { AgentMetrics, EOUMetrics, LLMMetrics, + MetricsMetadata, RealtimeModelMetrics, STTMetrics, TTSMetrics, VADMetrics, } from './base.js'; +export { + filterZeroValues, + ModelUsageCollector, + type LLMModelUsage, + type ModelUsage, + type STTModelUsage, + type TTSModelUsage, +} from './model_usage.js'; export { UsageCollector, type UsageSummary } from './usage_collector.js'; export { logMetrics } from './utils.js'; diff --git a/agents/src/metrics/model_usage.test.ts b/agents/src/metrics/model_usage.test.ts new file mode 100644 index 000000000..d2f983beb --- /dev/null +++ b/agents/src/metrics/model_usage.test.ts @@ -0,0 +1,545 @@ +// SPDX-FileCopyrightText: 2024 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { beforeEach, describe, expect, it } from 'vitest'; +import type { LLMMetrics, RealtimeModelMetrics, STTMetrics, TTSMetrics } from './base.js'; +import { + type LLMModelUsage, + ModelUsageCollector, + type STTModelUsage, + type TTSModelUsage, + filterZeroValues, +} from './model_usage.js'; + +describe('model_usage', () => { + describe('filterZeroValues', () => { + it('should filter out zero values from LLMModelUsage', () => { + const usage: LLMModelUsage = { + type: 'llm_usage', + provider: 'openai', + model: 'gpt-4o', + inputTokens: 100, + inputCachedTokens: 0, + inputAudioTokens: 0, + inputCachedAudioTokens: 0, + inputTextTokens: 0, + inputCachedTextTokens: 0, + inputImageTokens: 0, + inputCachedImageTokens: 0, + outputTokens: 50, + outputAudioTokens: 0, + outputTextTokens: 0, + sessionDurationMs: 0, + }; + + const filtered = filterZeroValues(usage); + + expect(filtered.type).toBe('llm_usage'); + expect(filtered.provider).toBe('openai'); + expect(filtered.model).toBe('gpt-4o'); + expect(filtered.inputTokens).toBe(100); + expect(filtered.outputTokens).toBe(50); + // Zero values should be filtered out + expect(filtered.inputCachedTokens).toBeUndefined(); + expect(filtered.inputAudioTokens).toBeUndefined(); + expect(filtered.sessionDurationMs).toBeUndefined(); + }); + + it('should filter out zero values from TTSModelUsage', () => { + const usage: TTSModelUsage = { + type: 'tts_usage', + provider: 'elevenlabs', + model: 'eleven_turbo_v2', + inputTokens: 0, + outputTokens: 0, + charactersCount: 500, + audioDurationMs: 3000, + }; + + const filtered = filterZeroValues(usage); + + expect(filtered.type).toBe('tts_usage'); + expect(filtered.provider).toBe('elevenlabs'); + expect(filtered.charactersCount).toBe(500); + expect(filtered.audioDurationMs).toBe(3000); + expect(filtered.inputTokens).toBeUndefined(); + expect(filtered.outputTokens).toBeUndefined(); + }); + + it('should keep all values when none are zero', () => { + const usage: STTModelUsage = { + type: 'stt_usage', + provider: 'deepgram', + model: 'nova-2', + inputTokens: 10, + outputTokens: 20, + audioDurationMs: 5000, + }; + + const filtered = filterZeroValues(usage); + + expect(Object.keys(filtered)).toHaveLength(6); + expect(filtered).toEqual(usage); + }); + }); + + describe('ModelUsageCollector', () => { + let collector: ModelUsageCollector; + + beforeEach(() => { + collector = new ModelUsageCollector(); + }); + + describe('collect LLM metrics', () => { + it('should aggregate LLM metrics by provider and model', () => { + const metrics1: LLMMetrics = { + type: 'llm_metrics', + label: 'test', + requestId: 'req1', + timestamp: Date.now(), + durationMs: 100, + ttftMs: 50, + cancelled: false, + completionTokens: 100, + promptTokens: 200, + promptCachedTokens: 50, + totalTokens: 300, + tokensPerSecond: 10, + metadata: { + modelProvider: 'openai', + modelName: 'gpt-4o', + }, + }; + + const metrics2: LLMMetrics = { + type: 'llm_metrics', + label: 'test', + requestId: 'req2', + timestamp: Date.now(), + durationMs: 150, + ttftMs: 60, + cancelled: false, + completionTokens: 150, + promptTokens: 300, + promptCachedTokens: 75, + totalTokens: 450, + tokensPerSecond: 12, + metadata: { + modelProvider: 'openai', + modelName: 'gpt-4o', + }, + }; + + collector.collect(metrics1); + collector.collect(metrics2); + + const usage = collector.flatten(); + expect(usage).toHaveLength(1); + + const llmUsage = usage[0] as LLMModelUsage; + expect(llmUsage.type).toBe('llm_usage'); + expect(llmUsage.provider).toBe('openai'); + expect(llmUsage.model).toBe('gpt-4o'); + expect(llmUsage.inputTokens).toBe(500); // 200 + 300 + expect(llmUsage.inputCachedTokens).toBe(125); // 50 + 75 + expect(llmUsage.outputTokens).toBe(250); // 100 + 150 + }); + + it('should separate metrics by different providers', () => { + const openaiMetrics: LLMMetrics = { + type: 'llm_metrics', + label: 'test', + requestId: 'req1', + timestamp: Date.now(), + durationMs: 100, + ttftMs: 50, + cancelled: false, + completionTokens: 100, + promptTokens: 200, + promptCachedTokens: 0, + totalTokens: 300, + tokensPerSecond: 10, + metadata: { + modelProvider: 'openai', + modelName: 'gpt-4o', + }, + }; + + const anthropicMetrics: LLMMetrics = { + type: 'llm_metrics', + label: 'test', + requestId: 'req2', + timestamp: Date.now(), + durationMs: 120, + ttftMs: 55, + cancelled: false, + completionTokens: 80, + promptTokens: 150, + promptCachedTokens: 0, + totalTokens: 230, + tokensPerSecond: 8, + metadata: { + modelProvider: 'anthropic', + modelName: 'claude-3-5-sonnet', + }, + }; + + collector.collect(openaiMetrics); + collector.collect(anthropicMetrics); + + const usage = collector.flatten(); + expect(usage).toHaveLength(2); + + const openaiUsage = usage.find( + (u) => u.type === 'llm_usage' && u.provider === 'openai', + ) as LLMModelUsage; + const anthropicUsage = usage.find( + (u) => u.type === 'llm_usage' && u.provider === 'anthropic', + ) as LLMModelUsage; + + expect(openaiUsage.inputTokens).toBe(200); + expect(openaiUsage.outputTokens).toBe(100); + expect(anthropicUsage.inputTokens).toBe(150); + expect(anthropicUsage.outputTokens).toBe(80); + }); + }); + + describe('collect TTS metrics', () => { + it('should aggregate TTS metrics by provider and model', () => { + const metrics1: TTSMetrics = { + type: 'tts_metrics', + label: 'test', + requestId: 'req1', + timestamp: Date.now(), + ttfbMs: 100, + durationMs: 500, + audioDurationMs: 3000, + cancelled: false, + charactersCount: 100, + inputTokens: 10, + outputTokens: 20, + streamed: true, + metadata: { + modelProvider: 'elevenlabs', + modelName: 'eleven_turbo_v2', + }, + }; + + const metrics2: TTSMetrics = { + type: 'tts_metrics', + label: 'test', + requestId: 'req2', + timestamp: Date.now(), + ttfbMs: 120, + durationMs: 600, + audioDurationMs: 4000, + cancelled: false, + charactersCount: 200, + inputTokens: 15, + outputTokens: 25, + streamed: true, + metadata: { + modelProvider: 'elevenlabs', + modelName: 'eleven_turbo_v2', + }, + }; + + collector.collect(metrics1); + collector.collect(metrics2); + + const usage = collector.flatten(); + expect(usage).toHaveLength(1); + + const ttsUsage = usage[0] as TTSModelUsage; + expect(ttsUsage.type).toBe('tts_usage'); + expect(ttsUsage.provider).toBe('elevenlabs'); + expect(ttsUsage.model).toBe('eleven_turbo_v2'); + expect(ttsUsage.charactersCount).toBe(300); // 100 + 200 + expect(ttsUsage.audioDurationMs).toBe(7000); // 3000 + 4000 + expect(ttsUsage.inputTokens).toBe(25); // 10 + 15 + expect(ttsUsage.outputTokens).toBe(45); // 20 + 25 + }); + }); + + describe('collect STT metrics', () => { + it('should aggregate STT metrics by provider and model', () => { + const metrics1: STTMetrics = { + type: 'stt_metrics', + label: 'test', + requestId: 'req1', + timestamp: Date.now(), + durationMs: 0, + audioDurationMs: 5000, + inputTokens: 50, + outputTokens: 100, + streamed: true, + metadata: { + modelProvider: 'deepgram', + modelName: 'nova-2', + }, + }; + + const metrics2: STTMetrics = { + type: 'stt_metrics', + label: 'test', + requestId: 'req2', + timestamp: Date.now(), + durationMs: 0, + audioDurationMs: 3000, + inputTokens: 30, + outputTokens: 60, + streamed: true, + metadata: { + modelProvider: 'deepgram', + modelName: 'nova-2', + }, + }; + + collector.collect(metrics1); + collector.collect(metrics2); + + const usage = collector.flatten(); + expect(usage).toHaveLength(1); + + const sttUsage = usage[0] as STTModelUsage; + expect(sttUsage.type).toBe('stt_usage'); + expect(sttUsage.provider).toBe('deepgram'); + expect(sttUsage.model).toBe('nova-2'); + expect(sttUsage.audioDurationMs).toBe(8000); // 5000 + 3000 + expect(sttUsage.inputTokens).toBe(80); // 50 + 30 + expect(sttUsage.outputTokens).toBe(160); // 100 + 60 + }); + }); + + describe('collect realtime model metrics', () => { + it('should aggregate realtime model metrics with detailed token breakdown', () => { + const metrics: RealtimeModelMetrics = { + type: 'realtime_model_metrics', + label: 'test', + requestId: 'req1', + timestamp: Date.now(), + durationMs: 1000, + ttftMs: 100, + cancelled: false, + inputTokens: 500, + outputTokens: 300, + totalTokens: 800, + tokensPerSecond: 10, + sessionDurationMs: 5000, + inputTokenDetails: { + audioTokens: 200, + textTokens: 250, + imageTokens: 50, + cachedTokens: 100, + cachedTokensDetails: { + audioTokens: 30, + textTokens: 50, + imageTokens: 20, + }, + }, + outputTokenDetails: { + textTokens: 200, + audioTokens: 100, + imageTokens: 0, + }, + metadata: { + modelProvider: 'openai', + modelName: 'gpt-4o-realtime', + }, + }; + + collector.collect(metrics); + + const usage = collector.flatten(); + expect(usage).toHaveLength(1); + + const llmUsage = usage[0] as LLMModelUsage; + expect(llmUsage.type).toBe('llm_usage'); + expect(llmUsage.provider).toBe('openai'); + expect(llmUsage.model).toBe('gpt-4o-realtime'); + expect(llmUsage.inputTokens).toBe(500); + expect(llmUsage.inputCachedTokens).toBe(100); + expect(llmUsage.inputAudioTokens).toBe(200); + expect(llmUsage.inputCachedAudioTokens).toBe(30); + expect(llmUsage.inputTextTokens).toBe(250); + expect(llmUsage.inputCachedTextTokens).toBe(50); + expect(llmUsage.inputImageTokens).toBe(50); + expect(llmUsage.inputCachedImageTokens).toBe(20); + expect(llmUsage.outputTokens).toBe(300); + expect(llmUsage.outputTextTokens).toBe(200); + expect(llmUsage.outputAudioTokens).toBe(100); + expect(llmUsage.sessionDurationMs).toBe(5000); + }); + }); + + describe('mixed metrics collection', () => { + it('should collect and separate LLM, TTS, and STT metrics', () => { + const llmMetrics: LLMMetrics = { + type: 'llm_metrics', + label: 'test', + requestId: 'req1', + timestamp: Date.now(), + durationMs: 100, + ttftMs: 50, + cancelled: false, + completionTokens: 100, + promptTokens: 200, + promptCachedTokens: 0, + totalTokens: 300, + tokensPerSecond: 10, + metadata: { + modelProvider: 'openai', + modelName: 'gpt-4o', + }, + }; + + const ttsMetrics: TTSMetrics = { + type: 'tts_metrics', + label: 'test', + requestId: 'req2', + timestamp: Date.now(), + ttfbMs: 100, + durationMs: 500, + audioDurationMs: 3000, + cancelled: false, + charactersCount: 100, + streamed: true, + metadata: { + modelProvider: 'elevenlabs', + modelName: 'eleven_turbo_v2', + }, + }; + + const sttMetrics: STTMetrics = { + type: 'stt_metrics', + label: 'test', + requestId: 'req3', + timestamp: Date.now(), + durationMs: 0, + audioDurationMs: 5000, + streamed: true, + metadata: { + modelProvider: 'deepgram', + modelName: 'nova-2', + }, + }; + + collector.collect(llmMetrics); + collector.collect(ttsMetrics); + collector.collect(sttMetrics); + + const usage = collector.flatten(); + expect(usage).toHaveLength(3); + + const llmUsage = usage.find((u) => u.type === 'llm_usage'); + const ttsUsage = usage.find((u) => u.type === 'tts_usage'); + const sttUsage = usage.find((u) => u.type === 'stt_usage'); + + expect(llmUsage).toBeDefined(); + expect(ttsUsage).toBeDefined(); + expect(sttUsage).toBeDefined(); + }); + }); + + describe('flatten returns copies', () => { + it('should return deep copies of usage objects', () => { + const metrics: LLMMetrics = { + type: 'llm_metrics', + label: 'test', + requestId: 'req1', + timestamp: Date.now(), + durationMs: 100, + ttftMs: 50, + cancelled: false, + completionTokens: 100, + promptTokens: 200, + promptCachedTokens: 0, + totalTokens: 300, + tokensPerSecond: 10, + metadata: { + modelProvider: 'openai', + modelName: 'gpt-4o', + }, + }; + + collector.collect(metrics); + + const usage1 = collector.flatten(); + const usage2 = collector.flatten(); + + // Should be equal values + expect(usage1[0]).toEqual(usage2[0]); + + // But not the same object reference + expect(usage1[0]).not.toBe(usage2[0]); + + // Modifying one shouldn't affect the other + (usage1[0] as LLMModelUsage).inputTokens = 9999; + expect((usage2[0] as LLMModelUsage).inputTokens).toBe(200); + }); + }); + + describe('handles missing metadata', () => { + it('should use empty strings when metadata is missing', () => { + const metrics: LLMMetrics = { + type: 'llm_metrics', + label: 'test', + requestId: 'req1', + timestamp: Date.now(), + durationMs: 100, + ttftMs: 50, + cancelled: false, + completionTokens: 100, + promptTokens: 200, + promptCachedTokens: 0, + totalTokens: 300, + tokensPerSecond: 10, + // No metadata + }; + + collector.collect(metrics); + + const usage = collector.flatten(); + expect(usage).toHaveLength(1); + + const llmUsage = usage[0] as LLMModelUsage; + expect(llmUsage.provider).toBe(''); + expect(llmUsage.model).toBe(''); + }); + }); + + describe('ignores VAD and EOU metrics', () => { + it('should not collect VAD metrics', () => { + const vadMetrics = { + type: 'vad_metrics' as const, + label: 'test', + timestamp: Date.now(), + idleTimeMs: 100, + inferenceDurationTotalMs: 50, + inferenceCount: 10, + }; + + collector.collect(vadMetrics); + + const usage = collector.flatten(); + expect(usage).toHaveLength(0); + }); + + it('should not collect EOU metrics', () => { + const eouMetrics = { + type: 'eou_metrics' as const, + timestamp: Date.now(), + endOfUtteranceDelayMs: 100, + transcriptionDelayMs: 50, + onUserTurnCompletedDelayMs: 30, + lastSpeakingTimeMs: Date.now(), + }; + + collector.collect(eouMetrics); + + const usage = collector.flatten(); + expect(usage).toHaveLength(0); + }); + }); + }); +}); diff --git a/agents/src/metrics/model_usage.ts b/agents/src/metrics/model_usage.ts new file mode 100644 index 000000000..d90ed7123 --- /dev/null +++ b/agents/src/metrics/model_usage.ts @@ -0,0 +1,227 @@ +// SPDX-FileCopyrightText: 2024 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import type { + AgentMetrics, + LLMMetrics, + RealtimeModelMetrics, + STTMetrics, + TTSMetrics, +} from './base.js'; + +export type LLMModelUsage = { + type: 'llm_usage'; + /** The provider name (e.g., 'openai', 'anthropic'). */ + provider: string; + /** The model name (e.g., 'gpt-4o', 'claude-3-5-sonnet'). */ + model: string; + /** Total input tokens. */ + inputTokens: number; + /** Input tokens served from cache. */ + inputCachedTokens: number; + /** Input audio tokens (for multimodal models). */ + inputAudioTokens: number; + /** Cached input audio tokens. */ + inputCachedAudioTokens: number; + /** Input text tokens. */ + inputTextTokens: number; + /** Cached input text tokens. */ + inputCachedTextTokens: number; + /** Input image tokens (for multimodal models). */ + inputImageTokens: number; + /** Cached input image tokens. */ + inputCachedImageTokens: number; + /** Total output tokens. */ + outputTokens: number; + /** Output audio tokens (for multimodal models). */ + outputAudioTokens: number; + /** Output text tokens. */ + outputTextTokens: number; + /** Total session connection duration in milliseconds (for session-based billing like xAI). */ + sessionDurationMs: number; +}; + +export type TTSModelUsage = { + type: 'tts_usage'; + /** The provider name (e.g., 'elevenlabs', 'cartesia'). */ + provider: string; + /** The model name (e.g., 'eleven_turbo_v2', 'sonic'). */ + model: string; + /** Input text tokens (for token-based TTS billing, e.g., OpenAI TTS). */ + inputTokens: number; + /** Output audio tokens (for token-based TTS billing, e.g., OpenAI TTS). */ + outputTokens: number; + /** Number of characters synthesized (for character-based TTS billing). */ + charactersCount: number; + /** + * Duration of generated audio in milliseconds. + */ + audioDurationMs: number; +}; + +export type STTModelUsage = { + type: 'stt_usage'; + /** The provider name (e.g., 'deepgram', 'assemblyai'). */ + provider: string; + /** The model name (e.g., 'nova-2', 'best'). */ + model: string; + /** Input audio tokens (for token-based STT billing). */ + inputTokens: number; + /** Output text tokens (for token-based STT billing). */ + outputTokens: number; + /** Duration of processed audio in milliseconds. */ + audioDurationMs: number; +}; + +export type ModelUsage = LLMModelUsage | TTSModelUsage | STTModelUsage; + +export function filterZeroValues(usage: T): Partial { + const result: Partial = {} as Partial; + for (const [key, value] of Object.entries(usage)) { + if (value !== 0 && value !== 0.0) { + (result as Record)[key] = value; + } + } + return result; +} + +export class ModelUsageCollector { + private llmUsage: Map = new Map(); + private ttsUsage: Map = new Map(); + private sttUsage: Map = new Map(); + + /** Extract provider and model from metrics metadata. */ + private extractProviderModel( + metrics: LLMMetrics | STTMetrics | TTSMetrics | RealtimeModelMetrics, + ): [string, string] { + let provider = ''; + let model = ''; + if (metrics.metadata) { + provider = metrics.metadata.modelProvider || ''; + model = metrics.metadata.modelName || ''; + } + return [provider, model]; + } + + /** Get or create an LLMModelUsage for the given provider/model combination. */ + private getLLMUsage(provider: string, model: string): LLMModelUsage { + const key = `${provider}:${model}`; + let usage = this.llmUsage.get(key); + if (!usage) { + usage = { + type: 'llm_usage', + provider, + model, + inputTokens: 0, + inputCachedTokens: 0, + inputAudioTokens: 0, + inputCachedAudioTokens: 0, + inputTextTokens: 0, + inputCachedTextTokens: 0, + inputImageTokens: 0, + inputCachedImageTokens: 0, + outputTokens: 0, + outputAudioTokens: 0, + outputTextTokens: 0, + sessionDurationMs: 0, + }; + this.llmUsage.set(key, usage); + } + return usage; + } + + /** Get or create a TTSModelUsage for the given provider/model combination. */ + private getTTSUsage(provider: string, model: string): TTSModelUsage { + const key = `${provider}:${model}`; + let usage = this.ttsUsage.get(key); + if (!usage) { + usage = { + type: 'tts_usage', + provider, + model, + inputTokens: 0, + outputTokens: 0, + charactersCount: 0, + audioDurationMs: 0, + }; + this.ttsUsage.set(key, usage); + } + return usage; + } + + /** Get or create an STTModelUsage for the given provider/model combination. */ + private getSTTUsage(provider: string, model: string): STTModelUsage { + const key = `${provider}:${model}`; + let usage = this.sttUsage.get(key); + if (!usage) { + usage = { + type: 'stt_usage', + provider, + model, + inputTokens: 0, + outputTokens: 0, + audioDurationMs: 0, + }; + this.sttUsage.set(key, usage); + } + return usage; + } + + /** Collect metrics and aggregate usage by model/provider. */ + collect(metrics: AgentMetrics): void { + if (metrics.type === 'llm_metrics') { + const [provider, model] = this.extractProviderModel(metrics); + const usage = this.getLLMUsage(provider, model); + usage.inputTokens += metrics.promptTokens; + usage.inputCachedTokens += metrics.promptCachedTokens; + usage.outputTokens += metrics.completionTokens; + } else if (metrics.type === 'realtime_model_metrics') { + const [provider, model] = this.extractProviderModel(metrics); + const usage = this.getLLMUsage(provider, model); + usage.inputTokens += metrics.inputTokens; + usage.inputCachedTokens += metrics.inputTokenDetails.cachedTokens; + + usage.inputTextTokens += metrics.inputTokenDetails.textTokens; + usage.inputCachedTextTokens += metrics.inputTokenDetails.cachedTokensDetails?.textTokens ?? 0; + usage.inputImageTokens += metrics.inputTokenDetails.imageTokens; + usage.inputCachedImageTokens += + metrics.inputTokenDetails.cachedTokensDetails?.imageTokens ?? 0; + usage.inputAudioTokens += metrics.inputTokenDetails.audioTokens; + usage.inputCachedAudioTokens += + metrics.inputTokenDetails.cachedTokensDetails?.audioTokens ?? 0; + + usage.outputTextTokens += metrics.outputTokenDetails.textTokens; + usage.outputAudioTokens += metrics.outputTokenDetails.audioTokens; + usage.outputTokens += metrics.outputTokens; + usage.sessionDurationMs += metrics.sessionDurationMs ?? 0; + } else if (metrics.type === 'tts_metrics') { + const [provider, model] = this.extractProviderModel(metrics); + const ttsUsage = this.getTTSUsage(provider, model); + ttsUsage.inputTokens += metrics.inputTokens ?? 0; + ttsUsage.outputTokens += metrics.outputTokens ?? 0; + ttsUsage.charactersCount += metrics.charactersCount; + ttsUsage.audioDurationMs += metrics.audioDurationMs; + } else if (metrics.type === 'stt_metrics') { + const [provider, model] = this.extractProviderModel(metrics); + const sttUsage = this.getSTTUsage(provider, model); + sttUsage.inputTokens += metrics.inputTokens ?? 0; + sttUsage.outputTokens += metrics.outputTokens ?? 0; + sttUsage.audioDurationMs += metrics.audioDurationMs; + } + // VAD and EOU metrics are not aggregated for usage tracking. + } + + flatten(): ModelUsage[] { + const result: ModelUsage[] = []; + for (const u of this.llmUsage.values()) { + result.push({ ...u }); + } + for (const u of this.ttsUsage.values()) { + result.push({ ...u }); + } + for (const u of this.sttUsage.values()) { + result.push({ ...u }); + } + return result; + } +} diff --git a/agents/src/metrics/usage_collector.ts b/agents/src/metrics/usage_collector.ts index c7f0e6c3d..c815c8394 100644 --- a/agents/src/metrics/usage_collector.ts +++ b/agents/src/metrics/usage_collector.ts @@ -1,8 +1,13 @@ // SPDX-FileCopyrightText: 2024 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 +import { log } from '../log.js'; import type { AgentMetrics } from './base.js'; +/** + * @deprecated Use LLMModelUsage, TTSModelUsage, or STTModelUsage instead. + * These new types provide per-model/provider usage aggregation for more detailed tracking. + */ export interface UsageSummary { llmPromptTokens: number; llmPromptCachedTokens: number; @@ -11,10 +16,16 @@ export interface UsageSummary { sttAudioDurationMs: number; } +/** + * @deprecated Use ModelUsageCollector instead. + * ModelUsageCollector provides per-model/provider usage aggregation for more detailed tracking. + */ export class UsageCollector { private summary: UsageSummary; + private logger = log(); constructor() { + this.logger.warn('UsageCollector is deprecated. Use ModelUsageCollector instead.'); this.summary = { llmPromptTokens: 0, llmPromptCachedTokens: 0, diff --git a/agents/src/stt/stt.ts b/agents/src/stt/stt.ts index 48c689ba2..523689d5e 100644 --- a/agents/src/stt/stt.ts +++ b/agents/src/stt/stt.ts @@ -59,6 +59,10 @@ export interface SpeechData { export interface RecognitionUsage { audioDuration: number; + /** Input audio tokens (for token-based STT billing). */ + inputTokens?: number; + /** Output text tokens (for token-based STT billing). */ + outputTokens?: number; } /** SpeechEvent is a packet of speech-to-text data. */ @@ -121,6 +125,30 @@ export abstract class STT extends (EventEmitter as new () => TypedEmitter { const startTime = process.hrtime.bigint(); @@ -134,6 +162,10 @@ export abstract class STT extends (EventEmitter as new () => TypedEmitter durationMs: 0, label: this.#stt.label, audioDurationMs: Math.round(event.recognitionUsage!.audioDuration * 1000), + inputTokens: event.recognitionUsage!.inputTokens ?? 0, + outputTokens: event.recognitionUsage!.outputTokens ?? 0, streamed: true, + metadata: { + modelProvider: this.#stt.provider, + modelName: this.#stt.model, + }, }; this.#stt.emit('metrics_collected', metrics); } diff --git a/agents/src/telemetry/trace_types.ts b/agents/src/telemetry/trace_types.ts index 7220ec03a..3a0afbd0a 100644 --- a/agents/src/telemetry/trace_types.ts +++ b/agents/src/telemetry/trace_types.ts @@ -30,6 +30,10 @@ export const ATTR_FUNCTION_TOOLS = 'lk.function_tools'; export const ATTR_RESPONSE_TEXT = 'lk.response.text'; export const ATTR_RESPONSE_FUNCTION_CALLS = 'lk.response.function_calls'; +// New latency attributes for response timing +/** Time to first token in seconds. */ +export const ATTR_RESPONSE_TTFT = 'lk.response.ttft'; + // function tool export const ATTR_FUNCTION_TOOL_NAME = 'lk.function_tool.name'; export const ATTR_FUNCTION_TOOL_ARGS = 'lk.function_tool.arguments'; @@ -41,6 +45,9 @@ export const ATTR_TTS_INPUT_TEXT = 'lk.input_text'; export const ATTR_TTS_STREAMING = 'lk.tts.streaming'; export const ATTR_TTS_LABEL = 'lk.tts.label'; +/** Time to first byte in seconds. */ +export const ATTR_RESPONSE_TTFB = 'lk.response.ttfb'; + // eou detection export const ATTR_EOU_PROBABILITY = 'lk.eou.probability'; export const ATTR_EOU_UNLIKELY_THRESHOLD = 'lk.eou.unlikely_threshold'; @@ -63,10 +70,15 @@ export const ATTR_LLM_METRICS = 'lk.llm_metrics'; export const ATTR_TTS_METRICS = 'lk.tts_metrics'; export const ATTR_REALTIME_MODEL_METRICS = 'lk.realtime_model_metrics'; +/** End-to-end latency in seconds. */ +export const ATTR_E2E_LATENCY = 'lk.e2e_latency'; + // OpenTelemetry GenAI attributes // OpenTelemetry specification: https://opentelemetry.io/docs/specs/semconv/registry/attributes/gen-ai/ export const ATTR_GEN_AI_OPERATION_NAME = 'gen_ai.operation.name'; export const ATTR_GEN_AI_REQUEST_MODEL = 'gen_ai.request.model'; +/** The provider name (e.g., 'openai', 'anthropic'). */ +export const ATTR_GEN_AI_PROVIDER_NAME = 'gen_ai.provider.name'; export const ATTR_GEN_AI_USAGE_INPUT_TOKENS = 'gen_ai.usage.input_tokens'; export const ATTR_GEN_AI_USAGE_OUTPUT_TOKENS = 'gen_ai.usage.output_tokens'; diff --git a/agents/src/telemetry/traces.ts b/agents/src/telemetry/traces.ts index 28ef4c746..6f39ba427 100644 --- a/agents/src/telemetry/traces.ts +++ b/agents/src/telemetry/traces.ts @@ -24,6 +24,7 @@ import { AccessToken } from 'livekit-server-sdk'; import fs from 'node:fs/promises'; import type { ChatContent, ChatItem } from '../llm/index.js'; import { enableOtelLogging } from '../log.js'; +import { filterZeroValues } from '../metrics/model_usage.js'; import type { SessionReport } from '../voice/report.js'; import { type SimpleLogRecord, SimpleOTLPHttpLogExporter } from './otel_http_exporter.js'; import { flushPinoLogs, initPinoCloudExporter } from './pino_otel_transport.js'; @@ -445,6 +446,8 @@ export async function uploadSessionReport(options: { 'logger.name': 'chat_history', }; + const usage = report.modelUsage?.map(filterZeroValues) || null; + logRecords.push({ body: 'session report', timestampMs: report.startedAt || report.timestamp || 0, @@ -453,6 +456,7 @@ export async function uploadSessionReport(options: { 'session.options': report.options || {}, 'session.report_timestamp': report.timestamp, agent_name: agentName, + usage, }, }); diff --git a/agents/src/tts/tts.ts b/agents/src/tts/tts.ts index 19f504c52..586a94238 100644 --- a/agents/src/tts/tts.ts +++ b/agents/src/tts/tts.ts @@ -96,6 +96,30 @@ export abstract class TTS extends (EventEmitter as new () => TypedEmitter; #ttsRequestSpan?: Span; + #inputTokens = 0; + #outputTokens = 0; constructor(tts: TTS, connOptions: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS) { this.#tts = tts; @@ -284,6 +310,18 @@ export abstract class SynthesizeStream } } + /** + * Set token usage for token-based TTS billing (e.g., OpenAI TTS). + * Plugins should call this method to report token usage. + */ + protected setTokenUsage({ + inputTokens = 0, + outputTokens = 0, + }: { inputTokens?: number; outputTokens?: number } = {}): void { + this.#inputTokens = inputTokens; + this.#outputTokens = outputTokens; + } + protected async monitorMetrics() { const startTime = process.hrtime.bigint(); let audioDurationMs = 0; @@ -305,12 +343,22 @@ export abstract class SynthesizeStream audioDurationMs: roundedAudioDurationMs, cancelled: this.abortController.signal.aborted, label: this.#tts.label, - streamed: false, + inputTokens: this.#inputTokens, + outputTokens: this.#outputTokens, + streamed: true, + metadata: { + modelProvider: this.#tts.provider, + modelName: this.#tts.model, + }, }; if (this.#ttsRequestSpan) { this.#ttsRequestSpan.setAttribute(traceTypes.ATTR_TTS_METRICS, JSON.stringify(metrics)); } this.#tts.emit('metrics_collected', metrics); + + // Reset token usage after emitting metrics for the next segment + this.#inputTokens = 0; + this.#outputTokens = 0; } }; @@ -434,6 +482,8 @@ export abstract class ChunkedStream implements AsyncIterableIterator>; +} + export interface SessionOptions { maxToolSteps: number; /** @@ -196,6 +202,8 @@ export class AgentSession< private _interruptionDetection?: InterruptionConfig['mode']; + private _usageCollector: ModelUsageCollector = new ModelUsageCollector(); + /** @internal */ _recorderIO?: RecorderIO; @@ -276,6 +284,9 @@ export class AgentSession< ): boolean { const eventData = args[0] as AgentEvent; this._recordedEvents.push(eventData); + if (event === AgentSessionEventTypes.MetricsCollected) { + this._usageCollector.collect((eventData as MetricsCollectedEvent).metrics); + } return super.emit(event, ...args); } @@ -308,6 +319,14 @@ export class AgentSession< return this._interruptionDetection; } + /** + * Returns usage summaries for this session, one per model/provider combination. + */ + get usage(): AgentSessionUsage { + // Skip zero fields for more concise usage display (matches python behavior). + return { modelUsage: this._usageCollector.flatten().map(filterZeroValues) }; + } + get useTtsAlignedTranscript(): boolean { return this.options.useTtsAlignedTranscript; } @@ -435,6 +454,8 @@ export class AgentSession< return; } + this._usageCollector = new ModelUsageCollector(); + let ctx: JobContext | undefined = undefined; try { ctx = getJobContext(); diff --git a/agents/src/voice/audio_recognition.ts b/agents/src/voice/audio_recognition.ts index 41df99d33..494c4fa21 100644 --- a/agents/src/voice/audio_recognition.ts +++ b/agents/src/voice/audio_recognition.ts @@ -68,6 +68,8 @@ export interface AudioRecognitionOptions { minEndpointingDelay: number; maxEndpointingDelay: number; rootSpanContext?: Context; + sttModel?: string; + sttProvider?: string; } // TODO add ability to update stt/vad/interruption-detection @@ -81,6 +83,8 @@ export class AudioRecognition { private maxEndpointingDelay: number; private lastLanguage?: string; private rootSpanContext?: Context; + private sttModel?: string; + private sttProvider?: string; private deferredInputStream: DeferredReadableStream; private logger = log(); @@ -128,6 +132,8 @@ export class AudioRecognition { this.maxEndpointingDelay = opts.maxEndpointingDelay; this.lastLanguage = undefined; this.rootSpanContext = opts.rootSpanContext; + this.sttModel = opts.sttModel; + this.sttProvider = opts.sttProvider; this.deferredInputStream = new DeferredReadableStream(); this.interruptionDetection = opts.interruptionDetection; @@ -287,10 +293,7 @@ export class AudioRecognition { return; } - if ( - firstAlternative.endTime > 0 && - firstAlternative.endTime + this.inputStartedAt < this.ignoreUserTranscriptUntil - ) { + if (this.#alternativeEndsBeforeIgnoreWindow(firstAlternative)) { emitFromIndex = null; } else { emitFromIndex = Math.min(emitFromIndex ?? i, i); @@ -316,6 +319,22 @@ export class AudioRecognition { } } + #alternativeEndsBeforeIgnoreWindow( + alternative: NonNullable[number], + ): boolean { + if ( + this.ignoreUserTranscriptUntil === undefined || + !this.inputStartedAt || + alternative.endTime <= 0 + ) { + return false; + } + + // `SpeechData.endTime` is in seconds relative to audio start, while `inputStartedAt` and + // `ignoreUserTranscriptUntil` are epoch milliseconds. + return alternative.endTime * 1000 + this.inputStartedAt < this.ignoreUserTranscriptUntil; + } + private shouldHoldSttEvent(ev: SpeechEvent): boolean { if (!this.isInterruptionEnabled) { return false; @@ -335,10 +354,8 @@ export class AudioRecognition { const alternative = ev.alternatives[0]; if ( - this.inputStartedAt && alternative.startTime !== alternative.endTime && - alternative.endTime > 0 && - alternative.endTime + this.inputStartedAt < this.ignoreUserTranscriptUntil + this.#alternativeEndsBeforeIgnoreWindow(alternative) ) { return true; } @@ -825,6 +842,16 @@ export class AudioRecognition { context: this.rootSpanContext, startTime, }); + + if (this.sttModel) { + this.userTurnSpan.setAttribute(traceTypes.ATTR_GEN_AI_REQUEST_MODEL, this.sttModel); + } + if (this.sttProvider) { + this.userTurnSpan.setAttribute( + traceTypes.ATTR_GEN_AI_PROVIDER_NAME, + this.sttProvider, + ); + } } // Capture sample rate from the first VAD event if not already set diff --git a/agents/src/voice/generation.ts b/agents/src/voice/generation.ts index fd274a66e..450931e7d 100644 --- a/agents/src/voice/generation.ts +++ b/agents/src/voice/generation.ts @@ -45,6 +45,7 @@ export class _LLMGenerationData { generatedText: string = ''; generatedToolCalls: FunctionCall[]; id: string; + ttft?: number; constructor( public readonly textStream: ReadableStream, @@ -410,6 +411,8 @@ export function performLLMInference( toolCtx: ToolContext, modelSettings: ModelSettings, controller: AbortController, + model?: string, + provider?: string, ): [Task, _LLMGenerationData] { const textStream = new IdentityTransform(); const toolCallStream = new IdentityTransform(); @@ -425,8 +428,17 @@ export function performLLMInference( ); span.setAttribute(traceTypes.ATTR_FUNCTION_TOOLS, JSON.stringify(Object.keys(toolCtx))); + if (model) { + span.setAttribute(traceTypes.ATTR_GEN_AI_REQUEST_MODEL, model); + } + if (provider) { + span.setAttribute(traceTypes.ATTR_GEN_AI_PROVIDER_NAME, provider); + } + let llmStreamReader: ReadableStreamDefaultReader | null = null; let llmStream: ReadableStream | null = null; + const startTime = performance.now() / 1000; // Convert to seconds + let firstTokenReceived = false; try { llmStream = await node(chatCtx, toolCtx, modelSettings); @@ -449,6 +461,11 @@ export function performLLMInference( const { done, value: chunk } = result; if (done) break; + if (!firstTokenReceived) { + firstTokenReceived = true; + data.ttft = performance.now() / 1000 - startTime; + } + if (typeof chunk === 'string') { data.generatedText += chunk; await textWriter.write(chunk); @@ -487,6 +504,9 @@ export function performLLMInference( } span.setAttribute(traceTypes.ATTR_RESPONSE_TEXT, data.generatedText); + if (data.ttft !== undefined) { + span.setAttribute(traceTypes.ATTR_RESPONSE_TTFT, data.ttft); + } } catch (error) { if (error instanceof DOMException && error.name === 'AbortError') { // Abort signal was triggered, handle gracefully @@ -521,6 +541,8 @@ export function performTTSInference( text: ReadableStream, modelSettings: ModelSettings, controller: AbortController, + model?: string, + provider?: string, ): [Task, _TTSGenerationData] { const audioStream = new IdentityTransform(); const outputWriter = audioStream.writable.getWriter(); @@ -552,10 +574,21 @@ export function performTTSInference( } })(); - const _performTTSInferenceImpl = async (signal: AbortSignal) => { + let ttfb: number | undefined; + + const _performTTSInferenceImpl = async (signal: AbortSignal, span: Span) => { + if (model) { + span.setAttribute(traceTypes.ATTR_GEN_AI_REQUEST_MODEL, model); + } + if (provider) { + span.setAttribute(traceTypes.ATTR_GEN_AI_PROVIDER_NAME, provider); + } + let ttsStreamReader: ReadableStreamDefaultReader | null = null; let ttsStream: ReadableStream | null = null; let pushedDuration = 0; + const startTime = performance.now() / 1000; // Convert to seconds + let firstByteReceived = false; try { ttsStream = await node(textOnlyStream.readable, modelSettings); @@ -589,6 +622,12 @@ export function performTTSInference( break; } + if (!firstByteReceived) { + firstByteReceived = true; + ttfb = performance.now() / 1000 - startTime; + span.setAttribute(traceTypes.ATTR_RESPONSE_TTFB, ttfb); + } + // Write the audio frame to the output stream await outputWriter.write(frame); @@ -625,6 +664,10 @@ export function performTTSInference( } throw error; } finally { + if (!timedTextsFut.done) { + // Ensure downstream consumers don't hang on errors. + timedTextsFut.resolve(null); + } ttsStreamReader?.releaseLock(); await ttsStream?.cancel(); await outputWriter.close(); @@ -636,7 +679,7 @@ export function performTTSInference( const currentContext = otelContext.active(); const inferenceTask = async (signal: AbortSignal) => - tracer.startActiveSpan(async () => _performTTSInferenceImpl(signal), { + tracer.startActiveSpan(async (span) => _performTTSInferenceImpl(signal, span), { name: 'tts_node', context: currentContext, }); @@ -644,6 +687,7 @@ export function performTTSInference( const genData: _TTSGenerationData = { audioStream: audioOutputStream, timedTextsFut, + ttfb, }; return [ @@ -713,7 +757,6 @@ export function performTextForwarding( export interface _AudioOut { audio: Array; - /** Future that will be set with the timestamp of the first frame's capture */ firstFrameFut: Future; } @@ -801,7 +844,6 @@ export function performAudioForwarding( ]; } -// function_tool span is already implemented in tracableToolExecution below (line ~796) export function performToolExecutions({ session, speechHandle, diff --git a/agents/src/voice/report.ts b/agents/src/voice/report.ts index 49701a696..b18c1e795 100644 --- a/agents/src/voice/report.ts +++ b/agents/src/voice/report.ts @@ -2,6 +2,7 @@ // // SPDX-License-Identifier: Apache-2.0 import type { ChatContext } from '../llm/chat_context.js'; +import { type ModelUsage, filterZeroValues } from '../metrics/model_usage.js'; import type { VoiceOptions } from './agent_session.js'; import type { AgentEvent } from './events.js'; @@ -23,6 +24,8 @@ export interface SessionReport { audioRecordingStartedAt?: number; /** Duration of the session in milliseconds */ duration?: number; + /** Usage summaries for the session, one per model/provider combination */ + modelUsage?: ModelUsage[]; } export interface SessionReportOptions { @@ -41,6 +44,8 @@ export interface SessionReportOptions { audioRecordingPath?: string; /** Timestamp when the audio recording started (milliseconds) */ audioRecordingStartedAt?: number; + /** Usage summaries for the session, one per model/provider combination */ + modelUsage?: ModelUsage[]; } export function createSessionReport(opts: SessionReportOptions): SessionReport { @@ -61,6 +66,7 @@ export function createSessionReport(opts: SessionReportOptions): SessionReport { audioRecordingStartedAt, duration: audioRecordingStartedAt !== undefined ? timestamp - audioRecordingStartedAt : undefined, + modelUsage: opts.modelUsage, }; } @@ -96,5 +102,6 @@ export function sessionReportToJSON(report: SessionReport): Record { metrics.logMetrics(ev.metrics); - usageCollector.collect(ev.metrics); + }); + + // Log usage summary when job shuts down + ctx.addShutdownCallback(async () => { + logger.info( + { + usage: session.usage, + }, + 'Session usage summary', + ); }); session.on(voice.AgentSessionEventTypes.UserInterruptionDetected, (ev) => { diff --git a/examples/src/bey_avatar.ts b/examples/src/bey_avatar.ts index f8eb1f3d1..5cad7655c 100644 --- a/examples/src/bey_avatar.ts +++ b/examples/src/bey_avatar.ts @@ -1,7 +1,15 @@ // SPDX-FileCopyrightText: 2025 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 -import { type JobContext, WorkerOptions, cli, defineAgent, metrics, voice } from '@livekit/agents'; +import { + type JobContext, + WorkerOptions, + cli, + defineAgent, + log, + metrics, + voice, +} from '@livekit/agents'; import * as bey from '@livekit/agents-plugin-bey'; import * as openai from '@livekit/agents-plugin-openai'; import { fileURLToPath } from 'node:url'; @@ -12,6 +20,7 @@ export default defineAgent({ instructions: 'You are a helpful assistant. Speak clearly and concisely.', }); + const logger = log(); const session = new voice.AgentSession({ llm: new openai.realtime.RealtimeModel({ voice: 'alloy', @@ -32,11 +41,19 @@ export default defineAgent({ }); await avatar.start(session, ctx.room); - const usageCollector = new metrics.UsageCollector(); - + // Log metrics as they are emitted (session.usage is automatically collected) session.on(voice.AgentSessionEventTypes.MetricsCollected, (ev) => { metrics.logMetrics(ev.metrics); - usageCollector.collect(ev.metrics); + }); + + // Log usage summary when job shuts down + ctx.addShutdownCallback(async () => { + logger.info( + { + usage: session.usage, + }, + 'Session usage summary', + ); }); session.generateReply({ diff --git a/examples/src/cartersia_tts.ts b/examples/src/cartersia_tts.ts index a11aae33a..4d40b7334 100644 --- a/examples/src/cartersia_tts.ts +++ b/examples/src/cartersia_tts.ts @@ -7,6 +7,7 @@ import { WorkerOptions, cli, defineAgent, + log, metrics, voice, } from '@livekit/agents'; @@ -28,6 +29,7 @@ export default defineAgent({ "You are a helpful assistant, you can hear the user's message and respond to it.", }); + const logger = log(); const vad = ctx.proc.userData.vad! as silero.VAD; const session = new voice.AgentSession({ @@ -40,11 +42,19 @@ export default defineAgent({ turnDetection: new livekit.turnDetector.MultilingualModel(), }); - const usageCollector = new metrics.UsageCollector(); - + // Log metrics as they are emitted (session.usage is automatically collected) session.on(voice.AgentSessionEventTypes.MetricsCollected, (ev) => { metrics.logMetrics(ev.metrics); - usageCollector.collect(ev.metrics); + }); + + // Log usage summary when job shuts down + ctx.addShutdownCallback(async () => { + logger.info( + { + usage: session.usage, + }, + 'Session usage summary', + ); }); await session.start({ diff --git a/examples/src/comprehensive_test.ts b/examples/src/comprehensive_test.ts index b6d08d6cd..bac9910cc 100644 --- a/examples/src/comprehensive_test.ts +++ b/examples/src/comprehensive_test.ts @@ -8,6 +8,7 @@ import { cli, defineAgent, llm, + log, metrics, voice, } from '@livekit/agents'; @@ -238,6 +239,7 @@ export default defineAgent({ proc.userData.vad = await silero.VAD.load(); }, entry: async (ctx: JobContext) => { + const logger = log(); const vad = ctx.proc.userData.vad! as silero.VAD; const session = new voice.AgentSession({ vad, @@ -249,11 +251,19 @@ export default defineAgent({ testedRealtimeLlmChoices: new Set(), }, }); - const usageCollector = new metrics.UsageCollector(); - + // Log metrics as they are emitted (session.usage is automatically collected) session.on(voice.AgentSessionEventTypes.MetricsCollected, (ev) => { metrics.logMetrics(ev.metrics); - usageCollector.collect(ev.metrics); + }); + + // Log usage summary when job shuts down + ctx.addShutdownCallback(async () => { + logger.info( + { + usage: session.usage, + }, + 'Session usage summary', + ); }); await session.start({ diff --git a/examples/src/hedra/hedra_avatar.ts b/examples/src/hedra/hedra_avatar.ts index b1211c453..af10cc478 100644 --- a/examples/src/hedra/hedra_avatar.ts +++ b/examples/src/hedra/hedra_avatar.ts @@ -8,6 +8,7 @@ import { cli, defineAgent, inference, + log, metrics, voice, } from '@livekit/agents'; @@ -30,6 +31,7 @@ export default defineAgent({ instructions: 'You are a helpful assistant. Speak clearly and concisely.', }); + const logger = log(); const session = new voice.AgentSession({ stt: new inference.STT({ model: 'deepgram/nova-3', @@ -64,11 +66,19 @@ export default defineAgent({ }); await avatar.start(session, ctx.room); - const usageCollector = new metrics.UsageCollector(); - + // Log metrics as they are emitted (session.usage is automatically collected) session.on(voice.AgentSessionEventTypes.MetricsCollected, (ev) => { metrics.logMetrics(ev.metrics); - usageCollector.collect(ev.metrics); + }); + + // Log usage summary when job shuts down + ctx.addShutdownCallback(async () => { + logger.info( + { + usage: session.usage, + }, + 'Session usage summary', + ); }); session.generateReply({ diff --git a/examples/src/inworld_tts.ts b/examples/src/inworld_tts.ts index fec5c552d..45bb6961c 100644 --- a/examples/src/inworld_tts.ts +++ b/examples/src/inworld_tts.ts @@ -7,6 +7,7 @@ import { WorkerOptions, cli, defineAgent, + log, metrics, voice, } from '@livekit/agents'; @@ -26,6 +27,7 @@ export default defineAgent({ "You are a helpful assistant, you can hear the user's message and respond to it in 1-2 short sentences.", }); + const logger = log(); // Create TTS instance const tts = new inworld.TTS({ timestampType: 'WORD', @@ -96,11 +98,19 @@ export default defineAgent({ } }); - const usageCollector = new metrics.UsageCollector(); - + // Log metrics as they are emitted (session.usage is automatically collected) session.on(voice.AgentSessionEventTypes.MetricsCollected, (ev) => { metrics.logMetrics(ev.metrics); - usageCollector.collect(ev.metrics); + }); + + // Log usage summary when job shuts down + ctx.addShutdownCallback(async () => { + logger.info( + { + usage: session.usage, + }, + 'Session usage summary', + ); }); await session.start({ diff --git a/plugins/cartesia/src/tts.ts b/plugins/cartesia/src/tts.ts index 944bbcb3f..f54f6e60e 100644 --- a/plugins/cartesia/src/tts.ts +++ b/plugins/cartesia/src/tts.ts @@ -78,6 +78,14 @@ export class TTS extends tts.TTS { #opts: TTSOptions; label = 'cartesia.TTS'; + get model(): string { + return this.#opts.model; + } + + get provider(): string { + return 'Cartesia'; + } + constructor(opts: Partial = {}) { const resolvedOpts = { ...defaultTTSOptions, diff --git a/plugins/deepgram/src/stt.ts b/plugins/deepgram/src/stt.ts index 805015ec4..f2f232010 100644 --- a/plugins/deepgram/src/stt.ts +++ b/plugins/deepgram/src/stt.ts @@ -70,6 +70,14 @@ export class STT extends stt.STT { label = 'deepgram.STT'; private abortController = new AbortController(); + get model(): string { + return this.#opts.model; + } + + get provider(): string { + return 'Deepgram'; + } + constructor(opts: Partial = defaultSTTOptions) { super({ streaming: true, diff --git a/plugins/deepgram/src/tts.ts b/plugins/deepgram/src/tts.ts index 5e9aceb30..6c6c2ff98 100644 --- a/plugins/deepgram/src/tts.ts +++ b/plugins/deepgram/src/tts.ts @@ -46,6 +46,14 @@ export class TTS extends tts.TTS { private opts: TTSOptions; label = 'deepgram.TTS'; + get model(): string { + return this.opts.model; + } + + get provider(): string { + return 'Deepgram'; + } + constructor(opts: Partial = {}) { super(opts.sampleRate || defaultTTSOptions.sampleRate, NUM_CHANNELS, { streaming: opts.capabilities?.streaming ?? defaultTTSOptions.capabilities.streaming, diff --git a/plugins/google/src/llm.ts b/plugins/google/src/llm.ts index 990a953b7..7355a17ad 100644 --- a/plugins/google/src/llm.ts +++ b/plugins/google/src/llm.ts @@ -51,6 +51,13 @@ export class LLM extends llm.LLM { return this.#opts.model; } + get provider(): string { + if (this.#opts.vertexai) { + return 'Vertex AI'; + } + return 'Gemini'; + } + /** * Create a new instance of Google GenAI LLM. * diff --git a/plugins/openai/src/llm.ts b/plugins/openai/src/llm.ts index 278350d6e..fd7153a8a 100644 --- a/plugins/openai/src/llm.ts +++ b/plugins/openai/src/llm.ts @@ -86,6 +86,15 @@ export class LLM extends llm.LLM { return this.#opts.model; } + get provider(): string { + try { + const url = new URL(this.#client.baseURL); + return url.host; + } catch { + return 'api.openai.com'; + } + } + /** * Create a new instance of OpenAI LLM with Azure. * diff --git a/plugins/openai/src/realtime/realtime_model.ts b/plugins/openai/src/realtime/realtime_model.ts index 51b28afed..f23db0672 100644 --- a/plugins/openai/src/realtime/realtime_model.ts +++ b/plugins/openai/src/realtime/realtime_model.ts @@ -144,6 +144,15 @@ export class RealtimeModel extends llm.RealtimeModel { return this._options.model; } + get provider(): string { + try { + const url = new URL(this._options.baseURL); + return url.host; + } catch { + return 'api.openai.com'; + } + } + constructor( options: { model?: string; diff --git a/plugins/openai/src/stt.ts b/plugins/openai/src/stt.ts index ef2f32aea..2933c62fd 100644 --- a/plugins/openai/src/stt.ts +++ b/plugins/openai/src/stt.ts @@ -28,6 +28,19 @@ export class STT extends stt.STT { #client: OpenAI; label = 'openai.STT'; + get model(): string { + return this.#opts.model; + } + + get provider(): string { + try { + const url = new URL(this.#client.baseURL); + return url.host; + } catch { + return 'api.openai.com'; + } + } + /** * Create a new instance of OpenAI STT. * diff --git a/plugins/openai/src/tts.ts b/plugins/openai/src/tts.ts index 9e0f235af..ce0e82836 100644 --- a/plugins/openai/src/tts.ts +++ b/plugins/openai/src/tts.ts @@ -32,6 +32,19 @@ export class TTS extends tts.TTS { label = 'openai.TTS'; private abortController = new AbortController(); + get model(): string { + return this.#opts.model; + } + + get provider(): string { + try { + const url = new URL(this.#client.baseURL); + return url.host; + } catch { + return 'api.openai.com'; + } + } + /** * Create a new instance of OpenAI TTS. *