Skip to content

Commit

Permalink
watcher-py: pylint
Browse files Browse the repository at this point in the history
  • Loading branch information
Will committed Jul 14, 2024
1 parent 069132c commit 0621cc4
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 18 deletions.
4 changes: 4 additions & 0 deletions watcher-py/watcher/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
"""
Filesystem watcher. Simple, efficient and friendly.
"""

from .watcher import Watch, _CEvent
76 changes: 58 additions & 18 deletions watcher-py/watcher/watcher.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
"""
Filesystem watcher. Simple, efficient and friendly.
"""

from __future__ import annotations
import ctypes
import os
Expand All @@ -10,6 +14,7 @@
_LIB: ctypes.CDLL | None = None


# pylint: disable=too-few-public-methods
class _CEvent(ctypes.Structure):
_fields_ = [
("path_name", ctypes.c_char_p),
Expand All @@ -25,24 +30,25 @@ class _CEvent(ctypes.Structure):

def _lazy_static_solib_handle() -> ctypes.CDLL:
def native_solib_file_ending():
match os.uname().sysname:
case "Darwin":
return "dylib"
case "Windows":
return "dll"
case _:
return "so"
sysname = os.uname().sysname
if sysname == "Darwin":
return "dylib"
if sysname == "Windows":
return "dll"
return "so"

def libcwatcher_lib_path():
version = "0.11.0" # hook: tool/release
version = "0.11.0" # hook: tool/release
heredir = os.path.dirname(os.path.abspath(__file__))
dir_path = os.path.join(heredir, ".watcher.mesonpy.libs")
lib_name = f"libcwatcher-{version}.{native_solib_file_ending()}"
lib_path = os.path.join(dir_path, lib_name)
if not os.path.exists(lib_path):
raise RuntimeError(f"Could not find '{lib_path}', did the install dir change?")
raise RuntimeError(f"Library does not exist: '{lib_path}'")
return lib_path

# Resource is necessarily dynamic, mutable and bound to the lifetime of the program.
# pylint: disable=global-statement
global _LIB
if _LIB is None:
_LIB = ctypes.CDLL(libcwatcher_lib_path())
Expand All @@ -56,12 +62,13 @@ def libcwatcher_lib_path():
def _as_utf8(s: str | bytes | memoryview | None) -> str:
if s is None:
return ""
elif isinstance(s, str):
if isinstance(s, str):
return s
elif isinstance(s, memoryview):
if isinstance(s, memoryview):
return s.tobytes().decode("utf-8")
else:
if isinstance(s, bytes):
return s.decode("utf-8")
raise TypeError()


def _c_event_to_event(c_event: _CEvent) -> Event:
Expand All @@ -74,6 +81,10 @@ def _c_event_to_event(c_event: _CEvent) -> Event:


class EffectType(Enum):
"""
The effect observed on a path.
"""

RENAME = 0
MODIFY = 1
CREATE = 2
Expand All @@ -83,6 +94,13 @@ class EffectType(Enum):


class PathType(Enum):
"""
The type of a path as it was observed when the effect happened.
The `watcher` case is special. Commonly used to report errors,
warnings and important status updated (like when the watcher
first begins watching and when it stops).
"""

DIR = 0
FILE = 1
HARD_LINK = 2
Expand All @@ -93,6 +111,10 @@ class PathType(Enum):

@dataclass
class Event:
"""
Represents an event witnessed on the filesystem.
"""

path_name: str
effect_type: EffectType
path_type: PathType
Expand All @@ -101,7 +123,24 @@ class Event:


class Watch:
"""
Filesystem watcher.
Begins watching when constructed.
Stops when the context manager exits (preferred to use this way).
Or when `close`, del or deinit happens, but you don't need to do that.
Example usage:
```python
with watcher.Watch(os.path.expanduser("~"), print) as _:
input()
```
"""

def __init__(self, path: str, callback: Callable[[Event], None]):
"""
- `path`: The path to watch.
- `callback`: Called when events happen.
"""

def callback_bridge(c_event: _CEvent, _) -> None:
py_event = _c_event_to_event(c_event)
callback(py_event)
Expand All @@ -111,12 +150,13 @@ def callback_bridge(c_event: _CEvent, _) -> None:
self._c_callback = _CCallback(callback_bridge)
self._watcher = self._lib.wtr_watcher_open(self._path, self._c_callback, None)
if not self._watcher:
if os.path.exists(path):
raise RuntimeError(f"No such path: {path}")
else:
raise RuntimeError("Internal error while opening a watcher")
raise RuntimeError("Failed to open a watcher")

def close(self):
"""
You can call this manually (not required) to close the watcher.
Preferred to use a context manager, like in the example.
"""
if self._watcher:
self._lib.wtr_watcher_close(self._watcher)
self._watcher = None
Expand All @@ -135,6 +175,6 @@ def __exit__(self, *_):


if __name__ == "__main__":
path = sys.argv[1] if len(sys.argv) > 1 else "."
with Watch(path, print) as watcher:
events_at = sys.argv[1] if len(sys.argv) > 1 else "."
with Watch(events_at, print) as _:
input()

0 comments on commit 0621cc4

Please sign in to comment.