From a18d2f65f8af8d4269455d563dc9647ac4d8549d Mon Sep 17 00:00:00 2001 From: dagou Date: Sun, 28 Apr 2024 18:46:14 +0800 Subject: [PATCH] bug fix --- kr2r/src/bin/annotate.rs | 4 +- kr2r/src/bin/build_k2_db.rs | 2 +- kr2r/src/bin/estimate_capacity.rs | 4 +- kr2r/src/bin/resolve.rs | 7 +- kr2r/src/bin/splitr.rs | 15 ++-- kr2r/src/db.rs | 139 +----------------------------- kr2r/src/kr2r_data.rs | 3 +- kr2r/src/seq.rs | 21 ++++- kr2r/src/taxonomy.rs | 7 +- kr2r/src/utils.rs | 18 +++- ncbi/Cargo.toml | 6 +- 11 files changed, 56 insertions(+), 170 deletions(-) diff --git a/kr2r/src/bin/annotate.rs b/kr2r/src/bin/annotate.rs index 31e8933..39917bc 100644 --- a/kr2r/src/bin/annotate.rs +++ b/kr2r/src/bin/annotate.rs @@ -1,6 +1,6 @@ use clap::Parser; use kr2r::compact_hash::{CHPage, Compact, HashConfig, K2Compact, Row, Slot}; -use kr2r::utils::find_and_sort_files; +use kr2r::utils::{find_and_sort_files, open_file}; // use std::collections::HashMap; use rayon::prelude::*; use std::collections::HashMap; @@ -192,7 +192,7 @@ fn process_chunk_file>( chunk_file: P, hash_files: &Vec, ) -> Result<()> { - let file = File::open(chunk_file)?; + let file = open_file(chunk_file)?; let mut reader = BufReader::new(file); let (page_index, _) = read_chunk_header(&mut reader)?; diff --git a/kr2r/src/bin/build_k2_db.rs b/kr2r/src/bin/build_k2_db.rs index ab0a798..1eb1c17 100644 --- a/kr2r/src/bin/build_k2_db.rs +++ b/kr2r/src/bin/build_k2_db.rs @@ -104,7 +104,7 @@ pub fn run(args: Args, required_capacity: usize) -> Result<(), Box = diff --git a/kr2r/src/bin/resolve.rs b/kr2r/src/bin/resolve.rs index 6803bf5..1d2a0fd 100644 --- a/kr2r/src/bin/resolve.rs +++ b/kr2r/src/bin/resolve.rs @@ -5,7 +5,7 @@ use kr2r::compact_hash::{Compact, HashConfig, Row}; use kr2r::readcounts::{TaxonCounters, TaxonCountersDash}; use kr2r::report::report_kraken_style; use kr2r::taxonomy::Taxonomy; -use kr2r::utils::find_and_sort_files; +use kr2r::utils::{find_and_sort_files, open_file}; use rayon::prelude::*; use std::collections::HashMap; use std::fs::File; @@ -20,7 +20,7 @@ const BATCH_SIZE: usize = 8 * 1024 * 1024; pub fn read_id_to_seq_map>( filename: P, ) -> Result)>> { - let file = File::open(filename)?; + let file = open_file(filename)?; let reader = BufReader::new(file); let id_map = DashMap::new(); @@ -168,7 +168,6 @@ pub fn add_hitlist_string( taxonomy: &Taxonomy, ) -> String { let result1 = generate_hit_string(kmer_count1, &rows, taxonomy, value_mask, 0); - println!("result1 {:?}", result1); if let Some(count) = kmer_count2 { let result2 = generate_hit_string(count, &rows, taxonomy, value_mask, kmer_count1); format!("{} |:| {}", result1, result2) @@ -273,7 +272,7 @@ fn process_batch>( writer: &Mutex>, value_mask: usize, ) -> Result<(TaxonCountersDash, usize, DashSet)> { - let file = File::open(sample_file)?; + let file = open_file(sample_file)?; let mut reader = BufReader::new(file); let size = std::mem::size_of::(); let mut batch_buffer = vec![0u8; size * BATCH_SIZE]; diff --git a/kr2r/src/bin/splitr.rs b/kr2r/src/bin/splitr.rs index 9e961da..6a819c2 100644 --- a/kr2r/src/bin/splitr.rs +++ b/kr2r/src/bin/splitr.rs @@ -1,6 +1,6 @@ use kr2r::compact_hash::{HashConfig, Slot}; use kr2r::mmscanner::MinimizerScanner; -use kr2r::seq::{self, SeqX}; +use kr2r::seq::{self, open_fasta_reader, SeqX}; use kr2r::utils::{ create_partition_files, create_partition_writers, create_sample_file, detect_file_format, get_file_limit, FileFormat, @@ -265,8 +265,7 @@ fn process_fasta_file( let line_index = AtomicUsize::new(0); - let reader = - seq_io::fasta::Reader::from_path(&file1).expect("Unable to create pair reader from paths"); + let reader = open_fasta_reader(&file1).expect("Unable to create pair reader from paths"); read_parallel( reader, args.num_threads as u32, @@ -340,8 +339,8 @@ fn convert(args: Args, meros: Meros, hash_config: HashConfig) -> Result<()> { let mut sample_writer = create_sample_file(args.chunk_dir.join(format!("sample_id_{}.map", file_index))); - match detect_file_format(&file_pair[0]) { - Ok(FileFormat::Fastq) => { + match detect_file_format(&file_pair[0])? { + FileFormat::Fastq => { process_fastq_file( &args, meros, @@ -352,7 +351,7 @@ fn convert(args: Args, meros: Meros, hash_config: HashConfig) -> Result<()> { &mut sample_writer, ); } - Ok(FileFormat::Fasta) => { + FileFormat::Fasta => { process_fasta_file( &args, meros, @@ -363,10 +362,6 @@ fn convert(args: Args, meros: Meros, hash_config: HashConfig) -> Result<()> { &mut sample_writer, ); } - Err(err) => { - println!("file {:?}: {:?}", &file_pair, err); - continue; - } } } Ok(()) diff --git a/kr2r/src/db.rs b/kr2r/src/db.rs index c1ed7e9..5570cb5 100644 --- a/kr2r/src/db.rs +++ b/kr2r/src/db.rs @@ -4,6 +4,7 @@ use crate::mmscanner::MinimizerScanner; use crate::taxonomy::{NCBITaxonomy, Taxonomy}; use crate::Meros; +use crate::utils::open_file; use byteorder::{LittleEndian, WriteBytesExt}; use rayon::prelude::*; use seq_io::fasta::{Reader, Record}; @@ -123,7 +124,7 @@ pub fn process_k2file( let page: Vec = (0..capacity).map(|_| AtomicU32::new(0)).collect(); - let file = File::open(&chunk_file)?; + let file = open_file(&chunk_file)?; let mut reader = BufReader::new(file); let cell_size = std::mem::size_of::>(); @@ -152,142 +153,6 @@ pub fn process_k2file( Ok(size_count) } -/// 处理k2格式的临时文件,构建数据库 -// pub fn process_k2file>( -// chunk_file: P, -// chtm: &mut CHTableMut, -// taxonomy: &Taxonomy, -// ) -> IOResult<()> { -// let total_counter = AtomicUsize::new(0); -// let size_counter = AtomicUsize::new(0); - -// let value_mask = chtm.config.value_mask; -// let value_bits = chtm.config.value_bits; - -// let file = File::open(chunk_file)?; -// let mut reader = BufReader::new(file); - -// let cell_size = std::mem::size_of::(); -// let batch_buffer_size = cell_size * BATCH_SIZE; -// let mut batch_buffer = vec![0u8; batch_buffer_size]; - -// while let Ok(bytes_read) = reader.read(&mut batch_buffer) { -// if bytes_read == 0 { -// break; -// } // 文件末尾 - -// // 处理读取的数据批次 -// let cells_in_batch = bytes_read / cell_size; - -// let cells = unsafe { -// std::slice::from_raw_parts(batch_buffer.as_ptr() as *const Cell, cells_in_batch) -// }; - -// cells.iter().for_each(|cell| { -// let item = cell.as_slot(); -// let item_taxid: u32 = item.value.right(value_mask).to_u32(); - -// if let Some((flag, mut slot)) = &chtm.set_page_cell(item) { -// let slot_taxid = slot.value.right(value_mask).to_u32(); -// let new_taxid = taxonomy.lca(item_taxid, slot_taxid); -// if slot_taxid != new_taxid { -// slot.update_right(u32::from_u32(new_taxid), value_bits); -// chtm.update_cell(flag, slot); -// } -// } else { -// size_counter.fetch_add(1, Ordering::SeqCst); -// } -// }); -// total_counter.fetch_add(cells.len(), Ordering::SeqCst); -// } - -// println!("total_counter {:?}", total_counter.load(Ordering::SeqCst)); -// chtm.copy_from_page(); - -// let size_count = size_counter.load(Ordering::SeqCst); -// let total_count = total_counter.load(Ordering::SeqCst); -// println!("size_count {:?}", size_count); -// println!("total_count {:?}", total_count); -// chtm.add_size(size_count); - -// Ok(()) -// } - -// /// 直接处理fna文件构建数据库 -// pub fn process_fna>( -// fna_file: P, -// meros: Meros, -// chtm: &mut CHTableMut, -// taxonomy: &Taxonomy, -// id_to_taxon_map: &HashMap, -// threads: u32, -// ) { -// let reader = Reader::from_path(fna_file).unwrap(); -// let queue_len = (threads - 2) as usize; - -// let total_counter = AtomicUsize::new(0); -// let size_counter = AtomicUsize::new(0); -// let seq_counter = AtomicUsize::new(0); -// // let update_counter = AtomicUsize::new(0); - -// // let capacity = chtm.config.capacity; -// let value_bits = chtm.config.value_bits; -// let hash_config = chtm.config; -// let value_mask = hash_config.value_mask; - -// read_parallel( -// reader, -// threads, -// queue_len, -// |record_set| { -// let mut hash_set = BTreeSet::>::new(); - -// for record in record_set.into_iter() { -// seq_counter.fetch_add(1, Ordering::SeqCst); -// if let Ok(seq_id) = record.id() { -// if let Some(ext_taxid) = id_to_taxon_map.get(seq_id) { -// let taxid = taxonomy.get_internal_id(*ext_taxid); -// let kmer_iter = MinimizerScanner::new(record.seq(), meros) -// .into_iter() -// .map(|hash_key| hash_config.slot(hash_key, taxid as u32)) -// .collect::>>(); - -// total_counter.fetch_add(kmer_iter.len(), Ordering::SeqCst); -// hash_set.extend(kmer_iter); -// }; -// } -// } -// hash_set -// }, -// |record_sets| { -// while let Some(Ok((_, hash_set))) = record_sets.next() { -// hash_set.into_iter().for_each(|item| { -// let item_taxid = item.value.right(value_mask); -// if let Some(mut slot) = &chtm.set_table_cell(item.idx, item.value) { -// let slot_taxid = slot.value.right(value_mask); -// let new_taxid = taxonomy.lca(item_taxid, slot_taxid); -// if slot_taxid != new_taxid { -// slot.update_right(new_taxid, value_bits); -// chtm.update_cell(&1, slot); -// // update_counter.fetch_add(1, Ordering::SeqCst); -// } -// } else { -// size_counter.fetch_add(1, Ordering::SeqCst); -// } -// }); -// } -// }, -// ); -// let size_count = size_counter.load(Ordering::SeqCst); -// let seq_count = seq_counter.load(Ordering::SeqCst); -// let total_count = total_counter.load(Ordering::SeqCst); -// // let update_count = update_counter.load(Ordering::SeqCst); -// println!("seq_count {:?}", seq_count); -// println!("size_count {:?}", size_count); -// println!("total_count {:?}", total_count); -// chtm.add_size(size_count); -// } - /// 生成taxonomy树文件 pub fn generate_taxonomy( ncbi_taxonomy_directory: &PathBuf, diff --git a/kr2r/src/kr2r_data.rs b/kr2r/src/kr2r_data.rs index afb3ac5..b993634 100644 --- a/kr2r/src/kr2r_data.rs +++ b/kr2r/src/kr2r_data.rs @@ -1,3 +1,4 @@ +use crate::utils::open_file; // use crate::{Meros, CURRENT_REVCOM_VERSION}; use crate::{ BITS_PER_CHAR, CURRENT_REVCOM_VERSION, DEFAULT_KMER_LENGTH, DEFAULT_MINIMIZER_LENGTH, @@ -126,7 +127,7 @@ impl IndexOptions { } pub fn read_index_options>(file_path: P) -> IoResult { - let mut file = File::open(file_path)?; + let mut file = open_file(file_path)?; let mut buffer = vec![0; std::mem::size_of::()]; file.read_exact(&mut buffer)?; diff --git a/kr2r/src/seq.rs b/kr2r/src/seq.rs index 52f3d21..18dc57e 100644 --- a/kr2r/src/seq.rs +++ b/kr2r/src/seq.rs @@ -1,4 +1,5 @@ use crate::mmscanner::MinimizerScanner; +use crate::utils::open_file; use seq_io::fasta; use seq_io::fasta::Record as FaRecord; use seq_io::fastq; @@ -10,7 +11,6 @@ use crate::utils::is_gzipped; use crate::Meros; use seq_io::policy::StdPolicy; use std::collections::HashSet; -use std::fs::File; use std::io; use std::iter; use std::path::Path; @@ -80,6 +80,21 @@ pub trait SeqSet { fn to_seq_reads(&self, score: i32, meros: Meros) -> HashSet; } +pub fn open_fasta_reader>( + path1: P, +) -> io::Result>> { + let mut file1 = open_file(&path1)?; + + let read1: Box = if is_gzipped(&mut file1)? { + Box::new(GzDecoder::new(file1)) + } else { + Box::new(file1) + }; + + let reader1 = fasta::Reader::new(read1); + Ok(reader1) +} + pub struct PairFastqReader

