-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconsensus_environment.py
153 lines (113 loc) · 4.84 KB
/
consensus_environment.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
#
# This is a definition of the orientation consensus environment simulation engine.
# It provides the initialization of a random environment and methods of communication between the different agents.
#
import random
import geometry
import math
import agent
import neat
class Environment:
"""
The instance holding every agents and methods of communication
"""
def __init__(self, length=100, height=100, N=10):
"""
Creates a new random environment of length and height given, with N agents placed with random locations and headings, and angular velocity of zero
"""
self.length=length
self.height=height
self.agent_list=[]
for i in range(N):
x = random.random() * length
y = random.random() * height
location = geometry.Point(x,y)
heading = random.random() * 360
robot = agent.Agent(location, i, heading, angular_vel=0)
self.agent_list.append(robot)
def communication(self):
"""
Gives each robot on the map a new message in one of their radar
"""
for robot in self.agent_list:
# The list of all robots in range
robots_in_range = []
for other_robot in self.agent_list:
if other_robot.agent_id != robot.agent_id and robot.location.distance(other_robot.location) <= 80:
robots_in_range.append(other_robot)
# Randomly selects one robot from which the message will be received
random_id = random.randint(0, len(robots_in_range)-1)
selected_robot = robots_in_range[random_id]
robot.update_radar(selected_robot)
def consensus_verified(self):
"""
Returns : True if all robots in the environment are heading the same way, with an error tolerated of 5°.
"""
min_heading = self.agent_list[0].heading
max_heading = self.agent_list[0].heading
for robot in self.agent_list:
if robot.heading < min_heading:
min_heading = robot.heading
elif robot.heading > max_heading:
max_heading = robot.heading
return max_heading-min_heading < 5 # doesn't consider the fact that it can be around 360°
def avg_heading(self):
"""
Returns the average angle of heading of every robots in the environment
"""
r0 = self.agent_list[0]
sum_point = geometry.Point(math.cos(geometry.deg_to_rad(r0.heading)), math.sin(geometry.deg_to_rad(r0.heading)))
for r in self.agent_list[1:]:
new_point = geometry.Point(math.cos(geometry.deg_to_rad(r.heading)), math.sin(geometry.deg_to_rad(r.heading)))
sum_point = geometry.sum_points(sum_point, new_point)
return sum_point.angle()
def fitness(self):
"""
Return the fitness score of the environment
"""
R = len(self.agent_list)
sum_r = 0
avg_heading = self.avg_heading()
for r in self.agent_list:
sum_r += r.individual_fitness(avg_heading)
return (1/R)*sum_r
def consensus_simulation_evaluate(env, net, time_steps=600, robot_orientation_list = None):
"""
The function to evaluate simulation for specific environment
and controll ANN provided. The results will be saved into provided
agent record holder.
Arguments:
env: The configuration environment.
net: The agent's control ANN.
time_steps: The number of time steps for maze simulation.
"""
for i in range(time_steps):
if consensus_simulation_step(env, net, robot_orientation_list):
print("Consensus reached in %d steps" % (i + 1))
return 1.0
# Calculate the fitness score based on distance from exit
fitness = env.fitness()
if fitness <= 0.01:
fitness = 0.01
return fitness
def consensus_simulation_step(env, net, robot_orientation_list):
"""
The function to perform one step of consensus orientation simulation.
Arguments:
env: The maze configuration environment.
net: The maze solver agent's control ANN
Returns:
The True if every robots are heading the same way, with a 5° error tolerated
"""
# Activate/update communication for this step
env.communication()
for i, robot in enumerate(env.agent_list):
# create inputs from the current state of the robot in environment
inputs = robot.create_net_inputs()
# load inputs into controll ANN and get results
output = net.activate(inputs)
# apply control signal to the environment and update
robot.apply_outputs(output)
if robot_orientation_list != None:
robot_orientation_list[i].append(robot.heading)
return env.consensus_verified()