Skip to content

Commit

Permalink
Update asyncio ioloop handling
Browse files Browse the repository at this point in the history
  • Loading branch information
NeonDaniel committed Jan 16, 2025
1 parent 28e4330 commit ec83ddc
Showing 1 changed file with 5 additions and 5 deletions.
10 changes: 5 additions & 5 deletions neon_mq_connector/consumers/select_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ def __init__(self,
to learn more about different exchanges
"""
threading.Thread.__init__(self, *args, **kwargs)
try:
self._io_loop = get_event_loop()
except RuntimeError:
self._io_loop = new_event_loop()
self._consumer_started = Event() # annotates that ConsumerThread is running
self._channel_closed = threading.Event()
self._is_consumer_alive = True # annotates that ConsumerThread is alive and shall be recreated
Expand All @@ -98,15 +102,11 @@ def __init__(self,
self.max_connection_failed_attempts = 3

def create_connection(self) -> pika.SelectConnection:
try:
io_loop = get_event_loop()
except RuntimeError:
io_loop = new_event_loop()
return pika.SelectConnection(parameters=self.connection_params,
on_open_callback=self.on_connected,
on_open_error_callback=self.on_connection_fail,
on_close_callback=self.on_close,
custom_ioloop=io_loop)
custom_ioloop=self._io_loop)

def on_connected(self, _):
"""Called when we are fully connected to RabbitMQ"""
Expand Down

0 comments on commit ec83ddc

Please sign in to comment.