From 814472373df323f6e3574fbc8f315fbff3aa2a02 Mon Sep 17 00:00:00 2001 From: paisley Date: Fri, 24 Oct 2025 16:30:28 +0800 Subject: [PATCH] Remove sec_agent.py Deleted the python/valuecell/agents/sec_agent.py file, removing the SEC agent implementation. Ignore the .idea directory in .gitignore The .idea/ pattern is now active, so the directory is ignored by git and JetBrains IDE project files are kept out of version control. This prevents per-user IDE configuration from being committed to the repository. --- .gitignore | 2 +- python/valuecell/agents/sec_agent.py | 665 --------------------------- 2 files changed, 1 insertion(+), 666 deletions(-) delete mode 100644 python/valuecell/agents/sec_agent.py diff --git a/.gitignore b/.gitignore index 2b798f7b5..b78e7a56f 100644 --- a/.gitignore +++ b/.gitignore @@ -173,7 +173,7 @@ cython_debug/ # be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. -#.idea/ +.idea/ # Abstra # Abstra is an AI-powered process automation framework. 
diff --git a/python/valuecell/agents/sec_agent.py b/python/valuecell/agents/sec_agent.py deleted file mode 100644 index 13c6f8562..000000000 --- a/python/valuecell/agents/sec_agent.py +++ /dev/null @@ -1,665 +0,0 @@ -import asyncio -import hashlib -import json -import logging -import os -from datetime import datetime -from enum import Enum -from typing import AsyncGenerator, Dict, Iterator, Optional - -from agno.agent import Agent, RunOutputEvent -from agno.models.openrouter import OpenRouter -from edgar import Company, set_identity -from pydantic import BaseModel, Field, field_validator - -from valuecell.core.agent.decorator import create_wrapped_agent -from valuecell.core.agent.responses import notification, streaming -from valuecell.core.constants import LANGUAGE, TIMEZONE -from valuecell.core.types import BaseAgent, StreamResponse - -# Configure logging -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) -SEC_FILLINGS_COMPONENT_TYPE = "sec_feed" - - -class QueryType(str, Enum): - """Query type enumeration""" - - FINANCIAL_DATA = "financial_data" # Financial data queries (10-K, 8-K, 10-Q) - FUND_HOLDINGS = "fund_holdings" # 13F fund holdings queries - - -class SecRequest(BaseModel): - """Unified SEC query request model""" - - ticker: str = Field( - ..., - description="Stock ticker symbol to analyze (e.g., 'AAPL', 'TSLA'). Only one ticker symbol is allowed per request.", - ) - query_type: QueryType = Field( - ..., - description="Type of SEC data to query: 'financial_data' for 10-K/8-K/10-Q filings, 'fund_holdings' for 13F holdings analysis", - ) - - @field_validator("ticker") - @classmethod - def validate_ticker(cls, v): - if not v or not isinstance(v, str): - raise ValueError("Ticker must be a non-empty string") - return v.upper().strip() - - -class Sec13FundRequest(BaseModel): - ticker: str = Field( - ..., - description="Stock ticker symbol to analyze (e.g., 'AAPL', 'TSLA'). 
Only one ticker symbol is allowed per request.", - ) - - @field_validator("ticker") - @classmethod - def validate_tickers(cls, v): - if not v or not isinstance(v, str): - raise ValueError("Ticker must be a non-empty string") - return v.upper().strip() - - -class Sec13FundAgentConfig: - """Configuration management class for SEC 13F Agent""" - - def __init__(self): - self.sec_email = os.getenv("SEC_EMAIL") - self.parser_model_id = os.getenv("SEC_PARSER_MODEL_ID", "openai/gpt-4o-mini") - self.analysis_model_id = os.getenv( - "SEC_ANALYSIS_MODEL_ID", "google/gemini-2.5-pro" - ) - self.max_filings = int(os.getenv("SEC_MAX_FILINGS", "5")) - self.request_timeout = int(os.getenv("SEC_REQUEST_TIMEOUT", "30")) - - -class SECAgent(BaseAgent): - """ - Intelligent SEC analysis agent supporting financial data queries and 13F fund holdings analysis - """ - - def __init__(self): - super().__init__() - self.config = Sec13FundAgentConfig() - - # Monitoring state management - self.monitoring_sessions: Dict[str, Dict] = {} # session_id -> monitoring_info - self.filing_cache: Dict[str, Dict] = {} # ticker -> filing_hashes - - try: - # Query classification agent - for determining query type - self.classifier_agent = Agent( - model=OpenRouter(id=self.config.parser_model_id), - output_schema=SecRequest, - markdown=True, - ) - # Traditional 13F parsing agent - maintains backward compatibility - self.parser_agent = Agent( - model=OpenRouter(id=self.config.parser_model_id), - output_schema=Sec13FundRequest, - markdown=True, - ) - # Analysis agent - self.analysis_agent = Agent( - model=OpenRouter(id=self.config.analysis_model_id, max_tokens=None), - markdown=True, - ) - logger.info("SEC intelligent analysis agent initialized successfully") - except Exception as e: - logger.error(f"Failed to initialize SEC Agent: {e}") - raise - - async def _extract_ticker_from_query(self, query: str) -> str: - """Extract ticker symbol from user query using AI""" - try: - extraction_prompt = f""" - Extract the 
stock ticker symbol from the following query: "{query}" - - Rules: - - Return only the ticker symbol (e.g., "AAPL", "TSLA", "MSFT") - - If multiple tickers are mentioned, return the first one - - If no ticker is found, return "UNKNOWN" - - The ticker should be in uppercase - - Query: {query} - """ - - response = await self.analysis_agent.arun(extraction_prompt) - ticker = response.content.strip().upper() - - # Basic validation - if ticker == "UNKNOWN" or len(ticker) > 10: - raise ValueError(f"Invalid ticker extracted: {ticker}") - - logger.info(f"Extracted ticker: {ticker} from query: {query}") - return ticker - - except Exception as e: - logger.error(f"Failed to extract ticker from query: {e}") - raise ValueError(f"Could not extract valid ticker from query: {query}") - - async def _get_sec_filings(self, ticker: str) -> Dict[str, str]: - """Get current SEC filings for a ticker and return their hashes""" - try: - set_identity(self.config.sec_email) - company = Company(ticker) - - filings_data = {} - filing_types = ["10-K", "8-K", "10-Q", "13F-HR"] - - for filing_type in filing_types: - try: - # Get the most recent filing of this type - filing = company.get_filings(form=filing_type).latest() - if filing: - # Create a hash of the filing content/metadata - filing_content = f"{filing.accession_number}_{filing.filing_date}_{filing.form}" - filing_hash = hashlib.md5(filing_content.encode()).hexdigest() - filings_data[filing_type] = filing_hash - logger.info( - f"Found {filing_type} filing for {ticker}: {filing.accession_number}" - ) - else: - filings_data[filing_type] = None - logger.info(f"No {filing_type} filing found for {ticker}") - except Exception as e: - logger.warning(f"Error getting {filing_type} for {ticker}: {e}") - filings_data[filing_type] = None - - return filings_data - - except Exception as e: - logger.error(f"Failed to get SEC filings for {ticker}: {e}") - return {} - - async def _detect_filing_changes( - self, ticker: str, current_filings: Dict[str, str] 
- ) -> Dict[str, bool]: - """Detect changes in SEC filings compared to cached versions""" - changes = {} - - if ticker not in self.filing_cache: - # First time checking this ticker - self.filing_cache[ticker] = current_filings.copy() - # Consider all existing filings as "new" for first check - for filing_type, filing_hash in current_filings.items(): - changes[filing_type] = filing_hash is not None - else: - # Compare with cached versions - cached_filings = self.filing_cache[ticker] - for filing_type, current_hash in current_filings.items(): - cached_hash = cached_filings.get(filing_type) - changes[filing_type] = ( - current_hash != cached_hash and current_hash is not None - ) - - # Update cache - self.filing_cache[ticker] = current_filings.copy() - - return changes - - async def _generate_filing_summary( - self, ticker: str, changed_filings: Dict[str, bool] - ) -> str: - """Generate AI summary of filing changes in JSON format""" - try: - changed_types = [ - filing_type - for filing_type, changed in changed_filings.items() - if changed - ] - - if not changed_types: - return "" - - summary_prompt = f""" - New SEC filings have been detected for {ticker}. The following filing types have been updated: - {", ".join(changed_types)} - - Please provide a brief summary of what these filing types typically contain and their significance for investors: - - - 10-K: Annual report with comprehensive business overview - - 8-K: Current report for material events - - 10-Q: Quarterly financial report - - 13F-HR: Institutional investment holdings report - - Focus on explaining what investors should pay attention to with these new {ticker} filings. - Keep the summary concise but informative (2-3 paragraphs maximum). 
- """ - - response = await self.analysis_agent.arun(summary_prompt) - - # Create JSON structure - summary_data = { - "ticker": ticker, - "source": "SEC", - "data": response.content, - "create_time": datetime.now().isoformat(), - } - - return json.dumps(summary_data) - - except Exception as e: - logger.error(f"Failed to generate filing summary: {e}") - error_summary_data = { - "ticker": ticker, - "source": "SEC", - "data": f"New filings detected for {ticker}: {', '.join(changed_types) if 'changed_types' in locals() else 'unknown'}, but summary generation failed.", - "create_time": datetime.now().isoformat(), - } - return "" - - async def _classify_query(self, query: str) -> QueryType: - """ - Intelligently classify user queries to determine if it's financial data or 13F query - """ - classification_prompt = f""" - Please analyze the following user query and determine what type of SEC data the user wants to obtain: - - User query: "{query}" - - Please judge the user's intent based on the query content: - 1. If the user explicitly wants to understand company financial data, financial statements, annual reports, quarterly reports, major events, etc., choose "financial_data" - 2. If the user explicitly wants to understand 13F fund holdings, institutional investor holding changes, fund shareholding situations, etc., choose "fund_holdings" - - Keyword hints: - - Financial data related: financial statements, annual reports, quarterly reports, 10-K, 8-K, 10-Q, financial condition, revenue, profit, balance sheet, cash flow - - 13F holdings related: fund holdings, institutional investors, shareholding changes, 13F, fund shareholding, investment portfolio - - Please extract the stock ticker and determine the query type. 
- """ - - try: - response = await self.classifier_agent.arun(classification_prompt) - return response.content.query_type - except Exception as e: - logger.warning( - f"Query classification failed, defaulting to 13F analysis: {e}" - ) - # If classification fails, default to 13F analysis (maintains backward compatibility) - return QueryType.FUND_HOLDINGS - - async def _process_financial_data_query( - self, ticker: str, dependencies: Optional[Dict] = None - ): - """ - Process financial data queries (10-K, 8-K, 10-Q) - """ - try: - # Set SEC identity - set_identity(self.config.sec_email) - company = Company(ticker) - logger.info(f"Starting financial data query for {ticker}") - - # Get different types of financial filings - filing_types = ["10-K", "8-K", "10-Q", "4"] - all_filings_data = {} - - for filing_type in filing_types: - try: - filings = company.get_filings(form=filing_type).head(3) - if len(filings) > 0: - all_filings_data[filing_type] = [] - for filing in filings: - filing_info = { - "date": filing.filing_date, - "accession_number": filing.accession_number, - "form": filing.form, - } - all_filings_data[filing_type].append(filing_info) - logger.info(f"Retrieved {len(filings)} {filing_type} filings") - except Exception as e: - logger.warning(f"Failed to retrieve {filing_type} filings: {e}") - continue - - if not all_filings_data: - yield streaming.failed( - f"**Insufficient Data**: No financial filings found for company '{ticker}'." - ) - return - - # Generate financial data analysis report with user context - user_context = "" - if dependencies: - user_language = dependencies.get(LANGUAGE, "en-US") - user_timezone = dependencies.get(TIMEZONE, "America/New_York") - - user_context = f""" -## User Context: -- User's preferred language: {user_language} -- User's timezone: {user_timezone} - -Please adapt your response to the user's language preference naturally. 
If the user prefers a non-English language, provide your analysis in that language while maintaining professional standards.""" - - analysis_prompt = f""" - As a professional financial analyst, please analyze the following company's SEC financial filings: - - Company ticker: {ticker} - - ## Available financial filings: - {all_filings_data} - - ## Analysis requirements: - Please provide professional analysis from the following perspectives: - - ### 1. Financial Filing Overview - - Latest 10-K annual report status - - Latest 10-Q quarterly report status - - Important 8-K event disclosures - - ### 2. Filing Timeline Analysis - - Time distribution of filing documents - - Filing frequency and timeliness - - ### 3. Key Financial Events - - Major events identified from 8-K filings - - Important information that may affect investment decisions - - ### 4. Investment Recommendations - - Investment references based on filing documents - - Risk points that need attention - - ## Output requirements: - Please output analysis results in a clear structure, including: - - **Key Findings**: 3-5 important insights - - **Financial Highlights**: Important financial data and trends - - **Investment Reference**: Reference value for investors - - **Risk Alerts**: Risk points that need attention - - Please ensure the analysis is objective and professional, based on actual data, avoiding excessive speculation.{user_context} - """ - - response_stream: Iterator[RunOutputEvent] = self.analysis_agent.arun( - analysis_prompt, stream=True, stream_intermediate_steps=True - ) - async for event in response_stream: - if event.event == "RunContent": - yield streaming.message_chunk(event.content) - elif event.event == "ToolCallStarted": - yield streaming.tool_call_started( - event.tool.tool_call_id, event.tool.tool_name - ) - elif event.event == "ToolCallCompleted": - yield streaming.tool_call_completed( - event.tool.result, event.tool.tool_call_id, event.tool.tool_name - ) - logger.info("Financial 
data analysis completed") - - yield streaming.done() - except Exception as e: - yield streaming.failed(f"Financial data query failed: {e}") - - async def _process_fund_holdings_query( - self, ticker: str, dependencies: Optional[Dict] = None - ): - """ - Process 13F fund holdings queries (original logic) - """ - try: - # Set SEC identity - set_identity(self.config.sec_email) - company = Company(ticker) - logger.info(f"Starting 13F holdings data query for {ticker}") - - # Get 13F-HR filings - filings = company.get_filings(form="13F-HR").head(self.config.max_filings) - if len(filings) < 2: - yield streaming.failed( - f"**Insufficient Data**: Company '{ticker}' has insufficient 13F-HR filings (at least 2 filings required for comparison analysis)." - ) - return - logger.info(f"Retrieved {len(filings)} 13F filings") - - # %% - o = filings[1].obj() - current_filing = o.infotable.to_json() - - # %% - o = filings[2].obj() - previous_filing = o.infotable.to_json() - - logger.info("Successfully parsed current and historical holdings data") - - # Generate 13F analysis report with user context - user_context = "" - if dependencies: - user_language = dependencies.get("language", "en-US") - user_timezone = dependencies.get("timezone", "UTC") - - user_context = f""" -## User Context: -- User's preferred language: {user_language} -- User's timezone: {user_timezone} - -Please adapt your response to the user's language preference naturally. If the user prefers a non-English language, provide your analysis in that language while maintaining professional standards.""" - - analysis_prompt = f""" - As a professional investment analyst, please conduct an in-depth analysis of the following 13F holdings data: - - ## Historical holdings data (earlier period): - {previous_filing} - - ## Current holdings data (latest period): - {current_filing} - - ## Analysis requirements: - Please provide professional analysis from the following perspectives: - - ### 1. 
Holdings Changes Summary - - New positions: List newly purchased stocks and possible investment rationale - - Exits/reductions: Analyze sold stocks and speculated reasons - - Position adjustments: Focus on stocks with position changes exceeding 20% - - ### 2. Sector Allocation Analysis - - Sector weight changes: Adjustments in sector allocation percentages - - Investment preferences: Changes in sector preferences reflected by capital flows - - Concentration changes: Adjustments in portfolio concentration - - ### 3. Key Holdings Analysis - - Specific changes in top 10 holdings - - Calculate increase/decrease percentages for major positions - - Analyze possible reasons for significant position adjustments - ### 4. Investment Strategy Insights - - Investment style adjustments reflected in holdings changes - - Market trend judgments and responses - - Changes in risk appetite - - ## Output requirements: - Please output analysis results in a clear structure, including: - - **Key Findings**: 3-5 important insights - - **Important Data**: Specific change data and percentages - - **Investment Insights**: Reference value for investors - - **Risk Alerts**: Risk points that need attention - - Please ensure the analysis is objective and professional, based on actual data, avoiding excessive speculation.{user_context} - """ - - response_stream: Iterator[RunOutputEvent] = await self.analysis_agent.arun( - analysis_prompt, stream=True, stream_intermediate_steps=True - ) - async for event in response_stream: - if event.event == "RunContent": - yield streaming.message_chunk(event.content) - elif event.event == "ToolCallStarted": - yield streaming.tool_call_started( - event.tool.tool_call_id, event.tool.tool_name - ) - elif event.event == "ToolCallCompleted": - yield streaming.tool_call_completed( - event.tool.result, event.tool.tool_call_id, event.tool.tool_name - ) - logger.info("Financial data analysis completed") - - streaming.done() - logger.info("13F analysis completed") - 
except Exception as e: - yield streaming.failed(f"13F query failed: {e}") - - async def stream( - self, - query: str, - session_id: str, - task_id: str, - dependencies: Optional[Dict] = None, - ) -> AsyncGenerator[StreamResponse, None]: - """ - Main streaming method with intelligent routing support - """ - try: - logger.info( - f"Processing SEC query request - session: {session_id}, task: {task_id}, dependencies: {dependencies}" - ) - - # 1. Intelligent query classification - try: - query_type = await self._classify_query(query) - logger.info(f"Query classification result: {query_type}") - except Exception as e: - logger.error(f"Query classification failed: {e}") - yield streaming.failed( - "**Classification Error**: Unable to analyze query type." - ) - return - - # 2. Extract stock ticker - try: - if query_type == QueryType.FINANCIAL_DATA: - # Use new classification agent to extract stock ticker - classification_prompt = f""" - Please extract the stock ticker from the following query: "{query}" - Please set query_type to "financial_data" and extract the ticker. - """ - response = self.classifier_agent.run(classification_prompt) - ticker = response.content.ticker - else: - # Use original parsing agent (maintains backward compatibility) - run_response = self.parser_agent.run( - f"Parse the following sec 13 funds request and extract the parameters: {query}" - ) - ticker = run_response.content.ticker - - logger.info(f"Extracted stock ticker: {ticker}") - except Exception as e: - logger.error(f"Stock ticker extraction failed: {e}") - yield streaming.failed( - "**Parse Error**: Unable to parse query parameters. Please ensure you provide a valid stock ticker." - ) - return - - # 3. 
Route to appropriate processing method based on query type - if query_type == QueryType.FINANCIAL_DATA: - async for result in self._process_financial_data_query( - ticker, dependencies - ): - yield result - else: # QueryType.FUND_HOLDINGS - async for result in self._process_fund_holdings_query( - ticker, dependencies - ): - yield result - - except Exception as e: - logger.error(f"Unexpected error in stream method: {e}") - yield streaming.failed(f"Unexpected error: {e}") - - async def notify( - self, - query: str, - session_id: str, - task_id: str, - dependencies: Optional[Dict] = None, - ): - """ - Main notify method with continuous SEC filing monitoring - """ - try: - logger.info( - f"Starting SEC filing monitoring - session: {session_id}, task: {task_id}, dependencies: {dependencies}" - ) - - # 1. Extract ticker from query - try: - ticker = await self._extract_ticker_from_query(query) - logger.info(f"Extracted ticker: {ticker}") - except Exception as e: - logger.error(f"Ticker extraction failed: {e}") - yield notification.failed( - "**Parse Error**: Unable to parse query parameters. Please ensure you provide a valid stock ticker." - ) - return - - # 2. 
Initialize monitoring session - self.monitoring_sessions[session_id] = { - "ticker": ticker, - "start_time": datetime.now(), - "check_count": 0, - "last_check": None, - } - - check_interval = 30 # Check every 30 seconds (can be configured) - - while session_id in self.monitoring_sessions: - try: - # Get current filings - current_filings = await self._get_sec_filings(ticker) - - if current_filings: - # Detect changes - changes = await self._detect_filing_changes( - ticker, current_filings - ) - - # Update monitoring session - self.monitoring_sessions[session_id]["check_count"] += 1 - self.monitoring_sessions[session_id]["last_check"] = ( - datetime.now() - ) - - # Check if there are any changes - if any(changes.values()): - # Generate summary of changes - summary = await self._generate_filing_summary( - ticker, changes - ) - - if summary: - yield notification.component_generator( - summary, SEC_FILLINGS_COMPONENT_TYPE - ) - - # Wait before next check - await asyncio.sleep(check_interval) - - except Exception as e: - logger.error(f"Error during monitoring check: {e}") - yield notification.failed(str(e)) - await asyncio.sleep(check_interval) - - except Exception as e: - logger.error(f"Unexpected error in notify method: {e}") - yield notification.failed(str(e)) - finally: - # Clean up monitoring session - if session_id in self.monitoring_sessions: - del self.monitoring_sessions[session_id] - - def stop_monitoring(self, session_id: str) -> bool: - """Stop monitoring for a specific session""" - if session_id in self.monitoring_sessions: - del self.monitoring_sessions[session_id] - logger.info(f"Stopped monitoring for session: {session_id}") - return True - return False - - def get_monitoring_status(self, session_id: str) -> Dict: - """Get current monitoring status for a session""" - return self.monitoring_sessions.get(session_id, {}) - - -if __name__ == "__main__": - agent = create_wrapped_agent(SECAgent) - asyncio.run(agent.serve())