Skip to content

Commit

Permalink
make RustNotify a context manager and kill the thread on exit (#164)
Browse files Browse the repository at this point in the history
* make RustNotify a context manager and kill the thread on exit

* test for closed watcher

* use Py not PyRef

* update docs

* make clear private
  • Loading branch information
samuelcolvin authored Jun 25, 2022
1 parent a06c0b0 commit 6349679
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 52 deletions.
13 changes: 13 additions & 0 deletions docs/api/rust_backend.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,16 @@ r = RustNotify(['first/path', 'second/path'], False, False, 0)
changes = r.watch(1_600, 50, 100, None)
print(changes)
```

Or using `RustNotify` as a context manager:

```py
title="Rust backend context manager example"
from watchfiles._rust_notify import RustNotify

with RustNotify(['first/path', 'second/path'], False, False, 0) as r:
changes = r.watch(1_600, 50, 100, None)
print(changes)
```

(See the documentation on `close` above for when the context manager or `close` method are required.)
21 changes: 20 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const CHANGE_DELETED: u8 = 3;

#[derive(Debug)]
enum WatcherEnum {
None,
Poll(PollWatcher),
Recommended(RecommendedWatcher),
}
Expand Down Expand Up @@ -157,13 +158,16 @@ impl RustNotify {
timeout_ms: u64,
stop_event: PyObject,
) -> PyResult<PyObject> {
if matches!(self.watcher, WatcherEnum::None) {
return Err(PyRuntimeError::new_err("RustNotify watcher closed"));
}
let stop_event_is_set: Option<&PyAny> = match stop_event.is_none(py) {
true => None,
false => {
let event: &PyAny = stop_event.extract(py)?;
let func: &PyAny = event.getattr("is_set")?.extract()?;
if !func.is_callable() {
return Err(PyTypeError::new_err("'stop_event.is_set' must be callable".to_string()));
return Err(PyTypeError::new_err("'stop_event.is_set' must be callable"));
}
Some(func)
}
Expand Down Expand Up @@ -228,10 +232,25 @@ impl RustNotify {
Ok(py_changes)
}

/// https://github.com/PyO3/pyo3/issues/1205#issuecomment-1164096251 for advice on `__enter__`
pub fn __enter__(slf: Py<Self>) -> Py<Self> {
slf
}

pub fn close(&mut self) {
self.watcher = WatcherEnum::None;
}

pub fn __exit__(&mut self, _exc_type: PyObject, _exc_value: PyObject, _traceback: PyObject) {
self.close();
}

pub fn __repr__(&self) -> PyResult<String> {
Ok(format!("RustNotify({:#?})", self.watcher))
}
}

impl RustNotify {
fn clear(&self) {
self.changes.lock().unwrap().clear();
}
Expand Down
9 changes: 9 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,15 @@ def watch(self, debounce_ms: int, step_ms: int, timeout_ms: int, cancel_event):
self.watch_count += 1
return change

def __enter__(self):
return self

def __exit__(self, *args):
self.close()

def close(self):
pass


if TYPE_CHECKING:
from typing import Literal, Protocol
Expand Down
8 changes: 7 additions & 1 deletion tests/test_rust_notify.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,16 @@ def test_add(test_dir: Path):
assert watcher.watch(200, 50, 500, None) == {(1, str(test_dir / 'new_file.txt'))}


def test_recommended_repr(test_dir: Path):
def test_close(test_dir: Path):
watcher = RustNotify([str(test_dir)], True, False, 0)
assert repr(watcher).startswith('RustNotify(Recommended(\n')

watcher.close()

assert repr(watcher) == 'RustNotify(None)'
with pytest.raises(RuntimeError, match='RustNotify watcher closed'):
watcher.watch(200, 50, 500, None)


def test_modify_write(test_dir: Path):
watcher = RustNotify([str(test_dir)], True, False, 0)
Expand Down
9 changes: 9 additions & 0 deletions tests/test_watch.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,15 @@ def watch(self, *args):
self.i += 1
return {(Change.added, 'spam.py')}

def __enter__(self):
return self

def __exit__(self, *args):
self.close()

def close(self):
pass


async def test_awatch_interrupt_raise(mocker, caplog):
mocker.patch('watchfiles.main.RustNotify', return_value=MockRustNotifyRaise())
Expand Down
28 changes: 26 additions & 2 deletions watchfiles/_rust_notify.pyi
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List, Literal, Optional, Protocol, Set, Tuple, Union
from typing import Any, List, Literal, Optional, Protocol, Set, Tuple, Union

__all__ = 'RustNotify', 'WatchfilesRustInternalError'

Expand All @@ -11,7 +11,7 @@ class AbstractEvent(Protocol):
class RustNotify:
"""
Interface to the Rust [notify](https://crates.io/crates/notify) crate which does
the heavy lifting of watching for file changes and grouping them into a single event.
the heavy lifting of watching for file changes and grouping them into events.
"""

def __init__(self, watch_paths: List[str], debug: bool, force_polling: bool, poll_delay_ms: int) -> None:
Expand Down Expand Up @@ -57,6 +57,30 @@ class RustNotify:
`'signal'` if a signal was received, `'stop'` if the `stop_event` was set,
or `'timeout'` if `timeout_ms` was exceeded.
"""
def __enter__(self) -> 'RustNotify':
"""
Does nothing, but allows `RustNotify` to be used as a context manager.
Note: the watching thead is created when an instance is initiated, not on
`__enter__`.
"""
def __exit__(self, *args: Any) -> None:
"""
Calls close.
"""
def close(self) -> None:
"""
Stops the watching thread. After `close` is called, the RustNotify instance can no
longer be used, calls to [`watch`][watchfiles.RustNotify.watch] will raise a `RuntimeError`.
Note: `close` is not required, just deleting the `RustNotify` instance will kill the thread
implicitly.
As per samuelcolvin/watchfiles#163 `close()` is only required because in the
event of an error, the traceback in `sys.exc_info` keeps a reference to `watchfiles.watch`'s
frame, so you can't rely on the `RustNotify` object being deleted, and thereby stopping
the watching thread.
"""

class WatchfilesRustInternalError(RuntimeError):
"""
Expand Down
96 changes: 48 additions & 48 deletions watchfiles/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,27 +100,27 @@ def watch(
print(changes)
```
"""
watcher = RustNotify([str(p) for p in paths], debug, force_polling, poll_delay_ms)
while True:
raw_changes = watcher.watch(debounce, step, rust_timeout, stop_event)
if raw_changes == 'timeout':
if yield_on_timeout:
yield set()
else:
logger.debug('rust notify timeout, continuing')
elif raw_changes == 'signal':
if raise_interrupt:
raise KeyboardInterrupt
else:
logger.warning('KeyboardInterrupt caught, stopping watch')
with RustNotify([str(p) for p in paths], debug, force_polling, poll_delay_ms) as watcher:
while True:
raw_changes = watcher.watch(debounce, step, rust_timeout, stop_event)
if raw_changes == 'timeout':
if yield_on_timeout:
yield set()
else:
logger.debug('rust notify timeout, continuing')
elif raw_changes == 'signal':
if raise_interrupt:
raise KeyboardInterrupt
else:
logger.warning('KeyboardInterrupt caught, stopping watch')
return
elif raw_changes == 'stop':
return
elif raw_changes == 'stop':
return
else:
changes = _prep_changes(raw_changes, watch_filter)
if changes:
_log_changes(changes)
yield changes
else:
changes = _prep_changes(raw_changes, watch_filter)
if changes:
_log_changes(changes)
yield changes


async def awatch( # noqa C901
Expand Down Expand Up @@ -214,35 +214,35 @@ async def stop_soon():
else:
stop_event_ = stop_event

watcher = RustNotify([str(p) for p in paths], debug, force_polling, poll_delay_ms)
timeout = _calc_async_timeout(rust_timeout)
CancelledError = anyio.get_cancelled_exc_class()

while True:
async with anyio.create_task_group() as tg:
try:
raw_changes = await anyio.to_thread.run_sync(watcher.watch, debounce, step, timeout, stop_event_)
except (CancelledError, KeyboardInterrupt):
stop_event_.set()
# suppressing KeyboardInterrupt wouldn't stop it getting raised by the top level asyncio.run call
raise
tg.cancel_scope.cancel()

if raw_changes == 'timeout':
if yield_on_timeout:
yield set()
with RustNotify([str(p) for p in paths], debug, force_polling, poll_delay_ms) as watcher:
timeout = _calc_async_timeout(rust_timeout)
CancelledError = anyio.get_cancelled_exc_class()

while True:
async with anyio.create_task_group() as tg:
try:
raw_changes = await anyio.to_thread.run_sync(watcher.watch, debounce, step, timeout, stop_event_)
except (CancelledError, KeyboardInterrupt):
stop_event_.set()
# suppressing KeyboardInterrupt wouldn't stop it getting raised by the top level asyncio.run call
raise
tg.cancel_scope.cancel()

if raw_changes == 'timeout':
if yield_on_timeout:
yield set()
else:
logger.debug('rust notify timeout, continuing')
elif raw_changes == 'stop':
return
elif raw_changes == 'signal':
# in theory the watch thread should never get a signal
raise RuntimeError('watch thread unexpectedly received a signal')
else:
logger.debug('rust notify timeout, continuing')
elif raw_changes == 'stop':
return
elif raw_changes == 'signal':
# in theory the watch thread should never get a signal
raise RuntimeError('watch thread unexpectedly received a signal')
else:
changes = _prep_changes(raw_changes, watch_filter)
if changes:
_log_changes(changes)
yield changes
changes = _prep_changes(raw_changes, watch_filter)
if changes:
_log_changes(changes)
yield changes


def _prep_changes(
Expand Down

0 comments on commit 6349679

Please sign in to comment.