diff --git a/cmd/resource/scheme.go b/cmd/resource/scheme.go index b6a8ec63..30473ba5 100644 --- a/cmd/resource/scheme.go +++ b/cmd/resource/scheme.go @@ -60,7 +60,7 @@ func (c *SpecCodec) Decode(data any) (scheme.Spec, error) { return unstructured, nil } - spec, ok := c.scheme.New(unstructured.GetKind()) + spec, ok := c.scheme.NewSpec(unstructured.GetKind()) if !ok { return nil, errors.WithStack(encoding.ErrUnsupportedValue) } diff --git a/pkg/loader/reconciler.go b/pkg/loader/reconciler.go index 44372f19..17ca0957 100644 --- a/pkg/loader/reconciler.go +++ b/pkg/loader/reconciler.go @@ -116,7 +116,9 @@ func (r *Reconciler) watch(ctx context.Context) (*storage.Stream, error) { r.mu.Lock() defer r.mu.Unlock() - r.stream = nil + if r.stream == s { + r.stream = nil + } case <-r.done: return } diff --git a/pkg/scheme/scheme.go b/pkg/scheme/scheme.go index 7f369748..1c649cdb 100644 --- a/pkg/scheme/scheme.go +++ b/pkg/scheme/scheme.go @@ -7,6 +7,7 @@ import ( "github.com/pkg/errors" "github.com/siyul-park/uniflow/pkg/encoding" "github.com/siyul-park/uniflow/pkg/node" + "github.com/siyul-park/uniflow/pkg/primitive" ) // Scheme defines a registry for handling decoding of Spec objects. @@ -60,8 +61,8 @@ func (s *Scheme) Codec(kind string) (Codec, bool) { return c, ok } -// New creates a new instance of Spec with the given kind. -func (s *Scheme) New(kind string) (Spec, bool) { +// NewSpec creates a new instance of Spec with the given kind. +func (s *Scheme) NewSpec(kind string) (Spec, bool) { s.mu.RLock() defer s.mu.RUnlock() @@ -77,6 +78,18 @@ func (s *Scheme) New(kind string) (Spec, bool) { } } +// NewSpecWithDoc creates a new instance of Spec with the given doc. +func (s *Scheme) NewSpecWithDoc(doc *primitive.Map) (Spec, error) { + unstructured := NewUnstructured(doc) + if spec, ok := s.NewSpec(unstructured.GetKind()); !ok { + return unstructured, nil + } else if err := unstructured.Unmarshal(spec); err != nil { + return nil, err + } else { + return spec, nil + } +} + // Decode decodes the given Spec into a node.Node. func (s *Scheme) Decode(spec Spec) (node.Node, error) { s.mu.RLock() @@ -90,7 +103,7 @@ func (s *Scheme) Decode(spec Spec) (node.Node, error) { } if unstructured, ok := spec.(*Unstructured); ok { - if structured, ok := s.New(kind); ok { + if structured, ok := s.NewSpec(kind); ok { if err := unstructured.Unmarshal(structured); err != nil { return nil, err } else { diff --git a/pkg/scheme/scheme_test.go b/pkg/scheme/scheme_test.go index 145301c2..31d57994 100644 --- a/pkg/scheme/scheme_test.go +++ b/pkg/scheme/scheme_test.go @@ -5,6 +5,7 @@ import ( "testing" "github.com/go-faker/faker/v4" + "github.com/oklog/ulid/v2" "github.com/siyul-park/uniflow/pkg/node" "github.com/stretchr/testify/assert" ) @@ -34,17 +35,36 @@ func TestScheme_Codec(t *testing.T) { assert.True(t, ok) } -func TestScheme_New(t *testing.T) { +func TestScheme_NewSpec(t *testing.T) { s := New() kind := faker.Word() s.AddKnownType(kind, &SpecMeta{}) - spec, ok := s.New(kind) + spec, ok := s.NewSpec(kind) assert.True(t, ok) assert.IsType(t, spec, &SpecMeta{}) } +func TestScheme_NewSpecWithDoc(t *testing.T) { + s := New() + kind := faker.Word() + + s.AddKnownType(kind, &SpecMeta{}) + + u := NewUnstructured(nil) + spec := &SpecMeta{ + ID: ulid.Make(), + Kind: faker.Word(), + } + + _ = u.Marshal(spec) + + r, err := s.NewSpecWithDoc(u.Doc()) + assert.NoError(t, err) + assert.NotNil(t, r, spec) +} + func TestScheme_Decode(t *testing.T) { s := New() kind := faker.Word() diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index d70a21ae..ca0fabd5 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -263,14 +263,7 @@ func (s *Storage) FindOne(ctx context.Context, filter *Filter, options ...*datab return nil, nil } - unstructured := scheme.NewUnstructured(doc) - if spec, ok := s.scheme.New(unstructured.GetKind()); !ok { - return unstructured, nil - } else if err := unstructured.Unmarshal(spec); err != nil { - return nil, err - } else { - return spec, nil - } + return s.scheme.NewSpecWithDoc(doc) } // FindMany returns multiple scheme.Spec instances matched by the filter. @@ -294,10 +287,7 @@ func (s *Storage) FindMany(ctx context.Context, filter *Filter, options ...*data continue } - unstructured := scheme.NewUnstructured(doc) - if spec, ok := s.scheme.New(unstructured.GetKind()); !ok { - specs = append(specs, unstructured) - } else if err := unstructured.Unmarshal(spec); err != nil { + if spec, err := s.scheme.NewSpecWithDoc(doc); err != nil { return nil, err } else { specs = append(specs, spec) @@ -335,15 +325,11 @@ func (s *Storage) ensureIndexes(ctx context.Context, indexes []database.IndexMod return nil } -func (s *Storage) validate(unstructured *scheme.Unstructured) error { - if spec, ok := s.scheme.New(unstructured.GetKind()); ok { - if err := unstructured.Unmarshal(spec); err != nil { - return err - } else if n, err := s.scheme.Decode(spec); err != nil { - return err - } else { - _ = n.Close() - } +func (s *Storage) validate(spec scheme.Spec) error { + if n, err := s.scheme.Decode(spec); err != nil { + return err + } else { + _ = n.Close() } return nil }