diff --git a/samples/finance-insight-agent/.env.example b/samples/finance-insight-agent/.env.example new file mode 100644 index 000000000..8d03b991e --- /dev/null +++ b/samples/finance-insight-agent/.env.example @@ -0,0 +1,21 @@ +# Finance Insight Service - Environment Variables Template +# Copy this file to .env and fill in your API keys + +# Required: OpenAI API for AI agents (GPT-4, embeddings) +OPENAI_API_KEY=sk-... + +# News Search (required) +## Get from serpapi.com +SERPAPI_API_KEY= + +## Market Data (required) - Get from twelvedata.com +TWELVE_DATA_API_KEY= + +## Company Fundamentals (required for fundamental analysis) - Get from alphavantage.co +ALPHAVANTAGE_API_KEY= + +# Optional: Backend API authentication (protects endpoints like /config) +API_KEY= + +# Optional: CORS allow-list (comma-separated origins) +CORS_ALLOWED_ORIGINS=http://localhost:3000,http://localhost:3001 diff --git a/samples/finance-insight-agent/.gitignore b/samples/finance-insight-agent/.gitignore new file mode 100644 index 000000000..b7242f0d5 --- /dev/null +++ b/samples/finance-insight-agent/.gitignore @@ -0,0 +1,9 @@ +.env +__pycache__/ +.venv/ +.pytest_cache/ +*.egg-info/ +data/ +node_modules/ +.next/ +.choreo/ diff --git a/samples/finance-insight-agent/API_KEYS.md b/samples/finance-insight-agent/API_KEYS.md new file mode 100644 index 000000000..a3f44fdcd --- /dev/null +++ b/samples/finance-insight-agent/API_KEYS.md @@ -0,0 +1,119 @@ +# API Key Management + +## Quick Links + +| Service | Purpose | Get API Key | Free Tier | +|---------|---------|-------------|-----------| +| **OpenAI** | LLM for agents (Required) | [platform.openai.com/api-keys](https://platform.openai.com/api-keys) | $5 free credit | +| **SerpAPI** | Google search for news (Required) | [serpapi.com](https://serpapi.com) | 250 searches/month | +| **Alpha Vantage** | Company fundamentals (Required for ratios) | [alphavantage.co/support/#api-key](https://www.alphavantage.co/support/#api-key) | 25 requests/day | +| 
**Twelve Data** | Market data (Required) | [twelvedata.com/apikey](https://twelvedata.com/apikey) | Credits/day (varies by endpoint) | + +--- + +## How to Get API Keys + +### 1. OpenAI API Key (Required) + +**Step-by-step:** +1. Go to [platform.openai.com](https://platform.openai.com) +2. Sign up or log in to your account +3. Click your profile icon → **View API Keys** +4. Click **Create new secret key** +5. Name it "Finance Insight Service" +6. Copy the key (starts with `sk-proj-...`) +7. **Save it immediately** - you won't see it again! + +**Pricing:** +- GPT-4o: $2.50 / 1M input tokens, $10 / 1M output tokens +- GPT-4o-mini: $0.15 / 1M input tokens, $0.60 / 1M output tokens +- Average query cost: ~$0.10-0.30 with GPT-4o + +**Important:** Add billing information at [platform.openai.com/settings/billing](https://platform.openai.com/settings/billing) to increase rate limits. + +### 2. SerpAPI Key (Required) + +**Step-by-step:** +1. Go to [serpapi.com](https://serpapi.com) +2. Sign up for an account +3. Go to your dashboard and find the API key +4. Copy the key (long alphanumeric string) + +**Free Tier:** +- 250 searches per month (free tier may change) +- See pricing and free-tier limits: https://serpapi.com/pricing +- Credit card might be required depending on plan + +### 3. Alpha Vantage API Key (Required for fundamentals) + +**Step-by-step:** +1. Go to [alphavantage.co/support/#api-key](https://www.alphavantage.co/support/#api-key) +2. Enter your email and click **GET FREE API KEY** +3. Key is sent to your email instantly +4. Copy the key (alphanumeric string) + +**Free Tier:** +- 25 API requests per day +- 5 requests per minute +- No credit card required + +### 4. Twelve Data API Key (Required for market data) + +**Step-by-step:** +1. Go to [twelvedata.com](https://twelvedata.com) +2. Sign up for free account +3. Go to [Dashboard → API Key](https://twelvedata.com/apikey) +4. 
Copy your API key + +**Free Tier:** +- Quota is based on credits per day (endpoint costs vary) +- See pricing and limits: https://twelvedata.com/pricing + +--- + +## How It Works + +### Development Mode +In development, the backend reads API keys from a `.env` file in the project root: + +```bash +# Copy the example file +cp .env.example .env + +# Edit with your keys +nano .env +``` + +The backend must be **restarted** after changing `.env`: +```bash +uv run finance_insight_api --host 0.0.0.0 --port 5000 +``` + +### Production Mode +For production deployments, set environment variables directly: + +**Docker (recommended):** +```bash +# Create .env with your secrets (keep it out of git) +docker run --env-file .env finance-insight +``` + +**Systemd Service:** +```ini +[Service] +Environment="OPENAI_API_KEY=sk-..." +Environment="SERPAPI_API_KEY=..." +Environment="TWELVE_DATA_API_KEY=..." +Environment="ALPHAVANTAGE_API_KEY=..." +``` + +**OpenChoreo / AMP:** +- Set these as environment variables when creating or updating the agent in the AMP UI. + + +## Required Keys + +- **OPENAI_API_KEY** - AI agents won't work without this +- **SERPAPI_API_KEY** - Required for news search +- **TWELVE_DATA_API_KEY** - Required for market data +- **ALPHAVANTAGE_API_KEY** - Required for fundamentals diff --git a/samples/finance-insight-agent/Dockerfile b/samples/finance-insight-agent/Dockerfile new file mode 100644 index 000000000..00899b442 --- /dev/null +++ b/samples/finance-insight-agent/Dockerfile @@ -0,0 +1,22 @@ +FROM python:3.11-slim + +ENV PYTHONUNBUFFERED=1 \ + PIP_DISABLE_PIP_VERSION_CHECK=1 + +WORKDIR /app + +RUN apt-get update \ + && apt-get install -y --no-install-recommends build-essential libgomp1 \ + && rm -rf /var/lib/apt/lists/* + +COPY . . 
+ +RUN pip install --no-cache-dir uv \ + && uv sync --frozen + +ENV VIRTUAL_ENV=/app/.venv +ENV PATH="$VIRTUAL_ENV/bin:$PATH" + +EXPOSE 8000 + +CMD ["python", "-m", "finance_insight_service.api_server", "--host", "0.0.0.0", "--port", "8000"] diff --git a/samples/finance-insight-agent/README.md b/samples/finance-insight-agent/README.md new file mode 100644 index 000000000..1ee9217f1 --- /dev/null +++ b/samples/finance-insight-agent/README.md @@ -0,0 +1,102 @@ +# Finance Insight Service + +## Overview +Finance Insight Service is a stateless, async report generator for financial research and analysis. It combines current market context with deterministic calculations to deliver concise, evidence-aware responses. + +## Key Features +- **Evidence-aware research** - Collects relevant financial context before answering +- **Deterministic quant** - All numbers are computed, not guessed +- **Audit + report split** - Validation and reporting are separated for clarity +- **Stateless requests** - Each request runs independently (no chat memory) + + +## Architecture + +The system runs a sequential, agent-based workflow with explicit quality gates: + +1. **Research** - Collects relevant financial context and evidence +2. **Quant** - Computes required metrics and scenarios deterministically +3. **Audit** - Validates outputs and flags issues +4. 
**Report** - Produces the final user-facing response + +Request flow (stateless): +- UI submits a request to `/chat/async` +- API server starts a background job and emits progress traces +- UI polls `/chat/async//status` and fetches `/result` when complete + +Core components: +- **Scenario UI** (web) for request submission and report viewing +- **API server (Flask)** for async job execution and status/result APIs +- **Agent orchestrator (CrewAI)** running Research → Quant → Audit → Report +- **Tools**: SerpAPI news search, Twelve Data OHLCV, Alpha Vantage fundamentals, Safe Python Exec +- **LLM provider**: OpenAI for reasoning and embeddings in tools + +Design principles: +- Strict handoff between stages to preserve context and quality +- Deterministic computations instead of LLM-generated numbers +- Transparent limitations whenever data is missing or uncertain + +![Architecture diagram](image.png) + +## AMP / Choreo Deployment + +AMP repo: + +### Prerequisites +- Kubernetes cluster (k3d or equivalent) +- AMP installed (see AMP quick start guide) +- Docker registry accessible to the cluster +- API keys for OpenAI, SerpAPI, Twelve Data, Alpha Vantage (the service will start without them, but related features will fail or degrade at runtime) + +### Create Component Definition +Create `.choreo/component.yaml` in your project root: + +```yaml +schemaVersion: "1.0" +id: finance-insight +name: Finance Insight Service +type: service +description: AI-powered financial research assistant +runtime: python +buildType: dockerfile +image: Dockerfile +ports: + - port: 8000 + type: http +env: + - name: OPENAI_API_KEY + valueFrom: SECRET + - name: SERPAPI_API_KEY + valueFrom: SECRET + - name: TWELVE_DATA_API_KEY + valueFrom: SECRET + - name: ALPHAVANTAGE_API_KEY + valueFrom: SECRET +``` + +### Build and Push Image +```bash +docker build -t finance-insight-service:latest . 
+docker tag finance-insight-service:latest \ + localhost:10082/default-finance-insight-image:v1 +docker push localhost:10082/default-finance-insight-image:v1 +``` + +### Deploy via AMP Console +1. Open AMP Console at `http://default.localhost:9080` +2. Create Agent → Service → Python → Port 8000 +3. Add environment variables listed above +4. Deploy and verify health at `/finance-insight/health` + +## Configuration +Copy `.env.example` to `.env` and set these keys: +- `OPENAI_API_KEY` +- `SERPAPI_API_KEY` +- `TWELVE_DATA_API_KEY` +- `ALPHAVANTAGE_API_KEY` +- `CORS_ALLOWED_ORIGINS` (optional, comma-separated origins for allowed frontend URLs, e.g. `http://localhost:3000,http://localhost:3001`) + +See key setup details in [API_KEYS.md](API_KEYS.md). + +## Contributing +Open an issue or submit a pull request with clear context and test notes. diff --git a/samples/finance-insight-agent/image.png b/samples/finance-insight-agent/image.png new file mode 100644 index 000000000..fd3f3858b Binary files /dev/null and b/samples/finance-insight-agent/image.png differ diff --git a/samples/finance-insight-agent/pyproject.toml b/samples/finance-insight-agent/pyproject.toml new file mode 100644 index 000000000..2922b3904 --- /dev/null +++ b/samples/finance-insight-agent/pyproject.toml @@ -0,0 +1,24 @@ +[project] +name = "finance-insight-service" +version = "0.1.0" +description = "Finance Insight Service - research agent" +readme = "README.md" +requires-python = ">=3.10" +dependencies = [ + "crewai[tools]>=1.8.0,<1.9.0", + "crewai-tools>=1.8.0,<1.9.0", + "openinference-instrumentation-crewai>=0.1.0,<0.2.0", + "flask>=3.0.0,<4.0.0", + "flask-cors>=4.0.0,<5.0.0", + "numpy>=1.26.4,<2.0.0", + "openai>=1.40.0,<2.0.0", + "pandas>=2.2.2,<3.0.0", + "python-dotenv>=1.0.1,<2.0.0", + "setuptools>=65.0.0,<80.0.0", +] + +[project.scripts] +finance_insight_api = "finance_insight_service.api_server:main" + +[tool.uv] +package = true diff --git 
a/samples/finance-insight-agent/src/finance_insight_service/__init__.py b/samples/finance-insight-agent/src/finance_insight_service/__init__.py new file mode 100644 index 000000000..1b9ecc196 --- /dev/null +++ b/samples/finance-insight-agent/src/finance_insight_service/__init__.py @@ -0,0 +1 @@ +"""Finance Insight Service package.""" diff --git a/samples/finance-insight-agent/src/finance_insight_service/api_server.py b/samples/finance-insight-agent/src/finance_insight_service/api_server.py new file mode 100644 index 000000000..534b47114 --- /dev/null +++ b/samples/finance-insight-agent/src/finance_insight_service/api_server.py @@ -0,0 +1,576 @@ +from __future__ import annotations + +import argparse +import json +import os +import threading +import time +import uuid +from datetime import datetime, timezone +from enum import Enum +from typing import Any +from dotenv import load_dotenv +from flask import Flask, jsonify, request +from flask_cors import CORS +from openinference.instrumentation.crewai import CrewAIInstrumentor + +# Disable CrewAI interactive tracing prompt that causes timeout in containerized environments +os.environ.setdefault("CREWAI_TRACING_ENABLED", "false") + +from crewai.events import ( + CrewKickoffCompletedEvent, + CrewKickoffFailedEvent, + CrewKickoffStartedEvent, + TaskCompletedEvent, + TaskFailedEvent, + TaskStartedEvent, + crewai_event_bus, +) +from finance_insight_service.crew import FinanceInsightCrew + + +class JobStatus(str, Enum): + """Enumerates async job lifecycle states.""" + PENDING = "pending" + RUNNING = "running" + COMPLETED = "completed" + FAILED = "failed" + CANCELLED = "cancelled" + + +# In-memory job storage ; This sample assumes a single replica; multiple replicas will not share job state. 
+jobs = {} +jobs_lock = threading.Lock() +JOB_EXPIRATION_SECONDS = 600 # Jobs expire after 10 minutes (reduced to free memory faster) + + +def _utc_now() -> datetime: + """Return the current UTC time.""" + return datetime.now(timezone.utc) + + +def _is_job_cancelled(job_id: str) -> bool: + """Check whether the job has been cancelled.""" + with jobs_lock: + job = jobs.get(job_id) + return bool(job and job.get("status") == JobStatus.CANCELLED) + + +def _cleanup_expired_jobs(): + """Background thread to periodically clean up expired jobs.""" + while True: + time.sleep(300) # Check every 5 minutes + try: + now = _utc_now() + expired_jobs = [] + + with jobs_lock: + for job_id, job in list(jobs.items()): + # Parse completion time + updated_str = job.get("updated_at", "") + if updated_str: + try: + updated_time = datetime.fromisoformat(updated_str.replace('Z', '+00:00')) + except (ValueError, TypeError): + continue + + age_seconds = (now - updated_time).total_seconds() + + # Remove completed/failed jobs older than expiration time + if job["status"] in [JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED] and age_seconds > JOB_EXPIRATION_SECONDS: + expired_jobs.append(job_id) + del jobs[job_id] + print(f"[CLEANUP] Removed expired job {job_id} (age: {age_seconds:.0f}s)") + + if expired_jobs: + import gc + gc.collect() + print(f"[CLEANUP] Cleaned up {len(expired_jobs)} expired jobs, freed memory") + + except Exception as e: + print(f"[CLEANUP] Error in cleanup thread: {e}") + + +def _normalize_list(value: Any) -> list[str]: + """Normalize list-like inputs into a list of non-empty strings.""" + if value is None: + return [] + if isinstance(value, list): + return [str(item).strip() for item in value if str(item).strip()] + if isinstance(value, str): + return [v.strip() for v in value.split(",") if v.strip()] + return [str(value).strip()] + + +def _bounded_int(value: Any, default: int, min_v: int, max_v: int) -> int: + """Parse an int and clamp to an allowed range, 
falling back to default.""" + try: + v = int(value) + except (TypeError, ValueError): + return default + return v if min_v <= v <= max_v else default + + +def _build_search_query(query: str, tickers: Any, sites: Any) -> str: + """Build a SerpAPI search query with tickers and site filters.""" + parts = [query.strip()] if query.strip() else [] + tickers_list = _normalize_list(tickers) + if tickers_list: + parts.append("(" + " OR ".join(tickers_list) + ")") + sites_list = _normalize_list(sites) + if sites_list: + parts.append("(" + " OR ".join(f"site:{s}" for s in sites_list) + ")") + return " ".join(parts).strip() + + +def _format_task_label(task_name: str | None) -> str: + """Format a task name into a user-facing label.""" + name = (task_name or "").lower() + if "research" in name: + return "Research" + if "quant" in name: + return "Quant" + if "audit" in name: + return "Audit" + if "report" in name: + return "Report" + return (task_name or "Task").replace("_", " ").title() + + +def _extract_text(value: Any) -> str: + """Extract text from common CrewAI output shapes.""" + if value is None: + return "" + if isinstance(value, str): + return value + if isinstance(value, dict): + if "final_response" in value: + return str(value["final_response"]) + if "report" in value: + return str(value["report"]) + for attr in ("raw", "output", "json"): + if hasattr(value, attr): + try: + extracted = getattr(value, attr) + except Exception: + continue + if extracted: + return str(extracted) + return str(value) + + +def _extract_final_response(raw: Any) -> tuple[str, Any]: + """Extract a final response string and parsed payload.""" + if raw is None: + return "", raw + text = _extract_text(raw) + stripped = text.strip() + if not stripped: + return "", raw + try: + parsed = json.loads(stripped) + except json.JSONDecodeError: + return stripped, raw + if isinstance(parsed, dict): + if parsed.get("final_response"): + return str(parsed["final_response"]), parsed + if parsed.get("report"): + 
return str(parsed["report"]), parsed + return stripped, parsed + + +def _build_inputs(payload: dict[str, Any]) -> dict[str, Any]: + """Build the input dictionary for the crew.""" + user_request = payload.get("message", "") + query = payload.get("query") or user_request + tickers = payload.get("tickers", "") + sites = payload.get("sites", "") + symbol = payload.get("symbol", "") + interval = payload.get("interval", "1day") + outputsize = _bounded_int(payload.get("outputsize"), 260, 10, 5000) + horizon_days = _bounded_int(payload.get("horizon_days"), 30, 1, 365) + provided_data = payload.get("provided_data", "") + if isinstance(provided_data, (dict, list)): + provided_data = json.dumps(provided_data) + search_query = _build_search_query(query, tickers, sites) + + # Get current date/time to provide context + current_date = datetime.now().strftime("%Y-%m-%d") + current_year = datetime.now().year + days = _bounded_int(payload.get("days"), 7, 1, 30) + max_articles = _bounded_int(payload.get("max_articles"), 8, 1, 20) + + return { + "user_request": user_request, + "current_date": current_date, + "current_year": current_year, + "sources_requested": str(bool(payload.get("sources_requested"))), + "query": query, + "tickers": tickers, + "sites": sites, + "days": days, + "max_articles": max_articles, + "search_query": search_query, + "symbol": symbol, + "interval": interval, + "outputsize": outputsize, + "horizon_days": horizon_days, + "request": user_request, + "provided_data": provided_data, + } + + +def create_app() -> Flask: + """Create and configure the Flask app.""" + load_dotenv() + CrewAIInstrumentor().instrument() + + app = Flask(__name__) + allowed_origins_env = os.getenv("CORS_ALLOWED_ORIGINS", "").strip() + if allowed_origins_env: + allowed_origins = [ + origin.strip() + for origin in allowed_origins_env.split(",") + if origin.strip() + ] + else: + allowed_origins = "*" + CORS(app, resources={ + r"/*": { + "origins": allowed_origins, + "methods": ["GET", "POST", 
"OPTIONS"], + "allow_headers": ["Content-Type", "Authorization", "X-API-Key"], + "expose_headers": ["Content-Type"], + "supports_credentials": False + } + }) + + # Start cleanup thread (only once per app instance) + if not hasattr(create_app, '_cleanup_started'): + cleanup_thread = threading.Thread(target=_cleanup_expired_jobs, daemon=True) + cleanup_thread.start() + create_app._cleanup_started = True + print("[INIT] Started job cleanup thread") + + api_key = os.getenv("API_KEY", "") + + def check_auth() -> bool: + if not api_key: + return True + header = request.headers.get("Authorization", "") + token = "" + if header.lower().startswith("bearer "): + token = header.split(" ", 1)[1].strip() + token = token or request.headers.get("X-API-Key", "").strip() + return token == api_key + + @app.before_request + def _auth_guard(): + if request.path == "/health": + return None + if not check_auth(): + return jsonify({"error": "Unauthorized"}), 401 + return None + + @app.get("/health") + def health(): + import sys + job_count = 0 + pending = 0 + running = 0 + completed = 0 + failed = 0 + + with jobs_lock: + job_count = len(jobs) + for job in jobs.values(): + status = job.get("status") + if status == JobStatus.PENDING: + pending += 1 + elif status == JobStatus.RUNNING: + running += 1 + elif status == JobStatus.COMPLETED: + completed += 1 + elif status == JobStatus.FAILED: + failed += 1 + + return jsonify( + { + "status": "ok", + "jobs": { + "total": job_count, + "pending": pending, + "running": running, + "completed": completed, + "failed": failed, + }, + "memory_mb": sys.getsizeof(jobs) / (1024 * 1024), + } + ) + + @app.get("/config") + def config(): + """Return which API services are configured""" + has_serpapi = bool(os.getenv("SERPAPI_API_KEY")) + has_news = has_serpapi + return jsonify({ + "services": { + "openai": bool(os.getenv("OPENAI_API_KEY")), + "serpapi": has_serpapi, + "twelveData": bool(os.getenv("TWELVE_DATA_API_KEY")), + "alphaVantage": 
bool(os.getenv("ALPHAVANTAGE_API_KEY")), + }, + "capabilities": { + "news_search": has_news, + "market_data": bool(os.getenv("TWELVE_DATA_API_KEY")), + "fundamentals": bool(os.getenv("ALPHAVANTAGE_API_KEY")), + "ai_agents": bool(os.getenv("OPENAI_API_KEY")), + } + }) + + # ============= ASYNC JOB-BASED ENDPOINTS ============= + + @app.post("/chat/async") + def chat_async(): + """Start async report job and return job ID immediately.""" + payload = request.get_json(force=True) or {} + message = str(payload.get("message", "")).strip() + if not message: + return jsonify({"error": "Empty message"}), 400 + + # Create job + job_id = str(uuid.uuid4()) + + with jobs_lock: + jobs[job_id] = { + "id": job_id, + "status": JobStatus.PENDING, + "request": message, + "traces": [], + "result": None, + "error": None, + "created_at": _utc_now().isoformat(), + "updated_at": _utc_now().isoformat(), + } + + # Start background job + def run_job(): + with jobs_lock: + if jobs[job_id]["status"] == JobStatus.CANCELLED: + return + jobs[job_id]["status"] = JobStatus.RUNNING + jobs[job_id]["updated_at"] = _utc_now().isoformat() + + try: + # Setup trace collection + traces: list[dict[str, Any]] = [] + trace_lock = threading.Lock() + seq = 0 + pending_crew_completed = False + crew_completed_emitted = False + seen_tasks: set[str] = set() + + def emit_trace(event_type: str, task: str | None = None, agent: str | None = None) -> None: + label = _format_task_label(task) + message_text = event_type.replace("_", " ").title() + if event_type == "crew_started": + message_text = "Workflow started" + elif event_type == "crew_completed": + message_text = "Workflow completed" + elif event_type == "crew_failed": + message_text = "Workflow failed" + elif event_type == "task_started": + message_text = f"{label} in progress" + elif event_type == "task_completed": + message_text = f"{label} completed" + elif event_type == "task_failed": + message_text = f"{label} failed" + + with trace_lock: + nonlocal seq + seq 
+= 1 + entry = { + "seq": seq, + "type": event_type, + "message": message_text, + "agent": agent, + "task": task, + "timestamp": _utc_now().isoformat(), + } + traces.append(entry) + with jobs_lock: + jobs[job_id]["traces"] = traces[-10:] + jobs[job_id]["updated_at"] = _utc_now().isoformat() + + # Execute crew + inputs = _build_inputs(payload) + crew = FinanceInsightCrew(job_id=job_id).build_crew() + + try: + # Subscribe to events within a scoped handler context + def handler_wrapper(src, evt): + nonlocal pending_crew_completed + nonlocal crew_completed_emitted + evt_crew_name = getattr(evt, "crew_name", None) + if evt_crew_name and evt_crew_name != crew.name: + return + if isinstance(evt, CrewKickoffStartedEvent): + emit_trace("crew_started") + elif isinstance(evt, CrewKickoffCompletedEvent): + if crew_completed_emitted: + return + if "report_task" in seen_tasks: + emit_trace("crew_completed") + crew_completed_emitted = True + else: + pending_crew_completed = True + elif isinstance(evt, CrewKickoffFailedEvent): + emit_trace("crew_failed") + elif isinstance(evt, TaskStartedEvent): + task_name = getattr(evt.task, "name", None) or "task" + agent = getattr(getattr(evt.task, "agent", None), "role", None) + emit_trace("task_started", task=task_name, agent=agent) + elif isinstance(evt, TaskCompletedEvent): + task_name = getattr(evt.task, "name", None) or "task" + agent = getattr(getattr(evt.task, "agent", None), "role", None) + emit_trace("task_completed", task=task_name, agent=agent) + seen_tasks.add(task_name) + if task_name == "report_task" and not crew_completed_emitted: + emit_trace("crew_completed") + crew_completed_emitted = True + pending_crew_completed = False + elif isinstance(evt, TaskFailedEvent): + task_name = getattr(evt.task, "name", None) or "task" + agent = getattr(getattr(evt.task, "agent", None), "role", None) + emit_trace("task_failed", task=task_name, agent=agent) + + with crewai_event_bus.scoped_handlers(): + for event_cls in [ + 
CrewKickoffStartedEvent, + CrewKickoffCompletedEvent, + CrewKickoffFailedEvent, + TaskStartedEvent, + TaskCompletedEvent, + TaskFailedEvent, + ]: + crewai_event_bus.on(event_cls)(handler_wrapper) + + result = crew.kickoff(inputs=inputs) + finally: + pass + + if _is_job_cancelled(job_id): + with jobs_lock: + jobs[job_id]["updated_at"] = _utc_now().isoformat() + return + + # Extract and save response + final_response, raw_output = _extract_final_response(result) + report_text = (final_response or "").strip() + if not report_text: + raise ValueError("No report returned from crew.") + + # Update job with result + with jobs_lock: + if jobs[job_id]["status"] == JobStatus.CANCELLED: + jobs[job_id]["updated_at"] = _utc_now().isoformat() + return + jobs[job_id]["status"] = JobStatus.COMPLETED + jobs[job_id]["result"] = { + "report": report_text, + } + jobs[job_id]["updated_at"] = _utc_now().isoformat() + + except Exception as e: + import traceback + traceback.print_exc() + with jobs_lock: + if jobs[job_id]["status"] != JobStatus.CANCELLED: + jobs[job_id]["status"] = JobStatus.FAILED + jobs[job_id]["error"] = str(e) + jobs[job_id]["updated_at"] = _utc_now().isoformat() + + thread = threading.Thread(target=run_job, daemon=True) + thread.start() + + return jsonify({"jobId": job_id, "status": JobStatus.PENDING}) + + @app.get("/chat/async//status") + def get_job_status(job_id: str): + """Get job status and latest traces.""" + with jobs_lock: + job = jobs.get(job_id) + if not job: + return jsonify({"error": "Job not found"}), 404 + + return jsonify({ + "jobId": job["id"], + "status": job["status"], + "traces": job["traces"][-10:], # Last 10 traces + "traceCount": len(job["traces"]), + "updatedAt": job["updated_at"], + }) + + @app.post("/chat/async//cancel") + def cancel_job(job_id: str): + """Cancel a running job (best effort).""" + with jobs_lock: + job = jobs.get(job_id) + if not job: + return jsonify({"error": "Job not found"}), 404 + + if job["status"] in 
[JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED]: + return jsonify({"jobId": job_id, "status": job["status"]}) + + job["status"] = JobStatus.CANCELLED + job["error"] = "Cancelled by user" + job["updated_at"] = _utc_now().isoformat() + + return jsonify({"jobId": job_id, "status": JobStatus.CANCELLED}) + + @app.get("/chat/async//result") + def get_job_result(job_id: str): + """Get final job result.""" + with jobs_lock: + job = jobs.get(job_id) + if not job: + return jsonify({"error": "Job not found"}), 404 + + if job["status"] == JobStatus.PENDING or job["status"] == JobStatus.RUNNING: + return jsonify({"error": "Job not yet completed", "status": job["status"]}), 425 + + if job["status"] == JobStatus.FAILED: + return jsonify({"error": job["error"], "status": job["status"]}), 500 + + if job["status"] == JobStatus.CANCELLED: + return jsonify({"error": job.get("error", "Job cancelled"), "status": job["status"]}), 409 + + result = { + "jobId": job["id"], + "status": job["status"], + "result": job["result"], + } + + # Don't delete job immediately - let cleanup thread handle expiration + # This allows retrying result fetch if needed and prevents memory pressure + # Jobs will be auto-cleaned after JOB_EXPIRATION_SECONDS + + return jsonify(result) + + return app + + +def main() -> None: + """CLI entry point for running the API server.""" + parser = argparse.ArgumentParser(description="Finance Insight API server.") + parser.add_argument("--host", default="0.0.0.0") + parser.add_argument("--port", type=int, default=5000) + parser.add_argument("--debug", action="store_true") + args = parser.parse_args() + + app = create_app() + app.run(host=args.host, port=args.port, debug=args.debug) + + +if __name__ == "__main__": + main() diff --git a/samples/finance-insight-agent/src/finance_insight_service/config/agents.yaml b/samples/finance-insight-agent/src/finance_insight_service/config/agents.yaml new file mode 100644 index 000000000..372c1301c --- /dev/null +++ 
b/samples/finance-insight-agent/src/finance_insight_service/config/agents.yaml @@ -0,0 +1,51 @@ +researcher: + role: > + Researcher + goal: > + Find relevant finance news and extract evidence-backed drivers with citations. + backstory: > + You are a careful financial researcher who only uses sources you can access. + You highlight missing timestamps and avoid inventing facts or citations. + When a website fails, note it in limitations and move on to other sources. + You always attempt to find at least 2-3 accessible sources rather than failing on blocked sites. + +quant: + role: > + Analyst + goal: > + Fetch market data, compute indicators deterministically, and produce bounded scenarios. + backstory: > + You are a quantitative analyst who prioritizes reproducible calculations. + You never do math in your head; you always use the safe Python tool. + You can use numpy and pandas inside safe_python_exec; other modules are not available. + Built-in time module is permitted for pandas internals. + CRITICAL: When the user query mentions "today", "current", "now", or specific years, + ALWAYS use safe_python_exec to get the current date FIRST using datetime module. + Example: print(json.dumps({"current_date": datetime.now().strftime("%Y-%m-%d"), "current_year": datetime.now().year})) + You keep generated code clean: no leading indentation or markdown. + You normalize input data to simple arrays or dictionaries before inputing safe_python_exec to avoid type errors. + You always pass data_json as a list of row dicts or a dict of lists, never the provider wrapper or a JSON string. + You ensure safe_python_exec prints a single JSON object via json.dumps. + You choose only the computations needed for the request and keep scripts small; if big calculations are needed, break them into smaller parts. + You use provided_data when available and only fetch market data when required. + You use company_fundamentals_fetch only when the request needs company fundamentals or ratios. 
+ +auditor: + role: > + Reviewer + goal: > + Validate outputs and flag issues before reporting. + backstory: > + You are a strict quality gate. You only approve outputs that are internally + consistent, evidence-backed, and policy-compliant. If something fails, you + provide precise, bounded repair instructions and state limitations. + +reporter: + role: > + Reporter + goal: > + Draft the final user-facing report from audited outputs. + backstory: > + You write concise, structured reports based on the audit summary and supporting + research and quant outputs. You never add new facts and never ask follow-up + questions. diff --git a/samples/finance-insight-agent/src/finance_insight_service/config/tasks.yaml b/samples/finance-insight-agent/src/finance_insight_service/config/tasks.yaml new file mode 100644 index 000000000..1a51b895a --- /dev/null +++ b/samples/finance-insight-agent/src/finance_insight_service/config/tasks.yaml @@ -0,0 +1,222 @@ +research_task: + description: > + You are given a research request: + User request: {user_request} + Current date: {current_date} + Current year: {current_year} + Query: {query} + Tickers: {tickers} + Sites (optional): {sites} + Lookback days: {days} + Max articles: {max_articles} + Search query: {search_query} + + IMPORTANT: Today is {current_date} (year {current_year}). Use this as reference when + evaluating article recency and relevance. Focus on articles from recent months/years. + + Use the configured search tool (serpapi_news_search) to + discover relevant articles with search_query. If tickers are provided, include + them in the search terms. If sites are provided, use site: filters + (for example: site:reuters.com OR site:bloomberg.com). The search is not limited to + 3 sources; gather up to Max articles. + + For each article you choose, use ScrapeWebsiteTool to read the full content. If the + search results include a source_url, prefer that; otherwise use the link. 
If a URL + times out or fails to load (connection error, bot protection, etc.), note it in + limitations and try other sources. Do not fail the entire task. Aim to successfully + scrape at least 2-3 sources from the search results. + + Extract for each article: + - headline or title (if present) + - published timestamp (ISO 8601 if present; otherwise use "unknown") + - 1 to 3 key points grounded in the article + + Cluster the articles into 2 to 4 drivers (earnings, macro, regulation, product + news, etc.). For each driver, explain "why it matters" using only evidence from + the scraped sources. + + Scope: + - If user_request is not about finance, markets, companies, macroeconomics, or + investing, do not call any tools. Return empty drivers/articles/metrics_formulas + and add a limitation stating the request is out of scope. + - If the user_request is ambiguous, do not ask follow-up questions. Return empty + drivers/articles/metrics_formulas and add a limitation stating what is ambiguous + and what information is needed to proceed. + + Rules: + - Do not invent facts, dates, or citations. + - If a URL cannot be accessed or lacks a publish time, note that in limitations. + - Use only the sources you retrieved in this run. + - If the user request involves predictions, ratios, or valuation metrics, extract + formulas and variable definitions into metrics_formulas for internal use; do not + surface formulas in the final response unless the user requests them or they are + directly explanatory. 
+ expected_output: > + Return strict JSON with these keys: + - drivers: list of 2 to 4 objects with fields: + - driver: short title + - why_it_matters: 1 to 3 sentences + - citations: list of objects with fields url, published_at, evidence + - articles: list of objects with fields: + - url + - headline + - published_at + - key_points (list of strings) + - metrics_formulas: list of objects with fields: + - metric + - formula + - variables + - citation_url + - limitations: list of strings (empty if none) + +quant_task: + description: > + You are given a quantitative request: + Symbol: {symbol} + Interval: {interval} + Output size: {outputsize} + Horizon days: {horizon_days} + Request: {request} + Current date: {current_date} + Current year: {current_year} + Provided data (optional): {provided_data} + + IMPORTANT: Today is {current_date} (year {current_year}). Use this as your reference + point for all time-based analysis. If you need to verify or use dates in calculations, + you can confirm via safe_python_exec with datetime module. + + Scope: + - If the Request is not about financial markets, companies, or quantitative analysis, + do not call any tools. Return a minimal snapshot with limitations noting the request + is out of scope. + - If the Request is ambiguous, do not ask follow-up questions. Return a minimal + snapshot and add a limitation stating what is ambiguous and what information is + needed to proceed. + + Decide which computations are needed based on the Request. Do not compute everything + by default. + + If provided_data is supplied, use it and do not call price_history_fetch unless the + Request explicitly asks for fresh market data. + If provided_data is not supplied and market data is needed, use price_history_fetch. + If the Request needs fundamentals or ratios (ROE, margins, FCF, D/E, P/E, P/B), + use company_fundamentals_fetch unless provided_data already includes those metrics. 
+ company_fundamentals_fetch returns overview plus income/balance/cashflow reports; extract + ratio fields from overview and compute others from the statements as needed. + + Use safe_python_exec as a calculator. Prefer small, single-purpose scripts. + Each call should accept simplified input data, compute one logical step, + and print a single JSON object to stdout. Allowed modules: math, statistics, + datetime, time, json, numpy, pandas. + Always call safe_python_exec with both code and simplified data. + If you need to inspect data, do it outside the safe_python_exec. + If multiple symbols are mentioned, handle one symbol per call; if only one + quant task is available, pick the first symbol and note the limitation. + Do not pass a multi-symbol dict into a single safe_python_exec call. + Prefer simplified inputs: extract only the needed arrays/dicts from any + tool output before calling safe_python_exec (for example, pass a list of + closes or a list of OHLCV dicts, not the full provider wrapper). + Always pass data_json as a list of row dicts or a dict of lists. Do not + pass the full provider wrapper; extract provider_output["data"] first. + If you only have a single row dict, wrap it in a list. + Never pass data_json as a JSON string. Only use json.loads if you + explicitly passed a JSON string. + + If safe_python_exec returns CODE_ERROR, fix the code and retry. Do not proceed + until you get SUCCESS or you have retried 3 times, then return the error in + limitations. + + Coding rules (important to avoid exec errors): + - No leading indentation at top-level lines. + - Use 4 spaces for indents inside blocks only. + - Do not include code fences or markdown in the code string. + - Always include `import json` when printing results. + - Always end with print(json.dumps(result, ensure_ascii=True)). + - Prefer single-line expressions over multi-line blocks where possible. + - When parsing dates, use pandas to_datetime with errors="coerce" and format="mixed". 
+ - Normalize input data before building the DataFrame: + - If data is a JSON string, parse it with json.loads. + - If data is already a dict or list, use it directly; do not call json.loads. + - If data is a dict with a "data" key, use data["data"] as records. + - If data is a list, use it directly as records. + - If records are strings, parse each element with json.loads. + - If required columns are missing, return a limitation explaining it. + - Do not call exit(); return a limitations list instead. + + Default fallback (when Request is empty or unclear): + - last_close, returns_1d, volatility_annualized, data_points. + + Scenarios (only if relevant to the Request): + - base: expected return over horizon_days + - bull: expected return + 1.0 * volatility over horizon_days + - bear: expected return - 1.0 * volatility over horizon_days + + Use the last_close as the starting price if scenarios are computed. Explain + assumptions in the output. + + Output must include as_of, snapshot, and limitations. Include scenarios only + if you computed them. + + Use only the available data; if data is missing, state limitations. + expected_output: > + Return strict JSON with these keys: + - as_of: object with timestamp and provider + - snapshot: object with data_points and computed metrics (keys vary by Request) + - scenarios: optional object with base, bull, bear each containing price_target, + range_low, range_high, and assumptions + - limitations: list of strings (empty if none) + +audit_task: + description: > + You are the Auditor. + User request: {user_request} + Use Research output and Quant output from context. + + Validate the outputs for: + - evidence quality and citations (research) + - numeric sanity and required metrics (quant) + - alignment with the user request + + If something is missing or inconsistent, set audit_status to REJECTED or PARTIAL + and explain issues with precise fix actions. Provide a concise audit_summary for + the report writer to use. 
+ + Scope: + - If user_request is unrelated to finance, markets, companies, macroeconomics, or + investing, set audit_status to REJECTED and add an issue with category "scope". + - If user_request is ambiguous, set audit_status to PARTIAL and add an issue with + category "ambiguity". Do not ask follow-up questions. + expected_output: > + Return strict JSON with these keys: + - audit_status: APPROVED | REJECTED | PARTIAL + - issues: list of objects with category, problem, fix_action + - notes: list of strings + - audit_summary: string (1 to 3 sentences for the report writer) + +report_task: + description: > + You are the Report writer. + User request: {user_request} + Use Research output, Quant output, and Audit output from context. + + Generate the final user-facing response. The response must be concise, direct, + and not conversational. Do not ask follow-up questions. + + Rules: + - If audit_status is REJECTED, respond with a brief refusal and mention the key + limitation from the audit issues. + - If audit_status is PARTIAL, provide a minimal response and include a + "Limitations:" section describing what is missing. + - Do not invent facts or numbers. + - Only use numbers present in quant output. + - If research_output is missing, avoid news claims and state limitations. + - Do not include citations unless the user explicitly requested sources. + - Do not include formulas/variable definitions unless the user requested them. + + Formatting: + - Main response: 2 to 4 sentences or 3 to 5 bullet points. + - Add "Limitations:" if needed. + - Add "Sources:" only if requested. 
+ expected_output: > + Return strict JSON with these keys: + - final_response: string (main answer, then Limitations:/Sources: sections if applicable) diff --git a/samples/finance-insight-agent/src/finance_insight_service/crew.py b/samples/finance-insight-agent/src/finance_insight_service/crew.py new file mode 100644 index 000000000..94bfcad5b --- /dev/null +++ b/samples/finance-insight-agent/src/finance_insight_service/crew.py @@ -0,0 +1,169 @@ +from crewai import Agent, Crew, Process, Task +from crewai.project import CrewBase, agent, crew, task +from crewai_tools import ScrapeWebsiteTool + +from finance_insight_service.tools.company_fundamentals_fetch import ( + CompanyFundamentalsFetchTool, +) +from finance_insight_service.tools.safe_python_exec import SafePythonExecTool +from finance_insight_service.tools.price_history_fetch import PriceHistoryFetchTool +from finance_insight_service.tools.serpapi_news_search import SerpApiNewsSearchTool + + +@CrewBase +class FinanceInsightCrew: + """Research + quant crew for finance insight service.""" + + agents_config = "config/agents.yaml" + tasks_config = "config/tasks.yaml" + + def __init__(self, job_id: str | None = None) -> None: + """Initialize the crew with an optional job identifier.""" + self.job_id = job_id + + @agent + def researcher(self) -> Agent: + """Create the research agent.""" + return Agent( + config=self.agents_config["researcher"], + tools=[SerpApiNewsSearchTool(), ScrapeWebsiteTool()], + verbose=True, + allow_delegation=False, + ) + + @agent + def quant(self) -> Agent: + """Create the quantitative analysis agent.""" + return Agent( + config=self.agents_config["quant"], + tools=[ + PriceHistoryFetchTool(), + CompanyFundamentalsFetchTool(), + SafePythonExecTool(), + ], + verbose=True, + allow_delegation=False, + ) + + @agent + def auditor(self) -> Agent: + """Create the audit agent.""" + return Agent( + config=self.agents_config["auditor"], + verbose=True, + allow_delegation=False, + ) + + @agent + def 
reporter(self) -> Agent: + """Create the report-writing agent.""" + return Agent( + config=self.agents_config["reporter"], + verbose=True, + allow_delegation=False, + ) + + @task + def research_task(self) -> Task: + """Build the research task definition.""" + return Task( + config=self.tasks_config["research_task"], + agent=self.researcher(), + name="research_task", + ) + + @task + def quant_task(self) -> Task: + """Build the quantitative analysis task definition.""" + return Task( + config=self.tasks_config["quant_task"], + agent=self.quant(), + name="quant_task", + ) + + @task + def audit_task(self) -> Task: + """Build the audit task definition.""" + return Task( + config=self.tasks_config["audit_task"], + agent=self.auditor(), + name="audit_task", + ) + + @task + def report_task(self) -> Task: + """Build the report task definition.""" + return Task( + config=self.tasks_config["report_task"], + agent=self.reporter(), + name="report_task", + ) + + def build_crew( + self, task_names: list[str] | None = None, include_all_agents: bool = True + ) -> Crew: + """Build a crew with selected tasks and agents.""" + research_task = self.research_task() + quant_task = self.quant_task() + audit_task = self.audit_task() + report_task = self.report_task() + + full_order = [research_task, quant_task, audit_task, report_task] + + task_map = { + "research": research_task, + "quant": quant_task, + "audit": audit_task, + "report": report_task, + } + if task_names: + unknown = [name for name in task_names if name not in task_map] + if unknown: + raise ValueError(f"Unknown task names: {', '.join(unknown)}") + selected_tasks = [task_map[name] for name in task_names] + else: + selected_tasks = full_order + + if set(selected_tasks) == set(full_order): + quant_task.context = [research_task] + audit_task.context = [research_task, quant_task] + report_task.context = [research_task, quant_task, audit_task] + + if include_all_agents: + agents = [ + self.researcher(), + self.quant(), + 
self.auditor(), + self.reporter(), + ] + else: + selected_names = set(task_names or task_map.keys()) + agents = [] + if "research" in selected_names: + agents.append(self.researcher()) + if "quant" in selected_names: + agents.append(self.quant()) + if "audit" in selected_names: + agents.append(self.auditor()) + if "report" in selected_names: + agents.append(self.reporter()) + + crew_name = ( + f"finance_insight_crew_{self.job_id}" + if self.job_id + else "finance_insight_crew" + ) + + return Crew( + name=crew_name, + agents=agents, + tasks=selected_tasks, + process=Process.sequential, + verbose=True, + tracing=True, + ) + + @crew + def crew(self) -> Crew: + """Creates the Finance Insight crew.""" + return self.build_crew() diff --git a/samples/finance-insight-agent/src/finance_insight_service/tools/__init__.py b/samples/finance-insight-agent/src/finance_insight_service/tools/__init__.py new file mode 100644 index 000000000..3d4367a55 --- /dev/null +++ b/samples/finance-insight-agent/src/finance_insight_service/tools/__init__.py @@ -0,0 +1 @@ +"""Tooling for the finance insight service.""" diff --git a/samples/finance-insight-agent/src/finance_insight_service/tools/company_fundamentals_fetch.py b/samples/finance-insight-agent/src/finance_insight_service/tools/company_fundamentals_fetch.py new file mode 100644 index 000000000..45545643b --- /dev/null +++ b/samples/finance-insight-agent/src/finance_insight_service/tools/company_fundamentals_fetch.py @@ -0,0 +1,123 @@ +import json +import os +from datetime import datetime +from typing import Any +from urllib.parse import urlencode +from urllib.request import Request, urlopen + +from pydantic import BaseModel, Field + +from crewai.tools import BaseTool + + +class CompanyFundamentalsFetchArgs(BaseModel): + """Arguments for company fundamentals fetching.""" + symbol: str = Field(..., description="Ticker or symbol to fetch fundamentals for.") + limit: int = Field(1, description="Number of periods to return (latest 
first).") + + +class CompanyFundamentalsFetchTool(BaseTool): + """CrewAI tool for fetching company fundamentals.""" + name: str = "company_fundamentals_fetch" + description: str = ( + "Fetches fundamentals from Alpha Vantage (overview, income statement, " + "balance sheet, cash flow). Requires ALPHAVANTAGE_API_KEY in the environment." + ) + args_schema: type[BaseModel] = CompanyFundamentalsFetchArgs + + def _run(self, symbol: str, limit: int = 1) -> str: + """Fetch and return fundamentals for a ticker symbol.""" + symbol = (symbol or "").strip().upper() + if not symbol: + return _error_payload("symbol is required") + + api_key = (os.getenv("ALPHAVANTAGE_API_KEY") or "").strip().strip("\"'").strip() + if not api_key: + return _error_payload("ALPHAVANTAGE_API_KEY missing", provider="alpha_vantage") + + try: + limit = int(limit) + except (TypeError, ValueError): + limit = 1 + limit = max(1, min(limit, 12)) + + overview, overview_error = _fetch_alpha("OVERVIEW", symbol, api_key) + income_raw, income_error = _fetch_alpha("INCOME_STATEMENT", symbol, api_key) + balance_raw, balance_error = _fetch_alpha("BALANCE_SHEET", symbol, api_key) + cash_raw, cash_error = _fetch_alpha("CASH_FLOW", symbol, api_key) + + errors = [ + err + for err in [overview_error, income_error, balance_error, cash_error] + if err + ] + payload: dict[str, Any] = { + "provider": "alpha_vantage", + "symbol": symbol, + "fetched_at": datetime.utcnow().isoformat() + "Z", + "fundamentals": { + "overview": overview if isinstance(overview, dict) else {}, + "income_statement": _trim_reports(income_raw, limit), + "balance_sheet": _trim_reports(balance_raw, limit), + "cash_flow": _trim_reports(cash_raw, limit), + }, + "error": "; ".join(errors) if errors else "", + } + return json.dumps(payload, ensure_ascii=True) + + +def _fetch_alpha(function: str, symbol: str, api_key: str) -> tuple[Any, str]: + """Call Alpha Vantage and return payload plus error string.""" + base_url = "https://www.alphavantage.co/query" + 
query = {"function": function, "symbol": symbol, "apikey": api_key} + url = f"{base_url}?{urlencode(query)}" + request = Request(url, headers={"User-Agent": "FinanceInsightBot/1.0"}) + + try: + with urlopen(request, timeout=20) as response: + payload = json.loads(response.read().decode("utf-8")) + except Exception as exc: + return {}, f"{function} request failed: {exc}" + + if isinstance(payload, dict): + message = payload.get("Error Message") or payload.get("Note") or payload.get( + "Information" + ) + if message: + return {}, message + + return payload, "" + + +def _trim_reports(payload: Any, limit: int) -> dict[str, list[dict[str, Any]]]: + """Trim annual and quarterly reports to the given limit.""" + if not isinstance(payload, dict): + return {"annual": [], "quarterly": []} + + annual = payload.get("annualReports") or [] + quarterly = payload.get("quarterlyReports") or [] + if not isinstance(annual, list): + annual = [] + if not isinstance(quarterly, list): + quarterly = [] + + return {"annual": annual[:limit], "quarterly": quarterly[:limit]} + + +def _error_payload(message: str, provider: str = "") -> str: + """Return a JSON error payload for fundamentals fetches.""" + return json.dumps( + { + "provider": provider, + "symbol": "", + "fetched_at": datetime.utcnow().isoformat() + "Z", + "fundamentals": { + "overview": {}, + "income_statement": {"annual": [], "quarterly": []}, + "balance_sheet": {"annual": [], "quarterly": []}, + "cash_flow": {"annual": [], "quarterly": []}, + }, + "error": message, + }, + ensure_ascii=True, + ) diff --git a/samples/finance-insight-agent/src/finance_insight_service/tools/price_history_fetch.py b/samples/finance-insight-agent/src/finance_insight_service/tools/price_history_fetch.py new file mode 100644 index 000000000..686029a95 --- /dev/null +++ b/samples/finance-insight-agent/src/finance_insight_service/tools/price_history_fetch.py @@ -0,0 +1,126 @@ +import json +import os +from datetime import datetime +from typing import Any 
+from urllib.parse import urlencode +from urllib.request import Request, urlopen + +from pydantic import BaseModel, Field + +from crewai.tools import BaseTool + + +class PriceHistoryFetchArgs(BaseModel): + """Arguments for price history fetching.""" + symbol: str = Field(..., description="Ticker or symbol to fetch.") + interval: str = Field("1day", description="Interval (1day, 1week, 1month).") + outputsize: int = Field(365, description="Number of data points to return.") + + +class PriceHistoryFetchTool(BaseTool): + """CrewAI tool for fetching OHLCV price history.""" + name: str = "price_history_fetch" + description: str = ( + "Fetches OHLCV price history from Twelve Data. " + "Requires TWELVE_DATA_API_KEY in the environment." + ) + args_schema: type[BaseModel] = PriceHistoryFetchArgs + + def _run(self, symbol: str, interval: str = "1day", outputsize: int = 365) -> str: + """Fetch OHLCV history for a symbol and return JSON.""" + symbol = (symbol or "").strip() + if not symbol: + return _error_payload("symbol is required") + + interval = (interval or "1day").strip().lower() + outputsize = max(10, min(int(outputsize), 2000)) + + payload = _fetch_twelve_data(symbol, interval, outputsize) + return json.dumps(payload, ensure_ascii=True) + + +def _error_payload(message: str, provider: str = "") -> str: + """Return a JSON error payload for price history fetches.""" + return json.dumps( + { + "provider": provider, + "symbol": "", + "interval": "", + "fetched_at": datetime.utcnow().isoformat() + "Z", + "data": [], + "error": message, + }, + ensure_ascii=True, + ) + + +def _fetch_twelve_data(symbol: str, interval: str, outputsize: int) -> dict[str, Any]: + """Fetch OHLCV data from Twelve Data.""" + api_key = os.getenv("TWELVE_DATA_API_KEY") + if not api_key: + return _error_dict("twelve_data", symbol, interval, "TWELVE_DATA_API_KEY missing") + + query = urlencode( + { + "symbol": symbol, + "interval": interval, + "outputsize": outputsize, + "apikey": api_key, + "format": 
"JSON", + } + ) + url = f"https://api.twelvedata.com/time_series?{query}" + request = Request(url, headers={"User-Agent": "FinanceInsightBot/1.0"}) + + try: + with urlopen(request, timeout=15) as response: + payload = json.loads(response.read().decode("utf-8")) + except Exception as exc: + return _error_dict("twelve_data", symbol, interval, f"request failed: {exc}") + + if "values" not in payload: + return _error_dict( + "twelve_data", + symbol, + interval, + payload.get("message", "unexpected response"), + ) + + values = payload["values"] + data = [] + for row in values: + try: + data.append( + { + "date": row["datetime"], + "open": float(row["open"]), + "high": float(row["high"]), + "low": float(row["low"]), + "close": float(row["close"]), + "volume": float(row.get("volume") or 0), + } + ) + except (KeyError, ValueError): + continue + + data.reverse() + return { + "provider": "twelve_data", + "symbol": symbol, + "interval": interval, + "fetched_at": datetime.utcnow().isoformat() + "Z", + "data": data[-outputsize:], + "error": "", + } + + +def _error_dict(provider: str, symbol: str, interval: str, message: str) -> dict[str, Any]: + """Build a structured error payload dictionary.""" + return { + "provider": provider, + "symbol": symbol, + "interval": interval, + "fetched_at": datetime.utcnow().isoformat() + "Z", + "data": [], + "error": message, + } diff --git a/samples/finance-insight-agent/src/finance_insight_service/tools/safe_python_exec.py b/samples/finance-insight-agent/src/finance_insight_service/tools/safe_python_exec.py new file mode 100644 index 000000000..917f70140 --- /dev/null +++ b/samples/finance-insight-agent/src/finance_insight_service/tools/safe_python_exec.py @@ -0,0 +1,364 @@ +import ast +import datetime as datetime_module +import io +import json +import math +import statistics +import subprocess +import sys +import time +import traceback +import textwrap +from typing import Any +from contextlib import redirect_stdout + +from pydantic import 
BaseModel, Field + +from crewai.tools import BaseTool + + +class SafePythonExecArgs(BaseModel): + """Arguments for safe Python execution.""" + code: str = Field(..., description="Python code to execute.") + data_json: Any | None = Field( + None, + description=( + "Optional JSON string or object; available in code as `data`." + ), + ) + + +class SafePythonExecTool(BaseTool): + """CrewAI tool for executing restricted, trusted Python code only. + + This is a best-effort sandbox intended for agent-generated code in a + controlled environment. It is not safe for untrusted or adversarial input. + """ + name: str = "safe_python_exec" + description: str = ( + "Executes Python code in a restricted environment and returns a status JSON. " + "Provide code that prints the final JSON output. Accepts JSON string or object." + ) + args_schema: type[BaseModel] = SafePythonExecArgs + + def _run(self, code: str, data_json: Any | None = None) -> str: + """Execute code in a restricted environment and return JSON.""" + code_to_run = textwrap.dedent(code or "").replace("\t", " ").strip() + code_to_run = _normalize_indentation(code_to_run) + if not code_to_run: + return json.dumps( + {"status": "CODE_ERROR", "error": "code is empty", "code": code_to_run}, + ensure_ascii=True, + ) + try: + import numpy as np + import pandas as pd + except ImportError as exc: + return json.dumps( + { + "status": "CODE_ERROR", + "error": f"module import failed: {exc}", + "code": code, + }, + ensure_ascii=True, + ) + + def _deny_exit(*_args, **_kwargs): + raise RuntimeError("exit is not allowed; return limitations instead") + + data_payload = None + if data_json is not None: + try: + data_payload = _parse_json_payload(data_json) + except ValueError as exc: + return json.dumps( + { + "status": "CODE_ERROR", + "error": f"data_json invalid: {exc}", + "code": code_to_run, + }, + ensure_ascii=True, + ) + + try: + payload_json = json.dumps( + {"code": code_to_run, "data": data_payload}, + ensure_ascii=True, + ) + 
except TypeError: + payload_json = json.dumps( + {"code": code_to_run, "data": str(data_payload)}, + ensure_ascii=True, + ) + + try: + completed = subprocess.run( + [sys.executable, "-c", _EXEC_RUNNER], + input=payload_json, + text=True, + capture_output=True, + timeout=EXEC_TIMEOUT_SECONDS, + ) + except subprocess.TimeoutExpired: + return json.dumps( + { + "status": "TIMEOUT", + "error": f"execution timed out after {EXEC_TIMEOUT_SECONDS}s", + "code": code_to_run, + }, + ensure_ascii=True, + ) + + stdout_text = (completed.stdout or "").strip() + if not stdout_text: + return json.dumps( + { + "status": "CODE_ERROR", + "error": "execution failed: no output from sandbox", + "traceback": completed.stderr, + "code": code_to_run, + }, + ensure_ascii=True, + ) + try: + # Runner outputs a single JSON payload to stdout. + json.loads(stdout_text) + return stdout_text + except json.JSONDecodeError: + return json.dumps( + { + "status": "CODE_ERROR", + "error": "execution failed: invalid sandbox output", + "traceback": completed.stderr, + "code": code_to_run, + }, + ensure_ascii=True, + ) + + +def _parse_json_payload(value: object): + """Parse a JSON payload from string, dict, or list inputs.""" + if isinstance(value, (dict, list)): + return value + if not isinstance(value, str): + raise ValueError("data_json must be a JSON string") + + text = value.strip() + + try: + parsed = json.loads(text) + if isinstance(parsed, str): + return _parse_json_payload(parsed) + return _normalize_parsed_payload(parsed) + except json.JSONDecodeError: + # Fall back to raw decode / literal_eval strategies below. 
+ pass + + decoder = json.JSONDecoder() + for start in (text.find("{"), text.find("[")): + if start == -1: + continue + try: + parsed, _ = decoder.raw_decode(text[start:]) + return _normalize_parsed_payload(parsed) + except json.JSONDecodeError: + continue + + try: + parsed = ast.literal_eval(text) + return _normalize_parsed_payload(parsed) + except (ValueError, SyntaxError) as exc: + raise ValueError(str(exc)) from exc + + +def _normalize_parsed_payload(payload: Any): + """Normalize parsed JSON payloads and nested data values.""" + if isinstance(payload, str): + return payload + + if isinstance(payload, list): + normalized = [] + for item in payload: + if isinstance(item, str): + text = item.strip() + if text.startswith("{") or text.startswith("["): + try: + normalized.append(json.loads(text)) + continue + except json.JSONDecodeError: + # If not valid JSON, try literal_eval or keep as-is. + pass + try: + normalized.append(ast.literal_eval(text)) + continue + except (ValueError, SyntaxError): + # Fall back to original item if not parseable. 
+ pass + normalized.append(item) + return normalized + + if isinstance(payload, dict) and "data" in payload: + data_value = payload["data"] + if isinstance(data_value, str): + payload["data"] = _parse_json_payload(data_value) + elif isinstance(data_value, list): + payload["data"] = _normalize_parsed_payload(data_value) + return payload + + +def _normalize_indentation(code: str) -> str: + """Normalize indentation to avoid syntax errors in code.""" + if not code: + return code + + lines = code.splitlines() + normalized: list[str] = [] + prev_indent = 0 + prev_line = "" + + for line in lines: + stripped = line.lstrip() + if not stripped: + normalized.append("") + continue + + indent = len(line) - len(stripped) + first_token = stripped.split()[0] if stripped.split() else "" + + if first_token in {"elif", "else", "except", "finally"}: + indent = max(prev_indent - 4, 0) + elif prev_line.endswith(":"): + indent = prev_indent + 4 + else: + if prev_indent == 0 and indent > 0: + indent = 0 + elif indent > prev_indent: + indent = prev_indent + + normalized.append(" " * indent + stripped) + prev_indent = indent + prev_line = stripped + + return "\n".join(normalized) + + +EXEC_TIMEOUT_SECONDS = 60 + + +_EXEC_RUNNER = r""" +import json +import math +import statistics +import datetime as datetime_module +import time +import traceback +import io +import sys +from contextlib import redirect_stdout + +try: + import numpy as np + import pandas as pd +except Exception as exc: + print( + json.dumps( + { + "status": "CODE_ERROR", + "error": f"module import failed: {exc}", + "code": "", + }, + ensure_ascii=True, + ) + ) + raise SystemExit(1) + + +def _deny_exit(*_args, **_kwargs): + raise RuntimeError("exit is not allowed; return limitations instead") + + +safe_builtins = { + "abs": abs, + "all": all, + "any": any, + "bool": bool, + "dict": dict, + "enumerate": enumerate, + "Exception": Exception, + "exit": _deny_exit, + "float": float, + "int": int, + "isinstance": isinstance, + "len": 
len, + "list": list, + "max": max, + "min": min, + "NameError": NameError, + "print": print, + "range": range, + "round": round, + "set": set, + "sorted": sorted, + "str": str, + "sum": sum, + # NOTE: Including `type` enables basic introspection. This sandbox is + # only for trusted, agent-generated code and is not a security + # boundary for untrusted input. + "type": type, + "zip": zip, +} + +allowed_modules = { + "math": math, + "statistics": statistics, + "datetime": datetime_module, + "json": json, + "time": time, + "numpy": np, + "pandas": pd, +} + + +def _limited_import(name, globals=None, locals=None, fromlist=(), level=0): + if name in allowed_modules: + return allowed_modules[name] + raise ImportError(f"Module not allowed: {name}") + + +safe_builtins["__import__"] = _limited_import + +payload = json.load(sys.stdin) +code_to_run = payload.get("code") or "" +data = payload.get("data") + +context = {"__builtins__": safe_builtins} +context.update(allowed_modules) +context["np"] = np +context["pd"] = pd +context["data"] = data + +stdout = io.StringIO() +try: + with redirect_stdout(stdout): + exec(code_to_run, context) + output = stdout.getvalue().strip() + print( + json.dumps( + {"status": "SUCCESS", "final_output": output, "code": code_to_run}, + ensure_ascii=True, + ) + ) +except Exception as exc: + print( + json.dumps( + { + "status": "CODE_ERROR", + "error": f"execution failed: {exc}", + "traceback": traceback.format_exc(), + "code": code_to_run, + }, + ensure_ascii=True, + ) + ) + raise SystemExit(1) +""" diff --git a/samples/finance-insight-agent/src/finance_insight_service/tools/serpapi_news_search.py b/samples/finance-insight-agent/src/finance_insight_service/tools/serpapi_news_search.py new file mode 100644 index 000000000..b3c7aa6d5 --- /dev/null +++ b/samples/finance-insight-agent/src/finance_insight_service/tools/serpapi_news_search.py @@ -0,0 +1,124 @@ +import json +import os +from datetime import datetime +from typing import Any +from 
from urllib.parse import urlencode
from urllib.request import Request, urlopen

from pydantic import BaseModel, Field

from crewai.tools import BaseTool


class SerpApiNewsSearchArgs(BaseModel):
    """Arguments for news search via SerpAPI."""

    search_query: str = Field(
        ..., description="Search query for recent news articles."
    )
    location: str | None = Field(
        None, description="Optional location to scope the search (e.g., United States)."
    )
    n_results: int = Field(10, description="Maximum number of results to return.")


class SerpApiNewsSearchTool(BaseTool):
    """CrewAI tool for fetching recent news results via SerpAPI's Google News vertical."""

    name: str = "serpapi_news_search"
    description: str = (
        "Searches recent news via SerpAPI. Requires SERPAPI_API_KEY in the environment."
    )
    args_schema: type[BaseModel] = SerpApiNewsSearchArgs

    def _run(
        self,
        search_query: str,
        location: str | None = None,
        n_results: int = 10,
    ) -> str:
        """Search recent news and return the results as a JSON string.

        The returned JSON object always has the keys ``provider``, ``query``,
        ``fetched_at``, ``news`` and ``error``. On failure ``news`` is empty
        and ``error`` describes the problem; the tool never raises into the
        agent loop.
        """
        query = (search_query or "").strip()
        if not query:
            return _error_payload("search_query is required")

        api_key = os.getenv("SERPAPI_API_KEY")
        if not api_key:
            return _error_payload("SERPAPI_API_KEY missing", query=query)

        # Clamp to a sane window; SerpAPI news pages return at most ~20 items.
        n_results = max(1, min(int(n_results or 10), 20))
        params: dict[str, Any] = {
            "engine": "google",
            "q": query,
            "tbm": "nws",  # Google News vertical
            "num": n_results,
            "api_key": api_key,
        }
        if location:
            params["location"] = location

        url = f"https://serpapi.com/search.json?{urlencode(params)}"
        request = Request(url, headers={"User-Agent": "FinanceInsightBot/1.0"})

        try:
            # urlopen raises on HTTP errors and timeouts; both collapse into
            # a single structured error payload for the agent.
            with urlopen(request, timeout=15) as response:
                payload = json.loads(response.read().decode("utf-8"))
        except Exception as exc:  # broad on purpose: tool output must stay JSON
            return _error_payload(f"request failed: {exc}", query=query)

        # SerpAPI reports application-level failures inside the JSON body.
        if isinstance(payload, dict) and payload.get("error"):
            return _error_payload(str(payload.get("error")), query=query)

        news_items = _extract_news_items(payload, n_results)
        return json.dumps(
            {
                "provider": "serpapi",
                "query": query,
                "fetched_at": _utc_timestamp(),
                "news": news_items,
                "error": "",
            },
            ensure_ascii=True,
        )


def _utc_timestamp() -> str:
    """Current UTC time as an ISO-8601 string with a trailing ``Z``.

    Uses the timezone-aware ``datetime.now(timezone.utc)`` instead of the
    deprecated ``datetime.utcnow()`` (deprecated since Python 3.12), then
    drops the tzinfo so the output keeps the original ``...Z`` shape rather
    than ``...+00:00``.
    """
    from datetime import datetime, timezone  # local import: keeps the fix self-contained

    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"


def _extract_news_items(payload: dict[str, Any], n_results: int) -> list[dict[str, str]]:
    """Extract up to ``n_results`` news entries from a SerpAPI payload.

    Prefers the dedicated ``news_results`` section; when it is absent or
    empty, falls back to ``organic_results``, which lack ``date``/``source``
    metadata and therefore get empty strings for those fields.
    """
    items = [
        {
            "title": str(entry.get("title") or ""),
            "link": str(entry.get("link") or ""),
            "snippet": str(entry.get("snippet") or ""),
            "date": str(entry.get("date") or ""),
            "source": str(entry.get("source") or ""),
        }
        for entry in payload.get("news_results", [])[:n_results]
    ]
    if items:
        return items

    return [
        {
            "title": str(entry.get("title") or ""),
            "link": str(entry.get("link") or ""),
            "snippet": str(entry.get("snippet") or ""),
            "date": "",
            "source": "",
        }
        for entry in payload.get("organic_results", [])[:n_results]
    ]


def _error_payload(message: str, query: str = "") -> str:
    """Return a JSON error payload for news searches.

    ``query`` defaults to ``""`` for backward compatibility, but callers
    should pass the original query when they have it so failed lookups stay
    traceable in agent transcripts.
    """
    return json.dumps(
        {
            "provider": "serpapi",
            "query": query,
            "fetched_at": _utc_timestamp(),
            "news": [],
            "error": message,
        },
        ensure_ascii=True,
    )
+ +# dependencies +/node_modules +/.pnp +.pnp.* +.yarn/* +!.yarn/patches +!.yarn/plugins +!.yarn/releases +!.yarn/versions + +# testing +/coverage + +# next.js +/.next/ +/out/ + +# production +/build + +# misc +.DS_Store +*.pem + +# debug +npm-debug.log* +yarn-debug.log* +yarn-error.log* +.pnpm-debug.log* + +# env files (can opt-in for committing if needed) +.env* + +# vercel +.vercel + +# typescript +*.tsbuildinfo +next-env.d.ts diff --git a/samples/finance-insight-agent/src/ui/PROJECT_GUIDE.md b/samples/finance-insight-agent/src/ui/PROJECT_GUIDE.md new file mode 100644 index 000000000..7b55efc0a --- /dev/null +++ b/samples/finance-insight-agent/src/ui/PROJECT_GUIDE.md @@ -0,0 +1,65 @@ +# Finance Insight UI - Implementation & Backend Integration Guide + +This document summarizes the current UI, how the frontend is wired, and how it +connects to the backend for a stateless, scenario-based report flow. + +## What was implemented + +- Next.js (App Router + TypeScript) UI in `src/ui/`. +- Scenario-based report layout with a top bar and "New request" button. +- Dark/light mode toggle with smooth transitions. +- Real typing input (textarea) with Enter-to-send. +- Settings page for API URL + API key (stored locally). + +## Key UI routes + +- `/` main report UI +- `/settings` API authentication + connection test + +## How the frontend talks to your backend + +Frontend integration lives in `src/ui/lib/api.ts`: + +- `GET /health` used by the Settings page test button. +- `GET /config` used by the Settings page to show service status. +- `POST /chat/async` used to submit a report request (polls for status). 
+
+### Request headers
+
+If an API key is set in `/settings`, the UI sends:
+
+- `Authorization: Bearer <api-key>`
+- `X-API-Key: <api-key>`
+
+### Result format expected by the UI
+
+The UI expects:
+
+- `{ result: { report: "text" } }`
+
+### Environment option
+
+The UI defaults to the AMP/Choreo gateway URL:
+
+```
+http://default.localhost:9080/finance-insight
+```
+
+Override this for local development or other deployments via `src/ui/.env.local`:
+
+```
+NEXT_PUBLIC_API_BASE_URL=http://localhost:5000
+```
+
+The Settings page overrides this per browser (stored in `localStorage`).
+
+## CrewAI integration notes
+
+- Run the CrewAI workflow inside `/chat/async`.
+- Return a single `report` string in the final result.
+
+## Security notes
+
+- API keys stored in localStorage are vulnerable to XSS. Use this only for local dev.
+- For production, use server-side auth (sessions/JWT) and do not expose keys.
+- Use HTTPS and restrict CORS origins to your frontend domain.
diff --git a/samples/finance-insight-agent/src/ui/README.md b/samples/finance-insight-agent/src/ui/README.md
new file mode 100644
index 000000000..e215bc4cc
--- /dev/null
+++ b/samples/finance-insight-agent/src/ui/README.md
@@ -0,0 +1,36 @@
+This is a [Next.js](https://nextjs.org) project bootstrapped with [`create-next-app`](https://nextjs.org/docs/app/api-reference/cli/create-next-app).
+
+## Getting Started
+
+First, run the development server:
+
+```bash
+npm run dev
+# or
+yarn dev
+# or
+pnpm dev
+# or
+bun dev
+```
+
+Open [http://localhost:3000](http://localhost:3000) with your browser to see the result.
+
+You can start editing the page by modifying `app/page.tsx`. The page auto-updates as you edit the file.
+
+This project uses [`next/font`](https://nextjs.org/docs/app/building-your-application/optimizing/fonts) to automatically optimize and load [Geist](https://vercel.com/font), a new font family for Vercel. 
+ +## Learn More + +To learn more about Next.js, take a look at the following resources: + +- [Next.js Documentation](https://nextjs.org/docs) - learn about Next.js features and API. +- [Learn Next.js](https://nextjs.org/learn) - an interactive Next.js tutorial. + +You can check out [the Next.js GitHub repository](https://github.com/vercel/next.js) - your feedback and contributions are welcome! + +## Deploy on Vercel + +The easiest way to deploy your Next.js app is to use the [Vercel Platform](https://vercel.com/new?utm_medium=default-template&filter=next.js&utm_source=create-next-app&utm_campaign=create-next-app-readme) from the creators of Next.js. + +Check out our [Next.js deployment documentation](https://nextjs.org/docs/app/building-your-application/deploying) for more details. diff --git a/samples/finance-insight-agent/src/ui/app/components/ChatComposer.tsx b/samples/finance-insight-agent/src/ui/app/components/ChatComposer.tsx new file mode 100644 index 000000000..ce1afe305 --- /dev/null +++ b/samples/finance-insight-agent/src/ui/app/components/ChatComposer.tsx @@ -0,0 +1,93 @@ +"use client"; + +import { useState, type ChangeEvent } from "react"; + +type ChatComposerProps = { + onSend: (message: string) => void; + onStop?: () => void; + disabled?: boolean; + loading?: boolean; +}; + +export default function ChatComposer({ + onSend, + onStop, + disabled, + loading, +}: ChatComposerProps) { + const [value, setValue] = useState(""); + + const submit = () => { + if (disabled) { + return; + } + const trimmed = value.trim(); + if (!trimmed) { + return; + } + + onSend(trimmed); + setValue(""); + }; + + const handleKeyDown = (event: React.KeyboardEvent) => { + if (event.key === "Enter" && !event.shiftKey) { + event.preventDefault(); + submit(); + } + }; + + return ( +
+
{ + event.preventDefault(); + submit(); + }} + > +