Skip to content

Commit ec71a4d

Browse files
committed
fix: more minimal lock
1 parent e4e4320 commit ec71a4d

File tree

13 files changed

+177
-141
lines changed

13 files changed

+177
-141
lines changed

ext/pkg/io/sql.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,9 +109,9 @@ func (n *SQLNode) action(proc *process.Process, inPck *packet.Packet) (*packet.P
109109

110110
proc.AddExitHook(process.ExitFunc(func(err error) {
111111
if err != nil {
112-
tx.Rollback()
112+
_ = tx.Rollback()
113113
} else {
114-
tx.Commit()
114+
_ = tx.Commit()
115115
}
116116
}))
117117

ext/pkg/language/javascript/compiler.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,9 @@ func NewCompiler(options ...api.TransformOptions) language.Compiler {
6868
},
6969
}
7070

71-
return language.RunFunc(func(ctx context.Context, args []any) ([]any, error) {
71+
return language.RunFunc(func(ctx context.Context, args []any) (_ []any, err error) {
72+
defer func() { err, _ = recover().(error) }()
73+
7274
vm := vms.Get().(*goja.Runtime)
7375
defer vms.Put(vm)
7476

ext/pkg/network/listener.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -181,9 +181,6 @@ func (n *HTTPListenNode) Shutdown() error {
181181

182182
// ServeHTTP handles HTTP requests.
183183
func (n *HTTPListenNode) ServeHTTP(w http.ResponseWriter, r *http.Request) {
184-
n.mu.RLock()
185-
defer n.mu.RUnlock()
186-
187184
proc := process.New()
188185

189186
proc.Store(KeyHTTPResponseWriter, w)

pkg/node/manytoone.go

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -91,15 +91,17 @@ func (n *ManyToOneNode) Close() error {
9191
}
9292

9393
func (n *ManyToOneNode) forward(index int) port.Listener {
94-
return port.ListenFunc(func(proc *process.Process) {
95-
n.mu.RLock()
96-
defer n.mu.RUnlock()
94+
inPort := n.inPorts[index]
9795

98-
inReader := n.inPorts[index].Open(proc)
96+
return port.ListenFunc(func(proc *process.Process) {
97+
inReader := inPort.Open(proc)
9998
var outWriter *packet.Writer
10099
var errWriter *packet.Writer
101100

102101
readGroup, _ := n.readGroups.LoadOrStore(proc, func() (*packet.ReadGroup, error) {
102+
n.mu.RLock()
103+
defer n.mu.RUnlock()
104+
103105
inReaders := make([]*packet.Reader, len(n.inPorts))
104106
for i, inPort := range n.inPorts {
105107
inReaders[i] = inPort.Open(proc)
@@ -110,19 +112,20 @@ func (n *ManyToOneNode) forward(index int) port.Listener {
110112
for inPck := range inReader.Read() {
111113
n.tracer.Read(inReader, inPck)
112114

113-
if outWriter == nil {
114-
outWriter = n.outPort.Open(proc)
115-
}
116-
if errWriter == nil {
117-
errWriter = n.errPort.Open(proc)
118-
}
119-
120115
if inPcks := readGroup.Read(inReader, inPck); len(inPcks) < len(n.inPorts) {
121116
n.tracer.Reduce(inPck)
122117
} else if outPck, errPck := n.action(proc, inPcks); errPck != nil {
118+
if errWriter == nil {
119+
errWriter = n.errPort.Open(proc)
120+
}
121+
123122
n.tracer.Transform(inPck, errPck)
124123
n.tracer.Write(errWriter, errPck)
125124
} else if outPck != nil {
125+
if outWriter == nil {
126+
outWriter = n.outPort.Open(proc)
127+
}
128+
126129
n.tracer.Transform(inPck, outPck)
127130
n.tracer.Write(outWriter, outPck)
128131
} else {

pkg/node/onetomany.go

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -94,22 +94,17 @@ func (n *OneToManyNode) forward(proc *process.Process) {
9494
defer n.mu.RUnlock()
9595

9696
inReader := n.inPort.Open(proc)
97-
outWriters := make([]*packet.Writer, 0, len(n.outPorts))
97+
outWriters := make([]*packet.Writer, len(n.outPorts))
9898
var errWriter *packet.Writer
9999

100100
for inPck := range inReader.Read() {
101101
n.tracer.Read(inReader, inPck)
102102

103-
if len(outWriters) == 0 {
104-
for _, outPort := range n.outPorts {
105-
outWriters = append(outWriters, outPort.Open(proc))
103+
if outPcks, errPck := n.action(proc, inPck); errPck != nil {
104+
if errWriter == nil {
105+
errWriter = n.errPort.Open(proc)
106106
}
107-
}
108-
if errWriter == nil {
109-
errWriter = n.errPort.Open(proc)
110-
}
111107

112-
if outPcks, errPck := n.action(proc, inPck); errPck != nil {
113108
n.tracer.Transform(inPck, errPck)
114109
n.tracer.Write(errWriter, errPck)
115110
} else {
@@ -122,6 +117,10 @@ func (n *OneToManyNode) forward(proc *process.Process) {
122117
count := 0
123118
for i, outPck := range outPcks {
124119
if i < len(outWriters) && outPck != nil {
120+
if outWriters[i] == nil {
121+
outWriters[i] = n.outPorts[i].Open(proc)
122+
}
123+
125124
n.tracer.Write(outWriters[i], outPck)
126125
count++
127126
}
@@ -135,12 +134,9 @@ func (n *OneToManyNode) forward(proc *process.Process) {
135134
}
136135

137136
func (n *OneToManyNode) backward(index int) port.Listener {
138-
return port.ListenFunc(func(proc *process.Process) {
139-
n.mu.RLock()
140-
defer n.mu.RUnlock()
141-
142-
outPort := n.outPorts[index]
137+
outPort := n.outPorts[index]
143138

139+
return port.ListenFunc(func(proc *process.Process) {
144140
outWriter := outPort.Open(proc)
145141

146142
for backPck := range outWriter.Receive() {

pkg/node/onetoone.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -75,17 +75,18 @@ func (n *OneToOneNode) forward(proc *process.Process) {
7575
for inPck := range inReader.Read() {
7676
n.tracer.Read(inReader, inPck)
7777

78-
if outWriter == nil {
79-
outWriter = n.outPort.Open(proc)
80-
}
81-
if errWriter == nil {
82-
errWriter = n.errPort.Open(proc)
83-
}
84-
8578
if outPck, errPck := n.action(proc, inPck); errPck != nil {
79+
if errWriter == nil {
80+
errWriter = n.errPort.Open(proc)
81+
}
82+
8683
n.tracer.Transform(inPck, errPck)
8784
n.tracer.Write(errWriter, errPck)
8885
} else {
86+
if outWriter == nil {
87+
outWriter = n.outPort.Open(proc)
88+
}
89+
8990
n.tracer.Transform(inPck, outPck)
9091
n.tracer.Write(outWriter, outPck)
9192
}

pkg/packet/reader.go

Lines changed: 25 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ type Reader struct {
1111
writers []*Writer
1212
in chan *Packet
1313
out chan *Packet
14-
done chan struct{}
14+
done bool
1515
inbounds Hooks
1616
outbounds Hooks
1717
mu sync.Mutex
@@ -20,21 +20,18 @@ type Reader struct {
2020
// NewReader creates a new Reader instance and starts its processing loop.
2121
func NewReader() *Reader {
2222
r := &Reader{
23-
in: make(chan *Packet),
24-
out: make(chan *Packet),
25-
done: make(chan struct{}),
23+
in: make(chan *Packet),
24+
out: make(chan *Packet),
2625
}
2726

2827
go func() {
2928
defer close(r.out)
30-
defer close(r.in)
3129

3230
buffer := make([]*Packet, 0, 2)
3331
for {
3432
var pck *Packet
35-
select {
36-
case pck = <-r.in:
37-
case <-r.done:
33+
var ok bool
34+
if pck, ok = <-r.in; !ok {
3835
return
3936
}
4037

@@ -63,37 +60,35 @@ func (r *Reader) AddInboundHook(hook Hook) bool {
6360
r.mu.Lock()
6461
defer r.mu.Unlock()
6562

66-
select {
67-
case <-r.done:
63+
if r.done {
6864
return false
69-
default:
70-
for _, h := range r.inbounds {
71-
if h == hook {
72-
return false
73-
}
65+
}
66+
67+
for _, h := range r.inbounds {
68+
if h == hook {
69+
return false
7470
}
75-
r.inbounds = append(r.inbounds, hook)
76-
return true
7771
}
72+
r.inbounds = append(r.inbounds, hook)
73+
return true
7874
}
7975

8076
// AddOutboundHook adds a handler to process outbound packets.
8177
func (r *Reader) AddOutboundHook(hook Hook) bool {
8278
r.mu.Lock()
8379
defer r.mu.Unlock()
8480

85-
select {
86-
case <-r.done:
81+
if r.done {
8782
return false
88-
default:
89-
for _, h := range r.outbounds {
90-
if h == hook {
91-
return false
92-
}
83+
}
84+
85+
for _, h := range r.outbounds {
86+
if h == hook {
87+
return false
9388
}
94-
r.outbounds = append(r.outbounds, hook)
95-
return true
9689
}
90+
r.outbounds = append(r.outbounds, hook)
91+
return true
9792
}
9893

9994
// Read returns the channel for reading packets from the reader.
@@ -125,10 +120,8 @@ func (r *Reader) Close() {
125120
r.mu.Lock()
126121
defer r.mu.Unlock()
127122

128-
select {
129-
case <-r.done:
123+
if r.done {
130124
return
131-
default:
132125
}
133126

134127
pck := New(types.NewError(ErrDroppedPacket))
@@ -137,8 +130,9 @@ func (r *Reader) Close() {
137130
go w.receive(pck, r)
138131
}
139132

140-
close(r.done)
133+
close(r.in)
141134

135+
r.done = true
142136
r.writers = nil
143137
r.inbounds = nil
144138
r.outbounds = nil
@@ -148,10 +142,8 @@ func (r *Reader) write(pck *Packet, writer *Writer) bool {
148142
r.mu.Lock()
149143
defer r.mu.Unlock()
150144

151-
select {
152-
case <-r.done:
145+
if r.done {
153146
return false
154-
default:
155147
}
156148

157149
r.writers = append(r.writers, writer)

0 commit comments

Comments
 (0)