diff --git a/pkg/db/message_queue/client.go b/pkg/db/message_queue/client.go index bd32c0ae..7149ad04 100644 --- a/pkg/db/message_queue/client.go +++ b/pkg/db/message_queue/client.go @@ -39,7 +39,7 @@ func (client *Client) Publish(queueName string, jsonData interface{}) error { // Consume starts consuming messages from the given queue. // It is non-blocking and will run in a separate goroutine. -func (client *Client) Consume(queueName string, handler func(data []byte) error) error { +func (client *Client) Consume(ctx context.Context, queueName string, handler func(data []byte) error) error { go func() { pubSub := client.RedisClient.Subscribe(context.TODO(), queueName) defer func(channel *redis.PubSub) { @@ -51,10 +51,16 @@ func (client *Client) Consume(queueName string, handler func(data []byte) error) }(pubSub) channel := pubSub.Channel() - for msg := range channel { - err := handler([]byte(msg.Payload)) - if err != nil { - log.Println("Failed to handle queue message. Details: " + err.Error()) + for { + select { + case <-ctx.Done(): + return + case msg := <-channel: + err := handler([]byte(msg.Payload)) + if err != nil { + log.Println("Failed to handle message. Details: " + err.Error()) + continue + } } } }() diff --git a/pkg/services/logger/deployment_logger.go b/pkg/services/logger/deployment_logger.go index 6c9b8587..6ed98d47 100644 --- a/pkg/services/logger/deployment_logger.go +++ b/pkg/services/logger/deployment_logger.go @@ -24,7 +24,7 @@ func DeploymentLogger(ctx context.Context) error { dZone := zone cancelFuncs := make(map[string]context.CancelFunc) - err := message_queue.New().Consume(LogQueueKey(zone.Name), OnPodEvent(ctx, &dZone, cancelFuncs)) + err := message_queue.New().Consume(ctx, LogQueueKey(zone.Name), OnPodEvent(ctx, &dZone, cancelFuncs)) if err != nil { return err } @@ -98,6 +98,7 @@ func OnPodEvent(ctx context.Context, zone *configModels.Zone, cancelFuncs map[st return } } + time.Sleep(1 * time.Second) } }(ctx, loggerCtx) case k8s.PodEventDeleted: diff --git a/pkg/subsystems/k8s/logs.go b/pkg/subsystems/k8s/logs.go index 487915c5..2e20def8 100644 --- a/pkg/subsystems/k8s/logs.go +++ b/pkg/subsystems/k8s/logs.go @@ -107,6 +107,8 @@ func (client *Client) SetupLogStream(ctx context.Context, podName string, from t lastPush = time.Now() } } + + time.Sleep(1 * time.Second) } } }()