-
I've really been enjoying learning more about A2A, but I'm at a loss for how to proceed. I specifically want to try out a simple client-agent framework with client-side polling. This seems to me to be a pretty fundamental building block in my understanding, as well as the utility of the protocol. I'm asking here because I'm currently developing with the Python SDK. My agent-side code simplified looks like this: streaming_result = call_llm_streaming(query)
async for event in streaming_result:
text = event.get("content")
message = TaskArtifactUpdateEvent(
context_id=context.context_id, # type: ignore
task_id=context.task_id, # type: ignore
artifact=new_text_artifact(name="current_result", text=text),
)
await event_queue.enqueue_event(message)
if event["done"]:
break
status = TaskStatusUpdateEvent(
context_id=context.context_id, # type: ignore
task_id=context.task_id, # type: ignore
status=TaskStatus(state=TaskState.completed),
final=True,
)
await event_queue.enqueue_event(status) and my client-side code simplified looks like this: agent_client = A2AClient(httpx_client, agent_card)
send_message_payload: dict[str, Any] = {
"message": {
"role": "user",
"parts": [{"kind": "text", "text": "something"}],
"messageId": uuid4().hex,
},
}
request = SendMessageRequest(
id=str(uuid4()), params=MessageSendParams(**send_message_payload)
)
response: SendMessageResponse = await agent_client.send_message(request)
result = response.root.result
full_resp_text = ""
for artifact in result.artifacts:
for part in artifact.parts:
full_resp_text += part.root.text
print(f"The agent answered:\n{full_resp_text}") This has the result of returning the full response in a single response object, locking my client while I'm waiting. What I want to do instead client-side is:
I tried consulting the samples, but I haven't found anything like this. I've never implemented a listening solution like this, so it's my first time on many fronts. Hope someone's able to help! |
Beta Was this translation helpful? Give feedback.
Replies: 2 comments 1 reply
-
Hi yasharhon, I understand that you don't want your In general, the protocol defines the I can see that you're using a legacy the This is how I image it looking: a2a_client = ClientFactory(ClientConfig(polling=False)).create(agent_card)
response = await anext(a2a_client.send_message(request))
task_id = response[0].id
task = await a2a_client.get_task(TaskIdParams(id=task_id))
# Check the status of the task and act accordingly... Let me know if this helps! |
Beta Was this translation helpful? Give feedback.
-
This turns out to be an issue in the code. On the client side, you need to send a streaming message request to initiate reading the SSE feed, and on the server side the code needs to be async all the way in order for the event loop to be available to send an update. Corrected code looks like this on the server side: streaming_result = await async_call_llm_streaming(query) # This method blocked the event loop in my original code
async for event in streaming_result:
text = event.get("content")
message = TaskArtifactUpdateEvent(
context_id=context.context_id, # type: ignore
task_id=context.task_id, # type: ignore
artifact=new_text_artifact(name="current_result", text=text),
)
await event_queue.enqueue_event(message)
status = TaskStatusUpdateEvent(
context_id=context.context_id,
task_id=context.task_id,
status=TaskStatus(state=TaskState.completed),
final=True,
)
await event_queue.enqueue_event(status) On the client side it should look like this: agent_client = A2AClient(httpx_client, agent_card)
send_message_payload: dict[str, Any] = {
"message": {
"role": "user",
"parts": [{"kind": "text", "text": "something"}],
"messageId": uuid4().hex,
},
}
request = SendStreamingMessageRequest(
id=str(uuid4()), params=MessageSendParams(**send_message_payload)
) # This must be updated to be a streaming message request
response = agent_client.send_message_streaming(request) # This call needs to be updated
async for update in response:
pass # Insert your code for handling status updates here |
Beta Was this translation helpful? Give feedback.
This turns out to be an issue in the code. On the client side, you need to send a streaming message request to initiate reading the SSE feed, and on the server side the code needs to be async all the way in order for the event loop to be available to send an update. Corrected code looks like this on the server side: