Skip to content

Commit

Permalink
Implements concurrency handling for queue backend
Browse files Browse the repository at this point in the history
  • Loading branch information
ksysoev committed May 20, 2024
1 parent dd0ef5a commit 5833037
Showing 1 changed file with 33 additions and 6 deletions.
39 changes: 33 additions & 6 deletions backend/queue.go
Original file line number Diff line number Diff line change
@@ -1,35 +1,50 @@
package backend

import (
"github.com/google/uuid"
"strconv"
"sync"

"github.com/ksysoev/wasabi"
"nhooyr.io/websocket"
)

type response struct {
msgType websocket.MessageType
data []byte
msgType websocket.MessageType
}

type OnRequestCallback func(conn wasabi.Connection, req wasabi.Request, id string) error

type QueueBackend struct {
requests map[string]chan response
onRequest OnRequestCallback
lock *sync.Mutex
lastReqID int
}

func NewQueueBackend(onRequest OnRequestCallback) *QueueBackend {
return &QueueBackend{
requests: make(map[string]chan response),
onRequest: onRequest,
lock: &sync.Mutex{},
lastReqID: 1,
}
}

func (b *QueueBackend) Handle(conn wasabi.Connection, r wasabi.Request) error {
id := uuid.New().String()
respChan := make(chan response)

b.lock.Lock()
b.lastReqID++
id := strconv.Itoa(b.lastReqID)
b.requests[id] = respChan
b.lock.Unlock()

defer func() {
b.lock.Lock()
delete(b.requests, id)
b.lock.Unlock()
}()

err := b.onRequest(conn, r, id)

Expand All @@ -41,11 +56,23 @@ func (b *QueueBackend) Handle(conn wasabi.Connection, r wasabi.Request) error {
case resp := <-respChan:
return conn.Send(resp.msgType, resp.data)
case <-r.Context().Done():
return nil
return r.Context().Err()
}
}

func (b *QueueBackend) OnResponse(id string, msgType websocket.MessageType, data []byte) {
b.requests[id] <- response{msgType, data}
close(b.requests[id])
b.lock.Lock()
respChan, ok := b.requests[id]
b.lock.Unlock()

if !ok {
return
}

defer close(respChan)

select {
case respChan <- response{data, msgType}:
default:
}
}

0 comments on commit 5833037

Please sign in to comment.