Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ const ScheduledTaskControllerRenderer: FC<
ScheduledTaskControllerRendererProps
> = ({ content }) => {
const { task_title, task_id, task_status } = parse(content);
const [isRunning, setIsRunning] = useState(task_status !== "cancelled");
const [isRunning, setIsRunning] = useState(task_status === "running");
const { mutateAsync: cancelTask } = useCancelTask();

useEffect(() => {
setIsRunning(task_status !== "cancelled");
setIsRunning(task_status === "running");
}, [task_status]);

const handleCancel = async () => {
Expand Down
2 changes: 1 addition & 1 deletion python/configs/agents/news_agent.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ enabled: true
models:
# Primary model for news analysis, summarization, and query processing
primary:
model_id: "google/gemini-3-pro-preview"
model_id: "qwen/qwen3-max"
provider: "openrouter" # Must explicitly specify provider (not null)

# Provider-specific model mappings for fallback
Expand Down
56 changes: 56 additions & 0 deletions python/valuecell/core/conversation/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
from datetime import datetime
from typing import List, Optional

from loguru import logger

from valuecell.core.types import (
ComponentType,
ConversationItem,
ConversationItemEvent,
ResponseMetadata,
Expand Down Expand Up @@ -230,6 +233,59 @@ async def require_user_input(self, conversation_id: str) -> bool:
conversation_id, ConversationStatus.REQUIRE_USER_INPUT
)

async def update_task_component_status(
self,
task_id: str,
status: str,
error_reason: Optional[str] = None,
) -> None:
"""Update persisted scheduled task controller component's status and error reason.

For a given task_id, find the persisted conversation item with
component_type='scheduled_task_controller', update its payload's
task_status field, and set error_reason in metadata if provided.

Args:
task_id: The task identifier to search for.
status: New task status value (e.g., 'failed').
error_reason: Optional error details to store in metadata.
"""
items = await self.item_store.get_items(task_id=task_id)
for item in items:
# Check if this is a scheduled_task_controller component
if not item.payload:
continue
try:
payload_obj = json.loads(item.payload)
if (
payload_obj.get("component_type")
!= ComponentType.SCHEDULED_TASK_CONTROLLER
):
continue
except Exception:
continue

# Update task_status in payload and error_reason in metadata
try:
payload_obj = json.loads(item.payload)
content = payload_obj.get("content") or "{}"
content_obj = json.loads(content)
content_obj["task_status"] = status
payload_obj["content"] = json.dumps(content_obj)
item.payload = json.dumps(payload_obj)

# Update metadata with error reason if provided
metadata_obj = json.loads(item.metadata or "{}")
if error_reason:
metadata_obj["error_reason"] = error_reason
item.metadata = json.dumps(metadata_obj, default=str)

await self.item_store.save_item(item)
except Exception as e:
logger.warning(
f"Failed to update task component status for task {task_id}: {e}"
)

async def get_conversations_by_status(
self,
user_id: str,
Expand Down
19 changes: 19 additions & 0 deletions python/valuecell/core/conversation/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,3 +139,22 @@ async def get_conversation_items(
limit=limit,
offset=offset,
)

async def update_task_component_status(
self,
task_id: str,
status: str,
error_reason: Optional[str] = None,
) -> None:
"""Update persisted scheduled task component's status and error.

Args:
task_id: The task identifier.
status: New status (e.g., 'failed').
error_reason: Optional error reason for metadata.
"""
return await self._manager.update_task_component_status(
task_id=task_id,
status=status,
error_reason=error_reason,
)
85 changes: 85 additions & 0 deletions python/valuecell/core/conversation/tests/test_conv_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from valuecell.core.conversation.manager import ConversationManager
from valuecell.core.conversation.models import Conversation, ConversationStatus
from valuecell.core.types import ConversationItem, Role, NotifyResponseEvent
from valuecell.core.types import ComponentType


class TestConversationManager:
Expand Down Expand Up @@ -673,3 +674,87 @@ async def test_get_conversations_by_status(self):
manager.conversation_store.list_conversations.assert_called_once_with(
user_id, 20, 0
)

@pytest.mark.asyncio
async def test_update_task_component_status_updates_item(self):
"""update_task_component_status should update payload.content.task_status and metadata"""
manager = ConversationManager()

# Create a conversation item representing a scheduled_task_controller component
content_obj = {"task_id": "task-1", "task_title": "T"}
payload_obj = {
"component_type": ComponentType.SCHEDULED_TASK_CONTROLLER,
"content": json.dumps(content_obj),
}
item = ConversationItem(
item_id="item-1",
role=Role.AGENT,
event=NotifyResponseEvent.MESSAGE,
conversation_id="conv-1",
payload=json.dumps(payload_obj),
metadata="{}",
task_id="task-1",
)

manager.item_store.get_items = AsyncMock(return_value=[item])
manager.item_store.save_item = AsyncMock()

await manager.update_task_component_status("task-1", "failed", "boom")

# save_item called once with updated item
manager.item_store.save_item.assert_awaited_once()
saved_item = manager.item_store.save_item.call_args.args[0]

# payload should be JSON and contain content whose task_status is 'failed'
parsed = json.loads(saved_item.payload)
inner = json.loads(parsed.get("content"))
assert inner.get("task_status") == "failed"

# metadata should include error_reason
meta = json.loads(saved_item.metadata)
assert meta.get("error_reason") == "boom"

@pytest.mark.asyncio
async def test_update_task_component_status_skips_non_matching(self):
"""Non-matching component_type should be ignored."""
manager = ConversationManager()

payload_obj = {"component_type": "other_component", "content": "{}"}
item = ConversationItem(
item_id="item-2",
role=Role.AGENT,
event=NotifyResponseEvent.MESSAGE,
conversation_id="conv-1",
payload=json.dumps(payload_obj),
metadata="{}",
task_id="task-2",
)

manager.item_store.get_items = AsyncMock(return_value=[item])
manager.item_store.save_item = AsyncMock()

await manager.update_task_component_status("task-2", "failed", "boom")

manager.item_store.save_item.assert_not_awaited()

@pytest.mark.asyncio
async def test_update_task_component_status_skips_invalid_payload(self):
"""Invalid JSON payload should be skipped without raising."""
manager = ConversationManager()

item = ConversationItem(
item_id="item-3",
role=Role.AGENT,
event=NotifyResponseEvent.MESSAGE,
conversation_id="conv-1",
payload="not-a-json",
metadata="{}",
task_id="task-3",
)

manager.item_store.get_items = AsyncMock(return_value=[item])
manager.item_store.save_item = AsyncMock()

await manager.update_task_component_status("task-3", "failed", "boom")

manager.item_store.save_item.assert_not_awaited()
4 changes: 1 addition & 3 deletions python/valuecell/core/event/router.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import logging
from dataclasses import dataclass
from enum import Enum
from typing import List, Optional

from a2a.types import TaskState, TaskStatusUpdateEvent
from a2a.utils import get_message_text
from loguru import logger

from valuecell.core.agent.responses import EventPredicates
from valuecell.core.event.factory import ResponseFactory
Expand All @@ -14,8 +14,6 @@
CommonResponseEvent,
)

logger = logging.getLogger(__name__)


class SideEffectKind(Enum):
"""Kinds of side-effects that routing logic can request.
Expand Down
4 changes: 1 addition & 3 deletions python/valuecell/core/plan/planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@
"""

import asyncio
import logging
from datetime import datetime
from typing import Callable, List, Optional

from a2a.types import AgentCard
from agno.agent import Agent
from agno.db.in_memory import InMemoryDb
from loguru import logger

import valuecell.utils.model as model_utils_mod
from valuecell.core.agent.connect import RemoteConnections
Expand All @@ -33,8 +33,6 @@
PLANNER_INSTRUCTION,
)

logger = logging.getLogger(__name__)


class UserInputRequest:
"""
Expand Down
4 changes: 4 additions & 0 deletions python/valuecell/core/task/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@
from .executor import TaskExecutor
from .manager import TaskManager
from .models import Task, TaskPattern, TaskStatus
from .task_store import InMemoryTaskStore, SQLiteTaskStore, TaskStore

__all__ = [
"Task",
"TaskStatus",
"TaskPattern",
"TaskManager",
"TaskExecutor",
"TaskStore",
"InMemoryTaskStore",
"SQLiteTaskStore",
]
17 changes: 13 additions & 4 deletions python/valuecell/core/task/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from valuecell.core.event.router import RouteResult, SideEffectKind
from valuecell.core.event.service import EventResponseService
from valuecell.core.plan.models import ExecutionPlan
from valuecell.core.task.models import Task
from valuecell.core.task.models import Task, TaskStatus
from valuecell.core.task.service import DEFAULT_EXECUTION_POLL_INTERVAL, TaskService
from valuecell.core.task.temporal import calculate_next_execution_delay
from valuecell.core.types import (
Expand Down Expand Up @@ -173,7 +173,7 @@ async def emit_subagent_end_once() -> Optional[BaseResponse]:

try:
await self._task_service.update_task(task)
async for response in self._execute_task(
async for response in self.execute_task(
task,
thread_id,
metadata,
Expand Down Expand Up @@ -229,7 +229,7 @@ async def _emit_subagent_conversation_component(
)
return await self._event_service.emit(component)

async def _execute_task(
async def execute_task(
self,
task: Task,
thread_id: str,
Expand All @@ -238,6 +238,7 @@ async def _execute_task(
on_before_done: Optional[
Callable[[], Awaitable[Optional[BaseResponse]]]
] = None,
resumed: bool = False,
) -> AsyncGenerator[BaseResponse, None]:
task_id = task.task_id
conversation_id = task.conversation_id
Expand All @@ -256,7 +257,7 @@ async def _execute_task(
},
)

if task.is_scheduled():
if task.is_scheduled() and not resumed:
yield await self._event_service.emit(
self._event_service.factory.schedule_task_controller_component(
conversation_id=conversation_id,
Expand Down Expand Up @@ -297,6 +298,7 @@ async def _execute_task(

await self._sleep_with_cancellation(task, delay)

task = await self._task_service.get_task(task.task_id)
if task.is_finished():
logger.info(f"Task `{task.title}` ({task_id}) is finished.")
break
Expand Down Expand Up @@ -361,6 +363,12 @@ async def _execute_single_task_run(
await self._task_service.fail_task(
task.task_id, side_effect.reason or ""
)
# Sync the failure back to persisted conversation items
await self._conversation_service.manager.update_task_component_status(
task_id=task.task_id,
status=TaskStatus.FAILED.value,
error_reason=side_effect.reason,
)
if route_result.done:
return
continue
Expand All @@ -382,6 +390,7 @@ async def _execute_single_task_run(
async def _sleep_with_cancellation(self, task: Task, delay: float) -> None:
remaining = delay
while remaining > 0:
task = await self._task_service.get_task(task.task_id)
if task.is_finished():
return
sleep_for = min(self._poll_interval, remaining)
Expand Down
9 changes: 8 additions & 1 deletion python/valuecell/core/task/locator.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
import threading
from typing import Optional

from valuecell.utils.db import resolve_db_path

from .manager import TaskManager
from .service import TaskService
from .task_store import SQLiteTaskStore

_task_service: Optional[TaskService] = None
_lock = threading.Lock()
Expand All @@ -29,7 +32,11 @@ def get_task_service() -> TaskService:
if _task_service is None:
with _lock:
if _task_service is None:
_task_service = TaskService(manager=TaskManager())
db_path = resolve_db_path()
task_store = SQLiteTaskStore(db_path)
manager = TaskManager(task_store)

_task_service = TaskService(manager=manager)
return _task_service


Expand Down
Loading