PLGN-398: only update collection timestamp when new events returned.
joneill-r7 committed Oct 6, 2023
1 parent 80dda5b commit ba076ef
Showing 10 changed files with 93 additions and 36 deletions.
19 changes: 11 additions & 8 deletions plugins/okta/komand_okta/tasks/monitor_logs/task.py
@@ -32,9 +32,9 @@ def run(self, params={}, state={}): # pylint: disable=unused-argument
next_page_link = state.get(self.NEXT_PAGE_LINK)
if not state:
self.logger.info("First run")
last_24_hours = now - timedelta(hours=24)
parameters = {"since": self.get_iso(last_24_hours), "until": now_iso, "limit": 1000}
state[self.LAST_COLLECTION_TIMESTAMP] = now_iso
last_24_hours = self.get_iso(now - timedelta(hours=24))
parameters = {"since": last_24_hours, "until": now_iso, "limit": 1000}
state[self.LAST_COLLECTION_TIMESTAMP] = last_24_hours # we only change this once we get new events
else:
if next_page_link:
state.pop(self.NEXT_PAGE_LINK)
@@ -54,7 +54,7 @@ def run(self, params={}, state={}): # pylint: disable=unused-argument
if next_page_link:
state[self.NEXT_PAGE_LINK] = next_page_link
has_more_pages = True
state[self.LAST_COLLECTION_TIMESTAMP] = self.get_last_collection_timestamp(now_iso, new_logs)
state[self.LAST_COLLECTION_TIMESTAMP] = self.get_last_collection_timestamp(new_logs, state)
return new_logs, state, has_more_pages, 200, None
except ApiException as error:
return [], state, False, error.status_code, error
@@ -117,19 +117,22 @@ def get_events(self, logs: list, time: str) -> list:
self.logger.info(log.format(filtered=len(filtered_logs)))
return filtered_logs

def get_last_collection_timestamp(self, now: str, new_logs: list) -> str:
def get_last_collection_timestamp(self, new_logs: list, state: dict) -> str:
"""
Mirror the behaviour in collector code to save the TS of the last parsed event as the 'since' time checkpoint.
:param now: default value to use if no event logs returned to save the time from.
If no new events are found, the current checkpoint is kept the same.
:param new_logs: event logs returned from Okta.
:param state: state dictionary, used to read the current checkpoint when no new logs are returned.
:return: new time value to save as the checkpoint to query 'since' on the next run.
"""
new_ts = ""
if new_logs: # make sure that logs were returned from Okta otherwise will get index error
new_ts = new_logs[-1].get("published")
self.logger.info(f"Saving the last record's published timestamp ({new_ts}) as checkpoint.")
if not new_ts:
self.logger.warning(f'No published record to use as last timestamp, reverting to use "now" ({now})')
new_ts = now
state_time = state.get(self.LAST_COLLECTION_TIMESTAMP)
self.logger.warning(f'No record to use as last timestamp, will not move checkpoint forward. '
f'Keeping value of {state_time}')
new_ts = state_time

return new_ts
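
Taken together, these task.py hunks change the checkpointing rule: the 'since' checkpoint now only advances to the published time of the last returned event, and an empty result keeps the previously stored value instead of jumping to "now". A minimal standalone sketch of that rule follows; the constant, logger, and sample data are illustrative stand-ins, not the plugin's own code.

# Minimal sketch of the new checkpoint rule, assuming Okta-style log dicts
# with an ISO-8601 "published" field. Names are illustrative stand-ins.
import logging

LAST_COLLECTION_TIMESTAMP = "last_collection_timestamp"
logger = logging.getLogger(__name__)


def next_checkpoint(new_logs: list, state: dict) -> str:
    """Return the checkpoint to store for the next run."""
    if new_logs:
        # Advance to the published time of the last event returned.
        return new_logs[-1].get("published")
    # No new events: keep the existing checkpoint rather than moving it to "now".
    current = state.get(LAST_COLLECTION_TIMESTAMP)
    logger.warning("No new events; keeping checkpoint at %s", current)
    return current


# An empty page leaves the checkpoint untouched; a non-empty page advances it.
state = {LAST_COLLECTION_TIMESTAMP: "2023-04-27T07:49:21.777Z"}
assert next_checkpoint([], state) == "2023-04-27T07:49:21.777Z"
assert next_checkpoint([{"published": "2023-04-27T09:49:21.777Z"}], state) == "2023-04-27T09:49:21.777Z"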
6 changes: 3 additions & 3 deletions plugins/okta/unit_test/expected/get_logs.json.exp
@@ -26,7 +26,7 @@
"outcome": {
"result": "SUCCESS"
},
"published": "2023-04-27T07:49:21.764Z",
"published": "2023-04-27T08:49:21.764Z",
"securityContext": {
"asNumber": 123456,
"asOrg": "test",
@@ -97,7 +97,7 @@
"outcome": {
"result": "SUCCESS"
},
"published": "2023-04-27T07:49:21.777Z",
"published": "2023-04-27T09:49:21.777Z",
"securityContext": {
"asNumber": 12345,
"asOrg": "test",
Expand Down Expand Up @@ -144,7 +144,7 @@
}
],
"state": {
"last_collection_timestamp": "2023-04-27T07:49:21.777Z",
"last_collection_timestamp": "2023-04-27T09:49:21.777Z",
"next_page_link": "https://example.com/nextLink?q=next"
},
"has_more_pages": true,
8 changes: 8 additions & 0 deletions plugins/okta/unit_test/expected/get_logs_empty_resp.json.exp
@@ -0,0 +1,8 @@
{
"logs": [],
"state": {
"last_collection_timestamp": "2023-04-27T08:33:46.123Z"
},
"has_more_pages": false,
"status_code": 200
}
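
This new fixture covers the first-run, no-results case: the saved checkpoint equals the value the task used as its 'since' parameter (24 hours back) rather than being bumped to "now". Below is a rough reconstruction of that value, assuming an ISO formatter that truncates to milliseconds and an effective "now" one minute behind the mocked clock (the test comment "Mocked value of 'now' - 1 minute" suggests such a lag; neither detail is shown in this diff).

from datetime import datetime, timedelta, timezone


def to_iso(dt: datetime) -> str:
    # Truncate microseconds to milliseconds and append "Z" (assumed formatting).
    return dt.strftime("%Y-%m-%dT%H:%M:%S.") + f"{dt.microsecond // 1000:03d}Z"


mocked_now = datetime(2023, 4, 28, 8, 34, 46, 123156, tzinfo=timezone.utc)  # from the test patch
effective_now = mocked_now - timedelta(minutes=1)  # assumed one-minute lag
first_run_since = to_iso(effective_now - timedelta(hours=24))

assert first_run_since == "2023-04-27T08:33:46.123Z"  # matches the fixture above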
4 changes: 2 additions & 2 deletions plugins/okta/unit_test/expected/get_logs_filtered.json.exp
@@ -26,7 +26,7 @@
"outcome": {
"result": "SUCCESS"
},
"published": "2023-04-27T07:49:21.777Z",
"published": "2023-04-27T09:49:21.777Z",
"securityContext": {
"asNumber": 12345,
"asOrg": "test",
@@ -73,7 +73,7 @@
}
],
"state": {
"last_collection_timestamp": "2023-04-27T07:49:21.777Z",
"last_collection_timestamp": "2023-04-27T09:49:21.777Z",
"next_page_link": "https://example.com/nextLink?q=next"
},
"has_more_pages": true,
8 changes: 8 additions & 0 deletions plugins/okta/unit_test/expected/get_logs_next_empty.json.exp
@@ -0,0 +1,8 @@
{
"logs": [],
"state": {
"last_collection_timestamp": "2023-04-27T07:49:21.777Z"
},
"has_more_pages": false,
"status_code": 200
}
@@ -1,4 +1,4 @@
{
"last_collection_timestamp": "2023-04-27T08:34:46",
"last_collection_timestamp": "2023-04-27T07:49:21.777Z",
"next_page_link": "https://example.com/nextLink?q=next"
}
4 changes: 2 additions & 2 deletions plugins/okta/unit_test/responses/get_logs.json.resp
@@ -25,7 +25,7 @@
"outcome": {
"result": "SUCCESS"
},
"published": "2023-04-27T07:49:21.764Z",
"published": "2023-04-27T08:49:21.764Z",
"securityContext": {
"asNumber": 123456,
"asOrg": "test",
@@ -96,7 +96,7 @@
"outcome": {
"result": "SUCCESS"
},
"published": "2023-04-27T07:49:21.777Z",
"published": "2023-04-27T09:49:21.777Z",
"securityContext": {
"asNumber": 12345,
"asOrg": "test",
@@ -0,0 +1 @@
[]
48 changes: 38 additions & 10 deletions plugins/okta/unit_test/test_monitor_logs.py
@@ -16,6 +16,7 @@
return_value=datetime(2023, 4, 28, 8, 34, 46, 123156, timezone.utc),
)
@patch("requests.request", side_effect=Util.mock_request)
@patch("logging.Logger.warning")
class TestMonitorLogs(TestCase):
@classmethod
def setUpClass(cls) -> None:
@@ -38,22 +38,49 @@ def setUpClass(cls) -> None:
Util.read_file_to_dict("inputs/monitor_logs_next_page.json.inp"),
Util.read_file_to_dict("expected/get_logs_next_page.json.exp"),
],
[
"next_page_no_results",
Util.read_file_to_dict("inputs/monitor_logs_next_page.json.inp"),
Util.read_file_to_dict("expected/get_logs_next_empty.json.exp"),
],
[
"without_state_no_results",
Util.read_file_to_dict("inputs/monitor_logs_without_state.json.inp"),
Util.read_file_to_dict("expected/get_logs_empty_resp.json.exp"),
]
]
)
def test_monitor_logs(self, mock_request, mock_get_time, test_name, current_state, expected):
def test_monitor_logs(self, mocked_warn, mock_request, mock_get_time, test_name, current_state, expected):
# Tests and their workflow descriptions:
# 1. without_state - first run, query from 24 hours ago until now and results returned.
# 2. with_state - queries using the saved 'last_collection_timestamp' to pull new logs.
# 3. next_page - state has `next_page_link` which returns more logs to parse.
# 4. next_page_no_results - `next_page_link` is used but returns no logs - we don't move the TS forward.
# 5. without_state_no_results - first run with no results returned - the 'since' parameter value is kept as the checkpoint

if test_name in ["next_page_no_results", "without_state_no_results"]:
mock_request.side_effect = Util.mock_empty_response

actual, actual_state, has_more_pages, status_code, error = self.action.run(state=current_state)
self.assertEqual(actual, expected.get("logs"))
self.assertEqual(actual_state, expected.get("state"))
self.assertEqual(has_more_pages, expected.get("has_more_pages"))

# Check errors returned and logger warning only applied in tests 4 and 5.
self.assertEqual(error, None)
if mocked_warn.called:
log_call = call("No record to use as last timestamp, will not move checkpoint forward. "
f"Keeping value of {expected.get('state').get('last_collection_timestamp')}")
self.assertIn(log_call, mocked_warn.call_args_list)

@patch("logging.Logger.info")
def test_monitor_logs_filters_events(self, mocked_logger, *mocks):
# Test the filtering of events returned in a previous iteration. Workflow being tested:
# 1. C2C executed and queried for events until 8am however the last event time was '2023-04-27T07:49:21.764Z'
# 1. C2C executed and queried for events until 8am however the last event time was '2023-04-27T08:49:21.764Z'
# 2. The next execution will use this timestamp, meaning the last event will be returned again from Okta.
# 3. This duplicate event should be removed so that it is not returned to IDR again.

current_state = {"last_collection_timestamp": "2023-04-27T07:49:21.764Z"}
current_state = {"last_collection_timestamp": "2023-04-27T08:49:21.764Z"}
expected = Util.read_file_to_dict("expected/get_logs_filtered.json.exp")
actual, actual_state, has_more_pages, status_code, error = self.action.run(state=current_state)
self.assertEqual(actual_state, expected.get("state"))
@@ -71,20 +99,20 @@ def test_monitor_logs_filters_events(self, mocked_logger, *mocks):
self.assertEqual(actual, expected_logs)

@patch("logging.Logger.info")
@patch("logging.Logger.warning")
def test_monitor_logs_filters_single_event(self, mocked_warn_log, mocked_info_log, *mocks):
def test_monitor_logs_filters_single_event(self, mocked_info_log, mocked_warn_log, *mocks):
# Test filtering when a single event is returned that was in the previous iteration.

now = "2023-04-28T08:33:46.123Z" # Mocked value of 'now' - 1 minute
current_state = {"last_collection_timestamp": "2023-04-27T07:49:21.777Z"} # TS of the event in mocked response
expected = {"last_collection_timestamp": now}
actual, actual_state, has_more_pages, status_code, error = self.action.run(state=current_state)
self.assertEqual(actual_state, expected)
self.assertEqual(has_more_pages, False) # empty results so no next pages
self.assertEqual(actual_state, current_state) # state has not changed because no new events.
self.assertNotEqual(actual_state.get('last_collection_timestamp'), now) # we have not moved the TS forward.
self.assertEqual(has_more_pages, False) # empty results so no next pages.

# make sure that the mocked response contained a single entry that we discarded and logged this happening
# ensure that the mocked response contained a single entry that we discarded and logged this happening
logger_info_call = call("No new events found since last execution.")
logger_warn_call = call(f'No published record to use as last timestamp, reverting to use "now" ({now})')
logger_warn_call = call(f'No record to use as last timestamp, will not move checkpoint forward. '
f'Keeping value of {current_state.get("last_collection_timestamp")}')

self.assertIn(logger_info_call, mocked_info_log.call_args_list)
self.assertIn(logger_warn_call, mocked_warn_log.call_args_list)
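
The new parameterized cases reuse existing input files but swap the patched request mock's side_effect to an empty-response helper before running the task, so one test body exercises both the populated and the empty paths. Here is a stripped-down, self-contained illustration of that side_effect-swapping pattern (plain unittest.mock; the fetch function and canned responses are stand-ins, not the plugin's Util helpers).

from unittest import TestCase, main
from unittest.mock import MagicMock, patch

import requests


def fetch_logs():
    # Stand-in for the code under test: one HTTP call, JSON body returned.
    response = requests.request(method="GET", url="https://example.okta.com/api/v1/logs")
    return response.json()


def full_page(*args, **kwargs):
    return MagicMock(json=MagicMock(return_value=[{"published": "2023-04-27T09:49:21.777Z"}]))


def empty_page(*args, **kwargs):
    return MagicMock(json=MagicMock(return_value=[]))


class TestFetchLogs(TestCase):
    @patch("requests.request", side_effect=full_page)
    def test_results(self, mock_request):
        self.assertEqual(len(fetch_logs()), 1)

    @patch("requests.request", side_effect=full_page)
    def test_no_results(self, mock_request):
        mock_request.side_effect = empty_page  # swap the canned response, as the new cases do
        self.assertEqual(fetch_logs(), [])


if __name__ == "__main__":
    main()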
29 changes: 19 additions & 10 deletions plugins/okta/unit_test/util.py
@@ -39,18 +39,15 @@ def read_file_to_dict(filename: str) -> dict:
return json.loads(Util.read_file_to_string(filename))

@staticmethod
def mock_request(*args, **kwargs):
class MockResponse:
def __init__(self, status_code: int, filename: str = None, headers: dict = {}):
self.status_code = status_code
self.text = ""
self.headers = headers
if filename:
self.text = Util.read_file_to_string(f"responses/{filename}")
def mock_wrapper(url=''):
return Util.mock_request(url=url)

def json(self):
return json.loads(self.text)
@staticmethod
def mock_empty_response(**kwargs):
return MockResponse(200, "get_logs_empty_response.resp", {"link": ""})

@staticmethod
def mock_request(*args, **kwargs):
method = kwargs.get("method")
url = kwargs.get("url")
params = kwargs.get("params")
@@ -220,3 +217,15 @@ def json(self):
return MockResponse(404)

raise NotImplementedError("Not implemented", kwargs)


class MockResponse:
def __init__(self, status_code: int, filename: str = None, headers: dict = {}):
self.status_code = status_code
self.text = ""
self.headers = headers
if filename:
self.text = Util.read_file_to_string(f"responses/{filename}")

def json(self):
return json.loads(self.text)
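
The util.py change lifts MockResponse out of mock_request to module level so that mock_request and the new mock_empty_response helper can build responses from one shared class. A small sketch of the same pattern follows, with the body passed inline instead of read from a fixture file so it runs on its own (names are illustrative, not the plugin's).

import json


class StubResponse:
    # Module-level response stub shared by several mock helpers.
    def __init__(self, status_code: int, body: str = "", headers: dict = None):
        self.status_code = status_code
        self.text = body
        self.headers = headers or {}

    def json(self):
        return json.loads(self.text)


def mock_logs_response(**kwargs):
    return StubResponse(200, '[{"published": "2023-04-27T09:49:21.777Z"}]', {"link": ""})


def mock_empty_response(**kwargs):
    return StubResponse(200, "[]", {"link": ""})


# Either helper can be handed to a patched requests.request as its side_effect.
assert mock_empty_response().json() == []
assert mock_logs_response().json()[0]["published"] == "2023-04-27T09:49:21.777Z"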
