diff --git a/CHANGELOG.md b/CHANGELOG.md
index f930693..b0274b1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,11 @@
+
+## [v2.1.0](https://github.com/robinjoseph08/go-pg-migrations/compare/v2.1.0...v3.0.0) (2020-10-15)
+
+### Features
+
+* **redis:** updated redis client library to v8
+
## [v2.1.0](https://github.com/robinjoseph08/go-pg-migrations/compare/v2.0.0...v2.1.0) (2020-10-15)
diff --git a/README.md b/README.md
index 2245dc1..cfacf74 100644
--- a/README.md
+++ b/README.md
@@ -1,7 +1,7 @@
# redisqueue
-
-[](https://pkg.go.dev/github.com/robinjoseph08/redisqueue/v2?tab=doc)
+
+[](https://pkg.go.dev/github.com/robinjoseph08/redisqueue/v3?tab=doc)
[](https://travis-ci.org/robinjoseph08/redisqueue)
[](https://coveralls.io/github/robinjoseph08/redisqueue?branch=master)
[](https://goreportcard.com/report/github.com/robinjoseph08/redisqueue)
@@ -38,13 +38,13 @@ versioning. So please make sure to initialize a Go module before installing
```sh
go mod init github.com/my/repo
-go get github.com/robinjoseph08/redisqueue/v2
+go get github.com/robinjoseph08/redisqueue/v3
```
Import:
```go
-import "github.com/robinjoseph08/redisqueue/v2"
+import "github.com/robinjoseph08/redisqueue/v3"
```
## Example
@@ -57,11 +57,11 @@ package main
import (
"fmt"
- "github.com/robinjoseph08/redisqueue/v2"
+ "github.com/robinjoseph08/redisqueue/v3"
)
func main() {
- p, err := redisqueue.NewProducerWithOptions(&redisqueue.ProducerOptions{
+ p, err := redisqueue.NewProducerWithOptions(context.TODO(), &redisqueue.ProducerOptions{
StreamMaxLength: 10000,
ApproximateMaxLength: true,
})
@@ -96,11 +96,11 @@ import (
"fmt"
"time"
- "github.com/robinjoseph08/redisqueue/v2"
+ "github.com/robinjoseph08/redisqueue/v3"
)
func main() {
- c, err := redisqueue.NewConsumerWithOptions(&redisqueue.ConsumerOptions{
+ c, err := redisqueue.NewConsumerWithOptions(context.TODO(), &redisqueue.ConsumerOptions{
VisibilityTimeout: 60 * time.Second,
BlockingTimeout: 5 * time.Second,
ReclaimInterval: 1 * time.Second,
diff --git a/consumer.go b/consumer.go
index 73af3db..fc6eef7 100644
--- a/consumer.go
+++ b/consumer.go
@@ -1,12 +1,13 @@
package redisqueue
import (
+ "context"
"net"
"os"
"sync"
"time"
- "github.com/go-redis/redis/v7"
+ "github.com/go-redis/redis/v8"
"github.com/pkg/errors"
)
@@ -80,6 +81,7 @@ type Consumer struct {
streams []string
queue chan *Message
wg *sync.WaitGroup
+ ctx context.Context
stopReclaim chan struct{}
stopPoll chan struct{}
@@ -143,6 +145,7 @@ func NewConsumerWithOptions(options *ConsumerOptions) (*Consumer, error) {
streams: make([]string, 0),
queue: make(chan *Message, options.BufferSize),
wg: &sync.WaitGroup{},
+ ctx: context.Background(),
stopReclaim: make(chan struct{}, 1),
stopPoll: make(chan struct{}, 1),
@@ -190,7 +193,7 @@ func (c *Consumer) Run() {
for stream, consumer := range c.consumers {
c.streams = append(c.streams, stream)
- err := c.redis.XGroupCreateMkStream(stream, c.options.GroupName, consumer.id).Err()
+ err := c.redis.XGroupCreateMkStream(c.ctx, stream, c.options.GroupName, consumer.id).Err()
// ignoring the BUSYGROUP error makes this a noop
if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
c.Errors <- errors.Wrap(err, "error creating consumer group")
@@ -256,7 +259,7 @@ func (c *Consumer) reclaim() {
end := "+"
for {
- res, err := c.redis.XPendingExt(&redis.XPendingExtArgs{
+ res, err := c.redis.XPendingExt(c.ctx, &redis.XPendingExtArgs{
Stream: stream,
Group: c.options.GroupName,
Start: start,
@@ -276,7 +279,7 @@ func (c *Consumer) reclaim() {
for _, r := range res {
if r.Idle >= c.options.VisibilityTimeout {
- claimres, err := c.redis.XClaim(&redis.XClaimArgs{
+ claimres, err := c.redis.XClaim(c.ctx, &redis.XClaimArgs{
Stream: stream,
Group: c.options.GroupName,
Consumer: c.options.Name,
@@ -297,7 +300,7 @@ func (c *Consumer) reclaim() {
// exists, the only way we can get it out of the
// pending state is to acknowledge it.
if err == redis.Nil {
- err = c.redis.XAck(stream, c.options.GroupName, r.ID).Err()
+ err = c.redis.XAck(c.ctx, stream, c.options.GroupName, r.ID).Err()
if err != nil {
c.Errors <- errors.Wrapf(err, "error acknowledging after failed claim for %q stream and %q message", stream, r.ID)
continue
@@ -335,7 +338,7 @@ func (c *Consumer) poll() {
}
return
default:
- res, err := c.redis.XReadGroup(&redis.XReadGroupArgs{
+ res, err := c.redis.XReadGroup(c.ctx, &redis.XReadGroupArgs{
Group: c.options.GroupName,
Consumer: c.options.Name,
Streams: c.streams,
@@ -389,7 +392,7 @@ func (c *Consumer) work() {
c.Errors <- errors.Wrapf(err, "error calling ConsumerFunc for %q stream and %q message", msg.Stream, msg.ID)
continue
}
- err = c.redis.XAck(msg.Stream, c.options.GroupName, msg.ID).Err()
+ err = c.redis.XAck(c.ctx, msg.Stream, c.options.GroupName, msg.ID).Err()
if err != nil {
c.Errors <- errors.Wrapf(err, "error acknowledging after success for %q stream and %q message", msg.Stream, msg.ID)
continue
diff --git a/consumer_test.go b/consumer_test.go
index 844ae6f..10cdbdb 100644
--- a/consumer_test.go
+++ b/consumer_test.go
@@ -1,11 +1,12 @@
package redisqueue
import (
+ "context"
"os"
"testing"
"time"
- "github.com/go-redis/redis/v7"
+ "github.com/go-redis/redis/v8"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -128,6 +129,8 @@ func TestRegisterWithLastID(t *testing.T) {
}
func TestRun(t *testing.T) {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
t.Run("sends an error if no ConsumerFuncs are registered", func(tt *testing.T) {
c, err := NewConsumer()
require.NoError(tt, err)
@@ -152,12 +155,12 @@ func TestRun(t *testing.T) {
require.NoError(tt, err)
// create a producer
- p, err := NewProducer()
+ p, err := NewProducer(ctx)
require.NoError(tt, err)
// create consumer group
- c.redis.XGroupDestroy(tt.Name(), c.options.GroupName)
- c.redis.XGroupCreateMkStream(tt.Name(), c.options.GroupName, "$")
+ c.redis.XGroupDestroy(ctx, tt.Name(), c.options.GroupName)
+ c.redis.XGroupCreateMkStream(ctx, tt.Name(), c.options.GroupName, "$")
// enqueue a message
err = p.Enqueue(&Message{
@@ -196,12 +199,12 @@ func TestRun(t *testing.T) {
require.NoError(tt, err)
// create a producer
- p, err := NewProducer()
+ p, err := NewProducer(ctx)
require.NoError(tt, err)
// create consumer group
- c.redis.XGroupDestroy(tt.Name(), c.options.GroupName)
- c.redis.XGroupCreateMkStream(tt.Name(), c.options.GroupName, "$")
+ c.redis.XGroupDestroy(ctx, tt.Name(), c.options.GroupName)
+ c.redis.XGroupCreateMkStream(ctx, tt.Name(), c.options.GroupName, "$")
// enqueue a message
msg := &Message{
@@ -220,7 +223,7 @@ func TestRun(t *testing.T) {
})
// read the message but don't acknowledge it
- res, err := c.redis.XReadGroup(&redis.XReadGroupArgs{
+ res, err := c.redis.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: c.options.GroupName,
Consumer: "failed_consumer",
Streams: []string{tt.Name(), ">"},
@@ -256,15 +259,15 @@ func TestRun(t *testing.T) {
require.NoError(tt, err)
// create a producer
- p, err := NewProducerWithOptions(&ProducerOptions{
+ p, err := NewProducerWithOptions(ctx, &ProducerOptions{
StreamMaxLength: 2,
ApproximateMaxLength: false,
})
require.NoError(tt, err)
// create consumer group
- c.redis.XGroupDestroy(tt.Name(), c.options.GroupName)
- c.redis.XGroupCreateMkStream(tt.Name(), c.options.GroupName, "$")
+ c.redis.XGroupDestroy(ctx, tt.Name(), c.options.GroupName)
+ c.redis.XGroupCreateMkStream(ctx, tt.Name(), c.options.GroupName, "$")
// enqueue a message
msg1 := &Message{
@@ -287,7 +290,7 @@ func TestRun(t *testing.T) {
})
// read the message but don't acknowledge it
- res, err := c.redis.XReadGroup(&redis.XReadGroupArgs{
+ res, err := c.redis.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: c.options.GroupName,
Consumer: "failed_consumer",
Streams: []string{tt.Name(), ">"},
@@ -312,7 +315,7 @@ func TestRun(t *testing.T) {
c.Run()
// check if the pending message is still there
- pendingRes, err := c.redis.XPendingExt(&redis.XPendingExtArgs{
+ pendingRes, err := c.redis.XPendingExt(ctx, &redis.XPendingExtArgs{
Stream: tt.Name(),
Group: c.options.GroupName,
Start: "-",
@@ -336,15 +339,15 @@ func TestRun(t *testing.T) {
require.NoError(tt, err)
// create a producer
- p, err := NewProducerWithOptions(&ProducerOptions{
+ p, err := NewProducerWithOptions(ctx, &ProducerOptions{
StreamMaxLength: 1,
ApproximateMaxLength: false,
})
require.NoError(tt, err)
// create consumer group
- c.redis.XGroupDestroy(tt.Name(), c.options.GroupName)
- c.redis.XGroupCreateMkStream(tt.Name(), c.options.GroupName, "$")
+ c.redis.XGroupDestroy(ctx, tt.Name(), c.options.GroupName)
+ c.redis.XGroupCreateMkStream(ctx, tt.Name(), c.options.GroupName, "$")
// enqueue a message
msg := &Message{
@@ -361,7 +364,7 @@ func TestRun(t *testing.T) {
})
// read the message but don't acknowledge it
- res, err := c.redis.XReadGroup(&redis.XReadGroupArgs{
+ res, err := c.redis.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: c.options.GroupName,
Consumer: "failed_consumer",
Streams: []string{tt.Name(), ">"},
@@ -373,7 +376,7 @@ func TestRun(t *testing.T) {
require.Equal(tt, msg.ID, res[0].Messages[0].ID)
// delete the message
- err = c.redis.XDel(tt.Name(), msg.ID).Err()
+ err = c.redis.XDel(ctx, tt.Name(), msg.ID).Err()
require.NoError(tt, err)
// watch for consumer errors
@@ -392,7 +395,7 @@ func TestRun(t *testing.T) {
c.Run()
// check that there are no pending messages
- pendingRes, err := c.redis.XPendingExt(&redis.XPendingExtArgs{
+ pendingRes, err := c.redis.XPendingExt(ctx, &redis.XPendingExtArgs{
Stream: tt.Name(),
Group: c.options.GroupName,
Start: "-",
@@ -414,12 +417,12 @@ func TestRun(t *testing.T) {
require.NoError(tt, err)
// create a producer
- p, err := NewProducer()
+ p, err := NewProducer(ctx)
require.NoError(tt, err)
// create consumer group
- c.redis.XGroupDestroy(tt.Name(), c.options.GroupName)
- c.redis.XGroupCreateMkStream(tt.Name(), c.options.GroupName, "$")
+ c.redis.XGroupDestroy(ctx, tt.Name(), c.options.GroupName)
+ c.redis.XGroupCreateMkStream(ctx, tt.Name(), c.options.GroupName, "$")
// enqueue a message
err = p.Enqueue(&Message{
@@ -458,12 +461,12 @@ func TestRun(t *testing.T) {
require.NoError(tt, err)
// create a producer
- p, err := NewProducer()
+ p, err := NewProducer(ctx)
require.NoError(tt, err)
// create consumer group
- c.redis.XGroupDestroy(tt.Name(), c.options.GroupName)
- c.redis.XGroupCreateMkStream(tt.Name(), c.options.GroupName, "$")
+ c.redis.XGroupDestroy(ctx, tt.Name(), c.options.GroupName)
+ c.redis.XGroupCreateMkStream(ctx, tt.Name(), c.options.GroupName, "$")
// enqueue a message
err = p.Enqueue(&Message{
diff --git a/go.mod b/go.mod
index c798d27..8b15e58 100644
--- a/go.mod
+++ b/go.mod
@@ -6,18 +6,15 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fatih/color v1.7.0 // indirect
github.com/git-chglog/git-chglog v0.0.0-20190611050339-63a4e637021f
- github.com/go-redis/redis/v7 v7.3.0
- github.com/golang/protobuf v1.3.3 // indirect
+ github.com/go-redis/redis/v8 v8.11.4
github.com/imdario/mergo v0.3.7 // indirect
github.com/mattn/go-colorable v0.1.2 // indirect
github.com/mattn/goveralls v0.0.2
github.com/pborman/uuid v1.2.0 // indirect
github.com/pkg/errors v0.9.1
- github.com/stretchr/testify v1.5.1
+ github.com/stretchr/testify v1.7.0
github.com/tsuyoshiwada/go-gitcmd v0.0.0-20180205145712-5f1f5f9475df // indirect
github.com/urfave/cli v1.20.0 // indirect
- golang.org/x/tools v0.0.0-20190706070813-72ffa07ba3db // indirect
gopkg.in/AlecAivazis/survey.v1 v1.8.5 // indirect
gopkg.in/kyokomi/emoji.v1 v1.5.1 // indirect
- gopkg.in/yaml.v2 v2.3.0 // indirect
)
diff --git a/go.sum b/go.sum
index 2208032..b61f743 100644
--- a/go.sum
+++ b/go.sum
@@ -1,22 +1,43 @@
github.com/Netflix/go-expect v0.0.0-20180615182759-c93bf25de8e8 h1:xzYJEypr/85nBpB11F9br+3HUrpgb+fcm5iADzXXYEw=
github.com/Netflix/go-expect v0.0.0-20180615182759-c93bf25de8e8/go.mod h1:oX5x61PbNXchhh0oikYAH+4Pcfw5LKv21+Jnpr6r6Pc=
+github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
+github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
+github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
+github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/git-chglog/git-chglog v0.0.0-20190611050339-63a4e637021f h1:8l4Aw3Jmx0pLKYMkY+1b6yBPgE+rzRtA5T3vqFyI2Z8=
github.com/git-chglog/git-chglog v0.0.0-20190611050339-63a4e637021f/go.mod h1:Dcsy1kii/xFyNad5JqY/d0GO5mu91sungp5xotbm3Yk=
github.com/go-redis/redis/v7 v7.3.0 h1:3oHqd0W7f/VLKBxeYTEpqdMUsmMectngjM9OtoRoIgg=
github.com/go-redis/redis/v7 v7.3.0/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg=
+github.com/go-redis/redis/v8 v8.11.4 h1:kHoYkfZP6+pe04aFTnhDH6GDROa5yJdHJVNxV3F46Tg=
+github.com/go-redis/redis/v8 v8.11.4/go.mod h1:2Z2wHZXdQpCDXEGzqMockDpNyYvi2l4Pxt6RJr792+w=
+github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.3 h1:gyjaxf+svBWX08ZjK86iN9geUJF0H6gp2IRKX6Nf6/I=
github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
+github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
+github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
+github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
+github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
+github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
+github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
+github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
+github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
+github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/uuid v1.0.0 h1:b4Gk+7WdP/d3HZH8EJsZpvV7EtDOgaZLtnaNGIu1adA=
github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hinshun/vt10x v0.0.0-20180616224451-1954e6464174 h1:WlZsjVhE8Af9IcZDGgJGQpNflI3+MJSBhsgT5PCtzBQ=
@@ -43,11 +64,18 @@ github.com/mattn/goveralls v0.0.2 h1:7eJB6EqsPhRVxvwEXGnqdO2sJI0PTsrWoTMXEk9/OQc
github.com/mattn/goveralls v0.0.2/go.mod h1:8d1ZMHsd7fW6IRPKQh46F2WRpyib5/X4FOpevwGNQEw=
github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b h1:j7+1HpAFS1zy5+Q4qx1fWh90gTKwiN4QCGoY9TWyyO4=
github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE=
+github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
+github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.10.1 h1:q/mM8GF/n0shIN8SaAZ0V+jnLPzen6WIVZdiwrRlMlo=
github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
+github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
+github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0=
github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME=
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
+github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
+github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
+github.com/onsi/gomega v1.16.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY=
github.com/pborman/uuid v1.2.0 h1:J7Q5mO4ysT1dv8hyrUGHb9+ooztCXu1D8MY8DZYsu3g=
github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
@@ -59,35 +87,72 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/testify v1.2.1/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
+github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
+github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/tsuyoshiwada/go-gitcmd v0.0.0-20180205145712-5f1f5f9475df h1:Y2l28Jr3vOEeYtxfVbMtVfOdAwuUqWaP9fvNKiBVeXY=
github.com/tsuyoshiwada/go-gitcmd v0.0.0-20180205145712-5f1f5f9475df/go.mod h1:pnyouUty/nBr/zm3GYwTIt+qFTLWbdjeLjZmJdzJOu8=
github.com/urfave/cli v1.20.0 h1:fDqGv3UG/4jbVl/QkFwEdddtEDjh/5Ov6X+0B/3bPaw=
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
+github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
golang.org/x/crypto v0.0.0-20190123085648-057139ce5d2b/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
+golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190923162816-aa69164e4478 h1:l5EDrHhldLYb3ZRHDUhXF7Om7MvYXnkV9/iQNo1lX6g=
golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
+golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
+golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180606202747-9527bec2660b/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223 h1:DH4skfRX4EBpamg7iV4ZlCpblAHI6s6TDM39bFZumv8=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191010194322-b09406accb47 h1:/XfQ9z7ib8eEJX2hdgFTZJ/ntt0swNk5oYBziWeTCvY=
golang.org/x/sys v0.0.0-20191010194322-b09406accb47/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
+golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190706070813-72ffa07ba3db h1:9hRk1xeL9LTT3yX/941DqeBz87XgHAQuj+TbimYJuiw=
golang.org/x/tools v0.0.0-20190706070813-72ffa07ba3db/go.mod h1:jcCCGcm9btYwXyDqrUWc6MKQKKGJCWEQ3AfLSRIbEuI=
+golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
+golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
+google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
+google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
+google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
+google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
+google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
+google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
+google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/AlecAivazis/survey.v1 v1.8.5 h1:QoEEmn/d5BbuPIL2qvXwzJdttFFhRQFkaq+tEKb7SMI=
gopkg.in/AlecAivazis/survey.v1 v1.8.5/go.mod h1:iBNOmqKz/NUbZx3bA+4hAGLRC7fSK7tgtVDT4tB22XA=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
@@ -106,3 +171,6 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/producer.go b/producer.go
index b52c028..4a6c51b 100644
--- a/producer.go
+++ b/producer.go
@@ -1,7 +1,9 @@
package redisqueue
import (
- "github.com/go-redis/redis/v7"
+ "context"
+
+ "github.com/go-redis/redis/v8"
)
// ProducerOptions provide options to configure the Producer.
@@ -36,6 +38,7 @@ type ProducerOptions struct {
type Producer struct {
options *ProducerOptions
redis redis.UniversalClient
+ ctx context.Context
}
var defaultProducerOptions = &ProducerOptions{
@@ -46,12 +49,12 @@ var defaultProducerOptions = &ProducerOptions{
// NewProducer uses a default set of options to create a Producer. It sets
// StreamMaxLength to 1000 and ApproximateMaxLength to true. In most production
// environments, you'll want to use NewProducerWithOptions.
-func NewProducer() (*Producer, error) {
- return NewProducerWithOptions(defaultProducerOptions)
+func NewProducer(ctx context.Context) (*Producer, error) {
+ return NewProducerWithOptions(ctx, defaultProducerOptions)
}
// NewProducerWithOptions creates a Producer using custom ProducerOptions.
-func NewProducerWithOptions(options *ProducerOptions) (*Producer, error) {
+func NewProducerWithOptions(ctx context.Context, options *ProducerOptions) (*Producer, error) {
var r redis.UniversalClient
if options.RedisClient != nil {
@@ -67,6 +70,7 @@ func NewProducerWithOptions(options *ProducerOptions) (*Producer, error) {
return &Producer{
options: options,
redis: r,
+ ctx: ctx,
}, nil
}
@@ -85,7 +89,7 @@ func (p *Producer) Enqueue(msg *Message) error {
} else {
args.MaxLen = p.options.StreamMaxLength
}
- id, err := p.redis.XAdd(args).Result()
+ id, err := p.redis.XAdd(p.ctx, args).Result()
if err != nil {
return err
}
diff --git a/producer_test.go b/producer_test.go
index 6c00b9a..3a90806 100644
--- a/producer_test.go
+++ b/producer_test.go
@@ -1,6 +1,7 @@
package redisqueue
import (
+ "context"
"testing"
"github.com/stretchr/testify/assert"
@@ -8,8 +9,10 @@ import (
)
func TestNewProducer(t *testing.T) {
+ ctx, cancel := context.WithCancel(context.TODO())
+ defer cancel()
t.Run("creates a new producer", func(tt *testing.T) {
- p, err := NewProducer()
+ p, err := NewProducer(ctx)
require.NoError(tt, err)
assert.NotNil(tt, p)
@@ -17,8 +20,10 @@ func TestNewProducer(t *testing.T) {
}
func TestNewProducerWithOptions(t *testing.T) {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
t.Run("creates a new producer", func(tt *testing.T) {
- p, err := NewProducerWithOptions(&ProducerOptions{})
+ p, err := NewProducerWithOptions(ctx, &ProducerOptions{})
require.NoError(tt, err)
assert.NotNil(tt, p)
@@ -27,7 +32,7 @@ func TestNewProducerWithOptions(t *testing.T) {
t.Run("allows custom *redis.Client", func(tt *testing.T) {
rc := newRedisClient(nil)
- p, err := NewProducerWithOptions(&ProducerOptions{
+ p, err := NewProducerWithOptions(ctx, &ProducerOptions{
RedisClient: rc,
})
require.NoError(tt, err)
@@ -37,7 +42,7 @@ func TestNewProducerWithOptions(t *testing.T) {
})
t.Run("bubbles up errors", func(tt *testing.T) {
- _, err := NewProducerWithOptions(&ProducerOptions{
+ _, err := NewProducerWithOptions(ctx, &ProducerOptions{
RedisOptions: &RedisOptions{Addr: "localhost:0"},
})
require.Error(tt, err)
@@ -47,8 +52,10 @@ func TestNewProducerWithOptions(t *testing.T) {
}
func TestEnqueue(t *testing.T) {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
t.Run("puts the message in the stream", func(tt *testing.T) {
- p, err := NewProducerWithOptions(&ProducerOptions{})
+ p, err := NewProducerWithOptions(ctx, &ProducerOptions{})
require.NoError(t, err)
msg := &Message{
@@ -58,13 +65,13 @@ func TestEnqueue(t *testing.T) {
err = p.Enqueue(msg)
require.NoError(tt, err)
- res, err := p.redis.XRange(msg.Stream, msg.ID, msg.ID).Result()
+ res, err := p.redis.XRange(ctx, msg.Stream, msg.ID, msg.ID).Result()
require.NoError(tt, err)
assert.Equal(tt, "value", res[0].Values["test"])
})
t.Run("bubbles up errors", func(tt *testing.T) {
- p, err := NewProducerWithOptions(&ProducerOptions{ApproximateMaxLength: true})
+ p, err := NewProducerWithOptions(ctx, &ProducerOptions{ApproximateMaxLength: true})
require.NoError(t, err)
msg := &Message{
diff --git a/redis.go b/redis.go
index 0a5a68c..d1c00c9 100644
--- a/redis.go
+++ b/redis.go
@@ -1,12 +1,13 @@
package redisqueue
import (
+ "context"
"fmt"
"regexp"
"strconv"
"strings"
- "github.com/go-redis/redis/v7"
+ "github.com/go-redis/redis/v8"
"github.com/pkg/errors"
)
@@ -30,7 +31,7 @@ func newRedisClient(options *RedisOptions) *redis.Client {
// to the actual instance and that the instance supports Redis streams (i.e.
// it's at least v5).
func redisPreflightChecks(client redis.UniversalClient) error {
- info, err := client.Info("server").Result()
+ info, err := client.Info(context.Background(), "server").Result()
if err != nil {
return err
}
diff --git a/redis_test.go b/redis_test.go
index 4614ab7..052a34e 100644
--- a/redis_test.go
+++ b/redis_test.go
@@ -1,6 +1,7 @@
package redisqueue
import (
+ "context"
"testing"
"github.com/stretchr/testify/assert"
@@ -8,18 +9,20 @@ import (
)
func TestNewRedisClient(t *testing.T) {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
t.Run("returns a new redis client", func(tt *testing.T) {
options := &RedisOptions{}
r := newRedisClient(options)
- err := r.Ping().Err()
+ err := r.Ping(ctx).Err()
assert.NoError(tt, err)
})
t.Run("defaults options if it's nil", func(tt *testing.T) {
r := newRedisClient(nil)
- err := r.Ping().Err()
+ err := r.Ping(ctx).Err()
assert.NoError(tt, err)
})
}