Skip to content
Merged
439 changes: 0 additions & 439 deletions python/valuecell/core/agent/README.md

This file was deleted.

19 changes: 15 additions & 4 deletions python/valuecell/core/agent/card.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@


def parse_local_agent_card_dict(agent_card_dict: dict) -> Optional[AgentCard]:
"""Parse a dictionary into an AgentCard, filling in missing required fields.

Args:
agent_card_dict: Dictionary containing agent card data

Returns:
AgentCard instance if parsing succeeds, None if input is not a dict
"""
if not isinstance(agent_card_dict, dict):
return None
# Defined by us, remove fields that are not part of AgentCard
Expand Down Expand Up @@ -40,14 +48,17 @@ def parse_local_agent_card_dict(agent_card_dict: dict) -> Optional[AgentCard]:
def find_local_agent_card_by_agent_name(
agent_name: str, base_dir: Optional[str | Path] = None
) -> Optional[AgentCard]:
"""
Reads JSON files from agent_cards directory and returns the first one where name matches.
"""Find an agent card by name from local JSON configuration files.

Searches through JSON files in the agent_cards directory and returns the first
matching agent card where the name field matches the provided agent_name.

Args:
name: The agent name to search for
agent_name: The name of the agent to search for
base_dir: Optional base directory to search in. If None, uses default path

Returns:
Dict: The agent configuration dictionary if found, None otherwise
AgentCard instance if found and enabled, None otherwise
"""
agent_cards_path = Path(base_dir) if base_dir else Path(get_agent_card_path())

Expand Down
37 changes: 34 additions & 3 deletions python/valuecell/core/agent/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,19 @@


class AgentClient:
"""Client for communicating with remote agents via A2A protocol.

Handles HTTP communication with remote agents, including message sending
and agent card resolution. Supports both streaming and non-streaming modes.
"""

def __init__(self, agent_url: str, push_notification_url: str = None):
"""Initialize the agent client.

Args:
agent_url: URL of the remote agent
push_notification_url: Optional URL for push notifications
"""
self.agent_url = agent_url
self.push_notification_url = push_notification_url
self.agent_card = None
Expand All @@ -18,11 +30,13 @@ def __init__(self, agent_url: str, push_notification_url: str = None):
self._initialized = False

async def ensure_initialized(self):
"""Ensure the client is initialized with agent card and HTTP client."""
if not self._initialized:
await self._setup_client()
self._initialized = True

async def _setup_client(self):
"""Set up the HTTP client and resolve the agent card."""
self._httpx_client = httpx.AsyncClient(timeout=30)

config = ClientConfig(
Expand Down Expand Up @@ -55,10 +69,21 @@ async def send_message(
metadata: dict = None,
streaming: bool = False,
) -> AsyncIterator[RemoteAgentResponse]:
"""Send message to Agent.
"""Send a message to the remote agent and return an async iterator.

If `streaming` is True, return an async iterator producing (task, event) pairs.
If `streaming` is False, return the first (task, event) pair (and close the generator).
This method always returns an async iterator producing (remote_task,
event) pairs. When `streaming` is True the iterator yields streaming
events as they arrive. When `streaming` is False the iterator yields a
single (task, event) pair and then completes.

Args:
query: The user query to send to the agent.
conversation_id: Optional conversation id to correlate messages.
metadata: Optional metadata to send alongside the message.
streaming: Whether to request streaming responses from the agent.

Returns:
An async iterator yielding `RemoteAgentResponse` items (task,event).
"""
await self.ensure_initialized()

Expand Down Expand Up @@ -88,11 +113,17 @@ async def wrapper() -> AsyncIterator[RemoteAgentResponse]:
return wrapper()

async def get_agent_card(self):
"""Get the agent card from the remote agent.

Returns:
The resolved agent card
"""
await self.ensure_initialized()
card_resolver = A2ACardResolver(self._httpx_client, self.agent_url)
return await card_resolver.get_agent_card()

async def close(self):
"""Close the HTTP client and clean up resources."""
if self._httpx_client:
await self._httpx_client.aclose()
self._httpx_client = None
Expand Down
20 changes: 17 additions & 3 deletions python/valuecell/core/agent/connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

@dataclass
class AgentContext:
"""Unified context for remote agents."""
"""Unified context for remote agents.

Stores connection state, URLs, and configuration for a remote agent.
"""

