|
| 1 | +# Copyright (c) 2024 iiPython |
| 2 | + |
| 3 | +# Modules |
| 4 | +import sys |
| 5 | +import typing |
| 6 | +from time import time |
| 7 | +from base64 import b64encode |
| 8 | +from urllib.parse import quote_plus |
| 9 | +from platform import python_version |
| 10 | + |
| 11 | +from requests import Session |
| 12 | + |
| 13 | +from nightwatch import __version__ |
| 14 | +from nightwatch.bot import Client, Context |
| 15 | +from nightwatch.bot.client import AuthorizationFailed |
| 16 | +from nightwatch.logging import log |
| 17 | + |
| 18 | +# Create client |
| 19 | +class NextgenerationBot(Client): |
| 20 | + def __init__(self) -> None: |
| 21 | + super().__init__() |
| 22 | + |
| 23 | + # Extra data |
| 24 | + self.send_on_join = None |
| 25 | + self.session, self.cache = Session(), {"time": 0, "token": None} |
| 26 | + |
| 27 | + # Handle now playing |
| 28 | + def get_spotify_access(self) -> None: |
| 29 | + with self.session.post( |
| 30 | + "https://accounts.spotify.com/api/token", |
| 31 | + data = "grant_type=client_credentials", |
| 32 | + headers = { |
| 33 | + "Authorization": f"Basic {b64encode(b'3f974573800a4ff5b325de9795b8e603:ff188d2860ff44baa57acc79c121a3b9').decode()}", |
| 34 | + "Content-Type": "application/x-www-form-urlencoded" |
| 35 | + } |
| 36 | + ) as response: |
| 37 | + self.cache["time"] = time() |
| 38 | + self.cache["token"] = response.json()["access_token"] |
| 39 | + |
| 40 | + log.info("test_bot", "Spotify access token has been regenerated!") |
| 41 | + |
| 42 | + def get_now_playing(self) -> tuple[dict | None, str | None]: |
| 43 | + |
| 44 | + # Should we regenerate our spotify token? |
| 45 | + if (time() - self.cache["time"]) >= 3550: # Not 3600, just for some wiggle room |
| 46 | + self.get_spotify_access() |
| 47 | + |
| 48 | + # Fetch actual data |
| 49 | + with self.session.get("https://api.listenbrainz.org/1/user/iiPython/playing-now") as response: |
| 50 | + result = (response.json())["payload"]["listens"] |
| 51 | + |
| 52 | + if not result: |
| 53 | + return None, None |
| 54 | + |
| 55 | + result = result[0] |
| 56 | + |
| 57 | + # Reorganize the result data |
| 58 | + tm = result["track_metadata"] |
| 59 | + result = {"artist": tm["artist_name"], "track": tm["track_name"], "album": tm["release_name"],} |
| 60 | + with self.session.get( |
| 61 | + f"https://api.spotify.com/v1/search?q={quote_plus(f'{result['artist']} {result['album']}')}&type=album&limit=1", |
| 62 | + headers = { |
| 63 | + "Authorization": f"Bearer {self.cache['token']}", |
| 64 | + "Content-Type": "application/x-www-form-urlencoded" |
| 65 | + } |
| 66 | + ) as response: |
| 67 | + return result, (response.json())["albums"]["items"][0]["images"][-2]["url"] |
| 68 | + |
| 69 | + # Handle rejoining (for changing name or hex) |
| 70 | + async def rejoin(self, username: typing.Optional[str] = None, hex: typing.Optional[str] = None) -> None: |
| 71 | + await self.close() |
| 72 | + await self.event_loop(username or self.user.name, hex or self.user.hex, self.address) # type: ignore |
| 73 | + |
| 74 | + # Handle events |
| 75 | + async def on_connect(self, ctx: Context) -> None: |
| 76 | + log.info("test_bot", f"Connected to {ctx.rics.name}!") |
| 77 | + |
| 78 | + async def on_message(self, ctx: Context) -> None: |
| 79 | + if self.send_on_join is not None: |
| 80 | + await ctx.send(self.send_on_join) |
| 81 | + self.send_on_join = None |
| 82 | + return |
| 83 | + |
| 84 | + command = ctx.message.message |
| 85 | + if command[:2] != "p!": |
| 86 | + return |
| 87 | + |
| 88 | + log.info("test_bot", f"'{ctx.user.name}' ran '{command[2:]}'!") |
| 89 | + match command[2:].split(" "): |
| 90 | + case ["help"]: |
| 91 | + await ctx.reply("Commands: p!help, p!eval, p!music, p!user, p!people, p!rename, p!set-hex, p!version") |
| 92 | + |
| 93 | + case ["eval", *expression]: |
| 94 | + to_evaluate = " ".join(expression).strip("`") |
| 95 | + if not (to_evaluate.strip() and to_evaluate != expression): |
| 96 | + return await ctx.reply("Nothing was specified to run.") |
| 97 | + |
| 98 | + try: |
| 99 | + return await ctx.reply(str(eval(to_evaluate, {"ctx": ctx}))) |
| 100 | + |
| 101 | + except Exception as e: |
| 102 | + return await ctx.reply(str(e)) |
| 103 | + |
| 104 | + case ["music"]: |
| 105 | + data, image = self.get_now_playing() |
| 106 | + if not (data and image): |
| 107 | + return await ctx.reply("iiPython isn't listening to anything right now.") |
| 108 | + |
| 109 | + await ctx.send(f"iiPython is listening to **[{data['track']}](https://www.last.fm/music/{data['artist']}/_/{data['track']})** by **[{data['artist']}](https://www.last.fm/music/{data['artist']})** (on **[{data['album']}](https://www.last.fm/music/{data['artist']}/{data['album']})**).") |
| 110 | + await ctx.send(f"![{data['track']} by {data['artist']} cover art]({image})") |
| 111 | + |
| 112 | + case ["user", *username]: |
| 113 | + client = next(filter(lambda u: u.name == " ".join(username), ctx.rics.users), None) |
| 114 | + if client is None: |
| 115 | + return await ctx.reply("Specified user doesn't *fucking* exist.") |
| 116 | + |
| 117 | + await ctx.send(f"**Name:** {'🤖 ' if client.bot else '★ ' if client.admin else ''}{client.name} | **HEX Code:** #{client.hex}") |
| 118 | + |
| 119 | + case ["rename" | "set-hex" as command, *response]: |
| 120 | + try: |
| 121 | + if not response: |
| 122 | + return await ctx.reply("Are you gonna specify a value or what?") |
| 123 | + |
| 124 | + log.info("test_bot", f"Reauthenticated using name '{self.user.name}' and HEX code #{self.user.hex}!") # type: ignore |
| 125 | + await self.rejoin( |
| 126 | + " ".join(response) if command == "rename" else None, |
| 127 | + response[0] if command == "set-hex" else None |
| 128 | + ) |
| 129 | + |
| 130 | + except AuthorizationFailed as problem: |
| 131 | + if problem.json is not None: |
| 132 | + message = (problem.json.get("message") or problem.json["detail"][0]["msg"]).rstrip(".").lower() |
| 133 | + self.send_on_join = f"Failed to switch {'username' if command == 'rename' else 'hex code'} because '{message}'." |
| 134 | + |
| 135 | + log.error("test_bot", "Failed to switch name or hex code!") # type: ignore |
| 136 | + await self.event_loop(self.user.name, self.user.hex, self.address) # type: ignore |
| 137 | + |
| 138 | + case ["people"]: |
| 139 | + await ctx.send(f"There are {len(ctx.rics.users)} users: {', '.join(f'{u.name}{f' ({'admin' if u.admin else 'bot'})' if u.admin or u.bot else ''}' for u in ctx.rics.users)}") |
| 140 | + |
| 141 | + case ["version"]: |
| 142 | + await ctx.reply(f"Running on Nightwatch v{__version__} using Python {python_version()}.") |
| 143 | + |
| 144 | + case _: |
| 145 | + log.warn("test_bot", "Invalid command was ran, see above.") |
| 146 | + await ctx.reply("I have **no idea** what the *fuck* you just asked...") |
| 147 | + |
| 148 | +NextgenerationBot().run( |
| 149 | + username = "Prism", |
| 150 | + hex = "126bf1", |
| 151 | + address = sys.argv[1] |
| 152 | +) |
0 commit comments