diff --git a/watcher-py/watcher/__init__.py b/watcher-py/watcher/__init__.py index a12142df..117214fc 100644 --- a/watcher-py/watcher/__init__.py +++ b/watcher-py/watcher/__init__.py @@ -1 +1,5 @@ +""" +Filesystem watcher. Simple, efficient and friendly. +""" + from .watcher import Watch, _CEvent diff --git a/watcher-py/watcher/watcher.py b/watcher-py/watcher/watcher.py index 582604e6..c26bd314 100644 --- a/watcher-py/watcher/watcher.py +++ b/watcher-py/watcher/watcher.py @@ -1,3 +1,7 @@ +""" +Filesystem watcher. Simple, efficient and friendly. +""" + from __future__ import annotations import ctypes import os @@ -10,6 +14,7 @@ _LIB: ctypes.CDLL | None = None +# pylint: disable=too-few-public-methods class _CEvent(ctypes.Structure): _fields_ = [ ("path_name", ctypes.c_char_p), @@ -25,24 +30,25 @@ class _CEvent(ctypes.Structure): def _lazy_static_solib_handle() -> ctypes.CDLL: def native_solib_file_ending(): - match os.uname().sysname: - case "Darwin": - return "dylib" - case "Windows": - return "dll" - case _: - return "so" + sysname = os.uname().sysname + if sysname == "Darwin": + return "dylib" + if sysname == "Windows": + return "dll" + return "so" def libcwatcher_lib_path(): - version = "0.11.0" # hook: tool/release + version = "0.11.0" # hook: tool/release heredir = os.path.dirname(os.path.abspath(__file__)) dir_path = os.path.join(heredir, ".watcher.mesonpy.libs") lib_name = f"libcwatcher-{version}.{native_solib_file_ending()}" lib_path = os.path.join(dir_path, lib_name) if not os.path.exists(lib_path): - raise RuntimeError(f"Could not find '{lib_path}', did the install dir change?") + raise RuntimeError(f"Library does not exist: '{lib_path}'") return lib_path + # Resource is necessarily dynamic, mutable and bound to the lifetime of the program. + # pylint: disable=global-statement global _LIB if _LIB is None: _LIB = ctypes.CDLL(libcwatcher_lib_path()) @@ -56,12 +62,13 @@ def libcwatcher_lib_path(): def _as_utf8(s: str | bytes | memoryview | None) -> str: if s is None: return "" - elif isinstance(s, str): + if isinstance(s, str): return s - elif isinstance(s, memoryview): + if isinstance(s, memoryview): return s.tobytes().decode("utf-8") - else: + if isinstance(s, bytes): return s.decode("utf-8") + raise TypeError() def _c_event_to_event(c_event: _CEvent) -> Event: @@ -74,6 +81,10 @@ def _c_event_to_event(c_event: _CEvent) -> Event: class EffectType(Enum): + """ + The effect observed on a path. + """ + RENAME = 0 MODIFY = 1 CREATE = 2 @@ -83,6 +94,13 @@ class EffectType(Enum): class PathType(Enum): + """ + The type of a path as it was observed when the effect happened. + The `watcher` case is special. Commonly used to report errors, + warnings and important status updated (like when the watcher + first begins watching and when it stops). + """ + DIR = 0 FILE = 1 HARD_LINK = 2 @@ -93,6 +111,10 @@ class PathType(Enum): @dataclass class Event: + """ + Represents an event witnessed on the filesystem. + """ + path_name: str effect_type: EffectType path_type: PathType @@ -101,7 +123,24 @@ class Event: class Watch: + """ + Filesystem watcher. + Begins watching when constructed. + Stops when the context manager exits (preferred to use this way). + Or when `close`, del or deinit happens, but you don't need to do that. + Example usage: + ```python + with watcher.Watch(os.path.expanduser("~"), print) as _: + input() + ``` + """ + def __init__(self, path: str, callback: Callable[[Event], None]): + """ + - `path`: The path to watch. + - `callback`: Called when events happen. + """ + def callback_bridge(c_event: _CEvent, _) -> None: py_event = _c_event_to_event(c_event) callback(py_event) @@ -111,12 +150,13 @@ def callback_bridge(c_event: _CEvent, _) -> None: self._c_callback = _CCallback(callback_bridge) self._watcher = self._lib.wtr_watcher_open(self._path, self._c_callback, None) if not self._watcher: - if os.path.exists(path): - raise RuntimeError(f"No such path: {path}") - else: - raise RuntimeError("Internal error while opening a watcher") + raise RuntimeError("Failed to open a watcher") def close(self): + """ + You can call this manually (not required) to close the watcher. + Preferred to use a context manager, like in the example. + """ if self._watcher: self._lib.wtr_watcher_close(self._watcher) self._watcher = None @@ -135,6 +175,6 @@ def __exit__(self, *_): if __name__ == "__main__": - path = sys.argv[1] if len(sys.argv) > 1 else "." - with Watch(path, print) as watcher: + events_at = sys.argv[1] if len(sys.argv) > 1 else "." + with Watch(events_at, print) as _: input()