Skip to content

Commit

Permalink
chore: add event client
Browse files Browse the repository at this point in the history
  • Loading branch information
azrod committed Oct 22, 2024
1 parent d55cd0f commit d56fd52
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 15 deletions.
2 changes: 1 addition & 1 deletion cmd/admission-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func main() {
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL)

// kubernetes golang library provide flag "kubeconfig" to specify the path to the kubeconfig file
kubeClient, err = client.New(flag.Lookup("kubeconfig").Value.String())
kubeClient, err = client.New(flag.Lookup("kubeconfig").Value.String(), client.ComponentAdmissionController)
if err != nil {
log.WithError(err).Panicf("Error creating kubeclient")
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/kimup/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func main() {
log.WithField("version", version).Info("Starting kimup", version)

// kubernetes golang library provide flag "kubeconfig" to specify the path to the kubeconfig file
k, err := kubeclient.New(flag.Lookup("kubeconfig").Value.String())
k, err := kubeclient.New(flag.Lookup("kubeconfig").Value.String(), kubeclient.ComponentController)
if err != nil {
log.WithError(err).Panic("Error creating kubeclient")
}
Expand Down
11 changes: 11 additions & 0 deletions cmd/kimup/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package main

import (
"context"
"fmt"
"sync"
"time"

"github.com/gookit/event"
log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/util/retry"

"github.com/orange-cloudavenue/kube-image-updater/internal/actions"
Expand Down Expand Up @@ -59,6 +61,7 @@ func initScheduler(ctx context.Context, k kubeclient.Interface) {
}
return err
}
k.Image().Event(&image, corev1.EventTypeNormal, "Image update triggered", "")

var auths kubeclient.K8sDockerRegistrySecretData

Expand Down Expand Up @@ -107,10 +110,13 @@ func initScheduler(ctx context.Context, k kubeclient.Interface) {
if err != nil {
// Prometheus metrics - Increment the counter for the tags with error
metrics.Tags().RequestErrorTotal.Inc()
k.Image().Event(&image, corev1.EventTypeWarning, "Fetch image tags", fmt.Sprintf("Error fetching tags: %v", err))
log.WithError(err).Error("Error fetching tags")
return err
}

metrics.Tags().AvailableSum.WithLabelValues(image.Spec.Image).Observe(float64(len(tagsAvailable)))
k.Image().Event(&image, corev1.EventTypeNormal, "Fetch image tags", fmt.Sprintf("Found %d tags", len(tagsAvailable)))

log.Debugf("[RefreshImage] %d tags available for %s", len(tagsAvailable), image.Spec.Image)

Expand Down Expand Up @@ -142,9 +148,12 @@ func initScheduler(ctx context.Context, k kubeclient.Interface) {
metrics.Rules().EvaluatedErrorTotal.Inc()

log.Errorf("Error evaluating rule: %v", err)
k.Image().Event(&image, corev1.EventTypeWarning, "Evaluate rule", fmt.Sprintf("Error evaluating rule %s: %v", rule.Type, err))
continue
}

k.Image().Event(&image, corev1.EventTypeNormal, "Evaluate rule", fmt.Sprintf("Rule %s evaluated", rule.Type))

if match {
for _, action := range rule.Actions {
a, err := actions.GetActionWithUntypedName(action.Type)
Expand Down Expand Up @@ -173,8 +182,10 @@ func initScheduler(ctx context.Context, k kubeclient.Interface) {
metrics.Actions().ExecutedErrorTotal.Inc()

log.Errorf("Error executing action(%s): %v", action.Type, err)
k.Image().Event(&image, corev1.EventTypeWarning, "Execute action", fmt.Sprintf("Error executing action %s: %v", action.Type, err))
continue
}
k.Image().Event(&image, corev1.EventTypeNormal, "Execute action", fmt.Sprintf("Action %s executed", action.Type))
}
log.Debugf("[RefreshImage] Rule %s evaluated: %v -> %s", rule.Type, tag, newTag)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func main() {
c <- syscall.SIGINT
}

kubeAPIClient, err := kubeclient.NewFromRestConfig(ctrl.GetConfigOrDie())
kubeAPIClient, err := kubeclient.NewFromRestConfig(ctrl.GetConfigOrDie(), kubeclient.ComponentOperator)
if err != nil {
log.WithError(err).Error("unable to create kubeclient")
c <- syscall.SIGINT
Expand Down
21 changes: 18 additions & 3 deletions internal/kubeclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ var _ Interface = &Client{}

type (
Client struct {
component component
kubernetes.Interface
d dynamic.Interface
}
Expand All @@ -36,12 +37,21 @@ type (
DynamicResource(resource schema.GroupVersionResource) dynamic.NamespaceableResourceInterface
GetPullSecretsForImage(ctx context.Context, image v1alpha1.Image) (auths K8sDockerRegistrySecretData, err error)
GetValueOrValueFrom(ctx context.Context, namespace string, v v1alpha1.ValueOrValueFrom) (any, error)
GetComponent() string
}

InterfaceKimup interface {
Image() *ImageObj
Alert() *AlertObj
}

component string
)

const (
ComponentOperator component = "kimup-operator"
ComponentController component = "kimup-controller"
ComponentAdmissionController component = "kimup-admission-controller"
)

func init() {
Expand All @@ -52,17 +62,17 @@ func init() {

// New creates a new kubernetes client
// kubeConfigPath is the path to the kubeconfig file (empty for in-cluster)
func New(kubeConfigPath string) (Interface, error) {
func New(kubeConfigPath string, c component) (Interface, error) {
config, err := getConfig(kubeConfigPath)
if err != nil {
return nil, err
}

return NewFromRestConfig(config)
return NewFromRestConfig(config, c)
}

// NewFromRestConfig creates a new kubernetes client from a rest config
func NewFromRestConfig(config *rest.Config) (*Client, error) {
func NewFromRestConfig(config *rest.Config, c component) (*Client, error) {
client, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
Expand All @@ -76,6 +86,7 @@ func NewFromRestConfig(config *rest.Config) (*Client, error) {
return &Client{
Interface: client,
d: dynamicClient,
component: c,
}, nil
}

Expand Down Expand Up @@ -138,3 +149,7 @@ func (c *Client) GetPullSecretsForImage(ctx context.Context, image v1alpha1.Imag

return auths, nil
}

func (c *Client) GetComponent() string {
return string(c.component)
}
41 changes: 32 additions & 9 deletions internal/kubeclient/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,15 @@ import (
"context"
"fmt"

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/dynamic"
typedv1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"

"github.com/orange-cloudavenue/kube-image-updater/api/v1alpha1"
"github.com/orange-cloudavenue/kube-image-updater/internal/log"
Expand All @@ -16,6 +21,7 @@ import (
type (
ImageObj struct {
InterfaceKubernetes
record.EventRecorder
imageClient dynamic.NamespaceableResourceInterface
}
)
Expand All @@ -26,22 +32,39 @@ func (c *Client) Image() *ImageObj {
}

func NewImage(k InterfaceKubernetes) *ImageObj {
return &ImageObj{
i := &ImageObj{
InterfaceKubernetes: k,
imageClient: k.DynamicResource(schema.GroupVersionResource{
Group: v1alpha1.GroupVersion.Group,
Version: v1alpha1.GroupVersion.Version,
Resource: "images",
}),
EventRecorder: nil,
}

i.newRecorder()

return i
}

func (i *ImageObj) newRecorder() {
scheme := runtime.NewScheme()
utilruntime.Must(v1alpha1.AddToScheme(scheme))

eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartStructuredLogging(4)
eventBroadcaster.StartRecordingToSink(&typedv1core.EventSinkImpl{Interface: i.CoreV1().Events("")})
i.EventRecorder = eventBroadcaster.NewRecorder(scheme, v1.EventSource{
Component: i.GetComponent(),
})
}

// Get retrieves an Image object by its name within the specified namespace.
// It takes a context, the namespace, and the name of the Image as parameters.
// If the Image is found, it returns a pointer to the Image object and a nil error.
// If there is an error during the retrieval process, it returns nil and the error encountered.
func (i *ImageObj) Get(ctx context.Context, namespace, name string) (v1alpha1.Image, error) {
u, err := i.imageClient.Namespace(namespace).Get(ctx, name, v1.GetOptions{})
u, err := i.imageClient.Namespace(namespace).Get(ctx, name, metav1.GetOptions{})
if err != nil {
return v1alpha1.Image{}, err
}
Expand All @@ -52,22 +75,22 @@ func (i *ImageObj) Get(ctx context.Context, namespace, name string) (v1alpha1.Im
// List retrieves a list of images from the specified namespace.
// It takes a context, the namespace as a string, and list options.
// Returns a pointer to a List of images and an error if the operation fails.
func (i *ImageObj) List(ctx context.Context, namespace string, opts v1.ListOptions) (v1alpha1.ImageList, error) {
func (i *ImageObj) List(ctx context.Context, namespace string, opts metav1.ListOptions) (v1alpha1.ImageList, error) {
return i.listImages(ctx, namespace, opts)
}

// ListAll retrieves a list of images from all namespaces.
// It takes a context and list options as parameters.
// Returns a pointer to a List of images and an error if the operation fails.
func (i *ImageObj) ListAll(ctx context.Context, opts v1.ListOptions) (v1alpha1.ImageList, error) {
func (i *ImageObj) ListAll(ctx context.Context, opts metav1.ListOptions) (v1alpha1.ImageList, error) {
return i.listImages(ctx, "", opts)
}

// listImages lists all images
// It takes a context and a namespace as parameters.
// if namespace is empty, it lists all images in all namespaces.
// Returns a pointer to a List of images and an error if the operation fails.
func (i *ImageObj) listImages(ctx context.Context, namespace string, opts v1.ListOptions) (v1alpha1.ImageList, error) {
func (i *ImageObj) listImages(ctx context.Context, namespace string, opts metav1.ListOptions) (v1alpha1.ImageList, error) {
var (
err error
u *unstructured.UnstructuredList
Expand Down Expand Up @@ -100,7 +123,7 @@ func (i *ImageObj) Update(ctx context.Context, image v1alpha1.Image) error {
return err
}

_, err = i.imageClient.Namespace(image.Namespace).Update(ctx, u, v1.UpdateOptions{})
_, err = i.imageClient.Namespace(image.Namespace).Update(ctx, u, metav1.UpdateOptions{})
if err != nil {
return err
}
Expand All @@ -112,7 +135,7 @@ func (i *ImageObj) Update(ctx context.Context, image v1alpha1.Image) error {
// It takes a context and the image name as parameters.
// Returns a pointer to the Image object and an error if the operation fails.
func (i *ImageObj) Find(ctx context.Context, namespace, imageName string) (v1alpha1.Image, error) {
images, err := i.listImages(ctx, namespace, v1.ListOptions{})
images, err := i.listImages(ctx, namespace, metav1.ListOptions{})
if err != nil {
return v1alpha1.Image{}, err
}
Expand All @@ -130,7 +153,7 @@ func (i *ImageObj) Find(ctx context.Context, namespace, imageName string) (v1alp
// It takes a context and the namespace as parameters.
// Returns a channel of WatchInterface[v1alpha1.Image] and an error if the operation fails.
func (i *ImageObj) Watch(ctx context.Context) (chan WatchInterface[v1alpha1.Image], error) {
x, err := i.imageClient.Watch(ctx, v1.ListOptions{})
x, err := i.imageClient.Watch(ctx, metav1.ListOptions{})
if err != nil {
return nil, err
}
Expand Down
4 changes: 4 additions & 0 deletions test/mocks/fakekubeclient/kubeclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ func (f *FakeKubeClient) GetValueOrValueFrom(ctx context.Context, namespace stri
return args.Get(0), args.Error(1)
}

func (f *FakeKubeClient) GetComponent() string {
return "unit-test"
}

func (f *FakeKubeClient) Image() *kubeclient.ImageObj {
return kubeclient.NewImage(f)
}
Expand Down

0 comments on commit d56fd52

Please sign in to comment.