-
Notifications
You must be signed in to change notification settings - Fork 2
/
read.py
executable file
·245 lines (201 loc) · 8.52 KB
/
read.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (C) 2018 Matthias Kolja Miehl
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
DESCRIPTION: Python script for reading a line via UART and
appending it to a CSV file along with a timestamp
UPDATES : https://github.com/makomi/uart2csv
"""
# -----------------------------------------------------------------------------
# include libraries and set defaults
# -----------------------------------------------------------------------------
import csv
import operator
import os
from datetime import datetime

import serial
import serial.tools.list_ports
folder_output = "csv"  # folder (relative path) that receives the CSV files
#file_cfg = "settings.cfg"
# -----------------------------------------------------------------------------
# settings (change this as required)
# -----------------------------------------------------------------------------
serial_baud_rate = 115200
serial_timeout_read = 0.05  # number of seconds after which we consider the serial read operation to have failed
serial_timeout_msg = "--READ-TIMEOUT--"  # shown instead of a device ID when nothing was read
serial_too_short_msg = "--ADDR-TOO-SHORT: "  # prefix shown in front of a response that is too short
serial_cmd = "\r"  # characters sent to request the device ID
length_device_id = 16  # number of characters a valid device ID must have
# -----------------------------------------------------------------------------
# global variables
# -----------------------------------------------------------------------------
# A `global` statement at module level has no effect, so the shared state is
# created here with explicit placeholders. The functions below rebind these
# names through their own `global` declarations.
selected_port = None      # serial port that will be used
operator_initials = None  # used to identify the operator in the CSV file log
uart = None               # serial port object
file_csv = None           # file object for the CSV file
serial_read_ok = False    # 'True' if we read what we expected
device_id = None          # text produced by the most recent read attempt
user_input = None         # most recent line typed by the operator in the main loop
# -----------------------------------------------------------------------------
# helper functions
# -----------------------------------------------------------------------------
def mkdir(folder_name):
    """Create the folder `folder_name` (and missing parents) unless it exists.

    An OSError raised while creating the folder is passed on to the caller
    only if the folder still does not exist afterwards.
    """
    if os.path.isdir(folder_name):
        return
    try:
        os.makedirs(folder_name)
    except OSError:
        # the folder may have appeared in the meantime; that is not an error
        folder_exists_now = os.path.isdir(folder_name)
        if not folder_exists_now:
            raise
def get_available_serial_ports():
    """Return the usable serial ports reported by pyserial.

    Entries whose third field equals 'n/a' are dropped. The remaining entries
    are sorted on their second field, which select_a_serial_port() displays
    as the port description.
    """
    usable_ports = []
    for candidate in serial.tools.list_ports.comports():
        if candidate[2] != 'n/a':
            usable_ports.append(candidate)
    return sorted(usable_ports, key=lambda entry: entry[1])
def select_a_serial_port(available_ports): # TODO: check file_cfg for preselected serial port
    """Pick the serial port to use and store its name in the global `selected_port`.

    available_ports -- sequence of 3-element entries; the first element is the
                       port name, the second its description, the third is
                       ignored here

    With no entry the program exits with status -1. With exactly one entry
    that port is used without asking. Otherwise the operator is prompted
    until a valid item number is entered; anything that is not a number in
    range (including non-numeric text) is reported as invalid and the prompt
    is repeated.
    """
    global selected_port
    if not available_ports:  # list is empty -> exit
        print("[!] No suitable serial port found.")
        # SystemExit is what exit() raises; raising it directly does not
        # depend on the interactive helper installed by the site module
        raise SystemExit(-1)
    if len(available_ports) == 1:  # only one port available
        (selected_port, _, _) = available_ports[0]
        print("[+] Using only available serial port: %s" % selected_port)
        return
    # let user choose a port
    while True:
        print("[+] Select one of the available serial ports:")
        for item, (port, desc, _) in enumerate(available_ports, 1):
            print (" (%d) %s \"%s\"" % (item, port, desc))
        try:
            selected_item = int(raw_input(">>> "))
        except ValueError:
            # not a number: 0 is never a valid item, so the range check
            # below reports the input as invalid and the loop asks again
            selected_item = 0
        # check if a valid item was selected
        if 0 < selected_item <= len(available_ports):
            (selected_port, _, _) = available_ports[selected_item - 1]
            return
        print("[!] Invalid serial port.\n")
def open_selected_serial_port():
    """Open the port named by the global `selected_port`.

    The port is opened at `serial_baud_rate` with 8 data bits, no parity,
    one stop bit and a read timeout of `serial_timeout_read` seconds. The
    resulting object is stored in the global `uart`. If pyserial reports a
    SerialException the program exits with status -1.
    """
    global uart
    line_format = dict(
        timeout=serial_timeout_read,
        bytesize=serial.EIGHTBITS,
        parity=serial.PARITY_NONE,
        stopbits=serial.STOPBITS_ONE,
    )
    try:
        uart = serial.Serial(selected_port, serial_baud_rate, **line_format)
    except serial.SerialException:
        print("[!] Unable to open %s." % selected_port)
        exit(-1)
    else:
        print("[+] Successfully connected.")
def set_operator_initials():
    """Ask for the operator's initials and store them in `operator_initials`.

    An empty answer is stored as "n/a".
    """
    global operator_initials
    print("\n[+] Operator's initials:")
    answer = raw_input(">>> ")
    # make it obvious that the operator did not provide initials
    operator_initials = answer if answer else "n/a"
def create_csv_file():
    """Create a new CSV file named after the current time and keep it open.

    The output folder `folder_output` is created first if necessary. The
    file is named "<YYYY-MM-DD HH-MM-SS>.csv" and its file object is stored
    in the global `file_csv`.
    """
    global file_csv  # file object for CSV file
    mkdir(folder_output)  # create the output folder for the CSV files if it does not already exist
    file_name = "%s.csv" % datetime.now().strftime("%Y-%m-%d %H-%M-%S")
    # buffering=1 selects line buffering: every row ends with a newline, so
    # each row is flushed as soon as it is written and is not lost if the
    # program dies before the file is closed
    file_csv = open(os.path.join(folder_output, file_name), 'w+', 1)
def print_usage_guide():
    """Tell the operator how to trigger a read and how to quit."""
    usage_lines = (
        "\nPress ENTER to read a line from the serial port.",
        "Press 'q' and ENTER to exit.",
    )
    for text in usage_lines:
        print(text)
def check_for_exit_condition():
    """Exit the program after releasing all resources if the operator typed 'q'.

    Reads the global `user_input`; for any value other than "q" the function
    returns without doing anything. Otherwise it closes the serial port
    `uart` and the CSV file `file_csv`, reporting the outcome of each step.

    Raises SystemExit with status 0 only if BOTH resources were closed
    successfully, and with status -1 if either close failed.
    """
    if user_input != "q":
        return
    # close serial port
    port_closed = False
    try:
        uart.close()
    except serial.SerialException:
        print("[!] Unable to close %s." % selected_port)
    else:
        print("[+] Closed %s." % selected_port)
        port_closed = True
    # close file; the file is still attempted when the port failed to close
    file_closed = False
    try:
        file_csv.close()
    except Exception:
        # best effort on the way out, but unlike a bare `except:` this does
        # not swallow KeyboardInterrupt or SystemExit
        print("[!] Unable to close CSV file.")
    else:
        print("[+] Closed CSV file.")
        file_closed = True
    # exit
    raise SystemExit(0 if (port_closed and file_closed) else -1)
def get_device_id():
    """Request the device ID via the serial port and evaluate the response.

    Sends `serial_cmd` on `uart`, reads one line and stores the outcome in the
    globals `device_id` and `serial_read_ok`
    (expected response: "<16 character device ID>\\n"):

    * nothing received          -> device_id = serial_timeout_msg
    * bytes that are not ASCII  -> device_id = "--NON-ASCII-DATA: " + repr of
                                   the raw bytes
    * fewer than `length_device_id` characters, or a CR/LF inside the first
      `length_device_id` characters
                                -> device_id = serial_too_short_msg followed
                                   by the quoted response, CR/LF shown as
                                   "<CR>"/"<LF>"
    * otherwise                 -> device_id = first `length_device_id`
                                   characters, serial_read_ok = True

    `serial_read_ok` is False in every case except the last one.
    """
    global device_id
    global serial_read_ok
    serial_read_ok = False
    # request the device's ID and read the response; pyserial transmits
    # bytes, so the command text is encoded before it is written
    uart.write(serial_cmd.encode('ascii'))
    raw_line = uart.readline()
    try:
        line = raw_line.decode('ascii')
    except UnicodeDecodeError:
        # line noise must not crash the program and must never be logged
        device_id = "--NON-ASCII-DATA: " + repr(raw_line)
        return
    # extract the device_id candidate
    candidate = line[0:length_device_id]
    # a line terminator inside the candidate means the ID itself was shorter
    # than required, so it must not count towards the length
    contains_line_break = ('\r' in candidate) or ('\n' in candidate)
    # make typical whitespace characters visible
    visible = candidate.replace('\r', "<CR>").replace('\n', "<LF>")
    if len(candidate) == 0:
        # display read timeout message to notify the operator
        device_id = serial_timeout_msg
    elif len(candidate) < length_device_id or contains_line_break:
        device_id = serial_too_short_msg + "'" + visible + "'"
    else:
        device_id = candidate
        serial_read_ok = True
def handle_device_id_duplicates():
    """Placeholder: duplicate detection for device IDs is not implemented yet."""
    # TODO: check if the device_id is a duplicate
    return None
def output_data():
    """Display the latest read result and, if it was valid, log it to the CSV.

    Prints "<timestamp> <device_id>" in every case. Only when the global
    `serial_read_ok` is true is a row "<timestamp>,<device_id>,<initials>"
    appended to `file_csv`.
    """
    # create a timestamp
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    # display the result
    print("%s %s" % (timestamp, device_id))
    # append the result to the CSV
    if serial_read_ok:
        # csv.writer quotes a field only when it has to (e.g. initials that
        # contain a comma), so ordinary rows look exactly as before while
        # unusual input can no longer shift the columns
        row_writer = csv.writer(file_csv, lineterminator="\n")
        row_writer.writerow([timestamp, device_id, operator_initials])
    # TODO: print the device_id on paper
    # Zebra S4M, v53.17.11Z
# -----------------------------------------------------------------------------
# main program
# -----------------------------------------------------------------------------
if __name__ == '__main__':
    # one-time setup: choose and open the port, ask who is operating,
    # create the CSV file and explain the controls
    select_a_serial_port(get_available_serial_ports())
    open_selected_serial_port()
    set_operator_initials()
    create_csv_file()
    print_usage_guide()
    # one read attempt per line typed by the operator
    while True:
        serial_read_ok = False
        # wait for enter
        try:
            user_input = raw_input("")
        except (EOFError, KeyboardInterrupt):
            # Ctrl-D / Ctrl-C at the prompt: treat it like 'q' so that
            # check_for_exit_condition() still closes the serial port and
            # the CSV file instead of the program dying with a traceback
            user_input = "q"
        # avoid empty line between results # FIXME: this only works on Linux terminals
        #CURSOR_UP_ONE = '\x1b[1A'
        #ERASE_LINE = '\x1b[2K'
        #print(CURSOR_UP_ONE + ERASE_LINE + CURSOR_UP_ONE)
        check_for_exit_condition()  # does not return when the operator asked to quit
        get_device_id()
        handle_device_id_duplicates()
        output_data()