diff --git a/backend/queue.go b/backend/queue.go index 242af5c..f4edd55 100644 --- a/backend/queue.go +++ b/backend/queue.go @@ -1,14 +1,16 @@ package backend import ( - "github.com/google/uuid" + "strconv" + "sync" + "github.com/ksysoev/wasabi" "nhooyr.io/websocket" ) type response struct { - msgType websocket.MessageType data []byte + msgType websocket.MessageType } type OnRequestCallback func(conn wasabi.Connection, req wasabi.Request, id string) error @@ -16,20 +18,33 @@ type OnRequestCallback func(conn wasabi.Connection, req wasabi.Request, id strin type QueueBackend struct { requests map[string]chan response onRequest OnRequestCallback + lock *sync.Mutex + lastReqID int } func NewQueueBackend(onRequest OnRequestCallback) *QueueBackend { return &QueueBackend{ requests: make(map[string]chan response), onRequest: onRequest, + lock: &sync.Mutex{}, + lastReqID: 1, } } func (b *QueueBackend) Handle(conn wasabi.Connection, r wasabi.Request) error { - id := uuid.New().String() respChan := make(chan response) + b.lock.Lock() + b.lastReqID++ + id := strconv.Itoa(b.lastReqID) b.requests[id] = respChan + b.lock.Unlock() + + defer func() { + b.lock.Lock() + delete(b.requests, id) + b.lock.Unlock() + }() err := b.onRequest(conn, r, id) @@ -41,11 +56,23 @@ func (b *QueueBackend) Handle(conn wasabi.Connection, r wasabi.Request) error { case resp := <-respChan: return conn.Send(resp.msgType, resp.data) case <-r.Context().Done(): - return nil + return r.Context().Err() } } func (b *QueueBackend) OnResponse(id string, msgType websocket.MessageType, data []byte) { - b.requests[id] <- response{msgType, data} - close(b.requests[id]) + b.lock.Lock() + respChan, ok := b.requests[id] + b.lock.Unlock() + + if !ok { + return + } + + defer close(respChan) + + select { + case respChan <- response{data, msgType}: + default: + } }