Skip to content

Commit

Permalink
Removed non-working tests and added pika-based Async Consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
kirgrim committed Nov 24, 2024
1 parent 5270904 commit 3bdabcc
Show file tree
Hide file tree
Showing 5 changed files with 249 additions and 152 deletions.
167 changes: 167 additions & 0 deletions neon_mq_connector/async_consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
import threading

from typing import Optional

import pika.exceptions
from ovos_utils import LOG
from pika.exchange_type import ExchangeType

from neon_mq_connector.utils import consumer_utils


class ConsumerThread(threading.Thread):

# retry to handle connection failures in case MQ server is still starting
def __init__(self,
connection_params: pika.ConnectionParameters,
queue: str, callback_func: callable,
error_func: callable = consumer_utils.default_error_handler,
auto_ack: bool = True,
queue_reset: bool = False,
queue_exclusive: bool = False,
exchange: Optional[str] = None,
exchange_reset: bool = False,
exchange_type: str = ExchangeType.direct,
*args, **kwargs):
"""
Rabbit MQ Consumer class that aims at providing unified configurable
interface for consumer threads
:param connection_params: pika connection parameters
:param queue: Desired consuming queue
:param callback_func: logic on message receiving
:param error_func: handler for consumer thread errors
:param auto_ack: Boolean to enable ack of messages upon receipt
:param queue_reset: If True, delete an existing queue `queue`
:param queue_exclusive: Marks declared queue as exclusive
to a given channel (deletes with it)
:param exchange: exchange to bind queue to (optional)
:param exchange_reset: If True, delete an existing exchange `exchange`
:param exchange_type: type of exchange to bind to from ExchangeType
(defaults to direct)
follow: https://www.rabbitmq.com/tutorials/amqp-concepts.html
to learn more about different exchanges
"""
threading.Thread.__init__(self, *args, **kwargs)
self._is_consuming = False # annotates that ConsumerThread is running
self._is_consumer_alive = True # annotates that ConsumerThread is alive and shall be recreated
self.callback_func = callback_func
self.error_func = error_func
self.exchange = exchange or ''
self.exchange_type = exchange_type or ExchangeType.direct
self.queue = queue or ''
self.channel = None
self.queue_exclusive = queue_exclusive
self.auto_ack = auto_ack

self.queue_reset = queue_reset
self.exchange_reset = exchange_reset

self.connection = pika.SelectConnection(parameters=connection_params,
on_open_callback=self.on_connected,
on_close_callback=self.on_close,)

def on_connected(self, _):
"""Called when we are fully connected to RabbitMQ"""
self.connection.channel(on_open_callback=self.on_channel_open)

def on_channel_open(self, new_channel):
"""Called when our channel has opened"""
self.channel = new_channel
if self.queue_reset:
self.channel.queue_delete(queue=self.queue,
callback=self.declare_queue)
else:
self.declare_queue()

def declare_queue(self, _unused_frame = None):
return self.channel.queue_declare(queue=self.queue,
exclusive=self.queue_exclusive,
auto_delete=False,
callback=self.on_queue_declared)

def setup_exchange(self):
if self.exchange_reset:
self.channel.exchange_delete(exchange=self.exchange, callback=self.declare_exchange)
else:
self.declare_exchange()

def declare_exchange(self, _unused_frame = None):
self.channel.exchange_declare(exchange=self.exchange,
exchange_type=self.exchange_type,
auto_delete=False,
callback=self.bind_exchange_to_queue)

def bind_exchange_to_queue(self, _unused_frame = None):
try:
self.channel.queue_bind(
queue=self.queue,
exchange=self.exchange,
callback=self.set_qos
)
except Exception as e:
LOG.error(f"Error binding queue '{self.queue}' to exchange '{self.exchange}': {e}")

def on_queue_declared(self, _unused_frame = None):
"""Called when RabbitMQ has told us our Queue has been declared, frame is the response from RabbitMQ"""
if self.exchange:
self.setup_exchange()
else:
self.set_qos()

def set_qos(self, _unused_frame = None):
self.channel.basic_qos(prefetch_count=50, callback=self.start_consuming)

def start_consuming(self, _unused_frame = None):
self.channel.basic_consume(queue=self.queue,
on_message_callback=self.on_message,
auto_ack=self.auto_ack)

def on_message(self, channel, method, properties, body):
try:
self.callback_func(channel, method, properties, body)
except Exception as e:
self.error_func(self, e)

def on_close(self, connection, exception):
self.connection.ioloop.stop()

@property
def is_consumer_alive(self) -> bool:
return self._is_consumer_alive

@property
def is_consuming(self) -> bool:
return self._is_consuming

def run(self):
"""Starting connnection io loop """
if not self._is_consuming:
try:
super(ConsumerThread, self).run()
self._is_consuming = True
self.connection.ioloop.start()
except Exception as e:
self._is_consuming = False
if isinstance(e, pika.exceptions.ChannelClosed):
LOG.error(f"Channel closed by broker: {self.callback_func}")
else:
LOG.error(e)
self.error_func(self, e)
self.join(allow_restart=True)

def join(self, timeout: Optional[float] = ..., allow_restart: bool = True) -> None:
"""Terminating consumer channel"""
if self._is_consumer_alive:
try:
if not (self.connection.is_closed or self.connection.is_closing):
self.connection.close()
except Exception as x:
LOG.error(x)
finally:
self._is_consuming = False
if not allow_restart:
self._is_consumer_alive = False
try:
super(ConsumerThread, self).join(timeout=timeout)
except RuntimeError:
pass
Loading

0 comments on commit 3bdabcc

Please sign in to comment.