Skip to content

Commit

Permalink
Update create_mq_connection to respect Async consumer setting
Browse files Browse the repository at this point in the history
Update NeonMQHandler to us async consumer if configured
  • Loading branch information
NeonDaniel committed Jan 20, 2025
1 parent ac28303 commit cd6386a
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 5 deletions.
8 changes: 6 additions & 2 deletions neon_mq_connector/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,8 +399,12 @@ def create_mq_connection(self, vhost: str = '/', **kwargs):
"""
if not self.config:
raise Exception('Configuration is not set')
return pika.BlockingConnection(
parameters=self.get_connection_params(vhost, **kwargs))
if self.async_consumers_enabled:
return pika.SelectConnection(
parameters=self.get_connection_params(vhost, **kwargs))
else:
return pika.BlockingConnection(
parameters=self.get_connection_params(vhost, **kwargs))

def register_consumer(self, name: str, vhost: str, queue: str,
callback: callable,
Expand Down
4 changes: 1 addition & 3 deletions neon_mq_connector/utils/client_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,7 @@ class NeonMQHandler(MQConnector):
def __init__(self, config: dict, service_name: str, vhost: str):
super().__init__(config, service_name)
self.vhost = vhost
import pika
self.connection = pika.BlockingConnection(
parameters=self.get_connection_params(vhost))
self.connection = self.create_mq_connection(self.vhost)

def shutdown(self):
MQConnector.stop(self)
Expand Down

0 comments on commit cd6386a

Please sign in to comment.