Skip to content

Commit

Permalink
Implements connection pooling and background response handler
Browse files Browse the repository at this point in the history
  • Loading branch information
ksysoev committed May 3, 2024
1 parent cdcfff0 commit 737ef65
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 18 deletions.
69 changes: 54 additions & 15 deletions backend/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,42 +2,81 @@ package backend

import (
"bytes"
"sync"

"github.com/ksysoev/wasabi"
"nhooyr.io/websocket"
)

type WSBackend struct {
URL string
Origin string
URL string
connections map[string]*websocket.Conn
lock *sync.RWMutex
}

func NewWSBackend(url string) *WSBackend {
return &WSBackend{
URL: url,
connections: make(map[string]*websocket.Conn),
lock: &sync.RWMutex{},
}
}

func (b *WSBackend) Handle(conn wasabi.Connection, r wasabi.Request) error {
c, _, err := websocket.Dial(conn.Context(), b.URL, nil)
c, err := b.getConnection(conn)

if err != nil {
return err
}

defer c.CloseNow()
return c.Write(r.Context(), websocket.MessageText, r.Data())
}

err = c.Write(r.Context(), websocket.MessageText, r.Data())
func (b *WSBackend) getConnection(conn wasabi.Connection) (*websocket.Conn, error) {
b.lock.RLock()
c, ok := b.connections[conn.ID()]
b.lock.RUnlock()

if err != nil {
return err
if ok {
return c, nil
}

msgType, reader, err := c.Reader(r.Context())
c, _, err := websocket.Dial(conn.Context(), b.URL, nil)
if err != nil {
return err
return nil, err
}

buffer := bytes.NewBuffer(make([]byte, 0))
_, err = buffer.ReadFrom(reader)
b.lock.Lock()
b.connections[conn.ID()] = c

if err != nil {
return err
}
go func() {
defer func() {
b.lock.Lock()
delete(b.connections, conn.ID())
conn.Close(websocket.StatusNormalClosure, "")
b.lock.Unlock()
}()
buffer := bytes.NewBuffer(make([]byte, 0))
for {
buffer.Reset()
msgType, reader, err := c.Reader(conn.Context())
if err != nil {
return
}

_, err = buffer.ReadFrom(reader)

if err != nil {
return
}

err = conn.Send(msgType, buffer.Bytes())
if err != nil {
return
}
}
}()
b.lock.Unlock()

return conn.Send(msgType, buffer.Bytes())
return c, nil
}
4 changes: 1 addition & 3 deletions examples/passthrough/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ func main() {

slog.LogAttrs(context.Background(), slog.LevelDebug, "")

backend := &backend.WSBackend{
URL: "wss://ws.derivws.com/websockets/v3?app_id=1089",
}
backend := backend.NewWSBackend("wss://ws.derivws.com/websockets/v3?app_id=1089")

dispatcher := dispatch.NewRouterDispatcher(backend, func(conn wasabi.Connection, msgType wasabi.MessageType, data []byte) wasabi.Request {
return dispatch.NewRawRequest(conn.Context(), msgType, data)
Expand Down

0 comments on commit 737ef65

Please sign in to comment.