Skip to content

Commit

Permalink
enhance(queue): adjust pop to allow channels as argument (#948)
Browse files Browse the repository at this point in the history
Co-authored-by: David May <49894298+wass3rw3rk@users.noreply.github.com>
Co-authored-by: Kelly Merrick <kelly.merrick@target.com>
  • Loading branch information
3 people authored Aug 31, 2023
1 parent 17af770 commit 46337cf
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 8 deletions.
14 changes: 12 additions & 2 deletions queue/redis/pop.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,23 @@ import (
)

// Pop grabs an item from the specified channel off the queue.
func (c *client) Pop(ctx context.Context) (*types.Item, error) {
func (c *client) Pop(ctx context.Context, routes []string) (*types.Item, error) {
c.Logger.Tracef("popping item from queue %s", c.config.Channels)

// define channels to pop from
var channels []string

// if routes were supplied, use those
if len(routes) > 0 {
channels = routes
} else {
channels = c.config.Channels
}

// build a redis queue command to pop an item from queue
//
// https://pkg.go.dev/github.com/go-redis/redis?tab=doc#Client.BLPop
popCmd := c.Redis.BLPop(ctx, c.config.Timeout, c.config.Channels...)
popCmd := c.Redis.BLPop(ctx, c.config.Timeout, channels...)

// blocking call to pop item from queue
//
Expand Down
23 changes: 18 additions & 5 deletions queue/redis/pop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestRedis_Pop(t *testing.T) {
}

// setup redis mock
_redis, err := NewTest(_signingPrivateKey, _signingPublicKey, "vela")
_redis, err := NewTest(_signingPrivateKey, _signingPublicKey, "vela", "custom")
if err != nil {
t.Errorf("unable to create queue service: %v", err)
}
Expand All @@ -47,6 +47,12 @@ func TestRedis_Pop(t *testing.T) {
t.Errorf("unable to push item to queue: %v", err)
}

// push item to queue with custom channel
err = _redis.Redis.RPush(context.Background(), "custom", signed).Err()
if err != nil {
t.Errorf("unable to push item to queue: %v", err)
}

// setup timeout redis mock
timeout, err := NewTest(_signingPrivateKey, _signingPublicKey, "vela")
if err != nil {
Expand All @@ -73,15 +79,22 @@ func TestRedis_Pop(t *testing.T) {

// setup tests
tests := []struct {
failure bool
redis *client
want *types.Item
failure bool
redis *client
want *types.Item
channels []string
}{
{
failure: false,
redis: _redis,
want: _item,
},
{
failure: false,
redis: _redis,
want: _item,
channels: []string{"custom"},
},
{
failure: false,
redis: timeout,
Expand All @@ -96,7 +109,7 @@ func TestRedis_Pop(t *testing.T) {

// run tests
for _, test := range tests {
got, err := test.redis.Pop(context.Background())
got, err := test.redis.Pop(context.Background(), test.channels)

if test.failure {
if err == nil {
Expand Down
2 changes: 1 addition & 1 deletion queue/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type Service interface {

// Pop defines a function that grabs an
// item off the queue.
Pop(context.Context) (*types.Item, error)
Pop(context.Context, []string) (*types.Item, error)

// Push defines a function that publishes an
// item to the specified route in the queue.
Expand Down

0 comments on commit 46337cf

Please sign in to comment.