forked from SpectacularAI/u-blox-capture
-
Notifications
You must be signed in to change notification settings - Fork 0
/
ubx_logger.py
70 lines (60 loc) · 2.17 KB
/
ubx_logger.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import argparse
import json
from serial import Serial
from ubxtranslator.ubxtranslator.core import Parser
from ubxtranslator.ubxtranslator.predefined import NAV_CLS
from pandas import Timestamp, Timedelta # Use pandas time objects for nanosecond precision
import os
import threading
import time
# Command-line interface: one positional serial device plus optional
# baudrate (-b), verbosity (-v) and output path (-o) flags.
parser = argparse.ArgumentParser(
    description="Log UBX-NAV-* messages from device to JSONL file",
)
parser.add_argument("device", help="Serial device")
parser.add_argument("-b", type=int, default=460800, help="Baudrate")
parser.add_argument("-v", action="store_true", help="Verbose, prints errors")
parser.add_argument("-o", help="Output file")
def inputThreadFn(aList):
    """Wait for one line on stdin, then mark the shared list as non-empty.

    ``run`` starts this function on a separate thread and polls the same
    list in its recording loop; the list becoming truthy is the stop signal.

    Args:
        aList: Mutable list shared with the caller; ``True`` is appended to
            it once ``input()`` returns.
    """
    _ = input()  # the typed text itself is irrelevant, only the ENTER matters
    aList.append(True)
def parseUBX(payload):
    """Convert a parsed UBX payload into plain, JSON-serializable containers.

    ``payload`` must be namedtuple-like: it exposes ``_fields`` and one
    attribute per field name. Each field value is converted as follows:

    * a namedtuple-like value (anything with ``_fields``) becomes a dict,
      recursively;
    * a list or tuple becomes a list whose items are converted the same way;
    * any other value (int, float, bool, str, ...) is stored unchanged.

    Args:
        payload: Namedtuple-like object with a ``_fields`` attribute.

    Returns:
        dict mapping every field name of ``payload`` to its converted value.

    Raises:
        AttributeError: If ``payload`` itself has no ``_fields`` attribute.
    """

    def _toPlain(value):
        # Duck-type on ``_fields`` instead of comparing ``type(value)`` with
        # ``int``: an exact type comparison treats every non-int scalar
        # (float, bool, ...) and every list as a namedtuple and then fails
        # with AttributeError on ``value._fields``.
        names = getattr(value, "_fields", None)
        if names is not None:
            return {name: _toPlain(getattr(value, name)) for name in names}
        # Checked after ``_fields`` because namedtuples are tuples too.
        # NOTE(review): presumably repeated UBX blocks arrive as lists of
        # namedtuples -- confirm against the ubxtranslator version in use.
        if isinstance(value, (list, tuple)):
            return [_toPlain(item) for item in value]
        return value

    return {field: _toPlain(getattr(payload, field)) for field in payload._fields}
def run(args):
    """Record UBX messages from a serial device into a JSONL file until ENTER.

    Opens ``args.device`` at baudrate ``args.b``, then repeatedly asks the
    UBX parser for the next message and writes one JSON object per line with
    the keys ``type`` (message name), ``payload`` (converted by ``parseUBX``)
    and ``monoTime`` (``time.monotonic()`` taken after the message was
    parsed). Recording stops once the input thread reports that ENTER was
    pressed.

    Args:
        args: Parsed command-line namespace providing ``device``, ``b``
            (baudrate), ``v`` (print per-message errors) and ``o`` (output
            path; when empty a timestamped file under ``./output`` is used).
    """
    if args.o:
        outputFile = args.o
    else:
        outputFile = "./output/ubx-" + Timestamp.now().strftime("%Y-%m-%d-%H-%M-%S") + ".jsonl"

    # Create the directory the file will actually live in, rather than always
    # creating ./output. A bare filename has an empty dirname, and
    # os.makedirs("") would raise, hence the guard.
    outputDir = os.path.dirname(outputFile)
    if outputDir:
        os.makedirs(outputDir, exist_ok=True)

    device = Serial(args.device, args.b, timeout=10)
    try:
        # The output file is opened before the input thread is started: if
        # opening fails, the serial port is still closed by the ``finally``
        # below and no non-daemon thread is left waiting for ENTER.
        with open(outputFile, "w") as writer:
            # Named ubxParser so it does not shadow the module-level argparse
            # ``parser``.
            ubxParser = Parser([NAV_CLS])
            print("Starting to listen for UBX packets")
            print("Press ENTER to stop recording...")
            stopFlag = []
            threading.Thread(target=inputThreadFn, args=(stopFlag,)).start()
            while not stopFlag:
                try:
                    _, msg_name, payload = ubxParser.receive_from(device)
                    entry = {
                        "type": msg_name,
                        "payload": parseUBX(payload),
                        "monoTime": time.monotonic(),
                    }
                    writer.write(json.dumps(entry) + "\n")
                except (ValueError, IOError) as err:
                    # Best effort: a failed message is skipped and the loop
                    # keeps recording; the error is only shown with -v.
                    if args.v:
                        print(err)
    finally:
        device.close()
if __name__ == "__main__":
    # Script entry point: parse the command line, record until the user stops
    # the session, then confirm completion.
    run(parser.parse_args())
    print("Done!")