-
Notifications
You must be signed in to change notification settings - Fork 95
/
ssh.py
276 lines (233 loc) · 11.1 KB
/
ssh.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
import json
import os
import asyncio
import asyncssh
from telegram import Update
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes
import time
import re
from translations import get_translation
from language_manager import language_manager
# 存储 SSH 会话和超时任务
ssh_sessions = {}
ssh_timeouts = {}
is_command_running = {}
def get_accounts():
ACCOUNTS_JSON = os.getenv('ACCOUNTS_JSON')
return json.loads(ACCOUNTS_JSON) if ACCOUNTS_JSON else []
def is_ssh_connected(chat_id):
return chat_id in ssh_sessions and not ssh_sessions[chat_id]['conn']._transport.is_closing()
async def handle_ssh_output(stdout, update, timeout=10):
output_buffer = ""
prompt_pattern = r'.*[$#]\s*$' # 修改提示符模式以匹配更广泛的情况
start_time = time.time()
command_output = []
prompt = None
last_sent_index = 0
while True:
current_time = time.time()
try:
chunk = await asyncio.wait_for(stdout.read(4096), timeout=0.1)
output_buffer += chunk
lines = output_buffer.split('\n')
full_lines = lines[:-1]
output_buffer = lines[-1]
for line in full_lines:
clean_line = clean_ansi_escape_sequences(line)
command_output.append(clean_line)
last_line = clean_ansi_escape_sequences(output_buffer)
if re.match(prompt_pattern, last_line):
if command_output[last_sent_index:]:
await update.message.reply_text('\n'.join(command_output[last_sent_index:]))
await update.message.reply_text(last_line)
prompt = last_line
break
if current_time - start_time > 2: # 每2秒输出一次
if command_output[last_sent_index:]:
await update.message.reply_text('\n'.join(command_output[last_sent_index:]))
last_sent_index = len(command_output)
start_time = current_time
except asyncio.TimeoutError:
if current_time - start_time > timeout:
if command_output[last_sent_index:]:
await update.message.reply_text('\n'.join(command_output[last_sent_index:]))
last_sent_index = len(command_output)
start_time = current_time
await asyncio.sleep(0.1)
return prompt
async def connect_to_host(update: Update, context: ContextTypes.DEFAULT_TYPE, host_info):
chat_id = update.effective_chat.id
ssluser = host_info.get('ssluser') or host_info.get('username')
password = host_info.get('password')
sslhost = host_info.get('sslhost') or host_info.get('hostname')
customhostname = host_info.get('customhostname', '').lower()
secret_key_path = host_info.get('secretkey')
port = host_info.get('port', 22) # 默认为 22 如果未指定
try:
await update.message.reply_text(get_translation('connecting_to_host').format(host=f"{customhostname + ': ' if customhostname else ''}{ssluser}@{sslhost}:{port}"))
client_keys = [secret_key_path] if secret_key_path else None
conn = await asyncssh.connect(
sslhost,
port=port,
username=ssluser,
password=password,
client_keys=client_keys,
known_hosts=None # 禁用主机密钥检查
)
stdin, stdout, stderr = await conn.open_session(term_type='xterm')
# 使用 50 秒超时逻辑
try:
prompt = await asyncio.wait_for(handle_ssh_output(stdout, update), timeout=50)
except asyncio.TimeoutError:
await update.message.reply_text(get_translation('SSH_CONNECTION_TIMEOUT'))
conn.close()
return
ssh_sessions[chat_id] = {
'conn': conn,
'stdin': stdin,
'stdout': stdout,
'stderr': stderr,
'prompt': prompt
}
# 启动 SSH 超时任务
ssh_timeouts[chat_id] = asyncio.create_task(timeout_ssh_session(context.bot, chat_id))
except (asyncssh.Error, OSError) as exc:
await update.message.reply_text(get_translation('connection_failed').format(error=str(exc)))
async def handle_ssh_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
chat_id = update.effective_chat.id
if len(context.args) == 0:
# 显示可用主机列表
accounts = get_accounts()
host_list = []
for account in accounts:
customhostname = account.get('customhostname', '').lower()
ssluser = account.get('ssluser') or account.get('username')
sslhost = account.get('sslhost') or account.get('hostname')
port = account.get('port', 22) # 获取端口,默认为 22
host_list.append(f"{customhostname + ': ' if customhostname else ''}{ssluser}@{sslhost}:{port}")
message = get_translation('available_hosts').format(hosts="\n".join(host_list))
message += "\n\n" + get_translation('ssh_usage')
await update.message.reply_text(message)
elif len(context.args) == 1:
host_identifier = context.args[0]
accounts = get_accounts()
# 检查是否是预定义主机
account = next((acc for acc in accounts if
acc.get('customhostname', '').lower() == host_identifier.lower() or
f"{acc.get('ssluser') or acc.get('username')}@{acc.get('sslhost') or acc.get('hostname')}".lower() == host_identifier.lower() or
f"{acc.get('ssluser') or acc.get('username')}@{acc.get('sslhost') or acc.get('hostname')}:{acc.get('port', 22)}".lower() == host_identifier.lower()
), None)
if account:
# 连接预定义主机
if chat_id in ssh_sessions:
await update.message.reply_text(get_translation('active_ssh_session'))
else:
await connect_to_host(update, context, account)
else:
# 处理自定义主机连接
if '@' in host_identifier:
ssluser, host_part = host_identifier.split('@')
if ':' in host_part:
sslhost, port = host_part.split(':')
try:
port = int(port)
except ValueError:
await update.message.reply_text(get_translation('invalid_port_format'))
return
else:
sslhost = host_part
port = 22
context.user_data['awaiting_ssh_password'] = {
'ssluser': ssluser,
'sslhost': sslhost,
'port': port
}
await update.message.reply_text(get_translation('enter_password'))
else:
await update.message.reply_text(get_translation('invalid_host_format'))
else:
await update.message.reply_text(get_translation('ssh_usage'))
async def handle_password_input(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
chat_id = update.effective_chat.id
if 'awaiting_ssh_password' in context.user_data:
password = update.message.text
host_info = context.user_data['awaiting_ssh_password']
host_info['password'] = password
del context.user_data['awaiting_ssh_password']
# 删除密码消息
await update.message.delete()
await connect_to_host(update, context, host_info)
async def handle_exit_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
chat_id = update.effective_chat.id
if chat_id in ssh_sessions:
if chat_id in is_command_running and is_command_running[chat_id]:
# 如果有命令正在执行,我们需要强制关闭连接
ssh_sessions[chat_id]['conn'].close()
await update.message.reply_text(get_translation('ssh_force_disconnected'))
else:
ssh_sessions[chat_id]['conn'].close()
await update.message.reply_text(get_translation('ssh_disconnected'))
del ssh_sessions[chat_id]
if chat_id in ssh_timeouts:
ssh_timeouts[chat_id].cancel()
del ssh_timeouts[chat_id]
if chat_id in is_command_running:
del is_command_running[chat_id]
else:
await update.message.reply_text(get_translation('no_active_ssh'))
async def start_ssh_timeout(bot, chat_id):
if chat_id in ssh_timeouts:
ssh_timeouts[chat_id].cancel()
ssh_timeouts[chat_id] = asyncio.create_task(timeout_ssh_session(bot, chat_id))
async def timeout_ssh_session(bot, chat_id):
await asyncio.sleep(900) # 15 minutes timeout
if chat_id in ssh_sessions:
session = ssh_sessions[chat_id]
session['conn'].close()
del ssh_sessions[chat_id]
if chat_id in ssh_timeouts:
del ssh_timeouts[chat_id]
if chat_id in is_command_running:
del is_command_running[chat_id]
await bot.send_message(chat_id=chat_id, text=get_translation('ssh_session_timeout'))
async def handle_ssh_command_execution(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
chat_id = update.effective_chat.id
if chat_id in ssh_sessions:
command = update.message.text
session = ssh_sessions[chat_id]
conn = session['conn']
stdin = session['stdin']
stdout = session['stdout']
stderr = session['stderr']
if command.lower() == '/exit':
await handle_exit_command(update, context)
return
if chat_id in is_command_running and is_command_running[chat_id]:
await update.message.reply_text(get_translation('command_execution_in_progress'))
return
try:
is_command_running[chat_id] = True
await stdin.drain()
stdin.write(command + '\n')
await stdin.drain()
prompt = await asyncio.wait_for(handle_ssh_output(stdout, update), timeout=50)
ssh_sessions[chat_id]['prompt'] = prompt
is_command_running[chat_id] = False
# 重启 SSH 超时任务
await start_ssh_timeout(context.bot, chat_id)
except (asyncssh.Error, OSError) as exc:
await update.message.reply_text(get_translation('command_execution_failed').format(error=str(exc)))
else:
await update.message.reply_text(get_translation('no_active_ssh'))
def clean_ansi_escape_sequences(text):
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
return ansi_escape.sub('', text)
def main() -> None:
application = Application.builder().token(os.getenv('TELEGRAM_BOT_TOKEN')).build()
application.add_handler(CommandHandler('ssh', handle_ssh_command))
application.add_handler(CommandHandler('exit', handle_exit_command))
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_ssh_command_execution))
application.add_handler(MessageHandler(filters.TEXT & filters.Regex(r'^\S+@\S+\.\S+$'), handle_password_input))
application.run_polling()
if __name__ == '__main__':
main()