Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 43 additions & 22 deletions python/valuecell/core/agent/connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ class AgentContext:
listener_task: Optional[asyncio.Task] = None
listener_url: Optional[str] = None
client: Optional[AgentClient] = None
# Planner passthrough flag derived from raw agent card JSON
planner_passthrough: bool = False
metadata: Optional[Dict[str, Any]] = None
# Listener preferences
desired_listener_host: Optional[str] = None
desired_listener_port: Optional[int] = None
Expand All @@ -51,6 +50,23 @@ class AgentContext:
agent_instance_class: Optional[Type[BaseAgent]] = None
agent_task: Optional[asyncio.Task] = None

def _get_metadata_flag(self, key: str) -> Optional[bool]:
"""Retrieve a boolean-like flag from stored metadata or card."""
if isinstance(self.metadata, dict) and key in self.metadata:
return bool(self.metadata[key])

return None

@property
def planner_passthrough(self) -> bool:
flag = self._get_metadata_flag("planner_passthrough")
return bool(flag) if flag is not None else False

@property
def hidden(self) -> bool:
flag = self._get_metadata_flag("hidden")
return bool(flag) if flag is not None else False


_LOCAL_AGENT_CLASS_CACHE: Dict[str, Type[Any]] = {}

Expand Down Expand Up @@ -179,33 +195,22 @@ def _load_remote_contexts(self, agent_card_dir: str = None) -> None:
continue
if not agent_card_dict.get("enabled", True):
continue
metadata = (
agent_card_dict.get("metadata")
if isinstance(agent_card_dict.get("metadata"), dict)
else {}
)
class_spec = (
metadata.get(AGENT_METADATA_CLASS_KEY)
if isinstance(metadata, dict)
else None
raw_metadata = agent_card_dict.get("metadata")
metadata: Dict[str, Any] = (
dict(raw_metadata) if isinstance(raw_metadata, dict) else {}
)
agent_instance_class = None
# Detect planner passthrough from raw JSON (top-level or metadata)
passthrough = bool(agent_card_dict.get("planner_passthrough"))
if not passthrough:
passthrough = bool(metadata.get("planner_passthrough"))
class_spec = metadata.get(AGENT_METADATA_CLASS_KEY)
if not isinstance(class_spec, str):
class_spec = None
local_agent_card = parse_local_agent_card_dict(agent_card_dict)
if not local_agent_card:
continue
self._contexts[agent_name] = AgentContext(
name=agent_name,
url=local_agent_card.url,
local_agent_card=local_agent_card,
planner_passthrough=passthrough,
agent_instance_class=agent_instance_class,
agent_class_spec=(
class_spec if isinstance(class_spec, str) else None
),
metadata=metadata or None,
agent_class_spec=class_spec,
)
except (json.JSONDecodeError, FileNotFoundError, KeyError) as e:
logger.warning(
Expand Down Expand Up @@ -522,10 +527,26 @@ def get_all_agent_cards(self) -> Dict[str, AgentCard]:

return agent_cards

def get_planable_agent_cards(self) -> Dict[str, AgentCard]:
"""Return AgentCards that are available for planning workflows."""
self._ensure_remote_contexts_loaded()
planable_cards: Dict[str, AgentCard] = {}
for name, ctx in self._contexts.items():
if ctx.planner_passthrough or ctx.hidden:
continue
card = None
if ctx.client and ctx.client.agent_card:
card = ctx.client.agent_card
elif ctx.local_agent_card:
card = ctx.local_agent_card
if card:
planable_cards[name] = card
return planable_cards

def is_planner_passthrough(self, agent_name: str) -> bool:
"""Return True if the named agent is marked as planner passthrough.

The flag is read once from raw JSON on load and cached in AgentContext.
The flag is read from stored metadata associated with the AgentContext.
"""
self._ensure_remote_contexts_loaded()
ctx = self._contexts.get(agent_name)
Expand Down
41 changes: 41 additions & 0 deletions python/valuecell/core/agent/tests/test_connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,47 @@ async def test_get_all_agent_cards_returns_local_cards(tmp_path: Path):
assert all(isinstance(card, AgentCard) for card in all_cards.values())


def test_agent_context_reads_metadata_flags(tmp_path: Path):
dir_path = tmp_path / "agent_cards"
dir_path.mkdir(parents=True)

card = make_card_dict("MetaVisible", "http://127.0.0.1:8910", True)
card["metadata"] = {"planner_passthrough": True, "hidden": True}

_write_card(dir_path / "MetaVisible.json", card)

rc = RemoteConnections()
rc.load_from_dir(str(dir_path))

ctx = rc._contexts["MetaVisible"]
assert ctx.metadata == card["metadata"]
assert ctx.planner_passthrough is True
assert ctx.hidden is True


def test_get_planable_agent_cards_filters_flags(tmp_path: Path):
dir_path = tmp_path / "agent_cards"
dir_path.mkdir(parents=True)

visible = make_card_dict("Planable", "http://127.0.0.1:8920", True)
hidden = make_card_dict("Hidden", "http://127.0.0.1:8921", True)
passthrough = make_card_dict("Passthrough", "http://127.0.0.1:8922", True)
hidden["metadata"] = {"hidden": True}
passthrough["metadata"] = {"planner_passthrough": True}

_write_card(dir_path / "Planable.json", visible)
_write_card(dir_path / "Hidden.json", hidden)
_write_card(dir_path / "Passthrough.json", passthrough)

rc = RemoteConnections()
rc.load_from_dir(str(dir_path))

planable = rc.get_planable_agent_cards()

assert set(planable.keys()) == {"Planable"}
assert planable["Planable"].name == "Planable"


@pytest.mark.asyncio
async def test_resolve_local_agent_class_from_metadata(
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
Expand Down
63 changes: 48 additions & 15 deletions python/valuecell/core/plan/planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,14 +207,27 @@ async def _analyze_input_and_create_tasks(
"Please configure a valid API key or provider settings and retry."
)

run_response = agent.run(
PlannerInput(
target_agent_name=user_input.target_agent_name,
query=user_input.query,
),
session_id=conversation_id,
user_id=user_input.meta.user_id,
)
# Be robust if model attributes are unavailable
try:
model = agent.model
model_description = f"{model.id} (via {model.provider})"
except Exception:
model_description = "unknown model/provider"
try:
run_response = agent.run(
PlannerInput(
target_agent_name=user_input.target_agent_name,
query=user_input.query,
),
session_id=conversation_id,
user_id=user_input.meta.user_id,
)
except Exception as exc:
logger.exception("Planner run failed: %s", exc)
return [], (
f"Planner encountered an error during execution: {exc}. "
f"Please check the capabilities of your model `{model_description}` and try again later."
)

# Handle user input requests through Human-in-the-Loop workflow
while run_response.is_paused:
Expand Down Expand Up @@ -242,12 +255,6 @@ async def _analyze_input_and_create_tasks(
# Parse planning result and create tasks
plan_raw = run_response.content
if not isinstance(plan_raw, PlannerResponse):
# Be robust if model attributes are unavailable
try:
model = agent.model
model_description = f"{model.id} (via {model.provider})"
except Exception:
model_description = "unknown model/provider"
return (
[],
(
Expand All @@ -264,6 +271,32 @@ async def _analyze_input_and_create_tasks(
logger.info(f"Planner needs user guidance: {guidance_message}")
return [], guidance_message # Return empty task list with guidance

planable_cards = self.agent_connections.get_planable_agent_cards()
planable_agent_names = set(planable_cards.keys())
invalid_agents = {
t.agent_name
for t in plan_raw.tasks
if t.agent_name not in planable_agent_names
}
if invalid_agents:
available_agents = (
", ".join(sorted(planable_agent_names))
if planable_agent_names
else "none"
)
invalid_list = ", ".join(sorted(invalid_agents))
guidance_message = (
"Planner selected unsupported agent(s):"
f" {invalid_list}."
f" Maybe the chosen model `{model_description}` hallucinated."
)
logger.warning(
"Planner proposed unsupported agents: %s (available: %s)",
invalid_list,
available_agents,
)
return [], guidance_message

# Create tasks from planner response
tasks = []
for t in plan_raw.tasks:
Expand Down Expand Up @@ -346,7 +379,7 @@ def tool_get_agent_description(self, agent_name: str) -> str:
return "The requested agent could not be found or is not available."

def tool_get_enabled_agents(self) -> str:
map_agent_name_to_card = self.agent_connections.get_all_agent_cards()
map_agent_name_to_card = self.agent_connections.get_planable_agent_cards()
parts = []
for agent_name, card in map_agent_name_to_card.items():
parts.append(f"<{agent_name}>")
Expand Down
76 changes: 74 additions & 2 deletions python/valuecell/core/plan/tests/test_planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,20 @@


class StubConnections:
def __init__(self, cards: dict[str, object] | None = None):
def __init__(
self,
cards: dict[str, object] | None = None,
planable: dict[str, object] | None = None,
):
self.cards = cards or {}
self.planable = planable or self.cards

def get_all_agent_cards(self) -> dict[str, object]:
return self.cards

def get_planable_agent_cards(self) -> dict[str, object]:
return self.planable

def get_agent_card(self, name: str):
return self.cards.get(name)

Expand Down Expand Up @@ -74,7 +82,8 @@ def continue_run(self, *args, **kwargs):
)
monkeypatch.setattr(planner_mod, "agent_debug_mode_enabled", lambda: False)

planner = ExecutionPlanner(StubConnections())
research_card = SimpleNamespace(name="ResearchAgent", description="Research")
planner = ExecutionPlanner(StubConnections({"ResearchAgent": research_card}))

user_input = UserInput(
query="Need super-agent handoff",
Expand Down Expand Up @@ -140,6 +149,69 @@ async def callback(request):
assert plan.guidance_message


@pytest.mark.asyncio
async def test_create_plan_rejects_non_planable_agents(
monkeypatch: pytest.MonkeyPatch,
):
invalid_plan = PlannerResponse.model_validate(
{
"adequate": True,
"reason": "ok",
"tasks": [
{
"title": "Run hidden agent",
"query": "Do secret things",
"agent_name": "HiddenAgent",
"pattern": "once",
"schedule_config": None,
}
],
"guidance_message": None,
}
)

class FakeAgent:
def __init__(self, *args, **kwargs):
self.model = SimpleNamespace(id="fake-model", provider="fake-provider")

def run(self, *args, **kwargs):
return SimpleNamespace(
is_paused=False,
tools_requiring_user_input=[],
tools=[],
content=invalid_plan,
)

monkeypatch.setattr(planner_mod, "Agent", FakeAgent)
monkeypatch.setattr(
model_utils_mod, "get_model_for_agent", lambda *args, **kwargs: "stub-model"
)
monkeypatch.setattr(planner_mod, "agent_debug_mode_enabled", lambda: False)

allowed_card = SimpleNamespace(name="VisibleAgent", description="Visible")
planner = ExecutionPlanner(
StubConnections(
{"VisibleAgent": allowed_card},
planable={"VisibleAgent": allowed_card},
)
)

user_input = UserInput(
query="Use hidden agent",
target_agent_name="VisibleAgent",
meta=UserInputMetadata(conversation_id="conv-3", user_id="user-3"),
)

async def callback(_): # pragma: no cover - should not be called
raise AssertionError("callback should not be invoked")

plan = await planner.create_plan(user_input, callback, "thread-77")

assert plan.tasks == []
assert plan.guidance_message
assert "unsupported agent" in plan.guidance_message


def test_tool_get_enabled_agents_formats_cards(monkeypatch: pytest.MonkeyPatch):
# Mock create_model to avoid API key validation in CI
monkeypatch.setattr(
Expand Down
32 changes: 21 additions & 11 deletions python/valuecell/core/super_agent/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,19 +93,29 @@ async def run(self, user_input: UserInput) -> SuperAgentOutcome:
reason="SuperAgent unavailable: missing model/provider configuration",
)

response = await agent.arun(
user_input.query,
session_id=user_input.meta.conversation_id,
user_id=user_input.meta.user_id,
add_history_to_context=True,
)
try:
model = agent.model
model_description = f"{model.id} (via {model.provider})"
except Exception:
model_description = "unknown model/provider"
try:
response = await agent.arun(
user_input.query,
session_id=user_input.meta.conversation_id,
user_id=user_input.meta.user_id,
add_history_to_context=True,
)
except Exception as e:
return SuperAgentOutcome(
decision=SuperAgentDecision.ANSWER,
reason=(
f"SuperAgent encountered an error: {e}."
f"Please check the capabilities of your model `{model_description}` and try again later."
),
)

outcome = response.content
if not isinstance(outcome, SuperAgentOutcome):
try:
model = agent.model
model_description = f"{model.id} (via {model.provider})"
except Exception:
model_description = "unknown model/provider"
answer_content = (
f"SuperAgent produced a malformed response: `{outcome}`. "
f"Please check the capabilities of your model `{model_description}` and try again later."
Expand Down