fix(wakuv2): post envelopes previously cached but not processed (#4246)

This commit is contained in:
Igor Sirotin 2023-11-02 14:16:59 +00:00 committed by GitHub
parent 25d8c52dd5
commit 9f69c32593
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 16 additions and 3 deletions

View File

@ -5,6 +5,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/waku-org/go-waku/waku/v2/payload" "github.com/waku-org/go-waku/waku/v2/payload"
@ -55,6 +56,8 @@ type ReceivedMessage struct {
SymKeyHash common.Hash // The Keccak256Hash of the key SymKeyHash common.Hash // The Keccak256Hash of the key
hash common.Hash hash common.Hash
Processed atomic.Bool
} }
// MessagesRequest contains details of a request for historic messages. // MessagesRequest contains details of a request for historic messages.

View File

@ -1325,21 +1325,29 @@ func (w *Waku) add(recvMessage *common.ReceivedMessage) (bool, error) {
hash := recvMessage.Hash() hash := recvMessage.Hash()
w.poolMu.Lock() w.poolMu.Lock()
_, alreadyCached := w.envelopes[hash] envelope, alreadyCached := w.envelopes[hash]
w.poolMu.Unlock() w.poolMu.Unlock()
if !alreadyCached { if !alreadyCached {
recvMessage.Processed.Store(false)
w.addEnvelope(recvMessage) w.addEnvelope(recvMessage)
} }
logger := w.logger.With(zap.String("envelopeHash", recvMessage.Hash().Hex()))
if alreadyCached { if alreadyCached {
w.logger.Debug("w envelope already cached", zap.String("envelopeHash", recvMessage.Hash().Hex())) logger.Debug("w envelope already cached")
common.EnvelopesCachedCounter.WithLabelValues("hit").Inc() common.EnvelopesCachedCounter.WithLabelValues("hit").Inc()
} else { } else {
w.logger.Debug("cached w envelope", zap.String("envelopeHash", recvMessage.Hash().Hex())) logger.Debug("cached w envelope")
common.EnvelopesCachedCounter.WithLabelValues("miss").Inc() common.EnvelopesCachedCounter.WithLabelValues("miss").Inc()
common.EnvelopesSizeMeter.Observe(float64(len(recvMessage.Envelope.Message().Payload))) common.EnvelopesSizeMeter.Observe(float64(len(recvMessage.Envelope.Message().Payload)))
}
if !alreadyCached || !envelope.Processed.Load() {
logger.Debug("waku: posting event")
w.postEvent(recvMessage) // notify the local node about the new message w.postEvent(recvMessage) // notify the local node about the new message
} }
return true, nil return true, nil
} }
@ -1371,6 +1379,8 @@ func (w *Waku) processQueue() {
w.storeMsgIDsMu.Lock() w.storeMsgIDsMu.Lock()
delete(w.storeMsgIDs, e.Hash()) delete(w.storeMsgIDs, e.Hash())
w.storeMsgIDsMu.Unlock() w.storeMsgIDsMu.Unlock()
} else {
e.Processed.Store(true)
} }
w.envelopeFeed.Send(common.EnvelopeEvent{ w.envelopeFeed.Send(common.EnvelopeEvent{