-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathrun.py
146 lines (117 loc) · 5.51 KB
/
run.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
"""
Script for running RHEA experiments in the Illustrative Domain.
"""
import argparse
import random
import numpy as np
from tqdm import tqdm
from expert_models import expert_model_1, expert_model_2, expert_model_3
from evolutionary_operators import random_candidate, rules_mutate, rules_crossover
from nsga2_parent_selector import Nsga2ParentSelector
from predictor import evaluate_candidate, collect_behavior, compute_oracle_front
# Command-line interface: --method selects how every run's population is
# seeded (expert models for 'rhea', random candidates for 'evolution').
# Validation is done by argparse via `choices` rather than an `assert`, so it
# still applies under `python -O` and produces a usage message on bad input.
parser = argparse.ArgumentParser()
parser.add_argument("--method", help="either rhea or evolution", required=True,
                    choices=['rhea', 'evolution'])
args = parser.parse_args()
method = args.method

# One CSV per method; the experiment loop appends a row to it per finished run.
results_file = f'{method}_results.csv'

# Two objectives for the NSGA-II selector: maximize utility, minimize cost.
selector_params = {'evolution': {'fitness': [{"metric_name": "utility", "maximize": True},
                                             {"metric_name": "cost", "maximize": False}]}}
selector = Nsga2ParentSelector(experiment_params=selector_params)

# Reference front the evolved population is compared against each generation.
optimal_front = compute_oracle_front()

pop_size = 20        # individuals kept after each generation's selection
n_generations = 500  # upper bound on generations per run
n_trials = 100       # independent repetitions of the whole sweep
def _make_individual(rules, ident):
    """Evaluate ``rules`` and wrap them in a population record.

    The returned dict carries the keys used throughout this script:
    ``rules``, ``metrics`` (``utility`` and ``cost`` as returned by
    ``evaluate_candidate``), ``id`` and ``behavior`` (as returned by
    ``collect_behavior``).
    """
    utility, cost = evaluate_candidate(rules)
    return {
        'rules': rules,
        'metrics': {'utility': utility, 'cost': cost},
        'id': ident,
        'behavior': collect_behavior(rules),
    }


def _missing_front_points(population, front):
    """Return the (utility, cost) pairs of ``front`` that no individual in ``population`` attains."""
    found = {(indy['metrics']['utility'], indy['metrics']['cost']) for indy in population}
    return [(utility, cost) for utility, cost in front if (utility, cost) not in found]


# Offspring are generated until parents plus offspring reach this size.
generation_size = 10 * pop_size

for trial in range(n_trials):
    for n_actions in [10, 15, 20, 25, 30, 35, 40, 45, 50]:

        # Initialize population
        if method == 'rhea':
            initial_individuals = [expert_model_1, expert_model_2, expert_model_3]
        else:
            initial_individuals = [random_candidate(n_actions=n_actions) for _ in range(3)]

        population = []
        behavior_set = {}  # behavior -> id of the individual registered for it; used to prevent duplicate behaviors
        curr_id = 0

        # Start by evaluating initial individuals and placing them in the population.
        # No duplicate check here, so two seeds may share a behavior.
        for candidate in initial_individuals:
            indy = _make_individual(candidate, curr_id)
            behavior_set[indy['behavior']] = curr_id
            population.append(indy)
            curr_id += 1

        # Run evolution
        for gen in tqdm(range(n_generations)):

            # Compute closeness to oracle front; stop once every point of the
            # front is matched, or on the last generation, and log the result.
            still_to_find = _missing_front_points(population, optimal_front[0])
            if not still_to_find or gen == n_generations - 1:
                print(f"\n# actions: {n_actions}; gens: {gen}\n")
                with open(results_file, 'a') as f:
                    f.write(f"{n_actions},{gen},{still_to_find}\n")
                break

            # Generate new individuals
            new_candidates = []
            while len(population) + len(new_candidates) < generation_size:

                # Select two distinct parents uniformly at random
                parent1_idx, parent2_idx = np.random.choice(np.arange(len(population)), 2, replace=False)

                # Crossover, then mutate
                child = rules_crossover(population[parent1_idx]['rules'],
                                        population[parent2_idx]['rules'])
                child = rules_mutate(child, n_actions=n_actions)

                # Discard trivial (empty) children
                if len(child) == 0:
                    continue

                behavior = collect_behavior(child)
                if behavior not in behavior_set:
                    # Novel behavior: queue the child for evaluation
                    new_candidates.append({'rules': child, 'id': curr_id, 'behavior': behavior})
                    behavior_set[behavior] = curr_id
                    curr_id += 1
                else:
                    # Known behavior: with probability 0.5 the child's rules
                    # replace those of the registered individual, if that
                    # individual is in the current population (metrics are
                    # left as they were).
                    prev_id = behavior_set[behavior]
                    if np.random.random() < 0.5:
                        for member in population:
                            if member['id'] == prev_id:
                                member['rules'] = child

            # Evaluate new individuals
            for candidate in new_candidates:
                utility, cost = evaluate_candidate(candidate['rules'])
                candidate['metrics'] = {'utility': utility,
                                        'cost': cost}

            # Refine population
            population = population + new_candidates
            # NOTE(review): the shuffle presumably randomizes tie-breaking in
            # the selector's sort -- confirm against Nsga2ParentSelector.
            random.shuffle(population)
            population = selector.sort_individuals(population)
            for indy in population[pop_size:]:
                # pop() instead of del: two culled individuals can share a
                # behavior (duplicate seeds), and the second removal must not
                # raise KeyError.
                behavior_set.pop(indy['behavior'], None)
            population = population[:pop_size]

            # Add back random or expert solutions whose behavior is no longer
            # represented
            for candidate in initial_individuals:
                indy = _make_individual(candidate, curr_id)
                if indy['behavior'] not in behavior_set:
                    behavior_set[indy['behavior']] = curr_id
                    population.append(indy)
                    curr_id += 1