diff --git a/README.md b/README.md index 9c6000d..ed70d54 100644 --- a/README.md +++ b/README.md @@ -375,6 +375,16 @@ Note that `delivery.Push()` has the same affect as `delivery.Reject()` if the queue has no push queue set up. So in our example above, if the delivery fails in the consumer on `pushQ2`, then the `Push()` call will reject the delivery. +### Schedule Publish + +If you want to publish a task after a given time(with a maximum delay of no more than 2 hours),you can call `queue.SchedulePublish` + +```go +things, err := connection.OpenQueue("things") +things.SchedulePublish("abc", 180) //publish a task after 180s +``` + + ### Stop Consuming If you want to stop consuming from the queue, you can call `StopConsuming()`: diff --git a/connection.go b/connection.go index 0247627..4f1ac92 100644 --- a/connection.go +++ b/connection.go @@ -54,10 +54,10 @@ type redisConnection struct { unackedTemplate string readyTemplate string rejectedTemplate string - - redisClient RedisClient - errChan chan<- error - heartbeatStop chan chan struct{} // used to stop heartbeat() in stopHeartbeat(), nil once stopped + scheduleTemplate string + redisClient RedisClient + errChan chan<- error + heartbeatStop chan chan struct{} // used to stop heartbeat() in stopHeartbeat(), nil once stopped lock sync.Mutex stopped bool @@ -110,6 +110,7 @@ func openConnection(tag string, redisClient RedisClient, useRedisHashTags bool, unackedTemplate: getTemplate(connectionQueueUnackedBaseTemplate, useRedisHashTags), readyTemplate: getTemplate(queueReadyBaseTemplate, useRedisHashTags), rejectedTemplate: getTemplate(queueRejectedBaseTemplate, useRedisHashTags), + scheduleTemplate: getTemplate(queueScheduleBaseTemplate, useRedisHashTags), redisClient: redisClient, errChan: errChan, heartbeatStop: make(chan chan struct{}, 1), // mark heartbeat as active, can be stopped @@ -328,6 +329,7 @@ func (connection *redisConnection) openQueue(name string) Queue { connection.unackedTemplate, connection.readyTemplate, connection.rejectedTemplate, + connection.scheduleTemplate, connection.redisClient, connection.errChan, ) diff --git a/errors.go b/errors.go index 1382f94..0565ce5 100644 --- a/errors.go +++ b/errors.go @@ -51,3 +51,16 @@ func (e *DeliveryError) Error() string { func (e *DeliveryError) Unwrap() error { return e.RedisErr } + +type EnqueuingError struct { + RedisErr error + Count int // number of consecutive errors +} + +func (e *EnqueuingError) Error() string { + return fmt.Sprintf("rmq.EnqueuingError (%d): %s", e.Count, e.RedisErr.Error()) +} + +func (e *EnqueuingError) Unwrap() error { + return e.RedisErr +} diff --git a/go.mod b/go.mod index 0517eb7..acf9f85 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.0 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/google/uuid v1.6.0 github.com/kr/pretty v0.1.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/yuin/gopher-lua v1.1.0 // indirect diff --git a/go.sum b/go.sum index 904701d..d73c229 100644 --- a/go.sum +++ b/go.sum @@ -16,6 +16,8 @@ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= diff --git a/queue.go b/queue.go index 8999882..f601b5a 100644 --- a/queue.go +++ b/queue.go @@ -7,6 +7,9 @@ import ( "strings" "sync" "time" + + "github.com/google/uuid" + "github.com/redis/go-redis/v9" ) const ( @@ -17,6 +20,7 @@ const ( type Queue interface { Publish(payload ...string) error PublishBytes(payload ...[]byte) error + SchedulePublish(payload string, publishDelay uint64) error SetPushQueue(pushQueue Queue) Remove(payload string, count int64, removeFromRejected bool) error RemoveBytes(payload []byte, count int64, removeFromRejected bool) error @@ -51,6 +55,7 @@ type redisQueue struct { unackedKey string // key to list of currently consuming deliveries readyKey string // key to list of ready deliveries rejectedKey string // key to list of rejected deliveries + scheduleKey string // key to set of schedule deliveries pushKey string // key to list of pushed deliveries redisClient RedisClient errChan chan<- error @@ -67,7 +72,7 @@ type redisQueue struct { func newQueue( name, connectionName, queuesKey string, - consumersTemplate, unackedTemplate, readyTemplate, rejectedTemplate string, + consumersTemplate, unackedTemplate, readyTemplate, rejectedTemplate, scheduleTemplate string, redisClient RedisClient, errChan chan<- error, ) *redisQueue { @@ -80,6 +85,7 @@ func newQueue( readyKey := strings.Replace(readyTemplate, phQueue, name, 1) rejectedKey := strings.Replace(rejectedTemplate, phQueue, name, 1) + scheduleKey := strings.Replace(scheduleTemplate, phQueue, name, 1) consumingStopped := make(chan struct{}) ackCtx, ackCancel := context.WithCancel(context.Background()) @@ -92,6 +98,7 @@ func newQueue( unackedKey: unackedKey, readyKey: readyKey, rejectedKey: rejectedKey, + scheduleKey: scheduleKey, redisClient: redisClient, errChan: errChan, consumingStopped: consumingStopped, @@ -172,6 +179,7 @@ func (queue *redisQueue) StartConsuming(prefetchLimit int64, pollDuration time.D queue.deliveryChan = make(chan Delivery, prefetchLimit) // log.Printf("rmq queue started consuming %s %d %s", queue, prefetchLimit, pollDuration) go queue.consume() + go queue.enqueueSchedule() return nil } @@ -454,6 +462,20 @@ func (queue *redisQueue) PurgeRejected() (int64, error) { return queue.deleteRedisList(queue.rejectedKey) } +// SchedulePublish publishes a task after a given time (in seconds), with a maximum delay of no more than 2 hours +func (queue *redisQueue) SchedulePublish(payload string, delay uint64) error { + if delay > 7200 { + delay = 7200 + } + // add uuid for payload as prefix , so that make every payload is unique to avoid overwriting when publishing same string value + uniquePayload := fmt.Sprintf("%s:%s", uuid.NewString(), payload) + _, err := queue.redisClient.ZAdd(queue.scheduleKey, redis.Z{ + Score: float64(time.Now().Unix()) + float64(delay), + Member: uniquePayload, + }) + return err +} + // return number of deleted list items // https://www.redisgreen.net/blog/deleting-large-lists func (queue *redisQueue) deleteRedisList(key string) (int64, error) { @@ -603,3 +625,71 @@ func jitteredDuration(duration time.Duration) time.Duration { factor := 0.9 + rand.Float64()*0.2 // a jitter factor between 0.9 and 1.1 (+-10%) return time.Duration(float64(duration) * factor) } + +// enqueueSchedule enqueues tasks that already reach the given time to ready list +func (queue *redisQueue) enqueueSchedule() error { + errorCount := 0 //number of consecutive errors + + for { + select { + case <-queue.consumingStopped: + return ErrorConsumingStopped + default: + } + + now := time.Now().Unix() + result, err := queue.redisClient.ZRangeByScore(queue.scheduleKey, &redis.ZRangeBy{ + Min: "-inf", + Max: fmt.Sprintf("%d", now), + }) + + if err != nil { + errorCount++ + + select { // try to add error to channel, but don't block + case queue.errChan <- &EnqueuingError{RedisErr: err, Count: errorCount}: + default: + } + + continue + } else { //success + errorCount = 0 + } + + if len(result) > 0 { + err := queue.redisClient.TxPipelined(func(pipe redis.Pipeliner) error { + for _, val := range result { + + parts := strings.SplitN(val, ":", 2) + + if len(parts) < 2 { + return fmt.Errorf("invalid payload format: %s", val) + } + + originalPayload := parts[1] + + _, err := pipe.LPush(context.TODO(), queue.readyKey, originalPayload).Result() + if err != nil { + return err + } + _, err = pipe.ZRem(context.TODO(), queue.scheduleKey, val).Result() + if err != nil { + return err + } + } + return nil + }) + if err != nil { + errorCount++ + select { // try to add error to channel, but don't block + case queue.errChan <- &EnqueuingError{RedisErr: err, Count: errorCount}: + default: + } + continue + } else { //success + errorCount = 0 + } + } + time.Sleep(time.Second) + } +} diff --git a/redis_client.go b/redis_client.go index c1d031c..7edf12e 100644 --- a/redis_client.go +++ b/redis_client.go @@ -1,6 +1,10 @@ package rmq -import "time" +import ( + "time" + + "github.com/redis/go-redis/v9" +) type RedisClient interface { // simple keys @@ -20,6 +24,11 @@ type RedisClient interface { SAdd(key, value string) (total int64, err error) SMembers(key string) (members []string, err error) SRem(key, value string) (affected int64, err error) + ZAdd(key string, members ...redis.Z) (total int64, err error) + ZRangeByScore(key string, opt *redis.ZRangeBy) (result []string, err error) + + // tx + TxPipelined(fn func(pipe redis.Pipeliner) error) error // special FlushDb() error diff --git a/redis_keys.go b/redis_keys.go index db4e782..1567ab4 100644 --- a/redis_keys.go +++ b/redis_keys.go @@ -12,6 +12,7 @@ const ( queuesKey = "rmq::queues" // Set of all open queues queueReadyBaseTemplate = "rmq::queue::[{queue}]::ready" // List of deliveries in that {queue} (right is first and oldest, left is last and youngest) queueRejectedBaseTemplate = "rmq::queue::[{queue}]::rejected" // List of rejected deliveries from that {queue} + queueScheduleBaseTemplate = "rmq::queue::[{queue}]::schedule" // List of schedule deliveries phConnection = "{connection}" // connection name phQueue = "{queue}" // queue name diff --git a/redis_wrapper.go b/redis_wrapper.go index 9550d58..6e45a2f 100644 --- a/redis_wrapper.go +++ b/redis_wrapper.go @@ -70,6 +70,19 @@ func (wrapper RedisWrapper) SRem(key, value string) (affected int64, err error) return wrapper.rawClient.SRem(context.TODO(), key, value).Result() } +func (wrapper RedisWrapper) ZAdd(key string, members ...redis.Z) (affected int64, err error) { + return wrapper.rawClient.ZAdd(context.TODO(), key, members...).Result() +} + +func (wrapper RedisWrapper) ZRangeByScore(key string, opt *redis.ZRangeBy) (result []string, err error) { + return wrapper.rawClient.ZRangeByScore(context.TODO(), key, opt).Result() +} + +func (wrapper RedisWrapper) TxPipelined(fn func(pipe redis.Pipeliner) error) error { + _, err := wrapper.rawClient.TxPipelined(context.TODO(), fn) + return err +} + func (wrapper RedisWrapper) FlushDb() error { // NOTE: using Err() here because Result() string is always "OK" return wrapper.rawClient.FlushDB(context.TODO()).Err() diff --git a/test_queue.go b/test_queue.go index 6dbfea8..15bc49b 100644 --- a/test_queue.go +++ b/test_queue.go @@ -39,6 +39,7 @@ func (queue *TestQueue) RemoveBytes(payload []byte, count int64, removeFromRejec } func (*TestQueue) SetPushQueue(Queue) { panic(errorNotSupported) } +func (*TestQueue) SchedulePublish(string, uint64) error { panic(errorNotSupported) } func (*TestQueue) StartConsuming(int64, time.Duration) error { panic(errorNotSupported) } func (*TestQueue) StopConsuming() <-chan struct{} { panic(errorNotSupported) } func (*TestQueue) AddConsumer(string, Consumer) (string, error) { panic(errorNotSupported) } diff --git a/test_redis_client.go b/test_redis_client.go index a1841ee..5ce40ea 100644 --- a/test_redis_client.go +++ b/test_redis_client.go @@ -433,3 +433,15 @@ func (client *TestRedisClient) findList(key string) ([]string, error) { // return an empty list if not found return []string{}, nil } + +func (client *TestRedisClient) TxPipelined(fn func(pipe redis.Pipeliner) error) error { + panic(errorNotSupported) +} + +func (client *TestRedisClient) ZAdd(key string, members ...redis.Z) (total int64, err error) { + panic(errorNotSupported) +} + +func (client *TestRedisClient) ZRangeByScore(key string, opt *redis.ZRangeBy) (result []string, err error) { + panic(errorNotSupported) +}