From 818fd22b19b60f879cac3c5f1ac34b1ba70db5e5 Mon Sep 17 00:00:00 2001 From: Daniel McKnight Date: Thu, 16 Jan 2025 08:43:44 -0800 Subject: [PATCH] Update event loop handling to ensure a maximum of one loop per SelectConsumer Ensure any created IOLoops are stopped when the thread is joined --- .../consumers/select_consumer.py | 38 ++++++++++--------- 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/neon_mq_connector/consumers/select_consumer.py b/neon_mq_connector/consumers/select_consumer.py index 2673f61..5f8c588 100644 --- a/neon_mq_connector/consumers/select_consumer.py +++ b/neon_mq_connector/consumers/select_consumer.py @@ -76,15 +76,19 @@ def __init__(self, to learn more about different exchanges """ threading.Thread.__init__(self, *args, **kwargs) + + # Use an available event loop, else create a new one for this consumer try: - get_event_loop() + self._loop = get_event_loop() + self.__stop_loop_on_exit = False except RuntimeError as e: - LOG.warning(e) - loop = new_event_loop() - set_event_loop(loop) - loop.run_forever() + LOG.info(e) + self._loop = new_event_loop() + set_event_loop(self._loop) + self._loop.run_forever() + self.__stop_loop_on_exit = True + self._consumer_started = Event() # annotates that ConsumerThread is running - # self._exit_event = Event() self._channel_closed = threading.Event() self._is_consumer_alive = True # annotates that ConsumerThread is alive and shall be recreated self._stopping = False @@ -105,9 +109,6 @@ def __init__(self, self.connection_failed_attempts = 0 self.max_connection_failed_attempts = 3 - # async def _wait_until_exit(self): - # await self._exit_event.wait() - def create_connection(self) -> pika.SelectConnection: return pika.SelectConnection(parameters=self.connection_params, on_open_callback=self.on_connected, @@ -213,14 +214,11 @@ def is_consuming(self) -> bool: return self._consumer_started.is_set() def run(self): - """Starting connection io loop """ - try: - get_event_loop() - except RuntimeError as e: - LOG.warning(e) - loop = new_event_loop() - set_event_loop(loop) - loop.run_forever() + """ + Starting connection io loop + """ + # Ensure there is an event loop in this thread + set_event_loop(self._loop) if not self.is_consuming: try: self.connection: pika.SelectConnection = self.create_connection() @@ -279,7 +277,11 @@ def join(self, timeout: Optional[float] = None) -> None: """Terminating consumer channel""" if self.is_consumer_alive: self._close_connection(mark_consumer_as_dead=True) - # self._exit_event.set() + try: + if self.__stop_loop_on_exit: + self._loop.stop() + except Exception as e: + LOG.error(f"failed to stop ioloop: {e}") LOG.info(f"Stopped consumer. Waiting up to {timeout}s for thread to terminate.") try: super().join(timeout=timeout)