Skip to content

Commit

Permalink
Alerts by rule description (#26)
Browse files Browse the repository at this point in the history
* alert routes

* alerts by rule description pie chart pdf
  • Loading branch information
taylorwalton authored Jul 14, 2023
1 parent ecb6193 commit c2154ae
Show file tree
Hide file tree
Showing 11 changed files with 501 additions and 82 deletions.
Binary file modified backend/alerts_by_host.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added backend/alerts_by_host_report.pdf
Binary file not shown.
Binary file added backend/alerts_by_rule.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added backend/alerts_by_rule_report.pdf
Binary file not shown.
56 changes: 55 additions & 1 deletion backend/app/routes/alerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def get_alerts() -> jsonify:
@bp.route("/alerts/top_10", methods=["GET"])
def get_top_10_alerts() -> jsonify:
"""
Retrieves top 10 alerts from the AlertsService.
Retrieves top 10 alerts from the AlertsService per index.
This endpoint retrieves top 10 alerts from the AlertsService. It does this by creating an instance of
the AlertsService class and calling its `collect_alerts` method. The result is a list of top 10 alerts currently
Expand All @@ -42,6 +42,24 @@ def get_top_10_alerts() -> jsonify:
return jsonify(alerts)


@bp.route("/alerts/index/<index_name>", methods=["GET"])
def get_alerts_by_index(index_name: str) -> jsonify:
"""
Retrieves all alerts from the AlertsService by index name.
This endpoint retrieves all available alerts from the AlertsService. It does this by creating an instance of
the AlertsService class and calling its `collect_alerts_by_index` method. The result is a list of all alerts currently
available.
Returns:
jsonify: A JSON response containing a list of alerts. Each item in the list is a dictionary representing an alert,
containing all its associated data.
"""
service = AlertsService()
alerts = service.collect_alerts_by_index(index_name=index_name, size=1000)
return jsonify(alerts)


@bp.route("/alerts/hosts", methods=["GET"])
def get_hosts() -> jsonify:
"""
Expand All @@ -58,3 +76,39 @@ def get_hosts() -> jsonify:
service = AlertsService()
hosts = service.collect_alerts_by_host()
return jsonify(hosts)


@bp.route("/alerts/rules", methods=["GET"])
def get_rules() -> jsonify:
"""
Retrieves all rules from the AlertsService that have an alert.
This endpoint retrieves all available rules from the AlertsService. It does this by creating an instance of
the AlertsService class and calling its `collect_alerts_by_rule` method. The result is a list of all rules currently
available.
Returns:
jsonify: A JSON response containing a list of rules. Each item in the list is a dictionary representing a rule,
containing all its associated data.
"""
service = AlertsService()
rules = service.collect_alerts_by_rule()
return jsonify(rules)


@bp.route("/alerts/rules/host", methods=["GET"])
def get_rules_by_host() -> jsonify:
"""
Retrieves all rules from the AlertsService that have an alert and organizes by host.
This endpoint retrieves all available rules from the AlertsService. It does this by creating an instance of
the AlertsService class and calling its `collect_alerts_by_rule_per_host` method. The result is a list of all rules currently
available.
Returns:
jsonify: A JSON response containing a list of rules. Each item in the list is a dictionary representing a rule,
containing all its associated data.
"""
service = AlertsService()
rules = service.collect_alerts_by_rule_per_host()
return jsonify(rules)
9 changes: 6 additions & 3 deletions backend/app/routes/smtp.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from flask import request
from loguru import logger

from app.services.smtp.send_report import send_email_with_pdf
from app.services.smtp.send_report import EmailReportSender
from app.services.smtp.universal import UniversalEmailCredentials

bp = Blueprint("smtp", __name__)
Expand Down Expand Up @@ -58,6 +58,9 @@ def send_report() -> jsonify:
if not request.is_json:
return jsonify({"message": "Missing JSON in request", "success": False}), 400

# template_name = request.json.get("template_name", None)
send_report = send_email_with_pdf()
to_email = request.json.get("to_email", None)
if not to_email:
return jsonify({"message": "Missing 'to_email' in request", "success": False}), 400

send_report = EmailReportSender(to_email).send_email_with_pdf()
return jsonify(send_report), 201
71 changes: 71 additions & 0 deletions backend/app/services/WazuhIndexer/alerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,24 @@ def collect_alerts(self, size: int) -> Dict[str, object]:
"alerts_summary": alerts_summary,
}

