Skip to content

Commit

Permalink
Add test coverage for BlockingConsumerThread
Browse files Browse the repository at this point in the history
  • Loading branch information
NeonDaniel committed Dec 6, 2024
1 parent df06465 commit 8ee4116
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 6 deletions.
16 changes: 10 additions & 6 deletions neon_mq_connector/consumers/blocking_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ class BlockingConsumerThread(threading.Thread):

# retry to handle connection failures in case MQ server is still starting
def __init__(self, connection_params: pika.ConnectionParameters,
queue: str, callback_func: callable,
queue: str,
callback_func: callable,
error_func: callable = consumer_utils.default_error_handler,
auto_ack: bool = True,
queue_reset: bool = False,
Expand All @@ -71,7 +72,8 @@ def __init__(self, connection_params: pika.ConnectionParameters,
to learn more about different exchanges
"""
threading.Thread.__init__(self, *args, **kwargs)
self._is_consuming = False # annotates that ConsumerThread is running
self._consumer_started = threading.Event() # annotates that ConsumerThread is running
self._consumer_started.clear()
self._is_consumer_alive = True # annotates that ConsumerThread is alive and shall be recreated

self.callback_func = callback_func
Expand All @@ -96,15 +98,15 @@ def is_consumer_alive(self) -> bool:

@property
def is_consuming(self) -> bool:
return self._is_consuming
return self._consumer_started.is_set()

def run(self):
"""Creating consumer channel"""
if not self._is_consuming:
if not self.is_consuming:
try:
super(BlockingConsumerThread, self).run()
self._create_connection()
self._is_consuming = True
self._consumer_started.set()
self.channel.start_consuming()
except Exception as e:
self._close_connection()
Expand Down Expand Up @@ -144,11 +146,13 @@ def join(self, timeout: Optional[float] = None) -> None:

def _close_connection(self):
try:
if self.is_consuming:
self.channel.stop_consuming()
if self.connection and self.connection.is_open:
self.connection.close()
except pika.exceptions.StreamLostError:
pass
except Exception as e:
LOG.exception(f"Failed to close connection due to unexpected exception: {e}")
self._is_consuming = False
self._consumer_started.clear()
self._is_consumer_alive = False
100 changes: 100 additions & 0 deletions tests/test_consumers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# NEON AI (TM) SOFTWARE, Software Development Kit & Application Framework
# All trademark and other rights reserved by their respective owners
# Copyright 2008-2024 Neongecko.com Inc.
# Contributors: Daniel McKnight, Guy Daniels, Elon Gasper, Richard Leeds,
# Regina Bloomstine, Casimiro Ferreira, Andrii Pernatii, Kirill Hrymailo
# BSD-3 License
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from this
# software without specific prior written permission.
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
# THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
# OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
from unittest.mock import Mock

import pytest

from unittest import TestCase

from pika.connection import ConnectionParameters
from pika.credentials import PlainCredentials
from pika.exchange_type import ExchangeType

from .fixtures import rmq_instance


@pytest.mark.usefixtures("rmq_instance")
class TestBlockingConsumer(TestCase):

def test_blocking_consumer_thread(self):
from neon_mq_connector.consumers.blocking_consumer import BlockingConsumerThread
connection_params = ConnectionParameters(host='localhost',
port=self.rmq_instance.port,
virtual_host="/neon_testing",
credentials=PlainCredentials(
"test_user",
"test_password"))
queue = "test_q"
callback = Mock()
error = Mock()

# Valid thread
test_thread = BlockingConsumerThread(connection_params, queue, callback,
error)
self.assertEqual(test_thread.callback_func, callback)
self.assertEqual(test_thread.error_func, error)
self.assertIsInstance(test_thread.auto_ack, bool)
self.assertIsInstance(test_thread.exchange, str)
self.assertIsInstance(test_thread.exchange_type, ExchangeType)
self.assertIsInstance(test_thread.exchange_reset, bool)
self.assertEqual(test_thread.queue, queue)
self.assertIsInstance(test_thread.queue_reset, bool)
self.assertIsInstance(test_thread.queue_exclusive, bool)
self.assertEqual(test_thread.connection_params, connection_params)

self.assertTrue(test_thread.is_consumer_alive)
self.assertFalse(test_thread.is_consuming)

test_thread.start()
test_thread._consumer_started.wait(5)
self.assertTrue(test_thread.is_consuming)
self.assertTrue(test_thread.channel.is_open)

test_thread.join(30)
self.assertFalse(test_thread.is_consuming)
self.assertTrue(test_thread.channel.is_closed)
self.assertFalse(test_thread.is_consumer_alive)

# Invalid thread connection
connection_params.port = 80
test_thread = BlockingConsumerThread(connection_params, queue, callback,
error)
test_thread.start()
test_thread._consumer_started.wait(5)
self.assertFalse(test_thread.is_consuming)
self.assertIsNone(test_thread.channel)

test_thread.join(30)
self.assertFalse(test_thread.is_consuming)
self.assertFalse(test_thread.is_consumer_alive)


@pytest.mark.usefixtures("rmq_instance")
class TestSelectConsumer(TestCase):
from neon_mq_connector.consumers.select_consumer import SelectConsumerThread
# TODO

0 comments on commit 8ee4116

Please sign in to comment.