diff --git a/.gitignore b/.gitignore index 70fd1dad..31955a22 100644 --- a/.gitignore +++ b/.gitignore @@ -23,9 +23,6 @@ wfmash src/common/wflign/build build -# include directory -include - #Others *.cache *~ diff --git a/src/align/include/align_parameters.hpp b/src/align/include/align_parameters.hpp index ca2984f4..1b4cacd7 100644 --- a/src/align/include/align_parameters.hpp +++ b/src/align/include/align_parameters.hpp @@ -60,6 +60,7 @@ struct Parameters { bool emit_md_tag; //Output the MD tag bool sam_format; //Emit the output in SAM format (PAF default) bool no_seq_in_sam; //Do not fill the SEQ field in SAM format + bool multithread_fasta_input; //Multithreaded fasta input #ifdef WFA_PNG_TSV_TIMING // plotting diff --git a/src/align/include/computeAlignments.hpp b/src/align/include/computeAlignments.hpp index 704d4c61..0c4db679 100644 --- a/src/align/include/computeAlignments.hpp +++ b/src/align/include/computeAlignments.hpp @@ -370,7 +370,7 @@ void processor_manager(seq_atomic_queue_t& seq_queue, std::vector> thread_should_exit(max_processors); const size_t queue_capacity = seq_queue.capacity(); - const size_t low_threshold = queue_capacity * 0.2; + const size_t low_threshold = 1; const size_t high_threshold = queue_capacity * 0.8; auto spawn_processor = [&](size_t id) { @@ -383,14 +383,22 @@ void processor_manager(seq_atomic_queue_t& seq_queue, // Start with one processor spawn_processor(0); size_t current_processors = 1; + uint64_t exhausted = 0; while (!reader_done.load() || !line_queue.was_empty() || !seq_queue.was_empty()) { size_t queue_size = seq_queue.was_size(); - if (queue_size < low_threshold && current_processors < max_processors) { - spawn_processor(current_processors++); - } else if (queue_size > high_threshold && current_processors > 1) { - thread_should_exit[--current_processors].store(true); + if (param.multithread_fasta_input) { + if (queue_size < low_threshold && current_processors < max_processors) { + ++exhausted; + } else if (queue_size > high_threshold && current_processors > 1) { + thread_should_exit[--current_processors].store(true); + } + + if (exhausted > 20 && queue_size < low_threshold) { + spawn_processor(current_processors++); + exhausted = 0; + } } std::this_thread::sleep_for(std::chrono::milliseconds(100)); diff --git a/src/common/wflign/src/wflign_patch.cpp b/src/common/wflign/src/wflign_patch.cpp index eb45f63f..eda9f95e 100644 --- a/src/common/wflign/src/wflign_patch.cpp +++ b/src/common/wflign/src/wflign_patch.cpp @@ -721,6 +721,68 @@ std::vector do_progressive_wfa_patch_alignment( return alignments; } +void erode_head(std::vector& unpatched, uint64_t& query_pos, uint64_t& target_pos, int erode_k) { + int match_count = 0; + auto it = unpatched.begin(); + uint64_t query_erased = 0, target_erased = 0; + + while (it != unpatched.end()) { + if (*it == 'M' || *it == 'X') { + match_count++; + if (match_count >= erode_k) { + break; + } + query_pos++; + target_pos++; + query_erased++; + target_erased++; + } else { + //match_count = 0; + if (*it == 'I') { + query_pos++; + query_erased++; + } + if (*it == 'D') { + target_pos++; + target_erased++; + } + } + it++; + } + + //std::cerr << "erode_head: eroded " << query_erased << " query and " << target_erased << " target" << std::endl; + // Erase the eroded part + unpatched.erase(unpatched.begin(), it); +} + +void erode_tail(std::vector& unpatched, uint64_t& query_end, uint64_t& target_end, int erode_k) { + int match_count = 0; + auto it = unpatched.rbegin(); + uint64_t q_offset = 0, t_offset = 0; + + while (it != unpatched.rend()) { + if (*it == 'M' || *it == 'X') { + match_count++; + if (match_count >= erode_k) { + break; + } + q_offset++; + t_offset++; + } else { + //match_count = 0; + if (*it == 'I') q_offset++; + if (*it == 'D') t_offset++; + } + it++; + } + + // Erase the eroded part + //std::cerr << "erode_tail: eroded " << q_offset << " query and " << t_offset << " target" << std::endl; + unpatched.erase(it.base(), unpatched.end()); + query_end -= q_offset; + target_end -= t_offset; +} + void write_merged_alignment( std::ostream &out, const std::vector &trace, @@ -735,11 +797,11 @@ void write_merged_alignment( const uint64_t& query_offset, const uint64_t& query_length, const bool& query_is_rev, - const char* target, + const char* _target, const std::string& target_name, const uint64_t& target_total_length, - const uint64_t& target_offset, - const uint64_t& target_length, + const uint64_t& _target_offset, + const uint64_t& _target_length, const float& min_identity, #ifdef WFA_PNG_TSV_TIMING const long& elapsed_time_wflambda_ms, @@ -765,8 +827,6 @@ void write_merged_alignment( int64_t target_pointer_shift = 0; - uint64_t target_length_mut = target_length; - // we need to get the start position in the query and target // then run through the whole alignment building up the cigar // finally emitting it @@ -787,11 +847,19 @@ void write_merged_alignment( uint64_t target_end = 0; //uint64_t total_score = 0; + char* target = (char*)_target; + uint64_t target_offset = _target_offset; + uint64_t target_length = _target_length; + // double mash_dist_sum = 0; uint64_t ok_alns = 0; auto start_time = std::chrono::steady_clock::now(); + //std::cerr << "target_offset: " << target_offset << " query_offset: " << query_offset << std::endl; + //std::cerr << "target_length: " << target_length << " query_length: " << query_length << std::endl; + //std::cerr << "target_total_length: " << target_total_length << " query_total_length: " << query_total_length << std::endl; + #ifdef WFLIGN_DEBUG std::cerr << "[wflign::wflign_affine_wavefront] processing traceback" << std::endl; @@ -861,20 +929,20 @@ void write_merged_alignment( }; auto patching = [&query, &query_name, &query_length, &query_start, - &query_offset, &target, &target_name, - &target_length_mut, &target_start, &target_offset, - &target_total_length, &target_end, - &target_pointer_shift, - &wflign_max_len_major, - &wflign_max_len_minor, - &distance_close_big_enough_indels, &min_wf_length, - &max_dist_threshold, &wf_aligner, - &multi_patch_alns, - &convex_penalties, - &chain_gap, &max_patching_score, &min_inversion_length, &erode_k + &query_offset, &query_end, &target, &target_name, + &target_length, &target_start, &target_offset, + &target_total_length, &target_end, + &target_pointer_shift, + &wflign_max_len_major, + &wflign_max_len_minor, + &distance_close_big_enough_indels, &min_wf_length, + &max_dist_threshold, &wf_aligner, + &multi_patch_alns, + &convex_penalties, + &chain_gap, &max_patching_score, &min_inversion_length, &erode_k #ifdef WFA_PNG_TSV_TIMING - ,&emit_patching_tsv, - &out_patching_tsv + ,&emit_patching_tsv, + &out_patching_tsv #endif ](std::vector &unpatched, std::vector &patched, @@ -885,252 +953,81 @@ void write_merged_alignment( auto q = unpatched.begin(); - uint64_t query_pos = query_start; - uint64_t target_pos = target_start; - uint64_t query_delta = 0; uint64_t target_delta = 0; bool got_alignment = false; - // Head patching - if (false) { - // how long a gap? - while (q != unpatched.end() && *q == 'I') { - ++query_delta; - ++q; - } - while (q != unpatched.end() && *q == 'D') { - ++target_delta; - ++q; - } - - if (query_delta > 0 && query_delta < wflign_max_len_minor) { - // Semi-global mode for patching the heads - - // nibble forward if we're below the correct length - // this gives a bit of context for the alignment - while (q != unpatched.end() && - (query_delta < min_wfa_head_tail_patch_length || target_delta < min_wfa_head_tail_patch_length)) { - const auto &c = *q++; - switch (c) { - case 'M': case 'X': - ++query_delta; ++target_delta; break; - case 'I': ++query_delta; break; - case 'D': ++target_delta; break; - default: break; - } - } - -// std::cerr << "A HEAD patching in" -// << query_name << " " -// << query_offset << "@ " << query_pos << -// " - " << query_delta -// << " --- " -// << target_name << " " << target_offset -// << " @ " << -// target_pos << " - " -// << target_delta -// << std::endl; - - // min_wfa_patch_length-bps of margins to manage insertions in the query - const uint64_t delta_to_ask = query_delta <= target_delta ? 0 : query_delta - target_delta; - - uint64_t target_delta_to_shift = 0; - uint64_t target_pos_x, target_start_x; - int64_t target_pointer_shift_x; - - // Note that target_pos >= target_start - if (target_pos >= delta_to_ask) { - // std::cerr << "A\n"; - // Easy, we don't have to manage 'negative' indexes for - // the target array - target_delta_to_shift = delta_to_ask; - - target_pos_x = target_pos - target_delta_to_shift; - target_start_x = target_pos_x < target_start - ? target_pos_x - : target_start; - - target_pointer_shift_x = target_pointer_shift; // + 0; - } else { - // std::cerr << "B\n"; - target_pos_x = 0; - target_start_x = 0; - - const uint64_t positions_to_get = - delta_to_ask - target_pos; - - // Manage negative indexes - if (target_offset - target_pointer_shift >= positions_to_get) { - // std::cerr << "B.1\n"; - target_pointer_shift_x = target_pointer_shift + - (int64_t)positions_to_get; - - target_delta_to_shift = - delta_to_ask; // we can get all the positions we - // need - } else { - // std::cerr << "B.2\n"; - // We can't get all the positions we need - - target_pointer_shift_x = (int64_t) - target_offset; //<=> target_pointer_shift + - //(int64_t)target_offset - - //target_pointer_shift; - - target_delta_to_shift = target_offset + target_pos - - target_pointer_shift; - } - } - - // std::cerr << "target_start " - // << target_start << std::endl; - // std::cerr << "target_start_x " - // << target_start_x << - // std::endl; std::cerr << - // "target_pos " << target_pos << - // std::endl; std::cerr << - // "target_pos_x " << - // target_pos_x << std::endl; - // std::cerr << "target_offset " - // << target_offset << std::endl; - // std::cerr << - // "target_pointer_shift_x " << - // target_pointer_shift_x << - // std::endl; std::cerr << - // "target_delta_to_shift " << - // target_delta_to_shift << - // std::endl; - - const uint64_t target_delta_x = - target_delta + target_delta_to_shift; - - if (target_delta_x > 0) { - // std::cerr << "B HEAD patching in " - // << query_name << " " - // << query_offset + query_pos << - // " - " << query_delta - // << " --- " - // << target_name << " " - // << target_offset + target_pos << " - " - // << target_delta_x - // << std::endl; - - std::string query_rev(query + query_pos, query_delta); - std::reverse(query_rev.begin(), query_rev.end()); - - std::string target_rev(target - target_pointer_shift_x + - target_pos_x, - target_delta_x); - std::reverse(target_rev.begin(), target_rev.end()); - -// std::cerr << "query: "; -// for (int i = 0; i < query_delta; ++i) { -// std::cerr << query_rev[i]; -// } -// std::cerr << "\ntarget: ";; -// for (int i = 0; i < target_delta_x; ++i) { -// std::cerr << target_rev[i]; -// } -// std::cerr << std::endl; - - wfa::WFAlignerGapAffine2Pieces* wf_aligner_heads = - new wfa::WFAlignerGapAffine2Pieces( - 0, - convex_penalties.mismatch, - convex_penalties.gap_opening1, - convex_penalties.gap_extension1, - convex_penalties.gap_opening2, - convex_penalties.gap_extension2, - wfa::WFAligner::Alignment, - wfa::WFAligner::MemoryMed); - wf_aligner_heads->setHeuristicWFmash(min_wf_length,max_dist_threshold); - const int status = wf_aligner_heads->alignEndsFree( - target_rev.c_str(),target_rev.size(),0,0, - query_rev.c_str(),query_rev.size(),0,query_rev.size()); - //std::cerr << "Head patching status " << status << "\n"; - if (status == WF_STATUS_ALG_COMPLETED || status == WF_STATUS_ALG_PARTIAL) { - //hack_cigar(wf_aligner_heads->cigar, query_rev.c_str(), target_rev.c_str(), query_rev.size(), target_rev.size(), 0, 0); + // trim back small matches at the start + erode_head(unpatched, query_start, target_start, 7); -#ifdef VALIDATE_WFA_WFLIGN - if (!validate_cigar(wf_aligner_heads->cigar, query_rev.c_str(), target_rev.c_str(), query_rev.size(), - target_rev.size(), 0, 0)) { - std::cerr << "cigar failure at head alignment " << std::endl; - unpack_display_cigar(wf_aligner_heads->cigar, query_rev.c_str(), target_rev.c_str(), query_rev.size(), - target_rev.size(), 0, 0); - std::cerr << ">query" << std::endl - << std::string(query_rev.c_str(), query_rev.size()) << std::endl; - std::cerr << ">target" << std::endl - << std::string(target_rev.c_str(), target_rev.size()) << std::endl; - assert(false); - } -#endif + uint64_t query_pos = query_start; + uint64_t target_pos = target_start; - got_alignment = true; - - target_pos = target_pos_x; - target_delta = target_delta_x; - target_pointer_shift = target_pointer_shift_x; - - target_start = target_start_x; - target_length_mut += target_delta_to_shift; - - char* cigar_ops; - int cigar_length; - wf_aligner_heads->getAlignment(&cigar_ops,&cigar_length); - - // Put the missing part of the CIGAR string first and then its aligned part - uint64_t missing_query_len = query_rev.size(); - uint64_t missing_target_len = target_rev.size(); - for(int xxx = cigar_length - 1; xxx >= 0; --xxx) { - // The CIGAR string can be incomplete - switch (cigar_ops[xxx]) { - case 'M': case 'X': - --missing_query_len; --missing_target_len; break; - case 'I': --missing_query_len; break; - case 'D': --missing_target_len; break; - default: break; - } - } - // Put the missing part of the CIGAR string - for (uint64_t i = 0; i < missing_query_len; ++i) { - patched.push_back('I'); - } - for (uint64_t i = 0; i < missing_target_len; ++i) { - patched.push_back('D'); - } + // Trim spurious matches off the tail of the alignment + erode_tail(unpatched, query_end, target_end, 7); - for(int xxx = cigar_length - 1; xxx >= 0; --xxx) { - //std::cerr << cigar_ops[xxx]; - patched.push_back(cigar_ops[xxx]); - } - //std::cerr << "\n"; - } - delete wf_aligner_heads; + // Head patching + if (query_start > 0 || target_start > 0) { + // Calculate how far we need to shift to cover the query, and how far we can safely shift + int64_t needed_shift = std::max(0L, static_cast(query_start) - static_cast(target_start)); + int64_t max_safe_shift = std::min( + static_cast(wflign_max_len_minor), + static_cast(target_offset) + ); + + // Take the minimum of what we need and what's safe + int64_t actual_shift = std::min(needed_shift, max_safe_shift); + + // Adjust the target pointer, offset, and length + target -= actual_shift; + target_offset -= actual_shift; + target_length += actual_shift; + target_end += actual_shift; // hmm + target_pos += actual_shift; + target_start += actual_shift; + + alignment_t head_aln; + alignment_t head_rev_aln; + do_wfa_patch_alignment( + query, + 0, + query_start, + target, + 0, // Start from the beginning of the adjusted target + target_start, + wf_aligner, + convex_penalties, + head_aln, + head_rev_aln, + chain_gap, + max_patching_score, + min_inversion_length); + + if (head_aln.ok) { + //std::cerr << "head_aln: " << head_aln.score << std::endl; + // Prepend the head alignment to the main alignment + for (int i = head_aln.edit_cigar.begin_offset; i < head_aln.edit_cigar.end_offset; ++i) { + //std::cerr << head_aln.edit_cigar.cigar_ops[i]; + patched.push_back(head_aln.edit_cigar.cigar_ops[i]); } - } - - // add in stuff if we didn't align - if (!got_alignment) { - for (uint64_t i = 0; i < query_delta; ++i) { + //std::cerr << std::endl; + } else { + // push back I and D to fill the gap + for (uint64_t i = 0; i < query_start; ++i) { patched.push_back('I'); } - for (uint64_t i = 0; i < target_delta; ++i) { + for (uint64_t i = 0; i < target_start; ++i) { patched.push_back('D'); } } - - query_pos += query_delta; - target_pos += target_delta; - - query_delta = 0; - target_delta = 0; + query_start = 0; + target_start = 0; } // Patching in the middle while (q != unpatched.end()) { - // get to the first match + // get to the first matchn while (q != unpatched.end() && (*q == 'M' || *q == 'X')) { /* std::cerr << "q: " << query[query_pos] << " " @@ -1138,7 +1035,7 @@ void write_merged_alignment( << std::endl; */ if (query_pos >= query_length || - target_pos >= target_length_mut) { + target_pos >= target_length) { std::cerr << "[wflign::wflign_affine_wavefront] " "corrupted traceback (out of bounds) for " << query_name << " " << query_offset << " " @@ -1450,174 +1347,58 @@ void write_merged_alignment( } } - // Tail patching - if (false) { - // Important: the last patch (in the middle of the traceback) - // can generate a tail check backward if there are other Is/Ds - // to merge in the current patch - while (!patched.empty() && - (patched.back() == 'I' || patched.back() == 'D') && - ((query_delta < wflign_max_len_major && - target_delta < wflign_max_len_major) && - (query_delta < wflign_max_len_minor || - target_delta < wflign_max_len_minor))) { - const auto &c = patched.back(); - if (c == 'I') { - ++query_delta; - --query_pos; - } else { - ++target_delta; - --target_pos; - } - patched.pop_back(); + // trim Is and Ds from the end of the alignment trace, adjusting the query_pos and target_pos as needed + while (!patched.empty() && (patched.back() == 'I' || patched.back() == 'D')) { + if (patched.back() == 'I') { + --query_pos; + } else { + --target_pos; } + patched.pop_back(); + } - got_alignment = false; - - if (query_delta > 0 && query_delta < wflign_max_len_minor) { - // nibble backward if we're below the correct length - // this gives a bit of context for the alignment - while (!patched.empty() && - (query_delta < min_wfa_head_tail_patch_length || target_delta < min_wfa_head_tail_patch_length)) { - const auto& c = patched.back(); - switch (c) { - case 'M': case 'X': - --query_pos; --target_pos; - ++query_delta; ++target_delta; break; - case 'I': ++query_delta; --query_pos; break; - case 'D': ++target_delta; --target_pos; break; - default: break; - } - patched.pop_back(); - } - -// std::cerr << "A TAIL patching in " -// << query_name << " " << -// query_offset << " @ " << -// query_pos << " - " << -// query_delta << " " -// << target_name << " " << -// target_offset << " @ " -// << target_pos - target_pointer_shift << " - " -// << target_delta -// << std::endl; - - // min_wfa_patch_length-bps of margins to manage insertions in the query - const uint64_t delta_to_ask = query_delta <= target_delta ? 0 : query_delta - target_delta; - - // there is a piece of query - auto target_delta_x = - target_delta + - ((target_offset - target_pointer_shift) + target_pos + - target_delta + delta_to_ask < - target_total_length - ? delta_to_ask - : target_total_length - - ((target_offset - target_pointer_shift) + - target_pos + target_delta)); - - if (target_delta_x > 0) { - // std::cerr << "B TAIL patching in " - // << query_name << " " << - // query_offset + query_pos << " - " << - // query_delta << " " - // << target_name << " " << - // target_offset + target_pos - target_pointer_shift << " - " - // << target_delta_x - // << std::endl; - - wfa::WFAlignerGapAffine2Pieces* wf_aligner_tails = - new wfa::WFAlignerGapAffine2Pieces( - 0, - convex_penalties.mismatch, - convex_penalties.gap_opening1, - convex_penalties.gap_extension1, - convex_penalties.gap_opening2, - convex_penalties.gap_extension2, - wfa::WFAligner::Alignment, - wfa::WFAligner::MemoryMed); - wf_aligner_tails->setHeuristicWFmash(min_wf_length,max_dist_threshold); - const int status = wf_aligner_tails->alignEndsFree( - target - target_pointer_shift + target_pos, target_delta_x,0,0, - query + query_pos, query_delta,0,query_delta); - //std::cerr << "Tail patching status " << status << "\n"; - if (status == WF_STATUS_ALG_COMPLETED || status == WF_STATUS_ALG_PARTIAL) { - //hack_cigar(wf_aligner_tails->cigar, query + query_pos, target - target_pointer_shift + target_pos, query_delta, target_delta_x, 0, 0); - -#ifdef VALIDATE_WFA_WFLIGN - if (!validate_cigar(wf_aligner_tails->cigar, query + query_pos, target - target_pointer_shift + target_pos, query_delta, - target_delta_x, 0, 0)) { - std::cerr << "cigar failure at head alignment " << std::endl; - unpack_display_cigar(wf_aligner_tails->cigar, query + query_pos, target - target_pointer_shift + target_pos, query_delta, - target_delta_x, 0, 0); - std::cerr << ">query" << std::endl - << std::string(query + query_pos, query_delta) << std::endl; - std::cerr << ">target" << std::endl - << std::string(target - target_pointer_shift + target_pos, target_delta_x) << std::endl; - assert(false); - } -#endif - - got_alignment = true; - - { - // if (target_pos + target_delta_x > - // target_length_mut) { - // target_end += (target_pos + target_delta_x - // - target_length_mut); target_length_mut = - // target_pos + target_delta_x; - //} - const uint32_t inc = - target_delta_x - target_delta; - target_end += inc; - target_length_mut += inc; - } - - target_delta = target_delta_x; - - char* cigar_ops; - int cigar_length; - wf_aligner_tails->getAlignment(&cigar_ops,&cigar_length); - uint64_t missing_query_len = query_delta; - uint64_t missing_target_len = target_delta; - for(int xxx = 0; xxx < cigar_length; ++xxx) { - //std::cerr << cigar_ops[xxx]; - patched.push_back(cigar_ops[xxx]); - - // The CIGAR string can be incomplete - switch (cigar_ops[xxx]) { - case 'M': case 'X': - --missing_query_len; --missing_target_len; break; - case 'I': --missing_query_len; break; - case 'D': --missing_target_len; break; - default: break; - } - } - //std::cerr << "\n"; - // Put the missing part of the CIGAR string - for (uint64_t i = 0; i < missing_query_len; ++i) { - patched.push_back('I'); - } - for (uint64_t i = 0; i < missing_target_len; ++i) { - patched.push_back('D'); - } - } - delete wf_aligner_tails; + // Tail patching + if (query_pos < query_length || target_pos < target_length) { + // Calculate how much additional target sequence we need + int64_t needed_extension = std::max(0L, static_cast(query_length - query_pos) - static_cast(target_length - target_pos)); + + // Calculate how much we can safely extend + int64_t max_safe_extension = std::min( + static_cast(wflign_max_len_minor), + static_cast(target_total_length - (target_offset + target_length)) + ); + + // Take the minimum of what we need and what's safe + int64_t actual_extension = std::min(needed_extension, max_safe_extension); + + alignment_t tail_aln; + alignment_t tail_rev_aln; + do_wfa_patch_alignment( + query, + query_pos, + query_length - query_pos, + target, + target_pos, + (target_length - target_pos) + actual_extension, + wf_aligner, + convex_penalties, + tail_aln, + tail_rev_aln, + chain_gap, + max_patching_score, + min_inversion_length); + + if (tail_aln.ok) { + // Append the tail alignment to the main alignment + for (int i = tail_aln.edit_cigar.begin_offset; i < tail_aln.edit_cigar.end_offset; ++i) { + patched.push_back(tail_aln.edit_cigar.cigar_ops[i]); } - } + query_pos = query_length; + target_pos = target_length; - if (!got_alignment) { - // add in our tail gap / softclip - for (uint64_t i = 0; i < query_delta; ++i) { - patched.push_back('I'); - } - for (uint64_t i = 0; i < target_delta; ++i) { - patched.push_back('D'); - } + // Adjust target_length if we used additional sequence + target_end += tail_aln.target_length; } - // not used - // query_pos += query_delta; - // target_pos += target_delta; } #ifdef WFLIGN_DEBUG @@ -1630,7 +1411,6 @@ void write_merged_alignment( #endif }; - std::vector pre_tracev; { std::vector erodev; { @@ -1694,7 +1474,7 @@ void write_merged_alignment( #ifdef VALIDATE_WFA_WFLIGN if (!validate_trace(rawv, query, target - target_pointer_shift, - query_length, target_length_mut, + query_length, target_length, query_start, target_start)) { std::cerr << "cigar failure in rawv (at end) " @@ -1747,7 +1527,7 @@ void write_merged_alignment( #ifdef VALIDATE_WFA_WFLIGN if (!validate_trace(erodev, query, target - target_pointer_shift, - query_length, target_length_mut, query_start, + query_length, target_length, query_start, target_start)) { std::cerr << "cigar failure in erodev " << "\t" << query_name << "\t" << query_total_length @@ -1787,13 +1567,13 @@ void write_merged_alignment( //std::cerr << "FIRST PATCH ROUND" << std::endl; // +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n"; - patching(erodev, pre_tracev, 4096, 32, 512, false); + patching(erodev, tracev, 4096, 8, 512, true); #ifdef VALIDATE_WFA_WFLIGN - if (!validate_trace(pre_tracev, query, + if (!validate_trace(tracev, query, target - target_pointer_shift, query_length, - target_length_mut, query_start, target_start)) { - std::cerr << "cigar failure in pre_tracev " + target_length, query_start, target_start)) { + std::cerr << "cigar failure in tracev " << "\t" << query_name << "\t" << query_total_length << "\t" << query_offset + (query_is_rev @@ -1811,13 +1591,7 @@ void write_merged_alignment( } #endif - // normalize: sort so that I tracev_end // b.) subtract from target_end this count @@ -1890,7 +1663,6 @@ void write_merged_alignment( } trim_del_last = std::distance(tracev.rbegin(), last_non_del); target_end -= trim_del_last; - // target_length_mut -= trim_del_last; begin_offset = trim_del_first; end_offset = tracev.size() - trim_del_last; @@ -1899,7 +1671,7 @@ void write_merged_alignment( /* #ifdef VALIDATE_WFA_WFLIGN if (!validate_trace(tracev, query, target - target_pointer_shift, -query_length, target_length_mut, query_start, target_start)) { std::cerr << +query_length, target_length, query_start, target_start)) { std::cerr << "cigar failure at alignment (after head/tail del trimming) " << "\t" << query_name << "\t" << query_total_length diff --git a/src/common/wflign/src/wflign_patch.hpp b/src/common/wflign/src/wflign_patch.hpp index e7f6892a..80530a54 100644 --- a/src/common/wflign/src/wflign_patch.hpp +++ b/src/common/wflign/src/wflign_patch.hpp @@ -59,7 +59,8 @@ namespace wflign { alignment_t& rev_aln, const int64_t& chain_gap, const int& max_patching_score, - const uint64_t& min_inversion_length); + const uint64_t& min_inversion_length, + bool ends_free); void trim_alignment(alignment_t& aln); std::vector do_progressive_wfa_patch_alignment( const char* query, diff --git a/src/interface/parse_args.hpp b/src/interface/parse_args.hpp index 05139bab..763a99b1 100644 --- a/src/interface/parse_args.hpp +++ b/src/interface/parse_args.hpp @@ -86,7 +86,7 @@ void parse_args(int argc, args::Flag approx_mapping(mapping_opts, "approx-map", "skip base-level alignment, producing an approximate mapping in PAF", {'m',"approx-map"}); args::Flag no_split(mapping_opts, "no-split", "disable splitting of input sequences during mapping [default: enabled]", {'N',"no-split"}); args::ValueFlag chain_gap(mapping_opts, "N", "chain mappings closer than this distance in query and target, sets approximate maximum variant length detectable in alignment [default: 6*segment_length, up to 30k]", {'c', "chain-gap"}); - args::ValueFlag max_mapping_length(mapping_opts, "N", "maximum length of a single mapping before breaking [default: 1M]", {'P', "max-mapping-length"}); + args::ValueFlag max_mapping_length(mapping_opts, "N", "maximum length of a single mapping before breaking [default: inf]", {'P', "max-mapping-length"}); args::Flag drop_low_map_pct_identity(mapping_opts, "K", "drop mappings with estimated identity below --map-pct-id=%", {'K', "drop-low-map-id"}); args::ValueFlag overlap_threshold(mapping_opts, "F", "drop mappings overlapping more than fraction F with a higher scoring mapping [default: 0.5]", {'O', "overlap-threshold"}); args::Flag no_filter(mapping_opts, "MODE", "disable mapping filtering", {'f', "no-filter"}); @@ -439,7 +439,7 @@ void parse_args(int argc, } map_parameters.max_mapping_length = l; } else { - map_parameters.max_mapping_length = 1000000; // 1 Mbp default + map_parameters.max_mapping_length = std::numeric_limits::max(); } if (drop_low_map_pct_identity) { @@ -585,6 +585,10 @@ void parse_args(int argc, map_parameters.threads = 1; align_parameters.threads = 1; } + // disable multi-fasta processing due to the memory inefficiency of samtools faidx readers + // which require us to duplicate the in-memory indexes of large files for each thread + // if aligner exhaustion is a problem, we could enable this + align_parameters.multithread_fasta_input = false; // Compute optimal window size for sketching { diff --git a/src/map/include/computeMap.hpp b/src/map/include/computeMap.hpp index 3b5acbdb..0d94d723 100644 --- a/src/map/include/computeMap.hpp +++ b/src/map/include/computeMap.hpp @@ -726,6 +726,9 @@ namespace skch // remove short chains that didn't exceed block length filterWeakMappings(unfilteredMappings, std::floor(param.block_length / param.segLength)); + + // now that filtering has happened, set back the individual mapping coordinates and block length + setBlockCoordsToMappingCoords(unfilteredMappings); } else { // set block coordinates setBlockCoordsToMappingCoords(unfilteredMappings); @@ -1558,8 +1561,8 @@ namespace skch int query_gap = curr.queryStartPos - prev.queryEndPos; int ref_gap = curr.refStartPos - prev.refEndPos; - // Check if both gaps are within the threshold - if (std::abs(query_gap) <= threshold && std::abs(ref_gap) <= threshold) { + // Check if both gaps are >0 and within the threshold + if (query_gap > 0 && ref_gap > 0 && query_gap <= threshold && ref_gap <= threshold) { // Calculate midpoints int query_mid = (prev.queryEndPos + curr.queryStartPos) / 2; int ref_mid = (prev.refEndPos + curr.refStartPos) / 2; @@ -1582,6 +1585,12 @@ namespace skch } } } + + double axis_weighted_euclidean_distance(int64_t dx, int64_t dy, double w = 0.5) { + double euclidean = std::sqrt(dx*dx + dy*dy); + double axis_factor = 1.0 - (2.0 * std::min(std::abs(dx), std::abs(dy))) / (std::abs(dx) + std::abs(dy)); + return euclidean * (1.0 + w * axis_factor); + } /** * @brief Merge fragment mappings by convolution of a 2D range over the alignment matrix @@ -1616,7 +1625,7 @@ namespace skch //Start the procedure to identify the chains for (auto it = readMappings.begin(); it != readMappings.end(); it++) { - std::vector> distances; + std::vector> distances; for (auto it2 = std::next(it); it2 != readMappings.end(); it2++) { //If this mapping is for the same segment, ignore if (it2->refSeqId == it->refSeqId && it2->queryStartPos == it->queryStartPos) { @@ -1628,29 +1637,27 @@ namespace skch } //If the next mapping is within range, check if it's in range and if (it2->strand == it->strand) { - int ref_dist = it2->refStartPos - it->refEndPos; - int query_dist = 0; + int64_t ref_dist = it2->refStartPos - it->refEndPos; + int64_t query_dist = std::numeric_limits::max(); auto dist = std::numeric_limits::max(); - auto score = std::numeric_limits::max(); + auto awed = std::numeric_limits::max(); if (it->strand == strnd::FWD && it->queryStartPos <= it2->queryStartPos) { query_dist = it2->queryStartPos - it->queryEndPos; dist = std::sqrt(std::pow(query_dist,2) + std::pow(ref_dist,2)); - score = std::pow(query_dist - ref_dist, 2); + awed = axis_weighted_euclidean_distance(query_dist, ref_dist, 0.9); } else if (it->strand != strnd::FWD && it->queryEndPos >= it2->queryEndPos) { query_dist = it->queryStartPos - it2->queryEndPos; dist = std::sqrt(std::pow(query_dist,2) + std::pow(ref_dist,2)); - score = std::pow(query_dist - ref_dist, 2); + awed = axis_weighted_euclidean_distance(query_dist, ref_dist, 0.9); } - int query_mapping_len = std::min((it->queryEndPos - it->queryStartPos), - (it2->queryEndPos - it2->queryStartPos)); - if (dist < max_dist) { - distances.push_back(std::make_pair(dist + score, it2->splitMappingId)); + if (awed < max_dist) { + distances.push_back(std::make_tuple(awed, dist, it2->splitMappingId)); } } } if (distances.size()) { std::sort(distances.begin(), distances.end()); - disjoint_sets.unite(it->splitMappingId, distances.front().second); + disjoint_sets.unite(it->splitMappingId, std::get<2>(distances.front())); } } @@ -1680,8 +1687,18 @@ namespace skch return std::tie(a.queryStartPos, a.refStartPos) < std::tie(b.queryStartPos, b.refStartPos); }); + // if we have an infinite max mappinng length, we should just emit the chain here + if (param.max_mapping_length == std::numeric_limits::max()) { + // Process any remaining fragment + processMappingFragment(it, it_end); + it = it_end; + continue; + } + // tweak start and end positions of consecutive mappings - adjustConsecutiveMappings(it, it_end, param.segLength); + // TODO: XXX double check that the consecutive mappings are not overlapping!!! + // extra: force global alignment by patching head and tail to mapping start and end coordinates + //adjustConsecutiveMappings(it, it_end, param.segLength); // First pass: Mark cuttable positions const int consecutive_mappings_window = 4; // Configurable parameter @@ -1699,13 +1716,15 @@ namespace skch if (current == it || current == std::prev(it_end)) { continue; } - if (current->queryStartPos - std::prev(current)->queryEndPos > param.segLength - || current->refStartPos - std::prev(current)->refEndPos > param.segLength) { + if (current->queryStartPos - std::prev(current)->queryEndPos > param.segLength / 5 + || current->refStartPos - std::prev(current)->refEndPos > param.segLength / 5) { is_cuttable[std::distance(it, current) - 1] = false; is_cuttable[std::distance(it, current)] = false; } } + adjustConsecutiveMappings(it, it_end, param.segLength); + auto fragment_start = it; auto current = it; offset_t accumulate_length = 0; @@ -1716,6 +1735,9 @@ namespace skch accumulate_length += current->queryEndPos - current->queryStartPos; if (accumulate_length >= param.max_mapping_length && is_cuttable[std::distance(it, current)]) { + if (current != fragment_start) { + adjustConsecutiveMappings(std::prev(current), current, param.chain_gap); + } processMappingFragment(fragment_start, current); fragment_start = current; accumulate_length = 0; @@ -1853,6 +1875,8 @@ namespace skch if (!param.mergeMappings) { outstrm << sep << "jc:f:" << float(e.conservedSketches) / e.sketchSize; + } else { + outstrm << sep << "chain:i:" << e.splitMappingId; } } else {