diff --git a/seacatauth/batman/elk.py b/seacatauth/batman/elk.py index ead13d32..e8baf0c1 100644 --- a/seacatauth/batman/elk.py +++ b/seacatauth/batman/elk.py @@ -1,4 +1,5 @@ import re +import ssl import logging import typing import aiohttp @@ -26,8 +27,13 @@ class ELKIntegration(asab.config.Configurable): ConfigDefaults = { "url": "http://localhost:9200/", - "username": "elastic", - "password": "elastic", + # Credentials/api key (mutualy exclusive) + "username": "", + "password": "", + "api_key": "", + + # Certs + "ca_file": "", # List of elasticsearch system users # If Seacat Auth has users with one of these usernames, it will not sync them @@ -59,6 +65,21 @@ def __init__(self, batman_svc, config_section_name="batman:elk", config=None): else: self.Authorization = None + api_key = self.Config.get("api_key") + self.Headers = None + if api_key != "": + self.Headers = { + "Authorization": "ApiKey {}".format(api_key) + } + + # Prep for SSL + ca_cert = self.Config.get("ca_file") + self.SSLContext = None + if ca_cert != "": + self.SSLContext = ssl.create_default_context(cafile=ca_cert) + self.SSLContext.check_hostname = True + self.SSLContext.verify_mode = ssl.CERT_REQUIRED + self.URL = self.Config.get("url").rstrip("/") self.ResourcePrefix = self.Config.get("resource_prefix") self.ELKResourceRegex = re.compile("^{}".format( @@ -88,13 +109,14 @@ async def _initialize_resources(self): """ # Fetch ELK roles try: - async with aiohttp.ClientSession(auth=self.Authorization) as session: - async with session.get("{}/_xpack/security/role".format(self.URL)) as resp: - if resp.status != 200: - text = await resp.text() - L.error("Failed to fetch ElasticSearch roles:\n{}".format(text[:1000])) - return - elk_roles_data = await resp.json() + async with aiohttp.TCPConnector(ssl=self.SSLContext or False) as conn: + async with aiohttp.ClientSession(connector=conn, auth=self.Authorization, headers=self.Headers) as session: + async with session.get("{}/_xpack/security/role".format(self.URL)) as resp: + if resp.status != 200: + text = await resp.text() + L.error("Failed to fetch ElasticSearch roles:\n{}".format(text[:1000])) + return + elk_roles_data = await resp.json() except Exception as e: L.error("Communication with ElasticSearch produced {}: {}".format(type(e).__name__, str(e))) return @@ -179,17 +201,18 @@ async def sync(self, cred: dict, elk_resources: typing.Iterable): json["roles"] = list(elk_roles) try: - async with aiohttp.ClientSession(auth=self.Authorization) as session: - async with session.post("{}/_xpack/security/user/{}".format(self.URL, username), json=json) as resp: - if resp.status == 200: - # Everything is alright here - pass - else: - text = await resp.text() - L.warning( - "Failed to create/update user in ElasticSearch:\n{}".format(text[:1000]), - struct_data={"cid": cred["_id"]} - ) + async with aiohttp.TCPConnector(ssl=self.SSLContext) as conn: + async with aiohttp.ClientSession(connector=conn, auth=self.Authorization, headers=self.Headers) as session: + async with session.post("{}/_xpack/security/user/{}".format(self.URL, username), json=json) as resp: + if resp.status == 200: + # Everything is alright here + pass + else: + text = await resp.text() + L.warning( + "Failed to create/update user in ElasticSearch:\n{}".format(text[:1000]), + struct_data={"cid": cred["_id"]} + ) except Exception as e: L.error( "Communication with ElasticSearch produced {}: {}".format(type(e).__name__, str(e)),