-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathserial_monitor.py
executable file
·286 lines (237 loc) · 6.69 KB
/
serial_monitor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
#!/usr/bin/env python2
import os
import fnmatch
import glob
import signal
import errno
import serial
class AnsiColors(object):
    """Helpers for wrapping text in ANSI terminal color escape sequences.

    Codes maps a color name to the parameter string that goes between
    the escape introducer and the terminating 'm'."""

    Codes = {
        'normal': '0',
        'black': '0;30',
        'dark_gray': '1;30',
        'red': '0;31',
        'bright_red': '1;31',
        'green': '0;32',
        'bright_green': '1;32',
        'yellow': '0;33',
        'bright_yellow': '1;33',
        'blue': '0;34',
        'bright_blue': '1;34',
        'purple': '0;35',
        'bright_purple': '1;35',
        'cyan': '0;36',
        'bright_cyan': '1;36',
        'bright_gray': '0;37',
        'white': '1;37',
    }

    @classmethod
    def switch(cls, color):
        """Return the escape sequence that switches to the named color.

        Raises KeyError if the color name is not in Codes."""
        return "".join(("\033[", cls.Codes[color], "m"))

    @classmethod
    def embed(cls, color, text):
        """Return text wrapped so it is shown in the named color, with
        the terminal switched back to 'normal' afterwards."""
        start = cls.switch(color)
        reset = cls.switch("normal")
        return start + text + reset
# Attach one classmethod per color name to AnsiColors, so that e.g.
# AnsiColors.red(text) is shorthand for AnsiColors.embed('red', text).
for color in AnsiColors.Codes:
    @classmethod
    def colorizer(cls, text, color = color):
        # The default argument freezes the current loop value; without it
        # every generated method would see the last color of the loop.
        return cls.embed(color, text)
    setattr(AnsiColors, color, colorizer)
class Lock(object):
    """A file in the filesystem that locks a corresponding resource.

    The lock is taken if the file exists. The PID of the process
    holding the lock is written into the file."""

    def __init__(self, path):
        """Remember the path of the lock file; nothing is created yet."""
        self.path = path

    def taken(self):
        """Return True iff lock is already taken."""
        return os.path.exists(self.path)

    def getpid(self):
        """Return the PID currently holding this lock.

        Returns None if this lock is currently not taken. Raises
        ValueError if the lock file does not contain an integer."""
        # Open first and interpret "no such file" as "not taken", rather
        # than checking for existence beforehand: the holder may remove
        # the file between such a check and the read.
        try:
            f = open(self.path)
        except IOError as e:
            if e.errno != errno.ENOENT:
                raise
            return None
        with f:
            return int(f.read().strip())

    def take(self):
        """Take the lock by creating the lock file holding our PID.

        The lock must not already be taken. Raises OSError if another
        process creates the lock file first."""
        assert not self.taken()
        # O_EXCL makes creation atomic, so two processes racing past the
        # check above cannot both end up believing they own the lock.
        fd = os.open(self.path, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o444)
        with os.fdopen(fd, "w") as f:
            f.write("%d\n" % os.getpid())
        # The creation mode is filtered by the umask; set it explicitly.
        os.chmod(self.path, 0o444)

    def release(self):
        """Release the lock by removing the lock file.

        The lock must be held by the current process."""
        assert self.getpid() == os.getpid() # I own this lock
        os.remove(self.path)

    def kill_holder(self):
        """Attempt to kill the holder of this lock, and free it.

        Sends SIGTERM to the PID recorded in the lock file and removes
        the file. Returns True if the lock is free afterwards, False if
        the signal could not be delivered or the file could not be
        removed."""
        pid = self.getpid()
        if pid is None:
            return True # Already released
        assert pid != os.getpid(), "We already hold this lock!"
        try:
            os.kill(pid, signal.SIGTERM)
        except OSError as e:
            # A missing process just means the lock file is stale.
            if e.errno != errno.ESRCH: # No such process
                return False
        # Remove the lock file manually
        try:
            os.remove(self.path)
        except OSError as e:
            # The holder may have removed the file itself while exiting.
            if e.errno != errno.ENOENT: # No such file
                return False
        return True
