Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
93dfe68
refactor: remove MessageStore and its implementations
vcfgv Sep 23, 2025
87ade9d
refactor: update response event handling and remove unused message model
vcfgv Sep 23, 2025
c2ecdaa
feat: add thread started response handling and update related logic
vcfgv Sep 23, 2025
3e77cac
stash
vcfgv Sep 24, 2025
c6cbbf1
refactor: remove unused callback module for task session management
vcfgv Sep 24, 2025
fecb282
feat: response buffer and SQLite message storage
vcfgv Sep 24, 2025
b27e283
feat: enhance human-in-the-loop message management and user input han…
vcfgv Sep 24, 2025
a87d051
feat: add item_id generation to SaveMessage and ConversationItem for …
vcfgv Sep 24, 2025
c082633
refactor: add item_id and role to UnifiedResponseData and remove subt…
vcfgv Sep 24, 2025
cab4fd4
feat: enhance response buffering with stable item_id and role management
vcfgv Sep 24, 2025
2a48d2e
feat: enhance thread management by including user query in response a…
vcfgv Sep 24, 2025
5085c38
fix: update media type to text/event-stream for streaming responses
vcfgv Sep 24, 2025
8436d3d
Merge remote-tracking branch 'origin/main' into refactor/sqlite-messa…
vcfgv Sep 24, 2025
6bed661
feat: add aiosqlite package with dependencies to uv.lock
vcfgv Sep 24, 2025
fe1ab30
feat: refactor hedge fund streaming to use async methods for improved…
vcfgv Sep 24, 2025
773938d
feat: remove unused artifact update logic and add message type checks
vcfgv Sep 24, 2025
e03b10c
fix: update task_id generation in message response to use 'ask' inste…
vcfgv Sep 24, 2025
f17d50a
fix lint
vcfgv Sep 24, 2025
4de8105
fix format
vcfgv Sep 24, 2025
83bbe43
update tests
vcfgv Sep 24, 2025
0287fd3
refactor: add session history methods and response conversion functio…
vcfgv Sep 24, 2025
d0f1d69
fix: update default database path from valuecell_core.db to valuecell.db
vcfgv Sep 25, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ dependencies = [
"agno[openai]>=1.8.2,<2.0",
"edgartools>=4.12.2",
"sqlalchemy>=2.0.43",
"aiosqlite>=0.19.0",
]

