Skip to content

Commit

Permalink
alerts by host and more modular alerts.py (#24)
Browse files Browse the repository at this point in the history
  • Loading branch information
taylorwalton authored Jul 14, 2023
1 parent 02475d0 commit 8ea0e24
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 49 deletions.
38 changes: 37 additions & 1 deletion backend/app/routes/alerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,41 @@ def get_alerts() -> jsonify:
containing all its associated data.
"""
service = AlertsService()
alerts = service.collect_alerts()
alerts = service.collect_alerts(size=1000) # replace `collect_all_alerts` with `collect_alerts(size=1000)`
return jsonify(alerts)


@bp.route("/alerts/top_10", methods=["GET"])
def get_top_10_alerts() -> jsonify:
"""
Retrieves top 10 alerts from the AlertsService.
This endpoint retrieves top 10 alerts from the AlertsService. It does this by creating an instance of
the AlertsService class and calling its `collect_alerts` method. The result is a list of top 10 alerts currently
available.
Returns:
jsonify: A JSON response containing a list of alerts. Each item in the list is a dictionary representing an alert,
containing all its associated data.
"""
service = AlertsService()
alerts = service.collect_alerts(size=10) # replace `collect_top_10_alerts` with `collect_alerts(size=10)`
return jsonify(alerts)


@bp.route("/alerts/hosts", methods=["GET"])
def get_hosts() -> jsonify:
"""
Retrieves all hosts from the AlertsService that have an alert.
This endpoint retrieves all available hosts from the AlertsService. It does this by creating an instance of
the AlertsService class and calling its `collect_alerts_by_host` method. The result is a list of all hosts currently
available.
Returns:
jsonify: A JSON response containing a list of hosts. Each item in the list is a dictionary representing a host,
containing all its associated data.
"""
service = AlertsService()
hosts = service.collect_alerts_by_host()
return jsonify(hosts)
111 changes: 68 additions & 43 deletions backend/app/services/WazuhIndexer/alerts.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from typing import Any
from typing import Dict

from elasticsearch7 import Elasticsearch
Expand All @@ -9,12 +10,6 @@
class AlertsService:
"""
A service class that encapsulates the logic for pulling alerts from the Wazuh-Indexer.
Attributes:
connector_url (str): The url to the Wazuh-Indexer.
connector_username (str): The username for the Wazuh-Indexer.
connector_password (str): The password for the Wazuh-Indexer.
es (Elasticsearch): The Elasticsearch client.
"""

SKIP_INDEX_NAMES: Dict[str, bool] = {
Expand All @@ -26,11 +21,12 @@ def __init__(self):
"""
Initializes the service by collecting Wazuh-Indexer details and creating an Elasticsearch client.
"""
self.universal_service = UniversalService()
(
self.connector_url,
self.connector_username,
self.connector_password,
) = UniversalService().collect_wazuhindexer_details("Wazuh-Indexer")
) = self.universal_service.collect_wazuhindexer_details("Wazuh-Indexer")
self.es = Elasticsearch(
[self.connector_url],
http_auth=(self.connector_username, self.connector_password),
Expand All @@ -43,75 +39,104 @@ def __init__(self):
def is_index_skipped(self, index_name: str) -> bool:
"""
Checks whether the given index name should be skipped.
Args:
index_name (str): The name of the index.
Returns:
bool: True if the index should be skipped, False otherwise.
"""
for skipped in self.SKIP_INDEX_NAMES:
if index_name.startswith(skipped):
return True
return False
return any(index_name.startswith(skipped) for skipped in self.SKIP_INDEX_NAMES)

def collect_alerts(self) -> Dict[str, object]:
def is_valid_index(self, index_name: str) -> bool:
"""
Checks if the index name starts with "wazuh_" and is not in the SKIP_INDEX_NAMES list.
"""
Collects the alerts from the Wazuh-Indexer where the index name starts with "wazuh_"
and is not in the SKIP_INDEX_NAMES list.
Returns the 10 previous alerts based on the `timestamp_utc` field.
return index_name.startswith("wazuh_") and not self.is_index_skipped(index_name)

Returns:
Dict[str, object]: A dictionary containing success status and alerts or an error message.
def _collect_indices_and_validate(self) -> Dict[str, Any]:
"""
if not all(
[self.connector_url, self.connector_username, self.connector_password],
):
return {
"message": "Failed to collect Wazuh-Indexer details",
"success": False,
}
Collect indices and validate connector details.
"""
if not all([self.connector_url, self.connector_username, self.connector_password]):
return self._error_response("Failed to collect Wazuh-Indexer details")

indices_list = UniversalService().collect_indices()
indices_list = self.universal_service.collect_indices()
if not indices_list["success"]:
return {"message": "Failed to collect indices", "success": False}
return self._error_response("Failed to collect indices")

alerts_summary = []
for index_name in indices_list["indices_list"]:
if not index_name.startswith("wazuh_") or self.is_index_skipped(index_name):
continue
valid_indices = [index for index in indices_list["indices_list"] if self.is_valid_index(index)]

return {"success": True, "indices": valid_indices}

def collect_alerts(self, size: int) -> Dict[str, object]:
"""
Collects alerts from the Wazuh-Indexer.
"""
indices_validation = self._collect_indices_and_validate()
if not indices_validation["success"]:
return indices_validation

alerts = self._collect_alerts(index_name)
alerts_summary = []
for index_name in indices_validation["indices"]:
alerts = self._collect_alerts(index_name, size=size)
if alerts["success"] and len(alerts["alerts"]) > 0:
alerts_summary.append(
{
"index_name": index_name,
"total_alerts": len(alerts["alerts"]),
"last_10_alerts": alerts["alerts"],
"alerts": alerts["alerts"],
},
)

return {
"message": "Successfully collected alerts",
"message": f"Successfully collected top {size} alerts",
"success": True,
"alerts_summary": alerts_summary,
}

def _collect_alerts(self, index_name: str) -> Dict[str, object]:
def collect_alerts_by_host(self) -> Dict[str, int]:
"""
Collects the number of alerts per host.
"""
indices_validation = self._collect_indices_and_validate()
if not indices_validation["success"]:
return indices_validation

alerts_by_host_dict = {}
for index_name in indices_validation["indices"]:
alerts = self._collect_alerts(index_name=index_name, size=1000)
if alerts["success"]:
for alert in alerts["alerts"]:
host = alert["_source"]["agent_name"]
alerts_by_host_dict[host] = alerts_by_host_dict.get(host, 0) + 1

alerts_by_host_list = [{"hostname": host, "number_of_alerts": count} for host, count in alerts_by_host_dict.items()]

return {
"message": "Successfully collected alerts by host",
"success": True,
"alerts_by_host": alerts_by_host_list,
}

@staticmethod
def _error_response(message: str) -> Dict[str, bool]:
"""
Standardizes the error response format.
"""
return {"message": message, "success": False}

def _collect_alerts(self, index_name: str, size: int = None) -> Dict[str, object]:
"""
Elasticsearch query to get the 10 most recent alerts where the `rule_level` is 12 or higher or the
Elasticsearch query to get the most recent alerts where the `rule_level` is 12 or higher or the
`syslog_level` field is `ALERT` and return the results in descending order by the `timestamp_utc` field.
The number of alerts to return can be limited by the `size` parameter.
Args:
index_name (str): The name of the index to query.
size (int, optional): The maximum number of alerts to return. If None, all alerts are returned.
Returns:
Dict[str, object]: A dictionary containing success status and alerts or an error message.
"""
logger.info(f"Collecting alerts from {index_name}")
query = self._build_query()
try:
alerts = self.es.search(index=index_name, body=query, size=10)
alerts = self.es.search(index=index_name, body=query, size=size)
alerts_list = [alert for alert in alerts["hits"]["hits"]]
return {
"message": "Successfully collected alerts",
Expand All @@ -125,7 +150,7 @@ def _collect_alerts(self, index_name: str) -> Dict[str, object]:
@staticmethod
def _build_query() -> Dict[str, object]:
"""
Builds the Elasticsearch query to get the 10 most recent alerts where the `rule_level` is 12 or higher or
Builds the Elasticsearch query to get the most recent alerts where the `rule_level` is 12 or higher or
the `syslog_level` field is `ALERT`.
Returns:
Expand Down
14 changes: 9 additions & 5 deletions backend/app/services/smtp/create_report.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
# from app.services.WazuhIndexer.alerts import AlertsService
# from loguru import logger
from loguru import logger
from reportlab.lib.pagesizes import letter
from reportlab.pdfgen import canvas

from app.services.WazuhIndexer.alerts import AlertsService


def create_pdf():
# service = AlertsService()
# alerts = service.collect_alerts()
# logger.info(alerts)
service = AlertsService()
alerts = service.collect_alerts()
alerts_by_host_percentage = service.get_alerts_by_host_percentage(alerts["alerts_summary"])
logger.info(alerts_by_host_percentage)
alerts_by_host_per_index = service.get_alerts_by_host_percentage_by_index_name(alerts["alerts_summary"])
logger.info(alerts_by_host_per_index)
c = canvas.Canvas("report.pdf", pagesize=letter)
width, height = letter
c.setFont("Helvetica", 24)
Expand Down
Binary file modified backend/report.pdf
Binary file not shown.

0 comments on commit 8ea0e24

Please sign in to comment.