From 5a3a343642f19f4b696508849bc1b8c860c44508 Mon Sep 17 00:00:00 2001 From: luissian Date: Sun, 7 Apr 2024 22:52:26 +0200 Subject: [PATCH] Added extension sequences when start codon is not found because is trunk --- taranis/allele_calling.py | 190 +++++++++++++++++++++++++++++--------- taranis/utils.py | 35 ++++--- 2 files changed, 162 insertions(+), 63 deletions(-) diff --git a/taranis/allele_calling.py b/taranis/allele_calling.py index cb886c3..efe4979 100644 --- a/taranis/allele_calling.py +++ b/taranis/allele_calling.py @@ -56,7 +56,9 @@ def __init__( """ self.prediction_data = annotation # store prediction annotation self.sample_file = sample_file - self.sample_records = taranis.utils.read_fasta_file(self.sample_file, convert_to_dict=True) + self.sample_records = taranis.utils.read_fasta_file( + self.sample_file, convert_to_dict=True + ) self.schema = schema self.ref_alleles = reference_alleles self.threshold = threshold @@ -75,7 +77,11 @@ def __init__( self.increase_sequence = increase_sequence * 3 def assign_allele_type( - self, blast_results: list, allele_file: str, allele_name: str, ref_allele_seq: str + self, + blast_results: list, + allele_file: str, + allele_name: str, + ref_allele_seq: str, ) -> list: """Assign allele type to the allele @@ -90,10 +96,7 @@ def assign_allele_type( details """ - def add_sequences_(column_blast_res: list) -> bool: - pass - - def check_if_plot(column_blast_res: list) -> bool: + def _check_if_plot(column_blast_res: list) -> bool: """Check if allele is partial length Args: @@ -113,7 +116,7 @@ def check_if_plot(column_blast_res: list) -> bool: return True return False - def discard_low_threshold_results(blast_results: list) -> list: + def _discard_low_threshold_results(blast_results: list) -> list: """Discard blast results with lower threshold Args: @@ -131,16 +134,31 @@ def discard_low_threshold_results(blast_results: list) -> list: valid_blast_result.append(b_result) return valid_blast_result - def extend_sequence_for_finding_stop_codon(split_blast_result: list) -> list: + def _extend_sequence_for_finding_start_stop_codon( + split_blast_result: list, + prot_error_result: str, + predicted_prot_seq: str, + search_codon: str = "stop", + ) -> list: """Extend match sequence, according the (increase_sequence) for - trying find the stop codon. + trying find the stop or start codon. When parameter is set to + stop additional nucleotides are added to extend the chance to + find out the codon stop. + If parameter is set to start then additional nucleotide is added + on the start value to identify that is a valid start codon. If + true then additional nucletotides are added to find the stop codon. Args: split_blast_result (list): list having the informaction collected - by running blast + by running blast + prot_error_result (str): protein conversion result + predicted_prot_seq (str): predicted protein sequence + search_codon (str, optional): codon to be found. 2 values are + allowed start of stop. By default is stop. Returns: - list: updated information if stop codon is found + list: updated information if stop or start codon is found and the + updated protein sequence and protein conversion result if changed """ # collect data for checking PLOT data_for_plot = [""] * 10 @@ -151,7 +169,7 @@ def extend_sequence_for_finding_stop_codon(split_blast_result: list) -> list: # copy end position data_for_plot[9] = split_blast_result[10] # check if PLOT - if not check_if_plot(data_for_plot): + if not _check_if_plot(data_for_plot): # fetch the sequence until the last triplet is stop codon contig_seq = self.sample_records[split_blast_result[1]] start_seq = int(split_blast_result[9]) @@ -159,6 +177,16 @@ def extend_sequence_for_finding_stop_codon(split_blast_result: list) -> list: if stop_seq > start_seq: # sequence direction is forward direction = "forward" + if search_codon == "start": + if ( + contig_seq[start_seq - 2 : start_seq + 1] + in taranis.utils.START_CODON_FORWARD + ): + start_seq -= 2 + # continue to find the stop codon with the new start + else: + # start codon not found. Return the original blast result + return split_blast_result # adjust the sequence to be a triplet interval = (stop_seq - start_seq) // 3 * 3 new_stop_seq = start_seq + interval + self.increase_sequence @@ -169,11 +197,21 @@ def extend_sequence_for_finding_stop_codon(split_blast_result: list) -> list: if stop_seq > len(contig_seq): stop_seq = len(contig_seq) // 3 * 3 else: - stop_seq = new_stop_seq -1 + stop_seq = new_stop_seq - 1 c_sequence = contig_seq[start_seq:stop_seq] else: # sequence direction is reverse direction = "reverse" + if search_codon == "start": + if ( + contig_seq[start_seq - 2 : start_seq + 1] + in taranis.utils.START_CODON_REVERSE + ): + start_seq += 1 + # continue to find the stop codon with the new start + else: + # start codon not found. Return the original blast result + return split_blast_result # adjust the sequence to be a triplet interval = (start_seq - stop_seq) // 3 * 3 new_stop_seq = start_seq - interval - self.increase_sequence @@ -186,28 +224,42 @@ def extend_sequence_for_finding_stop_codon(split_blast_result: list) -> list: else: stop_seq = new_stop_seq # get the sequence in reverse - c_sequence = str(Seq(contig_seq[stop_seq:start_seq]).reverse_complement()) - new_prot_conv_result = taranis.utils.convert_to_protein(c_sequence, force_coding=False, check_additional_bases=False) + c_sequence = str( + Seq(contig_seq[stop_seq:start_seq]).reverse_complement() + ) + new_prot_conv_result = taranis.utils.convert_to_protein( + c_sequence, force_coding=False, check_additional_bases=False + ) # check if stop codon is found in protein sequence - # pdb.set_trace() - if "protein" in new_prot_conv_result and "*" in new_prot_conv_result["protein"]: - new_seq_length = new_prot_conv_result["protein"].index("*") * 3 + + if ( + "protein" in new_prot_conv_result + and "*" in new_prot_conv_result["protein"] + ): + # increase 3 nucleotides beecause index start at 0 + new_seq_length = new_prot_conv_result["protein"].index("*") * 3 + 3 match_sequence = c_sequence[:new_seq_length] split_blast_result[4] = str(new_seq_length) + split_blast_result[14] = match_sequence prot_error_result = "-" + predicted_prot_seq = new_prot_conv_result["protein"][ + 0 : new_seq_length // 3 + ] # update the start and stop position - if direction == "forward": - # pdb.set_trace() - split_blast_result[10] = str(int(split_blast_result[9]) + new_seq_length) + split_blast_result[10] = str( + int(split_blast_result[9]) + new_seq_length + ) else: - split_blast_result[9] = str(int(split_blast_result[10]) - new_seq_length) - # ignore the previous process if stop codon is not found - - # pdb.set_trace() - return split_blast_result + split_blast_result[9] = str( + int(split_blast_result[10]) - new_seq_length + ) + # ignore the previous process if stop codon is not found + return split_blast_result, prot_error_result, predicted_prot_seq - def get_blast_details(blast_result: str, allele_name: str, ref_allele_seq) -> list: + def _get_blast_details( + blast_result: str, allele_name: str, ref_allele_seq + ) -> list: """Collect blast details and modify the order of the columns Args: @@ -254,20 +306,44 @@ def get_blast_details(blast_result: str, allele_name: str, ref_allele_seq) -> li # remove the gaps in sequences match_sequence = split_blast_result[13].replace("-", "") # check if the sequence is coding - prot_conv_result = taranis.utils.convert_to_protein(match_sequence, force_coding=False, check_additional_bases=True) - prot_error_result = prot_conv_result["error"] if "error" in prot_conv_result else "-" - predicted_prot_seq = prot_conv_result["protein"] if "protein" in prot_conv_result else "-" + prot_conv_result = taranis.utils.convert_to_protein( + match_sequence, force_coding=False, check_additional_bases=True + ) + prot_error_result = ( + prot_conv_result["error"] if "error" in prot_conv_result else "-" + ) + predicted_prot_seq = ( + prot_conv_result["protein"] if "protein" in prot_conv_result else "-" + ) # remove if additional sequenced are added at the end of the stop codon - pdb.set_trace() + # pdb.set_trace() if "additional bases added after stop codon" in prot_error_result: new_seq_len = len(match_sequence) // 3 * 3 match_sequence = match_sequence[:new_seq_len] split_blast_result[4] = str(new_seq_len) # add more sequence to find the stop codon elif "Last sequence is not a stop codon" in prot_error_result: - split_blast_result = extend_sequence_for_finding_stop_codon(split_blast_result) + ( + split_blast_result, + prot_error_result, + predicted_prot_seq, + ) = _extend_sequence_for_finding_start_stop_codon( + split_blast_result, + prot_error_result, + predicted_prot_seq, + search_codon="stop", + ) elif "Sequence does not have a start codon" in prot_error_result: - pass + ( + split_blast_result, + prot_error_result, + predicted_prot_seq, + ) = _extend_sequence_for_finding_start_stop_codon( + split_blast_result, + prot_error_result, + predicted_prot_seq, + search_codon="start", + ) # get blast details blast_details = [ self.s_name, # sample name @@ -309,17 +385,19 @@ def find_match_allele_schema(allele_file: str, match_sequence: str) -> str: return grep_result[0].split("_")[1] return "" - valid_blast_results = discard_low_threshold_results(blast_results) + valid_blast_results = _discard_low_threshold_results(blast_results) match_allele_schema = "" if len(valid_blast_results) == 0: # no match results labelled as LNF. details data filled with empty data - return ["LNF", "-", ["-"]* 18] + return ["LNF", "-", ["-"] * 18] if len(valid_blast_results) > 1: # could be NIPHEM or NIPH b_split_data = [] match_allele_seq = [] for valid_blast_result in valid_blast_results: - multi_allele_data = get_blast_details(valid_blast_result, allele_name, ref_allele_seq) + multi_allele_data = _get_blast_details( + valid_blast_result, allele_name, ref_allele_seq + ) # get match allele sequence match_allele_seq.append(multi_allele_data[14]) b_split_data.append(multi_allele_data) @@ -339,33 +417,42 @@ def find_match_allele_schema(allele_file: str, match_sequence: str) -> str: for (idx,) in range(len(b_split_data)): b_split_data[idx][4] = classification + "_" + match_allele_schema else: - b_split_data = get_blast_details(valid_blast_results[0], allele_name, ref_allele_seq) + b_split_data = _get_blast_details( + valid_blast_results[0], allele_name, ref_allele_seq + ) # found the allele in schema with the match sequence in the contig match_allele_schema = find_match_allele_schema( - allele_file, b_split_data[14] + allele_file, b_split_data[14] ) # PLOT, TPR, ASM, ALM, INF, EXC are possible classifications if match_allele_schema != "": # exact match found labelled as EXC classification = "EXC" - elif check_if_plot(b_split_data): + elif _check_if_plot(b_split_data): # match allele is partial length labelled as PLOT classification = "PLOT" # check if protein length divided by the length of triplet matched # sequence is lower the the tpr limit - elif b_split_data[16] == "Multiple stop codons" and b_split_data[17].index("*") / (int(b_split_data[6]) / 3) < self.tpr_limit: + elif ( + b_split_data[16] == "Multiple stop codons" + and b_split_data[17].index("*") / (int(b_split_data[6]) / 3) + < self.tpr_limit + ): # labelled as TPR classification = "TPR" # check if match allele is shorter than reference allele elif int(b_split_data[6]) < int(b_split_data[5]): classification = "ASM" # check if match allele is longer than reference allele - elif int(b_split_data[6]) > int(b_split_data[5]): + elif ( + int(b_split_data[6]) > int(b_split_data[5]) + or b_split_data[16] == "Last sequence is not a stop codon" + ): classification = "ALM" else: # if sequence was not found after running grep labelled as INF classification = "INF" - + # assign an identification value to the new allele if match_allele_schema == "": match_allele_schema = str( @@ -434,7 +521,9 @@ def search_match_allele(self): result["allele_type"][allele_name], result["allele_match"][allele_name], result["allele_details"][allele_name], - ) = self.assign_allele_type(blast_result, allele_file, allele_name, r_seq) + ) = self.assign_allele_type( + blast_result, allele_file, allele_name, r_seq + ) else: # Sample does not have a reference allele to be matched # Keep LNF info @@ -453,6 +542,7 @@ def search_match_allele(self): if self.snp_request and result["allele_type"][allele_name] != "LNF": # run snp analysis print(allele_name) + # pdb.set_trace() result["snp_data"][allele_name] = taranis.utils.get_snp_information( ref_allele_seq, allele_seq, ref_allele_name ) @@ -494,7 +584,7 @@ def parallel_execution( snp_request, aligment_request, trp_limit, - increase_sequence + increase_sequence, ) sample_name = Path(sample_file).stem stderr.print(f"[green] Analyzing sample {sample_name}") @@ -512,7 +602,17 @@ def collect_data( def stats_graphics(stats_folder: str, summary_result: dict) -> None: stderr.print("Creating graphics") log.info("Creating graphics") - allele_types = ["NIPHEM", "NIPH", "EXC", "PLOT", "ASM", "ALM", "INF", "LNF", "TPR"] + allele_types = [ + "NIPHEM", + "NIPH", + "EXC", + "PLOT", + "ASM", + "ALM", + "INF", + "LNF", + "TPR", + ] # inizialize classification data classif_data = {} for allele_type in allele_types: diff --git a/taranis/utils.py b/taranis/utils.py index 43beb10..4e94f32 100644 --- a/taranis/utils.py +++ b/taranis/utils.py @@ -30,6 +30,7 @@ from Bio import BiopythonWarning import pdb + log = logging.getLogger(__name__) @@ -117,7 +118,10 @@ def check_additional_programs_installed(software_list: list) -> None: sys.exit(1) return -def convert_to_protein(sequence: str, force_coding: bool =False, check_additional_bases: bool = False) -> dict: + +def convert_to_protein( + sequence: str, force_coding: bool = False, check_additional_bases: bool = False +) -> dict: """Check if the input sequence is a coding protein. Args: @@ -134,14 +138,14 @@ def convert_to_protein(sequence: str, force_coding: bool =False, check_additiona return {"error": "Sequence does not have a start codon"} if len(sequence) % 3 != 0: if not check_additional_bases: - return {"error" : "Sequence is not a multiple of three"} + return {"error": "Sequence is not a multiple of three"} # Remove the last or second to last bases to check if there is a stop codon new_seq_len = len(sequence) // 3 * 3 sequence = sequence[:new_seq_len] # this error will be overwritten if another error is found conv_result["error"] = "additional bases added after stop codon" - seq_sequence = Seq(sequence) + seq_sequence = Seq(sequence) try: seq_prot = seq_sequence.translate(table=1, cds=force_coding) except Bio.Data.CodonTable.TranslationError as e: @@ -153,14 +157,13 @@ def convert_to_protein(sequence: str, force_coding: bool =False, check_additiona if not force_coding: first_stop = seq_prot.find("*") if first_stop != last_stop: - return {"error": "Multiple stop codons","protein": str(seq_prot)} + return {"error": "Multiple stop codons", "protein": str(seq_prot)} if last_stop != len(seq_prot) - 1: return {"error": "Last sequence is not a stop codon", "protein": str(seq_prot)} if "error" in conv_result: conv_result["protein"] = str(seq_prot) return conv_result - return {"protein": str(seq_prot)} - + return {"protein": str(seq_prot)} def create_annotation_files( @@ -425,22 +428,15 @@ def get_snp_information( dict: key: ref_sequence, value: list of snp information """ # Supress warning that len of alt sequence not a multiple of three - warnings.simplefilter('ignore', BiopythonWarning) + warnings.simplefilter("ignore", BiopythonWarning) snp_info = {} ref_protein = str(Seq(ref_sequence).translate()) - try: - alt_protein = str(Seq(alt_sequence).translate()) - except Exception as e: - import pdb; pdb.set_trace() - - if len(alt_sequence) %3 != 0: - import pdb; pdb.set_trace() + alt_protein = str(Seq(alt_sequence).translate()) snp_line = [] # get the shortest sequence for the loop length_for_snp = min(len(ref_sequence), len(alt_sequence)) for idx in range(length_for_snp): - if ref_sequence[idx] != alt_sequence[idx]: # calculate the triplet index triplet_idx = idx // 3 @@ -449,7 +445,10 @@ def get_snp_information( alt_triplet = alt_sequence[triplet_idx * 3 : triplet_idx * 3 + 3] # get amino acid change ref_aa = ref_protein[triplet_idx] - alt_aa = alt_protein[triplet_idx] + try: + alt_aa = alt_protein[triplet_idx] + except: + alt_aa = "-" # get amino acid category ref_category = map_amino_acid_to_annotation(ref_sequence[triplet_idx]) alt_category = map_amino_acid_to_annotation(alt_sequence[triplet_idx]) @@ -679,8 +678,8 @@ def read_fasta_file(fasta_file: str, convert_to_dict=False) -> dict | str: if convert_to_dict: with open(fasta_file, "r") as fh: for record in SeqIO.parse(fh, "fasta"): - conv_fasta[record.id] = str(record.seq) - return conv_fasta + conv_fasta[record.id] = str(record.seq) + return conv_fasta return SeqIO.parse(fasta_file, "fasta")