Skip to content

Commit

Permalink
Queue context timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
tbuchaillot committed Jan 31, 2024
1 parent 17071ee commit f9095af
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 1 deletion.
10 changes: 9 additions & 1 deletion temporal/internal/driver/redisv9/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package redisv9
import (
"context"
"errors"
"fmt"

Check failure on line 6 in temporal/internal/driver/redisv9/queue.go

View workflow job for this annotation

GitHub Actions / Tests persistent storage - DB mongo 4.4

imported and not used: "fmt"

Check failure on line 6 in temporal/internal/driver/redisv9/queue.go

View workflow job for this annotation

GitHub Actions / Tests persistent storage - DB mongo 4.2

imported and not used: "fmt"

Check failure on line 6 in temporal/internal/driver/redisv9/queue.go

View workflow job for this annotation

GitHub Actions / Tests persistent storage - DB mongo 7

imported and not used: "fmt"

Check failure on line 6 in temporal/internal/driver/redisv9/queue.go

View workflow job for this annotation

GitHub Actions / Tests persistent storage - DB mongo 6

imported and not used: "fmt"

Check failure on line 6 in temporal/internal/driver/redisv9/queue.go

View workflow job for this annotation

GitHub Actions / Tests temporal storage - DB redis 6.0.0 single

imported and not used: "fmt"

Check failure on line 6 in temporal/internal/driver/redisv9/queue.go

View workflow job for this annotation

GitHub Actions / Tests temporal storage - DB redis 7.0.0 single

imported and not used: "fmt"

Check failure on line 6 in temporal/internal/driver/redisv9/queue.go

View workflow job for this annotation

GitHub Actions / Tests temporal storage - DB redis 6.0.0 cluster

imported and not used: "fmt"

Check failure on line 6 in temporal/internal/driver/redisv9/queue.go

View workflow job for this annotation

GitHub Actions / Tests temporal storage - DB redis 7.0.0 cluster

imported and not used: "fmt"

Check failure on line 6 in temporal/internal/driver/redisv9/queue.go

View workflow job for this annotation

GitHub Actions / Tests temporal storage - DB redis 7.2.0 single

imported and not used: "fmt"

Check failure on line 6 in temporal/internal/driver/redisv9/queue.go

View workflow job for this annotation

GitHub Actions / Tests temporal storage - DB redis 7.0.0 TLS

imported and not used: "fmt"
"time"

"github.com/TykTechnologies/storage/temporal/model"
"github.com/TykTechnologies/storage/temporal/temperr"
Expand Down Expand Up @@ -70,7 +72,13 @@ func (m *messageAdapter) Payload() (string, error) {

// Receive waits for and returns the next message from the subscription.
func (r *subscriptionAdapter) Receive(ctx context.Context) (model.Message, error) {
msg, err := r.pubSub.Receive(ctx)
timeout := time.Duration(0)
deadline, ok := ctx.Deadline()
if ok {
timeout = deadline.Sub(time.Now())
}

msg, err := r.pubSub.ReceiveTimeout(ctx, timeout)
if err != nil {
if errors.Is(err, redis.ErrClosed) {
return nil, temperr.ClosedConnection
Expand Down
53 changes: 53 additions & 0 deletions temporal/queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package queue
import (
"context"
"errors"
"fmt"
"net"
"strings"
"testing"
Expand Down Expand Up @@ -278,3 +279,55 @@ func TestQueue_NewQueue(t *testing.T) {
_, err := NewQueue(&testutil.StubConnector{})
assert.NotNil(t, err)
}

func TestQueue_Ctx(t *testing.T) {
connectors := testutil.TestConnectors(t)
defer testutil.CloseConnectors(t, connectors)

for _, connector := range connectors {
t.Run(connector.Type(), func(t *testing.T) {
queue, err := NewQueue(connector)
assert.Nil(t, err)
assert.NotNil(t, queue)

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)

didReceive := make(chan bool, 1)
go func(context.Context) {

defer func() {
close(didReceive)
}()
sub := queue.Subscribe(ctx, "test_channel")
defer sub.Close()

for {
select {
case <-ctx.Done():
return
default:
msg, _ := sub.Receive(ctx)
//assert.NotNil(t, err)
if msg.Type() == model.MessageTypeSubscription {
continue
} else {
fmt.Print("Received message")
didReceive <- true
return
}
}
}
}(ctx)

queue.Publish(context.Background(), "test_channel", "test")

// cancel the context now that the goroutine is running
cancel()

// wait for the goroutine to exit
<-didReceive
})

}

}

0 comments on commit f9095af

Please sign in to comment.