diff --git a/waku/v2/protocol/store/message_queue.go b/waku/v2/protocol/store/message_queue.go index 2b65cb69..38f40441 100644 --- a/waku/v2/protocol/store/message_queue.go +++ b/waku/v2/protocol/store/message_queue.go @@ -1,11 +1,39 @@ package store +import ( + "sync" + "time" + + "github.com/status-im/go-waku/waku/v2/utils" +) + type MessageQueue struct { + sync.RWMutex + + seen map[[32]byte]struct{} messages []IndexedWakuMessage maxMessages int + maxDuration time.Duration + + quit chan struct{} +} + +type MessageQueueItem struct { + Index int + Value IndexedWakuMessage } func (self *MessageQueue) Push(msg IndexedWakuMessage) { + self.Lock() + defer self.Unlock() + + var k [32]byte + copy(k[:], msg.index.Digest) + if _, ok := self.seen[k]; ok { + return + } + + self.seen[k] = struct{}{} self.messages = append(self.messages, msg) if self.maxMessages != 0 && len(self.messages) > self.maxMessages { @@ -14,12 +42,73 @@ func (self *MessageQueue) Push(msg IndexedWakuMessage) { } } -func (self *MessageQueue) Messages() []IndexedWakuMessage { - return self.messages +func (self *MessageQueue) Messages() <-chan MessageQueueItem { + c := make(chan MessageQueueItem) + + f := func() { + self.RLock() + defer self.RUnlock() + for index, value := range self.messages { + c <- MessageQueueItem{index, value} + } + close(c) + } + go f() + + return c } -func NewMessageQueue(maxMessages int) *MessageQueue { - return &MessageQueue{ - maxMessages: maxMessages, +func (self *MessageQueue) cleanOlderRecords() { + self.Lock() + defer self.Unlock() + + // TODO: check if retention days was set + + t := utils.GetUnixEpochFrom(time.Now().Add(-self.maxDuration)) + + var idx int + for i := 0; i < len(self.messages); i++ { + if self.messages[i].index.ReceiverTime >= t { + idx = i + break + } + } + + self.messages = self.messages[idx:] +} + +func (self *MessageQueue) checkForOlderRecords(d time.Duration) { + ticker := time.NewTicker(d) + + select { + case <-self.quit: + return + case <-ticker.C: + self.cleanOlderRecords() } } + +func (self *MessageQueue) Length() int { + self.RLock() + defer self.RUnlock() + return len(self.messages) +} + +func NewMessageQueue(maxMessages int, maxDuration time.Duration) *MessageQueue { + result := &MessageQueue{ + maxMessages: maxMessages, + maxDuration: maxDuration, + seen: make(map[[32]byte]struct{}), + quit: make(chan struct{}), + } + + if maxDuration != 0 { + go result.checkForOlderRecords(10 * time.Second) // is 10s okay? + } + + return result +} + +func (self *MessageQueue) Stop() { + close(self.quit) +} diff --git a/waku/v2/protocol/store/message_queue_test.go b/waku/v2/protocol/store/message_queue_test.go index 2e608410..2c3bc735 100644 --- a/waku/v2/protocol/store/message_queue_test.go +++ b/waku/v2/protocol/store/message_queue_test.go @@ -2,9 +2,11 @@ package store import ( "testing" + "time" "github.com/status-im/go-waku/tests" "github.com/status-im/go-waku/waku/v2/protocol/pb" + "github.com/status-im/go-waku/waku/v2/utils" "github.com/stretchr/testify/require" ) @@ -15,22 +17,39 @@ func TestMessageQueue(t *testing.T) { msg4 := tests.CreateWakuMessage("3", 3) msg5 := tests.CreateWakuMessage("3", 3) - msgQ := NewMessageQueue(3) - msgQ.Push(IndexedWakuMessage{msg: msg1, index: &pb.Index{}, pubsubTopic: "test"}) - msgQ.Push(IndexedWakuMessage{msg: msg2, index: &pb.Index{}, pubsubTopic: "test"}) - msgQ.Push(IndexedWakuMessage{msg: msg3, index: &pb.Index{}, pubsubTopic: "test"}) + msgQ := NewMessageQueue(3, 1*time.Minute) + msgQ.Push(IndexedWakuMessage{msg: msg1, index: &pb.Index{Digest: []byte{1}, ReceiverTime: utils.GetUnixEpochFrom(time.Now().Add(-20 * time.Second))}, pubsubTopic: "test"}) + msgQ.Push(IndexedWakuMessage{msg: msg2, index: &pb.Index{Digest: []byte{2}, ReceiverTime: utils.GetUnixEpochFrom(time.Now().Add(-15 * time.Second))}, pubsubTopic: "test"}) + msgQ.Push(IndexedWakuMessage{msg: msg3, index: &pb.Index{Digest: []byte{3}, ReceiverTime: utils.GetUnixEpochFrom(time.Now().Add(-10 * time.Second))}, pubsubTopic: "test"}) - require.Len(t, msgQ.messages, 3) + require.Equal(t, msgQ.Length(), 3) - msgQ.Push(IndexedWakuMessage{msg: msg4, index: &pb.Index{}, pubsubTopic: "test"}) + msgQ.Push(IndexedWakuMessage{msg: msg4, index: &pb.Index{Digest: []byte{4}, ReceiverTime: utils.GetUnixEpochFrom(time.Now().Add(-3 * time.Second))}, pubsubTopic: "test"}) require.Len(t, msgQ.messages, 3) require.Equal(t, msg2.Payload, msgQ.messages[0].msg.Payload) require.Equal(t, msg4.Payload, msgQ.messages[2].msg.Payload) - msgQ.Push(IndexedWakuMessage{msg: msg5, index: &pb.Index{}, pubsubTopic: "test"}) + indexedMsg5 := IndexedWakuMessage{msg: msg5, index: &pb.Index{Digest: []byte{5}, ReceiverTime: utils.GetUnixEpochFrom(time.Now().Add(0 * time.Second))}, pubsubTopic: "test"} + + msgQ.Push(indexedMsg5) require.Len(t, msgQ.messages, 3) require.Equal(t, msg3.Payload, msgQ.messages[0].msg.Payload) require.Equal(t, msg5.Payload, msgQ.messages[2].msg.Payload) + + // Test duplication + msgQ.Push(indexedMsg5) + + require.Len(t, msgQ.messages, 3) + require.Equal(t, msg3.Payload, msgQ.messages[0].msg.Payload) + require.Equal(t, msg4.Payload, msgQ.messages[1].msg.Payload) + require.Equal(t, msg5.Payload, msgQ.messages[2].msg.Payload) + + // Test retention + msgQ.maxDuration = 5 * time.Second + msgQ.cleanOlderRecords() + require.Len(t, msgQ.messages, 2) + require.Equal(t, msg4.Payload, msgQ.messages[0].msg.Payload) + require.Equal(t, msg5.Payload, msgQ.messages[1].msg.Payload) } diff --git a/waku/v2/protocol/store/waku_store.go b/waku/v2/protocol/store/waku_store.go index cfa055c3..191b0d6f 100644 --- a/waku/v2/protocol/store/waku_store.go +++ b/waku/v2/protocol/store/waku_store.go @@ -8,7 +8,6 @@ import ( "fmt" "math" "sort" - "sync" "time" logging "github.com/ipfs/go-log" @@ -228,28 +227,19 @@ type IndexedWakuMessage struct { type WakuStore struct { ctx context.Context MsgC chan *protocol.Envelope - seen map[[32]byte]struct{} - - messageQueue *MessageQueue - maxNumberOfMessages int - maxRetentionDays time.Duration started bool - messagesMutex sync.Mutex - - msgProvider MessageProvider - h host.Host + messageQueue *MessageQueue + msgProvider MessageProvider + h host.Host } // NewWakuStore creates a WakuStore using an specific MessageProvider for storing the messages -func NewWakuStore(p MessageProvider, maxNumberOfMessages int, maxRetentionDays time.Duration) *WakuStore { +func NewWakuStore(p MessageProvider, maxNumberOfMessages int, maxRetentionDuration time.Duration) *WakuStore { wakuStore := new(WakuStore) wakuStore.msgProvider = p - wakuStore.seen = make(map[[32]byte]struct{}) - wakuStore.maxNumberOfMessages = maxNumberOfMessages - wakuStore.maxRetentionDays = maxRetentionDays - wakuStore.messageQueue = NewMessageQueue(maxNumberOfMessages) + wakuStore.messageQueue = NewMessageQueue(maxNumberOfMessages, maxRetentionDuration) return wakuStore } @@ -303,19 +293,11 @@ func (store *WakuStore) fetchDBRecords(ctx context.Context) { store.storeMessageWithIndex(storedMessage.PubsubTopic, idx, storedMessage.Message) - metrics.RecordMessage(ctx, "stored", len(store.messageQueue.messages)) + metrics.RecordMessage(ctx, "stored", store.messageQueue.Length()) } } func (store *WakuStore) storeMessageWithIndex(pubsubTopic string, idx *pb.Index, msg *pb.WakuMessage) { - var k [32]byte - copy(k[:], idx.Digest) - - if _, ok := store.seen[k]; ok { - return - } - - store.seen[k] = struct{}{} store.messageQueue.Push(IndexedWakuMessage{msg: msg, index: idx, pubsubTopic: pubsubTopic}) } @@ -326,13 +308,10 @@ func (store *WakuStore) storeMessage(env *protocol.Envelope) { return } - store.messagesMutex.Lock() - defer store.messagesMutex.Unlock() - store.storeMessageWithIndex(env.PubsubTopic(), index, env.Message()) if store.msgProvider == nil { - metrics.RecordMessage(store.ctx, "stored", len(store.messageQueue.messages)) + metrics.RecordMessage(store.ctx, "stored", store.messageQueue.Length()) return } @@ -344,7 +323,7 @@ func (store *WakuStore) storeMessage(env *protocol.Envelope) { return } - metrics.RecordMessage(store.ctx, "stored", len(store.messageQueue.messages)) + metrics.RecordMessage(store.ctx, "stored", store.messageQueue.Length()) } func (store *WakuStore) storeIncomingMessages(ctx context.Context) { @@ -531,7 +510,7 @@ func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selec return nil, err } - metrics.RecordMessage(ctx, "retrieved", len(store.messageQueue.messages)) + metrics.RecordMessage(ctx, "retrieved", store.messageQueue.Length()) return historyResponseRPC.Response, nil }