-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsketch.go
215 lines (185 loc) · 5.27 KB
/
sketch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
package topk
import (
"math"
"math/rand/v2"
"slices"
"sort"
"github.com/keilerkonzept/topk/heap"
"github.com/keilerkonzept/topk/internal/sizeof"
)
// Bucket is one sketch counter: the fingerprint of the item that currently
// owns the slot, paired with that owner's counter value.
type Bucket struct {
	Fingerprint, Count uint32 // owner's fingerprint, then its counter value
}
// Sketch is a top-k sketch.
//
// Every field of Sketch is exported so that the structure can be serialized
// as a whole; this relies on heap.Min exposing its state the same way.
type Sketch struct {
	K            int // Number of top items kept in the min-heap.
	Width, Depth int // Buckets per hash function, and number of hash functions.

	// Decay raised to the power i is the probability that a colliding
	// counter holding value i gets decremented.
	Decay float32
	// DecayLUT caches those probabilities: entry i holds math.Pow(Decay, i).
	DecayLUT []float32

	Buckets []Bucket  // Sketch counters.
	Heap    *heap.Min // Top-K min-heap.
}
// New returns a top-k sketch that keeps track of the `k` items with the
// highest estimated counts.
//
//   - The depth defaults to `max(3, log(k))` unless the [WithDepth] option is set.
//   - The width defaults to `max(256, k*log(k))` unless the [WithWidth] option is set.
//   - The decay parameter defaults to 0.9 unless the [WithDecay] option is set.
//   - The decay LUT size defaults to 256 unless the [WithDecayLUTSize] option is set.
//
// Options are applied in order, after the defaults have been filled in.
func New(k int, opts ...Option) *Sketch {
	// For k <= 0, log(k) is -Inf or NaN, and converting either to int is
	// implementation-defined in Go. For k == 1 it is 0. So the logarithm is
	// only used when it is positive; otherwise both terms stay 0 and the
	// minimum defaults below apply.
	var logK, kLogK int
	if k > 1 {
		lnK := math.Log(float64(k))
		logK = int(lnK)
		kLogK = int(float64(k) * lnK)
	}

	// default settings
	out := Sketch{
		K:     k,
		Width: max(256, kLogK),
		Depth: max(3, logK),
		Decay: 0.9,
	}
	for _, o := range opts {
		o(&out)
	}
	if len(out.DecayLUT) == 0 {
		// if not specified, default to 256 entries
		out.DecayLUT = make([]float32, 256)
	}
	out.Heap = heap.NewMin(out.K)
	out.initBuckets()
	out.initDecayLUT()
	return &out
}
// initDecayLUT overwrites every entry of the existing decay table so that
// entry i holds Decay raised to the power i. The table's length is not changed.
func (me *Sketch) initDecayLUT() {
	base := float64(me.Decay)
	lut := me.DecayLUT
	for exp := 0; exp < len(lut); exp++ {
		lut[exp] = float32(math.Pow(base, float64(exp)))
	}
}
// initBuckets replaces the bucket array with a freshly allocated, zeroed one
// that has room for Width buckets for each of the Depth hash functions.
func (me *Sketch) initBuckets() {
	total := me.Depth * me.Width
	me.Buckets = make([]Bucket, total)
}
// SizeBytes returns the current size of the sketch in bytes. The total is the
// Sketch struct itself, plus its bucket array, its top-K heap (as reported by
// the heap) and its decay look-up table.
func (me *Sketch) SizeBytes() int {
	total := sizeofSketchStruct
	total += len(me.Buckets) * sizeofBucketStruct
	total += me.Heap.SizeBytes()
	total += sizeof.Float32 * len(me.DecayLUT)
	return total
}
// Count returns the estimated count of the given item.
//
// If the item is currently held in the top-K heap, the heap's count is
// returned. Otherwise the result is the largest counter among the item's
// buckets (one per hash function) that carry the item's fingerprint. If none
// of them do, the result is 0.
func (me *Sketch) Count(item string) uint32 {
	if idx := me.Heap.Find(item); idx >= 0 && me.Heap.Items[idx].Item == item {
		return me.Heap.Items[idx].Count
	}

	fp := Fingerprint(item)
	var best uint32
	for row := 0; row < me.Depth; row++ {
		bucket := me.Buckets[BucketIndex(item, row, me.Width)]
		if bucket.Fingerprint == fp && bucket.Count > best {
			best = bucket.Count
		}
	}
	return best
}
// Incr records a single occurrence of item and is shorthand for Add(item, 1).
// It reports whether the item is in the top K afterwards.
func (me *Sketch) Incr(item string) bool {
	const single uint32 = 1
	return me.Add(item, single)
}
// Add increments the given item's count by the given increment.
// Returns whether the item is in the top K.
//
// Each of the Depth hash functions selects one bucket for the item, and that
// bucket is updated as follows:
//   - An empty bucket (zero count) is claimed by the item and set to increment.
//   - A bucket that already carries the item's fingerprint is incremented.
//     The counter saturates at math.MaxUint32 instead of wrapping around.
//   - A bucket owned by another fingerprint is decayed. For each unit of
//     increment, its count is decremented with probability Decay^count. If
//     the count reaches zero, the item takes over the bucket, keeping the
//     increment units still remaining (including the one that emptied it).
//
// The largest count the item reached in any of its buckets is then passed to
// the top-K heap.
func (me *Sketch) Add(item string, increment uint32) bool {
	var maxCount uint32
	fingerprint := Fingerprint(item)
	width := me.Width
	for i := range me.Depth {
		k := BucketIndex(item, i, width)
		b := &me.Buckets[k]
		count := b.Count
		switch {
		// empty bucket (zero count)
		case count == 0:
			b.Fingerprint = fingerprint
			count = increment
			b.Count = count
			maxCount = max(maxCount, count)
		// this flow's bucket (equal fingerprint)
		case b.Fingerprint == fingerprint:
			// Saturate rather than wrap: a wrapped counter would make a heavy
			// hitter suddenly look tiny.
			if count > math.MaxUint32-increment {
				count = math.MaxUint32
			} else {
				count += increment
			}
			b.Count = count
			maxCount = max(maxCount, count)
		// another flow's bucket (nonequal fingerprint)
		default:
			// can't be inlined, so not factored out
			var decay float32
			lookupTableSize := uint32(len(me.DecayLUT))
			for incrementRemaining := increment; incrementRemaining > 0; incrementRemaining-- {
				switch {
				case count < lookupTableSize:
					decay = me.DecayLUT[count]
				case lookupTableSize < 2:
					// Empty or single-entry table, e.g. a deserialized sketch
					// without a LUT. The decomposition below would divide by
					// zero or index out of range, so compute the power directly.
					decay = float32(math.Pow(float64(me.Decay), float64(count)))
				default:
					// Decay^count = (Decay^(L-1))^(count/(L-1)) * Decay^(count%(L-1)),
					// where DecayLUT[L-1] holds Decay^(L-1).
					last := lookupTableSize - 1
					decay = float32(math.Pow(
						float64(me.DecayLUT[last]),
						float64(count/last))) * me.DecayLUT[count%last]
				}
				if rand.Float32() < decay {
					count--
					if count == 0 {
						// The other flow was evicted: this item takes the
						// bucket with the units that are left.
						b.Fingerprint = fingerprint
						count = incrementRemaining
						maxCount = max(maxCount, count)
						break
					}
				}
			}
			b.Count = count
		}
	}
	return me.Heap.Update(item, fingerprint, maxCount)
}
// Query reports whether item is currently one of the top K items held in the
// sketch's min-heap.
func (me *Sketch) Query(item string) bool {
	inTopK := me.Heap.Contains(item)
	return inTopK
}
// Iter calls yield for each top-K item with a non-zero count, in the order of
// the heap's backing slice. Iteration stops as soon as yield returns false.
// The pointers passed to yield refer to the heap's own storage.
func (me *Sketch) Iter(yield func(*heap.Item) bool) {
	for i := range me.Heap.Items {
		entry := &me.Heap.Items[i]
		if entry.Count != 0 && !yield(entry) {
			return
		}
	}
}
// SortedSlice returns a copy of the top-K items that have a non-zero count.
// Items are ordered by count from highest to lowest; ties are broken by item
// in ascending lexicographic order. The heap itself is not modified.
func (me *Sketch) SortedSlice() []heap.Item {
	items := slices.Clone(me.Heap.Items)

	// Drop empty entries in place first, so the sort only orders real items.
	kept := items[:0]
	for _, it := range items {
		if it.Count != 0 {
			kept = append(kept, it)
		}
	}

	sort.SliceStable(kept, func(a, b int) bool {
		x, y := kept[a], kept[b]
		if x.Count != y.Count {
			return x.Count > y.Count
		}
		return x.Item < y.Item
	})
	return kept
}
// Reset returns the sketch to an empty state. Allocated storage is kept, and
// the configuration (K, Width, Depth, Decay, DecayLUT) is left untouched.
func (me *Sketch) Reset() {
	// Zero every fingerprint and counter in place.
	clear(me.Buckets)
	// Forget all tracked top-K entries.
	me.Heap.Reset()
}