Skip to content

Commit

Permalink
Implement ListEvents Runtime Interface to Support Cross-Cluster Events (
Browse files Browse the repository at this point in the history
  • Loading branch information
ushabelgur authored Aug 13, 2024
1 parent 1c79b7b commit f08c333
Show file tree
Hide file tree
Showing 8 changed files with 429 additions and 17 deletions.
17 changes: 17 additions & 0 deletions cmd/volumeprovider/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/ironcore-dev/ceph-provider/internal/controllers"
"github.com/ironcore-dev/ceph-provider/internal/encryption"
"github.com/ironcore-dev/ceph-provider/internal/event"
eventrecorder "github.com/ironcore-dev/ceph-provider/internal/event/recorder"
"github.com/ironcore-dev/ceph-provider/internal/omap"
"github.com/ironcore-dev/ceph-provider/internal/strategy"
"github.com/ironcore-dev/ceph-provider/internal/vcr"
Expand Down Expand Up @@ -55,6 +56,8 @@ type CephOptions struct {
PopulatorBufferSize int64

KeyEncryptionKeyPath string

VolumeEventStoreOptions eventrecorder.EventStoreOptions
}

func (o *Options) Defaults() {
Expand Down Expand Up @@ -82,6 +85,9 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.Ceph.Pool, "ceph-pool", o.Ceph.Pool, "Ceph pool which is used to store objects.")
fs.StringVar(&o.Ceph.Client, "ceph-client", o.Ceph.Client, "Ceph client which grants access to pools/images eg. 'client.volumes'")
fs.StringVar(&o.Ceph.KeyEncryptionKeyPath, "ceph-kek-path", o.Ceph.KeyEncryptionKeyPath, "path to the key encryption key file (32 Bit - KEK) to encrypt volume keys.")
fs.IntVar(&o.Ceph.VolumeEventStoreOptions.MaxEvents, "volume-event-max-events", 100, "Maximum number of volume events that can be stored.")
fs.DurationVar(&o.Ceph.VolumeEventStoreOptions.EventTTL, "volume-event-ttl", 5*time.Minute, "Time to live for volume events.")
fs.DurationVar(&o.Ceph.VolumeEventStoreOptions.EventResyncInterval, "volume-event-resync-interval", 1*time.Minute, "Interval for resynchronizing the volume events.")
}

