From f08c333e7e4e4e19e934c5b23b38da87f6359383 Mon Sep 17 00:00:00 2001 From: ushabelgur Date: Tue, 13 Aug 2024 13:57:07 +0530 Subject: [PATCH] Implement ListEvents Runtime Interface to Support Cross-Cluster Events (#570) --- cmd/volumeprovider/app/app.go | 17 ++ go.mod | 10 +- go.sum | 12 +- internal/controllers/image_controller.go | 17 ++ internal/event/recorder/event_recorder.go | 154 ++++++++++++++++ .../event/recorder/event_recorder_test.go | 174 ++++++++++++++++++ internal/volumeserver/event_list.go | 45 +++++ internal/volumeserver/server.go | 17 +- 8 files changed, 429 insertions(+), 17 deletions(-) create mode 100644 internal/event/recorder/event_recorder.go create mode 100644 internal/event/recorder/event_recorder_test.go create mode 100644 internal/volumeserver/event_list.go diff --git a/cmd/volumeprovider/app/app.go b/cmd/volumeprovider/app/app.go index 305e1ffd..12233b8f 100644 --- a/cmd/volumeprovider/app/app.go +++ b/cmd/volumeprovider/app/app.go @@ -17,6 +17,7 @@ import ( "github.com/ironcore-dev/ceph-provider/internal/controllers" "github.com/ironcore-dev/ceph-provider/internal/encryption" "github.com/ironcore-dev/ceph-provider/internal/event" + eventrecorder "github.com/ironcore-dev/ceph-provider/internal/event/recorder" "github.com/ironcore-dev/ceph-provider/internal/omap" "github.com/ironcore-dev/ceph-provider/internal/strategy" "github.com/ironcore-dev/ceph-provider/internal/vcr" @@ -55,6 +56,8 @@ type CephOptions struct { PopulatorBufferSize int64 KeyEncryptionKeyPath string + + VolumeEventStoreOptions eventrecorder.EventStoreOptions } func (o *Options) Defaults() { @@ -82,6 +85,9 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&o.Ceph.Pool, "ceph-pool", o.Ceph.Pool, "Ceph pool which is used to store objects.") fs.StringVar(&o.Ceph.Client, "ceph-client", o.Ceph.Client, "Ceph client which grants access to pools/images eg. 'client.volumes'") fs.StringVar(&o.Ceph.KeyEncryptionKeyPath, "ceph-kek-path", o.Ceph.KeyEncryptionKeyPath, "path to the key encryption key file (32 Bit - KEK) to encrypt volume keys.") + fs.IntVar(&o.Ceph.VolumeEventStoreOptions.MaxEvents, "volume-event-max-events", 100, "Maximum number of volume events that can be stored.") + fs.DurationVar(&o.Ceph.VolumeEventStoreOptions.EventTTL, "volume-event-ttl", 5*time.Minute, "Time to live for volume events.") + fs.DurationVar(&o.Ceph.VolumeEventStoreOptions.EventResyncInterval, "volume-event-resync-interval", 1*time.Minute, "Interval for resynchronizing the volume events.") } func (o *Options) MarkFlagsRequired(cmd *cobra.Command) { @@ -234,11 +240,14 @@ func Run(ctx context.Context, opts Options) error { return fmt.Errorf("failed to initialize docker registry: %w", err) } + volumeEventStore := eventrecorder.NewEventStore(log, opts.Ceph.VolumeEventStoreOptions) + imageReconciler, err := controllers.NewImageReconciler( log.WithName("image-reconciler"), conn, reg, imageStore, snapshotStore, + volumeEventStore, imageEvents, snapshotEvents, encryptor, @@ -304,6 +313,13 @@ func Run(ctx context.Context, opts Options) error { } }() + wg.Add(1) + go func() { + defer wg.Done() + setupLog.Info("Starting volume events garbage collector") + volumeEventStore.Start(ctx) + }() + supportedClasses, err := vcr.LoadVolumeClassesFile(opts.PathSupportedVolumeClasses) if err != nil { return fmt.Errorf("failed to load supported volume classes: %w", err) @@ -326,6 +342,7 @@ func Run(ctx context.Context, opts Options) error { encryptor, cephCommandClient, volumeserver.Options{ + VolumeEventStore: volumeEventStore, BurstFactor: opts.Ceph.BurstFactor, BurstDurationInSeconds: opts.Ceph.BurstDurationInSeconds, }, diff --git a/go.mod b/go.mod index b167a462..c5a114f2 100644 --- a/go.mod +++ b/go.mod @@ -1,14 +1,15 @@ module github.com/ironcore-dev/ceph-provider -go 1.22.1 +go 1.22.2 require ( github.com/ceph/go-ceph v0.28.0 github.com/containerd/containerd v1.7.20 github.com/go-logr/logr v1.4.2 + github.com/gogo/protobuf v1.3.2 github.com/google/addlicense v1.1.1 github.com/ironcore-dev/controller-utils v0.9.3 - github.com/ironcore-dev/ironcore v0.1.2-0.20231120144059-30dd02e88870 + github.com/ironcore-dev/ironcore v0.1.3-0.20240724061641-c2f923ef40eb github.com/ironcore-dev/ironcore-image v0.2.1 github.com/kube-object-storage/lib-bucket-provisioner v0.0.0-20221122204822-d1a8c34382f1 github.com/onsi/ginkgo/v2 v2.19.1 @@ -57,7 +58,6 @@ require ( github.com/go-openapi/jsonreference v0.21.0 // indirect github.com/go-openapi/swag v0.23.0 // indirect github.com/go-task/slim-sprig/v3 v3.0.0 // indirect - github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/google/gnostic-models v0.6.8 // indirect @@ -107,7 +107,7 @@ require ( go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.45.0 // indirect go.opentelemetry.io/otel v1.26.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.26.0 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.19.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0 // indirect go.opentelemetry.io/otel/metric v1.26.0 // indirect go.opentelemetry.io/otel/sdk v1.26.0 // indirect go.opentelemetry.io/otel/trace v1.26.0 // indirect @@ -134,7 +134,7 @@ require ( k8s.io/apiextensions-apiserver v0.30.1 // indirect k8s.io/apiserver v0.30.1 // indirect k8s.io/component-base v0.30.1 // indirect - k8s.io/klog/v2 v2.120.1 // indirect + k8s.io/klog/v2 v2.130.1 // indirect k8s.io/kube-openapi v0.0.0-20240322212309-b815d8309940 // indirect oras.land/oras-go v1.2.5 // indirect sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.29.0 // indirect diff --git a/go.sum b/go.sum index d64223a8..7269f6d5 100644 --- a/go.sum +++ b/go.sum @@ -565,8 +565,8 @@ github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2 github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/ironcore-dev/controller-utils v0.9.3 h1:sTrnxSzX5RrLf4B8KrAH2axSC+gxfJXphkV6df2GSsw= github.com/ironcore-dev/controller-utils v0.9.3/go.mod h1:djKnxDs0Hwxhhc0VmVY8tZnrOrElvrRV2jov/LiCZ2Y= -github.com/ironcore-dev/ironcore v0.1.2-0.20231120144059-30dd02e88870 h1:yAFYucF/zthz9zdgHLbaQ5q9Ee0hFfbNHtxflm/lU5o= -github.com/ironcore-dev/ironcore v0.1.2-0.20231120144059-30dd02e88870/go.mod h1:+rMMEH846ihqB+P55EX/NIvKQs9QDjcIurhRoR3jorU= +github.com/ironcore-dev/ironcore v0.1.3-0.20240724061641-c2f923ef40eb h1:eiZSYhN3tW7P0lN007/vs6hPDzzQx9YVAAVh2QMBU0Y= +github.com/ironcore-dev/ironcore v0.1.3-0.20240724061641-c2f923ef40eb/go.mod h1:OAq8mJxuR76Lq7HQDMwB11cWJrb/76nX1wBrXQfMrOw= github.com/ironcore-dev/ironcore-image v0.2.1 h1:7LsIftIRX5btnSieo1J+xUSnW1tSIsGzYgD6NnpObyY= github.com/ironcore-dev/ironcore-image v0.2.1/go.mod h1:BNaacvN5++9zGiTDJea4vvGDwHvPJE6S9Xb3G7hsFQU= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= @@ -893,8 +893,8 @@ go.opentelemetry.io/otel v1.26.0 h1:LQwgL5s/1W7YiiRwxf03QGnWLb2HW4pLiAhaA5cZXBs= go.opentelemetry.io/otel v1.26.0/go.mod h1:UmLkJHUAidDval2EICqBMbnAd0/m2vmpf/dAM+fvFs4= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.26.0 h1:1u/AyyOqAWzy+SkPxDpahCNZParHV8Vid1RnI2clyDE= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.26.0/go.mod h1:z46paqbJ9l7c9fIPCXTqTGwhQZ5XoTIsfeFYWboizjs= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.19.0 h1:3d+S281UTjM+AbF31XSOYn1qXn3BgIdWl8HNEpx08Jk= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.19.0/go.mod h1:0+KuTDyKL4gjKCF75pHOX4wuzYDUZYfAQdSu43o+Z2I= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0 h1:tIqheXEFWAZ7O8A7m+J0aPTmpJN3YQ7qetUAdkkkKpk= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0/go.mod h1:nUeKExfxAQVbiVFn32YXpXZZHZ61Cc3s3Rn1pDBGAb0= go.opentelemetry.io/otel/metric v1.26.0 h1:7S39CLuY5Jgg9CrnA9HHiEjGMF/X2VHvoXGgSllRz30= go.opentelemetry.io/otel/metric v1.26.0/go.mod h1:SY+rHOI4cEawI9a7N1A4nIg/nTQXe1ccCNWYOJUrpX4= go.opentelemetry.io/otel/sdk v1.26.0 h1:Y7bumHf5tAiDlRYFmGqetNcLaVUZmh4iYfmGxtmz7F8= @@ -1601,8 +1601,8 @@ k8s.io/klog/v2 v2.4.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y= k8s.io/klog/v2 v2.30.0/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= k8s.io/klog/v2 v2.60.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= k8s.io/klog/v2 v2.80.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= -k8s.io/klog/v2 v2.120.1 h1:QXU6cPEOIslTGvZaXvFWiP9VKyeet3sawzTOvdXb4Vw= -k8s.io/klog/v2 v2.120.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= +k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= +k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= k8s.io/kube-openapi v0.0.0-20180731170545-e3762e86a74c/go.mod h1:BXM9ceUBTj2QnfH2MK1odQs778ajze1RxcmP6S8RVVc= k8s.io/kube-openapi v0.0.0-20200410145947-61e04a5be9a6/go.mod h1:GRQhZsXIAJ1xR0C9bd8UpWHZ5plfAS9fzPjJuQ6JL3E= k8s.io/kube-openapi v0.0.0-20200805222855-6aeccd4b50c6/go.mod h1:UuqjUnNftUyPE5H64/qeyjQoUZhGpeFDVdxjTeEVN2o= diff --git a/internal/controllers/image_controller.go b/internal/controllers/image_controller.go index 3b48f876..9cf14232 100644 --- a/internal/controllers/image_controller.go +++ b/internal/controllers/image_controller.go @@ -20,10 +20,12 @@ import ( providerapi "github.com/ironcore-dev/ceph-provider/api" "github.com/ironcore-dev/ceph-provider/internal/encryption" "github.com/ironcore-dev/ceph-provider/internal/event" + eventrecorder "github.com/ironcore-dev/ceph-provider/internal/event/recorder" "github.com/ironcore-dev/ceph-provider/internal/round" "github.com/ironcore-dev/ceph-provider/internal/store" "github.com/ironcore-dev/ceph-provider/internal/utils" "github.com/ironcore-dev/ironcore-image/oci/image" + corev1 "k8s.io/api/core/v1" "k8s.io/client-go/util/workqueue" "k8s.io/utils/ptr" ) @@ -46,6 +48,7 @@ func NewImageReconciler( registry image.Source, images store.Store[*providerapi.Image], snapshots store.Store[*providerapi.Snapshot], + eventRecorder eventrecorder.EventRecorder, imageEvents event.Source[*providerapi.Image], snapshotEvents event.Source[*providerapi.Snapshot], keyEncryption encryption.Encryptor, @@ -98,6 +101,7 @@ func NewImageReconciler( queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), images: images, snapshots: snapshots, + EventRecorder: eventRecorder, imageEvents: imageEvents, snapshotEvents: snapshotEvents, monitors: opts.Monitors, @@ -117,6 +121,7 @@ type ImageReconciler struct { images store.Store[*providerapi.Image] snapshots store.Store[*providerapi.Snapshot] + eventrecorder.EventRecorder imageEvents event.Source[*providerapi.Image] snapshotEvents event.Source[*providerapi.Snapshot] @@ -156,6 +161,7 @@ func (r *ImageReconciler) Start(ctx context.Context) error { for _, img := range imageList { if snapshotRef := img.Spec.SnapshotRef; snapshotRef != nil && *snapshotRef == evt.Object.ID { + r.Eventf(log, img.Metadata, corev1.EventTypeNormal, "PulledImage", "Pulled image %s", *img.Spec.SnapshotRef) r.queue.Add(img.ID) } } @@ -226,6 +232,7 @@ func (r *ImageReconciler) deleteImage(ctx context.Context, log logr.Logger, ioCt if _, err := r.images.Update(ctx, image); store.IgnoreErrNotFound(err) != nil { return fmt.Errorf("failed to update image metadata: %w", err) } + r.Eventf(log, image.Metadata, corev1.EventTypeNormal, "CompletedDeletion", "Image deletion completed") log.V(2).Info("Removed Finalizers") return nil @@ -282,6 +289,7 @@ func (r *ImageReconciler) reconcileSnapshot(ctx context.Context, log logr.Logger if err != nil { switch { case errors.Is(err, store.ErrNotFound): + r.Eventf(log, img.Metadata, corev1.EventTypeNormal, "CreateImageSnapshot", "Image snapshot was not found. Creating new snapshot") snap, err = r.snapshots.Create(ctx, &providerapi.Snapshot{ Metadata: providerapi.Metadata{ ID: snapshotDigest, @@ -294,6 +302,7 @@ func (r *ImageReconciler) reconcileSnapshot(ctx context.Context, log logr.Logger }, }) if err != nil { + r.Eventf(log, img.Metadata, corev1.EventTypeWarning, "CreateImageSnapshot", "Create image snapshot failed with error: %s", err) return fmt.Errorf("failed to create snapshot: %w", err) } default: @@ -307,6 +316,7 @@ func (r *ImageReconciler) reconcileSnapshot(ctx context.Context, log logr.Logger return fmt.Errorf("failed to update image snapshot ref: %w", err) } + r.Eventf(log, img.Metadata, corev1.EventTypeNormal, "UpdatedImageSnapshotRef", "Updated image snapshot ref: %s", *img.Spec.SnapshotRef) return nil } @@ -349,6 +359,7 @@ func (r *ImageReconciler) updateImage(ctx context.Context, log logr.Logger, ioCt log.V(2).Info("No update needed: Old and new image size same") return nil case requestedSize < currentImageSize: + r.Eventf(log, image.Metadata, corev1.EventTypeWarning, "UpdateImageSize", "Failed to shrink image: not supported") return fmt.Errorf("failed to shrink image: not supported") } @@ -356,6 +367,7 @@ func (r *ImageReconciler) updateImage(ctx context.Context, log logr.Logger, ioCt return fmt.Errorf("failed to resize image: %w", err) } + r.Eventf(log, image.Metadata, corev1.EventTypeNormal, "UpdatedImageSize", "Image size changed. requestedSize: %d currentSize: %d", requestedSize, currentImageSize) log.V(1).Info("Updated image", "requestedSize", requestedSize, "currentSize", currentImageSize) return nil } @@ -442,6 +454,7 @@ func (r *ImageReconciler) reconcileImage(ctx context.Context, id string) error { } if err := r.setEncryptionHeader(ctx, log, ioCtx, img); err != nil { + r.Eventf(log, img.Metadata, corev1.EventTypeWarning, "SetEncryptionFormat", "Set encryption header failed with error: %s", err) return fmt.Errorf("failed to set encryption header: %w", err) } @@ -488,6 +501,7 @@ func (r *ImageReconciler) setImageLimits(ctx context.Context, log logr.Logger, i } return fmt.Errorf("failed to set limit (%s): %w", limit, err) } + r.Eventf(log, image.Metadata, corev1.EventTypeNormal, "SetImageLimit", "Image limit set. limit: %s value: %d", limit, value) log.V(3).Info("Set image limit", "limit", limit, "value", value) } @@ -554,6 +568,7 @@ func (r *ImageReconciler) setEncryptionHeader(ctx context.Context, log logr.Logg if _, err = r.images.Update(ctx, image); err != nil { return fmt.Errorf("failed to update image encryption state: %w", err) } + r.Eventf(log, image.Metadata, corev1.EventTypeNormal, "ConfiguredEncryption", "Configured encryption") return nil } @@ -562,6 +577,7 @@ func (r *ImageReconciler) createEmptyImage(ctx context.Context, log logr.Logger, if err := librbd.CreateImage(ioCtx, ImageIDToRBDID(image.ID), round.OffBytes(image.Spec.Size), options); err != nil { return fmt.Errorf("failed to create rbd image: %w", err) } + r.Eventf(log, image.Metadata, corev1.EventTypeNormal, "CreatedImage", "Created image. bytes: %d", image.Spec.Size) log.V(2).Info("Created image", "bytes", image.Spec.Size) return nil @@ -611,6 +627,7 @@ func (r *ImageReconciler) createImageFromSnapshot(ctx context.Context, log logr. return false, fmt.Errorf("failed to close rbd image: %w", err) } + r.Eventf(log, image.Metadata, corev1.EventTypeNormal, "ClonedImage", "Cloned image from snapshot. bytes:%d", image.Spec.Size) log.V(2).Info("Cloned image") return true, nil } diff --git a/internal/event/recorder/event_recorder.go b/internal/event/recorder/event_recorder.go new file mode 100644 index 00000000..3d5a7251 --- /dev/null +++ b/internal/event/recorder/event_recorder.go @@ -0,0 +1,154 @@ +// SPDX-FileCopyrightText: 2023 SAP SE or an SAP affiliate company and IronCore contributors +// SPDX-License-Identifier: Apache-2.0 + +package recorder + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/go-logr/logr" + "github.com/gogo/protobuf/proto" + "github.com/ironcore-dev/ceph-provider/api" + irievent "github.com/ironcore-dev/ironcore/iri/apis/event/v1alpha1" + irimeta "github.com/ironcore-dev/ironcore/iri/apis/meta/v1alpha1" + "k8s.io/apimachinery/pkg/util/wait" +) + +// EventRecorder defines an interface for recording events +type EventRecorder interface { + Eventf(log logr.Logger, apiMetadata api.Metadata, eventType string, reason string, messageFormat string, args ...any) +} + +// EventStore defines an interface for listing events +type EventStore interface { + ListEvents() []*irievent.Event +} + +// EventStoreOptions defines options to initialize the event store +type EventStoreOptions struct { + MaxEvents int + EventTTL time.Duration + EventResyncInterval time.Duration +} + +// Store implements the EventRecorder and EventStore interface and represents an in-memory event store with TTL for events. +type Store struct { + maxEvents int // Maximum number of events in the store + events []*irievent.Event // Slice of events + mutex sync.Mutex // Mutex for thread safety + eventTTL time.Duration // TTL for events + eventResyncInterval time.Duration // Resync interval for event store's TTL expiration check + head int // Index of the oldest event + count int // Current number of events in the store + log logr.Logger // Logger for logging overridden events +} + +// NewEventStore creates a new EventStore with a fixed number of events and set TTL for events. +func NewEventStore(log logr.Logger, opts EventStoreOptions) *Store { + return &Store{ + maxEvents: opts.MaxEvents, + events: make([]*irievent.Event, opts.MaxEvents), + eventTTL: opts.EventTTL, + eventResyncInterval: opts.EventResyncInterval, + head: 0, + count: 0, + log: log, + } +} + +// Eventf logs and records an event with formatted message. +func (es *Store) Eventf(log logr.Logger, apiMetadata api.Metadata, eventType, reason, messageFormat string, args ...any) { + metadata, err := api.GetObjectMetadataFromObjectID(apiMetadata) + if err != nil { + log.Error(err, "error getting iri metadata") + return + } + + // Format the message using the provided format and arguments + message := fmt.Sprintf(messageFormat, args...) + + es.recordEvent(metadata, eventType, reason, message) +} + +// recordEvent adds a new Event to the store. Implements the EventRecorder interface. +func (es *Store) recordEvent(metadata *irimeta.ObjectMetadata, eventType, reason, message string) { + es.mutex.Lock() + defer es.mutex.Unlock() + + // Calculate the index where the new event will be inserted + index := (es.head + es.count) % es.maxEvents + + // If the store is full, log and overwrite the oldest event and move the head + if es.count == es.maxEvents { + es.log.V(1).Info("Overriding event", "event", es.events[es.head]) + es.head = (es.head + 1) % es.maxEvents + } else { + es.count++ + } + + event := &irievent.Event{ + Spec: &irievent.EventSpec{ + InvolvedObjectMeta: metadata, + Type: eventType, + Reason: reason, + Message: message, + EventTime: time.Now().Unix(), + }, + } + + es.events[index] = event +} + +// removeExpiredEvents checks and removes events whose TTL has expired. +func (es *Store) removeExpiredEvents() { + es.mutex.Lock() + defer es.mutex.Unlock() + + now := time.Now() + + for es.count > 0 { + index := es.head % es.maxEvents + event := es.events[index] + eventTime := time.Unix(event.Spec.EventTime, 0) + eventTimeWithDuration := eventTime.Add(es.eventTTL) + + if eventTimeWithDuration.After(now) { + break + } + + // Clear the reference to the expired event + es.events[index] = nil + es.head = (es.head + 1) % es.maxEvents + es.count-- + } +} + +// Start initializes and starts the event store's TTL expiration check. +func (es *Store) Start(ctx context.Context) { + wait.UntilWithContext(ctx, func(ctx context.Context) { + es.removeExpiredEvents() + }, es.eventResyncInterval) +} + +// ListEvents returns a copy of all events currently in the store. +func (es *Store) ListEvents() []*irievent.Event { + es.mutex.Lock() + defer es.mutex.Unlock() + + result := make([]*irievent.Event, 0, es.count) + for i := 0; i < es.count; i++ { + index := (es.head + i) % es.maxEvents + // Create a deep copy of the event to break the reference + clone, ok := proto.Clone(es.events[index]).(*irievent.Event) + if !ok { + es.log.Error(fmt.Errorf("failed to clone event: %s", es.events[index]), "assertion error") + continue + } + result = append(result, clone) + } + + return result +} diff --git a/internal/event/recorder/event_recorder_test.go b/internal/event/recorder/event_recorder_test.go new file mode 100644 index 00000000..2a0fc7a1 --- /dev/null +++ b/internal/event/recorder/event_recorder_test.go @@ -0,0 +1,174 @@ +// SPDX-FileCopyrightText: 2023 SAP SE or an SAP affiliate company and IronCore contributors +// SPDX-License-Identifier: Apache-2.0 + +package recorder_test + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + "github.com/ironcore-dev/ceph-provider/api" + + "github.com/go-logr/logr" + "github.com/go-logr/logr/funcr" + . "github.com/ironcore-dev/ceph-provider/internal/event/recorder" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestHandler(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Event Recorder Suite") +} + +const ( + maxEvents = 5 + eventTTL = 2 * time.Second + eventType = "TestType" + reason = "TestReason" + message = "TestMessage" + resyncInterval = 2 * time.Second +) + +var ( + logOutput strings.Builder + log logr.Logger + es *Store + apiMetadata = api.Metadata{ + ID: "test-id-1234", + Annotations: map[string]string{ + "ceph-provider.ironcore.dev/annotations": "{\"key1\":\"value1\", \"key2\":\"value2\"}", + "ceph-provider.ironcore.dev/labels": "{\"downward-api.volumepoollet.ironcore.dev/root-volume-namespace\":\"default\", \"downward-api.volumepoollet.ironcore.dev/root-volume-name\":\"volume1\"}", + }} + opts = EventStoreOptions{ + MaxEvents: maxEvents, + EventTTL: eventTTL, + EventResyncInterval: resyncInterval, + } +) + +var _ = Describe("EventStore", func() { + BeforeEach(func() { + logOutput.Reset() + log = funcr.New(func(prefix, args string) { + logOutput.WriteString(args) + }, funcr.Options{}) + + es = NewEventStore(log, opts) + }) + + Context("Initialization", func() { + It("should initialize events slice with no elements", func() { + Expect(es.ListEvents()).To(BeEmpty()) + }) + }) + + Context("AddEvent", func() { + It("should add an event to the store", func() { + es.Eventf(log, apiMetadata, eventType, reason, message) + Expect(logOutput.String()).To(BeEmpty()) + Expect(es.ListEvents()).To(HaveLen(1)) + }) + + It("should handle error when retrieving metadata", func() { + badMetadata := api.Metadata{ + ID: "test-id-1234"} + es.Eventf(log, badMetadata, eventType, reason, message) + Expect(logOutput.String()).To(ContainSubstring("error getting iri metadata")) + Expect(es.ListEvents()).To(HaveLen(0)) + }) + + It("should override the oldest event when the store is full", func() { + for i := 0; i < maxEvents; i++ { + es.Eventf(log, apiMetadata, eventType, reason, fmt.Sprintf("%s %d", message, i)) + Expect(logOutput.String()).To(BeEmpty()) + Expect(es.ListEvents()).To(HaveLen(i + 1)) + } + + es.Eventf(log, apiMetadata, eventType, reason, "New Event") + Expect(logOutput.String()).To(BeEmpty()) + + events := es.ListEvents() + Expect(events).To(HaveLen(maxEvents)) + + for i := 0; i < maxEvents-1; i++ { + Expect(events[i].Spec.Message).To(Equal(fmt.Sprintf("%s %d", message, i+1))) + } + + Expect(events[maxEvents-1].Spec.Message).To(Equal("New Event")) + }) + }) + + Context("removeExpiredEvents", func() { + It("should remove events whose TTL has expired", func() { + es.Eventf(log, apiMetadata, eventType, reason, message) + Expect(logOutput.String()).To(BeEmpty()) + Expect(es.ListEvents()).To(HaveLen(1)) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go es.Start(ctx) + + Eventually(func(g Gomega) bool { + return g.Expect(es.ListEvents()).To(HaveLen(0)) + }).WithTimeout(eventTTL + 1*time.Second).WithPolling(100 * time.Millisecond).Should(BeTrue()) + }) + + It("should not remove events whose TTL has not expired", func() { + es.Eventf(log, apiMetadata, eventType, reason, message) + Expect(logOutput.String()).To(BeEmpty()) + Expect(es.ListEvents()).To(HaveLen(1)) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go es.Start(ctx) + + Expect(es.ListEvents()).To(HaveLen(1)) + }) + }) + + Context("Start", func() { + It("should periodically remove expired events", func() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go es.Start(ctx) + + es.Eventf(log, apiMetadata, eventType, reason, message) + Expect(logOutput.String()).To(BeEmpty()) + Expect(es.ListEvents()).To(HaveLen(1)) + + Eventually(func(g Gomega) bool { + return g.Expect(es.ListEvents()).To(HaveLen(0)) + }).WithTimeout(resyncInterval + 1*time.Second).WithPolling(100 * time.Millisecond).Should(BeTrue()) + }) + }) + + Context("ListEvents", func() { + It("should return all current events", func() { + es.Eventf(log, apiMetadata, eventType, reason, message) + Expect(logOutput.String()).To(BeEmpty()) + + events := es.ListEvents() + Expect(events).To(HaveLen(1)) + Expect(events[0].Spec.Message).To(Equal(message)) + }) + + It("should return a copy of events", func() { + es.Eventf(log, apiMetadata, eventType, reason, message) + Expect(logOutput.String()).To(BeEmpty()) + events := es.ListEvents() + Expect(events).To(HaveLen(1)) + + events[0].Spec.Message = "Changed Message" + + storedEvents := es.ListEvents() + Expect(storedEvents[0].Spec.Message).ToNot(Equal(events[0].Spec.Message)) + }) + }) +}) diff --git a/internal/volumeserver/event_list.go b/internal/volumeserver/event_list.go new file mode 100644 index 00000000..f4be5d6f --- /dev/null +++ b/internal/volumeserver/event_list.go @@ -0,0 +1,45 @@ +// SPDX-FileCopyrightText: 2023 SAP SE or an SAP affiliate company and IronCore contributors +// SPDX-License-Identifier: Apache-2.0 + +package volumeserver + +import ( + "context" + + irievent "github.com/ironcore-dev/ironcore/iri/apis/event/v1alpha1" + iri "github.com/ironcore-dev/ironcore/iri/apis/volume/v1alpha1" + "k8s.io/apimachinery/pkg/labels" +) + +func (s *Server) filterEvents(events []*irievent.Event, filter *iri.EventFilter) []*irievent.Event { + if filter == nil { + return events + } + + var ( + res []*irievent.Event + sel = labels.SelectorFromSet(filter.LabelSelector) + ) + for _, iriEvent := range events { + if !sel.Matches(labels.Set(iriEvent.Spec.InvolvedObjectMeta.Labels)) { + continue + } + + if filter.EventsFromTime > 0 && filter.EventsToTime > 0 { + if iriEvent.Spec.EventTime < filter.EventsFromTime || iriEvent.Spec.EventTime > filter.EventsToTime { + continue + } + } + + res = append(res, iriEvent) + } + return res +} + +func (s *Server) ListEvents(ctx context.Context, req *iri.ListEventsRequest) (*iri.ListEventsResponse, error) { + iriEvents := s.filterEvents(s.volumeEventStore.ListEvents(), req.Filter) + + return &iri.ListEventsResponse{ + Events: iriEvents, + }, nil +} diff --git a/internal/volumeserver/server.go b/internal/volumeserver/server.go index 1ace74be..4c1b23d2 100644 --- a/internal/volumeserver/server.go +++ b/internal/volumeserver/server.go @@ -10,6 +10,7 @@ import ( "github.com/ironcore-dev/ceph-provider/api" "github.com/ironcore-dev/ceph-provider/internal/ceph" "github.com/ironcore-dev/ceph-provider/internal/encryption" + "github.com/ironcore-dev/ceph-provider/internal/event/recorder" "github.com/ironcore-dev/ceph-provider/internal/store" "github.com/ironcore-dev/ironcore/broker/common/idgen" iri "github.com/ironcore-dev/ironcore/iri/apis/volume/v1alpha1" @@ -24,8 +25,9 @@ type VolumeClassRegistry interface { type Server struct { idGen idgen.IDGen - imageStore store.Store[*api.Image] - snapshotStore store.Store[*api.Snapshot] + imageStore store.Store[*api.Image] + snapshotStore store.Store[*api.Snapshot] + volumeEventStore recorder.EventStore volumeClasses VolumeClassRegistry cephCommandClient ceph.Command @@ -45,6 +47,8 @@ type Options struct { BurstFactor int64 BurstDurationInSeconds int64 + + VolumeEventStore recorder.EventStore } func setOptionsDefaults(o *Options) { @@ -66,10 +70,11 @@ func New(imageStore store.Store[*api.Image], setOptionsDefaults(&opts) return &Server{ - idGen: opts.IDGen, - imageStore: imageStore, - snapshotStore: snapshotStore, - volumeClasses: volumeClassRegistry, + idGen: opts.IDGen, + imageStore: imageStore, + snapshotStore: snapshotStore, + volumeEventStore: opts.VolumeEventStore, + volumeClasses: volumeClassRegistry, keyEncryption: keyEncryption, cephCommandClient: cephCommandClient,