# Solvers.py
from map_data import *
import ujson
from shutil import get_terminal_size
from random import choice, random, randrange

DISCOUNT = 0.3
terminal_width, _ = get_terminal_size()
_visualizers = {}


def _default_visualizer(_, state):
    '''Generic visualizer for unknown problems.'''
    print(state)
class Visualizer:
    '''Visualization and printing functionality encapsulation.'''

    def __init__(self, problem):
        '''Constructor with the problem to visualize.'''
        self.problem = problem
        self.counter = 0

    def visualize(self, frontier):
        '''Visualizes the frontier at every step.'''
        self.counter += 1
        print(f'Frontier at step {self.counter}')
        for state in frontier:
            print()
            _visualizers.get(type(self.problem), _default_visualizer)(self.problem, state)
        print('-' * terminal_width)
def _robot_visualizer(env, state):
    '''Custom visualizer for the delivery robot environment.'''
    robot = state[:2]
    crates = [building_to_position[bl] for bl in state[3:] if bl > 0]
    for j in range(env.bounds[1] - 1, -1, -1):
        for i in range(env.bounds[0]):
            if (i, j) == robot:
                print('🤖', end='')
            elif (i, j) in crates:
                print('📦', end='')
            else:
                print('⬜', end='')
        print()
class Environment:
    '''
    Abstract base class for an (interactive) environment formulation.
    It declares the methods expected to be used to solve it.
    All the declared methods are placeholders that raise errors if not overridden by "concrete" child classes!
    '''

    def __init__(self):
        '''Constructor that initializes the problem. Typically used to set up the initial state.'''
        self.state = None

    def actions(self):
        '''Returns an iterable of the actions applicable to the current environment state.'''
        raise NotImplementedError

    def apply(self, action):
        '''Applies the given action to the current environment state, updating it in place, and returns the resulting reward; not necessarily deterministic.'''
        raise NotImplementedError

    @classmethod
    def new_random_instance(cls):
        '''Factory method for a problem instance with a random initial state.'''
        raise NotImplementedError
def action_from_q(env, q, verbose=True):
    '''Get the best action for the current state of the environment from the Q-values.'''
    return max(env.actions(), key=lambda action: q.get((env.state, action), 0))
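# Example (a sketch, assuming a trained Q-table `q` and a fresh environment `env`):
#   best_action = action_from_q(env, q)   # the action maximizing q[(env.state, action)]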
def q_learning(env, q={}, n={}, f=lambda q, n: (q + 1) / (n + 1), alpha=lambda n: 60 / (n + 59),
               error=1e-6, verbose=False, states_target=None, table_target=None):
    '''Q-learning implementation that trains on an environment until no more actions can be taken.'''
    if verbose: visualizer = Visualizer(env)
    if states_target: states = []
    while env.state is not None:
        if verbose: visualizer.visualize([env.state])
        state = env.state
        if states_target: states.append(state)
        # Pick the action that maximizes the exploration function f over the current estimates.
        action = max(env.actions(),
                     key=lambda next_action: f(q.get((state, next_action), 0), n.get((state, next_action), 0)))
        n[(state, action)] = n.get((state, action), 0) + 1
        reward = env.apply(action)
        # Temporal-difference update towards the discounted best estimate for the next state.
        q[(state, action)] = q.get((state, action), 0) \
            + alpha(n[(state, action)]) \
            * (reward
               + env.discount * max((q.get((env.state, next_action), 0) for next_action in env.actions()), default=0)
               - q.get((state, action), 0))
    if table_target: table_target.give_me_my_table(q)
    if states_target:
        states_target.give_me_my_states(states, env)
    return q, n
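# Example usage (a minimal sketch; `generate_env` is defined later in this file):
#   env = generate_env()
#   q, n = q_learning(env, q={}, n={})   # one training episode, ends at the 'Finish' action
#   print(len(q), 'state-action values learned')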
from math import inf
from time import time
from itertools import count
def simulate(env_ctor, n_iterations=inf, duration=inf, **q_learning_params):
    '''A helper function to train for a fixed number of iterations or a fixed amount of time.'''
    for param in ('q', 'n'):
        q_learning_params[param] = q_learning_params.get(param, {})
    start_time = time()
    iteration = 0
    while time() < start_time + duration and iteration < n_iterations:
        env = env_ctor()
        q, n = q_learning(env, **q_learning_params)
        iteration += 1
        print(f'Iteration {iteration}')
        print('duration: ', time() - start_time)
    return q_learning_params['q'], q_learning_params['n']
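# Example (a sketch): share one Q-table across 200 training episodes, with an
# exploration function that favours rarely tried actions.
#   q, n = simulate(generate_env, n_iterations=200, f=lambda q, n: 1 / (n + 1))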
class DeliveryRobot(Environment):
    def __init__(self, start_pos: tuple, crates_pickup_locations: tuple, crates_dropoff_locations: tuple, max_reward, discount):
        """The constructor for the delivery robot environment.

        Building integer encoding:
            OSS --> 1
            AB --> 2
            NB --> 3
            HB --> 4
            NONE --> 0

        Args:
            start_pos (tuple): (x, y) start location of the robot
            crates_pickup_locations (tuple): the pickup building for each crate, e.g. (3, 3, 3, 3, 4, 4, 4, 4)
            crates_dropoff_locations (tuple): the drop-off building for each crate, e.g. (1, 2, 2, 1, 1, 1, 2, 1)
            max_reward (float): the maximum reward, used to scale all step rewards
            discount (float): the discount factor used in the Q-learning update
        """
        x, y = start_pos
        pickup_locations, dropoff_locations = crates_pickup_locations, crates_dropoff_locations
        currently_holding = (0,)
        self.state = (x, y) + currently_holding + pickup_locations
        self.dropoff_locations = dropoff_locations
        self.bounds = (14, 14)
        self.max_reward = max_reward
        self.discount = discount
        self.start_pos = start_pos
    def decode_state(self, state):
        """Extracts the data from the state tuple into a more user-friendly format.

        Args:
            state (tuple): the state tuple (x, y, currently_holding, *pickup_locations)

        Returns:
            tuple: ((x, y), currently_holding, pickup_locations)
        """
        x = state[0]
        y = state[1]
        currently_holding = state[2]
        pickup_locations = state[3:]
        return (x, y), currently_holding, pickup_locations
    def actions(self):
        if self.state is None: return []
        pos, currently_holding, pickup_locations = self.decode_state(self.state)
        current_building = position_to_building.get(pos, -1)
        # Hands empty and every crate delivered: the episode can be finished.
        if not currently_holding and all(not pick_up for pick_up in pickup_locations): return ['Finish']
        # Standing at a building that still has a crate waiting: pick it up.
        if not currently_holding and current_building != -1 and current_building in pickup_locations:
            return ['Pickup']
        # Holding a crate and standing at its drop-off building: drop it off.
        if currently_holding and self.dropoff_locations[currently_holding - 1] == current_building:
            return ['Dropoff']
        return ['up', 'down', 'left', 'right']
    def apply(self, action):
        up = lambda position: (position[0], min(position[1] + 1, self.bounds[1] - 1))
        down = lambda position: (position[0], max(position[1] - 1, 0))
        left = lambda position: (max(position[0] - 1, 0), position[1])
        right = lambda position: (min(position[0] + 1, self.bounds[0] - 1), position[1])
        pos, currently_holding, pickup_locations = self.decode_state(self.state)
        current_building = position_to_building.get(pos, -1)
        if action in ('up', 'down', 'left', 'right'):
            move = {'up': up, 'down': down, 'left': left, 'right': right}[action]
            self.state = move(pos) + (currently_holding,) + pickup_locations
            # Movement penalty: larger when the robot is not carrying a crate.
            return -(0.4 if currently_holding else 0.6) * self.max_reward
        elif action == 'Pickup':
            crate_idx = pickup_locations.index(current_building)
            temp = list(pickup_locations)
            temp[crate_idx] = 0  # Mark the crate as taken
            pickup_locations = tuple(temp)
            currently_holding = crate_idx + 1
            self.state = pos + (currently_holding,) + pickup_locations
            return 0.4 * self.max_reward
        elif action == 'Dropoff':
            currently_holding = 0
            self.state = pos + (currently_holding,) + pickup_locations
            return 0.7 * self.max_reward
        elif action == 'Finish':
            self.state = None
            return self.max_reward
def generate_env():
    '''
    Building integer encoding:
        OSS --> 1
        AB --> 2
        NB --> 3
        HB --> 4
        NONE --> 0
    '''
    pick_ups = (3, 4, 3, 4, 3, 4, 3, 4)   # pickup building for each crate
    drop_offs = (1, 2, 2, 1, 1, 1, 2, 1)  # delivery building for each crate
    start_pos = (7, 1)
    return DeliveryRobot(crates_dropoff_locations=drop_offs, crates_pickup_locations=pick_ups,
                         start_pos=start_pos, discount=DISCOUNT, max_reward=100)
_visualizers[DeliveryRobot] = _robot_visualizer
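# Example (a minimal sketch): build the environment and step it by hand.
#   env = generate_env()
#   print(env.actions())       # ['up', 'down', 'left', 'right'] when away from any building
#   reward = env.apply('up')   # movement returns a negative reward (movement cost)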
class ai_master:
    """A class encapsulating the reinforcement learning process."""

    def __init__(self):
        self.iterations = 0
        self.table = {}
        self.q = {}
        self.n = {}

    def give_me_my_states(self, states, env):
        """Stores a reference to the environment and the list of states (the final path) after training.

        Args:
            states: states acquired from the q_learning function
            env: reference to the environment
        """
        self.states = states
        self.env = env

    def give_me_my_table(self, table):
        self.iterations += 1
        for key, val in table.items():
            self.table[key] = self.table.get(key, '') + f', i({self.iterations}): {val:.2f} '

    def train(self, alg='exploring', learning_rate='Normal', discount=0.3, iterations=200, save_table=False):
        global DISCOUNT
        self.q = {}
        self.n = {}
        self.table = {}
        self.iterations = 0
        # Exploration function: how promising an action looks given its Q-value and visit count.
        if alg == 'exploring': f = lambda q, n: 1 / (n + 1)
        if alg == 'random': f = lambda q, n: random()
        if alg == 'greedy': f = lambda q, n: q
        print(f'save table : {save_table}')
        if learning_rate == 'Normal': alpha = lambda n: 60 / (n + 59)
        else: alpha = lambda n: float(learning_rate)
        DISCOUNT = discount  # Used by generate_env when building new environments.
        simulate(generate_env, n_iterations=iterations, q=self.q, n=self.n, verbose=False, alpha=alpha,
                 f=f, table_target=self if save_table else None)
        if save_table: self.save_to_json()

    def start(self):
        """Simulates one greedy episode after training (updates the states list)."""
        simulate(generate_env, n_iterations=1, q=self.q, n=self.n, verbose=False, f=lambda q, n: q, states_target=self)

    def save_to_json(self):
        with open("qvals.json", "w") as outfile:
            ujson.dump(self.table, outfile, indent=4)
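# A minimal end-to-end sketch of the intended workflow (assumed usage, not part of the
# original script): train with the exploring policy, then run one greedy episode.
if __name__ == '__main__':
    master = ai_master()
    master.train(alg='exploring', discount=DISCOUNT, iterations=200)
    master.start()  # Records the greedy path into master.states
    print(f'Greedy episode visited {len(master.states)} states')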