Skip to content

Commit

Permalink
Expose message type to Wasabi components
Browse files Browse the repository at this point in the history
  • Loading branch information
ksysoev committed Apr 13, 2024
1 parent e0acb79 commit f0a4bd6
Show file tree
Hide file tree
Showing 16 changed files with 93 additions and 60 deletions.
2 changes: 1 addition & 1 deletion backend/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (b *HTTPBackend) Handle(conn wasabi.Connection, r wasabi.Request) error {
return err
}

return conn.Send(respBody.Bytes())
return conn.Send(wasabi.MsgTypeText, respBody.Bytes())
}

func WithDefaultHTTPTimeout(timeout time.Duration) HTTPBackendOption {
Expand Down
2 changes: 1 addition & 1 deletion backend/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestHTTPBackend_Handle(t *testing.T) {

mockReq.EXPECT().Context().Return(context.Background())

mockConn.EXPECT().Send([]byte("OK")).Return(nil)
mockConn.EXPECT().Send(wasabi.MsgTypeText, []byte("OK")).Return(nil)
mockReq.EXPECT().Data().Return([]byte("test request"))

backend := NewBackend(func(req wasabi.Request) (*http.Request, error) {
Expand Down
8 changes: 4 additions & 4 deletions channel/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (c *Conn) HandleRequests() {
defer c.close()

for c.ctx.Err() == nil {
_, reader, err := c.ws.Reader(c.ctx)
msgType, reader, err := c.ws.Reader(c.ctx)

if err != nil {
slog.Warn("Error reading message: " + err.Error())
Expand All @@ -86,18 +86,18 @@ func (c *Conn) HandleRequests() {

go func(wg *sync.WaitGroup) {
defer wg.Done()
c.onMessageCB(c, data)
c.onMessageCB(c, msgType, data)
}(c.reqWG)
}
}

// Send sends message to connection
func (c *Conn) Send(msg []byte) error {
func (c *Conn) Send(msgType wasabi.MessageType, msg []byte) error {
if c.isClosed.Load() || c.ctx.Err() != nil {
return ErrConnectionClosed
}

return c.ws.Write(c.ctx, websocket.MessageText, msg)
return c.ws.Write(c.ctx, msgType, msg)
}

// close closes the connection.
Expand Down
2 changes: 1 addition & 1 deletion channel/connection_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
func TestDefaultConnectionRegistry_AddConnection(t *testing.T) {
ctx := context.Background()
ws := &websocket.Conn{}
cb := func(wasabi.Connection, []byte) {}
cb := func(wasabi.Connection, wasabi.MessageType, []byte) {}

registry := NewDefaultConnectionRegistry()

Expand Down
4 changes: 2 additions & 2 deletions channel/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestConn_HandleRequests(t *testing.T) {
// Mock OnMessage callback
received := make(chan struct{})

conn.onMessageCB = func(c wasabi.Connection, data []byte) { received <- struct{}{} }
conn.onMessageCB = func(c wasabi.Connection, msgType wasabi.MessageType, data []byte) { received <- struct{}{} }

go conn.HandleRequests()

Expand Down Expand Up @@ -126,7 +126,7 @@ func TestConn_Send(t *testing.T) {
onClose := make(chan string)
conn := NewConnection(context.Background(), ws, nil, onClose)

err = conn.Send([]byte("test message"))
err = conn.Send(wasabi.MsgTypeText, []byte("test message"))
if err != nil {
t.Errorf("Unexpected error sending message: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion dispatch/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ func (f RequestHandlerFunc) Handle(conn wasabi.Connection, req wasabi.Request) e
return f(conn, req)
}

type RequestParser func(conn wasabi.Connection, data []byte) wasabi.Request
type RequestParser func(conn wasabi.Connection, msgType wasabi.MessageType, data []byte) wasabi.Request
4 changes: 2 additions & 2 deletions dispatch/pipe_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ func NewPipeDispatcher(backend wasabi.Backend) *PipeDispatcher {
}

// Dispatch dispatches request to backend
func (d *PipeDispatcher) Dispatch(conn wasabi.Connection, data []byte) {
req := NewRawRequest(conn.Context(), data)
func (d *PipeDispatcher) Dispatch(conn wasabi.Connection, msgType wasabi.MessageType, data []byte) {
req := NewRawRequest(conn.Context(), msgType, data)

err := d.useMiddleware(d.backend).Handle(conn, req)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions dispatch/pipe_dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ func TestPipeDispatcher_Dispatch(t *testing.T) {
testError := fmt.Errorf("test error")

conn.On("Context").Return(context.Background())
backend.EXPECT().Handle(conn, NewRawRequest(conn.Context(), data)).Return(testError)
backend.EXPECT().Handle(conn, NewRawRequest(conn.Context(), wasabi.MsgTypeText, data)).Return(testError)

dispatcher.Dispatch(conn, data)
dispatcher.Dispatch(conn, wasabi.MsgTypeText, data)
}

func TestPipeDispatcher_Use(t *testing.T) {
Expand Down
18 changes: 13 additions & 5 deletions dispatch/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,32 @@ import (
)

type RawRequest struct {
ctx context.Context
data []byte
ctx context.Context
data []byte
msgType wasabi.MessageType
}

func NewRawRequest(ctx context.Context, data []byte) *RawRequest {
func NewRawRequest(ctx context.Context, msgType wasabi.MessageType, data []byte) *RawRequest {
if ctx == nil {
panic("nil context")
}

return &RawRequest{ctx: ctx, data: data}
return &RawRequest{ctx: ctx, data: data, msgType: msgType}
}

func (r *RawRequest) Data() []byte {
return r.data
}

func (r *RawRequest) RoutingKey() string {
return ""
switch r.msgType {
case wasabi.MsgTypeText:
return "text"
case wasabi.MsgTypeBinary:
return "binary"
default:
panic("unknown message type " + r.msgType.String())
}
}

func (r *RawRequest) Context() context.Context {
Expand Down
12 changes: 7 additions & 5 deletions dispatch/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,30 @@ import (
"bytes"
"context"
"testing"

"github.com/ksysoev/wasabi"
)

func TestRawRequest_Data(t *testing.T) {
data := []byte("test data")
req := NewRawRequest(context.Background(), data)
req := NewRawRequest(context.Background(), wasabi.MsgTypeText, data)

if !bytes.Equal(req.Data(), data) {
t.Errorf("Expected data to be '%s', but got '%s'", data, req.Data())
}
}

func TestRawRequest_RoutingKey(t *testing.T) {
req := NewRawRequest(context.Background(), []byte{})
req := NewRawRequest(context.Background(), wasabi.MsgTypeText, []byte{})

if req.RoutingKey() != "" {
if req.RoutingKey() != "text" {
t.Errorf("Expected routing key to be empty, but got %v", req.RoutingKey())
}
}

func TestRawRequest_Context(t *testing.T) {
ctx := context.Background()
req := NewRawRequest(ctx, []byte{})
req := NewRawRequest(ctx, wasabi.MsgTypeText, []byte{})

if req.Context() != ctx {
t.Errorf("Expected context to be %v, but got %v", ctx, req.Context())
Expand All @@ -34,7 +36,7 @@ func TestRawRequest_Context(t *testing.T) {

func TestRawRequest_WithContext(t *testing.T) {
ctx := context.Background()
req := NewRawRequest(context.Background(), []byte{})
req := NewRawRequest(context.Background(), wasabi.MsgTypeText, []byte{})

newReq := req.WithContext(ctx)

Expand Down
4 changes: 2 additions & 2 deletions dispatch/router_dipatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ func (d *RouterDispatcher) AddBackend(backend wasabi.Backend, routingKeys []stri
// Dispatch handles the incoming connection and data by parsing the request,
// determining the appropriate backend, and handling the request using middleware.
// If an error occurs during handling, it is logged.
func (d *RouterDispatcher) Dispatch(conn wasabi.Connection, data []byte) {
req := d.parser(conn, data)
func (d *RouterDispatcher) Dispatch(conn wasabi.Connection, msgType wasabi.MessageType, data []byte) {
req := d.parser(conn, msgType, data)

if req == nil {
return
Expand Down
32 changes: 20 additions & 12 deletions dispatch/router_dipatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import (
func TestNewRouterDispatcher(t *testing.T) {
defaultBackend := mocks.NewMockBackend(t)

parser := func(_ wasabi.Connection, _ []byte) wasabi.Request { return mocks.NewMockRequest(t) }
parser := func(_ wasabi.Connection, _ wasabi.MessageType, _ []byte) wasabi.Request {
return mocks.NewMockRequest(t)
}

dispatcher := NewRouterDispatcher(defaultBackend, parser)

Expand All @@ -25,7 +27,9 @@ func TestNewRouterDispatcher(t *testing.T) {
}
func TestRouterDispatcher_AddBackend(t *testing.T) {
defaultBackend := mocks.NewMockBackend(t)
parser := func(_ wasabi.Connection, _ []byte) wasabi.Request { return mocks.NewMockRequest(t) }
parser := func(_ wasabi.Connection, _ wasabi.MessageType, _ []byte) wasabi.Request {
return mocks.NewMockRequest(t)
}
dispatcher := NewRouterDispatcher(defaultBackend, parser)

backend := mocks.NewMockBackend(t)
Expand Down Expand Up @@ -58,7 +62,7 @@ func TestRouterDispatcher_DispatchDefault(t *testing.T) {
defaultBackend := mocks.NewMockBackend(t)

req := mocks.NewMockRequest(t)
parser := func(_ wasabi.Connection, _ []byte) wasabi.Request { return req }
parser := func(_ wasabi.Connection, _ wasabi.MessageType, _ []byte) wasabi.Request { return req }
dispatcher := NewRouterDispatcher(defaultBackend, parser)

conn := mocks.NewMockConnection(t)
Expand All @@ -70,13 +74,13 @@ func TestRouterDispatcher_DispatchDefault(t *testing.T) {

defaultBackend.EXPECT().Handle(conn, req).Return(nil)

dispatcher.Dispatch(conn, data)
dispatcher.Dispatch(conn, wasabi.MsgTypeText, data)
}

func TestRouterDispatcher_DispatchByRoutingKey(t *testing.T) {
defaultBackend := mocks.NewMockBackend(t)
req := mocks.NewMockRequest(t)
parser := func(_ wasabi.Connection, _ []byte) wasabi.Request { return req }
parser := func(_ wasabi.Connection, _ wasabi.MessageType, _ []byte) wasabi.Request { return req }
dispatcher := NewRouterDispatcher(defaultBackend, parser)

conn := mocks.NewMockConnection(t)
Expand All @@ -90,24 +94,24 @@ func TestRouterDispatcher_DispatchByRoutingKey(t *testing.T) {
mockBackend.EXPECT().Handle(conn, req).Return(nil)
dispatcher.backendMap[routingKey] = mockBackend

dispatcher.Dispatch(conn, data)
dispatcher.Dispatch(conn, wasabi.MsgTypeText, data)
}

func TestRouterDispatcher_DispatchWrongRequest(t *testing.T) {
defaultBackend := mocks.NewMockBackend(t)
parser := func(_ wasabi.Connection, _ []byte) wasabi.Request { return nil }
parser := func(_ wasabi.Connection, _ wasabi.MessageType, _ []byte) wasabi.Request { return nil }
dispatcher := NewRouterDispatcher(defaultBackend, parser)

conn := mocks.NewMockConnection(t)
data := []byte("test data")

dispatcher.Dispatch(conn, data)
dispatcher.Dispatch(conn, wasabi.MsgTypeText, data)
}

func TestRouterDispatcher_DispatchErrorHandlingRequest(t *testing.T) {
defaultBackend := mocks.NewMockBackend(t)
req := mocks.NewMockRequest(t)
parser := func(_ wasabi.Connection, _ []byte) wasabi.Request { return req }
parser := func(_ wasabi.Connection, _ wasabi.MessageType, _ []byte) wasabi.Request { return req }
dispatcher := NewRouterDispatcher(defaultBackend, parser)

conn := mocks.NewMockConnection(t)
Expand All @@ -120,11 +124,13 @@ func TestRouterDispatcher_DispatchErrorHandlingRequest(t *testing.T) {
mockBackend.EXPECT().Handle(conn, req).Return(fmt.Errorf("test error"))
dispatcher.backendMap[routingKey] = mockBackend

dispatcher.Dispatch(conn, data)
dispatcher.Dispatch(conn, wasabi.MsgTypeText, data)
}
func TestRouterDispatcher_Use(t *testing.T) {
defaultBackend := mocks.NewMockBackend(t)
parser := func(_ wasabi.Connection, _ []byte) wasabi.Request { return mocks.NewMockRequest(t) }
parser := func(_ wasabi.Connection, _ wasabi.MessageType, _ []byte) wasabi.Request {
return mocks.NewMockRequest(t)
}
dispatcher := NewRouterDispatcher(defaultBackend, parser)

middleware := RequestMiddlewere(func(next wasabi.RequestHandler) wasabi.RequestHandler { return next })
Expand All @@ -143,7 +149,9 @@ func TestRouterDispatcher_UseMiddleware(t *testing.T) {
defaultBackend := mocks.NewMockBackend(t)
defaultBackend.EXPECT().Handle(mockConn, mockReq).Return(testError)

parser := func(_ wasabi.Connection, _ []byte) wasabi.Request { return mocks.NewMockRequest(t) }
parser := func(_ wasabi.Connection, _ wasabi.MessageType, _ []byte) wasabi.Request {
return mocks.NewMockRequest(t)
}
dispatcher := NewRouterDispatcher(defaultBackend, parser)

middleware1 := RequestMiddlewere(func(next wasabi.RequestHandler) wasabi.RequestHandler { return next })
Expand Down
2 changes: 1 addition & 1 deletion examples/http_backend/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func main() {
})

ErrHandler := request.NewErrorHandlingMiddleware(func(conn wasabi.Connection, req wasabi.Request, err error) error {
conn.Send([]byte("Failed to process request: " + err.Error()))
conn.Send(wasabi.MsgTypeText, []byte("Failed to process request: "+err.Error()))
return nil
})

Expand Down
15 changes: 12 additions & 3 deletions interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,15 @@ package wasabi
import (
"context"
"net/http"

"nhooyr.io/websocket"
)

type MessageType = websocket.MessageType

const (
MsgTypeText MessageType = websocket.MessageText
MsgTypeBinary MessageType = websocket.MessageBinary
)

type Request interface {
Expand All @@ -18,15 +27,15 @@ type Backend interface {

// Dispatcher is interface for dispatchers
type Dispatcher interface {
Dispatch(conn Connection, data []byte)
Dispatch(conn Connection, msgType MessageType, data []byte)
}

// OnMessage is type for OnMessage callback
type OnMessage func(conn Connection, data []byte)
type OnMessage func(conn Connection, msgType MessageType, data []byte)

// Connection is interface for connections
type Connection interface {
Send(msg []byte) error
Send(msgType MessageType, msg []byte) error
Context() context.Context
ID() string
HandleRequests()
Expand Down
Loading

0 comments on commit f0a4bd6

Please sign in to comment.