-
Notifications
You must be signed in to change notification settings - Fork 1
/
count_min_sketch.go
218 lines (197 loc) · 6.44 KB
/
count_min_sketch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
/*
Implements probabilistic data structure used in estimating count.
Count-Min Sketch: A probabilistic data structure used to estimate the frequency
of items in a data stream. Refer: http://dimacs.rutgers.edu/~graham/pubs/papers/cm-full.pdf
The package implements both in-mem and Redis backed solutions for the data structures. The
in-memory data structures are thread-safe.
*/
package gostatix
import (
"encoding/binary"
"encoding/json"
"fmt"
"io"
"math"
"sync"
)
// CountMinSketch struct. This is an in-memory implementation of Count-Min Sketch.
// It's mainly governed by a 2-d slice _matrix_ which holds the count of hashed items
// at different hashed locations
// _lock_ is used to synchronize concurrent read/writes
type CountMinSketch struct {
AbstractCountMinSketch
matrix [][]uint64
lock sync.RWMutex
}
// NewCountMinSketch creates CountMinSketch with _rows_ and _columns_
func NewCountMinSketch(rows, columns uint) (*CountMinSketch, error) {
if rows <= 0 || columns <= 0 {
return nil, fmt.Errorf("gostatix: rows and columns size should be greater than 0")
}
abstractSketch := makeAbstractCountMinSketch(rows, columns, 0)
matrix := make([][]uint64, rows)
for i := range matrix {
matrix[i] = make([]uint64, columns)
}
sketch := &CountMinSketch{AbstractCountMinSketch: *abstractSketch, matrix: matrix}
return sketch, nil
}
// NewCountMinSketchFromEstimates creates a new CountMinSketch based upon the desired
// _errorRate_ and _delta_
// rows and columns are calculated based upon these supplied values
func NewCountMinSketchFromEstimates(errorRate, delta float64) (*CountMinSketch, error) {
columns := uint(math.Ceil(math.E / errorRate))
rows := uint(math.Ceil(math.Log(1 / delta)))
return NewCountMinSketch(rows, columns)
}
// UpdateOnce increments the count of _data_ in Count-Min Sketch by 1
func (cms *CountMinSketch) UpdateOnce(data []byte) {
cms.Update(data, 1)
}
// Update increments the count of _data_ (byte slice) in Count-Min Sketch by value _count_ passed
func (cms *CountMinSketch) Update(data []byte, count uint64) {
cms.lock.Lock()
defer cms.lock.Unlock()
for r, c := range cms.getPositions(data) {
cms.matrix[r][c] += count
}
cms.allSum += count
}
// UpdateString increments the count of _data_ (string) in Count-Min Sketch by value _count_ passed
func (cms *CountMinSketch) UpdateString(data string, count uint64) {
cms.Update([]byte(data), count)
}
// Count estimates the count of the _data_ (byte slice) in the Count-Min Sketch data structure
func (cms *CountMinSketch) Count(data []byte) uint64 {
cms.lock.Lock()
defer cms.lock.Unlock()
var min uint64
for r, c := range cms.getPositions(data) {
if r == 0 || cms.matrix[r][c] < min {
min = cms.matrix[r][c]
}
}
return min
}
// CountString estimates the count of the _data_ (string) in the Count-Min Sketch data structure
func (cms *CountMinSketch) CountString(data string) uint64 {
return cms.Count([]byte(data))
}
// internal type used to marshal/unmarshal Count-Min Sketch
type countMinSketchJSON struct {
Rows uint `json:"r"`
Columns uint `json:"c"`
AllSum uint64 `json:"s"`
Matrix [][]uint64 `json:"m"`
Key string `json:"k"`
}
// Export JSON marshals the CountMinSketch and returns a byte slice containing the data
func (cms *CountMinSketch) Export() ([]byte, error) {
return json.Marshal(countMinSketchJSON{cms.rows, cms.columns, cms.allSum, cms.matrix, ""})
}
// Import JSON unmarshals the _data_ into the CountMinSketch
func (cms *CountMinSketch) Import(data []byte) error {
var s countMinSketchJSON
err := json.Unmarshal(data, &s)
if err != nil {
return err
}
cms.rows = s.Rows
cms.columns = s.Columns
cms.allSum = s.AllSum
cms.matrix = s.Matrix
return nil
}
// Equals checks if two CountMinSketch are equal
func (cms *CountMinSketch) Equals(cms1 *CountMinSketch) bool {
if cms.rows != cms1.rows && cms.columns != cms1.columns {
return false
}
for i := range cms.matrix {
for j := range cms.matrix[i] {
if cms.matrix[i][j] != cms1.matrix[i][j] {
return false
}
}
}
return true
}
// Merge merges two Count-Min Sketch data structures
func (cms *CountMinSketch) Merge(cms1 *CountMinSketch) error {
if cms.rows != cms1.rows {
return fmt.Errorf("gostatix: can't merge sketches with unequal row counts, %d and %d", cms.rows, cms1.rows)
}
if cms.columns != cms1.columns {
return fmt.Errorf("gostatix: can't merge sketches with unequal column counts, %d and %d", cms.columns, cms1.columns)
}
for i := range cms.matrix {
for j := range cms.matrix[i] {
cms.matrix[i][j] += cms1.matrix[i][j]
}
}
return nil
}
// WriteTo writes the CountMinSketch onto the specified _stream_ and returns the
// number of bytes written.
// It can be used to write to disk (using a file stream) or to network.
func (cms *CountMinSketch) WriteTo(stream io.Writer) (int64, error) {
err := binary.Write(stream, binary.BigEndian, uint64(cms.rows))
if err != nil {
return 0, err
}
err = binary.Write(stream, binary.BigEndian, uint64(cms.columns))
if err != nil {
return 0, err
}
err = binary.Write(stream, binary.BigEndian, cms.allSum)
if err != nil {
return 0, err
}
row := make([]uint64, cms.columns)
for r := uint(0); r < cms.rows; r++ {
for c := uint(0); c < cms.columns; c++ {
row[c] = cms.matrix[r][c]
}
err = binary.Write(stream, binary.BigEndian, row)
if err != nil {
return 0, err
}
}
return int64(3*binary.Size(uint64(0)) + int(cms.rows)*binary.Size(row)), nil
}
// ReadFrom reads the CountMinSketch from the specified _stream_ and returns the
// number of bytes read.
// It can be used to read from disk (using a file stream) or from network.
func (cms *CountMinSketch) ReadFrom(stream io.Reader) (int64, error) {
var rows, columns, allSum uint64
err := binary.Read(stream, binary.BigEndian, &rows)
if err != nil {
return 0, err
}
err = binary.Read(stream, binary.BigEndian, &columns)
if err != nil {
return 0, err
}
err = binary.Read(stream, binary.BigEndian, &allSum)
if err != nil {
return 0, err
}
cms.rows = uint(rows)
cms.columns = uint(columns)
cms.allSum = allSum
cms.matrix = make([][]uint64, cms.rows)
for r := uint(0); r < cms.rows; r++ {
cms.matrix[r] = make([]uint64, cms.columns)
}
row := make([]uint64, cms.columns)
for r := uint(0); r < cms.rows; r++ {
err = binary.Read(stream, binary.BigEndian, &row)
if err != nil {
return 0, err
}
for c := uint(0); c < cms.columns; c++ {
cms.matrix[r][c] = row[c]
}
}
return int64(2*binary.Size(uint64(0)) + int(cms.rows)*binary.Size(row)), nil
}