Skip to content

Commit

Permalink
tmp commit
Browse files Browse the repository at this point in the history
  • Loading branch information
kirgrim committed Nov 24, 2024
1 parent 14b819f commit db3f037
Showing 1 changed file with 55 additions and 76 deletions.
131 changes: 55 additions & 76 deletions neon_mq_connector/aio/consumer.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,30 @@
from typing import Optional

import aio_pika
import asyncio

import aio_pika
from aio_pika import ExchangeType
from aio_pika.abc import AbstractIncomingMessage
from aio_pika.exchange import ExchangeType
from ovos_utils import LOG

from neon_mq_connector.utils import consumer_utils


class AsyncConsumer:
def __init__(
self,
connection_params,
queue,
callback_func: callable,
error_func: callable = consumer_utils.default_error_handler,
auto_ack: bool = True,
queue_reset: bool = False,
queue_exclusive: bool = False,
exchange: Optional[str] = None,
exchange_reset: bool = False,
exchange_type: str = ExchangeType.DIRECT.value,
*args,
**kwargs,
):
def __init__(self, connection_params, queue, callback_func, *args, **kwargs):
self.channel = None
self.connection = None
self.connection_params = connection_params
self.queue = queue
self.callback_func = lambda message: self._async_on_message_wrapper(message, callback_func)
self.error_func = error_func
self.no_ack = auto_ack
self.queue_reset = queue_reset
self.queue_exclusive = queue_exclusive
self.exchange = exchange or ''
self.exchange_reset = exchange_reset
self.exchange_type = exchange_type or ExchangeType.DIRECT.value
self.error_func = kwargs.get("error_func", consumer_utils.default_error_handler)
self.no_ack = not kwargs.get("auto_ack", True)
self.queue_reset = kwargs.get("queue_reset", False)
self.queue_exclusive = kwargs.get("queue_exclusive", False)
self.exchange = kwargs.get("exchange", "")
self.exchange_reset = kwargs.get("exchange_reset", False)
self.exchange_type = kwargs.get("exchange_type", ExchangeType.DIRECT.value)
self._is_consuming = False
self._is_consumer_alive = True
self._consumer_stop_event = asyncio.Event()

async def create_connection(self):
return await aio_pika.connect_robust(
Expand All @@ -49,69 +35,62 @@ async def create_connection(self):
virtualhost=self.connection_params.virtual_host,
)

async def connect(self) -> None:
"""
Utilises aio-pika as a base interface for establishing async MQ connection
Upon establishing connection, declares queue and exchange if applicable
"""
self.connection = await self.create_connection()
self.channel = await self.connection.channel()
await self.channel.set_qos(prefetch_count=50)
if self.queue_reset:
await self.channel.queue_delete(self.queue)
self.queue = await self.channel.declare_queue(
self.queue,
auto_delete=False,
exclusive=self.queue_exclusive
)
if self.exchange:
if self.exchange_reset:
await self.channel.exchange_delete(self.exchange)
self.exchange = await self.channel.declare_exchange(
self.exchange,
self.exchange_type,
auto_delete=False
)
await self.queue.bind(self.exchange)
await self.queue.consume(self.callback_func, no_ack=self.no_ack)
await asyncio.Future()

@property
def is_consumer_alive(self) -> bool:
"""
Flag specifying whether consumer thread is alive
:return: True if consumer thread is alive, False otherwise
"""
return self._is_consumer_alive
async def connect(self):
try:
self.connection = await self.create_connection()
async with self.connection:
self.channel = await self.connection.channel()
await self.channel.set_qos(prefetch_count=50)
if self.queue_reset:
await self.channel.queue_delete(self.queue)
self.queue = await self.channel.get_queue(name=self.queue)
if self.exchange:
if self.exchange_reset:
await self.channel.exchange_delete(self.exchange)
self.exchange = await self.channel.get_exchange(name=self.exchange)
await self.queue.bind(exchange=self.exchange)
await self.queue.consume(self.callback_func, exclusive=self.queue_exclusive, no_ack=self.no_ack)
except asyncio.CancelledError:
LOG.info("Consumer startup cancelled.")
except Exception as e:
LOG.error(f"Error during consumer startup: {e}")
await self.cleanup()
raise

async def start(self):
if not self._is_consuming:
LOG.info("Starting consumer...")
while self._is_consumer_alive:
try:
await self.connect()
self._is_consuming = True
LOG.info("Consumer started successfully.")
await self._consumer_stop_event.wait()
break
except Exception as e:
self._is_consuming = False
self.error_func(self, e)
LOG.warning("Retrying connection in 5 seconds...")
await asyncio.sleep(5)
LOG.info("Consumer shutting down...")
await self.cleanup()

async def stop(self):
LOG.info("Stopping consumer...")
if self._is_consumer_alive:
try:
await self.queue.cancel()
self._consumer_stop_event.set()
await self.cleanup()
self._is_consuming = False
self._is_consumer_alive = False

async def cleanup(self):
try:
if self.channel and not self.channel.is_closed:
await self.channel.close()
if self.connection and not self.connection.is_closed:
await self.connection.close()
except Exception as e:
self.error_func(self, e)
finally:
self._is_consuming = False
self._is_consumer_alive = False
except Exception as e:
LOG.error(f"Error during cleanup: {e}")

@classmethod
async def _async_on_message_wrapper(cls, message: AbstractIncomingMessage, callback: callable):
"""
Async wrapper to process asynchronous MQ messages
:param message: `AbstractIncomingMessage` instance
:param callback: the actual callback function
:return:
"""
async with message.process(ignore_processed=True):
await callback(message)
await callback(message)

0 comments on commit db3f037

Please sign in to comment.