Describe the Bug
Hi, I'm Boris, currently working on the Opik integration for Google ADK (documentation).
Google ADK is easily one of the best agent frameworks I’ve used, but I encountered a subtle issue while following the quickstart tutorial. The guide instructs users to stop processing events by breaking the loop after detecting event.is_final_response():
async for event in runner.run_async(user_id=user_id, session_id=session_id, new_message=content):
    if event.is_final_response():
        ...
        break  # Stop processing events once the final response is found

When following this approach in combination with Opik's integration and callback hooks, the after_agent_callback is never invoked. This callback is essential for finalizing the Opik trace, and its absence results in unexpected and inconsistent behavior. This has led to confusion among several Opik users, as reported in #2467, #2386, and internal tickets.
Interestingly, this issue does not appear when running the agent via adk run or adk web; the callback is invoked as expected in those environments. The problem seems specific to direct usage of runner.run_async as shown in the quickstart loop.
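A plausible explanation, sketched here without ADK: if run_async is an async generator that only reaches its after-agent step once the consumer requests another event after the final one, then breaking out of the loop leaves that step unexecuted. The names fake_run_async and consume are hypothetical stand-ins, not ADK APIs, and the real runner may be structured differently.

```python
import asyncio

calls = []


async def fake_run_async():
    """Hypothetical stand-in for runner.run_async; this is NOT ADK code."""
    calls.append("before_agent")
    yield {"final": False, "text": "tool call"}
    yield {"final": True, "text": "final answer"}
    # Reached only if the consumer asks for another event after the final one.
    calls.append("after_agent")


async def consume(stop_early: bool) -> list:
    """Iterate the fake runner, optionally breaking on the final event."""
    calls.clear()
    async for event in fake_run_async():
        if event["final"] and stop_early:
            break  # The generator stays suspended at its last yield.
    return list(calls)


with_break = asyncio.run(consume(stop_early=True))
without_break = asyncio.run(consume(stop_early=False))
print(with_break)
print(without_break)
```

In this sketch the early break leaves only the "before" step recorded, which mirrors the missing "After agent callback" line in the output reported in this issue.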
To Reproduce
Use the script below, adapted from the ADK quickstart, to reproduce the issue:
import asyncio
import datetime
from zoneinfo import ZoneInfo
from google.adk.agents import Agent
from google.genai import types
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService


def get_weather(city: str) -> dict:
    """Retrieves the current weather report for a specified city.

    Args:
        city (str): The name of the city for which to retrieve the weather report.

    Returns:
        dict: status and result or error msg.
    """
    if city.lower() == "new york":
        return {
            "status": "success",
            "report": (
                "The weather in New York is sunny with a temperature of 25 degrees"
                " Celsius (77 degrees Fahrenheit)."
            ),
        }
    else:
        return {
            "status": "error",
            "error_message": f"Weather information for '{city}' is not available.",
        }


def get_current_time(city: str) -> dict:
    """Returns the current time in a specified city.

    Args:
        city (str): The name of the city for which to retrieve the current time.

    Returns:
        dict: status and result or error msg.
    """
    if city.lower() == "new york":
        tz_identifier = "America/New_York"
    else:
        return {
            "status": "error",
            "error_message": (f"Sorry, I don't have timezone information for {city}."),
        }
    tz = ZoneInfo(tz_identifier)
    now = datetime.datetime.now(tz)
    report = f"The current time in {city} is {now.strftime('%Y-%m-%d %H:%M:%S %Z%z')}"
    return {"status": "success", "report": report}


def before_agent_callback(*args, **kwargs):
    print("Before agent callback")
    return None


def after_agent_callback(*args, **kwargs):
    print("After agent callback")
    return None


def before_model_callback(*args, **kwargs):
    print("Before model callback")
    return None


def after_model_callback(*args, **kwargs):
    print("After model callback")
    return None


def before_tool_callback(*args, **kwargs):
    print("Before tool callback")
    return None


def after_tool_callback(*args, **kwargs):
    print("After tool callback")
    return None


weather_agent = Agent(
    name="weather_time_agent",
    model="gemini-2.0-flash",
    description=("Agent to answer questions about the time and weather in a city."),
    instruction=(
        "You are a helpful agent who can answer user questions about the time and weather in a city."
    ),
    tools=[get_weather, get_current_time],
    before_agent_callback=before_agent_callback,
    after_agent_callback=after_agent_callback,
    before_model_callback=before_model_callback,
    after_model_callback=after_model_callback,
    before_tool_callback=before_tool_callback,
    after_tool_callback=after_tool_callback,
)


async def call_agent_async(query: str, runner, user_id, session_id):
    """Sends a query to the agent and prints the final response."""
    print(f"\n>>> User Query: {query}")
    # Prepare the user's message in ADK format
    content = types.Content(role="user", parts=[types.Part(text=query)])
    final_response_text = "Agent did not produce a final response."  # Default
    async for event in runner.run_async(
        user_id=user_id, session_id=session_id, new_message=content
    ):
        if event.is_final_response():
            if event.content and event.content.parts:
                # Assuming text response in the first part
                final_response_text = event.content.parts[0].text
            elif (
                event.actions and event.actions.escalate
            ):  # Handle potential errors/escalations
                final_response_text = (
                    f"Agent escalated: {event.error_message or 'No specific message.'}"
                )
            # Add more checks here if needed (e.g., specific error codes)
            break  # Stop processing events once the final response is found
    print(f"<<< Agent Response: {final_response_text}")


async def run_conversation():
    session_service = InMemorySessionService()
    # Define constants for identifying the interaction context
    APP_NAME = "weather_tutorial_app"
    USER_ID = "user_1"
    SESSION_ID = "session_001"  # Using a fixed ID for simplicity
    # Create the specific session where the conversation will happen
    session = await session_service.create_session(
        app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID
    )
    print(
        f"Session created: App='{APP_NAME}', User='{USER_ID}', Session='{SESSION_ID}'"
    )
    # --- Runner ---
    # Key Concept: Runner orchestrates the agent execution loop.
    runner = Runner(
        agent=weather_agent,  # The agent we want to run
        app_name=APP_NAME,  # Associates runs with our app
        session_service=session_service,  # Uses our session manager
    )
    print(f"Runner created for agent '{runner.agent.name}'.")
    await call_agent_async(
        "What is the weather like in London?",
        runner=runner,
        user_id=USER_ID,
        session_id=SESSION_ID,
    )
    await call_agent_async(
        "How about Paris?", runner=runner, user_id=USER_ID, session_id=SESSION_ID
    )  # Expecting the tool's error message


if __name__ == "__main__":
    asyncio.run(run_conversation())

When executed, the output is as follows:
Session created: App='weather_tutorial_app', User='user_1', Session='session_001'
Runner created for agent 'weather_time_agent'.
>>> User Query: What is the weather like in London?
Before agent callback
Before model callback
Both GOOGLE_API_KEY and GEMINI_API_KEY are set. Using GOOGLE_API_KEY.
...
After model callback
<<< Agent Response: I am sorry, I cannot get the weather in London.
>>> User Query: How about Paris?
Before agent callback
Before model callback
...
After model callback
<<< Agent Response: I am sorry, I cannot get the weather in Paris.
Notably, the After agent callback is never printed, indicating that it was not executed.
However, when the break statement in call_agent_async (line 128 in the original script) is commented out, the after_agent_callback is correctly triggered.
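As a consumer-side workaround in the same spirit, the loop can record the final response and keep iterating until the stream is exhausted instead of breaking. The sketch below uses a hypothetical FakeEvent class and fake_run_async generator in place of ADK, so it only illustrates the loop shape, not ADK's actual behavior.

```python
import asyncio
from dataclasses import dataclass

log = []


@dataclass
class FakeEvent:
    """Hypothetical stand-in for an ADK event; only what this sketch needs."""

    text: str
    final: bool

    def is_final_response(self) -> bool:
        return self.final


async def fake_run_async():
    """Hypothetical event stream; the post-yield step mimics a trailing callback."""
    yield FakeEvent("calling tool", final=False)
    yield FakeEvent("It is sunny.", final=True)
    log.append("After agent callback")


async def final_text_without_break() -> str:
    final_response_text = "Agent did not produce a final response."
    async for event in fake_run_async():
        if event.is_final_response():
            final_response_text = event.text
            # No break here: let the generator run to completion.
    return final_response_text


result = asyncio.run(final_text_without_break())
print(result)
print(log)
```

The cost of this shape is that the caller waits for the generator to finish before using the final response.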
Expected Behavior
The presence of the break in the quickstart guide is misleading. Users following this example may unknowingly prevent critical callbacks from firing, leading to trace finalization issues and data loss in observability tools like Opik.
It would be beneficial if:
- The after_agent_callback were guaranteed to run even before yielding a final_response, or
- The quickstart documentation clarified this caveat or avoided the premature break altogether.
Environment
- OS: Fedora release 40 (Forty)
- Python version: 3.11.10
- ADK version: 1.5.0
Model Information
- Using gemini-2.0-flash
- The issue appears to occur regardless of model selection.