gym_solver.py
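"""NEAT solver for OpenAI Gym environments.

Evolves a feed-forward network with neat-python to play the environment
named in `game_name` (CartPole-v0 by default). Written against the classic
gym API (env.step returns a 4-tuple) and the 0.8-era neat-python API
(population.Population, nn.create_feed_forward_phenotype).
"""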
import argparse
import gym
import os
import numpy as np
from neat import nn, population, statistics, parallel
### User Params ###
# The name of the game to solve
game_name = 'CartPole-v0'
### End User Params ###
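# Any environment with a Box observation space and a Discrete action space
# should drop in here (e.g. 'MountainCar-v0'), since the observation vector
# feeds the network inputs directly and np.argmax over the outputs picks a
# discrete action.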
parser = argparse.ArgumentParser(description='OpenAI Gym Solver')
parser.add_argument('--max-steps', dest='max_steps', type=int, default=1000,
                    help='The max number of steps to take per genome (timeout)')
parser.add_argument('--episodes', type=int, default=1,
                    help="The number of times to run a single genome. The fitness is the mean score across runs")
parser.add_argument('--render', action='store_true')
parser.add_argument('--generations', type=int, default=50,
                    help="The number of generations to evolve the network")
parser.add_argument('--checkpoint', type=str,
                    help="Uses a checkpoint to start the simulation")
parser.add_argument('--num-cores', dest="numCores", type=int, default=4,
                    help="The number of cores on your computer for parallel execution")
args = parser.parse_args()
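# Example invocations (flag names as defined above):
#   python gym_solver.py --episodes 3 --generations 100 --num-cores 4
#   python gym_solver.py --checkpoint checkpoint --render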
def simulate_species(net, env, episodes=1, steps=5000, render=False):
    fitnesses = []
    for runs in range(episodes):
        inputs = env.reset()  # use the env argument, not the global my_env
        cum_reward = 0.0
        for j in range(steps):
            outputs = net.serial_activate(inputs)
            action = np.argmax(outputs)
            inputs, reward, done, _ = env.step(action)
            cum_reward += reward  # count the reward before breaking on done
            if render:
                env.render()
            if done:
                break
        fitnesses.append(cum_reward)
    fitness = np.array(fitnesses).mean()
    print("Species fitness: %s" % str(fitness))
    return fitness
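# This evaluator must live at module level (not inside train_network) so
# that parallel.ParallelEvaluator's worker processes can pickle it; it
# therefore reads the global my_env created at the bottom of the file.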
def worker_evaluate_genome(g):
    net = nn.create_feed_forward_phenotype(g)
    return simulate_species(net, my_env, args.episodes, args.max_steps, render=args.render)
def train_network(env):
    def evaluate_genome(g):
        net = nn.create_feed_forward_phenotype(g)
        return simulate_species(net, env, args.episodes, args.max_steps, render=args.render)

    def eval_fitness(genomes):
        for g in genomes:
            fitness = evaluate_genome(g)
            g.fitness = fitness

    # Simulation
    local_dir = os.path.dirname(__file__)
    config_path = os.path.join(local_dir, 'gym_config')
    pop = population.Population(config_path)
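    # 'gym_config' is a neat-python config file kept next to this script. Its
    # input/output node counts must match the environment (for CartPole-v0:
    # 4 inputs, 2 outputs, as printed at startup). A rough sketch for the
    # 0.8-era config format (section and key names are assumptions and may
    # differ across neat-python versions):
    #
    #   [phenotype]
    #   input_nodes  = 4
    #   output_nodes = 2
    #
    #   [genetic]
    #   pop_size     = 150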
    # Load checkpoint
    if args.checkpoint:
        pop.load_checkpoint(args.checkpoint)

    # Start simulation
    if args.render:
        pop.run(eval_fitness, args.generations)
    else:
        pe = parallel.ParallelEvaluator(args.numCores, worker_evaluate_genome)
        pop.run(pe.evaluate, args.generations)

    pop.save_checkpoint("checkpoint")

    # Log statistics.
    statistics.save_stats(pop.statistics)
    statistics.save_species_count(pop.statistics)
    statistics.save_species_fitness(pop.statistics)

    print('Number of evaluations: {0}'.format(pop.total_evaluations))
    # Show output of the most fit genome against training data.
    winner = pop.statistics.best_genome()

    # Save best network
    import pickle
    with open('winner.pkl', 'wb') as output:
        pickle.dump(winner, output, 1)
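    # The pickled genome can be reloaded later without retraining, e.g.:
    #   with open('winner.pkl', 'rb') as f:
    #       winner = pickle.load(f)
    #   net = nn.create_feed_forward_phenotype(winner)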
    print('\nBest genome:\n{!s}'.format(winner))
    print('\nOutput:')
    input("Press Enter to run the best genome...")  # raw_input on Python 2
    winner_net = nn.create_feed_forward_phenotype(winner)
    for i in range(100):
        simulate_species(winner_net, env, 1, args.max_steps, render=True)
my_env = gym.make(game_name)
print("Input Nodes: %s" % str(len(my_env.observation_space.high)))
print("Output Nodes: %s" % str(my_env.action_space.n))
train_network(my_env)