From 8ea0e2447696b18b9968d63b3520fcb70ccca0f4 Mon Sep 17 00:00:00 2001 From: taylor_socfortress <111797488+taylorwalton@users.noreply.github.com> Date: Fri, 14 Jul 2023 12:59:41 -0600 Subject: [PATCH] alerts by host and more modular alerts.py (#24) --- backend/app/routes/alerts.py | 38 ++++++- backend/app/services/WazuhIndexer/alerts.py | 111 ++++++++++++-------- backend/app/services/smtp/create_report.py | 14 ++- backend/report.pdf | Bin 1408 -> 1408 bytes 4 files changed, 114 insertions(+), 49 deletions(-) diff --git a/backend/app/routes/alerts.py b/backend/app/routes/alerts.py index 3626c12e..368f10ec 100644 --- a/backend/app/routes/alerts.py +++ b/backend/app/routes/alerts.py @@ -20,5 +20,41 @@ def get_alerts() -> jsonify: containing all its associated data. """ service = AlertsService() - alerts = service.collect_alerts() + alerts = service.collect_alerts(size=1000) # replace `collect_all_alerts` with `collect_alerts(size=1000)` return jsonify(alerts) + + +@bp.route("/alerts/top_10", methods=["GET"]) +def get_top_10_alerts() -> jsonify: + """ + Retrieves top 10 alerts from the AlertsService. + + This endpoint retrieves top 10 alerts from the AlertsService. It does this by creating an instance of + the AlertsService class and calling its `collect_alerts` method. The result is a list of top 10 alerts currently + available. + + Returns: + jsonify: A JSON response containing a list of alerts. Each item in the list is a dictionary representing an alert, + containing all its associated data. + """ + service = AlertsService() + alerts = service.collect_alerts(size=10) # replace `collect_top_10_alerts` with `collect_alerts(size=10)` + return jsonify(alerts) + + +@bp.route("/alerts/hosts", methods=["GET"]) +def get_hosts() -> jsonify: + """ + Retrieves all hosts from the AlertsService that have an alert. + + This endpoint retrieves all available hosts from the AlertsService. It does this by creating an instance of + the AlertsService class and calling its `collect_alerts_by_host` method. The result is a list of all hosts currently + available. + + Returns: + jsonify: A JSON response containing a list of hosts. Each item in the list is a dictionary representing a host, + containing all its associated data. + """ + service = AlertsService() + hosts = service.collect_alerts_by_host() + return jsonify(hosts) diff --git a/backend/app/services/WazuhIndexer/alerts.py b/backend/app/services/WazuhIndexer/alerts.py index 0a1e86cc..b5224b13 100644 --- a/backend/app/services/WazuhIndexer/alerts.py +++ b/backend/app/services/WazuhIndexer/alerts.py @@ -1,3 +1,4 @@ +from typing import Any from typing import Dict from elasticsearch7 import Elasticsearch @@ -9,12 +10,6 @@ class AlertsService: """ A service class that encapsulates the logic for pulling alerts from the Wazuh-Indexer. - - Attributes: - connector_url (str): The url to the Wazuh-Indexer. - connector_username (str): The username for the Wazuh-Indexer. - connector_password (str): The password for the Wazuh-Indexer. - es (Elasticsearch): The Elasticsearch client. """ SKIP_INDEX_NAMES: Dict[str, bool] = { @@ -26,11 +21,12 @@ def __init__(self): """ Initializes the service by collecting Wazuh-Indexer details and creating an Elasticsearch client. """ + self.universal_service = UniversalService() ( self.connector_url, self.connector_username, self.connector_password, - ) = UniversalService().collect_wazuhindexer_details("Wazuh-Indexer") + ) = self.universal_service.collect_wazuhindexer_details("Wazuh-Indexer") self.es = Elasticsearch( [self.connector_url], http_auth=(self.connector_username, self.connector_password), @@ -43,67 +39,96 @@ def __init__(self): def is_index_skipped(self, index_name: str) -> bool: """ Checks whether the given index name should be skipped. - - Args: - index_name (str): The name of the index. - - Returns: - bool: True if the index should be skipped, False otherwise. """ - for skipped in self.SKIP_INDEX_NAMES: - if index_name.startswith(skipped): - return True - return False + return any(index_name.startswith(skipped) for skipped in self.SKIP_INDEX_NAMES) - def collect_alerts(self) -> Dict[str, object]: + def is_valid_index(self, index_name: str) -> bool: + """ + Checks if the index name starts with "wazuh_" and is not in the SKIP_INDEX_NAMES list. """ - Collects the alerts from the Wazuh-Indexer where the index name starts with "wazuh_" - and is not in the SKIP_INDEX_NAMES list. - Returns the 10 previous alerts based on the `timestamp_utc` field. + return index_name.startswith("wazuh_") and not self.is_index_skipped(index_name) - Returns: - Dict[str, object]: A dictionary containing success status and alerts or an error message. + def _collect_indices_and_validate(self) -> Dict[str, Any]: """ - if not all( - [self.connector_url, self.connector_username, self.connector_password], - ): - return { - "message": "Failed to collect Wazuh-Indexer details", - "success": False, - } + Collect indices and validate connector details. + """ + if not all([self.connector_url, self.connector_username, self.connector_password]): + return self._error_response("Failed to collect Wazuh-Indexer details") - indices_list = UniversalService().collect_indices() + indices_list = self.universal_service.collect_indices() if not indices_list["success"]: - return {"message": "Failed to collect indices", "success": False} + return self._error_response("Failed to collect indices") - alerts_summary = [] - for index_name in indices_list["indices_list"]: - if not index_name.startswith("wazuh_") or self.is_index_skipped(index_name): - continue + valid_indices = [index for index in indices_list["indices_list"] if self.is_valid_index(index)] + + return {"success": True, "indices": valid_indices} + + def collect_alerts(self, size: int) -> Dict[str, object]: + """ + Collects alerts from the Wazuh-Indexer. + """ + indices_validation = self._collect_indices_and_validate() + if not indices_validation["success"]: + return indices_validation - alerts = self._collect_alerts(index_name) + alerts_summary = [] + for index_name in indices_validation["indices"]: + alerts = self._collect_alerts(index_name, size=size) if alerts["success"] and len(alerts["alerts"]) > 0: alerts_summary.append( { "index_name": index_name, "total_alerts": len(alerts["alerts"]), - "last_10_alerts": alerts["alerts"], + "alerts": alerts["alerts"], }, ) return { - "message": "Successfully collected alerts", + "message": f"Successfully collected top {size} alerts", "success": True, "alerts_summary": alerts_summary, } - def _collect_alerts(self, index_name: str) -> Dict[str, object]: + def collect_alerts_by_host(self) -> Dict[str, int]: + """ + Collects the number of alerts per host. + """ + indices_validation = self._collect_indices_and_validate() + if not indices_validation["success"]: + return indices_validation + + alerts_by_host_dict = {} + for index_name in indices_validation["indices"]: + alerts = self._collect_alerts(index_name=index_name, size=1000) + if alerts["success"]: + for alert in alerts["alerts"]: + host = alert["_source"]["agent_name"] + alerts_by_host_dict[host] = alerts_by_host_dict.get(host, 0) + 1 + + alerts_by_host_list = [{"hostname": host, "number_of_alerts": count} for host, count in alerts_by_host_dict.items()] + + return { + "message": "Successfully collected alerts by host", + "success": True, + "alerts_by_host": alerts_by_host_list, + } + + @staticmethod + def _error_response(message: str) -> Dict[str, bool]: + """ + Standardizes the error response format. + """ + return {"message": message, "success": False} + + def _collect_alerts(self, index_name: str, size: int = None) -> Dict[str, object]: """ - Elasticsearch query to get the 10 most recent alerts where the `rule_level` is 12 or higher or the + Elasticsearch query to get the most recent alerts where the `rule_level` is 12 or higher or the `syslog_level` field is `ALERT` and return the results in descending order by the `timestamp_utc` field. + The number of alerts to return can be limited by the `size` parameter. Args: index_name (str): The name of the index to query. + size (int, optional): The maximum number of alerts to return. If None, all alerts are returned. Returns: Dict[str, object]: A dictionary containing success status and alerts or an error message. @@ -111,7 +136,7 @@ def _collect_alerts(self, index_name: str) -> Dict[str, object]: logger.info(f"Collecting alerts from {index_name}") query = self._build_query() try: - alerts = self.es.search(index=index_name, body=query, size=10) + alerts = self.es.search(index=index_name, body=query, size=size) alerts_list = [alert for alert in alerts["hits"]["hits"]] return { "message": "Successfully collected alerts", @@ -125,7 +150,7 @@ def _collect_alerts(self, index_name: str) -> Dict[str, object]: @staticmethod def _build_query() -> Dict[str, object]: """ - Builds the Elasticsearch query to get the 10 most recent alerts where the `rule_level` is 12 or higher or + Builds the Elasticsearch query to get the most recent alerts where the `rule_level` is 12 or higher or the `syslog_level` field is `ALERT`. Returns: diff --git a/backend/app/services/smtp/create_report.py b/backend/app/services/smtp/create_report.py index 1b509f61..02e5a824 100644 --- a/backend/app/services/smtp/create_report.py +++ b/backend/app/services/smtp/create_report.py @@ -1,13 +1,17 @@ -# from app.services.WazuhIndexer.alerts import AlertsService -# from loguru import logger +from loguru import logger from reportlab.lib.pagesizes import letter from reportlab.pdfgen import canvas +from app.services.WazuhIndexer.alerts import AlertsService + def create_pdf(): - # service = AlertsService() - # alerts = service.collect_alerts() - # logger.info(alerts) + service = AlertsService() + alerts = service.collect_alerts() + alerts_by_host_percentage = service.get_alerts_by_host_percentage(alerts["alerts_summary"]) + logger.info(alerts_by_host_percentage) + alerts_by_host_per_index = service.get_alerts_by_host_percentage_by_index_name(alerts["alerts_summary"]) + logger.info(alerts_by_host_per_index) c = canvas.Canvas("report.pdf", pagesize=letter) width, height = letter c.setFont("Helvetica", 24) diff --git a/backend/report.pdf b/backend/report.pdf index 51d58db1790ce97a67aef46ba2ef491ba19d820c..fe743e452b9d64ed9d8833b035b83d13f4335198 100644 GIT binary patch delta 99 zcmZqRZs6V^$i!-FXkctKS&S(G!rENTWY6SenPiw`XkrQ^jf_%^6D?CMO;Zxn%q>$b TOf8H}ERyVO2&tI-houDo-9s9? delta 99 zcmZqRZs6V^$i!-BYGP_IS&S(G!rENTWY6SekY-?!YHDs`WN4CXV3eF>W@c$=o?>ol Ukz$gRVr*z=XG2KE