Skip to content

Commit

Permalink
Merge pull request #613 from kthcloud/dev
Browse files Browse the repository at this point in the history
Fix loggers using too many resources
  • Loading branch information
saffronjam authored May 22, 2024
2 parents ad48d6c + 63ab7f3 commit 2e2db84
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 6 deletions.
16 changes: 11 additions & 5 deletions pkg/db/message_queue/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (client *Client) Publish(queueName string, jsonData interface{}) error {

// Consume starts consuming messages from the given queue.
// It is non-blocking and will run in a separate goroutine.
func (client *Client) Consume(queueName string, handler func(data []byte) error) error {
func (client *Client) Consume(ctx context.Context, queueName string, handler func(data []byte) error) error {
go func() {
pubSub := client.RedisClient.Subscribe(context.TODO(), queueName)
defer func(channel *redis.PubSub) {
Expand All @@ -51,10 +51,16 @@ func (client *Client) Consume(queueName string, handler func(data []byte) error)
}(pubSub)

channel := pubSub.Channel()
for msg := range channel {
err := handler([]byte(msg.Payload))
if err != nil {
log.Println("Failed to handle queue message. Details: " + err.Error())
for {
select {
case <-ctx.Done():
return
case msg := <-channel:
err := handler([]byte(msg.Payload))
if err != nil {
log.Println("Failed to handle message. Details: " + err.Error())
continue
}
}
}
}()
Expand Down
3 changes: 2 additions & 1 deletion pkg/services/logger/deployment_logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func DeploymentLogger(ctx context.Context) error {

dZone := zone
cancelFuncs := make(map[string]context.CancelFunc)
err := message_queue.New().Consume(LogQueueKey(zone.Name), OnPodEvent(ctx, &dZone, cancelFuncs))
err := message_queue.New().Consume(ctx, LogQueueKey(zone.Name), OnPodEvent(ctx, &dZone, cancelFuncs))
if err != nil {
return err
}
Expand Down Expand Up @@ -98,6 +98,7 @@ func OnPodEvent(ctx context.Context, zone *configModels.Zone, cancelFuncs map[st
return
}
}
time.Sleep(1 * time.Second)
}
}(ctx, loggerCtx)
case k8s.PodEventDeleted:
Expand Down
2 changes: 2 additions & 0 deletions pkg/subsystems/k8s/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ func (client *Client) SetupLogStream(ctx context.Context, podName string, from t
lastPush = time.Now()
}
}

time.Sleep(1 * time.Second)
}
}
}()
Expand Down

0 comments on commit 2e2db84

Please sign in to comment.