Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion python/configs/agent_cards/investment_research_agent.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
"tags": [
"sec filings",
"fundamental analysis"
]
],
"local_agent_class": "valuecell.agents.research_agent.core:ResearchAgent"
}
}
3 changes: 2 additions & 1 deletion python/configs/agent_cards/news_agent.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
"current events",
"financial markets",
"real-time search"
]
],
"local_agent_class": "valuecell.agents.news_agent.core:NewsAgent"
}
}
3 changes: 2 additions & 1 deletion python/configs/agent_cards/strategy_agent.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
"version": "0.1.0",
"author": "ValueCell Team",
"tags": ["strategy", "trading", "llm", "demo"],
"notes": "This card is a lightweight example; replace model api_key and tune parameters for production use."
"notes": "This card is a lightweight example; replace model api_key and tune parameters for production use.",
"local_agent_class": "valuecell.agents.strategy_agent.agent:StrategyAgent"
}
}
21 changes: 3 additions & 18 deletions python/scripts/launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,25 +125,10 @@ def main():

processes = []
logfiles = []
for selected_agent in selected_agents:
logfile_path = f"{log_dir}/{selected_agent}.log"
print(f"Starting agent: {selected_agent} - output to {logfile_path}")

# Open logfile for writing
logfile = open(logfile_path, "w")
logfiles.append(logfile)

# Launch command using Popen with output redirected to logfile
process = subprocess.Popen(
MAP_NAME_COMMAND[selected_agent], shell=True, stdout=logfile, stderr=logfile
)
processes.append(process)
print("All agents launched. Waiting for tasks...")

for selected_agent in selected_agents:
print(
f"You can monitor {selected_agent} logs at {log_dir}/{selected_agent}.log or chat on: {FRONTEND_URL}/agent/{selected_agent}"
)
print(
"Agents are now managed in-process by RemoteConnections; external processes are no longer started."
)

# Launch backend
logfile_path = f"{log_dir}/backend.log"
Expand Down
198 changes: 190 additions & 8 deletions python/valuecell/core/agent/connect.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
import asyncio
import json
import logging
from dataclasses import dataclass
from importlib import import_module
from pathlib import Path
from typing import Dict, List, Optional
from typing import Any, Dict, List, Optional, Type

from a2a.types import AgentCard
from loguru import logger

from valuecell.core.agent.card import parse_local_agent_card_dict
from valuecell.core.agent.client import AgentClient
from valuecell.core.agent.decorator import create_wrapped_agent
from valuecell.core.agent.listener import NotificationListener
from valuecell.core.types import NotificationCallbackType
from valuecell.core.types import BaseAgent, NotificationCallbackType
from valuecell.utils import get_next_available_port

logger = logging.getLogger(__name__)
AGENT_METADATA_CLASS_KEY = "local_agent_class"


@dataclass
Expand All @@ -37,6 +39,90 @@ class AgentContext:
desired_listener_host: Optional[str] = None
desired_listener_port: Optional[int] = None
notification_callback: Optional[NotificationCallbackType] = None
# Local in-process agent runtime
# - `agent_class_spec`: original "module:Class" spec loaded from JSON
# We keep the spec so class resolution can be deferred (and performed
# off the event loop) when the agent is actually started.
# - `agent_instance`: concrete wrapped agent instance (created lazily)
# - `agent_instance_class`: resolved Python class for the agent, if imported
# - `agent_task`: asyncio.Task running the agent's HTTP server (if launched)
agent_class_spec: Optional[str] = None
agent_instance: Optional[BaseAgent] = None
agent_instance_class: Optional[Type[BaseAgent]] = None
agent_task: Optional[asyncio.Task] = None


_LOCAL_AGENT_CLASS_CACHE: Dict[str, Type[Any]] = {}


def _resolve_local_agent_class(spec: str) -> Optional[Type[Any]]:
"""Resolve a `module:Class` spec to a Python class.

This function is synchronous and performs a normal import. Callers that
need to avoid blocking the event loop should invoke this via
`asyncio.to_thread(_resolve_local_agent_class, spec)`.

Results are cached in `_LOCAL_AGENT_CLASS_CACHE` to avoid repeated
imports/attribute lookups.
"""

