load_balance_actor_agent.py

import numpy as np
import tensorflow as tf
from utils import *
from param import *
from nn_ops import *
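
# Policy-gradient actor for the load-balancing environment: a small
# feed-forward network maps the per-worker queue sizes plus the incoming
# job size to a softmax over workers, from which the next job assignment
# is sampled. Training uses an advantage-weighted log-probability loss
# with a decaying entropy bonus (see the usage sketch at the bottom of
# this file).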


class ActorAgent(object):
    def __init__(self, sess, eps=args.eps, act_fn=leaky_relu,
                 optimizer=tf.train.AdamOptimizer,
                 scope='actor_agent'):

        self.sess = sess
        self.scope = scope
        self.eps = eps
        self.act_fn = act_fn
        self.optimizer = optimizer

        self.input_dim = args.num_workers + 1  # (per-worker queue sizes, incoming job size)
        self.hid_dims = args.hid_dims
        self.output_dim = args.num_workers  # priority (to softmax over)

        # input dimension: [batch_size, num_workers + 1]
        self.inputs = tf.placeholder(tf.float32, [None, self.input_dim])

        # initialize nn parameters
        self.weights, self.bias = self.nn_init(
            self.input_dim, self.hid_dims, self.output_dim)

        # actor network
        self.act_probs = self.actor_network(
            self.inputs, self.weights, self.bias)

        # sample an action via the Gumbel-max trick (from OpenAI baselines)
        logits = tf.log(self.act_probs)
        noise = tf.random_uniform(tf.shape(logits))
        self.act = tf.argmax(logits - tf.log(-tf.log(noise)), 1)

        # selected action: [batch_size, num_workers]
        self.act_vec = tf.placeholder(tf.float32, [None, self.output_dim])

        # advantage term
        self.adv = tf.placeholder(tf.float32, [None, 1])

        # use entropy to promote exploration; this term decays over time
        self.entropy_weight = tf.placeholder(tf.float32, ())

        # selected action probability
        self.selected_act_prob = tf.reduce_sum(tf.multiply(
            self.act_probs, self.act_vec),
            reduction_indices=1, keep_dims=True)

        # actor loss due to advantage (negated)
        self.adv_loss = tf.reduce_sum(tf.multiply(
            tf.log(self.selected_act_prob + self.eps), -self.adv))

        # entropy loss (normalized)
        self.entropy_loss = tf.reduce_sum(tf.multiply(
            self.act_probs, tf.log(self.act_probs + self.eps))) / \
            np.log(args.num_workers)

        # define combined loss
        self.loss = self.adv_loss + self.entropy_weight * self.entropy_loss

        # get trainable parameters
        self.params = tf.get_collection(
            tf.GraphKeys.TRAINABLE_VARIABLES, scope=self.scope)

        # operations for setting parameters
        self.input_params, self.set_params_op = self.define_params_op()

        # actor gradients
        self.act_gradients = tf.gradients(self.loss, self.params)

        # adaptive learning rate
        self.lr_rate = tf.placeholder(tf.float32, shape=[])

        # actor optimizer
        self.act_opt = self.optimizer(self.lr_rate).minimize(self.loss)

        # apply gradients directly to update parameters
        self.apply_grads = self.optimizer(self.lr_rate).\
            apply_gradients(zip(self.act_gradients, self.params))

    def nn_init(self, input_dim, hid_dims, output_dim):
        weights = []
        bias = []
        curr_in_dim = input_dim

        # hidden layers
        for hid_dim in hid_dims:
            weights.append(
                glorot([curr_in_dim, hid_dim], scope=self.scope))
            bias.append(
                zeros([hid_dim], scope=self.scope))
            curr_in_dim = hid_dim

        # output layer
        weights.append(glorot([curr_in_dim, output_dim], scope=self.scope))
        bias.append(zeros([output_dim], scope=self.scope))

        return weights, bias

    def actor_network(self, inputs, weights, bias):
        # non-linear feed forward through the hidden layers
        x = inputs
        for (w, b) in zip(weights[:-1], bias[:-1]):
            x = tf.matmul(x, w)
            x += b
            x = self.act_fn(x)

        # final linear output layer
        x = tf.matmul(x, weights[-1])
        x += bias[-1]

        # softmax over workers
        x = tf.nn.softmax(x, dim=-1)

        return x

    def apply_gradients(self, gradients, lr_rate):
        self.sess.run(self.apply_grads, feed_dict={
            i: d for i, d in zip(
                self.act_gradients + [self.lr_rate],
                gradients + [lr_rate])
        })

    def define_params_op(self):
        # define operations for setting network parameters
        input_params = []
        for param in self.params:
            input_params.append(
                tf.placeholder(tf.float32, shape=param.get_shape()))
        set_params_op = []
        for idx, param in enumerate(input_params):
            set_params_op.append(self.params[idx].assign(param))
        return input_params, set_params_op

    def get_params(self):
        return self.sess.run(self.params)

    def set_params(self, input_params):
        self.sess.run(self.set_params_op, feed_dict={
            i: d for i, d in zip(self.input_params, input_params)
        })

    def predict(self, inputs):
        return self.sess.run(self.act, feed_dict={
            self.inputs: inputs
        })

    def get_gradients(self, inputs, act_vec, adv, entropy_weight):
        return self.sess.run(
            [self.act_gradients, [self.adv_loss, self.entropy_loss]],
            feed_dict={
                self.inputs: inputs,
                self.act_vec: act_vec,
                self.adv: adv,
                self.entropy_weight: entropy_weight
            })

    def compute_gradients(self, batch_inputs, batch_act_vec,
                          batch_adv, entropy_weight):
        # stack into batch format
        inputs = np.vstack(batch_inputs)
        act_vec = np.vstack(batch_act_vec)

        # invoke learning model
        gradients, loss = self.get_gradients(
            inputs, act_vec, batch_adv, entropy_weight)

        # append baseline loss
        loss.append(np.mean(batch_adv ** 2))

        return gradients, loss

    def get_action(self, state):
        workers, job, _ = state

        inputs = np.zeros([1, args.num_workers + 1])
        for worker in workers:
            # normalized work currently queued at this worker (capped)
            inputs[0, worker.worker_id] = min(
                sum(j.size for j in worker.queue) /
                args.job_size_norm_factor / 5.0,
                20.0)
        # normalized size of the incoming job (capped)
        inputs[0, -1] = min(job.size / args.job_size_norm_factor, 10.0)

        action = self.predict(inputs)

        return action[0]
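

# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the original training pipeline):
# a minimal example of constructing the agent and sampling an action for a
# hand-built observation. It assumes `args` (from param.py) defines
# num_workers, hid_dims, eps, and job_size_norm_factor, as the class above
# already requires, and that nn_ops supplies leaky_relu/glorot/zeros.
if __name__ == '__main__':
    sess = tf.Session()
    agent = ActorAgent(sess)
    sess.run(tf.global_variables_initializer())

    # one observation: normalized queue size per worker, plus the incoming
    # job size in the last slot
    obs = np.zeros([1, args.num_workers + 1])
    obs[0, -1] = 1.0  # pretend the incoming job has unit normalized size

    worker_idx = agent.predict(obs)  # sampled worker index, shape [1]
    print('assign incoming job to worker', worker_idx[0])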