From 26d16fe3979fb2f61a821c315ef96797aa5c4cf7 Mon Sep 17 00:00:00 2001
From: zhonghao lu
Date: Mon, 22 Sep 2025 11:49:20 +0800
Subject: [PATCH 1/3] adapt streaming response for sec agent

---
 python/valuecell/agents/sec_agent.py | 85 +++++++++++++---------------
 1 file changed, 38 insertions(+), 47 deletions(-)

diff --git a/python/valuecell/agents/sec_agent.py b/python/valuecell/agents/sec_agent.py
index 826ca2b4a..35fd067b5 100644
--- a/python/valuecell/agents/sec_agent.py
+++ b/python/valuecell/agents/sec_agent.py
@@ -4,14 +4,14 @@
 import os
 from datetime import datetime
 from enum import Enum
-from typing import Dict, Iterator
+from typing import Dict, Iterator, AsyncGenerator
 
 from agno.agent import Agent, RunResponse, RunResponseEvent  # noqa
 from agno.models.openrouter import OpenRouter
 from edgar import Company, set_identity
 from pydantic import BaseModel, Field, field_validator
 
-from valuecell.core.types import BaseAgent
+from valuecell.core.types import BaseAgent, StreamResponse, streaming
 from valuecell.core.agent.decorator import create_wrapped_agent
 
 # Configure logging
@@ -299,10 +299,9 @@ async def _process_financial_data_query(
                     continue
 
             if not all_filings_data:
-                yield {
-                    "content": f"❌ **Insufficient Data**: No financial filings found for company '{ticker}'.",
-                    "is_task_complete": True,
-                }
+                yield streaming.failed(
+                    f"**Insufficient Data**: No financial filings found for company '{ticker}'."
+                )
                 return
 
             # Generate financial data analysis report
@@ -336,10 +335,10 @@ async def _process_financial_data_query(
 
         ## Output requirements:
         Please output analysis results in a clear structure, including:
-        - 📊 **Key Findings**: 3-5 important insights
-        - 📈 **Financial Highlights**: Important financial data and trends
-        - 🎯 **Investment Reference**: Reference value for investors
-        - ⚠️ **Risk Alerts**: Risk points that need attention
+        - **Key Findings**: 3-5 important insights
+        - **Financial Highlights**: Important financial data and trends
+        - **Investment Reference**: Reference value for investors
+        - **Risk Alerts**: Risk points that need attention
 
         Please ensure the analysis is objective and professional, based on actual data, avoiding excessive speculation.
""" @@ -349,27 +348,22 @@ async def _process_financial_data_query( ) for event in response_stream: if event.event == "RunResponseContent": - yield { - "content": event.content, - "is_task_complete": False, - } + yield streaming.message_chunk(event.content) elif event.event == "ToolCallStarted": - print(f"Tool call started: {event.tool}") + yield streaming.tool_call_started( + event.tool.tool_call_id, event.tool.tool_name + ) + elif event.event == "ToolCallCompleted": + yield streaming.tool_call_completed( + event.tool.result, event.tool.tool_call_id, event.tool.tool_name + ) elif event.event == "ReasoningStep": - print(f"Reasoning step: {event.content}") + yield streaming.reasoning(event.reasoning_content) logger.info("Financial data analysis completed") - yield { - "content": "", - "is_task_complete": True, - } - + yield streaming.done() except Exception as e: - logger.error(f"Financial data query failed: {e}") - yield { - "content": f"❌ **Financial Data Query Error**: Unable to retrieve financial data for company '{ticker}'.\nError details: {str(e)}", - "is_task_complete": True, - } + yield streaming.failed(f"Financial data query failed: {e}") async def _process_fund_holdings_query( self, ticker: str, session_id: str, task_id: str @@ -438,10 +432,10 @@ async def _process_fund_holdings_query( ## Output requirements: Please output analysis results in a clear structure, including: - - 📊 **Key Findings**: 3-5 important insights - - 📈 **Important Data**: Specific change data and percentages - - 🎯 **Investment Insights**: Reference value for investors - - ⚠️ **Risk Alerts**: Risk points that need attention + - **Key Findings**: 3-5 important insights + - **Important Data**: Specific change data and percentages + - **Investment Insights**: Reference value for investors + - **Risk Alerts**: Risk points that need attention Please ensure the analysis is objective and professional, based on actual data, avoiding excessive speculation. 
""" @@ -451,30 +445,27 @@ async def _process_fund_holdings_query( ) for event in response_stream: if event.event == "RunResponseContent": - yield { - "content": event.content, - "is_task_complete": False, - } + yield streaming.message_chunk(event.content) elif event.event == "ToolCallStarted": - print(f"Tool call started: {event.tool}") + yield streaming.tool_call_started( + event.tool.tool_call_id, event.tool.tool_name + ) + elif event.event == "ToolCallCompleted": + yield streaming.tool_call_completed( + event.tool.result, event.tool.tool_call_id, event.tool.tool_name + ) elif event.event == "ReasoningStep": - print(f"Reasoning step: {event.content}") + yield streaming.reasoning(event.reasoning_content) logger.info("Financial data analysis completed") - yield { - "content": "", - "is_task_complete": True, - } + streaming.done() logger.info("13F analysis completed") - except Exception as e: - logger.error(f"13F query failed: {e}") - yield { - "content": f"❌ **13F Query Error**: Unable to retrieve 13F data for company '{ticker}'.\nError details: {str(e)}", - "is_task_complete": True, - } + yield streaming.failed(f"13F query failed: {e}") - async def stream(self, query: str, session_id: str, task_id: str): + async def stream( + self, query: str, session_id: str, task_id: str + ) -> AsyncGenerator[StreamResponse, None]: """ Main streaming method with intelligent routing support """ From c5d7440ce33781288e17edc3ca08001bf31fe204 Mon Sep 17 00:00:00 2001 From: zhonghao lu Date: Mon, 22 Sep 2025 12:01:11 +0800 Subject: [PATCH 2/3] lint --- python/valuecell/agents/sec_agent.py | 33 +++++++++++----------------- 1 file changed, 13 insertions(+), 20 deletions(-) diff --git a/python/valuecell/agents/sec_agent.py b/python/valuecell/agents/sec_agent.py index 35fd067b5..02752dc33 100644 --- a/python/valuecell/agents/sec_agent.py +++ b/python/valuecell/agents/sec_agent.py @@ -380,10 +380,9 @@ async def _process_fund_holdings_query( # Get 13F-HR filings filings = company.get_filings(form="13F-HR").head(self.config.max_filings) if len(filings) < 2: - yield { - "content": f"❌ **Insufficient Data**: Company '{ticker}' has insufficient 13F-HR filings (at least 2 filings required for comparison analysis).", - "is_task_complete": True, - } + yield streaming.failed( + f"**Insufficient Data**: Company '{ticker}' has insufficient 13F-HR filings (at least 2 filings required for comparison analysis)." + ) return logger.info(f"Retrieved {len(filings)} 13F filings") @@ -480,10 +479,9 @@ async def stream( logger.info(f"Query classification result: {query_type}") except Exception as e: logger.error(f"Query classification failed: {e}") - yield { - "content": f"❌ **Classification Error**: Unable to analyze query type.\nError details: {str(e)}", - "is_task_complete": True, - } + yield streaming.failed( + "**Classification Error**: Unable to analyze query type." + ) return # 2. Extract stock ticker @@ -506,10 +504,9 @@ async def stream( logger.info(f"Extracted stock ticker: {ticker}") except Exception as e: logger.error(f"Stock ticker extraction failed: {e}") - yield { - "content": f"❌ **Parse Error**: Unable to parse query parameters. Please ensure you provide a valid stock ticker.\nError details: {str(e)}", - "is_task_complete": True, - } + yield streaming.failed( + "**Parse Error**: Unable to parse query parameters. Please ensure you provide a valid stock ticker." + ) return # 3. 
         # 3. Route to appropriate processing method based on query type
@@ -526,10 +523,7 @@ async def stream(
 
         except Exception as e:
             logger.error(f"Unexpected error in stream method: {e}")
-            yield {
-                "content": f"❌ **System Error**: An unexpected error occurred while processing the request.\nError details: {str(e)}",
-                "is_task_complete": True,
-            }
+            yield streaming.failed(f"Unexpected error: {e}")
 
     async def notify(self, query: str, session_id: str, task_id: str):
         """
@@ -546,10 +540,9 @@ async def notify(self, query: str, session_id: str, task_id: str):
                 logger.info(f"Extracted ticker: {ticker}")
             except Exception as e:
                 logger.error(f"Ticker extraction failed: {e}")
-                yield {
-                    "content": f"❌ **Parse Error**: {str(e)}",
-                    "is_task_complete": True,
-                }
+                yield streaming.failed(
+                    "**Parse Error**: Unable to parse query parameters. Please ensure you provide a valid stock ticker."
+                )
                 return
 
             # 2. Initialize monitoring session

From 3a9f128810c9b80a048a65239cd3d93ce4f32ae Mon Sep 17 00:00:00 2001
From: zhonghao lu
Date: Mon, 22 Sep 2025 13:32:13 +0800
Subject: [PATCH 3/3] format

---
 python/valuecell/agents/sec_agent.py     | 60 ++++++------------------
 python/valuecell/core/agent/responses.py |  6 +++
 2 files changed, 20 insertions(+), 46 deletions(-)

diff --git a/python/valuecell/agents/sec_agent.py b/python/valuecell/agents/sec_agent.py
index 02752dc33..cd4919f78 100644
--- a/python/valuecell/agents/sec_agent.py
+++ b/python/valuecell/agents/sec_agent.py
@@ -11,7 +11,8 @@
 from edgar import Company, set_identity
 from pydantic import BaseModel, Field, field_validator
 
-from valuecell.core.types import BaseAgent, StreamResponse, streaming
+from valuecell.core.agent.responses import streaming, notification
+from valuecell.core.types import BaseAgent, StreamResponse
 from valuecell.core.agent.decorator import create_wrapped_agent
 
 # Configure logging
@@ -540,7 +541,7 @@ async def notify(self, query: str, session_id: str, task_id: str):
                 logger.info(f"Extracted ticker: {ticker}")
             except Exception as e:
                 logger.error(f"Ticker extraction failed: {e}")
-                yield streaming.failed(
+                yield notification.failed(
                     "**Parse Error**: Unable to parse query parameters. Please ensure you provide a valid stock ticker."
                 )
@@ -553,17 +554,6 @@ async def notify(self, query: str, session_id: str, task_id: str):
                 "last_check": None,
             }
 
-            # 3. Send initial confirmation
-            yield {
-                "content": f" **SEC Filing Monitor Started**\n\n"
-                f"**Ticker**: {ticker}\n"
-                f"**Monitoring**: 10-K, 8-K, 10-Q, 13F filings\n"
-                f"**Started**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
-                f"I will continuously monitor {ticker}'s SEC filings and notify you of any changes...",
-                "is_task_complete": False,
-            }
-
-            # 4. Continuous monitoring loop
             check_interval = 30  # Check every 30 seconds (can be configured)
             while session_id in self.monitoring_sessions:
                 try:
@@ -590,49 +580,27 @@ async def notify(self, query: str, session_id: str, task_id: str):
                                 ticker, changes
                             )
-
-                            yield {
-                                "content": f"🚨 **New SEC Filing Detected!**\n\n"
-                                f"**Ticker**: {ticker}\n"
-                                f"**Time**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
-                                f"**Summary**:\n{summary}\n\n"
-                                f"Continuing to monitor for further changes...",
-                                "is_task_complete": False,
-                            }
-                        else:
-                            # Periodic status update (every 10 checks)
-                            if (
-                                self.monitoring_sessions[session_id]["check_count"] % 10
-                                == 0
-                            ):
-                                yield {
-                                    "content": f"📊 **Monitoring Status Update**\n\n"
-                                    f"**Ticker**: {ticker}\n"
-                                    f"**Checks performed**: {self.monitoring_sessions[session_id]['check_count']}\n"
-                                    f"**Last check**: {datetime.now().strftime('%H:%M:%S')}\n"
-                                    f"**Status**: No new filings detected\n\n"
-                                    f"Monitoring continues...",
-                                    "is_task_complete": False,
-                                }
-                    else:
-                        logger.warning(f"Failed to retrieve filings for {ticker}")
+                            # yield {
+                            #     "content": f"🚨 **New SEC Filing Detected!**\n\n"
+                            #     f"**Ticker**: {ticker}\n"
+                            #     f"**Time**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
+                            #     f"**Summary**:\n{summary}\n\n"
+                            #     f"Continuing to monitor for further changes...",
+                            #     "is_task_complete": False,
+                            # }
+                            yield notification.message(summary)
 
                     # Wait before next check
                     await asyncio.sleep(check_interval)
 
                 except Exception as e:
                     logger.error(f"Error during monitoring check: {e}")
-                    yield {
-                        "content": f"⚠️ **Monitoring Error**: {str(e)}\n\nRetrying in {check_interval} seconds...",
-                        "is_task_complete": False,
-                    }
+                    yield notification.failed(str(e))
                     await asyncio.sleep(check_interval)
 
         except Exception as e:
             logger.error(f"Unexpected error in notify method: {e}")
-            yield {
-                "content": f"❌ **System Error**: An unexpected error occurred while setting up monitoring.\nError details: {str(e)}",
-                "is_task_complete": True,
-            }
+            yield notification.failed(str(e))
         finally:
             # Clean up monitoring session
             if session_id in self.monitoring_sessions:
diff --git a/python/valuecell/core/agent/responses.py b/python/valuecell/core/agent/responses.py
index 64f768546..c44e94992 100644
--- a/python/valuecell/core/agent/responses.py
+++ b/python/valuecell/core/agent/responses.py
@@ -93,6 +93,12 @@ def done(self, content: Optional[str] = None) -> NotifyResponse:
             event=NotifyResponseEvent.TASK_DONE,
         )
 
+    def failed(self, content: Optional[str] = None) -> NotifyResponse:
+        return NotifyResponse(
+            content=content,
+            event=NotifyResponseEvent.TASK_FAILED,
+        )
+
 
 notification = _NotifyResponseNamespace()
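
A quick sketch for exercising the new streaming interface on this branch. The agent
class name (SECAgent) and the StreamResponse attribute names (event, content) are
assumptions for illustration: the series shows the module path and the
streaming/notification helpers, but not the response model or the exported class.

    import asyncio

    # Hypothetical class name: the series edits python/valuecell/agents/sec_agent.py,
    # but the exported agent class is not visible in the diff context.
    from valuecell.agents.sec_agent import SECAgent

    async def main() -> None:
        agent = SECAgent()
        # stream() now yields StreamResponse objects instead of raw dicts with an
        # "is_task_complete" sentinel, so consumers branch on the event type.
        async for response in agent.stream(
            "Analyze AAPL's latest 10-K",
            session_id="demo-session",
            task_id="demo-task",
        ):
            # `event` and `content` are assumed attribute names; adjust to the
            # actual StreamResponse model in valuecell.core.types.
            print(response.event, response.content or "")

    asyncio.run(main())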