diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 43446c2c..6c149908 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -19,17 +19,16 @@ type Config struct { // Storage is responsible for storing scheme.Spec. type Storage struct { - scheme *scheme.Scheme - collection database.Collection - mu sync.RWMutex + scheme *scheme.Scheme + nodes database.Collection + mu sync.RWMutex } // CollectionNodes is the name of the nodes collection in the storage. const CollectionNodes = "nodes" var ( - // indexes defines the indexes for the Storage collection. - indexes = []database.IndexModel{ + indexesNode = []database.IndexModel{ { Name: "namespace_name", Keys: []string{scheme.KeyNamespace, scheme.KeyName}, @@ -44,42 +43,18 @@ func New(ctx context.Context, config Config) (*Storage, error) { scheme := config.Scheme db := config.Database - collection, err := db.Collection(ctx, CollectionNodes) + nodes, err := db.Collection(ctx, CollectionNodes) if err != nil { return nil, err } s := &Storage{ - scheme: scheme, - collection: collection, + scheme: scheme, + nodes: nodes, } - // Ensure indexes are created. - if exists, err := s.collection.Indexes().List(ctx); err != nil { + if err := ensureIndexes(ctx, nodes, indexesNode); err != nil { return nil, err - } else { - for _, index := range indexes { - index = database.IndexModel{ - Name: index.Name, - Keys: index.Keys, - Unique: index.Unique, - Partial: index.Partial, - } - - var ok bool - for _, i := range exists { - if i.Name == index.Name { - if reflect.DeepEqual(i, index) { - s.collection.Indexes().Drop(ctx, i.Name) - } - break - } - } - if ok { - continue - } - s.collection.Indexes().Create(ctx, index) - } } return s, nil @@ -92,7 +67,7 @@ func (s *Storage) Watch(ctx context.Context, filter *Filter) (*Stream, error) { return nil, err } - stream, err := s.collection.Watch(ctx, f) + stream, err := s.nodes.Watch(ctx, f) if err != nil { return nil, err } @@ -117,10 +92,10 @@ func (s *Storage) InsertOne(ctx context.Context, spec scheme.Spec) (ulid.ULID, e } var id ulid.ULID - if pk, err := s.collection.InsertOne(ctx, unstructured.Doc()); err != nil { + if pk, err := s.nodes.InsertOne(ctx, unstructured.Doc()); err != nil { return ulid.ULID{}, err } else if err := primitive.Unmarshal(pk, &id); err != nil { - _, _ = s.collection.DeleteOne(ctx, database.Where(scheme.KeyID).EQ(pk)) + _, _ = s.nodes.DeleteOne(ctx, database.Where(scheme.KeyID).EQ(pk)) return ulid.ULID{}, err } else { return id, nil @@ -150,10 +125,10 @@ func (s *Storage) InsertMany(ctx context.Context, objs []scheme.Spec) ([]ulid.UL } ids := make([]ulid.ULID, 0) - if pks, err := s.collection.InsertMany(ctx, docs); err != nil { + if pks, err := s.nodes.InsertMany(ctx, docs); err != nil { return nil, err } else if err := primitive.Unmarshal(primitive.NewSlice(pks...), &ids); err != nil { - _, _ = s.collection.DeleteMany(ctx, database.Where(scheme.KeyID).IN(pks...)) + _, _ = s.nodes.DeleteMany(ctx, database.Where(scheme.KeyID).IN(pks...)) return nil, err } else { return ids, nil @@ -178,7 +153,7 @@ func (s *Storage) UpdateOne(ctx context.Context, spec scheme.Spec) (bool, error) } filter, _ := Where[ulid.ULID](scheme.KeyID).EQ(unstructured.GetID()).Encode() - return s.collection.UpdateOne(ctx, filter, unstructured.Doc()) + return s.nodes.UpdateOne(ctx, filter, unstructured.Doc()) } // UpdateMany updates multiple scheme.Spec instances and returns the number of successes. @@ -206,7 +181,7 @@ func (s *Storage) UpdateMany(ctx context.Context, objs []scheme.Spec) (int, erro count := 0 for _, unstructured := range unstructureds { filter, _ := Where[ulid.ULID](scheme.KeyID).EQ(unstructured.GetID()).Encode() - if ok, err := s.collection.UpdateOne(ctx, filter, unstructured.Doc()); err != nil { + if ok, err := s.nodes.UpdateOne(ctx, filter, unstructured.Doc()); err != nil { return count, err } else if ok { count += 1 @@ -226,7 +201,7 @@ func (s *Storage) DeleteOne(ctx context.Context, filter *Filter) (bool, error) { return false, err } - return s.collection.DeleteOne(ctx, f) + return s.nodes.DeleteOne(ctx, f) } // DeleteMany deletes multiple scheme.Spec instances and returns the number of successes. @@ -239,7 +214,7 @@ func (s *Storage) DeleteMany(ctx context.Context, filter *Filter) (int, error) { return 0, err } - return s.collection.DeleteMany(ctx, f) + return s.nodes.DeleteMany(ctx, f) } // FindOne returns a single scheme.Spec matched by the filter. @@ -252,7 +227,7 @@ func (s *Storage) FindOne(ctx context.Context, filter *Filter, options ...*datab return nil, err } - if doc, err := s.collection.FindOne(ctx, f, options...); err != nil { + if doc, err := s.nodes.FindOne(ctx, f, options...); err != nil { return nil, err } else if doc != nil { unstructured := scheme.NewUnstructured(doc) @@ -279,7 +254,7 @@ func (s *Storage) FindMany(ctx context.Context, filter *Filter, options ...*data } var specs []scheme.Spec - if docs, err := s.collection.FindMany(ctx, f, options...); err != nil { + if docs, err := s.nodes.FindMany(ctx, f, options...); err != nil { return nil, err } else { for _, doc := range docs { @@ -299,3 +274,35 @@ func (s *Storage) FindMany(ctx context.Context, filter *Filter, options ...*data return specs, nil } } + +func ensureIndexes(ctx context.Context, col database.Collection, indexes []database.IndexModel) error { + existingIndexes, err := col.Indexes().List(ctx) + if err != nil { + return err + } + + for _, index := range indexes { + index := database.IndexModel{ + Name: index.Name, + Keys: index.Keys, + Unique: index.Unique, + Partial: index.Partial, + } + + var found bool + for _, existingIndex := range existingIndexes { + if existingIndex.Name == index.Name && reflect.DeepEqual(existingIndex, index) { + found = true + break + } + } + + if !found { + if err := col.Indexes().Create(ctx, index); err != nil { + return err + } + } + } + + return nil +}