diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8b2c1bf3..3ce5e7dd 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -26,6 +26,7 @@ Thank you to everyone else that has contributed by reporting bugs, enhancements
 - [[PR #401](https://github.com/nf-core/viralrecon/pull/401)] - Added option to add a custom annotation
 - [[PR #417](https://github.com/nf-core/viralrecon/pull/417)] - Allow skipping of Freyja bootstrapping module & freyja module update
 - [[PR #434](https://github.com/nf-core/viralrecon/pull/434)] - Add blast result filtering through `min_contig_length` and `min_perc_contig_aligned`.
+- [[PR #]()] - Refactored the old ivar_variants_to_vcf.py script, adding new functionality such as a quality threshold.

 ### Parameters

diff --git a/bin/ivar_variants_to_vcf.py b/bin/ivar_variants_to_vcf.py
index d00b3988..7343bf3a 100755
--- a/bin/ivar_variants_to_vcf.py
+++ b/bin/ivar_variants_to_vcf.py
@@ -1,17 +1,16 @@
 #!/usr/bin/env python
-from email.charset import QP
 import os
 import sys
-import re
 import errno
 import argparse
-from collections import OrderedDict
-from collections import deque
+import warnings
+from itertools import product

 import numpy as np
 from Bio import SeqIO
 from scipy.stats import fisher_exact
+import pandas as pd


 def parse_args(args=None):
@@ -24,8 +23,8 @@ def parse_args(args=None):
     parser.add_argument(
         "-po",
         "--pass_only",
-        help="Only output variants that PASS filters.",
         action="store_true",
+        help="Only output variants that PASS filters.",
     )
     parser.add_argument(
         "-af",
@@ -34,18 +33,38 @@ def parse_args(args=None):
         default=0,
         help="Only output variants where allele frequency is greater than this number (default: 0).",
     )
+    parser.add_argument(
+        "-bq",
+        "--bad_quality_threshold",
+        type=int,
+        default=20,
+        help="Only output variants where ALT_QUAL is greater than this number (default: 20).",
+    )
+    parser.add_argument(
+        "-mt",
+        "--merge_af_threshold",
+        type=float,
+        default=0.25,
+        help="Only merge variants within a range of Allele Frequency. Useful to distinguish haplotypes.",
+    )
+    parser.add_argument(
+        "-cf",
+        "--consensus_af",
+        type=float,
+        default=0.75,
+        help="Allele Frequency threshold used to include variants in the consensus sequence.",
+    )
     parser.add_argument(
         "-is",
         "--ignore_strand_bias",
-        default=False,
-        help="Does not take strand bias into account, use this option when not using amplicon sequencing.",
         action="store_true",
+        help="Does not take strand bias into account; use this option when using amplicon sequencing.",
     )
     parser.add_argument(
         "-ic",
         "--ignore_merge_codons",
-        help="Output variants without taking into account if consecutive positions belong to the same codon.",
         action="store_true",
+        help="Output variants without taking into account whether consecutive positions belong to the same codon.",
     )
     parser.add_argument(
         "-f",
@@ -57,537 +76,717 @@ def parse_args(args=None):
     return parser.parse_args(args)


-def make_dir(path):
-    """
-    Description:
-        Create directory if it doesn't exist.
-    Input:
-        path - path where the directory will be created.
- Returns: - None - """ - if not len(path) == 0: +class IvarVariants: + def __init__( + self, + file_in=None, + file_out=None, + pass_only=None, + freq_threshold=None, + bad_qual_threshold=None, + merge_af_threshold=None, + consensus_af=None, + ignore_stbias=None, + ignore_merge=None, + fasta=None, + ): + self.file_in = file_in + self.file_out = file_out + if os.path.exists(self.file_out): + self.filename = str(os.path.splitext(self.file_in)[0]) + else: + self.filename = str(self.file_in) + self.pass_only = pass_only try: - os.makedirs(path) - except OSError as exception: - if exception.errno != errno.EEXIST: - raise - - -def parse_ivar_line(line): - """ - Description: - Parse ivar line to get needed variables for vcf format. - input: - line - ivar tsv line - return: - CHROM, POS, ID, REF, ALT, QUAL, INFO, FORMAT, REF_CODON, ALT_CODON, pass_test, var_type - """ - - line = line.strip("\n").split("\t") - ## Assign intial fields to variables - CHROM = line[0] - POS = line[1] - ID = "." - REF = line[2] - ALT = line[3] - - ## REF/ALF depths and quals - try: - REF_DP = int(line[4]) - except ValueError: - print(line) - print(line[4]) - exit(-1) - REF_RV = int(line[5]) - REF_FW = REF_DP - REF_RV - REF_QUAL = int(line[6]) - ALT_RV = int(line[8]) - ALT_DP = int(line[7]) - ALT_FW = ALT_DP - ALT_RV - ALT_QUAL = int(line[9]) - ALT_FREQ = float(line[10]) - FORMAT = [REF_DP, REF_RV, REF_QUAL, ALT_DP, ALT_RV, ALT_QUAL, ALT_FREQ] - - ## Codon annotation - REF_CODON = line[15] - ALT_CODON = line[17] - - ## Determine variant type - var_type = "SNP" - if ALT[0] == "+": - ALT = REF + ALT[1:] - var_type = "INS" - elif ALT[0] == "-": - REF += ALT[1:] - ALT = line[2] - var_type = "DEL" - - QUAL = "." - - ## Determine FILTER field - INFO = f"DP={int(float(line[11]))}" - pass_test = line[13] - - return ( - CHROM, - POS, - ID, - REF, - ALT, - QUAL, - INFO, - FORMAT, - REF_CODON, - ALT_CODON, - pass_test, - var_type, - ) - - -###################### -## FILTER FUNCTIONS ## -###################### - - -def ivar_filter(pass_test): - """ - Description: - process ivar filter into vcf filter format. - input: - pass_test - ivar fisher exact test [ True, False ] - return: - Whether it passes the filter or not. [False, "ft"] - """ - if pass_test == "TRUE": - return False - else: - return "ft" - - -def strand_bias_filter(format): - """ - Description: - Calculate strand-bias fisher test. - input: - format - format variables - return: - Whether it passes the filter or not. [False, "sb"] - """ - # format=[REF_DP, REF_RV, REF_QUAL, ALT_DP, ALT_RV, ALT_QUAL, ALT_FREQ] - # table: - ## REF_FW REF_RV - ## ALT_FW ALT_RV - table = np.array([[format[0] - format[1], format[1]], [format[3] - format[4], format[4]]]) - oddsr, pvalue = fisher_exact(table, alternative="greater") - - # h0: both strands are equally represented. - # If test is significant h0 is refused so there is an strand bias. - if pvalue < 0.05: - return "sb" - else: - return False - - -def write_vcf_header(ref, ignore_strand_bias, file_out, filename): - """ - Description: - Write vcf header for VCFv4.2 - input: - ref - (optional), ref in fasta format - ignore_strand_bias - if no strand-bias is calculated [True, False] - file_out - output file_in - filename - name of the output file - return: - Nothing. 
- """ - ## Define VCF header - header_source = ["##fileformat=VCFv4.2", "##source=iVar"] - if ref: - header_contig = [] - for record in SeqIO.parse(ref, "fasta"): - header_contig += ["##contig="] - - header_source += header_contig - - header_info = ['##INFO='] - header_filter = [ - '##FILTER=', - '##FILTER= 0.05">', - ] - header_format = [ - '##FORMAT=', - '##FORMAT=', - '##FORMAT=', - '##FORMAT=', - '##FORMAT=', - '##FORMAT=', - '##FORMAT=', - '##FORMAT=', - ] - header_cols = [f"#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\tFORMAT\t{filename}"] - if not ignore_strand_bias: - header_filter += ['##FILTER='] - - header = header_source + header_info + header_filter + header_format + header_cols - fout = open(file_out, "w") - fout.write("\n".join(header) + "\n") - fout.close() - - -def write_vcf_line(chrom, pos, id, ref, alt, filter, qual, info, format, file_out): - """ - Description: - Format variables into vcf line format and write line to file. - input: - chrom, pos, id, ref, alt, filter, qual, info, format - vcf variables - file_out - file output - return: - Nothing. - """ - sample = f'1:{":".join(str(x) for x in format)}' - format = "GT:REF_DP:REF_RV:REF_QUAL:ALT_DP:ALT_RV:ALT_QUAL:ALT_FREQ" - - oline = ( - chrom - + "\t" - + pos - + "\t" - + id - + "\t" - + ref - + "\t" - + alt - + "\t" - + qual - + "\t" - + filter - + "\t" - + info - + "\t" - + format - + "\t" - + sample - + "\n" - ) - fout = open(file_out, "a") - fout.write(oline) - fout.close() + self.freq_threshold = float(freq_threshold) + except Exception: + print("Invalid allele frequency threshold. Setting it to 0") + self.freq_threshold = 0 + try: + self.bad_qual_threshold = float(bad_qual_threshold) + except Exception: + print("Invalid bad quality threshold. Setting it to 20") + self.bad_qual_threshold = 20 + try: + self.merge_af_threshold = float(merge_af_threshold) + except Exception: + print("Invalid merge af threshold. Setting it to 0.25") + self.merge_af_threshold = 0.25 + try: + self.consensus_af = float(consensus_af) + except Exception: + print("Invalid consensus af threshold. Setting it to 0.75") + self.consensus_af = 0.75 + self.ignore_stbias = ignore_stbias + self.ignore_merge = ignore_merge + if not self.file_out: + exit("Output file not provided. Aborting...") + if fasta: + self.ref_fasta = ( + fasta if os.path.exists(str(fasta)) else exit("Invalid fasta path") + ) + else: + self.ref_fasta = None + if self.file_in: + try: + self.raw_ivar_df = pd.read_csv(self.file_in, sep="\t") + except Exception: + exit(f"Could not read input file: {file_in}") + else: + exit("Input file not provided. Aborting...") + if self.raw_ivar_df.empty: + exit("Input tsv was empty") + + def strand_bias_filter(self, row): + """Calculate strand-bias fisher test. + + Args: + row (pd.Series)- a row from ivar.tsv dataframe as series + Returns: + str(): Whether it passes the filter ("") or not. ("sb") + """ + data_matrix = np.array( + [ + [row["REF_DP"] - row["REF_RV"], row["REF_RV"]], + [row["ALT_DP"] - row["ALT_RV"], row["ALT_RV"]], + ] + ) + _, pvalue = fisher_exact(data_matrix, alternative="greater") + if pvalue < 0.05: + return "sb" + else: + return "" -############################ -## MERGE CODONS FUNCTIONS ## -############################ + def apply_filters(self, row): + """Apply all the filters to a row and return its output merged + Args: + row (pd.Series)- a row from ivar.tsv dataframe as series -def check_consecutive(mylist): - """ - Description: - This function checks a list of numbers and returns how many items are consecutive. 
-    input:
-        my_list - A list of integers
-    return:
-        Number of items consecutive in the list - [False, 2, 3,..]
-    """
-    # getting first index of tuple for consecutive checking
-    my_list = list(map(int, [i[0] for i in mylist]))
-    ## Check if the list contains consecutive numbers
-    if len(my_list) == 1:
-        return False
-    elif sorted(my_list) == list(range(min(my_list), max(my_list) + 1)):
-        return len(my_list)
-    else:
-        ## If not, and the list is > 1, remove the last item and reevaluate.
-        if len(my_list) > 2:
-            my_list.pop()
-            if sorted(my_list) == list(range(min(my_list), max(my_list) + 1)):
-                return len(my_list)
-            else:
-                return False
-    return False
+
+        Returns:
+            vcf_filter (str): Results from each filter joined by ';'
+        """
+        ivar_filter = "" if row["PASS"] == True else "ft"
+        try:
+            alt_qual = float(row["ALT_QUAL"])
+        except ValueError:
+            # A non-numeric ALT_QUAL cannot pass the quality filter
+            bad_quality_filter = "bq"
+        else:
+            bad_quality_filter = "" if alt_qual >= self.bad_qual_threshold else "bq"
+        if not self.ignore_stbias:
+            stb_filter = self.strand_bias_filter(row)
         else:
+            stb_filter = ""
+        all_filters = [ivar_filter, stb_filter, bad_quality_filter]
+        vcf_filter = ";".join([x for x in all_filters if x])
+        if not vcf_filter:
+            vcf_filter = "PASS"
+        return vcf_filter
+
+    def initiate_vcf_df(self):
+        """Read the input ivar.tsv file, process the data depending on the parameters
+        selected when running the script and create a pandas dataframe from it
+
+        Returns:
+            vcf_df: Pandas dataframe after processing the file
+        """
+        ivar_df = self.raw_ivar_df.copy()
+        ivar_df = ivar_df.dropna(thresh=2)
+        if self.pass_only:
+            ivar_df = ivar_df[ivar_df["PASS"] == True]
+        ivar_df = ivar_df[ivar_df["ALT_FREQ"] >= self.freq_threshold]
+        vcf_dict = {}
+        vcf_dict["REGION"] = ivar_df["REGION"]
+        vcf_dict["POS"] = ivar_df["POS"]
+        vcf_dict["ID"] = ["."] * len(ivar_df)
+        # Dealing with insertions and deletions
+        vcf_dict["REF"] = np.where(
+            ivar_df["ALT"].str[0] == "-",
+            ivar_df["REF"] + ivar_df["ALT"].str[1:],
+            ivar_df["REF"],
+        )
+        vcf_dict["ALT"] = np.where(
+            ivar_df["ALT"].str[0] == "+",
+            ivar_df["REF"] + ivar_df["ALT"].str[1:],
+            np.where(ivar_df["ALT"].str[0] == "-", ivar_df["REF"], ivar_df["ALT"]),
+        )
+        vcf_dict["QUAL"] = ["."] * len(ivar_df)
+        vcf_dict["FILTER"] = ivar_df.apply(self.apply_filters, axis=1)
+        vcf_dict["INFO"] = np.select(
+            [ivar_df["ALT"].str[0] == "+", ivar_df["ALT"].str[0] == "-"],
+            ["TYPE=INS", "TYPE=DEL"],
+            default="TYPE=SNP",
+        )
+        format_cols = [
+            "GT",
+            "DP",
+            "REF_DP",
+            "REF_RV",
+            "REF_QUAL",
+            "ALT_DP",
+            "ALT_RV",
+            "ALT_QUAL",
+            "ALT_FREQ",
+        ]
+        vcf_dict["FORMAT"] = ":".join(format_cols)
+        # iVar does not report genotypes, so GT is set to a constant 1 as a simple workaround
+        ivar_df["DP"] = ivar_df["TOTAL_DP"]
+        ivar_df["GT"] = 1
+        vcf_dict["FILENAME"] = ivar_df[format_cols].astype(str).apply(":".join, axis=1)
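+        # Illustrative example (hypothetical values): a SNP row with TOTAL_DP=100,
+        # REF_DP=60, REF_RV=12, REF_QUAL=38, ALT_DP=40, ALT_RV=20, ALT_QUAL=35 and
+        # ALT_FREQ=0.4 yields the sample column "1:100:60:12:38:40:20:35:0.4",
+        # matching the FORMAT string
+        # "GT:DP:REF_DP:REF_RV:REF_QUAL:ALT_DP:ALT_RV:ALT_QUAL:ALT_FREQ".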
+        if not self.ignore_merge:
+            # These columns are needed to merge codons. They will be deleted later
+            vcf_dict["REF_CODON"] = ivar_df["REF_CODON"]
+            vcf_dict["ALT_CODON"] = ivar_df["ALT_CODON"]
+        vcf_df = pd.DataFrame.from_dict(vcf_dict)
+        return vcf_df
+
+    def find_consecutive(self, vcf_df):
+        """Find and extract the consecutive variants in the vcf dataframe
+
+        Args:
+            vcf_df (pd.DataFrame): Pandas df from initiate_vcf_df()
+
+        Returns:
+            consecutive_df (pd.DataFrame): dataframe only with variants in
+            consecutive positions, including those with duplicates
+        """
+        consecutive_mask = vcf_df["POS"].diff() <= 1
+        consecutive_mask = consecutive_mask | consecutive_mask.shift(-1)
+        consecutive_df = vcf_df[consecutive_mask]
+        return consecutive_df
+
+    def split_non_consecutive(self, consecutive_df):
+        """Split rows that are not consecutive to form groups of consecutive variants
+
+        Args:
+            consecutive_df (pd.DataFrame): Consecutive variants from find_consecutive()
+
+        Returns:
+            split_rows_list (list): List of dataframes with consecutive rows
+        """
+        # Numpy raises a warning from a pandas function that is wrongly set as deprecated
+        with warnings.catch_warnings():
+            warnings.simplefilter(action="ignore", category=FutureWarning)
+            split_rows_list = np.split(
+                consecutive_df, np.where(np.diff(consecutive_df["POS"]) > 1)[0] + 1
+            )
+        return split_rows_list
+
+    def find_duplicates(self, row_set):
+        """Check whether there are duplicated variants in a group of consecutive rows
+
+        Args:
+            row_set (pd.DataFrame): Rows from split_non_consecutive() & get_same_codon()
+
+        Returns:
+            Bool: True if there are duplicates in the dataframe, False if not
+        """
+        consdups_mask = row_set["POS"].diff() == 0
+        consdups_mask = consdups_mask | consdups_mask.shift(-1)
+        if row_set[consdups_mask].empty:
             return False
-
-
-def get_diff_position(seq1, seq2):
-    """
-    Description:
-        Function to compare two codon nucleotide sequences (size 3) and retuns the position where it differs.
-    Input:
-        seq1 - string size 3 [A,T,C,G]. Ex. "ATC"
-        seq2 - string size 3 [A,T,C,G]. Ex. "ACC"
-    Returns:
-        Returns position where seq1 != seq2
-    """
-    # If codon is NA treat as not same codon
-    if seq1 == "NA":
-        return 2
+        else:
+            return True
+
+    def get_same_codon(self, splitted_groups):
+        """Receive a list of dataframes and keep, for each one, only the groups of
+        consecutive rows that share the same codon, discarding groups purely formed
+        by duplicates.
+
+        Args:
+            splitted_groups (list(pd.DataFrame)): Groups from split_non_consecutive()
+
+        Returns:
+            same_codon_rows (list(pd.DataFrame)): list with separated dataframes
+        """
+        same_codon_dfs = []
+        for consec_df in splitted_groups:
+            clean_codon_rows = [
+                self.find_consecutive(group)
+                for _, group in consec_df.groupby("REF_CODON")
+                if not self.find_consecutive(group).empty
+                and not self.find_consecutive(group)["POS"].nunique() == 1
+            ]
+            same_codon_dfs.append(clean_codon_rows)
+        same_codon_consecutive = [df for group in same_codon_dfs for df in group]
+        return same_codon_consecutive
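+    # Illustrative example (hypothetical positions and codons): two PASS SNPs at
+    # POS 22813 and 22814 sharing REF_CODON "GAT" are kept together as one group,
+    # so they can later be merged into a single multi-nucleotide variant, while a
+    # SNP at POS 22815 with REF_CODON "TTC" (the next codon) is separated by the
+    # groupby and, being a single row, is left to be written as an individual
+    # variant.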
+ """ + split_rows_dict = {} + for rowset in same_codon_rows: + last_pos = 0 + rows_groups = [] + first_index = None + for row in rowset.itertuples(): + alt_pos = [x for x in range(3) if row.REF_CODON[x] != row.ALT_CODON[x]] + if len(alt_pos) > 1: + print("Conflicting variants in position %s. Skipped" % row.POS) + continue + alt_pos = alt_pos[0] + first_index = row.Index if first_index is None else first_index + if alt_pos < last_pos: + split_rows_dict[first_index] = pd.DataFrame(rows_groups) + rows_groups = [] + first_index = row.Index + rows_groups.append(row) + last_pos = alt_pos + split_rows_dict[first_index] = pd.DataFrame(rows_groups) + return split_rows_dict + + def merge_rows(self, consec_rows): + """Merge certain columns from a set of rows into the position of the first one + + Args: + consec_rows (pd.DataFrame): Clean dataframe only with rows to be merged + + Returns: + row_to_merge (list): List with the values for each cell in the merged row + """ + merged_index = consec_rows.head(1).index[0] + consec_rows.at[merged_index, "REF"] = "".join(consec_rows["REF"]) + consec_rows.at[merged_index, "ALT"] = "".join(consec_rows["ALT"]) + freqs_to_merge = ",".join(consec_rows["FILENAME"].str.split(":").str[8]) + depths_to_merge = ",".join(consec_rows["FILENAME"].str.split(":").str[5]) + stats_to_merge = ":".join([freqs_to_merge, depths_to_merge]) + consec_rows.at[merged_index, "FILENAME"] += f":{stats_to_merge}" + consec_rows.at[merged_index, "FORMAT"] += ":MERGED_AF:MERGED_DP" + lowest_af = min(freqs_to_merge.split(",")) + filecol_list = consec_rows.at[merged_index, "FILENAME"].split(":") + filecol_list[8] = lowest_af + consec_rows.at[merged_index, "FILENAME"] = ":".join(filecol_list) + merged_row = consec_rows.loc[merged_index].values.tolist() + return merged_row + + def create_merge_rowlist(self, clean_rows_list): + """Merge all the given rows in a single one for each dataframe of consecutive + rows in a given list + + Args: + clean_rows_list (list(pd.DataFrame())): Dataframes with consecutive rows + + Returns: + rows_to_merge (list(list())): List of merged rows as a list of values + """ + rows_to_merge = [] + for rowbatch in clean_rows_list: + if rowbatch.empty: + continue + if len(rowbatch) == 1: + rows_to_merge.append(rowbatch.values.tolist()[0]) + else: + if self.find_consecutive(rowbatch).empty: + continue + merged_row = self.merge_rows(rowbatch) + rows_to_merge.append(merged_row) + return rows_to_merge + + def handle_dup_rows(self, row_set): + """Split dataframe with multiple variants in the same position and create a list + of rows for each possible resulting codon in the position of the first variant + + Args: + row_set (pd.DataFrame): Consecutive variants df from split_rows_dict() + + Returns: + merged_rowlist (list(list)): List of merged rows from merge_rows() + """ + # Numpy raises a warning from a pandas function that is wrongly set as deprecated + # https://github.com/numpy/numpy/issues/24889 + # https://github.com/apache/arrow/issues/36412 + with warnings.catch_warnings(): + warnings.simplefilter(action="ignore", category=FutureWarning) + split_dups = np.split( + row_set, np.where(np.diff(row_set["POS"]) >= 1)[0] + 1 + ) + split_indexes = [rowlist.index.to_list() for rowlist in split_dups] + index_product_list = [indexlist for indexlist in product(*split_indexes)] + merged_rowlist = [] + for indexlist in index_product_list: + consec_rows = row_set.loc[indexlist, :] + clean_rows_list = self.merge_ref_alt(consec_rows.copy()) + cleaned_ref_rows_list = 
+    def handle_dup_rows(self, row_set):
+        """Split dataframe with multiple variants in the same position and create a list
+        of rows for each possible resulting codon in the position of the first variant
+
+        Args:
+            row_set (pd.DataFrame): Consecutive variants df from split_rows_dict()
+
+        Returns:
+            merged_rowlist (list(list)): List of merged rows from merge_rows()
+        """
+        # Numpy raises a warning from a pandas function that is wrongly set as deprecated
+        # https://github.com/numpy/numpy/issues/24889
+        # https://github.com/apache/arrow/issues/36412
+        with warnings.catch_warnings():
+            warnings.simplefilter(action="ignore", category=FutureWarning)
+            split_dups = np.split(
+                row_set, np.where(np.diff(row_set["POS"]) >= 1)[0] + 1
+            )
+        split_indexes = [rowlist.index.to_list() for rowlist in split_dups]
+        index_product_list = [indexlist for indexlist in product(*split_indexes)]
+        merged_rowlist = []
+        for indexlist in index_product_list:
+            consec_rows = row_set.loc[indexlist, :]
+            clean_rows_list = self.merge_ref_alt(consec_rows.copy())
+            cleaned_ref_rows_list = self.remove_edge_ref(clean_rows_list)
+            batch_rowlist = self.create_merge_rowlist(cleaned_ref_rows_list.copy())
+            merged_rowlist.extend(batch_rowlist)
+        return merged_rowlist
+
+    def get_ref_rowset(self, row_set):
+        """Create a new row for each variant row in the dataframe emulating the
+        reference for that position
+
+        Args:
+            row_set (pd.DataFrame()): A certain group of consecutive variants
+
+        Returns:
+            merged_ref_rows: Same Df but with duplicated rows that emulate the
+            reference for each variant position.
+        """
+        ref_row_set = row_set.copy()
+        ref_row_set["ALT"] = ref_row_set["REF"]
+        ref_row_set["ALT_CODON"] = ref_row_set["REF_CODON"]
+        filecol = ref_row_set["FILENAME"].values.tolist()
+        ref_filecol = []
+        for row in filecol:
+            # values = GT:DP:REF_DP:REF_RV:REF_QUAL:ALT_DP:ALT_RV:ALT_QUAL:ALT_FREQ
+            split_vals = row.split(":")
+            ref_dp = split_vals[2]
+            total_dp = split_vals[1]
+            split_vals[8] = str(round(int(ref_dp) / int(total_dp), 3))
+            split_vals[5] = ref_dp
+            ref_filecol.append(":".join(split_vals))
+        ref_row_set["FILENAME"] = ref_filecol
+        merged_ref_rows = (
+            pd.concat([row_set, ref_row_set]).sort_values("POS").reset_index(drop=True)
+        )
+        return merged_ref_rows
+
+    def merge_rule_check(self, alt_dictlist):
+        """Evaluate a list of possible codons and decide if they can be merged together
+        based on certain conditions regarding Allele Frequency
+
+        Args:
+            alt_dictlist (list(dict())): A list of dictionaries with consecutive loci
+
+        Returns:
+            consec_series (list(dict())): Same list but only with validated loci
+        """
+        consec_series = []
+
+        def is_subset(maindict, subdict):
+            for key, value in subdict.items():
+                if key not in maindict or maindict[key] != value:
+                    return False
+            return True
+
+        for altdict in alt_dictlist:
+            if len(altdict) <= 1:
+                consec_series.append(altdict)
+                continue
+            af_list = [x["AF"] for x in altdict.values()]
+            key_list = list(altdict.keys())
+            if any(af >= self.consensus_af for af in af_list):
+                consec_series.append(altdict)
+                continue
+            distances = []
+            for i in range(len(altdict) - 1):
+                key1 = key_list[i]
+                key2 = key_list[i + 1]
+                distance = abs(altdict[key2]["AF"] - altdict[key1]["AF"])
+                distances.append(distance)
+            if all(dist < self.merge_af_threshold for dist in distances):
+                consec_series.append(altdict)
+            for i, dist in enumerate(distances):
+                if dist <= self.merge_af_threshold and af_list[i] <= self.consensus_af:
+                    close_pair = {
+                        key_list[i]: altdict[i],
+                        key_list[i + 1]: altdict[i + 1],
+                    }
+                else:
+                    close_pair = {key_list[i]: altdict[i]}
+                if not any(is_subset(d, close_pair) for d in consec_series):
+                    consec_series.append(close_pair)
+            if all(dist > self.merge_af_threshold for dist in distances):
+                # Note: close_pair here still holds the last candidate built in the
+                # loop above, so this block only appends when that pair is missing
+                for i in range(len(af_list)):
+                    if not any(is_subset(d, close_pair) for d in consec_series):
+                        consec_series.append({key_list[i]: altdict[i]})
+
+        return consec_series
+
+    def merge_ref_alt(self, consec_rows):
+        """Create a list of all possible combinations of REF and ALT consecutive codons
+        following certain conditions of similarity using Allele Frequency values
+
+        Args:
+            consec_rows (list(pd.DataFrame)): List of dataframes with consecutive rows
+
+        Returns:
+            clean_rows_list (list(pd.DataFrame)): Filtered list with viable combinations
+        """
+        # Compare variants AF with REF and group those with more similarity
+
+        merged_ref_rows = self.get_ref_rowset(consec_rows.copy())
+        merged_ref_rows["AF"] = merged_ref_rows["FILENAME"].str.split(":").str[8]
+        alt_rows = merged_ref_rows[
+            merged_ref_rows["REF_CODON"] != merged_ref_rows["ALT_CODON"]
+
].reset_index(drop=True) + ref_rows = merged_ref_rows[ + merged_ref_rows["REF_CODON"] == merged_ref_rows["ALT_CODON"] + ].reset_index(drop=True) + ref_rows["REF_DP"] = ref_rows["FILENAME"].str.split(":").str[2] + for col in ["AF", "ALT", "ALT_CODON", "FILENAME"]: + ref_rows[col] = np.where( + ref_rows["REF_DP"] == "0", alt_rows[col], ref_rows[col] + ) + ref_rows = ref_rows.drop("REF_DP", axis=1) + ref_dict = { + x: {"AF": float(y), "set": "ref"} + for x, y in ref_rows["AF"].to_dict().items() + } + alt_dict = { + x: {"AF": float(y), "set": "alt"} + for x, y in alt_rows["AF"].to_dict().items() + } + alt_combinations = list( + product(*[[(k, alt_dict[k]), (k, ref_dict[k])] for k in alt_dict.keys()]) + ) + combined_dictlist = [dict(comb) for comb in alt_combinations] + # Keep together only those codons that fulfill certain similarity rules + consec_series = self.merge_rule_check(combined_dictlist) + clean_rows_list = [] + for rowdict in consec_series: + consec_rowsdict = {} + for key, vals in rowdict.items(): + if vals["set"] == "alt": + consec_rowsdict[key] = alt_rows.loc[int(key)] + else: + consec_rowsdict[key] = ref_rows.loc[int(key)] + consec_df = pd.DataFrame.from_dict(consec_rowsdict, orient="index") + if self.consensus_merge is True: + if any(x >= self.consensus_af for x in consec_df["AF"].astype(float)): + consensus_dfs = consec_df.groupby( + consec_df["AF"].astype(float) >= self.consensus_af + ) + for af_in_consensus, df in consensus_dfs: + df = df.drop("AF", axis=1) + if af_in_consensus is True: + if not self.find_consecutive(df).empty: + clean_rows_list.append(df) + else: + for _, row in df.groupby("POS"): + clean_rows_list.append(row) + else: + for _, row in df.groupby("POS"): + clean_rows_list.append(row) + else: + for _, row in consec_df.drop("AF", axis=1).groupby("POS"): + clean_rows_list.append(row) + else: + clean_loc_df = consec_df.drop("AF", axis=1) + if not clean_loc_df.empty: + clean_rows_list.append(clean_loc_df) + + return clean_rows_list + + def remove_edge_ref(self, clean_rows_list): + """Remove reference nucleotides from both edges of the variant codon + + Args: + clean_rows_list (List(pd.DataFrame)): List of variants to be merged + Returns: + cleaned_ref_rows_list (List(pd.DataFrame)): List of rows without edge refs + """ + + def indexes_are_consecutive(idx_list): + """Returns True if ints in list are consecutive, or just 1 element""" + return sorted(idx_list) == list(range(min(idx_list), max(idx_list) + 1)) + + def remove_subsets(cleaned_ref_rows_list): + """Remove those dataframes which are subsets of another one in the list""" + + def is_subset(df1, df2): + """Returns True if df1 is a subset of df2""" + return len(df1.merge(df2)) == len(df1) + + max_length = max(len(df) for df in cleaned_ref_rows_list) + largest_dfs = [df for df in cleaned_ref_rows_list if len(df) == max_length] + other_dfs = [df for df in cleaned_ref_rows_list if len(df) < max_length] + if not other_dfs: + return largest_dfs + final_ref_rows_list = largest_dfs + for smalldf in other_dfs: + if not any(is_subset(smalldf, bigdf) for bigdf in largest_dfs): + final_ref_rows_list.append(smalldf) + return final_ref_rows_list + + cleaned_ref_rows_list = [] + for df in clean_rows_list: + ref_col = df["REF"] + alt_col = df["ALT"] + idx_matches = [idx for idx in alt_col.index if alt_col[idx] == ref_col[idx]] + if not idx_matches: + cleaned_ref_rows_list.append(df) + continue + if len(df) == 3 and idx_matches == [1]: + cleaned_ref_rows_list.append(df) + continue + if indexes_are_consecutive(idx_matches): + 
df = df.drop(idx_matches, axis=0) + if not df.empty: + cleaned_ref_rows_list.append(df) + final_ref_rows_list = remove_subsets(cleaned_ref_rows_list) + return final_ref_rows_list + + def process_vcf_df(self, vcf_df): + """Merge rows with consecutive SNPs that passed all filters and without NAs + + Args: + vcf_df - dataframe from self.initiate_vcf_df() + Returns: + processed_vcf_df: dataframe with consecutive variants merged + """ + + def include_rows(vcf_df, last_index, rows_to_merge): + indexes_to_merge = [ + x for x in range(last_index, last_index + len(rows_to_merge)) + ] + for index, row in zip(indexes_to_merge, rows_to_merge): + try: + vcf_df.loc[index] = row + except ValueError: + print(f"Invalid row found: {str(row)}. Skipped") + return vcf_df + + clean_vcf_df = vcf_df[vcf_df["INFO"] == "TYPE=SNP"] + clean_vcf_df = clean_vcf_df[clean_vcf_df["FILTER"] == "PASS"].dropna() + consecutive_df = self.find_consecutive(clean_vcf_df) + if consecutive_df.empty: + return vcf_df + splitted_groups = self.split_non_consecutive(consecutive_df) + same_codon_consecutive = self.get_same_codon(splitted_groups) + split_rows_dict = self.split_by_codon(same_codon_consecutive) + for _, row_set in sorted(split_rows_dict.items()): + last_index = vcf_df.tail(1).index[0] + 1 + vcf_df = vcf_df.drop(row_set.Index) + # Redundant "Index" column is generated by Itertuples in split_by_codon() + row_set = row_set.drop(["Index"], axis=1) + if self.find_duplicates(row_set): + rows_to_merge = self.handle_dup_rows(row_set.copy()) + else: + clean_rows_list = self.merge_ref_alt(row_set.copy()) + cleaned_ref_rows_list = self.remove_edge_ref(clean_rows_list) + rows_to_merge = self.create_merge_rowlist(cleaned_ref_rows_list.copy()) + vcf_df = include_rows(vcf_df, last_index, rows_to_merge) + vcf_df = vcf_df[vcf_df["REF"] != vcf_df["ALT"]] + vcf_df = vcf_df.sort_index().reset_index(drop=True) + processed_vcf_df = vcf_df.drop_duplicates().sort_values("POS") + return processed_vcf_df + + def get_vcf_header(self): + """Create the vcf header for VCFv4.2 + + Returns: + header: String containing all the vcf header lines separated by newline. 
+ """ + # Define VCF header + header_source = ["##fileformat=VCFv4.2", "##source=iVar"] + if self.ref_fasta: + header_contig = [] + for record in SeqIO.parse(self.ref_fasta, "fasta"): + header_contig += [ + "##contig=" + ] + + header_source += header_contig + + header_info = [ + '##INFO=', + ] + header_filter = [ + '##FILTER=', + '##FILTER= 0.05">', + '##FILTER=', + ] + if not self.ignore_stbias: + header_filter.append( + '##FILTER=' + ) + header_format = [ + '##FORMAT=', + '##FORMAT=', + '##FORMAT=', + '##FORMAT=', + '##FORMAT=', + '##FORMAT=', + '##FORMAT=', + '##FORMAT=', + '##FORMAT=', + ] + if not self.ignore_merge: + header_format.append( + '##FORMAT=' + ) + header_format.append( + '##FORMAT=' + ) + header = header_source + header_info + header_filter + header_format + return header + + def write_vcf(self): + """Process ivar.tsv, merge the vcf header and table and write them into a file""" + vcf_header = "\n".join(self.get_vcf_header()) + vcf_table = self.initiate_vcf_df() + + def export_vcf(vcf_table, consensus=True): + self.consensus_merge = consensus + if not self.ignore_merge: + processed_vcf = self.process_vcf_df(vcf_table) + try: + processed_vcf = processed_vcf.drop(["REF_CODON", "ALT_CODON"], axis=1) + except KeyError: + pass + # Workaround because itertuples cannot handle special characters in column names + processed_vcf = processed_vcf.rename( + columns={"REGION": "#CHROM", "FILENAME": self.filename} + ) + if consensus: + filepath = self.file_out + else: + basename = os.path.splitext(os.path.basename(self.file_out))[0] + filename = str(basename) + "_merge_annot.vcf" + filepath = os.path.join(os.path.dirname(self.file_out), filename) + with open(filepath, "w") as file_out: + file_out.write(vcf_header + "\n") + processed_vcf.to_csv(filepath, sep="\t", index=False, header=True, mode="a") - ind_diff = [i for i in range(len(seq1)) if seq1[i] != seq2[i]] - if len(ind_diff) > 1: - print("There has been an issue, more than one difference between the seqs.") - return False - else: - return ind_diff[0] + export_vcf(vcf_table, consensus=True) + export_vcf(vcf_table, consensus=False) + return -def check_merge_codons(q_pos, fe_codon_ref, fe_codon_alt): - """ - Description: - Logic for determine if variant lines need to be collapsed into one determining - if they are consecutive and belong to the same codon. - Input: - qpos - list of positions. Ex. [4441, 4442, 4443] - fe_codon_ref - first position codon annotation for ref. Ex. "ATG" - fe_codon_alt - first position codon annotation for alt. Ex. "AGG" - Returns: - Returns num_collapse. Number of lines that need to be collapsed into one. - """ - # Are two positions in the queue consecutive? - # q_pos = [4441, 4442, 5067] - num_collapse = 0 - if check_consecutive(list(q_pos)) == 2: - ## If the first position is not on the third position of the codon they are in the same codon. - if get_diff_position(fe_codon_ref, fe_codon_alt) != 2: - num_collapse = 2 - else: - num_collapse = 1 - # Are the three positions in the queue consecutive? - # q_pos = [4441, 4442, 4443] - elif check_consecutive(list(q_pos)) == 3: - ## we check the first position in which codon position is to process it acordingly. - # If first position is in the first codon position all three positions belong to the same codon. - if get_diff_position(fe_codon_ref, fe_codon_alt) == 0: - num_collapse = 3 - # If first position is in the second codon position, we have the two first positions belonging to the same codon and the last one independent. 
- elif get_diff_position(fe_codon_ref, fe_codon_alt) == 1: - num_collapse = 2 - ## Finally if we have the first position in the last codon position, we write first position and left the remaining two to be evaluated in the next iteration. - elif get_diff_position(fe_codon_ref, fe_codon_alt) == 2: - num_collapse = 1 - # If no consecutive process only one line. - elif check_consecutive(list(q_pos)) == False: - num_collapse = 1 - - return num_collapse - - -def process_variants(variants, num_collapse): +def make_dir(path): """ Description: - The function set the variables acordingly to the lines to collapse do to consecutive variants. - Input: - variants - Dict with var lines. - num_collapse - number of lines to collapse [2,3] - Returns:: - Vars fixed: chrom, pos, id, ref, alt, qual, filter, info, format + Create directory if it doesn't exist. + Args: + path - path where the directory will be created. """ - # Collapsed variant parameters equal to first variant - key_list = ["chrom", "pos", "id", "qual", "filter", "info", "format"] - chrom, pos, id, qual, filter, info, format = [variants[next(iter(variants))][key] for key in key_list] - - # If no consecutive, process one variant line - # If two consecutive, process two variant lines into one - # If three consecutive process three variant lines and write one - ref = "" - alt = "" - iter_variants = iter(variants) - for _ in range(num_collapse): # fixed notation - var = next(iter_variants) - ref += variants[var]["ref"] - alt += variants[var]["alt"] - - return chrom, pos, id, ref, alt, qual, filter, info, format + if not len(path) == 0: + try: + os.makedirs(path) + except OSError as exception: + if exception.errno != errno.EEXIST: + raise + return def main(args=None): - # Process args args = parse_args(args) - - # Initialize vars - filename = os.path.splitext(args.file_in)[0] - out_dir = os.path.dirname(args.file_out) - var_list = [] # store variants - var_count_dict = {"SNP": 0, "INS": 0, "DEL": 0} # variant counts - variants = OrderedDict() # variant dict (merge codon) - q_pos = deque([], maxlen=3) # pos fifo queue (merge codon) - last_pos = "" - - # Create output directory - make_dir(out_dir) - - ############################## - ## Write vcf header to file ## - ############################## - write_vcf_header(args.fasta, args.ignore_strand_bias, args.file_out, filename) - - ################################# - ## Read and process input file ## - ################################# - with open(args.file_in, "r") as fin: - for line in fin: - if "REGION" not in line: - ################ - ## Parse line ## - ################ - ## format= - # [REF_DP, REF_RV, REF_QUAL, ALT_DP, ALT_RV, ALT_QUAL, ALT_FREQ] - write_line = True - ( - chrom, - pos, - id, - ref, - alt, - qual, - info, - format, - ref_codon, - alt_codon, - pass_test, - var_type, - ) = parse_ivar_line(line) - - ## If pos is duplicated due to annotation skip lines - if pos == last_pos: - continue - - last_pos = pos - ##################### - ## Process filters ## - ##################### - ## ivar fisher test - filter = "" - if ivar_filter(pass_test): - filter = ivar_filter(pass_test) - ## strand-bias fisher test - if not args.ignore_strand_bias: - if strand_bias_filter(format): - if filter: - filter += ";" + strand_bias_filter(format) - else: - filter = strand_bias_filter(format) - - if not filter: - filter = "PASS" - - ##################### - ## Filter variants ## - ##################### - if args.pass_only and filter != "PASS": - write_line = False - ### AF filtering. 
ALT_DP/(ALT_DP+REF_DP) - if float(format[3] / (format[0] + format[3])) < args.allele_freq_threshold: - write_line = False - ### Duplication filter - if (chrom, pos, ref, alt) in var_list: - write_line = False - else: - var_list.append((chrom, pos, ref, alt)) - - ############################################################ - ## MERGE_CODONS ## - ## Merge consecutive variants belonging to the same codon ## - ############################################################ - if not args.ignore_merge_codons and var_type == "SNP": - ## re-fill queue and dict accordingly - q_pos.append((pos, var_type)) # adding type information - variants[(chrom, pos, ref, alt)] = { - "chrom": chrom, - "pos": pos, - "id": id, - "ref": ref, - "alt": alt, - "qual": qual, - "filter": filter, - "info": info, - "format": format, - "ref_codon": ref_codon, - "alt_codon": alt_codon, - } - - if len(q_pos) == q_pos.maxlen: - fe_codon_ref = variants[next(iter(variants))]["ref_codon"] - fe_codon_alt = variants[next(iter(variants))]["alt_codon"] - num_collapse = check_merge_codons(q_pos, fe_codon_ref, fe_codon_alt) - ( - chrom, - pos, - id, - ref, - alt, - qual, - filter, - info, - format, - ) = process_variants(variants, num_collapse) - - ## Empty variants dict and queue accordingly - for _ in range(num_collapse): - variants.popitem(last=False) - q_pos.popleft() - else: - write_line = False - - ############################## - ## Write output to vcf file ## - ############################## - if write_line: - var_count_dict[var_type] += 1 - write_vcf_line( - chrom, - pos, - id, - ref, - alt, - filter, - qual, - info, - format, - args.file_out, - ) - - if not args.ignore_merge_codons: - ####################### - ## handle last lines ## - ####################### - while len(q_pos) > 0: - try: - fe_codon_ref = variants[next(iter(variants))]["ref_codon"] - fe_codon_alt = variants[next(iter(variants))]["alt_codon"] - except StopIteration: - break - else: - num_collapse = check_merge_codons(q_pos, fe_codon_ref, fe_codon_alt) - (chrom, pos, id, ref, alt, qual, filter, info, format) = process_variants(variants, num_collapse) - - var_count_dict[q_pos[0][1]] += 1 - write_vcf_line(chrom, pos, id, ref, alt, filter, qual, info, format, args.file_out) - ## Empty variants dict and queue accordingly - for _ in range(num_collapse): - variants.popitem(last=False) - q_pos.popleft() - - ############################################# - ## variant counts to pass to MultiQC ## - ############################################# - var_count_list = [(k, str(v)) for k, v in sorted(var_count_dict.items())] - - # format output table a little more cleanly - # row_spacing = len(filename) - - row = create_f_string(30, "<") # an arbitraily long value to fit most sample names - row += create_f_string(10) * len(var_count_list) # A spacing of ten looks pretty - - headers = ["sample"] - headers.extend([x[0] for x in var_count_list]) - data = [filename] - data.extend([x[1] for x in var_count_list]) - print(row.format(*headers)) - print(row.format(*data)) - - -def create_f_string(str_size, placement="^"): - row_size = "{: " + placement + str(str_size) + "}" - return row_size + ivar_to_vcf = IvarVariants( + file_in=args.file_in, + file_out=args.file_out, + pass_only=args.pass_only, + freq_threshold=args.allele_freq_threshold, + bad_qual_threshold=args.bad_quality_threshold, + merge_af_threshold=args.merge_af_threshold, + consensus_af=args.consensus_af, + ignore_stbias=args.ignore_strand_bias, + ignore_merge=args.ignore_merge_codons, + fasta=args.fasta, + ) + 
ivar_to_vcf.write_vcf() + return if __name__ == "__main__":