diff --git a/python/PiFinder/gps_pi.py b/python/PiFinder/gps_pi.py index 741b4c7d..e6ed57c1 100644 --- a/python/PiFinder/gps_pi.py +++ b/python/PiFinder/gps_pi.py @@ -6,27 +6,20 @@ import asyncio from PiFinder.multiproclogging import MultiprocLogging -from gpsdclient import GPSDClient import logging logger = logging.getLogger("GPS") -error_2d = 999 -error_3d = 999 - - -def is_tpv_accurate(tpv_dict): +def is_tpv_accurate(tpv_dict, error_info): """ Check the accuracy of the GPS fix """ - global error_2d, error_3d - # get the ecefpAcc if present, else get sep, else use 499 - # error = tpv_dict.get("ecefpAcc", tpv_dict.get("sep", 499)) mode = tpv_dict.get("mode") + error_2d = error_info['error_2d'] + error_3d = error_info['error_3d'] logger.debug( "GPS: TPV: mode=%s, ecefpAcc=%s, sep=%s, error_2d=%s, error_3d=%s", mode, - # error, tpv_dict.get("ecefpAcc", -1), tpv_dict.get("sep", -1), error_2d, @@ -39,86 +32,53 @@ def is_tpv_accurate(tpv_dict): else: return False - -async def aiter_wrapper(sync_iter): - """Wrap a synchronous iterable into an asynchronous one.""" - for item in sync_iter: - yield item - await asyncio.sleep(0) # Yield control to the event loop - - -async def process_sky_messages(client, gps_queue): - sky_stream = client.dict_stream(filter=["SKY"]) - global error_2d, error_3d - async for result in aiter_wrapper(sky_stream): - logger.debug("GPS: SKY: %s", result) - if result["class"] == "SKY": - error_2d = result.get("hdop", 999) - error_3d = result.get("pdop", 999) - if result["class"] == "SKY" and "nSat" in result: - sats_seen = result["nSat"] - sats_used = result["uSat"] - num_sats = (sats_seen, sats_used) - msg = ("satellites", num_sats) - logger.debug("Number of sats seen: %i", sats_seen) - gps_queue.put(msg) - await asyncio.sleep(0) # Yield control to the event loop - - -async def process_reading_messages(client, gps_queue, console_queue, gps_locked): - tpv_stream = client.dict_stream(convert_datetime=True, filter=["TPV"]) - async for result in aiter_wrapper(tpv_stream): - if is_tpv_accurate(result): - # if True: - logger.debug("last reading is %s", result) - if result.get("lat") and result.get("lon") and result.get("altHAE"): - if not gps_locked: - gps_locked = True - console_queue.put("GPS: Locked") - logger.debug("GPS locked") - msg = ( - "fix", - { - "lat": result.get("lat"), - "lon": result.get("lon"), - "altitude": result.get("altHAE"), - }, - ) - logger.debug("GPS fix: %s", msg) - gps_queue.put(msg) - - if result.get("time"): - msg = ("time", result.get("time")) - logger.debug("Setting time to %s", result.get("time")) - gps_queue.put(msg) - await asyncio.sleep(0) # Yield control to the event loop - +async def process_messages(parser, gps_queue, console_queue, error_info): + gps_locked = False + async for msg in parser.parse_messages(): + if msg.get("class") == "SKY": + logger.debug("GPS: SKY: %s", msg) + if "hdop" in msg: + error_info['error_2d'] = msg["hdop"] + if "pdop" in msg: + error_info['error_3d'] = msg["pdop"] + if "nSat" in msg: + sats_seen = msg["nSat"] + sats_used = msg.get("uSat", 0) + gps_queue.put(("satellites", (sats_seen, sats_used))) + logger.debug("Number of sats seen: %i", sats_seen) + elif msg.get("class") == "TPV": + logger.debug("GPS: TPV: %s", msg) + if is_tpv_accurate(msg, error_info): + if "lat" in msg and "lon" in msg and "altHAE" in msg: + if not gps_locked: + gps_locked = True + console_queue.put("GPS: Locked") + logger.debug("GPS locked") + gps_queue.put(( + "fix", + { + "lat": msg["lat"], + "lon": msg["lon"], + "altitude": msg["altHAE"] + } + )) + logger.debug("GPS fix: %s", msg) + if "time" in msg: + gps_queue.put(("time", msg["time"])) + logger.debug("Setting time to %s", msg["time"]) + await asyncio.sleep(0) async def gps_main(gps_queue, console_queue, log_queue): MultiprocLogging.configurer(log_queue) - gps_locked = False + error_info = {'error_2d': 999, 'error_3d': 999} while True: try: - with GPSDClient(host="127.0.0.1") as client: - while True: - logger.debug("GPS waking") - - # Run both functions concurrently - await asyncio.gather( - process_sky_messages(client, gps_queue), - process_reading_messages( - client, gps_queue, console_queue, gps_locked - ), - ) - - logger.debug("GPS sleeping now for 7s") - await asyncio.sleep(7) + parser = await UBXParser.connect(host='127.0.0.1', port=2947) + await process_messages(parser, gps_queue, console_queue, error_info) except Exception as e: logger.error(f"Error in GPS monitor: {e}") - await asyncio.sleep(5) # Wait before attempting to reconnect - + await asyncio.sleep(5) -# To run the GPS monitor def gps_monitor(gps_queue, console_queue, log_queue): asyncio.run(gps_main(gps_queue, console_queue, log_queue)) diff --git a/python/PiFinder/gps_ubx.py b/python/PiFinder/gps_ubx.py new file mode 100644 index 00000000..1e6fd7d0 --- /dev/null +++ b/python/PiFinder/gps_ubx.py @@ -0,0 +1,286 @@ +#!/usr/bin/env python3 + +import json +import re +import math +import logging +import asyncio +from typing import Dict, Callable, Any, Optional, Tuple, List +from dataclasses import dataclass +from enum import IntEnum +import datetime + +logging.basicConfig(level=logging.DEBUG) + +class UBXClass(IntEnum): + NAV = 0x01 + CFG = 0x06 + +class NAVMessageId(IntEnum): + SOL = 0x06 + SVINFO = 0x30 + SAT = 0x35 + TIMEGPS = 0x20 + DOP = 0x04 + +class CFGMessageId(IntEnum): + MSG = 0x01 + RATE = 0x08 + +@dataclass +class ParserConfig: + enable: bool = True + json: bool = False + raw: int = 2 + +class UBXParser: + def __init__( + self, + reader: Optional[asyncio.StreamReader] = None, + writer: Optional[asyncio.StreamWriter] = None, + file_path: Optional[str] = None + ): + self.reader = reader + self.writer = writer + self.file_path = file_path + self.config = ParserConfig() + self.message_parsers: Dict[Tuple[int, int], Callable[[bytes], dict]] = {} + self.buffer = bytearray() + self._initialize_parsers() + + def _initialize_parsers(self): + self._register_parser(UBXClass.NAV, NAVMessageId.SOL, self._parse_nav_sol) + self._register_parser(UBXClass.NAV, NAVMessageId.SAT, self._parse_nav_sat) + self._register_parser(UBXClass.NAV, NAVMessageId.TIMEGPS, self._parse_nav_timegps) + self._register_parser(UBXClass.NAV, NAVMessageId.DOP, self._parse_nav_dop) + self._register_parser(UBXClass.NAV, NAVMessageId.SVINFO, self._parse_nav_svinfo) + + def _register_parser(self, msg_class: UBXClass, msg_id: int, parser: Callable[[bytes], dict]): + self.message_parsers[(msg_class, msg_id)] = parser + + def _generate_ubx_message(self, msg_class: int, msg_id: int, payload: bytes) -> bytes: + msg = bytes([msg_class, msg_id]) + len(payload).to_bytes(2, 'little') + payload + ck_a = ck_b = 0 + for b in msg: + ck_a = (ck_a + b) & 0xFF + ck_b = (ck_b + ck_a) & 0xFF + return b'\xB5\x62' + msg + bytes([ck_a, ck_b]) + + async def _handle_initial_messages(self): + if self.writer is None: + return + watch_command = b'?WATCH={"enable":true,"json":false,"raw":2}\n' + self.writer.write(watch_command) + await self.writer.drain() + cfg_msg_sat = self._generate_ubx_message( + UBXClass.CFG, CFGMessageId.MSG, + bytes([UBXClass.NAV, NAVMessageId.SAT, 0x01]) + ) + self.writer.write(cfg_msg_sat) + await self.writer.drain() + cfg_msg_svinfo = self._generate_ubx_message( + UBXClass.CFG, CFGMessageId.MSG, + bytes([UBXClass.NAV, NAVMessageId.SVINFO, 0x01]) + ) + self.writer.write(cfg_msg_svinfo) + await self.writer.drain() + poll_sat = self._generate_ubx_message(UBXClass.NAV, NAVMessageId.SAT, b'') + self.writer.write(poll_sat) + await self.writer.drain() + poll_svinfo = self._generate_ubx_message(UBXClass.NAV, NAVMessageId.SVINFO, b'') + self.writer.write(poll_svinfo) + await self.writer.drain() + + @classmethod + async def connect(cls, host='127.0.0.1', port=2947): + reader, writer = await asyncio.open_connection(host, port) + parser = cls(reader=reader, writer=writer) + await parser._handle_initial_messages() + return parser + + @classmethod + def from_file(cls, file_path: str): + return cls(file_path=file_path) + + async def parse_messages(self): + while True: + if self.reader: + data = await self.reader.read(1024) + if not data: + break + self.buffer.extend(data) + elif self.file_path: + if not self.buffer: + with open(self.file_path, 'rb') as f: + self.buffer = bytearray(f.read()) + else: + await asyncio.sleep(0.1) + else: + break + + while True: + start = self.buffer.find(b'\xB5\x62') + if start == -1: + self.buffer = bytearray() + break + if start > 0: + self.buffer = self.buffer[start:] + start = 0 + if len(self.buffer) < 8: + break + msg_class = self.buffer[2] + msg_id = self.buffer[3] + length = int.from_bytes(self.buffer[4:6], 'little') + total_length = 8 + length + 2 + if len(self.buffer) < total_length: + break + msg_data = bytes(self.buffer[:total_length]) + self.buffer = self.buffer[total_length:] + parsed = self._parse_ubx(msg_data) + if parsed.get('class'): + yield parsed + if self.file_path and not self.buffer: + break + await asyncio.sleep(0) + + def _parse_ubx(self, data: bytes) -> dict: + if len(data) < 8: + return {"error": "Invalid UBX message"} + msg_class = data[2] + msg_id = data[3] + length = int.from_bytes(data[4:6], 'little') + payload = data[6:6+length] + parser = self.message_parsers.get((msg_class, msg_id)) + if parser: + result = parser(payload) + return result + return {"error": "Unknown message type"} + + def _ecef_to_lla(self, x: float, y: float, z: float): + a = 6378137.0 + f = 1/298.257223563 + b = a * (1 - f) + e = (a**2 - b**2)**0.5 / a + p = (x**2 + y**2)**0.5 + theta = math.atan2(z*a, p*b) + lon = math.atan2(y, x) + lat = math.atan2( + z + e**2 * b * math.sin(theta)**3, + p - e**2 * a * math.cos(theta)**3 + ) + N = a / (1 - e**2 * math.sin(lat)**2)**0.5 + alt = p / math.cos(lat) - N + return { + "latitude": math.degrees(lat), + "longitude": math.degrees(lon), + "altitude": alt + } + + def _parse_nav_sol(self, data: bytes) -> dict: + if len(data) < 52: + return {"error": "Invalid payload length"} + gpsFix = data[10] + ecefX = int.from_bytes(data[12:16], 'little', signed=True) / 100.0 + ecefY = int.from_bytes(data[16:20], 'little', signed=True) / 100.0 + ecefZ = int.from_bytes(data[20:24], 'little', signed=True) / 100.0 + pAcc = int.from_bytes(data[24:28], 'little') / 100.0 + numSV = data[47] + lla = self._ecef_to_lla(ecefX, ecefY, ecefZ) + return { + "class": "TPV", + "mode": gpsFix, + "lat": lla["latitude"], + "lon": lla["longitude"], + "altHAE": lla["altitude"], + "ecefpAcc": pAcc, + "satellites": numSV + } + + def _parse_nav_sat(self, data: bytes) -> dict: + if len(data) < 8: + return {"error": "Invalid payload length"} + numSvs = data[5] + satellites = [] + for i in range(numSvs): + offset = 8 + (12 * i) + if len(data) < offset + 12: + break + gnssId = data[offset] + svId = data[offset + 1] + cno = data[offset + 2] + elev = data[offset + 3] + azim = int.from_bytes(data[offset+4:offset+6], 'little') + flags = data[offset + 11] + satellites.append({ + "id": svId, + "system": gnssId, + "signal": cno, + "elevation": elev, + "azimuth": azim, + "used": bool(flags & 0x08) + }) + return { + "class": "SKY", + "nSat": numSvs, + "uSat": sum(1 for sat in satellites if sat["used"]), + "satellites": satellites + } + + def _parse_nav_svinfo(self, data: bytes) -> dict: + if len(data) < 8: + return {"error": "Invalid payload length"} + numCh = int.from_bytes(data[4:6], 'little') + satellites = [] + for i in range(numCh): + offset = 8 + (12 * i) + if len(data) < offset + 12: + break + svid = data[offset] + flags = data[offset + 1] + cno = data[offset + 3] + elev = data[offset + 4] + azim = int.from_bytes(data[offset+6:offset+8], 'little') + satellites.append({ + "id": svid, + "signal": cno, + "elevation": elev, + "azimuth": azim, + "used": bool(flags & 0x01) + }) + return { + "class": "SKY", + "nSat": numCh, + "uSat": sum(1 for sat in satellites if sat["used"]), + "satellites": satellites + } + + def _parse_nav_timegps(self, data: bytes) -> dict: + if len(data) < 16: + return {"error": "Invalid payload length"} + iTOW = int.from_bytes(data[0:4], 'little') + week = int.from_bytes(data[8:10], 'little', signed=True) + leapS = data[10] + gps_epoch = datetime.datetime(1980, 1, 6, tzinfo=datetime.timezone.utc) + tow = iTOW / 1000.0 + gps_time = gps_epoch + datetime.timedelta(weeks=week) + datetime.timedelta(seconds=tow) + utc_time = gps_time - datetime.timedelta(seconds=leapS) + return { + "class": "TPV", + "time": utc_time.replace(tzinfo=datetime.timezone.utc) + } + + def _parse_nav_dop(self, data: bytes) -> dict: + if len(data) < 18: + return {"error": "Invalid payload length"} + return { + "class": "SKY", + "hdop": int.from_bytes(data[12:14], 'little') * 0.01, + "pdop": int.from_bytes(data[6:8], 'little') * 0.01 + } + +if __name__ == "__main__": + async def test(): + parser = UBXParser.from_file("captured.ubx") + async for msg in parser.parse_messages(): + print(msg) + asyncio.run(test())