diff --git a/worker/kubernetes.go b/worker/kubernetes.go index b6420948..66121697 100644 --- a/worker/kubernetes.go +++ b/worker/kubernetes.go @@ -7,6 +7,7 @@ import ( "io" "strings" "text/template" + "time" "github.com/ohsu-comp-bio/funnel/tes" v1 "k8s.io/api/batch/v1" @@ -95,8 +96,19 @@ func (kcmd KubernetesCommand) Run(ctx context.Context) error { _, err = client.Create(ctx, job, metav1.CreateOptions{}) if err != nil { - // TODO: Retry creating the Exeuctor Pod on failure - return fmt.Errorf("creating job in worker: %v", err) + // Retry creating the Executor Pod on failure + var retryCount int + for retryCount < 3 { + _, err = client.Create(ctx, job, metav1.CreateOptions{}) + if err == nil { + break + } + retryCount++ + time.Sleep(2 * time.Second) + } + if retryCount == 3 { + return fmt.Errorf("creating job in worker after 3 attempts: %v", err) + } } // Wait until the job finishes @@ -114,7 +126,26 @@ func (kcmd KubernetesCommand) Run(ctx context.Context) error { podLogs, err := req.Stream(ctx) if err != nil { - // TODO: Retry reading the Executor Logs on failure + // Retry reading the Executor Logs on failure + var retryCount int + for retryCount < 3 { + podLogs, err := req.Stream(ctx) + if err == nil { + defer podLogs.Close() + buf := new(bytes.Buffer) + _, err = io.Copy(buf, podLogs) + if err == nil { + var bytes = buf.Bytes() + kcmd.Stdout.Write(bytes) + break + } + } + retryCount++ + time.Sleep(2 * time.Second) + } + if retryCount == 3 { + return fmt.Errorf("failed to read logs after 3 attempts: %v", err) + } return err }