-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpipepool.go
156 lines (127 loc) · 3.75 KB
/
pipepool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package pipepool
import (
"sync"
)
// PipePool 管道池
type PipePool[E any, C comparable] struct {
pool *Pool // stacking pool 堆积池
mPipesMutex sync.Mutex // mutex lock for map of pipes管道池并发锁
mPipes map[C]*Pipe[E] // map of pipes 管道池
categorizeByFunc func(E) C // categorize by function 分类函数
}
// New create new pipe pool 创建管道池
//
// Generic type E represents the type of the elements in the pipe.
// Generic type C represents the type of the categories of the pipe.
//
// Pass in bufferSize to control the size of the pool, and pass in categorizeByFunc to control the categorization of the pipe.
func New[E any, C comparable](bufferSize int64, categorizeBy func(E) C) *PipePool[E, C] {
return &PipePool[E, C]{
pool: NewPool(bufferSize),
mPipes: make(map[C]*Pipe[E]),
categorizeByFunc: categorizeBy,
}
}
func (pp *PipePool[E, C]) categorize(e E) {
pp.mPipesMutex.Lock()
defer pp.mPipesMutex.Unlock()
category := pp.categorizeByFunc(e)
pipe, ok := pp.mPipes[category]
if !ok {
pipe = NewPipe[E]()
pp.mPipes[category] = pipe
}
pipe.push(PipeElement[E]{element: e})
}
// Push push element to the pipe pool 向管道池推入元素
func (pp *PipePool[E, C]) Push(e E) {
pp.pool.Acquire()
pp.categorize(e)
}
// Categories returns the categories of the pipe pool 管道池的分类
func (pp *PipePool[E, C]) Categories() []C {
pp.mPipesMutex.Lock()
defer pp.mPipesMutex.Unlock()
categories := make([]C, 0, len(pp.mPipes))
for category := range pp.mPipes {
categories = append(categories, category)
}
return categories
}
// PipeQueuedCount returns the number of elements in the pipe pool 管道池中的元素数量
func (pp *PipePool[E, C]) PipeQueuedCount(category C) int {
pp.mPipesMutex.Lock()
defer pp.mPipesMutex.Unlock()
pipe, ok := pp.mPipes[category]
if !ok {
return 0
}
return pipe.Len()
}
// Total count of slots of pool 池完整大小
func (pp *PipePool[E, C]) TotalCount() int64 {
return pp.pool.TotalCount
}
// Available slots of pool 剩余池大小
func (pp *PipePool[E, C]) AvailableCount() int64 {
return pp.pool.AvailableCount()
}
// PipePoolTransaction transaction of pipe pool 管道池操作事务
type PipePoolTransaction[E any, C comparable] struct {
Category C // category of pipe 管道分类
pipePool *PipePool[E, C] // pipe pool 管道池实例
pipeTx *PipeTransaction[E] // pipe transaction 管道操作事务
}
// Begin begin transaction 开始事务
// returns a transaction object that can be used to pull, commit or rollback the transaction
func (pp *PipePool[E, C]) Begin() *PipePoolTransaction[E, C] {
var foundCategory C
var foundPipe *Pipe[E]
var foundPipeTx *PipeTransaction[E]
for {
pp.mPipesMutex.Lock()
var found bool
for category, v := range pp.mPipes {
if v.IsBlocking() {
continue
}
if v.Len() > 0 {
foundCategory = category
foundPipe = v
found = true
foundPipeTx = foundPipe.Begin()
break
}
}
pp.mPipesMutex.Unlock()
if found {
break
}
}
return &PipePoolTransaction[E, C]{
pipePool: pp,
Category: foundCategory,
pipeTx: foundPipeTx,
}
}
// Pull pull element from transaction 取出事务中的元素
func (tx *PipePoolTransaction[E, C]) Pull() E {
element := tx.pipeTx.Pull()
return element
}
// Rollback rollback transaction 回滚事务
func (tx *PipePoolTransaction[E, C]) Rollback() {
tx.pipePool.mPipesMutex.Lock()
defer tx.pipePool.mPipesMutex.Unlock()
tx.pipeTx.Rollback()
}
// Commit commit transaction 提交事务
func (tx *PipePoolTransaction[E, C]) Commit() {
tx.pipeTx.Commit()
tx.pipePool.pool.Release()
if tx.pipeTx.pipe.Len() == 0 {
tx.pipePool.mPipesMutex.Lock()
defer tx.pipePool.mPipesMutex.Unlock()
delete(tx.pipePool.mPipes, tx.Category)
}
}