diff --git a/examples/complement_bench.rs b/examples/complement_bench.rs
new file mode 100644
index 0000000..0ef8b2f
--- /dev/null
+++ b/examples/complement_bench.rs
@@ -0,0 +1,9 @@
+use remote_hdt::error::RemoteHDTError;
+use remote_hdt::complement::layout::default::DefaultComplementLayout;
+use remote_hdt::complement::ops::Ops;
+use remote_hdt::complement::ComplementStorage;
+fn main() {
+
+    let mut binding = ComplementStorage::new(DefaultComplementLayout);
+
+}
diff --git a/src/complement/layout/default.rs b/src/complement/layout/default.rs
new file mode 100644
index 0000000..6ac95e4
--- /dev/null
+++ b/src/complement/layout/default.rs
@@ -0,0 +1 @@
+pub struct DefaultComplementLayout;
\ No newline at end of file
diff --git a/src/complement/layout/mod.rs b/src/complement/layout/mod.rs
new file mode 100644
index 0000000..eced7a7
--- /dev/null
+++ b/src/complement/layout/mod.rs
@@ -0,0 +1,133 @@
+pub mod default;
+
+use zarrs::storage::store::OpendalStore;
+use zarrs::array::Array;
+pub trait ComplementLayout<C> {
+    fn retrieve_attributes(&mut self, arr: &Array<OpendalStore>) {
+        // 4. We get the attributes so we can obtain some values that we will need
+        let attributes = arr.attributes();
+
+        let subjects = &value_to_term(match attributes.get("subjects") {
+            Some(subjects) => subjects,
+            None => return Err(RemoteHDTError::SubjectsNotInJSON),
+        });
+        let predicates = &value_to_term(match attributes.get("predicates") {
+            Some(predicates) => predicates,
+            None => return Err(RemoteHDTError::PredicatesNotInJSON),
+        });
+        let objects = &value_to_term(match attributes.get("objects") {
+            Some(objects) => objects,
+            None => return Err(RemoteHDTError::ObjectsNotInJSON),
+        });
+
+        let reference_system: ReferenceSystem = match attributes.get("reference_system") {
+            Some(reference_system) => reference_system,
+            None => return Err(RemoteHDTError::ReferenceSystemNotInJSON),
+        }
+        .as_str()
+        .unwrap()
+        .into();
+
+        Ok(Dictionary::from_vec_str(
+            reference_system,
+            subjects,
+            predicates,
+            objects,
+        ))
+    }
+
+    fn serialize(&mut self, arr: Array<OpendalStore>, graph: Graph) -> StorageResult<()> {
+        let columns = arr.shape()[1] as usize;
+        let count = AtomicU64::new(0);
+        let binding = self.graph_iter(graph.to_owned());
+        let iter = binding.chunks_exact(rows_per_shard(&arr) as usize);
+        let remainder = iter.remainder();
+
+        for chunk in iter {
+            arr.store_chunk_elements(
+                &[count.load(Ordering::Relaxed), 0],
+                self.store_chunk_elements(chunk, columns),
+            )?;
+            count.fetch_add(1, Ordering::Relaxed);
+        }
+
+        if !remainder.is_empty() {
+            arr.store_array_subset_elements(
+                &ArraySubset::new_with_start_shape(
+                    vec![count.load(Ordering::Relaxed) * rows_per_shard(&arr), 0],
+                    vec![remainder.len() as u64, columns_per_shard(&arr)],
+                )?,
+                self.store_chunk_elements(remainder, columns),
+            )?;
+        }
+
+        Ok(())
+    }
+
+    fn parse(
+        &mut self,
+        arr: &Array<OpendalStore>,
+        dimensionality: &Dimensionality,
+    ) -> StorageResult<ZarrArray> {
+        // First, we create the 2D matrix in such a manner that the number of
+        // rows is the same as the size of the first terms; i.e, in the SPO
+        // orientation, that will be equals to the number of subjects, while
+        // the number of columns is equals to the size of the third terms; i.e,
+        // following the same example as before, it will be equals to the number
+        // of objects. In our case the dimensionality abstracts the process
+        // of getting the size of the concrete dimension
+        let matrix = Mutex::new(TriMat::new((
+            dimensionality.first_term_size, // we obtain the size of the first terms
+            dimensionality.third_term_size, // we obtain the size of the third terms
+        )));
+
+        // We compute the number of shards; for us to achieve so, we have to obtain
+        // first dimension of the chunk grid
+        let number_of_shards = match arr.chunk_grid_shape() {
+            Some(chunk_grid) => chunk_grid[0],
+
+            None => 0,
+        };
+
+        let number_of_columns = arr.shape()[1] as usize;
+
+        // For each chunk in the Zarr array we retrieve it and parse it into a
+        // matrix, inserting the triplet in its corresponding position. The idea
+        // of parsing the array chunk-by-chunk allows us to keep the RAM usage
+        // low, as instead of parsing the whole array, we process smaller pieces
+        // of it. Once we have all the pieces processed, we will have parsed the
+        // whole array
+        for shard in 0..number_of_shards {
+            arr.retrieve_chunk_elements(&[shard, 0])?
+                // We divide each shard by the number of columns, as a shard is
+                // composed of chunks having the size of [1, number of cols]
+                .chunks(number_of_columns)
+                .enumerate()
+                .for_each(|(first_term_idx, chunk)| {
+                    self.retrieve_chunk_elements(
+                        &matrix,
+                        first_term_idx + (shard * rows_per_shard(arr)) as usize,
+                        chunk,
+                    );
+                })
+        }
+
+        // We use a CSC Matrix because typically, RDF knowledge graphs tend to
+        // have more rows than columns; as such, CSC matrices are optimized
+        // for that precise scenario
+        let x = matrix.lock();
+        Ok(x.to_csc())
+    }
+
+    fn graph_iter(&self, graph: Graph) -> Vec<C>;
+    fn store_chunk_elements(&self, chunk: &[C], columns: usize) -> Vec<u64>;
+    fn retrieve_chunk_elements(
+        &mut self,
+        matrix: &Mutex<TriMat<usize>>,
+        first_term_idx: usize,
+        chunk: &[usize],
+    );
+    fn sharding_factor(&self, dimensionality: &Dimensionality) -> usize;
+
+}
+
diff --git a/src/complement/mod.rs b/src/complement/mod.rs
new file mode 100644
index 0000000..7c46fcf
--- /dev/null
+++ b/src/complement/mod.rs
@@ -0,0 +1,28 @@
+pub mod layout;
+pub mod ops;
+pub mod params;
+
+use crate::dictionary::Dictionary;
+
+use self::params::Dimensionality;
+use self::layout::default::DefaultComplementLayout;
+
+
+use zarrs::storage::store::OpendalStore;
+use zarrs::array::Array;
+
+
+pub struct ComplementStorage<C> {
+    dictionary: Dictionary,
+    dimensionality: Dimensionality,
+    array: Option<Array<OpendalStore>>
+
+}
+
+impl<C> ComplementStorage<C> {
+    pub fn new(layout: impl ComplementLayout<C> + 'static) -> Self {
+        ComplementStorage {
+
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/complement/ops.rs b/src/complement/ops.rs
new file mode 100644
index 0000000..cb162d6
--- /dev/null
+++ b/src/complement/ops.rs
@@ -0,0 +1,3 @@
+pub trait Ops {
+    fn get_chunk(&self, subject: &usize) -> Vec<u32>;
+}
\ No newline at end of file
diff --git a/src/complement/params.rs b/src/complement/params.rs
new file mode 100644
index 0000000..4a0ced9
--- /dev/null
+++ b/src/complement/params.rs
@@ -0,0 +1,4 @@
+#[derive(Default)]
+pub struct Dimensionality {
+    graph_size: Option<usize>,
+}
\ No newline at end of file
diff --git a/src/lib.rs b/src/lib.rs
index 1968cf7..70dc140 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -4,3 +4,4 @@ pub mod error;
 mod io;
 pub mod storage;
 mod utils;
+pub mod complement;
diff --git a/src/storage/layout/complement.rs b/src/storage/layout/complement.rs
deleted file mode 100644
index ae1c542..0000000
--- a/src/storage/layout/complement.rs
+++ /dev/null
@@ -1,100 +0,0 @@
-use std::num::NonZeroU64;
-
-use parking_lot::Mutex;
-use sprs::TriMat;
-use zarrs::array::codec::array_to_bytes::sharding::ShardingCodecBuilder;
-use zarrs::array::codec::ArrayToBytesCodecTraits;
-use zarrs::array::codec::GzipCodec;
-use zarrs::array::ChunkGrid;
-use zarrs::array::DataType;
-use zarrs::array::DimensionName;
-use zarrs::array::FillValue;
-
-use super::ChunkingStrategy;
-use super::Dimensionality;
-use super::StorageResult;
-
-use crate::io::Graph;
-use crate::storage::layout::LayoutOps;
-use crate::storage::layout::ComplementaryLayout;
-
-type Chunk = (u32, u32, u32);
-
-pub struct ComplementLayout;
-
-
-
-
-impl ComplementaryLayout<Chunk> for ComplementLayout {
-    fn shape(&self, dimensionality: &Dimensionality) -> Vec<u64> {
-        vec![dimensionality.get_graph_size(), 3]
-    }
-
-    fn data_type(&self) -> DataType {
-        DataType::UInt64
-    }
-
-    fn chunk_shape(&self, chunking_strategy: ChunkingStrategy, _: &Dimensionality) -> ChunkGrid {
-        vec![chunking_strategy.into(), NonZeroU64::new(3).unwrap()].into() // TODO: make this a constant value
-    }
-
-    fn fill_value(&self) -> FillValue {
-        FillValue::from(0u64)
-    }
-
-    fn dimension_names(&self) -> Option<Vec<DimensionName>> {
-        Some(vec![
-            DimensionName::new("Triples"),
-            DimensionName::new("Complementary fields"),
-        ])
-    }
-
-    fn array_to_bytes_codec(
-        &self,
-        _: &Dimensionality,
-    ) -> StorageResult<ArrayToBytesCodec> {
-        let mut sharding_codec_builder = ShardingCodecBuilder::new(vec![1, 3].try_into()?);
-        sharding_codec_builder.bytes_to_bytes_codecs(vec![Box::new(GzipCodec::new(5)?)]);
-        Ok(Box::new(sharding_codec_builder.build()))
-    }
-}
-
-impl LayoutOps<Chunk> for ComplementLayout {
-    fn graph_iter(&self, graph: Graph) -> Vec<Chunk> {
-        graph
-            .iter()
-            .enumerate()
-            .flat_map(|(first_term, triples)| {
-                triples
-                    .iter()
-                    .map(|&(second_term, third_term)| (first_term as u32, second_term, third_term))
-                    .collect::<Vec<Chunk>>()
-            })
-            .collect::<Vec<Chunk>>()
-    }
-
-    fn store_chunk_elements(&self, chunk: &[Chunk], _: usize) -> Vec<u64> {
-        let mut ans = Vec::new();
-        for &(first_term, second_term, third_term) in chunk {
-            ans.push(first_term as u64);
-            ans.push(second_term as u64);
-            ans.push(third_term as u64);
-        }
-        ans
-    }
-
-    fn retrieve_chunk_elements(
-        &mut self,
-        matrix: &Mutex<TriMat<usize>>,
-        first_term_index: usize, // TODO: will first_term_index instead of chunk[0] do the trick?
-        chunk: &[usize],
-    ) {
-        matrix
-            .lock()
-            .add_triplet(chunk[0], chunk[2], chunk[1] as usize);
-    }
-
-    fn sharding_factor(&self, dimensionality: &Dimensionality) -> usize {
-        dimensionality.first_term_size * dimensionality.third_term_size
-    }
-}
diff --git a/src/storage/layout/mod.rs b/src/storage/layout/mod.rs
index c9b8b9b..e5ead2f 100644
--- a/src/storage/layout/mod.rs
+++ b/src/storage/layout/mod.rs
@@ -28,7 +28,6 @@ type ArrayToBytesCodec = Box<dyn ArrayToBytesCodecTraits>;
 
 pub mod matrix;
 pub mod tabular;
-pub mod complement;
 
 pub trait LayoutOps<C> {
     fn retrieve_attributes(&mut self, arr: &Array<OpendalStore>) -> StorageResult<Dictionary> {
@@ -175,18 +174,3 @@ pub trait Layout<C>: LayoutOps<C> {
 }
 
 
-pub trait ComplementaryLayout<C>: LayoutOps<C> {
-    fn shape(&self, dimensionality: &Dimensionality) -> Vec<u64>;
-    fn data_type(&self) -> DataType;
-    fn chunk_shape(
-        &self,
-        chunking_strategy: ChunkingStrategy,
-        dimensionality: &Dimensionality,
-    ) -> ChunkGrid;
-    fn fill_value(&self) -> FillValue;
-    fn dimension_names(&self) -> Option<Vec<DimensionName>>;
-    fn array_to_bytes_codec(
-        &self,
-        dimensionality: &Dimensionality,
-    ) -> StorageResult<ArrayToBytesCodec>;
-}