From 54c75fcd3ff3ef43f8cafd6ec7baa4cf023d4c15 Mon Sep 17 00:00:00 2001 From: taylor_socfortress <111797488+taylorwalton@users.noreply.github.com> Date: Tue, 18 Jul 2023 14:46:05 -0600 Subject: [PATCH] create iris alert more modular and docstrings (#52) --- backend/app/services/DFIR_IRIS/alerts.py | 391 ++++++++++++++--------- 1 file changed, 245 insertions(+), 146 deletions(-) diff --git a/backend/app/services/DFIR_IRIS/alerts.py b/backend/app/services/DFIR_IRIS/alerts.py index c9f7769c..3f4aa862 100644 --- a/backend/app/services/DFIR_IRIS/alerts.py +++ b/backend/app/services/DFIR_IRIS/alerts.py @@ -1,12 +1,8 @@ -# Standard library imports from typing import Any from typing import Dict from typing import Set -# Local application imports from dfir_iris_client.alert import Alert - -# Third-party library imports from loguru import logger from app.models.agents import agent_metadata_schema @@ -21,6 +17,40 @@ class IRISAlertsService: """ A service class that encapsulates the logic for pulling alerts from DFIR-IRIS. + + Attributes + ---------- + universal_service : UniversalService + UniversalService object for interacting with "DFIR-IRIS" + iris_session : Optional[dict] + Session object for interacting with "DFIR-IRIS". None if session creation failed. + + Methods + ------- + _create_iris_session() -> Optional[dict]: + Create a session with the universal service. + list_alerts() -> Dict[str, Any]: + List all alerts from DFIR-IRIS. + create_alert_general(alert_data: Dict[str, Any], alert_id: str, index: str) -> Dict[str, Any]: + Create an alert with the provided data. + get_agent_data(agent_id: str) -> Dict[str, Any]: + Get agent data based on agent_id. + get_asset_type_id(os: str) -> int: + Use AssetTypeResolver to determine the asset type ID. + validate_ioc_type(ioc_value: str) -> str: + Validate IoC type using validators. + create_alert_with_payload(alert_payload: Dict[str, Any]) -> Dict[str, Any]: + Create an alert using given alert_payload. + field_exists_ioc(alert_data: Dict[str, Any]) -> Dict[str, Any]: + Check if an IoC field exists in the alert data. + valid_ioc_fields() -> Set[str]: + Get the set of valid IoC fields. + create_general_payload(alert_data: Dict[str, Any], agent_data: Dict[str, Any], alert_id: str, index: str) -> Dict[str, Any]: + Craft the general alert payload when it does not contain an IoC. + create_general_ioc_payload(alert_data: Dict[str, Any], agent_data: Dict[str, Any], alert_id: str, index: str) -> Dict[str, Any]: + Craft the general alert payload when it does contain an IoC. + create_base_payload(alert_data: Dict[str, Any], agent_data: Dict[str, Any], alert_id: str, index: str) -> Dict[str, Any]: + Craft the base alert payload. """ def __init__(self): @@ -74,72 +104,116 @@ def list_alerts(self) -> Dict[str, object]: def create_alert_general(self, alert_data: Dict[str, Any], alert_id: str, index: str) -> Dict[str, Any]: """ - Create an alert with the provided data. - """ - # Get agent data - agent_id = alert_data["agent_id"] - service = AgentService() - agent = service.get_agent(agent_id) - agent_data = agent_metadata_schema.dump(agent) + Create an alert within DFIR-IRIS with the provided data. - # Use AssetTypeResolver to determine the asset type ID - asset_resolver = AssetTypeResolver(agent_data["os"]) - agent_asset_type = asset_resolver.get_asset_type_id() + Parameters + ---------- + alert_data : Dict[str, Any] + The alert data used to create the alert. + alert_id : str + The ID of the alert. + index : str + The index. - # Append the asset type ID to the alert data - alert_data["asset_type_id"] = agent_asset_type + Returns + ------- + Dict[str, Any] + The result of the alert creation. Contains information on whether the alert creation was successful, + an associated message, and the resulting data. + """ + agent_data = self.get_agent_data(alert_data["agent_id"]) + alert_data["asset_type_id"] = self.get_asset_type_id(agent_data["os"]) - # Check if IoC field exists ioc_field_present = self.field_exists_ioc(alert_data) if ioc_field_present["success"]: logger.info(f"Found IoC field: {ioc_field_present}") alert_data["ioc_value"] = ioc_field_present["field_value"] + alert_data["ioc_type"] = self.validate_ioc_type(ioc_field_present["field_value"]) + alert_payload = self.create_general_ioc_payload(alert_data, agent_data, alert_id, index) + else: + alert_payload = self.create_general_payload(alert_data, agent_data, alert_id, index) - # Define the validator classes to be used - validators = [IPv4AddressValidator, HashValidator, DomainValidator] - ioc_type = None + return self.create_alert_with_payload(alert_payload) - # Iterate over each validator class - for Validator in validators: - validator = Validator(ioc_field_present["field_value"]) - result = validator.validate() + def get_agent_data(self, agent_id: str) -> Dict[str, Any]: + """ + Get agent data based on agent_id from the `agent_metadata` table. - # If the validation is successful, store the ioc_type and break the loop - if result["success"]: - ioc_type = result["ioc_type"] - break + Parameters + ---------- + agent_id : str + The ID of the agent. - if ioc_type is not None: - alert_data["ioc_type"] = ioc_type - else: - logger.error("Failed to validate IoC value.") + Returns + ------- + Dict[str, Any] + The agent data corresponding to the given agent_id. + """ + service = AgentService() + agent = service.get_agent(agent_id) + return agent_metadata_schema.dump(agent) - alert_payload = self.create_general_ioc_payload(alert_data=alert_data, agent_data=agent_data, alert_id=alert_id, index=index) + def get_asset_type_id(self, os: str) -> int: + """ + Use AssetTypeResolver to determine the asset type ID to set within DFIR-IRIS. - # Create an alert - alert = Alert(session=self.iris_session) - result = self.universal_service.fetch_and_parse_data( - self.iris_session, - alert.add_alert, - alert_payload, - ) + Parameters + ---------- + os : str + The operating system (OS) string used to resolve the asset type ID. - if not result["success"]: - return { - "success": False, - "message": "Failed to create alert in DFIR-IRIS", - } + Returns + ------- + int + The ID corresponding to the asset type. + """ + asset_resolver = AssetTypeResolver(os) + return asset_resolver.get_asset_type_id() - return { - "success": True, - "message": "Successfully created alert in DFIR-IRIS", - "results": result["data"], - } + def validate_ioc_type(self, ioc_value: str) -> str: + """ + Validate IoC type using validators. + + Parameters + ---------- + ioc_value : str + The value to validate the IoC type. + + Returns + ------- + str + The type of the IoC. Returns None if validation fails. + """ + validators = [IPv4AddressValidator, HashValidator, DomainValidator] + ioc_type = None + + for Validator in validators: + validator = Validator(ioc_value) + result = validator.validate() + + if result["success"]: + ioc_type = result["ioc_type"] + break + + if ioc_type is None: + logger.error("Failed to validate IoC value.") + return ioc_type - # Create alert payload - alert_payload = self.create_general_payload(alert_data=alert_data, agent_data=agent_data, alert_id=alert_id, index=index) + def create_alert_with_payload(self, alert_payload: Dict[str, Any]) -> Dict[str, Any]: + """ + Create the alert payload to be sent to DFIR-IRIS. + + Parameters + ---------- + alert_payload : Dict[str, Any] + The payload used to create the alert. - # Create an alert + Returns + ------- + Dict[str, Any] + The result of the alert creation. Contains information on whether the alert creation was successful, + an associated message, and the resulting data. + """ alert = Alert(session=self.iris_session) result = self.universal_service.fetch_and_parse_data( self.iris_session, @@ -161,7 +235,20 @@ def create_alert_general(self, alert_data: Dict[str, Any], alert_id: str, index: def field_exists_ioc(self, alert_data: Dict[str, Any]) -> Dict[str, Any]: """ - Check if an IoC field exists in the alert data. + Checks if an IoC field exists in the alert data. + + Parameters + ---------- + alert_data : Dict[str, Any] + The alert data to check for the presence of an IoC field. + + Returns + ------- + Dict[str, Any] + If an IoC field exists, returns a dictionary with 'success' set to True, + 'field_name' as the name of the field, and 'field_value' as the value of the field. + If an IoC field does not exist, returns a dictionary with 'success' set to False, + and 'field_name' set to None. """ for field_name in self.valid_ioc_fields: if field_name in alert_data: @@ -175,56 +262,39 @@ def field_exists_ioc(self, alert_data: Dict[str, Any]) -> Dict[str, Any]: @property def valid_ioc_fields(self) -> Set[str]: """ - Get the set of valid IoC fields. + Getter for the set of valid IoC fields. + + Returns + ------- + Set[str] + The set of valid IoC fields. """ return {"misp_value", "opencti_value", "threat_intel_value"} def create_general_payload(self, alert_data: Dict[str, Any], agent_data: Dict[str, Any], alert_id: str, index: str) -> Dict[str, Any]: """ - Craft the general alert payload when it does not contain an IoC. + Crafts the general alert payload when it does not contain an IoC. + + Parameters + ---------- + alert_data : Dict[str, Any] + The alert data. + agent_data : Dict[str, Any] + The agent data. + alert_id : str + The ID of the alert. + index : str + The index. + + Returns + ------- + Dict[str, Any] + The crafted alert payload. If an error occurs during crafting, + a dictionary with 'success' set to False and an error message is returned. """ try: - payload = { - "alert_title": alert_data["rule_description"], - "alert_description": alert_data["rule_description"], - "alert_source": "Wazuh", - "assets": [ - { - "asset_name": agent_data["hostname"], - "asset_ip": agent_data["ip_address"], - "asset_description": agent_data["os"], - "asset_type_id": alert_data["asset_type_id"], - }, - ], - # "alert_source_link": f"{self.grafana_url}/explore?left=%5B%22now-6h%22,%22now%22,%22WAZUH%22,%7B%22refId%22" - # ":%22A%22,%22query%22:%22process_id:%5C%22" - # f"{alert_data['process_id']}%5C%22%20AND%20" - # f"agent_name:%5C%22{alert_data['agent_name']}%5C%22%22,%22alias%22" - # ":%22%22,%22metrics%22:%5B%7B%22id%22:%221%22,%22type%22:%22logs%22,%22settings%22:%7B%22limit%22:%22500%22" - # "%7D%7D%5D,%22bucketAggs%22:%5B%5D,%22timeField%22:%22timestamp%22%7D%5D", - "alert_status_id": 3, - "alert_severity_id": 5, - # "alert_customer_id": customer_code_details["customer_code_iris_id"], - "alert_customer_id": 1, - "alert_source_content": alert_data, - "alert_context": { - # "customer_id": f"{alert_data['alert_payload']['alert_details']['_source']['agent_labels_customer']}," - # f"{customer_code_details['customer_code_iris_index']}", - "alert_id": alert_id, - "alert_name": alert_data["rule_description"], - "alert_level": alert_data["rule_level"], - "rule_id": alert_data["rule_id"], - "asset_name": agent_data["hostname"], - "asset_ip": agent_data["ip_address"], - "asset_type": alert_data["asset_type_id"], - "process_id": alert_data["process_id"], - # If the `rule_mitre_id` field exists in the alert_details, add it to the payload - "rule_mitre_id": alert_data.get("rule_mitre_id", "n/a"), - "rule_mitre_tactic": alert_data.get("rule_mitre_tactic", "n/a"), - "rule_mitre_technique": alert_data.get("rule_mitre_technique", "n/a"), - }, - "alert_note": alert_data.get("ask_socfortress", "Ask SOCFortress not enabled"), - } + payload = self.create_base_payload(alert_data, agent_data, alert_id, index) + payload["alert_note"] = alert_data.get("ask_socfortress", "Ask SOCFortress not enabled") return payload except Exception as e: logger.error(f"Error creating general alert payload: {e}") @@ -238,59 +308,88 @@ def create_general_ioc_payload( index: str, ) -> Dict[str, Any]: """ - Craft the general alert payload when it does contain an IoC. + Crafts the general alert payload when it does contain an IoC. + + Parameters + ---------- + alert_data : Dict[str, Any] + The alert data. + agent_data : Dict[str, Any] + The agent data. + alert_id : str + The ID of the alert. + index : str + The index. + + Returns + ------- + Dict[str, Any] + The crafted alert payload with IoC. If an error occurs during crafting, + a dictionary with 'success' set to False and an error message is returned. """ try: - payload = { - "alert_title": alert_data["rule_description"], - "alert_description": alert_data["rule_description"], - "alert_source": "Wazuh", - "assets": [ - { - "asset_name": agent_data["hostname"], - "asset_ip": agent_data["ip_address"], - "asset_description": agent_data["os"], - "asset_type_id": alert_data["asset_type_id"], - }, - ], - # "alert_source_link": f"{self.grafana_url}/explore?left=%5B%22now-6h%22,%22now%22,%22WAZUH%22,%7B%22refId%22" - # ":%22A%22,%22query%22:%22process_id:%5C%22" - # f"{alert_data['process_id']}%5C%22%20AND%20" - # f"agent_name:%5C%22{alert_data['agent_name']}%5C%22%22,%22alias%22" - # ":%22%22,%22metrics%22:%5B%7B%22id%22:%221%22,%22type%22:%22logs%22,%22settings%22:%7B%22limit%22:%22500%22" - # "%7D%7D%5D,%22bucketAggs%22:%5B%5D,%22timeField%22:%22timestamp%22%7D%5D", - "alert_status_id": 3, - "alert_severity_id": 5, - # "alert_customer_id": customer_code_details["customer_code_iris_id"], - "alert_customer_id": 1, - "alert_source_content": alert_data, - "alert_context": { - # "customer_id": f"{alert_data['alert_payload']['alert_details']['_source']['agent_labels_customer']}," - # f"{customer_code_details['customer_code_iris_index']}", - "alert_id": alert_id, - "alert_name": alert_data["rule_description"], - "alert_level": alert_data["rule_level"], - "rule_id": alert_data["rule_id"], - "asset_name": agent_data["hostname"], - "asset_ip": agent_data["ip_address"], - "asset_type": alert_data["asset_type_id"], - "process_id": alert_data["process_id"], - # If the `rule_mitre_id` field exists in the alert_details, add it to the payload - "rule_mitre_id": alert_data.get("rule_mitre_id", "n/a"), - "rule_mitre_tactic": alert_data.get("rule_mitre_tactic", "n/a"), - "rule_mitre_technique": alert_data.get("rule_mitre_technique", "n/a"), + payload = self.create_base_payload(alert_data, agent_data, alert_id, index) + payload["alert_note"] = alert_data.get("ask_socfortress", "Ask SOCFortress not enabled") + payload["alert_iocs"] = [ + { + "ioc_value": alert_data["ioc_value"], + "ioc_description": "IoC found in the alert", + "ioc_tlp_id": 1, + "ioc_type_id": alert_data["ioc_type"], }, - "alert_note": alert_data.get("ask_socfortress", "Ask SOCFortress not enabled"), - "alert_iocs": [ - { - "ioc_value": alert_data["ioc_value"], - "ioc_description": "IoC found in the alert", - "ioc_tlp_id": 1, - "ioc_type_id": alert_data["ioc_type"], - }, - ], - } + ] return payload except Exception as e: logger.error(f"Error creating general alert payload with IoC: {e}") return {"success": False, "message": f"Error creating general alert payload with IoC: {e}"} + + def create_base_payload(self, alert_data: Dict[str, Any], agent_data: Dict[str, Any], alert_id: str, index: str) -> Dict[str, Any]: + """ + Crafts the base alert payload. + + Parameters + ---------- + alert_data : Dict[str, Any] + The alert data. + agent_data : Dict[str, Any] + The agent data. + alert_id : str + The ID of the alert. + index : str + The index. + + Returns + ------- + Dict[str, Any] + The crafted base alert payload. + """ + return { + "alert_title": alert_data["rule_description"], + "alert_description": alert_data["rule_description"], + "alert_source": "Wazuh", + "assets": [ + { + "asset_name": agent_data["hostname"], + "asset_ip": agent_data["ip_address"], + "asset_description": agent_data["os"], + "asset_type_id": alert_data["asset_type_id"], + }, + ], + "alert_status_id": 3, + "alert_severity_id": 5, + "alert_customer_id": 1, + "alert_source_content": alert_data, + "alert_context": { + "alert_id": alert_id, + "alert_name": alert_data["rule_description"], + "alert_level": alert_data["rule_level"], + "rule_id": alert_data["rule_id"], + "asset_name": agent_data["hostname"], + "asset_ip": agent_data["ip_address"], + "asset_type": alert_data["asset_type_id"], + "process_id": alert_data["process_id"], + "rule_mitre_id": alert_data.get("rule_mitre_id", "n/a"), + "rule_mitre_tactic": alert_data.get("rule_mitre_tactic", "n/a"), + "rule_mitre_technique": alert_data.get("rule_mitre_technique", "n/a"), + }, + }