Skip to content

Commit

Permalink
socfortress threat intel added
Browse files Browse the repository at this point in the history
  • Loading branch information
taylorwalton committed Jul 18, 2023
1 parent 8ab8c17 commit dc769e6
Show file tree
Hide file tree
Showing 6 changed files with 291 additions and 0 deletions.
2 changes: 2 additions & 0 deletions backend/app/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
from app.routes.shuffle import bp as shuffle_bp
from app.routes.smtp import bp as smtp_bp
from app.routes.sublime import bp as sublime_bp
from app.routes.threatintel import bp as threatintel_bp
from app.routes.velociraptor import bp as velociraptor_bp
from app.routes.wazuhindexer import bp as wazuhindexer_bp

Expand All @@ -72,3 +73,4 @@
app.register_blueprint(influxdb_bp) # Register the influxdb blueprint
app.register_blueprint(smtp_bp) # Register the smtp blueprint
app.register_blueprint(healthchecks_bp) # Register the healthchecks blueprint
app.register_blueprint(threatintel_bp) # Register the threatintel blueprint
55 changes: 55 additions & 0 deletions backend/app/models/connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,60 @@ def verify_connection(self) -> Dict[str, Any]:
return {"connectionSuccessful": False, "response": None}


class SocfortressThreatIntelConnector(Connector):
"""
A connector for the SocfortressThreatIntel service, a subclass of Connector.
Args:
connector_name (str): The name of the connector.
"""

def __init__(self, connector_name: str):
super().__init__(attributes=self.get_connector_info_from_db(connector_name))

def verify_connection(self) -> Dict[str, Any]:
"""
Verifies the connection to ASK SOCFortress service.
Returns:
dict: A dictionary containing 'connectionSuccessful' status and 'response' if the connection is successful.
"""
logger.info(
f"Verifying the ASK SOCFortress connection to {self.attributes['connector_url']}",
)
try:
headers = {
"Content-Type": "application/json",
"x-api-key": f"{self.attributes['connector_api_key']}",
"module-version": "1.0",
}
params = {
"value": "evil.socfortress.co",
}
socfortress_threat_intel = requests.get(
f"{self.attributes['connector_url']}",
headers=headers,
params=params,
verify=False,
timeout=60,
)
if socfortress_threat_intel.status_code == 200:
logger.info(
f"Connection to {self.attributes['connector_url']} successful",
)
return {"connectionSuccessful": True}
else:
logger.error(
f"Connection to {self.attributes['connector_url']} failed with error: {socfortress_threat_intel.text}",
)
return {"connectionSuccessful": False, "response": None}
except Exception as e:
logger.error(
f"Connection to {self.attributes['connector_url']} failed with error: {e}",
)
return {"connectionSuccessful": False, "response": None}


