-
Notifications
You must be signed in to change notification settings - Fork 1
/
usb_trading.py
397 lines (347 loc) · 12.4 KB
/
usb_trading.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
import usb.core
import usb.util
import signal
import sys
import traceback
import time
import os
import multiboot
from utilities.gsc_trading import GSCTrading
from utilities.gsc_trading_jp import GSCTradingJP
from utilities.rby_trading import RBYTrading
from utilities.rby_trading_jp import RBYTradingJP
from utilities.rse_sp_trading import RSESPTrading
from utilities.websocket_client import PoolTradeRunner, ProxyConnectionRunner
from utilities.gsc_trading_menu import GSCTradingMenu
from utilities.gsc_trading_strings import GSCTradingStrings
# Connection state shared by the backend helpers in this file. The handles
# start out empty and are filled in by libusb_method(), serial_method() and
# winusbcdc_method().
dev = None  # PyUSB device, assigned by libusb_method()
reattach = False  # checked by exit_gracefully() before re-attaching the kernel driver
serial_port = None  # serial.Serial handle, assigned by serial_method()
epIn = None  # USB IN endpoint, assigned by libusb_method()
epOut = None  # USB OUT endpoint, assigned by libusb_method()
p = None  # WinUSB ComPort, assigned by winusbcdc_method()
max_usb_timeout_w = 5  # write timeout; multiplied by 1000 before being passed to PyUSB
max_usb_timeout_r = 0.1  # read timeout; multiplied by 1000 before being passed to PyUSB
max_packet_size = 0x40  # read size used when a receive helper gets no num_bytes
VID = 0xcafe  # USB vendor ID the backends look for
PID = 0x4011  # USB product ID the backends look for
path = "pokemon_gen3_to_genx_mb.gba"  # passed to multiboot.multiboot() by transfer_func()
def kill_function():
    """Send SIGINT to the current process."""
    own_pid = os.getpid()
    os.kill(own_pid, signal.SIGINT)
def transfer_func(sender, receiver, list_sender, raw_receiver, is_serial):
    """Run the menu, configure the adapter, then start multiboot or a trade.

    Args:
        sender: Callable used to write a value to the adapter.
        receiver: Callable used to read a value from the adapter.
        list_sender: Callable used to write a sequence; must accept a
            ``chunk_size`` keyword.
        raw_receiver: Callable handed to the ``multiboot`` helpers for reads.
        is_serial: True when the serial backend is in use; only affects which
            firmware warning may be printed.
    """
    menu = GSCTradingMenu(kill_function)
    menu.handle_menu()

    if menu.verbose:
        print(GSCTradingStrings.waiting_transfer_start_str)

    if menu.trade_type == GSCTradingStrings.two_player_trade_str:
        connection = ProxyConnectionRunner(menu, kill_function)
    elif menu.trade_type == GSCTradingStrings.pool_trade_str:
        connection = PoolTradeRunner(menu, kill_function)

    # Multiboot always runs with the generation 3 settings.
    if menu.multiboot:
        menu.gen = 3

    config_base = multiboot.get_configure_list(1000, 4 if menu.gen == 3 else 1)

    # Read until read_all reports 0 (presumably flushing stale input - confirm
    # against multiboot.read_all), then send the configuration in one chunk
    # and read the adapter's answer.
    while multiboot.read_all(raw_receiver) != 0:
        pass
    list_sender(config_base, chunk_size=len(config_base))
    ret = multiboot.read_all(raw_receiver)

    if is_serial and (ret != 1):
        print("WARNING: Firmware not recognized!\nWhen using Serial, you MUST use a firmware which doesn't alter the output!\nIt's best if you update to the one available at:\nhttps://github.com/Lorenzooone/gb-link-firmware-reconfigurable/releases")

    if (menu.gen == 3) and (ret != 1):
        print("Non-reconfigurable firmware found!\nIt's best if you update to the one available at:\nhttps://github.com/Lorenzooone/gb-link-firmware-reconfigurable/releases")

    pre_sleep = (ret == 1) and (menu.gen == 3)

    if menu.multiboot:
        multiboot.multiboot(raw_receiver, sender, list_sender, path)
        return

    # Pick the trading implementation for the selected generation/language.
    if menu.gen == 2:
        if menu.japanese:
            trade_c = GSCTradingJP(sender, receiver, connection, menu, kill_function, pre_sleep)
        else:
            trade_c = GSCTrading(sender, receiver, connection, menu, kill_function, pre_sleep)
    elif menu.gen == 3:
        trade_c = RSESPTrading(sender, receiver, connection, menu, kill_function, pre_sleep)
    elif menu.gen == 1:
        if menu.japanese:
            trade_c = RBYTradingJP(sender, receiver, connection, menu, kill_function, pre_sleep)
        else:
            trade_c = RBYTrading(sender, receiver, connection, menu, kill_function, pre_sleep)

    connection.start()

    if menu.trade_type == GSCTradingStrings.two_player_trade_str:
        trade_c.player_trade(menu.buffered)
    elif menu.trade_type == GSCTradingStrings.pool_trade_str:
        trade_c.pool_trade()
