Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
fcfa9bc
implementing different a2a transport
Jan 5, 2026
bcd5c6e
Merge branch 'main' into feat/a2a-transport
greysonlalonde Jan 5, 2026
6ff2ca4
Merge branch 'main' into feat/a2a-transport
koushiv777 Jan 6, 2026
6db8f73
defaulting the transport protocol to JSONRPC by default
Jan 6, 2026
4207386
removing the optional transport_protocol
Jan 6, 2026
6adb72c
Merge branch 'main' into feat/a2a-transport
koushiv777 Jan 6, 2026
8d80725
Merge branch 'main' into feat/a2a-transport
koushiv777 Jan 7, 2026
df30c90
instead of default moving to default_factory for transport_protocol
Jan 7, 2026
bcdd4a0
fixing cursor comments
Jan 7, 2026
a0839c8
fixing cursor comments
Jan 7, 2026
0f6f9e0
Merge branch 'main' into feat/a2a-transport
koushiv777 Jan 7, 2026
3c17e83
fixing cursor comments
Jan 8, 2026
ebc31db
Merge branch 'main' into feat/a2a-transport
koushiv777 Jan 8, 2026
4f58073
Merge branch 'main' into feat/a2a-transport
koushiv777 Jan 8, 2026
a40da9c
fixing cursor comments
Jan 8, 2026
0e0d271
Merge remote-tracking branch 'origin/feat/a2a-transport' into feat/a2…
Jan 8, 2026
40cc5c1
chore: use redefined type instead of optional dep
greysonlalonde Jan 9, 2026
5ba7c25
Merge branch 'main' into feat/a2a-transport
koushiv777 Jan 10, 2026
9b2fbf9
Merge branch 'main' into feat/a2a-transport
koushiv777 Jan 12, 2026
1620f8b
Merge branch 'main' into feat/a2a-transport
koushiv777 Jan 12, 2026
2cafbbf
fix: ensure proper typing throughout execution chain
greysonlalonde Jan 12, 2026
e4798bf
chore: update docs
greysonlalonde Jan 12, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/en/learn/a2a-agent-delegation.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ The `A2AConfig` class accepts the following parameters:
Update mechanism for receiving task status. Options: `StreamingConfig`, `PollingConfig`, or `PushNotificationConfig`.
</ParamField>

<ParamField path="transport_protocol" type="Literal['JSONRPC', 'GRPC', 'HTTP+JSON']" default="JSONRPC">
Transport protocol for A2A communication. Options: `JSONRPC` (default), `GRPC`, or `HTTP+JSON`.
</ParamField>

## Authentication

For A2A agents that require authentication, use one of the provided auth schemes:
Expand Down
7 changes: 6 additions & 1 deletion lib/crewai/src/crewai/a2a/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from __future__ import annotations

from typing import Annotated, Any, ClassVar
from typing import Annotated, Any, ClassVar, Literal

