From cd6386abf96ee554bf031ace9e5376f3ec0264ca Mon Sep 17 00:00:00 2001 From: Daniel McKnight Date: Mon, 20 Jan 2025 12:35:21 -0800 Subject: [PATCH] Update `create_mq_connection` to respect Async consumer setting Update NeonMQHandler to us async consumer if configured --- neon_mq_connector/connector.py | 8 ++++++-- neon_mq_connector/utils/client_utils.py | 4 +--- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/neon_mq_connector/connector.py b/neon_mq_connector/connector.py index 72fc5d7..d949b98 100644 --- a/neon_mq_connector/connector.py +++ b/neon_mq_connector/connector.py @@ -399,8 +399,12 @@ def create_mq_connection(self, vhost: str = '/', **kwargs): """ if not self.config: raise Exception('Configuration is not set') - return pika.BlockingConnection( - parameters=self.get_connection_params(vhost, **kwargs)) + if self.async_consumers_enabled: + return pika.SelectConnection( + parameters=self.get_connection_params(vhost, **kwargs)) + else: + return pika.BlockingConnection( + parameters=self.get_connection_params(vhost, **kwargs)) def register_consumer(self, name: str, vhost: str, queue: str, callback: callable, diff --git a/neon_mq_connector/utils/client_utils.py b/neon_mq_connector/utils/client_utils.py index 2c8c175..1886d58 100644 --- a/neon_mq_connector/utils/client_utils.py +++ b/neon_mq_connector/utils/client_utils.py @@ -53,9 +53,7 @@ class NeonMQHandler(MQConnector): def __init__(self, config: dict, service_name: str, vhost: str): super().__init__(config, service_name) self.vhost = vhost - import pika - self.connection = pika.BlockingConnection( - parameters=self.get_connection_params(vhost)) + self.connection = self.create_mq_connection(self.vhost) def shutdown(self): MQConnector.stop(self)