Commit

Added sequence extension when the start codon is not found because the sequence is truncated
luissian committed Apr 7, 2024
1 parent 5629240 commit 5a3a343
Showing 2 changed files with 162 additions and 63 deletions.
190 changes: 145 additions & 45 deletions taranis/allele_calling.py
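In outline, the commit teaches taranis to extend a BLAST match not only when the stop codon is missing but also when convert_to_protein reports a missing start codon: the former stop-codon helper is renamed and gains a search_codon parameter. A minimal sketch of the new dispatch follows; the error strings and the search_codon values are taken from the diff, while the wrapper function itself is hypothetical.

# Illustrative sketch only: routes protein-conversion errors the way the new
# branches in _get_blast_details do; "extend" stands in for
# _extend_sequence_for_finding_start_stop_codon.
def route_protein_error(split_blast_result, prot_error_result, predicted_prot_seq, extend):
    if "Last sequence is not a stop codon" in prot_error_result:
        # existing behaviour: grow the match until a stop codon appears
        return extend(split_blast_result, prot_error_result, predicted_prot_seq, search_codon="stop")
    if "Sequence does not have a start codon" in prot_error_result:
        # new in this commit: verify the start codon first, then look for the stop codon
        return extend(split_blast_result, prot_error_result, predicted_prot_seq, search_codon="start")
    return split_blast_result, prot_error_result, predicted_prot_seq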
@@ -56,7 +56,9 @@ def __init__(
"""
self.prediction_data = annotation # store prediction annotation
self.sample_file = sample_file
self.sample_records = taranis.utils.read_fasta_file(self.sample_file, convert_to_dict=True)
self.sample_records = taranis.utils.read_fasta_file(
self.sample_file, convert_to_dict=True
)
self.schema = schema
self.ref_alleles = reference_alleles
self.threshold = threshold
@@ -75,7 +77,11 @@ def __init__(
self.increase_sequence = increase_sequence * 3

def assign_allele_type(
self, blast_results: list, allele_file: str, allele_name: str, ref_allele_seq: str
self,
blast_results: list,
allele_file: str,
allele_name: str,
ref_allele_seq: str,
) -> list:
"""Assign allele type to the allele
@@ -90,10 +96,7 @@ def assign_allele_type(
details
"""

def add_sequences_(column_blast_res: list) -> bool:
pass

def check_if_plot(column_blast_res: list) -> bool:
def _check_if_plot(column_blast_res: list) -> bool:
"""Check if allele is partial length
Args:
@@ -113,7 +116,7 @@ def check_if_plot(column_blast_res: list) -> bool:
return True
return False

def discard_low_threshold_results(blast_results: list) -> list:
def _discard_low_threshold_results(blast_results: list) -> list:
"""Discard blast results with lower threshold
Args:
@@ -131,16 +134,31 @@ def discard_low_threshold_results(blast_results: list) -> list:
valid_blast_result.append(b_result)
return valid_blast_result
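For context, a minimal standalone sketch of the filtering that _discard_low_threshold_results performs, assuming tab-separated BLAST output with percent identity in the third column (the column index is an assumption, not taken from the diff):

def discard_low_threshold_results(blast_results, threshold):
    valid_blast_result = []
    for b_result in blast_results:
        columns = b_result.split("\t")
        # assumed position of the percent-identity column
        if float(columns[2]) >= threshold:
            valid_blast_result.append(b_result)
    return valid_blast_result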

def extend_sequence_for_finding_stop_codon(split_blast_result: list) -> list:
def _extend_sequence_for_finding_start_stop_codon(
split_blast_result: list,
prot_error_result: str,
predicted_prot_seq: str,
search_codon: str = "stop",
) -> list:
"""Extend match sequence, according the (increase_sequence) for
trying find the stop codon.
trying find the stop or start codon. When parameter is set to
stop additional nucleotides are added to extend the chance to
find out the codon stop.
If parameter is set to start then additional nucleotide is added
on the start value to identify that is a valid start codon. If
true then additional nucletotides are added to find the stop codon.
Args:
split_blast_result (list): list having the information collected
by running blast
by running blast
prot_error_result (str): protein conversion result
predicted_prot_seq (str): predicted protein sequence
search_codon (str, optional): codon to be found. Two values are
allowed: start or stop. Defaults to stop.
Returns:
list: updated information if stop codon is found
list: updated information if the stop or start codon is found, plus the
updated protein sequence and protein conversion result if they changed
"""
# collect data for checking PLOT
data_for_plot = [""] * 10
@@ -151,14 +169,24 @@ def extend_sequence_for_finding_stop_codon(split_blast_result: list) -> list:
# copy end position
data_for_plot[9] = split_blast_result[10]
# check if PLOT
if not check_if_plot(data_for_plot):
if not _check_if_plot(data_for_plot):
# fetch the sequence until the last triplet is a stop codon
contig_seq = self.sample_records[split_blast_result[1]]
start_seq = int(split_blast_result[9])
stop_seq = int(split_blast_result[10])
if stop_seq > start_seq:
# sequence direction is forward
direction = "forward"
if search_codon == "start":
if (
contig_seq[start_seq - 2 : start_seq + 1]
in taranis.utils.START_CODON_FORWARD
):
start_seq -= 2
# continue to find the stop codon with the new start
else:
# start codon not found. Return the original blast result
return split_blast_result
# adjust the sequence to be a triplet
interval = (stop_seq - start_seq) // 3 * 3
new_stop_seq = start_seq + interval + self.increase_sequence
@@ -169,11 +197,21 @@ def extend_sequence_for_finding_stop_codon(split_blast_result: list) -> list:
if stop_seq > len(contig_seq):
stop_seq = len(contig_seq) // 3 * 3
else:
stop_seq = new_stop_seq -1
stop_seq = new_stop_seq - 1
c_sequence = contig_seq[start_seq:stop_seq]
else:
# sequence direction is reverse
direction = "reverse"
if search_codon == "start":
if (
contig_seq[start_seq - 2 : start_seq + 1]
in taranis.utils.START_CODON_REVERSE
):
start_seq += 1
# continue to find the stop codon with the new start
else:
# start codon not found. Return the original blast result
return split_blast_result
# adjust the sequence to be a triplet
interval = (start_seq - stop_seq) // 3 * 3
new_stop_seq = start_seq - interval - self.increase_sequence
@@ -186,28 +224,42 @@ def extend_sequence_for_finding_stop_codon(split_blast_result: list) -> list:
else:
stop_seq = new_stop_seq
# get the sequence in reverse
c_sequence = str(Seq(contig_seq[stop_seq:start_seq]).reverse_complement())
new_prot_conv_result = taranis.utils.convert_to_protein(c_sequence, force_coding=False, check_additional_bases=False)
c_sequence = str(
Seq(contig_seq[stop_seq:start_seq]).reverse_complement()
)
new_prot_conv_result = taranis.utils.convert_to_protein(
c_sequence, force_coding=False, check_additional_bases=False
)
# check if stop codon is found in protein sequence
# pdb.set_trace()
if "protein" in new_prot_conv_result and "*" in new_prot_conv_result["protein"]:
new_seq_length = new_prot_conv_result["protein"].index("*") * 3

if (
"protein" in new_prot_conv_result
and "*" in new_prot_conv_result["protein"]
):
# increase 3 nucleotides because index starts at 0
new_seq_length = new_prot_conv_result["protein"].index("*") * 3 + 3
match_sequence = c_sequence[:new_seq_length]
split_blast_result[4] = str(new_seq_length)
split_blast_result[14] = match_sequence
prot_error_result = "-"
predicted_prot_seq = new_prot_conv_result["protein"][
0 : new_seq_length // 3
]
# update the start and stop position

if direction == "forward":
# pdb.set_trace()
split_blast_result[10] = str(int(split_blast_result[9]) + new_seq_length)
split_blast_result[10] = str(
int(split_blast_result[9]) + new_seq_length
)
else:
split_blast_result[9] = str(int(split_blast_result[10]) - new_seq_length)
# ignore the previous process if stop codon is not found

# pdb.set_trace()
return split_blast_result
split_blast_result[9] = str(
int(split_blast_result[10]) - new_seq_length
)
# ignore the previous process if stop codon is not found
return split_blast_result, prot_error_result, predicted_prot_seq
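Stripped of the BLAST bookkeeping, the new start-codon handling in the helper above amounts to checking the three bases at the match boundary against a forward or reverse codon table and, only if they form a start codon, extending the slice in whole triplets to look for a stop codon; otherwise the original BLAST result is returned unchanged. The sketch below is a simplified illustration: the codon sets and the boundary arithmetic are assumptions, not the exact values behind taranis.utils.START_CODON_FORWARD / START_CODON_REVERSE.

# Assumed codon tables for illustration; taranis.utils defines its own constants.
START_CODON_FORWARD = {"ATG", "GTG", "TTG"}
START_CODON_REVERSE = {"CAT", "CAC", "CAA"}  # reverse complements as read on the contig

def has_start_codon(contig_seq: str, start: int, direction: str) -> bool:
    # look at the three bases at the boundary of the BLAST match
    if direction == "forward":
        return contig_seq[start : start + 3] in START_CODON_FORWARD
    return contig_seq[start - 3 : start] in START_CODON_REVERSE

def extend_in_triplets(contig_seq: str, start: int, increase: int) -> str:
    # keep the extension a multiple of 3, mirroring the // 3 * 3 arithmetic in the diff
    end = min(start + increase, len(contig_seq))
    end = start + (end - start) // 3 * 3
    return contig_seq[start:end]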

def get_blast_details(blast_result: str, allele_name: str, ref_allele_seq) -> list:
def _get_blast_details(
blast_result: str, allele_name: str, ref_allele_seq
) -> list:
"""Collect blast details and modify the order of the columns
Args:
@@ -254,20 +306,44 @@ def get_blast_details(blast_result: str, allele_name: str, ref_allele_seq) -> list:
# remove the gaps in sequences
match_sequence = split_blast_result[13].replace("-", "")
# check if the sequence is coding
prot_conv_result = taranis.utils.convert_to_protein(match_sequence, force_coding=False, check_additional_bases=True)
prot_error_result = prot_conv_result["error"] if "error" in prot_conv_result else "-"
predicted_prot_seq = prot_conv_result["protein"] if "protein" in prot_conv_result else "-"
prot_conv_result = taranis.utils.convert_to_protein(
match_sequence, force_coding=False, check_additional_bases=True
)
prot_error_result = (
prot_conv_result["error"] if "error" in prot_conv_result else "-"
)
predicted_prot_seq = (
prot_conv_result["protein"] if "protein" in prot_conv_result else "-"
)
# remove additional sequence if it was added after the end of the stop codon
pdb.set_trace()
# pdb.set_trace()
if "additional bases added after stop codon" in prot_error_result:
new_seq_len = len(match_sequence) // 3 * 3
match_sequence = match_sequence[:new_seq_len]
split_blast_result[4] = str(new_seq_len)
# add more sequence to find the stop codon
elif "Last sequence is not a stop codon" in prot_error_result:
split_blast_result = extend_sequence_for_finding_stop_codon(split_blast_result)
(
split_blast_result,
prot_error_result,
predicted_prot_seq,
) = _extend_sequence_for_finding_start_stop_codon(
split_blast_result,
prot_error_result,
predicted_prot_seq,
search_codon="stop",
)
elif "Sequence does not have a start codon" in prot_error_result:
pass
(
split_blast_result,
prot_error_result,
predicted_prot_seq,
) = _extend_sequence_for_finding_start_stop_codon(
split_blast_result,
prot_error_result,
predicted_prot_seq,
search_codon="start",
)
# get blast details
blast_details = [
self.s_name, # sample name
@@ -309,17 +385,19 @@ def find_match_allele_schema(allele_file: str, match_sequence: str) -> str:
return grep_result[0].split("_")[1]
return ""

valid_blast_results = discard_low_threshold_results(blast_results)
valid_blast_results = _discard_low_threshold_results(blast_results)
match_allele_schema = ""
if len(valid_blast_results) == 0:
# no match results: labelled as LNF, details filled with empty data
return ["LNF", "-", ["-"]* 18]
return ["LNF", "-", ["-"] * 18]
if len(valid_blast_results) > 1:
# could be NIPHEM or NIPH
b_split_data = []
match_allele_seq = []
for valid_blast_result in valid_blast_results:
multi_allele_data = get_blast_details(valid_blast_result, allele_name, ref_allele_seq)
multi_allele_data = _get_blast_details(
valid_blast_result, allele_name, ref_allele_seq
)
# get match allele sequence
match_allele_seq.append(multi_allele_data[14])
b_split_data.append(multi_allele_data)
@@ -339,33 +417,42 @@ def find_match_allele_schema(allele_file: str, match_sequence: str) -> str:
for idx in range(len(b_split_data)):
b_split_data[idx][4] = classification + "_" + match_allele_schema
else:
b_split_data = get_blast_details(valid_blast_results[0], allele_name, ref_allele_seq)
b_split_data = _get_blast_details(
valid_blast_results[0], allele_name, ref_allele_seq
)
# found the allele in schema with the match sequence in the contig
match_allele_schema = find_match_allele_schema(
allele_file, b_split_data[14]
allele_file, b_split_data[14]
)
# PLOT, TPR, ASM, ALM, INF, EXC are possible classifications
if match_allele_schema != "":
# exact match found labelled as EXC
classification = "EXC"
elif check_if_plot(b_split_data):
elif _check_if_plot(b_split_data):
# match allele is partial length labelled as PLOT
classification = "PLOT"
# check if protein length divided by the length of triplet matched
# sequence is lower than the tpr limit
elif b_split_data[16] == "Multiple stop codons" and b_split_data[17].index("*") / (int(b_split_data[6]) / 3) < self.tpr_limit:
elif (
b_split_data[16] == "Multiple stop codons"
and b_split_data[17].index("*") / (int(b_split_data[6]) / 3)
< self.tpr_limit
):
# labelled as TPR
classification = "TPR"
# check if match allele is shorter than reference allele
elif int(b_split_data[6]) < int(b_split_data[5]):
classification = "ASM"
# check if match allele is longer than reference allele
elif int(b_split_data[6]) > int(b_split_data[5]):
elif (
int(b_split_data[6]) > int(b_split_data[5])
or b_split_data[16] == "Last sequence is not a stop codon"
):
classification = "ALM"
else:
# if the sequence was not found after running grep, label as INF
classification = "INF"

# assign an identification value to the new allele
if match_allele_schema == "":
match_allele_schema = str(
Expand Down Expand Up @@ -434,7 +521,9 @@ def search_match_allele(self):
result["allele_type"][allele_name],
result["allele_match"][allele_name],
result["allele_details"][allele_name],
) = self.assign_allele_type(blast_result, allele_file, allele_name, r_seq)
) = self.assign_allele_type(
blast_result, allele_file, allele_name, r_seq
)
else:
# Sample does not have a reference allele to be matched
# Keep LNF info
@@ -453,6 +542,7 @@ def search_match_allele(self):
if self.snp_request and result["allele_type"][allele_name] != "LNF":
# run snp analysis
print(allele_name)
# pdb.set_trace()
result["snp_data"][allele_name] = taranis.utils.get_snp_information(
ref_allele_seq, allele_seq, ref_allele_name
)
@@ -494,7 +584,7 @@ def parallel_execution(
snp_request,
aligment_request,
trp_limit,
increase_sequence
increase_sequence,
)
sample_name = Path(sample_file).stem
stderr.print(f"[green] Analyzing sample {sample_name}")
@@ -512,7 +602,17 @@ def collect_data(
def stats_graphics(stats_folder: str, summary_result: dict) -> None:
stderr.print("Creating graphics")
log.info("Creating graphics")
allele_types = ["NIPHEM", "NIPH", "EXC", "PLOT", "ASM", "ALM", "INF", "LNF", "TPR"]
allele_types = [
"NIPHEM",
"NIPH",
"EXC",
"PLOT",
"ASM",
"ALM",
"INF",
"LNF",
"TPR",
]
# initialize classification data
classif_data = {}
for allele_type in allele_types:
Expand Down
Loading
