diff --git a/neon_mq_connector/consumers/select_consumer.py b/neon_mq_connector/consumers/select_consumer.py index e00e3e2..4f51628 100644 --- a/neon_mq_connector/consumers/select_consumer.py +++ b/neon_mq_connector/consumers/select_consumer.py @@ -78,9 +78,13 @@ def __init__(self, threading.Thread.__init__(self, *args, **kwargs) try: get_event_loop() - except RuntimeError: - set_event_loop(new_event_loop()) + except RuntimeError as e: + LOG.warning(e) + loop = new_event_loop() + set_event_loop(loop) + loop.run_until_complete(self._wait_until_exit) self._consumer_started = Event() # annotates that ConsumerThread is running + self._exit_event = Event() self._channel_closed = threading.Event() self._is_consumer_alive = True # annotates that ConsumerThread is alive and shall be recreated self._stopping = False @@ -101,6 +105,9 @@ def __init__(self, self.connection_failed_attempts = 0 self.max_connection_failed_attempts = 3 + async def _wait_until_exit(self): + await self._exit_event.wait() + def create_connection(self) -> pika.SelectConnection: return pika.SelectConnection(parameters=self.connection_params, on_open_callback=self.on_connected, @@ -207,6 +214,11 @@ def is_consuming(self) -> bool: def run(self): """Starting connection io loop """ + try: + get_event_loop() + except RuntimeError as e: + LOG.warning(e) + set_event_loop(new_event_loop()) if not self.is_consuming: try: self.connection: pika.SelectConnection = self.create_connection() @@ -265,6 +277,7 @@ def join(self, timeout: Optional[float] = None) -> None: """Terminating consumer channel""" if self.is_consumer_alive: self._close_connection(mark_consumer_as_dead=True) + self._exit_event.set() LOG.info(f"Stopped consumer. Waiting up to {timeout}s for thread to terminate.") try: super().join(timeout=timeout)