[Carbon Black Cloud] Observability Window Restraint (#2877) #2891

Merged (2 commits) on Nov 4, 2024
6 changes: 3 additions & 3 deletions plugins/carbon_black_cloud/.CHECKSUM
@@ -1,7 +1,7 @@
{
"spec": "66318d167ac1b5d373d20192eeeabf6c",
"manifest": "8edbe16f85abc5a43d8e13a86d5e0f4e",
"setup": "1aef9354d76d5741f137fe1af55f02ff",
"spec": "e65a2ba63a0336dd71483fe60f8fc09e",
"manifest": "cc99bad588629becc537d4e9726b339c",
"setup": "a3be16b44e39ce0215df20244b1b719d",
"schemas": [
{
"identifier": "get_agent_details/schema.py",
2 changes: 1 addition & 1 deletion plugins/carbon_black_cloud/bin/icon_carbon_black_cloud
@@ -6,7 +6,7 @@ from sys import argv

Name = "VMware Carbon Black Cloud"
Vendor = "rapid7"
Version = "2.2.6"
Version = "2.2.7"
Description = "The [VMware Carbon Black Cloud](https://www.carbonblack.com/products/vmware-carbon-black-cloud/) is a cloud-native endpoint protection platform (EPP) that combines the intelligent system hardening and behavioral prevention needed to keep emerging threats at bay, using a single lightweight agent and an easy-to-use console. Manage and contain threats on your Carbon Black endpoints using this plugin"


1 change: 1 addition & 0 deletions plugins/carbon_black_cloud/help.md
@@ -440,6 +440,7 @@ Example output:

# Version History

* 2.2.7 - Restrain the observability window to a configurable amount if data collection falls behind
* 2.2.6 - Update SDK to 6.1.4
* 2.2.5 - To split the PAGE_SIZE limit into ALERT_PAGE_SIZE and OBSERVATION_PAGE_SIZE
* 2.2.4 - Add new connection tests for tasks | Update SDK to 6.1.0
@@ -3,7 +3,7 @@

# Custom imports below

from datetime import datetime, timedelta, timezone
from datetime import datetime, timedelta, timezone, tzinfo
from typing import Dict, Tuple, Any

from icon_carbon_black_cloud.util.helper_util import hash_sha1
@@ -15,20 +15,22 @@
TIME_FORMAT,
)

# State held values
# State values
RATE_LIMITED = "rate_limited_until"
LAST_ALERT_TIME = "last_alert_time"
LAST_ALERT_HASHES = "last_alert_hashes"
LAST_OBSERVATION_TIME = "last_observation_time"
LAST_OBSERVATION_HASHES = "last_observation_hashes"
LAST_OBSERVATION_JOB = "last_observation_job"
LAST_OBSERVATION_JOB_TIME = "last_observation_job_time"
OBSERVATION_QUERY_END_TIME = "observation_end_time"
OBSERVATION_JOB_OFFSET = "observation_job_offset"

# CB can return up to 10K results per API call and suggests querying again from the last event time when more remain.
# To prevent overloading IDR/PIF, lower this limit on each endpoint (defaults below).
# These values can also be customised via CPS with the alert_page_size and observation_page_size properties.
ALERT_PAGE_SIZE_DEFAULT = 2500
OBSERVATION_PAGE_SIZE_DEFAULT = 2500
ALERT_PAGE_SIZE_DEFAULT = 200
OBSERVATION_PAGE_SIZE_DEFAULT = 7300
OBSERVATION_WINDOW = 3

DEFAULT_LOOKBACK = 5 # first look back time in minutes
MAX_LOOKBACK = 7 # allows saved state to be within 7 days to auto recover from an error
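
Reviewer note: the flat 2.5K default is replaced by asymmetric limits of 200 alerts and 7,300 observations per page, plus a new 3-hour observation window. A back-of-envelope bound on rows per run under these defaults (the 5-minute cadence is an assumption for illustration, not something the plugin configures):

```python
# Assumed cadence for illustration only; the real schedule is set by the platform.
RUNS_PER_HOUR = 12  # one run every 5 minutes

ALERT_PAGE_SIZE_DEFAULT = 200
OBSERVATION_PAGE_SIZE_DEFAULT = 7300

# Worst case: one full alert page plus one full observation page per run.
per_run = ALERT_PAGE_SIZE_DEFAULT + OBSERVATION_PAGE_SIZE_DEFAULT
print(per_run, per_run * RUNS_PER_HOUR)  # 7500 90000
```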
@@ -64,28 +66,45 @@ def run(self, params={}, state={}, custom_config={}): # pylint: disable=unused-
# Force 'now' to be 15 minutes before now as CB Analytics alerts can be updated for up to 15 minutes
# following the original backend_timestamp, after which time the alert is considered immutable.
now = now_time - timedelta(minutes=15)
end_time = now.strftime(TIME_FORMAT)

# Check if we have made use of custom config to change the start times from DEFAULT_LOOKBACK
alerts_start, observations_start, alert_page_size, observation_page_size, debug = self._parse_custom_config(
custom_config, now, state
(
alerts_start,
observations_start,
alert_page_size,
observation_page_size,
observation_window,
debug,
) = self._parse_custom_config(custom_config, now, state)

# calculate end window
calculated_end_window = datetime.strptime(observations_start, TIME_FORMAT).astimezone(
timezone.utc
) + timedelta(hours=observation_window)
alert_end_time = now.strftime(TIME_FORMAT)
observability_end_time = (
now.strftime(TIME_FORMAT)
if now < calculated_end_window
else calculated_end_window.strftime(TIME_FORMAT)
)

# Retrieve job ID from last run or trigger a new one
observation_job_id = state.get(LAST_OBSERVATION_JOB)
if not observation_job_id:
self.logger.info("No observation job ID found in state, triggering a new job...")
observation_job_id, state = self.trigger_observation_search_job(
observations_start, end_time, observation_page_size, debug, state
observations_start, observability_end_time, observation_page_size, debug, state
)

alerts, alert_has_more_pages, state = self.get_alerts(alerts_start, end_time, alert_page_size, debug, state)
alerts, alert_has_more_pages, state = self.get_alerts(
alerts_start, alert_end_time, alert_page_size, debug, state
)
alerts_and_observations.extend(alerts)
alerts_success = True

if observation_job_id:
observations, observations_has_more_pages, state = self.get_observations(
observation_job_id, observation_page_size, debug, state
observation_job_id, observation_page_size, debug, state, now
)
alerts_and_observations.extend(observations)
if observations_has_more_pages or alert_has_more_pages:
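
Reviewer note: the capping logic above is the heart of this PR; when collection falls behind, the observation query never spans more than `observation_window` hours past the saved start. A minimal standalone sketch of the same rule, assuming a `TIME_FORMAT` compatible with the plugin's timestamps:

```python
from datetime import datetime, timedelta, timezone

# Assumed to be compatible with the plugin's TIME_FORMAT; illustrative only.
TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"

def cap_observation_end(observations_start: str, now: datetime, window_hours: int) -> str:
    """Cap the query end time so a single run never spans more than window_hours."""
    start = datetime.strptime(observations_start, TIME_FORMAT).replace(tzinfo=timezone.utc)
    window_end = start + timedelta(hours=window_hours)
    return min(now, window_end).strftime(TIME_FORMAT)

now = datetime(2024, 11, 4, 12, 0, tzinfo=timezone.utc)
# Collection is 10 hours behind, so the end is capped 3 hours after the start:
print(cap_observation_end("2024-11-04T02:00:00.000Z", now, 3))  # 2024-11-04T05:00:00.000000Z
```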
@@ -154,18 +173,21 @@ def trigger_observation_search_job(
self, start_time: str, end_time: str, page_size: int, debug: bool, state: Dict[str, str]
) -> Tuple[str, Dict]:
endpoint = f"api/investigate/v2/orgs/{self.connection.org_key}/observations/search_jobs"
index = state.get(OBSERVATION_JOB_OFFSET, 0)
search_params = {
"rows": page_size,
"start": 0,
"start": index,
"fields": ["*"],
"criteria": {"observation_type": OBSERVATION_TYPES},
"sort": [{"field": OBSERVATION_TIME_FIELD, "order": "asc"}],
"time_range": {"start": start_time, "end": end_time},
}
url = f"{self.connection.base_url}/{endpoint}"
self.logger.info(f"Triggering observation search using parameters {search_params['time_range']}")
self.logger.info("Triggering observation search", time_start=start_time, time_end=end_time, start=index)
observation_job_id = self.connection.request_api(url, search_params, debug=debug).get("job_id")

state[OBSERVATION_QUERY_END_TIME] = end_time

if observation_job_id:
self.logger.info(
f"Saving observation job ID {observation_job_id} to the state. "
@@ -183,7 +205,7 @@ def trigger_observation_search_job(
return observation_job_id, state

def get_observations(
self, job_id: str, page_size: int, debug: bool, state: Dict[str, str]
self, job_id: str, page_size: int, debug: bool, state: Dict[str, Any], now: datetime
) -> Tuple[list, bool, Dict[str, str]]:
observations, has_more_pages = [], False
endpoint = f"api/investigate/v2/orgs/{self.connection.org_key}/observations/search_jobs/{job_id}/results"
@@ -197,10 +219,11 @@ def get_observations(
if job_completed or job_time_exceeded:
# only collect observations once the job is completed; otherwise the results are partial and unsorted
observations = observation_json.get("results", [])

start_observation_time = state.get(LAST_OBSERVATION_TIME)
if observations:
# pass the start time saved in state - on one occasion the CB API returned an observation with a
# device_timestamp before the queried window, in which case it can't be used as the start time
start_observation_time = state.get(LAST_OBSERVATION_TIME)
observations, state = self._dedupe_and_get_last_time(observations, state, start_observation_time)

num_found = observation_json.get("num_found")
@@ -209,9 +232,29 @@
f"More data is available on the API (num_found={num_found}) - setting has_more_pages=True..."
)
has_more_pages = True

# use `if not observations` rather than `else` so we also catch the case where every observation was deduped
if not observations:
state[LAST_OBSERVATION_TIME] = state.get(OBSERVATION_QUERY_END_TIME)

if not has_more_pages:
end_of_window = state.get(OBSERVATION_QUERY_END_TIME)
has_more_pages = datetime.strptime(end_of_window, TIME_FORMAT).replace(tzinfo=timezone.utc) != now

# remove the job ID as this job is complete; next run should trigger a new one
del state[LAST_OBSERVATION_JOB]
del state[LAST_OBSERVATION_JOB_TIME]

# if LAST_OBSERVATION_TIME hasn't moved forward then we need to query the same time frame again:
# observations can arrive before the start time, and can occur so frequently that a full page
# is returned without the time moving forward
if state.get(LAST_OBSERVATION_TIME) <= start_observation_time:
state[OBSERVATION_JOB_OFFSET] = state.get(OBSERVATION_JOB_OFFSET, 0) + page_size
state[LAST_OBSERVATION_TIME] = start_observation_time
has_more_pages = True
else:
state.pop(OBSERVATION_JOB_OFFSET, None)

else:
self.logger.info("Job is not yet finished running, will get results in next task execution...")
has_more_pages = True # trigger again as it should be finished imminently (jobs run for a max of 3 minutes)
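
Reviewer note: the offset branch above handles the pathological case where a full page of observations shares one timestamp, so the saved time cannot advance; rather than re-reading the same rows, the job offset is pushed forward. A condensed sketch of that decision, assuming the same state keys (lower-cased string literals stand in for the module constants):

```python
from typing import Any, Dict

def advance_window(state: Dict[str, Any], start_time: str, page_size: int) -> bool:
    """Return True when the same window must be queried again at a deeper offset."""
    if state.get("last_observation_time") <= start_time:
        # Time did not move forward: page deeper into the same window.
        state["observation_job_offset"] = state.get("observation_job_offset", 0) + page_size
        state["last_observation_time"] = start_time
        return True
    state.pop("observation_job_offset", None)  # window advanced; reset paging
    return False

state = {"last_observation_time": "2024-11-04T02:00:00.000Z"}
assert advance_window(state, "2024-11-04T02:00:00.000Z", 7300)
assert state["observation_job_offset"] == 7300
```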
@@ -240,7 +283,7 @@ def _dedupe_and_get_last_time(
alert_time = alert.get(time_key)
# Observations quirk: the API can return an alert time < the job search's start_time. Such a record should
# belong to the previous run, but return it anyway as it won't be held in the hash values.
if (alert_time == start_time and hash_sha1(alert) not in old_hashes) or alert_time < start_time:
if alert_time <= start_time and hash_sha1(alert) not in old_hashes:
deduped_alerts.append(alert)
elif alert_time > start_time:
deduped_alerts += alerts[index:] # we've gone past start time, keep the rest of the alerts
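
Reviewer note: the simplified condition now hashes any record at or before the saved start time, not only records exactly on the boundary, so early-arriving out-of-window records are no longer emitted twice. A minimal sketch of the boundary rule (`hash_sha1` here is a stand-in for the plugin's helper, and records are assumed sorted ascending by timestamp):

```python
import hashlib
import json
from typing import Any, Dict, List

def hash_sha1(record: Dict[str, Any]) -> str:
    # Stand-in for the plugin's helper: a stable hash of the whole record.
    return hashlib.sha1(json.dumps(record, sort_keys=True).encode()).hexdigest()

def dedupe(alerts: List[Dict[str, Any]], start_time: str, old_hashes: List[str]) -> List[Dict[str, Any]]:
    deduped: List[Dict[str, Any]] = []
    for index, alert in enumerate(alerts):
        alert_time = alert.get("device_timestamp")
        if alert_time <= start_time and hash_sha1(alert) not in old_hashes:
            deduped.append(alert)  # boundary record not seen last run
        elif alert_time > start_time:
            deduped += alerts[index:]  # past the boundary: keep the rest
            break
    return deduped

alerts = [
    {"device_timestamp": "2024-11-04T02:00:00.000Z", "id": 1},
    {"device_timestamp": "2024-11-04T02:05:00.000Z", "id": 2},
]
# The boundary record was seen last run, so only id=2 survives:
print(dedupe(alerts, "2024-11-04T02:00:00.000Z", [hash_sha1(alerts[0])]))
```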
@@ -264,7 +307,7 @@ def _dedupe_and_get_last_time(

def _parse_custom_config(
self, custom_config: Dict[str, Any], now: datetime, saved_state: Dict[str, str]
) -> Tuple[str, str, int, int, bool]:
) -> Tuple[str, str, int, int, int, bool]:
"""
Takes custom config from CPS, allowing a new start time to be specified for alerts or observations,
and the page sizes and observation window to be customised.
@@ -282,6 +325,9 @@ def _parse_custom_config(
alert_page_size = custom_config.get("alert_page_size", ALERT_PAGE_SIZE_DEFAULT)
observation_page_size = custom_config.get("observation_page_size", OBSERVATION_PAGE_SIZE_DEFAULT)

# window for observations
observation_window = custom_config.get("observation_window", OBSERVATION_WINDOW)

# this flag will be used to allow logging of request times for debugging
debug = custom_config.get("debug", False)
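
Reviewer note: `observation_window` joins the existing CPS overrides. A hypothetical custom_config showing the knobs visible in this hunk (values are examples, not recommendations):

```python
custom_config = {
    "alert_page_size": 200,         # falls back to ALERT_PAGE_SIZE_DEFAULT
    "observation_page_size": 7300,  # falls back to OBSERVATION_PAGE_SIZE_DEFAULT
    "observation_window": 6,        # hours; falls back to OBSERVATION_WINDOW (3)
    "debug": True,                  # log request timings for debugging
}
```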

@@ -305,6 +351,7 @@
if comparison_date > saved_time:
self.logger.info(f"Saved time ({saved_time}) exceeds cut off, moving to ({comparison_date}).")
state[cb_type_time] = comparison_date
state.pop(OBSERVATION_JOB_OFFSET, None)

alerts_start = state.get(LAST_ALERT_TIME)
observation_start = state.get(LAST_OBSERVATION_TIME)
@@ -313,7 +360,7 @@
f"{log_msg}Applying the following start times: alerts='{alerts_start}' "
f"and observations='{observation_start}'. Max pages: alert_page_size='{alert_page_size}, observation_page_size='{observation_page_size}'."
)
return alerts_start, observation_start, alert_page_size, observation_page_size, debug
return alerts_start, observation_start, alert_page_size, observation_page_size, observation_window, debug

def _check_if_job_time_exceeded(self, job_start_time: str, job_id: str) -> bool:
"""
3 changes: 2 additions & 1 deletion plugins/carbon_black_cloud/plugin.spec.yaml
@@ -4,7 +4,7 @@ products: [insightconnect]
name: carbon_black_cloud
title: VMware Carbon Black Cloud
description: The [VMware Carbon Black Cloud](https://www.carbonblack.com/products/vmware-carbon-black-cloud/) is a cloud-native endpoint protection platform (EPP) that combines the intelligent system hardening and behavioral prevention needed to keep emerging threats at bay, using a single lightweight agent and an easy-to-use console. Manage and contain threats on your Carbon Black endpoints using this plugin
version: 2.2.6
version: 2.2.7
vendor: rapid7
support: rapid7
cloud_ready: true
@@ -18,6 +18,7 @@ requirements:
- API Credentials
- Base URL
version_history:
- "2.2.7 - Restrain the observability window to a configurable amount if data collection falls behind"
- "2.2.6 - Update SDK to 6.1.4"
- "2.2.5 - To split the PAGE_SIZE limit into ALERT_PAGE_SIZE and OBSERVATION_PAGE_SIZE"
- "2.2.4 - Add new connection tests for tasks | Update SDK to 6.1.0"
2 changes: 1 addition & 1 deletion plugins/carbon_black_cloud/setup.py
@@ -3,7 +3,7 @@


setup(name="carbon_black_cloud-rapid7-plugin",
version="2.2.6",
version="2.2.7",
description="The [VMware Carbon Black Cloud](https://www.carbonblack.com/products/vmware-carbon-black-cloud/) is a cloud-native endpoint protection platform (EPP) that combines the intelligent system hardening and behavioral prevention needed to keep emerging threats at bay, using a single lightweight agent and an easy-to-use console. Manage and contain threats on your Carbon Black endpoints using this plugin",
author="rapid7",
author_email="",
@@ -0,0 +1,93 @@
{
"results": [
{
"backend_timestamp": "2024-04-25T15:37:38.389Z",
"device_group_id": 0,
"device_id": 12345,
"device_name": "device/name",
"device_policy_id": 123,
"device_timestamp": "2024-04-25T15:41:00.000Z",
"enriched": true,
"enriched_event_type": [
"NETWORK"
],
"event_description": "event description",
"event_id": "event-id-1234-5",
"event_type": "netconn",
"ingress_time": 1714035021905,
"legacy": true,
"netconn_inbound": false,
"netconn_ipv4": 56789,
"netconn_local_ipv4": 9877654,
"netconn_location": ",,Reserved",
"netconn_port": 123,
"observation_description": "observation description",
"netconn_protocol": "PROTO_TCP",
"observation_id": "observation-id-1234",
"observation_type": "CONTEXTUAL_ACTIVITY",
"org_id": "org-id-123",
"parent_guid": "parent-guid-7890",
"parent_pid": 123,
"process_guid": "process-gui-1234",
"process_hash": [
"hash-1234",
"hash-6780"
],
"process_name": "c:\\windows\\system32\\process.exe",
"process_pid": [
1234
],
"process_username": [
"NT AUTHORITY\\SYSTEM"
]
},
{
"backend_timestamp": "2024-04-25T15:39:38.389Z",
"device_group_id": 0,
"device_id": 12345,
"device_name": "device/name",
"device_policy_id": 123,
"device_timestamp": "2024-04-25T15:43:00.000Z",
"enriched": true,
"enriched_event_type": [
"NETWORK"
],
"event_description": "event description",
"event_id": "event-id-1234-5",
"event_type": "netconn",
"ingress_time": 1714035021905,
"legacy": true,
"netconn_inbound": false,
"netconn_ipv4": 56789,
"netconn_local_ipv4": 9877654,
"netconn_location": ",,Reserved",
"netconn_port": 123,
"observation_description": "observation description",
"netconn_protocol": "PROTO_TCP",
"observation_id": "observation-id-1234",
"observation_type": "CONTEXTUAL_ACTIVITY",
"org_id": "org-id-123",
"parent_guid": "parent-guid-7890",
"parent_pid": 123,
"process_guid": "process-gui-1234",
"process_hash": [
"hash-1234",
"hash-6780"
],
"process_name": "c:\\windows\\system32\\process.exe",
"process_pid": [
1234
],
"process_username": [
"NT AUTHORITY\\SYSTEM"
]
}
],
"num_found": 2,
"num_available": 2,
"approximate_unaggregated": 392,
"num_aggregated": 392,
"contacted": 3,
"completed": 3,
"message": ""
}