Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
MartinRovang committed Sep 11, 2024
1 parent 02a083c commit e9de625
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 46 deletions.
78 changes: 33 additions & 45 deletions mrsal/mrsal.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ def setup_exchange(self, exchange: str, exchange_type: str, arguments: Dict[str,
:param bool internal: Can only be published to by other exchanges
:param dict arguments: Custom key/value pair arguments for the exchange
:returns: Method frame from the Exchange.Declare-ok response
:rtype: `pika.frame.Method` having `method` attribute of type `spec.Exchange.DeclareOk`
:rtype: `pika.frame.Method` having `method` attribute of type `spec.Exchange.DeclareOk`@
"""
exchange_declare_info = f"""
exchange={exchange},
Expand Down Expand Up @@ -583,7 +583,13 @@ def publish_dead_letter(self, message: str, delivery_tag: int, dead_letters_exch
logger.error(f"Dead letter was returned with error: {e}")
return False

@retry((gaierror, pika.exceptions.AMQPConnectionError, pika.exceptions.StreamLostError, pika.exceptions.ConnectionClosedByBroker, pika.exceptions.ChannelClosedByBroker), tries=15, delay=1, jitter=(2, 10), logger=logger)
@retry((
gaierror,
pika.exceptions.AMQPConnectionError,
pika.exceptions.StreamLostError,
pika.exceptions.ConnectionClosedByBroker,
pika.exceptions.ChannelClosedByBroker,
), tries=15, delay=1, jitter=(2, 10), logger=logger)
def full_setup(
self,
exchange: str = None,
Expand Down Expand Up @@ -653,17 +659,30 @@ def full_setup(
callback=my_callback_function
)
"""
self.connect_to_server()
self.setup_exchange(exchange=exchange, exchange_type=exchange_type, arguments=arguments)
self.setup_queue(queue=queue)
self.setup_queue_binding(exchange=exchange, queue=queue, routing_key=routing_key)
self.start_consumer(
queue=queue,
callback=callback,
requeue=requeue,
callback_with_delivery_info=callback_with_delivery_info,
auto_ack=auto_ack,
)
try:
self.connect_to_server()
self.setup_exchange(exchange=exchange, exchange_type=exchange_type, arguments=arguments)
self.setup_queue(queue=queue)
self.setup_queue_binding(exchange=exchange, queue=queue, routing_key=routing_key)
self.start_consumer(
queue=queue,
callback=callback,
requeue=requeue,
callback_with_delivery_info=callback_with_delivery_info,
auto_ack=auto_ack,
)
except pika.exceptions.AMQPConnectionError as e:
logger.error(f"Failed to connect to RabbitMQ server: {e}")
raise pika.exceptions.AMQPConnectionError(503, str(e))
except pika.exceptions.ChannelClosedByBroker as e:
logger.error(f"Channel is closed by the broker: {e}")
raise pika.exceptions.ChannelClosedByBroker(404, str(e))
except pika.exceptions.ConnectionClosedByBroker as e:
logger.error(f"Connection is closed by the broker: {e}")
raise pika.exceptions.ConnectionClosedByBroker(503, str(e))
except Exception as e:
logger.error(f"Failed to setup RabbitMQ: {e}")
raise e



Expand Down Expand Up @@ -696,37 +715,6 @@ def test_callback(

# Example test
print('\n\033[1;35;40m Start NeoCowboy Service \033[0m')
# mrsal: Mrsal = Mrsal(
# host=config.RABBIT_DOMAIN,
# port=config.RABBITMQ_PORT_TLS,
# credentials=(config.RABBITMQ_CREDENTIALS),
# virtual_host=config.V_HOST,
# ssl=True,
# verbose=True,
# )

# mrsal.connect_to_server()

# exch_result: pika.frame.Method = mrsal.setup_exchange(
# exchange="exchangeRT",
# exchange_type='x-delayed-message',
# arguments={'x-delayed-type': 'x-delayed-message'},
# )
# q_result: pika.frame.Method = mrsal.setup_queue(queue="mrsal_testQueue")
# qb_result: pika.frame.Method = mrsal.setup_queue_binding(
# exchange="exchangeRT",
# routing_key="exchangeRT.mrsal_testQueue",
# queue="mrsal_testQueue",
# )

# mrsal.start_consumer(
# queue="mrsal_testQueue",
# callback=test_callback,
# requeue=False,
# callback_with_delivery_info=True,
# auto_ack=True,
# )


Mrsal = Mrsal(
host=config.RABBIT_DOMAIN,
Expand All @@ -737,7 +725,7 @@ def test_callback(
verbose=True,
).full_setup(
exchange='exchangeRT',
exchange_type='x-delayed-message',
exchange_type='topic',
routing_key='exchangeRT.mrsal_testQueue',
queue='mrsal_testQueue',
callback=test_callback,
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ license = ""
maintainers = ["Raafat <rafatzahran90@gmail.com>", "Jon E Nesvold <jnesvold@pm.me>"]
name = "mrsal"
readme = "README.md"
version = "0.7.6-alpha"
version = "0.7.7a"

[tool.poetry.dependencies]
colorlog = "^6.7.0"
Expand Down

0 comments on commit e9de625

Please sign in to comment.