diff --git a/neon_mq_connector/consumers/select_consumer.py b/neon_mq_connector/consumers/select_consumer.py index a808a0b..221f31c 100644 --- a/neon_mq_connector/consumers/select_consumer.py +++ b/neon_mq_connector/consumers/select_consumer.py @@ -76,6 +76,10 @@ def __init__(self, to learn more about different exchanges """ threading.Thread.__init__(self, *args, **kwargs) + try: + self._io_loop = get_event_loop() + except RuntimeError: + self._io_loop = new_event_loop() self._consumer_started = Event() # annotates that ConsumerThread is running self._channel_closed = threading.Event() self._is_consumer_alive = True # annotates that ConsumerThread is alive and shall be recreated @@ -98,15 +102,11 @@ def __init__(self, self.max_connection_failed_attempts = 3 def create_connection(self) -> pika.SelectConnection: - try: - io_loop = get_event_loop() - except RuntimeError: - io_loop = new_event_loop() return pika.SelectConnection(parameters=self.connection_params, on_open_callback=self.on_connected, on_open_error_callback=self.on_connection_fail, on_close_callback=self.on_close, - custom_ioloop=io_loop) + custom_ioloop=self._io_loop) def on_connected(self, _): """Called when we are fully connected to RabbitMQ"""