diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..b58b603 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,5 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ diff --git a/.idea/geommdb.iml b/.idea/geommdb.iml new file mode 100644 index 0000000..bbe0a70 --- /dev/null +++ b/.idea/geommdb.iml @@ -0,0 +1,12 @@ + + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..a6d9e55 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..35eb1dd --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/snapshot.bincode b/snapshot.bincode index 36b555a..f5a5dc0 100644 Binary files a/snapshot.bincode and b/snapshot.bincode differ diff --git a/src/network/command.rs b/src/network/command.rs index 61a1d06..504bbc0 100644 --- a/src/network/command.rs +++ b/src/network/command.rs @@ -1,5 +1,5 @@ pub enum Command { - GeoAdd { key: String, lat: f64, lon: f64 }, + GeoAdd { key: String, coords: Vec<(f64, f64)> }, GeoSearch { lat: f64, lon: f64, radius: f64 }, GeoGet { key: String }, Heartbeat, @@ -8,11 +8,19 @@ pub enum Command { pub fn parse_command(input: &str) -> Option { let parts: Vec<&str> = input.trim().split_whitespace().collect(); match parts.as_slice() { - ["GEOADD", key, lat, lon] => Some(Command::GeoAdd { - key: key.to_string(), - lat: lat.parse().ok()?, - lon: lon.parse().ok()?, - }), + ["GEOADD", key, rest @ ..] => { + let coords: Vec<(f64, f64)> = rest + .chunks(2) + .filter_map(|chunk| { + if chunk.len() == 2 { + Some((chunk[0].parse().ok()?, chunk[1].parse().ok()?)) + } else { + None + } + }) + .collect(); + Some(Command::GeoAdd { key: key.to_string(), coords }) + } ["GEOSEARCH", lat, lon, radius] => Some(Command::GeoSearch { lat: lat.parse().ok()?, lon: lon.parse().ok()?, diff --git a/src/network/handler.rs b/src/network/handler.rs index e5000d5..dc1be36 100644 --- a/src/network/handler.rs +++ b/src/network/handler.rs @@ -1,11 +1,11 @@ -use crate::network::command::{parse_command, Command}; -use crate::network::replica::{Replica, Role}; -use crate::persistence::WalEntry; -use log::{error, info}; use std::net::SocketAddr; -use std::sync::{Arc}; +use std::sync::{Arc, Mutex}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpStream; +use log::{info, error}; +use crate::network::command::{Command, parse_command}; +use crate::network::replica::{Replica, Role}; +use crate::persistence::WalEntry; pub async fn handle_client( mut stream: TcpStream, @@ -32,26 +32,20 @@ pub async fn handle_client( info!("Received command: {}", input.trim()); let response = if let Some(command) = parse_command(&input) { match command { - Command::GeoAdd { key, lat, lon } => { + Command::GeoAdd { key, coords } => { + let new_coord = coords.clone(); if let Role::Leader = replica.role { let mut db = replica.db.lock().unwrap(); - db.geo_add(key.clone(), lat, lon); + db.geo_add(key.clone(), coords); let mut persistence = replica.persistence.lock().unwrap(); if let Err(e) = persistence.log_entry(WalEntry::GeoAdd { key: key.clone(), - lat, - lon, + coords: new_coord, }) { error!("Failed to log entry; err = {:?}", e); } - - // Replicate the write to other replicas - // (Simplified: this should be done asynchronously in a real implementation) - info!( - "GeoAdd command processed: key={}, lat={}, lon={}", - key, lat, lon - ); + info!("GeoAdd command processed: key={}", key); "OK\n".to_string() } else { // Forward write requests to the leader @@ -83,9 +77,9 @@ pub async fn handle_client( Command::GeoGet { key } => { let db = replica.db.lock().unwrap(); match db.geo_get(&key) { - Some(point) => { - info!("GeoGet command processed: key={}, lat={}, lon={}", key, point.y(), point.x()); - format!("{} {}\n", point.y(), point.x()) + Some(data) => { + info!("GeoGet command processed: key={}", key); + data + "\n" } None => { info!("GeoGet command: key={} not found", key); @@ -93,7 +87,6 @@ pub async fn handle_client( } } } - Command::Heartbeat => { if let Role::Leader = replica.role { if let Ok(addr) = stream.peer_addr() { diff --git a/src/persistence.rs b/src/persistence.rs index 7944459..499ce58 100644 --- a/src/persistence.rs +++ b/src/persistence.rs @@ -9,7 +9,7 @@ const SNAPSHOT_FILE: &str = "snapshot.bincode"; // A snapshot is a complete copy #[derive(Serialize, Deserialize)] pub enum WalEntry { // Make this enum public - GeoAdd { key: String, lat: f64, lon: f64 }, + GeoAdd { key: String, coords: Vec<(f64, f64)> }, } pub struct Persistence { @@ -44,8 +44,8 @@ impl Persistence { let entry: WalEntry = bincode::deserialize(line.as_bytes()) .map_err(|e| io::Error::new(ErrorKind::Other, e))?; match entry { - WalEntry::GeoAdd { key, lat, lon } => { - db.geo_add(key, lat, lon); + WalEntry::GeoAdd { key, coords } => { + db.geo_add(key, coords); } } } diff --git a/src/storage.rs b/src/storage.rs index 7f48483..4d7f010 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -1,48 +1,72 @@ -use geo::algorithm::haversine_distance::HaversineDistance; -use geo_types::Point; -use rstar::RTree; -use serde::{Deserialize, Serialize}; use std::collections::HashMap; +use geo::{HaversineDistance, Point, Polygon}; +use rstar::{RTree, RTreeObject, AABB, PointDistance}; +use serde::{Deserialize, Serialize}; -#[derive(Serialize, Deserialize, Debug)] +#[derive(Debug, Serialize, Deserialize)] pub struct GeoDatabase { - tree: RTree>, - data: HashMap>, + points: HashMap>, + polygons: HashMap>, + point_tree: RTree>, + polygon_tree: RTree>, } impl GeoDatabase { pub fn new() -> Self { GeoDatabase { - tree: RTree::new(), - data: HashMap::new(), + points: HashMap::new(), + polygons: HashMap::new(), + point_tree: RTree::new(), + polygon_tree: RTree::new(), } } - pub fn geo_add(&mut self, key: String, lat: f64, lon: f64) { - let point = Point::new(lon, lat); - self.tree.insert(point); - self.data.insert(key, point); + pub fn geo_add(&mut self, key: String, coords: Vec<(f64, f64)>) { + if coords.len() == 1 { + let point = Point::new(coords[0].1, coords[0].0); // (lon, lat) + self.point_tree.insert(point); + self.points.insert(key, point); + } else { + let polygon = Polygon::new(coords.into(), vec![]); + self.polygon_tree.insert(polygon.clone()); + self.polygons.insert(key, polygon); + } } pub fn geo_search(&self, lat: f64, lon: f64, radius: f64) -> Vec { let center = Point::new(lon, lat); - self.tree - .nearest_neighbor_iter(¢er) - .filter_map(|p: &Point| { - let distance = p.haversine_distance(¢er); - if distance <= radius { - self.data - .iter() - .find(|&(_, &v)| v == *p) - .map(|(k, _)| k.clone()) - } else { - None - } - }) - .collect() + let mut results = Vec::new(); + + // Search for points within the radius + for point in self.point_tree.nearest_neighbor_iter(¢er).filter(|p| p.haversine_distance(¢er) <= radius) { + if let Some((key, _)) = self.points.iter().find(|(_, &v)| v == *point) { + results.push(key.clone()); + } + } + + // Create an AABB for the search radius + let search_aabb = AABB::from_corners( + Point::new(lon - radius, lat - radius), + Point::new(lon + radius, lat + radius) + ); + + // Search for polygons that intersect with the radius circle + for polygon in self.polygon_tree.locate_in_envelope_intersecting(&search_aabb) { + if let Some((key, _)) = self.polygons.iter().find(|(_, v)| v.clone() == polygon) { + results.push(key.clone()); + } + } + + results } - pub fn geo_get(&self, key: &str) -> Option> { - self.data.get(key).cloned() + pub fn geo_get(&self, key: &str) -> Option { + if let Some(point) = self.points.get(key) { + Some(format!("POINT({} {})", point.y(), point.x())) + } else if let Some(polygon) = self.polygons.get(key) { + Some(format!("POLYGON(({}))", polygon.exterior().points().map(|p| format!("{} {}", p.y(), p.x())).collect::>().join(", "))) + } else { + None + } } }