env_util.py
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
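"""Environment construction utilities.

get_env() builds the requested task, wraps it in a TimeLimit, and returns it
together with the reset-reward/reset-done functions and the agent
hyperparameters for that task.
"""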
import numpy as np
from gym.wrappers.time_limit import TimeLimit

from envs.cliff_envs import CliffCheetahEnv, CliffWalkerEnv
from envs.frozen_lake import FrozenLakeEnv
from envs.hopper import HopperEnv
from envs.peg_insertion import PegInsertionEnv
from envs.pusher import PusherEnv


def get_env(env_name, safety_param=0):
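  """Builds the named environment and its reset functions.

  Args:
    env_name: one of 'small-gridworld', 'large-gridworld', 'hopper',
      'ball-in-cup', 'peg-insertion', 'pusher', 'cliff-walker',
      'cliff-cheetah'.
    safety_param: float in [0, 1]; scales the q_min threshold returned in
      lnt_params (0 gives -max_episode_steps, 1 gives 0).

  Returns:
    A tuple (env, lnt_params, agent_params): env is wrapped in a TimeLimit;
    lnt_params holds 'reset_reward_fn', 'reset_done_fn', and 'q_min';
    agent_params holds 'agent_type' and 'num_training_iterations'.
  """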
  # Reset reward should be in [-1, 0]
  if env_name in ['small-gridworld', 'large-gridworld']:
    if env_name == 'small-gridworld':
      map_name = '4x4'
      max_episode_steps = 30
      num_training_iterations = 20000
    else:
      map_name = '8x8'
      max_episode_steps = 100
      num_training_iterations = 20000
    env = FrozenLakeEnv(map_name=map_name)
    done_state = np.zeros(env.nS)
    done_state[0] = 1
    def reset_done_fn(s):
      return np.all(s == done_state)
    def reset_reward_fn(s, a):
      return float(reset_done_fn(s)) - 1.0
    agent_type = 'DDQNAgent'
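  # Hopper: the reset is done once the torso is upright
  # (height > 0.7 and |torso angle| < 0.2).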
  elif env_name == 'hopper':
    env = HopperEnv()
    def reset_done_fn(s):
      height = s[0]
      ang = s[1]
      return (height > .7) and (abs(ang) < .2)
    def reset_reward_fn(s, a):
      return float(reset_done_fn(s)) - 1.0
    agent_type = 'DDPGAgent'
    max_episode_steps = 1000
    num_training_iterations = 1000000
  elif env_name == 'ball-in-cup':
    # Only import control suite if used. All other environments can be
    # used without the control suite dependency.
    from dm_control.suite.ball_in_cup import BallInCup
    env = BallInCup()
    reset_state = np.array([0., 0., 0., -0.05, 0., 0., 0., 0.])
    def reset_reward_fn(s):
      dist = np.linalg.norm(reset_state - s)
      return np.clip(1.0 - 0.5 * dist, 0, 1) - 1.0
    def reset_done_fn(s):
      return (reset_reward_fn(s) > 0.7)
    max_episode_steps = 50
    agent_type = 'DDPGAgent'
    num_training_iterations = 1000000
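  # Peg insertion and pusher score resets with the environment's own
  # _get_rewards() helper, shifted into [-1, 0].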
  elif env_name == 'peg-insertion':
    env = PegInsertionEnv()
    def reset_reward_fn(s, a):
      (forward_reward, reset_reward) = env.env._get_rewards(s, a)
      return reset_reward - 1.0
    def reset_done_fn(s):
      a = np.zeros(env.action_space.shape[0])
      return (reset_reward_fn(s, a) > 0.7)
    max_episode_steps = 50
    num_training_iterations = 1000000
    agent_type = 'DDPGAgent'
  elif env_name == 'pusher':
    env = PusherEnv()
    def reset_reward_fn(s, a):
      (forward_reward, reset_reward) = env.env._get_rewards(s, a)
      return reset_reward - 1.0
    def reset_done_fn(s):
      a = np.zeros(env.action_space.shape[0])
      return (reset_reward_fn(s, a) > 0.7)
    max_episode_steps = 100
    num_training_iterations = 1000000
    agent_type = 'DDPGAgent'
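  # The cliff tasks binarize the environment's reset reward before shifting
  # it into [-1, 0].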
  elif env_name == 'cliff-walker':
    dist = 6
    env = CliffWalkerEnv()
    def reset_reward_fn(s, a):
      (forward_reward, reset_reward) = env.env._get_rewards(s, a)
      return (reset_reward > 0.7) - 1.0
    def reset_done_fn(s):
      a = np.zeros(env.action_space.shape[0])
      return (reset_reward_fn(s, a) > 0.7)
    max_episode_steps = 500
    num_training_iterations = 1000000
    agent_type = 'DDPGAgent'
  elif env_name == 'cliff-cheetah':
    dist = 14
    env = CliffCheetahEnv()
    def reset_reward_fn(s, a):
      (forward_reward, reset_reward) = env.env._get_rewards(s, a)
      return (reset_reward > 0.7) - 1.0
    def reset_done_fn(s):
      a = np.zeros(env.action_space.shape[0])
      return (reset_reward_fn(s, a) > 0.7)
    max_episode_steps = 500
    agent_type = 'DDPGAgent'
    num_training_iterations = 1000000
  else:
    raise ValueError('Unknown environment: %s' % env_name)
  env = TimeLimit(env, max_episode_steps=max_episode_steps)
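  # Reset rewards lie in [-1, 0], so the lowest possible return over one
  # episode is -max_episode_steps. safety_param interpolates q_min between
  # that bound (safety_param=0) and 0 (safety_param=1).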
  q_min = -1 * (1. - safety_param) * env._max_episode_steps
  lnt_params = {
      'reset_reward_fn': reset_reward_fn,
      'reset_done_fn': reset_done_fn,
      'q_min': q_min,
  }
  agent_params = {
      'num_training_iterations': num_training_iterations,
      'agent_type': agent_type,
  }
  return (env, lnt_params, agent_params)
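# Example usage (illustrative sketch; assumes gym and the MuJoCo-backed envs
# package are installed):
#
#   env, lnt_params, agent_params = get_env('hopper', safety_param=0.3)
#   s = env.reset()
#   a = env.action_space.sample()
#   reset_r = lnt_params['reset_reward_fn'](s, a)   # in [-1, 0]
#   reset_done = lnt_params['reset_done_fn'](s)
#   print(agent_params['agent_type'], lnt_params['q_min'])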