lilith: periodic purge strategy changed
Previously we picked N random hosts, which doesn't guarantee that every host is eventually checked for liveness. The new strategy uses a ring buffer: we always probe the first N hosts and push the live ones back to the end, ensuring that all of our hosts are tested over time. New hosts are appended at the end of the ring buffer, since they are the most recent and therefore the most likely to be live.
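
For illustration only, the rotation can be sketched as a tiny self-contained Rust program, using plain strings for hosts and a stubbed is_alive() check in place of the real outbound handshake; the names purge_cycle and is_alive and the value PROBE_HOSTS_N = 2 exist only in this sketch, not in lilith itself.

use std::collections::VecDeque;

// Sketch-only constant: probe at most this many hosts per cycle.
const PROBE_HOSTS_N: usize = 2;

// Stand-in for the real liveness check; lilith performs an outbound handshake instead.
fn is_alive(host: &str) -> bool {
    !host.contains("dead")
}

fn purge_cycle(ring: &mut VecDeque<String>, known_hosts: &[&str]) {
    // Append newly discovered hosts at the back of the ring buffer.
    for host in known_hosts {
        if !ring.iter().any(|h| h.as_str() == *host) {
            ring.push_back(host.to_string());
        }
    }

    // Pop up to PROBE_HOSTS_N hosts from the front for probing.
    let mut probed = Vec::new();
    for _ in 0..PROBE_HOSTS_N {
        match ring.pop_front() {
            Some(host) => probed.push(host),
            None => break,
        }
    }

    // Live hosts go back to the end of the ring; dead ones are dropped.
    for host in probed {
        if is_alive(&host) {
            ring.push_back(host);
        }
    }
}

fn main() {
    let mut ring: VecDeque<String> = VecDeque::new();
    for cycle in 0..3 {
        purge_cycle(&mut ring, &["alice", "bob", "dead-node", "carol"]);
        println!("after cycle {}: {:?}", cycle, ring);
    }
}

Dead hosts are simply not pushed back, which is the purge; because live hosts always re-enter at the back, plain FIFO order guarantees every host gets probed eventually.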
aggstam committed Nov 14, 2023
1 parent af7c280 commit fb0671b
Showing 1 changed file with 37 additions and 9 deletions.
46 changes: 37 additions & 9 deletions bin/lilith/src/main.rs
@@ -17,7 +17,7 @@
*/

use std::{
collections::{HashMap, HashSet},
collections::{HashMap, HashSet, VecDeque},
path::Path,
process::exit,
sync::Arc,
@@ -28,7 +28,7 @@ use futures::future::join_all;
use log::{debug, error, info, warn};
use semver::Version;
use smol::{
lock::{Mutex, MutexGuard},
lock::{Mutex, MutexGuard, RwLock},
stream::StreamExt,
Executor,
};
@@ -147,21 +147,47 @@ impl Lilith {
/// for a specific P2P network.
async fn periodic_purge(name: String, p2p: P2pPtr, ex: Arc<Executor<'_>>) -> Result<()> {
info!(target: "lilith", "Starting periodic host purge task for \"{}\"", name);

// Initialize a growable ring buffer(VecDeque) to store known hosts
let ring_buffer = Arc::new(RwLock::new(VecDeque::<Url>::new()));
loop {
// We'll pick up to PROBE_HOSTS_N hosts every PURGE_PERIOD and try to
// connect to them. If we can't reach them, remove them from our set.
// Wait for next purge period
sleep(PURGE_PERIOD).await;
debug!(target: "lilith", "[{}] Picking random hosts from db", name);
debug!(target: "lilith", "[{}] The Purge has started...", name);

// Check if new hosts exist and add them to the end of the ring buffer
let mut lock = ring_buffer.write().await;
let hosts = p2p.clone().hosts().fetch_all().await;
if hosts.len() != lock.len() {
// Since hosts are stored in a HashSet we have to check all of them
for host in hosts {
if !lock.contains(&host) {
lock.push_back(host);
}
}
}

// Pick first up to PROBE_HOSTS_N hosts from the ring buffer
let mut purgers = vec![];
let mut index = 0;
while index < PROBE_HOSTS_N {
match lock.pop_front() {
Some(host) => purgers.push(host),
None => break,
};
index += 1;
}

let lottery_winners = p2p.clone().hosts().fetch_n_random(PROBE_HOSTS_N).await;
let win_str: Vec<&str> = lottery_winners.iter().map(|x| x.as_str()).collect();
debug!(target: "lilith", "[{}] Got: {:?}", name, win_str);
// Try to connect to them. If we can't reach them, remove them from our set.
let purgers_str: Vec<&str> = purgers.iter().map(|x| x.as_str()).collect();
debug!(target: "lilith", "[{}] Got: {:?}", name, purgers_str);

let mut tasks = vec![];

for host in &lottery_winners {
for host in &purgers {
let p2p_ = p2p.clone();
let ex_ = ex.clone();
let ring_buffer_ = ring_buffer.clone();
tasks.push(async move {
let session_out = p2p_.session_outbound();
let session_weak = Arc::downgrade(&session_out);
@@ -190,6 +216,8 @@ impl Lilith {
Ok(()) => {
debug!(target: "lilith", "Handshake success! Stopping channel.");
channel.stop().await;
// Push host back to the ring buffer
ring_buffer_.write().await.push_back(host.clone());
}
Err(e) => {
debug!(target: "lilith", "Handshake failure! {}", e);