def collect_alerts_by_index(self, index_name: str, size: int) -> Dict[str, Any]:
"""
Collects alerts from the given index.
"""
if not self.is_valid_index(index_name):
return self._error_response("Invalid index name")

alerts = self._collect_alerts(index_name=index_name, size=size)
if not alerts["success"]:
return alerts

return {
"message": f"Successfully collected top {size} alerts from {index_name}",
"success": True,
"alerts": alerts["alerts"],
"total_alerts": len(alerts["alerts"]),
}

def collect_alerts_by_host(self) -> Dict[str, int]:
"""
Collects the number of alerts per host.
Expand All @@ -113,6 +131,59 @@ def collect_alerts_by_host(self) -> Dict[str, int]:
"alerts_by_host": alerts_by_host_list,
}

def collect_alerts_by_rule(self) -> Dict[str, int]:
"""
Collects the number of alerts per rule.
"""
indices_validation = self._collect_indices_and_validate()
if not indices_validation["success"]:
return indices_validation

alerts_by_rule_dict = {}
for index_name in indices_validation["indices"]:
alerts = self._collect_alerts(index_name=index_name, size=1000)
if alerts["success"]:
for alert in alerts["alerts"]:
rule = alert["_source"]["rule_description"]
alerts_by_rule_dict[rule] = alerts_by_rule_dict.get(rule, 0) + 1

alerts_by_rule_list = [{"rule": rule, "number_of_alerts": count} for rule, count in alerts_by_rule_dict.items()]

return {
"message": "Successfully collected alerts by rule",
"success": True,
"alerts_by_rule": alerts_by_rule_list,
}

def collect_alerts_by_rule_per_host(self) -> Dict[str, int]:
"""
Collects the number of alerts per rule per host.
"""
indices_validation = self._collect_indices_and_validate()
if not indices_validation["success"]:
return indices_validation

alerts_by_rule_per_host_dict = {}
for index_name in indices_validation["indices"]:
alerts = self._collect_alerts(index_name=index_name, size=1000)
if alerts["success"]:
for alert in alerts["alerts"]:
rule = alert["_source"]["rule_description"]
host = alert["_source"]["agent_name"]
alerts_by_rule_per_host_dict[rule] = alerts_by_rule_per_host_dict.get(rule, {})
alerts_by_rule_per_host_dict[rule][host] = alerts_by_rule_per_host_dict[rule].get(host, 0) + 1

alerts_by_rule_per_host_list = []
for rule, hosts in alerts_by_rule_per_host_dict.items():
for host, count in hosts.items():
alerts_by_rule_per_host_list.append({"rule": rule, "hostname": host, "number_of_alerts": count})

return {
"message": "Successfully collected alerts by rule per host",
"success": True,
"alerts_by_rule_per_host": alerts_by_rule_per_host_list,
}

@staticmethod
def _error_response(message: str) -> Dict[str, bool]:
"""
Expand Down
78 changes: 39 additions & 39 deletions backend/app/services/smtp/create_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,64 +5,64 @@

matplotlib.use(
"Agg",
) # set the backend to Agg which is a non-interactive backend suitable for
# scripts and web servers. This should resolve the main thread is not in main
# loop issue as it bypasses the need for tkinter.
) # set the backend to Agg which is a non-interactive backend suitable
# for scripts and web servers. This should resolve the main thread is not
# in main loop issue as it bypasses the need for tkinter.
import matplotlib.pyplot as plt

