forked from QAbot-zh/query-key-app
-
Notifications
You must be signed in to change notification settings - Fork 0
/
app.py
201 lines (179 loc) · 9.24 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
import requests, re, os
import datetime
from flask import Flask, request, render_template, jsonify
from werkzeug.middleware.proxy_fix import ProxyFix
import time
import concurrent.futures
app = Flask(__name__)
app.wsgi_app = ProxyFix(app.wsgi_app)
@app.route('/')
def index():
return render_template('index.html')
@app.route('/submit', methods=['POST'])
def submit():
api_url_lazy, api_key_lazy = None,None
api_info = request.form.get('api_info')
api_key_head = request.form.get('api_key_head')
model_health_check = request.form.get('model_health_check') == 'on'
model_timeout = int(request.form.get('model_timeout', 10)) # 默认超时时间为10秒
model_concurrency = int(request.form.get('model_concurrency', 5)) # 默认并发数为5
if api_info:
# 使用正则表达式提取接口地址和API密钥
url_pattern = r'(https?://[^\s,。、!,;;\n]+)'
api_key_head = api_key_head or "sk-"
key_pattern = fr'({re.escape(api_key_head)}[a-zA-Z0-9_]+)'
api_url_match = re.search(url_pattern, api_info)
api_key_match = re.search(key_pattern, api_info)
if api_url_match and api_key_match:
api_url_lazy = api_url_match.group(0)
api_key_lazy = api_key_match.group(0)
action = request.form['action']
api_url = request.form['api_url'].strip().rstrip('/')
api_key = request.form['api_key'].strip()
if not api_url or not api_key:
if not api_url_lazy or not api_key_lazy:
error = "未提取到正确的接口地址和密钥"
return render_template('index.html', error=error)
else:
api_url, api_key = api_url_lazy, api_key_lazy
# 使用正则表达式移除以 /v1 起始的部分
base_url = re.sub(r'^/v1.*', '', api_url)
base_url = base_url.strip().rstrip('/')
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {api_key}'
}
if action == '拉取模型列表':
model_url = "{}/v1/models".format(base_url)
test_results = {
"available_chat_models": [],
"inconsistent_chat_models": [],
"unavailable_chat_models": [],
"not_chat_models": []
}
try:
response = requests.get(model_url, headers=headers, timeout=model_timeout)
response_json = response.json()
if not model_health_check:
response_text = ",".join([item['id'] for item in response_json['data']])
else:
not_chat_pattern = r'^(dall|mj|midjourney|stable-diffusion|playground|flux|swap_face|tts|whisper|text|emb|luma|vidu|pdf|suno|pika|chirp|domo|runway|cogvideo)'
waiting_test_models = []
for item in response_json['data']:
model_name = item['id']
if re.match(not_chat_pattern, model_name) or any(keyword in model_name for keyword in ["image","audio","video","music","pdf","flux","suno","embed"]):
test_results["not_chat_models"].append({
"status": "未校验模型",
"model_name": model_name,
"response_time": "-",
"remarks": "非chat模型暂不进行校验\n(image\\audio\\video\music...)"
})
else:
waiting_test_models.append(model_name)
with concurrent.futures.ThreadPoolExecutor(max_workers=model_concurrency) as executor:
future_to_model = {
executor.submit(test_one_model, api_url, api_key, model, model_timeout): model
for model in waiting_test_models
}
for future in concurrent.futures.as_completed(future_to_model):
model_name, response = future.result()
if (response.status_code == 200 or response.status_code == 201) and "error" not in response.json():
output_model = response.json()["model"]
if model_name == output_model:
test_results["available_chat_models"].append({
"status": "模型一致可用",
"model_name": model_name,
"response_time": "{:.2f}".format(response.elapsed.total_seconds()),
"remarks": "状态良好"
})
else:
test_results["inconsistent_chat_models"].append({
"status": "模型可用但不一致<br>(即返回模型名称与测试模型名称不一致,注意甄别真假或模型重映射)",
"model_name": model_name,
"response_time": "{:.2f}".format(response.elapsed.total_seconds()),
"remarks": "返回模型名称:{}".format(output_model)
})
else:
test_results["unavailable_chat_models"].append({
"status": "模型不可用!!!",
"model_name": model_name,
"response_time": "-",
"remarks": "超时或无响应"
})
return render_template('index.html', test_results=test_results, api_info=api_info, api_url=api_url, api_key=api_key, api_key_head=api_key_head)
except:
response_text = "无法获取模型列表,api 接口可能存在问题"
return render_template('index.html', response_text=response_text, api_info=api_info, api_url=api_url, api_key=api_key, api_key_head=api_key_head)
elif action == '检查额度':
# 获取总额度
quota_url = "{}/dashboard/billing/subscription".format(base_url)
try:
response = requests.get(quota_url, headers=headers, timeout=model_timeout)
response_json = response.json()
quota_info = response_json.get('hard_limit_usd', 0)
except:
quota_info = '无法获得额度信息,api 接口可能存在问题'
# 获取使用情况
today = datetime.datetime.now()
year, month, day = today.year, today.month, today.day
start_date = "{}-{:02d}-01".format(year, month)
end_date = "{}-{:02d}-{}".format(year, month, day)
usage_url = "{}/dashboard/billing/usage?start_date={}&end_date={}".format(base_url, start_date, end_date)
try:
response = requests.get(usage_url, headers=headers, timeout=model_timeout)
response_json = response.json()
used_info = response_json.get('total_usage', 0)/100
except:
used_info = '无法获得已用额度信息,api 接口可能存在问题'
try:
remain_info = quota_info - used_info
except:
remain_info = 0
quota = {
"available": format_value(remain_info),
"used": format_value(used_info),
"total": format_value(quota_info)
}
return render_template('index.html', quota=quota, api_info=api_info, api_url=api_url, api_key=api_key, api_key_head=api_key_head)
def format_value(value):
try:
return "{:.2f} $".format(float(value))
except ValueError:
return str(value)
def test_one_model(api_url, api_key, model_name, model_timeout=10):
headers = {
'Authorization': 'Bearer {}'.format(api_key),
'Content-Type': 'application/json'
}
test_url = "{}/v1/chat/completions".format(api_url)
data = {
"model": model_name,
"messages": [
{"role": "user", "content": "say hi"}
],
"max_tokens": 2
}
try:
response = requests.post(test_url, headers=headers, json=data, timeout=model_timeout)
except Exception as e:
response = requests.Response() # 创建一个空的 Response 对象
response.status_code = 500 # 设置状态码为 500,表示服务器错误
response.reason = str(e) # 设置错误原因
response._content = b'{"error": "Model request failed"}' # 设置响应内容,可以自定义错误信息
return model_name,response
@app.route('/test_model', methods=['POST'])
def test_model():
api_url = request.json.get('api_url').strip().rstrip('/')
api_key = request.json.get('api_key').strip()
model_name = request.json.get('model_name').strip()
if not api_url or not api_key or not model_name:
return jsonify({"success": False, "message": "Missing API URL, API Key, or Model Name"}), 400
tic = time.time()
model_name,response = test_one_model(api_url, api_key, model_name)
duration = time.time() - tic
if response.status_code == 200 or response.status_code == 201:
return jsonify({"success": True, "message": "测试成功", "response_time": duration})
else:
return jsonify({"success": False, "message": response.text}), response.status_code
if __name__ == '__main__':
app.run(host='0.0.0.0', port=os.getenv("PORT", default=5000), debug=True)