Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 63 additions & 111 deletions python/valuecell/agents/sec_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
import os
from datetime import datetime
from enum import Enum
from typing import Dict, Iterator
from typing import Dict, Iterator, AsyncGenerator

from agno.agent import Agent, RunResponse, RunResponseEvent # noqa
from agno.models.openrouter import OpenRouter
from edgar import Company, set_identity
from pydantic import BaseModel, Field, field_validator

from valuecell.core.types import BaseAgent
from valuecell.core.agent.responses import streaming, notification
from valuecell.core.types import BaseAgent, StreamResponse
from valuecell.core.agent.decorator import create_wrapped_agent

# Configure logging
Expand Down Expand Up @@ -299,10 +300,9 @@ async def _process_financial_data_query(
continue

if not all_filings_data:
yield {
"content": f"❌ **Insufficient Data**: No financial filings found for company '{ticker}'.",
"is_task_complete": True,
}
yield streaming.failed(
f"**Insufficient Data**: No financial filings found for company '{ticker}'."
)
return

# Generate financial data analysis report
Expand Down Expand Up @@ -336,10 +336,10 @@ async def _process_financial_data_query(

## Output requirements:
Please output analysis results in a clear structure, including:
- 📊 **Key Findings**: 3-5 important insights
- 📈 **Financial Highlights**: Important financial data and trends
- 🎯 **Investment Reference**: Reference value for investors
- ⚠️ **Risk Alerts**: Risk points that need attention
- **Key Findings**: 3-5 important insights
- **Financial Highlights**: Important financial data and trends
- **Investment Reference**: Reference value for investors
- **Risk Alerts**: Risk points that need attention

Please ensure the analysis is objective and professional, based on actual data, avoiding excessive speculation.
"""
Expand All @@ -349,27 +349,22 @@ async def _process_financial_data_query(
)
for event in response_stream:
if event.event == "RunResponseContent":
yield {
"content": event.content,
"is_task_complete": False,
}
yield streaming.message_chunk(event.content)
elif event.event == "ToolCallStarted":
print(f"Tool call started: {event.tool}")
yield streaming.tool_call_started(
event.tool.tool_call_id, event.tool.tool_name
)
elif event.event == "ToolCallCompleted":
yield streaming.tool_call_completed(
event.tool.result, event.tool.tool_call_id, event.tool.tool_name
)
elif event.event == "ReasoningStep":
print(f"Reasoning step: {event.content}")
yield streaming.reasoning(event.reasoning_content)
logger.info("Financial data analysis completed")

yield {
"content": "",
"is_task_complete": True,
}

yield streaming.done()
except Exception as e:
logger.error(f"Financial data query failed: {e}")
yield {
"content": f"❌ **Financial Data Query Error**: Unable to retrieve financial data for company '{ticker}'.\nError details: {str(e)}",
"is_task_complete": True,
}
yield streaming.failed(f"Financial data query failed: {e}")

async def _process_fund_holdings_query(
self, ticker: str, session_id: str, task_id: str
Expand All @@ -386,10 +381,9 @@ async def _process_fund_holdings_query(
# Get 13F-HR filings
filings = company.get_filings(form="13F-HR").head(self.config.max_filings)
if len(filings) < 2:
yield {
"content": f"❌ **Insufficient Data**: Company '{ticker}' has insufficient 13F-HR filings (at least 2 filings required for comparison analysis).",
"is_task_complete": True,
}
yield streaming.failed(
f"**Insufficient Data**: Company '{ticker}' has insufficient 13F-HR filings (at least 2 filings required for comparison analysis)."
)
return
logger.info(f"Retrieved {len(filings)} 13F filings")

Expand Down Expand Up @@ -438,10 +432,10 @@ async def _process_fund_holdings_query(

## Output requirements:
Please output analysis results in a clear structure, including:
- 📊 **Key Findings**: 3-5 important insights
- 📈 **Important Data**: Specific change data and percentages
- 🎯 **Investment Insights**: Reference value for investors
- ⚠️ **Risk Alerts**: Risk points that need attention
- **Key Findings**: 3-5 important insights
- **Important Data**: Specific change data and percentages
- **Investment Insights**: Reference value for investors
- **Risk Alerts**: Risk points that need attention

Please ensure the analysis is objective and professional, based on actual data, avoiding excessive speculation.
"""
Expand All @@ -451,30 +445,27 @@ async def _process_fund_holdings_query(
)
for event in response_stream:
if event.event == "RunResponseContent":
yield {
"content": event.content,
"is_task_complete": False,
}
yield streaming.message_chunk(event.content)
elif event.event == "ToolCallStarted":
print(f"Tool call started: {event.tool}")
yield streaming.tool_call_started(
event.tool.tool_call_id, event.tool.tool_name
)
elif event.event == "ToolCallCompleted":
yield streaming.tool_call_completed(
event.tool.result, event.tool.tool_call_id, event.tool.tool_name
)
elif event.event == "ReasoningStep":
print(f"Reasoning step: {event.content}")
yield streaming.reasoning(event.reasoning_content)
logger.info("Financial data analysis completed")

yield {
"content": "",
"is_task_complete": True,
}
streaming.done()
logger.info("13F analysis completed")

except Exception as e:
logger.error(f"13F query failed: {e}")
yield {
"content": f"❌ **13F Query Error**: Unable to retrieve 13F data for company '{ticker}'.\nError details: {str(e)}",
"is_task_complete": True,
}
yield streaming.failed(f"13F query failed: {e}")

async def stream(self, query: str, session_id: str, task_id: str):
async def stream(
self, query: str, session_id: str, task_id: str
) -> AsyncGenerator[StreamResponse, None]:
"""
Main streaming method with intelligent routing support
"""
Expand All @@ -489,10 +480,9 @@ async def stream(self, query: str, session_id: str, task_id: str):
logger.info(f"Query classification result: {query_type}")
except Exception as e:
logger.error(f"Query classification failed: {e}")
yield {
"content": f"❌ **Classification Error**: Unable to analyze query type.\nError details: {str(e)}",
"is_task_complete": True,
}
yield streaming.failed(
"**Classification Error**: Unable to analyze query type."
)
return

# 2. Extract stock ticker
Expand All @@ -515,10 +505,9 @@ async def stream(self, query: str, session_id: str, task_id: str):
logger.info(f"Extracted stock ticker: {ticker}")
except Exception as e:
logger.error(f"Stock ticker extraction failed: {e}")
yield {
"content": f"❌ **Parse Error**: Unable to parse query parameters. Please ensure you provide a valid stock ticker.\nError details: {str(e)}",
"is_task_complete": True,
}
yield streaming.failed(
"**Parse Error**: Unable to parse query parameters. Please ensure you provide a valid stock ticker."
)
return

# 3. Route to appropriate processing method based on query type
Expand All @@ -535,10 +524,7 @@ async def stream(self, query: str, session_id: str, task_id: str):

except Exception as e:
logger.error(f"Unexpected error in stream method: {e}")
yield {
"content": f"❌ **System Error**: An unexpected error occurred while processing the request.\nError details: {str(e)}",
"is_task_complete": True,
}
yield streaming.failed(f"Unexpected error: {e}")

async def notify(self, query: str, session_id: str, task_id: str):
"""
Expand All @@ -555,10 +541,9 @@ async def notify(self, query: str, session_id: str, task_id: str):
logger.info(f"Extracted ticker: {ticker}")
except Exception as e:
logger.error(f"Ticker extraction failed: {e}")
yield {
"content": f"❌ **Parse Error**: {str(e)}",
"is_task_complete": True,
}
yield notification.failed(
"**Parse Error**: Unable to parse query parameters. Please ensure you provide a valid stock ticker."
)
return

# 2. Initialize monitoring session
Expand All @@ -569,17 +554,6 @@ async def notify(self, query: str, session_id: str, task_id: str):
"last_check": None,
}

# 3. Send initial confirmation
yield {
"content": f" **SEC Filing Monitor Started**\n\n"
f"**Ticker**: {ticker}\n"
f"**Monitoring**: 10-K, 8-K, 10-Q, 13F filings\n"
f"**Started**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
f"I will continuously monitor {ticker}'s SEC filings and notify you of any changes...",
"is_task_complete": False,
}

# 4. Continuous monitoring loop
check_interval = 30 # Check every 30 seconds (can be configured)

while session_id in self.monitoring_sessions:
Expand All @@ -606,49 +580,27 @@ async def notify(self, query: str, session_id: str, task_id: str):
ticker, changes
)

yield {
"content": f"🚨 **New SEC Filing Detected!**\n\n"
f"**Ticker**: {ticker}\n"
f"**Time**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
f"**Summary**:\n{summary}\n\n"
f"Continuing to monitor for further changes...",
"is_task_complete": False,
}
else:
# Periodic status update (every 10 checks)
if (
self.monitoring_sessions[session_id]["check_count"] % 10
== 0
):
yield {
"content": f"📊 **Monitoring Status Update**\n\n"
f"**Ticker**: {ticker}\n"
f"**Checks performed**: {self.monitoring_sessions[session_id]['check_count']}\n"
f"**Last check**: {datetime.now().strftime('%H:%M:%S')}\n"
f"**Status**: No new filings detected\n\n"
f"Monitoring continues...",
"is_task_complete": False,
}
else:
logger.warning(f"Failed to retrieve filings for {ticker}")
# yield {
# "content": f"🚨 **New SEC Filing Detected!**\n\n"
# f"**Ticker**: {ticker}\n"
# f"**Time**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
# f"**Summary**:\n{summary}\n\n"
# f"Continuing to monitor for further changes...",
# "is_task_complete": False,
# }
yield notification.message(summary)

# Wait before next check
await asyncio.sleep(check_interval)

except Exception as e:
logger.error(f"Error during monitoring check: {e}")
yield {
"content": f"⚠️ **Monitoring Error**: {str(e)}\n\nRetrying in {check_interval} seconds...",
"is_task_complete": False,
}
yield notification.failed(str(e))
await asyncio.sleep(check_interval)

except Exception as e:
logger.error(f"Unexpected error in notify method: {e}")
yield {
"content": f"❌ **System Error**: An unexpected error occurred while setting up monitoring.\nError details: {str(e)}",
"is_task_complete": True,
}
yield notification.failed(str(e))
finally:
# Clean up monitoring session
if session_id in self.monitoring_sessions:
Expand Down
6 changes: 6 additions & 0 deletions python/valuecell/core/agent/responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ def done(self, content: Optional[str] = None) -> NotifyResponse:
event=NotifyResponseEvent.TASK_DONE,
)

def failed(self, content: Optional[str] = None) -> NotifyResponse:
    """Build a task-failed notification response.

    Mirrors ``done`` above: wraps an optional human-readable message in a
    ``NotifyResponse`` tagged with the TASK_FAILED event.

    Args:
        content: Optional failure message to surface to the caller.

    Returns:
        A ``NotifyResponse`` whose event marks the task as failed.
    """
    # BUG FIX: the event constant lives on the NotifyResponseEvent enum,
    # not on the NotifyResponse model (the sibling `done` method uses
    # NotifyResponseEvent.TASK_DONE). Referencing NotifyResponse.TASK_FAILED
    # would raise AttributeError at runtime.
    return NotifyResponse(
        content=content,
        event=NotifyResponseEvent.TASK_FAILED,
    )


notification = _NotifyResponseNamespace()

Expand Down