chore: store node requests manager (#4446)

This commit is contained in:
Igor Sirotin 2023-12-15 19:50:12 +00:00 committed by GitHub
parent 93aeefcb89
commit e3ef8c649a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 1035 additions and 459 deletions

View File

@ -1 +1 @@
0.171.34
0.171.35

View File

@ -268,8 +268,8 @@ func (w *gethWakuWrapper) MarkP2PMessageAsProcessed(hash common.Hash) {
w.waku.MarkP2PMessageAsProcessed(hash)
}
func (w *gethWakuWrapper) RequestStoreMessages(ctx context.Context, peerID []byte, r types.MessagesRequest) (*types.StoreRequestCursor, error) {
return nil, errors.New("not implemented")
func (w *gethWakuWrapper) RequestStoreMessages(ctx context.Context, peerID []byte, r types.MessagesRequest, processEnvelopes bool) (*types.StoreRequestCursor, int, error) {
return nil, 0, errors.New("not implemented")
}
func (w *gethWakuWrapper) ConnectionChanged(_ connection.State) {}

View File

@ -176,13 +176,14 @@ func (w *gethWakuV2Wrapper) SendMessagesRequest(peerID []byte, r types.MessagesR
return errors.New("DEPRECATED")
}
func (w *gethWakuV2Wrapper) RequestStoreMessages(ctx context.Context, peerID []byte, r types.MessagesRequest) (*types.StoreRequestCursor, error) {
func (w *gethWakuV2Wrapper) RequestStoreMessages(ctx context.Context, peerID []byte, r types.MessagesRequest, processEnvelopes bool) (*types.StoreRequestCursor, int, error) {
var options []store.HistoryRequestOption
peer, err := peer.Decode(string(peerID))
if err != nil {
return nil, err
return nil, 0, err
}
options = []store.HistoryRequestOption{
store.WithPaging(false, uint64(r.Limit)),
}
@ -201,9 +202,9 @@ func (w *gethWakuV2Wrapper) RequestStoreMessages(ctx context.Context, peerID []b
contentTopics = append(contentTopics, wakucommon.BytesToTopic(topic))
}
pbCursor, err := w.waku.Query(ctx, peer, r.PubsubTopic, contentTopics, uint64(r.From), uint64(r.To), options)
pbCursor, envelopesCount, err := w.waku.Query(ctx, peer, r.PubsubTopic, contentTopics, uint64(r.From), uint64(r.To), options, processEnvelopes)
if err != nil {
return nil, err
return nil, 0, err
}
if pbCursor != nil {
@ -212,10 +213,10 @@ func (w *gethWakuV2Wrapper) RequestStoreMessages(ctx context.Context, peerID []b
ReceiverTime: pbCursor.ReceiverTime,
SenderTime: pbCursor.SenderTime,
PubsubTopic: pbCursor.PubsubTopic,
}, nil
}, envelopesCount, nil
}
return nil, nil
return nil, envelopesCount, nil
}
// DEPRECATED: Not used in waku V2

View File

@ -151,7 +151,7 @@ type Waku interface {
SendMessagesRequest(peerID []byte, request MessagesRequest) error
// RequestStoreMessages uses the WAKU2-STORE protocol to request historic messages
RequestStoreMessages(ctx context.Context, peerID []byte, request MessagesRequest) (*StoreRequestCursor, error)
RequestStoreMessages(ctx context.Context, peerID []byte, request MessagesRequest, processEnvelopes bool) (*StoreRequestCursor, int, error)
// ProcessingP2PMessages indicates whether there are in-flight p2p messages
ProcessingP2PMessages() bool

View File

@ -389,6 +389,13 @@ func (o *Community) Shard() *shard.Shard {
return nil
}
func (o *Community) CommunityShard() CommunityShard {
return CommunityShard{
CommunityID: o.IDString(),
Shard: o.Shard(),
}
}
func (o *Community) IntroMessage() string {
if o != nil &&
o.config != nil &&

View File

@ -1519,6 +1519,8 @@ func (m *Manager) Queue(signer *ecdsa.PublicKey, community *Community, clock uin
}
func (m *Manager) HandleCommunityDescriptionMessage(signer *ecdsa.PublicKey, description *protobuf.CommunityDescription, payload []byte, verifiedOwner *ecdsa.PublicKey, communityShard *protobuf.Shard) (*CommunityResponse, error) {
m.logger.Debug("HandleCommunityDescriptionMessage", zap.String("communityID", description.ID))
if signer == nil {
return nil, errors.New("signer can't be nil")
}

View File

@ -114,6 +114,7 @@ type Messenger struct {
communitiesKeyDistributor CommunitiesKeyDistributor
accountsManager account.Manager
mentionsManager *MentionManager
storeNodeRequestsManager *StoreNodeRequestManager
logger *zap.Logger
outputCSV bool
@ -153,9 +154,6 @@ type Messenger struct {
once sync.Once
}
requestedCommunitiesLock sync.RWMutex
requestedCommunities map[string]*transport.Filter
requestedContactsLock sync.RWMutex
requestedContacts map[string]*transport.Filter
@ -533,18 +531,16 @@ func NewMessenger(
peers: make(map[string]peerStatus),
availabilitySubscriptions: make([]chan struct{}, 0),
},
mailserversDatabase: c.mailserversDatabase,
account: c.account,
quit: make(chan struct{}),
ctx: ctx,
cancel: cancel,
requestedCommunitiesLock: sync.RWMutex{},
requestedCommunities: make(map[string]*transport.Filter),
requestedContactsLock: sync.RWMutex{},
requestedContacts: make(map[string]*transport.Filter),
importingCommunities: make(map[string]bool),
importingChannels: make(map[string]bool),
importRateLimiter: rate.NewLimiter(rate.Every(importSlowRate), 1),
mailserversDatabase: c.mailserversDatabase,
account: c.account,
quit: make(chan struct{}),
ctx: ctx,
cancel: cancel,
requestedContactsLock: sync.RWMutex{},
requestedContacts: make(map[string]*transport.Filter),
importingCommunities: make(map[string]bool),
importingChannels: make(map[string]bool),
importRateLimiter: rate.NewLimiter(rate.Every(importSlowRate), 1),
importDelayer: struct {
wait chan struct{}
once sync.Once
@ -587,6 +583,7 @@ func NewMessenger(
}
messenger.mentionsManager = NewMentionManager(messenger)
messenger.storeNodeRequestsManager = NewCommunityRequestsManager(messenger)
if c.walletService != nil {
messenger.walletAPI = walletAPI
@ -791,7 +788,6 @@ func (m *Messenger) Start() (*MessengerResponse, error) {
m.handleCommunitiesSubscription(m.communitiesManager.Subscribe())
m.handleCommunitiesHistoryArchivesSubscription(m.communitiesManager.Subscribe())
m.updateCommunitiesActiveMembersPeriodically()
m.handleConnectionChange(m.online())
m.handleENSVerificationSubscription(ensSubscription)
m.watchConnectionChange()
m.watchChatsAndCommunitiesToUnmute()
@ -926,13 +922,6 @@ func (m *Messenger) handleConnectionChange(online bool) {
}
m.shouldPublishContactCode = false
}
go func() {
_, err := m.RequestAllHistoricMessagesWithRetries(false)
if err != nil {
m.logger.Warn("failed to fetch historic messages", zap.Error(err))
}
}()
} else {
if m.pushNotificationClient != nil {
m.pushNotificationClient.Offline()
@ -1457,6 +1446,7 @@ func (m *Messenger) handleENSVerificationSubscription(c chan []*ens.Verification
func (m *Messenger) watchConnectionChange() {
m.logger.Debug("watching connection changes")
state := m.online()
m.handleConnectionChange(state)
go func() {
for {
select {
@ -1870,6 +1860,9 @@ func (m *Messenger) Init() error {
// Shutdown takes care of ensuring a clean shutdown of Messenger
func (m *Messenger) Shutdown() (err error) {
if m == nil {
return nil
}
close(m.quit)
m.cancel()
m.shutdownWaitGroup.Wait()

View File

@ -6,11 +6,9 @@ import (
"database/sql"
"errors"
"fmt"
"math"
"sync"
"time"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
"golang.org/x/time/rate"
@ -73,6 +71,9 @@ func (r *FetchCommunityRequest) Validate() error {
if len(r.CommunityKey) <= 2 {
return fmt.Errorf("community key is too short")
}
if _, err := types.DecodeHex(r.CommunityKey); err != nil {
return fmt.Errorf("invalid community key")
}
return nil
}
@ -2554,260 +2555,26 @@ func (m *Messenger) FetchCommunity(request *FetchCommunityRequest) (*communities
}
}
return m.requestCommunityInfoFromMailserver(communityID, request.Shard, request.WaitForResponse)
community, _, err := m.storeNodeRequestsManager.FetchCommunity(communities.CommunityShard{
CommunityID: communityID,
Shard: request.Shard,
}, request.WaitForResponse)
return community, err
}
// requestCommunityInfoFromMailserver installs filter for community and requests its details
// from mailserver. When response received it will be passed through signals handler
func (m *Messenger) requestCommunityInfoFromMailserver(communityID string, shard *shard.Shard, waitForResponse bool) (*communities.Community, error) {
m.logger.Info("requesting community info", zap.String("communityID", communityID), zap.Any("shard", shard))
m.requestedCommunitiesLock.Lock()
defer m.requestedCommunitiesLock.Unlock()
if _, ok := m.requestedCommunities[communityID]; ok {
return nil, nil
}
//If filter wasn't installed we create it and remember for deinstalling after
//response received
filter := m.transport.FilterByChatID(communityID)
if filter == nil {
filters, err := m.transport.InitPublicFilters([]transport.FiltersToInitialize{{
ChatID: communityID,
PubsubTopic: shard.PubsubTopic(),
}})
if err != nil {
return nil, fmt.Errorf("Can't install filter for community: %v", err)
}
if len(filters) != 1 {
return nil, fmt.Errorf("Unexpected amount of filters created")
}
filter = filters[0]
m.requestedCommunities[communityID] = filter
m.logger.Debug("created filter for community",
zap.String("filterID", filter.FilterID),
zap.String("communityID", communityID),
zap.String("PubsubTopic", filter.PubsubTopic),
)
} else {
//we don't remember filter id associated with community because it was already installed
m.requestedCommunities[communityID] = nil
}
defer m.forgetCommunityRequest(communityID)
to := uint32(math.Ceil(float64(m.GetCurrentTimeInMillis()) / 1000))
from := to - oneMonthInSeconds
_, err := m.performMailserverRequest(func() (*MessengerResponse, error) {
batch := MailserverBatch{
From: from,
To: to,
Topics: []types.TopicType{filter.ContentTopic},
PubsubTopic: filter.PubsubTopic,
}
m.logger.Info("requesting historic", zap.Any("batch", batch))
err := m.processMailserverBatch(batch)
return nil, err
})
if err != nil {
return nil, err
}
m.logger.Info("mailserver request performed",
zap.String("communityID", communityID),
zap.Bool("waitForResponse", waitForResponse),
)
if !waitForResponse {
return nil, nil
}
// TODO: We can force to process all messages then we don't have to wait?
//m.ProcessAllMessages()
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
for {
select {
case <-time.After(200 * time.Millisecond):
//send signal to client that message status updated
community, err := m.communitiesManager.GetByIDString(communityID)
if err != nil {
return nil, err
}
if community != nil && community.Name() != "" && community.DescriptionText() != "" {
m.logger.Debug("community info found",
zap.String("communityID", communityID),
zap.String("displayName", community.Name()))
return community, nil
}
case <-ctx.Done():
m.logger.Error("failed to request community info", zap.String("communityID", communityID), zap.Error(ctx.Err()))
return nil, fmt.Errorf("failed to request community info for id '%s' from mailserver: %w", communityID, ctx.Err())
}
}
}
// requestCommunitiesFromMailserver installs filter for community and requests its details
// from mailserver. When response received it will be passed through signals handler
func (m *Messenger) requestCommunitiesFromMailserver(communities []communities.CommunityShard) {
m.requestedCommunitiesLock.Lock()
defer m.requestedCommunitiesLock.Unlock()
// we group topics by PubsubTopic
groupedTopics := map[string]map[types.TopicType]struct{}{}
for _, c := range communities {
if _, ok := m.requestedCommunities[c.CommunityID]; ok {
continue
}
//If filter wasn't installed we create it and remember for deinstalling after
//response received
filter := m.transport.FilterByChatID(c.CommunityID)
if filter == nil {
filters, err := m.transport.InitPublicFilters([]transport.FiltersToInitialize{{
ChatID: c.CommunityID,
PubsubTopic: c.Shard.PubsubTopic(),
}})
if err != nil {
m.logger.Error("Can't install filter for community", zap.Error(err))
continue
}
if len(filters) != 1 {
m.logger.Error("Unexpected amount of filters created")
continue
}
filter = filters[0]
m.requestedCommunities[c.CommunityID] = filter
} else {
//we don't remember filter id associated with community because it was already installed
m.requestedCommunities[c.CommunityID] = nil
}
if _, ok := groupedTopics[filter.PubsubTopic]; !ok {
groupedTopics[filter.PubsubTopic] = map[types.TopicType]struct{}{}
}
groupedTopics[filter.PubsubTopic][filter.ContentTopic] = struct{}{}
}
defer func() {
for _, c := range communities {
m.forgetCommunityRequest(c.CommunityID)
}
}()
to := uint32(m.transport.GetCurrentTime() / 1000)
from := to - oneMonthInSeconds
wg := sync.WaitGroup{}
for pubsubTopic, contentTopics := range groupedTopics {
wg.Add(1)
go func(pubsubTopic string, contentTopics map[types.TopicType]struct{}) {
batch := MailserverBatch{
From: from,
To: to,
Topics: maps.Keys(contentTopics),
PubsubTopic: pubsubTopic,
}
_, err := m.performMailserverRequest(func() (*MessengerResponse, error) {
m.logger.Info("requesting historic", zap.Any("batch", batch))
err := m.processMailserverBatch(batch)
return nil, err
})
if err != nil {
m.logger.Error("error performing mailserver request", zap.Any("batch", batch), zap.Error(err))
}
wg.Done()
}(pubsubTopic, contentTopics)
}
wg.Wait()
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
fetching := true
for fetching {
select {
case <-time.After(200 * time.Millisecond):
allLoaded := true
for _, c := range communities {
community, err := m.communitiesManager.GetByIDString(c.CommunityID)
if err != nil {
m.logger.Error("Error loading community", zap.Error(err))
break
}
if community == nil || community.Name() == "" || community.DescriptionText() == "" {
allLoaded = false
break
}
}
if allLoaded {
fetching = false
}
case <-ctx.Done():
fetching = false
}
}
}
// forgetCommunityRequest removes community from requested ones and removes filter
func (m *Messenger) forgetCommunityRequest(communityID string) {
m.logger.Info("forgetting community request", zap.String("communityID", communityID))
filter, ok := m.requestedCommunities[communityID]
if !ok {
return
}
if filter != nil {
err := m.transport.RemoveFilters([]*transport.Filter{filter})
if err != nil {
m.logger.Warn("cant remove filter", zap.Error(err))
}
}
delete(m.requestedCommunities, communityID)
// fetchCommunities installs filter for community and requests its details from store node.
// When response received it will be passed through signals handler.
func (m *Messenger) fetchCommunities(communities []communities.CommunityShard) error {
return m.storeNodeRequestsManager.FetchCommunities(communities)
}
// passStoredCommunityInfoToSignalHandler calls signal handler with community info
func (m *Messenger) passStoredCommunityInfoToSignalHandler(communityID string) {
func (m *Messenger) passStoredCommunityInfoToSignalHandler(community *communities.Community) {
if m.config.messengerSignalsHandler == nil {
return
}
//send signal to client that message status updated
community, err := m.communitiesManager.GetByIDString(communityID)
if community == nil {
return
}
if err != nil {
m.logger.Warn("cant get community and pass it to signal handler", zap.Error(err))
return
}
//if there is no info helpful for client, we don't post it
if community.Name() == "" && community.DescriptionText() == "" && community.MembersCount() == 0 {
return
}
m.config.messengerSignalsHandler.CommunityInfoFound(community)
m.forgetCommunityRequest(communityID)
}
// handleCommunityDescription handles an community description

View File

@ -133,7 +133,9 @@ func (m *Messenger) fetchCuratedCommunities(curatedCommunities *communities.Cura
})
}
go m.requestCommunitiesFromMailserver(unknownCommunities)
go func() {
_ = m.fetchCommunities(unknownCommunities)
}()
return response, nil
}

View File

@ -3665,13 +3665,6 @@ func (m *Messenger) HandleCommunityDescription(state *ReceivedMessageState, mess
m.logger.Warn("failed to handle CommunityDescription", zap.Error(err))
return err
}
//if community was among requested ones, send its info and remove filter
for communityID := range m.requestedCommunities {
if _, ok := state.Response.communities[communityID]; ok {
m.passStoredCommunityInfoToSignalHandler(communityID)
}
}
return nil
}

View File

@ -20,6 +20,11 @@ import (
"github.com/status-im/status-go/services/mailservers"
)
const (
initialStoreNodeRequestPageSize = 4
defaultStoreNodeRequestPageSize = 20
)
// tolerance is how many seconds of potentially out-of-order messages we want to fetch
var tolerance uint32 = 60
@ -621,6 +626,7 @@ type work struct {
contentTopics []types.TopicType
cursor []byte
storeCursor *types.StoreRequestCursor
limit uint32
}
type messageRequester interface {
@ -632,11 +638,23 @@ type messageRequester interface {
previousStoreCursor *types.StoreRequestCursor,
pubsubTopic string,
contentTopics []types.TopicType,
limit uint32,
waitForResponse bool,
) (cursor []byte, storeCursor *types.StoreRequestCursor, err error)
processEnvelopes bool,
) (cursor []byte, storeCursor *types.StoreRequestCursor, envelopesCount int, err error)
}
func processMailserverBatch(ctx context.Context, messageRequester messageRequester, batch MailserverBatch, mailserverID []byte, logger *zap.Logger) error {
func processMailserverBatch(
ctx context.Context,
messageRequester messageRequester,
batch MailserverBatch,
mailserverID []byte,
logger *zap.Logger,
pageLimit uint32,
shouldProcessNextPage func(int) (bool, uint32),
processEnvelopes bool,
) error {
var topicStrings []string
for _, t := range batch.Topics {
topicStrings = append(topicStrings, t.String())
@ -680,6 +698,7 @@ func processMailserverBatch(ctx context.Context, messageRequester messageRequest
workCh <- work{
pubsubTopic: batch.PubsubTopic,
contentTopics: batch.Topics[i:j],
limit: pageLimit,
}
time.Sleep(50 * time.Millisecond)
}
@ -719,7 +738,7 @@ loop:
}()
queryCtx, queryCancel := context.WithTimeout(ctx, mailserverRequestTimeout)
cursor, storeCursor, err := messageRequester.SendMessagesRequestForTopics(queryCtx, mailserverID, batch.From, batch.To, w.cursor, w.storeCursor, w.pubsubTopic, w.contentTopics, true)
cursor, storeCursor, envelopesCount, err := messageRequester.SendMessagesRequestForTopics(queryCtx, mailserverID, batch.From, batch.To, w.cursor, w.storeCursor, w.pubsubTopic, w.contentTopics, w.limit, true, processEnvelopes)
queryCancel()
@ -729,16 +748,32 @@ loop:
return
}
if len(cursor) != 0 || storeCursor != nil {
logger.Debug("processBatch producer - creating work (cursor)")
processNextPage := true
nextPageLimit := pageLimit
workWg.Add(1)
workCh <- work{
pubsubTopic: w.pubsubTopic,
contentTopics: w.contentTopics,
cursor: cursor,
storeCursor: storeCursor,
}
if shouldProcessNextPage != nil {
processNextPage, nextPageLimit = shouldProcessNextPage(envelopesCount)
}
if !processNextPage {
return
}
// Check the cursor after calling `shouldProcessNextPage`.
// The app might use process the fetched envelopes in the callback for own needs.
if len(cursor) == 0 && storeCursor == nil {
return
}
logger.Debug("processBatch producer - creating work (cursor)")
workWg.Add(1)
workCh <- work{
pubsubTopic: w.pubsubTopic,
contentTopics: w.contentTopics,
cursor: cursor,
storeCursor: storeCursor,
limit: nextPageLimit,
}
}(w)
case err := <-errCh:
@ -768,7 +803,16 @@ func (m *Messenger) processMailserverBatch(batch MailserverBatch) error {
return err
}
return processMailserverBatch(m.ctx, m.transport, batch, mailserverID, m.logger)
return processMailserverBatch(m.ctx, m.transport, batch, mailserverID, m.logger, defaultStoreNodeRequestPageSize, nil, false)
}
func (m *Messenger) processMailserverBatchWithOptions(batch MailserverBatch, pageLimit uint32, shouldProcessNextPage func(int) (bool, uint32), processEnvelopes bool) error {
mailserverID, err := m.activeMailserverID()
if err != nil {
return err
}
return processMailserverBatch(m.ctx, m.transport, batch, mailserverID, m.logger, pageLimit, shouldProcessNextPage, processEnvelopes)
}
type MailserverBatch struct {

View File

@ -9,6 +9,7 @@ import (
"runtime"
"sort"
"strings"
"sync"
"time"
"github.com/pkg/errors"
@ -559,12 +560,11 @@ func (m *Messenger) handleMailserverCycleEvent(connectedPeers []ConnectedPeer) e
}
// Query mailserver
go func() {
_, err := m.performMailserverRequest(func() (*MessengerResponse, error) { return m.RequestAllHistoricMessages(false) })
_, err := m.RequestAllHistoricMessagesWithRetries(false)
if err != nil {
m.logger.Error("could not perform mailserver request", zap.Error(err))
m.logger.Error("failed to request historic messages", zap.Error(err))
}
}()
} else {
m.mailPeersMutex.Unlock()
}
@ -741,3 +741,46 @@ func (m *Messenger) disconnectStorenodeIfRequired() error {
return nil
}
func (m *Messenger) waitForAvailableStoreNode(timeout time.Duration) bool {
// Add 1 second to timeout, because the mailserver cycle has 1 second ticker, which doesn't tick on start.
// This can be improved after merging https://github.com/status-im/status-go/pull/4380.
// NOTE: https://stackoverflow.com/questions/32705582/how-to-get-time-tick-to-tick-immediately
timeout += time.Second
finish := make(chan struct{})
cancel := make(chan struct{})
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer func() {
wg.Done()
}()
for !m.isActiveMailserverAvailable() {
select {
case <-m.SubscribeMailserverAvailable():
case <-cancel:
return
}
}
}()
go func() {
defer func() {
close(finish)
}()
wg.Wait()
}()
select {
case <-finish:
case <-time.After(timeout):
close(cancel)
case <-m.ctx.Done():
close(cancel)
}
return m.isActiveMailserverAvailable()
}

View File

@ -43,8 +43,10 @@ func (t *mockTransport) SendMessagesRequestForTopics(
previousStoreCursor *types.StoreRequestCursor,
pubsubTopic string,
contentTopics []types.TopicType,
limit uint32,
waitForResponse bool,
) (cursor []byte, storeCursor *types.StoreRequestCursor, err error) {
processEnvelopes bool,
) (cursor []byte, storeCursor *types.StoreRequestCursor, envelopesCount int, err error) {
var response queryResponse
if previousCursor == nil {
initialResponse := getInitialResponseKey(contentTopics)
@ -52,7 +54,7 @@ func (t *mockTransport) SendMessagesRequestForTopics(
} else {
response = t.queryResponses[hex.EncodeToString(previousCursor)]
}
return response.cursor, nil, response.err
return response.cursor, nil, 0, response.err
}
func (t *mockTransport) Populate(topics []types.TopicType, responses int, includeRandomError bool) error {
@ -130,7 +132,7 @@ func TestProcessMailserverBatchHappyPath(t *testing.T) {
Topics: topics,
}
err = processMailserverBatch(context.TODO(), testTransport, testBatch, mailserverID, logger)
err = processMailserverBatch(context.TODO(), testTransport, testBatch, mailserverID, logger, defaultStoreNodeRequestPageSize, nil, false)
require.NoError(t, err)
}
@ -151,6 +153,6 @@ func TestProcessMailserverBatchFailure(t *testing.T) {
Topics: topics,
}
err = processMailserverBatch(context.TODO(), testTransport, testBatch, mailserverID, logger)
err = processMailserverBatch(context.TODO(), testTransport, testBatch, mailserverID, logger, defaultStoreNodeRequestPageSize, nil, false)
require.Error(t, err)
}

View File

@ -0,0 +1,334 @@
package protocol
import (
"fmt"
"math"
"sync"
"time"
"go.uber.org/zap"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/communities"
"github.com/status-im/status-go/protocol/transport"
)
const (
storeNodeAvailableTimeout = 30 * time.Second
)
// FetchCommunityStats is used in tests
type FetchCommunityStats struct {
FetchedEnvelopesCount int
FetchedPagesCount int
}
type StoreNodeRequestManager struct {
messenger *Messenger
logger *zap.Logger
// activeRequests contain all ongoing store node requests.
// Map is indexed with `CommunityID`.
// Request might be duplicated in the map if the request is for multiple communities.
activeRequests map[string]*storeNodeRequest
// activeRequestsLock should be locked each time activeRequests is being accessed or changed.
activeRequestsLock sync.RWMutex
onPerformingBatch func(MailserverBatch)
}
func NewCommunityRequestsManager(m *Messenger) *StoreNodeRequestManager {
return &StoreNodeRequestManager{
messenger: m,
logger: m.logger.Named("StoreNodeRequestManager"),
activeRequests: map[string]*storeNodeRequest{},
activeRequestsLock: sync.RWMutex{},
onPerformingBatch: nil,
}
}
// FetchCommunity makes a single request to store node for a given community id/shard pair.
// When a community is successfully fetched, a `CommunityFound` event will be emitted. If `waitForResponse == true`,
// the function will also wait for the store node response and return the fetched community.
// Automatically waits for an available store node.
// When a `nil` community and `nil` error is returned, that means the community wasn't found at the store node.
func (m *StoreNodeRequestManager) FetchCommunity(community communities.CommunityShard, waitForResponse bool) (*communities.Community, FetchCommunityStats, error) {
m.logger.Info("requesting community from store node",
zap.Any("community", community),
zap.Bool("waitForResponse", waitForResponse))
channel, err := m.subscribeToCommunityRequest(community)
if err != nil {
return nil, FetchCommunityStats{}, fmt.Errorf("failed to create a request for community: %w", err)
}
if !waitForResponse {
return nil, FetchCommunityStats{}, nil
}
result := <-channel
return result.community, result.stats, result.err
}
// FetchCommunities makes a FetchCommunity for each element in given `communities` list.
// For each successfully fetched community, a `CommunityFound` event will be emitted. Ability to subscribe
// to results is not provided, because it's not needed and would complicate the code. `FetchCommunity` can
// be called directly if such functionality is needed.
//
// This function intentionally doesn't fetch multiple content topics in a single store node request. For now
// FetchCommunities is only used for regular (once in 2 minutes) fetching of curated communities. If one of
// those content topics is spammed with to many envelopes, then on each iteration we will have to fetch all
// of this spam first to get the envelopes in other content topics. To avoid this we keep independent requests
// for each content topic.
func (m *StoreNodeRequestManager) FetchCommunities(communities []communities.CommunityShard) error {
m.logger.Info("requesting communities from store node", zap.Any("communities", communities))
var outErr error
for _, community := range communities {
_, _, err := m.FetchCommunity(community, false)
if err != nil {
outErr = fmt.Errorf("%sfailed to create a request for community %s: %w", outErr, community.CommunityID, err)
}
}
return outErr
}
// subscribeToCommunityRequest checks if a request for given community is already in progress, creates and installs
// a new one if not found, and returns a subscription to the result of the found/started request.
// The subscription can then be used to get the result of the request, this could be either a community or an error.
func (m *StoreNodeRequestManager) subscribeToCommunityRequest(community communities.CommunityShard) (communitySubscriptionChannel, error) {
// It's important to unlock only after getting the subscription channel.
// We also lock `activeRequestsLock` during finalizing the requests. This ensures that the subscription
// created in this function will get the result even if the requests proceeds faster than this function ends.
m.activeRequestsLock.Lock()
defer m.activeRequestsLock.Unlock()
request, requestFound := m.activeRequests[community.CommunityID]
if !requestFound {
// Create corresponding filter
filter, filterCreated, err := m.getFilter(community)
if err != nil {
return nil, fmt.Errorf("failed to create community filter: %w", err)
}
request = m.newStoreNodeRequest()
request.pubsubTopic = filter.PubsubTopic
request.communityID = community.CommunityID
request.contentTopic = filter.ContentTopic
if filterCreated {
request.filterToForget = filter
}
m.activeRequests[community.CommunityID] = request
request.start()
}
return request.subscribe(), nil
}
// newStoreNodeRequest creates a new storeNodeRequest struct
func (m *StoreNodeRequestManager) newStoreNodeRequest() *storeNodeRequest {
return &storeNodeRequest{
manager: m,
subscriptions: make([]communitySubscriptionChannel, 0),
}
}
// getFilter checks if a filter for a given community is already created and creates one of not found.
// Returns the found/created filter, a flag if the filter was created by the function and an error.
func (m *StoreNodeRequestManager) getFilter(c communities.CommunityShard) (*transport.Filter, bool, error) {
// First check if such filter already exists.
filter := m.messenger.transport.FilterByChatID(c.CommunityID)
if filter != nil {
//we don't remember filter id associated with community because it was already installed
return filter, false, nil
}
// If filter wasn't installed we create it and
// remember for uninstalling after response is received
filters, err := m.messenger.transport.InitPublicFilters([]transport.FiltersToInitialize{{
ChatID: c.CommunityID,
PubsubTopic: c.Shard.PubsubTopic(),
}})
if err != nil {
m.logger.Error("can't install filter for community", zap.Error(err))
return nil, true, err
}
if len(filters) != 1 {
m.logger.Error("Unexpected number of filters created")
return nil, true, fmt.Errorf("unexepcted number of filters created")
}
return filters[0], true, nil
}
// forgetFilter uninstalls the given filter
func (m *StoreNodeRequestManager) forgetFilter(filter *transport.Filter) {
err := m.messenger.transport.RemoveFilters([]*transport.Filter{filter})
if err != nil {
m.logger.Warn("failed to remove filter", zap.Error(err))
}
}
// storeNodeRequest represents a single store node batch request.
// For a valid storeNodeRequest to be performed, the user must set all the struct fields and call start method.
type storeNodeRequest struct {
// request parameters
pubsubTopic string
contentTopic types.TopicType
// request corresponding metadata to be used in finalize
filterToForget *transport.Filter
communityID string
// internal fields
manager *StoreNodeRequestManager
subscriptions []communitySubscriptionChannel
result fetchCommunityResult
}
// fetchCommunityResult contains result of a single storeNodeRequest
// If any error occurs during the request, err field will be set.
// If a community was successfully fetched, community field will contain the fetched information.
// If a community wasn't found in store node, then a community will be set to `nil`.
// stats will contain information about the performed request that might be useful for testing.
type fetchCommunityResult struct {
err error
community *communities.Community
stats FetchCommunityStats
}
type communitySubscriptionChannel = chan fetchCommunityResult
func (r *storeNodeRequest) subscribe() communitySubscriptionChannel {
channel := make(communitySubscriptionChannel, 100)
r.subscriptions = append(r.subscriptions, channel)
return channel
}
func (r *storeNodeRequest) finalize() {
r.manager.activeRequestsLock.Lock()
defer r.manager.activeRequestsLock.Unlock()
r.manager.logger.Info("request finished",
zap.String("communityID", r.communityID),
zap.Bool("communityFound", r.result.community != nil),
zap.Error(r.result.err))
// Send the result to subscribers
// It's important that this is done with `activeRequestsLock` locked.
for _, s := range r.subscriptions {
s <- r.result
close(s)
}
if r.result.community != nil {
r.manager.messenger.passStoredCommunityInfoToSignalHandler(r.result.community)
}
delete(r.manager.activeRequests, r.communityID)
if r.filterToForget != nil {
r.manager.forgetFilter(r.filterToForget)
}
}
func (r *storeNodeRequest) shouldFetchNextPage(envelopesCount int) (bool, uint32) {
logger := r.manager.logger.With(
zap.String("communityID", r.communityID),
zap.Int("envelopesCount", envelopesCount))
r.result.stats.FetchedEnvelopesCount += envelopesCount
r.result.stats.FetchedPagesCount++
// Force all received envelopes to be processed
r.manager.messenger.ProcessAllMessages()
// Try to get community from database
community, err := r.manager.messenger.communitiesManager.GetByIDString(r.communityID)
if err != nil {
logger.Error("failed to read from database",
zap.String("communityID", r.communityID),
zap.Error(err))
r.result = fetchCommunityResult{
community: nil,
err: fmt.Errorf("failed to read from database: %w", err),
}
return false, 0 // failed to read from database, no sense to continue the procedure
}
if community == nil {
// community not found in the database, request next page
logger.Debug("community still not fetched")
return true, defaultStoreNodeRequestPageSize
}
logger.Debug("community found",
zap.String("displayName", community.Name()))
r.result.community = community
return false, 0
}
func (r *storeNodeRequest) routine() {
r.manager.logger.Info("starting store node request",
zap.String("communityID", r.communityID),
zap.String("pubsubTopic", r.pubsubTopic),
zap.Any("contentTopic", r.contentTopic),
)
// Return a nil community and no error when request was
// performed successfully, but no community found.
r.result = fetchCommunityResult{
community: nil,
err: nil,
}
defer func() {
r.finalize()
}()
if !r.manager.messenger.waitForAvailableStoreNode(storeNodeAvailableTimeout) {
r.result.community = nil
r.result.err = fmt.Errorf("store node is not available")
return
}
to := uint32(math.Ceil(float64(r.manager.messenger.GetCurrentTimeInMillis()) / 1000))
from := to - oneMonthInSeconds
_, err := r.manager.messenger.performMailserverRequest(func() (*MessengerResponse, error) {
batch := MailserverBatch{
From: from,
To: to,
PubsubTopic: r.pubsubTopic,
Topics: []types.TopicType{r.contentTopic},
}
r.manager.logger.Info("perform store node request", zap.Any("batch", batch))
if r.manager.onPerformingBatch != nil {
r.manager.onPerformingBatch(batch)
}
return nil, r.manager.messenger.processMailserverBatchWithOptions(batch, initialStoreNodeRequestPageSize, r.shouldFetchNextPage, true)
})
r.result.err = err
}
func (r *storeNodeRequest) start() {
r.manager.logger.Debug("starting new community request",
zap.Any("communities", r.communityID),
zap.String("pubsubTopic", r.pubsubTopic),
zap.Any("contentTopic", r.contentTopic),
)
go r.routine()
}

View File

@ -1,20 +1,25 @@
package protocol
import (
"context"
"sync"
"testing"
"time"
gethbridge "github.com/status-im/status-go/eth-node/bridge/geth"
"github.com/status-im/status-go/protocol/common"
"github.com/status-im/status-go/protocol/communities"
"github.com/status-im/status-go/protocol/tt"
"github.com/stretchr/testify/suite"
"go.uber.org/zap"
"github.com/ethereum/go-ethereum/crypto"
"github.com/status-im/status-go/appdatabase"
gethbridge "github.com/status-im/status-go/eth-node/bridge/geth"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/params"
"github.com/status-im/status-go/protocol/protobuf"
"github.com/status-im/status-go/protocol/requests"
"github.com/status-im/status-go/protocol/tt"
"github.com/status-im/status-go/t/helpers"
mailserversDB "github.com/status-im/status-go/services/mailservers"
@ -22,8 +27,10 @@ import (
)
const (
localFleet = "local-test-fleet-1"
localMailserverID = "local-test-mailserver"
localFleet = "local-test-fleet-1"
localMailserverID = "local-test-mailserver"
storeNodeConnectTimeout = 500 * time.Millisecond
runLocalTests = false
)
func TestMessengerStoreNodeRequestSuite(t *testing.T) {
@ -33,16 +40,68 @@ func TestMessengerStoreNodeRequestSuite(t *testing.T) {
type MessengerStoreNodeRequestSuite struct {
suite.Suite
cancel chan struct{}
owner *Messenger
bob *Messenger
wakuStoreNode *waku2.Waku
ownerWaku types.Waku
bobWaku types.Waku
wakuStoreNode *waku2.Waku
storeNodeAddress string
ownerWaku types.Waku
bobWaku types.Waku
logger *zap.Logger
}
func (s *MessengerStoreNodeRequestSuite) SetupTest() {
cfg := zap.NewDevelopmentConfig()
cfg.Level = zap.NewAtomicLevelAt(zap.DebugLevel)
cfg.Development = false
cfg.DisableStacktrace = true
s.logger = tt.MustCreateTestLoggerWithConfig(cfg)
s.cancel = make(chan struct{}, 10)
storeNodeLogger := s.logger.With(zap.String("name", "store-node-waku"))
s.wakuStoreNode = NewWakuV2(&s.Suite, storeNodeLogger, true, true)
storeNodeListenAddresses := s.wakuStoreNode.ListenAddresses()
s.Require().LessOrEqual(1, len(storeNodeListenAddresses))
s.storeNodeAddress = storeNodeListenAddresses[0]
s.logger.Info("store node ready", zap.String("address", s.storeNodeAddress))
}
func (s *MessengerStoreNodeRequestSuite) TearDown() {
close(s.cancel)
s.wakuStoreNode.Stop() // nolint: errcheck
s.owner.Shutdown() // nolint: errcheck
s.bob.Shutdown() // nolint: errcheck
}
func (s *MessengerStoreNodeRequestSuite) createOwner() {
wakuLogger := s.logger.Named("owner-waku-node")
wakuV2 := NewWakuV2(&s.Suite, wakuLogger, true, false)
s.ownerWaku = gethbridge.NewGethWakuV2Wrapper(wakuV2)
messengerLogger := s.logger.Named("owner-messenger")
s.owner = s.newMessenger(s.ownerWaku, messengerLogger, s.storeNodeAddress)
// We force the owner to use the store node as relay peer
err := s.owner.DialPeer(s.storeNodeAddress)
s.Require().NoError(err)
}
func (s *MessengerStoreNodeRequestSuite) createBob() {
wakuLogger := s.logger.Named("bob-waku-node")
wakuV2 := NewWakuV2(&s.Suite, wakuLogger, true, false)
s.bobWaku = gethbridge.NewGethWakuV2Wrapper(wakuV2)
messengerLogger := s.logger.Named("bob-messenger")
s.bob = s.newMessenger(s.bobWaku, messengerLogger, s.storeNodeAddress)
}
func (s *MessengerStoreNodeRequestSuite) newMessenger(shh types.Waku, logger *zap.Logger, mailserverAddress string) *Messenger {
privateKey, err := crypto.GenerateKey()
s.Require().NoError(err)
@ -72,51 +131,14 @@ func (s *MessengerStoreNodeRequestSuite) newMessenger(shh types.Waku, logger *za
return messenger
}
func (s *MessengerStoreNodeRequestSuite) SetupTest() {
s.logger = tt.MustCreateTestLogger()
// Create store node
storeNodeLogger := s.logger.With(zap.String("name", "store-node-waku"))
s.wakuStoreNode = NewWakuV2(&s.Suite, storeNodeLogger, true, true)
storeNodeListenAddresses := s.wakuStoreNode.ListenAddresses()
s.Require().LessOrEqual(1, len(storeNodeListenAddresses))
storeNodeAddress := storeNodeListenAddresses[0]
s.logger.Info("store node ready", zap.String("address", storeNodeAddress))
// Create community owner
ownerWakuLogger := s.logger.With(zap.String("name", "owner-waku"))
s.ownerWaku = gethbridge.NewGethWakuV2Wrapper(NewWakuV2(&s.Suite, ownerWakuLogger, true, false))
ownerLogger := s.logger.With(zap.String("name", "owner"))
s.owner = s.newMessenger(s.ownerWaku, ownerLogger, storeNodeAddress)
// Create an independent user
bobWakuLogger := s.logger.With(zap.String("name", "owner-waku"))
s.bobWaku = gethbridge.NewGethWakuV2Wrapper(NewWakuV2(&s.Suite, bobWakuLogger, true, false))
bobLogger := s.logger.With(zap.String("name", "bob"))
s.bob = s.newMessenger(s.bobWaku, bobLogger, storeNodeAddress)
s.bob.StartRetrieveMessagesLoop(time.Second, nil)
// Connect owner to storenode so message is stored
err := s.ownerWaku.DialPeer(storeNodeAddress)
s.Require().NoError(err)
}
func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityInfo() {
WaitForAvailableStoreNode(&s.Suite, s.owner, time.Second)
func (s *MessengerStoreNodeRequestSuite) createCommunity(m *Messenger) *communities.Community {
createCommunityRequest := &requests.CreateCommunity{
Name: "panda-lovers",
Description: "we love pandas",
Name: RandomLettersString(10),
Description: RandomLettersString(20),
Color: RandomColor(),
Tags: RandomCommunityTags(3),
Membership: protobuf.CommunityPermissions_AUTO_ACCEPT,
Color: "#ff0000",
Tags: []string{"Web3"},
}
response, err := s.owner.CreateCommunity(createCommunityRequest, true)
@ -124,20 +146,295 @@ func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityInfo() {
s.Require().NotNil(response)
s.Require().Len(response.Communities(), 1)
community := response.Communities()[0]
communityID := community.IDString()
return response.Communities()[0]
}
WaitForAvailableStoreNode(&s.Suite, s.bob, time.Second)
func (s *MessengerStoreNodeRequestSuite) requireCommunitiesEqual(fetchedCommunity *communities.Community, expectedCommunityInfo *communities.Community) {
s.Require().Equal(expectedCommunityInfo.Name(), fetchedCommunity.Name())
s.Require().Equal(expectedCommunityInfo.Identity().Description, fetchedCommunity.Identity().Description)
s.Require().Equal(expectedCommunityInfo.Color(), fetchedCommunity.Color())
s.Require().Equal(expectedCommunityInfo.Tags(), fetchedCommunity.Tags())
}
func (s *MessengerStoreNodeRequestSuite) fetchCommunity(m *Messenger, communityShard communities.CommunityShard, expectedCommunityInfo *communities.Community) FetchCommunityStats {
fetchedCommunity, stats, err := m.storeNodeRequestsManager.FetchCommunity(communityShard, true)
s.Require().NoError(err)
s.Require().NotNil(fetchedCommunity)
s.Require().Equal(communityShard.CommunityID, fetchedCommunity.IDString())
if expectedCommunityInfo != nil {
s.requireCommunitiesEqual(fetchedCommunity, expectedCommunityInfo)
}
return stats
}
func (s *MessengerStoreNodeRequestSuite) waitForAvailableStoreNode(messenger *Messenger) {
WaitForAvailableStoreNode(&s.Suite, messenger, storeNodeConnectTimeout)
}
func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityInfo() {
s.createOwner()
s.createBob()
s.waitForAvailableStoreNode(s.owner)
community := s.createCommunity(s.owner)
s.waitForAvailableStoreNode(s.bob)
s.fetchCommunity(s.bob, community.CommunityShard(), community)
}
func (s *MessengerStoreNodeRequestSuite) TestConsecutiveRequests() {
s.createOwner()
s.createBob()
s.waitForAvailableStoreNode(s.owner)
community := s.createCommunity(s.owner)
// Test consecutive requests to check that requests in manager are finalized
for i := 0; i < 2; i++ {
s.waitForAvailableStoreNode(s.bob)
s.fetchCommunity(s.bob, community.CommunityShard(), community)
}
}
func (s *MessengerStoreNodeRequestSuite) TestSimultaneousCommunityInfoRequests() {
s.createOwner()
s.createBob()
s.waitForAvailableStoreNode(s.owner)
community := s.createCommunity(s.owner)
storeNodeRequestsCount := 0
s.bob.storeNodeRequestsManager.onPerformingBatch = func(batch MailserverBatch) {
storeNodeRequestsCount++
}
s.waitForAvailableStoreNode(s.bob)
wg := sync.WaitGroup{}
// Make 2 simultaneous fetch requests, only 1 request to store node is expected
for i := 0; i < 2; i++ {
wg.Add(1)
go func() {
defer wg.Done()
s.fetchCommunity(s.bob, community.CommunityShard(), community)
}()
}
wg.Wait()
s.Require().Equal(1, storeNodeRequestsCount)
}
func (s *MessengerStoreNodeRequestSuite) TestRequestNonExistentCommunity() {
// On test start store node database is empty, so just request any valid community ID.
request := FetchCommunityRequest{
CommunityKey: communityID,
CommunityKey: "0x036dc11a0663f88e15912f0adb68c3c5f68ca0ca7a233f1a88ff923a3d39b2cf07",
Shard: nil,
TryDatabase: false,
WaitForResponse: true,
}
s.createBob()
s.waitForAvailableStoreNode(s.bob)
fetchedCommunity, err := s.bob.FetchCommunity(&request)
s.Require().NoError(err)
s.Require().Nil(fetchedCommunity)
}
func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityInfoWithStoreNodeDisconnected() {
s.createOwner()
s.createBob()
s.waitForAvailableStoreNode(s.owner)
community := s.createCommunity(s.owner)
// WaitForAvailableStoreNode is done internally
s.fetchCommunity(s.bob, community.CommunityShard(), community)
}
// This test is intended to only run locally to test how fast is a big community fetched
// Shouldn't be executed in CI, because it relies on connection to status.prod store nodes.
func (s *MessengerStoreNodeRequestSuite) TestRequestBigCommunity() {
if !runLocalTests {
return
}
// Status CC community
const communityID = "0x03073514d4c14a7d10ae9fc9b0f05abc904d84166a6ac80add58bf6a3542a4e50a"
communityShard := communities.CommunityShard{
CommunityID: communityID,
Shard: nil,
}
wakuLogger := s.logger.Named("user-waku-node")
messengerLogger := s.logger.Named("user-messenger")
wakuV2 := NewWakuV2(&s.Suite, wakuLogger, true, false)
userWaku := gethbridge.NewGethWakuV2Wrapper(wakuV2)
privateKey, err := crypto.GenerateKey()
s.Require().NoError(err)
mailserversSQLDb, err := helpers.SetupTestMemorySQLDB(appdatabase.DbInitializer{})
s.Require().NoError(err)
mailserversDatabase := mailserversDB.NewDB(mailserversSQLDb)
s.Require().NoError(err)
options := []Option{
WithMailserversDatabase(mailserversDatabase),
WithClusterConfig(params.ClusterConfig{
Fleet: params.FleetStatusProd,
}),
}
// Create bob without `createBob` without func to force status.prod fleet
s.bob, err = newMessengerWithKey(userWaku, privateKey, messengerLogger, options)
s.Require().NoError(err)
fetchedCommunity, stats, err := s.bob.storeNodeRequestsManager.FetchCommunity(communityShard, true)
s.Require().NoError(err)
s.Require().NotNil(fetchedCommunity)
s.Require().Equal(communityID, fetchedCommunity.IDString())
s.Require().Equal(initialStoreNodeRequestPageSize, stats.FetchedEnvelopesCount)
s.Require().Equal(1, stats.FetchedPagesCount)
}
func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityPagingAlgorithm() {
s.createOwner()
s.createBob()
// Create a community
s.waitForAvailableStoreNode(s.owner)
community := s.createCommunity(s.owner)
// Push spam to the same ContentTopic & PubsubTopic
// The first requested page size is 1. All subsequent pages are limited to 20.
// We want to test the algorithm, so we push 21 spam envelopes.
for i := 0; i < defaultStoreNodeRequestPageSize+initialStoreNodeRequestPageSize; i++ {
spamMessage := common.RawMessage{
Payload: RandomBytes(16),
Sender: community.PrivateKey(),
SkipEncryptionLayer: true,
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_DESCRIPTION,
PubsubTopic: community.PubsubTopic(),
}
_, err := s.owner.sender.SendPublic(context.Background(), community.IDString(), spamMessage)
s.Require().NoError(err)
}
// Fetch the community
stats := s.fetchCommunity(s.bob, community.CommunityShard(), community)
// Expect 3 pages and 23 (24 spam + 1 community description + 1 general channel description) envelopes to be fetched.
// First we fetch a more up-to-date, but an invalid spam message, fail to decrypt it as community description,
// then we fetch another page of data and successfully decrypt a community description.
s.Require().Equal(defaultStoreNodeRequestPageSize+initialStoreNodeRequestPageSize+2, stats.FetchedEnvelopesCount)
s.Require().Equal(3, stats.FetchedPagesCount)
}
func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityWithSameContentTopic() {
s.createOwner()
s.createBob()
// Create a community
s.waitForAvailableStoreNode(s.owner)
community1 := s.createCommunity(s.owner)
community2 := s.createCommunity(s.owner)
description2, err := community2.MarshaledDescription()
s.Require().NoError(err)
// Push community2 description to the same ContentTopic & PubsubTopic as community1.
// This way we simulate 2 communities with same ContentTopic.
spamMessage := common.RawMessage{
Payload: description2,
Sender: community2.PrivateKey(),
SkipEncryptionLayer: true,
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_DESCRIPTION,
PubsubTopic: community1.PubsubTopic(),
}
_, err = s.owner.sender.SendPublic(context.Background(), community1.IDString(), spamMessage)
s.Require().NoError(err)
// Fetch the community
s.fetchCommunity(s.bob, community1.CommunityShard(), community1)
}
func (s *MessengerStoreNodeRequestSuite) TestRequestMultipleCommunities() {
s.createOwner()
s.createBob()
// Create 2 communities
s.waitForAvailableStoreNode(s.owner)
community1 := s.createCommunity(s.owner)
community2 := s.createCommunity(s.owner)
fetchedCommunities := map[string]*communities.Community{}
err := WaitOnSignaledCommunityFound(s.bob,
func() {
err := s.bob.fetchCommunities([]communities.CommunityShard{
community1.CommunityShard(),
community2.CommunityShard(),
})
s.Require().NoError(err)
},
func(community *communities.Community) bool {
fetchedCommunities[community.IDString()] = community
return len(fetchedCommunities) == 2
},
1*time.Second,
"communities were not signalled in time",
)
s.Require().NoError(err)
s.Require().Contains(fetchedCommunities, community1.IDString())
s.Require().Contains(fetchedCommunities, community2.IDString())
}
func (s *MessengerStoreNodeRequestSuite) TestRequestWithoutWaitingResponse() {
s.createOwner()
s.createBob()
// Create a community
s.waitForAvailableStoreNode(s.owner)
community := s.createCommunity(s.owner)
request := FetchCommunityRequest{
CommunityKey: community.IDString(),
Shard: nil,
TryDatabase: false,
WaitForResponse: false,
}
fetchedCommunities := map[string]*communities.Community{}
err := WaitOnSignaledCommunityFound(s.bob,
func() {
fetchedCommunity, err := s.bob.FetchCommunity(&request)
s.Require().NoError(err)
s.Require().Nil(fetchedCommunity)
},
func(community *communities.Community) bool {
fetchedCommunities[community.IDString()] = community
return len(fetchedCommunities) == 1
},
1*time.Second,
"communities weren't signalled",
)
s.Require().NoError(err)
s.Require().Len(fetchedCommunities, 1)
s.Require().Contains(fetchedCommunities, community.IDString())
s.requireCommunitiesEqual(fetchedCommunities[community.IDString()], community)
}

View File

@ -2,12 +2,16 @@ package protocol
import (
"context"
"crypto/rand"
"database/sql"
"errors"
"math/big"
"os"
"sync"
"time"
"golang.org/x/exp/maps"
"go.uber.org/zap"
"github.com/stretchr/testify/suite"
@ -18,7 +22,9 @@ import (
gethbridge "github.com/status-im/status-go/eth-node/bridge/geth"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/common"
"github.com/status-im/status-go/protocol/communities"
"github.com/status-im/status-go/protocol/protobuf"
"github.com/status-im/status-go/protocol/requests"
"github.com/status-im/status-go/protocol/tt"
"github.com/status-im/status-go/t/helpers"
waku2 "github.com/status-im/status-go/wakuv2"
@ -26,6 +32,9 @@ import (
const testENRBootstrap = "enrtree://AL65EKLJAUXKKPG43HVTML5EFFWEZ7L4LOKTLZCLJASG4DSESQZEC@prod.status.nodes.status.im"
var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
var hexRunes = []rune("0123456789abcdef")
// WaitOnMessengerResponse Wait until the condition is true or the timeout is reached.
func WaitOnMessengerResponse(m *Messenger, condition func(*MessengerResponse) bool, errorMessage string) (*MessengerResponse, error) {
response := &MessengerResponse{}
@ -51,7 +60,8 @@ func WaitOnMessengerResponse(m *Messenger, condition func(*MessengerResponse) bo
type MessengerSignalsHandlerMock struct {
MessengerSignalsHandler
responseChan chan *MessengerResponse
responseChan chan *MessengerResponse
communityFoundChan chan *communities.Community
}
func (m *MessengerSignalsHandlerMock) MessengerResponse(response *MessengerResponse) {
@ -64,6 +74,13 @@ func (m *MessengerSignalsHandlerMock) MessengerResponse(response *MessengerRespo
func (m *MessengerSignalsHandlerMock) MessageDelivered(chatID string, messageID string) {}
func (m *MessengerSignalsHandlerMock) CommunityInfoFound(community *communities.Community) {
select {
case m.communityFoundChan <- community:
default:
}
}
func WaitOnSignaledMessengerResponse(m *Messenger, condition func(*MessengerResponse) bool, errorMessage string) (*MessengerResponse, error) {
interval := 500 * time.Millisecond
timeoutChan := time.After(10 * time.Second)
@ -103,6 +120,38 @@ func WaitOnSignaledMessengerResponse(m *Messenger, condition func(*MessengerResp
}
}
func WaitOnSignaledCommunityFound(m *Messenger, action func(), condition func(community *communities.Community) bool, timeout time.Duration, errorMessage string) error {
timeoutChan := time.After(timeout)
if m.config.messengerSignalsHandler != nil {
return errors.New("messengerSignalsHandler already provided/mocked")
}
communityFoundChan := make(chan *communities.Community, 1)
m.config.messengerSignalsHandler = &MessengerSignalsHandlerMock{
communityFoundChan: communityFoundChan,
}
defer func() {
m.config.messengerSignalsHandler = nil
}()
// Call the action after setting up the mock
action()
// Wait for condition after
for {
select {
case c := <-communityFoundChan:
if condition(c) {
return nil
}
case <-timeoutChan:
return errors.New("timed out: " + errorMessage)
}
}
}
func FindFirstByContentType(messages []*common.Message, contentType protobuf.ChatMessage_ContentType) *common.Message {
for _, message := range messages {
if message.ContentType == contentType {
@ -200,39 +249,8 @@ func SetIdentityImagesAndWaitForChange(s *suite.Suite, messenger *Messenger, tim
}
func WaitForAvailableStoreNode(s *suite.Suite, m *Messenger, timeout time.Duration) {
finish := make(chan struct{})
cancel := make(chan struct{})
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer func() {
wg.Done()
}()
for !m.isActiveMailserverAvailable() {
select {
case <-m.SubscribeMailserverAvailable():
case <-cancel:
return
}
}
}()
go func() {
defer func() {
close(finish)
}()
wg.Wait()
}()
select {
case <-finish:
case <-time.After(timeout):
close(cancel)
}
s.Require().True(m.isActiveMailserverAvailable())
available := m.waitForAvailableStoreNode(timeout)
s.Require().True(available)
}
func NewWakuV2(s *suite.Suite, logger *zap.Logger, useLocalWaku bool, enableStore bool) *waku2.Waku {
@ -333,3 +351,54 @@ func TearDownMessenger(s *suite.Suite, m *Messenger) {
s.Require().NoError(m.multiAccounts.Close())
}
}
func randomInt(length int) int {
max := big.NewInt(int64(length))
value, err := rand.Int(rand.Reader, max)
if err != nil {
panic(err)
}
return int(value.Int64())
}
func randomString(length int, runes []rune) string {
out := make([]rune, length)
for i := range out {
out[i] = runes[randomInt(len(runes))] // nolint: gosec
}
return string(out)
}
func RandomLettersString(length int) string {
return randomString(length, letterRunes)
}
func RandomColor() string {
return "#" + randomString(6, hexRunes)
}
func RandomCommunityTags(count int) []string {
all := maps.Keys(requests.TagsEmojies)
tags := make([]string, 0, count)
indexes := map[int]struct{}{}
for len(indexes) != count {
index := randomInt(len(all))
indexes[index] = struct{}{}
}
for index := range indexes {
tags = append(tags, all[index])
}
return tags
}
func RandomBytes(length int) []byte {
out := make([]byte, length)
_, err := rand.Read(out)
if err != nil {
panic(err)
}
return out
}

View File

@ -469,7 +469,7 @@ func (t *Transport) createMessagesRequestV1(
topics []types.TopicType,
waitForResponse bool,
) (cursor []byte, err error) {
r := createMessagesRequest(from, to, previousCursor, nil, "", topics)
r := createMessagesRequest(from, to, previousCursor, nil, "", topics, 1000)
events := make(chan types.EnvelopeEvent, 10)
sub := t.waku.SubscribeEnvelopeEvents(events)
@ -502,33 +502,37 @@ func (t *Transport) createMessagesRequestV2(
previousStoreCursor *types.StoreRequestCursor,
pubsubTopic string,
contentTopics []types.TopicType,
limit uint32,
waitForResponse bool,
) (storeCursor *types.StoreRequestCursor, err error) {
r := createMessagesRequest(from, to, nil, previousStoreCursor, pubsubTopic, contentTopics)
processEnvelopes bool,
) (storeCursor *types.StoreRequestCursor, envelopesCount int, err error) {
r := createMessagesRequest(from, to, nil, previousStoreCursor, pubsubTopic, contentTopics, limit)
if waitForResponse {
resultCh := make(chan struct {
storeCursor *types.StoreRequestCursor
err error
storeCursor *types.StoreRequestCursor
envelopesCount int
err error
})
go func() {
storeCursor, err = t.waku.RequestStoreMessages(ctx, peerID, r)
storeCursor, envelopesCount, err = t.waku.RequestStoreMessages(ctx, peerID, r, processEnvelopes)
resultCh <- struct {
storeCursor *types.StoreRequestCursor
err error
}{storeCursor, err}
storeCursor *types.StoreRequestCursor
envelopesCount int
err error
}{storeCursor, envelopesCount, err}
}()
select {
case result := <-resultCh:
return result.storeCursor, result.err
return result.storeCursor, result.envelopesCount, result.err
case <-ctx.Done():
return nil, ctx.Err()
return nil, 0, ctx.Err()
}
} else {
go func() {
_, err = t.waku.RequestStoreMessages(ctx, peerID, r)
_, _, err = t.waku.RequestStoreMessages(ctx, peerID, r, false)
if err != nil {
t.logger.Error("failed to request store messages", zap.Error(err))
}
@ -546,11 +550,13 @@ func (t *Transport) SendMessagesRequestForTopics(
previousStoreCursor *types.StoreRequestCursor,
pubsubTopic string,
contentTopics []types.TopicType,
limit uint32,
waitForResponse bool,
) (cursor []byte, storeCursor *types.StoreRequestCursor, err error) {
processEnvelopes bool,
) (cursor []byte, storeCursor *types.StoreRequestCursor, envelopesCount int, err error) {
switch t.waku.Version() {
case 2:
storeCursor, err = t.createMessagesRequestV2(ctx, peerID, from, to, previousStoreCursor, pubsubTopic, contentTopics, waitForResponse)
storeCursor, envelopesCount, err = t.createMessagesRequestV2(ctx, peerID, from, to, previousStoreCursor, pubsubTopic, contentTopics, limit, waitForResponse, processEnvelopes)
case 1:
cursor, err = t.createMessagesRequestV1(ctx, peerID, from, to, previousCursor, contentTopics, waitForResponse)
default:
@ -559,7 +565,7 @@ func (t *Transport) SendMessagesRequestForTopics(
return
}
func createMessagesRequest(from, to uint32, cursor []byte, storeCursor *types.StoreRequestCursor, pubsubTopic string, topics []types.TopicType) types.MessagesRequest {
func createMessagesRequest(from, to uint32, cursor []byte, storeCursor *types.StoreRequestCursor, pubsubTopic string, topics []types.TopicType, limit uint32) types.MessagesRequest {
aUUID := uuid.New()
// uuid is 16 bytes, converted to hex it's 32 bytes as expected by types.MessagesRequest
id := []byte(hex.EncodeToString(aUUID[:]))
@ -571,7 +577,7 @@ func createMessagesRequest(from, to uint32, cursor []byte, storeCursor *types.St
ID: id,
From: from,
To: to,
Limit: 1000,
Limit: limit,
Cursor: cursor,
PubsubTopic: pubsubTopic,
ContentTopics: topicBytes,

View File

@ -12,13 +12,16 @@ var registerOnce sync.Once
// MustCreateTestLogger returns a logger based on the passed flags.
func MustCreateTestLogger() *zap.Logger {
cfg := zap.NewDevelopmentConfig()
return MustCreateTestLoggerWithConfig(cfg)
}
func MustCreateTestLoggerWithConfig(cfg zap.Config) *zap.Logger {
registerOnce.Do(func() {
if err := zaputil.RegisterConsoleHexEncoder(); err != nil {
panic(err)
}
})
cfg := zap.NewDevelopmentConfig()
cfg.Encoding = "console-hex"
l, err := cfg.Build()
if err != nil {

View File

@ -669,7 +669,7 @@ func (w *Waku) subscribeToPubsubTopicWithWakuRelay(topic string, pubkey *ecdsa.P
}
return
case env := <-sub[0].Ch:
err := w.OnNewEnvelopes(env, common.RelayedMessageType)
err := w.OnNewEnvelopes(env, common.RelayedMessageType, false)
if err != nil {
w.logger.Error("OnNewEnvelopes error", zap.Error(err))
}
@ -1121,7 +1121,7 @@ func (w *Waku) query(ctx context.Context, peerID peer.ID, pubsubTopic string, to
return w.node.Store().Query(ctx, query, opts...)
}
func (w *Waku) Query(ctx context.Context, peerID peer.ID, pubsubTopic string, topics []common.TopicType, from uint64, to uint64, opts []store.HistoryRequestOption) (cursor *storepb.Index, err error) {
func (w *Waku) Query(ctx context.Context, peerID peer.ID, pubsubTopic string, topics []common.TopicType, from uint64, to uint64, opts []store.HistoryRequestOption, processEnvelopes bool) (cursor *storepb.Index, envelopesCount int, err error) {
requestID := protocol.GenerateRequestID()
opts = append(opts, store.WithRequestID(requestID))
pubsubTopic = w.getPubsubTopic(pubsubTopic)
@ -1131,9 +1131,11 @@ func (w *Waku) Query(ctx context.Context, peerID peer.ID, pubsubTopic string, to
if w.onHistoricMessagesRequestFailed != nil {
w.onHistoricMessagesRequestFailed(requestID, peerID, err)
}
return nil, err
return nil, 0, err
}
envelopesCount = len(result.Messages)
for _, msg := range result.Messages {
// Temporarily setting RateLimitProof to nil so it matches the WakuMessage protobuffer we are sending
// See https://github.com/vacp2p/rfc/issues/563
@ -1141,9 +1143,10 @@ func (w *Waku) Query(ctx context.Context, peerID peer.ID, pubsubTopic string, to
envelope := protocol.NewEnvelope(msg, msg.GetTimestamp(), pubsubTopic)
w.logger.Info("received waku2 store message", zap.Any("envelopeHash", hexutil.Encode(envelope.Hash())), zap.String("pubsubTopic", pubsubTopic))
err = w.OnNewEnvelopes(envelope, common.StoreMessageType)
err = w.OnNewEnvelopes(envelope, common.StoreMessageType, processEnvelopes)
if err != nil {
return nil, err
return nil, 0, err
}
}
@ -1256,7 +1259,7 @@ func (w *Waku) Start() error {
w.filterManager = newFilterManager(w.ctx, w.logger,
func(id string) *common.Filter { return w.GetFilter(id) },
w.settings,
func(env *protocol.Envelope) error { return w.OnNewEnvelopes(env, common.RelayedMessageType) },
func(env *protocol.Envelope) error { return w.OnNewEnvelopes(env, common.RelayedMessageType, false) },
w.node)
w.wg.Add(1)
@ -1270,7 +1273,7 @@ func (w *Waku) Start() error {
numCPU := runtime.NumCPU()
for i := 0; i < numCPU; i++ {
go w.processQueue()
go w.processQueueLoop()
}
go w.broadcast()
@ -1335,7 +1338,7 @@ func (w *Waku) Stop() error {
return nil
}
func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.MessageType) error {
func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.MessageType, processImmediately bool) error {
if envelope == nil {
return nil
}
@ -1358,7 +1361,7 @@ func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.Messag
logger.Debug("received new envelope")
trouble := false
_, err := w.add(recvMessage)
_, err := w.add(recvMessage, processImmediately)
if err != nil {
logger.Info("invalid envelope received", zap.Error(err))
trouble = true
@ -1382,7 +1385,7 @@ func (w *Waku) addEnvelope(envelope *common.ReceivedMessage) {
w.poolMu.Unlock()
}
func (w *Waku) add(recvMessage *common.ReceivedMessage) (bool, error) {
func (w *Waku) add(recvMessage *common.ReceivedMessage, processImmediately bool) (bool, error) {
common.EnvelopesReceivedCounter.Inc()
hash := recvMessage.Hash()
@ -1407,8 +1410,13 @@ func (w *Waku) add(recvMessage *common.ReceivedMessage) (bool, error) {
}
if !alreadyCached || !envelope.Processed.Load() {
logger.Debug("waku: posting event")
w.postEvent(recvMessage) // notify the local node about the new message
if processImmediately {
logger.Debug("immediately processing envelope")
w.processReceivedMessage(recvMessage)
} else {
logger.Debug("posting event")
w.postEvent(recvMessage) // notify the local node about the new message
}
}
return true, nil
@ -1419,49 +1427,54 @@ func (w *Waku) postEvent(envelope *common.ReceivedMessage) {
w.msgQueue <- envelope
}
// processQueue delivers the messages to the watchers during the lifetime of the waku node.
func (w *Waku) processQueue() {
// processQueueLoop delivers the messages to the watchers during the lifetime of the waku node.
func (w *Waku) processQueueLoop() {
for {
select {
case <-w.ctx.Done():
return
case e := <-w.msgQueue:
logger := w.logger.With(
zap.String("envelopeHash", hexutil.Encode(e.Envelope.Hash())),
zap.String("pubsubTopic", e.PubsubTopic),
zap.String("contentTopic", e.ContentTopic.ContentTopic()),
zap.Int64("timestamp", e.Envelope.Message().GetTimestamp()),
)
if e.MsgType == common.StoreMessageType {
// We need to insert it first, and then remove it if not matched,
// as messages are processed asynchronously
w.storeMsgIDsMu.Lock()
w.storeMsgIDs[e.Hash()] = true
w.storeMsgIDsMu.Unlock()
}
matched := w.filters.NotifyWatchers(e)
// If not matched we remove it
if !matched {
logger.Debug("filters did not match")
w.storeMsgIDsMu.Lock()
delete(w.storeMsgIDs, e.Hash())
w.storeMsgIDsMu.Unlock()
} else {
logger.Debug("filters did match")
e.Processed.Store(true)
}
w.envelopeFeed.Send(common.EnvelopeEvent{
Topic: e.ContentTopic,
Hash: e.Hash(),
Event: common.EventEnvelopeAvailable,
})
w.processReceivedMessage(e)
}
}
}
func (w *Waku) processReceivedMessage(e *common.ReceivedMessage) {
logger := w.logger.With(
zap.String("envelopeHash", hexutil.Encode(e.Envelope.Hash())),
zap.String("pubsubTopic", e.PubsubTopic),
zap.String("contentTopic", e.ContentTopic.ContentTopic()),
zap.Int64("timestamp", e.Envelope.Message().GetTimestamp()),
)
if e.MsgType == common.StoreMessageType {
// We need to insert it first, and then remove it if not matched,
// as messages are processed asynchronously
w.storeMsgIDsMu.Lock()
w.storeMsgIDs[e.Hash()] = true
w.storeMsgIDsMu.Unlock()
}
matched := w.filters.NotifyWatchers(e)
// If not matched we remove it
if !matched {
logger.Debug("filters did not match")
w.storeMsgIDsMu.Lock()
delete(w.storeMsgIDs, e.Hash())
w.storeMsgIDsMu.Unlock()
} else {
logger.Debug("filters did match")
e.Processed.Store(true)
}
w.envelopeFeed.Send(common.EnvelopeEvent{
Topic: e.ContentTopic,
Hash: e.Hash(),
Event: common.EventEnvelopeAvailable,
})
}
// Envelopes retrieves all the messages currently pooled by the node.
func (w *Waku) Envelopes() []*common.ReceivedMessage {
w.poolMu.RLock()