-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #1 from sloth-ontabasco/rewrite
Rewrite
- Loading branch information
Showing
15 changed files
with
505 additions
and
156 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,4 @@ | ||
logs/ | ||
# Byte-compiled / optimized / DLL files | ||
__pycache__/ | ||
*.py[cod] | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,152 @@ | ||
import io | ||
import os | ||
from traceback import TracebackException | ||
import config | ||
import dotenv | ||
import sys | ||
|
||
import logging | ||
from logging.handlers import TimedRotatingFileHandler | ||
|
||
import discord | ||
from discord.ext import commands | ||
from discord import app_commands | ||
|
||
from contextlib import contextmanager, suppress | ||
|
||
logger = logging.getLogger("Bot") | ||
|
||
@contextmanager | ||
def log_setup(): | ||
""" | ||
Context manager that sets up file logging | ||
""" | ||
try: | ||
dotenv.load_dotenv() | ||
logging.getLogger("discord").setLevel(logging.INFO) | ||
logging.getLogger("discord.http").setLevel(logging.INFO) | ||
|
||
logger = logging.getLogger() | ||
logger.setLevel(logging.DEBUG) | ||
dtfmt = "%Y-%m-%d %H:%M:%S" | ||
if not os.path.isdir("logs/"): | ||
os.mkdir("logs/") | ||
|
||
# Add custom logging handlers like rich, maybe in the future?? | ||
handlers = [ | ||
TimedRotatingFileHandler(filename="logs/bot.log", when="d", interval=5), | ||
logging.StreamHandler(sys.stdout) | ||
] | ||
|
||
fmt = logging.Formatter( | ||
"[{asctime}] [{levelname:<7}] {name}: {message}", dtfmt, style="{" | ||
) | ||
|
||
for handler in handlers: | ||
handler.setFormatter(fmt) | ||
logger.addHandler(handler) | ||
|
||
yield | ||
finally: | ||
handlers = logger.handlers[:] | ||
for handler in handlers: | ||
handler.close() | ||
logger.removeHandler(handler) | ||
|
||
class BotTree(app_commands.CommandTree): | ||
""" | ||
Subclass of app_commands.CommandTree to define the behavior for the bot's slash command tree. | ||
Handles thrown errors within the tree and interactions between all commands | ||
""" | ||
|
||
async def log_to_channel(self, interaction: discord.Interaction, err: Exception): | ||
""" | ||
Log error to discord channel defined in config.py | ||
""" | ||
|
||
channel = await interaction.client.fetch_channel(config.DEV_LOGS_CHANNEL) | ||
traceback_txt = "".join(TracebackException.from_exception(err).format()) | ||
file = discord.File( | ||
io.BytesIO(traceback_txt.encode()), | ||
filename=f"{type(err)}.txt" | ||
) | ||
|
||
embed = discord.Embed( | ||
title="Unhandled Exception Alert", | ||
description=f""" | ||
Invoked Channel: {interaction.channel.name} | ||
\nInvoked User: {interaction.user.display_name} | ||
\n```{traceback_txt[2000:].strip()}``` | ||
""" | ||
) | ||
|
||
await channel.send(embed=embed, file=file) | ||
|
||
async def on_error( | ||
self, interaction: discord.Interaction, error: app_commands.AppCommandError | ||
): | ||
"""Handles errors thrown within the command tree""" | ||
try: | ||
await self.log_to_channel(interaction, error) | ||
except Exception as e: | ||
await super().on_error(interaction, e) | ||
|
||
|
||
class IITMBot(commands.AutoShardedBot): | ||
""" | ||
Main bot. invoked in runner (main.py) | ||
""" | ||
|
||
def __init__(self, *args, **kwargs): | ||
super().__init__(*args, **kwargs) | ||
|
||
@classmethod | ||
def _use_default(cls, *args): | ||
""" | ||
Create an instance of IITMBot with base configuration | ||
""" | ||
|
||
intents = discord.Intents.all() | ||
activity = discord.Activity( | ||
type=discord.ActivityType.watching, name=config.DEFAULT_ACTIVITY_TEXT | ||
) | ||
|
||
x = cls( | ||
command_prefix=config.BOT_PREFIX, | ||
intents=intents, | ||
owner_id=config.OWNER_ID, | ||
activity=activity, | ||
tree_cls=BotTree | ||
) | ||
return x | ||
|
||
|
||
async def load_extensions(self, *args): | ||
for filename in os.listdir("cogs/"): | ||
if filename.endswith(".py"): | ||
logger.info(f"Trying to load cogs.{filename[:-3]}") | ||
try: | ||
await self.load_extension(f"cogs.{filename[:-3]}") | ||
logger.info(f"Loaded cogs.{filename[:-3]}") | ||
except Exception as e: | ||
logger.error(f"cogs.{filename[:-3]} failed to load: {e}") | ||
|
||
async def close(self): | ||
""" | ||
Clean exit from discord and aiohttps sessions (maybe for bottle in future?) | ||
""" | ||
for ext in list(self.extensions): | ||
with suppress(Exception): | ||
await self.unload_extension(ext) | ||
|
||
for cog in list(self.cogs): | ||
with suppress(Exception): | ||
await self.remove_cog(cog) | ||
|
||
await super().close() | ||
|
||
async def on_ready(self): | ||
logger.info("Logged in as") | ||
logger.info(f"\tUser: {self.user.name}") | ||
logger.info(f"\tID : {self.user.id}") | ||
logger.info("------") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,158 @@ | ||
import io | ||
import logging | ||
import math | ||
import textwrap | ||
import traceback | ||
import logging | ||
import config | ||
|
||
import discord | ||
from discord.ext.commands.errors import ExtensionNotFound | ||
from discord.ext import commands | ||
|
||
from contextlib import redirect_stdout | ||
|
||
|
||
class Dev(commands.Cog): | ||
def __init__(self, bot): | ||
self.logger = logging.getLogger("Dev") | ||
self.bot = bot | ||
|
||
@commands.Cog.listener() | ||
async def on_ready(self): | ||
self.logger.info("Loaded Dev") | ||
|
||
def cleanup_code(self, content: str): | ||
""" | ||
Remove code-block from eval | ||
""" | ||
if content.startswith("```") and content.endswith("```"): | ||
return "\n".join(content.split("\n")[1:-1]) | ||
|
||
return content.strip("`\n") | ||
|
||
def get_syntax_error(self, e): | ||
if e.text is None: | ||
return f"```py\n{e.__class__.__name__}: {e}\n```" | ||
return f'```py\n{e.text}{"^":>{e.offset}}\n{e.__class__.__name__}: {e}```' | ||
|
||
@commands.is_owner() | ||
@commands.command(pass_context=True, name="eval") | ||
async def eval(self, ctx: commands.Context, *, body: str): | ||
"""Evaluates a code""" | ||
env = { | ||
"bot": self.bot, | ||
"ctx": ctx, | ||
"channel": ctx.channel, | ||
"author": ctx.author, | ||
"guild": ctx.guild, | ||
"message": ctx.message, | ||
"self": self, | ||
"math": math, | ||
} | ||
|
||
env.update(globals()) | ||
|
||
body = self.cleanup_code(body) | ||
stdout = io.StringIO() | ||
|
||
to_compile = f'async def func():\n{textwrap.indent(body, " ")}' | ||
|
||
try: | ||
exec(to_compile, env) | ||
except Exception as e: | ||
return await ctx.send(f"```py\n{e.__class__.__name__}: {e}\n```") | ||
|
||
func = env["func"] | ||
try: | ||
with redirect_stdout(stdout): | ||
ret = await func() | ||
except Exception: | ||
value = stdout.getvalue() | ||
await ctx.send(f"```py\n{value}{traceback.format_exc()}\n```") | ||
else: | ||
value = stdout.getvalue() | ||
try: | ||
await ctx.message.add_reaction("\N{THUMBS UP SIGN}") | ||
except Exception as _: | ||
await ctx.message.add_reaction("\N{THUMBS DOWN SIGN}") | ||
pass | ||
|
||
if ret is None: | ||
self.logger.info(f"Output chars: {len(str(value))}") | ||
if value: | ||
if len(str(value)) >= 2000: | ||
await ctx.send( | ||
f"Returned over 2k chars, sending as file instead.\n" | ||
f"(first 1.5k chars for quick reference)\n" | ||
f"```py\n{value[0:1500]}\n```", | ||
file=discord.File( | ||
io.BytesIO(value.encode()), filename="output.txt" | ||
), | ||
) | ||
else: | ||
await ctx.send(f"```py\n{value}\n```") | ||
else: | ||
self.logger.info(f"Output chars: {len(str(value)) + len(str(ret))}") | ||
self._last_result = ret | ||
if len(str(value)) + len(str(ret)) >= 2000: | ||
await ctx.send( | ||
f"Returned over 2k chars, sending as file instead.\n" | ||
f"(first 1.5k chars for quick reference)\n" | ||
f'```py\n{f"{value}{ret}"[0:1500]}\n```', | ||
file=discord.File( | ||
io.BytesIO(f"{value}{ret}".encode()), filename="output.txt" | ||
), | ||
) | ||
else: | ||
await ctx.send(f"```py\n{value}{ret}\n```") | ||
|
||
@commands.is_owner() | ||
@commands.command(name="reload", hidden=True) | ||
async def reload(self, ctx: commands.Context, *, module_name: str): | ||
"""Reload a module""" | ||
try: | ||
try: | ||
await self.bot.unload_extension(module_name) | ||
except discord.ext.commands.errors.ExtensionNotLoaded as enl: | ||
await ctx.send(f"Module not loaded. Trying to load it.", delete_after=6) | ||
|
||
await self.bot.load_extension(module_name) | ||
await ctx.send("Module Loaded") | ||
|
||
except ExtensionNotFound as enf: | ||
await ctx.send( | ||
f"Module not found. Possibly, wrong module name provided.", | ||
delete_after=10, | ||
) | ||
except Exception as e: | ||
self.logger.error("Unable to load module.") | ||
self.logger.error("{}: {}".format(type(e).__name__, e)) | ||
|
||
@commands.command(hidden=True) | ||
async def kill(self, ctx: commands.Context): | ||
"""Kill the bot""" | ||
await ctx.send("Bravo 6, going dark o7") | ||
await self.bot.close() | ||
|
||
@commands.command() | ||
@commands.is_owner() | ||
async def sync_apps(self, ctx: commands.Context): | ||
|
||
await ctx.bot.tree.sync(guild=discord.Object(config.PRIMARY_GUILD_ID)) | ||
await ctx.reply("Synced local guild commands") | ||
|
||
@commands.command() | ||
@commands.is_owner() | ||
async def clear_apps(self, ctx: commands.Context): | ||
|
||
ctx.bot.tree.clear_commands(guild=discord.Object(config.PRIMARY_GUILD_ID)) | ||
ctx.bot.tree.clear_commands(guild=None) | ||
await ctx.bot.tree.sync(guild=discord.Object(config.PRIMARY_GUILD_ID)) | ||
await ctx.bot.tree.sync() | ||
|
||
await ctx.send("cleared all commands") | ||
|
||
|
||
async def setup(bot): | ||
await bot.add_cog(Dev(bot)) |
Oops, something went wrong.