# Code dependent on this connection method
def sendByte(byte_to_send, num_bytes):
    """Write an integer to the USB OUT endpoint as ``num_bytes`` big-endian bytes."""
    payload = byte_to_send.to_bytes(num_bytes, byteorder='big')
    timeout_ms = int(max_usb_timeout_w * 1000)
    epOut.write(payload, timeout=timeout_ms)
# Code dependent on this connection method
def sendList(data, chunk_size=8):
    """Write ``data`` to the USB OUT endpoint in pieces of ``chunk_size`` items.

    Args:
        data: Sliceable sequence to send; each slice is passed to
            ``epOut.write`` unchanged.
        chunk_size: Maximum number of items per write.

    Raises:
        ValueError: If ``data`` is not empty and ``chunk_size`` is below 1.
    """
    total = len(data)
    if total == 0:
        # Nothing to send. Returning here also covers callers that pass
        # chunk_size=len(data), which used to divide by zero for empty data.
        return
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    timeout_ms = int(max_usb_timeout_w * 1000)
    # The stepped range yields every full chunk followed by the shorter tail.
    for start in range(0, total, chunk_size):
        epOut.write(data[start:start + chunk_size], timeout=timeout_ms)
def receiveByte(num_bytes=None):
    """Read from the USB IN endpoint and return the data as a big-endian integer.

    ``num_bytes`` falls back to ``max_packet_size`` when it is None.
    """
    wanted = max_packet_size if num_bytes is None else num_bytes
    raw = epIn.read(wanted, timeout=int(max_usb_timeout_r * 1000))
    return int.from_bytes(raw, byteorder='big')
def receiveByte_raw(num_bytes=None):
    """Read from the USB IN endpoint and return the data exactly as received.

    ``num_bytes`` falls back to ``max_packet_size`` when it is None.
    """
    wanted = max_packet_size if num_bytes is None else num_bytes
    timeout_ms = int(max_usb_timeout_r * 1000)
    return epIn.read(wanted, timeout=timeout_ms)
# Code dependent on this connection method
def sendByte_serial(byte_to_send, num_bytes):
    """Write an integer to the serial port as ``num_bytes`` big-endian bytes."""
    payload = byte_to_send.to_bytes(num_bytes, byteorder='big')
    serial_port.write(payload)
# Code dependent on this connection method
def sendList_serial(data, chunk_size=8):
    """Write ``data`` to the serial port in pieces of ``chunk_size`` items.

    Args:
        data: Sliceable sequence of byte values; each slice is converted with
            ``bytes()`` before being written.
        chunk_size: Maximum number of items per write.

    Raises:
        ValueError: If ``data`` is not empty and ``chunk_size`` is below 1.
    """
    total = len(data)
    if total == 0:
        # Nothing to send. Returning here also covers callers that pass
        # chunk_size=len(data), which used to divide by zero for empty data.
        return
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    # The stepped range yields every full chunk followed by the shorter tail.
    for start in range(0, total, chunk_size):
        serial_port.write(bytes(data[start:start + chunk_size]))
def receiveByte_serial(num_bytes=None):
    """Read from the serial port and return the data as a big-endian integer.

    ``num_bytes`` falls back to ``max_packet_size`` when it is None.
    """
    wanted = max_packet_size if num_bytes is None else num_bytes
    raw = serial_port.read(wanted)
    return int.from_bytes(raw, byteorder='big')
def receiveByte_raw_serial(num_bytes=None):
    """Read from the serial port and return the data exactly as received.

    ``num_bytes`` falls back to ``max_packet_size`` when it is None.
    """
    wanted = max_packet_size if num_bytes is None else num_bytes
    return serial_port.read(wanted)
# Code dependent on this connection method
def sendByte_win(byte_to_send, num_bytes):
    """Write an integer to the WinUSB port as ``num_bytes`` big-endian bytes."""
    payload = byte_to_send.to_bytes(num_bytes, byteorder='big')
    p.write(payload)
# Code dependent on this connection method
def sendList_win(data, chunk_size=8):
    """Write ``data`` to the WinUSB port in pieces of at most ``chunk_size`` items.

    Args:
        data: Sliceable sequence of byte values; each slice is converted with
            ``bytes()`` before being written.
        chunk_size: Requested maximum number of items per write; values above
            0x3C are reduced to 0x3C.

    Raises:
        ValueError: If ``data`` is not empty and ``chunk_size`` is below 1.
    """
    total = len(data)
    if total == 0:
        # Nothing to send. Returning here also covers callers that pass
        # chunk_size=len(data), which used to divide by zero for empty data.
        return
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    # Why? Idk. But it fixes it... :/  (writes larger than 0x3C misbehave)
    chunk_size = min(chunk_size, 0x3C)
    # The stepped range yields every full chunk followed by the shorter tail.
    for start in range(0, total, chunk_size):
        p.write(bytes(data[start:start + chunk_size]))
