-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcomms.py
82 lines (70 loc) · 2.6 KB
/
comms.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import threading, queue, time, struct
from serial import *
class Comms:
def __init__(self, outputQueue=None, controls=None):
port_no = int(input("port no: "))
#self.arduinoSerial = Serial(port=f"/dev/ttyUSB{port_no}", baudrate=9600)
self.arduinoSerial = Serial(port=f"/dev/cu.usbserial-1420", baudrate=115200)
# "/dev/ttyS0"
#"/dev/cu.usbmodem14201"
#eh make these static
self.HEADER = bytes.fromhex("AB")
self.FOOTER = bytes.fromhex("B3")
self.INTFOOTER = 179
self.outputQueue = outputQueue
self.controls = controls
def readOneByte(self):
test = self.arduinoSerial.read()
print(test)
def read(self):
currByte = self.arduinoSerial.read()
footerFound = False
headerFound = False
#print("currByte")
#print(currByte)
while (not headerFound):
if (currByte == self.HEADER):
headerFound = True
returnValue = self.arduinoSerial.read_until(expected=self.FOOTER, size=7)
if (returnValue[-1] == self.INTFOOTER):
footerFound=True
break
currByte = self.arduinoSerial.read()
#print(returnValue)
if (len(returnValue) != 7):
return -1
structValue = struct.unpack("=ccfc", returnValue)
if (headerFound):
if (footerFound):
return structValue
else:
return -1
def write(self, command, params):
#print("writing")
#print(command)
#print(params)
self.arduinoSerial.write(self.HEADER)
self.arduinoSerial.write(command.to_bytes(1, byteorder="big"))
for i in range(len(params)):
self.arduinoSerial.write(params[i].to_bytes(1, byteorder="big"))
self.arduinoSerial.write(self.FOOTER)
def setBaudRate(self, baudrate):
self.arduinoSerial.baudrate = baudrate
def setPort(self, port):
self.arduinoSerial.port = port
def getInfoDict(self):
pass
def commThread(self):
while True:
if (self.arduinoSerial.in_waiting >= 7):
#print("reading sensor input")
self.controls.handleInput(self.read())
if (not self.outputQueue.empty()):
currOutputValue = self.outputQueue.get()
self.write(currOutputValue[0], currOutputValue[1])
def startThread(self):
#print("B")
currThread = threading.Thread(target=self.commThread)
currThread.start()
print("B")
#return currThread