diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index abf50f28b..fe7aa5e46 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -53,7 +53,7 @@ jobs:
- name: Install dependencies
run: |
sudo apt-get update
- sudo apt-get install php8.1-cli php8.1-xml -y --no-install-recommends
+ sudo apt-get install php8.1-cli php8.1-xml sslscan -y --no-install-recommends
python -m pip install --upgrade pip
pip install -U setuptools
pip3 install .[test]
diff --git a/README.rst b/README.rst
index 08525a110..eeee6c6f5 100644
--- a/README.rst
+++ b/README.rst
@@ -31,8 +31,6 @@ See `INSTALL.md `__.
-The `ssl` module used to scan TLS/SSL misconfiguration won't work on ARM processors (see `SSLyze documentation `__).
-
How it works
============
@@ -145,7 +143,7 @@ The aforementioned attacks are tied to the following module names :
+ shellshock (Test Shellshock attack, see `Wikipedia `__)
+ spring4shell (Detects websites vulnerable to CVE-2020-5398)
+ sql (Error-based and boolean-based SQL injection detection)
-+ ssl (Evaluate the security of SSL/TLS certificate configuration, requires `SSLyze `__)
++ ssl (Evaluate the security of SSL/TLS certificate configuration, requires `sslscan `__)
+ ssrf (Server Side Request Forgery)
+ takeover (Subdomain takeover)
+ timesql (SQL injection vulnerabilities detected with time-based methodology)
diff --git a/doc/ChangeLog_Wapiti b/doc/ChangeLog_Wapiti
index 2e501c140..a71681b31 100644
--- a/doc/ChangeLog_Wapiti
+++ b/doc/ChangeLog_Wapiti
@@ -9,6 +9,7 @@ Unrelease
Core : fix headless explorer method
Core : fix max-scan-time and missing timeout
Python : update dependencies and pip configurations
+ mod_ssl: Move to sslscan for the ssl module instead of sslyze
09/08/2023
Wapiti 3.1.8
diff --git a/doc/FAQ.md b/doc/FAQ.md
index c2ff902e1..52fbf6df8 100644
--- a/doc/FAQ.md
+++ b/doc/FAQ.md
@@ -36,8 +36,8 @@ If you are really lost, feel free to contact me.
### I have a warning about the ssl module not working ! ###
-The `ssl` module requires the Python libraries [SSLyze](https://github.com/nabla-c0d3/sslyze) and [humanize](https://github.com/python-humanize/humanize).
-The module is optional therefore you need to install those dependencies to use the `ssl` module.
+The `ssl` module requires the [sslscan](binary) to be present in your PATH.
+Check if the software is available with your package manager.
### I have some UnicodeDecodeError as soon as I launch Wapiti ! ###
diff --git a/pyproject.toml b/pyproject.toml
index aeb28dc35..baeabbb9e 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -42,6 +42,7 @@ dependencies = [
"httpcore==1.0.4",
"httpx[brotli, socks]==0.27.0",
"httpx-ntlm==1.4.0",
+ "humanize==4.9.0",
"loguru==0.7.2",
"mako==1.3.2",
"markupsafe==2.1.5",
@@ -64,17 +65,12 @@ wapiti = "wapitiCore.main.wapiti:wapiti_asyncio_wrapper"
wapiti-getcookie = "wapitiCore.main.getcookie:getcookie_asyncio_wrapper"
[project.optional-dependencies]
-ssl = [
- "humanize==4.9.0",
- "sslyze==5.2.0"
-]
test = [
"humanize==4.9.0",
"pytest==8.0.2",
"pytest-cov==4.1.0",
"pytest-asyncio==0.23.5",
"respx==0.20.2",
- "sslyze==5.2.0"
]
[tool.setuptools.packages]
diff --git a/tests/attack/test_mod_ssl.py b/tests/attack/test_mod_ssl.py
index c6dd7a0ea..6a091fcfd 100644
--- a/tests/attack/test_mod_ssl.py
+++ b/tests/attack/test_mod_ssl.py
@@ -6,14 +6,22 @@
import http.server
import ssl
from unittest.mock import AsyncMock
+from datetime import datetime, timedelta
+import httpx
import pytest
+import respx
+from cryptography import x509
+from cryptography.hazmat.backends import default_backend
+from cryptography.hazmat.primitives import hashes
+from cryptography.hazmat.primitives.asymmetric import rsa
from wapitiCore.net.classes import CrawlerConfiguration
from wapitiCore.net import Request
-from wapitiCore.language.vulnerability import CRITICAL_LEVEL, HIGH_LEVEL, INFO_LEVEL
+from wapitiCore.language.vulnerability import CRITICAL_LEVEL, HIGH_LEVEL, INFO_LEVEL, MEDIUM_LEVEL
from wapitiCore.net.crawler import AsyncCrawler
-from wapitiCore.attack.mod_ssl import ModuleSsl, NAME
+from wapitiCore.attack.mod_ssl import ModuleSsl, NAME, extract_altnames, match_address, check_ocsp_must_staple, \
+ check_ev_certificate, process_vulnerabilities, process_bad_protocols
def https_server(cert_directory: str):
@@ -88,11 +96,11 @@ async def test_ssl_scanner():
payload_type="vulnerability",
module="ssl",
category=NAME,
- level=CRITICAL_LEVEL,
+ level=HIGH_LEVEL,
request=request,
parameter='',
wstg=["WSTG-CRYP-01"],
- info="Certificate is invalid for Mozilla trust store: self-signed certificate",
+ info="Strict Transport Security (HSTS) is not set",
response=None
)
@@ -101,10 +109,120 @@ async def test_ssl_scanner():
payload_type="vulnerability",
module="ssl",
category=NAME,
- level=HIGH_LEVEL,
+ level=MEDIUM_LEVEL,
request=request,
parameter='',
wstg=["WSTG-CRYP-01"],
- info="Strict Transport Security (HSTS) is not set",
+ info="Self-signed certificate detected: The certificate is not signed by a trusted Certificate Authority",
response=None
)
+
+
+def test_extract_alt_names():
+ assert ["perdu.com", "test.fr"] == extract_altnames("DNS:perdu.com, DNS:test.fr, whatever, ")
+
+
+def test_match_address():
+ assert match_address("sub.domain.com", "domain.com", ["*.domain.com", "yolo"])
+ assert match_address("sub.domain.com", "*.domain.com", ["yolo"])
+ assert not match_address("sub.domain.com", "google.com", ["*.truc.com"])
+
+
+def generate_cert(include_organization_name: bool = True, include_ocsp_must_staple: bool = True):
+ # Generate a private key
+ private_key = rsa.generate_private_key(
+ public_exponent=65537,
+ key_size=2048,
+ backend=default_backend()
+ )
+
+ # Build the subject name
+ subject_name = [
+ x509.NameAttribute(x509.NameOID.COUNTRY_NAME, u"US"),
+ x509.NameAttribute(x509.NameOID.STATE_OR_PROVINCE_NAME, u"California"),
+ x509.NameAttribute(x509.NameOID.LOCALITY_NAME, u"San Francisco"),
+ x509.NameAttribute(x509.NameOID.COMMON_NAME, u"mysite.com"),
+ ]
+
+ if include_organization_name:
+ subject_name.append(x509.NameAttribute(x509.NameOID.ORGANIZATION_NAME, u"My Company"))
+
+ # Generate a certificate
+ subject = issuer = x509.Name(subject_name)
+
+ cert_builder = x509.CertificateBuilder().subject_name(
+ subject
+ ).issuer_name(
+ issuer
+ ).public_key(
+ private_key.public_key()
+ ).serial_number(
+ x509.random_serial_number()
+ ).not_valid_before(
+ datetime.utcnow()
+ ).not_valid_after(
+ datetime.utcnow() + timedelta(days=10)
+ ).add_extension(
+ x509.SubjectAlternativeName([x509.DNSName(u"localhost")]),
+ critical=False,
+ )
+
+ if include_ocsp_must_staple:
+ cert_builder = cert_builder.add_extension(
+ x509.TLSFeature([x509.TLSFeatureType.status_request]),
+ critical=False
+ )
+
+ cert = cert_builder.sign(private_key, hashes.SHA256(), default_backend())
+ return cert
+
+
+@pytest.mark.asyncio
+@respx.mock
+async def test_certificate_transparency():
+ cert = generate_cert()
+ respx.get(f'https://crt.sh/?q={cert.serial_number}').mock(
+ # Method GET that serve as a reference
+ return_value=httpx.Response(200, text="Success")
+ )
+
+ persister = AsyncMock()
+ request = Request("https://127.0.0.1:4443/")
+ request.path_id = 42
+ crawler_configuration = CrawlerConfiguration(Request("https://127.0.0.1:4443/"), timeout=1)
+ async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
+ options = {"timeout": 10, "level": 2}
+
+ module = ModuleSsl(crawler, persister, options, Event(), crawler_configuration)
+ assert 1 == await module.check_certificate_transparency(cert)
+
+
+def test_ocsp():
+ assert 0 == check_ocsp_must_staple(generate_cert(include_ocsp_must_staple=False))
+ assert 1 == check_ocsp_must_staple(generate_cert())
+
+
+def test_extended_validation():
+ assert 0 == check_ev_certificate(generate_cert(include_organization_name=False))
+ assert 1 == check_ev_certificate(generate_cert())
+
+
+@pytest.mark.asyncio
+async def test_process_vulnerabilities():
+ base_dir = os.path.dirname(sys.modules["wapitiCore"].__file__)
+ xml_file = os.path.join(base_dir, "..", "tests/data/ssl/broken_ssl.xml")
+ results = [info async for info in process_vulnerabilities(xml_file)]
+ assert [
+ (4, 'Server is vulnerable to Heartbleed attack via TLSv1.0'),
+ (3, 'Server honors client-initiated renegotiations (vulnerable to DoS attacks)')
+ ] == results
+
+
+@pytest.mark.asyncio
+async def test_process_bad_protocols():
+ base_dir = os.path.dirname(sys.modules["wapitiCore"].__file__)
+ xml_file = os.path.join(base_dir, "..", "tests/data/ssl/broken_ssl.xml")
+ results = [info async for info in process_bad_protocols(xml_file)]
+ assert [
+ (4, 'The following protocols are deprecated and/or insecure and should be deactivated: SSLv2, TLSv1.0')
+ ] == results
diff --git a/tests/data/ssl/broken_ssl.xml b/tests/data/ssl/broken_ssl.xml
new file mode 100644
index 000000000..93685e59b
--- /dev/null
+++ b/tests/data/ssl/broken_ssl.xml
@@ -0,0 +1,42 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ sha256WithRSAEncryption
+
+
+
+
+ false
+ Jan 19 00:00:00 2024 GMT
+ false
+ Feb 22 23:59:59 2024 GMT
+ true
+
+
+
+
diff --git a/wapitiCore/attack/mod_ssl.py b/wapitiCore/attack/mod_ssl.py
index 4f0d7d16f..b576f8ebe 100644
--- a/wapitiCore/attack/mod_ssl.py
+++ b/wapitiCore/attack/mod_ssl.py
@@ -15,23 +15,28 @@
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
-from datetime import datetime
+# https://badssl.com/ can help to test the module
+import fnmatch
+import os
+import socket
+import ssl
+import subprocess
+from datetime import datetime, timezone
import json
import asyncio
-from os.path import join as path_join
-from typing import List, Tuple, Optional
+from os.path import join as path_join, exists
+from typing import List, Tuple, Optional, AsyncIterator
from collections import defaultdict
-from itertools import chain
+import xml.etree.ElementTree as ET
+import re
+import tempfile
+import shutil
+from httpx import RequestError
import humanize
-from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicKey
+from cryptography.hazmat._oid import NameOID
+from cryptography.hazmat.backends import default_backend
from cryptography import x509
-from sslyze.plugins.certificate_info._certificate_utils import get_common_names, \
- parse_subject_alternative_name_extension
-from sslyze.plugins.robot.implementation import RobotScanResultEnum
-from sslyze import ServerNetworkLocation, ServerNetworkConfiguration, ScanCommand, Scanner, ServerScanRequest, \
- ScanCommandAttemptStatusEnum
-from sslyze.errors import ServerHostnameCouldNotBeResolved
from wapitiCore.attack.attack import Attack
from wapitiCore.net import Request, Response
@@ -40,135 +45,230 @@
from wapitiCore.definitions.ssl import NAME, WSTG_CODE
-def get_common_name(name_field: x509.Name) -> str:
- try:
- return get_common_names(name_field)[0]
- except IndexError:
- return name_field.rfc4514_string()
-
-
-def cipher_level_to_color(security_level: str) -> str:
- if security_level == "Insecure":
+def sslscan_level_to_color(security_level: str) -> str:
+ if security_level == "weak":
return "RED"
- if security_level == "Weak":
+ if security_level == "acceptable":
return "ORANGE"
# Secure / Recommended / Unknown
return "GREEN"
-def cipher_level_to_wapiti_level(security_level: str) -> str:
- if security_level == "Insecure":
+def sslscan_level_to_wapiti_level(security_level: str) -> str:
+ if security_level == "weak":
return CRITICAL_LEVEL
- if security_level == "Weak":
- return HIGH_LEVEL
+ if security_level == "acceptable":
+ return MEDIUM_LEVEL
# Secure / Recommended / Unknown
return INFO_LEVEL
-def process_certificate_info(certinfo_result):
- for cert_deployment in certinfo_result.certificate_deployments:
+def check_ev_certificate(cert: x509.Certificate) -> bool:
+ """
+ Checks if the certificate is an EV (Extended Validation) certificate.
+ """
+ for attribute in cert.subject:
+ if attribute.oid == NameOID.ORGANIZATION_NAME:
+ return True
+ return False
- leaf_certificate = cert_deployment.received_certificate_chain[0]
- message = f"Certificate subject: {get_common_name(leaf_certificate.subject)}"
- log_blue(message)
- yield INFO_LEVEL, message
- alt_names = parse_subject_alternative_name_extension(leaf_certificate)
- alt_names = ", ".join(chain(alt_names.dns_names, alt_names.ip_addresses))
- message = f"Alt. names: {alt_names}"
+def get_certificate(hostname: str, port: int = 443) -> x509.Certificate:
+ context = ssl.create_default_context()
+ context.check_hostname = False
+ context.verify_mode = ssl.CERT_NONE
+ conn = context.wrap_socket(
+ socket.socket(socket.AF_INET),
+ server_hostname=hostname,
+ )
+ conn.connect((hostname, port))
+ cert_bin = conn.getpeercert(True)
+ return x509.load_der_x509_certificate(cert_bin, default_backend())
+
+
+def check_ocsp_must_staple(cert: x509.Certificate) -> bool:
+ try:
+ extension = cert.extensions.get_extension_for_oid(x509.ObjectIdentifier("1.3.6.1.5.5.7.1.24"))
+ return extension is not None
+ except x509.ExtensionNotFound:
+ return False
+
+
+def extract_altnames(altnames: str) -> List[str]:
+ return [name.split(":", 1)[1] for name in re.split(r',\s*', altnames) if ":" in name]
+
+
+def match_address(target: str, subject: str, alt_names: List[str]) -> bool:
+ # Check against subject
+ if fnmatch.fnmatch(target, subject):
+ return True
+
+ # Check against each alternative name
+ for alt_name in alt_names:
+ if fnmatch.fnmatch(target, alt_name):
+ return True
+
+ return False
+
+
+def sslscan_date_to_utc(date_str: str) -> datetime:
+ dt = datetime.strptime(date_str, "%b %d %H:%M:%S %Y GMT")
+ return dt.replace(tzinfo=timezone.utc)
+
+
+async def process_cert_info(xml_file: str) -> AsyncIterator[Tuple[int, str]]:
+ tree = ET.parse(xml_file)
+ root = tree.getroot()
+
+ target = root.find(".//ssltest").get("sniname")
+
+ # Extract certificate information
+ for cert in root.findall(".//certificate"):
+ subject = cert.find("subject").text
+ message = f"Certificate subject: {subject}"
log_blue(message)
yield INFO_LEVEL, message
- message = f"Issuer: {get_common_name(leaf_certificate.issuer)}"
+ alt_names_tag = cert.find("altnames")
+ if alt_names_tag is not None and alt_names_tag.text.strip():
+ alt_names = extract_altnames(alt_names_tag.text)
+ message = f"Alt. names: {', '.join(alt_names)}"
+ log_blue(message)
+ yield INFO_LEVEL, message
+ else:
+ alt_names = []
+
+ message = f"Issuer: {cert.find('issuer').text}"
log_blue(message)
yield INFO_LEVEL, message
- if not cert_deployment.leaf_certificate_subject_matches_hostname:
+ if not match_address(target, subject, alt_names):
message = "Requested hostname doesn't match those in the certificate"
log_red(message)
yield CRITICAL_LEVEL, message
- if not cert_deployment.received_chain_has_valid_order:
- message = "Certificate chain is in invalid order"
- log_orange(message)
- yield MEDIUM_LEVEL, message
-
- public_key = leaf_certificate.public_key()
-
- if isinstance(public_key, EllipticCurvePublicKey):
- key_size = public_key.curve.key_size
- else:
- key_size = public_key.key_size
-
- if public_key.__class__.__name__ == "_RSAPublicKey":
- algorithm = "RSA"
- elif public_key.__class__.__name__ == "_EllipticCurvePublicKey":
- algorithm = "ECC"
- else:
- algorithm = public_key.__class__.__name__
-
- message = f"Key: {algorithm} {key_size} bits"
+ key = cert.find("pk")
+ message = f"Key: {key.get('type')} {key.get('bits')} bits"
log_blue(message)
yield INFO_LEVEL, message
- message = f"Signature Algorithm: {leaf_certificate.signature_hash_algorithm.name}"
+ message = f"Signature Algorithm: {cert.find('signature-algorithm').text}"
log_blue(message)
yield INFO_LEVEL, message
- if leaf_certificate.not_valid_after > datetime.utcnow():
- message = "Certificate expires in " + \
- humanize.precisedelta(leaf_certificate.not_valid_after - datetime.utcnow())
+ if cert.find("self-signed").text == "true":
+ message = (
+ "Self-signed certificate detected: The certificate is not signed by a trusted Certificate Authority"
+ )
+ log_orange(message)
+ yield MEDIUM_LEVEL, message
+
+ not_valid_after = sslscan_date_to_utc(cert.find("not-valid-after").text)
+ utcnow = datetime.utcnow().replace(tzinfo=timezone.utc)
+ if not_valid_after > utcnow:
+ message = "Certificate expires in " + humanize.precisedelta(not_valid_after - utcnow)
log_green(message)
yield INFO_LEVEL, message
else:
- message = f"Certificate has expired at {leaf_certificate.not_valid_after}"
+ message = f"Certificate has expired at {not_valid_after}"
log_red(message)
yield CRITICAL_LEVEL, message
- if not cert_deployment.leaf_certificate_is_ev:
- message = "Certificate doesn't use Extended Validation"
- log_orange(message)
- yield MEDIUM_LEVEL, message
- # https://en.wikipedia.org/wiki/OCSP_stapling
- if not cert_deployment.leaf_certificate_has_must_staple_extension:
- message = "OCSP Must-Staple extension is missing"
- log_orange(message)
- yield MEDIUM_LEVEL, message
+async def process_cipher_suites2(xml_file: str) -> AsyncIterator[Tuple[int, str]]:
+ tree = ET.parse(xml_file)
+ root = tree.getroot()
- if cert_deployment.leaf_certificate_signed_certificate_timestamps_count is None:
- message = "Certificate transparency: Unknown (OpenSSL version is not recent enough)"
- log_orange(message)
- yield MEDIUM_LEVEL, message
- elif cert_deployment.leaf_certificate_signed_certificate_timestamps_count:
- message = (
- "Certificate transparency: Yes "
- f"({cert_deployment.leaf_certificate_signed_certificate_timestamps_count} SCT)"
+ protocol_versions = set()
+ # Enumerate supported protocols first
+ for cipher in root.findall(".//cipher"):
+ protocol_versions.add(cipher.get("sslversion"))
+
+ # For each protocol, group ciphers by severity then raise a warning per severity
+ for protocol in protocol_versions:
+ log_blue(f"\nAccepted cipher suites for {protocol}:")
+ group_by_severity = defaultdict(list)
+ for cipher in root.findall(f".//cipher[@sslversion='{protocol}']"):
+ name = cipher.get("cipher")
+ group_by_severity[cipher.get("strength")].append(name)
+
+ logging.log(
+ sslscan_level_to_color(cipher.get("strength")),
+ f"* {name} {cipher.get('strength')}"
)
- log_green(message)
- yield INFO_LEVEL, message
- else:
- message = "Certificate transparency: No"
- log_red(message)
- yield HIGH_LEVEL, message
- if cert_deployment.verified_chain_has_sha1_signature:
- message = "One of the certificate in the chain is signed using SHA-1"
- log_red(message)
- yield HIGH_LEVEL, message
+ for security_level, ciphers in group_by_severity.items():
+ # We are using sslscan level in the report to be consistent with output from the tool
+ message = f"The following ciphers are {security_level.lower()} for {protocol}: {', '.join(sorted(ciphers))}"
+ yield sslscan_level_to_wapiti_level(security_level), message
- for validation_result in cert_deployment.path_validation_results:
- if not validation_result.was_validation_successful:
- message = (
- f"Certificate is invalid for {validation_result.trust_store.name} "
- f"trust store: {validation_result.openssl_error_string}"
- )
- log_red(message)
- yield CRITICAL_LEVEL, message
- # Currently we stop at the first certificate of the server, maybe improve later
- # Right now several certificates generates too much confusion in report
- break
+async def process_bad_protocols(xml_file: str) -> AsyncIterator[Tuple[int, str]]:
+ # https://blog.mozilla.org/security/2014/10/14/the-poodle-attack-and-the-end-of-ssl-3-0/
+ # https://blog.qualys.com/product-tech/2018/11/19/grade-change-for-tls-1-0-and-tls-1-1-protocols
+ known_bad_protocols = {"SSLv2", "SSLv3", "TLSv1.0", "TLSv1.1"}
+
+ tree = ET.parse(xml_file)
+ root = tree.getroot()
+ bad_protocols = set()
+ for protocol in root.findall(".//protocol[@enabled='1']"):
+ name = f"{protocol.get('type').upper()}v{protocol.get('version')}"
+ if name in known_bad_protocols:
+ bad_protocols.add(name)
+
+ if bad_protocols:
+ message = "The following protocols are deprecated and/or insecure and should be deactivated: " + \
+ ", ".join(sorted(bad_protocols))
+ log_red(message)
+ yield CRITICAL_LEVEL, message
+
+
+def process_error(xml_file: str) -> str:
+ if not exists(xml_file):
+ return "sslscan did not generate the expected XML file"
+
+ try:
+ tree = ET.parse(xml_file)
+ root = tree.getroot()
+ error = root.find(".//error")
+ if error:
+ return error.text
+ return ""
+ except (ET.ParseError, FileNotFoundError, OSError, IOError) as exception:
+ return "Error parsing sslscan XML output:" + str(exception)
+
+
+async def process_vulnerabilities(xml_file: str) -> AsyncIterator[Tuple[int, str]]:
+ tree = ET.parse(xml_file)
+ root = tree.getroot()
+ vulnerable_protocols = set()
+ for protocol in root.findall(".//heartbleed[@vulnerable='1']"):
+ vulnerable_protocols.add(protocol.get("sslversion"))
+
+ if vulnerable_protocols:
+ message = f"Server is vulnerable to Heartbleed attack via {', '.join(vulnerable_protocols)}"
+ log_red(message)
+ yield CRITICAL_LEVEL, message
+
+ if root.find(".//compression[@supported='1']"):
+ message = "Server is vulnerable to CRIME attack (compression is supported)"
+ log_red(message)
+ yield CRITICAL_LEVEL, message
+
+ if root.find(".//fallback[@supported='1']"):
+ message = "Server is vulnerable to OpenSSL CCS (CVE-2014-0224)"
+ log_red(message)
+ yield CRITICAL_LEVEL, message
+
+ renegotiation = root.find(".//renegotiation")
+ if int(renegotiation.get("supported")) == 0:
+ message = "Server doesn't support secure renegotiations"
+ log_orange(message)
+ yield MEDIUM_LEVEL, message
+ elif int(renegotiation.get("secure")) == 0:
+ message = "Server honors client-initiated renegotiations (vulnerable to DoS attacks)"
+ log_red(message)
+ yield HIGH_LEVEL, message
def process_cipher_suites(results, version: str):
@@ -189,7 +289,7 @@ def process_cipher_suites(results, version: str):
continue
logging.log(
- cipher_level_to_color(security_level),
+ sslscan_level_to_color(security_level),
f"* {accepted_cipher_suite.cipher_suite.name} "
f"{accepted_cipher_suite.cipher_suite.openssl_name} "
# f"{accepted_cipher_suite.cipher_suite.key_size} "
@@ -200,146 +300,7 @@ def process_cipher_suites(results, version: str):
for security_level, ciphers in group_by_severity.items():
message = f"The following ciphers are {security_level.lower()} for {version}: {', '.join(sorted(ciphers))}"
- yield cipher_level_to_wapiti_level(security_level), message
-
-
-def analyze(hostname: str, port: int) -> List[Tuple[int, str]]:
- results = []
- # Define the server that you want to scan
- try:
- server_location = ServerNetworkLocation(hostname, port)
- except ServerHostnameCouldNotBeResolved:
- log_red(f"Could not resolve {hostname}")
- return results
-
- # Then queue some scan commands for the server
- scanner = Scanner()
- server_scan_req = ServerScanRequest(
- server_location=server_location,
- scan_commands={
- ScanCommand.CERTIFICATE_INFO,
- ScanCommand.SSL_2_0_CIPHER_SUITES,
- ScanCommand.SSL_3_0_CIPHER_SUITES,
- ScanCommand.TLS_1_0_CIPHER_SUITES,
- ScanCommand.TLS_1_1_CIPHER_SUITES,
- ScanCommand.TLS_1_2_CIPHER_SUITES,
- ScanCommand.TLS_1_3_CIPHER_SUITES,
- ScanCommand.ROBOT,
- ScanCommand.HEARTBLEED,
- ScanCommand.TLS_COMPRESSION,
- ScanCommand.TLS_FALLBACK_SCSV,
- ScanCommand.TLS_1_3_EARLY_DATA,
- ScanCommand.OPENSSL_CCS_INJECTION,
- ScanCommand.SESSION_RENEGOTIATION,
- ScanCommand.HTTP_HEADERS
- },
- network_configuration=ServerNetworkConfiguration(
- tls_server_name_indication=server_location.hostname,
- network_timeout=5,
- network_max_retries=2
- )
- )
- scanner.queue_scans([server_scan_req])
-
- # TLS 1.2 / 1.3 results
- good_protocols = {
- ScanCommand.TLS_1_2_CIPHER_SUITES: "TLS v1.2",
- ScanCommand.TLS_1_3_CIPHER_SUITES: "TLS v1.3"
- }
-
- # https://blog.mozilla.org/security/2014/10/14/the-poodle-attack-and-the-end-of-ssl-3-0/
- # https://blog.qualys.com/product-tech/2018/11/19/grade-change-for-tls-1-0-and-tls-1-1-protocols
- bad_protocols = {
- ScanCommand.SSL_2_0_CIPHER_SUITES: "SSL v2",
- ScanCommand.SSL_3_0_CIPHER_SUITES: "SSL v3",
- ScanCommand.TLS_1_0_CIPHER_SUITES: "TLS v1.0",
- ScanCommand.TLS_1_1_CIPHER_SUITES: "TLS v1.1"
- }
-
- # Then retrieve the results
- for result in scanner.get_results():
- log_blue(f"\nResults for {result.server_location.hostname}:")
- deprecated_protocols = []
-
- if result.connectivity_error_trace:
- # Stuff like connection timeout
- log_red(result.connectivity_error_trace)
- continue
-
- for scan_command in result.scan_result.__annotations__:
- scan_results = getattr(result.scan_result, scan_command)
-
- if scan_results.error_reason:
- log_red(scan_results.error_reason)
- continue
-
- if scan_results.status != ScanCommandAttemptStatusEnum.COMPLETED:
- continue
-
- if scan_command == ScanCommand.CERTIFICATE_INFO:
- for level, message in process_certificate_info(scan_results.result):
- results.append((level, message))
- elif scan_command in bad_protocols:
- if scan_results.result.accepted_cipher_suites:
- deprecated_protocols.append(bad_protocols[scan_command])
- elif scan_command == ScanCommand.ROBOT:
- if scan_results.result.robot_result in (
- RobotScanResultEnum.VULNERABLE_WEAK_ORACLE, RobotScanResultEnum.VULNERABLE_STRONG_ORACLE
- ):
- message = "Server is vulnerable to ROBOT attack"
- log_red(message)
- results.append((CRITICAL_LEVEL, message))
- elif scan_command == ScanCommand.HEARTBLEED:
- if scan_results.result.is_vulnerable_to_heartbleed:
- message = "Server is vulnerable to Heartbleed attack"
- log_red(message)
- results.append((CRITICAL_LEVEL, message))
- elif scan_command == ScanCommand.TLS_COMPRESSION:
- if scan_results.result.supports_compression:
- message = "Server is vulnerable to CRIME attack (compression is supported)"
- log_red(message)
- results.append((CRITICAL_LEVEL, message))
- elif scan_command == ScanCommand.TLS_FALLBACK_SCSV:
- if not scan_results.result.supports_fallback_scsv:
- message = "Server is vulnerable to downgrade attacks (support for TLS_FALLBACK_SCSV is missing)"
- log_red(message)
- results.append((CRITICAL_LEVEL, message))
- elif scan_command == ScanCommand.TLS_1_3_EARLY_DATA:
- # https://blog.trailofbits.com/2019/03/25/what-application-developers-need-to-know-about-tls-early-data-0rtt/
- if scan_results.result.supports_early_data:
- message = "TLS 1.3 Early Data (0RTT) is vulnerable to replay attacks"
- log_orange(message)
- results.append((MEDIUM_LEVEL, message))
- elif scan_command == ScanCommand.OPENSSL_CCS_INJECTION:
- if scan_results.result.is_vulnerable_to_ccs_injection:
- message = "Server is vulnerable to OpenSSL CCS (CVE-2014-0224)"
- log_red(message)
- results.append((CRITICAL_LEVEL, message))
- elif scan_command == ScanCommand.SESSION_RENEGOTIATION:
- if scan_results.result.is_vulnerable_to_client_renegotiation_dos:
- message = "Server honors client-initiated renegotiations (vulnerable to DoS attacks)"
- log_red(message)
- results.append((HIGH_LEVEL, message))
- if not scan_results.result.supports_secure_renegotiation:
- message = "Server doesn't support secure renegotiations"
- log_orange(message)
- results.append((MEDIUM_LEVEL, message))
- elif scan_command == ScanCommand.HTTP_HEADERS:
- if scan_results.result.strict_transport_security_header is None:
- message = "Strict Transport Security (HSTS) is not set"
- log_red(message)
- results.append((HIGH_LEVEL, message))
- elif scan_command in good_protocols:
- for level, message in process_cipher_suites(scan_results.result, good_protocols[scan_command]):
- results.append((level, message))
-
- if deprecated_protocols:
- message = "The following protocols are deprecated and/or insecure and should be deactivated: " + \
- ", ".join(deprecated_protocols)
- log_red(message)
- results.append((CRITICAL_LEVEL, message))
-
- return results
+ yield sslscan_level_to_wapiti_level(security_level), message
class ModuleSsl(Attack):
@@ -350,8 +311,19 @@ def __init__(self, crawler, persister, attack_options, stop_event, crawler_confi
Attack.__init__(self, crawler, persister, attack_options, stop_event, crawler_configuration)
# list to ensure only one occurrence per (vulnerable url/post_keys) tuple
self.tested_targets = set()
+ self.has_sslcan = None
async def must_attack(self, request: Request, response: Optional[Response] = None):
+ if self.has_sslcan is None:
+ if shutil.which("sslscan"):
+ self.has_sslcan = True
+ else:
+ log_red("sslscan is not installed or not found in PATH, module will be skipped")
+ self.has_sslcan = False
+
+ if not self.has_sslcan:
+ return False
+
if request.scheme != "https":
return False
@@ -366,11 +338,96 @@ async def must_attack(self, request: Request, response: Optional[Response] = Non
async def attack(self, request: Request, response: Optional[Response] = None):
loop = asyncio.get_running_loop()
- # sslyze use threads to launch scanners concurrently, so we put those inside an asyncio executor
- scan_results = await loop.run_in_executor(None, analyze, request.hostname, request.port)
+ scan_results = await loop.run_in_executor(None, self.process_sslscan, request.hostname, request.port)
- for level, message in scan_results:
+ async for level, message in scan_results:
if level == INFO_LEVEL:
await self.add_addition(category=NAME, request=request, info=message, wstg=WSTG_CODE)
else:
await self.add_vuln(level=level, category=NAME, request=request, info=message, wstg=WSTG_CODE)
+
+ async def check_hsts(self, hostname: str, port: int) -> int:
+ """
+ Checks if the given hostname supports HSTS.
+ """
+ try:
+ response = await self.crawler.async_send(Request(f'https://{hostname}:{port}'))
+ except RequestError:
+ return -1
+ return int('strict-transport-security' in response.headers)
+
+ async def check_certificate_transparency(self, cert: x509.Certificate) -> int:
+ """
+ Returns 1 if at least one CST exists, 0 otherwise. -1 if an error occurs.
+ """
+ serial_number = cert.serial_number
+ try:
+ # crt.sh should be able to provide JSON output, but currently it doesn't work
+ # moved to that simple check instead
+ response = await self.crawler.async_send(Request(f'https://crt.sh/?q={serial_number}'))
+ if response.status != 200:
+ return -1
+ if "None found" in response.content:
+ return 0
+ return 1
+ except RequestError:
+ return -1
+
+ async def process_cert_features(self, target: str, port: int) -> AsyncIterator[Tuple[int, str]]:
+ cert = get_certificate(target, port)
+
+ if not check_ev_certificate(cert):
+ message = "Certificate doesn't use Extended Validation"
+ log_orange(message)
+ yield MEDIUM_LEVEL, message
+
+ # https://en.wikipedia.org/wiki/OCSP_stapling
+ if not check_ocsp_must_staple(cert):
+ message = "OCSP Must-Staple extension is missing"
+ log_orange(message)
+ yield MEDIUM_LEVEL, message
+
+ has_sct = await self.check_certificate_transparency(cert)
+ if has_sct > 0:
+ message = "Certificate transparency: Yes"
+ log_green(message)
+ yield INFO_LEVEL, message
+ elif has_sct == 0:
+ message = "Certificate transparency: No"
+ log_red(message)
+ yield HIGH_LEVEL, message
+
+ if await self.check_hsts(target, port) == 0:
+ message = "Strict Transport Security (HSTS) is not set"
+ log_red(message)
+ yield HIGH_LEVEL, message
+
+ async def process_sslscan(self, hostname: str, port: int) -> AsyncIterator[Tuple[int, str]]:
+ with tempfile.NamedTemporaryFile("r", suffix=".xml", delete=False) as temp_file:
+ sslscan_command = ["sslscan", "--iana-names", "--ocsp", f"--xml={temp_file.name}", f"{hostname}:{port}"]
+ try:
+ subprocess.run(sslscan_command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True)
+ except subprocess.CalledProcessError as exception:
+ log_red("Error running sslscan: " + str(exception))
+ return
+
+ error = process_error(temp_file.name)
+ if error:
+ log_red(error)
+ else:
+ async for info in process_cert_info(temp_file.name):
+ yield info
+ try:
+ async for info in self.process_cert_features(hostname, port):
+ yield info
+ except ssl.SSLError:
+ log_red("Could not get extra information about the certificate due to SSL errors")
+ except (socket.timeout, socket.gaierror):
+ log_red("Could not get extra information about the certificate due to network errors")
+ async for info in process_vulnerabilities(temp_file.name):
+ yield info
+ async for info in process_cipher_suites2(temp_file.name):
+ yield info
+ async for info in process_bad_protocols(temp_file.name):
+ yield info
+ os.unlink(temp_file.name)