Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ BigQuery extras (only needed for `build_schema`):
pip install analytics-query-analyzer[bigquery]
```

Redshift extras (only needed for `build_schema`):

```bash
pip install analytics-query-analyzer[redshift]
```

## Support

BigQuery is currently supported and tested.
Expand Down Expand Up @@ -129,7 +135,22 @@ print(schema)
```

- Authentication uses Application Default Credentials (ADC).
- Only BigQuery is supported for `build_schema`.
- When `table` is omitted, it scans all tables in the dataset.
- When both `dataset` and `table` are omitted, it scans all datasets in the project.
- The returned `schema` can be passed directly to `analyze` and `analyze_timespan`.

Fetching from Redshift is also supported:

```python
from analytics_query_analyzer import build_schema
from sqlglot import dialects

schema = build_schema(dialects.Redshift, "my_database", "public", "orders")
print(schema)
```

Redshift authentication supports two modes:

- Set `REDSHIFT_HOST`, `REDSHIFT_USER`, and `REDSHIFT_PASSWORD` (optional `REDSHIFT_PORT`).
- Set `REDSHIFT_CLUSTER_IDENTIFIER`, `REDSHIFT_REGION`, and `REDSHIFT_DB_USER` to use IAM (you can also set `REDSHIFT_HOST`/`REDSHIFT_PORT`).
- Use `AWS_PROFILE` if you want to select a non-default AWS profile.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ dynamic = ["version"]

[project.optional-dependencies]
bigquery = ["google-cloud-bigquery"]
redshift = ["redshift-connector"]

[tool.setuptools]
package-dir = {"" = "src"}
Expand Down
126 changes: 122 additions & 4 deletions src/analytics_query_analyzer/schema_builder.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,26 @@
import os

from sqlglot import dialects

SUPPORTED_DIALECTS = {dialects.BigQuery}
SUPPORTED_DIALECTS = {dialects.BigQuery, dialects.Redshift}


def build_schema(
dialect: str | type[dialects.Dialect],
database: str,
schema: str | None,
table: str | None,
schema: str | None = None,
table: str | None = None,
):
resolved = (
dialects.get_or_raise(dialect.lower()) if isinstance(dialect, str) else dialect
)
if resolved not in SUPPORTED_DIALECTS:
raise ValueError(f"Unsupported dialect for build_schema: {resolved.__name__}")
return _build_schema_bigquery(database, schema, table)
if resolved is dialects.BigQuery:
return _build_schema_bigquery(database, schema, table)
if resolved is dialects.Redshift:
return _build_schema_redshift(database, schema, table)
raise ValueError(f"Unsupported dialect for build_schema: {resolved.__name__}")


def _build_schema_bigquery(project: str, dataset: str | None, table: str | None):
Expand Down Expand Up @@ -69,3 +75,115 @@ def _dataset_to_schema(dataset_id: str) -> dict:
table_ref = dataset_ref.table(table)
table_obj = client.get_table(table_ref)
return {project: {dataset: _table_to_schema(table_obj)}}


def _redshift_connection_kwargs(database: str) -> dict[str, str | int | bool]:
host = os.getenv("REDSHIFT_HOST")
port = os.getenv("REDSHIFT_PORT")

cluster_identifier = os.getenv("REDSHIFT_CLUSTER_IDENTIFIER")
region = os.getenv("REDSHIFT_REGION")
db_user = os.getenv("REDSHIFT_DB_USER")

user = os.getenv("REDSHIFT_USER")
password = os.getenv("REDSHIFT_PASSWORD")

if cluster_identifier and region and db_user:
kwargs = {
"cluster_identifier": cluster_identifier,
"database": database,
"db_user": db_user,
"region": region,
"iam": True,
}
if host:
kwargs["host"] = host
if port:
kwargs["port"] = int(port)
return kwargs

if host and user and password:
return {
"host": host,
"port": int(port) if port else 5439,
"database": database,
"user": user,
"password": password,
}

raise RuntimeError(
"Redshift connection configuration not found. "
"Set REDSHIFT_HOST/REDSHIFT_USER/REDSHIFT_PASSWORD or "
"REDSHIFT_CLUSTER_IDENTIFIER/REDSHIFT_REGION/REDSHIFT_DB_USER."
)


def _build_schema_redshift(database: str, schema: str | None, table: str | None):
try:
import redshift_connector
except ImportError as e:
raise RuntimeError(
"Redshift support is not installed. "
"Install with: pip install analytics-query-analyzer[redshift]"
) from e

connection_kwargs = _redshift_connection_kwargs(database)
conn = redshift_connector.connect(**connection_kwargs)
try:
cursor = conn.cursor()
try:
if schema is None:
cursor.execute(
"""
select schema_name
from information_schema.schemata
where schema_name not in ('information_schema', 'pg_catalog')
and schema_name not like 'pg_%'
order by schema_name
"""
)
schemas = [row[0] for row in cursor.fetchall()]
else:
schemas = [schema]

result: dict[str, dict] = {}
for schema_name in schemas:
if table is None:
cursor.execute(
"""
select table_name
from information_schema.tables
where table_schema = %s
and table_type = 'BASE TABLE'
order by table_name
""",
(schema_name,),
)
tables = [row[0] for row in cursor.fetchall()]
else:
tables = [table]
if not tables:
result[schema_name] = {}
continue

placeholders = ", ".join(["%s"] * len(tables))
cursor.execute(
f"""
select table_name, column_name, data_type, udt_name
from information_schema.columns
where table_schema = %s
and table_name in ({placeholders})
order by table_name, ordinal_position
""",
(schema_name, *tables),
)
table_schema: dict[str, dict[str, str]] = {name: {} for name in tables}
for table_name, column_name, data_type, udt_name in cursor.fetchall():
column_type = data_type or udt_name or "unknown"
table_schema.setdefault(table_name, {})[column_name] = column_type
result[schema_name] = table_schema
finally:
cursor.close()
finally:
conn.close()
return {database: result}