Skip to content

Commit

Permalink
Refactor HandlingRequest method on connection
Browse files Browse the repository at this point in the history
  • Loading branch information
ksysoev committed Jun 2, 2024
1 parent f4ec718 commit f2c8be9
Show file tree
Hide file tree
Showing 10 changed files with 129 additions and 134 deletions.
4 changes: 1 addition & 3 deletions channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,7 @@ func (c *Channel) wsConnectionHandler() http.Handler {
return
}

if conn := c.connRegistry.AddConnection(ctx, ws, c.disptacher.Dispatch); conn != nil {
conn.HandleRequests()
}
c.connRegistry.HandleConnection(ctx, ws, c.disptacher.Dispatch)
})
}

Expand Down
4 changes: 2 additions & 2 deletions channel/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ func (c *Conn) Context() context.Context {
return c.ctx
}

// HandleRequests handles incoming messages
func (c *Conn) HandleRequests() {
// handleRequests handles incoming messages
func (c *Conn) handleRequests() {
defer c.close()

for c.ctx.Err() == nil {
Expand Down
11 changes: 6 additions & 5 deletions channel/connection_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,21 +58,22 @@ func NewConnectionRegistry(opts ...ConnectionRegistryOption) *ConnectionRegistry
}

// AddConnection adds new Websocket connection to registry
func (r *ConnectionRegistry) AddConnection(
func (r *ConnectionRegistry) HandleConnection(
ctx context.Context,
ws *websocket.Conn,
cb wasabi.OnMessage,
) wasabi.Connection {
) {
r.mu.Lock()
defer r.mu.Unlock()

if r.connectionLimit > 0 && len(r.connections) >= r.connectionLimit {
ws.Close(websocket.StatusTryAgainLater, "Connection limit reached")
return nil
return
}

if r.isClosed {
return nil
ws.Close(websocket.StatusServiceRestart, "Server is shutting down")
return
}

conn := NewConnection(ctx, ws, cb, r.onClose, r.bufferPool, r.concurrencyLimit, r.inActivityTimeout)
Expand All @@ -84,7 +85,7 @@ func (r *ConnectionRegistry) AddConnection(
r.onConnect(conn)
}

return conn
conn.handleRequests()
}

// CanAccept checks if the connection registry can accept new connections.
Expand Down
103 changes: 81 additions & 22 deletions channel/connection_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,36 +12,58 @@ import (
"nhooyr.io/websocket"
)

func TestConnectionRegistry_AddConnection(t *testing.T) {
func TestConnectionRegistry_HandleConnection(t *testing.T) {
server := httptest.NewServer(wsHandlerEcho)
defer server.Close()
url := "ws://" + server.Listener.Addr().String()

ws, resp, err := websocket.Dial(context.Background(), url, nil)

if err != nil {
t.Error(err)
t.Errorf("Unexpected error dialing websocket: %v", err)
}

err = ws.Write(context.Background(), websocket.MessageText, []byte("test"))
if err != nil {
t.Errorf("Unexpected error writing to websocket: %v", err)
}

if resp.Body != nil {
resp.Body.Close()
}

ctx := context.Background()

cb := func(wasabi.Connection, wasabi.MessageType, []byte) {}
ready := make(chan struct{})
cb := func(wasabi.Connection, wasabi.MessageType, []byte) {
close(ready)
}

registry := NewConnectionRegistry()

conn := registry.AddConnection(ctx, ws, cb)
ctx, cancel := context.WithCancel(context.Background())

if conn == nil {
t.Error("Expected connection to be created")
done := make(chan struct{})
go func() {
registry.HandleConnection(ctx, ws, cb)
close(done)
}()

select {
case <-ready:
case <-time.After(1 * time.Second):
t.Error("Expected connection to be handled")
}

if _, ok := registry.connections[conn.ID()]; !ok {
if len(registry.connections) != 1 {
t.Error("Expected connection to be added to the registry")
}

cancel()

select {
case <-done:
case <-time.After(1 * time.Second):
t.Error("Expected connection to be closed")
}
}

func TestConnectionRegistry_AddConnection_ToClosedRegistry(t *testing.T) {
Expand All @@ -67,10 +89,16 @@ func TestConnectionRegistry_AddConnection_ToClosedRegistry(t *testing.T) {

cb := func(wasabi.Connection, wasabi.MessageType, []byte) {}

conn := registry.AddConnection(ctx, ws, cb)
done := make(chan struct{})
go func() {
registry.HandleConnection(ctx, ws, cb)
close(done)
}()

if conn != nil {
t.Error("Expected connection to be nil")
select {
case <-done:
case <-time.After(1 * time.Second):
t.Error("Expected connection to be closed")
}
}

Expand Down Expand Up @@ -181,6 +209,7 @@ func TestConnectionRegistry_WithInActivityTimeout(t *testing.T) {
t.Errorf("Unexpected inactivity timeout: got %s, expected %s", registry.inActivityTimeout, 5*time.Minute)
}
}

func TestConnectionRegistry_WithOnConnect(t *testing.T) {
registry := NewConnectionRegistry()

Expand Down Expand Up @@ -217,7 +246,20 @@ func TestConnectionRegistry_WithOnConnect(t *testing.T) {
t.Error("Expected onConnect callback to be set")
}

registry.AddConnection(context.Background(), ws, func(wasabi.Connection, wasabi.MessageType, []byte) {})
ctx, cancel := context.WithCancel(context.Background())
cancel()

done := make(chan struct{})
go func() {
registry.HandleConnection(ctx, ws, func(wasabi.Connection, wasabi.MessageType, []byte) {})
close(done)
}()

select {
case <-done:
case <-time.After(1 * time.Second):
t.Error("Expected connection to be closed")
}

if !executed {
t.Error("Expected onConnect callback to be executed")
Expand Down Expand Up @@ -260,11 +302,24 @@ func TestConnectionRegistry_WithOnDisconnectHook(t *testing.T) {
resp.Body.Close()
}

ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
cancel()

cb := func(wasabi.Connection, wasabi.MessageType, []byte) {}
conn := registry.AddConnection(ctx, ws, cb)

registry.onClose <- conn.ID()
ready := make(chan struct{})

go func() {
registry.HandleConnection(ctx, ws, cb)
close(ready)
}()

select {
case <-ready:
case <-time.After(1 * time.Second):
t.Error("Expected connection to be handled")
}

close(registry.onClose)

select {
Expand Down Expand Up @@ -292,11 +347,9 @@ func TestConnectionRegistry_AddConnection_ConnectionLimitReached(t *testing.T) {
registry := NewConnectionRegistry(WithConnectionLimit(2))
conn1 := mocks.NewMockConnection(t)
conn2 := mocks.NewMockConnection(t)
conn3 := mocks.NewMockConnection(t)

conn1.EXPECT().ID().Return("conn1")
conn2.EXPECT().ID().Return("conn2")
conn3.EXPECT().ID().Return("conn3")

registry.connections[conn1.ID()] = conn1
registry.connections[conn2.ID()] = conn2
Expand All @@ -317,13 +370,19 @@ func TestConnectionRegistry_AddConnection_ConnectionLimitReached(t *testing.T) {
resp.Body.Close()
}

conn := registry.AddConnection(ctx, ws, cb)
done := make(chan struct{})
go func() {
registry.HandleConnection(ctx, ws, cb)
close(done)
}()

if conn != nil {
t.Error("Expected connection to be nil")
select {
case <-done:
case <-time.After(1 * time.Second):
t.Error("Expected connection to be handled")
}

if _, ok := registry.connections[conn3.ID()]; ok {
if len(registry.connections) != 2 {
t.Error("Expected connection to not be added to the registry")
}
}
Expand Down
4 changes: 2 additions & 2 deletions channel/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestConn_Context(t *testing.T) {
}
}

func TestConn_HandleRequests(t *testing.T) {
func TestConn_handleRequests(t *testing.T) {
server := httptest.NewServer(wsHandlerEcho)
defer server.Close()

Expand All @@ -91,7 +91,7 @@ func TestConn_HandleRequests(t *testing.T) {

conn.onMessageCB = func(c wasabi.Connection, msgType wasabi.MessageType, data []byte) { received <- struct{}{} }

go conn.HandleRequests()
go conn.handleRequests()

// Send message to trigger OnMessage callback
err = ws.Write(context.Background(), websocket.MessageText, []byte("test message"))
Expand Down
5 changes: 0 additions & 5 deletions channel/connection_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,6 @@ func (cw *ConnectionWrapper) Context() context.Context {
return cw.connection.Context()
}

// HandleRequests handles incoming requests on the connection.
func (cw *ConnectionWrapper) HandleRequests() {
cw.connection.HandleRequests()
}

// Send sends a message of the specified type and content over the connection.
// If an onSendWrapper function is set, it will be called instead of directly sending the message.
// The onSendWrapper function should have the signature func(connection Connection, msgType MessageType, msg []byte) error.
Expand Down
10 changes: 0 additions & 10 deletions channel/connection_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,6 @@ func TestConnectionWrapper_Context(t *testing.T) {
assert.Equal(t, expectedContext, actualContext)
}

func TestConnectionWrapper_HandleRequests(t *testing.T) {
mockConnection := mocks.NewMockConnection(t)
wrapper := NewConnectionWrapper(mockConnection)

mockConnection.On("HandleRequests").Once()

wrapper.HandleRequests()

mockConnection.AssertExpectations(t)
}
func TestConnectionWrapper_Send_WithOnSendWrapper(t *testing.T) {
mockConnection := mocks.NewMockConnection(t)
wrapper := NewConnectionWrapper(mockConnection)
Expand Down
5 changes: 2 additions & 3 deletions interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ type Connection interface {
Send(msgType MessageType, msg []byte) error
Context() context.Context
ID() string
HandleRequests()
Close(status websocket.StatusCode, reason string, closingCtx ...context.Context) error
}

Expand All @@ -52,11 +51,11 @@ type Channel interface {

// ConnectionRegistry is interface for connection registries
type ConnectionRegistry interface {
AddConnection(
HandleConnection(
ctx context.Context,
ws *websocket.Conn,
cb OnMessage,
) Connection
)
GetConnection(id string) Connection
Close(ctx ...context.Context) error
CanAccept() bool
Expand Down
32 changes: 0 additions & 32 deletions mocks/mock_Connection.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit f2c8be9

Please sign in to comment.