From 52f62c07176a9b3fcbaa6175d72f6f447f684fa0 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Thu, 15 May 2025 20:29:58 +0100 Subject: [PATCH 1/5] Implement exponential backoff with cap for reconnect after errors Signed-off-by: Olivier 'reivilibre --- launch/ntrip_client.launch | 2 ++ launch/ntrip_serial_device.launch | 2 ++ scripts/ntrip_ros.py | 2 ++ scripts/ntrip_serial_device_ros.py | 2 ++ src/ntrip_client/ntrip_base.py | 32 ++++++++++++++++++++++++++++-- src/ntrip_client/ntrip_client.py | 8 +++++++- src/ntrip_client/ntrip_ros_base.py | 4 +++- 7 files changed, 48 insertions(+), 4 deletions(-) diff --git a/launch/ntrip_client.launch b/launch/ntrip_client.launch index 1225e72..ee23f2f 100644 --- a/launch/ntrip_client.launch +++ b/launch/ntrip_client.launch @@ -66,6 +66,8 @@ + + diff --git a/launch/ntrip_serial_device.launch b/launch/ntrip_serial_device.launch index c999044..9f75838 100644 --- a/launch/ntrip_serial_device.launch +++ b/launch/ntrip_serial_device.launch @@ -36,6 +36,8 @@ + + diff --git a/scripts/ntrip_ros.py b/scripts/ntrip_ros.py index 6e7d19e..22d2639 100755 --- a/scripts/ntrip_ros.py +++ b/scripts/ntrip_ros.py @@ -75,6 +75,8 @@ def __init__(self): self._client.nmea_parser.nmea_min_length = self._nmea_min_length self._client.reconnect_attempt_max = self._reconnect_attempt_max self._client.reconnect_attempt_wait_seconds = self._reconnect_attempt_wait_seconds + self._client.reconnect_backoff_base = self._reconnect_backoff_base + self._client.reconnect_backoff_max_seconds = self._reconnect_backoff_max_seconds self._client.rtcm_timeout_seconds = rospy.get_param('~rtcm_timeout_seconds', NTRIPClient.DEFAULT_RTCM_TIMEOUT_SECONDS) diff --git a/scripts/ntrip_serial_device_ros.py b/scripts/ntrip_serial_device_ros.py index 6fa64d3..c849b12 100755 --- a/scripts/ntrip_serial_device_ros.py +++ b/scripts/ntrip_serial_device_ros.py @@ -44,6 +44,8 @@ def __init__(self): self._client.nmea_parser.nmea_min_length = self._nmea_min_length self._client.reconnect_attempt_max = self._reconnect_attempt_max self._client.reconnect_attempt_wait_seconds = self._reconnect_attempt_wait_seconds + self._client.reconnect_backoff_base = self._reconnect_backoff_base + self._client.reconnect_backoff_max_seconds = self._reconnect_backoff_max_seconds if __name__ == '__main__': diff --git a/src/ntrip_client/ntrip_base.py b/src/ntrip_client/ntrip_base.py index f353018..3787e09 100644 --- a/src/ntrip_client/ntrip_base.py +++ b/src/ntrip_client/ntrip_base.py @@ -10,6 +10,9 @@ class NTRIPBase: DEFAULT_RECONNECT_ATTEMPT_MAX = 10 DEFAULT_RECONNECT_ATEMPT_WAIT_SECONDS = 5 + DEFAULT_RECONNECT_BACKOFF_BASE = 1.8 + DEFAULT_RECONNECT_BACKOFF_MAX_SECONDS = 300 + def __init__(self, logerr=logging.error, logwarn=logging.warning, loginfo=logging.info, logdebug=logging.debug): # Bit of a strange pattern here, but save the log functions so we can be agnostic of ROS self._logerr = logerr @@ -34,10 +37,16 @@ def __init__(self, logerr=logging.error, logwarn=logging.warning, loginfo=loggin # Setup some state self._shutdown = False self._connected = False + # How many connection attempts have failed since we last connected? + # We don't consider connection successful until some valid data has been received. + # TODO merge _reconnect_attempts into this, since it seems to track almost the same + self._failed_connections = 0 # Public reconnect info self.reconnect_attempt_max = self.DEFAULT_RECONNECT_ATTEMPT_MAX self.reconnect_attempt_wait_seconds = self.DEFAULT_RECONNECT_ATEMPT_WAIT_SECONDS + self.reconnect_backoff_base = self.DEFAULT_RECONNECT_BACKOFF_BASE + self.reconnect_backoff_max_seconds = self.DEFAULT_RECONNECT_BACKOFF_MAX_SECONDS def connect(self): raise NotImplementedError("Must override connect") @@ -45,15 +54,31 @@ def connect(self): def disconnect(self): raise NotImplementedError("Must override disconnect") + def _compute_reconnect_wait_time(self): + """ + Compute a time to sleep before attempting to reconnect. + + This is based on an exponential backoff, capped to a maximum. + + All of the initial wait times, the maximum and the base are configurable. + """ + return min( + self.reconnect_attempt_wait_seconds * (self.reconnect_backoff_base ** self._failed_connections), + self.reconnect_backoff_max_seconds + ) + def reconnect(self): if self._connected: while not self._shutdown: self._reconnect_attempt_count += 1 self.disconnect() + to_wait = self._compute_reconnect_wait_time() + self._logerr(f"Reconnecting in {to_wait} seconds") + time.sleep(self._compute_reconnect_wait_time()) + self._failed_connections += 1 connect_success = self.connect() if not connect_success and self._reconnect_attempt_count < self.reconnect_attempt_max: - self._logerr('Reconnect failed. Retrying in {} seconds'.format(self.reconnect_attempt_wait_seconds)) - time.sleep(self.reconnect_attempt_wait_seconds) + self._logerr('Reconnect failed') elif self._reconnect_attempt_count >= self.reconnect_attempt_max: self._reconnect_attempt_count = 0 raise Exception("Reconnect was attempted {} times, but never succeeded".format(self._reconnect_attempt_count)) @@ -63,6 +88,9 @@ def reconnect(self): else: self._logdebug('Reconnect called while not connected, ignoring') + def mark_successful_connection(self): + self._failed_connections = 0 + def send_nmea(self): raise NotImplementedError("Must override send_nmea") diff --git a/src/ntrip_client/ntrip_client.py b/src/ntrip_client/ntrip_client.py index c439e9b..39904ed 100644 --- a/src/ntrip_client/ntrip_client.py +++ b/src/ntrip_client/ntrip_client.py @@ -235,7 +235,13 @@ def recv_rtcm(self): self._first_rtcm_received = True # Send the data to the RTCM parser to parse it - return self.rtcm_parser.parse(data) if data else [] + parsed_packets = self.rtcm_parser.parse(data) if data else [] + + if parsed_packets: + # now we've received a packet, we can happily say that we are successfully connected + self.mark_successful_connection() + + return parsed_packets def _form_request(self): if self._ntrip_version != None and self._ntrip_version != '': diff --git a/src/ntrip_client/ntrip_ros_base.py b/src/ntrip_client/ntrip_ros_base.py index 9719555..0804a9d 100755 --- a/src/ntrip_client/ntrip_ros_base.py +++ b/src/ntrip_client/ntrip_ros_base.py @@ -77,6 +77,8 @@ def __init__(self, name): self._nmea_min_length = rospy.get_param('~nmea_min_length', NMEA_DEFAULT_MIN_LENGTH) self._reconnect_attempt_max = rospy.get_param('~reconnect_attempt_max', NTRIPBase.DEFAULT_RECONNECT_ATTEMPT_MAX) self._reconnect_attempt_wait_seconds = rospy.get_param('~reconnect_attempt_wait_seconds', NTRIPBase.DEFAULT_RECONNECT_ATEMPT_WAIT_SECONDS) + self._reconnect_backoff_base = rospy.get_param('~reconnect_backoff_base', NTRIPBase.DEFAULT_RECONNECT_BACKOFF_BASE) + self._reconnect_backoff_max_seconds = rospy.get_param('~reconnect_backoff_max_seconds', NTRIPBase.DEFAULT_RECONNECT_BACKOFF_MAX_SECONDS) def run(self): # Setup a shutdown hook @@ -173,4 +175,4 @@ def _create_rtcm_msgs_rtcm_message(self, rtcm): frame_id=self._rtcm_frame_id ), message=rtcm - ) \ No newline at end of file + ) From 1a203356862f75dd3ed4e184bc3d867885adc3f8 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Thu, 15 May 2025 21:14:11 +0100 Subject: [PATCH 2/5] Don't die if we initially fail to connect Signed-off-by: Olivier 'reivilibre --- src/ntrip_client/ntrip_base.py | 14 ++++++++------ src/ntrip_client/ntrip_ros_base.py | 6 ++---- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/ntrip_client/ntrip_base.py b/src/ntrip_client/ntrip_base.py index 3787e09..384ad7d 100644 --- a/src/ntrip_client/ntrip_base.py +++ b/src/ntrip_client/ntrip_base.py @@ -67,14 +67,16 @@ def _compute_reconnect_wait_time(self): self.reconnect_backoff_max_seconds ) - def reconnect(self): - if self._connected: + def reconnect(self, initial = False): + if self._connected or initial: while not self._shutdown: self._reconnect_attempt_count += 1 - self.disconnect() - to_wait = self._compute_reconnect_wait_time() - self._logerr(f"Reconnecting in {to_wait} seconds") - time.sleep(self._compute_reconnect_wait_time()) + if not initial: + self.disconnect() + to_wait = self._compute_reconnect_wait_time() + self._logerr(f"Reconnecting in {to_wait} seconds") + time.sleep(self._compute_reconnect_wait_time()) + initial = False self._failed_connections += 1 connect_success = self.connect() if not connect_success and self._reconnect_attempt_count < self.reconnect_attempt_max: diff --git a/src/ntrip_client/ntrip_ros_base.py b/src/ntrip_client/ntrip_ros_base.py index 0804a9d..c45ec7f 100755 --- a/src/ntrip_client/ntrip_ros_base.py +++ b/src/ntrip_client/ntrip_ros_base.py @@ -84,10 +84,8 @@ def run(self): # Setup a shutdown hook rospy.on_shutdown(self.stop) - # Connect the client - if not self._client.connect(): - rospy.logerr('Unable to connect to NTRIP server') - return 1 + # Start the client's reconnection loop + self._client.reconnect(initial = True) # Setup our subscriber self._nmea_sub = rospy.Subscriber('nmea', Sentence, self.subscribe_nmea, queue_size=10) From 2f16610f0c7c57ad2badf51d45eef3b3d2a4156a Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Thu, 15 May 2025 21:23:23 +0100 Subject: [PATCH 3/5] Format delays more sensibly Signed-off-by: Olivier 'reivilibre --- src/ntrip_client/ntrip_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ntrip_client/ntrip_base.py b/src/ntrip_client/ntrip_base.py index 384ad7d..1d1a9eb 100644 --- a/src/ntrip_client/ntrip_base.py +++ b/src/ntrip_client/ntrip_base.py @@ -74,7 +74,7 @@ def reconnect(self, initial = False): if not initial: self.disconnect() to_wait = self._compute_reconnect_wait_time() - self._logerr(f"Reconnecting in {to_wait} seconds") + self._logerr(f"Reconnecting in {to_wait:.1f} seconds") time.sleep(self._compute_reconnect_wait_time()) initial = False self._failed_connections += 1 From fb8948658b3b0ed01e8f5fc3f0e45dbc87cfe86a Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Thu, 15 May 2025 21:41:05 +0100 Subject: [PATCH 4/5] Restructure the reconnect method for clarity Signed-off-by: Olivier 'reivilibre --- src/ntrip_client/ntrip_base.py | 48 ++++++++++++++++---------------- src/ntrip_client/ntrip_client.py | 1 - 2 files changed, 24 insertions(+), 25 deletions(-) diff --git a/src/ntrip_client/ntrip_base.py b/src/ntrip_client/ntrip_base.py index 1d1a9eb..4d8a59d 100644 --- a/src/ntrip_client/ntrip_base.py +++ b/src/ntrip_client/ntrip_base.py @@ -39,8 +39,7 @@ def __init__(self, logerr=logging.error, logwarn=logging.warning, loginfo=loggin self._connected = False # How many connection attempts have failed since we last connected? # We don't consider connection successful until some valid data has been received. - # TODO merge _reconnect_attempts into this, since it seems to track almost the same - self._failed_connections = 0 + self._reconnect_attempt_count = 0 # Public reconnect info self.reconnect_attempt_max = self.DEFAULT_RECONNECT_ATTEMPT_MAX @@ -63,35 +62,36 @@ def _compute_reconnect_wait_time(self): All of the initial wait times, the maximum and the base are configurable. """ return min( - self.reconnect_attempt_wait_seconds * (self.reconnect_backoff_base ** self._failed_connections), + self.reconnect_attempt_wait_seconds * (self.reconnect_backoff_base ** self._reconnect_attempt_count ), self.reconnect_backoff_max_seconds ) def reconnect(self, initial = False): - if self._connected or initial: - while not self._shutdown: - self._reconnect_attempt_count += 1 - if not initial: - self.disconnect() - to_wait = self._compute_reconnect_wait_time() - self._logerr(f"Reconnecting in {to_wait:.1f} seconds") - time.sleep(self._compute_reconnect_wait_time()) - initial = False - self._failed_connections += 1 - connect_success = self.connect() - if not connect_success and self._reconnect_attempt_count < self.reconnect_attempt_max: - self._logerr('Reconnect failed') - elif self._reconnect_attempt_count >= self.reconnect_attempt_max: - self._reconnect_attempt_count = 0 - raise Exception("Reconnect was attempted {} times, but never succeeded".format(self._reconnect_attempt_count)) - elif connect_success: - self._reconnect_attempt_count = 0 - break - else: + if not (self._connected or initial): self._logdebug('Reconnect called while not connected, ignoring') + return + + while not self._shutdown: + if not initial: + # If this isn't our initial connection attempt, + # disconnect and wait a reconnection interval + self.disconnect() + to_wait = self._compute_reconnect_wait_time() + self._logerr(f"Reconnecting in {to_wait:.1f} seconds") + time.sleep(self._compute_reconnect_wait_time()) # TODO should this be ros.sleep ? + + initial = False + self._reconnect_attempt_count += 1 + + connect_success = self.connect() + if connect_success: + break + + if self._reconnect_attempt_count >= self.reconnect_attempt_max: + raise Exception("Reconnect was attempted {} times, but never succeeded".format(self._reconnect_attempt_count)) def mark_successful_connection(self): - self._failed_connections = 0 + self._reconnect_attempt_count = 0 def send_nmea(self): raise NotImplementedError("Must override send_nmea") diff --git a/src/ntrip_client/ntrip_client.py b/src/ntrip_client/ntrip_client.py index 39904ed..a74ca9b 100644 --- a/src/ntrip_client/ntrip_client.py +++ b/src/ntrip_client/ntrip_client.py @@ -53,7 +53,6 @@ def __init__(self, host, port, mountpoint, ntrip_version, username, password, lo self.ca_cert = None # Private reconnect info - self._reconnect_attempt_count = 0 self._nmea_send_failed_count = 0 self._nmea_send_failed_max = 5 self._read_zero_bytes_count = 0 From f9eb69a1fadf772db3d311dd99a4929bd32f31aa Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Thu, 15 May 2025 21:45:12 +0100 Subject: [PATCH 5/5] Use rospy.sleep instead of time.sleep I believe time.sleep() is ill-advised in ROS because it's not interruptible? Signed-off-by: Olivier 'reivilibre --- src/ntrip_client/ntrip_base.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ntrip_client/ntrip_base.py b/src/ntrip_client/ntrip_base.py index 4d8a59d..65cc278 100644 --- a/src/ntrip_client/ntrip_base.py +++ b/src/ntrip_client/ntrip_base.py @@ -1,5 +1,5 @@ -import time import logging +import rospy from .nmea_parser import NMEAParser from .rtcm_parser import RTCMParser @@ -78,7 +78,7 @@ def reconnect(self, initial = False): self.disconnect() to_wait = self._compute_reconnect_wait_time() self._logerr(f"Reconnecting in {to_wait:.1f} seconds") - time.sleep(self._compute_reconnect_wait_time()) # TODO should this be ros.sleep ? + rospy.sleep(self._compute_reconnect_wait_time()) initial = False self._reconnect_attempt_count += 1