refactor: modularize queue logic
Cleaner code.
Roardom committed Jul 14, 2024
1 parent c8c5df7 commit 9a5b912
Showing 8 changed files with 249 additions and 323 deletions.
src/announce.rs (2 changes: 1 addition & 1 deletion)

@@ -32,7 +32,7 @@ use crate::{
     },
     scheduler::{
         announce_update::AnnounceUpdate, history_update::HistoryUpdate, peer_update::PeerUpdate,
-        torrent_update::TorrentUpdate, user_update::UserUpdate,
+        torrent_update::TorrentUpdate, user_update::UserUpdate, Upsertable,
     },
     warning::AnnounceWarning,
 };
src/main.rs (8 changes: 4 additions & 4 deletions)

@@ -64,10 +64,10 @@ async fn main() -> Result<()> {
         let mut flushes = 0;
 
         while flushes < max_flushes
-            && (tracker_clone2.history_updates.lock().len() > 0
-                || tracker_clone2.peer_updates.lock().len() > 0
-                || tracker_clone2.torrent_updates.lock().len() > 0
-                || tracker_clone2.user_updates.lock().len() > 0)
+            && (tracker_clone2.history_updates.lock().is_not_empty()
+                || tracker_clone2.peer_updates.lock().is_not_empty()
+                || tracker_clone2.torrent_updates.lock().is_not_empty()
+                || tracker_clone2.user_updates.lock().is_not_empty())
         {
             scheduler::flush(&tracker_clone2.clone()).await;
             flushes += 1;
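Note: the switch from `.len() > 0` to `.is_not_empty()` follows from the refactor below. The old per-module queues exposed their inner `IndexMap` through `Deref` (removed in src/scheduler/history_update.rs), while the new generic `Queue<K, V>` added in src/scheduler.rs keeps its `records` field private. A minimal, self-contained sketch of that encapsulation; this toy `Queue` is a stand-in for illustration, not the tracker's real type:

use indexmap::IndexMap;

// Toy stand-in for the new scheduler::Queue, only to show the call-site
// change: `records` is private, so callers go through is_not_empty()
// instead of reaching into the map as the old Deref-based queues allowed.
struct Queue<K, V> {
    records: IndexMap<K, V>,
}

impl<K, V> Queue<K, V> {
    fn is_not_empty(&self) -> bool {
        self.records.len() != 0
    }
}

fn main() {
    let queue: Queue<u32, String> = Queue {
        records: IndexMap::new(),
    };
    assert!(!queue.is_not_empty());
}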
src/scheduler.rs (103 changes: 98 additions & 5 deletions)

@@ -1,4 +1,4 @@
-use std::sync::Arc;
+use std::{cmp::min, hash::Hash, sync::Arc};
 
 pub mod announce_update;
 pub mod history_update;
@@ -8,9 +8,13 @@ pub mod user_update;
 
 use crate::tracker::Tracker;
 use chrono::{Duration, Utc};
+use indexmap::{map::Values, IndexMap};
 use sqlx::MySqlPool;
 use tokio::join;
+use torrent_update::TorrentUpdate;
+
+use self::history_update::HistoryUpdateExtraBindings;
 
 pub async fn handle(tracker: &Arc<Tracker>) {
     let mut interval = tokio::time::interval(std::time::Duration::from_secs(1));
     let mut counter = 0_u64;
@@ -46,7 +50,9 @@ async fn flush_history_updates(tracker: &Arc<Tracker>) {
     let result = history_update_batch
         .flush_to_db(
             &tracker.pool,
-            tracker.config.active_peer_ttl + tracker.config.peer_expiry_interval,
+            HistoryUpdateExtraBindings {
+                seedtime_ttl: tracker.config.active_peer_ttl + tracker.config.peer_expiry_interval,
+            },
         )
         .await;
 
@@ -65,7 +71,7 @@ async fn flush_history_updates(tracker: &Arc<Tracker>) {
 /// Send peer updates to mysql database
 async fn flush_peer_updates(tracker: &Arc<Tracker>) {
     let peer_update_batch = tracker.peer_updates.lock().take_batch();
-    let result = peer_update_batch.flush_to_db(&tracker.pool).await;
+    let result = peer_update_batch.flush_to_db(&tracker.pool, ()).await;
 
     match result {
         Ok(_) => (),
@@ -79,7 +85,7 @@ async fn flush_peer_updates(tracker: &Arc<Tracker>) {
 /// Send torrent updates to mysql database
 async fn flush_torrent_updates(tracker: &Arc<Tracker>) {
     let torrent_update_batch = tracker.torrent_updates.lock().take_batch();
-    let result = torrent_update_batch.flush_to_db(&tracker.pool).await;
+    let result = torrent_update_batch.flush_to_db(&tracker.pool, ()).await;
 
     match result {
         Ok(_) => (),
@@ -96,7 +102,7 @@ async fn flush_torrent_updates(tracker: &Arc<Tracker>) {
 /// Send user updates to mysql database
 async fn flush_user_updates(tracker: &Arc<Tracker>) {
     let user_update_batch = tracker.user_updates.lock().take_batch();
-    let result = user_update_batch.flush_to_db(&tracker.pool).await;
+    let result = user_update_batch.flush_to_db(&tracker.pool, ()).await;
 
     match result {
         Ok(_) => (),
@@ -182,3 +188,90 @@ pub async fn reap(tracker: &Arc<Tracker>) {
         }
     }
 }
+
+pub struct Queue<K, V> {
+    records: IndexMap<K, V>,
+    config: QueueConfig,
+}
+
+pub struct QueueConfig {
+    pub max_bindings_per_flush: usize,
+    pub bindings_per_record: usize,
+    pub extra_bindings_per_flush: usize,
+}
+
+impl QueueConfig {
+    fn max_batch_size(&mut self) -> usize {
+        (self.max_bindings_per_flush - self.extra_bindings_per_flush) / self.bindings_per_record
+    }
+}
+
+impl<K, V> Queue<K, V>
+where
+    K: Hash + Eq,
+    V: Clone,
+    Queue<K, V>: Upsertable<V>,
+{
+    /// Initialize a new queue
+    pub fn new(config: QueueConfig) -> Queue<K, V> {
+        Self {
+            records: IndexMap::new(),
+            config,
+        }
+    }
+
+    /// Take a portion of the updates from the start of the queue with a max
+    /// size defined by the buffer config
+    fn take_batch(&mut self) -> Batch<K, V> {
+        let len = self.records.len();
+
+        Batch(
+            self.records
+                .drain(0..min(len, self.config.max_batch_size()))
+                .collect(),
+        )
+    }
+
+    /// Bulk upsert a batch into the end of the queue
+    fn upsert_batch(&mut self, batch: Batch<K, V>) {
+        for record in batch.values() {
+            self.upsert(record.clone());
+        }
+    }
+
+    pub fn is_not_empty(&self) -> bool {
+        self.records.len() != 0
+    }
+}
+
+pub trait Upsertable<T> {
+    fn upsert(&mut self, new: T);
+}
+
+pub struct Batch<K, V>(IndexMap<K, V>);
+
+impl<'a, K, V> Batch<K, V> {
+    fn is_empty(&self) -> bool {
+        self.0.len() == 0
+    }
+
+    fn values(&'a self) -> Values<'a, K, V> {
+        self.0.values()
+    }
+}
+
+pub trait Flushable<T> {
+    /// Used to store extra bindings used in the query when the record already
+    /// exists in the database
+    type ExtraBindings;
+
+    /// Flushes batch of updates to MySQL database
+    ///
+    /// **Warning**: this function does not make sure that the query isn't too long
+    /// or doesn't use too many bindings
+    async fn flush_to_db(
+        &self,
+        db: &MySqlPool,
+        extra_bindings: Self::ExtraBindings,
+    ) -> Result<u64, sqlx::Error>;
+}
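`QueueConfig::max_batch_size` generalizes the arithmetic that each queue previously hard-coded (compare the deleted `history_limit()` in src/scheduler/history_update.rs below). A standalone sketch with the history queue's numbers, taken from the removed constants: 65,535 maximum bindings per MySQL query, 16 columns per record, and 1 extra binding for the seedtime TTL. The `QueueConfig` literal is illustrative wiring, and `max_batch_size` takes `&self` here rather than the commit's `&mut self`:

// Sketch of the generic batch-size budget introduced by this commit,
// plugged with the history queue's numbers from the deleted constants.
struct QueueConfig {
    max_bindings_per_flush: usize,
    bindings_per_record: usize,
    extra_bindings_per_flush: usize,
}

impl QueueConfig {
    fn max_batch_size(&self) -> usize {
        (self.max_bindings_per_flush - self.extra_bindings_per_flush) / self.bindings_per_record
    }
}

fn main() {
    let history = QueueConfig {
        max_bindings_per_flush: 65_535, // MySQL bind limit (old BIND_LIMIT)
        bindings_per_record: 16,        // history columns (old HISTORY_COLUMN_COUNT)
        extra_bindings_per_flush: 1,    // seedtime TTL bind (old EXTRA_BINDING_COUNT)
    };
    // (65_535 - 1) / 16 = 4_095 records per flush, matching the old
    // history_limit() exactly.
    assert_eq!(history.max_batch_size(), 4_095);
}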
src/scheduler/history_update.rs (115 changes: 34 additions & 81 deletions)

@@ -1,13 +1,7 @@
-use std::{
-    cmp::min,
-    ops::{Deref, DerefMut},
-};
-
 use chrono::{DateTime, Utc};
-use indexmap::IndexMap;
 use sqlx::{MySql, MySqlPool, QueryBuilder};
 
-pub struct Queue(pub IndexMap<Index, HistoryUpdate>);
+use super::{Flushable, Upsertable};
 
 #[derive(Eq, Hash, PartialEq)]
 pub struct Index {
@@ -32,69 +26,42 @@ pub struct HistoryUpdate {
     pub completed_at: Option<DateTime<Utc>>,
 }
 
-impl Queue {
-    pub fn new() -> Queue {
-        Queue(IndexMap::new())
-    }
-
-    pub fn upsert(&mut self, new: HistoryUpdate) {
-        self.entry(Index {
-            torrent_id: new.torrent_id,
-            user_id: new.user_id,
-        })
-        .and_modify(|history_update| {
-            history_update.user_agent = new.user_agent.to_owned();
-            history_update.is_active = new.is_active;
-            history_update.is_seeder = new.is_seeder;
-            history_update.uploaded = new.uploaded;
-            history_update.downloaded = new.downloaded;
-            history_update.uploaded_delta += new.uploaded_delta;
-            history_update.downloaded_delta += new.downloaded_delta;
-            history_update.credited_uploaded_delta += new.credited_uploaded_delta;
-            history_update.credited_downloaded_delta += new.credited_downloaded_delta;
-            history_update.completed_at = new.completed_at;
-        })
-        .or_insert(new);
-    }
-
-    /// Determine the max amount of history records that can be inserted at
-    /// once
-    const fn history_limit() -> usize {
-        /// Max amount of bindings in a mysql query
-        const BIND_LIMIT: usize = 65535;
-
-        /// Number of columns being updated in the history table
-        const HISTORY_COLUMN_COUNT: usize = 16;
-
-        /// 1 extra binding is used to insert the TTL
-        const EXTRA_BINDING_COUNT: usize = 1;
-
-        (BIND_LIMIT - EXTRA_BINDING_COUNT) / HISTORY_COLUMN_COUNT
-    }
-
-    /// Take a portion of the history updates small enough to be inserted into
-    /// the database.
-    pub fn take_batch(&mut self) -> Queue {
-        let len = self.len();
-
-        Queue(self.drain(0..min(Queue::history_limit(), len)).collect())
+impl Upsertable<HistoryUpdate> for super::Queue<Index, HistoryUpdate> {
+    fn upsert(&mut self, new: HistoryUpdate) {
+        self.records
+            .entry(Index {
+                torrent_id: new.torrent_id,
+                user_id: new.user_id,
+            })
+            .and_modify(|history_update| {
+                history_update.user_agent = new.user_agent.to_owned();
+                history_update.is_active = new.is_active;
+                history_update.is_seeder = new.is_seeder;
+                history_update.uploaded = new.uploaded;
+                history_update.downloaded = new.downloaded;
+                history_update.uploaded_delta += new.uploaded_delta;
+                history_update.downloaded_delta += new.downloaded_delta;
+                history_update.credited_uploaded_delta += new.credited_uploaded_delta;
+                history_update.credited_downloaded_delta += new.credited_downloaded_delta;
+                history_update.completed_at = new.completed_at;
+            })
+            .or_insert(new);
     }
+}
 
-    /// Merge a history update batch into this history update batch
-    pub fn upsert_batch(&mut self, batch: Queue) {
-        for history_update in batch.values() {
-            self.upsert(history_update.clone());
-        }
-    }
+pub struct HistoryUpdateExtraBindings {
+    pub seedtime_ttl: u64,
+}
 
-    /// Flushes history updates to the mysql db
-    ///
-    /// **Warning**: this function does not make sure that the query isn't too long
-    /// or doesn't use too many bindings
-    pub async fn flush_to_db(&self, db: &MySqlPool, seedtime_ttl: u64) -> Result<u64, sqlx::Error> {
-        let len = self.len();
+impl Flushable<HistoryUpdate> for super::Batch<Index, HistoryUpdate> {
+    type ExtraBindings = HistoryUpdateExtraBindings;
 
-        if len == 0 {
+    async fn flush_to_db(
+        &self,
+        db: &MySqlPool,
+        extra_bindings: HistoryUpdateExtraBindings,
+    ) -> Result<u64, sqlx::Error> {
+        if self.is_empty() {
             return Ok(0);
         }
 
@@ -158,7 +125,7 @@ impl Queue {
                 DATE_ADD(updated_at, INTERVAL
             "#,
         )
-        .push_bind(seedtime_ttl)
+        .push_bind(extra_bindings.seedtime_ttl)
         .push(
             r#"
                 SECOND) > VALUES(updated_at) AND seeder = 1 AND active = 1 AND VALUES(seeder) = 1,
@@ -181,17 +148,3 @@ impl Queue {
         .map(|result| result.rows_affected())
     }
 }
-
-impl Deref for Queue {
-    type Target = IndexMap<Index, HistoryUpdate>;
-
-    fn deref(&self) -> &Self::Target {
-        &self.0
-    }
-}
-
-impl DerefMut for Queue {
-    fn deref_mut(&mut self) -> &mut Self::Target {
-        &mut self.0
-    }
-}
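This file is now the template for the other update queues: implement `Upsertable` on `super::Queue<Index, T>` to define how a duplicate record merges into an existing one, and `Flushable` on `super::Batch<Index, T>` to define the SQL. A self-contained toy of the upsert half, using a hypothetical `CountUpdate` in place of the real update structs; the `Flushable`/sqlx half is omitted so the sketch runs without a database:

use indexmap::IndexMap;

// Toy mirrors of the scheduler's generic pieces.
struct Queue<K, V> {
    records: IndexMap<K, V>,
}

trait Upsertable<T> {
    fn upsert(&mut self, new: T);
}

// Hypothetical update type standing in for HistoryUpdate and friends.
struct CountUpdate {
    user_id: u32,
    delta: i64,
    is_active: bool,
}

impl Upsertable<CountUpdate> for Queue<u32, CountUpdate> {
    fn upsert(&mut self, new: CountUpdate) {
        self.records
            .entry(new.user_id)
            .and_modify(|update| {
                // Delta fields accumulate across merged records,
                // mirroring the `+=` lines in HistoryUpdate's upsert...
                update.delta += new.delta;
                // ...while point-in-time fields take the newest value.
                update.is_active = new.is_active;
            })
            .or_insert(new);
    }
}

fn main() {
    let mut queue = Queue { records: IndexMap::new() };
    queue.upsert(CountUpdate { user_id: 7, delta: 100, is_active: true });
    queue.upsert(CountUpdate { user_id: 7, delta: 50, is_active: false });

    let merged = &queue.records[&7];
    assert_eq!(merged.delta, 150);
    assert!(!merged.is_active);
}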