-
-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathkeys_in_index.go
108 lines (89 loc) · 2.67 KB
/
keys_in_index.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package goriak
import (
"errors"
riak "github.com/basho/riak-go-client"
)
// SecondaryIndexQueryResult is the item passed to the callback function
// used by KeysInIndex and KeysInIndexRange.
type SecondaryIndexQueryResult struct {
// Key is the object key of a single index match. It is the empty string
// on the completion marker (see IsComplete).
Key string
// IsComplete is false for every delivered key. It is true only on the
// marker result that is emitted when an empty batch of results arrives.
IsComplete bool
}
// CommandKeysInIndex is a pending secondary index query created by
// KeysInIndex or KeysInIndexRange. It can be adjusted with Limit and
// IndexContinuation and is executed with Run.
type CommandKeysInIndex struct {
// c is the Command the query was created from.
c *Command
// builder accumulates the query options until Run builds the command.
builder *riak.SecondaryIndexQueryCommandBuilder
}
// KeysInIndexResult is the value returned by CommandKeysInIndex.Run after a
// successful query.
type KeysInIndexResult struct {
// Continuation is copied from the response of the executed query.
// NOTE(review): presumably this is meant to be handed to
// IndexContinuation to fetch the next page of a limited query — confirm.
Continuation []byte
}
// commonIndexBuilder prepares the secondary index query builder shared by
// KeysInIndex and KeysInIndexRange. The query targets the bucket and bucket
// type stored on the Command, is restricted to indexName, and has streaming
// enabled.
//
// Every batch of results handed to the builder's callback is forwarded to
// callback one key at a time with IsComplete set to false. An empty batch is
// reported as a single result with an empty Key and IsComplete set to true.
func (c *Command) commonIndexBuilder(indexName string, callback func(SecondaryIndexQueryResult)) *riak.SecondaryIndexQueryCommandBuilder {
	forward := func(batch []*riak.SecondaryIndexQueryResult) error {
		if len(batch) == 0 {
			// Nothing to deliver: emit the completion marker instead.
			callback(SecondaryIndexQueryResult{IsComplete: true})
			return nil
		}

		for _, item := range batch {
			callback(SecondaryIndexQueryResult{Key: string(item.ObjectKey)})
		}

		return nil
	}

	builder := riak.NewSecondaryIndexQueryCommandBuilder()
	builder = builder.WithBucket(c.bucket)
	builder = builder.WithBucketType(c.bucketType)
	builder = builder.WithIndexName(indexName)
	builder = builder.WithStreaming(true)
	builder = builder.WithCallback(forward)

	return builder
}
// KeysInIndex queries the index indexName for all keys that have the value
// indexValue. The keys are delivered to the callback function one at a time.
// When all keys have been delivered, callback receives a result with
// SecondaryIndexQueryResult.IsComplete set to true.
// The query is only prepared here; call Run on the returned command to
// execute it.
func (c *Command) KeysInIndex(indexName, indexValue string, callback func(SecondaryIndexQueryResult)) *CommandKeysInIndex {
	query := &CommandKeysInIndex{c: c}
	query.builder = c.commonIndexBuilder(indexName, callback).WithIndexKey(indexValue)

	return query
}
// KeysInIndexRange is similar to KeysInIndex, but works with a range of index
// values (from min to max) instead of a single value.
// The query is only prepared here; call Run on the returned command to
// execute it.
func (c *Command) KeysInIndexRange(indexName, min, max string, callback func(SecondaryIndexQueryResult)) *CommandKeysInIndex {
	ranged := c.commonIndexBuilder(indexName, callback).WithRange(min, max)

	return &CommandKeysInIndex{c: c, builder: ranged}
}
// Limit sets the maximum number of results returned by the query.
// A limit of 0 means unlimited.
// It returns the same command so that calls can be chained.
func (c *CommandKeysInIndex) Limit(limit uint32) *CommandKeysInIndex {
	// The builder is fluent, so the value handed back is kept as the builder.
	c.builder = c.builder.WithMaxResults(limit)

	return c
}
// IndexContinuation sets the continuation on the query.
// NOTE(review): presumably continuation is the Continuation from a previous
// KeysInIndexResult, used to resume a limited query — confirm.
// It returns the same command so that calls can be chained.
func (c *CommandKeysInIndex) IndexContinuation(continuation []byte) *CommandKeysInIndex {
	// The builder is fluent, so the value handed back is kept as the builder.
	c.builder = c.builder.WithContinuation(continuation)

	return c
}
// Run builds the configured secondary index query and executes it on the
// given session.
//
// Errors from building or executing the command are returned unchanged. If
// the executed command does not report success, a "not successful" error is
// returned. On success the result carries the continuation found in the
// command's response.
func (c *CommandKeysInIndex) Run(session *Session) (*KeysInIndexResult, error) {
	built, buildErr := c.builder.Build()
	if buildErr != nil {
		return nil, buildErr
	}

	if execErr := session.riak.Execute(built); execErr != nil {
		return nil, execErr
	}

	// The builder produces a secondary index query command; recover the
	// concrete type to reach its status and response.
	query := built.(*riak.SecondaryIndexQueryCommand)
	if !query.Success() {
		return nil, errors.New("not successful")
	}

	result := &KeysInIndexResult{Continuation: query.Response.Continuation}

	return result, nil
}