 use clap::Parser;
-use dashmap::DashMap;
-use kr2r::compact_hash::{Compact, HashConfig};
+use dashmap::{DashMap, DashSet};
+use kr2r::compact_hash::{Compact, HashConfig, Row};
 use kr2r::iclassify::{resolve_tree, trim_pair_info};
 use kr2r::readcounts::{TaxonCounters, TaxonCountersDash};
 use kr2r::report::report_kraken_style;
 use kr2r::taxonomy::Taxonomy;
 use kr2r::utils::find_and_sort_files;
 use rayon::prelude::*;
-use std::collections::{HashMap, HashSet};
+use std::collections::HashMap;
 use std::fs::File;
 use std::io::{self, BufRead, BufReader, BufWriter, Read, Result, Write};
 use std::path::{Path, PathBuf};
@@ -16,57 +16,130 @@ use std::sync::Mutex;
 
 const BATCH_SIZE: usize = 8 * 1024 * 1024;
 
-pub fn read_id_to_seq_map<P: AsRef<Path>>(filename: P) -> Result<DashMap<u32, (String, usize)>> {
+pub fn read_id_to_seq_map<P: AsRef<Path>>(
+    filename: P,
+) -> Result<DashMap<u32, (String, String, u32, Option<u32>)>> {
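+    // Map value: (sequence id, sequence size field, k-mer count of read 1,
+    // k-mer count of read 2 when the input is paired).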
     let file = File::open(filename)?;
     let reader = BufReader::new(file);
     let id_map = DashMap::new();
 
     reader.lines().par_bridge().for_each(|line| {
         let line = line.expect("Could not read line");
         let parts: Vec<&str> = line.trim().split_whitespace().collect();
-        if parts.len() >= 3 {
+        if parts.len() >= 4 {
             // Parse the first column into a u32 key.
             if let Ok(id) = parts[0].parse::<u32>() {
                 // The second column is the sequence identifier; keep it as a string.
                 let seq_id = parts[1].to_string();
-                if let Ok(count) = parts[2].parse::<usize>() {
-                    // Insert into the DashMap.
-                    id_map.insert(id, (seq_id, count));
-                }
+                let seq_size = parts[2].to_string();
+                let count_parts: Vec<&str> = parts[3].split('|').collect();
+                let kmer_count1 = count_parts[0].parse::<u32>().unwrap();
+                // Tolerate single-end rows that carry no second count.
+                let kmer_count2 = count_parts.get(1).and_then(|s| s.parse::<u32>().ok());
+                id_map.insert(id, (seq_id, seq_size, kmer_count1, kmer_count2));
             }
         }
     });
 
     Ok(id_map)
 }
 
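+// Render one read segment's hits as Kraken2-style "taxid:count" pairs,
+// padding k-mer positions with no hit using "0:n" runs.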
+fn generate_hit_string(
+    count: u32,
+    rows: &Vec<Row>,
+    taxonomy: &Taxonomy,
+    value_mask: usize,
+    offset: u32,
+) -> String {
+    let mut result = String::new();
+    let mut last_pos = 0;
+    let mut has_key = false; // whether any hit position has been emitted yet
+
+    for row in rows {
+        let value = row.value;
+        let key = value.right(value_mask);
+        let ext_code = taxonomy.nodes[key as usize].external_id;
+
+        // Ignore positions that are not in the current segment.
+        if row.kmer_id < offset || row.kmer_id >= offset + count {
+            continue;
+        }
+        // Adjust the position to be relative to the start of the segment.
+        let adjusted_pos = row.kmer_id - offset;
+        // Pad leading zeros.
+        if adjusted_pos > last_pos {
+            if has_key {
+                result.push_str(&format!("0:{} ", adjusted_pos - last_pos - 1));
+            } else {
+                result.push_str(&format!("0:{} ", adjusted_pos));
+            }
+        }
+        // Append the count for the current key.
+        result.push_str(&format!("{}:1 ", ext_code));
+        last_pos = adjusted_pos;
+        has_key = true;
+    }
+
+    // Pad trailing zeros.
+    if last_pos < count - 1 {
+        if has_key {
+            result.push_str(&format!("0:{} ", count - last_pos - 1));
+        } else {
+            result.push_str(&format!("0:{} ", count));
+        }
+    }
+
+    result.trim_end().to_string()
+}
+
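+// Build the full hitlist column; for paired reads the two segments are
+// joined with Kraken2's " |:| " separator.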
+pub fn add_hitlist_string(
+    rows: &Vec<Row>,
+    value_mask: usize,
+    kmer_count1: u32,
+    kmer_count2: Option<u32>,
+    taxonomy: &Taxonomy,
+) -> String {
+    let result1 = generate_hit_string(kmer_count1, rows, taxonomy, value_mask, 0);
+    if let Some(count) = kmer_count2 {
+        let result2 = generate_hit_string(count, rows, taxonomy, value_mask, kmer_count1);
+        format!("{} |:| {}", result1, result2)
+    } else {
+        result1
+    }
+}
+
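+// Tally taxon hits per read. A run of identical values at consecutive
+// k-mer positions counts as a single hit group.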
 pub fn count_values(
-    vec: Vec<u32>,
+    rows: &Vec<Row>,
     value_mask: usize,
+    kmer_count1: u32,
 ) -> (HashMap<u32, u64>, TaxonCountersDash, usize) {
     let mut counts = HashMap::new();
 
-    let mut unique_elements = HashSet::new();
+    let mut hit_count: usize = 0;
 
+    let mut last_row: Row = Row::new(0, 0, 0);
     let cur_taxon_counts = TaxonCountersDash::new();
 
-    for value in vec {
-        // Count with the entry API: `entry` returns an Entry enum for a value
-        // that may or may not be present; `or_insert` inserts the default (0)
-        // when the key is absent, and either way the counter is incremented.
+    for row in rows {
+        let value = row.value;
         let key = value.right(value_mask);
         *counts.entry(key).or_insert(0) += 1;
-        if !unique_elements.contains(&value) {
+
+        // Reset the run tracker when crossing into the second read of a pair.
+        if last_row.kmer_id < kmer_count1 && row.kmer_id > kmer_count1 {
+            last_row = Row::new(0, 0, 0);
+        }
+        if !(last_row.value == value && row.kmer_id - last_row.kmer_id == 1) {
             cur_taxon_counts
                 .entry(key as u64)
                 .or_default()
                 .add_kmer(value as u64);
+            hit_count += 1;
         }
-        unique_elements.insert(value);
+
+        last_row = *row;
     }
 
-    (counts, cur_taxon_counts, unique_elements.len())
+    (counts, cur_taxon_counts, hit_count)
 }
 
 #[derive(Parser, Debug, Clone)]
@@ -84,13 +157,8 @@ pub struct Args {
     #[clap(long, value_parser, required = true)]
     pub chunk_dir: PathBuf,
 
-    // /// The file path for the Kraken 2 index.
-    // #[clap(short = 'H', long = "index-filename", value_parser, required = true)]
-    // index_filename: PathBuf,
-
-    // /// The file path for the Kraken 2 taxonomy.
-    // #[clap(short = 't', long = "taxonomy-filename", value_parser, required = true)]
-    // taxonomy_filename: String,
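+    /// Also emit output lines for reads without any k-mer hits.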
+    #[clap(long, value_parser, default_value_t = false)]
+    pub full_output: bool,
     /// Confidence score threshold, default is 0.0.
     #[clap(
         short = 'T',
@@ -126,20 +194,21 @@ pub struct Args {
     pub kraken_output_dir: Option<PathBuf>,
 }
 
-fn process_batch<P: AsRef<Path>, B: Compact>(
+fn process_batch<P: AsRef<Path>>(
     sample_file: P,
     args: &Args,
     taxonomy: &Taxonomy,
-    id_map: DashMap<u32, (String, usize)>,
-    writer: Box<dyn Write + Send>,
+    id_map: &DashMap<u32, (String, String, u32, Option<u32>)>,
+    writer: &Mutex<Box<dyn Write + Send>>,
     value_mask: usize,
-) -> Result<(TaxonCountersDash, usize)> {
+) -> Result<(TaxonCountersDash, usize, DashSet<u32>)> {
     let file = File::open(sample_file)?;
     let mut reader = BufReader::new(file);
-    let size = std::mem::size_of::<B>();
+    let size = std::mem::size_of::<Row>();
     let mut batch_buffer = vec![0u8; size * BATCH_SIZE];
 
     let hit_counts = DashMap::new();
+    let hit_seq_id_set = DashSet::new();
     let confidence_threshold = args.confidence_threshold;
     let minimum_hit_groups = args.minimum_hit_groups;
 
@@ -150,31 +219,36 @@ fn process_batch<P: AsRef<Path>, B: Compact>(
 
         // Process the batch of data that was just read.
         let slots_in_batch = bytes_read / size;
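+        // Reinterpret the raw bytes as a slice of fixed-size Row records;
+        // assumes the file was written with this build's Row layout.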
-
         let slots = unsafe {
-            std::slice::from_raw_parts(batch_buffer.as_ptr() as *const B, slots_in_batch)
+            std::slice::from_raw_parts(batch_buffer.as_ptr() as *const Row, slots_in_batch)
         };
 
         slots.into_par_iter().for_each(|item| {
-            let cell = item.left(0).to_u32();
-            let seq_id = item.right(0).to_u32();
-            hit_counts.entry(seq_id).or_insert_with(Vec::new).push(cell)
+            let seq_id = item.seq_id;
+            hit_seq_id_set.insert(seq_id);
+            hit_counts
+                .entry(seq_id)
+                .or_insert_with(Vec::new)
+                .push(*item)
         });
     }
 
-    let writer = Mutex::new(writer);
+    // The writer is now wrapped in a Mutex by the caller.
     let classify_counter = AtomicUsize::new(0);
     let cur_taxon_counts = TaxonCountersDash::new();
 
-    hit_counts.into_par_iter().for_each(|(k, cells)| {
+    hit_counts.into_par_iter().for_each(|(k, mut rows)| {
         if let Some(item) = id_map.get(&k) {
-            let total_kmers: usize = item.1;
+            rows.sort_unstable();
+            let total_kmers: usize = item.2 as usize + item.3.unwrap_or(0) as usize;
             let dna_id = trim_pair_info(&item.0);
-            let (counts, cur_counts, hit_groups) = count_values(cells, value_mask);
+            let (counts, cur_counts, hit_groups) = count_values(&rows, value_mask, item.2);
+            let hit_string = add_hitlist_string(&rows, value_mask, item.2, item.3, taxonomy);
             let mut call = resolve_tree(&counts, taxonomy, total_kmers, confidence_threshold);
             if call > 0 && hit_groups < minimum_hit_groups {
                 call = 0;
             };
+
             cur_counts.iter().for_each(|entry| {
                 cur_taxon_counts
                     .entry(*entry.key())
@@ -184,20 +258,31 @@ fn process_batch<P: AsRef<Path>, B: Compact>(
             });
 
             let ext_call = taxonomy.nodes[call as usize].external_id;
-            if call > 0 {
-                let output_line = format!("C\t{}\t{}\n", dna_id, ext_call);
-                // Synchronize writes across threads with the lock.
-                let mut file = writer.lock().unwrap();
-                file.write_all(output_line.as_bytes()).unwrap();
+            let classify = if call > 0 {
                 classify_counter.fetch_add(1, Ordering::SeqCst);
                 cur_taxon_counts
                     .entry(call as u64)
                     .or_default()
                     .increment_read_count();
-            }
+
+                "C"
+            } else {
+                "U"
+            };
+            // Synchronize writes across threads with the lock.
+            let output_line = format!(
+                "{}\t{}\t{}\t{}\t{}\n",
+                classify, dna_id, ext_call, item.1, hit_string
+            );
+            let mut file = writer.lock().unwrap();
+            file.write_all(output_line.as_bytes()).unwrap();
         }
     });
-    Ok((cur_taxon_counts, classify_counter.load(Ordering::SeqCst)))
+    Ok((
+        cur_taxon_counts,
+        classify_counter.load(Ordering::SeqCst),
+        hit_seq_id_set,
+    ))
 }
 
 pub fn run(args: Args) -> Result<()> {
@@ -228,14 +313,29 @@ pub fn run(args: Args) -> Result<()> {
         }
         None => Box::new(io::stdout()) as Box<dyn Write + Send>,
     };
-    let (thread_taxon_counts, thread_classified) = process_batch::<&PathBuf, u64>(
+    let writer = Mutex::new(writer);
+    let (thread_taxon_counts, thread_classified, hit_seq_set) = process_batch::<&PathBuf>(
         sample_file,
         &args,
         &taxo,
-        sample_id_map,
-        writer,
+        &sample_id_map,
+        &writer,
         value_mask,
     )?;
+
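+    // With --full-output, also emit a "U" record for every read that had no
+    // k-mer hits at all (such reads never appear in the hit file).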
+    if args.full_output {
+        sample_id_map
+            .iter()
+            .filter(|item| !hit_seq_set.contains(item.key()))
+            .for_each(|item| {
+                let dna_id = trim_pair_info(&item.0);
+                let hit_string = add_hitlist_string(&vec![], value_mask, item.2, item.3, &taxo);
+                let output_line = format!("U\t{}\t0\t{}\t{}\n", dna_id, item.1, hit_string);
+                let mut file = writer.lock().unwrap();
+                file.write_all(output_line.as_bytes()).unwrap();
+            });
+    }
+
     thread_taxon_counts.iter().for_each(|entry| {
         total_taxon_counts
             .entry(*entry.key())