-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathcontrols.py
More file actions
157 lines (125 loc) · 4.84 KB
/
controls.py
File metadata and controls
157 lines (125 loc) · 4.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import serial
import threading
class Control(object):
def __init__(self, control_id, update_callback):
self.control_id = control_id
self.update_callback = update_callback
class Encoder(Control):
def __init__(self, control_id, update_callback, value=0):
super(Encoder, self).__init__(control_id, update_callback)
self.value = 0
self.update(value)
def update(self, modifier):
self.value += modifier
self.update_callback(self)
class Switch(Control):
def __init__(self, control_id, update_callback, value=False):
super(Switch, self).__init__(control_id, update_callback)
self.update(value)
def update(self, state):
self.value = state
self.update_callback(self)
class Led(object):
def __init__(self, control_id, com, value=False):
self.com = com
self.control_id = control_id
self.update(value)
def update(self, value):
self.value = value
self.com.write('%s%d' % (str(self.control_id), int(self.value)))
class ControlPanel(object):
BAUD_RATE = 9600
def __init__(self, port):
self.com = serial.Serial(
port=port,
baudrate=ControlPanel.BAUD_RATE,
#parity=serial.PARITY_NONE,
#bytesize=serial.EIGHTBITS,
#stopbits=serial.STOPBITS_ONE,
timeout=0.2)
self.encoders = {}
self.switches = {}
self.leds = {}
def stop(self):
self.com.close()
def add_encoder(self, encoder_id, update_callback, value=0):
encoder = Encoder(encoder_id, update_callback, value)
if encoder.control_id in self.encoders:
raise RuntimeError(
'Encoder %s was already added to the control panel' %
str(encoder.control_id))
self.encoders[encoder.control_id] = encoder
def add_switch(self, switch_id, update_callback, value=False):
switch = Switch(switch_id, update_callback, value)
if switch.control_id in self.switches:
raise RuntimeError(
'Switch %s was already added to the control panel' %
str(switch.control_id))
self.switches[switch.control_id] = switch
def add_led(self, led_id, value=False):
led = Led(led_id, self.com, value)
if led.control_id in self.leds:
raise RuntimeError(
'LED %s was already added to the control panel' %
str(led.control_id))
self.leds[led.control_id] = led
def toggle_led(self, led_id):
self.update_led(led_id, not self.leds[led_id].value)
def update_led(self, led_id, value):
self.leds[led_id].update(value)
def update(self):
self.handle_message(self.com.read(2))
def is_encoder_message(self, message):
return message[0].isdigit() and len(message) == 2
def is_switch_message(self, message):
return message[0].isupper() and len(message) == 2
def handle_message(self, message):
if len(message) == 0:
pass # timed out
elif self.is_encoder_message(message):
self.handle_encoder(message)
elif self.is_switch_message(message):
self.handle_switch(message)
else:
vals = [str(ord(c)) for c in message]
hex_message = ' '.join(vals)
raise NotImplementedError('unhandled message %s' % hex_message)
def handle_encoder(self, message):
encoder_id = int(message[0])
if encoder_id not in self.encoders:
raise RuntimeError(
'message received for unhandled encoder %s' %
str(encoder_id))
encoder = self.encoders[encoder_id]
step = 1
if message[1] == 'L':
encoder.update(-step)
elif message[1] == 'R':
encoder.update(step)
else:
raise RuntimeError('malformed encoder message %s' % message)
def handle_switch(self, message):
switch_id = message[0]
if switch_id not in self.switches:
raise RuntimeError(
'message received for unhandled switch %s' %
str(switch_id))
switch = self.switches[switch_id]
if message[1] == '1':
switch.update(True)
elif message[1] == '0':
switch.update(False)
else:
raise RuntimeError('malformed switch message %s' % message)
class ControlPanelThread(threading.Thread):
def __init__(self, control_panel):
super(ControlPanelThread, self).__init__()
self.control_panel = control_panel
self.stopped = True
def run(self):
self.stopped = False
while not self.stopped:
self.control_panel.update()
def stop(self):
self.stopped = True
self.control_panel.stop()