Skip to content

Commit

Permalink
fixing ubx parsing, changing gpsstatus
Browse files Browse the repository at this point in the history
mrosseel committed Jan 28, 2025

Verified

This commit was signed with the committer’s verified signature.
Lyrisbee lyrisbee
1 parent 4efbda6 commit 51432cb
Showing 5 changed files with 138 additions and 108 deletions.
36 changes: 10 additions & 26 deletions python/PiFinder/gps_ubx.py
Original file line number Diff line number Diff line change
@@ -12,41 +12,23 @@
logger = logging.getLogger("GPS")
sats = [0,0]

def is_tpv_accurate(tpv_dict, error_info):
"""
Check the accuracy of the GPS fix
"""
mode = tpv_dict.get("mode")
error_2d = error_info['error_2d']
error_3d = error_info['error_3d']
logger.debug(
"GPS: TPV: mode=%s, ecefpAcc=%s, sep=%s, error_2d=%s, error_3d=%s",
mode,
tpv_dict.get("ecefpAcc", -1),
tpv_dict.get("sep", -1),
error_2d,
error_3d,
)
if mode == 2 and error_2d < 1000:
return True
if mode == 3 and error_3d < 500:
return True
else:
return False
MAX_GPS_ERROR = 50000 # 50 km

async def process_messages(parser, gps_queue, console_queue, error_info):
gps_locked = False
async for msg in parser.parse_messages():
print(msg)
# logging.debug(msg)
if msg.get("class") == "SKY":
logger.debug("GPS: SKY: %s", msg)
if "hdop" in msg:
error_info['error_2d'] = msg["hdop"]
if "pdop" in msg:
error_info['error_3d'] = msg["pdop"]
if "nSat" in msg:
if "nSat" in msg and "uSat" in msg:
sats_seen = msg["nSat"]
sats_used = msg["uSat"]
sats[0] = sats_seen
sats[1] = sats_used
gps_queue.put(("satellites", tuple(sats)))
logger.debug("Number of sats seen: %i", sats_seen)
elif msg.get("class") == "TPV":
@@ -56,8 +38,8 @@ async def process_messages(parser, gps_queue, console_queue, error_info):
sats_used = msg.get("satellites", 0)
gps_queue.put(("satellites", tuple(sats)))
logger.debug("Number of sats used: %i", sats_used)
if "lat" in msg and "lon" in msg and "altHAE" in msg:
if not gps_locked:
if "lat" in msg and "lon" in msg and "altHAE" in msg and "ecefpAcc" in msg:
if not gps_locked and msg["ecefpAcc"] < MAX_GPS_ERROR:
gps_locked = True
console_queue.put("GPS: Locked")
logger.debug("GPS locked")
@@ -66,7 +48,9 @@ async def process_messages(parser, gps_queue, console_queue, error_info):
{
"lat": msg["lat"],
"lon": msg["lon"],
"altitude": msg["altHAE"]
"altitude": msg["altHAE"],
"source": "GPS",
"error_in_m": msg["ecefpAcc"]
}
))
logger.debug("GPS fix: %s", msg)
165 changes: 112 additions & 53 deletions python/PiFinder/gps_ubx_parser.py
Original file line number Diff line number Diff line change
@@ -66,38 +66,52 @@ def _generate_ubx_message(self, msg_class: int, msg_id: int, payload: bytes) ->
for b in msg:
ck_a = (ck_a + b) & 0xFF
ck_b = (ck_b + ck_a) & 0xFF
return b'\xB5\x62' + msg + bytes([ck_a, ck_b])
final_msg = b'\xB5\x62' + msg + bytes([ck_a, ck_b])
logging.debug(f"Generated message class=0x{msg_class:02x}, id=0x{msg_id:02x}, len={len(payload)}, checksum=0x{ck_a:02x}{ck_b:02x}")
return final_msg

