diff --git a/python/valuecell/core/agent/connect.py b/python/valuecell/core/agent/connect.py index 039cf935e..29feaeee3 100644 --- a/python/valuecell/core/agent/connect.py +++ b/python/valuecell/core/agent/connect.py @@ -33,8 +33,7 @@ class AgentContext: listener_task: Optional[asyncio.Task] = None listener_url: Optional[str] = None client: Optional[AgentClient] = None - # Planner passthrough flag derived from raw agent card JSON - planner_passthrough: bool = False + metadata: Optional[Dict[str, Any]] = None # Listener preferences desired_listener_host: Optional[str] = None desired_listener_port: Optional[int] = None @@ -51,6 +50,23 @@ class AgentContext: agent_instance_class: Optional[Type[BaseAgent]] = None agent_task: Optional[asyncio.Task] = None + def _get_metadata_flag(self, key: str) -> Optional[bool]: + """Retrieve a boolean-like flag from stored metadata or card.""" + if isinstance(self.metadata, dict) and key in self.metadata: + return bool(self.metadata[key]) + + return None + + @property + def planner_passthrough(self) -> bool: + flag = self._get_metadata_flag("planner_passthrough") + return bool(flag) if flag is not None else False + + @property + def hidden(self) -> bool: + flag = self._get_metadata_flag("hidden") + return bool(flag) if flag is not None else False + _LOCAL_AGENT_CLASS_CACHE: Dict[str, Type[Any]] = {} @@ -179,21 +195,13 @@ def _load_remote_contexts(self, agent_card_dir: str = None) -> None: continue if not agent_card_dict.get("enabled", True): continue - metadata = ( - agent_card_dict.get("metadata") - if isinstance(agent_card_dict.get("metadata"), dict) - else {} - ) - class_spec = ( - metadata.get(AGENT_METADATA_CLASS_KEY) - if isinstance(metadata, dict) - else None + raw_metadata = agent_card_dict.get("metadata") + metadata: Dict[str, Any] = ( + dict(raw_metadata) if isinstance(raw_metadata, dict) else {} ) - agent_instance_class = None - # Detect planner passthrough from raw JSON (top-level or metadata) - passthrough = bool(agent_card_dict.get("planner_passthrough")) - if not passthrough: - passthrough = bool(metadata.get("planner_passthrough")) + class_spec = metadata.get(AGENT_METADATA_CLASS_KEY) + if not isinstance(class_spec, str): + class_spec = None local_agent_card = parse_local_agent_card_dict(agent_card_dict) if not local_agent_card: continue @@ -201,11 +209,8 @@ def _load_remote_contexts(self, agent_card_dir: str = None) -> None: name=agent_name, url=local_agent_card.url, local_agent_card=local_agent_card, - planner_passthrough=passthrough, - agent_instance_class=agent_instance_class, - agent_class_spec=( - class_spec if isinstance(class_spec, str) else None - ), + metadata=metadata or None, + agent_class_spec=class_spec, ) except (json.JSONDecodeError, FileNotFoundError, KeyError) as e: logger.warning( @@ -522,10 +527,26 @@ def get_all_agent_cards(self) -> Dict[str, AgentCard]: return agent_cards + def get_planable_agent_cards(self) -> Dict[str, AgentCard]: + """Return AgentCards that are available for planning workflows.""" + self._ensure_remote_contexts_loaded() + planable_cards: Dict[str, AgentCard] = {} + for name, ctx in self._contexts.items(): + if ctx.planner_passthrough or ctx.hidden: + continue + card = None + if ctx.client and ctx.client.agent_card: + card = ctx.client.agent_card + elif ctx.local_agent_card: + card = ctx.local_agent_card + if card: + planable_cards[name] = card + return planable_cards + def is_planner_passthrough(self, agent_name: str) -> bool: """Return True if the named agent is marked as planner passthrough. - The flag is read once from raw JSON on load and cached in AgentContext. + The flag is read from stored metadata associated with the AgentContext. """ self._ensure_remote_contexts_loaded() ctx = self._contexts.get(agent_name) diff --git a/python/valuecell/core/agent/tests/test_connect.py b/python/valuecell/core/agent/tests/test_connect.py index f4b1852bb..8fdd8879c 100644 --- a/python/valuecell/core/agent/tests/test_connect.py +++ b/python/valuecell/core/agent/tests/test_connect.py @@ -376,6 +376,47 @@ async def test_get_all_agent_cards_returns_local_cards(tmp_path: Path): assert all(isinstance(card, AgentCard) for card in all_cards.values()) +def test_agent_context_reads_metadata_flags(tmp_path: Path): + dir_path = tmp_path / "agent_cards" + dir_path.mkdir(parents=True) + + card = make_card_dict("MetaVisible", "http://127.0.0.1:8910", True) + card["metadata"] = {"planner_passthrough": True, "hidden": True} + + _write_card(dir_path / "MetaVisible.json", card) + + rc = RemoteConnections() + rc.load_from_dir(str(dir_path)) + + ctx = rc._contexts["MetaVisible"] + assert ctx.metadata == card["metadata"] + assert ctx.planner_passthrough is True + assert ctx.hidden is True + + +def test_get_planable_agent_cards_filters_flags(tmp_path: Path): + dir_path = tmp_path / "agent_cards" + dir_path.mkdir(parents=True) + + visible = make_card_dict("Planable", "http://127.0.0.1:8920", True) + hidden = make_card_dict("Hidden", "http://127.0.0.1:8921", True) + passthrough = make_card_dict("Passthrough", "http://127.0.0.1:8922", True) + hidden["metadata"] = {"hidden": True} + passthrough["metadata"] = {"planner_passthrough": True} + + _write_card(dir_path / "Planable.json", visible) + _write_card(dir_path / "Hidden.json", hidden) + _write_card(dir_path / "Passthrough.json", passthrough) + + rc = RemoteConnections() + rc.load_from_dir(str(dir_path)) + + planable = rc.get_planable_agent_cards() + + assert set(planable.keys()) == {"Planable"} + assert planable["Planable"].name == "Planable" + + @pytest.mark.asyncio async def test_resolve_local_agent_class_from_metadata( tmp_path: Path, monkeypatch: pytest.MonkeyPatch diff --git a/python/valuecell/core/plan/planner.py b/python/valuecell/core/plan/planner.py index eb54d2d81..18654786f 100644 --- a/python/valuecell/core/plan/planner.py +++ b/python/valuecell/core/plan/planner.py @@ -207,14 +207,27 @@ async def _analyze_input_and_create_tasks( "Please configure a valid API key or provider settings and retry." ) - run_response = agent.run( - PlannerInput( - target_agent_name=user_input.target_agent_name, - query=user_input.query, - ), - session_id=conversation_id, - user_id=user_input.meta.user_id, - ) + # Be robust if model attributes are unavailable + try: + model = agent.model + model_description = f"{model.id} (via {model.provider})" + except Exception: + model_description = "unknown model/provider" + try: + run_response = agent.run( + PlannerInput( + target_agent_name=user_input.target_agent_name, + query=user_input.query, + ), + session_id=conversation_id, + user_id=user_input.meta.user_id, + ) + except Exception as exc: + logger.exception("Planner run failed: %s", exc) + return [], ( + f"Planner encountered an error during execution: {exc}. " + f"Please check the capabilities of your model `{model_description}` and try again later." + ) # Handle user input requests through Human-in-the-Loop workflow while run_response.is_paused: @@ -242,12 +255,6 @@ async def _analyze_input_and_create_tasks( # Parse planning result and create tasks plan_raw = run_response.content if not isinstance(plan_raw, PlannerResponse): - # Be robust if model attributes are unavailable - try: - model = agent.model - model_description = f"{model.id} (via {model.provider})" - except Exception: - model_description = "unknown model/provider" return ( [], ( @@ -264,6 +271,32 @@ async def _analyze_input_and_create_tasks( logger.info(f"Planner needs user guidance: {guidance_message}") return [], guidance_message # Return empty task list with guidance + planable_cards = self.agent_connections.get_planable_agent_cards() + planable_agent_names = set(planable_cards.keys()) + invalid_agents = { + t.agent_name + for t in plan_raw.tasks + if t.agent_name not in planable_agent_names + } + if invalid_agents: + available_agents = ( + ", ".join(sorted(planable_agent_names)) + if planable_agent_names + else "none" + ) + invalid_list = ", ".join(sorted(invalid_agents)) + guidance_message = ( + "Planner selected unsupported agent(s):" + f" {invalid_list}." + f" Maybe the chosen model `{model_description}` hallucinated." + ) + logger.warning( + "Planner proposed unsupported agents: %s (available: %s)", + invalid_list, + available_agents, + ) + return [], guidance_message + # Create tasks from planner response tasks = [] for t in plan_raw.tasks: @@ -346,7 +379,7 @@ def tool_get_agent_description(self, agent_name: str) -> str: return "The requested agent could not be found or is not available." def tool_get_enabled_agents(self) -> str: - map_agent_name_to_card = self.agent_connections.get_all_agent_cards() + map_agent_name_to_card = self.agent_connections.get_planable_agent_cards() parts = [] for agent_name, card in map_agent_name_to_card.items(): parts.append(f"<{agent_name}>") diff --git a/python/valuecell/core/plan/tests/test_planner.py b/python/valuecell/core/plan/tests/test_planner.py index 2e53a81b8..15282b3dc 100644 --- a/python/valuecell/core/plan/tests/test_planner.py +++ b/python/valuecell/core/plan/tests/test_planner.py @@ -12,12 +12,20 @@ class StubConnections: - def __init__(self, cards: dict[str, object] | None = None): + def __init__( + self, + cards: dict[str, object] | None = None, + planable: dict[str, object] | None = None, + ): self.cards = cards or {} + self.planable = planable or self.cards def get_all_agent_cards(self) -> dict[str, object]: return self.cards + def get_planable_agent_cards(self) -> dict[str, object]: + return self.planable + def get_agent_card(self, name: str): return self.cards.get(name) @@ -74,7 +82,8 @@ def continue_run(self, *args, **kwargs): ) monkeypatch.setattr(planner_mod, "agent_debug_mode_enabled", lambda: False) - planner = ExecutionPlanner(StubConnections()) + research_card = SimpleNamespace(name="ResearchAgent", description="Research") + planner = ExecutionPlanner(StubConnections({"ResearchAgent": research_card})) user_input = UserInput( query="Need super-agent handoff", @@ -140,6 +149,69 @@ async def callback(request): assert plan.guidance_message +@pytest.mark.asyncio +async def test_create_plan_rejects_non_planable_agents( + monkeypatch: pytest.MonkeyPatch, +): + invalid_plan = PlannerResponse.model_validate( + { + "adequate": True, + "reason": "ok", + "tasks": [ + { + "title": "Run hidden agent", + "query": "Do secret things", + "agent_name": "HiddenAgent", + "pattern": "once", + "schedule_config": None, + } + ], + "guidance_message": None, + } + ) + + class FakeAgent: + def __init__(self, *args, **kwargs): + self.model = SimpleNamespace(id="fake-model", provider="fake-provider") + + def run(self, *args, **kwargs): + return SimpleNamespace( + is_paused=False, + tools_requiring_user_input=[], + tools=[], + content=invalid_plan, + ) + + monkeypatch.setattr(planner_mod, "Agent", FakeAgent) + monkeypatch.setattr( + model_utils_mod, "get_model_for_agent", lambda *args, **kwargs: "stub-model" + ) + monkeypatch.setattr(planner_mod, "agent_debug_mode_enabled", lambda: False) + + allowed_card = SimpleNamespace(name="VisibleAgent", description="Visible") + planner = ExecutionPlanner( + StubConnections( + {"VisibleAgent": allowed_card}, + planable={"VisibleAgent": allowed_card}, + ) + ) + + user_input = UserInput( + query="Use hidden agent", + target_agent_name="VisibleAgent", + meta=UserInputMetadata(conversation_id="conv-3", user_id="user-3"), + ) + + async def callback(_): # pragma: no cover - should not be called + raise AssertionError("callback should not be invoked") + + plan = await planner.create_plan(user_input, callback, "thread-77") + + assert plan.tasks == [] + assert plan.guidance_message + assert "unsupported agent" in plan.guidance_message + + def test_tool_get_enabled_agents_formats_cards(monkeypatch: pytest.MonkeyPatch): # Mock create_model to avoid API key validation in CI monkeypatch.setattr( diff --git a/python/valuecell/core/super_agent/core.py b/python/valuecell/core/super_agent/core.py index ca91bd1cd..b2b7be9c6 100644 --- a/python/valuecell/core/super_agent/core.py +++ b/python/valuecell/core/super_agent/core.py @@ -93,19 +93,29 @@ async def run(self, user_input: UserInput) -> SuperAgentOutcome: reason="SuperAgent unavailable: missing model/provider configuration", ) - response = await agent.arun( - user_input.query, - session_id=user_input.meta.conversation_id, - user_id=user_input.meta.user_id, - add_history_to_context=True, - ) + try: + model = agent.model + model_description = f"{model.id} (via {model.provider})" + except Exception: + model_description = "unknown model/provider" + try: + response = await agent.arun( + user_input.query, + session_id=user_input.meta.conversation_id, + user_id=user_input.meta.user_id, + add_history_to_context=True, + ) + except Exception as e: + return SuperAgentOutcome( + decision=SuperAgentDecision.ANSWER, + reason=( + f"SuperAgent encountered an error: {e}." + f"Please check the capabilities of your model `{model_description}` and try again later." + ), + ) + outcome = response.content if not isinstance(outcome, SuperAgentOutcome): - try: - model = agent.model - model_description = f"{model.id} (via {model.provider})" - except Exception: - model_description = "unknown model/provider" answer_content = ( f"SuperAgent produced a malformed response: `{outcome}`. " f"Please check the capabilities of your model `{model_description}` and try again later."