Skip to content

fix: removing outdated code in Kibana client auth #4495

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Mar 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 13 additions & 54 deletions detection_rules/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,7 @@
import click
import requests


# this is primarily for type hinting - all use of the github client should come from GithubClient class
try:
from github import Github
from github.Repository import Repository
from github.GitRelease import GitRelease
from github.GitReleaseAsset import GitReleaseAsset
except ImportError:
# for type hinting
Github = None # noqa: N806
Repository = None # noqa: N806
GitRelease = None # noqa: N806
GitReleaseAsset = None # noqa: N806
from kibana import Kibana

from .utils import add_params, cached, get_path, load_etc_dump

Expand Down Expand Up @@ -348,57 +336,28 @@ def get_elasticsearch_client(cloud_id: str = None, elasticsearch_url: str = None
client_error(error_msg, e, ctx=ctx, err=True)


def get_kibana_client(cloud_id: str, kibana_url: str, kibana_user: str, kibana_password: str, kibana_cookie: str,
space: str, ignore_ssl_errors: bool, provider_type: str, provider_name: str, api_key: str,
**kwargs):
def get_kibana_client(
*,
api_key: str,
cloud_id: str | None = None,
kibana_url: str | None = None,
space: str | None = None,
ignore_ssl_errors: bool = False,
**kwargs
):
"""Get an authenticated Kibana client."""
from requests import HTTPError
from kibana import Kibana

if not (cloud_id or kibana_url):
client_error("Missing required --cloud-id or --kibana-url")

if not (kibana_cookie or api_key):
# don't prompt for these until there's a cloud id or Kibana URL
kibana_user = kibana_user or click.prompt("kibana_user")
kibana_password = kibana_password or click.prompt("kibana_password", hide_input=True)

verify = not ignore_ssl_errors

with Kibana(cloud_id=cloud_id, kibana_url=kibana_url, space=space, verify=verify, **kwargs) as kibana:
if kibana_cookie:
kibana.add_cookie(kibana_cookie)
return kibana
elif api_key:
kibana.add_api_key(api_key)
return kibana

try:
kibana.login(kibana_user, kibana_password, provider_type=provider_type, provider_name=provider_name)
except HTTPError as exc:
if exc.response.status_code == 401:
err_msg = f'Authentication failed for {kibana_url}. If credentials are valid, check --provider-name'
client_error(err_msg, exc, err=True)
else:
raise

return kibana
return Kibana(cloud_id=cloud_id, kibana_url=kibana_url, space=space, verify=verify, api_key=api_key, **kwargs)


client_options = {
'kibana': {
'cloud_id': click.Option(['--cloud-id'], default=getdefault('cloud_id'),
help="ID of the cloud instance."),
'api_key': click.Option(['--api-key'], default=getdefault('api_key')),
'kibana_cookie': click.Option(['--kibana-cookie', '-kc'], default=getdefault('kibana_cookie'),
help='Cookie from an authed session'),
'kibana_password': click.Option(['--kibana-password', '-kp'], default=getdefault('kibana_password')),
'kibana_url': click.Option(['--kibana-url'], default=getdefault('kibana_url')),
'kibana_user': click.Option(['--kibana-user', '-ku'], default=getdefault('kibana_user')),
'provider_type': click.Option(['--provider-type'], default=getdefault('provider_type'),
help="Elastic Cloud providers: basic and saml (for SSO)"),
'provider_name': click.Option(['--provider-name'], default=getdefault('provider_name'),
help="Elastic Cloud providers: cloud-basic and cloud-saml (for SSO)"),
'cloud_id': click.Option(['--cloud-id'], default=getdefault('cloud_id'), help="ID of the cloud instance."),
'api_key': click.Option(['--api-key'], default=getdefault('api_key')),
'space': click.Option(['--space'], default=None, help='Kibana space'),
'ignore_ssl_errors': click.Option(['--ignore-ssl-errors'], default=getdefault('ignore_ssl_errors'))
},
Expand Down
33 changes: 19 additions & 14 deletions detection_rules/remote_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,7 @@ class RemoteConnector:

def __init__(self, parse_config: bool = False, **kwargs):
es_args = ['cloud_id', 'ignore_ssl_errors', 'elasticsearch_url', 'es_user', 'es_password', 'timeout']
kibana_args = [
'cloud_id', 'ignore_ssl_errors', 'kibana_url', 'kibana_user', 'kibana_password', 'space', 'kibana_cookie',
'provider_type', 'provider_name'
]
kibana_args = ['cloud_id', 'ignore_ssl_errors', 'kibana_url', 'api_key', 'space']

if parse_config:
es_kwargs = {arg: getdefault(arg)() for arg in es_args}
Expand Down Expand Up @@ -73,17 +70,25 @@ def auth_es(self, *, cloud_id: Optional[str] = None, ignore_ssl_errors: Optional
es_password=es_password, timeout=timeout, **kwargs)
return self.es_client

def auth_kibana(self, *, cloud_id: Optional[str] = None, ignore_ssl_errors: Optional[bool] = None,
kibana_url: Optional[str] = None, kibana_user: Optional[str] = None,
kibana_password: Optional[str] = None, space: Optional[str] = None,
kibana_cookie: Optional[str] = None, provider_type: Optional[str] = None,
provider_name: Optional[str] = None, **kwargs) -> Kibana:
def auth_kibana(
self,
*,
api_key: str,
cloud_id: str | None = None,
kibana_url: str | None = None,
space: str | None = None,
ignore_ssl_errors: bool = False,
**kwargs
) -> Kibana:
"""Return an authenticated Kibana client."""
self.kibana_client = get_kibana_client(cloud_id=cloud_id, ignore_ssl_errors=ignore_ssl_errors,
kibana_url=kibana_url, kibana_user=kibana_user,
kibana_password=kibana_password, space=space,
kibana_cookie=kibana_cookie, provider_type=provider_type,
provider_name=provider_name, **kwargs)
self.kibana_client = get_kibana_client(
cloud_id=cloud_id,
ignore_ssl_errors=ignore_ssl_errors,
kibana_url=kibana_url,
api_key=api_key,
space=space,
**kwargs
)
return self.kibana_client


Expand Down
97 changes: 25 additions & 72 deletions lib/kibana/kibana/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,30 @@
import uuid
from typing import List, Optional, Union

from urllib.parse import urljoin
import requests
from elasticsearch import Elasticsearch

_context = threading.local()


class Kibana(object):
class Kibana:
"""Wrapper around the Kibana SIEM APIs."""

CACHED = False

def __init__(self, cloud_id=None, kibana_url=None, verify=True, elasticsearch=None, space=None):
def __init__(self, cloud_id=None, kibana_url=None, api_key=None, verify=True, elasticsearch=None, space=None):
""""Open a session to the platform."""
self.authenticated = False

self.session = requests.Session()
self.session.verify = verify

if api_key:
self.session.headers.update(
{
"kbn-xsrf": "true",
"Authorization": f"ApiKey {api_key}",
}
)

self.verify = verify

self.cloud_id = cloud_id
Expand All @@ -37,9 +44,6 @@ def __init__(self, cloud_id=None, kibana_url=None, verify=True, elasticsearch=No
self.space = space if space and space.lower() != 'default' else None
self.status = None

self.provider_name = None
self.provider_type = None

if self.cloud_id:
self.cluster_name, cloud_info = self.cloud_id.split(":")
self.domain, self.es_uuid, self.kibana_uuid = \
Expand All @@ -50,18 +54,24 @@ def __init__(self, cloud_id=None, kibana_url=None, verify=True, elasticsearch=No

kibana_url_from_cloud = f"https://{self.kibana_uuid}.{self.domain}:9243"
if self.kibana_url and self.kibana_url != kibana_url_from_cloud:
raise ValueError(f'kibana_url provided ({self.kibana_url}) does not match url derived from cloud_id '
f'{kibana_url_from_cloud}')
raise ValueError(
f'kibana_url provided ({self.kibana_url}) does not match url derived from cloud_id '
f'{kibana_url_from_cloud}'
)
self.kibana_url = kibana_url_from_cloud

self.elastic_url = f"https://{self.es_uuid}.{self.domain}:9243"

self.provider_name = 'cloud-basic'
self.provider_type = 'basic'

self.session.headers.update({'Content-Type': "application/json", "kbn-xsrf": str(uuid.uuid4())})
self.elasticsearch = elasticsearch

if not self.elasticsearch and self.elastic_url:
self.elasticsearch = Elasticsearch(
hosts=[self.elastic_url],
api_key=api_key,
verify_certs=self.verify,
)
self.elasticsearch.info()

if not verify:
from requests.packages.urllib3.exceptions import \
InsecureRequestWarning
Expand All @@ -75,7 +85,7 @@ def version(self):
return self.status.get("version", {}).get("number")

@staticmethod
def ndjson_file_data_prep(lines: List[dict], filename: str) -> (dict, str):
def ndjson_file_data_prep(lines: List[dict], filename: str) -> tuple[dict, str]:
"""Prepare a request for an ndjson file upload to Kibana."""
data = ('\n'.join(json.dumps(r) for r in lines) + '\n')
boundary = '----JustAnotherBoundary'
Expand Down Expand Up @@ -144,63 +154,6 @@ def delete(self, uri, params=None, error=True, **kwargs):
"""Perform an HTTP DELETE."""
return self.request('DELETE', uri, params=params, error=error, **kwargs)

def login(self, kibana_username, kibana_password, provider_type=None, provider_name=None):
"""Authenticate to Kibana using the API to update our cookies."""
payload = {'username': kibana_username, 'password': kibana_password}
path = '/internal/security/login'

try:
self.post(path, data=payload, error=True, verbose=False)
except requests.HTTPError as e:
# 7.10 changed the structure of the auth data
# providers dictated by Kibana configs in:
# https://www.elastic.co/guide/en/kibana/current/security-settings-kb.html#authentication-security-settings
# more details: https://discuss.elastic.co/t/kibana-7-10-login-issues/255201/2
if e.response.status_code == 400 and '[undefined]' in e.response.text:
provider_type = provider_type or self.provider_type or 'basic'
provider_name = provider_name or self.provider_name or 'basic'

payload = {
'params': payload,
'currentURL': '',
'providerType': provider_type,
'providerName': provider_name
}
self.post(path, data=payload, error=True)
else:
raise

# Kibana will authenticate against URLs which contain invalid spaces
if self.space:
self.verify_space(self.space)

self.authenticated = True
self.status = self.get("/api/status")

# create ES and force authentication
if self.elasticsearch is None and self.elastic_url is not None:
self.elasticsearch = Elasticsearch(hosts=[self.elastic_url], http_auth=(kibana_username, kibana_password),
verify_certs=self.verify)
self.elasticsearch.info()

# make chaining easier
return self

def add_cookie(self, cookie):
"""Add cookie to be used for auth (such as from an SSO session)."""
# https://www.elastic.co/guide/en/kibana/7.10/security-settings-kb.html#security-session-and-cookie-settings
self.session.headers['sid'] = cookie
self.session.cookies.set('sid', cookie)
self.status = self.get('/api/status')
self.authenticated = True

def add_api_key(self, api_key: str) -> bool:
"""Add an API key to be used for auth."""
self.session.headers['Authorization'] = f'ApiKey {api_key}'
self.status = self.get('/api/status')
self.authenticated = True
return bool(self.status)

def logout(self):
"""Quit the current session."""
try:
Expand Down
2 changes: 1 addition & 1 deletion lib/kibana/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "detection-rules-kibana"
version = "0.4.1"
version = "0.4.2"
description = "Kibana API utilities for Elastic Detection Rules"
license = {text = "Elastic License v2"}
keywords = ["Elastic", "Kibana", "Detection Rules", "Security", "Elasticsearch"]
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "detection_rules"
version = "0.4.26"
version = "1.0.0"
description = "Detection Rules is the home for rules used by Elastic Security. This repository is used for the development, maintenance, testing, validation, and release of rules for Elastic Security’s Detection Engine."
readme = "README.md"
requires-python = ">=3.12"
Expand Down
Loading