Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

"Refactor HandleRequests method on connection" #65

Merged
merged 2 commits into from
Jun 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,7 @@ func (c *Channel) wsConnectionHandler() http.Handler {
return
}

if conn := c.connRegistry.AddConnection(ctx, ws, c.disptacher.Dispatch); conn != nil {
conn.HandleRequests()
}
c.connRegistry.HandleConnection(ctx, ws, c.disptacher.Dispatch)
})
}

Expand Down
13 changes: 4 additions & 9 deletions channel/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ type Conn struct {
ws *websocket.Conn
reqWG *sync.WaitGroup
onMessageCB wasabi.OnMessage
onClose chan<- string
ctxCancel context.CancelFunc
bufferPool *bufferPool
state *atomic.Int32
Expand All @@ -50,7 +49,6 @@ func NewConnection(
ctx context.Context,
ws *websocket.Conn,
cb wasabi.OnMessage,
onClose chan<- string,
bufferPool *bufferPool,
concurrencyLimit uint,
inActivityTimeout time.Duration,
Expand All @@ -65,7 +63,6 @@ func NewConnection(
ctx: ctx,
ctxCancel: cancel,
onMessageCB: cb,
onClose: onClose,
reqWG: &sync.WaitGroup{},
state: &state,
bufferPool: bufferPool,
Expand All @@ -91,8 +88,8 @@ func (c *Conn) Context() context.Context {
return c.ctx
}

// HandleRequests handles incoming messages
func (c *Conn) HandleRequests() {
// handleRequests handles incoming messages
func (c *Conn) handleRequests() {
defer c.close()

for c.ctx.Err() == nil {
Expand Down Expand Up @@ -153,7 +150,7 @@ func (c *Conn) Send(msgType wasabi.MessageType, msg []byte) error {
}

// close closes the connection.
// It cancels the context, sends the connection ID to the onClose channel,
// It cancels the context
// marks the connection as closed, and waits for any pending requests to complete.
func (c *Conn) close() {
if !c.state.CompareAndSwap(int32(connected), int32(terminated)) &&
Expand All @@ -162,7 +159,6 @@ func (c *Conn) close() {
}

c.ctxCancel()
c.onClose <- c.id

// Terminate the connection immediately.
_ = c.ws.CloseNow()
Expand All @@ -177,7 +173,7 @@ func (c *Conn) close() {
// before closing the connection. If the context is canceled, the connection
// is closed immediately. If there are no pending requests, the connection is
// closed immediately. After closing the connection, the connection state is
// set to terminated and the `onClose` channel is notified with the connection ID.
// set to terminated
func (c *Conn) Close(status websocket.StatusCode, reason string, ctx ...context.Context) error {
if !c.state.CompareAndSwap(int32(connected), int32(closing)) {
return ErrConnectionClosed
Expand All @@ -202,7 +198,6 @@ func (c *Conn) Close(status websocket.StatusCode, reason string, ctx ...context.

c.ctxCancel()
c.state.Store(int32(terminated))
c.onClose <- c.id

return nil
}
Expand Down
74 changes: 29 additions & 45 deletions channel/connection_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package channel

import (
"context"
"fmt"
"sync"
"time"

Expand All @@ -22,7 +21,6 @@ type ConnectionHook func(wasabi.Connection)
// ConnectionRegistry is default implementation of ConnectionRegistry
type ConnectionRegistry struct {
connections map[string]wasabi.Connection
onClose chan string
bufferPool *bufferPool
onConnect ConnectionHook
onDisconnect ConnectionHook
Expand All @@ -40,7 +38,6 @@ type ConnectionRegistryOption func(*ConnectionRegistry)
func NewConnectionRegistry(opts ...ConnectionRegistryOption) *ConnectionRegistry {
reg := &ConnectionRegistry{
connections: make(map[string]wasabi.Connection),
onClose: make(chan string),
concurrencyLimit: concurencyLimitPerConnection,
bufferPool: newBufferPool(),
frameSizeLimit: frameSizeLimitInBytes,
Expand All @@ -52,55 +49,65 @@ func NewConnectionRegistry(opts ...ConnectionRegistryOption) *ConnectionRegistry
opt(reg)
}

go reg.handleClose()

return reg
}

// AddConnection adds new Websocket connection to registry
func (r *ConnectionRegistry) AddConnection(
func (r *ConnectionRegistry) HandleConnection(
ctx context.Context,
ws *websocket.Conn,
cb wasabi.OnMessage,
) wasabi.Connection {
r.mu.Lock()
defer r.mu.Unlock()
) {
r.mu.RLock()
isClosed := r.isClosed
numOfConnections := len(r.connections)
r.mu.RUnlock()

if r.connectionLimit > 0 && len(r.connections) >= r.connectionLimit {
ws.Close(websocket.StatusTryAgainLater, "Connection limit reached")
return nil
if isClosed {
ws.Close(websocket.StatusServiceRestart, "Server is shutting down")
return
}

if r.isClosed {
return nil
if r.connectionLimit > 0 && numOfConnections >= r.connectionLimit {
ws.Close(websocket.StatusTryAgainLater, "Connection limit reached")
return
}

conn := NewConnection(ctx, ws, cb, r.onClose, r.bufferPool, r.concurrencyLimit, r.inActivityTimeout)
r.connections[conn.ID()] = conn

conn := NewConnection(ctx, ws, cb, r.bufferPool, r.concurrencyLimit, r.inActivityTimeout)
conn.ws.SetReadLimit(r.frameSizeLimit)

id := conn.ID()

r.mu.Lock()
r.connections[id] = conn
r.mu.Unlock()

if r.onConnect != nil {
r.onConnect(conn)
}

return conn
conn.handleRequests()

r.mu.Lock()
connection := r.connections[id]
delete(r.connections, id)
r.mu.Unlock()

if r.onDisconnect != nil {
r.onDisconnect(connection)
}
}

// CanAccept checks if the connection registry can accept new connections.
// It returns true if the registry can accept new connections, and false otherwise.
func (r *ConnectionRegistry) CanAccept() bool {
fmt.Println("Connection limit", r.connectionLimit)

if r.connectionLimit <= 0 {
return true
}

r.mu.RLock()
defer r.mu.RUnlock()

fmt.Println("Connections", len(r.connections))

return len(r.connections) < r.connectionLimit
}

Expand All @@ -112,29 +119,6 @@ func (r *ConnectionRegistry) GetConnection(id string) wasabi.Connection {
return r.connections[id]
}

// handleClose handles connection cloasures and removes them from registry
func (r *ConnectionRegistry) handleClose() {
wg := sync.WaitGroup{}

for id := range r.onClose {
r.mu.Lock()
connection := r.connections[id]
delete(r.connections, id)
r.mu.Unlock()

if r.onDisconnect != nil {
wg.Add(1)

go func() {
defer wg.Done()
r.onDisconnect(connection)
}()
}
}

wg.Wait()
}

// Shutdown closes all connections in the ConnectionRegistry.
// It sets the isClosed flag to true, indicating that the registry is closed.
// It then iterates over all connections, closes them with the given context,
Expand Down
Loading
Loading