Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

0.8.0 #122

Merged
merged 44 commits into from
Feb 4, 2025
Merged

0.8.0 #122

Changes from 1 commit
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
816ae6d
Fixing Issues with Consumers termination
NeonKirill Mar 30, 2024
a84ab94
Increment Version to 0.7.2a1
NeonKirill Mar 30, 2024
99b2236
feat: allow ovos-utils 0.1.0 (#96)
mikejgray Apr 25, 2024
e6c61fa
Increment Version to 0.7.2a2
NeonDaniel Apr 25, 2024
4deb976
Update OVOS dependencies (#98)
NeonDaniel Sep 12, 2024
2c5cd08
Increment Version to 0.7.2a3
NeonDaniel Sep 12, 2024
2aafcf9
[FEAT] Async-based Consumers Support (#99)
NeonKirill Nov 21, 2024
e629a59
Increment Version to 0.7.2a4
NeonDaniel Nov 21, 2024
ef96424
[BUG FIX] Fixing Async Consumer Logic to work based on SelectConnecti…
NeonKirill Dec 3, 2024
a39737e
Increment Version to 0.7.2a5
NeonDaniel Dec 3, 2024
505ea14
Eliminated "self joining" problem in consumer threads and added grace…
NeonKirill Dec 5, 2024
77b6d76
Increment Version to 0.7.2a6
NeonKirill Dec 5, 2024
ab771ee
[Bug Fix] Fixing issue with reopenning connection on consumers (#103)
NeonKirill Dec 5, 2024
c634ead
Increment Version to 0.7.2a7
NeonKirill Dec 5, 2024
9915cc2
Update tests to use temporary RMQ instance and improve coverage (#104)
NeonDaniel Dec 11, 2024
f845c3f
Increment Version to 0.7.2a8
NeonDaniel Dec 11, 2024
13604c7
Improve connection close and error handling (#107)
NeonDaniel Jan 2, 2025
989429a
Increment Version to 0.7.2a9
NeonDaniel Jan 2, 2025
ced4ad7
Fix ` reconnection` with unit test coverage (#109)
NeonDaniel Jan 13, 2025
c5bd3b7
Increment Version to 0.7.2a10
NeonDaniel Jan 13, 2025
87c85ae
Define `shutdown` method for NeonMQHandler with unit test coverage (#…
NeonDaniel Jan 14, 2025
857a7d2
Increment Version to 0.7.2a11
NeonDaniel Jan 14, 2025
a777c9c
Resolve observed connection errors (#111)
NeonDaniel Jan 16, 2025
936c3d0
Increment Version to 0.7.2a12
NeonDaniel Jan 16, 2025
11d48e1
Update StreamLost handling from error to warning to prevent non-actio…
NeonDaniel Jan 17, 2025
ac28303
Increment Version to 0.7.2a13
NeonDaniel Jan 17, 2025
f083a7f
Raise exception if MQ startup check fails (#116)
NeonDaniel Jan 22, 2025
255d7f7
Increment Version to 0.7.2a14
NeonDaniel Jan 22, 2025
43667c8
Update default async consumer behavior and configuration (#113)
NeonDaniel Jan 23, 2025
a9fba67
Increment Version to 0.7.2a15
NeonDaniel Jan 23, 2025
cfc073f
Support Select Connections in `emit_mq_message` (#115)
NeonDaniel Jan 23, 2025
d0f56a5
Increment Version to 0.7.2a16
NeonDaniel Jan 23, 2025
9037174
Update copyright notices to 2025 (#118)
NeonDaniel Jan 23, 2025
2b52c34
Increment Version to 0.7.2a17
NeonDaniel Jan 23, 2025
0aa88f9
Improved Startup Connectivity Checks (#117)
NeonDaniel Jan 23, 2025
2f95c41
Increment Version to 0.7.2a18
NeonDaniel Jan 23, 2025
dd071d5
Add specific handling for RMQ checks without a configured vhost (#119)
NeonDaniel Jan 24, 2025
783abe9
Increment Version to 0.7.2a19
NeonDaniel Jan 24, 2025
3595966
Refactor `_started` to `_consumers_started` to avoid conflicts in cla…
NeonDaniel Jan 24, 2025
1922f5a
Increment Version to 0.7.2a20
NeonDaniel Jan 24, 2025
989e0fa
Update to ensure `message_id` is not None (#121)
NeonDaniel Jan 27, 2025
f1a019e
Increment Version to 0.7.2a21
NeonDaniel Jan 27, 2025
adf3b43
Increment Version to 0.8.0
NeonDaniel Jan 28, 2025
83bc863
Update Changelog
NeonDaniel Jan 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add specific handling for RMQ checks without a configured vhost (#119)
Add test coverage for `check_rmq_is_available`
Update logging
NeonDaniel authored Jan 24, 2025

Verified

This commit was signed with the committer’s verified signature.
weblate Weblate (bot)
commit dd071d51bfb6ce156a2b734940e1147b63207db1
3 changes: 3 additions & 0 deletions neon_mq_connector/connector.py
Original file line number Diff line number Diff line change
@@ -576,6 +576,7 @@ def run_consumers(self, names: Optional[tuple] = None, daemon=True):
self.consumers[name].daemon = daemon
self.consumers[name].start()
self.consumer_properties[name]['started'] = True
LOG.debug(f"Started consumers for {self.service_name}")

def stop_consumers(self, names: Optional[tuple] = None):
"""
@@ -596,6 +597,7 @@ def stop_consumers(self, names: Optional[tuple] = None):
self.consumer_properties[name]['started'] = False
except Exception as e:
raise ChildProcessError(e)
LOG.debug(f"Stopped consumers for {self.service_name}")

@retry(callback_on_exceeded='stop_sync_thread', use_self=True,
num_retries=__run_retries__)
@@ -705,6 +707,7 @@ def stop(self):
self.stop_sync_thread()
self.stop_observer_thread()
self._started = False
LOG.info(f"Stopped Connector {self.service_name}")

def pre_run(self, **kwargs):
"""Additional logic invoked before method run()"""
17 changes: 10 additions & 7 deletions neon_mq_connector/utils/connection_utils.py
Original file line number Diff line number Diff line change
@@ -27,14 +27,13 @@
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import logging
import time
from asyncio import IncompleteReadError

from threading import Event
from typing import Union, Callable, Optional
from ovos_utils.log import LOG
from pika.adapters.blocking_connection import BlockingConnection
from pika.adapters.utils.connection_workflow import AMQPConnectionWorkflowFailed, AMQPConnectorException
from pika.connection import ConnectionParameters
from pika.exceptions import AMQPConnectionError, IncompatibleProtocolError
from pika.exceptions import IncompatibleProtocolError, ProbableAccessDeniedError

from neon_mq_connector.utils.network_utils import check_port_is_open

@@ -190,11 +189,15 @@ def check_rmq_is_available(
connection = BlockingConnection(connection_params)
connection.close()
success = True
except AMQPConnectionError as e:
if isinstance(e, IncompatibleProtocolError):
LOG.debug(f"RMQ is likely still starting up (e={e})")
except IncompatibleProtocolError as e:
LOG.debug(f"RMQ is likely still starting up (e={e})")
except ProbableAccessDeniedError as e:
if connection_params.virtual_host == '/':
LOG.warning(f"Access was denied to default vhost='/'. Assuming RMQ "
f"broker is online.")
success = True
else:
raise e
finally:
pika_log.setLevel(pika_level)
return success
return success
53 changes: 51 additions & 2 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -35,7 +35,9 @@
import pytest
import pika

from threading import Thread, Event
from threading import Thread

from pika.exceptions import ProbableAuthenticationError

sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))

@@ -208,12 +210,23 @@ def test_connector_shutdown(self):
self.assertTrue(connector.connection.is_closed)


@pytest.mark.usefixtures("rmq_instance")
class TestMQConnectionUtils(unittest.TestCase):
test_conf = None
counter = 0

def setUp(self) -> None:
self.counter = 0

if self.test_conf is None:
self.test_conf = {
"server": "localhost",
"port": self.rmq_instance.port,
"users": {"mq_handler": {"user": "test_user",
"password": "test_password"}}}
import neon_mq_connector.utils.client_utils
neon_mq_connector.utils.client_utils._default_mq_config = self.test_conf

def repeating_method(self):
"""Simple method incrementing counter by one"""
self.counter += 1
@@ -260,7 +273,43 @@ def test_wait_for_mq_startup(self):

def test_check_rmq_is_available(self):
from neon_mq_connector.utils.connection_utils import check_rmq_is_available
# TODO
from pika.exceptions import ProbableAccessDeniedError
from pika.credentials import PlainCredentials
from pika.connection import ConnectionParameters

valid_vhost = "/neon_testing"
invalid_vhost = "/mock_vhost"
base_connection_kwargs = {"host": self.test_conf['server'],
"port": self.test_conf['port']}

valid_creds = PlainCredentials("test_user",
"test_password")
invalid_creds = PlainCredentials("test_user",
"invalid_password")

valid_connection = ConnectionParameters(**base_connection_kwargs,
virtual_host=valid_vhost,
credentials=valid_creds)
self.assertTrue(check_rmq_is_available(valid_connection))

invalid_bad_vhost = ConnectionParameters(**base_connection_kwargs,
virtual_host=invalid_vhost,
credentials=valid_creds)
with self.assertRaises(ProbableAccessDeniedError):
self.assertFalse(check_rmq_is_available(invalid_bad_vhost))

invalid_bad_creds = ConnectionParameters(**base_connection_kwargs,
virtual_host=valid_vhost,
credentials=invalid_creds)
with self.assertRaises(ProbableAuthenticationError):
self.assertFalse(check_rmq_is_available(invalid_bad_creds))

# If the calling service doesn't specify a `vhost`, allow it to start
# anyway (i.e. klat-observer)
invalid_default_vhost = ConnectionParameters(**base_connection_kwargs,
virtual_host='/',
credentials=valid_creds)
self.assertTrue(check_rmq_is_available(invalid_default_vhost))


class TestConsumerUtils(unittest.TestCase):