Skip to content

Commit

Permalink
Troubleshooting IOLoop handling
Browse files Browse the repository at this point in the history
  • Loading branch information
NeonDaniel committed Jan 16, 2025
1 parent 64dd025 commit e09e9f5
Showing 1 changed file with 15 additions and 2 deletions.
17 changes: 15 additions & 2 deletions neon_mq_connector/consumers/select_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,13 @@ def __init__(self,
threading.Thread.__init__(self, *args, **kwargs)
try:
get_event_loop()
except RuntimeError:
set_event_loop(new_event_loop())
except RuntimeError as e:
LOG.warning(e)
loop = new_event_loop()
set_event_loop(loop)
loop.run_until_complete(self._wait_until_exit)
self._consumer_started = Event() # annotates that ConsumerThread is running
self._exit_event = Event()
self._channel_closed = threading.Event()
self._is_consumer_alive = True # annotates that ConsumerThread is alive and shall be recreated
self._stopping = False
Expand All @@ -101,6 +105,9 @@ def __init__(self,
self.connection_failed_attempts = 0
self.max_connection_failed_attempts = 3

async def _wait_until_exit(self):
await self._exit_event.wait()

def create_connection(self) -> pika.SelectConnection:
return pika.SelectConnection(parameters=self.connection_params,
on_open_callback=self.on_connected,
Expand Down Expand Up @@ -207,6 +214,11 @@ def is_consuming(self) -> bool:

def run(self):
"""Starting connection io loop """
try:
get_event_loop()
except RuntimeError as e:
LOG.warning(e)
set_event_loop(new_event_loop())
if not self.is_consuming:
try:
self.connection: pika.SelectConnection = self.create_connection()
Expand Down Expand Up @@ -265,6 +277,7 @@ def join(self, timeout: Optional[float] = None) -> None:
"""Terminating consumer channel"""
if self.is_consumer_alive:
self._close_connection(mark_consumer_as_dead=True)
self._exit_event.set()
LOG.info(f"Stopped consumer. Waiting up to {timeout}s for thread to terminate.")
try:
super().join(timeout=timeout)
Expand Down

0 comments on commit e09e9f5

Please sign in to comment.