Skip to content

Commit

Permalink
errors finally fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
angelip2303 committed Mar 11, 2024
1 parent a5ab4b1 commit 3fd018d
Show file tree
Hide file tree
Showing 11 changed files with 90 additions and 62 deletions.
5 changes: 1 addition & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ version = "0.0.1"
edition = "2021"

[dependencies]
zarrs = { version = "0.12.4", default-features = false, features = [ "http", "gzip", "sharding", "opendal", "async", "ndarray", "crc32c" ] }
zarrs = { version = "0.12.4", default-features = false, features = [ "http", "gzip", "sharding", "async", "ndarray", "crc32c" ] }
clap = { version = "4.1.8", features = ["derive"] }
serde_json = "1.0.108"
thiserror = "1.0.50"
Expand All @@ -17,9 +17,6 @@ rio_api = "0.8.4"
rayon = "1.8.0"
parking_lot = "0.12"

[target.'cfg(not(target_env = "msvc"))'.dependencies]
jemallocator = "0.5.0"

[profile.release]
codegen-units = 1
opt-level = 3
Expand Down
4 changes: 2 additions & 2 deletions examples/query_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use remote_hdt::storage::Storage;
use std::env;
use std::time::Instant;

const SUBJECT: &str = "<http://www.Department0.University0.edu/AssistantProfessor0/Publication0>";
const SUBJECT: &str = "<http://www.Department1.University0.edu/Course8>";

