forked from n3d1117/chatgpt-telegram-bot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtelegram_bot.py
187 lines (157 loc) · 7.9 KB
/
telegram_bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
import asyncio
import logging
import telegram.constants as constants
from httpx import HTTPError
from revChatGPT.revChatGPT import AsyncChatbot as ChatGPT3Bot
from telegram import Update, Message
from telegram.error import RetryAfter, BadRequest
from telegram.ext import ApplicationBuilder, ContextTypes, CommandHandler, MessageHandler, filters
class ChatGPT3TelegramBot:
    """
    Class representing a Chat-GPT3 Telegram Bot.

    Registers Telegram command/message handlers and relays prompts to the
    wrapped ChatGPT client, answering either with a single message or by
    progressively editing one message while the reply is being streamed.
    """

    # Reply shown to the user whenever no answer could be obtained from ChatGPT
    ERROR_MESSAGE = "I'm having some trouble talking to you, please try again later."

    def __init__(self, config: dict, gpt3_bot: ChatGPT3Bot):
        """
        Initializes the bot with the given configuration and GPT-3 bot object.
        :param config: A dictionary containing the bot configuration. The keys read by
            this class are 'token', 'use_stream' and 'allowed_user_ids'
        :param gpt3_bot: The GPT-3 bot object
        """
        self.config = config
        self.gpt3_bot = gpt3_bot
        self.disallowed_message = "Sorry, you are not allowed to use this bot. You can check out the source code at " \
                                  "https://github.com/n3d1117/chatgpt-telegram-bot"

    async def help(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
        """
        Shows the help menu.
        """
        await update.message.reply_text("/start - Start the bot\n"
                                        "/reset - Reset conversation\n"
                                        "/help - Help menu\n\n"
                                        "Open source at https://github.com/n3d1117/chatgpt-telegram-bot",
                                        disable_web_page_preview=True)

    async def start(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """
        Handles the /start command.
        """
        if not self.is_allowed(update):
            logging.info(f'User {update.message.from_user.name} is not allowed to start the bot')
            await self.send_disallowed_message(update, context)
            return

        logging.info('Bot started')
        await context.bot.send_message(chat_id=update.effective_chat.id, text="I'm a Chat-GPT3 Bot, please talk to me!")

    async def reset(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """
        Resets the conversation.
        """
        if not self.is_allowed(update):
            logging.info(f'User {update.message.from_user.name} is not allowed to reset the bot')
            await self.send_disallowed_message(update, context)
            return

        logging.info('Resetting the conversation...')
        self.gpt3_bot.reset_chat()
        await context.bot.send_message(chat_id=update.effective_chat.id, text="Done!")

    async def send_typing_periodically(self, update: Update, context: ContextTypes.DEFAULT_TYPE, every_seconds: float):
        """
        Sends the typing action periodically to the chat.

        Loops forever: the caller is expected to run it as a task and cancel
        that task once the answer is available.
        :param every_seconds: Pause between two consecutive typing actions
        """
        while True:
            await context.bot.send_chat_action(chat_id=update.effective_chat.id, action=constants.ChatAction.TYPING)
            await asyncio.sleep(every_seconds)

    async def prompt(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """
        React to incoming messages and respond accordingly.

        Depending on config['use_stream'] the answer is either streamed into
        a progressively edited message or sent as one message.
        """
        if not self.is_allowed(update):
            logging.info(f'User {update.message.from_user.name} is not allowed to use the bot')
            await self.send_disallowed_message(update, context)
            return

        logging.info(f'New message received from user {update.message.from_user.name}')

        # Send "Typing..." action periodically every 4 seconds until the response is received
        typing_task = context.application.create_task(
            self.send_typing_periodically(update, context, every_seconds=4)
        )

        try:
            if self.config['use_stream']:
                await self._stream_response(update, context, typing_task)
            else:
                await self._send_full_response(update, context, typing_task)
        finally:
            # Never leave the typing indicator running, even when replying failed.
            # Cancelling an already cancelled/finished task is harmless.
            typing_task.cancel()

    async def _stream_response(self, update: Update, context: ContextTypes.DEFAULT_TYPE,
                               typing_task: asyncio.Task) -> None:
        """
        Streams the ChatGPT answer into a single, progressively edited message.
        :param typing_task: The running typing-indicator task, cancelled as soon as
            the first chunk has been posted
        """
        initial_message = None  # telegram Message posted for the first chunk
        chunk_text = ''  # full answer text carried by the most recent chunk
        last_sent_text = ''  # text known to be currently displayed in the chat
        stream_failed = False

        async def message_update(every_seconds: float):
            """
            Edits the `initial_message` periodically with the updated text from the latest chunk
            """
            nonlocal last_sent_text
            while True:
                pending_text = chunk_text
                # `edit_text` does not mutate `initial_message`, so what is displayed has
                # to be tracked separately to avoid re-sending identical edits
                if initial_message is not None and pending_text != last_sent_text:
                    try:
                        await initial_message.edit_text(pending_text)
                        last_sent_text = pending_text
                    except (BadRequest, HTTPError, RetryAfter):
                        # Ignore common errors while editing the message
                        pass
                    except Exception as e:
                        logging.info(f'Error while editing the message: {str(e)}')
                await asyncio.sleep(every_seconds)

        # Start task to update the initial message periodically every 0.5 seconds
        # If you're frequently hitting rate limits, increase this interval
        message_update_task = context.application.create_task(message_update(every_seconds=0.5))

        try:
            # Stream the response
            async for chunk in await self.gpt3_bot.get_chat_response(update.message.text, output='stream'):
                chunk_text = chunk['message']
                if initial_message is None:
                    # Sends the initial message, to be edited later with updated text
                    initial_message = await context.bot.send_message(
                        chat_id=update.effective_chat.id,
                        reply_to_message_id=update.message.message_id,
                        text=chunk_text
                    )
                    last_sent_text = chunk_text
                    typing_task.cancel()
        except Exception as e:
            # Same policy as the non-streaming path: log and tell the user instead of going silent
            stream_failed = True
            logging.exception(f'Error while getting the response: {str(e)}')
        finally:
            message_update_task.cancel()

        if initial_message is None:
            # Nothing was ever posted (empty stream or failure before the first chunk)
            typing_task.cancel()
            await context.bot.send_message(
                chat_id=update.effective_chat.id,
                reply_to_message_id=update.message.message_id,
                text=self.ERROR_MESSAGE
            )
            return

        if stream_failed:
            logging.info('Stream interrupted, keeping the partial response')

        # Final edit, including Markdown formatting
        try:
            await initial_message.edit_text(chunk_text, parse_mode=constants.ParseMode.MARKDOWN)
        except BadRequest as e:
            # Raised e.g. for Markdown that Telegram cannot parse or for unchanged content
            logging.info(f'Error while applying the final edit: {str(e)}')
            if chunk_text != last_sent_text:
                try:
                    # Make sure the complete answer is shown, even if unformatted
                    await initial_message.edit_text(chunk_text)
                except BadRequest as plain_error:
                    logging.info(f'Error while editing the message: {str(plain_error)}')

    async def _send_full_response(self, update: Update, context: ContextTypes.DEFAULT_TYPE,
                                  typing_task: asyncio.Task) -> None:
        """
        Fetches the complete ChatGPT answer and sends it as a single reply.
        :param typing_task: The running typing-indicator task, cancelled once the
            answer has been received
        """
        response = await self.get_chatgpt_response(update.message.text)
        typing_task.cancel()
        try:
            await context.bot.send_message(
                chat_id=update.effective_chat.id,
                reply_to_message_id=update.message.message_id,
                text=response['message'],
                parse_mode=constants.ParseMode.MARKDOWN
            )
        except BadRequest as e:
            # Most likely Markdown that Telegram cannot parse: retry as plain text.
            # Any other BadRequest will be raised again by the retry.
            logging.info(f'Error while sending the formatted message: {str(e)}')
            await context.bot.send_message(
                chat_id=update.effective_chat.id,
                reply_to_message_id=update.message.message_id,
                text=response['message']
            )

    async def get_chatgpt_response(self, message) -> dict:
        """
        Gets the response from the ChatGPT APIs.
        :param message: The prompt text to send
        :return: The response of the ChatGPT client, or a dictionary whose 'message'
            key holds a generic error text when the request failed
        """
        try:
            return await self.gpt3_bot.get_chat_response(message)
        except Exception as e:
            logging.exception(f'Error while getting the response: {str(e)}')
            return {"message": self.ERROR_MESSAGE}

    async def send_disallowed_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """
        Sends the disallowed message to the user.
        """
        await context.bot.send_message(
            chat_id=update.effective_chat.id,
            text=self.disallowed_message,
            disable_web_page_preview=True
        )

    async def error_handler(self, update: object, context: ContextTypes.DEFAULT_TYPE) -> None:
        """
        Handles errors in the telegram-python-bot library.
        """
        # Logged as an error (with traceback) so failures are visible with the default log level
        logging.error(f'Exception while handling an update: {context.error}', exc_info=context.error)

    def is_allowed(self, update: Update) -> bool:
        """
        Checks if the user is allowed to use the bot.
        :return: True when config['allowed_user_ids'] is '*' or when the sender's id is
            one of its comma-separated entries (surrounding whitespace is ignored)
        """
        allowed_user_ids = self.config['allowed_user_ids'].strip()
        if allowed_user_ids == '*':
            return True
        # Tolerate lists written as "123, 456" and stray empty entries
        allowed = {entry.strip() for entry in allowed_user_ids.split(',') if entry.strip()}
        return str(update.message.from_user.id) in allowed

    def run(self):
        """
        Runs the bot indefinitely until the user presses Ctrl+C
        """
        application = ApplicationBuilder().token(self.config['token']).build()

        # block=False lets the application process other updates while a handler is awaiting
        application.add_handler(CommandHandler('start', self.start, block=False))
        application.add_handler(CommandHandler('reset', self.reset, block=False))
        application.add_handler(CommandHandler('help', self.help, block=False))
        application.add_handler(MessageHandler(filters.TEXT & (~filters.COMMAND), self.prompt, block=False))

        application.add_error_handler(self.error_handler)

        application.run_polling()