-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpostecolityping.py
executable file
·271 lines (227 loc) · 11.5 KB
/
postecolityping.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
#!/usr/bin/env python3
## postecolityping.py
## Ecoli related analysis
## Parses ecolityping.sh results to generate an assessment report.
import argparse
import os
import time  # NOTE(review): unused in the visible code — candidate for removal
import datetime
import json
# Command-line interface. Parsing happens at import time because the whole
# script body below runs top-to-bottom (there is no main() function).
parser = argparse.ArgumentParser(description='postecolityping.py')
parser.add_argument("-i","--sampleid", help='ecoli1')  # sample id, e.g. "ecoli1"
parser.add_argument("-d", "--indir")  # run directory containing <sampleid>/sp_ecoli_fbi
parser.add_argument("-stbit", help="ST string", default='ST:NA,NA')  # MLST string: "ST:<n>,gene:allele,..."
args=parser.parse_args()
# Functions
def get_rundir(indir) -> str:
    """Return the run directory name (final path component) of *indir*.

    Args:
        indir: Path to the input/run directory; may be "".

    Returns:
        The last path component of *indir*, or "" when *indir* is empty.
    """
    if indir == "":  # Preserve the legacy contract: empty input -> empty output
        return ""
    # normpath drops any trailing separator so "/a/b/" resolves to "b";
    # plain os.path.basename("/a/b/") would return "" and break downstream
    # date/wgsnumber extraction.
    rundir = os.path.basename(os.path.normpath(indir))
    return rundir
def get_folder_creation_date(folder_path):
    """Return the creation date of *folder_path* formatted as YYMMDD.

    Args:
        folder_path: Directory path; "" is tolerated and yields "".

    Returns:
        A six-character YYMMDD string, or "" for empty input.

    Raises:
        FileNotFoundError: If *folder_path* is non-empty but does not exist.
    """
    if folder_path == "":
        # Empty path: nothing to inspect, mirror the input.
        return ""
    if not os.path.exists(folder_path):
        raise FileNotFoundError(f"The folder path does not exist: {folder_path}")
    # ctime of the folder, rendered as e.g. "221117" (platform-dependent
    # meaning of "creation": change time on most Unix filesystems).
    stamp = datetime.datetime.fromtimestamp(os.path.getctime(folder_path))
    return str(stamp.strftime('%y%m%d'))
def convert_number_to_date(number):
    """Convert a YYMMDD value (e.g. 221117) to "2022-11-17".

    Only the first six characters are used and the century is assumed to be
    2000. Unlike the previous slicing-only version, impossible calendar
    dates (month 13, day 32, ...) are now rejected instead of producing
    strings like "2022-13-99".

    Args:
        number: An int or str whose first six digits encode YYMMDD.

    Returns:
        A "YYYY-MM-DD" string, or "" when the input cannot be converted
        (the error is printed so callers can fall back to another date).
    """
    number_str = str(number)
    try:
        if len(number_str) < 6 or not number_str.isdigit():  # need at least 6 digits, all numeric
            raise ValueError(f"Invalid input format for date conversion: {number_str}")
        year = int(number_str[:2])    # first two digits: year within 20xx
        month = int(number_str[2:4])  # next two digits: month
        day = int(number_str[4:6])    # last two digits: day
        # datetime.date validates the calendar date and raises ValueError
        # for impossible combinations, which the except below converts to "".
        return datetime.date(2000 + year, month, day).strftime('%Y-%m-%d')
    except Exception as e:
        # Keep the legacy contract: print the problem, return empty string.
        print(f"Error in date conversion: {e}. Returning empty string.")
        return ""
