Skip to content

Commit

Permalink
chore: refacto event manager between operator & kimup (#22)
Browse files Browse the repository at this point in the history
  • Loading branch information
azrod authored Oct 1, 2024
1 parent a87f4f2 commit a52ec04
Show file tree
Hide file tree
Showing 7 changed files with 252 additions and 77 deletions.
73 changes: 73 additions & 0 deletions cmd/kimup/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package main

import (
"context"

log "github.com/sirupsen/logrus"

"github.com/orange-cloudavenue/kube-image-updater/api/v1alpha1"
"github.com/orange-cloudavenue/kube-image-updater/internal/actions"
"github.com/orange-cloudavenue/kube-image-updater/internal/annotations"
"github.com/orange-cloudavenue/kube-image-updater/internal/triggers"
"github.com/orange-cloudavenue/kube-image-updater/internal/triggers/crontab"
)

func setupTriggers(x *v1alpha1.Image) {
// * Triggers
for _, trigger := range x.Spec.Triggers {
switch trigger.Type {
case triggers.Crontab:
if ok, err := crontab.IsExistingJob(crontab.BuildKey(x.Namespace, x.Name)); err != nil || !ok {
if err := crontab.AddCronTab(x.Namespace, x.Name, trigger.Value); err != nil {
log.Errorf("Error adding cronjob: %v", err)
}
}
case triggers.Webhook:
log.Info("Webhook trigger not implemented yet")
}
}
}

func cleanTriggers(x *v1alpha1.Image) {
for _, trigger := range x.Spec.Triggers {
switch trigger.Type {
case triggers.Crontab:
if ok, err := crontab.IsExistingJob(crontab.BuildKey(x.Namespace, x.Name)); err != nil || ok {
if err := crontab.RemoveJob(crontab.BuildKey(x.Namespace, x.Name)); err != nil {
log.Errorf("Error removing crontab: %v", err)
}
}
case triggers.Webhook:
log.Info("Webhook trigger not implemented yet")
}
}
}

func refreshIfRequired(an annotations.Annotation, image v1alpha1.Image) {
if an.Action().Get() == annotations.ActionRefresh {
// * Here is only if the image has annotations.ActionRefresh
log.Infof("[Fire] Annotation trigger refresh for image %s in namespace %s", image.Name, image.Namespace)
_, err := triggers.Trigger(triggers.RefreshImage, image.Namespace, image.Name)
if err != nil {
log.Errorf("Error triggering event: %v", err)
}
an.Remove(annotations.KeyAction)
}
}

func setTagIfNotExists(ctx context.Context, an annotations.Annotation, image *v1alpha1.Image) error {
if an.Tag().IsNull() {
a, err := actions.GetAction(actions.Apply)
if err != nil {
return err
}

a.Init(image.Status.Tag, image.Spec.BaseTag, image)

if err := a.Execute(ctx); err != nil {
return err
}
}

return nil
}
109 changes: 56 additions & 53 deletions cmd/kimup/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/orange-cloudavenue/kube-image-updater/internal/annotations"
"github.com/orange-cloudavenue/kube-image-updater/internal/kubeclient"
"github.com/orange-cloudavenue/kube-image-updater/internal/triggers"
"github.com/orange-cloudavenue/kube-image-updater/internal/triggers/crontab"
"github.com/orange-cloudavenue/kube-image-updater/internal/utils"
)

