-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathrollout_thread.py
executable file
·69 lines (53 loc) · 1.63 KB
/
rollout_thread.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import sys
import numpy as np
import random
import time
import math
from utils import noise_and_argmax
from env.terrain import Terrain
class RolloutThread(object):
    """Collects one episode of experience for a single task on a Terrain map.

    The thread steps a grid-world environment for at most ``num_steps``
    transitions, choosing each action from a tabular policy indexed by
    (x, y, task). Slot 0 of the last policy axis is treated as logits
    (fed to ``noise_and_argmax``); slot 1 as action probabilities
    (sampled from after renormalisation) — TODO confirm against the
    policy producer.
    """

    def __init__(
        self,
        task,
        start_x,
        start_y,
        num_steps,
        policy,
        map_index,
        use_laser,
        noise_argmax):
        """Store rollout configuration and build the environment.

        :param task: task index used to select the policy slice and reset the env
        :param start_x: starting x coordinate for the episode
        :param start_y: starting y coordinate for the episode
        :param num_steps: maximum number of environment steps per rollout
        :param policy: table indexed as policy[x, y, task, 0|1]
        :param map_index: which Terrain map to load
        :param use_laser: forwarded to Terrain — presumably toggles laser
            observations; verify in env.terrain
        :param noise_argmax: if truthy, act via noisy argmax over logits
            instead of sampling from the probability slice
        """
        self.task = task
        self.start_x = start_x
        self.start_y = start_y
        self.num_steps = num_steps
        self.policy = policy
        self.noise_argmax = noise_argmax
        self.env = Terrain(map_index, use_laser)

    def rollout(self):
        """Run one episode and return the collected trajectory.

        :returns: tuple ``(states, tasks, actions, rewards_of_episode,
            next_states, redundant_steps)`` where the first five are
            per-step lists and ``redundant_steps`` measures how many more
            steps were taken than the shortest path would have needed,
            via the env's precomputed ``min_dist`` table.
        """
        states = []
        tasks = []
        actions = []
        rewards_of_episode = []
        next_states = []

        self.env.resetgame(self.task, self.start_x, self.start_y)
        current = self.env.player.getposition()

        # step starts at 1; an episode that terminates early leaves step at
        # the terminating step, while exhausting the budget leaves it at
        # num_steps + 1 — redundant_steps below depends on this convention.
        step = 1
        while step <= self.num_steps:
            if self.noise_argmax:
                # Greedy-with-noise over the logit slice of the policy table.
                action = noise_and_argmax(
                    self.policy[current[0], current[1], self.task, 0])
            else:
                # Sample an action index from the (renormalised) probability slice.
                probs = np.asarray(
                    self.policy[current[0], current[1], self.task, 1])
                action = np.random.choice(len(probs), p=probs / probs.sum())

            reward, done = self.env.player.action(action)
            successor = self.env.player.getposition()

            states.append(current)
            tasks.append(self.task)
            actions.append(action)
            rewards_of_episode.append(reward)
            next_states.append(successor)

            current = successor
            if done:
                break
            step += 1

        # Extra steps beyond optimal: steps taken plus remaining shortest
        # distance from the final state, minus the shortest distance from the
        # start. Note min_dist is indexed [y, x]. Assumes at least one step
        # was taken (states non-empty) — num_steps >= 1 expected.
        redundant_steps = step + self.env.min_dist[self.task][states[-1][1], states[-1][0]] - self.env.min_dist[self.task][self.start_y, self.start_x]

        return states, tasks, actions, rewards_of_episode, next_states, redundant_steps