-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathprocess_data.py
executable file
·73 lines (59 loc) · 2.08 KB
/
process_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
#!/usr/bin/env python3
import argparse
import os
import sys
import time
import serial
# Command-line interface. Every option may be omitted: the serial port is
# auto-detected when not given, and samples are only logged to a file when
# --out-file is supplied.
cli = argparse.ArgumentParser(
    description=(
        'Read a mixed text/sample stream from a serial port, echo the text '
        'to stdout and optionally log the decoded samples to a file.'
    ),
)
cli.add_argument(
    '--serial-port',
    help=(
        'serial device to read from; when omitted, the tty belonging to '
        "the USB interface named 'Black Magic UART Port' is used"
    ),
)
cli.add_argument(
    '--baud-rate',
    type=int,
    default=115200,
    help='serial baud rate (default: %(default)s)',
)
cli.add_argument(
    '--out-file',
    help=(
        'file that receives one "timestamp,value0,value1" line per decoded '
        'sample; nothing is logged when omitted'
    ),
)
args = cli.parse_args()
if args.serial_port is None:
    # No port was given on the command line: scan the USB entries in sysfs
    # for the interface named 'Black Magic UART Port' and use its tty.
    USB_DEVICES = '/sys/bus/usb/devices'
    for device in os.listdir(USB_DEVICES):
        try:
            with open(os.path.join(USB_DEVICES, device, 'interface')) as interface_file:
                if interface_file.read().strip().lower() == 'black magic uart port':
                    break
        except FileNotFoundError:
            # This entry has no 'interface' attribute; keep looking.
            continue
    else:
        # The loop finished without a match. exec replaces this process
        # without flushing Python's stdio buffers, so flush explicitly or the
        # message is lost when stdout is a pipe or a file.
        print('BMP not found in USB devices', flush=True)
        # Hand over to lsusb so the user sees what is actually connected;
        # on success execvp never returns.
        os.execvp('lsusb', ['lsusb'])
    ttys = os.listdir(os.path.join(USB_DEVICES, device, 'tty'))
    if len(ttys) != 1:
        # Exactly one tty is expected under the matched interface.
        print(f'Unexpected interfaces in USB device {device!r}: {ttys}', flush=True)
        os.execvp('lsusb', ['lsusb'])
    args.serial_port = os.path.join('/dev', ttys[0])
# Destination for decoded samples: a text file opened for writing when
# --out-file was supplied, otherwise None (logging disabled).
outfile = None if args.out_file is None else open(args.out_file, 'w')
def convert_ticks_to_volts(ticks):
    """Return *ticks* divided by 1000.

    NOTE(review): the function name implies the result is in volts, i.e.
    that one tick is a millivolt -- confirm against the sender. The original
    code carried the remark "2**12 * 3.3" next to the divisor, presumably an
    alternative 12-bit / 3.3 V scaling; it is not applied here.
    """
    ticks_per_unit = 1000
    return ticks / ticks_per_unit
def convert_to_amps(volts):
    """Convert a sensed voltage into a current value.

    The input is multiplied by 10, divided by 20 and then doubled. The
    doubling is there because the measurement is the average of isensa and
    isensb, so only half of the quantity is seen.
    """
    scaled = volts * 10
    halved = scaled / 20
    # Undo the halving introduced by averaging isensa and isensb.
    return halved * 2
# Main receive loop: read the serial stream one byte at a time, pass ordinary
# bytes through to stdout and decode marked bytes into sample pairs.
print(f'{args.serial_port} @ {args.baud_rate}')
port = serial.Serial(args.serial_port, args.baud_rate)
# 6-bit payloads of the sample frame currently being assembled.
rxbin = []
try:
    while True:
        next_byte = port.read(1)
        if next_byte[0] & 0b11000000 == 0b11000000:
            # Both top bits set marks a sample byte; the low 6 bits are data.
            rxbin.append(next_byte[0] & 0b00111111)
            if len(rxbin) == 4:
                # Four payloads form two 12-bit values, low 6 bits first.
                rxdata0 = rxbin[0] | (rxbin[1] << 6)
                rxdata1 = rxbin[2] | (rxbin[3] << 6)
                if outfile is not None:
                    data0 = convert_to_amps(convert_ticks_to_volts(rxdata0))
                    data1 = convert_to_amps(convert_ticks_to_volts(rxdata1))
                    outfile.write(f'{time.monotonic()},{data0},{data1}\n')
                    # Flush per sample so the log is current if the script
                    # is interrupted.
                    outfile.flush()
                rxbin.clear()
        else:
            # Any other byte aborts a partially received frame and is echoed
            # unchanged to stdout.
            rxbin.clear()
            sys.stdout.buffer.write(next_byte)
            sys.stdout.flush()
finally:
    # The loop has no normal exit; it only ends through an exception such as
    # KeyboardInterrupt or a serial error. Release the port and the log file
    # on the way out, then let the exception propagate as before.
    port.close()
    if outfile is not None:
        outfile.close()