diff --git a/backend/app/services/WazuhIndexer/alerts.py b/backend/app/services/WazuhIndexer/alerts.py
index 461caa9f..b4bf5762 100644
--- a/backend/app/services/WazuhIndexer/alerts.py
+++ b/backend/app/services/WazuhIndexer/alerts.py
@@ -4,7 +4,7 @@
 from elasticsearch7 import Elasticsearch
 from loguru import logger
-from app.services.ask_socfortress.univerval import AskSocfortressService
+from app.services.ask_socfortress.universal import AskSocfortressService
 from app.services.WazuhIndexer.universal import UniversalService
diff --git a/backend/app/services/ask_socfortress/__init__.py b/backend/app/services/ask_socfortress/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/backend/app/services/ask_socfortress/univerval.py b/backend/app/services/ask_socfortress/universal.py
similarity index 52%
rename from backend/app/services/ask_socfortress/univerval.py
rename to backend/app/services/ask_socfortress/universal.py
index 3a2c7b01..05b96949 100644
--- a/backend/app/services/ask_socfortress/univerval.py
+++ b/backend/app/services/ask_socfortress/universal.py
@@ -53,47 +53,69 @@ def collect_asksocfortress_details( else: return None, None - def invoke_asksocfortress(self, data: str) -> Dict[str, Any]: + def create_payload(self, data: str) -> Dict[str, Any]: """ - Invoke ASKSOCFortress API to enrich data via a POST request. + Creates the payload for the AskSOCFortress API request. - Attributes: + Args: data (str): The data to be enriched. Returns: - dict: A dictionary containing a success key indicating the success or failure of the connection - and a message key containing further information about the connection result. + dict: The payload to be sent to the AskSOCFortress API. """ - headers = { + return {"rule_description": data} + + def create_headers(self) -> Dict[str, str]: + """ + Creates the headers for the AskSOCFortress API request. + + Returns: + dict: The headers to be used for the AskSOCFortress API request. + """ + return { "Content-Type": "application/json", "x-api-key": self.connector_api_key, "module-version": "1.0", } - logger.info(f"Invoking AskSOCFortress API with data: {data}") - payload = {"rule_description": data} + def make_request(self, payload: Dict[str, Any], headers: Dict[str, str]) -> requests.Response: + """ + Makes the HTTP request to the AskSOCFortress API. + + Args: + payload (dict): The payload to be sent to the AskSOCFortress API. + headers (dict): The headers to be used for the AskSOCFortress API request. + + Returns: + requests.Response: The HTTP response from the AskSOCFortress API. + """ + return requests.post( + self.connector_url, + data=json.dumps(payload), + headers=headers, + timeout=120, + ) + + def handle_response(self, response: requests.Response) -> Dict[str, Any]: + """ + Handles the response from the AskSOCFortress API. - timeout = 120 + Args: + response (requests.Response): The HTTP response from the AskSOCFortress API. 
+ Returns: + dict: A dictionary containing a success key indicating the success or failure of the connection, + a response key containing the response from the AskSOCFortress API (if successful), and + a message key containing further information about the connection result. + """ try: - response = requests.post( - self.connector_url, - data=json.dumps(payload), - headers=headers, - timeout=timeout, - ) response.raise_for_status() - try: - response_data = response.json() - except ValueError: - logger.error(f"Unable to decode response from AskSOCFortress API: {response.text}") - raise - else: - return { - "success": True, - "response": response_data["message"], - "message": "Successfully invoked AskSOCFortress API", - } + response_data = response.json() + return { + "success": True, + "response": response_data["message"], + "message": "Successfully invoked AskSOCFortress API", + } except requests.exceptions.HTTPError as e: logger.error(f"Unable to invoke AskSOCFortress API: {e}") return { @@ -108,3 +130,23 @@ def invoke_asksocfortress(self, data: str) -> Dict[str, Any]: "response": None, "message": f"Unable to invoke AskSOCFortress API: {e}", } + + def invoke_asksocfortress(self, data: str) -> Dict[str, Any]: + """ + Invokes the AskSOCFortress API to enrich data via a POST request. + + The function creates the payload and headers, makes the HTTP request, and handles the response. + + Args: + data (str): The data to be enriched. + + Returns: + dict: A dictionary containing a success key indicating the success or failure of the connection, + a response key containing the response from the AskSOCFortress API (if successful), and + a message key containing further information about the connection result. + """ + logger.info(f"Invoking AskSOCFortress API with data: {data}") + payload = self.create_payload(data) + headers = self.create_headers() + response = self.make_request(payload, headers) + return self.handle_response(response) 
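Review bot: handle_response drops the ValueError handling the removed invoke_asksocfortress had, so `response.json()` on a non-JSON body (an HTML error page from a proxy, for instance) and `response_data["message"]` on a body without that key now raise out of the method instead of returning the documented `{"success": False, ...}` dict — unless the second except clause, whose exception type sits in the context between the two hunks, happens to be broad enough, which I cannot verify here. Since the caller in alerts.py enriches alerts with this result, a changed or malformed upstream response would then break the enrichment path rather than degrade gracefully. I would add a sibling branch to the existing HTTPError one: `except (ValueError, KeyError) as e:` / `logger.error(f"Malformed response from AskSOCFortress API: {e}")` / `return {"success": False, "response": None, "message": f"Malformed response from AskSOCFortress API: {e}"}`. handle_response's except clauses continue past the end of this hunk.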
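Review bot: invoke_asksocfortress calls `self.make_request(payload, headers)` outside any try block, so a ConnectionError, Timeout or SSLError raised by requests.post inside make_request now propagates to the caller instead of becoming the `{"success": False, ...}` dict; the removed version kept requests.post inside its try, whereas the except clauses now live in handle_response, which is only reached once a response object already exists. Whether handle_response's second except clause catches RequestException is not visible in this diff, but even if it does it cannot fire for failures that occur before handle_response is called, so the orchestrator should catch request-level failures itself as sketched below. Separately, `logger.info(f"Invoking AskSOCFortress API with data: {data}")` writes the full rule description to the log at INFO on every call; consider truncating it or logging it at DEBUG, since it duplicates alert content into the log stream.

```python
    def invoke_asksocfortress(self, data: str) -> Dict[str, Any]:
        # … unchanged: docstring …
        logger.info(f"Invoking AskSOCFortress API with data: {data}")
        payload = self.create_payload(data)
        headers = self.create_headers()
        try:
            response = self.make_request(payload, headers)
        except requests.exceptions.RequestException as e:
            # Connection, timeout and TLS failures surface here, before
            # handle_response's try block is ever entered.
            logger.error(f"Unable to invoke AskSOCFortress API: {e}")
            return {
                "success": False,
                "response": None,
                "message": f"Unable to invoke AskSOCFortress API: {e}",
            }
        return self.handle_response(response)
```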
diff --git a/backend/docs/asksocfortress.md b/backend/docs/asksocfortress.md
new file mode 100644
index 00000000..d1966d53
--- /dev/null
+++ b/backend/docs/asksocfortress.md
@@ -0,0 +1,5 @@
+## AskSOCFortress Overview
+
+### AskSOCFortress Services
+
+::: app.services.ask_socfortress.universal
diff --git a/backend/mkdocs.yml b/backend/mkdocs.yml
index 678b556d..fbd20011 100644
--- a/backend/mkdocs.yml
+++ b/backend/mkdocs.yml
@@ -45,6 +45,7 @@ nav:
 - InfluxDB: influxdb.md
 - Healthcehcks: healthchecks.md
 - SMTP: smtp.md
+ - ASK-SOCFortress: asksocfortress.md
 markdown_extensions:
 - pymdownx.highlight: