Skip to content

Commit

Permalink
Improves memory managment by limiting concurrent requests and reusing…
Browse files Browse the repository at this point in the history
… allocated buffers
  • Loading branch information
ksysoev committed Apr 15, 2024
1 parent 30a38f6 commit b6d11a5
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 14 deletions.
17 changes: 14 additions & 3 deletions channel/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ var (

// Conn is default implementation of Connection
type Conn struct {
ctx context.Context
ws *websocket.Conn
reqWG *sync.WaitGroup
ctx context.Context
onMessageCB wasabi.OnMessage
onClose chan<- string
ctxCancel context.CancelFunc
bufferPool *bufferPool
sem chan struct{}
id string
isClosed atomic.Bool
}
Expand All @@ -37,6 +39,8 @@ func NewConnection(
ws *websocket.Conn,
cb wasabi.OnMessage,
onClose chan<- string,
bufferPool *bufferPool,
concurrencyLimit uint,
) *Conn {
ctx, cancel := context.WithCancel(ctx)

Expand All @@ -48,6 +52,8 @@ func NewConnection(
onMessageCB: cb,
onClose: onClose,
reqWG: &sync.WaitGroup{},
bufferPool: bufferPool,
sem: make(chan struct{}, concurrencyLimit),
}
}

Expand All @@ -66,13 +72,16 @@ func (c *Conn) HandleRequests() {
defer c.close()

for c.ctx.Err() == nil {
c.sem <- struct{}{}

buffer := c.bufferPool.get()
msgType, reader, err := c.ws.Reader(c.ctx)

if err != nil {
return
}

data, err := io.ReadAll(reader)
_, err = buffer.ReadFrom(reader)
if err != nil {
switch {
case errors.Is(err, io.EOF):
Expand All @@ -90,7 +99,9 @@ func (c *Conn) HandleRequests() {

go func(wg *sync.WaitGroup) {
defer wg.Done()
c.onMessageCB(c, msgType, data)
c.onMessageCB(c, msgType, buffer.Bytes())
c.bufferPool.put(buffer)
<-c.sem
}(c.reqWG)
}
}
Expand Down
44 changes: 38 additions & 6 deletions channel/connection_registry.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,34 @@
package channel

import (
"bytes"
"context"
"sync"

"github.com/ksysoev/wasabi"
"nhooyr.io/websocket"
)

const (
DefaultConcurencyLimitPerConnection = 25
)

// DefaultConnectionRegistry is default implementation of ConnectionRegistry
type DefaultConnectionRegistry struct {
connections map[string]wasabi.Connection
onClose chan string
mu sync.RWMutex
connections map[string]wasabi.Connection
onClose chan string
bufferPool *bufferPool
concurrencyLimit uint
mu sync.RWMutex
}

// NewDefaultConnectionRegistry creates new instance of DefaultConnectionRegistry
func NewDefaultConnectionRegistry() *DefaultConnectionRegistry {
reg := &DefaultConnectionRegistry{
connections: make(map[string]wasabi.Connection),
onClose: make(chan string),
connections: make(map[string]wasabi.Connection),
onClose: make(chan string),
concurrencyLimit: DefaultConcurencyLimitPerConnection,
bufferPool: newBufferPool(),
}

go reg.handleClose()
Expand All @@ -36,7 +45,7 @@ func (r *DefaultConnectionRegistry) AddConnection(
r.mu.Lock()
defer r.mu.Unlock()

conn := NewConnection(ctx, ws, cb, r.onClose)
conn := NewConnection(ctx, ws, cb, r.onClose, r.bufferPool, r.concurrencyLimit)
r.connections[conn.ID()] = conn

return conn
Expand All @@ -58,3 +67,26 @@ func (r *DefaultConnectionRegistry) handleClose() {
r.mu.Unlock()
}
}

type bufferPool struct {
pool *sync.Pool
}

func newBufferPool() *bufferPool {
return &bufferPool{
pool: &sync.Pool{
New: func() interface{} {
return &bytes.Buffer{}
},
},
}
}

func (p *bufferPool) get() *bytes.Buffer {
return p.pool.Get().(*bytes.Buffer)
}

func (p *bufferPool) put(b *bytes.Buffer) {
b.Reset()
p.pool.Put(b)
}
10 changes: 5 additions & 5 deletions channel/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ var wsHandlerEcho = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request
func TestConn_ID(t *testing.T) {
ws := &websocket.Conn{}
onClose := make(chan string)
conn := NewConnection(context.Background(), ws, nil, onClose)
conn := NewConnection(context.Background(), ws, nil, onClose, newBufferPool(), 1)

if conn.ID() == "" {
t.Error("Expected connection ID to be non-empty")
Expand All @@ -58,7 +58,7 @@ func TestConn_ID(t *testing.T) {
func TestConn_Context(t *testing.T) {
ws := &websocket.Conn{}
onClose := make(chan string)
conn := NewConnection(context.Background(), ws, nil, onClose)
conn := NewConnection(context.Background(), ws, nil, onClose, newBufferPool(), 1)

if conn.Context() == nil {
t.Error("Expected connection context to be non-nil")
Expand All @@ -84,7 +84,7 @@ func TestConn_HandleRequests(t *testing.T) {
defer func() { _ = ws.CloseNow() }()

onClose := make(chan string)
conn := NewConnection(context.Background(), ws, nil, onClose)
conn := NewConnection(context.Background(), ws, nil, onClose, newBufferPool(), 1)

// Mock OnMessage callback
received := make(chan struct{})
Expand Down Expand Up @@ -124,7 +124,7 @@ func TestConn_Send(t *testing.T) {
defer func() { _ = ws.CloseNow() }()

onClose := make(chan string)
conn := NewConnection(context.Background(), ws, nil, onClose)
conn := NewConnection(context.Background(), ws, nil, onClose, newBufferPool(), 1)

err = conn.Send(wasabi.MsgTypeText, []byte("test message"))
if err != nil {
Expand All @@ -149,7 +149,7 @@ func TestConn_close(t *testing.T) {
defer func() { _ = ws.CloseNow() }()

onClose := make(chan string)
conn := NewConnection(context.Background(), ws, nil, onClose)
conn := NewConnection(context.Background(), ws, nil, onClose, newBufferPool(), 1)

// Mock OnClose channel
closeChan := make(chan string)
Expand Down

0 comments on commit b6d11a5

Please sign in to comment.