Skip to content

Commit 56fab03

Browse files
committed
Harden Spotify API interaction
1 parent 4781ff4 commit 56fab03

File tree

11 files changed

+350
-283
lines changed

11 files changed

+350
-283
lines changed

Cargo.lock

Lines changed: 128 additions & 85 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bot/Cargo.toml

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,26 +11,27 @@ argwerk = "0.20.0"
1111
eudex = "0.1.1"
1212
chrono = { version = "0.4.19", features = ["serde"] }
1313
chrono-tz = { version = "0.5.3", features = ["serde"] }
14-
mysql_async = "0.26.0"
14+
mysql_async = "0.27.0"
1515
diesel = { version = "1.4.5", features = ["sqlite", "chrono"] }
1616
diesel_migrations = "1.4.0"
1717
# set the bundled feature to use the bundled libsqlite3
1818
libsqlite3-sys = { version = "0.17.3", features = ["bundled", "unlock_notify"] }
19-
log = "0.4.13"
19+
log = "0.4.14"
2020
relative-path = { version = "1.3.2", features = ["serde"] }
21-
serde = { version = "1.0.120", features = ["rc"] }
22-
serde_yaml = "0.8.15"
23-
serde_json = "1.0.61"
21+
serde = { version = "1.0.123", features = ["rc"] }
22+
serde_yaml = "0.8.17"
23+
serde_json = "1.0.62"
2424
serde_cbor = "0.11.1"
2525
serde_urlencoded = "0.7.0"
2626
reqwest = "0.11.0"
2727
Inflector = "0.11.4"
2828
warp = "0.3.0"
29-
tokio = { version = "1.1.0", features = ["full"] }
30-
tokio-tungstenite = { version = "0.13.0", features = ["tls"] }
31-
tokio-stream = "0.1.2"
29+
tokio = { version = "1.2.0", features = ["full"] }
30+
tokio-tungstenite = { version = "0.14.0", features = ["rustls-tls"] }
31+
tungstenite = "0.13.0"
32+
tokio-stream = "0.1.3"
3233
url = { version = "2.2.0", features = ["serde"] }
33-
rand = "0.8.2"
34+
rand = "0.8.3"
3435
fixed-map = "0.7.1"
3536
log4rs = "1.0.0"
3637
handlebars = "3.5.2"
@@ -42,7 +43,7 @@ bytes = "1.0.1"
4243
uuid = { version = "0.8.2", features = ["serde", "v4"] }
4344
smallvec = { version = "1.6.1", features = ["serde"] }
4445
dirs = "3.0.1"
45-
backoff = "0.2.1"
46+
backoff = "0.3.0"
4647
rust-embed = { version = "5.9.0", features = ["interpolate-folder-path"] }
4748
mime = "0.3.16"
4849
mime_guess = "2.0.3"
@@ -62,14 +63,13 @@ regex = "1.4.3"
6263
backtrace = "0.3.56"
6364
futures-util = "0.3.12"
6465
futures-core = "0.3.12"
65-
tracing = "0.1.22"
66+
tracing = "0.1.23"
6667
tracing-core = "0.1.17"
6768
tracing-futures = { version = "0.2.4", default-features = false, features = ["std-future"] }
6869
slab = "0.4.2"
6970
irc = "0.15.0"
7071
ignore = "0.4.17"
71-
notify = "5.0.0-pre.4"
72-
tungstenite = "0.12.0"
72+
notify = "5.0.0-pre.5"
7373

7474
runestick = { version = "0.8.0", optional = true }
7575
rune = { version = "0.8.0", features = ["diagnostics"], optional = true }

bot/src/api/spotify/mod.rs

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ pub use self::model::user::PrivateUser;
1212
use crate::api::RequestBuilder;
1313
use crate::oauth2;
1414
use crate::prelude::*;
15+
use crate::spotify_id::SpotifyId;
1516
use anyhow::Result;
1617
use bytes::Bytes;
1718
use reqwest::{header, Client, Method, StatusCode};
@@ -22,6 +23,7 @@ use url::Url;
2223
mod model;
2324

2425
const API_URL: &str = "https://api.spotify.com/v1";
26+
const DEFAULT_LIMIT: usize = 50;
2527

2628
/// API integration.
2729
#[derive(Clone, Debug)]
@@ -56,9 +58,10 @@ impl Spotify {
5658
}
5759