class SerialPort(object):
    """A serial device node together with the lock file guarding it.

    The lock file path is LockPathPrefix followed by the base name of
    the device path."""

    # Glob patterns of device nodes to offer; their order here is also
    # the order in which ports are sorted (see __cmp__).
    DevPathPatterns = [
        "/dev/ttyACM*",
        "/dev/ttyAMA*",
        "/dev/ttyUSB*",
        "/dev/ttyS*",
    ]

    LockPathPrefix = "/run/lock/LCK.."

    @classmethod
    def enumerate(cls):
        """Yield an instance for every path matching DevPathPatterns."""
        for pattern in cls.DevPathPatterns:
            for path in glob.iglob(pattern):
                yield cls(path)

    def __init__(self, path):
        """Set up the port for the given device path, without opening it."""
        self.path = path
        self.name = os.path.basename(self.path)
        self.lock = Lock(self.LockPathPrefix + self.name)
        self.f = None # The serial.Serial object while the port is open

    def is_open(self):
        """Return True iff this port has been opened through open()."""
        return self.f is not None

    def __repr__(self):
        if self.is_open():
            state = "open"
        elif self.lock.taken():
            state = "locked"
        else:
            state = "available"
        return "<SerialPort %s at %s (%s)>" % (
            self.name, self.path, state)

    def __cmp__(self, other):
        '''Order serial ports based on their corresponding patterns
        in DevPathPatterns, otherwise alphabetically.'''
        patterns = self.DevPathPatterns

        def rank(port):
            # Position of the first pattern matching the port's path;
            # ports matching no pattern share the rank after the last one.
            for position, pattern in enumerate(patterns):
                if fnmatch.fnmatch(port.path, pattern):
                    return position
            return len(patterns)

        mine = (rank(self), self.name)
        theirs = (rank(other), other.name)
        return (mine > theirs) - (mine < theirs)

    def open(self, baudrate):
        """Take the lock and open the device at the given baud rate.

        If the device cannot be opened, the lock is released again
        before the exception is passed on to the caller."""
        assert not self.is_open()
        self.lock.take()
        try:
            port = serial.Serial(self.path, baudrate)
        except BaseException:
            # Do not leave a lock file behind for a port we never got.
            self.lock.release()
            raise
        self.f = port

    def close(self):
        """Close the device and release the lock."""
        assert self.is_open()
        try:
            self.f.close()
        finally:
            # Give the lock back even when closing the device fails.
            self.f = None
            self.lock.release()
def choose_serialport(default = None):
default_is_first_available = default is None
print "Detecting serial ports..."
ports = sorted(SerialPort.enumerate())
print
default_port = None
for i, p in enumerate(ports):
print AnsiColors.white("%3u)" % i), "% 8s" % p.name,
if p.lock.taken():
print AnsiColors.red("(LOCKED)"),
else:
print AnsiColors.green("(available)"),
if (default_is_first_available and not p.lock.taken()) \
or (p.name == default):
print AnsiColors.bright_cyan("(default)"),
default_port = p
default_is_first_available = False
print
print
print "Which serial port to monitor?",
if default_port:
print AnsiColors.bright_cyan("[%s]" % default_port.name),
choice = raw_input(AnsiColors.white("(0-%u): " % (len(ports) - 1)))
if not choice and default_port:
return default_port
try:
return ports[int(choice)]
except:
pass
print
print AnsiColors.bright_red("*** Invalid choice '%s'!" % choice)
print
return choose_serialport(default)
def choose_kill_locker(serial_port):
pid = serial_port.lock.getpid()
print
print AnsiColors.bright_red(
"*** Serial port %s is currently locked by PID %s." % (
serial_port.name, pid))
print
print "Attempt to kill PID %s and claim %s for yourself?" % (
pid, serial_port.name),
choice = raw_input(AnsiColors.white("(y/N): "))
if not choice or choice.lower().startswith("n"):
return False
elif choice.lower().startswith("y"):
if serial_port.lock.kill_holder():
return True
print
print AnsiColors.bright_red("*** Failed to free up lock!")
print
else:
print
print AnsiColors.bright_red("*** Invalid choice '%s'!" % choice)
print
return choose_kill_locker(serial_port)
def choose_baudrate(default = None):
print "Available baud rates:"
baudrates = sorted(filter(lambda b: 9600 <= b <= 115200,
serial.baudrate_constants))
if default is None:
default = baudrates[-1]
print
for i, rate in enumerate(baudrates):
print AnsiColors.white("%3u)" % i), "% 8u" % rate,
if rate == default:
print AnsiColors.bright_cyan("(default)"),
print
print
print "Which baud rate to use?",
if default is not None:
print AnsiColors.bright_cyan("[%u]" % default),
choice = raw_input(AnsiColors.white("(0-%u): " % (
len(baudrates) - 1)))
if not choice and default is not None:
return default
try:
return baudrates[int(choice)]
except:
try:
if int(choice) in baudrates:
return int(choice)
except:
pass
print
print AnsiColors.bright_red("*** Invalid choice '%s'!" % choice)
print
return choose_baudrate(default)
def main(args):
serial_port = None
baudrate = None
# Select serial port
while serial_port is None:
serial_port = choose_serialport()
if serial_port.lock.taken():
if not choose_kill_locker(serial_port):
serial_port = None
# Select baud rate
baudrate = choose_baudrate()
print
print AnsiColors.bright_green("--- Using %s @ %ubps" % (
serial_port.name, baudrate))
print
serial_port.open(baudrate)
try:
while True:
line = serial_port.f.readline().rstrip()
print line
finally:
serial_port.close()
return 0
if __name__ == '__main__':
import sys
try:
sys.exit(main(sys.argv[1:]))
except KeyboardInterrupt:
print "Aborted!"
sys.exit(0)