diff --git a/neon_mq_connector/consumers/blocking_consumer.py b/neon_mq_connector/consumers/blocking_consumer.py index f6b6a75..87a3e11 100644 --- a/neon_mq_connector/consumers/blocking_consumer.py +++ b/neon_mq_connector/consumers/blocking_consumer.py @@ -73,30 +73,22 @@ def __init__(self, connection_params: pika.ConnectionParameters, threading.Thread.__init__(self, *args, **kwargs) self._is_consuming = False # annotates that ConsumerThread is running self._is_consumer_alive = True # annotates that ConsumerThread is alive and shall be recreated - self.connection = pika.BlockingConnection(connection_params) + self.callback_func = callback_func self.error_func = error_func + self.auto_ack = auto_ack + self.exchange = exchange or '' self.exchange_type = exchange_type or ExchangeType.direct + self.exchange_reset = exchange_reset + self.queue = queue or '' - self.channel = self.connection.channel() - self.channel.basic_qos(prefetch_count=50) - if queue_reset: - self.channel.queue_delete(queue=self.queue) - declared_queue = self.channel.queue_declare(queue=self.queue, - auto_delete=False, - exclusive=queue_exclusive) - if self.exchange: - if exchange_reset: - self.channel.exchange_delete(exchange=self.exchange) - self.channel.exchange_declare(exchange=self.exchange, - exchange_type=self.exchange_type, - auto_delete=False) - self.channel.queue_bind(queue=declared_queue.method.queue, - exchange=self.exchange) - self.channel.basic_consume(on_message_callback=self.callback_func, - queue=self.queue, - auto_ack=auto_ack) + self.queue_reset = queue_reset + self.queue_exclusive = queue_exclusive + + self.connection_params = connection_params + self.connection = None + self.channel = None @property def is_consumer_alive(self) -> bool: @@ -111,6 +103,7 @@ def run(self): if not self._is_consuming: try: super(BlockingConsumerThread, self).run() + self._create_connection() self._is_consuming = True self.channel.start_consuming() except Exception as e: @@ -122,6 +115,27 @@ def run(self): else: self.error_func(self, e) + def _create_connection(self): + self.connection = pika.BlockingConnection(self.connection_params) + self.channel = self.connection.channel() + self.channel.basic_qos(prefetch_count=50) + if self.queue_reset: + self.channel.queue_delete(queue=self.queue) + declared_queue = self.channel.queue_declare(queue=self.queue, + auto_delete=False, + exclusive=self.queue_exclusive) + if self.exchange: + if self.exchange_reset: + self.channel.exchange_delete(exchange=self.exchange) + self.channel.exchange_declare(exchange=self.exchange, + exchange_type=self.exchange_type, + auto_delete=False) + self.channel.queue_bind(queue=declared_queue.method.queue, + exchange=self.exchange) + self.channel.basic_consume(on_message_callback=self.callback_func, + queue=self.queue, + auto_ack=self.auto_ack) + def join(self, timeout: Optional[float] = None) -> None: """Terminating consumer channel""" if self._is_consumer_alive: diff --git a/neon_mq_connector/consumers/select_consumer.py b/neon_mq_connector/consumers/select_consumer.py index 2109ef0..79e278f 100644 --- a/neon_mq_connector/consumers/select_consumer.py +++ b/neon_mq_connector/consumers/select_consumer.py @@ -89,7 +89,7 @@ def __init__(self, self.queue_reset = queue_reset self.exchange_reset = exchange_reset - self.connection = self.create_connection() + self.connection = None self.connection_failed_attempts = 0 self.max_connection_failed_attempts = 3 @@ -188,6 +188,7 @@ def run(self): if not self.is_consuming: try: super(SelectConsumerThread, self).run() + self.connection = self.create_connection() self._is_consuming = True self.connection.ioloop.start() except Exception as e: