Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion src/databao_context_engine/cli/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@
init_dce_project,
install_ollama_if_needed,
)
from databao_context_engine.cli.datasources import add_datasource_config_cli, check_datasource_connection_cli
from databao_context_engine.cli.datasources import (
add_datasource_config_cli,
check_datasource_connection_cli,
run_sql_query_cli,
)
from databao_context_engine.cli.info import echo_info
from databao_context_engine.config.logging import configure_logging
from databao_context_engine.mcp.mcp_runner import McpTransport, run_mcp_server
Expand Down Expand Up @@ -128,6 +132,21 @@ def check_datasource_config(ctx: Context, datasources_config_files: list[str] |
check_datasource_connection_cli(ctx.obj["project_dir"], datasource_ids=datasource_ids)


@datasource.command(name="run_sql")
@click.argument("datasource-config-file", type=click.STRING)
@click.argument("sql", type=click.STRING)
@click.pass_context
def run_sql_query(ctx: Context, datasource_config_file: str, sql: str) -> None:
    """Run a SQL query against a datasource and print the result.

    DATASOURCE_CONFIG_FILE identifies the datasource to query; it is parsed with DatasourceId.from_string_repr.
    SQL is the statement to execute.
    """
    # Resolve the datasource first so that a malformed identifier is reported before anything else is touched.
    target_datasource = DatasourceId.from_string_repr(datasource_config_file)
    project_dir = ctx.obj["project_dir"]
    run_sql_query_cli(project_dir, datasource_id=target_datasource, sql=sql)


@dce.command()
@click.option(
"-m",
Expand Down
13 changes: 13 additions & 0 deletions src/databao_context_engine/cli/datasources.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from databao_context_engine import (
CheckDatasourceConnectionResult,
DatabaoContextEngine,
DatabaoContextProjectManager,
DatasourceConnectionStatus,
DatasourceId,
Expand Down Expand Up @@ -62,3 +63,15 @@ def _print_check_datasource_connection_results(results: list[CheckDatasourceConn
)
else:
click.echo("No datasource found")


def run_sql_query_cli(project_dir: Path, *, datasource_id: DatasourceId, sql: str) -> None:
    """Run `sql` against a datasource of the project and echo the outcome to the terminal.

    The row count is printed first, then every row on its own line, then the column names.

    Args:
        project_dir: Directory of the project the datasource belongs to.
        datasource_id: The datasource to run the query against.
        sql: The SQL text to execute. No parameters are bound (`params=None`).
    """
    execution = DatabaoContextEngine(project_dir=project_dir).run_sql(
        datasource_id=datasource_id,
        sql=sql,
        params=None,
    )
    fetched_rows = execution.rows

    # save somewhere or pretty print
    click.echo(f"Found {len(fetched_rows)} rows for query: {sql}")
    for record in fetched_rows:
        click.echo(record)

    click.echo(f"Columns are: {execution.columns}")
23 changes: 19 additions & 4 deletions src/databao_context_engine/databao_context_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
get_datasource_context,
get_introspected_datasource_list,
)
from databao_context_engine.datasources.execute_sql_query import run_sql
from databao_context_engine.datasources.types import Datasource, DatasourceId
from databao_context_engine.pluginlib.build_plugin import DatasourceType
from databao_context_engine.pluginlib.sql.sql_types import SqlExecutionResult
from databao_context_engine.project.layout import ProjectLayout, ensure_project_dir
from databao_context_engine.retrieve_embeddings import retrieve_embeddings

Expand Down Expand Up @@ -98,7 +100,7 @@ def search_context(
limit: int | None = None,
datasource_ids: list[DatasourceId] | None = None,
) -> list[ContextSearchResult]:
"""Search in the avaialable context for the closest matches to the given text.
"""Search in the available context for the closest matches to the given text.

Args:
retrieve_text: The text to search for in the contexts.
Expand All @@ -122,6 +124,19 @@ def search_context(
for result in results
]

def run_sql(self, datasource_id: DatasourceId, sql: str, params: list[str]) -> dict[str, Any]:
"""Not Implemented yet. This will allow to run a SQL query against a datasource (if the datasource supports it)."""
raise NotImplementedError("Running SQL is not supported yet")
def run_sql(
    self,
    datasource_id: DatasourceId,
    sql: str,
    params: list[Any] | None = None,
    read_only: bool = True,
) -> SqlExecutionResult:
    """Run a SQL statement against one of the project's datasources.

    SQL support is optional per plugin, and execution is read-only unless the caller opts out.

    Args:
        datasource_id: The datasource to run the statement against.
        sql: The SQL text to execute.
        params: Optional parameters forwarded together with the statement.
        read_only: When True (the default), statements that fail the read-only check are refused.
            Pass False to permit mutating statements.

    Returns:
        The SQL execution result, containing the columns and the rows.

    Raises:
        NotSupportedError: If the datasource doesn't support SQL execution.
        PermissionError: If `read_only` is True and the statement is not considered read-only.
    """
    # The module-level `run_sql` helper does the work; this method only supplies the project layout.
    return run_sql(
        project_layout=self._project_layout,
        datasource_id=datasource_id,
        sql=sql,
        params=params,
        read_only=read_only,
    )
43 changes: 43 additions & 0 deletions src/databao_context_engine/datasources/execute_sql_query.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from typing import Any

from databao_context_engine.datasources.datasource_discovery import get_datasource_descriptors, prepare_source
from databao_context_engine.datasources.sql_read_only import is_read_only_sql
from databao_context_engine.datasources.types import DatasourceId, PreparedConfig
from databao_context_engine.plugin_loader import DatabaoContextPluginLoader
from databao_context_engine.pluginlib.build_plugin import NotSupportedError, SqlRunnablePlugin
from databao_context_engine.pluginlib.plugin_utils import execute_sql_for_datasource
from databao_context_engine.pluginlib.sql.sql_types import SqlExecutionResult
from databao_context_engine.project.layout import ProjectLayout, logger


def run_sql(
    project_layout: ProjectLayout,
    datasource_id: DatasourceId,
    sql: str,
    params: list[Any] | None = None,
    read_only: bool = True,
) -> SqlExecutionResult:
    """Execute `sql` against the datasource identified by `datasource_id`.

    The datasource is looked up in the project, prepared, and handed to the plugin registered for its
    datasource type. Only config-backed datasources whose plugin implements `SqlRunnablePlugin` can run SQL.

    Args:
        project_layout: Layout of the project that owns the datasource.
        datasource_id: The datasource to run the statement against.
        sql: The SQL text to execute.
        params: Optional parameters forwarded to the plugin together with the statement.
        read_only: When True (the default), the statement must pass `is_read_only_sql` before it is executed.

    Returns:
        The execution result produced by the plugin.

    Raises:
        PermissionError: If `read_only` is True and the statement is not considered read-only.
        NotSupportedError: If the datasource is not config-backed or its plugin cannot run SQL.
    """
    if read_only and not is_read_only_sql(sql):
        # we could use SqlReadOnlyDecision in the future
        raise PermissionError("SQL execution is only supported for read-only queries")

    # Lazy %-style arguments: the message is only rendered when INFO logging is actually enabled.
    logger.info("Running SQL query against datasource %s: %s", datasource_id, sql)
    # NOTE(review): indexing [0] assumes a descriptor is always returned for the requested id;
    # confirm that get_datasource_descriptors raises (rather than returning []) for unknown ids.
    datasource_descriptor = get_datasource_descriptors(project_layout, [datasource_id])[0]

    prepared = prepare_source(datasource_descriptor)
    if not isinstance(prepared, PreparedConfig):
        raise NotSupportedError("SQL execution is only supported for config-backed datasources")

    loader = DatabaoContextPluginLoader()
    plugin = loader.get_plugin_for_datasource_type(prepared.datasource_type)
    if not isinstance(plugin, SqlRunnablePlugin):
        raise NotSupportedError("Plugin doesn't support SQL execution")

    return execute_sql_for_datasource(
        plugin=plugin,
        datasource_type=prepared.datasource_type,
        config=prepared.config,
        sql=sql,
        params=params,
        read_only=read_only,
    )
23 changes: 23 additions & 0 deletions src/databao_context_engine/datasources/sql_read_only.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from dataclasses import dataclass
from enum import Enum


class SqlClass(Enum):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The naming sounds unclear. What is meant here? DatabaseAccess? DatabaseRole?

READ_ONLY = "read_only"
WRITE = "write"
UNKNOWN = "unknown"


@dataclass
class SqlReadOnlyDecision:
    """Outcome of classifying a SQL statement for the read-only check."""

    # The class assigned to the statement (read-only, write, or unknown).
    classification: SqlClass
    # Optional free-text explanation; defaults to None.
    # NOTE(review): presumably meant to explain WRITE/UNKNOWN outcomes — confirm once a producer sets it.
    reason: str | None = None


def classify_sql(sql: str) -> SqlReadOnlyDecision:
    """Classify `sql` for the read-only check.

    The decision is derived from `is_read_only_sql`, so both entry points always agree on whether a
    statement may run in read-only mode.

    Args:
        sql: The SQL text to classify.

    Returns:
        A READ_ONLY decision when `is_read_only_sql` accepts the statement. Otherwise an UNKNOWN decision with
        a reason, because a boolean answer cannot tell a mutating statement from an unrecognised one.
    """
    if is_read_only_sql(sql):
        return SqlReadOnlyDecision(classification=SqlClass.READ_ONLY)

    return SqlReadOnlyDecision(
        classification=SqlClass.UNKNOWN,
        reason="Statement was not recognised as read-only",
    )


def is_read_only_sql(sql: str) -> bool:
    """Report whether `sql` is recognisably a single read-only statement.

    This is a conservative keyword scan, not a SQL parser. It fails closed: a statement is accepted only when
    all of the following hold, and everything else is reported as not read-only.

    - It is a single statement (trailing semicolons are ignored, any other semicolon is rejected).
    - Its first keyword is one of SELECT, WITH, SHOW, DESCRIBE, DESC, EXPLAIN, VALUES or TABLE.
    - It contains no data- or schema-modifying keyword anywhere in its text.

    The scan deliberately runs on the raw text, so comments and string literals cannot be used to hide a
    second statement. The price is over-rejection: a modifying keyword inside a string literal or comment, or
    a statement that starts with a comment, is refused. Functions with side effects cannot be detected, so
    this check complements, and does not replace, read-only database credentials.

    Args:
        sql: The SQL text to inspect.

    Returns:
        True if the statement passed every check above, False otherwise.
    """
    import re  # Function-scope import so this block does not rely on the module's import list.

    allowed_leading_keywords = {"select", "with", "show", "describe", "desc", "explain", "values", "table"}
    # `replace` is intentionally absent: it is a common scalar function, and the REPLACE statement is already
    # refused by the leading-keyword check.
    modifying_keywords = (
        "insert",
        "update",
        "delete",
        "merge",
        "upsert",
        "into",
        "create",
        "alter",
        "drop",
        "truncate",
        "grant",
        "revoke",
        "call",
        "exec",
        "execute",
        "copy",
    )

    statement = sql.strip()
    # Trailing semicolons (and the whitespace between them) do not start another statement.
    while statement.endswith(";"):
        statement = statement[:-1].rstrip()

    # Empty input is not a query; any remaining semicolon may separate a second statement.
    if not statement or ";" in statement:
        return False

    # Opening parentheses are skipped so that "(SELECT ...) UNION (SELECT ...)" is recognised.
    leading = re.match(r"[\s(]*([A-Za-z_]+)", statement)
    if leading is None or leading.group(1).lower() not in allowed_leading_keywords:
        return False

    # Whole-word match: `updated_at` or `insert_date` are fine, `UPDATE` / `INSERT` are not.
    modifying_pattern = r"\b(?:" + "|".join(modifying_keywords) + r")\b"
    return re.search(modifying_pattern, statement, flags=re.IGNORECASE) is None
22 changes: 22 additions & 0 deletions src/databao_context_engine/pluginlib/build_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from io import BufferedReader
from typing import Any, Protocol, runtime_checkable

from databao_context_engine.pluginlib.sql.sql_types import SqlExecutionResult


@dataclass
class EmbeddableChunk:
Expand Down Expand Up @@ -88,3 +90,23 @@ class DatasourceType:
"""

full_type: str


@runtime_checkable
class SqlRunnablePlugin[T](BuildDatasourcePlugin[T], Protocol):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels like it should be its own independent protocol rather than extending BuildDatasourcePlugin

And plugins would keep on implementing BuildDatasourcePlugin but they would also implement SqlRunnablePlugin on top

I'm not fully sure how well it would work with the generics since we would want it to be the same for both the BuildDatasourcePlugin and the SqlRunnablePlugin 🤔

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or maybe it would be easier for the run_sql to always be part of BuildDatasourcePlugin but with a default implementation throwing an exception. And instead of casting, we're always calling the method and catching the NotSupported exception

Copy link
Collaborator Author

@annav1asova annav1asova Feb 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried both those ways before and imo the implementation was less clear (there were issues with generics in the first case). but maybe it's worth trying somehow again

def run_sql(
    self,
    file_config: T,
    sql: str,
    params: list[Any] | None = None,
    read_only: bool = True,
) -> SqlExecutionResult:
    """Execute SQL against the datasource represented by `file_config`.

    Implementations should honor `read_only=True` by default and refuse mutating statements
    unless explicitly allowed. This default body always raises, so a plugin has to override it
    to actually support SQL execution.

    Args:
        file_config: The plugin's configuration object for the datasource.
        sql: The SQL text to execute.
        params: Optional parameters to pass along with the statement.
        read_only: Whether only non-mutating statements are allowed. Defaults to True.

    Returns:
        The execution result, containing the columns and the rows.

    Raises:
        NotSupportedError: If the plugin doesn't support this method.
    """
    raise NotSupportedError("This method is not implemented for this plugin")
29 changes: 28 additions & 1 deletion src/databao_context_engine/pluginlib/plugin_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,13 @@

from pydantic import TypeAdapter

from databao_context_engine.pluginlib.build_plugin import BuildDatasourcePlugin, BuildFilePlugin, DatasourceType
from databao_context_engine.pluginlib.build_plugin import (
BuildDatasourcePlugin,
BuildFilePlugin,
DatasourceType,
SqlRunnablePlugin,
)
from databao_context_engine.pluginlib.sql.sql_types import SqlExecutionResult

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -66,3 +72,24 @@ def generate_json_schema(plugin: BuildDatasourcePlugin, pretty_print: bool = Tru

def format_json_schema_for_output(plugin: BuildDatasourcePlugin, json_schema: str) -> str:
return os.linesep.join([f"JSON Schema for plugin {plugin.id}:", json_schema])


def execute_sql_for_datasource(
    plugin: BuildDatasourcePlugin,
    datasource_type: DatasourceType,
    config: Mapping[str, Any],
    sql: str,
    params: list[Any] | None = None,
    read_only: bool = True,
) -> SqlExecutionResult:
    """Validate a datasource config with its plugin and run a SQL statement through that plugin.

    Args:
        plugin: The plugin handling the datasource. It must implement `SqlRunnablePlugin`.
        datasource_type: The type of the datasource. Currently not used by this function.
        config: The raw datasource configuration; it is validated before being handed to the plugin.
        sql: The SQL text to execute.
        params: Optional parameters forwarded to the plugin together with the statement.
        read_only: Forwarded to the plugin unchanged. Defaults to True.

    Returns:
        The execution result returned by the plugin's `run_sql`.

    Raises:
        ValueError: If `plugin` does not implement `SqlRunnablePlugin`.
    """
    if isinstance(plugin, SqlRunnablePlugin):
        # The config is validated only after the plugin is known to be SQL-capable.
        return plugin.run_sql(
            file_config=_validate_datasource_config_file(config, plugin),
            sql=sql,
            params=params,
            read_only=read_only,
        )

    raise ValueError("Sql query execution can only be performed on SqlRunnablePlugin")
Empty file.
8 changes: 8 additions & 0 deletions src/databao_context_engine/pluginlib/sql/sql_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from dataclasses import dataclass
from typing import Any


@dataclass
class SqlExecutionResult:
    """Tabular result of executing a SQL statement."""

    # Column names of the result set.
    columns: list[str]
    # Result rows, one tuple per row.
    # NOTE(review): presumably every tuple is ordered like `columns` — confirm for each producer.
    rows: list[tuple[Any, ...]]
11 changes: 11 additions & 0 deletions src/databao_context_engine/plugins/databases/base_db_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
EmbeddableChunk,
)
from databao_context_engine.pluginlib.config import ConfigPropertyAnnotation
from databao_context_engine.pluginlib.sql.sql_types import SqlExecutionResult
from databao_context_engine.plugins.databases.base_introspector import BaseIntrospector
from databao_context_engine.plugins.databases.database_chunker import build_database_chunks
from databao_context_engine.plugins.databases.introspection_scope import IntrospectionScope
Expand Down Expand Up @@ -44,3 +45,13 @@ def check_connection(self, full_type: str, datasource_name: str, file_config: T)

def divide_context_into_chunks(self, context: Any) -> list[EmbeddableChunk]:
return build_database_chunks(context)

def run_sql(
self, file_config: T, sql: str, params: list[Any] | None = None, read_only: bool = True
) -> SqlExecutionResult:
return self._introspector.run_sql(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: This method doesn't really belong in the introspector

I'm not sure it's worth it to take it out though: you'd need to refactor a bit to have a class executing SQL (including our SQL introspection queries) that is used by the Introspector

file_config=file_config,
sql=sql,
params=params,
read_only=read_only,
)
19 changes: 19 additions & 0 deletions src/databao_context_engine/plugins/databases/base_introspector.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from dataclasses import dataclass
from typing import Any, Mapping, Protocol, Sequence, Union

from databao_context_engine.pluginlib.sql.sql_types import SqlExecutionResult
from databao_context_engine.plugins.databases.databases_types import (
DatabaseCatalog,
DatabaseIntrospectionResult,
Expand Down Expand Up @@ -133,6 +134,24 @@ def _resolve_pseudo_catalog_name(self, file_config: T) -> str:
def _ignored_schemas(self) -> set[str]:
return self._IGNORED_SCHEMAS

def run_sql(
    self,
    file_config: T,
    sql: str,
    params: list[Any] | None,
    read_only: bool,
) -> SqlExecutionResult:
    """Run `sql` on a connection opened from `file_config` and return the result in tabular form.

    Column names are taken from the keys of the first fetched row, so a statement that yields no rows
    produces a result with no columns either.

    Args:
        file_config: Configuration used to open the connection.
        sql: The SQL text to execute.
        params: Optional parameters passed along with the statement.
        read_only: Currently not used here.

    Returns:
        The column names and one tuple per fetched row, with values ordered like the columns.
    """
    # for now, we don't have any read-only related logic implemented on the database side
    with self._connect(file_config) as connection:
        fetched: list[dict] = self._fetchall_dicts(connection, sql, params)

    if fetched:
        header: list[str] = list(fetched[0].keys())
        # `.get` keeps a row that lacks one of the first row's keys from raising; the value becomes None.
        body: list[tuple[Any, ...]] = [tuple(record.get(name) for name in header) for record in fetched]
    else:
        header, body = [], []

    return SqlExecutionResult(columns=header, rows=body)


@dataclass
class SQLQuery:
Expand Down
Empty file added tests/datasources/__init__.py
Empty file.
Loading