Skip to content

Commit

Permalink
improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
angelip2303 committed Feb 27, 2024
1 parent 1851df4 commit 2d69b9a
Show file tree
Hide file tree
Showing 9 changed files with 37 additions and 40 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ version = "0.0.1"
edition = "2021"

[dependencies]
zarrs = { version = "0.11.6", default-features = false, features = [ "http", "gzip", "sharding", "opendal", "async", "ndarray" ] }
zarrs = { version = "0.12.2", default-features = false, features = [ "http", "gzip", "sharding", "opendal", "async", "ndarray" ] }
clap = { version = "4.1.8", features = ["derive"] }
serde_json = "1.0.108"
thiserror = "1.0.50"
Expand Down
32 changes: 14 additions & 18 deletions src/engine/chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,25 @@ use crate::utils::rows_per_shard;
use super::EngineResult;
use super::EngineStrategy;

impl<T: ReadableStorageTraits> EngineStrategy<Vec<usize>> for Array<T> {
fn get_first_term(&self, index: usize) -> EngineResult<Vec<usize>> {
let index_to_chunk = index as u64 / rows_per_shard(self);
let chunk_to_index = index as u64 % rows_per_shard(self);
let ans = self.retrieve_chunk_subset_elements(
&[index_to_chunk, 0],
&ArraySubset::new_with_ranges(&[
chunk_to_index..chunk_to_index + 1,
0..columns_per_shard(self),
]),
)?;
Ok(ans.to_vec())
impl<T: ReadableStorageTraits + 'static> EngineStrategy<Vec<u32>> for Array<T> {
fn get_first_term(&self, index: usize) -> EngineResult<Vec<u32>> {
let shard_index = index as u64 / rows_per_shard(self);
let shard = self.retrieve_chunk_elements(&[shard_index, 0])?;
let chunk_index = index as u64 % rows_per_shard(self);
let start = (chunk_index * columns_per_shard(self)) as usize;
let end = start + columns_per_shard(self) as usize;
let chunk: &[u32] = &shard[start..end];
Ok(chunk.to_vec())
}

fn get_second_term(&self, _index: usize) -> EngineResult<Vec<usize>> {
fn get_second_term(&self, _index: usize) -> EngineResult<Vec<u32>> {
unimplemented!()
}

fn get_third_term(&self, index: usize) -> EngineResult<Vec<usize>> {
let last_chunk = self.shape()[0] / rows_per_shard(self);
fn get_third_term(&self, index: usize) -> EngineResult<Vec<u32>> {
let col = index as u64;
let shape = &ArraySubset::new_with_ranges(&[0..last_chunk, col..col + 1]);
let ans = self.retrieve_array_subset_elements(shape)?;
Ok(ans.to_vec())
let shape = ArraySubset::new_with_start_end_inc(vec![0, col], vec![self.shape()[0], col])?;
let ans = self.retrieve_array_subset_elements(&shape)?;
Ok(ans)
}
}
3 changes: 3 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::convert::Infallible;
use thiserror::Error;
use zarrs::array::codec::bytes_to_bytes::gzip::GzipCompressionLevelError;
use zarrs::array::codec::CodecError;
use zarrs::array::ArrayCreateError;
use zarrs::array::ArrayError;
use zarrs::array::NonZeroError;
Expand Down Expand Up @@ -71,6 +72,8 @@ pub enum EngineError {
Operation,
#[error(transparent)]
IncompatibleStartEndIndicesError(#[from] IncompatibleStartEndIndicesError),
#[error(transparent)]
Codec(#[from] CodecError),
}

#[derive(Error, Debug)]
Expand Down
12 changes: 6 additions & 6 deletions src/storage/layout/matrix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl Layout<Chunk> for MatrixLayout {
}

fn data_type(&self) -> DataType {
DataType::UInt64
DataType::UInt32
}

fn chunk_shape(
Expand All @@ -49,7 +49,7 @@ impl Layout<Chunk> for MatrixLayout {
}

fn fill_value(&self) -> FillValue {
FillValue::from(0u64)
FillValue::from(0u32)
}

fn dimension_names(&self, reference_system: &ReferenceSystem) -> Option<Vec<DimensionName>> {
Expand Down Expand Up @@ -97,27 +97,27 @@ impl LayoutOps<Chunk> for MatrixLayout {
graph
}

fn store_chunk_elements(&self, chunk: &[Chunk], columns: usize) -> Vec<u64> {
fn store_chunk_elements(&self, chunk: &[Chunk], columns: usize) -> Vec<u32> {
// We create a zero-filled buffer with one entry per cell of the shard;
// that is, the number of rows in the chunk times the given number of
// columns. Each entry is wrapped in an `AtomicZarrType` so that the
// buffer can be shared among threads
let slice: Vec<AtomicZarrType> = vec![0u64; chunk.len() * columns]
let slice: Vec<AtomicZarrType> = vec![0u32; chunk.len() * columns]
.iter()
.map(|&n| AtomicZarrType::new(n))
.collect();

for (first_term, triples) in chunk.iter().enumerate() {
triples.iter().for_each(|&(second_term, third_term)| {
let third_term_idx = third_term as usize + first_term * columns;
slice[third_term_idx].store(second_term as u64, Ordering::Relaxed);
slice[third_term_idx].store(second_term, Ordering::Relaxed);
});
}

slice
.iter()
.map(|elem| elem.load(Ordering::Relaxed))
.collect::<Vec<u64>>()
.collect::<Vec<_>>()
}

fn retrieve_chunk_elements(
Expand Down
8 changes: 3 additions & 5 deletions src/storage/layout/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,8 @@ pub trait LayoutOps<C> {
let remainder = iter.remainder();

for chunk in iter {
arr.store_chunk_elements(
&[count.load(Ordering::Relaxed), 0],
self.store_chunk_elements(chunk, columns),
)?;
let slice = self.store_chunk_elements(chunk, columns);
arr.store_chunk_elements(&[count.load(Ordering::Relaxed), 0], slice)?;
count.fetch_add(1, Ordering::Relaxed);
}

Expand Down Expand Up @@ -147,7 +145,7 @@ pub trait LayoutOps<C> {
}

fn graph_iter(&self, graph: Graph) -> Vec<C>;
fn store_chunk_elements(&self, chunk: &[C], columns: usize) -> Vec<u64>;
fn store_chunk_elements(&self, chunk: &[C], columns: usize) -> Vec<u32>;
fn retrieve_chunk_elements(
&mut self,
matrix: &Mutex<TriMat<usize>>,
Expand Down
12 changes: 6 additions & 6 deletions src/storage/layout/tabular.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ impl Layout<Chunk> for TabularLayout {
}

fn data_type(&self) -> DataType {
DataType::UInt64
DataType::UInt32
}

fn chunk_shape(&self, chunking_strategy: ChunkingStrategy, _: &Dimensionality) -> ChunkGrid {
vec![chunking_strategy.into(), NonZeroU64::new(3).unwrap()].into() // TODO: make this a constant value
}

fn fill_value(&self) -> FillValue {
FillValue::from(0u64)
FillValue::from(0u32)
}

fn dimension_names(&self, _: &ReferenceSystem) -> Option<Vec<DimensionName>> {
Expand Down Expand Up @@ -71,12 +71,12 @@ impl LayoutOps<Chunk> for TabularLayout {
.collect::<Vec<Chunk>>()
}

fn store_chunk_elements(&self, chunk: &[Chunk], _: usize) -> Vec<u64> {
fn store_chunk_elements(&self, chunk: &[Chunk], _: usize) -> Vec<u32> {
let mut ans = Vec::new();
for &(first_term, second_term, third_term) in chunk {
ans.push(first_term as u64);
ans.push(second_term as u64);
ans.push(third_term as u64);
ans.push(first_term);
ans.push(second_term);
ans.push(third_term);
}
ans
}
Expand Down
4 changes: 2 additions & 2 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use serde_json::Map;
use sprs::CsMat;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::AtomicU32;
use std::sync::Arc;
use zarrs::array::Array;
use zarrs::array::ArrayBuilder;
Expand Down Expand Up @@ -30,7 +30,7 @@ pub mod ops;
pub mod params;

pub type ZarrArray = CsMat<usize>;
type AtomicZarrType = AtomicU64;
type AtomicZarrType = AtomicU32;
pub type StorageResult<T> = Result<T, RemoteHDTError>;

const ARRAY_NAME: &str = "/group/RemoteHDT"; // TODO: parameterize this
Expand Down
2 changes: 1 addition & 1 deletion src/storage/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub type OpsResult = Result<OpsFormat, OpsError>;

pub enum OpsFormat {
SparseArray(ZarrArray),
Zarr(Vec<usize>),
Zarr(Vec<u32>),
}

pub trait Ops {
Expand Down
2 changes: 1 addition & 1 deletion tests/get_subject_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ fn get_subject_matrix_sharding_test() -> Result<(), Box<dyn Error>> {
common::setup(
common::SHARDING_ZARR,
&mut storage,
ChunkingStrategy::Sharding(3),
ChunkingStrategy::Sharding(4),
ReferenceSystem::SPO,
);

Expand Down

0 comments on commit 2d69b9a

Please sign in to comment.