class InfluxDBConnector(Connector):
"""
A connector for the InfluxDB service, a subclass of Connector.
Expand Down Expand Up @@ -633,3 +687,4 @@ def create(self, key, connector_name):
connector_factory.register_creator("Sublime", "SublimeConnector")
connector_factory.register_creator("InfluxDB", "InfluxDBConnector")
connector_factory.register_creator("AskSocfortress", "AskSOCFortressConnector")
connector_factory.register_creator("SocfortressThreatIntel", "SocfortressThreatIntelConnector")
1 change: 1 addition & 0 deletions backend/app/models/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ class Connectors(db.Model):
"sublime": True,
"influxdb": True,
"asksocfortress": True,
"socfortressthreatintel": True,
}

def __init__(
Expand Down
23 changes: 23 additions & 0 deletions backend/app/routes/threatintel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from typing import Any
from typing import Dict

from flask import Blueprint
from flask import jsonify
from flask import request
from loguru import logger

from app.services.threat_intel.socfortress.universal import SocfortressThreatIntelService

bp = Blueprint("threatintel", __name__)


@bp.route("/threatintel/socfortress/<ioc_value>", methods=["GET"])
def get_socfortress_threatintel(ioc_value: str) -> jsonify:
"""
Endpoint to check IoC in Socfortress Threat Intel.
Returns:
jsonify: A JSON response containing the list of all alerts from Socfortress.
"""
ioc_enriched = SocfortressThreatIntelService("SocfortressThreatIntel").invoke_socfortress_threat_intel(data=ioc_value)
return jsonify(ioc_enriched)
152 changes: 152 additions & 0 deletions backend/app/services/threat_intel/socfortress/universal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
import json
from typing import Any
from typing import Dict
from typing import Optional
from typing import Tuple

import requests
from loguru import logger

from app.models.connectors import Connector
from app.models.connectors import connector_factory


class SocfortressThreatIntelService:
"""
A service class that encapsulates the logic for interfacing with ASK SOCFortress. This class handles tasks like retrieving connector
details, and invoking the ask_socfortress connector.
"""

def __init__(self, connector_name: str) -> None:
"""
Initializes the SocfortressThreatIntelService by collecting SOCFortress Threat Intel details associated with the specified connector name.
Args:
connector_name (str): The name of the SOCFortress Threat Intel connector.
"""
self.connector_url, self.connector_api_key = self.collect_socfortress_threat_intel_details(
connector_name,
)

def collect_socfortress_threat_intel_details(
self,
connector_name: str,
) -> Tuple[Optional[str], Optional[str]]:
"""
Collects the details of the SOCFortress Threat Intel connector.
Args:
connector_name (str): The name of the SOCFortress Threat Intel connector.
Returns:
tuple: A tuple containing the connection URL and API key. If the connection is not successful, both elements of the tuple are
None.
"""
connector_instance = connector_factory.create(connector_name, connector_name)
connection_successful = connector_instance.verify_connection()
if connection_successful:
connection_details = Connector.get_connector_info_from_db(connector_name)
return (
connection_details.get("connector_url"),
connection_details.get("connector_api_key"),
)
else:
return None, None

def create_payload(self, data: str) -> Dict[str, Any]:
"""
Creates the payload for the SOCFortress Threat Intel API request.
Args:
data (str): The data to be enriched.
Returns:
dict: The payload to be sent to the SOCFortress Threat Intel API.
"""
return {"value": data}

def create_headers(self) -> Dict[str, str]:
"""
Creates the headers for the SOCFortress Threat Intel API request.
Returns:
dict: The headers to be used for the SOCFortress Threat Intel API request.
"""
return {
"Content-Type": "application/json",
"x-api-key": self.connector_api_key,
"module-version": "1.0",
}

def make_request(self, payload: Dict[str, Any], headers: Dict[str, str]) -> requests.Response:
"""
Makes the HTTP request to the SOCFortress Threat Intel API.
Args:
payload (dict): The payload to be sent to the SOCFortress Threat Intel API.
headers (dict): The headers to be used for the SOCFortress Threat Intel API request.
Returns:
requests.Response: The HTTP response from the SOCFortress Threat Intel API.
"""
return requests.get(
self.connector_url,
params=payload,
headers=headers,
timeout=120,
)

def handle_response(self, response: requests.Response) -> Dict[str, Any]:
"""
Handles the response from the SOCFortress Threat Intel API.
Args:
response (requests.Response): The HTTP response from the SOCFortress Threat Intel API.
Returns:
dict: A dictionary containing a success key indicating the success or failure of the connection,
a response key containing the response from the SOCFortress Threat Intel API (if successful), and
a message key containing further information about the connection result.
"""
try:
response.raise_for_status()
response_data = response.json()
return {
"success": True,
"response": response_data["data"],
"message": "Successfully invoked SOCFortress Threat Intel API",
}
except requests.exceptions.HTTPError as e:
logger.error(f"Value not found in SOCFortress Threat Intel API: {e}")
return {
"success": True,
"response": None,
"message": "Value not found in SOCFortress Threat Intel API",
}
except Exception as e:
logger.error(f"Unable to invoke SOCFortress Threat Intel API: {e}")
return {
"success": False,
"response": None,
"message": f"Unable to invoke SOCFortress Threat Intel API: {e}",
}

def invoke_socfortress_threat_intel(self, data: str) -> Dict[str, Any]:
"""
Invokes the SOCFortress Threat Intel API to enrich data via a POST request.
The function creates the payload and headers, makes the HTTP request, and handles the response.
Args:
data (str): The data to be enriched.
Returns:
dict: A dictionary containing a success key indicating the success or failure of the connection,
a response key containing the response from the SOCFortress Threat Intel API (if successful), and
a message key containing further information about the connection result.
"""
logger.info(f"Invoking SOCFortress Threat Intel API with data: {data}")
payload = self.create_payload(data)
headers = self.create_headers()
response = self.make_request(payload, headers)
return self.handle_response(response)
58 changes: 58 additions & 0 deletions backend/app/static/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,14 @@
"description": "Find out more",
"url": "http://swagger.io"
}
},
{
"name": "Threat Intel",
"description": "Everything about Threat Intel",
"externalDocs": {
"description": "Find out more",
"url": "http://swagger.io"
}
}
],
"paths": {
Expand Down Expand Up @@ -3117,6 +3125,56 @@
"operationId": "getVelociraptorAgentHealthcheckByAgentID",
"tags": ["Healthcheck"]
}
},
"/threatintel/socfortress/{ioc_value}": {
"get": {
"summary": "Get SOC Fortress threat intelligence by IOC value",
"description": "Endpoint to get SOC Fortress threat intelligence by IOC value.",
"parameters": [
{
"name": "ioc_value",
"in": "path",
"description": "The IOC value.",
"required": true,
"schema": {
"type": "string"
}
}
],
"responses": {
"200": {
"description": "Successful operation",
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {
"threat_intel": {
"type": "array",
"items": {
"type": "object",
"description": "Threat intelligence details"
}
}
}
}
}
}
},
"default": {
"description": "Unexpected error",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/Error"
}
}
}
}
},
"operationId": "getSOCFortressThreatIntelByIOCValue",
"tags": ["Threat Intel"]
}
}
},
"components": {
Expand Down

0 comments on commit dc769e6

Please sign in to comment.