-
Notifications
You must be signed in to change notification settings - Fork 2
/
sslyze_parser.py
283 lines (241 loc) · 13.6 KB
/
sslyze_parser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
from dataclasses import asdict
import argparse
import sys # check uninstalled library
import sslyze
import json
import subprocess
"""argument parsing section"""
# Command-line interface of the script. `ap` and `args` are module-level names
# because the `__main__` section at the bottom of the file reads `args`.
ap = argparse.ArgumentParser()
ap.add_argument("-f", "--input", required=True, help="Input file path that will be parsed.")
ap.add_argument("-o", "--output", required=True, help="Output file name to write results into a CSV file.")
ap.add_argument("-k", "--keep", action='store_true', help="Keep unparsed sslyze result file .")
# Only consume sys.argv when executed as a script. Parsing unconditionally made
# a plain `import` of this module read the importing program's arguments and
# exit with an argparse error whenever -f/-o were not present.
args = vars(ap.parse_args()) if __name__ == "__main__" else {}
"""end of argument parsing section"""
"""This part will be implemented in install_sslyze and calculate_results instead of here"""
# def sslyze_starter():
# if args["install"]:
# subprocess.run(["pip3", "install", "upgrade", "setuptools"])
# subprocess.run(["pip3", "install", "upgrade", "sslyze"])
# else:
# pass
#
# os.system(
# "python3 -m sslyze --regular --targets_in=" + args['input'] + " --json_out=" + args["input"].split(".")[0] +
# ".json --heartbleed --slow_connection")
def get_field(dictionary, *param):
    """Look up a nested value, returning None when the path does not exist.

    Args:
        dictionary: The outermost mapping (or any subscriptable object).
        *param: Keys to follow, outermost first. Any number of keys is
            accepted; with no keys at all the function returns None.

    Returns:
        The value found at ``dictionary[param[0]][param[1]]...`` or None if a
        key is missing or an intermediate value cannot be subscripted.
    """
    if not param:
        return None
    current = dictionary
    for key in param:
        try:
            current = current[key]
        except (KeyError, IndexError, TypeError):
            # TypeError covers intermediate values such as None that are not
            # subscriptable; treat them like a missing key.
            return None
    return current
def check_sslyze(module_name="sslyze"):
    """Report whether a module (sslyze by default) can be imported.

    Looking only at ``sys.modules`` answers "has it been imported already?",
    not "is it installed?", so the import machinery is asked for a module spec
    instead.

    Args:
        module_name: Importable module name to look for.

    Returns:
        True if the module is already loaded or a spec for it can be found,
        False otherwise.
    """
    import importlib.util

    if module_name in sys.modules:
        return True
    try:
        return importlib.util.find_spec(module_name) is not None
    except (ImportError, ValueError):
        # find_spec raises for a missing parent package (ImportError) or a
        # module without a usable __spec__ (ValueError).
        return False
