# bot.py
from __future__ import annotations
import os
import time
import data
import logging
import aiohttp
import discord
import traceback
from typing import Optional, List, Dict, Any
from dotenv import load_dotenv
import typing
from difflib import SequenceMatcher
from collections import Counter
from discord.ext import commands
from slash import *
from cogs.translate import translate_ctx_menu
from pymongo import MongoClient
from pymongo.server_api import ServerApi
import os
from dotenv import load_dotenv; load_dotenv()
# Dotted module paths of every cog, listed in the order they should be loaded.
# Only the bare cog names are spelled out; the shared "cogs." package prefix
# is added once below.
EXTENSIONS: List[str] = [
    f"cogs.{cog_name}"
    for cog_name in (
        "translate",
        "interactions",
        "afk",
        "embed",
        "role",
        "welcomer",
        "news",
        "misc",
        "quiz",
        "fun",
        "anime",
        "help",
        "auto",
        "user",
        "snipe",
        "mod",
        "meta",
        "goblet",
        "botto",
        "prefix",
        "reminder",
        "audio",
        "purge",
        "vccontrol",
        "gpt",
    )
]
# Configure logging once, at import time, using discord.py's prebuilt
# logging helper, so the loggers used below have a handler attached.
discord.utils.setup_logging()
class Bot(commands.AutoShardedBot):
    """Auto-sharded Discord bot with per-guild prefixes stored in MongoDB.

    Besides the usual discord.py plumbing, an instance owns:

    * ``db`` - a ``MongoClient``; each guild's settings live in the database
      named after the guild ID, in its ``config`` collection.
    * ``session`` - a shared ``aiohttp.ClientSession`` (created in
      ``setup_hook`` and released in ``close``).
    * ``command_stats`` / ``socket_stats`` / ``command_types_used`` -
      in-memory usage counters.
    """

    # Guild the application commands are synced to while the bot is in testing.
    TEST_GUILD_ID: int = 1292422671480651856
    # User who receives a DM report for command errors nothing else handles.
    ERROR_REPORT_USER_ID: int = 876869802948452372
    # Prefix used in DMs and for guilds that have no stored prefix yet.
    DEFAULT_PREFIX: str = "?"
    # Longest embed field value that is sent in an error report.
    EMBED_FIELD_LIMIT: int = 1024

    def __init__(self) -> None:
        """Create the bot, its database client and its usage counters."""
        super().__init__(
            description=data.bot_description,
            command_prefix=self.get_prefix,
            allowed_mentions=discord.AllowedMentions(roles=False, everyone=False, users=True),
            intents=discord.Intents.all(),
            enable_debug_events=True,
            case_insensitive=True,
            # The initial presence is supplied here so it is part of the
            # gateway handshake. Calling change_presence() from setup_hook()
            # happens before any shard is connected and therefore sends
            # nothing.
            status=discord.Status.dnd,
        )
        self.openai_key: Optional[str] = os.getenv('OPENAI_KEY')
        self.db = MongoClient(os.getenv("DATABASE"), server_api=ServerApi('1'))
        self.start_time = time.time()
        self.session: Optional[aiohttp.ClientSession] = None
        self.command_stats: Counter[str] = Counter()
        self.socket_stats: Counter[str] = Counter()
        self.command_types_used: Counter[discord.ChannelType] = Counter()
        # on_ready may fire more than once per process; this flag keeps the
        # command-tree sync from being repeated on every reconnect.
        self._test_guild_synced: bool = False
        self.logger: logging.Logger = logging.getLogger('bot')
        self.logger.info("Bot instance initialized successfully")

    async def get_prefix(
        self,
        message: discord.Message
    ) -> List[str]:
        """Return the command prefixes that apply to ``message``.

        In DMs the default prefix is returned. In a guild the prefix list is
        read from the guild's ``config`` collection (document ``_id`` of
        ``"prefix"``); when nothing usable is stored, the default is written
        back and returned.

        NOTE(review): the MongoDB calls here are synchronous and run for
        every incoming message - consider an async driver or a cache.
        """
        if not message.guild:
            return [self.DEFAULT_PREFIX]

        config = self.db[str(message.guild.id)]["config"]
        prefix_doc: Optional[dict] = config.find_one({"_id": "prefix"})
        stored = prefix_doc.get("prefix") if prefix_doc else None
        if not stored:
            config.update_one(
                {"_id": "prefix"},
                {"$set": {"prefix": [self.DEFAULT_PREFIX]}},
                upsert=True
            )
            return [self.DEFAULT_PREFIX]
        # A bare string would make callers that index the result (see the
        # usage hint in on_command_error) pick a single character.
        if isinstance(stored, str):
            return [stored]
        return stored

    async def setup_hook(self) -> None:
        """Create the HTTP session, load every cog and register app commands.

        Each registration step is best-effort: a failure is logged and the
        remaining steps still run.
        """
        self.session = aiohttp.ClientSession()
        self.logger.info("aiohttp ClientSession created")

        for extension in EXTENSIONS:
            try:
                await self.load_extension(extension)
            except Exception as e:
                # logger.exception records the traceback through the logging
                # setup instead of printing it separately.
                self.logger.exception(f'Failed to load extension {extension}: {e}')
            else:
                self.logger.info(f"Successfully loaded extension: {extension}")

        group_commands = [
            HolyGroup(name="holy", description="holy commands")
        ]
        for group in group_commands:
            try:
                self.tree.add_command(group)
            except Exception as e:
                self.logger.error(f'Failed to add slash group {group.name}: {e}')
            else:
                self.logger.info(f"Successfully added slash group: {group.name}")

        try:
            self.tree.context_menu(name='Translate')(translate_ctx_menu)
        except Exception as e:
            self.logger.error(f'Failed to add context_menu: {e}')
        else:
            self.logger.info("Successfully added Translate context menu")

        try:
            self.tree.command(
                name="interactions",
                description="interact with a discord user through GIFs"
            )(interactioncmd)
        except Exception as e:
            self.logger.error(f'Failed to add slash command: {e}')
        else:
            # Only the interactions command is registered here.
            self.logger.info("Successfully added interactions slash command")

        self.logger.info("Finished the setup_hook function")

    async def on_ready(self) -> None:
        """Sync application commands to the test guild (once) and log readiness."""
        if not self._test_guild_synced:
            try:
                # only syncs in test guild for faster sync
                # will remove later when it's production ready
                guild: Optional[discord.Guild] = self.get_guild(self.TEST_GUILD_ID)
                if guild:
                    self.tree.copy_global_to(guild=guild)
                    await self.tree.sync(guild=guild)
                    # Set only after success so a failed sync is retried on
                    # the next on_ready.
                    self._test_guild_synced = True
                    self.logger.info("Successfully synced commands to test guild")
            except Exception as e:
                self.logger.error(f'Failed to sync: {e}')
        self.logger.info(f'Bot ready: {self.user} (ID: {self.user.id})')

    async def on_command_completion(
        self,
        context: commands.Context
    ) -> None:
        """Log every successfully completed hybrid command."""
        if not isinstance(context.command, (commands.HybridCommand, commands.HybridGroup)):
            return
        # qualified_name includes parent groups, matching what on_command logs.
        executed_command: str = context.command.qualified_name
        if context.guild is not None:
            self.logger.info(
                f"Executed {executed_command} command in {context.guild.name} (ID: {context.guild.id}) by {context.author} (ID: {context.author.id})"
            )
        else:
            self.logger.info(
                f"Executed {executed_command} command by {context.author} (ID: {context.author.id}) in DMs"
            )

    async def on_command_error(
        self,
        context: commands.Context,
        error: commands.CommandError
    ) -> None:
        """Answer well-known command errors in chat; report the rest to the owner.

        Unknown commands are only logged. The error types handled below get a
        short reply in the invoking channel. Anything else is logged with its
        traceback and sent as an embed to ``ERROR_REPORT_USER_ID``.
        """
        if isinstance(error, commands.CommandNotFound):
            self.logger.warning(f"CommandNotFound error: {error}")
            return

        if isinstance(error, commands.MissingPermissions):
            await context.reply(f"You are missing the permission(s) `{', '.join(error.missing_permissions)}` to execute this command!")
        elif isinstance(error, commands.BotMissingPermissions):
            await context.reply(f"I am missing the permission(s) `{', '.join(error.missing_permissions)}` to fully perform this command!")
        elif isinstance(error, commands.MissingRequiredArgument):
            # Reply with a usage line built from the command's parameters.
            command_name = context.command.qualified_name
            params = [f"<{param}>" for param in context.command.clean_params]
            prefixes = await self.get_prefix(context.message)
            usage = f"{prefixes[0]}{command_name} {' '.join(params)}"
            await context.reply(f"```\n{usage}\n```")
        elif isinstance(error, commands.BadArgument):
            await context.reply(f"{str(error)}")
        elif isinstance(error, commands.MissingRole):
            await context.reply("You are missing the required role to use this command.")
        elif isinstance(error, commands.MissingAnyRole):
            await context.reply("You are missing one of the required roles to use this command.")
        elif isinstance(error, commands.NSFWChannelRequired):
            await context.reply("This command can only be used in NSFW channels.")
        elif isinstance(error, commands.BadUnionArgument):
            await context.reply(f"Could not parse argument: {str(error)}")
        elif isinstance(error, commands.BadLiteralArgument):
            await context.reply(f"Invalid option. Allowed values are: {', '.join(map(str, error.literals))}")
        else:
            await self._report_error_to_owner(context, error)

        self.logger.warning(f"Command error handled: {type(error).__name__}")

    async def _report_error_to_owner(
        self,
        context: commands.Context,
        error: commands.CommandError
    ) -> None:
        """Log an unhandled command error and DM a summary embed to the owner.

        Sending the report is best-effort: any failure while doing so is
        logged and swallowed so it cannot mask the original error.
        """
        # Errors raised inside a command body arrive wrapped; report the
        # underlying exception when one is attached.
        original = getattr(error, "original", error)
        self.logger.error(
            f"Unhandled command error: {type(original).__name__}: {original}",
            exc_info=original,
        )

        try:
            owner = await self.fetch_user(self.ERROR_REPORT_USER_ID)

            # context.command can be None, so never dereference it blindly.
            command = context.command
            description = "An error occurred in the bot"
            if command is not None and command.brief:
                description += f": {command.brief}"
            else:
                description += "."

            embed = discord.Embed(
                title="Bot Error",
                description=description,
                color=discord.Color.red(),
                timestamp=discord.utils.utcnow()
            )
            # Field values must be non-empty and are capped in length.
            message = str(original) or "No message provided"
            embed.add_field(name="Error Type", value=type(original).__name__, inline=False)
            embed.add_field(name="Error Message", value=message[:self.EMBED_FIELD_LIMIT], inline=False)
            if command is not None:
                embed.add_field(name="Command", value=command.qualified_name, inline=False)
            embed.add_field(name="User", value=f"{context.author.mention}", inline=True)
            # NOTE(review): not every channel type is guaranteed to expose
            # .mention (e.g. DMs) - fall back to its string form.
            channel_label = getattr(context.channel, "mention", None) or str(context.channel)
            embed.add_field(name="Channel", value=channel_label, inline=False)
            embed.add_field(
                name="Guild",
                value=f"{context.guild.name}\n{context.guild.id}" if context.guild else "DM",
                inline=False
            )
            await owner.send(embed=embed)
        except Exception:
            self.logger.exception("Failed to send error message to owner")

    async def find_member(
        self,
        guild: discord.Guild,
        query: str
    ) -> typing.Optional[discord.Member]:
        """Resolve ``query`` to a member of ``guild``.

        Lookup order: exact ID from the member cache, then a gateway member
        query, then the cached member whose username or nickname contains
        ``query`` (case-insensitive) with the highest
        ``len(query) / len(name)`` ratio.

        Returns ``None`` for an empty query or when nothing matches.
        """
        query = query.strip()
        if not query:
            # An empty query cannot identify anyone; do not hand it to the
            # gateway lookup.
            return None

        # isdecimal() only accepts characters int() can parse; isdigit() also
        # accepts e.g. superscript digits, which make int() raise.
        if query.isdecimal():
            member = guild.get_member(int(query))
            if member:
                return member

        members = await guild.query_members(query, limit=1)
        if members:
            return members[0]

        needle = query.lower()
        best_match: typing.Optional[discord.Member] = None
        best_score: float = 0
        for member in guild.members:
            # Username first, then nickname - same precedence on ties as
            # checking them one after the other.
            for candidate in (member.name, member.nick):
                if candidate and needle in candidate.lower():
                    score = len(needle) / len(candidate)
                    if score > best_score:
                        best_match = member
                        best_score = score
        return best_match

    async def find_role(
        self,
        guild: discord.Guild,
        query: str
    ) -> typing.Optional[discord.Role]:
        """Resolve ``query`` to a role of ``guild``.

        A purely numeric query is first treated as an exact role ID. Otherwise
        (or when no role has that ID) the role whose name is most similar to
        ``query`` - case-insensitive ``SequenceMatcher`` ratio - is returned.

        Returns ``None`` for an empty query or when no role name shares
        anything with the query.
        """
        query = query.strip()
        if not query:
            return None

        # Exact ID match only: matching on a substring of the ID would make a
        # short query such as "1" resolve to an unrelated role.
        if query.isdecimal():
            role_by_id = guild.get_role(int(query))
            if role_by_id is not None:
                return role_by_id

        needle = query.lower()
        best_match: typing.Optional[discord.Role] = None
        best_score: float = 0
        for role in guild.roles:
            score = SequenceMatcher(None, needle, role.name.lower()).ratio()
            if score > best_score:
                best_match = role
                best_score = score
        return best_match

    async def on_command(
        self,
        ctx: commands.Context
    ) -> None:
        """Update the usage counters for every invoked command."""
        self.command_stats[ctx.command.qualified_name] += 1
        message: discord.Message = ctx.message
        if isinstance(message.channel, discord.TextChannel):
            self.command_types_used[message.channel.type] += 1
        self.logger.info(f"Command '{ctx.command.qualified_name}' invoked")

    async def close(
        self
    ) -> None:
        """Shut the bot down and release the HTTP session and database client."""
        try:
            await super().close()
        finally:
            # Release our own resources even if the library shutdown raised.
            if self.session is not None and not self.session.closed:
                await self.session.close()
            self.db.close()
        self.logger.info("Bot closed successfully")

    async def start(
        self,
        token: Optional[str] = None,
        *,
        reconnect: bool = True
    ) -> None:
        """Log in and run the bot until the connection is closed.

        Args:
            token: Bot token. When omitted, the ``TOKEN`` environment variable
                is used (after loading ``.env``).
            reconnect: Passed through to the parent ``start``.

        The parameters mirror the parent ``start`` so callers that pass a
        token explicitly keep working; calling ``start()`` with no arguments
        behaves as before.
        """
        load_dotenv()
        # Logged up front: the awaited call below only returns once the bot
        # has stopped running.
        self.logger.info("Starting bot")
        await super().start(token or os.getenv('TOKEN'), reconnect=reconnect)