package protocol import ( "context" "fmt" "testing" "time" "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" "github.com/status-im/status-go/protocol/storenodes" gethbridge "github.com/status-im/status-go/eth-node/bridge/geth" "github.com/status-im/status-go/protocol/common/shard" "github.com/status-im/status-go/protocol/communities" "github.com/status-im/status-go/protocol/tt" "github.com/stretchr/testify/suite" "go.uber.org/zap" "github.com/ethereum/go-ethereum/crypto" "github.com/status-im/status-go/appdatabase" "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/protocol/protobuf" "github.com/status-im/status-go/protocol/requests" "github.com/status-im/status-go/protocol/sqlite" "github.com/status-im/status-go/t/helpers" mailserversDB "github.com/status-im/status-go/services/mailservers" waku2 "github.com/status-im/status-go/wakuv2" wakuV2common "github.com/status-im/status-go/wakuv2/common" ) func TestMessengerStoreNodeCommunitySuite(t *testing.T) { t.Skip("requires storev3 node") suite.Run(t, new(MessengerStoreNodeCommunitySuite)) } type MessengerStoreNodeCommunitySuite struct { suite.Suite cancel chan struct{} owner *Messenger ownerWaku types.Waku bob *Messenger bobWaku types.Waku storeNode *waku2.Waku storeNodeAddress multiaddr.Multiaddr communityStoreNode *waku2.Waku communityStoreNodeAddress multiaddr.Multiaddr collectiblesServiceMock *CollectiblesServiceMock logger *zap.Logger } func (s *MessengerStoreNodeCommunitySuite) SetupTest() { s.logger = tt.MustCreateTestLogger() s.cancel = make(chan struct{}, 10) s.collectiblesServiceMock = &CollectiblesServiceMock{} s.storeNode, s.storeNodeAddress = s.createStore("store-1") s.communityStoreNode, s.communityStoreNodeAddress = s.createStore("store-community") s.owner, s.ownerWaku = s.newMessenger("owner", &s.storeNodeAddress) s.bob, s.bobWaku = s.newMessenger("bob", &s.storeNodeAddress) } func (s *MessengerStoreNodeCommunitySuite) TearDown() { close(s.cancel) if s.storeNode != nil { s.Require().NoError(s.storeNode.Stop()) } if s.communityStoreNode != nil { s.Require().NoError(s.communityStoreNode.Stop()) } if s.owner != nil { TearDownMessenger(&s.Suite, s.owner) } if s.bob != nil { TearDownMessenger(&s.Suite, s.bob) } } func (s *MessengerStoreNodeCommunitySuite) createStore(name string) (*waku2.Waku, multiaddr.Multiaddr) { cfg := testWakuV2Config{ logger: s.logger.Named(name), enableStore: true, clusterID: shard.MainStatusShardCluster, } storeNode := NewTestWakuV2(&s.Suite, cfg) addresses := storeNode.ListenAddresses() s.Require().GreaterOrEqual(len(addresses), 1, "no storenode listen address") return storeNode, addresses[0] } func (s *MessengerStoreNodeCommunitySuite) newMessenger(name string, storenodeAddress *multiaddr.Multiaddr) (*Messenger, types.Waku) { localMailserverID := "local-mailserver-007" localFleet := "local-fleet-007" logger := s.logger.Named(name) cfg := testWakuV2Config{ logger: logger, enableStore: false, clusterID: shard.MainStatusShardCluster, } wakuV2 := NewTestWakuV2(&s.Suite, cfg) wakuV2Wrapper := gethbridge.NewGethWakuV2Wrapper(wakuV2) privateKey, err := crypto.GenerateKey() s.Require().NoError(err) mailserversSQLDb, err := helpers.SetupTestMemorySQLDB(appdatabase.DbInitializer{}) s.Require().NoError(err) err = sqlite.Migrate(mailserversSQLDb) // migrate default s.Require().NoError(err) var sAddr string if storenodeAddress != nil { sAddr = (*storenodeAddress).String() } mailserversDatabase := mailserversDB.NewDB(mailserversSQLDb) err = mailserversDatabase.Add(mailserversDB.Mailserver{ ID: localMailserverID, Name: localMailserverID, Address: sAddr, Fleet: localFleet, }) s.Require().NoError(err) options := []Option{ WithAutoRequestHistoricMessages(false), WithCuratedCommunitiesUpdateLoop(false), } if storenodeAddress != nil { options = append(options, WithTestStoreNode(&s.Suite, localMailserverID, *storenodeAddress, localFleet, s.collectiblesServiceMock), ) } messenger, err := newMessengerWithKey(wakuV2Wrapper, privateKey, logger, options) s.Require().NoError(err) return messenger, wakuV2Wrapper } func (s *MessengerStoreNodeCommunitySuite) createCommunityWithChat(m *Messenger) (*communities.Community, *Chat) { WaitForAvailableStoreNode(&s.Suite, m, 500*time.Millisecond) storeNodeSubscription := s.setupStoreNodeEnvelopesWatcher(nil) createCommunityRequest := &requests.CreateCommunity{ Name: RandomLettersString(10), Description: RandomLettersString(20), Color: RandomColor(), Tags: RandomCommunityTags(3), Membership: protobuf.CommunityPermissions_AUTO_ACCEPT, } response, err := m.CreateCommunity(createCommunityRequest, true) s.Require().NoError(err) s.Require().NotNil(response) s.Require().Len(response.Communities(), 1) s.Require().Len(response.Chats(), 1) s.Require().True(response.Communities()[0].Joined()) s.Require().True(response.Communities()[0].IsControlNode()) s.Require().True(response.Communities()[0].IsMemberOwner(&m.identity.PublicKey)) s.waitForEnvelopes(storeNodeSubscription, 1) return response.Communities()[0], response.Chats()[0] } func (s *MessengerStoreNodeCommunitySuite) requireCommunitiesEqual(c *communities.Community, expected *communities.Community) { if expected == nil { s.Require().Nil(c) return } s.Require().NotNil(c) s.Require().Equal(expected.IDString(), c.IDString()) s.Require().Equal(expected.Clock(), c.Clock()) s.Require().Equal(expected.Name(), c.Name()) s.Require().Equal(expected.Identity().Description, c.Identity().Description) s.Require().Equal(expected.Color(), c.Color()) s.Require().Equal(expected.Tags(), c.Tags()) s.Require().Equal(expected.Shard(), c.Shard()) s.Require().Equal(expected.TokenPermissions(), c.TokenPermissions()) s.Require().Equal(expected.CommunityTokensMetadata(), c.CommunityTokensMetadata()) } func (s *MessengerStoreNodeCommunitySuite) fetchCommunity(m *Messenger, communityShard communities.CommunityShard, expectedCommunity *communities.Community) StoreNodeRequestStats { options := []StoreNodeRequestOption{ WithWaitForResponseOption(true), } fetchedCommunity, stats, err := m.storeNodeRequestsManager.FetchCommunity(communityShard, options) s.Require().NoError(err) s.requireCommunitiesEqual(fetchedCommunity, expectedCommunity) return stats } func (s *MessengerStoreNodeCommunitySuite) setupEnvelopesWatcher(wakuNode *waku2.Waku, topic *wakuV2common.TopicType, cb func(envelope *wakuV2common.ReceivedMessage)) { envelopesWatcher := make(chan wakuV2common.EnvelopeEvent, 100) envelopesSub := wakuNode.SubscribeEnvelopeEvents(envelopesWatcher) go func() { defer envelopesSub.Unsubscribe() for { select { case <-s.cancel: return case envelopeEvent := <-envelopesWatcher: if envelopeEvent.Event != wakuV2common.EventEnvelopeAvailable { continue } if topic != nil && *topic != envelopeEvent.Topic { continue } envelope := wakuNode.GetEnvelope(envelopeEvent.Hash) cb(envelope) s.logger.Debug("envelope available event for fetched content topic", zap.Any("envelopeEvent", envelopeEvent), zap.Any("envelope", envelope), ) } } }() } func (s *MessengerStoreNodeCommunitySuite) setupStoreNodeEnvelopesWatcher(topic *wakuV2common.TopicType) <-chan string { storeNodeSubscription := make(chan string, 100) s.setupEnvelopesWatcher(s.storeNode, topic, func(envelope *wakuV2common.ReceivedMessage) { storeNodeSubscription <- envelope.Hash().String() }) return storeNodeSubscription } func (s *MessengerStoreNodeCommunitySuite) waitForEnvelopes(subscription <-chan string, expectedEnvelopesCount int) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() for i := 0; i < expectedEnvelopesCount; i++ { select { case <-subscription: case <-ctx.Done(): err := fmt.Sprintf("timeout waiting for store node to receive envelopes, received: %d, expected: %d", i, expectedEnvelopesCount) s.Require().Fail(err) } } } func (s *MessengerStoreNodeCommunitySuite) TestSetCommunityStorenodesAndFetch() { err := s.owner.DialPeer(s.storeNodeAddress) s.Require().NoError(err) err = s.bob.DialPeer(s.storeNodeAddress) s.Require().NoError(err) // Create a community community, _ := s.createCommunityWithChat(s.owner) // Set the storenode for the community _, err = s.owner.SetCommunityStorenodes(&requests.SetCommunityStorenodes{ CommunityID: community.ID(), Storenodes: []storenodes.Storenode{ { StorenodeID: "community-store-node", Name: "community-store-node", CommunityID: community.ID(), Version: 2, Address: s.communityStoreNodeAddress, Fleet: "aaa", }, }, }) s.Require().NoError(err) // Bob tetches the community s.fetchCommunity(s.bob, community.CommunityShard(), community) } func (s *MessengerStoreNodeCommunitySuite) TestSetStorenodeForCommunity_fetchMessagesFromNewStorenode() { s.T().Skip("flaky") err := s.owner.DialPeer(s.storeNodeAddress) s.Require().NoError(err) err = s.bob.DialPeer(s.storeNodeAddress) s.Require().NoError(err) ownerPeerID := gethbridge.GetGethWakuV2From(s.ownerWaku).PeerID() bobPeerID := gethbridge.GetGethWakuV2From(s.bobWaku).PeerID() // 1. Owner creates a community community, chat := s.createCommunityWithChat(s.owner) // waits for onwer and bob to connect to the store node WaitForPeersConnected(&s.Suite, s.storeNode, func() peer.IDSlice { return peer.IDSlice{ownerPeerID, bobPeerID} }) // 2. Bob joins the community advertiseCommunityTo(&s.Suite, community, s.owner, s.bob) joinCommunity(&s.Suite, community.ID(), s.owner, s.bob, bobPassword, []string{bobAccountAddress}) // waits for onwer and bob to connect to the community store node WaitForPeersConnected(&s.Suite, s.communityStoreNode, func() peer.IDSlice { err := s.bob.DialPeer(s.communityStoreNodeAddress) s.Require().NoError(err) err = s.owner.DialPeer(s.communityStoreNodeAddress) s.Require().NoError(err) return peer.IDSlice{ownerPeerID, bobPeerID} }) // 3. Owner sets the storenode for the community _, err = s.owner.SetCommunityStorenodes(&requests.SetCommunityStorenodes{ CommunityID: community.ID(), Storenodes: []storenodes.Storenode{ { StorenodeID: "community-store-node", Name: "community-store-node", CommunityID: community.ID(), Version: 2, Address: s.communityStoreNodeAddress, Fleet: "aaa", }, }, }) s.Require().NoError(err) // 5. Bob sends a message to the community chat inputMessage := buildTestMessage(*chat) _, err = s.bob.SendChatMessage(context.Background(), inputMessage) s.Require().NoError(err) // 6. Owner fetches the message from the new storenode err = s.owner.FetchMessages(&requests.FetchMessages{ ID: chat.ID, }) s.Require().NoError(err) } func (s *MessengerStoreNodeCommunitySuite) TestToggleUseMailservers() { // Enable use of mailservers err := s.owner.ToggleUseMailservers(true) s.Require().NoError(err) s.Require().NotNil(s.owner.mailserverCycle.activeMailserver) // Disable use of mailservers err = s.owner.ToggleUseMailservers(false) s.Require().NoError(err) s.Require().Nil(s.owner.mailserverCycle.activeMailserver) }