-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathclient.py
86 lines (68 loc) · 2.4 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
from typing import Any, Dict, List, Optional
import requests
from pydantic import BaseModel
from metaphor.common.logger import get_logger
logger = get_logger()
class DbtConnection(BaseModel, extra="allow"):
account_id: int
project_id: Optional[int] = None
name: str
type: str
state: int
details: Dict[str, Any]
class DbtProject(BaseModel, extra="allow"):
id: int
name: str
account_id: int
description: Optional[str] = None
created_at: str
updated_at: str
deleted_at: Optional[str] = None
# connection can be None even though it's marked as required in https://docs.getdbt.com/dbt-cloud/api-v3#/operations/List%20Projects
connection: Optional[DbtConnection] = None
class DbtEnvironment(BaseModel, extra="allow"):
id: int
account_id: int
project_id: int
name: str
dbt_version: str
type: str
deployment_type: Optional[str] = None
state: int
created_at: str
updated_at: str
class DbtAdminAPIClient:
"""A client that wraps the dbt Cloud Administrative API
See https://docs.getdbt.com/dbt-cloud/api-v3 for more details.
"""
def __init__(
self,
base_url: str,
account_id: int,
service_token: str,
):
self.admin_api_base_url = f"{base_url}/api/v3"
self.account_id = account_id
self.service_token = service_token
def _get(self, path: str, params: Optional[Dict] = None):
url = f"{self.admin_api_base_url}/accounts/{self.account_id}/{path}"
logger.debug(f"Sending request to {url}")
req = requests.get(
url,
params=params,
headers={
"Content-Type": "application/json",
"Authorization": f"Token {self.service_token}",
},
timeout=60, # request timeout 60s
)
assert req.status_code == 200, f"{url} returned {req.status_code}"
return req.json()
def list_projects(self) -> List[DbtProject]:
"""Get all projects in the account"""
resp = self._get("projects/")
return [DbtProject.model_validate(project) for project in resp.get("data")]
def list_environments(self, project_id: int) -> List[DbtEnvironment]:
"""Get all environments under the project"""
resp = self._get(f"projects/{project_id}/environments/")
return [DbtEnvironment.model_validate(env) for env in resp.get("data")]