Skip to content

Commit

Permalink
Troubleshoot IOLoop init in SelectConsumerThread
Browse files Browse the repository at this point in the history
  • Loading branch information
NeonDaniel committed Jan 16, 2025
1 parent e09e9f5 commit 2ef70bb
Showing 1 changed file with 5 additions and 5 deletions.
10 changes: 5 additions & 5 deletions neon_mq_connector/consumers/select_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ def __init__(self,
LOG.warning(e)
loop = new_event_loop()
set_event_loop(loop)
loop.run_until_complete(self._wait_until_exit)
loop.run_forever()
self._consumer_started = Event() # annotates that ConsumerThread is running
self._exit_event = Event()
# self._exit_event = Event()
self._channel_closed = threading.Event()
self._is_consumer_alive = True # annotates that ConsumerThread is alive and shall be recreated
self._stopping = False
Expand All @@ -105,8 +105,8 @@ def __init__(self,
self.connection_failed_attempts = 0
self.max_connection_failed_attempts = 3

async def _wait_until_exit(self):
await self._exit_event.wait()
# async def _wait_until_exit(self):
# await self._exit_event.wait()

def create_connection(self) -> pika.SelectConnection:
return pika.SelectConnection(parameters=self.connection_params,
Expand Down Expand Up @@ -277,7 +277,7 @@ def join(self, timeout: Optional[float] = None) -> None:
"""Terminating consumer channel"""
if self.is_consumer_alive:
self._close_connection(mark_consumer_as_dead=True)
self._exit_event.set()
# self._exit_event.set()
LOG.info(f"Stopped consumer. Waiting up to {timeout}s for thread to terminate.")
try:
super().join(timeout=timeout)
Expand Down

0 comments on commit 2ef70bb

Please sign in to comment.