Skip to content

Commit

Permalink
get_predicate implemented
Browse files Browse the repository at this point in the history
  • Loading branch information
angelip2303 committed Mar 7, 2024
1 parent 2d69b9a commit 3681585
Show file tree
Hide file tree
Showing 13 changed files with 161 additions and 75 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ benches/*/*.nt
!resources/root.zarr
.vscode
heaptrack.*
tests/out
tests/out
uniprotkb_*
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ version = "0.0.1"
edition = "2021"

[dependencies]
zarrs = { version = "0.12.2", default-features = false, features = [ "http", "gzip", "sharding", "opendal", "async", "ndarray" ] }
zarrs = { version = "0.12.3", default-features = false, features = [ "http", "gzip", "sharding", "opendal", "async", "ndarray" ] }
clap = { version = "4.1.8", features = ["derive"] }
serde_json = "1.0.108"
thiserror = "1.0.50"
Expand Down
2 changes: 1 addition & 1 deletion examples/serialize_bench.rs → examples/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ static ALLOCATOR: jemallocator::Jemalloc = jemallocator::Jemalloc;
fn main() -> Result<(), RemoteHDTError> {
let args: Vec<String> = env::args().collect();
if args.len() <= 3 {
panic!("Usage: cargo run --example serialize_bench <rdf_path> <zarr_path> <shard_size>");
panic!("Usage: cargo run --example serialize <rdf_path> <zarr_path> <shard_size>");
}

let rdf_path = &args[1].as_str();
Expand Down
10 changes: 8 additions & 2 deletions src/engine/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,14 @@ impl EngineStrategy<ZarrArray> for ZarrArray {
Ok(&matrix * self)
}

fn get_second_term(&self, _value: usize) -> EngineResult<ZarrArray> {
unimplemented!()
fn get_second_term(&self, value: usize) -> EngineResult<ZarrArray> {
let mut matrix = TriMat::new((self.rows(), self.cols()));
self.iter().for_each(|(&e, (row, col))| {
if e == value {
matrix.add_triplet(row, col, value);
}
});
Ok(matrix.to_csc())
}

fn get_third_term(&self, index: usize) -> EngineResult<ZarrArray> {
Expand Down
30 changes: 25 additions & 5 deletions src/engine/chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use zarrs::array::Array;
use zarrs::array_subset::ArraySubset;
use zarrs::storage::ReadableStorageTraits;

use crate::error::EngineError;
use crate::utils::columns_per_shard;
use crate::utils::rows_per_shard;

Expand All @@ -19,14 +20,33 @@ impl<T: ReadableStorageTraits + 'static> EngineStrategy<Vec<u32>> for Array<T> {
Ok(chunk.to_vec())
}

fn get_second_term(&self, _index: usize) -> EngineResult<Vec<u32>> {
unimplemented!()
fn get_second_term(&self, index: usize) -> EngineResult<Vec<u32>> {
let mut ans = Vec::new();
let number_of_shards = match self.chunk_grid_shape() {
Some(chunk_grid) => chunk_grid[0],
None => return Err(EngineError::Operation),
};
for i in 0..number_of_shards {
let mut shard = self.retrieve_chunk_elements::<u32>(&[i, 0])?;
shard.iter_mut().for_each(|e| {
if *e != index as u32 {
*e = 0
}
});
ans.append(&mut shard);
}
Ok(ans)
}

fn get_third_term(&self, index: usize) -> EngineResult<Vec<u32>> {
let objects = self.shape()[0];
let col = index as u64;
let shape = ArraySubset::new_with_start_end_inc(vec![0, col], vec![self.shape()[0], col])?;
let ans = self.retrieve_array_subset_elements(&shape)?;
Ok(ans)
let shape = ArraySubset::new_with_ranges(&[0..objects, col..col + 1]);
let array_subset = self.retrieve_array_subset(&shape).unwrap();
let third_term_subset = array_subset
.windows(4)
.map(|w| u32::from_ne_bytes(w.try_into().unwrap()))
.collect::<Vec<_>>();
Ok(third_term_subset)
}
}
17 changes: 11 additions & 6 deletions src/storage/layout/matrix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,11 @@ impl Layout<Chunk> for MatrixLayout {
&self,
dimensionality: &Dimensionality,
) -> StorageResult<Box<dyn ArrayToBytesCodecTraits>> {
let mut sharding_codec_builder =
ShardingCodecBuilder::new(vec![1, dimensionality.get_third_term_size()].try_into()?);
let mut sharding_codec_builder = ShardingCodecBuilder::new(
vec![1, dimensionality.get_third_term_size()]
.as_slice()
.try_into()?,
);
sharding_codec_builder.bytes_to_bytes_codecs(vec![Box::new(GzipCodec::new(5)?)]);
Ok(Box::new(sharding_codec_builder.build()))
}
Expand Down Expand Up @@ -124,16 +127,18 @@ impl LayoutOps<Chunk> for MatrixLayout {
&mut self,
matrix: &Mutex<TriMat<usize>>,
first_term_index: usize,
chunk: &[usize],
chunk: &[u32],
) {
chunk
.iter()
.enumerate()
.for_each(|(third_term_idx, &second_term_idx)| {
if second_term_idx != 0 {
matrix
.lock()
.add_triplet(first_term_index, third_term_idx, second_term_idx);
matrix.lock().add_triplet(
first_term_index,
third_term_idx,
second_term_idx as usize,
);
}
})
}
Expand Down
8 changes: 4 additions & 4 deletions src/storage/layout/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,12 @@ pub trait LayoutOps<C> {

for chunk in iter {
let slice = self.store_chunk_elements(chunk, columns);
arr.store_chunk_elements(&[count.load(Ordering::Relaxed), 0], slice)?;
arr.store_chunk_elements::<u32>(&[count.load(Ordering::Relaxed), 0], slice)?;
count.fetch_add(1, Ordering::Relaxed);
}

if !remainder.is_empty() {
arr.store_array_subset_elements(
arr.store_array_subset_elements::<u32>(
&ArraySubset::new_with_start_shape(
vec![count.load(Ordering::Relaxed) * rows_per_shard(&arr), 0],
vec![remainder.len() as u64, columns_per_shard(&arr)],
Expand Down Expand Up @@ -123,7 +123,7 @@ pub trait LayoutOps<C> {
// of it. Once we have all the pieces processed, we will have parsed the
// whole array
for shard in 0..number_of_shards {
arr.retrieve_chunk_elements(&[shard, 0])?
arr.retrieve_chunk_elements::<u32>(&[shard, 0])?
// We divide each shard by the number of columns, as a shard is
// composed of chunks having the size of [1, number of cols]
.chunks(number_of_columns)
Expand All @@ -150,7 +150,7 @@ pub trait LayoutOps<C> {
&mut self,
matrix: &Mutex<TriMat<usize>>,
first_term_idx: usize,
chunk: &[usize],
chunk: &[u32],
);
fn sharding_factor(&self, dimensionality: &Dimensionality) -> usize;
}
Expand Down
6 changes: 3 additions & 3 deletions src/storage/layout/tabular.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,12 @@ impl LayoutOps<Chunk> for TabularLayout {
fn retrieve_chunk_elements(
&mut self,
matrix: &Mutex<TriMat<usize>>,
first_term_index: usize, // TODO: will first_term_index instead of chunk[0] do the trick?
chunk: &[usize],
_first_term_index: usize, // TODO: will first_term_index instead of chunk[0] do the trick?
chunk: &[u32],
) {
matrix
.lock()
.add_triplet(chunk[0], chunk[2], chunk[1] as usize);
.add_triplet(chunk[0] as usize, chunk[2] as usize, chunk[1] as usize);
}

fn sharding_factor(&self, dimensionality: &Dimensionality) -> usize {
Expand Down
7 changes: 3 additions & 4 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,7 @@ impl<C> Storage<C> {

// Create a group and write metadata to filesystem
let group = GroupBuilder::new().build(store.clone(), "/group")?;

let _ = group.store_metadata()?;
group.store_metadata()?;

// TODO: rayon::ThreadPoolBuilder::new()
// .num_threads(1)
Expand Down Expand Up @@ -153,9 +152,9 @@ impl<C> Storage<C> {
Ok(self)
}

pub fn load<'a>(
pub fn load(
&mut self,
store: Backend<'a>,
store: Backend<'_>,
// threading_strategy: ThreadingStrategy, TODO: implement this
) -> StorageResult<&mut Self> {
let operator = match store {
Expand Down
10 changes: 5 additions & 5 deletions src/storage/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub enum ReferenceSystem {
pub struct Dimensionality {
graph_size: Option<usize>,
pub(crate) first_term_size: usize,
second_term_size: usize,
_second_term_size: usize,
pub(crate) third_term_size: usize,
}

Expand Down Expand Up @@ -91,7 +91,7 @@ impl Dimensionality {
ReferenceSystem::POS | ReferenceSystem::PSO => dictionary.predicates_size(),
ReferenceSystem::OPS | ReferenceSystem::OSP => dictionary.objects_size(),
},
second_term_size: match dictionary.get_reference_system() {
_second_term_size: match dictionary.get_reference_system() {
ReferenceSystem::PSO | ReferenceSystem::OSP => dictionary.subjects_size(),
ReferenceSystem::SPO | ReferenceSystem::OPS => dictionary.predicates_size(),
ReferenceSystem::SOP | ReferenceSystem::POS => dictionary.objects_size(),
Expand All @@ -112,9 +112,9 @@ impl Dimensionality {
self.first_term_size as u64
}

pub(crate) fn get_second_term_size(&self) -> u64 {
self.second_term_size as u64
}
// pub(crate) fn get_second_term_size(&self) -> u64 {
// self._second_term_size as u64
// }

pub(crate) fn get_third_term_size(&self) -> u64 {
self.third_term_size as u64
Expand Down
6 changes: 3 additions & 3 deletions tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub enum Subject {
}

impl Subject {
fn get_idx(self, dictionary: &Dictionary) -> usize {
pub(crate) fn get_idx(self, dictionary: &Dictionary) -> usize {
dictionary.get_subject_idx_unchecked(self.into())
}
}
Expand Down Expand Up @@ -74,7 +74,7 @@ pub enum Predicate {
}

impl Predicate {
fn get_idx(self, dictionary: &Dictionary) -> usize {
pub(crate) fn get_idx(self, dictionary: &Dictionary) -> usize {
dictionary.get_predicate_idx_unchecked(self.into())
}
}
Expand Down Expand Up @@ -107,7 +107,7 @@ pub enum Object {
}

impl Object {
fn get_idx(self, dictionary: &Dictionary) -> usize {
pub(crate) fn get_idx(self, dictionary: &Dictionary) -> usize {
dictionary.get_object_idx_unchecked(self.into())
}
}
Expand Down
88 changes: 88 additions & 0 deletions tests/get_predicate_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
use remote_hdt::storage::layout::matrix::MatrixLayout;
use remote_hdt::storage::layout::tabular::TabularLayout;
use remote_hdt::storage::ops::Ops;
use remote_hdt::storage::ops::OpsFormat;
use remote_hdt::storage::params::Backend;
use remote_hdt::storage::params::ChunkingStrategy;
use remote_hdt::storage::params::ReferenceSystem;
use remote_hdt::storage::params::Serialization;
use remote_hdt::storage::Storage;
use sprs::TriMat;
use std::error::Error;

mod common;

#[test]
fn get_predicate_matrix_chunk_test() -> Result<(), Box<dyn Error>> {
let mut storage = Storage::new(MatrixLayout, Serialization::Zarr);

common::setup(
common::MATRIX_ZARR,
&mut storage,
ChunkingStrategy::Chunk,
ReferenceSystem::SPO,
);

let actual = match storage
.load(Backend::FileSystem(common::MATRIX_ZARR))?
.get_predicate(common::Predicate::InstanceOf.into())?
{
OpsFormat::Zarr(actual) => actual,
_ => unreachable!(),
};

if actual
== vec![
0, 0, 5, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 5, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 5, 0, 0, 0,
]
{
Ok(())
} else {
Err(String::from("Expected and actual results are not equals").into())
}
}

#[test]
fn get_predicate_tabular_test() -> Result<(), Box<dyn Error>> {
let mut storage = Storage::new(TabularLayout, Serialization::Sparse);

common::setup(
common::TABULAR_ZARR,
&mut storage,
ChunkingStrategy::Chunk,
ReferenceSystem::SPO,
);

let actual = match storage
.load(Backend::FileSystem(common::TABULAR_ZARR))?
.get_predicate(common::Predicate::InstanceOf.into())?
{
OpsFormat::SparseArray(actual) => actual,
_ => unreachable!(),
};

let mut expected = TriMat::new((4, 9));
expected.add_triplet(
common::Subject::Alan.get_idx(&storage.get_dictionary()),
common::Object::Human.get_idx(&storage.get_dictionary()),
common::Predicate::InstanceOf.get_idx(&storage.get_dictionary()),
);
expected.add_triplet(
common::Subject::Wilmslow.get_idx(&storage.get_dictionary()),
common::Object::Town.get_idx(&storage.get_dictionary()),
common::Predicate::InstanceOf.get_idx(&storage.get_dictionary()),
);
expected.add_triplet(
common::Subject::Bombe.get_idx(&storage.get_dictionary()),
common::Object::Computer.get_idx(&storage.get_dictionary()),
common::Predicate::InstanceOf.get_idx(&storage.get_dictionary()),
);
let expected = expected.to_csc();

if actual == expected {
Ok(())
} else {
Err(String::from("Expected and actual results are not equals").into())
}
}
Loading

0 comments on commit 3681585

Please sign in to comment.