Skip to content

Commit

Permalink
Asksocfortress modular (#47)
Browse files Browse the repository at this point in the history
* modular ask socfortress

* docstrings

* mkdocs

* precommit
  • Loading branch information
taylorwalton authored Jul 18, 2023
1 parent 80d5902 commit 8ab8c17
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 27 deletions.
2 changes: 1 addition & 1 deletion backend/app/services/WazuhIndexer/alerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from elasticsearch7 import Elasticsearch
from loguru import logger

from app.services.ask_socfortress.univerval import AskSocfortressService
from app.services.ask_socfortress.universal import AskSocfortressService
from app.services.WazuhIndexer.universal import UniversalService


Expand Down
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -53,47 +53,69 @@ def collect_asksocfortress_details(
else:
return None, None

def invoke_asksocfortress(self, data: str) -> Dict[str, Any]:
def create_payload(self, data: str) -> Dict[str, Any]:
"""
Invoke ASKSOCFortress API to enrich data via a POST request.
Creates the payload for the AskSOCFortress API request.
Attributes:
Args:
data (str): The data to be enriched.
Returns:
dict: A dictionary containing a success key indicating the success or failure of the connection
and a message key containing further information about the connection result.
dict: The payload to be sent to the AskSOCFortress API.
"""
headers = {
return {"rule_description": data}

def create_headers(self) -> Dict[str, str]:
"""
Creates the headers for the AskSOCFortress API request.
Returns:
dict: The headers to be used for the AskSOCFortress API request.
"""
return {
"Content-Type": "application/json",
"x-api-key": self.connector_api_key,
"module-version": "1.0",
}
logger.info(f"Invoking AskSOCFortress API with data: {data}")

payload = {"rule_description": data}
def make_request(self, payload: Dict[str, Any], headers: Dict[str, str]) -> requests.Response:
"""
Makes the HTTP request to the AskSOCFortress API.
Args:
payload (dict): The payload to be sent to the AskSOCFortress API.
headers (dict): The headers to be used for the AskSOCFortress API request.
Returns:
requests.Response: The HTTP response from the AskSOCFortress API.
"""
return requests.post(
self.connector_url,
data=json.dumps(payload),
headers=headers,
timeout=120,
)

def handle_response(self, response: requests.Response) -> Dict[str, Any]:
"""
Handles the response from the AskSOCFortress API.
timeout = 120
Args:
response (requests.Response): The HTTP response from the AskSOCFortress API.
Returns:
dict: A dictionary containing a success key indicating the success or failure of the connection,
a response key containing the response from the AskSOCFortress API (if successful), and
a message key containing further information about the connection result.
"""
try:
response = requests.post(
self.connector_url,
data=json.dumps(payload),
headers=headers,
timeout=timeout,
)
response.raise_for_status()
try:
response_data = response.json()
except ValueError:
logger.error(f"Unable to decode response from AskSOCFortress API: {response.text}")
raise
else:
return {
"success": True,
"response": response_data["message"],
"message": "Successfully invoked AskSOCFortress API",
}
response_data = response.json()
return {
"success": True,
"response": response_data["message"],
"message": "Successfully invoked AskSOCFortress API",
}
except requests.exceptions.HTTPError as e:
logger.error(f"Unable to invoke AskSOCFortress API: {e}")
return {
Expand All @@ -108,3 +130,23 @@ def invoke_asksocfortress(self, data: str) -> Dict[str, Any]:
"response": None,
"message": f"Unable to invoke AskSOCFortress API: {e}",
}

def invoke_asksocfortress(self, data: str) -> Dict[str, Any]:
"""
Invokes the AskSOCFortress API to enrich data via a POST request.
The function creates the payload and headers, makes the HTTP request, and handles the response.
Args:
data (str): The data to be enriched.
Returns:
dict: A dictionary containing a success key indicating the success or failure of the connection,
a response key containing the response from the AskSOCFortress API (if successful), and
a message key containing further information about the connection result.
"""
logger.info(f"Invoking AskSOCFortress API with data: {data}")
payload = self.create_payload(data)
headers = self.create_headers()
response = self.make_request(payload, headers)
return self.handle_response(response)
5 changes: 5 additions & 0 deletions backend/docs/asksocfortress.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
## AskSOCFortress Overview

### <span style="color:blue">AskSOCFortress Services</span>

::: app.services.ask_socfortress.universal
1 change: 1 addition & 0 deletions backend/mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ nav:
- InfluxDB: influxdb.md
- Healthcehcks: healthchecks.md
- SMTP: smtp.md
- ASK-SOCFortress: asksocfortress.md

markdown_extensions:
- pymdownx.highlight:
Expand Down

0 comments on commit 8ab8c17

Please sign in to comment.