Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Major Cleanup #31

Merged
merged 22 commits into from
Sep 16, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
yia bish
JonNesvold committed Sep 12, 2024
commit 4ff4777c37299656d8cbc71f151dfaeaac7a0726
129 changes: 98 additions & 31 deletions mrsal/amqp/blocking.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import pika
from typing import Callable, Any
from pika import SSLOptions
from pika.exceptions import AMQPConnectionError, ChannelClosedByBroker, StreamLostError, ConnectionClosedByBroker, UnroutableError
from pika.adapters.asyncio_connection import AsyncioConnection
from typing import Callable, Any, Optional, Type
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type, before_sleep_log
from pydantic import ValidationError
from pydantic.dataclasses import dataclass
from mrsal.mrsal import Mrsal
from neolibrary.monitoring.logger import NeoLogger
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type, before_sleep_log

from mrsal.mrsal import Mrsal
from pydantic.deprecated.tools import json

log = NeoLogger(__name__, log_days=10)

@@ -60,12 +64,13 @@ def setup_connection(self, context: dict[str, str] | None = None):
blocked_connection_timeout=self.blocked_connection_timeout,
)
)

self._channel = self._connection.channel()
# Note: prefetch is set to 1 here as an example only.
# In production you will want to test with different prefetch values to find which one provides the best performance and usability for your solution.
# use a high number of prefecth if you think the pods with Mrsal installed can handle it. A prefetch 4 will mean up to 4 async runs before ack is required
self._channel.basic_qos(prefetch_count=self.prefetch_count)
self.log.info(f"Connection established with RabbitMQ on {connection_info}")
self.log.info(f"Boom! Connection established with RabbitMQ on {connection_info}")
except (pika.exceptions.AMQPConnectionError, Exception) as err:
self.log.error(f"I tried to connect with the RabbitMQ server but failed with: {err}")

@@ -79,7 +84,8 @@ def start_consumer(self,
auto_declare: bool = True,
exchange_name: str | None = None,
exchange_type: str | None = None,
routing_key: str | None = None
routing_key: str | None = None,
payload_model: Type | None = None
):
"""
Start the consumer using blocking setup.
@@ -88,10 +94,11 @@ def start_consumer(self,
:param inactivity_timeout: Timeout for inactivity in the consumer loop.
:param callback: The callback function to process messages.
:param callback_args: Optional arguments to pass to the callback.
:param payload_model: Optional pydantic BaseModel class that specifies expected payload arg types
"""
if auto_declare:
if None in (exchange_name, queue_name, exchange_type, routing_key):
raise TypeError('Make sure that you are passing in all the necessary args for auto_declare')
raise TypeError('Make sure that you are passing in all the necessary args for auto_declare, yia bish')

