Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include queue id in job key #2

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,13 +232,17 @@ type BulkDequeuer interface {
// FindOptions specifies how a job is searched from a queue.
type FindOptions struct {
Namespace string
QueueID string
}

// Validate validates FindOptions.
func (opt *FindOptions) Validate() error {
if opt.Namespace == "" {
return ErrEmptyNamespace
}
if opt.QueueID == "" {
return ErrEmptyQueueID
}
return nil
}

Expand Down
29 changes: 19 additions & 10 deletions redis_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ type redisQueue struct {
findScript *redis.Script
}

// scriptKey returns a slice of strings containing at least one of the keys to
// be used by a script. This allows Redis route our script execution to the
// correct node in the event we're using a namespace.
func scriptKey(ns, queueID string) []string {
return []string{strings.Join([]string{ns, "queue", queueID}, ":")}
}

// RedisQueue implements Queue with other additional capabilities
type RedisQueue interface {
Queue
Expand All @@ -39,7 +46,7 @@ func NewRedisQueue(client redis.UniversalClient) RedisQueue {
local at = tonumber(ARGV[i])
local job_id = ARGV[i+1]
local jobm = ARGV[i+2]
local job_key = table.concat({ns, "job", job_id}, ":")
local job_key = table.concat({ns, "queue", queue_id, "job", job_id}, ":")

-- update job fields
redis.call("hset", job_key, "msgpack", jobm)
Expand Down Expand Up @@ -105,7 +112,7 @@ func NewRedisQueue(client redis.UniversalClient) RedisQueue {

for i = 3,table.getn(ARGV) do
local job_id = ARGV[i]
local job_key = table.concat({ns, "job", job_id}, ":")
local job_key = table.concat({ns, "queue", queue_id, "job", job_id}, ":")

-- delete job fields
table.insert(del_args, job_key)
Expand All @@ -119,10 +126,11 @@ func NewRedisQueue(client redis.UniversalClient) RedisQueue {

findScript := redis.NewScript(`
local ns = ARGV[1]
local queue_id = ARGV[2]
local ret = {}
for i = 2,table.getn(ARGV) do
for i = 3,table.getn(ARGV) do
local job_id = ARGV[i]
local job_key = table.concat({ns, "job", job_id}, ":")
local job_key = table.concat({ns, "queue", queue_id, "job", job_id}, ":")
local jobm = redis.call("hget", job_key, "msgpack")

table.insert(ret, jobm)
Expand Down Expand Up @@ -163,7 +171,7 @@ func (q *redisQueue) BulkEnqueue(jobs []*Job, opt *EnqueueOptions) error {
args[2+3*i+1] = job.ID
args[2+3*i+2] = jobm
}
return q.enqueueScript.Run(context.Background(), q.client, nil, args...).Err()
return q.enqueueScript.Run(context.Background(), q.client, scriptKey(opt.Namespace, opt.QueueID), args...).Err()
}

func (q *redisQueue) Dequeue(opt *DequeueOptions) (*Job, error) {
Expand All @@ -179,7 +187,7 @@ func (q *redisQueue) BulkDequeue(count int64, opt *DequeueOptions) ([]*Job, erro
if err != nil {
return nil, err
}
res, err := q.dequeueScript.Run(context.Background(), q.client, nil,
res, err := q.dequeueScript.Run(context.Background(), q.client, scriptKey(opt.Namespace, opt.QueueID),
opt.Namespace,
opt.QueueID,
opt.At.Unix(),
Expand Down Expand Up @@ -223,7 +231,7 @@ func (q *redisQueue) BulkAck(jobs []*Job, opt *AckOptions) error {
for i, job := range jobs {
args[2+i] = job.ID
}
return q.ackScript.Run(context.Background(), q.client, nil, args...).Err()
return q.ackScript.Run(context.Background(), q.client, scriptKey(opt.Namespace, opt.QueueID), args...).Err()
}

func (q *redisQueue) BulkFind(jobIDs []string, opt *FindOptions) ([]*Job, error) {
Expand All @@ -234,12 +242,13 @@ func (q *redisQueue) BulkFind(jobIDs []string, opt *FindOptions) ([]*Job, error)
if len(jobIDs) == 0 {
return nil, nil
}
args := make([]interface{}, 1+len(jobIDs))
args := make([]interface{}, 2+len(jobIDs))
args[0] = opt.Namespace
args[1] = opt.QueueID
for i, jobID := range jobIDs {
args[1+i] = jobID
args[2+i] = jobID
}
res, err := q.findScript.Run(context.Background(), q.client, nil, args...).Result()
res, err := q.findScript.Run(context.Background(), q.client, scriptKey(opt.Namespace, jobIDs[0]), args...).Result()
if err != nil {
return nil, err
}
Expand Down
96 changes: 49 additions & 47 deletions redis_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ func TestRedisQueueEnqueue(t *testing.T) {
require.NoError(t, err)

err = q.Enqueue(job, &EnqueueOptions{
Namespace: "{ns1}",
QueueID: "q1",
Namespace: "ns1",
QueueID: "{q1}",
})
require.NoError(t, err)

jobKey := fmt.Sprintf("{ns1}:job:%s", job.ID)
jobKey := fmt.Sprintf("ns1:queue:{q1}:job:%s", job.ID)

h, err := client.HGetAll(context.Background(), jobKey).Result()
require.NoError(t, err)
Expand All @@ -42,7 +42,8 @@ func TestRedisQueueEnqueue(t *testing.T) {
}, h)

jobs, err := q.BulkFind([]string{job.ID, "not-exist-id"}, &FindOptions{
Namespace: "{ns1}",
Namespace: "ns1",
QueueID: "{q1}",
})
require.NoError(t, err)
require.Len(t, jobs, 2)
Expand All @@ -52,12 +53,13 @@ func TestRedisQueueEnqueue(t *testing.T) {

jobs[0].LastError = "hello world"
err = q.Enqueue(jobs[0], &EnqueueOptions{
Namespace: "{ns1}",
QueueID: "q1",
Namespace: "ns1",
QueueID: "{q1}",
})
require.NoError(t, err)
jobs, err = q.BulkFind([]string{job.ID}, &FindOptions{
Namespace: "{ns1}",
Namespace: "ns1",
QueueID: "{q1}",
})
require.NoError(t, err)
require.Len(t, jobs, 1)
Expand All @@ -66,7 +68,7 @@ func TestRedisQueueEnqueue(t *testing.T) {

z, err := client.ZRangeByScoreWithScores(
context.Background(),
"{ns1}:queue:q1",
"ns1:queue:{q1}",
&redis.ZRangeBy{
Min: "-inf",
Max: "+inf",
Expand All @@ -77,14 +79,14 @@ func TestRedisQueueEnqueue(t *testing.T) {
require.EqualValues(t, job.EnqueuedAt.Unix(), z[0].Score)

err = q.Enqueue(job.Delay(time.Minute), &EnqueueOptions{
Namespace: "{ns1}",
QueueID: "q1",
Namespace: "ns1",
QueueID: "{q1}",
})
require.NoError(t, err)

z, err = client.ZRangeByScoreWithScores(
context.Background(),
"{ns1}:queue:q1",
"ns1:queue:{q1}",
&redis.ZRangeBy{
Min: "-inf",
Max: "+inf",
Expand All @@ -108,19 +110,19 @@ func TestRedisQueueDequeue(t *testing.T) {
job := NewJob()
err := job.MarshalPayload(message{Text: "hello"})
require.NoError(t, err)
jobKey := fmt.Sprintf("{ns1}:job:%s", job.ID)
jobKey := fmt.Sprintf("ns1:queue:{q1}:job:%s", job.ID)

err = q.Enqueue(job, &EnqueueOptions{
Namespace: "{ns1}",
QueueID: "q1",
Namespace: "ns1",
QueueID: "{q1}",
})
require.NoError(t, err)

now := job.EnqueuedAt.Add(123 * time.Second)

jobDequeued, err := q.Dequeue(&DequeueOptions{
Namespace: "{ns1}",
QueueID: "q1",
Namespace: "ns1",
QueueID: "{q1}",
At: now,
InvisibleSec: 0,
})
Expand All @@ -129,7 +131,7 @@ func TestRedisQueueDequeue(t *testing.T) {

z, err := client.ZRangeByScoreWithScores(
context.Background(),
"{ns1}:queue:q1",
"ns1:queue:{q1}",
&redis.ZRangeBy{
Min: "-inf",
Max: "+inf",
Expand All @@ -140,8 +142,8 @@ func TestRedisQueueDequeue(t *testing.T) {
require.EqualValues(t, job.EnqueuedAt.Unix(), z[0].Score)

jobDequeued, err = q.Dequeue(&DequeueOptions{
Namespace: "{ns1}",
QueueID: "q1",
Namespace: "ns1",
QueueID: "{q1}",
At: now,
InvisibleSec: 60,
})
Expand All @@ -158,7 +160,7 @@ func TestRedisQueueDequeue(t *testing.T) {

z, err = client.ZRangeByScoreWithScores(
context.Background(),
"{ns1}:queue:q1",
"ns1:queue:{q1}",
&redis.ZRangeBy{
Min: "-inf",
Max: "+inf",
Expand All @@ -170,8 +172,8 @@ func TestRedisQueueDequeue(t *testing.T) {

// empty
_, err = q.Dequeue(&DequeueOptions{
Namespace: "{ns1}",
QueueID: "q1",
Namespace: "ns1",
QueueID: "{q1}",
At: now,
InvisibleSec: 60,
})
Expand All @@ -194,12 +196,12 @@ func TestRedisQueueDequeueDeletedJob(t *testing.T) {
require.NoError(t, err)

err = q.Enqueue(job, &EnqueueOptions{
Namespace: "{ns1}",
QueueID: "q1",
Namespace: "ns1",
QueueID: "{q1}",
})
require.NoError(t, err)

jobKey := fmt.Sprintf("{ns1}:job:%s", job.ID)
jobKey := fmt.Sprintf("ns1:queue:{q1}:job:%s", job.ID)

h, err := client.HGetAll(context.Background(), jobKey).Result()
require.NoError(t, err)
Expand All @@ -212,16 +214,16 @@ func TestRedisQueueDequeueDeletedJob(t *testing.T) {
require.NoError(t, client.Del(context.Background(), jobKey).Err())

_, err = q.Dequeue(&DequeueOptions{
Namespace: "{ns1}",
QueueID: "q1",
Namespace: "ns1",
QueueID: "{q1}",
At: job.EnqueuedAt,
InvisibleSec: 60,
})
require.Equal(t, ErrEmptyQueue, err)

z, err := client.ZRangeByScoreWithScores(
context.Background(),
"{ns1}:queue:q1",
"ns1:queue:{q1}",
&redis.ZRangeBy{
Min: "-inf",
Max: "+inf",
Expand All @@ -239,16 +241,16 @@ func TestRedisQueueAck(t *testing.T) {
job := NewJob()

err := q.Enqueue(job, &EnqueueOptions{
Namespace: "{ns1}",
QueueID: "q1",
Namespace: "ns1",
QueueID: "{q1}",
})
require.NoError(t, err)

jobKey := fmt.Sprintf("{ns1}:job:%s", job.ID)
jobKey := fmt.Sprintf("ns1:queue:{q1}:job:%s", job.ID)

z, err := client.ZRangeByScoreWithScores(
context.Background(),
"{ns1}:queue:q1",
"ns1:queue:{q1}",
&redis.ZRangeBy{
Min: "-inf",
Max: "+inf",
Expand All @@ -263,14 +265,14 @@ func TestRedisQueueAck(t *testing.T) {
require.EqualValues(t, 1, e)

err = q.Ack(job, &AckOptions{
Namespace: "{ns1}",
QueueID: "q1",
Namespace: "ns1",
QueueID: "{q1}",
})
require.NoError(t, err)

z, err = client.ZRangeByScoreWithScores(
context.Background(),
"{ns1}:queue:q1",
"ns1:queue:{q1}",
&redis.ZRangeBy{
Min: "-inf",
Max: "+inf",
Expand All @@ -283,8 +285,8 @@ func TestRedisQueueAck(t *testing.T) {
require.EqualValues(t, 0, e)

err = q.Ack(job, &AckOptions{
Namespace: "{ns1}",
QueueID: "q1",
Namespace: "ns1",
QueueID: "{q1}",
})
require.NoError(t, err)
}
Expand All @@ -298,30 +300,30 @@ func TestRedisQueueGetQueueMetrics(t *testing.T) {
job := NewJob()

err := q.Enqueue(job, &EnqueueOptions{
Namespace: "{ns1}",
QueueID: "q1",
Namespace: "ns1",
QueueID: "{q1}",
})
require.NoError(t, err)

m, err := q.GetQueueMetrics(&QueueMetricsOptions{
Namespace: "{ns1}",
QueueID: "q1",
Namespace: "ns1",
QueueID: "{q1}",
At: job.EnqueuedAt,
})
require.NoError(t, err)
require.Equal(t, "{ns1}", m.Namespace)
require.Equal(t, "q1", m.QueueID)
require.Equal(t, "ns1", m.Namespace)
require.Equal(t, "{q1}", m.QueueID)
require.EqualValues(t, 1, m.ReadyTotal)
require.EqualValues(t, 0, m.ScheduledTotal)

m, err = q.GetQueueMetrics(&QueueMetricsOptions{
Namespace: "{ns1}",
QueueID: "q1",
Namespace: "ns1",
QueueID: "{q1}",
At: job.EnqueuedAt.Add(-time.Second),
})
require.NoError(t, err)
require.Equal(t, "{ns1}", m.Namespace)
require.Equal(t, "q1", m.QueueID)
require.Equal(t, "ns1", m.Namespace)
require.Equal(t, "{q1}", m.QueueID)
require.EqualValues(t, 0, m.ReadyTotal)
require.EqualValues(t, 1, m.ScheduledTotal)
}