diff --git a/neon_mq_connector/connector.py b/neon_mq_connector/connector.py index d949b98..72fea18 100644 --- a/neon_mq_connector/connector.py +++ b/neon_mq_connector/connector.py @@ -266,7 +266,8 @@ def create_unique_id(): @classmethod def emit_mq_message(cls, - connection: pika.BlockingConnection, + connection: Union[pika.BlockingConnection, + pika.SelectConnection], request_data: dict, exchange: Optional[str] = '', queue: Optional[str] = '', @@ -389,22 +390,19 @@ def send_message(self, return msg_id @retry(use_self=True, num_retries=__run_retries__) - def create_mq_connection(self, vhost: str = '/', **kwargs): + def create_mq_connection(self, vhost: str = '/', + **kwargs) -> pika.BlockingConnection: """ - Creates MQ Connection on the specified virtual host - Note: Additional parameters can be defined via kwargs. + Creates a Blocking MQ Connection on the specified virtual host + Note: Additional parameters can be defined via kwargs. - :param vhost: address for desired virtual host - :raises Exception if self.config is not set + :param vhost: address for desired virtual host + :raises Exception if self.config is not set """ if not self.config: raise Exception('Configuration is not set') - if self.async_consumers_enabled: - return pika.SelectConnection( - parameters=self.get_connection_params(vhost, **kwargs)) - else: - return pika.BlockingConnection( - parameters=self.get_connection_params(vhost, **kwargs)) + return pika.BlockingConnection( + parameters=self.get_connection_params(vhost, **kwargs)) def register_consumer(self, name: str, vhost: str, queue: str, callback: callable,