-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy path1.B.5.pipe_Merge_Combined_Genotyping_For_All_Tools.py
88 lines (72 loc) · 2.46 KB
/
1.B.5.pipe_Merge_Combined_Genotyping_For_All_Tools.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
"""Merge combined genotyping results for all germline genotyping tools."""
# Import Modules
import sys
sys.path.append(sys.path[0] + "/../common")
import bioparse as bp
# Input Parameters
SK_FILE = sys.argv[1]
DV_FILE = sys.argv[2]
OUT_DIR = sys.argv[3]
OUT_ID = sys.argv[4]
# Global Variables
IN_FILENAME = SK_FILE.split("/")[-1]
IN_FILENAME = IN_FILENAME[:-7] if "gz" == IN_FILENAME[-2:] else IN_FILENAME[:-4]
OUT_FILE = f"{OUT_DIR}/{OUT_ID}.{IN_FILENAME}.vcf"
HET_GT_SET = {"1/0", "0/1", "1|0", "0|1"}
HOM_GT_SET = {"1/1", "1|1", "1"}
HOLDER_STR = "\t".join(tuple("..."))
def __check_if_proc_vcf(in_items):
return in_items[2] != "." and in_items[-1] == "."
def __grab_gt(format_str):
raw_gt = format_str.split(":")[0]
if raw_gt in HET_GT_SET:
return "het"
elif raw_gt in HOM_GT_SET:
return "hom"
else:
return "non-dip"
def __check_bases(ref_items, in_items):
return ref_items[3] == in_items[3] and ref_items[4] == in_items[4]
def __compare_vcf_info(ref_items, in_items):
if not __check_bases(ref_items, in_items):
return None
elif not __check_if_proc_vcf(ref_items):
ref_gt = __grab_gt(ref_items[-1])
in_gt = __grab_gt(in_items[-1])
if ref_gt == "non-dip" or ref_gt != in_gt:
return None
out_list = ref_items[:2]
var_id = ref_gt
var_id += "_ind" if len(ref_items[3]) != len(ref_items[4]) else "_snv"
out_list.append(var_id)
out_list += ref_items[3:5]
return "{}\t{}\n".format("\t".join(out_list), HOLDER_STR)
elif sk_items[2] == in_items[2]:
return "{}\n".format("\t".join(ref_items))
return None
if __name__ == '__main__':
print(f"# SK Input: {SK_FILE}")
print(f"# DV Input: {DV_FILE}")
sk_fp = bp.FileParser(SK_FILE)
dv_fp = bp.FileParser(DV_FILE)
out_f = open(OUT_FILE, "w")
bp.write_vcf_hd(out_f)
for sk_items in sk_fp.iter():
if dv_fp.term:
break
cmp_res = bp.cmp_lines(sk_items, dv_fp.get_items())
while cmp_res > 0:
dv_fp.next()
if dv_fp.term:
break
cmp_res = bp.cmp_lines(sk_items, dv_fp.get_items())
if dv_fp.term:
break
elif cmp_res == 0:
vcf_cmp_res = __compare_vcf_info(sk_items, dv_fp.get_items())
if vcf_cmp_res:
out_f.write(f"{vcf_cmp_res}")
sk_fp.close()
dv_fp.close()
out_f.close()
print("# All operations complete")