forked from hochshi/DomainMap
-
Notifications
You must be signed in to change notification settings - Fork 0
/
mapDomains2UNP.py
127 lines (111 loc) · 5.63 KB
/
mapDomains2UNP.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
from __future__ import print_function
import requests
import json
import base64
from string import Template
import argparse
import csv
import untangle
import subprocess
from xml.sax import SAXParseException
__author__ = 'lab'
PDB_UNP_ALIGN_URL = 'http://www.rcsb.org/pdb/rest/das/pdb_uniprot_mapping/alignment?query='
UNP_FASTA_URL = 'http://www.uniprot.org/uniprot/'
ECOD_FASTA_URL = 'http://prodata.swmed.edu/ecod/complete/sequence?id='
UNP_FASTA_FILE = "/tmp/unp.fasta"
ECOD_FASTA_FILE = "/tmp/ecod.fasta"
SWALIGN_CMD = "java -jar NWAlign.jar "
NEO4J_CREATE_TRAN_URL = "http://localhost:7474/db/data/transaction/commit"
NEO4J_USER_PASS = 'neo4j:Sheshi6'
def main():
args = parse_args()
source = args.ecod_source
# csv_destination = args.csv_destination
# destination = args.destination
csv_reader = csv.DictReader(source, delimiter="\t")
domain_unp_mapping = {}
for row in csv_reader:
domain_uid = row['Uid']
domain_id = row['Domain_id']
chain_ids = get_chain_id(row)
for chain_id in chain_ids:
try:
aligned_uniprot = get_aligned_uniprot(chain_id)
uniprot_fasta = get_uniprot_fasta(aligned_uniprot)
domain_fasta = get_domain_fasta(domain_id)
alignment = get_uniprot_domain_alignment()
alignment_map = json.loads(alignment);
if float(alignment_map['sequence_identity']) >= 0.9:
neo4j_map_ecod2unp(int(domain_uid), aligned_uniprot, alignment_map['matches'])
else:
print('ECOD entry '+domain_uid+" Doesn't match against UNP entry "+aligned_uniprot+
" sequence_identity is: "+alignment_map['sequence_identity']+"\n")
except SAXParseException as e:
print('ECOD entry '+domain_uid+" raised a SAX exception while matching against UNP entry\n")
print(e)
except TypeError as e:
print('ECOD entry '+domain_uid+" raised a Type error exception while matching against UNP entry"+aligned_uniprot+"\n sequence_identity: "+str(alignment_map['sequence_identity'])+"\n")
print(e)
def parse_args():
parser = argparse.ArgumentParser(description="Import ECOD to pdb and uniprot mapping.")
# exclusive_group = parser.add_mutually_exclusive_group(required=True)
# batch_group = exclusive_group.add_argument_group("Batch processing", "description")
parser.add_argument("-ecod_src", "--ecod_source", help="Source file with ECOD IDs", type=file, required=True)
parser.add_argument("-pdb_unp_map", "--pdb_unp_mapping", help="Source file with PDB to UNP residue mapping", type=file, required=True)
# parser.add_argument("-csvdst", "--csv_destination", help="CSV mapping file destination", type=argparse.FileType('w'),
# required=True)
# parser.add_argument("-dst", "--destination", help="Mapping file destination", type=argparse.FileType('w'),
# required=True)
args = parser.parse_args()
return args
def get_chain_id(row):
pdb_id = row['Domain_id'][1:5]
chain_list = []
if "." == row['Chain_id']:
for chain_res in row['PDB_residue_range'].split(","):
chain_list.append(pdb_id+"."+chain_res.split(":")[0])
else:
chain_list.append(pdb_id+"."+row['Chain_id'])
return chain_list
def get_aligned_uniprot(chain_id):
obj = untangle.parse(PDB_UNP_ALIGN_URL+chain_id)
for alignObject in obj.dasalignment.alignment.alignObject:
if 'UniProt' == alignObject['dbSource']:
return alignObject['dbAccessionId']
def get_uniprot_fasta(aligned_uniprot):
fasta = requests.get(UNP_FASTA_URL+aligned_uniprot+'.fasta').text
unp_fa = open(UNP_FASTA_FILE, 'w')
print(fasta, file=unp_fa)
unp_fa.close()
def get_domain_fasta(domain_id):
fasta = requests.get(ECOD_FASTA_URL+domain_id).text
ecod_fa = open(ECOD_FASTA_FILE, 'w')
print(fasta, file=ecod_fa)
ecod_fa.close()
def get_uniprot_domain_alignment():
return subprocess.check_output([SWALIGN_CMD+UNP_FASTA_FILE+' '+ECOD_FASTA_FILE, ], shell=True)
def neo4j_map_ecod2unp(domain_id, aligned_uniprot, matches_list):
createmappings(domain_id, aligned_uniprot, matches_list, NEO4J_CREATE_TRAN_URL)
def createmappings(domain_id, aligned_uniprot, matches_list, trans_location):
unp_statment_template = 'MATCH (d:Domain {uid: $domain_uid})' \
' MERGE (u:UniprotEntry {accession: "$unp_acc"})' \
' CREATE UNIQUE (d)-[:MATCHES {uniprot_start: $uniprot_start, uniprot_end: $uniprot_end, ecod_start: $ecod_start, ecod_end: $ecod_end}]->(u)'
unp_template = Template(unp_statment_template)
statments_list = []
for match in matches_list:
unp_statment = unp_template.substitute(domain_uid=domain_id, unp_acc=aligned_uniprot,
uniprot_start=match['uniprot_start'], uniprot_end=match['uniprot_end'],
ecod_start=match['ecod_start'], ecod_end=match['ecod_end'])
statments_list.append({'statement': unp_statment})
statements_dict = {'statements': statments_list}
r = requests.post(trans_location, headers=generateheaders(),data=json.dumps(statements_dict))
r_obj = json.loads(r.text)
if r_obj['errors']:
print("Errors occured while mapping ecod "+domain_id+" to uniprot "+aligned_uniprot+"\n")
print(r_obj['errors'])
def generateheaders():
return {'Authorization': base64.b64encode(NEO4J_USER_PASS),
'Accept': 'application/json; charset=UTF-8',
'Content-Type': 'application/json'}
if __name__ == "__main__":
main()