self._setup_exchange_and_queue(
exchange_name=exchange_name,
@@ -122,18 +129,65 @@ def start_consumer(self,
if auto_ack:
self.log.success(f'I successfully received a message from: {app_id} with messageID: {msg_id}')

if payload_model:
try:
self.validate_payload(body, payload_model)
except (ValidationError, json.JSONDecodeError, UnicodeDecodeError, TypeError) as e:
self.log.error(f"Oh lordy lord, payload validation failed for your specific model requirements: {e}")
continue
if callback:
if callback_args:
callback(*callback_args, method_frame, properties, body)
else:

callback( method_frame, properties, body)
else:
# continue consuming
continue
except Exception as e:
self.log.error(f'Oh lordy lord! I failed consuming ze messaj with: {e}')

def publish_message(
self,
exchange_name: str,
routing_key: str,
message: Any,
exchange_type: str,
queue_name: str,
auto_declare: bool = True,
prop: pika.BasicProperties | None = None,
):
"""Publish message to the exchange specifying routing key and properties.

:param str exchange: The exchange to publish to
:param str routing_key: The routing key to bind on
:param bytes body: The message body; empty string if no body
:param pika.spec.BasicProperties properties: message properties
:param bool fast_setup:
- when True, will the method create the specified exchange, queue and bind them together using the routing kye.
- If False, this method will check if the specified exchange and queue already exist before publishing.

:raises UnroutableError: raised when a message published in publisher-acknowledgments mode (see `BlockingChannel.confirm_delivery`) is returned via `Basic.Return` followed by `Basic.Ack`.
:raises NackError: raised when a message published in publisher-acknowledgements mode is Nack'ed by the broker. See `BlockingChannel.confirm_delivery`.
"""
if auto_declare:
if None in (exchange_name, queue_name, exchange_type, routing_key):
raise TypeError('Make sure that you are passing in all the necessary args for auto_declare')

self._setup_exchange_and_queue(
exchange_name=exchange_name,
queue_name=queue_name,
exchange_type=exchange_type,
routing_key=routing_key
)
try:
# Publish the message by serializing it in json dump
# NOTE! we are not dumping a json anymore here! This allows for more flexibility
self._channel.basic_publish(exchange=exchange_name, routing_key=routing_key, body=message, properties=prop)
self.log.success(f"The message ({message}) is published to the exchange {exchange_name} with the routing key {routing_key}")

except UnroutableError as e:
self.log.error(f"Producer could not publish message:{message} to the exchange {exchange_name} with a routing key {routing_key}: {e}", exc_info=True)


@dataclass
class MrsalAsyncAMQP(Mrsal):
@@ -172,32 +226,38 @@ async def setup_aync_connection(self, context: dict[str, str] | None = None):
context = self._ssl_setup()
ssl_options = SSLOptions(context, self.host) if context else None
credentials = pika.PlainCredentials(*self.credentials)
await AsyncioConnection.create_connection(
pika.ConnectionParameters(
host=self.host,
port=self.port,
ssl_options=ssl_options,
virtual_host=self.virtual_host,
credentials=credentials,
heartbeat=self.heartbeat,
),
on_done=self.on_connection_open,
on_open_error_callback=self.on_connection_error
)
self.log.info(f"Connection established with RabbitMQ on {connection_info}")

try:
await AsyncioConnection.create_connection(
pika.ConnectionParameters(
host=self.host,
port=self.port,
ssl_options=ssl_options,
virtual_host=self.virtual_host,
credentials=credentials,
heartbeat=self.heartbeat,
),
on_done=self.on_connection_open,
on_open_error_callback=self.on_connection_error
)
except Exception as e:
self.log.error(f"Oh lordy lord I failed connecting to the Rabbit with: {e}")

self.log.success(f"Boom! Connection established with RabbitMQ on {connection_info}")


async def start_consumer(self,
queue_name: str,
callback: Callable | None = None,
callback_args: dict[str, str | int | float | bool] | None = None,
auto_ack: bool = True,
inactivity_timeout: int = 5,
auto_declare: bool = True,
exchange_name: str | None = None,
exchange_type: str | None = None,
routing_key: str | None = None
):
queue_name: str,
callback: Callable | None = None,
callback_args: dict[str, str | int | float | bool] | None = None,
auto_ack: bool = True,
inactivity_timeout: int = 5,
auto_declare: bool = True,
exchange_name: str | None = None,
exchange_type: str | None = None,
routing_key: str | None = None,
payload_model: Type | None = None
):
"""
Start the consumer using blocking setup.
:param queue: The queue to consume from.
@@ -239,6 +299,13 @@ async def start_consumer(self,
if auto_ack:
self.log.success(f'I successfully received a message from: {app_id} with messageID: {msg_id}')

if payload_model:
try:
self.validate_payload(body, payload_model)
except (ValidationError, json.JSONDecodeError, UnicodeDecodeError, TypeError) as e:
self.log.error(f"Oh lordy lord, payload validation failed for your specific model requirements: {e}")
continue

if callback:
if callback_args:
await callback(*callback_args, method_frame, properties, body)
@@ -287,7 +354,7 @@ async def publish_message(
# Publish the message by serializing it in json dump
# NOTE! we are not dumping a json anymore here! This allows for more flexibility
self._channel.basic_publish(exchange=exchange_name, routing_key=routing_key, body=message, properties=prop)
self.log.info(f"Message ({message}) is published to the exchange {exchange_name} with a routing key {routing_key}")
self.log.success(f"The message ({message}) is published to the exchange {exchange_name} with the following routing key {routing_key}")

except UnroutableError as e:
self.log.error(f"Producer could not publish message:{message} to the exchange {exchange_name} with a routing key {routing_key}: {e}", exc_info=True)
27 changes: 22 additions & 5 deletions mrsal/mrsal.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
import json
import os
import ssl
from typing import Any, Optional, Type
from mrsal.amqp.blocking import MrsalBlockingAMQP
import pika
import asyncio
from pydantic.dataclasses import dataclass
from typing import Any
from pika.exceptions import ChannelClosedByBroker, ConnectionClosedByBroker
from pika.exchange_type import ExchangeType
from pika.adapters.asyncio_connection import AsyncioConnection
from neolibrary.monitoring.logger import NeoLogger
from config.exceptions import MissingTLSCerts
from pydantic.deprecated.tools import json


@dataclass
@@ -254,3 +251,23 @@ def _ssl_setup(self) -> dict[str, str]:
context.load_cert_chain(certfile=self.tls_crt, keyfile=self.tls_key)
return context

def validate_payload(self, payload: Any, model: Type) -> None:
"""
Parses and validates the incoming message payload using the provided dataclass model.
:param payload: The message payload which could be of any type (str, bytes, dict, etc.).
:param model: The pydantic dataclass model class to validate against.
:return: An instance of the model if validation is successful, otherwise None.
"""
# If payload is bytes, decode it to a string
if isinstance(payload, bytes):
payload = payload.decode('utf-8')

# If payload is a string, attempt to load it as JSON
if isinstance(payload, str):
payload = json.loads(payload) # Converts JSON string to a dictionary

# Validate the payload against the provided model
if isinstance(payload, dict):
model(**payload)
else:
raise TypeError("Fool, we aint supporting this type yet {type(payload)}.. Bytes or str -- get it straight")