Skip to content

Commit

Permalink
function name change
Browse files Browse the repository at this point in the history
  • Loading branch information
MartinRovang committed Aug 29, 2024
1 parent 3210fb0 commit a4ff2b2
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 43 deletions.
175 changes: 133 additions & 42 deletions mrsal/mrsal.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import time
import pika
from pika import SSLOptions
from pika.exceptions import AMQPConnectionError, ChannelClosedByBroker, ChannelWrongStateError, ConnectionClosedByBroker
from pika.exceptions import ChannelClosedByBroker, ConnectionClosedByBroker
from pika.exchange_type import ExchangeType
from retry import retry

Expand Down Expand Up @@ -52,7 +52,6 @@ class Mrsal:
_connection: pika.BlockingConnection = None
_channel = None

@retry((pika.exceptions.AMQPConnectionError, pika.exceptions.StreamLostError, pika.exceptions.ConnectionClosedByBroker, TypeError, gaierror), tries=5, delay=1, jitter=(0, 10), logger=logger)
def connect_to_server(self, context: Dict[str, str] = None):
"""We can use connect_to_server for establishing a connection to RabbitMQ server specifying connection parameters.
Expand Down Expand Up @@ -164,15 +163,12 @@ def setup_queue(self, queue: str, arguments: Dict[str, str] = None, durable: boo
"""
if self.verbose:
logger.info(f"Declaring queue with: {queue_declare_info}")
try:
queue_declare_result = self._channel.queue_declare(queue=queue, arguments=arguments, durable=durable, exclusive=exclusive, auto_delete=auto_delete, passive=passive)
if self.verbose:
logger.info(f"Queue is declared successfully: {queue_declare_info},result={queue_declare_result.method}")
return queue_declare_result
except (ChannelClosedByBroker, ChannelWrongStateError) as err:
msg: str = f"I tried to setup a queue but failed with: {err}"
logger.error(msg)
raise pika.exceptions.ConnectionClosedByBroker(503, msg)

queue_declare_result = self._channel.queue_declare(queue=queue, arguments=arguments, durable=durable, exclusive=exclusive, auto_delete=auto_delete, passive=passive)
if self.verbose:
logger.info(f"Queue is declared successfully: {queue_declare_info},result={queue_declare_result.method}")
return queue_declare_result


def setup_queue_binding(self, exchange: str, queue: str, routing_key: str = None, arguments=None):
"""Bind queue to exchange.
Expand All @@ -187,16 +183,12 @@ def setup_queue_binding(self, exchange: str, queue: str, routing_key: str = None
"""
if self.verbose:
logger.info(f"Binding queue to exchange: queue={queue}, exchange={exchange}, routing_key={routing_key}")
try:
bind_result = self._channel.queue_bind(exchange=exchange, queue=queue, routing_key=routing_key, arguments=arguments)
if self.verbose:
logger.info(f"The queue is bound to exchange successfully: queue={queue}, exchange={exchange}, routing_key={routing_key}, result={bind_result}")
return bind_result
except pika.exceptions.ChannelClosedByBroker as err:
msg: str = f"I tried to bind your queue but I failed with: {err}"
logger.error(msg)
raise pika.exceptions.ConnectionClosedByBroker(503, msg)

bind_result = self._channel.queue_bind(exchange=exchange, queue=queue, routing_key=routing_key, arguments=arguments)
if self.verbose:
logger.info(f"The queue is bound to exchange successfully: queue={queue}, exchange={exchange}, routing_key={routing_key}, result={bind_result}")
return bind_result

