Skip to content

Commit

Permalink
Merge pull request #7 from devolo/development
Browse files Browse the repository at this point in the history
v0.2.0
  • Loading branch information
Shutgun authored Jan 28, 2021
2 parents 00983bd + 6e23143 commit f342517
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 19 deletions.
7 changes: 7 additions & 0 deletions docs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,18 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [v0.2.0] - 2021/01/28

### Added

- High level methods now respect cool off and maximum assessment rate limits

## [v0.1.1] - 2021/01/27

### Fixed

- Fixed pip installation

## [v0.1.0] - 2021/01/27

### Added
Expand Down
2 changes: 1 addition & 1 deletion ssllabs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from .ssllabs import Ssllabs

__license__ = "MIT"
__version__ = "0.1.1"
__version__ = "0.2.0"

__all__ = ['Ssllabs', "__license__", "__version__"]
51 changes: 33 additions & 18 deletions ssllabs/ssllabs.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,41 +13,56 @@ class Ssllabs():
"""Highlevel methods to interact with the SSL Labs Assessment APIs."""

def __init__(self):
self.logger = logging.getLogger(f"{self.__class__.__module__}.{self.__class__.__name__}")
self._logger = logging.getLogger(f"{self.__class__.__module__}.{self.__class__.__name__}")
self._semaphore = asyncio.Semaphore(1)

async def availability(self) -> bool:
"""
Check the availability of the SSL Labs servers.
See also: https://github.com/ssllabs/ssllabs-scan/blob/master/ssllabs-api-docs-v3.md#error-response-status-codes
"""
api = Info()
i = Info()
try:
await api.get()
await i.get()
return True
except HTTPStatusError as ex:
self.logger.error(ex)
self._logger.error(ex)
return False

async def analyze(self, host: str, publish: bool = False, ignore_mismatch: bool = False) -> HostData:
"""
Test a particular host.
Test a particular host with respect to the cool off and the maximum number of assessments.
:param host: Host to test
:param publish: True if assessment results should be published on the public results boards
:param ignore_mismatch: True if assessment shall proceed even when the server certificate doesn't match the hostname
See also: https://github.com/ssllabs/ssllabs-scan/blob/master/ssllabs-api-docs-v3.md#protocol-usage
"""
api = Analyze()
host_object = await api.get(host=host,
startNew="on",
publish="on" if publish else "off",
igonreMismatch="on" if ignore_mismatch else "off")
await self._semaphore.acquire()
i = Info()
info = await i.get()

# Wait for a free slot, if all slots are in use
while info.currentAssessments >= info.maxAssessments:
await asyncio.sleep(1)
info = await i.get()

# If there is already an assessment running, wait the needed cool off until starting the next one
if info.currentAssessments != 0:
await asyncio.sleep(info.newAssessmentCoolOff / 1000)

a = Analyze()
host_object = await a.get(host=host,
startNew="on",
publish="on" if publish else "off",
igonreMismatch="on" if ignore_mismatch else "off")
self._semaphore.release()
while host_object.status not in ["READY", "ERROR"]:
self.logger.debug("Analyzing %s", host)
self._logger.debug("Analyzing %s", host)
await asyncio.sleep(10)
host_object = await api.get(host=host, all="done")
host_object = await a.get(host=host, all="done")
return host_object

async def info(self) -> InfoData:
Expand All @@ -56,8 +71,8 @@ async def info(self) -> InfoData:
See also: https://github.com/ssllabs/ssllabs-scan/blob/master/ssllabs-api-docs-v3.md#info
"""
api = Info()
return await api.get()
i = Info()
return await i.get()

async def root_certs(self, trust_store: int = 1) -> str:
"""
Expand All @@ -70,14 +85,14 @@ async def root_certs(self, trust_store: int = 1) -> str:
if not 1 <= trust_store <= 5:
raise ValueError("""Trust store not found. Please choose on of the following:
1-Mozilla, 2-Apple MacOS, 3-Android, 4-Java, 5-Windows""")
api = RootCertsRaw()
return await api.get(trustStore=trust_store)
rcr = RootCertsRaw()
return await rcr.get(trustStore=trust_store)

async def status_codes(self) -> StatusCodesData:
"""
Retrieve known status codes.
See also: https://github.com/ssllabs/ssllabs-scan/blob/master/ssllabs-api-docs-v3.md#retrieve-known-status-codes
"""
api = StatusCodes()
return await api.get()
sc = StatusCodes()
return await sc.get()
10 changes: 10 additions & 0 deletions tests/test_data/info_max_assessments.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"engineVersion": "2.1.8",
"criteriaVersion": "2009q",
"maxAssessments": 25,
"currentAssessments": 25,
"newAssessmentCoolOff": 1000,
"messages": [
"This assessment service is provided free of charge by Qualys SSL Labs, subject to our terms and conditions: https://www.ssllabs.com/about/terms.html"
]
}
10 changes: 10 additions & 0 deletions tests/test_data/info_running_assessments.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"engineVersion": "2.1.8",
"criteriaVersion": "2009q",
"maxAssessments": 25,
"currentAssessments": 1,
"newAssessmentCoolOff": 1000,
"messages": [
"This assessment service is provided free of charge by Qualys SSL Labs, subject to our terms and conditions: https://www.ssllabs.com/about/terms.html"
]
}
33 changes: 33 additions & 0 deletions tests/test_ssllabs.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import dataclasses
from unittest.mock import patch

Expand Down Expand Up @@ -57,6 +58,38 @@ async def test_analyze_not_ready_yet(self, request, mocker):
await ssllabs.analyze(host="devolo.de")
assert spy.call_count == 2

@pytest.mark.asyncio
async def test_analyze_max_assessments(self, request, mocker):
with patch("asyncio.sleep", new=AsyncMock()), \
patch("ssllabs.api.analyze.Analyze.get",
new=AsyncMock(return_value=from_dict(data_class=HostData,
data=request.cls.analyze))), \
patch("ssllabs.api.info.Info.get",
new=AsyncMock(side_effect=[
from_dict(data_class=InfoData,
data=request.cls.info_max_assessments),
from_dict(data_class=InfoData,
data=request.cls.info)
])):
spy = mocker.spy(asyncio, "sleep")
ssllabs = Ssllabs()
await ssllabs.analyze(host="devolo.de")
assert spy.call_count == 1

@pytest.mark.asyncio
async def test_analyze_running_assessments(self, request, mocker):
with patch("asyncio.sleep", new=AsyncMock()), \
patch("ssllabs.api.analyze.Analyze.get",
new=AsyncMock(return_value=from_dict(data_class=HostData,
data=request.cls.analyze))), \
patch("ssllabs.api.info.Info.get",
new=AsyncMock(return_value=from_dict(data_class=InfoData,
data=request.cls.info_running_assessments))):
spy = mocker.spy(asyncio, "sleep")
ssllabs = Ssllabs()
await ssllabs.analyze(host="devolo.de")
assert spy.call_count == 1

@pytest.mark.asyncio
async def test_root_certs(self, request):
with patch("ssllabs.api.root_certs_raw.RootCertsRaw.get",
Expand Down

0 comments on commit f342517

Please sign in to comment.