Skip to content

Commit

Permalink
Remove WriteTo from recvBuffer to prevent blocking on external Writer.
Browse files Browse the repository at this point in the history
Fixes #229
  • Loading branch information
cbeuw committed Nov 12, 2023
1 parent fcb600e commit eca5f13
Show file tree
Hide file tree
Showing 7 changed files with 2 additions and 143 deletions.
48 changes: 0 additions & 48 deletions internal/multiplex/datagramBufferedPipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,46 +66,6 @@ func (d *datagramBufferedPipe) Read(target []byte) (int, error) {
return dataLen, nil
}

func (d *datagramBufferedPipe) WriteTo(w io.Writer) (n int64, err error) {
d.rwCond.L.Lock()
defer d.rwCond.L.Unlock()
for {
if d.closed && len(d.pLens) == 0 {
return 0, io.EOF
}

hasRDeadline := !d.rDeadline.IsZero()
if hasRDeadline {
if time.Until(d.rDeadline) <= 0 {
return 0, ErrTimeout
}
}

if len(d.pLens) > 0 {
var dataLen int
dataLen, d.pLens = d.pLens[0], d.pLens[1:]
written, er := w.Write(d.buf.Next(dataLen))
n += int64(written)
if er != nil {
d.rwCond.Broadcast()
return n, er
}
d.rwCond.Broadcast()
} else {
if d.wtTimeout == 0 {
if hasRDeadline {
d.broadcastAfter(time.Until(d.rDeadline))
}
} else {
d.rDeadline = time.Now().Add(d.wtTimeout)
d.broadcastAfter(d.wtTimeout)
}

d.rwCond.Wait()
}
}
}

