diff --git a/core/src/agents/base_agent.ts b/core/src/agents/base_agent.ts index e99ccd1..441b0ae 100644 --- a/core/src/agents/base_agent.ts +++ b/core/src/agents/base_agent.ts @@ -5,12 +5,13 @@ */ import {Content} from '@google/genai'; -import {trace} from '@opentelemetry/api'; +import {context, trace} from '@opentelemetry/api'; import {createEvent, Event} from '../events/event.js'; import {CallbackContext} from './callback_context.js'; import {InvocationContext} from './invocation_context.js'; +import {runAsyncGeneratorWithOtelContext, traceAgentInvocation, tracer} from '../telemetry/tracing.js'; type SingleAgentCallback = (context: CallbackContext) => Promise|(Content|undefined); @@ -124,34 +125,37 @@ export abstract class BaseAgent { async * runAsync(parentContext: InvocationContext): AsyncGenerator { - const span = trace.getTracer('gcp.vertex.agent') - .startSpan(`agent_run [${this.name}]`); + const span = tracer.startSpan(`invoke_agent ${this.name}`); + const ctx = trace.setSpan(context.active(), span); try { - const context = this.createInvocationContext(parentContext); - - const beforeAgentCallbackEvent = - await this.handleBeforeAgentCallback(context); - if (beforeAgentCallbackEvent) { - yield beforeAgentCallbackEvent; - } - - if (context.endInvocation) { - return; - } - - for await (const event of this.runAsyncImpl(context)) { - yield event; - } - - if (context.endInvocation) { - return; - } - - const afterAgentCallbackEvent = - await this.handleAfterAgentCallback(context); - if (afterAgentCallbackEvent) { - yield afterAgentCallbackEvent; - } + yield* runAsyncGeneratorWithOtelContext(ctx, this, async function* () { + const context = this.createInvocationContext(parentContext); + + const beforeAgentCallbackEvent = + await this.handleBeforeAgentCallback(context); + if (beforeAgentCallbackEvent) { + yield beforeAgentCallbackEvent; + } + + if (context.endInvocation) { + return; + } + + traceAgentInvocation({agent: this, invocationContext: context}); + for await (const event of this.runAsyncImpl(context)) { + yield event; + } + + if (context.endInvocation) { + return; + } + + const afterAgentCallbackEvent = + await this.handleAfterAgentCallback(context); + if (afterAgentCallbackEvent) { + yield afterAgentCallbackEvent; + } + }); } finally { span.end(); } @@ -167,10 +171,12 @@ export abstract class BaseAgent { async * runLive(parentContext: InvocationContext): AsyncGenerator { - const span = trace.getTracer('gcp.vertex.agent') - .startSpan(`agent_run [${this.name}]`); + const span = tracer.startSpan(`invoke_agent ${this.name}`); + const ctx = trace.setSpan(context.active(), span); try { - // TODO(b/425992518): Implement live mode. + yield* runAsyncGeneratorWithOtelContext(ctx, this, async function* () { + // TODO(b/425992518): Implement live mode. + }); throw new Error('Live mode is not implemented yet.'); } finally { span.end(); diff --git a/core/src/agents/functions.ts b/core/src/agents/functions.ts index 4a8376e..606d6f2 100644 --- a/core/src/agents/functions.ts +++ b/core/src/agents/functions.ts @@ -17,6 +17,7 @@ import {randomUUID} from '../utils/env_aware_utils.js'; import {logger} from '../utils/logger.js'; import {SingleAfterToolCallback, SingleBeforeToolCallback} from './llm_agent.js'; +import {traceMergedToolCalls, tracer, traceToolCall} from '../telemetry/tracing.js'; const AF_FUNCTION_CALL_ID_PREFIX = 'adk-'; export const REQUEST_EUC_FUNCTION_CALL_NAME = 'adk_request_credential'; @@ -191,11 +192,53 @@ async function callToolAsync( args: Record, toolContext: ToolContext, ): Promise { - // TODO - b/436079721: implement [tracer.start_as_current_span] - logger.debug(`callToolAsync ${tool.name}`); - return await tool.runAsync({args, toolContext}); + const span = tracer.startSpan(`execute_tool ${tool.name}`); + try { + logger.debug(`callToolAsync ${tool.name}`); + const result = await tool.runAsync({args, toolContext}); + traceToolCall({ + tool, + args, + functionResponseEvent: buildResponseEvent(tool, result, toolContext, toolContext.invocationContext) + }) + return result; + } finally { + span.end(); + } } +function buildResponseEvent( + tool: BaseTool, + functionResult: any, + toolContext: ToolContext, + invocationContext: InvocationContext, + ): Event { + let responseResult = functionResult; + if (typeof functionResult !== 'object' || functionResult == null) { + responseResult = {result: functionResult}; + } + + const partFunctionResponse: Part = { + functionResponse: { + name: tool.name, + response: responseResult, + id: toolContext.functionCallId, + }, + }; + + const content: Content = { + role: 'user', + parts: [partFunctionResponse], + }; + + return createEvent({ + invocationId: invocationContext.invocationId, + author: invocationContext.agent.name, + content: content, + actions: toolContext.actions, + branch: invocationContext.branch, + }); +} /** * Handles function calls. * Runtime behavior to pay attention to: @@ -426,12 +469,18 @@ export async function handleFunctionCallList({ if (functionResponseEvents.length > 1) { // TODO - b/436079721: implement [tracer.start_as_current_span] - logger.debug('execute_tool (merged)'); - // TODO - b/436079721: implement [traceMergedToolCalls] - logger.debug('traceMergedToolCalls', { - responseEventId: mergedEvent.id, - functionResponseEvent: mergedEvent.id, - }); + const span = tracer.startSpan('execute_tool (merged)'); + try { + logger.debug('execute_tool (merged)'); + // TODO - b/436079721: implement [traceMergedToolCalls] + logger.debug('traceMergedToolCalls', { + responseEventId: mergedEvent.id, + functionResponseEvent: mergedEvent.id, + }); + traceMergedToolCalls({responseEventId: mergedEvent.id, functionResponseEvent: mergedEvent}); + } finally { + span.end(); + } } return mergedEvent; } diff --git a/core/src/agents/llm_agent.ts b/core/src/agents/llm_agent.ts index f237f15..640bb77 100644 --- a/core/src/agents/llm_agent.ts +++ b/core/src/agents/llm_agent.ts @@ -5,6 +5,7 @@ */ import {FunctionCall, GenerateContentConfig, Schema} from '@google/genai'; +import {context, trace} from '@opentelemetry/api'; import {z} from 'zod'; import {createEvent, createNewEventId, Event, getFunctionCalls, getFunctionResponses, isFinalResponse} from '../events/event.js'; @@ -30,6 +31,7 @@ import {injectSessionState} from './instructions.js'; import {InvocationContext} from './invocation_context.js'; import {ReadonlyContext} from './readonly_context.js'; import {StreamingMode} from './run_config.js'; +import {runAsyncGeneratorWithOtelContext, traceCallLlm, tracer} from '../telemetry/tracing.js'; /** An object that can provide an instruction string. */ export type InstructionProvider = ( @@ -1055,7 +1057,10 @@ export class LlmAgent extends BaseAgent { author: this.name, branch: invocationContext.branch, }); - for await (const llmResponse of this.callLlmAsync( + const span = tracer.startSpan('call_llm'); + const ctx = trace.setSpan(context.active(), span); + yield* runAsyncGeneratorWithOtelContext(ctx, this, async function* () { + for await (const llmResponse of this.callLlmAsync( invocationContext, llmRequest, modelResponseEvent)) { // ====================================================================== // Postprocess after calling the LLM @@ -1066,8 +1071,10 @@ export class LlmAgent extends BaseAgent { modelResponseEvent.id = createNewEventId(); modelResponseEvent.timestamp = new Date().getTime(); yield event; + } } - } + }); + span.end(); } private async * @@ -1217,7 +1224,6 @@ export class LlmAgent extends BaseAgent { // Calls the LLM. const llm = this.canonicalModel; - // TODO - b/436079721: Add tracer.start_as_current_span('call_llm') if (invocationContext.runConfig?.supportCfc) { // TODO - b/425992518: Implement CFC call path // This is a hack, underneath it calls runLive. Which makes @@ -1234,8 +1240,12 @@ export class LlmAgent extends BaseAgent { for await (const llmResponse of this.runAndHandleError( responsesGenerator, invocationContext, llmRequest, modelResponseEvent)) { - // TODO - b/436079721: Add trace_call_llm - + traceCallLlm({ + invocationContext, + eventId: modelResponseEvent.id, + llmRequest, + llmResponse, + }); // Runs after_model_callback if it exists. const alteredLlmResponse = await this.handleAfterModelCallback( invocationContext, llmResponse, modelResponseEvent); diff --git a/core/src/runner/runner.ts b/core/src/runner/runner.ts index 8d09538..8afc826 100644 --- a/core/src/runner/runner.ts +++ b/core/src/runner/runner.ts @@ -5,7 +5,7 @@ */ import {Content, createPartFromText} from '@google/genai'; -import {trace} from '@opentelemetry/api'; +import {context, trace} from '@opentelemetry/api'; import {BaseAgent} from '../agents/base_agent.js'; import {InvocationContext, newInvocationContextId} from '../agents/invocation_context.js'; @@ -21,6 +21,7 @@ import {PluginManager} from '../plugins/plugin_manager.js'; import {BaseSessionService} from '../sessions/base_session_service.js'; import {Session} from '../sessions/session.js'; import {logger} from '../utils/logger.js'; +import {runAsyncGeneratorWithOtelContext, tracer} from '../telemetry/tracing.js'; // TODO - b/425992518: Implement BuiltInCodeExecutor @@ -80,120 +81,123 @@ export class Runner { // ========================================================================= // Setup the session and invocation context // ========================================================================= - const span = trace.getTracer('gcp.vertex.agent').startSpan('invocation'); + const span = tracer.startSpan('invocation'); + const ctx = trace.setSpan(context.active(), span); try { - const session = + yield* runAsyncGeneratorWithOtelContext(ctx, this, async function* () { + const session = await this.sessionService.getSession({appName: this.appName, userId, sessionId}); - if (!session) { - throw new Error(`Session not found: ${sessionId}`); - } + if (!session) { + throw new Error(`Session not found: ${sessionId}`); + } - if (runConfig.supportCfc && this.agent instanceof LlmAgent) { - const modelName = this.agent.canonicalModel.model; - if (!modelName.startsWith('gemini-2')) { + if (runConfig.supportCfc && this.agent instanceof LlmAgent) { + const modelName = this.agent.canonicalModel.model; + if (!modelName.startsWith('gemini-2')) { throw new Error(`CFC is not supported for model: ${ modelName} in agent: ${this.agent.name}`); + } + // TODO - b/425992518: Add code executor support } - // TODO - b/425992518: Add code executor support - } - const invocationContext = new InvocationContext({ - artifactService: this.artifactService, - sessionService: this.sessionService, - memoryService: this.memoryService, - credentialService: this.credentialService, - invocationId: newInvocationContextId(), - agent: this.agent, - session, - userContent: newMessage, - runConfig, - pluginManager: this.pluginManager, - }); + const invocationContext = new InvocationContext({ + artifactService: this.artifactService, + sessionService: this.sessionService, + memoryService: this.memoryService, + credentialService: this.credentialService, + invocationId: newInvocationContextId(), + agent: this.agent, + session, + userContent: newMessage, + runConfig, + pluginManager: this.pluginManager, + }); - // ========================================================================= - // Preprocess plugins on user message - // ========================================================================= - const pluginUserMessage = + // ========================================================================= + // Preprocess plugins on user message + // ========================================================================= + const pluginUserMessage = await this.pluginManager.runOnUserMessageCallback({ userMessage: newMessage, invocationContext, }); - if (pluginUserMessage) { - newMessage = pluginUserMessage as Content; - } - - // ========================================================================= - // Append user message to session - // ========================================================================= - if (newMessage) { - if (!newMessage.parts?.length) { - throw new Error('No parts in the new_message.'); + if (pluginUserMessage) { + newMessage = pluginUserMessage as Content; } - // Directly saves the artifacts (if applicable) in the user message and - // replaces the artifact data with a file name placeholder. - // TODO - b/425992518: fix Runner<>>ArtifactService leaky abstraction. - if (runConfig.saveInputBlobsAsArtifacts) { - await this.saveArtifacts( + // ========================================================================= + // Append user message to session + // ========================================================================= + if (newMessage) { + if (!newMessage.parts?.length) { + throw new Error('No parts in the new_message.'); + } + + // Directly saves the artifacts (if applicable) in the user message and + // replaces the artifact data with a file name placeholder. + // TODO - b/425992518: fix Runner<>>ArtifactService leaky abstraction. + if (runConfig.saveInputBlobsAsArtifacts) { + await this.saveArtifacts( invocationContext.invocationId, session.userId, session.id, newMessage); + } + // Append the user message to the session with optional state delta. + await this.sessionService.appendEvent({ + session, + event: createEvent({ + invocationId: invocationContext.invocationId, + author: 'user', + actions: stateDelta ? createEventActions({stateDelta}) : undefined, + content: newMessage, + }), + }); } - // Append the user message to the session with optional state delta. - await this.sessionService.appendEvent({ - session, - event: createEvent({ - invocationId: invocationContext.invocationId, - author: 'user', - actions: stateDelta ? createEventActions({stateDelta}) : undefined, - content: newMessage, - }), - }); - } - // ========================================================================= - // Determine which agent should handle the workflow resumption. - // ========================================================================= - invocationContext.agent = + // ========================================================================= + // Determine which agent should handle the workflow resumption. + // ========================================================================= + invocationContext.agent = this.determineAgentForResumption(session, this.agent); - // ========================================================================= - // Run the agent with the plugins (aka hooks to apply in the lifecycle) - // ========================================================================= - // Step 1: Run the before_run callbacks to see if we should early exit. - const beforeRunCallbackResponse = + // ========================================================================= + // Run the agent with the plugins (aka hooks to apply in the lifecycle) + // ========================================================================= + // Step 1: Run the before_run callbacks to see if we should early exit. + const beforeRunCallbackResponse = await this.pluginManager.runBeforeRunCallback({invocationContext}); - if (beforeRunCallbackResponse) { - const earlyExitEvent = createEvent({ - invocationId: invocationContext.invocationId, - author: 'model', - content: beforeRunCallbackResponse, - }); - // TODO: b/447446338 - In the future, do *not* save live call audio - // content to session This is a feature in Python ADK + if (beforeRunCallbackResponse) { + const earlyExitEvent = createEvent({ + invocationId: invocationContext.invocationId, + author: 'model', + content: beforeRunCallbackResponse, + }); + // TODO: b/447446338 - In the future, do *not* save live call audio + // content to session This is a feature in Python ADK await this.sessionService.appendEvent({session, event: earlyExitEvent}); - yield earlyExitEvent; + yield earlyExitEvent; - } else { - // Step 2: Otherwise continue with normal execution - for await (const event of invocationContext.agent.runAsync( + } else { + // Step 2: Otherwise continue with normal execution + for await (const event of invocationContext.agent.runAsync( invocationContext)) { - if (!event.partial) { + if (!event.partial) { await this.sessionService.appendEvent({session, event}); - } - // Step 3: Run the on_event callbacks to optionally modify the event. - const modifiedEvent = await this.pluginManager.runOnEventCallback( + } + // Step 3: Run the on_event callbacks to optionally modify the event. + const modifiedEvent = await this.pluginManager.runOnEventCallback( {invocationContext, event}); - if (modifiedEvent) { - yield modifiedEvent; - } else { - yield event; + if (modifiedEvent) { + yield modifiedEvent; + } else { + yield event; + } } } - } - // Step 4: Run the after_run callbacks to optionally modify the context. - await this.pluginManager.runAfterRunCallback({invocationContext}); + // Step 4: Run the after_run callbacks to optionally modify the context. + await this.pluginManager.runAfterRunCallback({invocationContext}); + }); } finally { span.end(); } @@ -209,8 +213,8 @@ export class Runner { * @param message The message containing parts to process. */ private async saveArtifacts( - invocationId: string, userId: string, sessionId: string, - message: Content): Promise { + invocationId: string, userId: string, sessionId: string, + message: Content): Promise { if (!this.artifactService || !message.parts?.length) { return; } @@ -231,7 +235,7 @@ export class Runner { }); // TODO - b/425992518: potentially buggy if accidentally exposed to LLM. message.parts[i] = createPartFromText( - `Uploaded file: ${fileName}. It is saved into artifacts`); + `Uploaded file: ${fileName}. It is saved into artifacts`); } } @@ -242,7 +246,7 @@ export class Runner { // TODO - b/425992518: This is where LRO integration should happen. // Needs clean up before we can generalize it. private determineAgentForResumption(session: Session, rootAgent: BaseAgent): - BaseAgent { + BaseAgent { // ========================================================================= // Case 1: If the last event is a function response, this returns the // agent that made the original function call. @@ -325,8 +329,8 @@ function findEventByLastFunctionResponseId(events: Event[]): Event|null { const lastEvent = events[events.length - 1]; const functionCallId = - lastEvent.content?.parts?.find((part) => part.functionResponse) - ?.functionResponse?.id; + lastEvent.content?.parts?.find((part) => part.functionResponse) + ?.functionResponse?.id; if (!functionCallId) { return null; } diff --git a/core/src/telemetry/tracing.ts b/core/src/telemetry/tracing.ts index 828c9fa..13d767d 100644 --- a/core/src/telemetry/tracing.ts +++ b/core/src/telemetry/tracing.ts @@ -331,16 +331,17 @@ function buildLlmRequestForTrace(llmRequest: LlmRequest): Record( +function bindOtelContextToAsyncGenerator( ctx: Context, - generator: AsyncGenerator, -): AsyncGenerator { + generator: AsyncGenerator, +): AsyncGenerator { return { // Bind the next() method to execute within the provided context next: context.bind(ctx, generator.next.bind(generator)), @@ -353,11 +354,29 @@ export function bindAsyncGenerator( // Ensure the async iterator symbol also returns a context-bound generator [Symbol.asyncIterator]() { - return bindAsyncGenerator(ctx, generator[Symbol.asyncIterator]()); + return bindOtelContextToAsyncGenerator(ctx, generator[Symbol.asyncIterator]()); }, }; } +/** + * Runs an async generator function with both OTEL context and JavaScript 'this' context. + * + * @param otelContext - The OpenTelemetry context to bind the generator to + * @param generatorFnContext - The 'this' context to bind to the generator function + * @param generatorFn - The generator function to execute + * + * @returns A new async generator that executes within both contexts + */ +export function runAsyncGeneratorWithOtelContext( + otelContext: Context, + generatorFnContext: TThis, + generatorFn: (this: TThis) => AsyncGenerator +): AsyncGenerator { + const generator = generatorFn.call(generatorFnContext); + return bindOtelContextToAsyncGenerator(otelContext, generator); +} + /** * Determines whether to add request/response content to spans. *