-
Notifications
You must be signed in to change notification settings - Fork 0
/
dce_control.py
368 lines (277 loc) · 13 KB
/
dce_control.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
#!/usr/bin/env python
"""
A module written to generate commands to control the Blue Canyon Technologies D.C.E. and execute the commands on the device.
Attributes:
universal - an array to store commonly used values such as DCE memory addresses
gSerialPort - an object which stores an opened connection to the serial port
"""
import serial
from Logs.log import log
from Logs.errors import ERROR
from time import sleep
from decimal import *
__author__ = "Simon Kowerski"
__credits__ = ["Andrew Yu", "Simon Kowerski"]
__creation_date__ = "2023"

# Handle to the opened serial connection; None while no port is open
gSerialPort = None

# Lookup table of the byte values and scale factors used to build DCE commands.
# Every entry is a list so the byte sequences can be copied/extended directly.
universal = {
    # First byte of a write command and of a read command
    "WRITE_HEADER": [0xEB],
    "READ_HEADER": [0xED],

    # Address where written information is stored
    "WRITE_ADDR": [0x00, 0x00],

    # Read targets, laid out as | Address (2 bytes) | Length (2 bytes) |
    "SPEED_ADDR": [0x04, 0x69, 0x00, 0x10],
    "TORQUE_ADDR": [0x04, 0x89, 0x00, 0x10],

    # Conversion factors applied to values being WRITTEN
    "SPEED_WRITE_CONV": [0.2],
    "TORQUE_WRITE_CONV": [1e-8],

    # Conversion factors applied to values being READ
    "SPEED_READ_CONV": [0.002],
    "TORQUE_READ_CONV": [1e-8],
}
def startup():
    """
    Opens the serial connection and reads from the HR_RUN_COUNT register to ensure that the DCE is running
    Causes the thread to sleep for .6 seconds

    Does nothing if a connection is already stored in gSerialPort.
    If the HR_RUN_COUNT check fails, the port is closed and gSerialPort is reset to None,
    so that a later call to startup() performs the whole check again instead of returning early.

    Raises:
        ERROR(1310): the serial port could not be opened
        ERROR(1311): the read failed or fewer than 4 bytes were returned
        ERROR(1303): raised by _send_command if the request could not be written
    """
    global gSerialPort

    # already started - nothing to do
    if gSerialPort:
        return

    # opens serial connection
    try:
        port = serial.Serial("/dev/ttyS0", 115200, timeout=1)
    except Exception as err:
        raise ERROR(1310) from err
    if not port:
        raise ERROR(1310)

    # _send_command talks through the module-level handle, so publish it now
    gSerialPort = port
    log(310)
    sleep(.6)

    verified = False
    try:
        # attempts to read from HR_RUN_COUNT
        # NOTE(review): this request starts with 0xec while universal["READ_HEADER"] is 0xED,
        # and only 4 bytes are read back - confirm against the DCE command table
        _send_command("0xec 0x04 0xfe 0x00 0x04")
        try:
            val = port.read(4)
        except Exception as err:
            raise ERROR(1311, "failed to read from DCE") from err
        if len(val) != 4:
            raise ERROR(1311, "DCE failed to supply correct values from HR_RUN_COUNT")
        verified = True
    finally:
        if not verified:
            # do not leave a half-initialised connection behind: a retry would
            # otherwise hit the early return above and skip the health check
            gSerialPort = None
            try:
                port.close()
            except Exception:
                # best-effort cleanup; the original failure is the one to report
                pass

    log(311)
def convert_to_hex(wheel_rate,length, conversion_factor=1.0):
    """
    Converts a wheel_rate into its corresponding hex value

    The rate is divided by the conversion factor, rounded to the nearest integer count and
    encoded as a two's complement number that is `length` hex digits wide.
    Counts that do not fit in `length` digits wrap around (modulo 16**length); callers are
    expected to range-check the rate before converting.

    Args:
        wheel_rate (float): speed OR torque of the wheel
        length (int): the number of hex digits in the output (leading zeros will be added accordingly)
        conversion_factor (float): EU conversion factor as specified on the DCE command table
    Ret:
        str: the resulting lowercase hexadecimal expression (excluding the leading 0x)
    """
    modulus = 1 << (4 * length)

    # round() instead of int(): float division is inexact (0.6 / 0.2 evaluates to
    # 2.9999999999999996), and truncating would send a count that is one too small
    raw_count = round(wheel_rate / conversion_factor)

    # the modulo maps negative counts onto their two's complement representation
    return format(raw_count % modulus, "x").zfill(length)
