chore: TestFetchingHistoryWhenOnline (#4701)

This commit is contained in:
Igor Sirotin 2024-02-16 18:34:04 +00:00 committed by GitHub
parent 8a3e71378f
commit f0e6fd31de
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 145 additions and 0 deletions

View File

@ -1121,3 +1121,95 @@ func (s *MessengerStoreNodeRequestSuite) TestFetchingCommunityWithOwnerToken() {
s.fetchCommunity(s.bob, community.CommunityShard(), community)
}
func (s *MessengerStoreNodeRequestSuite) TestFetchingHistoryWhenOnline() {
storeAddress := s.storeNodeAddress
storePeerID := s.wakuStoreNode.PeerID().String()
// Create messengers
s.createOwner()
s.createBob()
s.logger.Debug("store node info", zap.String("peerID", s.wakuStoreNode.PeerID().String()))
s.logger.Debug("owner node info", zap.String("peerID", gethbridge.GetGethWakuV2From(s.ownerWaku).PeerID().String()))
s.logger.Debug("bob node info", zap.String("peerID", gethbridge.GetGethWakuV2From(s.bobWaku).PeerID().String()))
// Connect to store node to force "online" status
{
WaitForPeerConnected(&s.Suite, gethbridge.GetGethWakuV2From(s.bobWaku), func() string {
err := s.bob.DialPeer(storeAddress)
s.Require().NoError(err)
return storePeerID
})
s.Require().True(s.bob.Online())
// Wait for bob to fetch backup and historic messages
time.Sleep(2 * time.Second)
}
// bob goes offline
{
WaitForConnectionStatus(&s.Suite, gethbridge.GetGethWakuV2From(s.bobWaku), func() bool {
err := s.bob.DropPeer(storePeerID)
s.Require().NoError(err)
return false
})
s.Require().False(s.bob.Online())
}
// Owner sends a contact request while bob is offline
{
// Setup store nodes envelopes watcher
partitionedTopic := transport.PartitionedTopic(s.bob.IdentityPublicKey())
topic := transport.ToTopic(partitionedTopic)
contentTopic := wakuV2common.BytesToTopic(topic)
storeNodeSubscription := s.setupStoreNodeEnvelopesWatcher(&contentTopic)
// Send contact request
response, err := s.owner.SendContactRequest(context.Background(), &requests.SendContactRequest{
ID: s.bob.IdentityPublicKeyString(),
Message: "1",
})
s.Require().NoError(err)
s.Require().NotNil(response)
s.Require().Len(response.Messages(), 2)
// Ensure contact request is stored
s.waitForEnvelopes(storeNodeSubscription, 1)
}
// owner goes offline to prevent message resend and any other side effects
// to go offline we disconnect from both relay and store peers
WaitForConnectionStatus(&s.Suite, gethbridge.GetGethWakuV2From(s.ownerWaku), func() bool {
err := s.owner.DropPeer(storePeerID)
s.Require().NoError(err)
return false
})
s.Require().False(s.owner.Online())
// bob goes back online, this should trigger fetching historic messages
{
// Enable auto request historic messages, so that when bob goes online it will fetch historic messages
// We don't enable it earlier to control when we connect to the store node.
s.bob.config.featureFlags.AutoRequestHistoricMessages = true
WaitForPeerConnected(&s.Suite, gethbridge.GetGethWakuV2From(s.bobWaku), func() string {
err := s.bob.DialPeer(storeAddress)
s.Require().NoError(err)
return storePeerID
})
s.Require().True(s.bob.Online())
// Don't dial the peer, message should be fetched from store node
response, err := WaitOnMessengerResponse(
s.bob,
func(r *MessengerResponse) bool {
return len(r.Contacts) == 1
},
"no contact request received",
)
s.Require().NoError(err)
s.Require().NotNil(response)
s.Require().Len(response.Contacts, 1)
}
}

View File

@ -4,10 +4,13 @@ import (
"context"
"crypto/rand"
"errors"
"fmt"
"math/big"
"sync"
"time"
waku2 "github.com/status-im/status-go/wakuv2"
"golang.org/x/exp/maps"
"github.com/stretchr/testify/suite"
@ -138,6 +141,56 @@ func WaitOnSignaledCommunityFound(m *Messenger, action func(), condition func(co
}
}
func WaitForConnectionStatus(s *suite.Suite, waku *waku2.Waku, action func() bool) {
subscription := waku.SubscribeToConnStatusChanges()
defer subscription.Unsubscribe()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Action should return the desired online status
wantedOnline := action()
for {
select {
case status := <-subscription.C:
if status.IsOnline == wantedOnline {
return
}
case <-ctx.Done():
s.Require().Fail(fmt.Sprintf("timeout waiting for waku connection status '%t'", wantedOnline))
return
}
}
}
func WaitForPeerConnected(s *suite.Suite, waku *waku2.Waku, action func() string) {
subscription := waku.SubscribeToConnStatusChanges()
defer subscription.Unsubscribe()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Action should return the desired peer ID
peerID := action()
if _, ok := waku.Peers()[peerID]; ok {
return
}
for {
select {
case status := <-subscription.C:
if _, ok := status.Peers[peerID]; ok {
return
}
case <-ctx.Done():
s.Require().Fail(fmt.Sprintf("timeout waiting for peer connected '%s'", peerID))
return
}
}
}
func FindFirstByContentType(messages []*common.Message, contentType protobuf.ChatMessage_ContentType) *common.Message {
for _, message := range messages {
if message.ContentType == contentType {