diff --git a/pkg/api/api.go b/pkg/api/api.go index e1aac7ce..7adeea9e 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -13,10 +13,19 @@ type Metadata struct { CreatedAt time.Time `json:"createdAt"` DeletedAt *time.Time `json:"deletedAt,omitempty"` Generation int64 `json:"generation"` + ResourceVersion uint64 `json:"resourceVersion"` Finalizers []string `json:"finalizers,omitempty"` } +func (m *Metadata) GetResourceVersion() uint64 { + return m.ResourceVersion +} + +func (m *Metadata) IncrementResourceVersion() { + m.ResourceVersion++ +} + func (m *Metadata) GetID() string { return m.ID } @@ -81,6 +90,7 @@ type Object interface { GetDeletedAt() *time.Time GetGeneration() int64 GetFinalizers() []string + GetResourceVersion() uint64 SetID(id string) SetAnnotations(annotations map[string]string) @@ -89,4 +99,5 @@ type Object interface { SetDeletedAt(deleted *time.Time) SetGeneration(generation int64) SetFinalizers(finalizers []string) + IncrementResourceVersion() } diff --git a/pkg/omap/omap.go b/pkg/omap/omap.go index dd3028bc..dec69c18 100644 --- a/pkg/omap/omap.go +++ b/pkg/omap/omap.go @@ -23,6 +23,9 @@ type CreateStrategy[E api.Object] interface { PrepareForCreate(obj E) } + +var ErrResourceVersionNotLatest = errors.New("resourceVersion is not latest") + type Options[E api.Object] struct { OmapName string NewFunc func() E @@ -146,6 +149,7 @@ func (s *Store[E]) Create(ctx context.Context, obj E) (E, error) { } obj.SetCreatedAt(time.Now()) + obj.IncrementResourceVersion() obj, err = s.set(ioCtx, obj) if err != nil { @@ -181,6 +185,7 @@ func (s *Store[E]) Delete(ctx context.Context, id string) error { now := time.Now() obj.SetDeletedAt(&now) + obj.IncrementResourceVersion() if _, err := s.set(ioCtx, obj); err != nil { return fmt.Errorf("failed to set object metadata: %w", err) @@ -221,7 +226,7 @@ func (s *Store[E]) Update(ctx context.Context, obj E) (E, error) { } defer ioCtx.Destroy() - _, err = s.get(ioCtx, obj.GetID()) + oldObj, err := s.get(ioCtx, obj.GetID()) if err != nil { return utils.Zero[E](), err } @@ -233,11 +238,17 @@ func (s *Store[E]) Update(ctx context.Context, obj E) (E, error) { return obj, nil } + if oldObj.GetResourceVersion() != obj.GetResourceVersion() { + return utils.Zero[E](), fmt.Errorf("failed to update object: %w", ErrResourceVersionNotLatest) + } + obj.IncrementResourceVersion() + //Todo: update version obj, err = s.set(ioCtx, obj) if err != nil { return utils.Zero[E](), err } + s.enqueue(store.WatchEvent[E]{ Type: store.WatchEventTypeUpdated,