def convert_from_hex(parameter_hex:str, conversion_factor=1.0):
    """
    Converts a hex value read from the DCE to the corresponding scaled value

    The hex digits are interpreted as a two's complement number whose width is the number of
    digits supplied (4 bits per digit), then multiplied by the conversion factor.

    Args:
        parameter_hex (str): a hex value formatted like 0xc0ffee OR c0ffee which obeys two's complement
        conversion_factor (float): EU conversion factor as specified on the DCE command table
    Ret:
        float: the signed value of the hex expression multiplied by conversion_factor
    Raises:
        ValueError: if parameter_hex contains no valid hex digits
    """
    # drop an optional 0x prefix so it is neither counted as a digit nor
    # mistaken for the sign-carrying leading nibble
    digits = parameter_hex
    if digits[:2].lower() == "0x":
        digits = digits[2:]

    value = int(digits, 16)
    nbits = 4 * len(digits)

    # obey two's complement: a set top bit means the value is negative
    if value >= (1 << (nbits - 1)):
        value -= (1 << nbits)

    return value * conversion_factor
#FIXME: When writing to one wheel, read from DCE to fill in missing values
def set_wheel_torque(wheel_num:int, wheel_rate:float):
    """
    Builds the 'SetWheelTorque32' command for one or all reaction wheels and sends it to the DCE.

    wheel_num must lie between 0 and 4 (inclusive); 0 applies the torque to all 4 wheels.
    wheel_rate must lie between -0.0055 and +0.0055 (Nm); the bounds themselves are accepted.
    When a single wheel is selected, the data bytes of the other wheels are left at 0x00.

    Args:
        wheel_num (int): The wheel to be selected
        wheel_rate (float): The torque to be applied to the wheel
    Returns:
        str: The command that was sent to the DCE - formatted 0xAB 0xCD 0xEF .....
    Raises:
        ERROR(1300): the torque or the wheel number is outside the accepted range
    """
    log(300)

    # header, write address, payload length (19 bytes) and the command id with its wheel selector
    packet = [*universal["WRITE_HEADER"], *universal["WRITE_ADDR"]]
    packet += [0x00, 0x13]
    packet += [0x07, 0x0D, wheel_num]

    # reject requests outside the safe operating bounds
    if wheel_rate>0.0055 or wheel_rate<-0.0055:
        raise ERROR(1300, f"requested wheel torque is out of operational limits - {wheel_rate}")
    if wheel_num > 4 or wheel_num < 0:
        raise ERROR(1300, f"wheel selected improperly - {wheel_num}")

    # EU conversion, then split the 8 hex digits into 4 byte values
    torque_hex = convert_to_hex(wheel_rate, 8, universal["TORQUE_WRITE_CONV"][0])
    hex_pairs = (torque_hex[:2], torque_hex[2:4], torque_hex[4:6], torque_hex[6:])
    torque_bytes = [int(pair, 16) for pair in hex_pairs]

    # 4 bytes per wheel: | Wheel 1 | Wheel 2 | Wheel 3 | Wheel 4 |
    wheelSet = [0x00] * 16
    slots = range(4) if wheel_num == 0 else (wheel_num - 1,)
    for slot in slots:
        for offset, value in enumerate(torque_bytes):
            wheelSet[4 * slot + offset] = value
    packet += wheelSet

    ret = " ".join('0x{:02X}'.format(value) for value in packet)
    _send_command(ret)
    return ret
#FIXME: When writing to one wheel, read from DCE to fill in missing values
def set_wheel_speed(wheel_num:int, wheel_rate:float):
    """
    Builds the 'SetWheelSpeed4' command for one or all reaction wheels and sends it to the DCE.

    wheel_num must lie between 0 and 4 (inclusive); 0 applies the speed to all 4 wheels.
    wheel_rate must lie between -6553.4 and +6553.4 (RPM); the bounds themselves are accepted.
    When a single wheel is selected, the data bytes of the other wheels are left at 0x00.

    Args:
        wheel_num (int): The wheel to be selected
        wheel_rate (float): The speed to set each wheel to
    Returns:
        str: The command that was sent to the DCE - formatted 0xAB 0xCD 0xEF .....
    Raises:
        ERROR(1301): the speed or the wheel number is outside the accepted range
    """
    log(301)

    # header, write address, payload length (11 bytes) and the command id with its wheel selector
    packet = [*universal["WRITE_HEADER"], *universal["WRITE_ADDR"]]
    packet += [0x00, 0x0B]
    packet += [0x07, 0x0A, wheel_num]

    # reject requests outside the safe operating bounds
    if wheel_rate>6553.4 or wheel_rate<-6553.4:
        raise ERROR(1301, f"requested wheel rate is out of operational limits - {wheel_rate}")
    if wheel_num > 4 or wheel_num < 0:
        raise ERROR(1301, f"wheel selected incorrectly - {wheel_num}")

    # EU conversion, then split the 4 hex digits into 2 byte values
    speed_hex = convert_to_hex(wheel_rate, 4, universal["SPEED_WRITE_CONV"][0])
    speed_bytes = [int(speed_hex[:2], 16), int(speed_hex[2:], 16)]

    # 2 bytes per wheel: | Wheel 1 | Wheel 2 | Wheel 3 | Wheel 4 |
    wheelSet = [0x00] * 8
    slots = range(4) if wheel_num == 0 else (wheel_num - 1,)
    for slot in slots:
        for offset, value in enumerate(speed_bytes):
            wheelSet[2 * slot + offset] = value
    packet += wheelSet

    ret = " ".join('0x{:02X}'.format(value) for value in packet)
    _send_command(ret)
    return ret
