Skip to content

Commit

Permalink
PLGN-398: add logic to prevent searching beyond 24 hours
Browse files Browse the repository at this point in the history
  • Loading branch information
joneill-r7 committed Oct 9, 2023
1 parent 159ee23 commit 311c6f8
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 10 deletions.
2 changes: 1 addition & 1 deletion plugins/okta/help.md
Original file line number Diff line number Diff line change
Expand Up @@ -1612,7 +1612,7 @@ by Okta themselves, or constructed by the plugin based on the information it has

# Version History

* 4.2.1 - Monitor Logs task: filter previously returned log events | only update time checkpoint when an event is returned | update timestamp format.
* 4.2.1 - Monitor Logs task: filter previously returned log events | only update time checkpoint when an event is returned | update timestamp format | set cutoff time of 24 hours.
* 4.2.0 - Monitor Logs task: return raw logs data without cleaning and use last log time as checkpoint in time for next run.
* 4.1.1 - Monitor Logs task: strip http/https in hostname
* 4.1.0 - New action Get User Groups | Update to latest SDK version
Expand Down
27 changes: 25 additions & 2 deletions plugins/okta/komand_okta/tasks/monitor_logs/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,22 @@ def run(self, params={}, state={}): # pylint: disable=unused-argument
try:
now = self.get_current_time() - timedelta(minutes=1) # allow for latency of this being triggered
now_iso = self.get_iso(now)
last_24_hours = self.get_iso(now - timedelta(hours=24)) # cut off point - never query beyond 24 hours
next_page_link = state.get(self.NEXT_PAGE_LINK)
if not state:
self.logger.info("First run")
last_24_hours = self.get_iso(now - timedelta(hours=24))
parameters = {"since": last_24_hours, "until": now_iso, "limit": 1000}
state[self.LAST_COLLECTION_TIMESTAMP] = last_24_hours # we only change this once we get new events
else:
if next_page_link:
state.pop(self.NEXT_PAGE_LINK)
self.logger.info("Getting the next page of results...")
else:
parameters = {"since": state.get(self.LAST_COLLECTION_TIMESTAMP), "until": now_iso, "limit": 1000}
parameters = {
"since": self.get_since(state.get(self.LAST_COLLECTION_TIMESTAMP), last_24_hours),
"until": now_iso,
"limit": 1000,
}
self.logger.info("Subsequent run...")
try:
self.logger.info(f"Calling Okta with parameters={parameters} and next_page={next_page_link}")
Expand Down Expand Up @@ -75,6 +79,25 @@ def get_iso(time: datetime) -> str:
"""
return time.isoformat("T", "milliseconds").replace("+00:00", "Z")

def get_since(self, saved_time: str, cut_off: str) -> str:
    """
    Choose the `since` value for the next query, never reaching back further than the cut off.

    If the customer has paused this task for an extended amount of time we don't want to start polling events
    from more than 24 hours ago. When the saved state is at or before the cut off, or when there is no saved
    time at all, the cut off is used instead and the reason is written to the info log.

    :param saved_time: saved state time from the last iteration (None or an empty string is tolerated).
    :param cut_off: string time of now - 24 hours.
    :return: time string to use as the `since` value in the parameters.
    """

    # NOTE: this is a plain string comparison, so it only orders the two values chronologically when both
    # timestamps are written in the same format.
    if saved_time and saved_time > cut_off:
        return saved_time

    if saved_time:
        self.logger.info(
            f"Saved state {saved_time} exceeds the cut off (24 hours). Reverting to use time: {cut_off}"
        )
    else:
        # Nothing usable was saved - don't report a missing value as having "exceeded" the cut off.
        self.logger.info(f"No saved state time found. Reverting to use time: {cut_off}")

    return cut_off

@staticmethod
def get_next_page_link(headers: dict) -> str:
"""
Expand Down
37 changes: 30 additions & 7 deletions plugins/okta/unit_test/test_monitor_logs.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import sys
import os

sys.path.append(os.path.abspath("../"))