{ reader1: fastq::Reader, P>, reader2: Option, P>>, @@ -90,7 +105,7 @@ impl<'a> PairFastqReader { #[inline] pub fn from_path>(path1: P, path2: Option

) -> io::Result { // 分别打开两个文件 - let mut file1 = File::open(&path1)?; + let mut file1 = open_file(&path1)?; let read1: Box = if is_gzipped(&mut file1)? { Box::new(GzDecoder::new(file1)) @@ -102,7 +117,7 @@ impl<'a> PairFastqReader { let reader2 = match path2 { Some(path2) => { - let mut file2 = File::open(path2)?; + let mut file2 = open_file(path2)?; let read2: Box = if is_gzipped(&mut file2)? { Box::new(GzDecoder::new(file2)) } else { diff --git a/kr2r/src/taxonomy.rs b/kr2r/src/taxonomy.rs index 0d3615d..0de63a8 100644 --- a/kr2r/src/taxonomy.rs +++ b/kr2r/src/taxonomy.rs @@ -1,3 +1,4 @@ +use crate::utils::open_file; use std::collections::{HashMap, HashSet, VecDeque}; use std::fmt::Debug; use std::fs::File; @@ -13,7 +14,7 @@ pub fn parse_nodes_file>( HashMap, HashSet, )> { - let nodes_file = File::open(nodes_filename)?; + let nodes_file = open_file(nodes_filename)?; let reader = BufReader::new(nodes_file); let mut parent_map = HashMap::new(); @@ -60,7 +61,7 @@ pub fn parse_nodes_file>( /// 解析 ncbi 文件的 taxonomy names 文件 pub fn parse_names_file>(names_filename: P) -> Result> { - let names_file = File::open(names_filename)?; + let names_file = open_file(names_filename)?; let reader = BufReader::new(names_file); let mut name_map = HashMap::new(); @@ -257,7 +258,7 @@ impl Taxonomy { const MAGIC: &'static [u8] = b"K2TAXDAT"; // 替换为实际的 magic bytes pub fn from_file + Debug>(filename: P) -> Result { - let mut file = std::fs::File::open(&filename)?; + let mut file = open_file(&filename)?; let mut magic = vec![0; Self::MAGIC.len()]; file.read_exact(&mut magic)?; diff --git a/kr2r/src/utils.rs b/kr2r/src/utils.rs index b64bd3a..e4ab328 100644 --- a/kr2r/src/utils.rs +++ b/kr2r/src/utils.rs @@ -6,7 +6,7 @@ use walkdir::WalkDir; /// 读取 seqid2taxid.map 文件。为了裁剪 ncbi 的 taxonomy 树 pub fn read_id_to_taxon_map>(filename: P) -> Result> { - let file = File::open(filename)?; + let file = open_file(filename)?; let reader = BufReader::new(file); let mut id_map = HashMap::new(); @@ -97,7 +97,7 @@ pub fn summary_prelim_map_files>(data_dir: P) -> Result let mut summary_file = File::create(&summary_file_path)?; // 遍历找到的文件并汇总内容 for path in prelim_map_files { - let file = File::open(path)?; + let file = open_file(path)?; let reader = BufReader::new(file); for line in reader.lines() { let line = line?; @@ -109,7 +109,7 @@ pub fn summary_prelim_map_files>(data_dir: P) -> Result } pub fn create_seqid2taxid_file>(prelim_map_file: P, output_file: P) -> Result<()> { - let file = File::open(prelim_map_file)?; + let file = open_file(prelim_map_file)?; let reader = BufReader::new(file); let mut output = File::create(output_file).unwrap(); @@ -159,7 +159,7 @@ pub fn is_gzipped(file: &mut File) -> io::Result { } pub fn detect_file_format>(path: P) -> io::Result { - let mut file = File::open(path)?; + let mut file = open_file(path)?; let read1: Box = if is_gzipped(&mut file)? { Box::new(GzDecoder::new(file)) } else { @@ -305,3 +305,13 @@ pub fn find_and_sort_files( // 返回排序后的文件路径 Ok(sorted_entries.into_iter().map(|(path, _)| path).collect()) } + +pub fn open_file>(path: P) -> io::Result { + File::open(&path).map_err(|e| { + if e.kind() == io::ErrorKind::NotFound { + io::Error::new(e.kind(), format!("File not found: {:?}", path.as_ref())) + } else { + e + } + }) +} diff --git a/ncbi/Cargo.toml b/ncbi/Cargo.toml index f6a95e8..88fd393 100644 --- a/ncbi/Cargo.toml +++ b/ncbi/Cargo.toml @@ -6,15 +6,15 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -reqwest = { version = "0.11", features = ["stream", "multipart", "gzip"] } +reqwest = { version = "0.12.4", features = ["stream", "multipart", "gzip"] } tokio = { version = "1", features = ["full"] } anyhow = "1.0" futures = "0.3" regex = "1" clap = { version = "4.4.10", features = ["derive"] } futures-util = "0.3.30" -reqwest-retry = "0.3" -reqwest-middleware = "0.2.4" +reqwest-retry = "0.5" +reqwest-middleware = "0.3" lazy_static = "1.4" log = "0.4" env_logger = "0.11.0"