async def _handle_initial_messages(self):
if self.writer is None:
return
watch_command = b'?WATCH={"enable":true,"json":false,"raw":2}\n'
self.writer.write(watch_command)
await self.writer.drain()
cfg_msg_sat = self._generate_ubx_message(
UBXClass.CFG, CFGMessageId.MSG,
bytes([UBXClass.NAV, NAVMessageId.SAT, 0x01])
)
self.writer.write(cfg_msg_sat)
await self.writer.drain()
cfg_msg_svinfo = self._generate_ubx_message(
UBXClass.CFG, CFGMessageId.MSG,
bytes([UBXClass.NAV, NAVMessageId.SVINFO, 0x01])
)
self.writer.write(cfg_msg_svinfo)
await self.writer.drain()
poll_sat = self._generate_ubx_message(UBXClass.NAV, NAVMessageId.SAT, b'')
self.writer.write(poll_sat)
await self.writer.drain()
poll_svinfo = self._generate_ubx_message(UBXClass.NAV, NAVMessageId.SVINFO, b'')
self.writer.write(poll_svinfo)

# Watch command needs proper JSON
watch_json = json.dumps({
"enable": True,
"json": False,
"raw": 2,
"binary": True,
"nmea": False,
"scaled": False,
"timing": False,
"split24": False,
"pps": False,
"device": True
}, separators=(',', ':'))
watch_command = f'?WATCH={watch_json}\r\n'
self.writer.write(watch_command.encode())
await self.writer.drain()

async def _poll_messages(self):
while True:
for msg_id in [NAVMessageId.SVINFO, NAVMessageId.TIMEGPS]:
poll_msg = self._generate_ubx_message(UBXClass.NAV, msg_id, b'')
# Use bytes.fromhex to create the wrapped message
prefix = bytes.fromhex('21 31 57 3d') # !1W=
suffix = bytes.fromhex('0d 0a') # \r\n
wrapped_msg = prefix + poll_msg + suffix

logging.debug(f"Raw poll message: {poll_msg.hex()}")
logging.debug(f"Wrapped poll message: {wrapped_msg.hex()}")
self.writer.write(wrapped_msg)
await self.writer.drain()
await asyncio.sleep(1)

@classmethod
async def connect(cls, host='127.0.0.1', port=2947):
reader, writer = await asyncio.open_connection(host, port)
parser = cls(reader=reader, writer=writer)
await parser._handle_initial_messages()
asyncio.create_task(parser._poll_messages()) # Start polling in background
return parser

@classmethod
@@ -110,41 +124,55 @@ async def parse_messages(self):
data = await self.reader.read(1024)
if not data:
break
logging.debug(f"Raw data received ({len(data)} bytes): {data.hex()}")
self.buffer.extend(data)
elif self.file_path:
if not self.buffer:
with open(self.file_path, 'rb') as f:
self.buffer = bytearray(f.read())
else:
await asyncio.sleep(0.1)
else:
break

while True:
while len(self.buffer) >= 6: # Minimum bytes needed to check length
# Find UBX header
start = self.buffer.find(b'\xB5\x62')
if start == -1:
self.buffer = bytearray()
self.buffer.clear()
break

if start > 0:
self.buffer = self.buffer[start:]
start = 0
if len(self.buffer) < 8:
break

# Get message length from header
msg_class = self.buffer[2]
msg_id = self.buffer[3]
length = int.from_bytes(self.buffer[4:6], 'little')
total_length = 8 + length + 2
total_length = 8 + length # header (6) + payload + checksum (2)

# Check if we have the complete message
if len(self.buffer) < total_length:
break
msg_data = bytes(self.buffer[:total_length])
logging.debug(f"Incomplete message: have {len(self.buffer)}, need {total_length}")
break # Wait for more data

# Extract message including checksum
msg_data = self.buffer[:total_length]

# Verify checksum
ck_a = ck_b = 0
for b in msg_data[2:-2]: # Skip header and checksum
ck_a = (ck_a + b) & 0xFF
ck_b = (ck_b + ck_a) & 0xFF

if msg_data[-2] == ck_a and msg_data[-1] == ck_b:
logging.debug(f"Valid message: class=0x{msg_class:02x}, id=0x{msg_id:02x}, len={length}")
parsed = self._parse_ubx(bytes(msg_data))
if parsed.get('class'):
yield parsed
else:
logging.warning(f"Checksum mismatch: expected {ck_a:02x}{ck_b:02x}, got {msg_data[-2]:02x}{msg_data[-1]:02x}")

# Remove processed message from buffer
self.buffer = self.buffer[total_length:]
parsed = self._parse_ubx(msg_data)
if parsed.get('class'):
yield parsed

if self.file_path and not self.buffer:
break
await asyncio.sleep(0)


