Skip to content

Commit

Permalink
Simplify implementation again
Browse files Browse the repository at this point in the history
  • Loading branch information
x42005e1f authored Dec 11, 2024
1 parent 640957f commit f245ffb
Showing 1 changed file with 6 additions and 23 deletions.
29 changes: 6 additions & 23 deletions janus/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -509,33 +509,26 @@ async def put(self, item: T) -> None:
parent = self._parent
parent._check_closing()
async with parent._async_not_full:
parent._sync_mutex.acquire()
locked = True
parent._get_loop() # check the event loop
try:
with parent._sync_mutex:
parent._get_loop() # check the event loop
if parent._maxsize > 0:
do_wait = True
while do_wait:
do_wait = parent._qsize() >= parent._maxsize
if do_wait:
parent._async_not_full_waiting += 1
locked = False
parent._sync_mutex.release()
try:
await parent._async_not_full.wait()
finally:
parent._sync_mutex.acquire()
locked = True
parent._async_not_full_waiting -= 1

parent._put_internal(item)
if parent._async_not_empty_waiting:
parent._async_not_empty.notify()
if parent._sync_not_empty_waiting:
parent._sync_not_empty.notify()
finally:
if locked:
parent._sync_mutex.release()

def put_nowait(self, item: T) -> None:
"""Put an item into the queue without blocking.
Expand Down Expand Up @@ -566,23 +559,19 @@ async def get(self) -> T:
parent = self._parent
parent._check_closing()
async with parent._async_not_empty:
parent._sync_mutex.acquire()
locked = True
parent._get_loop() # check the event loop
try:
with parent._sync_mutex:
parent._get_loop() # check the event loop
do_wait = True
while do_wait:
do_wait = parent._qsize() == 0

if do_wait:
parent._async_not_empty_waiting += 1
locked = False
parent._sync_mutex.release()
try:
await parent._async_not_empty.wait()
finally:
parent._sync_mutex.acquire()
locked = True
parent._async_not_empty_waiting -= 1

item = parent._get()
Expand All @@ -591,9 +580,6 @@ async def get(self) -> T:
if parent._sync_not_full_waiting:
parent._sync_not_full.notify()
return item
finally:
if locked:
parent._sync_mutex.release()

def get_nowait(self) -> T:
"""Remove and return an item from the queue.
Expand Down Expand Up @@ -655,9 +641,8 @@ async def join(self) -> None:
parent = self._parent
parent._check_closing()
async with parent._async_tasks_done:
parent._sync_mutex.acquire()
parent._get_loop() # check the event loop
try:
with parent._sync_mutex:
parent._get_loop() # check the event loop
while parent._unfinished_tasks:
parent._async_tasks_done_waiting += 1
parent._sync_mutex.release()
Expand All @@ -667,8 +652,6 @@ async def join(self) -> None:
parent._sync_mutex.acquire()
parent._async_tasks_done_waiting -= 1
parent._check_closing()
finally:
parent._sync_mutex.release()


class PriorityQueue(Queue[T]):
Expand Down

0 comments on commit f245ffb

Please sign in to comment.