Skip to content

Commit

Permalink
Fixing issues with consumer termination
Browse files Browse the repository at this point in the history
Added "is_consumer_alive" flag to ensure graceful termination of the ConsumerThread
  • Loading branch information
NeonKirill authored Mar 30, 2024
1 parent 78fb74f commit 816ae6d
Showing 1 changed file with 52 additions and 41 deletions.
93 changes: 52 additions & 41 deletions neon_mq_connector/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ def _default_error_handler(*args):
class ConsumerThread(threading.Thread):

# retry to handle connection failures in case MQ server is still starting
@retry(use_self=True)
def __init__(self, connection_params: pika.ConnectionParameters,
queue: str, callback_func: callable,
error_func: callable = _default_error_handler,
Expand Down Expand Up @@ -81,7 +80,8 @@ def __init__(self, connection_params: pika.ConnectionParameters,
to learn more about different exchanges
"""
threading.Thread.__init__(self, *args, **kwargs)
self.is_consuming = False
self._is_consuming = False # annotates that ConsumerThread is running
self._is_consumer_alive = True # annotates that ConsumerThread is alive and shall be recreated
self.connection = pika.BlockingConnection(connection_params)
self.callback_func = callback_func
self.error_func = error_func
Expand All @@ -107,31 +107,46 @@ def __init__(self, connection_params: pika.ConnectionParameters,
queue=self.queue,
auto_ack=auto_ack)

# Read-only accessor for the `_is_consumer_alive` flag.
# The flag is initialised to True in `__init__` and is set to False by
# `join(..., allow_restart=False)`; `run_consumers` checks this property
# before starting a consumer thread.
@property
def is_consumer_alive(self) -> bool:
return self._is_consumer_alive

# Read-only accessor for the `_is_consuming` flag.
# `run` sets the flag to True right before `channel.start_consuming()`;
# it is reset to False when consuming raises and in the cleanup `finally`
# of `join`.
@property
def is_consuming(self) -> bool:
return self._is_consuming

def run(self):
"""Creating consumer channel"""
super(ConsumerThread, self).run()
try:
self.is_consuming = True
self.channel.start_consuming()
except pika.exceptions.ChannelClosed:
LOG.debug(f"Channel closed by broker: {self.callback_func}")
except Exception as e:
LOG.error(e)
self.error_func(self, e)

def join(self, timeout: Optional[float] = ...) -> None:
if not self._is_consuming:
try:
super(ConsumerThread, self).run()
self._is_consuming = True
self.channel.start_consuming()
except Exception as e:
self._is_consuming = False
if isinstance(e, pika.exceptions.ChannelClosed):
LOG.error(f"Channel closed by broker: {self.callback_func}")
else:
LOG.error(e)
self.error_func(self, e)
self.join(allow_restart=True)

def join(self, timeout: Optional[float] = ..., allow_restart: bool = True) -> None:
"""Terminating consumer channel"""
try:
self.channel.stop_consuming()
self.is_consuming = False
if self.channel.is_open:
self.channel.close()
if self.connection.is_open:
self.connection.close()
except Exception as x:
LOG.error(x)
finally:
super(ConsumerThread, self).join()
if self._is_consumer_alive:
try:
self.channel.stop_consuming()
if self.channel.is_open:
self.channel.close()
if self.connection.is_open:
self.connection.close()
except Exception as x:
LOG.error(x)
finally:
self._is_consuming = False
if not allow_restart:
self._is_consumer_alive = False
super(ConsumerThread, self).join(timeout=timeout)


class MQConnector(ABC):
Expand Down Expand Up @@ -179,7 +194,7 @@ def __init__(self, config: Optional[dict], service_name: str):
self.property_key = 'properties'
self._service_id = None
self.service_name = service_name
self.consumers = dict()
self.consumers: Dict[str, ConsumerThread] = dict()
self.consumer_properties = dict()
self._vhost = None
self._sync_thread = None
Expand Down Expand Up @@ -515,7 +530,7 @@ def register_consumer(self, name: str, vhost: str, queue: str,
if skip_on_existing:
LOG.info(f'Consumer under index "{name}" already declared')
return
self.stop_consumers(names=(name,))
self.stop_consumers(names=(name,), allow_restart=False)
self.consumer_properties.setdefault(name, {})
self.consumer_properties[name]['properties'] = \
dict(connection_params=self.get_connection_params(vhost),
Expand All @@ -530,11 +545,14 @@ def register_consumer(self, name: str, vhost: str, queue: str,
ConsumerThread(**self.consumer_properties[name]['properties'])

def restart_consumer(self, name: str):
self.stop_consumers(names=(name,))
self.stop_consumers(names=(name,), allow_restart=True)
consumer_data = self.consumer_properties.get(name, {})
restart_attempts = consumer_data.get('restart_attempts',
self.__max_consumer_restarts__)
if not consumer_data.get('properties'):
err_msg = ''
if not consumer_data.get('is_alive', True):
LOG.debug(f'Skipping joined consumer = "{name}"')
elif not consumer_data.get('properties'):
err_msg = 'creation properties not found'
elif 0 < restart_attempts < consumer_data.get('num_restarted', 0):
err_msg = 'num restarts exceeded'
Expand All @@ -543,7 +561,6 @@ def restart_consumer(self, name: str):
self.run_consumers(names=(name,))
self.consumer_properties[name].setdefault('num_restarted', 0)
self.consumer_properties[name]['num_restarted'] += 1
err_msg = ""
if err_msg:
LOG.error(f'Cannot restart consumer "{name}" - {err_msg}')

Expand Down Expand Up @@ -601,13 +618,12 @@ def run_consumers(self, names: tuple = (), daemon=True):
if not names or len(names) == 0:
names = list(self.consumers)
for name in names:
if isinstance(self.consumers.get(name), ConsumerThread) and not \
self.consumers[name].is_alive():
if isinstance(self.consumers.get(name), ConsumerThread) and self.consumers[name].is_consumer_alive:
self.consumers[name].daemon = daemon
self.consumers[name].start()
self.consumer_properties[name]['started'] = True

def stop_consumers(self, names: tuple = ()):
def stop_consumers(self, names: tuple = (), allow_restart: bool = True):
"""
Stops consumer threads based on the name if present
(stops all of the declared consumers by default)
Expand All @@ -617,15 +633,10 @@ def stop_consumers(self, names: tuple = ()):
for name in names:
try:
if name in list(self.consumers):
self.consumers[name].join(
timeout=self.__consumer_join_timeout__)
if self.consumers[name] and self.consumers[name].is_alive():
err_msg = f'{name} is alive although was set to join ' \
f'for {self.__consumer_join_timeout__}!'
LOG.error(err_msg)
raise Exception(err_msg)
self.consumers[name] = None
self.consumers[name].join(timeout=self.__consumer_join_timeout__, allow_restart=allow_restart)
self.consumer_properties[name]['is_alive'] = self.consumers[name].is_consumer_alive
self.consumer_properties[name]['started'] = False
self.consumers[name] = None
except Exception as e:
raise ChildProcessError(e)

Expand Down Expand Up @@ -729,7 +740,7 @@ def stop_observer_thread(self):

def stop(self):
"""Generic method for graceful instance stopping"""
self.stop_consumers()
self.stop_consumers(allow_restart=False)
self.stop_sync_thread()
self.stop_observer_thread()

Expand Down

0 comments on commit 816ae6d

Please sign in to comment.