Description
Right now, an engine only attempts an API call once. If the associated API has an error or fails, there is no retry and the analysis continues.
If each engine's API call were factored into its own method (within either the ABC or the Protocol), that method could be wrapped in the @retry decorator from the Tenacity library, so that failed requests are retried and specific errors can be logged or handled.
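For anyone unfamiliar with the pattern, the behaviour being proposed can be approximated with the standard library alone. This is only a rough sketch and not Tenacity's implementation: `retry_with_backoff`, `flaky_call`, and the exact delay formula are hypothetical and chosen purely for illustration.

```python
import functools
import time


def retry_with_backoff(attempts=3, multiplier=1.0, min_wait=1.0, max_wait=10.0,
                       retry_on=(Exception,), sleep=time.sleep):
    """Hypothetical stdlib-only stand-in for a Tenacity-style @retry decorator."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, attempts + 1):
                try:
                    return func(*args, **kwargs)
                except retry_on:
                    if attempt == attempts:
                        raise  # out of attempts: re-raise the last error to the caller
                    # Assumed formula: the delay doubles per attempt, clamped to [min_wait, max_wait]
                    delay = min(max(multiplier * 2 ** attempt, min_wait), max_wait)
                    sleep(delay)
        return wrapper
    return decorator


calls = []
waits = []


# `sleep=waits.append` records the delays instead of actually sleeping
@retry_with_backoff(attempts=3, retry_on=(ConnectionError,), sleep=waits.append)
def flaky_call():
    """Simulated API call that fails twice, then succeeds."""
    calls.append(1)
    if len(calls) < 3:
        raise ConnectionError("temporary failure")
    return "ok"


result = flaky_call()
```

Here the third attempt succeeds, so the caller never sees the two transient failures. An exception type that is not listed in `retry_on` propagates immediately without a retry.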
Taking the crt.sh engine as an example:
import logging
from typing import Any

import requests
from tenacity import after_log, retry, stop_after_attempt, wait_exponential

# Assumed: a module-level logger, as already used by the existing engines
logger = logging.getLogger(__name__)


class CrtShEngine(BaseEngine):
    [...]

    @retry(
        reraise=True,
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10),
        after=after_log(logger, logging.DEBUG),
    )
    def _make_request(self, url: str) -> requests.Response:
        """Request data from the crt.sh API.

        Up to 3 requests can be made before reraising the resulting
        API exception to the calling function.

        After each failed attempt, the delay between requests is exponentially
        increased and a DEBUG level log message is emitted.
        """
        response = requests.get(url, proxies=self.proxies, verify=self.ssl_verify, timeout=10)
        response.raise_for_status()
        return response

    def analyze(self, observable_value: str, observable_type: str) -> dict[str, Any] | None:
        # If observable is a URL, extract domain
        if observable_type == "URL":
            domain_part = observable_value.split("/")[2].split(":")[0]
            observable = domain_part
        else:
            observable = observable_value

        url = f"https://crt.sh/json?q={observable}"

        try:
            response: requests.Response = self._make_request(url)
        except requests.exceptions.RequestException as e:
            # Reached only after all retry attempts have failed
            logger.error("Error fetching crt.sh data for '%s': %s", observable_value, e, exc_info=True)
            return None

        # Rest of analyze() method follows
        [...]

Another good example, with a minor twist, is the Shodan API, which returns an HTTP 404 response to indicate that no open ports were found on the queried host or IP. The status code can be checked before calling raise_for_status(), so that a 404 is returned as a normal result and only other errors trigger a Tenacity retry:
class ShodanEngine(BaseEngine):
    [...]

    @retry(
        reraise=True,
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        after=after_log(logger, logging.DEBUG),
    )
    def _make_request(self, url: str) -> requests.Response:
        headers: dict[str, str] = {"Accept": "application/json"}
        params: dict[str, str] = {"key": self.secrets.shodan}
        response: requests.Response = requests.get(
            url,
            headers=headers,
            params=params,
            proxies=self.proxies,
            verify=self.ssl_verify,
            timeout=5,
        )

        # Shodan returns 404 if no ports are open on the specified host, so that is NOT an error
        if response.status_code == 404:
            return response

        # Any other 4xx/5xx status raises HTTPError, which triggers a retry
        response.raise_for_status()
        return response

    def analyze(self, observable_value: str, observable_type: str) -> dict | None:
        url = f"https://api.shodan.io/shodan/host/{observable_value}"

        try:
            response: requests.Response = self._make_request(url)
            if response.status_code == 404:
                return None
            data = response.json()
        # RequestException covers HTTPError; ValueError covers JSON decoding failures
        except (requests.exceptions.RequestException, ValueError) as e:
            logger.error("Error querying Shodan: %s", e)
            return None

        return {
            "ports": data.get("ports", []),
            "tags": data.get("tags", []),
            "link": f"https://www.shodan.io/host/{observable_value}",
        }

This approach is valid for both the ABC and Protocol engine design models.
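To make that point concrete, here is a small stdlib-only sketch showing the same decorated `_make_request` method working under either model. Everything in it is hypothetical: `retry3` stands in for a configured Tenacity `@retry`, `EngineProtocol`, `AbcEngine`, `ProtocolEngine`, and `run` are illustrative names, the `BaseEngine` shown is a simplified stand-in for the project's real base class, and the request is simulated instead of hitting a real API.

```python
from abc import ABC, abstractmethod
from typing import Optional, Protocol


def retry3(func):
    """Hypothetical stand-in for a configured Tenacity @retry: up to 3 attempts, no waiting."""
    def wrapper(*args, **kwargs):
        for attempt in range(3):
            try:
                return func(*args, **kwargs)
            except ConnectionError:
                if attempt == 2:
                    raise  # third failure: re-raise to the caller
    return wrapper


class BaseEngine(ABC):
    """ABC model: engines inherit from a shared base class (simplified stand-in)."""

    @abstractmethod
    def analyze(self, observable_value: str, observable_type: str) -> Optional[dict]: ...


class EngineProtocol(Protocol):
    """Protocol model: engines only need a matching analyze() method."""

    def analyze(self, observable_value: str, observable_type: str) -> Optional[dict]: ...


class AbcEngine(BaseEngine):
    def __init__(self) -> None:
        self.attempts = 0

    @retry3
    def _make_request(self, url: str) -> dict:
        self.attempts += 1
        if self.attempts < 2:
            raise ConnectionError("simulated transient failure")
        return {"url": url}

    def analyze(self, observable_value: str, observable_type: str) -> Optional[dict]:
        return self._make_request(f"https://example.invalid/{observable_value}")


class ProtocolEngine:  # no inheritance: satisfies EngineProtocol structurally
    def __init__(self) -> None:
        self.attempts = 0

    @retry3
    def _make_request(self, url: str) -> dict:
        self.attempts += 1
        if self.attempts < 2:
            raise ConnectionError("simulated transient failure")
        return {"url": url}

    def analyze(self, observable_value: str, observable_type: str) -> Optional[dict]:
        return self._make_request(f"https://example.invalid/{observable_value}")


def run(engine: EngineProtocol, value: str) -> Optional[dict]:
    """Caller code is identical for both models; "FQDN" is an illustrative type label."""
    return engine.analyze(value, "FQDN")


abc_engine, protocol_engine = AbcEngine(), ProtocolEngine()
abc_result = run(abc_engine, "example.com")
protocol_result = run(protocol_engine, "example.com")
```

Because the retry logic lives on the private request method, it is independent of how the engine class itself is declared, so each engine could be migrated one at a time.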
I would be willing to work through the various engines over time to help implement this, but I wanted to raise it as a general suggestion first.