diff --git a/neon_mq_connector/connector.py b/neon_mq_connector/connector.py index b16da48..fdfffff 100644 --- a/neon_mq_connector/connector.py +++ b/neon_mq_connector/connector.py @@ -52,7 +52,6 @@ def _default_error_handler(*args): class ConsumerThread(threading.Thread): # retry to handle connection failures in case MQ server is still starting - @retry(use_self=True) def __init__(self, connection_params: pika.ConnectionParameters, queue: str, callback_func: callable, error_func: callable = _default_error_handler, @@ -81,7 +80,8 @@ def __init__(self, connection_params: pika.ConnectionParameters, to learn more about different exchanges """ threading.Thread.__init__(self, *args, **kwargs) - self.is_consuming = False + self._is_consuming = False # annotates that ConsumerThread is running + self._is_consumer_alive = True # annotates that ConsumerThread is alive and shall be recreated self.connection = pika.BlockingConnection(connection_params) self.callback_func = callback_func self.error_func = error_func @@ -107,31 +107,46 @@ def __init__(self, connection_params: pika.ConnectionParameters, queue=self.queue, auto_ack=auto_ack) + @property + def is_consumer_alive(self) -> bool: + return self._is_consumer_alive + + @property + def is_consuming(self) -> bool: + return self._is_consuming + def run(self): """Creating consumer channel""" - super(ConsumerThread, self).run() - try: - self.is_consuming = True - self.channel.start_consuming() - except pika.exceptions.ChannelClosed: - LOG.debug(f"Channel closed by broker: {self.callback_func}") - except Exception as e: - LOG.error(e) - self.error_func(self, e) - - def join(self, timeout: Optional[float] = ...) -> None: + if not self._is_consuming: + try: + super(ConsumerThread, self).run() + self._is_consuming = True + self.channel.start_consuming() + except Exception as e: + self._is_consuming = False + if isinstance(e, pika.exceptions.ChannelClosed): + LOG.error(f"Channel closed by broker: {self.callback_func}") + else: + LOG.error(e) + self.error_func(self, e) + self.join(allow_restart=True) + + def join(self, timeout: Optional[float] = ..., allow_restart: bool = True) -> None: """Terminating consumer channel""" - try: - self.channel.stop_consuming() - self.is_consuming = False - if self.channel.is_open: - self.channel.close() - if self.connection.is_open: - self.connection.close() - except Exception as x: - LOG.error(x) - finally: - super(ConsumerThread, self).join() + if self._is_consumer_alive: + try: + self.channel.stop_consuming() + if self.channel.is_open: + self.channel.close() + if self.connection.is_open: + self.connection.close() + except Exception as x: + LOG.error(x) + finally: + self._is_consuming = False + if not allow_restart: + self._is_consumer_alive = False + super(ConsumerThread, self).join(timeout=timeout) class MQConnector(ABC): @@ -179,7 +194,7 @@ def __init__(self, config: Optional[dict], service_name: str): self.property_key = 'properties' self._service_id = None self.service_name = service_name - self.consumers = dict() + self.consumers: Dict[str, ConsumerThread] = dict() self.consumer_properties = dict() self._vhost = None self._sync_thread = None @@ -515,7 +530,7 @@ def register_consumer(self, name: str, vhost: str, queue: str, if skip_on_existing: LOG.info(f'Consumer under index "{name}" already declared') return - self.stop_consumers(names=(name,)) + self.stop_consumers(names=(name,), allow_restart=False) self.consumer_properties.setdefault(name, {}) self.consumer_properties[name]['properties'] = \ dict(connection_params=self.get_connection_params(vhost), @@ -530,11 +545,14 @@ def register_consumer(self, name: str, vhost: str, queue: str, ConsumerThread(**self.consumer_properties[name]['properties']) def restart_consumer(self, name: str): - self.stop_consumers(names=(name,)) + self.stop_consumers(names=(name,), allow_restart=True) consumer_data = self.consumer_properties.get(name, {}) restart_attempts = consumer_data.get('restart_attempts', self.__max_consumer_restarts__) - if not consumer_data.get('properties'): + err_msg = '' + if not consumer_data.get('is_alive', True): + LOG.debug(f'Skipping joined consumer = "{name}"') + elif not consumer_data.get('properties'): err_msg = 'creation properties not found' elif 0 < restart_attempts < consumer_data.get('num_restarted', 0): err_msg = 'num restarts exceeded' @@ -543,7 +561,6 @@ def restart_consumer(self, name: str): self.run_consumers(names=(name,)) self.consumer_properties[name].setdefault('num_restarted', 0) self.consumer_properties[name]['num_restarted'] += 1 - err_msg = "" if err_msg: LOG.error(f'Cannot restart consumer "{name}" - {err_msg}') @@ -601,13 +618,12 @@ def run_consumers(self, names: tuple = (), daemon=True): if not names or len(names) == 0: names = list(self.consumers) for name in names: - if isinstance(self.consumers.get(name), ConsumerThread) and not \ - self.consumers[name].is_alive(): + if isinstance(self.consumers.get(name), ConsumerThread) and self.consumers[name].is_consumer_alive: self.consumers[name].daemon = daemon self.consumers[name].start() self.consumer_properties[name]['started'] = True - def stop_consumers(self, names: tuple = ()): + def stop_consumers(self, names: tuple = (), allow_restart: bool = True): """ Stops consumer threads based on the name if present (stops all of the declared consumers by default) @@ -617,15 +633,10 @@ def stop_consumers(self, names: tuple = ()): for name in names: try: if name in list(self.consumers): - self.consumers[name].join( - timeout=self.__consumer_join_timeout__) - if self.consumers[name] and self.consumers[name].is_alive(): - err_msg = f'{name} is alive although was set to join ' \ - f'for {self.__consumer_join_timeout__}!' - LOG.error(err_msg) - raise Exception(err_msg) - self.consumers[name] = None + self.consumers[name].join(timeout=self.__consumer_join_timeout__, allow_restart=allow_restart) + self.consumer_properties[name]['is_alive'] = self.consumers[name].is_consumer_alive self.consumer_properties[name]['started'] = False + self.consumers[name] = None except Exception as e: raise ChildProcessError(e) @@ -729,7 +740,7 @@ def stop_observer_thread(self): def stop(self): """Generic method for graceful instance stopping""" - self.stop_consumers() + self.stop_consumers(allow_restart=False) self.stop_sync_thread() self.stop_observer_thread()