from pydantic import (
BaseModel,
Expand Down Expand Up @@ -53,6 +53,7 @@ class A2AConfig(BaseModel):
fail_fast: If True, raise error when agent unreachable; if False, skip and continue.
trust_remote_completion_status: If True, return A2A agent's result directly when completed.
updates: Update mechanism config.
transport_protocol: A2A transport protocol (grpc, jsonrpc, http+json).
"""

model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
Expand Down Expand Up @@ -82,3 +83,7 @@ class A2AConfig(BaseModel):
default_factory=_get_default_update_config,
description="Update mechanism config",
)
transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"] = Field(
default="JSONRPC",
description="Specified mode of A2A transport protocol",
)
47 changes: 42 additions & 5 deletions lib/crewai/src/crewai/a2a/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from contextlib import asynccontextmanager
from functools import lru_cache
import time
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, Literal
import uuid

from a2a.client import A2AClientHTTPError, Client, ClientConfig, ClientFactory
Expand All @@ -18,7 +18,6 @@
PushNotificationConfig as A2APushNotificationConfig,
Role,
TextPart,
TransportProtocol,
)
from aiocache import cached # type: ignore[import-untyped]
from aiocache.serializers import PickleSerializer # type: ignore[import-untyped]
Expand Down Expand Up @@ -259,6 +258,7 @@ async def _fetch_agent_card_request() -> httpx.Response:

def execute_a2a_delegation(
endpoint: str,
transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"],
auth: AuthScheme | None,
timeout: int,
task_description: str,
Expand All @@ -282,6 +282,23 @@ def execute_a2a_delegation(
use aexecute_a2a_delegation directly.

Args:
endpoint: A2A agent endpoint URL (AgentCard URL)
transport_protocol: Optional A2A transport protocol (grpc, jsonrpc, http+json)
auth: Optional AuthScheme for authentication (Bearer, OAuth2, API Key, HTTP Basic/Digest)
timeout: Request timeout in seconds
task_description: The task to delegate
context: Optional context information
context_id: Context ID for correlating messages/tasks
task_id: Specific task identifier
reference_task_ids: List of related task IDs
metadata: Additional metadata (external_id, request_id, etc.)
extensions: Protocol extensions for custom fields
conversation_history: Previous Message objects from conversation
agent_id: Agent identifier for logging
agent_role: Role of the CrewAI agent delegating the task
agent_branch: Optional agent tree branch for logging
response_model: Optional Pydantic model for structured outputs
turn_number: Optional turn number for multi-turn conversations
endpoint: A2A agent endpoint URL.
auth: Optional AuthScheme for authentication.
timeout: Request timeout in seconds.
Expand Down Expand Up @@ -323,6 +340,7 @@ def execute_a2a_delegation(
agent_role=agent_role,
agent_branch=agent_branch,
response_model=response_model,
transport_protocol=transport_protocol,
turn_number=turn_number,
updates=updates,
)
Expand All @@ -333,6 +351,7 @@ def execute_a2a_delegation(

async def aexecute_a2a_delegation(
endpoint: str,
transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"],
auth: AuthScheme | None,
timeout: int,
task_description: str,
Expand All @@ -356,6 +375,23 @@ async def aexecute_a2a_delegation(
in an async context (e.g., with Crew.akickoff() or agent.aexecute_task()).

Args:
endpoint: A2A agent endpoint URL
transport_protocol: Optional A2A transport protocol (grpc, jsonrpc, http+json)
auth: Optional AuthScheme for authentication
timeout: Request timeout in seconds
task_description: Task to delegate
context: Optional context
context_id: Context ID for correlation
task_id: Specific task identifier
reference_task_ids: Related task IDs
metadata: Additional metadata
extensions: Protocol extensions
conversation_history: Previous Message objects
turn_number: Current turn number
agent_branch: Agent tree branch for logging
agent_id: Agent identifier for logging
agent_role: Agent role for logging
response_model: Optional Pydantic model for structured outputs
endpoint: A2A agent endpoint URL.
auth: Optional AuthScheme for authentication.
timeout: Request timeout in seconds.
Expand Down Expand Up @@ -414,6 +450,7 @@ async def aexecute_a2a_delegation(
agent_role=agent_role,
response_model=response_model,
updates=updates,
transport_protocol=transport_protocol,
)

crewai_event_bus.emit(
Expand All @@ -431,6 +468,7 @@ async def aexecute_a2a_delegation(

async def _aexecute_a2a_delegation_impl(
endpoint: str,
transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"],
auth: AuthScheme | None,
timeout: int,
task_description: str,
Expand Down Expand Up @@ -524,7 +562,6 @@ async def _aexecute_a2a_delegation_impl(
extensions=extensions,
)

transport_protocol = TransportProtocol("JSONRPC")
new_messages: list[Message] = [*conversation_history, message]
crewai_event_bus.emit(
None,
Expand Down Expand Up @@ -596,7 +633,7 @@ async def _aexecute_a2a_delegation_impl(
@asynccontextmanager
async def _create_a2a_client(
agent_card: AgentCard,
transport_protocol: TransportProtocol,
transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"],
timeout: int,
headers: MutableMapping[str, str],
streaming: bool,
Expand Down Expand Up @@ -640,7 +677,7 @@ async def _create_a2a_client(

config = ClientConfig(
httpx_client=httpx_client,
supported_transports=[str(transport_protocol.value)],
supported_transports=[transport_protocol],
streaming=streaming and not use_polling,
polling=use_polling,
accepted_output_modes=["application/json"],
Expand Down
2 changes: 2 additions & 0 deletions lib/crewai/src/crewai/a2a/wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,7 @@ def _delegate_to_a2a(
response_model=agent_config.response_model,
turn_number=turn_num + 1,
updates=agent_config.updates,
transport_protocol=agent_config.transport_protocol,
)

conversation_history = a2a_result.get("history", [])
Expand Down Expand Up @@ -1085,6 +1086,7 @@ async def _adelegate_to_a2a(
agent_branch=agent_branch,
response_model=agent_config.response_model,
turn_number=turn_num + 1,
transport_protocol=agent_config.transport_protocol,
updates=agent_config.updates,
)

Expand Down
Loading