Skip to content

Commit

Permalink
update: log database query times and record upsert counts
Browse files Browse the repository at this point in the history
  • Loading branch information
Roardom committed Jul 17, 2024
1 parent 534b8c7 commit 3b1a642
Showing 1 changed file with 39 additions and 10 deletions.
49 changes: 39 additions & 10 deletions src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ pub async fn flush(tracker: &Arc<Tracker>) {
/// Send history updates to mysql database
async fn flush_history_updates(tracker: &Arc<Tracker>) {
let history_update_batch = tracker.history_updates.lock().take_batch();
let start = Utc::now();
let len = history_update_batch.len();
let result = history_update_batch
.flush_to_db(
&tracker.pool,
Expand All @@ -55,11 +57,14 @@ async fn flush_history_updates(tracker: &Arc<Tracker>) {
},
)
.await;
let elapsed = Utc::now().signed_duration_since(start).num_milliseconds();

match result {
Ok(_) => (),
Ok(_) => {
println!("{start} - Upserted {len} histories in {elapsed} ms.");
}
Err(e) => {
println!("History update failed: {}", e);
println!("{start} - Failed to update {len} histories after {elapsed} ms: {e}");
tracker
.history_updates
.lock()
Expand All @@ -71,12 +76,17 @@ async fn flush_history_updates(tracker: &Arc<Tracker>) {
/// Send peer updates to mysql database
async fn flush_peer_updates(tracker: &Arc<Tracker>) {
let peer_update_batch = tracker.peer_updates.lock().take_batch();
let start = Utc::now();
let len = peer_update_batch.len();
let result = peer_update_batch.flush_to_db(&tracker.pool, ()).await;
let elapsed = Utc::now().signed_duration_since(start).num_milliseconds();

match result {
Ok(_) => (),
Ok(_) => {
println!("{start} - Upserted {len} peers in {elapsed} ms.");
}
Err(e) => {
println!("Peer update failed: {}", e);
println!("{start} - Failed to update {len} peers after {elapsed} ms: {e}");
tracker.peer_updates.lock().upsert_batch(peer_update_batch);
}
}
Expand All @@ -85,12 +95,17 @@ async fn flush_peer_updates(tracker: &Arc<Tracker>) {
/// Send torrent updates to mysql database
async fn flush_torrent_updates(tracker: &Arc<Tracker>) {
let torrent_update_batch = tracker.torrent_updates.lock().take_batch();
let start = Utc::now();
let len = torrent_update_batch.len();
let result = torrent_update_batch.flush_to_db(&tracker.pool, ()).await;
let elapsed = Utc::now().signed_duration_since(start).num_milliseconds();

match result {
Ok(_) => (),
Ok(_) => {
println!("{start} - Upserted {len} torrents in {elapsed} ms.");
}
Err(e) => {
println!("Torrent update failed: {}", e);
println!("{start} - Failed to update {len} torrents after {elapsed} ms: {e}");
tracker
.torrent_updates
.lock()
Expand All @@ -102,12 +117,17 @@ async fn flush_torrent_updates(tracker: &Arc<Tracker>) {
/// Send user updates to mysql database
async fn flush_user_updates(tracker: &Arc<Tracker>) {
let user_update_batch = tracker.user_updates.lock().take_batch();
let start = Utc::now();
let len = user_update_batch.len();
let result = user_update_batch.flush_to_db(&tracker.pool, ()).await;
let elapsed = Utc::now().signed_duration_since(start).num_milliseconds();

match result {
Ok(_) => (),
Ok(_) => {
println!("{start} - Upserted {len} users in {elapsed} ms.");
}
Err(e) => {
println!("User update failed: {}", e);
println!("{start} - Failed to update {len} users after {elapsed} ms: {e}");
tracker.user_updates.lock().upsert_batch(user_update_batch);
}
}
Expand All @@ -116,12 +136,17 @@ async fn flush_user_updates(tracker: &Arc<Tracker>) {
/// Send announce updates to mysql database
async fn flush_announce_updates(tracker: &Arc<Tracker>) {
let announce_update_batch = tracker.announce_updates.lock().take_batch();
let start = Utc::now();
let len = announce_update_batch.len();
let result = announce_update_batch.flush_to_db(&tracker.pool).await;
let elapsed = Utc::now().signed_duration_since(start).num_milliseconds();

match result {
Ok(_) => (),
Ok(_) => {
println!("{start} - Upserted {len} announces in {elapsed} ms.");
}
Err(e) => {
println!("Announce update failed: {}", e);
println!("{start} - Failed to update {len} announces after {elapsed} ms: {e}");
tracker
.announce_updates
.lock()
Expand Down Expand Up @@ -259,6 +284,10 @@ impl<'a, K, V> Batch<K, V> {
fn values(&'a self) -> Values<'a, K, V> {
self.0.values()
}

fn len(&self) -> usize {
self.0.len()
}
}

pub trait Flushable<T> {
Expand Down

0 comments on commit 3b1a642

Please sign in to comment.