-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathaggregator.go
88 lines (75 loc) · 2 KB
/
aggregator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package gopark
import (
"sort"
)
// We need the []interface{} type for the gob to work
type CombinerCreator func(interface{}) []interface{}
type CombinerMerger func([]interface{}, []interface{}) []interface{}
type ValueMerger func([]interface{}, interface{}) []interface{}
type _Aggregator struct {
combinerCreator CombinerCreator
combinerMerger CombinerMerger
valueMerger ValueMerger
}
func newMergeAggregator() *_Aggregator {
a := &_Aggregator{}
a.combinerCreator = func(x interface{}) []interface{} {
return []interface{}{x}[:]
}
a.combinerMerger = func(x, y []interface{}) []interface{} {
return append(x, y...)
}
a.valueMerger = func(x []interface{}, y interface{}) []interface{} {
return append(x, y)
}
return a
}
type Partitioner interface {
numPartitions() int
getPartition(key interface{}) int
}
type _HashPartitioner struct {
partitions int
}
func (p *_HashPartitioner) numPartitions() int {
return p.partitions
}
func (p *_HashPartitioner) getPartition(key interface{}) int {
hashCode := hashCode(key)
return int(hashCode % int64(p.partitions))
}
func newHashPartitioner(partitions int) Partitioner {
p := &_HashPartitioner{}
if partitions < 1 {
p.partitions = 1
} else {
p.partitions = partitions
}
return p
}
type _RangePartitioner struct {
keys []interface{}
reverse bool
fn KeyLessFunc
}
func (p *_RangePartitioner) numPartitions() int {
return len(p.keys) + 1
}
func (p *_RangePartitioner) getPartition(key interface{}) int {
index := sort.Search(len(p.keys), func(i int) bool {
return !p.fn(p.keys[i], key)
})
if !p.reverse {
return index
}
return len(p.keys) - index
}
func newRangePartitioner(fn KeyLessFunc, keys []interface{}, reverse bool) Partitioner {
p := &_RangePartitioner{}
p.fn = fn
p.reverse = reverse
sorter := NewParkSorter(keys, fn)
sort.Sort(sorter)
p.keys = keys
return p
}