diff --git a/pkg/db/key_value/client.go b/pkg/db/key_value/client.go index e1580f51..21646266 100644 --- a/pkg/db/key_value/client.go +++ b/pkg/db/key_value/client.go @@ -7,7 +7,7 @@ import ( "github.com/redis/go-redis/v9" "go-deploy/pkg/db" "go-deploy/utils" - "strings" + "regexp" "time" ) @@ -77,32 +77,38 @@ func (client *Client) Decr(key string) error { return client.RedisClient.Decr(context.Background(), key).Err() } -// SetUpExpirationListener sets up a listener for expired key events for every key beginning with the given prefix. +// SetUpExpirationListener sets up a listener for expired key events for every key that matches the given pattern. // It is non-blocking and will run in a separate goroutine. -func (client *Client) SetUpExpirationListener(prefix string, handler func(key string) error) error { +func (client *Client) SetUpExpirationListener(ctx context.Context, pattern string, handler func(key string) error) error { go func() { - pubsub := client.RedisClient.PSubscribe(context.TODO(), "__keyevent@0__:expired") + pubSub := client.RedisClient.PSubscribe(context.TODO(), "__keyevent@0__:expired") defer func(channel *redis.PubSub) { err := channel.Close() if err != nil { return } - }(pubsub) + }(pubSub) - channel := pubsub.Channel() - for msg := range channel { - if msg.Payload == "" { - continue - } - - if !strings.HasPrefix(msg.Payload, prefix) { - continue - } - - err := handler(msg.Payload) - if err != nil { - utils.PrettyPrintError(fmt.Errorf("failed to handle expired key event for key %s. details: %w", msg.Payload, err)) + channel := pubSub.Channel() + for { + select { + case <-ctx.Done(): return + case msg := <-channel: + if msg.Payload == "" { + continue + } + + // Check regex match + if !regexp.MustCompile(pattern).MatchString(msg.Payload) { + continue + } + + err := handler(msg.Payload) + if err != nil { + utils.PrettyPrintError(fmt.Errorf("failed to handle expired key event for key %s. details: %w", msg.Payload, err)) + return + } } } }() diff --git a/pkg/services/logger/pod_event_listener.go b/pkg/services/logger/pod_event_listener.go index 0ecbfdf3..bc148bad 100644 --- a/pkg/services/logger/pod_event_listener.go +++ b/pkg/services/logger/pod_event_listener.go @@ -31,9 +31,9 @@ func PodEventListener(ctx context.Context) error { kvc := key_value.New() kvc.RedisClient.ConfigSet(ctx, "notify-keyspace-events", "Ex") - // Set up a listener for expired key events for every key beginning with "logs-" + // Set up a listener for expired key events for every key that matches "logs:[a-z0-9-]" // This is used to ensure that a new logger is created for a pod if the previous one fails - err := kvc.SetUpExpirationListener(LogsKey, func(key string) error { + err := kvc.SetUpExpirationListener(ctx, "^logs:[a-zA-Z0-9-]+$", func(key string) error { podName := PodNameFromLogKey(key) // Reset the expired key so that it can be used again