Skip to content

Commit

Permalink
Update tests to use temporary RMQ instance and improve coverage (#104)
Browse files Browse the repository at this point in the history
* Refactor tests to use a local RMQ instance instead of configured remote
Allow non-backwards-compat tests to run in parallel

* Fix indent error in `unit_tests`

* Fix indent error in `unit_tests`

* Update async test to support changes

* Remove duplicated backwards-compat test automation

* Refactor test classes to match module order for better readability
Add test coverage for `utils` submodule
Update `create_mq_callback` to support decorating both functions and methods (including static methods)

* Update docstring for `create_mq_callback`
Fix bug introduced in test refactoring

* Add test coverage for `BlockingConsumerThread`

* Refactor test names and simplify GHA automation

* Add basic unittest coverage for SelectConsumerThread lifecycle
Update SelectConsumerThread to better define startup/shutdown processes
Add type annotations to SelectConsumerThread
Add logging to SelectConsumerThread

* Address all review comments
Remove redundant `channel.stop_consuming` call
Revert change to `super()` call
  • Loading branch information
NeonDaniel authored Dec 11, 2024
1 parent c634ead commit 9915cc2
Show file tree
Hide file tree
Showing 12 changed files with 631 additions and 158 deletions.
56 changes: 35 additions & 21 deletions .github/workflows/unit_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,44 +4,58 @@ on:
workflow_dispatch:

jobs:
unit_tests:
backwards_compat_tests:
strategy:
matrix:
python-version: [ '3.8', '3.9', '3.10', '3.11', '3.12' ]
python-version: [ '3.8', '3.10']
max-parallel: 1
timeout-minutes: 30
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v4
- name: Set up python ${{ matrix.python-version }}
uses: actions/setup-python@v2
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install . -r requirements/test_requirements.txt
- name: Run Utils Tests
run: |
pytest tests/test_utils.py --doctest-modules --junitxml=tests/utils-test-results.xml
- name: Upload Utils test results
uses: actions/upload-artifact@v4
with:
name: utils-test-results-${{ matrix.python-version }}
path: tests/utils-test-results.xml
- name: Run Connector Tests
run: |
pytest tests/test_connector.py --doctest-modules --junitxml=tests/connector-test-results.xml
- name: Upload Connector test results
uses: actions/upload-artifact@v4
with:
name: connector-test-results-${{ matrix.python-version }}
path: tests/connector-test-results.xml
- name: Run Backward Compatibility Tests
run: |
pytest tests/test_backward_compatibility.py --doctest-modules --junitxml=tests/backward-compatibility-test-results.xml
pytest tests/backward_compat_tests.py --doctest-modules --junitxml=tests/backward-compatibility-test-results.xml
- name: Upload Backward Compatibility test results
uses: actions/upload-artifact@v4
with:
name: backward-compatibility-test-results-${{ matrix.python-version }}
path: tests/backward-compatibility-test-results.xml
unit_tests:
strategy:
matrix:
python-version: [ '3.8', '3.9', '3.10', '3.11', '3.12' ]
timeout-minutes: 30
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install apt dependencies
run: |
sudo apt update
sudo apt install -y rabbitmq-server
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install . -r requirements/test_requirements.txt
- name: Run Unit Tests
run: |
pytest tests --doctest-modules --junitxml=tests/test-results.xml
- name: Upload test results
uses: actions/upload-artifact@v4
with:
name: test-results-${{ matrix.python-version }}
path: tests/test-results.xml
14 changes: 8 additions & 6 deletions neon_mq_connector/consumers/blocking_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ class BlockingConsumerThread(threading.Thread):

# retry to handle connection failures in case MQ server is still starting
def __init__(self, connection_params: pika.ConnectionParameters,
queue: str, callback_func: callable,
queue: str,
callback_func: callable,
error_func: callable = consumer_utils.default_error_handler,
auto_ack: bool = True,
queue_reset: bool = False,
Expand All @@ -71,7 +72,8 @@ def __init__(self, connection_params: pika.ConnectionParameters,
to learn more about different exchanges
"""
threading.Thread.__init__(self, *args, **kwargs)
self._is_consuming = False # annotates that ConsumerThread is running
self._consumer_started = threading.Event() # annotates that ConsumerThread is running
self._consumer_started.clear()
self._is_consumer_alive = True # annotates that ConsumerThread is alive and shall be recreated

self.callback_func = callback_func
Expand All @@ -96,15 +98,15 @@ def is_consumer_alive(self) -> bool:

@property
def is_consuming(self) -> bool:
return self._is_consuming
return self._consumer_started.is_set()

def run(self):
"""Creating consumer channel"""
if not self._is_consuming:
if not self.is_consuming:
try:
super(BlockingConsumerThread, self).run()
self._create_connection()
self._is_consuming = True
self._consumer_started.set()
self.channel.start_consuming()
except Exception as e:
self._close_connection()
Expand Down Expand Up @@ -150,5 +152,5 @@ def _close_connection(self):
pass
except Exception as e:
LOG.exception(f"Failed to close connection due to unexpected exception: {e}")
self._is_consuming = False
self._consumer_started.clear()
self._is_consumer_alive = False
74 changes: 52 additions & 22 deletions neon_mq_connector/consumers/select_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,14 @@
import threading
import time

from asyncio import Event, run
from typing import Optional

import pika.exceptions
from ovos_utils import LOG
from pika.channel import Channel
from pika.exchange_type import ExchangeType
from pika.frame import Method

from neon_mq_connector.utils import consumer_utils

Expand All @@ -46,7 +49,8 @@ class SelectConsumerThread(threading.Thread):

def __init__(self,
connection_params: pika.ConnectionParameters,
queue: str, callback_func: callable,
queue: str,
callback_func: callable,
error_func: callable = consumer_utils.default_error_handler,
auto_ack: bool = True,
queue_reset: bool = False,
Expand Down Expand Up @@ -74,8 +78,10 @@ def __init__(self,
to learn more about different exchanges
"""
threading.Thread.__init__(self, *args, **kwargs)
self._is_consuming = False # annotates that ConsumerThread is running
self._consumer_started = Event() # annotates that ConsumerThread is running
self._channel_closed = threading.Event()
self._is_consumer_alive = True # annotates that ConsumerThread is alive and shall be recreated
self._stopping = False
self.callback_func = callback_func
self.error_func = error_func
self.exchange = exchange or ''
Expand All @@ -89,11 +95,11 @@ def __init__(self,
self.queue_reset = queue_reset
self.exchange_reset = exchange_reset

self.connection = None
self.connection: Optional[pika.SelectConnection] = None
self.connection_failed_attempts = 0
self.max_connection_failed_attempts = 3

def create_connection(self):
def create_connection(self) -> pika.SelectConnection:
return pika.SelectConnection(parameters=self.connection_params,
on_open_callback=self.on_connected,
on_open_error_callback=self.on_connection_fail,
Expand All @@ -103,32 +109,39 @@ def on_connected(self, _):
"""Called when we are fully connected to RabbitMQ"""
self.connection.channel(on_open_callback=self.on_channel_open)

def on_connection_fail(self, _):
def on_connection_fail(self, *_, **__):
""" Called when connection to RabbitMQ fails"""
self.connection_failed_attempts += 1
if self.connection_failed_attempts > self.max_connection_failed_attempts:
LOG.error(f'Failed establish MQ connection after {self.max_connection_failed_attempts} attempts')
self._close_connection()
LOG.error(f'Failed establish MQ connection after {self.connection_failed_attempts} attempts')
self.error_func("Connection not established")
self._close_connection(mark_consumer_as_dead=True)
else:
self.reconnect()

def on_channel_open(self, new_channel):
def on_channel_open(self, new_channel: Channel):
"""Called when our channel has opened"""
new_channel.add_on_close_callback(self.on_channel_close)
self.channel = new_channel
if self.queue_reset:
self.channel.queue_delete(queue=self.queue,
if_unused=True,
callback=self.declare_queue)
else:
self.declare_queue()
self._consumer_started.set()

def on_channel_close(self, *_, **__):
LOG.info(f"Channel closed.")
self._channel_closed.set()

def declare_queue(self, _unused_frame = None):
def declare_queue(self, _unused_frame: Optional[Method] = None):
return self.channel.queue_declare(queue=self.queue,
exclusive=self.queue_exclusive,
auto_delete=False,
callback=self.on_queue_declared)

def on_queue_declared(self, _unused_frame = None):
def on_queue_declared(self, _unused_frame: Optional[Method] = None):
"""Called when RabbitMQ has told us our Queue has been declared, frame is the response from RabbitMQ"""
if self.exchange:
self.setup_exchange()
Expand All @@ -141,13 +154,13 @@ def setup_exchange(self):
else:
self.declare_exchange()

def declare_exchange(self, _unused_frame = None):
def declare_exchange(self, _unused_frame: Optional[Method] = None):
self.channel.exchange_declare(exchange=self.exchange,
exchange_type=self.exchange_type,
auto_delete=False,
callback=self.bind_exchange_to_queue)

def bind_exchange_to_queue(self, _unused_frame = None):
def bind_exchange_to_queue(self, _unused_frame: Optional[Method] = None):
try:
self.channel.queue_bind(
queue=self.queue,
Expand All @@ -157,10 +170,10 @@ def bind_exchange_to_queue(self, _unused_frame = None):
except Exception as e:
LOG.error(f"Error binding queue '{self.queue}' to exchange '{self.exchange}': {e}")

def set_qos(self, _unused_frame = None):
def set_qos(self, _unused_frame: Optional[Method] = None):
self.channel.basic_qos(prefetch_count=50, callback=self.start_consuming)

def start_consuming(self, _unused_frame = None):
def start_consuming(self, _unused_frame: Optional[Method] = None):
self.channel.basic_consume(queue=self.queue,
on_message_callback=self.on_message,
auto_ack=self.auto_ack)
Expand All @@ -172,39 +185,52 @@ def on_message(self, channel, method, properties, body):
self.error_func(self, e)

def on_close(self, _, e):
LOG.error(f"Closing MQ connection due to exception: {e}")
self.reconnect()
if isinstance(e, pika.exceptions.ConnectionClosed):
LOG.info(f"Connection closed normally: {e}")
if not self._stopping:
LOG.error(f"Closing MQ connection due to exception: {e}")
self.reconnect()

@property
def is_consumer_alive(self) -> bool:
return self._is_consumer_alive

@property
def is_consuming(self) -> bool:
return self._is_consuming
return self._consumer_started.is_set()

def run(self):
"""Starting connnection io loop """
if not self.is_consuming:
try:
super(SelectConsumerThread, self).run()
self.connection = self.create_connection()
self._is_consuming = True
self.connection: pika.SelectConnection = self.create_connection()
self.connection.ioloop.start()
except Exception as e:
LOG.error(f"Failed to start io loop on consumer thread {self.name!r}: {e}")
self._close_connection()

def _close_connection(self, mark_consumer_as_dead: bool = True):
try:
self._stopping = True
if self.connection and not (self.connection.is_closed or self.connection.is_closing):
self.connection.ioloop.stop()
self.connection.close()
LOG.info(f"Waiting for channel close")
if not self._channel_closed.wait(15):
raise TimeoutError(f"Timeout waiting for channel close. "
f"is_closed={self.channel.is_closed}")
LOG.info(f"Channel closed")
if self.connection:
self.connection.ioloop.stop()
self.connection = None
except Exception as e:
LOG.error(f"Failed to close connection for Consumer {self.name!r}: {e}")
self._is_consuming = False
self._consumer_started.clear()
if mark_consumer_as_dead:
self._is_consumer_alive = False
else:
self._stopping = False

def reconnect(self, wait_interval: int = 1):
self._close_connection(mark_consumer_as_dead=False)
Expand All @@ -213,6 +239,10 @@ def reconnect(self, wait_interval: int = 1):

def join(self, timeout: Optional[float] = None) -> None:
"""Terminating consumer channel"""
if self.is_consumer_alive and self.is_consuming:
if self.is_consumer_alive:
self._close_connection(mark_consumer_as_dead=True)
super().join(timeout=timeout)
LOG.info(f"Stopped consumer. Waiting up to {timeout}s for thread to terminate.")
try:
super().join(timeout=timeout)
except Exception as e:
LOG.exception(e)
1 change: 1 addition & 0 deletions neon_mq_connector/utils/consumer_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

from ovos_utils.log import LOG


def default_error_handler(*args):
"""
Default handler for Consumer instances
Expand Down
Loading

0 comments on commit 9915cc2

Please sign in to comment.