[SOAR-18956] Mimecast V2 - Update hash limits (#3167)
* Update hash limits

* Remove unnecessary max limit check

* Separate log type hashes

* Remove debugging

* Add max logs logic for each run

* Refactor for prospector

* Fix unit test | Add custom config log limit | Update file start to 0

* Remove unused import

* Add unit test

* Refactor resume function | Refactor custom config log limit | Add json decode error handling and unit test

* Update to stream content in chunks | Update unit tests | Add additional logging

* Update testing value

* Update testing value

* Update error handling | Add comment
ablakley-r7 authored Feb 26, 2025
1 parent 7f10ebc commit 6bb44ac
Showing 13 changed files with 510 additions and 68 deletions.
6 changes: 3 additions & 3 deletions plugins/mimecast_v2/.CHECKSUM
@@ -1,7 +1,7 @@
{
"spec": "c7a3d4f63684574f9f41df7cb0ff9a68",
"manifest": "77d59430569721dbe1790202bb95a440",
"setup": "9c87c7459f7e1e8eaf44625e1316d046",
"spec": "880bc61a8eb5b59f28ffc3ac33753b0d",
"manifest": "9b26de3dbacf469405733dca5936b972",
"setup": "bf06aad5c32fd49b0794aa9a49eae5c8",
"schemas": [
{
"identifier": "connection/schema.py",
2 changes: 1 addition & 1 deletion plugins/mimecast_v2/bin/icon_mimecast_v2
@@ -6,7 +6,7 @@ from sys import argv

Name = "Mimecast V2"
Vendor = "rapid7"
Version = "1.0.1"
Version = "1.0.2"
Description = "[Mimecast](https://www.mimecast.com) is a set of cloud services designed to provide next generation protection against advanced email-borne threats such as malicious URLs, malware, impersonation attacks, as well as internally generated threats, with a focus on email security. This plugin utilizes the [Mimecast API](https://www.mimecast.com/developer/documentation)"


1 change: 1 addition & 0 deletions plugins/mimecast_v2/help.md
@@ -103,6 +103,7 @@ Example output:

# Version History

* 1.0.2 - `Monitor SIEM Logs` Limit the amount of logs used to deduplicate logs in subsequent runs
* 1.0.1 - Update SDK | Improve output for a successful connection test
* 1.0.0 - Initial plugin

@@ -1,6 +1,6 @@
import insightconnect_plugin_runtime
from insightconnect_plugin_runtime.exceptions import APIException, PluginException
from insightconnect_plugin_runtime.helper import compare_and_dedupe_hashes, hash_sha1
from insightconnect_plugin_runtime.helper import hash_sha1
from .schema import MonitorSiemLogsInput, MonitorSiemLogsOutput, MonitorSiemLogsState, Input, Output, Component, State
from typing import Dict, List, Tuple
from datetime import datetime, timezone, timedelta
@@ -9,11 +9,18 @@
# Date format for conversion
DATE_FORMAT = "%Y-%m-%d"
# Default and max values
LOG_TYPES = ["receipt", "url protect", "attachment protect"]
RECEIPT = "receipt"
URL_PROTECT = "url protect"
ATTACHMENT_PROTECT = "attachment protect"
LOG_TYPES = [RECEIPT, URL_PROTECT, ATTACHMENT_PROTECT]
DEFAULT_THREAD_COUNT = 10
DEFAULT_PAGE_SIZE = 100
MAX_LOOKBACK_DAYS = 7
INITIAL_MAX_LOOKBACK_DAYS = 1
LARGE_LOG_SIZE_LIMIT = 7000
SMALL_LOG_SIZE_LIMIT = 250
LARGE_LOG_HASH_SIZE_LIMIT = 4800
SMALL_LOG_HASH_SIZE_LIMIT = 100
# Run type
INITIAL_RUN = "initial_run"
SUBSEQUENT_RUN = "subsequent_run"
@@ -24,9 +31,12 @@
QUERY_DATE = "query_date"
CAUGHT_UP = "caught_up"
NEXT_PAGE = "next_page"
SAVED_FILE_URL = "saved_file_url"
SAVED_FILE_POSITION = "saved_file_position"
# Access keys for custom config
THREAD_COUNT = "thread_count"
PAGE_SIZE = "page_size"
LOG_LIMITS = "log_limits"


class MonitorSiemLogs(insightconnect_plugin_runtime.Task):
@@ -40,21 +50,18 @@ def __init__(self):
)

def run(self, params={}, state={}, custom_config={}): # pylint: disable=unused-argument
self.logger.info(f"TASK: Received State: {state.get(QUERY_CONFIG)}")
existing_state = state.copy()
try:
now_date = datetime.now(tz=timezone.utc).date()
run_condition = self.detect_run_condition(state.get(QUERY_CONFIG, {}), now_date)
self.logger.info(f"TASK: Run state is {run_condition}")
state = self.update_state(state)
page_size, thead_count = self.apply_custom_config(state, run_condition, custom_config)
page_size, thead_count, log_limit = self.apply_custom_config(state, run_condition, custom_config)
max_run_lookback_date = self.get_max_lookback_date(now_date, run_condition, bool(custom_config))
query_config = self.prepare_query_params(state.get(QUERY_CONFIG, {}), max_run_lookback_date, now_date)
logs, query_config = self.get_all_logs(run_condition, query_config, page_size, thead_count)
logs, query_config = self.get_all_logs(run_condition, query_config, page_size, thead_count, log_limit)
self.logger.info(f"TASK: Total logs collected this run {len(logs)}")
logs, log_hashes = compare_and_dedupe_hashes(state.get(LOG_HASHES, []), logs)
self.logger.info(f"TASK: Total logs after deduplication {len(logs)}")
exit_state, has_more_pages = self.prepare_exit_state(state, query_config, now_date, log_hashes)
exit_state, has_more_pages = self.prepare_exit_state(state, query_config, now_date)
return logs, exit_state, has_more_pages, 200, None
except APIException as error:
self.logger.info(
@@ -120,7 +127,7 @@ def apply_custom_config(self, state: Dict, run_type: str, custom_config: Dict =
:param current_query_config:
:param run_type:
:param custom_config:
:return: Page size and thread count
:return: Page size, thread count and log limit
"""
custom_query_config = {}
if custom_config:
@@ -134,7 +141,8 @@ def apply_custom_config(self, state: Dict, run_type: str, custom_config: Dict =
current_query_config[log_type] = {QUERY_DATE: log_query_date}
page_size = max(1, min(custom_config.get(PAGE_SIZE, DEFAULT_PAGE_SIZE), DEFAULT_PAGE_SIZE))
thread_count = max(1, custom_config.get(THREAD_COUNT, DEFAULT_THREAD_COUNT))
return page_size, thread_count
log_limit = custom_config.get(LOG_LIMITS, {})
return page_size, thread_count, log_limit

def prepare_query_params(self, query_config: Dict, max_lookback_date: Dict, now_date: datetime) -> Dict:
"""
@@ -179,35 +187,82 @@ def validate_config_lookback(self, log_type_config: Dict, max_lookback_date: dat
return log_type_config

def get_all_logs(
self, run_condition: str, query_config: Dict, page_size: int, thead_count: int
self, run_condition: str, query_config: Dict, page_size: int, thead_count: int, log_limits: Dict = {}
) -> Tuple[List, Dict]:
"""
Gets all logs of provided log type. First retrieves batch URLs. Then downloads and reads batches, pooling logs.
:param run_condition:
:param query_config:
:param page_size:
:param thead_count:
:param log_limit:
:return: Logs, updated query configuration (state)
"""
complete_logs = []
for log_type, log_type_config in query_config.items():
if (not log_type_config.get(CAUGHT_UP)) or (run_condition != PAGINATION_RUN):
logs, results_next_page, caught_up = self.connection.api.get_siem_logs(
# Receipt logs are much higher volume than others, so should make up the bulk of the logs queried
log_size_limit = LARGE_LOG_SIZE_LIMIT if log_type == RECEIPT else SMALL_LOG_SIZE_LIMIT
if log_limits:
log_size_limit = log_limits.get(log_type, log_size_limit)
logs, results_next_page, caught_up, saved_file, saved_position = self.connection.api.get_siem_logs(
log_type=log_type,
query_date=log_type_config.get(QUERY_DATE),
next_page=log_type_config.get(NEXT_PAGE),
page_size=page_size,
max_threads=thead_count,
starting_url=log_type_config.get(SAVED_FILE_URL),
starting_position=log_type_config.get(SAVED_FILE_POSITION),
log_size_limit=log_size_limit,
)
log_hash_size_limit = LARGE_LOG_HASH_SIZE_LIMIT if log_type == RECEIPT else SMALL_LOG_HASH_SIZE_LIMIT
deduplicated_logs, log_hashes = self.compare_and_dedupe_hashes(
query_config.get(LOG_HASHES, []), logs, log_hash_size_limit
)
self.logger.info(
f"TASK: Number of logs after de-duplication: {len(deduplicated_logs)} for log type {log_type}"
)
complete_logs.extend(deduplicated_logs)
log_type_config.update(
{
NEXT_PAGE: results_next_page,
CAUGHT_UP: caught_up,
LOG_HASHES: log_hashes,
SAVED_FILE_URL: saved_file,
SAVED_FILE_POSITION: saved_position,
}
)
complete_logs.extend(logs)
log_type_config.update({NEXT_PAGE: results_next_page, CAUGHT_UP: caught_up})
else:
self.logger.info(f"TASK: Query for {log_type} is caught up. Skipping as we are currently paginating")
return complete_logs, query_config

def prepare_exit_state(
self, state: dict, query_config: dict, now_date: datetime, log_hashes: List[str]
) -> Tuple[Dict, bool]:
def compare_and_dedupe_hashes(
self, previous_logs_hashes: list, new_logs: list, log_hash_size_limit: int = SMALL_LOG_HASH_SIZE_LIMIT
) -> Tuple[list, list]:
"""
Iterate through two lists of values, hashing each. Compare hash value to a list of existing hash values.
If the hash exists, return both it and the value in separate lists once iterated.
:param previous_logs_hashes: List of existing hashes to compare against.
:type list:
:param new_logs: New values to hash and compare to existing list of hashes.
:type list:
:return: Hex digest of hash.
:rtype: Tuple[list, list]
"""
new_logs_hashes = []
logs_to_return = []
# Limit the amount of log hashes saved in order to reduce state size
log_hash_save_start = len(new_logs) - log_hash_size_limit
new_logs.sort(key=lambda x: x["timestamp"])
for index, log in enumerate(new_logs):
hash_ = hash_sha1(log)
if hash_ not in previous_logs_hashes:
logs_to_return.append(log)
if index >= log_hash_save_start:
new_logs_hashes.append(hash_)
return logs_to_return, new_logs_hashes

def prepare_exit_state(self, state: dict, query_config: dict, now_date: datetime) -> Tuple[Dict, bool]:
"""
Prepare state and pagination for task completion. Format date time.
:param state:
@@ -225,5 +280,4 @@ def prepare_exit_state(
has_more_pages = True
log_type_config[QUERY_DATE] = query_date.strftime(DATE_FORMAT)
state[QUERY_CONFIG] = query_config
state[LOG_HASHES] = log_hashes
return state, has_more_pages
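
For reference, a minimal stand-alone sketch of the hash-window deduplication introduced in compare_and_dedupe_hashes above. The hashing helper is a plain hashlib stand-in for the SDK's hash_sha1, and each log is assumed to be a dict carrying a "timestamp" key, as in the task code:

import hashlib
import json
from typing import Dict, List, Tuple


def dedupe_with_hash_window(
    previous_hashes: List[str], new_logs: List[Dict], hash_limit: int = 100
) -> Tuple[List[Dict], List[str]]:
    # Stand-in for insightconnect_plugin_runtime.helper.hash_sha1 (illustrative only)
    def _hash(log: Dict) -> str:
        return hashlib.sha1(json.dumps(log, sort_keys=True).encode("utf-8")).hexdigest()

    # Sort oldest-to-newest so the retained hash window covers the most recent logs
    new_logs.sort(key=lambda log: log["timestamp"])
    window_start = len(new_logs) - hash_limit
    deduplicated, kept_hashes = [], []
    for index, log in enumerate(new_logs):
        digest = _hash(log)
        if digest not in previous_hashes:
            deduplicated.append(log)
        if index >= window_start:
            # Only the newest `hash_limit` hashes are carried into the next run's state
            kept_hashes.append(digest)
    return deduplicated, kept_hashes

Capping the saved hashes (4800 for receipt logs, 100 for the smaller log types) keeps the task state small while still deduplicating against the most recent window of events.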
116 changes: 99 additions & 17 deletions plugins/mimecast_v2/icon_mimecast_v2/util/api.py
@@ -1,3 +1,4 @@
import functools
import requests
from insightconnect_plugin_runtime.exceptions import (
APIException,
@@ -11,9 +12,10 @@
from io import BytesIO
from icon_mimecast_v2.util.constants import Endpoints
from typing import Dict, List, Tuple
from multiprocessing.dummy import Pool
from multiprocessing.dummy import Manager, Pool
import gzip
import json
from urllib.parse import urlparse, urlunparse

GET = "GET"
POST = "POST"
@@ -40,21 +42,55 @@ def authenticate(self) -> None:
self.logger.info("API: Authenticated")

def get_siem_logs(
self, log_type: str, query_date: str, next_page: str, page_size: int = 100, max_threads: int = 10
self,
log_type: str,
query_date: str,
next_page: str,
page_size: int = 100,
max_threads: int = 10,
starting_url: str = None,
starting_position: int = 0,
log_size_limit: int = 250,
) -> Tuple[List[str], str, bool]:
batch_download_urls, result_next_page, caught_up = self.get_siem_batches(
log_type, query_date, next_page, page_size
)
logs = []
pool_data = self.resume_from_batch(batch_download_urls, starting_url)
self.logger.info(f"API: Getting SIEM logs from batches for log type {log_type}...")
self.logger.info(f"API: Applying page size limit of {page_size}")
log_count = 0
manager = Manager()
saved_file = None
saved_position = None
total_count = manager.Value("i", log_count)
logs = manager.list()
lock = manager.Lock()
with Pool(max_threads) as pool:
batch_logs = pool.imap(self.get_siem_logs_from_batch, batch_download_urls)
for result in batch_logs:
if isinstance(result, (List, Dict)):
logs.extend(result)
result = pool.imap(
functools.partial(
self.get_siem_logs_from_batch, saved_url=starting_url, saved_position=starting_position
),
pool_data,
)
for batch_logs, url in result:
with lock:
batch_logs_count = len(batch_logs)
total_count.value = total_count.value + batch_logs_count
if total_count.value >= log_size_limit:
leftover_logs_count = total_count.value - log_size_limit
batch_logs = batch_logs[: (batch_logs_count - leftover_logs_count)]
logs.extend(batch_logs)
saved_file = self.strip_query_params(url)
saved_position = len(batch_logs)
caught_up = False
result_next_page = next_page
self.logger.info(f"API: Log limit reached for log type {log_type} at {log_size_limit}")
self.logger.info(f"API: Saving file for next run: {saved_file} at line {saved_position}")
self.logger.info(f"API: {leftover_logs_count} left to process in file")
break
logs.extend(batch_logs)
self.logger.info(f"API: Discovered {len(logs)} logs for log type {log_type}")
return logs, result_next_page, caught_up
return logs, result_next_page, caught_up, saved_file, saved_position
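
A stand-alone sketch (illustrative names, not the plugin API) of the per-run cap applied in the loop above: batches are pooled until the limit is hit, the final batch is trimmed, and the file plus offset to resume from are recorded for the next run (the real code also strips query parameters from the saved URL before storing it):

from typing import Dict, Iterable, List, Optional, Tuple


def cap_logs(
    batches: Iterable[Tuple[str, List[Dict]]], limit: int
) -> Tuple[List[Dict], Optional[str], Optional[int]]:
    # batches: iterable of (batch_url, logs_in_that_file) pairs
    collected: List[Dict] = []
    for url, batch in batches:
        remaining = limit - len(collected)
        if len(batch) >= remaining:
            collected.extend(batch[:remaining])
            # Resume the next run from this file, `remaining` lines in
            return collected, url, remaining
        collected.extend(batch)
    # Limit never reached: nothing to resume from
    return collected, None, None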

def get_siem_batches(
self, log_type: str, query_date: str, next_page: str, page_size: int = 100
@@ -79,15 +115,61 @@ def get_siem_batches(
urls = [batch.get("url") for batch in batch_list]
return urls, batch_response.get("@nextPage"), caught_up

def get_siem_logs_from_batch(self, url: str):
response = requests.request(method=GET, url=url, stream=False)
with gzip.GzipFile(fileobj=BytesIO(response.content), mode="rb") as file_:
logs = []
# Iterate over lines in the decompressed file, decode and load the JSON
for line in file_:
decoded_line = line.decode("utf-8").strip()
logs.append(json.loads(decoded_line))
return logs
def resume_from_batch(self, list_of_batches: List[str], saved_url: str) -> Tuple[str, int]:
"""
Attempt to find a previously fully unread file if available, and trim list of URLs to that starting point.
:param list_of_batches:
:param saved_url:
:return list_of_batches: Trimmed list of batches
"""
sub_list = list_of_batches[
next(
(index for index, url in enumerate(list_of_batches) if saved_url and saved_url in url),
len(list_of_batches),
) :
]
if sub_list:
return sub_list
if saved_url:
self.logger.info(f"API: Saved URL {saved_url} not found in list of batches")
self.logger.info("API: Processing entire batch list")
return list_of_batches
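
As a quick illustration of the trimming above, here is a stand-alone equivalent with hypothetical URLs: the list is cut at the first batch URL that contains the saved URL, and falls back to the full list when no match is found:

from typing import List, Optional


def resume_from_batch(batch_urls: List[str], saved_url: Optional[str]) -> List[str]:
    # Index of the first URL containing the saved URL; defaults past the end if absent
    start = next(
        (index for index, url in enumerate(batch_urls) if saved_url and saved_url in url),
        len(batch_urls),
    )
    trimmed = batch_urls[start:]
    return trimmed if trimmed else batch_urls


# Hypothetical URLs, for illustration only
batches = ["https://host/file1.gz?sig=a", "https://host/file2.gz?sig=b"]
print(resume_from_batch(batches, "https://host/file2.gz"))  # ['https://host/file2.gz?sig=b']
print(resume_from_batch(batches, "https://host/stale.gz"))  # both URLs: no match, process everything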

def get_siem_logs_from_batch(self, url: str, saved_url: str, saved_position: int) -> Tuple[List[Dict], str]:
line_start = 0
if saved_url and saved_url in url:
line_start = saved_position

response = requests.request(method="GET", url=url, stream=True)
logs = []
lines_count = 0
buffer = BytesIO()
with gzip.GzipFile(fileobj=buffer, mode="rb") as file_:
for chunk in response.iter_content(chunk_size=8192):
buffer.write(chunk)
buffer.seek(0)
file_.fileobj = buffer
for line in file_:
if lines_count < line_start:
lines_count += 1
continue
try:
decoded_line = line.decode("utf-8").strip()
logs.append(json.loads(decoded_line))
except (json.JSONDecodeError, UnicodeDecodeError) as error:
self.logger.info("API: Invalid JSON or encoding error detected, skipping remainder of file")
self.logger.info(f"API: Error is: {error}")
return logs, url
# Reset buffer to clean state to begin again for next chunk
buffer.seek(0)
buffer.truncate(0)

return logs, url

def strip_query_params(self, url: str) -> str:
parsed_url = urlparse(url)
stripped_url = urlunparse(parsed_url._replace(query=""))
return stripped_url

@rate_limiting(5)
def make_api_request(
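
For comparison, a minimal stand-alone way to stream a gzipped JSON-lines batch without holding the whole file in memory, resuming from a saved line offset. This is an illustrative sketch built on requests' raw stream, not the plugin's exact chunk-buffer implementation in get_siem_logs_from_batch above:

import gzip
import json
from typing import Dict, List

import requests


def stream_gzipped_json_lines(url: str, start_line: int = 0) -> List[Dict]:
    logs: List[Dict] = []
    with requests.get(url, stream=True) as response:
        response.raise_for_status()
        # response.raw is a file-like object, so GzipFile can decompress it incrementally
        with gzip.GzipFile(fileobj=response.raw) as gz:
            for line_number, raw_line in enumerate(gz):
                if line_number < start_line:
                    # Skip lines already consumed by a previous, capped run
                    continue
                try:
                    logs.append(json.loads(raw_line.decode("utf-8").strip()))
                except (json.JSONDecodeError, UnicodeDecodeError):
                    # Mirror the task's handling: stop reading on malformed content
                    break
    return logs

Resuming by line count rather than byte offset keeps the saved state simple, at the cost of re-downloading and skipping the already-processed lines of that one file on the next run.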
3 changes: 2 additions & 1 deletion plugins/mimecast_v2/plugin.spec.yaml
@@ -15,7 +15,7 @@ links:
- "[Mimecast](http://mimecast.com)"
references:
- "[Mimecast API](https://www.mimecast.com/developer/documentation)"
version: 1.0.1
version: 1.0.2
connection_version: 1
supported_versions: ["Mimecast 2.0 API 2025-01-23"]
vendor: rapid7
@@ -38,6 +38,7 @@ hub_tags:
keywords: [mimecast, email, cloud_enabled]
features: []
version_history:
- "1.0.2 - `Monitor SIEM Logs` Limit the amount of logs used to deduplicate logs in subsequent runs"
- "1.0.1 - Update SDK | Improve output for a successful connection test"
- "1.0.0 - Initial plugin"
connection:
2 changes: 1 addition & 1 deletion plugins/mimecast_v2/setup.py
@@ -3,7 +3,7 @@


setup(name="mimecast_v2-rapid7-plugin",
version="1.0.1",
version="1.0.2",
description="[Mimecast](https://www.mimecast.com) is a set of cloud services designed to provide next generation protection against advanced email-borne threats such as malicious URLs, malware, impersonation attacks, as well as internally generated threats, with a focus on email security. This plugin utilizes the [Mimecast API](https://www.mimecast.com/developer/documentation)",
author="rapid7",
author_email="",