Skip to content

Commit

Permalink
Update event loop handling to ensure a maximum of one loop per Select…
Browse files Browse the repository at this point in the history
…Consumer

Ensure any created IOLoops are stopped when the thread is joined
  • Loading branch information
NeonDaniel committed Jan 16, 2025
1 parent f01d50a commit 818fd22
Showing 1 changed file with 20 additions and 18 deletions.
38 changes: 20 additions & 18 deletions neon_mq_connector/consumers/select_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,19 @@ def __init__(self,
to learn more about different exchanges
"""
threading.Thread.__init__(self, *args, **kwargs)

# Use an available event loop, else create a new one for this consumer
try:
get_event_loop()
self._loop = get_event_loop()
self.__stop_loop_on_exit = False
except RuntimeError as e:
LOG.warning(e)
loop = new_event_loop()
set_event_loop(loop)
loop.run_forever()
LOG.info(e)
self._loop = new_event_loop()
set_event_loop(self._loop)
self._loop.run_forever()
self.__stop_loop_on_exit = True

self._consumer_started = Event() # annotates that ConsumerThread is running
# self._exit_event = Event()
self._channel_closed = threading.Event()
self._is_consumer_alive = True # annotates that ConsumerThread is alive and shall be recreated
self._stopping = False
Expand All @@ -105,9 +109,6 @@ def __init__(self,
self.connection_failed_attempts = 0
self.max_connection_failed_attempts = 3

# async def _wait_until_exit(self):
# await self._exit_event.wait()

def create_connection(self) -> pika.SelectConnection:
return pika.SelectConnection(parameters=self.connection_params,
on_open_callback=self.on_connected,
Expand Down Expand Up @@ -213,14 +214,11 @@ def is_consuming(self) -> bool:
return self._consumer_started.is_set()

def run(self):
"""Starting connection io loop """
try:
get_event_loop()
except RuntimeError as e:
LOG.warning(e)
loop = new_event_loop()
set_event_loop(loop)
loop.run_forever()
"""
Starting connection io loop
"""
# Ensure there is an event loop in this thread
set_event_loop(self._loop)
if not self.is_consuming:
try:
self.connection: pika.SelectConnection = self.create_connection()
Expand Down Expand Up @@ -279,7 +277,11 @@ def join(self, timeout: Optional[float] = None) -> None:
"""Terminating consumer channel"""
if self.is_consumer_alive:
self._close_connection(mark_consumer_as_dead=True)
# self._exit_event.set()
try:
if self.__stop_loop_on_exit:
self._loop.stop()
except Exception as e:
LOG.error(f"failed to stop ioloop: {e}")
LOG.info(f"Stopped consumer. Waiting up to {timeout}s for thread to terminate.")
try:
super().join(timeout=timeout)
Expand Down

0 comments on commit 818fd22

Please sign in to comment.