-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathdw-server.py
executable file
·236 lines (214 loc) · 9.3 KB
/
dw-server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
#!/usr/bin/env python3
#
# Discover dw-link and then redirect data from a TCP/IP connection to the serial port and vice versa.
# Based on Chris Liechti's tcp_serial_redirect script
#
# Version 1.3.1 (03-Jan-2025)
# - Changed: greeting line
#
# Version 1.3.0 (27-Dec-2024)
# - Removed: the cortex-debug hack
# - Added: Partial argument parsing and a special case for the -s option: 'noop',
# which can be used to override earlier -s options in the argument line
# - With these two changes and the four additional lines below in the platform file,
# we can now handle the call of the openocd server without any hacks
# debug.cortex-debug.custom.serverArgs.0=-s
# debug.cortex-debug.custom.serverArgs.1=noop
# debug.cortex-debug.custom.serverArgs.2=-p
# debug.cortex-debug.custom.serverArgs.3=50000
# - Added: error handling for the case that a port is already in use
# - Added: opening the serial line in 'exclusive mode' so that it cannot be "stolen"
#
# Version 1.2.4 (22-Dec-2024)
# - New option -V gives version number
# - Now uses the gdb_port value when cortex-debug calls the script
#
# Version 1.2.3 (21-Dec-2024)
# - Fatal error messages from dw-link will now be shown by dw-server (they have the "***" prefix in an O-record)
# - The serial port is now shown when dw-server discovers dw-link
#
# Version 1.2.2 (20-Dec-2024)
#
# - dw-server will now "emulate" an openocd server if "gdb_port 50000" is mentioned as one of the arguments
# - it will ignore the port "/dev/cu.Bluetooth-Incoming-Port" under macOS
# - it has a new option: -v or --verbose, where everything transmitted is shown
#
# Version 1.2.1 (17-Dec-2024)
# - under Linux, we have to send the ENQ twice before we get an answer, I have no idea why
#
# Version 1.2.0 (21-sep-2023)
# - does not call gede with the (now obsolete) --no-run option; the -g option still works, though
#
# Version 1.1.0
# - special option for calling gede with the --no-run option
#
# Version 1.0.0
# - first working version
#
VERSION = "1.3.1"
import sys
import socket
import serial
import serial.threaded
import time
import serial.tools.list_ports
import shutil, shlex, subprocess
import argparse
class SerialToNet(serial.threaded.Protocol):
    """Forward bytes read from the serial port to the connected TCP client.

    While a client socket is attached, every chunk received from the serial
    line is sent to it unchanged.  The chunks are also accumulated in
    ``self.last`` so that a complete reply can be logged (verbose mode) and
    so that text messages sent by dw-link in 'O' packets can be shown.

    Reads the module-level ``args`` namespace (``args.verbose``).
    """

    def __init__(self):
        # Client socket to forward to; None while no client is connected.
        self.socket = None
        # Bytes received since the last complete reply was recognized.
        self.last = b""

    def __call__(self):
        # serial.threaded.ReaderThread calls its second argument to obtain a
        # protocol object; returning self lets a ready-made instance be passed.
        return self

    def data_received(self, data):
        """Send *data* to the client and inspect the accumulated reply."""
        if self.socket is not None:
            self.socket.sendall(data)
            self.last += data
            if self.last:
                # A reply counts as complete when it ends in '+' or when '#'
                # is the third-last byte (i.e. '#' followed by two more bytes).
                if self.last[-1] == ord('+') or (len(self.last) > 2 and self.last[-3] == ord('#')):
                    # 'O' as second byte, not followed by 'K' (which would be
                    # the "OK" reply): a packet carrying hex-encoded text.
                    if len(self.last) > 2 and self.last[1] == ord('O') and self.last[2] != ord('K'):
                        message = self.convert_gdb_message()
                    else:
                        message = ""
                    if args.verbose:
                        sys.stdout.write("repl: {}\n".format(self.last))
                        if message:
                            sys.stdout.write("dw-link message: {}\n".format(message))
                        sys.stdout.flush()
                    elif len(message) > 2 and message[:3] == '***':
                        # Outside verbose mode only messages with the '***'
                        # prefix are shown, and they go to stderr.
                        sys.stderr.write("dw-link message: {}\n".format(message))
                        sys.stderr.flush()
                    self.last = b""

    def convert_gdb_message(self):
        """Return the text carried by the 'O' packet buffered in ``self.last``.

        The payload is taken from index 2 up to the first '#' and is expected
        to be hex-encoded UTF-8.

        Returns:
            The decoded text, or "" when there is no '#' or the payload is
            not valid hex.  Invalid UTF-8 bytes are replaced rather than
            raised, because an exception escaping from data_received() makes
            the reader thread stop.
        """
        end = self.last.find(b'#')
        if end < 0:
            # No terminator: find() returned -1, which as a slice bound would
            # silently drop the final byte instead of marking the payload end.
            return ""
        try:
            hv = self.last[2:end].decode('utf-8')
            bv = bytes.fromhex(hv)
        except ValueError:  # also covers UnicodeDecodeError
            return ""
        return bv.decode('utf-8', errors='replace')