func (d *datagramBufferedPipe) Write(f *Frame) (toBeClosed bool, err error) {
d.rwCond.L.Lock()
defer d.rwCond.L.Unlock()
Expand Down Expand Up @@ -151,14 +111,6 @@ func (d *datagramBufferedPipe) SetReadDeadline(t time.Time) {
d.rwCond.Broadcast()
}

func (d *datagramBufferedPipe) SetWriteToTimeout(t time.Duration) {
d.rwCond.L.Lock()
defer d.rwCond.L.Unlock()

d.wtTimeout = t
d.rwCond.Broadcast()
}

func (d *datagramBufferedPipe) broadcastAfter(t time.Duration) {
if d.timeoutTimer != nil {
d.timeoutTimer.Stop()
Expand Down
4 changes: 0 additions & 4 deletions internal/multiplex/recvBuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,8 @@ type recvBuffer interface {
// Instead, it should behave as if it hasn't been closed. Closure is only relevant
// when the buffer is empty.
io.ReadCloser
io.WriterTo
Write(*Frame) (toBeClosed bool, err error)
SetReadDeadline(time time.Time)
// SetWriteToTimeout sets the duration a recvBuffer waits in a WriteTo call when nothing
// has been written for a while. After that duration it should return ErrTimeout
SetWriteToTimeout(d time.Duration)
}

// size we want the amount of unread data in buffer to grow before recvBuffer.Write blocks.
Expand Down
2 changes: 1 addition & 1 deletion internal/multiplex/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ func BenchmarkRecvDataFromRemote(b *testing.B) {

go func() {
stream, _ := sesh.Accept()
stream.(*Stream).WriteTo(ioutil.Discard)
io.Copy(ioutil.Discard, stream)
}()

binaryFrames := [maxIter][]byte{}
Expand Down
12 changes: 0 additions & 12 deletions internal/multiplex/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,17 +96,6 @@ func (s *Stream) Read(buf []byte) (n int, err error) {
return
}

// WriteTo continuously write data Stream has received into the writer w.
func (s *Stream) WriteTo(w io.Writer) (int64, error) {
// will keep writing until the underlying buffer is closed
n, err := s.recvBuf.WriteTo(w)
log.Tracef("%v read from stream %v with err %v", n, s.id, err)
if err == io.EOF {
return n, ErrBrokenStream
}
return n, nil
}

func (s *Stream) obfuscateAndSend(buf []byte, payloadOffsetInBuf int) error {
cipherTextLen, err := s.session.obfuscate(&s.writingFrame, buf, payloadOffsetInBuf)
s.writingFrame.Seq++
Expand Down Expand Up @@ -210,7 +199,6 @@ func (s *Stream) Close() error {
func (s *Stream) LocalAddr() net.Addr { return s.session.addrs.Load().([]net.Addr)[0] }
func (s *Stream) RemoteAddr() net.Addr { return s.session.addrs.Load().([]net.Addr)[1] }

func (s *Stream) SetWriteToTimeout(d time.Duration) { s.recvBuf.SetWriteToTimeout(d) }
func (s *Stream) SetReadDeadline(t time.Time) error { s.recvBuf.SetReadDeadline(t); return nil }
func (s *Stream) SetReadFromTimeout(d time.Duration) { s.readFromTimeout = d }

Expand Down
8 changes: 1 addition & 7 deletions internal/multiplex/streamBuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ package multiplex
import (
"container/heap"
"fmt"
"io"
"sync"
"time"
)
Expand Down Expand Up @@ -102,16 +101,11 @@ func (sb *streamBuffer) Read(buf []byte) (int, error) {
return sb.buf.Read(buf)
}

func (sb *streamBuffer) WriteTo(w io.Writer) (int64, error) {
return sb.buf.WriteTo(w)
}

func (sb *streamBuffer) Close() error {
sb.recvM.Lock()
defer sb.recvM.Unlock()

return sb.buf.Close()
}

func (sb *streamBuffer) SetReadDeadline(t time.Time) { sb.buf.SetReadDeadline(t) }
func (sb *streamBuffer) SetWriteToTimeout(d time.Duration) { sb.buf.SetWriteToTimeout(d) }
func (sb *streamBuffer) SetReadDeadline(t time.Time) { sb.buf.SetReadDeadline(t) }
45 changes: 0 additions & 45 deletions internal/multiplex/streamBufferedPipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,43 +58,6 @@ func (p *streamBufferedPipe) Read(target []byte) (int, error) {
return n, err
}

func (p *streamBufferedPipe) WriteTo(w io.Writer) (n int64, err error) {
p.rwCond.L.Lock()
defer p.rwCond.L.Unlock()
for {
if p.closed && p.buf.Len() == 0 {
return 0, io.EOF
}

hasRDeadline := !p.rDeadline.IsZero()
if hasRDeadline {
if time.Until(p.rDeadline) <= 0 {
return 0, ErrTimeout
}
}
if p.buf.Len() > 0 {
written, er := p.buf.WriteTo(w)
n += written
if er != nil {
p.rwCond.Broadcast()
return n, er
}
p.rwCond.Broadcast()
} else {
if p.wtTimeout == 0 {
if hasRDeadline {
p.broadcastAfter(time.Until(p.rDeadline))
}
} else {
p.rDeadline = time.Now().Add(p.wtTimeout)
p.broadcastAfter(p.wtTimeout)
}

p.rwCond.Wait()
}
}
}

func (p *streamBufferedPipe) Write(input []byte) (int, error) {
p.rwCond.L.Lock()
defer p.rwCond.L.Unlock()
Expand Down Expand Up @@ -131,14 +94,6 @@ func (p *streamBufferedPipe) SetReadDeadline(t time.Time) {
p.rwCond.Broadcast()
}

func (p *streamBufferedPipe) SetWriteToTimeout(d time.Duration) {
p.rwCond.L.Lock()
defer p.rwCond.L.Unlock()

p.wtTimeout = d
p.rwCond.Broadcast()
}

func (p *streamBufferedPipe) broadcastAfter(d time.Duration) {
if p.timeoutTimer != nil {
p.timeoutTimer.Stop()
Expand Down
26 changes: 0 additions & 26 deletions internal/multiplex/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package multiplex
import (
"bytes"
"io"
"io/ioutil"
"math/rand"
"testing"
"time"
Expand Down Expand Up @@ -364,31 +363,6 @@ func TestStream_Read(t *testing.T) {
}
}

func TestStream_SetWriteToTimeout(t *testing.T) {
seshes := map[string]*Session{
"ordered": setupSesh(false, emptyKey, EncryptionMethodPlain),
"unordered": setupSesh(true, emptyKey, EncryptionMethodPlain),
}
for name, sesh := range seshes {
t.Run(name, func(t *testing.T) {
stream, _ := sesh.OpenStream()
stream.SetWriteToTimeout(100 * time.Millisecond)
done := make(chan struct{})
go func() {
stream.WriteTo(ioutil.Discard)
done <- struct{}{}
}()

select {
case <-done:
return
case <-time.After(500 * time.Millisecond):
t.Error("didn't timeout")
}
})
}
}

func TestStream_SetReadFromTimeout(t *testing.T) {
seshes := map[string]*Session{
"ordered": setupSesh(false, emptyKey, EncryptionMethodPlain),
Expand Down

0 comments on commit eca5f13

Please sign in to comment.