# Code dependent on this connection method
# The original was so slow that it had to be partly rewritten to work for
# time-sensitive applications
def read_win(self, size=None):
    """Read from a ComPort, called as a plain function with the port as ``self``.

    Bytes left over from an earlier call (``self._rxremaining``) are returned
    first. With ``size`` set, packets are read until at least ``size`` bytes
    are collected or the deadline passes; bytes beyond ``size`` are stored
    back in ``self._rxremaining``. Without ``size``, packets are read until an
    empty or short packet arrives or the deadline passes. The deadline is
    pushed back each time a packet of ``maximum_packet_size`` is received.

    Returns:
        None when the port is not open, otherwise the collected bytes.
    """
    if not self.is_open:
        return None

    pieces = [self._rxremaining]
    received = len(self._rxremaining)
    self._rxremaining = b''
    deadline = time.time() + (self.timeout or 0.2)
    parent = super(ComPort, self)

    if size:
        # NOTE(review): the pipe timeout is ten times the fallback used
        # elsewhere; its units are not visible here - confirm in winusbcdc.
        parent.set_timeout(self._ep_in, (self.timeout or 0.2) * 10)
        while received < size:
            packet = parent.read(self._ep_in, size - received)
            if packet is not None and len(packet):
                pieces.append(packet)
                received += len(packet)
                if len(packet) == self.maximum_packet_size:
                    deadline += (self.timeout or 0.2)
            if time.time() > deadline:
                break
    else:
        parent.set_timeout(self._ep_in, (self.timeout or 0.2))
        while True:
            packet = parent.read(self._ep_in, self.maximum_packet_size)
            if packet is None or not len(packet):
                break
            pieces.append(packet)
            received += len(packet)
            # A packet shorter than the maximum ends the read.
            if len(packet) != self.maximum_packet_size:
                break
            deadline += (self.timeout or 0.2)
            if time.time() > deadline:
                break

    data = b''.join(pieces)
    if size and len(data) >= size:
        surplus = data[size:]
        if self._rxremaining:
            self._rxremaining = surplus + self._rxremaining
        else:
            self._rxremaining = surplus
        data = data[0:size]
    return data
def receiveByte_win(num_bytes=None):
    """Read from the WinUSB port via ``read_win`` and return a big-endian integer."""
    raw = read_win(p, size=num_bytes)
    return int.from_bytes(raw, byteorder='big')
def receiveByte_raw_win(num_bytes=None):
    """Read from the WinUSB port via ``read_win`` and return its result unchanged."""
    raw = read_win(p, size=num_bytes)
    return raw
# Things for the USB connection part
def exit_gracefully():
    """Release the USB device and the serial port, then end the process.

    The process is always terminated with ``os._exit(1)``.
    """
    if dev is not None:
        usb.util.dispose_resources(dev)
        # Hand the interface back to the kernel driver if one was detached.
        if os.name != "nt" and reattach:
            dev.attach_kernel_driver(0)

    if serial_port is not None:
        serial_port.reset_input_buffer()
        serial_port.reset_output_buffer()
        serial_port.close()

    os._exit(1)
def exit_no_device():
    """Report that the device is missing and shut down via ``exit_gracefully``."""
    message = "Device not found!"
    print(message)
    exit_gracefully()
def signal_handler(sig, frame):
    """Signal handler: announce the interruption and shut down.

    ``sig`` and ``frame`` are the standard handler arguments and are unused.
    """
    message = "You pressed Ctrl+C!"
    print(message)
    exit_gracefully()
