diff --git a/README.md b/README.md index bcfe19b..dfc5b2c 100644 --- a/README.md +++ b/README.md @@ -236,7 +236,10 @@ python -m dash # CLI mode |----------|----------|-------------| | `OPENAI_API_KEY` | Yes | OpenAI API key | | `EXA_API_KEY` | No | Web search for external knowledge | -| `DB_*` | No | Database config (defaults to localhost) | +| `DB_*` | No | Internal Dash DB config for knowledge/learnings/AgentOS | +| `ANALYTICS_DB_*` | No | Optional analytics DB URLs (`ANALYTICS_DB_<NAME>`) and descriptions (`ANALYTICS_DB_<NAME>_DESC`) | + +If no `ANALYTICS_DB_*` variables are set, Dash keeps original single-database behavior and uses the internal DB as analytics source. ## Further Reading diff --git a/compose.yaml b/compose.yaml index 4188189..59d1d63 100644 --- a/compose.yaml +++ b/compose.yaml @@ -1,4 +1,5 @@ services: + # Internal database used by Dash for knowledge, learnings, and AgentOS state. dash-db: image: agnohq/pgvector:18 container_name: dash-db @@ -31,11 +32,16 @@ services: DATA_DIR: /data RUNTIME_ENV: dev AGNO_DEBUG: "True" + # Internal Dash DB (knowledge, learnings, AgentOS) DB_HOST: dash-db DB_PORT: 5432 DB_USER: ${DB_USER:-ai} DB_PASS: ${DB_PASS:-ai} DB_DATABASE: ${DB_DATABASE:-ai} + # Optional analytics DBs (read-only by credentials): + # ANALYTICS_DB_MAIN=postgresql+psycopg://user:pass@host:5432/dbname + # ANALYTICS_DB_MAIN_DESC=F1 sample data + # If ANALYTICS_DB_* is not set, Dash uses the internal DB as the single analytics source. 
WAIT_FOR_DB: "True" PRINT_ENV_ON_LOAD: "True" OPENAI_API_KEY: ${OPENAI_API_KEY} diff --git a/dash/agents.py b/dash/agents.py index 76eec7a..0be1d36 100644 --- a/dash/agents.py +++ b/dash/agents.py @@ -20,13 +20,22 @@ from agno.models.openai import OpenAIResponses from agno.tools.mcp import MCPTools from agno.tools.reasoning import ReasoningTools -from agno.tools.sql import SQLTools from agno.vectordb.pgvector import PgVector, SearchType from dash.context.business_rules import BUSINESS_CONTEXT from dash.context.semantic_model import SEMANTIC_MODEL_STR -from dash.tools import create_introspect_schema_tool, create_save_validated_query_tool -from db import db_url, get_postgres_db +from dash.tools import ( + create_analytics_sql_tools, + create_introspect_schema_tool, + create_save_validated_query_tool, +) +from db import ( + db_url, + get_analytics_descriptions, + get_analytics_registry, + get_postgres_db, + has_explicit_analytics_dbs, +) # ============================================================================ # Database & Knowledge @@ -62,11 +71,16 @@ # Tools # ============================================================================ +analytics_registry = get_analytics_registry() +analytics_descriptions = get_analytics_descriptions() +explicit_analytics_dbs_configured = has_explicit_analytics_dbs() + save_validated_query = create_save_validated_query_tool(dash_knowledge) -introspect_schema = create_introspect_schema_tool(db_url) +analytics_tools = create_analytics_sql_tools(analytics_registry) +introspect_schema = create_introspect_schema_tool(analytics_registry) base_tools: list = [ - SQLTools(db_url=db_url), + *analytics_tools, save_validated_query, introspect_schema, MCPTools(url=f"https://mcp.exa.ai/mcp?exaApiKey={getenv('EXA_API_KEY', '')}&tools=web_search_exa"), @@ -76,6 +90,37 @@ # Instructions # ============================================================================ +def _build_databases_section( + registry: dict[str, str], descriptions: 
dict[str, str] +) -> str: + """Build explicit analytics database instructions for the prompt.""" + lines = ["## AVAILABLE DATABASES", ""] + lines.append("Use these analytics databases for SQL queries:") + lines.append("") + + for name in sorted(registry): + description = descriptions.get(name, "") + suffix = f": {description}" if description else "" + lines.append(f"- **{name}**{suffix}") + + lines.append("") + if len(registry) > 1: + lines.append( + "- Always pass `database` to list_tables, describe_table, " + "run_sql_query, and introspect_schema." + ) + lines.append("- If the request is ambiguous, ask which database to use.") + + return "\n".join(lines) + + +DATABASES_SECTION = ( + _build_databases_section(analytics_registry, analytics_descriptions) + if explicit_analytics_dbs_configured + else "" +) +DATABASES_SECTION_BLOCK = f"{DATABASES_SECTION}\n\n---\n\n" if DATABASES_SECTION else "" + INSTRUCTIONS = f"""\ You are Dash, a self-learning data agent that provides **insights**, not just query results. 
@@ -151,7 +196,7 @@ --- -## SEMANTIC MODEL +{DATABASES_SECTION_BLOCK}## SEMANTIC MODEL {SEMANTIC_MODEL_STR} --- diff --git a/dash/context/business_rules.py b/dash/context/business_rules.py index d97205f..b0407bd 100644 --- a/dash/context/business_rules.py +++ b/dash/context/business_rules.py @@ -42,6 +42,8 @@ def build_business_context(business_dir: Path | None = None) -> str: lines.append("## METRICS\n") for m in business["metrics"]: lines.append(f"**{m.get('name', 'Unknown')}**: {m.get('definition', '')}") + if m.get("database"): + lines.append(f" - Database: `{m['database']}`") if m.get("table"): lines.append(f" - Table: `{m['table']}`") if m.get("calculation"): diff --git a/dash/context/semantic_model.py b/dash/context/semantic_model.py index ab7b41b..c16f216 100644 --- a/dash/context/semantic_model.py +++ b/dash/context/semantic_model.py @@ -30,6 +30,7 @@ def load_table_metadata(tables_dir: Path | None = None) -> list[dict[str, Any]]: "description": table.get("table_description", ""), "use_cases": table.get("use_cases", []), "data_quality_notes": table.get("data_quality_notes", [])[:MAX_QUALITY_NOTES], + "database": table.get("database"), } ) except (json.JSONDecodeError, KeyError, OSError) as e: @@ -45,9 +46,32 @@ def build_semantic_model(tables_dir: Path | None = None) -> dict[str, Any]: def format_semantic_model(model: dict[str, Any]) -> str: """Format semantic model for system prompt.""" + tables = model.get("tables", []) + if not tables: + return "" + + grouped: dict[str | None, list[dict[str, Any]]] = {} + for table in tables: + grouped.setdefault(table.get("database"), []).append(table) + lines: list[str] = [] - for table in model.get("tables", []): + for database in sorted(key for key in grouped if key is not None): + lines.append(f"### Database: **{database}**") + lines.append("") + for table in grouped[database]: + lines.append(f"#### {table['table_name']}") + if table.get("description"): + lines.append(table["description"]) + if 
table.get("use_cases"): + lines.append(f"**Use cases:** {', '.join(table['use_cases'])}") + if table.get("data_quality_notes"): + lines.append("**Data quality:**") + for note in table["data_quality_notes"]: + lines.append(f" - {note}") + lines.append("") + + for table in grouped.get(None, []): lines.append(f"### {table['table_name']}") if table.get("description"): lines.append(table["description"]) diff --git a/dash/knowledge/business/metrics.json b/dash/knowledge/business/metrics.json index e761c05..6cb6389 100644 --- a/dash/knowledge/business/metrics.json +++ b/dash/knowledge/business/metrics.json @@ -1,48 +1,56 @@ { "metrics": [ { + "database": "main", "name": "Race Win", "definition": "A driver finishing in first position in a race", "table": "race_wins", "calculation": "COUNT(*) from race_wins grouped by driver name" }, { + "database": "main", "name": "World Championship", "definition": "A driver finishing the season in first position in the drivers championship", "table": "drivers_championship", "calculation": "COUNT(*) from drivers_championship WHERE position = '1' (TEXT comparison)" }, { + "database": "main", "name": "Constructors Championship", "definition": "A team finishing the season in first position in the constructors championship", "table": "constructors_championship", "calculation": "COUNT(*) from constructors_championship WHERE position = 1 (INTEGER comparison)" }, { + "database": "main", "name": "Podium Finish", "definition": "A driver finishing in positions 1, 2, or 3 in a race", "table": "race_results", "calculation": "COUNT(*) from race_results WHERE position IN ('1', '2', '3')" }, { + "database": "main", "name": "Fastest Lap", "definition": "Recording the fastest lap time during a race", "table": "fastest_laps", "calculation": "COUNT(*) from fastest_laps grouped by driver name" }, { + "database": "main", "name": "DNF (Did Not Finish)", "definition": "A driver who retired from a race before completion", "table": "race_results", "calculation": 
"COUNT(*) from race_results WHERE position = 'Ret'" }, { + "database": "main", "name": "Points Finish", "definition": "A driver finishing in a points-scoring position", "table": "race_results", "calculation": "COUNT(*) from race_results WHERE points > 0" }, { + "database": "main", "name": "Championship Points", "definition": "Total points accumulated over a season", "table": "drivers_championship or constructors_championship", diff --git a/dash/knowledge/tables/constructors_championship.json b/dash/knowledge/tables/constructors_championship.json index ee6e0c9..b4b77ba 100644 --- a/dash/knowledge/tables/constructors_championship.json +++ b/dash/knowledge/tables/constructors_championship.json @@ -1,4 +1,5 @@ { + "database": "main", "table_name": "constructors_championship", "table_description": "Contains data for the constructor's championship from 1958 to 2020, capturing championship positions from when it was introduced.", "use_cases": [ diff --git a/dash/knowledge/tables/drivers_championship.json b/dash/knowledge/tables/drivers_championship.json index ffd3b9c..04f4604 100644 --- a/dash/knowledge/tables/drivers_championship.json +++ b/dash/knowledge/tables/drivers_championship.json @@ -1,4 +1,5 @@ { + "database": "main", "table_name": "drivers_championship", "table_description": "Contains data for driver's championship standings from 1950-2020, detailing driver positions, teams, and points.", "use_cases": [ diff --git a/dash/knowledge/tables/fastest_laps.json b/dash/knowledge/tables/fastest_laps.json index 3410021..51f0362 100644 --- a/dash/knowledge/tables/fastest_laps.json +++ b/dash/knowledge/tables/fastest_laps.json @@ -1,4 +1,5 @@ { + "database": "main", "table_name": "fastest_laps", "table_description": "Contains data for the fastest laps recorded in races from 1950-2020, including driver and team details.", "use_cases": [ diff --git a/dash/knowledge/tables/race_results.json b/dash/knowledge/tables/race_results.json index 9deca26..50746e7 100644 --- 
a/dash/knowledge/tables/race_results.json +++ b/dash/knowledge/tables/race_results.json @@ -1,4 +1,5 @@ { + "database": "main", "table_name": "race_results", "table_description": "Holds comprehensive race data for each Formula 1 race from 1950-2020, including positions, drivers, teams, and points.", "use_cases": [ diff --git a/dash/knowledge/tables/race_wins.json b/dash/knowledge/tables/race_wins.json index 5ff901a..22ca931 100644 --- a/dash/knowledge/tables/race_wins.json +++ b/dash/knowledge/tables/race_wins.json @@ -1,4 +1,5 @@ { + "database": "main", "table_name": "race_wins", "table_description": "Documents race win data from 1950-2020, detailing venue, winner, team, and race duration.", "use_cases": [ diff --git a/dash/scripts/load_data.py b/dash/scripts/load_data.py index 041db46..3e282fa 100644 --- a/dash/scripts/load_data.py +++ b/dash/scripts/load_data.py @@ -1,16 +1,19 @@ """ -Load F1 Data - Downloads F1 data (1950-2020) and loads into PostgreSQL. +Load F1 Data - Downloads F1 data (1950-2020) and loads into an analytics database. -Usage: python -m dash.scripts.load_data +Usage: + python -m dash.scripts.load_data + python -m dash.scripts.load_data --database main """ +import argparse from io import StringIO import httpx import pandas as pd from sqlalchemy import create_engine -from db import db_url +from db import get_analytics_registry S3_URI = "https://agno-public.s3.amazonaws.com/f1" @@ -23,7 +26,43 @@ } if __name__ == "__main__": - engine = create_engine(db_url) + parser = argparse.ArgumentParser( + description="Load F1 sample data into an analytics database" + ) + parser.add_argument( + "--database", + type=str, + default=None, + help=( + "Logical analytics DB name. Required when multiple analytics databases " + "are configured." 
+ ), + ) + args = parser.parse_args() + + registry = get_analytics_registry() + is_single_db = len(registry) == 1 + + if is_single_db: + db_name = next(iter(registry)) + target_url = registry[db_name] + print(f"Target database: {db_name} (single database mode)\n") + else: + if not args.database: + print("Error: Multiple analytics databases configured.") + print("Pass --database with one of: " + ", ".join(sorted(registry))) + raise SystemExit(1) + + db_name = args.database.lower() + if db_name not in registry: + print(f"Error: Unknown database '{args.database}'.") + print("Available: " + ", ".join(sorted(registry))) + raise SystemExit(1) + + target_url = registry[db_name] + print(f"Target database: {db_name}\n") + + engine = create_engine(target_url) total = 0 for table, url in TABLES.items(): diff --git a/dash/tools/__init__.py b/dash/tools/__init__.py index 670cebd..c0c0ac4 100644 --- a/dash/tools/__init__.py +++ b/dash/tools/__init__.py @@ -2,8 +2,10 @@ from dash.tools.introspect import create_introspect_schema_tool from dash.tools.save_query import create_save_validated_query_tool +from dash.tools.sql import create_analytics_sql_tools __all__ = [ + "create_analytics_sql_tools", "create_introspect_schema_tool", "create_save_validated_query_tool", ] diff --git a/dash/tools/introspect.py b/dash/tools/introspect.py index 00cea46..10d745b 100644 --- a/dash/tools/introspect.py +++ b/dash/tools/introspect.py @@ -3,18 +3,46 @@ from agno.tools import tool from agno.utils.log import logger from sqlalchemy import create_engine, inspect, text +from sqlalchemy.engine import Engine from sqlalchemy.exc import DatabaseError, OperationalError -def create_introspect_schema_tool(db_url: str): - """Create introspect_schema tool with database connection.""" - engine = create_engine(db_url) +def create_introspect_schema_tool(registry: dict[str, str]): + """Create introspect_schema tool routed across analytics databases.""" + engines = {name: create_engine(url) for name, url in 
registry.items()} + db_names = sorted(engines) + is_single_db = len(engines) == 1 + default_db_name = db_names[0] if is_single_db else None + + def _resolve(database: str | None) -> tuple[str, Engine]: + if is_single_db and default_db_name is not None: + return default_db_name, engines[default_db_name] + + if not database: + available = ", ".join(db_names) + raise ValueError( + "Multiple analytics databases configured. " + f"Pass `database`. Available: {available}" + ) + + name = database.strip().lower() + if not name: + available = ", ".join(db_names) + raise ValueError( + "Multiple analytics databases configured. " + f"Pass `database`. Available: {available}" + ) + if name not in engines: + available = ", ".join(db_names) + raise ValueError(f"Unknown database '{name}'. Available: {available}") + return name, engines[name] @tool def introspect_schema( table_name: str | None = None, include_sample_data: bool = False, sample_limit: int = 5, + database: str | None = None, ) -> str: """Inspect database schema at runtime. @@ -22,17 +50,25 @@ def introspect_schema( table_name: Table to inspect. If None, lists all tables. include_sample_data: Include sample rows. sample_limit: Number of sample rows. + database: Logical database name. Optional in single-DB mode. """ try: + db_name, engine = _resolve(database) + include_db_in_output = not is_single_db insp = inspect(engine) if table_name is None: - # List all tables tables = insp.get_table_names() if not tables: + if include_db_in_output: + return f"No tables found in '{db_name}'." return "No tables found." - lines = ["## Tables", ""] + if include_db_in_output: + lines = [f"## Tables ({db_name})", ""] + else: + lines = ["## Tables", ""] + for t in sorted(tables): try: with engine.connect() as conn: @@ -42,14 +78,21 @@ def introspect_schema( lines.append(f"- **{t}**") return "\n".join(lines) - # Inspect specific table tables = insp.get_table_names() if table_name not in tables: - return f"Table '{table_name}' not found. 
Available: {', '.join(sorted(tables))}" + available = ", ".join(sorted(tables)) + if include_db_in_output: + return ( + f"Table '{table_name}' not found in '{db_name}'. " + f"Available: {available}" + ) + return f"Table '{table_name}' not found. Available: {available}" - lines = [f"## {table_name}", ""] + if include_db_in_output: + lines = [f"## {table_name} ({db_name})", ""] + else: + lines = [f"## {table_name}", ""] - # Columns cols = insp.get_columns(table_name) if cols: lines.extend(["### Columns", "", "| Column | Type | Nullable |", "| --- | --- | --- |"]) @@ -58,13 +101,11 @@ def introspect_schema( lines.append(f"| {c['name']} | {c['type']} | {nullable} |") lines.append("") - # Primary key pk = insp.get_pk_constraint(table_name) if pk and pk.get("constrained_columns"): lines.append(f"**Primary Key:** {', '.join(pk['constrained_columns'])}") lines.append("") - # Sample data if include_sample_data: lines.append("### Sample") try: @@ -85,6 +126,8 @@ def introspect_schema( return "\n".join(lines) + except ValueError as e: + return str(e) except OperationalError as e: logger.error(f"Database connection failed: {e}") return f"Error: Database connection failed - {e}" diff --git a/dash/tools/sql.py b/dash/tools/sql.py new file mode 100644 index 0000000..995d05d --- /dev/null +++ b/dash/tools/sql.py @@ -0,0 +1,69 @@ +"""Analytics SQL tools with logical database routing.""" + +from agno.tools import tool +from agno.tools.sql import SQLTools + + +def create_analytics_sql_tools(registry: dict[str, str]) -> list: + """Create routed SQL tools for one or more analytics databases.""" + sql_tools_by_db = {name: SQLTools(db_url=url) for name, url in registry.items()} + db_names = sorted(sql_tools_by_db) + is_single_db = len(sql_tools_by_db) == 1 + default_db_name = db_names[0] if is_single_db else None + + def _resolve_database_name(database: str | None) -> str: + if is_single_db and default_db_name is not None: + return default_db_name + + if not database: + available = ", 
".join(db_names) + raise ValueError( + "Multiple analytics databases configured. " + f"Pass `database`. Available: {available}" + ) + + name = database.strip().lower() + if not name: + available = ", ".join(db_names) + raise ValueError( + "Multiple analytics databases configured. " + f"Pass `database`. Available: {available}" + ) + if name not in sql_tools_by_db: + available = ", ".join(db_names) + raise ValueError(f"Unknown database '{name}'. Available: {available}") + return name + + @tool + def list_tables(database: str | None = None) -> str: + """List all tables in an analytics database. + + Args: + database: Logical database name. Optional in single-DB mode. + """ + target_db = _resolve_database_name(database) + return sql_tools_by_db[target_db].list_tables() + + @tool + def describe_table(table_name: str, database: str | None = None) -> str: + """Describe a table in an analytics database. + + Args: + table_name: Name of table to describe. + database: Logical database name. Optional in single-DB mode. + """ + target_db = _resolve_database_name(database) + return sql_tools_by_db[target_db].describe_table(table_name) + + @tool + def run_sql_query(query: str, database: str | None = None) -> str: + """Run a SQL query in an analytics database. + + Args: + query: SQL query to execute. + database: Logical database name. Optional in single-DB mode. + """ + target_db = _resolve_database_name(database) + return sql_tools_by_db[target_db].run_sql_query(query) + + return [list_tables, describe_table, run_sql_query] diff --git a/db/__init__.py b/db/__init__.py index c016278..5e9496b 100644 --- a/db/__init__.py +++ b/db/__init__.py @@ -5,10 +5,20 @@ Database connection utilities. 
""" +from db.config import ( + get_analytics_descriptions, + get_analytics_registry, + get_internal_db_url, + has_explicit_analytics_dbs, +) from db.session import get_postgres_db from db.url import db_url __all__ = [ "db_url", + "get_analytics_descriptions", + "get_analytics_registry", + "get_internal_db_url", "get_postgres_db", + "has_explicit_analytics_dbs", ] diff --git a/db/config.py b/db/config.py new file mode 100644 index 0000000..47fdc4d --- /dev/null +++ b/db/config.py @@ -0,0 +1,72 @@ +"""Database configuration: internal DB and optional analytics registries.""" + +import os + +from db.url import db_url as internal_db_url + +ANALYTICS_PREFIX = "ANALYTICS_DB_" +ANALYTICS_DESC_SUFFIX = "_DESC" +IMPLICIT_DEFAULT_NAME = "default" + + +def _normalize_name(name: str) -> str: + return name.strip().lower() + + +def _iter_explicit_analytics_entries() -> list[tuple[str, str]]: + """Return explicit ANALYTICS_DB_= entries from env vars.""" + entries: list[tuple[str, str]] = [] + for key, value in os.environ.items(): + if not key.startswith(ANALYTICS_PREFIX) or not value: + continue + if key.endswith(ANALYTICS_DESC_SUFFIX): + continue + + raw_name = key[len(ANALYTICS_PREFIX) :] + name = _normalize_name(raw_name) + if not name: + continue + entries.append((name, value)) + return entries + + +def get_internal_db_url() -> str: + """Internal DB URL used by Dash for knowledge/learnings/AgentOS.""" + return internal_db_url + + +def has_explicit_analytics_dbs() -> bool: + """True when at least one ANALYTICS_DB_ URL is configured.""" + return bool(_iter_explicit_analytics_entries()) + + +def get_analytics_registry() -> dict[str, str]: + """Return analytics DB registry by logical name. + + Behavior: + - If explicit ANALYTICS_DB_ variables exist, return those. + - If none exist, fall back to a single implicit entry to internal DB. 
+ """ + registry = dict(_iter_explicit_analytics_entries()) + if registry: + return registry + return {IMPLICIT_DEFAULT_NAME: internal_db_url} + + +def get_analytics_descriptions() -> dict[str, str]: + """Return optional ANALYTICS_DB__DESC descriptions by name.""" + descriptions: dict[str, str] = {} + for key, value in os.environ.items(): + if ( + not key.startswith(ANALYTICS_PREFIX) + or not key.endswith(ANALYTICS_DESC_SUFFIX) + or not value + ): + continue + + raw_name = key[len(ANALYTICS_PREFIX) : -len(ANALYTICS_DESC_SUFFIX)] + name = _normalize_name(raw_name) + if not name: + continue + descriptions[name] = value + return descriptions diff --git a/example.env b/example.env index bd84ef8..81b1ee8 100644 --- a/example.env +++ b/example.env @@ -8,7 +8,12 @@ OPENAI_API_KEY=sk-*** # Exa API key for web research # EXA_API_KEY=*** -# Database (defaults work out of the box) +# Internal Dash database (knowledge, learnings, AgentOS). Defaults work out of the box. # DB_USER=ai # DB_PASS=ai -# DB_DATABASE=ai \ No newline at end of file +# DB_DATABASE=ai + +# Optional analytics database(s), read-only. +# If ANALYTICS_DB_* is not set, Dash falls back to the internal DB as a single analytics source. +# ANALYTICS_DB_MAIN=postgresql+psycopg://user:pass@host:5432/dbname +# ANALYTICS_DB_MAIN_DESC=F1 sample data