test: request community from storenode (#4364)

* feat: request community info from storenode test

* refactor: replace downloadHistoryArchiveTasksWaitGroup with a shared shutdownWaitGroup

* fix requestCommunityInfoFromMailserver timestamp rounding (see the sketch below)
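
The rounding fix changes the upper bound of the store-node query: instead of truncating the current time down to whole seconds, it rounds up, so messages stored during the in-progress second are still covered by the request. A minimal standalone sketch of the new calculation, assuming GetCurrentTimeInMillis returns a Unix timestamp in milliseconds (the helper name below is illustrative, not part of the diff):

package main

import (
	"fmt"
	"math"
)

// toSecondsRoundedUp mirrors the change in requestCommunityInfoFromMailserver:
// the "to" bound is rounded up instead of truncated when converting ms to s.
func toSecondsRoundedUp(ms uint64) uint32 {
	return uint32(math.Ceil(float64(ms) / 1000))
}

func main() {
	// 1699999999999 ms truncates to 1699999999 s; rounding up yields 1700000000 s.
	fmt.Println(toSecondsRoundedUp(1699999999999))
}
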
Igor Sirotin 2023-11-25 23:24:20 +00:00 committed by GitHub
parent 2d251e9a08
commit e32c5546e1
16 changed files with 444 additions and 245 deletions

View File

@ -1,8 +1,9 @@
package shard
import (
"github.com/status-im/status-go/protocol/protobuf"
wakuproto "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/status-im/status-go/protocol/protobuf"
)
type Shard struct {

View File

@ -5,8 +5,6 @@ import (
"crypto/ecdsa"
"encoding/json"
"errors"
"os"
"io/ioutil"
"sync"
"time"
@ -21,7 +19,6 @@ import (
"github.com/status-im/status-go/account"
"github.com/status-im/status-go/account/generator"
"github.com/status-im/status-go/appdatabase"
gethbridge "github.com/status-im/status-go/eth-node/bridge/geth"
"github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/multiaccounts"
@ -29,7 +26,6 @@ import (
"github.com/status-im/status-go/multiaccounts/settings"
"github.com/status-im/status-go/params"
"github.com/status-im/status-go/protocol/common"
"github.com/status-im/status-go/protocol/common/shard"
"github.com/status-im/status-go/protocol/communities"
"github.com/status-im/status-go/protocol/protobuf"
"github.com/status-im/status-go/protocol/requests"
@ -38,7 +34,6 @@ import (
walletToken "github.com/status-im/status-go/services/wallet/token"
"github.com/status-im/status-go/t/helpers"
"github.com/status-im/status-go/transactions"
waku "github.com/status-im/status-go/wakuv2"
"github.com/status-im/status-go/walletdatabase"
)
@ -146,83 +141,6 @@ func (c *CollectiblesServiceMock) DeploymentSignatureDigest(chainID uint64, addr
return gethcommon.Hex2Bytes("ccbb375343347491706cf4b43796f7b96ccc89c9e191a8b78679daeba1684ec7"), nil
}
func newWakuV2(s *suite.Suite, logger *zap.Logger, useLocalWaku bool) *waku.Waku {
config := &waku.Config{
DefaultShardPubsubTopic: shard.DefaultShardPubsubTopic(),
}
var onPeerStats func(connStatus types.ConnStatus)
var connStatusChan chan struct{}
if !useLocalWaku {
enrTreeAddress := testENRBootstrap
envEnrTreeAddress := os.Getenv("ENRTREE_ADDRESS")
if envEnrTreeAddress != "" {
enrTreeAddress = envEnrTreeAddress
}
config.EnableDiscV5 = true
config.DiscV5BootstrapNodes = []string{enrTreeAddress}
config.DiscoveryLimit = 20
config.WakuNodes = []string{enrTreeAddress}
connStatusChan = make(chan struct{})
terminator := sync.Once{}
onPeerStats = func(connStatus types.ConnStatus) {
if connStatus.IsOnline {
terminator.Do(func() {
connStatusChan <- struct{}{}
})
}
}
}
waku, err := waku.New("", "", config, logger, nil, nil, nil, onPeerStats)
s.Require().NoError(err)
s.Require().NoError(waku.Start())
if !useLocalWaku {
select {
case <-time.After(30 * time.Second):
s.Require().Fail("timeout elapsed")
case <-connStatusChan:
// proceed, peers found
close(connStatusChan)
}
}
return waku
}
func createWakuNetwork(s *suite.Suite, parentLogger *zap.Logger, nodeNames []string) []types.Waku {
nodes := make([]*waku.Waku, len(nodeNames))
for i, name := range nodeNames {
logger := parentLogger.With(zap.String("name", name+"-waku"))
node := newWakuV2(s, logger, true)
nodes[i] = node
}
// Setup local network graph
for i := 0; i < len(nodes); i++ {
for j := 0; j < len(nodes); j++ {
if i == j {
continue
}
addrs := nodes[j].ListenAddresses()
s.Require().Greater(len(addrs), 0)
_, err := nodes[i].AddRelayPeer(addrs[0])
s.Require().NoError(err)
err = nodes[i].DialPeer(addrs[0])
s.Require().NoError(err)
}
}
wrappers := make([]types.Waku, len(nodes))
for i, n := range nodes {
wrappers[i] = gethbridge.NewGethWakuV2Wrapper(n)
}
return wrappers
}
func newMessenger(s *suite.Suite, shh types.Waku, logger *zap.Logger, password string, walletAddresses []string,
mockedBalances *map[uint64]map[gethcommon.Address]map[gethcommon.Address]*hexutil.Big, collectiblesService communitytokens.ServiceInterface) *Messenger {
accountsManagerMock := &AccountManagerMock{}

View File

@ -26,7 +26,6 @@ import (
const testChainID1 = 1
const testENRBootstrap = "enrtree://AL65EKLJAUXKKPG43HVTML5EFFWEZ7L4LOKTLZCLJASG4DSESQZEC@prod.status.nodes.status.im"
const ownerPassword = "123456"
const alicePassword = "qwerty"
const bobPassword = "bob123"
@ -130,7 +129,7 @@ type MessengerCommunitiesTokenPermissionsSuite struct {
func (s *MessengerCommunitiesTokenPermissionsSuite) SetupTest() {
s.logger = tt.MustCreateTestLogger()
wakuNodes := createWakuNetwork(&s.Suite, s.logger, []string{"owner", "bob", "alice"})
wakuNodes := CreateWakuV2Network(&s.Suite, s.logger, []string{"owner", "bob", "alice"})
ownerLogger := s.logger.With(zap.String("name", "owner"))
s.ownerWaku = wakuNodes[0]

View File

@ -62,6 +62,7 @@ import (
"github.com/status-im/status-go/services/communitytokens"
ensservice "github.com/status-im/status-go/services/ens"
"github.com/status-im/status-go/services/ext/mailservers"
localnotifications "github.com/status-im/status-go/services/local-notifications"
mailserversDB "github.com/status-im/status-go/services/mailservers"
"github.com/status-im/status-go/services/wallet"
"github.com/status-im/status-go/services/wallet/token"
@ -135,16 +136,16 @@ type Messenger struct {
mailserverCycle mailserverCycle
database *sql.DB
multiAccounts *multiaccounts.Database
mailservers *mailserversDB.Database
settings *accounts.Database
account *multiaccounts.Account
mailserversDatabase *mailserversDB.Database
browserDatabase *browsers.Database
httpServer *server.MediaServer
quit chan struct{}
ctx context.Context
cancel context.CancelFunc
quit chan struct{}
ctx context.Context
cancel context.CancelFunc
shutdownWaitGroup sync.WaitGroup
importingCommunities map[string]bool
importingChannels map[string]bool
@ -160,13 +161,12 @@ type Messenger struct {
requestedContactsLock sync.RWMutex
requestedContacts map[string]*transport.Filter
connectionState connection.State
telemetryClient *telemetry.Client
contractMaker *contracts.ContractMaker
downloadHistoryArchiveTasksWaitGroup sync.WaitGroup
verificationDatabase *verification.Persistence
savedAddressesManager *wallet.SavedAddressesManager
walletAPI *wallet.API
connectionState connection.State
telemetryClient *telemetry.Client
contractMaker *contracts.ContractMaker
verificationDatabase *verification.Persistence
savedAddressesManager *wallet.SavedAddressesManager
walletAPI *wallet.API
// TODO(samyoul) Determine if/how the remaining usage of this mutex can be removed
mutex sync.Mutex
@ -198,6 +198,7 @@ type peerStatus struct {
}
type mailserverCycle struct {
sync.RWMutex
allMailservers []mailserversDB.Mailserver
activeMailserver *mailserversDB.Mailserver
peers map[string]peerStatus
events chan *p2p.PeerEvent
@ -480,8 +481,6 @@ func NewMessenger(
return nil, err
}
mailservers := mailserversDB.NewDB(database)
savedAddressesManager := wallet.NewSavedAddressesManager(c.walletDb)
selfContact, err := buildSelfContact(identity, settings, c.multiAccount, c.account)
@ -529,7 +528,6 @@ func NewMessenger(
settings: settings,
peerStore: peerStore,
verificationDatabase: verification.NewPersistence(database),
mailservers: mailservers,
mailserverCycle: mailserverCycle{
peers: make(map[string]peerStatus),
availabilitySubscriptions: make([]chan struct{}, 0),
@ -838,7 +836,7 @@ func (m *Messenger) Start() (*MessengerResponse, error) {
}
response.Mailservers = mailservers
err = m.StartMailserverCycle()
err = m.StartMailserverCycle(mailservers)
if err != nil {
return nil, err
}
@ -1444,13 +1442,7 @@ func (m *Messenger) handleENSVerified(records []*ens.VerificationRecord) {
}
}
m.logger.Info("calling on contacts")
if m.config.onContactENSVerified != nil {
m.logger.Info("called on contacts")
response := &MessengerResponse{Contacts: contacts}
m.config.onContactENSVerified(response)
}
m.PublishMessengerResponse(&MessengerResponse{Contacts: contacts})
}
func (m *Messenger) handleENSVerificationSubscription(c chan []*ens.VerificationRecord) {
@ -1892,7 +1884,7 @@ func (m *Messenger) Init() error {
func (m *Messenger) Shutdown() (err error) {
close(m.quit)
m.cancel()
m.downloadHistoryArchiveTasksWaitGroup.Wait()
m.shutdownWaitGroup.Wait()
for i, task := range m.shutdownTasks {
m.logger.Debug("running shutdown task", zap.Int("n", i))
if tErr := task(); tErr != nil {
@ -3279,6 +3271,44 @@ func (m *Messenger) RetrieveAll() (*MessengerResponse, error) {
return m.handleRetrievedMessages(chatWithMessages, true, false)
}
func (m *Messenger) StartRetrieveMessagesLoop(tick time.Duration, cancel <-chan struct{}) {
m.shutdownWaitGroup.Add(1)
go func() {
defer m.shutdownWaitGroup.Done()
ticker := time.NewTicker(tick)
defer ticker.Stop()
for {
select {
case <-ticker.C:
m.ProcessAllMessages()
case <-cancel:
return
}
}
}()
}
func (m *Messenger) ProcessAllMessages() {
response, err := m.RetrieveAll()
if err != nil {
m.logger.Error("failed to retrieve raw messages", zap.Error(err))
return
}
m.PublishMessengerResponse(response)
}
func (m *Messenger) PublishMessengerResponse(response *MessengerResponse) {
if response.IsEmpty() {
return
}
notifications := response.Notifications()
// Clear notifications as not used for now
response.ClearNotifications()
signal.SendNewMessages(response)
localnotifications.PushMessages(notifications)
}
func (m *Messenger) GetStats() types.StatsSummary {
return m.transport.GetStats()
}

View File

@ -5,6 +5,7 @@ import (
"crypto/ecdsa"
"errors"
"fmt"
"math"
"sync"
"time"
@ -2589,6 +2590,11 @@ func (m *Messenger) requestCommunityInfoFromMailserver(communityID string, shard
}
filter = filters[0]
m.requestedCommunities[communityID] = filter
m.logger.Debug("created filter for community",
zap.String("filterID", filter.FilterID),
zap.String("communityID", communityID),
zap.String("PubsubTopic", filter.PubsubTopic),
)
} else {
//we don't remember filter id associated with community because it was already installed
m.requestedCommunities[communityID] = nil
@ -2596,7 +2602,7 @@ func (m *Messenger) requestCommunityInfoFromMailserver(communityID string, shard
defer m.forgetCommunityRequest(communityID)
to := uint32(m.transport.GetCurrentTime() / 1000)
to := uint32(math.Ceil(float64(m.GetCurrentTimeInMillis()) / 1000))
from := to - oneMonthInSeconds
_, err := m.performMailserverRequest(func() (*MessengerResponse, error) {
@ -2624,6 +2630,9 @@ func (m *Messenger) requestCommunityInfoFromMailserver(communityID string, shard
return nil, nil
}
// TODO: We could force processing of all messages here, then we wouldn't have to wait?
//m.ProcessAllMessages()
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()

View File

@ -68,11 +68,6 @@ type MessengerSignalsHandler interface {
}
type config struct {
// This needs to be exposed until we move here mailserver logic
// as otherwise the client is not notified of a new filter and
// won't be pulling messages from mailservers until it reloads the chats/filters
onContactENSVerified func(*MessengerResponse)
// systemMessagesTranslations holds translations for system-messages
systemMessagesTranslations *systemMessageTranslationsMap
// Config for the envelopes monitor
@ -283,9 +278,8 @@ func WithSignalsHandler(h MessengerSignalsHandler) Option {
}
}
func WithENSVerificationConfig(onENSVerified func(*MessengerResponse), url, address string) Option {
func WithENSVerificationConfig(url, address string) Option {
return func(c *config) error {
c.onContactENSVerified = onENSVerified
c.verifyENSURL = url
c.verifyENSContractAddress = address
return nil

View File

@ -1333,8 +1333,8 @@ func (m *Messenger) HandleHistoryArchiveMagnetlinkMessage(state *ReceivedMessage
defer task.Waiter.Done()
// this wait group tracks all ongoing tasks across communities
m.downloadHistoryArchiveTasksWaitGroup.Add(1)
defer m.downloadHistoryArchiveTasksWaitGroup.Done()
m.shutdownWaitGroup.Add(1)
defer m.shutdownWaitGroup.Done()
m.downloadAndImportHistoryArchives(communityID, magnetlink, task.CancelChan)
}(currentTask, id)
@ -1646,8 +1646,8 @@ func (m *Messenger) HandleCommunityRequestToJoinResponse(state *ReceivedMessageS
task.Waiter.Add(1)
defer task.Waiter.Done()
m.downloadHistoryArchiveTasksWaitGroup.Add(1)
defer m.downloadHistoryArchiveTasksWaitGroup.Done()
m.shutdownWaitGroup.Add(1)
defer m.shutdownWaitGroup.Done()
m.downloadAndImportHistoryArchives(community.ID(), magnetlink, task.CancelChan)
}(currentTask)

View File

@ -3,6 +3,7 @@ package protocol
import (
"context"
"crypto/rand"
"fmt"
"math"
"math/big"
"sort"
@ -59,20 +60,34 @@ func (m *Messenger) activeMailserverID() ([]byte, error) {
return m.mailserverCycle.activeMailserver.IDBytes()
}
func (m *Messenger) StartMailserverCycle() error {
func (m *Messenger) StartMailserverCycle(mailservers []mailservers.Mailserver) error {
m.mailserverCycle.allMailservers = mailservers
if m.server == nil {
m.logger.Warn("not starting mailserver cycle")
return nil
version := m.transport.WakuVersion()
switch version {
case 1:
if m.server == nil {
m.logger.Warn("not starting mailserver cycle")
return nil
}
m.mailserverCycle.events = make(chan *p2p.PeerEvent, 20)
m.mailserverCycle.subscription = m.server.SubscribeEvents(m.mailserverCycle.events)
go m.updateWakuV1PeerStatus()
case 2:
go m.updateWakuV2PeerStatus()
default:
return fmt.Errorf("unsupported waku version: %d", version)
}
m.logger.Debug("started mailserver cycle")
m.logger.Debug("starting mailserver cycle",
zap.Uint("WakuVersion", m.transport.WakuVersion()),
zap.Any("mailservers", mailservers),
)
m.mailserverCycle.events = make(chan *p2p.PeerEvent, 20)
m.mailserverCycle.subscription = m.server.SubscribeEvents(m.mailserverCycle.events)
go m.updateWakuV1PeerStatus()
go m.updateWakuV2PeerStatus()
return nil
}
@ -169,27 +184,41 @@ func (m *Messenger) getFleet() (string, error) {
}
func (m *Messenger) allMailservers() ([]mailservers.Mailserver, error) {
// Append user mailservers
// Get configured fleet
fleet, err := m.getFleet()
if err != nil {
return nil, err
}
// Get default mailservers for given fleet
allMailservers := m.mailserversByFleet(fleet)
customMailservers, err := m.mailservers.Mailservers()
if err != nil {
return nil, err
}
// Add custom configured mailservers
if m.mailserversDatabase != nil {
customMailservers, err := m.mailserversDatabase.Mailservers()
if err != nil {
return nil, err
}
for _, c := range customMailservers {
if c.Fleet == fleet {
c.Version = m.transport.WakuVersion()
allMailservers = append(allMailservers, c)
for _, c := range customMailservers {
if c.Fleet == fleet {
c.Version = m.transport.WakuVersion()
allMailservers = append(allMailservers, c)
}
}
}
return allMailservers, nil
// Filter mailservers by configured waku version
wakuVersion := m.transport.WakuVersion()
matchingMailservers := make([]mailservers.Mailserver, 0, len(allMailservers))
for _, ms := range allMailservers {
if ms.Version == wakuVersion {
matchingMailservers = append(matchingMailservers, ms)
}
}
return matchingMailservers, nil
}
type SortedMailserver struct {
@ -208,24 +237,7 @@ func (m *Messenger) findNewMailserver() error {
return m.connectToMailserver(*pinnedMailserver)
}
// Append user mailservers
fleet, err := m.getFleet()
if err != nil {
return err
}
allMailservers := m.mailserversByFleet(fleet)
customMailservers, err := m.mailservers.Mailservers()
if err != nil {
return err
}
for _, c := range customMailservers {
if c.Fleet == fleet {
allMailservers = append(allMailservers, c)
}
}
allMailservers := m.mailserverCycle.allMailservers
m.logger.Info("Finding a new mailserver...")
@ -340,6 +352,10 @@ func (m *Messenger) connectToMailserver(ms mailservers.Mailserver) error {
return err
}
if ms.Version != m.transport.WakuVersion() {
return errors.New("mailserver waku version doesn't match")
}
if activeMailserverStatus != connected {
// Attempt to connect to mailserver by adding it as a peer
@ -442,16 +458,12 @@ func (m *Messenger) penalizeMailserver(id string) {
}
func (m *Messenger) handleMailserverCycleEvent(connectedPeers []ConnectedPeer) error {
m.logger.Debug("connected peers", zap.Any("connected", connectedPeers))
m.logger.Debug("peers info", zap.Any("peer-info", m.mailserverCycle.peers))
m.logger.Debug("mailserver cycle event",
zap.Any("connected", connectedPeers),
zap.Any("peer-info", m.mailserverCycle.peers))
m.mailPeersMutex.Lock()
allMailservers, err := m.allMailservers()
if err != nil {
return err
}
for pID, pInfo := range m.mailserverCycle.peers {
if pInfo.status == disconnected {
continue
@ -461,7 +473,7 @@ func (m *Messenger) handleMailserverCycleEvent(connectedPeers []ConnectedPeer) e
found := false
for _, connectedPeer := range connectedPeers {
id, err := m.mailserverAddressToID(connectedPeer.UniqueID, allMailservers)
id, err := m.mailserverAddressToID(connectedPeer.UniqueID, m.mailserverCycle.allMailservers)
if err != nil {
m.logger.Error("failed to convert id to hex", zap.Error(err))
return err
@ -487,7 +499,7 @@ func (m *Messenger) handleMailserverCycleEvent(connectedPeers []ConnectedPeer) e
// not available error
if m.mailserverCycle.activeMailserver != nil {
for _, connectedPeer := range connectedPeers {
id, err := m.mailserverAddressToID(connectedPeer.UniqueID, allMailservers)
id, err := m.mailserverAddressToID(connectedPeer.UniqueID, m.mailserverCycle.allMailservers)
if err != nil {
m.logger.Error("failed to convert id to hex", zap.Error(err))
return err
@ -562,12 +574,6 @@ func (m *Messenger) handleMailserverCycleEvent(connectedPeers []ConnectedPeer) e
}
func (m *Messenger) updateWakuV1PeerStatus() {
if m.transport.WakuVersion() != 1 {
m.logger.Debug("waku version not 1, returning")
return
}
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
@ -609,11 +615,6 @@ func (m *Messenger) updateWakuV1PeerStatus() {
}
func (m *Messenger) updateWakuV2PeerStatus() {
if m.transport.WakuVersion() != 2 {
m.logger.Debug("waku version not 2, returning")
return
}
connSubscription, err := m.transport.SubscribeToConnStatusChanges()
if err != nil {
m.logger.Error("Could not subscribe to connection status changes", zap.Error(err))
@ -643,8 +644,6 @@ func (m *Messenger) updateWakuV2PeerStatus() {
}
case <-m.quit:
close(m.mailserverCycle.events)
m.mailserverCycle.subscription.Unsubscribe()
connSubscription.Unsubscribe()
return
}
@ -667,11 +666,6 @@ func (m *Messenger) getPinnedMailserver() (*mailservers.Mailserver, error) {
return nil, nil
}
customMailservers, err := m.mailservers.Mailservers()
if err != nil {
return nil, err
}
fleetMailservers := mailservers.DefaultMailservers()
for _, c := range fleetMailservers {
@ -680,10 +674,17 @@ func (m *Messenger) getPinnedMailserver() (*mailservers.Mailserver, error) {
}
}
for _, c := range customMailservers {
if c.Fleet == fleet && c.ID == pinnedMailserver {
c.Version = m.transport.WakuVersion()
return &c, nil
if m.mailserversDatabase != nil {
customMailservers, err := m.mailserversDatabase.Mailservers()
if err != nil {
return nil, err
}
for _, c := range customMailservers {
if c.Fleet == fleet && c.ID == pinnedMailserver {
c.Version = m.transport.WakuVersion()
return &c, nil
}
}
}

View File

@ -91,7 +91,7 @@ type MessengerMessagesTrackingSuite struct {
func (s *MessengerMessagesTrackingSuite) SetupTest() {
s.logger = tt.MustCreateTestLogger()
wakuNodes := createWakuNetwork(&s.Suite, s.logger, []string{"bob", "alice"})
wakuNodes := CreateWakuV2Network(&s.Suite, s.logger, []string{"bob", "alice"})
s.bobWaku = wakuNodes[0]
s.bob, s.bobInterceptor = s.newMessenger(s.bobWaku, s.logger.With(zap.String("name", "bob")))

View File

@ -0,0 +1,140 @@
package protocol
import (
"testing"
"time"
"github.com/stretchr/testify/suite"
"go.uber.org/zap"
"github.com/ethereum/go-ethereum/crypto"
"github.com/status-im/status-go/appdatabase"
gethbridge "github.com/status-im/status-go/eth-node/bridge/geth"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/params"
"github.com/status-im/status-go/protocol/protobuf"
"github.com/status-im/status-go/protocol/requests"
"github.com/status-im/status-go/protocol/tt"
"github.com/status-im/status-go/t/helpers"
mailserversDB "github.com/status-im/status-go/services/mailservers"
waku2 "github.com/status-im/status-go/wakuv2"
)
const (
localFleet = "local-test-fleet-1"
localMailserverID = "local-test-mailserver"
)
func TestMessengerStoreNodeRequestSuite(t *testing.T) {
suite.Run(t, new(MessengerStoreNodeRequestSuite))
}
type MessengerStoreNodeRequestSuite struct {
suite.Suite
owner *Messenger
bob *Messenger
wakuStoreNode *waku2.Waku
ownerWaku types.Waku
bobWaku types.Waku
logger *zap.Logger
}
func (s *MessengerStoreNodeRequestSuite) newMessenger(shh types.Waku, logger *zap.Logger, mailserverAddress string) *Messenger {
privateKey, err := crypto.GenerateKey()
s.Require().NoError(err)
mailserversSQLDb, err := helpers.SetupTestMemorySQLDB(appdatabase.DbInitializer{})
s.Require().NoError(err)
mailserversDatabase := mailserversDB.NewDB(mailserversSQLDb)
err = mailserversDatabase.Add(mailserversDB.Mailserver{
ID: localMailserverID,
Name: localMailserverID,
Address: mailserverAddress,
Fleet: localFleet,
})
s.Require().NoError(err)
options := []Option{
WithMailserversDatabase(mailserversDatabase),
WithClusterConfig(params.ClusterConfig{
Fleet: localFleet,
}),
}
messenger, err := newMessengerWithKey(shh, privateKey, logger, options)
s.Require().NoError(err)
return messenger
}
func (s *MessengerStoreNodeRequestSuite) SetupTest() {
s.logger = tt.MustCreateTestLogger()
// Create store node
storeNodeLogger := s.logger.With(zap.String("name", "store-node-waku"))
s.wakuStoreNode = NewWakuV2(&s.Suite, storeNodeLogger, true, true)
storeNodeListenAddresses := s.wakuStoreNode.ListenAddresses()
s.Require().LessOrEqual(1, len(storeNodeListenAddresses))
storeNodeAddress := storeNodeListenAddresses[0]
s.logger.Info("store node ready", zap.String("address", storeNodeAddress))
// Create community owner
ownerWakuLogger := s.logger.With(zap.String("name", "owner-waku"))
s.ownerWaku = gethbridge.NewGethWakuV2Wrapper(NewWakuV2(&s.Suite, ownerWakuLogger, true, false))
ownerLogger := s.logger.With(zap.String("name", "owner"))
s.owner = s.newMessenger(s.ownerWaku, ownerLogger, storeNodeAddress)
// Create an independent user
bobWakuLogger := s.logger.With(zap.String("name", "bob-waku"))
s.bobWaku = gethbridge.NewGethWakuV2Wrapper(NewWakuV2(&s.Suite, bobWakuLogger, true, false))
bobLogger := s.logger.With(zap.String("name", "bob"))
s.bob = s.newMessenger(s.bobWaku, bobLogger, storeNodeAddress)
s.bob.StartRetrieveMessagesLoop(time.Second, nil)
}
func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityInfo() {
WaitForAvailableStoreNode(&s.Suite, s.owner, time.Second)
createCommunityRequest := &requests.CreateCommunity{
Name: "panda-lovers",
Description: "we love pandas",
Membership: protobuf.CommunityPermissions_AUTO_ACCEPT,
Color: "#ff0000",
Tags: []string{"Web3"},
}
response, err := s.owner.CreateCommunity(createCommunityRequest, true)
s.Require().NoError(err)
s.Require().NotNil(response)
s.Require().Len(response.Communities(), 1)
community := response.Communities()[0]
communityID := community.IDString()
WaitForAvailableStoreNode(&s.Suite, s.bob, time.Second)
request := FetchCommunityRequest{
CommunityKey: communityID,
Shard: nil,
TryDatabase: false,
WaitForResponse: true,
}
fetchedCommunity, err := s.bob.FetchCommunity(&request)
s.Require().NoError(err)
s.Require().NotNil(fetchedCommunity)
s.Require().Equal(communityID, fetchedCommunity.IDString())
}

View File

@ -10,8 +10,6 @@ import (
"github.com/status-im/status-go/protocol/common"
"github.com/status-im/status-go/protocol/encryption/multidevice"
"github.com/status-im/status-go/protocol/protobuf"
localnotifications "github.com/status-im/status-go/services/local-notifications"
"github.com/status-im/status-go/signal"
)
type RawMessageHandler func(ctx context.Context, rawMessage common.RawMessage) (common.RawMessage, error)
@ -313,17 +311,6 @@ func (m *Messenger) HandleSyncRawMessages(rawMessages []*protobuf.RawMessage) er
if err != nil {
return err
}
publishMessengerResponse(response)
m.PublishMessengerResponse(response)
return nil
}
// this is a copy implementation of the one in ext/service.go, we should refactor this?
func publishMessengerResponse(response *MessengerResponse) {
if !response.IsEmpty() {
notifications := response.Notifications()
// Clear notifications as not used for now
response.ClearNotifications()
signal.SendNewMessages(response)
localnotifications.PushMessages(notifications)
}
}

View File

@ -2,17 +2,30 @@ package protocol
import (
"context"
"database/sql"
"errors"
"os"
"sync"
"time"
"go.uber.org/zap"
"github.com/stretchr/testify/suite"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/status-im/status-go/appdatabase"
gethbridge "github.com/status-im/status-go/eth-node/bridge/geth"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/common"
"github.com/status-im/status-go/protocol/protobuf"
"github.com/status-im/status-go/protocol/tt"
"github.com/status-im/status-go/t/helpers"
waku2 "github.com/status-im/status-go/wakuv2"
)
const testENRBootstrap = "enrtree://AL65EKLJAUXKKPG43HVTML5EFFWEZ7L4LOKTLZCLJASG4DSESQZEC@prod.status.nodes.status.im"
// WaitOnMessengerResponse Wait until the condition is true or the timeout is reached.
func WaitOnMessengerResponse(m *Messenger, condition func(*MessengerResponse) bool, errorMessage string) (*MessengerResponse, error) {
response := &MessengerResponse{}
@ -185,3 +198,128 @@ func SetIdentityImagesAndWaitForChange(s *suite.Suite, messenger *Messenger, tim
s.Require().True(ok)
}
func WaitForAvailableStoreNode(s *suite.Suite, m *Messenger, timeout time.Duration) {
finish := make(chan struct{})
cancel := make(chan struct{})
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer func() {
wg.Done()
}()
for !m.isActiveMailserverAvailable() {
select {
case <-m.SubscribeMailserverAvailable():
case <-cancel:
return
}
}
}()
go func() {
defer func() {
close(finish)
}()
wg.Wait()
}()
select {
case <-finish:
case <-time.After(timeout):
close(cancel)
}
s.Require().True(m.isActiveMailserverAvailable())
}
func NewWakuV2(s *suite.Suite, logger *zap.Logger, useLocalWaku bool, enableStore bool) *waku2.Waku {
wakuConfig := &waku2.Config{
DefaultShardPubsubTopic: relay.DefaultWakuTopic, // shard.DefaultShardPubsubTopic(),
}
var onPeerStats func(connStatus types.ConnStatus)
var connStatusChan chan struct{}
var db *sql.DB
if !useLocalWaku {
enrTreeAddress := testENRBootstrap
envEnrTreeAddress := os.Getenv("ENRTREE_ADDRESS")
if envEnrTreeAddress != "" {
enrTreeAddress = envEnrTreeAddress
}
wakuConfig.EnableDiscV5 = true
wakuConfig.DiscV5BootstrapNodes = []string{enrTreeAddress}
wakuConfig.DiscoveryLimit = 20
wakuConfig.WakuNodes = []string{enrTreeAddress}
connStatusChan = make(chan struct{})
terminator := sync.Once{}
onPeerStats = func(connStatus types.ConnStatus) {
if connStatus.IsOnline {
terminator.Do(func() {
connStatusChan <- struct{}{}
})
}
}
}
if enableStore {
var err error
db, err = helpers.SetupTestMemorySQLDB(appdatabase.DbInitializer{})
s.Require().NoError(err)
wakuConfig.EnableStore = true
wakuConfig.StoreCapacity = 200
wakuConfig.StoreSeconds = 200
}
wakuNode, err := waku2.New("", "", wakuConfig, logger, db, nil, nil, onPeerStats)
s.Require().NoError(err)
s.Require().NoError(wakuNode.Start())
if !useLocalWaku {
select {
case <-time.After(30 * time.Second):
s.Require().Fail("timeout elapsed")
case <-connStatusChan:
// proceed, peers found
close(connStatusChan)
}
}
return wakuNode
}
func CreateWakuV2Network(s *suite.Suite, parentLogger *zap.Logger, nodeNames []string) []types.Waku {
nodes := make([]*waku2.Waku, len(nodeNames))
for i, name := range nodeNames {
logger := parentLogger.With(zap.String("name", name+"-waku"))
wakuNode := NewWakuV2(s, logger, true, false)
nodes[i] = wakuNode
}
// Setup local network graph
for i := 0; i < len(nodes); i++ {
for j := 0; j < len(nodes); j++ {
if i == j {
continue
}
addrs := nodes[j].ListenAddresses()
s.Require().Greater(len(addrs), 0)
_, err := nodes[i].AddRelayPeer(addrs[0])
s.Require().NoError(err)
err = nodes[i].DialPeer(addrs[0])
s.Require().NoError(err)
}
}
wrappers := make([]types.Waku, len(nodes))
for i, n := range nodes {
wrappers[i] = gethbridge.NewGethWakuV2Wrapper(n)
}
return wrappers
}

View File

@ -562,7 +562,12 @@ func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string) (*Filter,
f.filters[chatID] = chat
f.logger.Debug("registering filter for", zap.String("chatID", chatID), zap.String("type", "public"), zap.String("topic", filterAndTopic.Topic.String()))
f.logger.Debug("registering filter for",
zap.String("chatID", chatID),
zap.String("type", "public"),
zap.String("ContentTopic", filterAndTopic.Topic.String()),
zap.String("PubsubTopic", pubsubTopic),
)
return chat, nil
}

View File

@ -51,7 +51,6 @@ import (
"github.com/status-im/status-go/services/browsers"
"github.com/status-im/status-go/services/communitytokens"
"github.com/status-im/status-go/services/ext/mailservers"
localnotifications "github.com/status-im/status-go/services/local-notifications"
mailserversDB "github.com/status-im/status-go/services/mailservers"
"github.com/status-im/status-go/services/wallet"
w_common "github.com/status-im/status-go/services/wallet/common"
@ -194,7 +193,7 @@ func (s *Service) StartMessenger() (*protocol.MessengerResponse, error) {
if err != nil {
return nil, err
}
go s.retrieveMessagesLoop(time.Second, s.cancelMessenger)
s.messenger.StartRetrieveMessagesLoop(time.Second, s.cancelMessenger)
go s.verifyTransactionLoop(30*time.Second, s.cancelMessenger)
if s.config.ShhextConfig.BandwidthStatsEnabled {
@ -204,39 +203,6 @@ func (s *Service) StartMessenger() (*protocol.MessengerResponse, error) {
return response, nil
}
func publishMessengerResponse(response *protocol.MessengerResponse) {
if !response.IsEmpty() {
notifications := response.Notifications()
// Clear notifications as not used for now
response.ClearNotifications()
PublisherSignalHandler{}.NewMessages(response)
localnotifications.PushMessages(notifications)
}
}
func (s *Service) retrieveMessagesLoop(tick time.Duration, cancel <-chan struct{}) {
ticker := time.NewTicker(tick)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// We might be shutting down here
if s.messenger == nil {
return
}
response, err := s.messenger.RetrieveAll()
if err != nil {
log.Error("failed to retrieve raw messages", "err", err)
continue
}
publishMessengerResponse(response)
case <-cancel:
return
}
}
}
func (s *Service) retrieveStats(tick time.Duration, cancel <-chan struct{}) {
ticker := time.NewTicker(tick)
defer ticker.Stop()
@ -346,7 +312,7 @@ func (s *Service) verifyTransactionLoop(tick time.Duration, cancel <-chan struct
log.Error("failed to validate transactions", "err", err)
continue
}
publishMessengerResponse(response)
s.messenger.PublishMessengerResponse(response)
case <-cancel:
cancelVerifyTransaction()
@ -436,7 +402,7 @@ func buildMessengerOptions(
protocol.WithBrowserDatabase(browsers.NewDB(appDb)),
protocol.WithEnvelopesMonitorConfig(envelopesMonitorConfig),
protocol.WithSignalsHandler(messengerSignalsHandler),
protocol.WithENSVerificationConfig(publishMessengerResponse, config.ShhextConfig.VerifyENSURL, config.ShhextConfig.VerifyENSContractAddress),
protocol.WithENSVerificationConfig(config.ShhextConfig.VerifyENSURL, config.ShhextConfig.VerifyENSContractAddress),
protocol.WithClusterConfig(config.ClusterConfig),
protocol.WithTorrentConfig(&config.TorrentConfig),
protocol.WithHTTPServer(httpServer),

View File

@ -19,8 +19,9 @@
package wakuv2
import (
"github.com/status-im/status-go/wakuv2/common"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/status-im/status-go/wakuv2/common"
)
// Config represents the configuration state of a waku node.

View File

@ -1324,7 +1324,12 @@ func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.Messag
w.statusTelemetryClient.PushReceivedEnvelope(envelope)
}
logger := w.logger.With(zap.String("hash", recvMessage.Hash().Hex()), zap.String("envelopeHash", hexutil.Encode(envelope.Hash())), zap.String("contentTopic", envelope.Message().ContentTopic), zap.Int64("timestamp", envelope.Message().Timestamp))
logger := w.logger.With(
zap.String("envelopeHash", hexutil.Encode(envelope.Hash())),
zap.String("contentTopic", envelope.Message().ContentTopic),
zap.Int64("timestamp", envelope.Message().Timestamp),
)
logger.Debug("received new envelope")
trouble := false
@ -1396,7 +1401,12 @@ func (w *Waku) processQueue() {
case <-w.ctx.Done():
return
case e := <-w.msgQueue:
logger := w.logger.With(zap.String("hash", e.Hash().String()), zap.String("contentTopic", e.ContentTopic.ContentTopic()), zap.Int64("timestamp", e.Envelope.Message().Timestamp))
logger := w.logger.With(
zap.String("envelopeHash", hexutil.Encode(e.Envelope.Hash())),
zap.String("pubsubTopic", e.PubsubTopic),
zap.String("contentTopic", e.ContentTopic.ContentTopic()),
zap.Int64("timestamp", e.Envelope.Message().Timestamp),
)
if e.MsgType == common.StoreMessageType {
// We need to insert it first, and then remove it if not matched,
// as messages are processed asynchronously