-
Notifications
You must be signed in to change notification settings - Fork 7
/
main_real_life.py
114 lines (91 loc) · 3.61 KB
/
main_real_life.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
from api import Api, Api_recv, InfoApi
import comm
import match
import argparse
import fields as pitch
from commons.utils import get_config
from pyVSSSReferee.RefereeComm import RefereeComm
from pySSLVision.VisionComm import SSLVision, assign_empty_values
import os
import threading
import time, collections
parser = argparse.ArgumentParser(description='NeonFC')
parser.add_argument('--config_file', default='config_real_life.json')
parser.add_argument('--env', default='real_life')
args = parser.parse_args()
class Game():
def __init__(self, config_file=None, env='real_life'):
self.config = get_config(config_file)
self.match = match.MatchRealLife(self,
**self.config.get('match')
)
self.vision = SSLVision()
self.comm = comm.RLComm()
self.field = pitch.Field(self.match.category)
self.environment = env
# self.t1 = time.time()
# self.list = collections.deque(maxlen=25)
self.use_api = self.config.get("api")
self.api_address = self.config.get("network").get("api_address")
self.api_port = self.config.get("network").get("api_port")
self.api_recv_port = self.config.get("network").get("api_recv_port")
self.referee = RefereeComm(config_file)
if self.use_api:
self.api = Api(self.api_address, self.api_port)
self.api_recv = Api_recv(self.match, self.api_address, self.api_recv_port)
if os.environ.get('USE_REFEREE'):
self.use_referee = bool(int(os.environ.get('USE_REFEREE')))
else:
self.use_referee = self.config.get('referee')
self.start()
def start(self):
self.vision.assign_vision(self.update)
if self.use_referee:
self.referee.start()
self.match.start()
self.vision.start()
self.comm.start()
if self.use_api:
#self.match.game_status = 'GAME_ON'
self.info_api = InfoApi(self.match, self.match.robots, self.match.opposites, self.match.coach, self.match.ball, self.match.parameters)
self.api.start()
self.api_recv.connect_info(self.info_api)
self.api_recv.start() #Problema 1
self.sender_thread = threading.Thread(target = self.api.send_data, args=[self.info_api])
self.sender_thread.start()
def update(self):
frame = assign_empty_values(
self.vision.frame,
field_size=self.field.get_dimensions(),
team_side=self.match.team_side,
last_frame=self.vision.last_frame
)
self.vision.last_frame = frame
self.match.update(frame)
if self.use_referee:
self.match.check_foul(self.referee)
commands = self.match.decide()
if (self.use_api or self.use_referee) and (self.match.game_status == 'STOP' or self.match.game_status is None):
commands = [
{
'robot_id': r['robot_id'],
'color': r['color'],
'wheel_left': 0,
'wheel_right': 0
} for r in commands
]
self.comm.send(commands)
# delta_t = float(time.time() - self.t1)
# self.list.append(delta_t)
# self.t1 = time.time()
# print(len(self.list)/sum(self.list), 'hz')
# print('a')
def stop(self):
for t in threading.enumerate():
t.kill_recieved = True
g = Game(config_file=args.config_file, env=args.env)
try:
while True:
time.sleep(0.01)
except KeyboardInterrupt:
g.stop()