-
Notifications
You must be signed in to change notification settings - Fork 19
/
aiot_cloud.py
234 lines (206 loc) · 8.05 KB
/
aiot_cloud.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
import hashlib
import json
import random
import string
import time
import logging
from aiohttp import ClientSession
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
# Maps a region code to the host name of that region's Aqara open API.
# AiotCloud.set_country() looks the region up here to build the API URL.
API_DOMAIN = {
"CN": "open-cn.aqara.com",
"USA": "open-usa.aqara.com",
"KR": "open-kr.aqara.com",
"RU": "open-ru.aqara.com",
"GER": "open-ger.aqara.com",
}
# Apply for your own APP_ID, KEY_ID and APP_KEY at https://developer.aqara.com/
# NOTE(review): these credentials are committed in source; consider loading
# them from configuration instead - confirm with the project owner.
APP_ID = "88110776288481280040ace0"
KEY_ID = "K.881107763014836224"
APP_KEY = "t7g6qhx4nmbeqmfq1w6yksucnbrofsgs"
def get_random_string(length: int):
    """Return a string of *length* characters picked at random from A-Z and 0-9."""
    alphabet = string.ascii_uppercase + string.digits
    picked = []
    for _ in range(length):
        picked.append(random.choice(alphabet))
    return "".join(picked)
# Generate the "Sign" value that is sent in the request headers.
def gen_sign(
    access_token: str,
    app_id: str,
    key_id: str,
    nonce: str,
    timestamp: str,
    app_key: str,
):
    """Compute the request signature for the Aqara open API.

    The fields are joined as ``Appid=..&Keyid=..&Nonce=..&Time=..``, prefixed
    with ``AccessToken=..&`` when *access_token* is non-empty, followed
    directly by *app_key*. The whole string is lower-cased and hashed with
    MD5; the hexadecimal digest is returned.

    Signing rules: https://opendoc.aqara.cn/docs/%E4%BA%91%E5%AF%B9%E6%8E%A5%E5%BC%80%E5%8F%91%E6%89%8B%E5%86%8C/API%E4%BD%BF%E7%94%A8%E6%8C%87%E5%8D%97/Sign%E7%94%9F%E6%88%90%E8%A7%84%E5%88%99.html
    """
    fields = []
    if access_token:
        fields.append(f"AccessToken={access_token}")
    fields.append(f"Appid={app_id}")
    fields.append(f"Keyid={key_id}")
    fields.append(f"Nonce={nonce}")
    fields.append(f"Time={timestamp}")
    # The app key is appended directly after the last field, with no separator.
    plain = f"{'&'.join(fields)}{app_key}"
    digest = hashlib.md5(plain.lower().encode("utf-8"))
    return digest.hexdigest()
class AiotCloud:
    """Asynchronous client for the Aqara (AIOT) open cloud API.

    Every call is a JSON POST to ``api_url`` carrying an ``intent`` name and
    a ``data`` payload, signed through the request headers.

    Attributes:
        access_token: Sent with every request (and mixed into the signature)
            once set; ``None`` until then.
        refresh_token: Passed to ``config.auth.refreshToken`` when the API
            answers with code 108.
        update_token_event_callback: Optional callable invoked as
            ``callback(access_token, refresh_token)`` whenever new tokens are
            stored.
    """

    access_token = None
    refresh_token = None
    update_token_event_callback = None

    def __init__(self, session: "ClientSession"):
        """Bind the client to *session* and select the "CN" region."""
        self.app_id = APP_ID
        self.key_id = KEY_ID
        self.app_key = APP_KEY
        self.session = session
        self.set_country("CN")

    def set_country(self, country: str):
        """Select the API region.

        Raises:
            KeyError: if *country* is not a key of ``API_DOMAIN``. The
                previously selected region is left untouched in that case.
        """
        # Look the domain up first so a bad region cannot leave `country`
        # and `api_url` out of sync.
        domain = API_DOMAIN[country]
        self.country = country
        self.api_url = f"https://{domain}/v3.0/open/api"

    def _get_request_headers(self):
        """Build the signed headers for one request."""
        nonce = get_random_string(16)
        # Millisecond timestamp, as a string.
        timestamp = str(int(round(time.time() * 1000)))
        sign = gen_sign(
            self.access_token, self.app_id, self.key_id, nonce, timestamp, self.app_key
        )
        headers = {
            "Content-Type": "application/json",
            "Appid": self.app_id,
            "Keyid": self.key_id,
            "Nonce": nonce,
            "Time": timestamp,
            "Sign": sign,
            "Lang": "zh",
        }
        if self.access_token:
            headers["Accesstoken"] = self.access_token
        return headers

    async def _async_post_intent(self, intent: str, list_data: bool, data: dict):
        """POST one intent with *data* and return the decoded JSON response."""
        payload = {"intent": intent, "data": [data] if list_data else data}
        r = await self.session.post(
            url=self.api_url,
            data=json.dumps(payload),
            headers=self._get_request_headers(),
        )
        raw = await r.read()
        return json.loads(raw)

    async def _async_invoke_aqara_cloud_api(
        self, intent: str, only_result: bool = True, list_data: bool = False, **kwargs
    ):
        """Call the Aqara API.

        Args:
            intent: API intent name, e.g. ``"query.device.info"``.
            only_result: When true, return only the ``result`` field of the
                response and transparently refresh the token on code 108.
                When false, return the whole decoded response.
            list_data: When true, wrap the payload in a one-element list.
            **kwargs: Payload fields; entries whose value is ``None`` are
                omitted from the request.

        Returns:
            The ``result`` field or the whole response (see *only_result*),
            or ``None`` when the request failed with an exception.
        """
        # Best effort: any failure is logged and reported as None.
        try:
            data = {k: v for k, v in kwargs.items() if v is not None}
            jo = await self._async_post_intent(intent, list_data, data)
            if not only_result:
                return jo
            if jo["code"] != 0:
                _LOGGER.warning("调用Aiot api失败,返回值:%s", jo)
                if jo["code"] == 108:
                    # Code 108: token expired or invalid - refresh and retry.
                    _LOGGER.warning("Aiot令牌过期或异常,正在尝试自动刷新!")
                    new_jo = await self.async_refresh_token(self.refresh_token)
                    if new_jo and new_jo.get("code") == 0:
                        _LOGGER.info("Aiot令牌更新成功!")
                        # Retry exactly once so a token that is still
                        # rejected cannot cause endless recursion.
                        jo = await self._async_post_intent(intent, list_data, data)
                        if jo["code"] != 0:
                            _LOGGER.warning("调用Aiot api失败,返回值:%s", jo)
                    else:
                        _LOGGER.warning("Aiot令牌更新失败,请重新授权!")
            return jo.get("result")
        except Exception as ex:
            _LOGGER.error(ex)
            return None

    def _store_tokens(self, jo):
        """Store the tokens of a successful auth response and notify the callback."""
        # `jo` is None when the request itself failed.
        if not jo or jo.get("code") != 0:
            return
        result = jo["result"]
        self.access_token = result["accessToken"]
        self.refresh_token = result["refreshToken"]
        if self.update_token_event_callback:
            self.update_token_event_callback(self.access_token, self.refresh_token)

    async def async_get_auth_code(
        self, account: str, account_type: int, access_token_validity: str = "7d"
    ):
        """Request an authorization code; returns the whole API response."""
        return await self._async_invoke_aqara_cloud_api(
            intent="config.auth.getAuthCode",
            only_result=False,
            account=account,
            accountType=account_type,
            accessTokenValidity=access_token_validity,
        )

    async def async_get_token(self, authCode: str, account: str, account_type: int):
        """Exchange an authorization code for access and refresh tokens.

        On success (code 0) the tokens are stored on the instance and the
        update callback is invoked. Returns the whole API response, or
        ``None`` when the request failed.
        """
        jo = await self._async_invoke_aqara_cloud_api(
            intent="config.auth.getToken",
            only_result=False,
            authCode=authCode,
            account=account,
            accountType=account_type,
        )
        self._store_tokens(jo)
        return jo

    async def async_refresh_token(self, refresh_token: str):
        """Refresh the access token.

        On success (code 0) the new tokens are stored on the instance and the
        update callback is invoked. Returns the whole API response, or
        ``None`` when the request failed.
        """
        jo = await self._async_invoke_aqara_cloud_api(
            intent="config.auth.refreshToken",
            only_result=False,
            refreshToken=refresh_token,
        )
        self._store_tokens(jo)
        return jo

    async def async_query_device_info(
        self,
        dids: list = None,
        position_id: str = None,
        page_num: int = None,
        page_size: int = None,
    ):
        """Query device information.

        Returns the ``data`` field of the result, or ``None`` when the call
        failed or returned no usable result.
        """
        resp = await self._async_invoke_aqara_cloud_api(
            intent="query.device.info",
            dids=dids,
            positionId=position_id,
            pageNum=page_num,
            pageSize=page_size,
        )
        # A failed call yields None (or a non-dict result); do not crash on it.
        return resp.get("data") if isinstance(resp, dict) else None

    async def async_query_all_devices_info(self, page_size: int = 50):
        """Query every device, page by page, and return them as one list."""
        devices = []
        page_num = 1
        while True:
            page = await self.async_query_device_info(
                page_num=page_num, page_size=page_size
            )
            # Stop on a failed or empty page instead of raising or looping forever.
            if not page:
                break
            devices.extend(page)
            # A short page is the last one.
            if len(page) < page_size:
                break
            page_num += 1
        return devices

    async def async_query_device_sub_info(self, did: str):
        """Query the sub-devices attached to a gateway."""
        return await self._async_invoke_aqara_cloud_api(
            intent="query.device.subInfo", did=did
        )

    async def async_query_resource_info(self, model: str, resource_id: str = None):
        """Query the details of the resources opened for a device model."""
        return await self._async_invoke_aqara_cloud_api(
            intent="query.resource.info", model=model, resourceId=resource_id
        )

    async def async_query_resource_value(self, subject_id: str, resource_ids: list):
        """Query the current values of the given resources of one subject."""
        return await self._async_invoke_aqara_cloud_api(
            intent="query.resource.value",
            resources=[{"subjectId": subject_id, "resourceIds": resource_ids}],
        )

    async def async_write_resource_device(
        self, subject_id: str, resource_id: str, value: str
    ):
        """Control a device by writing *value* to one of its resources."""
        return await self._async_invoke_aqara_cloud_api(
            intent="write.resource.device",
            list_data=True,
            subjectId=subject_id,
            resources=[{"resourceId": resource_id, "value": value}],
        )