# Deep_Q_Network.py

import tensorflow as tf
import numpy as np
import random
import math
import os
# Parameters
epsilon = 1  # The probability of choosing a random action (in training). This decays as iterations increase. (0 to 1)
epsilonMinimumValue = 0.001  # The minimum value we want epsilon to reach in training. (0 to 1)
nbActions = 3  # The number of actions. Since we only have left/stay/right, that means 3 actions.
epoch = 1001  # The number of games we want the system to run for.
hiddenSize = 100  # Number of neurons in the hidden layers.
maxMemory = 500  # How large the memory should be (where it stores its past experiences).
batchSize = 50  # The mini-batch size for training. Samples are drawn at random from memory until the mini-batch is full.
gridSize = 10  # The size of the grid that the agent plays the game on.
nbStates = gridSize * gridSize  # We eventually flatten the grid to a 1d tensor to feed the network.
discount = 0.9  # The discount factor encourages the network to prefer states that reach the reward sooner. (0 to 1)
learningRate = 0.2  # Learning rate for Stochastic Gradient Descent (our optimizer).
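
# Note: with the multiplicative decay used in main() (epsilon = epsilon * 0.999 per action step),
# epsilon follows 0.999**n and only reaches epsilonMinimumValue (0.001) after roughly
# ln(0.001) / ln(0.999) ~ 6,900 steps, so the agent keeps exploring for most of training.
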
# Create the base model.
X = tf.placeholder(tf.float32, [None, nbStates])
W1 = tf.Variable(tf.truncated_normal([nbStates, hiddenSize], stddev=1.0 / math.sqrt(float(nbStates))))
b1 = tf.Variable(tf.truncated_normal([hiddenSize], stddev=0.01))
input_layer = tf.nn.relu(tf.matmul(X, W1) + b1)
W2 = tf.Variable(tf.truncated_normal([hiddenSize, hiddenSize], stddev=1.0 / math.sqrt(float(hiddenSize))))
b2 = tf.Variable(tf.truncated_normal([hiddenSize], stddev=0.01))
hidden_layer = tf.nn.relu(tf.matmul(input_layer, W2) + b2)
W3 = tf.Variable(tf.truncated_normal([hiddenSize, nbActions], stddev=1.0 / math.sqrt(float(hiddenSize))))
b3 = tf.Variable(tf.truncated_normal([nbActions], stddev=0.01))
output_layer = tf.matmul(hidden_layer, W3) + b3
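
# The model is a small fully connected Q-network: the flattened 10x10 grid (nbStates = 100 inputs)
# passes through two ReLU hidden layers of hiddenSize = 100 units each, and the linear output layer
# produces one Q-value per action (left / stay / right).
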
# True labels
Y = tf.placeholder(tf.float32, [None, nbActions])
# Mean squared error cost function
cost = tf.reduce_sum(tf.square(Y - output_layer)) / (2 * batchSize)
# Stochastic Gradient Descent optimizer
optimizer = tf.train.GradientDescentOptimizer(learningRate).minimize(cost)
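
# Because getBatch() below copies the network's own predictions into the target vector and only
# overwrites the entry for the action that was actually taken, the squared error (and hence the
# gradient) is non-zero only for that single output of each sample.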

# Helper function: returns a (roughly uniform) random float between the two boundaries s and e.
def randf(s, e):
    return (float(random.randrange(0, (e - s) * 9999)) / 10000) + s

# The environment: handles interactions and contains the state of the environment.
class CatchEnvironment():
    def __init__(self, gridSize):
        self.gridSize = gridSize
        self.nbStates = self.gridSize * self.gridSize
        self.state = np.empty(3, dtype=np.uint8)

    # Returns the state of the environment as a flattened canvas.
    def observe(self):
        canvas = self.drawState()
        canvas = np.reshape(canvas, (-1, self.nbStates))
        return canvas

    def drawState(self):
        canvas = np.zeros((self.gridSize, self.gridSize))
        canvas[self.state[0] - 1, self.state[1] - 1] = 1  # Draw the fruit.
        # Draw the basket. The basket occupies its own column plus the two adjacent columns on the bottom row.
        canvas[self.gridSize - 1, self.state[2] - 1 - 1] = 1
        canvas[self.gridSize - 1, self.state[2] - 1] = 1
        canvas[self.gridSize - 1, self.state[2] - 1 + 1] = 1
        return canvas
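
    # For illustration (example values, not taken from a real run): with the fruit at row 3, column 5
    # and the basket centred on column 4, the canvas has a single 1 at index (2, 4), three 1s at
    # indices (9, 2), (9, 3) and (9, 4) for the basket, and 0 everywhere else.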

    # Resets the environment. Randomly initialise the fruit position (always at the top to begin with) and the basket.
    def reset(self):
        initialFruitColumn = random.randrange(1, self.gridSize + 1)
        initialBucketPosition = random.randrange(2, self.gridSize + 1 - 1)
        self.state = np.array([1, initialFruitColumn, initialBucketPosition])
        return self.getState()

    def getState(self):
        stateInfo = self.state
        fruit_row = stateInfo[0]
        fruit_col = stateInfo[1]
        basket = stateInfo[2]
        return fruit_row, fruit_col, basket

    # Returns the reward that the agent has gained for being in the current environment state.
    def getReward(self):
        fruitRow, fruitColumn, basket = self.getState()
        if (fruitRow == self.gridSize - 1):  # If the fruit has reached the bottom.
            if (abs(fruitColumn - basket) <= 1):  # Check if the basket caught the fruit.
                return 1
            else:
                return -1
        else:
            return 0
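
    # Note: the only non-zero reward arrives on the final step of a game, so it is the discount factor
    # in the Q-learning target that propagates this terminal +1/-1 back to earlier states during training.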

    def isGameOver(self):
        if (self.state[0] == self.gridSize - 1):
            return True
        else:
            return False

    def updateState(self, action):
        if (action == 1):
            action = -1
        elif (action == 2):
            action = 0
        else:
            action = 1
        fruitRow, fruitColumn, basket = self.getState()
        newBasket = min(max(2, basket + action), self.gridSize - 1)  # The min/max prevents the basket from moving out of the grid.
        fruitRow = fruitRow + 1  # The fruit is falling by 1 every action.
        self.state = np.array([fruitRow, fruitColumn, newBasket])

    # Action can be 1 (move left), 2 (stay), or 3 (move right).
    def act(self, action):
        self.updateState(action)
        reward = self.getReward()
        gameOver = self.isGameOver()
        return self.observe(), reward, gameOver, self.getState()  # For the purpose of the visual, the state is also returned.
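
# A single interaction with the environment looks like this (a usage sketch, separate from the
# training loop in main() below):
#   env = CatchEnvironment(gridSize)
#   env.reset()
#   currentState = env.observe()
#   nextState, reward, gameOver, stateInfo = env.act(2)  # action 2 keeps the basket still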

# The memory: handles the internal store of experiences generated by the agent's actions,
# and builds mini-batches of experiences from it for training.
class ReplayMemory:
    def __init__(self, gridSize, maxMemory, discount):
        self.maxMemory = maxMemory
        self.gridSize = gridSize
        self.nbStates = self.gridSize * self.gridSize
        self.discount = discount
        self.inputState = np.empty((self.maxMemory, self.nbStates), dtype=np.float32)
        self.actions = np.zeros(self.maxMemory, dtype=np.uint8)
        self.nextState = np.empty((self.maxMemory, self.nbStates), dtype=np.float32)
        self.gameOver = np.empty(self.maxMemory, dtype=bool)
        self.rewards = np.empty(self.maxMemory, dtype=np.int8)
        self.count = 0
        self.current = 0

    # Appends the experience to the memory.
    def remember(self, currentState, action, reward, nextState, gameOver):
        self.actions[self.current] = action
        self.rewards[self.current] = reward
        self.inputState[self.current, ...] = currentState
        self.nextState[self.current, ...] = nextState
        self.gameOver[self.current] = gameOver
        self.count = max(self.count, self.current + 1)
        self.current = (self.current + 1) % self.maxMemory
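
    # Note: `current` wraps around once maxMemory (500) experiences have been stored, so the memory
    # acts as a circular buffer and the oldest experiences are overwritten first.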

    def getBatch(self, model, batchSize, nbActions, nbStates, sess, X):
        # Check whether we have enough stored experiences to fill an entire batch; if not, we create the biggest
        # batch we can (at the beginning of training we will not have enough experience to fill a batch).
        memoryLength = self.count
        chosenBatchSize = min(batchSize, memoryLength)
        inputs = np.zeros((chosenBatchSize, nbStates))
        targets = np.zeros((chosenBatchSize, nbActions))
        # Fill the inputs and targets up.
        for i in range(chosenBatchSize):
            # Choose a random memory experience to add to the batch.
            randomIndex = random.randrange(0, memoryLength)
            current_inputState = np.reshape(self.inputState[randomIndex], (1, nbStates))
            target = sess.run(model, feed_dict={X: current_inputState})
            current_nextState = np.reshape(self.nextState[randomIndex], (1, nbStates))
            current_outputs = sess.run(model, feed_dict={X: current_nextState})
            # Gives us Q_sa, the max Q-value for the next state.
            nextStateMaxQ = np.amax(current_outputs)
            if (self.gameOver[randomIndex] == True):
                # Terminal state: the target for the chosen action is just the observed reward.
                target[0, self.actions[randomIndex] - 1] = self.rewards[randomIndex]
            else:
                # reward + discount (gamma) * max_a' Q(s', a')
                # We set the Q-value target for the chosen action to r + gamma * max_a' Q(s', a'). The rest stay
                # equal to the predictions, to give an error of 0 for those outputs.
                target[0, self.actions[randomIndex] - 1] = self.rewards[randomIndex] + self.discount * nextStateMaxQ
            # Update the inputs and targets.
            inputs[i] = current_inputState
            targets[i] = target
        return inputs, targets
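
# Worked example of the target above (numbers chosen purely for illustration): for a non-terminal
# transition with reward 0 whose best next-state Q-value is 0.5, the target for the chosen action is
# 0 + 0.9 * 0.5 = 0.45; for a terminal transition the target is simply the observed reward (+1 or -1).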

def main(_):
    global epsilon
    print("Training new model")
    # Define the environment.
    env = CatchEnvironment(gridSize)
    # Define the replay memory.
    memory = ReplayMemory(gridSize, maxMemory, discount)
    # Add ops to save and restore all the variables.
    saver = tf.train.Saver()
    winCount = 0
    with tf.Session() as sess:
        tf.global_variables_initializer().run()
        for i in range(epoch):
            # Initialize the environment.
            err = 0
            env.reset()
            isGameOver = False
            # The initial state of the environment.
            currentState = env.observe()
            while (isGameOver != True):
                action = -9999  # Action initialization.
                # Decide whether to choose a random action, or an action from the policy network.
                if (randf(0, 1) <= epsilon):
                    action = random.randrange(1, nbActions + 1)
                else:
                    # Forward the current state through the network.
                    q = sess.run(output_layer, feed_dict={X: currentState})
                    # Find the max index (the chosen action).
                    index = q.argmax()
                    action = index + 1
                # Decay epsilon by multiplying by 0.999, not allowing it to go below a certain threshold.
                if (epsilon > epsilonMinimumValue):
                    epsilon = epsilon * 0.999
                nextState, reward, gameOver, stateInfo = env.act(action)
                if (reward == 1):
                    winCount = winCount + 1
                memory.remember(currentState, action, reward, nextState, gameOver)
                # Update the current state and whether the game is over.
                currentState = nextState
                isGameOver = gameOver
                # Get a batch of training data to train the model on.
                inputs, targets = memory.getBatch(output_layer, batchSize, nbActions, nbStates, sess, X)
                # Train the network, which returns the error.
                _, loss = sess.run([optimizer, cost], feed_dict={X: inputs, Y: targets})
                err = err + loss
            print("Epoch " + str(i) + ": err = " + str(err) + ": Win count = " + str(winCount) +
                  " Win ratio = " + str(float(winCount) / float(i + 1) * 100))
        # Save the variables to disk.
        save_path = saver.save(sess, os.getcwd() + "/model.ckpt")
        print("Model saved in file: %s" % save_path)

if __name__ == '__main__':
    tf.app.run()