Skip to content

Commit

Permalink
Update emit_mq_message to prevent mutating input data
Browse files Browse the repository at this point in the history
Add test coverage for `emit_mq_message` for Blocking and Select connections
  • Loading branch information
NeonDaniel committed Jan 21, 2025
1 parent 643e386 commit 079094b
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 1 deletion.
3 changes: 3 additions & 0 deletions neon_mq_connector/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,9 @@ def emit_mq_message(cls,
:raises ValueError: invalid request data provided
:returns message_id: id of the sent message
"""
# Make a copy of request_data to prevent modifying the input object
request_data = dict(request_data)

if not isinstance(request_data, dict):
raise TypeError(f"Expected dict and got {type(request_data)}")
if not request_data:
Expand Down
81 changes: 80 additions & 1 deletion tests/test_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@

from unittest.mock import Mock
from ovos_utils.log import LOG
from pika.adapters.blocking_connection import BlockingConnection
from pika.adapters.select_connection import SelectConnection
from pika.exchange_type import ExchangeType

from neon_mq_connector.connector import MQConnector, ConsumerThreadInstance
Expand Down Expand Up @@ -300,6 +302,7 @@ def setUp(self):
self.connector_instance.async_consumers_enabled = True


@pytest.mark.usefixtures("rmq_instance")
class TestMQConnectorInit(unittest.TestCase):
def test_connector_init(self):
connector = MQConnector(None, "test")
Expand Down Expand Up @@ -339,4 +342,80 @@ def test_connector_init(self):
self.assertEqual(connector.vhost, test_vhost)
connector.vhost = "/testing"
self.assertEqual(connector.vhost, test_vhost)
# TODO: test other methods

def test_emit_mq_message(self):
from neon_mq_connector.utils.network_utils import b64_to_dict

test_config = {"server": "127.0.0.1",
"port": self.rmq_instance.port,
"users": {
"test": {
"user": "test_user",
"password": "test_password"
}}}
test_vhost = "/neon_testing"
test_queue = "test_queue"
connector = MQConnector(test_config, "test")
connector.vhost = test_vhost

request_data = {"test": True,
"data": ["test"]}

callback_event = threading.Event()
callback = Mock(side_effect=lambda *args: callback_event.set())
connector.register_consumer("test_consumer", vhost=test_vhost,
queue=test_queue, callback=callback)
connector.run()

close_event = threading.Event()
on_open = Mock()
on_error = Mock()
on_close = Mock(side_effect=lambda *args: close_event.set())

blocking_connection = BlockingConnection(
parameters=connector.get_connection_params(test_vhost))

async_connection = SelectConnection(
parameters=connector.get_connection_params(test_vhost),
on_open_callback=on_open, on_open_error_callback=on_error,
on_close_callback=on_close)
async_thread = threading.Thread(target=async_connection.ioloop.start,
daemon=True)
async_thread.start()

# Blocking connection emit
message_id = connector.emit_mq_message(blocking_connection,
request_data, queue=test_queue)
self.assertIsInstance(message_id, str)
callback_event.wait(timeout=5)
self.assertTrue(callback_event.is_set())
callback.assert_called_once()
self.assertEqual(b64_to_dict(callback.call_args.args[3]),
{**request_data, "message_id": message_id})
callback.reset_mock()
callback_event.clear()

# Async connection emit
on_open.assert_called_once()
message_id_2 = connector.emit_mq_message(async_connection,
request_data, queue=test_queue)
self.assertIsInstance(message_id, str)
self.assertNotEqual(message_id, message_id_2)
callback_event.wait(timeout=5)
self.assertTrue(callback_event.is_set())
callback.assert_called_once()
self.assertEqual(b64_to_dict(callback.call_args.args[3]),
{**request_data, "message_id": message_id_2})

on_close.assert_not_called()
connector.stop()

async_connection.close()
close_event.wait(timeout=5)
self.assertTrue(close_event.is_set())
on_close.assert_called_once()

async_thread.join(3)
on_error.assert_not_called()

# TODO: test other methods

0 comments on commit 079094b

Please sign in to comment.