From 140226763cce5b8216eb3c6d0cfdf1923eb93ea7 Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Tue, 25 Nov 2025 16:08:52 +0800 Subject: [PATCH 01/10] feat: implement task management with SQLite and in-memory storage options --- python/valuecell/core/plan/planner.py | 4 +- python/valuecell/core/task/__init__.py | 4 + python/valuecell/core/task/locator.py | 9 +- python/valuecell/core/task/manager.py | 50 ++- python/valuecell/core/task/service.py | 36 +- python/valuecell/core/task/task_store.py | 323 +++++++++++++++++ .../valuecell/core/task/tests/test_manager.py | 56 +-- .../core/task/tests/test_task_store.py | 331 ++++++++++++++++++ python/valuecell/server/db/init_db.py | 57 ++- 9 files changed, 800 insertions(+), 70 deletions(-) create mode 100644 python/valuecell/core/task/task_store.py create mode 100644 python/valuecell/core/task/tests/test_task_store.py diff --git a/python/valuecell/core/plan/planner.py b/python/valuecell/core/plan/planner.py index 18654786f..618434aba 100644 --- a/python/valuecell/core/plan/planner.py +++ b/python/valuecell/core/plan/planner.py @@ -11,8 +11,8 @@ """ import asyncio -import logging from datetime import datetime +from loguru import logger from typing import Callable, List, Optional from a2a.types import AgentCard @@ -33,8 +33,6 @@ PLANNER_INSTRUCTION, ) -logger = logging.getLogger(__name__) - class UserInputRequest: """ diff --git a/python/valuecell/core/task/__init__.py b/python/valuecell/core/task/__init__.py index 829af751e..ea63c6cf3 100644 --- a/python/valuecell/core/task/__init__.py +++ b/python/valuecell/core/task/__init__.py @@ -3,6 +3,7 @@ from .executor import TaskExecutor from .manager import TaskManager from .models import Task, TaskPattern, TaskStatus +from .task_store import InMemoryTaskStore, SQLiteTaskStore, TaskStore __all__ = [ "Task", @@ -10,4 +11,7 @@ "TaskPattern", "TaskManager", "TaskExecutor", + "TaskStore", + "InMemoryTaskStore", + "SQLiteTaskStore", ] diff --git a/python/valuecell/core/task/locator.py b/python/valuecell/core/task/locator.py index 6398c5fdc..dd83bec53 100644 --- a/python/valuecell/core/task/locator.py +++ b/python/valuecell/core/task/locator.py @@ -16,8 +16,11 @@ import threading from typing import Optional +from valuecell.utils.db import resolve_db_path + from .manager import TaskManager from .service import TaskService +from .task_store import SQLiteTaskStore _task_service: Optional[TaskService] = None _lock = threading.Lock() @@ -29,7 +32,11 @@ def get_task_service() -> TaskService: if _task_service is None: with _lock: if _task_service is None: - _task_service = TaskService(manager=TaskManager()) + db_path = resolve_db_path() + task_store = SQLiteTaskStore(db_path) + manager = TaskManager(task_store) + + _task_service = TaskService(manager=manager) return _task_service diff --git a/python/valuecell/core/task/manager.py b/python/valuecell/core/task/manager.py index e2fa4ca75..30a45b1c5 100644 --- a/python/valuecell/core/task/manager.py +++ b/python/valuecell/core/task/manager.py @@ -1,20 +1,21 @@ import asyncio from datetime import datetime -from typing import Dict +from typing import Optional from .models import Task, TaskStatus +from .task_store import InMemoryTaskStore, TaskStore class TaskManager: - """Lightweight in-memory task manager. + """Lightweight task manager with pluggable storage backend. - Simplified to remove pluggable stores. If persistence is needed later, - a thin adapter can wrap these methods. + Defaults to in-memory storage for backward compatibility. Pass a TaskStore + implementation (e.g., SQLiteTaskStore) for persistent storage. """ - def __init__(self): - # In-memory store keyed by task_id - self._tasks: Dict[str, Task] = {} + def __init__(self, store: Optional[TaskStore] = None): + # Use provided store or default to in-memory + self._store = store or InMemoryTaskStore() # Process-local concurrency guard; protects in-memory state self._lock = asyncio.Lock() @@ -25,77 +26,68 @@ async def update_task(self, task: Task) -> None: async with self._lock: # Explicit updates should refresh updated_at task.updated_at = datetime.now() - self._update_task_no_lock(task) - - def _update_task_no_lock(self, task: Task) -> None: - """Write task to store without modifying timestamps. - - Callers must hold `_lock` before invoking this method. - """ - self._tasks[task.task_id] = task + await self._store.save_task(task) # ---- internal helpers ---- - def _get_task(self, task_id: str) -> Task | None: - return self._tasks.get(task_id) + async def _get_task(self, task_id: str) -> Task | None: + return await self._store.load_task(task_id) # Task status management async def start_task(self, task_id: str) -> bool: """Start task execution""" async with self._lock: - task = self._get_task(task_id) + task = await self._get_task(task_id) if not task or task.status != TaskStatus.PENDING: return False task.start() - self._update_task_no_lock(task) + await self._store.save_task(task) return True async def complete_task(self, task_id: str) -> bool: """Complete task""" async with self._lock: - task = self._get_task(task_id) + task = await self._get_task(task_id) if not task or task.is_finished(): return False task.complete() - self._update_task_no_lock(task) + await self._store.save_task(task) return True async def fail_task(self, task_id: str, error_message: str) -> bool: """Mark task as failed""" async with self._lock: - task = self._get_task(task_id) + task = await self._get_task(task_id) if not task or task.is_finished(): return False task.fail(error_message) - self._update_task_no_lock(task) + await self._store.save_task(task) return True async def cancel_task(self, task_id: str) -> bool: """Cancel task""" async with self._lock: - task = self._get_task(task_id) + task = await self._get_task(task_id) if not task or task.is_finished(): return False task.cancel() - self._update_task_no_lock(task) + await self._store.save_task(task) return True # Batch operations async def cancel_conversation_tasks(self, conversation_id: str) -> int: """Cancel all unfinished tasks in a conversation""" async with self._lock: - tasks = [ - t for t in self._tasks.values() if t.conversation_id == conversation_id - ] + tasks = await self._store.list_tasks(conversation_id=conversation_id) cancelled_count = 0 for task in tasks: if not task.is_finished(): task.cancel() - self._update_task_no_lock(task) + await self._store.save_task(task) cancelled_count += 1 return cancelled_count diff --git a/python/valuecell/core/task/service.py b/python/valuecell/core/task/service.py index ec8970c1c..aedef4655 100644 --- a/python/valuecell/core/task/service.py +++ b/python/valuecell/core/task/service.py @@ -2,8 +2,11 @@ from __future__ import annotations +from typing import List, Optional + from valuecell.core.task.manager import TaskManager -from valuecell.core.task.models import Task +from valuecell.core.task.models import Task, TaskStatus +from valuecell.core.task.task_store import TaskStore DEFAULT_EXECUTION_POLL_INTERVAL = 0.1 @@ -11,8 +14,14 @@ class TaskService: """Expose task management independent of the orchestrator.""" - def __init__(self, manager: TaskManager | None = None) -> None: - self._manager = manager or TaskManager() + def __init__( + self, manager: TaskManager | None = None, store: TaskStore | None = None + ) -> None: + # If a store is provided but no manager, create manager with the store + if manager is None and store is not None: + self._manager = TaskManager(store=store) + else: + self._manager = manager or TaskManager() @property def manager(self) -> TaskManager: @@ -35,3 +44,24 @@ async def cancel_task(self, task_id: str) -> bool: async def cancel_conversation_tasks(self, conversation_id: str) -> int: return await self._manager.cancel_conversation_tasks(conversation_id) + + async def get_task(self, task_id: str) -> Optional[Task]: + """Get a task by ID.""" + return await self._manager._get_task(task_id) + + async def list_tasks( + self, + conversation_id: Optional[str] = None, + user_id: Optional[str] = None, + status: Optional[TaskStatus] = None, + limit: int = 100, + offset: int = 0, + ) -> List[Task]: + """List tasks with optional filters.""" + return await self._manager._store.list_tasks( + conversation_id=conversation_id, + user_id=user_id, + status=status, + limit=limit, + offset=offset, + ) diff --git a/python/valuecell/core/task/task_store.py b/python/valuecell/core/task/task_store.py new file mode 100644 index 000000000..636e5a34d --- /dev/null +++ b/python/valuecell/core/task/task_store.py @@ -0,0 +1,323 @@ +import asyncio +import sqlite3 +from abc import ABC, abstractmethod +from datetime import datetime +from typing import Dict, List, Optional + +import aiosqlite + +from .models import Task, TaskStatus + + +class TaskStore(ABC): + """Task storage abstract base class. + + Implementations should provide async methods to save, load, delete and + list tasks. + """ + + @abstractmethod + async def save_task(self, task: Task) -> None: + """Save task""" + + @abstractmethod + async def load_task(self, task_id: str) -> Optional[Task]: + """Load task""" + + @abstractmethod + async def delete_task(self, task_id: str) -> bool: + """Delete task""" + + @abstractmethod + async def list_tasks( + self, + conversation_id: Optional[str] = None, + user_id: Optional[str] = None, + status: Optional[TaskStatus] = None, + limit: int = 100, + offset: int = 0, + ) -> List[Task]: + """List tasks with optional filters.""" + + @abstractmethod + async def task_exists(self, task_id: str) -> bool: + """Check if task exists""" + + +class InMemoryTaskStore(TaskStore): + """In-memory TaskStore implementation used for testing and simple scenarios. + + Stores tasks in a dict keyed by task_id. + """ + + def __init__(self): + self._tasks: Dict[str, Task] = {} + + async def save_task(self, task: Task) -> None: + """Save task to memory""" + self._tasks[task.task_id] = task + + async def load_task(self, task_id: str) -> Optional[Task]: + """Load task from memory""" + return self._tasks.get(task_id) + + async def delete_task(self, task_id: str) -> bool: + """Delete task from memory""" + if task_id in self._tasks: + del self._tasks[task_id] + return True + return False + + async def list_tasks( + self, + conversation_id: Optional[str] = None, + user_id: Optional[str] = None, + status: Optional[TaskStatus] = None, + limit: int = 100, + offset: int = 0, + ) -> List[Task]: + """List tasks with optional filters.""" + tasks = list(self._tasks.values()) + + # Apply filters + if conversation_id is not None: + tasks = [t for t in tasks if t.conversation_id == conversation_id] + if user_id is not None: + tasks = [t for t in tasks if t.user_id == user_id] + if status is not None: + tasks = [t for t in tasks if t.status == status] + + # Sort by creation time descending + tasks.sort(key=lambda t: t.created_at, reverse=True) + + # Apply pagination + start = offset + end = offset + limit + return tasks[start:end] + + async def task_exists(self, task_id: str) -> bool: + """Check if task exists""" + return task_id in self._tasks + + def clear_all(self) -> None: + """Clear all tasks (for testing)""" + self._tasks.clear() + + def get_task_count(self) -> int: + """Get total task count (for debugging)""" + return len(self._tasks) + + +class SQLiteTaskStore(TaskStore): + """SQLite-backed task store using aiosqlite for true async I/O. + + Lazily initializes the database schema on first use. Uses aiosqlite to + perform non-blocking DB operations and converts rows to Task instances. + """ + + def __init__(self, db_path: str): + self.db_path = db_path + self._initialized = False + self._init_lock = None # lazy to avoid loop-binding in __init__ + + async def _ensure_initialized(self): + """Ensure database is initialized with proper schema.""" + if self._initialized: + return + + if self._init_lock is None: + self._init_lock = asyncio.Lock() + + async with self._init_lock: + if self._initialized: + return + + async with aiosqlite.connect(self.db_path) as db: + await db.execute( + """ + CREATE TABLE IF NOT EXISTS tasks ( + task_id TEXT PRIMARY KEY, + title TEXT, + query TEXT NOT NULL, + conversation_id TEXT NOT NULL, + thread_id TEXT NOT NULL, + user_id TEXT NOT NULL, + agent_name TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'pending', + pattern TEXT NOT NULL DEFAULT 'once', + schedule_config TEXT, + handoff_from_super_agent INTEGER DEFAULT 0, + created_at TEXT NOT NULL, + started_at TEXT, + completed_at TEXT, + updated_at TEXT NOT NULL, + error_message TEXT + ) + """ + ) + # Create indexes for common queries + await db.execute( + "CREATE INDEX IF NOT EXISTS idx_tasks_conversation ON tasks(conversation_id)" + ) + await db.execute( + "CREATE INDEX IF NOT EXISTS idx_tasks_user ON tasks(user_id)" + ) + await db.execute( + "CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks(status)" + ) + await db.commit() + + self._initialized = True + + @staticmethod + def _row_to_task(row: sqlite3.Row) -> Task: + """Convert database row to Task object.""" + import json + + # Parse JSON fields + schedule_config = None + if row["schedule_config"]: + try: + schedule_config = json.loads(row["schedule_config"]) + except Exception: + pass + + return Task( + task_id=row["task_id"], + title=row["title"] or "", + query=row["query"], + conversation_id=row["conversation_id"], + thread_id=row["thread_id"], + user_id=row["user_id"], + agent_name=row["agent_name"], + status=row["status"], + pattern=row["pattern"], + schedule_config=schedule_config, + handoff_from_super_agent=bool(row["handoff_from_super_agent"]), + created_at=datetime.fromisoformat(row["created_at"]), + started_at=datetime.fromisoformat(row["started_at"]) + if row["started_at"] + else None, + completed_at=datetime.fromisoformat(row["completed_at"]) + if row["completed_at"] + else None, + updated_at=datetime.fromisoformat(row["updated_at"]), + error_message=row["error_message"], + ) + + async def save_task(self, task: Task) -> None: + """Save task to SQLite database.""" + import json + + await self._ensure_initialized() + + # Serialize complex fields + schedule_config_json = None + if task.schedule_config: + schedule_config_json = json.dumps(task.schedule_config.model_dump()) + + async with aiosqlite.connect(self.db_path) as db: + await db.execute( + """ + INSERT OR REPLACE INTO tasks ( + task_id, title, query, conversation_id, thread_id, user_id, agent_name, + status, pattern, schedule_config, handoff_from_super_agent, + created_at, started_at, completed_at, updated_at, error_message + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + task.task_id, + task.title, + task.query, + task.conversation_id, + task.thread_id, + task.user_id, + task.agent_name, + task.status.value + if hasattr(task.status, "value") + else str(task.status), + task.pattern.value + if hasattr(task.pattern, "value") + else str(task.pattern), + schedule_config_json, + int(task.handoff_from_super_agent), + task.created_at.isoformat(), + task.started_at.isoformat() if task.started_at else None, + task.completed_at.isoformat() if task.completed_at else None, + task.updated_at.isoformat(), + task.error_message, + ), + ) + await db.commit() + + async def load_task(self, task_id: str) -> Optional[Task]: + """Load task from SQLite database.""" + await self._ensure_initialized() + async with aiosqlite.connect(self.db_path) as db: + db.row_factory = sqlite3.Row + cur = await db.execute( + "SELECT * FROM tasks WHERE task_id = ?", + (task_id,), + ) + row = await cur.fetchone() + return self._row_to_task(row) if row else None + + async def delete_task(self, task_id: str) -> bool: + """Delete task from SQLite database.""" + await self._ensure_initialized() + async with aiosqlite.connect(self.db_path) as db: + cur = await db.execute( + "DELETE FROM tasks WHERE task_id = ?", + (task_id,), + ) + await db.commit() + return cur.rowcount > 0 + + async def list_tasks( + self, + conversation_id: Optional[str] = None, + user_id: Optional[str] = None, + status: Optional[TaskStatus] = None, + limit: int = 100, + offset: int = 0, + ) -> List[Task]: + """List tasks from SQLite database with optional filters.""" + await self._ensure_initialized() + + # Build query with filters + query = "SELECT * FROM tasks WHERE 1=1" + params = [] + + if conversation_id is not None: + query += " AND conversation_id = ?" + params.append(conversation_id) + + if user_id is not None: + query += " AND user_id = ?" + params.append(user_id) + + if status is not None: + query += " AND status = ?" + params.append( + status.value if hasattr(status, "value") else str(status) + ) + + query += " ORDER BY created_at DESC LIMIT ? OFFSET ?" + params.extend([limit, offset]) + + async with aiosqlite.connect(self.db_path) as db: + db.row_factory = sqlite3.Row + cur = await db.execute(query, params) + rows = await cur.fetchall() + return [self._row_to_task(row) for row in rows] + + async def task_exists(self, task_id: str) -> bool: + """Check if task exists in SQLite database.""" + await self._ensure_initialized() + async with aiosqlite.connect(self.db_path) as db: + cur = await db.execute( + "SELECT 1 FROM tasks WHERE task_id = ?", + (task_id,), + ) + row = await cur.fetchone() + return row is not None diff --git a/python/valuecell/core/task/tests/test_manager.py b/python/valuecell/core/task/tests/test_manager.py index 0775c6575..414d393f0 100644 --- a/python/valuecell/core/task/tests/test_manager.py +++ b/python/valuecell/core/task/tests/test_manager.py @@ -14,10 +14,12 @@ class TestTaskManager: """Test TaskManager class.""" - def test_init(self): + @pytest.mark.asyncio + async def test_init(self): """Test TaskManager initialization.""" manager = TaskManager() - assert manager._tasks == {} + tasks = await manager._store.list_tasks() + assert tasks == [] @pytest.mark.asyncio async def test_update_task(self): @@ -38,9 +40,11 @@ async def test_update_task(self): await manager.update_task(task) assert task.updated_at == update_time - assert manager._tasks["test-task-123"] == task + stored_task = await manager._store.load_task("test-task-123") + assert stored_task == task - def test_get_task_existing(self): + @pytest.mark.asyncio + async def test_get_task_existing(self): """Test _get_task with existing task.""" manager = TaskManager() task = Task( @@ -50,16 +54,17 @@ def test_get_task_existing(self): user_id="user-123", agent_name="test-agent", ) - manager._tasks["test-task-123"] = task + await manager._store.save_task(task) - result = manager._get_task("test-task-123") + result = await manager._get_task("test-task-123") assert result == task - def test_get_task_nonexistent(self): + @pytest.mark.asyncio + async def test_get_task_nonexistent(self): """Test _get_task with nonexistent task.""" manager = TaskManager() - result = manager._get_task("nonexistent-task") + result = await manager._get_task("nonexistent-task") assert result is None @pytest.mark.asyncio @@ -74,7 +79,7 @@ async def test_start_task_success(self): agent_name="test-agent", status=TaskStatus.PENDING, ) - manager._tasks["test-task-123"] = task + await manager._store.save_task(task) with ( patch("valuecell.core.task.models.datetime") as mock_datetime, @@ -109,7 +114,7 @@ async def test_start_task_already_running(self): agent_name="test-agent", status=TaskStatus.RUNNING, ) - manager._tasks["test-task-123"] = task + await manager._store.save_task(task) result = await manager.start_task("test-task-123") assert result is False @@ -126,7 +131,7 @@ async def test_complete_task_success(self): agent_name="test-agent", status=TaskStatus.RUNNING, ) - manager._tasks["test-task-123"] = task + await manager._store.save_task(task) with ( patch("valuecell.core.task.models.datetime") as mock_datetime, @@ -137,6 +142,7 @@ async def test_complete_task_success(self): result = await manager.complete_task("test-task-123") assert result is True + task = await manager._get_task("test-task-123") assert task.status == TaskStatus.COMPLETED assert task.completed_at == complete_time assert task.updated_at == complete_time @@ -161,7 +167,7 @@ async def test_complete_task_already_finished(self): agent_name="test-agent", status=TaskStatus.COMPLETED, ) - manager._tasks["test-task-123"] = task + await manager._store.save_task(task) result = await manager.complete_task("test-task-123") assert result is False @@ -178,7 +184,7 @@ async def test_fail_task_success(self): agent_name="test-agent", status=TaskStatus.RUNNING, ) - manager._tasks["test-task-123"] = task + await manager._store.save_task(task) with ( patch("valuecell.core.task.models.datetime") as mock_datetime, @@ -189,6 +195,7 @@ async def test_fail_task_success(self): result = await manager.fail_task("test-task-123", "Test error") assert result is True + task = await manager._get_task("test-task-123") assert task.status == TaskStatus.FAILED assert task.completed_at == fail_time assert task.updated_at == fail_time @@ -214,7 +221,7 @@ async def test_fail_task_already_finished(self): agent_name="test-agent", status=TaskStatus.FAILED, ) - manager._tasks["test-task-123"] = task + await manager._store.save_task(task) result = await manager.fail_task("test-task-123", "Test error") assert result is False @@ -231,7 +238,7 @@ async def test_cancel_task_success(self): agent_name="test-agent", status=TaskStatus.RUNNING, ) - manager._tasks["test-task-123"] = task + await manager._store.save_task(task) with ( patch("valuecell.core.task.models.datetime") as mock_datetime, @@ -242,6 +249,7 @@ async def test_cancel_task_success(self): result = await manager.cancel_task("test-task-123") assert result is True + task = await manager._get_task("test-task-123") assert task.status == TaskStatus.CANCELLED assert task.completed_at == cancel_time assert task.updated_at == cancel_time @@ -266,7 +274,7 @@ async def test_cancel_task_already_finished(self): agent_name="test-agent", status=TaskStatus.COMPLETED, ) - manager._tasks["test-task-123"] = task + await manager._store.save_task(task) result = await manager.cancel_task("test-task-123") assert result is False @@ -310,12 +318,10 @@ async def test_cancel_conversation_tasks(self): status=TaskStatus.RUNNING, ) - manager._tasks = { - "task-1": task1, - "task-2": task2, - "task-3": task3, - "task-4": task4, - } + await manager._store.save_task(task1) + await manager._store.save_task(task2) + await manager._store.save_task(task3) + await manager._store.save_task(task4) with ( patch("valuecell.core.task.models.datetime") as mock_datetime, @@ -326,6 +332,10 @@ async def test_cancel_conversation_tasks(self): result = await manager.cancel_conversation_tasks("conv-123") assert result == 2 # Two tasks were cancelled + task1 = await manager._get_task("task-1") + task2 = await manager._get_task("task-2") + task3 = await manager._get_task("task-3") + task4 = await manager._get_task("task-4") assert task1.status == TaskStatus.CANCELLED assert task1.completed_at == cancel_time assert task1.updated_at == cancel_time @@ -358,7 +368,7 @@ async def test_cancel_conversation_tasks_all_finished(self): agent_name="test-agent", status=TaskStatus.COMPLETED, ) - manager._tasks["task-1"] = task + await manager._store.save_task(task) result = await manager.cancel_conversation_tasks("conv-123") assert result == 0 diff --git a/python/valuecell/core/task/tests/test_task_store.py b/python/valuecell/core/task/tests/test_task_store.py new file mode 100644 index 000000000..747011591 --- /dev/null +++ b/python/valuecell/core/task/tests/test_task_store.py @@ -0,0 +1,331 @@ +""" +Unit tests for valuecell.core.task.task_store module +""" + +import tempfile +from pathlib import Path + +import pytest + +from valuecell.core.task.models import Task, TaskStatus +from valuecell.core.task.task_store import InMemoryTaskStore, SQLiteTaskStore + + +class TestInMemoryTaskStore: + """Test InMemoryTaskStore class.""" + + @pytest.mark.asyncio + async def test_save_and_load_task(self): + """Test saving and loading a task.""" + store = InMemoryTaskStore() + task = Task( + task_id="test-task-123", + query="Test query", + conversation_id="conv-123", + user_id="user-123", + agent_name="test-agent", + ) + + await store.save_task(task) + loaded_task = await store.load_task("test-task-123") + + assert loaded_task is not None + assert loaded_task.task_id == task.task_id + assert loaded_task.query == task.query + + @pytest.mark.asyncio + async def test_load_nonexistent_task(self): + """Test loading a task that doesn't exist.""" + store = InMemoryTaskStore() + loaded_task = await store.load_task("nonexistent") + + assert loaded_task is None + + @pytest.mark.asyncio + async def test_delete_task(self): + """Test deleting a task.""" + store = InMemoryTaskStore() + task = Task( + task_id="test-task-123", + query="Test query", + conversation_id="conv-123", + user_id="user-123", + agent_name="test-agent", + ) + + await store.save_task(task) + result = await store.delete_task("test-task-123") + + assert result is True + loaded_task = await store.load_task("test-task-123") + assert loaded_task is None + + @pytest.mark.asyncio + async def test_delete_nonexistent_task(self): + """Test deleting a task that doesn't exist.""" + store = InMemoryTaskStore() + result = await store.delete_task("nonexistent") + + assert result is False + + @pytest.mark.asyncio + async def test_list_tasks(self): + """Test listing all tasks.""" + store = InMemoryTaskStore() + task1 = Task( + task_id="task-1", + query="Query 1", + conversation_id="conv-123", + user_id="user-123", + agent_name="agent-1", + ) + task2 = Task( + task_id="task-2", + query="Query 2", + conversation_id="conv-456", + user_id="user-456", + agent_name="agent-2", + ) + + await store.save_task(task1) + await store.save_task(task2) + + tasks = await store.list_tasks() + assert len(tasks) == 2 + + @pytest.mark.asyncio + async def test_list_tasks_by_conversation(self): + """Test listing tasks filtered by conversation_id.""" + store = InMemoryTaskStore() + task1 = Task( + task_id="task-1", + query="Query 1", + conversation_id="conv-123", + user_id="user-123", + agent_name="agent-1", + ) + task2 = Task( + task_id="task-2", + query="Query 2", + conversation_id="conv-456", + user_id="user-123", + agent_name="agent-2", + ) + + await store.save_task(task1) + await store.save_task(task2) + + tasks = await store.list_tasks(conversation_id="conv-123") + assert len(tasks) == 1 + assert tasks[0].conversation_id == "conv-123" + + @pytest.mark.asyncio + async def test_list_tasks_by_status(self): + """Test listing tasks filtered by status.""" + store = InMemoryTaskStore() + task1 = Task( + task_id="task-1", + query="Query 1", + conversation_id="conv-123", + user_id="user-123", + agent_name="agent-1", + status=TaskStatus.RUNNING, + ) + task2 = Task( + task_id="task-2", + query="Query 2", + conversation_id="conv-123", + user_id="user-123", + agent_name="agent-2", + status=TaskStatus.COMPLETED, + ) + + await store.save_task(task1) + await store.save_task(task2) + + tasks = await store.list_tasks(status=TaskStatus.RUNNING) + assert len(tasks) == 1 + assert tasks[0].status == TaskStatus.RUNNING + + @pytest.mark.asyncio + async def test_task_exists(self): + """Test checking if a task exists.""" + store = InMemoryTaskStore() + task = Task( + task_id="test-task-123", + query="Test query", + conversation_id="conv-123", + user_id="user-123", + agent_name="test-agent", + ) + + await store.save_task(task) + + assert await store.task_exists("test-task-123") is True + assert await store.task_exists("nonexistent") is False + + +class TestSQLiteTaskStore: + """Test SQLiteTaskStore class.""" + + @pytest.mark.asyncio + async def test_save_and_load_task(self): + """Test saving and loading a task.""" + with tempfile.TemporaryDirectory() as tmpdir: + db_path = str(Path(tmpdir) / "test.db") + store = SQLiteTaskStore(db_path) + + task = Task( + task_id="test-task-123", + query="Test query", + conversation_id="conv-123", + user_id="user-123", + agent_name="test-agent", + ) + + await store.save_task(task) + loaded_task = await store.load_task("test-task-123") + + assert loaded_task is not None + assert loaded_task.task_id == task.task_id + assert loaded_task.query == task.query + assert loaded_task.conversation_id == task.conversation_id + + @pytest.mark.asyncio + async def test_load_nonexistent_task(self): + """Test loading a task that doesn't exist.""" + with tempfile.TemporaryDirectory() as tmpdir: + db_path = str(Path(tmpdir) / "test.db") + store = SQLiteTaskStore(db_path) + + loaded_task = await store.load_task("nonexistent") + assert loaded_task is None + + @pytest.mark.asyncio + async def test_delete_task(self): + """Test deleting a task.""" + with tempfile.TemporaryDirectory() as tmpdir: + db_path = str(Path(tmpdir) / "test.db") + store = SQLiteTaskStore(db_path) + + task = Task( + task_id="test-task-123", + query="Test query", + conversation_id="conv-123", + user_id="user-123", + agent_name="test-agent", + ) + + await store.save_task(task) + result = await store.delete_task("test-task-123") + + assert result is True + loaded_task = await store.load_task("test-task-123") + assert loaded_task is None + + @pytest.mark.asyncio + async def test_list_tasks_with_filters(self): + """Test listing tasks with various filters.""" + with tempfile.TemporaryDirectory() as tmpdir: + db_path = str(Path(tmpdir) / "test.db") + store = SQLiteTaskStore(db_path) + + task1 = Task( + task_id="task-1", + query="Query 1", + conversation_id="conv-123", + user_id="user-123", + agent_name="agent-1", + status=TaskStatus.RUNNING, + ) + task2 = Task( + task_id="task-2", + query="Query 2", + conversation_id="conv-456", + user_id="user-123", + agent_name="agent-2", + status=TaskStatus.COMPLETED, + ) + task3 = Task( + task_id="task-3", + query="Query 3", + conversation_id="conv-123", + user_id="user-456", + agent_name="agent-3", + status=TaskStatus.RUNNING, + ) + + await store.save_task(task1) + await store.save_task(task2) + await store.save_task(task3) + + # Test filter by conversation_id + tasks = await store.list_tasks(conversation_id="conv-123") + assert len(tasks) == 2 + + # Test filter by user_id + tasks = await store.list_tasks(user_id="user-123") + assert len(tasks) == 2 + + # Test filter by status + tasks = await store.list_tasks(status=TaskStatus.RUNNING) + assert len(tasks) == 2 + + # Test multiple filters - both task1 and task3 match conv-123 + RUNNING + tasks = await store.list_tasks( + conversation_id="conv-123", status=TaskStatus.RUNNING + ) + assert len(tasks) == 2 + task_ids = {t.task_id for t in tasks} + assert task_ids == {"task-1", "task-3"} + + # Test three filters - only task1 matches all three + tasks = await store.list_tasks( + conversation_id="conv-123", user_id="user-123", status=TaskStatus.RUNNING + ) + assert len(tasks) == 1 + assert tasks[0].task_id == "task-1" + + @pytest.mark.asyncio + async def test_task_exists(self): + """Test checking if a task exists.""" + with tempfile.TemporaryDirectory() as tmpdir: + db_path = str(Path(tmpdir) / "test.db") + store = SQLiteTaskStore(db_path) + + task = Task( + task_id="test-task-123", + query="Test query", + conversation_id="conv-123", + user_id="user-123", + agent_name="test-agent", + ) + + await store.save_task(task) + + assert await store.task_exists("test-task-123") is True + assert await store.task_exists("nonexistent") is False + + @pytest.mark.asyncio + async def test_persistence_across_instances(self): + """Test that data persists across different store instances.""" + with tempfile.TemporaryDirectory() as tmpdir: + db_path = str(Path(tmpdir) / "test.db") + + # Create first store instance and save task + store1 = SQLiteTaskStore(db_path) + task = Task( + task_id="test-task-123", + query="Test query", + conversation_id="conv-123", + user_id="user-123", + agent_name="test-agent", + ) + await store1.save_task(task) + + # Create second store instance and verify task exists + store2 = SQLiteTaskStore(db_path) + loaded_task = await store2.load_task("test-task-123") + + assert loaded_task is not None + assert loaded_task.task_id == task.task_id diff --git a/python/valuecell/server/db/init_db.py b/python/valuecell/server/db/init_db.py index b0a9f9f95..b9d59f957 100644 --- a/python/valuecell/server/db/init_db.py +++ b/python/valuecell/server/db/init_db.py @@ -1,11 +1,12 @@ """Database initialization script for ValueCell Server.""" import json -import logging import sys from pathlib import Path from typing import Optional +from loguru import logger + # Smart path handling: try import first, add path only if needed # This allows running the script directly: uv run valuecell/server/db/init_db.py try: @@ -30,12 +31,6 @@ from valuecell.server.services.assets import get_asset_service from valuecell.utils.path import get_agent_card_path -# Configure logging -logging.basicConfig( - level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" -) -logger = logging.getLogger(__name__) - class DatabaseInitializer: """Database initialization manager.""" @@ -173,6 +168,50 @@ def create_tables(self) -> bool: ON conversation_items(conversation_id, created_at) """) ) + + # Create tasks table for task management + conn.execute( + text(""" + CREATE TABLE IF NOT EXISTS tasks ( + task_id TEXT PRIMARY KEY, + title TEXT, + query TEXT NOT NULL, + conversation_id TEXT NOT NULL, + thread_id TEXT NOT NULL, + user_id TEXT NOT NULL, + agent_name TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'pending', + pattern TEXT NOT NULL DEFAULT 'once', + schedule_config TEXT, + handoff_from_super_agent INTEGER DEFAULT 0, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + started_at TIMESTAMP, + completed_at TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + error_message TEXT + ) + """) + ) + + # Indexes for common task queries + conn.execute( + text(""" + CREATE INDEX IF NOT EXISTS idx_tasks_conversation + ON tasks(conversation_id) + """) + ) + conn.execute( + text(""" + CREATE INDEX IF NOT EXISTS idx_tasks_user + ON tasks(user_id) + """) + ) + conn.execute( + text(""" + CREATE INDEX IF NOT EXISTS idx_tasks_status + ON tasks(status) + """) + ) conn.commit() @@ -624,13 +663,9 @@ def main(): action="store_true", help="Force re-initialization even if database exists", ) - parser.add_argument("--verbose", action="store_true", help="Enable verbose logging") args = parser.parse_args() - if args.verbose: - logging.getLogger().setLevel(logging.DEBUG) - logger.info("ValueCell Database Initialization") logger.info("=" * 50) From 4add1c0defc3e0fa1a311e76dc3158f2a0b8dcd9 Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Tue, 25 Nov 2025 16:23:41 +0800 Subject: [PATCH 02/10] fix: update cancel_task logic to handle finished tasks correctly --- python/valuecell/core/task/manager.py | 4 +++- python/valuecell/core/task/tests/test_manager.py | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/python/valuecell/core/task/manager.py b/python/valuecell/core/task/manager.py index 30a45b1c5..103dd6f6b 100644 --- a/python/valuecell/core/task/manager.py +++ b/python/valuecell/core/task/manager.py @@ -70,8 +70,10 @@ async def cancel_task(self, task_id: str) -> bool: """Cancel task""" async with self._lock: task = await self._get_task(task_id) - if not task or task.is_finished(): + if not task: return False + if task.is_finished(): + return True task.cancel() await self._store.save_task(task) diff --git a/python/valuecell/core/task/tests/test_manager.py b/python/valuecell/core/task/tests/test_manager.py index 414d393f0..4e85cb6df 100644 --- a/python/valuecell/core/task/tests/test_manager.py +++ b/python/valuecell/core/task/tests/test_manager.py @@ -277,7 +277,7 @@ async def test_cancel_task_already_finished(self): await manager._store.save_task(task) result = await manager.cancel_task("test-task-123") - assert result is False + assert result is True @pytest.mark.asyncio async def test_cancel_conversation_tasks(self): From 014d269958511d11bfc3d8bace8d35384c7cdbad Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Tue, 25 Nov 2025 16:49:28 +0800 Subject: [PATCH 03/10] refactor: replace standard logging with loguru for improved logging functionality --- python/valuecell/core/event/router.py | 4 +--- python/valuecell/core/plan/planner.py | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/python/valuecell/core/event/router.py b/python/valuecell/core/event/router.py index 776765936..7e8d13c23 100644 --- a/python/valuecell/core/event/router.py +++ b/python/valuecell/core/event/router.py @@ -1,10 +1,10 @@ -import logging from dataclasses import dataclass from enum import Enum from typing import List, Optional from a2a.types import TaskState, TaskStatusUpdateEvent from a2a.utils import get_message_text +from loguru import logger from valuecell.core.agent.responses import EventPredicates from valuecell.core.event.factory import ResponseFactory @@ -14,8 +14,6 @@ CommonResponseEvent, ) -logger = logging.getLogger(__name__) - class SideEffectKind(Enum): """Kinds of side-effects that routing logic can request. diff --git a/python/valuecell/core/plan/planner.py b/python/valuecell/core/plan/planner.py index 618434aba..b6eb28cbc 100644 --- a/python/valuecell/core/plan/planner.py +++ b/python/valuecell/core/plan/planner.py @@ -12,12 +12,12 @@ import asyncio from datetime import datetime -from loguru import logger from typing import Callable, List, Optional from a2a.types import AgentCard from agno.agent import Agent from agno.db.in_memory import InMemoryDb +from loguru import logger import valuecell.utils.model as model_utils_mod from valuecell.core.agent.connect import RemoteConnections From 6454db10a8da2235c531775a605446622c719af0 Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Tue, 25 Nov 2025 16:50:09 +0800 Subject: [PATCH 04/10] feat: implement auto-resume for recurring tasks and refactor task execution methods --- python/valuecell/core/task/executor.py | 9 ++- python/valuecell/core/task/task_store.py | 4 +- .../core/task/tests/test_executor.py | 2 +- .../core/task/tests/test_task_store.py | 4 +- .../server/api/routers/agent_stream.py | 72 +++++++++++++++++++ python/valuecell/server/db/init_db.py | 2 +- 6 files changed, 84 insertions(+), 9 deletions(-) diff --git a/python/valuecell/core/task/executor.py b/python/valuecell/core/task/executor.py index 59ebea120..7385e3dd6 100644 --- a/python/valuecell/core/task/executor.py +++ b/python/valuecell/core/task/executor.py @@ -173,7 +173,7 @@ async def emit_subagent_end_once() -> Optional[BaseResponse]: try: await self._task_service.update_task(task) - async for response in self._execute_task( + async for response in self.execute_task( task, thread_id, metadata, @@ -229,7 +229,7 @@ async def _emit_subagent_conversation_component( ) return await self._event_service.emit(component) - async def _execute_task( + async def execute_task( self, task: Task, thread_id: str, @@ -238,6 +238,7 @@ async def _execute_task( on_before_done: Optional[ Callable[[], Awaitable[Optional[BaseResponse]]] ] = None, + resumed: bool = False, ) -> AsyncGenerator[BaseResponse, None]: task_id = task.task_id conversation_id = task.conversation_id @@ -256,7 +257,7 @@ async def _execute_task( }, ) - if task.is_scheduled(): + if task.is_scheduled() and not resumed: yield await self._event_service.emit( self._event_service.factory.schedule_task_controller_component( conversation_id=conversation_id, @@ -297,6 +298,7 @@ async def _execute_task( await self._sleep_with_cancellation(task, delay) + task = await self._task_service.get_task(task.task_id) if task.is_finished(): logger.info(f"Task `{task.title}` ({task_id}) is finished.") break @@ -382,6 +384,7 @@ async def _execute_single_task_run( async def _sleep_with_cancellation(self, task: Task, delay: float) -> None: remaining = delay while remaining > 0: + task = await self._task_service.get_task(task.task_id) if task.is_finished(): return sleep_for = min(self._poll_interval, remaining) diff --git a/python/valuecell/core/task/task_store.py b/python/valuecell/core/task/task_store.py index 636e5a34d..3183c3d2b 100644 --- a/python/valuecell/core/task/task_store.py +++ b/python/valuecell/core/task/task_store.py @@ -298,9 +298,7 @@ async def list_tasks( if status is not None: query += " AND status = ?" - params.append( - status.value if hasattr(status, "value") else str(status) - ) + params.append(status.value if hasattr(status, "value") else str(status)) query += " ORDER BY created_at DESC LIMIT ? OFFSET ?" params.extend([limit, offset]) diff --git a/python/valuecell/core/task/tests/test_executor.py b/python/valuecell/core/task/tests/test_executor.py index f0617b1c6..aa3a6f3a0 100644 --- a/python/valuecell/core/task/tests/test_executor.py +++ b/python/valuecell/core/task/tests/test_executor.py @@ -312,7 +312,7 @@ async def on_before_done_cb(): emitted = [ resp - async for resp in executor._execute_task( + async for resp in executor.execute_task( task, thread_id="thread", metadata=None, on_before_done=on_before_done_cb ) ] diff --git a/python/valuecell/core/task/tests/test_task_store.py b/python/valuecell/core/task/tests/test_task_store.py index 747011591..896aaa61d 100644 --- a/python/valuecell/core/task/tests/test_task_store.py +++ b/python/valuecell/core/task/tests/test_task_store.py @@ -281,7 +281,9 @@ async def test_list_tasks_with_filters(self): # Test three filters - only task1 matches all three tasks = await store.list_tasks( - conversation_id="conv-123", user_id="user-123", status=TaskStatus.RUNNING + conversation_id="conv-123", + user_id="user-123", + status=TaskStatus.RUNNING, ) assert len(tasks) == 1 assert tasks[0].task_id == "task-1" diff --git a/python/valuecell/server/api/routers/agent_stream.py b/python/valuecell/server/api/routers/agent_stream.py index 85b9372ca..36765ae81 100644 --- a/python/valuecell/server/api/routers/agent_stream.py +++ b/python/valuecell/server/api/routers/agent_stream.py @@ -2,14 +2,22 @@ Agent stream router for handling streaming agent queries. """ +import asyncio import json from fastapi import APIRouter, HTTPException from fastapi.responses import StreamingResponse +from loguru import logger +# ExecutionPlan is not needed here; we resume individual Task executions +from valuecell.core.task.executor import TaskExecutor +from valuecell.core.task.locator import get_task_service +from valuecell.core.task.models import TaskPattern, TaskStatus from valuecell.server.api.schemas.agent_stream import AgentStreamRequest from valuecell.server.services.agent_stream_service import AgentStreamService +_TASK_AUTORESTART_STARTED = False + def create_agent_stream_router() -> APIRouter: """Create and configure the agent stream router.""" @@ -17,6 +25,13 @@ def create_agent_stream_router() -> APIRouter: router = APIRouter(prefix="/agents", tags=["Agent Stream"]) agent_service = AgentStreamService() + @router.on_event("startup") + async def _startup_resume_recurring_tasks() -> None: + try: + await _auto_resume_recurring_tasks(agent_service) + except Exception: + logger.exception("Failed to schedule recurring task auto-resume") + @router.post("/stream") async def stream_query_agent(request: AgentStreamRequest): """ @@ -50,3 +65,60 @@ async def generate_stream(): raise HTTPException(status_code=500, detail=f"Agent query failed: {str(e)}") return router + + +async def _auto_resume_recurring_tasks(agent_service: AgentStreamService) -> None: + """Resume persisted recurring tasks that were running before shutdown.""" + global _TASK_AUTORESTART_STARTED + if _TASK_AUTORESTART_STARTED: + return + _TASK_AUTORESTART_STARTED = True + + task_service = get_task_service() + try: + running_tasks = await task_service.list_tasks(status=TaskStatus.RUNNING) + except Exception: + logger.exception("Task auto-resume: failed to load tasks from store") + return + + candidates = [ + task for task in running_tasks if task.pattern == TaskPattern.RECURRING + ] + if not candidates: + logger.info("Task auto-resume: no recurring running tasks found") + return + + executor = agent_service.orchestrator.task_executor + + task_service = get_task_service() + for task in candidates: + try: + # Reset to pending and persist so TaskExecutor sees the correct state + task.status = TaskStatus.PENDING + await task_service.update_task(task) + + thread_id = task.thread_id or task.task_id + asyncio.create_task( + _drain_execute_task(executor, task, thread_id, task_service) + ) + logger.info( + "Task auto-resume: scheduled recurring task {} for execution", + task.task_id, + ) + except Exception: + logger.exception( + "Task auto-resume: failed to schedule task {}", task.task_id + ) + + +async def _drain_execute_task( + executor: TaskExecutor, task, thread_id: str, task_service +) -> None: + """Execute a single task via TaskExecutor and discard produced responses.""" + try: + async for _ in executor.execute_task(task, thread_id=thread_id, resumed=True): + pass + except asyncio.CancelledError: + raise + except Exception: + logger.exception("Task auto-resume: execution failed for task {}", task.task_id) diff --git a/python/valuecell/server/db/init_db.py b/python/valuecell/server/db/init_db.py index b9d59f957..14350e0db 100644 --- a/python/valuecell/server/db/init_db.py +++ b/python/valuecell/server/db/init_db.py @@ -168,7 +168,7 @@ def create_tables(self) -> bool: ON conversation_items(conversation_id, created_at) """) ) - + # Create tasks table for task management conn.execute( text(""" From 655af7f0b2cd8d2e659f970a70f8ac2ea8d45b2e Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Tue, 25 Nov 2025 16:55:27 +0800 Subject: [PATCH 05/10] fix: update primary model ID for news analysis in news_agent.yaml --- python/configs/agents/news_agent.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/configs/agents/news_agent.yaml b/python/configs/agents/news_agent.yaml index c9c7d84b7..fac5dc719 100644 --- a/python/configs/agents/news_agent.yaml +++ b/python/configs/agents/news_agent.yaml @@ -18,7 +18,7 @@ enabled: true models: # Primary model for news analysis, summarization, and query processing primary: - model_id: "google/gemini-3-pro-preview" + model_id: "qwen/qwen3-max" provider: "openrouter" # Must explicitly specify provider (not null) # Provider-specific model mappings for fallback From ce34879f8946c87fe378c04860657e91531693b6 Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Tue, 25 Nov 2025 16:58:56 +0800 Subject: [PATCH 06/10] fix tests --- python/valuecell/core/task/tests/test_executor.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/python/valuecell/core/task/tests/test_executor.py b/python/valuecell/core/task/tests/test_executor.py index aa3a6f3a0..57ec09766 100644 --- a/python/valuecell/core/task/tests/test_executor.py +++ b/python/valuecell/core/task/tests/test_executor.py @@ -205,6 +205,7 @@ async def test_sleep_with_cancellation( class DummyTask: def __init__(self): self.calls = 0 + self.task_id = "task-1" def is_finished(self): self.calls += 1 @@ -218,7 +219,12 @@ async def fake_sleep(duration): monkeypatch.setattr("valuecell.core.task.executor.asyncio.sleep", fake_sleep) - await executor._sleep_with_cancellation(DummyTask(), delay=0.2) + # Ensure the TaskService.get_task returns the dummy so the executor can + # refresh the task state during the cancellable sleep loop. + dummy = DummyTask() + task_service.get_task = AsyncMock(return_value=dummy) + + await executor._sleep_with_cancellation(dummy, delay=0.2) assert sleeps @@ -227,7 +233,7 @@ async def fake_sleep(duration): async def test_execute_plan_emits_end_once_when_on_before_done_used( monkeypatch: pytest.MonkeyPatch, task_service: TaskService ): - """If _execute_task emits END via on_before_done, execute_plan should not duplicate it in finally.""" + """If execute_task emits END via on_before_done, execute_plan should not duplicate it in finally.""" event_service = StubEventService() executor = TaskExecutor( agent_connections=SimpleNamespace(), @@ -236,7 +242,7 @@ async def test_execute_plan_emits_end_once_when_on_before_done_used( conversation_service=StubConversationService(), ) - # Patch _execute_task to invoke on_before_done and yield its response + # Patch execute_task to invoke on_before_done and yield its response async def fake_execute_task(task, thread_id, metadata, on_before_done=None): if on_before_done is not None: maybe = await on_before_done() @@ -244,7 +250,7 @@ async def fake_execute_task(task, thread_id, metadata, on_before_done=None): yield maybe return - monkeypatch.setattr(executor, "_execute_task", fake_execute_task) + monkeypatch.setattr(executor, "execute_task", fake_execute_task) # Create a plan with a single subagent handoff task task = _make_task(handoff_from_super_agent=True) From e48e1e939d593381f228b45717062a98e7377ad1 Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Tue, 25 Nov 2025 18:14:39 +0800 Subject: [PATCH 07/10] feat: add update_task_component_status method to manage task status updates in conversation components --- .../scheduled-task-controller-renderer.tsx | 4 +- python/valuecell/core/conversation/manager.py | 56 +++++++++++++++++++ python/valuecell/core/conversation/service.py | 19 +++++++ python/valuecell/core/task/executor.py | 8 ++- 4 files changed, 84 insertions(+), 3 deletions(-) diff --git a/frontend/src/components/valuecell/renderer/scheduled-task-controller-renderer.tsx b/frontend/src/components/valuecell/renderer/scheduled-task-controller-renderer.tsx index 5fbe56e50..6a5d956b1 100644 --- a/frontend/src/components/valuecell/renderer/scheduled-task-controller-renderer.tsx +++ b/frontend/src/components/valuecell/renderer/scheduled-task-controller-renderer.tsx @@ -10,11 +10,11 @@ const ScheduledTaskControllerRenderer: FC< ScheduledTaskControllerRendererProps > = ({ content }) => { const { task_title, task_id, task_status } = parse(content); - const [isRunning, setIsRunning] = useState(task_status !== "cancelled"); + const [isRunning, setIsRunning] = useState(task_status === "running"); const { mutateAsync: cancelTask } = useCancelTask(); useEffect(() => { - setIsRunning(task_status !== "cancelled"); + setIsRunning(task_status === "running"); }, [task_status]); const handleCancel = async () => { diff --git a/python/valuecell/core/conversation/manager.py b/python/valuecell/core/conversation/manager.py index 4129bfaa6..bf5a43b5a 100644 --- a/python/valuecell/core/conversation/manager.py +++ b/python/valuecell/core/conversation/manager.py @@ -2,7 +2,10 @@ from datetime import datetime from typing import List, Optional +from loguru import logger + from valuecell.core.types import ( + ComponentType, ConversationItem, ConversationItemEvent, ResponseMetadata, @@ -230,6 +233,59 @@ async def require_user_input(self, conversation_id: str) -> bool: conversation_id, ConversationStatus.REQUIRE_USER_INPUT ) + async def update_task_component_status( + self, + task_id: str, + status: str, + error_reason: Optional[str] = None, + ) -> None: + """Update persisted scheduled task controller component's status and error reason. + + For a given task_id, find the persisted conversation item with + component_type='scheduled_task_controller', update its payload's + task_status field, and set error_reason in metadata if provided. + + Args: + task_id: The task identifier to search for. + status: New task status value (e.g., 'failed'). + error_reason: Optional error details to store in metadata. + """ + items = await self.item_store.get_items(task_id=task_id) + for item in items: + # Check if this is a scheduled_task_controller component + if not item.payload: + continue + try: + payload_obj = json.loads(item.payload) + if ( + payload_obj.get("component_type") + != ComponentType.SCHEDULED_TASK_CONTROLLER + ): + continue + except Exception: + continue + + # Update task_status in payload and error_reason in metadata + try: + payload_obj = json.loads(item.payload) + content = payload_obj.get("content") or "{}" + content_obj = json.loads(content) + content_obj["task_status"] = status + payload_obj["content"] = json.dumps(content_obj) + item.payload = json.dumps(payload_obj) + + # Update metadata with error reason if provided + metadata_obj = json.loads(item.metadata or "{}") + if error_reason: + metadata_obj["error_reason"] = error_reason + item.metadata = json.dumps(metadata_obj, default=str) + + await self.item_store.save_item(item) + except Exception as e: + logger.warning( + f"Failed to update task component status for task {task_id}: {e}" + ) + async def get_conversations_by_status( self, user_id: str, diff --git a/python/valuecell/core/conversation/service.py b/python/valuecell/core/conversation/service.py index 4ed06eaba..98682b6c1 100644 --- a/python/valuecell/core/conversation/service.py +++ b/python/valuecell/core/conversation/service.py @@ -139,3 +139,22 @@ async def get_conversation_items( limit=limit, offset=offset, ) + + async def update_task_component_status( + self, + task_id: str, + status: str, + error_reason: Optional[str] = None, + ) -> None: + """Update persisted scheduled task component's status and error. + + Args: + task_id: The task identifier. + status: New status (e.g., 'failed'). + error_reason: Optional error reason for metadata. + """ + return await self._manager.update_task_component_status( + task_id=task_id, + status=status, + error_reason=error_reason, + ) diff --git a/python/valuecell/core/task/executor.py b/python/valuecell/core/task/executor.py index 7385e3dd6..0aa32e339 100644 --- a/python/valuecell/core/task/executor.py +++ b/python/valuecell/core/task/executor.py @@ -21,7 +21,7 @@ from valuecell.core.event.router import RouteResult, SideEffectKind from valuecell.core.event.service import EventResponseService from valuecell.core.plan.models import ExecutionPlan -from valuecell.core.task.models import Task +from valuecell.core.task.models import Task, TaskStatus from valuecell.core.task.service import DEFAULT_EXECUTION_POLL_INTERVAL, TaskService from valuecell.core.task.temporal import calculate_next_execution_delay from valuecell.core.types import ( @@ -363,6 +363,12 @@ async def _execute_single_task_run( await self._task_service.fail_task( task.task_id, side_effect.reason or "" ) + # Sync the failure back to persisted conversation items + await self._conversation_service.manager.update_task_component_status( + task_id=task.task_id, + status=TaskStatus.FAILED.value, + error_reason=side_effect.reason, + ) if route_result.done: return continue From 4e95235f710fc5032263ca3604c68888e1a4fd63 Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Tue, 25 Nov 2025 18:24:10 +0800 Subject: [PATCH 08/10] add tests --- .../conversation/tests/test_conv_manager.py | 85 +++++++++++++++++++ 1 file changed, 85 insertions(+) diff --git a/python/valuecell/core/conversation/tests/test_conv_manager.py b/python/valuecell/core/conversation/tests/test_conv_manager.py index a278c816c..d60a124c5 100644 --- a/python/valuecell/core/conversation/tests/test_conv_manager.py +++ b/python/valuecell/core/conversation/tests/test_conv_manager.py @@ -11,6 +11,7 @@ from valuecell.core.conversation.manager import ConversationManager from valuecell.core.conversation.models import Conversation, ConversationStatus from valuecell.core.types import ConversationItem, Role, NotifyResponseEvent +from valuecell.core.types import ComponentType class TestConversationManager: @@ -673,3 +674,87 @@ async def test_get_conversations_by_status(self): manager.conversation_store.list_conversations.assert_called_once_with( user_id, 20, 0 ) + + @pytest.mark.asyncio + async def test_update_task_component_status_updates_item(self): + """update_task_component_status should update payload.content.task_status and metadata""" + manager = ConversationManager() + + # Create a conversation item representing a scheduled_task_controller component + content_obj = {"task_id": "task-1", "task_title": "T"} + payload_obj = { + "component_type": ComponentType.SCHEDULED_TASK_CONTROLLER, + "content": json.dumps(content_obj), + } + item = ConversationItem( + item_id="item-1", + role=Role.AGENT, + event=NotifyResponseEvent.MESSAGE, + conversation_id="conv-1", + payload=json.dumps(payload_obj), + metadata="{}", + task_id="task-1", + ) + + manager.item_store.get_items = AsyncMock(return_value=[item]) + manager.item_store.save_item = AsyncMock() + + await manager.update_task_component_status("task-1", "failed", "boom") + + # save_item called once with updated item + manager.item_store.save_item.assert_awaited_once() + saved_item = manager.item_store.save_item.call_args.args[0] + + # payload should be JSON and contain content whose task_status is 'failed' + parsed = json.loads(saved_item.payload) + inner = json.loads(parsed.get("content")) + assert inner.get("task_status") == "failed" + + # metadata should include error_reason + meta = json.loads(saved_item.metadata) + assert meta.get("error_reason") == "boom" + + @pytest.mark.asyncio + async def test_update_task_component_status_skips_non_matching(self): + """Non-matching component_type should be ignored.""" + manager = ConversationManager() + + payload_obj = {"component_type": "other_component", "content": "{}"} + item = ConversationItem( + item_id="item-2", + role=Role.AGENT, + event=NotifyResponseEvent.MESSAGE, + conversation_id="conv-1", + payload=json.dumps(payload_obj), + metadata="{}", + task_id="task-2", + ) + + manager.item_store.get_items = AsyncMock(return_value=[item]) + manager.item_store.save_item = AsyncMock() + + await manager.update_task_component_status("task-2", "failed", "boom") + + manager.item_store.save_item.assert_not_awaited() + + @pytest.mark.asyncio + async def test_update_task_component_status_skips_invalid_payload(self): + """Invalid JSON payload should be skipped without raising.""" + manager = ConversationManager() + + item = ConversationItem( + item_id="item-3", + role=Role.AGENT, + event=NotifyResponseEvent.MESSAGE, + conversation_id="conv-1", + payload="not-a-json", + metadata="{}", + task_id="task-3", + ) + + manager.item_store.get_items = AsyncMock(return_value=[item]) + manager.item_store.save_item = AsyncMock() + + await manager.update_task_component_status("task-3", "failed", "boom") + + manager.item_store.save_item.assert_not_awaited() From b4c5cb4f6142a7367018b948470fe0b259b883bc Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Wed, 26 Nov 2025 14:47:18 +0800 Subject: [PATCH 09/10] refactor: move _auto_resume_recurring_tasks to services --- .../server/api/routers/agent_stream.py | 69 ++----------------- .../server/services/agent_stream_service.py | 64 +++++++++++++++++ 2 files changed, 68 insertions(+), 65 deletions(-) diff --git a/python/valuecell/server/api/routers/agent_stream.py b/python/valuecell/server/api/routers/agent_stream.py index 36765ae81..a36a314f9 100644 --- a/python/valuecell/server/api/routers/agent_stream.py +++ b/python/valuecell/server/api/routers/agent_stream.py @@ -2,21 +2,17 @@ Agent stream router for handling streaming agent queries. """ -import asyncio import json from fastapi import APIRouter, HTTPException from fastapi.responses import StreamingResponse from loguru import logger -# ExecutionPlan is not needed here; we resume individual Task executions -from valuecell.core.task.executor import TaskExecutor -from valuecell.core.task.locator import get_task_service -from valuecell.core.task.models import TaskPattern, TaskStatus from valuecell.server.api.schemas.agent_stream import AgentStreamRequest -from valuecell.server.services.agent_stream_service import AgentStreamService - -_TASK_AUTORESTART_STARTED = False +from valuecell.server.services.agent_stream_service import ( + AgentStreamService, + _auto_resume_recurring_tasks, +) def create_agent_stream_router() -> APIRouter: @@ -65,60 +61,3 @@ async def generate_stream(): raise HTTPException(status_code=500, detail=f"Agent query failed: {str(e)}") return router - - -async def _auto_resume_recurring_tasks(agent_service: AgentStreamService) -> None: - """Resume persisted recurring tasks that were running before shutdown.""" - global _TASK_AUTORESTART_STARTED - if _TASK_AUTORESTART_STARTED: - return - _TASK_AUTORESTART_STARTED = True - - task_service = get_task_service() - try: - running_tasks = await task_service.list_tasks(status=TaskStatus.RUNNING) - except Exception: - logger.exception("Task auto-resume: failed to load tasks from store") - return - - candidates = [ - task for task in running_tasks if task.pattern == TaskPattern.RECURRING - ] - if not candidates: - logger.info("Task auto-resume: no recurring running tasks found") - return - - executor = agent_service.orchestrator.task_executor - - task_service = get_task_service() - for task in candidates: - try: - # Reset to pending and persist so TaskExecutor sees the correct state - task.status = TaskStatus.PENDING - await task_service.update_task(task) - - thread_id = task.thread_id or task.task_id - asyncio.create_task( - _drain_execute_task(executor, task, thread_id, task_service) - ) - logger.info( - "Task auto-resume: scheduled recurring task {} for execution", - task.task_id, - ) - except Exception: - logger.exception( - "Task auto-resume: failed to schedule task {}", task.task_id - ) - - -async def _drain_execute_task( - executor: TaskExecutor, task, thread_id: str, task_service -) -> None: - """Execute a single task via TaskExecutor and discard produced responses.""" - try: - async for _ in executor.execute_task(task, thread_id=thread_id, resumed=True): - pass - except asyncio.CancelledError: - raise - except Exception: - logger.exception("Task auto-resume: execution failed for task {}", task.task_id) diff --git a/python/valuecell/server/services/agent_stream_service.py b/python/valuecell/server/services/agent_stream_service.py index 2e2e3976c..bf77c8307 100644 --- a/python/valuecell/server/services/agent_stream_service.py +++ b/python/valuecell/server/services/agent_stream_service.py @@ -2,14 +2,21 @@ Agent stream service for handling streaming agent interactions. """ +import asyncio from typing import AsyncGenerator, Optional from loguru import logger from valuecell.core.coordinate.orchestrator import AgentOrchestrator + +from valuecell.core.task.executor import TaskExecutor +from valuecell.core.task.locator import get_task_service +from valuecell.core.task.models import TaskPattern, TaskStatus from valuecell.core.types import UserInput, UserInputMetadata from valuecell.utils.uuid import generate_conversation_id +_TASK_AUTORESTART_STARTED = False + class AgentStreamService: """Service for handling streaming agent queries.""" @@ -61,3 +68,60 @@ async def stream_query_agent( except Exception as e: logger.error(f"Error in stream_query_agent: {str(e)}") yield f"Error processing query: {str(e)}" + + +async def _auto_resume_recurring_tasks(agent_service: AgentStreamService) -> None: + """Resume persisted recurring tasks that were running before shutdown.""" + global _TASK_AUTORESTART_STARTED + if _TASK_AUTORESTART_STARTED: + return + _TASK_AUTORESTART_STARTED = True + + task_service = get_task_service() + try: + running_tasks = await task_service.list_tasks(status=TaskStatus.RUNNING) + except Exception: + logger.exception("Task auto-resume: failed to load tasks from store") + return + + candidates = [ + task for task in running_tasks if task.pattern == TaskPattern.RECURRING + ] + if not candidates: + logger.info("Task auto-resume: no recurring running tasks found") + return + + executor = agent_service.orchestrator.task_executor + + task_service = get_task_service() + for task in candidates: + try: + # Reset to pending and persist so TaskExecutor sees the correct state + task.status = TaskStatus.PENDING + await task_service.update_task(task) + + thread_id = task.thread_id or task.task_id + asyncio.create_task( + _drain_execute_task(executor, task, thread_id, task_service) + ) + logger.info( + "Task auto-resume: scheduled recurring task {} for execution", + task.task_id, + ) + except Exception: + logger.exception( + "Task auto-resume: failed to schedule task {}", task.task_id + ) + + +async def _drain_execute_task( + executor: TaskExecutor, task, thread_id: str, task_service +) -> None: + """Execute a single task via TaskExecutor and discard produced responses.""" + try: + async for _ in executor.execute_task(task, thread_id=thread_id, resumed=True): + pass + except asyncio.CancelledError: + raise + except Exception: + logger.exception("Task auto-resume: execution failed for task {}", task.task_id) From 174d557e3ac137555ba0735fdc0bafd87404110b Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Wed, 26 Nov 2025 14:52:45 +0800 Subject: [PATCH 10/10] fix format --- python/valuecell/server/services/agent_stream_service.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/valuecell/server/services/agent_stream_service.py b/python/valuecell/server/services/agent_stream_service.py index bf77c8307..59ce9787e 100644 --- a/python/valuecell/server/services/agent_stream_service.py +++ b/python/valuecell/server/services/agent_stream_service.py @@ -8,7 +8,6 @@ from loguru import logger from valuecell.core.coordinate.orchestrator import AgentOrchestrator - from valuecell.core.task.executor import TaskExecutor from valuecell.core.task.locator import get_task_service from valuecell.core.task.models import TaskPattern, TaskStatus