Skip to content

Commit

Permalink
Merge pull request #34 from NeoMedSys/ssl-patch
Browse files Browse the repository at this point in the history
tests fixed blæææææ
  • Loading branch information
JonNesvold authored Sep 17, 2024
2 parents 00d21b2 + d4337b7 commit afe1466
Show file tree
Hide file tree
Showing 6 changed files with 241 additions and 218 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# MRSAL AMQP
[![Release](https://img.shields.io/badge/release-1.0.3--beta-blue.svg)](https://pypi.org/project/mrsal/) [![Python 3.10](https://img.shields.io/badge/python-3.10--3.11--3.12-blue.svg)](https://www.python.org/downloads/release/python-3103/)[![Mrsal Workflow](https://github.com/NeoMedSys/mrsal/actions/workflows/mrsal.yaml/badge.svg?branch=main)](https://github.com/NeoMedSys/mrsal/actions/workflows/mrsal.yaml)
[![Release](https://img.shields.io/badge/release-1.0.3-etalue.svg)](https://pypi.org/project/mrsal/) [![Python 3.10](https://img.shields.io/badge/python-3.10--3.11--3.12lue.svg)](https://www.python.org/downloads/release/python-3103/)[![Mrsal Workflow](https://github.com/NeoMedSys/mrsal/actions/workflows/mrsal.yaml/badge.svg?branch=main)](https://github.com/NeoMedSys/mrsal/actions/workflows/mrsal.yaml)


## Intro
Expand Down
5 changes: 3 additions & 2 deletions mrsal/amqp/subclass.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from ssl import SSLContext
from mrsal.exceptions import MrsalAbortedSetup
from logging import WARNING
from pika.connection import SSLOptions
from pika.exceptions import AMQPConnectionError, ChannelClosedByBroker, StreamLostError, ConnectionClosedByBroker
from pika.adapters.asyncio_connection import AsyncioConnection
from typing import Callable, Type
Expand All @@ -27,11 +28,11 @@ class MrsalAMQP(Mrsal):
blocked_connection_timeout: int = 60 # sec
use_blocking: bool = False

def get_ssl_context(self) -> SSLContext | None:
def get_ssl_context(self) -> SSLOptions | None:
if self.ssl:
self.log.info("Setting up TLS connection")
context = self._ssl_setup()
ssl_options = pika.SSLOptions(context, self.host) if context else None
ssl_options = pika.SSLOptions(context, self.host) if 'context' in locals() else None
return ssl_options

def setup_blocking_connection(self) -> None:
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ license = ""
maintainers = ["Raafat <rafatzahran90@gmail.com>", "Jon E Nesvold <jnesvold@pm.me>"]
name = "mrsal"
readme = "README.md"
version = "1.0.3b"
version = "1.0.4b"

[tool.poetry.dependencies]
colorlog = "^6.7.0"
Expand Down
3 changes: 2 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
'host': 'localhost',
'port': 5672,
'credentials': ('user', 'password'),
'virtual_host': 'testboi'
'virtual_host': 'testboi',
'prefetch_count': 1
}

@dataclass
Expand Down
215 changes: 2 additions & 213 deletions tests/test_blocking_mrsal.py
Original file line number Diff line number Diff line change
@@ -1,224 +1,13 @@
import os
import unittest
from unittest.mock import Mock, patch, MagicMock, call
from pika.exceptions import AMQPConnectionError, UnroutableError
from unittest.mock import patch
from pydantic import ValidationError

from mrsal.amqp.subclass import MrsalAMQP
from tenacity import RetryError
from tests.conftest import SETUP_ARGS, ExpectedPayload
from tests.conftest import SETUP_ARGS



class TestMrsalBlockingAMQP(unittest.TestCase):
@patch('mrsal.amqp.subclass.MrsalAMQP.setup_blocking_connection')
@patch('mrsal.amqp.subclass.pika.channel')
def setUp(self, mock_blocking_connection, mock_setup_connection):
# Set up mock behaviors for the connection and channel
self.mock_channel = MagicMock()
self.mock_connection = MagicMock()
self.mock_connection.channel.return_value = self.mock_channel
mock_blocking_connection.return_value = self.mock_connection

# Mock the setup_connection to simulate a successful connection setup
mock_setup_connection.return_value = None # Simulate setup_connection doing nothing (successful setup)

# Create an instance of BlockRabbit
self.consumer = MrsalAMQP(**SETUP_ARGS, use_blocking=True)
self.consumer._channel = self.mock_channel # Set the channel to the mocked one

@patch.object(MrsalAMQP, 'setup_blocking_connection')
def test_retry_on_connection_failure_blocking(self, mock_blocking_connection):
"""Test reconnection retries in blocking consumer mode."""

# Set up a mock callback function
mock_callback = Mock()

self.mock_channel.consume.side_effect = AMQPConnectionError("Connection lost")

with self.assertRaises(RetryError):
self.consumer.start_consumer(
queue_name='test_q',
exchange_name='test_x',
exchange_type='direct',
routing_key='test_route',
callback=mock_callback,
)

self.assertEqual(mock_blocking_connection.call_count, 3)

def test_valid_message_processing(self):
# Simulate a valid message
valid_body = b'{"id": 1, "name": "Test", "active": true}'
mock_method_frame = MagicMock()
mock_properties = MagicMock()

# Mock the consume method to yield a valid message
self.mock_channel.consume.return_value = [(mock_method_frame, mock_properties, valid_body)]

# Set up a mock callback function
mock_callback = Mock()


# Start the consumer with the payload model and callback
self.consumer.start_consumer(
queue_name='test_q',
exchange_name='test_x',
exchange_type='direct',
routing_key='test_route',
callback=mock_callback,
payload_model=ExpectedPayload
)

# Assert the callback was called once with the correct data
mock_callback.assert_called_once_with(mock_method_frame, mock_properties, valid_body)

def test_valid_message_processing_no_autoack(self):
"""Test that a message is acknowledged on successful processing."""
valid_body = b'{"id": 1, "name": "Test", "active": true}'
mock_method_frame = MagicMock()
mock_method_frame.delivery_tag = 123
mock_properties = MagicMock()

self.mock_channel.consume.return_value = [(mock_method_frame, mock_properties, valid_body)]
mock_callback = Mock()

self.consumer.start_consumer(
exchange_name="test_x",
exchange_type="direct",
queue_name="test_q",
routing_key="test_route",
callback=mock_callback,
payload_model=ExpectedPayload,
auto_ack=False
)

self.mock_channel.basic_ack.assert_called_once_with(delivery_tag=123)

def test_invalid_message_skipped(self):
# Simulate an invalid message that fails validation
invalid_body = b'{"id": "wrong_type", "name": "Test", "active": true}'
mock_method_frame = MagicMock()
mock_properties = MagicMock()

# Mock the consume method to yield an invalid message
self.mock_channel.consume.return_value = [(mock_method_frame, mock_properties, invalid_body)]

# Set up a mock callback function
mock_callback = Mock()

# Start the consumer with the payload model and callback
self.consumer.start_consumer(
queue_name='test_queue',
auto_ack=True,
exchange_name='test_x',
exchange_type='direct',
routing_key='test_route',
callback=mock_callback,
payload_model=ExpectedPayload
)

# Assert the callback was not called since the message should be skipped
mock_callback.assert_not_called()

def test_requeue_on_validation_failure(self):
# Simulate an invalid message that fails validation
invalid_body = b'{"id": "wrong_type", "name": "Test", "active": true}'
mock_method_frame = MagicMock()
mock_method_frame.delivery_tag = 123 # Set a delivery tag for nack
mock_properties = MagicMock()

# Mock the consume method to yield an invalid message
self.mock_channel.consume.return_value = [(mock_method_frame, mock_properties, invalid_body)]

# Start the consumer with the payload model
with patch.object(self.consumer._channel, 'basic_nack') as mock_nack:
self.consumer.start_consumer(
queue_name='test_q',
auto_ack=False, # Disable auto_ack to test nack behavior
exchange_name='test_x',
exchange_type='direct',
routing_key='test_route',
payload_model=ExpectedPayload
)

# Assert that basic_nack was called with requeue=True
mock_nack.assert_called_once_with(delivery_tag=123, requeue=True)

def test_publish_message(self):
"""Test that the message is correctly published to the exchange."""
# Mock the setup methods for auto declare
self.consumer._setup_exchange_and_queue = Mock()

# Mock the message to be published
message = b'{"data": "test_message"}'
exchange_name = 'test_x'
routing_key = 'test_route'

# Publish the message
self.consumer.publish_message(
exchange_name=exchange_name,
routing_key=routing_key,
message=message,
exchange_type='direct',
queue_name='test_q',
auto_declare=True
)

# Assert the setup was called
self.consumer._setup_exchange_and_queue.assert_called_once_with(
exchange_name=exchange_name,
queue_name='test_q',
exchange_type='direct',
routing_key=routing_key
)

# Assert the message was published correctly
self.mock_channel.basic_publish.assert_called_once_with(
exchange=exchange_name,
routing_key=routing_key,
body=message,
properties=None
)

def test_retry_on_unroutable_error(self):
"""Test that the publish_message retries 3 times when UnroutableError is raised."""
# Mock the setup methods for auto declare
self.consumer._setup_exchange_and_queue = Mock()

# Set up the message and parameters
message = "test_message"
exchange_name = 'test_x'
routing_key = 'test_route'
queue_name = 'test_q'

# Mock the basic_publish to raise UnroutableError
self.mock_channel.basic_publish.side_effect = UnroutableError("Message could not be routed")

# Atempt to publish the message
with self.assertRaises(RetryError):
self.consumer.publish_message(
exchange_name=exchange_name,
routing_key=routing_key,
message=message,
exchange_type='direct',
queue_name=queue_name,
auto_declare=True
)

# Assert that basic_publish was called 3 times due to retries
self.assertEqual(self.mock_channel.basic_publish.call_count, 3)
# Assert that "test_message" appears 3 times

# Assert the correct calls were made with the expected arguments
expected_call = call(
exchange=exchange_name,
routing_key=routing_key,
body=message,
properties=None
)
self.mock_channel.basic_publish.assert_has_calls([expected_call] * 3)

class TestBlockRabbitSSLSetup(unittest.TestCase):

def test_ssl_setup_with_valid_paths(self):
Expand Down
Loading

0 comments on commit afe1466

Please sign in to comment.