forked from Illumina/GTCtoVCF
-
Notifications
You must be signed in to change notification settings - Fork 0
/
ReferenceGenome.py
180 lines (141 loc) · 5.49 KB
/
ReferenceGenome.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
import os
from pysam import Fastafile
class ReferenceGenome(object):
"""
Class to read sequence data from a reference genome
Attributes:
genome_fasta_file (string): Path to reference genome file
fasta_file (pysam.Fastafile): Fastafile object for reference genome
"""
def __init__(self, genome_fasta_file, logger):
"""
Create new ReferenceGenome
Args:
genome_fasta_file (string): Path to whole genome FASTA file
logger (logging.Logger): Logger for reporting warnings/errors
Returns:
ReferenceGenome
"""
self._logger = logger
self.genome_fasta_file = genome_fasta_file
self._validate_reference_file()
try:
self._fasta_file = Fastafile(self.genome_fasta_file)
except:
raise IOError("Could not read genome file: " +
self.genome_fasta_file)
def _validate_reference_file(self):
"""
Check whether reference file is valid
Args:
genome_fasta_file (string): Path to genome FASTA file
Returns
bool: True if valid
"""
if not os.path.isfile(self.genome_fasta_file + ".fai"):
raise ValueError(
"Supplied genome FASTA file does not have FASTA index")
def get_contig_lengths(self):
"""
Get the names and lengths of all contigs in a references
Args:
None
Returns:
list(tuple(string,int)): Returns a list representing the name and lengths of all contigs in reference
"""
return zip(self._fasta_file.references, self._fasta_file.lengths)
def get_contig_order(self):
"""
Get a dictionary with the ordering of the contigs
Args:
None
Returns:
dict(string,int): Return a dictionary mapping from contig name to order of that contig
"""
result = {}
for (idx, contig) in enumerate(self._fasta_file.references):
result[contig] = idx
return result
def get_reference_bases(self, chrom, start, end):
"""
Get the reference bases from start to end
Args:
chrom (string): Chromsome to query
start (int): Start position to query
end (int): End position (not inclusive)
Returns:
string: The genome sequence
Raises:
ValueError - Invalid arguments
"""
if start >= end:
raise ValueError("Start/stop coordinates incorrect for: " +
str(chrom) + ":" + str(start) + "-" + str(end))
if chrom not in self._fasta_file:
raise ValueError(
"FASTA reference is missing entry for chromosome " + str(chrom))
return self._fasta_file.fetch(str(chrom), start, end)
class CachedReferenceGenome(object):
"""
Class to provide sequence data from a reference genome. All sequence
data will be cached into memory.
Attributes:
genome_fasta_file (string): Path to reference genome file
"""
def __init__(self, reference_genome, logger):
self._logger = logger
self._logger.info("Caching reference data")
self._cache = CachedReferenceGenome.generate_genome_cache(reference_genome)
self._logger.info("Finished caching reference data")
self.genome_fasta_file = reference_genome.genome_fasta_file
self._contig_order = reference_genome.get_contig_order()
@staticmethod
def generate_genome_cache(reference_genome):
"""
Get the reference bases from start to end
Args:
reference_genome (ReferenceGenome): A reference genome that provides the information to be cached
Returns:
dict(string,string): A dictionary that maps from contig name to bases for that contig
"""
result = {}
for (contig, contig_length) in reference_genome.get_contig_lengths():
result[contig] = reference_genome.get_reference_bases(contig, 0, contig_length)
return result
def get_contig_order(self):
"""
Get a dictionary with the ordering of the contigs
Args:
None
Returns:
dict(string,int): Return a dictionary mapping from contig name to order of that contig
"""
return self._contig_order
def get_contig_lengths(self):
"""
Get the names and lengths of all contigs in a references
Args:
None
Returns:
list(tuple(string,int)): Returns a list representing the name and lengths of all contigs in reference
"""
return [(contig, len(self._cache[contig])) for contig in self._cache]
def get_reference_bases(self, chrom, start, end):
"""
Get the reference bases from start to end
Args:
chrom (string): Chromsome to query
start (int): Start position to query
end (int): End position (not inclusive)
Returns:
string: The genome sequence
Raises:
ValueError - Invalid arguments
"""
if start >= end:
raise ValueError("Start/stop coordinates incorrect for: " +
str(chrom) + ":" + str(start) + "-" + str(end))
if chrom not in self._cache:
raise ValueError(
"Reference is missing entry for chromosome " + str(chrom))
return self._cache[str(chrom)][start:end]