Skip to content

Commit f3ca206

Browse files
committed
add separate producer
1 parent 461d5a1 commit f3ca206

File tree

5 files changed

+145
-8
lines changed

5 files changed

+145
-8
lines changed

README.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,22 @@ func main() {
7171
> if you are using github.com/go-redis/redis/v8 please use `go get github.com/hdt3213/delayqueue@v8`
7272
> If you are using redis client other than go-redis, you could wrap your redis client into [RedisCli](https://pkg.go.dev/github.com/hdt3213/delayqueue#RedisCli) interface
7373
74+
## Producer consumer distributed deployment
75+
76+
By default, delayqueue instances can be both producers and consumers. If your program only need producers and consumers are placed elsewhere, `delayqueue.NewProducer` is a good option for you.
77+
78+
```go
79+
func consumer() {
80+
queue := NewQueue("test", redisCli, cb)
81+
queue.StartConsume()
82+
}
83+
84+
func producer() {
85+
publisher := NewPublisher("test", redisCli)
86+
publisher.SendDelayMsg(strconv.Itoa(i), 0)
87+
}
88+
```
89+
7490
## Options
7591

7692
```go

README_CN.md

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ DelayQueue 的主要优势:
1414
- 原生适配分布式环境, 可在多台机器上并发的处理消息. 可以随时增加、减少或迁移 Worker
1515
- 支持各类 Redis 集群
1616

17-
# 安装
17+
## 安装
1818

1919
在启用了 go mod 的项目中运行下列命令即可完成安装:
2020

@@ -24,7 +24,7 @@ go get github.com/hdt3213/delayqueue
2424

2525
> 如果您仍在使用 `github.com/go-redis/redis/v8` 请安装 `go get github.com/hdt3213/delayqueue@v8`
2626
27-
# 开始使用
27+
## 开始使用
2828

2929
```go
3030
package main
@@ -68,7 +68,23 @@ func main() {
6868
> 如果您仍在使用 redis/v8 请使用 v8 分支: `go get github.com/hdt3213/delayqueue@v8`
6969
> 如果您在使用其他的 redis 客户端, 可以将其包装到 [RedisCli](https://pkg.go.dev/github.com/hdt3213/delayqueue#RedisCli) 接口中
7070
71-
# 选项
71+
## 分开部署生产者和消费者
72+
73+
默认情况下 delayqueue 实例既可以做生产者也可以做消费者。如果某些程序只需要发送消息,消费者部署在其它程序中,那么可以使用 `delayqueue.NewProducer`.
74+
75+
```go
76+
func consumer() {
77+
queue := NewQueue("test", redisCli, cb)
78+
queue.StartConsume()
79+
}
80+
81+
func producer() {
82+
publisher := NewPublisher("test", redisCli)
83+
publisher.SendDelayMsg(strconv.Itoa(i), 0)
84+
}
85+
```
86+
87+
## 选项
7288

7389
```go
7490
WithLogger(logger *log.Logger)
@@ -121,7 +137,7 @@ WithDefaultRetryCount(count uint)
121137

122138
在调用 DelayQueue.SendScheduleMsg or DelayQueue.SendDelayMsg 发送消息时,可以调用 WithRetryCount 为这条消息单独指定重试次数。
123139

124-
# 集群
140+
## 集群
125141

126142
如果需要在 Redis Cluster 上工作, 请使用 `NewQueueOnCluster`:
127143

@@ -151,7 +167,7 @@ callback := func(s string) bool {
151167
queue := delayqueue.NewQueue("example", redisCli, callback, UseHashTagKey())
152168
```
153169

154-
# 更多细节
170+
## 更多细节
155171

156172
完整流程如图所示:
157173

delayqueue.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ type RedisCli interface {
5353
}
5454

5555
type hashTagKeyOpt int
56+
type noCallbackOpt int
5657

5758
// UseHashTagKey add hashtags to redis keys to ensure all keys of this queue are allocated in the same hash slot.
5859
// If you are using Codis/AliyunRedisCluster/TencentCloudRedisCluster, add this option to NewQueue
@@ -71,16 +72,19 @@ func NewQueue0(name string, cli RedisCli, callback func(string) bool, opts ...in
7172
if cli == nil {
7273
panic("cli is required")
7374
}
74-
if callback == nil {
75-
panic("callback is required")
76-
}
7775
useHashTag := false
76+
noCallback := false
7877
for _, opt := range opts {
7978
switch opt.(type) {
8079
case hashTagKeyOpt:
8180
useHashTag = true
81+
case noCallbackOpt:
82+
noCallback = true
8283
}
8384
}
85+
if !noCallback && callback == nil {
86+
panic("callback is required")
87+
}
8488
var keyPrefix string
8589
if useHashTag {
8690
keyPrefix = "{dp:" + name + "}"

publisher.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package delayqueue
2+
3+
import (
4+
"log"
5+
"time"
6+
7+
"github.com/redis/go-redis/v9"
8+
)
9+
10+
// Publisher only publishes messages to delayqueue, it is a encapsulation of delayqueue
11+
type Publisher struct {
12+
inner *DelayQueue
13+
}
14+
15+
// NewPublisher0 creates a new Publisher by a RedisCli instance
16+
func NewPublisher0(name string, cli RedisCli, opts ...interface{}) *Publisher {
17+
opts = append(opts, noCallbackOpt(1))
18+
return &Publisher{
19+
inner: NewQueue0(name, cli, nil, opts...),
20+
}
21+
}
22+
23+
// NewPublisher creates a new Publisher by a *redis.Client
24+
func NewPublisher(name string, cli *redis.Client, opts ...interface{}) *Publisher {
25+
rc := &redisV9Wrapper{
26+
inner: cli,
27+
}
28+
return NewPublisher0(name, rc, opts...)
29+
}
30+
31+
// WithLogger customizes logger for queue
32+
func (p *Publisher) WithLogger(logger *log.Logger) *Publisher {
33+
p.inner.logger = logger
34+
return p
35+
}
36+
37+
// SendScheduleMsg submits a message delivered at given time
38+
func (p *Publisher) SendScheduleMsg(payload string, t time.Time, opts ...interface{}) error {
39+
return p.inner.SendScheduleMsg(payload, t, opts...)
40+
}
41+
42+
// SendDelayMsg submits a message delivered after given duration
43+
func (p *Publisher) SendDelayMsg(payload string, duration time.Duration, opts ...interface{}) error {
44+
return p.inner.SendDelayMsg(payload, duration, opts...)
45+
}

publisher_test.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package delayqueue
2+
3+
import (
4+
"context"
5+
"log"
6+
"os"
7+
"strconv"
8+
"testing"
9+
"time"
10+
11+
"github.com/redis/go-redis/v9"
12+
)
13+
14+
func TestPublisher(t *testing.T) {
15+
redisCli := redis.NewClient(&redis.Options{
16+
Addr: "127.0.0.1:6379",
17+
})
18+
redisCli.FlushDB(context.Background())
19+
size := 1000
20+
retryCount := 3
21+
deliveryCount := make(map[string]int)
22+
cb := func(s string) bool {
23+
deliveryCount[s]++
24+
i, _ := strconv.ParseInt(s, 10, 64)
25+
return i%2 == 0
26+
}
27+
logger := log.New(os.Stderr, "[DelayQueue]", log.LstdFlags)
28+
queue := NewQueue("test", redisCli, cb).WithLogger(logger)
29+
publisher := NewPublisher("test", redisCli).WithLogger(logger)
30+
31+
for i := 0; i < size; i++ {
32+
err := publisher.SendDelayMsg(strconv.Itoa(i), 0, WithRetryCount(retryCount), WithMsgTTL(time.Hour))
33+
if err != nil {
34+
t.Error(err)
35+
}
36+
}
37+
for i := 0; i < 10*size; i++ {
38+
err := queue.consume()
39+
if err != nil {
40+
t.Errorf("consume error: %v", err)
41+
return
42+
}
43+
}
44+
for k, v := range deliveryCount {
45+
i, _ := strconv.ParseInt(k, 10, 64)
46+
if i%2 == 0 {
47+
if v != 1 {
48+
t.Errorf("expect 1 delivery, actual %d", v)
49+
}
50+
} else {
51+
if v != retryCount+1 {
52+
t.Errorf("expect %d delivery, actual %d", retryCount+1, v)
53+
}
54+
}
55+
}
56+
}

0 commit comments

Comments
 (0)