Skip to content

Commit

Permalink
Merge pull request #615 from kthcloud/dev
Browse files Browse the repository at this point in the history
Fix regex match to find logger worker expired keys
  • Loading branch information
saffronjam authored May 23, 2024
2 parents 373a941 + 4c0863e commit 20370fa
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 20 deletions.
42 changes: 24 additions & 18 deletions pkg/db/key_value/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/redis/go-redis/v9"
"go-deploy/pkg/db"
"go-deploy/utils"
"strings"
"regexp"
"time"
)

Expand Down Expand Up @@ -77,32 +77,38 @@ func (client *Client) Decr(key string) error {
return client.RedisClient.Decr(context.Background(), key).Err()
}

// SetUpExpirationListener sets up a listener for expired key events for every key beginning with the given prefix.
// SetUpExpirationListener sets up a listener for expired key events for every key that matches the given pattern.
// It is non-blocking and will run in a separate goroutine.
func (client *Client) SetUpExpirationListener(prefix string, handler func(key string) error) error {
func (client *Client) SetUpExpirationListener(ctx context.Context, pattern string, handler func(key string) error) error {
go func() {
pubsub := client.RedisClient.PSubscribe(context.TODO(), "__keyevent@0__:expired")
pubSub := client.RedisClient.PSubscribe(context.TODO(), "__keyevent@0__:expired")
defer func(channel *redis.PubSub) {
err := channel.Close()
if err != nil {
return
}
}(pubsub)
}(pubSub)

channel := pubsub.Channel()
for msg := range channel {
if msg.Payload == "" {
continue
}

if !strings.HasPrefix(msg.Payload, prefix) {
continue
}

err := handler(msg.Payload)
if err != nil {
utils.PrettyPrintError(fmt.Errorf("failed to handle expired key event for key %s. details: %w", msg.Payload, err))
channel := pubSub.Channel()
for {
select {
case <-ctx.Done():
return
case msg := <-channel:
if msg.Payload == "" {
continue
}

// Check regex match
if !regexp.MustCompile(pattern).MatchString(msg.Payload) {
continue
}

err := handler(msg.Payload)
if err != nil {
utils.PrettyPrintError(fmt.Errorf("failed to handle expired key event for key %s. details: %w", msg.Payload, err))
return
}
}
}
}()
Expand Down
4 changes: 2 additions & 2 deletions pkg/services/logger/pod_event_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ func PodEventListener(ctx context.Context) error {
kvc := key_value.New()
kvc.RedisClient.ConfigSet(ctx, "notify-keyspace-events", "Ex")

// Set up a listener for expired key events for every key beginning with "logs-"
// Set up a listener for expired key events for every key that matches "logs:[a-z0-9-]"
// This is used to ensure that a new logger is created for a pod if the previous one fails
err := kvc.SetUpExpirationListener(LogsKey, func(key string) error {
err := kvc.SetUpExpirationListener(ctx, "^logs:[a-zA-Z0-9-]+$", func(key string) error {
podName := PodNameFromLogKey(key)

// Reset the expired key so that it can be used again
Expand Down

0 comments on commit 20370fa

Please sign in to comment.