Skip to content

Commit

Permalink
Add conifg for replica and for leader
Browse files Browse the repository at this point in the history
  • Loading branch information
ILmoshe committed Jul 23, 2024
1 parent dd39060 commit 83d21a8
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 11 deletions.
File renamed without changes.
4 changes: 4 additions & 0 deletions .env.replica
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
ROLE=replica
THIS_ADDR=127.0.0.1:6377
LEADER_ADDR=127.0.0.1:6379
RUST_LOG=info
31 changes: 25 additions & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ mod storage;
use dotenv;
use network::replica::Role;
use network::server::start_server;
use std::env;
use std::{env, process};
use std::net::SocketAddr;

extern crate pretty_env_logger;
Expand Down Expand Up @@ -36,14 +36,26 @@ fn map_art() {
println!("{}", world_map)
}

fn load_env_file(file: &str) {
println!("Loading environment from {}", file);
dotenv::from_filename(file).expect("Error loading {} file");
}

fn handle_environment() {
let args: Vec<String> = env::args().collect();
if args.len() != 2 {
println!("BAD USAGE");
eprintln!("USAGE: {} <env_file>", &args[0]);
process::exit(1);
}
let env_file = &args[1];
load_env_file(env_file);
}

#[tokio::main]
async fn main() {
map_art();
dotenv::from_filename(".env").ok();
pretty_env_logger::init();
info!("Creating geomemdb ...");
handle_environment();

let role = env::var("ROLE").unwrap_or_else(|_| "leader".to_string());
let leader_addr: SocketAddr = env::var("LEADER_ADDR")
.unwrap_or_else(|_| "127.0.0.1:6379".to_string())
.parse()
Expand All @@ -52,12 +64,19 @@ async fn main() {
.unwrap_or_else(|_| "127.0.0.1:6379".to_string())
.parse()
.unwrap();
pretty_env_logger::init();
info!("Creating Geomemdb ...");
map_art();

let role = env::var("ROLE").unwrap_or_else(|_| "leader".to_string());
match role.as_str() {
"leader" => {
info!("I am LEADER running at {}", leader_addr);
start_server(this_addr, None, Role::Leader).await;
}
"replica" => {
info!("I am replica running at: {}", this_addr);
info!("Leader is running at {}", leader_addr);
start_server(this_addr, Some(leader_addr), Role::Replica).await;
}
_ => {
Expand Down
7 changes: 5 additions & 2 deletions src/network/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
use tokio::time::{sleep, Duration};

const DEAD_REPLICA_TIMEOUT_SECONDS: u64 = 10;


#[derive(Clone, PartialEq)]
pub enum Role {
Leader,
Expand Down Expand Up @@ -86,13 +89,13 @@ impl Replica {

pub async fn monitor_replicas(&self) {
loop {
sleep(Duration::from_secs(5)).await;
sleep(Duration::from_secs(10)).await; // checks every 10 seconds
let mut replicas_to_remove = Vec::new();
{
let replicas = self.replicas.lock().unwrap();
let now = std::time::Instant::now();
for (addr, last_heartbeat) in replicas.iter() {
if now.duration_since(*last_heartbeat).as_secs() > 10 {
if now.duration_since(*last_heartbeat).as_secs() > DEAD_REPLICA_TIMEOUT_SECONDS {
info!("Replica at {} is considered dead", addr);
replicas_to_remove.push(*addr);
}
Expand Down
4 changes: 1 addition & 3 deletions src/network/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@ pub async fn start_server(addr: SocketAddr, leader_addr: Option<SocketAddr>, rol

if let Role::Replica = role {
let replica_clone = Arc::clone(&replica);
tokio::spawn(async move {
replica_clone.send_heartbeat().await;
});
tokio::spawn(async move { replica_clone.send_heartbeat().await; });
} else if let Role::Leader = role {
let replica_clone = Arc::clone(&replica);
tokio::spawn(async move {
Expand Down

0 comments on commit 83d21a8

Please sign in to comment.