From cacd3100d6087a2c9d27aa6c7db08843bd300ea3 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Thu, 15 May 2025 20:29:58 +0100 Subject: [PATCH 1/5] Implement exponential backoff with cap for reconnect after errors Signed-off-by: Olivier 'reivilibre --- launch/ntrip_client.launch | 2 ++ launch/ntrip_serial_device.launch | 2 ++ scripts/ntrip_ros.py | 2 ++ scripts/ntrip_serial_device_ros.py | 2 ++ src/ntrip_client/ntrip_base.py | 32 ++++++++++++++++++++++++++++-- src/ntrip_client/ntrip_client.py | 8 +++++++- src/ntrip_client/ntrip_ros_base.py | 2 ++ 7 files changed, 47 insertions(+), 3 deletions(-) diff --git a/launch/ntrip_client.launch b/launch/ntrip_client.launch index 1225e72..ee23f2f 100644 --- a/launch/ntrip_client.launch +++ b/launch/ntrip_client.launch @@ -66,6 +66,8 @@ + + diff --git a/launch/ntrip_serial_device.launch b/launch/ntrip_serial_device.launch index c999044..9f75838 100644 --- a/launch/ntrip_serial_device.launch +++ b/launch/ntrip_serial_device.launch @@ -36,6 +36,8 @@ + + diff --git a/scripts/ntrip_ros.py b/scripts/ntrip_ros.py index 6e7d19e..22d2639 100755 --- a/scripts/ntrip_ros.py +++ b/scripts/ntrip_ros.py @@ -75,6 +75,8 @@ def __init__(self): self._client.nmea_parser.nmea_min_length = self._nmea_min_length self._client.reconnect_attempt_max = self._reconnect_attempt_max self._client.reconnect_attempt_wait_seconds = self._reconnect_attempt_wait_seconds + self._client.reconnect_backoff_base = self._reconnect_backoff_base + self._client.reconnect_backoff_max_seconds = self._reconnect_backoff_max_seconds self._client.rtcm_timeout_seconds = rospy.get_param('~rtcm_timeout_seconds', NTRIPClient.DEFAULT_RTCM_TIMEOUT_SECONDS) diff --git a/scripts/ntrip_serial_device_ros.py b/scripts/ntrip_serial_device_ros.py index 6fa64d3..c849b12 100755 --- a/scripts/ntrip_serial_device_ros.py +++ b/scripts/ntrip_serial_device_ros.py @@ -44,6 +44,8 @@ def __init__(self): self._client.nmea_parser.nmea_min_length = self._nmea_min_length self._client.reconnect_attempt_max = self._reconnect_attempt_max self._client.reconnect_attempt_wait_seconds = self._reconnect_attempt_wait_seconds + self._client.reconnect_backoff_base = self._reconnect_backoff_base + self._client.reconnect_backoff_max_seconds = self._reconnect_backoff_max_seconds if __name__ == '__main__': diff --git a/src/ntrip_client/ntrip_base.py b/src/ntrip_client/ntrip_base.py index b189529..d27ae23 100644 --- a/src/ntrip_client/ntrip_base.py +++ b/src/ntrip_client/ntrip_base.py @@ -10,6 +10,9 @@ class NTRIPBase: DEFAULT_RECONNECT_ATTEMPT_MAX = 10 DEFAULT_RECONNECT_ATEMPT_WAIT_SECONDS = 5 + DEFAULT_RECONNECT_BACKOFF_BASE = 1.8 + DEFAULT_RECONNECT_BACKOFF_MAX_SECONDS = 300 + def __init__(self, logerr=logging.error, logwarn=logging.warning, loginfo=logging.info, logdebug=logging.debug): # Bit of a strange pattern here, but save the log functions so we can be agnostic of ROS self._logerr = logerr @@ -34,10 +37,16 @@ def __init__(self, logerr=logging.error, logwarn=logging.warning, loginfo=loggin # Setup some state self._shutdown = False self._connected = False + # How many connection attempts have failed since we last connected? + # We don't consider connection successful until some valid data has been received. + # TODO merge _reconnect_attempts into this, since it seems to track almost the same + self._failed_connections = 0 # Public reconnect info self.reconnect_attempt_max = self.DEFAULT_RECONNECT_ATTEMPT_MAX self.reconnect_attempt_wait_seconds = self.DEFAULT_RECONNECT_ATEMPT_WAIT_SECONDS + self.reconnect_backoff_base = self.DEFAULT_RECONNECT_BACKOFF_BASE + self.reconnect_backoff_max_seconds = self.DEFAULT_RECONNECT_BACKOFF_MAX_SECONDS def connect(self): raise NotImplementedError("Must override connect") @@ -45,15 +54,31 @@ def connect(self): def disconnect(self): raise NotImplementedError("Must override disconnect") + def _compute_reconnect_wait_time(self): + """ + Compute a time to sleep before attempting to reconnect. + + This is based on an exponential backoff, capped to a maximum. + + All of the initial wait times, the maximum and the base are configurable. + """ + return min( + self.reconnect_attempt_wait_seconds * (self.reconnect_backoff_base ** self._failed_connections), + self.reconnect_backoff_max_seconds + ) + def reconnect(self): if self._connected: while not self._shutdown: self._reconnect_attempt_count += 1 self.disconnect() + to_wait = self._compute_reconnect_wait_time() + self._logerr(f"Reconnecting in {to_wait} seconds") + time.sleep(self._compute_reconnect_wait_time()) + self._failed_connections += 1 connect_success = self.connect() if not connect_success and self._reconnect_attempt_count < self.reconnect_attempt_max: - self._logerr('Reconnect failed. Retrying in {} seconds'.format(self.reconnect_attempt_wait_seconds)) - time.sleep(self.reconnect_attempt_wait_seconds) + self._logerr('Reconnect failed') elif self._reconnect_attempt_count >= self.reconnect_attempt_max: self._reconnect_attempt_count = 0 self._logerr('Reconnect failed. Max attempts reached. Shutting down') @@ -65,6 +90,9 @@ def reconnect(self): else: self._logdebug('Reconnect called while not connected, ignoring') + def mark_successful_connection(self): + self._failed_connections = 0 + def send_nmea(self): raise NotImplementedError("Must override send_nmea") diff --git a/src/ntrip_client/ntrip_client.py b/src/ntrip_client/ntrip_client.py index c439e9b..39904ed 100644 --- a/src/ntrip_client/ntrip_client.py +++ b/src/ntrip_client/ntrip_client.py @@ -235,7 +235,13 @@ def recv_rtcm(self): self._first_rtcm_received = True # Send the data to the RTCM parser to parse it - return self.rtcm_parser.parse(data) if data else [] + parsed_packets = self.rtcm_parser.parse(data) if data else [] + + if parsed_packets: + # now we've received a packet, we can happily say that we are successfully connected + self.mark_successful_connection() + + return parsed_packets def _form_request(self): if self._ntrip_version != None and self._ntrip_version != '': diff --git a/src/ntrip_client/ntrip_ros_base.py b/src/ntrip_client/ntrip_ros_base.py index 120187e..7e5ad4c 100755 --- a/src/ntrip_client/ntrip_ros_base.py +++ b/src/ntrip_client/ntrip_ros_base.py @@ -77,6 +77,8 @@ def __init__(self, name): self._nmea_min_length = rospy.get_param('~nmea_min_length', NMEA_DEFAULT_MIN_LENGTH) self._reconnect_attempt_max = rospy.get_param('~reconnect_attempt_max', NTRIPBase.DEFAULT_RECONNECT_ATTEMPT_MAX) self._reconnect_attempt_wait_seconds = rospy.get_param('~reconnect_attempt_wait_seconds', NTRIPBase.DEFAULT_RECONNECT_ATEMPT_WAIT_SECONDS) + self._reconnect_backoff_base = rospy.get_param('~reconnect_backoff_base', NTRIPBase.DEFAULT_RECONNECT_BACKOFF_BASE) + self._reconnect_backoff_max_seconds = rospy.get_param('~reconnect_backoff_max_seconds', NTRIPBase.DEFAULT_RECONNECT_BACKOFF_MAX_SECONDS) def run(self): # Setup a shutdown hook From e93cde78d269ed51957d343981c27e8bdf4a2748 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Thu, 15 May 2025 21:14:11 +0100 Subject: [PATCH 2/5] Don't die if we initially fail to connect Signed-off-by: Olivier 'reivilibre --- src/ntrip_client/ntrip_base.py | 14 ++++++++------ src/ntrip_client/ntrip_ros_base.py | 6 ++---- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/ntrip_client/ntrip_base.py b/src/ntrip_client/ntrip_base.py index d27ae23..12fe2fe 100644 --- a/src/ntrip_client/ntrip_base.py +++ b/src/ntrip_client/ntrip_base.py @@ -67,14 +67,16 @@ def _compute_reconnect_wait_time(self): self.reconnect_backoff_max_seconds ) - def reconnect(self): - if self._connected: + def reconnect(self, initial = False): + if self._connected or initial: while not self._shutdown: self._reconnect_attempt_count += 1 - self.disconnect() - to_wait = self._compute_reconnect_wait_time() - self._logerr(f"Reconnecting in {to_wait} seconds") - time.sleep(self._compute_reconnect_wait_time()) + if not initial: + self.disconnect() + to_wait = self._compute_reconnect_wait_time() + self._logerr(f"Reconnecting in {to_wait} seconds") + time.sleep(self._compute_reconnect_wait_time()) + initial = False self._failed_connections += 1 connect_success = self.connect() if not connect_success and self._reconnect_attempt_count < self.reconnect_attempt_max: diff --git a/src/ntrip_client/ntrip_ros_base.py b/src/ntrip_client/ntrip_ros_base.py index 7e5ad4c..90fdb3b 100755 --- a/src/ntrip_client/ntrip_ros_base.py +++ b/src/ntrip_client/ntrip_ros_base.py @@ -84,10 +84,8 @@ def run(self): # Setup a shutdown hook rospy.on_shutdown(self.stop) - # Connect the client - if not self._client.connect(): - rospy.logerr('Unable to connect to NTRIP server') - return 1 + # Start the client's reconnection loop + self._client.reconnect(initial = True) # Setup our subscriber self._nmea_sub = rospy.Subscriber('nmea', Sentence, self.subscribe_nmea, queue_size=10) From 029958bbd364bb98724b660713e8784f6448d51a Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Thu, 15 May 2025 21:23:23 +0100 Subject: [PATCH 3/5] Format delays more sensibly Signed-off-by: Olivier 'reivilibre --- src/ntrip_client/ntrip_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ntrip_client/ntrip_base.py b/src/ntrip_client/ntrip_base.py index 12fe2fe..69cade1 100644 --- a/src/ntrip_client/ntrip_base.py +++ b/src/ntrip_client/ntrip_base.py @@ -74,7 +74,7 @@ def reconnect(self, initial = False): if not initial: self.disconnect() to_wait = self._compute_reconnect_wait_time() - self._logerr(f"Reconnecting in {to_wait} seconds") + self._logerr(f"Reconnecting in {to_wait:.1f} seconds") time.sleep(self._compute_reconnect_wait_time()) initial = False self._failed_connections += 1 From 71877aabda922e41dc4b14d383d0f5b0b4c77984 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Thu, 15 May 2025 21:41:05 +0100 Subject: [PATCH 4/5] Restructure the reconnect method for clarity Signed-off-by: Olivier 'reivilibre --- src/ntrip_client/ntrip_base.py | 53 ++++++++++++++++---------------- src/ntrip_client/ntrip_client.py | 1 - 2 files changed, 27 insertions(+), 27 deletions(-) diff --git a/src/ntrip_client/ntrip_base.py b/src/ntrip_client/ntrip_base.py index 69cade1..1786636 100644 --- a/src/ntrip_client/ntrip_base.py +++ b/src/ntrip_client/ntrip_base.py @@ -39,8 +39,7 @@ def __init__(self, logerr=logging.error, logwarn=logging.warning, loginfo=loggin self._connected = False # How many connection attempts have failed since we last connected? # We don't consider connection successful until some valid data has been received. - # TODO merge _reconnect_attempts into this, since it seems to track almost the same - self._failed_connections = 0 + self._reconnect_attempt_count = 0 # Public reconnect info self.reconnect_attempt_max = self.DEFAULT_RECONNECT_ATTEMPT_MAX @@ -63,37 +62,39 @@ def _compute_reconnect_wait_time(self): All of the initial wait times, the maximum and the base are configurable. """ return min( - self.reconnect_attempt_wait_seconds * (self.reconnect_backoff_base ** self._failed_connections), + self.reconnect_attempt_wait_seconds * (self.reconnect_backoff_base ** self._reconnect_attempt_count ), self.reconnect_backoff_max_seconds ) def reconnect(self, initial = False): - if self._connected or initial: - while not self._shutdown: - self._reconnect_attempt_count += 1 - if not initial: - self.disconnect() - to_wait = self._compute_reconnect_wait_time() - self._logerr(f"Reconnecting in {to_wait:.1f} seconds") - time.sleep(self._compute_reconnect_wait_time()) - initial = False - self._failed_connections += 1 - connect_success = self.connect() - if not connect_success and self._reconnect_attempt_count < self.reconnect_attempt_max: - self._logerr('Reconnect failed') - elif self._reconnect_attempt_count >= self.reconnect_attempt_max: - self._reconnect_attempt_count = 0 - self._logerr('Reconnect failed. Max attempts reached. Shutting down') - self.shutdown() - break - elif connect_success: - self._reconnect_attempt_count = 0 - break - else: + if not (self._connected or initial): self._logdebug('Reconnect called while not connected, ignoring') + return + + while not self._shutdown: + if not initial: + # If this isn't our initial connection attempt, + # disconnect and wait a reconnection interval + self.disconnect() + to_wait = self._compute_reconnect_wait_time() + self._logerr(f"Reconnecting in {to_wait:.1f} seconds") + time.sleep(self._compute_reconnect_wait_time()) # TODO should this be ros.sleep ? + + initial = False + self._reconnect_attempt_count += 1 + + connect_success = self.connect() + if connect_success: + break + + if self._reconnect_attempt_count >= self.reconnect_attempt_max: + self._reconnect_attempt_count = 0 + self._logerr('Reconnect failed. Max attempts reached. Shutting down') + self.shutdown() + break def mark_successful_connection(self): - self._failed_connections = 0 + self._reconnect_attempt_count = 0 def send_nmea(self): raise NotImplementedError("Must override send_nmea") diff --git a/src/ntrip_client/ntrip_client.py b/src/ntrip_client/ntrip_client.py index 39904ed..a74ca9b 100644 --- a/src/ntrip_client/ntrip_client.py +++ b/src/ntrip_client/ntrip_client.py @@ -53,7 +53,6 @@ def __init__(self, host, port, mountpoint, ntrip_version, username, password, lo self.ca_cert = None # Private reconnect info - self._reconnect_attempt_count = 0 self._nmea_send_failed_count = 0 self._nmea_send_failed_max = 5 self._read_zero_bytes_count = 0 From 78ef47ebddebc1e8c9948b965b266e82e3b7e473 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Thu, 15 May 2025 21:45:12 +0100 Subject: [PATCH 5/5] Use rospy.sleep instead of time.sleep I believe time.sleep() is ill-advised in ROS because it's not interruptible? Signed-off-by: Olivier 'reivilibre --- src/ntrip_client/ntrip_base.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ntrip_client/ntrip_base.py b/src/ntrip_client/ntrip_base.py index 1786636..fb6646e 100644 --- a/src/ntrip_client/ntrip_base.py +++ b/src/ntrip_client/ntrip_base.py @@ -1,5 +1,5 @@ -import time import logging +import rospy from .nmea_parser import NMEAParser from .rtcm_parser import RTCMParser @@ -78,7 +78,7 @@ def reconnect(self, initial = False): self.disconnect() to_wait = self._compute_reconnect_wait_time() self._logerr(f"Reconnecting in {to_wait:.1f} seconds") - time.sleep(self._compute_reconnect_wait_time()) # TODO should this be ros.sleep ? + rospy.sleep(self._compute_reconnect_wait_time()) initial = False self._reconnect_attempt_count += 1