Skip to content

Commit

Permalink
refactor: move duplicated code to scheme (#53)
Browse files Browse the repository at this point in the history
* refactor: remove unnessary code

* refactor: remove duplicated code

* fix: close stream when reconciler
  • Loading branch information
siyul-park authored Dec 15, 2023
1 parent fead3b4 commit d233ca4
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 28 deletions.
2 changes: 1 addition & 1 deletion cmd/resource/scheme.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (c *SpecCodec) Decode(data any) (scheme.Spec, error) {
return unstructured, nil
}

spec, ok := c.scheme.New(unstructured.GetKind())
spec, ok := c.scheme.NewSpec(unstructured.GetKind())
if !ok {
return nil, errors.WithStack(encoding.ErrUnsupportedValue)
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/loader/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,9 @@ func (r *Reconciler) watch(ctx context.Context) (*storage.Stream, error) {
r.mu.Lock()
defer r.mu.Unlock()

r.stream = nil
if r.stream == s {
r.stream = nil
}
case <-r.done:
return
}
Expand Down
19 changes: 16 additions & 3 deletions pkg/scheme/scheme.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/pkg/errors"
"github.com/siyul-park/uniflow/pkg/encoding"
"github.com/siyul-park/uniflow/pkg/node"
"github.com/siyul-park/uniflow/pkg/primitive"
)

// Scheme defines a registry for handling decoding of Spec objects.
Expand Down Expand Up @@ -60,8 +61,8 @@ func (s *Scheme) Codec(kind string) (Codec, bool) {
return c, ok
}

// New creates a new instance of Spec with the given kind.
func (s *Scheme) New(kind string) (Spec, bool) {
// NewSpec creates a new instance of Spec with the given kind.
func (s *Scheme) NewSpec(kind string) (Spec, bool) {
s.mu.RLock()
defer s.mu.RUnlock()

Expand All @@ -77,6 +78,18 @@ func (s *Scheme) New(kind string) (Spec, bool) {
}
}

// NewSpecWithDoc creates a new instance of Spec with the given doc.
func (s *Scheme) NewSpecWithDoc(doc *primitive.Map) (Spec, error) {
unstructured := NewUnstructured(doc)
if spec, ok := s.NewSpec(unstructured.GetKind()); !ok {
return unstructured, nil
} else if err := unstructured.Unmarshal(spec); err != nil {
return nil, err
} else {
return spec, nil
}
}

// Decode decodes the given Spec into a node.Node.
func (s *Scheme) Decode(spec Spec) (node.Node, error) {
s.mu.RLock()
Expand All @@ -90,7 +103,7 @@ func (s *Scheme) Decode(spec Spec) (node.Node, error) {
}

if unstructured, ok := spec.(*Unstructured); ok {
if structured, ok := s.New(kind); ok {
if structured, ok := s.NewSpec(kind); ok {
if err := unstructured.Unmarshal(structured); err != nil {
return nil, err
} else {
Expand Down
24 changes: 22 additions & 2 deletions pkg/scheme/scheme_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"

"github.com/go-faker/faker/v4"
"github.com/oklog/ulid/v2"
"github.com/siyul-park/uniflow/pkg/node"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -34,17 +35,36 @@ func TestScheme_Codec(t *testing.T) {
assert.True(t, ok)
}

func TestScheme_New(t *testing.T) {
func TestScheme_NewSpec(t *testing.T) {
s := New()
kind := faker.Word()

s.AddKnownType(kind, &SpecMeta{})

spec, ok := s.New(kind)
spec, ok := s.NewSpec(kind)
assert.True(t, ok)
assert.IsType(t, spec, &SpecMeta{})
}

func TestScheme_NewSpecWithDoc(t *testing.T) {
s := New()
kind := faker.Word()

s.AddKnownType(kind, &SpecMeta{})

u := NewUnstructured(nil)
spec := &SpecMeta{
ID: ulid.Make(),
Kind: faker.Word(),
}

_ = u.Marshal(spec)

r, err := s.NewSpecWithDoc(u.Doc())
assert.NoError(t, err)
assert.NotNil(t, r, spec)
}

func TestScheme_Decode(t *testing.T) {
s := New()
kind := faker.Word()
Expand Down
28 changes: 7 additions & 21 deletions pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,14 +263,7 @@ func (s *Storage) FindOne(ctx context.Context, filter *Filter, options ...*datab
return nil, nil
}

unstructured := scheme.NewUnstructured(doc)
if spec, ok := s.scheme.New(unstructured.GetKind()); !ok {
return unstructured, nil
} else if err := unstructured.Unmarshal(spec); err != nil {
return nil, err
} else {
return spec, nil
}
return s.scheme.NewSpecWithDoc(doc)
}

// FindMany returns multiple scheme.Spec instances matched by the filter.
Expand All @@ -294,10 +287,7 @@ func (s *Storage) FindMany(ctx context.Context, filter *Filter, options ...*data
continue
}

unstructured := scheme.NewUnstructured(doc)
if spec, ok := s.scheme.New(unstructured.GetKind()); !ok {
specs = append(specs, unstructured)
} else if err := unstructured.Unmarshal(spec); err != nil {
if spec, err := s.scheme.NewSpecWithDoc(doc); err != nil {
return nil, err
} else {
specs = append(specs, spec)
Expand Down Expand Up @@ -335,15 +325,11 @@ func (s *Storage) ensureIndexes(ctx context.Context, indexes []database.IndexMod
return nil
}

func (s *Storage) validate(unstructured *scheme.Unstructured) error {
if spec, ok := s.scheme.New(unstructured.GetKind()); ok {
if err := unstructured.Unmarshal(spec); err != nil {
return err
} else if n, err := s.scheme.Decode(spec); err != nil {
return err
} else {
_ = n.Close()
}
func (s *Storage) validate(spec scheme.Spec) error {
if n, err := s.scheme.Decode(spec); err != nil {
return err
} else {
_ = n.Close()
}
return nil
}

0 comments on commit d233ca4

Please sign in to comment.