Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
260 changes: 259 additions & 1 deletion python/valuecell/agents/sec_agent.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import asyncio
import hashlib
import logging
import os
from datetime import datetime
from enum import Enum
from typing import Iterator
from typing import Dict, Iterator

from agno.agent import Agent, RunResponse, RunResponseEvent # noqa
from agno.models.openrouter import OpenRouter
Expand Down Expand Up @@ -80,6 +82,10 @@ def __init__(self):
super().__init__()
self.config = Sec13FundAgentConfig()

# Monitoring state management
self.monitoring_sessions: Dict[str, Dict] = {} # session_id -> monitoring_info
self.filing_cache: Dict[str, Dict] = {} # ticker -> filing_hashes

try:
# Query classification agent - for determining query type
self.classifier_agent = Agent(
Expand All @@ -103,6 +109,132 @@ def __init__(self):
logger.error(f"Failed to initialize SEC Agent: {e}")
raise

async def _extract_ticker_from_query(self, query: str) -> str:
"""Extract ticker symbol from user query using AI"""
try:
extraction_prompt = f"""
Extract the stock ticker symbol from the following query: "{query}"

Rules:
- Return only the ticker symbol (e.g., "AAPL", "TSLA", "MSFT")
- If multiple tickers are mentioned, return the first one
- If no ticker is found, return "UNKNOWN"
- The ticker should be in uppercase

Query: {query}
"""

response = self.analysis_agent.run(extraction_prompt)
ticker = response.content.strip().upper()

# Basic validation
if ticker == "UNKNOWN" or len(ticker) > 10 or not ticker.isalpha():
raise ValueError(f"Invalid ticker extracted: {ticker}")

logger.info(f"Extracted ticker: {ticker} from query: {query}")
return ticker

except Exception as e:
logger.error(f"Failed to extract ticker from query: {e}")
raise ValueError(f"Could not extract valid ticker from query: {query}")

async def _get_sec_filings(self, ticker: str) -> Dict[str, str]:
"""Get current SEC filings for a ticker and return their hashes"""
try:
set_identity(self.config.sec_email)
company = Company(ticker)

filings_data = {}
filing_types = ["10-K", "8-K", "10-Q", "13F-HR"]

for filing_type in filing_types:
try:
# Get the most recent filing of this type
filings = company.get_filings(form=filing_type).latest(1)
if filings:
filing = filings[0]
# Create a hash of the filing content/metadata
filing_content = f"{filing.accession_number}_{filing.filing_date}_{filing.form}"
filing_hash = hashlib.md5(filing_content.encode()).hexdigest()
filings_data[filing_type] = filing_hash
logger.info(
f"Found {filing_type} filing for {ticker}: {filing.accession_number}"
)
else:
filings_data[filing_type] = None
logger.info(f"No {filing_type} filing found for {ticker}")
except Exception as e:
logger.warning(f"Error getting {filing_type} for {ticker}: {e}")
filings_data[filing_type] = None

return filings_data

except Exception as e:
logger.error(f"Failed to get SEC filings for {ticker}: {e}")
return {}

async def _detect_filing_changes(
self, ticker: str, current_filings: Dict[str, str]
) -> Dict[str, bool]:
"""Detect changes in SEC filings compared to cached versions"""
changes = {}

if ticker not in self.filing_cache:
# First time checking this ticker
self.filing_cache[ticker] = current_filings.copy()
# Consider all existing filings as "new" for first check
for filing_type, filing_hash in current_filings.items():
changes[filing_type] = filing_hash is not None
else:
# Compare with cached versions
cached_filings = self.filing_cache[ticker]
for filing_type, current_hash in current_filings.items():
cached_hash = cached_filings.get(filing_type)
changes[filing_type] = (
current_hash != cached_hash and current_hash is not None
)

# Update cache
self.filing_cache[ticker] = current_filings.copy()

return changes

async def _generate_filing_summary(
self, ticker: str, changed_filings: Dict[str, bool]
) -> str:
"""Generate AI summary of filing changes"""
try:
changed_types = [
filing_type
for filing_type, changed in changed_filings.items()
if changed
]

if not changed_types:
return f"No new filings detected for {ticker}."

summary_prompt = f"""
New SEC filings have been detected for {ticker}. The following filing types have been updated:
{", ".join(changed_types)}

Please provide a brief summary of what these filing types typically contain and their significance for investors:

- 10-K: Annual report with comprehensive business overview
- 8-K: Current report for material events
- 10-Q: Quarterly financial report
- 13F-HR: Institutional investment holdings report

Focus on explaining what investors should pay attention to with these new {ticker} filings.
Keep the summary concise but informative (2-3 paragraphs maximum).
"""

response = self.analysis_agent.run(summary_prompt)
return response.content

except Exception as e:
logger.error(f"Failed to generate filing summary: {e}")
return f"New filings detected for {ticker}: {', '.join(changed_types)}, but summary generation failed."

async def _classify_query(self, query: str) -> QueryType:
"""
Intelligently classify user queries to determine if it's financial data or 13F query
Expand Down Expand Up @@ -408,6 +540,132 @@ async def stream(self, query: str, session_id: str, task_id: str):
"is_task_complete": True,
}

async def notify(self, query: str, session_id: str, task_id: str):
"""
Main notify method with continuous SEC filing monitoring
"""
try:
logger.info(
f"Starting SEC filing monitoring - session: {session_id}, task: {task_id}"
)

# 1. Extract ticker from query
try:
ticker = await self._extract_ticker_from_query(query)
logger.info(f"Extracted ticker: {ticker}")
except Exception as e:
logger.error(f"Ticker extraction failed: {e}")
yield {
"content": f"❌ **Parse Error**: {str(e)}",
"is_task_complete": True,
}
return

# 2. Initialize monitoring session
self.monitoring_sessions[session_id] = {
"ticker": ticker,
"start_time": datetime.now(),
"check_count": 0,
"last_check": None,
}

# 3. Send initial confirmation
yield {
"content": f" **SEC Filing Monitor Started**\n\n"
f"**Ticker**: {ticker}\n"
f"**Monitoring**: 10-K, 8-K, 10-Q, 13F filings\n"
f"**Started**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
f"I will continuously monitor {ticker}'s SEC filings and notify you of any changes...",
"is_task_complete": False,
}

# 4. Continuous monitoring loop
check_interval = 30 # Check every 30 seconds (can be configured)

while session_id in self.monitoring_sessions:
try:
# Get current filings
current_filings = await self._get_sec_filings(ticker)

if current_filings:
# Detect changes
changes = await self._detect_filing_changes(
ticker, current_filings
)

# Update monitoring session
self.monitoring_sessions[session_id]["check_count"] += 1
self.monitoring_sessions[session_id]["last_check"] = (
datetime.now()
)

# Check if there are any changes
if any(changes.values()):
# Generate summary of changes
summary = await self._generate_filing_summary(
ticker, changes
)

yield {
"content": f"🚨 **New SEC Filing Detected!**\n\n"
f"**Ticker**: {ticker}\n"
f"**Time**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
f"**Summary**:\n{summary}\n\n"
f"Continuing to monitor for further changes...",
"is_task_complete": False,
}
else:
# Periodic status update (every 10 checks)
if (
self.monitoring_sessions[session_id]["check_count"] % 10
== 0
):
yield {
"content": f"📊 **Monitoring Status Update**\n\n"
f"**Ticker**: {ticker}\n"
f"**Checks performed**: {self.monitoring_sessions[session_id]['check_count']}\n"
f"**Last check**: {datetime.now().strftime('%H:%M:%S')}\n"
f"**Status**: No new filings detected\n\n"
f"Monitoring continues...",
"is_task_complete": False,
}
else:
logger.warning(f"Failed to retrieve filings for {ticker}")

# Wait before next check
await asyncio.sleep(check_interval)

except Exception as e:
logger.error(f"Error during monitoring check: {e}")
yield {
"content": f"⚠️ **Monitoring Error**: {str(e)}\n\nRetrying in {check_interval} seconds...",
"is_task_complete": False,
}
await asyncio.sleep(check_interval)

except Exception as e:
logger.error(f"Unexpected error in notify method: {e}")
yield {
"content": f"❌ **System Error**: An unexpected error occurred while setting up monitoring.\nError details: {str(e)}",
"is_task_complete": True,
}
finally:
# Clean up monitoring session
if session_id in self.monitoring_sessions:
del self.monitoring_sessions[session_id]

def stop_monitoring(self, session_id: str) -> bool:
"""Stop monitoring for a specific session"""
if session_id in self.monitoring_sessions:
del self.monitoring_sessions[session_id]
logger.info(f"Stopped monitoring for session: {session_id}")
return True
return False

def get_monitoring_status(self, session_id: str) -> Dict:
"""Get current monitoring status for a session"""
return self.monitoring_sessions.get(session_id, {})


if __name__ == "__main__":
agent = create_wrapped_agent(SecAgent)
Expand Down