-
Notifications
You must be signed in to change notification settings - Fork 0
/
rfmix2xarr.py
116 lines (88 loc) · 2.5 KB
/
rfmix2xarr.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import argparse
import logging
import sys
import numpy as np
import xarray as xr
logging.basicConfig(level=logging.INFO)
def parse_args(argv):
parser = argparse.ArgumentParser(
description="Convert rfmix .fb.tsv output to plink2 pgen format"
)
parser.add_argument("--file", help="Path to rfmix .fb.tsv file")
parser.add_argument("--out", help="Prefix of the output files")
parser.add_argument(
"--pop",
help="the ancestry to output, match to the header of rfmix output",
type=str,
)
return parser.parse_args(argv)
def main(argv):
args = parse_args(argv)
fbtsv = args.file
outprefix = args.out
# Count M
f = open(fbtsv, "r")
M = sum(1 for line in f) - 2
f.close()
# Count n_pops
f = open(fbtsv, "r")
comment = f.readline()
pops = comment.strip().split("\t")[1:]
pops_idx_lookup = {p: (i + 1) for i, p in enumerate(pops)}
idx = pops_idx_lookup[args.pop]
n_pops = len(pops)
# Count N
header = f.readline()
indv = np.array(
list(map(lambda x: x.split(":::")[0], header.strip().split("\t")[4:]))[
:: (2 * n_pops)
]
)
N = indv.shape[0]
logging.info(
f"""
==============Start==============
Read file: {fbtsv}
Found {N} individuals and {M} variants
in {n_pops} populations: {','.join(pops)}
Local ancestry for \"{pops[idx-1]}\" will be output
"""
)
# pgen
chrom = None
pos = []
# M = 100
genotype_matrix = np.nan * np.empty((M, N))
for i, line in enumerate(f):
if i % 100 == 0:
logging.info(f"processing {i}-th marker")
# if i == 100:
# break
line_split = line.strip().split("\t")
genotype_matrix[i] = np.float_(
line_split[(3 + idx) :: (2 * n_pops)]
) + np.float_(line_split[(3 + n_pops + idx) :: (2 * n_pops)])
if chrom is None:
chrom = int(line_split[0])
pos.append(int(line_split[1]))
vp = np.array(pos)
vi = [f"{chrom}:{p}" for p in pos]
si = indv
xarr = xr.DataArray(
genotype_matrix,
dims=["variants", "samples"],
coords={
"pos": ("variants", vp),
"variant_id": ("variants", vi),
"sample_id": ("samples", si),
},
)
print(xarr)
xarr.to_netcdf(f"{outprefix}.nc")
logging.info(
"""
===============END================
"""
)
if __name__ == "__main__":
main(sys.argv[1:])