# runner_ios.py
# Forked from augcog/ROAR.
import time
from pathlib import Path
from ROAR_iOS.ios_runner import iOSRunner
from ROAR.configurations.configuration import Configuration as AgentConfig
from ROAR_iOS.config_model import iOSConfig
from ROAR_Unity.unity_runner import iOSUnityRunner
from ROAR.agent_module.special_agents.pointcloud_recording_agent import PointcloudRecordingAgent
from ROAR.agent_module.cs249_agent import CS249Agent
from ROAR.utilities_module.vehicle_models import Vehicle
import logging
import argparse
from misc.utils import str2bool
from ROAR.utilities_module.utilities import get_ip
import qrcode
import cv2
import numpy as np
import socket
import json
import requests
class mode_list(list):
    """List subclass whose ``in`` test lower-cases the candidate first.

    Only the candidate is lower-cased; the stored items are compared as-is,
    so they are expected to be lower-case already.
    """

    def __contains__(self, other):
        """Return True if ``other`` is in the list, ignoring its letter case.

        Non-string candidates have no ``lower()`` method; they are compared
        unchanged instead of raising ``AttributeError``.
        """
        candidate = other.lower() if isinstance(other, str) else other
        return super().__contains__(candidate)
def showIPUntilAckUDP():
    """Show a QR code for this host and wait for a UDP datagram from the phone.

    The QR code encodes ``"<ip>,<port>"`` (port 8890). The function keeps the
    window on screen until either a datagram arrives on that port or the user
    presses ``q`` / ``Esc``. After the first datagram it replies to the sender
    with ``b"hi"`` ten times.

    Returns:
        tuple: ``(success, ip)`` where ``success`` is True once a datagram was
        received and ``ip`` is the sender's IP address, or ``None`` when no
        datagram was received.
    """
    port = 8890
    window_name = "Scan this code to connect to phone"
    host = get_ip()
    # Same "<ip>,<port>" payload that showIPUntilAck encodes; formatting the
    # tuple directly would embed its repr ("('1.2.3.4', 8890)") in the QR code.
    img = np.array(qrcode.make(f"{host},{port}").convert('RGB'))
    success = False
    addr = None
    window_shown = False
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.settimeout(0.1)
    try:
        s.bind((host, port))
        while not success:
            cv2.imshow(window_name, img)
            window_shown = True
            k = cv2.waitKey(1) & 0xff
            # Check the quit key before the blocking receive so that it is
            # honoured even while every receive attempt times out.
            if k == ord('q') or k == 27:
                break
            try:
                _, addr = s.recvfrom(1024)  # this command might timeout
            except socket.timeout as e:
                logging.info(f"Please tap on the ip address to scan QR code. ({host}:{port}). {e}")
                continue
            success = True
            for _ in range(10):
                s.sendto(b"hi", addr)
    except Exception as e:
        logging.error(f"Unable to bind socket: {e}")
    finally:
        s.close()
        # Only tear down the window if it was actually created.
        if window_shown:
            cv2.destroyWindow(window_name)
    # addr stays None when nothing was received (quit key or bind failure).
    return success, (addr[0] if addr is not None else None)
def showIPUntilAck():
    """Show a QR code for this host and wait for a TCP connection from the phone.

    The QR code encodes ``"<ip>,<port>"`` (port 40001). The function keeps the
    window on screen until either a client connects to that port or the user
    presses ``q`` / ``Esc``. The accepted connection is used only to learn the
    peer's address and is closed immediately.

    Returns:
        tuple: ``(success, ip)`` where ``success`` is True once a client
        connected and ``ip`` is that client's IP address, or ``None`` when no
        client connected.
    """
    port = 40001
    window_name = "Scan this code to connect to phone"
    host = get_ip()
    img = np.array(qrcode.make(f"{host},{port}").convert('RGB'))
    success = False
    addr = None
    window_shown = False
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    try:
        s.bind((host, port))
        s.settimeout(1)
        s.listen()  # once is enough; the loop below only polls accept()
        while True:
            cv2.imshow(window_name, img)
            window_shown = True
            k = cv2.waitKey(1) & 0xff
            if k == ord('q') or k == 27:
                break
            try:
                conn, peer = s.accept()
            except socket.timeout as e:
                logging.info(f"Please tap on the ip address to scan QR code. ({host}:{port}). {e}")
                continue
            # Only the peer address is needed; do not leak the connection.
            conn.close()
            addr = peer[0]
            success = True
            break
    except Exception as e:
        logging.error(f"Unable to bind socket: {e}")
    finally:
        s.close()
        # Only tear down the window if it was actually created.
        if window_shown:
            cv2.destroyWindow(window_name)
    return success, addr
