Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Asksocfortress modular #47

Merged
merged 4 commits into from
Jul 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion backend/app/services/WazuhIndexer/alerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from elasticsearch7 import Elasticsearch
from loguru import logger

from app.services.ask_socfortress.univerval import AskSocfortressService
from app.services.ask_socfortress.universal import AskSocfortressService
from app.services.WazuhIndexer.universal import UniversalService


Expand Down
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -53,47 +53,69 @@ def collect_asksocfortress_details(
else:
return None, None

def invoke_asksocfortress(self, data: str) -> Dict[str, Any]:
def create_payload(self, data: str) -> Dict[str, Any]:
"""
Invoke ASKSOCFortress API to enrich data via a POST request.
Creates the payload for the AskSOCFortress API request.

Attributes:
Args:
data (str): The data to be enriched.

Returns:
dict: A dictionary containing a success key indicating the success or failure of the connection
and a message key containing further information about the connection result.
dict: The payload to be sent to the AskSOCFortress API.
"""
headers = {
return {"rule_description": data}

def create_headers(self) -> Dict[str, str]:
"""
Creates the headers for the AskSOCFortress API request.

Returns:
dict: The headers to be used for the AskSOCFortress API request.
"""
return {
"Content-Type": "application/json",
"x-api-key": self.connector_api_key,
"module-version": "1.0",
}
logger.info(f"Invoking AskSOCFortress API with data: {data}")

payload = {"rule_description": data}
def make_request(self, payload: Dict[str, Any], headers: Dict[str, str]) -> requests.Response:
"""
Makes the HTTP request to the AskSOCFortress API.

Args:
payload (dict): The payload to be sent to the AskSOCFortress API.
headers (dict): The headers to be used for the AskSOCFortress API request.

Returns:
requests.Response: The HTTP response from the AskSOCFortress API.
"""
return requests.post(
self.connector_url,
data=json.dumps(payload),
headers=headers,
timeout=120,
)

def handle_response(self, response: requests.Response) -> Dict[str, Any]:
"""
Handles the response from the AskSOCFortress API.

timeout = 120
Args:
response (requests.Response): The HTTP response from the AskSOCFortress API.

Returns:
dict: A dictionary containing a success key indicating the success or failure of the connection,
a response key containing the response from the AskSOCFortress API (if successful), and
a message key containing further information about the connection result.
"""
try:
response = requests.post(
self.connector_url,
data=json.dumps(payload),
headers=headers,
timeout=timeout,
)
response.raise_for_status()
try:
response_data = response.json()
except ValueError:
logger.error(f"Unable to decode response from AskSOCFortress API: {response.text}")
raise
else:
return {
"success": True,
"response": response_data["message"],
"message": "Successfully invoked AskSOCFortress API",
}
response_data = response.json()
return {
"success": True,
"response": response_data["message"],
"message": "Successfully invoked AskSOCFortress API",
}
except requests.exceptions.HTTPError as e:
logger.error(f"Unable to invoke AskSOCFortress API: {e}")
return {
Expand All @@ -108,3 +130,23 @@ def invoke_asksocfortress(self, data: str) -> Dict[str, Any]:
"response": None,
"message": f"Unable to invoke AskSOCFortress API: {e}",
}

def invoke_asksocfortress(self, data: str) -> Dict[str, Any]:
"""
Invokes the AskSOCFortress API to enrich data via a POST request.

The function creates the payload and headers, makes the HTTP request, and handles the response.

Args:
data (str): The data to be enriched.

Returns:
dict: A dictionary containing a success key indicating the success or failure of the connection,
a response key containing the response from the AskSOCFortress API (if successful), and
a message key containing further information about the connection result.
"""
logger.info(f"Invoking AskSOCFortress API with data: {data}")
payload = self.create_payload(data)
headers = self.create_headers()
response = self.make_request(payload, headers)
return self.handle_response(response)
5 changes: 5 additions & 0 deletions backend/docs/asksocfortress.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
## AskSOCFortress Overview

### <span style="color:blue">AskSOCFortress Services</span>

::: app.services.ask_socfortress.universal
1 change: 1 addition & 0 deletions backend/mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ nav:
- InfluxDB: influxdb.md
- Healthcehcks: healthchecks.md
- SMTP: smtp.md
- ASK-SOCFortress: asksocfortress.md

markdown_extensions:
- pymdownx.highlight:
Expand Down