from unittest import TestCase
from komand_okta.tasks.monitor_logs.task import MonitorLogs
from util import Util
from unittest.mock import patch, call
from parameterized import parameterized
from datetime import datetime, timezone

import sys
import os

sys.path.append(os.path.abspath("../"))


@patch(
"komand_okta.tasks.monitor_logs.task.MonitorLogs.get_current_time",
Expand Down Expand Up @@ -51,7 +51,7 @@ def setUpClass(cls) -> None:
],
]
)
def test_monitor_logs(self, mocked_warn, mock_request, mock_get_time, test_name, current_state, expected):
def test_monitor_logs(self, mocked_warn, mock_request, _mock_get_time, test_name, current_state, expected):
# Tests and their workflow descriptions:
# 1. without_state - first run, query from 24 hours ago until now and results returned.
# 2. with_state - queries using the saved 'last_collection_timestamp' to pull new logs.
Expand All @@ -77,7 +77,7 @@ def test_monitor_logs(self, mocked_warn, mock_request, mock_get_time, test_name,
self.assertIn(log_call, mocked_warn.call_args_list)

@patch("logging.Logger.info")
def test_monitor_logs_filters_events(self, mocked_logger, *mocks):
def test_monitor_logs_filters_events(self, mocked_logger, *_mocks):
# Test the filtering of events returned in a previous iteration. Workflow being tested:
# 1. C2C executed and queried for events until 8am however the last event time was '2023-04-27T08:49:21.764Z'
# 2. The next execution will use this timestamp, meaning the last event will be returned again from Okta.
Expand All @@ -104,6 +104,8 @@ def test_monitor_logs_filters_events(self, mocked_logger, *mocks):
def test_monitor_logs_filters_single_event(self, mocked_info_log, mocked_warn_log, *mocks):
# Test filtering when a single event is returned that was in the previous iteration.

# temp change mocked timestamp to be within the cutoff time without changing mocked response data.
mocks[1].return_value = datetime(2023, 4, 27, 8, 45, 46, 123156, timezone.utc)
now = "2023-04-28T08:33:46.123Z" # Mocked value of 'now' - 1 minute
current_state = {"last_collection_timestamp": "2023-04-27T07:49:21.777Z"} # TS of the event in mocked response
actual, actual_state, has_more_pages, status_code, error = self.action.run(state=current_state)
Expand All @@ -121,3 +123,24 @@ def test_monitor_logs_filters_single_event(self, mocked_info_log, mocked_warn_lo
self.assertIn(logger_info_call, mocked_info_log.call_args_list)
self.assertIn(logger_warn_call, mocked_warn_log.call_args_list)
self.assertEqual(actual, []) # no events returned after filtering

@patch("logging.Logger.info")
def test_monitor_logs_applies_cut_off(self, mocked_info_log, *_mocks):
    # Scenario: the saved checkpoint is far older than 24 hours (the collector was paused for over a year).
    # On resume the task should fall back to the 24 hour cut off for the 'since' parameter, so the outcome
    # is compared against the same expected file used for a first run.
    first_run_expected = Util.read_file_to_dict("expected/get_logs.json.exp")  # logs from last 24 hours

    stale_timestamp = "2022-04-27T07:49:21.777Z"
    actual, state, _has_more_pages, _status_code, _error = self.action.run(
        state={"last_collection_timestamp": stale_timestamp}
    )

    # Returned logs and the new state should be exactly what a first run produces.
    self.assertEqual(actual, first_run_expected.get("logs"))
    self.assertEqual(state, first_run_expected.get("state"))

    # The info log is the evidence that the stale checkpoint was swapped for the cut off time.
    cut_off_log_call = call(
        f"Saved state {stale_timestamp} exceeds the cut off (24 hours). "
        f"Reverting to use time: 2023-04-27T08:33:46.123Z"
    )
    self.assertIn(cut_off_log_call, mocked_info_log.call_args_list)

0 comments on commit 311c6f8

Please sign in to comment.