5860
/// Get my playlists.
59-
pub async fn playlist(&self, id: String, market: Option<&str>) -> Result<FullPlaylist> {
61+
pub async fn playlist(&self, id: SpotifyId, market: Option<&str>) -> Result<FullPlaylist> {
6062
let req = self
61-
.request(Method::GET, &["playlists", id.as_str()])
63+
.request(Method::GET, &["playlists", id.to_string().as_str()])
64+
.query_param("limit", &DEFAULT_LIMIT.to_string())
6265
.optional_query_param("market", market);
6366

6467
req.execute().await?.json()
@@ -190,14 +193,11 @@ impl Spotify {
190193

191194
/// Get my songs.
192195
pub async fn my_tracks(&self) -> Result<Page<SavedTrack>> {
193-
let req = self.request(Method::GET, &["me", "tracks"]);
194-
req.execute().await?.json()
195-
}
196+
let req = self
197+
.request(Method::GET, &["me", "tracks"])
198+
.query_param("limit", &DEFAULT_LIMIT.to_string());
196199

197-
/// Get my songs.
198-
pub fn my_tracks_stream(&self) -> PageStream<SavedTrack> {
199-
let req = self.request(Method::GET, &["me", "tracks"]);
200-
self.page_stream(async move { req.execute().await?.json() })
200+
req.execute().await?.json()
201201
}
202202

203203
/// Get the full track by ID.
@@ -252,6 +252,7 @@ fn device_control<C>(status: StatusCode, _: &C) -> Result<Option<bool>> {
252252
}
253253
}
254254

255+
/// A page converted into a stream which will perform pagination under the hood.
255256
pub struct PageStream<T> {
256257
client: Client,
257258
token: oauth2::SyncToken,
@@ -283,15 +284,16 @@ where
283284
None => return Poll::Ready(None),
284285
};
285286

286-
if let Poll::Ready(page) = future.as_mut().poll(cx)? {
287-
self.as_mut().next = match page.next.map(|s| str::parse(s.as_str())).transpose()? {
288-
Some(next) => Some(Box::pin(self.next_page(next))),
289-
None => None,
290-
};
287+
let page = match future.as_mut().poll(cx)? {
288+
Poll::Ready(page) => page,
289+
Poll::Pending => return Poll::Pending,
290+
};
291291

292-
return Poll::Ready(Some(Ok(page.items)));
293-
}
292+
self.as_mut().next = match page.next.map(|s| str::parse(s.as_str())).transpose()? {
293+
Some(next) => Some(Box::pin(self.next_page(next))),
294+
None => None,
295+
};
294296

295-
Poll::Pending
297+
Poll::Ready(Some(Ok(page.items)))
296298
}
297299
}

