Skip to content

Commit

Permalink
Return UBX messages instead of gpsd ones, fix web location setting
Browse files Browse the repository at this point in the history
  • Loading branch information
mrosseel committed Jan 29, 2025
1 parent 51432cb commit 8b84e4b
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 73 deletions.
47 changes: 30 additions & 17 deletions python/PiFinder/gps_ubx.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,39 @@

async def process_messages(parser, gps_queue, console_queue, error_info):
gps_locked = False
got_sat_update = False # Track if we got a NAV-SAT message this cycle

async for msg in parser.parse_messages():
# logging.debug(msg)
if msg.get("class") == "SKY":
logger.debug("GPS: SKY: %s", msg)
if "hdop" in msg:
error_info['error_2d'] = msg["hdop"]
if "pdop" in msg:
error_info['error_3d'] = msg["pdop"]
msg_class = msg.get("class", "")
logger.debug("GPS: %s: %s", msg_class, msg)

if msg_class == "NAV-DOP":
error_info['error_2d'] = msg["hdop"]
error_info['error_3d'] = msg["pdop"]

elif msg_class == "NAV-SAT":
# Preferred satellite info source
got_sat_update = True # Mark that we got NAV-SAT
sats_seen = msg["nSat"]
sats_used = sum(1 for sat in msg.get("satellites", []) if sat.get("used", False))
sats[0] = sats_seen
sats[1] = sats_used
gps_queue.put(("satellites", tuple(sats)))
logger.debug("Number of sats seen: %i, used: %i", sats_seen, sats_used)

elif msg_class == "NAV-SVINFO" and not got_sat_update:
# Fallback satellite info if NAV-SAT not available
if "nSat" in msg and "uSat" in msg:
sats_seen = msg["nSat"]
sats_used = msg["uSat"]
sats[0] = sats_seen
sats[1] = sats_used
gps_queue.put(("satellites", tuple(sats)))
logger.debug("Number of sats seen: %i", sats_seen)
elif msg.get("class") == "TPV":
logger.debug("GPS: TPV: %s", msg)
if "satellites" in msg:
sats[1] = msg["satellites"]
sats_used = msg.get("satellites", 0)
gps_queue.put(("satellites", tuple(sats)))
logger.debug("Number of sats used: %i", sats_used)
if "lat" in msg and "lon" in msg and "altHAE" in msg and "ecefpAcc" in msg:
logger.debug("Number of sats (SVINFO) seen: %i, used: %i", sats_seen, sats_used)
got_sat_update = False # Reset for next cycle

elif msg_class == "NAV-SOL":
if all(k in msg for k in ["lat", "lon", "altHAE", "ecefpAcc"]):
if not gps_locked and msg["ecefpAcc"] < MAX_GPS_ERROR:
gps_locked = True
console_queue.put("GPS: Locked")
Expand All @@ -54,9 +64,12 @@ async def process_messages(parser, gps_queue, console_queue, error_info):
}
))
logger.debug("GPS fix: %s", msg)

elif msg_class == "NAV-TIMEGPS":
if "time" in msg:
gps_queue.put(("time", msg["time"]))
logger.debug("Setting time to %s", msg["time"])

await asyncio.sleep(0)

async def gps_main(gps_queue, console_queue, log_queue):
Expand All @@ -72,4 +85,4 @@ async def gps_main(gps_queue, console_queue, log_queue):
await asyncio.sleep(5)

def gps_monitor(gps_queue, console_queue, log_queue):
asyncio.run(gps_main(gps_queue, console_queue, log_queue))
asyncio.run(gps_main(gps_queue, console_queue, log_queue))
87 changes: 49 additions & 38 deletions python/PiFinder/gps_ubx_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from enum import IntEnum
import datetime

logging.basicConfig(level=logging.DEBUG)
logging.basicConfig(level=logging.INFO)

class UBXClass(IntEnum):
NAV = 0x01
Expand Down Expand Up @@ -67,7 +67,7 @@ def _generate_ubx_message(self, msg_class: int, msg_id: int, payload: bytes) ->
ck_a = (ck_a + b) & 0xFF
ck_b = (ck_b + ck_a) & 0xFF
final_msg = b'\xB5\x62' + msg + bytes([ck_a, ck_b])
logging.debug(f"Generated message class=0x{msg_class:02x}, id=0x{msg_id:02x}, len={len(payload)}, checksum=0x{ck_a:02x}{ck_b:02x}")
# logging.debug(f"Generated message class=0x{msg_class:02x}, id=0x{msg_id:02x}, len={len(payload)}, checksum=0x{ck_a:02x}{ck_b:02x}")
return final_msg

async def _handle_initial_messages(self):
Expand Down Expand Up @@ -100,8 +100,8 @@ async def _poll_messages(self):
suffix = bytes.fromhex('0d 0a') # \r\n
wrapped_msg = prefix + poll_msg + suffix

