From e3ef8c649a57d30d5c5f724be433e86713f746ad Mon Sep 17 00:00:00 2001 From: Igor Sirotin Date: Fri, 15 Dec 2023 19:50:12 +0000 Subject: [PATCH] chore: store node requests manager (#4446) --- VERSION | 2 +- eth-node/bridge/geth/waku.go | 4 +- eth-node/bridge/geth/wakuv2.go | 13 +- eth-node/types/waku.go | 2 +- protocol/communities/community.go | 7 + protocol/communities/manager.go | 2 + protocol/messenger.go | 39 +- protocol/messenger_communities.go | 261 +----------- protocol/messenger_curated_communities.go | 4 +- protocol/messenger_handler.go | 7 - protocol/messenger_mailserver.go | 70 ++- protocol/messenger_mailserver_cycle.go | 49 ++- ..._mailserver_processMailserverBatch_test.go | 10 +- .../messenger_store_node_request_manager.go | 334 +++++++++++++++ protocol/messenger_storenode_request_test.go | 403 +++++++++++++++--- protocol/messenger_testing_utils.go | 137 ++++-- protocol/transport/transport.go | 38 +- protocol/tt/logger.go | 7 +- wakuv2/waku.go | 105 +++-- 19 files changed, 1035 insertions(+), 459 deletions(-) create mode 100644 protocol/messenger_store_node_request_manager.go diff --git a/VERSION b/VERSION index 27a7ea230..80f2ec142 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.171.34 +0.171.35 diff --git a/eth-node/bridge/geth/waku.go b/eth-node/bridge/geth/waku.go index 8f5a5f4d0..d9d5ba592 100644 --- a/eth-node/bridge/geth/waku.go +++ b/eth-node/bridge/geth/waku.go @@ -268,8 +268,8 @@ func (w *gethWakuWrapper) MarkP2PMessageAsProcessed(hash common.Hash) { w.waku.MarkP2PMessageAsProcessed(hash) } -func (w *gethWakuWrapper) RequestStoreMessages(ctx context.Context, peerID []byte, r types.MessagesRequest) (*types.StoreRequestCursor, error) { - return nil, errors.New("not implemented") +func (w *gethWakuWrapper) RequestStoreMessages(ctx context.Context, peerID []byte, r types.MessagesRequest, processEnvelopes bool) (*types.StoreRequestCursor, int, error) { + return nil, 0, errors.New("not implemented") } func (w *gethWakuWrapper) ConnectionChanged(_ connection.State) {} diff --git a/eth-node/bridge/geth/wakuv2.go b/eth-node/bridge/geth/wakuv2.go index b88f1733a..3449b180a 100644 --- a/eth-node/bridge/geth/wakuv2.go +++ b/eth-node/bridge/geth/wakuv2.go @@ -176,13 +176,14 @@ func (w *gethWakuV2Wrapper) SendMessagesRequest(peerID []byte, r types.MessagesR return errors.New("DEPRECATED") } -func (w *gethWakuV2Wrapper) RequestStoreMessages(ctx context.Context, peerID []byte, r types.MessagesRequest) (*types.StoreRequestCursor, error) { +func (w *gethWakuV2Wrapper) RequestStoreMessages(ctx context.Context, peerID []byte, r types.MessagesRequest, processEnvelopes bool) (*types.StoreRequestCursor, int, error) { var options []store.HistoryRequestOption peer, err := peer.Decode(string(peerID)) if err != nil { - return nil, err + return nil, 0, err } + options = []store.HistoryRequestOption{ store.WithPaging(false, uint64(r.Limit)), } @@ -201,9 +202,9 @@ func (w *gethWakuV2Wrapper) RequestStoreMessages(ctx context.Context, peerID []b contentTopics = append(contentTopics, wakucommon.BytesToTopic(topic)) } - pbCursor, err := w.waku.Query(ctx, peer, r.PubsubTopic, contentTopics, uint64(r.From), uint64(r.To), options) + pbCursor, envelopesCount, err := w.waku.Query(ctx, peer, r.PubsubTopic, contentTopics, uint64(r.From), uint64(r.To), options, processEnvelopes) if err != nil { - return nil, err + return nil, 0, err } if pbCursor != nil { @@ -212,10 +213,10 @@ func (w *gethWakuV2Wrapper) RequestStoreMessages(ctx context.Context, peerID []b ReceiverTime: pbCursor.ReceiverTime, SenderTime: pbCursor.SenderTime, PubsubTopic: pbCursor.PubsubTopic, - }, nil + }, envelopesCount, nil } - return nil, nil + return nil, envelopesCount, nil } // DEPRECATED: Not used in waku V2 diff --git a/eth-node/types/waku.go b/eth-node/types/waku.go index fd602aa19..1d737e513 100644 --- a/eth-node/types/waku.go +++ b/eth-node/types/waku.go @@ -151,7 +151,7 @@ type Waku interface { SendMessagesRequest(peerID []byte, request MessagesRequest) error // RequestStoreMessages uses the WAKU2-STORE protocol to request historic messages - RequestStoreMessages(ctx context.Context, peerID []byte, request MessagesRequest) (*StoreRequestCursor, error) + RequestStoreMessages(ctx context.Context, peerID []byte, request MessagesRequest, processEnvelopes bool) (*StoreRequestCursor, int, error) // ProcessingP2PMessages indicates whether there are in-flight p2p messages ProcessingP2PMessages() bool diff --git a/protocol/communities/community.go b/protocol/communities/community.go index a71a0efc2..92c445bcc 100644 --- a/protocol/communities/community.go +++ b/protocol/communities/community.go @@ -389,6 +389,13 @@ func (o *Community) Shard() *shard.Shard { return nil } +func (o *Community) CommunityShard() CommunityShard { + return CommunityShard{ + CommunityID: o.IDString(), + Shard: o.Shard(), + } +} + func (o *Community) IntroMessage() string { if o != nil && o.config != nil && diff --git a/protocol/communities/manager.go b/protocol/communities/manager.go index 66e2ec0a7..de68d5143 100644 --- a/protocol/communities/manager.go +++ b/protocol/communities/manager.go @@ -1519,6 +1519,8 @@ func (m *Manager) Queue(signer *ecdsa.PublicKey, community *Community, clock uin } func (m *Manager) HandleCommunityDescriptionMessage(signer *ecdsa.PublicKey, description *protobuf.CommunityDescription, payload []byte, verifiedOwner *ecdsa.PublicKey, communityShard *protobuf.Shard) (*CommunityResponse, error) { + m.logger.Debug("HandleCommunityDescriptionMessage", zap.String("communityID", description.ID)) + if signer == nil { return nil, errors.New("signer can't be nil") } diff --git a/protocol/messenger.go b/protocol/messenger.go index 28fb85303..af5efb08a 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -114,6 +114,7 @@ type Messenger struct { communitiesKeyDistributor CommunitiesKeyDistributor accountsManager account.Manager mentionsManager *MentionManager + storeNodeRequestsManager *StoreNodeRequestManager logger *zap.Logger outputCSV bool @@ -153,9 +154,6 @@ type Messenger struct { once sync.Once } - requestedCommunitiesLock sync.RWMutex - requestedCommunities map[string]*transport.Filter - requestedContactsLock sync.RWMutex requestedContacts map[string]*transport.Filter @@ -533,18 +531,16 @@ func NewMessenger( peers: make(map[string]peerStatus), availabilitySubscriptions: make([]chan struct{}, 0), }, - mailserversDatabase: c.mailserversDatabase, - account: c.account, - quit: make(chan struct{}), - ctx: ctx, - cancel: cancel, - requestedCommunitiesLock: sync.RWMutex{}, - requestedCommunities: make(map[string]*transport.Filter), - requestedContactsLock: sync.RWMutex{}, - requestedContacts: make(map[string]*transport.Filter), - importingCommunities: make(map[string]bool), - importingChannels: make(map[string]bool), - importRateLimiter: rate.NewLimiter(rate.Every(importSlowRate), 1), + mailserversDatabase: c.mailserversDatabase, + account: c.account, + quit: make(chan struct{}), + ctx: ctx, + cancel: cancel, + requestedContactsLock: sync.RWMutex{}, + requestedContacts: make(map[string]*transport.Filter), + importingCommunities: make(map[string]bool), + importingChannels: make(map[string]bool), + importRateLimiter: rate.NewLimiter(rate.Every(importSlowRate), 1), importDelayer: struct { wait chan struct{} once sync.Once @@ -587,6 +583,7 @@ func NewMessenger( } messenger.mentionsManager = NewMentionManager(messenger) + messenger.storeNodeRequestsManager = NewCommunityRequestsManager(messenger) if c.walletService != nil { messenger.walletAPI = walletAPI @@ -791,7 +788,6 @@ func (m *Messenger) Start() (*MessengerResponse, error) { m.handleCommunitiesSubscription(m.communitiesManager.Subscribe()) m.handleCommunitiesHistoryArchivesSubscription(m.communitiesManager.Subscribe()) m.updateCommunitiesActiveMembersPeriodically() - m.handleConnectionChange(m.online()) m.handleENSVerificationSubscription(ensSubscription) m.watchConnectionChange() m.watchChatsAndCommunitiesToUnmute() @@ -926,13 +922,6 @@ func (m *Messenger) handleConnectionChange(online bool) { } m.shouldPublishContactCode = false } - go func() { - _, err := m.RequestAllHistoricMessagesWithRetries(false) - if err != nil { - m.logger.Warn("failed to fetch historic messages", zap.Error(err)) - } - }() - } else { if m.pushNotificationClient != nil { m.pushNotificationClient.Offline() @@ -1457,6 +1446,7 @@ func (m *Messenger) handleENSVerificationSubscription(c chan []*ens.Verification func (m *Messenger) watchConnectionChange() { m.logger.Debug("watching connection changes") state := m.online() + m.handleConnectionChange(state) go func() { for { select { @@ -1870,6 +1860,9 @@ func (m *Messenger) Init() error { // Shutdown takes care of ensuring a clean shutdown of Messenger func (m *Messenger) Shutdown() (err error) { + if m == nil { + return nil + } close(m.quit) m.cancel() m.shutdownWaitGroup.Wait() diff --git a/protocol/messenger_communities.go b/protocol/messenger_communities.go index 02a6b823a..400c0c9b9 100644 --- a/protocol/messenger_communities.go +++ b/protocol/messenger_communities.go @@ -6,11 +6,9 @@ import ( "database/sql" "errors" "fmt" - "math" "sync" "time" - "golang.org/x/exp/maps" "golang.org/x/exp/slices" "golang.org/x/time/rate" @@ -73,6 +71,9 @@ func (r *FetchCommunityRequest) Validate() error { if len(r.CommunityKey) <= 2 { return fmt.Errorf("community key is too short") } + if _, err := types.DecodeHex(r.CommunityKey); err != nil { + return fmt.Errorf("invalid community key") + } return nil } @@ -2554,260 +2555,26 @@ func (m *Messenger) FetchCommunity(request *FetchCommunityRequest) (*communities } } - return m.requestCommunityInfoFromMailserver(communityID, request.Shard, request.WaitForResponse) + community, _, err := m.storeNodeRequestsManager.FetchCommunity(communities.CommunityShard{ + CommunityID: communityID, + Shard: request.Shard, + }, request.WaitForResponse) + + return community, err } -// requestCommunityInfoFromMailserver installs filter for community and requests its details -// from mailserver. When response received it will be passed through signals handler -func (m *Messenger) requestCommunityInfoFromMailserver(communityID string, shard *shard.Shard, waitForResponse bool) (*communities.Community, error) { - - m.logger.Info("requesting community info", zap.String("communityID", communityID), zap.Any("shard", shard)) - - m.requestedCommunitiesLock.Lock() - defer m.requestedCommunitiesLock.Unlock() - - if _, ok := m.requestedCommunities[communityID]; ok { - return nil, nil - } - - //If filter wasn't installed we create it and remember for deinstalling after - //response received - filter := m.transport.FilterByChatID(communityID) - if filter == nil { - filters, err := m.transport.InitPublicFilters([]transport.FiltersToInitialize{{ - ChatID: communityID, - PubsubTopic: shard.PubsubTopic(), - }}) - if err != nil { - return nil, fmt.Errorf("Can't install filter for community: %v", err) - } - if len(filters) != 1 { - return nil, fmt.Errorf("Unexpected amount of filters created") - } - filter = filters[0] - m.requestedCommunities[communityID] = filter - m.logger.Debug("created filter for community", - zap.String("filterID", filter.FilterID), - zap.String("communityID", communityID), - zap.String("PubsubTopic", filter.PubsubTopic), - ) - } else { - //we don't remember filter id associated with community because it was already installed - m.requestedCommunities[communityID] = nil - } - - defer m.forgetCommunityRequest(communityID) - - to := uint32(math.Ceil(float64(m.GetCurrentTimeInMillis()) / 1000)) - from := to - oneMonthInSeconds - - _, err := m.performMailserverRequest(func() (*MessengerResponse, error) { - batch := MailserverBatch{ - From: from, - To: to, - Topics: []types.TopicType{filter.ContentTopic}, - PubsubTopic: filter.PubsubTopic, - } - m.logger.Info("requesting historic", zap.Any("batch", batch)) - err := m.processMailserverBatch(batch) - return nil, err - }) - - if err != nil { - return nil, err - } - - m.logger.Info("mailserver request performed", - zap.String("communityID", communityID), - zap.Bool("waitForResponse", waitForResponse), - ) - - if !waitForResponse { - return nil, nil - } - - // TODO: We can force to process all messages then we don't have to wait? - //m.ProcessAllMessages() - - ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) - defer cancel() - - for { - select { - case <-time.After(200 * time.Millisecond): - //send signal to client that message status updated - community, err := m.communitiesManager.GetByIDString(communityID) - if err != nil { - return nil, err - } - if community != nil && community.Name() != "" && community.DescriptionText() != "" { - m.logger.Debug("community info found", - zap.String("communityID", communityID), - zap.String("displayName", community.Name())) - return community, nil - } - - case <-ctx.Done(): - m.logger.Error("failed to request community info", zap.String("communityID", communityID), zap.Error(ctx.Err())) - return nil, fmt.Errorf("failed to request community info for id '%s' from mailserver: %w", communityID, ctx.Err()) - } - } -} - -// requestCommunitiesFromMailserver installs filter for community and requests its details -// from mailserver. When response received it will be passed through signals handler -func (m *Messenger) requestCommunitiesFromMailserver(communities []communities.CommunityShard) { - m.requestedCommunitiesLock.Lock() - defer m.requestedCommunitiesLock.Unlock() - - // we group topics by PubsubTopic - groupedTopics := map[string]map[types.TopicType]struct{}{} - - for _, c := range communities { - if _, ok := m.requestedCommunities[c.CommunityID]; ok { - continue - } - - //If filter wasn't installed we create it and remember for deinstalling after - //response received - filter := m.transport.FilterByChatID(c.CommunityID) - if filter == nil { - filters, err := m.transport.InitPublicFilters([]transport.FiltersToInitialize{{ - ChatID: c.CommunityID, - PubsubTopic: c.Shard.PubsubTopic(), - }}) - if err != nil { - m.logger.Error("Can't install filter for community", zap.Error(err)) - continue - } - if len(filters) != 1 { - m.logger.Error("Unexpected amount of filters created") - continue - } - filter = filters[0] - m.requestedCommunities[c.CommunityID] = filter - } else { - //we don't remember filter id associated with community because it was already installed - m.requestedCommunities[c.CommunityID] = nil - } - - if _, ok := groupedTopics[filter.PubsubTopic]; !ok { - groupedTopics[filter.PubsubTopic] = map[types.TopicType]struct{}{} - } - - groupedTopics[filter.PubsubTopic][filter.ContentTopic] = struct{}{} - } - - defer func() { - for _, c := range communities { - m.forgetCommunityRequest(c.CommunityID) - } - }() - - to := uint32(m.transport.GetCurrentTime() / 1000) - from := to - oneMonthInSeconds - - wg := sync.WaitGroup{} - - for pubsubTopic, contentTopics := range groupedTopics { - wg.Add(1) - go func(pubsubTopic string, contentTopics map[types.TopicType]struct{}) { - batch := MailserverBatch{ - From: from, - To: to, - Topics: maps.Keys(contentTopics), - PubsubTopic: pubsubTopic, - } - _, err := m.performMailserverRequest(func() (*MessengerResponse, error) { - m.logger.Info("requesting historic", zap.Any("batch", batch)) - err := m.processMailserverBatch(batch) - return nil, err - }) - if err != nil { - m.logger.Error("error performing mailserver request", zap.Any("batch", batch), zap.Error(err)) - } - wg.Done() - }(pubsubTopic, contentTopics) - } - - wg.Wait() - - ctx := context.Background() - ctx, cancel := context.WithTimeout(ctx, 15*time.Second) - defer cancel() - - fetching := true - for fetching { - select { - case <-time.After(200 * time.Millisecond): - allLoaded := true - for _, c := range communities { - community, err := m.communitiesManager.GetByIDString(c.CommunityID) - if err != nil { - m.logger.Error("Error loading community", zap.Error(err)) - break - } - - if community == nil || community.Name() == "" || community.DescriptionText() == "" { - allLoaded = false - break - } - } - - if allLoaded { - fetching = false - } - - case <-ctx.Done(): - fetching = false - } - } - -} - -// forgetCommunityRequest removes community from requested ones and removes filter -func (m *Messenger) forgetCommunityRequest(communityID string) { - m.logger.Info("forgetting community request", zap.String("communityID", communityID)) - - filter, ok := m.requestedCommunities[communityID] - if !ok { - return - } - - if filter != nil { - err := m.transport.RemoveFilters([]*transport.Filter{filter}) - if err != nil { - m.logger.Warn("cant remove filter", zap.Error(err)) - } - } - - delete(m.requestedCommunities, communityID) +// fetchCommunities installs filter for community and requests its details from store node. +// When response received it will be passed through signals handler. +func (m *Messenger) fetchCommunities(communities []communities.CommunityShard) error { + return m.storeNodeRequestsManager.FetchCommunities(communities) } // passStoredCommunityInfoToSignalHandler calls signal handler with community info -func (m *Messenger) passStoredCommunityInfoToSignalHandler(communityID string) { +func (m *Messenger) passStoredCommunityInfoToSignalHandler(community *communities.Community) { if m.config.messengerSignalsHandler == nil { return } - - //send signal to client that message status updated - community, err := m.communitiesManager.GetByIDString(communityID) - if community == nil { - return - } - - if err != nil { - m.logger.Warn("cant get community and pass it to signal handler", zap.Error(err)) - return - } - - //if there is no info helpful for client, we don't post it - if community.Name() == "" && community.DescriptionText() == "" && community.MembersCount() == 0 { - return - } - m.config.messengerSignalsHandler.CommunityInfoFound(community) - m.forgetCommunityRequest(communityID) } // handleCommunityDescription handles an community description diff --git a/protocol/messenger_curated_communities.go b/protocol/messenger_curated_communities.go index ea772382c..52032009f 100644 --- a/protocol/messenger_curated_communities.go +++ b/protocol/messenger_curated_communities.go @@ -133,7 +133,9 @@ func (m *Messenger) fetchCuratedCommunities(curatedCommunities *communities.Cura }) } - go m.requestCommunitiesFromMailserver(unknownCommunities) + go func() { + _ = m.fetchCommunities(unknownCommunities) + }() return response, nil } diff --git a/protocol/messenger_handler.go b/protocol/messenger_handler.go index 353b62795..39703193c 100644 --- a/protocol/messenger_handler.go +++ b/protocol/messenger_handler.go @@ -3665,13 +3665,6 @@ func (m *Messenger) HandleCommunityDescription(state *ReceivedMessageState, mess m.logger.Warn("failed to handle CommunityDescription", zap.Error(err)) return err } - - //if community was among requested ones, send its info and remove filter - for communityID := range m.requestedCommunities { - if _, ok := state.Response.communities[communityID]; ok { - m.passStoredCommunityInfoToSignalHandler(communityID) - } - } return nil } diff --git a/protocol/messenger_mailserver.go b/protocol/messenger_mailserver.go index 059bd65d2..94df00020 100644 --- a/protocol/messenger_mailserver.go +++ b/protocol/messenger_mailserver.go @@ -20,6 +20,11 @@ import ( "github.com/status-im/status-go/services/mailservers" ) +const ( + initialStoreNodeRequestPageSize = 4 + defaultStoreNodeRequestPageSize = 20 +) + // tolerance is how many seconds of potentially out-of-order messages we want to fetch var tolerance uint32 = 60 @@ -621,6 +626,7 @@ type work struct { contentTopics []types.TopicType cursor []byte storeCursor *types.StoreRequestCursor + limit uint32 } type messageRequester interface { @@ -632,11 +638,23 @@ type messageRequester interface { previousStoreCursor *types.StoreRequestCursor, pubsubTopic string, contentTopics []types.TopicType, + limit uint32, waitForResponse bool, - ) (cursor []byte, storeCursor *types.StoreRequestCursor, err error) + processEnvelopes bool, + ) (cursor []byte, storeCursor *types.StoreRequestCursor, envelopesCount int, err error) } -func processMailserverBatch(ctx context.Context, messageRequester messageRequester, batch MailserverBatch, mailserverID []byte, logger *zap.Logger) error { +func processMailserverBatch( + ctx context.Context, + messageRequester messageRequester, + batch MailserverBatch, + mailserverID []byte, + logger *zap.Logger, + pageLimit uint32, + shouldProcessNextPage func(int) (bool, uint32), + processEnvelopes bool, +) error { + var topicStrings []string for _, t := range batch.Topics { topicStrings = append(topicStrings, t.String()) @@ -680,6 +698,7 @@ func processMailserverBatch(ctx context.Context, messageRequester messageRequest workCh <- work{ pubsubTopic: batch.PubsubTopic, contentTopics: batch.Topics[i:j], + limit: pageLimit, } time.Sleep(50 * time.Millisecond) } @@ -719,7 +738,7 @@ loop: }() queryCtx, queryCancel := context.WithTimeout(ctx, mailserverRequestTimeout) - cursor, storeCursor, err := messageRequester.SendMessagesRequestForTopics(queryCtx, mailserverID, batch.From, batch.To, w.cursor, w.storeCursor, w.pubsubTopic, w.contentTopics, true) + cursor, storeCursor, envelopesCount, err := messageRequester.SendMessagesRequestForTopics(queryCtx, mailserverID, batch.From, batch.To, w.cursor, w.storeCursor, w.pubsubTopic, w.contentTopics, w.limit, true, processEnvelopes) queryCancel() @@ -729,16 +748,32 @@ loop: return } - if len(cursor) != 0 || storeCursor != nil { - logger.Debug("processBatch producer - creating work (cursor)") + processNextPage := true + nextPageLimit := pageLimit - workWg.Add(1) - workCh <- work{ - pubsubTopic: w.pubsubTopic, - contentTopics: w.contentTopics, - cursor: cursor, - storeCursor: storeCursor, - } + if shouldProcessNextPage != nil { + processNextPage, nextPageLimit = shouldProcessNextPage(envelopesCount) + } + + if !processNextPage { + return + } + + // Check the cursor after calling `shouldProcessNextPage`. + // The app might use process the fetched envelopes in the callback for own needs. + if len(cursor) == 0 && storeCursor == nil { + return + } + + logger.Debug("processBatch producer - creating work (cursor)") + + workWg.Add(1) + workCh <- work{ + pubsubTopic: w.pubsubTopic, + contentTopics: w.contentTopics, + cursor: cursor, + storeCursor: storeCursor, + limit: nextPageLimit, } }(w) case err := <-errCh: @@ -768,7 +803,16 @@ func (m *Messenger) processMailserverBatch(batch MailserverBatch) error { return err } - return processMailserverBatch(m.ctx, m.transport, batch, mailserverID, m.logger) + return processMailserverBatch(m.ctx, m.transport, batch, mailserverID, m.logger, defaultStoreNodeRequestPageSize, nil, false) +} + +func (m *Messenger) processMailserverBatchWithOptions(batch MailserverBatch, pageLimit uint32, shouldProcessNextPage func(int) (bool, uint32), processEnvelopes bool) error { + mailserverID, err := m.activeMailserverID() + if err != nil { + return err + } + + return processMailserverBatch(m.ctx, m.transport, batch, mailserverID, m.logger, pageLimit, shouldProcessNextPage, processEnvelopes) } type MailserverBatch struct { diff --git a/protocol/messenger_mailserver_cycle.go b/protocol/messenger_mailserver_cycle.go index 9f00e287d..eddde0cfb 100644 --- a/protocol/messenger_mailserver_cycle.go +++ b/protocol/messenger_mailserver_cycle.go @@ -9,6 +9,7 @@ import ( "runtime" "sort" "strings" + "sync" "time" "github.com/pkg/errors" @@ -559,12 +560,11 @@ func (m *Messenger) handleMailserverCycleEvent(connectedPeers []ConnectedPeer) e } // Query mailserver go func() { - _, err := m.performMailserverRequest(func() (*MessengerResponse, error) { return m.RequestAllHistoricMessages(false) }) + _, err := m.RequestAllHistoricMessagesWithRetries(false) if err != nil { - m.logger.Error("could not perform mailserver request", zap.Error(err)) + m.logger.Error("failed to request historic messages", zap.Error(err)) } }() - } else { m.mailPeersMutex.Unlock() } @@ -741,3 +741,46 @@ func (m *Messenger) disconnectStorenodeIfRequired() error { return nil } + +func (m *Messenger) waitForAvailableStoreNode(timeout time.Duration) bool { + // Add 1 second to timeout, because the mailserver cycle has 1 second ticker, which doesn't tick on start. + // This can be improved after merging https://github.com/status-im/status-go/pull/4380. + // NOTE: https://stackoverflow.com/questions/32705582/how-to-get-time-tick-to-tick-immediately + timeout += time.Second + + finish := make(chan struct{}) + cancel := make(chan struct{}) + + wg := sync.WaitGroup{} + wg.Add(1) + + go func() { + defer func() { + wg.Done() + }() + for !m.isActiveMailserverAvailable() { + select { + case <-m.SubscribeMailserverAvailable(): + case <-cancel: + return + } + } + }() + + go func() { + defer func() { + close(finish) + }() + wg.Wait() + }() + + select { + case <-finish: + case <-time.After(timeout): + close(cancel) + case <-m.ctx.Done(): + close(cancel) + } + + return m.isActiveMailserverAvailable() +} diff --git a/protocol/messenger_mailserver_processMailserverBatch_test.go b/protocol/messenger_mailserver_processMailserverBatch_test.go index 76d0e06b5..c78ac6757 100644 --- a/protocol/messenger_mailserver_processMailserverBatch_test.go +++ b/protocol/messenger_mailserver_processMailserverBatch_test.go @@ -43,8 +43,10 @@ func (t *mockTransport) SendMessagesRequestForTopics( previousStoreCursor *types.StoreRequestCursor, pubsubTopic string, contentTopics []types.TopicType, + limit uint32, waitForResponse bool, -) (cursor []byte, storeCursor *types.StoreRequestCursor, err error) { + processEnvelopes bool, +) (cursor []byte, storeCursor *types.StoreRequestCursor, envelopesCount int, err error) { var response queryResponse if previousCursor == nil { initialResponse := getInitialResponseKey(contentTopics) @@ -52,7 +54,7 @@ func (t *mockTransport) SendMessagesRequestForTopics( } else { response = t.queryResponses[hex.EncodeToString(previousCursor)] } - return response.cursor, nil, response.err + return response.cursor, nil, 0, response.err } func (t *mockTransport) Populate(topics []types.TopicType, responses int, includeRandomError bool) error { @@ -130,7 +132,7 @@ func TestProcessMailserverBatchHappyPath(t *testing.T) { Topics: topics, } - err = processMailserverBatch(context.TODO(), testTransport, testBatch, mailserverID, logger) + err = processMailserverBatch(context.TODO(), testTransport, testBatch, mailserverID, logger, defaultStoreNodeRequestPageSize, nil, false) require.NoError(t, err) } @@ -151,6 +153,6 @@ func TestProcessMailserverBatchFailure(t *testing.T) { Topics: topics, } - err = processMailserverBatch(context.TODO(), testTransport, testBatch, mailserverID, logger) + err = processMailserverBatch(context.TODO(), testTransport, testBatch, mailserverID, logger, defaultStoreNodeRequestPageSize, nil, false) require.Error(t, err) } diff --git a/protocol/messenger_store_node_request_manager.go b/protocol/messenger_store_node_request_manager.go new file mode 100644 index 000000000..d85d868e3 --- /dev/null +++ b/protocol/messenger_store_node_request_manager.go @@ -0,0 +1,334 @@ +package protocol + +import ( + "fmt" + "math" + "sync" + "time" + + "go.uber.org/zap" + + "github.com/status-im/status-go/eth-node/types" + "github.com/status-im/status-go/protocol/communities" + "github.com/status-im/status-go/protocol/transport" +) + +const ( + storeNodeAvailableTimeout = 30 * time.Second +) + +// FetchCommunityStats is used in tests +type FetchCommunityStats struct { + FetchedEnvelopesCount int + FetchedPagesCount int +} + +type StoreNodeRequestManager struct { + messenger *Messenger + logger *zap.Logger + + // activeRequests contain all ongoing store node requests. + // Map is indexed with `CommunityID`. + // Request might be duplicated in the map if the request is for multiple communities. + activeRequests map[string]*storeNodeRequest + + // activeRequestsLock should be locked each time activeRequests is being accessed or changed. + activeRequestsLock sync.RWMutex + + onPerformingBatch func(MailserverBatch) +} + +func NewCommunityRequestsManager(m *Messenger) *StoreNodeRequestManager { + return &StoreNodeRequestManager{ + messenger: m, + logger: m.logger.Named("StoreNodeRequestManager"), + activeRequests: map[string]*storeNodeRequest{}, + activeRequestsLock: sync.RWMutex{}, + onPerformingBatch: nil, + } +} + +// FetchCommunity makes a single request to store node for a given community id/shard pair. +// When a community is successfully fetched, a `CommunityFound` event will be emitted. If `waitForResponse == true`, +// the function will also wait for the store node response and return the fetched community. +// Automatically waits for an available store node. +// When a `nil` community and `nil` error is returned, that means the community wasn't found at the store node. +func (m *StoreNodeRequestManager) FetchCommunity(community communities.CommunityShard, waitForResponse bool) (*communities.Community, FetchCommunityStats, error) { + m.logger.Info("requesting community from store node", + zap.Any("community", community), + zap.Bool("waitForResponse", waitForResponse)) + + channel, err := m.subscribeToCommunityRequest(community) + if err != nil { + return nil, FetchCommunityStats{}, fmt.Errorf("failed to create a request for community: %w", err) + } + + if !waitForResponse { + return nil, FetchCommunityStats{}, nil + } + + result := <-channel + return result.community, result.stats, result.err +} + +// FetchCommunities makes a FetchCommunity for each element in given `communities` list. +// For each successfully fetched community, a `CommunityFound` event will be emitted. Ability to subscribe +// to results is not provided, because it's not needed and would complicate the code. `FetchCommunity` can +// be called directly if such functionality is needed. +// +// This function intentionally doesn't fetch multiple content topics in a single store node request. For now +// FetchCommunities is only used for regular (once in 2 minutes) fetching of curated communities. If one of +// those content topics is spammed with to many envelopes, then on each iteration we will have to fetch all +// of this spam first to get the envelopes in other content topics. To avoid this we keep independent requests +// for each content topic. +func (m *StoreNodeRequestManager) FetchCommunities(communities []communities.CommunityShard) error { + m.logger.Info("requesting communities from store node", zap.Any("communities", communities)) + + var outErr error + + for _, community := range communities { + _, _, err := m.FetchCommunity(community, false) + if err != nil { + outErr = fmt.Errorf("%sfailed to create a request for community %s: %w", outErr, community.CommunityID, err) + } + } + + return outErr +} + +// subscribeToCommunityRequest checks if a request for given community is already in progress, creates and installs +// a new one if not found, and returns a subscription to the result of the found/started request. +// The subscription can then be used to get the result of the request, this could be either a community or an error. +func (m *StoreNodeRequestManager) subscribeToCommunityRequest(community communities.CommunityShard) (communitySubscriptionChannel, error) { + // It's important to unlock only after getting the subscription channel. + // We also lock `activeRequestsLock` during finalizing the requests. This ensures that the subscription + // created in this function will get the result even if the requests proceeds faster than this function ends. + m.activeRequestsLock.Lock() + defer m.activeRequestsLock.Unlock() + + request, requestFound := m.activeRequests[community.CommunityID] + + if !requestFound { + // Create corresponding filter + filter, filterCreated, err := m.getFilter(community) + if err != nil { + return nil, fmt.Errorf("failed to create community filter: %w", err) + } + + request = m.newStoreNodeRequest() + request.pubsubTopic = filter.PubsubTopic + request.communityID = community.CommunityID + request.contentTopic = filter.ContentTopic + if filterCreated { + request.filterToForget = filter + } + + m.activeRequests[community.CommunityID] = request + request.start() + } + + return request.subscribe(), nil +} + +// newStoreNodeRequest creates a new storeNodeRequest struct +func (m *StoreNodeRequestManager) newStoreNodeRequest() *storeNodeRequest { + return &storeNodeRequest{ + manager: m, + subscriptions: make([]communitySubscriptionChannel, 0), + } +} + +// getFilter checks if a filter for a given community is already created and creates one of not found. +// Returns the found/created filter, a flag if the filter was created by the function and an error. +func (m *StoreNodeRequestManager) getFilter(c communities.CommunityShard) (*transport.Filter, bool, error) { + // First check if such filter already exists. + filter := m.messenger.transport.FilterByChatID(c.CommunityID) + if filter != nil { + //we don't remember filter id associated with community because it was already installed + return filter, false, nil + } + + // If filter wasn't installed we create it and + // remember for uninstalling after response is received + filters, err := m.messenger.transport.InitPublicFilters([]transport.FiltersToInitialize{{ + ChatID: c.CommunityID, + PubsubTopic: c.Shard.PubsubTopic(), + }}) + + if err != nil { + m.logger.Error("can't install filter for community", zap.Error(err)) + return nil, true, err + } + + if len(filters) != 1 { + m.logger.Error("Unexpected number of filters created") + return nil, true, fmt.Errorf("unexepcted number of filters created") + } + + return filters[0], true, nil +} + +// forgetFilter uninstalls the given filter +func (m *StoreNodeRequestManager) forgetFilter(filter *transport.Filter) { + err := m.messenger.transport.RemoveFilters([]*transport.Filter{filter}) + if err != nil { + m.logger.Warn("failed to remove filter", zap.Error(err)) + } +} + +// storeNodeRequest represents a single store node batch request. +// For a valid storeNodeRequest to be performed, the user must set all the struct fields and call start method. +type storeNodeRequest struct { + // request parameters + pubsubTopic string + contentTopic types.TopicType + + // request corresponding metadata to be used in finalize + filterToForget *transport.Filter + communityID string + + // internal fields + manager *StoreNodeRequestManager + subscriptions []communitySubscriptionChannel + result fetchCommunityResult +} + +// fetchCommunityResult contains result of a single storeNodeRequest +// If any error occurs during the request, err field will be set. +// If a community was successfully fetched, community field will contain the fetched information. +// If a community wasn't found in store node, then a community will be set to `nil`. +// stats will contain information about the performed request that might be useful for testing. +type fetchCommunityResult struct { + err error + community *communities.Community + stats FetchCommunityStats +} + +type communitySubscriptionChannel = chan fetchCommunityResult + +func (r *storeNodeRequest) subscribe() communitySubscriptionChannel { + channel := make(communitySubscriptionChannel, 100) + r.subscriptions = append(r.subscriptions, channel) + return channel +} + +func (r *storeNodeRequest) finalize() { + r.manager.activeRequestsLock.Lock() + defer r.manager.activeRequestsLock.Unlock() + + r.manager.logger.Info("request finished", + zap.String("communityID", r.communityID), + zap.Bool("communityFound", r.result.community != nil), + zap.Error(r.result.err)) + + // Send the result to subscribers + // It's important that this is done with `activeRequestsLock` locked. + for _, s := range r.subscriptions { + s <- r.result + close(s) + } + + if r.result.community != nil { + r.manager.messenger.passStoredCommunityInfoToSignalHandler(r.result.community) + } + + delete(r.manager.activeRequests, r.communityID) + + if r.filterToForget != nil { + r.manager.forgetFilter(r.filterToForget) + } +} + +func (r *storeNodeRequest) shouldFetchNextPage(envelopesCount int) (bool, uint32) { + logger := r.manager.logger.With( + zap.String("communityID", r.communityID), + zap.Int("envelopesCount", envelopesCount)) + + r.result.stats.FetchedEnvelopesCount += envelopesCount + r.result.stats.FetchedPagesCount++ + + // Force all received envelopes to be processed + r.manager.messenger.ProcessAllMessages() + + // Try to get community from database + community, err := r.manager.messenger.communitiesManager.GetByIDString(r.communityID) + + if err != nil { + logger.Error("failed to read from database", + zap.String("communityID", r.communityID), + zap.Error(err)) + r.result = fetchCommunityResult{ + community: nil, + err: fmt.Errorf("failed to read from database: %w", err), + } + return false, 0 // failed to read from database, no sense to continue the procedure + } + + if community == nil { + // community not found in the database, request next page + logger.Debug("community still not fetched") + return true, defaultStoreNodeRequestPageSize + } + + logger.Debug("community found", + zap.String("displayName", community.Name())) + + r.result.community = community + + return false, 0 +} + +func (r *storeNodeRequest) routine() { + r.manager.logger.Info("starting store node request", + zap.String("communityID", r.communityID), + zap.String("pubsubTopic", r.pubsubTopic), + zap.Any("contentTopic", r.contentTopic), + ) + + // Return a nil community and no error when request was + // performed successfully, but no community found. + r.result = fetchCommunityResult{ + community: nil, + err: nil, + } + + defer func() { + r.finalize() + }() + + if !r.manager.messenger.waitForAvailableStoreNode(storeNodeAvailableTimeout) { + r.result.community = nil + r.result.err = fmt.Errorf("store node is not available") + return + } + + to := uint32(math.Ceil(float64(r.manager.messenger.GetCurrentTimeInMillis()) / 1000)) + from := to - oneMonthInSeconds + + _, err := r.manager.messenger.performMailserverRequest(func() (*MessengerResponse, error) { + batch := MailserverBatch{ + From: from, + To: to, + PubsubTopic: r.pubsubTopic, + Topics: []types.TopicType{r.contentTopic}, + } + r.manager.logger.Info("perform store node request", zap.Any("batch", batch)) + if r.manager.onPerformingBatch != nil { + r.manager.onPerformingBatch(batch) + } + + return nil, r.manager.messenger.processMailserverBatchWithOptions(batch, initialStoreNodeRequestPageSize, r.shouldFetchNextPage, true) + }) + + r.result.err = err +} + +func (r *storeNodeRequest) start() { + r.manager.logger.Debug("starting new community request", + zap.Any("communities", r.communityID), + zap.String("pubsubTopic", r.pubsubTopic), + zap.Any("contentTopic", r.contentTopic), + ) + + go r.routine() +} diff --git a/protocol/messenger_storenode_request_test.go b/protocol/messenger_storenode_request_test.go index 57c88eee3..a262386ef 100644 --- a/protocol/messenger_storenode_request_test.go +++ b/protocol/messenger_storenode_request_test.go @@ -1,20 +1,25 @@ package protocol import ( + "context" + "sync" "testing" "time" + gethbridge "github.com/status-im/status-go/eth-node/bridge/geth" + "github.com/status-im/status-go/protocol/common" + "github.com/status-im/status-go/protocol/communities" + "github.com/status-im/status-go/protocol/tt" + "github.com/stretchr/testify/suite" "go.uber.org/zap" "github.com/ethereum/go-ethereum/crypto" "github.com/status-im/status-go/appdatabase" - gethbridge "github.com/status-im/status-go/eth-node/bridge/geth" "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/params" "github.com/status-im/status-go/protocol/protobuf" "github.com/status-im/status-go/protocol/requests" - "github.com/status-im/status-go/protocol/tt" "github.com/status-im/status-go/t/helpers" mailserversDB "github.com/status-im/status-go/services/mailservers" @@ -22,8 +27,10 @@ import ( ) const ( - localFleet = "local-test-fleet-1" - localMailserverID = "local-test-mailserver" + localFleet = "local-test-fleet-1" + localMailserverID = "local-test-mailserver" + storeNodeConnectTimeout = 500 * time.Millisecond + runLocalTests = false ) func TestMessengerStoreNodeRequestSuite(t *testing.T) { @@ -33,16 +40,68 @@ func TestMessengerStoreNodeRequestSuite(t *testing.T) { type MessengerStoreNodeRequestSuite struct { suite.Suite + cancel chan struct{} + owner *Messenger bob *Messenger - wakuStoreNode *waku2.Waku - ownerWaku types.Waku - bobWaku types.Waku + wakuStoreNode *waku2.Waku + storeNodeAddress string + + ownerWaku types.Waku + bobWaku types.Waku logger *zap.Logger } +func (s *MessengerStoreNodeRequestSuite) SetupTest() { + cfg := zap.NewDevelopmentConfig() + cfg.Level = zap.NewAtomicLevelAt(zap.DebugLevel) + cfg.Development = false + cfg.DisableStacktrace = true + s.logger = tt.MustCreateTestLoggerWithConfig(cfg) + + s.cancel = make(chan struct{}, 10) + + storeNodeLogger := s.logger.With(zap.String("name", "store-node-waku")) + s.wakuStoreNode = NewWakuV2(&s.Suite, storeNodeLogger, true, true) + + storeNodeListenAddresses := s.wakuStoreNode.ListenAddresses() + s.Require().LessOrEqual(1, len(storeNodeListenAddresses)) + + s.storeNodeAddress = storeNodeListenAddresses[0] + s.logger.Info("store node ready", zap.String("address", s.storeNodeAddress)) +} + +func (s *MessengerStoreNodeRequestSuite) TearDown() { + close(s.cancel) + s.wakuStoreNode.Stop() // nolint: errcheck + s.owner.Shutdown() // nolint: errcheck + s.bob.Shutdown() // nolint: errcheck +} + +func (s *MessengerStoreNodeRequestSuite) createOwner() { + wakuLogger := s.logger.Named("owner-waku-node") + wakuV2 := NewWakuV2(&s.Suite, wakuLogger, true, false) + s.ownerWaku = gethbridge.NewGethWakuV2Wrapper(wakuV2) + + messengerLogger := s.logger.Named("owner-messenger") + s.owner = s.newMessenger(s.ownerWaku, messengerLogger, s.storeNodeAddress) + + // We force the owner to use the store node as relay peer + err := s.owner.DialPeer(s.storeNodeAddress) + s.Require().NoError(err) +} + +func (s *MessengerStoreNodeRequestSuite) createBob() { + wakuLogger := s.logger.Named("bob-waku-node") + wakuV2 := NewWakuV2(&s.Suite, wakuLogger, true, false) + s.bobWaku = gethbridge.NewGethWakuV2Wrapper(wakuV2) + + messengerLogger := s.logger.Named("bob-messenger") + s.bob = s.newMessenger(s.bobWaku, messengerLogger, s.storeNodeAddress) +} + func (s *MessengerStoreNodeRequestSuite) newMessenger(shh types.Waku, logger *zap.Logger, mailserverAddress string) *Messenger { privateKey, err := crypto.GenerateKey() s.Require().NoError(err) @@ -72,51 +131,14 @@ func (s *MessengerStoreNodeRequestSuite) newMessenger(shh types.Waku, logger *za return messenger } -func (s *MessengerStoreNodeRequestSuite) SetupTest() { - s.logger = tt.MustCreateTestLogger() - - // Create store node - - storeNodeLogger := s.logger.With(zap.String("name", "store-node-waku")) - s.wakuStoreNode = NewWakuV2(&s.Suite, storeNodeLogger, true, true) - - storeNodeListenAddresses := s.wakuStoreNode.ListenAddresses() - s.Require().LessOrEqual(1, len(storeNodeListenAddresses)) - - storeNodeAddress := storeNodeListenAddresses[0] - s.logger.Info("store node ready", zap.String("address", storeNodeAddress)) - - // Create community owner - - ownerWakuLogger := s.logger.With(zap.String("name", "owner-waku")) - s.ownerWaku = gethbridge.NewGethWakuV2Wrapper(NewWakuV2(&s.Suite, ownerWakuLogger, true, false)) - - ownerLogger := s.logger.With(zap.String("name", "owner")) - s.owner = s.newMessenger(s.ownerWaku, ownerLogger, storeNodeAddress) - - // Create an independent user - - bobWakuLogger := s.logger.With(zap.String("name", "owner-waku")) - s.bobWaku = gethbridge.NewGethWakuV2Wrapper(NewWakuV2(&s.Suite, bobWakuLogger, true, false)) - - bobLogger := s.logger.With(zap.String("name", "bob")) - s.bob = s.newMessenger(s.bobWaku, bobLogger, storeNodeAddress) - s.bob.StartRetrieveMessagesLoop(time.Second, nil) - - // Connect owner to storenode so message is stored - err := s.ownerWaku.DialPeer(storeNodeAddress) - s.Require().NoError(err) -} - -func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityInfo() { - WaitForAvailableStoreNode(&s.Suite, s.owner, time.Second) +func (s *MessengerStoreNodeRequestSuite) createCommunity(m *Messenger) *communities.Community { createCommunityRequest := &requests.CreateCommunity{ - Name: "panda-lovers", - Description: "we love pandas", + Name: RandomLettersString(10), + Description: RandomLettersString(20), + Color: RandomColor(), + Tags: RandomCommunityTags(3), Membership: protobuf.CommunityPermissions_AUTO_ACCEPT, - Color: "#ff0000", - Tags: []string{"Web3"}, } response, err := s.owner.CreateCommunity(createCommunityRequest, true) @@ -124,20 +146,295 @@ func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityInfo() { s.Require().NotNil(response) s.Require().Len(response.Communities(), 1) - community := response.Communities()[0] - communityID := community.IDString() + return response.Communities()[0] +} - WaitForAvailableStoreNode(&s.Suite, s.bob, time.Second) +func (s *MessengerStoreNodeRequestSuite) requireCommunitiesEqual(fetchedCommunity *communities.Community, expectedCommunityInfo *communities.Community) { + s.Require().Equal(expectedCommunityInfo.Name(), fetchedCommunity.Name()) + s.Require().Equal(expectedCommunityInfo.Identity().Description, fetchedCommunity.Identity().Description) + s.Require().Equal(expectedCommunityInfo.Color(), fetchedCommunity.Color()) + s.Require().Equal(expectedCommunityInfo.Tags(), fetchedCommunity.Tags()) +} +func (s *MessengerStoreNodeRequestSuite) fetchCommunity(m *Messenger, communityShard communities.CommunityShard, expectedCommunityInfo *communities.Community) FetchCommunityStats { + fetchedCommunity, stats, err := m.storeNodeRequestsManager.FetchCommunity(communityShard, true) + + s.Require().NoError(err) + s.Require().NotNil(fetchedCommunity) + s.Require().Equal(communityShard.CommunityID, fetchedCommunity.IDString()) + + if expectedCommunityInfo != nil { + s.requireCommunitiesEqual(fetchedCommunity, expectedCommunityInfo) + } + + return stats +} + +func (s *MessengerStoreNodeRequestSuite) waitForAvailableStoreNode(messenger *Messenger) { + WaitForAvailableStoreNode(&s.Suite, messenger, storeNodeConnectTimeout) +} + +func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityInfo() { + s.createOwner() + s.createBob() + + s.waitForAvailableStoreNode(s.owner) + community := s.createCommunity(s.owner) + + s.waitForAvailableStoreNode(s.bob) + s.fetchCommunity(s.bob, community.CommunityShard(), community) +} + +func (s *MessengerStoreNodeRequestSuite) TestConsecutiveRequests() { + s.createOwner() + s.createBob() + + s.waitForAvailableStoreNode(s.owner) + community := s.createCommunity(s.owner) + + // Test consecutive requests to check that requests in manager are finalized + for i := 0; i < 2; i++ { + s.waitForAvailableStoreNode(s.bob) + s.fetchCommunity(s.bob, community.CommunityShard(), community) + } +} + +func (s *MessengerStoreNodeRequestSuite) TestSimultaneousCommunityInfoRequests() { + s.createOwner() + s.createBob() + + s.waitForAvailableStoreNode(s.owner) + community := s.createCommunity(s.owner) + + storeNodeRequestsCount := 0 + s.bob.storeNodeRequestsManager.onPerformingBatch = func(batch MailserverBatch) { + storeNodeRequestsCount++ + } + + s.waitForAvailableStoreNode(s.bob) + + wg := sync.WaitGroup{} + + // Make 2 simultaneous fetch requests, only 1 request to store node is expected + for i := 0; i < 2; i++ { + wg.Add(1) + go func() { + defer wg.Done() + s.fetchCommunity(s.bob, community.CommunityShard(), community) + }() + } + + wg.Wait() + s.Require().Equal(1, storeNodeRequestsCount) +} + +func (s *MessengerStoreNodeRequestSuite) TestRequestNonExistentCommunity() { + // On test start store node database is empty, so just request any valid community ID. request := FetchCommunityRequest{ - CommunityKey: communityID, + CommunityKey: "0x036dc11a0663f88e15912f0adb68c3c5f68ca0ca7a233f1a88ff923a3d39b2cf07", Shard: nil, TryDatabase: false, WaitForResponse: true, } + s.createBob() + + s.waitForAvailableStoreNode(s.bob) fetchedCommunity, err := s.bob.FetchCommunity(&request) + + s.Require().NoError(err) + s.Require().Nil(fetchedCommunity) +} + +func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityInfoWithStoreNodeDisconnected() { + s.createOwner() + s.createBob() + + s.waitForAvailableStoreNode(s.owner) + community := s.createCommunity(s.owner) + + // WaitForAvailableStoreNode is done internally + s.fetchCommunity(s.bob, community.CommunityShard(), community) +} + +// This test is intended to only run locally to test how fast is a big community fetched +// Shouldn't be executed in CI, because it relies on connection to status.prod store nodes. +func (s *MessengerStoreNodeRequestSuite) TestRequestBigCommunity() { + if !runLocalTests { + return + } + + // Status CC community + const communityID = "0x03073514d4c14a7d10ae9fc9b0f05abc904d84166a6ac80add58bf6a3542a4e50a" + + communityShard := communities.CommunityShard{ + CommunityID: communityID, + Shard: nil, + } + + wakuLogger := s.logger.Named("user-waku-node") + messengerLogger := s.logger.Named("user-messenger") + + wakuV2 := NewWakuV2(&s.Suite, wakuLogger, true, false) + userWaku := gethbridge.NewGethWakuV2Wrapper(wakuV2) + + privateKey, err := crypto.GenerateKey() + s.Require().NoError(err) + + mailserversSQLDb, err := helpers.SetupTestMemorySQLDB(appdatabase.DbInitializer{}) + s.Require().NoError(err) + + mailserversDatabase := mailserversDB.NewDB(mailserversSQLDb) + s.Require().NoError(err) + + options := []Option{ + WithMailserversDatabase(mailserversDatabase), + WithClusterConfig(params.ClusterConfig{ + Fleet: params.FleetStatusProd, + }), + } + + // Create bob without `createBob` without func to force status.prod fleet + s.bob, err = newMessengerWithKey(userWaku, privateKey, messengerLogger, options) + s.Require().NoError(err) + + fetchedCommunity, stats, err := s.bob.storeNodeRequestsManager.FetchCommunity(communityShard, true) + s.Require().NoError(err) s.Require().NotNil(fetchedCommunity) s.Require().Equal(communityID, fetchedCommunity.IDString()) + + s.Require().Equal(initialStoreNodeRequestPageSize, stats.FetchedEnvelopesCount) + s.Require().Equal(1, stats.FetchedPagesCount) +} + +func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityPagingAlgorithm() { + s.createOwner() + s.createBob() + + // Create a community + s.waitForAvailableStoreNode(s.owner) + community := s.createCommunity(s.owner) + + // Push spam to the same ContentTopic & PubsubTopic + // The first requested page size is 1. All subsequent pages are limited to 20. + // We want to test the algorithm, so we push 21 spam envelopes. + for i := 0; i < defaultStoreNodeRequestPageSize+initialStoreNodeRequestPageSize; i++ { + spamMessage := common.RawMessage{ + Payload: RandomBytes(16), + Sender: community.PrivateKey(), + SkipEncryptionLayer: true, + MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_DESCRIPTION, + PubsubTopic: community.PubsubTopic(), + } + _, err := s.owner.sender.SendPublic(context.Background(), community.IDString(), spamMessage) + s.Require().NoError(err) + } + + // Fetch the community + stats := s.fetchCommunity(s.bob, community.CommunityShard(), community) + + // Expect 3 pages and 23 (24 spam + 1 community description + 1 general channel description) envelopes to be fetched. + // First we fetch a more up-to-date, but an invalid spam message, fail to decrypt it as community description, + // then we fetch another page of data and successfully decrypt a community description. + s.Require().Equal(defaultStoreNodeRequestPageSize+initialStoreNodeRequestPageSize+2, stats.FetchedEnvelopesCount) + s.Require().Equal(3, stats.FetchedPagesCount) +} + +func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityWithSameContentTopic() { + s.createOwner() + s.createBob() + + // Create a community + s.waitForAvailableStoreNode(s.owner) + community1 := s.createCommunity(s.owner) + community2 := s.createCommunity(s.owner) + + description2, err := community2.MarshaledDescription() + s.Require().NoError(err) + + // Push community2 description to the same ContentTopic & PubsubTopic as community1. + // This way we simulate 2 communities with same ContentTopic. + spamMessage := common.RawMessage{ + Payload: description2, + Sender: community2.PrivateKey(), + SkipEncryptionLayer: true, + MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_DESCRIPTION, + PubsubTopic: community1.PubsubTopic(), + } + _, err = s.owner.sender.SendPublic(context.Background(), community1.IDString(), spamMessage) + s.Require().NoError(err) + + // Fetch the community + s.fetchCommunity(s.bob, community1.CommunityShard(), community1) +} + +func (s *MessengerStoreNodeRequestSuite) TestRequestMultipleCommunities() { + s.createOwner() + s.createBob() + + // Create 2 communities + s.waitForAvailableStoreNode(s.owner) + community1 := s.createCommunity(s.owner) + community2 := s.createCommunity(s.owner) + + fetchedCommunities := map[string]*communities.Community{} + + err := WaitOnSignaledCommunityFound(s.bob, + func() { + err := s.bob.fetchCommunities([]communities.CommunityShard{ + community1.CommunityShard(), + community2.CommunityShard(), + }) + s.Require().NoError(err) + }, + func(community *communities.Community) bool { + fetchedCommunities[community.IDString()] = community + return len(fetchedCommunities) == 2 + }, + 1*time.Second, + "communities were not signalled in time", + ) + + s.Require().NoError(err) + s.Require().Contains(fetchedCommunities, community1.IDString()) + s.Require().Contains(fetchedCommunities, community2.IDString()) +} + +func (s *MessengerStoreNodeRequestSuite) TestRequestWithoutWaitingResponse() { + s.createOwner() + s.createBob() + + // Create a community + s.waitForAvailableStoreNode(s.owner) + community := s.createCommunity(s.owner) + + request := FetchCommunityRequest{ + CommunityKey: community.IDString(), + Shard: nil, + TryDatabase: false, + WaitForResponse: false, + } + + fetchedCommunities := map[string]*communities.Community{} + + err := WaitOnSignaledCommunityFound(s.bob, + func() { + fetchedCommunity, err := s.bob.FetchCommunity(&request) + s.Require().NoError(err) + s.Require().Nil(fetchedCommunity) + }, + func(community *communities.Community) bool { + fetchedCommunities[community.IDString()] = community + return len(fetchedCommunities) == 1 + }, + 1*time.Second, + "communities weren't signalled", + ) + + s.Require().NoError(err) + s.Require().Len(fetchedCommunities, 1) + s.Require().Contains(fetchedCommunities, community.IDString()) + + s.requireCommunitiesEqual(fetchedCommunities[community.IDString()], community) } diff --git a/protocol/messenger_testing_utils.go b/protocol/messenger_testing_utils.go index 9623c92df..c5381db18 100644 --- a/protocol/messenger_testing_utils.go +++ b/protocol/messenger_testing_utils.go @@ -2,12 +2,16 @@ package protocol import ( "context" + "crypto/rand" "database/sql" "errors" + "math/big" "os" "sync" "time" + "golang.org/x/exp/maps" + "go.uber.org/zap" "github.com/stretchr/testify/suite" @@ -18,7 +22,9 @@ import ( gethbridge "github.com/status-im/status-go/eth-node/bridge/geth" "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/protocol/common" + "github.com/status-im/status-go/protocol/communities" "github.com/status-im/status-go/protocol/protobuf" + "github.com/status-im/status-go/protocol/requests" "github.com/status-im/status-go/protocol/tt" "github.com/status-im/status-go/t/helpers" waku2 "github.com/status-im/status-go/wakuv2" @@ -26,6 +32,9 @@ import ( const testENRBootstrap = "enrtree://AL65EKLJAUXKKPG43HVTML5EFFWEZ7L4LOKTLZCLJASG4DSESQZEC@prod.status.nodes.status.im" +var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") +var hexRunes = []rune("0123456789abcdef") + // WaitOnMessengerResponse Wait until the condition is true or the timeout is reached. func WaitOnMessengerResponse(m *Messenger, condition func(*MessengerResponse) bool, errorMessage string) (*MessengerResponse, error) { response := &MessengerResponse{} @@ -51,7 +60,8 @@ func WaitOnMessengerResponse(m *Messenger, condition func(*MessengerResponse) bo type MessengerSignalsHandlerMock struct { MessengerSignalsHandler - responseChan chan *MessengerResponse + responseChan chan *MessengerResponse + communityFoundChan chan *communities.Community } func (m *MessengerSignalsHandlerMock) MessengerResponse(response *MessengerResponse) { @@ -64,6 +74,13 @@ func (m *MessengerSignalsHandlerMock) MessengerResponse(response *MessengerRespo func (m *MessengerSignalsHandlerMock) MessageDelivered(chatID string, messageID string) {} +func (m *MessengerSignalsHandlerMock) CommunityInfoFound(community *communities.Community) { + select { + case m.communityFoundChan <- community: + default: + } +} + func WaitOnSignaledMessengerResponse(m *Messenger, condition func(*MessengerResponse) bool, errorMessage string) (*MessengerResponse, error) { interval := 500 * time.Millisecond timeoutChan := time.After(10 * time.Second) @@ -103,6 +120,38 @@ func WaitOnSignaledMessengerResponse(m *Messenger, condition func(*MessengerResp } } +func WaitOnSignaledCommunityFound(m *Messenger, action func(), condition func(community *communities.Community) bool, timeout time.Duration, errorMessage string) error { + timeoutChan := time.After(timeout) + + if m.config.messengerSignalsHandler != nil { + return errors.New("messengerSignalsHandler already provided/mocked") + } + + communityFoundChan := make(chan *communities.Community, 1) + m.config.messengerSignalsHandler = &MessengerSignalsHandlerMock{ + communityFoundChan: communityFoundChan, + } + + defer func() { + m.config.messengerSignalsHandler = nil + }() + + // Call the action after setting up the mock + action() + + // Wait for condition after + for { + select { + case c := <-communityFoundChan: + if condition(c) { + return nil + } + case <-timeoutChan: + return errors.New("timed out: " + errorMessage) + } + } +} + func FindFirstByContentType(messages []*common.Message, contentType protobuf.ChatMessage_ContentType) *common.Message { for _, message := range messages { if message.ContentType == contentType { @@ -200,39 +249,8 @@ func SetIdentityImagesAndWaitForChange(s *suite.Suite, messenger *Messenger, tim } func WaitForAvailableStoreNode(s *suite.Suite, m *Messenger, timeout time.Duration) { - finish := make(chan struct{}) - cancel := make(chan struct{}) - - wg := sync.WaitGroup{} - wg.Add(1) - - go func() { - defer func() { - wg.Done() - }() - for !m.isActiveMailserverAvailable() { - select { - case <-m.SubscribeMailserverAvailable(): - case <-cancel: - return - } - } - }() - - go func() { - defer func() { - close(finish) - }() - wg.Wait() - }() - - select { - case <-finish: - case <-time.After(timeout): - close(cancel) - } - - s.Require().True(m.isActiveMailserverAvailable()) + available := m.waitForAvailableStoreNode(timeout) + s.Require().True(available) } func NewWakuV2(s *suite.Suite, logger *zap.Logger, useLocalWaku bool, enableStore bool) *waku2.Waku { @@ -333,3 +351,54 @@ func TearDownMessenger(s *suite.Suite, m *Messenger) { s.Require().NoError(m.multiAccounts.Close()) } } + +func randomInt(length int) int { + max := big.NewInt(int64(length)) + value, err := rand.Int(rand.Reader, max) + if err != nil { + panic(err) + } + return int(value.Int64()) +} + +func randomString(length int, runes []rune) string { + out := make([]rune, length) + for i := range out { + out[i] = runes[randomInt(len(runes))] // nolint: gosec + } + return string(out) +} + +func RandomLettersString(length int) string { + return randomString(length, letterRunes) +} + +func RandomColor() string { + return "#" + randomString(6, hexRunes) +} + +func RandomCommunityTags(count int) []string { + all := maps.Keys(requests.TagsEmojies) + tags := make([]string, 0, count) + indexes := map[int]struct{}{} + + for len(indexes) != count { + index := randomInt(len(all)) + indexes[index] = struct{}{} + } + + for index := range indexes { + tags = append(tags, all[index]) + } + + return tags +} + +func RandomBytes(length int) []byte { + out := make([]byte, length) + _, err := rand.Read(out) + if err != nil { + panic(err) + } + return out +} diff --git a/protocol/transport/transport.go b/protocol/transport/transport.go index 76495ea09..0fe6fa0f0 100644 --- a/protocol/transport/transport.go +++ b/protocol/transport/transport.go @@ -469,7 +469,7 @@ func (t *Transport) createMessagesRequestV1( topics []types.TopicType, waitForResponse bool, ) (cursor []byte, err error) { - r := createMessagesRequest(from, to, previousCursor, nil, "", topics) + r := createMessagesRequest(from, to, previousCursor, nil, "", topics, 1000) events := make(chan types.EnvelopeEvent, 10) sub := t.waku.SubscribeEnvelopeEvents(events) @@ -502,33 +502,37 @@ func (t *Transport) createMessagesRequestV2( previousStoreCursor *types.StoreRequestCursor, pubsubTopic string, contentTopics []types.TopicType, + limit uint32, waitForResponse bool, -) (storeCursor *types.StoreRequestCursor, err error) { - r := createMessagesRequest(from, to, nil, previousStoreCursor, pubsubTopic, contentTopics) + processEnvelopes bool, +) (storeCursor *types.StoreRequestCursor, envelopesCount int, err error) { + r := createMessagesRequest(from, to, nil, previousStoreCursor, pubsubTopic, contentTopics, limit) if waitForResponse { resultCh := make(chan struct { - storeCursor *types.StoreRequestCursor - err error + storeCursor *types.StoreRequestCursor + envelopesCount int + err error }) go func() { - storeCursor, err = t.waku.RequestStoreMessages(ctx, peerID, r) + storeCursor, envelopesCount, err = t.waku.RequestStoreMessages(ctx, peerID, r, processEnvelopes) resultCh <- struct { - storeCursor *types.StoreRequestCursor - err error - }{storeCursor, err} + storeCursor *types.StoreRequestCursor + envelopesCount int + err error + }{storeCursor, envelopesCount, err} }() select { case result := <-resultCh: - return result.storeCursor, result.err + return result.storeCursor, result.envelopesCount, result.err case <-ctx.Done(): - return nil, ctx.Err() + return nil, 0, ctx.Err() } } else { go func() { - _, err = t.waku.RequestStoreMessages(ctx, peerID, r) + _, _, err = t.waku.RequestStoreMessages(ctx, peerID, r, false) if err != nil { t.logger.Error("failed to request store messages", zap.Error(err)) } @@ -546,11 +550,13 @@ func (t *Transport) SendMessagesRequestForTopics( previousStoreCursor *types.StoreRequestCursor, pubsubTopic string, contentTopics []types.TopicType, + limit uint32, waitForResponse bool, -) (cursor []byte, storeCursor *types.StoreRequestCursor, err error) { + processEnvelopes bool, +) (cursor []byte, storeCursor *types.StoreRequestCursor, envelopesCount int, err error) { switch t.waku.Version() { case 2: - storeCursor, err = t.createMessagesRequestV2(ctx, peerID, from, to, previousStoreCursor, pubsubTopic, contentTopics, waitForResponse) + storeCursor, envelopesCount, err = t.createMessagesRequestV2(ctx, peerID, from, to, previousStoreCursor, pubsubTopic, contentTopics, limit, waitForResponse, processEnvelopes) case 1: cursor, err = t.createMessagesRequestV1(ctx, peerID, from, to, previousCursor, contentTopics, waitForResponse) default: @@ -559,7 +565,7 @@ func (t *Transport) SendMessagesRequestForTopics( return } -func createMessagesRequest(from, to uint32, cursor []byte, storeCursor *types.StoreRequestCursor, pubsubTopic string, topics []types.TopicType) types.MessagesRequest { +func createMessagesRequest(from, to uint32, cursor []byte, storeCursor *types.StoreRequestCursor, pubsubTopic string, topics []types.TopicType, limit uint32) types.MessagesRequest { aUUID := uuid.New() // uuid is 16 bytes, converted to hex it's 32 bytes as expected by types.MessagesRequest id := []byte(hex.EncodeToString(aUUID[:])) @@ -571,7 +577,7 @@ func createMessagesRequest(from, to uint32, cursor []byte, storeCursor *types.St ID: id, From: from, To: to, - Limit: 1000, + Limit: limit, Cursor: cursor, PubsubTopic: pubsubTopic, ContentTopics: topicBytes, diff --git a/protocol/tt/logger.go b/protocol/tt/logger.go index 5c0f1ce98..ae4dad0d3 100644 --- a/protocol/tt/logger.go +++ b/protocol/tt/logger.go @@ -12,13 +12,16 @@ var registerOnce sync.Once // MustCreateTestLogger returns a logger based on the passed flags. func MustCreateTestLogger() *zap.Logger { + cfg := zap.NewDevelopmentConfig() + return MustCreateTestLoggerWithConfig(cfg) +} + +func MustCreateTestLoggerWithConfig(cfg zap.Config) *zap.Logger { registerOnce.Do(func() { if err := zaputil.RegisterConsoleHexEncoder(); err != nil { panic(err) } }) - - cfg := zap.NewDevelopmentConfig() cfg.Encoding = "console-hex" l, err := cfg.Build() if err != nil { diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 00fb0f09d..9020e633f 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -669,7 +669,7 @@ func (w *Waku) subscribeToPubsubTopicWithWakuRelay(topic string, pubkey *ecdsa.P } return case env := <-sub[0].Ch: - err := w.OnNewEnvelopes(env, common.RelayedMessageType) + err := w.OnNewEnvelopes(env, common.RelayedMessageType, false) if err != nil { w.logger.Error("OnNewEnvelopes error", zap.Error(err)) } @@ -1121,7 +1121,7 @@ func (w *Waku) query(ctx context.Context, peerID peer.ID, pubsubTopic string, to return w.node.Store().Query(ctx, query, opts...) } -func (w *Waku) Query(ctx context.Context, peerID peer.ID, pubsubTopic string, topics []common.TopicType, from uint64, to uint64, opts []store.HistoryRequestOption) (cursor *storepb.Index, err error) { +func (w *Waku) Query(ctx context.Context, peerID peer.ID, pubsubTopic string, topics []common.TopicType, from uint64, to uint64, opts []store.HistoryRequestOption, processEnvelopes bool) (cursor *storepb.Index, envelopesCount int, err error) { requestID := protocol.GenerateRequestID() opts = append(opts, store.WithRequestID(requestID)) pubsubTopic = w.getPubsubTopic(pubsubTopic) @@ -1131,9 +1131,11 @@ func (w *Waku) Query(ctx context.Context, peerID peer.ID, pubsubTopic string, to if w.onHistoricMessagesRequestFailed != nil { w.onHistoricMessagesRequestFailed(requestID, peerID, err) } - return nil, err + return nil, 0, err } + envelopesCount = len(result.Messages) + for _, msg := range result.Messages { // Temporarily setting RateLimitProof to nil so it matches the WakuMessage protobuffer we are sending // See https://github.com/vacp2p/rfc/issues/563 @@ -1141,9 +1143,10 @@ func (w *Waku) Query(ctx context.Context, peerID peer.ID, pubsubTopic string, to envelope := protocol.NewEnvelope(msg, msg.GetTimestamp(), pubsubTopic) w.logger.Info("received waku2 store message", zap.Any("envelopeHash", hexutil.Encode(envelope.Hash())), zap.String("pubsubTopic", pubsubTopic)) - err = w.OnNewEnvelopes(envelope, common.StoreMessageType) + + err = w.OnNewEnvelopes(envelope, common.StoreMessageType, processEnvelopes) if err != nil { - return nil, err + return nil, 0, err } } @@ -1256,7 +1259,7 @@ func (w *Waku) Start() error { w.filterManager = newFilterManager(w.ctx, w.logger, func(id string) *common.Filter { return w.GetFilter(id) }, w.settings, - func(env *protocol.Envelope) error { return w.OnNewEnvelopes(env, common.RelayedMessageType) }, + func(env *protocol.Envelope) error { return w.OnNewEnvelopes(env, common.RelayedMessageType, false) }, w.node) w.wg.Add(1) @@ -1270,7 +1273,7 @@ func (w *Waku) Start() error { numCPU := runtime.NumCPU() for i := 0; i < numCPU; i++ { - go w.processQueue() + go w.processQueueLoop() } go w.broadcast() @@ -1335,7 +1338,7 @@ func (w *Waku) Stop() error { return nil } -func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.MessageType) error { +func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.MessageType, processImmediately bool) error { if envelope == nil { return nil } @@ -1358,7 +1361,7 @@ func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.Messag logger.Debug("received new envelope") trouble := false - _, err := w.add(recvMessage) + _, err := w.add(recvMessage, processImmediately) if err != nil { logger.Info("invalid envelope received", zap.Error(err)) trouble = true @@ -1382,7 +1385,7 @@ func (w *Waku) addEnvelope(envelope *common.ReceivedMessage) { w.poolMu.Unlock() } -func (w *Waku) add(recvMessage *common.ReceivedMessage) (bool, error) { +func (w *Waku) add(recvMessage *common.ReceivedMessage, processImmediately bool) (bool, error) { common.EnvelopesReceivedCounter.Inc() hash := recvMessage.Hash() @@ -1407,8 +1410,13 @@ func (w *Waku) add(recvMessage *common.ReceivedMessage) (bool, error) { } if !alreadyCached || !envelope.Processed.Load() { - logger.Debug("waku: posting event") - w.postEvent(recvMessage) // notify the local node about the new message + if processImmediately { + logger.Debug("immediately processing envelope") + w.processReceivedMessage(recvMessage) + } else { + logger.Debug("posting event") + w.postEvent(recvMessage) // notify the local node about the new message + } } return true, nil @@ -1419,49 +1427,54 @@ func (w *Waku) postEvent(envelope *common.ReceivedMessage) { w.msgQueue <- envelope } -// processQueue delivers the messages to the watchers during the lifetime of the waku node. -func (w *Waku) processQueue() { +// processQueueLoop delivers the messages to the watchers during the lifetime of the waku node. +func (w *Waku) processQueueLoop() { for { select { case <-w.ctx.Done(): return case e := <-w.msgQueue: - logger := w.logger.With( - zap.String("envelopeHash", hexutil.Encode(e.Envelope.Hash())), - zap.String("pubsubTopic", e.PubsubTopic), - zap.String("contentTopic", e.ContentTopic.ContentTopic()), - zap.Int64("timestamp", e.Envelope.Message().GetTimestamp()), - ) - if e.MsgType == common.StoreMessageType { - // We need to insert it first, and then remove it if not matched, - // as messages are processed asynchronously - w.storeMsgIDsMu.Lock() - w.storeMsgIDs[e.Hash()] = true - w.storeMsgIDsMu.Unlock() - } - - matched := w.filters.NotifyWatchers(e) - - // If not matched we remove it - if !matched { - logger.Debug("filters did not match") - w.storeMsgIDsMu.Lock() - delete(w.storeMsgIDs, e.Hash()) - w.storeMsgIDsMu.Unlock() - } else { - logger.Debug("filters did match") - e.Processed.Store(true) - } - - w.envelopeFeed.Send(common.EnvelopeEvent{ - Topic: e.ContentTopic, - Hash: e.Hash(), - Event: common.EventEnvelopeAvailable, - }) + w.processReceivedMessage(e) } } } +func (w *Waku) processReceivedMessage(e *common.ReceivedMessage) { + logger := w.logger.With( + zap.String("envelopeHash", hexutil.Encode(e.Envelope.Hash())), + zap.String("pubsubTopic", e.PubsubTopic), + zap.String("contentTopic", e.ContentTopic.ContentTopic()), + zap.Int64("timestamp", e.Envelope.Message().GetTimestamp()), + ) + + if e.MsgType == common.StoreMessageType { + // We need to insert it first, and then remove it if not matched, + // as messages are processed asynchronously + w.storeMsgIDsMu.Lock() + w.storeMsgIDs[e.Hash()] = true + w.storeMsgIDsMu.Unlock() + } + + matched := w.filters.NotifyWatchers(e) + + // If not matched we remove it + if !matched { + logger.Debug("filters did not match") + w.storeMsgIDsMu.Lock() + delete(w.storeMsgIDs, e.Hash()) + w.storeMsgIDsMu.Unlock() + } else { + logger.Debug("filters did match") + e.Processed.Store(true) + } + + w.envelopeFeed.Send(common.EnvelopeEvent{ + Topic: e.ContentTopic, + Hash: e.Hash(), + Event: common.EventEnvelopeAvailable, + }) +} + // Envelopes retrieves all the messages currently pooled by the node. func (w *Waku) Envelopes() []*common.ReceivedMessage { w.poolMu.RLock()