if not spec:
return None

cached = _LOCAL_AGENT_CLASS_CACHE.get(spec)
if cached is not None:
return cached

try:
module_path, class_name = spec.split(":", 1)
module = import_module(module_path)
agent_cls = getattr(module, class_name)
except (ValueError, AttributeError, ImportError) as exc:
logger.error("Failed to import agent class '{}': {}", spec, exc)
return None

_LOCAL_AGENT_CLASS_CACHE[spec] = agent_cls
return agent_cls


async def _build_local_agent(ctx: AgentContext):
"""Asynchronously produce a wrapped local agent instance for the
given `AgentContext`.

Behavior:
- If `agent_instance_class` is already present, use it.
- Otherwise, if `agent_class_spec` is provided, resolve it off the
event loop (`asyncio.to_thread`) so imports don't block the loop.
- If resolution fails, log a warning and return `None` (caller will
treat missing factory as "no local agent available").
- The actual wrapping call (`create_wrapped_agent`) is performed on
the event loop; this preserves any asyncio-related initialization
semantics required by the wrapper (if it needs loop context).
"""

agent_cls = ctx.agent_instance_class
if agent_cls is None and ctx.agent_class_spec:
# Resolve the import in a worker thread to avoid blocking the loop.
agent_cls = await asyncio.to_thread(
_resolve_local_agent_class, ctx.agent_class_spec
)
ctx.agent_instance_class = agent_cls
if agent_cls is None:
logger.warning(
"Unable to resolve local agent class '{}' for '{}'",
ctx.agent_class_spec,
ctx.name,
)
return None

if agent_cls is None:
# No factory available for this context
return None

# `create_wrapped_agent` can perform setup that expects to run in the
# main thread / event loop context (e.g. uvicorn/async setup). Keep it
# synchronous here so any asyncio primitives are created correctly.
return create_wrapped_agent(agent_cls)


