From f0a4bd6b2c21bf68c39f5d3586996264539bf2cb Mon Sep 17 00:00:00 2001 From: Kirill Sysoev Date: Sat, 13 Apr 2024 19:28:51 +0800 Subject: [PATCH] Expose message type to Wasabi components --- backend/http.go | 2 +- backend/http_test.go | 2 +- channel/connection.go | 8 ++++---- channel/connection_registry_test.go | 2 +- channel/connection_test.go | 4 ++-- dispatch/common.go | 2 +- dispatch/pipe_dispatcher.go | 4 ++-- dispatch/pipe_dispatcher_test.go | 4 ++-- dispatch/request.go | 18 +++++++++++----- dispatch/request_test.go | 12 ++++++----- dispatch/router_dipatcher.go | 4 ++-- dispatch/router_dipatcher_test.go | 32 ++++++++++++++++++----------- examples/http_backend/main.go | 2 +- interfaces.go | 15 +++++++++++--- mocks/mock_Connection.go | 23 ++++++++++++--------- mocks/mock_Dispatcher.go | 19 +++++++++-------- 16 files changed, 93 insertions(+), 60 deletions(-) diff --git a/backend/http.go b/backend/http.go index e8a30e6..eb0192c 100644 --- a/backend/http.go +++ b/backend/http.go @@ -62,7 +62,7 @@ func (b *HTTPBackend) Handle(conn wasabi.Connection, r wasabi.Request) error { return err } - return conn.Send(respBody.Bytes()) + return conn.Send(wasabi.MsgTypeText, respBody.Bytes()) } func WithDefaultHTTPTimeout(timeout time.Duration) HTTPBackendOption { diff --git a/backend/http_test.go b/backend/http_test.go index faf6420..c5647b9 100644 --- a/backend/http_test.go +++ b/backend/http_test.go @@ -41,7 +41,7 @@ func TestHTTPBackend_Handle(t *testing.T) { mockReq.EXPECT().Context().Return(context.Background()) - mockConn.EXPECT().Send([]byte("OK")).Return(nil) + mockConn.EXPECT().Send(wasabi.MsgTypeText, []byte("OK")).Return(nil) mockReq.EXPECT().Data().Return([]byte("test request")) backend := NewBackend(func(req wasabi.Request) (*http.Request, error) { diff --git a/channel/connection.go b/channel/connection.go index a2f5ec9..b874857 100644 --- a/channel/connection.go +++ b/channel/connection.go @@ -66,7 +66,7 @@ func (c *Conn) HandleRequests() { defer c.close() for c.ctx.Err() == nil { - _, reader, err := c.ws.Reader(c.ctx) + msgType, reader, err := c.ws.Reader(c.ctx) if err != nil { slog.Warn("Error reading message: " + err.Error()) @@ -86,18 +86,18 @@ func (c *Conn) HandleRequests() { go func(wg *sync.WaitGroup) { defer wg.Done() - c.onMessageCB(c, data) + c.onMessageCB(c, msgType, data) }(c.reqWG) } } // Send sends message to connection -func (c *Conn) Send(msg []byte) error { +func (c *Conn) Send(msgType wasabi.MessageType, msg []byte) error { if c.isClosed.Load() || c.ctx.Err() != nil { return ErrConnectionClosed } - return c.ws.Write(c.ctx, websocket.MessageText, msg) + return c.ws.Write(c.ctx, msgType, msg) } // close closes the connection. diff --git a/channel/connection_registry_test.go b/channel/connection_registry_test.go index b49bbfa..73c9791 100644 --- a/channel/connection_registry_test.go +++ b/channel/connection_registry_test.go @@ -13,7 +13,7 @@ import ( func TestDefaultConnectionRegistry_AddConnection(t *testing.T) { ctx := context.Background() ws := &websocket.Conn{} - cb := func(wasabi.Connection, []byte) {} + cb := func(wasabi.Connection, wasabi.MessageType, []byte) {} registry := NewDefaultConnectionRegistry() diff --git a/channel/connection_test.go b/channel/connection_test.go index 4a090e7..a48e676 100644 --- a/channel/connection_test.go +++ b/channel/connection_test.go @@ -89,7 +89,7 @@ func TestConn_HandleRequests(t *testing.T) { // Mock OnMessage callback received := make(chan struct{}) - conn.onMessageCB = func(c wasabi.Connection, data []byte) { received <- struct{}{} } + conn.onMessageCB = func(c wasabi.Connection, msgType wasabi.MessageType, data []byte) { received <- struct{}{} } go conn.HandleRequests() @@ -126,7 +126,7 @@ func TestConn_Send(t *testing.T) { onClose := make(chan string) conn := NewConnection(context.Background(), ws, nil, onClose) - err = conn.Send([]byte("test message")) + err = conn.Send(wasabi.MsgTypeText, []byte("test message")) if err != nil { t.Errorf("Unexpected error sending message: %v", err) } diff --git a/dispatch/common.go b/dispatch/common.go index 312b5b5..b4b8d30 100644 --- a/dispatch/common.go +++ b/dispatch/common.go @@ -13,4 +13,4 @@ func (f RequestHandlerFunc) Handle(conn wasabi.Connection, req wasabi.Request) e return f(conn, req) } -type RequestParser func(conn wasabi.Connection, data []byte) wasabi.Request +type RequestParser func(conn wasabi.Connection, msgType wasabi.MessageType, data []byte) wasabi.Request diff --git a/dispatch/pipe_dispatcher.go b/dispatch/pipe_dispatcher.go index 4557a1c..75f0be0 100644 --- a/dispatch/pipe_dispatcher.go +++ b/dispatch/pipe_dispatcher.go @@ -19,8 +19,8 @@ func NewPipeDispatcher(backend wasabi.Backend) *PipeDispatcher { } // Dispatch dispatches request to backend -func (d *PipeDispatcher) Dispatch(conn wasabi.Connection, data []byte) { - req := NewRawRequest(conn.Context(), data) +func (d *PipeDispatcher) Dispatch(conn wasabi.Connection, msgType wasabi.MessageType, data []byte) { + req := NewRawRequest(conn.Context(), msgType, data) err := d.useMiddleware(d.backend).Handle(conn, req) if err != nil { diff --git a/dispatch/pipe_dispatcher_test.go b/dispatch/pipe_dispatcher_test.go index 7da2ad3..e4335c1 100644 --- a/dispatch/pipe_dispatcher_test.go +++ b/dispatch/pipe_dispatcher_test.go @@ -31,9 +31,9 @@ func TestPipeDispatcher_Dispatch(t *testing.T) { testError := fmt.Errorf("test error") conn.On("Context").Return(context.Background()) - backend.EXPECT().Handle(conn, NewRawRequest(conn.Context(), data)).Return(testError) + backend.EXPECT().Handle(conn, NewRawRequest(conn.Context(), wasabi.MsgTypeText, data)).Return(testError) - dispatcher.Dispatch(conn, data) + dispatcher.Dispatch(conn, wasabi.MsgTypeText, data) } func TestPipeDispatcher_Use(t *testing.T) { diff --git a/dispatch/request.go b/dispatch/request.go index bb9fcf6..5158e1e 100644 --- a/dispatch/request.go +++ b/dispatch/request.go @@ -7,16 +7,17 @@ import ( ) type RawRequest struct { - ctx context.Context - data []byte + ctx context.Context + data []byte + msgType wasabi.MessageType } -func NewRawRequest(ctx context.Context, data []byte) *RawRequest { +func NewRawRequest(ctx context.Context, msgType wasabi.MessageType, data []byte) *RawRequest { if ctx == nil { panic("nil context") } - return &RawRequest{ctx: ctx, data: data} + return &RawRequest{ctx: ctx, data: data, msgType: msgType} } func (r *RawRequest) Data() []byte { @@ -24,7 +25,14 @@ func (r *RawRequest) Data() []byte { } func (r *RawRequest) RoutingKey() string { - return "" + switch r.msgType { + case wasabi.MsgTypeText: + return "text" + case wasabi.MsgTypeBinary: + return "binary" + default: + panic("unknown message type " + r.msgType.String()) + } } func (r *RawRequest) Context() context.Context { diff --git a/dispatch/request_test.go b/dispatch/request_test.go index 260be4b..6d2a3ab 100644 --- a/dispatch/request_test.go +++ b/dispatch/request_test.go @@ -4,11 +4,13 @@ import ( "bytes" "context" "testing" + + "github.com/ksysoev/wasabi" ) func TestRawRequest_Data(t *testing.T) { data := []byte("test data") - req := NewRawRequest(context.Background(), data) + req := NewRawRequest(context.Background(), wasabi.MsgTypeText, data) if !bytes.Equal(req.Data(), data) { t.Errorf("Expected data to be '%s', but got '%s'", data, req.Data()) @@ -16,16 +18,16 @@ func TestRawRequest_Data(t *testing.T) { } func TestRawRequest_RoutingKey(t *testing.T) { - req := NewRawRequest(context.Background(), []byte{}) + req := NewRawRequest(context.Background(), wasabi.MsgTypeText, []byte{}) - if req.RoutingKey() != "" { + if req.RoutingKey() != "text" { t.Errorf("Expected routing key to be empty, but got %v", req.RoutingKey()) } } func TestRawRequest_Context(t *testing.T) { ctx := context.Background() - req := NewRawRequest(ctx, []byte{}) + req := NewRawRequest(ctx, wasabi.MsgTypeText, []byte{}) if req.Context() != ctx { t.Errorf("Expected context to be %v, but got %v", ctx, req.Context()) @@ -34,7 +36,7 @@ func TestRawRequest_Context(t *testing.T) { func TestRawRequest_WithContext(t *testing.T) { ctx := context.Background() - req := NewRawRequest(context.Background(), []byte{}) + req := NewRawRequest(context.Background(), wasabi.MsgTypeText, []byte{}) newReq := req.WithContext(ctx) diff --git a/dispatch/router_dipatcher.go b/dispatch/router_dipatcher.go index adf61cb..4da386d 100644 --- a/dispatch/router_dipatcher.go +++ b/dispatch/router_dipatcher.go @@ -43,8 +43,8 @@ func (d *RouterDispatcher) AddBackend(backend wasabi.Backend, routingKeys []stri // Dispatch handles the incoming connection and data by parsing the request, // determining the appropriate backend, and handling the request using middleware. // If an error occurs during handling, it is logged. -func (d *RouterDispatcher) Dispatch(conn wasabi.Connection, data []byte) { - req := d.parser(conn, data) +func (d *RouterDispatcher) Dispatch(conn wasabi.Connection, msgType wasabi.MessageType, data []byte) { + req := d.parser(conn, msgType, data) if req == nil { return diff --git a/dispatch/router_dipatcher_test.go b/dispatch/router_dipatcher_test.go index a332273..6c92666 100644 --- a/dispatch/router_dipatcher_test.go +++ b/dispatch/router_dipatcher_test.go @@ -11,7 +11,9 @@ import ( func TestNewRouterDispatcher(t *testing.T) { defaultBackend := mocks.NewMockBackend(t) - parser := func(_ wasabi.Connection, _ []byte) wasabi.Request { return mocks.NewMockRequest(t) } + parser := func(_ wasabi.Connection, _ wasabi.MessageType, _ []byte) wasabi.Request { + return mocks.NewMockRequest(t) + } dispatcher := NewRouterDispatcher(defaultBackend, parser) @@ -25,7 +27,9 @@ func TestNewRouterDispatcher(t *testing.T) { } func TestRouterDispatcher_AddBackend(t *testing.T) { defaultBackend := mocks.NewMockBackend(t) - parser := func(_ wasabi.Connection, _ []byte) wasabi.Request { return mocks.NewMockRequest(t) } + parser := func(_ wasabi.Connection, _ wasabi.MessageType, _ []byte) wasabi.Request { + return mocks.NewMockRequest(t) + } dispatcher := NewRouterDispatcher(defaultBackend, parser) backend := mocks.NewMockBackend(t) @@ -58,7 +62,7 @@ func TestRouterDispatcher_DispatchDefault(t *testing.T) { defaultBackend := mocks.NewMockBackend(t) req := mocks.NewMockRequest(t) - parser := func(_ wasabi.Connection, _ []byte) wasabi.Request { return req } + parser := func(_ wasabi.Connection, _ wasabi.MessageType, _ []byte) wasabi.Request { return req } dispatcher := NewRouterDispatcher(defaultBackend, parser) conn := mocks.NewMockConnection(t) @@ -70,13 +74,13 @@ func TestRouterDispatcher_DispatchDefault(t *testing.T) { defaultBackend.EXPECT().Handle(conn, req).Return(nil) - dispatcher.Dispatch(conn, data) + dispatcher.Dispatch(conn, wasabi.MsgTypeText, data) } func TestRouterDispatcher_DispatchByRoutingKey(t *testing.T) { defaultBackend := mocks.NewMockBackend(t) req := mocks.NewMockRequest(t) - parser := func(_ wasabi.Connection, _ []byte) wasabi.Request { return req } + parser := func(_ wasabi.Connection, _ wasabi.MessageType, _ []byte) wasabi.Request { return req } dispatcher := NewRouterDispatcher(defaultBackend, parser) conn := mocks.NewMockConnection(t) @@ -90,24 +94,24 @@ func TestRouterDispatcher_DispatchByRoutingKey(t *testing.T) { mockBackend.EXPECT().Handle(conn, req).Return(nil) dispatcher.backendMap[routingKey] = mockBackend - dispatcher.Dispatch(conn, data) + dispatcher.Dispatch(conn, wasabi.MsgTypeText, data) } func TestRouterDispatcher_DispatchWrongRequest(t *testing.T) { defaultBackend := mocks.NewMockBackend(t) - parser := func(_ wasabi.Connection, _ []byte) wasabi.Request { return nil } + parser := func(_ wasabi.Connection, _ wasabi.MessageType, _ []byte) wasabi.Request { return nil } dispatcher := NewRouterDispatcher(defaultBackend, parser) conn := mocks.NewMockConnection(t) data := []byte("test data") - dispatcher.Dispatch(conn, data) + dispatcher.Dispatch(conn, wasabi.MsgTypeText, data) } func TestRouterDispatcher_DispatchErrorHandlingRequest(t *testing.T) { defaultBackend := mocks.NewMockBackend(t) req := mocks.NewMockRequest(t) - parser := func(_ wasabi.Connection, _ []byte) wasabi.Request { return req } + parser := func(_ wasabi.Connection, _ wasabi.MessageType, _ []byte) wasabi.Request { return req } dispatcher := NewRouterDispatcher(defaultBackend, parser) conn := mocks.NewMockConnection(t) @@ -120,11 +124,13 @@ func TestRouterDispatcher_DispatchErrorHandlingRequest(t *testing.T) { mockBackend.EXPECT().Handle(conn, req).Return(fmt.Errorf("test error")) dispatcher.backendMap[routingKey] = mockBackend - dispatcher.Dispatch(conn, data) + dispatcher.Dispatch(conn, wasabi.MsgTypeText, data) } func TestRouterDispatcher_Use(t *testing.T) { defaultBackend := mocks.NewMockBackend(t) - parser := func(_ wasabi.Connection, _ []byte) wasabi.Request { return mocks.NewMockRequest(t) } + parser := func(_ wasabi.Connection, _ wasabi.MessageType, _ []byte) wasabi.Request { + return mocks.NewMockRequest(t) + } dispatcher := NewRouterDispatcher(defaultBackend, parser) middleware := RequestMiddlewere(func(next wasabi.RequestHandler) wasabi.RequestHandler { return next }) @@ -143,7 +149,9 @@ func TestRouterDispatcher_UseMiddleware(t *testing.T) { defaultBackend := mocks.NewMockBackend(t) defaultBackend.EXPECT().Handle(mockConn, mockReq).Return(testError) - parser := func(_ wasabi.Connection, _ []byte) wasabi.Request { return mocks.NewMockRequest(t) } + parser := func(_ wasabi.Connection, _ wasabi.MessageType, _ []byte) wasabi.Request { + return mocks.NewMockRequest(t) + } dispatcher := NewRouterDispatcher(defaultBackend, parser) middleware1 := RequestMiddlewere(func(next wasabi.RequestHandler) wasabi.RequestHandler { return next }) diff --git a/examples/http_backend/main.go b/examples/http_backend/main.go index 8330540..f4f5bd9 100644 --- a/examples/http_backend/main.go +++ b/examples/http_backend/main.go @@ -32,7 +32,7 @@ func main() { }) ErrHandler := request.NewErrorHandlingMiddleware(func(conn wasabi.Connection, req wasabi.Request, err error) error { - conn.Send([]byte("Failed to process request: " + err.Error())) + conn.Send(wasabi.MsgTypeText, []byte("Failed to process request: "+err.Error())) return nil }) diff --git a/interfaces.go b/interfaces.go index 7b2fbb5..4b76fff 100644 --- a/interfaces.go +++ b/interfaces.go @@ -3,6 +3,15 @@ package wasabi import ( "context" "net/http" + + "nhooyr.io/websocket" +) + +type MessageType = websocket.MessageType + +const ( + MsgTypeText MessageType = websocket.MessageText + MsgTypeBinary MessageType = websocket.MessageBinary ) type Request interface { @@ -18,15 +27,15 @@ type Backend interface { // Dispatcher is interface for dispatchers type Dispatcher interface { - Dispatch(conn Connection, data []byte) + Dispatch(conn Connection, msgType MessageType, data []byte) } // OnMessage is type for OnMessage callback -type OnMessage func(conn Connection, data []byte) +type OnMessage func(conn Connection, msgType MessageType, data []byte) // Connection is interface for connections type Connection interface { - Send(msg []byte) error + Send(msgType MessageType, msg []byte) error Context() context.Context ID() string HandleRequests() diff --git a/mocks/mock_Connection.go b/mocks/mock_Connection.go index 04d5fb1..4409b58 100644 --- a/mocks/mock_Connection.go +++ b/mocks/mock_Connection.go @@ -8,6 +8,8 @@ import ( context "context" mock "github.com/stretchr/testify/mock" + + websocket "nhooyr.io/websocket" ) // MockConnection is an autogenerated mock type for the Connection type @@ -147,17 +149,17 @@ func (_c *MockConnection_ID_Call) RunAndReturn(run func() string) *MockConnectio return _c } -// Send provides a mock function with given fields: msg -func (_m *MockConnection) Send(msg []byte) error { - ret := _m.Called(msg) +// Send provides a mock function with given fields: msgType, msg +func (_m *MockConnection) Send(msgType websocket.MessageType, msg []byte) error { + ret := _m.Called(msgType, msg) if len(ret) == 0 { panic("no return value specified for Send") } var r0 error - if rf, ok := ret.Get(0).(func([]byte) error); ok { - r0 = rf(msg) + if rf, ok := ret.Get(0).(func(websocket.MessageType, []byte) error); ok { + r0 = rf(msgType, msg) } else { r0 = ret.Error(0) } @@ -171,14 +173,15 @@ type MockConnection_Send_Call struct { } // Send is a helper method to define mock.On call +// - msgType websocket.MessageType // - msg []byte -func (_e *MockConnection_Expecter) Send(msg interface{}) *MockConnection_Send_Call { - return &MockConnection_Send_Call{Call: _e.mock.On("Send", msg)} +func (_e *MockConnection_Expecter) Send(msgType interface{}, msg interface{}) *MockConnection_Send_Call { + return &MockConnection_Send_Call{Call: _e.mock.On("Send", msgType, msg)} } -func (_c *MockConnection_Send_Call) Run(run func(msg []byte)) *MockConnection_Send_Call { +func (_c *MockConnection_Send_Call) Run(run func(msgType websocket.MessageType, msg []byte)) *MockConnection_Send_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].([]byte)) + run(args[0].(websocket.MessageType), args[1].([]byte)) }) return _c } @@ -188,7 +191,7 @@ func (_c *MockConnection_Send_Call) Return(_a0 error) *MockConnection_Send_Call return _c } -func (_c *MockConnection_Send_Call) RunAndReturn(run func([]byte) error) *MockConnection_Send_Call { +func (_c *MockConnection_Send_Call) RunAndReturn(run func(websocket.MessageType, []byte) error) *MockConnection_Send_Call { _c.Call.Return(run) return _c } diff --git a/mocks/mock_Dispatcher.go b/mocks/mock_Dispatcher.go index ba287cb..0ee4d86 100644 --- a/mocks/mock_Dispatcher.go +++ b/mocks/mock_Dispatcher.go @@ -7,6 +7,8 @@ package mocks import ( wasabi "github.com/ksysoev/wasabi" mock "github.com/stretchr/testify/mock" + + websocket "nhooyr.io/websocket" ) // MockDispatcher is an autogenerated mock type for the Dispatcher type @@ -22,9 +24,9 @@ func (_m *MockDispatcher) EXPECT() *MockDispatcher_Expecter { return &MockDispatcher_Expecter{mock: &_m.Mock} } -// Dispatch provides a mock function with given fields: conn, data -func (_m *MockDispatcher) Dispatch(conn wasabi.Connection, data []byte) { - _m.Called(conn, data) +// Dispatch provides a mock function with given fields: conn, msgType, data +func (_m *MockDispatcher) Dispatch(conn wasabi.Connection, msgType websocket.MessageType, data []byte) { + _m.Called(conn, msgType, data) } // MockDispatcher_Dispatch_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Dispatch' @@ -34,14 +36,15 @@ type MockDispatcher_Dispatch_Call struct { // Dispatch is a helper method to define mock.On call // - conn wasabi.Connection +// - msgType websocket.MessageType // - data []byte -func (_e *MockDispatcher_Expecter) Dispatch(conn interface{}, data interface{}) *MockDispatcher_Dispatch_Call { - return &MockDispatcher_Dispatch_Call{Call: _e.mock.On("Dispatch", conn, data)} +func (_e *MockDispatcher_Expecter) Dispatch(conn interface{}, msgType interface{}, data interface{}) *MockDispatcher_Dispatch_Call { + return &MockDispatcher_Dispatch_Call{Call: _e.mock.On("Dispatch", conn, msgType, data)} } -func (_c *MockDispatcher_Dispatch_Call) Run(run func(conn wasabi.Connection, data []byte)) *MockDispatcher_Dispatch_Call { +func (_c *MockDispatcher_Dispatch_Call) Run(run func(conn wasabi.Connection, msgType websocket.MessageType, data []byte)) *MockDispatcher_Dispatch_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(wasabi.Connection), args[1].([]byte)) + run(args[0].(wasabi.Connection), args[1].(websocket.MessageType), args[2].([]byte)) }) return _c } @@ -51,7 +54,7 @@ func (_c *MockDispatcher_Dispatch_Call) Return() *MockDispatcher_Dispatch_Call { return _c } -func (_c *MockDispatcher_Dispatch_Call) RunAndReturn(run func(wasabi.Connection, []byte)) *MockDispatcher_Dispatch_Call { +func (_c *MockDispatcher_Dispatch_Call) RunAndReturn(run func(wasabi.Connection, websocket.MessageType, []byte)) *MockDispatcher_Dispatch_Call { _c.Call.Return(run) return _c }