Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed monitor certificate lambda #1864

Merged
merged 3 commits into from
Jan 16, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 17 additions & 12 deletions service_status/monitor_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import datetime as dt
import ssl
import socket
from urllib.parse import urlparse
from service_status.config import REGION_NAME, NOTIFICATION_ARN, SLACK_HOOK, NETWORKS, NETWORK_ID, \
CERTIFICATION_EXPIRATION_THRESHOLD
from common.boto_utils import BotoUtils
Expand Down Expand Up @@ -146,18 +147,22 @@ def _send_notification_for_certificate_expiration(self, org_id, service_id, endp
self._send_slack_notification(slack_message=slack_message)

def _get_certification_expiration_date_for_given_service(self, endpoint):
endpoint = endpoint.lstrip()
if self._valid_url(url=endpoint):
if self._is_https_endpoint(endpoint):
endpoint = self.obj_util.remove_http_https_prefix(url=endpoint)
hostname = endpoint.split(":")[0]
port = endpoint.split(":")[1]
context = ssl.create_default_context()
with socket.create_connection((hostname, port)) as sock:
with context.wrap_socket(sock, server_hostname=hostname) as ssock:
data = json.dumps(ssock.getpeercert())
expiration_date = json.loads(data)["notAfter"]
return dt.datetime.strptime(expiration_date, "%b %d %H:%M:%S %Y %Z")
try:
endpoint = endpoint.lstrip()
if self._valid_url(url=endpoint):
if self._is_https_endpoint(endpoint):
url = urlparse(endpoint)
hostname = url.hostname
port = url.port
context = ssl.create_default_context()
with socket.create_connection((hostname, port)) as sock:
with context.wrap_socket(sock, server_hostname=hostname) as ssock:
data = json.dumps(ssock.getpeercert())
expiration_date = json.loads(data)["notAfter"]
return dt.datetime.strptime(expiration_date, "%b %d %H:%M:%S %Y %Z")
except Exception as e:
logger.exception(e)
return None

@staticmethod
def _get_certificate_expiration_email_notification_subject(org_id, service_id, endpoint):
Expand Down