go-waku/waku/v2/protocol/store/message_queue.go

113 lines
2.0 KiB
Go

package store
import (
"sync"
"time"
"github.com/status-im/go-waku/waku/v2/utils"
)
// MessageQueue holds IndexedWakuMessage values in a slice, together with a
// set of digests used by Push to skip messages it has already accepted.
// The embedded RWMutex guards the seen and messages fields.
type MessageQueue struct {
sync.RWMutex
// seen is keyed by the message digest copied into a fixed 32-byte array;
// Push consults it to reject duplicates.
seen map[[32]byte]struct{}
// messages are the queued entries, in the order Push appended them.
messages []IndexedWakuMessage
// maxMessages caps the queue length: Push drops entries from the front
// once it is exceeded. Zero means no cap.
maxMessages int
// maxDuration is the retention window used by cleanOlderRecords. When it
// is zero, NewMessageQueue does not start the periodic cleaner.
maxDuration time.Duration
// quit is closed by Stop to end the checkForOlderRecords loop.
quit chan struct{}
}
// Push appends msg to the queue unless a message with the same digest has
// already been accepted, in which case the call is a no-op.
//
// When maxMessages is positive and the queue grows past it, entries are
// dropped from the front until the cap is met again. The digests of the
// dropped entries are removed from the seen set as well, so the set stays
// bounded by the queue size and an evicted message is no longer treated as a
// duplicate. A maxMessages of zero (or less) means the queue is unbounded.
func (self *MessageQueue) Push(msg IndexedWakuMessage) {
	self.Lock()
	defer self.Unlock()

	// Digests are stored as fixed-size array keys; copy truncates or
	// zero-pads the digest slice to 32 bytes.
	var k [32]byte
	copy(k[:], msg.index.Digest)
	if _, ok := self.seen[k]; ok {
		return
	}
	self.seen[k] = struct{}{}
	self.messages = append(self.messages, msg)

	if self.maxMessages <= 0 || len(self.messages) <= self.maxMessages {
		return
	}

	numToPop := len(self.messages) - self.maxMessages
	for i := 0; i < numToPop; i++ {
		// Forget the digest of each evicted entry; otherwise seen would
		// grow forever and keep rejecting messages that are no longer held.
		var evicted [32]byte
		copy(evicted[:], self.messages[i].index.Digest)
		delete(self.seen, evicted)
	}
	self.messages = self.messages[numToPop:]
}
// Messages returns a channel that delivers the queued messages in stored
// order and is closed after the last one has been sent.
//
// A producer goroutine copies the current contents while holding the read
// lock and releases the lock before sending anything. The channel therefore
// carries a snapshot: a slow consumer, or one that calls Push or Length while
// it is still draining the channel, cannot block writers or deadlock the
// queue. The channel is unbuffered, so the caller should drain it; otherwise
// the producer goroutine stays blocked on its next send.
func (self *MessageQueue) Messages() <-chan IndexedWakuMessage {
	c := make(chan IndexedWakuMessage)

	go func() {
		defer close(c)

		// Copy under the lock so later mutations of the queue cannot be
		// observed through the snapshot.
		self.RLock()
		snapshot := make([]IndexedWakuMessage, len(self.messages))
		copy(snapshot, self.messages)
		self.RUnlock()

		for _, value := range snapshot {
			c <- value
		}
	}()

	return c
}
// cleanOlderRecords removes, from the front of the queue, every message whose
// index.ReceiverTime is below the cutoff computed from now minus maxDuration,
// and forgets the digests of the removed messages.
//
// The scan stops at the first message that is not expired, so it relies on
// messages being ordered by receiver time (presumably arrival order — verify
// against callers of Push). If every message is expired the queue is emptied.
// A non-positive maxDuration means no retention window is configured and
// nothing is removed.
func (self *MessageQueue) cleanOlderRecords() {
	self.Lock()
	defer self.Unlock()

	if self.maxDuration <= 0 {
		return
	}

	t := utils.GetUnixEpochFrom(time.Now().Add(-self.maxDuration))

	// Count expired entries from the front. Starting from "none kept" rather
	// than index 0 ensures a queue holding only stale messages is cleared.
	expired := 0
	for ; expired < len(self.messages); expired++ {
		entry := self.messages[expired].index
		if entry.ReceiverTime >= t {
			break
		}
		// Drop the digest too, so seen does not outgrow the queue.
		var k [32]byte
		copy(k[:], entry.Digest)
		delete(self.seen, k)
	}
	self.messages = self.messages[expired:]
}
// checkForOlderRecords calls cleanOlderRecords each time a ticker with period
// d fires, and returns once the quit channel becomes readable (Stop closes
// it). The call blocks for its whole lifetime, so it is meant to be run in
// its own goroutine. The ticker is stopped on return.
func (self *MessageQueue) checkForOlderRecords(d time.Duration) {
	tick := time.NewTicker(d)
	defer tick.Stop()

	for running := true; running; {
		select {
		case <-tick.C:
			self.cleanOlderRecords()
		case <-self.quit:
			running = false
		}
	}
}
// Length reports how many messages the queue currently holds. The count is
// read while holding the read lock.
func (self *MessageQueue) Length() int {
	self.RLock()
	count := len(self.messages)
	self.RUnlock()
	return count
}
// NewMessageQueue returns an empty queue configured with the given limits.
//
// maxMessages is the cap Push enforces on the number of stored messages
// (zero disables it). maxDuration is the retention window; when it is
// non-zero a background goroutine is started that runs checkForOlderRecords
// with a 10-second period until Stop is called.
func NewMessageQueue(maxMessages int, maxDuration time.Duration) *MessageQueue {
	q := new(MessageQueue)
	q.seen = make(map[[32]byte]struct{})
	q.quit = make(chan struct{})
	q.maxMessages = maxMessages
	q.maxDuration = maxDuration

	if maxDuration == 0 {
		return q
	}

	// Open question carried over from the original code: is a 10s cleaning
	// period appropriate?
	go q.checkForOlderRecords(10 * time.Second)
	return q
}
// Stop closes the quit channel, which makes a running checkForOlderRecords
// loop return. Closing an already-closed channel panics, so Stop must be
// called at most once per queue.
func (self *MessageQueue) Stop() {
	done := self.quit
	close(done)
}