|
| 1 | +# Software License Agreement (BSD) |
| 2 | +# |
| 3 | +# @author Chris Iverach-Brereton (civerachb@clearpathrobotics.com) |
| 4 | +# @copyright (c) 2025, Clearpath Robotics, Inc., All rights reserved. |
| 5 | +# |
| 6 | +# Redistribution and use in source and binary forms, with or without |
| 7 | +# modification, are permitted provided that the following conditions are met: |
| 8 | +# * Redistributions of source code must retain the above copyright notice, |
| 9 | +# this list of conditions and the following disclaimer. |
| 10 | +# * Redistributions in binary form must reproduce the above copyright notice, |
| 11 | +# this list of conditions and the following disclaimer in the documentation |
| 12 | +# and/or other materials provided with the distribution. |
| 13 | +# * Neither the name of Clearpath Robotics nor the names of its contributors |
| 14 | +# may be used to endorse or promote products derived from this software |
| 15 | +# without specific prior written permission. |
| 16 | +# |
| 17 | +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
| 18 | +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
| 19 | +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
| 20 | +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE |
| 21 | +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
| 22 | +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
| 23 | +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
| 24 | +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
| 25 | +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
| 26 | +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
| 27 | +# POSSIBILITY OF SUCH DAMAGE. |
| 28 | +from clearpath_config.common.types.config import BaseConfig |
| 29 | +from clearpath_config.common.utils.dictionary import flip_dict |
| 30 | +from clearpath_config.sensors.types.sensor import BaseSensor |
| 31 | + |
| 32 | + |
| 33 | +class BaseRouter(): |
| 34 | + """ |
| 35 | + Generic configuration parameters for a router we can install on the robot. |
| 36 | +
|
| 37 | + Must be sub-typed as appropriate. |
| 38 | + """ |
| 39 | + |
| 40 | + IP_ADDRESS = 'ip_address' |
| 41 | + LAUNCH_ENABLED = 'launch_enabled' |
| 42 | + |
| 43 | + def __init__( |
| 44 | + self, |
| 45 | + ip_address: str = None, |
| 46 | + launch_enabled: bool = True, |
| 47 | + ): |
| 48 | + self.ip_address = ip_address |
| 49 | + self.launch_enabled = launch_enabled |
| 50 | + |
| 51 | + def from_dict(self, d: dict): |
| 52 | + self.ip_address = d.get(self.IP_ADDRESS, self.ip_address) |
| 53 | + self.launch_enabled = d.get(self.LAUNCH_ENABLED, self.launch_enabled) |
| 54 | + |
| 55 | + @property |
| 56 | + def ip_address(self) -> str: |
| 57 | + return self._ip_address |
| 58 | + |
| 59 | + @ip_address.setter |
| 60 | + def ip_address(self, value: str) -> None: |
| 61 | + BaseSensor.assert_is_ipv4_address(value) |
| 62 | + self._ip_address = value |
| 63 | + |
| 64 | + @property |
| 65 | + def launch_enabled(self) -> bool: |
| 66 | + return self._launch_enabled |
| 67 | + |
| 68 | + @launch_enabled.setter |
| 69 | + def launch_enabled(self, value: bool): |
| 70 | + self._launch_enabled = value |
| 71 | + |
| 72 | + |
| 73 | +class PeplinkRouter(BaseRouter): |
| 74 | + """ |
| 75 | + Configuration object for Peplink routers. |
| 76 | +
|
| 77 | + These use the peplink_router_driver package to run the actual node |
| 78 | + """ |
| 79 | + |
| 80 | + MODEL = 'peplink' |
| 81 | + |
| 82 | + USERNAME = 'username' |
| 83 | + PASSWORD = 'password' |
| 84 | + ENABLE_GPS = 'enable_gps' |
| 85 | + PUBLISH_PASSWORDS = 'publish_passwords' |
| 86 | + |
| 87 | + DEFAULTS = { |
| 88 | + BaseRouter.IP_ADDRESS: '192.168.131.51', |
| 89 | + USERNAME: 'admin', |
| 90 | + PASSWORD: 'admin', |
| 91 | + ENABLE_GPS: False, |
| 92 | + PUBLISH_PASSWORDS: False, |
| 93 | + } |
| 94 | + |
| 95 | + def __init__( |
| 96 | + self, |
| 97 | + ip_address: str = DEFAULTS[BaseRouter.IP_ADDRESS], |
| 98 | + username: str = DEFAULTS[USERNAME], |
| 99 | + password: str = DEFAULTS[PASSWORD], |
| 100 | + enable_gps: bool = DEFAULTS[ENABLE_GPS], |
| 101 | + publish_passwords: bool = DEFAULTS[PUBLISH_PASSWORDS] |
| 102 | + ) -> None: |
| 103 | + super().__init__(ip_address=ip_address) |
| 104 | + self.username = username |
| 105 | + self.password = password |
| 106 | + self.enable_gps = enable_gps |
| 107 | + self.publish_passwords = publish_passwords |
| 108 | + |
| 109 | + def from_dict(self, d: dict) -> None: |
| 110 | + super().from_dict(d) |
| 111 | + self.username = d.get(self.USERNAME, self.username) |
| 112 | + self.password = d.get(self.PASSWORD, self.password) |
| 113 | + self.enable_gps = d.get(self.ENABLE_GPS, self.enable_gps) |
| 114 | + self.publish_passwords = d.get(self.PUBLISH_PASSWORDS, self.publish_passwords) |
| 115 | + |
| 116 | + @property |
| 117 | + def username(self) -> str: |
| 118 | + return self._username |
| 119 | + |
| 120 | + @username.setter |
| 121 | + def username(self, value: str) -> None: |
| 122 | + if not isinstance(value, str): |
| 123 | + raise TypeError(f'Username {value} must be of type "str"') |
| 124 | + if len(value) <= 0: |
| 125 | + raise ValueError('Username cannot be empty') |
| 126 | + self._username = value |
| 127 | + |
| 128 | + @property |
| 129 | + def password(self) -> str: |
| 130 | + return self._password |
| 131 | + |
| 132 | + @password.setter |
| 133 | + def password(self, value: str) -> None: |
| 134 | + if not isinstance(value, str): |
| 135 | + raise TypeError(f'Password {value} must be of type "str"') |
| 136 | + if len(value) <= 0: |
| 137 | + raise ValueError('Password cannot be empty') |
| 138 | + self._password = value |
| 139 | + |
| 140 | + @property |
| 141 | + def enable_gps(self) -> bool: |
| 142 | + return self._enable_gps |
| 143 | + |
| 144 | + @enable_gps.setter |
| 145 | + def enable_gps(self, value: bool) -> None: |
| 146 | + if not isinstance(value, bool): |
| 147 | + raise TypeError(f'Enable GPS flag {value} must be of type "bool"') |
| 148 | + self._enable_gps = value |
| 149 | + |
| 150 | + @property |
| 151 | + def publish_passwords(self) -> bool: |
| 152 | + return self._publish_passwords |
| 153 | + |
| 154 | + @publish_passwords.setter |
| 155 | + def publish_passwords(self, value: bool) -> None: |
| 156 | + if not isinstance(value, bool): |
| 157 | + raise TypeError(f'Publish passwords flag {value} must be of type "bool"') |
| 158 | + self._publish_passwords = value |
| 159 | + |
| 160 | + |
| 161 | +class Router: |
| 162 | + MODELS = { |
| 163 | + PeplinkRouter.MODEL: PeplinkRouter, |
| 164 | + } |
| 165 | + |
| 166 | + def __new__(cls, model: str) -> BaseRouter: |
| 167 | + if model not in Router.MODELS: |
| 168 | + raise TypeError(f'Router model {model} must be one of {Router.MODELS}') |
| 169 | + return Router.MODELS[model]() |
| 170 | + |
| 171 | + |
| 172 | +class WirelessConfig(BaseConfig): |
| 173 | + """ |
| 174 | + Contains additional wireless networking nodes we can enable/disable. |
| 175 | +
|
| 176 | + Currently only peplink devices are supported here, but this may expand in the future. |
| 177 | + """ |
| 178 | + |
| 179 | + WIRELESS = 'wireless' |
| 180 | + ROUTER = 'router' |
| 181 | + BASE_STATION = 'base_station' |
| 182 | + ENABLE_WIRELESS_WATCHER = 'enable_wireless_watcher' |
| 183 | + |
| 184 | + TEMPLATE = { |
| 185 | + ROUTER: ROUTER, |
| 186 | + BASE_STATION: BASE_STATION, |
| 187 | + ENABLE_WIRELESS_WATCHER: ENABLE_WIRELESS_WATCHER, |
| 188 | + } |
| 189 | + |
| 190 | + KEYS = flip_dict(TEMPLATE) |
| 191 | + |
| 192 | + DEFAULTS = { |
| 193 | + ROUTER: None, |
| 194 | + BASE_STATION: None, |
| 195 | + ENABLE_WIRELESS_WATCHER: True |
| 196 | + } |
| 197 | + |
| 198 | + def __init__( |
| 199 | + self, config: dict = {}, |
| 200 | + router: str = DEFAULTS[ROUTER], |
| 201 | + base_station: str = DEFAULTS[BASE_STATION], |
| 202 | + enable_wireless_watcher: str = DEFAULTS[ENABLE_WIRELESS_WATCHER] |
| 203 | + ) -> None: |
| 204 | + self._router = self.DEFAULTS[self.ROUTER] |
| 205 | + self._base_station = self.DEFAULTS[self.BASE_STATION] |
| 206 | + self._enable_wireless_watcher = self.DEFAULTS[self.ENABLE_WIRELESS_WATCHER] |
| 207 | + |
| 208 | + setters = { |
| 209 | + self.KEYS[self.ROUTER]: WirelessConfig.router, |
| 210 | + self.KEYS[self.BASE_STATION]: WirelessConfig.base_station, |
| 211 | + self.KEYS[self.ENABLE_WIRELESS_WATCHER]: WirelessConfig.enable_wireless_watcher, |
| 212 | + } |
| 213 | + super().__init__(setters, config) |
| 214 | + |
| 215 | + self.router = router |
| 216 | + self.base_station = base_station |
| 217 | + self.enable_wireless_watcher = enable_wireless_watcher |
| 218 | + |
| 219 | + def from_dict(self, value: dict): |
| 220 | + if self.ROUTER in value: |
| 221 | + router_cfg = value[self.ROUTER] |
| 222 | + if 'model' not in router_cfg: |
| 223 | + raise ValueError(f'Router configuration {router_cfg} must contain a "model" key') |
| 224 | + self.router = Router(router_cfg['model']) |
| 225 | + self.router.from_dict(router_cfg) |
| 226 | + |
| 227 | + if self.BASE_STATION in value: |
| 228 | + router_cfg = value[self.BASE_STATION] |
| 229 | + if 'model' not in router_cfg: |
| 230 | + raise ValueError(f'Base station configuration {router_cfg} must contain a "model" key') # noqa: E501 |
| 231 | + self.base_station = Router(router_cfg['model']) |
| 232 | + self.base_station.from_dict(router_cfg) |
| 233 | + |
| 234 | + if self.ENABLE_WIRELESS_WATCHER in value: |
| 235 | + self.enable_wireless_watcher = value[self.ENABLE_WIRELESS_WATCHER] |
| 236 | + |
| 237 | + @property |
| 238 | + def router(self) -> BaseRouter: |
| 239 | + return self._router |
| 240 | + |
| 241 | + @router.setter |
| 242 | + def router(self, value: dict | BaseRouter) -> None: |
| 243 | + if value is None: |
| 244 | + # no router; that's fine |
| 245 | + pass |
| 246 | + elif isinstance(value, dict): |
| 247 | + if 'model' not in value: |
| 248 | + raise ValueError(f'Router configuration {value} must contain a "model" key') |
| 249 | + self._router = Router(value['model']) |
| 250 | + self._router.from_dict(value) |
| 251 | + elif isinstance(value, BaseRouter): |
| 252 | + self._router = value |
| 253 | + else: |
| 254 | + raise TypeError(f'Router configuration must be of type "dict" or "BaseRouter". Got {value}') # noqa: E501 |
| 255 | + |
| 256 | + @property |
| 257 | + def base_station(self) -> BaseRouter: |
| 258 | + return self._base_station |
| 259 | + |
| 260 | + @base_station.setter |
| 261 | + def base_station(self, value: dict | BaseRouter) -> None: |
| 262 | + if value is None: |
| 263 | + # no base station; that's fine |
| 264 | + pass |
| 265 | + elif isinstance(value, dict): |
| 266 | + if 'model' not in value: |
| 267 | + raise ValueError(f'Base station configuration {value} must contain a "model" key') |
| 268 | + self._base_station = Router(value['model']) |
| 269 | + self._base_station.from_dict(value) |
| 270 | + elif isinstance(value, BaseRouter): |
| 271 | + self._base_station = value |
| 272 | + else: |
| 273 | + raise TypeError(f'Base station configuration must be of type "dict" or "BaseRouter". Got {value}') # noqa: E501 |
| 274 | + |
| 275 | + @property |
| 276 | + def enable_wireless_watcher(self) -> bool: |
| 277 | + return self._enable_wireless_watcher |
| 278 | + |
| 279 | + @enable_wireless_watcher.setter |
| 280 | + def enable_wireless_watcher(self, value: bool) -> None: |
| 281 | + if not isinstance(value, bool): |
| 282 | + raise TypeError(f'Enable wireless watcher {value} must be of type "bool"') |
| 283 | + self._enable_wireless_watcher = value |
0 commit comments