Skip to content

Commit

Permalink
enable ssl cert and api key option for es batman
Browse files Browse the repository at this point in the history
  • Loading branch information
Jakub Schier committed Aug 2, 2023
1 parent 4adcb6a commit 5aca28b
Showing 1 changed file with 43 additions and 20 deletions.
63 changes: 43 additions & 20 deletions seacatauth/batman/elk.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import re
import ssl
import logging
import typing
import aiohttp
Expand Down Expand Up @@ -26,8 +27,13 @@ class ELKIntegration(asab.config.Configurable):

ConfigDefaults = {
"url": "http://localhost:9200/",
"username": "elastic",
"password": "elastic",
# Credentials/api key (mutualy exclusive)
"username": "",
"password": "",
"api_key": "",

# Certs
"ca_file": "",

# List of elasticsearch system users
# If Seacat Auth has users with one of these usernames, it will not sync them
Expand Down Expand Up @@ -59,6 +65,21 @@ def __init__(self, batman_svc, config_section_name="batman:elk", config=None):
else:
self.Authorization = None

api_key = self.Config.get("api_key")
self.Headers = None
if api_key != "":
self.Headers = {
"Authorization": "ApiKey {}".format(api_key)
}

# Prep for SSL
ca_cert = self.Config.get("ca_file")
self.SSLContext = None
if ca_cert != "":
self.SSLContext = ssl.create_default_context(cafile=ca_cert)
self.SSLContext.check_hostname = True
self.SSLContext.verify_mode = ssl.CERT_REQUIRED

self.URL = self.Config.get("url").rstrip("/")
self.ResourcePrefix = self.Config.get("resource_prefix")
self.ELKResourceRegex = re.compile("^{}".format(
Expand Down Expand Up @@ -88,13 +109,14 @@ async def _initialize_resources(self):
"""
# Fetch ELK roles
try:
async with aiohttp.ClientSession(auth=self.Authorization) as session:
async with session.get("{}/_xpack/security/role".format(self.URL)) as resp:
if resp.status != 200:
text = await resp.text()
L.error("Failed to fetch ElasticSearch roles:\n{}".format(text[:1000]))
return
elk_roles_data = await resp.json()
async with aiohttp.TCPConnector(ssl=self.SSLContext or False) as conn:
async with aiohttp.ClientSession(connector=conn, auth=self.Authorization, headers=self.Headers) as session:
async with session.get("{}/_xpack/security/role".format(self.URL)) as resp:
if resp.status != 200:
text = await resp.text()
L.error("Failed to fetch ElasticSearch roles:\n{}".format(text[:1000]))
return
elk_roles_data = await resp.json()
except Exception as e:
L.error("Communication with ElasticSearch produced {}: {}".format(type(e).__name__, str(e)))
return
Expand Down Expand Up @@ -179,17 +201,18 @@ async def sync(self, cred: dict, elk_resources: typing.Iterable):
json["roles"] = list(elk_roles)

try:
async with aiohttp.ClientSession(auth=self.Authorization) as session:
async with session.post("{}/_xpack/security/user/{}".format(self.URL, username), json=json) as resp:
if resp.status == 200:
# Everything is alright here
pass
else:
text = await resp.text()
L.warning(
"Failed to create/update user in ElasticSearch:\n{}".format(text[:1000]),
struct_data={"cid": cred["_id"]}
)
async with aiohttp.TCPConnector(ssl=self.SSLContext) as conn:
async with aiohttp.ClientSession(connector=conn, auth=self.Authorization, headers=self.Headers) as session:
async with session.post("{}/_xpack/security/user/{}".format(self.URL, username), json=json) as resp:
if resp.status == 200:
# Everything is alright here
pass
else:
text = await resp.text()
L.warning(
"Failed to create/update user in ElasticSearch:\n{}".format(text[:1000]),
struct_data={"cid": cred["_id"]}
)
except Exception as e:
L.error(
"Communication with ElasticSearch produced {}: {}".format(type(e).__name__, str(e)),
Expand Down

0 comments on commit 5aca28b

Please sign in to comment.