Skip to content

Commit

Permalink
fix: lazy load and unload child nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
siyul-park committed Dec 19, 2024
1 parent a3faf66 commit 0972f89
Show file tree
Hide file tree
Showing 9 changed files with 367 additions and 73 deletions.
80 changes: 60 additions & 20 deletions pkg/chart/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

// ClusterNode manages the ports and symbol table for the cluster.
type ClusterNode struct {
symbols []*symbol.Symbol
table *symbol.Table
inPorts map[string]*port.InPort
outPorts map[string]*port.OutPort
Expand All @@ -23,22 +24,42 @@ type ClusterNode struct {
var _ node.Node = (*ClusterNode)(nil)

// NewClusterNode creates a new ClusterNode with the provided symbol table.
func NewClusterNode(table *symbol.Table) *ClusterNode {
func NewClusterNode(symbols []*symbol.Symbol, opts ...symbol.TableOption) *ClusterNode {
return &ClusterNode{
table: table,
symbols: symbols,
table: symbol.NewTable(opts...),
inPorts: make(map[string]*port.InPort),
outPorts: make(map[string]*port.OutPort),
_inPorts: make(map[string]*port.InPort),
_outPorts: make(map[string]*port.OutPort),
}
}

// Inbound sets up an input port and links it to the provided port.
// Keys returns all keys from the symbol table.
func (n *ClusterNode) Keys() []uuid.UUID {
keys := make([]uuid.UUID, 0, len(n.symbols))
for _, sb := range n.symbols {
keys = append(keys, sb.ID())
}
return keys
}

// Lookup retrieves a symbol from the table by its UUID.
func (n *ClusterNode) Lookup(id uuid.UUID) *symbol.Symbol {
for _, sb := range n.symbols {
if sb.ID() == id {
return sb
}
}
return nil
}

// Inbound links an external input to an internal symbol's input port.
func (n *ClusterNode) Inbound(source string, id uuid.UUID, target string) bool {
n.mu.Lock()
defer n.mu.Unlock()

sb := n.table.Lookup(id)
sb := n.Lookup(id)
if sb == nil {
return false
}
Expand Down Expand Up @@ -71,12 +92,12 @@ func (n *ClusterNode) Inbound(source string, id uuid.UUID, target string) bool {
return true
}

// Outbound sets up an output port and links it to the provided port.
// Outbound links an external output to an internal symbol's output port.
func (n *ClusterNode) Outbound(source string, id uuid.UUID, target string) bool {
n.mu.Lock()
defer n.mu.Unlock()

sb := n.table.Lookup(id)
sb := n.Lookup(id)
if sb == nil {
return false
}
Expand Down Expand Up @@ -109,26 +130,39 @@ func (n *ClusterNode) Outbound(source string, id uuid.UUID, target string) bool
return true
}

// Symbols retrieves all symbols from the table.
func (n *ClusterNode) Symbols() []*symbol.Symbol {
n.mu.RLock()
defer n.mu.RUnlock()
// Load processes all initialization hooks for symbols.
func (n *ClusterNode) Load(hook symbol.LoadHook) error {
n.mu.Lock()
defer n.mu.Unlock()

n.table.AddLoadHook(hook)
defer n.table.RemoveLoadHook(hook)

var symbols []*symbol.Symbol
for _, key := range n.table.Keys() {
if sym := n.table.Lookup(key); sym != nil {
symbols = append(symbols, sym)
for _, sb := range n.symbols {
if n.table.Lookup(sb.ID()) != nil {
continue
}

sb := &symbol.Symbol{
Spec: sb.Spec,
Node: node.NoCloser(sb.Node),
}
if err := n.table.Insert(sb); err != nil {
return err
}
}
return symbols
return nil
}

// Symbol retrieves a specific symbol by UUID.
func (n *ClusterNode) Symbol(id uuid.UUID) *symbol.Symbol {
n.mu.RLock()
defer n.mu.RUnlock()
// Unload processes all termination hooks for symbols.
func (n *ClusterNode) Unload(hook symbol.UnloadHook) error {
n.mu.Lock()
defer n.mu.Unlock()

n.table.AddUnloadHook(hook)
defer n.table.RemoveUnloadHook(hook)

return n.table.Lookup(id)
return n.table.Close()
}

// In returns the input port by name.
Expand Down Expand Up @@ -156,6 +190,12 @@ func (n *ClusterNode) Close() error {
return err
}

for _, sb := range n.symbols {
if err := sb.Close(); err != nil {
return err
}
}

for _, inPort := range n.inPorts {
inPort.Close()
}
Expand Down
117 changes: 86 additions & 31 deletions pkg/chart/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,93 +18,147 @@ import (
)

func TestNewClusterNode(t *testing.T) {
n := NewClusterNode(symbol.NewTable())
n := NewClusterNode(nil)
assert.NotNil(t, n)
assert.NoError(t, n.Close())
}

func TestClusterNode_Inbound(t *testing.T) {
tb := symbol.NewTable()
func TestClusterNode_Keys(t *testing.T) {
sb := &symbol.Symbol{
Spec: &spec.Meta{
ID: uuid.Must(uuid.NewV7()),
Kind: faker.Word(),
},
Node: node.NewOneToOneNode(nil),
}

n := NewClusterNode([]*symbol.Symbol{sb})
defer n.Close()

keys := n.Keys()
assert.Len(t, keys, 1)
assert.Equal(t, sb.ID(), keys[0])
}

func TestClusterNode_Lookup(t *testing.T) {
sb := &symbol.Symbol{
Spec: &spec.Meta{
ID: uuid.Must(uuid.NewV7()),
Kind: faker.Word(),
},
Node: node.NewOneToOneNode(nil),
}

n := NewClusterNode([]*symbol.Symbol{sb})
defer n.Close()

assert.Equal(t, sb, n.Lookup(sb.ID()))
}

func TestClusterNode_Inbound(t *testing.T) {
sb := &symbol.Symbol{
Spec: &spec.Meta{
ID: uuid.Must(uuid.NewV7()),
Kind: faker.Word(),
},
Node: node.NewOneToOneNode(nil),
}
tb.Insert(sb)

n := NewClusterNode(tb)
n := NewClusterNode([]*symbol.Symbol{sb})
defer n.Close()

n.Inbound(node.PortIn, sb.ID(), node.PortIn)
assert.NotNil(t, n.In(node.PortIn))
}

func TestClusterNode_Outbound(t *testing.T) {
tb := symbol.NewTable()

sb := &symbol.Symbol{
Spec: &spec.Meta{
ID: uuid.Must(uuid.NewV7()),
Kind: faker.Word(),
},
Node: node.NewOneToOneNode(nil),
}
tb.Insert(sb)

n := NewClusterNode(tb)
n := NewClusterNode([]*symbol.Symbol{sb})
defer n.Close()

n.Outbound(node.PortOut, sb.ID(), node.PortOut)
assert.NotNil(t, n.Out(node.PortOut))
}

func TestClusterNode_Symbols(t *testing.T) {
tb := symbol.NewTable()

sb := &symbol.Symbol{
func TestClusterNode_Load(t *testing.T) {
sb1 := &symbol.Symbol{
Spec: &spec.Meta{
ID: uuid.Must(uuid.NewV7()),
Kind: faker.Word(),
},
Node: node.NewOneToOneNode(nil),
}
tb.Insert(sb)

n := NewClusterNode(tb)
sb2 := &symbol.Symbol{
Spec: &spec.Meta{
ID: uuid.Must(uuid.NewV7()),
Kind: faker.Word(),
Ports: map[string][]spec.Port{
node.PortOut: {
{
ID: sb1.ID(),
Port: node.PortIn,
},
},
},
},
Node: node.NewOneToOneNode(nil),
}
n := NewClusterNode([]*symbol.Symbol{sb1, sb2})
defer n.Close()

symbols := n.Symbols()
assert.Len(t, symbols, 1)
assert.Equal(t, sb, symbols[0])
}
err := n.Load(nil)
assert.NoError(t, err)

func TestClusterNode_Symbol(t *testing.T) {
tb := symbol.NewTable()
out := sb2.Node.Out(node.PortOut)
assert.Equal(t, 1, out.Links())
}

sb := &symbol.Symbol{
func TestClusterNode_Unload(t *testing.T) {
sb1 := &symbol.Symbol{
Spec: &spec.Meta{
ID: uuid.Must(uuid.NewV7()),
Kind: faker.Word(),
},
Node: node.NewOneToOneNode(nil),
}
tb.Insert(sb)

n := NewClusterNode(tb)
sb2 := &symbol.Symbol{
Spec: &spec.Meta{
ID: uuid.Must(uuid.NewV7()),
Kind: faker.Word(),
Ports: map[string][]spec.Port{
node.PortOut: {
{
ID: sb1.ID(),
Port: node.PortIn,
},
},
},
},
Node: node.NewOneToOneNode(nil),
}
n := NewClusterNode([]*symbol.Symbol{sb1, sb2})
defer n.Close()

assert.Equal(t, sb, n.Symbol(sb.ID()))
_ = n.Load(nil)

err := n.Unload(nil)
assert.NoError(t, err)

out := sb2.Node.Out(node.PortOut)
assert.Equal(t, 0, out.Links())
}

func TestClusterNode_SendAndReceive(t *testing.T) {
ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
defer cancel()

tb := symbol.NewTable()

sb := &symbol.Symbol{
Spec: &spec.Meta{
ID: uuid.Must(uuid.NewV7()),
Expand All @@ -114,11 +168,12 @@ func TestClusterNode_SendAndReceive(t *testing.T) {
return inPck, nil
}),
}
tb.Insert(sb)

n := NewClusterNode(tb)
n := NewClusterNode([]*symbol.Symbol{sb})
defer n.Close()

_ = n.Load(nil)

n.Inbound(node.PortIn, sb.ID(), node.PortIn)
n.Outbound(node.PortOut, sb.ID(), node.PortOut)

Expand Down
46 changes: 29 additions & 17 deletions pkg/chart/linker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ type LinkerConfig struct {
type Linker struct {
scheme *scheme.Scheme
codecs map[string]scheme.Codec
loadHooks []symbol.LoadHook
unloadHooks []symbol.UnloadHook
loadHooks symbol.LoadHooks
unloadHooks symbol.UnloadHooks
mu sync.RWMutex
}

var _ LinkHook = (*Linker)(nil)
var _ UnlinkHook = (*Linker)(nil)
var _ symbol.LoadHook = (*Linker)(nil)
var _ symbol.UnloadHook = (*Linker)(nil)

// NewLinker creates a new Linker.
func NewLinker(config LinkerConfig) *Linker {
Expand Down Expand Up @@ -92,23 +94,11 @@ func (l *Linker) Link(chrt *Chart) error {
})
}

table := symbol.NewTable(symbol.TableOption{
LoadHooks: l.loadHooks,
UnloadHooks: l.unloadHooks,
n := NewClusterNode(symbols, symbol.TableOption{
LoadHooks: []symbol.LoadHook{l.loadHooks},
UnloadHooks: []symbol.UnloadHook{l.unloadHooks},
})

for _, sb := range symbols {
if err := table.Insert(sb); err != nil {
_ = table.Close()
for _, sb := range symbols {
_ = sb.Close()
}
return nil, err
}
}

n := NewClusterNode(table)

for name, ports := range chrt.GetInbound() {
for _, port := range ports {
for _, sb := range symbols {
Expand Down Expand Up @@ -153,3 +143,25 @@ func (l *Linker) Unlink(chrt *Chart) error {
delete(l.codecs, kind)
return nil
}

// Load loads the symbol's node if it is a ClusterNode.
func (l *Linker) Load(sb *symbol.Symbol) error {
n := sb.Node
if n, ok := n.(*ClusterNode); ok {
if err := n.Load(nil); err != nil {
return err
}
}
return nil
}

// Unload unloads the symbol's node if it is a ClusterNode.
func (l *Linker) Unload(sb *symbol.Symbol) error {
n := sb.Node
if n, ok := n.(*ClusterNode); ok {
if err := n.Unload(nil); err != nil {
return err
}
}
return nil
}
Loading

0 comments on commit 0972f89

Please sign in to comment.