Skip to content

Commit

Permalink
logging, exceptions handling
Browse files Browse the repository at this point in the history
  • Loading branch information
MartinRovang committed Aug 28, 2024
1 parent 87c7991 commit bf926a5
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 15 deletions.
2 changes: 1 addition & 1 deletion mrsal/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
V_HOST: str = os.environ.get("RABBITMQ_DEFAULT_VHOST", "myMrsalHost")
RABBITMQ_PORT: int = os.environ.get("RABBITMQ_PORT", 5672)
RABBITMQ_PORT_TLS: int = os.environ.get("RABBITMQ_PORT_TLS", 5671)
RABBIT_DOMAIN: str = os.environ.get("RABBITMQ_DOMAIN", "localhost")
RABBIT_DOMAIN: str = os.environ.get("RABBITMQ_DOMAIN", "rabbitmq.neomodels.app")

RABBITMQ_USER = os.environ.get("RABBITMQ_DEFAULT_USER", "root")
RABBITMQ_PASSWORD = os.environ.get("RABBITMQ_DEFAULT_PASS", "password")
Expand Down
89 changes: 76 additions & 13 deletions mrsal/mrsal.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,18 @@
import ssl
from dataclasses import dataclass
from socket import gaierror
from typing import Any, Callable, Dict, Tuple
from typing import Any, Callable, Dict, Tuple, Union, List

import time
import pika
from pika import SSLOptions
from pika.exceptions import AMQPConnectionError, ChannelClosedByBroker, ChannelWrongStateError, ConnectionClosedByBroker
from pika.exchange_type import ExchangeType
from retry import retry

import mrsal.config.config as config
from mrsal.config import config
from loguru import logger
from mrsal.utils.utils import is_redelivery_configured
from mrsal.utils import utils


@dataclass
Expand Down Expand Up @@ -51,7 +52,7 @@ class Mrsal:
_connection: pika.BlockingConnection = None
_channel = None

