From d56fd523dd5444c807e950a4060ce5fbd8a1e8e3 Mon Sep 17 00:00:00 2001 From: Mickael Stanislas Date: Tue, 22 Oct 2024 09:49:43 +0200 Subject: [PATCH] chore: add event client --- cmd/admission-controller/main.go | 2 +- cmd/kimup/main.go | 2 +- cmd/kimup/scheduler.go | 11 +++++++ cmd/operator/main.go | 2 +- internal/kubeclient/client.go | 21 +++++++++++-- internal/kubeclient/image.go | 41 +++++++++++++++++++------ test/mocks/fakekubeclient/kubeclient.go | 4 +++ 7 files changed, 68 insertions(+), 15 deletions(-) diff --git a/cmd/admission-controller/main.go b/cmd/admission-controller/main.go index 571ea87..7ae5a48 100644 --- a/cmd/admission-controller/main.go +++ b/cmd/admission-controller/main.go @@ -67,7 +67,7 @@ func main() { signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL) // kubernetes golang library provide flag "kubeconfig" to specify the path to the kubeconfig file - kubeClient, err = client.New(flag.Lookup("kubeconfig").Value.String()) + kubeClient, err = client.New(flag.Lookup("kubeconfig").Value.String(), client.ComponentAdmissionController) if err != nil { log.WithError(err).Panicf("Error creating kubeclient") } diff --git a/cmd/kimup/main.go b/cmd/kimup/main.go index 51c25ee..63e0270 100644 --- a/cmd/kimup/main.go +++ b/cmd/kimup/main.go @@ -43,7 +43,7 @@ func main() { log.WithField("version", version).Info("Starting kimup", version) // kubernetes golang library provide flag "kubeconfig" to specify the path to the kubeconfig file - k, err := kubeclient.New(flag.Lookup("kubeconfig").Value.String()) + k, err := kubeclient.New(flag.Lookup("kubeconfig").Value.String(), kubeclient.ComponentController) if err != nil { log.WithError(err).Panic("Error creating kubeclient") } diff --git a/cmd/kimup/scheduler.go b/cmd/kimup/scheduler.go index 8124204..3b75d07 100644 --- a/cmd/kimup/scheduler.go +++ b/cmd/kimup/scheduler.go @@ -2,11 +2,13 @@ package main import ( "context" + "fmt" "sync" "time" "github.com/gookit/event" log "github.com/sirupsen/logrus" + corev1 "k8s.io/api/core/v1" "k8s.io/client-go/util/retry" "github.com/orange-cloudavenue/kube-image-updater/internal/actions" @@ -59,6 +61,7 @@ func initScheduler(ctx context.Context, k kubeclient.Interface) { } return err } + k.Image().Event(&image, corev1.EventTypeNormal, "Image update triggered", "") var auths kubeclient.K8sDockerRegistrySecretData @@ -107,10 +110,13 @@ func initScheduler(ctx context.Context, k kubeclient.Interface) { if err != nil { // Prometheus metrics - Increment the counter for the tags with error metrics.Tags().RequestErrorTotal.Inc() + k.Image().Event(&image, corev1.EventTypeWarning, "Fetch image tags", fmt.Sprintf("Error fetching tags: %v", err)) + log.WithError(err).Error("Error fetching tags") return err } metrics.Tags().AvailableSum.WithLabelValues(image.Spec.Image).Observe(float64(len(tagsAvailable))) + k.Image().Event(&image, corev1.EventTypeNormal, "Fetch image tags", fmt.Sprintf("Found %d tags", len(tagsAvailable))) log.Debugf("[RefreshImage] %d tags available for %s", len(tagsAvailable), image.Spec.Image) @@ -142,9 +148,12 @@ func initScheduler(ctx context.Context, k kubeclient.Interface) { metrics.Rules().EvaluatedErrorTotal.Inc() log.Errorf("Error evaluating rule: %v", err) + k.Image().Event(&image, corev1.EventTypeWarning, "Evaluate rule", fmt.Sprintf("Error evaluating rule %s: %v", rule.Type, err)) continue } + k.Image().Event(&image, corev1.EventTypeNormal, "Evaluate rule", fmt.Sprintf("Rule %s evaluated", rule.Type)) + if match { for _, action := range rule.Actions { a, err := actions.GetActionWithUntypedName(action.Type) @@ -173,8 +182,10 @@ func initScheduler(ctx context.Context, k kubeclient.Interface) { metrics.Actions().ExecutedErrorTotal.Inc() log.Errorf("Error executing action(%s): %v", action.Type, err) + k.Image().Event(&image, corev1.EventTypeWarning, "Execute action", fmt.Sprintf("Error executing action %s: %v", action.Type, err)) continue } + k.Image().Event(&image, corev1.EventTypeNormal, "Execute action", fmt.Sprintf("Action %s executed", action.Type)) } log.Debugf("[RefreshImage] Rule %s evaluated: %v -> %s", rule.Type, tag, newTag) } diff --git a/cmd/operator/main.go b/cmd/operator/main.go index c51c493..83d5b99 100644 --- a/cmd/operator/main.go +++ b/cmd/operator/main.go @@ -84,7 +84,7 @@ func main() { c <- syscall.SIGINT } - kubeAPIClient, err := kubeclient.NewFromRestConfig(ctrl.GetConfigOrDie()) + kubeAPIClient, err := kubeclient.NewFromRestConfig(ctrl.GetConfigOrDie(), kubeclient.ComponentOperator) if err != nil { log.WithError(err).Error("unable to create kubeclient") c <- syscall.SIGINT diff --git a/internal/kubeclient/client.go b/internal/kubeclient/client.go index fbe11cb..f92d1b4 100644 --- a/internal/kubeclient/client.go +++ b/internal/kubeclient/client.go @@ -22,6 +22,7 @@ var _ Interface = &Client{} type ( Client struct { + component component kubernetes.Interface d dynamic.Interface } @@ -36,12 +37,21 @@ type ( DynamicResource(resource schema.GroupVersionResource) dynamic.NamespaceableResourceInterface GetPullSecretsForImage(ctx context.Context, image v1alpha1.Image) (auths K8sDockerRegistrySecretData, err error) GetValueOrValueFrom(ctx context.Context, namespace string, v v1alpha1.ValueOrValueFrom) (any, error) + GetComponent() string } InterfaceKimup interface { Image() *ImageObj Alert() *AlertObj } + + component string +) + +const ( + ComponentOperator component = "kimup-operator" + ComponentController component = "kimup-controller" + ComponentAdmissionController component = "kimup-admission-controller" ) func init() { @@ -52,17 +62,17 @@ func init() { // New creates a new kubernetes client // kubeConfigPath is the path to the kubeconfig file (empty for in-cluster) -func New(kubeConfigPath string) (Interface, error) { +func New(kubeConfigPath string, c component) (Interface, error) { config, err := getConfig(kubeConfigPath) if err != nil { return nil, err } - return NewFromRestConfig(config) + return NewFromRestConfig(config, c) } // NewFromRestConfig creates a new kubernetes client from a rest config -func NewFromRestConfig(config *rest.Config) (*Client, error) { +func NewFromRestConfig(config *rest.Config, c component) (*Client, error) { client, err := kubernetes.NewForConfig(config) if err != nil { return nil, err @@ -76,6 +86,7 @@ func NewFromRestConfig(config *rest.Config) (*Client, error) { return &Client{ Interface: client, d: dynamicClient, + component: c, }, nil } @@ -138,3 +149,7 @@ func (c *Client) GetPullSecretsForImage(ctx context.Context, image v1alpha1.Imag return auths, nil } + +func (c *Client) GetComponent() string { + return string(c.component) +} diff --git a/internal/kubeclient/image.go b/internal/kubeclient/image.go index bb8badc..fb74efd 100644 --- a/internal/kubeclient/image.go +++ b/internal/kubeclient/image.go @@ -4,10 +4,15 @@ import ( "context" "fmt" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/dynamic" + typedv1core "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/record" "github.com/orange-cloudavenue/kube-image-updater/api/v1alpha1" "github.com/orange-cloudavenue/kube-image-updater/internal/log" @@ -16,6 +21,7 @@ import ( type ( ImageObj struct { InterfaceKubernetes + record.EventRecorder imageClient dynamic.NamespaceableResourceInterface } ) @@ -26,14 +32,31 @@ func (c *Client) Image() *ImageObj { } func NewImage(k InterfaceKubernetes) *ImageObj { - return &ImageObj{ + i := &ImageObj{ InterfaceKubernetes: k, imageClient: k.DynamicResource(schema.GroupVersionResource{ Group: v1alpha1.GroupVersion.Group, Version: v1alpha1.GroupVersion.Version, Resource: "images", }), + EventRecorder: nil, } + + i.newRecorder() + + return i +} + +func (i *ImageObj) newRecorder() { + scheme := runtime.NewScheme() + utilruntime.Must(v1alpha1.AddToScheme(scheme)) + + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartStructuredLogging(4) + eventBroadcaster.StartRecordingToSink(&typedv1core.EventSinkImpl{Interface: i.CoreV1().Events("")}) + i.EventRecorder = eventBroadcaster.NewRecorder(scheme, v1.EventSource{ + Component: i.GetComponent(), + }) } // Get retrieves an Image object by its name within the specified namespace. @@ -41,7 +64,7 @@ func NewImage(k InterfaceKubernetes) *ImageObj { // If the Image is found, it returns a pointer to the Image object and a nil error. // If there is an error during the retrieval process, it returns nil and the error encountered. func (i *ImageObj) Get(ctx context.Context, namespace, name string) (v1alpha1.Image, error) { - u, err := i.imageClient.Namespace(namespace).Get(ctx, name, v1.GetOptions{}) + u, err := i.imageClient.Namespace(namespace).Get(ctx, name, metav1.GetOptions{}) if err != nil { return v1alpha1.Image{}, err } @@ -52,14 +75,14 @@ func (i *ImageObj) Get(ctx context.Context, namespace, name string) (v1alpha1.Im // List retrieves a list of images from the specified namespace. // It takes a context, the namespace as a string, and list options. // Returns a pointer to a List of images and an error if the operation fails. -func (i *ImageObj) List(ctx context.Context, namespace string, opts v1.ListOptions) (v1alpha1.ImageList, error) { +func (i *ImageObj) List(ctx context.Context, namespace string, opts metav1.ListOptions) (v1alpha1.ImageList, error) { return i.listImages(ctx, namespace, opts) } // ListAll retrieves a list of images from all namespaces. // It takes a context and list options as parameters. // Returns a pointer to a List of images and an error if the operation fails. -func (i *ImageObj) ListAll(ctx context.Context, opts v1.ListOptions) (v1alpha1.ImageList, error) { +func (i *ImageObj) ListAll(ctx context.Context, opts metav1.ListOptions) (v1alpha1.ImageList, error) { return i.listImages(ctx, "", opts) } @@ -67,7 +90,7 @@ func (i *ImageObj) ListAll(ctx context.Context, opts v1.ListOptions) (v1alpha1.I // It takes a context and a namespace as parameters. // if namespace is empty, it lists all images in all namespaces. // Returns a pointer to a List of images and an error if the operation fails. -func (i *ImageObj) listImages(ctx context.Context, namespace string, opts v1.ListOptions) (v1alpha1.ImageList, error) { +func (i *ImageObj) listImages(ctx context.Context, namespace string, opts metav1.ListOptions) (v1alpha1.ImageList, error) { var ( err error u *unstructured.UnstructuredList @@ -100,7 +123,7 @@ func (i *ImageObj) Update(ctx context.Context, image v1alpha1.Image) error { return err } - _, err = i.imageClient.Namespace(image.Namespace).Update(ctx, u, v1.UpdateOptions{}) + _, err = i.imageClient.Namespace(image.Namespace).Update(ctx, u, metav1.UpdateOptions{}) if err != nil { return err } @@ -112,7 +135,7 @@ func (i *ImageObj) Update(ctx context.Context, image v1alpha1.Image) error { // It takes a context and the image name as parameters. // Returns a pointer to the Image object and an error if the operation fails. func (i *ImageObj) Find(ctx context.Context, namespace, imageName string) (v1alpha1.Image, error) { - images, err := i.listImages(ctx, namespace, v1.ListOptions{}) + images, err := i.listImages(ctx, namespace, metav1.ListOptions{}) if err != nil { return v1alpha1.Image{}, err } @@ -130,7 +153,7 @@ func (i *ImageObj) Find(ctx context.Context, namespace, imageName string) (v1alp // It takes a context and the namespace as parameters. // Returns a channel of WatchInterface[v1alpha1.Image] and an error if the operation fails. func (i *ImageObj) Watch(ctx context.Context) (chan WatchInterface[v1alpha1.Image], error) { - x, err := i.imageClient.Watch(ctx, v1.ListOptions{}) + x, err := i.imageClient.Watch(ctx, metav1.ListOptions{}) if err != nil { return nil, err } diff --git a/test/mocks/fakekubeclient/kubeclient.go b/test/mocks/fakekubeclient/kubeclient.go index 535fa6d..721351c 100644 --- a/test/mocks/fakekubeclient/kubeclient.go +++ b/test/mocks/fakekubeclient/kubeclient.go @@ -44,6 +44,10 @@ func (f *FakeKubeClient) GetValueOrValueFrom(ctx context.Context, namespace stri return args.Get(0), args.Error(1) } +func (f *FakeKubeClient) GetComponent() string { + return "unit-test" +} + func (f *FakeKubeClient) Image() *kubeclient.ImageObj { return kubeclient.NewImage(f) }