-
Notifications
You must be signed in to change notification settings - Fork 0
/
usb2mq.py
144 lines (128 loc) · 4.18 KB
/
usb2mq.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#!/usr/bin/python2
"""
udev service for USB devices:
forwards USB interrupt data to a ZeroMQ PULL server over TCP.
Requires a PULL socket listening on tcp://localhost:6372.
Usage: usb2mq.py BUS:ADDRESS
Author: Jirawat I. <nodtem66@gmail.com>
Python version: 2.7
"""
import argparse
import os
import signal
import struct
import sys
import time
import usb1 as _usb1
import zmq as _zmq
import hardware
from logger import getLogger
# Parse the single positional argument, expected in the form "BUS:ADDRESS".
parser = argparse.ArgumentParser()
parser.add_argument('bus')
args = parser.parse_args()
_args = args.bus.split(':')
LOG = getLogger('usb2mq')
if len(_args) < 2:
    LOG.error('invalid argument %s', args.bus)
    sys.exit(1)
bus, address = _args[0:2]
# bus and address stay strings (the rest of the script calls int() on them),
# but non-numeric values are rejected here: otherwise the first int(bus)
# further down raises a ValueError that none of the except clauses catch.
try:
    int(bus)
    int(address)
except ValueError:
    LOG.error('invalid argument %s', args.bus)
    sys.exit(1)
# write pid into file
#pidfile = open('/opt/ca-hub-rpi/pid/{}-{}'.format(bus, address), 'w')
#pidfile.write(str(os.getpid()))
#pidfile.close()
# init zmq broker: one PUSH socket connected to the local pull server
zmq = _zmq.Context()
sender = zmq.socket(_zmq.PUSH)
# LINGER of 250 ms bounds how long close() waits for unsent messages
sender.setsockopt(_zmq.LINGER, 250)
sender.connect('tcp://127.0.0.1:6372')
LOG.info('connect zmq pull server')
def send(*arr):
    """Join the given chunks into one message and push it without blocking.

    Every positional argument is converted with ``bytearray()`` and the
    pieces are concatenated in call order. Nothing happens when ``sender``
    is falsy. A ``ZMQError`` raised while sending is swallowed, i.e. the
    message is dropped.
    """
    if not sender:
        return
    try:
        payload = bytearray()
        for chunk in arr:
            payload.extend(bytearray(chunk))
        sender.send(bytes(payload), flags=_zmq.NOBLOCK)
    except _zmq.ZMQError:
        # best effort: a message that cannot be queued is discarded
        pass
# register signal handler
# True only while mainloop() is reading from the device.
running = False


def shutdown(signum, frame):
    """Announce the disconnect, release USB and ZeroMQ resources, and exit.

    Installed as the SIGINT/SIGTERM handler and also called directly as
    ``shutdown(0, 0)`` on fatal errors; both arguments are ignored.
    Never returns: the process ends through ``os._exit(0)``.
    """
    global running
    LOG.info('Shutting down...')
    if running and not sender.closed:
        # The type-2 message is sent only when the read loop had started.
        send([2, int(bus), int(address), productId])
    running = False
    if handle is not None:
        # Releasing the interface can fail (for instance when the device is
        # already gone). That must not prevent closing the handle, the
        # ZeroMQ teardown, or the exit below.
        try:
            handle.releaseInterface(0)
        except _usb1.USBError as err:
            LOG.error('release interface failed: %s', str(err))
        finally:
            handle.close()
    if isinstance(device, _usb1.USBDevice):
        device.close()
    sender.close()
    zmq.term()
    os._exit(0)


# NOTE(review): the handlers are installed before handle, device and
# productId are assigned further down in the file; a signal arriving in
# that window would raise NameError inside shutdown().
signal.signal(signal.SIGINT, shutdown)
signal.signal(signal.SIGTERM, shutdown)
# get device from bus and address
usb1 = _usb1.USBContext()
device = None
handle = None
productId = hardware.INVALID_PRODUCT_ID
productName = ''
maxPacketSize = 64
try:
    for _device in usb1.getDeviceIterator(skip_on_error=True):
        # skip every device that is not the requested bus/address pair
        if _device.getBusNumber() != int(bus):
            continue
        if _device.getDeviceAddress() != int(address):
            continue
        LOG.info('Initialize device bus:%s address:%s', bus, address)
        productName = _device.getProduct()
        maxPacketSize = _device.getMaxPacketSize(hardware.ENDPOINT_ADDRESS)
        LOG.info('%s (%s)', productName, _device.getManufacturer())
        LOG.info('packet size: %d', maxPacketSize)
        productId = hardware.getIdFromProductName(productName)
        # device is assigned last, so it stays None if any query above fails
        device = _device
        break
except (RuntimeError, IOError, _usb1.USBError) as e:
    # report the failure as a type-3 message, then terminate
    LOG.error("Unexpected error 1: %s", str(e))
    send([3, int(bus), int(address), productId], str(e))
    shutdown(0, 0)
# no device matched the requested bus/address
if device is None:
    LOG.error('Device can not be initialized!')
    shutdown(0, 0)
# the product name did not map to a known product id
if productId == hardware.INVALID_PRODUCT_ID:
    LOG.error('Unsupport USB device')
    shutdown(0, 0)
# read loop: forwards device packets to the zmq sender
def mainloop():
    """Open the selected device and forward its interrupt packets.

    Opens ``device``, claims interface 0 and sends a type-1 message. Then
    reads from ``hardware.ENDPOINT_ADDRESS`` while ``running`` is true.
    A packet is forwarded as a type-0 message only when its length matches
    the size expected for the product (6 bytes for SPO2, 27 for ECG);
    packets of any other length are logged and dropped. After each valid
    packet the loop sleeps 1 / (1.5 * sampling rate) seconds before sending.

    A ``USBErrorInterrupted`` during a read is reported as a type-3 message
    and followed by ``shutdown``. Any other ``RuntimeError``, ``IOError``
    or ``USBError`` is reported as a type-3 message and ends the function.
    """
    global handle
    global running
    # productId does not change once the device has been selected, so the
    # per-product constants are looked up once instead of on every packet.
    expectedLength = None
    pause = 0.0
    if productId == hardware.SPO2_PRODUCT_ID:
        expectedLength = 6
        pause = 1.0 / hardware.SPO2_SAMPLING_RATE_HZ / 1.5
    elif productId == hardware.ECG_PRODUCT_ID:
        expectedLength = 27
        pause = 1.0 / hardware.ECG_SAMPLING_RATE_HZ / 1.5
    # init device
    try:
        handle = device.open()
        handle.claimInterface(0)
        send([1, int(bus), int(address), productId])
        running = True
        while running:
            try:
                data = handle.interruptRead(hardware.ENDPOINT_ADDRESS, maxPacketSize)
                if expectedLength is None:
                    # product without a known packet format: read and discard
                    continue
                if len(data) != expectedLength:
                    # Explicit check instead of assert: assert is stripped
                    # under -O, and an AssertionError would escape both
                    # except clauses and end the process without cleanup.
                    LOG.error('invalid packet length %d (expected %d)',
                              len(data), expectedLength)
                    continue
                time.sleep(pause)
                if running:
                    send([0, int(bus), int(address), productId], data)
            except _usb1.USBErrorInterrupted as e:
                LOG.error("USB Error: %s", str(e))
                send([3, int(bus), int(address), productId], str(e))
                shutdown(0, 0)
    except (RuntimeError, IOError, _usb1.USBError) as e:
        LOG.error("Unexpected error 3: %s", str(e))
        send([3, int(bus), int(address), productId], str(e))


if __name__ == '__main__':
    mainloop()