turtlebot_rl_eval_.py
#!/usr/bin/env python3
"""
This script uses the Gazebo simulation environment via the ROS interface for an RL evaluation task in PyTorch.
Task: Evaluate a trained RL model from the Dubins Gym to navigate from any random point to the global origin.
"""
import rospy
import time
from std_msgs.msg import Bool, Float32, Float64
from nav_msgs.msg import Odometry
from geometry_msgs.msg import Twist
import numpy as np
import matplotlib.pyplot as plt
import math
import random
import gym
from agent import ContinuousDubinGym, DiscreteDubinGym
from gym import spaces
from std_srvs.srv import Empty
import argparse
import datetime
import itertools
import torch, gc
import sys
sys.path.append('./algorithm/SAC')
from sac import SAC
from replay_memory import ReplayMemory
from torch.utils.tensorboard import SummaryWriter
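# Descriptive notes on the constants below (values unchanged from the original):
#   MAX_STEER / MAX_SPEED / MIN_SPEED - TurtleBot3 burger velocity limits [rad/s, m/s]
#   THRESHOLD_DISTANCE_2_GOAL         - per-axis goal tolerance [m]
#   GRID                              - half-width of the allowed workspace [m]
#   THETA0                            - range used to randomise the initial heading
#                                       around the bearing to the goal [rad]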
MAX_STEER = 2.84
MAX_SPEED = 0.22
MIN_SPEED = 0.
THRESHOLD_DISTANCE_2_GOAL = 0.05
GRID = 3.
THETA0 = np.pi/4
# Global Initialisation
pos = [0,0]
yaw_car = 0
done = False
episode_steps = 0
x_pub = rospy.Publisher('/cmd_vel',Twist,queue_size=1)
parser = argparse.ArgumentParser(description='PyTorch Soft Actor-Critic Args')
parser.add_argument('--policy', default="Gaussian",
help='Policy Type: Gaussian | Deterministic (default: Gaussian)')
parser.add_argument('--eval', type=bool, default=True,
help='Evaluates the policy every 10 episodes (default: True)')
parser.add_argument('--gamma', type=float, default=0.99, metavar='G',
help='discount factor for reward (default: 0.99)')
parser.add_argument('--tau', type=float, default=0.005, metavar='G',
help='target smoothing coefficient(τ) (default: 0.005)')
parser.add_argument('--lr', type=float, default=0.0003, metavar='G',
help='learning rate (default: 0.0003)')
parser.add_argument('--alpha', type=float, default=0.2, metavar='G',
help='Temperature parameter α determines the relative importance of the entropy\
term against the reward (default: 0.2)')
parser.add_argument('--automatic_entropy_tuning', type=bool, default=False, metavar='G',
help='Automatically adjust α (default: False)')
parser.add_argument('--seed', type=int, default=123456, metavar='N',
help='random seed (default: 123456)')
parser.add_argument('--batch_size', type=int, default=256, metavar='N',
help='batch size (default: 256)')
parser.add_argument('--num_steps', type=int, default=500000, metavar='N',
help='maximum number of steps (default: 500000)')
parser.add_argument('--hidden_size', type=int, default=256, metavar='N',
help='hidden size (default: 256)')
parser.add_argument('--updates_per_step', type=int, default=1, metavar='N',
help='model updates per simulator step (default: 1)')
parser.add_argument('--start_steps', type=int, default=1, metavar='N',
help='Steps sampling random actions (default: 1)')
parser.add_argument('--target_update_interval', type=int, default=1, metavar='N',
help='Value target update per no. of updates per step (default: 1)')
parser.add_argument('--replay_size', type=int, default=1000000, metavar='N',
help='size of replay buffer (default: 1000000)')
parser.add_argument('--cuda',type=int, default=0, metavar='N',
help='run on CUDA (default: 0, i.e. disabled)')
parser.add_argument('--max_episode_length', type=int, default=3000, metavar='N',
help='max episode length (default: 3000)')
args = parser.parse_args()
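# Note: only a subset of these SAC arguments matters at evaluation time; the
# training-specific ones (batch_size, updates_per_step, replay_size, ...) are
# parsed so the agent can be constructed with the same signature as in training.
# The class below redefines ContinuousDubinGym locally, shadowing the version
# imported from `agent` above.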
class ContinuousDubinGym(gym.Env):
def __init__(self):
super(ContinuousDubinGym,self).__init__()
metadata = {'render.modes': ['console']}
print("Initialising Continuous Dubin Gym Enviuronment...")
self.action_space = spaces.Box(np.array([-0.22, -2.84]), np.array([0.22, 2.84]), dtype = np.float16) # max rotational velocity of burger is 2.84 rad/s
low = np.array([-1.,-1.,-4.])
high = np.array([1.,1.,4.])
self.observation_space = spaces.Box(low, high, dtype=np.float16)
self.target = [0., 0., 1.57]
self.pose = [0., 0., 1.57]
self.action = [0., 0.]
self.traj_x = [self.pose[0]]
self.traj_y = [self.pose[1]]
self.traj_yaw = [self.pose[2]]
def reset(self):
self.pose = np.array([0., 0., 0.])
x = random.uniform(-1., 1.)
y = random.choice([-1., 1.])
self.target[0], self.target[1] = random.choice([[x, y], [y, x]])
head_to_target = self.get_heading(self.pose, self.target)
yaw = random.uniform(head_to_target - THETA0, head_to_target + THETA0)
self.pose[2] = yaw
self.target[2] = yaw
self.traj_x = [0.]
self.traj_y = [0.]
self.traj_yaw = [self.pose[2]]
print("Reset target to : [{:.2f}, {:.2f}]".format(self.target[0], self.target[1]))
obs = [(self.target[0] - self.pose[0])/GRID, (self.target[1] - self.pose[1])/GRID, head_to_target - self.pose[2]]
obs = [round(x, 2) for x in obs]
return np.array(obs)
def get_distance(self,x1,x2):
return math.sqrt((x1[0] - x2[0])**2 + (x1[1] - x2[1])**2)
def get_heading(self, x1,x2):
return math.atan2((x2[1] - x1[1]), (x2[0] - x1[0]))
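# Reward shaping used by get_reward() below:
#   alpha           = bearing_to_goal - yaw            (heading error)
#   crossTrackError = sin(alpha) * distance_to_goal
#   alongTrackError = |x - x_goal| + |y - y_goal|
#   reward          = -(crossTrackError**2 + alongTrackError + 3*|alpha|/1.57) / 6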
def get_reward(self):
x_target = self.target[0]
y_target = self.target[1]
yaw_target = self.target[2]
x = self.pose[0]
y = self.pose[1]
yaw_car = self.pose[2]
head_to_target = self.get_heading(self.pose, self.target)
alpha = head_to_target - yaw_car
ld = self.get_distance(self.pose, self.target)
crossTrackError = math.sin(alpha) * ld
headingError = abs(alpha)
alongTrackError = abs(x - x_target) + abs(y - y_target)
return -1*(abs(crossTrackError)**2 + alongTrackError + 3*headingError/1.57)/6
def check_goal(self):
done = False
if abs(self.pose[0]) < GRID and abs(self.pose[1]) < GRID:
if (abs(self.pose[0]-self.target[0]) < THRESHOLD_DISTANCE_2_GOAL and abs(self.pose[1]-self.target[1]) < THRESHOLD_DISTANCE_2_GOAL):
done = True
reward = 10
print("Goal Reached!")
self.stop_car()
else:
reward = self.get_reward()
else:
done = True
reward = -1
print("Outside Range")
self.stop_car()
return done, reward
def step(self,action):
reward = 0
done = False
info = {}
self.action = [round(x, 2) for x in action]
msg = Twist()
msg.linear.x = self.action[0]
msg.angular.z = self.action[1]
print("Lin Vel : , Rot Vel : ", msg.linear.x, msg.angular.z)
x_pub.publish(msg)
time.sleep(0.02)
head_to_target = self.get_heading(self.pose, self.target)
# done, reward = self.check_goal()
obs = [(self.target[0] - self.pose[0])/GRID, (self.target[1] - self.pose[1])/GRID, head_to_target - self.pose[2]]
obs = [round(x, 2) for x in obs]
return np.array(obs), reward, done, info
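# Note: the check_goal() call inside step() is commented out above, so step()
# always returns reward 0 and done False; goal detection during evaluation is
# handled separately in pose_callback().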
def stop_car(self):
'''
Stop the vehicle
'''
global x_pub
msg = Twist()
msg.linear.x = 0.
msg.angular.z = 0.
x_pub.publish(msg)
time.sleep(1)
# RL Model paths
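# The checkpoint paths below are relative to the working directory; adjust them
# to wherever the trained SAC actor/critic weights are stored locally.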
actor_path = "models/eshika/sac_actor_burger_reward6"
critic_path = "models/eshika/sac_critic_burger_reward6"
# Instantiate RL Environment and load saved model
env = ContinuousDubinGym()
env.target = [1.0, -1.0, 1.57]
agent = SAC(env.observation_space.shape[0], env.action_space, args)
memory = ReplayMemory(args.replay_size, args.seed)
agent.load_model(actor_path, critic_path)
state = np.zeros(env.observation_space.shape[0])
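# Quaternion (x, y, z, w) to roll/pitch/yaw, as implemented below:
#   roll  = atan2(2(wx + yz), 1 - 2(x^2 + y^2))
#   pitch = asin(clamp(2(wy - zx), -1, 1))
#   yaw   = atan2(2(wz + xy), 1 - 2(y^2 + z^2))
# The same result could also be obtained from tf.transformations.euler_from_quaternion,
# assuming the ROS tf package is available.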
def euler_from_quaternion(x, y, z, w):
'''
Convert a quaternion into euler angles (roll, pitch, yaw)
roll is rotation around x in radians (counterclockwise)
pitch is rotation around y in radians (counterclockwise)
yaw is rotation around z in radians (counterclockwise)
'''
t0 = +2.0 * (w * x + y * z)
t1 = +1.0 - 2.0 * (x * x + y * y)
roll_x = math.atan2(t0, t1)
t2 = +2.0 * (w * y - z * x)
t2 = +1.0 if t2 > +1.0 else t2
t2 = -1.0 if t2 < -1.0 else t2
pitch_y = math.asin(t2)
t3 = +2.0 * (w * z + x * y)
t4 = +1.0 - 2.0 * (y * y + z * z)
yaw_z = math.atan2(t3, t4)
return roll_x, pitch_y, yaw_z # in radians
def pose_callback(pose_data):
'''
Callback function to run the trained RL policy
'''
global episode_steps, action, done, sub
pos[0] = pose_data.pose.pose.position.x
pos[1] = pose_data.pose.pose.position.y
orientation = pose_data.pose.pose.orientation
q = (orientation.x,orientation.y,orientation.z,orientation.w)
euler = euler_from_quaternion(q[0], q[1], q[2], q[3])
head_to_target = env.get_heading([pos[0], pos[1]], env.target)
yaw = euler[2]
state = [(env.target[0] - pos[0])/GRID, (env.target[1] - pos[1])/GRID, head_to_target - yaw]
state = np.array([round(x, 2) for x in state])
done = False
print("State : ", state)
# Sample action from policy
action = agent.select_action(state, True)
print("Network Output : ", action)
# use absolute errors so the goal test also holds when the robot overshoots the target
if abs(env.target[0] - pos[0]) < THRESHOLD_DISTANCE_2_GOAL and abs(env.target[1] - pos[1]) < THRESHOLD_DISTANCE_2_GOAL:
print(env.target[0] - pos[0], env.target[1] - pos[1])
print("Goal Reached")
done = True
# Stop the car and reset episode
env.stop_car()
# env.reset()
sub.unregister()
print('Counter:',episode_steps)
else:
# Execute action
next_state, reward, done, _ = env.step(action)
episode_steps += 1
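# The evaluation loop is driven by the /odom topic: each Odometry message triggers
# one policy query and one /cmd_vel command via env.step(), until the goal test in
# pose_callback() succeeds and the subscriber is unregistered.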
def start():
'''
Subscribe to robot pose topic and initiate callback thread
'''
global sub, episode_steps
rospy.init_node('burger_gym', anonymous=True)
sub = rospy.Subscriber("/odom", Odometry, pose_callback)
rospy.spin()
if __name__ == '__main__':
try:
start()
except rospy.ROSInterruptException:
pass
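# Example (assumed): to evaluate a different goal, edit env.target above, e.g.
#   env.target = [0.5, 0.5, 1.57]
# keeping the goal inside the +/-GRID workspace bounds used by check_goal().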