diff --git a/queue/redis/pop.go b/queue/redis/pop.go index 732092c32..65a043e8b 100644 --- a/queue/redis/pop.go +++ b/queue/redis/pop.go @@ -15,13 +15,23 @@ import ( ) // Pop grabs an item from the specified channel off the queue. -func (c *client) Pop(ctx context.Context) (*types.Item, error) { +func (c *client) Pop(ctx context.Context, routes []string) (*types.Item, error) { c.Logger.Tracef("popping item from queue %s", c.config.Channels) + // define channels to pop from + var channels []string + + // if routes were supplied, use those + if len(routes) > 0 { + channels = routes + } else { + channels = c.config.Channels + } + // build a redis queue command to pop an item from queue // // https://pkg.go.dev/github.com/go-redis/redis?tab=doc#Client.BLPop - popCmd := c.Redis.BLPop(ctx, c.config.Timeout, c.config.Channels...) + popCmd := c.Redis.BLPop(ctx, c.config.Timeout, channels...) // blocking call to pop item from queue // diff --git a/queue/redis/pop_test.go b/queue/redis/pop_test.go index 1f19bf2c3..9490a4a31 100644 --- a/queue/redis/pop_test.go +++ b/queue/redis/pop_test.go @@ -34,7 +34,7 @@ func TestRedis_Pop(t *testing.T) { } // setup redis mock - _redis, err := NewTest(_signingPrivateKey, _signingPublicKey, "vela") + _redis, err := NewTest(_signingPrivateKey, _signingPublicKey, "vela", "custom") if err != nil { t.Errorf("unable to create queue service: %v", err) } @@ -47,6 +47,12 @@ func TestRedis_Pop(t *testing.T) { t.Errorf("unable to push item to queue: %v", err) } + // push item to queue with custom channel + err = _redis.Redis.RPush(context.Background(), "custom", signed).Err() + if err != nil { + t.Errorf("unable to push item to queue: %v", err) + } + // setup timeout redis mock timeout, err := NewTest(_signingPrivateKey, _signingPublicKey, "vela") if err != nil { @@ -73,15 +79,22 @@ func TestRedis_Pop(t *testing.T) { // setup tests tests := []struct { - failure bool - redis *client - want *types.Item + failure bool + redis *client + want *types.Item + channels []string }{ { failure: false, redis: _redis, want: _item, }, + { + failure: false, + redis: _redis, + want: _item, + channels: []string{"custom"}, + }, { failure: false, redis: timeout, @@ -96,7 +109,7 @@ func TestRedis_Pop(t *testing.T) { // run tests for _, test := range tests { - got, err := test.redis.Pop(context.Background()) + got, err := test.redis.Pop(context.Background(), test.channels) if test.failure { if err == nil { diff --git a/queue/service.go b/queue/service.go index 418b3a919..307364b50 100644 --- a/queue/service.go +++ b/queue/service.go @@ -26,7 +26,7 @@ type Service interface { // Pop defines a function that grabs an // item off the queue. - Pop(context.Context) (*types.Item, error) + Pop(context.Context, []string) (*types.Item, error) // Push defines a function that publishes an // item to the specified route in the queue.