Skip to content

Commit

Permalink
Merge pull request #616 from kthcloud/dev
Browse files Browse the repository at this point in the history
Add check for Pod existance for loggers
  • Loading branch information
saffronjam authored May 23, 2024
2 parents 20370fa + febbc1c commit 9b41bac
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 23 deletions.
5 changes: 5 additions & 0 deletions pkg/db/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ func (dbCtx *Context) setupRedis() error {
return makeError(err)
}

err = dbCtx.RedisClient.ConfigSet(context.TODO(), "notify-keyspace-events", "Ex").Err()
if err != nil {
return makeError(err)
}

return nil
}

Expand Down
6 changes: 6 additions & 0 deletions pkg/services/logger/deployment_logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package logger
import (
"context"
"encoding/json"
"errors"
"fmt"
configModels "go-deploy/models/config"
"go-deploy/models/model"
Expand All @@ -13,6 +14,7 @@ import (
"go-deploy/pkg/log"
"go-deploy/pkg/subsystems/k8s"
"go-deploy/service"
sErrors "go-deploy/service/errors"
"go-deploy/utils"
"os"
"time"
Expand Down Expand Up @@ -83,6 +85,10 @@ func OnPodEvent(ctx context.Context, zone *configModels.Zone, cancelFuncs map[st
err = service.V1().Deployments().K8s().SetupPodLogStream(loggerCtx, zone, logEvent.PodName, lastLogged, onLog)
if err != nil {
cancelFunc()
if errors.Is(err, sErrors.DeploymentNotFoundErr) {
return nil
}

return err
}

Expand Down
25 changes: 21 additions & 4 deletions pkg/services/logger/pod_event_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,37 @@ func PodEventListener(ctx context.Context) error {

mqc := message_queue.New()
kvc := key_value.New()
kvc.RedisClient.ConfigSet(ctx, "notify-keyspace-events", "Ex")
z := zone

// Set up a listener for expired key events for every key that matches "logs:[a-z0-9-]"
// This is used to ensure that a new logger is created for a pod if the previous one fails
err := kvc.SetUpExpirationListener(ctx, "^logs:[a-zA-Z0-9-]+$", func(key string) error {
podName := PodNameFromLogKey(key)

// Reset the expired key so that it can be used again
_, err := kvc.SetNX(key, false, LoggerLifetime)
// Check if Pod still exists
exists, err := service.V1().Deployments().K8s().PodExists(&z, podName)
if err != nil {
return err
}

if !exists {
// Clean up the keys
_ = kvc.Del(LogKey(podName))
_ = kvc.Del(LastLogKey(podName))
_ = kvc.Del(OwnerLogKey(podName))
_ = mqc.Publish(LogQueueKey(zone.Name), LogEvent{
PodName: podName,
PodEvent: k8s.PodEventDeleted,
})
return nil
}

// Reset the expired key so that it can be used again
_, err = kvc.SetNX(key, false, LoggerLifetime)
if err == nil {
return err
}

// Check if there are any active listeners, otherwise mark this pod as being processed
count, err := mqc.GetListeners(LogQueueKey(zone.Name))
if err != nil {
Expand Down Expand Up @@ -75,7 +93,6 @@ func PodEventListener(ctx context.Context) error {
return nil
})

z := zone
err = service.V1().Deployments().K8s().SetupPodWatcher(ctx, &z, func(podName string, event string) {
switch event {
case k8s.PodEventAdded:
Expand Down
3 changes: 3 additions & 0 deletions pkg/subsystems/k8s/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,7 @@ import "fmt"
var (
// IngressHostInUseErr is returned when the ingress host is already in use.
IngressHostInUseErr = fmt.Errorf("ingress host is already in use")

// NotFoundErr is returned when a resource is not found.
NotFoundErr = fmt.Errorf("resource not found")
)
29 changes: 18 additions & 11 deletions pkg/subsystems/k8s/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"go-deploy/pkg/log"
"go-deploy/pkg/subsystems/k8s/keys"
"go-deploy/pkg/subsystems/k8s/models"
"go-deploy/service/errors"
"golang.org/x/exp/maps"
"io"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -37,22 +38,21 @@ func (client *Client) getPodNames(namespace, deploymentName string) ([]string, e
return podNames, nil
}

// SetupLogStream reads logs from a pod and sends them to the callback function
func (client *Client) SetupLogStream(ctx context.Context, podName string, from time.Time, onLog func(deploymentName string, lines []models.LogLine)) error {
// SetupPodLogStream reads logs from a pod and sends them to the callback function
func (client *Client) SetupPodLogStream(ctx context.Context, podName string, from time.Time, onLog func(deploymentName string, lines []models.LogLine)) error {
makeError := func(err error) error {
return fmt.Errorf("failed to set up log stream for pod %s. details: %w", podName, err)
}

deploymentName := client.getDeploymentName(podName)
if deploymentName == "" {
return makeError(fmt.Errorf("deployment name not found for pod %s", podName))
return makeError(errors.ResourceNotFoundErr)
}

logStream, err := client.getPodLogStream(ctx, client.Namespace, podName, from)
if err != nil {
if IsNotFoundErr(err) {
// Pod got deleted for some reason, so we just stop the log stream
return nil
return makeError(errors.ResourceNotFoundErr)
}

return makeError(err)
Expand All @@ -75,6 +75,16 @@ func (client *Client) SetupLogStream(ctx context.Context, podName string, from t

lines := make([]models.LogLine, 0, 10)
lastPush := time.Now()

// Push logs every 1 seconds or when the buffer is full (10 lines)
pushIfFull := func() {
if len(lines) > 0 || time.Since(lastPush) > 1*time.Second {
onLog(deploymentName, lines)
lines = nil
lastPush = time.Now()
}
}

for {
select {
case <-ctx.Done():
Expand All @@ -100,14 +110,11 @@ func (client *Client) SetupLogStream(ctx context.Context, podName string, from t
CreatedAt: time.Now(),
})

// Push logs every 5 seconds or when the buffer is full (10 lines)
if time.Since(lastPush) > 5*time.Second || len(lines) >= 10 {
onLog(deploymentName, lines)
lines = nil
lastPush = time.Now()
}
pushIfFull()
}

pushIfFull()

time.Sleep(1 * time.Second)
}
}
Expand Down
19 changes: 12 additions & 7 deletions pkg/subsystems/k8s/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"go-deploy/pkg/subsystems/k8s/keys"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
)
Expand All @@ -17,14 +18,18 @@ const (
PodEventUpdated = "updated"
)

type PodEventType struct {
DeploymentName string
PodName string
Event string
}
// PodExists checks if a pod exists in the cluster.
func (client *Client) PodExists(podName string) (bool, error) {
_, err := client.K8sClient.CoreV1().Pods(client.Namespace).Get(context.TODO(), podName, metav1.GetOptions{})
if err != nil {
if IsNotFoundErr(err) {
return false, nil
}

return false, err
}

func PodEvent(deploymentName, podName, event string) PodEventType {
return PodEventType{DeploymentName: deploymentName, PodName: podName, Event: event}
return true, nil
}

// SetupPodWatcher is a function that sets up a pod watcher with a callback.
Expand Down
21 changes: 20 additions & 1 deletion service/v1/deployments/k8s_service/k8s_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,16 @@ func (c *Client) Repair(id string) error {
return nil
}

// PodExists checks if a pod exists in the cluster.
func (c *Client) PodExists(zone *configModels.Zone, podName string) (bool, error) {
_, kc, _, err := c.Get(OptsOnlyClient(zone))
if err != nil {
return false, err
}

return kc.PodExists(podName)
}

// SetupPodLogStream sets up a log stream for a pod.
func (c *Client) SetupPodLogStream(ctx context.Context, zone *configModels.Zone, podName string, from time.Time, onLog func(deploymentName string, lines []model.Log)) error {
_, kc, _, err := c.Get(OptsOnlyClient(zone))
Expand All @@ -645,7 +655,16 @@ func (c *Client) SetupPodLogStream(ctx context.Context, zone *configModels.Zone,
onLog(deploymentName, lines)
}

return kc.SetupLogStream(ctx, podName, from, handler)
err = kc.SetupPodLogStream(ctx, podName, from, handler)
if err != nil {
if errors.Is(err, kErrors.NotFoundErr) {
return sErrors.DeploymentNotFoundErr
}

return err
}

return nil
}

// SetupPodWatcher sets up a pod watcher for the deployment.
Expand Down

0 comments on commit 9b41bac

Please sign in to comment.