Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async #40

Merged
merged 3 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# MRSAL AMQP
[![Release](https://img.shields.io/badge/release-1.0.9-etalue.svg)](https://pypi.org/project/mrsal/) [![Python 3.10](https://img.shields.io/badge/python-3.10--3.11--3.12-blue.svg)](https://www.python.org/downloads/release/python-3103/)[![Mrsal Workflow](https://github.com/NeoMedSys/mrsal/actions/workflows/mrsal.yaml/badge.svg?branch=main)](https://github.com/NeoMedSys/mrsal/actions/workflows/mrsal.yaml)
[![Release](https://img.shields.io/badge/release-1.0.9-etalue.svg)](https://pypi.org/project/mrsal/) [![Python 3.10](https://img.shields.io/badge/python-3.10--3.11--3.12lue.svg)](https://www.python.org/downloads/release/python-3103/)[![Mrsal Workflow](https://github.com/NeoMedSys/mrsal/actions/workflows/mrsal.yaml/badge.svg?branch=main)](https://github.com/NeoMedSys/mrsal/actions/workflows/mrsal.yaml)


## Intro
Expand Down
236 changes: 146 additions & 90 deletions mrsal/amqp/subclass.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
from functools import partial
import pika
import json
from mrsal.exceptions import MrsalAbortedSetup
from logging import WARNING
from pika.connection import SSLOptions
from pika.exceptions import (
AMQPConnectionError,
ChannelClosedByBroker,
Expand All @@ -12,8 +10,8 @@
NackError,
UnroutableError
)
from pika.adapters.asyncio_connection import AsyncioConnection
from typing import Any, Callable, Type
from aio_pika import connect_robust, Channel as AioChannel
from typing import Callable, Type
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type, before_sleep_log
from pydantic import ValidationError
from pydantic.dataclasses import dataclass
Expand All @@ -25,22 +23,15 @@
log = NeoLogger(__name__, rotate_days=config.LOG_DAYS)

@dataclass
class MrsalAMQP(Mrsal):
class MrsalBlockingAMQP(Mrsal):
"""
:param int blocked_connection_timeout: blocked_connection_timeout
is the timeout, in seconds,
for the connection to remain blocked; if the timeout expires,
the connection will be torn down during connection tuning.
"""
blocked_connection_timeout: int = 60 # sec
use_blocking: bool = False

def get_ssl_context(self) -> SSLOptions | None:
if self.ssl:
self.log.info("Setting up TLS connection")
context = self._ssl_setup()
ssl_options = pika.SSLOptions(context, self.host) if 'context' in locals() else None
return ssl_options

def setup_blocking_connection(self) -> None:
"""We can use setup_blocking_connection for establishing a connection to RabbitMQ server specifying connection parameters.
Expand Down Expand Up @@ -69,7 +60,7 @@ def setup_blocking_connection(self) -> None:
pika.ConnectionParameters(
host=self.host,
port=self.port,
ssl_options=self.get_ssl_context(),
ssl_options=self.get_ssl_context(async_conn=False),
virtual_host=self.virtual_host,
credentials=credentials,
heartbeat=self.heartbeat,
Expand All @@ -89,53 +80,6 @@ def setup_blocking_connection(self) -> None:
except Exception as e:
self.log.error(f"Unexpected error caught: {e}")

def setup_async_connection(self) -> None:
"""We can use setup_aync_connection for establishing a connection to RabbitMQ server specifying connection parameters.
The connection is async and is recommended to use if your app is realtime or will handle a lot of traffic.

Parameters
----------
context : Dict[str, str]
context is the structured map with information regarding the SSL options for connecting with rabbit server via TLS.
"""
connection_info = f"""
Mrsal connection parameters:
host={self.host},
virtual_host={self.virtual_host},
port={self.port},
heartbeat={self.heartbeat},
ssl={self.ssl}
"""
if self.verbose:
self.log.info(f"Establishing connection to RabbitMQ on {connection_info}")
credentials = pika.PlainCredentials(*self.credentials)
conn_conf = pika.ConnectionParameters(
host=self.host,
port=self.port,
ssl_options=self.get_ssl_context(),
virtual_host=self.virtual_host,
credentials=credentials,
heartbeat=self.heartbeat,
)
try:
self._connection = AsyncioConnection(
parameters=conn_conf,
on_open_callback=partial(
self.on_connection_open,
exchange_name=self.exchange_name, queue_name=self.queue_name,
exchange_type=self.exchange_type, routing_key=self.routing_key
),
on_open_error_callback=self.on_connection_error
)
self.log.info(f"Connection staged with RabbitMQ on {connection_info}")
except (AMQPConnectionError, ChannelClosedByBroker, ConnectionClosedByBroker, StreamLostError) as e:
self.log.error(f"Oh lordy lord I failed connecting to the Rabbit with: {e}")
raise
except Exception as e:
self.log.error(f"Unexpected error caught: {e}")



@retry(
retry=retry_if_exception_type((
AMQPConnectionError,
Expand All @@ -157,7 +101,8 @@ def start_consumer(self,
exchange_name: str | None = None,
exchange_type: str | None = None,
routing_key: str | None = None,
payload_model: Type | None = None
payload_model: Type | None = None,
requeue: bool = True
) -> None:
"""
Start the consumer using blocking setup.
Expand All @@ -168,63 +113,48 @@ def start_consumer(self,
:param callback_args: Optional arguments to pass to the callback.
"""
# Connect and start the I/O loop
if self.use_blocking:
self.setup_blocking_connection()
else:
# set connection parameters
# parametes propagate through a 3 layers in order
# to spin up the async connection
self.queue_name = queue_name
self.exchange_name = exchange_name
self.exchange_type = exchange_type
self.routing_key = routing_key
self.auto_declare = auto_declare

self.setup_async_connection()
if self._connection.is_open:
self.log.success(f"Boom! Async connection established with {exchange_name} on {queue_name}")
self._connection.ioloop.run_forever()
else:
self.log.error('Straigh out of the swamp with no connection! Async connection did not activate')

if auto_declare and self.use_blocking:
self.setup_blocking_connection()

if auto_declare:
if None in (exchange_name, queue_name, exchange_type, routing_key):
raise TypeError('Make sure that you are passing in all the necessary args for auto_declare')

self._setup_exchange_and_queue(
exchange_name=exchange_name,
queue_name=queue_name,
exchange_type=exchange_type,
routing_key=routing_key
)

if not self.auto_declare_ok:
if self._connection.is_open:
self._connection.ioloop.stop()
raise MrsalAbortedSetup('Auto declaration for the connection setup failed and is aborted')

self.log.info(f"Consumer boi listening on queue: {queue_name} to the exchange {exchange_name}. Waiting for messages...")
self.log.info(f"Straigh out of the swamps -- consumer boi listening on queue: {queue_name} to the exchange {exchange_name}. Waiting for messages...")

try:
for method_frame, properties, body in self._channel.consume(
queue=queue_name, auto_ack=auto_ack, inactivity_timeout=inactivity_timeout):
if method_frame:
if properties:
app_id = properties.app_id if hasattr(properties, 'app_id') else 'no AppID given'
msg_id = properties.msg_id if hasattr(properties, 'msg_id') else 'no msgID given'
msg_id = properties.message_id if hasattr(properties, 'message_id') else 'no msgID given'

if self.verbose:
self.log.info(
"""
f"""
Message received with:
- Method Frame: {method_frame)
- Method Frame: {method_frame}
- Redelivery: {method_frame.redelivered}
- Exchange: {method_frame.exchange}
- Routing Key: {method_frame.routing_key}
- Delivery Tag: {method_frame.delivery_tag}
- Properties: {properties}
- Requeue: {requeue}
- Auto Ack: {auto_ack}
"""
)
if auto_ack:
self.log.info(f'I successfully received a message from: {app_id} with messageID: {msg_id}')
self.log.info(f'I successfully received a message with AutoAck from: {app_id} with messageID: {msg_id}')

if payload_model:
try:
Expand All @@ -243,13 +173,13 @@ def start_consumer(self,
callback( method_frame, properties, body)
except Exception as e:
if not auto_ack:
self._channel.basic_nack(delivery_tag=method_frame.delivery_tag, requeue=True)
self._channel.basic_nack(delivery_tag=method_frame.delivery_tag, requeue=requeue)
self.log.error("Callback method failure: {e}")
continue

if not auto_ack:
self.log.success(f'Message ({msg_id}) from {app_id} received and properly processed -- now dance the funky chicken')
self._channel.basic_ack(delivery_tag=method_frame.delivery_tag)

except (AMQPConnectionError, ConnectionClosedByBroker, StreamLostError) as e:
log.error(f"Ooooooopsie! I caught a connection error while consuming: {e}")
raise
Expand Down Expand Up @@ -318,3 +248,129 @@ def publish_message(
except Exception as e:
self.log.error(f"Unexpected error while publishing message: {e}")



class MrsalAsyncAMQP(Mrsal):
"""Handles asynchronous connection with RabbitMQ using aio-pika."""
async def setup_async_connection(self):
"""Setup an asynchronous connection to RabbitMQ using aio-pika."""
self.log.info(f"Establishing async connection to RabbitMQ on {self.host}:{self.port}")
try:
self._connection = await connect_robust(
host=self.host,
port=self.port,
login=self.credentials[0],
password=self.credentials[1],
virtualhost=self.virtual_host,
ssl=self.ssl,
ssl_context=self.get_ssl_context(),
heartbeat=self.heartbeat
)
self._channel = await self._connection.channel()
await self._channel.set_qos(prefetch_count=self.prefetch_count)
self.log.info("Async connection established successfully.")
except (AMQPConnectionError, StreamLostError, ChannelClosedByBroker, ConnectionClosedByBroker) as e:
self.log.error(f"Error establishing async connection: {e}", exc_info=True)
raise
except Exception as e:
self.log.error(f'Oh my lordy lord! I caugth an unexpected exception while trying to connect: {e}', exc_info=True)

@retry(
retry=retry_if_exception_type((
AMQPConnectionError,
ChannelClosedByBroker,
ConnectionClosedByBroker,
StreamLostError,
)),
stop=stop_after_attempt(3),
wait=wait_fixed(2),
before_sleep=before_sleep_log(log, WARNING)
)
async def start_consumer(
self,
queue_name: str,
callback: Callable | None = None,
callback_args: dict[str, str | int | float | bool] | None = None,
auto_ack: bool = False,
auto_declare: bool = True,
exchange_name: str | None = None,
exchange_type: str | None = None,
routing_key: str | None = None,
payload_model: Type | None = None,
requeue: bool = True
):
"""Start the async consumer with the provided setup."""
# Check if there's a connection; if not, create one
if not self._connection:
await self.setup_async_connection()


self._channel: AioChannel = await self._connection.channel()
await self._channel.set_qos(prefetch_count=self.prefetch_count)

if auto_declare:
if None in (exchange_name, queue_name, exchange_type, routing_key):
raise TypeError('Make sure that you are passing in all the necessary args for auto_declare')

queue = await self._async_setup_exchange_and_queue(
exchange_name=exchange_name,
queue_name=queue_name,
exchange_type=exchange_type,
routing_key=routing_key
)

if not self.auto_declare_ok:
if self._connection:
await self._connection.close()
raise MrsalAbortedSetup('Auto declaration failed during setup.')

self.log.info(f"Straight out of the swamps -- Consumer boi listening on queue: {queue_name}, exchange: {exchange_name}")

# async with queue.iterator() as queue_iter:
async for message in queue.iterator():
if message is None:
continue

# Extract message metadata
app_id = message.app_id if hasattr(message, 'app_id') else 'NoAppID'
msg_id = message.app_id if hasattr(message, 'message_id') else 'NoMsgID'

if self.verbose:
self.log.info(f"""
Message received with:
- Redelivery: {message.redelivered}
- Exchange: {message.exchange}
- Routing Key: {message.routing_key}
- Delivery Tag: {message.delivery_tag}
- Requeue: {requeue}
- Auto Ack: {auto_ack}
""")

if auto_ack:
await message.ack()
self.log.info(f'I successfully received a message from: {app_id} with messageID: {msg_id}')

if payload_model:
try:
self.validate_payload(message.body, payload_model)
except (ValidationError, json.JSONDecodeError, UnicodeDecodeError, TypeError) as e:
self.log.error(f"Payload validation failed: {e}", exc_info=True)
if not auto_ack:
await message.reject(requeue=requeue)
continue

if callback:
try:
if callback_args:
await callback(*callback_args, message)
else:
await callback(message)
except Exception as e:
self.log.error(f"Splæt! Error processing message with callback: {e}", exc_info=True)
if not auto_ack:
await message.reject(requeue=requeue)
continue

if not auto_ack:
await message.ack()
self.log.success(f'Young grasshopper! Message ({msg_id}) from {app_id} received and properly processed.')
Loading