-
Notifications
You must be signed in to change notification settings - Fork 0
/
apiCall.py
111 lines (90 loc) · 3.79 KB
/
apiCall.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
"""
CriminalIP.client
This module implements a client for the CriminalIP API.
"""
from exception import APIError
import requests
import time
import json
class CriminalIP:
    """Small HTTP client for the CriminalIP API.

    Every request is sent through a shared ``requests.Session`` and carries
    the API key in the ``x-api-key`` header. Consecutive requests are spaced
    at least ``1 / api_limit_rate`` seconds apart.
    """

    def __init__(self, cip_key=None):
        """Create a client.

        Args:
            cip_key: API key sent in the ``x-api-key`` header of each request.
        """
        self.cip_key = cip_key
        self.api_url = 'https://api.criminalip.io'
        # Timestamp (time.time()) of the last completed request; None until
        # the first request has been sent.
        self.api_query_time = None
        # Maximum requests per second; a value <= 0 disables the wait.
        self.api_limit_rate = 1
        self.session = requests.Session()
        self.session.proxies = None
        # Types accepted by the public methods' argument checks.
        self.basestring = str
        self.baseint = int

    def _wait_for_rate_limit(self):
        """Block until at least ``1 / api_limit_rate`` seconds have passed
        since the previous request (no-op before the first request)."""
        if self.api_query_time is None or self.api_limit_rate <= 0:
            return
        min_interval = 1.0 / self.api_limit_rate
        while min_interval + self.api_query_time >= time.time():
            time.sleep(0.1 / self.api_limit_rate)

    def _request(self, function, params=None, method='get'):
        """Send one API request and return the decoded JSON body.

        Args:
            function: Path appended to ``api_url`` (e.g. ``'/v1/domain/scan'``).
            params: Request parameters. Sent as the request body for POST and
                as query-string parameters for every other method.
            method: HTTP verb, case-insensitive. ``'post'``, ``'put'`` and
                ``'delete'`` are recognised; anything else is sent as GET.

        Returns:
            The parsed JSON response.

        Raises:
            APIError: if the request could not be sent, the server answered
                401, 403 or 502, the body is not valid JSON, or the decoded
                JSON object contains an ``'error'`` key.
        """
        url = self.api_url + function
        headers = {"x-api-key": self.cip_key}

        # Wait for API rate limit
        self._wait_for_rate_limit()

        # Send request
        try:
            method = method.lower()
            if method == 'post':
                # Second positional argument of Session.post is the body data.
                response = self.session.post(url, params, headers=headers)
            elif method == 'put':
                # The API key header must accompany every verb, not only
                # GET/POST, otherwise the request is unauthenticated.
                response = self.session.put(url, params=params, headers=headers)
            elif method == 'delete':
                response = self.session.delete(url, params=params, headers=headers)
            else:
                response = self.session.get(url, params=params, headers=headers)
            self.api_query_time = time.time()
        except Exception:
            print("Occur Api Error")
            raise APIError("Unable to connect to CriminalIP")

        # Check that the API key wasn't rejected
        if response.status_code == 401:
            try:
                # Return the actual error message if the API returned valid JSON
                error = response.json()['error']
            except Exception as e:
                # If the response looks like HTML then it's probably the 401
                # page that nginx returns for 401 responses by default
                if response.text.startswith('<'):
                    error = 'Invalid API key'
                else:
                    # Otherwise surface the parsing/lookup failure itself
                    error = u'{}'.format(e)
            raise APIError(error)
        elif response.status_code == 403:
            raise APIError('Access denied (403 Forbidden)')
        elif response.status_code == 502:
            raise APIError('Bad Gateway (502)')

        # Parse the text into JSON
        try:
            data = json.loads(response.text)
        except ValueError:
            raise APIError('Unable to parse JSON response')

        # Raise an exception if the API reported an error in the payload
        if isinstance(data, dict) and 'error' in data:
            raise APIError(data['error'])

        return data

    def criminal_domain_scan(self, domain):
        """POST ``domain`` as the ``query`` field to ``/v1/domain/scan``.

        Returns the decoded JSON response, or None (without sending a
        request) when ``domain`` is not a string.
        """
        if isinstance(domain, self.basestring):
            params = {'query': domain}
            return self._request('/v1/domain/scan', params, method='post')

    def criminal_domain_report(self, scan_id):
        """GET ``/v1/domain/report/<scan_id>``.

        Returns the decoded JSON response, or None (without sending a
        request) when ``scan_id`` is not an integer.
        """
        if isinstance(scan_id, self.baseint):
            return self._request('/v1/domain/report/%d' % scan_id)

    def criminal_is_safe_dns_server(self, scan_ip):
        """GET ``/v1/feature/ip/is_safe_dns_server`` with ``ip=scan_ip``.

        Returns the decoded JSON response, or None (without sending a
        request) when ``scan_ip`` is not a string.
        """
        if isinstance(scan_ip, self.basestring):
            params = {'ip': scan_ip}
            return self._request('/v1/feature/ip/is_safe_dns_server', params)