def is_glove_online(host, port):
    """Report whether an HTTP GET to ``http://<host>:<port>`` answers with 200.

    The request uses a one-second timeout. Exceptions raised by
    ``requests.get`` are not caught here and propagate to the caller.

    Args:
        host: Host name or IP address to probe.
        port: Port to probe.

    Returns:
        bool: True if the response status code is 200, otherwise False.
    """
    response = requests.get(url=f"http://{host}:{port}", timeout=1)
    return response.status_code == 200
if __name__ == '__main__':
    # Script entry point: parse the command line, load the agent and iOS
    # configurations, optionally verify the glove and re-pair the phone via
    # QR code, then start the game loop.
    choices = mode_list(['ar', 'vr'])
    logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
                        datefmt="%H:%M:%S", level=logging.DEBUG)
    logging.getLogger("matplotlib").setLevel(logging.WARNING)
    logging.getLogger("urllib3").setLevel(logging.WARNING)
    parser = argparse.ArgumentParser()
    parser.add_argument("-auto", action='store_true', help="Enable auto control")
    parser.add_argument("-m", "--mode", choices=choices, help="AR or VR [WARNING not implemented yet!]", default="vr")
    parser.add_argument("-r", "--reconnect", action='store_true', help="Scan QR code to attach phone to PC")
    parser.add_argument("-u", "--use_unity", action='store_true',
                        help="Use unity as rendering and control service")
    parser.add_argument("-g", "--use_glove", help="use glove based controller by supplying its ip address!")
    args = parser.parse_args()
    try:
        agent_config_file_path = Path("ROAR/configurations/iOS/iOS_agent_configuration.json")
        ios_config_file_path = Path("ROAR_iOS/configurations/ios_config.json")
        agent_config = AgentConfig.parse_file(agent_config_file_path)
        ios_config: iOSConfig = iOSConfig.parse_file(ios_config_file_path)
        # mode_list accepts any letter case (e.g. "AR"), so normalise before
        # comparing; otherwise "-m AR" would silently select VR.
        ios_config.ar_mode = args.mode.lower() == "ar"
        if args.use_glove:
            try:
                glove_online = is_glove_online(args.use_glove, port=81)
            except requests.exceptions.RequestException:
                # Covers connect/read timeouts and refused connections alike.
                glove_online = False
            if not glove_online:
                print(f"ERROR. Cannot find Glove at that ip address {args.use_glove}. Shutting down...")
                # Non-zero status: this is a failure exit. SystemExit is not an
                # Exception subclass, so the handler below does not swallow it.
                raise SystemExit(1)
            ios_config.glove_ip_addr = args.use_glove
            ios_config.should_use_glove = True
        else:
            ios_config.should_use_glove = False
        success = False
        if args.reconnect:
            success, addr = showIPUntilAck()
            if success:
                ios_config.ios_ip_addr = addr
                # Persist the newly paired phone address; the context manager
                # guarantees the file is flushed and closed.
                with ios_config_file_path.open('w') as config_file:
                    json.dump(ios_config.dict(), config_file, indent=4)
                time.sleep(2)
        # Run when pairing succeeded, or when no re-pairing was requested.
        if success or not args.reconnect:
            agent = PointcloudRecordingAgent(vehicle=Vehicle(), agent_settings=agent_config, should_init_default_cam=True)
            runner = iOSUnityRunner(agent=agent, ios_config=ios_config, is_unity=args.use_unity)
            runner.start_game_loop(auto_pilot=args.auto)
    except Exception as e:
        # logging.exception keeps the traceback, which a bare print discards.
        logging.exception(f"Something bad happened: {e}")