-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpool.go
221 lines (187 loc) · 5.66 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
package promise
import (
"context"
"sync"
)
// PoolEventListener can be attached to a promise pool to listen for
// fulfillment and rejection events of the promises created and tracked by the
// pool. This can be used for logging or collecting values.
//
// Both callbacks are optional: the pool skips a callback that is nil.
// Listeners are compared by pointer identity when they are added to or
// removed from a pool.
type PoolEventListener struct {
// OnFulfilled is called on each promise fulfillment. It receives the
// value the promise was fulfilled with.
OnFulfilled func(val Value)
// OnRejected is called on each promise rejection. It receives the error
// the promise was rejected with.
OnRejected func(err error)
}
// A Pool creates promises from a stream of promise factory funcs and
// supervises their resolution. It ensures that only a configurable number of
// promises will be resolved concurrently.
//
// A Pool is created with NewPool and started exactly once with Run.
type Pool struct {
// promise is the pool promise created by Run. It is nil until Run has
// been called, which is how a second call to Run is detected.
promise Promise
// options holds the behaviour flags passed to NewPool.
options PoolOptions
// sem is used as a counting semaphore. Its capacity is the concurrency
// passed to NewPool; a slot is taken before a factory func is executed
// and given back once the resulting promise has settled.
sem chan struct{}
// done is closed by the resolver set up in Run once the pool promise has
// been resolved or rejected.
done chan struct{}
// result carries the outcome that settles the pool promise: either the
// first rejection error or an empty Result once fns has been closed.
result chan Result
// fns is the stream of promise factory funcs consumed by the pool.
fns <-chan func() Promise
// mu protects listeners.
mu sync.Mutex
listeners []*PoolEventListener // guarded by mu
}
// PoolOptions configure the behaviour of a promise pool. The zero value is
// valid and aborts the pool on the first rejected promise.
type PoolOptions struct {
// ContinueOnError controls whether promise rejections will cause the pool
// to stop consuming more promise factory funcs or not. If true, rejections
// are ignored. This is useful if errors are handled by other means. The
// default is to abort on the first rejected promise.
//
// Event listeners attached to the pool are notified of rejections
// regardless of this setting.
ContinueOnError bool
}
// NewPool builds a promise pool that resolves at most concurrency promises at
// the same time. The promises are produced by the factory funcs received from
// fns.
//
// concurrency must be at least 1; zero or negative values cause a panic. Nil
// funcs received from fns, or funcs that return a nil promise, cause a panic
// once the pool is started with Run.
//
// Pool options may be supplied as an optional third argument. Only the first
// element of opts is used; any further elements are ignored.
func NewPool(concurrency int, fns <-chan func() Promise, opts ...PoolOptions) *Pool {
	if concurrency < 1 {
		panic("promise.NewPool: concurrency must be greater than 0")
	}

	pool := &Pool{
		fns: fns,
		// The buffer size of sem is the concurrency limit.
		sem:    make(chan struct{}, concurrency),
		done:   make(chan struct{}),
		result: make(chan Result),
	}

	if len(opts) != 0 {
		pool.options = opts[0]
	}

	return pool
}
// Run starts the pool and returns the pool promise. The pool begins consuming
// the factory funcs from the channel handed to NewPool, honouring the
// configured concurrency.
//
// The returned promise fulfills once that channel has been closed; its value
// is then always nil. It rejects with the first error encountered (unless
// ContinueOnError was set in the PoolOptions passed to NewPool) or with
// ctx.Err() if ctx is cancelled first.
//
// Run must only be called once. Subsequent calls to it will panic.
func (p *Pool) Run(ctx context.Context) Promise {
	if p.promise != nil {
		panic("promise.Pool: promise pool cannot be started twice")
	}

	// settle waits for whichever comes first, a result from the pool or the
	// cancellation of ctx, and settles the pool promise accordingly.
	settle := func(resolve ResolveFunc, reject RejectFunc) {
		// Closing done tells the supervisor loop and any pending handlers
		// that nobody is receiving from p.result anymore.
		defer close(p.done)

		select {
		case <-ctx.Done():
			reject(ctx.Err())
		case outcome := <-p.result:
			if outcome.Err == nil {
				resolve(outcome.Value)
				return
			}
			reject(outcome.Err)
		}
	}

	p.promise = New(settle)

	go p.run(ctx)

	return p.promise
}
// run is the pool's supervisor loop. It receives promise factory funcs from
// p.fns and executes each of them as soon as a semaphore slot is available.
// The loop ends when p.fns is closed or when p.done is closed, i.e. when the
// pool promise has already been settled.
//
// ctx is not inspected here. Cancellation is observed through p.done, which
// Run closes after rejecting the pool promise with ctx.Err().
func (p *Pool) run(ctx context.Context) {
	for {
		select {
		case <-p.done:
			return
		case fn, ok := <-p.fns:
			if !ok {
				// Fns channel was closed, we need to stop. By taking every
				// semaphore slot we make sure that all promises that are
				// currently in flight have settled before we send the final
				// result.
				//
				// If the pool promise gets settled in the meantime (first
				// rejection or ctx cancellation), there is nothing left to
				// report. Stop waiting so this goroutine is not leaked when
				// an in-flight promise never settles.
				for i := 0; i < cap(p.sem); i++ {
					select {
					case p.sem <- struct{}{}:
					case <-p.done:
						return
					}
				}

				// If the pool promise was already rejected there is no
				// consumer for the result anymore, so also give up on done
				// instead of blocking forever on the send.
				select {
				case p.result <- Result{}:
				case <-p.done:
				}

				return
			}

			// Wait for a semaphore slot before executing the promise factory
			// func.
			select {
			case p.sem <- struct{}{}:
				p.execute(fn)
			case <-p.done:
				// One of the promises that are currently in flight rejected
				// or ctx was cancelled, which in turn caused the pool promise
				// to reject while waiting for sem. We must return here.
				return
			}
		}
	}
}
// execute calls fn to create a promise and attaches the pool's bookkeeping to
// it. The caller must have acquired a slot of p.sem beforehand; the slot is
// given back by whichever handler runs once the promise has settled.
//
// Event listeners are notified of the outcome first. On rejection, unless
// ContinueOnError is set, the error is then handed to the pool promise via
// p.result.
func (p *Pool) execute(fn func() Promise) {
	fn().Then(func(val Value) Value {
		p.dispatchFulfillment(val)
		<-p.sem
		return val
	}).Catch(func(err error) Value {
		p.dispatchRejection(err)

		if !p.options.ContinueOnError {
			// Hand the error to the pool promise. p.result is unbuffered, so
			// a non-blocking send would silently drop the error whenever the
			// receiving goroutine started by Run has not reached its receive
			// yet, and the pool could later fulfill despite the rejection.
			//
			// Instead we wait until the error is either taken or the pool
			// promise has been settled by someone else (another rejection or
			// ctx cancellation). p.done is closed in that case, so this
			// cannot block forever and no goroutine is leaked.
			select {
			case p.result <- Result{Err: err}:
			case <-p.done:
			}
		}

		<-p.sem
		return err
	})
}
// dispatchFulfillment passes val to the OnFulfilled callback of every
// registered listener, in registration order. Listeners without an
// OnFulfilled callback are skipped.
//
// Only the read of p.listeners happens under p.mu. The callbacks run after
// the lock has been released, so they may call back into the pool.
func (p *Pool) dispatchFulfillment(val Value) {
	p.mu.Lock()
	registered := p.listeners
	p.mu.Unlock()

	for i := range registered {
		if notify := registered[i].OnFulfilled; notify != nil {
			notify(val)
		}
	}
}
// dispatchRejection passes err to the OnRejected callback of every registered
// listener, in registration order. Listeners without an OnRejected callback
// are skipped.
//
// Only the read of p.listeners happens under p.mu. The callbacks run after
// the lock has been released, so they may call back into the pool.
func (p *Pool) dispatchRejection(err error) {
	p.mu.Lock()
	registered := p.listeners
	p.mu.Unlock()

	for i := range registered {
		if notify := registered[i].OnRejected; notify != nil {
			notify(err)
		}
	}
}
// AddEventListener registers listener with the pool. Listeners are identified
// by pointer, so registering the same pointer again is a no-op. Panics if
// listener is nil.
func (p *Pool) AddEventListener(listener *PoolEventListener) {
	if listener == nil {
		panic("promise.Pool: listener must be non-nil")
	}

	p.mu.Lock()
	defer p.mu.Unlock()

	for i := range p.listeners {
		if p.listeners[i] == listener {
			// Already registered; keep the list free of duplicates.
			return
		}
	}

	p.listeners = append(p.listeners, listener)
}
// RemoveEventListener removes listener from the pool if it was present.
// Listeners are identified by pointer. Removing a listener that is not
// registered (including nil) is a no-op.
func (p *Pool) RemoveEventListener(listener *PoolEventListener) {
	p.mu.Lock()
	defer p.mu.Unlock()

	for i, l := range p.listeners {
		if l != listener {
			continue
		}

		// Build a fresh slice instead of shifting elements in place. The
		// dispatch functions read p.listeners under the lock but iterate it
		// after releasing it, so the old backing array may still be in use.
		// Shifting it in place would be a data race and could make a running
		// dispatch skip or repeat a listener.
		remaining := make([]*PoolEventListener, 0, len(p.listeners)-1)
		remaining = append(remaining, p.listeners[:i]...)
		remaining = append(remaining, p.listeners[i+1:]...)
		p.listeners = remaining

		return
	}
}