Merged
Changes from all commits (34 commits)
0466d71
feat: add schedule configuration for recurring tasks and related func…
vcfgv Oct 21, 2025
5c6fb39
feat: implement task execution with optional scheduling support
vcfgv Oct 21, 2025
be340fe
fix: change default streaming parameter to True in send_message method
vcfgv Oct 21, 2025
1279356
feat: add guidance message support for inadequate execution plans
vcfgv Oct 21, 2025
71f7f84
refactor: comment out notify metadata configuration for non-ONE task …
vcfgv Oct 21, 2025
7cdae47
feat: enhance ExecutionPlanner with persistent agent configuration an…
vcfgv Oct 21, 2025
8d6ce40
Merge branch 'main' into feature/scheduled-tasks
vcfgv Oct 22, 2025
fb1c806
feat: implement task cancellation support in orchestrator and planner…
vcfgv Oct 22, 2025
9cc7d62
Revert "feat: implement task cancellation support in orchestrator and…
vcfgv Oct 22, 2025
4a4cb25
feat: add task cancellation checks during execution and sleep intervals
vcfgv Oct 22, 2025
7bc644d
feat: add title field to tasks and enhance scheduled task handling in…
vcfgv Oct 22, 2025
dc83345
feat: refactor EventPredicates methods for consistency and add Schedu…
vcfgv Oct 22, 2025
f048097
fix format
vcfgv Oct 22, 2025
9b63dd5
refactor: remove unused scheduled task waiting message code
vcfgv Oct 22, 2025
ca614f1
feat: refactor user input processing to use a background task for res…
vcfgv Oct 22, 2025
a3b898b
refactor: break orchestrator into conversation, plan, response, super…
vcfgv Oct 23, 2025
d0a2d05
refactor: rename ResponseService as EventResponseService
vcfgv Oct 24, 2025
c9c9eed
fix circular imports
vcfgv Oct 24, 2025
d2a6b9b
Merge branch 'main' into feature/scheduled-tasks
vcfgv Oct 24, 2025
1db7f8a
refactor: remove unused get_conversation_history method and update Co…
vcfgv Oct 24, 2025
dfa2747
refactor: enhance CORE_ARCHITECTURE documentation with Super Agent tr…
vcfgv Oct 24, 2025
92b5be9
Merge branch 'main' into feature/scheduled-tasks
vcfgv Oct 24, 2025
457d5a6
refactor: update guidance_message for scheduled task confirmations to…
vcfgv Oct 27, 2025
5c238ec
refactor: add pagination support to get_conversation_items method in …
vcfgv Oct 27, 2025
9fd8323
refactor: add metadata field to conversation items and update related…
vcfgv Oct 27, 2025
ad8c56c
test: add unit tests for thread_id preservation during agent handoff
vcfgv Oct 27, 2025
37f89f6
refactor: rename response_service to event_service for consistency ac…
vcfgv Oct 27, 2025
c4c6eb6
Merge branch 'main' into feature/scheduled-tasks
vcfgv Oct 27, 2025
651f4a3
fix tests
vcfgv Oct 27, 2025
9744e3a
update coverage
vcfgv Oct 27, 2025
bc80eb3
make format
vcfgv Oct 27, 2025
a58b646
add type hint
vcfgv Oct 27, 2025
5264b94
refactor: move tests
vcfgv Oct 28, 2025
c82ec98
refactor: disable Crawl4AI tools temporarily
vcfgv Oct 28, 2025
207 changes: 159 additions & 48 deletions docs/CORE_ARCHITECTURE.md
@@ -4,34 +4,103 @@ This document explains how the modules under `valuecell/core/` collaborate at runtime

## Highlights

- Async, re-entrant orchestrator: `process_user_input` is a streaming async entrypoint that can pause for HITL and resume safely.
- Planner with HITL: pauses on missing info/risky steps via `UserInputRequest` (asyncio.Event), resumes after user feedback to produce an adequate plan.
- Streaming pipeline: `Response` → `ResponseBuffer` (buffered vs immediate) → `ResponseRouter` to UI and Store, with stable item IDs for partial aggregation.
- Super Agent triage ahead of planning: a lightweight "Super Agent" analyzes the user input first and either answers directly or hands off an enriched query to the planner.
- Async, re-entrant orchestrator: `process_user_input` streams responses and now runs planning/execution in a background producer so long-running work continues even if the client disconnects.
- Planner with HITL: pauses on missing info/risky steps via `UserInputRequest`, resumes after user feedback to produce an adequate plan.
- Streaming pipeline: A2A status events → `ResponseRouter` (maps them to `BaseResponse`) → `ResponseBuffer` (annotates/aggregates) → persisted to the Store and streamed to the UI, with stable item IDs for partial aggregation.
- Agent2Agent (A2A) integration: tasks call remote agents via `a2a-sdk`; status events drive routing; agents can be wrapped by lightweight decorators/servers.
- Conversation memory: in-memory/SQLite stores enable reproducible history, fast "resume from last", and auditability.
- Robustness: typed errors, router-driven side effects (e.g., failing a task), and room for retry/backoff policies where appropriate.

## Services interaction overview

The diagram below focuses on how the orchestrator collaborates with the core services. It reflects the current code structure under `coordinate/`, `super_agent/`, `plan/`, `task/`, `event/`, and `conversation/`.

```mermaid
flowchart LR
subgraph UI[UI / Client]
end

O[Orchestrator]

CS[ConversationService]
SA[SuperAgentService]
PS[PlanService]
TE[TaskExecutor]
RC[RemoteConnections]

subgraph ES[EventResponseService]
ROuter[event → responses]
RBuf[annotate/aggregate]
end

Store[(Conversation Store)]

%% Entry & conversation lifecycle
UI -->|user_input| O
O -->|ensure/load| CS

%% Super Agent triage
O -->|run| SA
SA -- ANSWER --> O
O -->|emit message| ES
ES --> RBuf
RBuf --> Store
O --> UI

SA -- HANDOFF(enriched) --> PS

%% Planner + HITL
O -->|start_planning_task| PS
PS -- UserInputRequest --> O
O -->|require_user_input / activate| CS
PS -- ExecutionPlan --> O

%% Execution
O -->|execute_plan| TE
TE -->|send_message| RC
RC -- TaskStatusUpdateEvent --> TE
TE -->|route_task_status| ES
ES --> ROuter
ROuter --> ES
ES --> RBuf
RBuf --> Store
O -->|stream annotated responses| UI
```

Key points:

- Orchestrator is the hub: it calls SuperAgentService, PlanService, TaskExecutor, and uses ConversationService to manage statuses.
- EventResponseService performs two roles (see the sketch below):
  - Routing: maps remote task status events to typed BaseResponses via ResponseRouter.
  - Buffering & persistence: annotates with stable item IDs via ResponseBuffer and writes to the conversation store.
- Super Agent can short-circuit with a direct answer; otherwise it hands off an enriched query to the planner.
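
As a rough illustration of how these two roles compose: the `route_task_status` name appears in the diagram above, while the `BaseResponse` fields, constructor arguments, and the other method names below are assumptions rather than the actual API.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class BaseResponse:
    """Typed response derived from a remote task status event (fields illustrative)."""
    event: str
    content: str
    item_id: Optional[str] = None


class EventResponseServiceSketch:
    """Role 1: route raw A2A events; role 2: annotate, aggregate, and persist."""

    def __init__(self, router, buffer, store):
        self.router = router  # maps A2A status events to BaseResponse objects
        self.buffer = buffer  # annotates stable item IDs and aggregates partials
        self.store = store    # conversation/item store

    async def route_task_status(self, status_event):
        responses = self.router.route(status_event)                # routing
        annotated = [self.buffer.annotate(r) for r in responses]   # buffering
        for response in annotated:
            await self.store.save_item(response)                   # persistence
        return annotated
```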

## High-level flow

The orchestration loop ingests a user input, plans next steps, optionally requests human input to resolve ambiguity, and then executes tasks via remote agents (Agent2Agent, A2A). Responses stream back incrementally and are routed to the appropriate sinks (UI, logs, stores).
The orchestration loop ingests a user input, lets the Super Agent triage and possibly answer or enrich the request, then plans next steps (with HITL when needed) and executes tasks via remote agents (A2A). Responses stream back incrementally and are routed to the appropriate sinks (UI, logs, stores).

```mermaid
flowchart TD
U[User Input] --> O[Orchestrator
process_user_input]
O -->|analyze input + context| P[Planner]
U[User Input] --> O[Orchestrator process_user_input]
O --> SA[Super Agent triage]
SA -->|answer directly| SR1[Responses]
SR1 --> RB1[ResponseBuffer]
RB1 --> UI
RB1 --> ST[Store]
SA -->|handoff enriched| P[Planner]
P -->|adequate plan| PL[Plan]
P -->|needs clarification| HITL[HITL: clarification / approval]
HITL --> UI[UI / Operator]
HITL --> UI
UI -->|feedback| P
PL --> T[Tasks]
PL --> T[Task Executor]
T --> A2A[A2A calls]
A2A --> RA[Remote Agents]
RA --> SR[Streamed Responses]
SR --> RB[ResponseBuffer]
RB --> RR[ResponseRouter]
RR --> UI
RR --> ST[Store]
RA --> RR2[ResponseRouter]
RR2 --> SR2[Responses]
SR2 --> RB2[ResponseBuffer]
RB2 --> UI
RB2 --> ST[Store]
```

### Sequence: async and reentrancy
@@ -41,6 +110,7 @@
autonumber
participant U as User/UI
participant O as Orchestrator
participant SA as Super Agent
participant CS as ConversationStore/ItemStore
participant P as Planner
participant RB as ResponseBuffer
@@ -50,58 +120,87 @@
participant RA as Remote Agent

U->>O: user_input(query, meta)
O->>CS: load conversation context
O->>CS: ensure/load conversation
CS-->>O: context/items
O->>P: create_plan(user_input, callback)
alt needs clarification
P-->>O: UserInputRequest(prompt)
O-->>U: PLAN_REQUIRE_USER_INPUT(prompt)
U->>O: provide_user_input(response)
O->>P: resume with response
O->>SA: run(user_input)
alt Super Agent answers
SA-->>O: decision=ANSWER, content
O->>RB: annotate/ingest(message)
RB-->>ST: persist SaveItem(s)
O-->>U: stream
O-->>U: done
else Super Agent handoff
SA-->>O: decision=HANDOFF_TO_PLANNER, enriched_query
O->>P: create_plan(enriched_query, callback)
alt needs clarification
P-->>O: UserInputRequest(prompt)
O-->>U: PLAN_REQUIRE_USER_INPUT(prompt)
U->>O: provide_user_input(response)
O->>P: resume with response
end
P-->>O: ExecutionPlan(tasks)
loop each task
O->>A2A: execute(task)
A2A->>RA: request(stream)
RA-->>O: TaskStatusUpdateEvent (streaming)
O->>RR: route(status→responses)
RR-->>O: BaseResponse(s)
O->>RB: annotate/ingest(responses)
RB-->>ST: persist SaveItem(s)
O-->>U: stream to UI
end
O-->>U: done
end
P-->>O: ExecutionPlan(tasks)
loop each task
O->>A2A: execute(task)
A2A->>RA: request(stream)
RA-->>O: TaskStatusUpdateEvent (streaming)
O->>RB: annotate/ingest(resp)
RB-->>O: SaveItem(s)
O->>RR: route(resp)
RR-->>U: stream to UI
RR-->>ST: persist SaveItem(s)
end
O-->>U: done
```

## Orchestrator: process_user_input

The orchestrator entrypoint (conceptually `process_user_input`) receives a user message (plus context IDs) and coordinates the entire lifecycle:
The orchestrator entrypoint (`coordinate/orchestrator.py::AgentOrchestrator.process_user_input`) receives a user message (plus context IDs) and coordinates the entire lifecycle:

1. Delegate to the Planner to derive an actionable plan
2. If the plan needs confirmation or extra parameters, trigger Human-in-the-Loop (HITL)
3. Execute the plan as one or more tasks
1. Delegate to the Super Agent to triage the request: directly answer simple queries or enrich the query and hand off to planning
2. Run the Planner to derive an actionable plan; if the plan needs confirmation or extra parameters, trigger Human-in-the-Loop (HITL)
3. Execute the plan via the Task Executor
4. Stream partial responses while executing
5. Persist results and emit final responses

The orchestrator is async and re-entrant:
The orchestrator is async and re-entrant, and now decouples producers/consumers:

- All I/O boundaries (`await`) are explicit to support concurrency
- A background producer continues planning/execution even if the client disconnects; the async generator simply drains a per-call queue
- If a human confirmation is required, the orchestrator can pause, surface a checkpoint, and resume later when feedback arrives
- Reentrancy is supported by idempotent response buffering and conversation state: resuming continues from the last acknowledged step
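
A minimal sketch of this producer/consumer decoupling; the function and variable names are illustrative and not taken from `coordinate/orchestrator.py`:

```python
import asyncio
from typing import AsyncIterator, Callable


async def stream_with_background_producer(
    plan_and_execute: Callable[[], AsyncIterator[str]],
) -> AsyncIterator[str]:
    """Drain a per-call queue while planning/execution runs as a background task."""
    queue: asyncio.Queue = asyncio.Queue()
    done = object()  # sentinel marking the end of the stream

    async def produce() -> None:
        # Keeps running even if the consumer below is cancelled (client disconnect).
        try:
            async for response in plan_and_execute():
                await queue.put(response)
        finally:
            await queue.put(done)

    producer = asyncio.create_task(produce())
    try:
        while (item := await queue.get()) is not done:
            yield item
    finally:
        if producer.done():
            producer.result()  # surface any exception raised by the producer
```

A real implementation would add per-conversation queues and an explicit cancellation policy for the producer; the sketch only shows why a client disconnect does not stop planning or execution.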

### Streaming model

Responses are produced incrementally while tasks execute:

- `Response` represents typed chunks (tokens, tool results, notifications)
- `ResponseBuffer` accumulates and aggregates partials into stable snapshots
- `ResponseRouter` fans out to multiple sinks (UI streams, logs, stores)
- Remote agent status events are first mapped by `ResponseRouter` into typed `Response` objects (message chunks, reasoning, tool results, components)
- `ResponseBuffer` annotates with stable item IDs and aggregates partials, and `EventResponseService` persists them to the conversation store
- The orchestrator streams the annotated responses to the UI; persistence and streaming are decoupled from the client connection

This allows the UI to render partial progress while long-running steps (such as remote agent calls) are still in flight.
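
A sketch of the stable-item-ID aggregation idea; the real `ResponseBuffer` API under `event/` may differ, so treat the class and method names as assumptions:

```python
from collections import defaultdict


class ResponseBufferSketch:
    """Aggregates partial chunks per stable item ID into snapshots (illustrative)."""

    def __init__(self) -> None:
        self._parts = defaultdict(list)

    def ingest(self, item_id: str, chunk: str) -> str:
        """Append a partial chunk and return the current aggregated snapshot.

        Because the item ID is stable across chunks, the UI and the store can
        idempotently upsert the same item instead of appending duplicates.
        """
        self._parts[item_id].append(chunk)
        return "".join(self._parts[item_id])


buffer = ResponseBufferSketch()
buffer.ingest("item-1", "Hel")
print(buffer.ingest("item-1", "lo"))  # -> "Hello"
```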

## Super Agent: triage before planning

The Super Agent performs a quick, tool-augmented triage of the user input to decide whether it can answer directly or should hand off to the planner.

Responsibilities:

- Detect simple Q&A or retrieval-style requests that can be answered immediately
- Optionally enrich/normalize the query and provide a concise restatement for planning
- Record minimal rationale for auditability

Under the hood:

- `super_agent/core.py` defines the `SuperAgent`, decision schema (`SuperAgentOutcome`) and tool wiring
- `super_agent/prompts.py` contains the instruction and expected output schema
- `super_agent/service.py` exposes a simple façade used by the orchestrator

If the decision is ANSWER, the orchestrator streams the content and returns. If the decision is HANDOFF_TO_PLANNER, the enriched query is passed to the planner.
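
A simplified sketch of this branch: the ANSWER / HANDOFF_TO_PLANNER decisions and the `SuperAgentOutcome` schema come from the description above, while the field names and orchestrator methods below are assumptions.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class Decision(str, Enum):
    ANSWER = "answer"
    HANDOFF_TO_PLANNER = "handoff_to_planner"


@dataclass
class SuperAgentOutcomeSketch:
    decision: Decision
    content: Optional[str] = None         # direct answer when decision is ANSWER
    enriched_query: Optional[str] = None  # restated query when handing off


async def handle_triage(orchestrator, outcome: SuperAgentOutcomeSketch, user_input: str):
    if outcome.decision is Decision.ANSWER:
        await orchestrator.emit_message(outcome.content)  # stream the answer, done
        return None
    # HANDOFF_TO_PLANNER: prefer the enriched query, fall back to the raw input
    return await orchestrator.start_planning_task(outcome.enriched_query or user_input)
```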

## Planner: intent → plan (with HITL)

The Planner turns a natural-language user input into an executable plan. Its responsibilities include:
The Planner turns a natural-language user input (often enriched by the Super Agent) into an executable plan. Its responsibilities include:

- Interpreting the user’s goal and available agent capabilities
- Identifying missing parameters and ambiguities
@@ -115,19 +214,26 @@ Human-in-the-loop is integrated into planning:

Under the hood:

- `planner.py` encapsulates the decision logic
- `planner_prompts.py` centralizes prompt templates (when LLM-based planning is used)
- `coordinate/models.py` defines plan/step data models used by both planner and orchestrator
- `plan/planner.py` encapsulates the decision logic (`ExecutionPlanner` and `UserInputRequest`)
- `plan/prompts.py` centralizes prompt templates (when LLM-based planning is used)
- `plan/models.py` defines plan/step data models, consumed by the orchestrator and executor
- `plan/service.py` manages the planner lifecycle and the pending user-input registry
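
A minimal sketch of the pause/resume mechanism behind `UserInputRequest`, built on `asyncio.Event`; the exact fields and callback signature in `plan/planner.py` may differ:

```python
import asyncio
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class UserInputRequestSketch:
    prompt: str
    response: Optional[str] = None
    _event: asyncio.Event = field(default_factory=asyncio.Event)

    def provide(self, response: str) -> None:
        """Called when the user's feedback arrives (e.g., via provide_user_input)."""
        self.response = response
        self._event.set()

    async def wait(self) -> Optional[str]:
        """Awaited by the planner; suspends until feedback is provided."""
        await self._event.wait()
        return self.response


async def plan_with_clarification(request_user_input) -> dict:
    # The planner detects a missing parameter and pauses for clarification.
    req = UserInputRequestSketch(prompt="Which ticker should I monitor?")
    await request_user_input(req)  # surfaced to the UI by the orchestrator
    ticker = await req.wait()      # resumes once provide() is called
    return {"tasks": [{"title": f"Monitor {ticker}"}]}
```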

## Task execution

After planning, the orchestrator executes each task. A task is an atomic unit that typically invokes a remote agent to perform work.
After planning, the Task Executor runs each task. A task is an atomic unit that typically invokes a remote agent to perform work. Scheduled tasks are supported and re-run according to their schedule; streaming output from each run is accumulated and summarized into the scheduled task's result.

Execution characteristics:

- Tasks are awaited asynchronously; independent tasks may run concurrently when safe
- Each task emits structured responses (tool results, logs, progress) as it runs
- Failures are converted into typed errors and can trigger retries or compensating steps (policy-dependent)
- When the Super Agent hands off to a specific sub-agent, start/end components are emitted to mark that sub-agent's conversation window

Under the hood:

- `task/executor.py` streams execution, integrates scheduled task accumulation, and routes A2A events through the response service
- `task/service.py` persists and transitions task state; `task/models.py` defines the task’s shape

The conversation and item stores record inputs/outputs for reproducibility and auditing.
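
A sketch of how a scheduled task loop can interleave cancellation checks with short sleeps, consistent with the behavior described above; the task fields and helper callables are assumptions:

```python
import asyncio


async def run_scheduled_task(task, execute_once, is_cancelled, check_interval: float = 5.0):
    """Re-run a task on its schedule, checking for cancellation between short sleeps."""
    while not await is_cancelled(task.task_id):
        chunks = []
        async for chunk in execute_once(task):  # streaming output from the remote agent
            chunks.append(chunk)
        task.last_result = "".join(chunks)      # accumulated and summarized per run

        # Sleep until the next scheduled run, waking periodically so a cancellation
        # request does not have to wait out the whole interval.
        remaining = task.schedule_interval_seconds
        while remaining > 0 and not await is_cancelled(task.task_id):
            await asyncio.sleep(min(check_interval, remaining))
            remaining -= check_interval
```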

@@ -163,28 +269,33 @@

## Async & reentrancy details

- All external calls (planning, remote agents, storage) are awaited
- All external calls (super-agent triage, planning, remote agents, storage) are awaited
- A background producer runs independently of the client connection; consumers can cancel without stopping execution
- `ResponseBuffer` enables idempotent aggregation of partial output so a resumed session can safely replay or continue
- Orchestrator checkpoints (HITL) are modeled as explicit yield points; upon resumption, the same context IDs allow the flow to continue from the next step
- Execution contexts support validation (user consistency, TTL) and cleanup of expired sessions
- Backpressure: routers can apply flow control when sinks are slow

## Error handling & resilience

Typical edge cases and policies:

- Missing parameters → HITL clarification
- Super Agent errors → surfaced as structured failures; fallback to planner handoff can be policy-defined
- Planner errors → structured failure with user-facing guidance
- Agent timeouts → retry/backoff policies; partial results remain in the buffer
- Transport errors → surfaced via typed exceptions; orchestration may retry or abort
- Invalid or expired execution contexts → cancelled safely with user-facing messages
- Consistency → conversation records ensure inputs/outputs are durable
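
Where retry/backoff applies (e.g., agent timeouts), a simple policy could look like the following sketch; it is illustrative and not the policy implemented in the codebase:

```python
import asyncio


async def call_with_backoff(send, *, attempts: int = 3, base_delay: float = 1.0):
    """Retry a remote-agent call with exponential backoff on timeouts."""
    for attempt in range(attempts):
        try:
            return await send()
        except asyncio.TimeoutError:
            if attempt == attempts - 1:
                raise  # surfaced upstream as a typed/transport error
            await asyncio.sleep(base_delay * (2 ** attempt))
```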

## Extensibility

- Add a new agent: create a capability card, implement a decorated async handler, register/connect it
- Add a new store: implement the `ItemStore`/`ConversationStore` interfaces
- Add a new transport: integrate a compatible adapter and update the A2A client wiring
- Customize the Super Agent: adjust prompts/decision logic or tools; control when to answer vs handoff
- Customize planning: extend planner prompts/logic and enrich plan models
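
As a rough example of the first point, a new agent might look like this; the capability-card fields and handler shape are illustrative, and the decorator/registration step is omitted because the exact interfaces are not shown here:

```python
from typing import AsyncIterator

# Illustrative capability card; the real schema lives in the agent framework.
AGENT_CARD = {
    "name": "news-digest-agent",
    "description": "Summarizes recent news for a given ticker",
    "skills": ["news.summarize"],
}


async def news_digest(query: str) -> AsyncIterator[str]:
    """Async handler that streams partial results back over A2A (illustrative)."""
    yield f"Top headlines for {query}: ..."
    yield f"Summary for {query}: ..."
```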

---

In short, the orchestrator coordinates an async, re-entrant loop of plan → execute → stream, with human checkpoints where appropriate. Tasks talk A2A to remote agents, and the response pipeline keeps users informed in real time while maintaining durable, reproducible state.
In short, the orchestrator coordinates an async, re-entrant loop of triage → plan → execute → stream, with human checkpoints where appropriate. The Super Agent can answer or enrich before planning, tasks talk A2A to remote agents, and the response pipeline keeps users informed in real time while maintaining durable, reproducible state.
2 changes: 1 addition & 1 deletion python/valuecell/core/agent/client.py
@@ -74,7 +74,7 @@ async def send_message(
query: str,
conversation_id: str = None,
metadata: dict = None,
streaming: bool = False,
streaming: bool = True,
) -> AsyncIterator[RemoteAgentResponse]:
"""Send a message to the remote agent and return an async iterator.

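With streaming defaulting to True, callers can iterate the returned responses directly. A hedged usage sketch (it assumes `send_message` is an async generator; if it instead returns an iterator, an extra `await` is needed):

```python
async def ask_agent(client, query: str) -> None:
    # streaming now defaults to True, so responses arrive incrementally
    async for response in client.send_message(query, conversation_id="conv-1"):
        print(response)
```
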
20 changes: 10 additions & 10 deletions python/valuecell/core/agent/responses.py
@@ -225,7 +225,7 @@ class EventPredicates:
"""

@staticmethod
def is_task_completed(response_type) -> bool:
def is_task_completed(response_event) -> bool:
"""Check if the response type indicates task completion.

Args:
@@ -234,12 +234,12 @@ def is_task_completed(response_type) -> bool:
Returns:
True if the event indicates task completion
"""
return response_type in {
return response_event in {
TaskStatusEvent.TASK_COMPLETED,
}

@staticmethod
def is_task_failed(response_type) -> bool:
def is_task_failed(response_event) -> bool:
"""Check if the response type indicates task failure.

Args:
@@ -248,12 +248,12 @@ def is_task_failed(response_type) -> bool:
Returns:
True if the event indicates task failure
"""
return response_type in {
return response_event in {
TaskStatusEvent.TASK_FAILED,
}

@staticmethod
def is_tool_call(response_type) -> bool:
def is_tool_call(response_event) -> bool:
"""Check if the response type indicates a tool call event.

Args:
@@ -262,13 +262,13 @@ def is_tool_call(response_type) -> bool:
Returns:
True if the event is related to tool calls
"""
return response_type in {
return response_event in {
StreamResponseEvent.TOOL_CALL_STARTED,
StreamResponseEvent.TOOL_CALL_COMPLETED,
}

@staticmethod
def is_reasoning(response_type) -> bool:
def is_reasoning(response_event) -> bool:
"""Check if the response type indicates a reasoning event.

Args:
@@ -277,14 +277,14 @@ def is_reasoning(response_type) -> bool:
Returns:
True if the event is related to reasoning
"""
return response_type in {
return response_event in {
StreamResponseEvent.REASONING_STARTED,
StreamResponseEvent.REASONING,
StreamResponseEvent.REASONING_COMPLETED,
}

@staticmethod
def is_message(response_type) -> bool:
def is_message(response_event) -> bool:
"""Check if the response type indicates a message event.

Args:
@@ -293,7 +293,7 @@ def is_message(response_type) -> bool:
Returns:
True if the event is a message-related event
"""
return response_type in {
return response_event in {
StreamResponseEvent.MESSAGE_CHUNK,
NotifyResponseEvent.MESSAGE,
}
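
A brief usage sketch for the renamed predicates; it assumes the streamed response object exposes its event as `response.event`, which is an assumption rather than a documented attribute:

```python
def classify(response) -> str:
    """Route a streamed response by its event, using the predicates above."""
    if EventPredicates.is_message(response.event):
        return "message"
    if EventPredicates.is_reasoning(response.event):
        return "reasoning"
    if EventPredicates.is_tool_call(response.event):
        return "tool_call"
    if EventPredicates.is_task_failed(response.event):
        return "failed"
    if EventPredicates.is_task_completed(response.event):
        return "completed"
    return "other"
```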