class SslyzeClass:
    """Run sslyze against a list of hosts and export the findings.

    Hostnames are read from ``input_file`` when the object is created.
    ``calculate_results`` performs the scans and fills ``sslyze_result``;
    ``write_to_csv`` and ``write_to_json`` write that data to
    ``output_file`` + ``.csv`` / ``.json``.
    """

    # Port used for every connectivity test.
    _HTTPS_PORT = 443

    def __init__(self, input_file, output_file):
        """Store the file names and load the hostnames from ``input_file``."""
        self.input_file = input_file
        self.subdomain_list = self.create_subdomain_list()
        self.output_file = output_file
        self.is_sslyze_present = check_sslyze()  # whether sslyze is available
        self.servers_to_scan = []  # connectivity-tested servers queued for scanning
        self.scanner_get_result = {}
        # One dict per scanned server (dataclasses.asdict of the scan result).
        # NOTE(review): an earlier note here described a
        # {"subdomain_name": [features]} mapping; the code appends to a list.
        self.sslyze_result = []

    def create_subdomain_list(self):
        """Return the hostnames listed in ``self.input_file``.

        The input is assumed to be the output of a subdomain enumeration: one
        entry per line, with the hostname in the first comma-separated column.
        Lines whose first column is empty (for example blank lines) are
        skipped so that no empty hostname is handed to the scanner.
        """
        hostnames = []
        with open(self.input_file, "r") as fp:
            for line in fp:
                hostname = line.split(",")[0].strip()
                if hostname:
                    hostnames.append(hostname)
        return hostnames

    def install_ssylze(self):
        """Install setuptools and sslyze with pip unless sslyze is present."""
        if self.is_sslyze_present:
            print("Already installed.")
            return
        # Invoke pip through the running interpreter so the packages land in
        # the environment this script actually uses.
        for package in ("setuptools", "sslyze"):
            subprocess.run([sys.executable, "-m", "pip", "install", package])

    def write_to_csv(self):
        """Write one CSV row per entry of ``self.sslyze_result``.

        The file is ``self.output_file`` + ``.csv``.
        NOTE(review): a scan command with no stored result yields None from
        get_field and is therefore reported like a negative finding.
        """
        header = "Hostname, IP, Heartbleed, CCS Injection, Robot Attack, Downgrade Attack, " \
                 "Client Oriented Renegotiation, Secure Renegotiation\n"
        rows = [header]
        for each in self.sslyze_result:
            # Known vulnerabilities
            is_vulnerable_to_heartbleed = get_field(
                each, "scan_commands_results", "heartbleed", "is_vulnerable_to_heartbleed")
            is_vulnerable_to_ccs_injection = get_field(
                each, "scan_commands_results", "openssl_ccs_injection", "is_vulnerable_to_ccs_injection")
            is_vulnerable_to_robot_attack = get_field(
                each, "scan_commands_results", "robot", "robot_result")
            downgrade_attack = get_field(
                each, "scan_commands_results", "tls_fallback_scsv", "supports_fallback_scsv")

            # Session renegotiation. calculate_results reads the attribute
            # `is_vulnerable_to_client_renegotiation_dos`, so look for that key
            # first and fall back to the `accepts_client_renegotiation` key
            # this method used before.
            client_oriented_reneg = get_field(
                each, "scan_commands_results", "session_renegotiation",
                "is_vulnerable_to_client_renegotiation_dos")
            if client_oriented_reneg is None:
                client_oriented_reneg = get_field(
                    each, "scan_commands_results", "session_renegotiation", "accepts_client_renegotiation")
            secure_reneg = get_field(
                each, "scan_commands_results", "session_renegotiation", "supports_secure_renegotiation")

            # Weak cipher support should be implemented

            # Hostname and IP
            hostname = each["server_info"]["server_location"]["hostname"]
            ip = each["server_info"]["server_location"]["ip_address"]

            rows.append("{},{},{},{},{},{},{},{}\n".format(
                hostname,
                ip,
                "Vulnerable to Heartbleed" if is_vulnerable_to_heartbleed else "Not vulnerable to Heartbleed",
                "Vulnerable to CCS injection" if is_vulnerable_to_ccs_injection
                else "Not vulnerable to OpenSSL CCS injection",
                is_vulnerable_to_robot_attack,
                "OK - Supported" if downgrade_attack else "Downgrade Attack Possible",
                "VULNERABLE - Server honors client-initiated renegotiations" if client_oriented_reneg
                else "Not vulnerable",
                "OK - Supported" if secure_reneg else "Not Supported"))

        with open(self.output_file + ".csv", "w", encoding="UTF-8") as fp:
            fp.write("".join(rows))

    def write_to_json(self):
        """Dump ``self.sslyze_result`` to ``self.output_file`` + ``.json``."""
        temp_dict = {"results": self.sslyze_result}
        with open(self.output_file + '.json', 'w', encoding='UTF-8') as json_file:
            json.dump(temp_dict, json_file, cls=sslyze.JsonEncoder, indent=4)

    def calculate_results(self):
        """Scan every reachable host and collect the results.

        Each hostname is connectivity-tested, the reachable servers are queued
        on an sslyze scanner, and every scan result is printed and appended to
        ``self.sslyze_result`` as a plain dict.
        """
        self._collect_reachable_servers()

        scanner = sslyze.Scanner()
        # Queue scan commands for each server
        for server_info in self.servers_to_scan:
            server_scan_req = sslyze.ServerScanRequest(
                server_info=server_info,
                scan_commands={
                    sslyze.ScanCommand.CERTIFICATE_INFO,
                    sslyze.ScanCommand.HEARTBLEED,
                    sslyze.ScanCommand.ROBOT,
                    sslyze.ScanCommand.SSL_2_0_CIPHER_SUITES,
                    sslyze.ScanCommand.SSL_3_0_CIPHER_SUITES,
                    sslyze.ScanCommand.OPENSSL_CCS_INJECTION,
                    sslyze.ScanCommand.SESSION_RENEGOTIATION,
                    sslyze.ScanCommand.TLS_1_0_CIPHER_SUITES,
                    sslyze.ScanCommand.TLS_1_1_CIPHER_SUITES,
                    sslyze.ScanCommand.TLS_1_2_CIPHER_SUITES,
                    sslyze.ScanCommand.TLS_1_3_CIPHER_SUITES,
                    sslyze.ScanCommand.SESSION_RESUMPTION,
                    sslyze.ScanCommand.TLS_COMPRESSION,
                    sslyze.ScanCommand.TLS_FALLBACK_SCSV,
                })
            scanner.queue_scan(server_scan_req)

        # NOTE(review): this keeps a second, separate get_results() object on
        # the instance; nothing in this class reads it afterwards.
        self.scanner_get_result = scanner.get_results()

        # Then retrieve the result of the scan commands for each server
        for server_scan_result in scanner.get_results():
            print(f"\nResults for {server_scan_result.server_info.server_location.hostname}:")
            self._print_scan_result(server_scan_result)
            self.sslyze_result.append(asdict(server_scan_result))

    def _collect_reachable_servers(self):
        """Connectivity-test each hostname and queue the ones that respond.

        Failures are reported and the host is skipped; one unreachable host
        must not stop the remaining hosts from being scanned.
        """
        for hostname in self.subdomain_list:
            try:
                server_location = sslyze.ServerNetworkLocationViaDirectConnection.with_ip_address_lookup(
                    hostname, self._HTTPS_PORT)
            except Exception:
                # Best-effort per host: any lookup failure just skips the host.
                print("Cannot resolve " + hostname + ", skipping..")
                continue

            try:
                server_info = sslyze.ServerConnectivityTester().perform(server_location)
            except Exception as error:
                # Prefer an `error_message` attribute when the exception has
                # one; otherwise fall back to the exception itself.
                message = getattr(error, "error_message", error)
                print(f"Error connecting to {server_location.hostname}:{server_location.port}: {message}")
                continue

            self.servers_to_scan.append(server_info)

    @staticmethod
    def _print_command_result(results, command, title, render):
        """Print ``title`` and ``render(result)`` for a single scan command.

        A command missing from ``results`` raises KeyError, which is printed
        in place of the result.
        """
        try:
            result = results[command]
            print(title)
            print(render(result))
        except KeyError as error:
            print(error)

    @staticmethod
    def _print_scan_result(server_scan_result):
        """Print a human-readable summary of one server's scan result."""
        results = server_scan_result.scan_commands_results

        # Scan commands that were run with no errors
        cipher_suite_commands = (
            ("SSL 2.0", sslyze.ScanCommand.SSL_2_0_CIPHER_SUITES),
            ("SSL 3.0", sslyze.ScanCommand.SSL_3_0_CIPHER_SUITES),
            ("TLS 1.0", sslyze.ScanCommand.TLS_1_0_CIPHER_SUITES),
            ("TLS 1.1", sslyze.ScanCommand.TLS_1_1_CIPHER_SUITES),
            ("TLS 1.2", sslyze.ScanCommand.TLS_1_2_CIPHER_SUITES),
            ("TLS 1.3", sslyze.ScanCommand.TLS_1_3_CIPHER_SUITES),
        )
        for protocol, command in cipher_suite_commands:
            try:
                cipher_result = results[command]
                print(f"\nAccepted cipher suites for {protocol}:")
                for accepted_cipher_suite in cipher_result.accepted_cipher_suites:
                    print(f"* {accepted_cipher_suite.cipher_suite.name}")
            except KeyError:
                # No stored result for this protocol: print nothing.
                pass

        report = SslyzeClass._print_command_result
        report(results, sslyze.ScanCommand.HEARTBLEED, "\nResult for heartbleed:",
               lambda r: f"*{str(r.is_vulnerable_to_heartbleed)}")
        report(results, sslyze.ScanCommand.ROBOT, "\nResult for robot:",
               lambda r: f"* {str(r)}")
        report(results, sslyze.ScanCommand.OPENSSL_CCS_INJECTION, "\nResult for openssl:",
               lambda r: f"* {str(r.is_vulnerable_to_ccs_injection)}")

        try:
            session_reneg = results[sslyze.ScanCommand.SESSION_RENEGOTIATION]
            print("\nResult for session renegotiation:")
            # Accept either attribute name so the printout and the CSV export
            # agree on where the client-renegotiation flag comes from.
            client_reneg = getattr(session_reneg, "is_vulnerable_to_client_renegotiation_dos", None)
            if client_reneg is None:
                client_reneg = session_reneg.accepts_client_renegotiation
            print(f"* accepts client renegotitation: {str(client_reneg)} \n"
                  f"* supports_secure_renegotiation: {str(session_reneg.supports_secure_renegotiation)}")
        except KeyError as e:
            print(e)
        except AttributeError as f:
            print(f)

        report(results, sslyze.ScanCommand.TLS_COMPRESSION, "\nResult for TLS Compression:",
               lambda r: f"* {str(r.supports_compression)}")
        report(results, sslyze.ScanCommand.TLS_FALLBACK_SCSV,
               "\nResult for TLS Fallback Downgrade Prevention:",
               lambda r: f"* {str(r.supports_fallback_scsv)}")

        try:
            # Scan commands that were run with errors
            for scan_command, error in server_scan_result.scan_commands_errors.items():
                print(f"\nError when running {scan_command}:\n{error.exception_trace}")
        except TimeoutError as t:
            print(t)
# Script entry point: scan the hosts from --input and export the findings.
if __name__ == '__main__':
    sslyze_obj = SslyzeClass(args["input"], args["output"])
    sslyze_obj.install_ssylze()
    sslyze_obj.calculate_results()
    # The raw JSON dump is optional (--keep); the CSV report is always written.
    if args["keep"]:
        sslyze_obj.write_to_json()
    sslyze_obj.write_to_csv()