
Commit 24e6fd5

🐛♻️ Comp backend: progress of a processing container is now limited to 99% (ITISFoundation#5080)
1 parent d7cce77 · commit 24e6fd5

File tree: 2 files changed (+30 −8 lines)

services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/core.py

Lines changed: 3 additions & 1 deletion
@@ -8,7 +8,7 @@
 from pathlib import Path
 from pprint import pformat
 from types import TracebackType
-from typing import cast
+from typing import Final, cast
 from uuid import uuid4

 from aiodocker import Docker
@@ -40,6 +40,7 @@

 logger = logging.getLogger(__name__)
 CONTAINER_WAIT_TIME_SECS = 2
+_TASK_PROCESSING_PROGRESS_WEIGHT: Final[float] = 0.99


 @dataclass(kw_only=True, frozen=True, slots=True)
@@ -217,6 +218,7 @@ async def run(self, command: list[str]) -> TaskOutputData:
             log_file_url=self.log_file_url,
             log_publishing_cb=self._publish_sidecar_log,
             s3_settings=self.s3_settings,
+            container_processing_progress_weight=_TASK_PROCESSING_PROGRESS_WEIGHT,
         ):
             await container.start()
             await self._publish_sidecar_log(
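
Note: the net effect in core.py is that every progress value parsed from the container's logs gets scaled by the new constant, so a still-running container tops out at 99%. A minimal sketch of the idea (the scaled_progress helper is hypothetical; only _TASK_PROCESSING_PROGRESS_WEIGHT and its value come from this commit):

from typing import Final

_TASK_PROCESSING_PROGRESS_WEIGHT: Final[float] = 0.99


def scaled_progress(parsed_value: float) -> float:
    # Parsed progress is normalized to [0, 1]; weighting by 0.99 reserves the
    # last percent for the sidecar's wrap-up work, so a container that logs
    # "100%" is never reported as fully done while it is still running.
    clamped = min(max(parsed_value, 0.0), 1.0)
    return _TASK_PROCESSING_PROGRESS_WEIGHT * clamped


assert scaled_progress(1.0) == 0.99  # never reaches 100% during processing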

services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/docker_utils.py

Lines changed: 27 additions & 7 deletions
@@ -3,9 +3,10 @@
 import logging
 import re
 import socket
+from collections.abc import AsyncGenerator, AsyncIterator, Awaitable, Callable
 from pathlib import Path
 from pprint import pformat
-from typing import Any, AsyncGenerator, AsyncIterator, Awaitable, Callable, cast
+from typing import Any, Final, cast

 import aiofiles
 import aiofiles.tempfile
@@ -151,15 +152,20 @@ def _guess_progress_value(progress_match: re.Match[str]) -> float:
     return float(value_str.strip())


+_OSPARC_LOG_NUM_PARTS: Final[int] = 2
+
+
 async def _try_parse_progress(
     line: str, *, progress_regexp: re.Pattern[str]
 ) -> float | None:
     with log_catch(logger, reraise=False):
         # pattern might be like "timestamp log"
         log = line.strip("\n")
         splitted_log = log.split(" ", maxsplit=1)
-        with contextlib.suppress(arrow.ParserError):
-            if len(splitted_log) == 2 and arrow.get(splitted_log[0]):
+        with contextlib.suppress(arrow.ParserError, ValueError):
+            if len(splitted_log) == _OSPARC_LOG_NUM_PARTS and arrow.get(
+                splitted_log[0]
+            ):
                 log = splitted_log[1]
         if match := re.search(progress_regexp, log):
             return _guess_progress_value(match)
@@ -172,19 +178,23 @@ async def _parse_and_publish_logs(
     *,
     task_publishers: TaskPublisher,
     progress_regexp: re.Pattern[str],
+    container_processing_progress_weight: float,
 ) -> None:
     progress_value = await _try_parse_progress(
         log_line, progress_regexp=progress_regexp
     )
+    assert 0 < container_processing_progress_weight <= 1.0  # nosec # noqa: PLR2004
     if progress_value is not None:
-        task_publishers.publish_progress(progress_value)
+        task_publishers.publish_progress(
+            container_processing_progress_weight * progress_value
+        )

     task_publishers.publish_logs(
         message=log_line, log_level=guess_message_log_level(log_line)
     )


-async def _parse_container_log_file(
+async def _parse_container_log_file(  # noqa: PLR0913 # pylint: disable=too-many-arguments
     *,
     container: DockerContainer,
     progress_regexp: re.Pattern[str],
@@ -196,6 +206,7 @@ async def _parse_container_log_file(
     log_file_url: LogFileUploadURL,
     log_publishing_cb: LogPublishingCB,
     s3_settings: S3Settings | None,
+    max_monitoring_progress_value: float,
 ) -> None:
     log_file = task_volumes.logs_folder / LEGACY_SERVICE_LOG_FILE_NAME
     with log_context(
@@ -215,6 +226,7 @@ async def _parse_container_log_file(
                     line,
                     task_publishers=task_publishers,
                     progress_regexp=progress_regexp,
+                    container_processing_progress_weight=max_monitoring_progress_value,
                 )

         # finish reading the logs if possible
@@ -228,6 +240,7 @@ async def _parse_container_log_file(
                 line,
                 task_publishers=task_publishers,
                 progress_regexp=progress_regexp,
+                container_processing_progress_weight=max_monitoring_progress_value,
             )

         # copy the log file to the log_file_url
@@ -247,6 +260,7 @@ async def _parse_container_docker_logs(
     log_file_url: LogFileUploadURL,
     log_publishing_cb: LogPublishingCB,
     s3_settings: S3Settings | None,
+    container_processing_progress_weight: float,
 ) -> None:
     with log_context(
         logger, logging.DEBUG, "started monitoring of >=1.0 service - using docker logs"
@@ -276,6 +290,7 @@
                 log_msg_without_timestamp,
                 task_publishers=task_publishers,
                 progress_regexp=progress_regexp,
+                container_processing_progress_weight=container_processing_progress_weight,
             )

         # copy the log file to the log_file_url
@@ -284,7 +299,7 @@
     )


-async def _monitor_container_logs(
+async def _monitor_container_logs(  # noqa: PLR0913 # pylint: disable=too-many-arguments
     *,
     container: DockerContainer,
     progress_regexp: re.Pattern[str],
@@ -296,6 +311,7 @@ async def _monitor_container_logs(
     log_file_url: LogFileUploadURL,
     log_publishing_cb: LogPublishingCB,
     s3_settings: S3Settings | None,
+    container_processing_progress_weight: float,
 ) -> None:
     """Services running with integration version 0.0.0 are logging into a file
     that must be available in task_volumes.log / log.dat
@@ -321,6 +337,7 @@
             log_file_url=log_file_url,
             log_publishing_cb=log_publishing_cb,
             s3_settings=s3_settings,
+            container_processing_progress_weight=container_processing_progress_weight,
         )
     else:
         await _parse_container_log_file(
@@ -334,11 +351,12 @@
             log_file_url=log_file_url,
             log_publishing_cb=log_publishing_cb,
             s3_settings=s3_settings,
+            max_monitoring_progress_value=container_processing_progress_weight,
         )


 @contextlib.asynccontextmanager
-async def managed_monitor_container_log_task(
+async def managed_monitor_container_log_task(  # noqa: PLR0913 # pylint: disable=too-many-arguments
     container: DockerContainer,
     progress_regexp: re.Pattern[str],
     service_key: ContainerImage,
@@ -349,6 +367,7 @@ async def managed_monitor_container_log_task(
     log_file_url: LogFileUploadURL,
     log_publishing_cb: LogPublishingCB,
     s3_settings: S3Settings | None,
+    container_processing_progress_weight: float,
 ) -> AsyncIterator[Awaitable[None]]:
     monitoring_task = None
     try:
@@ -369,6 +388,7 @@
                 log_file_url=log_file_url,
                 log_publishing_cb=log_publishing_cb,
                 s3_settings=s3_settings,
+                container_processing_progress_weight=container_processing_progress_weight,
             ),
             name=f"{service_key}:{service_version}_{container.id}_monitoring_task",
         )
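
Note: the widened exception suppression in _try_parse_progress can be shown standalone. The commit adds ValueError alongside arrow.ParserError, since arrow.get() may raise it on tokens that are not timestamps, which previously let a bad first token abort parsing of the whole line. A runnable sketch under assumed names (PROGRESS_RE and try_parse_progress are illustrative; the real regexp is supplied per service):

import contextlib
import re

import arrow  # timestamp library the sidecar already depends on

# Illustrative pattern only; the actual one is configured per service.
PROGRESS_RE = re.compile(r"progress:\s*(\d+(?:\.\d+)?)%")


def try_parse_progress(line: str) -> float | None:
    log = line.strip("\n")
    splitted_log = log.split(" ", maxsplit=1)
    # Lines may look like "<timestamp> <message>": drop the leading token if
    # it parses as a date. Both ParserError and ValueError are suppressed, as
    # in this commit, so a non-timestamp first token no longer aborts parsing.
    with contextlib.suppress(arrow.ParserError, ValueError):
        if len(splitted_log) == 2 and arrow.get(splitted_log[0]):
            log = splitted_log[1]
    if match := PROGRESS_RE.search(log):
        return float(match.group(1)) / 100.0
    return None


print(try_parse_progress("2023-11-23T10:00:00Z progress: 42%"))  # 0.42
print(try_parse_progress("progress: 100%"))                      # 1.0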
