-
Notifications
You must be signed in to change notification settings - Fork 2
/
capture.py
251 lines (207 loc) · 9.06 KB
/
capture.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
'''
This file has code from the NatNet SDK and Pupil-Labs GitHub Repo.
NatNet Version 2.10.0 (06/15/2016)
File created: (03/17/2017)
'''
import argparse
from NatNetClient import NatNetClient
from time import sleep, time
import logging
import msgpack as serializer
import zmq
from zmq.utils.monitor import recv_monitor_message
import json
import sys
assert zmq.__version__ > '15.1'
class Msg_Receiver(object):
'''
Recv messages on a sub port.
Not threadsafe. Make a new one for each thread
__init__ will block until connection is established.
'''
def __init__(self, ctx, url, topics=(), block_until_connected=True):
self.socket = zmq.Socket(ctx, zmq.SUB)
assert type(topics) != str
if block_until_connected:
# connect node and block until a connecetion has been made
monitor = self.socket.get_monitor_socket()
self.socket.connect(url)
while True:
status = recv_monitor_message(monitor)
if status['event'] == zmq.EVENT_CONNECTED:
break
elif status['event'] == zmq.EVENT_CONNECT_DELAYED:
pass
else:
raise Exception("ZMQ connection failed")
self.socket.disable_monitor()
else:
self.socket.connect(url)
for t in topics:
self.subscribe(t)
def subscribe(self, topic):
self.socket.setsockopt_string(zmq.SUBSCRIBE, topic)
def unsubscribe(self, topic):
self.socket.unsubscribe(topic)
def recv(self):
'''Recv a message with topic, payload.
Topic is a utf-8 encoded string. Returned as unicode object.
Payload is a msgpack serialized dict. Returned as a python dict.
Any addional message frames will be added as a list
in the payload dict with key: '__raw_data__' .
'''
topic = self.socket.recv_string()
payload = serializer.loads(self.socket.recv(), encoding='utf-8')
extra_frames = []
while self.socket.get(zmq.RCVMORE):
extra_frames.append(self.socket.recv())
if extra_frames:
payload['__raw_data__'] = extra_frames
return topic, payload
@property
def new_data(self):
return self.socket.get(zmq.EVENTS)
def __del__(self):
self.socket.close()
if __name__ == '__main__':
parser = argparse.ArgumentParser(
prog='python capture.py',
description='''
This program captures data streamed from
pupil labs and OptiTrack. The data is then
saved as a json file which can be changed
by passing a path to the output argument.''',
epilog='''
''')
parser.add_argument("--output",
default="output.json",
help="path to output file. (default: output.json)")
parser.add_argument("--optitrack-ip",
default="127.0.0.1",
help="ip address for OptiTrack. (default: 127.0.0.1)")
parser.add_argument("--optitrack-command-port",
default=1510,
type=int,
help="command port for OptiTrack. (default: 1510)")
parser.add_argument("--optitrack-data-port",
default=1511,
type=int,
help="data port for OptiTrack. (default: 1511)")
parser.add_argument("--optitrack-multicast-address",
default="239.255.42.99",
help="multicast address for OptiTrack. (default: 239.255.42.99)")
parser.add_argument("--pupil-labs-ip",
default="127.0.0.1",
help="ip address for Pupil Labs. (default: 127.0.0.1)")
parser.add_argument("--pupil-labs-port",
default=50020,
type=int,
help="port for Pupil Labs. (default: 50020)")
parser.add_argument('--pupil-labs-off',
action='store_true',
help="don't record any data from pupil labs.")
parser.add_argument('--pupil0-off',
action='store_true',
help="don't record any pupil.0 data from pupil labs.")
parser.add_argument('--pupil1-off',
action='store_true',
help="don't record any pupil.1 data from pupil labs.")
parser.add_argument('--optitrack-off',
action='store_true',
help="don't record any data from OptiTrack.")
parser.add_argument("--max-frames-per-second",
default=70,
type=int,
help="sets the max number of frames captured per second. (default: 70)")
args = parser.parse_args()
output_header = {}
print( 'Starting program' )
print( 'Pupil Labs:', not args.pupil_labs_off )
if not args.pupil_labs_off:
print( 'Pupil.0:', not args.pupil0_off )
print( 'Pupil.1:', not args.pupil1_off )
print( 'tcp://%s:%d' % (args.pupil_labs_ip, args.pupil_labs_port) )
# tap into the IPC backbone of pupil capture
ctx = zmq.Context()
# the requester talks to Pupil remote and
# recevied the session unique IPC SUB URL
requester = ctx.socket(zmq.REQ)
requester.connect( 'tcp://%s:%d' % (args.pupil_labs_ip, args.pupil_labs_port) )
requester.send_string('SUB_PORT')
ipc_sub_port = requester.recv().decode("utf-8")
print( 'ipc_sub_port:', ipc_sub_port )
print( 'tcp://%s:%s' % (args.pupil_labs_ip, ipc_sub_port) )
# Subscribe to pupils
if not args.pupil0_off:
pupil0 = Msg_Receiver(
ctx, 'tcp://%s:%s' % (args.pupil_labs_ip, ipc_sub_port),
topics=('pupil.0',))
if not args.pupil1_off:
pupil1 = Msg_Receiver(
ctx, 'tcp://%s:%s' % (args.pupil_labs_ip, ipc_sub_port),
topics=('pupil.1',))
sleep(1)
print( 'OptiTrack:', not args.optitrack_off )
if not args.optitrack_off:
print( 'ip:', args.optitrack_ip )
print( 'command port:', args.optitrack_command_port )
print( 'data port:', args.optitrack_data_port )
print( 'multicast address:', args.optitrack_multicast_address )
# This will create a new NatNet client
streamingClient = NatNetClient(args.optitrack_ip,
args.optitrack_multicast_address,
args.optitrack_command_port,
args.optitrack_data_port)
# Start up the streaming client now that the callbacks are set up.
# This will run perpetually, and operate on a separate thread.
streamingClient.run()
print( streamingClient.get_version() )
sleep(2)
output_header['rigidBodyInfo'] = streamingClient.getRigidBodyDescription()
input( 'Press Enter to continue and start recording...' )
print( "Recording Started" )
print( 'Press Ctrl-C to stop recording' )
start_time = time()
with open(args.output, 'w') as f:
f.write('{\"static\": \n')
f.write(json.dumps(output_header))
f.write(',\n\"frames\": [\n')
try:
first_frame = True
frame = 1
st = time()
while True:
sft = time()
if frame % 100 == 0:
et = time()
sys.stdout.write("\rframe: %d at %f fps" % (frame, 100.0/(et-st)))
sys.stdout.flush()
st = et
obj = {}
obj['frame'] = frame
obj['time'] = time() - start_time
if not args.optitrack_off:
streamingClient.lock()
if not args.pupil_labs_off:
if not args.pupil0_off:
pupil0_topic, pupil0_msg = pupil0.recv()
obj['pupil0'] = pupil0_msg
if not args.pupil1_off:
pupil1_topic, pupil1_msg = pupil1.recv()
obj['pupil1'] = pupil1_msg
if not args.optitrack_off:
obj['rigidBodies'] = streamingClient.getRigidBodyList()
obj['markers'] = streamingClient.getMarkerList()
if not args.optitrack_off:
streamingClient.unlock()
if not first_frame:
f.write(",\n")
else:
first_frame = False
f.write(json.dumps(obj))
frame = frame + 1
sleep(max(0, (1.0/args.max_frames_per_second) - (time() - sft)))
except KeyboardInterrupt:
f.write(']}\n')
f.close()
print( "Done" )