From 7c0bdb7a4f8d51037d2c5d289f6abc05f422bba7 Mon Sep 17 00:00:00 2001 From: Moshe Mizrachi Date: Tue, 16 Jul 2024 23:12:43 +0300 Subject: [PATCH] Format code --- .gitignore | 3 ++- clients/python/main.py | 11 +++++++---- snapshot.bincode | Bin 1218 -> 0 bytes src/geospatial.rs | 1 + src/lib.rs | 4 ++-- src/network/command.rs | 20 ++++++++++++++++---- src/network/handler.rs | 12 +++++++----- src/network/mod.rs | 2 +- src/network/server.rs | 4 ++-- src/persistence.rs | 5 ++++- src/storage.rs | 27 +++++++++++++++++++++------ tests/leader_replica_tests.rs | 3 --- 12 files changed, 63 insertions(+), 29 deletions(-) delete mode 100644 snapshot.bincode diff --git a/.gitignore b/.gitignore index 476cc7f..7f286d3 100644 --- a/.gitignore +++ b/.gitignore @@ -16,4 +16,5 @@ Cargo.lock node_modules wal.log -snapshot.x \ No newline at end of file +snapshot.bincode +.idea \ No newline at end of file diff --git a/clients/python/main.py b/clients/python/main.py index 24be7db..ea118cf 100644 --- a/clients/python/main.py +++ b/clients/python/main.py @@ -19,7 +19,10 @@ def send_command(command: str): if __name__ == "__main__": - send_command("GEOADD location1 37.7749 -122.4194") - send_command("GEOADD location2 34.0522 -118.2437") - # send_command("GEOSEARCH 37.7749 -122.4194 500000\n") - send_command("GEOGET location1") + # send_command("GEOADD location1 37.7749 -122.4194") + # send_command("GEOADD location2 34.0522 -118.2437") + # # send_command("GEOSEARCH 37.7749 -122.4194 500000\n") + # send_command("GEOGET location1") + # # send command geoadd polygon + # send_command("GEOADD polygon1 37.7749 -122.4194 37.7749 -122.4194 37.7749 -122.4194 37.7749 -122.4194") + send_command("GEOGET polygon1") diff --git a/snapshot.bincode b/snapshot.bincode deleted file mode 100644 index f5a5dc00910d4529d17a9df699f8636ba2755ec9..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 1218 zcmZQ#fB;S?os*xOSdy8aXXM!Q==hBO*aKg3!fMyCIXa*#Fbw!3QG9(?+<^;M!`|pW zc5;9kz`z7$Fal{7BsMcl0g%R~ml!d0HRya~J3-c=%TpqTZWcO!q}q)h4(NR3Z~?KA z(*b%S1H}(9V(4nn`N(z>YbLSk(CtF!k5s$S!vUQSOBb*L1tt$GKjc6WfCT=7!TWkN M8p8em{y$790P8;RYybcN diff --git a/src/geospatial.rs b/src/geospatial.rs index e69de29..8b13789 100644 --- a/src/geospatial.rs +++ b/src/geospatial.rs @@ -0,0 +1 @@ + diff --git a/src/lib.rs b/src/lib.rs index 374ab47..275438e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,4 @@ -pub mod network; pub mod geospatial; +pub mod network; pub mod persistence; -pub mod storage; \ No newline at end of file +pub mod storage; diff --git a/src/network/command.rs b/src/network/command.rs index 504bbc0..4fff00d 100644 --- a/src/network/command.rs +++ b/src/network/command.rs @@ -1,7 +1,16 @@ pub enum Command { - GeoAdd { key: String, coords: Vec<(f64, f64)> }, - GeoSearch { lat: f64, lon: f64, radius: f64 }, - GeoGet { key: String }, + GeoAdd { + key: String, + coords: Vec<(f64, f64)>, + }, + GeoSearch { + lat: f64, + lon: f64, + radius: f64, + }, + GeoGet { + key: String, + }, Heartbeat, } @@ -19,7 +28,10 @@ pub fn parse_command(input: &str) -> Option { } }) .collect(); - Some(Command::GeoAdd { key: key.to_string(), coords }) + Some(Command::GeoAdd { + key: key.to_string(), + coords, + }) } ["GEOSEARCH", lat, lon, radius] => Some(Command::GeoSearch { lat: lat.parse().ok()?, diff --git a/src/network/handler.rs b/src/network/handler.rs index dc1be36..79116bf 100644 --- a/src/network/handler.rs +++ b/src/network/handler.rs @@ -1,11 +1,11 @@ +use crate::network::command::{parse_command, Command}; +use crate::network::replica::{Replica, Role}; +use crate::persistence::WalEntry; +use log::{error, info}; use std::net::SocketAddr; use std::sync::{Arc, Mutex}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpStream; -use log::{info, error}; -use crate::network::command::{Command, parse_command}; -use crate::network::replica::{Replica, Role}; -use crate::persistence::WalEntry; pub async fn handle_client( mut stream: TcpStream, @@ -34,11 +34,12 @@ pub async fn handle_client( match command { Command::GeoAdd { key, coords } => { let new_coord = coords.clone(); + if let Role::Leader = replica.role { let mut db = replica.db.lock().unwrap(); db.geo_add(key.clone(), coords); - let mut persistence = replica.persistence.lock().unwrap(); + let mut persistence = replica.persistence.lock().unwrap(); if let Err(e) = persistence.log_entry(WalEntry::GeoAdd { key: key.clone(), coords: new_coord, @@ -51,6 +52,7 @@ pub async fn handle_client( // Forward write requests to the leader if let Some(leader_addr) = leader_addr { if let Ok(mut leader_stream) = TcpStream::connect(leader_addr).await { + // TODO: Do we actually need to connect to the leader in every request?, heartbeat can ensure that we are connected to the leader. leader_stream.write_all(input.as_bytes()).await.unwrap(); let mut leader_response = [0; 1024]; let n = leader_stream.read(&mut leader_response).await.unwrap(); diff --git a/src/network/mod.rs b/src/network/mod.rs index 84646c8..e7c6f52 100644 --- a/src/network/mod.rs +++ b/src/network/mod.rs @@ -1,4 +1,4 @@ pub mod command; pub mod handler; pub mod replica; -pub mod server; \ No newline at end of file +pub mod server; diff --git a/src/network/server.rs b/src/network/server.rs index 96119bb..290ac24 100644 --- a/src/network/server.rs +++ b/src/network/server.rs @@ -1,9 +1,9 @@ use crate::network::handler::handle_client; use crate::network::replica::{Replica, Role}; use crate::persistence::Persistence; -use std::net::SocketAddr; -use std::sync::{Arc}; use log::info; +use std::net::SocketAddr; +use std::sync::Arc; use tokio::net::TcpListener; use tokio::signal; diff --git a/src/persistence.rs b/src/persistence.rs index 499ce58..50e7715 100644 --- a/src/persistence.rs +++ b/src/persistence.rs @@ -9,7 +9,10 @@ const SNAPSHOT_FILE: &str = "snapshot.bincode"; // A snapshot is a complete copy #[derive(Serialize, Deserialize)] pub enum WalEntry { // Make this enum public - GeoAdd { key: String, coords: Vec<(f64, f64)> }, + GeoAdd { + key: String, + coords: Vec<(f64, f64)>, + }, } pub struct Persistence { diff --git a/src/storage.rs b/src/storage.rs index 4d7f010..5f9e432 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -1,7 +1,7 @@ -use std::collections::HashMap; use geo::{HaversineDistance, Point, Polygon}; -use rstar::{RTree, RTreeObject, AABB, PointDistance}; +use rstar::{PointDistance, RTree, RTreeObject, AABB}; use serde::{Deserialize, Serialize}; +use std::collections::HashMap; #[derive(Debug, Serialize, Deserialize)] pub struct GeoDatabase { @@ -38,7 +38,11 @@ impl GeoDatabase { let mut results = Vec::new(); // Search for points within the radius - for point in self.point_tree.nearest_neighbor_iter(¢er).filter(|p| p.haversine_distance(¢er) <= radius) { + for point in self + .point_tree + .nearest_neighbor_iter(¢er) + .filter(|p| p.haversine_distance(¢er) <= radius) + { if let Some((key, _)) = self.points.iter().find(|(_, &v)| v == *point) { results.push(key.clone()); } @@ -47,11 +51,14 @@ impl GeoDatabase { // Create an AABB for the search radius let search_aabb = AABB::from_corners( Point::new(lon - radius, lat - radius), - Point::new(lon + radius, lat + radius) + Point::new(lon + radius, lat + radius), ); // Search for polygons that intersect with the radius circle - for polygon in self.polygon_tree.locate_in_envelope_intersecting(&search_aabb) { + for polygon in self + .polygon_tree + .locate_in_envelope_intersecting(&search_aabb) + { if let Some((key, _)) = self.polygons.iter().find(|(_, v)| v.clone() == polygon) { results.push(key.clone()); } @@ -64,7 +71,15 @@ impl GeoDatabase { if let Some(point) = self.points.get(key) { Some(format!("POINT({} {})", point.y(), point.x())) } else if let Some(polygon) = self.polygons.get(key) { - Some(format!("POLYGON(({}))", polygon.exterior().points().map(|p| format!("{} {}", p.y(), p.x())).collect::>().join(", "))) + Some(format!( + "POLYGON(({}))", + polygon + .exterior() + .points() + .map(|p| format!("{} {}", p.y(), p.x())) + .collect::>() + .join(", ") + )) } else { None } diff --git a/tests/leader_replica_tests.rs b/tests/leader_replica_tests.rs index 917013e..a48e31a 100644 --- a/tests/leader_replica_tests.rs +++ b/tests/leader_replica_tests.rs @@ -10,7 +10,6 @@ mod common; async fn test_leader_replica_interaction() { dotenv::from_filename(".env.test").ok(); - let leader_addr = "127.0.0.1:6379".parse().unwrap(); // let replica_addr = "127.0.0.1:6380".parse().unwrap(); @@ -42,5 +41,3 @@ async fn test_leader_replica_interaction() { sleep(Duration::from_secs(1)).await; } - -