Expand All @@ -25,6 +24,7 @@ var (

func init() {
flag.String("loglevel", "info", "log level (debug, info, warn, error, fatal, panic)")
// TODO add namespace scope
flag.Parse()

log.SetLevel(utils.ParseLogLevel(flag.Lookup("loglevel").Value.String()))
Expand All @@ -42,76 +42,79 @@ func main() {
log.Panicf("Error creating kubeclient: %v", err)
}

initScheduler(k)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

initScheduler(ctx, k)

go func() {
x, err := k.WatchEventsImage(ctx)
if err != nil {
log.Panicf("Error watching events: %v", err)
}

for {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
select {
case <-ctx.Done():
return
case event, ok := <-x:
if !ok {
return
}

images, err := k.ListAllImages(ctx)
if err != nil {
log.Errorf("Error listing images: %v", err)
continue
}
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

for _, image := range images.Items {
an := annotations.New(ctx, &image)
switch an.Action().Get() {
case annotations.ActionReload:
// * Here is only if the yaml has been updated and the operator has detected it

log.Infof("Image configuration %s in namespace %s has changed", image.Name, image.Namespace)

for _, trigger := range image.Spec.Triggers {
switch trigger.Type {
case triggers.Crontab:
if ok, err := crontab.IsExistingJob(crontab.BuildKey(image.Namespace, image.Name)); err != nil || ok {
if err := crontab.RemoveJob(crontab.BuildKey(image.Namespace, image.Name)); err != nil {
log.Errorf("Error removing crontab: %v", err)
}
}
case triggers.Webhook:
log.Info("Webhook trigger not implemented yet")
}
}
an := annotations.New(ctx, event.Value)

// Remove the annotation annotations.AnnotationActionKey in the map[string]string
switch event.Type {
case "ADDED":
// Clean old action
an.Remove(annotations.KeyAction)

case annotations.ActionRefresh:
// * Here is only if the image has annotations.ActionRefresh
// Trigger the refresh of the image is deferred to the end of the loop to avoid Update kube API call
log.Infof("[Fire] Annotation trigger refresh for image %s in namespace %s", image.Name, image.Namespace)
_, err := triggers.Trigger(triggers.RefreshImage, image.Namespace, image.Name)
if err != nil {
log.Errorf("Error triggering event: %v", err)
setupTriggers(event.Value)
refreshIfRequired(an, *event.Value)
if err := setTagIfNotExists(ctx, an, event.Value); err != nil {
log.Errorf("Error setting tag: %v", err)
}
an.Remove(annotations.KeyAction)
}

if err := k.SetImage(ctx, image); err != nil {
log.Errorf("Error updating image: %v", err)
}
if err := k.SetImage(ctx, *event.Value); err != nil {
log.Errorf("Error updating image: %v", err)
}

// * Triggers
for _, trigger := range image.Spec.Triggers {
switch trigger.Type {
case triggers.Crontab:
if ok, err := crontab.IsExistingJob(crontab.BuildKey(image.Namespace, image.Name)); err != nil || !ok {
if err := crontab.AddCronTab(image.Namespace, image.Name, trigger.Value); err != nil {
log.Errorf("Error adding cronjob: %v", err)
case "MODIFIED":
switch an.Action().Get() { //nolint:gocritic
case annotations.ActionReload:

// * Here is only if the yaml has been updated and the operator has detected it
for _, trigger := range event.Value.Spec.Triggers {
switch trigger.Type {
case triggers.Crontab:
cleanTriggers(event.Value)
case triggers.Webhook:
log.Info("Webhook trigger not implemented yet")
}
}
case triggers.Webhook:
log.Info("Webhook trigger not implemented yet")

// Remove the annotation annotations.AnnotationActionKey in the map[string]string
an.Remove(annotations.KeyAction)
}

refreshIfRequired(an, *event.Value)

if err := k.SetImage(ctx, *event.Value); err != nil {
log.Errorf("Error updating image: %v", err)
}

setupTriggers(event.Value)

case "DELETED":
cleanTriggers(event.Value)
}
}

time.Sleep(2 * time.Second)
}
}()

<-c
cancel()
}
20 changes: 16 additions & 4 deletions cmd/kimup/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"sync"
"time"

"github.com/gookit/event"
Expand All @@ -17,16 +18,27 @@ import (
"github.com/orange-cloudavenue/kube-image-updater/internal/utils"
)

func initScheduler(k *kubeclient.Client) {
// Start Crontab client
crontab.New(context.Background())
type locks map[string]*sync.RWMutex

func initScheduler(ctx context.Context, k *kubeclient.Client) {
l := make(locks)

// Start Crontab client
crontab.New(ctx)
event.On(triggers.RefreshImage.String(), event.ListenerFunc(func(e event.Event) (err error) {
if l[e.Data()["namespace"].(string)+"/"+e.Data()["image"].(string)] == nil {
l[e.Data()["namespace"].(string)+"/"+e.Data()["image"].(string)] = &sync.RWMutex{}
}

// Lock the image to prevent concurrent refreshes
l[e.Data()["namespace"].(string)+"/"+e.Data()["image"].(string)].Lock()
defer l[e.Data()["namespace"].(string)+"/"+e.Data()["image"].(string)].Unlock()

retryErr := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
// TODO: implement image refresh
log.Infof("Refreshing image %s in namespace %s", e.Data()["image"], e.Data()["namespace"])

ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
defer cancel()

image, err := k.GetImage(ctx, e.Data()["namespace"].(string), e.Data()["image"].(string))
Expand Down
5 changes: 3 additions & 2 deletions cmd/operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,9 @@ func main() {
}

if err = (&controller.ImageReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("kimup-operator"),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Image")
os.Exit(1)
Expand Down
32 changes: 21 additions & 11 deletions internal/controller/image_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
Expand All @@ -32,9 +33,16 @@ import (
// ImageReconciler reconciles a Image object
type ImageReconciler struct {
client.Client
Scheme *runtime.Scheme
Scheme *runtime.Scheme
Recorder record.EventRecorder
}

type ImageEvent string

const (
ImageUpdate ImageEvent = "ImageUpdate"
)

// +kubebuilder:rbac:groups=kimup.cloudavenue.io,resources=images,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=kimup.cloudavenue.io,resources=images/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=kimup.cloudavenue.io,resources=images/finalizers,verbs=update
Expand Down Expand Up @@ -62,19 +70,21 @@ func (r *ImageReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl

log.Log.Info(fmt.Sprintf("Reconciling Image %s in namespace %s", req.Name, req.Namespace))

ok, err := an.CheckSum().IsEqual(image.Spec)
if err != nil || !ok {
equal, err := an.CheckSum().IsEqual(image.Spec)
if err != nil || !equal {
an.Action().Set(annotations.ActionReload)
r.Recorder.Event(&image, "Normal", string(ImageUpdate), "Image configuration has changed. Reloading image.")
}

if err := an.CheckSum().Set(image.Spec); err != nil {
log.Log.Error(err, "unable to refresh checksum")
return ctrl.Result{}, err
}

if err := r.Client.Update(ctx, &image); err != nil {
log.Log.Error(err, "unable to update Image")
return ctrl.Result{}, err
if an.CheckSum().IsNull() || !equal {
if err := an.CheckSum().Set(image.Spec); err != nil {
log.Log.Error(err, "unable to refresh checksum")
return ctrl.Result{}, err
}
if err := r.Client.Update(ctx, &image); err != nil {
log.Log.Error(err, "unable to update Image")
return ctrl.Result{}, err
}
}

// * Status
Expand Down
35 changes: 28 additions & 7 deletions internal/kubeclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,28 @@ func (c *Client) cImage() dynamic.NamespaceableResourceInterface {
})
}

type UnstructuredFunc interface {
UnstructuredContent() map[string]interface{}
}

func decodeUnstructured[T any](v UnstructuredFunc) (t T, err error) {
if err := runtime.DefaultUnstructuredConverter.
FromUnstructured(v.UnstructuredContent(), &t); err != nil {
return t, fmt.Errorf("failed to convert resource: %w", err)
}

return
}

func encodeUnstructured[T any](t T) (*unstructured.Unstructured, error) {
x, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&t)
if err != nil {
return nil, fmt.Errorf("failed to convert resource: %w", err)
}

return &unstructured.Unstructured{Object: x}, nil
}

func (c *Client) listImages(ctx context.Context, namespace string) (list v1alpha1.ImageList, err error) {
var v *unstructured.UnstructuredList

Expand All @@ -105,8 +127,8 @@ func (c *Client) listImages(ctx context.Context, namespace string) (list v1alpha
return list, fmt.Errorf("failed to list resources: %w", err)
}

if err := runtime.DefaultUnstructuredConverter.
FromUnstructured(v.UnstructuredContent(), &list); err != nil {
list, err = decodeUnstructured[v1alpha1.ImageList](v)
if err != nil {
return list, fmt.Errorf("failed to convert resource: %w", err)
}

Expand Down Expand Up @@ -138,8 +160,8 @@ func (c *Client) GetImage(ctx context.Context, namespace, name string) (image v1
return image, fmt.Errorf("failed to get resource: %w", err)
}

if err := runtime.DefaultUnstructuredConverter.
FromUnstructured(v.UnstructuredContent(), &image); err != nil {
image, err = decodeUnstructured[v1alpha1.Image](v)
if err != nil {
return image, fmt.Errorf("failed to convert resource: %w", err)
}

Expand All @@ -148,13 +170,12 @@ func (c *Client) GetImage(ctx context.Context, namespace, name string) (image v1

// SetImage sets an image in a namespace
func (c *Client) SetImage(ctx context.Context, image v1alpha1.Image) (err error) {
unstructedImage, err := runtime.DefaultUnstructuredConverter.
ToUnstructured(&image)
unstructedImage, err := encodeUnstructured(image)
if err != nil {
return fmt.Errorf("failed to convert resource: %w", err)
}

_, err = c.cImage().Namespace(image.Namespace).Update(ctx, &unstructured.Unstructured{Object: unstructedImage}, metav1.UpdateOptions{})
_, err = c.cImage().Namespace(image.Namespace).Update(ctx, unstructedImage, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("failed to update resource: %w", err)
}
Expand Down
Loading

0 comments on commit a52ec04

Please sign in to comment.