diff --git a/channel/connection.go b/channel/connection.go index b8fceca..b51d814 100644 --- a/channel/connection.go +++ b/channel/connection.go @@ -21,12 +21,14 @@ var ( // Conn is default implementation of Connection type Conn struct { + ctx context.Context ws *websocket.Conn reqWG *sync.WaitGroup - ctx context.Context onMessageCB wasabi.OnMessage onClose chan<- string ctxCancel context.CancelFunc + bufferPool *bufferPool + sem chan struct{} id string isClosed atomic.Bool } @@ -37,6 +39,8 @@ func NewConnection( ws *websocket.Conn, cb wasabi.OnMessage, onClose chan<- string, + bufferPool *bufferPool, + concurrencyLimit uint, ) *Conn { ctx, cancel := context.WithCancel(ctx) @@ -48,6 +52,8 @@ func NewConnection( onMessageCB: cb, onClose: onClose, reqWG: &sync.WaitGroup{}, + bufferPool: bufferPool, + sem: make(chan struct{}, concurrencyLimit), } } @@ -66,13 +72,16 @@ func (c *Conn) HandleRequests() { defer c.close() for c.ctx.Err() == nil { + c.sem <- struct{}{} + + buffer := c.bufferPool.get() msgType, reader, err := c.ws.Reader(c.ctx) if err != nil { return } - data, err := io.ReadAll(reader) + _, err = buffer.ReadFrom(reader) if err != nil { switch { case errors.Is(err, io.EOF): @@ -90,7 +99,9 @@ func (c *Conn) HandleRequests() { go func(wg *sync.WaitGroup) { defer wg.Done() - c.onMessageCB(c, msgType, data) + c.onMessageCB(c, msgType, buffer.Bytes()) + c.bufferPool.put(buffer) + <-c.sem }(c.reqWG) } } diff --git a/channel/connection_registry.go b/channel/connection_registry.go index 7e8f043..934f433 100644 --- a/channel/connection_registry.go +++ b/channel/connection_registry.go @@ -1,6 +1,7 @@ package channel import ( + "bytes" "context" "sync" @@ -8,18 +9,26 @@ import ( "nhooyr.io/websocket" ) +const ( + DefaultConcurencyLimitPerConnection = 25 +) + // DefaultConnectionRegistry is default implementation of ConnectionRegistry type DefaultConnectionRegistry struct { - connections map[string]wasabi.Connection - onClose chan string - mu sync.RWMutex + connections map[string]wasabi.Connection + onClose chan string + bufferPool *bufferPool + concurrencyLimit uint + mu sync.RWMutex } // NewDefaultConnectionRegistry creates new instance of DefaultConnectionRegistry func NewDefaultConnectionRegistry() *DefaultConnectionRegistry { reg := &DefaultConnectionRegistry{ - connections: make(map[string]wasabi.Connection), - onClose: make(chan string), + connections: make(map[string]wasabi.Connection), + onClose: make(chan string), + concurrencyLimit: DefaultConcurencyLimitPerConnection, + bufferPool: newBufferPool(), } go reg.handleClose() @@ -36,7 +45,7 @@ func (r *DefaultConnectionRegistry) AddConnection( r.mu.Lock() defer r.mu.Unlock() - conn := NewConnection(ctx, ws, cb, r.onClose) + conn := NewConnection(ctx, ws, cb, r.onClose, r.bufferPool, r.concurrencyLimit) r.connections[conn.ID()] = conn return conn @@ -58,3 +67,26 @@ func (r *DefaultConnectionRegistry) handleClose() { r.mu.Unlock() } } + +type bufferPool struct { + pool *sync.Pool +} + +func newBufferPool() *bufferPool { + return &bufferPool{ + pool: &sync.Pool{ + New: func() interface{} { + return &bytes.Buffer{} + }, + }, + } +} + +func (p *bufferPool) get() *bytes.Buffer { + return p.pool.Get().(*bytes.Buffer) +} + +func (p *bufferPool) put(b *bytes.Buffer) { + b.Reset() + p.pool.Put(b) +} diff --git a/channel/connection_test.go b/channel/connection_test.go index a48e676..f380355 100644 --- a/channel/connection_test.go +++ b/channel/connection_test.go @@ -48,7 +48,7 @@ var wsHandlerEcho = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request func TestConn_ID(t *testing.T) { ws := &websocket.Conn{} onClose := make(chan string) - conn := NewConnection(context.Background(), ws, nil, onClose) + conn := NewConnection(context.Background(), ws, nil, onClose, newBufferPool(), 1) if conn.ID() == "" { t.Error("Expected connection ID to be non-empty") @@ -58,7 +58,7 @@ func TestConn_ID(t *testing.T) { func TestConn_Context(t *testing.T) { ws := &websocket.Conn{} onClose := make(chan string) - conn := NewConnection(context.Background(), ws, nil, onClose) + conn := NewConnection(context.Background(), ws, nil, onClose, newBufferPool(), 1) if conn.Context() == nil { t.Error("Expected connection context to be non-nil") @@ -84,7 +84,7 @@ func TestConn_HandleRequests(t *testing.T) { defer func() { _ = ws.CloseNow() }() onClose := make(chan string) - conn := NewConnection(context.Background(), ws, nil, onClose) + conn := NewConnection(context.Background(), ws, nil, onClose, newBufferPool(), 1) // Mock OnMessage callback received := make(chan struct{}) @@ -124,7 +124,7 @@ func TestConn_Send(t *testing.T) { defer func() { _ = ws.CloseNow() }() onClose := make(chan string) - conn := NewConnection(context.Background(), ws, nil, onClose) + conn := NewConnection(context.Background(), ws, nil, onClose, newBufferPool(), 1) err = conn.Send(wasabi.MsgTypeText, []byte("test message")) if err != nil { @@ -149,7 +149,7 @@ func TestConn_close(t *testing.T) { defer func() { _ = ws.CloseNow() }() onClose := make(chan string) - conn := NewConnection(context.Background(), ws, nil, onClose) + conn := NewConnection(context.Background(), ws, nil, onClose, newBufferPool(), 1) // Mock OnClose channel closeChan := make(chan string)