from app.services.WazuhIndexer.alerts import AlertsService


def create_bar_chart(alerts_by_host: dict) -> None:
"""
Creates a horizontal bar chart with hostnames on the y-axis and the number of alerts on the x-axis.
def fetch_alert_data(service, fetch_func):
alerts = fetch_func()
logger.info(alerts)
return alerts

Args:
alerts_by_host (dict): A dictionary containing hostnames and the corresponding number of alerts.

Returns:
None
"""
hostnames = [alert["hostname"] for alert in alerts_by_host["alerts_by_host"]]
num_alerts = [alert["number_of_alerts"] for alert in alerts_by_host["alerts_by_host"]]
def create_bar_chart(alerts: dict, title: str, output_filename: str) -> None:
entities = [alert["hostname"] for alert in alerts["alerts_by_host"]]
num_alerts = [alert["number_of_alerts"] for alert in alerts["alerts_by_host"]]

plt.figure(figsize=(10, 6)) # Set the figure size
plt.barh(hostnames, num_alerts, color="blue") # Create a horizontal bar chart
plt.xlabel("Number of Alerts") # Label x-axis
plt.ylabel("Hostnames") # Label y-axis
plt.title("Number of Alerts by Host") # Title of the chart
plt.figure(figsize=(10, 6))
plt.barh(entities, num_alerts, color="blue")
plt.xlabel("Number of Alerts")
plt.ylabel("Hostnames")
plt.title(title)
plt.tight_layout()
plt.savefig("alerts_by_host.png") # Save the figure as a .png file
plt.savefig(output_filename)


def create_pdf() -> None:
"""
Creates a PDF file with a title and an image of the bar chart.
def create_pie_chart(alerts: dict, title: str, output_filename: str) -> None:
entities = [alert["rule"] for alert in alerts["alerts_by_rule"]]
num_alerts = [alert["number_of_alerts"] for alert in alerts["alerts_by_rule"]]

Returns:
None
"""
c = canvas.Canvas("report.pdf", pagesize=letter)
plt.figure(figsize=(10, 6))
plt.pie(num_alerts, labels=entities, autopct="%1.1f%%")
plt.title(title)
plt.tight_layout()
plt.savefig(output_filename)


def create_pdf(title: str, image_filename: str, pdf_filename: str) -> None:
c = canvas.Canvas(pdf_filename, pagesize=letter)
width, height = letter
c.setFont("Helvetica", 24)
c.drawString(30, height - 50, "Test")
c.drawImage("alerts_by_host.png", 50, height / 2, width=400, height=300) # Draw the image in the canvas
c.drawString(30, height - 50, title)
c.drawImage(image_filename, 50, height / 2, width=400, height=300)
c.save()


def create_alerts_by_host_pdf() -> None:
"""
Creates a PDF report of alerts per host.
service = AlertsService()
alerts_by_host = fetch_alert_data(service, service.collect_alerts_by_host)

create_bar_chart(alerts_by_host, "Number of Alerts by Host", "alerts_by_host.png")
create_pdf("Test", "alerts_by_host.png", "alerts_by_host_report.pdf")

This function fetches the alerts per host from the AlertsService, generates a bar chart image using the fetched data,
and finally creates a PDF file with the generated image.

Returns:
None
"""
def create_alerts_by_rules_pdf() -> None:
service = AlertsService()
alerts_by_host = service.collect_alerts_by_host()
logger.info(alerts_by_host)
alerts_by_rules = fetch_alert_data(service, service.collect_alerts_by_rule)

create_bar_chart(alerts_by_host)
create_pdf()
create_pie_chart(alerts_by_rules, "Number of Alerts by Rule", "alerts_by_rule.png")
create_pdf("Test", "alerts_by_rule.png", "alerts_by_rule_report.pdf")
87 changes: 51 additions & 36 deletions backend/app/services/smtp/send_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,54 +3,69 @@
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import List