def __ssl_setup(self) -> Dict[str, str]:
"""__ssl_setup is private method we are using to connect with rabbit server via signed certificates and some TLS settings.
Expand Down Expand Up @@ -292,7 +284,6 @@ def consume_messages_with_retries(
logger.error(f"Connection closed with error: {e}")
self._channel.stop_consuming()

@retry((pika.exceptions.AMQPConnectionError, pika.exceptions.StreamLostError, pika.exceptions.ConnectionClosedByBroker, pika.exceptions.ChannelClosedByBroker), tries=5, delay=1, jitter=(0, 10), logger=logger)
def start_consumer(
self,
queue: str,
Expand Down Expand Up @@ -527,7 +518,6 @@ def start_concurrence_consumer(
[callback_with_delivery_info] * total_threads,
)

@retry((pika.exceptions.UnroutableError, pika.exceptions.ChannelClosedByBroker, pika.exceptions.ConnectionClosedByBroker), tries=5, delay=1, jitter=(0, 10), logger=logger)
def publish_message(
self,
exchange: str,
Expand Down Expand Up @@ -587,9 +577,91 @@ def publish_dead_letter(self, message: str, delivery_tag: int, dead_letters_exch
self.publish_message(exchange=dead_letters_exchange, routing_key=dead_letters_routing_key, message=json.dumps(message), properties=prop)
logger.info(f"Dead letter was published: message={message}, exchange={dead_letters_exchange}, routing_key={dead_letters_routing_key}")
return True
except pika.exceptions.UnroutableError:
logger.error("Dead letter was returned")
except pika.exceptions.UnroutableError as e:
logger.error(f"Dead letter was returned with error: {e}")
return False

@retry((gaierror, pika.exceptions.AMQPConnectionError, pika.exceptions.StreamLostError, pika.exceptions.ConnectionClosedByBroker, pika.exceptions.ChannelClosedByBroker), tries=15, delay=1, jitter=(2, 10), logger=logger)
def full_setup(
self,
exchange: str = None,
exchange_type: str = None,
arguments: Dict[str, str] = None,
routing_key: str = None,
queue: str = None,
callback: Callable = None,
requeue: bool = False,
callback_with_delivery_info: bool = False,
auto_ack: bool = False,
) -> None:
"""
Sets up the connection, exchange, queue, and consumer for interacting with a RabbitMQ server.
This method configures the connection to the RabbitMQ server and sets up the required messaging
components such as exchange, queue, and consumer. It also handles retries in case of connection failures.
Parameters
----------
exchange : str, optional
The name of the exchange to declare. If `None`, no exchange will be declared. Default is `None`.
exchange_type : str, optional
The type of exchange to declare (e.g., 'direct', 'topic', 'fanout', 'headers'). Required if `exchange` is specified.
Default is `None`.
arguments : Dict[str, str], optional
A dictionary of additional arguments to pass when declaring the exchange or queue. Default is `None`.
routing_key : str, optional
The routing key to bind the queue to the exchange. This is used to determine which messages go to which queue.
Default is `None`.
queue : str, optional
The name of the queue to declare. If `None`, a randomly named queue will be created. Default is `None`.
callback : Callable, optional
A callback function to be executed when a message is received. The function should accept the message as a parameter.
Default is `None`.
requeue : bool, optional
If `True`, failed messages will be requeued. This is used in cases where you want to retry processing a message later.
Default is `False`.
callback_with_delivery_info : bool, optional
If `True`, the callback function will receive additional delivery information (e.g., delivery tag, redelivered flag).
Default is `False`.
auto_ack : bool, optional
If `True`, messages will be automatically acknowledged as soon as they are delivered to the consumer.
If `False`, messages need to be manually acknowledged. Default is `False`.
Returns
-------
None
This function does not return any value. It performs the setup and starts consuming messages.
Raises
------
pika.exceptions.AMQPConnectionError
Raised if the connection to the RabbitMQ server fails after multiple retry attempts.
pika.exceptions.ChannelClosedByBroker
Raised if the channel is closed by the broker for some reason.
pika.exceptions.ConnectionClosedByBroker
Raised if the connection is closed by the broker.
Example
-------
>>> major_setup(
exchange='my_exchange',
exchange_type='direct',
routing_key='my_routing_key',
queue='my_queue',
callback=my_callback_function
)
"""
self.connect_to_server()
self.setup_exchange(exchange=exchange, exchange_type=exchange_type, arguments=arguments)
self.setup_queue(queue=queue)
self.setup_queue_binding(exchange=exchange, queue=queue, routing_key=routing_key)
self.start_consumer(
queue=queue,
callback=callback,
requeue=requeue,
callback_with_delivery_info=callback_with_delivery_info,
auto_ack=auto_ack,
)



Expand Down Expand Up @@ -621,31 +693,50 @@ def test_callback(

# Example test
print('\n\033[1;35;40m Start NeoCowboy Service \033[0m')
mrsal: Mrsal = Mrsal(
# mrsal: Mrsal = Mrsal(
# host=config.RABBIT_DOMAIN,
# port=config.RABBITMQ_PORT_TLS,
# credentials=(config.RABBITMQ_CREDENTIALS),
# virtual_host=config.V_HOST,
# ssl=True,
# verbose=True,
# )

# mrsal.connect_to_server()

# exch_result: pika.frame.Method = mrsal.setup_exchange(
# exchange="exchangeRT",
# exchange_type='x-delayed-message',
# arguments={'x-delayed-type': 'x-delayed-message'},
# )
# q_result: pika.frame.Method = mrsal.setup_queue(queue="mrsal_testQueue")
# qb_result: pika.frame.Method = mrsal.setup_queue_binding(
# exchange="exchangeRT",
# routing_key="exchangeRT.mrsal_testQueue",
# queue="mrsal_testQueue",
# )

# mrsal.start_consumer(
# queue="mrsal_testQueue",
# callback=test_callback,
# requeue=False,
# callback_with_delivery_info=True,
# auto_ack=True,
# )


Mrsal = Mrsal(
host=config.RABBIT_DOMAIN,
port=config.RABBITMQ_PORT_TLS,
credentials=(config.RABBITMQ_CREDENTIALS),
virtual_host=config.V_HOST,
ssl=True,
verbose=True,
)

mrsal.connect_to_server()

exch_result: pika.frame.Method = mrsal.setup_exchange(
exchange="exchangeRT",
).major_setup(
exchange='exchangeRT',
exchange_type='x-delayed-message',
arguments={'x-delayed-type': 'x-delayed-message'},
)
q_result: pika.frame.Method = mrsal.setup_queue(queue="mrsal_testQueue")
qb_result: pika.frame.Method = mrsal.setup_queue_binding(
exchange="exchangeRT",
routing_key="exchangeRT.mrsal_testQueue",
queue="mrsal_testQueue",
)

mrsal.start_consumer(
queue="mrsal_testQueue",
routing_key='exchangeRT.mrsal_testQueue',
queue='mrsal_testQueue',
callback=test_callback,
requeue=False,
callback_with_delivery_info=True,
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ license = ""
maintainers = ["Raafat <rafatzahran90@gmail.com>", "Jon E Nesvold <jnesvold@pm.me>"]
name = "mrsal"
readme = "README.md"
version = "0.7.3-alpha"
version = "0.7.6-alpha"

[tool.poetry.dependencies]
colorlog = "^6.7.0"
Expand Down

0 comments on commit a4ff2b2

Please sign in to comment.