
Commit 302ac39

Merge pull request #38 from a-mma/vec_queue_optimizations
vector queue added
2 parents 3181261 + a704472 commit 302ac39

3 files changed: +184 −66 lines changed

src/core/faissclient/index.js

Lines changed: 1 addition & 0 deletions

@@ -354,6 +354,7 @@ module.exports = {
         keys: vec_ids_
       },
       function(err, resp) {
+        console.log(err, resp)
         if (!err) {
           var doc_ids_ = [];
           for (let i = 0; i < resp.rows.length; i++) {

src/hannoy/index.py

Lines changed: 94 additions & 39 deletions

@@ -3,11 +3,20 @@
 import yaml
 import os
 
+import threading
+import queue
+import time
+
 model_location = '/data/model_ha'
 
 class Annoy:
     def __init__(self):
-        self.total = 0
+        # to keep the thread & queue running
+        self.process_flag = True
+        self.q_maxsize = 10100
+        self.process_thread = None
+        self._lock = threading.Lock()
+        self.process_timeout_sec = 5  # seconds
         # this is to keep track of all vectors inserted,
         # for saving to disk and retrieving later
         self.index_disk = None
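
These new fields set up a bounded work queue: q_maxsize caps how many documents may wait for indexing, and process_timeout_sec is the pause between indexing passes. One caveat: addVectors enqueues with put_nowait, which raises queue.Full once that cap is reached, and nothing in this commit catches it. A minimal caller-side guard, as a sketch (the enqueue_document helper is hypothetical, not part of this commit):

import queue

# bounded pipeline like the one created in spawn()
pipeline = queue.Queue(maxsize=10100)

def enqueue_document(document):
    # hypothetical helper: report back-pressure to the caller
    # instead of letting queue.Full propagate
    try:
        pipeline.put_nowait(document)
        return True
    except queue.Full:
        return False
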
@@ -21,53 +30,97 @@ def __init__(self):
         except Exception as e:
             print('Error initializing Annoy: ', e)
 
+        # spawn process thread
+        self.spawn()
+
+    def __del__(self):
+        self.process_flag = False
+        if self.process_thread:
+            self.process_thread.join()
+
+    def spawn(self):
+        # create pipeline to add documents
+        self.pipeline = queue.Queue(maxsize=self.q_maxsize)
+        # create process thread
+        self.process_thread = threading.Thread(target=self.process, args=(), daemon=True)
+        # start process thread
+        self.process_thread.start()
+        # return self.pipeline
+
     def initAnnoy(self):
         # only do if no index loaded from disk
         if not self.modelLoaded:
             print('Annoy init index')
             self.a_index = AnnoyIndex(self.dim, self.sim_metric)
 
-            # build index
-            build_ = self.a_index.build(self.n_trees)
+            # lock index read/write until it is built
+            with self._lock:
+                # build index
+                build_ = self.a_index.build(self.n_trees)
+
+                if build_:
+                    self.modelLoaded = self.saveModelToDisk()
 
-            if build_:
-                self.modelLoaded = self.saveModelToDisk()
         return self.modelLoaded
 
     def addVectors(self, documents):
-        # unbuild index first
-        self.a_index.unbuild()
-        self.total = self.total + len(documents)
         ids = []
         # add vectors
         for document in documents:
-            _id = document._id
-            vec = document.vector
-            ids.append(_id)
-            vector_e = vec.e
-            vector_e_l = len(vector_e)
-            # if the vector length is below the dimension limit,
-            # pad the vector with zeros up to the dimension
-            if vector_e_l < self.dim:
-                vector_e.extend([0]*(self.dim-vector_e_l))
-            # make sure vector length doesn't exceed the dimension limit
-            vector_e = vector_e[:self.dim]
-
-            # add vector
-            self.a_index.add_item(int(_id), vector_e)
-            # keep a copy for disk storage
-            list_ = vector_e
-            list_.append(int(_id))
-            if self.index_disk is None:
-                self.index_disk = np.array([list_], dtype=float)
-            else:
-                self.index_disk = np.append(self.index_disk, [list_], axis=0)
-
-        # build index
-        build_ = self.a_index.build(self.n_trees)
-        if build_:
-            self.modelLoaded = self.saveModelToDisk()
-        return self.modelLoaded, ids
+            # add document to queue
+            self.pipeline.put_nowait(document)
+            ids.append(document._id)
+        return True, ids
+
+    def process(self):
+        while self.process_flag:
+            # print(list(self.pipeline.queue))
+
+            # sleep until the next indexing pass
+            time.sleep(self.process_timeout_sec)
+
+            # check if queue is not empty
+            if self.pipeline.qsize() > 0:
+                # lock index read/write while it is rebuilt
+                with self._lock:
+
+                    # unbuild index first
+                    self.a_index.unbuild()
+
+                    # fetch all currently available documents from queue
+                    while not self.pipeline.empty():
+                        # extract document & contents
+                        document = self.pipeline.get_nowait()
+                        _id = document._id
+                        vec = document.vector
+                        vector_e = vec.e
+
+                        # resize vectors
+                        vector_e_l = len(vector_e)
+                        # if the vector length is below the dimension limit,
+                        # pad the vector with zeros up to the dimension
+                        if vector_e_l < self.dim:
+                            vector_e.extend([0]*(self.dim-vector_e_l))
+                        # make sure vector length doesn't exceed the dimension limit
+                        vector_e = vector_e[:self.dim]
+
+                        # add vector to index
+                        self.a_index.add_item(int(_id), vector_e)
+                        # keep a copy for disk storage
+                        list_ = vector_e
+                        list_.append(int(_id))
+                        # append to disk proxy
+                        if self.index_disk is None:
+                            self.index_disk = np.array([list_], dtype=float)
+                        else:
+                            self.index_disk = np.append(self.index_disk, [list_], axis=0)
+
+                    # build index
+                    build_ = self.a_index.build(self.n_trees)
+
+                    # write to disk
+                    if build_:
+                        self.modelLoaded = self.saveModelToDisk()
 
     def deleteVectors(self, ids):
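
The rewritten batch step leans on an Annoy detail: items cannot be added to a built index, but unbuild() discards the trees while keeping the stored items, so each pass can append the queued vectors and then build() again over the full set. A standalone sketch of that cycle (dimension, metric, and vectors are made up for illustration):

from annoy import AnnoyIndex

# minimal sketch of Annoy's unbuild / add / rebuild cycle
dim, n_trees = 3, 10
index = AnnoyIndex(dim, 'euclidean')
index.add_item(0, [1.0, 0.0, 0.0])
index.build(n_trees)                # index is now searchable

index.unbuild()                     # drop the trees, keep stored items
index.add_item(1, [0.0, 1.0, 0.0])  # adding items is legal again
index.build(n_trees)                # rebuild over both items

print(index.get_nns_by_vector([0.9, 0.1, 0.0], 2))  # [0, 1]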

@@ -77,10 +130,12 @@ def getNearest(self, matrix, k):
         ids = []
         dists = []
 
-        for vec_data in matrix:
-            _id, _dist = self.a_index.get_nns_by_vector(vec_data, k, include_distances=True)
-            ids.append(_id)
-            dists.append(_dist)
+        # lock index read/write during nearest-neighbor search
+        with self._lock:
+            for vec_data in matrix:
+                _id, _dist = self.a_index.get_nns_by_vector(vec_data, k, include_distances=True)
+                ids.append(_id)
+                dists.append(_dist)
 
         return True, ids, dists
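
Stepping back, both index classes now share the same shape: producers enqueue documents and return at once, while a daemon thread wakes every process_timeout_sec seconds, drains whatever has accumulated, and mutates the index under self._lock so concurrent searches never observe a half-built index. A stripped-down sketch of that pattern, independent of Annoy or FAISS (BatchWorker, index_batch, and do_search are illustrative names, not from this commit):

import queue
import threading
import time

class BatchWorker:
    # minimal sketch of the timed batch-consumer pattern above
    def __init__(self, index_batch, timeout_sec=5, maxsize=10100):
        self.index_batch = index_batch      # called with each drained batch
        self.timeout_sec = timeout_sec
        self.pipeline = queue.Queue(maxsize=maxsize)
        self.process_flag = True
        self._lock = threading.Lock()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def add(self, item):
        # producer side: enqueue and return immediately
        self.pipeline.put_nowait(item)

    def search(self, do_search):
        # readers take the same lock, so they never observe
        # a half-rebuilt index
        with self._lock:
            return do_search()

    def _run(self):
        while self.process_flag:
            time.sleep(self.timeout_sec)
            if self.pipeline.qsize() > 0:
                batch = []
                while not self.pipeline.empty():
                    batch.append(self.pipeline.get_nowait())
                # hold the lock only while mutating the shared index
                with self._lock:
                    self.index_batch(batch)

    def stop(self):
        self.process_flag = False
        self._thread.join()

The trade-off is durability for latency: addVectors now acknowledges documents that exist only in memory, so anything still queued is lost if the process dies before the next pass.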

src/hfaiss/index.py

Lines changed: 89 additions & 27 deletions

@@ -1,6 +1,10 @@
 import numpy as np
 import faiss
 
+import threading
+import queue
+import time
+
 model_location = '/data/model_hf'
 
 class Faiss:
@@ -13,6 +17,30 @@ def __init__(self):
         self.modelLoaded = self.loadModelFromDisk(model_location)
         self.is_initiated = self.modelLoaded
 
+        # to keep the thread & queue running
+        self.process_flag = True
+        self.q_maxsize = 10100
+        self.process_thread = None
+        self._lock = threading.Lock()
+        self.process_timeout_sec = 5  # seconds
+
+        # spawn process thread
+        self.spawn()
+
+    def __del__(self):
+        self.process_flag = False
+        if self.process_thread:
+            self.process_thread.join()
+
+    def spawn(self):
+        # create pipeline to add documents
+        self.pipeline = queue.Queue(maxsize=self.q_maxsize)
+        # create process thread
+        self.process_thread = threading.Thread(target=self.process, args=(), daemon=True)
+        # start process thread
+        self.process_thread.start()
+        # return self.pipeline
+
     def initFaiss(self, nlist, nprobe, bytesPerVec, bytesPerSubVec, dim, matrix):
         self.nlist = nlist
         self.nprobe = nprobe
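
As in the Annoy class, the consumer is a daemon thread, so a forgotten instance cannot keep the interpreter alive, and __del__ flips process_flag and joins the thread for an orderly stop. Python does not guarantee __del__ ever runs, though, so an explicit shutdown hook is a safer way to flush the final batch; a sketch under that assumption (atexit is standard library, the indexer name is illustrative):

import atexit

indexer = Faiss()  # the class defined in this diff

def shutdown():
    # mirror __del__ explicitly: stop the loop and wait for
    # any in-flight batch before the process exits
    indexer.process_flag = False
    if indexer.process_thread:
        indexer.process_thread.join()

atexit.register(shutdown)
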
@@ -23,14 +51,18 @@ def initFaiss(self, nlist, nprobe, bytesPerVec, bytesPerSubVec, dim, matrix):
         self.train_data = np.matrix(matrix).astype('float32')
         print('FAISS init quantizer', self.train_data, self.train_data.shape)
         self.f_quantizer = faiss.IndexFlatL2(self.dim)
-        print('FAISS init index')
-        self.f_index = faiss.IndexIVFPQ(self.f_quantizer, self.dim, self.nlist, self.bytesPerVec, self.bytesPerSubVec)
-        print('FAISS train index')
-        self.f_index.train(self.train_data)
-        print('FAISS train index finished')
+        # lock index read/write until it is built
+        with self._lock:
+            print('FAISS init index')
+            self.f_index = faiss.IndexIVFPQ(self.f_quantizer, self.dim, self.nlist, self.bytesPerVec, self.bytesPerSubVec)
+            print('FAISS train index')
+            self.f_index.train(self.train_data)
+            print('FAISS train index finished')
 
-        self.modelLoaded = self.saveModelToDisk(model_location, self.f_index)
+            # write index to disk
+            self.modelLoaded = self.saveModelToDisk(model_location, self.f_index)
         self.is_initiated = self.modelLoaded
+
         return self.is_initiated
 
     def isInitiated(self):
@@ -39,11 +71,11 @@ def isInitiated(self):
     def loadModelFromDisk(self, location):
         try:
             # read index
-            self.f_index = read_index(location)
+            self.f_index = faiss.read_index(location)
             print('FAISS index loading success')
             return True
-        except:
-            print('FAISS index loading failed')
+        except Exception as e:
+            print('FAISS index loading failed', e)
             return False
 
     def saveModelToDisk(self, location, index):
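
Two fixes land in loadModelFromDisk: the bare read_index call was a NameError (the module only does import faiss, so that name was never bound), and the bare except: swallowed the error, reducing every failure to a generic message. For reference, the FAISS serialization round-trip the fixed code relies on, as a small sketch (path and sizes are illustrative):

import numpy as np
import faiss

dim = 8
index = faiss.IndexFlatL2(dim)
index.add(np.random.rand(100, dim).astype('float32'))

# persist and restore, as saveModelToDisk / loadModelFromDisk do
faiss.write_index(index, '/tmp/model_hf_demo')
restored = faiss.read_index('/tmp/model_hf_demo')
print(restored.ntotal)  # 100
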
@@ -58,32 +90,62 @@ def saveModelToDisk(self, location, index):
 
     def addVectors(self, documents):
         ids = []
-        vecs = []
+        # add vectors
         for document in documents:
-            _id = document._id
-            vec = document.vector
-            ids.append(_id)
-            vector_e = vec.e
-            vector_e_l = len(vector_e)
-            # if the vector length is below the dimension limit,
-            # pad the vector with zeros up to the dimension
-            if vector_e_l < self.dim:
-                vector_e.extend([0]*(self.dim-vector_e_l))
-            # make sure vector length doesn't exceed the dimension limit
-            vecs.append(vector_e[:self.dim])
-        # convert to np matrix
-        vec_data = np.matrix(vecs).astype('float32')
-        id_data = np.array(ids).astype('int')
-        # add vector
-        self.f_index.add_with_ids(vec_data, id_data)
+            # add document to queue
+            self.pipeline.put_nowait(document)
+            ids.append(document._id)
         return True, ids
 
+    def process(self):
+        while self.process_flag:
+            # print(list(self.pipeline.queue))
+
+            # sleep until the next indexing pass
+            time.sleep(self.process_timeout_sec)
+
+            # check if queue is not empty
+            if self.pipeline.qsize() > 0:
+                ids = []
+                vecs = []
+
+                # fetch all currently available documents from queue
+                while not self.pipeline.empty():
+                    # extract document & contents
+                    document = self.pipeline.get_nowait()
+                    _id = document._id
+                    vec = document.vector
+                    ids.append(_id)
+                    vector_e = vec.e
+                    vector_e_l = len(vector_e)
+                    # if the vector length is below the dimension limit,
+                    # pad the vector with zeros up to the dimension
+                    if vector_e_l < self.dim:
+                        vector_e.extend([0]*(self.dim-vector_e_l))
+                    # make sure vector length doesn't exceed the dimension limit
+                    vecs.append(vector_e[:self.dim])
+
+                # convert to np matrix
+                vec_data = np.matrix(vecs).astype('float32')
+                id_data = np.array(ids).astype('int')
+
+                # lock index read/write while adding vectors
+                with self._lock:
+                    # add vectors with their ids
+                    self.f_index.add_with_ids(vec_data, id_data)
+
+                    # write to disk
+                    self.saveModelToDisk(model_location, self.f_index)
+
     def deleteVectors(self, ids):
 
         return True, ids
 
     def getNearest(self, matrix, k):
         # convert to np matrix
         vec_data = np.matrix(matrix).astype('float32')
-        D, I = self.f_index.search(vec_data, k)
+
+        # lock index read/write during nearest-neighbor search
+        with self._lock:
+            D, I = self.f_index.search(vec_data, k)
         return True, I.tolist(), D.tolist()
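
With this change addVectors no longer indexes synchronously: it acknowledges documents immediately, and the background thread makes them searchable on its next pass, up to process_timeout_sec seconds later. A usage sketch under stated assumptions (indexer is a Faiss instance whose initFaiss has already been called; make_document is a stand-in mirroring only the two fields the code reads, _id and vector.e):

import time
from types import SimpleNamespace

# illustrative stand-in for the message objects the diff reads
def make_document(_id, values):
    return SimpleNamespace(_id=_id, vector=SimpleNamespace(e=values))

docs = [make_document(i, [0.1 * i] * 8) for i in range(3)]

ok, ids = indexer.addVectors(docs)  # returns immediately
print(ok, ids)                      # True, [0, 1, 2]

# vectors become searchable only after the next drain, so wait
# at least one processing interval before querying
time.sleep(indexer.process_timeout_sec + 1)
ok, I, D = indexer.getNearest([[0.1] * 8], k=2)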
