diff --git a/ext/pkg/network/listener.go b/ext/pkg/network/listener.go index 21c33cac..90e58a4f 100644 --- a/ext/pkg/network/listener.go +++ b/ext/pkg/network/listener.go @@ -211,11 +211,11 @@ func (n *HTTPListenNode) ServeHTTP(w http.ResponseWriter, r *http.Request) { var backPck *packet.Packet if errPck != nil { - backPck = packet.Write(errWriter, errPck) + backPck = packet.Send(errWriter, errPck) } else { - backPck = packet.Write(outWriter, outPck) + backPck = packet.Send(outWriter, outPck) if _, ok := backPck.Payload().(types.Error); ok { - backPck = packet.WriteOrFallback(errWriter, backPck, backPck) + backPck = packet.SendOrFallback(errWriter, backPck, backPck) } } diff --git a/ext/pkg/network/websocket.go b/ext/pkg/network/websocket.go index e5a234e1..04aaa93a 100644 --- a/ext/pkg/network/websocket.go +++ b/ext/pkg/network/websocket.go @@ -177,7 +177,7 @@ func (n *WebSocketConnNode) connect(proc *process.Process) { for inPck := range ioReader.Read() { if conn, err := n.action(proc, inPck); err != nil { errPck := packet.New(types.NewError(err)) - backPck := packet.WriteOrFallback(errWriter, errPck, errPck) + backPck := packet.SendOrFallback(errWriter, errPck, errPck) ioReader.Receive(backPck) } else { n.conns.Store(proc, conn) @@ -254,7 +254,7 @@ func (n *WebSocketConnNode) produce(proc *process.Process) { }) outPck := packet.New(outPayload) - packet.Write(outWriter, outPck) + packet.Send(outWriter, outPck) proc.Wait() proc.Exit(nil) @@ -277,7 +277,7 @@ func (n *WebSocketConnNode) produce(proc *process.Process) { }) outPck := packet.New(outPayload) - packet.Write(outWriter, outPck) + packet.Send(outWriter, outPck) child.Wait() child.Exit(nil) diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 2b11953e..a308ada3 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -16,8 +16,8 @@ type Agent struct { symbols map[uuid.UUID]*symbol.Symbol processes map[uuid.UUID]*process.Process frames map[uuid.UUID][]*Frame - inbounds map[uuid.UUID]map[string]port.Hook - outbounds map[uuid.UUID]map[string]port.Hook + inbounds map[uuid.UUID]map[string]port.OpenHook + outbounds map[uuid.UUID]map[string]port.OpenHook watchers Watchers mu sync.RWMutex } @@ -31,8 +31,8 @@ func New() *Agent { symbols: make(map[uuid.UUID]*symbol.Symbol), processes: make(map[uuid.UUID]*process.Process), frames: make(map[uuid.UUID][]*Frame), - inbounds: make(map[uuid.UUID]map[string]port.Hook), - outbounds: make(map[uuid.UUID]map[string]port.Hook), + inbounds: make(map[uuid.UUID]map[string]port.OpenHook), + outbounds: make(map[uuid.UUID]map[string]port.OpenHook), } } @@ -117,15 +117,15 @@ func (a *Agent) Load(sym *symbol.Symbol) error { a.mu.Lock() defer a.mu.Unlock() - inbounds := make(map[string]port.Hook) - outbounds := make(map[string]port.Hook) + inbounds := make(map[string]port.OpenHook) + outbounds := make(map[string]port.OpenHook) a.symbols[sym.ID()] = sym a.inbounds[sym.ID()] = inbounds a.outbounds[sym.ID()] = outbounds for name, in := range sym.Ins() { - hook := port.HookFunc(func(proc *process.Process) { + hook := port.OpenHookFunc(func(proc *process.Process) { a.accept(proc) inboundHook, outboundHook := a.hooks(proc, sym, in, nil) @@ -135,12 +135,12 @@ func (a *Agent) Load(sym *symbol.Symbol) error { reader.AddOutboundHook(outboundHook) }) - in.AddHook(hook) + in.AddOpenHook(hook) inbounds[name] = hook } for name, out := range sym.Outs() { - hook := port.HookFunc(func(proc *process.Process) { + hook := port.OpenHookFunc(func(proc *process.Process) { a.accept(proc) inboundHook, outboundHook := a.hooks(proc, sym, nil, out) @@ -150,7 +150,7 @@ func (a *Agent) Load(sym *symbol.Symbol) error { writer.AddOutboundHook(outboundHook) }) - out.AddHook(hook) + out.AddOpenHook(hook) outbounds[name] = hook } @@ -164,11 +164,11 @@ func (a *Agent) Unload(sym *symbol.Symbol) error { for name, hook := range a.inbounds[sym.ID()] { in := sym.In(name) - in.RemoveHook(hook) + in.RemoveOpenHook(hook) } for name, hook := range a.outbounds[sym.ID()] { out := sym.Out(name) - out.RemoveHook(hook) + out.RemoveOpenHook(hook) } delete(a.inbounds, sym.ID()) diff --git a/pkg/packet/writer.go b/pkg/packet/writer.go index c1d43ac8..cadb4470 100644 --- a/pkg/packet/writer.go +++ b/pkg/packet/writer.go @@ -18,13 +18,13 @@ type Writer struct { mu sync.Mutex } -// Write sends a packet to the writer and returns the received packet or None if the write fails. -func Write(writer *Writer, pck *Packet) *Packet { - return WriteOrFallback(writer, pck, None) +// Send sends a packet to the writer and returns the received packet or None if the write fails. +func Send(writer *Writer, pck *Packet) *Packet { + return SendOrFallback(writer, pck, None) } -// WriteOrFallback sends a packet to the writer and returns the received packet or a backup packet if the write fails. -func WriteOrFallback(writer *Writer, outPck *Packet, backPck *Packet) *Packet { +// SendOrFallback sends a packet to the writer and returns the received packet or a backup packet if the write fails. +func SendOrFallback(writer *Writer, outPck *Packet, backPck *Packet) *Packet { if writer.Write(outPck) == 0 { return backPck } diff --git a/pkg/packet/writer_test.go b/pkg/packet/writer_test.go index fe6fe890..87eb003b 100644 --- a/pkg/packet/writer_test.go +++ b/pkg/packet/writer_test.go @@ -27,7 +27,7 @@ func TestWrite(t *testing.T) { outPck := New(nil) - backPck := Write(w, outPck) + backPck := Send(w, outPck) assert.Equal(t, outPck, backPck) } @@ -53,7 +53,7 @@ func TestCallOrReturn(t *testing.T) { outPck := New(nil) - backPck := WriteOrFallback(w, outPck, None) + backPck := SendOrFallback(w, outPck, None) assert.Equal(t, outPck, backPck) }) @@ -63,7 +63,7 @@ func TestCallOrReturn(t *testing.T) { outPck := New(nil) - backPck := WriteOrFallback(w, outPck, None) + backPck := SendOrFallback(w, outPck, None) assert.Equal(t, None, backPck) }) } diff --git a/pkg/port/closehook.go b/pkg/port/closehook.go new file mode 100644 index 00000000..98454533 --- /dev/null +++ b/pkg/port/closehook.go @@ -0,0 +1,33 @@ +package port + +// CloseHook is an interface that defines a method for handling resource cleanup. +type CloseHook interface { + // Close performs the necessary cleanup actions. + Close() +} + +// CloseHooks represents a slice of CloseHook interfaces, which are processed in reverse order when closed. +type CloseHooks []CloseHook + +type closeHook struct { + close func() +} + +var _ CloseHook = (CloseHooks)(nil) +var _ CloseHook = (*closeHook)(nil) + +// CloseHookFunc creates a new CloseHook from the provided function. +func CloseHookFunc(fn func()) CloseHook { + return &closeHook{close: fn} +} + +func (h CloseHooks) Close() { + for i := len(h) - 1; i >= 0; i-- { + hook := h[i] + hook.Close() + } +} + +func (h *closeHook) Close() { + h.close() +} diff --git a/pkg/port/hook.go b/pkg/port/hook.go deleted file mode 100644 index a42f4e9e..00000000 --- a/pkg/port/hook.go +++ /dev/null @@ -1,35 +0,0 @@ -package port - -import "github.com/siyul-park/uniflow/pkg/process" - -// Hook defines an interface for processing packets associated with a process. -type Hook interface { - // Open processes the given process. - Open(*process.Process) -} - -// Hooks is a slice of Hook interfaces, processed in reverse order. -type Hooks []Hook - -type hook struct { - open func(*process.Process) -} - -var _ Hook = (Hooks)(nil) -var _ Hook = (*hook)(nil) - -// HookFunc creates a new Hook from the provided function. -func HookFunc(open func(*process.Process)) Hook { - return &hook{open: open} -} - -func (h Hooks) Open(proc *process.Process) { - for i := len(h) - 1; i >= 0; i-- { - hook := h[i] - hook.Open(proc) - } -} - -func (h *hook) Open(proc *process.Process) { - h.open(proc) -} diff --git a/pkg/port/inport.go b/pkg/port/inport.go index 09b6cea7..94db17a9 100644 --- a/pkg/port/inport.go +++ b/pkg/port/inport.go @@ -9,10 +9,11 @@ import ( // InPort represents an input port used for receiving data. type InPort struct { - readers map[*process.Process]*packet.Reader - hooks Hooks - listeners Listeners - mu sync.RWMutex + readers map[*process.Process]*packet.Reader + openHooks OpenHooks + closeHooks CloseHooks + listeners Listeners + mu sync.RWMutex } // NewIn creates and returns a new InPort instance. @@ -22,28 +23,56 @@ func NewIn() *InPort { } } -// AddHook adds a hook to the port if it is not already present. -func (p *InPort) AddHook(hook Hook) bool { +// AddOpenHook adds a hook to the port if it is not already present. +func (p *InPort) AddOpenHook(hook OpenHook) bool { p.mu.Lock() defer p.mu.Unlock() - for _, h := range p.hooks { + for _, h := range p.openHooks { if h == hook { return false } } - p.hooks = append(p.hooks, hook) + p.openHooks = append(p.openHooks, hook) return true } -// RemoveHook removes a hook from the port if it ok. -func (p *InPort) RemoveHook(hook Hook) bool { +// RemoveOpenHook removes a hook from the port if it exists. +func (p *InPort) RemoveOpenHook(hook OpenHook) bool { p.mu.Lock() defer p.mu.Unlock() - for i, h := range p.hooks { + for i, h := range p.openHooks { if h == hook { - p.hooks = append(p.hooks[:i], p.hooks[i+1:]...) + p.openHooks = append(p.openHooks[:i], p.openHooks[i+1:]...) + return true + } + } + return false +} + +// AddCloseHook adds a close hook to the port if it is not already present. +func (p *InPort) AddCloseHook(hook CloseHook) bool { + p.mu.Lock() + defer p.mu.Unlock() + + for _, h := range p.closeHooks { + if h == hook { + return false + } + } + p.closeHooks = append(p.closeHooks, hook) + return true +} + +// RemoveCloseHook removes a close hook from the port if it exists. +func (p *InPort) RemoveCloseHook(hook CloseHook) bool { + p.mu.Lock() + defer p.mu.Unlock() + + for i, h := range p.closeHooks { + if h == hook { + p.closeHooks = append(p.closeHooks[:i], p.closeHooks[i+1:]...) return true } } @@ -77,6 +106,11 @@ func (p *InPort) Open(proc *process.Process) *packet.Reader { reader = packet.NewReader() p.readers[proc] = reader + openHooks := p.openHooks + listeners := p.listeners + + p.mu.Unlock() + proc.AddExitHook(process.ExitFunc(func(_ error) { p.mu.Lock() defer p.mu.Unlock() @@ -85,26 +119,25 @@ func (p *InPort) Open(proc *process.Process) *packet.Reader { reader.Close() })) - hooks := p.hooks - listeners := p.listeners - - p.mu.Unlock() - - hooks.Open(proc) + openHooks.Open(proc) listeners.Accept(proc) return reader } -// Close shuts down all readers associated with the input port and clears hooks and listeners. +// Close shuts down all readers associated with the input port and clears hooks, listeners, and processes close hooks. func (p *InPort) Close() { p.mu.Lock() defer p.mu.Unlock() + p.closeHooks.Close() + for _, reader := range p.readers { reader.Close() } + p.readers = make(map[*process.Process]*packet.Reader) - p.hooks = nil + p.openHooks = nil + p.closeHooks = nil p.listeners = nil } diff --git a/pkg/port/inport_test.go b/pkg/port/inport_test.go index e42a78e3..baaeac3e 100644 --- a/pkg/port/inport_test.go +++ b/pkg/port/inport_test.go @@ -22,7 +22,7 @@ func TestInPort_Open(t *testing.T) { assert.Equal(t, r1, r2) } -func TestInPort_Hook(t *testing.T) { +func TestInPort_OpenHook(t *testing.T) { proc := process.New() defer proc.Exit(nil) @@ -30,14 +30,14 @@ func TestInPort_Hook(t *testing.T) { defer in.Close() done := make(chan struct{}) - h := HookFunc(func(proc *process.Process) { + h := OpenHookFunc(func(proc *process.Process) { close(done) }) - ok := in.AddHook(h) + ok := in.AddOpenHook(h) assert.True(t, ok) - ok = in.AddHook(h) + ok = in.AddOpenHook(h) assert.False(t, ok) _ = in.Open(proc) @@ -51,13 +51,40 @@ func TestInPort_Hook(t *testing.T) { assert.NoError(t, ctx.Err()) } - ok = in.RemoveHook(h) + ok = in.RemoveOpenHook(h) assert.True(t, ok) - ok = in.RemoveHook(h) + ok = in.RemoveOpenHook(h) assert.False(t, ok) } +func TestInPort_CloseHook(t *testing.T) { + in := NewIn() + defer in.Close() + + done := make(chan struct{}) + h := CloseHookFunc(func() { + close(done) + }) + + ok := in.AddCloseHook(h) + assert.True(t, ok) + + ok = in.AddCloseHook(h) + assert.False(t, ok) + + in.Close() + + ctx, cancel := context.WithTimeout(context.TODO(), time.Second) + defer cancel() + + select { + case <-done: + case <-ctx.Done(): + assert.NoError(t, ctx.Err()) + } +} + func TestInPort_Listener(t *testing.T) { proc := process.New() defer proc.Exit(nil) diff --git a/pkg/port/openhook.go b/pkg/port/openhook.go new file mode 100644 index 00000000..b0d65a51 --- /dev/null +++ b/pkg/port/openhook.go @@ -0,0 +1,35 @@ +package port + +import "github.com/siyul-park/uniflow/pkg/process" + +// OpenHook defines an interface for processing packets associated with a process. +type OpenHook interface { + // Open processes the given process. + Open(*process.Process) +} + +// OpenHooks is a slice of Hook interfaces, processed in reverse order. +type OpenHooks []OpenHook + +type openHook struct { + open func(*process.Process) +} + +var _ OpenHook = (OpenHooks)(nil) +var _ OpenHook = (*openHook)(nil) + +// OpenHookFunc creates a new Hook from the provided function. +func OpenHookFunc(open func(*process.Process)) OpenHook { + return &openHook{open: open} +} + +func (h OpenHooks) Open(proc *process.Process) { + for i := len(h) - 1; i >= 0; i-- { + hook := h[i] + hook.Open(proc) + } +} + +func (h *openHook) Open(proc *process.Process) { + h.open(proc) +} diff --git a/pkg/port/outport.go b/pkg/port/outport.go index 24a2b8ef..8240ea99 100644 --- a/pkg/port/outport.go +++ b/pkg/port/outport.go @@ -10,15 +10,16 @@ import ( // OutPort represents an output port for sending data. type OutPort struct { - ins []*InPort - writers map[*process.Process]*packet.Writer - hooks Hooks - listeners Listeners - mu sync.RWMutex + ins []*InPort + writers map[*process.Process]*packet.Writer + openHooks OpenHooks + closeHooks CloseHooks + listeners Listeners + mu sync.RWMutex } -// Write sends the payload through the OutPort and returns the result or an error. -func Write(out *OutPort, payload types.Value) (types.Value, error) { +// Send sends the payload through the OutPort and returns the result or an error. +func Send(out *OutPort, payload types.Value) (types.Value, error) { var err error proc := process.New() @@ -28,7 +29,7 @@ func Write(out *OutPort, payload types.Value) (types.Value, error) { defer writer.Close() outPck := packet.New(payload) - backPck := packet.Write(writer, outPck) + backPck := packet.Send(writer, outPck) payload = backPck.Payload() @@ -49,28 +50,56 @@ func NewOut() *OutPort { } } -// AddHook adds a hook for packet processing if not already present. -func (p *OutPort) AddHook(hook Hook) bool { +// AddOpenHook adds a hook for packet processing if not already present. +func (p *OutPort) AddOpenHook(hook OpenHook) bool { p.mu.Lock() defer p.mu.Unlock() - for _, h := range p.hooks { + for _, h := range p.openHooks { if h == hook { return false } } - p.hooks = append(p.hooks, hook) + p.openHooks = append(p.openHooks, hook) return true } -// RemoveHook removes a hook from the port if present. -func (p *OutPort) RemoveHook(hook Hook) bool { +// RemoveOpenHook removes a hook from the port if present. +func (p *OutPort) RemoveOpenHook(hook OpenHook) bool { p.mu.Lock() defer p.mu.Unlock() - for i, h := range p.hooks { + for i, h := range p.openHooks { if h == hook { - p.hooks = append(p.hooks[:i], p.hooks[i+1:]...) + p.openHooks = append(p.openHooks[:i], p.openHooks[i+1:]...) + return true + } + } + return false +} + +// AddCloseHook adds a close hook to the port if not already present. +func (p *OutPort) AddCloseHook(hook CloseHook) bool { + p.mu.Lock() + defer p.mu.Unlock() + + for _, h := range p.closeHooks { + if h == hook { + return false + } + } + p.closeHooks = append(p.closeHooks, hook) + return true +} + +// RemoveCloseHook removes a close hook from the port if present. +func (p *OutPort) RemoveCloseHook(hook CloseHook) bool { + p.mu.Lock() + defer p.mu.Unlock() + + for i, h := range p.closeHooks { + if h == hook { + p.closeHooks = append(p.closeHooks[:i], p.closeHooks[i+1:]...) return true } } @@ -104,6 +133,10 @@ func (p *OutPort) Link(in *InPort) { p.mu.Lock() defer p.mu.Unlock() + in.AddCloseHook(CloseHookFunc(func() { + go p.Unlink(in) + })) + p.ins = append(p.ins, in) } @@ -122,46 +155,37 @@ func (p *OutPort) Unlink(in *InPort) { // Open opens the output port for the given process and returns a writer. func (p *OutPort) Open(proc *process.Process) *packet.Writer { - writer, ok := func() (*packet.Writer, bool) { - p.mu.Lock() - defer p.mu.Unlock() + p.mu.Lock() - writer, ok := p.writers[proc] - if !ok { - writer = packet.NewWriter() - if proc.Status() == process.StatusTerminated { - writer.Close() - return writer, true - } - - p.writers[proc] = writer - proc.AddExitHook(process.ExitFunc(func(_ error) { - p.mu.Lock() - defer p.mu.Unlock() - - delete(p.writers, proc) - writer.Close() - })) - } - return writer, ok - }() + writer, ok := p.writers[proc] + if ok { + p.mu.Unlock() + return writer + } - if !ok { - p.mu.RLock() + writer = packet.NewWriter() + p.writers[proc] = writer - for _, in := range p.ins { - reader := in.Open(proc) - writer.Link(reader) - } + for _, in := range p.ins { + reader := in.Open(proc) + writer.Link(reader) + } - hooks := p.hooks - listeners := p.listeners + openHooks := p.openHooks + listeners := p.listeners - p.mu.RUnlock() + p.mu.Unlock() - hooks.Open(proc) - listeners.Accept(proc) - } + proc.AddExitHook(process.ExitFunc(func(_ error) { + p.mu.Lock() + defer p.mu.Unlock() + + delete(p.writers, proc) + writer.Close() + })) + + openHooks.Open(proc) + listeners.Accept(proc) return writer } @@ -171,12 +195,15 @@ func (p *OutPort) Close() { p.mu.Lock() defer p.mu.Unlock() + p.closeHooks.Close() + for _, writer := range p.writers { writer.Close() } p.writers = make(map[*process.Process]*packet.Writer) p.ins = nil - p.hooks = nil + p.openHooks = nil + p.closeHooks = nil p.listeners = nil } diff --git a/pkg/port/outport_test.go b/pkg/port/outport_test.go index 3fe8083b..c3ff13a1 100644 --- a/pkg/port/outport_test.go +++ b/pkg/port/outport_test.go @@ -13,7 +13,7 @@ func TestWrite(t *testing.T) { out := NewOut() defer out.Close() - res, err := Write(out, nil) + res, err := Send(out, nil) assert.NoError(t, err) assert.Nil(t, res) } @@ -45,7 +45,7 @@ func TestOutPort_Link(t *testing.T) { assert.Equal(t, 1, out.Links()) } -func TestOutPort_Hook(t *testing.T) { +func TestOutPort_OpenHook(t *testing.T) { proc := process.New() defer proc.Exit(nil) @@ -53,14 +53,14 @@ func TestOutPort_Hook(t *testing.T) { defer out.Close() done := make(chan struct{}) - h := HookFunc(func(proc *process.Process) { + h := OpenHookFunc(func(proc *process.Process) { close(done) }) - ok := out.AddHook(h) + ok := out.AddOpenHook(h) assert.True(t, ok) - ok = out.AddHook(h) + ok = out.AddOpenHook(h) assert.False(t, ok) _ = out.Open(proc) @@ -75,6 +75,33 @@ func TestOutPort_Hook(t *testing.T) { } } +func TestOutPort_CloseHook(t *testing.T) { + out := NewOut() + defer out.Close() + + done := make(chan struct{}) + h := CloseHookFunc(func() { + close(done) + }) + + ok := out.AddCloseHook(h) + assert.True(t, ok) + + ok = out.AddCloseHook(h) + assert.False(t, ok) + + out.Close() + + ctx, cancel := context.WithTimeout(context.TODO(), time.Second) + defer cancel() + + select { + case <-done: + case <-ctx.Done(): + assert.NoError(t, ctx.Err()) + } +} + func TestOutPort_Listener(t *testing.T) { proc := process.New() defer proc.Exit(nil) diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index d1cafe5c..7eb428b8 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -53,7 +53,7 @@ func New(config Config) *Runtime { config.SecretStore = secret.NewStore() } - symbolTable := symbol.NewTable(symbol.TableOptions{ + symbolTable := symbol.NewTable(symbol.TableOption{ LoadHooks: []symbol.LoadHook{config.Hook}, UnloadHooks: []symbol.UnloadHook{config.Hook}, }) diff --git a/pkg/scheme/scheme.go b/pkg/scheme/scheme.go index 05c938da..58b803eb 100644 --- a/pkg/scheme/scheme.go +++ b/pkg/scheme/scheme.go @@ -78,9 +78,6 @@ func (s *Scheme) AddCodec(kind string, codec Codec) bool { // RemoveCodec removes the codec associated with a kind. func (s *Scheme) RemoveCodec(kind string) bool { - s.mu.Lock() - defer s.mu.Unlock() - if _, ok := s.codecs[kind]; ok { delete(s.codecs, kind) return true diff --git a/pkg/symbol/table.go b/pkg/symbol/table.go index 9e2c8166..74c4fe71 100644 --- a/pkg/symbol/table.go +++ b/pkg/symbol/table.go @@ -11,8 +11,8 @@ import ( "github.com/siyul-park/uniflow/pkg/types" ) -// TableOptions holds configurations for a Table instance. -type TableOptions struct { +// TableOption holds configurations for a Table instance. +type TableOption struct { LoadHooks []LoadHook // LoadHooks are functions executed when symbols are loaded. UnloadHooks []UnloadHook // UnloadHooks are functions executed when symbols are unloaded. } @@ -28,7 +28,7 @@ type Table struct { } // NewTable creates a new Table instance. -func NewTable(opts ...TableOptions) *Table { +func NewTable(opts ...TableOption) *Table { var loadHooks []LoadHook var unloadHooks []UnloadHook for _, opt := range opts { @@ -186,10 +186,9 @@ func (t *Table) links(sb *Symbol) { if ref, ok := t.symbols[id]; ok { if ref.Namespace() == sb.Namespace() { - if out != nil { - if in := ref.In(port.Port); in != nil { - out.Link(in) - } + in := ref.In(port.Port) + if out != nil && in != nil { + out.Link(in) } refences := t.refences[ref.ID()] @@ -218,10 +217,9 @@ func (t *Table) links(sb *Symbol) { for _, port := range ports { if (port.ID == sb.ID()) || (port.Name != "" && port.Name == sb.Name()) { - if out != nil { - if in := sb.In(port.Port); in != nil { - out.Link(in) - } + in := sb.In(port.Port) + if out != nil && in != nil { + out.Link(in) } refences := t.refences[sb.ID()] @@ -280,17 +278,16 @@ func (t *Table) unlinks(sb *Symbol) { func (t *Table) linked(sb *Symbol) []*Symbol { var linked []*Symbol - - nexts := []*Symbol{sb} - for len(nexts) > 0 { - sb := nexts[len(nexts)-1] + paths := []*Symbol{sb} + for len(paths) > 0 { + sb := paths[len(paths)-1] ok := true for _, ports := range t.refences[sb.ID()] { for _, port := range ports { next := t.symbols[port.ID] - ok = slices.Contains(nexts, next) || slices.Contains(linked, next) + ok = slices.Contains(paths, next) || slices.Contains(linked, next) if !ok { - nexts = append(nexts, next) + paths = append(paths, next) break } } @@ -299,11 +296,10 @@ func (t *Table) linked(sb *Symbol) []*Symbol { } } if ok { - nexts = nexts[0 : len(nexts)-1] + paths = paths[0 : len(paths)-1] linked = append(linked, sb) } } - slices.Reverse(linked) return linked } @@ -364,7 +360,7 @@ func (t *Table) init(sb *Symbol) error { return err } - _, err = port.Write(out, payload) + _, err = port.Send(out, payload) return err } diff --git a/pkg/symbol/table_test.go b/pkg/symbol/table_test.go index fb7496b5..7ac35f20 100644 --- a/pkg/symbol/table_test.go +++ b/pkg/symbol/table_test.go @@ -429,7 +429,7 @@ func TestTable_Hook(t *testing.T) { loaded := 0 unloaded := 0 - tb := NewTable(TableOptions{ + tb := NewTable(TableOption{ LoadHooks: []LoadHook{ LoadFunc(func(_ *Symbol) error { loaded += 1