-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathworker.go
273 lines (245 loc) · 6.58 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
package que
import (
"context"
"errors"
"fmt"
"log"
"sync"
"sync/atomic"
"time"
"golang.org/x/time/rate"
)
// NewWorker creates a worker instance from the given opts.
// The Queue, Mutex and Perform fields of opts are required.
// Every other option falls back to a sane default when it is
// missing or invalid.
func NewWorker(opts WorkerOptions) (*Worker, error) {
	// Required options: fail fast with a descriptive error.
	switch {
	case opts.Queue == "":
		return nil, errors.New("queue must not be empty string")
	case opts.Mutex == nil:
		return nil, errors.New("mutex must not be nil")
	case opts.Perform == nil:
		return nil, errors.New("perform must not be nil")
	}
	// Clamp numeric options to their minimum useful values.
	if opts.MaxLockPerSecond <= 0 {
		opts.MaxLockPerSecond = 1
	}
	if opts.MaxPerformPerSecond <= 0 {
		opts.MaxPerformPerSecond = 1
	}
	if opts.MaxConcurrentPerformCount <= 0 {
		opts.MaxConcurrentPerformCount = 1
	}
	if opts.MaxBufferJobsCount < 0 {
		opts.MaxBufferJobsCount = 0
	}
	ctx := opts.PerformContext
	if ctx == nil {
		ctx = context.Background()
	}
	// capacity covers both in-flight and buffered jobs; it sizes
	// the job channel and the processed-ids scratch slice.
	capacity := opts.MaxConcurrentPerformCount + opts.MaxBufferJobsCount
	w := &Worker{
		queue:                     opts.Queue,
		mutex:                     opts.Mutex,
		performContext:            ctx,
		perform:                   opts.Perform,
		maxConcurrentPerformCount: opts.MaxConcurrentPerformCount,
		lockLimiter:               newRateLimiter(opts.MaxLockPerSecond),
		jobC:                      make(chan Job, capacity),
		performLimiter:            newRateLimiter(opts.MaxPerformPerSecond),
		processed:                 make([]int64, 0, capacity),
	}
	return w, nil
}
// newRateLimiter builds a limiter that allows perSecond events per
// second with a burst size of one.
func newRateLimiter(perSecond float64) *rate.Limiter {
	limit := rate.Limit(perSecond)
	return rate.NewLimiter(limit, 1)
}
// WorkerOptions configures a worker created by NewWorker.
type WorkerOptions struct {
	// Queue names the queue this worker locks jobs from.
	// A queue instance must be assigned to at most one worker.
	// When the worker stops, the inner queue is closed.
	Queue string
	// Mutex locks and unlocks jobs for this worker; required.
	Mutex Mutex
	// MaxLockPerSecond is the maximum frequency of Lock() calls on Queue.
	// A lower number uses less database CPU.
	MaxLockPerSecond float64
	// MaxBufferJobsCount is the maximum number of jobs kept in the channel
	// while no goroutine is free to execute them.
	MaxBufferJobsCount int
	// Perform is the user-supplied job logic.
	// The context.Context argument it receives is PerformContext.
	//
	// One of the Done, Destroy and Expire methods of Job must be called
	// in Perform when it executes successfully; if not, Perform will be executed forever.
	//
	// When Perform panics or returns an error, RetryIn is executed automatically according to PerformRetryPolicy.
	// When PerformRetryPolicy says to stop retrying, Expire is executed.
	Perform func(context.Context, Job) error
	// PerformContext is the context passed to every Perform call.
	// Defaults to context.Background() when nil.
	PerformContext context.Context
	// MaxPerformPerSecond is the maximum frequency of Perform execution.
	MaxPerformPerSecond float64
	// MaxConcurrentPerformCount is the maximum number of goroutines executing Perform.
	MaxConcurrentPerformCount int
}
// Worker locks jobs from Queue and executes the Perform method according to the given WorkerOptions.
//
// Run a worker:
//
//	mutex := q.Mutex()
//	w, err := que.NewWorker(que.WorkerOptions{
//	    Queue: "order.mail.register",
//	    Mutex: mutex,
//	    Perform: func(ctx context.Context, job que.Job) error {
//	        plan := job.Plan()
//	        args := decode(plan.Args)
//	        fmt.Println(args)
//	        // one of Done, Destroy, Expire, RetryAfter and RetryInPlan must be called.
//	        return job.Done(ctx)
//	    },
//	})
//	if err := w.Run(); err != nil {
//	    log.Println(err)
//	}
//
// Reasonable quit worker execution:
//
//	if err := w.Stop(context.Background()); err != nil {
//	    log.Println(err)
//	}
type Worker struct {
	queue                     string                          // queue name jobs are locked from
	mutex                     Mutex                           // locks/unlocks jobs in the backing store
	perform                   func(context.Context, Job) error // user job logic (see WorkerOptions.Perform)
	performContext            context.Context                 // context handed to every perform call
	maxConcurrentPerformCount int                             // number of work() goroutines started by Run
	lockLimiter               *rate.Limiter                   // throttles mutex.Lock calls
	jobC                      chan Job                        // locked jobs awaiting execution; closed by lock()
	performLimiter            *rate.Limiter                   // throttles perform execution
	performRetryPolicy        RetryPolicy                     // NOTE(review): never assigned in NewWorker — presumably set elsewhere; confirm
	ongoing                   int32                           // atomic count of jobs locked but not yet processed
	mux                       sync.Mutex                      // guards processed
	processed                 []int64                         // finished job ids awaiting a batched Unlock
	stopped                   int32                           // atomic flag, 1 after Stop is called
}
// Run actually starts goroutines to execute the Perform method and locks jobs.
// Run blocks the current caller's goroutine and returns an error when it gets an unsolved error.
// After Run() has returned, any further call to it gets ErrWorkerStoped.
func (w *Worker) Run() error {
	if w.isStopped() {
		return ErrWorkerStoped
	}
	// One long-lived consumer goroutine per allowed concurrent Perform.
	for i := 0; i < w.maxConcurrentPerformCount; i++ {
		go w.work()
	}
	// lock() is the producer loop; it blocks until stop or error.
	return w.lock()
}
// lock is the producer loop: it repeatedly locks jobs from the queue
// and feeds them to the work() goroutines via jobC. It returns when
// the worker is stopped or when Mutex reports an error.
func (w *Worker) lock() (err error) {
	// On exit, make sure the worker is fully stopped so in-flight
	// jobs are drained and unlocked.
	defer func() {
		if !w.isStopped() {
			w.Stop(context.Background())
		}
	}()
	// Closing jobC terminates every work() goroutine. Registered
	// after the Stop defer, so it runs first (LIFO).
	defer close(w.jobC)
	var jobs []Job
	for {
		// Throttle how often we hit the backing store.
		w.lockLimiter.Wait(context.Background())
		if w.isStopped() {
			return ErrWorkerStoped
		}
		// Lock only as many jobs as there is free capacity for:
		// channel capacity minus buffered minus in-flight jobs.
		ongoingCount := int(atomic.LoadInt32(&w.ongoing))
		lockCount := cap(w.jobC) - len(w.jobC) - ongoingCount
		if lockCount > 0 {
			jobs, err = w.mutex.Lock(context.Background(), w.queue, lockCount)
			if err != nil {
				return err
			}
			for _, job := range jobs {
				// Increment before sending so the capacity math above
				// stays conservative while the send is pending.
				atomic.AddInt32(&w.ongoing, 1)
				w.jobC <- job
			}
		}
		// Batch-unlock jobs whose Perform has already finished.
		err = w.unlockProcessed()
		if err != nil {
			return err
		}
	}
}
// work is a consumer goroutine: it drains jobs from jobC until the
// channel is closed. After the worker is stopped, remaining jobs are
// marked processed without being performed.
func (w *Worker) work() {
	for job := range w.jobC {
		if w.isStopped() {
			w.asProcessed(job.ID())
			continue
		}
		// Throttle Perform execution frequency.
		w.performLimiter.Wait(context.Background())
		w.tryPerform(job)
	}
}
// tryPerform runs Perform for a single job, converting a panic into
// an error and delegating any failure to the retry handling.
func (w *Worker) tryPerform(job Job) {
	// Always mark the job processed so it is unlocked later,
	// regardless of success, error, or panic.
	defer w.asProcessed(job.ID())
	var err error
	defer func() {
		// recover only works directly inside a deferred function;
		// a panic from Perform lands here and becomes err.
		e := recover()
		if e != nil {
			err = fmt.Errorf("%v", e)
		}
		if err != nil {
			// Schedule a retry; if that itself fails, logging is
			// the only thing left to do.
			err2 := w.handleErr(job, err)
			if err2 != nil {
				log.Printf(
					"que: perform(job(%v)) with err %v but handle err with a new err %v",
					job.ID(), err, err2,
				)
			}
		}
	}()
	err = w.perform(w.performContext, job)
}
// handleErr schedules a retry for job according to its plan,
// recording cerr as the failure cause.
func (w *Worker) handleErr(job Job, cerr error) error {
	ctx := context.Background()
	return job.RetryInPlan(ctx, cerr)
}
// asProcessed records a finished job id for a later batched Unlock
// call, then decrements the in-flight counter.
func (w *Worker) asProcessed(id int64) {
	func() {
		w.mux.Lock()
		defer w.mux.Unlock()
		w.processed = append(w.processed, id)
	}()
	// Decrement after the id is recorded, outside the critical section.
	atomic.AddInt32(&w.ongoing, -1)
}
// unlockProcessed flushes the accumulated processed job ids to
// Mutex.Unlock in a single batch. It is a no-op when nothing is pending.
func (w *Worker) unlockProcessed() error {
	// Swap the batch out under the lock, keeping the capacity for reuse.
	w.mux.Lock()
	batch := w.processed
	if len(batch) > 0 {
		w.processed = make([]int64, 0, cap(batch))
	}
	w.mux.Unlock()
	if len(batch) == 0 {
		return nil
	}
	return w.mutex.Unlock(context.Background(), batch)
}
// isStopped reports whether Stop has been called on this worker.
func (w *Worker) isStopped() bool {
	flag := atomic.LoadInt32(&w.stopped)
	return flag != 0
}
// Stop stops worker execution.
// It blocks until ctx.Done() or all processing jobs are done.
func (w *Worker) Stop(ctx context.Context) error {
	// Raise the flag first: lock() and work() observe it and stop
	// feeding/performing new jobs.
	atomic.StoreInt32(&w.stopped, 1)
	// Poll every 500ms until no job is in flight.
	ticker := time.NewTicker(500 * time.Millisecond)
	defer ticker.Stop()
wait:
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			if atomic.LoadInt32(&w.ongoing) == 0 {
				break wait
			}
		}
	}
	// Release the jobs that finished while we were stopping.
	return w.unlockProcessed()
}