Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make RustNotify a context manager and kill the thread on exit #164

Merged
merged 5 commits into from
Jun 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions docs/api/rust_backend.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,16 @@ r = RustNotify(['first/path', 'second/path'], False, False, 0)
changes = r.watch(1_600, 50, 100, None)
print(changes)
```

Or using `RustNotify` as a context manager:

```py
title="Rust backend context manager example"
from watchfiles._rust_notify import RustNotify

with RustNotify(['first/path', 'second/path'], False, False, 0) as r:
changes = r.watch(1_600, 50, 100, None)
print(changes)
```

(See the documentation on `close` above for when the context manager or `close` method are required.)
21 changes: 20 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const CHANGE_DELETED: u8 = 3;

#[derive(Debug)]
enum WatcherEnum {
None,
Poll(PollWatcher),
Recommended(RecommendedWatcher),
}
Expand Down Expand Up @@ -157,13 +158,16 @@ impl RustNotify {
timeout_ms: u64,
stop_event: PyObject,
) -> PyResult<PyObject> {
if matches!(self.watcher, WatcherEnum::None) {
return Err(PyRuntimeError::new_err("RustNotify watcher closed"));
}
let stop_event_is_set: Option<&PyAny> = match stop_event.is_none(py) {
true => None,
false => {
let event: &PyAny = stop_event.extract(py)?;
let func: &PyAny = event.getattr("is_set")?.extract()?;
if !func.is_callable() {
return Err(PyTypeError::new_err("'stop_event.is_set' must be callable".to_string()));
return Err(PyTypeError::new_err("'stop_event.is_set' must be callable"));
}
Some(func)
}
Expand Down Expand Up @@ -228,10 +232,25 @@ impl RustNotify {
Ok(py_changes)
}

/// https://github.com/PyO3/pyo3/issues/1205#issuecomment-1164096251 for advice on `__enter__`
pub fn __enter__(slf: Py<Self>) -> Py<Self> {
slf
}

pub fn close(&mut self) {
self.watcher = WatcherEnum::None;
Copy link

@graingert graingert Jun 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you override __del__ to issue a ResourceWarning if the watcher is not None?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe, but I'm not sure it's worth it - if you actually delete the instance the thread will be closed. So technically __del__ is just as correct as __exit__ or close.

Also RustNotify is not supposed to be used directly and both the public methods which use take care of clean up.

I think what we have here is sufficient.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • if you actually delete the instance the thread will be closed.

Yes this is the purpose of ResourceWarning in a __del__ method

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pytest hooks into this GC/ResourceWarning system to fail the test suite if an explicit close is forgotten

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But omitting to call close is fine in general.

If you'd like to submit a PR, feel free.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apart from your warning suggestion on __del__, can you confirm if this works?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep this fixes it!

/tmp/tmphp91terl
watcher: INotifyWatcher { channel: Sender { .. }, waker: Waker { inner: Waker { fd: File { fd: 5, path: "anon_inode:[eventfd]", read: true, write: true } } } }
keyboard interrupt happened gen=<generator object watch at 0x7f2abdb68a50> is closed gen.gi_frame=None is unset but still we're listening

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On reflection I think you're right that we should raise a resource warning, fuller answer below.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've created #166 to keep track of adding a resource warning. I think htis can be merged through since we shouldn't wait for pyo3 to resolve the original bug here.

}

pub fn __exit__(&mut self, _exc_type: PyObject, _exc_value: PyObject, _traceback: PyObject) {
self.close();
}

pub fn __repr__(&self) -> PyResult<String> {
Ok(format!("RustNotify({:#?})", self.watcher))
}
}

impl RustNotify {
fn clear(&self) {
self.changes.lock().unwrap().clear();
}
Expand Down
9 changes: 9 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,15 @@ def watch(self, debounce_ms: int, step_ms: int, timeout_ms: int, cancel_event):
self.watch_count += 1
return change

def __enter__(self):
return self

def __exit__(self, *args):
self.close()

def close(self):
pass


if TYPE_CHECKING:
from typing import Literal, Protocol
Expand Down
8 changes: 7 additions & 1 deletion tests/test_rust_notify.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,16 @@ def test_add(test_dir: Path):
assert watcher.watch(200, 50, 500, None) == {(1, str(test_dir / 'new_file.txt'))}


def test_recommended_repr(test_dir: Path):
def test_close(test_dir: Path):
watcher = RustNotify([str(test_dir)], True, False, 0)
assert repr(watcher).startswith('RustNotify(Recommended(\n')

watcher.close()

assert repr(watcher) == 'RustNotify(None)'
with pytest.raises(RuntimeError, match='RustNotify watcher closed'):
watcher.watch(200, 50, 500, None)


def test_modify_write(test_dir: Path):
watcher = RustNotify([str(test_dir)], True, False, 0)
Expand Down
9 changes: 9 additions & 0 deletions tests/test_watch.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,15 @@ def watch(self, *args):
self.i += 1
return {(Change.added, 'spam.py')}

def __enter__(self):
return self

def __exit__(self, *args):
self.close()

def close(self):
pass


async def test_awatch_interrupt_raise(mocker, caplog):
mocker.patch('watchfiles.main.RustNotify', return_value=MockRustNotifyRaise())
Expand Down
28 changes: 26 additions & 2 deletions watchfiles/_rust_notify.pyi
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List, Literal, Optional, Protocol, Set, Tuple, Union
from typing import Any, List, Literal, Optional, Protocol, Set, Tuple, Union

__all__ = 'RustNotify', 'WatchfilesRustInternalError'

Expand All @@ -10,7 +10,7 @@ class AbstractEvent(Protocol):
class RustNotify:
"""
Interface to the Rust [notify](https://crates.io/crates/notify) crate which does
the heavy lifting of watching for file changes and grouping them into a single event.
the heavy lifting of watching for file changes and grouping them into events.
"""

def __init__(self, watch_paths: List[str], debug: bool, force_polling: bool, poll_delay_ms: int) -> None:
Expand Down Expand Up @@ -56,6 +56,30 @@ class RustNotify:
`'signal'` if a signal was received, `'stop'` if the `stop_event` was set,
or `'timeout'` if `timeout_ms` was exceeded.
"""
def __enter__(self) -> 'RustNotify':
"""
Does nothing, but allows `RustNotify` to be used as a context manager.

Note: the watching thead is created when an instance is initiated, not on
`__enter__`.
"""
def __exit__(self, *args: Any) -> None:
"""
Calls close.
"""
def close(self) -> None:
"""
Stops the watching thread. After `close` is called, the RustNotify instance can no
longer be used, calls to [`watch`][watchfiles.RustNotify.watch] will raise a `RuntimeError`.

Note: `close` is not required, just deleting the `RustNotify` instance will kill the thread
implicitly.

As per samuelcolvin/watchfiles#163 `close()` is only required because in the
event of an error, the traceback in `sys.exc_info` keeps a reference to `watchfiles.watch`'s
frame, so you can't rely on the `RustNotify` object being deleted, and thereby stopping
the watching thread.
"""

class WatchfilesRustInternalError(RuntimeError):
"""
Expand Down
96 changes: 48 additions & 48 deletions watchfiles/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,27 +100,27 @@ def watch(
print(changes)
```
"""
watcher = RustNotify([str(p) for p in paths], debug, force_polling, poll_delay_ms)
while True:
raw_changes = watcher.watch(debounce, step, rust_timeout, stop_event)
if raw_changes == 'timeout':
if yield_on_timeout:
yield set()
else:
logger.debug('rust notify timeout, continuing')
elif raw_changes == 'signal':
if raise_interrupt:
raise KeyboardInterrupt
else:
logger.warning('KeyboardInterrupt caught, stopping watch')
with RustNotify([str(p) for p in paths], debug, force_polling, poll_delay_ms) as watcher:
while True:
raw_changes = watcher.watch(debounce, step, rust_timeout, stop_event)
if raw_changes == 'timeout':
if yield_on_timeout:
yield set()
else:
logger.debug('rust notify timeout, continuing')
elif raw_changes == 'signal':
if raise_interrupt:
raise KeyboardInterrupt
else:
logger.warning('KeyboardInterrupt caught, stopping watch')
return
elif raw_changes == 'stop':
return
elif raw_changes == 'stop':
return
else:
changes = _prep_changes(raw_changes, watch_filter)
if changes:
_log_changes(changes)
yield changes
else:
changes = _prep_changes(raw_changes, watch_filter)
if changes:
_log_changes(changes)
yield changes


async def awatch( # noqa C901
Expand Down Expand Up @@ -214,35 +214,35 @@ async def stop_soon():
else:
stop_event_ = stop_event

watcher = RustNotify([str(p) for p in paths], debug, force_polling, poll_delay_ms)
timeout = _calc_async_timeout(rust_timeout)
CancelledError = anyio.get_cancelled_exc_class()

while True:
async with anyio.create_task_group() as tg:
try:
raw_changes = await anyio.to_thread.run_sync(watcher.watch, debounce, step, timeout, stop_event_)
except (CancelledError, KeyboardInterrupt):
stop_event_.set()
# suppressing KeyboardInterrupt wouldn't stop it getting raised by the top level asyncio.run call
raise
tg.cancel_scope.cancel()

if raw_changes == 'timeout':
if yield_on_timeout:
yield set()
with RustNotify([str(p) for p in paths], debug, force_polling, poll_delay_ms) as watcher:
timeout = _calc_async_timeout(rust_timeout)
CancelledError = anyio.get_cancelled_exc_class()

while True:
async with anyio.create_task_group() as tg:
try:
raw_changes = await anyio.to_thread.run_sync(watcher.watch, debounce, step, timeout, stop_event_)
except (CancelledError, KeyboardInterrupt):
stop_event_.set()
# suppressing KeyboardInterrupt wouldn't stop it getting raised by the top level asyncio.run call
raise
tg.cancel_scope.cancel()

if raw_changes == 'timeout':
if yield_on_timeout:
yield set()
else:
logger.debug('rust notify timeout, continuing')
elif raw_changes == 'stop':
return
elif raw_changes == 'signal':
# in theory the watch thread should never get a signal
raise RuntimeError('watch thread unexpectedly received a signal')
else:
logger.debug('rust notify timeout, continuing')
elif raw_changes == 'stop':
return
elif raw_changes == 'signal':
# in theory the watch thread should never get a signal
raise RuntimeError('watch thread unexpectedly received a signal')
else:
changes = _prep_changes(raw_changes, watch_filter)
if changes:
_log_changes(changes)
yield changes
changes = _prep_changes(raw_changes, watch_filter)
if changes:
_log_changes(changes)
yield changes


def _prep_changes(
Expand Down