diff --git a/README.md b/README.md index 2c63755..78c5fe8 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,12 @@ BigQuery extras (only needed for `build_schema`): pip install analytics-query-analyzer[bigquery] ``` +Redshift extras (only needed for `build_schema`): + +```bash +pip install analytics-query-analyzer[redshift] +``` + ## Support BigQuery is currently supported and tested. @@ -129,7 +135,22 @@ print(schema) ``` - Authentication uses Application Default Credentials (ADC). -- Only BigQuery is supported for `build_schema`. - When `table` is omitted, it scans all tables in the dataset. - When both `dataset` and `table` are omitted, it scans all datasets in the project. - The returned `schema` can be passed directly to `analyze` and `analyze_timespan`. + +Fetching from Redshift is also supported: + +```python +from analytics_query_analyzer import build_schema +from sqlglot import dialects + +schema = build_schema(dialects.Redshift, "my_database", "public", "orders") +print(schema) +``` + +Redshift authentication supports two modes, plus an optional AWS profile override: + +- Set `REDSHIFT_HOST`, `REDSHIFT_USER`, and `REDSHIFT_PASSWORD` (optional `REDSHIFT_PORT`). + +- Set `REDSHIFT_CLUSTER_IDENTIFIER`, `REDSHIFT_REGION`, and `REDSHIFT_DB_USER` to use IAM (you can also set `REDSHIFT_HOST`/`REDSHIFT_PORT`). +- Use `AWS_PROFILE` if you want to select a non-default AWS profile. 
diff --git a/pyproject.toml b/pyproject.toml index ad87a1b..26a0536 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,6 +18,7 @@ dynamic = ["version"] [project.optional-dependencies] bigquery = ["google-cloud-bigquery"] +redshift = ["redshift-connector"] [tool.setuptools] package-dir = {"" = "src"} diff --git a/src/analytics_query_analyzer/schema_builder.py b/src/analytics_query_analyzer/schema_builder.py index 28c8fba..3bf732e 100644 --- a/src/analytics_query_analyzer/schema_builder.py +++ b/src/analytics_query_analyzer/schema_builder.py @@ -1,20 +1,26 @@ +import os + from sqlglot import dialects -SUPPORTED_DIALECTS = {dialects.BigQuery} +SUPPORTED_DIALECTS = {dialects.BigQuery, dialects.Redshift} def build_schema( dialect: str | type[dialects.Dialect], database: str, - schema: str | None, - table: str | None, + schema: str | None = None, + table: str | None = None, ): resolved = ( dialects.get_or_raise(dialect.lower()) if isinstance(dialect, str) else dialect ) if resolved not in SUPPORTED_DIALECTS: raise ValueError(f"Unsupported dialect for build_schema: {resolved.__name__}") - return _build_schema_bigquery(database, schema, table) + if resolved is dialects.BigQuery: + return _build_schema_bigquery(database, schema, table) + if resolved is dialects.Redshift: + return _build_schema_redshift(database, schema, table) + raise ValueError(f"Unsupported dialect for build_schema: {resolved.__name__}") def _build_schema_bigquery(project: str, dataset: str | None, table: str | None): @@ -69,3 +75,115 @@ def _dataset_to_schema(dataset_id: str) -> dict: table_ref = dataset_ref.table(table) table_obj = client.get_table(table_ref) return {project: {dataset: _table_to_schema(table_obj)}} + + +def _redshift_connection_kwargs(database: str) -> dict[str, str | int | bool]: + host = os.getenv("REDSHIFT_HOST") + port = os.getenv("REDSHIFT_PORT") + + cluster_identifier = os.getenv("REDSHIFT_CLUSTER_IDENTIFIER") + region = os.getenv("REDSHIFT_REGION") + db_user = 
os.getenv("REDSHIFT_DB_USER") + + user = os.getenv("REDSHIFT_USER") + password = os.getenv("REDSHIFT_PASSWORD") + + if cluster_identifier and region and db_user: + kwargs = { + "cluster_identifier": cluster_identifier, + "database": database, + "db_user": db_user, + "region": region, + "iam": True, + } + if host: + kwargs["host"] = host + if port: + kwargs["port"] = int(port) + return kwargs + + if host and user and password: + return { + "host": host, + "port": int(port) if port else 5439, + "database": database, + "user": user, + "password": password, + } + + raise RuntimeError( + "Redshift connection configuration not found. " + "Set REDSHIFT_HOST/REDSHIFT_USER/REDSHIFT_PASSWORD or " + "REDSHIFT_CLUSTER_IDENTIFIER/REDSHIFT_REGION/REDSHIFT_DB_USER." + ) + + +def _build_schema_redshift(database: str, schema: str | None, table: str | None): + try: + import redshift_connector + except ImportError as e: + raise RuntimeError( + "Redshift support is not installed. " + "Install with: pip install analytics-query-analyzer[redshift]" + ) from e + + connection_kwargs = _redshift_connection_kwargs(database) + conn = redshift_connector.connect(**connection_kwargs) + try: + cursor = conn.cursor() + try: + if schema is None: + cursor.execute( + """ + select schema_name + from information_schema.schemata + where schema_name not in ('information_schema', 'pg_catalog') + and schema_name not like 'pg_%' + order by schema_name + """ + ) + schemas = [row[0] for row in cursor.fetchall()] + else: + schemas = [schema] + + result: dict[str, dict] = {} + for schema_name in schemas: + if table is None: + cursor.execute( + """ + select table_name + from information_schema.tables + where table_schema = %s + and table_type = 'BASE TABLE' + order by table_name + """, + (schema_name,), + ) + tables = [row[0] for row in cursor.fetchall()] + else: + tables = [table] + if not tables: + result[schema_name] = {} + continue + + placeholders = ", ".join(["%s"] * len(tables)) + cursor.execute( + f""" + 
select table_name, column_name, data_type, udt_name + from information_schema.columns + where table_schema = %s + and table_name in ({placeholders}) + order by table_name, ordinal_position + """, + (schema_name, *tables), + ) + table_schema: dict[str, dict[str, str]] = {name: {} for name in tables} + for table_name, column_name, data_type, udt_name in cursor.fetchall(): + column_type = data_type or udt_name or "unknown" + table_schema.setdefault(table_name, {})[column_name] = column_type + result[schema_name] = table_schema + finally: + cursor.close() + finally: + conn.close() + return {database: result}