From 5484541020c6fa61ef3a2c68bb0d50c8093e06ce Mon Sep 17 00:00:00 2001 From: JonNesvold Date: Mon, 16 Sep 2024 23:13:05 +0200 Subject: [PATCH 1/3] =?UTF-8?q?tests=20fixed=20bl=C3=A6=C3=A6=C3=A6=C3=A6?= =?UTF-8?q?=C3=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mrsal/amqp/subclass.py | 5 +- tests/conftest.py | 3 +- tests/test_blocking_mrsal.py | 215 +------------------------- tests/test_mrsal_blocking_nossl.py | 232 +++++++++++++++++++++++++++++ 4 files changed, 239 insertions(+), 216 deletions(-) create mode 100644 tests/test_mrsal_blocking_nossl.py diff --git a/mrsal/amqp/subclass.py b/mrsal/amqp/subclass.py index 7a993cc..f49ceaf 100644 --- a/mrsal/amqp/subclass.py +++ b/mrsal/amqp/subclass.py @@ -3,6 +3,7 @@ from ssl import SSLContext from mrsal.exceptions import MrsalAbortedSetup from logging import WARNING +from pika.connection import SSLOptions from pika.exceptions import AMQPConnectionError, ChannelClosedByBroker, StreamLostError, ConnectionClosedByBroker from pika.adapters.asyncio_connection import AsyncioConnection from typing import Callable, Type @@ -27,11 +28,11 @@ class MrsalAMQP(Mrsal): blocked_connection_timeout: int = 60 # sec use_blocking: bool = False - def get_ssl_context(self) -> SSLContext | None: + def get_ssl_context(self) -> SSLOptions | None: if self.ssl: self.log.info("Setting up TLS connection") context = self._ssl_setup() - ssl_options = pika.SSLOptions(context, self.host) if context else None + ssl_options = pika.SSLOptions(context, self.host) if 'context' in locals() else None return ssl_options def setup_blocking_connection(self) -> None: diff --git a/tests/conftest.py b/tests/conftest.py index 6f91a37..da0e916 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -9,7 +9,8 @@ 'host': 'localhost', 'port': 5672, 'credentials': ('user', 'password'), - 'virtual_host': 'testboi' + 'virtual_host': 'testboi', + 'prefetch_count': 1 } @dataclass diff --git a/tests/test_blocking_mrsal.py b/tests/test_blocking_mrsal.py index 596a278..7a437e3 100644 --- a/tests/test_blocking_mrsal.py +++ b/tests/test_blocking_mrsal.py @@ -1,224 +1,13 @@ import os import unittest -from unittest.mock import Mock, patch, MagicMock, call -from pika.exceptions import AMQPConnectionError, UnroutableError +from unittest.mock import patch from pydantic import ValidationError from mrsal.amqp.subclass import MrsalAMQP -from tenacity import RetryError -from tests.conftest import SETUP_ARGS, ExpectedPayload +from tests.conftest import SETUP_ARGS -class TestMrsalBlockingAMQP(unittest.TestCase): - @patch('mrsal.amqp.subclass.MrsalAMQP.setup_blocking_connection') - @patch('mrsal.amqp.subclass.pika.channel') - def setUp(self, mock_blocking_connection, mock_setup_connection): - # Set up mock behaviors for the connection and channel - self.mock_channel = MagicMock() - self.mock_connection = MagicMock() - self.mock_connection.channel.return_value = self.mock_channel - mock_blocking_connection.return_value = self.mock_connection - - # Mock the setup_connection to simulate a successful connection setup - mock_setup_connection.return_value = None # Simulate setup_connection doing nothing (successful setup) - - # Create an instance of BlockRabbit - self.consumer = MrsalAMQP(**SETUP_ARGS, use_blocking=True) - self.consumer._channel = self.mock_channel # Set the channel to the mocked one - - @patch.object(MrsalAMQP, 'setup_blocking_connection') - def test_retry_on_connection_failure_blocking(self, mock_blocking_connection): - """Test reconnection retries in blocking consumer mode.""" - - # Set up a mock callback function - mock_callback = Mock() - - self.mock_channel.consume.side_effect = AMQPConnectionError("Connection lost") - - with self.assertRaises(RetryError): - self.consumer.start_consumer( - queue_name='test_q', - exchange_name='test_x', - exchange_type='direct', - routing_key='test_route', - callback=mock_callback, - ) - - self.assertEqual(mock_blocking_connection.call_count, 3) - - def test_valid_message_processing(self): - # Simulate a valid message - valid_body = b'{"id": 1, "name": "Test", "active": true}' - mock_method_frame = MagicMock() - mock_properties = MagicMock() - - # Mock the consume method to yield a valid message - self.mock_channel.consume.return_value = [(mock_method_frame, mock_properties, valid_body)] - - # Set up a mock callback function - mock_callback = Mock() - - - # Start the consumer with the payload model and callback - self.consumer.start_consumer( - queue_name='test_q', - exchange_name='test_x', - exchange_type='direct', - routing_key='test_route', - callback=mock_callback, - payload_model=ExpectedPayload - ) - - # Assert the callback was called once with the correct data - mock_callback.assert_called_once_with(mock_method_frame, mock_properties, valid_body) - - def test_valid_message_processing_no_autoack(self): - """Test that a message is acknowledged on successful processing.""" - valid_body = b'{"id": 1, "name": "Test", "active": true}' - mock_method_frame = MagicMock() - mock_method_frame.delivery_tag = 123 - mock_properties = MagicMock() - - self.mock_channel.consume.return_value = [(mock_method_frame, mock_properties, valid_body)] - mock_callback = Mock() - - self.consumer.start_consumer( - exchange_name="test_x", - exchange_type="direct", - queue_name="test_q", - routing_key="test_route", - callback=mock_callback, - payload_model=ExpectedPayload, - auto_ack=False - ) - - self.mock_channel.basic_ack.assert_called_once_with(delivery_tag=123) - - def test_invalid_message_skipped(self): - # Simulate an invalid message that fails validation - invalid_body = b'{"id": "wrong_type", "name": "Test", "active": true}' - mock_method_frame = MagicMock() - mock_properties = MagicMock() - - # Mock the consume method to yield an invalid message - self.mock_channel.consume.return_value = [(mock_method_frame, mock_properties, invalid_body)] - - # Set up a mock callback function - mock_callback = Mock() - - # Start the consumer with the payload model and callback - self.consumer.start_consumer( - queue_name='test_queue', - auto_ack=True, - exchange_name='test_x', - exchange_type='direct', - routing_key='test_route', - callback=mock_callback, - payload_model=ExpectedPayload - ) - - # Assert the callback was not called since the message should be skipped - mock_callback.assert_not_called() - - def test_requeue_on_validation_failure(self): - # Simulate an invalid message that fails validation - invalid_body = b'{"id": "wrong_type", "name": "Test", "active": true}' - mock_method_frame = MagicMock() - mock_method_frame.delivery_tag = 123 # Set a delivery tag for nack - mock_properties = MagicMock() - - # Mock the consume method to yield an invalid message - self.mock_channel.consume.return_value = [(mock_method_frame, mock_properties, invalid_body)] - - # Start the consumer with the payload model - with patch.object(self.consumer._channel, 'basic_nack') as mock_nack: - self.consumer.start_consumer( - queue_name='test_q', - auto_ack=False, # Disable auto_ack to test nack behavior - exchange_name='test_x', - exchange_type='direct', - routing_key='test_route', - payload_model=ExpectedPayload - ) - - # Assert that basic_nack was called with requeue=True - mock_nack.assert_called_once_with(delivery_tag=123, requeue=True) - - def test_publish_message(self): - """Test that the message is correctly published to the exchange.""" - # Mock the setup methods for auto declare - self.consumer._setup_exchange_and_queue = Mock() - - # Mock the message to be published - message = b'{"data": "test_message"}' - exchange_name = 'test_x' - routing_key = 'test_route' - - # Publish the message - self.consumer.publish_message( - exchange_name=exchange_name, - routing_key=routing_key, - message=message, - exchange_type='direct', - queue_name='test_q', - auto_declare=True - ) - - # Assert the setup was called - self.consumer._setup_exchange_and_queue.assert_called_once_with( - exchange_name=exchange_name, - queue_name='test_q', - exchange_type='direct', - routing_key=routing_key - ) - - # Assert the message was published correctly - self.mock_channel.basic_publish.assert_called_once_with( - exchange=exchange_name, - routing_key=routing_key, - body=message, - properties=None - ) - - def test_retry_on_unroutable_error(self): - """Test that the publish_message retries 3 times when UnroutableError is raised.""" - # Mock the setup methods for auto declare - self.consumer._setup_exchange_and_queue = Mock() - - # Set up the message and parameters - message = "test_message" - exchange_name = 'test_x' - routing_key = 'test_route' - queue_name = 'test_q' - - # Mock the basic_publish to raise UnroutableError - self.mock_channel.basic_publish.side_effect = UnroutableError("Message could not be routed") - - # Atempt to publish the message - with self.assertRaises(RetryError): - self.consumer.publish_message( - exchange_name=exchange_name, - routing_key=routing_key, - message=message, - exchange_type='direct', - queue_name=queue_name, - auto_declare=True - ) - - # Assert that basic_publish was called 3 times due to retries - self.assertEqual(self.mock_channel.basic_publish.call_count, 3) - # Assert that "test_message" appears 3 times - - # Assert the correct calls were made with the expected arguments - expected_call = call( - exchange=exchange_name, - routing_key=routing_key, - body=message, - properties=None - ) - self.mock_channel.basic_publish.assert_has_calls([expected_call] * 3) - class TestBlockRabbitSSLSetup(unittest.TestCase): def test_ssl_setup_with_valid_paths(self): diff --git a/tests/test_mrsal_blocking_nossl.py b/tests/test_mrsal_blocking_nossl.py new file mode 100644 index 0000000..307a092 --- /dev/null +++ b/tests/test_mrsal_blocking_nossl.py @@ -0,0 +1,232 @@ +import pytest +from unittest.mock import Mock, patch, MagicMock, call +from pika.exceptions import AMQPConnectionError, UnroutableError +from pydantic.dataclasses import dataclass +from tenacity import RetryError +from mrsal.amqp.subclass import MrsalAMQP + +# Configuration and expected payload definition +SETUP_ARGS = { + 'host': 'localhost', + 'port': 5672, + 'credentials': ('user', 'password'), + 'virtual_host': 'testboi', + 'ssl': False, + 'use_blocking': True, + 'heartbeat': 60, + 'blocked_connection_timeout': 60, + 'prefetch_count': 1 +} + +@dataclass +class ExpectedPayload: + id: int + name: str + active: bool + + +# Fixture to mock the BlockingConnection and the setup connection method +@pytest.fixture +def mock_amqp_connection(): + with patch('mrsal.amqp.subclass.pika.BlockingConnection') as mock_blocking_connection, \ + patch('mrsal.amqp.subclass.MrsalAMQP.setup_blocking_connection', autospec=True) as mock_setup_blocking_connection: + + # Set up the mock behaviors for the connection and channel + mock_channel = MagicMock() + mock_connection = MagicMock() + mock_connection.channel.return_value = mock_channel + mock_blocking_connection.return_value = mock_connection + + # Ensure setup_blocking_connection does nothing during the tests + mock_setup_blocking_connection.return_value = None + + # Provide the mocks for use in the test + yield mock_connection, mock_channel, mock_setup_blocking_connection + + +# Fixture to create a MrsalAMQP consumer with mocked channel +@pytest.fixture +def amqp_consumer(mock_amqp_connection): + mock_connection, mock_channel, _ = mock_amqp_connection + consumer = MrsalAMQP(**SETUP_ARGS) + consumer._channel = mock_channel # Inject the mocked channel into the consumer + return consumer + + +def test_retry_on_connection_failure_blocking(amqp_consumer, mock_amqp_connection): + """Test reconnection retries in blocking consumer mode.""" + mock_connection, mock_channel, mock_setup_blocking_connection = mock_amqp_connection + mock_channel.consume.side_effect = AMQPConnectionError("Connection lost") + + # Attempt to start the consumer, which should trigger the retry + with pytest.raises(RetryError): + amqp_consumer.start_consumer( + queue_name='test_q', + exchange_name='test_x', + exchange_type='direct', + routing_key='test_route', + callback=Mock(), + ) + + # Verify that setup_blocking_connection was retried 3 times + assert mock_setup_blocking_connection.call_count == 3 + + +def test_valid_message_processing(amqp_consumer): + # Simulate a valid message + valid_body = b'{"id": 1, "name": "Test", "active": true}' + mock_method_frame = MagicMock() + mock_properties = MagicMock() + + # Mock the consume method to yield a valid message + amqp_consumer._channel.consume.return_value = [(mock_method_frame, mock_properties, valid_body)] + + mock_callback = Mock() + + amqp_consumer.start_consumer( + queue_name='test_q', + exchange_name='test_x', + exchange_type='direct', + routing_key='test_route', + callback=mock_callback, + payload_model=ExpectedPayload + ) + + # Assert the callback was called once with the correct data + mock_callback.assert_called_once_with(mock_method_frame, mock_properties, valid_body) + + +def test_valid_message_processing_no_autoack(amqp_consumer): + """Test that a message is acknowledged on successful processing.""" + valid_body = b'{"id": 1, "name": "Test", "active": true}' + mock_method_frame = MagicMock() + mock_method_frame.delivery_tag = 123 + mock_properties = MagicMock() + + amqp_consumer._channel.consume.return_value = [(mock_method_frame, mock_properties, valid_body)] + mock_callback = Mock() + + amqp_consumer.start_consumer( + exchange_name="test_x", + exchange_type="direct", + queue_name="test_q", + routing_key="test_route", + callback=mock_callback, + payload_model=ExpectedPayload, + auto_ack=False + ) + + amqp_consumer._channel.basic_ack.assert_called_once_with(delivery_tag=123) + + +def test_invalid_message_skipped(amqp_consumer): + # Simulate an invalid message that fails validation + invalid_body = b'{"id": "wrong_type", "name": "Test", "active": true}' + mock_method_frame = MagicMock() + mock_properties = MagicMock() + + # Mock the consume method to yield an invalid message + amqp_consumer._channel.consume.return_value = [(mock_method_frame, mock_properties, invalid_body)] + + mock_callback = Mock() + + amqp_consumer.start_consumer( + queue_name='test_queue', + auto_ack=True, + exchange_name='test_x', + exchange_type='direct', + routing_key='test_route', + callback=mock_callback, + payload_model=ExpectedPayload + ) + + # Assert the callback was not called since the message should be skipped + mock_callback.assert_not_called() + + +def test_requeue_on_validation_failure(amqp_consumer): + # Simulate an invalid message that fails validation + invalid_body = b'{"id": "wrong_type", "name": "Test", "active": true}' + mock_method_frame = MagicMock() + mock_method_frame.delivery_tag = 123 + mock_properties = MagicMock() + + # Mock the consume method to yield an invalid message + amqp_consumer._channel.consume.return_value = [(mock_method_frame, mock_properties, invalid_body)] + + with patch.object(amqp_consumer._channel, 'basic_nack') as mock_nack: + amqp_consumer.start_consumer( + queue_name='test_q', + auto_ack=False, + exchange_name='test_x', + exchange_type='direct', + routing_key='test_route', + payload_model=ExpectedPayload + ) + + # Assert that basic_nack was called with requeue=True + mock_nack.assert_called_once_with(delivery_tag=123, requeue=True) + + +def test_publish_message(amqp_consumer): + """Test that the message is correctly published to the exchange.""" + amqp_consumer._setup_exchange_and_queue = Mock() + + message = b'{"data": "test_message"}' + exchange_name = 'test_x' + routing_key = 'test_route' + + amqp_consumer.publish_message( + exchange_name=exchange_name, + routing_key=routing_key, + message=message, + exchange_type='direct', + queue_name='test_q', + auto_declare=True + ) + + amqp_consumer._setup_exchange_and_queue.assert_called_once_with( + exchange_name=exchange_name, + queue_name='test_q', + exchange_type='direct', + routing_key=routing_key + ) + + amqp_consumer._channel.basic_publish.assert_called_once_with( + exchange=exchange_name, + routing_key=routing_key, + body=message, + properties=None + ) + + +def test_retry_on_unroutable_error(amqp_consumer): + """Test that the publish_message retries 3 times when UnroutableError is raised.""" + amqp_consumer._setup_exchange_and_queue = Mock() + + message = "test_message" + exchange_name = 'test_x' + routing_key = 'test_route' + queue_name = 'test_q' + + amqp_consumer._channel.basic_publish.side_effect = UnroutableError("Message could not be routed") + + with pytest.raises(RetryError): + amqp_consumer.publish_message( + exchange_name=exchange_name, + routing_key=routing_key, + message=message, + exchange_type='direct', + queue_name=queue_name, + auto_declare=True + ) + + assert amqp_consumer._channel.basic_publish.call_count == 3 + + expected_call = call( + exchange=exchange_name, + routing_key=routing_key, + body=message, + properties=None + ) + amqp_consumer._channel.basic_publish.assert_has_calls([expected_call] * 3) From 5bf4d2dffa9da9c63f470b66a2313ef854030dd1 Mon Sep 17 00:00:00 2001 From: JonNesvold Date: Mon, 16 Sep 2024 23:14:55 +0200 Subject: [PATCH 2/3] vbump --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 73e1d90..163294c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ license = "" maintainers = ["Raafat ", "Jon E Nesvold "] name = "mrsal" readme = "README.md" -version = "1.0.3b" +version = "1.0.4b" [tool.poetry.dependencies] colorlog = "^6.7.0" From d4337b76eb1917f8c370bd6b41fbdc7d5d1a5a15 Mon Sep 17 00:00:00 2001 From: JonNesvold Date: Tue, 17 Sep 2024 08:01:02 +0000 Subject: [PATCH 3/3] Apply automatic changes --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index c27beca..e10634a 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ # MRSAL AMQP -[![Release](https://img.shields.io/badge/release-1.0.3--beta-blue.svg)](https://pypi.org/project/mrsal/) [![Python 3.10](https://img.shields.io/badge/python-3.10--3.11--3.12-blue.svg)](https://www.python.org/downloads/release/python-3103/)[![Mrsal Workflow](https://github.com/NeoMedSys/mrsal/actions/workflows/mrsal.yaml/badge.svg?branch=main)](https://github.com/NeoMedSys/mrsal/actions/workflows/mrsal.yaml) +[![Release](https://img.shields.io/badge/release-1.0.3-etalue.svg)](https://pypi.org/project/mrsal/) [![Python 3.10](https://img.shields.io/badge/python-3.10--3.11--3.12lue.svg)](https://www.python.org/downloads/release/python-3103/)[![Mrsal Workflow](https://github.com/NeoMedSys/mrsal/actions/workflows/mrsal.yaml/badge.svg?branch=main)](https://github.com/NeoMedSys/mrsal/actions/workflows/mrsal.yaml) ## Intro