fix_(StoreNodeRequestManager): various fixes and improvements (#4509)

This commit is contained in:
Igor Sirotin 2023-12-27 13:53:19 +00:00 committed by GitHub
parent 843bae5659
commit 195982c950
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 509 additions and 126 deletions

View File

@ -125,8 +125,6 @@ func (mr *MockDatabaseSettingsManagerMockRecorder) CanUseMailservers() *gomock.C
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CanUseMailservers", reflect.TypeOf((*MockDatabaseSettingsManager)(nil).CanUseMailservers))
}
// DeviceName mocks base method.
func (m *MockDatabaseSettingsManager) DeviceName() (string, error) {
m.ctrl.T.Helper()
@ -202,7 +200,6 @@ func (mr *MockDatabaseSettingsManagerMockRecorder) GetEIP1581Address() *gomock.C
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetEIP1581Address", reflect.TypeOf((*MockDatabaseSettingsManager)(nil).GetEIP1581Address))
}
// GetInstalledStickerPacks mocks base method.
func (m *MockDatabaseSettingsManager) GetInstalledStickerPacks() (*json.RawMessage, error) {
m.ctrl.T.Helper()

View File

@ -2574,10 +2574,16 @@ func (m *Messenger) FetchCommunity(request *FetchCommunityRequest) (*communities
}
}
community, _, err := m.storeNodeRequestsManager.FetchCommunity(communities.CommunityShard{
communityAddress := communities.CommunityShard{
CommunityID: communityID,
Shard: request.Shard,
}, request.WaitForResponse)
}
options := []StoreNodeRequestOption{
WithWaitForResponseOption(request.WaitForResponse),
}
community, _, err := m.storeNodeRequestsManager.FetchCommunity(communityAddress, options)
return community, err
}
@ -2585,7 +2591,7 @@ func (m *Messenger) FetchCommunity(request *FetchCommunityRequest) (*communities
// fetchCommunities installs filter for community and requests its details from store node.
// When response received it will be passed through signals handler.
func (m *Messenger) fetchCommunities(communities []communities.CommunityShard) error {
return m.storeNodeRequestsManager.FetchCommunities(communities)
return m.storeNodeRequestsManager.FetchCommunities(communities, []StoreNodeRequestOption{})
}
// passStoredCommunityInfoToSignalHandler calls signal handler with community info

View File

@ -1218,7 +1218,10 @@ func (m *Messenger) scheduleSyncFiltersForContact(publicKey *ecdsa.PublicKey) (*
}
func (m *Messenger) FetchContact(contactID string, waitForResponse bool) (*Contact, error) {
contact, _, err := m.storeNodeRequestsManager.FetchContact(contactID, waitForResponse)
options := []StoreNodeRequestOption{
WithWaitForResponseOption(waitForResponse),
}
contact, _, err := m.storeNodeRequestsManager.FetchContact(contactID, options)
return contact, err
}

View File

@ -23,17 +23,17 @@ import (
const (
initialStoreNodeRequestPageSize = 4
defaultStoreNodeRequestPageSize = 20
)
// tolerance is how many seconds of potentially out-of-order messages we want to fetch
var tolerance uint32 = 60
tolerance uint32 = 60
var mailserverRequestTimeout = 30 * time.Second
var oneMonthInSeconds uint32 = 31 * 24 * 60 * 60
var mailserverMaxTries uint = 2
var mailserverMaxFailedRequests uint = 2
mailserverRequestTimeout = 30 * time.Second
oneMonthInSeconds uint32 = 31 * 24 * 60 * 60
mailserverMaxTries uint = 2
mailserverMaxFailedRequests uint = 2
const OneDayInSeconds = 86400
OneDayInSeconds = 86400
)
// maxTopicsPerRequest sets the batch size to limit the number of topics per store query
var maxTopicsPerRequest int = 10

View File

@ -28,13 +28,7 @@ const isAndroidEmulator = runtime.GOOS == "android" && runtime.GOARCH == "amd64"
const findNearestMailServer = !isAndroidEmulator
func (m *Messenger) mailserversByFleet(fleet string) []mailservers.Mailserver {
var items []mailservers.Mailserver
for _, ms := range mailservers.DefaultMailservers() {
if ms.Fleet == fleet {
items = append(items, ms)
}
}
return items
return mailservers.DefaultMailserversByFleet(fleet)
}
type byRTTMsAndCanConnectBefore []SortedMailserver

View File

@ -63,18 +63,20 @@ func NewStoreNodeRequestManager(m *Messenger) *StoreNodeRequestManager {
// the function will also wait for the store node response and return the fetched community.
// Automatically waits for an available store node.
// When a `nil` community and `nil` error is returned, that means the community wasn't found at the store node.
func (m *StoreNodeRequestManager) FetchCommunity(community communities.CommunityShard, waitForResponse bool) (*communities.Community, StoreNodeRequestStats, error) {
func (m *StoreNodeRequestManager) FetchCommunity(community communities.CommunityShard, opts []StoreNodeRequestOption) (*communities.Community, StoreNodeRequestStats, error) {
cfg := buildStoreNodeRequestConfig(opts)
m.logger.Info("requesting community from store node",
zap.Any("community", community),
zap.Bool("waitForResponse", waitForResponse))
zap.Any("config", cfg))
requestCommunity := func(communityID string, shard *shard.Shard) (*communities.Community, StoreNodeRequestStats, error) {
channel, err := m.subscribeToRequest(storeNodeCommunityRequest, communityID, shard)
channel, err := m.subscribeToRequest(storeNodeCommunityRequest, communityID, shard, cfg)
if err != nil {
return nil, StoreNodeRequestStats{}, fmt.Errorf("failed to create a request for community: %w", err)
}
if !waitForResponse {
if !cfg.WaitForResponse {
return nil, StoreNodeRequestStats{}, nil
}
@ -86,14 +88,14 @@ func (m *StoreNodeRequestManager) FetchCommunity(community communities.Community
communityShard := community.Shard
if communityShard == nil {
id := transport.CommunityShardInfoTopic(community.CommunityID)
shard, err := m.subscribeToRequest(storeNodeShardRequest, id, shard.DefaultShard())
fetchedShard, err := m.subscribeToRequest(storeNodeShardRequest, id, shard.DefaultShard(), cfg)
if err != nil {
return nil, StoreNodeRequestStats{}, fmt.Errorf("failed to create a shard info request: %w", err)
}
if !waitForResponse {
if !cfg.WaitForResponse {
go func() {
shardResult := <-shard
shardResult := <-fetchedShard
communityShard = shardResult.shard
_, _, _ = requestCommunity(community.CommunityID, communityShard)
@ -101,7 +103,7 @@ func (m *StoreNodeRequestManager) FetchCommunity(community communities.Community
return nil, StoreNodeRequestStats{}, nil
}
shardResult := <-shard
shardResult := <-fetchedShard
communityShard = shardResult.shard
}
@ -119,13 +121,16 @@ func (m *StoreNodeRequestManager) FetchCommunity(community communities.Community
// those content topics is spammed with to many envelopes, then on each iteration we will have to fetch all
// of this spam first to get the envelopes in other content topics. To avoid this we keep independent requests
// for each content topic.
func (m *StoreNodeRequestManager) FetchCommunities(communities []communities.CommunityShard) error {
func (m *StoreNodeRequestManager) FetchCommunities(communities []communities.CommunityShard, opts []StoreNodeRequestOption) error {
m.logger.Info("requesting communities from store node", zap.Any("communities", communities))
// when fetching multiple communities we don't wait for the response
opts = append(opts, WithWaitForResponseOption(false))
var outErr error
for _, community := range communities {
_, _, err := m.FetchCommunity(community, false)
_, _, err := m.FetchCommunity(community, opts)
if err != nil {
outErr = fmt.Errorf("%sfailed to create a request for community %s: %w", outErr, community.CommunityID, err)
}
@ -134,17 +139,20 @@ func (m *StoreNodeRequestManager) FetchCommunities(communities []communities.Com
return outErr
}
func (m *StoreNodeRequestManager) FetchContact(contactID string, waitForResponse bool) (*Contact, StoreNodeRequestStats, error) {
func (m *StoreNodeRequestManager) FetchContact(contactID string, opts []StoreNodeRequestOption) (*Contact, StoreNodeRequestStats, error) {
cfg := buildStoreNodeRequestConfig(opts)
m.logger.Info("requesting contact from store node",
zap.Any("contactID", contactID),
zap.Bool("waitForResponse", waitForResponse))
zap.Any("config", cfg))
channel, err := m.subscribeToRequest(storeNodeContactRequest, contactID, nil)
channel, err := m.subscribeToRequest(storeNodeContactRequest, contactID, nil, cfg)
if err != nil {
return nil, StoreNodeRequestStats{}, fmt.Errorf("failed to create a request for community: %w", err)
}
if !waitForResponse {
if !cfg.WaitForResponse {
return nil, StoreNodeRequestStats{}, nil
}
@ -155,7 +163,7 @@ func (m *StoreNodeRequestManager) FetchContact(contactID string, waitForResponse
// subscribeToRequest checks if a request for given community/contact is already in progress, creates and installs
// a new one if not found, and returns a subscription to the result of the found/started request.
// The subscription can then be used to get the result of the request, this could be either a community/contact or an error.
func (m *StoreNodeRequestManager) subscribeToRequest(requestType storeNodeRequestType, dataID string, shard *shard.Shard) (storeNodeResponseSubscription, error) {
func (m *StoreNodeRequestManager) subscribeToRequest(requestType storeNodeRequestType, dataID string, shard *shard.Shard, cfg StoreNodeRequestConfig) (storeNodeResponseSubscription, error) {
// It's important to unlock only after getting the subscription channel.
// We also lock `activeRequestsLock` during finalizing the requests. This ensures that the subscription
// created in this function will get the result even if the requests proceeds faster than this function ends.
@ -184,6 +192,7 @@ func (m *StoreNodeRequestManager) subscribeToRequest(requestType storeNodeReques
}
request = m.newStoreNodeRequest()
request.config = cfg
request.pubsubTopic = filter.PubsubTopic
request.requestID = requestID
request.contentTopic = filter.ContentTopic
@ -286,6 +295,8 @@ type storeNodeRequest struct {
// request parameters
pubsubTopic string
contentTopic types.TopicType
minimumDataClock uint64
config StoreNodeRequestConfig
// request corresponding metadata to be used in finalize
filterToForget *transport.Filter
@ -378,7 +389,21 @@ func (r *storeNodeRequest) shouldFetchNextPage(envelopesCount int) (bool, uint32
if community == nil {
// community not found in the database, request next page
logger.Debug("community still not fetched")
return true, defaultStoreNodeRequestPageSize
return true, r.config.FurtherPageSize
}
// We check here if the community was fetched actually fetched and updated, because it
// could be that the community was already in the database when we started the fetching.
//
// Would be perfect if we could track that the community was in these particular envelopes,
// but I don't think that's possible right now. We check if clock was updated instead.
if community.Clock() <= r.minimumDataClock {
logger.Debug("local community description is not newer than existing",
zap.Any("existingClock", community.Clock()),
zap.Any("minimumDataClock", r.minimumDataClock),
)
return true, r.config.FurtherPageSize
}
logger.Debug("community found",
@ -420,7 +445,7 @@ func (r *storeNodeRequest) shouldFetchNextPage(envelopesCount int) (bool, uint32
if contact == nil {
// contact not found in the database, request next page
logger.Debug("contact still not fetched")
return true, defaultStoreNodeRequestPageSize
return true, r.config.FurtherPageSize
}
logger.Debug("contact found",
@ -429,7 +454,7 @@ func (r *storeNodeRequest) shouldFetchNextPage(envelopesCount int) (bool, uint32
r.result.contact = contact
}
return false, 0
return !r.config.StopWhenDataFound, r.config.FurtherPageSize
}
func (r *storeNodeRequest) routine() {
@ -457,6 +482,15 @@ func (r *storeNodeRequest) routine() {
return
}
// Check if community already exists locally and get Clock.
localCommunity, _ := r.manager.messenger.communitiesManager.GetByIDString(r.requestID.DataID)
if localCommunity != nil {
r.minimumDataClock = localCommunity.Clock()
}
// Start store node request
to := uint32(math.Ceil(float64(r.manager.messenger.GetCurrentTimeInMillis()) / 1000))
from := to - oneMonthInSeconds
@ -472,7 +506,7 @@ func (r *storeNodeRequest) routine() {
r.manager.onPerformingBatch(batch)
}
return nil, r.manager.messenger.processMailserverBatchWithOptions(batch, initialStoreNodeRequestPageSize, r.shouldFetchNextPage, true)
return nil, r.manager.messenger.processMailserverBatchWithOptions(batch, r.config.InitialPageSize, r.shouldFetchNextPage, true)
})
r.result.err = err

View File

@ -0,0 +1,57 @@
package protocol
type StoreNodeRequestConfig struct {
WaitForResponse bool
StopWhenDataFound bool
InitialPageSize uint32
FurtherPageSize uint32
}
type StoreNodeRequestOption func(*StoreNodeRequestConfig)
func defaultStoreNodeRequestConfig() StoreNodeRequestConfig {
return StoreNodeRequestConfig{
WaitForResponse: true,
StopWhenDataFound: true,
InitialPageSize: initialStoreNodeRequestPageSize,
FurtherPageSize: defaultStoreNodeRequestPageSize,
}
}
func buildStoreNodeRequestConfig(opts []StoreNodeRequestOption) StoreNodeRequestConfig {
cfg := defaultStoreNodeRequestConfig()
// TODO: remove these 2 when fixed: https://github.com/waku-org/nwaku/issues/2317
opts = append(opts, WithStopWhenDataFound(false))
opts = append(opts, WithInitialPageSize(defaultStoreNodeRequestPageSize))
for _, opt := range opts {
opt(&cfg)
}
return cfg
}
func WithWaitForResponseOption(waitForResponse bool) StoreNodeRequestOption {
return func(c *StoreNodeRequestConfig) {
c.WaitForResponse = waitForResponse
}
}
func WithStopWhenDataFound(stopWhenDataFound bool) StoreNodeRequestOption {
return func(c *StoreNodeRequestConfig) {
c.StopWhenDataFound = stopWhenDataFound
}
}
func WithInitialPageSize(initialPageSize uint32) StoreNodeRequestOption {
return func(c *StoreNodeRequestConfig) {
c.InitialPageSize = initialPageSize
}
}
func WithFurtherPageSize(furtherPageSize uint32) StoreNodeRequestOption {
return func(c *StoreNodeRequestConfig) {
c.FurtherPageSize = furtherPageSize
}
}

View File

@ -2,10 +2,14 @@ package protocol
import (
"context"
"fmt"
"sync"
"testing"
"time"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/status-im/status-go/protocol/transport"
"github.com/status-im/status-go/multiaccounts/accounts"
gethbridge "github.com/status-im/status-go/eth-node/bridge/geth"
@ -27,6 +31,7 @@ import (
mailserversDB "github.com/status-im/status-go/services/mailservers"
waku2 "github.com/status-im/status-go/wakuv2"
wakuV2common "github.com/status-im/status-go/wakuv2/common"
)
const (
@ -57,6 +62,48 @@ type MessengerStoreNodeRequestSuite struct {
logger *zap.Logger
}
type singleResult struct {
EnvelopesCount int
Envelopes []*wakuV2common.ReceivedMessage
Error error
FetchedCommunity *communities.Community
}
func (r *singleResult) toString() string {
resultString := ""
communityString := ""
if r.FetchedCommunity != nil {
communityString = fmt.Sprintf("clock: %d (%s), name: %s, members: %d",
r.FetchedCommunity.Clock(),
time.Unix(int64(r.FetchedCommunity.Clock()), 0).UTC(),
r.FetchedCommunity.Name(),
len(r.FetchedCommunity.Members()),
)
}
if r.Error != nil {
resultString = fmt.Sprintf("error: %s", r.Error.Error())
} else {
resultString = fmt.Sprintf("envelopes fetched: %d, community %s",
r.EnvelopesCount, communityString)
}
for i, envelope := range r.Envelopes {
resultString += fmt.Sprintf("\n\tenvelope %3.0d: %s, timestamp: %d (%s), size: %d bytes, contentTopic: %s, pubsubTopic: %s",
i+1,
envelope.Hash().Hex(),
envelope.Envelope.Message().GetTimestamp(),
time.Unix(0, envelope.Envelope.Message().GetTimestamp()).UTC(),
len(envelope.Envelope.Message().Payload),
envelope.Envelope.Message().ContentTopic,
envelope.Envelope.PubsubTopic(),
)
}
return resultString
}
func (s *MessengerStoreNodeRequestSuite) SetupTest() {
cfg := zap.NewDevelopmentConfig()
cfg.Level = zap.NewAtomicLevelAt(zap.DebugLevel)
@ -67,7 +114,7 @@ func (s *MessengerStoreNodeRequestSuite) SetupTest() {
s.cancel = make(chan struct{}, 10)
storeNodeLogger := s.logger.Named("store-node-waku")
s.wakuStoreNode = NewWakuV2(&s.Suite, storeNodeLogger, true, true)
s.wakuStoreNode = NewWakuV2(&s.Suite, storeNodeLogger, true, true, false)
storeNodeListenAddresses := s.wakuStoreNode.ListenAddresses()
s.Require().LessOrEqual(1, len(storeNodeListenAddresses))
@ -78,14 +125,14 @@ func (s *MessengerStoreNodeRequestSuite) SetupTest() {
func (s *MessengerStoreNodeRequestSuite) TearDown() {
close(s.cancel)
s.wakuStoreNode.Stop() // nolint: errcheck
s.owner.Shutdown() // nolint: errcheck
s.bob.Shutdown() // nolint: errcheck
s.Require().NoError(s.wakuStoreNode.Stop())
TearDownMessenger(&s.Suite, s.owner)
TearDownMessenger(&s.Suite, s.bob)
}
func (s *MessengerStoreNodeRequestSuite) createOwner() {
wakuLogger := s.logger.Named("owner-waku-node")
wakuV2 := NewWakuV2(&s.Suite, wakuLogger, true, false)
wakuV2 := NewWakuV2(&s.Suite, wakuLogger, true, false, false)
s.ownerWaku = gethbridge.NewGethWakuV2Wrapper(wakuV2)
messengerLogger := s.logger.Named("owner-messenger")
@ -98,7 +145,7 @@ func (s *MessengerStoreNodeRequestSuite) createOwner() {
func (s *MessengerStoreNodeRequestSuite) createBob() {
wakuLogger := s.logger.Named("bob-waku-node")
wakuV2 := NewWakuV2(&s.Suite, wakuLogger, true, false)
wakuV2 := NewWakuV2(&s.Suite, wakuLogger, true, false, false)
s.bobWaku = gethbridge.NewGethWakuV2Wrapper(wakuV2)
messengerLogger := s.logger.Named("bob-messenger")
@ -137,6 +184,8 @@ func (s *MessengerStoreNodeRequestSuite) newMessenger(shh types.Waku, logger *za
func (s *MessengerStoreNodeRequestSuite) createCommunity(m *Messenger) *communities.Community {
s.waitForAvailableStoreNode(m)
storeNodeSubscription := s.setupStoreNodeEnvelopesWatcher(nil)
createCommunityRequest := &requests.CreateCommunity{
Name: RandomLettersString(10),
Description: RandomLettersString(20),
@ -145,15 +194,23 @@ func (s *MessengerStoreNodeRequestSuite) createCommunity(m *Messenger) *communit
Membership: protobuf.CommunityPermissions_AUTO_ACCEPT,
}
response, err := m.CreateCommunity(createCommunityRequest, true)
response, err := m.CreateCommunity(createCommunityRequest, false)
s.Require().NoError(err)
s.Require().NotNil(response)
s.Require().Len(response.Communities(), 1)
s.waitForEnvelopes(storeNodeSubscription, 1)
return response.Communities()[0]
}
func (s *MessengerStoreNodeRequestSuite) requireCommunitiesEqual(c *communities.Community, expected *communities.Community) {
if expected == nil {
s.Require().Nil(c)
return
}
s.Require().NotNil(c)
s.Require().Equal(expected.IDString(), c.IDString())
s.Require().Equal(expected.Clock(), c.Clock())
s.Require().Equal(expected.Name(), c.Name())
s.Require().Equal(expected.Identity().Description, c.Identity().Description)
@ -168,16 +225,15 @@ func (s *MessengerStoreNodeRequestSuite) requireContactsEqual(c *Contact, expect
s.Require().Equal(expected.SocialLinks, c.SocialLinks)
}
func (s *MessengerStoreNodeRequestSuite) fetchCommunity(m *Messenger, communityShard communities.CommunityShard, expectedCommunityInfo *communities.Community) StoreNodeRequestStats {
fetchedCommunity, stats, err := m.storeNodeRequestsManager.FetchCommunity(communityShard, true)
func (s *MessengerStoreNodeRequestSuite) fetchCommunity(m *Messenger, communityShard communities.CommunityShard, expectedCommunity *communities.Community) StoreNodeRequestStats {
options := []StoreNodeRequestOption{
WithWaitForResponseOption(true),
}
fetchedCommunity, stats, err := m.storeNodeRequestsManager.FetchCommunity(communityShard, options)
s.Require().NoError(err)
s.Require().NotNil(fetchedCommunity)
s.Require().Equal(communityShard.CommunityID, fetchedCommunity.IDString())
if expectedCommunityInfo != nil {
s.requireCommunitiesEqual(fetchedCommunity, expectedCommunityInfo)
}
s.requireCommunitiesEqual(fetchedCommunity, expectedCommunity)
return stats
}
@ -197,6 +253,54 @@ func (s *MessengerStoreNodeRequestSuite) waitForAvailableStoreNode(messenger *Me
WaitForAvailableStoreNode(&s.Suite, messenger, storeNodeConnectTimeout)
}
func (s *MessengerStoreNodeRequestSuite) setupEnvelopesWatcher(wakuNode *waku2.Waku, topic *wakuV2common.TopicType, cb func(envelope *wakuV2common.ReceivedMessage)) {
envelopesWatcher := make(chan wakuV2common.EnvelopeEvent, 100)
envelopesSub := wakuNode.SubscribeEnvelopeEvents(envelopesWatcher)
go func() {
defer envelopesSub.Unsubscribe()
for {
select {
case <-s.cancel:
return
case envelopeEvent := <-envelopesWatcher:
if envelopeEvent.Event != wakuV2common.EventEnvelopeAvailable {
continue
}
if topic != nil && *topic != envelopeEvent.Topic {
continue
}
envelope := wakuNode.GetEnvelope(envelopeEvent.Hash)
cb(envelope)
s.logger.Debug("envelope available event for fetched content topic",
zap.Any("envelopeEvent", envelopeEvent),
zap.Any("envelope", envelope),
)
}
}
}()
}
func (s *MessengerStoreNodeRequestSuite) setupStoreNodeEnvelopesWatcher(topic *wakuV2common.TopicType) <-chan string {
storeNodeSubscription := make(chan string, 100)
s.setupEnvelopesWatcher(s.wakuStoreNode, topic, func(envelope *wakuV2common.ReceivedMessage) {
storeNodeSubscription <- envelope.Hash().String()
})
return storeNodeSubscription
}
func (s *MessengerStoreNodeRequestSuite) waitForEnvelopes(subscription <-chan string, expectedEnvelopesCount int) {
for i := 0; i < expectedEnvelopesCount; i++ {
select {
case <-subscription:
case <-time.After(5 * time.Second):
s.Require().Fail("timeout waiting for store node to receive envelopes")
}
}
}
func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityInfo() {
s.createOwner()
s.createBob()
@ -214,10 +318,9 @@ func (s *MessengerStoreNodeRequestSuite) TestConsecutiveRequests() {
community := s.createCommunity(s.owner)
// Test consecutive requests to check that requests in manager are finalized
for i := 0; i < 2; i++ {
s.waitForAvailableStoreNode(s.bob)
// At second request we expect to fetch nothing, because the community is already in the database
s.fetchCommunity(s.bob, community.CommunityShard(), community)
}
s.fetchCommunity(s.bob, community.CommunityShard(), nil)
}
func (s *MessengerStoreNodeRequestSuite) TestSimultaneousCommunityInfoRequests() {
@ -278,68 +381,21 @@ func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityInfoWithStoreNodeDi
s.fetchCommunity(s.bob, community.CommunityShard(), community)
}
// This test is intended to only run locally to test how fast is a big community fetched
// Shouldn't be executed in CI, because it relies on connection to status.prod store nodes.
func (s *MessengerStoreNodeRequestSuite) TestRequestBigCommunity() {
if !runLocalTests {
return
}
// Status CC community
const communityID = "0x03073514d4c14a7d10ae9fc9b0f05abc904d84166a6ac80add58bf6a3542a4e50a"
communityShard := communities.CommunityShard{
CommunityID: communityID,
Shard: nil,
}
wakuLogger := s.logger.Named("user-waku-node")
messengerLogger := s.logger.Named("user-messenger")
wakuV2 := NewWakuV2(&s.Suite, wakuLogger, true, false)
userWaku := gethbridge.NewGethWakuV2Wrapper(wakuV2)
privateKey, err := crypto.GenerateKey()
s.Require().NoError(err)
mailserversSQLDb, err := helpers.SetupTestMemorySQLDB(appdatabase.DbInitializer{})
s.Require().NoError(err)
mailserversDatabase := mailserversDB.NewDB(mailserversSQLDb)
s.Require().NoError(err)
options := []Option{
WithMailserversDatabase(mailserversDatabase),
WithClusterConfig(params.ClusterConfig{
Fleet: params.FleetStatusProd,
}),
}
// Create bob without `createBob` without func to force status.prod fleet
s.bob, err = newMessengerWithKey(userWaku, privateKey, messengerLogger, options)
s.Require().NoError(err)
fetchedCommunity, stats, err := s.bob.storeNodeRequestsManager.FetchCommunity(communityShard, true)
s.Require().NoError(err)
s.Require().NotNil(fetchedCommunity)
s.Require().Equal(communityID, fetchedCommunity.IDString())
s.Require().Equal(initialStoreNodeRequestPageSize, stats.FetchedEnvelopesCount)
s.Require().Equal(1, stats.FetchedPagesCount)
}
func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityPagingAlgorithm() {
const spamAmount = defaultStoreNodeRequestPageSize + initialStoreNodeRequestPageSize
s.createOwner()
s.createBob()
// Create a community
community := s.createCommunity(s.owner)
contentTopic := wakuV2common.BytesToTopic(transport.ToTopic(community.IDString()))
storeNodeSubscription := s.setupStoreNodeEnvelopesWatcher(&contentTopic)
// Push spam to the same ContentTopic & PubsubTopic
// The first requested page size is 1. All subsequent pages are limited to 20.
// We want to test the algorithm, so we push 21 spam envelopes.
for i := 0; i < defaultStoreNodeRequestPageSize+initialStoreNodeRequestPageSize; i++ {
for i := 0; i < spamAmount; i++ {
spamMessage := common.RawMessage{
Payload: RandomBytes(16),
Sender: community.PrivateKey(),
@ -351,14 +407,17 @@ func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityPagingAlgorithm() {
s.Require().NoError(err)
}
// Wait the store node to receive envelopes
s.waitForEnvelopes(storeNodeSubscription, spamAmount)
// Fetch the community
stats := s.fetchCommunity(s.bob, community.CommunityShard(), community)
// Expect 3 pages and 23 (24 spam + 1 community description + 1 general channel description) envelopes to be fetched.
// First we fetch a more up-to-date, but an invalid spam message, fail to decrypt it as community description,
// then we fetch another page of data and successfully decrypt a community description.
s.Require().Equal(defaultStoreNodeRequestPageSize+initialStoreNodeRequestPageSize+2, stats.FetchedEnvelopesCount)
s.Require().Equal(3, stats.FetchedPagesCount)
s.Require().Equal(spamAmount+1, stats.FetchedEnvelopesCount)
s.Require().Equal(2, stats.FetchedPagesCount) // TODO: revert to 3 when fixed: https://github.com/waku-org/nwaku/issues/2317
}
func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityWithSameContentTopic() {
@ -475,6 +534,47 @@ func (s *MessengerStoreNodeRequestSuite) TestRequestProfileInfo() {
s.fetchProfile(s.bob, s.owner.selfContact.ID, s.owner.selfContact)
}
// TestSequentialUpdates checks that making updates to the community
// immediately results in new store node fetched information.
// Before adding storeNodeSubscription we had problems with the test setup that we didn't have a mechanism to wait for store node to
// receive and process new messages.
func (s *MessengerStoreNodeRequestSuite) TestSequentialUpdates() {
s.createOwner()
s.createBob()
community := s.createCommunity(s.owner)
s.fetchCommunity(s.bob, community.CommunityShard(), community)
contentTopic := wakuV2common.BytesToTopic(transport.ToTopic(community.IDString()))
communityName := community.Name()
storeNodeSubscription := s.setupStoreNodeEnvelopesWatcher(&contentTopic)
for i := 0; i < 3; i++ {
// Change community name, this will automatically publish a new community description
ownerEditRequest := &requests.EditCommunity{
CommunityID: community.ID(),
CreateCommunity: requests.CreateCommunity{
Name: fmt.Sprintf("%s-%d", communityName, i),
Description: community.DescriptionText(),
Color: community.Color(),
Membership: community.Permissions().Access,
},
}
_, err := s.owner.EditCommunity(ownerEditRequest)
s.Require().NoError(err)
s.waitForEnvelopes(storeNodeSubscription, 1)
// Get updated community from the database
community, err = s.owner.communitiesManager.GetByID(community.ID())
s.Require().NoError(err)
s.Require().NotNil(community)
s.fetchCommunity(s.bob, community.CommunityShard(), community)
}
}
func (s *MessengerStoreNodeRequestSuite) TestRequestShardAndCommunityInfo() {
s.createOwner()
s.createBob()
@ -518,7 +618,7 @@ func (s *MessengerStoreNodeRequestSuite) TestFiltersNotRemoved() {
filterBefore := s.owner.transport.FilterByChatID(community.IDString())
s.Require().NotNil(filterBefore)
s.fetchCommunity(s.owner, community.CommunityShard(), community)
s.fetchCommunity(s.owner, community.CommunityShard(), nil)
filterAfter := s.owner.transport.FilterByChatID(community.IDString())
s.Require().NotNil(filterAfter)
@ -542,3 +642,179 @@ func (s *MessengerStoreNodeRequestSuite) TestFiltersRemoved() {
filterAfter := s.bob.transport.FilterByChatID(community.IDString())
s.Require().Nil(filterAfter)
}
func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityEnvelopesOrder() {
s.createOwner()
s.createBob()
community := s.createCommunity(s.owner)
// Push 5 descriptions to the store node
for i := 0; i < 4; i++ {
err := s.owner.publishOrg(community)
s.Require().NoError(err)
}
// Subscribe to received envelope
bobWakuV2 := gethbridge.GetGethWakuV2From(s.bobWaku)
contentTopic := wakuV2common.BytesToTopic(transport.ToTopic(community.IDString()))
var prevEnvelope *wakuV2common.ReceivedMessage
s.setupEnvelopesWatcher(bobWakuV2, &contentTopic, func(envelope *wakuV2common.ReceivedMessage) {
// We check that each next envelope fetched is newer than the previous one
if prevEnvelope != nil {
s.Require().Greater(
envelope.Envelope.Message().GetTimestamp(),
prevEnvelope.Envelope.Message().GetTimestamp())
}
prevEnvelope = envelope
})
// Force a single-envelope page size to be able to check the order.
// Also force all envelopes to be fetched.
options := []StoreNodeRequestOption{
WithWaitForResponseOption(true),
WithStopWhenDataFound(false),
WithInitialPageSize(1),
WithFurtherPageSize(1),
}
// Fetch the community
fetchedCommunity, _, err := s.bob.storeNodeRequestsManager.FetchCommunity(community.CommunityShard(), options)
s.Require().NoError(err)
s.requireCommunitiesEqual(fetchedCommunity, community)
}
// TestFetchRealCommunity is intended to only run locally to check the community description in all of the store nodes.
// Shouldn't be executed in CI, because it relies on connection to the real network.
//
// To run this test, first set `runLocalTests` to true.
// Then carefully set all of communityID, communityShard, fleet and other const variables.
// NOTE: I only tested it with the default parameters, but in theory it should work for any configuration.
func (s *MessengerStoreNodeRequestSuite) TestFetchRealCommunity() {
if !runLocalTests {
return
}
const communityID = "0x03073514d4c14a7d10ae9fc9b0f05abc904d84166a6ac80add58bf6a3542a4e50a"
var communityShard *shard.Shard
const fleet = params.FleetStatusProd
const useShardAsDefaultTopic = false
const clusterID = 0
const userPrivateKeyString = "" // When empty a new user will be created
contentTopic := wakuV2common.BytesToTopic(transport.ToTopic(communityID))
nodesList := mailserversDB.DefaultMailserversByFleet(fleet)
results := map[string]singleResult{}
wg := sync.WaitGroup{}
// We run a separate request for each node in the fleet.
for i, mailserver := range nodesList {
wg.Add(1)
go func(i int, mailserver mailserversDB.Mailserver) {
defer wg.Done()
fmt.Printf("--- starting for %s\n", mailserver.ID)
result := singleResult{}
//
// Create WakuV2 node
// NOTE: Another option was to create a bare waku node and fetch envelopes directly with it
// and after that push all of the envelopes to a new messenger and check the result.
// But this turned out to be harder to implement.
//
wakuLogger := s.logger.Named(fmt.Sprintf("user-waku-node-%d", i))
messengerLogger := s.logger.Named(fmt.Sprintf("user-messenger-%d", i))
wakuV2 := NewWakuV2(&s.Suite, wakuLogger, true, false, useShardAsDefaultTopic)
userWaku := gethbridge.NewGethWakuV2Wrapper(wakuV2)
//
// Create a messenger to process envelopes
//
var privateKeyString = userPrivateKeyString
if privateKeyString == "" {
privateKey, err := crypto.GenerateKey()
s.Require().NoError(err)
privateKeyString = hexutil.Encode(crypto.FromECDSA(privateKey))
}
privateKeyBytes, err := hexutil.Decode(privateKeyString)
s.Require().NoError(err)
privateKey, err := crypto.ToECDSA(privateKeyBytes)
s.Require().NoError(err)
// Mock a local fleet with single store node
// This is done by settings custom store nodes in the database
mailserversSQLDb, err := helpers.SetupTestMemorySQLDB(appdatabase.DbInitializer{})
s.Require().NoError(err)
mailserversDatabase := mailserversDB.NewDB(mailserversSQLDb)
mailserver.Fleet = localFleet
err = mailserversDatabase.Add(mailserver)
s.Require().NoError(err)
options := []Option{
WithMailserversDatabase(mailserversDatabase),
WithClusterConfig(params.ClusterConfig{
Fleet: localFleet,
ClusterID: clusterID,
}),
}
// Create user without `createBob` func to force desired fleet
user, err := newMessengerWithKey(userWaku, privateKey, messengerLogger, options)
s.Require().NoError(err)
defer TearDownMessenger(&s.Suite, user)
communityAddress := communities.CommunityShard{
CommunityID: communityID,
Shard: communityShard,
}
// Setup envelopes watcher to gather fetched envelopes
s.setupEnvelopesWatcher(wakuV2, &contentTopic, func(envelope *wakuV2common.ReceivedMessage) {
result.Envelopes = append(result.Envelopes, envelope)
})
// Start fetching
storeNodeRequestOptions := []StoreNodeRequestOption{
WithWaitForResponseOption(true),
WithStopWhenDataFound(false), // In this test we want all envelopes to be fetched
WithInitialPageSize(defaultStoreNodeRequestPageSize), // Because we're fetching all envelopes anyway
}
fetchedCommunity, stats, err := user.storeNodeRequestsManager.FetchCommunity(communityAddress, storeNodeRequestOptions)
result.EnvelopesCount = stats.FetchedEnvelopesCount
result.FetchedCommunity = fetchedCommunity
result.Error = err
results[mailserver.ID] = result
}(i, mailserver)
}
// Wait for all requests to finish
wg.Wait()
// Print the results
for storeNodeName, result := range results {
fmt.Printf("%s --- %s\n", storeNodeName, result.toString())
}
}

View File

@ -16,8 +16,6 @@ import (
"github.com/stretchr/testify/suite"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/status-im/status-go/appdatabase"
gethbridge "github.com/status-im/status-go/eth-node/bridge/geth"
"github.com/status-im/status-go/eth-node/types"
@ -253,9 +251,10 @@ func WaitForAvailableStoreNode(s *suite.Suite, m *Messenger, timeout time.Durati
s.Require().True(available)
}
func NewWakuV2(s *suite.Suite, logger *zap.Logger, useLocalWaku bool, enableStore bool) *waku2.Waku {
func NewWakuV2(s *suite.Suite, logger *zap.Logger, useLocalWaku bool, enableStore bool, useShardAsDefaultTopic bool) *waku2.Waku {
wakuConfig := &waku2.Config{
DefaultShardPubsubTopic: relay.DefaultWakuTopic, // shard.DefaultShardPubsubTopic(),
DefaultShardPubsubTopic: "", // TODO: Empty string should work fine, for default value if not.
UseShardAsDefaultTopic: useShardAsDefaultTopic,
}
var onPeerStats func(connStatus types.ConnStatus)
@ -316,7 +315,7 @@ func CreateWakuV2Network(s *suite.Suite, parentLogger *zap.Logger, nodeNames []s
nodes := make([]*waku2.Waku, len(nodeNames))
for i, name := range nodeNames {
logger := parentLogger.With(zap.String("name", name+"-waku"))
wakuNode := NewWakuV2(s, logger, true, false)
wakuNode := NewWakuV2(s, logger, true, false, false)
nodes[i] = wakuNode
}
@ -343,6 +342,9 @@ func CreateWakuV2Network(s *suite.Suite, parentLogger *zap.Logger, nodeNames []s
}
func TearDownMessenger(s *suite.Suite, m *Messenger) {
if m == nil {
return
}
s.Require().NoError(m.Shutdown())
if m.database != nil {
s.Require().NoError(m.database.Close())

View File

@ -2,6 +2,16 @@ package mailservers
import "github.com/status-im/status-go/params"
func DefaultMailserversByFleet(fleet string) []Mailserver {
var items []Mailserver
for _, ms := range DefaultMailservers() {
if ms.Fleet == fleet {
items = append(items, ms)
}
}
return items
}
func DefaultMailservers() []Mailserver {
return []Mailserver{

View File

@ -1150,7 +1150,11 @@ func (w *Waku) Query(ctx context.Context, peerID peer.ID, pubsubTopic string, to
msg.RateLimitProof = nil
envelope := protocol.NewEnvelope(msg, msg.GetTimestamp(), pubsubTopic)
w.logger.Info("received waku2 store message", zap.Any("envelopeHash", hexutil.Encode(envelope.Hash())), zap.String("pubsubTopic", pubsubTopic))
w.logger.Info("received waku2 store message",
zap.Any("envelopeHash", hexutil.Encode(envelope.Hash())),
zap.String("pubsubTopic", pubsubTopic),
zap.Int64p("timestamp", envelope.Message().Timestamp),
)
err = w.OnNewEnvelopes(envelope, common.StoreMessageType, processEnvelopes)
if err != nil {