-
Notifications
You must be signed in to change notification settings - Fork 0
/
pgn_to_input.py
83 lines (66 loc) · 2.55 KB
/
pgn_to_input.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import random
import time
from itertools import count, islice
import numpy as np
import chess.pgn
from board_code import board_to_input, move_to_complete_output, move_to_output
from filenames import NUMPY_FILE, PGN_DATABASE
from more_itertools import flatten, unique_everseen
def is_valid_game(game) -> bool:
    """Return True when *game* passes the rating and time-control filters.

    A game is accepted only if both the ``WhiteElo`` and ``BlackElo`` headers
    parse as integers greater than 1700 and the part of the ``TimeControl``
    header before the first ``+`` parses as an integer greater than 500
    (presumably the base time in seconds -- confirm against the PGN source).

    Games whose headers are missing or not numeric (for example ``"?"`` or
    ``"-"``) are rejected rather than raising.

    Args:
        game: An object exposing a ``headers`` mapping of PGN tag names to
            string values. Must not be ``None``.

    Returns:
        True if the game satisfies every filter, False otherwise.
    """
    headers = game.headers
    try:
        white_elo = int(headers["WhiteElo"])
        black_elo = int(headers["BlackElo"])
        base_time = int(headers["TimeControl"].split("+")[0])
    except (KeyError, ValueError):
        # KeyError: the tag is absent; ValueError: the tag is not an integer.
        return False
    return white_elo > 1700 and black_elo > 1700 and base_time > 500
def game_to_boards(game):
    """Sample up to 10 ``(board, move)`` pairs from the main line of *game*.

    The main line is replayed on a fresh ``chess.Board()``. Each pair holds a
    copy of the position taken *before* the move is played, together with
    that move. The returned list is a random sample without replacement; when
    the game has fewer than 10 moves, every pair is returned (in random
    order).
    """

    def _replay():
        position = chess.Board()
        for played in game.mainline_moves():
            # Snapshot first, then advance, so the pair is (state, next move).
            yield position.copy(), played
            position.push(played)

    snapshots = list(_replay())
    sample_size = min(10, len(snapshots))
    return random.sample(snapshots, sample_size)
def inspect(iterator):
    """Return a lazy pass-through over *iterator* that prints each item.

    Debugging aid: every item is printed at the moment it is consumed and is
    then yielded unchanged. Nothing is printed until the result is iterated.
    """

    def _tap(stream):
        for item in stream:
            print(item)
            yield item

    # iter() is called up front so a non-iterable argument fails immediately.
    return _tap(iter(iterator))
def _read_games(handle):
    """Yield games parsed from *handle* until ``read_game`` returns None."""
    while True:
        game = chess.pgn.read_game(handle)
        if game is None:
            # End of the PGN stream: stop instead of feeding None downstream.
            return
        yield game


def load(n: int) -> "tuple[np.ndarray, np.ndarray]":
    """Build up to *n* encoded training samples from the PGN database.

    Games are streamed from ``PGN_DATABASE``, filtered with
    ``is_valid_game``, expanded into ``(board, move)`` pairs by
    ``game_to_boards``, de-duplicated on the board's FEN and truncated to *n*
    pairs. A progress line is printed after every 100 pairs. If the database
    runs out before *n* pairs are collected, the pairs found so far are used.

    Args:
        n: Maximum number of ``(board, move)`` pairs to collect.

    Returns:
        A ``(boards, moves)`` tuple of arrays: ``boards`` is built from
        ``board_to_input`` applied to every board, ``moves`` from
        ``move_to_complete_output`` applied to every move.

    Raises:
        ValueError: If no pair at all could be collected.
    """
    start_time = time.time()

    def output_if_significant(iterator):
        """Pass items through unchanged, reporting progress every 100 items."""
        for index, value in enumerate(iterator, 1):
            if index % 100 == 0:
                elapsed = time.time() - start_time
                time_per_step = elapsed / index
                eta = time_per_step * (n - index)
                formatted_time = time.strftime("%H:%M:%S", time.gmtime(eta))
                formatted_current_time = time.strftime(
                    "%H:%M:%S", time.gmtime(elapsed)
                )
                # "\r" keeps the progress report on a single terminal line.
                print(
                    f"Processed {index} / {n} boards in {formatted_current_time}, ETA: {formatted_time} ({time_per_step*1000:.0f} ms per step)",
                    end="\r",
                )
            yield value

    # The pipeline is lazy, so it must be fully consumed while the file is
    # still open; the context manager closes the file even on errors.
    with open(PGN_DATABASE) as file:
        games = filter(is_valid_game, _read_games(file))
        pairs = flatten(map(game_to_boards, games))
        pairs = unique_everseen(pairs, key=lambda pair: pair[0].fen())
        pairs = islice(pairs, n)
        samples = list(output_if_significant(pairs))

    if not samples:
        raise ValueError(f"No usable positions found in {PGN_DATABASE}")

    boards, moves = zip(*samples)
    boards = np.array([board_to_input(board) for board in boards])
    # NOTE: move_to_output was previously used here as the move encoder;
    # move_to_complete_output is the one currently in effect.
    moves = np.array([move_to_complete_output(move) for move in moves])
    return boards, moves
if __name__ == "__main__":
    # Script entry point: collect 200,000 samples and store them compressed.
    arrays = load(200_000)
    print("\nLoaded, saving...")
    with open(NUMPY_FILE, "wb") as out:
        np.savez_compressed(out, *arrays)
        # Read the write position while the handle is still open; it is used
        # as the size of the archive just written.
        size_mb = out.tell() / 1024 / 1024
        print(f"Saved to {NUMPY_FILE} (size: {size_mb:.2f} MB)")