From b2de2a74cd146ddfea3c726b8dd0f2c42b7c4a14 Mon Sep 17 00:00:00 2001 From: Ara Park Date: Mon, 27 Nov 2023 17:06:15 +0900 Subject: [PATCH] refactor: redesign compilation process (#10) * feat: move linking to symbol table * feat: change symbol table * feat: add symbol * refactor: symbol impliment node * refactor: remove unload in loader and migrate * fix: add filter load all * test: add load exists * refactor: remote -> storage * docs: add comment in symbol * docs: change comment in table * docs: add comment in loader * refactor: improve cohesion * docs: fix comment * fix: add hook free close * refactor: expose free * refactor: extract insert * fix: unload when free related symbol * fix: other namespace * chore: remove unused comment * feat: add index if name is not zero * refactor: add test and remove unused * fix: safe rollback linked * test: add more table test * test: add more test * refactor: remove unused if --- pkg/hook/hook.go | 84 +---- pkg/hook/hook_test.go | 52 +-- pkg/loader/loader.go | 529 +++++----------------------- pkg/loader/loader_test.go | 370 ++----------------- pkg/loader/reconciler.go | 34 +- pkg/loader/reconciler_test.go | 8 +- pkg/plugin/networkx/builder.go | 2 +- pkg/plugin/networkx/builder_test.go | 3 +- pkg/runtime/runtime.go | 42 +-- pkg/symbol/loadhook.go | 18 + pkg/symbol/postloadhook.go | 18 - pkg/symbol/postunloadhook.go | 18 - pkg/symbol/preloadhook.go | 18 - pkg/symbol/preunloadhook.go | 18 - pkg/symbol/symbol.go | 53 +++ pkg/symbol/symbol_test.go | 43 +++ pkg/symbol/table.go | 334 ++++++++++++++---- pkg/symbol/table_test.go | 485 ++++++++++++++++++++++--- pkg/symbol/unloadhook.go | 18 + 19 files changed, 1018 insertions(+), 1129 deletions(-) create mode 100644 pkg/symbol/loadhook.go delete mode 100644 pkg/symbol/postloadhook.go delete mode 100644 pkg/symbol/postunloadhook.go delete mode 100644 pkg/symbol/preloadhook.go delete mode 100644 pkg/symbol/preunloadhook.go create mode 100644 pkg/symbol/symbol.go create mode 100644 pkg/symbol/symbol_test.go create mode 100644 pkg/symbol/unloadhook.go diff --git a/pkg/hook/hook.go b/pkg/hook/hook.go index d1d938f6..aabf50ea 100644 --- a/pkg/hook/hook.go +++ b/pkg/hook/hook.go @@ -10,102 +10,56 @@ import ( type ( // Hook is a collection of hook functions. Hook struct { - preLoadHooks []symbol.PreLoadHook - postLoadHooks []symbol.PostLoadHook - preUnloadHooks []symbol.PreUnloadHook - postUnloadHooks []symbol.PostUnloadHook - mu sync.RWMutex + loadHooks []symbol.LoadHook + unloadHooks []symbol.UnloadHook + mu sync.RWMutex } ) -var _ symbol.PreLoadHook = &Hook{} -var _ symbol.PostLoadHook = &Hook{} -var _ symbol.PreUnloadHook = &Hook{} -var _ symbol.PostUnloadHook = &Hook{} +var _ symbol.LoadHook = &Hook{} +var _ symbol.UnloadHook = &Hook{} // New returns a new Hooks. func New() *Hook { return &Hook{} } -// AddPreLoadHook adds a PreLoadHook. -func (h *Hook) AddPreLoadHook(hook symbol.PreLoadHook) { +// AddLoadHook adds a LoadHook. +func (h *Hook) AddLoadHook(hook symbol.LoadHook) { h.mu.Lock() defer h.mu.Unlock() - h.preLoadHooks = append(h.preLoadHooks, hook) + h.loadHooks = append(h.loadHooks, hook) } -// AddPostLoadHook adds a PostLoadHook. -func (h *Hook) AddPostLoadHook(hook symbol.PostLoadHook) { +// AddUnloadHook adds a UnloadHook. +func (h *Hook) AddUnloadHook(hook symbol.UnloadHook) { h.mu.Lock() defer h.mu.Unlock() - h.postLoadHooks = append(h.postLoadHooks, hook) + h.unloadHooks = append(h.unloadHooks, hook) } -// AddPreUnloadHook adds a PreUnloadHook. -func (h *Hook) AddPreUnloadHook(hook symbol.PreUnloadHook) { - h.mu.Lock() - defer h.mu.Unlock() - - h.preUnloadHooks = append(h.preUnloadHooks, hook) -} - -// AddPostUnloadHook adds a PostUnloadHook. -func (h *Hook) AddPostUnloadHook(hook symbol.PostUnloadHook) { - h.mu.Lock() - defer h.mu.Unlock() - - h.postUnloadHooks = append(h.postUnloadHooks, hook) -} - -// PreLoad runs PreLoadHooks. -func (h *Hook) PreLoad(n node.Node) error { - h.mu.RLock() - defer h.mu.RUnlock() - - for _, hook := range h.preLoadHooks { - if err := hook.PreLoad(n); err != nil { - return err - } - } - return nil -} - -// PostLoad runs PostLoadHooks. -func (h *Hook) PostLoad(n node.Node) error { - h.mu.RLock() - defer h.mu.RUnlock() - - for _, hook := range h.postLoadHooks { - if err := hook.PostLoad(n); err != nil { - return err - } - } - return nil -} - -// PreUnload runs PreUnloadHooks. -func (h *Hook) PreUnload(n node.Node) error { +// Load runs LoadHooks. +func (h *Hook) Load(n node.Node) error { h.mu.RLock() defer h.mu.RUnlock() - for _, hook := range h.preUnloadHooks { - if err := hook.PreUnload(n); err != nil { + for _, hook := range h.loadHooks { + if err := hook.Load(n); err != nil { return err } } return nil } -// PostUnload runs PostUnloadHooks. -func (h *Hook) PostUnload(n node.Node) error { +// Unload runs UnloadHooks. +func (h *Hook) Unload(n node.Node) error { h.mu.RLock() defer h.mu.RUnlock() - for _, hook := range h.postUnloadHooks { - if err := hook.PostUnload(n); err != nil { + for _, hook := range h.unloadHooks { + if err := hook.Unload(n); err != nil { return err } } diff --git a/pkg/hook/hook_test.go b/pkg/hook/hook_test.go index 106c8230..e3663ad2 100644 --- a/pkg/hook/hook_test.go +++ b/pkg/hook/hook_test.go @@ -8,62 +8,38 @@ import ( "github.com/stretchr/testify/assert" ) -func TestHook_PreLoadHook(t *testing.T) { +func TestHook_LoadHook(t *testing.T) { hooks := New() n := node.NewOneToOneNode(node.OneToOneNodeConfig{}) - h := symbol.PreLoadHookFunc(func(_ node.Node) error { + count := 0 + h := symbol.LoadHookFunc(func(_ node.Node) error { + count += 1 return nil }) - hooks.AddPreLoadHook(h) + hooks.AddLoadHook(h) - err := hooks.PreLoad(n) + err := hooks.Load(n) assert.NoError(t, err) + assert.Equal(t, 1, count) } -func TestHook_PostLoadHook(t *testing.T) { +func TestHook_UnloadHook(t *testing.T) { hooks := New() n := node.NewOneToOneNode(node.OneToOneNodeConfig{}) - h := symbol.PostLoadHookFunc(func(_ node.Node) error { + count := 0 + h := symbol.UnloadHookFunc(func(_ node.Node) error { + count += 1 return nil }) - hooks.AddPostLoadHook(h) + hooks.AddUnloadHook(h) - err := hooks.PostLoad(n) - assert.NoError(t, err) -} - -func TestHook_PreUnloadHook(t *testing.T) { - hooks := New() - - n := node.NewOneToOneNode(node.OneToOneNodeConfig{}) - - h := symbol.PreUnloadHookFunc(func(_ node.Node) error { - return nil - }) - - hooks.AddPreUnloadHook(h) - - err := hooks.PreUnload(n) - assert.NoError(t, err) -} - -func TestHook_PostUnloadHook(t *testing.T) { - hooks := New() - - n := node.NewOneToOneNode(node.OneToOneNodeConfig{}) - - h := symbol.PostUnloadHookFunc(func(_ node.Node) error { - return nil - }) - - hooks.AddPostUnloadHook(h) - - err := hooks.PostUnload(n) + err := hooks.Unload(n) assert.NoError(t, err) + assert.Equal(t, 1, count) } diff --git a/pkg/loader/loader.go b/pkg/loader/loader.go index 051730c0..10c80439 100644 --- a/pkg/loader/loader.go +++ b/pkg/loader/loader.go @@ -6,7 +6,8 @@ import ( "sync" "github.com/oklog/ulid/v2" - "github.com/siyul-park/uniflow/pkg/database/memdb" + "github.com/samber/lo" + "github.com/siyul-park/uniflow/pkg/database" "github.com/siyul-park/uniflow/pkg/node" "github.com/siyul-park/uniflow/pkg/scheme" "github.com/siyul-park/uniflow/pkg/storage" @@ -14,509 +15,163 @@ import ( ) type ( - // Config is a config for for the Loader. + // Config represents the configuration for the Loader. Config struct { - Table *symbol.Table - Scheme *scheme.Scheme - Storage *storage.Storage + Namespace string // Namespace is the namespace used by the Loader. + Table *symbol.Table // Table is the symbol table for managing symbols. + Scheme *scheme.Scheme // Scheme is the scheme used by the Loader. + Storage *storage.Storage // Storage is the storage used by the Loader. } - // Loader loads scheme.Spec into symbol.Table. + // Loader loads scheme.Spec into the symbol.Table. Loader struct { - scheme *scheme.Scheme - table *symbol.Table - remote *storage.Storage - local *storage.Storage - referenced map[ulid.ULID]links - undefined map[ulid.ULID]links - mu sync.RWMutex + namespace string + scheme *scheme.Scheme + table *symbol.Table + storage *storage.Storage + mu sync.RWMutex } - - links map[string][]scheme.PortLocation ) // New returns a new Loader. -func New(ctx context.Context, config Config) (*Loader, error) { +func New(config Config) *Loader { + namespace := config.Namespace table := config.Table scheme := config.Scheme - remote := config.Storage - - local, err := storage.New(ctx, storage.Config{ - Scheme: scheme, - Database: memdb.New(""), - }) - if err != nil { - return nil, err - } + storage := config.Storage return &Loader{ - scheme: scheme, - table: table, - remote: remote, - local: local, - referenced: make(map[ulid.ULID]links), - undefined: make(map[ulid.ULID]links), - }, nil + namespace: namespace, + scheme: scheme, + table: table, + storage: storage, + } } // LoadOne loads a single scheme.Spec from the storage.Storage -func (ld *Loader) LoadOne(ctx context.Context, filter *storage.Filter) (node.Node, error) { - ld.mu.Lock() - defer ld.mu.Unlock() - - return ld.loadOne(ctx, filter) -} - -// LoadMany loads multiple scheme.Spec from the storage.Storage -func (ld *Loader) LoadMany(ctx context.Context, filter *storage.Filter) ([]node.Node, error) { +func (ld *Loader) LoadOne(ctx context.Context, id ulid.ULID) (node.Node, error) { ld.mu.Lock() defer ld.mu.Unlock() - return ld.loadMany(ctx, filter) -} - -// UnloadOne unloads a single scheme.Spec from the storage.Storage -func (ld *Loader) UnloadOne(ctx context.Context, filter *storage.Filter) (bool, error) { - ld.mu.Lock() - defer ld.mu.Unlock() + namespace := ld.namespace - return ld.unloadOne(ctx, filter) -} + queue := []any{id} + for len(queue) > 0 { + prev := queue + queue = nil -// UnloadMany unloads multiple scheme.Spec from the storage.Storage -func (ld *Loader) UnloadMany(ctx context.Context, filter *storage.Filter) (int, error) { - ld.mu.Lock() - defer ld.mu.Unlock() - - return ld.unloadMany(ctx, filter) -} - -func (ld *Loader) loadOne(ctx context.Context, filter *storage.Filter) (node.Node, error) { - remote, err := ld.remote.FindOne(ctx, filter) - if err != nil { - return nil, err - } - local, err := ld.local.FindOne(ctx, filter) - if err != nil { - return nil, err - } + exists := map[any]bool{} - if remote != nil { - if local != nil { - if reflect.DeepEqual(remote, local) { - if n, ok := ld.table.Lookup(remote.GetID()); ok { - return n, nil - } + var filter *storage.Filter + for _, key := range prev { + if k, ok := key.(ulid.ULID); ok { + exists[k] = false + filter = filter.Or(storage.Where[ulid.ULID](scheme.KeyID).EQ(k)) + } else if k, ok := key.(string); ok { + exists[k] = false + filter = filter.Or(storage.Where[string](scheme.KeyName).EQ(k)) } } - } else { - if local != nil { - _, err := ld.unloadOne(ctx, storage.Where[ulid.ULID](scheme.KeyID).EQ(local.GetID())) - return nil, err + if namespace != "" { + filter = filter.And(storage.Where[string](scheme.KeyNamespace).EQ(namespace)) } - return nil, nil - } - if n, err := ld.scheme.Decode(remote); err != nil { - return nil, err - } else { - n, err := ld.table.Insert(n) + specs, err := ld.storage.FindMany(ctx, filter, &database.FindOptions{Limit: lo.ToPtr(len(prev))}) if err != nil { return nil, err } - if local == nil { - if _, err := ld.local.InsertOne(ctx, remote); err != nil { - return nil, err - } - } else { - if _, err := ld.local.UpdateOne(ctx, remote); err != nil { - return nil, err + for _, spec := range specs { + exists[spec.GetID()] = true + if spec.GetName() != "" { + exists[spec.GetName()] = true } - } - if err := ld.resolveLinks(ctx, local, remote); err != nil { - return nil, err - } - - return n, nil - } -} - -func (ld *Loader) loadMany(ctx context.Context, filter *storage.Filter) ([]node.Node, error) { - remotes, err := ld.remote.FindMany(ctx, filter) - if err != nil { - return nil, err - } - locals, err := ld.local.FindMany(ctx, filter) - if err != nil { - return nil, err - } - - idToLocal := map[ulid.ULID]scheme.Spec{} - idToRemote := map[ulid.ULID]scheme.Spec{} - for _, spec := range locals { - idToLocal[spec.GetID()] = spec - } - for _, spec := range remotes { - idToRemote[spec.GetID()] = spec - } - - var removeIds []ulid.ULID - for id := range idToLocal { - if _, ok := idToRemote[id]; !ok { - removeIds = append(removeIds, id) - } - } - if len(removeIds) > 0 { - if _, err := ld.unloadMany(ctx, storage.Where[ulid.ULID](scheme.KeyID).IN(removeIds...)); err != nil { - return nil, err - } - } + if namespace == "" { + namespace = spec.GetNamespace() + } - var nodes []node.Node - for id, remote := range idToRemote { - local := idToLocal[id] - if local != nil { - if reflect.DeepEqual(remote, local) { - if n, ok := ld.table.Lookup(id); ok { - nodes = append(nodes, n) + if sym, ok := ld.table.LookupByID(spec.GetID()); ok { + if reflect.DeepEqual(sym.Spec, spec) { continue } } - } - if n, err := ld.scheme.Decode(remote); err != nil { - return nil, err - } else { - if sym, err := ld.table.Insert(n); err != nil { + if n, err := ld.scheme.Decode(spec); err != nil { + return nil, err + } else if err := ld.table.Insert(&symbol.Symbol{Node: n, Spec: spec}); err != nil { return nil, err - } else { - nodes = append(nodes, sym) - } - if local == nil { - if _, err := ld.local.InsertOne(ctx, remote); err != nil { - return nil, err - } - } else { - if _, err := ld.local.UpdateOne(ctx, remote); err != nil { - return nil, err - } - } - } - } - - for id, remote := range idToRemote { - local := idToLocal[id] - if err := ld.resolveLinks(ctx, local, remote); err != nil { - return nil, err - } - } - - return nodes, nil -} - -func (ld *Loader) unloadOne(ctx context.Context, filter *storage.Filter) (bool, error) { - local, err := ld.local.FindOne(ctx, filter) - if err != nil { - return false, err - } - if local == nil { - return false, nil - } - - if err := ld.resolveLinks(ctx, local, nil); err != nil { - return false, err - } - if _, err := ld.table.Free(local.GetID()); err != nil { - return false, err - } - return ld.local.DeleteOne(ctx, storage.Where[ulid.ULID](scheme.KeyID).EQ(local.GetID())) -} - -func (ld *Loader) unloadMany(ctx context.Context, filter *storage.Filter) (int, error) { - locals, err := ld.local.FindMany(ctx, filter) - if err != nil { - return 0, err - } - - for _, local := range locals { - if err := ld.resolveLinks(ctx, local, nil); err != nil { - return 0, err - } - if _, err := ld.table.Free(local.GetID()); err != nil { - return 0, err - } - } - - var ids []ulid.ULID - for _, local := range locals { - ids = append(ids, local.GetID()) - } - return ld.local.DeleteMany(ctx, storage.Where[ulid.ULID](scheme.KeyID).IN(ids...)) -} - -func (ld *Loader) resolveLinks(ctx context.Context, local scheme.Spec, remote scheme.Spec) error { - var n node.Node - var ok bool - - var spec scheme.Spec - var localLinks links - var remoteLinks links - - if local != nil { - spec = local - localLinks = local.GetLinks() - n, ok = ld.table.Lookup(local.GetID()) - } - if remote != nil { - spec = remote - remoteLinks = remote.GetLinks() - if !ok { - n, ok = ld.table.Lookup(remote.GetID()) - } - } - if !ok { - return nil - } - - deletions := localLinks - additions := remoteLinks - - undefined := links{} - - for name, locations := range deletions { - for _, location := range locations { - id := location.ID - - if id == (ulid.ULID{}) { - if location.Name != "" { - filter := storage.Where[string](scheme.KeyNamespace).EQ(spec.GetNamespace()) - filter = filter.And(storage.Where[string](scheme.KeyName).EQ(location.Name)) - if spec, err := ld.local.FindOne(ctx, filter); err != nil { - return err - } else if spec != nil { - id = spec.GetID() - } - } } - if id != (ulid.ULID{}) { - if ref, ok := ld.table.Lookup(id); ok { - referenced := ld.referenced[ref.ID()] - var locations []scheme.PortLocation - for _, location := range referenced[location.Port] { - if location.ID != n.ID() || location.Port != name { - locations = append(locations, location) - } - } - if len(locations) > 0 { - referenced[location.Port] = locations - ld.referenced[ref.ID()] = referenced - } else if referenced != nil { - delete(referenced, location.Port) - ld.referenced[ref.ID()] = referenced + for _, locations := range spec.GetLinks() { + for _, location := range locations { + if location.ID != (ulid.ULID{}) { + queue = append(queue, location.ID) + } else if location.Name != "" { + queue = append(queue, location.Name) } } } } - } - for name, locations := range additions { - p1, ok := n.Port(name) - if !ok { - undefined[name] = locations - continue - } - - for _, location := range locations { - filter := storage.Where[string](scheme.KeyNamespace).EQ(spec.GetNamespace()) - if location.ID != (ulid.ULID{}) { - filter = filter.And(storage.Where[ulid.ULID](scheme.KeyID).EQ(location.ID)) - } else if location.Name != "" { - filter = filter.And(storage.Where[string](scheme.KeyName).EQ(location.Name)) - } else { + for key, exist := range exists { + if exist { continue } - // TODO: use load many - if ref, err := ld.loadOne(ctx, filter); err != nil { - return err - } else if ref != nil { - if p2, ok := ref.Port(location.Port); ok { - p1.Link(p2) - - referenced := ld.referenced[ref.ID()] - if referenced == nil { - referenced = links{} - } - referenced[location.Port] = append(referenced[location.Port], scheme.PortLocation{ - ID: n.ID(), - Port: name, - }) - ld.referenced[ref.ID()] = referenced - } else { - undefined[name] = append(undefined[name], location) - } - } else { - undefined[name] = append(undefined[name], location) - } - } - } - - undefined = diffLinks(unionLinks(ld.undefined[n.ID()], undefined), deletions) - - if len(undefined) > 0 { - ld.undefined[n.ID()] = undefined - } else { - delete(ld.undefined, n.ID()) - } - - if remote == nil { - ld.removeReference(ctx, n.ID()) - } else { - for name, locations := range ld.referenced[spec.GetID()] { - p1, ok := n.Port(name) + id, ok := key.(ulid.ULID) if !ok { - continue - } - for _, location := range locations { - if ref, ok := ld.table.Lookup(location.ID); ok { - if p2, ok := ref.Port(location.Port); ok { - p1.Link(p2) + if name, ok := key.(string); ok { + if sym, ok := ld.table.LookupByName(namespace, name); ok { + id = sym.ID() } } } - } - - for id, additions := range ld.undefined { - if ref, err := ld.local.FindOne(ctx, storage.Where[ulid.ULID](scheme.KeyID).EQ(id)); err != nil { - return err - } else if ref == nil { - ld.removeReference(ctx, id) - delete(ld.undefined, id) - continue - } else if ref.GetNamespace() != spec.GetNamespace() { - continue - } - undefined := make(links, len(additions)) - - if ref, ok := ld.table.Lookup(id); ok { - for name, locations := range additions { - p1, ok := ref.Port(name) - if !ok { - continue - } - - for _, location := range locations { - if (location.ID == spec.GetID()) || (location.Name != "" && location.Name == spec.GetName()) { - if p2, ok := n.Port(location.Port); ok { - p1.Link(p2) - - referenced := ld.referenced[n.ID()] - if referenced == nil { - referenced = links{} - } - referenced[location.Port] = append(referenced[location.Port], scheme.PortLocation{ - ID: ref.ID(), - Port: name, - }) - ld.referenced[n.ID()] = referenced - } else { - undefined[name] = append(undefined[name], location) - } - } else { - undefined[name] = append(undefined[name], location) - } - } + if id != (ulid.ULID{}) { + if _, err := ld.table.Free(id); err != nil { + return nil, err } } - - ld.undefined[id] = undefined } } - return nil -} - -func (ld *Loader) removeReference(ctx context.Context, id ulid.ULID) { - for name, locations := range ld.referenced[id] { - for _, location := range locations { - if ref, ok := ld.table.Lookup(location.ID); ok { - undefined := ld.undefined[ref.ID()] - if undefined == nil { - undefined = links{} - } - undefined[location.Port] = append(undefined[location.Port], scheme.PortLocation{ - ID: id, - Port: name, - }) - ld.undefined[ref.ID()] = undefined - } - } + if sym, ok := ld.table.LookupByID(id); !ok { + return nil, nil + } else { + return sym.Node, nil } - delete(ld.referenced, id) } -func diffLinks(l1 links, l2 links) links { - diff := make(links, len(l1)) - for name, locations1 := range l1 { - diffLocationSet := map[scheme.PortLocation]struct{}{} - for _, location := range locations1 { - diffLocationSet[location] = struct{}{} - } - if locations2, ok := l2[name]; ok { - for _, location := range locations2 { - delete(diffLocationSet, location) - } - } - - var diffLocations []scheme.PortLocation - for location := range diffLocationSet { - diffLocations = append(diffLocations, location) - } - - if len(diffLocations) > 0 { - diff[name] = diffLocations - } +// LoadAll loads all scheme.Spec from the storage.Storage +func (ld *Loader) LoadAll(ctx context.Context) ([]node.Node, error) { + var filter *storage.Filter + if ld.namespace != "" { + filter = filter.And(storage.Where[string](scheme.KeyNamespace).EQ(ld.namespace)) } - if len(diff) == 0 { - return nil + specs, err := ld.storage.FindMany(ctx, filter) + if err != nil { + return nil, err } - return diff -} -func unionLinks(l1 links, l2 links) links { - unionSet := make(map[string]map[scheme.PortLocation]struct{}, len(l1)+len(l2)) - for name, locations := range l1 { - unionLocationSet := map[scheme.PortLocation]struct{}{} - for _, location := range locations { - unionLocationSet[location] = struct{}{} - } - unionSet[name] = unionLocationSet - } - for name, locations := range l2 { - unionLocationSet := unionSet[name] - if len(unionLocationSet) == 0 { - unionLocationSet = map[scheme.PortLocation]struct{}{} - } - for _, location := range locations { - unionLocationSet[location] = struct{}{} + var nodes []node.Node + for _, spec := range specs { + if sym, ok := ld.table.LookupByID(spec.GetID()); ok { + if reflect.DeepEqual(sym.Spec, spec) { + nodes = append(nodes, sym.Node) + continue + } } - unionSet[name] = unionLocationSet - } - union := make(links, len(unionSet)) - for name, locationSet := range unionSet { - var locations []scheme.PortLocation - for location := range locationSet { - locations = append(locations, location) + if n, err := ld.scheme.Decode(spec); err != nil { + return nil, err + } else if err := ld.table.Insert(&symbol.Symbol{Node: n, Spec: spec}); err != nil { + return nil, err + } else { + nodes = append(nodes, n) } - - union[name] = locations } - return union + return nodes, nil } diff --git a/pkg/loader/loader_test.go b/pkg/loader/loader_test.go index c9192b49..59479d12 100644 --- a/pkg/loader/loader_test.go +++ b/pkg/loader/loader_test.go @@ -15,285 +15,7 @@ import ( ) func TestLoader_LoadOne(t *testing.T) { - t.Run("linked all", func(t *testing.T) { - s := scheme.New() - - st, _ := storage.New(context.Background(), storage.Config{ - Scheme: s, - Database: memdb.New(faker.Word()), - }) - - tb := symbol.NewTable() - defer func() { _ = tb.Close() }() - - ld, _ := New(context.Background(), Config{ - Scheme: s, - Storage: st, - Table: tb, - }) - - kind := faker.Word() - - spec1 := &scheme.SpecMeta{ - ID: ulid.Make(), - Kind: kind, - Namespace: scheme.NamespaceDefault, - } - spec2 := &scheme.SpecMeta{ - ID: ulid.Make(), - Kind: kind, - Namespace: scheme.NamespaceDefault, - Links: map[string][]scheme.PortLocation{ - node.PortIO: { - { - ID: spec1.GetID(), - Port: node.PortIO, - }, - }, - }, - } - - codec := scheme.CodecFunc(func(spec scheme.Spec) (node.Node, error) { - return node.NewOneToOneNode(node.OneToOneNodeConfig{ID: spec.GetID()}), nil - }) - - s.AddKnownType(kind, &scheme.SpecMeta{}) - s.AddCodec(kind, codec) - - st.InsertOne(context.Background(), spec1) - st.InsertOne(context.Background(), spec2) - - r2, err := ld.LoadOne(context.Background(), storage.Where[ulid.ULID](scheme.KeyID).EQ(spec2.GetID())) - assert.NoError(t, err) - assert.NotNil(t, r2) - - n1, ok := tb.Lookup(spec1.GetID()) - assert.True(t, ok) - - n2, ok := tb.Lookup(spec2.GetID()) - assert.True(t, ok) - - p1, _ := n1.Port(node.PortIO) - p2, _ := n2.Port(node.PortIO) - - assert.Equal(t, p1.Links(), 1) - assert.Equal(t, p2.Links(), 1) - }) - - t.Run("linked all with name", func(t *testing.T) { - s := scheme.New() - - st, _ := storage.New(context.Background(), storage.Config{ - Scheme: s, - Database: memdb.New(faker.Word()), - }) - - tb := symbol.NewTable() - defer func() { _ = tb.Close() }() - - ld, _ := New(context.Background(), Config{ - Scheme: s, - Storage: st, - Table: tb, - }) - - kind := faker.Word() - - spec1 := &scheme.SpecMeta{ - ID: ulid.Make(), - Kind: kind, - Namespace: scheme.NamespaceDefault, - Name: faker.Word(), - } - spec2 := &scheme.SpecMeta{ - ID: ulid.Make(), - Kind: kind, - Namespace: scheme.NamespaceDefault, - Name: faker.Word(), - Links: map[string][]scheme.PortLocation{ - node.PortIO: { - { - Name: spec1.Name, - Port: node.PortIO, - }, - }, - }, - } - - codec := scheme.CodecFunc(func(spec scheme.Spec) (node.Node, error) { - return node.NewOneToOneNode(node.OneToOneNodeConfig{ID: spec.GetID()}), nil - }) - - s.AddKnownType(kind, &scheme.SpecMeta{}) - s.AddCodec(kind, codec) - - st.InsertOne(context.Background(), spec1) - st.InsertOne(context.Background(), spec2) - - r2, err := ld.LoadOne(context.Background(), storage.Where[ulid.ULID](scheme.KeyID).EQ(spec2.GetID())) - assert.NoError(t, err) - assert.NotNil(t, r2) - - n1, ok := tb.Lookup(spec1.GetID()) - assert.True(t, ok) - - n2, ok := tb.Lookup(spec2.GetID()) - assert.True(t, ok) - - p1, _ := n1.Port(node.PortIO) - p2, _ := n2.Port(node.PortIO) - - assert.Equal(t, p1.Links(), 1) - assert.Equal(t, p2.Links(), 1) - }) - - t.Run("unlinked any", func(t *testing.T) { - s := scheme.New() - - st, _ := storage.New(context.Background(), storage.Config{ - Scheme: s, - Database: memdb.New(faker.Word()), - }) - - tb := symbol.NewTable() - defer func() { _ = tb.Close() }() - - ld, _ := New(context.Background(), Config{ - Scheme: s, - Storage: st, - Table: tb, - }) - - kind := faker.Word() - - spec1 := &scheme.SpecMeta{ - ID: ulid.Make(), - Kind: kind, - Namespace: scheme.NamespaceDefault, - } - spec2 := &scheme.SpecMeta{ - ID: ulid.Make(), - Kind: kind, - Namespace: scheme.NamespaceDefault, - Links: map[string][]scheme.PortLocation{ - node.PortIO: { - { - ID: spec1.GetID(), - Port: node.PortIO, - }, - }, - }, - } - - codec := scheme.CodecFunc(func(spec scheme.Spec) (node.Node, error) { - return node.NewOneToOneNode(node.OneToOneNodeConfig{ID: spec.GetID()}), nil - }) - - s.AddKnownType(kind, &scheme.SpecMeta{}) - s.AddCodec(kind, codec) - - st.InsertOne(context.Background(), spec2) - - r2, err := ld.LoadOne(context.Background(), storage.Where[ulid.ULID](scheme.KeyID).EQ(spec2.GetID())) - assert.NoError(t, err) - assert.NotNil(t, r2) - - st.InsertOne(context.Background(), spec1) - - r1, err := ld.LoadOne(context.Background(), storage.Where[ulid.ULID](scheme.KeyID).EQ(spec1.GetID())) - assert.NoError(t, err) - assert.NotNil(t, r1) - - n1, ok := tb.Lookup(spec1.GetID()) - assert.True(t, ok) - - n2, ok := tb.Lookup(spec2.GetID()) - assert.True(t, ok) - - p1, _ := n1.Port(node.PortIO) - p2, _ := n2.Port(node.PortIO) - - assert.Equal(t, p1.Links(), 1) - assert.Equal(t, p2.Links(), 1) - }) - - t.Run("relink any", func(t *testing.T) { - s := scheme.New() - - st, _ := storage.New(context.Background(), storage.Config{ - Scheme: s, - Database: memdb.New(faker.Word()), - }) - - tb := symbol.NewTable() - defer func() { _ = tb.Close() }() - - ld, _ := New(context.Background(), Config{ - Scheme: s, - Storage: st, - Table: tb, - }) - - kind := faker.Word() - - spec1 := &scheme.SpecMeta{ - ID: ulid.Make(), - Kind: kind, - Namespace: scheme.NamespaceDefault, - } - spec2 := &scheme.SpecMeta{ - ID: ulid.Make(), - Kind: kind, - Namespace: scheme.NamespaceDefault, - Links: map[string][]scheme.PortLocation{ - node.PortIO: { - { - ID: spec1.GetID(), - Port: node.PortIO, - }, - }, - }, - } - - codec := scheme.CodecFunc(func(spec scheme.Spec) (node.Node, error) { - return node.NewOneToOneNode(node.OneToOneNodeConfig{ID: spec.GetID()}), nil - }) - - s.AddKnownType(kind, &scheme.SpecMeta{}) - s.AddCodec(kind, codec) - - st.InsertOne(context.Background(), spec1) - st.InsertOne(context.Background(), spec2) - - r2, err := ld.LoadOne(context.Background(), storage.Where[ulid.ULID](scheme.KeyID).EQ(spec2.GetID())) - assert.NoError(t, err) - assert.NotNil(t, r2) - - ok, err := ld.UnloadOne(context.Background(), storage.Where[ulid.ULID](scheme.KeyID).EQ(spec1.GetID())) - assert.NoError(t, err) - assert.True(t, ok) - - r1, err := ld.LoadOne(context.Background(), storage.Where[ulid.ULID](scheme.KeyID).EQ(spec1.GetID())) - assert.NoError(t, err) - assert.NotNil(t, r1) - - n1, ok := tb.Lookup(spec1.GetID()) - assert.True(t, ok) - - n2, ok := tb.Lookup(spec2.GetID()) - assert.True(t, ok) - - p1, _ := n1.Port(node.PortIO) - p2, _ := n2.Port(node.PortIO) - - assert.Equal(t, p1.Links(), 1) - assert.GreaterOrEqual(t, p2.Links(), 1) - }) -} - -func TestLoader_LoadMany(t *testing.T) { s := scheme.New() - st, _ := storage.New(context.Background(), storage.Config{ Scheme: s, Database: memdb.New(faker.Word()), @@ -302,7 +24,7 @@ func TestLoader_LoadMany(t *testing.T) { tb := symbol.NewTable() defer func() { _ = tb.Close() }() - ld, _ := New(context.Background(), Config{ + ld := New(Config{ Scheme: s, Storage: st, Table: tb, @@ -339,64 +61,35 @@ func TestLoader_LoadMany(t *testing.T) { st.InsertOne(context.Background(), spec1) st.InsertOne(context.Background(), spec2) - r, err := ld.LoadMany(context.Background(), nil) + r, err := ld.LoadOne(context.Background(), spec2.GetID()) assert.NoError(t, err) - assert.Len(t, r, 2) + assert.NotNil(t, r) - _, ok := tb.Lookup(spec1.GetID()) + _, ok := tb.LookupByID(spec1.GetID()) assert.True(t, ok) - _, ok = tb.Lookup(spec2.GetID()) + _, ok = tb.LookupByID(spec2.GetID()) assert.True(t, ok) -} - -func TestLoader_UnloadOne(t *testing.T) { - s := scheme.New() - - st, _ := storage.New(context.Background(), storage.Config{ - Scheme: s, - Database: memdb.New(faker.Word()), - }) - - tb := symbol.NewTable() - defer func() { _ = tb.Close() }() - - ld, _ := New(context.Background(), Config{ - Scheme: s, - Storage: st, - Table: tb, - }) - - kind := faker.Word() - - spec := &scheme.SpecMeta{ - ID: ulid.Make(), - Kind: kind, - Namespace: scheme.NamespaceDefault, - } - codec := scheme.CodecFunc(func(spec scheme.Spec) (node.Node, error) { - return node.NewOneToOneNode(node.OneToOneNodeConfig{ID: spec.GetID()}), nil - }) - - s.AddKnownType(kind, &scheme.SpecMeta{}) - s.AddCodec(kind, codec) + r, err = ld.LoadOne(context.Background(), spec2.GetID()) + assert.NoError(t, err) + assert.NotNil(t, r) - st.InsertOne(context.Background(), spec) + _, ok = tb.LookupByID(spec2.GetID()) + assert.True(t, ok) - _, _ = ld.LoadOne(context.Background(), storage.Where[ulid.ULID](scheme.KeyID).EQ(spec.GetID())) + st.DeleteOne(context.Background(), storage.Where[ulid.ULID](scheme.KeyID).EQ(spec2.GetID())) - ok, err := ld.UnloadOne(context.Background(), storage.Where[ulid.ULID](scheme.KeyID).EQ(spec.GetID())) + r, err = ld.LoadOne(context.Background(), spec2.GetID()) assert.NoError(t, err) - assert.True(t, ok) + assert.Nil(t, r) - _, ok = tb.Lookup(spec.GetID()) + _, ok = tb.LookupByID(spec2.GetID()) assert.False(t, ok) } -func TestLoader_UnloadMany(t *testing.T) { +func TestLoader_LoadAll(t *testing.T) { s := scheme.New() - st, _ := storage.New(context.Background(), storage.Config{ Scheme: s, Database: memdb.New(faker.Word()), @@ -405,7 +98,7 @@ func TestLoader_UnloadMany(t *testing.T) { tb := symbol.NewTable() defer func() { _ = tb.Close() }() - ld, _ := New(context.Background(), Config{ + ld := New(Config{ Scheme: s, Storage: st, Table: tb, @@ -413,10 +106,23 @@ func TestLoader_UnloadMany(t *testing.T) { kind := faker.Word() - spec := &scheme.SpecMeta{ + spec1 := &scheme.SpecMeta{ + ID: ulid.Make(), + Kind: kind, + Namespace: scheme.NamespaceDefault, + } + spec2 := &scheme.SpecMeta{ ID: ulid.Make(), Kind: kind, Namespace: scheme.NamespaceDefault, + Links: map[string][]scheme.PortLocation{ + node.PortIO: { + { + ID: spec1.GetID(), + Port: node.PortIO, + }, + }, + }, } codec := scheme.CodecFunc(func(spec scheme.Spec) (node.Node, error) { @@ -426,14 +132,16 @@ func TestLoader_UnloadMany(t *testing.T) { s.AddKnownType(kind, &scheme.SpecMeta{}) s.AddCodec(kind, codec) - st.InsertOne(context.Background(), spec) - - _, _ = ld.LoadOne(context.Background(), storage.Where[ulid.ULID](scheme.KeyID).EQ(spec.GetID())) + st.InsertOne(context.Background(), spec1) + st.InsertOne(context.Background(), spec2) - count, err := ld.UnloadMany(context.Background(), storage.Where[ulid.ULID](scheme.KeyID).EQ(spec.GetID())) + r, err := ld.LoadAll(context.Background()) assert.NoError(t, err) - assert.Equal(t, 1, count) + assert.Len(t, r, 2) - _, ok := tb.Lookup(spec.GetID()) - assert.False(t, ok) + _, ok := tb.LookupByID(spec1.GetID()) + assert.True(t, ok) + + _, ok = tb.LookupByID(spec2.GetID()) + assert.True(t, ok) } diff --git a/pkg/loader/reconciler.go b/pkg/loader/reconciler.go index bc97a1cb..bb393478 100644 --- a/pkg/loader/reconciler.go +++ b/pkg/loader/reconciler.go @@ -4,41 +4,39 @@ import ( "context" "sync" - "github.com/oklog/ulid/v2" - "github.com/siyul-park/uniflow/pkg/scheme" "github.com/siyul-park/uniflow/pkg/storage" ) type ( // ReconcilerConfig is a config for for the Reconciler. ReconcilerConfig struct { - Remote *storage.Storage - Loader *Loader - Filter *storage.Filter + Storage *storage.Storage + Loader *Loader + Filter *storage.Filter } // Reconciler keeps up to date symbol.Table by tracking changes to the scheme.Spec. Reconciler struct { - remote *storage.Storage - loader *Loader - filter *storage.Filter - stream *storage.Stream - done chan struct{} - mu sync.Mutex + storage *storage.Storage + loader *Loader + filter *storage.Filter + stream *storage.Stream + done chan struct{} + mu sync.Mutex } ) // NewReconciler returns a new Reconciler. func NewReconciler(config ReconcilerConfig) *Reconciler { - remote := config.Remote + storage := config.Storage loader := config.Loader filter := config.Filter return &Reconciler{ - remote: remote, - loader: loader, - filter: filter, - done: make(chan struct{}), + storage: storage, + loader: loader, + filter: filter, + done: make(chan struct{}), } } @@ -71,7 +69,7 @@ func (r *Reconciler) Reconcile(ctx context.Context) error { return nil } - if _, err := r.loader.LoadOne(ctx, storage.Where[ulid.ULID](scheme.KeyID).EQ(event.NodeID)); err != nil { + if _, err := r.loader.LoadOne(ctx, event.NodeID); err != nil { return err } } @@ -107,7 +105,7 @@ func (r *Reconciler) watch(ctx context.Context) (*storage.Stream, error) { if r.stream != nil { return r.stream, nil } - s, err := r.remote.Watch(ctx, r.filter) + s, err := r.storage.Watch(ctx, r.filter) if err != nil { return nil, err } diff --git a/pkg/loader/reconciler_test.go b/pkg/loader/reconciler_test.go index 14d84a5b..349d6ec3 100644 --- a/pkg/loader/reconciler_test.go +++ b/pkg/loader/reconciler_test.go @@ -26,15 +26,15 @@ func TestReconciler_Reconcile(t *testing.T) { tb := symbol.NewTable() defer func() { _ = tb.Close() }() - ld, _ := New(context.Background(), Config{ + ld := New(Config{ Scheme: s, Storage: st, Table: tb, }) r := NewReconciler(ReconcilerConfig{ - Remote: st, - Loader: ld, + Storage: st, + Loader: ld, }) defer func() { _ = r.Close() }() @@ -72,7 +72,7 @@ func TestReconciler_Reconcile(t *testing.T) { assert.Fail(t, "timeout") return default: - if _, ok := tb.Lookup(m.GetID()); ok { + if _, ok := tb.LookupByID(m.GetID()); ok { return } } diff --git a/pkg/plugin/networkx/builder.go b/pkg/plugin/networkx/builder.go index 23d339a2..21e0804d 100644 --- a/pkg/plugin/networkx/builder.go +++ b/pkg/plugin/networkx/builder.go @@ -9,7 +9,7 @@ import ( func AddToHooks() func(*hook.Hook) error { return func(h *hook.Hook) error { - h.AddPostLoadHook(symbol.PostLoadHookFunc(func(n node.Node) error { + h.AddLoadHook(symbol.LoadHookFunc(func(n node.Node) error { if n, ok := n.(*HTTPNode); ok { go func() { n.Start() }() } diff --git a/pkg/plugin/networkx/builder_test.go b/pkg/plugin/networkx/builder_test.go index a87c7388..163ed0ba 100644 --- a/pkg/plugin/networkx/builder_test.go +++ b/pkg/plugin/networkx/builder_test.go @@ -23,8 +23,7 @@ func TestAddToHooks(t *testing.T) { Address: fmt.Sprintf(":%d", port), }) - err = hk.PostLoad(n) - assert.NoError(t, err) + hk.Load(n) errChan := make(chan error) diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index ff75175a..0da767de 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -61,16 +61,15 @@ func New(ctx context.Context, config Config) (*Runtime, error) { } tb := symbol.NewTable(symbol.TableOptions{ - PreLoadHooks: []symbol.PreLoadHook{hk}, - PostLoadHooks: []symbol.PostLoadHook{hk}, - PreUnloadHooks: []symbol.PreUnloadHook{hk}, - PostUnloadHooks: []symbol.PostUnloadHook{hk}, + LoadHooks: []symbol.LoadHook{hk}, + UnloadHooks: []symbol.UnloadHook{hk}, }) - ld, err := loader.New(ctx, loader.Config{ - Scheme: sc, - Storage: st, - Table: tb, + ld := loader.New(loader.Config{ + Namespace: ns, + Scheme: sc, + Storage: st, + Table: tb, }) if err != nil { return nil, err @@ -81,9 +80,9 @@ func New(ctx context.Context, config Config) (*Runtime, error) { filter = storage.Where[string](scheme.KeyNamespace).EQ(ns) } rc := loader.NewReconciler(loader.ReconcilerConfig{ - Remote: st, - Loader: ld, - Filter: filter, + Storage: st, + Loader: ld, + Filter: filter, }) return &Runtime{ @@ -99,20 +98,16 @@ func New(ctx context.Context, config Config) (*Runtime, error) { // Lookup lookup node.Node in symbol.Table, and if it not exist load it from storage.Storage. func (r *Runtime) Lookup(ctx context.Context, id ulid.ULID) (node.Node, error) { - filter := storage.Where[ulid.ULID](scheme.KeyID).EQ(id) - if r.namespace != "" { - filter = filter.And(storage.Where[string](scheme.KeyNamespace).EQ(r.namespace)) - } - if s, ok := r.table.Lookup(id); !ok { - return r.loader.LoadOne(ctx, filter) + if s, ok := r.table.LookupByID(id); !ok { + return r.loader.LoadOne(ctx, id) } else { return s, nil } } // Free unload node.Node from symbol.Table. -func (r *Runtime) Free(ctx context.Context, id ulid.ULID) (bool, error) { - return r.loader.UnloadOne(ctx, storage.Where[ulid.ULID](scheme.KeyID).EQ(id)) +func (r *Runtime) Free(_ context.Context, id ulid.ULID) (bool, error) { + return r.table.Free(id) } // Start starts the Runtime. @@ -122,11 +117,7 @@ func (r *Runtime) Start(ctx context.Context) error { if err := r.reconciler.Watch(ctx); err != nil { return err } - var filter *storage.Filter - if r.namespace != "" { - filter = filter.And(storage.Where[string](scheme.KeyNamespace).EQ(r.namespace)) - } - if _, err := r.loader.LoadMany(ctx, filter); err != nil { + if _, err := r.loader.LoadAll(ctx); err != nil { return err } return r.reconciler.Reconcile(ctx) @@ -137,8 +128,5 @@ func (r *Runtime) Close(ctx context.Context) error { if err := r.reconciler.Close(); err != nil { return err } - if _, err := r.loader.UnloadMany(ctx, nil); err != nil { - return err - } return r.table.Close() } diff --git a/pkg/symbol/loadhook.go b/pkg/symbol/loadhook.go new file mode 100644 index 00000000..f0ef0b31 --- /dev/null +++ b/pkg/symbol/loadhook.go @@ -0,0 +1,18 @@ +package symbol + +import "github.com/siyul-park/uniflow/pkg/node" + +type ( + // LoadHook is a hook that is called node.Node is loaded. + LoadHook interface { + Load(n node.Node) error + } + + LoadHookFunc func(n node.Node) error +) + +var _ LoadHook = LoadHookFunc(func(n node.Node) error { return nil }) + +func (f LoadHookFunc) Load(n node.Node) error { + return f(n) +} diff --git a/pkg/symbol/postloadhook.go b/pkg/symbol/postloadhook.go deleted file mode 100644 index 707be9f9..00000000 --- a/pkg/symbol/postloadhook.go +++ /dev/null @@ -1,18 +0,0 @@ -package symbol - -import "github.com/siyul-park/uniflow/pkg/node" - -type ( - // PostLoadHook is a hook that is called after a node is loaded. - PostLoadHook interface { - PostLoad(n node.Node) error - } - - PostLoadHookFunc func(n node.Node) error -) - -var _ PostLoadHook = PostLoadHookFunc(func(n node.Node) error { return nil }) - -func (f PostLoadHookFunc) PostLoad(n node.Node) error { - return f(n) -} diff --git a/pkg/symbol/postunloadhook.go b/pkg/symbol/postunloadhook.go deleted file mode 100644 index bf5ccc23..00000000 --- a/pkg/symbol/postunloadhook.go +++ /dev/null @@ -1,18 +0,0 @@ -package symbol - -import "github.com/siyul-park/uniflow/pkg/node" - -type ( - // PostUnloadHook is a hook that is called after a node is unloaded. - PostUnloadHook interface { - PostUnload(n node.Node) error - } - - PostUnloadHookFunc func(n node.Node) error -) - -var _ PostUnloadHook = PostUnloadHookFunc(func(n node.Node) error { return nil }) - -func (f PostUnloadHookFunc) PostUnload(n node.Node) error { - return f(n) -} diff --git a/pkg/symbol/preloadhook.go b/pkg/symbol/preloadhook.go deleted file mode 100644 index 015e2500..00000000 --- a/pkg/symbol/preloadhook.go +++ /dev/null @@ -1,18 +0,0 @@ -package symbol - -import "github.com/siyul-park/uniflow/pkg/node" - -type ( - // PreLoadHook is a hook that is called before a node.Node is loaded. - PreLoadHook interface { - PreLoad(n node.Node) error - } - - PreLoadHookFunc func(n node.Node) error -) - -var _ PreLoadHook = PreLoadHookFunc(func(n node.Node) error { return nil }) - -func (f PreLoadHookFunc) PreLoad(n node.Node) error { - return f(n) -} diff --git a/pkg/symbol/preunloadhook.go b/pkg/symbol/preunloadhook.go deleted file mode 100644 index df07a152..00000000 --- a/pkg/symbol/preunloadhook.go +++ /dev/null @@ -1,18 +0,0 @@ -package symbol - -import "github.com/siyul-park/uniflow/pkg/node" - -type ( - // PreUnloadHook is a hook that is called before a node.Node is unloaded. - PreUnloadHook interface { - PreUnload(n node.Node) error - } - - PreUnloadHookFunc func(n node.Node) error -) - -var _ PreUnloadHook = PreUnloadHookFunc(func(n node.Node) error { return nil }) - -func (f PreUnloadHookFunc) PreUnload(n node.Node) error { - return f(n) -} diff --git a/pkg/symbol/symbol.go b/pkg/symbol/symbol.go new file mode 100644 index 00000000..f357954a --- /dev/null +++ b/pkg/symbol/symbol.go @@ -0,0 +1,53 @@ +package symbol + +import ( + "github.com/oklog/ulid/v2" + "github.com/siyul-park/uniflow/pkg/node" + "github.com/siyul-park/uniflow/pkg/port" + "github.com/siyul-park/uniflow/pkg/scheme" +) + +type ( + // Symbol represents an object that binds a Node and a Spec. + Symbol struct { + Node node.Node + Spec scheme.Spec + } +) + +var _ node.Node = (*Symbol)(nil) + +// ID returns the unique identifier of the Symbol, based on its Node. +func (s *Symbol) ID() ulid.ULID { + return s.Node.ID() +} + +// Kind returns the kind of the Symbol, based on its Spec. +func (s *Symbol) Kind() string { + return s.Spec.GetKind() +} + +// Namespace returns the namespace of the Symbol, based on its Spec. +func (s *Symbol) Namespace() string { + return s.Spec.GetNamespace() +} + +// Name returns the name of the Symbol, based on its Spec. +func (s *Symbol) Name() string { + return s.Spec.GetName() +} + +// Links returns the links of the Symbol, based on its Spec. +func (s *Symbol) Links() map[string][]scheme.PortLocation { + return s.Spec.GetLinks() +} + +// Port returns the specified port of the Symbol, based on its Node. +func (s *Symbol) Port(name string) (*port.Port, bool) { + return s.Node.Port(name) +} + +// Close closes the Symbol, invoking the Close method of its Node. +func (s *Symbol) Close() error { + return s.Node.Close() +} diff --git a/pkg/symbol/symbol_test.go b/pkg/symbol/symbol_test.go new file mode 100644 index 00000000..b4a9f332 --- /dev/null +++ b/pkg/symbol/symbol_test.go @@ -0,0 +1,43 @@ +package symbol + +import ( + "testing" + + "github.com/go-faker/faker/v4" + "github.com/oklog/ulid/v2" + "github.com/siyul-park/uniflow/pkg/node" + "github.com/siyul-park/uniflow/pkg/scheme" + "github.com/stretchr/testify/assert" +) + +func TestSymbol_Getter(t *testing.T) { + n := node.NewOneToOneNode(node.OneToOneNodeConfig{}) + defer n.Close() + spec := &scheme.SpecMeta{ + ID: n.ID(), + Kind: faker.Word(), + Namespace: scheme.NamespaceDefault, + Name: faker.UUIDHyphenated(), + Links: map[string][]scheme.PortLocation{ + node.PortOut: { + { + ID: ulid.Make(), + Port: node.PortIn, + }, + }, + }, + } + + sym := &Symbol{Node: n, Spec: spec} + + assert.Equal(t, n.ID(), sym.ID()) + assert.Equal(t, spec.GetKind(), sym.Kind()) + assert.Equal(t, spec.GetNamespace(), sym.Namespace()) + assert.Equal(t, spec.GetName(), sym.Name()) + assert.Equal(t, spec.GetLinks(), sym.Links()) + + p1, _ := n.Port(node.PortIn) + p2, _ := sym.Port(node.PortIn) + + assert.Equal(t, p1, p2) +} diff --git a/pkg/symbol/table.go b/pkg/symbol/table.go index 2888a9ec..3da320f8 100644 --- a/pkg/symbol/table.go +++ b/pkg/symbol/table.go @@ -4,150 +4,332 @@ import ( "sync" "github.com/oklog/ulid/v2" - "github.com/siyul-park/uniflow/pkg/node" + "github.com/samber/lo" + "github.com/siyul-park/uniflow/pkg/scheme" ) type ( - // TableOptions is a options for Table. + // TableOptions holds options for configuring a Table. TableOptions struct { - PreLoadHooks []PreLoadHook - PostLoadHooks []PostLoadHook - PreUnloadHooks []PreUnloadHook - PostUnloadHooks []PostUnloadHook + LoadHooks []LoadHook // LoadHooks define functions to be executed on symbol loading. + UnloadHooks []UnloadHook // UnloadHooks define functions to be executed on symbol unloading. } - // Table is the storage that manages Symbol. + // Table manages the storage and operations for Symbols. Table struct { - data map[ulid.ULID]node.Node - preLoadHooks []PreLoadHook - postLoadHooks []PostLoadHook - preUnloadHooks []PreUnloadHook - postUnloadHooks []PostUnloadHook - mu sync.RWMutex + symbols map[ulid.ULID]*Symbol + unlinks map[ulid.ULID]map[string][]scheme.PortLocation + linked map[ulid.ULID]map[string][]scheme.PortLocation + index map[string]map[string]ulid.ULID + loadHooks []LoadHook + unloadHooks []UnloadHook + mu sync.RWMutex } ) -// NewTable returns a new SymbolTable +// NewTable returns a new SymbolTable with the specified options. func NewTable(opts ...TableOptions) *Table { - var preLoadHooks []PreLoadHook - var postLoadHooks []PostLoadHook - var preUnloadHooks []PreUnloadHook - var postUnloadHooks []PostUnloadHook + var loadHooks []LoadHook + var unloadHooks []UnloadHook for _, opt := range opts { - preLoadHooks = append(preLoadHooks, opt.PreLoadHooks...) - postLoadHooks = append(postLoadHooks, opt.PostLoadHooks...) - preUnloadHooks = append(preUnloadHooks, opt.PreUnloadHooks...) - postUnloadHooks = append(postUnloadHooks, opt.PostUnloadHooks...) + loadHooks = append(loadHooks, opt.LoadHooks...) + unloadHooks = append(unloadHooks, opt.UnloadHooks...) } return &Table{ - data: make(map[ulid.ULID]node.Node), - preLoadHooks: preLoadHooks, - postLoadHooks: postLoadHooks, - preUnloadHooks: preUnloadHooks, - postUnloadHooks: postUnloadHooks, + symbols: make(map[ulid.ULID]*Symbol), + unlinks: make(map[ulid.ULID]map[string][]scheme.PortLocation), + linked: make(map[ulid.ULID]map[string][]scheme.PortLocation), + index: make(map[string]map[string]ulid.ULID), + loadHooks: loadHooks, + unloadHooks: unloadHooks, } } -// Insert inserts a node.Node. -func (t *Table) Insert(n node.Node) (node.Node, error) { +// Insert adds a Symbol to the table. +func (t *Table) Insert(sym *Symbol) error { t.mu.Lock() defer t.mu.Unlock() - if origin, ok := t.data[n.ID()]; ok { - if err := t.preUnload(origin); err != nil { - return nil, err - } - if err := origin.Close(); err != nil { - return nil, err - } - if err := t.postUnload(origin); err != nil { - return nil, err - } + if _, err := t.free(sym.ID()); err != nil { + return err } - - if err := t.preLoad(n); err != nil { - return nil, err - } - t.data[n.ID()] = n - if err := t.postLoad(n); err != nil { - return nil, err + if err := t.insert(sym); err != nil { + return err } - return n, nil + return nil } -// Free removes a Symbol. +// Free removes a Symbol from the table. func (t *Table) Free(id ulid.ULID) (bool, error) { t.mu.Lock() defer t.mu.Unlock() - if n, ok := t.data[id]; ok { - if err := n.Close(); err != nil { - return false, err - } - delete(t.data, id) + if sym, err := t.free(id); err != nil { + return false, err + } else if sym != nil { return true, nil } - return false, nil } -// Lookup returns a node.Node. -func (t *Table) Lookup(id ulid.ULID) (node.Node, bool) { +// LookupByID retrieves a Symbol by its ID. +func (t *Table) LookupByID(id ulid.ULID) (*Symbol, bool) { t.mu.RLock() defer t.mu.RUnlock() - n, ok := t.data[id] - return n, ok + sym, ok := t.symbols[id] + return sym, ok } -// Close closes the SymbolTable. +// LookupByName retrieves a Symbol by its namespace and name. +func (t *Table) LookupByName(namespace, name string) (*Symbol, bool) { + t.mu.RLock() + defer t.mu.RUnlock() + + if namespace, ok := t.index[namespace]; ok { + if id, ok := namespace[name]; ok { + sym, ok := t.symbols[id] + return sym, ok + } + } + return nil, false +} + +// Close closes the SymbolTable, closing all associated symbols. func (t *Table) Close() error { t.mu.Lock() defer t.mu.Unlock() - for id, n := range t.data { - if err := n.Close(); err != nil { + for id := range t.symbols { + if _, err := t.free(id); err != nil { return err } - delete(t.data, id) } return nil } -func (t *Table) preLoad(n node.Node) error { - for _, hook := range t.preLoadHooks { - if err := hook.PreLoad(n); err != nil { +func (t *Table) insert(sym *Symbol) error { + t.symbols[sym.ID()] = sym + + if sym.Name() != "" { + t.index[sym.Namespace()] = lo.Assign(t.index[sym.Namespace()], map[string]ulid.ULID{sym.Name(): sym.ID()}) + } + + unlinks := map[string][]scheme.PortLocation{} + + for name, locations := range sym.Links() { + p1, ok := sym.Port(name) + if !ok { + unlinks[name] = locations + continue + } + + for _, location := range locations { + id := location.ID + if location.Name != "" { + if namespace, ok := t.index[sym.Namespace()]; ok { + id = namespace[location.Name] + } + } + + if id != (ulid.ULID{}) { + if ref, ok := t.symbols[id]; ok { + if ref.Namespace() == sym.Namespace() { + if p2, ok := ref.Port(location.Port); ok { + p1.Link(p2) + + linked := t.linked[ref.ID()] + if linked == nil { + linked = make(map[string][]scheme.PortLocation) + } + linked[location.Port] = append(linked[location.Port], scheme.PortLocation{ + ID: sym.ID(), + Name: location.Name, + Port: name, + }) + t.linked[ref.ID()] = linked + + continue + } + } + } + } + + unlinks[name] = append(unlinks[name], location) + } + } + + if len(unlinks) > 0 { + t.unlinks[sym.ID()] = unlinks + } + + if err := t.load(sym); err != nil { + return err + } + + for id, unlinks := range t.unlinks { + ref := t.symbols[id] + + if ref.Namespace() != sym.Namespace() { + continue + } + + for name, locations := range unlinks { + p1, ok := ref.Port(name) + if !ok { + continue + } + + for i, location := range locations { + if (location.ID == sym.ID()) || (location.Name != "" && location.Name == sym.Name()) { + if p2, ok := sym.Port(location.Port); ok { + p1.Link(p2) + + linked := t.linked[sym.ID()] + if linked == nil { + linked = make(map[string][]scheme.PortLocation) + } + linked[location.Port] = append(linked[location.Port], scheme.PortLocation{ + ID: ref.ID(), + Name: location.Name, + Port: name, + }) + t.linked[sym.ID()] = linked + + unlinks[name] = append(locations[:i], locations[i+1:]...) + } + } + } + + if len(unlinks[name]) == 0 { + delete(unlinks, name) + } + } + + if len(unlinks) > 0 { + t.unlinks[id] = unlinks + } else { + delete(t.unlinks, id) + } + + if err := t.load(ref); err != nil { return err } } + return nil } -func (t *Table) postLoad(n node.Node) error { - for _, hook := range t.postLoadHooks { - if err := hook.PostLoad(n); err != nil { - return err +func (t *Table) free(id ulid.ULID) (*Symbol, error) { + sym, ok := t.symbols[id] + if !ok { + return nil, nil + } + + if err := t.unload(sym); err != nil { + return nil, err + } + if err := sym.Close(); err != nil { + return nil, err + } + + if sym.Name() != "" { + if namespace, ok := t.index[sym.Namespace()]; ok { + delete(namespace, sym.Name()) + if len(namespace) == 0 { + delete(t.index, sym.Namespace()) + } } } - return nil + + for name, locations := range sym.Links() { + for _, location := range locations { + id := location.ID + if location.Name != "" { + if namespace, ok := t.index[sym.Namespace()]; ok { + id = namespace[location.Name] + } + } + + if id == (ulid.ULID{}) { + continue + } + + linked := t.linked[id] + var locations []scheme.PortLocation + for _, location := range linked[location.Port] { + if location.ID != sym.ID() && location.Port != name { + locations = append(locations, location) + } + } + if len(locations) > 0 { + linked[location.Port] = locations + t.linked[id] = linked + } else if len(linked) > 0 { + delete(linked, location.Port) + t.linked[id] = linked + } + } + } + + for name, locations := range t.linked[id] { + for _, location := range locations { + if err := t.unload(t.symbols[location.ID]); err != nil { + return nil, err + } + + unlinks := t.unlinks[location.ID] + if unlinks == nil { + unlinks = make(map[string][]scheme.PortLocation) + } + + var unlink scheme.PortLocation + if location.Name == "" { + unlink = scheme.PortLocation{ + ID: id, + Port: name, + } + } else { + unlink = scheme.PortLocation{ + Name: location.Name, + Port: name, + } + } + + unlinks[location.Port] = append(unlinks[location.Port], unlink) + t.unlinks[location.ID] = unlinks + } + } + + delete(t.symbols, id) + delete(t.unlinks, id) + delete(t.linked, id) + + return sym, nil } -func (t *Table) preUnload(n node.Node) error { - for _, hook := range t.preUnloadHooks { - if err := hook.PreUnload(n); err != nil { +func (t *Table) load(sym *Symbol) error { + if len(t.unlinks[sym.ID()]) > 0 { + return nil + } + for _, hook := range t.loadHooks { + if err := hook.Load(sym.Node); err != nil { return err } } return nil } -func (t *Table) postUnload(n node.Node) error { - for _, hook := range t.postUnloadHooks { - if err := hook.PostUnload(n); err != nil { +func (t *Table) unload(sym *Symbol) error { + if len(t.unlinks[sym.ID()]) > 0 { + return nil + } + for _, hook := range t.unloadHooks { + if err := hook.Unload(sym.Node); err != nil { return err } } diff --git a/pkg/symbol/table_test.go b/pkg/symbol/table_test.go index 13da21d6..863f5251 100644 --- a/pkg/symbol/table_test.go +++ b/pkg/symbol/table_test.go @@ -3,94 +3,463 @@ package symbol import ( "testing" + "github.com/go-faker/faker/v4" "github.com/oklog/ulid/v2" "github.com/siyul-park/uniflow/pkg/node" + "github.com/siyul-park/uniflow/pkg/scheme" "github.com/stretchr/testify/assert" ) func TestTable_Insert(t *testing.T) { - t.Run("not exists", func(t *testing.T) { - tb := NewTable() - defer func() { _ = tb.Close() }() + t.Run("ID", func(t *testing.T) { + t.Run("not exists", func(t *testing.T) { + tb := NewTable() + defer tb.Close() - n := node.NewOneToOneNode(node.OneToOneNodeConfig{}) + n1 := node.NewOneToOneNode(node.OneToOneNodeConfig{}) + defer n1.Close() + n2 := node.NewOneToOneNode(node.OneToOneNodeConfig{}) + defer n2.Close() + n3 := node.NewOneToOneNode(node.OneToOneNodeConfig{}) + defer n3.Close() - s, err := tb.Insert(n) - assert.NoError(t, err) - assert.NotNil(t, s) - assert.Equal(t, n.ID(), s.ID()) + spec1 := &scheme.SpecMeta{ + ID: n1.ID(), + Namespace: scheme.NamespaceDefault, + Links: map[string][]scheme.PortLocation{ + node.PortOut: { + { + ID: n2.ID(), + Port: node.PortIn, + }, + }, + }, + } + spec2 := &scheme.SpecMeta{ + ID: n2.ID(), + Namespace: scheme.NamespaceDefault, + Links: map[string][]scheme.PortLocation{ + node.PortOut: { + { + ID: n3.ID(), + Port: node.PortIn, + }, + }, + }, + } + spec3 := &scheme.SpecMeta{ + ID: n3.ID(), + Namespace: scheme.NamespaceDefault, + Links: map[string][]scheme.PortLocation{ + node.PortOut: { + { + ID: n1.ID(), + Port: node.PortIn, + }, + }, + }, + } + + p1, _ := n1.Port(node.PortIn) + p2, _ := n2.Port(node.PortIn) + p3, _ := n3.Port(node.PortIn) + + err := tb.Insert(&Symbol{Node: n1, Spec: spec1}) + assert.NoError(t, err) + + assert.Equal(t, 0, p1.Links()) + assert.Equal(t, 0, p2.Links()) + assert.Equal(t, 0, p3.Links()) + + err = tb.Insert(&Symbol{Node: n2, Spec: spec2}) + assert.NoError(t, err) + + assert.Equal(t, 0, p1.Links()) + assert.Equal(t, 1, p2.Links()) + assert.Equal(t, 0, p3.Links()) + + err = tb.Insert(&Symbol{Node: n3, Spec: spec3}) + assert.NoError(t, err) + + assert.Equal(t, 1, p1.Links()) + assert.Equal(t, 1, p2.Links()) + assert.Equal(t, 1, p3.Links()) + }) + + t.Run("exists", func(t *testing.T) { + tb := NewTable() + defer tb.Close() + + id := ulid.Make() + + n1 := node.NewOneToOneNode(node.OneToOneNodeConfig{ID: id}) + defer n1.Close() + n2 := node.NewOneToOneNode(node.OneToOneNodeConfig{ID: id}) + defer n2.Close() + n3 := node.NewOneToOneNode(node.OneToOneNodeConfig{}) + defer n3.Close() + n4 := node.NewOneToOneNode(node.OneToOneNodeConfig{}) + defer n3.Close() + + spec1 := &scheme.SpecMeta{ + ID: id, + Namespace: scheme.NamespaceDefault, + Links: map[string][]scheme.PortLocation{ + node.PortOut: { + { + ID: n3.ID(), + Port: node.PortIn, + }, + }, + }, + } + spec2 := &scheme.SpecMeta{ + ID: id, + Namespace: scheme.NamespaceDefault, + Links: map[string][]scheme.PortLocation{ + node.PortOut: { + { + ID: n4.ID(), + Port: node.PortIn, + }, + }, + }, + } + spec3 := &scheme.SpecMeta{ + ID: n3.ID(), + Namespace: scheme.NamespaceDefault, + Links: map[string][]scheme.PortLocation{ + node.PortOut: { + { + ID: id, + Port: node.PortIn, + }, + }, + }, + } + spec4 := &scheme.SpecMeta{ + ID: n4.ID(), + Namespace: scheme.NamespaceDefault, + Links: map[string][]scheme.PortLocation{ + node.PortOut: { + { + ID: id, + Port: node.PortIn, + }, + }, + }, + } + + p1, _ := n1.Port(node.PortIn) + p2, _ := n2.Port(node.PortIn) + p3, _ := n3.Port(node.PortIn) + p4, _ := n4.Port(node.PortIn) + + _ = tb.Insert(&Symbol{Node: n3, Spec: spec3}) + _ = tb.Insert(&Symbol{Node: n4, Spec: spec4}) + + err := tb.Insert(&Symbol{Node: n1, Spec: spec1}) + assert.NoError(t, err) + + assert.Equal(t, 2, p1.Links()) + assert.Equal(t, 0, p2.Links()) + assert.Equal(t, 1, p3.Links()) + + err = tb.Insert(&Symbol{Node: n2, Spec: spec2}) + assert.NoError(t, err) + + assert.Equal(t, 0, p1.Links()) + assert.Equal(t, 2, p2.Links()) + assert.Equal(t, 1, p4.Links()) + }) }) - t.Run("exists", func(t *testing.T) { - tb := NewTable() - defer func() { _ = tb.Close() }() + t.Run("Name", func(t *testing.T) { + t.Run("not exists", func(t *testing.T) { + tb := NewTable() + defer tb.Close() + + n1 := node.NewOneToOneNode(node.OneToOneNodeConfig{}) + defer n1.Close() + n2 := node.NewOneToOneNode(node.OneToOneNodeConfig{}) + defer n2.Close() + n3 := node.NewOneToOneNode(node.OneToOneNodeConfig{}) + defer n3.Close() + + spec1 := &scheme.SpecMeta{ + ID: n1.ID(), + Namespace: scheme.NamespaceDefault, + Name: faker.UUIDHyphenated(), + } + spec2 := &scheme.SpecMeta{ + ID: n2.ID(), + Namespace: scheme.NamespaceDefault, + Name: faker.UUIDHyphenated(), + } + spec3 := &scheme.SpecMeta{ + ID: n3.ID(), + Namespace: scheme.NamespaceDefault, + Name: faker.UUIDHyphenated(), + } + + spec1.Links = map[string][]scheme.PortLocation{ + node.PortOut: { + { + Name: spec2.GetName(), + Port: node.PortIn, + }, + }, + } + spec2.Links = map[string][]scheme.PortLocation{ + node.PortOut: { + { + Name: spec3.GetName(), + Port: node.PortIn, + }, + }, + } + spec3.Links = map[string][]scheme.PortLocation{ + node.PortOut: { + { + Name: spec1.GetName(), + Port: node.PortIn, + }, + }, + } + + p1, _ := n1.Port(node.PortIn) + p2, _ := n2.Port(node.PortIn) + p3, _ := n3.Port(node.PortIn) + + err := tb.Insert(&Symbol{Node: n1, Spec: spec1}) + assert.NoError(t, err) - id := ulid.Make() + assert.Equal(t, 0, p1.Links()) + assert.Equal(t, 0, p2.Links()) + assert.Equal(t, 0, p3.Links()) - n1 := node.NewOneToOneNode(node.OneToOneNodeConfig{ID: id}) - n2 := node.NewOneToOneNode(node.OneToOneNodeConfig{ID: id}) + err = tb.Insert(&Symbol{Node: n2, Spec: spec2}) + assert.NoError(t, err) - s1, err := tb.Insert(n1) - assert.NoError(t, err) - assert.NotNil(t, s1) - assert.Equal(t, n1.ID(), s1.ID()) + assert.Equal(t, 0, p1.Links()) + assert.Equal(t, 1, p2.Links()) + assert.Equal(t, 0, p3.Links()) - s2, err := tb.Insert(n1) - assert.NoError(t, err) - assert.NotNil(t, s2) - assert.Equal(t, n2.ID(), s2.ID()) + err = tb.Insert(&Symbol{Node: n3, Spec: spec3}) + assert.NoError(t, err) + + assert.Equal(t, 1, p1.Links()) + assert.Equal(t, 1, p2.Links()) + assert.Equal(t, 1, p3.Links()) + }) + + t.Run("exists", func(t *testing.T) { + tb := NewTable() + defer tb.Close() + + id := ulid.Make() + name := faker.UUIDHyphenated() + + n1 := node.NewOneToOneNode(node.OneToOneNodeConfig{ID: id}) + defer n1.Close() + n2 := node.NewOneToOneNode(node.OneToOneNodeConfig{ID: id}) + defer n2.Close() + n3 := node.NewOneToOneNode(node.OneToOneNodeConfig{}) + defer n3.Close() + n4 := node.NewOneToOneNode(node.OneToOneNodeConfig{}) + defer n3.Close() + + spec1 := &scheme.SpecMeta{ + ID: id, + Namespace: scheme.NamespaceDefault, + Name: name, + } + spec2 := &scheme.SpecMeta{ + ID: id, + Namespace: scheme.NamespaceDefault, + Name: name, + } + spec3 := &scheme.SpecMeta{ + ID: n3.ID(), + Namespace: scheme.NamespaceDefault, + Name: faker.UUIDHyphenated(), + } + spec4 := &scheme.SpecMeta{ + ID: n4.ID(), + Namespace: scheme.NamespaceDefault, + Name: faker.UUIDHyphenated(), + } + + spec1.Links = map[string][]scheme.PortLocation{ + node.PortOut: { + { + Name: spec3.GetName(), + Port: node.PortIn, + }, + }, + } + spec2.Links = map[string][]scheme.PortLocation{ + node.PortOut: { + { + Name: spec4.GetName(), + Port: node.PortIn, + }, + }, + } + spec3.Links = map[string][]scheme.PortLocation{ + node.PortOut: { + { + Name: name, + Port: node.PortIn, + }, + }, + } + spec4.Links = map[string][]scheme.PortLocation{ + node.PortOut: { + { + Name: name, + Port: node.PortIn, + }, + }, + } + + p1, _ := n1.Port(node.PortIn) + p2, _ := n2.Port(node.PortIn) + p3, _ := n3.Port(node.PortIn) + p4, _ := n4.Port(node.PortIn) + + _ = tb.Insert(&Symbol{Node: n3, Spec: spec3}) + _ = tb.Insert(&Symbol{Node: n4, Spec: spec4}) + + err := tb.Insert(&Symbol{Node: n1, Spec: spec1}) + assert.NoError(t, err) + + assert.Equal(t, 2, p1.Links()) + assert.Equal(t, 0, p2.Links()) + assert.Equal(t, 1, p3.Links()) + + err = tb.Insert(&Symbol{Node: n2, Spec: spec2}) + assert.NoError(t, err) + + assert.Equal(t, 0, p1.Links()) + assert.Equal(t, 2, p2.Links()) + assert.Equal(t, 1, p4.Links()) + }) }) } func TestTable_Free(t *testing.T) { - t.Run("not exists", func(t *testing.T) { - tb := NewTable() - defer func() { _ = tb.Close() }() + tb := NewTable() + defer tb.Close() - n := node.NewOneToOneNode(node.OneToOneNodeConfig{}) + n1 := node.NewOneToOneNode(node.OneToOneNodeConfig{}) + defer n1.Close() + n2 := node.NewOneToOneNode(node.OneToOneNodeConfig{}) + defer n2.Close() + n3 := node.NewOneToOneNode(node.OneToOneNodeConfig{}) + defer n3.Close() - ok, err := tb.Free(n.ID()) - assert.NoError(t, err) - assert.False(t, ok) - }) + spec1 := &scheme.SpecMeta{ + ID: n1.ID(), + Namespace: scheme.NamespaceDefault, + Links: map[string][]scheme.PortLocation{ + node.PortOut: { + { + ID: n2.ID(), + Port: node.PortIn, + }, + }, + }, + } + spec2 := &scheme.SpecMeta{ + ID: n2.ID(), + Namespace: scheme.NamespaceDefault, + Links: map[string][]scheme.PortLocation{ + node.PortOut: { + { + ID: n3.ID(), + Port: node.PortIn, + }, + }, + }, + } + spec3 := &scheme.SpecMeta{ + ID: n3.ID(), + Namespace: scheme.NamespaceDefault, + Links: map[string][]scheme.PortLocation{ + node.PortOut: { + { + ID: n1.ID(), + Port: node.PortIn, + }, + }, + }, + } - t.Run("exists", func(t *testing.T) { - tb := NewTable() - defer func() { _ = tb.Close() }() + p1, _ := n1.Port(node.PortIn) + p2, _ := n2.Port(node.PortIn) + p3, _ := n3.Port(node.PortIn) - n := node.NewOneToOneNode(node.OneToOneNodeConfig{}) + _ = tb.Insert(&Symbol{Node: n1, Spec: spec1}) + _ = tb.Insert(&Symbol{Node: n2, Spec: spec2}) + _ = tb.Insert(&Symbol{Node: n3, Spec: spec3}) - tb.Insert(n) + ok, err := tb.Free(n1.ID()) + assert.NoError(t, err) + assert.True(t, ok) - ok, err := tb.Free(n.ID()) - assert.NoError(t, err) - assert.True(t, ok) - }) + assert.Equal(t, 0, p1.Links()) + + ok, err = tb.Free(n2.ID()) + assert.NoError(t, err) + assert.True(t, ok) + + assert.Equal(t, 0, p1.Links()) + assert.Equal(t, 0, p2.Links()) + + ok, err = tb.Free(n3.ID()) + assert.NoError(t, err) + assert.True(t, ok) + + assert.Equal(t, 0, p1.Links()) + assert.Equal(t, 0, p2.Links()) + assert.Equal(t, 0, p3.Links()) } -func TestTable_Lookup(t *testing.T) { - t.Run("not exists", func(t *testing.T) { - tb := NewTable() - defer func() { _ = tb.Close() }() +func TestTable_LookupByID(t *testing.T) { + tb := NewTable() + defer tb.Close() - n := node.NewOneToOneNode(node.OneToOneNodeConfig{}) + n := node.NewOneToOneNode(node.OneToOneNodeConfig{}) + defer n.Close() + spec := &scheme.SpecMeta{ + ID: n.ID(), + } + sym := &Symbol{Node: n, Spec: spec} - s, ok := tb.Lookup(n.ID()) - assert.False(t, ok) - assert.Nil(t, s) - }) + _ = tb.Insert(sym) - t.Run("exists", func(t *testing.T) { - tb := NewTable() - defer func() { _ = tb.Close() }() + r, ok := tb.LookupByID(n.ID()) + assert.True(t, ok) + assert.Equal(t, sym, r) +} - n := node.NewOneToOneNode(node.OneToOneNodeConfig{}) +func TestTable_LookupByName(t *testing.T) { + tb := NewTable() + defer tb.Close() - tb.Insert(n) + n := node.NewOneToOneNode(node.OneToOneNodeConfig{}) + defer n.Close() + spec := &scheme.SpecMeta{ + ID: n.ID(), + Namespace: scheme.NamespaceDefault, + Name: faker.Word(), + } + sym := &Symbol{Node: n, Spec: spec} - s, ok := tb.Lookup(n.ID()) - assert.True(t, ok) - assert.NotNil(t, s) - assert.Equal(t, n.ID(), s.ID()) - }) + _ = tb.Insert(sym) + + r, ok := tb.LookupByName(spec.GetNamespace(), spec.GetName()) + assert.True(t, ok) + assert.Equal(t, sym, r) } diff --git a/pkg/symbol/unloadhook.go b/pkg/symbol/unloadhook.go new file mode 100644 index 00000000..70e50769 --- /dev/null +++ b/pkg/symbol/unloadhook.go @@ -0,0 +1,18 @@ +package symbol + +import "github.com/siyul-park/uniflow/pkg/node" + +type ( + // UnloadHook is a hook that is called node.Node is unloaded. + UnloadHook interface { + Unload(n node.Node) error + } + + UnloadHookFunc func(n node.Node) error +) + +var _ UnloadHook = UnloadHookFunc(func(n node.Node) error { return nil }) + +func (f UnloadHookFunc) Unload(n node.Node) error { + return f(n) +}