Skip to content

Commit

Permalink
Format code
Browse files Browse the repository at this point in the history
  • Loading branch information
ILmoshe committed Jul 16, 2024
1 parent 755bed4 commit 7c0bdb7
Show file tree
Hide file tree
Showing 12 changed files with 63 additions and 29 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ Cargo.lock
node_modules

wal.log
snapshot.x
snapshot.bincode
.idea
11 changes: 7 additions & 4 deletions clients/python/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ def send_command(command: str):


if __name__ == "__main__":
send_command("GEOADD location1 37.7749 -122.4194")
send_command("GEOADD location2 34.0522 -118.2437")
# send_command("GEOSEARCH 37.7749 -122.4194 500000\n")
send_command("GEOGET location1")
# send_command("GEOADD location1 37.7749 -122.4194")
# send_command("GEOADD location2 34.0522 -118.2437")
# # send_command("GEOSEARCH 37.7749 -122.4194 500000\n")
# send_command("GEOGET location1")
# # send command geoadd polygon
# send_command("GEOADD polygon1 37.7749 -122.4194 37.7749 -122.4194 37.7749 -122.4194 37.7749 -122.4194")
send_command("GEOGET polygon1")
Binary file removed snapshot.bincode
Binary file not shown.
1 change: 1 addition & 0 deletions src/geospatial.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
pub mod network;
pub mod geospatial;
pub mod network;
pub mod persistence;
pub mod storage;
pub mod storage;
20 changes: 16 additions & 4 deletions src/network/command.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
pub enum Command {
GeoAdd { key: String, coords: Vec<(f64, f64)> },
GeoSearch { lat: f64, lon: f64, radius: f64 },
GeoGet { key: String },
GeoAdd {
key: String,
coords: Vec<(f64, f64)>,
},
GeoSearch {
lat: f64,
lon: f64,
radius: f64,
},
GeoGet {
key: String,
},
Heartbeat,
}

Expand All @@ -19,7 +28,10 @@ pub fn parse_command(input: &str) -> Option<Command> {
}
})
.collect();
Some(Command::GeoAdd { key: key.to_string(), coords })
Some(Command::GeoAdd {
key: key.to_string(),
coords,
})
}
["GEOSEARCH", lat, lon, radius] => Some(Command::GeoSearch {
lat: lat.parse().ok()?,
Expand Down
12 changes: 7 additions & 5 deletions src/network/handler.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use crate::network::command::{parse_command, Command};
use crate::network::replica::{Replica, Role};
use crate::persistence::WalEntry;
use log::{error, info};
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use log::{info, error};
use crate::network::command::{Command, parse_command};
use crate::network::replica::{Replica, Role};
use crate::persistence::WalEntry;

pub async fn handle_client(
mut stream: TcpStream,
Expand Down Expand Up @@ -34,11 +34,12 @@ pub async fn handle_client(
match command {
Command::GeoAdd { key, coords } => {
let new_coord = coords.clone();

if let Role::Leader = replica.role {
let mut db = replica.db.lock().unwrap();
db.geo_add(key.clone(), coords);
let mut persistence = replica.persistence.lock().unwrap();

let mut persistence = replica.persistence.lock().unwrap();
if let Err(e) = persistence.log_entry(WalEntry::GeoAdd {
key: key.clone(),
coords: new_coord,
Expand All @@ -51,6 +52,7 @@ pub async fn handle_client(
// Forward write requests to the leader
if let Some(leader_addr) = leader_addr {
if let Ok(mut leader_stream) = TcpStream::connect(leader_addr).await {
// TODO: Do we actually need to connect to the leader in every request?, heartbeat can ensure that we are connected to the leader.
leader_stream.write_all(input.as_bytes()).await.unwrap();
let mut leader_response = [0; 1024];
let n = leader_stream.read(&mut leader_response).await.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/network/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
pub mod command;
pub mod handler;
pub mod replica;
pub mod server;
pub mod server;
4 changes: 2 additions & 2 deletions src/network/server.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::network::handler::handle_client;
use crate::network::replica::{Replica, Role};
use crate::persistence::Persistence;
use std::net::SocketAddr;
use std::sync::{Arc};
use log::info;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio::signal;

Expand Down
5 changes: 4 additions & 1 deletion src/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ const SNAPSHOT_FILE: &str = "snapshot.bincode"; // A snapshot is a complete copy
#[derive(Serialize, Deserialize)]
pub enum WalEntry {
// Make this enum public
GeoAdd { key: String, coords: Vec<(f64, f64)> },
GeoAdd {
key: String,
coords: Vec<(f64, f64)>,
},
}

pub struct Persistence {
Expand Down
27 changes: 21 additions & 6 deletions src/storage.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::HashMap;
use geo::{HaversineDistance, Point, Polygon};
use rstar::{RTree, RTreeObject, AABB, PointDistance};
use rstar::{PointDistance, RTree, RTreeObject, AABB};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

#[derive(Debug, Serialize, Deserialize)]
pub struct GeoDatabase {
Expand Down Expand Up @@ -38,7 +38,11 @@ impl GeoDatabase {
let mut results = Vec::new();

// Search for points within the radius
for point in self.point_tree.nearest_neighbor_iter(&center).filter(|p| p.haversine_distance(&center) <= radius) {
for point in self
.point_tree
.nearest_neighbor_iter(&center)
.filter(|p| p.haversine_distance(&center) <= radius)
{
if let Some((key, _)) = self.points.iter().find(|(_, &v)| v == *point) {
results.push(key.clone());
}
Expand All @@ -47,11 +51,14 @@ impl GeoDatabase {
// Create an AABB for the search radius
let search_aabb = AABB::from_corners(
Point::new(lon - radius, lat - radius),
Point::new(lon + radius, lat + radius)
Point::new(lon + radius, lat + radius),
);

// Search for polygons that intersect with the radius circle
for polygon in self.polygon_tree.locate_in_envelope_intersecting(&search_aabb) {
for polygon in self
.polygon_tree
.locate_in_envelope_intersecting(&search_aabb)
{
if let Some((key, _)) = self.polygons.iter().find(|(_, v)| v.clone() == polygon) {
results.push(key.clone());
}
Expand All @@ -64,7 +71,15 @@ impl GeoDatabase {
if let Some(point) = self.points.get(key) {
Some(format!("POINT({} {})", point.y(), point.x()))
} else if let Some(polygon) = self.polygons.get(key) {
Some(format!("POLYGON(({}))", polygon.exterior().points().map(|p| format!("{} {}", p.y(), p.x())).collect::<Vec<_>>().join(", ")))
Some(format!(
"POLYGON(({}))",
polygon
.exterior()
.points()
.map(|p| format!("{} {}", p.y(), p.x()))
.collect::<Vec<_>>()
.join(", ")
))
} else {
None
}
Expand Down
3 changes: 0 additions & 3 deletions tests/leader_replica_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ mod common;
async fn test_leader_replica_interaction() {
dotenv::from_filename(".env.test").ok();


let leader_addr = "127.0.0.1:6379".parse().unwrap();
// let replica_addr = "127.0.0.1:6380".parse().unwrap();

Expand Down Expand Up @@ -42,5 +41,3 @@ async fn test_leader_replica_interaction() {

sleep(Duration::from_secs(1)).await;
}


0 comments on commit 7c0bdb7

Please sign in to comment.