Skip to content

Commit

Permalink
PLGN-398: only update collection timestamp when new events returned.
Browse files Browse the repository at this point in the history
  • Loading branch information
joneill-r7 committed Oct 9, 2023
1 parent 3a137b8 commit 9360dfc
Show file tree
Hide file tree
Showing 11 changed files with 102 additions and 39 deletions.
2 changes: 1 addition & 1 deletion plugins/okta/help.md
Original file line number Diff line number Diff line change
Expand Up @@ -1612,7 +1612,7 @@ by Okta themselves, or constructed by the plugin based on the information it has

# Version History

* 4.2.1 - Monitor Logs task: filter previously returned log events and update timestamp format.
* 4.2.1 - Monitor Logs task: filter previously returned log events | only update time checkpoint when an event is returned | update timestamp format.
* 4.2.0 - Monitor Logs task: return raw logs data without cleaning and use last log time as checkpoint in time for next run.
* 4.1.1 - Monitor Logs task: strip http/https in hostname
* 4.1.0 - New action Get User Groups | Update to latest SDK version
Expand Down
25 changes: 15 additions & 10 deletions plugins/okta/komand_okta/tasks/monitor_logs/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ def run(self, params={}, state={}): # pylint: disable=unused-argument
next_page_link = state.get(self.NEXT_PAGE_LINK)
if not state:
self.logger.info("First run")
last_24_hours = now - timedelta(hours=24)
parameters = {"since": self.get_iso(last_24_hours), "until": now_iso, "limit": 1000}
state[self.LAST_COLLECTION_TIMESTAMP] = now_iso
last_24_hours = self.get_iso(now - timedelta(hours=24))
parameters = {"since": last_24_hours, "until": now_iso, "limit": 1000}
state[self.LAST_COLLECTION_TIMESTAMP] = last_24_hours # we only change this once we get new events
else:
if next_page_link:
state.pop(self.NEXT_PAGE_LINK)
Expand All @@ -54,7 +54,7 @@ def run(self, params={}, state={}): # pylint: disable=unused-argument
if next_page_link:
state[self.NEXT_PAGE_LINK] = next_page_link
has_more_pages = True
state[self.LAST_COLLECTION_TIMESTAMP] = self.get_last_collection_timestamp(now_iso, new_logs)
state[self.LAST_COLLECTION_TIMESTAMP] = self.get_last_collection_timestamp(new_logs, state)
return new_logs, state, has_more_pages, 200, None
except ApiException as error:
return [], state, False, error.status_code, error
Expand Down Expand Up @@ -103,7 +103,7 @@ def get_events(self, logs: list, time: str) -> list:
self.logger.info("No new events found since last execution.")
return []

log = "Returning {filtered} log event(s) from this iteration. "
log = "Returning {filtered} log event(s) from this iteration."
pop_index, filtered_logs = 0, logs

for index, event in enumerate(logs):
Expand All @@ -113,23 +113,28 @@ def get_events(self, logs: list, time: str) -> list:
break
if pop_index:
filtered_logs = logs[pop_index:]
log += f"Removed {pop_index} event log(s) that should have been returned in previous iteration."
log += f" Removed {pop_index} event log(s) that should have been returned in previous iteration."
self.logger.info(log.format(filtered=len(filtered_logs)))
return filtered_logs

def get_last_collection_timestamp(self, now: str, new_logs: list) -> str:
def get_last_collection_timestamp(self, new_logs: list, state: dict) -> str:
"""
Mirror the behaviour in collector code to save the TS of the last parsed event as the 'since' time checkpoint.
:param now: default value to use if no event logs returned to save the time from.
If no new events found then we want to keep the current checkpoint the same.
:param new_logs: event logs returned from Okta.
:param state: access state dictionary to get the current checkpoint in time if no new logs.
:return: new time value to save as the checkpoint to query 'since' on the next run.
"""
new_ts = ""
if new_logs: # make sure that logs were returned from Okta otherwise will get index error
new_ts = new_logs[-1].get("published")
self.logger.info(f"Saving the last record's published timestamp ({new_ts}) as checkpoint.")
if not new_ts:
self.logger.warning(f'No published record to use as last timestamp, reverting to use "now" ({now})')
new_ts = now
state_time = state.get(self.LAST_COLLECTION_TIMESTAMP)
self.logger.warning(
f"No record to use as last timestamp, will not move checkpoint forward. "
f"Keeping value of {state_time}"
)
new_ts = state_time