def libusb_method():
    """Try to open the adapter through PyUSB.

    On success the module-level ``dev``, ``epIn`` and ``epOut`` hold the
    device and the IN/OUT endpoints of interface (2, 0). ``reattach`` is True
    only when a kernel driver was actually detached and therefore has to be
    restored on exit.

    Returns:
        True when the device was found and set up. False on any failure
        (no device, kernel driver cannot be detached, an endpoint is missing,
        or any other error), so the caller can try another backend.
    """
    global dev, epIn, epOut, reattach
    try:
        # When several matching devices are present, the last one is used.
        for candidate in usb.core.find(find_all=True, idVendor=VID, idProduct=PID):
            #print('Device: %s' % candidate.product)
            dev = candidate
        if dev is None:
            return False

        reattach = False
        if os.name != "nt" and dev.is_kernel_driver_active(0):
            try:
                dev.detach_kernel_driver(0)
            except usb.core.USBError:
                # Could not detach the kernel driver. Report failure so the
                # remaining backends are tried, and leave reattach False
                # because nothing was detached.
                return False
            reattach = True

        dev.reset()
        dev.set_configuration()
        cfg = dev.get_active_configuration()
        intf = cfg[(2, 0)]  # Or find interface with class 0xff

        epIn = usb.util.find_descriptor(
            intf,
            custom_match=lambda e:
                usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_IN)
        # Explicit checks instead of assert, which is stripped under -O.
        if epIn is None:
            return False

        epOut = usb.util.find_descriptor(
            intf,
            custom_match=lambda e:
                usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_OUT)
        if epOut is None:
            return False

        # NOTE(review): request 0x22 with wValue 1 on interface 2 looks like a
        # CDC control-line-state request - confirm against the firmware.
        dev.ctrl_transfer(bmRequestType=1, bRequest=0x22, wIndex=2, wValue=0x01)
    except Exception:
        return False
    return True
def winusbcdc_method():
    """Try to open the adapter through WinUSB CDC (Windows only).

    On success the module-level ``p`` holds the opened ``ComPort`` with its
    timeout set to ``max_usb_timeout_r``.

    Returns:
        True when the port was opened. False on any other platform, when the
        port is not open, or when anything in the attempt raises.
    """
    global p
    if os.name != "nt":
        return False
    try:
        print("Trying WinUSB CDC")
        p = ComPort(vid=VID, pid=PID)
        if not p.is_open:
            return False
        #p.baudrate = 115200
        p.settimeout(max_usb_timeout_r)
    except:
        return False
    return True
def serial_method():
    """Try to open the adapter as a serial port through PySerial.

    The listed ports are scanned for the first one whose USB vendor and
    product IDs equal ``VID`` and ``PID``; that port is opened into the
    module-level ``serial_port`` with the module's read/write timeouts.

    Returns:
        True when a matching port was opened. False when no port matches or
        when listing or opening the port raises.
    """
    global serial_port
    try:
        # Ports without USB IDs report None, which never equals VID/PID.
        port = next(
            (
                candidate.device
                for candidate in serial.tools.list_ports.comports()
                if candidate.vid == VID and candidate.pid == PID
            ),
            None,
        )
        if port is None:
            return False
        serial_port = serial.Serial(
            port=port,
            bytesize=8,
            timeout=max_usb_timeout_r,
            write_timeout=max_usb_timeout_w,
        )
    except Exception:
        return False
    return True
signal.signal(signal.SIGINT, signal_handler)

# Each backend depends on an optional library; record which ones import.
try_serial = False
try_libusb = False
try_winusbcdc = False
is_serial = False

try:
    import usb.core
    import usb.util
except:
    pass
else:
    try_libusb = True

if os.name == "nt":
    try:
        from winusbcdc import ComPort
    except:
        pass
    else:
        try_winusbcdc = True

try:
    import serial
    import serial.tools.list_ports
except:
    pass
else:
    try_serial = True

sender = None
receiver = None
list_sender = None
raw_receiver = None
found = False

# The execution path
try:
    # Backends in order of preference: (library available, probe, send,
    # receive, send list, raw receive, uses the serial port). The first
    # backend whose probe succeeds is used.
    backends = (
        (try_libusb, libusb_method,
         sendByte, receiveByte, sendList, receiveByte_raw, False),
        (try_winusbcdc, winusbcdc_method,
         sendByte_win, receiveByte_win, sendList_win, receiveByte_raw_win, False),
        (try_serial, serial_method,
         sendByte_serial, receiveByte_serial, sendList_serial, receiveByte_raw_serial, True),
    )
    for available, probe, send_fn, recv_fn, send_list_fn, recv_raw_fn, via_serial in backends:
        if available and probe():
            sender = send_fn
            receiver = recv_fn
            list_sender = send_list_fn
            raw_receiver = recv_raw_fn
            is_serial = via_serial
            found = True
            break

    if found:
        print("USB connection established!")
        transfer_func(sender, receiver, list_sender, raw_receiver, is_serial)
    else:
        print("Couldn't find USB device!")
        # Suggest the libraries that could not be imported.
        missing_libs = []
        if not try_serial:
            missing_libs.append("PySerial")
        if not try_libusb:
            missing_libs.append("PyUSB")
        if os.name == "nt" and not try_winusbcdc:
            missing_libs.append("WinUsbCDC")
        if missing_libs:
            print("If the device is attached, try installing " + ", ".join(missing_libs))
    exit_gracefully()
except:
    traceback.print_exc()
    print("Unexpected exception: ", sys.exc_info()[0])
    exit_gracefully()