-
Notifications
You must be signed in to change notification settings - Fork 0
/
song_bot.py
261 lines (215 loc) · 8.64 KB
/
song_bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
#-*-coding:utf-8-*-
import argparse
import asyncio
import json
import os
import discord
from discord.ext import commands
ffmpeg_options = {
'options': '-vn'
}
class MusicBot(commands.Cog):
def __init__(self, bot, db):
self.bot = bot
self.song_parent = db["files_dir"]
print("Music files will be loaded from {}.".format(self.song_parent))
self.song_database = db["songs"]
print("songs: ", end="")
for x in self.song_database:
print(x["song_keys"][0], ", ", end="")
print("")
self.playlist = []
self.current = "#NA"
self. ctx = None
self.volume = 0.2
self.parser = argparse.ArgumentParser()
self.parser.add_argument("--artist", action="store", nargs="+")
self.parser.add_argument("--song", action="store", nargs="+")
self.parser.add_argument("--album", action="store", nargs="+")
self.parser.add_argument("--random", action="store_true")
self.parser.add_argument("--all", action="store_true")
@commands.command()
async def join(self, ctx, *, channel: discord.VoiceChannel):
if ctx.voice_client is not None:
return await ctx.voice_client.move_to(channel)
self.ctx = ctx
await channel.connect()
def parse(self, query):
play_args = self.parser.parse_args(query.split(" "))
target_songs = []
# process args!
if "artist" in play_args and play_args.artist is not None:
print("Query artists: ", end="")
print(play_args.artist)
for artist in play_args.artist:
artist_clean = artist.lower().replace(" ", "") # sanitize
tmp = [x for x in self.song_database if artist_clean in x["artist_keys"]]
target_songs = [*target_songs, *tmp]
if "album" in play_args and play_args.album is not None:
print("Query album: ", end="")
print(play_args.album)
for album in play_args.album:
album_clean = album.lower().replace(" ", "")
tmp = [x for x in self.song_database if album_clean in x["album_keys"]]
target_songs = [*target_songs, *tmp]
if "song" in play_args and play_args.song is not None:
query_songs = [x.replace("\"", "").replace("\'", "") for x in play_args.song]
print("Query songs: ", end="")
print(query_songs)
if len(target_songs) != 0: # filter!
target_songs_new = []
for song in query_songs:
song_clean = song.lower().replace(" ", "")
tmp = [x for x in target_songs if song_clean in x["song_keys"]]
target_songs_new = [*target_songs_new, *tmp]
target_songs = target_songs_new
else:
for song in query_songs:
song_clean = song.lower().replace(" ", "")
tmp = [x for x in self.song_database if song_clean in x["song_keys"]]
target_songs = [*target_songs, *tmp]
return target_songs
@commands.command()
async def add(self, ctx, *, query):
self.ctx = ctx
target_songs = self.parse(query)
self.playlist = [*self.playlist, *target_songs]
@commands.command()
async def play(self, ctx, *, query):
self.ctx = ctx
print("Incoming query: ", query)
target_songs = self.parse(query)
self.playlist = [*self.playlist, *target_songs]
print("Current playlist: ", end="")
for x in self.playlist:
print(" {} by {}".format(x["song_keys"][0], x["artist_keys"][0]))
if ctx.voice_client.is_playing():
ctx.voice_client.stop()
else:
await self.play_song(None)
async def play_song(self, error):
if error is not None:
print("Error: {}".format(e))
return
print("Checking playlist...")
if len(self.playlist) == 0:
await self.ctx.send("Playlist empty")
self.is_playing = False
return
print("looking for the file")
target_file_path = os.path.join(self.song_parent, self.playlist[0]["filename"])
target_song = self.playlist[0]["song_keys"][0]
target_artist = self.playlist[0]["artist_keys"][0]
del self.playlist[0]
await self.ensure_voice(self.ctx)
source = discord.PCMVolumeTransformer(
discord.FFmpegPCMAudio(target_file_path, options="-b:a 128k")
)
source.volume = self.volume
await self.ctx.send("Now playing: {} by {}".format(target_song, target_artist))
self.current = "{} by {}".format(target_song, target_artist)
self.ctx.voice_client.play(
source,
after=self.dispatch_play_song
)
print("Now playing: {} by {}".format(target_song, target_artist))
def dispatch_play_song(self, e):
print("dispatch!")
if e is not None:
print("Error: ", end="")
print(e)
return
# asyncio.run(self.play_song(None), self.bot.loop)
coro = self.play_song(None)
fut = asyncio.run_coroutine_threadsafe(coro, self.bot.loop)
try:
print("fut.result")
fut.result()
except:
pass
return
@commands.command()
async def pause(self, ctx):
if ctx.voice_client.is_playing():
ctx.voice_client.pause()
@commands.command()
async def resume(self, ctx):
if ctx.voice_client.is_paused():
ctx.voice_client.resume()
@commands.command()
async def skip(self, ctx, *, how_many=0):
m_del = int(how_many)
m_del = m_del if m_del <= len(self.playlist) else len(self.playlist)
for _ in range(m_del):
del self.playlist[0]
# await self.play_song(None)
ctx.voice_client.stop()
@commands.command()
async def show(self, ctx):
to_show = [
"Now playing: " + self.current,
"Songs in playlist: "
]
if len(self.playlist) == 0:
to_show[-1] = "Playlist empty"
for i, song in enumerate(self.playlist):
to_add = "{0:02d}. ".format(i) + song["song_keys"][0] + " by " + song["artist_keys"][0]
to_show.append(to_add)
await ctx.send("\n".join(to_show))
@commands.command()
async def quit(self, ctx):
self.playlist = []
ctx.voice_client.stop()
await ctx.voice_client.disconnect()
@commands.command()
async def q(self, ctx):
await self.quit(ctx)
@commands.command()
async def volume(self, ctx, volume: int):
if ctx.voice_client is None:
return await ctx.send("Not connected to a voice channel.")
self.volume = volume / 100
if ctx.voice_client.source is not None:
ctx.voice_client.source.volume = self.volume
await ctx.send("Changed volume to {}%".format(volume))
# これわるさしてるかも
@play.before_invoke
async def ensure_voice(self, ctx):
if ctx.voice_client is None:
if ctx.author.voice:
await ctx.author.voice.channel.connect()
else:
await ctx.send("You are not connected to a voice channel.")
raise commands.CommandError("Author not connected to a voice channel.")
elif ctx.voice_client.is_playing(): #こいつだああああああああああああああああああああああああ
pass
def main(args):
print("Initializing...")
if "token_file" in args:
with open(args.token_file, "r") as f:
tmp = json.load(f)
token = tmp["token"]
elif "token" in args:
token = args.token
print("Token loaded.")
# load database
with open(args.database, "r") as f:
db = json.load(f)
bot = commands.Bot(
command_prefix=commands.when_mentioned_or(">", "!"),
description='Plays local music file in voice channel'
)
@bot.event
async def on_ready():
print("Logged in as {0} ({0.id})".format(bot.user))
print('------')
bot.add_cog(MusicBot(bot, db))
bot.run(token)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
token_group = parser.add_mutually_exclusive_group(required=True)
token_group.add_argument("--token_file", action="store", help="json file which has discord bot token")
token_group.add_argument("--token", action="store", help="bot token")
parser.add_argument("--database", "-d", action="store", help="json file which has song file path and info", required=True)
args = parser.parse_args()
main(args)