diff --git a/python/valuecell/agents/sec_agent.py b/python/valuecell/agents/sec_agent.py index c5df0e248..826ca2b4a 100644 --- a/python/valuecell/agents/sec_agent.py +++ b/python/valuecell/agents/sec_agent.py @@ -1,8 +1,10 @@ import asyncio +import hashlib import logging import os +from datetime import datetime from enum import Enum -from typing import Iterator +from typing import Dict, Iterator from agno.agent import Agent, RunResponse, RunResponseEvent # noqa from agno.models.openrouter import OpenRouter @@ -80,6 +82,10 @@ def __init__(self): super().__init__() self.config = Sec13FundAgentConfig() + # Monitoring state management + self.monitoring_sessions: Dict[str, Dict] = {} # session_id -> monitoring_info + self.filing_cache: Dict[str, Dict] = {} # ticker -> filing_hashes + try: # Query classification agent - for determining query type self.classifier_agent = Agent( @@ -103,6 +109,132 @@ def __init__(self): logger.error(f"Failed to initialize SEC Agent: {e}") raise + async def _extract_ticker_from_query(self, query: str) -> str: + """Extract ticker symbol from user query using AI""" + try: + extraction_prompt = f""" + Extract the stock ticker symbol from the following query: "{query}" + + Rules: + - Return only the ticker symbol (e.g., "AAPL", "TSLA", "MSFT") + - If multiple tickers are mentioned, return the first one + - If no ticker is found, return "UNKNOWN" + - The ticker should be in uppercase + + Query: {query} + """ + + response = self.analysis_agent.run(extraction_prompt) + ticker = response.content.strip().upper() + + # Basic validation + if ticker == "UNKNOWN" or len(ticker) > 10 or not ticker.isalpha(): + raise ValueError(f"Invalid ticker extracted: {ticker}") + + logger.info(f"Extracted ticker: {ticker} from query: {query}") + return ticker + + except Exception as e: + logger.error(f"Failed to extract ticker from query: {e}") + raise ValueError(f"Could not extract valid ticker from query: {query}") + + async def _get_sec_filings(self, ticker: str) -> Dict[str, str]: + """Get current SEC filings for a ticker and return their hashes""" + try: + set_identity(self.config.sec_email) + company = Company(ticker) + + filings_data = {} + filing_types = ["10-K", "8-K", "10-Q", "13F-HR"] + + for filing_type in filing_types: + try: + # Get the most recent filing of this type + filings = company.get_filings(form=filing_type).latest(1) + if filings: + filing = filings[0] + # Create a hash of the filing content/metadata + filing_content = f"{filing.accession_number}_{filing.filing_date}_{filing.form}" + filing_hash = hashlib.md5(filing_content.encode()).hexdigest() + filings_data[filing_type] = filing_hash + logger.info( + f"Found {filing_type} filing for {ticker}: {filing.accession_number}" + ) + else: + filings_data[filing_type] = None + logger.info(f"No {filing_type} filing found for {ticker}") + except Exception as e: + logger.warning(f"Error getting {filing_type} for {ticker}: {e}") + filings_data[filing_type] = None + + return filings_data + + except Exception as e: + logger.error(f"Failed to get SEC filings for {ticker}: {e}") + return {} + + async def _detect_filing_changes( + self, ticker: str, current_filings: Dict[str, str] + ) -> Dict[str, bool]: + """Detect changes in SEC filings compared to cached versions""" + changes = {} + + if ticker not in self.filing_cache: + # First time checking this ticker + self.filing_cache[ticker] = current_filings.copy() + # Consider all existing filings as "new" for first check + for filing_type, filing_hash in current_filings.items(): + 
changes[filing_type] = filing_hash is not None
+        else:
+            # Compare with cached versions
+            cached_filings = self.filing_cache[ticker]
+            for filing_type, current_hash in current_filings.items():
+                cached_hash = cached_filings.get(filing_type)
+                changes[filing_type] = (
+                    current_hash != cached_hash and current_hash is not None
+                )
+
+        # Update cache
+        self.filing_cache[ticker] = current_filings.copy()
+
+        return changes
+
+    async def _generate_filing_summary(
+        self, ticker: str, changed_filings: Dict[str, bool]
+    ) -> str:
+        """Generate AI summary of filing changes"""
+        # Computed before the try block so the except handler below can
+        # reference it safely if summary generation fails
+        changed_types = [
+            filing_type
+            for filing_type, changed in changed_filings.items()
+            if changed
+        ]
+
+        if not changed_types:
+            return f"No new filings detected for {ticker}."
+
+        try:
+            summary_prompt = f"""
+            New SEC filings have been detected for {ticker}. The following filing types have been updated:
+            {", ".join(changed_types)}
+
+            Please provide a brief summary of what these filing types typically contain and their significance for investors:
+
+            - 10-K: Annual report with comprehensive business overview
+            - 8-K: Current report for material events
+            - 10-Q: Quarterly financial report
+            - 13F-HR: Institutional investment holdings report
+
+            Focus on explaining what investors should pay attention to with these new {ticker} filings.
+            Keep the summary concise but informative (2-3 paragraphs maximum).
+            """
+
+            response = self.analysis_agent.run(summary_prompt)
+            return response.content
+
+        except Exception as e:
+            logger.error(f"Failed to generate filing summary: {e}")
+            return f"New filings detected for {ticker}: {', '.join(changed_types)}, but summary generation failed."
+
     async def _classify_query(self, query: str) -> QueryType:
         """
         Intelligently classify user queries to determine if it's financial data or 13F query
@@ -408,6 +540,132 @@ async def stream(self, query: str, session_id: str, task_id: str):
                 "is_task_complete": True,
             }
 
+    async def notify(self, query: str, session_id: str, task_id: str):
+        """
+        Continuously monitor a ticker's SEC filings and yield notification events
+        """
+        try:
+            logger.info(
+                f"Starting SEC filing monitoring - session: {session_id}, task: {task_id}"
+            )
+
+            # 1. Extract ticker from query
+            try:
+                ticker = await self._extract_ticker_from_query(query)
+                logger.info(f"Extracted ticker: {ticker}")
+            except Exception as e:
+                logger.error(f"Ticker extraction failed: {e}")
+                yield {
+                    "content": f"❌ **Parse Error**: {str(e)}",
+                    "is_task_complete": True,
+                }
+                return
+
+            # 2. Initialize monitoring session
+            self.monitoring_sessions[session_id] = {
+                "ticker": ticker,
+                "start_time": datetime.now(),
+                "check_count": 0,
+                "last_check": None,
+            }
+
+            # 3. Send initial confirmation
+            yield {
+                "content": f"🔍 **SEC Filing Monitor Started**\n\n"
+                f"**Ticker**: {ticker}\n"
+                f"**Monitoring**: 10-K, 8-K, 10-Q, 13F-HR filings\n"
+                f"**Started**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
+                f"I will continuously monitor {ticker}'s SEC filings and notify you of any changes...",
+                "is_task_complete": False,
+            }
+
+            # 4. Continuous monitoring loop
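+            # Each iteration below fetches the ticker's latest filings via
+            # edgartools, fingerprints them, diffs the fingerprints against
+            # self.filing_cache, and yields a notification when one changes.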
+            check_interval = 30  # Seconds between checks (hardcoded for now)
+
+            while session_id in self.monitoring_sessions:
+                try:
+                    # Get current filings
+                    current_filings = await self._get_sec_filings(ticker)
+
+                    # The session may have been stopped while the SEC request
+                    # was in flight, so re-check it before updating state
+                    session = self.monitoring_sessions.get(session_id)
+                    if session is None:
+                        break
+
+                    if current_filings:
+                        # Detect changes
+                        changes = await self._detect_filing_changes(
+                            ticker, current_filings
+                        )
+
+                        # Update monitoring session
+                        session["check_count"] += 1
+                        session["last_check"] = datetime.now()
+
+                        # Check if there are any changes
+                        if any(changes.values()):
+                            # Generate summary of changes
+                            summary = await self._generate_filing_summary(
+                                ticker, changes
+                            )
+
+                            yield {
+                                "content": f"🚨 **New SEC Filing Detected!**\n\n"
+                                f"**Ticker**: {ticker}\n"
+                                f"**Time**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
+                                f"**Summary**:\n{summary}\n\n"
+                                f"Continuing to monitor for further changes...",
+                                "is_task_complete": False,
+                            }
+                        elif session["check_count"] % 10 == 0:
+                            # Periodic status update (every 10 checks)
+                            yield {
+                                "content": f"📊 **Monitoring Status Update**\n\n"
+                                f"**Ticker**: {ticker}\n"
+                                f"**Checks performed**: {session['check_count']}\n"
+                                f"**Last check**: {datetime.now().strftime('%H:%M:%S')}\n"
+                                f"**Status**: No new filings detected\n\n"
+                                f"Monitoring continues...",
+                                "is_task_complete": False,
+                            }
+                    else:
+                        logger.warning(f"Failed to retrieve filings for {ticker}")
+
+                    # Wait before next check
+                    await asyncio.sleep(check_interval)
+
+                except Exception as e:
+                    logger.error(f"Error during monitoring check: {e}")
+                    yield {
+                        "content": f"⚠️ **Monitoring Error**: {str(e)}\n\nRetrying in {check_interval} seconds...",
+                        "is_task_complete": False,
+                    }
+                    await asyncio.sleep(check_interval)
+
+            # The loop exits when stop_monitoring() removes the session, so
+            # send a final completion message before cleaning up
+            yield {
+                "content": f"🛑 **Monitoring Stopped**\n\n"
+                f"**Ticker**: {ticker}\n"
+                f"**Stopped**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
+                "is_task_complete": True,
+            }
+
+        except Exception as e:
+            logger.error(f"Unexpected error in notify method: {e}")
+            yield {
+                "content": f"❌ **System Error**: An unexpected error occurred while monitoring SEC filings.\nError details: {str(e)}",
+                "is_task_complete": True,
+            }
+        finally:
+            # Clean up monitoring session
+            if session_id in self.monitoring_sessions:
+                del self.monitoring_sessions[session_id]
+
+    def stop_monitoring(self, session_id: str) -> bool:
+        """Stop monitoring for a specific session"""
+        if session_id in self.monitoring_sessions:
+            del self.monitoring_sessions[session_id]
+            logger.info(f"Stopped monitoring for session: {session_id}")
+            return True
+        return False
+
+    def get_monitoring_status(self, session_id: str) -> Dict:
+        """Get current monitoring status for a session"""
+        return self.monitoring_sessions.get(session_id, {})
+
 
 if __name__ == "__main__":
     agent = create_wrapped_agent(SecAgent)
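Minimal consumption sketch for the new notify() API, included for reviewers. It assumes the module imports as valuecell.agents.sec_agent, that valid OpenRouter and SEC identity settings are available in the environment, and that the session/task IDs, the example query, and the five-minute stop timer are illustrative placeholders rather than values defined by this change:

import asyncio

from valuecell.agents.sec_agent import SecAgent


async def main() -> None:
    agent = SecAgent()
    session_id, task_id = "demo-session", "demo-task"

    async def stop_after(delay: float) -> None:
        # End the session so the notify() loop exits and cleans up its state.
        await asyncio.sleep(delay)
        agent.stop_monitoring(session_id)

    stopper = asyncio.create_task(stop_after(300))  # hypothetical 5-minute run
    try:
        async for update in agent.notify("Monitor TSLA SEC filings", session_id, task_id):
            print(update["content"])
            if update["is_task_complete"]:
                break
    finally:
        stopper.cancel()


asyncio.run(main())

Because notify() only re-checks the session dict between sleeps, a stop request takes effect on the next loop iteration rather than immediately.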