diff --git a/mrsal/config/config.py b/mrsal/config/config.py index 20d5a9b..6267d62 100644 --- a/mrsal/config/config.py +++ b/mrsal/config/config.py @@ -7,7 +7,7 @@ V_HOST: str = os.environ.get("RABBITMQ_DEFAULT_VHOST", "myMrsalHost") RABBITMQ_PORT: int = os.environ.get("RABBITMQ_PORT", 5672) RABBITMQ_PORT_TLS: int = os.environ.get("RABBITMQ_PORT_TLS", 5671) -RABBIT_DOMAIN: str = os.environ.get("RABBITMQ_DOMAIN", "localhost") +RABBIT_DOMAIN: str = os.environ.get("RABBITMQ_DOMAIN", "rabbitmq.neomodels.app") RABBITMQ_USER = os.environ.get("RABBITMQ_DEFAULT_USER", "root") RABBITMQ_PASSWORD = os.environ.get("RABBITMQ_DEFAULT_PASS", "password") diff --git a/mrsal/mrsal.py b/mrsal/mrsal.py index 99e0722..83b9bf4 100644 --- a/mrsal/mrsal.py +++ b/mrsal/mrsal.py @@ -4,17 +4,18 @@ import ssl from dataclasses import dataclass from socket import gaierror -from typing import Any, Callable, Dict, Tuple +from typing import Any, Callable, Dict, Tuple, Union, List +import time import pika from pika import SSLOptions from pika.exceptions import AMQPConnectionError, ChannelClosedByBroker, ChannelWrongStateError, ConnectionClosedByBroker from pika.exchange_type import ExchangeType from retry import retry -import mrsal.config.config as config +from mrsal.config import config from loguru import logger -from mrsal.utils.utils import is_redelivery_configured +from mrsal.utils import utils @dataclass @@ -51,7 +52,7 @@ class Mrsal: _connection: pika.BlockingConnection = None _channel = None - @retry((pika.exceptions.AMQPConnectionError, pika.exceptions.StreamLostError, pika.exceptions.ConnectionClosedByBroker, TypeError, gaierror), tries=2, delay=5, jitter=(1, 3), logger = logger) + @retry((pika.exceptions.AMQPConnectionError, pika.exceptions.StreamLostError, pika.exceptions.ConnectionClosedByBroker, TypeError, gaierror), tries=5, delay=1, jitter=(0, 2), logger=logger) def connect_to_server(self, context: Dict[str, str] = None): """We can use connect_to_server for establishing a connection to RabbitMQ server specifying connection parameters. @@ -93,7 +94,7 @@ def connect_to_server(self, context: Dict[str, str] = None): self._channel.basic_qos(prefetch_count=self.prefetch_count) logger.info(f"Connection established with RabbitMQ on {connection_info}") return self._connection - except AMQPConnectionError as err: + except pika.exceptions.AMQPConnectionError as err: msg: str = f"I tried to connect with the RabbitMQ server but failed with: {err}" logger.error(msg) raise pika.exceptions.AMQPConnectionError(msg) @@ -291,7 +292,7 @@ def consume_messages_with_retries( logger.error(f"Connection closed with error: {e}") self._channel.stop_consuming() - @retry((pika.exceptions.AMQPConnectionError, pika.exceptions.StreamLostError, pika.exceptions.ConnectionClosedByBroker, pika.exceptions.ChannelClosedByBroker), tries=2, delay=5, jitter=(1, 3), logger=logger) + @retry((pika.exceptions.AMQPConnectionError, pika.exceptions.StreamLostError, pika.exceptions.ConnectionClosedByBroker, pika.exceptions.ChannelClosedByBroker), tries=5, delay=1, jitter=(0, 2), logger=logger) def start_consumer( self, queue: str, @@ -408,7 +409,7 @@ def start_consumer( if not auto_ack and reject_unprocessed: self._channel.basic_nack(delivery_tag=method_frame.delivery_tag, requeue=requeue) logger.info(f"{print_thread_index}Message coming from the app={app_id} with messageId={msg_id} is rejected.") - if is_redelivery_configured(properties): + if utils.is_redelivery_configured(properties): msg_headers = properties.headers x_retry = msg_headers[config.RETRY_KEY] x_retry_limit = msg_headers[config.RETRY_LIMIT_KEY] @@ -433,9 +434,10 @@ def start_consumer( logger.warning(f"{print_thread_index}Given period of inactivity {inactivity_timeout} is exceeded. Cancel consumer.") self.stop_consuming(self.consumer_tag) self._channel.cancel() - except (pika.exceptions.StreamLostError, pika.exceptions.ConnectionClosedByBroker, ValueError, TypeError): - logger.error(f"{print_thread_index}I lost the connection with the Mrsal.", exc_info=True) - pass + except pika.exceptions.ConnectionClosedByBroker as err: + logger.error(f"{print_thread_index}I lost the connection with the Mrsal. {err}", exc_info=True) + self._channel.cancel() + raise pika.exceptions.ConnectionClosedByBroker(503, str(err)) except KeyboardInterrupt: logger(f"{print_thread_index}Stopping Mrsal consumption.") self.stop_consuming(self.consumer_tag) @@ -525,7 +527,7 @@ def start_concurrence_consumer( [callback_with_delivery_info] * total_threads, ) - @retry((pika.exceptions.UnroutableError), tries=2, delay=5, jitter=(1, 3)) + @retry((pika.exceptions.UnroutableError, pika.exceptions.ChannelClosedByBroker, pika.exceptions.ConnectionClosedByBroker), tries=5, delay=1, jitter=(0, 2), logger=logger) def publish_message( self, exchange: str, @@ -560,7 +562,7 @@ def publish_message( self.exchange_exist(exchange=exchange, exchange_type=exchange_type) if queue is not None: self.queue_exist(queue=queue) - except (pika.exceptions.ChannelClosedByBroker, pika.exceptions.ConnectionClosedByBroker) as err: + except pika.exceptions.ChannelClosedByBroker as err: logger.error(f"Failed to check active resources. Cancel consumer. {str(err)}") self._channel.cancel() raise pika.exceptions.ChannelClosedByBroker(404, str(err)) @@ -574,7 +576,7 @@ def publish_message( return True except pika.exceptions.UnroutableError as err1: logger.error(f"Producer could not publish message:{message} to the exchange {exchange} with a routing key {routing_key}: {err1}", exc_info=True) - return False + raise pika.exceptions.UnroutableError(404, str(err1)) # TODO NOT IN USE: maybe we will use it in the method consume_messages_with_retries # to publish messages to dead letters exchange after retries limit. (remove or use) @@ -588,3 +590,64 @@ def publish_dead_letter(self, message: str, delivery_tag: int, dead_letters_exch except pika.exceptions.UnroutableError: logger.error("Dead letter was returned") return False + + + +if __name__ == "__main__": + + # Main script testing + + def test_callback( + method: pika.spec.Basic.Deliver, + properties: pika.spec.BasicProperties, + body: bytes + ) -> None: + consumer_tag = method.consumer_tag + exchange = method.exchange + routing_key = method.routing_key + app_id = properties.app_id + message_id = properties.message_id + + # Decode and parse the message body + enc_payload: Dict[str, Union[str, int, List]] | str = json.loads(body) + payload = enc_payload if isinstance(enc_payload, dict) else json.loads(enc_payload) + + print(f" [x] Received {payload}") + + print("Simulating a long running process") + time.sleep(5) + print("Process completed") + return True + + # Example test + print('\n\033[1;35;40m Start NeoCowboy Service \033[0m') + mrsal: Mrsal = Mrsal( + host=config.RABBIT_DOMAIN, + port=config.RABBITMQ_PORT_TLS, + credentials=(config.RABBITMQ_CREDENTIALS), + virtual_host=config.V_HOST, + ssl=True, + verbose=True, + ) + + mrsal.connect_to_server() + + exch_result: pika.frame.Method = mrsal.setup_exchange( + exchange="exchangeRT", + exchange_type='x-delayed-message', + arguments={'x-delayed-type': 'x-delayed-message'}, + ) + q_result: pika.frame.Method = mrsal.setup_queue(queue="mrsal_testQueue") + qb_result: pika.frame.Method = mrsal.setup_queue_binding( + exchange="exchangeRT", + routing_key="exchangeRT.mrsal_testQueue", + queue="mrsal_testQueue", + ) + + mrsal.start_consumer( + queue="mrsal_testQueue", + callback=test_callback, + requeue=False, + callback_with_delivery_info=True, + auto_ack=True, + ) \ No newline at end of file diff --git a/mrsal/utils/__init__.py b/mrsal/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/mrsal/utils/utils.py b/mrsal/utils/utils.py index 55ceda1..c89cfb3 100644 --- a/mrsal/utils/utils.py +++ b/mrsal/utils/utils.py @@ -1,6 +1,6 @@ import pika -import mrsal.config.config as config +from mrsal.config import config def is_redelivery_configured(msg_prop: pika.spec.BasicProperties):