diff --git a/eth-node/bridge/geth/waku.go b/eth-node/bridge/geth/waku.go index ee32a00d7..c596e4474 100644 --- a/eth-node/bridge/geth/waku.go +++ b/eth-node/bridge/geth/waku.go @@ -123,6 +123,10 @@ func (w *GethWakuWrapper) SubscribeToConnStatusChanges() (*types.ConnStatusSubsc return nil, errors.New("not available in WakuV1") } +func (w *GethWakuWrapper) SetCriteriaForMissingMessageVerification(peerID peer.ID, pubsubTopic string, contentTopics []string) error { + return errors.New("not available in WakuV1") +} + // Peers function only added for compatibility with waku V2 func (w *GethWakuWrapper) Peers() map[string]types.WakuV2Peer { p := make(map[string]types.WakuV2Peer) diff --git a/eth-node/bridge/geth/wakuv2.go b/eth-node/bridge/geth/wakuv2.go index 2dd426407..1290ea0f9 100644 --- a/eth-node/bridge/geth/wakuv2.go +++ b/eth-node/bridge/geth/wakuv2.go @@ -304,6 +304,15 @@ func (w *gethWakuV2Wrapper) SubscribeToConnStatusChanges() (*types.ConnStatusSub return w.waku.SubscribeToConnStatusChanges(), nil } +func (w *gethWakuV2Wrapper) SetCriteriaForMissingMessageVerification(peerID peer.ID, pubsubTopic string, contentTopics []string) error { + pubsubTopic = w.waku.GetPubsubTopic(pubsubTopic) + w.waku.SetTopicsToVerifyForMissingMessages(peerID, pubsubTopic, contentTopics) + + // No err can be be generated by this function. The function returns an error + // Just so there's compatibility with GethWakuWrapper from V1 + return nil +} + func (w *gethWakuV2Wrapper) ConnectionChanged(state connection.State) { w.waku.ConnectionChanged(state) } diff --git a/eth-node/types/waku.go b/eth-node/types/waku.go index e1904e321..90be801ca 100644 --- a/eth-node/types/waku.go +++ b/eth-node/types/waku.go @@ -122,6 +122,8 @@ type Waku interface { SubscribeToConnStatusChanges() (*ConnStatusSubscription, error) + SetCriteriaForMissingMessageVerification(peerID peer.ID, pubsubTopic string, contentTopics []string) error + // MinPow returns the PoW value required by this node. MinPow() float64 // BloomFilter returns the aggregated bloom filter for all the topics of interest. diff --git a/protocol/messenger.go b/protocol/messenger.go index 478f96113..c3efc08c6 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -844,6 +844,8 @@ func (m *Messenger) Start() (*MessengerResponse, error) { return nil, err } + go m.checkForMissingMessagesLoop() + controlledCommunities, err := m.communitiesManager.Controlled() if err != nil { return nil, err diff --git a/protocol/messenger_mailserver.go b/protocol/messenger_mailserver.go index a26013dfb..ce370933b 100644 --- a/protocol/messenger_mailserver.go +++ b/protocol/messenger_mailserver.go @@ -379,6 +379,37 @@ func (m *Messenger) RequestAllHistoricMessages(forceFetchingBackup, withRetries return allResponses, nil } +const missingMessageCheckPeriod = 30 * time.Second + +func (m *Messenger) checkForMissingMessagesLoop() { + t := time.NewTicker(missingMessageCheckPeriod) + defer t.Stop() + + for { + select { + case <-m.quit: + return + + case <-t.C: + filters := m.transport.Filters() + filtersByMs := m.SplitFiltersByStoreNode(filters) + for communityID, filtersForMs := range filtersByMs { + ms := m.getActiveMailserver(communityID) + if ms == nil { + continue + } + + peerID, err := ms.PeerID() + if err != nil { + m.logger.Error("could not obtain the peerID") + return + } + m.transport.SetCriteriaForMissingMessageVerification(peerID, filtersForMs) + } + } + } +} + func getPrioritizedBatches() []int { return []int{1, 5, 10} } diff --git a/protocol/transport/transport.go b/protocol/transport/transport.go index 926d5e5d8..745eed5f5 100644 --- a/protocol/transport/transport.go +++ b/protocol/transport/transport.go @@ -14,6 +14,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/pkg/errors" "go.uber.org/zap" + "golang.org/x/exp/maps" "github.com/ethereum/go-ethereum/common" "github.com/status-im/status-go/connection" @@ -740,3 +741,36 @@ func (t *Transport) ConfirmMessageDelivered(messageID string) { func (t *Transport) SetStorePeerID(peerID peer.ID) { t.waku.SetStorePeerID(peerID) } + +func (t *Transport) SetCriteriaForMissingMessageVerification(peerID peer.ID, filters []*Filter) { + if t.waku.Version() != 2 { + return + } + + topicMap := make(map[string]map[string]struct{}) + for _, f := range filters { + if !f.Listen || f.Ephemeral { + continue + } + + _, ok := topicMap[f.PubsubTopic] + if !ok { + topicMap[f.PubsubTopic] = make(map[string]struct{}) + } + + topicMap[f.PubsubTopic][f.ContentTopic.String()] = struct{}{} + } + + for pubsubTopic, contentTopics := range topicMap { + ctList := maps.Keys(contentTopics) + err := t.waku.SetCriteriaForMissingMessageVerification(peerID, pubsubTopic, ctList) + if err != nil { + t.logger.Error("could not check for missing messages", + zap.Error(err), + zap.Stringer("peerID", peerID), + zap.String("pubsubTopic", pubsubTopic), + zap.Strings("contentTopics", ctList)) + return + } + } +} diff --git a/wakuv2/missing_messages.go b/wakuv2/missing_messages.go new file mode 100644 index 000000000..cee4e24e8 --- /dev/null +++ b/wakuv2/missing_messages.go @@ -0,0 +1,279 @@ +package wakuv2 + +import ( + "context" + "encoding/hex" + "errors" + "slices" + "time" + + "github.com/libp2p/go-libp2p/core/peer" + "google.golang.org/protobuf/proto" + + "go.uber.org/zap" + + gethcommon "github.com/ethereum/go-ethereum/common" + + "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/store" + + "github.com/status-im/status-go/logutils" + "github.com/status-im/status-go/wakuv2/common" + + "github.com/waku-org/go-waku/waku/v2/protocol/pb" +) + +const maxContentTopicsPerRequest = 10 +const maxAttemptsToRetrieveHistory = 3 +const delay = 10 * time.Second + +type TopicInterest struct { + peerID peer.ID + pubsubTopic string + contentTopics []string + lastChecked time.Time + + ctx context.Context + cancel context.CancelFunc +} + +func (p TopicInterest) Equals(other TopicInterest) bool { + if p.peerID != other.peerID { + return false + } + + if p.pubsubTopic != other.pubsubTopic { + return false + } + + slices.Sort(p.contentTopics) + slices.Sort(other.contentTopics) + + if len(p.contentTopics) != len(other.contentTopics) { + return false + } + + for i, contentTopic := range p.contentTopics { + if contentTopic != other.contentTopics[i] { + return false + } + } + + return true +} + +func (w *Waku) SetTopicsToVerifyForMissingMessages(peerID peer.ID, pubsubTopic string, contentTopics []string) { + w.topicInterestMu.Lock() + defer w.topicInterestMu.Unlock() + + ctx, cancel := context.WithCancel(w.ctx) + newMissingMessageRequest := TopicInterest{ + peerID: peerID, + pubsubTopic: pubsubTopic, + contentTopics: contentTopics, + lastChecked: w.timesource.Now().Add(delay), + ctx: ctx, + cancel: cancel, + } + + currMessageVerificationRequest, ok := w.topicInterest[pubsubTopic] + + if ok && currMessageVerificationRequest.Equals(newMissingMessageRequest) { + return + } + + if ok { + // If there is an ongoing request, we cancel it before replacing it + // by the new list. This can be probably optimized further by tracking + // the last time a content topic was synced, but might not be necessary + // since cancelling an ongoing request would mean cancelling just a single + // page of results + currMessageVerificationRequest.cancel() + } + + w.topicInterest[pubsubTopic] = newMissingMessageRequest +} + +func (w *Waku) checkForMissingMessages() { + defer w.wg.Done() + defer w.logger.Debug("checkForMissingMessages - done") + + t := time.NewTicker(time.Minute) + defer t.Stop() + + var semaphore = make(chan struct{}, 5) + for { + select { + case <-t.C: + w.logger.Debug("checking for missing messages...") + w.topicInterestMu.Lock() + for _, request := range w.topicInterest { + select { + case <-w.ctx.Done(): + return + default: + semaphore <- struct{}{} + go func(r TopicInterest) { + w.FetchHistory(r) + <-semaphore + }(request) + } + } + w.topicInterestMu.Unlock() + + case <-w.ctx.Done(): + return + } + } +} + +func (w *Waku) FetchHistory(missingHistoryRequest TopicInterest) { + for i := 0; i < len(missingHistoryRequest.contentTopics); i += maxContentTopicsPerRequest { + j := i + maxContentTopicsPerRequest + if j > len(missingHistoryRequest.contentTopics) { + j = len(missingHistoryRequest.contentTopics) + } + + now := w.timesource.Now() + err := w.fetchMessagesBatch(missingHistoryRequest, i, j, now) + if err != nil { + if errors.Is(err, context.Canceled) { + return + } + + w.logger.Error("could not fetch history", zap.Stringer("peerID", missingHistoryRequest.peerID), zap.String("pubsubTopic", missingHistoryRequest.pubsubTopic), zap.Strings("contentTopics", missingHistoryRequest.contentTopics)) + continue + } + + w.topicInterestMu.Lock() + c := w.topicInterest[missingHistoryRequest.pubsubTopic] + if c.Equals(missingHistoryRequest) { + c.lastChecked = now + w.topicInterest[missingHistoryRequest.pubsubTopic] = c + } + w.topicInterestMu.Unlock() + } +} + +func (w *Waku) storeQueryWithRetry(ctx context.Context, queryFunc func(ctx context.Context) (*store.Result, error), logger *zap.Logger, logMsg string) (*store.Result, error) { + retry := true + count := 1 + for retry && count <= maxAttemptsToRetrieveHistory { + logger.Debug(logMsg, zap.Int("attempt", count)) + tCtx, cancel := context.WithTimeout(ctx, 20*time.Second) + result, err := queryFunc(tCtx) + cancel() + if err != nil { + logger.Error("could not query storenode", zap.Error(err), zap.Int("attempt", count)) + select { + case <-w.ctx.Done(): + return nil, w.ctx.Err() + case <-time.After(2 * time.Second): + } + } else { + return result, nil + } + } + + return nil, errors.New("storenode not available") +} + +func (w *Waku) fetchMessagesBatch(missingHistoryRequest TopicInterest, batchFrom int, batchTo int, now time.Time) error { + logger := w.logger.With( + zap.Stringer("peerID", missingHistoryRequest.peerID), + zap.Strings("contentTopics", missingHistoryRequest.contentTopics[batchFrom:batchTo]), + zap.String("pubsubTopic", missingHistoryRequest.pubsubTopic), + logutils.WakuMessageTimestamp("from", proto.Int64(missingHistoryRequest.lastChecked.UnixNano())), + logutils.WakuMessageTimestamp("to", proto.Int64(now.UnixNano())), + ) + + result, err := w.storeQueryWithRetry(missingHistoryRequest.ctx, func(ctx context.Context) (*store.Result, error) { + return w.node.Store().Query(ctx, store.FilterCriteria{ + ContentFilter: protocol.NewContentFilter(missingHistoryRequest.pubsubTopic, missingHistoryRequest.contentTopics[batchFrom:batchTo]...), + TimeStart: proto.Int64(missingHistoryRequest.lastChecked.Add(-delay).UnixNano()), + TimeEnd: proto.Int64(now.Add(-delay).UnixNano()), + }, store.WithPeer(missingHistoryRequest.peerID), store.WithPaging(false, 100), store.IncludeData(false)) + }, logger, "retrieving history to check for missing messages") + if err != nil { + if !errors.Is(err, context.Canceled) { + logger.Error("storenode not available", zap.Error(err)) + } + return err + } + + var missingMessages []pb.MessageHash + + for !result.IsComplete() { + for _, mkv := range result.Messages() { + hash := pb.ToMessageHash(mkv.MessageHash) + + w.poolMu.Lock() + alreadyCached := w.envelopeCache.Has(gethcommon.Hash(hash)) + w.poolMu.Unlock() + if alreadyCached { + continue + } + + missingMessages = append(missingMessages, hash) + } + + result, err = w.storeQueryWithRetry(missingHistoryRequest.ctx, func(ctx context.Context) (*store.Result, error) { + if err = result.Next(ctx); err != nil { + return nil, err + } + return result, nil + }, logger.With(zap.String("cursor", hex.EncodeToString(result.Cursor()))), "retrieving next page") + if err != nil { + if !errors.Is(err, context.Canceled) { + logger.Error("storenode not available", zap.Error(err)) + } + return err + } + } + + if len(missingMessages) == 0 { + // Nothing to do here + return nil + } + + result, err = w.storeQueryWithRetry(missingHistoryRequest.ctx, func(ctx context.Context) (*store.Result, error) { + return w.node.Store().QueryByHash(ctx, missingMessages, store.WithPeer(missingHistoryRequest.peerID), store.WithPaging(false, 100)) + }, logger, "retrieving missing messages") + if err != nil { + if !errors.Is(err, context.Canceled) { + logger.Error("storenode not available", zap.Error(err)) + } + return err + } + + for !result.IsComplete() { + for _, mkv := range result.Messages() { + envelope := protocol.NewEnvelope(mkv.Message, mkv.Message.GetTimestamp(), mkv.GetPubsubTopic()) + w.logger.Info("received waku2 store message", + zap.Stringer("envelopeHash", envelope.Hash()), + zap.String("pubsubTopic", mkv.GetPubsubTopic()), + zap.Int64p("timestamp", envelope.Message().Timestamp), + ) + + err = w.OnNewEnvelopes(envelope, common.StoreMessageType, false) + if err != nil { + return err + } + } + + result, err = w.storeQueryWithRetry(missingHistoryRequest.ctx, func(ctx context.Context) (*store.Result, error) { + if err = result.Next(ctx); err != nil { + return nil, err + } + return result, nil + }, logger.With(zap.String("cursor", hex.EncodeToString(result.Cursor()))), "retrieving next page") + if err != nil { + if !errors.Is(err, context.Canceled) { + logger.Error("storenode not available", zap.Error(err)) + } + return err + } + } + + return nil +} diff --git a/wakuv2/missing_messages_test.go b/wakuv2/missing_messages_test.go new file mode 100644 index 000000000..b9f40a145 --- /dev/null +++ b/wakuv2/missing_messages_test.go @@ -0,0 +1,43 @@ +package wakuv2 + +import ( + "context" + "testing" + + "github.com/libp2p/go-libp2p/core/peer" + "github.com/stretchr/testify/require" + + "github.com/status-im/status-go/timesource" +) + +func TestSetTopicInterest(t *testing.T) { + w := &Waku{ + ctx: context.TODO(), + timesource: timesource.Default(), + topicInterest: make(map[string]TopicInterest), + } + + peerID, err := peer.Decode("16Uiu2HAm3xVDaz6SRJ6kErwC21zBJEZjavVXg7VSkoWzaV1aMA3F") + require.NoError(t, err) + + pubsubTopic1 := "topic1" + contentTopics1 := []string{"A", "B", "C"} + contentTopics1_1 := []string{"C", "D", "E", "F"} + + w.SetTopicsToVerifyForMissingMessages(peerID, pubsubTopic1, contentTopics1) + + storedTopicInterest, ok := w.topicInterest[pubsubTopic1] + require.True(t, ok) + require.Equal(t, storedTopicInterest.contentTopics, contentTopics1) + require.Equal(t, storedTopicInterest.pubsubTopic, pubsubTopic1) + + w.SetTopicsToVerifyForMissingMessages(peerID, pubsubTopic1, contentTopics1_1) + storedTopicInterest_2, ok := w.topicInterest[pubsubTopic1] + require.True(t, ok) + require.Equal(t, storedTopicInterest_2.contentTopics, contentTopics1_1) + require.Equal(t, storedTopicInterest_2.pubsubTopic, pubsubTopic1) + + require.Error(t, storedTopicInterest.ctx.Err(), context.Canceled) + require.NoError(t, w.topicInterest[pubsubTopic1].ctx.Err()) + +} diff --git a/wakuv2/waku.go b/wakuv2/waku.go index d7cb1e5e8..37e9ff03c 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -125,6 +125,9 @@ type Waku struct { sendQueue chan *protocol.Envelope msgQueue chan *common.ReceivedMessage // Message queue for waku messages that havent been decoded + topicInterest map[string]TopicInterest // Track message verification requests and when was the last time a pubsub topic was verified for missing messages + topicInterestMu sync.Mutex + ctx context.Context cancel context.CancelFunc wg sync.WaitGroup @@ -214,6 +217,7 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, logger *zap.Logge sendQueue: make(chan *protocol.Envelope, 1000), topicHealthStatusChan: make(chan peermanager.TopicHealthStatus, 100), connStatusSubscriptions: make(map[string]*types.ConnStatusSubscription), + topicInterest: make(map[string]TopicInterest), ctx: ctx, cancel: cancel, wg: sync.WaitGroup{}, @@ -1343,7 +1347,7 @@ func (w *Waku) Start() error { } } - w.wg.Add(2) + w.wg.Add(3) go func() { defer w.wg.Done() @@ -1411,6 +1415,7 @@ func (w *Waku) Start() error { //TODO: commenting for now so that only fleet nodes are used. //Need to uncomment once filter peer scoring etc is implemented. go w.runPeerExchangeLoop() + go w.checkForMissingMessages() if w.cfg.LightClient { // Create FilterManager that will main peer connectivity