Skip to content

Commit

Permalink
PLGN-398: filter out any previously returned log events/update time f…
Browse files Browse the repository at this point in the history
…ormat
  • Loading branch information
joneill-r7 committed Oct 4, 2023
1 parent aa76ba0 commit 80dda5b
Show file tree
Hide file tree
Showing 11 changed files with 272 additions and 19 deletions.
6 changes: 3 additions & 3 deletions plugins/okta/.CHECKSUM
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"spec": "2a124bebc54c6ed4bbefa8caa038ce0f",
"manifest": "96a067bc255b4bd4eab6fb3dfae79354",
"setup": "f41cee058863b3ed32c6048d876f1ea1",
"spec": "3ad1604efd5761d7129ee19e728efb5d",
"manifest": "ae6c3d90c00d8b25576f218a99b62763",
"setup": "f0f0aa00f602f9aa8da621bfe91e7107",
"schemas": [
{
"identifier": "add_user_to_group/schema.py",
Expand Down
2 changes: 1 addition & 1 deletion plugins/okta/bin/komand_okta
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ from sys import argv

Name = "Okta"
Vendor = "rapid7"
Version = "4.2.0"
Version = "4.2.1"
Description = "Secure identity management and single sign-on to any application"


Expand Down
1 change: 1 addition & 0 deletions plugins/okta/help.md
Original file line number Diff line number Diff line change
Expand Up @@ -1612,6 +1612,7 @@ by Okta themselves, or constructed by the plugin based on the information it has

# Version History

* 4.2.1 - Monitor Logs task: filter previously returned log events and update timestamp format.
* 4.2.0 - Monitor Logs task: return raw logs data without cleaning and use last log time as checkpoint in time for next run.
* 4.1.1 - Monitor Logs task: strip http/https in hostname
* 4.1.0 - New action Get User Groups | Update to latest SDK version
Expand Down
60 changes: 55 additions & 5 deletions plugins/okta/komand_okta/tasks/monitor_logs/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ def run(self, params={}, state={}): # pylint: disable=unused-argument
parameters = {}
try:
now = self.get_current_time() - timedelta(minutes=1) # allow for latency of this being triggered
now_iso = now.isoformat()
now_iso = self.get_iso(now)
next_page_link = state.get(self.NEXT_PAGE_LINK)
if not state:
self.logger.info("First run")
last_24_hours = now - timedelta(hours=24)
parameters = {"since": last_24_hours.isoformat(), "until": now_iso, "limit": 1000}
parameters = {"since": self.get_iso(last_24_hours), "until": now_iso, "limit": 1000}
state[self.LAST_COLLECTION_TIMESTAMP] = now_iso
else:
if next_page_link:
Expand All @@ -49,7 +49,8 @@ def run(self, params={}, state={}): # pylint: disable=unused-argument
if not next_page_link
else self.connection.api_client.get_next_page(next_page_link)
)
next_page_link, new_logs = self.get_next_page_link(new_logs_resp.headers), new_logs_resp.json()
next_page_link = self.get_next_page_link(new_logs_resp.headers)
new_logs = self.get_events(new_logs_resp.json(), state.get(self.LAST_COLLECTION_TIMESTAMP))
if next_page_link:
state[self.NEXT_PAGE_LINK] = next_page_link
has_more_pages = True
Expand All @@ -64,22 +65,71 @@ def run(self, params={}, state={}): # pylint: disable=unused-argument
def get_current_time():
return datetime.now(timezone.utc)

@staticmethod
def get_iso(time: datetime) -> str:
"""
Match the timestamp format used in old collector code and the format that Okta uses for 'published' to allow
comparison in `get_events`. e.g. '2023-10-02T15:43:51.450Z'
:param time: newly formatted time string value.
:return: formatted time string.
"""
return time.isoformat("T", "milliseconds").replace("+00:00", "Z")

@staticmethod
def get_next_page_link(headers: dict) -> str:
"""
Find the next page of results link from the response headers. Header example: `link: <url>; rel="next"`
:param headers: response headers from the request to Okta.
:return: next page link if available.
"""
links = headers.get("link").split(", ")
next_link = None
for link in links:
matched_link = re.match("<(.*?)>", link) if 'rel="next"' in link else None
next_link = matched_link.group(1) if matched_link else None
return next_link

def get_events(self, logs: list, time: str) -> list:
"""
In the collector code we would iterate over all events and drop any that match the 'since' parameter to make
sure that we don't double ingest the same event from the previous run (see `get_last_collection_timestamp`).
:param logs: response json including all returned events from Okta.
:param time: 'since' parameter being used to query Okta.
:return: filtered_logs: removed any events that matched the query start time.
"""

# If Okta returns only 1 event (no new events occurred) returned in a previous run we want to remove this.
if len(logs) == 1 and logs[0].get("published") == time:
self.logger.info("No new events found since last execution.")
return []

log = "Returning {filtered} log event(s) from this iteration. "
pop_index, filtered_logs = 0, logs

for index, event in enumerate(logs):
published = event.get("published")
if published and published > time:
pop_index = index
break
if pop_index:
filtered_logs = logs[pop_index:]
log += f"Removed {pop_index} event log(s) that should have been returned in previous iteration."
self.logger.info(log.format(filtered=len(filtered_logs)))
return filtered_logs

def get_last_collection_timestamp(self, now: str, new_logs: list) -> str:
"""
Mirror the behaviour in collector code to save the TS of the last parsed event as the 'since' time checkpoint.
:param now: default value to use if no event logs returned to save the time from.
:param new_logs: event logs returned from Okta.
:return: new time value to save as the checkpoint to query 'since' on the next run.
"""
new_ts = ""
# Mirror the behaviour in collector code to save the TS of the last parsed event as the 'since' time checkpoint.
if new_logs: # make sure that logs were returned from Okta otherwise will get index error
new_ts = new_logs[-1].get("published")
self.logger.info(f"Saving the last record's published timestamp ({new_ts}) as checkpoint.")
if not new_ts:
self.logger.warn(f'No published record to use as last timestamp, reverting to use "now" ({now})')
self.logger.warning(f'No published record to use as last timestamp, reverting to use "now" ({now})')
new_ts = now

return new_ts
2 changes: 1 addition & 1 deletion plugins/okta/plugin.spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ sdk:
version: 5
user: nobody
description: Secure identity management and single sign-on to any application
version: 4.2.0
version: 4.2.1
connection_version: 4
resources:
source_url: https://github.com/rapid7/insightconnect-plugins/tree/master/plugins/okta
Expand Down
2 changes: 1 addition & 1 deletion plugins/okta/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@


setup(name="okta-rapid7-plugin",
version="4.2.0",
version="4.2.1",
description="Secure identity management and single sign-on to any application",
author="rapid7",
author_email="",
Expand Down
81 changes: 81 additions & 0 deletions plugins/okta/unit_test/expected/get_logs_filtered.json.exp
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
{
"logs": [
{
"actor": {
"id": "12345",
"type": "User",
"alternateId": "user@example.com",
"displayName": "User 2"
},
"client": {
"userAgent": {
"rawUserAgent": "python-requests/2.26.0",
"os": "Unknown",
"browser": "UNKNOWN"
},
"zone": "null",
"device": "Unknown",
"ipAddress": "198.51.100.1",
"geographicalContext": {}
},
"authenticationContext": {
"externalSessionId": "12345"
},
"displayMessage": "Clear user session",
"eventType": "user.session.clear",
"outcome": {
"result": "SUCCESS"
},
"published": "2023-04-27T07:49:21.777Z",
"securityContext": {
"asNumber": 12345,
"asOrg": "test",
"isp": "test",
"domain": "example.com",
"isProxy": false
},
"severity": "INFO",
"debugContext": {
"debugData": {
"requestId": "12345",
"dtHash": "11111ecd0ecfb444ee1fcb9687ba8b174a3c8d251ce927e6016b871bc0222222",
"requestUri": "/api/v1/users/12345/lifecycle/suspend",
"url": "/api/v1/users/12345/lifecycle/suspend?"
}
},
"legacyEventType": "core.user_auth.session_clear",
"transaction": {
"type": "WEB",
"id": "12345",
"detail": {
"requestApiTokenId": "12345"
}
},
"uuid": "9de5069c-5afe-602b-2ea0-a04b66beb2c0",
"version": "0",
"request": {
"ipChain": [
{
"ip": "198.51.100.1",
"geographicalContext": {},
"version": "V4"
}
]
},
"target": [
{
"id": "12345",
"type": "User",
"alternateId": "user@example.com",
"displayName": "User Test123"
}
]
}
],
"state": {
"last_collection_timestamp": "2023-04-27T07:49:21.777Z",
"next_page_link": "https://example.com/nextLink?q=next"
},
"has_more_pages": true,
"status_code": 200
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{
"last_collection_timestamp": "2023-04-27T08:33:46"
"last_collection_timestamp": "2023-04-27T08:33:46.123Z"
}
73 changes: 73 additions & 0 deletions plugins/okta/unit_test/responses/get_logs_single_event.json.resp
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
[
{
"actor": {
"id": "12345",
"type": "User",
"alternateId": "user@example.com",
"displayName": "User 2"
},
"client": {
"userAgent": {
"rawUserAgent": "python-requests/2.26.0",
"os": "Unknown",
"browser": "UNKNOWN"
},
"zone": "null",
"device": "Unknown",
"ipAddress": "198.51.100.1",
"geographicalContext": {}
},
"authenticationContext": {
"externalSessionId": "12345"
},
"displayMessage": "Clear user session",
"eventType": "user.session.clear",
"outcome": {
"result": "SUCCESS"
},
"published": "2023-04-27T07:49:21.777Z",
"securityContext": {
"asNumber": 12345,
"asOrg": "test",
"isp": "test",
"domain": "example.com",
"isProxy": false
},
"severity": "INFO",
"debugContext": {
"debugData": {
"requestId": "12345",
"dtHash": "11111ecd0ecfb444ee1fcb9687ba8b174a3c8d251ce927e6016b871bc0222222",
"requestUri": "/api/v1/users/12345/lifecycle/suspend",
"url": "/api/v1/users/12345/lifecycle/suspend?"
}
},
"legacyEventType": "core.user_auth.session_clear",
"transaction": {
"type": "WEB",
"id": "12345",
"detail": {
"requestApiTokenId": "12345"
}
},
"uuid": "9de5069c-5afe-602b-2ea0-a04b66beb2c0",
"version": "0",
"request": {
"ipChain": [
{
"ip": "198.51.100.1",
"geographicalContext": {},
"version": "V4"
}
]
},
"target": [
{
"id": "12345",
"type": "User",
"alternateId": "user@example.com",
"displayName": "User Test123"
}
]
}
]
50 changes: 47 additions & 3 deletions plugins/okta/unit_test/test_monitor_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@
from unittest import TestCase
from komand_okta.tasks.monitor_logs.task import MonitorLogs
from util import Util
from unittest.mock import patch
from unittest.mock import patch, call
from parameterized import parameterized
from datetime import datetime
from datetime import datetime, timezone


@patch(
"komand_okta.tasks.monitor_logs.task.MonitorLogs.get_current_time",
return_value=datetime.strptime("2023-04-28T08:34:46", "%Y-%m-%dT%H:%M:%S"),
return_value=datetime(2023, 4, 28, 8, 34, 46, 123156, timezone.utc),
)
@patch("requests.request", side_effect=Util.mock_request)
class TestMonitorLogs(TestCase):
Expand Down Expand Up @@ -45,3 +45,47 @@ def test_monitor_logs(self, mock_request, mock_get_time, test_name, current_stat
self.assertEqual(actual, expected.get("logs"))
self.assertEqual(actual_state, expected.get("state"))
self.assertEqual(has_more_pages, expected.get("has_more_pages"))

@patch("logging.Logger.info")
def test_monitor_logs_filters_events(self, mocked_logger, *mocks):
# Test the filtering of events returned in a previous iteration. Workflow being tested:
# 1. C2C executed and queried for events until 8am however the last event time was '2023-04-27T07:49:21.764Z'
# 2. The next execution will use this timestamp, meaning the last event will be returned again from Okta.
# 3. This duplicate event should be removed so that it is not returned to IDR again.

current_state = {"last_collection_timestamp": "2023-04-27T07:49:21.764Z"}
expected = Util.read_file_to_dict("expected/get_logs_filtered.json.exp")
actual, actual_state, has_more_pages, status_code, error = self.action.run(state=current_state)
self.assertEqual(actual_state, expected.get("state"))
self.assertEqual(has_more_pages, expected.get("has_more_pages"))

# make sure that the mocked response contained 2 log entries and that 1 is filtered out in `get_events`
expected_logs = expected.get("logs")
logger_call = call(
"Returning 1 log event(s) from this iteration. Removed 1 event log(s) that should have "
"been returned in previous iteration."
)

self.assertIn(logger_call, mocked_logger.call_args_list)
self.assertEqual(len(actual), len(expected_logs))
self.assertEqual(actual, expected_logs)

@patch("logging.Logger.info")
@patch("logging.Logger.warning")
def test_monitor_logs_filters_single_event(self, mocked_warn_log, mocked_info_log, *mocks):
# Test filtering when a single event is returned that was in the previous iteration.

now = "2023-04-28T08:33:46.123Z" # Mocked value of 'now' - 1 minute
current_state = {"last_collection_timestamp": "2023-04-27T07:49:21.777Z"} # TS of the event in mocked response
expected = {"last_collection_timestamp": now}
actual, actual_state, has_more_pages, status_code, error = self.action.run(state=current_state)
self.assertEqual(actual_state, expected)
self.assertEqual(has_more_pages, False) # empty results so no next pages

# make sure that the mocked response contained a single entry that we discarded and logged this happening
logger_info_call = call("No new events found since last execution.")
logger_warn_call = call(f'No published record to use as last timestamp, reverting to use "now" ({now})')

self.assertIn(logger_info_call, mocked_info_log.call_args_list)
self.assertIn(logger_warn_call, mocked_warn_log.call_args_list)
self.assertEqual(actual, []) # no events returned after filtering
12 changes: 8 additions & 4 deletions plugins/okta/unit_test/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,14 @@ def json(self):
global first_request

if url == "https://example.com/api/v1/logs":
if params == {"since": "2023-04-27T08:33:46", "until": "2023-04-28T08:33:46", "limit": 1000}:
return MockResponse(
200, "get_logs.json.resp", {"link": '<https://example.com/nextLink?q=next> rel="next"'}
)
resp_args = {
"status_code": 200,
"filename": "get_logs.json.resp",
"headers": {"link": '<https://example.com/nextLink?q=next> rel="next"'},
}
if params.get("since") == "2023-04-27T07:49:21.777Z":
resp_args["filename"], resp_args["headers"] = "get_logs_single_event.json.resp", {"link": ""}
return MockResponse(**resp_args)
if url == "https://example.com/nextLink?q=next":
return MockResponse(200, "get_logs_next_page.json.resp", {"link": ""})
if url == "https://example.com/api/v1/groups/12345/users" and first_request:
Expand Down

0 comments on commit 80dda5b

Please sign in to comment.