diff --git a/src/coordinator.rs b/src/coordinator.rs index 53a3675..ec1a53c 100644 --- a/src/coordinator.rs +++ b/src/coordinator.rs @@ -1,29 +1,23 @@ use std::collections::BinaryHeap; -use std::io::ErrorKind; +use std::fs::{File, OpenOptions}; use std::net::SocketAddr; -use std::panic::panic_any; use std::sync::Arc; +use std::time::Duration; -use aws_sdk_sqs::operation::receive_message::builders::ReceiveMessageFluentBuilder; -use aws_sdk_sqs::operation::send_message::builders::SendMessageFluentBuilder; -use bytemuck::bytes_of; -use eyre::{anyhow, Error}; use futures::future; -use memmap::{Mmap, MmapOptions}; +use memmap::Mmap; use rayon::iter::{IntoParallelIterator, ParallelIterator}; -use serde::{Deserialize, Serialize}; use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader}; use tokio::net::TcpStream; use tokio::sync::mpsc::Receiver; -use tokio::sync::{self, mpsc, Mutex}; +use tokio::sync::{mpsc, Mutex}; use tokio::task::JoinHandle; use crate::bits::Bits; use crate::distance::{self, Distance, DistanceResults, MasksEngine}; -use crate::encoded_bits::EncodedBits; -use crate::template::{self, Template}; +use crate::template::Template; -const BATCH_SIZE: usize = 20_000; //TODO: probably make this configurable +const BATCH_SIZE: usize = 20_000; //TODO: make this configurable pub struct Coordinator { aws_client: aws_sdk_sqs::Client, @@ -68,7 +62,7 @@ impl Coordinator { //TODO: update error handling pub async fn spawn(mut self) -> eyre::Result<()> { - let mmap_db: Arc = Arc::new(self.initialize_mmap_db()); + let mmap_db: Arc = Arc::new(self.initialize_mmap_db()?); loop { if let Some(messages) = self.dequeue_queries().await? { @@ -105,7 +99,8 @@ impl Coordinator { } } - //TODO: sleep for some amount of time + //TODO: decide how long to sleep + tokio::time::sleep(Duration::from_millis(100)).await; } } @@ -316,8 +311,17 @@ impl Coordinator { Ok(distance_results) } - pub fn initialize_mmap_db(&self) -> Mmap { - todo!() + pub fn initialize_mmap_db(&self) -> eyre::Result { + //TODO: update this + // Try to open the file, or create it if it does not exist + let file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .open("masks")?; + + // Memory-map the file + unsafe { Ok(Mmap::map(&file)?) } } pub async fn dequeue_queries(