diff --git a/.changelog/109.txt b/.changelog/109.txt new file mode 100644 index 0000000..753a6b1 --- /dev/null +++ b/.changelog/109.txt @@ -0,0 +1,3 @@ +```release-note:bug +`kimup-controller` - Now the application restart if the kubernetes watch connection is lost. +``` \ No newline at end of file diff --git a/cmd/kimup/main.go b/cmd/kimup/main.go index 367436d..2111e29 100644 --- a/cmd/kimup/main.go +++ b/cmd/kimup/main.go @@ -71,7 +71,7 @@ func main() { initScheduler(ctx, k) go func() { - x, err := k.Image().Watch(ctx) + x, xErr, err := k.Image().Watch(ctx) if err != nil { log.WithError(err).Panic("Error watching events") } @@ -80,6 +80,13 @@ func main() { select { case <-ctx.Done(): return + case err, ok := <-xErr: + if ok { + log.WithError(err).Error("Error watching events from kubernetes") + } + // Exit the program + c <- syscall.SIGINT + return case event, ok := <-x: if !ok { return @@ -135,6 +142,10 @@ func main() { case "DELETED": cleanTriggers(&event.Value) + case "ERROR": + log.Error("Error watching events from kubernetes") + // exit the program + c <- syscall.SIGINT } } } diff --git a/internal/kubeclient/image.go b/internal/kubeclient/image.go index 4428824..7840808 100644 --- a/internal/kubeclient/image.go +++ b/internal/kubeclient/image.go @@ -152,24 +152,28 @@ func (i *ImageObj) Find(ctx context.Context, namespace, imageName string) (v1alp // Watch watches for changes to the image object. // It takes a context and the namespace as parameters. // Returns a channel of WatchInterface[v1alpha1.Image] and an error if the operation fails. -func (i *ImageObj) Watch(ctx context.Context) (chan WatchInterface[v1alpha1.Image], error) { +func (i *ImageObj) Watch(ctx context.Context) (chan WatchInterface[v1alpha1.Image], chan error, error) { x, err := i.imageClient.Watch(ctx, metav1.ListOptions{}) if err != nil { - return nil, err + return nil, nil, err } ch := make(chan WatchInterface[v1alpha1.Image]) - + chErr := make(chan error) go func() { + defer close(chErr) + for { select { case <-ctx.Done(): close(ch) x.Stop() + chErr <- fmt.Errorf("watch channel closed") return case event, ok := <-x.ResultChan(): if !ok { close(ch) + chErr <- fmt.Errorf("watch channel closed") return } @@ -184,7 +188,7 @@ func (i *ImageObj) Watch(ctx context.Context) (chan WatchInterface[v1alpha1.Imag } }() - return ch, nil + return ch, chErr, nil } // UpdateStatus updates the status of the image object.