add: log unregistered info hashes
Roardom committed Nov 13, 2024
1 parent b2b8490 commit 6367f8c
Showing 4 changed files with 143 additions and 3 deletions.
23 changes: 20 additions & 3 deletions src/announce.rs
@@ -32,7 +32,8 @@ use crate::{
    },
    scheduler::{
        announce_update::AnnounceUpdate, history_update::HistoryUpdate, peer_update::PeerUpdate,
-       torrent_update::TorrentUpdate, user_update::UserUpdate, Upsertable,
+       torrent_update::TorrentUpdate, unregistered_info_hash_update::UnregisteredInfoHashUpdate,
+       user_update::UserUpdate, Upsertable,
    },
    warning::AnnounceWarning,
};
@@ -323,11 +324,27 @@ pub async fn announce(
        .cloned();

    // Validate torrent
-   let torrent_id = *tracker
+   let torrent_id_res = tracker
        .infohash2id
        .read()
        .get(&queries.info_hash)
-       .ok_or(InfoHashNotFound)?;
+       .ok_or(InfoHashNotFound)
+       .cloned();
+
+   if let Ok(user) = &user {
+       if let Err(InfoHashNotFound) = torrent_id_res {
+           tracker
+               .unregistered_info_hash_updates
+               .lock()
+               .upsert(UnregisteredInfoHashUpdate {
+                   user_id: user.id,
+                   info_hash: queries.info_hash,
+                   updated_at: Utc::now(),
+               });
+       }
+   }
+
+   let torrent_id = torrent_id_res?;

let is_connectable = check_connectivity(&tracker, client_ip, queries.port).await;

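The upsert into tracker.unregistered_info_hash_updates above dedupes per (user_id, info_hash) and keeps only the newest timestamp (see the Mergeable impl in src/scheduler/unregistered_info_hash_update.rs below), so a client that keeps re-announcing an unknown hash produces one pending row per flush rather than many. A minimal standalone sketch of that behaviour, using a plain HashMap as a stand-in for the crate's Queue type:

use std::collections::HashMap;

// Stand-ins for illustration only; the real Index and UnregisteredInfoHashUpdate
// types are defined in src/scheduler/unregistered_info_hash_update.rs.
type UserId = u32;
type InfoHash = [u8; 20];

fn upsert(
    pending: &mut HashMap<(UserId, InfoHash), i64>, // value: updated_at as unix seconds
    user_id: UserId,
    info_hash: InfoHash,
    updated_at: i64,
) {
    pending
        .entry((user_id, info_hash))
        .and_modify(|ts| *ts = (*ts).max(updated_at)) // merge keeps the newest timestamp
        .or_insert(updated_at);
}

fn main() {
    let mut pending = HashMap::new();
    let hash = [0xab; 20];
    upsert(&mut pending, 7, hash, 1_700_000_000);
    upsert(&mut pending, 7, hash, 1_700_001_800); // same user and hash: merged, not duplicated
    assert_eq!(pending.len(), 1);
    assert_eq!(pending[&(7, hash)], 1_700_001_800);
}
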
29 changes: 29 additions & 0 deletions src/scheduler.rs
@@ -4,6 +4,7 @@ pub mod announce_update;
pub mod history_update;
pub mod peer_update;
pub mod torrent_update;
pub mod unregistered_info_hash_update;
pub mod user_update;

use crate::tracker::Tracker;
@@ -41,6 +42,7 @@ pub async fn flush(tracker: &Arc<Tracker>) {
flush_torrent_updates(tracker),
flush_user_updates(tracker),
flush_announce_updates(tracker),
flush_unregistered_info_hash_updates(tracker),
);
}

@@ -155,6 +157,33 @@ async fn flush_announce_updates(tracker: &Arc<Tracker>) {
}
}

/// Send unregistered info hash updates to mysql database
async fn flush_unregistered_info_hash_updates(tracker: &Arc<Tracker>) {
let unregistered_info_hash_update_batch =
tracker.unregistered_info_hash_updates.lock().take_batch();
let start = Utc::now();
let len = unregistered_info_hash_update_batch.len();
let result = unregistered_info_hash_update_batch
.flush_to_db(&tracker.pool, ())
.await;
let elapsed = Utc::now().signed_duration_since(start).num_milliseconds();

match result {
Ok(_) => {
println!("{start} - Upserted {len} unregistered info hashes in {elapsed} ms.");
}
Err(e) => {
println!(
"{start} - Failed to update {len} unregistered info hashes after {elapsed} ms: {e}"
);
tracker
.unregistered_info_hash_updates
.lock()
.upsert_batch(unregistered_info_hash_update_batch);
}
}
}

/// Remove peers that have not announced for some time
pub async fn reap(tracker: &Arc<Tracker>) {
let ttl = Duration::seconds(tracker.config.active_peer_ttl.try_into().unwrap());
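
Note the failure branch in flush_unregistered_info_hash_updates: a batch that fails to reach the database is put back with upsert_batch, so it is retried on the next scheduler tick instead of being dropped. A simplified sketch of that take-batch / flush / requeue pattern, with generic stand-in types rather than the crate's Queue and Batch:

use std::collections::HashMap;

// Simplified stand-in: drained at flush time, refilled on failure so the
// next tick retries the same records.
struct PendingQueue<K: std::hash::Hash + Eq, V> {
    records: HashMap<K, V>,
}

impl<K: std::hash::Hash + Eq, V> PendingQueue<K, V> {
    /// Drain everything queued so far into a batch for this flush.
    fn take_batch(&mut self) -> HashMap<K, V> {
        std::mem::take(&mut self.records)
    }

    /// Put a failed batch back; anything queued meanwhile is kept as-is.
    fn requeue(&mut self, batch: HashMap<K, V>) {
        for (key, value) in batch {
            self.records.entry(key).or_insert(value);
        }
    }
}

fn flush(queue: &mut PendingQueue<u64, String>, db_is_up: bool) {
    let batch = queue.take_batch();
    if db_is_up {
        // write `batch` to the database here
    } else {
        queue.requeue(batch); // retried on the next scheduler tick
    }
}

fn main() {
    let mut queue = PendingQueue {
        records: HashMap::from([(1u64, "deadbeef".to_string())]),
    };
    flush(&mut queue, false); // DB down: the record is requeued, not lost
    assert_eq!(queue.records.len(), 1);
}
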
83 changes: 83 additions & 0 deletions src/scheduler/unregistered_info_hash_update.rs
@@ -0,0 +1,83 @@
use crate::tracker::torrent::InfoHash;
use chrono::{DateTime, Utc};
use sqlx::{MySql, MySqlPool, QueryBuilder};

use super::{Flushable, Mergeable, Upsertable};

#[derive(Eq, Hash, PartialEq)]
pub struct Index {
pub user_id: u32,
pub info_hash: InfoHash,
}

#[derive(Clone)]
pub struct UnregisteredInfoHashUpdate {
pub user_id: u32,
pub info_hash: InfoHash,
pub updated_at: DateTime<Utc>,
}

impl Mergeable for UnregisteredInfoHashUpdate {
fn merge(&mut self, new: &Self) {
if new.updated_at > self.updated_at {
self.updated_at = new.updated_at;
}
}
}

impl Upsertable<UnregisteredInfoHashUpdate> for super::Queue<Index, UnregisteredInfoHashUpdate> {
fn upsert(&mut self, new: UnregisteredInfoHashUpdate) {
self.records
.entry(Index {
user_id: new.user_id,
info_hash: new.info_hash,
})
.and_modify(|unregistered_info_hash_update| {
unregistered_info_hash_update.merge(&new);
})
.or_insert(new);
}
}

impl Flushable<UnregisteredInfoHashUpdate> for super::Batch<Index, UnregisteredInfoHashUpdate> {
type ExtraBindings = ();

async fn flush_to_db(&self, db: &MySqlPool, _extra_bindings: ()) -> Result<u64, sqlx::Error> {
if self.is_empty() {
return Ok(0);
}

let mut query_builder: QueryBuilder<MySql> = QueryBuilder::new(
r#"
INSERT INTO
unregistered_info_hashes(
user_id,
info_hash,
created_at,
updated_at
)
"#,
);

query_builder
.push_values(self.values(), |mut bind, unregistered_info_hash_update| {
bind.push_bind(unregistered_info_hash_update.user_id)
.push_bind(unregistered_info_hash_update.info_hash.to_vec())
.push_bind(unregistered_info_hash_update.updated_at)
.push_bind(unregistered_info_hash_update.updated_at);
})
.push(
r#"
ON DUPLICATE KEY UPDATE
updated_at = VALUES(updated_at)
"#,
);

query_builder
.build()
.persistent(false)
.execute(db)
.await
.map(|result| result.rows_affected())
}
}
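
For reference, push_values expands into a single multi-row INSERT with four placeholders per record, which is what the bindings_per_record setting in src/tracker.rs below accounts for. A small sketch that prints the SQL generated for two dummy records (assumes sqlx with the mysql feature; values are placeholders, table and column names as in this file):

use sqlx::{MySql, QueryBuilder};

fn main() {
    // Dummy rows for illustration: (user_id, info_hash, updated_at as text).
    let rows: Vec<(u32, Vec<u8>, String)> = vec![
        (1, vec![0xaa; 20], "2024-11-13 00:00:00".into()),
        (2, vec![0xbb; 20], "2024-11-13 00:00:01".into()),
    ];

    let mut qb: QueryBuilder<MySql> = QueryBuilder::new(
        "INSERT INTO unregistered_info_hashes(user_id, info_hash, created_at, updated_at) ",
    );
    qb.push_values(rows, |mut bind, (user_id, info_hash, updated_at)| {
        // created_at and updated_at are bound to the same value, as in the commit.
        bind.push_bind(user_id)
            .push_bind(info_hash)
            .push_bind(updated_at.clone())
            .push_bind(updated_at);
    });
    qb.push(" ON DUPLICATE KEY UPDATE updated_at = VALUES(updated_at)");

    // Roughly: INSERT INTO unregistered_info_hashes(...) VALUES (?, ?, ?, ?), (?, ?, ?, ?)
    //          ON DUPLICATE KEY UPDATE updated_at = VALUES(updated_at)
    println!("{}", qb.sql());
}
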
11 changes: 11 additions & 0 deletions src/tracker.rs
@@ -16,6 +16,7 @@ use sqlx::{MySql, MySqlPool, QueryBuilder};
use anyhow::{Context, Result};

use crate::config;
use crate::scheduler::unregistered_info_hash_update::{self, UnregisteredInfoHashUpdate};
use crate::scheduler::{
announce_update,
history_update::{self, HistoryUpdate},
@@ -50,6 +51,8 @@ pub struct Tracker {
pub stats: Stats,
pub torrents: Mutex<torrent::Map>,
pub torrent_updates: Mutex<Queue<torrent_update::Index, TorrentUpdate>>,
pub unregistered_info_hash_updates:
Mutex<Queue<unregistered_info_hash_update::Index, UnregisteredInfoHashUpdate>>,
pub users: RwLock<user::Map>,
pub user_updates: Mutex<Queue<user_update::Index, UserUpdate>>,
}
@@ -180,6 +183,14 @@ impl Tracker {
extra_bindings_per_flush: 0,
},
)),
unregistered_info_hash_updates: Mutex::new(Queue::<
unregistered_info_hash_update::Index,
UnregisteredInfoHashUpdate,
>::new(QueueConfig {
max_bindings_per_flush: 65_535,
bindings_per_record: 4,
extra_bindings_per_flush: 0,
})),
users: RwLock::new(users),
user_updates: Mutex::new(Queue::<user_update::Index, UserUpdate>::new(QueueConfig {
max_bindings_per_flush: 65_535,
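
The unregistered_info_hash_updates queue is configured like the other update queues: MySQL allows at most 65,535 placeholders per prepared statement, and each row here binds four values (user_id, info_hash, created_at, updated_at). Assuming the Queue splits batches on that limit, one flush covers at most 65_535 / 4 = 16_383 records; a trivial check:

// Illustration only: how the QueueConfig values bound a single flush,
// assuming max_bindings_per_flush caps placeholders per statement.
const MAX_BINDINGS_PER_FLUSH: u32 = 65_535; // MySQL placeholder limit per prepared statement
const BINDINGS_PER_RECORD: u32 = 4; // user_id, info_hash, created_at, updated_at

fn main() {
    let max_records_per_flush = MAX_BINDINGS_PER_FLUSH / BINDINGS_PER_RECORD;
    assert_eq!(max_records_per_flush, 16_383);
    println!("at most {max_records_per_flush} unregistered info hash rows per flush");
}
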
