Commit 5084879

eakmanrq and crericha authored
Add BigQuery auth support for native and dbt (#648)
* add bq auth support for native and dbt
* fix literal
* support both dataset and schema
* set default for dataset in example
* [dbt] Only load the target config within the profile.
* [dbt] Only render the profile target's fields

Co-authored-by: Chris Rericha <tristipher@gmail.com>
1 parent abd3747 commit 5084879

File tree: 11 files changed, +268 additions, −131 deletions

docs/guides/connections.md

Lines changed: 5 additions & 82 deletions
````diff
@@ -38,85 +38,8 @@ sqlmesh --test-connection local_db plan
 
 ## Supported engines
 
-### BigQuery
-
-TBD
-
-See the [engine configuration reference](../integrations/engines.md#bigquery---localbuilt-in-scheduler) for more details.
-
-### Databricks
-
-A Databricks connection should be configured as follows:
-
-```yaml linenums="1"
-connections:
-  my_databricks_connection:
-    type: databricks
-    server_hostname: [server hostname]
-    access_token: [access token]
-    http_headers: [optional, list of key-value pairs]
-    session_configuration: [optional, key-value mapping]
-    concurrent_tasks: [optional, should be greater than 0]
-```
-
-See the [engine configuration reference](../integrations/engines.md#databricks---localbuilt-in-scheduler) for more details.
-
-### DuckDB
-
-A DuckDB connection should be configured as follows:
-
-```yaml linenums="1"
-connections:
-  my_duckdb_connection:
-    type: duckdb
-    database: [optional, path to the database file]
-```
-
-See the [engine configuration reference](../reference/configuration.md#duckdb) for more details.
-
-### Redshift
-
-A Redshift connection should be configured as follows:
-
-```yaml linenums="1"
-connections:
-  my_redshift_connection:
-    type: redshift
-    user: [optional, username]
-    password: [optional, password]
-    database: [optional, database]
-    host: [optional, hostname]
-    port: [optional, port]
-    ssl: [optional, boolean flag which determines whether SSL is enabled]
-    sslmode: [optional, the security of the connection to the Amazon Redshift cluster]
-    timeout: [optional, connection timeout]
-    tcp_keepalive: [optional, boolean flag which determines whether to use TCP Keepalives]
-    application_name: [optional, the application name]
-    preferred_role: [optional, the IAM role]
-    principal_arn: [optional, the ARN for the IAM entity (user or role)]
-    credentials_provider: [optional, the class name of the IdP that will be used for authentication]
-    region: [optional, the AWS region]
-    cluster_identifier: [optional, the cluster identifier]
-    iam: [optional, boolean flag which determines whether the IAM authentication should be used]
-    is_serverless: [optional, whether the Redshift endpoint is serverless or provisional]
-    serverless_acct_id: [optional, serverless account ID]
-    serverless_work_group: [optional, serverless work group]
-    concurrent_tasks: [optional, should be greater than 0]
-```
-
-See the [engine configuration reference](../integrations/engines.md#redshift---localbuilt-in-scheduler) for more details.
-
-### Snowflake
-
-A Snowflake connection should be configured as follows:
-
-```yaml linenums="1"
-connections:
-  my_snowflake_connection:
-    type: snowflake
-    user: [required if not using Okta, username]
-    password: [required if using password]
-    authenticator: [required if using externalbrowser]
-    account: [required, account ID]
-    warehouse: [optional, warehouse name]
-    database: [optional, database name]
-    role: [optional, user role]
-    concurrent_tasks: [optional, should be greater than 0]
-```
-
-See the [engine configuration reference](../integrations/engines.md#snowflake---localbuilt-in-scheduler) for more details.
+* [BigQuery](../integrations/engines.md#bigquery---localbuilt-in-scheduler)
+* [Databricks](../integrations/engines.md#databricks---localbuilt-in-scheduler)
+* [Redshift](../integrations/engines.md#redshift---localbuilt-in-scheduler)
+* [Snowflake](../integrations/engines.md#snowflake---localbuilt-in-scheduler)
+* [Spark](../integrations/engines.md#spark---localbuilt-in-scheduler)
````

docs/integrations/engines.md

Lines changed: 13 additions & 2 deletions
```diff
@@ -2,8 +2,19 @@
 
 # BigQuery
 
 ## BigQuery - Local/Built-in Scheduler
-Currently relies on local configuration of `gcloud` CLI to be authenticated in order to connect.
-[Github issue to expand supported methods](https://github.com/TobikoData/sqlmesh/issues/270).
+| Option          | Description                                                                                      |  Type  | Required |
+|-----------------|--------------------------------------------------------------------------------------------------|:------:|:--------:|
+| `method`        | Connection method. Can be `oauth`, `oauth-secrets`, `service-account`, or `service-account-json` | string |    N     |
+| `project`       | The name of the GCP project                                                                      | string |    N     |
+| `location`      | The location of the datasets (can be regional or multi-regional)                                 | string |    Y     |
+| `keyfile`       | Path to the keyfile to be used with the service-account method                                   | string |    Y     |
+| `keyfile_json`  | Keyfile information provided inline (not recommended)                                            |  dict  |    N     |
+| `token`         | OAuth secrets auth token                                                                         | string |    N     |
+| `refresh_token` | OAuth secrets auth refresh token                                                                 | string |    N     |
+| `client_id`     | OAuth secrets auth client ID                                                                     | string |    N     |
+| `client_secret` | OAuth secrets auth client secret                                                                 | string |    N     |
+| `token_uri`     | OAuth secrets auth token URI                                                                     | string |    N     |
+| `scopes`        | OAuth secrets auth scopes                                                                        |  list  |    N     |
 
 ## BigQuery - Airflow Scheduler
 **Engine Name:** `bigquery`
```
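Assembled from the option table above, a `service-account` connection entry might look like the following sketch. The connection name, project, keyfile path, and location are illustrative values, not taken from the commit:

```yaml
connections:
  my_bigquery_connection:
    type: bigquery
    method: service-account
    project: my-gcp-project
    location: US
    keyfile: /secrets/bigquery-keyfile.json
    concurrent_tasks: 4
```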

examples/airflow/Dockerfile.template

Lines changed: 1 addition & 0 deletions
```diff
@@ -59,3 +59,4 @@ RUN mkdir /opt/sqlmesh/sqlmesh
 RUN chown -R airflow /opt/sqlmesh
 USER airflow
 RUN cd /opt/sqlmesh && pip install -e .
+RUN pip install dbt-core
```

examples/sushi_dbt/profiles.yml

Lines changed: 13 additions & 5 deletions
```diff
@@ -8,13 +8,21 @@ sushi:
      path: 'local.duckdb'
      schema: sushi
    snowflake:
-      account: "{{ env_var('SNOWFLAKE_ACCOUNT', '') }}"
+      account: "{{ env_var('SNOWFLAKE_ACCOUNT') }}"
       database: sushi
-      password: "{{ env_var('SNOWFLAKE_PASSWORD', '') }}"
-      role: "{{ env_var('SNOWFLAKE_ROLE', '') }}"
+      password: "{{ env_var('SNOWFLAKE_PASSWORD') }}"
+      role: "{{ env_var('SNOWFLAKE_ROLE') }}"
       schema: sushi
       threads: 1
       type: snowflake
-      user: "{{ env_var('SNOWFLAKE_USER', '') }}"
-      warehouse: "{{ env_var('SNOWFLAKE_WAREHOUSE', '') }}"
+      user: "{{ env_var('SNOWFLAKE_USER') }}"
+      warehouse: "{{ env_var('SNOWFLAKE_WAREHOUSE') }}"
+    bigquery:
+      type: bigquery
+      method: service-account
+      project: "{{ env_var('BQ_PROJECT') }}"
+      dataset: "{{ env_var('BQ_SCHEMA') }}"
+      threads: 1
+      keyfile: "{{ env_var('BQ_KEYFILE') }}"
+      location: "{{ env_var('BQ_LOCATION') }}"
   target: in_memory
```
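The commit message's "support both dataset and schema" refers to dbt-bigquery treating those two profile fields as aliases. A minimal normalization sketch follows; the tie-break of preferring an explicit `schema` when both are present is an assumption of this sketch, not taken from the commit:

```python
from typing import Any, Dict


def normalize_bigquery_target(fields: Dict[str, Any]) -> Dict[str, Any]:
    """Accept either `dataset` or `schema` (aliases in dbt-bigquery
    profiles) and normalize on `schema`. An explicit `schema` wins
    when both are given (assumed tie-break, not from the commit)."""
    out = dict(fields)
    dataset = out.pop("dataset", None)
    if dataset is not None and "schema" not in out:
        out["schema"] = dataset
    return out
```

This keeps downstream code working against a single field name regardless of which alias the profile author used.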

sqlmesh/core/config/connection.py

Lines changed: 62 additions & 2 deletions
```diff
@@ -3,6 +3,7 @@
 import abc
 import sys
 import typing as t
+from enum import Enum
 
 from pydantic import Field, root_validator
 
@@ -336,13 +337,33 @@ def _static_connection_kwargs(self) -> t.Dict[str, t.Any]:
         return {}
 
 
+class BigQueryConnectionMethod(str, Enum):
+    OAUTH = "oauth"
+    OAUTH_SECRETS = "oauth-secrets"
+    SERVICE_ACCOUNT = "service-account"
+    SERVICE_ACCOUNT_JSON = "service-account-json"
+
+
 class BigQueryConnectionConfig(_ConnectionConfig):
     """
     BigQuery Connection Configuration.
-
-    TODO: Need to update to support all the different authentication options
     """
 
+    method: BigQueryConnectionMethod = BigQueryConnectionMethod.OAUTH
+
+    project: t.Optional[str] = None
+    location: t.Optional[str] = None
+    # Keyfile Auth
+    keyfile: t.Optional[str] = None
+    keyfile_json: t.Optional[t.Dict[str, t.Any]] = None
+    # OAuth Secrets Auth
+    token: t.Optional[str] = None
+    refresh_token: t.Optional[str] = None
+    client_id: t.Optional[str] = None
+    client_secret: t.Optional[str] = None
+    token_uri: t.Optional[str] = None
+    scopes: t.Tuple[str, ...] = ("https://www.googleapis.com/auth/bigquery",)
+
     concurrent_tasks: int = 4
 
     type_: Literal["bigquery"] = Field(alias="type", default="bigquery")
@@ -355,6 +376,45 @@ def _connection_kwargs_keys(self) -> t.Set[str]:
     def _engine_adapter(self) -> t.Type[EngineAdapter]:
         return engine_adapter.BigQueryEngineAdapter
 
+    @property
+    def _static_connection_kwargs(self) -> t.Dict[str, t.Any]:
+        """The static connection kwargs for this connection"""
+        import google.auth
+        from google.api_core import client_info
+        from google.cloud import bigquery
+        from google.oauth2 import credentials, service_account
+
+        if self.method == BigQueryConnectionMethod.OAUTH:
+            creds, _ = google.auth.default(scopes=self.scopes)
+        elif self.method == BigQueryConnectionMethod.SERVICE_ACCOUNT:
+            creds = service_account.Credentials.from_service_account_file(
+                self.keyfile, scopes=self.scopes
+            )
+        elif self.method == BigQueryConnectionMethod.SERVICE_ACCOUNT_JSON:
+            creds = service_account.Credentials.from_service_account_info(
+                self.keyfile_json, scopes=self.scopes
+            )
+        elif self.method == BigQueryConnectionMethod.OAUTH_SECRETS:
+            creds = credentials.Credentials(
+                token=self.token,
+                refresh_token=self.refresh_token,
+                client_id=self.client_id,
+                client_secret=self.client_secret,
+                token_uri=self.token_uri,
+                scopes=self.scopes,
+            )
+        else:
+            raise ConfigError("Invalid BigQuery Connection Method")
+        client = bigquery.Client(
+            project=self.project,
+            credentials=creds,
+            location=self.location,
+            client_info=client_info.ClientInfo(user_agent="sqlmesh"),
+        )
+
+        return {
+            "client": client,
+        }
+
     @property
     def _connection_factory(self) -> t.Callable:
         from google.cloud.bigquery.dbapi import connect
```
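The enum-keyed dispatch above can be exercised in isolation. The sketch below stubs out the actual google-auth calls with descriptive label strings (no real credentials are constructed, and none of the google-* libraries are assumed), keeping only the selection logic:

```python
from enum import Enum
from typing import Optional


class BigQueryConnectionMethod(str, Enum):
    OAUTH = "oauth"
    OAUTH_SECRETS = "oauth-secrets"
    SERVICE_ACCOUNT = "service-account"
    SERVICE_ACCOUNT_JSON = "service-account-json"


def resolve_credentials(method: str, keyfile: Optional[str] = None) -> str:
    """Stubbed stand-in for credential construction: returns a label
    naming which google-auth call would be made for each method."""
    m = BigQueryConnectionMethod(method)  # raises ValueError on an unknown method
    if m == BigQueryConnectionMethod.OAUTH:
        return "google.auth.default"
    if m == BigQueryConnectionMethod.SERVICE_ACCOUNT:
        if keyfile is None:
            raise ValueError("service-account method requires a keyfile")
        return f"service_account.Credentials.from_service_account_file({keyfile})"
    if m == BigQueryConnectionMethod.SERVICE_ACCOUNT_JSON:
        return "service_account.Credentials.from_service_account_info"
    return "credentials.Credentials"  # oauth-secrets
```

Because `BigQueryConnectionMethod` subclasses `str`, a plain YAML string such as `"service-account"` compares equal to the corresponding member, which is why the config layer can accept the raw string directly.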

sqlmesh/dbt/loader.py

Lines changed: 2 additions & 2 deletions
```diff
@@ -22,8 +22,8 @@ def sqlmesh_config(project_root: t.Optional[Path] = None, **kwargs: t.Any) -> Co
     profile = Profile.load(context)
 
     return Config(
-        default_connection=profile.default_target,
-        connections=profile.to_sqlmesh(),
+        default_connection=profile.target_name,
+        connections={profile.target_name: profile.target.to_sqlmesh()},
         loader=DbtLoader,
         **kwargs,
     )
```

sqlmesh/dbt/profile.py

Lines changed: 23 additions & 24 deletions
```diff
@@ -3,10 +3,10 @@
 import typing as t
 from pathlib import Path
 
-from sqlmesh.core.config.connection import ConnectionConfig
 from sqlmesh.dbt.common import PROJECT_FILENAME, DbtContext, load_yaml
 from sqlmesh.dbt.target import TargetConfig
 from sqlmesh.utils.errors import ConfigError
+from sqlmesh.utils.yaml import dumps as dump_yaml
 
 
 class Profile:
@@ -19,21 +19,21 @@ class Profile:
     def __init__(
         self,
         path: Path,
-        targets: t.Dict[str, TargetConfig],
-        default_target: str,
+        target_name: str,
+        target: TargetConfig,
     ):
         """
         Args:
             path: Path to the profile file
-            targets: Dict of targets defined for the project
-            default_target: Name of the default target for the proejct
+            target_name: Name of the target loaded
+            target: TargetConfig for target_name
         """
         self.path = path
-        self.targets = targets
-        self.default_target = default_target
+        self.target_name = target_name
+        self.target = target
 
     @classmethod
-    def load(cls, context: DbtContext) -> Profile:
+    def load(cls, context: DbtContext, target_name: t.Optional[str] = None) -> Profile:
         """
         Loads the profile for the specified project
 
@@ -59,8 +59,8 @@ def load(cls, context: DbtContext) -> Profile:
         if not profile_filepath:
             raise ConfigError(f"{cls.PROFILE_FILE} not found.")
 
-        targets, default_target = cls._read_profile(profile_filepath, context)
-        return Profile(profile_filepath, targets, default_target)
+        target_name, target = cls._read_profile(profile_filepath, context, target_name)
+        return Profile(profile_filepath, target_name, target)
 
     @classmethod
     def _find_profile(cls, project_root: Path) -> t.Optional[Path]:
@@ -77,28 +77,27 @@ def _find_profile(cls, project_root: Path) -> t.Optional[Path]:
 
     @classmethod
     def _read_profile(
-        cls, path: Path, context: DbtContext
-    ) -> t.Tuple[t.Dict[str, TargetConfig], str]:
-        with open(path, "r", encoding="utf-8") as file:
-            source = file.read()
-        contents = load_yaml(context.render(source))
-
-        project_data = contents.get(context.profile_name)
+        cls, path: Path, context: DbtContext, target_name: t.Optional[str] = None
+    ) -> t.Tuple[str, TargetConfig]:
+        project_data = load_yaml(path).get(context.profile_name)
         if not project_data:
             raise ConfigError(f"Profile '{context.profile_name}' not found in profiles.")
 
         outputs = project_data.get("outputs")
         if not outputs:
             raise ConfigError(f"No outputs exist in profiles for '{context.profile_name}'.")
 
-        targets = {name: TargetConfig.load(name, output) for name, output in outputs.items()}
-        default_target = context.render(project_data.get("target"))
-        if default_target not in targets:
+        if not target_name:
+            if "target" not in project_data:
+                raise ConfigError(f"No target specified for '{context.profile_name}'.")
+            target_name = context.render(project_data.get("target"))
+
+        if target_name not in outputs:
             raise ConfigError(
-                f"Default target '{default_target}' not specified in profiles for '{context.profile_name}'."
+                f"Target '{target_name}' not specified in profiles for '{context.profile_name}'."
             )
 
-        return (targets, default_target)
+        target_fields = load_yaml(context.render(dump_yaml(outputs[target_name])))
+        target = TargetConfig.load(target_name, target_fields)
 
-    def to_sqlmesh(self) -> t.Dict[str, ConnectionConfig]:
-        return {self.default_target: self.targets[self.default_target].to_sqlmesh()}
+        return (target_name, target)
```
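The central change here, dumping only the selected target back out, rendering it, and reloading it so that unselected targets are never rendered, can be sketched with the standard library. In this sketch json stands in for YAML and a regex stands in for dbt's Jinja `env_var` rendering; all names and values are illustrative:

```python
import json
import re
from typing import Any, Dict, Optional, Tuple

# Illustrative environment; only the Snowflake variable is set.
ENV = {"SNOWFLAKE_ACCOUNT": "acme-xy12345"}

PROFILE = {
    "outputs": {
        "in_memory": {"type": "duckdb", "path": "local.duckdb"},
        "snowflake": {"type": "snowflake", "account": "{{ env_var('SNOWFLAKE_ACCOUNT') }}"},
    },
    "target": "in_memory",
}


def render(text: str) -> str:
    """Toy stand-in for dbt's Jinja rendering of env_var() calls."""
    return re.sub(r"\{\{\s*env_var\('([^']+)'\)\s*\}\}", lambda m: ENV[m.group(1)], text)


def load_target(
    profile: Dict[str, Any], target_name: Optional[str] = None
) -> Tuple[str, Dict[str, Any]]:
    target_name = target_name or profile["target"]
    if target_name not in profile["outputs"]:
        raise ValueError(f"Target '{target_name}' not specified in profiles.")
    # Serialize only the selected output, render it, and load it back;
    # the other targets (which may reference unset env vars) are never rendered.
    fields = json.loads(render(json.dumps(profile["outputs"][target_name])))
    return target_name, fields
```

With this shape, loading the `in_memory` target never touches `SNOWFLAKE_ACCOUNT`, so unset Snowflake variables no longer break a DuckDB run.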

sqlmesh/dbt/project.py

Lines changed: 2 additions & 6 deletions
```diff
@@ -60,12 +60,8 @@ def load(cls, context: DbtContext) -> Project:
             context.render(project_yaml.get("profile", "")) or context.project_name
         )
 
-        profile = Profile.load(context)
-        context.target = (
-            profile.targets[context.target_name]
-            if context.target_name
-            else profile.targets[profile.default_target]
-        )
+        profile = Profile.load(context, context.target_name)
+        context.target = profile.target
 
         packages = {}
         root_loader = PackageLoader(context, ProjectConfig())
```
