From a4ff2b243c39a848e9fe8d65ec7fae761dacfba6 Mon Sep 17 00:00:00 2001 From: Martin Date: Thu, 29 Aug 2024 14:34:26 +0200 Subject: [PATCH] function name change --- mrsal/mrsal.py | 175 +++++++++++++++++++++++++++++++++++++------------ pyproject.toml | 2 +- 2 files changed, 134 insertions(+), 43 deletions(-) diff --git a/mrsal/mrsal.py b/mrsal/mrsal.py index 9c2c06a..5082159 100644 --- a/mrsal/mrsal.py +++ b/mrsal/mrsal.py @@ -9,7 +9,7 @@ import time import pika from pika import SSLOptions -from pika.exceptions import AMQPConnectionError, ChannelClosedByBroker, ChannelWrongStateError, ConnectionClosedByBroker +from pika.exceptions import ChannelClosedByBroker, ConnectionClosedByBroker from pika.exchange_type import ExchangeType from retry import retry @@ -52,7 +52,6 @@ class Mrsal: _connection: pika.BlockingConnection = None _channel = None - @retry((pika.exceptions.AMQPConnectionError, pika.exceptions.StreamLostError, pika.exceptions.ConnectionClosedByBroker, TypeError, gaierror), tries=5, delay=1, jitter=(0, 10), logger=logger) def connect_to_server(self, context: Dict[str, str] = None): """We can use connect_to_server for establishing a connection to RabbitMQ server specifying connection parameters. @@ -164,15 +163,12 @@ def setup_queue(self, queue: str, arguments: Dict[str, str] = None, durable: boo """ if self.verbose: logger.info(f"Declaring queue with: {queue_declare_info}") - try: - queue_declare_result = self._channel.queue_declare(queue=queue, arguments=arguments, durable=durable, exclusive=exclusive, auto_delete=auto_delete, passive=passive) - if self.verbose: - logger.info(f"Queue is declared successfully: {queue_declare_info},result={queue_declare_result.method}") - return queue_declare_result - except (ChannelClosedByBroker, ChannelWrongStateError) as err: - msg: str = f"I tried to setup a queue but failed with: {err}" - logger.error(msg) - raise pika.exceptions.ConnectionClosedByBroker(503, msg) + + queue_declare_result = self._channel.queue_declare(queue=queue, arguments=arguments, durable=durable, exclusive=exclusive, auto_delete=auto_delete, passive=passive) + if self.verbose: + logger.info(f"Queue is declared successfully: {queue_declare_info},result={queue_declare_result.method}") + return queue_declare_result + def setup_queue_binding(self, exchange: str, queue: str, routing_key: str = None, arguments=None): """Bind queue to exchange. @@ -187,16 +183,12 @@ def setup_queue_binding(self, exchange: str, queue: str, routing_key: str = None """ if self.verbose: logger.info(f"Binding queue to exchange: queue={queue}, exchange={exchange}, routing_key={routing_key}") - try: - bind_result = self._channel.queue_bind(exchange=exchange, queue=queue, routing_key=routing_key, arguments=arguments) - if self.verbose: - logger.info(f"The queue is bound to exchange successfully: queue={queue}, exchange={exchange}, routing_key={routing_key}, result={bind_result}") - return bind_result - except pika.exceptions.ChannelClosedByBroker as err: - msg: str = f"I tried to bind your queue but I failed with: {err}" - logger.error(msg) - raise pika.exceptions.ConnectionClosedByBroker(503, msg) + bind_result = self._channel.queue_bind(exchange=exchange, queue=queue, routing_key=routing_key, arguments=arguments) + if self.verbose: + logger.info(f"The queue is bound to exchange successfully: queue={queue}, exchange={exchange}, routing_key={routing_key}, result={bind_result}") + return bind_result + def __ssl_setup(self) -> Dict[str, str]: """__ssl_setup is private method we are using to connect with rabbit server via signed certificates and some TLS settings. @@ -292,7 +284,6 @@ def consume_messages_with_retries( logger.error(f"Connection closed with error: {e}") self._channel.stop_consuming() - @retry((pika.exceptions.AMQPConnectionError, pika.exceptions.StreamLostError, pika.exceptions.ConnectionClosedByBroker, pika.exceptions.ChannelClosedByBroker), tries=5, delay=1, jitter=(0, 10), logger=logger) def start_consumer( self, queue: str, @@ -527,7 +518,6 @@ def start_concurrence_consumer( [callback_with_delivery_info] * total_threads, ) - @retry((pika.exceptions.UnroutableError, pika.exceptions.ChannelClosedByBroker, pika.exceptions.ConnectionClosedByBroker), tries=5, delay=1, jitter=(0, 10), logger=logger) def publish_message( self, exchange: str, @@ -587,9 +577,91 @@ def publish_dead_letter(self, message: str, delivery_tag: int, dead_letters_exch self.publish_message(exchange=dead_letters_exchange, routing_key=dead_letters_routing_key, message=json.dumps(message), properties=prop) logger.info(f"Dead letter was published: message={message}, exchange={dead_letters_exchange}, routing_key={dead_letters_routing_key}") return True - except pika.exceptions.UnroutableError: - logger.error("Dead letter was returned") + except pika.exceptions.UnroutableError as e: + logger.error(f"Dead letter was returned with error: {e}") return False + + @retry((gaierror, pika.exceptions.AMQPConnectionError, pika.exceptions.StreamLostError, pika.exceptions.ConnectionClosedByBroker, pika.exceptions.ChannelClosedByBroker), tries=15, delay=1, jitter=(2, 10), logger=logger) + def full_setup( + self, + exchange: str = None, + exchange_type: str = None, + arguments: Dict[str, str] = None, + routing_key: str = None, + queue: str = None, + callback: Callable = None, + requeue: bool = False, + callback_with_delivery_info: bool = False, + auto_ack: bool = False, + ) -> None: + """ + Sets up the connection, exchange, queue, and consumer for interacting with a RabbitMQ server. + + This method configures the connection to the RabbitMQ server and sets up the required messaging + components such as exchange, queue, and consumer. It also handles retries in case of connection failures. + + Parameters + ---------- + exchange : str, optional + The name of the exchange to declare. If `None`, no exchange will be declared. Default is `None`. + exchange_type : str, optional + The type of exchange to declare (e.g., 'direct', 'topic', 'fanout', 'headers'). Required if `exchange` is specified. + Default is `None`. + arguments : Dict[str, str], optional + A dictionary of additional arguments to pass when declaring the exchange or queue. Default is `None`. + routing_key : str, optional + The routing key to bind the queue to the exchange. This is used to determine which messages go to which queue. + Default is `None`. + queue : str, optional + The name of the queue to declare. If `None`, a randomly named queue will be created. Default is `None`. + callback : Callable, optional + A callback function to be executed when a message is received. The function should accept the message as a parameter. + Default is `None`. + requeue : bool, optional + If `True`, failed messages will be requeued. This is used in cases where you want to retry processing a message later. + Default is `False`. + callback_with_delivery_info : bool, optional + If `True`, the callback function will receive additional delivery information (e.g., delivery tag, redelivered flag). + Default is `False`. + auto_ack : bool, optional + If `True`, messages will be automatically acknowledged as soon as they are delivered to the consumer. + If `False`, messages need to be manually acknowledged. Default is `False`. + + Returns + ------- + None + This function does not return any value. It performs the setup and starts consuming messages. + + Raises + ------ + pika.exceptions.AMQPConnectionError + Raised if the connection to the RabbitMQ server fails after multiple retry attempts. + pika.exceptions.ChannelClosedByBroker + Raised if the channel is closed by the broker for some reason. + pika.exceptions.ConnectionClosedByBroker + Raised if the connection is closed by the broker. + + Example + ------- + >>> major_setup( + exchange='my_exchange', + exchange_type='direct', + routing_key='my_routing_key', + queue='my_queue', + callback=my_callback_function + ) + """ + self.connect_to_server() + self.setup_exchange(exchange=exchange, exchange_type=exchange_type, arguments=arguments) + self.setup_queue(queue=queue) + self.setup_queue_binding(exchange=exchange, queue=queue, routing_key=routing_key) + self.start_consumer( + queue=queue, + callback=callback, + requeue=requeue, + callback_with_delivery_info=callback_with_delivery_info, + auto_ack=auto_ack, + ) @@ -621,31 +693,50 @@ def test_callback( # Example test print('\n\033[1;35;40m Start NeoCowboy Service \033[0m') - mrsal: Mrsal = Mrsal( + # mrsal: Mrsal = Mrsal( + # host=config.RABBIT_DOMAIN, + # port=config.RABBITMQ_PORT_TLS, + # credentials=(config.RABBITMQ_CREDENTIALS), + # virtual_host=config.V_HOST, + # ssl=True, + # verbose=True, + # ) + + # mrsal.connect_to_server() + + # exch_result: pika.frame.Method = mrsal.setup_exchange( + # exchange="exchangeRT", + # exchange_type='x-delayed-message', + # arguments={'x-delayed-type': 'x-delayed-message'}, + # ) + # q_result: pika.frame.Method = mrsal.setup_queue(queue="mrsal_testQueue") + # qb_result: pika.frame.Method = mrsal.setup_queue_binding( + # exchange="exchangeRT", + # routing_key="exchangeRT.mrsal_testQueue", + # queue="mrsal_testQueue", + # ) + + # mrsal.start_consumer( + # queue="mrsal_testQueue", + # callback=test_callback, + # requeue=False, + # callback_with_delivery_info=True, + # auto_ack=True, + # ) + + + Mrsal = Mrsal( host=config.RABBIT_DOMAIN, port=config.RABBITMQ_PORT_TLS, credentials=(config.RABBITMQ_CREDENTIALS), virtual_host=config.V_HOST, ssl=True, verbose=True, - ) - - mrsal.connect_to_server() - - exch_result: pika.frame.Method = mrsal.setup_exchange( - exchange="exchangeRT", + ).major_setup( + exchange='exchangeRT', exchange_type='x-delayed-message', - arguments={'x-delayed-type': 'x-delayed-message'}, - ) - q_result: pika.frame.Method = mrsal.setup_queue(queue="mrsal_testQueue") - qb_result: pika.frame.Method = mrsal.setup_queue_binding( - exchange="exchangeRT", - routing_key="exchangeRT.mrsal_testQueue", - queue="mrsal_testQueue", - ) - - mrsal.start_consumer( - queue="mrsal_testQueue", + routing_key='exchangeRT.mrsal_testQueue', + queue='mrsal_testQueue', callback=test_callback, requeue=False, callback_with_delivery_info=True, diff --git a/pyproject.toml b/pyproject.toml index 981c5c8..fb18135 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ license = "" maintainers = ["Raafat ", "Jon E Nesvold "] name = "mrsal" readme = "README.md" -version = "0.7.3-alpha" +version = "0.7.6-alpha" [tool.poetry.dependencies] colorlog = "^6.7.0"