feat: clean up older records in message queue

Fixes #133
This commit is contained in:
Richard Ramos 2021-11-06 16:57:19 -04:00
parent a1cb371d5a
commit fcfe3568ab
3 changed files with 129 additions and 42 deletions

View File

@ -1,11 +1,39 @@
package store package store
import (
"sync"
"time"
"github.com/status-im/go-waku/waku/v2/utils"
)
type MessageQueue struct { type MessageQueue struct {
sync.RWMutex
seen map[[32]byte]struct{}
messages []IndexedWakuMessage messages []IndexedWakuMessage
maxMessages int maxMessages int
maxDuration time.Duration
quit chan struct{}
}
type MessageQueueItem struct {
Index int
Value IndexedWakuMessage
} }
func (self *MessageQueue) Push(msg IndexedWakuMessage) { func (self *MessageQueue) Push(msg IndexedWakuMessage) {
self.Lock()
defer self.Unlock()
var k [32]byte
copy(k[:], msg.index.Digest)
if _, ok := self.seen[k]; ok {
return
}
self.seen[k] = struct{}{}
self.messages = append(self.messages, msg) self.messages = append(self.messages, msg)
if self.maxMessages != 0 && len(self.messages) > self.maxMessages { if self.maxMessages != 0 && len(self.messages) > self.maxMessages {
@ -14,12 +42,73 @@ func (self *MessageQueue) Push(msg IndexedWakuMessage) {
} }
} }
func (self *MessageQueue) Messages() []IndexedWakuMessage { func (self *MessageQueue) Messages() <-chan MessageQueueItem {
return self.messages c := make(chan MessageQueueItem)
f := func() {
self.RLock()
defer self.RUnlock()
for index, value := range self.messages {
c <- MessageQueueItem{index, value}
}
close(c)
}
go f()
return c
} }
func NewMessageQueue(maxMessages int) *MessageQueue { func (self *MessageQueue) cleanOlderRecords() {
return &MessageQueue{ self.Lock()
maxMessages: maxMessages, defer self.Unlock()
// TODO: check if retention days was set
t := utils.GetUnixEpochFrom(time.Now().Add(-self.maxDuration))
var idx int
for i := 0; i < len(self.messages); i++ {
if self.messages[i].index.ReceiverTime >= t {
idx = i
break
}
}
self.messages = self.messages[idx:]
}
func (self *MessageQueue) checkForOlderRecords(d time.Duration) {
ticker := time.NewTicker(d)
select {
case <-self.quit:
return
case <-ticker.C:
self.cleanOlderRecords()
} }
} }
func (self *MessageQueue) Length() int {
self.RLock()
defer self.RUnlock()
return len(self.messages)
}
func NewMessageQueue(maxMessages int, maxDuration time.Duration) *MessageQueue {
result := &MessageQueue{
maxMessages: maxMessages,
maxDuration: maxDuration,
seen: make(map[[32]byte]struct{}),
quit: make(chan struct{}),
}
if maxDuration != 0 {
go result.checkForOlderRecords(10 * time.Second) // is 10s okay?
}
return result
}
func (self *MessageQueue) Stop() {
close(self.quit)
}

View File

@ -2,9 +2,11 @@ package store
import ( import (
"testing" "testing"
"time"
"github.com/status-im/go-waku/tests" "github.com/status-im/go-waku/tests"
"github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/status-im/go-waku/waku/v2/utils"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -15,22 +17,39 @@ func TestMessageQueue(t *testing.T) {
msg4 := tests.CreateWakuMessage("3", 3) msg4 := tests.CreateWakuMessage("3", 3)
msg5 := tests.CreateWakuMessage("3", 3) msg5 := tests.CreateWakuMessage("3", 3)
msgQ := NewMessageQueue(3) msgQ := NewMessageQueue(3, 1*time.Minute)
msgQ.Push(IndexedWakuMessage{msg: msg1, index: &pb.Index{}, pubsubTopic: "test"}) msgQ.Push(IndexedWakuMessage{msg: msg1, index: &pb.Index{Digest: []byte{1}, ReceiverTime: utils.GetUnixEpochFrom(time.Now().Add(-20 * time.Second))}, pubsubTopic: "test"})
msgQ.Push(IndexedWakuMessage{msg: msg2, index: &pb.Index{}, pubsubTopic: "test"}) msgQ.Push(IndexedWakuMessage{msg: msg2, index: &pb.Index{Digest: []byte{2}, ReceiverTime: utils.GetUnixEpochFrom(time.Now().Add(-15 * time.Second))}, pubsubTopic: "test"})
msgQ.Push(IndexedWakuMessage{msg: msg3, index: &pb.Index{}, pubsubTopic: "test"}) msgQ.Push(IndexedWakuMessage{msg: msg3, index: &pb.Index{Digest: []byte{3}, ReceiverTime: utils.GetUnixEpochFrom(time.Now().Add(-10 * time.Second))}, pubsubTopic: "test"})
require.Len(t, msgQ.messages, 3) require.Equal(t, msgQ.Length(), 3)
msgQ.Push(IndexedWakuMessage{msg: msg4, index: &pb.Index{}, pubsubTopic: "test"}) msgQ.Push(IndexedWakuMessage{msg: msg4, index: &pb.Index{Digest: []byte{4}, ReceiverTime: utils.GetUnixEpochFrom(time.Now().Add(-3 * time.Second))}, pubsubTopic: "test"})
require.Len(t, msgQ.messages, 3) require.Len(t, msgQ.messages, 3)
require.Equal(t, msg2.Payload, msgQ.messages[0].msg.Payload) require.Equal(t, msg2.Payload, msgQ.messages[0].msg.Payload)
require.Equal(t, msg4.Payload, msgQ.messages[2].msg.Payload) require.Equal(t, msg4.Payload, msgQ.messages[2].msg.Payload)
msgQ.Push(IndexedWakuMessage{msg: msg5, index: &pb.Index{}, pubsubTopic: "test"}) indexedMsg5 := IndexedWakuMessage{msg: msg5, index: &pb.Index{Digest: []byte{5}, ReceiverTime: utils.GetUnixEpochFrom(time.Now().Add(0 * time.Second))}, pubsubTopic: "test"}
msgQ.Push(indexedMsg5)
require.Len(t, msgQ.messages, 3) require.Len(t, msgQ.messages, 3)
require.Equal(t, msg3.Payload, msgQ.messages[0].msg.Payload) require.Equal(t, msg3.Payload, msgQ.messages[0].msg.Payload)
require.Equal(t, msg5.Payload, msgQ.messages[2].msg.Payload) require.Equal(t, msg5.Payload, msgQ.messages[2].msg.Payload)
// Test duplication
msgQ.Push(indexedMsg5)
require.Len(t, msgQ.messages, 3)
require.Equal(t, msg3.Payload, msgQ.messages[0].msg.Payload)
require.Equal(t, msg4.Payload, msgQ.messages[1].msg.Payload)
require.Equal(t, msg5.Payload, msgQ.messages[2].msg.Payload)
// Test retention
msgQ.maxDuration = 5 * time.Second
msgQ.cleanOlderRecords()
require.Len(t, msgQ.messages, 2)
require.Equal(t, msg4.Payload, msgQ.messages[0].msg.Payload)
require.Equal(t, msg5.Payload, msgQ.messages[1].msg.Payload)
} }

View File

@ -8,7 +8,6 @@ import (
"fmt" "fmt"
"math" "math"
"sort" "sort"
"sync"
"time" "time"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
@ -228,28 +227,19 @@ type IndexedWakuMessage struct {
type WakuStore struct { type WakuStore struct {
ctx context.Context ctx context.Context
MsgC chan *protocol.Envelope MsgC chan *protocol.Envelope
seen map[[32]byte]struct{}
messageQueue *MessageQueue
maxNumberOfMessages int
maxRetentionDays time.Duration
started bool started bool
messagesMutex sync.Mutex messageQueue *MessageQueue
msgProvider MessageProvider
msgProvider MessageProvider h host.Host
h host.Host
} }
// NewWakuStore creates a WakuStore using an specific MessageProvider for storing the messages // NewWakuStore creates a WakuStore using an specific MessageProvider for storing the messages
func NewWakuStore(p MessageProvider, maxNumberOfMessages int, maxRetentionDays time.Duration) *WakuStore { func NewWakuStore(p MessageProvider, maxNumberOfMessages int, maxRetentionDuration time.Duration) *WakuStore {
wakuStore := new(WakuStore) wakuStore := new(WakuStore)
wakuStore.msgProvider = p wakuStore.msgProvider = p
wakuStore.seen = make(map[[32]byte]struct{}) wakuStore.messageQueue = NewMessageQueue(maxNumberOfMessages, maxRetentionDuration)
wakuStore.maxNumberOfMessages = maxNumberOfMessages
wakuStore.maxRetentionDays = maxRetentionDays
wakuStore.messageQueue = NewMessageQueue(maxNumberOfMessages)
return wakuStore return wakuStore
} }
@ -303,19 +293,11 @@ func (store *WakuStore) fetchDBRecords(ctx context.Context) {
store.storeMessageWithIndex(storedMessage.PubsubTopic, idx, storedMessage.Message) store.storeMessageWithIndex(storedMessage.PubsubTopic, idx, storedMessage.Message)
metrics.RecordMessage(ctx, "stored", len(store.messageQueue.messages)) metrics.RecordMessage(ctx, "stored", store.messageQueue.Length())
} }
} }
func (store *WakuStore) storeMessageWithIndex(pubsubTopic string, idx *pb.Index, msg *pb.WakuMessage) { func (store *WakuStore) storeMessageWithIndex(pubsubTopic string, idx *pb.Index, msg *pb.WakuMessage) {
var k [32]byte
copy(k[:], idx.Digest)
if _, ok := store.seen[k]; ok {
return
}
store.seen[k] = struct{}{}
store.messageQueue.Push(IndexedWakuMessage{msg: msg, index: idx, pubsubTopic: pubsubTopic}) store.messageQueue.Push(IndexedWakuMessage{msg: msg, index: idx, pubsubTopic: pubsubTopic})
} }
@ -326,13 +308,10 @@ func (store *WakuStore) storeMessage(env *protocol.Envelope) {
return return
} }
store.messagesMutex.Lock()
defer store.messagesMutex.Unlock()
store.storeMessageWithIndex(env.PubsubTopic(), index, env.Message()) store.storeMessageWithIndex(env.PubsubTopic(), index, env.Message())
if store.msgProvider == nil { if store.msgProvider == nil {
metrics.RecordMessage(store.ctx, "stored", len(store.messageQueue.messages)) metrics.RecordMessage(store.ctx, "stored", store.messageQueue.Length())
return return
} }
@ -344,7 +323,7 @@ func (store *WakuStore) storeMessage(env *protocol.Envelope) {
return return
} }
metrics.RecordMessage(store.ctx, "stored", len(store.messageQueue.messages)) metrics.RecordMessage(store.ctx, "stored", store.messageQueue.Length())
} }
func (store *WakuStore) storeIncomingMessages(ctx context.Context) { func (store *WakuStore) storeIncomingMessages(ctx context.Context) {
@ -531,7 +510,7 @@ func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selec
return nil, err return nil, err
} }
metrics.RecordMessage(ctx, "retrieved", len(store.messageQueue.messages)) metrics.RecordMessage(ctx, "retrieved", store.messageQueue.Length())
return historyResponseRPC.Response, nil return historyResponseRPC.Response, nil
} }