logging.debug(f"Raw poll message: {poll_msg.hex()}")
logging.debug(f"Wrapped poll message: {wrapped_msg.hex()}")
# logging.debug(f"Raw poll message: {poll_msg.hex()}")
# logging.debug(f"Wrapped poll message: {wrapped_msg.hex()}")
self.writer.write(wrapped_msg)
await self.writer.drain()
await asyncio.sleep(1)
Expand All @@ -124,7 +124,7 @@ async def parse_messages(self):
data = await self.reader.read(1024)
if not data:
break
logging.debug(f"Raw data received ({len(data)} bytes): {data.hex()}")
# logging.debug(f"Raw data received ({len(data)} bytes): {data.hex()}")
self.buffer.extend(data)

while len(self.buffer) >= 6: # Minimum bytes needed to check length
Expand Down Expand Up @@ -158,7 +158,7 @@ async def parse_messages(self):
ck_b = (ck_b + ck_a) & 0xFF

if msg_data[-2] == ck_a and msg_data[-1] == ck_b:
logging.debug(f"Valid message: class=0x{msg_class:02x}, id=0x{msg_id:02x}, len={length}")
# logging.debug(f"Valid message: class=0x{msg_class:02x}, id=0x{msg_id:02x}, len={length}")
parsed = self._parse_ubx(bytes(msg_data))
if parsed.get('class'):
yield parsed
Expand All @@ -182,7 +182,7 @@ def _parse_ubx(self, data: bytes) -> dict:
payload = data[6:6+length]
parser = self.message_parsers.get((msg_class, msg_id))
if parser:
logging.debug(f"Found parser for message class=0x{msg_class:02x}, id=0x{msg_id:02x}")
# logging.debug(f"Found parser for message class=0x{msg_class:02x}, id=0x{msg_id:02x}")
result = parser(payload)
return result
logging.debug(f"No parser found for message class=0x{msg_class:02x}, id=0x{msg_id:02x}")
Expand All @@ -204,32 +204,33 @@ def _ecef_to_lla(self, x: float, y: float, z: float):
}