# discovers the dw-link adapter, if present
def discover():
    """Probe the serial ports for a dw-link adapter.

    Every listed port (except the macOS Bluetooth pseudo port) is opened in
    exclusive mode at 115200 baud and sent an ENQ byte (0x05); a dw-link
    adapter answers with the 7 bytes ``dw-link``.  The whole scan is run
    twice, waiting 0.2 s and then 2 s between opening a port and sending ENQ.

    Reads the module-level ``args`` namespace (``args.verbose``).

    Returns:
        ``(baudrate, device)`` for the first port that answers, or
        ``(None, None)`` when no port does.
    """
    for delay in (0.2, 2):
        # True = include_links, per the pyserial list_ports documentation.
        for s in serial.tools.list_ports.comports(True):
            if args.verbose:
                sys.stdout.write("Device: {}\n".format(s.device))
                sys.stdout.flush()
            if s.device == "/dev/cu.Bluetooth-Incoming-Port":
                continue
            if args.verbose:
                sys.stdout.write("Check:{}\n".format(s.device))
                sys.stdout.flush()
            try:
                for sp in (115200, ):
                    with serial.Serial(s.device, sp, timeout=0.1, write_timeout=0.1, exclusive=True) as ser:
                        time.sleep(delay)
                        ser.write(b'\x05')  # send ENQ
                        resp = ser.read(7)  # under Linux, the first response might be empty
                        if resp != b'dw-link':
                            time.sleep(0.2)
                            ser.write(b'\x05')  # try again sending ENQ
                            resp = ser.read(7)  # now it should be the right response!
                        if resp == b'dw-link':  # if we get this response, it must be a dw-link adapter
                            return (sp, s.device)
            except Exception as err:
                # Best effort: a port that cannot be opened or probed is
                # skipped.  Catching Exception instead of using a bare except
                # lets Ctrl-C (KeyboardInterrupt) and SystemExit abort the scan.
                if args.verbose:
                    sys.stdout.write("Skip: {} ({})\n".format(s.device, err))
                    sys.stdout.flush()
    return (None, None)
