From da60162de9089764ac24f7609f3e4fe6e09e7e94 Mon Sep 17 00:00:00 2001
From: eric
Date: Wed, 24 Apr 2024 09:56:32 +0800
Subject: [PATCH] Optimize code structure
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 kr2r/src/args.rs                  |  25 +-
 kr2r/src/bin/annotate.rs          |  36 ++-
 kr2r/src/bin/build_k2_db.rs       |  93 +++---
 kr2r/src/bin/estimate_capacity.rs |   8 +-
 kr2r/src/bin/hashshard.rs         |  46 +--
 kr2r/src/bin/inspect.rs           |   2 +-
 kr2r/src/bin/resolve.rs           |  15 +-
 kr2r/src/bin/seqid2taxid.rs       |   8 +-
 kr2r/src/bin/splitr.rs            |  15 +-
 kr2r/src/bin/squid.rs             |  38 ++-
 kr2r/src/compact_hash.rs          | 507 ++++--------------------------
 kr2r/src/db.rs                    | 204 ++++++------
 12 files changed, 298 insertions(+), 699 deletions(-)

diff --git a/kr2r/src/args.rs b/kr2r/src/args.rs
index fc063a1..6bcc1be 100644
--- a/kr2r/src/args.rs
+++ b/kr2r/src/args.rs
@@ -15,16 +15,11 @@ pub const ONEGB: u64 = 1073741824;
 pub struct Build {
     /// ncbi library fna database directory
     #[arg(long = "db", required = true)]
-    pub source: PathBuf,
-
-    /// Kraken 2 hash table filename, default = $database/hash.k2d
-    #[clap(short = 'H')]
-    pub hashtable_filename: Option<PathBuf>,
-
-    /// Kraken 2 options filename, default = $database/opts.k2d
-    #[clap(short = 'o')]
-    pub options_filename: Option<PathBuf>,
+    pub database: PathBuf,
 
+    // /// Kraken 2 options filename, default = $database/opts.k2d
+    // #[clap(short = 'o')]
+    // pub options_filename: Option<PathBuf>,
     /// Contains the original configuration
     #[clap(flatten)]
     pub klmt: KLMTArgs,
@@ -41,9 +36,9 @@ pub struct Build {
 #[derive(Parser, Debug, Clone)]
 #[clap(version, about = "taxonomy")]
 pub struct Taxo {
-    /// Kraken 2 taxonomy filename, default = $database/taxo.k2d
-    #[clap(short = 't')]
-    pub taxonomy_filename: Option<PathBuf>,
+    // /// Kraken 2 taxonomy filename, default = $database/taxo.k2d
+    // #[clap(short = 't')]
+    // pub taxonomy_filename: Option<PathBuf>,
 
     // #[clap(short = 'm', required = true)]
     // pub id_to_taxon_map_filename: PathBuf,
@@ -75,7 +70,11 @@ const BATCH_SIZE: usize = 8 * 1024 * 1024;
 pub struct ClassifyArgs {
     /// database hash chunk directory and other files
     #[clap(long)]
-    pub hash_dir: PathBuf,
+    pub k2d_dir: PathBuf,
+
+    /// Enables use of a Kraken 2 compatible shared database. Default is false.
+    #[clap(long, default_value_t = false)]
+    pub kraken_db_type: bool,
 
     // /// The file path for the Kraken 2 options.
     // #[clap(short = 'o', long = "options-filename", value_parser, required = true)]
diff --git a/kr2r/src/bin/annotate.rs b/kr2r/src/bin/annotate.rs
index 0a29769..b067e2a 100644
--- a/kr2r/src/bin/annotate.rs
+++ b/kr2r/src/bin/annotate.rs
@@ -1,4 +1,4 @@
-use clap::Parser;
+use clap::{Parser, ValueEnum};
 use kr2r::compact_hash::{CHPage, Compact, HashConfig, K2Compact, Row, Slot};
 use kr2r::utils::find_and_sort_files;
 // use std::collections::HashMap;
@@ -13,6 +13,11 @@ use std::time::Instant;
 // Number of Slots processed per batch
 pub const BATCH_SIZE: usize = 8 * 1024 * 1024;
 
+#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum)]
+enum DbType {
+    Kraken,
+    Squid,
+}
 /// Command line arguments for the annotate program.
 ///
 /// This structure defines the command line arguments that are accepted by the annotate program.
@@ -26,13 +31,11 @@ pub const BATCH_SIZE: usize = 8 * 1024 * 1024;
 pub struct Args {
     /// database hash chunk directory and other files
     #[clap(long)]
-    pub hash_dir: PathBuf,
-    // /// The file path for the Kraken 2 index.
-    // #[clap(short = 'H', long = "index-filename", value_parser, required = true)]
-    // index_filename: PathBuf,
-    /// The file path for the Kraken 2 options.
-    // #[clap(short = 'o', long = "options-filename", value_parser, required = true)]
-    // options_filename: String,
+    pub k2d_dir: PathBuf,
+
+    /// Enables use of a Kraken 2 compatible shared database. Default is false.
+    #[clap(long, default_value_t = false)]
+    pub kraken_db_type: bool,
 
     /// chunk directory
     #[clap(long)]
@@ -97,9 +100,10 @@ fn process_batch<R, K>(
     chtm: &K,
     chunk_dir: PathBuf,
     batch_size: usize,
+    kraken_db_type: bool,
 ) -> std::io::Result<()>
 where
-    K: K2Compact<u32> + Send,
+    K: K2Compact + Send,
     R: Read + Send,
 {
     let slot_size = std::mem::size_of::<Slot<u64>>();
@@ -129,7 +133,7 @@ where
         .into_par_iter()
         .filter_map(|slot| {
             let indx = slot.idx & idx_mask;
-            let taxid = chtm.get_from_page(indx, slot.value);
+            let taxid = chtm.get_from_page(indx, slot.value, kraken_db_type);
 
             if taxid > 0 {
                 let kmer_id = slot.idx >> idx_bits;
@@ -200,7 +204,7 @@ fn process_chunk_file<P: AsRef<Path>>(
 
     let start = Instant::now();
 
-    let config = HashConfig::<u32>::from_hash_header(&args.hash_dir.join("hash_config.k2d"))?;
+    let config = HashConfig::from_hash_header(&args.k2d_dir.join("hash_config.k2d"))?;
 
     let parition = hash_files.len();
     let chtm = CHPage::from(
         config,
@@ -211,7 +215,13 @@ fn process_chunk_file<P: AsRef<Path>>(
     let duration = start.elapsed();
     // Print the elapsed time
     println!("load table took: {:?}", duration);
-    process_batch(&mut reader, &chtm, args.chunk_dir.clone(), args.batch_size)?;
+    process_batch(
+        &mut reader,
+        &chtm,
+        args.chunk_dir.clone(),
+        args.batch_size,
+        args.kraken_db_type,
+    )?;
 
     Ok(())
 }
@@ -219,7 +229,7 @@ fn process_chunk_file<P: AsRef<Path>>(
 pub fn run(args: Args) -> Result<()> {
     let chunk_files = find_and_sort_files(&args.chunk_dir, "sample", ".k2")?;
-    let hash_files = find_and_sort_files(&args.hash_dir, "hash", ".k2d")?;
+    let hash_files = find_and_sort_files(&args.k2d_dir, "hash", ".k2d")?;
 
     // Start timing
     let start = Instant::now();
diff --git a/kr2r/src/bin/build_k2_db.rs b/kr2r/src/bin/build_k2_db.rs
index 6f2d2d9..bd3c7e7 100644
--- a/kr2r/src/bin/build_k2_db.rs
+++ b/kr2r/src/bin/build_k2_db.rs
@@ -1,13 +1,10 @@
 // Module paths need to be referenced when using this
 use clap::Parser;
-use kr2r::args::{Build, Taxo, ONEGB, U32MAXPLUS};
-use kr2r::compact_hash::{CHTableMut, HashConfig};
+use kr2r::args::{parse_size, Build, Taxo};
+use kr2r::compact_hash::HashConfig;
 use kr2r::db::{
-    convert_fna_to_k2_format,
-    generate_taxonomy,
-    get_bits_for_taxid,
-    process_k2file,
-    // process_k2file1,
+    convert_fna_to_k2_format, generate_taxonomy, get_bits_for_taxid, process_k2file,
+    write_config_to_file,
 };
 use kr2r::utils::{
     create_partition_files, create_partition_writers, find_library_fna_files, format_bytes,
@@ -24,21 +21,19 @@ pub struct Args {
     #[clap(flatten)]
     pub build: Build,
 
+    /// database hash chunk directory and other files
+    #[clap(long)]
+    pub k2d_dir: Option<PathBuf>,
+
     #[clap(flatten)]
     pub taxo: Taxo,
 
-    // // /// Name of Kraken 2 database
-    // // #[arg(short, long = "db")]
-    // // database: PathBuf,
-    // #[arg(short = 'c', long, required = true)]
-    // pub required_capacity: u64,
-    /// chunk directory
+    #[clap(long, value_parser = parse_size, default_value = "1G", help = "Specifies the hash file capacity.\nAcceptable formats include numeric values followed by 'K', 'M', or 'G' (e.g., '1.5G', '250M', '1024K').\nNote: The specified capacity affects the index size, with a factor of 4 applied.\nFor example, specifying '1G' results in an index size of '4G'.\nDefault: 1G (capacity 1G = file size 4G)")]
+    pub hash_capacity: usize,
+
+    /// chunk temp directory
     #[clap(long)]
     pub chunk_dir: PathBuf,
-
-    /// chunk size 1-4(GB) [1073741824-4294967295] default: 1GB
-    #[clap(long, value_parser = clap::value_parser!(u64).range(ONEGB..U32MAXPLUS + 1), default_value_t = ONEGB)]
-    pub chunk_size: u64,
 }
 
 pub fn run(args: Args, required_capacity: usize) -> Result<(), Box<dyn std::error::Error>> {
@@ -48,19 +43,19 @@ pub fn run(args: Args, required_capacity: usize) -> Result<(), Box<dyn std::error::Error>> {
-    let hash_config = HashConfig::<u32>::new(capacity, value_bits, 0, 0, 0);
+    let partition = (capacity + args.hash_capacity - 1) / args.hash_capacity;
+    let hash_config = HashConfig::new(capacity, value_bits, 0, partition, args.hash_capacity);
 
     // Start timing
     let start = Instant::now();
 
-    let chunk_size = args.chunk_size as usize;
-
-    let partition = (capacity + chunk_size - 1) / chunk_size;
+    let chunk_size = args.hash_capacity as usize;
 
     if partition >= file_num_limit {
         panic!("Exceeds File Number Limit");
     }
@@ -93,12 +87,7 @@ pub fn run(args: Args, required_capacity: usize) -> Result<(), Box<dyn std::error::Error>> {

diff --git a/kr2r/src/bin/estimate_capacity.rs b/kr2r/src/bin/estimate_capacity.rs
--- a/kr2r/src/bin/estimate_capacity.rs
+++ b/kr2r/src/bin/estimate_capacity.rs
@@ ... @@ pub fn run(args: Args) -> usize {
     let mut hllp: HyperLogLogPlus<u64, KBuildHasher> =
         HyperLogLogPlus::new(16, KBuildHasher::default()).unwrap();
 
-    let source: PathBuf = args.source.clone();
+    let source: PathBuf = args.database.clone();
 
     let fna_files = if source.is_file() {
         vec![source.to_string_lossy().to_string()]
     } else {
-        find_library_fna_files(args.source)
+        find_library_fna_files(args.database)
     };
 
     for fna_file in fna_files {
         let args_clone = Args {
-            source: source.clone(),
+            database: source.clone(),
             ..args
         };
         let local_hllp = process_sequence(&fna_file, args_clone);
diff --git a/kr2r/src/bin/hashshard.rs b/kr2r/src/bin/hashshard.rs
index 465fe7f..6a1f211 100644
--- a/kr2r/src/bin/hashshard.rs
+++ b/kr2r/src/bin/hashshard.rs
@@ -1,5 +1,7 @@
 use clap::Parser;
+use kr2r::args::parse_size;
 use kr2r::compact_hash::HashConfig;
+use memmap2::MmapOptions;
 use std::fs::{self, create_dir_all, File, OpenOptions};
 use std::io::BufWriter;
 use std::io::{Result as IOResult, Write};
@@ -7,8 +9,6 @@ use std::path::Path;
 use std::path::PathBuf;
 use std::time::Instant;
 
-use memmap2::MmapOptions;
-
 fn mmap_read_write<P: AsRef<Path>, Q: AsRef<Path>>(
     source_path: P,
     dest_path: Q,
@@ -35,32 +35,16 @@ fn mmap_read_write<P: AsRef<Path>, Q: AsRef<Path>>(
     Ok(())
 }
 
-fn parse_size(s: &str) -> Result<usize, String> {
-    let len = s.len();
-    if len < 2 {
-        return Err("Size must be at least two characters".to_string());
-    }
-
-    let (num, suffix) = s.split_at(len - 1);
-    let number: f64 = num.parse().map_err(|_| "Invalid number".to_string())?;
-    match suffix {
-        "G" | "g" => Ok((number * 1_073_741_824.0) as usize), // 2^30
-        "M" | "m" => Ok((number * 1_048_576.0) as usize),     // 2^20
-        "K" | "k" => Ok((number * 1_024.0) as usize),         // 2^10
-        _ => Err("Invalid size suffix. Use 'G', 'M', or 'K'".to_string()),
-    }
-}
-
 #[derive(Parser, Debug, Clone)]
 #[clap(version, about = "split hash file", long_about = "split hash file")]
 pub struct Args {
     /// The database directory for the Kraken 2 index. contains index files(hash.k2d opts.k2d taxo.k2d)
-    #[clap(long, value_parser, required = true)]
-    db: PathBuf,
+    #[clap(long = "db", value_parser, required = true)]
+    database: PathBuf,
 
     /// database hash chunk directory and other files
     #[clap(long)]
-    hash_dir: Option<PathBuf>,
+    k2d_dir: Option<PathBuf>,
 
     /// Specifies the hash file capacity. Acceptable formats include numeric values followed by 'K', 'M', or 'G' (e.g., '1.5G', '250M', '1024K').
     /// Note: The specified capacity affects the index size, with a factor of 4 applied. For example, specifying '1G' results in an index size of '4G'.
@@ -70,8 +54,8 @@ pub struct Args {
 }
 
 pub fn run(args: Args) -> IOResult<()> {
-    let index_filename = &args.db.join("hash.k2d");
-    let hash_config = HashConfig::<u32>::from(index_filename)?;
+    let index_filename = &args.database.join("hash.k2d");
+    let hash_config = HashConfig::from(index_filename)?;
 
     let partition = (hash_config.capacity + args.hash_capacity - 1) / args.hash_capacity;
     println!("start...");
@@ -81,11 +65,11 @@ pub fn run(args: Args) -> IOResult<()> {
     let file_len = hash_config.capacity * 4 + 32;
     let b_size = std::mem::size_of::<u32>();
 
-    let hash_dir = args.hash_dir.unwrap_or(args.db.clone());
+    let k2d_dir = args.k2d_dir.unwrap_or(args.database.clone());
 
-    create_dir_all(&hash_dir).expect(&format!("create hash dir error {:?}", hash_dir));
+    create_dir_all(&k2d_dir).expect(&format!("create hash dir error {:?}", k2d_dir));
 
-    let config_file = hash_dir.join("hash_config.k2d");
+    let config_file = k2d_dir.join("hash_config.k2d");
     if config_file.exists() {
         panic!("hash config file already exists!");
     }
@@ -100,7 +84,7 @@ pub fn run(args: Args) -> IOResult<()> {
     )?;
 
     for i in 1..=partition {
-        let chunk_file = hash_dir.join(format!("hash_{}.k2d", i));
+        let chunk_file = k2d_dir.join(format!("hash_{}.k2d", i));
         let offset = (32 + args.hash_capacity * (i - 1) * b_size) as u64;
         let mut length = args.hash_capacity * b_size;
         if (offset as usize + length) > file_len {
@@ -116,14 +100,14 @@ pub fn run(args: Args) -> IOResult<()> {
     // Print the elapsed time
     println!("hashshard took: {:?}", duration);
 
-    let source_taxo_file = &args.db.join("taxo.k2d");
-    let dst_tax_file = hash_dir.join("taxo.k2d");
+    let source_taxo_file = &args.database.join("taxo.k2d");
+    let dst_tax_file = k2d_dir.join("taxo.k2d");
     if !dst_tax_file.exists() {
         fs::copy(source_taxo_file, dst_tax_file)?;
     }
 
-    let source_opts_file = &args.db.join("opts.k2d");
-    let dst_opts_file = hash_dir.join("opts.k2d");
+    let source_opts_file = &args.database.join("opts.k2d");
+    let dst_opts_file = k2d_dir.join("opts.k2d");
     if !dst_opts_file.exists() {
         fs::copy(source_opts_file, dst_opts_file)?;
     }
diff --git a/kr2r/src/bin/inspect.rs b/kr2r/src/bin/inspect.rs
index dc84b46..a4f9127 100644
--- a/kr2r/src/bin/inspect.rs
+++ b/kr2r/src/bin/inspect.rs
@@ -36,7 +36,7 @@ fn main() -> Result<()> {
         println!("taxonomy node count {:?}", taxo.node_count());
     }
 
-    let config = HashConfig::<u32>::from(args.index_filename.clone())?;
+    let config = HashConfig::from(args.index_filename.clone())?;
     println!("compact hash table {:?}", config);
 
     if args.value_count {
diff --git a/kr2r/src/bin/resolve.rs b/kr2r/src/bin/resolve.rs
index 31d0840..6803bf5 100644
--- a/kr2r/src/bin/resolve.rs
+++ b/kr2r/src/bin/resolve.rs
@@ -88,7 +88,11 @@ fn generate_hit_string(
 
     // Pad trailing zeros
     if last_pos < count - 1 {
-        result.push((0, count - last_pos - 1));
+        if last_pos == 0 {
+            result.push((0, count - last_pos));
+        } else {
+            result.push((0, count - last_pos - 1));
+        }
     }
 
     result
@@ -164,6 +168,7 @@ pub fn add_hitlist_string(
     taxonomy: &Taxonomy,
 ) -> String {
     let result1 = generate_hit_string(kmer_count1, &rows, taxonomy, value_mask, 0);
+    println!("result1 {:?}", result1);
     if let Some(count) = kmer_count2 {
         let result2 = generate_hit_string(count, &rows, taxonomy, value_mask, kmer_count1);
         format!("{} |:| {}", result1, result2)
@@ -216,7 +221,7 @@ pub fn count_values(
 pub struct Args {
     /// database hash chunk directory and other files
     #[clap(long)]
-    pub hash_dir: PathBuf,
+    pub k2d_dir: PathBuf,
 
     /// chunk directory
     #[clap(long, value_parser, required = true)]
@@ -352,15 +357,15 @@ fn process_batch<P: AsRef<Path>>(
 }
 
 pub fn run(args: Args) -> Result<()> {
-    let hash_dir = &args.hash_dir;
-    let taxonomy_filename = hash_dir.join("taxo.k2d");
+    let k2d_dir = &args.k2d_dir;
+    let taxonomy_filename = k2d_dir.join("taxo.k2d");
     let taxo = Taxonomy::from_file(taxonomy_filename)?;
 
     let sample_files = find_and_sort_files(&args.chunk_dir, "sample_file", ".bin")?;
     let sample_id_files = find_and_sort_files(&args.chunk_dir, "sample_id", ".map")?;
 
     let partition = sample_files.len();
-    let hash_config = HashConfig::<u32>::from_hash_header(&args.hash_dir.join("hash_config.k2d"))?;
+    let hash_config = HashConfig::from_hash_header(&args.k2d_dir.join("hash_config.k2d"))?;
 
     let value_mask = hash_config.value_mask;
     let mut total_taxon_counts = TaxonCounters::new();
diff --git a/kr2r/src/bin/seqid2taxid.rs b/kr2r/src/bin/seqid2taxid.rs
index 27c86ba..3b49c27 100644
--- a/kr2r/src/bin/seqid2taxid.rs
+++ b/kr2r/src/bin/seqid2taxid.rs
@@ -9,18 +9,18 @@ use std::path::PathBuf;
 pub struct Args {
     /// the database directory
     #[arg(long, required = true)]
-    pub source: PathBuf,
+    pub database: PathBuf,
 
-    /// seqid2taxid.map file path, default = $source/seqid2taxid.map
+    /// seqid2taxid.map file path, default = $database/seqid2taxid.map
     #[arg(short = 'm', long)]
     pub id_to_taxon_map_filename: Option<PathBuf>,
 }
 
 pub fn run(args: Args) -> Result<()> {
-    let prelim_file = summary_prelim_map_files(&args.source)?;
+    let prelim_file = summary_prelim_map_files(&args.database)?;
     let map_file = args
         .id_to_taxon_map_filename
-        .unwrap_or(args.source.join("seqid2taxid.map"));
+        .unwrap_or(args.database.join("seqid2taxid.map"));
 
     if map_file.exists() {
         println!("id_to_taxon_map_filename {:?} exists", map_file);
diff --git a/kr2r/src/bin/splitr.rs b/kr2r/src/bin/splitr.rs
index 1fbf428..9e961da 100644
--- a/kr2r/src/bin/splitr.rs
+++ b/kr2r/src/bin/splitr.rs
@@ -30,7 +30,7 @@ use clap::Parser;
 pub struct Args {
     /// database hash chunk directory and other files
     #[clap(long)]
-    pub hash_dir: PathBuf,
+    pub k2d_dir: PathBuf,
 
     // /// The file path for the Kraken 2 options.
     // #[clap(short = 'o', long = "options-filename", value_parser, required = true)]
@@ -119,7 +119,7 @@ fn get_lastest_file_index(file_path: &PathBuf) -> Result<usize> {
 /// Process a record
 fn process_record<I>(
     iter: I,
-    hash_config: &HashConfig<u32>,
+    hash_config: &HashConfig,
     seq_id: u64,
     chunk_size: usize,
     idx_bits: usize,
@@ -164,7 +164,7 @@ fn write_data_to_file(
 fn process_fastq_file(
     args: &Args,
     meros: Meros,
-    hash_config: HashConfig<u32>,
+    hash_config: HashConfig,
     file_index: usize,
     files: &[String],
     writers: &mut Vec<BufWriter<File>>,
@@ -249,7 +249,7 @@ fn process_fastq_file(
 fn process_fasta_file(
     args: &Args,
     meros: Meros,
-    hash_config: HashConfig<u32>,
+    hash_config: HashConfig,
     file_index: usize,
     files: &[String],
     writers: &mut Vec<BufWriter<File>>,
@@ -311,7 +311,7 @@ fn process_fasta_file(
     )
 }
 
-fn convert(args: Args, meros: Meros, hash_config: HashConfig<u32>) -> Result<()> {
+fn convert(args: Args, meros: Meros, hash_config: HashConfig) -> Result<()> {
     let partition = hash_config.partition;
     let mut writers: Vec<BufWriter<File>> =
         init_chunk_writers(&args, partition, hash_config.hash_capacity);
@@ -386,7 +386,7 @@ fn convert(args: Args, meros: Meros, hash_config: HashConfig<u32>) -> Result<()>
 
 pub fn run(args: Args) -> Result<()> {
     // let args = Args::parse();
-    let options_filename = &args.hash_dir.join("opts.k2d");
+    let options_filename = &args.k2d_dir.join("opts.k2d");
     let idx_opts = IndexOptions::read_index_options(options_filename)?;
 
     if args.paired_end_processing && !args.single_file_pairs && args.input_files.len() % 2 != 0 {
@@ -396,8 +396,9 @@ pub fn run(args: Args) -> Result<()> {
             "Paired-end processing requires an even number of input files.",
         ));
     }
-    let hash_config = HashConfig::<u32>::from_hash_header(&args.hash_dir.join("hash_config.k2d"))?;
+    let hash_config = HashConfig::from_hash_header(&args.k2d_dir.join("hash_config.k2d"))?;
 
+    println!("hash_config {:?}", hash_config);
     if hash_config.hash_capacity == 0 {
         panic!("`hash_capacity` can't be zero!");
     }
diff --git a/kr2r/src/bin/squid.rs b/kr2r/src/bin/squid.rs
index 5480ad3..9c9a455 100644
--- a/kr2r/src/bin/squid.rs
+++ b/kr2r/src/bin/squid.rs
@@ -8,9 +8,9 @@ mod seqid2taxid;
 mod splitr;
 
 use kr2r::args::ClassifyArgs;
-use kr2r::args::{Build, Taxo, ONEGB, U32MAXPLUS};
+use kr2r::args::{parse_size, Build, Taxo};
 use kr2r::utils::find_and_sort_files;
-use std::io::Result;
+// use std::io::Result;
 use std::path::PathBuf;
 use std::time::Instant;
 
@@ -20,6 +20,10 @@ struct BuildArgs {
     #[clap(flatten)]
     pub build: Build,
 
+    /// database hash chunk directory and other files
+    #[clap(long)]
+    pub k2d_dir: Option<PathBuf>,
+
     #[clap(flatten)]
     taxo: Taxo,
 
@@ -30,9 +34,8 @@ struct BuildArgs {
     #[clap(long)]
     chunk_dir: PathBuf,
 
-    /// chunk size 1-4(GB) [1073741824-4294967295]
-    #[clap(long, value_parser = clap::value_parser!(u64).range(ONEGB..U32MAXPLUS + 1), default_value_t = ONEGB)]
-    chunk_size: u64,
+    #[clap(long, value_parser = parse_size, default_value = "1G", help = "Specifies the hash file capacity.\nAcceptable formats include numeric values followed by 'K', 'M', or 'G' (e.g., '1.5G', '250M', '1024K').\nNote: The specified capacity affects the index size, with a factor of 4 applied.\nFor example, specifying '1G' results in an index size of '4G'.\nDefault: 1G (capacity 1G = file size 4G)")]
+    pub hash_capacity: usize,
 
     /// estimate capacity from cache if exists
     #[arg(long, default_value_t = true)]
@@ -59,7 +62,7 @@ struct Args {
 impl From<ClassifyArgs> for splitr::Args {
     fn from(item: ClassifyArgs) -> Self {
         Self {
-            hash_dir: item.hash_dir,
+            k2d_dir: item.k2d_dir,
             paired_end_processing: item.paired_end_processing,
             single_file_pairs: item.single_file_pairs,
             minimum_quality_score: item.minimum_quality_score,
@@ -73,9 +76,10 @@ impl From<ClassifyArgs> for splitr::Args {
 impl From<ClassifyArgs> for annotate::Args {
     fn from(item: ClassifyArgs) -> Self {
         Self {
-            hash_dir: item.hash_dir,
+            k2d_dir: item.k2d_dir,
             chunk_dir: item.chunk_dir,
             batch_size: item.batch_size,
+            kraken_db_type: item.kraken_db_type,
         }
     }
 }
@@ -83,7 +87,7 @@ impl From<ClassifyArgs> for annotate::Args {
 impl From<ClassifyArgs> for resolve::Args {
     fn from(item: ClassifyArgs) -> Self {
         Self {
-            hash_dir: item.hash_dir,
+            k2d_dir: item.k2d_dir,
             chunk_dir: item.chunk_dir,
             batch_size: item.batch_size,
             confidence_threshold: item.confidence_threshold,
@@ -99,7 +103,7 @@ impl From<ClassifyArgs> for resolve::Args {
 impl From<BuildArgs> for estimate_capacity::Args {
     fn from(item: BuildArgs) -> Self {
         Self {
-            source: item.build.source,
+            database: item.build.database,
             klmt: item.build.klmt,
             cache: item.cache,
             n: item.max_n,
@@ -113,9 +117,10 @@ impl From<BuildArgs> for build_k2_db::Args {
     fn from(item: BuildArgs) -> Self {
         Self {
             build: item.build,
+            k2d_dir: item.k2d_dir,
             taxo: item.taxo,
             chunk_dir: item.chunk_dir,
-            chunk_size: item.chunk_size,
+            hash_capacity: item.hash_capacity,
         }
     }
 }
@@ -123,7 +128,7 @@ impl From<BuildArgs> for build_k2_db::Args {
 impl From<BuildArgs> for seqid2taxid::Args {
     fn from(item: BuildArgs) -> Self {
         Self {
-            source: item.build.source,
+            database: item.build.database,
             id_to_taxon_map_filename: item.taxo.id_to_taxon_map_filename,
         }
     }
@@ -141,7 +146,7 @@ enum Commands {
     Classify(ClassifyArgs),
 }
 
-fn main() -> Result<()> {
+fn main() -> Result<(), Box<dyn std::error::Error>> {
     let args = Args::parse();
 
     match args.cmd {
@@ -158,9 +163,7 @@ fn main() -> Result<()> {
             let required_capacity = estimate_capacity::run(ec_args);
 
             let build_args = build_k2_db::Args::from(cmd_args.clone());
-            if let Err(e) = build_k2_db::run(build_args, required_capacity) {
-                panic!("Application error: {}", e);
-            }
+            build_k2_db::run(build_args, required_capacity)?;
         }
         Commands::Hashshard(cmd_args) => {
             hashshard::run(cmd_args)?;
@@ -180,7 +183,10 @@ fn main() -> Result<()> {
             let splitr_args = splitr::Args::from(cmd_args.clone());
             let chunk_files = find_and_sort_files(&splitr_args.chunk_dir, "sample", ".k2")?;
             if !chunk_files.is_empty() {
-                panic!("{} must be empty", &splitr_args.chunk_dir.display());
+                return Err(Box::new(std::io::Error::new(
+                    std::io::ErrorKind::Other,
+                    format!("{} must be empty", &splitr_args.chunk_dir.display()),
+                )));
             }
             splitr::run(splitr_args)?;
             let annotate_args = annotate::Args::from(cmd_args.clone());
diff --git a/kr2r/src/compact_hash.rs b/kr2r/src/compact_hash.rs
index 138584f..08ee113 100644
--- a/kr2r/src/compact_hash.rs
+++ b/kr2r/src/compact_hash.rs
@@ -1,9 +1,8 @@
 use byteorder::{ByteOrder, LittleEndian};
-use memmap2::{Mmap, MmapMut, MmapOptions};
+use memmap2::{Mmap, MmapOptions};
 use std::cmp::Ordering as CmpOrdering;
 use std::fs::OpenOptions;
-use std::io::{Error, ErrorKind, Result};
-use std::marker::PhantomData;
+use std::io::Result;
 use std::path::Path;
 
 /// 1101010101 => left: 11010, right: 10101;
@@ -139,14 +138,12 @@ where
         let slot_ptr = self as *const Self as *const u8;
         unsafe { std::slice::from_raw_parts(slot_ptr, slot_size) }
     }
+}
 
-    pub fn get_seq_id(&self) -> B {
+impl Slot<u64> {
+    pub fn get_seq_id(&self) -> u64 {
         self.value.right(0xFFFFFFFF)
     }
-
-    pub fn to_b(&self, left: B) -> B {
-        B::combined(left, self.value.right(0), 0)
-    }
 }
 
 // Implement PartialOrd, comparing only the index field
@@ -169,42 +166,14 @@ where
     }
 }
 
-/// Differs from Slot only in the type of idx
-#[repr(C)]
-pub struct Cell {
-    pub idx: u32,
-    value: u32,
-}
-
-impl Cell {
-    pub fn new(idx: u32, value: u32) -> Self {
-        Self { idx, value }
-    }
-
-    pub fn as_slice(&self, cell_size: usize) -> &[u8] {
-        let cell_ptr = self as *const Self as *const u8;
-        unsafe { std::slice::from_raw_parts(cell_ptr, cell_size) }
-    }
-
-    pub fn as_slot(&self) -> Slot<u32> {
-        Slot::new(self.idx as usize, self.value)
-    }
-}
-
-pub struct Page<B>
-where
-    B: Compact,
-{
+pub struct Page {
     pub index: usize,
     pub size: usize,
-    pub data: Vec<B>,
+    pub data: Vec<u32>,
 }
 
-impl<B> Page<B>
-where
-    B: Compact,
-{
-    pub fn new(index: usize, size: usize, data: Vec<B>) -> Self {
+impl Page {
+    pub fn new(index: usize, size: usize, data: Vec<u32>) -> Self {
         Self { index, size, data }
     }
 
@@ -217,22 +186,16 @@ where
     }
 }
 
-pub struct PagePtr<'a, B>
-where
-    B: Compact + 'a,
-{
+pub struct PagePtr<'a> {
     #[allow(dead_code)]
     mmap: Mmap,
     pub index: usize,
     pub size: usize,
-    pub data: &'a [B],
+    pub data: &'a [u32],
 }
 
-impl<'a, B> PagePtr<'a, B>
-where
-    B: Compact + 'a,
-{
-    pub fn new(mmap: Mmap, index: usize, size: usize, data: &'a [B]) -> Self {
+impl<'a> PagePtr<'a> {
+    pub fn new(mmap: Mmap, index: usize, size: usize, data: &'a [u32]) -> Self {
         Self {
             mmap,
             index,
@@ -245,10 +208,7 @@ where
 use std::fmt::{self, Debug};
 
 #[derive(Clone, Copy)]
-pub struct HashConfig<B>
-where
-    B: Compact,
-{
+pub struct HashConfig {
     // value_mask = ((1 << value_bits) - 1);
     pub value_mask: usize,
     // Number of value bits
     pub value_bits: usize,
     // Capacity of the hash table
     pub capacity: usize,
     // Number of elements currently stored in the hash table
     pub size: usize,
     // Number of partitions
     pub partition: usize,
     // Chunk size
     pub hash_capacity: usize,
-    _phantom: PhantomData<B>,
 }
 
 // Manually implement the Debug trait for HashConfig
-impl<B> fmt::Debug for HashConfig<B>
-where
-    B: Compact,
-{
+impl fmt::Debug for HashConfig {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         f.debug_struct("CompactHashTableConfig")
             .field("value_mask", &self.value_mask)
             .field("value_bits", &self.value_bits)
             .field("capacity", &self.capacity)
             .field("size", &self.size)
+            .field("hash_capacity", &self.hash_capacity)
             // Note: the _phantom field is not included
             .finish()
     }
 }
 
-impl<B> HashConfig<B>
-where
-    B: Compact,
-{
+impl HashConfig {
     // Use constants instead of hard-coded numbers for readability
     const PARTITION_OFFSET: usize = 0;
     const HASH_SIZE_OFFSET: usize = 8;
@@ -307,7 +261,6 @@ where
             size,
             partition,
             hash_capacity,
-            _phantom: PhantomData,
         }
     }
 
@@ -367,9 +320,9 @@ where
         hash_key as usize % self.capacity
     }
 
-    pub fn slot(&self, hash_key: u64, taxid: B) -> Slot<B> {
+    pub fn slot(&self, hash_key: u64, taxid: u32) -> Slot<u32> {
         let idx = self.index(hash_key);
-        Slot::<B>::new(idx, B::hash_value(hash_key, self.value_bits, taxid))
+        Slot::<u32>::new(idx, u32::hash_value(hash_key, self.value_bits, taxid))
     }
 
     pub fn slot_u64(&self, hash_key: u64, seq_id: u64) -> Slot<u64> {
@@ -378,38 +331,32 @@ where
     }
 }
 
-pub trait K2Compact<B>: std::marker::Sync + Send
-where
-    B: Compact,
-{
+pub trait K2Compact: std::marker::Sync + Send {
     fn get_idx_mask(&self) -> usize;
     fn get_idx_bits(&self) -> usize;
     fn get_value_mask(&self) -> usize;
     fn get_value_bits(&self) -> usize;
-    fn get_from_page(&self, idx: usize, value: u64) -> B;
+    fn get_from_page(&self, idx: usize, value: u64, next: bool) -> u32;
 }
 
 #[allow(unused)]
-pub struct CHPage<'a, B>
-where
-    B: Compact,
-{
+pub struct CHPage<'a> {
     // Capacity of the hash table
-    pub config: HashConfig<B>,
-    pub page: Page<B>,
-    pub next_page: PagePtr<'a, B>,
+    pub config: HashConfig,
+    pub page: Page,
+    pub next_page: PagePtr<'a>,
 }
 
-fn read_page_from_file<P: AsRef<Path>, B: Compact>(filename: P) -> Result<Page<B>> {
+fn read_page_from_file<P: AsRef<Path>>(filename: P) -> Result<Page> {
     let file = OpenOptions::new().read(true).open(&filename)?;
     let mmap = unsafe { MmapOptions::new().populate().map(&file)? };
 
     let index = LittleEndian::read_u64(&mmap[0..8]) as usize;
     let capacity = LittleEndian::read_u64(&mmap[8..16]) as usize;
-    let data = unsafe { std::slice::from_raw_parts(mmap.as_ptr().add(16) as *const B, capacity) };
+    let data = unsafe { std::slice::from_raw_parts(mmap.as_ptr().add(16) as *const u32, capacity) };
 
     // Initialize the Vec, pre-allocating enough capacity
-    let mut page_data: Vec<B> = Vec::with_capacity(capacity);
+    let mut page_data: Vec<u32> = Vec::with_capacity(capacity);
     // Set the Vec's length
     unsafe {
         page_data.set_len(capacity);
     }
@@ -424,49 +371,27 @@ fn read_page_from_file<P: AsRef<Path>, B: Compact>(filename: P) -> Result<Page<B>> {
-fn read_pageptr_from_file<'a, P: AsRef<Path>, B: Compact>(filename: P) -> Result<PagePtr<'a, B>> {
+fn read_pageptr_from_file<'a, P: AsRef<Path>>(filename: P) -> Result<PagePtr<'a>> {
     let file = OpenOptions::new().read(true).open(&filename)?;
     let mmap = unsafe { MmapOptions::new().populate().map(&file)? };
 
     let index = LittleEndian::read_u64(&mmap[0..8]) as usize;
     let capacity = LittleEndian::read_u64(&mmap[8..16]) as usize;
     let page_data =
-        unsafe { std::slice::from_raw_parts(mmap.as_ptr().add(16) as *const B, capacity) };
+        unsafe { std::slice::from_raw_parts(mmap.as_ptr().add(16) as *const u32, capacity) };
 
     Ok(PagePtr::new(mmap, index, capacity, page_data))
 }
 
-impl<'a, B> CHPage<'a, B>
-where
-    B: Compact + 'a,
-{
+impl<'a> CHPage<'a> {
     pub fn from<P: AsRef<Path> + Debug>(
-        config: HashConfig<B>,
+        config: HashConfig,
         chunk_file1: P,
         chunk_file2: P,
-    ) -> Result<CHPage<'a, B>> {
-        // let file2 = OpenOptions::new().read(true).open(&chunk_file2)?;
-        // let mmap = unsafe { MmapOptions::new().map(&file2)? };
-        // let index2 = LittleEndian::read_u64(&mmap[0..8]) as usize;
-        // let capacity = LittleEndian::read_u64(&mmap[8..16]) as usize;
-
-        // let next_page =
-        //     unsafe { std::slice::from_raw_parts(mmap.as_ptr().add(16) as *const B, capacity) };
-
+    ) -> Result<CHPage<'a>> {
         let page = read_page_from_file(chunk_file1)?;
         let next_page = read_pageptr_from_file(chunk_file2)?;
 
-        // let file1 = OpenOptions::new().read(true).open(&chunk_file1)?;
-        // let mmap1 = unsafe { MmapOptions::new().map(&file1)? };
-
-        // let index1 = LittleEndian::read_u64(&mmap1[0..8]) as usize;
-        // let capacity = LittleEndian::read_u64(&mmap1[8..16]) as usize;
-
-        // let table =
-        //     unsafe { std::slice::from_raw_parts(mmap.as_ptr().add(16) as *const B, capacity) };
-        // let page_data = table.to_vec();
-        // let page = Page::<B>::new(index1, capacity, page_data);
-
         let chtm = CHPage {
             config,
             next_page,
@@ -475,13 +400,13 @@ where
         Ok(chtm)
     }
 
-    pub fn get_from_next_page(&self, index: usize, compacted_key: B) -> B {
+    pub fn get_from_next_page(&self, index: usize, compacted_key: u32) -> u32 {
         let value_mask = self.config.value_mask;
         let mut idx = index;
 
         loop {
             if let Some(cell) = self.next_page.data.get(idx) {
-                if cell.right(value_mask) == B::default()
+                if cell.right(value_mask) == u32::default()
                     || cell.left(self.config.value_bits) == compacted_key
                 {
                     return cell.right(value_mask);
@@ -493,17 +418,14 @@ where
                 }
             } else {
                 // If get(idx) fails, return the default value
-                return B::default();
+                return u32::default();
             }
         }
-        B::default()
+        u32::default()
     }
 }
 
-impl<'a, B> K2Compact<B> for CHPage<'a, B>
-where
-    B: Compact,
-{
+impl<'a> K2Compact for CHPage<'a> {
     fn get_idx_mask(&self) -> usize {
         let idx_bits = ((self.config.hash_capacity as f64).log2().ceil() as usize).max(1);
         (1 << idx_bits) - 1
@@ -521,362 +443,39 @@ where
         self.config.value_bits
     }
 
-    fn get_from_page(&self, indx: usize, value: u64) -> B {
-        let compacted_key = B::from_u32(value.left(self.config.value_bits) as u32);
+    fn get_from_page(&self, indx: usize, value: u64, next: bool) -> u32 {
+        let compacted_key = value.left(self.config.value_bits) as u32;
         let value_mask = self.config.value_mask;
         let mut idx = indx;
         let first_idx = idx;
 
         loop {
             if let Some(cell) = self.page.data.get(idx) {
-                if cell.right(value_mask) == B::default()
+                if cell.right(value_mask) == u32::default()
                     || cell.left(self.config.value_bits) == compacted_key
                 {
                     return cell.right(value_mask);
                 }
 
-                idx = idx + 1;
-                if idx >= self.page.size {
-                    // Determine the position in the table; page index starts at 0
-                    let index = idx % self.page.size;
-                    return self.get_from_next_page(index, compacted_key);
-                }
-                if idx == first_idx {
-                    break;
-                }
-            } else {
-                // If get(idx) fails, return the default value
-                return B::default();
-            }
-        }
-        B::default()
-    }
-}
-
-// #[allow(unused)]
-// pub struct CHTable<'a, B>
-// where
-//     B: Compact + 'a,
-// {
-//     // memmap
-//     mmap: Mmap,
-//     // Capacity of the hash table
-//     pub config: HashConfig<B>,
-//     pub table: &'a [B],
-//     pub page: Page<B>,
-// }
-
-// impl<'a, B> CHTable<'a, B>
-// where
-//     B: Compact + 'a,
-// {
-//     pub fn from<P: AsRef<Path>>(
-//         filename: P,
-//         page_index: usize,
-//         page_size: usize,
-//     ) -> Result<CHTable<'a, B>> {
-//         let file = OpenOptions::new().read(true).open(&filename)?;
-
-//         let mmap = unsafe { MmapOptions::new().populate().map(&file)? };
-//         let config = HashConfig::from_mmap(&mmap);
-//         let table = unsafe {
-//             std::slice::from_raw_parts(mmap.as_ptr().add(32) as *const B, config.capacity)
-//         };
-
-//         let start_index = page_index * page_size;
-//         let end_index = std::cmp::min((page_index + 1) * page_size, config.capacity);
-//         if start_index > config.capacity {
-//             return Err(Error::new(ErrorKind::Other, "out of capacity"));
-//         }
-
-//         let page_data = table[start_index..end_index].to_vec();
-//         let page = Page::<B>::new(page_index, page_size, page_data);
-
-//         let chtm = CHTable {
-//             config,
-//             table,
-//             mmap,
-//             page,
-//         };
-//         Ok(chtm)
-//     }
-
-//     pub fn get_none_counts(&self) -> usize {
-//         self.table
-//             .iter()
-//             .filter(|&&item| item == B::default())
-//             .count()
-//     }
-
-//     pub fn get_from_table(&self, index: usize, compacted_key: B) -> B {
-//         let value_mask = self.config.value_mask;
-//         let mut idx = index;
-//         let first_idx = idx;
-
-//         loop {
-//             if let Some(cell) = self.table.get(idx) {
-//                 if cell.right(value_mask) == B::default()
-//                     || cell.left(self.config.value_bits) == compacted_key
-//                 {
-//                     return cell.right(value_mask);
-//                 }
-
-//                 idx = (idx + 1) % self.config.capacity;
-//                 if idx == first_idx {
-//                     break;
-//                 }
-//             } else {
-//                 // If get(idx) fails, return the default value
-//                 return B::default();
-//             }
-//         }
-//         B::default()
-//     }
-
-//     pub fn get(&self, hash_key: u64) -> B {
-//         let compacted_key = B::compacted(hash_key, self.config.value_bits);
-//         let value_mask = self.config.value_mask;
-//         let mut idx = self.config.index(hash_key);
-//         let first_idx = idx;
-
-//         loop {
-//             if let Some(cell) = self.table.get(idx) {
-//                 if cell.right(value_mask) == B::default()
-//                     || cell.left(self.config.value_bits) == compacted_key
-//                 {
-//                     return cell.right(value_mask);
-//                 }
-//             } else {
-//                 // If get(idx) fails, return the default value
-//                 return B::default();
-//             }
-
-//             idx = (idx + 1) % self.config.capacity;
-//             if idx == first_idx {
-//                 break;
-//             }
-//         }
-//         B::default()
-//     }
-// }
-
-// impl<'a, B> K2Compact<B> for CHTable<'a, B>
-// where
-//     B: Compact + 'a,
-// {
-//     fn get_idx_mask(&self) -> usize {
-//         let idx_bits = ((self.config.hash_capacity as f64).log2().ceil() as usize).max(1);
-//         (1 << idx_bits) - 1
-//     }
-
-//     fn get_idx_bits(&self) -> usize {
-//         ((self.config.hash_capacity as f64).log2().ceil() as usize).max(1)
-//     }
-
-//     fn get_value_mask(&self) -> usize {
-//         self.config.value_mask
-//     }
-
-//     fn get_value_bits(&self) -> usize {
-//         self.config.value_bits
-//     }
-
-//     fn get_from_page(&self, indx: usize, value: u64) -> B {
-//         let compacted_key = B::from_u32(value.left(self.config.value_bits) as u32);
-//         let value_mask = self.config.value_mask;
-//         let mut idx = indx;
-//         let first_idx = idx;
-
-//         loop {
-//             if let Some(cell) = self.page.data.get(idx) {
-//                 if cell.right(value_mask) == B::default()
-//                     || cell.left(self.config.value_bits) == compacted_key
-//                 {
-//                     return cell.right(value_mask);
-//                 }
-
-//                 idx = idx + 1;
-//                 if idx >= self.page.size {
-//                     // Determine the position in the table; page index starts at 0
-//                     let index = self.page.size * self.page.index + idx;
-//                     return self.get_from_table(index, compacted_key);
-//                 }
-//                 if idx == first_idx {
-//                     break;
-//                 }
-//             } else {
-//                 // If get(idx) fails, return the default value
-//                 return B::default();
-//             }
-//         }
-//         B::default()
-//     }
-// }
-
-#[allow(unused)]
-pub struct CHTableMut<'a, B>
-where
-    B: Compact + 'a,
-{
-    // memmap
-    mmap: MmapMut,
-    // Capacity of the hash table
-    pub config: HashConfig<B>,
-    pub table: &'a mut [B],
-    pub page: Page<B>,
-}
-
-impl<'a, B> CHTableMut<'a, B>
-where
-    B: Compact + 'a,
-{
-    pub fn new<P: AsRef<Path>>(
-        hash_file: P,
-        config: HashConfig<B>,
-        page_index: usize,
-        page_size: usize,
-    ) -> Result<CHTableMut<'a, B>> {
-        let key_bits = 32 - config.value_bits;
-
-        let file_len = 32 + std::mem::size_of::<B>() * config.capacity;
-        let file = OpenOptions::new()
-            .read(true)
-            .write(true)
-            .create(true)
-            .open(&hash_file)?;
-        file.set_len(file_len as u64)?;
-
-        let mut mut_mmap = unsafe { MmapOptions::new().len(file_len).map_mut(&file)? };
-
-        mut_mmap[0..8].copy_from_slice(&config.capacity.to_le_bytes());
-        // Can't write size directly; the file may already exist, and the original size would be overwritten
-        // mut_mmap[8..16].copy_from_slice(&config.size.to_le_bytes());
-        mut_mmap[16..24].copy_from_slice(&key_bits.to_le_bytes());
-        mut_mmap[24..32].copy_from_slice(&config.value_bits.to_le_bytes());
-
-        let table = unsafe {
-            std::slice::from_raw_parts_mut(mut_mmap.as_mut_ptr().add(32) as *mut B, config.capacity)
-        };
-        let start_index = page_index * page_size;
-        let end_index = std::cmp::min((page_index + 1) * page_size, config.capacity);
-        if start_index > config.capacity {
-            return Err(Error::new(ErrorKind::Other, "out of capacity"));
-        }
-
-        let page_data: Vec<B> = table[start_index..end_index].to_vec();
-        let page = Page::<B>::new(page_index, page_size, page_data);
-        let chtm = Self {
-            mmap: mut_mmap,
-            config,
-            page,
-            table,
-        };
-        Ok(chtm)
-    }
-}
-
-impl<'a, B> CHTableMut<'a, B>
-where
-    B: Compact + 'a,
-{
-    pub fn set_table_cell(&mut self, index: usize, value: B) -> Option<Slot<B>> {
-        let mut idx = index;
-        let first_idx = idx;
-        let value_bits = self.config.value_bits; // Store config in local variables
-        let value_mask = self.config.value_mask;
-        let capacity = self.config.capacity;
-
-        loop {
-            if let Some(cell) = self.table.get_mut(idx) {
-                if cell.right(value_mask) == B::default() {
-                    *cell = value;
-                    break;
-                }
-
-                if cell.left(value_bits) == value.left(value_bits) {
-                    return Some(Slot::<B>::new(idx, cell.clone()));
-                }
-
-                idx = (idx + 1) % capacity; // Wrap the index to stay in range
-                if idx == first_idx {
-                    break;
-                }
-            } else {
-                return None;
-            }
-        }
-
-        None
-    }
-
-    pub fn set_page_cell(&mut self, item: Slot<B>) -> Option<(usize, Slot<B>)> {
-        let mut idx = item.idx;
-        let first_idx = idx;
-        let value_bits = self.config.value_bits; // Store config in local variables
-        let value_mask = self.config.value_mask;
-
-        loop {
-            if let Some(cell) = self.page.data.get_mut(idx) {
-                if cell.right(value_mask) == B::default() {
-                    *cell = item.value;
-                    break;
-                }
-
-                if cell.left(value_bits) == item.value.left(value_bits) {
-                    return Some((0, Slot::<B>::new(idx, cell.clone())));
-                }
-
-                idx = idx + 1;
-                if idx >= self.page.size {
-                    // Determine the position in the table
-                    let index = self.page.size * self.page.index + idx;
-                    match self.set_table_cell(index, item.value) {
-                        None => return None,
-                        Some(s) => return Some((1, s)),
-                    }
-                }
-
-                if idx == first_idx {
-                    break;
-                }
-            } else {
-                return None;
-            }
-        }
-
-        None
-    }
-
-    pub fn copy_from_page(&mut self) {
-        self.table[self.page.start()..self.page.end(self.config.capacity)]
-            .copy_from_slice(&self.page.data);
-    }
-
-    // Update directly
-    pub fn update_cell(&mut self, flag: &usize, item: Slot<B>) {
-        // Determine the position in the table
-        match flag {
-            0 => {
-                self.page.data[item.idx] = item.value;
-            }
-            _ => {
-                self.table[item.idx] = item.value;
-            }
-        }
-    }
-
-    pub fn update_size(&self, size: usize) {
-        let size_bytes = size.to_le_bytes(); // Assumes little-endian byte order
-        unsafe {
-            let size_ptr = self.mmap.as_ptr().add(8) as *mut u8;
-            std::ptr::copy_nonoverlapping(size_bytes.as_ptr(), size_ptr, size_bytes.len());
-        }
-        self.mmap.flush().expect("Failed to flush mmap");
-    }
-
-    /// Add size on top of the original size
-    pub fn add_size(&mut self, size: usize) {
-        let old_size = LittleEndian::read_u64(&self.mmap[8..16]) as usize;
-        let new_size = old_size + size;
-        self.update_size(new_size);
+                if next {
+                    idx = idx + 1;
+                    if idx >= self.page.size {
+                        // Determine the position in the table; page index starts at 0
+                        let index = idx % self.page.size;
+                        return self.get_from_next_page(index, compacted_key);
+                    }
+                } else {
+                    idx = (idx + 1) % self.page.size;
+                }
+
+                if idx == first_idx {
+                    break;
+                }
+            } else {
+                // If get(idx) fails, return the default value
+                return u32::default();
+            }
+        }
+        u32::default()
     }
 }
diff --git a/kr2r/src/db.rs b/kr2r/src/db.rs
index d3387e9..c1ed7e9 100644
--- a/kr2r/src/db.rs
+++ b/kr2r/src/db.rs
@@ -1,5 +1,5 @@
 // Module paths need to be referenced when using this
-use crate::compact_hash::{CHTableMut, Cell, Compact, HashConfig, Slot};
+use crate::compact_hash::{Compact, HashConfig, Slot};
 use crate::mmscanner::MinimizerScanner;
 use crate::taxonomy::{NCBITaxonomy, Taxonomy};
 use crate::Meros;
@@ -20,7 +20,7 @@ const BATCH_SIZE: usize = 81920;
 fn set_page_cell(
     taxonomy: &Taxonomy,
     page: &[AtomicU32],
-    item: Slot<u32>,
+    item: &Slot<u32>,
     page_size: usize,
     value_bits: usize,
     value_mask: usize,
@@ -46,24 +46,28 @@ fn set_page_cell(
     });
 
     if result.is_ok() || idx == first_idx {
-        // Successfully updated, or we looped back to the starting point
         break;
     }
 
-    idx = (idx + 1) % page_size; // Move to the next index
+    idx = (idx + 1) % page_size;
     if idx == first_idx {
-        // If a full loop finds no insertion point, overflow handling or rehashing may be needed
        break;
     }
 }
 
-fn write_u32_to_file(page: &Vec<AtomicU32>, file_path: &PathBuf) -> IOResult<()> {
+fn write_hashtable_to_file(
+    page: &Vec<AtomicU32>,
+    file_path: &PathBuf,
+    page_index: u64,
+    capacity: u64,
+) -> IOResult<usize> {
     // Open the file for writing
     let file = File::create(file_path)?;
     let mut writer = BufWriter::new(file);
     let mut count = 0;
 
-    // Iterate over the Vec and write each u32 in little-endian byte order
+    writer.write_u64::<LittleEndian>(page_index)?;
+    writer.write_u64::<LittleEndian>(capacity)?;
     for item in page {
         let value = item.load(Ordering::Relaxed);
         if value != 0 {
             count += 1;
         }
         writer.write_u32::<LittleEndian>(value)?;
     }
 
-    println!("count {:?}", count);
     writer.flush()?; // Make sure everything is written to the file
-    Ok(())
+    Ok(count)
 }
 
-use memmap2::{Mmap, MmapMut, MmapOptions};
-use std::fs::OpenOptions;
+pub fn write_config_to_file(
+    file_path: &PathBuf,
+    partition: u64,
+    hash_capacity: u64,
+    capacity: u64,
+    size: u64,
+    key_bits: u64,
+    value_bits: u64,
+) -> IOResult<()> {
+    // Open the file for writing
+    let file = File::create(file_path)?;
+    let mut writer = BufWriter::new(file);
+    writer.write_u64::<LittleEndian>(partition)?;
+    writer.write_u64::<LittleEndian>(hash_capacity)?;
+    writer.write_u64::<LittleEndian>(capacity)?;
+    writer.write_u64::<LittleEndian>(size)?;
+    writer.write_u64::<LittleEndian>(key_bits)?;
+    writer.write_u64::<LittleEndian>(value_bits)?;
+    writer.flush()?;
+    Ok(())
+}
 
-pub fn process_k2file1(
-    config: HashConfig<u32>,
+pub fn process_k2file(
+    config: HashConfig,
+    database: &PathBuf,
     chunk_file: &PathBuf,
     taxonomy: &Taxonomy,
     page_size: usize,
     page_index: usize,
-) -> IOResult<()> {
+) -> IOResult<usize> {
     let total_counter = AtomicUsize::new(0);
-    // let size_counter = AtomicUsize::new(0);
 
     let value_mask = config.value_mask;
     let value_bits = config.value_bits;
 
-    let start_index = page_index * page_size;
-    let end_index = std::cmp::min((page_index + 1) * page_size, config.capacity);
+    let start_index = (page_index - 1) * page_size;
+    let end_index = std::cmp::min(page_index * page_size, config.capacity);
 
     let capacity = end_index - start_index;
 
-    let page_file = &chunk_file
-        .parent()
-        .unwrap()
-        .join(format!("page_{}", page_index));
-    let hash_file = &chunk_file
-        .parent()
-        .unwrap()
-        .join(format!("hash_{}", page_index));
-
-    let file_len = std::mem::size_of::<u32>() * config.capacity;
-    let file = OpenOptions::new()
-        .read(true)
-        .write(true)
-        .create(true)
-        .open(&hash_file)?;
-    file.set_len(file_len as u64)?;
-
-    let mut mut_mmap = unsafe { MmapOptions::new().len(file_len).map_mut(&file)? };
-    let table = unsafe {
-        std::slice::from_raw_parts_mut(mut_mmap.as_mut_ptr() as *mut u32, config.capacity)
-    };
+    let page_file = database.join(format!("hash_{}.k2d", page_index));
 
     let page: Vec<AtomicU32> = (0..capacity).map(|_| AtomicU32::new(0)).collect();
 
-    // Update the page with values from the table until the first 0 is encountered
-    for (i, &value) in table.iter().enumerate() {
-        if value == 0 {
-            break;
-        }
-        page[i].store(value, Ordering::Relaxed);
-    }
     let file = File::open(&chunk_file)?;
     let mut reader = BufReader::new(file);
 
-    let cell_size = std::mem::size_of::<Cell>();
+    let cell_size = std::mem::size_of::<Slot<u32>>();
     let batch_buffer_size = cell_size * BATCH_SIZE;
     let mut batch_buffer = vec![0u8; batch_buffer_size];
 
@@ -143,80 +139,79 @@ pub fn process_k2file(
         let cells_in_batch = bytes_read / cell_size;
 
         let cells = unsafe {
-            std::slice::from_raw_parts(batch_buffer.as_ptr() as *const Cell, cells_in_batch)
+            std::slice::from_raw_parts(batch_buffer.as_ptr() as *const Slot<u32>, cells_in_batch)
         };
 
-        cells.par_iter().for_each(|cell| {
-            let item = cell.as_slot();
+        cells.par_iter().for_each(|item| {
             set_page_cell(taxonomy, &page, item, capacity, value_bits, value_mask);
         });
         total_counter.fetch_add(cells.len(), Ordering::SeqCst);
     }
 
-    write_u32_to_file(&page, &page_file)?;
-    println!("total_counter {:?}", total_counter.load(Ordering::SeqCst));
-    Ok(())
+    let size_count =
+        write_hashtable_to_file(&page, &page_file, page_index as u64, capacity as u64)?;
+    Ok(size_count)
 }
 
 /// Process k2-format temporary files to build the database
-pub fn process_k2file<P: AsRef<Path>>(
-    chunk_file: P,
-    chtm: &mut CHTableMut<u32>,
-    taxonomy: &Taxonomy,
-) -> IOResult<()> {
-    let total_counter = AtomicUsize::new(0);
-    let size_counter = AtomicUsize::new(0);
+// pub fn process_k2file<P: AsRef<Path>>(
+//     chunk_file: P,
+//     chtm: &mut CHTableMut<u32>,
+//     taxonomy: &Taxonomy,
+// ) -> IOResult<()> {
+//     let total_counter = AtomicUsize::new(0);
+//     let size_counter = AtomicUsize::new(0);
 
-    let value_mask = chtm.config.value_mask;
-    let value_bits = chtm.config.value_bits;
+//     let value_mask = chtm.config.value_mask;
+//     let value_bits = chtm.config.value_bits;
 
-    let file = File::open(chunk_file)?;
-    let mut reader = BufReader::new(file);
+//     let file = File::open(chunk_file)?;
+//     let mut reader = BufReader::new(file);
 
-    let cell_size = std::mem::size_of::<Cell>();
-    let batch_buffer_size = cell_size * BATCH_SIZE;
-    let mut batch_buffer = vec![0u8; batch_buffer_size];
+//     let cell_size = std::mem::size_of::<Cell>();
+//     let batch_buffer_size = cell_size * BATCH_SIZE;
+//     let mut batch_buffer = vec![0u8; batch_buffer_size];
 
-    while let Ok(bytes_read) = reader.read(&mut batch_buffer) {
-        if bytes_read == 0 {
-            break;
-        } // End of file
+//     while let Ok(bytes_read) = reader.read(&mut batch_buffer) {
+//         if bytes_read == 0 {
+//             break;
+//         } // End of file
 
-        // Process the batch of data that was read
-        let cells_in_batch = bytes_read / cell_size;
+//         // Process the batch of data that was read
+//         let cells_in_batch = bytes_read / cell_size;
 
-        let cells = unsafe {
-            std::slice::from_raw_parts(batch_buffer.as_ptr() as *const Cell, cells_in_batch)
-        };
+//         let cells = unsafe {
+//             std::slice::from_raw_parts(batch_buffer.as_ptr() as *const Cell, cells_in_batch)
+//         };
 
-        cells.iter().for_each(|cell| {
-            let item = cell.as_slot();
-            let item_taxid: u32 = item.value.right(value_mask).to_u32();
+//         cells.iter().for_each(|cell| {
+//             let item = cell.as_slot();
+//             let item_taxid: u32 = item.value.right(value_mask).to_u32();
 
-            if let Some((flag, mut slot)) = &chtm.set_page_cell(item) {
-                let slot_taxid = slot.value.right(value_mask).to_u32();
-                let new_taxid = taxonomy.lca(item_taxid, slot_taxid);
-                if slot_taxid != new_taxid {
-                    slot.update_right(u32::from_u32(new_taxid), value_bits);
-                    chtm.update_cell(flag, slot);
-                }
-            } else {
-                size_counter.fetch_add(1, Ordering::SeqCst);
-            }
-        });
-        total_counter.fetch_add(cells.len(), Ordering::SeqCst);
-    }
+//             if let Some((flag, mut slot)) = &chtm.set_page_cell(item) {
+//                 let slot_taxid = slot.value.right(value_mask).to_u32();
+//                 let new_taxid = taxonomy.lca(item_taxid, slot_taxid);
+//                 if slot_taxid != new_taxid {
+//                     slot.update_right(u32::from_u32(new_taxid), value_bits);
+//                     chtm.update_cell(flag, slot);
+//                 }
+//             } else {
+//                 size_counter.fetch_add(1, Ordering::SeqCst);
+//             }
+//         });
+//         total_counter.fetch_add(cells.len(), Ordering::SeqCst);
+//     }
 
-    println!("total_counter {:?}", total_counter.load(Ordering::SeqCst));
-    chtm.copy_from_page();
+//     println!("total_counter {:?}", total_counter.load(Ordering::SeqCst));
+//     chtm.copy_from_page();
 
-    let size_count = size_counter.load(Ordering::SeqCst);
-    let total_count = total_counter.load(Ordering::SeqCst);
-    println!("size_count {:?}", size_count);
-    println!("total_count {:?}", total_count);
-    chtm.add_size(size_count);
+//     let size_count = size_counter.load(Ordering::SeqCst);
+//     let total_count = total_counter.load(Ordering::SeqCst);
+//     println!("size_count {:?}", size_count);
+//     println!("total_count {:?}", total_count);
+//     chtm.add_size(size_count);
 
-    Ok(())
-}
+//     Ok(())
+// }
 
 // /// Build the database directly from fna files
 // pub fn process_fna<P: AsRef<Path>>(
@@ -331,12 +326,12 @@ pub fn get_bits_for_taxid(
 }
 
 /// Convert fna files into k2-format temporary files
-pub fn convert_fna_to_k2_format<P: AsRef<Path>, B: Compact>(
+pub fn convert_fna_to_k2_format<P: AsRef<Path>>(
     fna_file: P,
     meros: Meros,
     taxonomy: &Taxonomy,
     id_to_taxon_map: &HashMap<String, u64>,
-    hash_config: HashConfig<B>,
+    hash_config: HashConfig,
     writers: &mut Vec<BufWriter<File>>,
     chunk_size: usize,
     threads: u32,
@@ -344,7 +339,7 @@ pub fn convert_fna_to_k2_format<P: AsRef<Path>>(
     let reader = Reader::from_path(fna_file).unwrap();
     let queue_len = (threads - 2) as usize;
     let value_bits = hash_config.value_bits;
-    let cell_size = std::mem::size_of::<Cell>();
+    let cell_size = std::mem::size_of::<Slot<u32>>();
 
     read_parallel(
         reader,
@@ -361,8 +356,7 @@ pub fn convert_fna_to_k2_format<P: AsRef<Path>>(
                         let index: usize = hash_config.index(hash_key);
                         let idx = index % chunk_size;
                         let partition_index = index / chunk_size;
-                        let cell =
-                            Cell::new(idx as u32, u32::hash_value(hash_key, value_bits, taxid));
+                        let cell = Slot::new(idx, u32::hash_value(hash_key, value_bits, taxid));
                         k2_cell_list.push((partition_index, cell));
                     }
                 };
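
-- 
Postscript on the new on-disk layout: write_config_to_file above writes the
hash_config.k2d header as six little-endian u64 fields, in order: partition,
hash_capacity, capacity, size, key_bits, value_bits, which matches the offsets
HashConfig reads back (PARTITION_OFFSET = 0, HASH_SIZE_OFFSET = 8). The sketch
below is a minimal standalone reader for that layout. It assumes only the
byteorder crate already used by this patch; the K2dHeader and read_k2d_header
names are illustrative and not part of the codebase.

use byteorder::{LittleEndian, ReadBytesExt};
use std::fs::File;
use std::io::{BufReader, Result};
use std::path::Path;

/// Mirror of the six-field header written by write_config_to_file.
#[derive(Debug)]
struct K2dHeader {
    partition: u64,
    hash_capacity: u64,
    capacity: u64,
    size: u64,
    key_bits: u64,
    value_bits: u64,
}

/// Read the 48-byte hash_config.k2d header, field by field, in the same
/// order that write_config_to_file writes it.
fn read_k2d_header(path: &Path) -> Result<K2dHeader> {
    let mut r = BufReader::new(File::open(path)?);
    Ok(K2dHeader {
        partition: r.read_u64::<LittleEndian>()?,
        hash_capacity: r.read_u64::<LittleEndian>()?,
        capacity: r.read_u64::<LittleEndian>()?,
        size: r.read_u64::<LittleEndian>()?,
        key_bits: r.read_u64::<LittleEndian>()?,
        value_bits: r.read_u64::<LittleEndian>()?,
    })
}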