diff --git a/.env b/.env.leader similarity index 100% rename from .env rename to .env.leader diff --git a/.env.replica b/.env.replica new file mode 100644 index 0000000..7a28769 --- /dev/null +++ b/.env.replica @@ -0,0 +1,4 @@ +ROLE=replica +THIS_ADDR=127.0.0.1:6377 +LEADER_ADDR=127.0.0.1:6379 +RUST_LOG=info \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 22cb4ce..19bf880 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,7 +5,7 @@ mod storage; use dotenv; use network::replica::Role; use network::server::start_server; -use std::env; +use std::{env, process}; use std::net::SocketAddr; extern crate pretty_env_logger; @@ -36,14 +36,26 @@ fn map_art() { println!("{}", world_map) } +fn load_env_file(file: &str) { + println!("Loading environment from {}", file); + dotenv::from_filename(file).expect("Error loading {} file"); +} + +fn handle_environment() { + let args: Vec = env::args().collect(); + if args.len() != 2 { + println!("BAD USAGE"); + eprintln!("USAGE: {} ", &args[0]); + process::exit(1); + } + let env_file = &args[1]; + load_env_file(env_file); +} + #[tokio::main] async fn main() { - map_art(); - dotenv::from_filename(".env").ok(); - pretty_env_logger::init(); - info!("Creating geomemdb ..."); + handle_environment(); - let role = env::var("ROLE").unwrap_or_else(|_| "leader".to_string()); let leader_addr: SocketAddr = env::var("LEADER_ADDR") .unwrap_or_else(|_| "127.0.0.1:6379".to_string()) .parse() @@ -52,12 +64,19 @@ async fn main() { .unwrap_or_else(|_| "127.0.0.1:6379".to_string()) .parse() .unwrap(); + pretty_env_logger::init(); + info!("Creating Geomemdb ..."); + map_art(); + let role = env::var("ROLE").unwrap_or_else(|_| "leader".to_string()); match role.as_str() { "leader" => { + info!("I am LEADER running at {}", leader_addr); start_server(this_addr, None, Role::Leader).await; } "replica" => { + info!("I am replica running at: {}", this_addr); + info!("Leader is running at {}", leader_addr); start_server(this_addr, Some(leader_addr), Role::Replica).await; } _ => { diff --git a/src/network/replica.rs b/src/network/replica.rs index a386f58..ed4da7c 100644 --- a/src/network/replica.rs +++ b/src/network/replica.rs @@ -9,6 +9,9 @@ use tokio::io::AsyncWriteExt; use tokio::net::TcpStream; use tokio::time::{sleep, Duration}; +const DEAD_REPLICA_TIMEOUT_SECONDS: u64 = 10; + + #[derive(Clone, PartialEq)] pub enum Role { Leader, @@ -86,13 +89,13 @@ impl Replica { pub async fn monitor_replicas(&self) { loop { - sleep(Duration::from_secs(5)).await; + sleep(Duration::from_secs(10)).await; // checks every 10 seconds let mut replicas_to_remove = Vec::new(); { let replicas = self.replicas.lock().unwrap(); let now = std::time::Instant::now(); for (addr, last_heartbeat) in replicas.iter() { - if now.duration_since(*last_heartbeat).as_secs() > 10 { + if now.duration_since(*last_heartbeat).as_secs() > DEAD_REPLICA_TIMEOUT_SECONDS { info!("Replica at {} is considered dead", addr); replicas_to_remove.push(*addr); } diff --git a/src/network/server.rs b/src/network/server.rs index 290ac24..05eb00e 100644 --- a/src/network/server.rs +++ b/src/network/server.rs @@ -15,9 +15,7 @@ pub async fn start_server(addr: SocketAddr, leader_addr: Option, rol if let Role::Replica = role { let replica_clone = Arc::clone(&replica); - tokio::spawn(async move { - replica_clone.send_heartbeat().await; - }); + tokio::spawn(async move { replica_clone.send_heartbeat().await; }); } else if let Role::Leader = role { let replica_clone = Arc::clone(&replica); tokio::spawn(async move {