diff --git a/.github/workflows/run_tests.yml b/.github/workflows/run_tests.yml index 78fe36ca..101857b2 100644 --- a/.github/workflows/run_tests.yml +++ b/.github/workflows/run_tests.yml @@ -4,7 +4,8 @@ on: push: branches: [ release ] pull_request: - branches: [ release ] + branches: [ release, dev ] + workflow_dispatch: jobs: build: diff --git a/dev-requirements.txt b/dev-requirements.txt index 2517bdba..677514f3 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1,4 +1,4 @@ -mypy==0.981 +mypy==1.4.1 flake8 invoke>=2,<3 pytest==7.2.2 diff --git a/sslyze/cli/console_output.py b/sslyze/cli/console_output.py index 6dd52aac..39214093 100644 --- a/sslyze/cli/console_output.py +++ b/sslyze/cli/console_output.py @@ -69,6 +69,7 @@ def server_scan_completed(self, server_scan_result: ServerScanResult) -> None: if server_scan_result.scan_status != ServerScanStatusEnum.COMPLETED: # Nothing to print here if the scan was not completed return + assert server_scan_result.scan_result # Must be set if scan_status == ServerScanStatusEnum.COMPLETED # Generate the console output for each scan command scan_command_results_str = "" diff --git a/sslyze/json/json_output.py b/sslyze/json/json_output.py index 9076552f..14a1ffe9 100644 --- a/sslyze/json/json_output.py +++ b/sslyze/json/json_output.py @@ -138,7 +138,8 @@ def from_orm(cls, tls_probing_result: ServerTlsProbingResult) -> "_ServerTlsProb ) -_ServerTlsProbingResultAsJson.__doc__ = ServerTlsProbingResult.__doc__ # type: ignore +assert ServerTlsProbingResult.__doc__ +_ServerTlsProbingResultAsJson.__doc__ = ServerTlsProbingResult.__doc__ class _ServerNetworkConfigurationAsJson(BaseModelWithOrmModeAndForbid): @@ -152,7 +153,8 @@ class _ServerNetworkConfigurationAsJson(BaseModelWithOrmModeAndForbid): network_max_retries: int = 3 -_ServerNetworkConfigurationAsJson.__doc__ = ServerNetworkConfiguration.__doc__ # type: ignore +assert ServerNetworkConfiguration.__doc__ +_ServerNetworkConfigurationAsJson.__doc__ = ServerNetworkConfiguration.__doc__ class _ServerNetworkLocationAsJson(BaseModelWithOrmModeAndForbid): @@ -160,10 +162,11 @@ class _ServerNetworkLocationAsJson(BaseModelWithOrmModeAndForbid): port: int connection_type: ConnectionTypeEnum ip_address: Optional[str] = None - http_proxy_settings: Optional[_HttpProxySettingsAsJson] = None # type: ignore + http_proxy_settings: Optional[_HttpProxySettingsAsJson] = None -_ServerNetworkLocationAsJson.__doc__ = ServerNetworkLocation.__doc__ # type: ignore +assert ServerNetworkLocation.__doc__ +_ServerNetworkLocationAsJson.__doc__ = ServerNetworkLocation.__doc__ class ServerScanResultAsJson(BaseModelWithOrmModeAndForbid): @@ -210,7 +213,8 @@ def from_orm(cls, server_scan_result: ServerScanResult) -> "ServerScanResultAsJs ) -ServerScanResultAsJson.__doc__ = ServerScanResult.__doc__ # type: ignore +assert ServerScanResult.__doc__ +ServerScanResultAsJson.__doc__ = ServerScanResult.__doc__ class InvalidServerStringAsJson(BaseModelWithOrmModeAndForbid): diff --git a/sslyze/plugins/certificate_info/json_output.py b/sslyze/plugins/certificate_info/json_output.py index 897170aa..3a020af5 100644 --- a/sslyze/plugins/certificate_info/json_output.py +++ b/sslyze/plugins/certificate_info/json_output.py @@ -29,7 +29,8 @@ class CertificateInfoExtraArgumentAsJson(BaseModelWithOrmMode): custom_ca_file: Path -CertificateInfoExtraArgumentAsJson.__doc__ = CertificateInfoExtraArgument.__doc__ # type: ignore +assert CertificateInfoExtraArgument.__doc__ +CertificateInfoExtraArgumentAsJson.__doc__ = CertificateInfoExtraArgument.__doc__ class _PublicKeyAsJson(BaseModelWithOrmMode): @@ -232,7 +233,8 @@ class _TrustStoreAsJson(BaseModelWithOrmMode): ev_oids: Optional[List[_ObjectIdentifierAsJson]] -_TrustStoreAsJson.__doc__ = TrustStore.__doc__ # type: ignore +assert TrustStore.__doc__ +_TrustStoreAsJson.__doc__ = TrustStore.__doc__ class _PathValidationResultAsJson(BaseModelWithOrmMode): @@ -242,7 +244,8 @@ class _PathValidationResultAsJson(BaseModelWithOrmMode): was_validation_successful: bool -_PathValidationResultAsJson.__doc__ = PathValidationResult.__doc__ # type: ignore +assert PathValidationResult.__doc__ +_PathValidationResultAsJson.__doc__ = PathValidationResult.__doc__ class _CertificateDeploymentAnalysisResultAsJson(BaseModelWithOrmMode): @@ -264,7 +267,8 @@ class _CertificateDeploymentAnalysisResultAsJson(BaseModelWithOrmMode): verified_certificate_chain: Optional[List[_CertificateAsJson]] -_CertificateDeploymentAnalysisResultAsJson.__doc__ = CertificateDeploymentAnalysisResult.__doc__ # type: ignore +assert CertificateDeploymentAnalysisResult.__doc__ +_CertificateDeploymentAnalysisResultAsJson.__doc__ = CertificateDeploymentAnalysisResult.__doc__ class CertificateInfoScanResultAsJson(BaseModelWithOrmMode): @@ -272,7 +276,8 @@ class CertificateInfoScanResultAsJson(BaseModelWithOrmMode): certificate_deployments: List[_CertificateDeploymentAnalysisResultAsJson] -CertificateInfoScanResultAsJson.__doc__ = CertificateInfoScanResult.__doc__ # type: ignore +assert CertificateInfoScanResult.__doc__ +CertificateInfoScanResultAsJson.__doc__ = CertificateInfoScanResult.__doc__ class CertificateInfoScanAttemptAsJson(ScanCommandAttemptAsJson): diff --git a/sslyze/plugins/compression_plugin.py b/sslyze/plugins/compression_plugin.py index 361ee5c4..7404c6aa 100755 --- a/sslyze/plugins/compression_plugin.py +++ b/sslyze/plugins/compression_plugin.py @@ -36,7 +36,7 @@ class CompressionScanResultAsJson(BaseModelWithOrmModeAndForbid): class CompressionScanAttemptAsJson(ScanCommandAttemptAsJson): - result: Optional[CompressionScanResultAsJson] # type: ignore + result: Optional[CompressionScanResultAsJson] class _CompressionCliConnector(ScanCommandCliConnector[CompressionScanResult, None]): diff --git a/sslyze/plugins/early_data_plugin.py b/sslyze/plugins/early_data_plugin.py index d4fa4a5a..0cbd6171 100644 --- a/sslyze/plugins/early_data_plugin.py +++ b/sslyze/plugins/early_data_plugin.py @@ -37,7 +37,7 @@ class EarlyDataScanResultAsJson(BaseModelWithOrmModeAndForbid): class EarlyDataScanAttemptAsJson(ScanCommandAttemptAsJson): - result: Optional[EarlyDataScanResultAsJson] # type: ignore + result: Optional[EarlyDataScanResultAsJson] class _EarlyDataCliConnector(ScanCommandCliConnector[EarlyDataScanResult, None]): diff --git a/sslyze/plugins/elliptic_curves_plugin.py b/sslyze/plugins/elliptic_curves_plugin.py index 8735ac81..4269abb8 100644 --- a/sslyze/plugins/elliptic_curves_plugin.py +++ b/sslyze/plugins/elliptic_curves_plugin.py @@ -64,7 +64,8 @@ class _EllipticCurveAsJson(pydantic.BaseModel): openssl_nid: int -_EllipticCurveAsJson.__doc__ = EllipticCurve.__doc__ # type: ignore +assert EllipticCurve.__doc__ +_EllipticCurveAsJson.__doc__ = EllipticCurve.__doc__ class SupportedEllipticCurvesScanResultAsJson(pydantic.BaseModel): @@ -92,7 +93,8 @@ def from_orm(cls, result: SupportedEllipticCurvesScanResult) -> "SupportedEllipt ) -SupportedEllipticCurvesScanResultAsJson.__doc__ = SupportedEllipticCurvesScanResult.__doc__ # type: ignore +assert SupportedEllipticCurvesScanResult.__doc__ +SupportedEllipticCurvesScanResultAsJson.__doc__ = SupportedEllipticCurvesScanResult.__doc__ class SupportedEllipticCurvesScanAttemptAsJson(ScanCommandAttemptAsJson): diff --git a/sslyze/plugins/fallback_scsv_plugin.py b/sslyze/plugins/fallback_scsv_plugin.py index 5ef0f637..c96b6cdb 100755 --- a/sslyze/plugins/fallback_scsv_plugin.py +++ b/sslyze/plugins/fallback_scsv_plugin.py @@ -35,7 +35,7 @@ class FallbackScsvScanResultAsJson(BaseModelWithOrmModeAndForbid): class FallbackScsvScanAttemptAsJson(ScanCommandAttemptAsJson): - result: Optional[FallbackScsvScanResultAsJson] # type: ignore + result: Optional[FallbackScsvScanResultAsJson] class _FallbackScsvCliConnector(ScanCommandCliConnector[FallbackScsvScanResult, None]): diff --git a/sslyze/plugins/heartbleed_plugin.py b/sslyze/plugins/heartbleed_plugin.py index 11bfbb63..5e559dd3 100644 --- a/sslyze/plugins/heartbleed_plugin.py +++ b/sslyze/plugins/heartbleed_plugin.py @@ -42,7 +42,7 @@ class HeartbleedScanResultAsJson(BaseModelWithOrmModeAndForbid): class HeartbleedScanAttemptAsJson(ScanCommandAttemptAsJson): - result: Optional[HeartbleedScanResultAsJson] # type: ignore + result: Optional[HeartbleedScanResultAsJson] class _HeartbleedCliConnector(ScanCommandCliConnector[HeartbleedScanResult, None]): diff --git a/sslyze/plugins/http_headers_plugin.py b/sslyze/plugins/http_headers_plugin.py index 9e5dba59..c5687f92 100755 --- a/sslyze/plugins/http_headers_plugin.py +++ b/sslyze/plugins/http_headers_plugin.py @@ -10,6 +10,7 @@ # TODO: Fix type annotations in nassl from nassl._nassl import SslError # type: ignore +from sslyze.json.pydantic_utils import BaseModelWithOrmMode from sslyze.json.scan_attempt_json import ScanCommandAttemptAsJson from sslyze.plugins.plugin_base import ( ScanCommandImplementation, @@ -90,10 +91,11 @@ class _StrictTransportSecurityHeaderAsJson(pydantic.BaseModel): include_subdomains: bool -_StrictTransportSecurityHeaderAsJson.__doc__ = StrictTransportSecurityHeader.__doc__ # type: ignore +assert StrictTransportSecurityHeader.__doc__ +_StrictTransportSecurityHeaderAsJson.__doc__ = StrictTransportSecurityHeader.__doc__ -class HttpHeadersScanResultAsJson(pydantic.BaseModel): +class HttpHeadersScanResultAsJson(BaseModelWithOrmMode): http_request_sent: str http_error_trace: Optional[str] @@ -101,9 +103,6 @@ class HttpHeadersScanResultAsJson(pydantic.BaseModel): strict_transport_security_header: Optional[_StrictTransportSecurityHeaderAsJson] expect_ct_header: None = None # TODO(6.0.0): Remove as this is a deprecated field - class Config: - orm_mode = True - @classmethod def from_orm(cls, result: HttpHeadersScanResult) -> "HttpHeadersScanResultAsJson": http_error_trace_as_str = None @@ -124,7 +123,8 @@ def from_orm(cls, result: HttpHeadersScanResult) -> "HttpHeadersScanResultAsJson ) -HttpHeadersScanResultAsJson.__doc__ = HttpHeadersScanResult.__doc__ # type: ignore +assert HttpHeadersScanResult.__doc__ +HttpHeadersScanResultAsJson.__doc__ = HttpHeadersScanResult.__doc__ class HttpHeadersScanAttemptAsJson(ScanCommandAttemptAsJson): diff --git a/sslyze/plugins/openssl_ccs_injection_plugin.py b/sslyze/plugins/openssl_ccs_injection_plugin.py index a823fb86..1009bef7 100644 --- a/sslyze/plugins/openssl_ccs_injection_plugin.py +++ b/sslyze/plugins/openssl_ccs_injection_plugin.py @@ -43,7 +43,7 @@ class OpenSslCcsInjectionScanResultAsJson(BaseModelWithOrmModeAndForbid): class OpenSslCcsInjectionScanAttemptAsJson(ScanCommandAttemptAsJson): - result: Optional[OpenSslCcsInjectionScanResultAsJson] # type: ignore + result: Optional[OpenSslCcsInjectionScanResultAsJson] class _OpenSslCcsInjectionCliConnector(ScanCommandCliConnector[OpenSslCcsInjectionScanResult, None]): diff --git a/sslyze/plugins/openssl_cipher_suites/json_output.py b/sslyze/plugins/openssl_cipher_suites/json_output.py index 3d6e1500..3cb29392 100644 --- a/sslyze/plugins/openssl_cipher_suites/json_output.py +++ b/sslyze/plugins/openssl_cipher_suites/json_output.py @@ -73,7 +73,8 @@ class _CipherSuiteAcceptedByServerAsJson(BaseModelWithOrmMode): ephemeral_key: Optional[_EphemeralKeyInfoAsJson] -_CipherSuiteAcceptedByServerAsJson.__doc__ = CipherSuiteAcceptedByServer.__doc__ # type: ignore +assert CipherSuiteAcceptedByServer.__doc__ +_CipherSuiteAcceptedByServerAsJson.__doc__ = CipherSuiteAcceptedByServer.__doc__ class _CipherSuiteRejectedByServerAsJson(BaseModelWithOrmMode): @@ -98,7 +99,8 @@ def from_orm(cls, scan_result: CipherSuitesScanResult) -> "CipherSuitesScanResul ) -CipherSuitesScanResultAsJson.__doc__ = CipherSuitesScanResult.__doc__ # type: ignore +assert CipherSuitesScanResult.__doc__ +CipherSuitesScanResultAsJson.__doc__ = CipherSuitesScanResult.__doc__ class CipherSuitesScanAttemptAsJson(ScanCommandAttemptAsJson): diff --git a/sslyze/plugins/robot/implementation.py b/sslyze/plugins/robot/implementation.py index 534f9d84..d7e5d8f5 100644 --- a/sslyze/plugins/robot/implementation.py +++ b/sslyze/plugins/robot/implementation.py @@ -39,7 +39,7 @@ class RobotScanResultAsJson(BaseModelWithOrmModeAndForbid): class RobotScanAttemptAsJson(ScanCommandAttemptAsJson): - result: Optional[RobotScanResultAsJson] # type: ignore + result: Optional[RobotScanResultAsJson] class _RobotCliConnector(ScanCommandCliConnector[RobotScanResult, None]): diff --git a/sslyze/plugins/session_renegotiation_plugin.py b/sslyze/plugins/session_renegotiation_plugin.py index 6aebee73..da84a0f3 100755 --- a/sslyze/plugins/session_renegotiation_plugin.py +++ b/sslyze/plugins/session_renegotiation_plugin.py @@ -40,7 +40,7 @@ class SessionRenegotiationScanResultAsJson(BaseModelWithOrmModeAndForbid): class SessionRenegotiationScanAttemptAsJson(ScanCommandAttemptAsJson): - result: Optional[SessionRenegotiationScanResultAsJson] # type: ignore + result: Optional[SessionRenegotiationScanResultAsJson] class _ScanJobResultEnum(Enum): diff --git a/sslyze/plugins/session_resumption/implementation.py b/sslyze/plugins/session_resumption/implementation.py index bacf4ace..9a351ce1 100644 --- a/sslyze/plugins/session_resumption/implementation.py +++ b/sslyze/plugins/session_resumption/implementation.py @@ -171,7 +171,9 @@ def _process_resumption_attempt_results( return result, successful_attempts_count, total_attempts_count -class SessionResumptionSupportImplementation(ScanCommandImplementation[SessionResumptionSupportScanResult, None]): +class SessionResumptionSupportImplementation( + ScanCommandImplementation[SessionResumptionSupportScanResult, SessionResumptionSupportExtraArgument] +): """Test a server for session resumption support using session IDs and TLS tickets.""" cli_connector_cls = _SessionResumptionSupportCliConnector diff --git a/sslyze/plugins/session_resumption/json_output.py b/sslyze/plugins/session_resumption/json_output.py index 2471172b..d9e9ab21 100644 --- a/sslyze/plugins/session_resumption/json_output.py +++ b/sslyze/plugins/session_resumption/json_output.py @@ -9,7 +9,8 @@ class SessionResumptionSupportExtraArgumentAsJson(BaseModelWithOrmModeAndForbid) number_of_resumptions_to_attempt: int -SessionResumptionSupportExtraArgumentAsJson.__doc__ = SessionResumptionSupportExtraArgument.__doc__ # type: ignore +assert SessionResumptionSupportExtraArgument.__doc__ +SessionResumptionSupportExtraArgumentAsJson.__doc__ = SessionResumptionSupportExtraArgument.__doc__ class SessionResumptionSupportScanResultAsJson(BaseModelWithOrmModeAndForbid): @@ -22,7 +23,8 @@ class SessionResumptionSupportScanResultAsJson(BaseModelWithOrmModeAndForbid): tls_ticket_successful_resumptions_count: int -SessionResumptionSupportScanResultAsJson.__doc__ = SessionResumptionSupportScanResult.__doc__ # type: ignore +assert SessionResumptionSupportScanResult.__doc__ +SessionResumptionSupportScanResultAsJson.__doc__ = SessionResumptionSupportScanResult.__doc__ class SessionResumptionSupportScanAttemptAsJson(ScanCommandAttemptAsJson): diff --git a/sslyze/scanner/_jobs_worker_thread.py b/sslyze/scanner/_jobs_worker_thread.py index 76e3eef3..34685e31 100644 --- a/sslyze/scanner/_jobs_worker_thread.py +++ b/sslyze/scanner/_jobs_worker_thread.py @@ -1,12 +1,20 @@ import threading from dataclasses import dataclass import queue -from typing import Optional, Any, Callable, Sequence +from typing import Optional, Any, Callable, Sequence, Union from uuid import UUID from sslyze.plugins.scan_commands import ScanCommand +try: + # Python 3.10+ + from typing import TypeAlias # type: ignore +except ImportError: + # Python 3.9 and before + from typing_extensions import TypeAlias # type: ignore + + @dataclass(frozen=True) class CompletedScanJob: parent_server_scan_request_uuid: UUID @@ -29,10 +37,11 @@ class WorkerThreadNoMoreJobsSentinel: pass +WorkerQueueType: TypeAlias = "queue.Queue[Union[WorkerThreadNoMoreJobsSentinel, QueuedScanJob]]" + + class JobsWorkerThread(threading.Thread): - def __init__( - self, jobs_queue_in: "queue.Queue[QueuedScanJob]", completed_jobs_queue_out: "queue.Queue[CompletedScanJob]" - ): + def __init__(self, jobs_queue_in: WorkerQueueType, completed_jobs_queue_out: "queue.Queue[CompletedScanJob]"): super().__init__() self._jobs_queue_in = jobs_queue_in self._completed_jobs_queue_out = completed_jobs_queue_out diff --git a/sslyze/scanner/_mass_connectivity_tester.py b/sslyze/scanner/_mass_connectivity_tester.py index b44f9f7d..d528c51a 100644 --- a/sslyze/scanner/_mass_connectivity_tester.py +++ b/sslyze/scanner/_mass_connectivity_tester.py @@ -6,6 +6,14 @@ from sslyze.errors import ConnectionToServerFailed from sslyze.server_connectivity import check_connectivity_to_server +try: + # Python 3.10+ + from typing import TypeAlias # type: ignore +except ImportError: + # Python 3.9 and before + from typing_extensions import TypeAlias # type: ignore + + _ServerConnectivityTestingResult = Union[ServerTlsProbingResult, ConnectionToServerFailed] @@ -13,6 +21,16 @@ ServerConnectivityTestErrorCallback = Callable[[ServerScanRequest, ConnectionToServerFailed], None] +class _NoMoreWorkSentinel: + pass + + +_ScanRequestsQueueType: TypeAlias = "queue.Queue[Union[_NoMoreWorkSentinel, ServerScanRequest]]" +_ResultsQueueType: TypeAlias = ( + "queue.Queue[Union[_NoMoreWorkSentinel, Tuple[ServerScanRequest, _ServerConnectivityTestingResult]]]" +) + + class MassConnectivityTester: """Wrapper around check_connectivity_to_server() for concurrent/mass testing.""" @@ -20,8 +38,8 @@ def __init__(self, concurrent_server_scans_count: int) -> None: if concurrent_server_scans_count < 1: raise ValueError() - self._scan_requests_queue: "queue.Queue[ServerScanRequest]" = queue.Queue() - self._results_queue: "queue.Queue[Tuple[ServerScanRequest, _ServerConnectivityTestingResult]]" = queue.Queue() + self._scan_requests_queue: _ScanRequestsQueueType = queue.Queue() + self._results_queue: _ResultsQueueType = queue.Queue() self._all_worker_threads = [ _ConnectivityTesterThread( scan_requests_queue_in=self._scan_requests_queue, @@ -45,7 +63,7 @@ def start_work(self, server_scan_requests: List[ServerScanRequest]) -> None: # Notify workers when all work as been completed for _ in self._all_worker_threads: - self._scan_requests_queue.put(_NoMoreWorkSentinel()) # type: ignore + self._scan_requests_queue.put(_NoMoreWorkSentinel()) def wait_until_all_work_was_processed( self, @@ -76,15 +94,11 @@ def wait_until_all_work_was_processed( worker_thread.join() -class _NoMoreWorkSentinel: - pass - - class _ConnectivityTesterThread(threading.Thread): def __init__( self, - scan_requests_queue_in: "queue.Queue[ServerScanRequest]", - results_queue_out: "queue.Queue[Tuple[ServerScanRequest, _ServerConnectivityTestingResult]]", + scan_requests_queue_in: _ScanRequestsQueueType, + results_queue_out: _ResultsQueueType, ): super().__init__() self._scan_requests_queue_in = scan_requests_queue_in @@ -97,7 +111,7 @@ def run(self) -> None: # If there are no more jobs to complete, notify the parent and shutdown the thread if isinstance(scan_request, _NoMoreWorkSentinel): - self._results_queue_out.put(_NoMoreWorkSentinel()) # type: ignore + self._results_queue_out.put(_NoMoreWorkSentinel()) self._scan_requests_queue_in.task_done() return diff --git a/sslyze/scanner/_mass_scanner.py b/sslyze/scanner/_mass_scanner.py index 4adbc051..04cbaba2 100644 --- a/sslyze/scanner/_mass_scanner.py +++ b/sslyze/scanner/_mass_scanner.py @@ -3,7 +3,7 @@ import queue from time import sleep from traceback import TracebackException -from typing import Dict, List, Tuple +from typing import Dict, List, Tuple, Union from uuid import UUID from nassl.ssl_client import ClientCertificateRequested @@ -17,6 +17,7 @@ CompletedScanJob, QueuedScanJob, JobsWorkerThread, + WorkerQueueType, ) from sslyze.scanner.models import ( ServerScanRequest, @@ -29,6 +30,13 @@ from sslyze.scanner.scan_command_attempt import ScanCommandAttempt from sslyze.server_connectivity import ServerConnectivityInfo +try: + # Python 3.10+ + from typing import TypeAlias # type: ignore +except ImportError: + # Python 3.9 and before + from typing_extensions import TypeAlias # type: ignore + @dataclass(frozen=True) class _OngoingServerScan: @@ -48,6 +56,12 @@ class NoMoreServerScanRequestsSentinel: pass +ServerScanRequestsQueueType: TypeAlias = ( + "queue.Queue[Union[NoMoreServerScanRequestsSentinel, Tuple[ServerScanRequest, ServerTlsProbingResult]]]" +) +ServerScanResultsQueueType: TypeAlias = "queue.Queue[Union[NoMoreServerScanRequestsSentinel, ServerScanResult]]" + + class MassScannerProducerThread(threading.Thread): """Thread to continuously process more scans and the corresponding jobs as previous ones get completed. @@ -61,8 +75,8 @@ def __init__( self, concurrent_server_scans_count: int, per_server_concurrent_connections_count: int, - server_scan_requests_queue_in: "queue.Queue[Tuple[ServerScanRequest, ServerTlsProbingResult]]", - server_scan_results_queue_out: "queue.Queue[ServerScanResult]", + server_scan_requests_queue_in: ServerScanRequestsQueueType, + server_scan_results_queue_out: ServerScanResultsQueueType, ): super().__init__() self._server_scan_results_queue_out = server_scan_results_queue_out @@ -70,10 +84,8 @@ def __init__( self.daemon = True # Shutdown the thread if the program is exiting early (ie. ctrl+c) # Create internal threads and queues for dispatching jobs - self._completed_jobs_queue: "queue.Queue[CompletedScanJob]" = queue.Queue() - self._all_worker_queues: List["queue.Queue[QueuedScanJob]"] = [ - queue.Queue() for _ in range(concurrent_server_scans_count) - ] + self._completed_jobs_queue: queue.Queue[CompletedScanJob] = queue.Queue() + self._all_worker_queues: List[WorkerQueueType] = [queue.Queue() for _ in range(concurrent_server_scans_count)] self._worker_threads_per_queues_count = per_server_concurrent_connections_count self._all_worker_threads = [] for worker_queue in self._all_worker_queues: @@ -156,20 +168,20 @@ def run(self) -> None: self._completed_jobs_queue.join() for worker_queue in self._all_worker_queues: for _ in range(self._worker_threads_per_queues_count): - worker_queue.put(WorkerThreadNoMoreJobsSentinel()) # type: ignore + worker_queue.put(WorkerThreadNoMoreJobsSentinel()) worker_queue.join() for worker_thread in self._all_worker_threads: worker_thread.join() - self._server_scan_results_queue_out.put(NoMoreServerScanRequestsSentinel()) # type: ignore + self._server_scan_results_queue_out.put(NoMoreServerScanRequestsSentinel()) self._server_scan_results_queue_out.join() def _queue_server_scan( server_scan_request: ServerScanRequest, server_connectivity_result: ServerTlsProbingResult, - assigned_worker_queue: "queue.Queue[QueuedScanJob]", + assigned_worker_queue: WorkerQueueType, ) -> _OngoingServerScan: # Queue all the underlying jobs for this server scan all_scan_jobs_per_scan_cmd, scan_command_errors_during_queuing = _generate_scan_jobs_for_server_scan( diff --git a/sslyze/scanner/scanner.py b/sslyze/scanner/scanner.py index db805420..125b563d 100644 --- a/sslyze/scanner/scanner.py +++ b/sslyze/scanner/scanner.py @@ -1,6 +1,6 @@ import queue from traceback import TracebackException -from typing import List, Optional, Generator, Tuple, Sequence +from typing import List, Optional, Generator, Sequence from sslyze import ServerTlsProbingResult from sslyze.errors import ConnectionToServerFailed @@ -8,6 +8,8 @@ from sslyze.scanner._mass_scanner import ( MassScannerProducerThread, NoMoreServerScanRequestsSentinel, + ServerScanRequestsQueueType, + ServerScanResultsQueueType, ) from sslyze.scanner.models import ( ServerScanRequest, @@ -67,8 +69,8 @@ def get_results(self) -> Generator[ServerScanResult, None, None]: raise ValueError("No scan requests have been submitted") # Setup the queues for running and completing the scans - server_scan_requests_queue: "queue.Queue[Tuple[ServerScanRequest, ServerTlsProbingResult]]" = queue.Queue() - server_scan_results_queue: "queue.Queue[ServerScanResult]" = queue.Queue() + server_scan_requests_queue: ServerScanRequestsQueueType = queue.Queue() + server_scan_results_queue: ServerScanResultsQueueType = queue.Queue() def server_connectivity_test_completed_callback( server_scan_request: ServerScanRequest, connectivity_result: ServerTlsProbingResult @@ -115,7 +117,7 @@ def server_connectivity_test_error_callback( ) # Notify the MassScanner that all the scan requests have been queued - server_scan_requests_queue.put(NoMoreServerScanRequestsSentinel()) # type: ignore + server_scan_requests_queue.put(NoMoreServerScanRequestsSentinel()) # Wait for all scans to finish while True: diff --git a/tests/plugins_tests/openssl_cipher_suites/test_openssl_cipher_suites_plugin.py b/tests/plugins_tests/openssl_cipher_suites/test_openssl_cipher_suites_plugin.py index 6e268e6d..c3f7b47a 100644 --- a/tests/plugins_tests/openssl_cipher_suites/test_openssl_cipher_suites_plugin.py +++ b/tests/plugins_tests/openssl_cipher_suites/test_openssl_cipher_suites_plugin.py @@ -24,7 +24,7 @@ # https://github.com/nabla-c0d3/sslyze/issues/338 @pytest.mark.skip("Re-enable these tests when implementing cipher suite preference (#338)") class DisabledTestCipherSuitePreference: - def test_cipher_suite_preferred_by_server(self): + def test_cipher_suite_preferred_by_server(self) -> None: # Given an ordered list of cipher suites configured_cipher_suites = [ "ECDHE-RSA-CHACHA20-POLY1305", @@ -57,10 +57,11 @@ def test_cipher_suite_preferred_by_server(self): result: CipherSuitesScanResult = Tlsv12ScanImplementation.scan_server(server_info) # And the server's cipher suite preference was detected - assert result.cipher_suite_preferred_by_server - assert configured_cipher_suites[0] == result.cipher_suite_preferred_by_server.cipher_suite.openssl_name + pref_by_server = result.cipher_suite_preferred_by_server # type: ignore + assert pref_by_server + assert configured_cipher_suites[0] == pref_by_server.cipher_suite.openssl_name - def test_follows_client_cipher_suite_preference(self): + def test_follows_client_cipher_suite_preference(self) -> None: # Given a server to scan that follows client cipher suite preference server_location = ServerNetworkLocation("www.hotmail.com", 443) server_info = check_connectivity_to_server_and_return_info(server_location) @@ -69,11 +70,11 @@ def test_follows_client_cipher_suite_preference(self): result: CipherSuitesScanResult = Tlsv12ScanImplementation.scan_server(server_info) # And the server is detected as following the client's preference - assert result.cipher_suite_preferred_by_server + assert result.cipher_suite_preferred_by_server # type: ignore class TestCipherSuitesPluginWithOnlineServer: - def test_sslv2_disabled(self): + def test_sslv2_disabled(self) -> None: # Given a server to scan that does not support SSL 2.0 server_location = ServerNetworkLocation("www.google.com", 443) server_info = check_connectivity_to_server_and_return_info(server_location) @@ -85,7 +86,7 @@ def test_sslv2_disabled(self): assert not result.accepted_cipher_suites assert result.rejected_cipher_suites - def test_sslv3_disabled(self): + def test_sslv3_disabled(self) -> None: # Given a server to scan that does not support SSL 3.0 server_location = ServerNetworkLocation("www.google.com", 443) server_info = check_connectivity_to_server_and_return_info(server_location) @@ -97,7 +98,7 @@ def test_sslv3_disabled(self): assert not result.accepted_cipher_suites assert result.rejected_cipher_suites - def test_tlsv1_0_enabled(self): + def test_tlsv1_0_enabled(self) -> None: # Given a server to scan that supports TLS 1.0 server_location = ServerNetworkLocation("www.google.com", 443) server_info = check_connectivity_to_server_and_return_info(server_location) @@ -119,7 +120,7 @@ def test_tlsv1_0_enabled(self): assert result.rejected_cipher_suites - def test_tlsv1_0_disabled(self): + def test_tlsv1_0_disabled(self) -> None: # Given a server to scan that does NOT support TLS 1.0 server_location = ServerNetworkLocation("success.trendmicro.com", 443) server_info = check_connectivity_to_server_and_return_info(server_location) @@ -131,7 +132,7 @@ def test_tlsv1_0_disabled(self): assert not result.accepted_cipher_suites assert result.rejected_cipher_suites - def test_tlsv1_1_enabled(self): + def test_tlsv1_1_enabled(self) -> None: # Given a server to scan that supports TLS 1.1 server_location = ServerNetworkLocation("www.google.com", 443) server_info = check_connectivity_to_server_and_return_info(server_location) @@ -153,7 +154,7 @@ def test_tlsv1_1_enabled(self): assert result.rejected_cipher_suites - def test_tlsv1_2_enabled(self): + def test_tlsv1_2_enabled(self) -> None: # Given a server to scan that supports TLS 1.2 server_location = ServerNetworkLocation("www.google.com", 443) server_info = check_connectivity_to_server_and_return_info(server_location) @@ -179,7 +180,7 @@ def test_tlsv1_2_enabled(self): accepted_cipher.cipher_suite.name for accepted_cipher in result.accepted_cipher_suites } - def test_null_cipher_suites(self): + def test_null_cipher_suites(self) -> None: # Given a server to scan that supports NULL cipher suites server_location = ServerNetworkLocation("null.badssl.com", 443) server_info = check_connectivity_to_server_and_return_info(server_location) @@ -209,7 +210,7 @@ def test_null_cipher_suites(self): accepted_cipher.cipher_suite.name for accepted_cipher in result.accepted_cipher_suites } - def test_rc4_cipher_suites(self): + def test_rc4_cipher_suites(self) -> None: # Given a server to scan that supports RC4 cipher suites server_location = ServerNetworkLocation("rc4.badssl.com", 443) server_info = check_connectivity_to_server_and_return_info(server_location) @@ -222,7 +223,7 @@ def test_rc4_cipher_suites(self): accepted_cipher.cipher_suite.name for accepted_cipher in result.accepted_cipher_suites } - def test_ecdsa_cipher_suites(self): + def test_ecdsa_cipher_suites(self) -> None: # Given a server to scan that supports ECDSA cipher suites server_location = ServerNetworkLocation("ecc256.badssl.com", 443) server_info = check_connectivity_to_server_and_return_info(server_location) @@ -244,7 +245,7 @@ def test_ecdsa_cipher_suites(self): accepted_cipher.cipher_suite.name for accepted_cipher in result.accepted_cipher_suites } - def test_smtp(self): + def test_smtp(self) -> None: # Given an SMTP server to scan hostname = "smtp.gmail.com" server_location = ServerNetworkLocation(hostname, 587) @@ -257,7 +258,7 @@ def test_smtp(self): result: CipherSuitesScanResult = Tlsv12ScanImplementation.scan_server(server_info) assert result.accepted_cipher_suites - def test_tls_1_3_cipher_suites(self): + def test_tls_1_3_cipher_suites(self) -> None: # Given a server to scan that supports TLS 1.3 server_location = ServerNetworkLocation("www.cloudflare.com", 443) server_info = check_connectivity_to_server_and_return_info(server_location) @@ -270,7 +271,7 @@ def test_tls_1_3_cipher_suites(self): accepted_cipher.cipher_suite.name for accepted_cipher in result.accepted_cipher_suites } - def test_ephemeral_key_info(self): + def test_ephemeral_key_info(self) -> None: # Given a server to scan that supports DH and ECDH ephemeral keys server_location = ServerNetworkLocation("cloudflare.com", 443) server_info = check_connectivity_to_server_and_return_info(server_location) @@ -290,7 +291,7 @@ def test_ephemeral_key_info(self): @can_only_run_on_linux_64 class TestCipherSuitesPluginWithLocalServer: - def test_sslv2_enabled(self): + def test_sslv2_enabled(self) -> None: # Given a server to scan that supports SSL 2.0 with LegacyOpenSslServer(openssl_cipher_string="ALL:COMPLEMENTOFALL") as server: server_location = ServerNetworkLocation( @@ -305,7 +306,7 @@ def test_sslv2_enabled(self): assert len(result.accepted_cipher_suites) == 7 assert not result.rejected_cipher_suites - def test_sslv3_enabled(self): + def test_sslv3_enabled(self) -> None: # Given a server to scan that supports SSL 3.0 with LegacyOpenSslServer(openssl_cipher_string="ALL:COMPLEMENTOFALL") as server: server_location = ServerNetworkLocation( @@ -320,7 +321,7 @@ def test_sslv3_enabled(self): assert len(result.accepted_cipher_suites) == 43 assert result.rejected_cipher_suites - def test_succeeds_when_client_auth_failed_tls_1_2(self): + def test_succeeds_when_client_auth_failed_tls_1_2(self) -> None: # Given a TLS 1.2 server that requires client authentication with LegacyOpenSslServer(client_auth_config=ClientAuthConfigEnum.REQUIRED) as server: # And SSLyze does NOT provide a client certificate @@ -334,7 +335,7 @@ def test_succeeds_when_client_auth_failed_tls_1_2(self): assert result.accepted_cipher_suites - def test_succeeds_when_client_auth_failed_tls_1_3(self): + def test_succeeds_when_client_auth_failed_tls_1_3(self) -> None: # Given a TLS 1.3 server that requires client authentication with ModernOpenSslServer(client_auth_config=ClientAuthConfigEnum.REQUIRED) as server: # And SSLyze does NOT provide a client certificate diff --git a/tests/plugins_tests/test_compression_plugin.py b/tests/plugins_tests/test_compression_plugin.py index 25824e51..3b21b4b6 100644 --- a/tests/plugins_tests/test_compression_plugin.py +++ b/tests/plugins_tests/test_compression_plugin.py @@ -8,7 +8,7 @@ class TestCompressionPlugin: - def test_compression_disabled(self): + def test_compression_disabled(self) -> None: # Given a server to scan that has TLS compression disabled server_location = ServerNetworkLocation(hostname="www.google.com", port=443) server_info = check_connectivity_to_server_and_return_info(server_location) @@ -23,12 +23,12 @@ def test_compression_disabled(self): assert CompressionImplementation.cli_connector_cls.result_to_console_output(result) @pytest.mark.skip("Not implemented; find a server vulnerable to TLS compression") - def test_compression_enabled(self): + def test_compression_enabled(self) -> None: # TODO pass @can_only_run_on_linux_64 - def test_succeeds_when_client_auth_failed(self): + def test_succeeds_when_client_auth_failed(self) -> None: # Given a server that requires client authentication with LegacyOpenSslServer(client_auth_config=ClientAuthConfigEnum.REQUIRED) as server: # And sslyze does NOT provide a client certificate diff --git a/tests/plugins_tests/test_early_data_plugin.py b/tests/plugins_tests/test_early_data_plugin.py index e1c611c3..c84bc45d 100644 --- a/tests/plugins_tests/test_early_data_plugin.py +++ b/tests/plugins_tests/test_early_data_plugin.py @@ -7,7 +7,7 @@ class TestEarlyDataPlugin: @can_only_run_on_linux_64 - def test_early_data_enabled(self): + def test_early_data_enabled(self) -> None: # Given a server to scan that supports early data with ModernOpenSslServer(max_early_data=256) as server: server_location = ServerNetworkLocation( @@ -25,7 +25,7 @@ def test_early_data_enabled(self): assert EarlyDataImplementation.cli_connector_cls.result_to_console_output(result) @can_only_run_on_linux_64 - def test_early_data_disabled_no_tls_1_3(self): + def test_early_data_disabled_no_tls_1_3(self) -> None: # Given a server to scan that does NOT support early data because it does not support TLS 1.3 with LegacyOpenSslServer() as server: server_location = ServerNetworkLocation( @@ -40,7 +40,7 @@ def test_early_data_disabled_no_tls_1_3(self): assert not result.supports_early_data @can_only_run_on_linux_64 - def test_early_data_disabled(self): + def test_early_data_disabled(self) -> None: # Given a server to scan that does NOT support early data because it it is disabled with ModernOpenSslServer(max_early_data=None) as server: server_location = ServerNetworkLocation( diff --git a/tests/plugins_tests/test_elliptic_curves_plugin.py b/tests/plugins_tests/test_elliptic_curves_plugin.py index 17ee72d4..2e394d73 100644 --- a/tests/plugins_tests/test_elliptic_curves_plugin.py +++ b/tests/plugins_tests/test_elliptic_curves_plugin.py @@ -9,7 +9,7 @@ class TestEllipticCurvesPluginWithOnlineServer: - def test_supported_curves(self): + def test_supported_curves(self) -> None: # Given a server to scan that supports ECDH cipher suites server_location = ServerNetworkLocation("www.cloudflare.com", 443) server_info = check_connectivity_to_server_and_return_info(server_location) @@ -28,7 +28,7 @@ def test_supported_curves(self): @can_only_run_on_linux_64 class TestEllipticCurvesPluginWithLocalServer: - def test_supported_curves(self): + def test_supported_curves(self) -> None: # Given a server to scan that supports ECDH cipher suites with specific curves server_curves = ["X25519", "X448", "prime256v1", "secp384r1", "secp521r1"] with ModernOpenSslServer(groups=":".join(server_curves)) as server: @@ -41,4 +41,5 @@ def test_supported_curves(self): result: SupportedEllipticCurvesScanResult = SupportedEllipticCurvesImplementation.scan_server(server_info) # And the supported curves were detected + assert result.supported_curves assert set(server_curves) == {curve.name for curve in result.supported_curves} diff --git a/tests/plugins_tests/test_fallback_scsv_plugin.py b/tests/plugins_tests/test_fallback_scsv_plugin.py index ed239e8e..e36b3235 100644 --- a/tests/plugins_tests/test_fallback_scsv_plugin.py +++ b/tests/plugins_tests/test_fallback_scsv_plugin.py @@ -14,7 +14,7 @@ class TestFallbackScsvPlugin: - def test_fallback_good(self): + def test_fallback_good(self) -> None: # Given a server that supports SCSV server_location = ServerNetworkLocation("www.google.com", 443) server_info = check_connectivity_to_server_and_return_info(server_location) @@ -29,7 +29,7 @@ def test_fallback_good(self): assert FallbackScsvImplementation.cli_connector_cls.result_to_console_output(result) @can_only_run_on_linux_64 - def test_fallback_bad(self): + def test_fallback_bad(self) -> None: # Given a server that does NOT support SCSV with LegacyOpenSslServer() as server: server_location = ServerNetworkLocation( @@ -44,7 +44,7 @@ def test_fallback_bad(self): assert not result.supports_fallback_scsv @can_only_run_on_linux_64 - def test_fails_when_client_auth_failed(self): + def test_fails_when_client_auth_failed(self) -> None: # Given a server that does NOT support SCSV and that requires client authentication with LegacyOpenSslServer(client_auth_config=ClientAuthConfigEnum.REQUIRED) as server: # And sslyze does NOT provide a client certificate @@ -58,7 +58,7 @@ def test_fails_when_client_auth_failed(self): FallbackScsvImplementation.scan_server(server_info) @can_only_run_on_linux_64 - def test_works_when_client_auth_succeeded(self): + def test_works_when_client_auth_succeeded(self) -> None: # Given a server that does NOT support SCSV and that requires client authentication with LegacyOpenSslServer(client_auth_config=ClientAuthConfigEnum.REQUIRED) as server: server_location = ServerNetworkLocation( diff --git a/tests/plugins_tests/test_http_headers_plugin.py b/tests/plugins_tests/test_http_headers_plugin.py index 3efca965..c4004eae 100644 --- a/tests/plugins_tests/test_http_headers_plugin.py +++ b/tests/plugins_tests/test_http_headers_plugin.py @@ -1,4 +1,5 @@ from dataclasses import dataclass +from http.client import HTTPResponse from typing import Dict import pytest @@ -22,7 +23,7 @@ class TestHttpHeadersPlugin: - def test_hsts_enabled(self): + def test_hsts_enabled(self) -> None: # Given a server to scan that has HSTS enabled server_location = ServerNetworkLocation("hsts.badssl.com", 443) server_info = check_connectivity_to_server_and_return_info(server_location) @@ -38,7 +39,7 @@ def test_hsts_enabled(self): # And a CLI output can be generated assert HttpHeadersImplementation.cli_connector_cls.result_to_console_output(result) - def test_all_headers_disabled(self): + def test_all_headers_disabled(self) -> None: # Given a server to scan that does not have security headers server_location = ServerNetworkLocation("expired.badssl.com", 443) server_info = check_connectivity_to_server_and_return_info(server_location) @@ -56,7 +57,7 @@ def test_all_headers_disabled(self): assert HttpHeadersImplementation.cli_connector_cls.result_to_console_output(result) @can_only_run_on_linux_64 - def test_http_error(self): + def test_http_error(self) -> None: # Given a server to scan with ModernOpenSslServer( # And the server will trigger an error when receiving an HTTP request @@ -85,7 +86,7 @@ def test_http_error(self): assert result_as_json @can_only_run_on_linux_64 - def test_fails_when_client_auth_failed(self): + def test_fails_when_client_auth_failed(self) -> None: # Given a server that requires client authentication with LegacyOpenSslServer(client_auth_config=ClientAuthConfigEnum.REQUIRED) as server: # And sslyze does NOT provide a client certificate @@ -99,7 +100,7 @@ def test_fails_when_client_auth_failed(self): HttpHeadersImplementation.scan_server(server_info) @can_only_run_on_linux_64 - def test_works_when_client_auth_succeeded(self): + def test_works_when_client_auth_succeeded(self) -> None: # Given a server that requires client authentication with LegacyOpenSslServer(client_auth_config=ClientAuthConfigEnum.REQUIRED) as server: server_location = ServerNetworkLocation( @@ -121,7 +122,7 @@ def test_works_when_client_auth_succeeded(self): @dataclass -class _MockHttpResponse: +class _MockHttpResponse(HTTPResponse): status: int _headers: Dict[str, str] @@ -131,7 +132,7 @@ def getheader(self, name: str, default=None): class TestHttpRedirection: - def test_no_redirection(self): + def test_no_redirection(self) -> None: # Given an HTTP response with no redirection http_response = _MockHttpResponse( status=200, @@ -146,7 +147,7 @@ def test_no_redirection(self): # No new location is returned assert next_location_path is None - def test_redirection_relative_url(self): + def test_redirection_relative_url(self) -> None: # Given an HTTP response with a redirection to a relative URL http_response = _MockHttpResponse( status=302, @@ -161,7 +162,7 @@ def test_redirection_relative_url(self): # The new location is returned assert next_location_path == "/newpath" - def test_redirection_absolute_url_same_server(self): + def test_redirection_absolute_url_same_server(self) -> None: # Given an HTTP response with a redirection to an absolute URL that points to the same server http_response = _MockHttpResponse( status=302, @@ -176,7 +177,7 @@ def test_redirection_absolute_url_same_server(self): # The new location is returned assert next_location_path == "/newpath" - def test_redirection_absolute_url_different_hostname(self): + def test_redirection_absolute_url_different_hostname(self) -> None: # Given an HTTP response with a redirection to an absolute URL that points to a different hostname http_response = _MockHttpResponse( status=302, @@ -191,7 +192,7 @@ def test_redirection_absolute_url_different_hostname(self): # No new location is returned assert next_location_path is None - def test_redirection_absolute_url_different_port(self): + def test_redirection_absolute_url_different_port(self) -> None: # Given an HTTP response with a redirection to an absolute URL that points to a different port http_response = _MockHttpResponse( status=302, diff --git a/tests/plugins_tests/test_robot_plugin.py b/tests/plugins_tests/test_robot_plugin.py index 55f1d88e..84c72727 100644 --- a/tests/plugins_tests/test_robot_plugin.py +++ b/tests/plugins_tests/test_robot_plugin.py @@ -10,7 +10,7 @@ class TestRobotPluginPlugin: - def test_robot_attack_good(self): + def test_robot_attack_good(self) -> None: # Validate the bug fix for https://github.com/nabla-c0d3/sslyze/issues/282 # Given a server to scan that is not vulnerable to ROBOT server_location = ServerNetworkLocation("guide.duo.com", 443) @@ -23,11 +23,11 @@ def test_robot_attack_good(self): assert RobotImplementation.cli_connector_cls.result_to_console_output(result) @pytest.mark.skip("Not implemented; TODO: Find a vulnerable server.") - def test_robot_attack_bad(self): + def test_robot_attack_bad(self) -> None: pass @can_only_run_on_linux_64 - def test_fails_when_client_auth_failed(self): + def test_fails_when_client_auth_failed(self) -> None: # Given a TLS 1.2 server that requires client authentication with LegacyOpenSslServer(client_auth_config=ClientAuthConfigEnum.REQUIRED) as server: # And sslyze does NOT provide a client certificate diff --git a/tests/plugins_tests/test_session_renegotiation_plugin.py b/tests/plugins_tests/test_session_renegotiation_plugin.py index 1aea2d90..f1455e8c 100644 --- a/tests/plugins_tests/test_session_renegotiation_plugin.py +++ b/tests/plugins_tests/test_session_renegotiation_plugin.py @@ -17,7 +17,7 @@ class TestSessionRenegotiationPlugin: - def test_renegotiation_good(self): + def test_renegotiation_good(self) -> None: # Given a server that is NOT vulnerable to insecure reneg nor client reneg DOS server_location = ServerNetworkLocation("www.google.com", 443) server_info = check_connectivity_to_server_and_return_info(server_location) @@ -33,7 +33,7 @@ def test_renegotiation_good(self): assert SessionRenegotiationImplementation.cli_connector_cls.result_to_console_output(result) @can_only_run_on_linux_64 - def test_renegotiation_is_vulnerable_to_client_renegotiation_dos(self): + def test_renegotiation_is_vulnerable_to_client_renegotiation_dos(self) -> None: # Given a server that is vulnerable to client renegotiation DOS with LegacyOpenSslServer() as server: server_location = ServerNetworkLocation( @@ -51,7 +51,7 @@ def test_renegotiation_is_vulnerable_to_client_renegotiation_dos(self): assert SessionRenegotiationImplementation.cli_connector_cls.result_to_console_output(result) @can_only_run_on_linux_64 - def test_fails_when_client_auth_failed(self): + def test_fails_when_client_auth_failed(self) -> None: # Given a server that requires client authentication with LegacyOpenSslServer(client_auth_config=ClientAuthConfigEnum.REQUIRED) as server: # And sslyze does NOT provide a client certificate @@ -65,7 +65,7 @@ def test_fails_when_client_auth_failed(self): SessionRenegotiationImplementation.scan_server(server_info) @can_only_run_on_linux_64 - def test_works_when_client_auth_succeeded(self): + def test_works_when_client_auth_succeeded(self) -> None: # Given a server that is vulnerable and that requires client authentication with LegacyOpenSslServer(client_auth_config=ClientAuthConfigEnum.REQUIRED) as server: server_location = ServerNetworkLocation( diff --git a/tests/plugins_tests/test_session_resumption_plugin.py b/tests/plugins_tests/test_session_resumption_plugin.py index 81c02b36..95c9013e 100644 --- a/tests/plugins_tests/test_session_resumption_plugin.py +++ b/tests/plugins_tests/test_session_resumption_plugin.py @@ -19,7 +19,7 @@ class TestSessionResumptionSupport: - def test(self): + def test(self) -> None: # Given a server that supports session resumption with both TLS tickets and session IDs server_location = ServerNetworkLocation("www.google.com", 443) server_info = check_connectivity_to_server_and_return_info(server_location) @@ -39,7 +39,7 @@ def test(self): # And a CLI output can be generated assert SessionResumptionSupportImplementation.cli_connector_cls.result_to_console_output(result) - def test_with_extra_argument(self): + def test_with_extra_argument(self) -> None: # Given a server that supports session resumption with both TLS tickets and session IDs server_location = ServerNetworkLocation("www.google.com", 443) server_info = check_connectivity_to_server_and_return_info(server_location) @@ -78,7 +78,7 @@ def test_fails_when_client_auth_failed(self): SessionResumptionSupportImplementation.scan_server(server_info) @can_only_run_on_linux_64 - def test_works_when_client_auth_succeeded(self): + def test_works_when_client_auth_succeeded(self) -> None: # Given a server that requires client authentication with ModernOpenSslServer(client_auth_config=ClientAuthConfigEnum.REQUIRED) as server: server_location = ServerNetworkLocation(