Skip to content

Commit

Permalink
Update emit_mq_message to support SelectConnections
Browse files Browse the repository at this point in the history
Related to neon-iris improvements
  • Loading branch information
NeonDaniel committed Jan 21, 2025
1 parent adf0e78 commit f2d3e79
Showing 1 changed file with 22 additions and 17 deletions.
39 changes: 22 additions & 17 deletions neon_mq_connector/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,8 @@ def create_unique_id():

@classmethod
def emit_mq_message(cls,
connection: pika.BlockingConnection,
connection: Union[pika.BlockingConnection,
pika.SelectConnection],
request_data: dict,
exchange: Optional[str] = '',
queue: Optional[str] = '',
Expand Down Expand Up @@ -297,22 +298,26 @@ def emit_mq_message(cls,
.get("mq", {}).get("message_id") or
cls.create_unique_id())

with connection.channel() as channel:
if exchange:
channel.exchange_declare(exchange=exchange,
exchange_type=exchange_type,
auto_delete=False)
if queue:
declared_queue = channel.queue_declare(queue=queue,
auto_delete=False)
if exchange_type == ExchangeType.fanout.value:
channel.queue_bind(queue=declared_queue.method.queue,
exchange=exchange)
channel.basic_publish(exchange=exchange or '',
routing_key=queue,
body=dict_to_b64(request_data),
properties=pika.BasicProperties(
expiration=str(expiration)))
channel = connection.channel()

if exchange:
channel.exchange_declare(exchange=exchange,
exchange_type=exchange_type,
auto_delete=False)
if queue:
declared_queue = channel.queue_declare(queue=queue,
auto_delete=False)
if exchange_type == ExchangeType.fanout.value:
channel.queue_bind(queue=declared_queue.method.queue,
exchange=exchange)
channel.basic_publish(exchange=exchange or '',
routing_key=queue,
body=dict_to_b64(request_data),
properties=pika.BasicProperties(
expiration=str(expiration)))

channel.close()

LOG.debug(f"sent message: {request_data['message_id']}")
return request_data['message_id']

Expand Down

0 comments on commit f2d3e79

Please sign in to comment.