diff --git a/python/valuecell/agents/sec_agent.py b/python/valuecell/agents/sec_agent.py index 826ca2b4a..cd4919f78 100644 --- a/python/valuecell/agents/sec_agent.py +++ b/python/valuecell/agents/sec_agent.py @@ -4,14 +4,15 @@ import os from datetime import datetime from enum import Enum -from typing import Dict, Iterator +from typing import Dict, Iterator, AsyncGenerator from agno.agent import Agent, RunResponse, RunResponseEvent # noqa from agno.models.openrouter import OpenRouter from edgar import Company, set_identity from pydantic import BaseModel, Field, field_validator -from valuecell.core.types import BaseAgent +from valuecell.core.agent.responses import streaming, notification +from valuecell.core.types import BaseAgent, StreamResponse from valuecell.core.agent.decorator import create_wrapped_agent # Configure logging @@ -299,10 +300,9 @@ async def _process_financial_data_query( continue if not all_filings_data: - yield { - "content": f"❌ **Insufficient Data**: No financial filings found for company '{ticker}'.", - "is_task_complete": True, - } + yield streaming.failed( + f"**Insufficient Data**: No financial filings found for company '{ticker}'." + ) return # Generate financial data analysis report @@ -336,10 +336,10 @@ async def _process_financial_data_query( ## Output requirements: Please output analysis results in a clear structure, including: - - 📊 **Key Findings**: 3-5 important insights - - 📈 **Financial Highlights**: Important financial data and trends - - 🎯 **Investment Reference**: Reference value for investors - - ⚠️ **Risk Alerts**: Risk points that need attention + - **Key Findings**: 3-5 important insights + - **Financial Highlights**: Important financial data and trends + - **Investment Reference**: Reference value for investors + - **Risk Alerts**: Risk points that need attention Please ensure the analysis is objective and professional, based on actual data, avoiding excessive speculation. """ @@ -349,27 +349,22 @@ async def _process_financial_data_query( ) for event in response_stream: if event.event == "RunResponseContent": - yield { - "content": event.content, - "is_task_complete": False, - } + yield streaming.message_chunk(event.content) elif event.event == "ToolCallStarted": - print(f"Tool call started: {event.tool}") + yield streaming.tool_call_started( + event.tool.tool_call_id, event.tool.tool_name + ) + elif event.event == "ToolCallCompleted": + yield streaming.tool_call_completed( + event.tool.result, event.tool.tool_call_id, event.tool.tool_name + ) elif event.event == "ReasoningStep": - print(f"Reasoning step: {event.content}") + yield streaming.reasoning(event.reasoning_content) logger.info("Financial data analysis completed") - yield { - "content": "", - "is_task_complete": True, - } - + yield streaming.done() except Exception as e: - logger.error(f"Financial data query failed: {e}") - yield { - "content": f"❌ **Financial Data Query Error**: Unable to retrieve financial data for company '{ticker}'.\nError details: {str(e)}", - "is_task_complete": True, - } + yield streaming.failed(f"Financial data query failed: {e}") async def _process_fund_holdings_query( self, ticker: str, session_id: str, task_id: str @@ -386,10 +381,9 @@ async def _process_fund_holdings_query( # Get 13F-HR filings filings = company.get_filings(form="13F-HR").head(self.config.max_filings) if len(filings) < 2: - yield { - "content": f"❌ **Insufficient Data**: Company '{ticker}' has insufficient 13F-HR filings (at least 2 filings required for comparison analysis).", - "is_task_complete": True, - } + yield streaming.failed( + f"**Insufficient Data**: Company '{ticker}' has insufficient 13F-HR filings (at least 2 filings required for comparison analysis)." + ) return logger.info(f"Retrieved {len(filings)} 13F filings") @@ -438,10 +432,10 @@ async def _process_fund_holdings_query( ## Output requirements: Please output analysis results in a clear structure, including: - - 📊 **Key Findings**: 3-5 important insights - - 📈 **Important Data**: Specific change data and percentages - - 🎯 **Investment Insights**: Reference value for investors - - ⚠️ **Risk Alerts**: Risk points that need attention + - **Key Findings**: 3-5 important insights + - **Important Data**: Specific change data and percentages + - **Investment Insights**: Reference value for investors + - **Risk Alerts**: Risk points that need attention Please ensure the analysis is objective and professional, based on actual data, avoiding excessive speculation. """ @@ -451,30 +445,27 @@ async def _process_fund_holdings_query( ) for event in response_stream: if event.event == "RunResponseContent": - yield { - "content": event.content, - "is_task_complete": False, - } + yield streaming.message_chunk(event.content) elif event.event == "ToolCallStarted": - print(f"Tool call started: {event.tool}") + yield streaming.tool_call_started( + event.tool.tool_call_id, event.tool.tool_name + ) + elif event.event == "ToolCallCompleted": + yield streaming.tool_call_completed( + event.tool.result, event.tool.tool_call_id, event.tool.tool_name + ) elif event.event == "ReasoningStep": - print(f"Reasoning step: {event.content}") + yield streaming.reasoning(event.reasoning_content) logger.info("Financial data analysis completed") - yield { - "content": "", - "is_task_complete": True, - } + streaming.done() logger.info("13F analysis completed") - except Exception as e: - logger.error(f"13F query failed: {e}") - yield { - "content": f"❌ **13F Query Error**: Unable to retrieve 13F data for company '{ticker}'.\nError details: {str(e)}", - "is_task_complete": True, - } + yield streaming.failed(f"13F query failed: {e}") - async def stream(self, query: str, session_id: str, task_id: str): + async def stream( + self, query: str, session_id: str, task_id: str + ) -> AsyncGenerator[StreamResponse, None]: """ Main streaming method with intelligent routing support """ @@ -489,10 +480,9 @@ async def stream(self, query: str, session_id: str, task_id: str): logger.info(f"Query classification result: {query_type}") except Exception as e: logger.error(f"Query classification failed: {e}") - yield { - "content": f"❌ **Classification Error**: Unable to analyze query type.\nError details: {str(e)}", - "is_task_complete": True, - } + yield streaming.failed( + "**Classification Error**: Unable to analyze query type." + ) return # 2. Extract stock ticker @@ -515,10 +505,9 @@ async def stream(self, query: str, session_id: str, task_id: str): logger.info(f"Extracted stock ticker: {ticker}") except Exception as e: logger.error(f"Stock ticker extraction failed: {e}") - yield { - "content": f"❌ **Parse Error**: Unable to parse query parameters. Please ensure you provide a valid stock ticker.\nError details: {str(e)}", - "is_task_complete": True, - } + yield streaming.failed( + "**Parse Error**: Unable to parse query parameters. Please ensure you provide a valid stock ticker." + ) return # 3. Route to appropriate processing method based on query type @@ -535,10 +524,7 @@ async def stream(self, query: str, session_id: str, task_id: str): except Exception as e: logger.error(f"Unexpected error in stream method: {e}") - yield { - "content": f"❌ **System Error**: An unexpected error occurred while processing the request.\nError details: {str(e)}", - "is_task_complete": True, - } + yield streaming.failed(f"Unexpected error: {e}") async def notify(self, query: str, session_id: str, task_id: str): """ @@ -555,10 +541,9 @@ async def notify(self, query: str, session_id: str, task_id: str): logger.info(f"Extracted ticker: {ticker}") except Exception as e: logger.error(f"Ticker extraction failed: {e}") - yield { - "content": f"❌ **Parse Error**: {str(e)}", - "is_task_complete": True, - } + yield notification.failed( + "**Parse Error**: Unable to parse query parameters. Please ensure you provide a valid stock ticker." + ) return # 2. Initialize monitoring session @@ -569,17 +554,6 @@ async def notify(self, query: str, session_id: str, task_id: str): "last_check": None, } - # 3. Send initial confirmation - yield { - "content": f" **SEC Filing Monitor Started**\n\n" - f"**Ticker**: {ticker}\n" - f"**Monitoring**: 10-K, 8-K, 10-Q, 13F filings\n" - f"**Started**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n" - f"I will continuously monitor {ticker}'s SEC filings and notify you of any changes...", - "is_task_complete": False, - } - - # 4. Continuous monitoring loop check_interval = 30 # Check every 30 seconds (can be configured) while session_id in self.monitoring_sessions: @@ -606,49 +580,27 @@ async def notify(self, query: str, session_id: str, task_id: str): ticker, changes ) - yield { - "content": f"🚨 **New SEC Filing Detected!**\n\n" - f"**Ticker**: {ticker}\n" - f"**Time**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n" - f"**Summary**:\n{summary}\n\n" - f"Continuing to monitor for further changes...", - "is_task_complete": False, - } - else: - # Periodic status update (every 10 checks) - if ( - self.monitoring_sessions[session_id]["check_count"] % 10 - == 0 - ): - yield { - "content": f"📊 **Monitoring Status Update**\n\n" - f"**Ticker**: {ticker}\n" - f"**Checks performed**: {self.monitoring_sessions[session_id]['check_count']}\n" - f"**Last check**: {datetime.now().strftime('%H:%M:%S')}\n" - f"**Status**: No new filings detected\n\n" - f"Monitoring continues...", - "is_task_complete": False, - } - else: - logger.warning(f"Failed to retrieve filings for {ticker}") + # yield { + # "content": f"🚨 **New SEC Filing Detected!**\n\n" + # f"**Ticker**: {ticker}\n" + # f"**Time**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n" + # f"**Summary**:\n{summary}\n\n" + # f"Continuing to monitor for further changes...", + # "is_task_complete": False, + # } + yield notification.message(summary) # Wait before next check await asyncio.sleep(check_interval) except Exception as e: logger.error(f"Error during monitoring check: {e}") - yield { - "content": f"⚠️ **Monitoring Error**: {str(e)}\n\nRetrying in {check_interval} seconds...", - "is_task_complete": False, - } + yield notification.failed(str(e)) await asyncio.sleep(check_interval) except Exception as e: logger.error(f"Unexpected error in notify method: {e}") - yield { - "content": f"❌ **System Error**: An unexpected error occurred while setting up monitoring.\nError details: {str(e)}", - "is_task_complete": True, - } + yield notification.failed(str(e)) finally: # Clean up monitoring session if session_id in self.monitoring_sessions: diff --git a/python/valuecell/core/agent/responses.py b/python/valuecell/core/agent/responses.py index 64f768546..c44e94992 100644 --- a/python/valuecell/core/agent/responses.py +++ b/python/valuecell/core/agent/responses.py @@ -93,6 +93,12 @@ def done(self, content: Optional[str] = None) -> NotifyResponse: event=NotifyResponseEvent.TASK_DONE, ) + def failed(self, content: Optional[str] = None) -> NotifyResponse: + return NotifyResponse( + content=content, + event=NotifyResponse.TASK_FAILED, + ) + notification = _NotifyResponseNamespace()