#!/usr/bin/env python3
__author__ = "rye"
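# Example invocation (bucket paths are illustrative placeholders, not real locations):
#   python gsp-ccdg-f3_vds-to-vcf.py \
#       --vds gs://<workspace-bucket>/ccdg_genomes.vds \
#       --chr 1,2,X \
#       --out gs://<workspace-bucket>/exports/ccdg_genomes
# One VCF per requested chromosome is written, named <out>_chr<N>.vcf.bgz.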
import argparse
import logging
from datetime import date

import hail as hl
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s: %(message)s",
    datefmt="%m/%d/%Y %I:%M:%S %p",
)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


def get_bucket(
    data_type: str
) -> str:
    """
    Return path prefix to desired CCDG bucket.

    :param data_type: Whether data is from CCDG exomes or genomes
    :return: Path prefix to CCDG bucket
    """
    return (
        "gs://fc-secure-9e3357c0-389c-41d7-94ee-56673db6b75f/"
        if data_type == "genomes"
        else "gs://fc-secure-7e69c896-d6c0-4a4e-8490-42cb2d4fdebf/"
    )


def read_in_vds(
    VDS: str
) -> hl.vds.VariantDataset:
    """
    Read in and return specified VDS. Tries to handle legacy VDS representation.

    :param VDS: Path to VDS of interest
    :return: VDS of interest
    """
    logger.info("Starting to read VDS...")
    # Attempt the usual reading strategy
    try:
        vds = hl.vds.read_vds(VDS)
    # If it fails, fall back to Tim's solution for the old VDS format
    except Exception:
        logger.info("Usual VDS read function failed. Trying legacy read...")

        def read_old_vds(path):
            # Legacy VDSes store reference and variant data as two separate MatrixTables
            rd = hl.read_matrix_table(path + '/reference_data')
            vd = hl.read_matrix_table(path + '/variant_data')
            rd = rd.transmute_rows(ref_allele=rd.alleles[0])
            return hl.vds.VariantDataset(rd, vd)

        vds = read_old_vds(VDS)
    return vds


def prepare_vds_to_mt(
    vds: hl.vds.VariantDataset,
    chr: str
) -> hl.MatrixTable:
    """
    Filter VDS to chr, densify to MT, compute GT from LGT+LA, and finally split
    multiallelic sites.

    :param vds: VDS to be used
    :param chr: String representation of chromosome to be filtered to
    :return: Densified MT filtered to chr with computed GT and split multi sites
    """
    logger.info(f"Starting to prepare VDS (filtering to {chr})...")
    # Filter VDS to chr
    vds = hl.vds.filter_chromosomes(vds, keep=chr)
    # Compute GT from LGT and LA, then drop LGT
    vds.variant_data = vds.variant_data.annotate_entries(
        GT=hl.vds.lgt_to_gt(vds.variant_data.LGT, vds.variant_data.LA)
    )
    vds.variant_data = vds.variant_data.drop(vds.variant_data.LGT)
    # Densify VDS
    mt = hl.vds.to_dense_mt(vds)
    # Split multiallelic sites
    mt = hl.split_multi_hts(mt)
    return mt


def prepare_mt(
    mt: hl.MatrixTable,
    data_type: str,
    bucket: str
) -> hl.MatrixTable:
    """
    Read in variant_qc and variant_info HTs to annotate MT with AS_VQSLOD and
    AS_lowqual fields (respectively). Also annotate is_snv.

    :param mt: MT to be annotated
    :param data_type: Whether data is from CCDG exomes or genomes
    :param bucket: Path prefix to either exomes or genomes buckets
    :return: MT annotated with AS_VQSLOD, AS_lowqual, is_snv variant fields
    """
    logger.info("Starting to annotate AS_VQSLOD, AS_lowqual, and is_snv...")
    # Read in annotation tables
    VARIANT_QC_HT = bucket + "ccdg_" + data_type + "_variant_qc_vqsr_alleleSpecificTrans_split.ht"
    variant_qc_ht = hl.read_table(VARIANT_QC_HT)
    VARIANT_INFO_HT = bucket + "ccdg_" + data_type + "_variant_info_split.ht"
    variant_info_ht = hl.read_table(VARIANT_INFO_HT)
    # Annotate AS_VQSLOD
    mt = mt.annotate_rows(AS_VQSLOD=variant_qc_ht[mt.locus, mt.alleles].info.AS_VQSLOD)
    # Annotate AS_lowqual
    mt = mt.annotate_rows(AS_lowqual=variant_info_ht[mt.locus, mt.alleles].AS_lowqual)
    # Annotate is_snv
    mt = mt.annotate_rows(is_snv=hl.is_snp(mt.alleles[0], mt.alleles[1]))
    return mt


def compute_passing(
    mt: hl.MatrixTable,
    data_type: str
) -> hl.MatrixTable:
    """
    Compute variant passing status based on Wenhan's default recommendations:
    If WGS: (AS_lowqual is not True) and (AS_VQSLOD >= -2.4464 for SNVs, >= -0.3533 for indels)
    If WES: (AS_lowqual is not True) and (AS_VQSLOD >= -7.4038 for SNVs, >= -2.8062 for indels)

    :param mt: MT to be annotated
    :param data_type: Whether data is from CCDG exomes or genomes
    :return: MT annotated with filters variant field
    """
    logger.info("Starting to annotate passing filter...")
    # Set different cutoffs for each data type
    snv_cutoff = -2.4464 if data_type == "genomes" else -7.4038
    indel_cutoff = -0.3533 if data_type == "genomes" else -2.8062
    # Compute PASS/FAIL status
    mt = mt.annotate_rows(
        filters=hl.if_else(
            (~hl.is_missing(mt.AS_VQSLOD))
            & (~mt.AS_lowqual)
            & (
                ((mt.is_snv) & (mt.AS_VQSLOD >= snv_cutoff))
                | ((~mt.is_snv) & (mt.AS_VQSLOD >= indel_cutoff))
            ),
            {"PASS"},
            {"FAIL"},
        )
    )
    return mt


def format_mt_for_vcf(
    mt: hl.MatrixTable
) -> hl.MatrixTable:
    """
    Make final formatting adjustments to the MT before writing to VCF.
    export_vcf takes only the 's' column; of the row fields,
        info becomes the INFO field
        filters becomes the FILTER field
        rsid becomes the ID field
        qual becomes the QUAL field
    No other row fields are exported.

    :param mt: MT to be annotated
    :return: MT with only the GT entry field and a constructed info row field
    """
    logger.info("Starting to format MT for VCF...")
    # Keep only the GT entry field
    mt = mt.select_entries(mt.GT)
    # Construct the info field
    mt = mt.annotate_rows(
        info=hl.struct(
            was_split=mt.was_split,
            AS_VQSLOD=mt.AS_VQSLOD,
            AS_lowqual=mt.AS_lowqual,
            is_snv=mt.is_snv,
        )
    )
    return mt


def export_VDS_to_VCF(
    vds: hl.vds.VariantDataset,
    data_type: str,
    bucket: str,
    chr: str,
    out_path: str
):
    """
    Driver function for annotating and exporting VDS to VCF.
    Filters to provided chr, lifts variant annotations over, computes PASS/FAIL,
    and finally writes to VCF.

    :param vds: VDS to be used
    :param data_type: Whether data is from CCDG exomes or genomes
    :param bucket: Path prefix to either exomes or genomes buckets
    :param chr: String representation of chromosome to be filtered to
    :param out_path: VCF output path
    """
    logger.info(f"Starting to export VDS ({chr}) to VCF...")
    # Filter VDS to chr, convert to MT, compute GT, and split multi
    mt = prepare_vds_to_mt(vds, chr)
    # Annotate MT with AS_VQSLOD, AS_lowqual, and is_snv
    mt = prepare_mt(mt, data_type, bucket)
    # Compute passing status using Wenhan's default suggestions and annotate
    mt = compute_passing(mt, data_type)
    # Last formatting for MT before writing to VCF
    mt = format_mt_for_vcf(mt)
    # Format metadata
    snv_cutoff = -2.4464 if data_type == "genomes" else -7.4038
    indel_cutoff = -0.3533 if data_type == "genomes" else -2.8062
    metadata = {
        'filter': {
            'PASS': {'Description': 'Variant passing QC'},
            'FAIL': {'Description': 'Variant failing QC'},
        },
        'info': {
            'was_split': {
                'Description': 'True if this variant was originally multiallelic, otherwise False.',
                'Number': '0',
                'Type': 'Flag',
            },
            'AS_VQSLOD': {
                'Description': f"AS_VQSLOD score. Passing variants have score >= {snv_cutoff} if SNV and >= {indel_cutoff} otherwise (indel).",
                'Number': '1',
                'Type': 'Float',
            },
            'AS_lowqual': {
                'Description': 'AS_lowqual classification. Passing variants have value FALSE.',
                'Number': '0',
                'Type': 'Flag',
            },
            'is_snv': {
                'Description': 'Whether a variant is an SNV',
                'Number': '0',
                'Type': 'Flag',
            },
        },
    }
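    # For reference, metadata of this shape yields VCF header lines roughly like the
    # following (illustrative; exact formatting is determined by hl.export_vcf):
    #   ##FILTER=<ID=PASS,Description="Variant passing QC">
    #   ##INFO=<ID=was_split,Number=0,Type=Flag,Description="True if this variant was originally multiallelic, otherwise False.">
    #   ##INFO=<ID=AS_VQSLOD,Number=1,Type=Float,Description="AS_VQSLOD score. ...">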
    # Write to VCF
    logger.info("Starting to write to VCF...")
    hl.export_vcf(mt, out_path, metadata=metadata)


def main(args):
    # Initialize with temporary 4-day bucket
    hl.init(
        default_reference="GRCh38",
        tmp_dir="gs://gsp-ccdg-f3-tmp-4day/conversion_runs/",
    )
    data_type = "exomes" if args.exomes else "genomes"
    # Grab workspace bucket prefix (for either exomes or genomes)
    bucket = get_bucket(data_type)
    # Read in VDS
    vds = read_in_vds(args.vds)
    # Convert VDS
    # If no specific chromosome is given, cycle through chr1-chr22, chrM, chrX, chrY
    chrs = list(range(1, 23)) + ['M', 'X', 'Y']
    if args.chr == 0:
        for c in chrs:
            path = f"{args.out}_chr{c}.vcf.bgz"
            logger.info(f"Exporting to: {path}")
            export_VDS_to_VCF(vds, data_type, bucket, f"chr{c}", path)
    else:
        for c in args.chr.split(","):
            assert c in [str(x) for x in chrs]
            path = f"{args.out}_chr{c}.vcf.bgz"
            logger.info(f"Exporting to: {path}")
            export_VDS_to_VCF(vds, data_type, bucket, f"chr{c}", path)
    logger.info("Copying log to logging bucket...")
    hl.copy_log(f"gs://bgen-temp/log/{data_type}_export_{date.today()}_chr{args.chr}.log")


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--exomes",
        help="Subset CCDG exomes. Otherwise, default is CCDG genomes.",
        action="store_true",
    )
    parser.add_argument(
        "--vds",
        help="Full path to input VDS for conversion",
        type=str,
    )
    parser.add_argument(
        "--chr",
        help="Comma-separated list of chromosomes to subset down to and export as VCF. If no input, run on chr1-chr22, chrM, chrX, chrY",
        default=0,
        type=str,
    )
    parser.add_argument(
        "--out",
        help="Output path prefix for final VCF WITHOUT file extension (.vcf.bgz will be appended)",
        type=str,
    )
    args = parser.parse_args()
    main(args)