-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathindexer.go
371 lines (329 loc) · 9.69 KB
/
indexer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
package xs4go
import (
"bytes"
"fmt"
"strings"
"github.com/ninggf/xs4go/cmd"
"github.com/ninggf/xs4go/schema"
"github.com/ninggf/xs4go/server"
"github.com/ninggf/xs4go/tokenizer"
)
// Indexer is a client for an index server. It owns the server connection,
// the settings and schema loaded from the project configuration, the
// tokenizer used to split field values into terms, and an optional
// client-side command buffer (see OpenBuffer).
type Indexer struct {
// conn is the index-server connection opened by NewIndexer; Close sets it to nil.
conn *server.Connection
// setting is the full configuration returned by schema.LoadConf.
setting *schema.Setting
// schema is setting.Schema, kept for field lookups.
schema *schema.Schema
// cfg is setting.Conf; it is cleared when selecting the project fails.
cfg *schema.Config
// buffer accumulates encoded commands while buffering is on; nil when off.
buffer *bytes.Buffer
// tokenizer splits indexed field values into terms; setProject installs a default one.
tokenizer tokenizer.Tokenizer
// bufferSize is compared against the buffered byte count in bufferExec to decide when to flush.
bufferSize uint32
// rebuilding is set to true by BeginRebuild. NOTE(review): nothing in this file reads it.
rebuilding bool
}
// NewIndexer loads the configuration file conf, connects to the index server
// it names and selects the configured project.
//
// On failure it returns a nil *Indexer together with the error. If the
// project cannot be selected, the connection opened here is closed first.
// The connection timeout is set to 0; what 0 means is defined by
// server.Connection (presumably "no timeout" — confirm).
func NewIndexer(conf string) (*Indexer, error) {
	setting, err := schema.LoadConf(conf)
	if err != nil {
		return nil, err
	}
	conn, err := server.NewConnection(setting.Conf.IndexServer)
	if err != nil {
		return nil, err
	}
	conn.SetTimeout(0)
	idx := &Indexer{
		conn:    conn,
		setting: setting,
		cfg:     setting.Conf,
		schema:  setting.Schema,
	}
	return idx.setProject(setting.Conf.Name)
}
// Schema returns the schema that was loaded with this indexer's configuration.
func (indexer *Indexer) Schema() *schema.Schema {
	current := indexer.schema
	return current
}
// SetTokenizer replaces the tokenizer used to split indexed field values into
// terms. A nil tokenizer is ignored and the current one is kept.
func (indexer *Indexer) SetTokenizer(tokenizer tokenizer.Tokenizer) {
	if tokenizer == nil {
		return
	}
	indexer.tokenizer = tokenizer
}
// SetDB switches the index server to the custom database named db and
// expects XS_CMD_OK_DB_CHANGED in reply.
func (indexer *Indexer) SetDB(db string) error {
	req := &cmd.XsCommand{Cmd: cmd.XS_CMD_INDEX_SET_DB, Buf: db}
	if _, err := indexer.conn.ExecOK(req, cmd.XS_CMD_OK_DB_CHANGED); err != nil {
		return err
	}
	return nil
}
// FlushLogging forces the server to flush the current project's search log
// and expects XS_CMD_OK_LOG_FLUSHED in reply.
func (indexer *Indexer) FlushLogging() error {
	req := &cmd.XsCommand{Cmd: cmd.XS_CMD_FLUSH_LOGGING}
	if _, err := indexer.conn.ExecOK(req, cmd.XS_CMD_OK_LOG_FLUSHED); err != nil {
		return err
	}
	return nil
}
// FlushIndex asks the server to commit the current project's pending index
// changes. It sends XS_CMD_INDEX_COMMIT and expects XS_CMD_OK_DB_COMMITED in
// reply. It does not touch the search log; use FlushLogging for that.
func (indexer *Indexer) FlushIndex() error {
	req := &cmd.XsCommand{Cmd: cmd.XS_CMD_INDEX_COMMIT}
	_, err := indexer.conn.ExecOK(req, cmd.XS_CMD_OK_DB_COMMITED)
	return err
}
// Add sends doc to the index server as a new document. doc maps field names
// to values and must hold a non-empty value for the schema's id field.
func (indexer *Indexer) Add(doc map[string]string) error {
	const asNewDocument = true
	return indexer.update(doc, asNewDocument)
}
// Update sends doc to the index server as an update of the document whose
// id-field value equals doc's. doc must hold a non-empty value for the
// schema's id field.
func (indexer *Indexer) Update(doc map[string]string) error {
	const asNewDocument = false
	return indexer.update(doc, asNewDocument)
}
// Del removes the documents whose primary-key field (schema.StrId) matches
// any of terms. It is DelByField applied to the primary-key field.
func (indexer *Indexer) Del(terms ...string) error {
	return indexer.DelByField(indexer.schema.StrId, terms...)
}
// DelByField removes documents whose value in field matches any of terms.
//
// An empty field selects the primary-key field (schema.StrId). Each term is
// lower-cased before it is sent. A single term is sent as one
// XS_CMD_INDEX_REMOVE command. Several terms are encoded back to back and
// sent together inside a single XS_CMD_INDEX_EXDATA command. Calling it with
// no terms is a no-op once the field has been validated.
//
// It returns an error if field is not defined in the schema, or if the server
// does not reply with XS_CMD_OK_RQST_FINISHED.
func (indexer *Indexer) DelByField(field string, terms ...string) error {
	if field == "" {
		field = indexer.schema.StrId
	}
	meta, ok := indexer.schema.FieldMetas[field]
	if !ok {
		return fmt.Errorf("field '%s' is not defined", field)
	}

	var req *cmd.XsCommand
	switch len(terms) {
	case 0:
		// Nothing to delete: do not send an empty EXDATA packet to the server.
		return nil
	case 1:
		req = cmd.NewCommand2(cmd.XS_CMD_INDEX_REMOVE, 0, meta.Vno, strings.ToLower(terms[0]), "")
	default:
		// Pack all remove commands into one request to save round trips.
		var packed bytes.Buffer
		for _, term := range terms {
			remove := cmd.NewCommand2(cmd.XS_CMD_INDEX_REMOVE, 0, meta.Vno, strings.ToLower(term), "")
			packed.Write(remove.Encode(false))
		}
		req = cmd.NewCommand(cmd.XS_CMD_INDEX_EXDATA, 0, packed.String(), "")
	}
	_, err := indexer.conn.ExecOK(req, cmd.XS_CMD_OK_RQST_FINISHED)
	return err
}
// AddSynonym adds each non-empty entry of synonyms as a synonym of word.
// It does nothing when word is empty or no synonyms are given. Commands go
// through bufferExec, so they are buffered while a buffer is open. The first
// error stops processing and is returned.
func (indexer *Indexer) AddSynonym(word string, synonyms ...string) error {
	if word == "" {
		return nil
	}
	for i := range synonyms {
		syn := synonyms[i]
		if syn != "" {
			req := cmd.NewCommand2(cmd.XS_CMD_INDEX_SYNONYMS, cmd.XS_CMD_INDEX_SYNONYMS_ADD, 0, word, syn)
			if err := indexer.bufferExec(req, cmd.XS_CMD_OK_RQST_FINISHED); err != nil {
				return err
			}
		}
	}
	return nil
}
// DelSynonym removes synonyms of word on the index server. It does nothing
// when word is empty.
//
// When no synonyms are given, a single delete command with an empty synonym
// is sent. NOTE(review): presumably this removes every synonym of word on the
// server — confirm. Otherwise, each non-empty synonym is removed individually,
// and empty entries are skipped. Commands go through bufferExec, and the first
// error stops processing.
func (indexer *Indexer) DelSynonym(word string, synonyms ...string) error {
	switch {
	case word == "":
		return nil
	case len(synonyms) == 0:
		req := cmd.NewCommand2(cmd.XS_CMD_INDEX_SYNONYMS, cmd.XS_CMD_INDEX_SYNONYMS_DEL, 0, word, "")
		return indexer.bufferExec(req, cmd.XS_CMD_OK_RQST_FINISHED)
	}
	for i := range synonyms {
		syn := synonyms[i]
		if syn != "" {
			req := cmd.NewCommand2(cmd.XS_CMD_INDEX_SYNONYMS, cmd.XS_CMD_INDEX_SYNONYMS_DEL, 0, word, syn)
			if err := indexer.bufferExec(req, cmd.XS_CMD_OK_RQST_FINISHED); err != nil {
				return err
			}
		}
	}
	return nil
}
// OpenBuffer turns on client-side buffering for commands sent through
// bufferExec. Buffered commands are packed into XS_CMD_INDEX_EXDATA requests
// instead of being sent one request each.
//
// size is the flush threshold in MiB and is capped at 32; 0 turns buffering
// off. Any commands already buffered are flushed first. If that flush fails,
// buffering is turned off and the flush error is returned. Re-opening with the
// current size keeps the existing buffer.
func (indexer *Indexer) OpenBuffer(size uint32) error {
	const maxSizeMiB = 32
	if size > maxSizeMiB {
		size = maxSizeMiB
	}
	// bufferExec compares encoded byte counts against bufferSize, so the
	// threshold must be stored in bytes (it was hard-coded to 30 bytes before,
	// which flushed on nearly every command and ignored size entirely).
	limit := size << 20

	if indexer.buffer != nil {
		if err := indexer.flushBuffer(); err != nil {
			indexer.buffer = nil
			indexer.bufferSize = 0
			return err
		}
		if indexer.bufferSize == limit {
			return nil
		}
	}

	if size == 0 {
		indexer.bufferSize = 0
		indexer.buffer = nil
		return nil
	}
	indexer.bufferSize = limit
	indexer.buffer = bytes.NewBuffer(make([]byte, 0, limit))
	return nil
}
// Submit flushes any buffered commands to the index server, checks the reply
// and turns buffering off. It is shorthand for OpenBuffer(0).
func (indexer *Indexer) Submit() error {
	const bufferingOff uint32 = 0
	return indexer.OpenBuffer(bufferingOff)
}
// Clean empties the index database (XS_CMD_INDEX_CLEAN_DB) and expects
// XS_CMD_OK_DB_CLEAN in reply. Cleaning is not allowed while the database is
// being rebuilt. No client-side check is made here, so presumably the server
// rejects it — confirm.
func (indexer *Indexer) Clean() error {
	req := cmd.NewCommand(cmd.XS_CMD_INDEX_CLEAN_DB, 0, "", "")
	if _, err := indexer.conn.ExecOK(req, cmd.XS_CMD_OK_DB_CLEAN); err != nil {
		return err
	}
	return nil
}
// BeginRebuild starts rebuilding the index.
//
// From then on, index updates are written to a temporary database instead of
// the current search database. Call EndRebuild once rebuilding is done so the
// rebuilt data replaces the old index smoothly. The old index stays searchable
// during the rebuild. Clearing with Clean instead would let searches see
// incomplete data while re-indexing. On success the indexer records that a
// rebuild is in progress.
func (indexer *Indexer) BeginRebuild() error {
	req := &cmd.XsCommand{Cmd: cmd.XS_CMD_INDEX_REBUILD}
	_, err := indexer.conn.ExecOK(req, cmd.XS_CMD_OK_DB_REBUILD)
	if err != nil {
		return err
	}
	indexer.rebuilding = true
	return nil
}
// EndRebuild finishes and closes an index rebuild.
//
// Call it after rebuilding is complete so the rebuilt index replaces the old
// one. It sends XS_CMD_INDEX_REBUILD with Arg1 = 1 and expects
// XS_CMD_OK_DB_REBUILD. On success the in-progress flag set by BeginRebuild is
// cleared.
func (indexer *Indexer) EndRebuild() error {
	req := &cmd.XsCommand{Cmd: cmd.XS_CMD_INDEX_REBUILD, Arg1: 1}
	if _, err := indexer.conn.ExecOK(req, cmd.XS_CMD_OK_DB_REBUILD); err != nil {
		return err
	}
	// Without this, rebuilding stayed true forever after the first rebuild.
	indexer.rebuilding = false
	return nil
}
// StopRebuild aborts an index rebuild.
//
// It discards everything written to the temporary rebuild database and
// reverts to the current search database. It is mainly for recovering from a
// rebuild that was interrupted unexpectedly. It sends XS_CMD_INDEX_REBUILD
// with Arg1 = 2 and expects XS_CMD_OK_DB_REBUILD. On success the in-progress
// flag set by BeginRebuild is cleared.
func (indexer *Indexer) StopRebuild() error {
	req := &cmd.XsCommand{Cmd: cmd.XS_CMD_INDEX_REBUILD, Arg1: 2}
	if _, err := indexer.conn.ExecOK(req, cmd.XS_CMD_OK_DB_REBUILD); err != nil {
		return err
	}
	indexer.rebuilding = false
	return nil
}
// Close flushes any buffered commands (best effort) and closes the server
// connection. After the first call the connection is nil, so later calls do
// nothing.
func (indexer *Indexer) Close() {
	if indexer.conn == nil {
		return
	}
	if indexer.buffer != nil {
		// Close has no error result, so a failed final flush cannot be
		// reported; the error is deliberately dropped.
		_ = indexer.flushBuffer()
	}
	indexer.conn.Close()
	indexer.conn = nil
}
// setProject selects project on the server connection.
//
// On success it installs the default tokenizer and returns indexer. On
// failure it closes and drops the connection, clears cfg and returns a nil
// *Indexer together with the error.
func (indexer *Indexer) setProject(project string) (*Indexer, error) {
	if _, err := indexer.conn.ExecOK(cmd.UseProjectCmd(project), cmd.XS_CMD_OK_PROJECT); err != nil {
		indexer.conn.Close()
		indexer.conn = nil
		indexer.cfg = nil
		return nil, err
	}
	indexer.tokenizer = tokenizer.DefaultTokenizer{"default"}
	return indexer, nil
}
// update sends doc to the index server as one index request.
//
// When add is true the request is XS_CMD_INDEX_REQUEST_ADD. Otherwise it is
// XS_CMD_INDEX_REQUEST_UPDATE, keyed by the id field's number and value. doc
// must hold a non-empty value for the schema's id field.
//
// Commands are sent in this order, and the first error aborts the sequence:
//  1. the request command;
//  2. the id field's commands;
//  3. the commands for every other schema field;
//  4. XS_CMD_INDEX_SUBMIT, which must be answered with XS_CMD_OK_RQST_FINISHED.
//
// NOTE(review): this path writes straight to the connection and does not use
// the buffer opened by OpenBuffer.
func (indexer *Indexer) update(doc map[string]string, add bool) error {
	idField := indexer.setting.Schema.Id
	key, ok := doc[idField.Name]
	if !ok || key == "" {
		return fmt.Errorf("Missing value of primary key (FIELD:%s)", idField.Name)
	}

	req := &cmd.XsCommand{Cmd: cmd.XS_CMD_INDEX_REQUEST}
	if add {
		req.Arg1 = cmd.XS_CMD_INDEX_REQUEST_ADD
	} else {
		req.Arg1 = cmd.XS_CMD_INDEX_REQUEST_UPDATE
		req.Arg2 = idField.Vno
		req.Buf = key
	}

	// The request command must go first; an ordered slice replaces the
	// map[int] that previously emulated one.
	cmds := []*cmd.XsCommand{req}
	cmds = indexer.buildCmd(idField.Name, idField, doc, cmds)
	for name, meta := range indexer.schema.FieldMetas {
		if meta.Type == "id" {
			continue // already emitted above
		}
		cmds = indexer.buildCmd(name, meta, doc, cmds)
	}
	for _, c := range cmds {
		if _, err := indexer.conn.ExecOK(c, cmd.XS_CMD_NONE); err != nil {
			return err
		}
	}

	submit := &cmd.XsCommand{Cmd: cmd.XS_CMD_INDEX_SUBMIT}
	_, err := indexer.conn.ExecOK(submit, cmd.XS_CMD_OK_RQST_FINISHED)
	return err
}

