diff --git a/connection.go b/connection.go index 2ef9aba..50f7ae5 100644 --- a/connection.go +++ b/connection.go @@ -2,7 +2,8 @@ package wasabi import ( "context" - "fmt" + "errors" + "io" "log/slog" "sync" "sync/atomic" @@ -11,6 +12,11 @@ import ( "golang.org/x/net/websocket" ) +var ( + // ErrConnectionClosed is error for closed connections + ErrConnectionClosed = errors.New("connection is closed") +) + // ConnectionRegistry is interface for connection registries type ConnectionRegistry interface { AddConnection( @@ -89,6 +95,7 @@ type Conn struct { ctxCancel context.CancelFunc id string isClosed atomic.Bool + reqWG *sync.WaitGroup } // OnMessage is type for OnMessage callback @@ -110,6 +117,7 @@ func NewConnection( ctxCancel: cancel, onMessageCB: cb, onClose: onClose, + reqWG: &sync.WaitGroup{}, } } @@ -136,34 +144,27 @@ func (c *Conn) HandleRequests() { return } - if err.Error() == "EOF" { - slog.Debug("Connection closed") - c.close() - - return - } - - if err.Error() == "ErrFrameTooLarge" { - // Unexpectedkly large message received - // it's probably more safe to close connection - c.close() - + if errors.Is(err, io.EOF) || errors.Is(err, websocket.ErrFrameTooLarge) { return } - slog.Info("Error reading message: " + err.Error()) + slog.Warn("Error reading message: " + err.Error()) continue } - go c.onMessageCB(c, data) + c.reqWG.Add(1) + go func(wg *sync.WaitGroup) { + defer wg.Done() + c.onMessageCB(c, data) + }(c.reqWG) } } // Send sends message to connection func (c *Conn) Send(msg []byte) error { if c.isClosed.Load() || c.ctx.Err() != nil { - return fmt.Errorf("connection is closed") + return ErrConnectionClosed } return websocket.Message.Send(c.ws, msg) @@ -180,4 +181,5 @@ func (c *Conn) close() { c.isClosed.Store(true) c.ws.Close() + c.reqWG.Wait() }