Changes from all commits (50 commits)
c8c1adb  feat(react_agent): implement react agent functionality with LangGraph… (vcfgv, Dec 9, 2025)
ae3f873  refactor: simplify tool registration by removing redundant argument s… (vcfgv, Dec 9, 2025)
5c1bfad  refactor(executor): remove unused summary tool registration and updat… (vcfgv, Dec 9, 2025)
b21bdb9  refactor(graph): update type hints for state parameters to use AgentS… (vcfgv, Dec 9, 2025)
4c393ba  refactor(react_agent): update tool_name type to str and add asset_sym… (vcfgv, Dec 9, 2025)
9cd7f36  refactor(react_agent): enhance planning and execution flow with itera… (vcfgv, Dec 9, 2025)
801a842  refactor(scheduler): remove redundant scheduler_node function (vcfgv, Dec 9, 2025)
e076d38  refactor(executor): register new research tools and remove unused too… (vcfgv, Dec 10, 2025)
8bf1b47  refactor(critic): enhance decision criteria with synthesis phase guid… (vcfgv, Dec 10, 2025)
f3dd60d  refactor(react_agent): add summarizer node for generating final response (vcfgv, Dec 10, 2025)
36140d5  refactor(inquirer): enhance decision logic and context handling for u… (vcfgv, Dec 10, 2025)
c4c1256  refactor(executor): simplify summary generation with result preview f… (vcfgv, Dec 10, 2025)
19d0ced  refactor(inquirer, planner, state): introduce focus_topic for enhance… (vcfgv, Dec 10, 2025)
d9d29bd  refactor(inquirer, planner, state): remove focus_topic and enhance st… (vcfgv, Dec 10, 2025)
748d562  refactor(inquirer): streamline asset merging and enhance system promp… (vcfgv, Dec 10, 2025)
adcecc7  refactor(research): remove unused crypto search tools from research a… (vcfgv, Dec 10, 2025)
7c2a736  refactor(state): update messages type to use Annotated with List[Base… (vcfgv, Dec 10, 2025)
eab8202  refactor(executor, research): reorganize import statements for clarity (vcfgv, Dec 10, 2025)
9ec343d  refactor(inquirer, planner): enhance execution context handling and i… (vcfgv, Dec 10, 2025)
48249f6  refactor(summarizer): enhance report generation with langchain-native… (vcfgv, Dec 10, 2025)
b004919  refactor(executor): enhance progress logging with task_id and tool de… (vcfgv, Dec 10, 2025)
f0d484b  refactor(inquirer): replace SystemMessage with AIMessage for context … (vcfgv, Dec 10, 2025)
8dfd659  feat(react-agent): add FastAPI server with SSE support for chat strea… (vcfgv, Dec 10, 2025)
f20e010  refactor(inquirer, planner, summarizer, state): simplify decision mod… (vcfgv, Dec 10, 2025)
6fe8882  feat(planner): extract recent conversation context for improved task … (vcfgv, Dec 10, 2025)
9141503  refactor(react-agent): update decision model to use natural language … (vcfgv, Dec 10, 2025)
f940dca  make format (vcfgv, Dec 10, 2025)
1555362  feat(react-agent): implement streaming for React Agent execution with… (vcfgv, Dec 11, 2025)
905349a  feat(planner): add description to Task instantiation in planner_node … (vcfgv, Dec 11, 2025)
1decadd  feat(react-agent): enhance logging with task descriptions and update … (vcfgv, Dec 11, 2025)
f01ed9e  feat(react-agent): add description field to ExecutorResult for better… (vcfgv, Dec 11, 2025)
0da6c34  fix(inquirer): update conversation formatting to improve readability (vcfgv, Dec 11, 2025)
92cffcc  make lint (vcfgv, Dec 11, 2025)
f516b2d  make lint (vcfgv, Dec 11, 2025)
b7ba4c1  refactor(state): remove unused fields from state class for cleaner code (vcfgv, Dec 11, 2025)
d674512  fix(summarizer): refine Markdown guideline to emphasize key metrics o… (vcfgv, Dec 11, 2025)
01b47cb  refactor(executor): enhance task metadata handling and improve tool n… (vcfgv, Dec 11, 2025)
05d1ad6  refactor(critic): enhance user intent handling and improve execution … (vcfgv, Dec 11, 2025)
b7d3266  refactor(executor): enhance task result handling and improve executio… (vcfgv, Dec 11, 2025)
1e8a2ef  refactor(summarizer): streamline execution context handling and enhan… (vcfgv, Dec 11, 2025)
b12d2fd  refactor(planner): remove unused variables and streamline task planni… (vcfgv, Dec 11, 2025)
b41bfa4  refactor: add user context and datetime handling across agent nodes (vcfgv, Dec 11, 2025)
1736d94  add todos (vcfgv, Dec 11, 2025)
cd508e6  refactor(executor): update tool registration to include AlphaVantage … (vcfgv, Dec 11, 2025)
5b10829  refactor(orchestrator): simplify task description formatting in plan … (vcfgv, Dec 12, 2025)
4ad7e9a  feat(react-agent): implement TaskContext for tool runtime and enhance… (vcfgv, Dec 12, 2025)
6c90c3e  refactor(orchestrator, buffer): update event handling to use reasonin… (vcfgv, Dec 12, 2025)
443da57  refactor: streamline context usage in market sentiment retrieval and … (vcfgv, Dec 12, 2025)
09371a6  feat(orchestrator): implement checkpoint handling and message loading… (vcfgv, Dec 12, 2025)
bd283d6  refactor(inquirer): simplify decision logic and remove RESET status f… (vcfgv, Dec 12, 2025)
2 changes: 2 additions & 0 deletions python/pyproject.toml
@@ -31,6 +31,8 @@ dependencies = [
"ccxt>=4.5.15",
"baostock>=0.8.9",
"func-timeout>=4.3.5",
"langchain-openai>=1.1.1",
"langgraph>=1.0.4",
]

[project.optional-dependencies]
258 changes: 254 additions & 4 deletions python/uv.lock

Large diffs are not rendered by default.

Empty file.
56 changes: 56 additions & 0 deletions python/valuecell/agents/react_agent/context.py
@@ -0,0 +1,56 @@
"""Task execution context for tool runtime."""

from typing import Optional

from langchain_core.callbacks import adispatch_custom_event
from langchain_core.runnables import RunnableConfig


class TaskContext:
"""Context object passed to tools, encapsulating task metadata and event dispatch.

This context binds a task_id with the LangGraph config, allowing tools to
emit progress events and artifacts without polluting their parameter schemas.

Example:
```python
async def my_tool(symbol: str, context: Optional[TaskContext] = None) -> str:
if context:
await context.emit_progress("Fetching data...")
# ... tool logic ...
return result
```
"""

def __init__(self, task_id: str, config: RunnableConfig):
"""Initialize task context.

Args:
task_id: Unique identifier for the current task
config: LangGraph RunnableConfig for event dispatch
"""
self.task_id = task_id
self._config = config

async def emit_progress(
self,
msg: str,
step: Optional[str] = None,
) -> None:
"""Emit a progress event linked to this specific task.

Args:
msg: Human-readable progress message
percent: Optional completion percentage (0-100)
step: Optional step identifier (e.g., "fetching_income")
"""
if not msg.endswith("\n"):
msg += "\n"

payload = {
"type": "progress",
"task_id": self.task_id,
"msg": msg,
"step": step,
}
await adispatch_custom_event("tool_event", payload, config=self._config)
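
For orientation, here is a minimal sketch of how a tool might use this context, following the pattern in the docstring above. The tool name `fetch_fundamentals` and its placeholder body are illustrative, not part of this PR; the executor is assumed to construct the `TaskContext` and pass it in alongside the tool's own arguments.

```python
# Illustrative only: a hypothetical tool that reports progress through TaskContext.
# Assumes the executor builds TaskContext(task_id, config) and passes it to the tool.
from typing import Optional

from valuecell.agents.react_agent.context import TaskContext


async def fetch_fundamentals(symbol: str, context: Optional[TaskContext] = None) -> str:
    """Toy tool body: the real data call is replaced by a placeholder string."""
    if context:
        # Dispatches {"type": "progress", "task_id": ..., "msg": ..., "step": ...}
        # as a custom event named "tool_event", visible in graph.astream_events.
        await context.emit_progress(f"Fetching data for {symbol}", step="fetching")
    result = f"(placeholder fundamentals for {symbol})"
    if context:
        await context.emit_progress(f"Finished {symbol}", step="done")
    return result
```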
Empty file.
384 changes: 384 additions & 0 deletions python/valuecell/agents/react_agent/demo/index.html

Large diffs are not rendered by default.

180 changes: 180 additions & 0 deletions python/valuecell/agents/react_agent/demo/server.py
@@ -0,0 +1,180 @@
"""
FastAPI server for React Agent with SSE (Server-Sent Events) streaming.
Fixed for: Pydantic serialization, Router filtering, and Node observability.
"""

from __future__ import annotations

import json
from typing import Any

import uvicorn
from fastapi import FastAPI
from fastapi.encoders import jsonable_encoder
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from langchain_core.messages import AIMessage, HumanMessage
from loguru import logger
from pydantic import BaseModel

from valuecell.agents.react_agent.graph import get_app

app = FastAPI(title="React Agent API")

app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)


class ChatRequest(BaseModel):
message: str
thread_id: str


def format_sse(event_type: str, data: Any) -> str:
"""Format SSE message with proper JSON serialization for Pydantic objects."""
# jsonable_encoder converts Pydantic models to dicts automatically
clean_data = jsonable_encoder(data)
return f"data: {json.dumps({'type': event_type, 'data': clean_data})}\n\n"


async def event_stream_generator(user_input: str, thread_id: str):
"""
Convert LangGraph v2 event stream to frontend UI protocol.
"""
try:
graph = get_app()
inputs = {"messages": [HumanMessage(content=user_input)]}
config = {"configurable": {"thread_id": thread_id}}

logger.info(f"Stream start: {thread_id}")

async for event in graph.astream_events(inputs, config=config, version="v2"):
kind = event.get("event", "")
node = event.get("metadata", {}).get("langgraph_node", "")
data = event.get("data") or {}

# --- Helper: Check if this is a valid node output (not a router string) ---
def is_real_node_output(d):
output = d.get("output")
# Routers return strings like "wait", "plan". Nodes return dicts or Messages.
if isinstance(output, str):
return False
return True

# =================================================================
# 1. OBSERVABILITY EVENTS (Planner, Executor, Critic)
# =================================================================

# PLANNER: Emit the task list
if kind == "on_chain_end" and node == "planner":
if is_real_node_output(data):
output = data.get("output", {})
# Ensure we have a plan
if isinstance(output, dict) and "plan" in output:
yield format_sse(
"planner_update",
{
"plan": output.get("plan"),
"reasoning": output.get("strategy_update"),
},
)

# EXECUTOR: Emit specific task results (text/data)
elif kind == "on_chain_end" and node == "executor":
if is_real_node_output(data):
output = data.get("output", {})
if isinstance(output, dict) and "completed_tasks" in output:
for task_id, res in output["completed_tasks"].items():
# res structure: {'task_id': 't1', 'ok': True, 'result': '...'}
yield format_sse(
"task_result",
{
"task_id": task_id,
"status": "success" if res.get("ok") else "error",
"result": res.get(
"result"
), # This is the markdown text
},
)

# CRITIC: Emit approval/rejection logic
elif kind == "on_chain_end" and node == "critic":
if is_real_node_output(data):
output = data.get("output", {})
if isinstance(output, dict):
summary = output.get("_critic_summary")
if summary:
yield format_sse("critic_decision", summary)

# AGNO/TOOL LOGS: Intermediate progress
elif kind == "on_custom_event" and event.get("name") == "agno_event":
yield format_sse(
"tool_progress", {"node": node or "executor", "details": data}
)

# =================================================================
# 2. CHAT CONTENT EVENTS (Inquirer, Summarizer)
# =================================================================

# STREAMING CONTENT (Summarizer)
if kind == "on_chat_model_stream" and node == "summarizer":
chunk = data.get("chunk")
text = chunk.content if chunk else None
if text:
yield format_sse("content_token", {"delta": text})

# STATIC CONTENT (Inquirer / Fallback)
# Inquirer returns a full AIMessage at the end, not streamed
elif kind == "on_chain_end" and node == "inquirer":
if is_real_node_output(data):
output = data.get("output", {})
msgs = output.get("messages", [])
if msgs and isinstance(msgs, list):
last_msg = msgs[-1]
# Verify it's an AI message meant for the user
if isinstance(last_msg, AIMessage) and last_msg.content:
# Only emit if we haven't streamed this content already
# (Inquirer doesn't stream, so this is safe)
yield format_sse(
"content_token", {"delta": last_msg.content}
)

# =================================================================
# 3. UI STATE EVENTS
# =================================================================

elif kind == "on_chain_start" and node:
yield format_sse("step_change", {"step": node, "status": "started"})

elif kind == "on_chain_end" and node:
# Filter out routers for UI cleanliness
if is_real_node_output(data):
yield format_sse(
"step_change", {"step": node, "status": "completed"}
)

# End of stream
yield format_sse("done", {})
logger.info(f"Stream done: {thread_id}")

except Exception as exc:
logger.exception(f"Stream error: {exc}")
yield format_sse("error", {"message": str(exc)})


@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
return StreamingResponse(
event_stream_generator(request.message, request.thread_id),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
)


if __name__ == "__main__":
uvicorn.run("server:app", host="0.0.0.0", port=8009)
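
For reference, a minimal client sketch for the endpoint above, assuming the server is running locally on port 8009. It uses httpx (not a project dependency) purely for illustration; the message text and thread_id are arbitrary, and only `content_token` and `done` events are handled.

```python
# Illustrative SSE client only (not part of the PR): streams /chat/stream
# and prints content tokens as they arrive.
import asyncio
import json

import httpx


async def main() -> None:
    payload = {"message": "Analyze AAPL earnings", "thread_id": "demo-thread"}
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            "POST", "http://localhost:8009/chat/stream", json=payload
        ) as response:
            async for line in response.aiter_lines():
                if not line.startswith("data: "):
                    continue  # skip blank separator lines between SSE frames
                event = json.loads(line[len("data: "):])
                if event["type"] == "content_token":
                    print(event["data"]["delta"], end="", flush=True)
                elif event["type"] == "done":
                    break


if __name__ == "__main__":
    asyncio.run(main())
```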
143 changes: 143 additions & 0 deletions python/valuecell/agents/react_agent/graph.py
@@ -0,0 +1,143 @@
from __future__ import annotations

import uuid
from typing import Any

from langchain_core.runnables import RunnableConfig

from .nodes.critic import critic_node
from .nodes.executor import executor_node
from .nodes.inquirer import inquirer_node
from .nodes.planner import planner_node
from .nodes.summarizer import summarizer_node
from .state import AgentState


def _route_after_planner(state: AgentState):
    """Route after planner based on is_final flag.

    - If is_final=True: Route to critic for verification.
    - If plan has tasks: Route to executor via Send.
    - Otherwise: Route to critic as safety fallback.
    """
    try:
        from langgraph.types import Send  # type: ignore
    except Exception as exc:  # pragma: no cover
        raise RuntimeError(
            "LangGraph is required for the orchestrator. Install 'langgraph'."
        ) from exc

    is_final = state.get("is_final", False)
    plan = state.get("plan") or []

    # If planner claims done, verify with critic
    if is_final:
        return "critic"

    # If planner produced tasks, execute them in parallel
    if plan:
        return [Send("executor", {"task": t}) for t in plan]

    # Safety fallback: no tasks and not final -> go to critic
    return "critic"


async def _executor_entry(state: AgentState, config: RunnableConfig) -> dict[str, Any]:
    """Entry adapter for executor: expects a `task` injected via Send().

    Args:
        state: Agent state containing task data
        config: RunnableConfig injected by LangGraph
    """
    task = state.get("task") or {}
    return await executor_node(state, task, config)


def build_app() -> Any:
    """Build and compile the LangGraph StateGraph with memory checkpointer."""
    # Local imports to keep module import safe if langgraph isn't installed yet
    try:
        from langgraph.checkpoint.memory import MemorySaver  # type: ignore
        from langgraph.graph import END, START, StateGraph  # type: ignore
    except Exception as exc:  # pragma: no cover - import-time guard
        raise RuntimeError(
            "LangGraph is required for the orchestrator. Install 'langgraph'."
        ) from exc

    graph = StateGraph(AgentState)

    graph.add_node("inquirer", inquirer_node)
    graph.add_node("planner", planner_node)
    graph.add_node("executor", _executor_entry)
    graph.add_node("critic", critic_node)
    graph.add_node("summarizer", summarizer_node)

    graph.add_edge(START, "inquirer")

    def _route_after_inquirer(st: AgentState) -> str:
        # After refactor: Inquirer now writes `current_intent` (natural language string).
        # Route to planner when an intent is present, otherwise wait/end.
        return "plan" if st.get("current_intent") else "wait"

    graph.add_conditional_edges(
        "inquirer", _route_after_inquirer, {"plan": "planner", "wait": END}
    )

    # After planning, route based on is_final and plan content
    graph.add_conditional_edges("planner", _route_after_planner, {"critic": "critic"})

    # After executor completion, go back to planner for next iteration
    graph.add_edge("executor", "planner")

    def _route_after_critic(st: AgentState) -> str:
        na = st.get("next_action")
        val = getattr(na, "value", na)
        v = str(val).lower() if val is not None else "exit"
        if v == "replan":
            # Clear is_final flag to allow fresh planning cycle
            st["is_final"] = False
            return "replan"
        # Critic approved: route to summarizer for final report
        return "summarize"

    graph.add_conditional_edges(
        "critic", _route_after_critic, {"replan": "planner", "summarize": "summarizer"}
    )

    # Summarizer generates final report, then END
    graph.add_edge("summarizer", END)

    memory = MemorySaver()
    app = graph.compile(checkpointer=memory)
    return app


# Lazy singleton accessor to avoid import-time dependency failures
_APP_SINGLETON: Any | None = None


def get_app() -> Any:
    global _APP_SINGLETON
    if _APP_SINGLETON is None:
        _APP_SINGLETON = build_app()
    return _APP_SINGLETON


# Backwards-compat: expose `app` when available, else None until build
app: Any | None = None


async def astream_events(initial_state: dict[str, Any], config: dict | None = None):
    """Stream LangGraph events (v2) from the compiled app.

    Usage: async for ev in astream_events(state): ...
    """
    application = get_app()
    # Ensure checkpointer receives required configurable keys.
    cfg: dict = dict(config or {})
    cfg.setdefault("thread_id", "main")
    cfg.setdefault("checkpoint_ns", "react_agent")
    cfg.setdefault("checkpoint_id", str(uuid.uuid4()))

    async for ev in application.astream_events(initial_state, config=cfg, version="v2"):
        yield ev
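
A usage sketch, not part of the PR: invoking the compiled graph directly through `get_app()`, mirroring the config shape used by the demo server. The message text and thread_id are arbitrary, and running it assumes the agent's model and tool credentials are configured.

```python
# Illustrative usage only: drive the compiled StateGraph and print which
# nodes finish, based on the v2 event metadata.
import asyncio

from langchain_core.messages import HumanMessage

from valuecell.agents.react_agent.graph import get_app


async def main() -> None:
    graph = get_app()
    inputs = {"messages": [HumanMessage(content="What moved BTC this week?")]}
    config = {"configurable": {"thread_id": "demo-thread"}}
    async for event in graph.astream_events(inputs, config=config, version="v2"):
        if event.get("event") == "on_chain_end":
            node = event.get("metadata", {}).get("langgraph_node")
            if node:
                print(f"finished node: {node}")


if __name__ == "__main__":
    asyncio.run(main())
```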