Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 37 additions & 11 deletions python/valuecell/core/agent/connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,13 @@ async def _resolve_local_agent_class(spec: str) -> Optional[Type[Any]]:
# Fast path: cache hit
cached = _LOCAL_AGENT_CLASS_CACHE.get(spec)
if cached is not None:
logger.debug("_resolve_local_agent_class: cache hit for '{}'", spec)
logger.info("_resolve_local_agent_class: cache hit for '{}'", spec)
return cached

logger.info(
"_resolve_local_agent_class: cache miss for '{}', delegating to executor", spec
)

loop = asyncio.get_running_loop()
# Delegate the synchronous import to the thread pool
try:
Expand Down Expand Up @@ -292,17 +296,39 @@ def preload_local_agent_classes(self) -> None:
the main thread, we sidestep this issue entirely.
"""
self._ensure_remote_contexts_loaded()
preloaded_count = 0
for name, ctx in self._contexts.items():
if ctx.agent_class_spec and ctx.agent_instance_class is None:
logger.info("Preloading agent class for '{}'", name)
cls = _resolve_local_agent_class_sync(ctx.agent_class_spec)
ctx.agent_instance_class = cls
if cls is None:
logger.warning(
"Failed to preload agent class '{}' for '{}'",
ctx.agent_class_spec,
name,
)
if not ctx.agent_class_spec:
logger.debug("Skipping preload for '{}': no agent_class_spec", name)
continue
if ctx.agent_instance_class is not None:
logger.debug("Skipping preload for '{}': class already loaded", name)
continue
logger.info(
"Preloading agent class for '{}' (spec='{}')",
name,
ctx.agent_class_spec,
)
cls = _resolve_local_agent_class_sync(ctx.agent_class_spec)
ctx.agent_instance_class = cls
if cls is None:
logger.warning(
"Failed to preload agent class '{}' for '{}'",
ctx.agent_class_spec,
name,
)
else:
preloaded_count += 1
logger.info(
"Successfully preloaded class '{}' for '{}'",
cls.__name__,
name,
)
logger.info(
"Preload complete: {}/{} agent classes loaded",
preloaded_count,
len(self._contexts),
)

# Public helper primarily for tests or tooling to load from a custom dir
def load_from_dir(self, config_dir: str) -> None:
Expand Down
79 changes: 79 additions & 0 deletions python/valuecell/core/agent/tests/test_connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,85 @@ def test_preload_agent_classes(tmp_path: Path, monkeypatch: pytest.MonkeyPatch):
assert ctx.agent_instance_class is not None


def test_preload_skips_agents_without_class_spec(tmp_path: Path):
"""Test that preload skips agents without agent_class_spec."""
dir_path = tmp_path / "agent_cards"
dir_path.mkdir(parents=True)

# Card without metadata (no local_agent_class)
card = make_card_dict(
"NoSpecAgent", "http://127.0.0.1:8102", push_notifications=False
)
with open(dir_path / "NoSpecAgent.json", "w", encoding="utf-8") as f:
json.dump(card, f)

rc = RemoteConnections()
rc.load_from_dir(str(dir_path))

ctx = rc._contexts["NoSpecAgent"]
assert ctx.agent_class_spec is None

# Preload should skip this agent (no error, just skip)
rc.preload_local_agent_classes()

# Still None since there was no spec
assert ctx.agent_instance_class is None


def test_preload_skips_already_loaded_class(tmp_path: Path):
"""Test that preload skips agents with class already loaded."""
dir_path = tmp_path / "agent_cards"
dir_path.mkdir(parents=True)

card = make_card_dict(
"PreloadedAgent", "http://127.0.0.1:8103", push_notifications=False
)
card["metadata"] = {
"local_agent_class": "valuecell.agents.prompt_strategy_agent.core:PromptBasedStrategyAgent"
}
with open(dir_path / "PreloadedAgent.json", "w", encoding="utf-8") as f:
json.dump(card, f)

rc = RemoteConnections()
rc.load_from_dir(str(dir_path))

ctx = rc._contexts["PreloadedAgent"]
# Simulate class already loaded
sentinel_class = type("SentinelClass", (), {})
ctx.agent_instance_class = sentinel_class

# Preload should skip since class is already loaded
rc.preload_local_agent_classes()

# Should still be sentinel, not replaced
assert ctx.agent_instance_class is sentinel_class


def test_preload_handles_failed_import(tmp_path: Path, monkeypatch: pytest.MonkeyPatch):
"""Test that preload warns on failed import but continues."""
dir_path = tmp_path / "agent_cards"
dir_path.mkdir(parents=True)

card = make_card_dict(
"FailAgent", "http://127.0.0.1:8104", push_notifications=False
)
card["metadata"] = {"local_agent_class": "nonexistent.module:FakeClass"}
with open(dir_path / "FailAgent.json", "w", encoding="utf-8") as f:
json.dump(card, f)

rc = RemoteConnections()
rc.load_from_dir(str(dir_path))

ctx = rc._contexts["FailAgent"]
assert ctx.agent_class_spec == "nonexistent.module:FakeClass"

# Preload should warn but not raise
rc.preload_local_agent_classes()

# Class should remain None due to failed import
assert ctx.agent_instance_class is None


@pytest.mark.asyncio
async def test_start_agent_without_listener(
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
Expand Down
11 changes: 0 additions & 11 deletions python/valuecell/server/api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,17 +138,6 @@ async def lifespan(app: FastAPI):
except Exception as e:
logger.info(f"Error configuring adapters: {e}")

# Preload local agent classes to avoid import deadlocks on Windows
try:
from valuecell.core.agent.connect import RemoteConnections

logger.info("Preloading local agent classes...")
rc = RemoteConnections()
rc.preload_local_agent_classes()
logger.info("✓ Local agent classes preloaded")
except Exception as e:
logger.warning(f"✗ Failed to preload local agent classes: {e}")

yield
# Shutdown
logger.info("ValueCell Server shutting down...")
Expand Down
26 changes: 26 additions & 0 deletions python/valuecell/server/services/agent_stream_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from loguru import logger

from valuecell.core.agent.connect import RemoteConnections
from valuecell.core.coordinate.orchestrator import AgentOrchestrator
from valuecell.core.task.executor import TaskExecutor
from valuecell.core.task.locator import get_task_service
Expand All @@ -15,13 +16,38 @@
from valuecell.utils.uuid import generate_conversation_id

_TASK_AUTORESTART_STARTED = False
_AGENT_CLASSES_PRELOADED = False


def _preload_agent_classes_once() -> None:
"""Preload local agent classes once to avoid Windows import lock deadlocks.

This must run in the main thread before any async operations that might
trigger imports in worker threads. Safe to call multiple times.
"""
global _AGENT_CLASSES_PRELOADED
if _AGENT_CLASSES_PRELOADED:
return
_AGENT_CLASSES_PRELOADED = True

try:
logger.info("Preloading local agent classes...")
rc = RemoteConnections()
rc.preload_local_agent_classes()
logger.info("✓ Local agent classes preloaded")
except Exception as e:
logger.warning(f"✗ Failed to preload local agent classes: {e}")


class AgentStreamService:
"""Service for handling streaming agent queries."""

def __init__(self):
"""Initialize the agent stream service."""
# Preload agent classes before creating orchestrator to avoid
# Windows import lock deadlocks when using thread pools
_preload_agent_classes_once()

self.orchestrator = AgentOrchestrator()
logger.info("Agent stream service initialized")

Expand Down