From 195982c950e34782988a5dea4f54da875a2e887c Mon Sep 17 00:00:00 2001 From: Igor Sirotin Date: Wed, 27 Dec 2023 13:53:19 +0000 Subject: [PATCH] fix_(StoreNodeRequestManager): various fixes and improvements (#4509) --- .../mocks/database_settings_manager_mock.go | 5 +- protocol/messenger_communities.go | 12 +- protocol/messenger_contacts.go | 5 +- protocol/messenger_mailserver.go | 20 +- protocol/messenger_mailserver_cycle.go | 8 +- .../messenger_store_node_request_manager.go | 76 +++- ...enger_store_node_request_manager_config.go | 57 +++ protocol/messenger_storenode_request_test.go | 424 +++++++++++++++--- protocol/messenger_testing_utils.go | 12 +- services/mailservers/fleet.go | 10 + wakuv2/waku.go | 6 +- 11 files changed, 509 insertions(+), 126 deletions(-) create mode 100644 protocol/messenger_store_node_request_manager_config.go diff --git a/multiaccounts/settings/mocks/database_settings_manager_mock.go b/multiaccounts/settings/mocks/database_settings_manager_mock.go index a5c99d3a7..ee0fed306 100644 --- a/multiaccounts/settings/mocks/database_settings_manager_mock.go +++ b/multiaccounts/settings/mocks/database_settings_manager_mock.go @@ -125,8 +125,6 @@ func (mr *MockDatabaseSettingsManagerMockRecorder) CanUseMailservers() *gomock.C return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CanUseMailservers", reflect.TypeOf((*MockDatabaseSettingsManager)(nil).CanUseMailservers)) } - - // DeviceName mocks base method. func (m *MockDatabaseSettingsManager) DeviceName() (string, error) { m.ctrl.T.Helper() @@ -202,7 +200,6 @@ func (mr *MockDatabaseSettingsManagerMockRecorder) GetEIP1581Address() *gomock.C return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetEIP1581Address", reflect.TypeOf((*MockDatabaseSettingsManager)(nil).GetEIP1581Address)) } - // GetInstalledStickerPacks mocks base method. func (m *MockDatabaseSettingsManager) GetInstalledStickerPacks() (*json.RawMessage, error) { m.ctrl.T.Helper() @@ -306,4 +303,4 @@ func (m *MockDatabaseSettingsManager) GetRecentStickers() (*json.RawMessage, err func (mr *MockDatabaseSettingsManagerMockRecorder) GetRecentStickers() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetRecentStickers", reflect.TypeOf((*MockDatabaseSettingsManager)(nil).GetRecentStickers)) -} \ No newline at end of file +} diff --git a/protocol/messenger_communities.go b/protocol/messenger_communities.go index f86db022f..e47271de8 100644 --- a/protocol/messenger_communities.go +++ b/protocol/messenger_communities.go @@ -2574,10 +2574,16 @@ func (m *Messenger) FetchCommunity(request *FetchCommunityRequest) (*communities } } - community, _, err := m.storeNodeRequestsManager.FetchCommunity(communities.CommunityShard{ + communityAddress := communities.CommunityShard{ CommunityID: communityID, Shard: request.Shard, - }, request.WaitForResponse) + } + + options := []StoreNodeRequestOption{ + WithWaitForResponseOption(request.WaitForResponse), + } + + community, _, err := m.storeNodeRequestsManager.FetchCommunity(communityAddress, options) return community, err } @@ -2585,7 +2591,7 @@ func (m *Messenger) FetchCommunity(request *FetchCommunityRequest) (*communities // fetchCommunities installs filter for community and requests its details from store node. // When response received it will be passed through signals handler. func (m *Messenger) fetchCommunities(communities []communities.CommunityShard) error { - return m.storeNodeRequestsManager.FetchCommunities(communities) + return m.storeNodeRequestsManager.FetchCommunities(communities, []StoreNodeRequestOption{}) } // passStoredCommunityInfoToSignalHandler calls signal handler with community info diff --git a/protocol/messenger_contacts.go b/protocol/messenger_contacts.go index 58fbd543c..f73413b95 100644 --- a/protocol/messenger_contacts.go +++ b/protocol/messenger_contacts.go @@ -1218,7 +1218,10 @@ func (m *Messenger) scheduleSyncFiltersForContact(publicKey *ecdsa.PublicKey) (* } func (m *Messenger) FetchContact(contactID string, waitForResponse bool) (*Contact, error) { - contact, _, err := m.storeNodeRequestsManager.FetchContact(contactID, waitForResponse) + options := []StoreNodeRequestOption{ + WithWaitForResponseOption(waitForResponse), + } + contact, _, err := m.storeNodeRequestsManager.FetchContact(contactID, options) return contact, err } diff --git a/protocol/messenger_mailserver.go b/protocol/messenger_mailserver.go index 94df00020..89e3d3870 100644 --- a/protocol/messenger_mailserver.go +++ b/protocol/messenger_mailserver.go @@ -23,18 +23,18 @@ import ( const ( initialStoreNodeRequestPageSize = 4 defaultStoreNodeRequestPageSize = 20 + + // tolerance is how many seconds of potentially out-of-order messages we want to fetch + tolerance uint32 = 60 + + mailserverRequestTimeout = 30 * time.Second + oneMonthInSeconds uint32 = 31 * 24 * 60 * 60 + mailserverMaxTries uint = 2 + mailserverMaxFailedRequests uint = 2 + + OneDayInSeconds = 86400 ) -// tolerance is how many seconds of potentially out-of-order messages we want to fetch -var tolerance uint32 = 60 - -var mailserverRequestTimeout = 30 * time.Second -var oneMonthInSeconds uint32 = 31 * 24 * 60 * 60 -var mailserverMaxTries uint = 2 -var mailserverMaxFailedRequests uint = 2 - -const OneDayInSeconds = 86400 - // maxTopicsPerRequest sets the batch size to limit the number of topics per store query var maxTopicsPerRequest int = 10 diff --git a/protocol/messenger_mailserver_cycle.go b/protocol/messenger_mailserver_cycle.go index 149e29ac5..1f69abf25 100644 --- a/protocol/messenger_mailserver_cycle.go +++ b/protocol/messenger_mailserver_cycle.go @@ -28,13 +28,7 @@ const isAndroidEmulator = runtime.GOOS == "android" && runtime.GOARCH == "amd64" const findNearestMailServer = !isAndroidEmulator func (m *Messenger) mailserversByFleet(fleet string) []mailservers.Mailserver { - var items []mailservers.Mailserver - for _, ms := range mailservers.DefaultMailservers() { - if ms.Fleet == fleet { - items = append(items, ms) - } - } - return items + return mailservers.DefaultMailserversByFleet(fleet) } type byRTTMsAndCanConnectBefore []SortedMailserver diff --git a/protocol/messenger_store_node_request_manager.go b/protocol/messenger_store_node_request_manager.go index 0cf63d5ee..84d1dc746 100644 --- a/protocol/messenger_store_node_request_manager.go +++ b/protocol/messenger_store_node_request_manager.go @@ -63,18 +63,20 @@ func NewStoreNodeRequestManager(m *Messenger) *StoreNodeRequestManager { // the function will also wait for the store node response and return the fetched community. // Automatically waits for an available store node. // When a `nil` community and `nil` error is returned, that means the community wasn't found at the store node. -func (m *StoreNodeRequestManager) FetchCommunity(community communities.CommunityShard, waitForResponse bool) (*communities.Community, StoreNodeRequestStats, error) { +func (m *StoreNodeRequestManager) FetchCommunity(community communities.CommunityShard, opts []StoreNodeRequestOption) (*communities.Community, StoreNodeRequestStats, error) { + cfg := buildStoreNodeRequestConfig(opts) + m.logger.Info("requesting community from store node", zap.Any("community", community), - zap.Bool("waitForResponse", waitForResponse)) + zap.Any("config", cfg)) requestCommunity := func(communityID string, shard *shard.Shard) (*communities.Community, StoreNodeRequestStats, error) { - channel, err := m.subscribeToRequest(storeNodeCommunityRequest, communityID, shard) + channel, err := m.subscribeToRequest(storeNodeCommunityRequest, communityID, shard, cfg) if err != nil { return nil, StoreNodeRequestStats{}, fmt.Errorf("failed to create a request for community: %w", err) } - if !waitForResponse { + if !cfg.WaitForResponse { return nil, StoreNodeRequestStats{}, nil } @@ -86,14 +88,14 @@ func (m *StoreNodeRequestManager) FetchCommunity(community communities.Community communityShard := community.Shard if communityShard == nil { id := transport.CommunityShardInfoTopic(community.CommunityID) - shard, err := m.subscribeToRequest(storeNodeShardRequest, id, shard.DefaultShard()) + fetchedShard, err := m.subscribeToRequest(storeNodeShardRequest, id, shard.DefaultShard(), cfg) if err != nil { return nil, StoreNodeRequestStats{}, fmt.Errorf("failed to create a shard info request: %w", err) } - if !waitForResponse { + if !cfg.WaitForResponse { go func() { - shardResult := <-shard + shardResult := <-fetchedShard communityShard = shardResult.shard _, _, _ = requestCommunity(community.CommunityID, communityShard) @@ -101,7 +103,7 @@ func (m *StoreNodeRequestManager) FetchCommunity(community communities.Community return nil, StoreNodeRequestStats{}, nil } - shardResult := <-shard + shardResult := <-fetchedShard communityShard = shardResult.shard } @@ -119,13 +121,16 @@ func (m *StoreNodeRequestManager) FetchCommunity(community communities.Community // those content topics is spammed with to many envelopes, then on each iteration we will have to fetch all // of this spam first to get the envelopes in other content topics. To avoid this we keep independent requests // for each content topic. -func (m *StoreNodeRequestManager) FetchCommunities(communities []communities.CommunityShard) error { +func (m *StoreNodeRequestManager) FetchCommunities(communities []communities.CommunityShard, opts []StoreNodeRequestOption) error { m.logger.Info("requesting communities from store node", zap.Any("communities", communities)) + // when fetching multiple communities we don't wait for the response + opts = append(opts, WithWaitForResponseOption(false)) + var outErr error for _, community := range communities { - _, _, err := m.FetchCommunity(community, false) + _, _, err := m.FetchCommunity(community, opts) if err != nil { outErr = fmt.Errorf("%sfailed to create a request for community %s: %w", outErr, community.CommunityID, err) } @@ -134,17 +139,20 @@ func (m *StoreNodeRequestManager) FetchCommunities(communities []communities.Com return outErr } -func (m *StoreNodeRequestManager) FetchContact(contactID string, waitForResponse bool) (*Contact, StoreNodeRequestStats, error) { +func (m *StoreNodeRequestManager) FetchContact(contactID string, opts []StoreNodeRequestOption) (*Contact, StoreNodeRequestStats, error) { + + cfg := buildStoreNodeRequestConfig(opts) + m.logger.Info("requesting contact from store node", zap.Any("contactID", contactID), - zap.Bool("waitForResponse", waitForResponse)) + zap.Any("config", cfg)) - channel, err := m.subscribeToRequest(storeNodeContactRequest, contactID, nil) + channel, err := m.subscribeToRequest(storeNodeContactRequest, contactID, nil, cfg) if err != nil { return nil, StoreNodeRequestStats{}, fmt.Errorf("failed to create a request for community: %w", err) } - if !waitForResponse { + if !cfg.WaitForResponse { return nil, StoreNodeRequestStats{}, nil } @@ -155,7 +163,7 @@ func (m *StoreNodeRequestManager) FetchContact(contactID string, waitForResponse // subscribeToRequest checks if a request for given community/contact is already in progress, creates and installs // a new one if not found, and returns a subscription to the result of the found/started request. // The subscription can then be used to get the result of the request, this could be either a community/contact or an error. -func (m *StoreNodeRequestManager) subscribeToRequest(requestType storeNodeRequestType, dataID string, shard *shard.Shard) (storeNodeResponseSubscription, error) { +func (m *StoreNodeRequestManager) subscribeToRequest(requestType storeNodeRequestType, dataID string, shard *shard.Shard, cfg StoreNodeRequestConfig) (storeNodeResponseSubscription, error) { // It's important to unlock only after getting the subscription channel. // We also lock `activeRequestsLock` during finalizing the requests. This ensures that the subscription // created in this function will get the result even if the requests proceeds faster than this function ends. @@ -184,6 +192,7 @@ func (m *StoreNodeRequestManager) subscribeToRequest(requestType storeNodeReques } request = m.newStoreNodeRequest() + request.config = cfg request.pubsubTopic = filter.PubsubTopic request.requestID = requestID request.contentTopic = filter.ContentTopic @@ -284,8 +293,10 @@ type storeNodeRequest struct { requestID storeNodeRequestID // request parameters - pubsubTopic string - contentTopic types.TopicType + pubsubTopic string + contentTopic types.TopicType + minimumDataClock uint64 + config StoreNodeRequestConfig // request corresponding metadata to be used in finalize filterToForget *transport.Filter @@ -378,7 +389,21 @@ func (r *storeNodeRequest) shouldFetchNextPage(envelopesCount int) (bool, uint32 if community == nil { // community not found in the database, request next page logger.Debug("community still not fetched") - return true, defaultStoreNodeRequestPageSize + return true, r.config.FurtherPageSize + } + + // We check here if the community was fetched actually fetched and updated, because it + // could be that the community was already in the database when we started the fetching. + // + // Would be perfect if we could track that the community was in these particular envelopes, + // but I don't think that's possible right now. We check if clock was updated instead. + + if community.Clock() <= r.minimumDataClock { + logger.Debug("local community description is not newer than existing", + zap.Any("existingClock", community.Clock()), + zap.Any("minimumDataClock", r.minimumDataClock), + ) + return true, r.config.FurtherPageSize } logger.Debug("community found", @@ -420,7 +445,7 @@ func (r *storeNodeRequest) shouldFetchNextPage(envelopesCount int) (bool, uint32 if contact == nil { // contact not found in the database, request next page logger.Debug("contact still not fetched") - return true, defaultStoreNodeRequestPageSize + return true, r.config.FurtherPageSize } logger.Debug("contact found", @@ -429,7 +454,7 @@ func (r *storeNodeRequest) shouldFetchNextPage(envelopesCount int) (bool, uint32 r.result.contact = contact } - return false, 0 + return !r.config.StopWhenDataFound, r.config.FurtherPageSize } func (r *storeNodeRequest) routine() { @@ -457,6 +482,15 @@ func (r *storeNodeRequest) routine() { return } + // Check if community already exists locally and get Clock. + + localCommunity, _ := r.manager.messenger.communitiesManager.GetByIDString(r.requestID.DataID) + + if localCommunity != nil { + r.minimumDataClock = localCommunity.Clock() + } + + // Start store node request to := uint32(math.Ceil(float64(r.manager.messenger.GetCurrentTimeInMillis()) / 1000)) from := to - oneMonthInSeconds @@ -472,7 +506,7 @@ func (r *storeNodeRequest) routine() { r.manager.onPerformingBatch(batch) } - return nil, r.manager.messenger.processMailserverBatchWithOptions(batch, initialStoreNodeRequestPageSize, r.shouldFetchNextPage, true) + return nil, r.manager.messenger.processMailserverBatchWithOptions(batch, r.config.InitialPageSize, r.shouldFetchNextPage, true) }) r.result.err = err diff --git a/protocol/messenger_store_node_request_manager_config.go b/protocol/messenger_store_node_request_manager_config.go new file mode 100644 index 000000000..a4e23f20f --- /dev/null +++ b/protocol/messenger_store_node_request_manager_config.go @@ -0,0 +1,57 @@ +package protocol + +type StoreNodeRequestConfig struct { + WaitForResponse bool + StopWhenDataFound bool + InitialPageSize uint32 + FurtherPageSize uint32 +} + +type StoreNodeRequestOption func(*StoreNodeRequestConfig) + +func defaultStoreNodeRequestConfig() StoreNodeRequestConfig { + return StoreNodeRequestConfig{ + WaitForResponse: true, + StopWhenDataFound: true, + InitialPageSize: initialStoreNodeRequestPageSize, + FurtherPageSize: defaultStoreNodeRequestPageSize, + } +} + +func buildStoreNodeRequestConfig(opts []StoreNodeRequestOption) StoreNodeRequestConfig { + cfg := defaultStoreNodeRequestConfig() + + // TODO: remove these 2 when fixed: https://github.com/waku-org/nwaku/issues/2317 + opts = append(opts, WithStopWhenDataFound(false)) + opts = append(opts, WithInitialPageSize(defaultStoreNodeRequestPageSize)) + + for _, opt := range opts { + opt(&cfg) + } + + return cfg +} + +func WithWaitForResponseOption(waitForResponse bool) StoreNodeRequestOption { + return func(c *StoreNodeRequestConfig) { + c.WaitForResponse = waitForResponse + } +} + +func WithStopWhenDataFound(stopWhenDataFound bool) StoreNodeRequestOption { + return func(c *StoreNodeRequestConfig) { + c.StopWhenDataFound = stopWhenDataFound + } +} + +func WithInitialPageSize(initialPageSize uint32) StoreNodeRequestOption { + return func(c *StoreNodeRequestConfig) { + c.InitialPageSize = initialPageSize + } +} + +func WithFurtherPageSize(furtherPageSize uint32) StoreNodeRequestOption { + return func(c *StoreNodeRequestConfig) { + c.FurtherPageSize = furtherPageSize + } +} diff --git a/protocol/messenger_storenode_request_test.go b/protocol/messenger_storenode_request_test.go index 739f41424..d652ff44f 100644 --- a/protocol/messenger_storenode_request_test.go +++ b/protocol/messenger_storenode_request_test.go @@ -2,10 +2,14 @@ package protocol import ( "context" + "fmt" "sync" "testing" "time" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/status-im/status-go/protocol/transport" + "github.com/status-im/status-go/multiaccounts/accounts" gethbridge "github.com/status-im/status-go/eth-node/bridge/geth" @@ -27,6 +31,7 @@ import ( mailserversDB "github.com/status-im/status-go/services/mailservers" waku2 "github.com/status-im/status-go/wakuv2" + wakuV2common "github.com/status-im/status-go/wakuv2/common" ) const ( @@ -57,6 +62,48 @@ type MessengerStoreNodeRequestSuite struct { logger *zap.Logger } +type singleResult struct { + EnvelopesCount int + Envelopes []*wakuV2common.ReceivedMessage + Error error + FetchedCommunity *communities.Community +} + +func (r *singleResult) toString() string { + resultString := "" + communityString := "" + + if r.FetchedCommunity != nil { + communityString = fmt.Sprintf("clock: %d (%s), name: %s, members: %d", + r.FetchedCommunity.Clock(), + time.Unix(int64(r.FetchedCommunity.Clock()), 0).UTC(), + r.FetchedCommunity.Name(), + len(r.FetchedCommunity.Members()), + ) + } + + if r.Error != nil { + resultString = fmt.Sprintf("error: %s", r.Error.Error()) + } else { + resultString = fmt.Sprintf("envelopes fetched: %d, community %s", + r.EnvelopesCount, communityString) + } + + for i, envelope := range r.Envelopes { + resultString += fmt.Sprintf("\n\tenvelope %3.0d: %s, timestamp: %d (%s), size: %d bytes, contentTopic: %s, pubsubTopic: %s", + i+1, + envelope.Hash().Hex(), + envelope.Envelope.Message().GetTimestamp(), + time.Unix(0, envelope.Envelope.Message().GetTimestamp()).UTC(), + len(envelope.Envelope.Message().Payload), + envelope.Envelope.Message().ContentTopic, + envelope.Envelope.PubsubTopic(), + ) + } + + return resultString +} + func (s *MessengerStoreNodeRequestSuite) SetupTest() { cfg := zap.NewDevelopmentConfig() cfg.Level = zap.NewAtomicLevelAt(zap.DebugLevel) @@ -67,7 +114,7 @@ func (s *MessengerStoreNodeRequestSuite) SetupTest() { s.cancel = make(chan struct{}, 10) storeNodeLogger := s.logger.Named("store-node-waku") - s.wakuStoreNode = NewWakuV2(&s.Suite, storeNodeLogger, true, true) + s.wakuStoreNode = NewWakuV2(&s.Suite, storeNodeLogger, true, true, false) storeNodeListenAddresses := s.wakuStoreNode.ListenAddresses() s.Require().LessOrEqual(1, len(storeNodeListenAddresses)) @@ -78,14 +125,14 @@ func (s *MessengerStoreNodeRequestSuite) SetupTest() { func (s *MessengerStoreNodeRequestSuite) TearDown() { close(s.cancel) - s.wakuStoreNode.Stop() // nolint: errcheck - s.owner.Shutdown() // nolint: errcheck - s.bob.Shutdown() // nolint: errcheck + s.Require().NoError(s.wakuStoreNode.Stop()) + TearDownMessenger(&s.Suite, s.owner) + TearDownMessenger(&s.Suite, s.bob) } func (s *MessengerStoreNodeRequestSuite) createOwner() { wakuLogger := s.logger.Named("owner-waku-node") - wakuV2 := NewWakuV2(&s.Suite, wakuLogger, true, false) + wakuV2 := NewWakuV2(&s.Suite, wakuLogger, true, false, false) s.ownerWaku = gethbridge.NewGethWakuV2Wrapper(wakuV2) messengerLogger := s.logger.Named("owner-messenger") @@ -98,7 +145,7 @@ func (s *MessengerStoreNodeRequestSuite) createOwner() { func (s *MessengerStoreNodeRequestSuite) createBob() { wakuLogger := s.logger.Named("bob-waku-node") - wakuV2 := NewWakuV2(&s.Suite, wakuLogger, true, false) + wakuV2 := NewWakuV2(&s.Suite, wakuLogger, true, false, false) s.bobWaku = gethbridge.NewGethWakuV2Wrapper(wakuV2) messengerLogger := s.logger.Named("bob-messenger") @@ -137,6 +184,8 @@ func (s *MessengerStoreNodeRequestSuite) newMessenger(shh types.Waku, logger *za func (s *MessengerStoreNodeRequestSuite) createCommunity(m *Messenger) *communities.Community { s.waitForAvailableStoreNode(m) + storeNodeSubscription := s.setupStoreNodeEnvelopesWatcher(nil) + createCommunityRequest := &requests.CreateCommunity{ Name: RandomLettersString(10), Description: RandomLettersString(20), @@ -145,15 +194,23 @@ func (s *MessengerStoreNodeRequestSuite) createCommunity(m *Messenger) *communit Membership: protobuf.CommunityPermissions_AUTO_ACCEPT, } - response, err := m.CreateCommunity(createCommunityRequest, true) + response, err := m.CreateCommunity(createCommunityRequest, false) s.Require().NoError(err) s.Require().NotNil(response) s.Require().Len(response.Communities(), 1) + s.waitForEnvelopes(storeNodeSubscription, 1) + return response.Communities()[0] } func (s *MessengerStoreNodeRequestSuite) requireCommunitiesEqual(c *communities.Community, expected *communities.Community) { + if expected == nil { + s.Require().Nil(c) + return + } + s.Require().NotNil(c) + s.Require().Equal(expected.IDString(), c.IDString()) s.Require().Equal(expected.Clock(), c.Clock()) s.Require().Equal(expected.Name(), c.Name()) s.Require().Equal(expected.Identity().Description, c.Identity().Description) @@ -168,16 +225,15 @@ func (s *MessengerStoreNodeRequestSuite) requireContactsEqual(c *Contact, expect s.Require().Equal(expected.SocialLinks, c.SocialLinks) } -func (s *MessengerStoreNodeRequestSuite) fetchCommunity(m *Messenger, communityShard communities.CommunityShard, expectedCommunityInfo *communities.Community) StoreNodeRequestStats { - fetchedCommunity, stats, err := m.storeNodeRequestsManager.FetchCommunity(communityShard, true) +func (s *MessengerStoreNodeRequestSuite) fetchCommunity(m *Messenger, communityShard communities.CommunityShard, expectedCommunity *communities.Community) StoreNodeRequestStats { + options := []StoreNodeRequestOption{ + WithWaitForResponseOption(true), + } + + fetchedCommunity, stats, err := m.storeNodeRequestsManager.FetchCommunity(communityShard, options) s.Require().NoError(err) - s.Require().NotNil(fetchedCommunity) - s.Require().Equal(communityShard.CommunityID, fetchedCommunity.IDString()) - - if expectedCommunityInfo != nil { - s.requireCommunitiesEqual(fetchedCommunity, expectedCommunityInfo) - } + s.requireCommunitiesEqual(fetchedCommunity, expectedCommunity) return stats } @@ -197,6 +253,54 @@ func (s *MessengerStoreNodeRequestSuite) waitForAvailableStoreNode(messenger *Me WaitForAvailableStoreNode(&s.Suite, messenger, storeNodeConnectTimeout) } +func (s *MessengerStoreNodeRequestSuite) setupEnvelopesWatcher(wakuNode *waku2.Waku, topic *wakuV2common.TopicType, cb func(envelope *wakuV2common.ReceivedMessage)) { + envelopesWatcher := make(chan wakuV2common.EnvelopeEvent, 100) + envelopesSub := wakuNode.SubscribeEnvelopeEvents(envelopesWatcher) + + go func() { + defer envelopesSub.Unsubscribe() + for { + select { + case <-s.cancel: + return + + case envelopeEvent := <-envelopesWatcher: + if envelopeEvent.Event != wakuV2common.EventEnvelopeAvailable { + continue + } + if topic != nil && *topic != envelopeEvent.Topic { + continue + } + envelope := wakuNode.GetEnvelope(envelopeEvent.Hash) + cb(envelope) + s.logger.Debug("envelope available event for fetched content topic", + zap.Any("envelopeEvent", envelopeEvent), + zap.Any("envelope", envelope), + ) + } + + } + }() +} + +func (s *MessengerStoreNodeRequestSuite) setupStoreNodeEnvelopesWatcher(topic *wakuV2common.TopicType) <-chan string { + storeNodeSubscription := make(chan string, 100) + s.setupEnvelopesWatcher(s.wakuStoreNode, topic, func(envelope *wakuV2common.ReceivedMessage) { + storeNodeSubscription <- envelope.Hash().String() + }) + return storeNodeSubscription +} + +func (s *MessengerStoreNodeRequestSuite) waitForEnvelopes(subscription <-chan string, expectedEnvelopesCount int) { + for i := 0; i < expectedEnvelopesCount; i++ { + select { + case <-subscription: + case <-time.After(5 * time.Second): + s.Require().Fail("timeout waiting for store node to receive envelopes") + } + } +} + func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityInfo() { s.createOwner() s.createBob() @@ -214,10 +318,9 @@ func (s *MessengerStoreNodeRequestSuite) TestConsecutiveRequests() { community := s.createCommunity(s.owner) // Test consecutive requests to check that requests in manager are finalized - for i := 0; i < 2; i++ { - s.waitForAvailableStoreNode(s.bob) - s.fetchCommunity(s.bob, community.CommunityShard(), community) - } + // At second request we expect to fetch nothing, because the community is already in the database + s.fetchCommunity(s.bob, community.CommunityShard(), community) + s.fetchCommunity(s.bob, community.CommunityShard(), nil) } func (s *MessengerStoreNodeRequestSuite) TestSimultaneousCommunityInfoRequests() { @@ -278,68 +381,21 @@ func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityInfoWithStoreNodeDi s.fetchCommunity(s.bob, community.CommunityShard(), community) } -// This test is intended to only run locally to test how fast is a big community fetched -// Shouldn't be executed in CI, because it relies on connection to status.prod store nodes. -func (s *MessengerStoreNodeRequestSuite) TestRequestBigCommunity() { - if !runLocalTests { - return - } - - // Status CC community - const communityID = "0x03073514d4c14a7d10ae9fc9b0f05abc904d84166a6ac80add58bf6a3542a4e50a" - - communityShard := communities.CommunityShard{ - CommunityID: communityID, - Shard: nil, - } - - wakuLogger := s.logger.Named("user-waku-node") - messengerLogger := s.logger.Named("user-messenger") - - wakuV2 := NewWakuV2(&s.Suite, wakuLogger, true, false) - userWaku := gethbridge.NewGethWakuV2Wrapper(wakuV2) - - privateKey, err := crypto.GenerateKey() - s.Require().NoError(err) - - mailserversSQLDb, err := helpers.SetupTestMemorySQLDB(appdatabase.DbInitializer{}) - s.Require().NoError(err) - - mailserversDatabase := mailserversDB.NewDB(mailserversSQLDb) - s.Require().NoError(err) - - options := []Option{ - WithMailserversDatabase(mailserversDatabase), - WithClusterConfig(params.ClusterConfig{ - Fleet: params.FleetStatusProd, - }), - } - - // Create bob without `createBob` without func to force status.prod fleet - s.bob, err = newMessengerWithKey(userWaku, privateKey, messengerLogger, options) - s.Require().NoError(err) - - fetchedCommunity, stats, err := s.bob.storeNodeRequestsManager.FetchCommunity(communityShard, true) - - s.Require().NoError(err) - s.Require().NotNil(fetchedCommunity) - s.Require().Equal(communityID, fetchedCommunity.IDString()) - - s.Require().Equal(initialStoreNodeRequestPageSize, stats.FetchedEnvelopesCount) - s.Require().Equal(1, stats.FetchedPagesCount) -} - func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityPagingAlgorithm() { + const spamAmount = defaultStoreNodeRequestPageSize + initialStoreNodeRequestPageSize + s.createOwner() s.createBob() // Create a community community := s.createCommunity(s.owner) + contentTopic := wakuV2common.BytesToTopic(transport.ToTopic(community.IDString())) + storeNodeSubscription := s.setupStoreNodeEnvelopesWatcher(&contentTopic) // Push spam to the same ContentTopic & PubsubTopic // The first requested page size is 1. All subsequent pages are limited to 20. // We want to test the algorithm, so we push 21 spam envelopes. - for i := 0; i < defaultStoreNodeRequestPageSize+initialStoreNodeRequestPageSize; i++ { + for i := 0; i < spamAmount; i++ { spamMessage := common.RawMessage{ Payload: RandomBytes(16), Sender: community.PrivateKey(), @@ -351,14 +407,17 @@ func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityPagingAlgorithm() { s.Require().NoError(err) } + // Wait the store node to receive envelopes + s.waitForEnvelopes(storeNodeSubscription, spamAmount) + // Fetch the community stats := s.fetchCommunity(s.bob, community.CommunityShard(), community) // Expect 3 pages and 23 (24 spam + 1 community description + 1 general channel description) envelopes to be fetched. // First we fetch a more up-to-date, but an invalid spam message, fail to decrypt it as community description, // then we fetch another page of data and successfully decrypt a community description. - s.Require().Equal(defaultStoreNodeRequestPageSize+initialStoreNodeRequestPageSize+2, stats.FetchedEnvelopesCount) - s.Require().Equal(3, stats.FetchedPagesCount) + s.Require().Equal(spamAmount+1, stats.FetchedEnvelopesCount) + s.Require().Equal(2, stats.FetchedPagesCount) // TODO: revert to 3 when fixed: https://github.com/waku-org/nwaku/issues/2317 } func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityWithSameContentTopic() { @@ -475,6 +534,47 @@ func (s *MessengerStoreNodeRequestSuite) TestRequestProfileInfo() { s.fetchProfile(s.bob, s.owner.selfContact.ID, s.owner.selfContact) } +// TestSequentialUpdates checks that making updates to the community +// immediately results in new store node fetched information. +// Before adding storeNodeSubscription we had problems with the test setup that we didn't have a mechanism to wait for store node to +// receive and process new messages. +func (s *MessengerStoreNodeRequestSuite) TestSequentialUpdates() { + s.createOwner() + s.createBob() + + community := s.createCommunity(s.owner) + s.fetchCommunity(s.bob, community.CommunityShard(), community) + + contentTopic := wakuV2common.BytesToTopic(transport.ToTopic(community.IDString())) + communityName := community.Name() + + storeNodeSubscription := s.setupStoreNodeEnvelopesWatcher(&contentTopic) + + for i := 0; i < 3; i++ { + // Change community name, this will automatically publish a new community description + ownerEditRequest := &requests.EditCommunity{ + CommunityID: community.ID(), + CreateCommunity: requests.CreateCommunity{ + Name: fmt.Sprintf("%s-%d", communityName, i), + Description: community.DescriptionText(), + Color: community.Color(), + Membership: community.Permissions().Access, + }, + } + _, err := s.owner.EditCommunity(ownerEditRequest) + s.Require().NoError(err) + + s.waitForEnvelopes(storeNodeSubscription, 1) + + // Get updated community from the database + community, err = s.owner.communitiesManager.GetByID(community.ID()) + s.Require().NoError(err) + s.Require().NotNil(community) + + s.fetchCommunity(s.bob, community.CommunityShard(), community) + } +} + func (s *MessengerStoreNodeRequestSuite) TestRequestShardAndCommunityInfo() { s.createOwner() s.createBob() @@ -518,7 +618,7 @@ func (s *MessengerStoreNodeRequestSuite) TestFiltersNotRemoved() { filterBefore := s.owner.transport.FilterByChatID(community.IDString()) s.Require().NotNil(filterBefore) - s.fetchCommunity(s.owner, community.CommunityShard(), community) + s.fetchCommunity(s.owner, community.CommunityShard(), nil) filterAfter := s.owner.transport.FilterByChatID(community.IDString()) s.Require().NotNil(filterAfter) @@ -542,3 +642,179 @@ func (s *MessengerStoreNodeRequestSuite) TestFiltersRemoved() { filterAfter := s.bob.transport.FilterByChatID(community.IDString()) s.Require().Nil(filterAfter) } + +func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityEnvelopesOrder() { + s.createOwner() + s.createBob() + + community := s.createCommunity(s.owner) + + // Push 5 descriptions to the store node + for i := 0; i < 4; i++ { + err := s.owner.publishOrg(community) + s.Require().NoError(err) + } + + // Subscribe to received envelope + + bobWakuV2 := gethbridge.GetGethWakuV2From(s.bobWaku) + contentTopic := wakuV2common.BytesToTopic(transport.ToTopic(community.IDString())) + + var prevEnvelope *wakuV2common.ReceivedMessage + + s.setupEnvelopesWatcher(bobWakuV2, &contentTopic, func(envelope *wakuV2common.ReceivedMessage) { + // We check that each next envelope fetched is newer than the previous one + if prevEnvelope != nil { + s.Require().Greater( + envelope.Envelope.Message().GetTimestamp(), + prevEnvelope.Envelope.Message().GetTimestamp()) + } + prevEnvelope = envelope + }) + + // Force a single-envelope page size to be able to check the order. + // Also force all envelopes to be fetched. + + options := []StoreNodeRequestOption{ + WithWaitForResponseOption(true), + WithStopWhenDataFound(false), + WithInitialPageSize(1), + WithFurtherPageSize(1), + } + + // Fetch the community + + fetchedCommunity, _, err := s.bob.storeNodeRequestsManager.FetchCommunity(community.CommunityShard(), options) + s.Require().NoError(err) + s.requireCommunitiesEqual(fetchedCommunity, community) +} + +// TestFetchRealCommunity is intended to only run locally to check the community description in all of the store nodes. +// Shouldn't be executed in CI, because it relies on connection to the real network. +// +// To run this test, first set `runLocalTests` to true. +// Then carefully set all of communityID, communityShard, fleet and other const variables. +// NOTE: I only tested it with the default parameters, but in theory it should work for any configuration. +func (s *MessengerStoreNodeRequestSuite) TestFetchRealCommunity() { + if !runLocalTests { + return + } + + const communityID = "0x03073514d4c14a7d10ae9fc9b0f05abc904d84166a6ac80add58bf6a3542a4e50a" + var communityShard *shard.Shard + + const fleet = params.FleetStatusProd + const useShardAsDefaultTopic = false + const clusterID = 0 + const userPrivateKeyString = "" // When empty a new user will be created + contentTopic := wakuV2common.BytesToTopic(transport.ToTopic(communityID)) + nodesList := mailserversDB.DefaultMailserversByFleet(fleet) + + results := map[string]singleResult{} + wg := sync.WaitGroup{} + + // We run a separate request for each node in the fleet. + + for i, mailserver := range nodesList { + wg.Add(1) + + go func(i int, mailserver mailserversDB.Mailserver) { + defer wg.Done() + + fmt.Printf("--- starting for %s\n", mailserver.ID) + + result := singleResult{} + + // + // Create WakuV2 node + // NOTE: Another option was to create a bare waku node and fetch envelopes directly with it + // and after that push all of the envelopes to a new messenger and check the result. + // But this turned out to be harder to implement. + // + + wakuLogger := s.logger.Named(fmt.Sprintf("user-waku-node-%d", i)) + messengerLogger := s.logger.Named(fmt.Sprintf("user-messenger-%d", i)) + + wakuV2 := NewWakuV2(&s.Suite, wakuLogger, true, false, useShardAsDefaultTopic) + userWaku := gethbridge.NewGethWakuV2Wrapper(wakuV2) + + // + // Create a messenger to process envelopes + // + + var privateKeyString = userPrivateKeyString + + if privateKeyString == "" { + privateKey, err := crypto.GenerateKey() + s.Require().NoError(err) + privateKeyString = hexutil.Encode(crypto.FromECDSA(privateKey)) + } + + privateKeyBytes, err := hexutil.Decode(privateKeyString) + s.Require().NoError(err) + privateKey, err := crypto.ToECDSA(privateKeyBytes) + s.Require().NoError(err) + + // Mock a local fleet with single store node + // This is done by settings custom store nodes in the database + + mailserversSQLDb, err := helpers.SetupTestMemorySQLDB(appdatabase.DbInitializer{}) + s.Require().NoError(err) + mailserversDatabase := mailserversDB.NewDB(mailserversSQLDb) + + mailserver.Fleet = localFleet + err = mailserversDatabase.Add(mailserver) + s.Require().NoError(err) + + options := []Option{ + WithMailserversDatabase(mailserversDatabase), + WithClusterConfig(params.ClusterConfig{ + Fleet: localFleet, + ClusterID: clusterID, + }), + } + + // Create user without `createBob` func to force desired fleet + user, err := newMessengerWithKey(userWaku, privateKey, messengerLogger, options) + s.Require().NoError(err) + defer TearDownMessenger(&s.Suite, user) + + communityAddress := communities.CommunityShard{ + CommunityID: communityID, + Shard: communityShard, + } + + // Setup envelopes watcher to gather fetched envelopes + + s.setupEnvelopesWatcher(wakuV2, &contentTopic, func(envelope *wakuV2common.ReceivedMessage) { + result.Envelopes = append(result.Envelopes, envelope) + }) + + // Start fetching + + storeNodeRequestOptions := []StoreNodeRequestOption{ + WithWaitForResponseOption(true), + WithStopWhenDataFound(false), // In this test we want all envelopes to be fetched + WithInitialPageSize(defaultStoreNodeRequestPageSize), // Because we're fetching all envelopes anyway + } + + fetchedCommunity, stats, err := user.storeNodeRequestsManager.FetchCommunity(communityAddress, storeNodeRequestOptions) + + result.EnvelopesCount = stats.FetchedEnvelopesCount + result.FetchedCommunity = fetchedCommunity + result.Error = err + + results[mailserver.ID] = result + }(i, mailserver) + } + + // Wait for all requests to finish + + wg.Wait() + + // Print the results + + for storeNodeName, result := range results { + fmt.Printf("%s --- %s\n", storeNodeName, result.toString()) + } +} diff --git a/protocol/messenger_testing_utils.go b/protocol/messenger_testing_utils.go index c5381db18..9a2096e04 100644 --- a/protocol/messenger_testing_utils.go +++ b/protocol/messenger_testing_utils.go @@ -16,8 +16,6 @@ import ( "github.com/stretchr/testify/suite" - "github.com/waku-org/go-waku/waku/v2/protocol/relay" - "github.com/status-im/status-go/appdatabase" gethbridge "github.com/status-im/status-go/eth-node/bridge/geth" "github.com/status-im/status-go/eth-node/types" @@ -253,9 +251,10 @@ func WaitForAvailableStoreNode(s *suite.Suite, m *Messenger, timeout time.Durati s.Require().True(available) } -func NewWakuV2(s *suite.Suite, logger *zap.Logger, useLocalWaku bool, enableStore bool) *waku2.Waku { +func NewWakuV2(s *suite.Suite, logger *zap.Logger, useLocalWaku bool, enableStore bool, useShardAsDefaultTopic bool) *waku2.Waku { wakuConfig := &waku2.Config{ - DefaultShardPubsubTopic: relay.DefaultWakuTopic, // shard.DefaultShardPubsubTopic(), + DefaultShardPubsubTopic: "", // TODO: Empty string should work fine, for default value if not. + UseShardAsDefaultTopic: useShardAsDefaultTopic, } var onPeerStats func(connStatus types.ConnStatus) @@ -316,7 +315,7 @@ func CreateWakuV2Network(s *suite.Suite, parentLogger *zap.Logger, nodeNames []s nodes := make([]*waku2.Waku, len(nodeNames)) for i, name := range nodeNames { logger := parentLogger.With(zap.String("name", name+"-waku")) - wakuNode := NewWakuV2(s, logger, true, false) + wakuNode := NewWakuV2(s, logger, true, false, false) nodes[i] = wakuNode } @@ -343,6 +342,9 @@ func CreateWakuV2Network(s *suite.Suite, parentLogger *zap.Logger, nodeNames []s } func TearDownMessenger(s *suite.Suite, m *Messenger) { + if m == nil { + return + } s.Require().NoError(m.Shutdown()) if m.database != nil { s.Require().NoError(m.database.Close()) diff --git a/services/mailservers/fleet.go b/services/mailservers/fleet.go index d91886c84..55406bba7 100644 --- a/services/mailservers/fleet.go +++ b/services/mailservers/fleet.go @@ -2,6 +2,16 @@ package mailservers import "github.com/status-im/status-go/params" +func DefaultMailserversByFleet(fleet string) []Mailserver { + var items []Mailserver + for _, ms := range DefaultMailservers() { + if ms.Fleet == fleet { + items = append(items, ms) + } + } + return items +} + func DefaultMailservers() []Mailserver { return []Mailserver{ diff --git a/wakuv2/waku.go b/wakuv2/waku.go index ecc64b7e4..72805b46a 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -1150,7 +1150,11 @@ func (w *Waku) Query(ctx context.Context, peerID peer.ID, pubsubTopic string, to msg.RateLimitProof = nil envelope := protocol.NewEnvelope(msg, msg.GetTimestamp(), pubsubTopic) - w.logger.Info("received waku2 store message", zap.Any("envelopeHash", hexutil.Encode(envelope.Hash())), zap.String("pubsubTopic", pubsubTopic)) + w.logger.Info("received waku2 store message", + zap.Any("envelopeHash", hexutil.Encode(envelope.Hash())), + zap.String("pubsubTopic", pubsubTopic), + zap.Int64p("timestamp", envelope.Message().Timestamp), + ) err = w.OnNewEnvelopes(envelope, common.StoreMessageType, processEnvelopes) if err != nil {