@retry((pika.exceptions.AMQPConnectionError, pika.exceptions.StreamLostError, pika.exceptions.ConnectionClosedByBroker, TypeError, gaierror), tries=2, delay=5, jitter=(1, 3), logger = logger)
@retry((pika.exceptions.AMQPConnectionError, pika.exceptions.StreamLostError, pika.exceptions.ConnectionClosedByBroker, TypeError, gaierror), tries=5, delay=1, jitter=(0, 2), logger=logger)
def connect_to_server(self, context: Dict[str, str] = None):
"""We can use connect_to_server for establishing a connection to RabbitMQ server specifying connection parameters.
Expand Down Expand Up @@ -93,7 +94,7 @@ def connect_to_server(self, context: Dict[str, str] = None):
self._channel.basic_qos(prefetch_count=self.prefetch_count)
logger.info(f"Connection established with RabbitMQ on {connection_info}")
return self._connection
except AMQPConnectionError as err:
except pika.exceptions.AMQPConnectionError as err:
msg: str = f"I tried to connect with the RabbitMQ server but failed with: {err}"
logger.error(msg)
raise pika.exceptions.AMQPConnectionError(msg)
Expand Down Expand Up @@ -291,7 +292,7 @@ def consume_messages_with_retries(
logger.error(f"Connection closed with error: {e}")
self._channel.stop_consuming()

@retry((pika.exceptions.AMQPConnectionError, pika.exceptions.StreamLostError, pika.exceptions.ConnectionClosedByBroker, pika.exceptions.ChannelClosedByBroker), tries=2, delay=5, jitter=(1, 3), logger=logger)
@retry((pika.exceptions.AMQPConnectionError, pika.exceptions.StreamLostError, pika.exceptions.ConnectionClosedByBroker, pika.exceptions.ChannelClosedByBroker), tries=5, delay=1, jitter=(0, 2), logger=logger)
def start_consumer(
self,
queue: str,
Expand Down Expand Up @@ -408,7 +409,7 @@ def start_consumer(
if not auto_ack and reject_unprocessed:
self._channel.basic_nack(delivery_tag=method_frame.delivery_tag, requeue=requeue)
logger.info(f"{print_thread_index}Message coming from the app={app_id} with messageId={msg_id} is rejected.")
if is_redelivery_configured(properties):
if utils.is_redelivery_configured(properties):
msg_headers = properties.headers
x_retry = msg_headers[config.RETRY_KEY]
x_retry_limit = msg_headers[config.RETRY_LIMIT_KEY]
Expand All @@ -433,9 +434,10 @@ def start_consumer(
logger.warning(f"{print_thread_index}Given period of inactivity {inactivity_timeout} is exceeded. Cancel consumer.")
self.stop_consuming(self.consumer_tag)
self._channel.cancel()
except (pika.exceptions.StreamLostError, pika.exceptions.ConnectionClosedByBroker, ValueError, TypeError):
logger.error(f"{print_thread_index}I lost the connection with the Mrsal.", exc_info=True)
pass
except pika.exceptions.ConnectionClosedByBroker as err:
logger.error(f"{print_thread_index}I lost the connection with the Mrsal. {err}", exc_info=True)
self._channel.cancel()
raise pika.exceptions.ConnectionClosedByBroker(503, str(err))
except KeyboardInterrupt:
logger(f"{print_thread_index}Stopping Mrsal consumption.")
self.stop_consuming(self.consumer_tag)
Expand Down Expand Up @@ -525,7 +527,7 @@ def start_concurrence_consumer(
[callback_with_delivery_info] * total_threads,
)

@retry((pika.exceptions.UnroutableError), tries=2, delay=5, jitter=(1, 3))
@retry((pika.exceptions.UnroutableError, pika.exceptions.ChannelClosedByBroker, pika.exceptions.ConnectionClosedByBroker), tries=5, delay=1, jitter=(0, 2), logger=logger)
def publish_message(
self,
exchange: str,
Expand Down Expand Up @@ -560,7 +562,7 @@ def publish_message(
self.exchange_exist(exchange=exchange, exchange_type=exchange_type)
if queue is not None:
self.queue_exist(queue=queue)
except (pika.exceptions.ChannelClosedByBroker, pika.exceptions.ConnectionClosedByBroker) as err:
except pika.exceptions.ChannelClosedByBroker as err:
logger.error(f"Failed to check active resources. Cancel consumer. {str(err)}")
self._channel.cancel()
raise pika.exceptions.ChannelClosedByBroker(404, str(err))
Expand All @@ -574,7 +576,7 @@ def publish_message(
return True
except pika.exceptions.UnroutableError as err1:
logger.error(f"Producer could not publish message:{message} to the exchange {exchange} with a routing key {routing_key}: {err1}", exc_info=True)
return False
raise pika.exceptions.UnroutableError(404, str(err1))

# TODO NOT IN USE: maybe we will use it in the method consume_messages_with_retries
# to publish messages to dead letters exchange after retries limit. (remove or use)
Expand All @@ -588,3 +590,64 @@ def publish_dead_letter(self, message: str, delivery_tag: int, dead_letters_exch
except pika.exceptions.UnroutableError:
logger.error("Dead letter was returned")
return False



if __name__ == "__main__":

# Main script testing

def test_callback(
method: pika.spec.Basic.Deliver,
properties: pika.spec.BasicProperties,
body: bytes
) -> None:
consumer_tag = method.consumer_tag
exchange = method.exchange
routing_key = method.routing_key
app_id = properties.app_id
message_id = properties.message_id

# Decode and parse the message body
enc_payload: Dict[str, Union[str, int, List]] | str = json.loads(body)
payload = enc_payload if isinstance(enc_payload, dict) else json.loads(enc_payload)

print(f" [x] Received {payload}")

print("Simulating a long running process")
time.sleep(5)
print("Process completed")
return True

# Example test
print('\n\033[1;35;40m Start NeoCowboy Service \033[0m')
mrsal: Mrsal = Mrsal(
host=config.RABBIT_DOMAIN,
port=config.RABBITMQ_PORT_TLS,
credentials=(config.RABBITMQ_CREDENTIALS),
virtual_host=config.V_HOST,
ssl=True,
verbose=True,
)

mrsal.connect_to_server()

exch_result: pika.frame.Method = mrsal.setup_exchange(
exchange="exchangeRT",
exchange_type='x-delayed-message',
arguments={'x-delayed-type': 'x-delayed-message'},
)
q_result: pika.frame.Method = mrsal.setup_queue(queue="mrsal_testQueue")
qb_result: pika.frame.Method = mrsal.setup_queue_binding(
exchange="exchangeRT",
routing_key="exchangeRT.mrsal_testQueue",
queue="mrsal_testQueue",
)

mrsal.start_consumer(
queue="mrsal_testQueue",
callback=test_callback,
requeue=False,
callback_with_delivery_info=True,
auto_ack=True,
)
Empty file added mrsal/utils/__init__.py
Empty file.
2 changes: 1 addition & 1 deletion mrsal/utils/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pika

import mrsal.config.config as config
from mrsal.config import config


def is_redelivery_configured(msg_prop: pika.spec.BasicProperties):
Expand Down

0 comments on commit bf926a5

Please sign in to comment.