#!/usr/bin/python3
'''
Expected arguments:
- fastalist (list of fasta files used as input to kSNP)
- mergeFastaReads (command to merge contigs into a single fasta file)
- k (kmer length for this run)
- jellyfish (path to the jellyfish program to use for this run)
- hashSize (hash size to pass to jellyfish)
- numCpus (number of CPUs to use for the jellyfish run)
- fileToGenome (filename of the fileName2genomeName output)
- freqCheck (program that calculates count frequencies inline)
plus optional output-naming arguments and the --debug / --info flags; see
parseGetFilteredKmerArgs() below for the full list.
'''
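
# Example invocation (illustrative only; all paths and filenames here are
# hypothetical, not fixed by this script):
#   ./get_filtered_kmers.py fasta_list.txt ./merge_fasta_reads 21 \
#       /usr/local/bin/jellyfish 100M 4 fileName2genomeName \
#       ./inline_frequency_check.py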
import argparse
import subprocess
import os
import sys
import logging
import ksnpCache as kcache  # Cache support; not yet wired into the flow below.

tab = '\t'
newline = '\n'
##################################
##################################
#
# FUNCTIONS
#
##################################
##################################
def parseGetFilteredKmerArgs(override=None):
    parser = argparse.ArgumentParser(description='output a fileName2genomeName file and corresponding data files')
    parser.add_argument('fastalist', help='The list of fasta files used as input to kSNP')
    parser.add_argument('mergeFastaReads', help='Command to merge contigs into a single fasta file')
    parser.add_argument('k', help='The kmer length for this run')
    parser.add_argument('jellyfish', help='The path to the jellyfish program to use for this run')
    parser.add_argument('hashSize', help='The hash size to pass to the jellyfish program')
    parser.add_argument('numCpus', help='The number of CPUs to use for the jellyfish run')
    parser.add_argument('fileToGenome', help='The filename of the fileName2genomeName output')
    parser.add_argument('freqCheck', help='The filename of the program to calculate count frequencies inline')
    # argparse silently ignores default= on required positionals, so the trailing
    # naming arguments take nargs='?' to make their defaults effective.
    parser.add_argument('outputPrefix', nargs='?', help='The prefix to use for output files, if any', default='fsplit')
    parser.add_argument('kmerSuffix', nargs='?', help='The suffix to use for files containing kmer frequency counts', default='kmer')
    parser.add_argument('jellyfishSuffix', nargs='?', help='The suffix to use for files containing jellyfish database data', default='Jelly')
    parser.add_argument('filterSuffix', nargs='?', help='The suffix to use for filtered kmer frequency counts', default='filtered')
    parser.add_argument('--debug', action='store_true', help='Output diagnostic data to the screen using STDERR')
    parser.add_argument('--info', action='store_true', help='Output progress updates to the screen using STDERR')
    if override is None:
        return parser.parse_args()
    else:
        return parser.parse_args(override)
def parseFastaInputFiles(fileToGenomeFilename, fastalistFilename, inputFilenamePrefix, jellyfishFilenameSuffix, kmerFilenameSuffix, k):
    '''
    Given the input filename and information about the expected filenames, create the
    fileName2genomeName file and set the names for the temporary (in-processing)
    per-genome files. Return a list of input files and important details about them.
    '''
    inputLineCounter = 0
    # List of dicts; each element holds the number, genome, and filenames for an input file.
    inputFiles = []
    logging.debug("Parsing input file %s to generate %s and calculate filenames", fastalistFilename, fileToGenomeFilename)
    fileToGenomeFile = open(fileToGenomeFilename, 'w')
    # Assume this is manageably small. Typically 100 entries would be considered huge.
    inputLines = open(fastalistFilename, 'r').readlines()
    for line in inputLines:
        # For each line in the input list of fasta files...
        (filename, genome) = line.split(tab)
        countStr = str(inputLineCounter)
        thisRecord = {
            'count': countStr,
            'genome': genome.strip(),
            'inputFile': filename,
            'tempFile': inputFilenamePrefix + countStr,
            'jellyfishDB': inputFilenamePrefix + countStr + '.' + jellyfishFilenameSuffix,
            'kmers': inputFilenamePrefix + countStr + '.' + kmerFilenameSuffix,
            # Set by expectations of the rest of the parent script:
            'kmersFiltered': kmerFilenameSuffix + '.' + inputFilenamePrefix + countStr,
        }
        # I would prefer the kmersFiltered name be as defined below, to more accurately
        # represent origin / processing:
        # 'kmersFiltered': inputFilenamePrefix + countStr + '.' + options.kmerSuffix + '.' + options.filterSuffix,
        # Even better would be:
        # 'kmersFiltered': '.'.join([genome.strip(), checksum[-10:], options.kmerSuffix, options.filterSuffix])
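        # ('checksum' is not computed anywhere yet. One possible sketch, assuming a
        # content hash of the input fasta is what is wanted, would be
        # checksum = hashlib.sha256(open(filename, 'rb').read()).hexdigest(),
        # which would also require importing hashlib.)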
        fileToGenomeFile.write(thisRecord['tempFile'] + tab + thisRecord['genome'] + newline)
        # Add this entry to our list of files to process.
        inputFiles.append(thisRecord)
        inputLineCounter = inputLineCounter + 1
    fileToGenomeFile.close()
    logging.info("Number of input sequences: %s", len(inputFiles))
    return inputFiles
def convertInputFiles(inputFileList, mergeFastaReadsFilename):
    for record in inputFileList:
        print(tab.join([record['count'], record['genome'], record['inputFile']]))
        outputFileHandle = open(record['tempFile'], 'wb')
        subprocess.run([mergeFastaReadsFilename, record['inputFile']], stdout=outputFileHandle)
        outputFileHandle.close()
def runJellyfishDump(nextFile, kmers, jellyfishFilename, freqcheckFilename):
    # Returns (the freqCheck process, or [] if nextFile is missing; a list of all
    # started processes).
    processes = []
    allPendingProcesses = []
    if os.access(nextFile, mode=os.F_OK):
        # Join STDOUT of jellyfishDump to STDIN of freqCheck, and capture STDOUT of freqCheck.
        jellyfishDump = subprocess.Popen(
            [jellyfishFilename, 'dump', '-c', nextFile], stdout=subprocess.PIPE)
        freqCheck = subprocess.Popen(
            [freqcheckFilename, kmers], stdin=jellyfishDump.stdout, stdout=subprocess.PIPE)
        # Permit SIGPIPE to reach the jellyfish dump process by ensuring that we don't
        # hold an open file handle to the pipe in question.
        jellyfishDump.stdout.close()
        processes = freqCheck
        allPendingProcesses = [jellyfishDump, freqCheck]
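        # Illustratively, the wiring above corresponds to a shell pipeline like
        # (filenames hypothetical):
        #   jellyfish dump -c fsplit0.Jelly | inline_frequency_check fsplit0.kmer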
    return (processes, allPendingProcesses)
def processJellyfishDumps(inputFiles, jellyfishFilename, freqcheckFilename):
    # Now we grab frequency data from these databases.
    # Note that we run 'dump' in the background, so if there are a very large number of
    # input files, this could get exciting. (Probably not: we'll just go a little slower
    # due to resource contention, but still be much faster than one-by-one because we're
    # letting the OS manage the scheduling.)
    allPendingProcesses = []
    for file in inputFiles:
        logging.debug("Looking for dump files for %s", file['genome'])
        nextFile = file['jellyfishDB']
        # We may need to do a jellyfish merge in this case. First we find the relevant
        # filenames, testing for the singular un-suffixed jellyfish output file (e.g.
        # .Jelly), then .Jelly_0, .Jelly_1, and so on.
        if os.access(nextFile, mode=os.F_OK):
            filesToMerge = [nextFile]
        else:
            filesToMerge = []
        dbFileCount = 0
        nextFile = file['jellyfishDB'] + '_' + str(dbFileCount)
        while os.access(nextFile, mode=os.F_OK):
            filesToMerge.append(nextFile)
            dbFileCount = dbFileCount + 1
            nextFile = file['jellyfishDB'] + '_' + str(dbFileCount)
        processes = []
        logging.debug("Processing (dump) %s", file['jellyfishDB'])
        if len(filesToMerge) > 1:
            # We need to merge the dumps together to get a single set of counts.
            logging.debug("Merging multiple jellyfish dumps: %s", filesToMerge)
            command = [jellyfishFilename, 'merge', '-o', file['jellyfishDB']]
            command.extend(filesToMerge)
            subprocess.run(command)
            dumpfile = file['jellyfishDB']
        elif len(filesToMerge) == 1:
            dumpfile = filesToMerge[0]
        else:
            # Fail with a clear message rather than an IndexError below.
            raise FileNotFoundError('No jellyfish database found for ' + file['genome'])
        logging.debug("Running jellyfish dump on %s (1/1 jellyfish dump for genome %s)", dumpfile, file['genome'])
        (newProcesses, newPendingProcesses) = runJellyfishDump(dumpfile, file['kmers'], jellyfishFilename, freqcheckFilename)
        processes.append(newProcesses)
        allPendingProcesses.extend(newPendingProcesses)
        file['freqProcesses'] = processes
    # Wait for all of our dumpers and frequency calculators to finish.
    logging.debug("Waiting for dumpers and frequency checkers to finish")
    # Busy-wait over the pending processes, starting at index 0.
    while len(allPendingProcesses) > 0:
        # This polling is ONLY okay because we know the output from inline_frequency_check
        # is very tiny (an integer representation smaller than the size of the source file)
        # and won't fill the pipe. OTHERWISE, we would need a way to drain the data so the
        # process doesn't block on output while we wait for it to exit (deadlock).
        pollResult = allPendingProcesses[0].poll()
        if pollResult is not None:
            allPendingProcesses.pop(0)
            logging.debug('a process exited with exit code %s, %s processes remaining to check', pollResult, len(allPendingProcesses))
            if pollResult != 0:
                raise RuntimeError("Subprocess exited with nonzero exit code")
    logging.debug("Dumpers and statistics done")
    # Grab frequency data from per-process output.
    for file in inputFiles:
        logging.debug('Processing frequency data for %s', file['tempFile'])
        # Grab the frequency cutoff from the first subprocess, ignoring stderr.
        processes = file['freqProcesses']
        # This is the interface between the inline_freq_check.py program and this one:
        # currently space-separated integers. The first is the minFreq, and the second
        # is the total number of kmers.
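        # For example, an output of b'2 104857' would parse to minFreq = 2 and
        # length = 104857 (values illustrative).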
logging.debug("processes: %s\nallPendingProcesses: %s", processes, allPendingProcesses)
statistics = processes.pop().communicate()[0].split(b" ")
file['minFreq'] = int(statistics[0])
# Note that length may change if minFreq != 1.
file['length'] = int(statistics[1])
# Then clean up any remaining processes for this file (shouldn't be any)
for process in processes:
process.communicate()
print(file['tempFile'] + tab + str(file['minFreq']))
# The only data relevant in this data structure is 'minFreq', but this is easy.
return inputFiles
def generateJellyfishDumps(inputFileList, jellyfishFilename, k, hashSize, numCPUs):
    '''
    Process all of the input files to get lists of kmers of all frequencies.
    '''
    # Jellyfish is pretty resource-intensive; run these processes one at a time.
    # Further investigation suggests that for moderately sized inputs (~200M), peak
    # resource usage lasts only for the first 10 seconds or so... so could we run
    # several at the same time? (See the sketch below.)
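    # One possible (untested) sketch, if parallelism proves safe here: start each
    # count with subprocess.Popen and then wait on all of them, e.g.:
    #   pending = [subprocess.Popen([jellyfishFilename, 'count', ...]) for file in inputFileList]
    #   for p in pending:
    #       p.wait()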
    for file in inputFileList:
        logging.debug("Processing (jellyfish) %s", file['jellyfishDB'])
        # Note that this output is e.g. file0.Jelly, or file0.Jelly_<n>.
        # Also note that we wait for each jellyfish count to finish before
        # starting the next.
        logging.debug("Running jellyfish: %s", " ".join([jellyfishFilename, 'count', '-C', '-o', file['jellyfishDB'], '-m', k, '-s', hashSize, '-t', numCPUs, file['tempFile']]))
        subprocess.run([
            jellyfishFilename,
            'count', '-C', '-o', file['jellyfishDB'], '-m', k, '-s', hashSize, '-t', numCPUs, file['tempFile']])
##################################
##################################
#
# MAIN
#
##################################
##################################
if __name__ == "__main__":
    uncachedFiles = []
    options = parseGetFilteredKmerArgs()
    if options.debug:
        logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.DEBUG)
    elif options.info:
        logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.INFO)
    else:
        logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.WARNING)
    logging.debug('Commandline options, as parsed: %s', str(options))
    logging.info('Parsing input fasta file')
    inputFiles = parseFastaInputFiles(options.fileToGenome, options.fastalist, options.outputPrefix, options.jellyfishSuffix, options.kmerSuffix, options.k)
    # Cache lookup (via ksnpCache) is not yet implemented, so treat every input
    # file as uncached.
    uncachedFiles = inputFiles
    if len(uncachedFiles) > 0:
        # Convert input files into a format appropriate for kSNP processing.
        logging.info('Converting %s input files without cached data into canonical kSNP input', len(uncachedFiles))
        convertInputFiles(uncachedFiles, options.mergeFastaReads)
        # Run Jellyfish on the input files to produce kmer list files.
        logging.info("Running jellyfish to find kmers for %s input files", len(uncachedFiles))
        generateJellyfishDumps(uncachedFiles, options.jellyfish, options.k, options.hashSize, options.numCpus)
        dumpFiles = processJellyfishDumps(uncachedFiles, options.jellyfish, options.freqCheck)
        logging.debug("Filtering out minimum frequency kmers from %s files", len(dumpFiles))
        for file in dumpFiles:
            logging.debug("File: %s", file)
            if file['minFreq'] != 1:  # Only if there is any filtering to do:
                logging.debug("Running awk to filter minimum frequency kmers (awk-ward) for %s", file['tempFile'])
                kmersFiltered = open(file['kmersFiltered'], 'wb')
                subprocess.run(['awk', '-v', 'm=' + str(file['minFreq']), '$2>=m {print}', file['kmers']], stdout=kmersFiltered)
                kmersFiltered.close()
            else:
                logging.debug("No filtering to do, renaming kmers file to filtered kmers filename for %s", file['tempFile'])
                os.rename(file['kmers'], file['kmersFiltered'])
    else:  # We didn't find any uncached files.
        logging.info('Cached data found for all %s input files, using cached data only.', len(inputFiles))
    sys.exit(0)