Commit

bug fix
dagou committed Apr 28, 2024
1 parent f174324 commit a18d2f6
Showing 11 changed files with 56 additions and 170 deletions.
4 changes: 2 additions & 2 deletions kr2r/src/bin/annotate.rs
@@ -1,6 +1,6 @@
use clap::Parser;
use kr2r::compact_hash::{CHPage, Compact, HashConfig, K2Compact, Row, Slot};
-use kr2r::utils::find_and_sort_files;
+use kr2r::utils::{find_and_sort_files, open_file};
// use std::collections::HashMap;
use rayon::prelude::*;
use std::collections::HashMap;
@@ -192,7 +192,7 @@ fn process_chunk_file<P: AsRef<Path>>(
chunk_file: P,
hash_files: &Vec<PathBuf>,
) -> Result<()> {
-let file = File::open(chunk_file)?;
+let file = open_file(chunk_file)?;
let mut reader = BufReader::new(file);

let (page_index, _) = read_chunk_header(&mut reader)?;
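
Every `File::open` call touched by this commit is replaced with a `kr2r::utils::open_file` helper. The helper's body is not part of this diff; a minimal sketch of what such a wrapper typically looks like (an assumption, not the project's actual code) is:

    // Hypothetical sketch of kr2r::utils::open_file: behaves like File::open, but the
    // error carries the offending path, making missing-file failures easier to trace.
    use std::fs::File;
    use std::io;
    use std::path::Path;

    pub fn open_file<P: AsRef<Path>>(path: P) -> io::Result<File> {
        let path = path.as_ref();
        File::open(path).map_err(|e| io::Error::new(e.kind(), format!("{}: {}", path.display(), e)))
    }
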
2 changes: 1 addition & 1 deletion kr2r/src/bin/build_k2_db.rs
@@ -104,7 +104,7 @@ pub fn run(args: Args, required_capacity: usize) -> Result<(), Box<dyn std::erro
);
}

-let hash_filename = source.join("hash_config.k2d");
+let hash_filename = k2d_dir.join("hash_config.k2d");
let partition = chunk_files.len();
let mut size: u64 = 0;

4 changes: 2 additions & 2 deletions kr2r/src/bin/estimate_capacity.rs
@@ -2,7 +2,7 @@ use clap::{error::ErrorKind, Error, Parser};
use hyperloglogplus::{HyperLogLog, HyperLogLogPlus};
use kr2r::args::KLMTArgs;
use kr2r::mmscanner::MinimizerScanner;
-use kr2r::utils::{find_library_fna_files, format_bytes};
+use kr2r::utils::{find_library_fna_files, format_bytes, open_file};
use kr2r::KBuildHasher;
use seq_io::fasta::{Reader, Record};
use seq_io::parallel::read_parallel;
@@ -70,7 +70,7 @@ fn process_sequence(
// Check whether the JSON file exists
if args.cache && Path::new(&json_path).exists() {
// If it exists, read it from the file and deserialize it
-let mut file = File::open(json_path).unwrap();
+let mut file = open_file(json_path).unwrap();
let mut serialized_hllp = String::new();
file.read_to_string(&mut serialized_hllp).unwrap();
let hllp: HyperLogLogPlus<u64, KBuildHasher> =
7 changes: 3 additions & 4 deletions kr2r/src/bin/resolve.rs
@@ -5,7 +5,7 @@ use kr2r::compact_hash::{Compact, HashConfig, Row};
use kr2r::readcounts::{TaxonCounters, TaxonCountersDash};
use kr2r::report::report_kraken_style;
use kr2r::taxonomy::Taxonomy;
-use kr2r::utils::find_and_sort_files;
+use kr2r::utils::{find_and_sort_files, open_file};
use rayon::prelude::*;
use std::collections::HashMap;
use std::fs::File;
@@ -20,7 +20,7 @@ const BATCH_SIZE: usize = 8 * 1024 * 1024;
pub fn read_id_to_seq_map<P: AsRef<Path>>(
filename: P,
) -> Result<DashMap<u32, (String, String, u32, Option<u32>)>> {
-let file = File::open(filename)?;
+let file = open_file(filename)?;
let reader = BufReader::new(file);
let id_map = DashMap::new();

@@ -168,7 +168,6 @@ pub fn add_hitlist_string(
taxonomy: &Taxonomy,
) -> String {
let result1 = generate_hit_string(kmer_count1, &rows, taxonomy, value_mask, 0);
println!("result1 {:?}", result1);
if let Some(count) = kmer_count2 {
let result2 = generate_hit_string(count, &rows, taxonomy, value_mask, kmer_count1);
format!("{} |:| {}", result1, result2)
@@ -273,7 +272,7 @@ fn process_batch<P: AsRef<Path>>(
writer: &Mutex<Box<dyn Write + Send>>,
value_mask: usize,
) -> Result<(TaxonCountersDash, usize, DashSet<u32>)> {
-let file = File::open(sample_file)?;
+let file = open_file(sample_file)?;
let mut reader = BufReader::new(file);
let size = std::mem::size_of::<Row>();
let mut batch_buffer = vec![0u8; size * BATCH_SIZE];
15 changes: 5 additions & 10 deletions kr2r/src/bin/splitr.rs
@@ -1,6 +1,6 @@
use kr2r::compact_hash::{HashConfig, Slot};
use kr2r::mmscanner::MinimizerScanner;
-use kr2r::seq::{self, SeqX};
+use kr2r::seq::{self, open_fasta_reader, SeqX};
use kr2r::utils::{
create_partition_files, create_partition_writers, create_sample_file, detect_file_format,
get_file_limit, FileFormat,
@@ -265,8 +265,7 @@ fn process_fasta_file(

let line_index = AtomicUsize::new(0);

-let reader =
-seq_io::fasta::Reader::from_path(&file1).expect("Unable to create pair reader from paths");
+let reader = open_fasta_reader(&file1).expect("Unable to create pair reader from paths");
read_parallel(
reader,
args.num_threads as u32,
@@ -340,8 +339,8 @@ fn convert(args: Args, meros: Meros, hash_config: HashConfig) -> Result<()> {
let mut sample_writer =
create_sample_file(args.chunk_dir.join(format!("sample_id_{}.map", file_index)));

-match detect_file_format(&file_pair[0]) {
-Ok(FileFormat::Fastq) => {
+match detect_file_format(&file_pair[0])? {
+FileFormat::Fastq => {
process_fastq_file(
&args,
meros,
@@ -352,7 +351,7 @@ fn convert(args: Args, meros: Meros, hash_config: HashConfig) -> Result<()> {
&mut sample_writer,
);
}
-Ok(FileFormat::Fasta) => {
+FileFormat::Fasta => {
process_fasta_file(
&args,
meros,
Expand All @@ -363,10 +362,6 @@ fn convert(args: Args, meros: Meros, hash_config: HashConfig) -> Result<()> {
&mut sample_writer,
);
}
-Err(err) => {
-println!("file {:?}: {:?}", &file_pair, err);
-continue;
-}
}
}
Ok(())
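
The splitr.rs change above is behavioral, not just cosmetic: `detect_file_format(&file_pair[0])?` now propagates a format-detection error out of `convert`, whereas the deleted `Err` arm only printed the error and skipped the file pair. A self-contained illustration of that control-flow difference, with toy stand-ins for kr2r's actual types and detector:

    use std::io;

    #[derive(Debug)]
    enum FileFormat {
        Fastq,
        Fasta,
    }

    // Toy detector keyed on the file extension, purely for illustration.
    fn detect_file_format(name: &str) -> io::Result<FileFormat> {
        match name.rsplit('.').next() {
            Some("fq") | Some("fastq") => Ok(FileFormat::Fastq),
            Some("fa") | Some("fna") | Some("fasta") => Ok(FileFormat::Fasta),
            _ => Err(io::Error::new(io::ErrorKind::InvalidData, "unrecognized format")),
        }
    }

    fn convert(files: &[&str]) -> io::Result<()> {
        for f in files {
            // Old behaviour: log the error and `continue` with the next file.
            // New behaviour: `?` returns the error and aborts the whole conversion.
            let fmt = detect_file_format(f)?;
            println!("{f}: {fmt:?}");
        }
        Ok(())
    }

    fn main() {
        if let Err(e) = convert(&["sample_1.fq", "genome.fna", "notes.txt"]) {
            eprintln!("convert aborted: {e}");
        }
    }
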
139 changes: 2 additions & 137 deletions kr2r/src/db.rs
@@ -4,6 +4,7 @@ use crate::mmscanner::MinimizerScanner;
use crate::taxonomy::{NCBITaxonomy, Taxonomy};
use crate::Meros;

+use crate::utils::open_file;
use byteorder::{LittleEndian, WriteBytesExt};
use rayon::prelude::*;
use seq_io::fasta::{Reader, Record};
@@ -123,7 +124,7 @@ pub fn process_k2file(

let page: Vec<AtomicU32> = (0..capacity).map(|_| AtomicU32::new(0)).collect();

-let file = File::open(&chunk_file)?;
+let file = open_file(&chunk_file)?;
let mut reader = BufReader::new(file);

let cell_size = std::mem::size_of::<Slot<u32>>();
Expand Down Expand Up @@ -152,142 +153,6 @@ pub fn process_k2file(
Ok(size_count)
}

/// Process the k2-format temporary files and build the database
// pub fn process_k2file<P: AsRef<Path>>(
// chunk_file: P,
// chtm: &mut CHTableMut<u32>,
// taxonomy: &Taxonomy,
// ) -> IOResult<()> {
// let total_counter = AtomicUsize::new(0);
// let size_counter = AtomicUsize::new(0);

// let value_mask = chtm.config.value_mask;
// let value_bits = chtm.config.value_bits;

// let file = File::open(chunk_file)?;
// let mut reader = BufReader::new(file);

// let cell_size = std::mem::size_of::<Cell>();
// let batch_buffer_size = cell_size * BATCH_SIZE;
// let mut batch_buffer = vec![0u8; batch_buffer_size];

// while let Ok(bytes_read) = reader.read(&mut batch_buffer) {
// if bytes_read == 0 {
// break;
// } // end of file

// // Process the batch of data that was read
// let cells_in_batch = bytes_read / cell_size;

// let cells = unsafe {
// std::slice::from_raw_parts(batch_buffer.as_ptr() as *const Cell, cells_in_batch)
// };

// cells.iter().for_each(|cell| {
// let item = cell.as_slot();
// let item_taxid: u32 = item.value.right(value_mask).to_u32();

// if let Some((flag, mut slot)) = &chtm.set_page_cell(item) {
// let slot_taxid = slot.value.right(value_mask).to_u32();
// let new_taxid = taxonomy.lca(item_taxid, slot_taxid);
// if slot_taxid != new_taxid {
// slot.update_right(u32::from_u32(new_taxid), value_bits);
// chtm.update_cell(flag, slot);
// }
// } else {
// size_counter.fetch_add(1, Ordering::SeqCst);
// }
// });
// total_counter.fetch_add(cells.len(), Ordering::SeqCst);
// }

// println!("total_counter {:?}", total_counter.load(Ordering::SeqCst));
// chtm.copy_from_page();

// let size_count = size_counter.load(Ordering::SeqCst);
// let total_count = total_counter.load(Ordering::SeqCst);
// println!("size_count {:?}", size_count);
// println!("total_count {:?}", total_count);
// chtm.add_size(size_count);

// Ok(())
// }

// /// Build the database directly from an fna file
// pub fn process_fna<P: AsRef<Path>>(
// fna_file: P,
// meros: Meros,
// chtm: &mut CHTableMut<u32>,
// taxonomy: &Taxonomy,
// id_to_taxon_map: &HashMap<String, u64>,
// threads: u32,
// ) {
// let reader = Reader::from_path(fna_file).unwrap();
// let queue_len = (threads - 2) as usize;

// let total_counter = AtomicUsize::new(0);
// let size_counter = AtomicUsize::new(0);
// let seq_counter = AtomicUsize::new(0);
// // let update_counter = AtomicUsize::new(0);

// // let capacity = chtm.config.capacity;
// let value_bits = chtm.config.value_bits;
// let hash_config = chtm.config;
// let value_mask = hash_config.value_mask;

// read_parallel(
// reader,
// threads,
// queue_len,
// |record_set| {
// let mut hash_set = BTreeSet::<Slot<u32>>::new();

// for record in record_set.into_iter() {
// seq_counter.fetch_add(1, Ordering::SeqCst);
// if let Ok(seq_id) = record.id() {
// if let Some(ext_taxid) = id_to_taxon_map.get(seq_id) {
// let taxid = taxonomy.get_internal_id(*ext_taxid);
// let kmer_iter = MinimizerScanner::new(record.seq(), meros)
// .into_iter()
// .map(|hash_key| hash_config.slot(hash_key, taxid as u32))
// .collect::<BTreeSet<Slot<u32>>>();

// total_counter.fetch_add(kmer_iter.len(), Ordering::SeqCst);
// hash_set.extend(kmer_iter);
// };
// }
// }
// hash_set
// },
// |record_sets| {
// while let Some(Ok((_, hash_set))) = record_sets.next() {
// hash_set.into_iter().for_each(|item| {
// let item_taxid = item.value.right(value_mask);
// if let Some(mut slot) = &chtm.set_table_cell(item.idx, item.value) {
// let slot_taxid = slot.value.right(value_mask);
// let new_taxid = taxonomy.lca(item_taxid, slot_taxid);
// if slot_taxid != new_taxid {
// slot.update_right(new_taxid, value_bits);
// chtm.update_cell(&1, slot);
// // update_counter.fetch_add(1, Ordering::SeqCst);
// }
// } else {
// size_counter.fetch_add(1, Ordering::SeqCst);
// }
// });
// }
// },
// );
// let size_count = size_counter.load(Ordering::SeqCst);
// let seq_count = seq_counter.load(Ordering::SeqCst);
// let total_count = total_counter.load(Ordering::SeqCst);
// // let update_count = update_counter.load(Ordering::SeqCst);
// println!("seq_count {:?}", seq_count);
// println!("size_count {:?}", size_count);
// println!("total_count {:?}", total_count);
// chtm.add_size(size_count);
// }

/// Generate the taxonomy tree file
pub fn generate_taxonomy(
ncbi_taxonomy_directory: &PathBuf,
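
The surviving `process_k2file` keeps the same batch-reading pattern as the commented-out code removed above: read the chunk file in blocks of `size_of::<Slot<u32>>() * BATCH_SIZE` bytes and decode the slots in each block. A generic sketch of that pattern with a stand-in record type (the real `Slot<u32>` layout and the LCA-merge logic are not reproduced here):

    use std::fs::File;
    use std::io::{BufReader, Read, Result};

    // Stand-in for a fixed-size on-disk record; kr2r's actual Slot<u32> differs.
    struct Cell {
        idx: u32,
        value: u32,
    }

    const BATCH_SIZE: usize = 1024;

    fn read_cells(path: &str) -> Result<Vec<Cell>> {
        let mut reader = BufReader::new(File::open(path)?);
        let cell_size = 2 * std::mem::size_of::<u32>();
        let mut buffer = vec![0u8; cell_size * BATCH_SIZE];
        let mut cells = Vec::new();

        loop {
            let bytes_read = reader.read(&mut buffer)?;
            if bytes_read == 0 {
                break; // end of file
            }
            // A production version would carry a trailing partial record over to the
            // next read; here any leftover bytes are simply ignored.
            for chunk in buffer[..bytes_read].chunks_exact(cell_size) {
                cells.push(Cell {
                    idx: u32::from_le_bytes(chunk[0..4].try_into().unwrap()),
                    value: u32::from_le_bytes(chunk[4..8].try_into().unwrap()),
                });
            }
        }
        Ok(cells)
    }
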
3 changes: 2 additions & 1 deletion kr2r/src/kr2r_data.rs
@@ -1,3 +1,4 @@
+use crate::utils::open_file;
// use crate::{Meros, CURRENT_REVCOM_VERSION};
use crate::{
BITS_PER_CHAR, CURRENT_REVCOM_VERSION, DEFAULT_KMER_LENGTH, DEFAULT_MINIMIZER_LENGTH,
@@ -126,7 +127,7 @@ impl IndexOptions {
}

pub fn read_index_options<P: AsRef<Path>>(file_path: P) -> IoResult<Self> {
-let mut file = File::open(file_path)?;
+let mut file = open_file(file_path)?;
let mut buffer = vec![0; std::mem::size_of::<Self>()];
file.read_exact(&mut buffer)?;

21 changes: 18 additions & 3 deletions kr2r/src/seq.rs
@@ -1,4 +1,5 @@
use crate::mmscanner::MinimizerScanner;
+use crate::utils::open_file;
use seq_io::fasta;
use seq_io::fasta::Record as FaRecord;
use seq_io::fastq;
@@ -10,7 +11,6 @@ use crate::utils::is_gzipped;
use crate::Meros;
use seq_io::policy::StdPolicy;
use std::collections::HashSet;
-use std::fs::File;
use std::io;
use std::iter;
use std::path::Path;
@@ -80,6 +80,21 @@ pub trait SeqSet {
fn to_seq_reads(&self, score: i32, meros: Meros) -> HashSet<SeqReads>;
}

+pub fn open_fasta_reader<P: AsRef<Path>>(
+path1: P,
+) -> io::Result<fasta::Reader<Box<dyn io::Read + Send>>> {
+let mut file1 = open_file(&path1)?;
+
+let read1: Box<dyn io::Read + Send> = if is_gzipped(&mut file1)? {
+Box::new(GzDecoder::new(file1))
+} else {
+Box::new(file1)
+};
+
+let reader1 = fasta::Reader::new(read1);
+Ok(reader1)
+}

pub struct PairFastqReader<P = DefaultBufPolicy> {
reader1: fastq::Reader<Box<dyn io::Read + Send>, P>,
reader2: Option<fastq::Reader<Box<dyn io::Read + Send>, P>>,
@@ -90,7 +105,7 @@ impl<'a> PairFastqReader<DefaultBufPolicy> {
#[inline]
pub fn from_path<P: AsRef<Path>>(path1: P, path2: Option<P>) -> io::Result<PairFastqReader> {
// Open the two files separately
-let mut file1 = File::open(&path1)?;
+let mut file1 = open_file(&path1)?;

let read1: Box<dyn io::Read + Send> = if is_gzipped(&mut file1)? {
Box::new(GzDecoder::new(file1))
@@ -102,7 +117,7 @@ impl<'a> PairFastqReader<DefaultBufPolicy> {

let reader2 = match path2 {
Some(path2) => {
-let mut file2 = File::open(path2)?;
+let mut file2 = open_file(path2)?;
let read2: Box<dyn io::Read + Send> = if is_gzipped(&mut file2)? {
Box::new(GzDecoder::new(file2))
} else {
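
The new `open_fasta_reader` builds a `seq_io::fasta::Reader` on top of `open_file` and, given the `is_gzipped`/`GzDecoder` branch, transparently handles gzip-compressed FASTA, which the `seq_io::fasta::Reader::from_path` call it replaces in splitr.rs does not. A hypothetical usage sketch (the path is illustrative):

    use kr2r::seq::open_fasta_reader;
    use seq_io::fasta::Record;

    fn count_fasta_records() -> std::io::Result<usize> {
        // Works for plain and gzip-compressed FASTA alike.
        let mut reader = open_fasta_reader("library/archaea/library.fna.gz")?;
        let mut count = 0;
        while let Some(record) = reader.next() {
            let record = record.expect("invalid FASTA record");
            let _id = record.id().unwrap_or("unknown");
            count += 1;
        }
        Ok(count)
    }
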
7 changes: 4 additions & 3 deletions kr2r/src/taxonomy.rs
@@ -1,3 +1,4 @@
+use crate::utils::open_file;
use std::collections::{HashMap, HashSet, VecDeque};
use std::fmt::Debug;
use std::fs::File;
@@ -13,7 +14,7 @@ pub fn parse_nodes_file<P: AsRef<Path>>(
HashMap<u64, String>,
HashSet<String>,
)> {
-let nodes_file = File::open(nodes_filename)?;
+let nodes_file = open_file(nodes_filename)?;
let reader = BufReader::new(nodes_file);

let mut parent_map = HashMap::new();
@@ -60,7 +61,7 @@ pub fn parse_nodes_file<P: AsRef<Path>>(

/// Parse the taxonomy names file from the NCBI dump
pub fn parse_names_file<P: AsRef<Path>>(names_filename: P) -> Result<HashMap<u64, String>> {
-let names_file = File::open(names_filename)?;
+let names_file = open_file(names_filename)?;
let reader = BufReader::new(names_file);

let mut name_map = HashMap::new();
@@ -257,7 +258,7 @@ impl Taxonomy {
const MAGIC: &'static [u8] = b"K2TAXDAT"; // replace with the actual magic bytes

pub fn from_file<P: AsRef<Path> + Debug>(filename: P) -> Result<Taxonomy> {
-let mut file = std::fs::File::open(&filename)?;
+let mut file = open_file(&filename)?;

let mut magic = vec![0; Self::MAGIC.len()];
file.read_exact(&mut magic)?;
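
`Taxonomy::from_file` validates its input by reading the `K2TAXDAT` magic bytes before parsing anything else; only the open call changes in this commit. A stand-alone sketch of that validation step, reusing the magic constant from the diff but with otherwise illustrative code:

    use std::fs::File;
    use std::io::{self, Read};

    const MAGIC: &[u8] = b"K2TAXDAT";

    fn open_taxonomy_file(path: &str) -> io::Result<File> {
        let mut file = File::open(path)?;
        let mut magic = vec![0u8; MAGIC.len()];
        file.read_exact(&mut magic)?;
        if magic != MAGIC {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "not a K2TAXDAT taxonomy file",
            ));
        }
        Ok(file)
    }
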