Skip to content

Commit

Permalink
refactor: rewrite comment
Browse files Browse the repository at this point in the history
  • Loading branch information
siyul-park committed Nov 30, 2023
1 parent add7b0c commit ae289cc
Showing 1 changed file with 23 additions and 53 deletions.
76 changes: 23 additions & 53 deletions pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,24 @@ import (
"github.com/siyul-park/uniflow/pkg/scheme"
)

type (
// Config is a config for Storage.
Config struct {
Scheme *scheme.Scheme
Database database.Database
}
// Config is a configuration struct for Storage.
type Config struct {
Scheme *scheme.Scheme
Database database.Database
}

// Storage is the storage that stores scheme.Spec.
Storage struct {
scheme *scheme.Scheme
collection database.Collection
mu sync.RWMutex
}
)
// Storage is responsible for storing scheme.Spec.
type Storage struct {
scheme *scheme.Scheme
collection database.Collection
mu sync.RWMutex
}

const (
CollectionNodes = "nodes"
)
// CollectionNodes is the name of the nodes collection in the storage.
const CollectionNodes = "nodes"

var (
// indexes defines the indexes for the Storage collection.
indexes = []database.IndexModel{
{
Name: "namespace_name",
Expand All @@ -41,7 +39,7 @@ var (
}
)

// New returns a new Storage.
// New creates a new Storage instance.
func New(ctx context.Context, config Config) (*Storage, error) {
scheme := config.Scheme
db := config.Database
Expand All @@ -56,6 +54,7 @@ func New(ctx context.Context, config Config) (*Storage, error) {
collection: collection,
}

// Ensure indexes are created.
if exists, err := s.collection.Indexes().List(ctx); err != nil {
return nil, err
} else {
Expand Down Expand Up @@ -86,7 +85,7 @@ func New(ctx context.Context, config Config) (*Storage, error) {
return s, nil
}

// Watch returns Stream to track changes.
// Watch returns a Stream to track changes based on the provided filter.
func (s *Storage) Watch(ctx context.Context, filter *Filter) (*Stream, error) {
f, err := filter.Encode()
if err != nil {
Expand All @@ -100,7 +99,7 @@ func (s *Storage) Watch(ctx context.Context, filter *Filter) (*Stream, error) {
return NewStream(stream), nil
}

// InsertOne inserts a single scheme.Spec and return ID.
// InsertOne inserts a single scheme.Spec and returns its ID.
func (s *Storage) InsertOne(ctx context.Context, spec scheme.Spec) (ulid.ULID, error) {
s.mu.RLock()
defer s.mu.RUnlock()
Expand All @@ -117,10 +116,6 @@ func (s *Storage) InsertOne(ctx context.Context, spec scheme.Spec) (ulid.ULID, e
unstructured.SetID(ulid.Make())
}

if err := s.validate(unstructured); err != nil {
return ulid.ULID{}, err
}

var id ulid.ULID
if pk, err := s.collection.InsertOne(ctx, unstructured.Doc()); err != nil {
return ulid.ULID{}, err
Expand All @@ -132,7 +127,7 @@ func (s *Storage) InsertOne(ctx context.Context, spec scheme.Spec) (ulid.ULID, e
}
}

// InsertMany inserts multiple scheme.Spec and return IDs.
// InsertMany inserts multiple scheme.Spec instances and returns their IDs.
func (s *Storage) InsertMany(ctx context.Context, objs []scheme.Spec) ([]ulid.ULID, error) {
s.mu.RLock()
defer s.mu.RUnlock()
Expand All @@ -151,10 +146,6 @@ func (s *Storage) InsertMany(ctx context.Context, objs []scheme.Spec) ([]ulid.UL
unstructured.SetID(ulid.Make())
}

if err := s.validate(unstructured); err != nil {
return nil, err
}

docs = append(docs, unstructured.Doc())
}

Expand Down Expand Up @@ -186,15 +177,11 @@ func (s *Storage) UpdateOne(ctx context.Context, spec scheme.Spec) (bool, error)
return false, nil
}

if err := s.validate(unstructured); err != nil {
return false, err
}

filter, _ := Where[ulid.ULID](scheme.KeyID).EQ(unstructured.GetID()).Encode()
return s.collection.UpdateOne(ctx, filter, unstructured.Doc())
}

// UpdateMany multiple scheme.Spec and return the number of success.
// UpdateMany updates multiple scheme.Spec instances and returns the number of successes.
func (s *Storage) UpdateMany(ctx context.Context, objs []scheme.Spec) (int, error) {
s.mu.RLock()
defer s.mu.RUnlock()
Expand All @@ -213,10 +200,6 @@ func (s *Storage) UpdateMany(ctx context.Context, objs []scheme.Spec) (int, erro
continue
}

if err := s.validate(unstructured); err != nil {
return 0, err
}

unstructureds = append(unstructureds, unstructured)
}

Expand Down Expand Up @@ -246,7 +229,7 @@ func (s *Storage) DeleteOne(ctx context.Context, filter *Filter) (bool, error) {
return s.collection.DeleteOne(ctx, f)
}

// DeleteMany deletes multiple scheme.Spec and returns the number of success.
// DeleteMany deletes multiple scheme.Spec instances and returns the number of successes.
func (s *Storage) DeleteMany(ctx context.Context, filter *Filter) (int, error) {
s.mu.RLock()
defer s.mu.RUnlock()
Expand All @@ -259,7 +242,7 @@ func (s *Storage) DeleteMany(ctx context.Context, filter *Filter) (int, error) {
return s.collection.DeleteMany(ctx, f)
}

// FindOne return the single scheme.Spec which is matched by the filter.
// FindOne returns a single scheme.Spec matched by the filter.
func (s *Storage) FindOne(ctx context.Context, filter *Filter, options ...*database.FindOptions) (scheme.Spec, error) {
s.mu.RLock()
defer s.mu.RUnlock()
Expand All @@ -285,7 +268,7 @@ func (s *Storage) FindOne(ctx context.Context, filter *Filter, options ...*datab
return nil, nil
}

// FindMany returns multiple scheme.Spec which is matched by the filter.
// FindMany returns multiple scheme.Spec instances matched by the filter.
func (s *Storage) FindMany(ctx context.Context, filter *Filter, options ...*database.FindOptions) ([]scheme.Spec, error) {
s.mu.RLock()
defer s.mu.RUnlock()
Expand Down Expand Up @@ -316,16 +299,3 @@ func (s *Storage) FindMany(ctx context.Context, filter *Filter, options ...*data
return specs, nil
}
}

func (s *Storage) validate(unstructured *scheme.Unstructured) error {
if spec, ok := s.scheme.New(unstructured.GetKind()); ok {
if err := unstructured.Unmarshal(spec); err != nil {
return err
} else if n, err := s.scheme.Decode(spec); err != nil {
return err
} else {
_ = n.Close()
}
}
return nil
}

0 comments on commit ae289cc

Please sign in to comment.