Skip to content

Commit

Permalink
replace i_div_4 range with start and len
Browse files Browse the repository at this point in the history
  • Loading branch information
CarlKCarlK committed Jan 23, 2024
1 parent 21a03af commit 51d07ae
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 76 deletions.
39 changes: 19 additions & 20 deletions src/bed_cloud.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
// cmk will the Python code fail on 32 bit?
#[cfg(not(target_pointer_width = "64"))]
compile_error!(
"This code requires a 64-bit target architecture because the cloud-library assumes it."
);
compile_error!("This code requires a 64-bit target architecture.");

use anyinput::anyinput;
use bytes::Bytes;
Expand Down Expand Up @@ -184,14 +181,13 @@ where
TObjectStore: ObjectStore,
{
// compute numbers outside of the loop
let (_, in_iid_count_div4_u64) =
check_file_length(in_iid_count, in_sid_count, size, object_path)?;
let (i_div_4_less_start_array, i_mod_4_times_2_array, i_div_4_range) =
let in_iid_count_div4_u64 = check_file_length(in_iid_count, in_sid_count, size, object_path)?;
let (i_div_4_less_start_array, i_mod_4_times_2_array, i_div_4_start, i_div_4_len) =
check_and_precompute_iid_index(in_iid_count, iid_index)?;
if i_div_4_range.is_empty() {
if i_div_4_len == 0 {
        return Ok(()); // we must return early because the chunks method doesn't work with size 0
}
let chunk_count = max(1, max_chunk_size / i_div_4_range.len());
let chunk_count = max(1, max_chunk_size / i_div_4_len as usize);
let from_two_bits_to_value = set_up_two_bits_to_value(is_a1_counted, missing_value);
let lower_sid_count = -(in_sid_count as isize);
let upper_sid_count: isize = (in_sid_count as isize) - 1;
Expand All @@ -210,7 +206,8 @@ where
upper_sid_count,
lower_sid_count,
in_iid_count_div4_u64,
i_div_4_range.clone(),
i_div_4_start,
i_div_4_len,
);
async move {
let (ranges, out_sid_i_vec) = result?;
Expand Down Expand Up @@ -239,25 +236,27 @@ where

#[inline]
#[allow(clippy::type_complexity)]
#[allow(clippy::too_many_arguments)]
fn extract_ranges(
chunk_count: usize,
chunk: itertools::Chunk<'_, std::slice::Iter<'_, isize>>,
chunk_index: usize,
upper_sid_count: isize,
lower_sid_count: isize,
in_iid_count_div4_u64: u64,
i_div_4_range: Range<usize>,
) -> Result<(Vec<std::ops::Range<usize>>, Vec<usize>), Box<BedErrorPlus>> {
i_div_4_start: u64,
i_div_4_len: u64,
) -> Result<(Vec<Range<usize>>, Vec<usize>), Box<BedErrorPlus>> {
let mut ranges = Vec::with_capacity(chunk_count);
let mut out_sid_i_vec = Vec::with_capacity(chunk_count);
for (inner_index, in_sid_i_signed) in chunk.enumerate() {
let out_sid_i = chunk_index * chunk_count + inner_index;
let in_sid_i =
convert_negative_sid_index(*in_sid_i_signed, upper_sid_count, lower_sid_count)?;
let pos: u64 =
in_sid_i * in_iid_count_div4_u64 + i_div_4_range.start as u64 + CB_HEADER_U64; // "as" and math is safe because of early checks
let range = pos as usize..(pos as usize + i_div_4_range.len()) as usize;
debug_assert!((range.len() == i_div_4_range.len())); // real assert
let pos: usize =
(in_sid_i * in_iid_count_div4_u64 + i_div_4_start + CB_HEADER_U64) as usize; // "as" and math is safe because of early checks
let range = pos..pos + i_div_4_len as usize;
debug_assert!(range.end - range.start == i_div_4_len as usize); // real assert
ranges.push(range);
out_sid_i_vec.push(out_sid_i);
}
Expand Down Expand Up @@ -296,17 +295,17 @@ fn check_file_length<TObjectStore>(
in_sid_count: usize,
size: usize,
object_path: &ObjectPath<TObjectStore>,
) -> Result<(usize, u64), Box<BedErrorPlus>>
) -> Result<u64, Box<BedErrorPlus>>
where
TObjectStore: ObjectStore,
{
let (in_iid_count_div4, in_iid_count_div4_u64) = try_div_4(in_iid_count, in_sid_count)?;
let in_iid_count_div4_u64 = try_div_4(in_iid_count, in_sid_count)?;
let file_len = size as u64;
let file_len2 = in_iid_count_div4_u64 * (in_sid_count as u64) + CB_HEADER_U64;
if file_len != file_len2 {
Err(BedError::IllFormed(object_path.to_string()))?;
}
Ok((in_iid_count_div4, in_iid_count_div4_u64))
Ok(in_iid_count_div4_u64)
}

#[inline]
Expand Down Expand Up @@ -2053,7 +2052,7 @@ where
let max_concurrent_requests =
compute_max_concurrent_requests(read_options.max_concurrent_requests)?;

let max_chunk_size = compute_max_chunk_size(read_options.max_chunk_size)?; // cmk check
let max_chunk_size = compute_max_chunk_size(read_options.max_chunk_size)?;

// If we already have a Vec<isize>, reference it. If we don't, create one and reference it.
let iid_hold = Hold::new(&read_options.iid_index, iid_count)?;
Expand Down
51 changes: 24 additions & 27 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
// cmk we either need to test on 32-bit or assume 64-bit (and remove the _u64 variables)
#![warn(missing_docs)]
#![warn(clippy::pedantic)]
#![allow(
Expand Down Expand Up @@ -547,24 +546,21 @@ impl Missing for i8 {
}
}

// try_div_4 assumes that |usize| <= |u64|
#[cfg(not(any(target_pointer_width = "32", target_pointer_width = "64")))]
compile_error!("This code requires a target architecture of either 32-bit or 64-bit");

#[cfg(not(target_pointer_width = "64"))]
compile_error!("This code requires a 64-bit target architecture.");
#[inline]
fn try_div_4(in_iid_count: usize, in_sid_count: usize) -> Result<(usize, u64), Box<BedErrorPlus>> {
fn try_div_4(in_iid_count: usize, in_sid_count: usize) -> Result<u64, Box<BedErrorPlus>> {
if in_iid_count == 0 {
return Ok((0, 0));
return Ok(0);
}
let in_iid_count_div4 = (in_iid_count - 1) / 4 + 1;
let in_iid_count_div4_u64 = in_iid_count_div4 as u64;
let in_iid_count_div4_u64 = ((in_iid_count - 1) / 4 + 1) as u64;
let in_sid_count_u64 = in_sid_count as u64;

if in_sid_count > 0 && (u64::MAX - CB_HEADER_U64) / in_sid_count_u64 < in_iid_count_div4_u64 {
Err(BedError::IndexesTooBigForFiles(in_iid_count, in_sid_count))?;
}

Ok((in_iid_count_div4, in_iid_count_div4_u64))
Ok(in_iid_count_div4_u64)
}

#[allow(clippy::too_many_arguments)]
Expand All @@ -582,7 +578,7 @@ fn internal_read_no_alloc<TVal: BedVal>(
) -> Result<(), Box<BedErrorPlus>> {
// Check the file length

let (_, in_iid_count_div4_u64) = try_div_4(in_iid_count, in_sid_count)?;
let in_iid_count_div4_u64 = try_div_4(in_iid_count, in_sid_count)?;
// "as" and math is safe because of early checks
let file_len = buf_reader.get_ref().metadata()?.len();
let file_len2 = in_iid_count_div4_u64 * (in_sid_count as u64) + CB_HEADER_U64;
Expand All @@ -591,7 +587,7 @@ fn internal_read_no_alloc<TVal: BedVal>(
}

// Check and precompute for each iid_index
let (i_div_4_less_start_array, i_mod_4_times_2_array, i_div_4_range) =
let (i_div_4_less_start_array, i_mod_4_times_2_array, i_div_4_start, i_div_4_len) =
check_and_precompute_iid_index(in_iid_count, iid_index)?;

// Check and compute work for each sid_index
Expand All @@ -613,9 +609,8 @@ fn internal_read_no_alloc<TVal: BedVal>(
};

// Read the iid info for one snp from the disk
let mut bytes_vector: Vec<u8> = vec![0; i_div_4_range.len()];
let pos: u64 =
in_sid_i * in_iid_count_div4_u64 + i_div_4_range.start as u64 + CB_HEADER_U64; // "as" and math is safe because of early checks
let mut bytes_vector: Vec<u8> = vec![0; i_div_4_len as usize];
let pos: u64 = in_sid_i * in_iid_count_div4_u64 + i_div_4_start + CB_HEADER_U64; // "as" and math is safe because of early checks
buf_reader.seek(SeekFrom::Start(pos))?;
buf_reader.read_exact(&mut bytes_vector)?;
Ok::<_, Box<BedErrorPlus>>(bytes_vector)
Expand Down Expand Up @@ -649,7 +644,7 @@ type Array1U8 = nd::ArrayBase<nd::OwnedRepr<u8>, nd::Dim<[usize; 1]>>;
fn check_and_precompute_iid_index(
in_iid_count: usize,
iid_index: &[isize],
) -> Result<(Array1Usize, Array1U8, Range<usize>), Box<BedErrorPlus>> {
) -> Result<(Array1Usize, Array1U8, u64, u64), Box<BedErrorPlus>> {
let lower_iid_count = -(in_iid_count as isize);
let upper_iid_count: isize = (in_iid_count as isize) - 1;
let mut i_div_4_less_start_array = nd::Array1::<usize>::zeros(iid_index.len());
Expand Down Expand Up @@ -682,22 +677,24 @@ fn check_and_precompute_iid_index(
.par_bridge()
.try_for_each(|x| (*x).clone())?;

let (min_value, i_div_4_range) =
let (i_div_4_start, i_div_4_len) =
if let Some(min_value) = i_div_4_less_start_array.par_iter().min() {
let max_value = *i_div_4_less_start_array.par_iter().max().unwrap(); // safe because of min
(*min_value, *min_value..max_value + 1)
(*min_value as u64, (max_value + 1 - *min_value) as u64)
} else {
(0, 0..0)
(0, 0)
};
if min_value > 0 {
i_div_4_less_start_array // cmk skip of min_value is 0
    // skip if min_value is 0
if i_div_4_start > 0 {
i_div_4_less_start_array
.par_iter_mut()
.for_each(|x| *x -= min_value);
.for_each(|x| *x -= i_div_4_start as usize);
}
Ok((
i_div_4_less_start_array,
i_mod_4_times_2_array,
i_div_4_range,
i_div_4_start,
i_div_4_len,
))
}

Expand Down Expand Up @@ -740,13 +737,13 @@ where
let (iid_count, sid_count) = val.dim();

// 4 genotypes per byte so round up
let (iid_count_div4, _) = try_div_4(iid_count, sid_count)?;
let iid_count_div4_u64 = try_div_4(iid_count, sid_count)?;

// We create and write to a file.
// If there is an error, we will delete it.
if let Err(e) = write_internal(
path,
iid_count_div4,
iid_count_div4_u64,
val,
is_a1_counted,
missing,
Expand All @@ -764,7 +761,7 @@ where
#[anyinput]
fn write_internal<S, TVal>(
path: AnyPath,
iid_count_div4: usize,
iid_count_div4_u64: u64,
//val: &nd::ArrayView2<'_, TVal>,
val: &nd::ArrayBase<S, nd::Ix2>,
is_a1_counted: bool,
Expand Down Expand Up @@ -792,7 +789,7 @@ where
.parallel_map_scoped(scope, {
move |column| {
// Convert each column into a bytes_vector
let mut bytes_vector: Vec<u8> = vec![0; iid_count_div4]; // inits to 0
let mut bytes_vector: Vec<u8> = vec![0; iid_count_div4_u64 as usize]; // inits to 0
for (iid_i, &v0) in column.iter().enumerate() {
#[allow(clippy::eq_op)]
let genotype_byte = if v0 == homozygous_primary_allele {
Expand Down
58 changes: 29 additions & 29 deletions tests/tests_api_cloud.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use object_store::ObjectStore;
use std::collections::HashSet;
use std::panic::catch_unwind;
use std::sync::Arc;
use std::time::Instant;
use thousands::Separable;
use tokio::runtime;
use url::Url;
Expand Down Expand Up @@ -2586,31 +2585,32 @@ fn http_cloud_urls_md_4() -> Result<(), Box<BedErrorPlus>> {
Ok::<(), Box<BedErrorPlus>>(())
}

#[tokio::test]
async fn http_cloud_column_speed_test() -> Result<(), Box<dyn std::error::Error>> {
let mut bed_cloud = BedCloud::builder(
"https://www.ebi.ac.uk/biostudies/files/S-BSST936/genotypes/synthetic_v1_chr-10.bed",
[("timeout", "100s")],
)?
.skip_early_check()
.iid_count(1_008_000)
.sid_count(361_561)
.build()
.await?;

let start_time = Instant::now();
let val = ReadOptions::builder()
.iid_index(..1000)
.sid_index(..50)
.f32()
.read_cloud(&mut bed_cloud)
.await?;
let elapsed = start_time.elapsed();

println!("Time taken: {:?}", elapsed);
println!("Size of val: {} elements", val.len());
println!("Mean of val: {:?}", val.mean());
// assert_eq!(val.mean(), Some(0.03391369));

Ok(())
}
// cmk removed this test because it is too slow
// #[tokio::test]
// async fn http_cloud_column_speed_test() -> Result<(), Box<dyn std::error::Error>> {
// let mut bed_cloud = BedCloud::builder(
// "https://www.ebi.ac.uk/biostudies/files/S-BSST936/genotypes/synthetic_v1_chr-10.bed",
// [("timeout", "100s")],
// )?
// .skip_early_check()
// .iid_count(1_008_000)
// .sid_count(361_561)
// .build()
// .await?;

// let start_time = Instant::now();
// let val = ReadOptions::builder()
// .iid_index(..1000)
// .sid_index(..50)
// .f32()
// .read_cloud(&mut bed_cloud)
// .await?;
// let elapsed = start_time.elapsed();

// println!("Time taken: {:?}", elapsed);
// println!("Size of val: {} elements", val.len());
// println!("Mean of val: {:?}", val.mean());
// // assert_eq!(val.mean(), Some(0.03391369));

// Ok(())
// }

0 comments on commit 51d07ae

Please sign in to comment.