Skip to content

Commit

Permalink
ENH: Log warning when runtime timeout is near
Browse files Browse the repository at this point in the history
skipci
  • Loading branch information
cortadocodes committed Dec 16, 2024
1 parent c467cbd commit b553dfa
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 13 deletions.
19 changes: 15 additions & 4 deletions octue/cloud/pub_sub/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import importlib.metadata
import json
import logging
import time
import uuid

import google.api_core.exceptions
Expand Down Expand Up @@ -232,11 +233,12 @@ def answer(self, question, heartbeat_interval=120, timeout=30):

try:
self._send_delivery_acknowledgment(**routing_metadata)
start_time = time.perf_counter()

heartbeater = RepeatingTimer(
interval=heartbeat_interval,
function=self._send_heartbeat,
kwargs=routing_metadata,
function=self._send_heartbeat_and_check_runtime,
kwargs={"start_time": start_time, **routing_metadata},
)

heartbeater.daemon = True
Expand Down Expand Up @@ -666,24 +668,30 @@ def _send_delivery_acknowledgment(

logger.info("%r acknowledged receipt of question %r.", self, question_uuid)

def _send_heartbeat(
def _send_heartbeat_and_check_runtime(
self,
question_uuid,
parent_question_uuid,
originator_question_uuid,
parent,
originator,
retry_count,
start_time,
runtime_timeout_warning_time=3480, # This is 58 minutes in seconds.
timeout=30,
):
"""Send a heartbeat to the parent, indicating that the service is alive.
"""Send a heartbeat to the parent, indicating that the service is alive. If it's been running for longer than
the runtime warning time, log a warning that it will be stopped soon. This is primarily to warn the user of
the Cloud Run one hour timeout.
:param str question_uuid: the UUID of the question this event relates to
:param str|None parent_question_uuid: the UUID of the question that triggered this question
:param str|None originator_question_uuid: the UUID of the question that triggered all ancestor questions of this question
:param str parent: the SRUID of the parent that asked the question this event is related to
:param str originator: the SRUID of the service revision that triggered all ancestor questions of this question
:param int retry_count: the retry count of the question (this is zero if it's the first attempt at the question)
:param int|float start_time: the `time.perf_counter` time that the analysis was started [s]
:param int|float runtime_timeout_warning_time: the amount of time after which to warn that the runtime timeout is approaching [s]
:param float timeout: time in seconds after which to give up sending
:return None:
"""
Expand All @@ -700,6 +708,9 @@ def _send_heartbeat(
timeout=timeout,
)

if time.perf_counter() - start_time > runtime_timeout_warning_time:
logger.warning("This analysis will reach the maximum runtime and be stopped soon.")

logger.debug("Heartbeat sent by %r.", self)

def _send_monitor_message(
Expand Down
16 changes: 8 additions & 8 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "octue"
version = "0.61.0"
version = "0.61.1"
description = "A package providing template applications for data services, and a python SDK to the Octue API."
readme = "README.md"
authors = ["Marcus Lugg <marcus@octue.com>", "Thomas Clark <support@octue.com>"]
Expand Down
28 changes: 28 additions & 0 deletions tests/cloud/pub_sub/test_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -764,6 +764,34 @@ def run_function(*args, **kwargs):
delta=datetime.timedelta(0.05),
)

def test_runtime_timeout_warning_logged(self):
    """Test that a warning is logged when the runtime timeout warning time is reached.

    The child's `answer` method is patched to use a 0.1s heartbeat interval and its heartbeat method is patched to
    use a runtime timeout warning time of 0s, while the run function sleeps for 0.3s. The intent is that a
    heartbeat sent while the run function is still sleeping is already past the warning threshold.
    """
    expected_message = "This analysis will reach the maximum runtime and be stopped soon."

    def run_function(*args, **kwargs):
        # Sleep for longer than the patched heartbeat interval so the analysis is still running when the
        # heartbeat is due.
        time.sleep(0.3)
        return MockAnalysis()

    child = MockService(backend=BACKEND, run_function=lambda *args, **kwargs: run_function())
    parent = MockService(backend=BACKEND, children={child.id: child})
    child.serve()

    with patch(
        "octue.cloud.emulators._pub_sub.MockService.answer",
        functools.partial(child.answer, heartbeat_interval=0.1),
    ):
        with patch(
            "octue.cloud.pub_sub.service.Service._send_heartbeat_and_check_runtime",
            functools.partial(child._send_heartbeat_and_check_runtime, runtime_timeout_warning_time=0),
        ):
            with self.assertLogs(level=logging.WARNING) as logging_context:
                subscription, _ = parent.ask(service_id=child.id, input_values={})
                parent.wait_for_answer(subscription)

    # Search every captured record rather than only the first one: `assertLogs` captures all records at WARNING
    # level or above, so an unrelated warning emitted earlier must not make this test fail.
    self.assertTrue(
        any(expected_message in line for line in logging_context.output),
        msg=f"Runtime timeout warning not found in captured logs: {logging_context.output!r}",
    )

def test_send_monitor_messages_periodically(self):
"""Test that monitor messages are sent periodically if set up in the run function and that the periodic monitor
message thread doesn't stop the result from being received (i.e. message sending is thread-safe).
Expand Down

0 comments on commit b553dfa

Please sign in to comment.