return new_ts
6 changes: 3 additions & 3 deletions plugins/okta/unit_test/expected/get_logs.json.exp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
"outcome": {
"result": "SUCCESS"
},
"published": "2023-04-27T07:49:21.764Z",
"published": "2023-04-27T08:49:21.764Z",
"securityContext": {
"asNumber": 123456,
"asOrg": "test",
Expand Down Expand Up @@ -97,7 +97,7 @@
"outcome": {
"result": "SUCCESS"
},
"published": "2023-04-27T07:49:21.777Z",
"published": "2023-04-27T09:49:21.777Z",
"securityContext": {
"asNumber": 12345,
"asOrg": "test",
Expand Down Expand Up @@ -144,7 +144,7 @@
}
],
"state": {
"last_collection_timestamp": "2023-04-27T07:49:21.777Z",
"last_collection_timestamp": "2023-04-27T09:49:21.777Z",
"next_page_link": "https://example.com/nextLink?q=next"
},
"has_more_pages": true,
Expand Down
8 changes: 8 additions & 0 deletions plugins/okta/unit_test/expected/get_logs_empty_resp.json.exp
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"logs": [],
"state": {
"last_collection_timestamp": "2023-04-27T08:33:46.123Z"
},
"has_more_pages": false,
"status_code": 200
}
4 changes: 2 additions & 2 deletions plugins/okta/unit_test/expected/get_logs_filtered.json.exp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
"outcome": {
"result": "SUCCESS"
},
"published": "2023-04-27T07:49:21.777Z",
"published": "2023-04-27T09:49:21.777Z",
"securityContext": {
"asNumber": 12345,
"asOrg": "test",
Expand Down Expand Up @@ -73,7 +73,7 @@
}
],
"state": {
"last_collection_timestamp": "2023-04-27T07:49:21.777Z",
"last_collection_timestamp": "2023-04-27T09:49:21.777Z",
"next_page_link": "https://example.com/nextLink?q=next"
},
"has_more_pages": true,
Expand Down
8 changes: 8 additions & 0 deletions plugins/okta/unit_test/expected/get_logs_next_empty.json.exp
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"logs": [],
"state": {
"last_collection_timestamp": "2023-04-27T07:49:21.777Z"
},
"has_more_pages": false,
"status_code": 200
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"last_collection_timestamp": "2023-04-27T08:34:46",
"last_collection_timestamp": "2023-04-27T07:49:21.777Z",
"next_page_link": "https://example.com/nextLink?q=next"
}
4 changes: 2 additions & 2 deletions plugins/okta/unit_test/responses/get_logs.json.resp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
"outcome": {
"result": "SUCCESS"
},
"published": "2023-04-27T07:49:21.764Z",
"published": "2023-04-27T08:49:21.764Z",
"securityContext": {
"asNumber": 123456,
"asOrg": "test",
Expand Down Expand Up @@ -96,7 +96,7 @@
"outcome": {
"result": "SUCCESS"
},
"published": "2023-04-27T07:49:21.777Z",
"published": "2023-04-27T09:49:21.777Z",
"securityContext": {
"asNumber": 12345,
"asOrg": "test",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[]
52 changes: 42 additions & 10 deletions plugins/okta/unit_test/test_monitor_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
return_value=datetime(2023, 4, 28, 8, 34, 46, 123156, timezone.utc),
)
@patch("requests.request", side_effect=Util.mock_request)
@patch("logging.Logger.warning")
class TestMonitorLogs(TestCase):
@classmethod
def setUpClass(cls) -> None:
Expand All @@ -38,22 +39,51 @@ def setUpClass(cls) -> None:
Util.read_file_to_dict("inputs/monitor_logs_next_page.json.inp"),
Util.read_file_to_dict("expected/get_logs_next_page.json.exp"),
],
[
"next_page_no_results",
Util.read_file_to_dict("inputs/monitor_logs_next_page.json.inp"),
Util.read_file_to_dict("expected/get_logs_next_empty.json.exp"),
],
[
"without_state_no_results",
Util.read_file_to_dict("inputs/monitor_logs_without_state.json.inp"),
Util.read_file_to_dict("expected/get_logs_empty_resp.json.exp"),
],
]
)
def test_monitor_logs(self, mock_request, mock_get_time, test_name, current_state, expected):
def test_monitor_logs(self, mocked_warn, mock_request, mock_get_time, test_name, current_state, expected):
# Tests and their workflow descriptions:
# 1. without_state - first run, query from 24 hours ago until now and results returned.
# 2. with_state - queries using the saved 'last_collection_timestamp' to pull new logs.
# 3. next_page - state has `next_page_link` which returns more logs to parse.
# 4. next_page_no_results -`next_page_link` but the output of this is no logs - we don't move the TS forward.
# 5. without_state_no_results - first run but no results returned - save state as the 'since' parameter value

if test_name in ["next_page_no_results", "without_state_no_results"]:
mock_request.side_effect = Util.mock_empty_response

