-
Notifications
You must be signed in to change notification settings - Fork 0
/
ZJU_HealthReporter.py
234 lines (196 loc) · 7.88 KB
/
ZJU_HealthReporter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# -*- coding: utf-8 -*-
# 打卡脚修改自ZJU-nCov-Hitcarder的开源代码,感谢这位同学开源的代码
# 添加邮件发送机制
import requests
import json
import re
import datetime
import time
import sys
class DaKa(object):
"""Hit card class
Attributes:
username: (str) 浙大统一认证平台用户名(一般为学号)
password: (str) 浙大统一认证平台密码
login_url: (str) 登录url
base_url: (str) 打卡首页url
save_url: (str) 提交打卡url
self.headers: (dir) 请求头
sess: (requests.Session) 统一的session
"""
def __init__(self, username, password):
self.username = username
self.password = password
self.login_url = "https://zjuam.zju.edu.cn/cas/login?service=https%3A%2F%2Fhealthreport.zju.edu.cn%2Fa_zju%2Fapi%2Fsso%2Findex%3Fredirect%3Dhttps%253A%252F%252Fhealthreport.zju.edu.cn%252Fncov%252Fwap%252Fdefault%252Findex"
self.base_url = "https://healthreport.zju.edu.cn/ncov/wap/default/index"
self.save_url = "https://healthreport.zju.edu.cn/ncov/wap/default/save"
self.headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.100 Safari/537.36"
}
self.sess = requests.Session()
self.sess.keep_alive = False
def login(self):
"""Login to ZJU platform"""
res = self.sess.get(self.login_url, headers=self.headers)
execution = re.search(
'name="execution" value="(.*?)"', res.text).group(1)
res = self.sess.get(
url='https://zjuam.zju.edu.cn/cas/v2/getPubKey', headers=self.headers).json()
n, e = res['modulus'], res['exponent']
encrypt_password = self._rsa_encrypt(self.password, e, n)
data = {
'username': self.username,
'password': encrypt_password,
'execution': execution,
'_eventId': 'submit'
}
res = self.sess.post(url=self.login_url, data=data, headers=self.headers)
# check if login successfully
if '统一身份认证' in res.content.decode():
raise LoginError('登录失败,请核实账号密码重新登录')
return self.sess
def post(self):
"""Post the hitcard info"""
res = self.sess.post(self.save_url, data=self.info, headers=self.headers)
return json.loads(res.text)
def get_date(self):
"""Get current date"""
today = datetime.date.today()
return "%4d%02d%02d" % (today.year, today.month, today.day)
def get_info(self, html=None):
"""Get hitcard info, which is the old info with updated new time."""
if not html:
res = self.sess.get(self.base_url, headers=self.headers)
html = res.content.decode()
try:
old_infos = re.findall(r'oldInfo: ({[^\n]+})', html)
if len(old_infos) != 0:
old_info = json.loads(old_infos[0])
else:
raise RegexMatchError("未发现缓存信息,请先至少手动成功打卡一次再运行脚本")
new_info_tmp = json.loads(re.findall(r'def = ({[^\n]+})', html)[0])
new_id = new_info_tmp['id']
name = re.findall(r'realname: "([^\"]+)",', html)[0]
number = re.findall(r"number: '([^\']+)',", html)[0]
except IndexError:
raise RegexMatchError('Relative info not found in html with regex')
except json.decoder.JSONDecodeError:
raise DecodeError('JSON decode error')
new_info = old_info.copy()
print(old_info)
new_info['id'] = new_id
new_info['name'] = name
new_info['number'] = number
new_info["date"] = self.get_date()
new_info["created"] = round(time.time())
# form change
new_info['jrdqtlqk[]'] = 0
new_info['jrdqjcqk[]'] = 0
new_info['sfsqhzjkk'] = 1 # 是否申领杭州健康码
new_info['sqhzjkkys'] = 1 # 杭州健康吗颜色,1:绿色 2:红色 3:黄色
new_info['sfqrxxss'] = 1 # 是否确认信息属实
new_info['jcqzrq'] = ""
new_info['gwszdd'] = ""
new_info['szgjcs'] = ""
self.info = new_info
return new_info
def _rsa_encrypt(self, password_str, e_str, M_str):
password_bytes = bytes(password_str, 'ascii')
password_int = int.from_bytes(password_bytes, 'big')
e_int = int(e_str, 16)
M_int = int(M_str, 16)
result_int = pow(password_int, e_int, M_int)
return hex(result_int)[2:].rjust(128, '0')
# Exceptions
class LoginError(Exception):
"""Login Exception"""
pass
class RegexMatchError(Exception):
"""Regex Matching Exception"""
pass
class DecodeError(Exception):
"""JSON Decode Exception"""
pass
def send_mail(receivers, statu, content):
import smtplib
from email.mime.text import MIMEText
# 设置服务器所需信息
# 邮箱服务器地址
mail_host = 'smtp.qq.com'
# 用户名
mail_user = ''
# 密码(部分邮箱为授权码)
mail_pass = ''
# 邮件发送方邮箱地址
sender = ''
# 邮件接受方邮箱地址,注意需要[]包裹,这意味着你可以写多个邮件地址群发
# receivers = ['59*****02@qq.com']
# 设置email信息
# 邮件内容设置
message = MIMEText(content, 'plain', 'utf-8')
# 邮件主题
message['Subject'] = '【ZJU健康打卡】' + str(statu)
# 发送方信息
message['From'] = sender
# 接受方信息
message['To'] = receivers[0]
# 登录并发送邮件
try:
# 连接到服务器
smtpObj = smtplib.SMTP_SSL(host=mail_host, port=465)
# 登录到服务器
smtpObj.login(mail_user, mail_pass)
# 发送
smtpObj.sendmail(
sender, receivers, message.as_string())
# 退出
smtpObj.quit()
print('success')
except smtplib.SMTPException as e:
print('error', e) # 打印错误
def main(username, password, useremail):
"""Hit card process
Arguments:
username: (str) 浙大统一认证平台用户名(一般为学号)
password: (str) 浙大统一认证平台密码
"""
print("\n[Time] %s" %
datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
print("🚌 打卡任务启动")
dk = DaKa(username, password)
print("登录到浙大统一身份认证平台...")
try:
dk.login()
print("已登录到浙大统一身份认证平台")
except Exception as err:
print(str(err))
send_mail([useremail], '失败', str(err))
raise Exception
print('正在获取个人信息...')
try:
new_info = dk.get_info()
print('已成功获取个人信息')
except Exception as err:
print('获取信息失败,请手动打卡,更多信息: ' + str(err))
send_mail([useremail], '失败', err)
raise Exception
print('正在为您打卡打卡打卡')
try:
res = dk.post()
if str(res['e']) == '0':
print('已为您打卡成功!')
send_mail([useremail], '成功', '成功打卡 ' + str(new_info))
else:
print(res['m'])
send_mail([useremail], '失败', res['m'])
except Exception:
print('数据提交失败')
send_mail([useremail], '失败', '数据提交失败')
raise Exception
if __name__ == "__main__":
user_list = [('user1', 'passwd', 'receiver@email.com'), ('user2', 'passwd', 'receiver@email.com')]
for user in user_list:
try:
main(user[0], user[1], user[2])
except Exception:
pass