func (o *Options) MarkFlagsRequired(cmd *cobra.Command) {
Expand Down Expand Up @@ -234,11 +240,14 @@ func Run(ctx context.Context, opts Options) error {
return fmt.Errorf("failed to initialize docker registry: %w", err)
}

volumeEventStore := eventrecorder.NewEventStore(log, opts.Ceph.VolumeEventStoreOptions)

imageReconciler, err := controllers.NewImageReconciler(
log.WithName("image-reconciler"),
conn,
reg,
imageStore, snapshotStore,
volumeEventStore,
imageEvents,
snapshotEvents,
encryptor,
Expand Down Expand Up @@ -304,6 +313,13 @@ func Run(ctx context.Context, opts Options) error {
}
}()

wg.Add(1)
go func() {
defer wg.Done()
setupLog.Info("Starting volume events garbage collector")
volumeEventStore.Start(ctx)
}()

supportedClasses, err := vcr.LoadVolumeClassesFile(opts.PathSupportedVolumeClasses)
if err != nil {
return fmt.Errorf("failed to load supported volume classes: %w", err)
Expand All @@ -326,6 +342,7 @@ func Run(ctx context.Context, opts Options) error {
encryptor,
cephCommandClient,
volumeserver.Options{
VolumeEventStore: volumeEventStore,
BurstFactor: opts.Ceph.BurstFactor,
BurstDurationInSeconds: opts.Ceph.BurstDurationInSeconds,
},
Expand Down
10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
module github.com/ironcore-dev/ceph-provider

go 1.22.1
go 1.22.2

require (
github.com/ceph/go-ceph v0.28.0
github.com/containerd/containerd v1.7.20
github.com/go-logr/logr v1.4.2
github.com/gogo/protobuf v1.3.2
github.com/google/addlicense v1.1.1
github.com/ironcore-dev/controller-utils v0.9.3
github.com/ironcore-dev/ironcore v0.1.2-0.20231120144059-30dd02e88870
github.com/ironcore-dev/ironcore v0.1.3-0.20240724061641-c2f923ef40eb
github.com/ironcore-dev/ironcore-image v0.2.1
github.com/kube-object-storage/lib-bucket-provisioner v0.0.0-20221122204822-d1a8c34382f1
github.com/onsi/ginkgo/v2 v2.19.1
Expand Down Expand Up @@ -57,7 +58,6 @@ require (
github.com/go-openapi/jsonreference v0.21.0 // indirect
github.com/go-openapi/swag v0.23.0 // indirect
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
Expand Down Expand Up @@ -107,7 +107,7 @@ require (
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.45.0 // indirect
go.opentelemetry.io/otel v1.26.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.26.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.19.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0 // indirect
go.opentelemetry.io/otel/metric v1.26.0 // indirect
go.opentelemetry.io/otel/sdk v1.26.0 // indirect
go.opentelemetry.io/otel/trace v1.26.0 // indirect
Expand All @@ -134,7 +134,7 @@ require (
k8s.io/apiextensions-apiserver v0.30.1 // indirect
k8s.io/apiserver v0.30.1 // indirect
k8s.io/component-base v0.30.1 // indirect
k8s.io/klog/v2 v2.120.1 // indirect
k8s.io/klog/v2 v2.130.1 // indirect
k8s.io/kube-openapi v0.0.0-20240322212309-b815d8309940 // indirect
oras.land/oras-go v1.2.5 // indirect
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.29.0 // indirect
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -565,8 +565,8 @@ github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/ironcore-dev/controller-utils v0.9.3 h1:sTrnxSzX5RrLf4B8KrAH2axSC+gxfJXphkV6df2GSsw=
github.com/ironcore-dev/controller-utils v0.9.3/go.mod h1:djKnxDs0Hwxhhc0VmVY8tZnrOrElvrRV2jov/LiCZ2Y=
github.com/ironcore-dev/ironcore v0.1.2-0.20231120144059-30dd02e88870 h1:yAFYucF/zthz9zdgHLbaQ5q9Ee0hFfbNHtxflm/lU5o=
github.com/ironcore-dev/ironcore v0.1.2-0.20231120144059-30dd02e88870/go.mod h1:+rMMEH846ihqB+P55EX/NIvKQs9QDjcIurhRoR3jorU=
github.com/ironcore-dev/ironcore v0.1.3-0.20240724061641-c2f923ef40eb h1:eiZSYhN3tW7P0lN007/vs6hPDzzQx9YVAAVh2QMBU0Y=
github.com/ironcore-dev/ironcore v0.1.3-0.20240724061641-c2f923ef40eb/go.mod h1:OAq8mJxuR76Lq7HQDMwB11cWJrb/76nX1wBrXQfMrOw=
github.com/ironcore-dev/ironcore-image v0.2.1 h1:7LsIftIRX5btnSieo1J+xUSnW1tSIsGzYgD6NnpObyY=
github.com/ironcore-dev/ironcore-image v0.2.1/go.mod h1:BNaacvN5++9zGiTDJea4vvGDwHvPJE6S9Xb3G7hsFQU=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
Expand Down Expand Up @@ -893,8 +893,8 @@ go.opentelemetry.io/otel v1.26.0 h1:LQwgL5s/1W7YiiRwxf03QGnWLb2HW4pLiAhaA5cZXBs=
go.opentelemetry.io/otel v1.26.0/go.mod h1:UmLkJHUAidDval2EICqBMbnAd0/m2vmpf/dAM+fvFs4=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.26.0 h1:1u/AyyOqAWzy+SkPxDpahCNZParHV8Vid1RnI2clyDE=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.26.0/go.mod h1:z46paqbJ9l7c9fIPCXTqTGwhQZ5XoTIsfeFYWboizjs=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.19.0 h1:3d+S281UTjM+AbF31XSOYn1qXn3BgIdWl8HNEpx08Jk=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.19.0/go.mod h1:0+KuTDyKL4gjKCF75pHOX4wuzYDUZYfAQdSu43o+Z2I=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0 h1:tIqheXEFWAZ7O8A7m+J0aPTmpJN3YQ7qetUAdkkkKpk=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0/go.mod h1:nUeKExfxAQVbiVFn32YXpXZZHZ61Cc3s3Rn1pDBGAb0=
go.opentelemetry.io/otel/metric v1.26.0 h1:7S39CLuY5Jgg9CrnA9HHiEjGMF/X2VHvoXGgSllRz30=
go.opentelemetry.io/otel/metric v1.26.0/go.mod h1:SY+rHOI4cEawI9a7N1A4nIg/nTQXe1ccCNWYOJUrpX4=
go.opentelemetry.io/otel/sdk v1.26.0 h1:Y7bumHf5tAiDlRYFmGqetNcLaVUZmh4iYfmGxtmz7F8=
Expand Down Expand Up @@ -1601,8 +1601,8 @@ k8s.io/klog/v2 v2.4.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y=
k8s.io/klog/v2 v2.30.0/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0=
k8s.io/klog/v2 v2.60.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0=
k8s.io/klog/v2 v2.80.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0=
k8s.io/klog/v2 v2.120.1 h1:QXU6cPEOIslTGvZaXvFWiP9VKyeet3sawzTOvdXb4Vw=
k8s.io/klog/v2 v2.120.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE=
k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk=
k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE=
k8s.io/kube-openapi v0.0.0-20180731170545-e3762e86a74c/go.mod h1:BXM9ceUBTj2QnfH2MK1odQs778ajze1RxcmP6S8RVVc=
k8s.io/kube-openapi v0.0.0-20200410145947-61e04a5be9a6/go.mod h1:GRQhZsXIAJ1xR0C9bd8UpWHZ5plfAS9fzPjJuQ6JL3E=
k8s.io/kube-openapi v0.0.0-20200805222855-6aeccd4b50c6/go.mod h1:UuqjUnNftUyPE5H64/qeyjQoUZhGpeFDVdxjTeEVN2o=
Expand Down
17 changes: 17 additions & 0 deletions internal/controllers/image_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ import (
providerapi "github.com/ironcore-dev/ceph-provider/api"
"github.com/ironcore-dev/ceph-provider/internal/encryption"
"github.com/ironcore-dev/ceph-provider/internal/event"
eventrecorder "github.com/ironcore-dev/ceph-provider/internal/event/recorder"
"github.com/ironcore-dev/ceph-provider/internal/round"
"github.com/ironcore-dev/ceph-provider/internal/store"
"github.com/ironcore-dev/ceph-provider/internal/utils"
"github.com/ironcore-dev/ironcore-image/oci/image"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/ptr"
)
Expand All @@ -46,6 +48,7 @@ func NewImageReconciler(
registry image.Source,
images store.Store[*providerapi.Image],
snapshots store.Store[*providerapi.Snapshot],
eventRecorder eventrecorder.EventRecorder,
imageEvents event.Source[*providerapi.Image],
snapshotEvents event.Source[*providerapi.Snapshot],
keyEncryption encryption.Encryptor,
Expand Down Expand Up @@ -98,6 +101,7 @@ func NewImageReconciler(
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
images: images,
snapshots: snapshots,
EventRecorder: eventRecorder,
imageEvents: imageEvents,
snapshotEvents: snapshotEvents,
monitors: opts.Monitors,
Expand All @@ -117,6 +121,7 @@ type ImageReconciler struct {
images store.Store[*providerapi.Image]
snapshots store.Store[*providerapi.Snapshot]

eventrecorder.EventRecorder
imageEvents event.Source[*providerapi.Image]
snapshotEvents event.Source[*providerapi.Snapshot]

Expand Down Expand Up @@ -156,6 +161,7 @@ func (r *ImageReconciler) Start(ctx context.Context) error {

for _, img := range imageList {
if snapshotRef := img.Spec.SnapshotRef; snapshotRef != nil && *snapshotRef == evt.Object.ID {
r.Eventf(log, img.Metadata, corev1.EventTypeNormal, "PulledImage", "Pulled image %s", *img.Spec.SnapshotRef)
r.queue.Add(img.ID)
}
}
Expand Down Expand Up @@ -226,6 +232,7 @@ func (r *ImageReconciler) deleteImage(ctx context.Context, log logr.Logger, ioCt
if _, err := r.images.Update(ctx, image); store.IgnoreErrNotFound(err) != nil {
return fmt.Errorf("failed to update image metadata: %w", err)
}
r.Eventf(log, image.Metadata, corev1.EventTypeNormal, "CompletedDeletion", "Image deletion completed")
log.V(2).Info("Removed Finalizers")

return nil
Expand Down Expand Up @@ -282,6 +289,7 @@ func (r *ImageReconciler) reconcileSnapshot(ctx context.Context, log logr.Logger
if err != nil {
switch {
case errors.Is(err, store.ErrNotFound):
r.Eventf(log, img.Metadata, corev1.EventTypeNormal, "CreateImageSnapshot", "Image snapshot was not found. Creating new snapshot")
snap, err = r.snapshots.Create(ctx, &providerapi.Snapshot{
Metadata: providerapi.Metadata{
ID: snapshotDigest,
Expand All @@ -294,6 +302,7 @@ func (r *ImageReconciler) reconcileSnapshot(ctx context.Context, log logr.Logger
},
})
if err != nil {
r.Eventf(log, img.Metadata, corev1.EventTypeWarning, "CreateImageSnapshot", "Create image snapshot failed with error: %s", err)
return fmt.Errorf("failed to create snapshot: %w", err)
}
default:
Expand All @@ -307,6 +316,7 @@ func (r *ImageReconciler) reconcileSnapshot(ctx context.Context, log logr.Logger
return fmt.Errorf("failed to update image snapshot ref: %w", err)
}

r.Eventf(log, img.Metadata, corev1.EventTypeNormal, "UpdatedImageSnapshotRef", "Updated image snapshot ref: %s", *img.Spec.SnapshotRef)
return nil
}

Expand Down Expand Up @@ -349,13 +359,15 @@ func (r *ImageReconciler) updateImage(ctx context.Context, log logr.Logger, ioCt
log.V(2).Info("No update needed: Old and new image size same")
return nil
case requestedSize < currentImageSize:
r.Eventf(log, image.Metadata, corev1.EventTypeWarning, "UpdateImageSize", "Failed to shrink image: not supported")
return fmt.Errorf("failed to shrink image: not supported")
}

if err := img.Resize(requestedSize); err != nil {
return fmt.Errorf("failed to resize image: %w", err)
}

r.Eventf(log, image.Metadata, corev1.EventTypeNormal, "UpdatedImageSize", "Image size changed. requestedSize: %d currentSize: %d", requestedSize, currentImageSize)
log.V(1).Info("Updated image", "requestedSize", requestedSize, "currentSize", currentImageSize)
return nil
}
Expand Down Expand Up @@ -442,6 +454,7 @@ func (r *ImageReconciler) reconcileImage(ctx context.Context, id string) error {
}

if err := r.setEncryptionHeader(ctx, log, ioCtx, img); err != nil {
r.Eventf(log, img.Metadata, corev1.EventTypeWarning, "SetEncryptionFormat", "Set encryption header failed with error: %s", err)
return fmt.Errorf("failed to set encryption header: %w", err)
}

Expand Down Expand Up @@ -488,6 +501,7 @@ func (r *ImageReconciler) setImageLimits(ctx context.Context, log logr.Logger, i
}
return fmt.Errorf("failed to set limit (%s): %w", limit, err)
}
r.Eventf(log, image.Metadata, corev1.EventTypeNormal, "SetImageLimit", "Image limit set. limit: %s value: %d", limit, value)
log.V(3).Info("Set image limit", "limit", limit, "value", value)
}

Expand Down Expand Up @@ -554,6 +568,7 @@ func (r *ImageReconciler) setEncryptionHeader(ctx context.Context, log logr.Logg
if _, err = r.images.Update(ctx, image); err != nil {
return fmt.Errorf("failed to update image encryption state: %w", err)
}
r.Eventf(log, image.Metadata, corev1.EventTypeNormal, "ConfiguredEncryption", "Configured encryption")

return nil
}
Expand All @@ -562,6 +577,7 @@ func (r *ImageReconciler) createEmptyImage(ctx context.Context, log logr.Logger,
if err := librbd.CreateImage(ioCtx, ImageIDToRBDID(image.ID), round.OffBytes(image.Spec.Size), options); err != nil {
return fmt.Errorf("failed to create rbd image: %w", err)
}
r.Eventf(log, image.Metadata, corev1.EventTypeNormal, "CreatedImage", "Created image. bytes: %d", image.Spec.Size)
log.V(2).Info("Created image", "bytes", image.Spec.Size)

return nil
Expand Down Expand Up @@ -611,6 +627,7 @@ func (r *ImageReconciler) createImageFromSnapshot(ctx context.Context, log logr.
return false, fmt.Errorf("failed to close rbd image: %w", err)
}

r.Eventf(log, image.Metadata, corev1.EventTypeNormal, "ClonedImage", "Cloned image from snapshot. bytes:%d", image.Spec.Size)
log.V(2).Info("Cloned image")
return true, nil
}
Loading

0 comments on commit f08c333

Please sign in to comment.