// buildCmd appends the commands that index field f of doc (described by v) to
// cmds and returns the extended slice. Nothing is appended when doc has no
// value, or an empty value, for f.
//
// For an indexed field, the value is split with the indexer's tokenizer:
//   - index-self: one XS_CMD_DOC_TERM per lower-cased term, skipping terms
//     longer than 200 bytes. Bool-indexed fields use wdf 1; other fields use
//     the field weight combined with XS_CMD_INDEX_FLAG_CHECKSTEM.
//   - index-mixed: one XS_CMD_DOC_INDEX into the mixed field, containing all
//     terms joined by single spaces.
//
// Every non-empty value is also stored with XS_CMD_DOC_VALUE, flagged as
// numeric for numeric fields.
//
// TODO: process add terms and add text (not implemented).
func (indexer *Indexer) buildCmd(f string, v *schema.FieldMeta, doc map[string]string, cmds []*cmd.XsCommand) []*cmd.XsCommand {
	value, ok := doc[f]
	if !ok || value == "" {
		return cmds
	}

	varg := uint8(0)
	if v.IsNumeric() {
		varg = cmd.XS_CMD_VALUE_FLAG_NUMERIC
	}

	if v.HasIndex() {
		terms := indexer.tokenizer.GetTokens(value)
		if len(terms) > 0 && v.HasIndexSelf() {
			wdf := uint8(1)
			if !v.IsBoolIndex() {
				wdf = uint8(v.Weight | cmd.XS_CMD_INDEX_FLAG_CHECKSTEM)
			}
			for _, term := range terms {
				if len(term) > 200 {
					continue
				}
				term = strings.ToLower(term)
				cmds = append(cmds, cmd.NewCommand2(cmd.XS_CMD_DOC_TERM, wdf, v.Vno, term, ""))
			}
		}
		if len(terms) > 0 && v.HasIndexMixed() {
			mtext := strings.Join(terms, " ")
			cmds = append(cmds, cmd.NewCommand2(cmd.XS_CMD_DOC_INDEX, uint8(v.Weight), schema.MIXED_VNO, mtext, ""))
		}
	}

	return append(cmds, cmd.NewCommand2(cmd.XS_CMD_DOC_VALUE, varg, v.Vno, value, ""))
}
// bufferExec sends cmdx to the server directly when no buffer is open. In that
// case the reply must match resArg.
//
// While a buffer is open, cmdx is encoded and appended to the buffer instead.
// If appending it would exceed bufferSize, the buffer is flushed first. resArg
// is not used for buffered commands; flushBuffer checks the whole buffer
// against XS_CMD_OK_RQST_FINISHED.
func (indexer *Indexer) bufferExec(cmdx *cmd.XsCommand, resArg uint16) error {
	if indexer.buffer == nil {
		_, err := indexer.conn.ExecOK(cmdx, resArg)
		return err
	}
	encoded := cmdx.Encode(false)
	if uint32(indexer.buffer.Len()+len(encoded)) > indexer.bufferSize {
		if err := indexer.flushBuffer(); err != nil {
			return err
		}
	}
	indexer.buffer.Write(encoded)
	return nil
}
// flushBuffer sends every buffered command as one XS_CMD_INDEX_EXDATA request
// and expects XS_CMD_OK_RQST_FINISHED. The buffer is emptied even when the
// request fails. It does nothing when buffering is off or the buffer is empty.
func (indexer *Indexer) flushBuffer() error {
	if indexer.buffer == nil || indexer.buffer.Len() == 0 {
		return nil
	}
	payload := indexer.buffer.String()
	exdata := cmd.NewCommand(cmd.XS_CMD_INDEX_EXDATA, 0, payload, "")
	_, err := indexer.conn.ExecOK(exdata, cmd.XS_CMD_OK_RQST_FINISHED)
	indexer.buffer.Reset()
	return err
}