fn main() -> Result<(), RemoteHDTError> {
let args: Vec<String> = env::args().collect();
Expand All @@ -21,7 +21,7 @@ fn main() -> Result<(), RemoteHDTError> {
let arr = binding.load(Backend::FileSystem(format!("{}.zarr", zarr_path).as_str()))?;

let before = Instant::now();
arr.get_subject(SUBJECT)?;
arr.get_object(SUBJECT)?;

println!("Elapsed time: {:.2?}", before.elapsed());

Expand Down
4 changes: 0 additions & 4 deletions examples/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@ use remote_hdt::storage::Storage;
use std::env;
use std::time::Instant;

#[cfg(not(target_env = "msvc"))]
#[global_allocator]
static ALLOCATOR: jemallocator::Jemalloc = jemallocator::Jemalloc;

fn main() -> Result<(), RemoteHDTError> {
let args: Vec<String> = env::args().collect();
if args.len() <= 3 {
Expand Down
2 changes: 1 addition & 1 deletion src/engine/chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::utils::rows_per_shard;
use super::EngineResult;
use super::EngineStrategy;

impl<T: ReadableStorageTraits + 'static> EngineStrategy<Vec<u32>> for Array<T> {
impl<T: ReadableStorageTraits + 'static + ?Sized> EngineStrategy<Vec<u32>> for Array<T> {
fn get_first_term(&self, index: usize) -> EngineResult<Vec<u32>> {
let shard_index = index as u64 / rows_per_shard(self);
let shard = self.retrieve_chunk_elements(&[shard_index, 0])?;
Expand Down
2 changes: 0 additions & 2 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ pub enum RemoteHDTError {
TripleSerialization,
#[error("The provided path is not valid")]
OsPathToString,
#[error(transparent)]
Opendal(#[from] zarrs::opendal::Error),
#[error("The provided backend is read-only")]
ReadOnlyBackend,
#[error("Error while parsing the RDF graph")]
Expand Down
12 changes: 8 additions & 4 deletions src/storage/layout/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ use zarrs::array::DataType;
use zarrs::array::DimensionName;
use zarrs::array::FillValue;
use zarrs::array_subset::ArraySubset;
use zarrs::storage::store::OpendalStore;
use zarrs::storage::store::FilesystemStore;
use zarrs::storage::ReadableStorageTraits;

use crate::dictionary::Dictionary;
use crate::error::RemoteHDTError;
Expand All @@ -30,7 +31,10 @@ pub mod matrix;
pub mod tabular;

pub trait LayoutOps<C> {
fn retrieve_attributes(&mut self, arr: &Array<OpendalStore>) -> StorageResult<Dictionary> {
fn retrieve_attributes(
&mut self,
arr: &Array<dyn ReadableStorageTraits>,
) -> StorageResult<Dictionary> {
// 4. We get the attributes so we can obtain some values that we will need
let attributes = arr.attributes();

Expand Down Expand Up @@ -63,7 +67,7 @@ pub trait LayoutOps<C> {
))
}

fn serialize(&mut self, arr: Array<OpendalStore>, graph: Graph) -> StorageResult<()> {
fn serialize(&mut self, arr: &Array<FilesystemStore>, graph: Graph) -> StorageResult<()> {
let columns = arr.shape()[1] as usize;
let count = AtomicU64::new(0);
let binding = self.graph_iter(graph.to_owned());
Expand Down Expand Up @@ -99,7 +103,7 @@ pub trait LayoutOps<C> {

fn parse(
&mut self,
arr: &Array<OpendalStore>,
arr: &Array<dyn ReadableStorageTraits>,
dimensionality: &Dimensionality,
) -> StorageResult<ZarrArray> {
// First, we create the 2D matrix in such a manner that the number of
Expand Down
52 changes: 16 additions & 36 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ use std::sync::atomic::AtomicU32;
use std::sync::Arc;
use zarrs::array::Array;
use zarrs::array::ArrayBuilder;
use zarrs::array_subset::ArraySubset;
use zarrs::group::GroupBuilder;
use zarrs::opendal::services::Fs;
use zarrs::opendal::services::Http;
use zarrs::opendal::Operator;
use zarrs::storage::store::OpendalStore;
use zarrs::storage::store::FilesystemStore;
use zarrs::storage::store::HTTPStore;
use zarrs::storage::ReadableStorageTraits;

use crate::dictionary::Dictionary;
use crate::error::RemoteHDTError;
Expand Down Expand Up @@ -41,7 +41,7 @@ pub struct Storage<C> {
layout: Box<dyn Layout<C>>,
serialization: Serialization,
reference_system: ReferenceSystem,
array: Option<Array<OpendalStore>>,
array: Option<Array<dyn ReadableStorageTraits>>,
sparse_array: Option<ZarrArray>,
}

Expand Down Expand Up @@ -78,29 +78,20 @@ impl<C> Storage<C> {
reference_system: ReferenceSystem,
// threading_strategy: ThreadingStrategy, TODO: implement this
) -> StorageResult<&mut Self> {
let operator = match store {
let path = match store {
Backend::FileSystem(path) => {
let mut builder = Fs::default();
let path = PathBuf::from_str(path)?;

match path.exists() {
true => return Err(RemoteHDTError::PathExists),
false => {
let path = match path.into_os_string().into_string() {
Ok(string) => string,
Err(_) => return Err(RemoteHDTError::OsPathToString),
};
builder.root(&path);
}
false => path,
}

Operator::new(builder)?.finish()
}
Backend::HTTP(_) => return Err(RemoteHDTError::ReadOnlyBackend),
};

// 2. We can create the FileSystemStore appropiately
let store = Arc::new(OpendalStore::new(operator.blocking()));
let store = Arc::new(FilesystemStore::new(path)?);

// Create a group and write metadata to filesystem
let group = GroupBuilder::new().build(store.clone(), "/group")?;
Expand Down Expand Up @@ -144,10 +135,13 @@ impl<C> Storage<C> {
attributes.insert("reference_system".into(), reference_system.as_ref().into());
attributes
})
.build(store, ARRAY_NAME)?;
.build(store.clone(), ARRAY_NAME)?;

arr.store_metadata()?;
self.layout.serialize(arr, graph)?;
self.layout.serialize(&arr, graph)?;

let shape = ArraySubset::new_with_ranges(&[0..10, 1..2]);
arr.retrieve_array_subset_elements::<u32>(&shape).unwrap();

Ok(self)
}
Expand All @@ -157,32 +151,18 @@ impl<C> Storage<C> {
store: Backend<'_>,
// threading_strategy: ThreadingStrategy, TODO: implement this
) -> StorageResult<&mut Self> {
let operator = match store {
let store: Arc<dyn ReadableStorageTraits> = match store {
Backend::FileSystem(path) => {
let mut builder = Fs::default();
let path = PathBuf::from_str(path)?;

match path.exists() {
false => return Err(RemoteHDTError::PathDoesNotExist),
true => {
let path = match path.into_os_string().into_string() {
Ok(string) => string,
Err(_) => return Err(RemoteHDTError::OsPathToString),
};
builder.root(&path);
}
true => Arc::new(FilesystemStore::new(path)?),
}

Operator::new(builder)?.finish()
}
Backend::HTTP(path) => {
let mut builder = Http::default();
builder.endpoint(path);
Operator::new(builder)?.finish()
}
Backend::HTTP(url) => Arc::new(HTTPStore::new(url)?),
};

let store: Arc<OpendalStore> = Arc::new(OpendalStore::new(operator.blocking()));
let arr = Array::new(store, ARRAY_NAME)?;
let dictionary = self.layout.retrieve_attributes(&arr)?;
self.dictionary = dictionary;
Expand Down
4 changes: 3 additions & 1 deletion src/storage/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ impl<C> Ops for Storage<C> {
Some(array) => OpsFormat::SparseArray(match self.reference_system {
ReferenceSystem::SPO | ReferenceSystem::SOP => array.get_first_term(index)?,
ReferenceSystem::PSO | ReferenceSystem::OSP => array.get_second_term(index)?,
ReferenceSystem::POS | ReferenceSystem::OPS => array.get_third_term(index)?,
ReferenceSystem::POS | ReferenceSystem::OPS => {
array.get_third_term(index).unwrap()
}
}),
None => return Err(OpsError::EmptySparseArray),
},
Expand Down
4 changes: 2 additions & 2 deletions src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub fn hash_to_set(terms: HashSet<String>) -> Vec<String> {
vec
}

pub fn rows_per_shard<T>(arr: &Array<T>) -> u64 {
pub fn rows_per_shard<T: ?Sized>(arr: &Array<T>) -> u64 {
match arr.chunk_grid().chunk_shape(&[0, 0], arr.shape()) {
Ok(shape) => match shape {
Some(chunk_shape) => chunk_shape[0].into(),
Expand All @@ -41,7 +41,7 @@ pub fn rows_per_shard<T>(arr: &Array<T>) -> u64 {
}
}

pub fn columns_per_shard<T>(arr: &Array<T>) -> u64 {
pub fn columns_per_shard<T: ?Sized>(arr: &Array<T>) -> u64 {
match arr.chunk_grid().chunk_shape(&[0, 0], arr.shape()) {
Ok(shape) => match shape {
Some(chunk_shape) => chunk_shape[1].into(),
Expand Down
2 changes: 1 addition & 1 deletion tests/get_object_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ fn get_object_matrix_sharding_test() -> Result<(), Box<dyn Error>> {
common::setup(
common::SHARDING_ZARR,
&mut storage,
ChunkingStrategy::Sharding(4),
ChunkingStrategy::Sharding(3),
ReferenceSystem::SPO,
);

Expand Down
61 changes: 56 additions & 5 deletions tests/orientation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use remote_hdt::storage::params::ChunkingStrategy;
use remote_hdt::storage::params::ReferenceSystem;
use remote_hdt::storage::params::Serialization;
use remote_hdt::storage::Storage;
use sprs::TriMat;
use std::error::Error;

mod common;
Expand Down Expand Up @@ -116,11 +117,31 @@ fn orientation_pso_tabular_test() -> Result<(), Box<dyn Error>> {
.load(Backend::FileSystem(common::TABULAR_PSO_ZARR))?
.get_predicate(common::Predicate::InstanceOf.into())?
{
OpsFormat::Zarr(actual) => actual,
OpsFormat::SparseArray(actual) => actual,
_ => unreachable!(),
};

if actual == vec![3, 1, 1] {
let mut expected = TriMat::new((
storage.get_dictionary().predicates_size(),
storage.get_dictionary().objects_size(),
));
expected.add_triplet(
common::Predicate::InstanceOf.get_idx(&storage.get_dictionary()),
common::Object::Human.get_idx(&storage.get_dictionary()),
common::Subject::Alan.get_idx(&storage.get_dictionary()),
);
expected.add_triplet(
common::Predicate::InstanceOf.get_idx(&storage.get_dictionary()),
common::Object::Town.get_idx(&storage.get_dictionary()),
common::Subject::Wilmslow.get_idx(&storage.get_dictionary()),
);
expected.add_triplet(
common::Predicate::InstanceOf.get_idx(&storage.get_dictionary()),
common::Object::Computer.get_idx(&storage.get_dictionary()),
common::Subject::Bombe.get_idx(&storage.get_dictionary()),
);

if actual == expected.to_csc() {
Ok(())
} else {
println!("{:?}", actual);
Expand All @@ -130,7 +151,7 @@ fn orientation_pso_tabular_test() -> Result<(), Box<dyn Error>> {

#[test]
fn orientation_ops_tabular_test() -> Result<(), Box<dyn Error>> {
let mut storage = Storage::new(TabularLayout, Serialization::Zarr);
let mut storage = Storage::new(TabularLayout, Serialization::Sparse);

common::setup(
common::TABULAR_OPS_ZARR,
Expand All @@ -143,11 +164,41 @@ fn orientation_ops_tabular_test() -> Result<(), Box<dyn Error>> {
.load(Backend::FileSystem(common::TABULAR_OPS_ZARR))?
.get_subject(common::Subject::Alan.into())?
{
OpsFormat::Zarr(actual) => actual,
OpsFormat::SparseArray(actual) => actual,
_ => unreachable!(),
};

if actual == vec![1, 3, 4, 0, 0, 0, 0, 6, 7] {
let mut expected = TriMat::new((
storage.get_dictionary().objects_size(),
storage.get_dictionary().subjects_size(),
));
expected.add_triplet(
common::Object::Human.get_idx(&storage.get_dictionary()),
common::Subject::Alan.get_idx(&storage.get_dictionary()),
common::Predicate::InstanceOf.get_idx(&storage.get_dictionary()),
);
expected.add_triplet(
common::Object::Warrington.get_idx(&storage.get_dictionary()),
common::Subject::Alan.get_idx(&storage.get_dictionary()),
common::Predicate::PlaceOfBirth.get_idx(&storage.get_dictionary()),
);
expected.add_triplet(
common::Object::Wilmslow.get_idx(&storage.get_dictionary()),
common::Subject::Alan.get_idx(&storage.get_dictionary()),
common::Predicate::PlaceOfDeath.get_idx(&storage.get_dictionary()),
);
expected.add_triplet(
common::Object::Date.get_idx(&storage.get_dictionary()),
common::Subject::Alan.get_idx(&storage.get_dictionary()),
common::Predicate::DateOfBirth.get_idx(&storage.get_dictionary()),
);
expected.add_triplet(
common::Object::GCHQ.get_idx(&storage.get_dictionary()),
common::Subject::Alan.get_idx(&storage.get_dictionary()),
common::Predicate::Employer.get_idx(&storage.get_dictionary()),
);

if actual == expected.to_csc() {
Ok(())
} else {
Err(String::from("Expected and actual results are not equals").into())
Expand Down

0 comments on commit 3fd018d

Please sign in to comment.