
Commit a18949d

reverting to iterative for-loop instead of thread::scope which iterates anyway
jeffersonfparil committed Mar 10, 2024
1 parent 4c106db commit a18949d
Showing 1 changed file with 106 additions and 103 deletions.
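For context on the commit message: std::thread::scope only parallelises work explicitly spawned on the scope, and the old code passed a closure that never called _scope.spawn(...), so each chunk already ran sequentially on the calling thread, i.e. the loop "iterates anyway". A minimal, self-contained sketch of that behaviour (the values are illustrative, not from the repository):

use std::thread;

fn main() {
    // A closure passed to thread::scope runs inline on the current thread
    // unless work is explicitly spawned via _scope.spawn(...); this loop is
    // therefore sequential despite the thread machinery around it.
    let mut results: Vec<u64> = vec![];
    for i in 0..4u64 {
        let result = thread::scope(|_scope| i * i);
        results.push(result);
    }
    assert_eq!(results, vec![0, 1, 4, 9]);
}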
209 changes: 106 additions & 103 deletions src/aldknni.rs
@@ -1,6 +1,8 @@
 use ndarray::{prelude::*, Zip};
+use rand::rngs::StdRng;
 use rand::seq::IteratorRandom;
-use rand::{thread_rng, Rng};
+use rand::Rng;
+use rand::SeedableRng;

 // use bincode::serialize_into;
 use std::cmp::Ordering;
@@ -276,7 +278,7 @@ impl GenotypesAndPhenotypes {
             } else {
                 *n_reps
             };
-            let mut rng = thread_rng();
+            let mut rng = StdRng::seed_from_u64(*j_local as u64);
             let vec_idx_non_missing: Vec<usize> = vec_idx_non_missing
                 .into_iter()
                 .choose_multiple(&mut rng, n_reps);
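The seeded generator above trades thread_rng()'s nondeterminism for reproducibility: each locus (indexed by j_local) always draws the same subset of non-missing entries when estimating imputation error. A minimal sketch of that property, assuming rand 0.8; the index value and counts are illustrative, not from the repository:

use rand::rngs::StdRng;
use rand::seq::IteratorRandom;
use rand::SeedableRng;

fn main() {
    let j_local: usize = 278; // hypothetical locus index
    // Two generators seeded with the same locus index draw the same subset,
    // so repeated runs validate imputation on identical data points.
    let mut rng_a = StdRng::seed_from_u64(j_local as u64);
    let mut rng_b = StdRng::seed_from_u64(j_local as u64);
    let draw_a: Vec<usize> = (0..100_usize).choose_multiple(&mut rng_a, 5);
    let draw_b: Vec<usize> = (0..100_usize).choose_multiple(&mut rng_b, 5);
    assert_eq!(draw_a, draw_b);
}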
@@ -555,117 +557,118 @@ impl GenotypesAndPhenotypes {
         let vec_idx_loci_idx_fin: Vec<usize> = vec_idx_all[1..n_chunks].to_owned();
         assert_eq!(vec_idx_loci_idx_ini.len(), vec_idx_loci_idx_fin.len());
         n_chunks = vec_idx_loci_idx_ini.len();
-        // Instantiate thread object for parallel execution
+        // Instantiate vector of tuples containing the intermediate output file name, mae, and number of imputed data points
+        let mut vec_fname_intermediate_files_and_mae: Vec<(String, f64, f64)> = vec![];
+        // Impute iteratively per chunk where imputation per chunk is parallel across the data subset
         let timer = std::time::Instant::now();
-        let mut thread_objects: Vec<(String, f64, f64)> = vec![];
         for i in 0..n_chunks {
             let idx_loci_idx_ini = vec_idx_loci_idx_ini[i];
             let idx_loci_idx_fin = vec_idx_loci_idx_fin[i];
-            let thread = std::thread::scope(|_scope| {
             let (allele_frequencies, sum_mae, n_missing) = self
                 .per_chunk_aldknni(
                     (&idx_loci_idx_ini, &idx_loci_idx_fin, loci_idx),
                     (
                         &vec_min_loci_corr,
                         &vec_max_pool_dist,
                         &min_l_loci,
                         &min_k_neighbours,
                         n_reps,
                     ),
                     (corr, restrict_linked_loci_per_chromosome),
                 )
                 .expect("Error executing per_chunk_aldknni() method on self_clone.");
             // Write-out intermediate file
             let idx_ini = loci_idx[idx_loci_idx_ini];
             let idx_fin = loci_idx[idx_loci_idx_fin];
             let time = SystemTime::now()
                 .duration_since(UNIX_EPOCH)
                 .expect("Error extracting time in UNIX_EPOCH within write_csv() method for GenotypesAndPhenotypes struct.")
                 .as_secs_f64();
             let mut rng = rand::thread_rng();
             let random_number = rng.gen_range(1_000_000..10_000_000);
             let n_digits: usize = loci_idx
                 .last()
                 .expect("Error extracting the last element of loci_idx. Probably empty.")
                 .to_owned()
                 .to_string()
                 .len();
             let mut start_index = idx_ini.to_string();
             let mut end_index = idx_fin.to_string();
             for _ in 0..(n_digits - start_index.len()) {
                 start_index = "0".to_owned() + &start_index;
             }
             for _ in 0..(n_digits - end_index.len()) {
                 end_index = "0".to_owned() + &end_index;
             }
             let fname_intermediate_file: String = prefix.to_owned()
                 + "-"
                 + &start_index
                 + "-"
                 + &end_index
                 + "-"
                 + &time.to_string()
                 + &random_number.to_string()
                 + ".tmp";
             let chromosome = self.chromosome[idx_ini..idx_fin].to_owned();
             let position = self.position[idx_ini..idx_fin].to_owned();
             let allele = self.allele[idx_ini..idx_fin].to_owned();
             let p = allele_frequencies.ncols();
             assert_eq!(
                 chromosome.len(),
                 p,
                 "Error, the number of chromosome names and the total number of loci are not equal."
             );
             // Instantiate output file
             println!(
                 "--> {}: Writing out intermediate file with expected MAE of {} (adjusted; unadjusted={}): {}",
                 i,
                 sensible_round(adjust_mae(sum_mae / n_missing).expect("Error adjusting MAE."), 4),
                 sensible_round(sum_mae / n_missing, 4),
                 &fname_intermediate_file
             );
             let error_writing_file =
                 "Unable to create file: ".to_owned() + &fname_intermediate_file;
             let mut file_out = OpenOptions::new()
                 .create_new(true)
                 .write(true)
                 .append(false)
                 .open(&fname_intermediate_file)
                 .expect(&error_writing_file);
             // Write the header only for the first chunk
             if i == 0 {
                 file_out
                     .write_all(
                         ("#chr,pos,allele,".to_owned() + &self.pool_names.join(",") + "\n").as_bytes(),
                     )
                     .expect("Error calling write_all() within the write_csv() method for GenotypesAndPhenotypes struct.");
             }
             // Write allele frequencies line by line
             for j in 0..p {
                 let line = [
                     chromosome[j].to_owned(),
                     position[j].to_string(),
                     allele[j].to_owned(),
                     allele_frequencies
                         .column(j)
                         .iter()
                         .map(|&x| parse_f64_roundup_and_own(x, 6))
                         .collect::<Vec<String>>()
                         .join(","),
                 ]
                 .join(",")
                 + "\n";
                 file_out.write_all(line.as_bytes()).expect("Error calling write_all() per line of the output file within the write_csv() method for GenotypesAndPhenotypes struct.");
             }
-                (fname_intermediate_file, sum_mae, n_missing)
-            });
-            thread_objects.push(thread);
+            vec_fname_intermediate_files_and_mae.push((
+                fname_intermediate_file,
+                sum_mae,
+                n_missing,
+            ));
         }
         println!(
             "Duration parallel imputation {} seconds.",
             timer.elapsed().as_secs_f64()
         );
-        // Concatenate output per chunk
-        let mut vec_fname_intermediate_files_and_mae: Vec<(String, f64, f64)> = vec![];
-        for thread in thread_objects {
-            vec_fname_intermediate_files_and_mae.push(thread);
-        }
         // Sort by intermediate output filenames (named according to indices)
         vec_fname_intermediate_files_and_mae.sort_by(|a, b| {
             a.0.partial_cmp(&b.0)
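A note on why the zero-padding above matters: the intermediate files are later concatenated after sorting their names as strings, so padding start_index and end_index to n_digits keeps lexicographic order consistent with numeric chunk order. A simplified sketch of the naming scheme, with the timestamp and random suffix omitted and a hypothetical helper name:

// Zero-padding the start/end locus indices to a fixed width makes the
// later lexicographic sort of intermediate file names agree with
// numeric chunk order.
fn intermediate_name(prefix: &str, idx_ini: usize, idx_fin: usize, n_digits: usize) -> String {
    format!("{}-{:0w$}-{:0w$}.tmp", prefix, idx_ini, idx_fin, w = n_digits)
}

fn main() {
    let mut names = vec![
        intermediate_name("imputed", 1000, 1250, 4),
        intermediate_name("imputed", 0, 250, 4),
        intermediate_name("imputed", 250, 500, 4),
    ];
    names.sort(); // lexicographic sort matches chunk order thanks to padding
    assert_eq!(
        names.join(","),
        "imputed-0000-0250.tmp,imputed-0250-0500.tmp,imputed-1000-1250.tmp"
    );
}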