def get_wgs_date_and_number(rundir):
    """Split *rundir* into a (wgsdate, wgsnumber) pair.

    For underscore-separated run names the date comes from the leading
    YYMMDD token and wgsnumber from tokens 4-6 (e.g. "N_WGS_743"); otherwise
    the whole name is the wgsnumber and the date is taken from the folder's
    creation time. Any date that cannot be parsed falls back to today's
    date in YYYY-MM-DD form.
    """
    # Fallback used whenever a real run date cannot be determined.
    today = datetime.datetime.today().strftime('%Y-%m-%d')
    if rundir == "":
        # No run directory at all: today's date, no wgsnumber.
        return today, ""
    if "_" in rundir:
        tokens = rundir.split("_")
        # Leading token should be YYMMDD, e.g. 231006 -> 2023-10-06.
        wgsdate = convert_number_to_date(tokens[0])
        if wgsdate == "":
            print(f"Invalid date format in rundir: {tokens[0]}. Using current date: {today}")
            wgsdate = today
        # e.g. "N_WGS_743"
        wgsnumber = "_".join(tokens[3:6])
        return wgsdate, wgsnumber
    # No underscores: the whole name is the wgsnumber; derive the date from
    # the folder's creation time instead.
    wgsnumber = rundir
    wgsdate = convert_number_to_date(get_folder_creation_date(rundir))
    if wgsdate == "":
        print(f"Invalid folder creation date for rundir: {rundir}. Using current date: {today}")
        wgsdate = today
    return wgsdate, wgsnumber
def print_header_to_output(OUTFILE):
    """Write the TSV report header to *OUTFILE* (truncating it) and return it.

    Args:
        OUTFILE: Path of the report file to (re)create.

    Returns:
        The header line, including the trailing newline.
    """
    header="isolate\twzx\twzy\tfliC\tOH\tstx\teae\tehx\tother\twgsrun\twgsdate\tST\tSTgenes\tverbose\n"
    # "with" closes the handle deterministically; the previous version
    # leaked the open file object.
    with open(OUTFILE, 'w') as outfile:
        outfile.write(header)
    return header
def write_to_json(csv_data, json_outfile):
    """Serialize *csv_data* (a plain dict of report fields) to *json_outfile*.

    Args:
        csv_data: Mapping of header column name to value.
        json_outfile: Destination path for the JSON file.
    """
    with open(json_outfile, "w") as handle:
        json.dump(csv_data, handle)
# Main — flat script body: parse this sample's KMA results and emit a one-row
# TSV report plus a JSON version of the same row.
# NOTE(review): args.sampleid is reused below as the loop variable over
# hitdict/results_dict; this works because both dicts only ever hold the one
# sample, but confirm before refactoring.
# KMA results processing
SPECOLIFBIDIR=os.path.join(args.indir, args.sampleid, "sp_ecoli_fbi") # KMA results
OUTFILE=os.path.join(args.indir, args.sampleid, f"{args.sampleid}.tsv")
rundir = get_rundir(args.indir)
wgsdate, wgsnumber = get_wgs_date_and_number(rundir)
# args.stbit: first element is the ST call itself, the rest are per-gene alleles.
ST_list=args.stbit.split(",") # ['ST:11', 'adk:12', 'fumC:12', 'gyrB:8', 'icd:12', 'mdh:15', 'purA:2', 'recA:2']
ST=ST_list.pop(0).split(":")[1] # 11
STgenes=",".join(ST_list) # adk:12,fumC:12,gyrB:8,icd:12,mdh:15,purA:2,recA:2
OUTFILE=os.path.join(args.indir, args.sampleid, f"{args.sampleid}.tsv") # NOTE(review): duplicate of the OUTFILE assignment above
header = print_header_to_output(OUTFILE)
# Settings
fliflag="false" # string-typed flag; flips to "true" once a non-fliC fli* hit is seen
# Column slot of each reported field inside a sample's results list.
locations={"OH": 4, "stx":5, "wzx":1, "wzy":2, "wzt":1, "wzm":2, "fliC":3, "fli":3, "eae": 6, "ehxA":7, "wgsrun":8, "wgsdate":9, "ST": 10, "STgenes":11, "other":12}
# Per-gene [template_coverage, query_identity] acceptance thresholds (percent).
thresholds={"stx":[98,98], "wzx":[98,98], "wzy":[98,98], "wzt":[98,98], "wzm":[98,98], "fliC":[90,90], "fli":[90,90], "eae": [95,95], "ehxA":[95,95], "other":[98,98]}
stxbadthreshold=[30,90] # looser [coverage, identity] cut-off for borderline stx hits
# Processing of .res file
# hitdict: {sampleid: {gene_name: [[serotype_or_virulence, template_cov, query_ident], ...]}}
hitdict={}
hitdict[args.sampleid]={}
res_file=open(os.path.join(SPECOLIFBIDIR, "colipost.res"), 'r') # NOTE(review): handle is never closed
for line in res_file:
    if line.startswith("#"):
        continue
    # After the split: line[0]=template hit name, line[5]=template coverage,
    # line[6]=query identity (the old comment citing line[9]/line[18] did not
    # match the code).
    line=line.split("\t")
    template=line[0]
    template_cov=line[5]
    query_ident=line[6]
    # Template names are "__"-separated; field 1 is the gene, field 2 the
    # serotype or virulence call.
    gene_list=template.split("__")
    gene_name=gene_list[1]
    serotype_or_virulence=gene_list[2]
    if gene_name.startswith("stx"):
        gene_name="stx" # collapse stx1/stx2 subtypes under one key
    if not gene_name in hitdict[args.sampleid]:
        hitdict[args.sampleid][gene_name]=[]
    hit_results_list=[serotype_or_virulence, template_cov, query_ident]
    hitdict[args.sampleid][gene_name].append(hit_results_list)
