Skip to content

Commit

Permalink
Fix heartbeat mechanizem
Browse files Browse the repository at this point in the history
  • Loading branch information
ILmoshe committed Jul 23, 2024
1 parent 83d21a8 commit 6d4dde0
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 24 deletions.
3 changes: 2 additions & 1 deletion .env.replica
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
ROLE=replica
THIS_ADDR=127.0.0.1:6377
LEADER_ADDR=127.0.0.1:6379
RUST_LOG=info
RUST_LOG=info
HEARTBEAT_EVERY_X_SECONDS=10
36 changes: 20 additions & 16 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ mod storage;
use dotenv;
use network::replica::Role;
use network::server::start_server;
use std::{env, process};
use std::net::SocketAddr;
use std::{env, process};

extern crate pretty_env_logger;
#[macro_use]
Expand Down Expand Up @@ -52,6 +52,24 @@ fn handle_environment() {
load_env_file(env_file);
}

async fn run_server(leader_addr: SocketAddr, this_addr: SocketAddr) {
let role = env::var("ROLE").unwrap_or_else(|_| "leader".to_string());
match role.as_str() {
"leader" => {
info!("I am LEADER running at {}", leader_addr);
start_server(this_addr, None, Role::Leader).await;
}
"replica" => {
info!("I am replica running at: {}", this_addr);
info!("Leader is running at {}", leader_addr);
start_server(this_addr, Some(leader_addr), Role::Replica).await;
}
_ => {
eprintln!("Invalid role specified. Use 'leader' or 'replica'.");
}
}
}

#[tokio::main]
async fn main() {
handle_environment();
Expand All @@ -68,19 +86,5 @@ async fn main() {
info!("Creating Geomemdb ...");
map_art();

let role = env::var("ROLE").unwrap_or_else(|_| "leader".to_string());
match role.as_str() {
"leader" => {
info!("I am LEADER running at {}", leader_addr);
start_server(this_addr, None, Role::Leader).await;
}
"replica" => {
info!("I am replica running at: {}", this_addr);
info!("Leader is running at {}", leader_addr);
start_server(this_addr, Some(leader_addr), Role::Replica).await;
}
_ => {
eprintln!("Invalid role specified. Use 'leader' or 'replica'.");
}
}
run_server(leader_addr, this_addr).await;
}
31 changes: 25 additions & 6 deletions src/network/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::persistence::Persistence;
use crate::storage::GeoDatabase;
use log::{error, info};
use std::collections::HashMap;
use std::env;
use std::net::SocketAddr;
use std::path::Path;
use std::sync::{Arc, Mutex};
Expand All @@ -11,7 +12,6 @@ use tokio::time::{sleep, Duration};

const DEAD_REPLICA_TIMEOUT_SECONDS: u64 = 10;


#[derive(Clone, PartialEq)]
pub enum Role {
Leader,
Expand Down Expand Up @@ -75,12 +75,29 @@ impl Replica {
}

pub async fn send_heartbeat(&self) {
let heartbeat_rate = env::var("HEARTBEAT_EVERY_X_SECONDS")
.unwrap_or("5".to_string())
.parse::<u64>()
.unwrap();

if let Some(leader_addr) = self.leader_addr {
let mut stream = TcpStream::connect(leader_addr).await;

loop {
sleep(Duration::from_secs(5)).await;
if let Ok(mut stream) = TcpStream::connect(leader_addr).await {
if stream.write_all(b"HEARTBEAT\n").await.is_ok() {
info!("Sent heartbeat to leader at {}", leader_addr);
sleep(Duration::from_secs(heartbeat_rate)).await;

match &mut stream {
Ok(ref mut stream) => {
if stream.write_all(b"HEARTBEAT\n").await.is_ok() {
info!("Sent heartbeat to leader at {}", leader_addr);
} else {
info!("Failed to send heartbeat, attempting to reconnect...");
*stream = TcpStream::connect(leader_addr).await.unwrap();
}
}
Err(_) => {
info!("Failed to connect to leader, retrying...");
stream = TcpStream::connect(leader_addr).await;
}
}
}
Expand All @@ -95,7 +112,8 @@ impl Replica {
let replicas = self.replicas.lock().unwrap();
let now = std::time::Instant::now();
for (addr, last_heartbeat) in replicas.iter() {
if now.duration_since(*last_heartbeat).as_secs() > DEAD_REPLICA_TIMEOUT_SECONDS {
if now.duration_since(*last_heartbeat).as_secs() > DEAD_REPLICA_TIMEOUT_SECONDS
{
info!("Replica at {} is considered dead", addr);
replicas_to_remove.push(*addr);
}
Expand All @@ -110,6 +128,7 @@ impl Replica {
}

pub async fn handle_heartbeat(&self, addr: SocketAddr) {
info!("Heartbeat from replica at {}", addr);
let mut replicas = self.replicas.lock().unwrap();
replicas.insert(addr, std::time::Instant::now());
}
Expand Down
5 changes: 4 additions & 1 deletion src/network/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@ pub async fn start_server(addr: SocketAddr, leader_addr: Option<SocketAddr>, rol
let replica = Arc::new(Replica::new(addr, role.clone(), leader_addr).await);

if let Role::Replica = role {
// when we say replica we mean follower
let replica_clone = Arc::clone(&replica);
tokio::spawn(async move { replica_clone.send_heartbeat().await; });
tokio::spawn(async move {
replica_clone.send_heartbeat().await;
});
} else if let Role::Leader = role {
let replica_clone = Arc::clone(&replica);
tokio::spawn(async move {
Expand Down

0 comments on commit 6d4dde0

Please sign in to comment.