diff --git a/.env.example b/.env.example index 53199a27b..637491ebb 100644 --- a/.env.example +++ b/.env.example @@ -28,6 +28,14 @@ PLANNER_MODEL_ID=google/gemini-2.5-flash SEC_PARSER_MODEL_ID=openai/gpt-4o-mini SEC_ANALYSIS_MODEL_ID=deepseek/deepseek-chat-v3-0324 AI_HEDGE_FUND_PARSER_MODEL_ID=google/gemini-2.5-flash +RESEARCH_AGENT_MODEL_ID=google/gemini-2.5-flash + +# Embedding +EMBEDDER_API_KEY= +EMBEDDER_BASE_URL= +EMBEDDER_MODEL_ID= +EMBEDDER_DIMENSION=1568 + # SEC Agent Configurations # Email address for SEC API requests (required by SEC) diff --git a/python/valuecell/agents/research_agent/core.py b/python/valuecell/agents/research_agent/core.py index a3bbe9bfe..c1d1a6d0d 100644 --- a/python/valuecell/agents/research_agent/core.py +++ b/python/valuecell/agents/research_agent/core.py @@ -4,6 +4,7 @@ from agno.agent import Agent, RunOutputEvent from agno.db.in_memory import InMemoryDb from agno.models.google import Gemini +from agno.models.openrouter import OpenRouter from edgar import set_identity from loguru import logger @@ -12,20 +13,30 @@ KNOWLEDGE_AGENT_EXPECTED_OUTPUT, KNOWLEDGE_AGENT_INSTRUCTION, ) -from valuecell.agents.research_agent.sources import fetch_sec_filings +from valuecell.agents.research_agent.sources import ( + fetch_event_sec_filings, + fetch_periodic_sec_filings, +) from valuecell.core.agent.responses import streaming from valuecell.core.types import BaseAgent, StreamResponse from valuecell.utils.env import agent_debug_mode_enabled +def _get_model_based_on_env() -> str: + model_id = os.getenv("RESEARCH_AGENT_MODEL_ID") + if os.getenv("GOOGLE_API_KEY"): + return Gemini(id=model_id or "gemini-2.5-flash") + return OpenRouter(id=model_id or "google/gemini-2.5-flash") + + class ResearchAgent(BaseAgent): def __init__(self, **kwargs): super().__init__(**kwargs) self.knowledge_research_agent = Agent( - model=Gemini(id="gemini-2.5-flash"), + model=_get_model_based_on_env(), instructions=[KNOWLEDGE_AGENT_INSTRUCTION], expected_output=KNOWLEDGE_AGENT_EXPECTED_OUTPUT, - tools=[fetch_sec_filings], + tools=[fetch_periodic_sec_filings, fetch_event_sec_filings], knowledge=knowledge, db=InMemoryDb(), # context diff --git a/python/valuecell/agents/research_agent/prompts.py b/python/valuecell/agents/research_agent/prompts.py index 9df126b8b..1c9e3e21b 100644 --- a/python/valuecell/agents/research_agent/prompts.py +++ b/python/valuecell/agents/research_agent/prompts.py @@ -3,33 +3,46 @@ You are a financial research assistant. Your primary objective is to satisfy the user's information request about a company's financials, filings, or performance with accurate, sourceable, and actionable answers. - -- fetch_sec_filings(ticker_or_cik, form, year?, quarter?): Use this when primary-source facts are needed (e.g., reported revenue, net income, footnotes). Provide exact parameters when invoking the tool. +- +- fetch_periodic_sec_filings(ticker_or_cik, forms, year?, quarter?, limit?): Use this for scheduled reports like 10-K/10-Q when you need primary-source facts (revenue, net income, MD&A text). Prefer batching by year to reduce calls. Note: year/quarter filters apply to filing_date (edgar behavior), not period_of_report. If year is omitted, the tool returns the latest filings using `limit` (default 10). If quarter is provided, year must also be provided. +- fetch_event_sec_filings(ticker_or_cik, forms, start_date?, end_date?, limit?): Use this for event-driven filings like 8-K and ownership forms (3/4/5). Use date ranges and limits to control scope. - Knowledge base search: Use the agent's internal knowledge index to find summaries, historical context, analyst commentary, and previously ingested documents. Efficient tool calling: 1. Batch parameters: When the user asks for multi-period data (e.g., "revenue for Q1-Q4 2024"), prefer a SINGLE call with broader parameters (e.g., year=2024 without quarter filter) rather than 4 separate quarterly calls. -2. Limit concurrent calls: Avoid making more than 3 `fetch_sec_filings` calls in a single response. If more data is needed: +2. Limit concurrent calls: Avoid making more than 3 filing tool calls in a single response. If more data is needed: - Prioritize the most recent or most relevant periods - Use knowledge base search to fill gaps - Suggest follow-up queries for additional details -3. Smart defaults: If year/quarter are unspecified, default to the most recent available data rather than calling multiple periods. -4. Knowledge base first: For broad questions or interpretive queries, search the knowledge base before calling fetch_sec_filings. Only fetch new filings if the knowledge base lacks the specific data needed. +3. Smart defaults: If year/quarter are unspecified for periodic filings, default to the most recent available data rather than calling multiple periods. For event-driven filings, use a recent date window (e.g., last 90 days) with a small limit unless the user specifies otherwise. +4. Knowledge base first: For broad questions or interpretive queries, search the knowledge base before calling filing tools. Only fetch new filings if the knowledge base lacks the specific data needed. + + + +To avoid parameter mistakes, keep these distinctions in mind when calling tools: +- Filing date (filing_date): The date the document was submitted to the SEC. A user saying "filed in Mar 2025" refers to this. +- Period of report (period_of_report): The reporting period end date covered by the filing (e.g., quarter-end or fiscal year-end). A user saying "Q3 2024" or "FY 2024" refers to this. +- Fiscal vs calendar: Users typically mean the company's fiscal calendar when they say Q/FY, unless they explicitly say "calendar". + +Parameter mapping rules: +- For 10-K/10-Q, the tool's year/quarter parameters filter by filing_date (edgar behavior). If the user specifies a fiscal period (period_of_report), fetch a reasonable set (e.g., year=2024) and then confirm the period_of_report in the retrieved metadata when extracting facts. If year is omitted, adjust `limit` to cover the likely number of filings needed (e.g., limit=4 for the last four quarters). +- If a request references when it was filed (filing_date), include that context in your answer. When the mapping is ambiguous (off-cycle fiscal year or unclear phrasing), ask one concise clarifying question or default to the latest and state your assumption. + Before answering, briefly plan your approach: 1. Query type: Is this factual (specific numbers), analytical (trends/comparisons), or exploratory (broad understanding)? -2. Tool strategy: Do I need fetch_sec_filings? How many calls? Can I batch parameters or use knowledge base instead? +2. Tool strategy: Do I need periodic or event filings? How many calls? Can I batch parameters or use knowledge base instead? 3. Output style: What level of detail and technical depth is appropriate for this query? 1. Clarify: If the user's request lacks a ticker/CIK, form type, or time range, ask a single clarifying question. -2. Primary check: If the user requests factual items (financial line items, footnote detail, MD&A text), call `fetch_sec_filings` with specific filters to retrieve the relevant filings. -3. Post-fetch knowledge search (required): Immediately after calling `fetch_sec_filings`, run a knowledge-base search for the same company and time period. Use the search results to: +2. Primary check: If the user requests factual items (financial line items, footnote detail, MD&A text), call `fetch_periodic_sec_filings` (10-Q/10-K) with specific filters. For corporate events or disclosures, call `fetch_event_sec_filings` (8-K/3/4/5) with a relevant date range. +3. Post-fetch knowledge search (required): Immediately after calling a filing tool, run a knowledge-base search for the same company and time period. Use the search results to: - confirm or enrich extracted facts, - surface relevant analyst commentary or historical context, - detect any pre-existing summaries already ingested that relate to the same filing. @@ -123,7 +136,7 @@ Example 1 - Factual query (user asks "What was Tesla's Q3 2024 revenue?"): -Tool plan: Call fetch_sec_filings('TSLA', '10-Q', year=2024, quarter=3) once, then search knowledge base. +Tool plan: Call fetch_periodic_sec_filings('TSLA', '10-Q', year=2024, quarter=3) once, then search knowledge base. Response: "Tesla reported revenue of $25.2 billion in Q3 2024 [Q3 2024 10-Q](file://...), representing 8% year-over-year growth. The automotive segment contributed $20.0 billion (79% of total revenue), while energy generation and storage added $2.4 billion [same source]. diff --git a/python/valuecell/agents/research_agent/sources.py b/python/valuecell/agents/research_agent/sources.py index d4bfdc63e..a0b0dbf5b 100644 --- a/python/valuecell/agents/research_agent/sources.py +++ b/python/valuecell/agents/research_agent/sources.py @@ -1,5 +1,6 @@ +from datetime import date, datetime from pathlib import Path -from typing import List, Optional +from typing import Iterable, List, Optional, Sequence import aiofiles from edgar import Company @@ -11,49 +12,55 @@ from .schemas import SECFilingMetadata, SECFilingResult -async def fetch_sec_filings( - cik_or_ticker: str, - form: List[str] | str = "10-Q", - year: Optional[int | List[int]] = None, - quarter: Optional[int | List[int]] = None, -): - """Fetch SEC filings for a given company. - If year and quarter are provided, filter filings accordingly. If not, fetch the latest filings. +def _ensure_list(value: str | Sequence[str] | None) -> List[str]: + if value is None: + return [] + if isinstance(value, str): + return [value] + return list(value) - Args: - cik_or_ticker (str): CIK or ticker symbol of the company. Never introduce backticks, quotes, or spaces. - form (List[str] | str, optional): Type of SEC filing form to fetch. - - Defaults to "10-Q". Can be a list of forms (e.g. ["10-K", "10-Q"]). - - Choices explained: - - "10-K": Annual report - - "10-Q": Quarterly report - - "8-K": Current report for unscheduled material events or corporate changes - year (Optional[int | List[int]], optional): Year or list of years to filter filings. Defaults to None. - quarter (Optional[int | List[int]], optional): Quarter or list of quarters to filter filings. Defaults to None. - Returns: - List[Tuple[str, Path, dict]]: A list of tuples containing the name, path, and metadata of each fetched filing. - """ - company = Company(cik_or_ticker) - if year or quarter: - filings = company.get_filings(form=form, year=year, quarter=quarter) - else: - filings = company.get_filings(form=form).latest() - if not isinstance(filings, EntityFilings): - filings = [filings] +def _parse_date(d: str | date | None) -> Optional[date]: + if d is None: + return None + if isinstance(d, date): + return d + # try common formats + for fmt in ("%Y-%m-%d", "%Y/%m/%d", "%Y%m%d"): + try: + return datetime.strptime(d, fmt).date() + except ValueError: + continue + raise ValueError( + f"Invalid date format: {d}. Expect YYYY-MM-DD, YYYY/MM/DD, or YYYYMMDD." + ) - res = [] + +async def _write_and_ingest( + filings: Iterable, + knowledge_dir: Path, +) -> List[SECFilingResult]: + knowledge_dir.mkdir(parents=True, exist_ok=True) + results: List[SECFilingResult] = [] for filing in filings: filing_date: str = filing.filing_date.strftime("%Y-%m-%d") - period_of_report: str = filing.period_of_report - content: str = filing.document.markdown() + period_of_report: str = getattr(filing, "period_of_report", "") + # Convert to markdown; fall back to string if markdown unavailable + try: + content: str = filing.document.markdown() + except Exception: + try: + content = str(filing.document) + except Exception: + content = "" doc_type: str = filing.form company_name: str = filing.company orig_doc = filing.document.document - md_doc = orig_doc.replace(filing.document.extension, ".md") + # build stable markdown filename using suffix replacement, keep base name only + md_doc = Path(orig_doc).with_suffix(".md").name file_name = f"{doc_type}_{md_doc}" - path = Path(get_knowledge_path()) / file_name + path = knowledge_dir / file_name metadata = SECFilingMetadata( doc_type=doc_type, company=company_name, @@ -63,10 +70,120 @@ async def fetch_sec_filings( async with aiofiles.open(path, "w") as file: await file.write(content) - sec_filing_result = SECFilingResult(file_name, path, metadata) - res.append(sec_filing_result) + result = SECFilingResult(file_name, path, metadata) + results.append(result) + await insert_md_file_to_knowledge( name=file_name, path=path, metadata=metadata.__dict__ ) - return res + return results + + +async def fetch_periodic_sec_filings( + cik_or_ticker: str, + forms: List[str] | str = "10-Q", + year: Optional[int | List[int]] = None, + quarter: Optional[int | List[int]] = None, + limit: int = 10, +): + """Fetch periodic SEC filings (10-K/10-Q) and ingest into knowledge. + + - Designed for regular, scheduled reports with filing_date year/quarter filters (edgar API behavior). + - If year is omitted, fetch latest filings via latest(limit) ordered by filing_date, constrained by forms. If quarter is provided, year must also be provided. + + Date concept guidance: + - Filing date (filing_date): When the filing was submitted to the SEC. edgar filters by filing_date for year/quarter. + - Period of report (period_of_report): The reporting period end date covered by the document (fiscal year/quarter-end). It may differ from filing_date. + - Fiscal vs calendar: Users saying "Q3/FY" usually refer to period_of_report; however, the year/quarter parameters passed to edgar here filter by filing_date. + + Args: + cik_or_ticker: CIK or ticker symbol (no quotes or backticks). + forms: "10-K", "10-Q" or a list of these. Defaults to "10-Q". + year: Single year or list of years to include (by filing_date). When omitted, the tool returns the latest filings using `limit`. + quarter: Single quarter (1-4) or list of quarters (by filing_date). Requires `year` to be provided. + limit: When `year` is omitted, number of latest filings to return (by filing_date). Defaults to 10. + + Returns: + List[SECFilingResult] + """ + req_forms = set(_ensure_list(forms)) or {"10-Q"} + company = Company(cik_or_ticker) + + # If year is omitted, use latest(limit). Quarter without year is not supported. + if year is None: + if quarter is not None: + raise ValueError( + "quarter requires year to be specified for periodic filings" + ) + filings = company.get_filings(form=list(req_forms)).latest(limit) + if isinstance(filings, EntityFilings): + items = list(filings) + else: + items = [filings] + return await _write_and_ingest(items, Path(get_knowledge_path())) + + filings = company.get_filings(form=list(req_forms), year=year, quarter=quarter) + + return await _write_and_ingest(filings, Path(get_knowledge_path())) + + +async def fetch_event_sec_filings( + cik_or_ticker: str, + forms: List[str] | str = "8-K", + start_date: Optional[str | date] = None, + end_date: Optional[str | date] = None, + limit: int = 10, +): + """Fetch event-driven filings (e.g., 8-K, Forms 3/4/5) with optional date-range and limit. + + Args: + cik_or_ticker: CIK or ticker symbol (no quotes or backticks). + forms: One or more of ["8-K", "3", "4", "5"]. Defaults to "8-K". + start_date: Inclusive start date (YYYY-MM-DD or date). + end_date: Inclusive end date (YYYY-MM-DD or date). + limit: Maximum number of filings to fetch after filtering. Defaults to 10. + (Note: The tool will always ingest written markdown into the knowledge base.) + + Returns: + List[SECFilingResult] + """ + sd = _parse_date(start_date) + ed = _parse_date(end_date) + if sd and ed and sd > ed: + raise ValueError("start_date cannot be after end_date") + + req_forms = set(_ensure_list(forms)) or {"8-K"} + company = Company(cik_or_ticker) + + # If no date range specified, leverage edgar's latest(count) for efficiency + if not sd and not ed: + filings = company.get_filings(form=list(req_forms)).latest(limit) + if isinstance(filings, EntityFilings): + items = list(filings) + else: + items = [filings] + return await _write_and_ingest(items, Path(get_knowledge_path())) + + # Otherwise, fetch and filter by filing_date range + filings = company.get_filings(form=list(req_forms)) + if isinstance(filings, EntityFilings): + items = list(filings) + else: + items = [filings] + + filtered: List = [] + for f in items: + f_date = f.filing_date + if sd and f_date < sd: + continue + if ed and f_date > ed: + continue + filtered.append(f) + + # Sort desc and apply limit + filtered.sort(key=lambda f: f.filing_date, reverse=True) + if limit is not None and limit > 0: + filtered = filtered[:limit] + + return await _write_and_ingest(filtered, Path(get_knowledge_path())) diff --git a/python/valuecell/agents/research_agent/vdb.py b/python/valuecell/agents/research_agent/vdb.py index 7ed726d19..90c4512e2 100644 --- a/python/valuecell/agents/research_agent/vdb.py +++ b/python/valuecell/agents/research_agent/vdb.py @@ -1,4 +1,6 @@ -from agno.knowledge.embedder.google import GeminiEmbedder +import os + +from agno.knowledge.embedder.openai import OpenAIEmbedder from agno.vectordb.lancedb import LanceDb from agno.vectordb.search import SearchType @@ -6,7 +8,14 @@ # embedder = SentenceTransformerEmbedder(id="all-MiniLM-L6-v2", dimensions=384) # reranker = SentenceTransformerReranker(model="BAAI/bge-reranker-v2-m3", top_n=8) -embedder = GeminiEmbedder(id="gemini-embedding-001", dimensions=1536) +# embedder = GeminiEmbedder(id="gemini-embedding-001", dimensions=1536) +embedder = OpenAIEmbedder( + dimensions=int(os.getenv("EMBEDDER_DIMENSION", 1536)), + id=os.getenv("EMBEDDER_MODEL"), + base_url=os.getenv("EMBEDDER_BASE_URL"), + api_key=os.getenv("EMBEDDER_API_KEY"), +) + vector_db = LanceDb( table_name="research_agent_knowledge_base", diff --git a/python/valuecell/core/coordinate/tests/test_orchestrator.py b/python/valuecell/core/coordinate/tests/test_orchestrator.py index 4286aceb7..fcc947a34 100644 --- a/python/valuecell/core/coordinate/tests/test_orchestrator.py +++ b/python/valuecell/core/coordinate/tests/test_orchestrator.py @@ -339,45 +339,50 @@ async def test_continue_planning_metadata_retrieval( """Test that _continue_planning correctly retrieves metadata from context.""" from valuecell.core.coordinate.orchestrator import ExecutionContext from valuecell.core.constants import PLANNING_TASK, ORIGINAL_USER_INPUT - + # Create a real asyncio.Task-like object that can be awaited import asyncio - + async def mock_plan_coroutine(): return Mock() # Mock ExecutionPlan - + # Create actual task from coroutine, but mark it as done with a result mock_planning_task = asyncio.create_task(mock_plan_coroutine()) # Wait a bit to let it complete await asyncio.sleep(0.01) - + # Create execution context with required metadata context = ExecutionContext("planning", conversation_id, "thread-1", "user-1") - context.add_metadata(**{ - PLANNING_TASK: mock_planning_task, - ORIGINAL_USER_INPUT: sample_user_input - }) - + context.add_metadata( + **{PLANNING_TASK: mock_planning_task, ORIGINAL_USER_INPUT: sample_user_input} + ) + # Set up execution context in orchestrator orchestrator._execution_contexts[conversation_id] = context - + # Mock dependencies orchestrator._response_factory.plan_failed = Mock() - + async def mock_execute_plan(*args): yield Mock() - + # Mock the async generator method directly - orchestrator._execute_plan_with_input_support = Mock(return_value=mock_execute_plan()) - + orchestrator._execute_plan_with_input_support = Mock( + return_value=mock_execute_plan() + ) + # Call the method to trigger metadata retrieval (lines 507-508) results = [] - async for response in orchestrator._continue_planning(conversation_id, "thread-1", context): + async for response in orchestrator._continue_planning( + conversation_id, "thread-1", context + ): results.append(response) - + # Verify that the method executed successfully # The fact that we got here without errors means metadata was retrieved correctly - assert conversation_id not in orchestrator._execution_contexts # Context should be cleaned up + assert ( + conversation_id not in orchestrator._execution_contexts + ) # Context should be cleaned up assert mock_planning_task.done() # Task should be completed assert len(results) >= 1 # Should have yielded at least one response @@ -389,33 +394,35 @@ async def test_cancel_execution_with_planning_task( """Test that _cancel_execution correctly retrieves planning_task metadata.""" from valuecell.core.coordinate.orchestrator import ExecutionContext from valuecell.core.constants import PLANNING_TASK - + # Create mock planning task mock_planning_task = Mock() mock_planning_task.done.return_value = False mock_planning_task.cancel = Mock() - + # Create execution context with planning task context = ExecutionContext("planning", conversation_id, "thread-1", "user-1") context.add_metadata(**{PLANNING_TASK: mock_planning_task}) - + # Set up execution context in orchestrator orchestrator._execution_contexts[conversation_id] = context - + # Mock user input manager orchestrator.user_input_manager.clear_request = Mock() - + # Mock conversation manager mock_conversation = _stub_conversation() orchestrator.conversation_manager.get_conversation.return_value = mock_conversation orchestrator.conversation_manager.update_conversation = AsyncMock() - + # Call _cancel_execution to trigger await orchestrator._cancel_execution(conversation_id) - + # Verify planning task was retrieved and cancelled mock_planning_task.cancel.assert_called_once() - + # Verify context cleanup assert conversation_id not in orchestrator._execution_contexts - orchestrator.user_input_manager.clear_request.assert_called_once_with(conversation_id) + orchestrator.user_input_manager.clear_request.assert_called_once_with( + conversation_id + )