def follow(
    self,
    name: str,
    encoding: Optional[str] = None,
    sleep_interval: int = 1,
) -> Generator[str, None, None]:
    """Iterate over lines in redirected output for a process.

    This will block the calling thread while waiting for output (until the
    followed process has exited).

    Arguments:
        name: Process name.
        encoding: Text encoding for redirected output. Defaults to
            `locale.getpreferredencoding()`.
        sleep_interval: Sleep interval for follow iterations (when waiting
            for output).

    Yields:
        Lines of redirected output, in the order they were written. Nothing
        is yielded when the process has no redirected stdout.

    Note:
        Yielded strings may not always end in line terminators (all
        available output will be yielded if EOF is reached).
    """
    output_path = self[name].stdout
    if output_path is None:
        return
    with open(
        output_path,
        encoding=encoding or locale.getpreferredencoding(),
    ) as fobj:
        while True:
            # Remember where this read started so that an empty read can be
            # retried from the same position once more output is available.
            offset = fobj.tell()
            line = fobj.readline()
            if line:
                yield line
                continue
            if self[name].returncode is not None:
                # The process may have written its final output between the
                # empty read above and this status check. Drain whatever is
                # left before stopping so trailing output is never dropped.
                fobj.seek(offset)
                yield from iter(fobj.readline, "")
                return
            time.sleep(sleep_interval)
            fobj.seek(offset)
mock_sleep.assert_called_once_with(1) + + # Process exit with no further output should cause StopIteration + # (raised as RuntimeError) + mocker.patch.object( + process_manager, + "__getitem__", + return_value=ProcessInfo( + pid=PID_RUNNING, stdin=None, stdout=None, stderr=None, returncode=0 + ), + ) + with pytest.raises(RuntimeError): + next(follow_gen)