Skip to content

Commit

Permalink
refactor: storage collection -> nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
siyul-park committed Nov 30, 2023
1 parent ae289cc commit 88ac874
Showing 1 changed file with 51 additions and 44 deletions.
95 changes: 51 additions & 44 deletions pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,16 @@ type Config struct {

// Storage is responsible for storing scheme.Spec.
type Storage struct {
scheme *scheme.Scheme
collection database.Collection
mu sync.RWMutex
scheme *scheme.Scheme
nodes database.Collection
mu sync.RWMutex
}

// CollectionNodes is the name of the nodes collection in the storage.
const CollectionNodes = "nodes"

var (
// indexes defines the indexes for the Storage collection.
indexes = []database.IndexModel{
indexesNode = []database.IndexModel{
{
Name: "namespace_name",
Keys: []string{scheme.KeyNamespace, scheme.KeyName},
Expand All @@ -44,42 +43,18 @@ func New(ctx context.Context, config Config) (*Storage, error) {
scheme := config.Scheme
db := config.Database

collection, err := db.Collection(ctx, CollectionNodes)
nodes, err := db.Collection(ctx, CollectionNodes)
if err != nil {
return nil, err
}

s := &Storage{
scheme: scheme,
collection: collection,
scheme: scheme,
nodes: nodes,
}

// Ensure indexes are created.
if exists, err := s.collection.Indexes().List(ctx); err != nil {
if err := ensureIndexes(ctx, nodes, indexesNode); err != nil {
return nil, err
} else {
for _, index := range indexes {
index = database.IndexModel{
Name: index.Name,
Keys: index.Keys,
Unique: index.Unique,
Partial: index.Partial,
}

var ok bool
for _, i := range exists {
if i.Name == index.Name {
if reflect.DeepEqual(i, index) {
s.collection.Indexes().Drop(ctx, i.Name)
}
break
}
}
if ok {
continue
}
s.collection.Indexes().Create(ctx, index)
}
}

return s, nil
Expand All @@ -92,7 +67,7 @@ func (s *Storage) Watch(ctx context.Context, filter *Filter) (*Stream, error) {
return nil, err
}

stream, err := s.collection.Watch(ctx, f)
stream, err := s.nodes.Watch(ctx, f)
if err != nil {
return nil, err
}
Expand All @@ -117,10 +92,10 @@ func (s *Storage) InsertOne(ctx context.Context, spec scheme.Spec) (ulid.ULID, e
}

var id ulid.ULID
if pk, err := s.collection.InsertOne(ctx, unstructured.Doc()); err != nil {
if pk, err := s.nodes.InsertOne(ctx, unstructured.Doc()); err != nil {
return ulid.ULID{}, err
} else if err := primitive.Unmarshal(pk, &id); err != nil {
_, _ = s.collection.DeleteOne(ctx, database.Where(scheme.KeyID).EQ(pk))
_, _ = s.nodes.DeleteOne(ctx, database.Where(scheme.KeyID).EQ(pk))
return ulid.ULID{}, err
} else {
return id, nil
Expand Down Expand Up @@ -150,10 +125,10 @@ func (s *Storage) InsertMany(ctx context.Context, objs []scheme.Spec) ([]ulid.UL
}

ids := make([]ulid.ULID, 0)
if pks, err := s.collection.InsertMany(ctx, docs); err != nil {
if pks, err := s.nodes.InsertMany(ctx, docs); err != nil {
return nil, err
} else if err := primitive.Unmarshal(primitive.NewSlice(pks...), &ids); err != nil {
_, _ = s.collection.DeleteMany(ctx, database.Where(scheme.KeyID).IN(pks...))
_, _ = s.nodes.DeleteMany(ctx, database.Where(scheme.KeyID).IN(pks...))
return nil, err
} else {
return ids, nil
Expand All @@ -178,7 +153,7 @@ func (s *Storage) UpdateOne(ctx context.Context, spec scheme.Spec) (bool, error)
}

filter, _ := Where[ulid.ULID](scheme.KeyID).EQ(unstructured.GetID()).Encode()
return s.collection.UpdateOne(ctx, filter, unstructured.Doc())
return s.nodes.UpdateOne(ctx, filter, unstructured.Doc())
}

// UpdateMany updates multiple scheme.Spec instances and returns the number of successes.
Expand Down Expand Up @@ -206,7 +181,7 @@ func (s *Storage) UpdateMany(ctx context.Context, objs []scheme.Spec) (int, erro
count := 0
for _, unstructured := range unstructureds {
filter, _ := Where[ulid.ULID](scheme.KeyID).EQ(unstructured.GetID()).Encode()
if ok, err := s.collection.UpdateOne(ctx, filter, unstructured.Doc()); err != nil {
if ok, err := s.nodes.UpdateOne(ctx, filter, unstructured.Doc()); err != nil {
return count, err
} else if ok {
count += 1
Expand All @@ -226,7 +201,7 @@ func (s *Storage) DeleteOne(ctx context.Context, filter *Filter) (bool, error) {
return false, err
}

return s.collection.DeleteOne(ctx, f)
return s.nodes.DeleteOne(ctx, f)
}

// DeleteMany deletes multiple scheme.Spec instances and returns the number of successes.
Expand All @@ -239,7 +214,7 @@ func (s *Storage) DeleteMany(ctx context.Context, filter *Filter) (int, error) {
return 0, err
}

return s.collection.DeleteMany(ctx, f)
return s.nodes.DeleteMany(ctx, f)
}

// FindOne returns a single scheme.Spec matched by the filter.
Expand All @@ -252,7 +227,7 @@ func (s *Storage) FindOne(ctx context.Context, filter *Filter, options ...*datab
return nil, err
}

if doc, err := s.collection.FindOne(ctx, f, options...); err != nil {
if doc, err := s.nodes.FindOne(ctx, f, options...); err != nil {
return nil, err
} else if doc != nil {
unstructured := scheme.NewUnstructured(doc)
Expand All @@ -279,7 +254,7 @@ func (s *Storage) FindMany(ctx context.Context, filter *Filter, options ...*data
}

var specs []scheme.Spec
if docs, err := s.collection.FindMany(ctx, f, options...); err != nil {
if docs, err := s.nodes.FindMany(ctx, f, options...); err != nil {
return nil, err
} else {
for _, doc := range docs {
Expand All @@ -299,3 +274,35 @@ func (s *Storage) FindMany(ctx context.Context, filter *Filter, options ...*data
return specs, nil
}
}

func ensureIndexes(ctx context.Context, col database.Collection, indexes []database.IndexModel) error {
existingIndexes, err := col.Indexes().List(ctx)
if err != nil {
return err
}

for _, index := range indexes {
index := database.IndexModel{
Name: index.Name,
Keys: index.Keys,
Unique: index.Unique,
Partial: index.Partial,
}

var found bool
for _, existingIndex := range existingIndexes {
if existingIndex.Name == index.Name && reflect.DeepEqual(existingIndex, index) {
found = true
break
}
}

if !found {
if err := col.Indexes().Create(ctx, index); err != nil {
return err
}
}
}

return nil
}

0 comments on commit 88ac874

Please sign in to comment.