fix: make the envelope priority queue safe for concurrent access (#1215)

This commit is contained in:
richΛrd 2024-09-04 10:30:57 -04:00 committed by GitHub
parent 3066ff10b1
commit f9e7895202
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -3,6 +3,7 @@ package publish
import (
"container/heap"
"context"
"sync"
"github.com/waku-org/go-waku/waku/v2/protocol"
)
@ -59,6 +60,44 @@ func (pq *envelopePriorityQueue) Pop() any {
return item
}
type safeEnvelopePriorityQueue struct {
pq envelopePriorityQueue
lock sync.Mutex
}
func (spq *safeEnvelopePriorityQueue) Push(task *envelopePriority) {
spq.lock.Lock()
defer spq.lock.Unlock()
heap.Push(&spq.pq, task)
}
func (spq *safeEnvelopePriorityQueue) Pop() *envelopePriority {
spq.lock.Lock()
defer spq.lock.Unlock()
if len(spq.pq) == 0 {
return nil
}
task := heap.Pop(&spq.pq).(*envelopePriority)
return task
}
// Len returns the length of the priority queue in a thread-safe manner
func (spq *safeEnvelopePriorityQueue) Len() int {
spq.lock.Lock()
defer spq.lock.Unlock()
return spq.pq.Len()
}
func newSafePriorityQueue() *safeEnvelopePriorityQueue {
result := &safeEnvelopePriorityQueue{
pq: make(envelopePriorityQueue, 0),
}
heap.Init(&result.pq)
return result
}
// MessageQueue is a structure used to handle the ordering of the messages to publish
type MessageQueue struct {
usePriorityQueue bool
@ -66,7 +105,7 @@ type MessageQueue struct {
toSendChan chan *protocol.Envelope
throttledPrioritySendQueue chan *envelopePriority
envelopeAvailableOnPriorityQueueSignal chan struct{}
envelopePriorityQueue envelopePriorityQueue
envelopePriorityQueue *safeEnvelopePriorityQueue
}
// NewMessageQueue returns a new instance of MessageQueue. The MessageQueue can internally use a
@ -77,10 +116,9 @@ func NewMessageQueue(bufferSize int, usePriorityQueue bool) *MessageQueue {
}
if m.usePriorityQueue {
m.envelopePriorityQueue = make(envelopePriorityQueue, 0)
m.envelopePriorityQueue = newSafePriorityQueue()
m.throttledPrioritySendQueue = make(chan *envelopePriority, bufferSize)
m.envelopeAvailableOnPriorityQueueSignal = make(chan struct{}, bufferSize)
heap.Init(&m.envelopePriorityQueue)
} else {
m.toSendChan = make(chan *protocol.Envelope, bufferSize)
}
@ -98,8 +136,7 @@ func (m *MessageQueue) Start(ctx context.Context) {
continue
}
heap.Push(&m.envelopePriorityQueue, envelopePriority)
m.envelopePriorityQueue.Push(envelopePriority)
m.envelopeAvailableOnPriorityQueueSignal <- struct{}{}
case <-ctx.Done():
@ -150,7 +187,10 @@ func (m *MessageQueue) Pop(ctx context.Context) <-chan *protocol.Envelope {
select {
case _, ok := <-m.envelopeAvailableOnPriorityQueueSignal:
if ok {
ch <- heap.Pop(&m.envelopePriorityQueue).(*envelopePriority).envelope
e := m.envelopePriorityQueue.Pop()
if e != nil {
ch <- e.envelope
}
}
case envelope, ok := <-m.toSendChan: