Skip to content

Commit 0e1f617

Browse files
Cyber1000rgerganov
authored andcommitted
Added tests for http
1 parent a644733 commit 0e1f617

File tree

3 files changed

+200
-0
lines changed

3 files changed

+200
-0
lines changed

.gitignore

+2
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,5 @@ build/
33
dist/
44
env/
55
*.egg-info/
6+
.coverage
7+
.vscode

pyairctrl/http_client.py

+1
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ def load_key(self):
149149
self._get_key()
150150
else:
151151
self._get_key()
152+
return self._session_key
152153

153154
def _check_key(self):
154155
url = "http://{}/di/v1/products/1/air".format(self._host)

testing/http_client_tests.py

+197
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
# pylint: disable=invalid-name, missing-class-docstring, missing-function-docstring
2+
3+
import unittest
4+
from multiprocessing import Process
5+
import os
6+
import time
7+
import json
8+
import random
9+
import base64
10+
import flask
11+
import requests
12+
from Cryptodome.Cipher import AES
13+
from Cryptodome.Util.Padding import pad
14+
from pyairctrl.http_client import HTTPAirClient
15+
16+
G = int(
17+
"A4D1CBD5C3FD34126765A442EFB99905F8104DD258AC507FD6406CFF14266D31266FEA1E5C41564B777E690F5504F213160217B4B01B886A5E91547F9E2749F4D7FBD7D3B9A92EE1909D0D2263F80A76A6A24C087A091F531DBF0A0169B6A28AD662A4D18E73AFA32D779D5918D08BC8858F4DCEF97C2A24855E6EEB22B3B2E5",
18+
16,
19+
)
20+
21+
P = int(
22+
"B10B8F96A080E01DDE92DE5EAE5D54EC52C99FBCFB06A3C69A6A9DCA52D23B616073E28675A23D189838EF1E2EE652C013ECB4AEA906112324975C3CD49B83BFACCBDD7D90C4BD7098488E9C219A73724EFFD6FAE5644738FAA31A4FF55BCCC0A151AF5F0DC8B4BD45BF37DF365C1A65E68CFDA76D4DA708DF1FB2BC2E4A4371",
23+
16,
24+
)
25+
26+
27+
def encrypt(values, key):
28+
data = pad(bytearray(values, "ascii"), 16, style="pkcs7")
29+
iv = bytes(16)
30+
cipher = AES.new(key, AES.MODE_CBC, iv)
31+
data_enc = cipher.encrypt(data)
32+
return data_enc
33+
34+
35+
def padding_encrypt(values, key):
36+
# add two random bytes in front of the body
37+
data = "AA" + json.dumps(values)
38+
data = pad(bytearray(data, "ascii"), 16, style="pkcs7")
39+
iv = bytes(16)
40+
cipher = AES.new(key, AES.MODE_CBC, iv)
41+
data_enc = cipher.encrypt(data)
42+
return base64.b64encode(data_enc)
43+
44+
45+
class HttpServer:
46+
def __init__(self, port):
47+
super().__init__()
48+
self.flask_server = flask.Flask(__name__)
49+
self.port = port
50+
self.process = Process(target=self._run)
51+
os.environ["FLASK_ENV"] = "development"
52+
53+
def _test_connection(self):
54+
index_url = "http://127.0.0.1:{}".format(self.port)
55+
try:
56+
requests.get(url=index_url)
57+
return True
58+
except requests.exceptions.ConnectionError:
59+
return False
60+
61+
def _run(self):
62+
self.flask_server.run(port=self.port, debug=False)
63+
64+
def start(self):
65+
self.process.start()
66+
while not self._test_connection():
67+
time.sleep(1)
68+
69+
def stop(self):
70+
self.process.terminate()
71+
self.process.join()
72+
73+
def add_url_rule(self, rule, view_func, methods):
74+
self.flask_server.add_url_rule(rule, view_func=view_func, methods=methods)
75+
76+
77+
class HTTPClientTests(unittest.TestCase):
78+
device_key = "1234567890123456"
79+
current_dataset = ""
80+
81+
@classmethod
82+
def setUpClass(cls):
83+
cls.httpServer = HttpServer(80)
84+
cls.airClient = HTTPAirClient("127.0.0.1")
85+
dir_path = os.path.dirname(os.path.realpath(__file__))
86+
with open(os.path.join(dir_path, "data.json"), "r") as json_file:
87+
cls.test_data = json.load(json_file)
88+
cls.httpServer.add_url_rule(
89+
"/di/v1/products/0/security", view_func=cls.security, methods=["PUT"]
90+
)
91+
cls.httpServer.add_url_rule(
92+
"/di/v1/products/1/air", view_func=cls.get_status, methods=["GET"]
93+
)
94+
cls.httpServer.add_url_rule(
95+
"/di/v1/products/0/wifi", view_func=cls.get_wifi, methods=["GET"]
96+
)
97+
cls.httpServer.add_url_rule(
98+
"/di/v1/products/0/firmware", view_func=cls.get_firmware, methods=["GET"]
99+
)
100+
cls.httpServer.add_url_rule(
101+
"/di/v1/products/1/fltsts", view_func=cls.get_filters, methods=["GET"]
102+
)
103+
cls.httpServer.start()
104+
105+
@classmethod
106+
def tearDownClass(cls):
107+
cls.httpServer.stop()
108+
109+
# def test_ssdp(self):
110+
# # missing
111+
# pass
112+
113+
def test_get_valid_session_key(self):
114+
fpath = os.path.expanduser("~/../.pyairctrl")
115+
if os.path.isfile(fpath):
116+
os.remove(fpath)
117+
118+
current_key = self.airClient.load_key()
119+
self.assertEqual(current_key.decode("ascii"), self.device_key)
120+
121+
# def test_set_values(self):
122+
# # missing
123+
# pass
124+
125+
# def test_set_wifi(self):
126+
# # missing
127+
# pass
128+
129+
def test_get_status_is_valid(self):
130+
self.assert_json_data(self.airClient.get_status, "AC2729-status")
131+
132+
def test_get_wifi_is_valid(self):
133+
self.assert_json_data(self.airClient.get_wifi, "AC2729-wifi")
134+
135+
def test_get_firmware_is_valid(self):
136+
self.assert_json_data(self.airClient.get_firmware, "AC2729-firmware")
137+
138+
def test_get_filters_is_valid(self):
139+
self.assert_json_data(self.airClient.get_filters, "AC2729-fltsts")
140+
141+
# def test_pair(self):
142+
# # missing
143+
# pass
144+
145+
def assert_json_data(self, air_func, dataset):
146+
result = air_func()
147+
data = self.test_data[dataset]["data"]
148+
json_data = json.loads(data)
149+
self.assertEqual(result, json_data)
150+
151+
@classmethod
152+
def security(cls):
153+
b = random.getrandbits(256)
154+
B = pow(G, b, P)
155+
156+
dh = json.loads(flask.request.get_data().decode("ascii"))
157+
A = int(dh["diffie"], 16)
158+
s = pow(A, b, P)
159+
s_bytes = s.to_bytes(128, byteorder="big")[:16]
160+
161+
session_key_encrypted = encrypt(cls.device_key, s_bytes)
162+
163+
data = json.dumps(
164+
{"key": session_key_encrypted.hex(), "hellman": format(B, "x")}
165+
)
166+
data_enc = data.encode("ascii")
167+
168+
return data_enc
169+
170+
@classmethod
171+
def get_status(cls):
172+
return cls.callback_encode("AC2729-status")
173+
174+
@classmethod
175+
def get_wifi(cls):
176+
return cls.callback_encode("AC2729-wifi")
177+
178+
@classmethod
179+
def get_firmware(cls):
180+
return cls.callback_encode("AC2729-firmware")
181+
182+
@classmethod
183+
def get_filters(cls):
184+
return cls.callback_encode("AC2729-fltsts")
185+
186+
@classmethod
187+
def callback_encode(cls, dataset):
188+
data = cls.test_data[dataset]["data"]
189+
json_data = json.loads(data)
190+
encrypted_data = padding_encrypt(
191+
json_data, bytes(cls.device_key.encode("ascii"))
192+
)
193+
return encrypted_data
194+
195+
196+
if __name__ == "__main__":
197+
unittest.main()

0 commit comments

Comments
 (0)