From 903f351451c05141f00aa9f24bfb502bf3fc7d16 Mon Sep 17 00:00:00 2001 From: Johannes Rauh Date: Mon, 5 Aug 2024 12:16:54 +0200 Subject: [PATCH] Sync previous container logs --- pkg/schema/v1/container.go | 66 +++++++++++++++++++++++++++++++++----- 1 file changed, 58 insertions(+), 8 deletions(-) diff --git a/pkg/schema/v1/container.go b/pkg/schema/v1/container.go index da199abf..605c1856 100644 --- a/pkg/schema/v1/container.go +++ b/pkg/schema/v1/container.go @@ -252,7 +252,50 @@ func (cl *ContainerLog) Upsert() interface{} { // syncContainerLogs fetches the logs from the kubernetes API for the given container and syncs to the database. func (cl *ContainerLog) syncContainerLogs(ctx context.Context, clientset *kubernetes.Clientset, db *database.Database) error { - logOptions := &kcorev1.PodLogOptions{Container: cl.ContainerName} + prevLogOptions := &kcorev1.PodLogOptions{ + Container: cl.ContainerName, + Previous: true, + Timestamps: true, + } + if !cl.LastUpdate.Time().IsZero() { + sinceSeconds := int64(time.Since(cl.LastUpdate.Time()).Seconds()) + prevLogOptions.SinceSeconds = &sinceSeconds + if cl.ContainerName == "tester" { + fmt.Println("SinceSeconds: ", sinceSeconds) + } + } + prevReq := clientset.CoreV1().Pods(cl.Namespace).GetLogs(cl.PodName, prevLogOptions) + + prevBody, err := prevReq.Stream(ctx) + if err != nil { + return err + } + defer func() { _ = prevBody.Close() }() + + prevLogs, err := io.ReadAll(prevBody) + if err != nil { + return err + } + + if len(prevLogs) > 0 { + cl.Logs = truncate(cl.Logs+string(prevLogs), MaxLogLength) + + logLines := strings.Split(string(prevLogs), "\n") + lastLogLine := logLines[len(logLines)-1] + if lastLogLine == "" || lastLogLine == "\n" { + lastLogLine = logLines[len(logLines)-2] + } + if lastLogLine != "" { + ts := strings.Fields(lastLogLine)[0] + restartAtMessage := fmt.Sprintf("\nContainer %s restarted at %s\n\n", cl.ContainerName, ts) + cl.Logs = truncate(cl.Logs+restartAtMessage, MaxLogLength) + } + } + + logOptions := &kcorev1.PodLogOptions{ + Container: cl.ContainerName, + Timestamps: true, + } if !cl.LastUpdate.Time().IsZero() { sinceSeconds := int64(time.Since(cl.LastUpdate.Time()).Seconds()) logOptions.SinceSeconds = &sinceSeconds @@ -266,17 +309,24 @@ func (cl *ContainerLog) syncContainerLogs(ctx context.Context, clientset *kubern defer func() { _ = body.Close() }() logs, err := io.ReadAll(body) - if err != nil || len(logs) == 0 { + if err != nil { return err } - cl.LastUpdate = types.UnixMilli(time.Now()) - cl.Logs = truncate(cl.Logs+string(logs), MaxLogLength) - entities := make(chan interface{}, 1) - entities <- cl - close(entities) + if len(logs) > 0 { + cl.Logs = truncate(cl.Logs+string(logs), MaxLogLength) + } + + if len(logs) > 0 || len(prevLogs) > 0 { + cl.LastUpdate = types.UnixMilli(time.Now()) + entities := make(chan interface{}, 1) + entities <- cl + close(entities) + + return db.UpsertStreamed(ctx, entities) + } - return db.UpsertStreamed(ctx, entities) + return nil } func GetContainerState(container kcorev1.Container, status kcorev1.ContainerStatus) (IcingaState, string) {