# Processing of hits
results_dict={}
for args.sampleid in hitdict:
    foundgoodstx="false"
    # 14 slots addressed via "locations"; slot 0 is popped further down and
    # the last slot (-1) accumulates the verbose hit strings. Extend this
    # literal if "locations" is made longer.
    results_dict[args.sampleid]=[[],[],[],[],[],[],[],[],[],[],[],[],[],[]]
    Opick="-"
    # First the mapped hits are parsed and types are decided upon based on a logic from KNJ and SSC
    for gene_name in hitdict[args.sampleid]:
        for hit_results_list in hitdict[args.sampleid][gene_name]:
            serotype_or_virulence=hit_results_list[0]
            template_cov=hit_results_list[1]
            query_ident=hit_results_list[2]
            # A non-fliC fli* hit switches H-typing to the generic "fli" slot
            # and discards any fliC calls collected so far.
            if gene_name.startswith("fli") and not "fliC" in gene_name and fliflag=="false":
                results_dict[args.sampleid][locations["fliC"]]=[]
                fliflag="true"
            if not gene_name in thresholds:
                gene_name="other"
            if fliflag=="true":
                gene_name="fli"
            if "stx" in gene_name:
                serotype_or_virulence=serotype_or_virulence.lower()
            if "100.00" in template_cov and "100.00" in query_ident:
                # Perfect hit: verbose detail only for wzt/wzm/fli genes.
                if "wzt" in gene_name or "wzm" in gene_name or fliflag=="true":
                    results_dict[args.sampleid][-1].append(":".join(["_".join([gene_name, serotype_or_virulence]), template_cov.lstrip(), query_ident.lstrip()]))
                if gene_name.startswith("e"):
                    serotype_or_virulence="positive" # eae/ehxA reported as positive
                if gene_name.startswith("stx"):
                    foundgoodstx="true"
            elif float(template_cov)> thresholds[gene_name][0] and float(query_ident)>thresholds[gene_name][1]:
                # Above-threshold hit: always record the verbose detail.
                if gene_name.startswith("wz") or fliflag=="true":
                    results_dict[args.sampleid][-1].append(":".join(["_".join([gene_name, serotype_or_virulence]), template_cov.lstrip(), query_ident.lstrip()]))
                else:
                    results_dict[args.sampleid][-1].append(":".join([serotype_or_virulence, template_cov.lstrip(), query_ident.lstrip()]))
                if gene_name.startswith("e"):
                    serotype_or_virulence="positive"
                if gene_name.startswith("stx"):
                    foundgoodstx="true"
            elif gene_name.startswith("stx") and float(template_cov)>stxbadthreshold[0] and float(query_ident)>stxbadthreshold[1]:
                # Borderline stx hit: keep it truncated to 4 chars (e.g.
                # "stx2a" -> "stx2") so good hits can supersede it below.
                results_dict[args.sampleid][-1].append(":".join([serotype_or_virulence, template_cov.lstrip(), query_ident.lstrip()]))
                serotype_or_virulence=serotype_or_virulence[:4]
            else:
                # Below all thresholds: verbose detail only, then skip the
                # report columns entirely.
                if gene_name.startswith("wz"):
                    results_dict[args.sampleid][-1].append(":".join(["_".join([gene_name, serotype_or_virulence]), template_cov.lstrip(), query_ident.lstrip()]))
                else:
                    results_dict[args.sampleid][-1].append(":".join([serotype_or_virulence, template_cov.lstrip(), query_ident.lstrip()]))
                continue
            if "fliC" in gene_name and fliflag=="true":
                continue # fli* mode active: fliC hits stay out of the columns
            if not serotype_or_virulence in results_dict[args.sampleid][locations[gene_name]]:
                results_dict[args.sampleid][locations[gene_name]].append(serotype_or_virulence)
    # With at least one good stx hit, drop the 4-char entries that came from
    # borderline hits.
    if foundgoodstx=="true":
        for stxcase in results_dict[args.sampleid][locations["stx"]]:
            if len(stxcase)==4:
                results_dict[args.sampleid][locations["stx"]].pop(results_dict[args.sampleid][locations["stx"]].index(stxcase))
    results_dict[args.sampleid][locations["stx"]]="; ".join(results_dict[args.sampleid][locations["stx"]]).replace("-", "")
    # Flatten every remaining list slot into a ":"-joined string.
    for gene in results_dict[args.sampleid]:
        if type(gene)==list:
            results_dict[args.sampleid][results_dict[args.sampleid].index(gene)]=":".join(gene)
    # O-type pick: wzx first, then wzy overwrites when it also gave an
    # unambiguous (no "*") call of length > 1.
    if len(results_dict[args.sampleid][locations["wzx"]]) > 1 and not "*" in results_dict[args.sampleid][locations["wzx"]]:
        Opick=results_dict[args.sampleid][locations["wzx"]]
    if len(results_dict[args.sampleid][locations["wzy"]]) > 1 and not "*" in results_dict[args.sampleid][locations["wzy"]]:
        Opick=results_dict[args.sampleid][locations["wzy"]]
    if len(results_dict[args.sampleid][locations["fliC"]]) < 2:
        results_dict[args.sampleid][locations["fliC"]]="-"
    results_dict[args.sampleid][locations["OH"]]=":".join([Opick, results_dict[args.sampleid][locations["fliC"]]])
    # Here the results_dict is curated with dashes
    for element in results_dict[args.sampleid]:
        if len(element)<2:
            results_dict[args.sampleid][results_dict[args.sampleid].index(element)]="-"
    # NOTE(review): after this pop every index in "locations" points one slot
    # further right than originally laid out; the assignments below depend on
    # the shifted positions — verify against the header order before changing.
    results_dict[args.sampleid].pop(0)
    results_dict[args.sampleid][locations["wgsrun"]]=wgsnumber
    results_dict[args.sampleid][locations["wgsdate"]]=wgsdate
