package protocol import ( "context" "fmt" "sync" "testing" "time" "github.com/stretchr/testify/suite" "go.uber.org/zap" "google.golang.org/protobuf/proto" "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/crypto" "github.com/status-im/status-go/appdatabase" gethbridge "github.com/status-im/status-go/eth-node/bridge/geth" "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/multiaccounts/accounts" "github.com/status-im/status-go/params" "github.com/status-im/status-go/protocol/common" "github.com/status-im/status-go/protocol/common/shard" "github.com/status-im/status-go/protocol/communities" "github.com/status-im/status-go/protocol/communities/token" "github.com/status-im/status-go/protocol/protobuf" "github.com/status-im/status-go/protocol/requests" "github.com/status-im/status-go/protocol/transport" "github.com/status-im/status-go/protocol/tt" mailserversDB "github.com/status-im/status-go/services/mailservers" "github.com/status-im/status-go/services/wallet/bigint" "github.com/status-im/status-go/t/helpers" waku2 "github.com/status-im/status-go/wakuv2" wakuV2common "github.com/status-im/status-go/wakuv2/common" ) const ( localFleet = "local-test-fleet-1" localMailserverID = "local-test-mailserver" storeNodeConnectTimeout = 500 * time.Millisecond runLocalTests = false ) func TestMessengerStoreNodeRequestSuite(t *testing.T) { suite.Run(t, new(MessengerStoreNodeRequestSuite)) } type MessengerStoreNodeRequestSuite struct { suite.Suite cancel chan struct{} owner *Messenger bob *Messenger wakuStoreNode *waku2.Waku storeNodeAddress string ownerWaku types.Waku bobWaku types.Waku collectiblesServiceMock *CollectiblesServiceMock logger *zap.Logger } type singleResult struct { EnvelopesCount int Envelopes []*wakuV2common.ReceivedMessage ShardEnvelopes []*wakuV2common.ReceivedMessage Error error FetchedCommunity *communities.Community } func (r *singleResult) ShardEnvelopesHashes() []string { out := make([]string, 0, len(r.ShardEnvelopes)) for _, e := range r.ShardEnvelopes { out = append(out, e.Hash().String()) } return out } func (r *singleResult) EnvelopesHashes() []string { out := make([]string, 0, len(r.Envelopes)) for _, e := range r.Envelopes { out = append(out, e.Hash().String()) } return out } func (r *singleResult) toString() string { resultString := "" communityString := "" if r.FetchedCommunity != nil { communityString = fmt.Sprintf("clock: %d, name: '%s', members: %d", r.FetchedCommunity.Clock(), r.FetchedCommunity.Name(), len(r.FetchedCommunity.Members()), ) } if r.Error != nil { resultString = fmt.Sprintf("error: %s", r.Error.Error()) } else { resultString = fmt.Sprintf("envelopes fetched: %d, community - %s", r.EnvelopesCount, communityString) } for i, envelope := range r.ShardEnvelopes { resultString += fmt.Sprintf("\n\tshard envelope %3.0d: %s, timestamp: %d (%s), size: %d bytes, contentTopic: %s, pubsubTopic: %s", i+1, envelope.Hash().Hex(), envelope.Envelope.Message().GetTimestamp(), time.Unix(0, envelope.Envelope.Message().GetTimestamp()).UTC(), len(envelope.Envelope.Message().Payload), envelope.Envelope.Message().ContentTopic, envelope.Envelope.PubsubTopic(), ) } for i, envelope := range r.Envelopes { resultString += fmt.Sprintf("\n\tdescription envelope %3.0d: %s, timestamp: %d (%s), size: %d bytes, contentTopic: %s, pubsubTopic: %s", i+1, envelope.Hash().Hex(), envelope.Envelope.Message().GetTimestamp(), time.Unix(0, envelope.Envelope.Message().GetTimestamp()).UTC(), len(envelope.Envelope.Message().Payload), envelope.Envelope.Message().ContentTopic, envelope.Envelope.PubsubTopic(), ) } return resultString } func (s *MessengerStoreNodeRequestSuite) SetupTest() { s.logger = tt.MustCreateTestLogger() s.cancel = make(chan struct{}, 10) s.collectiblesServiceMock = &CollectiblesServiceMock{} s.createStore() } func (s *MessengerStoreNodeRequestSuite) TearDown() { close(s.cancel) s.Require().NoError(s.wakuStoreNode.Stop()) TearDownMessenger(&s.Suite, s.owner) TearDownMessenger(&s.Suite, s.bob) } func (s *MessengerStoreNodeRequestSuite) createStore() { cfg := testWakuV2Config{ logger: s.logger.Named("store-waku"), enableStore: true, useShardAsDefaultTopic: false, clusterID: shard.UndefinedShardValue, } s.wakuStoreNode = NewTestWakuV2(&s.Suite, cfg) s.storeNodeAddress = s.wakuListenAddress(s.wakuStoreNode) s.logger.Info("store node ready", zap.String("address", s.storeNodeAddress)) } func (s *MessengerStoreNodeRequestSuite) tearDownOwner() { _ = gethbridge.GetGethWakuV2From(s.ownerWaku).Stop() TearDownMessenger(&s.Suite, s.owner) } func (s *MessengerStoreNodeRequestSuite) createOwner() { cfg := testWakuV2Config{ logger: s.logger.Named("owner-waku"), enableStore: false, useShardAsDefaultTopic: false, clusterID: shard.UndefinedShardValue, } wakuV2 := NewTestWakuV2(&s.Suite, cfg) s.ownerWaku = gethbridge.NewGethWakuV2Wrapper(wakuV2) messengerLogger := s.logger.Named("owner-messenger") s.owner = s.newMessenger(s.ownerWaku, messengerLogger, s.storeNodeAddress) // We force the owner to use the store node as relay peer WaitForPeersConnected(&s.Suite, gethbridge.GetGethWakuV2From(s.ownerWaku), func() []string { err := s.owner.DialPeer(s.storeNodeAddress) s.Require().NoError(err) return []string{s.wakuStoreNode.PeerID().String()} }) } func (s *MessengerStoreNodeRequestSuite) createBob() { cfg := testWakuV2Config{ logger: s.logger.Named("bob-waku"), enableStore: false, useShardAsDefaultTopic: false, clusterID: shard.UndefinedShardValue, } wakuV2 := NewTestWakuV2(&s.Suite, cfg) s.bobWaku = gethbridge.NewGethWakuV2Wrapper(wakuV2) messengerLogger := s.logger.Named("bob-messenger") s.bob = s.newMessenger(s.bobWaku, messengerLogger, s.storeNodeAddress) } func (s *MessengerStoreNodeRequestSuite) tearDownBob() { _ = gethbridge.GetGethWakuV2From(s.bobWaku).Stop() TearDownMessenger(&s.Suite, s.bob) } func (s *MessengerStoreNodeRequestSuite) newMessenger(shh types.Waku, logger *zap.Logger, mailserverAddress string) *Messenger { privateKey, err := crypto.GenerateKey() s.Require().NoError(err) options := []Option{ WithAutoRequestHistoricMessages(false), WithCuratedCommunitiesUpdateLoop(false), } if mailserverAddress != "" { options = append(options, WithTestStoreNode(&s.Suite, localMailserverID, mailserverAddress, localFleet, s.collectiblesServiceMock), ) } messenger, err := newMessengerWithKey(shh, privateKey, logger, options) s.Require().NoError(err) return messenger } func (s *MessengerStoreNodeRequestSuite) createCommunity(m *Messenger) *communities.Community { s.waitForAvailableStoreNode(m) storeNodeSubscription := s.setupStoreNodeEnvelopesWatcher(nil) createCommunityRequest := &requests.CreateCommunity{ Name: RandomLettersString(10), Description: RandomLettersString(20), Color: RandomColor(), Tags: RandomCommunityTags(3), Membership: protobuf.CommunityPermissions_AUTO_ACCEPT, } response, err := m.CreateCommunity(createCommunityRequest, false) s.Require().NoError(err) s.Require().NotNil(response) s.Require().Len(response.Communities(), 1) s.waitForEnvelopes(storeNodeSubscription, 1) return response.Communities()[0] } func (s *MessengerStoreNodeRequestSuite) requireCommunitiesEqual(c *communities.Community, expected *communities.Community) { if expected == nil { s.Require().Nil(c) return } s.Require().NotNil(c) s.Require().Equal(expected.IDString(), c.IDString()) s.Require().Equal(expected.Clock(), c.Clock()) s.Require().Equal(expected.Name(), c.Name()) s.Require().Equal(expected.Identity().Description, c.Identity().Description) s.Require().Equal(expected.Color(), c.Color()) s.Require().Equal(expected.Tags(), c.Tags()) s.Require().Equal(expected.Shard(), c.Shard()) s.Require().Equal(len(expected.TokenPermissions()), len(c.TokenPermissions())) for k, v := range expected.TokenPermissions() { s.Require().True(proto.Equal(v, c.TokenPermissions()[k])) } s.Require().Equal(len(expected.CommunityTokensMetadata()), len(c.CommunityTokensMetadata())) for i, v := range expected.CommunityTokensMetadata() { s.Require().True(proto.Equal(v, c.CommunityTokensMetadata()[i])) } } func (s *MessengerStoreNodeRequestSuite) requireContactsEqual(c *Contact, expected *Contact) { s.Require().Equal(expected.DisplayName, c.DisplayName) s.Require().Equal(expected.Bio, c.Bio) s.Require().Equal(expected.SocialLinks, c.SocialLinks) } func (s *MessengerStoreNodeRequestSuite) fetchCommunity(m *Messenger, communityShard communities.CommunityShard, expectedCommunity *communities.Community) StoreNodeRequestStats { options := []StoreNodeRequestOption{ WithWaitForResponseOption(true), } fetchedCommunity, stats, err := m.storeNodeRequestsManager.FetchCommunity(communityShard, options) s.Require().NoError(err) s.requireCommunitiesEqual(fetchedCommunity, expectedCommunity) return stats } func (s *MessengerStoreNodeRequestSuite) fetchProfile(m *Messenger, contactID string, expectedContact *Contact) { fetchedContact, err := m.FetchContact(contactID, true) s.Require().NoError(err) s.Require().NotNil(fetchedContact) s.Require().Equal(contactID, fetchedContact.ID) if expectedContact != nil { s.requireContactsEqual(fetchedContact, expectedContact) } } func (s *MessengerStoreNodeRequestSuite) waitForAvailableStoreNode(messenger *Messenger) { WaitForAvailableStoreNode(&s.Suite, messenger, storeNodeConnectTimeout) } func (s *MessengerStoreNodeRequestSuite) setupEnvelopesWatcher(wakuNode *waku2.Waku, topic *wakuV2common.TopicType, cb func(envelope *wakuV2common.ReceivedMessage)) { envelopesWatcher := make(chan wakuV2common.EnvelopeEvent, 100) envelopesSub := wakuNode.SubscribeEnvelopeEvents(envelopesWatcher) go func() { defer envelopesSub.Unsubscribe() for { select { case <-s.cancel: return case envelopeEvent := <-envelopesWatcher: if envelopeEvent.Event != wakuV2common.EventEnvelopeAvailable { continue } if topic != nil && *topic != envelopeEvent.Topic { continue } envelope := wakuNode.GetEnvelope(envelopeEvent.Hash) cb(envelope) s.logger.Debug("envelope available event for fetched content topic", zap.Any("envelopeEvent", envelopeEvent), zap.Any("envelope", envelope), ) } } }() } func (s *MessengerStoreNodeRequestSuite) setupStoreNodeEnvelopesWatcher(topic *wakuV2common.TopicType) <-chan string { storeNodeSubscription := make(chan string, 100) s.setupEnvelopesWatcher(s.wakuStoreNode, topic, func(envelope *wakuV2common.ReceivedMessage) { storeNodeSubscription <- envelope.Hash().String() }) return storeNodeSubscription } func (s *MessengerStoreNodeRequestSuite) waitForEnvelopes(subscription <-chan string, expectedEnvelopesCount int) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() for i := 0; i < expectedEnvelopesCount; i++ { select { case <-subscription: case <-ctx.Done(): err := fmt.Sprintf("timeout waiting for store node to receive envelopes, received: %d, expected: %d", i, expectedEnvelopesCount) s.Require().Fail(err) } } } func (s *MessengerStoreNodeRequestSuite) wakuListenAddress(waku *waku2.Waku) string { addresses := waku.ListenAddresses() s.Require().LessOrEqual(1, len(addresses)) return addresses[0] } func (s *MessengerStoreNodeRequestSuite) ensureStoreNodeEnvelopes(contentTopic *wakuV2common.TopicType, minimumCount int) { // Give some time for store node to put envelope into database. Otherwise, the test is flaky. // Although we subscribed to EnvelopeEvents and waited, the actual saving to database happens asynchronously. // It would be nice to implement a subscription for database storing event, but it isn't worth it right now. time.Sleep(100 * time.Millisecond) // Directly ensure profile is available on store node queryOptions := []legacy_store.HistoryRequestOption{ legacy_store.WithLocalQuery(), } query := legacy_store.Query{ PubsubTopic: "", ContentTopics: []string{contentTopic.ContentTopic()}, } result, err := s.wakuStoreNode.StoreNode().Query(context.Background(), query, queryOptions...) s.Require().NoError(err) s.Require().GreaterOrEqual(len(result.Messages), minimumCount) s.logger.Debug("store node query result", zap.Int("messagesCount", len(result.Messages))) } func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityInfo() { s.createOwner() s.createBob() community := s.createCommunity(s.owner) s.fetchCommunity(s.bob, community.CommunityShard(), community) } func (s *MessengerStoreNodeRequestSuite) TestConsecutiveRequests() { s.createOwner() s.createBob() community := s.createCommunity(s.owner) // Test consecutive requests to check that requests in manager are finalized // At second request we expect to fetch nothing, because the community is already in the database s.fetchCommunity(s.bob, community.CommunityShard(), community) s.fetchCommunity(s.bob, community.CommunityShard(), nil) } func (s *MessengerStoreNodeRequestSuite) TestSimultaneousCommunityInfoRequests() { s.createOwner() s.createBob() community := s.createCommunity(s.owner) storeNodeRequestsCount := 0 s.bob.storeNodeRequestsManager.onPerformingBatch = func(batch MailserverBatch) { storeNodeRequestsCount++ } s.waitForAvailableStoreNode(s.bob) wg := sync.WaitGroup{} // Make 2 simultaneous fetch requests // 1 fetch request = 2 requests to store node (fetch shard and fetch community) // only 2 request to store node is expected for i := 0; i < 2; i++ { wg.Add(1) go func() { defer wg.Done() s.fetchCommunity(s.bob, community.CommunityShard(), community) }() } wg.Wait() s.Require().Equal(2, storeNodeRequestsCount) } func (s *MessengerStoreNodeRequestSuite) TestRequestNonExistentCommunity() { // On test start store node database is empty, so just request any valid community ID. request := FetchCommunityRequest{ CommunityKey: "0x036dc11a0663f88e15912f0adb68c3c5f68ca0ca7a233f1a88ff923a3d39b2cf07", Shard: nil, TryDatabase: false, WaitForResponse: true, } s.createBob() s.waitForAvailableStoreNode(s.bob) fetchedCommunity, err := s.bob.FetchCommunity(&request) s.Require().NoError(err) s.Require().Nil(fetchedCommunity) } func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityInfoWithStoreNodeDisconnected() { s.createOwner() s.createBob() community := s.createCommunity(s.owner) // WaitForAvailableStoreNode is done internally s.fetchCommunity(s.bob, community.CommunityShard(), community) } func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityPagingAlgorithm() { const spamAmount = defaultStoreNodeRequestPageSize + initialStoreNodeRequestPageSize s.createOwner() s.createBob() // Create a community community := s.createCommunity(s.owner) contentTopic := wakuV2common.BytesToTopic(transport.ToTopic(community.IDString())) storeNodeSubscription := s.setupStoreNodeEnvelopesWatcher(&contentTopic) // Push spam to the same ContentTopic & PubsubTopic // The first requested page size is 1. All subsequent pages are limited to 20. // We want to test the algorithm, so we push 21 spam envelopes. for i := 0; i < spamAmount; i++ { spamMessage := common.RawMessage{ Payload: RandomBytes(16), Sender: community.PrivateKey(), SkipEncryptionLayer: true, MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_DESCRIPTION, PubsubTopic: community.PubsubTopic(), } _, err := s.owner.sender.SendPublic(context.Background(), community.IDString(), spamMessage) s.Require().NoError(err) } // Wait the store node to receive envelopes s.waitForEnvelopes(storeNodeSubscription, spamAmount) // Fetch the community stats := s.fetchCommunity(s.bob, community.CommunityShard(), community) // Expect 3 pages and 23 (24 spam + 1 community description + 1 general channel description) envelopes to be fetched. // First we fetch a more up-to-date, but an invalid spam message, fail to decrypt it as community description, // then we fetch another page of data and successfully decrypt a community description. s.Require().Equal(spamAmount+1, stats.FetchedEnvelopesCount) s.Require().Equal(3, stats.FetchedPagesCount) } func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityWithSameContentTopic() { s.createOwner() s.createBob() // Create 2 communities community1 := s.createCommunity(s.owner) community2 := s.createCommunity(s.owner) description2, err := community2.MarshaledDescription() s.Require().NoError(err) // Push community2 description to the same ContentTopic & PubsubTopic as community1. // This way we simulate 2 communities with same ContentTopic. spamMessage := common.RawMessage{ Payload: description2, Sender: community2.PrivateKey(), SkipEncryptionLayer: true, MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_DESCRIPTION, PubsubTopic: community1.PubsubTopic(), } _, err = s.owner.sender.SendPublic(context.Background(), community1.IDString(), spamMessage) s.Require().NoError(err) // Fetch the community s.fetchCommunity(s.bob, community1.CommunityShard(), community1) } func (s *MessengerStoreNodeRequestSuite) TestRequestMultipleCommunities() { s.createOwner() s.createBob() // Create 2 communities community1 := s.createCommunity(s.owner) community2 := s.createCommunity(s.owner) fetchedCommunities := map[string]*communities.Community{} err := WaitOnSignaledCommunityFound(s.bob, func() { err := s.bob.fetchCommunities([]communities.CommunityShard{ community1.CommunityShard(), community2.CommunityShard(), }) s.Require().NoError(err) }, func(community *communities.Community) bool { fetchedCommunities[community.IDString()] = community return len(fetchedCommunities) == 2 }, 1*time.Second, "communities were not signalled in time", ) s.Require().NoError(err) s.Require().Contains(fetchedCommunities, community1.IDString()) s.Require().Contains(fetchedCommunities, community2.IDString()) } func (s *MessengerStoreNodeRequestSuite) TestRequestWithoutWaitingResponse() { s.createOwner() s.createBob() // Create a community community := s.createCommunity(s.owner) request := FetchCommunityRequest{ CommunityKey: community.IDString(), Shard: nil, TryDatabase: false, WaitForResponse: false, } fetchedCommunities := map[string]*communities.Community{} err := WaitOnSignaledCommunityFound(s.bob, func() { fetchedCommunity, err := s.bob.FetchCommunity(&request) s.Require().NoError(err) s.Require().Nil(fetchedCommunity) }, func(community *communities.Community) bool { fetchedCommunities[community.IDString()] = community return len(fetchedCommunities) == 1 }, 1*time.Second, "communities weren't signalled", ) s.Require().NoError(err) s.Require().Len(fetchedCommunities, 1) s.Require().Contains(fetchedCommunities, community.IDString()) s.requireCommunitiesEqual(fetchedCommunities[community.IDString()], community) } func (s *MessengerStoreNodeRequestSuite) TestRequestProfileInfo() { s.createOwner() defer s.tearDownOwner() // Set keypair (to be able to set displayName) ownerProfileKp := accounts.GetProfileKeypairForTest(true, false, false) ownerProfileKp.KeyUID = s.owner.account.KeyUID ownerProfileKp.Accounts[0].KeyUID = s.owner.account.KeyUID err := s.owner.settings.SaveOrUpdateKeypair(ownerProfileKp) s.Require().NoError(err) contentTopicString := transport.ContactCodeTopic(&s.owner.identity.PublicKey) contentTopic := wakuV2common.BytesToTopic(transport.ToTopic(contentTopicString)) storeNodeSubscription := s.setupStoreNodeEnvelopesWatcher(&contentTopic) // Set display name, this will also publish contact code err = s.owner.SetDisplayName("super-owner") s.Require().NoError(err) s.waitForEnvelopes(storeNodeSubscription, 1) s.ensureStoreNodeEnvelopes(&contentTopic, 1) // Fetch profile s.createBob() defer s.tearDownBob() s.fetchProfile(s.bob, s.owner.selfContact.ID, s.owner.selfContact) } // TestSequentialUpdates checks that making updates to the community // immediately results in new store node fetched information. // Before adding storeNodeSubscription we had problems with the test setup that we didn't have a mechanism to wait for store node to // receive and process new messages. func (s *MessengerStoreNodeRequestSuite) TestSequentialUpdates() { s.createOwner() s.createBob() community := s.createCommunity(s.owner) s.fetchCommunity(s.bob, community.CommunityShard(), community) contentTopic := wakuV2common.BytesToTopic(transport.ToTopic(community.IDString())) communityName := community.Name() storeNodeSubscription := s.setupStoreNodeEnvelopesWatcher(&contentTopic) for i := 0; i < 3; i++ { // Change community name, this will automatically publish a new community description ownerEditRequest := &requests.EditCommunity{ CommunityID: community.ID(), CreateCommunity: requests.CreateCommunity{ Name: fmt.Sprintf("%s-%d", communityName, i), Description: community.DescriptionText(), Color: community.Color(), Membership: community.Permissions().Access, }, } _, err := s.owner.EditCommunity(ownerEditRequest) s.Require().NoError(err) s.waitForEnvelopes(storeNodeSubscription, 1) // Get updated community from the database community, err = s.owner.communitiesManager.GetByID(community.ID()) s.Require().NoError(err) s.Require().NotNil(community) s.fetchCommunity(s.bob, community.CommunityShard(), community) } } func (s *MessengerStoreNodeRequestSuite) TestRequestShardAndCommunityInfo() { s.createOwner() s.createBob() community := s.createCommunity(s.owner) expectedShard := &shard.Shard{ Cluster: shard.MainStatusShardCluster, Index: 23, } shardRequest := &requests.SetCommunityShard{ CommunityID: community.ID(), Shard: expectedShard, } shardTopic := transport.CommunityShardInfoTopic(community.IDString()) contentContentTopic := wakuV2common.BytesToTopic(transport.ToTopic(shardTopic)) storeNodeSubscription := s.setupStoreNodeEnvelopesWatcher(&contentContentTopic) _, err := s.owner.SetCommunityShard(shardRequest) s.Require().NoError(err) s.waitForEnvelopes(storeNodeSubscription, 1) s.waitForAvailableStoreNode(s.bob) communityShard := community.CommunityShard() community, err = s.owner.communitiesManager.GetByID(community.ID()) s.Require().NoError(err) s.Require().NotNil(community) s.Require().NotNil(community.Shard()) s.fetchCommunity(s.bob, communityShard, community) } func (s *MessengerStoreNodeRequestSuite) TestFiltersNotRemoved() { s.createOwner() s.createBob() community := s.createCommunity(s.owner) // The owner is a member of the community, so he has a filter for community description content topic. // We want to check that filter is not removed by `FetchCommunity` call. filterBefore := s.owner.transport.FilterByChatID(community.IDString()) s.Require().NotNil(filterBefore) s.fetchCommunity(s.owner, community.CommunityShard(), nil) filterAfter := s.owner.transport.FilterByChatID(community.IDString()) s.Require().NotNil(filterAfter) s.Require().Equal(filterBefore.FilterID, filterAfter.FilterID) } func (s *MessengerStoreNodeRequestSuite) TestFiltersRemoved() { s.createOwner() s.createBob() community := s.createCommunity(s.owner) // The bob is a member of the community, so he has no filters for community description content topic. // We want to check that filter created by `FetchCommunity` is removed on request finish. filterBefore := s.bob.transport.FilterByChatID(community.IDString()) s.Require().Nil(filterBefore) s.fetchCommunity(s.bob, community.CommunityShard(), community) filterAfter := s.bob.transport.FilterByChatID(community.IDString()) s.Require().Nil(filterAfter) } func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityEnvelopesOrder() { s.createOwner() s.createBob() const descriptionsCount = 4 community := s.createCommunity(s.owner) contentTopic := wakuV2common.BytesToTopic(transport.ToTopic(community.IDString())) storeNodeSubscription := s.setupStoreNodeEnvelopesWatcher(&contentTopic) // Push a few descriptions to the store node for i := 0; i < descriptionsCount-1; i++ { err := s.owner.publishOrg(community, false) s.Require().NoError(err) } // Wait for store node to receive envelopes s.waitForEnvelopes(storeNodeSubscription, descriptionsCount-1) // Subscribe to received envelope bobWakuV2 := gethbridge.GetGethWakuV2From(s.bobWaku) var receivedEnvelopes []*wakuV2common.ReceivedMessage s.setupEnvelopesWatcher(bobWakuV2, &contentTopic, func(envelope *wakuV2common.ReceivedMessage) { receivedEnvelopes = append(receivedEnvelopes, envelope) }) // Force a single-envelope page size to be able to check the order. // Also force all envelopes to be fetched. options := []StoreNodeRequestOption{ WithWaitForResponseOption(true), WithStopWhenDataFound(false), WithInitialPageSize(1), WithFurtherPageSize(1), } // Fetch the community fetchedCommunity, _, err := s.bob.storeNodeRequestsManager.FetchCommunity(community.CommunityShard(), options) s.Require().NoError(err) s.requireCommunitiesEqual(fetchedCommunity, community) // Ensure all expected envelopes were received s.Require().Equal(descriptionsCount, len(receivedEnvelopes)) // We check that each next envelope fetched is newer than the previous one for i := 1; i < len(receivedEnvelopes); i++ { s.Require().Less( receivedEnvelopes[i].Envelope.Message().GetTimestamp(), receivedEnvelopes[i-1].Envelope.Message().GetTimestamp()) } } /* TestFetchRealCommunity is not actually a test, but an utility to check the community description in all of the store nodes. It's intended to only run locally and shouldn't be executed in CI, because it relies on connection to the real network. TODO: It would be nice to move this code to a real utility in /cmd. It should allow us to fairly verify the community owner and do other good things. To run this test, first set `runLocalTests` to true. Then carefully set all of communityID, communityShard, fleet and other const variables. NOTE: I only tested it with the default parameters, but in theory it should work for any configuration. */ type testFetchRealCommunityExampleTokenInfo struct { ChainID uint64 ContractAddress string } var testFetchRealCommunityExample = []struct { CommunityID string CommunityURL string // If set, takes precedence over CommunityID CommunityShard *shard.Shard // WARNING: I didn't test a sharded community Fleet string UseShardAsDefaultTopic bool ClusterID uint16 UserPrivateKeyString string // When empty a new user will be created // Optional request parameters CustomOptions []StoreNodeRequestOption // Setup OwnerPublicKey and CommunityTokens if the community has owner token // This is needed to mock the owner verification OwnerPublicKey string CommunityTokens []testFetchRealCommunityExampleTokenInfo // Fill these if you know what envelopes are expected. // The test will fail if fetched array doesn't equal to the expected one. CheckExpectedEnvelopes bool ExpectedShardEnvelopes []string ExpectedDescriptionEnvelopes []string }{ { //Example 1, status.prod fleet CommunityID: "0x03073514d4c14a7d10ae9fc9b0f05abc904d84166a6ac80add58bf6a3542a4e50a", CommunityShard: nil, Fleet: params.FleetStatusProd, UseShardAsDefaultTopic: false, ClusterID: shard.UndefinedShardValue, }, { // Example 3, shards.test fleet // https://status.app/c/CxiACi8KFGFwIHJlcSAxIHN0dCBiZWMgbWVtEgdkc2Fkc2FkGAMiByM0MzYwREYqAxkrHAM=#zQ3shwDYZHtrLE7NqoTGjTWzWUu6hom5D4qxfskLZfgfyGRyL CommunityID: "0x03f64be95ed5c925022265f9250f538f65ed3dcf6e4ef6c139803dc02a3487ae7b", Fleet: params.FleetShardsTest, UseShardAsDefaultTopic: true, ClusterID: shard.MainStatusShardCluster, CheckExpectedEnvelopes: true, ExpectedShardEnvelopes: []string{ "0x8173eecd7ff9ebcaae3dde0e704daf9bdeb6d33b0d8505a67e7dc56d0d8fc07c", "0x596bbafbe0e0b625d165378cd4c7641a4d23aa1145c705aad666ddeaf60c88cd", "0x8a1ee798f3657da5a463e5f878ab2455d05b8f552359b58330ccd7fa4f5624b0", "0x97bcde2103a01984bb45a8590a6cb6972411445a1b2d40e181d5f2b5366fa5f1", "0x26e3c0c880d1a2c4e81bf4fffbdb8b7e1ecc91fce7c6a05ee87d200d62ffc11e", "0x1a8820bd61ebcc9de75c25f31c9b05eb6e880a5a4902679bb6ce2f43f61bf159", "0xce450cfb5f79d761f34dea5b2ccec63751886e43ae63477e12f517c31f800aeb", "0x9607bd1cf08355c44bcce055da197ba177201882736fa8874910194ccdaa8760", "0x0c4b989ca69f529e571e6ea8b3230a85e057d8b2ae6147d1fedc2a01f2816ed6", "0xe40ea64c9007a064b6324b614976510f2a433c9f84d87139df8f66b536e37ee4", "0x7a028466a095e40650bb0ef16e903309b0c38c5a7cb7e2e9debd0acf2151448d", "0x96419c6be375b2b348778d4694e3a491de84eecde601d5d405a0e72e9cece4a1", "0xbcaeb5e86128638fab7203428daddd741df44ceeabe7d9d25936a10cd0a8b808", "0x2e0b5872cb5a7c9a3273048eb2dfcb1d6a28faad3fe307a7db6c2dbaca9ce462", "0xfa96bbe4125514ce73c52ef3ccb1c4ad9c4ad4afe8803de8ab9309fe9483b1d0", "0xdeddfc82f70cce77c26959d91851fbc33afe648428c3e6ea349b2a2456b92111", "0x5b12f17d7b712071f57bb48b7dcd0d6568ff5e7c3f8b3811013aea8dde9c6243", "0x18928fd044482c75518162104d487e6fe504f086eb8c5e9f21aa4bce2811d0fa", "0x543c156ced76138d69229a39425a0a1cabd617770e023333c10501b979f52d61", "0xf46ea6bf5ab6a70662bcf227cc5d2c8c7a70ce42a88e5bb7ebe9e598668a8ae2", "0xedb9628dc1ce5b0ec899c3813dd4159a2e06fb3dc88ffaae047e927c804ad0b3", "0x16248eccc3544af3fc4a73467d0925ca2f3741eb623516ee369f710e4aa8a3ef", "0x6a85f784a9004b56bbb47d87f5541173f05bd61ff5b26e41c714adbb5516e9ff", "0x91e320be2cb5c6178027390cbce165fe088a1a35e1442382064ddbc9aabea8a2", "0x676496dd36ae40e184863725bfb7425e46d916f73f4b0dd5d10324f4e9325da6", }, ExpectedDescriptionEnvelopes: []string{ "0xe2c38667ee160861b3dc5a00e4422f47de1303c8b61f0a33c4853ce0b71d0ae4", "0x7d8392baa9dd134e43287e58d69b8c9f50aa5c144adda6a3c7d32f00c5dea309", "0xf74918d445709ccf9c29e776d27e9b7dc31f25a28473e8fcd89ed9de8a2e6df4", "0xafd7e9b6245d88ea2b4fe70265aa3c5ce5618827c1092f8e3058315ac27c5b98", "0x4d16bd4fcfc2d8736dc29d8c7287671f7e1df62af74943dc40a39ae18f388a07", "0xd101f14bcf6bb9a5e72b934b3e74ebe3f77774037cb5b193803b264d69bfb9bf", "0xaa857389e8886401678690bb4dbc664486bd7039427ca53e826197b303696cff", "0x4443448e575824330a96d5114a9a7d3fe0ee7168ab6c7a646057ca4502fb91d2", "0xfbb79b8d0ee1a61109c543cbe580412fe6d23de33d163006f74fef4addbdad37", "0x51bc55c732e0e9db40fd8865a8ae20cbd99bfd4f95c62cf8b591e793d9b642d2", "0xc28ca742c0e159a60941fb68ee5832016b510afe54e8ee8bb2200495ac29f1e3", "0xf44f1714743a55f0170ba3627390d84cb9307326028abf7236222c93104db833", "0x69d067da262b124d2eb6a8ba9f08e0a1ad66afb8d7ff3641a256992fff53a6de", "0xa86a08457854fbea347cdd92fd390a330a9971967c551bebc53b18ccfea876fe", "0x592850f8bc3c6079826f168971746bd2a1b50a5011fe3b233ead6c72c92a3373", "0x1b3151d7b9b37350e86c937dc2e7964d472815bbf9275714bcb16a0c4327fe3d", "0x1f2beca64e52f996127647cf3f3abd2e4fe501646fc39e98713ae064333388b3", "0xeafc8f9d3114426c08748e6171710874074fb1eb732d9364830ce9b58955c83c", "0x6e014a5ec75465efaf036353ec5811b8f710f608a04002925eba3a0a37a30423", "0xf7e12a5829cf90e4272132e7b62c5bf0dd09100c8d498c66c9740a798db559b3", "0x62da7e828862c3c8692cbd077c5a62266d811764184378e55f9f1066510b4652", "0x74201394d05b914bc7e6ce4d2fcf0b119491c80644f48ac9fd37b842e4a0275a", "0x9d4c0b1be53810c45c2fa744baa8d16ef4ae3a319b09043f4b4e053127461bf0", "0x1937979514ea1dba8ab3b621fc3d0a3f6246b4bf1f9b4073888b8dbd9b4a765a", "0xf767f3f36fdecb5ef6650542232334df836bfca1e7f72f1215df50d3f9f9c9bc", "0x3a06002325502bc39a77962241fe274d4e88f61762194d321f9cc95272ed4a74", "0x13caae58261c181d4974d7a68e0b7c8580c3cc569840179d53ae76407548d8b8", "0x36f3b4afbcc4177a7aef26ad567839ffce51896d4c40d0a08d222cebd1255e3b", "0x159e685bbab26a5d54ab817c93c9c610055bcf2af75290abcc9a84f1b85a2de9", "0x1fd5ff73d7ea9a19f282bd0716f04a5e86b7c515839f0c721f66b3fe99161054", "0x95b1e9ada4913ca809c9c28fc225a21753f18a90253660750900c78f79ad2a00", "0x4334826934a7cbfb7446ec9d581fa6433c5d1f7f51b97f24717f55cffa320c65", "0x0c01d07108c448797ffd14a2152fd38d1764c8a9c5e2f3da12f70551588add7a", "0xe7071c6587fc277c4f4c0d7e4575de1a0843d3cf6c2a4aac79be79edc1608038", "0x5da4e482f3e6eacf080db685e00c199c8cbbad9a8f43b1d94944426444a7a84a", "0x638f551acdd7ccffd6a40ad12bbba1da8fd8a58157fdf9625b12d4a95b4eef71", "0xa1a52c28e0481f6004d98bbce906676fff67f04246454bd33fba02c640355af0", "0xe0300eb9e0f215ace491b1104665b48b9f6bff039af40e0cfc52a3ce766e747f", "0xd092c04d51ee963d59953324d84188a0c1636e8600cb0f5f6f3f4f826d70c8f3", "0x8d94bbfee687d534361fc3069079cf4e4f7db2a179d24e6419f67e38b5f0bd34", "0x1fd7a4d2c04fca3875126b7a951f619b4da0000ca47496df0c2fb1048a145108", "0xbbefbd116cbb23de193318b328412addc500af965d31ba481d70fa1d9e99461d", "0xaa4e0e8bd820438e22b93371bda24a29922d33c15fb312b343d2e81a22cbdd95", "0x76aef29ea4dde107c22c520efb2a4516b69ae83bc237281d9990f68397d801f5", "0x804789119513a065d892cba5d240cb4d89d7329aeee93fcd8e85379a4d362fc9", "0x9029b4a13903a3369e3466f1bfabae3f26b6721628db138eaba25c1f55f6fc1e", "0xee38c209cb95035a289034c737e5775877145efea31b2a01f7c9241ff02f3e92", "0x3e76da87895ca821db3b7ed7dc6949557c620d9cbcaf97af39ed4955d37b734e", "0xf5e77eb8f9a5c52e09a56dd5e461bfee6cf9a73e1253f1d41bcde81fe3646997", "0x083e06375c366283e541b249ea8646c3f31feb970078e95861ea399f0a57d09a", "0xcd7db07ba557ec1ba0104909fdb958661c60c82213a75e8d15e7b262ef4f58b7", "0x57b49dc83e1d3ac7b56bb7d758a9bf448339593311103bce4f0a53028587d577", "0xb08cd92a5ec6f44a6129a60107132ca17d5fef29fb2bc5ebba14028d57a8038e", "0x76959f98c8c734307a985294c26f008f3f705912aab02a5b3a0602a8598c02e2", "0xd8ad7df58ffeec20b16a140bdd91484a34fca3ad7fc602043530ca63c307809a", "0x6872ef39653bb208ef51f5b11c4bda3eb494a13f5da14e33687d30aef99ef383", "0x0fabebc2e0c02f4add94886216b01ecdbd34929ad303d1a20a4505bf729038f6", "0xc18270cd532bb3d34f62704feb722c40be48aedb5ad38d4d09fd67f5843b686d", "0xbc16217ece82998783d4209ed3bc3f2f33d92630e43933b0129eb8b792500a3f", "0xbda651e3b9c82f4bcf5b16252407fc888952820c842c49c06b4f01c8127e359a", "0xb4b1799950c6aca3b011ffb775d0f973437d7d46e40cf7b379ff736d08f24eb2", "0x38f12fb09c71dd720cacbb2102ac78ad6fbf830558adc7af9fb773f39e728bdc", "0x489eb6fa2f5ee5b2a071c7083bf36a0a6cb4ec96049707d25843d9a97b4ac7be", "0x64ea5655c8caf89a53c94edd5a47ba750d9fbcf099ec0dcd4026656b044486f1", "0x501aee1c5da6aaeaae14abffefbc377b59ebe3fcaa9981bc83bfeffb25344749", "0x9a3d360ea866102a6268ffd2001617c442b74b221d131fb3c08ae29bfac18203", }, }, { //Example 1, shards.test fleet CommunityID: "0x02471dd922756a3a50b623e59cf3b99355d6587e43d5c517eb55f9aea9d3fe9fe9", Fleet: params.FleetShardsTest, UseShardAsDefaultTopic: true, ClusterID: shard.MainStatusShardCluster, CheckExpectedEnvelopes: true, ExpectedShardEnvelopes: []string{ "0xc3e68e838d09e0117b3f3fd27aabe5f5a509d13e9045263c78e6890953d43547", "0x5ee13d052bedb855ce2b9ba6f43c78233fbd4e6539a3bdf156497053c6ddf76d", "0xfb6638b7e050f9323a0fe7b84986b5c6f8827965e67e3b3bd0fea21cf24e43de", }, ExpectedDescriptionEnvelopes: []string{ "0x5b4fa95d430c939c1cbbb26175eabfb4ee058d508c6b4c0e26624958ba02c3ce", "0xbf44409ee40dea7816186b37a45dfebabcee59f76855ad5af663ccdf598861ab", "0x98d98453f6017517d0114989da0938aad59a3ad9a10839c181f453283f64f5c9", }, }, { CommunityURL: "https://status.app/c/G4IAAMQn9ucHF-V3W5Ouuy0xf0BtTjlwCANJEmwB2CG5p2xKUYzK_l37kzXulUppltT1t6mBcCEJsljRoGrKCP7rWommQomrMA2gBN7RrvCMkFqQwnCNzkNYWrLG85E6GVoM_nolTtfIzl53J1N-tj8fz4_TnO4IIw==#zQ3shZeEJqTC1xhGUjxuS4rtHSrhJ8vUYp64v6qWkLpvdy9L9", //CommunityID: "0x02b5bdaf5a25fcfe2ee14c501fab1836b8de57f61621080c3d52073d16de0d98d6", Fleet: params.FleetShardsTest, UseShardAsDefaultTopic: true, OwnerPublicKey: "0x04953f5f0d355b37c39d1d6460a31ed1114455f8263b3fd1b84406c5f12c9eb7dfb76ba7513b92186010928254984fe98aee069b4c7e20f9ea3da497c3ae769477", CommunityTokens: []testFetchRealCommunityExampleTokenInfo{ { ChainID: 10, ContractAddress: "0x9eDc11E5932372387E76ff3dcF66DB5465893823", }, { ChainID: 10, ContractAddress: "0xD91d2E898f996308D643E3b9C78f5FEb5c5F404F", }, { ChainID: 10, ContractAddress: "0x852E13D2BDFC4C3a761DA7B450f631b555b39C5E", }, { ChainID: 10, ContractAddress: "0x000CfAd029EE94d120e0c136278582F94Cdc532c", }, { ChainID: 10, ContractAddress: "0x21F6F5Cb75E81e5104D890D750270eD6538C50cb", }, }, ClusterID: shard.MainStatusShardCluster, CheckExpectedEnvelopes: false, CustomOptions: []StoreNodeRequestOption{ WithInitialPageSize(1), WithStopWhenDataFound(true), }, }, } func (s *MessengerStoreNodeRequestSuite) TestFetchRealCommunity() { if !runLocalTests { return } exampleToRun := testFetchRealCommunityExample[3] // Test configuration communityID := exampleToRun.CommunityID communityShard := exampleToRun.CommunityShard fleet := exampleToRun.Fleet useShardAsDefaultTopic := exampleToRun.UseShardAsDefaultTopic clusterID := exampleToRun.ClusterID userPrivateKeyString := exampleToRun.UserPrivateKeyString ownerPublicKey := exampleToRun.OwnerPublicKey communityTokens := exampleToRun.CommunityTokens if exampleToRun.CommunityURL != "" { urlResponse, err := ParseSharedURL(exampleToRun.CommunityURL) s.Require().NoError(err) s.Require().NotNil(urlResponse.Community) communityID = urlResponse.Community.CommunityID } // Prepare things depending on the configuration nodesList := mailserversDB.DefaultMailserversByFleet(fleet) descriptionContentTopic := wakuV2common.BytesToTopic(transport.ToTopic(communityID)) shardContentTopic := wakuV2common.BytesToTopic(transport.ToTopic(transport.CommunityShardInfoTopic(communityID))) communityIDBytes, err := types.DecodeHex(communityID) s.Require().NoError(err) // update mock - the signer for the community returned by the contracts should be owner for _, communityToken := range communityTokens { s.collectiblesServiceMock.SetSignerPubkeyForCommunity(communityIDBytes, ownerPublicKey) s.collectiblesServiceMock.SetMockCollectibleContractData(communityToken.ChainID, communityToken.ContractAddress, &communities.CollectibleContractData{TotalSupply: &bigint.BigInt{}}) } results := map[string]singleResult{} wg := sync.WaitGroup{} // We run a separate request for each node in the fleet. for i, mailserver := range nodesList { wg.Add(1) go func(i int, mailserver mailserversDB.Mailserver) { defer wg.Done() fmt.Printf("--- starting request [%d] from %s\n", i, mailserver.ID) result := singleResult{} // // Create WakuV2 node // NOTE: Another option was to create a bare waku node and fetch envelopes directly with it // and after that push all of the envelopes to a new messenger and check the result. // But this turned out to be harder to implement. // wakuLogger := s.logger.Named(fmt.Sprintf("user-waku-%d", i)) messengerLogger := s.logger.Named(fmt.Sprintf("user-messenger-%d", i)) cfg := testWakuV2Config{ logger: wakuLogger, enableStore: false, useShardAsDefaultTopic: useShardAsDefaultTopic, clusterID: clusterID, } wakuV2 := NewTestWakuV2(&s.Suite, cfg) userWaku := gethbridge.NewGethWakuV2Wrapper(wakuV2) // // Create a messenger to process envelopes // var privateKeyString = userPrivateKeyString if privateKeyString == "" { privateKey, err := crypto.GenerateKey() s.Require().NoError(err) privateKeyString = hexutil.Encode(crypto.FromECDSA(privateKey)) } privateKeyBytes, err := hexutil.Decode(privateKeyString) s.Require().NoError(err) privateKey, err := crypto.ToECDSA(privateKeyBytes) s.Require().NoError(err) // Mock a local fleet with single store node // This is done by settings custom store nodes in the database mailserversSQLDb, err := helpers.SetupTestMemorySQLDB(appdatabase.DbInitializer{}) s.Require().NoError(err) mailserversDatabase := mailserversDB.NewDB(mailserversSQLDb) mailserver.Fleet = localFleet err = mailserversDatabase.Add(mailserver) s.Require().NoError(err) options := []Option{ WithMailserversDatabase(mailserversDatabase), WithClusterConfig(params.ClusterConfig{ Fleet: localFleet, ClusterID: clusterID, }), WithCommunityTokensService(s.collectiblesServiceMock), } // Create user without `createBob` func to force desired fleet user, err := newMessengerWithKey(userWaku, privateKey, messengerLogger, options) s.Require().NoError(err) defer TearDownMessenger(&s.Suite, user) communityAddress := communities.CommunityShard{ CommunityID: communityID, Shard: communityShard, } // Setup envelopes watcher to gather fetched envelopes s.setupEnvelopesWatcher(wakuV2, &shardContentTopic, func(envelope *wakuV2common.ReceivedMessage) { result.ShardEnvelopes = append(result.ShardEnvelopes, envelope) }) s.setupEnvelopesWatcher(wakuV2, &descriptionContentTopic, func(envelope *wakuV2common.ReceivedMessage) { result.Envelopes = append(result.Envelopes, envelope) }) // Start fetching storeNodeRequestOptions := []StoreNodeRequestOption{ WithWaitForResponseOption(true), WithStopWhenDataFound(false), // In this test we want all envelopes to be fetched WithInitialPageSize(defaultStoreNodeRequestPageSize), // Because we're fetching all envelopes anyway } storeNodeRequestOptions = append(storeNodeRequestOptions, exampleToRun.CustomOptions...) fetchedCommunity, stats, err := user.storeNodeRequestsManager.FetchCommunity(communityAddress, storeNodeRequestOptions) result.EnvelopesCount = stats.FetchedEnvelopesCount result.FetchedCommunity = fetchedCommunity result.Error = err results[mailserver.ID] = result }(i, mailserver) } // Wait for all requests to finish wg.Wait() // Print the results for storeNodeName, result := range results { fmt.Printf("%s --- %s\n", storeNodeName, result.toString()) } // Check that results has no errors and contain correct envelopes for storeNodeName, result := range results { s.Require().NoError(result.Error) if exampleToRun.CheckExpectedEnvelopes { s.Require().Equal(exampleToRun.ExpectedShardEnvelopes, result.ShardEnvelopesHashes(), fmt.Sprintf("wrong shard envelopes for store node %s", storeNodeName)) s.Require().Equal(exampleToRun.ExpectedDescriptionEnvelopes, result.EnvelopesHashes(), fmt.Sprintf("wrong envelopes for store node %s", storeNodeName)) } } } func (s *MessengerStoreNodeRequestSuite) TestFetchingCommunityWithOwnerToken() { s.createOwner() s.createBob() s.waitForAvailableStoreNode(s.owner) community := s.createCommunity(s.owner) // owner mints owner token var chainID uint64 = 1 tokenAddress := "token-address" tokenName := "tokenName" tokenSymbol := "TSM" _, err := s.owner.SaveCommunityToken(&token.CommunityToken{ TokenType: protobuf.CommunityTokenType_ERC721, CommunityID: community.IDString(), Address: tokenAddress, ChainID: int(chainID), Name: tokenName, Supply: &bigint.BigInt{}, Symbol: tokenSymbol, PrivilegesLevel: token.OwnerLevel, }, nil) s.Require().NoError(err) // owner adds minted owner token to community err = s.owner.AddCommunityToken(community.IDString(), int(chainID), tokenAddress) s.Require().NoError(err) // update mock - the signer for the community returned by the contracts should be owner s.collectiblesServiceMock.SetSignerPubkeyForCommunity(community.ID(), common.PubkeyToHex(&s.owner.identity.PublicKey)) s.collectiblesServiceMock.SetMockCollectibleContractData(chainID, tokenAddress, &communities.CollectibleContractData{TotalSupply: &bigint.BigInt{}}) community, err = s.owner.communitiesManager.GetByID(community.ID()) s.Require().NoError(err) s.Require().Len(community.TokenPermissions(), 1) s.waitForAvailableStoreNode(s.bob) s.fetchCommunity(s.bob, community.CommunityShard(), community) } func (s *MessengerStoreNodeRequestSuite) TestFetchingHistoryWhenOnline() { storeAddress := s.storeNodeAddress storePeerID := s.wakuStoreNode.PeerID().String() // Create messengers s.createOwner() s.createBob() s.logger.Debug("store node info", zap.String("peerID", s.wakuStoreNode.PeerID().String())) s.logger.Debug("owner node info", zap.String("peerID", gethbridge.GetGethWakuV2From(s.ownerWaku).PeerID().String())) s.logger.Debug("bob node info", zap.String("peerID", gethbridge.GetGethWakuV2From(s.bobWaku).PeerID().String())) // Connect to store node to force "online" status { WaitForPeersConnected(&s.Suite, gethbridge.GetGethWakuV2From(s.bobWaku), func() []string { err := s.bob.DialPeer(storeAddress) s.Require().NoError(err) return []string{storePeerID} }) s.Require().True(s.bob.Online()) // Wait for bob to fetch backup and historic messages time.Sleep(2 * time.Second) } // bob goes offline { WaitForConnectionStatus(&s.Suite, gethbridge.GetGethWakuV2From(s.bobWaku), func() bool { err := s.bob.DropPeer(storePeerID) s.Require().NoError(err) return false }) s.Require().False(s.bob.Online()) } // Owner sends a contact request while bob is offline { // Setup store nodes envelopes watcher partitionedTopic := transport.PartitionedTopic(s.bob.IdentityPublicKey()) topic := transport.ToTopic(partitionedTopic) contentTopic := wakuV2common.BytesToTopic(topic) storeNodeSubscription := s.setupStoreNodeEnvelopesWatcher(&contentTopic) // Send contact request response, err := s.owner.SendContactRequest(context.Background(), &requests.SendContactRequest{ ID: s.bob.IdentityPublicKeyString(), Message: "1", }) s.Require().NoError(err) s.Require().NotNil(response) s.Require().Len(response.Messages(), 2) // Ensure contact request is stored s.waitForEnvelopes(storeNodeSubscription, 1) } // owner goes offline to prevent message resend and any other side effects // to go offline we disconnect from both relay and store peers WaitForConnectionStatus(&s.Suite, gethbridge.GetGethWakuV2From(s.ownerWaku), func() bool { err := s.owner.DropPeer(storePeerID) s.Require().NoError(err) return false }) s.Require().False(s.owner.Online()) // bob goes back online, this should trigger fetching historic messages { // Enable auto request historic messages, so that when bob goes online it will fetch historic messages // We don't enable it earlier to control when we connect to the store node. s.bob.config.codeControlFlags.AutoRequestHistoricMessages = true WaitForPeersConnected(&s.Suite, gethbridge.GetGethWakuV2From(s.bobWaku), func() []string { err := s.bob.DialPeer(storeAddress) s.Require().NoError(err) return []string{storePeerID} }) s.Require().True(s.bob.Online()) // Don't dial the peer, message should be fetched from store node response, err := WaitOnMessengerResponse( s.bob, func(r *MessengerResponse) bool { return len(r.Contacts) == 1 }, "no contact request received", ) s.Require().NoError(err) s.Require().NotNil(response) s.Require().Len(response.Contacts, 1) } }