status-go/protocol/messenger_storenode_request...

1324 lines
48 KiB
Go
Raw Permalink Normal View History

package protocol
import (
"context"
"fmt"
"sync"
"testing"
"time"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/suite"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
2024-05-15 23:15:00 +00:00
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_store"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto"
"github.com/status-im/status-go/appdatabase"
gethbridge "github.com/status-im/status-go/eth-node/bridge/geth"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/multiaccounts/accounts"
"github.com/status-im/status-go/params"
"github.com/status-im/status-go/protocol/common"
"github.com/status-im/status-go/protocol/common/shard"
"github.com/status-im/status-go/protocol/communities"
"github.com/status-im/status-go/protocol/communities/token"
"github.com/status-im/status-go/protocol/protobuf"
"github.com/status-im/status-go/protocol/requests"
"github.com/status-im/status-go/protocol/transport"
"github.com/status-im/status-go/protocol/tt"
mailserversDB "github.com/status-im/status-go/services/mailservers"
"github.com/status-im/status-go/services/wallet/bigint"
"github.com/status-im/status-go/t/helpers"
waku2 "github.com/status-im/status-go/wakuv2"
wakuV2common "github.com/status-im/status-go/wakuv2/common"
)
const (
localFleet = "local-test-fleet-1"
localMailserverID = "local-test-mailserver"
storeNodeConnectTimeout = 500 * time.Millisecond
runLocalTests = false
)
func TestMessengerStoreNodeRequestSuite(t *testing.T) {
t.Skip("requires storev3 node")
suite.Run(t, new(MessengerStoreNodeRequestSuite))
}
type MessengerStoreNodeRequestSuite struct {
suite.Suite
cancel chan struct{}
owner *Messenger
bob *Messenger
wakuStoreNode *waku2.Waku
storeNodeAddress multiaddr.Multiaddr
ownerWaku types.Waku
bobWaku types.Waku
collectiblesServiceMock *CollectiblesServiceMock
logger *zap.Logger
}
type singleResult struct {
EnvelopesCount int
Envelopes []*wakuV2common.ReceivedMessage
ShardEnvelopes []*wakuV2common.ReceivedMessage
Error error
FetchedCommunity *communities.Community
}
func (r *singleResult) ShardEnvelopesHashes() []string {
out := make([]string, 0, len(r.ShardEnvelopes))
for _, e := range r.ShardEnvelopes {
out = append(out, e.Hash().String())
}
return out
}
func (r *singleResult) EnvelopesHashes() []string {
out := make([]string, 0, len(r.Envelopes))
for _, e := range r.Envelopes {
out = append(out, e.Hash().String())
}
return out
}
func (r *singleResult) toString() string {
resultString := ""
communityString := ""
if r.FetchedCommunity != nil {
communityString = fmt.Sprintf("clock: %d, name: '%s', members: %d",
r.FetchedCommunity.Clock(),
r.FetchedCommunity.Name(),
len(r.FetchedCommunity.Members()),
)
}
if r.Error != nil {
resultString = fmt.Sprintf("error: %s", r.Error.Error())
} else {
resultString = fmt.Sprintf("envelopes fetched: %d, community - %s",
r.EnvelopesCount, communityString)
}
for i, envelope := range r.ShardEnvelopes {
resultString += fmt.Sprintf("\n\tshard envelope %3.0d: %s, timestamp: %d (%s), size: %d bytes, contentTopic: %s, pubsubTopic: %s",
i+1,
envelope.Hash().Hex(),
envelope.Envelope.Message().GetTimestamp(),
time.Unix(0, envelope.Envelope.Message().GetTimestamp()).UTC(),
len(envelope.Envelope.Message().Payload),
envelope.Envelope.Message().ContentTopic,
envelope.Envelope.PubsubTopic(),
)
}
for i, envelope := range r.Envelopes {
resultString += fmt.Sprintf("\n\tdescription envelope %3.0d: %s, timestamp: %d (%s), size: %d bytes, contentTopic: %s, pubsubTopic: %s",
i+1,
envelope.Hash().Hex(),
envelope.Envelope.Message().GetTimestamp(),
time.Unix(0, envelope.Envelope.Message().GetTimestamp()).UTC(),
len(envelope.Envelope.Message().Payload),
envelope.Envelope.Message().ContentTopic,
envelope.Envelope.PubsubTopic(),
)
}
return resultString
}
func (s *MessengerStoreNodeRequestSuite) SetupTest() {
s.logger = tt.MustCreateTestLogger()
s.cancel = make(chan struct{}, 10)
s.collectiblesServiceMock = &CollectiblesServiceMock{}
s.createStore()
}
func (s *MessengerStoreNodeRequestSuite) TearDown() {
close(s.cancel)
s.Require().NoError(s.wakuStoreNode.Stop())
TearDownMessenger(&s.Suite, s.owner)
TearDownMessenger(&s.Suite, s.bob)
}
func (s *MessengerStoreNodeRequestSuite) createStore() {
cfg := testWakuV2Config{
logger: s.logger.Named("store-waku"),
enableStore: true,
clusterID: shard.MainStatusShardCluster,
}
s.wakuStoreNode = NewTestWakuV2(&s.Suite, cfg)
s.storeNodeAddress = s.wakuListenAddress(s.wakuStoreNode)
s.logger.Info("store node ready", zap.Stringer("address", s.storeNodeAddress))
}
func (s *MessengerStoreNodeRequestSuite) tearDownOwner() {
_ = gethbridge.GetGethWakuV2From(s.ownerWaku).Stop()
TearDownMessenger(&s.Suite, s.owner)
}
func (s *MessengerStoreNodeRequestSuite) createOwner() {
cfg := testWakuV2Config{
logger: s.logger.Named("owner-waku"),
enableStore: false,
clusterID: shard.MainStatusShardCluster,
}
wakuV2 := NewTestWakuV2(&s.Suite, cfg)
s.ownerWaku = gethbridge.NewGethWakuV2Wrapper(wakuV2)
messengerLogger := s.logger.Named("owner-messenger")
s.owner = s.newMessenger(s.ownerWaku, messengerLogger, &s.storeNodeAddress)
// We force the owner to use the store node as relay peer
WaitForPeersConnected(&s.Suite, gethbridge.GetGethWakuV2From(s.ownerWaku), func() peer.IDSlice {
err := s.owner.DialPeer(s.storeNodeAddress)
s.Require().NoError(err)
return peer.IDSlice{s.wakuStoreNode.PeerID()}
})
}
func (s *MessengerStoreNodeRequestSuite) createBob() {
cfg := testWakuV2Config{
logger: s.logger.Named("bob-waku"),
enableStore: false,
clusterID: shard.MainStatusShardCluster,
}
wakuV2 := NewTestWakuV2(&s.Suite, cfg)
s.bobWaku = gethbridge.NewGethWakuV2Wrapper(wakuV2)
messengerLogger := s.logger.Named("bob-messenger")
s.bob = s.newMessenger(s.bobWaku, messengerLogger, &s.storeNodeAddress)
}
func (s *MessengerStoreNodeRequestSuite) tearDownBob() {
_ = gethbridge.GetGethWakuV2From(s.bobWaku).Stop()
TearDownMessenger(&s.Suite, s.bob)
}
func (s *MessengerStoreNodeRequestSuite) newMessenger(shh types.Waku, logger *zap.Logger, mailserverAddress *multiaddr.Multiaddr) *Messenger {
privateKey, err := crypto.GenerateKey()
s.Require().NoError(err)
options := []Option{
WithAutoRequestHistoricMessages(false),
WithCuratedCommunitiesUpdateLoop(false),
}
if mailserverAddress != nil {
options = append(options,
WithTestStoreNode(&s.Suite, localMailserverID, *mailserverAddress, localFleet, s.collectiblesServiceMock),
)
}
messenger, err := newMessengerWithKey(shh, privateKey, logger, options)
s.Require().NoError(err)
return messenger
}
func (s *MessengerStoreNodeRequestSuite) createCommunity(m *Messenger) *communities.Community {
s.waitForAvailableStoreNode(m)
storeNodeSubscription := s.setupStoreNodeEnvelopesWatcher(nil)
createCommunityRequest := &requests.CreateCommunity{
Name: RandomLettersString(10),
Description: RandomLettersString(20),
Color: RandomColor(),
Tags: RandomCommunityTags(3),
Membership: protobuf.CommunityPermissions_AUTO_ACCEPT,
}
response, err := m.CreateCommunity(createCommunityRequest, false)
s.Require().NoError(err)
s.Require().NotNil(response)
s.Require().Len(response.Communities(), 1)
s.waitForEnvelopes(storeNodeSubscription, 1)
return response.Communities()[0]
}
func (s *MessengerStoreNodeRequestSuite) requireCommunitiesEqual(c *communities.Community, expected *communities.Community) {
if expected == nil {
s.Require().Nil(c)
return
}
s.Require().NotNil(c)
s.Require().Equal(expected.IDString(), c.IDString())
s.Require().Equal(expected.Clock(), c.Clock())
s.Require().Equal(expected.Name(), c.Name())
s.Require().Equal(expected.Identity().Description, c.Identity().Description)
s.Require().Equal(expected.Color(), c.Color())
s.Require().Equal(expected.Tags(), c.Tags())
s.Require().Equal(expected.Shard(), c.Shard())
s.Require().Equal(len(expected.TokenPermissions()), len(c.TokenPermissions()))
for k, v := range expected.TokenPermissions() {
s.Require().True(proto.Equal(v, c.TokenPermissions()[k]))
}
s.Require().Equal(len(expected.CommunityTokensMetadata()), len(c.CommunityTokensMetadata()))
for i, v := range expected.CommunityTokensMetadata() {
s.Require().True(proto.Equal(v, c.CommunityTokensMetadata()[i]))
}
}
func (s *MessengerStoreNodeRequestSuite) requireContactsEqual(c *Contact, expected *Contact) {
s.Require().Equal(expected.DisplayName, c.DisplayName)
s.Require().Equal(expected.Bio, c.Bio)
}
func (s *MessengerStoreNodeRequestSuite) fetchCommunity(m *Messenger, communityShard communities.CommunityShard, expectedCommunity *communities.Community) StoreNodeRequestStats {
options := []StoreNodeRequestOption{
WithWaitForResponseOption(true),
}
fetchedCommunity, stats, err := m.storeNodeRequestsManager.FetchCommunity(communityShard, options)
s.Require().NoError(err)
s.requireCommunitiesEqual(fetchedCommunity, expectedCommunity)
return stats
}
func (s *MessengerStoreNodeRequestSuite) fetchProfile(m *Messenger, contactID string, expectedContact *Contact) {
fetchedContact, err := m.FetchContact(contactID, true)
s.Require().NoError(err)
s.Require().NotNil(fetchedContact)
s.Require().Equal(contactID, fetchedContact.ID)
if expectedContact != nil {
s.requireContactsEqual(fetchedContact, expectedContact)
}
}
func (s *MessengerStoreNodeRequestSuite) waitForAvailableStoreNode(messenger *Messenger) {
WaitForAvailableStoreNode(&s.Suite, messenger, storeNodeConnectTimeout)
}
func (s *MessengerStoreNodeRequestSuite) setupEnvelopesWatcher(wakuNode *waku2.Waku, topic *wakuV2common.TopicType, cb func(envelope *wakuV2common.ReceivedMessage)) {
envelopesWatcher := make(chan wakuV2common.EnvelopeEvent, 100)
envelopesSub := wakuNode.SubscribeEnvelopeEvents(envelopesWatcher)
go func() {
defer envelopesSub.Unsubscribe()
for {
select {
case <-s.cancel:
return
case envelopeEvent := <-envelopesWatcher:
if envelopeEvent.Event != wakuV2common.EventEnvelopeAvailable {
continue
}
if topic != nil && *topic != envelopeEvent.Topic {
continue
}
envelope := wakuNode.GetEnvelope(envelopeEvent.Hash)
cb(envelope)
s.logger.Debug("envelope available event for fetched content topic",
zap.Any("envelopeEvent", envelopeEvent),
zap.Any("envelope", envelope),
)
}
}
}()
}
func (s *MessengerStoreNodeRequestSuite) setupStoreNodeEnvelopesWatcher(topic *wakuV2common.TopicType) <-chan string {
storeNodeSubscription := make(chan string, 100)
s.setupEnvelopesWatcher(s.wakuStoreNode, topic, func(envelope *wakuV2common.ReceivedMessage) {
storeNodeSubscription <- envelope.Hash().String()
})
return storeNodeSubscription
}
func (s *MessengerStoreNodeRequestSuite) waitForEnvelopes(subscription <-chan string, expectedEnvelopesCount int) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
for i := 0; i < expectedEnvelopesCount; i++ {
select {
case <-subscription:
case <-ctx.Done():
err := fmt.Sprintf("timeout waiting for store node to receive envelopes, received: %d, expected: %d", i, expectedEnvelopesCount)
s.Require().Fail(err)
}
}
}
func (s *MessengerStoreNodeRequestSuite) wakuListenAddress(waku *waku2.Waku) multiaddr.Multiaddr {
addresses := waku.ListenAddresses()
s.Require().LessOrEqual(1, len(addresses))
return addresses[0]
}
func (s *MessengerStoreNodeRequestSuite) ensureStoreNodeEnvelopes(contentTopic *wakuV2common.TopicType, minimumCount int) {
// Give some time for store node to put envelope into database. Otherwise, the test is flaky.
// Although we subscribed to EnvelopeEvents and waited, the actual saving to database happens asynchronously.
// It would be nice to implement a subscription for database storing event, but it isn't worth it right now.
time.Sleep(100 * time.Millisecond)
// Directly ensure profile is available on store node
2024-05-15 23:15:00 +00:00
queryOptions := []legacy_store.HistoryRequestOption{
legacy_store.WithLocalQuery(),
}
2024-05-15 23:15:00 +00:00
query := legacy_store.Query{
PubsubTopic: "",
ContentTopics: []string{contentTopic.ContentTopic()},
}
result, err := s.wakuStoreNode.LegacyStoreNode().Query(context.Background(), query, queryOptions...)
s.Require().NoError(err)
s.Require().GreaterOrEqual(len(result.Messages), minimumCount)
s.logger.Debug("store node query result", zap.Int("messagesCount", len(result.Messages)))
}
func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityInfo() {
s.createOwner()
s.createBob()
community := s.createCommunity(s.owner)
s.fetchCommunity(s.bob, community.CommunityShard(), community)
}
func (s *MessengerStoreNodeRequestSuite) TestConsecutiveRequests() {
s.createOwner()
s.createBob()
community := s.createCommunity(s.owner)
// Test consecutive requests to check that requests in manager are finalized
// At second request we expect to fetch nothing, because the community is already in the database
s.fetchCommunity(s.bob, community.CommunityShard(), community)
s.fetchCommunity(s.bob, community.CommunityShard(), nil)
}
func (s *MessengerStoreNodeRequestSuite) TestSimultaneousCommunityInfoRequests() {
s.createOwner()
s.createBob()
community := s.createCommunity(s.owner)
storeNodeRequestsCount := 0
s.bob.storeNodeRequestsManager.onPerformingBatch = func(batch MailserverBatch) {
storeNodeRequestsCount++
}
s.waitForAvailableStoreNode(s.bob)
wg := sync.WaitGroup{}
// Make 2 simultaneous fetch requests
// 1 fetch request = 2 requests to store node (fetch shard and fetch community)
// only 2 request to store node is expected
for i := 0; i < 2; i++ {
wg.Add(1)
go func() {
defer wg.Done()
s.fetchCommunity(s.bob, community.CommunityShard(), community)
}()
}
wg.Wait()
s.Require().Equal(2, storeNodeRequestsCount)
}
func (s *MessengerStoreNodeRequestSuite) TestRequestNonExistentCommunity() {
// On test start store node database is empty, so just request any valid community ID.
request := FetchCommunityRequest{
CommunityKey: "0x036dc11a0663f88e15912f0adb68c3c5f68ca0ca7a233f1a88ff923a3d39b2cf07",
Shard: nil,
TryDatabase: false,
WaitForResponse: true,
}
s.createBob()
s.waitForAvailableStoreNode(s.bob)
fetchedCommunity, err := s.bob.FetchCommunity(&request)
s.Require().NoError(err)
s.Require().Nil(fetchedCommunity)
}
func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityInfoWithStoreNodeDisconnected() {
s.createOwner()
s.createBob()
community := s.createCommunity(s.owner)
// WaitForAvailableStoreNode is done internally
s.fetchCommunity(s.bob, community.CommunityShard(), community)
}
func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityPagingAlgorithm() {
const spamAmount = defaultStoreNodeRequestPageSize + initialStoreNodeRequestPageSize
s.createOwner()
s.createBob()
// Create a community
community := s.createCommunity(s.owner)
contentTopic := wakuV2common.BytesToTopic(transport.ToTopic(community.IDString()))
storeNodeSubscription := s.setupStoreNodeEnvelopesWatcher(&contentTopic)
// Push spam to the same ContentTopic & PubsubTopic
// The first requested page size is 1. All subsequent pages are limited to 20.
// We want to test the algorithm, so we push 21 spam envelopes.
for i := 0; i < spamAmount; i++ {
spamMessage := common.RawMessage{
Payload: RandomBytes(16),
Sender: community.PrivateKey(),
SkipEncryptionLayer: true,
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_DESCRIPTION,
PubsubTopic: community.PubsubTopic(),
}
_, err := s.owner.sender.SendPublic(context.Background(), community.IDString(), spamMessage)
s.Require().NoError(err)
}
// Wait the store node to receive envelopes
s.waitForEnvelopes(storeNodeSubscription, spamAmount)
// Fetch the community
stats := s.fetchCommunity(s.bob, community.CommunityShard(), community)
// Expect 3 pages and 23 (24 spam + 1 community description + 1 general channel description) envelopes to be fetched.
// First we fetch a more up-to-date, but an invalid spam message, fail to decrypt it as community description,
// then we fetch another page of data and successfully decrypt a community description.
s.Require().Equal(spamAmount+1, stats.FetchedEnvelopesCount)
s.Require().Equal(3, stats.FetchedPagesCount)
}
func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityWithSameContentTopic() {
s.createOwner()
s.createBob()
// Create 2 communities
community1 := s.createCommunity(s.owner)
community2 := s.createCommunity(s.owner)
description2, err := community2.MarshaledDescription()
s.Require().NoError(err)
// Push community2 description to the same ContentTopic & PubsubTopic as community1.
// This way we simulate 2 communities with same ContentTopic.
spamMessage := common.RawMessage{
Payload: description2,
Sender: community2.PrivateKey(),
SkipEncryptionLayer: true,
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_DESCRIPTION,
PubsubTopic: community1.PubsubTopic(),
}
_, err = s.owner.sender.SendPublic(context.Background(), community1.IDString(), spamMessage)
s.Require().NoError(err)
// Fetch the community
s.fetchCommunity(s.bob, community1.CommunityShard(), community1)
}
func (s *MessengerStoreNodeRequestSuite) TestRequestMultipleCommunities() {
s.createOwner()
s.createBob()
// Create 2 communities
community1 := s.createCommunity(s.owner)
community2 := s.createCommunity(s.owner)
fetchedCommunities := map[string]*communities.Community{}
err := WaitOnSignaledCommunityFound(s.bob,
func() {
err := s.bob.fetchCommunities([]communities.CommunityShard{
community1.CommunityShard(),
community2.CommunityShard(),
})
s.Require().NoError(err)
},
func(community *communities.Community) bool {
fetchedCommunities[community.IDString()] = community
return len(fetchedCommunities) == 2
},
1*time.Second,
"communities were not signalled in time",
)
s.Require().NoError(err)
s.Require().Contains(fetchedCommunities, community1.IDString())
s.Require().Contains(fetchedCommunities, community2.IDString())
}
func (s *MessengerStoreNodeRequestSuite) TestRequestWithoutWaitingResponse() {
s.createOwner()
s.createBob()
// Create a community
community := s.createCommunity(s.owner)
request := FetchCommunityRequest{
CommunityKey: community.IDString(),
Shard: nil,
TryDatabase: false,
WaitForResponse: false,
}
fetchedCommunities := map[string]*communities.Community{}
err := WaitOnSignaledCommunityFound(s.bob,
func() {
fetchedCommunity, err := s.bob.FetchCommunity(&request)
s.Require().NoError(err)
s.Require().Nil(fetchedCommunity)
},
func(community *communities.Community) bool {
fetchedCommunities[community.IDString()] = community
return len(fetchedCommunities) == 1
},
1*time.Second,
"communities weren't signalled",
)
s.Require().NoError(err)
s.Require().Len(fetchedCommunities, 1)
s.Require().Contains(fetchedCommunities, community.IDString())
s.requireCommunitiesEqual(fetchedCommunities[community.IDString()], community)
}
func (s *MessengerStoreNodeRequestSuite) TestRequestProfileInfo() {
s.createOwner()
defer s.tearDownOwner()
// Set keypair (to be able to set displayName)
ownerProfileKp := accounts.GetProfileKeypairForTest(true, false, false)
ownerProfileKp.KeyUID = s.owner.account.KeyUID
ownerProfileKp.Accounts[0].KeyUID = s.owner.account.KeyUID
err := s.owner.settings.SaveOrUpdateKeypair(ownerProfileKp)
s.Require().NoError(err)
contentTopicString := transport.ContactCodeTopic(&s.owner.identity.PublicKey)
contentTopic := wakuV2common.BytesToTopic(transport.ToTopic(contentTopicString))
storeNodeSubscription := s.setupStoreNodeEnvelopesWatcher(&contentTopic)
// Set display name, this will also publish contact code
err = s.owner.SetDisplayName("super-owner")
s.Require().NoError(err)
s.waitForEnvelopes(storeNodeSubscription, 1)
s.ensureStoreNodeEnvelopes(&contentTopic, 1)
// Fetch profile
s.createBob()
defer s.tearDownBob()
s.fetchProfile(s.bob, s.owner.selfContact.ID, s.owner.selfContact)
}
func (s *MessengerStoreNodeRequestSuite) TestRequestProfileInfoError() {
s.createOwner()
defer s.tearDownOwner()
// Non-existent contact used by QA team for e2e autotests: (https://github.com/status-im/status-desktop/issues/15297)
fetchedContact, err := s.owner.FetchContact("0x04e972b2a794c315e16411fc0930a65bffffe4f885341683f4532fbbd883a447d849ac0be63d6a4f721affa0d0408160974ff831408433972de2c4556ef06d1ae1", true)
s.Require().Nil(fetchedContact)
s.Require().Nil(err)
u := "https://status.app/u/iwWACgoKCHNxdWlzaGVyAw==#zQ3shvMPZSyaUbjBjaNpNP1bPGsGpQDp59dZ4Gmz7UEy5o791"
response, err := s.owner.UnfurlURLs(nil, []string{u})
s.Require().NoError(err)
s.Require().Len(response.LinkPreviews, 0)
}
// TestSequentialUpdates checks that making updates to the community
// immediately results in new store node fetched information.
// Before adding storeNodeSubscription we had problems with the test setup that we didn't have a mechanism to wait for store node to
// receive and process new messages.
func (s *MessengerStoreNodeRequestSuite) TestSequentialUpdates() {
s.createOwner()
s.createBob()
community := s.createCommunity(s.owner)
s.fetchCommunity(s.bob, community.CommunityShard(), community)
contentTopic := wakuV2common.BytesToTopic(transport.ToTopic(community.IDString()))
communityName := community.Name()
storeNodeSubscription := s.setupStoreNodeEnvelopesWatcher(&contentTopic)
for i := 0; i < 3; i++ {
// Change community name, this will automatically publish a new community description
ownerEditRequest := &requests.EditCommunity{
CommunityID: community.ID(),
CreateCommunity: requests.CreateCommunity{
Name: fmt.Sprintf("%s-%d", communityName, i),
Description: community.DescriptionText(),
Color: community.Color(),
Membership: community.Permissions().Access,
},
}
_, err := s.owner.EditCommunity(ownerEditRequest)
s.Require().NoError(err)
s.waitForEnvelopes(storeNodeSubscription, 1)
// Get updated community from the database
community, err = s.owner.communitiesManager.GetByID(community.ID())
s.Require().NoError(err)
s.Require().NotNil(community)
s.fetchCommunity(s.bob, community.CommunityShard(), community)
}
}
func (s *MessengerStoreNodeRequestSuite) TestRequestShardAndCommunityInfo() {
s.createOwner()
s.createBob()
community := s.createCommunity(s.owner)
topicPrivKey, err := crypto.GenerateKey()
s.Require().NoError(err)
expectedShard := &shard.Shard{
Cluster: shard.MainStatusShardCluster,
Index: 23,
}
err = s.wakuStoreNode.SubscribeToPubsubTopic(expectedShard.PubsubTopic(), &topicPrivKey.PublicKey)
s.Require().NoError(err)
topicPrivKeyBytes := crypto.FromECDSA(topicPrivKey)
h := types.HexBytes(topicPrivKeyBytes)
shardRequest := &requests.SetCommunityShard{
CommunityID: community.ID(),
Shard: expectedShard,
PrivateKey: &h,
}
shardTopic := transport.CommunityShardInfoTopic(community.IDString())
contentContentTopic := wakuV2common.BytesToTopic(transport.ToTopic(shardTopic))
storeNodeSubscription := s.setupStoreNodeEnvelopesWatcher(&contentContentTopic)
_, err = s.owner.SetCommunityShard(shardRequest)
s.Require().NoError(err)
s.waitForEnvelopes(storeNodeSubscription, 1)
s.waitForAvailableStoreNode(s.bob)
communityShard := community.CommunityShard()
community, err = s.owner.communitiesManager.GetByID(community.ID())
s.Require().NoError(err)
s.Require().NotNil(community)
s.Require().NotNil(community.Shard())
s.fetchCommunity(s.bob, communityShard, community)
}
func (s *MessengerStoreNodeRequestSuite) TestFiltersNotRemoved() {
s.createOwner()
s.createBob()
community := s.createCommunity(s.owner)
// The owner is a member of the community, so he has a filter for community description content topic.
// We want to check that filter is not removed by `FetchCommunity` call.
filterBefore := s.owner.transport.FilterByChatID(community.IDString())
s.Require().NotNil(filterBefore)
s.fetchCommunity(s.owner, community.CommunityShard(), nil)
filterAfter := s.owner.transport.FilterByChatID(community.IDString())
s.Require().NotNil(filterAfter)
s.Require().Equal(filterBefore.FilterID, filterAfter.FilterID)
}
func (s *MessengerStoreNodeRequestSuite) TestFiltersRemoved() {
s.createOwner()
s.createBob()
community := s.createCommunity(s.owner)
// The bob is a member of the community, so he has no filters for community description content topic.
// We want to check that filter created by `FetchCommunity` is removed on request finish.
filterBefore := s.bob.transport.FilterByChatID(community.IDString())
s.Require().Nil(filterBefore)
s.fetchCommunity(s.bob, community.CommunityShard(), community)
filterAfter := s.bob.transport.FilterByChatID(community.IDString())
s.Require().Nil(filterAfter)
}
func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityEnvelopesOrder() {
s.createOwner()
s.createBob()
const descriptionsCount = 4
community := s.createCommunity(s.owner)
contentTopic := wakuV2common.BytesToTopic(transport.ToTopic(community.IDString()))
storeNodeSubscription := s.setupStoreNodeEnvelopesWatcher(&contentTopic)
// Push a few descriptions to the store node
for i := 0; i < descriptionsCount-1; i++ {
err := s.owner.publishOrg(community, false)
s.Require().NoError(err)
}
// Wait for store node to receive envelopes
s.waitForEnvelopes(storeNodeSubscription, descriptionsCount-1)
// Subscribe to received envelope
bobWakuV2 := gethbridge.GetGethWakuV2From(s.bobWaku)
var receivedEnvelopes []*wakuV2common.ReceivedMessage
s.setupEnvelopesWatcher(bobWakuV2, &contentTopic, func(envelope *wakuV2common.ReceivedMessage) {
receivedEnvelopes = append(receivedEnvelopes, envelope)
})
// Force a single-envelope page size to be able to check the order.
// Also force all envelopes to be fetched.
options := []StoreNodeRequestOption{
WithWaitForResponseOption(true),
WithStopWhenDataFound(false),
WithInitialPageSize(1),
WithFurtherPageSize(1),
}
// Fetch the community
fetchedCommunity, _, err := s.bob.storeNodeRequestsManager.FetchCommunity(community.CommunityShard(), options)
s.Require().NoError(err)
s.requireCommunitiesEqual(fetchedCommunity, community)
// Ensure all expected envelopes were received
s.Require().Equal(descriptionsCount, len(receivedEnvelopes))
// We check that each next envelope fetched is newer than the previous one
for i := 1; i < len(receivedEnvelopes); i++ {
s.Require().Less(
receivedEnvelopes[i].Envelope.Message().GetTimestamp(),
receivedEnvelopes[i-1].Envelope.Message().GetTimestamp())
}
}
/*
TestFetchRealCommunity is not actually a test, but an utility to check the community description in all of the store nodes.
It's intended to only run locally and shouldn't be executed in CI, because it relies on connection to the real network.
TODO: It would be nice to move this code to a real utility in /cmd.
It should allow us to fairly verify the community owner and do other good things.
To run this test, first set `runLocalTests` to true.
Then carefully set all of communityID, communityShard, fleet and other const variables.
NOTE: I only tested it with the default parameters, but in theory it should work for any configuration.
*/
type testFetchRealCommunityExampleTokenInfo struct {
ChainID uint64
ContractAddress string
}
var testFetchRealCommunityExample = []struct {
CommunityID string
CommunityURL string // If set, takes precedence over CommunityID
CommunityShard *shard.Shard // WARNING: I didn't test a sharded community
Fleet string
ClusterID uint16
UserPrivateKeyString string // When empty a new user will be created
// Optional request parameters
CustomOptions []StoreNodeRequestOption
// Setup OwnerPublicKey and CommunityTokens if the community has owner token
// This is needed to mock the owner verification
OwnerPublicKey string
CommunityTokens []testFetchRealCommunityExampleTokenInfo
// Fill these if you know what envelopes are expected.
// The test will fail if fetched array doesn't equal to the expected one.
CheckExpectedEnvelopes bool
ExpectedShardEnvelopes []string
ExpectedDescriptionEnvelopes []string
}{
{
//Example 1,
CommunityID: "0x03073514d4c14a7d10ae9fc9b0f05abc904d84166a6ac80add58bf6a3542a4e50a",
CommunityShard: nil,
Fleet: params.FleetStatusProd,
ClusterID: shard.MainStatusShardCluster,
},
{
// Example 3,
// https://status.app/c/CxiACi8KFGFwIHJlcSAxIHN0dCBiZWMgbWVtEgdkc2Fkc2FkGAMiByM0MzYwREYqAxkrHAM=#zQ3shwDYZHtrLE7NqoTGjTWzWUu6hom5D4qxfskLZfgfyGRyL
CommunityID: "0x03f64be95ed5c925022265f9250f538f65ed3dcf6e4ef6c139803dc02a3487ae7b",
Fleet: params.FleetStatusProd,
ClusterID: shard.MainStatusShardCluster,
CheckExpectedEnvelopes: true,
ExpectedShardEnvelopes: []string{
"0x8173eecd7ff9ebcaae3dde0e704daf9bdeb6d33b0d8505a67e7dc56d0d8fc07c",
"0x596bbafbe0e0b625d165378cd4c7641a4d23aa1145c705aad666ddeaf60c88cd",
"0x8a1ee798f3657da5a463e5f878ab2455d05b8f552359b58330ccd7fa4f5624b0",
"0x97bcde2103a01984bb45a8590a6cb6972411445a1b2d40e181d5f2b5366fa5f1",
"0x26e3c0c880d1a2c4e81bf4fffbdb8b7e1ecc91fce7c6a05ee87d200d62ffc11e",
"0x1a8820bd61ebcc9de75c25f31c9b05eb6e880a5a4902679bb6ce2f43f61bf159",
"0xce450cfb5f79d761f34dea5b2ccec63751886e43ae63477e12f517c31f800aeb",
"0x9607bd1cf08355c44bcce055da197ba177201882736fa8874910194ccdaa8760",
"0x0c4b989ca69f529e571e6ea8b3230a85e057d8b2ae6147d1fedc2a01f2816ed6",
"0xe40ea64c9007a064b6324b614976510f2a433c9f84d87139df8f66b536e37ee4",
"0x7a028466a095e40650bb0ef16e903309b0c38c5a7cb7e2e9debd0acf2151448d",
"0x96419c6be375b2b348778d4694e3a491de84eecde601d5d405a0e72e9cece4a1",
"0xbcaeb5e86128638fab7203428daddd741df44ceeabe7d9d25936a10cd0a8b808",
"0x2e0b5872cb5a7c9a3273048eb2dfcb1d6a28faad3fe307a7db6c2dbaca9ce462",
"0xfa96bbe4125514ce73c52ef3ccb1c4ad9c4ad4afe8803de8ab9309fe9483b1d0",
"0xdeddfc82f70cce77c26959d91851fbc33afe648428c3e6ea349b2a2456b92111",
"0x5b12f17d7b712071f57bb48b7dcd0d6568ff5e7c3f8b3811013aea8dde9c6243",
"0x18928fd044482c75518162104d487e6fe504f086eb8c5e9f21aa4bce2811d0fa",
"0x543c156ced76138d69229a39425a0a1cabd617770e023333c10501b979f52d61",
"0xf46ea6bf5ab6a70662bcf227cc5d2c8c7a70ce42a88e5bb7ebe9e598668a8ae2",
"0xedb9628dc1ce5b0ec899c3813dd4159a2e06fb3dc88ffaae047e927c804ad0b3",
"0x16248eccc3544af3fc4a73467d0925ca2f3741eb623516ee369f710e4aa8a3ef",
"0x6a85f784a9004b56bbb47d87f5541173f05bd61ff5b26e41c714adbb5516e9ff",
"0x91e320be2cb5c6178027390cbce165fe088a1a35e1442382064ddbc9aabea8a2",
"0x676496dd36ae40e184863725bfb7425e46d916f73f4b0dd5d10324f4e9325da6",
},
ExpectedDescriptionEnvelopes: []string{
"0xe2c38667ee160861b3dc5a00e4422f47de1303c8b61f0a33c4853ce0b71d0ae4",
"0x7d8392baa9dd134e43287e58d69b8c9f50aa5c144adda6a3c7d32f00c5dea309",
"0xf74918d445709ccf9c29e776d27e9b7dc31f25a28473e8fcd89ed9de8a2e6df4",
"0xafd7e9b6245d88ea2b4fe70265aa3c5ce5618827c1092f8e3058315ac27c5b98",
"0x4d16bd4fcfc2d8736dc29d8c7287671f7e1df62af74943dc40a39ae18f388a07",
"0xd101f14bcf6bb9a5e72b934b3e74ebe3f77774037cb5b193803b264d69bfb9bf",
"0xaa857389e8886401678690bb4dbc664486bd7039427ca53e826197b303696cff",
"0x4443448e575824330a96d5114a9a7d3fe0ee7168ab6c7a646057ca4502fb91d2",
"0xfbb79b8d0ee1a61109c543cbe580412fe6d23de33d163006f74fef4addbdad37",
"0x51bc55c732e0e9db40fd8865a8ae20cbd99bfd4f95c62cf8b591e793d9b642d2",
"0xc28ca742c0e159a60941fb68ee5832016b510afe54e8ee8bb2200495ac29f1e3",
"0xf44f1714743a55f0170ba3627390d84cb9307326028abf7236222c93104db833",
"0x69d067da262b124d2eb6a8ba9f08e0a1ad66afb8d7ff3641a256992fff53a6de",
"0xa86a08457854fbea347cdd92fd390a330a9971967c551bebc53b18ccfea876fe",
"0x592850f8bc3c6079826f168971746bd2a1b50a5011fe3b233ead6c72c92a3373",
"0x1b3151d7b9b37350e86c937dc2e7964d472815bbf9275714bcb16a0c4327fe3d",
"0x1f2beca64e52f996127647cf3f3abd2e4fe501646fc39e98713ae064333388b3",
"0xeafc8f9d3114426c08748e6171710874074fb1eb732d9364830ce9b58955c83c",
"0x6e014a5ec75465efaf036353ec5811b8f710f608a04002925eba3a0a37a30423",
"0xf7e12a5829cf90e4272132e7b62c5bf0dd09100c8d498c66c9740a798db559b3",
"0x62da7e828862c3c8692cbd077c5a62266d811764184378e55f9f1066510b4652",
"0x74201394d05b914bc7e6ce4d2fcf0b119491c80644f48ac9fd37b842e4a0275a",
"0x9d4c0b1be53810c45c2fa744baa8d16ef4ae3a319b09043f4b4e053127461bf0",
"0x1937979514ea1dba8ab3b621fc3d0a3f6246b4bf1f9b4073888b8dbd9b4a765a",
"0xf767f3f36fdecb5ef6650542232334df836bfca1e7f72f1215df50d3f9f9c9bc",
"0x3a06002325502bc39a77962241fe274d4e88f61762194d321f9cc95272ed4a74",
"0x13caae58261c181d4974d7a68e0b7c8580c3cc569840179d53ae76407548d8b8",
"0x36f3b4afbcc4177a7aef26ad567839ffce51896d4c40d0a08d222cebd1255e3b",
"0x159e685bbab26a5d54ab817c93c9c610055bcf2af75290abcc9a84f1b85a2de9",
"0x1fd5ff73d7ea9a19f282bd0716f04a5e86b7c515839f0c721f66b3fe99161054",
"0x95b1e9ada4913ca809c9c28fc225a21753f18a90253660750900c78f79ad2a00",
"0x4334826934a7cbfb7446ec9d581fa6433c5d1f7f51b97f24717f55cffa320c65",
"0x0c01d07108c448797ffd14a2152fd38d1764c8a9c5e2f3da12f70551588add7a",
"0xe7071c6587fc277c4f4c0d7e4575de1a0843d3cf6c2a4aac79be79edc1608038",
"0x5da4e482f3e6eacf080db685e00c199c8cbbad9a8f43b1d94944426444a7a84a",
"0x638f551acdd7ccffd6a40ad12bbba1da8fd8a58157fdf9625b12d4a95b4eef71",
"0xa1a52c28e0481f6004d98bbce906676fff67f04246454bd33fba02c640355af0",
"0xe0300eb9e0f215ace491b1104665b48b9f6bff039af40e0cfc52a3ce766e747f",
"0xd092c04d51ee963d59953324d84188a0c1636e8600cb0f5f6f3f4f826d70c8f3",
"0x8d94bbfee687d534361fc3069079cf4e4f7db2a179d24e6419f67e38b5f0bd34",
"0x1fd7a4d2c04fca3875126b7a951f619b4da0000ca47496df0c2fb1048a145108",
"0xbbefbd116cbb23de193318b328412addc500af965d31ba481d70fa1d9e99461d",
"0xaa4e0e8bd820438e22b93371bda24a29922d33c15fb312b343d2e81a22cbdd95",
"0x76aef29ea4dde107c22c520efb2a4516b69ae83bc237281d9990f68397d801f5",
"0x804789119513a065d892cba5d240cb4d89d7329aeee93fcd8e85379a4d362fc9",
"0x9029b4a13903a3369e3466f1bfabae3f26b6721628db138eaba25c1f55f6fc1e",
"0xee38c209cb95035a289034c737e5775877145efea31b2a01f7c9241ff02f3e92",
"0x3e76da87895ca821db3b7ed7dc6949557c620d9cbcaf97af39ed4955d37b734e",
"0xf5e77eb8f9a5c52e09a56dd5e461bfee6cf9a73e1253f1d41bcde81fe3646997",
"0x083e06375c366283e541b249ea8646c3f31feb970078e95861ea399f0a57d09a",
"0xcd7db07ba557ec1ba0104909fdb958661c60c82213a75e8d15e7b262ef4f58b7",
"0x57b49dc83e1d3ac7b56bb7d758a9bf448339593311103bce4f0a53028587d577",
"0xb08cd92a5ec6f44a6129a60107132ca17d5fef29fb2bc5ebba14028d57a8038e",
"0x76959f98c8c734307a985294c26f008f3f705912aab02a5b3a0602a8598c02e2",
"0xd8ad7df58ffeec20b16a140bdd91484a34fca3ad7fc602043530ca63c307809a",
"0x6872ef39653bb208ef51f5b11c4bda3eb494a13f5da14e33687d30aef99ef383",
"0x0fabebc2e0c02f4add94886216b01ecdbd34929ad303d1a20a4505bf729038f6",
"0xc18270cd532bb3d34f62704feb722c40be48aedb5ad38d4d09fd67f5843b686d",
"0xbc16217ece82998783d4209ed3bc3f2f33d92630e43933b0129eb8b792500a3f",
"0xbda651e3b9c82f4bcf5b16252407fc888952820c842c49c06b4f01c8127e359a",
"0xb4b1799950c6aca3b011ffb775d0f973437d7d46e40cf7b379ff736d08f24eb2",
"0x38f12fb09c71dd720cacbb2102ac78ad6fbf830558adc7af9fb773f39e728bdc",
"0x489eb6fa2f5ee5b2a071c7083bf36a0a6cb4ec96049707d25843d9a97b4ac7be",
"0x64ea5655c8caf89a53c94edd5a47ba750d9fbcf099ec0dcd4026656b044486f1",
"0x501aee1c5da6aaeaae14abffefbc377b59ebe3fcaa9981bc83bfeffb25344749",
"0x9a3d360ea866102a6268ffd2001617c442b74b221d131fb3c08ae29bfac18203",
},
},
{
//Example 1,
CommunityID: "0x02471dd922756a3a50b623e59cf3b99355d6587e43d5c517eb55f9aea9d3fe9fe9",
Fleet: params.FleetStatusProd,
ClusterID: shard.MainStatusShardCluster,
CheckExpectedEnvelopes: true,
ExpectedShardEnvelopes: []string{
"0xc3e68e838d09e0117b3f3fd27aabe5f5a509d13e9045263c78e6890953d43547",
"0x5ee13d052bedb855ce2b9ba6f43c78233fbd4e6539a3bdf156497053c6ddf76d",
"0xfb6638b7e050f9323a0fe7b84986b5c6f8827965e67e3b3bd0fea21cf24e43de",
},
ExpectedDescriptionEnvelopes: []string{
"0x5b4fa95d430c939c1cbbb26175eabfb4ee058d508c6b4c0e26624958ba02c3ce",
"0xbf44409ee40dea7816186b37a45dfebabcee59f76855ad5af663ccdf598861ab",
"0x98d98453f6017517d0114989da0938aad59a3ad9a10839c181f453283f64f5c9",
},
},
{
CommunityURL: "https://status.app/c/G4IAAMQn9ucHF-V3W5Ouuy0xf0BtTjlwCANJEmwB2CG5p2xKUYzK_l37kzXulUppltT1t6mBcCEJsljRoGrKCP7rWommQomrMA2gBN7RrvCMkFqQwnCNzkNYWrLG85E6GVoM_nolTtfIzl53J1N-tj8fz4_TnO4IIw==#zQ3shZeEJqTC1xhGUjxuS4rtHSrhJ8vUYp64v6qWkLpvdy9L9",
//CommunityID: "0x02b5bdaf5a25fcfe2ee14c501fab1836b8de57f61621080c3d52073d16de0d98d6",
Fleet: params.FleetStatusProd,
OwnerPublicKey: "0x04953f5f0d355b37c39d1d6460a31ed1114455f8263b3fd1b84406c5f12c9eb7dfb76ba7513b92186010928254984fe98aee069b4c7e20f9ea3da497c3ae769477",
CommunityTokens: []testFetchRealCommunityExampleTokenInfo{
{
ChainID: 10,
ContractAddress: "0x9eDc11E5932372387E76ff3dcF66DB5465893823",
},
{
ChainID: 10,
ContractAddress: "0xD91d2E898f996308D643E3b9C78f5FEb5c5F404F",
},
{
ChainID: 10,
ContractAddress: "0x852E13D2BDFC4C3a761DA7B450f631b555b39C5E",
},
{
ChainID: 10,
ContractAddress: "0x000CfAd029EE94d120e0c136278582F94Cdc532c",
},
{
ChainID: 10,
ContractAddress: "0x21F6F5Cb75E81e5104D890D750270eD6538C50cb",
},
},
ClusterID: shard.MainStatusShardCluster,
CheckExpectedEnvelopes: false,
CustomOptions: []StoreNodeRequestOption{
WithInitialPageSize(1),
WithStopWhenDataFound(true),
},
},
}
func (s *MessengerStoreNodeRequestSuite) TestFetchRealCommunity() {
if !runLocalTests {
return
}
exampleToRun := testFetchRealCommunityExample[3]
// Test configuration
communityID := exampleToRun.CommunityID
communityShard := exampleToRun.CommunityShard
fleet := exampleToRun.Fleet
clusterID := exampleToRun.ClusterID
userPrivateKeyString := exampleToRun.UserPrivateKeyString
ownerPublicKey := exampleToRun.OwnerPublicKey
communityTokens := exampleToRun.CommunityTokens
if exampleToRun.CommunityURL != "" {
urlResponse, err := ParseSharedURL(exampleToRun.CommunityURL)
s.Require().NoError(err)
s.Require().NotNil(urlResponse.Community)
communityID = urlResponse.Community.CommunityID
}
// Prepare things depending on the configuration
nodesList := mailserversDB.DefaultMailserversByFleet(fleet)
descriptionContentTopic := wakuV2common.BytesToTopic(transport.ToTopic(communityID))
shardContentTopic := wakuV2common.BytesToTopic(transport.ToTopic(transport.CommunityShardInfoTopic(communityID)))
communityIDBytes, err := types.DecodeHex(communityID)
s.Require().NoError(err)
// update mock - the signer for the community returned by the contracts should be owner
for _, communityToken := range communityTokens {
s.collectiblesServiceMock.SetSignerPubkeyForCommunity(communityIDBytes, ownerPublicKey)
s.collectiblesServiceMock.SetMockCollectibleContractData(communityToken.ChainID, communityToken.ContractAddress,
&communities.CollectibleContractData{TotalSupply: &bigint.BigInt{}})
}
results := map[string]singleResult{}
wg := sync.WaitGroup{}
// We run a separate request for each node in the fleet.
for i, mailserver := range nodesList {
wg.Add(1)
go func(i int, mailserver mailserversDB.Mailserver) {
defer wg.Done()
fmt.Printf("--- starting request [%d] from %s\n", i, mailserver.ID)
result := singleResult{}
//
// Create WakuV2 node
// NOTE: Another option was to create a bare waku node and fetch envelopes directly with it
// and after that push all of the envelopes to a new messenger and check the result.
// But this turned out to be harder to implement.
//
wakuLogger := s.logger.Named(fmt.Sprintf("user-waku-%d", i))
messengerLogger := s.logger.Named(fmt.Sprintf("user-messenger-%d", i))
cfg := testWakuV2Config{
logger: wakuLogger,
enableStore: false,
clusterID: clusterID,
}
wakuV2 := NewTestWakuV2(&s.Suite, cfg)
userWaku := gethbridge.NewGethWakuV2Wrapper(wakuV2)
//
// Create a messenger to process envelopes
//
var privateKeyString = userPrivateKeyString
if privateKeyString == "" {
privateKey, err := crypto.GenerateKey()
s.Require().NoError(err)
privateKeyString = hexutil.Encode(crypto.FromECDSA(privateKey))
}
privateKeyBytes, err := hexutil.Decode(privateKeyString)
s.Require().NoError(err)
privateKey, err := crypto.ToECDSA(privateKeyBytes)
s.Require().NoError(err)
// Mock a local fleet with single store node
// This is done by settings custom store nodes in the database
mailserversSQLDb, err := helpers.SetupTestMemorySQLDB(appdatabase.DbInitializer{})
s.Require().NoError(err)
mailserversDatabase := mailserversDB.NewDB(mailserversSQLDb)
mailserver.Fleet = localFleet
err = mailserversDatabase.Add(mailserver)
s.Require().NoError(err)
options := []Option{
WithMailserversDatabase(mailserversDatabase),
WithClusterConfig(params.ClusterConfig{
Fleet: localFleet,
ClusterID: clusterID,
}),
WithCommunityTokensService(s.collectiblesServiceMock),
}
// Create user without `createBob` func to force desired fleet
user, err := newMessengerWithKey(userWaku, privateKey, messengerLogger, options)
s.Require().NoError(err)
defer TearDownMessenger(&s.Suite, user)
communityAddress := communities.CommunityShard{
CommunityID: communityID,
Shard: communityShard,
}
// Setup envelopes watcher to gather fetched envelopes
s.setupEnvelopesWatcher(wakuV2, &shardContentTopic, func(envelope *wakuV2common.ReceivedMessage) {
result.ShardEnvelopes = append(result.ShardEnvelopes, envelope)
})
s.setupEnvelopesWatcher(wakuV2, &descriptionContentTopic, func(envelope *wakuV2common.ReceivedMessage) {
result.Envelopes = append(result.Envelopes, envelope)
})
// Start fetching
storeNodeRequestOptions := []StoreNodeRequestOption{
WithWaitForResponseOption(true),
WithStopWhenDataFound(false), // In this test we want all envelopes to be fetched
WithInitialPageSize(defaultStoreNodeRequestPageSize), // Because we're fetching all envelopes anyway
}
storeNodeRequestOptions = append(storeNodeRequestOptions, exampleToRun.CustomOptions...)
fetchedCommunity, stats, err := user.storeNodeRequestsManager.FetchCommunity(communityAddress, storeNodeRequestOptions)
result.EnvelopesCount = stats.FetchedEnvelopesCount
result.FetchedCommunity = fetchedCommunity
result.Error = err
results[mailserver.ID] = result
}(i, mailserver)
}
// Wait for all requests to finish
wg.Wait()
// Print the results
for storeNodeName, result := range results {
fmt.Printf("%s --- %s\n", storeNodeName, result.toString())
}
// Check that results has no errors and contain correct envelopes
for storeNodeName, result := range results {
s.Require().NoError(result.Error)
if exampleToRun.CheckExpectedEnvelopes {
s.Require().Equal(exampleToRun.ExpectedShardEnvelopes, result.ShardEnvelopesHashes(),
fmt.Sprintf("wrong shard envelopes for store node %s", storeNodeName))
s.Require().Equal(exampleToRun.ExpectedDescriptionEnvelopes, result.EnvelopesHashes(),
fmt.Sprintf("wrong envelopes for store node %s", storeNodeName))
}
}
}
func (s *MessengerStoreNodeRequestSuite) TestFetchingCommunityWithOwnerToken() {
s.createOwner()
s.createBob()
s.waitForAvailableStoreNode(s.owner)
community := s.createCommunity(s.owner)
// owner mints owner token
var chainID uint64 = 1
tokenAddress := "token-address"
tokenName := "tokenName"
tokenSymbol := "TSM"
_, err := s.owner.SaveCommunityToken(&token.CommunityToken{
TokenType: protobuf.CommunityTokenType_ERC721,
CommunityID: community.IDString(),
Address: tokenAddress,
ChainID: int(chainID),
Name: tokenName,
Supply: &bigint.BigInt{},
Symbol: tokenSymbol,
PrivilegesLevel: token.OwnerLevel,
}, nil)
s.Require().NoError(err)
// owner adds minted owner token to community
err = s.owner.AddCommunityToken(community.IDString(), int(chainID), tokenAddress)
s.Require().NoError(err)
// update mock - the signer for the community returned by the contracts should be owner
s.collectiblesServiceMock.SetSignerPubkeyForCommunity(community.ID(), common.PubkeyToHex(&s.owner.identity.PublicKey))
s.collectiblesServiceMock.SetMockCollectibleContractData(chainID, tokenAddress,
&communities.CollectibleContractData{TotalSupply: &bigint.BigInt{}})
community, err = s.owner.communitiesManager.GetByID(community.ID())
s.Require().NoError(err)
s.Require().Len(community.TokenPermissions(), 1)
s.waitForAvailableStoreNode(s.bob)
s.fetchCommunity(s.bob, community.CommunityShard(), community)
}
func (s *MessengerStoreNodeRequestSuite) TestFetchingHistoryWhenOnline() {
storeAddress := s.storeNodeAddress
storePeerID := s.wakuStoreNode.PeerID()
// Create messengers
s.createOwner()
s.createBob()
s.logger.Debug("store node info", zap.String("peerID", s.wakuStoreNode.PeerID().String()))
s.logger.Debug("owner node info", zap.String("peerID", gethbridge.GetGethWakuV2From(s.ownerWaku).PeerID().String()))
s.logger.Debug("bob node info", zap.String("peerID", gethbridge.GetGethWakuV2From(s.bobWaku).PeerID().String()))
// Connect to store node to force "online" status
{
WaitForPeersConnected(&s.Suite, gethbridge.GetGethWakuV2From(s.bobWaku), func() peer.IDSlice {
err := s.bob.DialPeer(storeAddress)
s.Require().NoError(err)
return peer.IDSlice{storePeerID}
})
s.Require().True(s.bob.Online())
// Wait for bob to fetch backup and historic messages
time.Sleep(2 * time.Second)
}
// bob goes offline
{
WaitForConnectionStatus(&s.Suite, gethbridge.GetGethWakuV2From(s.bobWaku), func() bool {
err := s.bob.DropPeer(storePeerID)
s.Require().NoError(err)
return false
})
s.Require().False(s.bob.Online())
}
// Owner sends a contact request while bob is offline
{
// Setup store nodes envelopes watcher
partitionedTopic := transport.PartitionedTopic(s.bob.IdentityPublicKey())
topic := transport.ToTopic(partitionedTopic)
contentTopic := wakuV2common.BytesToTopic(topic)
storeNodeSubscription := s.setupStoreNodeEnvelopesWatcher(&contentTopic)
// Send contact request
response, err := s.owner.SendContactRequest(context.Background(), &requests.SendContactRequest{
ID: s.bob.IdentityPublicKeyString(),
Message: "1",
})
s.Require().NoError(err)
s.Require().NotNil(response)
s.Require().Len(response.Messages(), 2)
// Ensure contact request is stored
s.waitForEnvelopes(storeNodeSubscription, 1)
}
// owner goes offline to prevent message resend and any other side effects
// to go offline we disconnect from both relay and store peers
WaitForConnectionStatus(&s.Suite, gethbridge.GetGethWakuV2From(s.ownerWaku), func() bool {
err := s.owner.DropPeer(storePeerID)
s.Require().NoError(err)
return false
})
s.Require().False(s.owner.Online())
// bob goes back online, this should trigger fetching historic messages
{
// Enable auto request historic messages, so that when bob goes online it will fetch historic messages
// We don't enable it earlier to control when we connect to the store node.
s.bob.config.codeControlFlags.AutoRequestHistoricMessages = true
WaitForPeersConnected(&s.Suite, gethbridge.GetGethWakuV2From(s.bobWaku), func() peer.IDSlice {
err := s.bob.DialPeer(storeAddress)
s.Require().NoError(err)
return peer.IDSlice{storePeerID}
})
s.Require().True(s.bob.Online())
// Don't dial the peer, message should be fetched from store node
response, err := WaitOnMessengerResponse(
s.bob,
func(r *MessengerResponse) bool {
return len(r.Contacts) == 1
},
"no contact request received",
)
s.Require().NoError(err)
s.Require().NotNil(response)
s.Require().Len(response.Contacts, 1)
}
}