-
Notifications
You must be signed in to change notification settings - Fork 0
/
paf_plotter.py
114 lines (111 loc) · 7.68 KB
/
paf_plotter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
def pafplotter(paffile = None, \
querychr = None, \
mapping = None, \
lengthplot = "FALSE", \
catplot = "FALSE", \
mapping_quality_plot = "FALSE", \
scatterplot = "FALSE"):
# Universitat Potsdam
# Author Gaurav Sablok
# date: 2024-2-28
""""
a pafplotter function which will allow you to plot many analysis
and summary stats from the paf alignment files for the graphs. It
analyses the paf alignment files faster and you can chunck the indiviual
block of the code as the separate functions.
"""
import pandas as pd
import seaborn as sns
sns.set_theme()
if paffile is not None and querychr is None:
pafdataframe = pd.DataFrame([line.strip().split("\t")[0:12] for line in open(paffile)],
columns=["query", "query_length", "query_start", "query_end", "strand", "target", \
"target_length", "target_start", "target_end", "residue_matches",
"alignment_length", "mapping_quality"])
return pafdataframe
if paffile and querychr is not None:
pafdataframe = pd.DataFrame([line.strip().split("\t")[0:12] for line in open(paffile)],
columns=["query", "query_length", "query_start", "query_end", "strand", "target", \
"target_length", "target_start", "target_end", "residue_matches",
"alignment_length", "mapping_quality"])
selectedpafdataframe = pafdataframe.where(pafdataframe["query"] == querychr).dropna()
return selectedpafdataframe
if paffile and mapping is not None:
pafdataframe = pd.DataFrame([line.strip().split("\t")[0:12] for line in open(paffile)],
columns=["query", "query_length", "query_start", "query_end", "strand", "target", \
"target_length", "target_start", "target_end", "residue_matches",
"alignment_length", "mapping_quality"])
selectedpafdataframe = pafdataframe.where(pafdataframe["mapping_quality"] == mapping).dropna()
return selectedpafdataframe
if paffile is not None and querychr is not None and lengthplot == "TRUE":
pafdataframe = pd.DataFrame([line.strip().split("\t")[0:12] for line in open(paffile)],
columns=["query", "query_length", "query_start", "query_end", "strand", "target", \
"target_length", "target_start", "target_end", "residue_matches",
"alignment_length", "mapping_quality"])
selectedpafdataframe = pafdataframe.where(pafdataframe["query"] == querychr).dropna()
pd.DataFrame(pafdataframe.where(pafdataframe["query"] == querychr).dropna()["query_end"].apply(lambda num: int(num)) - \
pafdataframe.where(pafdataframe["query"] == querychr).dropna()["query_start"].apply(lambda num: int(num)), \
columns = ["plot"]).plot(kind = "bar")
return selectedpafdataframe
if paffile is not None and querychr is not None and catplot == "TRUE":
pafdataframe = pd.DataFrame([line.strip().split("\t")[0:12] for line in open(paffile)],
columns=["query", "query_length", "query_start", "query_end", "strand", "target", \
"target_length", "target_start", "target_end", "residue_matches",
"alignment_length", "mapping_quality"])
selectedpafdataframe = pafdataframe.where(pafdataframe["query"] == querychr).dropna()
catcorrdinates = pd.DataFrame(pafdataframe.where(pafdataframe["query"] == querychr).dropna()["query_end"].apply(lambda num: int(num)) - \
pafdataframe.where(pafdataframe["query"] == querychr).dropna()["query_start"].apply(lambda num: int(num)), \
columns = ["plot"])
return sns.catplot(catcorrdinates)
if paffile is not None and mapping_quality_plot == "TRUE":
pafdataframe = pd.DataFrame([line.strip().split("\t")[0:12] for line in open(paffile)],
columns=["query", "query_length", "query_start", "query_end", "strand", "target", \
"target_length", "target_start", "target_end", "residue_matches",
"alignment_length", "mapping_quality"])
return sns.catplot(pafdataframe, kind = "box", x="residue_matches", y="alignment_length", height = 10)
if paffile is not None and scatterplot == "TRUE":
pafdataframe = pd.DataFrame([line.strip().split("\t")[0:12] for line in open(paffile)],
columns=["query", "query_length", "query_start", "query_end", "strand", "target", \
"target_length", "target_start", "target_end", "residue_matches",
"alignment_length", "mapping_quality"])
return sns.scatterplot(pafdataframe, x="query", y="mapping_quality")
def locationfetch(paffile,querychr):
""""
a location fetcher for the paf alignment as a
list with the coordinates. Here you can provide
the query of the alignment file and it will give you
a list of the fectched queries which you can directly
pass to the streamlit application
"""
if paffile is not None and querychr is not None:
paf = paffile
query = querychr
pafdataframe = pd.DataFrame([line.strip().split("\t")[0:12] for line in open(paf)],
columns=["query", "query_length", "query_start", "query_end", "strand", "target", \
"target_length", "target_start", "target_end", "residue_matches",
"alignment_length", "mapping_quality"])
querycorrdinates = list(pafdataframe.where(pafdataframe["query"] == querychr).dropna().iloc())
return querycorrdinates
def filterstrand(paffile, strand):
""""
fetching the alignment length on the query strand
for the specific strand. It takes the stand as the
str and then locates across the dataframe and then
map the int to the str conversion.
"""
if paffile is not None and strand is not None:
paf = paffile
querystrand = str(strand)
pafdataframe = pd.DataFrame([line.strip().split("\t")[0:12] for line in open(paf)], \
columns=["query", "query_length", "query_start", "query_end", "strand", "target", \
"target_length", "target_start", "target_end", "residue_matches",
"alignment_length", "mapping_quality"])
start = list(map(lambda num: int(num),list(pafdataframe. \
where(pafdataframe["strand"] == "+").dropna()["query_start"])))
end = list(map(lambda num: int(num),list(pafdataframe. \
where(pafdataframe["strand"] == "+").dropna()["query_end"])))
querystrandalign = []
for i in range(len(start)):
querystrandalign.append(end[i] - start[i])
querylength = pd.DataFrame(querystrandalign, columns = ["alignedlength"]).plot(kind = "bar")
return querylength