def _parse_ubx(self, data: bytes) -> dict:
if len(data) < 8:
return {"error": "Invalid UBX message"}
@@ -154,8 +182,10 @@ def _parse_ubx(self, data: bytes) -> dict:
payload = data[6:6+length]
parser = self.message_parsers.get((msg_class, msg_id))
if parser:
logging.debug(f"Found parser for message class=0x{msg_class:02x}, id=0x{msg_id:02x}")
result = parser(payload)
return result
logging.debug(f"No parser found for message class=0x{msg_class:02x}, id=0x{msg_id:02x}")
return {"error": "Unknown message type"}

def _ecef_to_lla(self, x: float, y: float, z: float):
@@ -227,46 +257,75 @@ def _parse_nav_sat(self, data: bytes) -> dict:

def _parse_nav_svinfo(self, data: bytes) -> dict:
if len(data) < 8:
logging.debug(f"SVINFO: Message too short ({len(data)} bytes)")
return {"error": "Invalid payload length"}

logging.debug("Parsing nav-svinfo")
numCh = data[4] # Read single byte for number of channels
numCh = data[4] # First byte after iTOW
globalFlags = data[5]
logging.debug(f"SVINFO: Number of channels: {numCh}, flags: 0x{globalFlags:02x}")

satellites = []
used_sats = 0 # Counter for satellites used in fix

for i in range(numCh):
offset = 8 + (12 * i)
if len(data) < offset + 12:
logging.warning(f"SVINFO: Message truncated at satellite {i}")
break

svid = data[offset]
flags = data[offset + 1]
quality = data[offset + 2]
cno = data[offset + 3]
elev = data[offset + 4]
azim = int.from_bytes(data[offset+6:offset+8], 'little')
satellites.append({
"id": svid,
"signal": cno,
"elevation": elev,
"azimuth": azim,
"used": bool(flags & 0x01)
})

is_used = bool(flags & 0x01) # Check if satellite is used in fix
if is_used:
used_sats += 1

if cno > 0: # Only include satellites with signal
satellites.append({
"id": svid,
"signal": cno,
"elevation": elev,
"azimuth": azim,
"used": is_used,
"quality": quality
})

logging.debug(f"SVINFO: Processed {len(satellites)} visible satellites, {used_sats} used in fix")

return {
"class": "SKY",
"nSat": numCh,
"uSat": sum(1 for sat in satellites if sat["used"]),
"satellites": satellites
"nSat": len(satellites),
"uSat": used_sats, # Return actual count of used satellites
"satellites": sorted(satellites, key=lambda x: x["id"]) # Sort by ID
}

def _parse_nav_timegps(self, data: bytes) -> dict:
if len(data) < 16:
return {"error": "Invalid payload length"}

iTOW = int.from_bytes(data[0:4], 'little')
fTOW = int.from_bytes(data[4:8], 'little')
week = int.from_bytes(data[8:10], 'little', signed=True)
leapS = data[10]
valid = data[11]
tAcc = int.from_bytes(data[12:16], 'little')

gps_epoch = datetime.datetime(1980, 1, 6, tzinfo=datetime.timezone.utc)
tow = iTOW / 1000.0
tow = iTOW / 1000.0 + fTOW * 1e-9 # Combine integer and fractional parts
gps_time = gps_epoch + datetime.timedelta(weeks=week) + datetime.timedelta(seconds=tow)
utc_time = gps_time - datetime.timedelta(seconds=leapS)

return {
"class": "TPV",
"time": utc_time.replace(tzinfo=datetime.timezone.utc)
"time": utc_time.replace(tzinfo=datetime.timezone.utc),
"leapSeconds": leapS,
"valid": bool(valid & 0x01),
"tAcc": tAcc * 1e-9 # Convert to seconds
}

def _parse_nav_dop(self, data: bytes) -> dict:
1 change: 1 addition & 0 deletions python/PiFinder/main.py
Original file line number Diff line number Diff line change
@@ -483,6 +483,7 @@ def main(
location.lat = gps_content["lat"]
location.lon = gps_content["lon"]
location.altitude = gps_content["altitude"]
location.source = gps_content["source"]
location.last_gps_lock = (
datetime.datetime.now().time().isoformat()[:8]
)
Loading

0 comments on commit 51432cb

Please sign in to comment.