Skip to content

Commit 9531f3f

Browse files
committed
fix: lazy load and unload child nodes
1 parent a3faf66 commit 9531f3f

File tree

9 files changed

+376
-81
lines changed

9 files changed

+376
-81
lines changed

pkg/chart/cluster.go

Lines changed: 60 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212

1313
// ClusterNode manages the ports and symbol table for the cluster.
1414
type ClusterNode struct {
15+
symbols []*symbol.Symbol
1516
table *symbol.Table
1617
inPorts map[string]*port.InPort
1718
outPorts map[string]*port.OutPort
@@ -23,22 +24,42 @@ type ClusterNode struct {
2324
var _ node.Node = (*ClusterNode)(nil)
2425

2526
// NewClusterNode creates a new ClusterNode with the provided symbol table.
26-
func NewClusterNode(table *symbol.Table) *ClusterNode {
27+
func NewClusterNode(symbols []*symbol.Symbol, opts ...symbol.TableOption) *ClusterNode {
2728
return &ClusterNode{
28-
table: table,
29+
symbols: symbols,
30+
table: symbol.NewTable(opts...),
2931
inPorts: make(map[string]*port.InPort),
3032
outPorts: make(map[string]*port.OutPort),
3133
_inPorts: make(map[string]*port.InPort),
3234
_outPorts: make(map[string]*port.OutPort),
3335
}
3436
}
3537

36-
// Inbound sets up an input port and links it to the provided port.
38+
// Keys returns all keys from the symbol table.
39+
func (n *ClusterNode) Keys() []uuid.UUID {
40+
keys := make([]uuid.UUID, 0, len(n.symbols))
41+
for _, sb := range n.symbols {
42+
keys = append(keys, sb.ID())
43+
}
44+
return keys
45+
}
46+
47+
// Lookup retrieves a symbol from the table by its UUID.
48+
func (n *ClusterNode) Lookup(id uuid.UUID) *symbol.Symbol {
49+
for _, sb := range n.symbols {
50+
if sb.ID() == id {
51+
return sb
52+
}
53+
}
54+
return nil
55+
}
56+
57+
// Inbound links an external input to an internal symbol's input port.
3758
func (n *ClusterNode) Inbound(source string, id uuid.UUID, target string) bool {
3859
n.mu.Lock()
3960
defer n.mu.Unlock()
4061

41-
sb := n.table.Lookup(id)
62+
sb := n.Lookup(id)
4263
if sb == nil {
4364
return false
4465
}
@@ -71,12 +92,12 @@ func (n *ClusterNode) Inbound(source string, id uuid.UUID, target string) bool {
7192
return true
7293
}
7394

74-
// Outbound sets up an output port and links it to the provided port.
95+
// Outbound links an external output to an internal symbol's output port.
7596
func (n *ClusterNode) Outbound(source string, id uuid.UUID, target string) bool {
7697
n.mu.Lock()
7798
defer n.mu.Unlock()
7899

79-
sb := n.table.Lookup(id)
100+
sb := n.Lookup(id)
80101
if sb == nil {
81102
return false
82103
}
@@ -109,26 +130,39 @@ func (n *ClusterNode) Outbound(source string, id uuid.UUID, target string) bool
109130
return true
110131
}
111132

112-
// Symbols retrieves all symbols from the table.
113-
func (n *ClusterNode) Symbols() []*symbol.Symbol {
114-
n.mu.RLock()
115-
defer n.mu.RUnlock()
133+
// Load processes all initialization hooks for symbols.
134+
func (n *ClusterNode) Load(hook symbol.LoadHook) error {
135+
n.mu.Lock()
136+
defer n.mu.Unlock()
137+
138+
n.table.AddLoadHook(hook)
139+
defer n.table.RemoveLoadHook(hook)
116140

117-
var symbols []*symbol.Symbol
118-
for _, key := range n.table.Keys() {
119-
if sym := n.table.Lookup(key); sym != nil {
120-
symbols = append(symbols, sym)
141+
for _, sb := range n.symbols {
142+
if n.table.Lookup(sb.ID()) != nil {
143+
continue
144+
}
145+
146+
sb := &symbol.Symbol{
147+
Spec: sb.Spec,
148+
Node: node.NoCloser(sb.Node),
149+
}
150+
if err := n.table.Insert(sb); err != nil {
151+
return err
121152
}
122153
}
123-
return symbols
154+
return nil
124155
}
125156

126-
// Symbol retrieves a specific symbol by UUID.
127-
func (n *ClusterNode) Symbol(id uuid.UUID) *symbol.Symbol {
128-
n.mu.RLock()
129-
defer n.mu.RUnlock()
157+
// Unload processes all termination hooks for symbols.
158+
func (n *ClusterNode) Unload(hook symbol.UnloadHook) error {
159+
n.mu.Lock()
160+
defer n.mu.Unlock()
161+
162+
n.table.AddUnloadHook(hook)
163+
defer n.table.RemoveUnloadHook(hook)
130164

131-
return n.table.Lookup(id)
165+
return n.table.Close()
132166
}
133167

134168
// In returns the input port by name.
@@ -156,6 +190,12 @@ func (n *ClusterNode) Close() error {
156190
return err
157191
}
158192

193+
for _, sb := range n.symbols {
194+
if err := sb.Close(); err != nil {
195+
return err
196+
}
197+
}
198+
159199
for _, inPort := range n.inPorts {
160200
inPort.Close()
161201
}

pkg/chart/cluster_test.go

Lines changed: 86 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -18,93 +18,147 @@ import (
1818
)
1919

2020
func TestNewClusterNode(t *testing.T) {
21-
n := NewClusterNode(symbol.NewTable())
21+
n := NewClusterNode(nil)
2222
assert.NotNil(t, n)
2323
assert.NoError(t, n.Close())
2424
}
2525

26-
func TestClusterNode_Inbound(t *testing.T) {
27-
tb := symbol.NewTable()
26+
func TestClusterNode_Keys(t *testing.T) {
27+
sb := &symbol.Symbol{
28+
Spec: &spec.Meta{
29+
ID: uuid.Must(uuid.NewV7()),
30+
Kind: faker.Word(),
31+
},
32+
Node: node.NewOneToOneNode(nil),
33+
}
34+
35+
n := NewClusterNode([]*symbol.Symbol{sb})
36+
defer n.Close()
2837

38+
keys := n.Keys()
39+
assert.Len(t, keys, 1)
40+
assert.Equal(t, sb.ID(), keys[0])
41+
}
42+
43+
func TestClusterNode_Lookup(t *testing.T) {
44+
sb := &symbol.Symbol{
45+
Spec: &spec.Meta{
46+
ID: uuid.Must(uuid.NewV7()),
47+
Kind: faker.Word(),
48+
},
49+
Node: node.NewOneToOneNode(nil),
50+
}
51+
52+
n := NewClusterNode([]*symbol.Symbol{sb})
53+
defer n.Close()
54+
55+
assert.Equal(t, sb, n.Lookup(sb.ID()))
56+
}
57+
58+
func TestClusterNode_Inbound(t *testing.T) {
2959
sb := &symbol.Symbol{
3060
Spec: &spec.Meta{
3161
ID: uuid.Must(uuid.NewV7()),
3262
Kind: faker.Word(),
3363
},
3464
Node: node.NewOneToOneNode(nil),
3565
}
36-
tb.Insert(sb)
3766

38-
n := NewClusterNode(tb)
67+
n := NewClusterNode([]*symbol.Symbol{sb})
3968
defer n.Close()
4069

4170
n.Inbound(node.PortIn, sb.ID(), node.PortIn)
4271
assert.NotNil(t, n.In(node.PortIn))
4372
}
4473

4574
func TestClusterNode_Outbound(t *testing.T) {
46-
tb := symbol.NewTable()
47-
4875
sb := &symbol.Symbol{
4976
Spec: &spec.Meta{
5077
ID: uuid.Must(uuid.NewV7()),
5178
Kind: faker.Word(),
5279
},
5380
Node: node.NewOneToOneNode(nil),
5481
}
55-
tb.Insert(sb)
5682

57-
n := NewClusterNode(tb)
83+
n := NewClusterNode([]*symbol.Symbol{sb})
5884
defer n.Close()
5985

6086
n.Outbound(node.PortOut, sb.ID(), node.PortOut)
6187
assert.NotNil(t, n.Out(node.PortOut))
6288
}
6389

64-
func TestClusterNode_Symbols(t *testing.T) {
65-
tb := symbol.NewTable()
66-
67-
sb := &symbol.Symbol{
90+
func TestClusterNode_Load(t *testing.T) {
91+
sb1 := &symbol.Symbol{
6892
Spec: &spec.Meta{
6993
ID: uuid.Must(uuid.NewV7()),
7094
Kind: faker.Word(),
7195
},
7296
Node: node.NewOneToOneNode(nil),
7397
}
74-
tb.Insert(sb)
75-
76-
n := NewClusterNode(tb)
98+
sb2 := &symbol.Symbol{
99+
Spec: &spec.Meta{
100+
ID: uuid.Must(uuid.NewV7()),
101+
Kind: faker.Word(),
102+
Ports: map[string][]spec.Port{
103+
node.PortOut: {
104+
{
105+
ID: sb1.ID(),
106+
Port: node.PortIn,
107+
},
108+
},
109+
},
110+
},
111+
Node: node.NewOneToOneNode(nil),
112+
}
113+
n := NewClusterNode([]*symbol.Symbol{sb1, sb2})
77114
defer n.Close()
78115

79-
symbols := n.Symbols()
80-
assert.Len(t, symbols, 1)
81-
assert.Equal(t, sb, symbols[0])
82-
}
116+
err := n.Load(nil)
117+
assert.NoError(t, err)
83118

84-
func TestClusterNode_Symbol(t *testing.T) {
85-
tb := symbol.NewTable()
119+
out := sb2.Node.Out(node.PortOut)
120+
assert.Equal(t, 1, out.Links())
121+
}
86122

87-
sb := &symbol.Symbol{
123+
func TestClusterNode_Unload(t *testing.T) {
124+
sb1 := &symbol.Symbol{
88125
Spec: &spec.Meta{
89126
ID: uuid.Must(uuid.NewV7()),
90127
Kind: faker.Word(),
91128
},
92129
Node: node.NewOneToOneNode(nil),
93130
}
94-
tb.Insert(sb)
95-
96-
n := NewClusterNode(tb)
131+
sb2 := &symbol.Symbol{
132+
Spec: &spec.Meta{
133+
ID: uuid.Must(uuid.NewV7()),
134+
Kind: faker.Word(),
135+
Ports: map[string][]spec.Port{
136+
node.PortOut: {
137+
{
138+
ID: sb1.ID(),
139+
Port: node.PortIn,
140+
},
141+
},
142+
},
143+
},
144+
Node: node.NewOneToOneNode(nil),
145+
}
146+
n := NewClusterNode([]*symbol.Symbol{sb1, sb2})
97147
defer n.Close()
98148

99-
assert.Equal(t, sb, n.Symbol(sb.ID()))
149+
_ = n.Load(nil)
150+
151+
err := n.Unload(nil)
152+
assert.NoError(t, err)
153+
154+
out := sb2.Node.Out(node.PortOut)
155+
assert.Equal(t, 0, out.Links())
100156
}
101157

102158
func TestClusterNode_SendAndReceive(t *testing.T) {
103159
ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
104160
defer cancel()
105161

106-
tb := symbol.NewTable()
107-
108162
sb := &symbol.Symbol{
109163
Spec: &spec.Meta{
110164
ID: uuid.Must(uuid.NewV7()),
@@ -114,11 +168,12 @@ func TestClusterNode_SendAndReceive(t *testing.T) {
114168
return inPck, nil
115169
}),
116170
}
117-
tb.Insert(sb)
118171

119-
n := NewClusterNode(tb)
172+
n := NewClusterNode([]*symbol.Symbol{sb})
120173
defer n.Close()
121174

175+
_ = n.Load(nil)
176+
122177
n.Inbound(node.PortIn, sb.ID(), node.PortIn)
123178
n.Outbound(node.PortOut, sb.ID(), node.PortOut)
124179

0 commit comments

Comments
 (0)