-
Notifications
You must be signed in to change notification settings - Fork 0
/
queued_evaluator.py
44 lines (33 loc) · 1.3 KB
/
queued_evaluator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
from queue import Queue
from network import load_or_create_model
import numpy as np
from threading import Event
class QueuedEvaluator:
def __init__(self, target_queue: Queue, name):
self.result_queue = Queue()
self.target_queue = target_queue
self.name = name
def predict(self, features):
self.target_queue.put((self.result_queue, features))
prediction = self.result_queue.get()
return prediction
class MultiplexingEvaluator:
def __init__(self, model_name, batch_size):
self.model_name = model_name
self.batch_size = batch_size
self.model_loaded = Event()
def run(self, input_queue):
model = load_or_create_model(self.model_name)
self.name = model.name
self.model_loaded.set()
features_board = np.zeros(((self.batch_size, 8, 8, 19)), np.int8)
while True:
response_queues = []
for i in range(self.batch_size):
request = input_queue.get()
response_queues.append(request[0])
features = request[1]
features_board[i] = features[0]
predictions = model.predict([features_board])
for i in range(self.batch_size):
response_queues[i].put([predictions[0][i], predictions[1][i]])