diff --git a/src/remote_hdt.rs b/src/remote_hdt.rs index e4e6a57..be02388 100644 --- a/src/remote_hdt.rs +++ b/src/remote_hdt.rs @@ -1,9 +1,13 @@ use bimap::BiHashMap; +use ndarray::parallel::prelude::IntoParallelRefIterator; +use ndarray::parallel::prelude::ParallelIterator; use ndarray::{ArcArray, ArcArray1, Array2, ArrayBase, Axis, Dim, Ix3, IxDynImpl, OwnedArcRepr}; use rdf_rs::RdfParser; use sophia::term::BoxTerm; use std::path::PathBuf; use std::str::FromStr; +use std::sync::atomic::AtomicU8; +use std::sync::atomic::Ordering; use zarr3::codecs::bb::gzip_codec::GzipCodec; use zarr3::prelude::smallvec::smallvec; use zarr3::prelude::{ @@ -482,6 +486,7 @@ impl<'a> RemoteHDT<'a> { // TODO: could this be done using rayon or a multi-threaded approach. // Maybe using chunks instead of a region and having several chunks of // the same size (i.e 100x100). Then we write in parallel? + // This is the place where the system is currently taking more time if arr.write_region(&offset, data).is_err() { return Err(String::from("Error writing to the Array")); }; @@ -498,18 +503,26 @@ impl<'a> RemoteHDT<'a> { objects: BiHashMap, ) -> Result, Dim>, String> { match ArcArrayD::from_shape_vec(self.reference_system.shape(domain).to_vec(), { - let mut v: Vec = - vec![0u8; domain.subjects_size * domain.predicates_size * domain.objects_size]; - let slice = v.as_mut_slice(); - dump.graph.iter().for_each(|[subject, predicate, object]| { - slice[self.reference_system.index( - subjects.get_by_left(subject).unwrap().to_owned(), - predicates.get_by_left(predicate).unwrap().to_owned(), - objects.get_by_left(object).unwrap().to_owned(), - domain, - )] = 1u8; - }); - slice.to_vec() + let slice: Vec = + vec![0u8; domain.subjects_size * domain.predicates_size * domain.objects_size] + .par_iter() + .map(|&n| AtomicU8::new(n)) + .collect(); + dump.graph + .par_iter() + .for_each(|[subject, predicate, object]| { + slice[self.reference_system.index( + subjects.get_by_left(subject).unwrap().to_owned(), + predicates.get_by_left(predicate).unwrap().to_owned(), + objects.get_by_left(object).unwrap().to_owned(), + domain, + )] + .store(1u8, Ordering::Relaxed); + }); + slice + .iter() + .map(|elem| elem.load(Ordering::Relaxed)) + .collect::>() }) { Ok(data) => Ok(data), Err(_) => return Err(String::from("Error creating the data Array")), diff --git a/tests/write_read_test.rs b/tests/write_read_test.rs index fc5623c..354df8a 100644 --- a/tests/write_read_test.rs +++ b/tests/write_read_test.rs @@ -4,7 +4,7 @@ use remote_hdt::remote_hdt::{ArcArray3, RemoteHDTBuilder}; #[test] fn write_read_test() { - let _ = remove_dir_all("root.zarr").unwrap(); + let _ = remove_dir_all("root.zarr"); let _ = RemoteHDTBuilder::new("root.zarr") .reference_system(remote_hdt::remote_hdt::ReferenceSystem::SPO)