def _parse_nav_sol(self, data: bytes) -> dict:
if len(data) < 52:
return {"error": "Invalid payload length"}
logging.debug("Parsing nav-sol")
if len(data) < 52:
return {"error": "Invalid payload length for nav-sol"}
gpsFix = data[10]
ecefX = int.from_bytes(data[12:16], 'little', signed=True) / 100.0
ecefY = int.from_bytes(data[16:20], 'little', signed=True) / 100.0
ecefZ = int.from_bytes(data[20:24], 'little', signed=True) / 100.0
pAcc = int.from_bytes(data[24:28], 'little') / 100.0
numSV = data[47]
lla = self._ecef_to_lla(ecefX, ecefY, ecefZ)
return {
"class": "TPV",
result = {
"class": "NAV-SOL",
"mode": gpsFix,
"lat": lla["latitude"],
"lon": lla["longitude"],
"altHAE": lla["altitude"],
"ecefpAcc": pAcc,
"satellites": numSV
}
logging.debug(f"NAV-SOL result: {result}")
return result

def _parse_nav_sat(self, data: bytes) -> dict:
if len(data) < 8:
return {"error": "Invalid payload length"}
logging.debug("Parsing nav-sat")
if len(data) < 8:
return {"error": "Invalid payload length for nav-sat"}
numSvs = data[5]
print(f"{numSvs=}")
satellites = []
for i in range(numSvs):
offset = 8 + (12 * i)
Expand All @@ -249,24 +250,26 @@ def _parse_nav_sat(self, data: bytes) -> dict:
"azimuth": azim,
"used": bool(flags & 0x08)
})
return {
"class": "SKY",
result = {
"class": "NAV-SAT",
"nSat": sum(1 for sat in satellites),
"satellites": satellites
}
logging.debug(f"NAV-SAT result: {result}")
return result

def _parse_nav_svinfo(self, data: bytes) -> dict:
logging.debug("Parsing nav-svinfo")
if len(data) < 8:
logging.debug(f"SVINFO: Message too short ({len(data)} bytes)")
return {"error": "Invalid payload length"}
return {"error": "Invalid payload length for nav-svinfo"}

logging.debug("Parsing nav-svinfo")
numCh = data[4] # First byte after iTOW
numCh = data[4]
globalFlags = data[5]
logging.debug(f"SVINFO: Number of channels: {numCh}, flags: 0x{globalFlags:02x}")

satellites = []
used_sats = 0 # Counter for satellites used in fix
used_sats = 0

for i in range(numCh):
offset = 8 + (12 * i)
Expand All @@ -281,11 +284,11 @@ def _parse_nav_svinfo(self, data: bytes) -> dict:
elev = data[offset + 4]
azim = int.from_bytes(data[offset+6:offset+8], 'little')

is_used = bool(flags & 0x01) # Check if satellite is used in fix
is_used = bool(flags & 0x01)
if is_used:
used_sats += 1

if cno > 0: # Only include satellites with signal
if cno > 0:
satellites.append({
"id": svid,
"signal": cno,
Expand All @@ -294,48 +297,56 @@ def _parse_nav_svinfo(self, data: bytes) -> dict:
"used": is_used,
"quality": quality
})

logging.debug(f"SVINFO: Processed {len(satellites)} visible satellites, {used_sats} used in fix")

return {
"class": "SKY",
result = {
"class": "NAV-SVINFO",
"nSat": len(satellites),
"uSat": used_sats, # Return actual count of used satellites
"satellites": sorted(satellites, key=lambda x: x["id"]) # Sort by ID
"uSat": used_sats,
"satellites": sorted(satellites, key=lambda x: x["id"])
}
logging.debug(f"NAV-SVINFO result: {result}")
return result

def _parse_nav_timegps(self, data: bytes) -> dict:
logging.debug("Parsing nav-timegps")
if len(data) < 16:
return {"error": "Invalid payload length"}

return {"error": "Invalid payload length for nav-timegps"}
iTOW = int.from_bytes(data[0:4], 'little')
fTOW = int.from_bytes(data[4:8], 'little')
week = int.from_bytes(data[8:10], 'little', signed=True)
leapS = data[10]
valid = data[11]
tAcc = int.from_bytes(data[12:16], 'little')

gps_epoch = datetime.datetime(1980, 1, 6, tzinfo=datetime.timezone.utc)
tow = iTOW / 1000.0 + fTOW * 1e-9 # Combine integer and fractional parts
tow = iTOW / 1000.0 + fTOW * 1e-9
gps_time = gps_epoch + datetime.timedelta(weeks=week) + datetime.timedelta(seconds=tow)
utc_time = gps_time - datetime.timedelta(seconds=leapS)

return {
"class": "TPV",
result = {
"class": "NAV-TIMEGPS",
"time": utc_time.replace(tzinfo=datetime.timezone.utc),
"leapSeconds": leapS,
"valid": bool(valid & 0x01),
"tAcc": tAcc * 1e-9 # Convert to seconds
"tAcc": tAcc * 1e-9
}
logging.debug(f"NAV-TIMEGPS result: {result}")
return result

def _parse_nav_dop(self, data: bytes) -> dict:
logging.debug("Parsing nav-dop")
if len(data) < 18:
return {"error": "Invalid payload length"}
return {
"class": "SKY",
return {"error": "Invalid payload length for nav-dop"}
result = {
"class": "NAV-DOP",
"hdop": int.from_bytes(data[12:14], 'little') * 0.01,
"pdop": int.from_bytes(data[6:8], 'little') * 0.01
}
logging.debug(f"NAV-DOP result: {result}")
return result


if __name__ == "__main__":
Expand Down
38 changes: 20 additions & 18 deletions python/PiFinder/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -480,25 +480,27 @@ def main(
# logger.debug("GPS fix msg: %s", gps_content)
if gps_content["lat"] + gps_content["lon"] != 0:
location: Location = shared_state.location()
location.lat = gps_content["lat"]
location.lon = gps_content["lon"]
location.altitude = gps_content["altitude"]
location.source = gps_content["source"]
location.last_gps_lock = (
datetime.datetime.now().time().isoformat()[:8]
)
if not location.lock:
# Write to config if we just got a lock
location.timezone = tz_finder.timezone_at(
lat=location.lat, lng=location.lon
)
# cfg.set_option("last_location", location)
console.write(
f'GPS: Location {location.lat} {location.lon} {location.altitude}'
if location.source != "WEB":
location.lat = gps_content["lat"]
location.lon = gps_content["lon"]
location.altitude = gps_content["altitude"]
location.source = gps_content["source"]
location.error_in_m = gps_content["error_in_m"]
location.last_gps_lock = (
datetime.datetime.now().time().isoformat()[:8]
)
location.lock = True

shared_state.set_location(location)
if not location.lock:
# Write to config if we just got a lock
location.timezone = tz_finder.timezone_at(
lat=location.lat, lng=location.lon
)
# cfg.set_option("last_location", location)
console.write(
f'GPS: Location {location.lat} {location.lon} {location.altitude}'
)
location.lock = True

shared_state.set_location(location)
if gps_msg == "time":
# logger.debug("GPS time msg: %s", gps_content)
gps_dt = gps_content
Expand Down
1 change: 1 addition & 0 deletions python/PiFinder/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,7 @@ def gps_lock(lat: float = 50, lon: float = 3, altitude: float = 10):
"lat": lat,
"lon": lon,
"altitude": altitude,
"error_in_m": 0,
"source": "WEB",
},
)
Expand Down

0 comments on commit 8b84e4b

Please sign in to comment.