Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 25 additions & 10 deletions src/google/adk/flows/llm_flows/base_llm_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,14 +156,24 @@ async def run_live(
break
logger.debug('Receive new event: %s', event)
yield event
# send back the function response
# send back the function response to models
if event.get_function_responses():
logger.debug(
'Sending back last function response event: %s', event
)
invocation_context.live_request_queue.send_content(
event.content
)
# We handle agent transfer here in `run_live` rather than
# in `_postprocess_live` to prevent duplication of function
# response processing. If agent transfer were handled in
# `_postprocess_live`, events yielded from child agent's
# `run_live` would bubble up to parent agent's `run_live`,
# causing `event.get_function_responses()` to be true in both
# child and parent, and `send_content()` to be called twice for
# the same function response. By handling agent transfer here,
# we ensure that only child agent processes its own function
# responses after the transfer.
if (
event.content
and event.content.parts
Expand All @@ -174,7 +184,21 @@ async def run_live(
await asyncio.sleep(DEFAULT_TRANSFER_AGENT_DELAY)
# cancel the tasks that belongs to the closed connection.
send_task.cancel()
logger.debug('Closing live connection')
await llm_connection.close()
logger.debug('Live connection closed.')
# transfer to the sub agent.
transfer_to_agent = event.actions.transfer_to_agent
if transfer_to_agent:
logger.debug('Transferring to agent: %s', transfer_to_agent)
agent_to_run = self._get_agent_to_run(
invocation_context, transfer_to_agent
)
async with Aclosing(
agent_to_run.run_live(invocation_context)
) as agen:
async for item in agen:
yield item
if (
event.content
and event.content.parts
Expand Down Expand Up @@ -638,15 +662,6 @@ async def _postprocess_live(
)
yield final_event

transfer_to_agent = function_response_event.actions.transfer_to_agent
if transfer_to_agent:
agent_to_run = self._get_agent_to_run(
invocation_context, transfer_to_agent
)
async with Aclosing(agent_to_run.run_live(invocation_context)) as agen:
async for item in agen:
yield item

async def _postprocess_run_processors_async(
self, invocation_context: InvocationContext, llm_response: LlmResponse
) -> AsyncGenerator[Event, None]:
Expand Down
Loading