diff --git a/waku/v2/api/publish/message_queue.go b/waku/v2/api/publish/message_queue.go index ad153ff9c..03b7a16a6 100644 --- a/waku/v2/api/publish/message_queue.go +++ b/waku/v2/api/publish/message_queue.go @@ -3,6 +3,7 @@ package publish import ( "container/heap" "context" + "sync" "github.com/waku-org/go-waku/waku/v2/protocol" ) @@ -59,6 +60,44 @@ func (pq *envelopePriorityQueue) Pop() any { return item } +type safeEnvelopePriorityQueue struct { + pq envelopePriorityQueue + lock sync.Mutex +} + +func (spq *safeEnvelopePriorityQueue) Push(task *envelopePriority) { + spq.lock.Lock() + defer spq.lock.Unlock() + heap.Push(&spq.pq, task) +} + +func (spq *safeEnvelopePriorityQueue) Pop() *envelopePriority { + spq.lock.Lock() + defer spq.lock.Unlock() + + if len(spq.pq) == 0 { + return nil + } + task := heap.Pop(&spq.pq).(*envelopePriority) + return task +} + +// Len returns the length of the priority queue in a thread-safe manner +func (spq *safeEnvelopePriorityQueue) Len() int { + spq.lock.Lock() + defer spq.lock.Unlock() + + return spq.pq.Len() +} + +func newSafePriorityQueue() *safeEnvelopePriorityQueue { + result := &safeEnvelopePriorityQueue{ + pq: make(envelopePriorityQueue, 0), + } + heap.Init(&result.pq) + return result +} + // MessageQueue is a structure used to handle the ordering of the messages to publish type MessageQueue struct { usePriorityQueue bool @@ -66,7 +105,7 @@ type MessageQueue struct { toSendChan chan *protocol.Envelope throttledPrioritySendQueue chan *envelopePriority envelopeAvailableOnPriorityQueueSignal chan struct{} - envelopePriorityQueue envelopePriorityQueue + envelopePriorityQueue *safeEnvelopePriorityQueue } // NewMessageQueue returns a new instance of MessageQueue. The MessageQueue can internally use a @@ -77,10 +116,9 @@ func NewMessageQueue(bufferSize int, usePriorityQueue bool) *MessageQueue { } if m.usePriorityQueue { - m.envelopePriorityQueue = make(envelopePriorityQueue, 0) + m.envelopePriorityQueue = newSafePriorityQueue() m.throttledPrioritySendQueue = make(chan *envelopePriority, bufferSize) m.envelopeAvailableOnPriorityQueueSignal = make(chan struct{}, bufferSize) - heap.Init(&m.envelopePriorityQueue) } else { m.toSendChan = make(chan *protocol.Envelope, bufferSize) } @@ -98,8 +136,7 @@ func (m *MessageQueue) Start(ctx context.Context) { continue } - heap.Push(&m.envelopePriorityQueue, envelopePriority) - + m.envelopePriorityQueue.Push(envelopePriority) m.envelopeAvailableOnPriorityQueueSignal <- struct{}{} case <-ctx.Done(): @@ -150,7 +187,10 @@ func (m *MessageQueue) Pop(ctx context.Context) <-chan *protocol.Envelope { select { case _, ok := <-m.envelopeAvailableOnPriorityQueueSignal: if ok { - ch <- heap.Pop(&m.envelopePriorityQueue).(*envelopePriority).envelope + e := m.envelopePriorityQueue.Pop() + if e != nil { + ch <- e.envelope + } } case envelope, ok := <-m.toSendChan: