diff --git a/.github/workflows/unit_tests.yml b/.github/workflows/unit_tests.yml index 971b405..fc23191 100644 --- a/.github/workflows/unit_tests.yml +++ b/.github/workflows/unit_tests.yml @@ -20,7 +20,7 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - pip install .[aio] -r requirements/test_requirements.txt + pip install . -r requirements/test_requirements.txt - name: Run Utils Tests run: | pytest tests/test_utils.py --doctest-modules --junitxml=tests/utils-test-results.xml diff --git a/README.md b/README.md index 1399455..50ce009 100644 --- a/README.md +++ b/README.md @@ -69,28 +69,11 @@ Where `` is the queue to which the response will be published, and `data` ### [BETA] Asynchronous Consumers -Now there is a support for async-based consumers handling based on `aio-pika` API. - -#### Installation - -To install async dependencies, run installation with `[aio]` extra: -```shell -pip install neon-mq-connector[aio] -``` +Now there is a support for async-based consumers handling based on `pika.SelectConnection` #### Enabling in a code -When declaring the consumer/subscriber instances, specify `async_consumer` param like follows: -```python -register_consumer(name="some_consumer", - ... - async_consumer=True,) -register_subscriber(name="some_subscriber", - ... - async_consumer=True,) -``` - -To set creation of async consumers/subscribers globally, set the class-attribute `async_consumers_enabled` to True: +To enable creation of async consumers/subscribers, set the class-attribute `async_consumers_enabled` to True: ```python from neon_mq_connector import MQConnector diff --git a/neon_mq_connector/aio/__init__.py b/neon_mq_connector/aio/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/neon_mq_connector/aio/consumer.py b/neon_mq_connector/aio/consumer.py deleted file mode 100644 index 0d8e8de..0000000 --- a/neon_mq_connector/aio/consumer.py +++ /dev/null @@ -1,106 +0,0 @@ -from typing import Optional - -import aio_pika - -from aio_pika.abc import AbstractIncomingMessage -from aio_pika.exchange import ExchangeType - -from neon_mq_connector.utils import consumer_utils - - -class AsyncConsumer: - def __init__( - self, - connection_params, - queue, - callback_func: callable, - error_func: callable = consumer_utils.default_error_handler, - auto_ack: bool = True, - queue_reset: bool = False, - queue_exclusive: bool = False, - exchange: Optional[str] = None, - exchange_reset: bool = False, - exchange_type: str = ExchangeType.DIRECT.value, - *args, - **kwargs, - ): - self.channel = None - self.connection = None - self.connection_params = connection_params - self.queue = queue - self.callback_func = lambda message: self._async_on_message_wrapper(message, callback_func) - self.error_func = error_func - self.no_ack = auto_ack - self.queue_reset = queue_reset - self.queue_exclusive = queue_exclusive - self.exchange = exchange or '' - self.exchange_reset = exchange_reset - self.exchange_type = exchange_type or ExchangeType.DIRECT.value - self._is_consuming = False - self._is_consumer_alive = True - - async def connect(self) -> None: - """ - Utilises aio-pika as a base interface for establishing async MQ connection - Upon establishing connection, declares queue and exchange if applicable - """ - self.connection = await aio_pika.connect_robust(**self.connection_params) - self.channel = await self.connection.channel() - await self.channel.set_qos(prefetch_count=50) - if self.queue_reset: - await self.channel.queue_delete(self.queue) - self.queue = await self.channel.declare_queue( - self.queue, - auto_delete=False, - exclusive=self.queue_exclusive - ) - if self.exchange: - if self.exchange_reset: - await self.channel.exchange_delete(self.exchange) - self.exchange = await self.channel.declare_exchange( - self.exchange, - self.exchange_type, - auto_delete=False - ) - await self.queue.bind(self.exchange) - await self.queue.consume(self.callback_func, no_ack=self.no_ack) - - @property - def is_consumer_alive(self) -> bool: - """ - Flag specifying whether consumer thread is alive - :return: True if consumer thread is alive, False otherwise - """ - return self._is_consumer_alive - - async def start(self): - if not self._is_consuming: - try: - await self.connect() - self._is_consuming = True - except Exception as e: - self._is_consuming = False - self.error_func(self, e) - - async def stop(self): - if self._is_consumer_alive: - try: - await self.queue.cancel() - await self.channel.close() - await self.connection.close() - except Exception as e: - self.error_func(self, e) - finally: - self._is_consuming = False - self._is_consumer_alive = False - - @classmethod - async def _async_on_message_wrapper(cls, message: AbstractIncomingMessage, callback: callable): - """ - Async wrapper to process asynchronous MQ messages - :param message: `AbstractIncomingMessage` instance - :param callback: the actual callback function - :return: - """ - async with message.process(ignore_processed=True): - await callback(message) diff --git a/neon_mq_connector/connector.py b/neon_mq_connector/connector.py index e642720..22228ec 100644 --- a/neon_mq_connector/connector.py +++ b/neon_mq_connector/connector.py @@ -32,28 +32,30 @@ import uuid import pika import pika.exceptions -import threading -import asyncio from abc import ABC -from typing import Optional, Dict, Any, Union +from typing import Optional, Dict, Any, Union, Type + from pika.exchange_type import ExchangeType from ovos_utils.log import LOG from neon_mq_connector.config import load_neon_mq_config -from neon_mq_connector.consumer import ConsumerThread - -try: - from neon_mq_connector.aio.consumer import AsyncConsumer -except ImportError: - LOG.warning("cannot use AsyncConsumer - `aio` dependencies are missing") - AsyncConsumer = None +from neon_mq_connector.consumers import BlockingConsumerThread, SelectConsumerThread from neon_mq_connector.utils.connection_utils import wait_for_mq_startup, retry from neon_mq_connector.utils.network_utils import dict_to_b64 from neon_mq_connector.utils.thread_utils import RepeatingTimer +# DO NOT REMOVE ME: Defined for backward compatibility +ConsumerThread = BlockingConsumerThread + +ConsumerThreadInstance = Union[BlockingConsumerThread, SelectConsumerThread] + +SUPPORTED_THREADED_CONSUMERS = (BlockingConsumerThread, + SelectConsumerThread,) + + class MQConnector(ABC): """ Abstract class implementing interface for attaching services to MQ server @@ -61,10 +63,9 @@ class MQConnector(ABC): __run_retries__ = 5 __max_consumer_restarts__ = -1 - __consumer_join_timeout__ = 10 + __consumer_join_timeout__ = 1 - # Feature flags enabling spawning `AsyncConsumer` instances by default on `register_consumer` - async_consumers_enabled: bool = False + async_consumers_enabled = False @staticmethod def init_config(config: Optional[dict] = None) -> dict: @@ -102,7 +103,7 @@ def __init__(self, config: Optional[dict], service_name: str): self.property_key = 'properties' self._service_id = None self.service_name = service_name - self.consumers: Dict[str, Union[ConsumerThread, AsyncConsumer]] = dict() + self.consumers: Dict[str, ConsumerThreadInstance] = dict() self.consumer_properties = dict() self._vhost = None self._sync_thread = None @@ -245,9 +246,10 @@ def vhost(self, val: str): val = f'/{val}' self._vhost = val - def get_connection_params(self, vhost: str, **kwargs) -> pika.ConnectionParameters: + def get_connection_params(self, vhost: str, **kwargs) -> \ + pika.ConnectionParameters: """ - Gets connection parameters to be used to create the MQ connection + Gets connection parameters to be used to create an mq connection :param vhost: virtual_host to connect to """ connection_params = pika.ConnectionParameters( @@ -283,7 +285,7 @@ def emit_mq_message(cls, (defaults to 1 second) :raises ValueError: invalid request data provided - :returns message_id: id of the published message + :returns message_id: id of the sent message """ if not isinstance(request_data, dict): raise TypeError(f"Expected dict and got {type(request_data)}") @@ -329,7 +331,7 @@ def publish_message(cls, (defaults to 1 second) :raises ValueError: invalid request data provided - :returns message_id: id of the published message + :returns message_id: id of the sent message """ return cls.emit_mq_message(connection=connection, request_data=request_data, exchange=exchange, @@ -400,34 +402,25 @@ def create_mq_connection(self, vhost: str = '/', **kwargs): return pika.BlockingConnection( parameters=self.get_connection_params(vhost, **kwargs)) - def register_consumer( - self, - name: str, - vhost: str, - queue: str, - callback: callable, - on_error: Optional[callable] = None, - auto_ack: bool = True, - queue_reset: bool = False, - exchange: str = None, - exchange_type: str = None, - exchange_reset: bool = False, - queue_exclusive: bool = False, - skip_on_existing: bool = False, - restart_attempts: Optional[int] = None, - async_consumer: Optional[bool] = None, - ) -> None: + def register_consumer(self, name: str, vhost: str, queue: str, + callback: callable, + on_error: Optional[callable] = None, + auto_ack: bool = True, queue_reset: bool = False, + exchange: str = None, exchange_type: str = None, + exchange_reset: bool = False, + queue_exclusive: bool = False, + skip_on_existing: bool = False, + restart_attempts: int = __max_consumer_restarts__): """ Registers a consumer for the specified queue. The callback function will handle items in the queue. Any raised exceptions will be passed as arguments to on_error. - - :param name: human-readable name of the consumer + :param name: Human readable name of the consumer :param vhost: vhost to register on :param queue: MQ Queue to read messages from :param queue_reset: to delete queue if exists (defaults to False) :param exchange: MQ Exchange to bind to - :param exchange_reset: delete exchange if exists (defaults to False) + :param exchange_reset: to delete exchange if exists (defaults to False) :param exchange_type: Type of MQ Exchange to use, documentation: https://www.rabbitmq.com/tutorials/amqp-concepts.html :param callback: Method to passed queued messages to @@ -438,67 +431,34 @@ def register_consumer( :param skip_on_existing: to skip if consumer already exists :param restart_attempts: max instance restart attempts (if < 0 - will restart infinitely times) - :param async_consumer: if set to True consumer will be served with `AsyncConsumer`, - otherwise `ConsumerThread` (defaults to `self.async_consumers_enabled` value) """ - - if exchange_type == ExchangeType.fanout.value: - LOG.info(f'Subscriber queue registered: {queue} ' - f'[subscriber_name={name},exchange={exchange},vhost={vhost}]') - elif exchange_type == ExchangeType.direct.value: - LOG.info(f'Consumer queue registered: {queue} ' - f'[subscriber_name={name},exchange={exchange},vhost={vhost}]') - - if restart_attempts is None: - restart_attempts = self.__max_consumer_restarts__ - error_handler = on_error or self.default_error_handler consumer = self.consumers.get(name, None) if consumer: # Gracefully terminating if skip_on_existing: - LOG.warning(f'Consumer under index "{name}" already declared') + LOG.info(f'Consumer under index "{name}" already declared') return self.stop_consumers(names=(name,), allow_restart=False) self.consumer_properties.setdefault(name, {}) - - self.consumer_properties[name]['properties'] = { - "connection_params": self.get_connection_params(vhost), - "queue": queue, - "queue_reset": queue_reset, - "callback_func": callback, - "exchange": exchange, - "exchange_reset": exchange_reset, - "exchange_type": exchange_type, - "error_func": error_handler, - "auto_ack": auto_ack, - "name": name, - "queue_exclusive": queue_exclusive, - } + self.consumer_properties[name]['properties'] = \ + dict(connection_params=self.get_connection_params(vhost), + queue=queue, queue_reset=queue_reset, callback_func=callback, + exchange=exchange, exchange_reset=exchange_reset, + exchange_type=exchange_type, error_func=error_handler, + auto_ack=auto_ack, name=name, queue_exclusive=queue_exclusive, ) self.consumer_properties[name]['restart_attempts'] = \ int(restart_attempts) self.consumer_properties[name]['started'] = False + self.consumers[name] = self.consumer_thread_cls(**self.consumer_properties[name]['properties']) - if async_consumer is None: - async_consumer = self.async_consumers_enabled - if async_consumer: - self.consumers[name] = \ - AsyncConsumer(**self.consumer_properties[name]['properties']) - else: - self.consumers[name] = \ - ConsumerThread(**self.consumer_properties[name]['properties']) - - def restart_consumer(self, name: str) -> None: - """ - Restarts consumer by name. - Additionally, checks whether mounted consumer has limitation on amount of restarts set, - if it does - checks if number of restarts exceeded. - - Otherwise, will increment restarts counter and recreate consumer worker via `run_consumers` + @property + def consumer_thread_cls(self) -> Type[ConsumerThreadInstance]: + if self.async_consumers_enabled: + return SelectConsumerThread + return BlockingConsumerThread - :param name: name of the consumer to restart - """ - consumer_cls = type(self.consumers.get(name)) + def restart_consumer(self, name: str): self.stop_consumers(names=(name,), allow_restart=True) consumer_data = self.consumer_properties.get(name, {}) restart_attempts = consumer_data.get('restart_attempts', @@ -511,33 +471,28 @@ def restart_consumer(self, name: str) -> None: elif 0 < restart_attempts < consumer_data.get('num_restarted', 0): err_msg = 'num restarts exceeded' else: - self.consumers[name] = consumer_cls(**consumer_data['properties']) + self.consumers[name] = self.consumer_thread_cls(**consumer_data['properties']) self.run_consumers(names=(name,)) self.consumer_properties[name].setdefault('num_restarted', 0) self.consumer_properties[name]['num_restarted'] += 1 if err_msg: LOG.error(f'Cannot restart consumer "{name}" - {err_msg}') - def register_subscriber(self, - name: str, - vhost: str, - callback: callable, - on_error: Optional[callable] = None, - exchange: str = None, - exchange_reset: bool = False, - auto_ack: bool = True, - skip_on_existing: bool = False, - restart_attempts: Optional[int] = None, - async_consumer: Optional[bool] = None, - ): + def register_subscriber(self, name: str, vhost: str, + callback: callable, + on_error: Optional[callable] = None, + exchange: str = None, exchange_reset: bool = False, + auto_ack: bool = True, + skip_on_existing: bool = False, + restart_attempts: int = __max_consumer_restarts__): """ Registers fanout exchange subscriber, wraps register_consumer() Any raised exceptions will be passed as arguments to on_error. - - :param name: Human-readable name of the consumer + :param name: Human readable name of the consumer :param vhost: vhost to register on :param exchange: MQ Exchange to bind to - :param exchange_reset: deletes exchange if exists (defaults to False) + :param exchange_reset: to delete exchange if exists + (defaults to False) :param callback: Method to passed queued messages to :param on_error: Optional method to handle any exceptions raised in message handling @@ -546,12 +501,12 @@ def register_subscriber(self, (defaults to False) :param restart_attempts: max instance restart attempts (if < 0 - will restart infinitely times) - :param async_consumer: if set to True consumer will be served with `AsyncConsumer`, - otherwise `ConsumerThread` (defaults to `self.async_consumers_enabled` value) """ # for fanout exchange queue does not matter unless its non-conflicting - # and is bound + # and is binded subscriber_queue = f'subscriber_{exchange}_{uuid.uuid4().hex[:6]}' + LOG.info(f'Subscriber queue registered: {subscriber_queue} ' + f'[subscriber_name={name},exchange={exchange},vhost={vhost}]') return self.register_consumer(name=name, vhost=vhost, queue=subscriber_queue, callback=callback, queue_reset=False, @@ -560,21 +515,16 @@ def register_subscriber(self, exchange_reset=exchange_reset, auto_ack=auto_ack, queue_exclusive=True, skip_on_existing=skip_on_existing, - restart_attempts=restart_attempts, - async_consumer=async_consumer) + restart_attempts=restart_attempts) @staticmethod - def default_error_handler(thread: ConsumerThread, exception: Exception): + def default_error_handler(thread: ConsumerThreadInstance, exception: Exception): LOG.error(f"{exception} occurred in {thread}") - def run_consumers( - self, - names: Optional[tuple] = None, - daemon=True - ) -> None: + def run_consumers(self, names: tuple = (), daemon=True): """ Runs consumer threads based on the name if present - (starts all the declared consumers by default) + (starts all of the declared consumers by default) :param names: names of consumers to consider :param daemon: to kill consumer threads once main thread is over @@ -582,54 +532,31 @@ def run_consumers( if not names or len(names) == 0: names = list(self.consumers) for name in names: - consumer = self.consumers.get(name) - if isinstance(consumer, ConsumerThread) and consumer.is_consumer_alive: - consumer.daemon = daemon - consumer.start() - elif AsyncConsumer is not None and isinstance(consumer, AsyncConsumer) and consumer.is_consumer_alive: - asyncio.create_task(consumer.start()) - - self.consumer_properties[name]['started'] = True + if isinstance(self.consumers.get(name), SUPPORTED_THREADED_CONSUMERS) and self.consumers[name].is_consumer_alive: + self.consumers[name].daemon = daemon + self.consumers[name].start() + self.consumer_properties[name]['started'] = True - def stop_consumers(self, - names: Optional[tuple] = None, - allow_restart: bool = True) -> None: + def stop_consumers(self, names: tuple = (), allow_restart: bool = True): """ - Stops consumer threads based on the name if present - (stops all the declared consumers by default) - - :param names: tuple of names of consumers to stop. If empty, will stop all consumers - :param allow_restart: to allow further restart of stopped consumers - - :raise ChildProcessError if exception occurred during consumer restart + Stops consumer threads based on the name if present + (stops all of the declared consumers by default) """ if not names or len(names) == 0: names = list(self.consumers) for name in names: try: - if name in list(self.consumers): - if isinstance(self.consumers[name], AsyncConsumer): - asyncio.run(self.consumers[name].stop()) - elif isinstance(self.consumers[name], ConsumerThread): - self.consumers[name].join(timeout=self.__consumer_join_timeout__, allow_restart=allow_restart) + if isinstance(self.consumers.get(name), SUPPORTED_THREADED_CONSUMERS) and self.consumers[name].is_alive(): + self.consumers[name].join(timeout=self.__consumer_join_timeout__, allow_restart=allow_restart) self.consumer_properties[name]['is_alive'] = self.consumers[name].is_consumer_alive self.consumer_properties[name]['started'] = False - self.consumers[name] = None except Exception as e: raise ChildProcessError(e) - @retry( - callback_on_exceeded='stop_sync_thread', - use_self=True, - num_retries=__run_retries__, - ) - def sync( - self, - vhost: str = None, - exchange: str = None, - queue: str = None, - request_data: dict = None - ): + @retry(callback_on_exceeded='stop_sync_thread', use_self=True, + num_retries=__run_retries__) + def sync(self, vhost: str = None, exchange: str = None, queue: str = None, + request_data: dict = None): """ Periodic notification message to be sent into MQ, used to notify other network listeners about this service health status @@ -651,18 +578,10 @@ def sync( self.publish_message(mq_connection, exchange=exchange, request_data=request_data) - @retry( - callback_on_exceeded='stop', - use_self=True, - num_retries=__run_retries__, - ) - def run( - self, - run_consumers: bool = True, - run_sync: bool = True, - run_observer: bool = True, - **kwargs, - ) -> None: + @retry(callback_on_exceeded='stop', use_self=True, + num_retries=__run_retries__) + def run(self, run_consumers: bool = True, run_sync: bool = True, + run_observer: bool = True, **kwargs): """ Generic method called on running the instance @@ -710,11 +629,8 @@ def observe_consumers(self): consumers_dict = copy.copy(self.consumers) for consumer_name, consumer_instance in consumers_dict.items(): if self.consumer_properties[consumer_name]['started'] and \ - not (isinstance(consumer_instance, ConsumerThread) and - not (isinstance(consumer_instance, AsyncConsumer)) - # This is the case when creation of `ConsumerThread` partially fails - # essentially leaving it as `threading.Thread` instance - and (isinstance(consumer_instance, threading.Thread) and consumer_instance.is_alive()) + not (isinstance(consumer_instance, SUPPORTED_THREADED_CONSUMERS) + and consumer_instance.is_alive() and consumer_instance.is_consuming): LOG.info(f'Consumer "{consumer_name}" is dead, restarting') self.restart_consumer(name=consumer_name) diff --git a/neon_mq_connector/consumers/__init__.py b/neon_mq_connector/consumers/__init__.py new file mode 100644 index 0000000..c336daa --- /dev/null +++ b/neon_mq_connector/consumers/__init__.py @@ -0,0 +1,36 @@ +# NEON AI (TM) SOFTWARE, Software Development Kit & Application Framework +# All trademark and other rights reserved by their respective owners +# Copyright 2008-2022 Neongecko.com Inc. +# Contributors: Daniel McKnight, Guy Daniels, Elon Gasper, Richard Leeds, +# Regina Bloomstine, Casimiro Ferreira, Andrii Pernatii, Kirill Hrymailo +# BSD-3 License +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# 1. Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# 3. Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from this +# software without specific prior written permission. +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, +# THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, +# OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + +__all__ = [ + 'BlockingConsumerThread', + 'SelectConsumerThread', +] + +from neon_mq_connector.consumers.select_consumer import SelectConsumerThread +from neon_mq_connector.consumers.blocking_consumer import BlockingConsumerThread diff --git a/neon_mq_connector/consumer.py b/neon_mq_connector/consumers/blocking_consumer.py similarity index 63% rename from neon_mq_connector/consumer.py rename to neon_mq_connector/consumers/blocking_consumer.py index b34661e..b6e2d44 100644 --- a/neon_mq_connector/consumer.py +++ b/neon_mq_connector/consumers/blocking_consumer.py @@ -1,3 +1,32 @@ +# NEON AI (TM) SOFTWARE, Software Development Kit & Application Framework +# All trademark and other rights reserved by their respective owners +# Copyright 2008-2022 Neongecko.com Inc. +# Contributors: Daniel McKnight, Guy Daniels, Elon Gasper, Richard Leeds, +# Regina Bloomstine, Casimiro Ferreira, Andrii Pernatii, Kirill Hrymailo +# BSD-3 License +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# 1. Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# 3. Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from this +# software without specific prior written permission. +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, +# THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, +# OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + import threading from typing import Optional @@ -8,11 +37,13 @@ from neon_mq_connector.utils import consumer_utils -class ConsumerThread(threading.Thread): +class BlockingConsumerThread(threading.Thread): + """ + Consumer thread implementation based on pika.BlockingConnection + """ # retry to handle connection failures in case MQ server is still starting - def __init__(self, - connection_params: pika.ConnectionParameters, + def __init__(self, connection_params: pika.ConnectionParameters, queue: str, callback_func: callable, error_func: callable = consumer_utils.default_error_handler, auto_ack: bool = True, @@ -20,8 +51,7 @@ def __init__(self, queue_exclusive: bool = False, exchange: Optional[str] = None, exchange_reset: bool = False, - exchange_type: str = ExchangeType.direct, - *args, **kwargs): + exchange_type: str = ExchangeType.direct, *args, **kwargs): """ Rabbit MQ Consumer class that aims at providing unified configurable interface for consumer threads @@ -80,15 +110,16 @@ def run(self): """Creating consumer channel""" if not self._is_consuming: try: - super(ConsumerThread, self).run() + super(BlockingConsumerThread, self).run() self._is_consuming = True self.channel.start_consuming() except Exception as e: self._is_consuming = False if isinstance(e, pika.exceptions.ChannelClosed): - LOG.error(f"Channel closed by broker: {self.callback_func}") + LOG.info(f"Channel closed by broker: {self.callback_func}") + elif isinstance(e, pika.exceptions.StreamLostError): + LOG.info("Connection closed by broker") else: - LOG.error(e) self.error_func(self, e) self.join(allow_restart=True) @@ -97,14 +128,12 @@ def join(self, timeout: Optional[float] = ..., allow_restart: bool = True) -> No if self._is_consumer_alive: try: self.channel.stop_consuming() - if self.channel.is_open: - self.channel.close() if self.connection.is_open: self.connection.close() - except Exception as x: - LOG.error(x) + except Exception as e: + LOG.error(e) finally: self._is_consuming = False if not allow_restart: self._is_consumer_alive = False - super(ConsumerThread, self).join(timeout=timeout) + super(BlockingConsumerThread, self).join(timeout=timeout) diff --git a/neon_mq_connector/consumers/select_consumer.py b/neon_mq_connector/consumers/select_consumer.py new file mode 100644 index 0000000..d31b9ed --- /dev/null +++ b/neon_mq_connector/consumers/select_consumer.py @@ -0,0 +1,218 @@ +# NEON AI (TM) SOFTWARE, Software Development Kit & Application Framework +# All trademark and other rights reserved by their respective owners +# Copyright 2008-2022 Neongecko.com Inc. +# Contributors: Daniel McKnight, Guy Daniels, Elon Gasper, Richard Leeds, +# Regina Bloomstine, Casimiro Ferreira, Andrii Pernatii, Kirill Hrymailo +# BSD-3 License +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# 1. Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# 3. Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from this +# software without specific prior written permission. +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, +# THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, +# OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + +import threading +import time + +from typing import Optional + +import pika.exceptions +from ovos_utils import LOG +from pika.exchange_type import ExchangeType + +from neon_mq_connector.utils import consumer_utils + + +class SelectConsumerThread(threading.Thread): + """ + Consumer thread implementation based on pika.SelectConnection + """ + + def __init__(self, + connection_params: pika.ConnectionParameters, + queue: str, callback_func: callable, + error_func: callable = consumer_utils.default_error_handler, + auto_ack: bool = True, + queue_reset: bool = False, + queue_exclusive: bool = False, + exchange: Optional[str] = None, + exchange_reset: bool = False, + exchange_type: str = ExchangeType.direct, + *args, **kwargs): + """ + Rabbit MQ Consumer class that aims at providing unified configurable + interface for consumer threads + :param connection_params: pika connection parameters + :param queue: Desired consuming queue + :param callback_func: logic on message receiving + :param error_func: handler for consumer thread errors + :param auto_ack: Boolean to enable ack of messages upon receipt + :param queue_reset: If True, delete an existing queue `queue` + :param queue_exclusive: Marks declared queue as exclusive + to a given channel (deletes with it) + :param exchange: exchange to bind queue to (optional) + :param exchange_reset: If True, delete an existing exchange `exchange` + :param exchange_type: type of exchange to bind to from ExchangeType + (defaults to direct) + follow: https://www.rabbitmq.com/tutorials/amqp-concepts.html + to learn more about different exchanges + """ + threading.Thread.__init__(self, *args, **kwargs) + self._is_consuming = False # annotates that ConsumerThread is running + self._is_consumer_alive = True # annotates that ConsumerThread is alive and shall be recreated + self.callback_func = callback_func + self.error_func = error_func + self.exchange = exchange or '' + self.exchange_type = exchange_type or ExchangeType.direct + self.queue = queue or '' + self.channel = None + self.queue_exclusive = queue_exclusive + self.auto_ack = auto_ack + + self.connection_params = connection_params + self.queue_reset = queue_reset + self.exchange_reset = exchange_reset + + self.connection = self.create_connection() + self.connection_failed_attempts = 0 + self.max_connection_failed_attempts = 3 + + def create_connection(self): + return pika.SelectConnection(parameters=self.connection_params, + on_open_callback=self.on_connected, + on_open_error_callback=self.on_connection_fail, + on_close_callback=self.on_close,) + + def on_connected(self, _): + """Called when we are fully connected to RabbitMQ""" + self.connection.channel(on_open_callback=self.on_channel_open) + + def on_connection_fail(self, _): + """ Called when connection to RabbitMQ fails""" + self.connection_failed_attempts += 1 + if self.connection_failed_attempts > self.max_connection_failed_attempts: + LOG.error(f'Failed establish MQ connection after {self.max_connection_failed_attempts} attempts') + self.join(timeout=1) + else: + self.reconnect() + + def on_channel_open(self, new_channel): + """Called when our channel has opened""" + self.channel = new_channel + if self.queue_reset: + self.channel.queue_delete(queue=self.queue, + if_unused=True, + callback=self.declare_queue) + else: + self.declare_queue() + + def declare_queue(self, _unused_frame = None): + return self.channel.queue_declare(queue=self.queue, + exclusive=self.queue_exclusive, + auto_delete=False, + callback=self.on_queue_declared) + + def on_queue_declared(self, _unused_frame = None): + """Called when RabbitMQ has told us our Queue has been declared, frame is the response from RabbitMQ""" + if self.exchange: + self.setup_exchange() + else: + self.set_qos() + + def setup_exchange(self): + if self.exchange_reset: + self.channel.exchange_delete(exchange=self.exchange, callback=self.declare_exchange) + else: + self.declare_exchange() + + def declare_exchange(self, _unused_frame = None): + self.channel.exchange_declare(exchange=self.exchange, + exchange_type=self.exchange_type, + auto_delete=False, + callback=self.bind_exchange_to_queue) + + def bind_exchange_to_queue(self, _unused_frame = None): + try: + self.channel.queue_bind( + queue=self.queue, + exchange=self.exchange, + callback=self.set_qos + ) + except Exception as e: + LOG.error(f"Error binding queue '{self.queue}' to exchange '{self.exchange}': {e}") + + def set_qos(self, _unused_frame = None): + self.channel.basic_qos(prefetch_count=50, callback=self.start_consuming) + + def start_consuming(self, _unused_frame = None): + self.channel.basic_consume(queue=self.queue, + on_message_callback=self.on_message, + auto_ack=self.auto_ack) + + def on_message(self, channel, method, properties, body): + try: + self.callback_func(channel, method, properties, body) + except Exception as e: + self.error_func(self, e) + + def on_close(self, _, e): + LOG.error(f"Closing MQ connection due to exception: {e}") + self.join() + + @property + def is_consumer_alive(self) -> bool: + return self._is_consumer_alive + + @property + def is_consuming(self) -> bool: + return self._is_consuming + + def run(self): + """Starting connnection io loop """ + if not self.is_consuming: + try: + super(SelectConsumerThread, self).run() + self._is_consuming = True + self.connection.ioloop.start() + except Exception as e: + LOG.error(f"Failed to start io loop on consumer thread {self.name!r}: {e}") + self.join(allow_restart=True) + + def _close_connection(self): + try: + if self.connection and not (self.connection.is_closed or self.connection.is_closing): + self.connection.ioloop.stop() + self.connection.close() + except Exception as e: + LOG.error(f"Failed to close connection for Consumer {self.name!r}: {e}") + self._is_consuming = False + + def reconnect(self, wait_interval: int = 1): + self._close_connection() + time.sleep(wait_interval) + self.run() + + def join(self, timeout: Optional[float] = None, allow_restart: bool = True) -> None: + """Terminating consumer channel""" + if self.is_consumer_alive and self.is_consuming: + self._is_consuming = False + self._close_connection() + if not allow_restart: + self._is_consumer_alive = False + super().join(timeout=timeout) diff --git a/neon_mq_connector/utils/consumer_utils.py b/neon_mq_connector/utils/consumer_utils.py index 4fe8649..f65d3e6 100644 --- a/neon_mq_connector/utils/consumer_utils.py +++ b/neon_mq_connector/utils/consumer_utils.py @@ -1,3 +1,32 @@ +# NEON AI (TM) SOFTWARE, Software Development Kit & Application Framework +# All trademark and other rights reserved by their respective owners +# Copyright 2008-2022 Neongecko.com Inc. +# Contributors: Daniel McKnight, Guy Daniels, Elon Gasper, Richard Leeds, +# Regina Bloomstine, Casimiro Ferreira, Andrii Pernatii, Kirill Hrymailo +# BSD-3 License +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# 1. Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# 3. Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from this +# software without specific prior written permission. +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, +# THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, +# OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + from ovos_utils.log import LOG def default_error_handler(*args): diff --git a/neon_mq_connector/utils/rabbit_utils.py b/neon_mq_connector/utils/rabbit_utils.py index 59fa09d..00ec73b 100644 --- a/neon_mq_connector/utils/rabbit_utils.py +++ b/neon_mq_connector/utils/rabbit_utils.py @@ -27,6 +27,7 @@ # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. from functools import wraps + from ovos_utils.log import LOG from neon_mq_connector.utils.network_utils import b64_to_dict diff --git a/requirements/aio_requirements.txt b/requirements/aio_requirements.txt deleted file mode 100644 index 88d4bb8..0000000 --- a/requirements/aio_requirements.txt +++ /dev/null @@ -1 +0,0 @@ -aio-pika~=9.4.3 diff --git a/requirements/test_requirements.txt b/requirements/test_requirements.txt index 609d40d..aac01b0 100644 --- a/requirements/test_requirements.txt +++ b/requirements/test_requirements.txt @@ -1,2 +1,2 @@ pytest~=6.2 -parameterized~=0.9.0 \ No newline at end of file +parameterized~=0.9.0 diff --git a/setup.py b/setup.py index 1eedad9..fee3035 100644 --- a/setup.py +++ b/setup.py @@ -78,8 +78,5 @@ def get_requirements(requirements_filename: str): 'Intended Audience :: Developers', 'Programming Language :: Python :: 3.8', ], - extras_require={ - "aio": get_requirements("aio_requirements.txt"), - }, python_requires = '>=3.8', ) diff --git a/tests/test_backward_compatibility.py b/tests/test_backward_compatibility.py index 4fa62c8..3cd82bc 100644 --- a/tests/test_backward_compatibility.py +++ b/tests/test_backward_compatibility.py @@ -33,7 +33,8 @@ from neon_mq_connector import MQConnector from neon_mq_connector.config import Configuration -from neon_mq_connector.connector import ConsumerThread, AsyncConsumer +from neon_mq_connector.connector import ConsumerThreadInstance +from neon_mq_connector.consumers import SelectConsumerThread, BlockingConsumerThread class OldMQConnectorChild(MQConnector): @@ -62,21 +63,22 @@ def setUpClass(cls) -> None: ( "async_mode_enabled", # test name True, # async consumer flag - AsyncConsumer, # expected consumer instance + SelectConsumerThread, # expected consumer instance ), ( "async_mode_disabled", False, - ConsumerThread, + BlockingConsumerThread, ) ] ) def test_stable_register_consumer_args( self, - test_name: str, - async_mode_enabled: bool, - expected_consumer_instance + test_name, + async_mode_enabled, + expected_consumer_instance, ): + self.connector.async_consumers_enabled = async_mode_enabled # Required connector.register_consumer() arguments order: # name: str, vhost: str, queue: str, # callback: callable, on_error: Optional[callable] = None, @@ -88,7 +90,6 @@ def test_stable_register_consumer_args( callback=self.connector.callback_func_1, on_error=self.connector.default_error_handler, auto_ack=False, - async_consumer=async_mode_enabled, ) self.assertIsInstance(self.connector.consumers['test_consumer'], expected_consumer_instance) diff --git a/tests/test_connector.py b/tests/test_connector.py index e0e7ec2..be7186d 100644 --- a/tests/test_connector.py +++ b/tests/test_connector.py @@ -27,16 +27,17 @@ # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. import os import threading +import time import unittest import pika import pytest from unittest.mock import Mock from ovos_utils.log import LOG -from parameterized import parameterized +from pika.exchange_type import ExchangeType from neon_mq_connector.config import Configuration -from neon_mq_connector.connector import MQConnector, ConsumerThread +from neon_mq_connector.connector import MQConnector, ConsumerThreadInstance from neon_mq_connector.utils import RepeatingTimer from neon_mq_connector.utils.rabbit_utils import create_mq_callback @@ -53,11 +54,7 @@ def __init__(self, config: dict, service_name: str): self._consume_event = None self._consumer_restarted_event = None self._vhost = "/neon_testing" - self.observe_period = 10 - self.register_consumer(name="error", vhost=self.vhost, queue="error", - callback=self.callback_func_error, - on_error=self.handle_error, auto_ack=False, - restart_attempts=0) + self.observe_period = 5 @create_mq_callback(include_callback_props=('channel', 'method',)) def callback_func_1(self, channel, method): @@ -91,7 +88,7 @@ def callback_func_after_message(self, channel, method): def callback_func_error(self, channel, method, properties, body): raise Exception("Exception to Handle") - def handle_error(self, thread: ConsumerThread, exception: Exception): + def handle_error(self, thread: ConsumerThreadInstance, exception: Exception): self.exception = exception self.consume_event.set() @@ -132,38 +129,27 @@ def tearDownClass(cls) -> None: def test_not_null_service_id(self): self.assertIsNotNone(self.connector_instance.service_id) - @parameterized.expand( - input=[ - ( - "async_mode_enabled", # test name - True, # async consumer flag - ), - ( - "async_mode_disabled", - False, - ) - ] - ) + def tearDown(self): + self.connector_instance.stop_consumers(allow_restart=False) + @pytest.mark.timeout(30) - async def test_mq_messaging(self, test_name: str, async_consumer_flag: bool): + def test_mq_messaging(self): self.connector_instance.func_1_ok = False self.connector_instance.func_2_ok = False test_consumers = ('test1', 'test2',) - self.connector_instance.stop_consumers(names=test_consumers) - - self.connector_instance.register_consumer(name="test1", vhost=self.connector_instance.vhost, + self.connector_instance.register_consumer(name="test1", + vhost=self.connector_instance.vhost, exchange='', queue='test', callback=self.connector_instance.callback_func_1, - auto_ack=False, - async_consumer=async_consumer_flag,) - self.connector_instance.register_consumer(name="test2", vhost=self.connector_instance.vhost, + auto_ack=False,) + self.connector_instance.register_consumer(name="test2", + vhost=self.connector_instance.vhost, exchange='', queue='test1', callback=self.connector_instance.callback_func_2, - auto_ack=False, - async_consumer=async_consumer_flag,) + auto_ack=False,) self.connector_instance.run_consumers(names=test_consumers) @@ -179,42 +165,28 @@ async def test_mq_messaging(self, test_name: str, async_consumer_flag: bool): self.assertTrue(self.connector_instance.func_1_ok) self.assertTrue(self.connector_instance.func_2_ok) - @parameterized.expand( - input=[ - ( - "async_mode_enabled", # test name - True, # async consumer flag - ), - ( - "async_mode_disabled", - False, - ) - ] - ) @pytest.mark.timeout(30) - async def test_publish_subscribe(self, test_name: str, async_consumer_flag: bool): + def test_publish_subscribe(self): self.connector_instance.func_1_ok = False self.connector_instance.func_2_ok = False test_consumers = ('test1', 'test2',) - self.connector_instance.stop_consumers(names=test_consumers) - self.connector_instance.register_subscriber(name="test1", vhost=self.connector_instance.vhost, exchange='test', + # exchange_reset=True, callback=self.connector_instance.callback_func_1, - auto_ack=False, - async_consumer=async_consumer_flag) + auto_ack=False,) self.connector_instance.register_subscriber(name="test2", vhost=self.connector_instance.vhost, exchange='test', callback=self.connector_instance.callback_func_2, - auto_ack=False, - async_consumer=async_consumer_flag) + auto_ack=False,) self.connector_instance.run_consumers(names=test_consumers) + time.sleep(0.5) self.connector_instance.send_message(exchange='test', - exchange_type='fanout', + exchange_type=ExchangeType.fanout, request_data={'data': 'Hello!'}, expiration=4000) @@ -224,28 +196,31 @@ async def test_publish_subscribe(self, test_name: str, async_consumer_flag: bool self.assertTrue(self.connector_instance.func_2_ok) @pytest.mark.timeout(30) - def test_error(self): + def test_error(self,): + self.connector_instance.register_consumer( + name="error", + vhost=self.connector_instance.vhost, + queue="error", + queue_reset=True, + callback=self.connector_instance.callback_func_error, + on_error=self.connector_instance.handle_error, + auto_ack=False, + restart_attempts=0 + ) + self.connector_instance.run_consumers(names=("error",)) + time.sleep(0.5) + self.connector_instance.send_message(queue='error', request_data={'data': 'test'}, expiration=4000) - self.connector_instance.consume_event.wait(5) + + self.connector_instance.consume_event.wait(10) + self.assertIsInstance(self.connector_instance.exception, Exception) self.assertEqual(str(self.connector_instance.exception), "Exception to Handle") - @parameterized.expand( - input=[ - ( - "async_mode_enabled", # test name - True, # async consumer flag - ), - ( - "async_mode_disabled", - False, - ) - ] - ) @pytest.mark.timeout(30) - async def test_consumer_after_message(self, test_name: str, async_consumer_flag: bool): + def test_consumer_after_message(self,): self.connector_instance.send_message(queue='test3', request_data={'data': 'test'}, @@ -255,40 +230,28 @@ async def test_consumer_after_message(self, test_name: str, async_consumer_flag: vhost=self.connector_instance.vhost, queue="test3", callback=self.connector_instance.callback_func_after_message, - auto_ack=False, - async_consumer=async_consumer_flag) + auto_ack=False,) self.connector_instance.run_consumers(names=("test_consumer_after_message",)) - self.connector_instance.consume_event.wait(5) + self.connector_instance.consume_event.wait(10) self.assertTrue(self.connector_instance.callback_ok) - @parameterized.expand( - input=[ - ( - "async_mode_enabled", # test name - True, # async consumer flag - ), - ( - "async_mode_disabled", - False, - ) - ] - ) @pytest.mark.timeout(30) - async def test_consumer_restarted(self, test_name: str, async_consumer_flag: bool): + def test_consumer_restarted(self,): self.connector_instance.register_consumer( name="test3", vhost=self.connector_instance.vhost, exchange='', queue='test_failing_once_queue', + queue_reset=True, callback=self.connector_instance.callback_func_3, restart_attempts=1, auto_ack=False, - async_consumer=async_consumer_flag, ) self.connector_instance.run_consumers(names=('test3',)) + time.sleep(0.5) self.connector_instance.send_message(queue='test_failing_once_queue', request_data={'data': 'knock'}, @@ -319,6 +282,14 @@ def test_sync(self): self.connector_instance.publish_message = real_method +class MQConnectorChildAsyncModeTest(MQConnectorChildTest): + + @classmethod + def setUpClass(cls): + super(MQConnectorChildAsyncModeTest, cls).setUpClass() + cls.connector_instance.async_consumers_enabled = True + + class TestMQConnectorInit(unittest.TestCase): def test_connector_init(self): connector = MQConnector(None, "test")