from app.services.smtp.create_report import create_alerts_by_host_pdf
from app.services.smtp.create_report import create_alerts_by_rules_pdf
from app.services.smtp.universal import EmailTemplate
from app.services.smtp.universal import UniversalEmailCredentials


def create_email_message(subject: str, from_email: str, to_email: str, body: str) -> MIMEMultipart:
msg = MIMEMultipart()
msg["From"] = from_email
msg["To"] = to_email
msg["Subject"] = subject
msg.attach(MIMEText(body, "html"))
return msg
class EmailReportSender:
def __init__(self, to_email: str):
self.to_email = to_email

def _get_credentials(self) -> dict:
try:
return UniversalEmailCredentials.read_all()["emails_configured"][0]
except IndexError:
return {"error": "No email credentials found"}

def attach_pdf(msg: MIMEMultipart, filename: str) -> MIMEMultipart:
with open(filename, "rb") as attachment_file:
part = MIMEBase("application", "octet-stream")
part.set_payload(attachment_file.read())
encoders.encode_base64(part)
part.add_header("Content-Disposition", f"attachment; filename= {filename}")
msg.attach(part)
return msg
def create_email_message(self, subject: str, body: str) -> MIMEMultipart:
msg = MIMEMultipart()
credentials = self._get_credentials()
if "error" in credentials:
return credentials
msg["From"] = credentials["email"]
msg["To"] = self.to_email
msg["Subject"] = subject
msg.attach(MIMEText(body, "html"))
return msg

def attach_pdfs(self, msg: MIMEMultipart, filenames: List[str]) -> MIMEMultipart:
for filename in filenames:
with open(filename, "rb") as attachment_file:
part = MIMEBase("application", "octet-stream")
part.set_payload(attachment_file.read())
encoders.encode_base64(part)
part.add_header("Content-Disposition", f"attachment; filename= {filename}")
msg.attach(part)
return msg

def send_email_with_pdf():
# Generate the PDF report
create_alerts_by_host_pdf()
def send_email_with_pdf(self):
# Generate the PDF reports
create_alerts_by_host_pdf()
create_alerts_by_rules_pdf()

# Get email credentials
try:
credentials = UniversalEmailCredentials.read_all()["emails_configured"][0]
except IndexError:
raise Exception("No email credentials found in the database.")
# Render the email body
template = EmailTemplate("email_template")
body = template.render_html_body(template_name="email_template")

# Render the email body
template = EmailTemplate("email_template")
body = template.render_html_body(template_name="email_template")
# Create the email message and attach the PDFs
msg = self.create_email_message("Test Report", body)
if isinstance(msg, dict) and "error" in msg:
return {"message": msg["error"], "success": False}
msg = self.attach_pdfs(msg, ["alerts_by_host_report.pdf", "alerts_by_rule_report.pdf"])

# Create the email message and attach the PDF
msg = create_email_message("Test Report", credentials["email"], "walton.taylor23@gmail.com", body)
msg = attach_pdf(msg, "report.pdf")
credentials = self._get_credentials()
if "error" in credentials:
return {"message": credentials["error"], "success": False}

# Send the email
with smtplib.SMTP(credentials["smtp_server"], credentials["smtp_port"]) as server:
server.starttls()
server.login(credentials["email"], credentials["password"])
text = msg.as_string()
server.sendmail(credentials["email"], "walton.taylor23@gmail.com", text)
# Send the email
with smtplib.SMTP(credentials["smtp_server"], credentials["smtp_port"]) as server:
server.starttls()
server.login(credentials["email"], credentials["password"])
text = msg.as_string()
server.sendmail(credentials["email"], self.to_email, text)

return {"message": "Report sent successfully", "success": True}
return {"message": "Report sent successfully", "success": True}
Loading

0 comments on commit c2154ae

Please sign in to comment.