diff --git a/src/aldknni.rs b/src/aldknni.rs index 27ed60a..adbb4fd 100644 --- a/src/aldknni.rs +++ b/src/aldknni.rs @@ -1,6 +1,8 @@ use ndarray::{prelude::*, Zip}; +use rand::rngs::StdRng; use rand::seq::IteratorRandom; -use rand::{thread_rng, Rng}; +use rand::Rng; +use rand::SeedableRng; // use bincode::serialize_into; use std::cmp::Ordering; @@ -276,7 +278,7 @@ impl GenotypesAndPhenotypes { } else { *n_reps }; - let mut rng = thread_rng(); + let mut rng = StdRng::seed_from_u64(*j_local as u64); let vec_idx_non_missing: Vec = vec_idx_non_missing .into_iter() .choose_multiple(&mut rng, n_reps); @@ -555,117 +557,118 @@ impl GenotypesAndPhenotypes { let vec_idx_loci_idx_fin: Vec = vec_idx_all[1..n_chunks].to_owned(); assert_eq!(vec_idx_loci_idx_ini.len(), vec_idx_loci_idx_fin.len()); n_chunks = vec_idx_loci_idx_ini.len(); - // Instantiate thread object for parallel execution + // Instantiate vector of tuples containing the intermediate output file name, mae, and number of imputed data points + let mut vec_fname_intermediate_files_and_mae: Vec<(String, f64, f64)> = vec![]; + // Impute iteratively per chunk where imputation per chunk is parallel across the data subset let timer = std::time::Instant::now(); - let mut thread_objects: Vec<(String, f64, f64)> = vec![]; for i in 0..n_chunks { let idx_loci_idx_ini = vec_idx_loci_idx_ini[i]; let idx_loci_idx_fin = vec_idx_loci_idx_fin[i]; - let thread = std::thread::scope(|_scope| { - let (allele_frequencies, sum_mae, n_missing) = self - .per_chunk_aldknni( - (&idx_loci_idx_ini, &idx_loci_idx_fin, loci_idx), - ( - &vec_min_loci_corr, - &vec_max_pool_dist, - &min_l_loci, - &min_k_neighbours, - n_reps, - ), - (corr, restrict_linked_loci_per_chromosome), + let (allele_frequencies, sum_mae, n_missing) = self + .per_chunk_aldknni( + (&idx_loci_idx_ini, &idx_loci_idx_fin, loci_idx), + ( + &vec_min_loci_corr, + &vec_max_pool_dist, + &min_l_loci, + &min_k_neighbours, + n_reps, + ), + (corr, restrict_linked_loci_per_chromosome), + ) + .expect("Error executing per_chunk_aldknni() method on self_clone."); + // Write-out intermediate file + let idx_ini = loci_idx[idx_loci_idx_ini]; + let idx_fin = loci_idx[idx_loci_idx_fin]; + let time = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Error extracting time in UNIX_EPOCH within write_csv() method for GenotypesAndPhenotypes struct.") + .as_secs_f64(); + let mut rng = rand::thread_rng(); + let random_number = rng.gen_range(1_000_000..10_000_000); + let n_digits: usize = loci_idx + .last() + .expect("Error extracting the last element of loci_idx. Probably empty.") + .to_owned() + .to_string() + .len(); + let mut start_index = idx_ini.to_string(); + let mut end_index = idx_fin.to_string(); + for _ in 0..(n_digits - start_index.len()) { + start_index = "0".to_owned() + &start_index; + } + for _ in 0..(n_digits - end_index.len()) { + end_index = "0".to_owned() + &end_index; + } + let fname_intermediate_file: String = prefix.to_owned() + + "-" + + &start_index + + "-" + + &end_index + + "-" + + &time.to_string() + + &random_number.to_string() + + ".tmp"; + let chromosome = self.chromosome[idx_ini..idx_fin].to_owned(); + let position = self.position[idx_ini..idx_fin].to_owned(); + let allele = self.allele[idx_ini..idx_fin].to_owned(); + let p = allele_frequencies.ncols(); + assert_eq!( + chromosome.len(), + p, + "Error, the number of chromosome names and the total number of loci are not equal." + ); + // Instantiate output file + println!( + "--> {}: Writing out intermediate file with expected MAE of {} (adjusted; unadjusted={}): {}", + i, + sensible_round(adjust_mae(sum_mae / n_missing).expect("Error adjusting MAE."), 4), + sensible_round(sum_mae / n_missing, 4), + &fname_intermediate_file + ); + let error_writing_file = + "Unable to create file: ".to_owned() + &fname_intermediate_file; + let mut file_out = OpenOptions::new() + .create_new(true) + .write(true) + .append(false) + .open(&fname_intermediate_file) + .expect(&error_writing_file); + // Write the header only for the first chunk + if i == 0 { + file_out + .write_all( + ("#chr,pos,allele,".to_owned() + &self.pool_names.join(",") + "\n").as_bytes(), ) - .expect("Error executing per_chunk_aldknni() method on self_clone."); - // Write-out intermediate file - let idx_ini = loci_idx[idx_loci_idx_ini]; - let idx_fin = loci_idx[idx_loci_idx_fin]; - let time = SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("Error extracting time in UNIX_EPOCH within write_csv() method for GenotypesAndPhenotypes struct.") - .as_secs_f64(); - let mut rng = rand::thread_rng(); - let random_number = rng.gen_range(1_000_000..10_000_000); - let n_digits: usize = loci_idx - .last() - .expect("Error extracting the last element of loci_idx. Probably empty.") - .to_owned() - .to_string() - .len(); - let mut start_index = idx_ini.to_string(); - let mut end_index = idx_fin.to_string(); - for _ in 0..(n_digits - start_index.len()) { - start_index = "0".to_owned() + &start_index; - } - for _ in 0..(n_digits - end_index.len()) { - end_index = "0".to_owned() + &end_index; - } - let fname_intermediate_file: String = prefix.to_owned() - + "-" - + &start_index - + "-" - + &end_index - + "-" - + &time.to_string() - + &random_number.to_string() - + ".tmp"; - let chromosome = self.chromosome[idx_ini..idx_fin].to_owned(); - let position = self.position[idx_ini..idx_fin].to_owned(); - let allele = self.allele[idx_ini..idx_fin].to_owned(); - let p = allele_frequencies.ncols(); - assert_eq!(chromosome.len(), p, "Error, the number of chromosome names and the total number of loci are not equal."); - // Instantiate output file - println!( - "--> {}: Writing out intermediate file with expected MAE of {} (adjusted; unadjusted={}): {}", - i, - sensible_round(adjust_mae(sum_mae / n_missing).expect("Error adjusting MAE."), 4), - sensible_round(sum_mae / n_missing, 4), - &fname_intermediate_file - ); - let error_writing_file = - "Unable to create file: ".to_owned() + &fname_intermediate_file; - let mut file_out = OpenOptions::new() - .create_new(true) - .write(true) - .append(false) - .open(&fname_intermediate_file) - .expect(&error_writing_file); - // Write the header only for the first chunk - if i == 0 { - file_out - .write_all( - ("#chr,pos,allele,".to_owned() + &self.pool_names.join(",") + "\n").as_bytes(), - ) - .expect("Error calling write_all() within the write_csv() method for GenotypesAndPhenotypes struct."); - } - // Write allele frequencies line by line - for j in 0..p { - let line = [ - chromosome[j].to_owned(), - position[j].to_string(), - allele[j].to_owned(), - allele_frequencies - .column(j) - .iter() - .map(|&x| parse_f64_roundup_and_own(x, 6)) - .collect::>() - .join(","), - ] - .join(",") - + "\n"; - file_out.write_all(line.as_bytes()).expect("Error calling write_all() per line of the output file within the write_csv() method for GenotypesAndPhenotypes struct."); - } - (fname_intermediate_file, sum_mae, n_missing) - }); - thread_objects.push(thread); + .expect("Error calling write_all() within the write_csv() method for GenotypesAndPhenotypes struct."); + } + // Write allele frequencies line by line + for j in 0..p { + let line = [ + chromosome[j].to_owned(), + position[j].to_string(), + allele[j].to_owned(), + allele_frequencies + .column(j) + .iter() + .map(|&x| parse_f64_roundup_and_own(x, 6)) + .collect::>() + .join(","), + ] + .join(",") + + "\n"; + file_out.write_all(line.as_bytes()).expect("Error calling write_all() per line of the output file within the write_csv() method for GenotypesAndPhenotypes struct."); + } + vec_fname_intermediate_files_and_mae.push(( + fname_intermediate_file, + sum_mae, + n_missing, + )); } println!( "Duration parallel imputation {} seconds.", timer.elapsed().as_secs_f64() ); - // Concatenate output per chunk - let mut vec_fname_intermediate_files_and_mae: Vec<(String, f64, f64)> = vec![]; - for thread in thread_objects { - vec_fname_intermediate_files_and_mae.push(thread); - } // Sort by intermediate output filenames (named according to indices) vec_fname_intermediate_files_and_mae.sort_by(|a, b| { a.0.partial_cmp(&b.0)