Skip to content

Commit

Permalink
[Bug Fix] Fixing issue with reopenning connection on consumers (#103)
Browse files Browse the repository at this point in the history
* Improved logic to create new connection instance on running consumer thread
  • Loading branch information
NeonKirill authored Dec 5, 2024
1 parent 77b6d76 commit ab771ee
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 20 deletions.
52 changes: 33 additions & 19 deletions neon_mq_connector/consumers/blocking_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,30 +73,22 @@ def __init__(self, connection_params: pika.ConnectionParameters,
threading.Thread.__init__(self, *args, **kwargs)
self._is_consuming = False # annotates that ConsumerThread is running
self._is_consumer_alive = True # annotates that ConsumerThread is alive and shall be recreated
self.connection = pika.BlockingConnection(connection_params)

self.callback_func = callback_func
self.error_func = error_func
self.auto_ack = auto_ack

self.exchange = exchange or ''
self.exchange_type = exchange_type or ExchangeType.direct
self.exchange_reset = exchange_reset

self.queue = queue or ''
self.channel = self.connection.channel()
self.channel.basic_qos(prefetch_count=50)
if queue_reset:
self.channel.queue_delete(queue=self.queue)
declared_queue = self.channel.queue_declare(queue=self.queue,
auto_delete=False,
exclusive=queue_exclusive)
if self.exchange:
if exchange_reset:
self.channel.exchange_delete(exchange=self.exchange)
self.channel.exchange_declare(exchange=self.exchange,
exchange_type=self.exchange_type,
auto_delete=False)
self.channel.queue_bind(queue=declared_queue.method.queue,
exchange=self.exchange)
self.channel.basic_consume(on_message_callback=self.callback_func,
queue=self.queue,
auto_ack=auto_ack)
self.queue_reset = queue_reset
self.queue_exclusive = queue_exclusive

self.connection_params = connection_params
self.connection = None
self.channel = None

@property
def is_consumer_alive(self) -> bool:
Expand All @@ -111,6 +103,7 @@ def run(self):
if not self._is_consuming:
try:
super(BlockingConsumerThread, self).run()
self._create_connection()
self._is_consuming = True
self.channel.start_consuming()
except Exception as e:
Expand All @@ -122,6 +115,27 @@ def run(self):
else:
self.error_func(self, e)

def _create_connection(self):
self.connection = pika.BlockingConnection(self.connection_params)
self.channel = self.connection.channel()
self.channel.basic_qos(prefetch_count=50)
if self.queue_reset:
self.channel.queue_delete(queue=self.queue)
declared_queue = self.channel.queue_declare(queue=self.queue,
auto_delete=False,
exclusive=self.queue_exclusive)
if self.exchange:
if self.exchange_reset:
self.channel.exchange_delete(exchange=self.exchange)
self.channel.exchange_declare(exchange=self.exchange,
exchange_type=self.exchange_type,
auto_delete=False)
self.channel.queue_bind(queue=declared_queue.method.queue,
exchange=self.exchange)
self.channel.basic_consume(on_message_callback=self.callback_func,
queue=self.queue,
auto_ack=self.auto_ack)

def join(self, timeout: Optional[float] = None) -> None:
"""Terminating consumer channel"""
if self._is_consumer_alive:
Expand Down
3 changes: 2 additions & 1 deletion neon_mq_connector/consumers/select_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def __init__(self,
self.queue_reset = queue_reset
self.exchange_reset = exchange_reset

self.connection = self.create_connection()
self.connection = None
self.connection_failed_attempts = 0
self.max_connection_failed_attempts = 3

Expand Down Expand Up @@ -188,6 +188,7 @@ def run(self):
if not self.is_consuming:
try:
super(SelectConsumerThread, self).run()
self.connection = self.create_connection()
self._is_consuming = True
self.connection.ioloop.start()
except Exception as e:
Expand Down

0 comments on commit ab771ee

Please sign in to comment.