Skip to content

Commit

Permalink
Add Polygon
Browse files Browse the repository at this point in the history
  • Loading branch information
ILmoshe committed Jul 15, 2024
1 parent bb2c722 commit 755bed4
Show file tree
Hide file tree
Showing 9 changed files with 114 additions and 58 deletions.
5 changes: 5 additions & 0 deletions .idea/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions .idea/geommdb.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions .idea/modules.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified snapshot.bincode
Binary file not shown.
20 changes: 14 additions & 6 deletions src/network/command.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
pub enum Command {
GeoAdd { key: String, lat: f64, lon: f64 },
GeoAdd { key: String, coords: Vec<(f64, f64)> },
GeoSearch { lat: f64, lon: f64, radius: f64 },
GeoGet { key: String },
Heartbeat,
Expand All @@ -8,11 +8,19 @@ pub enum Command {
pub fn parse_command(input: &str) -> Option<Command> {
let parts: Vec<&str> = input.trim().split_whitespace().collect();
match parts.as_slice() {
["GEOADD", key, lat, lon] => Some(Command::GeoAdd {
key: key.to_string(),
lat: lat.parse().ok()?,
lon: lon.parse().ok()?,
}),
["GEOADD", key, rest @ ..] => {
let coords: Vec<(f64, f64)> = rest
.chunks(2)
.filter_map(|chunk| {
if chunk.len() == 2 {
Some((chunk[0].parse().ok()?, chunk[1].parse().ok()?))
} else {
None
}
})
.collect();
Some(Command::GeoAdd { key: key.to_string(), coords })
}
["GEOSEARCH", lat, lon, radius] => Some(Command::GeoSearch {
lat: lat.parse().ok()?,
lon: lon.parse().ok()?,
Expand Down
33 changes: 13 additions & 20 deletions src/network/handler.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use crate::network::command::{parse_command, Command};
use crate::network::replica::{Replica, Role};
use crate::persistence::WalEntry;
use log::{error, info};
use std::net::SocketAddr;
use std::sync::{Arc};
use std::sync::{Arc, Mutex};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use log::{info, error};
use crate::network::command::{Command, parse_command};
use crate::network::replica::{Replica, Role};
use crate::persistence::WalEntry;

pub async fn handle_client(
mut stream: TcpStream,
Expand All @@ -32,26 +32,20 @@ pub async fn handle_client(
info!("Received command: {}", input.trim());
let response = if let Some(command) = parse_command(&input) {
match command {
Command::GeoAdd { key, lat, lon } => {
Command::GeoAdd { key, coords } => {
let new_coord = coords.clone();
if let Role::Leader = replica.role {
let mut db = replica.db.lock().unwrap();
db.geo_add(key.clone(), lat, lon);
db.geo_add(key.clone(), coords);
let mut persistence = replica.persistence.lock().unwrap();

if let Err(e) = persistence.log_entry(WalEntry::GeoAdd {
key: key.clone(),
lat,
lon,
coords: new_coord,
}) {
error!("Failed to log entry; err = {:?}", e);
}

// Replicate the write to other replicas
// (Simplified: this should be done asynchronously in a real implementation)
info!(
"GeoAdd command processed: key={}, lat={}, lon={}",
key, lat, lon
);
info!("GeoAdd command processed: key={}", key);
"OK\n".to_string()
} else {
// Forward write requests to the leader
Expand Down Expand Up @@ -83,17 +77,16 @@ pub async fn handle_client(
Command::GeoGet { key } => {
let db = replica.db.lock().unwrap();
match db.geo_get(&key) {
Some(point) => {
info!("GeoGet command processed: key={}, lat={}, lon={}", key, point.y(), point.x());
format!("{} {}\n", point.y(), point.x())
Some(data) => {
info!("GeoGet command processed: key={}", key);
data + "\n"
}
None => {
info!("GeoGet command: key={} not found", key);
"Not Found\n".to_string()
}
}
}

Command::Heartbeat => {
if let Role::Leader = replica.role {
if let Ok(addr) = stream.peer_addr() {
Expand Down
6 changes: 3 additions & 3 deletions src/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const SNAPSHOT_FILE: &str = "snapshot.bincode"; // A snapshot is a complete copy
#[derive(Serialize, Deserialize)]
pub enum WalEntry {
// Make this enum public
GeoAdd { key: String, lat: f64, lon: f64 },
GeoAdd { key: String, coords: Vec<(f64, f64)> },
}

pub struct Persistence {
Expand Down Expand Up @@ -44,8 +44,8 @@ impl Persistence {
let entry: WalEntry = bincode::deserialize(line.as_bytes())
.map_err(|e| io::Error::new(ErrorKind::Other, e))?;
match entry {
WalEntry::GeoAdd { key, lat, lon } => {
db.geo_add(key, lat, lon);
WalEntry::GeoAdd { key, coords } => {
db.geo_add(key, coords);
}
}
}
Expand Down
82 changes: 53 additions & 29 deletions src/storage.rs
Original file line number Diff line number Diff line change
@@ -1,48 +1,72 @@
use geo::algorithm::haversine_distance::HaversineDistance;
use geo_types::Point;
use rstar::RTree;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use geo::{HaversineDistance, Point, Polygon};
use rstar::{RTree, RTreeObject, AABB, PointDistance};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug)]
#[derive(Debug, Serialize, Deserialize)]
pub struct GeoDatabase {
tree: RTree<Point<f64>>,
data: HashMap<String, Point<f64>>,
points: HashMap<String, Point<f64>>,
polygons: HashMap<String, Polygon<f64>>,
point_tree: RTree<Point<f64>>,
polygon_tree: RTree<Polygon<f64>>,
}

impl GeoDatabase {
pub fn new() -> Self {
GeoDatabase {
tree: RTree::new(),
data: HashMap::new(),
points: HashMap::new(),
polygons: HashMap::new(),
point_tree: RTree::new(),
polygon_tree: RTree::new(),
}
}

pub fn geo_add(&mut self, key: String, lat: f64, lon: f64) {
let point = Point::new(lon, lat);
self.tree.insert(point);
self.data.insert(key, point);
pub fn geo_add(&mut self, key: String, coords: Vec<(f64, f64)>) {
if coords.len() == 1 {
let point = Point::new(coords[0].1, coords[0].0); // (lon, lat)
self.point_tree.insert(point);
self.points.insert(key, point);
} else {
let polygon = Polygon::new(coords.into(), vec![]);
self.polygon_tree.insert(polygon.clone());
self.polygons.insert(key, polygon);
}
}

pub fn geo_search(&self, lat: f64, lon: f64, radius: f64) -> Vec<String> {
let center = Point::new(lon, lat);
self.tree
.nearest_neighbor_iter(&center)
.filter_map(|p: &Point<f64>| {
let distance = p.haversine_distance(&center);
if distance <= radius {
self.data
.iter()
.find(|&(_, &v)| v == *p)
.map(|(k, _)| k.clone())
} else {
None
}
})
.collect()
let mut results = Vec::new();

// Search for points within the radius
for point in self.point_tree.nearest_neighbor_iter(&center).filter(|p| p.haversine_distance(&center) <= radius) {
if let Some((key, _)) = self.points.iter().find(|(_, &v)| v == *point) {
results.push(key.clone());
}
}

// Create an AABB for the search radius
let search_aabb = AABB::from_corners(
Point::new(lon - radius, lat - radius),
Point::new(lon + radius, lat + radius)
);

// Search for polygons that intersect with the radius circle
for polygon in self.polygon_tree.locate_in_envelope_intersecting(&search_aabb) {
if let Some((key, _)) = self.polygons.iter().find(|(_, v)| v.clone() == polygon) {
results.push(key.clone());
}
}

results
}

pub fn geo_get(&self, key: &str) -> Option<Point<f64>> {
self.data.get(key).cloned()
pub fn geo_get(&self, key: &str) -> Option<String> {
if let Some(point) = self.points.get(key) {
Some(format!("POINT({} {})", point.y(), point.x()))
} else if let Some(polygon) = self.polygons.get(key) {
Some(format!("POLYGON(({}))", polygon.exterior().points().map(|p| format!("{} {}", p.y(), p.x())).collect::<Vec<_>>().join(", ")))
} else {
None
}
}
}

0 comments on commit 755bed4

Please sign in to comment.