actual, actual_state, has_more_pages, status_code, error = self.action.run(state=current_state)
self.assertEqual(actual, expected.get("logs"))
self.assertEqual(actual_state, expected.get("state"))
self.assertEqual(has_more_pages, expected.get("has_more_pages"))

# Check errors returned and logger warning only applied in tests 4 and 5.
self.assertEqual(error, None)
if mocked_warn.called:
log_call = call(
"No record to use as last timestamp, will not move checkpoint forward. "
f"Keeping value of {expected.get('state').get('last_collection_timestamp')}"
)
self.assertIn(log_call, mocked_warn.call_args_list)

@patch("logging.Logger.info")
def test_monitor_logs_filters_events(self, mocked_logger, *mocks):
# Test the filtering of events returned in a previous iteration. Workflow being tested:
# 1. C2C executed and queried for events until 8am however the last event time was '2023-04-27T07:49:21.764Z'
# 1. C2C executed and queried for events until 8am however the last event time was '2023-04-27T08:49:21.764Z'
# 2. The next execution will use this timestamp, meaning the last event will be returned again from Okta.
# 3. This duplicate event should be removed so that it is not returned to IDR again.

current_state = {"last_collection_timestamp": "2023-04-27T07:49:21.764Z"}
current_state = {"last_collection_timestamp": "2023-04-27T08:49:21.764Z"}
expected = Util.read_file_to_dict("expected/get_logs_filtered.json.exp")
actual, actual_state, has_more_pages, status_code, error = self.action.run(state=current_state)
self.assertEqual(actual_state, expected.get("state"))
Expand All @@ -71,20 +101,22 @@ def test_monitor_logs_filters_events(self, mocked_logger, *mocks):
self.assertEqual(actual, expected_logs)

@patch("logging.Logger.info")
@patch("logging.Logger.warning")
def test_monitor_logs_filters_single_event(self, mocked_warn_log, mocked_info_log, *mocks):
def test_monitor_logs_filters_single_event(self, mocked_info_log, mocked_warn_log, *mocks):
# Test filtering when a single event is returned that was in the previous iteration.

now = "2023-04-28T08:33:46.123Z" # Mocked value of 'now' - 1 minute
current_state = {"last_collection_timestamp": "2023-04-27T07:49:21.777Z"} # TS of the event in mocked response
expected = {"last_collection_timestamp": now}
actual, actual_state, has_more_pages, status_code, error = self.action.run(state=current_state)
self.assertEqual(actual_state, expected)
self.assertEqual(has_more_pages, False) # empty results so no next pages
self.assertEqual(actual_state, current_state) # state has not changed because no new events.
self.assertNotEqual(actual_state.get("last_collection_timestamp"), now) # we have not moved the TS forward.
self.assertEqual(has_more_pages, False) # empty results so no next pages.

# make sure that the mocked response contained a single entry that we discarded and logged this happening
# ensure sure that the mocked response contained a single entry that we discarded and logged this happening
logger_info_call = call("No new events found since last execution.")
logger_warn_call = call(f'No published record to use as last timestamp, reverting to use "now" ({now})')
logger_warn_call = call(
f"No record to use as last timestamp, will not move checkpoint forward. "
f'Keeping value of {current_state.get("last_collection_timestamp")}'
)

self.assertIn(logger_info_call, mocked_info_log.call_args_list)
self.assertIn(logger_warn_call, mocked_warn_log.call_args_list)
Expand Down
29 changes: 19 additions & 10 deletions plugins/okta/unit_test/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,15 @@ def read_file_to_dict(filename: str) -> dict:
return json.loads(Util.read_file_to_string(filename))

@staticmethod
def mock_request(*args, **kwargs):
class MockResponse:
def __init__(self, status_code: int, filename: str = None, headers: dict = {}):
self.status_code = status_code
self.text = ""
self.headers = headers
if filename:
self.text = Util.read_file_to_string(f"responses/{filename}")
def mock_wrapper(url=""):
return Util.mock_request(url=url)

def json(self):
return json.loads(self.text)
@staticmethod
def mock_empty_response(**kwargs):
return MockResponse(200, "get_logs_empty_response.resp", {"link": ""})

@staticmethod
def mock_request(*args, **kwargs):
method = kwargs.get("method")
url = kwargs.get("url")
params = kwargs.get("params")
Expand Down Expand Up @@ -220,3 +217,15 @@ def json(self):
return MockResponse(404)

raise NotImplementedError("Not implemented", kwargs)


class MockResponse:
def __init__(self, status_code: int, filename: str = None, headers: dict = {}):
self.status_code = status_code
self.text = ""
self.headers = headers
if filename:
self.text = Util.read_file_to_string(f"responses/{filename}")

def json(self):
return json.loads(self.text)

0 comments on commit 9360dfc

Please sign in to comment.