From 996a07528328ee48b2a5e9bfebd10c1c49bdd035 Mon Sep 17 00:00:00 2001 From: Alex Date: Tue, 22 Oct 2024 14:58:10 +0200 Subject: [PATCH] fix: handle QueueFull exception in file watch (#342) Signed-off-by: Alex --- .../eda/plugins/event_source/file_watch.py | 72 ++++++++++--------- 1 file changed, 40 insertions(+), 32 deletions(-) diff --git a/extensions/eda/plugins/event_source/file_watch.py b/extensions/eda/plugins/event_source/file_watch.py index 8c2d499b..e7478072 100644 --- a/extensions/eda/plugins/event_source/file_watch.py +++ b/extensions/eda/plugins/event_source/file_watch.py @@ -41,47 +41,55 @@ def __init__(self: "Handler", **kwargs: Any) -> None: RegexMatchingEventHandler.__init__(self, **kwargs) def on_created(self: "Handler", event: FileSystemEvent) -> None: - loop.call_soon_threadsafe( - queue.put_nowait, - { - "change": "created", - "src_path": event.src_path, - "type": event.__class__.__name__, - "root_path": root_path, - }, + asyncio.run_coroutine_threadsafe( + queue.put( + { + "change": "created", + "src_path": event.src_path, + "type": event.__class__.__name__, + "root_path": root_path, + }, + ), + loop, ) def on_deleted(self: "Handler", event: FileSystemEvent) -> None: - loop.call_soon_threadsafe( - queue.put_nowait, - { - "change": "deleted", - "src_path": event.src_path, - "type": event.__class__.__name__, - "root_path": root_path, - }, + asyncio.run_coroutine_threadsafe( + queue.put( + { + "change": "deleted", + "src_path": event.src_path, + "type": event.__class__.__name__, + "root_path": root_path, + }, + ), + loop, ) def on_modified(self: "Handler", event: FileSystemEvent) -> None: - loop.call_soon_threadsafe( - queue.put_nowait, - { - "change": "modified", - "src_path": event.src_path, - "type": event.__class__.__name__, - "root_path": root_path, - }, + asyncio.run_coroutine_threadsafe( + queue.put( + { + "change": "modified", + "src_path": event.src_path, + "type": event.__class__.__name__, + "root_path": root_path, + }, + ), + loop, ) def on_moved(self: "Handler", event: FileSystemEvent) -> None: - loop.call_soon_threadsafe( - queue.put_nowait, - { - "change": "moved", - "src_path": event.src_path, - "type": event.__class__.__name__, - "root_path": root_path, - }, + asyncio.run_coroutine_threadsafe( + queue.put( + { + "change": "moved", + "src_path": event.src_path, + "type": event.__class__.__name__, + "root_path": root_path, + }, + ), + loop, ) observer = Observer()