bandit.py

import copy

import numpy as np
from gym import Env, spaces
from scipy.special import comb


class NoStatBernoulli(Env):
    """
    Basic bandit setting with independent rewards between arms.
    Observation: none
    Action: [0, n) where n is the number of arms
    Reward: {0, 1} Bernoulli draw from the chosen arm
    """

    def __init__(self, n_arms, n_tasks):
        # Resample the per-task success probabilities until at least one arm
        # has a non-zero mean (with uniform sampling this is true almost surely).
        while True:
            self.p_dist = np.random.uniform(size=(n_tasks, n_arms))
            if np.max(self.p_dist) > 0:
                break
        self.r_dist = np.full((n_tasks, n_arms), 1)
        self.n_tasks = n_tasks
        self.n_arms = n_arms
        self.action_space = spaces.Discrete(self.n_arms)
        self.observation_space = spaces.Discrete(1)
        self.reset_task(0)

    def step(self, action):
        action = int(np.rint(action))
        # Bernoulli reward with success probability self._p[action],
        # scaled by the (constant) reward magnitude self._r[action].
        reward = np.random.choice(2, p=[1 - self._p[action], self._p[action]]) * self._r[action]
        done = False
        ob = self._get_obs()
        self.cur_step += 1
        return ob, reward, done, dict()

    def _get_obs(self):
        return -1

    def reset_task(self, idx):
        """Switch the current environment to task `idx`."""
        self.cur_task = int(idx)
        self._p = self.p_dist[self.cur_task]
        self._r = self.r_dist[self.cur_task]
        return self.reset()

    def reset(self):
        self.cur_step = -1
        return self._get_obs()

    def viewer_setup(self):
        print("no viewer")

    def render(self):
        print("no render")
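

# Usage sketch (illustrative addition, not part of the original module): pull
# each arm of a small NoStatBernoulli instance once and print the sampled
# rewards. The arm count, task count, and seed below are arbitrary.
def _demo_no_stat_bernoulli(seed=0):
    np.random.seed(seed)
    env = NoStatBernoulli(n_arms=3, n_tasks=2)
    env.reset_task(0)
    for arm in range(env.n_arms):
        _, reward, _, _ = env.step(arm)
        print(f"arm {arm}: p = {env._p[arm]:.3f}, sampled reward = {reward}")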


class Bernoulli(NoStatBernoulli):
    """
    Observation: per-arm statistics (avg_reward_0, times_chosen_0, avg_reward_1, ...)
    """

    def __init__(self, n_arms, n_tasks):
        super().__init__(n_arms, n_tasks)
        self.observation_space = spaces.Discrete(2 * n_arms)

    def _get_obs(self):
        return copy.deepcopy(self.counter)

    def reset(self):
        self.caches = []  # For debugging only
        self.total_reward = 0
        self.counter = np.zeros((2 * self.n_arms,))
        self.cur_step = -1
        return self._get_obs()

    def step(self, action):
        action = int(np.rint(action))
        reward = np.random.choice(2, p=[1 - self._p[action], self._p[action]]) * self._r[action]
        self.total_reward += reward
        # Incremental mean update: counter[2*a] holds the running average reward
        # of arm a, counter[2*a + 1] holds how many times arm a has been pulled.
        choice_avg = self.counter[2 * action]
        choice_count = self.counter[2 * action + 1]
        self.counter[2 * action] = (choice_avg * choice_count + reward) / (choice_count + 1)
        self.counter[2 * action + 1] = choice_count + 1
        done = False
        self.cur_step += 1
        return copy.deepcopy(self.counter), reward, done, self.caches
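

# Usage sketch (illustrative addition): a minimal epsilon-greedy loop over the
# Bernoulli environment, reading the running averages directly from the
# observation vector. The horizon, epsilon, and arm count are arbitrary.
def _demo_bernoulli_eps_greedy(horizon=200, eps=0.1, seed=0):
    np.random.seed(seed)
    env = Bernoulli(n_arms=5, n_tasks=1)
    obs = env.reset_task(0)
    for _ in range(horizon):
        if np.random.rand() < eps:
            action = env.action_space.sample()
        else:
            action = int(np.argmax(obs[0::2]))  # arm with the best running average
        obs, _, _, _ = env.step(action)
    print(f"total reward: {env.total_reward}, best arm mean: {env._p.max():.3f}")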


class MetaStochastic(Bernoulli):
    """
    Every task's optimal arm lies in a fixed sub-group of `opt_size` arms.
    """

    def __init__(self, n_arms, opt_size, n_tasks, horizon=None, **kwargs):
        self.r_dist = np.full((n_tasks, n_arms), 1)
        self.n_tasks = n_tasks
        self.n_arms = n_arms
        self.horizon = horizon
        self.action_space = spaces.Discrete(self.n_arms)
        self.observation_space = spaces.Discrete(2 * n_arms)
        self.opt_size = opt_size
        self.gap_constrain = kwargs["gap_constrain"]
        self.n_experts = comb(self.n_arms, self.opt_size)
        # Randomly pick which arms form the optimal sub-group.
        self.opt_indices = np.arange(n_arms)
        np.random.shuffle(self.opt_indices)
        self.sub_opt_indices = self.opt_indices[opt_size:]
        self.opt_indices = np.sort(self.opt_indices[:opt_size])  # indices of the optimal sub-group
        low = 1e-6
        if self.gap_constrain is not None:
            low = self.gap_constrain
        while True:
            self.p_dist = np.zeros((n_tasks, n_arms))
            opt_values = np.random.uniform(low=low, size=(n_tasks,))
            # Resample the per-task optimal arms until every arm of the
            # sub-group is optimal for at least one task.
            while True:
                opt_indices = np.random.choice(self.opt_indices, size=(n_tasks,))
                if len(set(opt_indices.tolist())) == opt_size:
                    break
            temp = np.random.uniform(high=opt_values - low, size=(n_arms, n_tasks))
            if self.gap_constrain is None:  # force a violation of the gap threshold
                GAP_THRESHOLD = np.sqrt(self.n_arms * np.log(self.n_tasks) / self.horizon)
                second_best_arms = np.random.uniform(
                    low=opt_values - GAP_THRESHOLD, high=opt_values, size=(1, n_tasks)
                )
                second_best_arms[np.where(second_best_arms < 0)] = 0
                second_best_arms_idx = np.random.randint(
                    n_arms, size=n_tasks
                )  # some indices can coincide with opt_indices, but that doesn't matter
                for i in range(n_tasks):  # TODO: slow
                    temp[second_best_arms_idx[i]][i] = second_best_arms[0][i]
            self.p_dist = temp.T
            self.p_dist[np.arange(n_tasks), opt_indices] = opt_values
            if np.max(opt_values) > 0:
                break
        self.reset_task(0)
        if kwargs["quiet"] is False:
            print(f"opt_indices = {self.opt_indices}")


class NonObliviousMetaAdversarial(MetaStochastic):
    def __init__(self, n_arms, opt_size, n_tasks, horizon, **kwargs):
        super().__init__(n_arms, opt_size, n_tasks, horizon, **kwargs)
        self.B_TK = np.sqrt(horizon * n_arms * np.log(n_arms))
        self.B_TM = np.sqrt(horizon * opt_size)

    def generate_next_task(self, EXT_set):
        if self.cur_task > self.n_tasks - 2:  # final task, nothing left to generate
            return
        if EXT_set is None:  # first round
            opt_arm = np.random.choice(self.opt_indices)
        else:
            correct_EXT_set = np.intersect1d(self.opt_indices, EXT_set)
            G = np.sqrt(2 * (self.B_TK - self.B_TM) * (self.horizon - self.B_TM) * (self.n_tasks - self.cur_task - 2))
            q = (self.B_TK - self.B_TM) / (
                self.horizon - self.B_TM + G
            )  # probability that the next task's optimal arm is NOT in correct_EXT_set
            self.is_out_of_set = bool(np.random.choice(2, p=[1 - q, q]))
            inv_correct_EXT_set = np.setdiff1d(self.opt_indices, correct_EXT_set)
            if len(inv_correct_EXT_set) == 0:
                opt_arm = np.random.choice(correct_EXT_set)
            else:
                if self.is_out_of_set is True or len(correct_EXT_set) == 0:
                    opt_arm = np.random.choice(inv_correct_EXT_set)
                else:
                    opt_arm = np.random.choice(correct_EXT_set)
        low = 1e-6
        if self.gap_constrain is not None:
            low = self.gap_constrain
        opt_values = np.random.uniform(low=low)
        if self.gap_constrain is None:  # force a violation of the gap threshold
            GAP_THRESHOLD = np.sqrt(self.n_arms * np.log(self.n_tasks) / self.horizon)
            while True:
                next_p = np.random.uniform(high=opt_values - low, size=(self.n_arms,))
                next_p[opt_arm] = 0
                if opt_values - GAP_THRESHOLD < max(next_p):
                    break
        else:
            next_p = np.random.uniform(high=opt_values - low, size=(self.n_arms,))
        next_p[opt_arm] = opt_values
        self.p_dist[self.cur_task + 1] = next_p
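

# Usage sketch (illustrative addition): how an agent loop might drive the
# non-oblivious adversary, feeding back its own exploited-arm set (here a
# hypothetical `agent_EXT_set`) so the environment can pick the next task's
# optimal arm adaptively. The parameters are arbitrary.
def _demo_non_oblivious(seed=0):
    np.random.seed(seed)
    env = NonObliviousMetaAdversarial(
        n_arms=8, opt_size=3, n_tasks=4, horizon=500,
        gap_constrain=0.2, quiet=True,
    )
    agent_EXT_set = None  # the agent has identified no optimal arms yet
    for task in range(env.n_tasks):
        env.reset_task(task)
        # ... interact with env for `horizon` steps here ...
        env.generate_next_task(agent_EXT_set)
        agent_EXT_set = [int(np.argmax(env._p))]  # toy stand-in for the agent's estimate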


class MetaAdversarial(NonObliviousMetaAdversarial):
    """
    Uses an internal EXT_set instead of looking at the agent's EXT_set.
    """

    def __init__(self, n_arms, opt_size, n_tasks, horizon, **kwargs):
        super().__init__(n_arms, opt_size, n_tasks, horizon, **kwargs)
        self.EXT_set = []
        for i in range(self.n_tasks - 1):
            self.reset_task(i)
            opt_arm_idx = np.argmax(self.p_dist[i])
            if opt_arm_idx not in self.EXT_set:
                G = np.sqrt(
                    2 * (self.B_TK - self.B_TM) * (self.horizon - self.B_TM) * (self.n_tasks - self.cur_task - 2)
                )
                p = (self.horizon - self.B_TM) / (
                    self.horizon - self.B_TM + G
                )  # probability that the agent will EXR and find this new optimal arm
                # The expression above corresponds to part 3.1 (gap condition satisfied).
                # Below is the general strategy:
                # p = np.sqrt((self.opt_size * self.horizon) / (self.n_tasks * self.B_TK))
                does_find_new_arm = bool(np.random.choice(2, p=[1 - p, p]))
                if does_find_new_arm is True:
                    self.EXT_set.append(opt_arm_idx)
            super().generate_next_task(self.EXT_set)
        self.reset_task(0)

    def generate_next_task(self, EXT_set):  # placeholder; tasks were pre-generated in __init__
        pass
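

# Minimal smoke test (illustrative addition): run the hedged demo sketches
# above when the module is executed directly. None of this runs on import.
if __name__ == "__main__":
    _demo_no_stat_bernoulli()
    _demo_bernoulli_eps_greedy()
    _demo_meta_stochastic()
    _demo_non_oblivious()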