if __name__ == '__main__':
    # Script entry point: parse the command line, find the dw-link adapter,
    # open its serial port, optionally start a helper program, then serve one
    # TCP client (gdb) by copying bytes between the socket and the serial port.
    #
    # Exit codes: 0 normal / version printed, 1 no adapter found, 2 serial
    # port could not be opened, 3 TCP port could not be bound, 4 program given
    # with -s/-g not found.
    #
    # NOTE: `args` must stay a module-level name; SerialToNet and discover()
    # read it as a global.
    parser = argparse.ArgumentParser(description='TCP/IP server for dw-link adapter (Version ' + VERSION + ')')
    parser.add_argument('-p', '--port', type=int, default=2000, dest='port',
                        help='local port on machine (default 2000)')
    parser.add_argument('-s', '--start', dest='prg',
                        help='start specified program')
    parser.add_argument('-g', '--gede', action="store_true",
                        help='start gede')
    parser.add_argument('-v', '--verbose', action="store_true",
                        help="log all communcation")
    parser.add_argument('-V', '--version', action="store_true",
                        help="print version number and exit immediately")
    # parse_known_args() tolerates unrecognized arguments; they end up in
    # `unknown` and are otherwise ignored.
    args, unknown = parser.parse_known_args()

    if args.version:
        sys.stdout.write("{}\n".format(VERSION))
        sys.stdout.flush()
        sys.exit(0)

    # discover adapter
    speed, device = discover()
    if speed is None or device is None:
        sys.stderr.write('--- No dw-link adapter discovered ---\n')
        sys.exit(1)

    # connect to serial port (8N1, no flow control, exclusive access)
    ser = serial.serial_for_url(device, do_not_open=True)
    ser.baudrate = speed
    ser.bytesize = 8
    ser.parity = 'N'
    ser.stopbits = 1
    ser.rtscts = False
    ser.xonxoff = False
    ser.exclusive = True
    try:
        ser.open()
    except serial.SerialException as e:
        sys.stderr.write('Could not open serial port {}: {}\n'.format(device, e))
        sys.exit(2)

    # Background thread that hands everything read from the serial port to
    # ser_to_net.data_received().
    ser_to_net = SerialToNet()
    serial_worker = serial.threaded.ReaderThread(ser, ser_to_net)
    serial_worker.start()

    # -g is shorthand for "-s gede" and overrides any -s value.
    if args.gede:
        args.prg = "gede"

    srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    try:
        srv.bind(('', args.port))
        srv.listen(1)
    except OSError as error:
        # strerror can be None, so format it instead of concatenating.
        sys.stderr.write("OSError: {}\n\r".format(error.strerror or error))
        sys.exit(3)

    # "-s noop" means: start nothing (used to override an earlier -s option).
    if args.prg and args.prg != "noop":
        cmd = shlex.split(args.prg)
        executable = shutil.which(cmd[0]) if cmd else None
        if executable is None:
            # which() found nothing on PATH; passing None on to Popen would
            # only produce an opaque TypeError.
            sys.stderr.write("Could not find program '{}'\n".format(args.prg))
            sys.exit(4)
        cmd[0] = executable
        subprc = subprocess.Popen(cmd)

    try:
        while True:
            sys.stdout.write("\n\rdw-server ({}) connected to {}.\r\n".format(VERSION,device))
            sys.stdout.write("Info : Listening on port {} for gdb connection\n\r".format(args.port))
            sys.stdout.flush()
            client_socket, addr = srv.accept()
            sys.stderr.write('Connected by {}\n'.format(addr))
            # More quickly detect bad clients who quit without closing the
            # connection: After 1 second of idle, start sending TCP keep-alive
            # packets every 1 second. If 3 consecutive keep-alive packets
            # fail, assume the client is gone and close the connection.
            try:
                client_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 1)
                client_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 1)
                client_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 3)
                client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            except AttributeError:
                pass  # XXX not available on windows
            client_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
            try:
                # From here on the reader thread forwards serial data to the client.
                ser_to_net.socket = client_socket
                # enter network <-> serial loop
                while True:
                    try:
                        data = client_socket.recv(1024)
                        if not data:
                            break  # empty read: the client closed the connection
                        ser.write(data)  # get a bunch of bytes and send them
                        if args.verbose:
                            sys.stdout.write("send: {}\n".format(data))
                            sys.stdout.flush()
                    except socket.error as msg:
                        sys.stderr.write('ERROR: {}\n'.format(msg))
                        # probably got disconnected
                        break
            except socket.error as msg:
                sys.stderr.write('ERROR: {}\n'.format(msg))
            except serial.SerialException as msg:
                # The serial link failed while forwarding; report it and end
                # the session instead of dying with a traceback.
                sys.stderr.write('ERROR: {}\n'.format(msg))
            finally:
                ser_to_net.socket = None
                sys.stderr.write('Disconnected\n')
                try:
                    ser.write(b'$D#44')  # send detach command to dw-link debugger
                except serial.SerialException as msg:
                    # Must not prevent the client socket from being closed.
                    sys.stderr.write('ERROR: {}\n'.format(msg))
                client_socket.close()
            # Only a single client session is served; leave the accept loop.
            break
    except KeyboardInterrupt:
        pass

    sys.stderr.write('\r\n--- exit ---\r\n')
    serial_worker.stop()
    srv.close()