diff --git a/janus/__init__.py b/janus/__init__.py index 3126bde..ad3d204 100644 --- a/janus/__init__.py +++ b/janus/__init__.py @@ -509,23 +509,19 @@ async def put(self, item: T) -> None: parent = self._parent parent._check_closing() async with parent._async_not_full: - parent._sync_mutex.acquire() - locked = True - parent._get_loop() # check the event loop - try: + with parent._sync_mutex: + parent._get_loop() # check the event loop if parent._maxsize > 0: do_wait = True while do_wait: do_wait = parent._qsize() >= parent._maxsize if do_wait: parent._async_not_full_waiting += 1 - locked = False parent._sync_mutex.release() try: await parent._async_not_full.wait() finally: parent._sync_mutex.acquire() - locked = True parent._async_not_full_waiting -= 1 parent._put_internal(item) @@ -533,9 +529,6 @@ async def put(self, item: T) -> None: parent._async_not_empty.notify() if parent._sync_not_empty_waiting: parent._sync_not_empty.notify() - finally: - if locked: - parent._sync_mutex.release() def put_nowait(self, item: T) -> None: """Put an item into the queue without blocking. @@ -566,23 +559,19 @@ async def get(self) -> T: parent = self._parent parent._check_closing() async with parent._async_not_empty: - parent._sync_mutex.acquire() - locked = True - parent._get_loop() # check the event loop - try: + with parent._sync_mutex: + parent._get_loop() # check the event loop do_wait = True while do_wait: do_wait = parent._qsize() == 0 if do_wait: parent._async_not_empty_waiting += 1 - locked = False parent._sync_mutex.release() try: await parent._async_not_empty.wait() finally: parent._sync_mutex.acquire() - locked = True parent._async_not_empty_waiting -= 1 item = parent._get() @@ -591,9 +580,6 @@ async def get(self) -> T: if parent._sync_not_full_waiting: parent._sync_not_full.notify() return item - finally: - if locked: - parent._sync_mutex.release() def get_nowait(self) -> T: """Remove and return an item from the queue. @@ -655,9 +641,8 @@ async def join(self) -> None: parent = self._parent parent._check_closing() async with parent._async_tasks_done: - parent._sync_mutex.acquire() - parent._get_loop() # check the event loop - try: + with parent._sync_mutex: + parent._get_loop() # check the event loop while parent._unfinished_tasks: parent._async_tasks_done_waiting += 1 parent._sync_mutex.release() @@ -667,8 +652,6 @@ async def join(self) -> None: parent._sync_mutex.acquire() parent._async_tasks_done_waiting -= 1 parent._check_closing() - finally: - parent._sync_mutex.release() class PriorityQueue(Queue[T]):