From 90c94655aa7bc4ac744d9b212b20158bc6b80cee Mon Sep 17 00:00:00 2001 From: Anna Vlasova Date: Wed, 4 Feb 2026 17:20:56 +0100 Subject: [PATCH 1/3] Sql query: initial commit (no sql params and readonly logic added yet) --- src/databao_context_engine/cli/commands.py | 21 ++- src/databao_context_engine/cli/datasources.py | 13 ++ .../databao_context_engine.py | 23 +++- .../datasources/execute_sql_query.py | 43 ++++++ .../datasources/sql_read_only.py | 23 ++++ .../pluginlib/build_plugin.py | 22 ++++ .../pluginlib/plugin_utils.py | 29 +++- .../pluginlib/sql/__init__.py | 0 .../pluginlib/sql/sql_types.py | 8 ++ .../plugins/databases/base_db_plugin.py | 11 ++ .../plugins/databases/base_introspector.py | 19 +++ tests/datasources/__init__.py | 0 tests/datasources/test_engine_run_sql.py | 124 ++++++++++++++++++ 13 files changed, 330 insertions(+), 6 deletions(-) create mode 100644 src/databao_context_engine/datasources/execute_sql_query.py create mode 100644 src/databao_context_engine/datasources/sql_read_only.py create mode 100644 src/databao_context_engine/pluginlib/sql/__init__.py create mode 100644 src/databao_context_engine/pluginlib/sql/sql_types.py create mode 100644 tests/datasources/__init__.py create mode 100644 tests/datasources/test_engine_run_sql.py diff --git a/src/databao_context_engine/cli/commands.py b/src/databao_context_engine/cli/commands.py index d0e60b80..80f11502 100644 --- a/src/databao_context_engine/cli/commands.py +++ b/src/databao_context_engine/cli/commands.py @@ -16,7 +16,11 @@ init_dce_project, install_ollama_if_needed, ) -from databao_context_engine.cli.datasources import add_datasource_config_cli, check_datasource_connection_cli +from databao_context_engine.cli.datasources import ( + add_datasource_config_cli, + check_datasource_connection_cli, + run_sql_query_cli, +) from databao_context_engine.cli.info import echo_info from databao_context_engine.config.logging import configure_logging from databao_context_engine.mcp.mcp_runner import McpTransport, run_mcp_server @@ -128,6 +132,21 @@ def check_datasource_config(ctx: Context, datasources_config_files: list[str] | check_datasource_connection_cli(ctx.obj["project_dir"], datasource_ids=datasource_ids) +@datasource.command(name="run_sql") +@click.argument( + "datasource-config-file", + type=click.STRING, +) +@click.argument( + "sql", + type=click.STRING, +) +@click.pass_context +def run_sql_query(ctx: Context, datasource_config_file: str, sql: str) -> None: + datasource_id = DatasourceId.from_string_repr(datasource_config_file) + run_sql_query_cli(ctx.obj["project_dir"], datasource_id=datasource_id, sql=sql) + + @dce.command() @click.option( "-m", diff --git a/src/databao_context_engine/cli/datasources.py b/src/databao_context_engine/cli/datasources.py index 3fde9ae5..854fd496 100644 --- a/src/databao_context_engine/cli/datasources.py +++ b/src/databao_context_engine/cli/datasources.py @@ -62,3 +62,16 @@ def _print_check_datasource_connection_results(results: list[CheckDatasourceConn ) else: click.echo("No datasource found") + + +def run_sql_query_cli(project_dir: Path, *, datasource_id: DatasourceId, sql: str) -> None: + databao_project_manager = DatabaoContextProjectManager(project_dir=project_dir) + databao_engine = databao_project_manager.get_engine_for_project() + result = databao_engine.run_sql(datasource_id=datasource_id, sql=sql, params=None) + + # save somewhere or pretty print + click.echo(f"Found {len(result.rows)} rows for query: {sql}") + for row in result.rows: + click.echo(row) + + click.echo(f"Columns are: {result.columns}") diff --git a/src/databao_context_engine/databao_context_engine.py b/src/databao_context_engine/databao_context_engine.py index 01499326..53ecf811 100644 --- a/src/databao_context_engine/databao_context_engine.py +++ b/src/databao_context_engine/databao_context_engine.py @@ -10,8 +10,10 @@ get_datasource_context, get_introspected_datasource_list, ) +from databao_context_engine.datasources.execute_sql_query import run_sql from databao_context_engine.datasources.types import Datasource, DatasourceId from databao_context_engine.pluginlib.build_plugin import DatasourceType +from databao_context_engine.pluginlib.sql.sql_types import SqlExecutionResult from databao_context_engine.project.layout import ProjectLayout, ensure_project_dir from databao_context_engine.retrieve_embeddings import retrieve_embeddings @@ -98,7 +100,7 @@ def search_context( limit: int | None = None, datasource_ids: list[DatasourceId] | None = None, ) -> list[ContextSearchResult]: - """Search in the avaialable context for the closest matches to the given text. + """Search in the available context for the closest matches to the given text. Args: retrieve_text: The text to search for in the contexts. @@ -122,6 +124,19 @@ def search_context( for result in results ] - def run_sql(self, datasource_id: DatasourceId, sql: str, params: list[str]) -> dict[str, Any]: - """Not Implemented yet. This will allow to run a SQL query against a datasource (if the datasource supports it).""" - raise NotImplementedError("Running SQL is not supported yet") + def run_sql( + self, + datasource_id: DatasourceId, + sql: str, + params: list[Any] | None = None, + read_only: bool = True, + ) -> SqlExecutionResult: + """Execute a SQL query against a datasource if it supports it. + + - Optional per plugin: raises NotSupportedError for datasources that don’t support SQL. + - Read-only by default: set read_only=False to permit mutating statements. + + Returns: + Sql execution result containing columns and rows. + """ + return run_sql(self._project_layout, datasource_id, sql, params, read_only) diff --git a/src/databao_context_engine/datasources/execute_sql_query.py b/src/databao_context_engine/datasources/execute_sql_query.py new file mode 100644 index 00000000..cd13ba0b --- /dev/null +++ b/src/databao_context_engine/datasources/execute_sql_query.py @@ -0,0 +1,43 @@ +from typing import Any + +from databao_context_engine.datasources.datasource_discovery import get_datasource_descriptors, prepare_source +from databao_context_engine.datasources.sql_read_only import is_read_only_sql +from databao_context_engine.datasources.types import DatasourceId, PreparedConfig +from databao_context_engine.plugin_loader import DatabaoContextPluginLoader +from databao_context_engine.pluginlib.build_plugin import NotSupportedError, SqlRunnablePlugin +from databao_context_engine.pluginlib.plugin_utils import execute_sql_for_datasource +from databao_context_engine.pluginlib.sql.sql_types import SqlExecutionResult +from databao_context_engine.project.layout import ProjectLayout, logger + + +def run_sql( + project_layout: ProjectLayout, + datasource_id: DatasourceId, + sql: str, + params: list[Any] | None = None, + read_only: bool = True, +) -> SqlExecutionResult: + if read_only and not is_read_only_sql(sql): + # we could use SqlReadOnlyDecision in the future + raise PermissionError("SQL execution is only supported for read-only queries") + + logger.info(f"Running SQL query against datasource {datasource_id}: {sql}") + datasource_descriptor = get_datasource_descriptors(project_layout, [datasource_id])[0] + + prepared = prepare_source(datasource_descriptor) + if not isinstance(prepared, PreparedConfig): + raise NotSupportedError("SQL execution is only supported for config-backed datasources") + + loader = DatabaoContextPluginLoader() + plugin = loader.get_plugin_for_datasource_type(prepared.datasource_type) + if not isinstance(plugin, SqlRunnablePlugin): + raise NotSupportedError("Plugin doesn't support SQL execution") + + return execute_sql_for_datasource( + plugin=plugin, + datasource_type=prepared.datasource_type, + config=prepared.config, + sql=sql, + params=params, + read_only=read_only, + ) diff --git a/src/databao_context_engine/datasources/sql_read_only.py b/src/databao_context_engine/datasources/sql_read_only.py new file mode 100644 index 00000000..bc90e52f --- /dev/null +++ b/src/databao_context_engine/datasources/sql_read_only.py @@ -0,0 +1,23 @@ +from dataclasses import dataclass +from enum import Enum + + +class SqlClass(Enum): + READ_ONLY = "read_only" + WRITE = "write" + UNKNOWN = "unknown" + + +@dataclass +class SqlReadOnlyDecision: + classification: SqlClass + reason: str | None = None + + +def classify_sql(sql: str) -> SqlReadOnlyDecision: + return SqlReadOnlyDecision(classification=SqlClass.READ_ONLY) + + +def is_read_only_sql(sql: str) -> bool: + # todo add sql parsing and allowlist of tokens here + return True diff --git a/src/databao_context_engine/pluginlib/build_plugin.py b/src/databao_context_engine/pluginlib/build_plugin.py index b1139b86..4be9f202 100644 --- a/src/databao_context_engine/pluginlib/build_plugin.py +++ b/src/databao_context_engine/pluginlib/build_plugin.py @@ -2,6 +2,8 @@ from io import BufferedReader from typing import Any, Protocol, runtime_checkable +from databao_context_engine.pluginlib.sql.sql_types import SqlExecutionResult + @dataclass class EmbeddableChunk: @@ -88,3 +90,23 @@ class DatasourceType: """ full_type: str + + +@runtime_checkable +class SqlRunnablePlugin[T](BuildDatasourcePlugin[T], Protocol): + def run_sql( + self, + file_config: T, + sql: str, + params: list[Any] | None = None, + read_only: bool = True, + ) -> SqlExecutionResult: + """Execute SQL against the datasource represented by `file_config`. + + Implementations should honor `read_only=True` by default and refuse mutating statements + unless explicitly allowed. + + Raises: + NotSupportedError: If the plugin doesn't support this method. + """ + raise NotSupportedError("This method is not implemented for this plugin") diff --git a/src/databao_context_engine/pluginlib/plugin_utils.py b/src/databao_context_engine/pluginlib/plugin_utils.py index 27205f91..17883434 100644 --- a/src/databao_context_engine/pluginlib/plugin_utils.py +++ b/src/databao_context_engine/pluginlib/plugin_utils.py @@ -6,7 +6,13 @@ from pydantic import TypeAdapter -from databao_context_engine.pluginlib.build_plugin import BuildDatasourcePlugin, BuildFilePlugin, DatasourceType +from databao_context_engine.pluginlib.build_plugin import ( + BuildDatasourcePlugin, + BuildFilePlugin, + DatasourceType, + SqlRunnablePlugin, +) +from databao_context_engine.pluginlib.sql.sql_types import SqlExecutionResult logger = logging.getLogger(__name__) @@ -66,3 +72,24 @@ def generate_json_schema(plugin: BuildDatasourcePlugin, pretty_print: bool = Tru def format_json_schema_for_output(plugin: BuildDatasourcePlugin, json_schema: str) -> str: return os.linesep.join([f"JSON Schema for plugin {plugin.id}:", json_schema]) + + +def execute_sql_for_datasource( + plugin: BuildDatasourcePlugin, + datasource_type: DatasourceType, + config: Mapping[str, Any], + sql: str, + params: list[Any] | None = None, + read_only: bool = True, +) -> SqlExecutionResult: + if not isinstance(plugin, SqlRunnablePlugin): + raise ValueError("Sql query execution can only be performed on SqlRunnablePlugin") + + validated_config = _validate_datasource_config_file(config, plugin) + + return plugin.run_sql( + file_config=validated_config, + sql=sql, + params=params, + read_only=read_only, + ) diff --git a/src/databao_context_engine/pluginlib/sql/__init__.py b/src/databao_context_engine/pluginlib/sql/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/databao_context_engine/pluginlib/sql/sql_types.py b/src/databao_context_engine/pluginlib/sql/sql_types.py new file mode 100644 index 00000000..c06d149e --- /dev/null +++ b/src/databao_context_engine/pluginlib/sql/sql_types.py @@ -0,0 +1,8 @@ +from dataclasses import dataclass +from typing import Any + + +@dataclass +class SqlExecutionResult: + columns: list[str] + rows: list[tuple[Any, ...]] diff --git a/src/databao_context_engine/plugins/databases/base_db_plugin.py b/src/databao_context_engine/plugins/databases/base_db_plugin.py index 85723857..855412ae 100644 --- a/src/databao_context_engine/plugins/databases/base_db_plugin.py +++ b/src/databao_context_engine/plugins/databases/base_db_plugin.py @@ -9,6 +9,7 @@ EmbeddableChunk, ) from databao_context_engine.pluginlib.config import ConfigPropertyAnnotation +from databao_context_engine.pluginlib.sql.sql_types import SqlExecutionResult from databao_context_engine.plugins.databases.base_introspector import BaseIntrospector from databao_context_engine.plugins.databases.database_chunker import build_database_chunks from databao_context_engine.plugins.databases.introspection_scope import IntrospectionScope @@ -44,3 +45,13 @@ def check_connection(self, full_type: str, datasource_name: str, file_config: T) def divide_context_into_chunks(self, context: Any) -> list[EmbeddableChunk]: return build_database_chunks(context) + + def run_sql( + self, file_config: T, sql: str, params: list[Any] | None = None, read_only: bool = True + ) -> SqlExecutionResult: + return self._introspector.run_sql( + file_config=file_config, + sql=sql, + params=params, + read_only=read_only, + ) diff --git a/src/databao_context_engine/plugins/databases/base_introspector.py b/src/databao_context_engine/plugins/databases/base_introspector.py index 3aa3663c..71994857 100644 --- a/src/databao_context_engine/plugins/databases/base_introspector.py +++ b/src/databao_context_engine/plugins/databases/base_introspector.py @@ -5,6 +5,7 @@ from dataclasses import dataclass from typing import Any, Mapping, Protocol, Sequence, Union +from databao_context_engine.pluginlib.sql.sql_types import SqlExecutionResult from databao_context_engine.plugins.databases.databases_types import ( DatabaseCatalog, DatabaseIntrospectionResult, @@ -133,6 +134,24 @@ def _resolve_pseudo_catalog_name(self, file_config: T) -> str: def _ignored_schemas(self) -> set[str]: return self._IGNORED_SCHEMAS + def run_sql( + self, + file_config: T, + sql: str, + params: list[Any] | None, + read_only: bool, + ) -> SqlExecutionResult: + # for now, we don't have any read-only related logic implemented on the database side + with self._connect(file_config) as connection: + rows_dicts: list[dict] = self._fetchall_dicts(connection, sql, params) + + if not rows_dicts: + return SqlExecutionResult(columns=[], rows=[]) + + columns: list[str] = list(rows_dicts[0].keys()) + rows: list[tuple[Any, ...]] = [tuple(row.get(col) for col in columns) for row in rows_dicts] + return SqlExecutionResult(columns=columns, rows=rows) + @dataclass class SQLQuery: diff --git a/tests/datasources/__init__.py b/tests/datasources/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/datasources/test_engine_run_sql.py b/tests/datasources/test_engine_run_sql.py new file mode 100644 index 00000000..c2fcb3fb --- /dev/null +++ b/tests/datasources/test_engine_run_sql.py @@ -0,0 +1,124 @@ +import pytest + +from databao_context_engine import DatabaoContextEngine, DatasourceId +from databao_context_engine.plugin_loader import DatabaoContextPluginLoader +from databao_context_engine.pluginlib.build_plugin import ( + DatasourceType, + DefaultBuildDatasourcePlugin, + NotSupportedError, +) +from databao_context_engine.pluginlib.sql.sql_types import SqlExecutionResult +from tests.utils.project_creation import given_datasource_config_file + + +class DummySqlPlugin(DefaultBuildDatasourcePlugin): + id = "dummy/dumme_sql" + name = "Dummy SQL Plugin" + + def supported_types(self) -> set[str]: + return {"dummy_sql"} + + def build_context(self, full_type: str, datasource_name: str, file_config: dict) -> dict: + return {"ok": True} + + def run_sql( + self, file_config: dict, sql: str, params: list[object] | None = None, read_only: bool = True + ) -> SqlExecutionResult: + cols = ["a", "b"] + row = (sql, tuple(params) if params is not None else None) + return SqlExecutionResult(columns=cols, rows=[row]) + + +class DummyNonSqlPlugin(DefaultBuildDatasourcePlugin): + id = "dummy/dummy_no_sql" + name = "Dummy Non-SQL Plugin" + + def supported_types(self) -> set[str]: + return {"dummy_no_sql"} + + def build_context(self, full_type: str, datasource_name: str, file_config: dict) -> dict: + return {"ok": True} + + +def _plugins_map_with(*plugins): + mapping: dict[DatasourceType, object] = {} + for p in plugins: + for t in p.supported_types(): + mapping[DatasourceType(full_type=t)] = p + return mapping + + +@pytest.fixture +def patch_plugins(mocker): + def _apply(plugins_map): + mocker.patch( + "databao_context_engine.plugin_loader.load_plugins", + new=lambda: plugins_map, + ) + return DatabaoContextPluginLoader() + + return _apply + + +def test_engine_run_sql_happy_path(project_path, patch_plugins): + plugins_map = _plugins_map_with(DummySqlPlugin()) + patch_plugins(plugins_map) + + engine = DatabaoContextEngine(project_dir=project_path) + given_datasource_config_file( + engine._project_layout, + datasource_name="databases/my_ds", + config_content={"type": "dummy_sql", "name": "my_ds"}, + ) + + ds_id = DatasourceId.from_string_repr("databases/my_ds.yaml") + + res = engine.run_sql(ds_id, "SELECT 1", params=[123], read_only=True) + + assert res.columns == ["a", "b"] + assert res.rows and isinstance(res.rows[0], tuple) + + +def test_engine_run_sql_params_passthrough(project_path, patch_plugins): + received = {} + + class CapturingSqlPlugin(DummySqlPlugin): + def run_sql( + self, file_config: dict, sql: str, params: list[object] | None = None, read_only: bool = True + ) -> SqlExecutionResult: + received["sql"] = sql + received["params"] = params + return super().run_sql(file_config, sql, params, read_only) + + plugins_map = _plugins_map_with(CapturingSqlPlugin()) + patch_plugins(plugins_map) + + engine = DatabaoContextEngine(project_dir=project_path) + given_datasource_config_file( + engine._project_layout, + datasource_name="databases/my_ds2", + config_content={"type": "dummy_sql", "name": "my_ds2"}, + ) + ds_id = DatasourceId.from_string_repr("databases/my_ds2.yaml") + + params = ["x", 42] + engine.run_sql(ds_id, "SELECT $1, $2", params=params) + + assert received["sql"].startswith("SELECT") + assert received["params"] == params + + +def test_engine_run_sql_unsupported_plugin(project_path, patch_plugins): + plugins_map = _plugins_map_with(DummyNonSqlPlugin()) + patch_plugins(plugins_map) + + engine = DatabaoContextEngine(project_dir=project_path) + given_datasource_config_file( + engine._project_layout, + datasource_name="databases/no_sql", + config_content={"type": "dummy_no_sql", "name": "no_sql"}, + ) + ds_id = DatasourceId.from_string_repr("databases/no_sql.yaml") + + with pytest.raises(NotSupportedError): + engine.run_sql(ds_id, "SELECT 1") From 586c7148f239201ec9a5d6a098e5f19f2d414441 Mon Sep 17 00:00:00 2001 From: Anna Vlasova Date: Mon, 9 Feb 2026 16:47:39 +0100 Subject: [PATCH 2/3] Create DatabaoContextEngine instead of DatabaoContextProjectManager --- src/databao_context_engine/cli/datasources.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/databao_context_engine/cli/datasources.py b/src/databao_context_engine/cli/datasources.py index 854fd496..9611ce01 100644 --- a/src/databao_context_engine/cli/datasources.py +++ b/src/databao_context_engine/cli/datasources.py @@ -5,6 +5,7 @@ from databao_context_engine import ( CheckDatasourceConnectionResult, + DatabaoContextEngine, DatabaoContextProjectManager, DatasourceConnectionStatus, DatasourceId, @@ -65,8 +66,7 @@ def _print_check_datasource_connection_results(results: list[CheckDatasourceConn def run_sql_query_cli(project_dir: Path, *, datasource_id: DatasourceId, sql: str) -> None: - databao_project_manager = DatabaoContextProjectManager(project_dir=project_dir) - databao_engine = databao_project_manager.get_engine_for_project() + databao_engine = DatabaoContextEngine(project_dir=project_dir) result = databao_engine.run_sql(datasource_id=datasource_id, sql=sql, params=None) # save somewhere or pretty print From 2f196966052b25b6077ef1f90f65a8ae9855e160 Mon Sep 17 00:00:00 2001 From: Anna Vlasova Date: Mon, 9 Feb 2026 16:53:32 +0100 Subject: [PATCH 3/3] Move directly to the tests folder --- tests/{datasources => }/test_engine_run_sql.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename tests/{datasources => }/test_engine_run_sql.py (100%) diff --git a/tests/datasources/test_engine_run_sql.py b/tests/test_engine_run_sql.py similarity index 100% rename from tests/datasources/test_engine_run_sql.py rename to tests/test_engine_run_sql.py