Skip to content

Commit

Permalink
fix: escape death lock
Browse files Browse the repository at this point in the history
  • Loading branch information
siyul-park committed Oct 4, 2024
1 parent be90bca commit 10f65ea
Show file tree
Hide file tree
Showing 16 changed files with 308 additions and 168 deletions.
6 changes: 3 additions & 3 deletions ext/pkg/network/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,11 +211,11 @@ func (n *HTTPListenNode) ServeHTTP(w http.ResponseWriter, r *http.Request) {

var backPck *packet.Packet
if errPck != nil {
backPck = packet.Write(errWriter, errPck)
backPck = packet.Send(errWriter, errPck)
} else {
backPck = packet.Write(outWriter, outPck)
backPck = packet.Send(outWriter, outPck)
if _, ok := backPck.Payload().(types.Error); ok {
backPck = packet.WriteOrFallback(errWriter, backPck, backPck)
backPck = packet.SendOrFallback(errWriter, backPck, backPck)
}
}

Expand Down
6 changes: 3 additions & 3 deletions ext/pkg/network/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (n *WebSocketConnNode) connect(proc *process.Process) {
for inPck := range ioReader.Read() {
if conn, err := n.action(proc, inPck); err != nil {
errPck := packet.New(types.NewError(err))
backPck := packet.WriteOrFallback(errWriter, errPck, errPck)
backPck := packet.SendOrFallback(errWriter, errPck, errPck)
ioReader.Receive(backPck)
} else {
n.conns.Store(proc, conn)
Expand Down Expand Up @@ -254,7 +254,7 @@ func (n *WebSocketConnNode) produce(proc *process.Process) {
})

outPck := packet.New(outPayload)
packet.Write(outWriter, outPck)
packet.Send(outWriter, outPck)

proc.Wait()
proc.Exit(nil)
Expand All @@ -277,7 +277,7 @@ func (n *WebSocketConnNode) produce(proc *process.Process) {
})

outPck := packet.New(outPayload)
packet.Write(outWriter, outPck)
packet.Send(outWriter, outPck)

child.Wait()
child.Exit(nil)
Expand Down
24 changes: 12 additions & 12 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ type Agent struct {
symbols map[uuid.UUID]*symbol.Symbol
processes map[uuid.UUID]*process.Process
frames map[uuid.UUID][]*Frame
inbounds map[uuid.UUID]map[string]port.Hook
outbounds map[uuid.UUID]map[string]port.Hook
inbounds map[uuid.UUID]map[string]port.OpenHook
outbounds map[uuid.UUID]map[string]port.OpenHook
watchers Watchers
mu sync.RWMutex
}
Expand All @@ -31,8 +31,8 @@ func New() *Agent {
symbols: make(map[uuid.UUID]*symbol.Symbol),
processes: make(map[uuid.UUID]*process.Process),
frames: make(map[uuid.UUID][]*Frame),
inbounds: make(map[uuid.UUID]map[string]port.Hook),
outbounds: make(map[uuid.UUID]map[string]port.Hook),
inbounds: make(map[uuid.UUID]map[string]port.OpenHook),
outbounds: make(map[uuid.UUID]map[string]port.OpenHook),
}
}

Expand Down Expand Up @@ -117,15 +117,15 @@ func (a *Agent) Load(sym *symbol.Symbol) error {
a.mu.Lock()
defer a.mu.Unlock()

inbounds := make(map[string]port.Hook)
outbounds := make(map[string]port.Hook)
inbounds := make(map[string]port.OpenHook)
outbounds := make(map[string]port.OpenHook)

a.symbols[sym.ID()] = sym
a.inbounds[sym.ID()] = inbounds
a.outbounds[sym.ID()] = outbounds

for name, in := range sym.Ins() {
hook := port.HookFunc(func(proc *process.Process) {
hook := port.OpenHookFunc(func(proc *process.Process) {
a.accept(proc)

inboundHook, outboundHook := a.hooks(proc, sym, in, nil)
Expand All @@ -135,12 +135,12 @@ func (a *Agent) Load(sym *symbol.Symbol) error {
reader.AddOutboundHook(outboundHook)
})

in.AddHook(hook)
in.AddOpenHook(hook)
inbounds[name] = hook
}

for name, out := range sym.Outs() {
hook := port.HookFunc(func(proc *process.Process) {
hook := port.OpenHookFunc(func(proc *process.Process) {
a.accept(proc)

inboundHook, outboundHook := a.hooks(proc, sym, nil, out)
Expand All @@ -150,7 +150,7 @@ func (a *Agent) Load(sym *symbol.Symbol) error {
writer.AddOutboundHook(outboundHook)
})

out.AddHook(hook)
out.AddOpenHook(hook)
outbounds[name] = hook
}

Expand All @@ -164,11 +164,11 @@ func (a *Agent) Unload(sym *symbol.Symbol) error {

for name, hook := range a.inbounds[sym.ID()] {
in := sym.In(name)
in.RemoveHook(hook)
in.RemoveOpenHook(hook)
}
for name, hook := range a.outbounds[sym.ID()] {
out := sym.Out(name)
out.RemoveHook(hook)
out.RemoveOpenHook(hook)
}

delete(a.inbounds, sym.ID())
Expand Down
10 changes: 5 additions & 5 deletions pkg/packet/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ type Writer struct {
mu sync.Mutex
}

// Write sends a packet to the writer and returns the received packet or None if the write fails.
func Write(writer *Writer, pck *Packet) *Packet {
return WriteOrFallback(writer, pck, None)
// Send sends a packet to the writer and returns the received packet or None if the write fails.
func Send(writer *Writer, pck *Packet) *Packet {
return SendOrFallback(writer, pck, None)
}

// WriteOrFallback sends a packet to the writer and returns the received packet or a backup packet if the write fails.
func WriteOrFallback(writer *Writer, outPck *Packet, backPck *Packet) *Packet {
// SendOrFallback sends a packet to the writer and returns the received packet or a backup packet if the write fails.
func SendOrFallback(writer *Writer, outPck *Packet, backPck *Packet) *Packet {
if writer.Write(outPck) == 0 {
return backPck
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/packet/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestWrite(t *testing.T) {

outPck := New(nil)

backPck := Write(w, outPck)
backPck := Send(w, outPck)
assert.Equal(t, outPck, backPck)
}

Expand All @@ -53,7 +53,7 @@ func TestCallOrReturn(t *testing.T) {

outPck := New(nil)

backPck := WriteOrFallback(w, outPck, None)
backPck := SendOrFallback(w, outPck, None)
assert.Equal(t, outPck, backPck)
})

Expand All @@ -63,7 +63,7 @@ func TestCallOrReturn(t *testing.T) {

outPck := New(nil)

backPck := WriteOrFallback(w, outPck, None)
backPck := SendOrFallback(w, outPck, None)
assert.Equal(t, None, backPck)
})
}
Expand Down
33 changes: 33 additions & 0 deletions pkg/port/closehook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package port

// CloseHook is an interface that defines a method for handling resource cleanup.
type CloseHook interface {
// Close performs the necessary cleanup actions.
Close()
}

// CloseHooks represents a slice of CloseHook interfaces, which are processed in reverse order when closed.
type CloseHooks []CloseHook

type closeHook struct {
close func()
}

var _ CloseHook = (CloseHooks)(nil)
var _ CloseHook = (*closeHook)(nil)

// CloseHookFunc creates a new CloseHook from the provided function.
func CloseHookFunc(fn func()) CloseHook {
return &closeHook{close: fn}
}

func (h CloseHooks) Close() {
for i := len(h) - 1; i >= 0; i-- {
hook := h[i]
hook.Close()
}
}

func (h *closeHook) Close() {
h.close()
}
35 changes: 0 additions & 35 deletions pkg/port/hook.go

This file was deleted.

73 changes: 53 additions & 20 deletions pkg/port/inport.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ import (

// InPort represents an input port used for receiving data.
type InPort struct {
readers map[*process.Process]*packet.Reader
hooks Hooks
listeners Listeners
mu sync.RWMutex
readers map[*process.Process]*packet.Reader
openHooks OpenHooks
closeHooks CloseHooks
listeners Listeners
mu sync.RWMutex
}

// NewIn creates and returns a new InPort instance.
Expand All @@ -22,28 +23,56 @@ func NewIn() *InPort {
}
}

// AddHook adds a hook to the port if it is not already present.
func (p *InPort) AddHook(hook Hook) bool {
// AddOpenHook adds a hook to the port if it is not already present.
func (p *InPort) AddOpenHook(hook OpenHook) bool {
p.mu.Lock()
defer p.mu.Unlock()

for _, h := range p.hooks {
for _, h := range p.openHooks {
if h == hook {
return false
}
}
p.hooks = append(p.hooks, hook)
p.openHooks = append(p.openHooks, hook)
return true
}

// RemoveHook removes a hook from the port if it ok.
func (p *InPort) RemoveHook(hook Hook) bool {
// RemoveOpenHook removes a hook from the port if it exists.
func (p *InPort) RemoveOpenHook(hook OpenHook) bool {
p.mu.Lock()
defer p.mu.Unlock()

for i, h := range p.hooks {
for i, h := range p.openHooks {
if h == hook {
p.hooks = append(p.hooks[:i], p.hooks[i+1:]...)
p.openHooks = append(p.openHooks[:i], p.openHooks[i+1:]...)
return true
}
}
return false
}

// AddCloseHook adds a close hook to the port if it is not already present.
func (p *InPort) AddCloseHook(hook CloseHook) bool {
p.mu.Lock()
defer p.mu.Unlock()

for _, h := range p.closeHooks {
if h == hook {
return false
}
}
p.closeHooks = append(p.closeHooks, hook)
return true
}

// RemoveCloseHook removes a close hook from the port if it exists.
func (p *InPort) RemoveCloseHook(hook CloseHook) bool {
p.mu.Lock()
defer p.mu.Unlock()

for i, h := range p.closeHooks {
if h == hook {
p.closeHooks = append(p.closeHooks[:i], p.closeHooks[i+1:]...)
return true
}
}
Expand Down Expand Up @@ -77,6 +106,11 @@ func (p *InPort) Open(proc *process.Process) *packet.Reader {
reader = packet.NewReader()
p.readers[proc] = reader

openHooks := p.openHooks
listeners := p.listeners

p.mu.Unlock()

proc.AddExitHook(process.ExitFunc(func(_ error) {
p.mu.Lock()
defer p.mu.Unlock()
Expand All @@ -85,26 +119,25 @@ func (p *InPort) Open(proc *process.Process) *packet.Reader {
reader.Close()
}))

hooks := p.hooks
listeners := p.listeners

p.mu.Unlock()

hooks.Open(proc)
openHooks.Open(proc)
listeners.Accept(proc)

return reader
}

// Close shuts down all readers associated with the input port and clears hooks and listeners.
// Close shuts down all readers associated with the input port and clears hooks, listeners, and processes close hooks.
func (p *InPort) Close() {
p.mu.Lock()
defer p.mu.Unlock()

p.closeHooks.Close()

for _, reader := range p.readers {
reader.Close()
}

p.readers = make(map[*process.Process]*packet.Reader)
p.hooks = nil
p.openHooks = nil
p.closeHooks = nil
p.listeners = nil
}
Loading

0 comments on commit 10f65ea

Please sign in to comment.