From 68e7cd87a1360b315942133c29eb9b644e599832 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Mon, 13 Feb 2023 07:45:50 -0800 Subject: [PATCH 1/2] Upgrade to isort 5.12.0 --- .pre-commit-config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 9f936ce8d..95adc999d 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,6 +1,6 @@ repos: - repo: https://github.com/pycqa/isort - rev: 5.10.1 + rev: 5.12.0 hooks: - id: isort args: ["--settings-path=pyproject.toml"] From 3363bc49ab03af47ef7cf4e897660a810470188a Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Mon, 13 Feb 2023 07:46:44 -0800 Subject: [PATCH 2/2] Prevent "coroutine was never awaited" warnings --- ucp/continuous_ucx_progress.py | 9 +++++---- ucp/core.py | 12 ++++++++++-- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/ucp/continuous_ucx_progress.py b/ucp/continuous_ucx_progress.py index 2bf27c85a..2ea96f443 100644 --- a/ucp/continuous_ucx_progress.py +++ b/ucp/continuous_ucx_progress.py @@ -25,10 +25,6 @@ def __init__(self, worker, event_loop): self.event_loop = event_loop self.asyncio_task = None - def __del__(self): - if self.asyncio_task is not None: - self.asyncio_task.cancel() - # Hash and equality is based on the event loop def __hash__(self): return hash(self.event_loop) @@ -83,6 +79,11 @@ def _fd_reader_callback(self): # Notice, we can safely overwrite `self.dangling_arm_task` # since previous arm task is finished by now. assert self.asyncio_task is None or self.asyncio_task.done() + from .core import has_context_referrers + + if not has_context_referrers(): + self.asyncio_task = None + return self.asyncio_task = self.event_loop.create_task(self._arm_worker()) async def _arm_worker(self): diff --git a/ucp/core.py b/ucp/core.py index 6f5ddf3c0..20d8836ea 100644 --- a/ucp/core.py +++ b/ucp/core.py @@ -205,7 +205,7 @@ class ApplicationContext: """ def __init__(self, config_dict={}, blocking_progress_mode=None): - self.progress_tasks = [] + self.progress_tasks = dict() # For now, a application context only has one worker self.context = ucx_api.UCXContext(config_dict) @@ -407,7 +407,7 @@ def continuous_ucx_progress(self, event_loop=None): task = BlockingMode(self.worker, loop, self.epoll_fd) else: task = NonBlockingMode(self.worker, loop) - self.progress_tasks.append(task) + self.progress_tasks[loop] = task def get_ucp_worker(self): """Returns the underlying UCP worker handle (ucp_worker_h) @@ -926,6 +926,14 @@ def init(options={}, env_takes_precedence=False, blocking_progress_mode=None): _ctx = ApplicationContext(options, blocking_progress_mode=blocking_progress_mode) +def has_context_referrers(): + global _ctx + if _ctx is not None: + weakref_ctx = weakref.ref(_ctx) + # gc.collect() + return weakref_ctx() is None + + def reset(): """Resets the UCX library by shutting down all of UCX.