From b553dfa8630a54e34769bc76833a11c7bffb2049 Mon Sep 17 00:00:00 2001 From: cortadocodes Date: Mon, 16 Dec 2024 11:01:06 +0000 Subject: [PATCH] ENH: Log warning when runtime timeout is near skipci --- octue/cloud/pub_sub/service.py | 19 +++++++++++++++---- poetry.lock | 16 ++++++++-------- pyproject.toml | 2 +- tests/cloud/pub_sub/test_service.py | 28 ++++++++++++++++++++++++++++ 4 files changed, 52 insertions(+), 13 deletions(-) diff --git a/octue/cloud/pub_sub/service.py b/octue/cloud/pub_sub/service.py index f750cfa4f..1a38d5c9c 100644 --- a/octue/cloud/pub_sub/service.py +++ b/octue/cloud/pub_sub/service.py @@ -5,6 +5,7 @@ import importlib.metadata import json import logging +import time import uuid import google.api_core.exceptions @@ -232,11 +233,12 @@ def answer(self, question, heartbeat_interval=120, timeout=30): try: self._send_delivery_acknowledgment(**routing_metadata) + start_time = time.perf_counter() heartbeater = RepeatingTimer( interval=heartbeat_interval, - function=self._send_heartbeat, - kwargs=routing_metadata, + function=self._send_heartbeat_and_check_runtime, + kwargs={"start_time": start_time, **routing_metadata}, ) heartbeater.daemon = True @@ -666,7 +668,7 @@ def _send_delivery_acknowledgment( logger.info("%r acknowledged receipt of question %r.", self, question_uuid) - def _send_heartbeat( + def _send_heartbeat_and_check_runtime( self, question_uuid, parent_question_uuid, @@ -674,9 +676,13 @@ def _send_heartbeat( parent, originator, retry_count, + start_time, + runtime_timeout_warning_time=3480, # This is 58 minutes in seconds. timeout=30, ): - """Send a heartbeat to the parent, indicating that the service is alive. + """Send a heartbeat to the parent, indicating that the service is alive. If it's been running for longer than + the runtime warning time, log a warning that it will be stopped soon. This is primarily to warn the user of + the Cloud Run one hour timeout. :param str question_uuid: the UUID of the question this event relates to :param str|None parent_question_uuid: the UUID of the question that triggered this question @@ -684,6 +690,8 @@ def _send_heartbeat( :param str parent: the SRUID of the parent that asked the question this event is related to :param str originator: the SRUID of the service revision that triggered all ancestor questions of this question :param int retry_count: the retry count of the question (this is zero if it's the first attempt at the question) + :param int|float start_time: the `time.perf_counter` time that the analysis was started [s] + :param int|float runtime_timeout_warning_time: the amount of time after which to warn that the runtime timeout is approaching [s] :param float timeout: time in seconds after which to give up sending :return None: """ @@ -700,6 +708,9 @@ def _send_heartbeat( timeout=timeout, ) + if time.perf_counter() - start_time > runtime_timeout_warning_time: + logger.warning("This analysis will reach the maximum runtime and be stopped soon.") + logger.debug("Heartbeat sent by %r.", self) def _send_monitor_message( diff --git a/poetry.lock b/poetry.lock index e27d38830..7db364ed6 100644 --- a/poetry.lock +++ b/poetry.lock @@ -60,19 +60,19 @@ files = [ [[package]] name = "attrs" -version = "24.2.0" +version = "24.3.0" description = "Classes Without Boilerplate" optional = false -python-versions = ">=3.7" +python-versions = ">=3.8" files = [ - {file = "attrs-24.2.0-py3-none-any.whl", hash = "sha256:81921eb96de3191c8258c199618104dd27ac608d9366f5e35d011eae1867ede2"}, - {file = "attrs-24.2.0.tar.gz", hash = "sha256:5cfb1b9148b5b086569baec03f20d7b6bf3bcacc9a42bebf87ffaaca362f6346"}, + {file = "attrs-24.3.0-py3-none-any.whl", hash = "sha256:ac96cd038792094f438ad1f6ff80837353805ac950cd2aa0e0625ef19850c308"}, + {file = "attrs-24.3.0.tar.gz", hash = "sha256:8f5c07333d543103541ba7be0e2ce16eeee8130cb0b3f9238ab904ce1e85baff"}, ] [package.extras] benchmark = ["cloudpickle", "hypothesis", "mypy (>=1.11.1)", "pympler", "pytest (>=4.3.0)", "pytest-codspeed", "pytest-mypy-plugins", "pytest-xdist[psutil]"] cov = ["cloudpickle", "coverage[toml] (>=5.3)", "hypothesis", "mypy (>=1.11.1)", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "pytest-xdist[psutil]"] -dev = ["cloudpickle", "hypothesis", "mypy (>=1.11.1)", "pre-commit", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "pytest-xdist[psutil]"] +dev = ["cloudpickle", "hypothesis", "mypy (>=1.11.1)", "pre-commit-uv", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "pytest-xdist[psutil]"] docs = ["cogapp", "furo", "myst-parser", "sphinx", "sphinx-notfound-page", "sphinxcontrib-towncrier", "towncrier (<24.7)"] tests = ["cloudpickle", "hypothesis", "mypy (>=1.11.1)", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "pytest-xdist[psutil]"] tests-mypy = ["mypy (>=1.11.1)", "pytest-mypy-plugins"] @@ -202,13 +202,13 @@ files = [ [[package]] name = "certifi" -version = "2024.8.30" +version = "2024.12.14" description = "Python package for providing Mozilla's CA Bundle." optional = false python-versions = ">=3.6" files = [ - {file = "certifi-2024.8.30-py3-none-any.whl", hash = "sha256:922820b53db7a7257ffbda3f597266d435245903d80737e34f8a45ff3e3230d8"}, - {file = "certifi-2024.8.30.tar.gz", hash = "sha256:bec941d2aa8195e248a60b31ff9f0558284cf01a52591ceda73ea9afffd69fd9"}, + {file = "certifi-2024.12.14-py3-none-any.whl", hash = "sha256:1275f7a45be9464efc1173084eaa30f866fe2e47d389406136d332ed4967ec56"}, + {file = "certifi-2024.12.14.tar.gz", hash = "sha256:b650d30f370c2b724812bee08008be0c4163b163ddaec3f2546c1caf65f191db"}, ] [[package]] diff --git a/pyproject.toml b/pyproject.toml index ec577c70d..38c6fb8f0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "octue" -version = "0.61.0" +version = "0.61.1" description = "A package providing template applications for data services, and a python SDK to the Octue API." readme = "README.md" authors = ["Marcus Lugg ", "Thomas Clark "] diff --git a/tests/cloud/pub_sub/test_service.py b/tests/cloud/pub_sub/test_service.py index 9085188c2..11075d436 100644 --- a/tests/cloud/pub_sub/test_service.py +++ b/tests/cloud/pub_sub/test_service.py @@ -764,6 +764,34 @@ def run_function(*args, **kwargs): delta=datetime.timedelta(0.05), ) + def test_runtime_timeout_warning_logged(self): + """Test that a warning is logged when the runtime timeout warning time is reached.""" + + def run_function(*args, **kwargs): + time.sleep(0.3) + return MockAnalysis() + + child = MockService(backend=BACKEND, run_function=lambda *args, **kwargs: run_function()) + parent = MockService(backend=BACKEND, children={child.id: child}) + child.serve() + + with patch( + "octue.cloud.emulators._pub_sub.MockService.answer", + functools.partial(child.answer, heartbeat_interval=0.1), + ): + with patch( + "octue.cloud.pub_sub.service.Service._send_heartbeat_and_check_runtime", + functools.partial(child._send_heartbeat_and_check_runtime, runtime_timeout_warning_time=0), + ): + with self.assertLogs(level=logging.WARNING) as logging_context: + subscription, _ = parent.ask(service_id=child.id, input_values={}) + parent.wait_for_answer(subscription) + + self.assertIn( + "This analysis will reach the maximum runtime and be stopped soon.", + logging_context.output[0], + ) + def test_send_monitor_messages_periodically(self): """Test that monitor messages are sent periodically if set up in the run function and that the periodic monitor message thread doesn't stop the result from being received (i.e. message sending is thread-safe).