-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathpfifo.go
70 lines (61 loc) · 1.19 KB
/
pfifo.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package queue
import (
"math"
"sync/atomic"
)
// pfifo is the parallel FIFO engine implementation: items are spread across
// several independent channels (streams) instead of a single one.
type pfifo struct {
	// c and o are the enqueue and dequeue cursors. They are advanced with the
	// primitive sync/atomic functions and reduced modulo m to pick a stream.
	//
	// They are deliberately the first fields of the struct: the primitive
	// 64-bit atomic functions require 64-bit alignment, which on 32-bit
	// platforms is only guaranteed for the first word of an allocated struct.
	c, o uint64
	// m is the number of streams the cursors are reduced by.
	m uint64
	// pool holds the streams, one channel per stream.
	pool []chan item
}
// init prepares the engine for use according to config.
//
// config.Streams selects the number of streams (zero is treated as one) and
// config.Capacity is split between them using integer division, so any
// remainder is dropped. The stream pool is rebuilt from scratch on every
// call: calling init again replaces the previous streams instead of appending
// to them, which keeps the pool length equal to the stream count.
//
// Both cursors start at math.MaxUint64 so that the first atomic increment
// wraps around to zero and selects stream 0.
//
// It always returns nil.
func (e *pfifo) init(config *Config) error {
	streams := uint64(config.Streams)
	if streams == 0 {
		streams = 1
	}
	perStream := config.Capacity / streams

	pool := make([]chan item, streams)
	for i := range pool {
		pool[i] = make(chan item, perStream)
	}

	e.pool = pool
	e.c, e.o, e.m = math.MaxUint64, math.MaxUint64, streams
	return nil
}
// enqueue puts a copy of *itm into the next stream in round-robin order.
//
// The enqueue cursor is advanced atomically on every call, whether or not the
// item ends up being stored. When block is true the call waits until the
// chosen stream accepts the item and returns true. When block is false the
// call returns immediately: true if the stream accepted the item, false if
// the send could not proceed right away.
func (e *pfifo) enqueue(itm *item, block bool) bool {
	stream := e.pool[atomic.AddUint64(&e.c, 1)%e.m]
	if block {
		stream <- *itm
		return true
	}
	select {
	case stream <- *itm:
		return true
	default:
	}
	return false
}
// dequeue receives one item from the next stream in round-robin order.
//
// The dequeue cursor is advanced atomically, then the call waits on the
// selected stream. The boolean is the receive status of that channel: false
// means the stream was closed and drained, in which case the item is the zero
// value.
func (e *pfifo) dequeue() (item, bool) {
	stream := e.pool[atomic.AddUint64(&e.o, 1)%e.m]
	received, open := <-stream
	return received, open
}
// dequeueSQ ignores its argument and behaves exactly like dequeue.
func (e *pfifo) dequeueSQ(_ uint32) (item, bool) {
	itm, ok := e.dequeue()
	return itm, ok
}
// size returns the number of items currently buffered, summed over the first
// m streams of the pool. Each stream length is read separately, so the total
// is a point-in-time estimate if streams are being used concurrently.
func (e *pfifo) size() (r int) {
	for n := uint64(0); n != e.m; n++ {
		r += len(e.pool[n])
	}
	return r
}
// cap returns the total buffer capacity, summed over the first m streams of
// the pool.
func (e *pfifo) cap() (r int) {
	total := 0
	for i := uint64(0); i < e.m; i++ {
		stream := e.pool[i]
		total += cap(stream)
	}
	return total
}
// close closes the first m streams of the pool, in index order, and returns
// nil. The boolean argument is ignored.
//
// Items still buffered in a stream remain receivable after it is closed;
// once drained, receives on it report false.
func (e *pfifo) close(_ bool) error {
	var n uint64
	for n < e.m {
		stream := e.pool[n]
		close(stream)
		n++
	}
	return nil
}