diff --git a/neon_mq_connector/connector.py b/neon_mq_connector/connector.py index 72fc5d7..72fea18 100644 --- a/neon_mq_connector/connector.py +++ b/neon_mq_connector/connector.py @@ -266,7 +266,8 @@ def create_unique_id(): @classmethod def emit_mq_message(cls, - connection: pika.BlockingConnection, + connection: Union[pika.BlockingConnection, + pika.SelectConnection], request_data: dict, exchange: Optional[str] = '', queue: Optional[str] = '', @@ -389,13 +390,14 @@ def send_message(self, return msg_id @retry(use_self=True, num_retries=__run_retries__) - def create_mq_connection(self, vhost: str = '/', **kwargs): + def create_mq_connection(self, vhost: str = '/', + **kwargs) -> pika.BlockingConnection: """ - Creates MQ Connection on the specified virtual host - Note: Additional parameters can be defined via kwargs. + Creates a Blocking MQ Connection on the specified virtual host + Note: Additional parameters can be defined via kwargs. - :param vhost: address for desired virtual host - :raises Exception if self.config is not set + :param vhost: address for desired virtual host + :raises Exception if self.config is not set """ if not self.config: raise Exception('Configuration is not set') diff --git a/neon_mq_connector/utils/client_utils.py b/neon_mq_connector/utils/client_utils.py index 2c8c175..1886d58 100644 --- a/neon_mq_connector/utils/client_utils.py +++ b/neon_mq_connector/utils/client_utils.py @@ -53,9 +53,7 @@ class NeonMQHandler(MQConnector): def __init__(self, config: dict, service_name: str, vhost: str): super().__init__(config, service_name) self.vhost = vhost - import pika - self.connection = pika.BlockingConnection( - parameters=self.get_connection_params(vhost)) + self.connection = self.create_mq_connection(self.vhost) def shutdown(self): MQConnector.stop(self)