# Write the TSV row and its JSON twin for each sample (only one in practice).
for args.sampleid in results_dict.keys():
    outfile=open(OUTFILE, 'w') # NOTE(review): handle is never closed explicitly
    results_dict[args.sampleid][locations["ST"]]=ST # ST parsed from args.stbit above
    results_dict[args.sampleid][locations["STgenes"]]=STgenes # gene calls parsed from args.stbit above
    lineelements=[]
    for element in results_dict[args.sampleid]:
        if type(element) is dict:
            # Defensive branch for dict-shaped slots; the paths above only
            # produce strings here.
            for gene in ["O", "H", "stx", "eae", "ehx"]:
                if gene in element:
                    lineelements.append(";".join(element[gene]))
        elif len(element)<1:
            lineelements.append("-")
        else:
            lineelements.append(element)
    # Print results to outfile
    print("".join([header, "\t".join([args.sampleid, "\t".join(lineelements)])]), file=outfile, end='')
    # Turn the data into a dictionary keyed by the header columns
    keys = header.strip().split('\t')
    csv_data = {key: None for key in keys} # NOTE(review): immediately overwritten by the zip below
    lineelements.insert(0, args.sampleid)
    csv_data = dict(zip(keys, lineelements))
    json_outfile=os.path.join(os.path.join(args.indir, args.sampleid, f"{args.sampleid}.json"))
    write_to_json(csv_data, json_outfile)
if __name__ == "__main__":
    # NOTE(review): this guard does NOT protect the report generation above —
    # the entire script body runs at import time. This block only prints a
    # hard-coded path and looks like leftover debug code; consider removing it
    # or moving the real logic into a main() called from here.
    file = "/home/projects/fvst_ssi_dtu/apps/sofi_bifrost_dev/scripts/bifrost/components/bifrost_sp_ecoli/bifrost_sp_ecoli/ecoli_fbi/postecolityping.py"
    print(f"file string : {file}")