-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathqueue.go
304 lines (277 loc) · 7.72 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
package queue
import (
"context"
"fmt"
"log"
"slices"
"sort"
"strconv"
"sync"
"time"
"github.com/go-redsync/redsync/v4"
"github.com/go-redsync/redsync/v4/redis/goredis/v9"
"github.com/redis/go-redis/v9"
"github.com/robfig/cron/v3"
)
// JobFnc is the callback invoked by the queue workers to process a single job.
type JobFnc func(job *Job)
// Queue is a named job queue backed by Redis. A distributed mutex (redsync)
// serializes Run across processes sharing the same queue name, and an optional
// cron scheduler can trigger Run periodically.
type Queue struct {
	Name          string         // queue identifier; also used as the Redis key and mutex name
	client        *redis.Client  // Redis connection, used by the rate limiter
	mutex         *redsync.Mutex // distributed lock held while Run processes jobs
	jobFnc        JobFnc         // user callback set via Process
	jobs          []Job          // in-memory job list, kept sorted by descending Priority
	RetryFailures int            // number of times to retry a failed job
	workers       int            // max jobs processed concurrently per batch
	limiter       *RateLimiter   // optional rate limit; nil disables limiting
	ctx           context.Context
	scheduler     *cron.Cron // non-nil only when a cron Pattern was supplied
	cronPattern   string
	running       bool // false while paused; Run becomes a no-op
}
// RateLimiter caps how many jobs may be added per time window.
// Max is the job count allowed within Duration (enforced via a Redis counter).
type RateLimiter struct {
	Max      int
	Duration time.Duration
}
// Options configures a Queue created by New.
type Options struct {
	Connect       *redis.Options // Redis connection options
	Workers       int            // number of concurrent workers per batch
	RetryFailures int            // retry budget for failed jobs
	Limiter       *RateLimiter   // optional rate limiter; nil disables limiting
	Pattern       string         // optional cron pattern; empty disables scheduling
}
// New creates a queue identified by name, configured by opt:
//
//   - Connect: the Redis connection options
//   - Workers: the number of workers to run concurrently
//   - RetryFailures: the number of times to retry a failed job
//   - Limiter: the rate limiter options
//   - Pattern: the cron pattern to use for scheduling jobs
//
// The name doubles as the Redis key for rate limiting and as the name of the
// distributed mutex. The returned queue starts in the running state and is
// ready to use.
func New(name string, opt *Options) *Queue {
	redisClient := redis.NewClient(opt.Connect)
	locker := redsync.New(goredis.NewPool(redisClient))

	q := &Queue{
		Name:          name,
		client:        redisClient,
		mutex:         locker.NewMutex(name),
		workers:       opt.Workers,
		RetryFailures: opt.RetryFailures,
		limiter:       opt.Limiter,
		ctx:           context.Background(),
		running:       true,
	}

	// A non-empty cron pattern enables the periodic scheduler (started later
	// by Process).
	if opt.Pattern != "" {
		q.cronPattern = opt.Pattern
		q.scheduler = cron.New()
	}
	return q
}
// AddJob adds a new job to the queue. If the queue is currently rate limited,
// the job is delayed; otherwise it is added to the waiting list. The job list
// is then re-sorted by descending priority and the queue is run.
func (q *Queue) AddJob(opt AddJobOptions) {
	var j *Job
	if q.IsLimit() {
		fmt.Printf("Add job %s to delay\n", opt.Id)
		j = q.delayJob(opt)
	} else {
		fmt.Printf("Add job %s to waiting\n", opt.Id)
		j = q.newJob(opt)
	}
	q.jobs = append(q.jobs, *j)
	// Highest priority first; stable so equal priorities keep insertion order.
	sort.SliceStable(q.jobs, func(a, b int) bool {
		return q.jobs[a].Priority > q.jobs[b].Priority
	})
	q.Run()
}
// AddJobOptions describes a job to enqueue via AddJob or BulkAddJob.
type AddJobOptions struct {
	Id       string      // unique job identifier
	Data     interface{} // arbitrary payload handed to the job
	Priority int         // higher values run first
}
// BulkAddJob adds multiple jobs to the queue at once. Jobs that arrive while
// the queue is rate limited are delayed; the rest join the waiting list. The
// whole job list is then re-sorted by descending priority and the queue runs.
func (q *Queue) BulkAddJob(options []AddJobOptions) {
	// Enqueue higher-priority requests first.
	sort.SliceStable(options, func(a, b int) bool {
		return options[a].Priority > options[b].Priority
	})

	for _, opt := range options {
		var j *Job
		if q.IsLimit() {
			fmt.Printf("Add job %s to delay\n", opt.Id)
			j = q.delayJob(opt)
		} else {
			fmt.Printf("Add job %s to waiting\n", opt.Id)
			j = q.newJob(opt)
		}
		q.jobs = append(q.jobs, *j)
	}

	sort.SliceStable(q.jobs, func(a, b int) bool {
		return q.jobs[a].Priority > q.jobs[b].Priority
	})
	q.Run()
}
// Process registers the callback used to process jobs. When the queue was
// created with a cron pattern, the scheduler is started so that Run fires on
// that schedule; otherwise only the callback is stored.
func (q *Queue) Process(jobFnc JobFnc) {
	q.jobFnc = jobFnc
	if q.scheduler == nil {
		return
	}
	// Schedule periodic runs; a bad cron pattern is fatal.
	if _, err := q.scheduler.AddFunc(q.cronPattern, func() { q.Run() }); err != nil {
		log.Fatalf("failed to add job: %v\n", err)
	}
	q.scheduler.Start()
}
// Run executes all ready jobs in the queue. It acquires the distributed
// mutex, processes ready jobs in batches of up to `workers` goroutines,
// retries delayed jobs, and finally releases the mutex. Run is a no-op while
// the queue is paused.
func (q *Queue) Run() {
	if !q.running {
		fmt.Println("Queue is not running")
		return
	}
	// Lock the distributed mutex so only one process runs this queue at a time.
	if err := q.mutex.Lock(); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("Running on %s\n", time.Now().String())
	execJobs := []*Job{}
	for i := 0; i < len(q.jobs); i++ {
		if q.jobs[i].IsReady() {
			execJobs = append(execJobs, &q.jobs[i])
		}
	}
	// BUG FIX: with workers <= 0 the original computed a batch size of 0 and
	// the loop below never consumed execJobs, spinning forever. Clamp to 1.
	workers := q.workers
	if workers < 1 {
		workers = 1
	}
	for len(execJobs) > 0 {
		batchSize := Min(len(execJobs), workers)
		batch := execJobs[:batchSize]
		var wg sync.WaitGroup
		for i := 0; i < len(batch); i++ {
			job := batch[i]
			wg.Add(1)
			go func() {
				defer wg.Done()
				q.jobFnc(job)
			}()
		}
		wg.Wait()
		execJobs = execJobs[batchSize:]
	}
	q.Retry()
	// Unlock the mutex; report a failed release.
	if ok, err := q.mutex.Unlock(); !ok || err != nil {
		fmt.Println(err)
	}
}
// Retry processes all jobs currently in DelayedStatus. Delayed jobs are run
// concurrently in batches of up to `workers` goroutines; jobs that report
// IsFinished after a batch are dropped from the retry set, and the remainder
// are attempted again.
//
// NOTE(review): if no job in a batch ever finishes, the retry set never
// shrinks and this loops indefinitely — presumably jobs eventually reach a
// finished state via the RetryFailures budget; confirm against Job.
func (q *Queue) Retry() {
	execJobs := []*Job{}
	// Collect every delayed job for retry.
	for i := 0; i < len(q.jobs); i++ {
		if q.jobs[i].Status == DelayedStatus {
			execJobs = append(execJobs, &q.jobs[i])
		}
	}
	// BUG FIX: clamp the worker count, mirroring Run, so a zero/negative
	// configuration cannot produce an empty batch and an infinite loop.
	workers := q.workers
	if workers < 1 {
		workers = 1
	}
	for len(execJobs) > 0 {
		batchSize := Min(len(execJobs), workers)
		batch := execJobs[:batchSize]
		var wg sync.WaitGroup
		// BUG FIX: the original appended to a shared slice from multiple
		// goroutines without synchronization — a data race. Guard the
		// finished-id set with a mutex.
		var mu sync.Mutex
		finished := make(map[string]bool)
		for i := 0; i < len(batch); i++ {
			job := batch[i]
			wg.Add(1)
			go func() {
				defer wg.Done()
				q.jobFnc(job)
				if job.IsFinished() {
					mu.Lock()
					finished[job.Id] = true
					mu.Unlock()
				}
			}()
		}
		wg.Wait()
		// Drop finished jobs from the retry set. (The original's removal loop
		// aborted early via `break` when an id was missing; DeleteFunc removes
		// all matches in one pass.)
		if len(finished) > 0 {
			execJobs = slices.DeleteFunc(execJobs, func(j *Job) bool {
				return finished[j.Id]
			})
		}
	}
}
// CountJobs reports how many jobs in the queue currently have the given
// status. Useful for monitoring the queue and for testing its behavior.
func (q *Queue) CountJobs(status JobStatus) int {
	total := 0
	for i := range q.jobs {
		if q.jobs[i].Status == status {
			total++
		}
	}
	return total
}
// Remove deletes the job with the given key from the queue, if present.
// Linear search: O(n) in the number of queued jobs.
func (q *Queue) Remove(key string) {
	if idx := slices.IndexFunc(q.jobs, func(j Job) bool { return j.Id == key }); idx != -1 {
		q.jobs = slices.Delete(q.jobs, idx, idx+1)
	}
}
// Min returns the smaller of a and b.
//
// The file already targets Go 1.21+ (it imports "slices"), so this delegates
// to the built-in min instead of a hand-rolled comparison.
func Min(a int, b int) int {
	return min(a, b)
}
// IsLimit returns true if the number of jobs in the queue has reached the
// maximum value set in the RateLimiter. It checks the current value of the
// counter in Redis and returns true if it is greater than or equal to the
// maximum value. If the counter does not exist or is less than the maximum,
// it increments the counter and returns false. If the increment fails, it
// panics.
//
// NOTE(review): the GET error is discarded, so a Redis failure is
// indistinguishable from a missing counter and falls through to INCR (which
// then panics on error) — confirm that is acceptable.
// NOTE(review): the GET / INCR pair is not atomic; concurrent callers can
// both read below Max and both increment, briefly exceeding the limit.
func (q *Queue) IsLimit() bool {
	// No limiter configured: never rate limited.
	if q.limiter == nil {
		return false
	}
	client := q.client
	// Counter lives under the queue name; empty string means no counter yet.
	attemps, _ := client.Get(q.ctx, q.Name).Result()
	attempNum, _ := strconv.Atoi(attemps)
	if attemps != "" && attempNum >= q.limiter.Max {
		return true
	} else {
		value, err := client.Incr(q.ctx, q.Name).Result()
		if err != nil {
			panic(err.Error())
		}
		// First increment in this window: start the expiry clock.
		if value == 1 {
			client.Expire(q.ctx, q.Name, q.limiter.Duration)
		}
		return false
	}
}
// Pause stops the queue from running. While paused, Run is a no-op, so no
// jobs are processed until Resume is called.
//
// NOTE(review): this only flips the running flag; jobs already added continue
// to accumulate in the job list.
func (q *Queue) Pause() {
	q.running = false
}
// Resume lifts a pause: it marks the queue as running again and immediately
// triggers a Run to process any jobs that accumulated while paused.
func (q *Queue) Resume() {
	q.running = true
	q.Run()
}