Skip to content

Commit

Permalink
improves error handling and go routing control to wait all request to…
Browse files Browse the repository at this point in the history
… finish before terminating connection goroutine
  • Loading branch information
ksysoev committed Apr 7, 2024
1 parent 8a7d9aa commit 12444e6
Showing 1 changed file with 18 additions and 16 deletions.
34 changes: 18 additions & 16 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ package wasabi

import (
"context"
"fmt"
"errors"
"io"
"log/slog"
"sync"
"sync/atomic"
Expand All @@ -11,6 +12,11 @@ import (
"golang.org/x/net/websocket"
)

var (
// ErrConnectionClosed is error for closed connections
ErrConnectionClosed = errors.New("connection is closed")
)

// ConnectionRegistry is interface for connection registries
type ConnectionRegistry interface {
AddConnection(
Expand Down Expand Up @@ -89,6 +95,7 @@ type Conn struct {
ctxCancel context.CancelFunc
id string
isClosed atomic.Bool
reqWG *sync.WaitGroup
}

// OnMessage is type for OnMessage callback
Expand All @@ -110,6 +117,7 @@ func NewConnection(
ctxCancel: cancel,
onMessageCB: cb,
onClose: onClose,
reqWG: &sync.WaitGroup{},
}
}

Expand All @@ -136,34 +144,27 @@ func (c *Conn) HandleRequests() {
return
}

if err.Error() == "EOF" {
slog.Debug("Connection closed")
c.close()

return
}

if err.Error() == "ErrFrameTooLarge" {
// Unexpectedkly large message received
// it's probably more safe to close connection
c.close()

if errors.Is(err, io.EOF) || errors.Is(err, websocket.ErrFrameTooLarge) {
return
}

slog.Info("Error reading message: " + err.Error())
slog.Warn("Error reading message: " + err.Error())

continue
}

go c.onMessageCB(c, data)
c.reqWG.Add(1)
go func(wg *sync.WaitGroup) {
defer wg.Done()
c.onMessageCB(c, data)
}(c.reqWG)
}
}

// Send sends message to connection
func (c *Conn) Send(msg []byte) error {
if c.isClosed.Load() || c.ctx.Err() != nil {
return fmt.Errorf("connection is closed")
return ErrConnectionClosed
}

return websocket.Message.Send(c.ws, msg)
Expand All @@ -180,4 +181,5 @@ func (c *Conn) close() {
c.isClosed.Store(true)

c.ws.Close()
c.reqWG.Wait()
}

0 comments on commit 12444e6

Please sign in to comment.