class RemoteConnections:
Expand Down Expand Up @@ -93,12 +179,21 @@ def _load_remote_contexts(self, agent_card_dir: str = None) -> None:
continue
if not agent_card_dict.get("enabled", True):
continue
metadata = (
agent_card_dict.get("metadata")
if isinstance(agent_card_dict.get("metadata"), dict)
else {}
)
class_spec = (
metadata.get(AGENT_METADATA_CLASS_KEY)
if isinstance(metadata, dict)
else None
)
agent_instance_class = None
# Detect planner passthrough from raw JSON (top-level or metadata)
passthrough = bool(agent_card_dict.get("planner_passthrough"))
if not passthrough:
meta = agent_card_dict.get("metadata") or {}
if isinstance(meta, dict):
passthrough = bool(meta.get("planner_passthrough"))
passthrough = bool(metadata.get("planner_passthrough"))
local_agent_card = parse_local_agent_card_dict(agent_card_dict)
if not local_agent_card:
continue
Expand All @@ -107,6 +202,10 @@ def _load_remote_contexts(self, agent_card_dir: str = None) -> None:
url=local_agent_card.url,
local_agent_card=local_agent_card,
planner_passthrough=passthrough,
agent_instance_class=agent_instance_class,
agent_class_spec=(
class_spec if isinstance(class_spec, str) else None
),
)
except (json.JSONDecodeError, FileNotFoundError, KeyError) as e:
logger.warning(
Expand Down Expand Up @@ -147,6 +246,8 @@ async def start_agent(
ctx.desired_listener_port = listener_port
ctx.notification_callback = notification_callback

await self._ensure_agent_runtime(ctx)

# If already connected, return card
if ctx.client and ctx.client.agent_card:
return ctx.client.agent_card
Expand Down Expand Up @@ -193,7 +294,7 @@ async def _ensure_client(self, ctx: AgentContext) -> None:
# Initialize a temporary client; only assign to context on success
tmp_client = AgentClient(url, push_notification_url=ctx.listener_url)
try:
await tmp_client.ensure_initialized()
await self._initialize_client(tmp_client, ctx)
# Ensure agent card was resolved by the resolver
if not getattr(tmp_client, "agent_card", None):
raise RuntimeError("Agent card resolution returned None")
Expand All @@ -211,6 +312,65 @@ async def _ensure_client(self, ctx: AgentContext) -> None:
logger.error(f"Failed to initialize client for '{ctx.name}' at {url}: {e}")
raise

async def _ensure_agent_runtime(self, ctx: AgentContext) -> None:
"""Launch the agent locally if a factory is available."""
# Existing running task: keep as is
if ctx.agent_task and not ctx.agent_task.done():
return

# Clean up finished tasks and propagate failures
if ctx.agent_task and ctx.agent_task.done():
try:
ctx.agent_task.result()
except Exception as exc:
raise RuntimeError(f"Agent '{ctx.name}' failed during startup") from exc
finally:
ctx.agent_task = None
ctx.agent_instance = None

if ctx.agent_instance is None:
agent_instance = await _build_local_agent(ctx)
if agent_instance is None:
return
ctx.agent_instance = agent_instance
logger.info(f"Launching in-process agent '{ctx.name}'")

if ctx.agent_task is None:
ctx.agent_task = asyncio.create_task(ctx.agent_instance.serve())
# Give the event loop a chance to schedule startup work
await asyncio.sleep(0)
if ctx.agent_task.done():
try:
ctx.agent_task.result()
except Exception as exc:
raise RuntimeError(
f"Agent '{ctx.name}' failed during startup"
) from exc
finally:
ctx.agent_task = None
ctx.agent_instance = None

async def _initialize_client(self, client: AgentClient, ctx: AgentContext) -> None:
"""Initialize client with retry for local agents."""
retries = 3 if ctx.agent_task else 1
delay = 0.2
for attempt in range(retries):
try:
await client.ensure_initialized()
return
except Exception as exc:
if attempt >= retries - 1:
raise
logger.debug(
"Retrying client initialization for '{}' ({}/{}): {}",
ctx.name,
attempt + 1,
retries,
exc,
)
await asyncio.sleep(delay)
delay = min(delay * 2, 1.0)

async def _start_listener(
self,
host: str = "localhost",
Expand Down Expand Up @@ -264,6 +424,28 @@ async def _cleanup_agent(self, agent_name: str):
ctx = self._contexts.get(agent_name)
if not ctx:
return
agent_task = ctx.agent_task
if agent_task:
if ctx.agent_instance and hasattr(ctx.agent_instance, "shutdown"):
try:
await ctx.agent_instance.shutdown()
except Exception as exc:
logger.warning(
"Error shutting down agent '{}': {}", agent_name, exc
)
try:
await asyncio.wait_for(agent_task, timeout=5)
except asyncio.TimeoutError:
agent_task.cancel()
try:
await agent_task
except asyncio.CancelledError:
pass
finally:
ctx.agent_task = None
ctx.agent_instance = None
elif ctx.agent_instance is not None:
ctx.agent_instance = None
# Close client
if ctx.client:
await ctx.client.close()
Expand Down
16 changes: 14 additions & 2 deletions python/valuecell/core/agent/decorator.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import logging
from typing import Type

Expand Down Expand Up @@ -58,6 +59,7 @@ def __init__(self, *args, **kwargs):
agent_card.url, default_scheme="http"
)
self._executor = None
self._server: uvicorn.Server | None = None

async def serve(self):
# Create AgentExecutor wrapper
Expand Down Expand Up @@ -88,9 +90,19 @@ async def serve(self):
port=self._port,
log_level="info",
)
server = uvicorn.Server(config)
self._server = uvicorn.Server(config)
logger.info(f"Starting {agent_name} server at {self.agent_card.url}")
await server.serve()
try:
await self._server.serve()
finally:
await client.aclose()
self._server = None

async def shutdown(self) -> None:
if not self._server:
return
self._server.should_exit = True
await asyncio.sleep(0)

# Preserve original class metadata
DecoratedAgent.__name__ = cls.__name__
Expand Down
Loading