Skip to content

Commit 7c769e2

Browse files
committed
feat: 消费者轮询队列, 支持单个连接同时轮询多个队列
1 parent 13c86ee commit 7c769e2

File tree

3 files changed

+13
-7
lines changed

3 files changed

+13
-7
lines changed

delayqueue/delay_queue.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ func Push(job Job) error {
4242
}
4343

4444
// 获取Job
45-
func Pop(topic string) (*Job, error) {
46-
jobId, err := blockPopFromReadyQueue(topic, config.Setting.QueueBlockTimeout)
45+
func Pop(topics []string) (*Job, error) {
46+
jobId, err := blockPopFromReadyQueue(topics, config.Setting.QueueBlockTimeout)
4747
if err != nil {
4848
return nil, err
4949
}

delayqueue/ready_queue.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,14 @@ func pushToReadyQueue(queueName string, jobId string) error {
1414
return err
1515
}
1616

17-
func blockPopFromReadyQueue(queueName string, timeout int) (string, error){
18-
queueName = fmt.Sprintf(config.Setting.QueueName, queueName)
19-
value, err := execRedisCommand("BLPOP", queueName, timeout)
17+
func blockPopFromReadyQueue(queues []string, timeout int) (string, error){
18+
var args []interface{}
19+
for _, queue := range queues {
20+
queue = fmt.Sprintf(config.Setting.QueueName, queue)
21+
args = append(args, queue)
22+
}
23+
args = append(args, timeout)
24+
value, err := execRedisCommand("BLPOP", args...)
2025
if err != nil {
2126
return "", err
2227
}

routers/routers.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,9 @@ func Pop(resp http.ResponseWriter, req *http.Request) {
7272
resp.Write(generateFailureBody("topic不能为空"))
7373
return
7474
}
75-
76-
job, err := delayqueue.Pop(topic)
75+
// 多个topic逗号分隔
76+
topics := strings.Split(topic, ",")
77+
job, err := delayqueue.Pop(topics)
7778
if err != nil {
7879
log.Printf("获取job失败#%s", err.Error())
7980
resp.Write(generateFailureBody("获取Job失败"))

0 commit comments

Comments
 (0)