Skip to content

Commit

Permalink
proc: add ProcessManager.follow for 'tail -f' redirected output
Browse files Browse the repository at this point in the history
  • Loading branch information
pmrowla committed Mar 8, 2022
1 parent ea680cb commit a5a62ae
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 1 deletion.
43 changes: 43 additions & 0 deletions src/dvc_task/proc/manager.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
"""Serverless process manager."""

import json
import locale
import logging
import os
import signal
import sys
import time
from typing import Generator, List, Optional, Tuple, Union

from celery import Signature, signature # pylint: disable=no-name-in-module
Expand Down Expand Up @@ -174,3 +176,44 @@ def cleanup(self, force: bool = False):
self.remove(name, force)
except ProcessNotTerminatedError:
continue

def follow(
    self,
    name: str,
    encoding: Optional[str] = None,
    sleep_interval: int = 1,
) -> Generator[str, None, None]:
    """Iterate over lines in redirected output for a process.

    This will block calling thread when waiting for output (until the
    followed process has exited).

    Arguments:
        name: Process name.
        encoding: Text encoding for redirected output. Defaults to
            `locale.getpreferredencoding()`.
        sleep_interval: Sleep interval (passed to `time.sleep`) for follow
            iterations (when waiting for output).

    Yields:
        Output read from the redirected stdout file, one `readline()`
        result at a time. Nothing is yielded if the process has no
        redirected stdout.

    Note:
        Yielded strings may not always end in line terminators (all
        available output will be yielded if EOF is reached).
    """
    output_path = self[name].stdout
    if output_path is None:
        return
    with open(
        output_path,
        encoding=encoding or locale.getpreferredencoding(),
    ) as fobj:
        while True:
            # Remember where this read starts so we can retry from the same
            # position after an empty read.
            offset = fobj.tell()
            line = fobj.readline()
            if line:
                yield line
                continue
            if self[name].returncode is not None:
                # The process may have written its final output and exited
                # between the empty read above and this status check.
                # Read once more until EOF so that output is not dropped.
                fobj.seek(offset)
                yield from iter(fobj.readline, "")
                return
            time.sleep(sleep_interval)
            fobj.seek(offset)
41 changes: 40 additions & 1 deletion tests/proc/test_manager.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Process manager tests."""
import builtins
import signal
import sys

Expand All @@ -11,6 +12,7 @@
UnsupportedSignalError,
)
from dvc_task.proc.manager import ProcessManager
from dvc_task.proc.process import ProcessInfo

from .conftest import PID_RUNNING

Expand Down Expand Up @@ -114,13 +116,50 @@ def test_remove(
def test_cleanup(  # pylint: disable=too-many-arguments
    mocker: MockerFixture,
    tmp_dir: TmpDir,
    process_manager: ProcessManager,
    running_process: str,
    finished_process: str,
    force: bool,
):
    """Process directory should be removed.

    The finished process is always expected to be removed; the running
    process is expected to be removed only when ``force`` is true.
    """
    # Replace os.kill with a stub so no real signal is delivered by the test.
    mocker.patch("os.kill", return_value=None)
    # Use the injected fixture rather than re-binding the argument to a
    # second, locally constructed manager.
    process_manager.cleanup(force)
    assert (tmp_dir / running_process).exists() != force
    assert not (tmp_dir / finished_process).exists()


def test_follow(
    mocker: MockerFixture,
    process_manager: ProcessManager,
    running_process: str,
):
    """Output should be followed and not duplicated."""
    orig_open = builtins.open
    mock_file = mocker.mock_open()()
    expected = ["foo\n", "bar\n", "b", "", "az\n"]
    pending = list(expected)

    def _readline():
        # Behave like a real file at EOF: keep returning "" once the scripted
        # output is used up, instead of raising StopIteration from an
        # exhausted mock side_effect list.
        return pending.pop(0) if pending else ""

    mock_file.readline = mocker.Mock(side_effect=_readline)

    def _open(path, *args, **kwargs):
        # Only the redirected-output file is faked; every other open() call
        # goes to the real builtin.
        if path.endswith(".out"):
            return mock_file
        return orig_open(path, *args, **kwargs)

    mocker.patch("builtins.open", _open)
    mock_sleep = mocker.patch("time.sleep")
    follow_gen = process_manager.follow(running_process)
    for line in expected:
        if line:
            assert line == next(follow_gen)
    # The single empty read (while the process is still running) must have
    # triggered exactly one wait.
    mock_sleep.assert_called_once_with(1)

    # Process exit with no further output should end the generator.
    # Special methods are looked up on the type, so ``__getitem__`` has to be
    # patched on the class for ``self[name]`` inside follow() to see it.
    mocker.patch.object(
        ProcessManager,
        "__getitem__",
        return_value=ProcessInfo(
            pid=PID_RUNNING, stdin=None, stdout=None, stderr=None, returncode=0
        ),
    )
    with pytest.raises(StopIteration):
        next(follow_gen)
    # Exiting must not add another wait.
    mock_sleep.assert_called_once_with(1)

0 comments on commit a5a62ae

Please sign in to comment.