Skip to content

Commit

Permalink
Update SelectConsumerThread to pass exceptions to self.error_func
Browse files Browse the repository at this point in the history
… to match `BlockingConsumerThread`

Handle channel/connection closed exceptions explicitly in `SelectConsumerThread`
  • Loading branch information
NeonDaniel committed Jan 2, 2025
1 parent f845c3f commit be8aa96
Showing 1 changed file with 8 additions and 0 deletions.
8 changes: 8 additions & 0 deletions neon_mq_connector/consumers/select_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,17 @@ def run(self):
super(SelectConsumerThread, self).run()
self.connection: pika.SelectConnection = self.create_connection()
self.connection.ioloop.start()
except (pika.exceptions.ChannelClosed,
pika.exceptions.ConnectionClosed) as e:
LOG.info(f"Closed {e.reply_code}: {e.reply_text}")
if not self._stopping:
# Connection was unexpectedly closed
self._close_connection()
self.error_func(self, e)
except Exception as e:
LOG.error(f"Failed to start io loop on consumer thread {self.name!r}: {e}")
self._close_connection()
self.error_func(self, e)

def _close_connection(self, mark_consumer_as_dead: bool = True):
try:
Expand Down

0 comments on commit be8aa96

Please sign in to comment.