Skip to content

Commit

Permalink
Update BlockingConsumerThread to ensure connection is closed exactl…
Browse files Browse the repository at this point in the history
…y once

Update tests to check for expected error callbacks
  • Loading branch information
NeonDaniel committed Jan 2, 2025
1 parent be8aa96 commit 89c0e26
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 8 deletions.
18 changes: 10 additions & 8 deletions neon_mq_connector/consumers/blocking_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,16 @@ def run(self):
self._create_connection()
self._consumer_started.set()
self.channel.start_consuming()
except Exception as e:
self._close_connection()
if isinstance(e, pika.exceptions.ChannelClosed):
LOG.info(f"Channel closed by broker: {self.callback_func}")
elif isinstance(e, pika.exceptions.StreamLostError):
LOG.info("Connection closed by broker")
else:
except (pika.exceptions.ChannelClosed,
pika.exceptions.ConnectionClosed) as e:
LOG.info(f"Closed {e.reply_code}: {e.reply_text}")
if self._is_consumer_alive:
self._close_connection()
self.error_func(self, e)
except Exception as e:
if self._is_consumer_alive:
self._close_connection()
self.error_func(self, e)

def _create_connection(self):
self.connection = pika.BlockingConnection(self.connection_params)
Expand Down Expand Up @@ -145,6 +147,7 @@ def join(self, timeout: Optional[float] = None) -> None:
super(BlockingConsumerThread, self).join(timeout=timeout)

def _close_connection(self):
self._is_consumer_alive = False
try:
if self.connection and self.connection.is_open:
self.connection.close()
Expand All @@ -153,4 +156,3 @@ def _close_connection(self):
except Exception as e:
LOG.exception(f"Failed to close connection due to unexpected exception: {e}")
self._consumer_started.clear()
self._is_consumer_alive = False
3 changes: 3 additions & 0 deletions tests/test_consumers.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ def test_blocking_consumer_thread(self):
self.assertFalse(test_thread.is_consuming)
self.assertTrue(test_thread.channel.is_closed)
self.assertFalse(test_thread.is_consumer_alive)
test_thread.error_func.assert_not_called()

# Invalid thread connection
connection_params.port = 80
Expand All @@ -90,6 +91,7 @@ def test_blocking_consumer_thread(self):
test_thread._consumer_started.wait(5)
self.assertFalse(test_thread.is_consuming)
self.assertIsNone(test_thread.channel)
test_thread.error_func.assert_called_once()

test_thread.join(30)
self.assertFalse(test_thread.is_consuming)
Expand Down Expand Up @@ -147,6 +149,7 @@ def test_select_consumer_thread(self):
self.assertFalse(test_thread.is_consumer_alive)
self.assertTrue(test_thread.channel.is_closed)
test_thread.on_close.assert_called_once()
error.assert_not_called()

# Invalid thread connection
connection_params.port = 80
Expand Down

0 comments on commit 89c0e26

Please sign in to comment.