bot/src/macros.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,13 +108,13 @@ macro_rules! retry_until_ok {
108108

109109
loop {
110110
log::info!("{}", $id);
111-
let res = async { $($f)* }.await;
111+
let res: anyhow::Result<_> = async { $($f)* }.await;
112112

113113
match res {
114114
Ok(output) => break output,
115115
Err(e) => {
116116
let duration = backoff.next();
117-
log_warn!(e, "\"{}\" failed, trying again in {:?}", $id, duration);
117+
log_warn!(e, "{} failed, trying again in {:?}", $id, duration);
118118
tokio::time::sleep(duration).await;
119119
}
120120
}

bot/src/main.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ const PACKAGE: &str = env!("CARGO_PKG_NAME");
4242
#[cfg(feature = "windows")]
4343
mod internal {
4444
use super::FILE;
45-
use log4rs::config::{Config, ConfigBuilder, Logger, LoggerBuilder, Root, RootBuilder};
45+
use log4rs::config::runtime::{ConfigBuilder, LoggerBuilder, RootBuilder};
46+
use log4rs::config::{Config, Logger, Root};
4647

4748
pub(crate) fn logger_builder() -> LoggerBuilder {
4849
Logger::builder().appender(FILE).additive(false)

bot/src/player/connect.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,9 +81,9 @@ pub(super) async fn setup(
8181

8282
#[derive(Debug, Error)]
8383
pub(super) enum ConnectError {
84-
#[error("no device configured or available")]
85-
NoDevice,
86-
#[error("error when issuing {0} command")]
84+
#[error("{0}: no device configured or available")]
85+
NoDevice(&'static str),
86+
#[error("{0}: error")]
8787
Error(&'static str, #[source] Error),
8888
}
8989

@@ -92,7 +92,7 @@ impl ConnectError {
9292
match result {
9393
Err(e) => Err(ConnectError::Error(what, e.into())),
9494
Ok(true) => Ok(()),
95-
Ok(false) => Err(ConnectError::NoDevice),
95+
Ok(false) => Err(ConnectError::NoDevice(what)),
9696
}
9797
}
9898
}
@@ -171,6 +171,7 @@ impl ConnectPlayer {
171171
self.volume(player::ModifyVolume::Set(update)).await
172172
}
173173

174+
/// Modify the volume of the player.
174175
pub(super) async fn volume(&self, modify: player::ModifyVolume) -> Result<u32, ConnectError> {
175176
let volume = self.volume.load().await;
176177
let update = modify.apply(volume);
@@ -181,6 +182,7 @@ impl ConnectPlayer {
181182
Ok(update)
182183
}
183184

185+
/// Get the current volume of the player.
184186
pub(super) async fn current_volume(&self) -> u32 {
185187
self.volume.load().await
186188
}

bot/src/player/item.rs

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,15 @@ pub struct Item {
1414
impl Item {
1515
/// Human readable version of playback item.
1616
pub fn what(&self) -> String {
17-
match self.track {
18-
Track::Spotify { ref track } => {
17+
match &self.track {
18+
Track::Spotify { track } => {
1919
if let Some(artists) = utils::human_artists(&track.artists) {
2020
format!("\"{}\" by {}", track.name, artists)
2121
} else {
2222
format!("\"{}\"", track.name)
2323
}
2424
}
25-
Track::YouTube { ref video } => match video.snippet.as_ref() {
25+
Track::YouTube { video } => match video.snippet.as_ref() {
2626
Some(snippet) => match snippet.channel_title.as_ref() {
2727
Some(channel_title) => {
2828
format!("\"{}\" from \"{}\"", snippet.title, channel_title)
@@ -34,17 +34,11 @@ impl Item {
3434
}
3535
}
3636

37+
/// Test if the given item is playable.
3738
pub fn is_playable(&self) -> bool {
38-
match self.track {
39-
Track::Spotify { ref track } => {
40-
match track.is_playable {
41-
Some(is_playable) => return is_playable,
42-
None => return false,
43-
};
44-
}
45-
Track::YouTube { video: _ } => {
46-
return true;
47-
}
39+
match &self.track {
40+
Track::Spotify { track } => track.is_playable.unwrap_or(true),
41+
Track::YouTube { video: _ } => true,
4842
}
4943
}
5044
}

bot/src/player/playback_future.rs

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,9 @@ impl PlaybackFuture {
3636
}
3737

3838
let (mut fallback_stream, fallback) = settings.stream("fallback-uri").optional().await?;
39-
self.internal
40-
.write()
41-
.await
42-
.update_fallback_items(fallback)
43-
.await;
39+
40+
let configure_fallback = Fuse::new(update_fallback_items_task(&self.internal, fallback));
41+
tokio::pin!(configure_fallback);
4442

4543
let (mut song_stream, song) = injector.stream::<Song>().await;
4644

@@ -63,9 +61,6 @@ impl PlaybackFuture {
6361
_ => None,
6462
}).unwrap_or_default());
6563
}
66-
fallback = fallback_stream.recv() => {
67-
self.internal.write().await.update_fallback_items(fallback).await;
68-
}
6964
/* player */
7065
_ = &mut song_timeout => {
7166
self.internal.write().await.end_of_track().await?;
@@ -89,7 +84,36 @@ impl PlaybackFuture {
8984
event = self.connect_stream.recv() => {
9085
self.internal.write().await.handle_player_event(event).await?;
9186
}
87+
fallback = fallback_stream.recv() => {
88+
configure_fallback.set(Fuse::new(update_fallback_items_task(&self.internal, fallback)));
89+
}
90+
_ = configure_fallback.as_mut() => {
91+
}
9292
}
9393
}
94+
95+
/// Update fallback item tasks.
96+
async fn update_fallback_items_task(
97+
internal: &RwLock<PlayerInternal>,
98+
fallback: Option<Uri>,
99+
) {
100+
let task = retry_until_ok! {
101+
"Loading fallback items", {
102+
let task = internal.read().await.load_fallback_items(fallback.as_ref());
103+
let (what, items) = task.await?;
104+
105+
log::info!(
106+
"Updated fallback queue with {} items from {}.",
107+
items.len(),
108+
what
109+
);
110+
111+
internal.write().await.update_fallback_items(items).await;
112+
Ok(())
113+
}
114+
};
115+
116+
task.await
117+
}
94118
}
95119
}

0 commit comments

Comments
 (0)