def read_data(read_type:str):
    """
    Generates and sends a code to the DCE to read speed or torque data

    The reply is expected to hold 6 header bytes, the data bytes and 1 trailing CRC8 byte.
    The data bytes are split evenly between the 4 wheels and each part is converted with
    the matching READ conversion factor.

    Args:
        read_type (str): Type of data to be read. MUST be either 'SPEED' or 'TORQUE' (case-insensitive)
    Returns:
        list[float]: the 4 speed or torque values collected, after conversion\n
        str: the exact data collected from the DCE - formatted 0xAB 0xCD 0xEF .....\n
        str: The command that was sent to the DCE - formatted 0xAB 0xCD 0xEF .....\n
    Raises:
        ERROR(1302): the serial port is not open or read_type is not recognized
        ERROR(1303): raised by _send_command if the request could not be written
        ERROR(1304): reading from the serial port failed
        ERROR(1305): the reply had the wrong length or failed verification
    """
    global gSerialPort
    log(302)

    # ensure serial connection is open
    if not gSerialPort:
        raise ERROR(1302, f"serial port not open")

    # Ensures user is requesting either speed or torque data from the DCE
    read_type = read_type.upper()
    if read_type not in ("SPEED", "TORQUE"):
        raise ERROR(1302, f"data type {read_type} not recognized")

    # read header followed by the address AND length to read
    address = universal[f"{read_type}_ADDR"]
    command = universal["READ_HEADER"] + address
    ret_command = " ".join('0x{:02X}'.format(value) for value in command)
    _send_command(ret_command)

    # bytes expected back: 6 for the header, the DATA, 1 for the CRC8
    data_length = address[3]
    read_length = 6 + 1 + data_length
    try:
        actual = gSerialPort.read(read_length)
    except Exception as err:
        raise ERROR(1304, f"failed to read from serial port - {ret_command}") from err

    # verify the correct output is received
    if len(actual) != read_length:
        raise ERROR(1305, f"did not recieve the correct number of bytes - Requested: {read_length} byes - Recieved: {len(actual)} btyes - {ret_command}")
    actual = bytes.hex(actual)
    if not verify_output(actual):
        raise ERROR(1305)

    # Hex characters per wheel: 2 characters per data byte, shared by 4 wheels.
    # Floor division keeps this an int - slice indices must be integers.
    len_per = data_length // 2
    conversion = universal[f"{read_type}_READ_CONV"][0]
    data_start = 12  # the 6 header bytes occupy the first 12 hex characters
    wheel_arr = [
        convert_from_hex(actual[data_start + len_per * i:data_start + len_per * (i + 1)], conversion)
        for i in range(4)
    ]

    ret_actual = " ".join(f"0x{actual[i:i + 2].upper()}" for i in range(0, len(actual) - 1, 2))
    log(304, f"- Output: {ret_actual}")
    return wheel_arr, ret_actual, ret_command
def verify_output(input_data:str):
    """
    Determines whether or not the output string generated by the DCE was correct

    An 8-bit CRC (start value 0xFF, polynomial constant 0x1070 << 3) is computed over every
    byte except the last one and compared with the last byte of the input.

    Args:
        input_data (str): the data read from the DCE as hex digits, not separated by spaces, excluding the leading '0x'
    Returns:
        bool: True if the computed CRC matches the trailing byte, False if not
    Raises:
        ValueError: if a payload byte is not valid hex
    """
    POLYNOMIAL = (0x1070 << 3)
    crc = 0xFF

    # every pair of hex characters except the trailing CRC byte
    payload_bytes = (len(input_data) - 2) // 2
    for j in range(payload_bytes):
        data = (crc ^ int(input_data[2 * j:2 * (j + 1)], 16)) << 8
        for _ in range(8):
            if data & 0x8000:
                data ^= POLYNOMIAL
            data <<= 1
        crc = data >> 8

    # zero-pad to two digits: hex() drops the leading zero, so CRC values
    # below 0x10 would never match the two-character trailer
    return format(crc, "02X") == input_data[-2:].upper()
def _send_command(hex_code:str):
    """
    Writes a command to the DCE over the open serial connection

    Args:
        hex_code (str): the entire command, as hex values separated by single spaces - formatted 0xAB 0xCD 0xEF ....
    Raises:
        ERROR(1303): the serial port is not open or the write failed
        ValueError: if a value in hex_code is not valid hex or does not fit in one byte
    """
    global gSerialPort

    # nothing can be sent without an open connection
    if not gSerialPort:
        raise ERROR(1303, f"serial port not open")

    # one byte per space-separated hex value
    payload = bytearray(int(token, 16) for token in hex_code.split(" "))

    try:
        gSerialPort.write(payload)
    except Exception:
        raise ERROR(1303, f"failed to write to serial port at /dev/ttyS0 - {hex_code}")
    else:
        log(303, f"- {hex_code}")