diff --git a/plugins/okta/help.md b/plugins/okta/help.md index e1ac3f7c1b..15a6281272 100644 --- a/plugins/okta/help.md +++ b/plugins/okta/help.md @@ -1612,7 +1612,7 @@ by Okta themselves, or constructed by the plugin based on the information it has # Version History -* 4.2.1 - Monitor Logs task: filter previously returned log events | only update time checkpoint when an event is returned | update timestamp format. +* 4.2.1 - Monitor Logs task: filter previously returned log events | only update time checkpoint when an event is returned | update timestamp format | set cutoff time of 24 hours. * 4.2.0 - Monitor Logs task: return raw logs data without cleaning and use last log time as checkpoint in time for next run. * 4.1.1 - Monitor Logs task: strip http/https in hostname * 4.1.0 - New action Get User Groups | Update to latest SDK version diff --git a/plugins/okta/komand_okta/tasks/monitor_logs/task.py b/plugins/okta/komand_okta/tasks/monitor_logs/task.py index dae77e795a..6e6e550c24 100755 --- a/plugins/okta/komand_okta/tasks/monitor_logs/task.py +++ b/plugins/okta/komand_okta/tasks/monitor_logs/task.py @@ -29,10 +29,10 @@ def run(self, params={}, state={}): # pylint: disable=unused-argument try: now = self.get_current_time() - timedelta(minutes=1) # allow for latency of this being triggered now_iso = self.get_iso(now) + last_24_hours = self.get_iso(now - timedelta(hours=24)) # cut off point - never query beyond 24 hours next_page_link = state.get(self.NEXT_PAGE_LINK) if not state: self.logger.info("First run") - last_24_hours = self.get_iso(now - timedelta(hours=24)) parameters = {"since": last_24_hours, "until": now_iso, "limit": 1000} state[self.LAST_COLLECTION_TIMESTAMP] = last_24_hours # we only change this once we get new events else: @@ -40,7 +40,11 @@ def run(self, params={}, state={}): # pylint: disable=unused-argument state.pop(self.NEXT_PAGE_LINK) self.logger.info("Getting the next page of results...") else: - parameters = {"since": 
state.get(self.LAST_COLLECTION_TIMESTAMP), "until": now_iso, "limit": 1000} + parameters = { + "since": self.get_since(state.get(self.LAST_COLLECTION_TIMESTAMP), last_24_hours), + "until": now_iso, + "limit": 1000, + } self.logger.info("Subsequent run...") try: self.logger.info(f"Calling Okta with parameters={parameters} and next_page={next_page_link}") @@ -75,6 +79,25 @@ def get_iso(time: datetime) -> str: """ return time.isoformat("T", "milliseconds").replace("+00:00", "Z") + def get_since(self, saved_time: str, cut_off: str) -> str: + """ + If the customer has paused this task for an extended amount of time we don't want to start polling events + older than 24 hours. Check if the saved state is older than this and, if so, fall back to the cut-off time. + :param saved_time: saved state time from the last iteration. + :param cut_off: string time of now - 24 hours. + :return: updated time string to use in the parameters. + """ + + if saved_time and saved_time > cut_off: + since = saved_time + else: + self.logger.info( + f"Saved state {saved_time} exceeds the cut off (24 hours)." 
f" Reverting to use time: {cut_off}" + ) + since = cut_off + + return since + @staticmethod def get_next_page_link(headers: dict) -> str: """ diff --git a/plugins/okta/unit_test/test_monitor_logs.py b/plugins/okta/unit_test/test_monitor_logs.py index 37d32cc0ac..b20efd255d 100644 --- a/plugins/okta/unit_test/test_monitor_logs.py +++ b/plugins/okta/unit_test/test_monitor_logs.py @@ -1,8 +1,3 @@ -import sys -import os - -sys.path.append(os.path.abspath("../")) - from unittest import TestCase from komand_okta.tasks.monitor_logs.task import MonitorLogs from util import Util @@ -10,6 +5,11 @@ from parameterized import parameterized from datetime import datetime, timezone +import sys +import os + +sys.path.append(os.path.abspath("../")) + @patch( "komand_okta.tasks.monitor_logs.task.MonitorLogs.get_current_time", @@ -51,7 +51,7 @@ def setUpClass(cls) -> None: ], ] ) - def test_monitor_logs(self, mocked_warn, mock_request, mock_get_time, test_name, current_state, expected): + def test_monitor_logs(self, mocked_warn, mock_request, _mock_get_time, test_name, current_state, expected): # Tests and their workflow descriptions: # 1. without_state - first run, query from 24 hours ago until now and results returned. # 2. with_state - queries using the saved 'last_collection_timestamp' to pull new logs. @@ -77,7 +77,7 @@ def test_monitor_logs(self, mocked_warn, mock_request, mock_get_time, test_name, self.assertIn(log_call, mocked_warn.call_args_list) @patch("logging.Logger.info") - def test_monitor_logs_filters_events(self, mocked_logger, *mocks): + def test_monitor_logs_filters_events(self, mocked_logger, *_mocks): # Test the filtering of events returned in a previous iteration. Workflow being tested: # 1. C2C executed and queried for events until 8am however the last event time was '2023-04-27T08:49:21.764Z' # 2. The next execution will use this timestamp, meaning the last event will be returned again from Okta. 
@@ -104,6 +104,8 @@ def test_monitor_logs_filters_events(self, mocked_logger, *mocks): def test_monitor_logs_filters_single_event(self, mocked_info_log, mocked_warn_log, *mocks): # Test filtering when a single event is returned that was in the previous iteration. + # Temporarily change the mocked timestamp to be within the cutoff time without changing mocked response data. + mocks[1].return_value = datetime(2023, 4, 27, 8, 45, 46, 123156, timezone.utc) now = "2023-04-28T08:33:46.123Z" # Mocked value of 'now' - 1 minute current_state = {"last_collection_timestamp": "2023-04-27T07:49:21.777Z"} # TS of the event in mocked response actual, actual_state, has_more_pages, status_code, error = self.action.run(state=current_state) @@ -121,3 +123,24 @@ def test_monitor_logs_filters_single_event(self, mocked_info_log, mocked_warn_lo self.assertIn(logger_info_call, mocked_info_log.call_args_list) self.assertIn(logger_warn_call, mocked_warn_log.call_args_list) self.assertEqual(actual, []) # no events returned after filtering + + @patch("logging.Logger.info") + def test_monitor_logs_applies_cut_off(self, mocked_info_log, *_mocks): + # Test the scenario where a customer has paused the collector for an extended amount of time: when they + # resume the task, the 'since' parameter should be cut off at a maximum of 24 hours ago. 
+ expected = Util.read_file_to_dict("expected/get_logs.json.exp") # logs from last 24 hours + + paused_time = "2022-04-27T07:49:21.777Z" + current_state = {"last_collection_timestamp": paused_time} # Task has been paused for 1 year+ + actual, state, _has_more_pages, _status_code, _error = self.action.run(state=current_state) + + # Basic check that we match the same as a first test/run which returns logs from the last 24 hours + self.assertEqual(actual, expected.get("logs")) + self.assertEqual(state, expected.get("state")) + + # Check we called with the current parameters by looking at the info log + logger = call( + f"Saved state {paused_time} exceeds the cut off (24 hours). " + f"Reverting to use time: 2023-04-27T08:33:46.123Z" + ) + self.assertIn(logger, mocked_info_log.call_args_list)