[project.optional-dependencies]
Expand Down
9 changes: 5 additions & 4 deletions python/third_party/ai-hedge-fund/adapter/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ async def stream(
logger.info(
f"Parsing query: {query}. Task ID: {task_id}, Session ID: {session_id}"
)
run_response = self.agno_agent.run(
run_response = await self.agno_agent.arun(
f"Parse the following hedge fund analysis request and extract the parameters: {query}"
)
hedge_fund_request = run_response.content
Expand Down Expand Up @@ -101,7 +101,7 @@ async def stream(
}

logger.info(f"Start analyzing. Task ID: {task_id}, Session ID: {session_id}")
for _, chunk in run_hedge_fund_stream(
async for _, chunk in run_hedge_fund_stream(
tickers=hedge_fund_request.tickers,
start_date=start_date,
end_date=end_date,
Expand All @@ -116,7 +116,7 @@ async def stream(
yield streaming.done()


def run_hedge_fund_stream(
async def run_hedge_fund_stream(
tickers: list[str],
start_date: str,
end_date: str,
Expand Down Expand Up @@ -153,7 +153,8 @@ def run_hedge_fund_stream(
"model_provider": model_provider,
},
}
yield from _agent.stream(inputs, stream_mode=["custom", "messages"])
async for res in _agent.astream(inputs, stream_mode=["custom", "messages"]):
yield res
finally:
# Stop progress tracking
progress.stop()
Expand Down
14 changes: 14 additions & 0 deletions python/third_party/ai-hedge-fund/uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions python/uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions python/valuecell/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@
from .agent.decorator import create_wrapped_agent
from .agent.responses import notification, streaming
from .session import (
InMemoryMessageStore,
InMemorySessionStore,
Message,
MessageStore,
Role,
Session,
SessionManager,
SessionStatus,
SessionStore,
)
from .session.message_store import (
InMemoryMessageStore,
MessageStore,
SQLiteMessageStore,
)

Expand Down
66 changes: 19 additions & 47 deletions python/valuecell/core/agent/decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@
InMemoryTaskStore,
TaskUpdater,
)
from a2a.types import AgentCard, Part, TaskState, TextPart, UnsupportedOperationError
from a2a.types import AgentCard, TaskState, UnsupportedOperationError
from a2a.utils import new_agent_text_message, new_task
from a2a.utils.errors import ServerError
from valuecell.core.agent.card import find_local_agent_card_by_agent_name
from valuecell.core.types import (
BaseAgent,
NotifyResponse,
StreamResponse,
StreamResponseEvent,
CommonResponseEvent,
)
from valuecell.utils import parse_host_port
from .responses import EventPredicates
Expand Down Expand Up @@ -118,34 +118,6 @@ async def execute(self, context: RequestContext, event_queue: EventQueue) -> Non
task_id = task.id
session_id = task.context_id
updater = TaskUpdater(event_queue, task_id, session_id)
artifact_id = f"artifact-{agent_name}-{session_id}-{task_id}"
chunk_idx = -1

# Local helper to add a chunk
async def _add_chunk(
response: StreamResponse | NotifyResponse, is_complete: bool
):
nonlocal chunk_idx

chunk_idx += 1
if not response.content:
return

parts = [Part(root=TextPart(text=response.content))]
response_event = response.event
metadata = {
"response_event": response_event.value,
"subtask_id": response.subtask_id,
}
if response_event == StreamResponseEvent.COMPONENT_GENERATOR:
metadata["component_type"] = response.metadata.get("component_type")
await updater.add_artifact(
parts=parts,
artifact_id=artifact_id,
append=chunk_idx > 0,
last_chunk=is_complete,
metadata=metadata,
)

# Stream from the user agent and update task incrementally
await updater.update_status(
Expand All @@ -170,44 +142,44 @@ async def _add_chunk(
f"Agent {agent_name} reported failure: {response.content}"
)

is_complete = EventPredicates.is_task_completed(response_event)
metadata = {"response_event": response_event.value}
if EventPredicates.is_tool_call(response_event):
metadata["tool_call_id"] = response.metadata.get("tool_call_id")
metadata["tool_name"] = response.metadata.get("tool_name")
metadata["tool_result"] = response.metadata.get("content")
await updater.update_status(
TaskState.working,
message=new_agent_text_message(response.content or ""),
metadata={
"event": response_event.value,
"tool_call_id": response.metadata.get("tool_call_id"),
"tool_name": response.metadata.get("tool_name"),
"tool_result": response.metadata.get("content"),
"subtask_id": response.subtask_id,
},
metadata=metadata,
)
continue
if EventPredicates.is_reasoning(response_event):
await updater.update_status(
TaskState.working,
message=new_agent_text_message(response.content or ""),
metadata={
"event": response_event.value,
"subtask_id": response.subtask_id,
},
metadata=metadata,
)
continue

await _add_chunk(response, is_complete=is_complete)
if is_complete:
await updater.complete()
break
if not response.content:
continue
if response_event == CommonResponseEvent.COMPONENT_GENERATOR:
metadata["component_type"] = response.metadata.get("component_type")
await updater.update_status(
TaskState.working,
message=new_agent_text_message(response.content or ""),
metadata=metadata,
)

except Exception as e:
message = f"Error during {agent_name} agent execution: {e}"
logger.error(message)
await updater.update_status(
TaskState.failed,
message=new_agent_text_message(message, session_id, task_id),
final=True,
)
finally:
await updater.complete()

async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
# Default cancel operation
Expand Down
61 changes: 23 additions & 38 deletions python/valuecell/core/agent/responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,46 +3,40 @@
from typing import Optional

from valuecell.core.types import (
CommonResponseEvent,
NotifyResponse,
NotifyResponseEvent,
StreamResponse,
StreamResponseEvent,
SystemResponseEvent,
TaskStatusEvent,
ToolCallPayload,
_TaskResponseEvent,
)


class _StreamResponseNamespace:
"""Factory methods for streaming responses."""

def message_chunk(
self, content: str, subtask_id: str | None = None
) -> StreamResponse:
def message_chunk(self, content: str) -> StreamResponse:
return StreamResponse(
event=StreamResponseEvent.MESSAGE_CHUNK,
content=content,
subtask_id=subtask_id,
)

def tool_call_started(
self, tool_call_id: str, tool_name: str, subtask_id: str | None = None
) -> StreamResponse:
def tool_call_started(self, tool_call_id: str, tool_name: str) -> StreamResponse:
return StreamResponse(
event=StreamResponseEvent.TOOL_CALL_STARTED,
metadata=ToolCallPayload(
tool_call_id=tool_call_id,
tool_name=tool_name,
).model_dump(),
subtask_id=subtask_id,
)

def tool_call_completed(
self,
tool_result: str,
tool_call_id: str,
tool_name: str,
subtask_id: str | None = None,
) -> StreamResponse:
return StreamResponse(
event=StreamResponseEvent.TOOL_CALL_COMPLETED,
Expand All @@ -51,42 +45,19 @@ def tool_call_completed(
tool_name=tool_name,
tool_result=tool_result,
).model_dump(),
subtask_id=subtask_id,
)

def reasoning_started(self, subtask_id: str | None = None) -> StreamResponse:
return StreamResponse(
event=StreamResponseEvent.REASONING_STARTED,
subtask_id=subtask_id,
)

def reasoning(self, content: str, subtask_id: str | None = None) -> StreamResponse:
return StreamResponse(
event=StreamResponseEvent.REASONING,
content=content,
subtask_id=subtask_id,
)

def reasoning_completed(self, subtask_id: str | None = None) -> StreamResponse:
return StreamResponse(
event=StreamResponseEvent.REASONING_COMPLETED,
subtask_id=subtask_id,
)

def component_generator(
self, content: str, component_type: str, subtask_id: str | None = None
) -> StreamResponse:
def component_generator(self, content: str, component_type: str) -> StreamResponse:
return StreamResponse(
event=StreamResponseEvent.COMPONENT_GENERATOR,
event=CommonResponseEvent.COMPONENT_GENERATOR,
content=content,
metadata={"component_type": component_type},
subtask_id=subtask_id,
)

def done(self, content: Optional[str] = None) -> StreamResponse:
return StreamResponse(
content=content,
event=_TaskResponseEvent.TASK_COMPLETED,
event=TaskStatusEvent.TASK_COMPLETED,
)

def failed(self, content: Optional[str] = None) -> StreamResponse:
Expand All @@ -108,10 +79,17 @@ def message(self, content: str) -> NotifyResponse:
event=NotifyResponseEvent.MESSAGE,
)

def component_generator(self, content: str, component_type: str) -> StreamResponse:
    """Build a COMPONENT_GENERATOR response carrying generated UI-component content.

    Args:
        content: Serialized component payload to deliver to the client.
        component_type: Identifier of the component kind, stored under the
            ``component_type`` metadata key (read back by the executor when
            forwarding status updates).

    Returns:
        StreamResponse tagged with ``CommonResponseEvent.COMPONENT_GENERATOR``.

    NOTE(review): this lives on the notification namespace but returns a
    ``StreamResponse`` (the streaming namespace has an identical factory) —
    confirm the asymmetric return type is intentional.
    """
    return StreamResponse(
        event=CommonResponseEvent.COMPONENT_GENERATOR,
        content=content,
        metadata={"component_type": component_type},
    )

def done(self, content: Optional[str] = None) -> NotifyResponse:
return NotifyResponse(
content=content,
event=_TaskResponseEvent.TASK_COMPLETED,
event=TaskStatusEvent.TASK_COMPLETED,
)

def failed(self, content: Optional[str] = None) -> NotifyResponse:
Expand All @@ -134,7 +112,7 @@ class EventPredicates:
@staticmethod
def is_task_completed(response_type) -> bool:
return response_type in {
_TaskResponseEvent.TASK_COMPLETED,
TaskStatusEvent.TASK_COMPLETED,
}

@staticmethod
Expand All @@ -158,6 +136,13 @@ def is_reasoning(response_type) -> bool:
StreamResponseEvent.REASONING_COMPLETED,
}

@staticmethod
def is_message(response_type) -> bool:
    """Return True when the event carries plain message content.

    Matches either a streaming message chunk
    (``StreamResponseEvent.MESSAGE_CHUNK``) or a one-shot notification
    message (``NotifyResponseEvent.MESSAGE``).
    """
    return response_type in {
        StreamResponseEvent.MESSAGE_CHUNK,
        NotifyResponseEvent.MESSAGE,
    }


__all__ = [
"streaming",
Expand Down
Loading