-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
4 changed files
with
167 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,159 @@ | ||
# Copyright (c) 2024 iiPython | ||
|
||
# Modules | ||
import sys | ||
import typing | ||
from time import time | ||
from base64 import b64encode | ||
from urllib.parse import quote_plus | ||
from platform import python_version | ||
|
||
from requests import Session | ||
|
||
from nightwatch import __version__ | ||
from nightwatch.bot import Client, Context | ||
from nightwatch.bot.client import AuthorizationFailed | ||
from nightwatch.logging import log | ||
|
||
# Create client | ||
class NextgenerationBot(Client): | ||
def __init__(self) -> None: | ||
super().__init__() | ||
|
||
# Extra data | ||
self.send_on_join = None | ||
self.session, self.cache = Session(), {"time": 0, "token": None} | ||
|
||
# Handle now playing | ||
def get_spotify_access(self) -> None: | ||
with self.session.post( | ||
"https://accounts.spotify.com/api/token", | ||
data = "grant_type=client_credentials", | ||
headers = { | ||
"Authorization": f"Basic {b64encode(b'3f974573800a4ff5b325de9795b8e603:ff188d2860ff44baa57acc79c121a3b9').decode()}", | ||
"Content-Type": "application/x-www-form-urlencoded" | ||
} | ||
) as response: | ||
self.cache["time"] = time() | ||
self.cache["token"] = response.json()["access_token"] | ||
|
||
log.info("test_bot", "Spotify access token has been regenerated!") | ||
|
||
def get_now_playing(self) -> tuple[dict | None, str | None]: | ||
|
||
# Should we regenerate our spotify token? | ||
if (time() - self.cache["time"]) >= 3550: # Not 3600, just for some wiggle room | ||
self.get_spotify_access() | ||
|
||
# Fetch actual data | ||
with self.session.get("https://api.listenbrainz.org/1/user/iiPython/playing-now") as response: | ||
result = (response.json())["payload"]["listens"] | ||
|
||
if not result: | ||
return None, None | ||
|
||
result = result[0] | ||
|
||
# Reorganize the result data | ||
tm = result["track_metadata"] | ||
result = {"artist": tm["artist_name"], "track": tm["track_name"], "album": tm["release_name"],} | ||
with self.session.get( | ||
f"https://api.spotify.com/v1/search?q={quote_plus(f'{result['artist']} {result['album']}')}&type=album&limit=1", | ||
headers = { | ||
"Authorization": f"Bearer {self.cache['token']}", | ||
"Content-Type": "application/x-www-form-urlencoded" | ||
} | ||
) as response: | ||
return result, (response.json())["albums"]["items"][0]["images"][-2]["url"] | ||
|
||
# Handle rejoining (for changing name or hex) | ||
async def rejoin(self, username: typing.Optional[str] = None, hex: typing.Optional[str] = None) -> None: | ||
await self.close() | ||
await self.event_loop(username or self.user.name, hex or self.user.hex, self.address) # type: ignore | ||
|
||
# Handle events | ||
async def on_connect(self, ctx: Context) -> None: | ||
log.info("test_bot", f"Connected to {ctx.rics.name}!") | ||
|
||
async def on_message(self, ctx: Context) -> None: | ||
if self.send_on_join is not None: | ||
await ctx.send(self.send_on_join) | ||
self.send_on_join = None | ||
return | ||
|
||
command = ctx.message.message | ||
if command[:2] != "p!": | ||
return | ||
|
||
log.info("test_bot", f"'{ctx.user.name}' ran '{command[2:]}'!") | ||
match command[2:].split(" "): | ||
case ["help"]: | ||
await ctx.reply("Commands: p!help, p!eval, p!music, p!user, p!people, p!rename, p!set-hex, p!version") | ||
|
||
case ["eval", *expression]: | ||
to_evaluate = " ".join(expression).strip("`").replace("; ", "\n") | ||
if not (to_evaluate.strip() and to_evaluate != expression): | ||
return await ctx.reply("Nothing was specified to run.") | ||
|
||
try: | ||
namespace, lines = {"ctx": ctx, "import": __import__}, to_evaluate.splitlines() | ||
exec(f"async def __exec():\n{'\n'.join(f' {line}' for line in lines[:-1]) + f'\n return {lines[-1]}'}", namespace) | ||
|
||
return_value = await namespace["__exec"]() # type: ignore | ||
if return_value is not None: | ||
await ctx.reply(str(return_value)) | ||
|
||
return | ||
|
||
except Exception as e: | ||
return await ctx.reply(str(e)) | ||
|
||
case ["music"]: | ||
data, image = self.get_now_playing() | ||
if not (data and image): | ||
return await ctx.reply("iiPython isn't listening to anything right now.") | ||
|
||
await ctx.send(f"iiPython is listening to **[{data['track']}](https://www.last.fm/music/{data['artist']}/_/{data['track']})** by **[{data['artist']}](https://www.last.fm/music/{data['artist']})** (on **[{data['album']}](https://www.last.fm/music/{data['artist']}/{data['album']})**).") | ||
await ctx.send(f"![{data['track']} by {data['artist']} cover art]({image})") | ||
|
||
case ["user", *username]: | ||
client = next(filter(lambda u: u.name == " ".join(username), ctx.rics.users), None) | ||
if client is None: | ||
return await ctx.reply("Specified user doesn't *fucking* exist.") | ||
|
||
await ctx.send(f"**Name:** {'🤖 ' if client.bot else '★ ' if client.admin else ''}{client.name} | **HEX Code:** #{client.hex}") | ||
|
||
case ["rename" | "set-hex" as command, *response]: | ||
try: | ||
if not response: | ||
return await ctx.reply("Are you gonna specify a value or what?") | ||
|
||
log.info("test_bot", f"Reauthenticated using name '{self.user.name}' and HEX code #{self.user.hex}!") # type: ignore | ||
await self.rejoin( | ||
" ".join(response) if command == "rename" else None, | ||
response[0] if command == "set-hex" else None | ||
) | ||
|
||
except AuthorizationFailed as problem: | ||
if problem.json is not None: | ||
message = (problem.json.get("message") or problem.json["detail"][0]["msg"]).rstrip(".").lower() | ||
self.send_on_join = f"Failed to switch {'username' if command == 'rename' else 'hex code'} because '{message}'." | ||
|
||
log.error("test_bot", "Failed to switch name or hex code!") # type: ignore | ||
await self.event_loop(self.user.name, self.user.hex, self.address) # type: ignore | ||
|
||
case ["people"]: | ||
await ctx.send(f"There are {len(ctx.rics.users)} users: {', '.join(f'{u.name}{f' ({'admin' if u.admin else 'bot'})' if u.admin or u.bot else ''}' for u in ctx.rics.users)}") | ||
|
||
case ["version"]: | ||
await ctx.reply(f"Running on Nightwatch v{__version__} using Python {python_version()}.") | ||
|
||
case _: | ||
log.warn("test_bot", "Invalid command was ran, see above.") | ||
await ctx.reply("I have **no idea** what the *fuck* you just asked...") | ||
|
||
NextgenerationBot().run( | ||
username = "Prism", | ||
hex = "126bf1", | ||
address = sys.argv[1] | ||
) |