From f0e6fd31dea202a9b765bbf50370cf15a3fec3bc Mon Sep 17 00:00:00 2001 From: Igor Sirotin Date: Fri, 16 Feb 2024 18:34:04 +0000 Subject: [PATCH] chore: TestFetchingHistoryWhenOnline (#4701) --- protocol/messenger_storenode_request_test.go | 92 ++++++++++++++++++++ protocol/messenger_testing_utils.go | 53 +++++++++++ 2 files changed, 145 insertions(+) diff --git a/protocol/messenger_storenode_request_test.go b/protocol/messenger_storenode_request_test.go index 9a48fcca7..4ae07a93a 100644 --- a/protocol/messenger_storenode_request_test.go +++ b/protocol/messenger_storenode_request_test.go @@ -1121,3 +1121,95 @@ func (s *MessengerStoreNodeRequestSuite) TestFetchingCommunityWithOwnerToken() { s.fetchCommunity(s.bob, community.CommunityShard(), community) } + +func (s *MessengerStoreNodeRequestSuite) TestFetchingHistoryWhenOnline() { + storeAddress := s.storeNodeAddress + storePeerID := s.wakuStoreNode.PeerID().String() + + // Create messengers + s.createOwner() + s.createBob() + + s.logger.Debug("store node info", zap.String("peerID", s.wakuStoreNode.PeerID().String())) + s.logger.Debug("owner node info", zap.String("peerID", gethbridge.GetGethWakuV2From(s.ownerWaku).PeerID().String())) + s.logger.Debug("bob node info", zap.String("peerID", gethbridge.GetGethWakuV2From(s.bobWaku).PeerID().String())) + + // Connect to store node to force "online" status + { + WaitForPeerConnected(&s.Suite, gethbridge.GetGethWakuV2From(s.bobWaku), func() string { + err := s.bob.DialPeer(storeAddress) + s.Require().NoError(err) + return storePeerID + }) + s.Require().True(s.bob.Online()) + + // Wait for bob to fetch backup and historic messages + time.Sleep(2 * time.Second) + } + + // bob goes offline + { + WaitForConnectionStatus(&s.Suite, gethbridge.GetGethWakuV2From(s.bobWaku), func() bool { + err := s.bob.DropPeer(storePeerID) + s.Require().NoError(err) + return false + }) + s.Require().False(s.bob.Online()) + } + + // Owner sends a contact request while bob is offline + { + // Setup store nodes envelopes watcher + partitionedTopic := transport.PartitionedTopic(s.bob.IdentityPublicKey()) + topic := transport.ToTopic(partitionedTopic) + contentTopic := wakuV2common.BytesToTopic(topic) + storeNodeSubscription := s.setupStoreNodeEnvelopesWatcher(&contentTopic) + + // Send contact request + response, err := s.owner.SendContactRequest(context.Background(), &requests.SendContactRequest{ + ID: s.bob.IdentityPublicKeyString(), + Message: "1", + }) + s.Require().NoError(err) + s.Require().NotNil(response) + s.Require().Len(response.Messages(), 2) + + // Ensure contact request is stored + s.waitForEnvelopes(storeNodeSubscription, 1) + } + + // owner goes offline to prevent message resend and any other side effects + // to go offline we disconnect from both relay and store peers + WaitForConnectionStatus(&s.Suite, gethbridge.GetGethWakuV2From(s.ownerWaku), func() bool { + err := s.owner.DropPeer(storePeerID) + s.Require().NoError(err) + return false + }) + s.Require().False(s.owner.Online()) + + // bob goes back online, this should trigger fetching historic messages + { + // Enable auto request historic messages, so that when bob goes online it will fetch historic messages + // We don't enable it earlier to control when we connect to the store node. + s.bob.config.featureFlags.AutoRequestHistoricMessages = true + + WaitForPeerConnected(&s.Suite, gethbridge.GetGethWakuV2From(s.bobWaku), func() string { + err := s.bob.DialPeer(storeAddress) + s.Require().NoError(err) + return storePeerID + }) + s.Require().True(s.bob.Online()) + + // Don't dial the peer, message should be fetched from store node + response, err := WaitOnMessengerResponse( + s.bob, + func(r *MessengerResponse) bool { + return len(r.Contacts) == 1 + }, + "no contact request received", + ) + s.Require().NoError(err) + s.Require().NotNil(response) + s.Require().Len(response.Contacts, 1) + } +} diff --git a/protocol/messenger_testing_utils.go b/protocol/messenger_testing_utils.go index 55bc695d2..7b8560d5f 100644 --- a/protocol/messenger_testing_utils.go +++ b/protocol/messenger_testing_utils.go @@ -4,10 +4,13 @@ import ( "context" "crypto/rand" "errors" + "fmt" "math/big" "sync" "time" + waku2 "github.com/status-im/status-go/wakuv2" + "golang.org/x/exp/maps" "github.com/stretchr/testify/suite" @@ -138,6 +141,56 @@ func WaitOnSignaledCommunityFound(m *Messenger, action func(), condition func(co } } +func WaitForConnectionStatus(s *suite.Suite, waku *waku2.Waku, action func() bool) { + subscription := waku.SubscribeToConnStatusChanges() + defer subscription.Unsubscribe() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // Action should return the desired online status + wantedOnline := action() + + for { + select { + case status := <-subscription.C: + if status.IsOnline == wantedOnline { + return + } + case <-ctx.Done(): + s.Require().Fail(fmt.Sprintf("timeout waiting for waku connection status '%t'", wantedOnline)) + return + } + } +} + +func WaitForPeerConnected(s *suite.Suite, waku *waku2.Waku, action func() string) { + subscription := waku.SubscribeToConnStatusChanges() + defer subscription.Unsubscribe() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // Action should return the desired peer ID + peerID := action() + + if _, ok := waku.Peers()[peerID]; ok { + return + } + + for { + select { + case status := <-subscription.C: + if _, ok := status.Peers[peerID]; ok { + return + } + case <-ctx.Done(): + s.Require().Fail(fmt.Sprintf("timeout waiting for peer connected '%s'", peerID)) + return + } + } +} + func FindFirstByContentType(messages []*common.Message, contentType protobuf.ChatMessage_ContentType) *common.Message { for _, message := range messages { if message.ContentType == contentType {