From 20d6d4eb9a7955a835499fd11048f549f98b637f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?rich=CE=9Brd?= Date: Tue, 6 Aug 2024 14:49:26 -0400 Subject: [PATCH] refactor: extract missing messages logic to go-waku (#5638) --- protocol/messenger_mailserver.go | 35 ++- .../waku/v2/api/missing/criteria_interest.go | 47 +++ .../waku/v2/api/missing/missing_messages.go | 284 ++++++++++++++++++ .../go-waku/waku/v2/api/missing/options.go | 39 +++ vendor/modules.txt | 1 + wakuv2/common/message.go | 9 +- wakuv2/missing_messages.go | 279 ----------------- wakuv2/missing_messages_test.go | 43 --- wakuv2/waku.go | 54 +++- wakuv2/waku_test.go | 19 +- 10 files changed, 450 insertions(+), 360 deletions(-) create mode 100644 vendor/github.com/waku-org/go-waku/waku/v2/api/missing/criteria_interest.go create mode 100644 vendor/github.com/waku-org/go-waku/waku/v2/api/missing/missing_messages.go create mode 100644 vendor/github.com/waku-org/go-waku/waku/v2/api/missing/options.go delete mode 100644 wakuv2/missing_messages.go delete mode 100644 wakuv2/missing_messages_test.go diff --git a/protocol/messenger_mailserver.go b/protocol/messenger_mailserver.go index be76ffe7c..4b09875a8 100644 --- a/protocol/messenger_mailserver.go +++ b/protocol/messenger_mailserver.go @@ -396,27 +396,34 @@ func (m *Messenger) checkForMissingMessagesLoop() { t := time.NewTicker(missingMessageCheckPeriod) defer t.Stop() + mailserverAvailableSignal := m.SubscribeMailserverAvailable() + for { select { case <-m.quit: return - case <-t.C: - filters := m.transport.Filters() - filtersByMs := m.SplitFiltersByStoreNode(filters) - for communityID, filtersForMs := range filtersByMs { - ms := m.getActiveMailserver(communityID) - if ms == nil { - continue - } + // Wait for mailserver available, also triggered on mailserver change + case <-mailserverAvailableSignal: - peerID, err := ms.PeerID() - if err != nil { - m.logger.Error("could not obtain the peerID") - return - } - m.transport.SetCriteriaForMissingMessageVerification(peerID, filtersForMs) + case <-t.C: + + } + + filters := m.transport.Filters() + filtersByMs := m.SplitFiltersByStoreNode(filters) + for communityID, filtersForMs := range filtersByMs { + ms := m.getActiveMailserver(communityID) + if ms == nil { + continue } + + peerID, err := ms.PeerID() + if err != nil { + m.logger.Error("could not obtain the peerID") + return + } + m.transport.SetCriteriaForMissingMessageVerification(peerID, filtersForMs) } } } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/criteria_interest.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/criteria_interest.go new file mode 100644 index 000000000..919b2fc91 --- /dev/null +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/criteria_interest.go @@ -0,0 +1,47 @@ +package missing + +import ( + "context" + "slices" + "time" + + "github.com/libp2p/go-libp2p/core/peer" + "github.com/waku-org/go-waku/waku/v2/protocol" +) + +type criteriaInterest struct { + peerID peer.ID + contentFilter protocol.ContentFilter + lastChecked time.Time + + ctx context.Context + cancel context.CancelFunc +} + +func (c criteriaInterest) equals(other criteriaInterest) bool { + if c.peerID != other.peerID { + return false + } + + if c.contentFilter.PubsubTopic != other.contentFilter.PubsubTopic { + return false + } + + contentTopics := c.contentFilter.ContentTopics.ToList() + otherContentTopics := other.contentFilter.ContentTopics.ToList() + + slices.Sort(contentTopics) + slices.Sort(otherContentTopics) + + if len(contentTopics) != 
len(otherContentTopics) { + return false + } + + for i, contentTopic := range contentTopics { + if contentTopic != otherContentTopics[i] { + return false + } + } + + return true +} diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/missing_messages.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/missing_messages.go new file mode 100644 index 000000000..a50e60718 --- /dev/null +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/missing_messages.go @@ -0,0 +1,284 @@ +package missing + +// test + +import ( + "context" + "encoding/hex" + "errors" + "sync" + "time" + + "github.com/libp2p/go-libp2p/core/peer" + "github.com/waku-org/go-waku/logging" + "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/store" + "github.com/waku-org/go-waku/waku/v2/timesource" + "go.uber.org/zap" + "google.golang.org/protobuf/proto" +) + +const maxContentTopicsPerRequest = 10 + +// MessageTracker should keep track of messages it has seen before and +// provide a way to determine whether a message exists or not. This +// is application specific +type MessageTracker interface { + MessageExists(pb.MessageHash) (bool, error) +} + +// MissingMessageVerifier is used to periodically retrieve missing messages from store nodes that have some specific criteria +type MissingMessageVerifier struct { + ctx context.Context + params missingMessageVerifierParams + + messageTracker MessageTracker + + criteriaInterest map[string]criteriaInterest // Track message verification requests and when was the last time a pubsub topic was verified for missing messages + criteriaInterestMu sync.Mutex + + C <-chan *protocol.Envelope + + store *store.WakuStore + timesource timesource.Timesource + logger *zap.Logger +} + +// NewMissingMessageVerifier creates an instance of a MissingMessageVerifier +func NewMissingMessageVerifier(store *store.WakuStore, messageTracker MessageTracker, timesource timesource.Timesource, logger *zap.Logger, options ...MissingMessageVerifierOption) *MissingMessageVerifier { + options = append(defaultMissingMessagesVerifierOptions, options...) + params := missingMessageVerifierParams{} + for _, opt := range options { + opt(¶ms) + } + + return &MissingMessageVerifier{ + store: store, + timesource: timesource, + messageTracker: messageTracker, + logger: logger.Named("missing-msg-verifier"), + params: params, + } +} + +func (m *MissingMessageVerifier) SetCriteriaInterest(peerID peer.ID, contentFilter protocol.ContentFilter) { + m.criteriaInterestMu.Lock() + defer m.criteriaInterestMu.Unlock() + + ctx, cancel := context.WithCancel(m.ctx) + criteriaInterest := criteriaInterest{ + peerID: peerID, + contentFilter: contentFilter, + lastChecked: m.timesource.Now().Add(-m.params.delay), + ctx: ctx, + cancel: cancel, + } + + currMessageVerificationRequest, ok := m.criteriaInterest[contentFilter.PubsubTopic] + + if ok && currMessageVerificationRequest.equals(criteriaInterest) { + return + } + + if ok { + // If there is an ongoing request, we cancel it before replacing it + // by the new list. 
This can be probably optimized further by tracking + // the last time a content topic was synced, but might not be necessary + // since cancelling an ongoing request would mean cancelling just a single + // page of results + currMessageVerificationRequest.cancel() + } + + m.criteriaInterest[contentFilter.PubsubTopic] = criteriaInterest +} + +func (m *MissingMessageVerifier) Start(ctx context.Context) { + m.ctx = ctx + m.criteriaInterest = make(map[string]criteriaInterest) + + c := make(chan *protocol.Envelope, 1000) + m.C = c + + go func() { + t := time.NewTicker(m.params.interval) + defer t.Stop() + + var semaphore = make(chan struct{}, 5) + for { + select { + case <-t.C: + m.logger.Debug("checking for missing messages...") + m.criteriaInterestMu.Lock() + for _, interest := range m.criteriaInterest { + select { + case <-ctx.Done(): + return + default: + semaphore <- struct{}{} + go func(interest criteriaInterest) { + m.fetchHistory(c, interest) + <-semaphore + }(interest) + } + } + m.criteriaInterestMu.Unlock() + + case <-ctx.Done(): + return + } + } + }() +} + +func (m *MissingMessageVerifier) fetchHistory(c chan<- *protocol.Envelope, interest criteriaInterest) { + contentTopics := interest.contentFilter.ContentTopics.ToList() + for i := 0; i < len(contentTopics); i += maxContentTopicsPerRequest { + j := i + maxContentTopicsPerRequest + if j > len(contentTopics) { + j = len(contentTopics) + } + + now := m.timesource.Now() + err := m.fetchMessagesBatch(c, interest, i, j, now) + if err != nil { + if errors.Is(err, context.Canceled) { + return + } + + m.logger.Error("could not fetch history", + zap.Stringer("peerID", interest.peerID), + zap.String("pubsubTopic", interest.contentFilter.PubsubTopic), + zap.Strings("contentTopics", contentTopics)) + continue + } + + m.criteriaInterestMu.Lock() + c := m.criteriaInterest[interest.contentFilter.PubsubTopic] + if c.equals(interest) { + c.lastChecked = now + m.criteriaInterest[interest.contentFilter.PubsubTopic] = c + } + m.criteriaInterestMu.Unlock() + } +} + +func (m *MissingMessageVerifier) storeQueryWithRetry(ctx context.Context, queryFunc func(ctx context.Context) (*store.Result, error), logger *zap.Logger, logMsg string) (*store.Result, error) { + retry := true + count := 1 + for retry && count <= m.params.maxAttemptsToRetrieveHistory { + logger.Debug(logMsg, zap.Int("attempt", count)) + tCtx, cancel := context.WithTimeout(ctx, 20*time.Second) + result, err := queryFunc(tCtx) + cancel() + if err != nil { + logger.Error("could not query storenode", zap.Error(err), zap.Int("attempt", count)) + select { + case <-m.ctx.Done(): + return nil, m.ctx.Err() + case <-time.After(2 * time.Second): + } + } else { + return result, nil + } + } + + return nil, errors.New("storenode not available") +} + +func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope, interest criteriaInterest, batchFrom int, batchTo int, now time.Time) error { + contentTopics := interest.contentFilter.ContentTopics.ToList() + + logger := m.logger.With( + zap.Stringer("peerID", interest.peerID), + zap.Strings("contentTopics", contentTopics[batchFrom:batchTo]), + zap.String("pubsubTopic", interest.contentFilter.PubsubTopic), + logging.Epoch("from", interest.lastChecked), + logging.Epoch("to", now), + ) + + result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) { + return m.store.Query(ctx, store.FilterCriteria{ + ContentFilter: protocol.NewContentFilter(interest.contentFilter.PubsubTopic, 
contentTopics[batchFrom:batchTo]...), + TimeStart: proto.Int64(interest.lastChecked.Add(-m.params.delay).UnixNano()), + TimeEnd: proto.Int64(now.Add(-m.params.delay).UnixNano()), + }, store.WithPeer(interest.peerID), store.WithPaging(false, 100), store.IncludeData(false)) + }, logger, "retrieving history to check for missing messages") + if err != nil { + if !errors.Is(err, context.Canceled) { + logger.Error("storenode not available", zap.Error(err)) + } + return err + } + + var missingHashes []pb.MessageHash + + for !result.IsComplete() { + for _, mkv := range result.Messages() { + hash := pb.ToMessageHash(mkv.MessageHash) + exists, err := m.messageTracker.MessageExists(hash) + if err != nil { + return err + } + + if exists { + continue + } + + missingHashes = append(missingHashes, hash) + } + + result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) { + if err = result.Next(ctx); err != nil { + return nil, err + } + return result, nil + }, logger.With(zap.String("cursor", hex.EncodeToString(result.Cursor()))), "retrieving next page") + if err != nil { + if !errors.Is(err, context.Canceled) { + logger.Error("storenode not available", zap.Error(err)) + } + return err + } + } + + if len(missingHashes) == 0 { + // Nothing to do here + return nil + } + + result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) { + return m.store.QueryByHash(ctx, missingHashes, store.WithPeer(interest.peerID), store.WithPaging(false, 100)) + }, logger, "retrieving missing messages") + if err != nil { + if !errors.Is(err, context.Canceled) { + logger.Error("storenode not available", zap.Error(err)) + } + return err + } + + for !result.IsComplete() { + for _, mkv := range result.Messages() { + select { + case c <- protocol.NewEnvelope(mkv.Message, mkv.Message.GetTimestamp(), mkv.GetPubsubTopic()): + default: + m.logger.Warn("subscriber is too slow!") + } + } + + result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) { + if err = result.Next(ctx); err != nil { + return nil, err + } + return result, nil + }, logger.With(zap.String("cursor", hex.EncodeToString(result.Cursor()))), "retrieving next page") + if err != nil { + if !errors.Is(err, context.Canceled) { + logger.Error("storenode not available", zap.Error(err)) + } + return err + } + } + + return nil +} diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/options.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/options.go new file mode 100644 index 000000000..b16abbc70 --- /dev/null +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/options.go @@ -0,0 +1,39 @@ +package missing + +import "time" + +type missingMessageVerifierParams struct { + delay time.Duration + interval time.Duration + maxAttemptsToRetrieveHistory int +} + +// MissingMessageVerifierOption is an option that can be used to customize the MissingMessageVerifier behavior +type MissingMessageVerifierOption func(*missingMessageVerifierParams) + +// WithVerificationInterval is an option used to setup the verification interval +func WithVerificationInterval(t time.Duration) MissingMessageVerifierOption { + return func(params *missingMessageVerifierParams) { + params.interval = t + } +} + +// WithDelay is an option used to indicate the delay to apply for verifying messages +func WithDelay(t time.Duration) MissingMessageVerifierOption { + return func(params *missingMessageVerifierParams) { + params.delay = t + } +} + +// WithMaxAttempts 
indicates how many times will the message verifier retry a failed storenode request +func WithMaxRetryAttempts(max int) MissingMessageVerifierOption { + return func(params *missingMessageVerifierParams) { + params.maxAttemptsToRetrieveHistory = max + } +} + +var defaultMissingMessagesVerifierOptions = []MissingMessageVerifierOption{ + WithVerificationInterval(time.Minute), + WithDelay(20 * time.Second), + WithMaxRetryAttempts(3), +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 47417a712..fd31c9e36 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1024,6 +1024,7 @@ github.com/waku-org/go-waku/logging github.com/waku-org/go-waku/tests github.com/waku-org/go-waku/waku/persistence github.com/waku-org/go-waku/waku/v2/api/filter +github.com/waku-org/go-waku/waku/v2/api/missing github.com/waku-org/go-waku/waku/v2/api/publish github.com/waku-org/go-waku/waku/v2/discv5 github.com/waku-org/go-waku/waku/v2/dnsdisc diff --git a/wakuv2/common/message.go b/wakuv2/common/message.go index 4c4703da0..d10a12960 100644 --- a/wakuv2/common/message.go +++ b/wakuv2/common/message.go @@ -17,12 +17,13 @@ import ( ) // MessageType represents where this message comes from -type MessageType int +type MessageType = string const ( - RelayedMessageType MessageType = iota - StoreMessageType - SendMessageType + RelayedMessageType MessageType = "relay" + StoreMessageType MessageType = "store" + SendMessageType MessageType = "send" + MissingMessageType MessageType = "missing" ) // MessageParams specifies the exact way a message should be wrapped diff --git a/wakuv2/missing_messages.go b/wakuv2/missing_messages.go deleted file mode 100644 index 976fe16f8..000000000 --- a/wakuv2/missing_messages.go +++ /dev/null @@ -1,279 +0,0 @@ -package wakuv2 - -import ( - "context" - "encoding/hex" - "errors" - "slices" - "time" - - "github.com/libp2p/go-libp2p/core/peer" - "google.golang.org/protobuf/proto" - - "go.uber.org/zap" - - gethcommon "github.com/ethereum/go-ethereum/common" - - "github.com/waku-org/go-waku/waku/v2/protocol" - "github.com/waku-org/go-waku/waku/v2/protocol/store" - - "github.com/status-im/status-go/logutils" - "github.com/status-im/status-go/wakuv2/common" - - "github.com/waku-org/go-waku/waku/v2/protocol/pb" -) - -const maxContentTopicsPerRequest = 10 -const maxAttemptsToRetrieveHistory = 3 -const delay = 10 * time.Second - -type TopicInterest struct { - peerID peer.ID - pubsubTopic string - contentTopics []string - lastChecked time.Time - - ctx context.Context - cancel context.CancelFunc -} - -func (p TopicInterest) Equals(other TopicInterest) bool { - if p.peerID != other.peerID { - return false - } - - if p.pubsubTopic != other.pubsubTopic { - return false - } - - slices.Sort(p.contentTopics) - slices.Sort(other.contentTopics) - - if len(p.contentTopics) != len(other.contentTopics) { - return false - } - - for i, contentTopic := range p.contentTopics { - if contentTopic != other.contentTopics[i] { - return false - } - } - - return true -} - -func (w *Waku) SetTopicsToVerifyForMissingMessages(peerID peer.ID, pubsubTopic string, contentTopics []string) { - w.topicInterestMu.Lock() - defer w.topicInterestMu.Unlock() - - ctx, cancel := context.WithCancel(w.ctx) - newMissingMessageRequest := TopicInterest{ - peerID: peerID, - pubsubTopic: pubsubTopic, - contentTopics: contentTopics, - lastChecked: w.timesource.Now().Add(-delay), - ctx: ctx, - cancel: cancel, - } - - currMessageVerificationRequest, ok := w.topicInterest[pubsubTopic] - - if ok && 
currMessageVerificationRequest.Equals(newMissingMessageRequest) { - return - } - - if ok { - // If there is an ongoing request, we cancel it before replacing it - // by the new list. This can be probably optimized further by tracking - // the last time a content topic was synced, but might not be necessary - // since cancelling an ongoing request would mean cancelling just a single - // page of results - currMessageVerificationRequest.cancel() - } - - w.topicInterest[pubsubTopic] = newMissingMessageRequest -} - -func (w *Waku) checkForMissingMessages() { - defer w.wg.Done() - defer w.logger.Debug("checkForMissingMessages - done") - - t := time.NewTicker(time.Minute) - defer t.Stop() - - var semaphore = make(chan struct{}, 5) - for { - select { - case <-t.C: - w.logger.Debug("checking for missing messages...") - w.topicInterestMu.Lock() - for _, request := range w.topicInterest { - select { - case <-w.ctx.Done(): - return - default: - semaphore <- struct{}{} - go func(r TopicInterest) { - w.FetchHistory(r) - <-semaphore - }(request) - } - } - w.topicInterestMu.Unlock() - - case <-w.ctx.Done(): - return - } - } -} - -func (w *Waku) FetchHistory(missingHistoryRequest TopicInterest) { - for i := 0; i < len(missingHistoryRequest.contentTopics); i += maxContentTopicsPerRequest { - j := i + maxContentTopicsPerRequest - if j > len(missingHistoryRequest.contentTopics) { - j = len(missingHistoryRequest.contentTopics) - } - - now := w.timesource.Now() - err := w.fetchMessagesBatch(missingHistoryRequest, i, j, now) - if err != nil { - if errors.Is(err, context.Canceled) { - return - } - - w.logger.Error("could not fetch history", zap.Stringer("peerID", missingHistoryRequest.peerID), zap.String("pubsubTopic", missingHistoryRequest.pubsubTopic), zap.Strings("contentTopics", missingHistoryRequest.contentTopics)) - continue - } - - w.topicInterestMu.Lock() - c := w.topicInterest[missingHistoryRequest.pubsubTopic] - if c.Equals(missingHistoryRequest) { - c.lastChecked = now - w.topicInterest[missingHistoryRequest.pubsubTopic] = c - } - w.topicInterestMu.Unlock() - } -} - -func (w *Waku) storeQueryWithRetry(ctx context.Context, queryFunc func(ctx context.Context) (*store.Result, error), logger *zap.Logger, logMsg string) (*store.Result, error) { - retry := true - count := 1 - for retry && count <= maxAttemptsToRetrieveHistory { - logger.Debug(logMsg, zap.Int("attempt", count)) - tCtx, cancel := context.WithTimeout(ctx, 20*time.Second) - result, err := queryFunc(tCtx) - cancel() - if err != nil { - logger.Error("could not query storenode", zap.Error(err), zap.Int("attempt", count)) - select { - case <-w.ctx.Done(): - return nil, w.ctx.Err() - case <-time.After(2 * time.Second): - } - } else { - return result, nil - } - } - - return nil, errors.New("storenode not available") -} - -func (w *Waku) fetchMessagesBatch(missingHistoryRequest TopicInterest, batchFrom int, batchTo int, now time.Time) error { - logger := w.logger.With( - zap.Stringer("peerID", missingHistoryRequest.peerID), - zap.Strings("contentTopics", missingHistoryRequest.contentTopics[batchFrom:batchTo]), - zap.String("pubsubTopic", missingHistoryRequest.pubsubTopic), - logutils.WakuMessageTimestamp("from", proto.Int64(missingHistoryRequest.lastChecked.UnixNano())), - logutils.WakuMessageTimestamp("to", proto.Int64(now.UnixNano())), - ) - - result, err := w.storeQueryWithRetry(missingHistoryRequest.ctx, func(ctx context.Context) (*store.Result, error) { - return w.node.Store().Query(ctx, store.FilterCriteria{ - ContentFilter: 
protocol.NewContentFilter(missingHistoryRequest.pubsubTopic, missingHistoryRequest.contentTopics[batchFrom:batchTo]...), - TimeStart: proto.Int64(missingHistoryRequest.lastChecked.Add(-delay).UnixNano()), - TimeEnd: proto.Int64(now.Add(-delay).UnixNano()), - }, store.WithPeer(missingHistoryRequest.peerID), store.WithPaging(false, 100), store.IncludeData(false)) - }, logger, "retrieving history to check for missing messages") - if err != nil { - if !errors.Is(err, context.Canceled) { - logger.Error("storenode not available", zap.Error(err)) - } - return err - } - - var missingHashes []pb.MessageHash - - for !result.IsComplete() { - for _, mkv := range result.Messages() { - hash := pb.ToMessageHash(mkv.MessageHash) - - w.poolMu.Lock() - alreadyCached := w.envelopeCache.Has(gethcommon.Hash(hash)) - w.poolMu.Unlock() - if alreadyCached { - continue - } - - missingHashes = append(missingHashes, hash) - } - - result, err = w.storeQueryWithRetry(missingHistoryRequest.ctx, func(ctx context.Context) (*store.Result, error) { - if err = result.Next(ctx); err != nil { - return nil, err - } - return result, nil - }, logger.With(zap.String("cursor", hex.EncodeToString(result.Cursor()))), "retrieving next page") - if err != nil { - if !errors.Is(err, context.Canceled) { - logger.Error("storenode not available", zap.Error(err)) - } - return err - } - } - - if len(missingHashes) == 0 { - // Nothing to do here - return nil - } - - result, err = w.storeQueryWithRetry(missingHistoryRequest.ctx, func(ctx context.Context) (*store.Result, error) { - return w.node.Store().QueryByHash(ctx, missingHashes, store.WithPeer(missingHistoryRequest.peerID), store.WithPaging(false, 100)) - }, logger, "retrieving missing messages") - if err != nil { - if !errors.Is(err, context.Canceled) { - logger.Error("storenode not available", zap.Error(err)) - } - return err - } - - for !result.IsComplete() { - for _, mkv := range result.Messages() { - envelope := protocol.NewEnvelope(mkv.Message, mkv.Message.GetTimestamp(), mkv.GetPubsubTopic()) - w.logger.Info("received waku2 store message", - zap.Stringer("envelopeHash", envelope.Hash()), - zap.String("pubsubTopic", mkv.GetPubsubTopic()), - zap.Int64p("timestamp", envelope.Message().Timestamp), - ) - - err = w.OnNewEnvelopes(envelope, common.StoreMessageType, false) - if err != nil { - return err - } - } - - result, err = w.storeQueryWithRetry(missingHistoryRequest.ctx, func(ctx context.Context) (*store.Result, error) { - if err = result.Next(ctx); err != nil { - return nil, err - } - return result, nil - }, logger.With(zap.String("cursor", hex.EncodeToString(result.Cursor()))), "retrieving next page") - if err != nil { - if !errors.Is(err, context.Canceled) { - logger.Error("storenode not available", zap.Error(err)) - } - return err - } - } - - return nil -} diff --git a/wakuv2/missing_messages_test.go b/wakuv2/missing_messages_test.go deleted file mode 100644 index b9f40a145..000000000 --- a/wakuv2/missing_messages_test.go +++ /dev/null @@ -1,43 +0,0 @@ -package wakuv2 - -import ( - "context" - "testing" - - "github.com/libp2p/go-libp2p/core/peer" - "github.com/stretchr/testify/require" - - "github.com/status-im/status-go/timesource" -) - -func TestSetTopicInterest(t *testing.T) { - w := &Waku{ - ctx: context.TODO(), - timesource: timesource.Default(), - topicInterest: make(map[string]TopicInterest), - } - - peerID, err := peer.Decode("16Uiu2HAm3xVDaz6SRJ6kErwC21zBJEZjavVXg7VSkoWzaV1aMA3F") - require.NoError(t, err) - - pubsubTopic1 := "topic1" - contentTopics1 := []string{"A", 
"B", "C"} - contentTopics1_1 := []string{"C", "D", "E", "F"} - - w.SetTopicsToVerifyForMissingMessages(peerID, pubsubTopic1, contentTopics1) - - storedTopicInterest, ok := w.topicInterest[pubsubTopic1] - require.True(t, ok) - require.Equal(t, storedTopicInterest.contentTopics, contentTopics1) - require.Equal(t, storedTopicInterest.pubsubTopic, pubsubTopic1) - - w.SetTopicsToVerifyForMissingMessages(peerID, pubsubTopic1, contentTopics1_1) - storedTopicInterest_2, ok := w.topicInterest[pubsubTopic1] - require.True(t, ok) - require.Equal(t, storedTopicInterest_2.contentTopics, contentTopics1_1) - require.Equal(t, storedTopicInterest_2.pubsubTopic, pubsubTopic1) - - require.Error(t, storedTopicInterest.ctx.Err(), context.Canceled) - require.NoError(t, w.topicInterest[pubsubTopic1].ctx.Err()) - -} diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 22dfac270..663fd2b11 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -58,6 +58,7 @@ import ( "github.com/libp2p/go-libp2p/core/metrics" filterapi "github.com/waku-org/go-waku/waku/v2/api/filter" + "github.com/waku-org/go-waku/waku/v2/api/missing" "github.com/waku-org/go-waku/waku/v2/api/publish" "github.com/waku-org/go-waku/waku/v2/dnsdisc" "github.com/waku-org/go-waku/waku/v2/onlinechecker" @@ -144,10 +145,9 @@ type Waku struct { sendQueue *publish.MessageQueue limiter *publish.PublishRateLimiter - msgQueue chan *common.ReceivedMessage // Message queue for waku messages that havent been decoded + missingMsgVerifier *missing.MissingMessageVerifier - topicInterest map[string]TopicInterest // Track message verification requests and when was the last time a pubsub topic was verified for missing messages - topicInterestMu sync.Mutex + msgQueue chan *common.ReceivedMessage // Message queue for waku messages that havent been decoded ctx context.Context cancel context.CancelFunc @@ -240,7 +240,6 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, logger *zap.Logge topicHealthStatusChan: make(chan peermanager.TopicHealthStatus, 100), connectionNotifChan: make(chan node.PeerConnection, 20), connStatusSubscriptions: make(map[string]*types.ConnStatusSubscription), - topicInterest: make(map[string]TopicInterest), ctx: ctx, cancel: cancel, wg: sync.WaitGroup{}, @@ -1194,11 +1193,6 @@ func (w *Waku) Query(ctx context.Context, peerID peer.ID, query store.FilterCrit mkv.Message.RateLimitProof = nil envelope := protocol.NewEnvelope(msg, msg.GetTimestamp(), query.PubsubTopic) - logger.Info("received waku2 store message", - zap.Stringer("envelopeHash", envelope.Hash()), - zap.String("pubsubTopic", query.PubsubTopic), - zap.Int64p("timestamp", envelope.Message().Timestamp), - ) err = w.OnNewEnvelopes(envelope, common.StoreMessageType, processEnvelopes) if err != nil { @@ -1303,7 +1297,28 @@ func (w *Waku) Start() error { if w.cfg.EnableMissingMessageVerification { w.wg.Add(1) - go w.checkForMissingMessages() + + w.missingMsgVerifier = missing.NewMissingMessageVerifier( + w.node.Store(), + w, + w.node.Timesource(), + w.logger) + + w.missingMsgVerifier.Start(w.ctx) + + go func() { + for { + select { + case <-w.ctx.Done(): + return + case envelope := <-w.missingMsgVerifier.C: + err = w.OnNewEnvelopes(envelope, common.MissingMessageType, false) + if err != nil { + w.logger.Error("OnNewEnvelopes error", zap.Error(err)) + } + } + } + }() } if w.cfg.LightClient { @@ -1337,6 +1352,20 @@ func (w *Waku) Start() error { return nil } +func (w *Waku) MessageExists(mh pb.MessageHash) (bool, error) { + w.poolMu.Lock() + defer w.poolMu.Unlock() + return 
w.envelopeCache.Has(gethcommon.Hash(mh)), nil +} + +func (w *Waku) SetTopicsToVerifyForMissingMessages(peerID peer.ID, pubsubTopic string, contentTopics []string) { + if !w.cfg.EnableMissingMessageVerification { + return + } + + w.missingMsgVerifier.SetCriteriaInterest(peerID, protocol.NewContentFilter(pubsubTopic, contentTopics...)) +} + func (w *Waku) setupRelaySubscriptions() error { if w.cfg.LightClient { return nil @@ -1405,10 +1434,11 @@ func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.Messag } logger := w.logger.With( - zap.Any("messageType", msgType), + zap.String("messageType", msgType), zap.Stringer("envelopeHash", envelope.Hash()), + zap.String("pubsubTopic", envelope.PubsubTopic()), zap.String("contentTopic", envelope.Message().ContentTopic), - zap.Int64("timestamp", envelope.Message().GetTimestamp()), + logutils.WakuMessageTimestamp("timestamp", envelope.Message().Timestamp), ) logger.Debug("received new envelope") diff --git a/wakuv2/waku_test.go b/wakuv2/waku_test.go index 8fd0663ec..e522b93f0 100644 --- a/wakuv2/waku_test.go +++ b/wakuv2/waku_test.go @@ -135,7 +135,9 @@ func TestRestartDiscoveryV5(t *testing.T) { } func TestRelayPeers(t *testing.T) { - config := &Config{} + config := &Config{ + EnableMissingMessageVerification: true, + } setDefaultConfig(config, false) w, err := New(nil, "", config, nil, nil, nil, nil, nil) require.NoError(t, err) @@ -458,13 +460,14 @@ func TestWakuV2Store(t *testing.T) { // Configuration for the first Waku node config1 := &Config{ - Port: 0, - ClusterID: 16, - EnableDiscV5: false, - DiscoveryLimit: 20, - EnableStore: false, - StoreCapacity: 100, - StoreSeconds: 3600, + Port: 0, + ClusterID: 16, + EnableDiscV5: false, + DiscoveryLimit: 20, + EnableStore: false, + StoreCapacity: 100, + StoreSeconds: 3600, + EnableMissingMessageVerification: true, } w1PeersCh := make(chan []string, 100) // buffered not to block on the send side
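
For reference, a minimal consumer of the extracted missing-message API might look like the sketch below. It is not part of the patch: the package name `example`, the `tracker` type, `runVerifier`, and passing the store, timesource and store-node peer ID in as parameters are illustrative assumptions; in the patch itself these come from w.node.Store(), w.node.Timesource(), the active mailserver's PeerID, and the Waku struct's envelopeCache-backed MessageExists. Only identifiers that appear in the diff (NewMissingMessageVerifier, MessageTracker, SetCriteriaInterest, Start, the C channel, and the three options) are taken as the actual API.

package example

import (
	"context"
	"time"

	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/waku-org/go-waku/waku/v2/api/missing"
	"github.com/waku-org/go-waku/waku/v2/protocol"
	"github.com/waku-org/go-waku/waku/v2/protocol/pb"
	"github.com/waku-org/go-waku/waku/v2/protocol/store"
	"github.com/waku-org/go-waku/waku/v2/timesource"
	"go.uber.org/zap"
)

// tracker satisfies missing.MessageTracker. A real implementation would check an
// application-level cache, as wakuv2's MessageExists does with envelopeCache;
// always returning false simply treats every store result as a missing message.
type tracker struct{}

func (t *tracker) MessageExists(pb.MessageHash) (bool, error) { return false, nil }

// runVerifier wires the verifier roughly the way wakuv2/waku.go does in Start().
func runVerifier(ctx context.Context, s *store.WakuStore, ts timesource.Timesource,
	logger *zap.Logger, storenode peer.ID, pubsubTopic string, contentTopics []string) {

	verifier := missing.NewMissingMessageVerifier(s, &tracker{}, ts, logger,
		missing.WithVerificationInterval(time.Minute),
		missing.WithDelay(20*time.Second),
		missing.WithMaxRetryAttempts(3),
	)

	// Start must be called before SetCriteriaInterest: it creates the verifier's
	// context and criteria map, and allocates the output channel C.
	verifier.Start(ctx)

	// Register which pubsub/content topics to check against this store node.
	verifier.SetCriteriaInterest(storenode, protocol.NewContentFilter(pubsubTopic, contentTopics...))

	// Envelopes the tracker reported as unseen are delivered on verifier.C.
	for {
		select {
		case <-ctx.Done():
			return
		case env := <-verifier.C:
			logger.Info("recovered missing message", zap.Stringer("envelopeHash", env.Hash()))
		}
	}
}

The split of responsibilities follows from the interfaces in the patch: go-waku now owns the periodic store queries, retries and pagination, while the application keeps only the duplicate check (MessageTracker) and the consumption of recovered envelopes from C, which is exactly how wakuv2/waku.go hooks the verifier output back into OnNewEnvelopes with common.MissingMessageType.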