name: str
# Connection/runtime state
Expand Down Expand Up @@ -200,9 +203,20 @@ async def _start_listener(
self,
host: str = "localhost",
port: Optional[int] = None,
notification_callback: callable = None,
notification_callback: NotificationCallbackType = None,
) -> tuple[asyncio.Task, str]:
"""Start a NotificationListener and return (task, url)."""
"""Start a NotificationListener and return (task, url).

Args:
host: Host to bind the listener to.
port: Optional port to bind; if None a free port will be selected.
notification_callback: Callback invoked when notifications arrive;
should conform to `NotificationCallbackType`.

Returns:
Tuple of (asyncio.Task, listener_url) where listener_url is the
http URL where notifications should be posted.
"""
if port is None:
port = get_next_available_port(5000)
listener = NotificationListener(
Expand Down
60 changes: 60 additions & 0 deletions python/valuecell/core/agent/decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@


def _serve(agent_card: AgentCard):
"""Create a decorator that wraps an agent class with server capabilities.

Args:
agent_card: The agent card containing configuration

Returns:
A decorator function that adds serve() method to agent classes
"""

def decorator(cls: Type) -> Type:
# Determine the agent name consistently
agent_name = cls.__name__
Expand Down Expand Up @@ -99,10 +108,30 @@ async def serve(self):


class GenericAgentExecutor(AgentExecutor):
"""Generic executor for BaseAgent implementations.

Handles the execution lifecycle including task creation, streaming responses,
and error handling for agents that implement the BaseAgent interface.
"""

def __init__(self, agent: BaseAgent):
"""Initialize the executor with an agent instance.

Args:
agent: The agent instance to execute
"""
self.agent = agent

async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
"""Execute the agent with the given context and event queue.

Handles task creation if needed, streams responses from the agent,
and updates task status throughout execution.

Args:
context: The request context containing user input and metadata
event_queue: Queue for sending events back to the client
"""
# Prepare query and ensure a task exists in the system
query = context.get_user_input()
task = context.current_task
Expand Down Expand Up @@ -184,15 +213,46 @@ async def execute(self, context: RequestContext, event_queue: EventQueue) -> Non
await updater.complete()

async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
"""Cancel the current agent execution.

Args:
context: The request context
event_queue: Queue for sending events

Raises:
ServerError: Always raises as cancel is not supported
"""
# Default cancel operation
raise ServerError(error=UnsupportedOperationError())


def _create_agent_executor(agent_instance):
"""Create a GenericAgentExecutor for the given agent instance.

Args:
agent_instance: The agent instance to wrap

Returns:
GenericAgentExecutor instance
"""
return GenericAgentExecutor(agent_instance)


def create_wrapped_agent(agent_class: Type[BaseAgent]):
"""Create a wrapped agent instance with server capabilities.

Loads the agent card from local configuration and wraps the agent class
with server functionality.

Args:
agent_class: The agent class to wrap

Returns:
Wrapped agent instance ready to serve

Raises:
ValueError: If no agent configuration is found
"""
# Get agent configuration from agent cards
agent_card = find_local_agent_card_by_agent_name(agent_class.__name__)
if not agent_card:
Expand Down
25 changes: 25 additions & 0 deletions python/valuecell/core/agent/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,45 @@


class NotificationListener:
"""HTTP server for receiving push notifications from agents.

Listens on a specified host and port for incoming notification requests,
validates them, and forwards them to a callback function.
"""

def __init__(
self,
host: str = "localhost",
port: int = 5000,
notification_callback: Optional[Callable] = None,
):
"""Initialize the notification listener.

Args:
host: Host to bind the server to
port: Port to listen on
notification_callback: Function to call when notifications are received
"""
self.host = host
self.port = port
self.notification_callback = notification_callback
self.app = self._create_app()

def _create_app(self):
"""Create the Starlette application with notification routes."""
app = Starlette()
app.add_route("/notify", self.handle_notification, methods=["POST"])
return app

async def handle_notification(self, request: Request):
"""Handle incoming notification requests.

Args:
request: The incoming HTTP request

Returns:
JSONResponse with status or error
"""
try:
task_dict = await request.json()
logger.info(
Expand All @@ -49,10 +71,12 @@ async def handle_notification(self, request: Request):
return JSONResponse({"error": str(e)}, status_code=500)

def start(self):
"""Start the notification listener server (blocking)."""
logger.info(f"Starting listener on {self.host}:{self.port}")
uvicorn.run(self.app, host=self.host, port=self.port)

async def start_async(self):
"""Start the notification listener server asynchronously."""
logger.info(f"Starting async listener on {self.host}:{self.port}")
config = uvicorn.Config(
self.app, host=self.host, port=self.port, log_level="info"
Expand All @@ -62,6 +86,7 @@ async def start_async(self):


def main():
"""Main entry point for running the notification listener."""
listener = NotificationListener()
listener.start()

Expand Down
Loading