diff --git a/CHANGELOG.md b/CHANGELOG.md index 4c1bb182..a5478bf4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,21 @@ # CHANGELOG +## v23.47 + +### Pre-releases +- `v23.47-alpha` + +### Breaking changes +- Batman for Kibana now also requires `kibana_url` (#281, `v23.47-alpha`) +- Batman does no longer create Seacat resources from all Kibana roles (#281, `v23.47-alpha`) +- Config section 'batman:elk' renamed to 'batman:kibana' (#281, `v23.47-alpha`) + +### Features +- Kibana spaces and roles are now synchronized with Seacat tenants (#281, `v23.47-alpha`) + +--- + + ## v23.44-beta ### Pre-releases diff --git a/seacatauth/authz/resource/service.py b/seacatauth/authz/resource/service.py index 57ef6717..8afff43a 100644 --- a/seacatauth/authz/resource/service.py +++ b/seacatauth/authz/resource/service.py @@ -181,7 +181,7 @@ async def create(self, resource_id: str, description: str = None): await upsertor.execute(event_type=EventTypes.RESOURCE_CREATED) except asab.storage.exceptions.DuplicateError as e: if e.KeyValue is not None: - key, value = e.KeyValue + key, value = e.KeyValue.popitem() raise asab.exceptions.Conflict(key=key, value=value) else: raise asab.exceptions.Conflict() diff --git a/seacatauth/authz/utils.py b/seacatauth/authz/utils.py index e58158f3..2267b6c8 100644 --- a/seacatauth/authz/utils.py +++ b/seacatauth/authz/utils.py @@ -1,6 +1,9 @@ +import typing + + async def build_credentials_authz( - tenant_service, role_service, credentials_id, - tenants=None, exclude_resources=None + tenant_service, role_service, credentials_id: str, + tenants: typing.Iterable = None, exclude_resources: typing.Iterable = None ): """ Creates a nested 'authz' dict with tenant:resource structure: diff --git a/seacatauth/batman/elk.py b/seacatauth/batman/elk.py deleted file mode 100644 index 5cf315bd..00000000 --- a/seacatauth/batman/elk.py +++ /dev/null @@ -1,222 +0,0 @@ -import re -import ssl -import logging -import typing -import aiohttp -import asab.config -import asab.tls - -from ..authz import build_credentials_authz - -# - -L = logging.getLogger(__name__) - -# - - -# TODO: When credentials are added/updated/deleted, the sync should happen -# That's to be done using PubSub mechanism - -# TODO: Remove users that are managed by us but are removed (use `managed_role` to find these) - - -class ELKIntegration(asab.config.Configurable): - """ - Kibana / ElasticSearch user push compomnent - """ - - ConfigDefaults = { - "url": "http://localhost:9200/", - # Credentials/api key (mutualy exclusive) - "username": "", - "password": "", - "api_key": "", - - # For SSL options such as `cafile`, please refer to asab SSLContextBuilder - - # List of elasticsearch system users - # If Seacat Auth has users with one of these usernames, it will not sync them - # to avoid interfering with kibana system users - "local_users": "elastic kibana logstash_system beats_system remote_monitoring_user", - - # Resources with this prefix will be mapped to Kibana users as roles - # E.g.: Resource "elk:kibana-analyst" will be mapped to role "kibana-analyst" - "resource_prefix": "elk:", - - # This role 'flags' users in ElasticSearch/Kibana that is managed by Seacat Auth - # There should be a role created in the ElasticSearch that grants no rights - "seacat_user_flag": "seacat_managed", - } - - - def __init__(self, batman_svc, config_section_name="batman:elk", config=None): - super().__init__(config_section_name=config_section_name, config=config) - self.BatmanService = batman_svc - self.CredentialsService = self.BatmanService.App.get_service("seacatauth.CredentialsService") - self.TenantService = self.BatmanService.App.get_service("seacatauth.TenantService") - self.RoleService = self.BatmanService.App.get_service("seacatauth.RoleService") - self.ResourceService = self.BatmanService.App.get_service("seacatauth.ResourceService") - - username = self.Config.get("username") - password = self.Config.get("password") - api_key = self.Config.get("api_key") - if username != "" and api_key != "": - raise ValueError("Cannot authenticate with both 'api_key' and 'username'+'password'.") - if username != "": - self.Headers = { - "Authorization": aiohttp.BasicAuth(username, password).encode() - } - elif api_key != "": - self.Headers = { - "Authorization": "ApiKey {}".format(api_key) - } - else: - self.Headers = None - - self.URL = self.Config.get("url").rstrip("/") - self.ResourcePrefix = self.Config.get("resource_prefix") - self.ELKResourceRegex = re.compile("^{}".format( - re.escape(self.Config.get("resource_prefix")) - )) - self.ELKSeacatFlagRole = self.Config.get("seacat_user_flag") - - lu = re.split(r"\s+", self.Config.get("local_users"), flags=re.MULTILINE) - lu.append(username) - - self.LocalUsers = frozenset(lu) - - batman_svc.App.PubSub.subscribe("Application.tick/60!", self._on_tick) - - # Prep for SSL - self.SSLContextBuilder = asab.tls.SSLContextBuilder(config_section_name) - if self.URL.startswith("https://"): - self.SSLContext = self.SSLContextBuilder.build(ssl.PROTOCOL_TLS_CLIENT) - else: - self.SSLContext = None - - - async def _on_tick(self, event_name): - await self._initialize_resources() - await self.sync_all() - - async def initialize(self): - await self._initialize_resources() - await self.sync_all() - - async def _initialize_resources(self): - # TODO: Remove resource if its respective kibana role has been removed - """ - Fetches roles from ELK and creates a Seacat Auth resource for each one of them. - """ - # Fetch ELK roles - try: - async with aiohttp.TCPConnector(ssl=self.SSLContext or False) as conn: - async with aiohttp.ClientSession(connector=conn, headers=self.Headers) as session: - async with session.get("{}/_xpack/security/role".format(self.URL)) as resp: - if resp.status != 200: - text = await resp.text() - L.error("Failed to fetch ElasticSearch roles:\n{}".format(text[:1000])) - return - elk_roles_data = await resp.json() - except Exception as e: - L.error("Communication with ElasticSearch produced {}: {}".format(type(e).__name__, str(e))) - return - - # Fetch SCA resources for the ELK module - existing_elk_resources = await self.ResourceService.list(query_filter={"_id": self.ELKResourceRegex}) - existing_elk_resources = set( - resource["_id"] - for resource in existing_elk_resources["data"] - ) - - # Create resources that don't exist yet - for role in elk_roles_data.keys(): - resource_id = "{}{}".format(self.ResourcePrefix, role) - if resource_id not in existing_elk_resources: - await self.ResourceService.create( - resource_id, - description="Grants access to ELK role {!r}.".format(role) - ) - - async def sync_all(self): - elk_resources = await self.ResourceService.list(query_filter={"_id": self.ELKResourceRegex}) - elk_resources = set( - resource["_id"] - for resource in elk_resources["data"] - ) - async for cred in self.CredentialsService.iterate(): - await self.sync(cred, elk_resources) - - - async def sync(self, cred: dict, elk_resources: typing.Iterable): - username = cred.get("username") - if username is None: - # Be defensive - L.info("Cannot create user: No username", struct_data={"cid": cred["_id"]}) - return - - if username in self.LocalUsers: - # Ignore users that are specified as local - return - - json = { - "enabled": cred.get("suspended", False) is not True, - - # Generate technical password - "password": self.BatmanService.generate_password(cred["_id"]), - - "metadata": { - # We are managed by SeaCat Auth - "seacatauth": True - }, - - } - - v = cred.get("email") - if v is not None: - json["email"] = v - - v = cred.get("full_name") - if v is not None: - json["full_name"] = v - - elk_roles = {self.ELKSeacatFlagRole} # Add a role that marks users managed by Seacat Auth - - # Get authz dict - authz = await build_credentials_authz(self.TenantService, self.RoleService, cred["_id"]) - - # ELK roles from SCA resources - # Use only global "*" roles for now - user_resources = set(authz.get("*", [])) - if "authz:superuser" in user_resources: - elk_roles.update( - resource[len(self.ResourcePrefix):] - for resource in elk_resources - ) - else: - elk_roles.update( - resource[len(self.ResourcePrefix):] - for resource in user_resources.intersection(elk_resources) - ) - - json["roles"] = list(elk_roles) - - try: - async with aiohttp.TCPConnector(ssl=self.SSLContext) as conn: - async with aiohttp.ClientSession(connector=conn, headers=self.Headers) as session: - async with session.post("{}/_xpack/security/user/{}".format(self.URL, username), json=json) as resp: - if resp.status == 200: - # Everything is alright here - pass - else: - text = await resp.text() - L.warning( - "Failed to create/update user in ElasticSearch:\n{}".format(text[:1000]), - struct_data={"cid": cred["_id"]} - ) - except Exception as e: - L.error( - "Communication with ElasticSearch produced {}: {}".format(type(e).__name__, str(e)), - struct_data={"cid": cred["_id"]} - ) diff --git a/seacatauth/batman/kibana.py b/seacatauth/batman/kibana.py new file mode 100644 index 00000000..2064b671 --- /dev/null +++ b/seacatauth/batman/kibana.py @@ -0,0 +1,485 @@ +import contextlib +import datetime +import re +import ssl +import logging +import typing +import aiohttp +import aiohttp.client_exceptions +import asab.config +import asab.tls + +from ..authz import build_credentials_authz + +# + +L = logging.getLogger(__name__) + +# + + +# TODO: When credentials are added/updated/deleted, the sync should happen +# That's to be done using PubSub mechanism + +# TODO: Remove users that are managed by us but are removed (use `managed_role` to find these) + + +class KibanaIntegration(asab.config.Configurable): + """ + Kibana / ElasticSearch user push compomnent + """ + + ConfigDefaults = { + "url": "http://localhost:9200", + + # Enables automatic synchronization of Kibana spaces with Seacat tenants + # Space and role sync is disabled if kibana_url is empty. + "kibana_url": "http://localhost:5601", + + # Basic credentials / API key (mutually exclusive) + "username": "", + "password": "", + "api_key": "", + + # For SSL options such as `cafile`, please refer to the config of asab.tls.SSLContextBuilder + + # List of elasticsearch system users + # If Seacat Auth has users with one of these usernames, it will not sync them + # to avoid interfering with kibana system users + "local_users": "elastic kibana logstash_system beats_system remote_monitoring_user", + + # This role 'flags' users in ElasticSearch/Kibana that is managed by Seacat Auth + # There should be a role created in the ElasticSearch that grants no rights + "seacat_user_flag": "seacat_managed", + } + + EssentialKibanaResources = { + "kibana:access": { + "description": + "Read-only access to tenant space in Kibana."}, + "kibana:edit": { + "description": + "Read-write access to tenant space in Kibana."}, + "kibana:admin": { + "role_name": "kibana_admin", + "description": + "Grants access to all features in Kibana across all spaces. For more information, see 'kibana_admin' " + "role in ElasticSearch documentation."}, + "authz:superuser": { + "role_name": "superuser", + "description": + "Grants full access to cluster management and data indices. This role also grants direct read-only " + "access to restricted indices like .security. A user with the superuser role can impersonate " + "any other user in the system."}, + } + + + def __init__(self, batman_svc, config_section_name="batman:kibana", config=None): + super().__init__(config_section_name=config_section_name, config=config) + + if "batman:elk" in asab.Config: + asab.LogObsolete.warning( + "Config section 'batman:elk' has been renamed to 'batman:kibana'. Please update your config.", + struct_data={"eol": "2024-05-31"}) + self.Config.update(asab.Config["batman:elk"]) + + self.BatmanService = batman_svc + self.App = self.BatmanService.App + self.CredentialsService = self.App.get_service("seacatauth.CredentialsService") + self.TenantService = self.App.get_service("seacatauth.TenantService") + self.RoleService = self.App.get_service("seacatauth.RoleService") + self.ResourceService = self.App.get_service("seacatauth.ResourceService") + + self.KibanaUrl = self.Config.get("kibana_url").rstrip("/") + if len(self.KibanaUrl) == 0: + self.KibanaUrl = None + self.ElasticSearchUrl = self.Config.get("url").rstrip("/") + self.Headers = self._prepare_session_headers() + + self.ResourcePrefix = "kibana:" + self.DeprecatedResourcePrefix = "elk:" + self.DeprecatedResourceRegex = re.compile("^elk:") + self.SeacatUserFlagRole = self.Config.get("seacat_user_flag") + self.IgnoreUsernames = self._prepare_ignored_usernames() + + self.SSLContextBuilder = asab.tls.SSLContextBuilder(config_section_name) + if self.ElasticSearchUrl.startswith("https://"): + self.SSLContext = self.SSLContextBuilder.build(ssl.PROTOCOL_TLS_CLIENT) + else: + self.SSLContext = None + + self.RetrySyncAll: datetime.datetime | None = None + + self.App.PubSub.subscribe("Application.init!", self._on_init) + self.App.PubSub.subscribe("Role.assigned!", self._on_authz_change) + self.App.PubSub.subscribe("Role.unassigned!", self._on_authz_change) + self.App.PubSub.subscribe("Role.updated!", self._on_authz_change) + self.App.PubSub.subscribe("Tenant.assigned!", self._on_authz_change) + self.App.PubSub.subscribe("Tenant.unassigned!", self._on_authz_change) + self.App.PubSub.subscribe("Tenant.created!", self._on_tenant_created) + self.App.PubSub.subscribe("Tenant.updated!", self._on_tenant_updated) + self.App.PubSub.subscribe("Application.housekeeping!", self._on_housekeeping) + self.App.PubSub.subscribe("Application.tick/10!", self._retry_sync) + + @contextlib.asynccontextmanager + async def _elasticsearch_session(self): + async with aiohttp.TCPConnector(ssl=self.SSLContext or False) as connector: + async with aiohttp.ClientSession(connector=connector, headers=self.Headers) as session: + yield session + + + async def _on_init(self, event_name): + await self._initialize_resources() + # Ensure sync on startup even if housekeeping does not happen; prevent syncing twice + if not asab.Config.getboolean("housekeeping", "run_at_startup"): + try: + await self._sync_all_tenants_and_spaces() + except aiohttp.client_exceptions.ClientConnectionError as e: + L.error("Cannot connect to Kibana: {}".format(str(e))) + self.RetrySyncAll = datetime.datetime.now(datetime.UTC) + datetime.timedelta(seconds=60) + return + try: + await self.sync_all_credentials() + except aiohttp.client_exceptions.ClientConnectionError as e: + L.error("Cannot connect to ElasticSearch: {}".format(str(e))) + self.RetrySyncAll = datetime.datetime.now(datetime.UTC) + datetime.timedelta(seconds=60) + return + + async def _on_housekeeping(self, event_name): + try: + await self._sync_all_tenants_and_spaces() + except aiohttp.client_exceptions.ClientConnectionError as e: + L.error("Cannot connect to Kibana: {}".format(str(e))) + self.RetrySyncAll = datetime.datetime.now(datetime.UTC) + datetime.timedelta(seconds=60) + return + try: + await self.sync_all_credentials() + except aiohttp.client_exceptions.ClientConnectionError as e: + L.error("Cannot connect to ElasticSearch: {}".format(str(e))) + self.RetrySyncAll = datetime.datetime.now(datetime.UTC) + datetime.timedelta(seconds=60) + return + + + async def _retry_sync(self, event_name): + if not self.RetrySyncAll or datetime.datetime.now(datetime.UTC) < self.RetrySyncAll: + return + self.RetrySyncAll = None + try: + await self._sync_all_tenants_and_spaces() + except aiohttp.client_exceptions.ClientConnectionError as e: + L.error("Cannot connect to Kibana: {}".format(str(e))) + return + try: + await self.sync_all_credentials() + except aiohttp.client_exceptions.ClientConnectionError as e: + L.error("Cannot connect to ElasticSearch: {}".format(str(e))) + return + + async def _on_authz_change(self, event_name, credentials_id=None, **kwargs): + try: + if credentials_id: + await self.sync_credentials(credentials_id) + else: + await self.sync_all_credentials() + except aiohttp.client_exceptions.ClientConnectionError as e: + L.error("Cannot connect to ElasticSearch: {}".format(str(e))) + self.RetrySyncAll = datetime.datetime.now(datetime.UTC) + datetime.timedelta(seconds=60) + return + + + async def _on_tenant_created(self, event_name, tenant_id): + space_id = self._kibana_space_id_from_tenant_id(tenant_id) + try: + await self._create_or_update_kibana_space(tenant_id, space_id) + except aiohttp.client_exceptions.ClientConnectionError as e: + L.error("Cannot connect to Kibana: {}".format(str(e))) + self.RetrySyncAll = datetime.datetime.now(datetime.UTC) + datetime.timedelta(seconds=60) + return + + + async def _on_tenant_updated(self, event_name, tenant_id): + space_id = self._kibana_space_id_from_tenant_id(tenant_id) + try: + await self._create_or_update_kibana_space(tenant_id, space_id) + except aiohttp.client_exceptions.ClientConnectionError as e: + L.error("Cannot connect to Kibana: {}".format(str(e))) + self.RetrySyncAll = datetime.datetime.now(datetime.UTC) + datetime.timedelta(seconds=60) + return + + + async def _sync_all_tenants_and_spaces(self): + async for tenant in self.TenantService.iterate(): + space_id = self._kibana_space_id_from_tenant_id(tenant["_id"]) + await self._create_or_update_kibana_space(tenant, space_id) + + + async def _create_or_update_kibana_space(self, tenant: str | dict, space_id: str = None): + """ + Create a Kibana space for specified tenant or update its metadata if necessary. + Also create a read-only and a read-write Kibana role for that space. + """ + if self.KibanaUrl is None: + return + + if isinstance(tenant, str): + tenant_id = tenant + tenant = await self.TenantService.get_tenant(tenant_id) + else: + tenant_id = tenant["_id"] + + if not space_id: + space_id = self._kibana_space_id_from_tenant_id(tenant_id) + + async with self._elasticsearch_session() as session: + async with session.get("{}/api/spaces/space/{}".format(self.KibanaUrl, space_id)) as resp: + if resp.status == 404: + existing_space = None + elif resp.status == 200: + existing_space = await resp.json() + else: + text = await resp.text() + L.error( + "Failed to fetch Kibana tenant space (Server responded with {}):\n{}".format( + resp.status, text[:1000]), + struct_data={"space_id": space_id, "tenant_id": tenant_id} + ) + return + + space_update = {} + if existing_space: + name = tenant.get("label", tenant_id) + if existing_space.get("name") != name: + space_update["name"] = name + description = tenant.get("description") + if existing_space.get("description") != description: + space_update["description"] = description + if len(space_update) > 0: + space_update["id"] = space_id + else: + space_update = { + "id": space_id, + "name": tenant.get("label", tenant_id) + } + if "description" in tenant: + space_update["description"] = tenant["description"] + + if not space_update: + # No changes + L.debug("Kibana space metadata up to date", struct_data={ + "space_id": space_id, "tenant_id": tenant_id}) + return + + elif existing_space: + # Update existing space + async with self._elasticsearch_session() as session: + async with session.put( + "{}/api/spaces/space/{}".format(self.KibanaUrl, space_id), json=space_update + ) as resp: + if not (200 <= resp.status < 300): + text = await resp.text() + L.error( + "Failed to update Kibana tenant space (Server responded with {}):\n{}".format( + resp.status, text[:1000]), + struct_data={"space_id": space_id, "tenant_id": tenant_id} + ) + L.log(asab.LOG_NOTICE, "Kibana space updated", struct_data={"id": space_id, "tenant": tenant_id}) + return + + else: + # Create new space + async with self._elasticsearch_session() as session: + async with session.post("{}/api/spaces/space".format(self.KibanaUrl), json=space_update) as resp: + if not (200 <= resp.status < 300): + text = await resp.text() + L.error( + "Failed to create Kibana tenant space (Server responded with {}):\n{}".format( + resp.status, text[:1000]), + struct_data={"space_id": space_id, "tenant_id": tenant_id} + ) + return + + L.log(asab.LOG_NOTICE, "Kibana space created", struct_data={"id": space_id, "tenant": tenant_id}) + + # Create roles for space access + await self._create_kibana_role(tenant_id, space_id, "read") + await self._create_kibana_role(tenant_id, space_id, "all") + + + async def _create_kibana_role(self, tenant_id: str, space_id: str, privileges: str = "read"): + assert privileges in {"read", "all"} + role_name = self._elastic_role_from_tenant(tenant_id, privileges) + role = { + # Add all privileges for the new space + "kibana": [{"spaces": [space_id], "base": [privileges]}] + } + + async with self._elasticsearch_session() as session: + async with session.put( + "{}/api/security/role/{}".format(self.KibanaUrl, role_name), json=role + ) as resp: + if resp.status // 100 != 2: + text = await resp.text() + L.error("Failed to create Kibana role {!r}:\n{}".format(role_name, text[:1000])) + return + + L.log(asab.LOG_NOTICE, "Kibana role created.", struct_data={"name": role_name}) + + + async def _get_kibana_spaces(self): + async with self._elasticsearch_session() as session: + async with session.get("{}/api/spaces/space".format(self.KibanaUrl)) as resp: + if resp.status != 200: + text = await resp.text() + L.error("Failed to fetch Kibana spaces:\n{}".format(text[:1000])) + return + spaces = await resp.json() + return spaces + + + async def initialize(self): + pass + + + async def _initialize_resources(self): + """ + Create Seacat Auth resources that grant access to ElasticSearch roles + """ + # Create core resources that don't exist yet + for resource_id, resource in self.EssentialKibanaResources.items(): + try: + await self.ResourceService.get(resource_id) + except KeyError: + await self.ResourceService.create( + resource_id, + description=resource.get("description") + ) + + async def sync_all_credentials(self): + elk_resources = await self.ResourceService.list(query_filter={"_id": self.DeprecatedResourceRegex}) + elk_resources = set( + resource["_id"] + for resource in elk_resources["data"] + ) + async with self._elasticsearch_session() as session: + async for cred in self.CredentialsService.iterate(): + await self._sync_credentials(session, cred, elk_resources) + + + async def sync_credentials(self, credentials_id: str): + elk_resources = await self.ResourceService.list(query_filter={"_id": self.DeprecatedResourceRegex}) + elk_resources = set( + resource["_id"] + for resource in elk_resources["data"] + ) + cred_svc = self.BatmanService.App.get_service("seacatauth.CredentialsService") + credentials = await cred_svc.get(credentials_id) + async with self._elasticsearch_session() as session: + await self._sync_credentials(session, credentials, elk_resources) + + + async def _sync_credentials(self, session: aiohttp.ClientSession, cred: dict, elk_resources: typing.Iterable): + username = cred.get("username") + if username is None: + # Be defensive + L.info("Cannot create user: No username", struct_data={"cid": cred["_id"]}) + return + + if username in self.IgnoreUsernames: + return + + elastic_user = { + "enabled": cred.get("suspended", False) is not True, + # Generate complex deterministic password + "password": self.BatmanService.generate_password(cred["_id"]), + "metadata": { + # Flag users managed by SeaCat Auth + "seacatauth": True + }, + } + + v = cred.get("email") + if v is not None: + elastic_user["email"] = v + + v = cred.get("full_name") + if v is not None: + elastic_user["full_name"] = v + + elk_roles = {self.SeacatUserFlagRole} # Add a role that marks users managed by Seacat Auth + + # Get full authorization scope + assigned_tenants = await self.TenantService.get_tenants(cred["_id"]) + authz = await build_credentials_authz( + self.TenantService, self.RoleService, cred["_id"], tenants=assigned_tenants) + + # Tenant-scoped resources grant privileges for specific tenant spaces + for tenant_id, resources in authz.items(): + if "kibana:access" in resources: + elk_roles.add(self._elastic_role_from_tenant(tenant_id, "read")) + if "kibana:edit" in resources: + elk_roles.add(self._elastic_role_from_tenant(tenant_id, "all")) + + # Globally authorized resources grant privileges across all Kibana spaces + global_authz = frozenset(authz.get("*", frozenset())) + if "authz:superuser" in global_authz: + elk_roles.add(self.EssentialKibanaResources["authz:superuser"]["role_name"]) + if "kibana:admin" in global_authz: + elk_roles.add(self.EssentialKibanaResources["kibana:admin"]["role_name"]) + + # BACK COMPAT + # Map globally authorized Seacat resources prefixed with "elk:" to Elastic roles + elk_roles.update( + resource[len(self.DeprecatedResourcePrefix):] + for resource in global_authz.intersection(elk_resources) + ) + + elastic_user["roles"] = list(elk_roles) + + async with session.post( + "{}/_xpack/security/user/{}".format(self.ElasticSearchUrl, username), + json=elastic_user + ) as resp: + if 200 <= resp.status < 300: + # Everything is alright here + pass + else: + text = await resp.text() + L.warning( + "Failed to create/update user in ElasticSearch:\n{}".format(text[:1000]), + struct_data={"cid": cred["_id"]} + ) + + + def _elastic_role_from_tenant(self, tenant: str, privileges: str): + return "tenant_{}_{}".format(tenant, privileges) + + + def _kibana_space_id_from_tenant_id(self, tenant_id: str): + if tenant_id == "default": + # "default" is a reserved space name in Kibana + return "tenant-default" + # Replace forbidden characters with "--" + # NOTE: Tenant ID can contain "." while space ID can not + return re.sub("[^a-z0-9_-]", "--", tenant_id) + + def _prepare_ignored_usernames(self): + """ + Load usernames that will not be synchronized to avoid conflicts with ELK system users + """ + ignore_usernames = re.split(r"\s+", self.Config.get("local_users"), flags=re.MULTILINE) + if self.Config.get("username"): + ignore_usernames.append(self.Config.get("username")) + return frozenset(ignore_usernames) + + def _prepare_session_headers(self): + headers = {"kbn-xsrf": "kibana"} + username = self.Config.get("username") + password = self.Config.get("password") + api_key = self.Config.get("api_key") + if username != "" and api_key != "": + raise ValueError("Cannot authenticate with both 'api_key' and 'username'+'password'.") + if username != "": + headers["Authorization"] = aiohttp.BasicAuth(username, password).encode() + elif api_key != "": + headers["Authorization"] = "ApiKey {}".format(api_key) + return headers diff --git a/seacatauth/batman/service.py b/seacatauth/batman/service.py index 47ec1a3f..568e994c 100644 --- a/seacatauth/batman/service.py +++ b/seacatauth/batman/service.py @@ -28,10 +28,10 @@ def __init__(self, app, service_name="seacatauth.BatmanService"): # TODO: There should be no hardcoded encryption password self.Key = b"12345678901234567890123456789012" - if "batman:elk" in asab.Config.sections(): - from .elk import ELKIntegration + if "batman:kibana" in asab.Config.sections() or "batman:elk" in asab.Config.sections(): + from .kibana import KibanaIntegration self.Integrations.append( - ELKIntegration(self) + KibanaIntegration(self) ) if "batman:grafana" in asab.Config.sections(): diff --git a/seacatauth/credentials/service.py b/seacatauth/credentials/service.py index 0efe0422..662f3f1e 100644 --- a/seacatauth/credentials/service.py +++ b/seacatauth/credentials/service.py @@ -341,6 +341,8 @@ async def create_credentials(self, provider_id: str, credentials_data: dict, ses credentials_id=credentials_id, by_cid=agent_cid) + self.App.PubSub.publish("Credentials.created!", credentials_id=credentials_id) + return { "status": "OK", "credentials_id": credentials_id, @@ -457,6 +459,8 @@ async def update_credentials(self, credentials_id: str, update_dict: dict, sessi by_cid=agent_cid, attributes=list(validated_data.keys())) + self.App.PubSub.publish("Credentials.updated!", credentials_id=credentials_id) + return {"status": "OK"} @@ -506,6 +510,8 @@ async def delete_credentials(self, credentials_id: str, agent_cid: str = None): credentials_id=credentials_id, by_cid=agent_cid) + self.App.PubSub.publish("Credentials.deleted!", credentials_id=credentials_id) + return { "status": result, "credentials_id": credentials_id, diff --git a/seacatauth/tenant/handler.py b/seacatauth/tenant/handler.py index b66dc7dd..1c12fd75 100644 --- a/seacatauth/tenant/handler.py +++ b/seacatauth/tenant/handler.py @@ -56,11 +56,7 @@ async def list(self, request): """ List all registered tenant IDs """ - # TODO: This has to be cached agressivelly - provider = self.TenantService.get_provider() - result = [] - async for tenant in provider.iterate(): - result.append(tenant["_id"]) + result = await self.TenantService.list_tenant_ids() return asab.web.rest.json_response(request, data=result) diff --git a/seacatauth/tenant/service.py b/seacatauth/tenant/service.py index b3b6dd35..27aa906f 100644 --- a/seacatauth/tenant/service.py +++ b/seacatauth/tenant/service.py @@ -37,6 +37,28 @@ def create_provider(self, provider_id, config_section_name): self.TenantsProvider = provider + async def list_tenant_ids(self): + """ + List all registered tenant IDs + """ + # TODO: This has to be cached agressivelly + provider = self.get_provider() + result = [] + async for tenant in provider.iterate(): + result.append(tenant["_id"]) + return result + + + async def iterate(self): + """ + Iterate over all tenants + """ + # TODO: Limit, page, filter + provider = self.get_provider() + async for tenant in provider.iterate(): + yield tenant + + async def get_tenant(self, tenant_id: str): return await self.TenantsProvider.get(tenant_id) @@ -61,11 +83,16 @@ async def create_tenant( L.error("Tenant with this ID already exists.", struct_data={"tenant": tenant_id}) raise asab.exceptions.Conflict(value=tenant_id) + self.App.PubSub.publish("Tenant.created!", tenant_id=tenant_id) + return tenant_id async def update_tenant(self, tenant_id: str, **kwargs): result = await self.TenantsProvider.update(tenant_id, **kwargs) + + self.App.PubSub.publish("Tenant.updated!", tenant_id=tenant_id) + return {"result": result} @@ -92,6 +119,8 @@ async def delete_tenant(self, tenant_id: str): # Delete tenant from provider await self.TenantsProvider.delete(tenant_id) + self.App.PubSub.publish("Tenant.deleted!", tenant_id=tenant_id) + # Delete sessions that have the tenant in scope await session_service.delete_sessions_by_tenant_in_scope(tenant_id) @@ -225,6 +254,7 @@ async def assign_tenant( "cid": credentials_id, "tenant": tenant, }) + self.App.PubSub.publish("Tenant.assigned!", credentials_id=credentials_id, tenant_id=tenant) async def unassign_tenant(self, credentials_id: str, tenant: str): @@ -243,6 +273,7 @@ async def unassign_tenant(self, credentials_id: str, tenant: str): ) await self.TenantsProvider.unassign_tenant(credentials_id, tenant) + self.App.PubSub.publish("Tenant.unassigned!", credentials_id=credentials_id, tenant_id=tenant) def is_enabled(self):