-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsplit_compound_indels.py
137 lines (123 loc) · 7.13 KB
/
split_compound_indels.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#!/usr/bin/env python2.7
# encoding: utf-8
"""
split_compound_indels.py
Created by Matthew Wakefield on 2013-10-17.
Copyright (c) 2013 Matthew Wakefield and The University of Melbourne. All rights reserved.
"""
from __future__ import print_function, unicode_literals, division
import sys, os
import re
from io import StringIO
import unittest
import argparse
__author__ = "Matthew Wakefield"
__copyright__ = "Copyright 2013, Matthew Wakefield and The University of Melbourne"
__credits__ = ["Matthew Wakefield",]
__license__ = "GPL"
__version__ = "0.1"
__maintainer__ = "Matthew Wakefield"
__email__ = "wakefield@wehi.edu.au"
__status__ = "Development"
def indel_lines(mpileupfile):
inputlines = 0
outputlines = 0
for line in mpileupfile:
inputlines +=1
if int(line.split()[3]) != len(line.split()[4])-(line.count('$')+(line.count('^')*2)):
outputlines += 1
yield line.strip('\n')
print('Processed {0} indel containing positions from a total of {1} positions'.format(outputlines,inputlines), file=sys.stderr)
def split_out_indels(pileupstring, errorstring):
snpstring = ''
snperrors = ''
indelstring = ''
indelerrors = ''
pileupstring_position = 0
errorstring_position = 0
while pileupstring_position < len(pileupstring):
if pileupstring[pileupstring_position] in '.,*':
snpstring += pileupstring[pileupstring_position]
snperrors += errorstring[errorstring_position]
indelstring += pileupstring[pileupstring_position]
indelerrors += errorstring[errorstring_position]
pileupstring_position += 1
errorstring_position += 1
elif pileupstring[pileupstring_position] == '$':
snpstring += pileupstring[pileupstring_position]
indelstring += pileupstring[pileupstring_position]
pileupstring_position += 1
elif pileupstring[pileupstring_position] == '^':
snpstring += pileupstring[pileupstring_position:pileupstring_position+2]
indelstring += pileupstring[pileupstring_position:pileupstring_position+2]
pileupstring_position += 2
elif pileupstring[pileupstring_position] == '-':
feature_size = int(re.match(r"\d+",pileupstring[pileupstring_position+1:]).group(0))
description_length = 1+len(str(feature_size))+feature_size # eg -3CGG = 1+1+3 = 5
indelstring += pileupstring[pileupstring_position:pileupstring_position+description_length]
pileupstring_position += description_length
elif pileupstring[pileupstring_position] == '+':
feature_size = int(re.match(r"\d+",pileupstring[pileupstring_position+1:]).group(0))
description_length = 1+len(str(feature_size))+feature_size # eg -3CGG = 1+1+3 = 5
indelstring += pileupstring[pileupstring_position:pileupstring_position+description_length]
indelerrors += errorstring[errorstring_position:errorstring_position+feature_size]
pileupstring_position += description_length
else:
snpstring += pileupstring[pileupstring_position]
snperrors += errorstring[errorstring_position]
indelstring += '.'
indelerrors += errorstring[errorstring_position]
pileupstring_position += 1
errorstring_position += 1
return (snpstring, snperrors), (indelstring, indelerrors)
def command_line_interface(*args,**kw):
parser = argparse.ArgumentParser(description='A script for to split positions with compound \
indels and SNVs into separate entries.\
Takes a single sample mpileup file and produces \
a mpileup file with two entries for each indel \
containing position - one with only SNVs and one \
with only indels. This can be used with single \
position indel callers to produce either SNV or \
indel calls.\
This script may have unintended consequences on \
calling and should ONLY be used as a secondary \
screen to rescue missed compound mutations.')
parser.add_argument('mpileup',
type=argparse.FileType('U'),
default=sys.stdin,
help='a single sample mpileup or pileup file')
parser.add_argument('--output',
type=argparse.FileType('w'),
default=sys.stdout,
help='a mpileup file containing a SNP and indel entry for each indel position')
return parser.parse_args(*args,**kw)
def main():
args = command_line_interface()
for line in indel_lines(args.mpileup):
chromosome, position, reference, readcount, bases, qualities = line.split('\t')
(snpstring, snperrors), (indelstring, indelerrors) = split_out_indels(bases, qualities)
print(chromosome, position, reference, readcount, snpstring, snperrors, sep='\t', file=args.output)
print(chromosome, position, reference, readcount, indelstring, indelerrors, sep='\t', file=args.output)
class test_split_compound_indels(unittest.TestCase):
def setUp(self):
self.mpileupfile = StringIO(''.join(["1\t27023256\tC\t35\t...........T-3CGG.................,,.,,,,\tFGG@GFEGGGGGGGGGGGGGGGGEGGGGFCGDFFF\n",
"1\t27023257\tC\t35\t....$......*.................,,.,,,,\tGGFEGGGGGGGGGGGGGGGGGGGEGGGGFAGAA>C\n",
"1\t27023258\tG\t34\t.........*.................,,.,,,,\tGG?FGGEG?GEGGGGGGGGGGGEGEGGFFCFDDD\n",
"1\t27023259\tG\t34\t..^I.......*.................,,.,,,,\t?G>GGGGGEGFGFGGGGGGGCG?EDEGFAGB=@9\n",
])
)
pass
def test_indel_lines(self):
self.assertEqual(list(indel_lines(self.mpileupfile)),['1\t27023256\tC\t35\t...........T-3CGG.................,,.,,,,\tFGG@GFEGGGGGGGGGGGGGGGGEGGGGFCGDFFF\n'])
pass
def test_split_out_indels(self):
pileupstring = "...........T-3CGG.................,,.,,,,"
errorstring = "ABCDEFGHIJKL MNOPQRSTUVWXYZ!@#$%^&*()".replace(' ','') #spaces only for clarity
snpstring = "...........T.................,,.,,,,"
snperrors = "ABCDEFGHIJKLMNOPQRSTUVWXYZ!@#$%^&*()".replace(' ','') #spaces only for clarity
indelstring = "............-3CGG.................,,.,,,,"
indelerrors = "ABCDEFGHIJKL MNOPQRSTUVWXYZ!@#$%^&*()".replace(' ','') #spaces only for clarity
self.assertEqual(split_out_indels(pileupstring, errorstring), ((snpstring, snperrors), (indelstring, indelerrors)))
if __name__ == '__main__':
#unittest.main()
main()