From 05e3a35bf7e6068e9d60b0567b3afd7204f4fd4f Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Wed, 21 Aug 2024 14:29:13 -0400 Subject: [PATCH] refactor_: storenode cycle and allow ENRs and multiaddresses --- cmd/node-canary/main.go | 131 +------ eth-node/bridge/geth/waku.go | 4 + eth-node/bridge/geth/wakuv2.go | 4 + eth-node/types/waku.go | 3 + protocol/messenger.go | 22 +- protocol/messenger_mailserver.go | 59 +-- protocol/messenger_mailserver_cycle.go | 347 ++++++++---------- ..._mailserver_processMailserverBatch_test.go | 33 +- protocol/messenger_peers.go | 3 +- protocol/messenger_storenode_comunity_test.go | 5 +- protocol/messenger_storenode_request_test.go | 2 +- protocol/messenger_testing_utils.go | 1 + protocol/storenodes/models.go | 1 + protocol/storenodes/storenodes.go | 4 +- protocol/storenodes/storenodes_test.go | 4 +- protocol/transport/transport.go | 10 +- services/ext/api.go | 2 +- services/mailservers/database.go | 3 + services/mailservers/fleet.go | 10 +- services/mailservers/tcp_ping.go | 152 -------- services/wakuext/api.go | 46 --- services/wakuext/api_test.go | 106 ------ waku/waku_version_test.go | 14 - wakuv2/waku.go | 14 + 24 files changed, 276 insertions(+), 704 deletions(-) delete mode 100644 services/mailservers/tcp_ping.go diff --git a/cmd/node-canary/main.go b/cmd/node-canary/main.go index 7d69422f5..70620c0ca 100644 --- a/cmd/node-canary/main.go +++ b/cmd/node-canary/main.go @@ -2,16 +2,13 @@ package main import ( - "errors" "flag" - "fmt" stdlog "log" "os" "path" "path/filepath" "time" - "golang.org/x/crypto/sha3" "golang.org/x/crypto/ssh/terminal" "github.com/ethereum/go-ethereum/log" @@ -19,32 +16,22 @@ import ( "github.com/ethereum/go-ethereum/p2p/enode" "github.com/status-im/status-go/api" - "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/logutils" "github.com/status-im/status-go/params" - "github.com/status-im/status-go/rpc" "github.com/status-im/status-go/t/helpers" ) -const ( - mailboxPassword = "status-offline-inbox" -) - // All general log messages in this package should be routed through this logger. var logger = log.New("package", "status-go/cmd/node-canary") var ( - staticEnodeAddr = flag.String("staticnode", "", "checks if static node talks waku protocol (e.g. enode://abc123@1.2.3.4:30303)") - mailserverEnodeAddr = flag.String("mailserver", "", "queries mail server for historic messages (e.g. enode://123abc@4.3.2.1:30504)") - publicChannel = flag.String("channel", "status", "The public channel name to retrieve historic messages from (used with 'mailserver' flag)") - timeout = flag.Int("timeout", 10, "Timeout when connecting to node or fetching messages from mailserver, in seconds") - period = flag.Int("period", 24*60*60, "How far in the past to request messages from mailserver, in seconds") - minPow = flag.Float64("waku.pow", params.WakuMinimumPoW, "PoW for messages to be added to queue, in float format") - ttl = flag.Int("waku.ttl", params.WakuTTL, "Time to live for messages, in seconds") - homePath = flag.String("home-dir", ".", "Home directory where state is stored") - logLevel = flag.String("log", "INFO", `Log level, one of: "ERROR", "WARN", "INFO", "DEBUG", and "TRACE"`) - logFile = flag.String("logfile", "", "Path to the log file") - logWithoutColors = flag.Bool("log-without-color", false, "Disables log colors") + staticEnodeAddr = flag.String("staticnode", "", "checks if static node talks waku protocol (e.g. enode://abc123@1.2.3.4:30303)") + minPow = flag.Float64("waku.pow", params.WakuMinimumPoW, "PoW for messages to be added to queue, in float format") + ttl = flag.Int("waku.ttl", params.WakuTTL, "Time to live for messages, in seconds") + homePath = flag.String("home-dir", ".", "Home directory where state is stored") + logLevel = flag.String("log", "INFO", `Log level, one of: "ERROR", "WARN", "INFO", "DEBUG", and "TRACE"`) + logFile = flag.String("logfile", "", "Path to the log file") + logWithoutColors = flag.Bool("log-without-color", false, "Disables log colors") ) func main() { @@ -180,107 +167,3 @@ func startClientNode() (*api.GethStatusBackend, error) { } return clientBackend, err } - -func joinPublicChat(w types.Waku, rpcClient *rpc.Client, name string) (string, types.TopicType, string, error) { - keyID, err := w.AddSymKeyFromPassword(name) - if err != nil { - return "", types.TopicType{}, "", err - } - - h := sha3.NewLegacyKeccak256() - _, err = h.Write([]byte(name)) - if err != nil { - return "", types.TopicType{}, "", err - } - fullTopic := h.Sum(nil) - topic := types.BytesToTopic(fullTopic) - - wakuAPI := w.PublicWakuAPI() - filterID, err := wakuAPI.NewMessageFilter(types.Criteria{SymKeyID: keyID, Topics: []types.TopicType{topic}}) - - return keyID, topic, filterID, err -} - -func waitForMailServerRequestSent(events chan types.EnvelopeEvent, requestID types.Hash, timeout time.Duration) error { - timeoutTimer := time.NewTimer(timeout) - for { - select { - case event := <-events: - if event.Hash == requestID && event.Event == types.EventMailServerRequestSent { - timeoutTimer.Stop() - return nil - } - case <-timeoutTimer.C: - return errors.New("timed out waiting for mailserver request sent") - } - } -} - -func waitForMailServerResponse(events chan types.EnvelopeEvent, requestID types.Hash, timeout time.Duration) (*types.MailServerResponse, error) { - timeoutTimer := time.NewTimer(timeout) - for { - select { - case event := <-events: - if event.Hash == requestID { - resp, err := decodeMailServerResponse(event) - if resp != nil || err != nil { - timeoutTimer.Stop() - return resp, err - } - } - case <-timeoutTimer.C: - return nil, errors.New("timed out waiting for mailserver response") - } - } -} - -func decodeMailServerResponse(event types.EnvelopeEvent) (*types.MailServerResponse, error) { - switch event.Event { - case types.EventMailServerRequestSent: - return nil, nil - case types.EventMailServerRequestCompleted: - resp, ok := event.Data.(*types.MailServerResponse) - if !ok { - return nil, errors.New("failed to convert event to a *MailServerResponse") - } - - return resp, nil - case types.EventMailServerRequestExpired: - return nil, errors.New("no messages available from mailserver") - default: - return nil, fmt.Errorf("unexpected event type: %v", event.Event) - } -} - -func waitForEnvelopeEvents(events chan types.EnvelopeEvent, hashes []string, event types.EventType) error { - check := make(map[string]struct{}) - for _, hash := range hashes { - check[hash] = struct{}{} - } - - timeout := time.NewTimer(time.Second * 5) - for { - select { - case e := <-events: - if e.Event == event { - delete(check, e.Hash.String()) - if len(check) == 0 { - timeout.Stop() - return nil - } - } - case <-timeout.C: - return fmt.Errorf("timed out while waiting for event on envelopes. event: %s", event) - } - } -} - -// helper for checking LastEnvelopeHash -func isEmptyEnvelope(hash types.Hash) bool { - for _, b := range hash { - if b != 0 { - return false - } - } - return true -} diff --git a/eth-node/bridge/geth/waku.go b/eth-node/bridge/geth/waku.go index b3060badd..83b872724 100644 --- a/eth-node/bridge/geth/waku.go +++ b/eth-node/bridge/geth/waku.go @@ -318,3 +318,7 @@ func (w *GethWakuWrapper) SetStorePeerID(peerID peer.ID) { func (w *GethWakuWrapper) PeerID() peer.ID { panic("not implemented") } + +func (w *GethWakuWrapper) PingPeer(context.Context, peer.ID) (time.Duration, error) { + return 0, errors.New("not available in WakuV1") +} diff --git a/eth-node/bridge/geth/wakuv2.go b/eth-node/bridge/geth/wakuv2.go index eb30b51e7..750790be4 100644 --- a/eth-node/bridge/geth/wakuv2.go +++ b/eth-node/bridge/geth/wakuv2.go @@ -342,3 +342,7 @@ func (w *gethWakuV2Wrapper) SetStorePeerID(peerID peer.ID) { func (w *gethWakuV2Wrapper) PeerID() peer.ID { return w.waku.PeerID() } + +func (w *gethWakuV2Wrapper) PingPeer(ctx context.Context, peerID peer.ID) (time.Duration, error) { + return w.waku.PingPeer(ctx, peerID) +} diff --git a/eth-node/types/waku.go b/eth-node/types/waku.go index bb41f536a..1900505f3 100644 --- a/eth-node/types/waku.go +++ b/eth-node/types/waku.go @@ -199,4 +199,7 @@ type Waku interface { // PeerID returns node's PeerID PeerID() peer.ID + + // PingPeer returns the reply time + PingPeer(ctx context.Context, peerID peer.ID) (time.Duration, error) } diff --git a/protocol/messenger.go b/protocol/messenger.go index 12ed26182..0ec7c88ab 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -25,7 +25,6 @@ import ( gethcommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/p2p" "github.com/status-im/status-go/account" @@ -139,7 +138,7 @@ type Messenger struct { allInstallations *installationMap modifiedInstallations *stringBoolMap installationID string - storenodeCycle storenodeCycle + mailserverCycle mailserverCycle communityStorenodes *storenodes.CommunityStorenodes database *sql.DB multiAccounts *multiaccounts.Database @@ -203,7 +202,6 @@ type connStatus int const ( disconnected connStatus = iota + 1 - connecting connected ) @@ -211,15 +209,13 @@ type peerStatus struct { status connStatus canConnectAfter time.Time lastConnectionAttempt time.Time - storenode mailserversDB.Mailserver + mailserver mailserversDB.Mailserver } -type storenodeCycle struct { +type mailserverCycle struct { sync.RWMutex - allStorenodes []mailserversDB.Mailserver - activeStorenode *mailserversDB.Mailserver + allMailservers []mailserversDB.Mailserver + activeMailserver *mailserversDB.Mailserver peers map[string]peerStatus - events chan *p2p.PeerEvent - subscription event.Subscription availabilitySubscriptions []chan struct{} } @@ -601,7 +597,7 @@ func NewMessenger( peerStore: peerStore, mvdsStatusChangeEvent: make(chan datasyncnode.PeerStatusChangeEvent, 5), verificationDatabase: verification.NewPersistence(database), - storenodeCycle: storenodeCycle{ + mailserverCycle: mailserverCycle{ peers: make(map[string]peerStatus), availabilitySubscriptions: make([]chan struct{}, 0), }, @@ -860,13 +856,13 @@ func (m *Messenger) Start() (*MessengerResponse, error) { } response := &MessengerResponse{} - storenodes, err := m.allStorenodes() + mailservers, err := m.allMailservers() if err != nil { return nil, err } - response.Mailservers = storenodes - err = m.StartStorenodeCycle(storenodes) + response.Mailservers = mailservers + err = m.StartMailserverCycle(mailservers) if err != nil { return nil, err } diff --git a/protocol/messenger_mailserver.go b/protocol/messenger_mailserver.go index db38a1672..7c423dcab 100644 --- a/protocol/messenger_mailserver.go +++ b/protocol/messenger_mailserver.go @@ -43,7 +43,7 @@ var ErrNoFiltersForChat = errors.New("no filter registered for given chat") func (m *Messenger) shouldSync() (bool, error) { // TODO (pablo) support community store node as well - if m.storenodeCycle.activeStorenode == nil || !m.Online() { + if m.mailserverCycle.activeMailserver == nil || !m.Online() { return false, nil } @@ -91,18 +91,18 @@ func (m *Messenger) scheduleSyncChat(chat *Chat) (bool, error) { func (m *Messenger) connectToNewMailserverAndWait() error { // Handle pinned mailservers - m.logger.Info("disconnecting storenode") + m.logger.Info("disconnecting mailserver") pinnedMailserver, err := m.getPinnedMailserver() if err != nil { - m.logger.Error("could not obtain the pinned storenode", zap.Error(err)) + m.logger.Error("could not obtain the pinned mailserver", zap.Error(err)) return err } // If pinned mailserver is not nil, no need to disconnect and wait for it to be available if pinnedMailserver == nil { - m.disconnectActiveStorenode(graylistBackoff) + m.disconnectActiveMailserver(graylistBackoff) } - return m.findNewStorenode() + return m.findNewMailserver() } func (m *Messenger) performMailserverRequest(ms *mailservers.Mailserver, fn func(mailServer mailservers.Mailserver) (*MessengerResponse, error)) (*MessengerResponse, error) { @@ -110,27 +110,27 @@ func (m *Messenger) performMailserverRequest(ms *mailservers.Mailserver, fn func return nil, errors.New("mailserver not available") } - m.storenodeCycle.RLock() - defer m.storenodeCycle.RUnlock() + m.mailserverCycle.RLock() + defer m.mailserverCycle.RUnlock() var tries uint = 0 for tries < mailserverMaxTries { if !m.communityStorenodes.IsCommunityStoreNode(ms.ID) && !m.isMailserverAvailable(ms.ID) { return nil, errors.New("storenode not available") } - m.logger.Info("trying performing store requests", zap.Uint("try", tries), zap.String("storenodeID", ms.ID)) + m.logger.Info("trying performing mailserver requests", zap.Uint("try", tries), zap.String("mailserverID", ms.ID)) // Peform request response, err := fn(*ms) // pass by value because we don't want the fn to modify the mailserver if err == nil { // Reset failed requests - m.logger.Debug("storenode request performed successfully", - zap.String("storenodeID", ms.ID)) + m.logger.Debug("mailserver request performed successfully", + zap.String("mailserverID", ms.ID)) ms.FailedRequests = 0 return response, nil } - m.logger.Error("failed to perform store request", - zap.String("storenodeID", ms.ID), + m.logger.Error("failed to perform mailserver request", + zap.String("mailserverID", ms.ID), zap.Uint("tries", tries), zap.Error(err), ) @@ -349,6 +349,10 @@ func (m *Messenger) RequestAllHistoricMessages(forceFetchingBackup, withRetries return nil, err } + if m.mailserversDatabase == nil { + return nil, nil + } + if forceFetchingBackup || !backupFetched { m.logger.Info("fetching backup") err := m.syncBackup() @@ -715,7 +719,7 @@ func (m *Messenger) calculateGapForChat(chat *Chat, from uint32) (*common.Messag type work struct { pubsubTopic string contentTopics []types.TopicType - storeCursor types.StoreRequestCursor + cursor types.StoreRequestCursor limit uint32 } @@ -730,7 +734,7 @@ type messageRequester interface { limit uint32, waitForResponse bool, processEnvelopes bool, - ) (storeCursor types.StoreRequestCursor, envelopesCount int, err error) + ) (cursor types.StoreRequestCursor, envelopesCount int, err error) } func processMailserverBatch( @@ -833,8 +837,7 @@ loop: }() queryCtx, queryCancel := context.WithTimeout(ctx, mailserverRequestTimeout) - storeCursor, envelopesCount, err := messageRequester.SendMessagesRequestForTopics(queryCtx, storenodeID, batch.From, batch.To, w.storeCursor, w.pubsubTopic, w.contentTopics, w.limit, true, processEnvelopes) - + cursor, envelopesCount, err := messageRequester.SendMessagesRequestForTopics(queryCtx, storenodeID, batch.From, batch.To, w.cursor, w.pubsubTopic, w.contentTopics, w.limit, true, processEnvelopes) queryCancel() if err != nil { @@ -856,7 +859,7 @@ loop: // Check the cursor after calling `shouldProcessNextPage`. // The app might use process the fetched envelopes in the callback for own needs. - if len(cursor) == 0 && storeCursor == nil { + if cursor == nil { return } @@ -866,7 +869,7 @@ loop: workCh <- work{ pubsubTopic: w.pubsubTopic, contentTopics: w.contentTopics, - storeCursor: storeCursor, + cursor: cursor, limit: nextPageLimit, } }(w) @@ -915,12 +918,12 @@ func (m *Messenger) processMailserverBatch(ms mailservers.Mailserver, batch Mail return nil } - storenodeID, err := ms.PeerID() + mailserverID, err := ms.PeerID() if err != nil { return err } - logger := m.logger.With(zap.String("storenode", ms.ID)) - return processMailserverBatch(m.ctx, m.transport, batch, storenodeID, logger, defaultStoreNodeRequestPageSize, nil, false) + logger := m.logger.With(zap.String("mailserverID", ms.ID)) + return processMailserverBatch(m.ctx, m.transport, batch, mailserverID, logger, defaultStoreNodeRequestPageSize, nil, false) } func (m *Messenger) processMailserverBatchWithOptions(ms mailservers.Mailserver, batch MailserverBatch, pageLimit uint32, shouldProcessNextPage func(int) (bool, uint32), processEnvelopes bool) error { @@ -932,12 +935,12 @@ func (m *Messenger) processMailserverBatchWithOptions(ms mailservers.Mailserver, return nil } - storenodeID, err := ms.PeerID() + mailserverID, err := ms.PeerID() if err != nil { return err } - logger := m.logger.With(zap.String("storenodeID", ms.ID)) - return processMailserverBatch(m.ctx, m.transport, batch, storenodeID, logger, pageLimit, shouldProcessNextPage, processEnvelopes) + logger := m.logger.With(zap.String("mailserverID", ms.ID)) + return processMailserverBatch(m.ctx, m.transport, batch, mailserverID, logger, pageLimit, shouldProcessNextPage, processEnvelopes) } type MailserverBatch struct { @@ -1089,15 +1092,15 @@ func (m *Messenger) LoadFilters(filters []*transport.Filter) ([]*transport.Filte } func (m *Messenger) ToggleUseMailservers(value bool) error { - m.storenodeCycle.Lock() - defer m.storenodeCycle.Unlock() + m.mailserverCycle.Lock() + defer m.mailserverCycle.Unlock() err := m.settings.SetUseMailservers(value) if err != nil { return err } - m.disconnectActiveStorenode(backoffByUserAction) + m.disconnectActiveMailserver(backoffByUserAction) if value { m.cycleMailservers() return nil @@ -1111,7 +1114,7 @@ func (m *Messenger) SetPinnedMailservers(mailservers map[string]string) error { return err } - m.disconnectActiveStorenode(backoffByUserAction) + m.disconnectActiveMailserver(backoffByUserAction) m.cycleMailservers() return nil } diff --git a/protocol/messenger_mailserver_cycle.go b/protocol/messenger_mailserver_cycle.go index d6bf97a08..0ab853a71 100644 --- a/protocol/messenger_mailserver_cycle.go +++ b/protocol/messenger_mailserver_cycle.go @@ -8,13 +8,14 @@ import ( "net" "runtime" "sort" - "strings" "sync" "time" "github.com/pkg/errors" "go.uber.org/zap" + "github.com/waku-org/go-waku/waku/v2/utils" + "github.com/status-im/status-go/params" "github.com/status-im/status-go/protocol/storenodes" "github.com/status-im/status-go/services/mailservers" @@ -25,15 +26,11 @@ const defaultBackoff = 10 * time.Second const graylistBackoff = 3 * time.Minute const backoffByUserAction = 0 const isAndroidEmulator = runtime.GOOS == "android" && runtime.GOARCH == "amd64" -const findNearestStorenode = !isAndroidEmulator +const findNearestMailServer = !isAndroidEmulator const overrideDNS = runtime.GOOS == "android" || runtime.GOOS == "ios" const bootstrapDNS = "8.8.8.8:53" -func (m *Messenger) storenodesByFleet(fleet string) []mailservers.Mailserver { - return mailservers.DefaultStorenodesByFleet(fleet) -} - -type byRTTMsAndCanConnectBefore []SortedStorenodes +type byRTTMsAndCanConnectBefore []SortedMailserver func (s byRTTMsAndCanConnectBefore) Len() int { return len(s) @@ -47,26 +44,32 @@ func (s byRTTMsAndCanConnectBefore) Less(i, j int) bool { // Slightly inaccurate as time sensitive sorting, but it does not matter so much now := time.Now() if s[i].CanConnectAfter.Before(now) && s[j].CanConnectAfter.Before(now) { - return s[i].RTTMs < s[j].RTTMs + return s[i].RTT < s[j].RTT } return s[i].CanConnectAfter.Before(s[j].CanConnectAfter) } -func (m *Messenger) StartStorenodeCycle(storenodes []mailservers.Mailserver) error { - m.storenodeCycle.allStorenodes = storenodes - - if len(storenodes) == 0 { - m.logger.Warn("not starting storenode cycle: empty storenode list") +func (m *Messenger) StartMailserverCycle(mailservers []mailservers.Mailserver) error { + if m.transport.WakuVersion() != 2 { + m.logger.Warn("not starting mailserver cycle: requires wakuv2") return nil } - for _, storenode := range storenodes { + + m.mailserverCycle.allMailservers = mailservers + + if len(mailservers) == 0 { + m.logger.Warn("not starting mailserver cycle: empty mailservers list") + return nil + } + + for _, storenode := range mailservers { peerInfo, err := storenode.PeerInfo() if err != nil { return err } - for _, addr := range peerInfo.Addrs { + for _, addr := range utils.EncapsulatePeerID(peerInfo.ID, peerInfo.Addrs...) { _, err := m.transport.AddStorePeer(addr) if err != nil { return err @@ -75,59 +78,59 @@ func (m *Messenger) StartStorenodeCycle(storenodes []mailservers.Mailserver) err } go m.verifyStorenodeStatus() - m.logger.Debug("starting storenode cycle", + m.logger.Debug("starting mailserver cycle", zap.Uint("WakuVersion", m.transport.WakuVersion()), - zap.Any("storenode", storenodes), + zap.Any("mailservers", mailservers), ) return nil } -func (m *Messenger) DisconnectActiveStorenode() { - m.storenodeCycle.Lock() - defer m.storenodeCycle.Unlock() - m.disconnectActiveStorenode(graylistBackoff) +func (m *Messenger) DisconnectActiveMailserver() { + m.mailserverCycle.Lock() + defer m.mailserverCycle.Unlock() + m.disconnectActiveMailserver(graylistBackoff) } -func (m *Messenger) disconnecStorenode(backoffDuration time.Duration) error { - if m.storenodeCycle.activeStorenode == nil { - m.logger.Info("no active storenode") +func (m *Messenger) disconnectMailserver(backoffDuration time.Duration) error { + if m.mailserverCycle.activeMailserver == nil { + m.logger.Info("no active mailserver") return nil } - m.logger.Info("disconnecting active storenode", zap.String("nodeID", m.storenodeCycle.activeStorenode.ID)) + m.logger.Info("disconnecting active mailserver", zap.String("nodeID", m.mailserverCycle.activeMailserver.ID)) m.mailPeersMutex.Lock() - pInfo, ok := m.storenodeCycle.peers[m.storenodeCycle.activeStorenode.ID] + pInfo, ok := m.mailserverCycle.peers[m.mailserverCycle.activeMailserver.ID] if ok { pInfo.status = disconnected pInfo.canConnectAfter = time.Now().Add(backoffDuration) - m.storenodeCycle.peers[m.storenodeCycle.activeStorenode.ID] = pInfo + m.mailserverCycle.peers[m.mailserverCycle.activeMailserver.ID] = pInfo } else { - m.storenodeCycle.peers[m.storenodeCycle.activeStorenode.ID] = peerStatus{ + m.mailserverCycle.peers[m.mailserverCycle.activeMailserver.ID] = peerStatus{ status: disconnected, - storenode: *m.storenodeCycle.activeStorenode, + mailserver: *m.mailserverCycle.activeMailserver, canConnectAfter: time.Now().Add(backoffDuration), } } m.mailPeersMutex.Unlock() - m.storenodeCycle.activeStorenode = nil + m.mailserverCycle.activeMailserver = nil return nil } -func (m *Messenger) disconnectActiveStorenode(backoffDuration time.Duration) { - err := m.disconnecStorenode(backoffDuration) +func (m *Messenger) disconnectActiveMailserver(backoffDuration time.Duration) { + err := m.disconnectMailserver(backoffDuration) if err != nil { - m.logger.Error("failed to disconnect storenode", zap.Error(err)) + m.logger.Error("failed to disconnect mailserver", zap.Error(err)) } signal.SendMailserverChanged(nil) } func (m *Messenger) cycleMailservers() { - m.logger.Info("Automatically switching storenode") + m.logger.Info("Automatically switching mailserver") - if m.storenodeCycle.activeStorenode != nil { - m.disconnectActiveStorenode(graylistBackoff) + if m.mailserverCycle.activeMailserver != nil { + m.disconnectActiveMailserver(graylistBackoff) } useMailserver, err := m.settings.CanUseMailservers() @@ -137,13 +140,13 @@ func (m *Messenger) cycleMailservers() { } if !useMailserver { - m.logger.Info("Skipping storenode search due to useMailserver being false") + m.logger.Info("Skipping mailserver search due to useMailserver being false") return } - err = m.findNewStorenode() + err = m.findNewMailserver() if err != nil { - m.logger.Error("Error getting new storenode", zap.Error(err)) + m.logger.Error("Error getting new mailserver", zap.Error(err)) } } @@ -162,22 +165,22 @@ func (m *Messenger) getFleet() (string, error) { } else if m.config.clusterConfig.Fleet != "" { fleet = m.config.clusterConfig.Fleet } else { - fleet = params.FleetProd + fleet = params.FleetStatusProd } return fleet, nil } -func (m *Messenger) allStorenodes() ([]mailservers.Mailserver, error) { +func (m *Messenger) allMailservers() ([]mailservers.Mailserver, error) { // Get configured fleet fleet, err := m.getFleet() if err != nil { return nil, err } - // Get default storenode for given fleet - allStorenodes := m.storenodesByFleet(fleet) + // Get default mailservers for given fleet + allMailservers := mailservers.DefaultMailserversByFleet(fleet) - // Add custom configured storenode + // Add custom configured mailservers if m.mailserversDatabase != nil { customMailservers, err := m.mailserversDatabase.Mailservers() if err != nil { @@ -186,22 +189,85 @@ func (m *Messenger) allStorenodes() ([]mailservers.Mailserver, error) { for _, c := range customMailservers { if c.Fleet == fleet { - allStorenodes = append(allStorenodes, c) + allMailservers = append(allMailservers, c) } } } - return allStorenodes, nil + return allMailservers, nil } -type SortedStorenodes struct { +type SortedMailserver struct { Mailserver mailservers.Mailserver - RTTMs int + RTT time.Duration CanConnectAfter time.Time } -func (m *Messenger) findNewStorenode() error { +func (m *Messenger) getAvailableMailserversSortedByRTT(allMailservers []mailservers.Mailserver) []mailservers.Mailserver { + // TODO: this can be replaced by peer selector once code is moved to go-waku api + availableMailservers := make(map[string]time.Duration) + availableMailserversMutex := sync.Mutex{} + availableMailserversWg := sync.WaitGroup{} + for _, mailserver := range allMailservers { + availableMailserversWg.Add(1) + go func(mailserver mailservers.Mailserver) { + defer availableMailserversWg.Done() + peerID, err := mailserver.PeerID() + if err != nil { + return + } + + ctx, cancel := context.WithTimeout(m.ctx, 4*time.Second) + defer cancel() + + rtt, err := m.transport.PingPeer(ctx, peerID) + if err == nil { // pinging mailservers might fail, but we don't care + availableMailserversMutex.Lock() + availableMailservers[mailserver.ID] = rtt + availableMailserversMutex.Unlock() + } + }(mailserver) + } + availableMailserversWg.Wait() + + if len(availableMailservers) == 0 { + m.logger.Warn("No mailservers available") // Do nothing... + return nil + } + + mailserversByID := make(map[string]mailservers.Mailserver) + for idx := range allMailservers { + mailserversByID[allMailservers[idx].ID] = allMailservers[idx] + } + var sortedMailservers []SortedMailserver + for mailserverID, rtt := range availableMailservers { + ms := mailserversByID[mailserverID] + sortedMailserver := SortedMailserver{ + Mailserver: ms, + RTT: rtt, + } + m.mailPeersMutex.Lock() + pInfo, ok := m.mailserverCycle.peers[ms.ID] + m.mailPeersMutex.Unlock() + if ok { + if time.Now().Before(pInfo.canConnectAfter) { + continue // We can't connect to this node yet + } + } + sortedMailservers = append(sortedMailservers, sortedMailserver) + } + sort.Sort(byRTTMsAndCanConnectBefore(sortedMailservers)) + + result := make([]mailservers.Mailserver, len(sortedMailservers)) + for i, s := range sortedMailservers { + result[i] = s.Mailserver + } + + return result +} + +func (m *Messenger) findNewMailserver() error { // we have to override DNS manually because of https://github.com/status-im/status-mobile/issues/19581 if overrideDNS { var dialer net.Dialer @@ -226,95 +292,22 @@ func (m *Messenger) findNewStorenode() error { return m.connectToMailserver(*pinnedMailserver) } - allStorenodes := m.storenodeCycle.allStorenodes + m.logger.Info("Finding a new mailserver...") + + allMailservers := m.mailserverCycle.allMailservers // TODO: remove this check once sockets are stable on x86_64 emulators - if findNearestStorenode { - m.logger.Info("Finding a new storenode...") - - if len(allStorenodes) == 0 { - m.logger.Warn("no storenodes available") // Do nothing... - return nil - - } - - pingResult, err := m.transport.PingPeer(m.ctx, allStorenodes, 500) - if err != nil { - // pinging storenodes might fail, but we don't care - m.logger.Warn("ping failed with", zap.Error(err)) - } - - var availableStorenodes []*mailservers.PingResult - for _, result := range pingResult { - if result.Err != nil { - m.logger.Info("connecting error", zap.String("err", *result.Err)) - continue // The results with error are ignored - } - availableStorenodes = append(availableStorenodes, result) - } - - if len(availableStorenodes) == 0 { - m.logger.Warn("No storenodes available") // Do nothing... - return nil - } - - mailserversByID := make(map[string]mailservers.Mailserver) - for idx := range allStorenodes { - mailserversByID[allStorenodes[idx].ID] = allStorenodes[idx] - } - var sortedMailservers []SortedStorenodes - for _, ping := range availableStorenodes { - ms := mailserversByID[ping.ID] - sortedMailserver := SortedStorenodes{ - Mailserver: ms, - RTTMs: *ping.RTTMs, - } - m.mailPeersMutex.Lock() - pInfo, ok := m.storenodeCycle.peers[ms.ID] - m.mailPeersMutex.Unlock() - if ok { - if time.Now().Before(pInfo.canConnectAfter) { - continue // We can't connect to this node yet - } - } - - sortedMailservers = append(sortedMailservers, sortedMailserver) - - } - sort.Sort(byRTTMsAndCanConnectBefore(sortedMailservers)) - - // Picks a random mailserver amongs the ones with the lowest latency - // The pool size is 1/4 of the mailservers were pinged successfully - pSize := poolSize(len(sortedMailservers) - 1) - if pSize <= 0 { - pSize = len(sortedMailservers) - if pSize <= 0 { - m.logger.Warn("No mailservers available") // Do nothing... - return nil - } - } - - r, err := rand.Int(rand.Reader, big.NewInt(int64(pSize))) - if err != nil { - return err - } - - msPing := sortedMailservers[r.Int64()] - ms := mailserversByID[msPing.Mailserver.ID] - m.logger.Info("connecting to mailserver", zap.String("address", ms.ID)) - return m.connectToMailserver(ms) + if findNearestMailServer { + allMailservers = m.getAvailableMailserversSortedByRTT(allMailservers) } - mailserversByID := make(map[string]mailservers.Mailserver) - for idx := range allStorenodes { - mailserversByID[allStorenodes[idx].ID] = allStorenodes[idx] - } - - pSize := poolSize(len(allStorenodes) - 1) + // Picks a random mailserver amongs the ones with the lowest latency + // The pool size is 1/4 of the mailservers were pinged successfully + pSize := poolSize(len(allMailservers) - 1) if pSize <= 0 { - pSize = len(allStorenodes) + pSize = len(allMailservers) if pSize <= 0 { - m.logger.Warn("No mailservers available") // Do nothing... + m.logger.Warn("No storenodes available") // Do nothing... return nil } } @@ -324,17 +317,14 @@ func (m *Messenger) findNewStorenode() error { return err } - msPing := allStorenodes[r.Int64()] - ms := mailserversByID[msPing.ID] - m.logger.Info("connecting to mailserver", zap.String("address", ms.ID)) - + ms := allMailservers[r.Int64()] return m.connectToMailserver(ms) } func (m *Messenger) mailserverStatus(mailserverID string) connStatus { m.mailPeersMutex.RLock() defer m.mailPeersMutex.RUnlock() - peer, ok := m.storenodeCycle.peers[mailserverID] + peer, ok := m.mailserverCycle.peers[mailserverID] if !ok { return disconnected } @@ -343,32 +333,32 @@ func (m *Messenger) mailserverStatus(mailserverID string) connStatus { func (m *Messenger) connectToMailserver(ms mailservers.Mailserver) error { - m.logger.Info("connecting to mailserver", zap.Any("peer", ms.ID)) + m.logger.Info("connecting to mailserver", zap.String("mailserverID", ms.ID)) - m.storenodeCycle.activeStorenode = &ms - signal.SendMailserverChanged(m.storenodeCycle.activeStorenode) + m.mailserverCycle.activeMailserver = &ms + signal.SendMailserverChanged(m.mailserverCycle.activeMailserver) - activeMailserverStatus := m.mailserverStatus(ms.ID) - if activeMailserverStatus != connected { + mailserverStatus := m.mailserverStatus(ms.ID) + if mailserverStatus != connected { m.mailPeersMutex.Lock() - m.storenodeCycle.peers[ms.ID] = peerStatus{ + m.mailserverCycle.peers[ms.ID] = peerStatus{ status: connected, lastConnectionAttempt: time.Now(), canConnectAfter: time.Now().Add(defaultBackoff), - storenode: ms, + mailserver: ms, } m.mailPeersMutex.Unlock() - m.storenodeCycle.activeStorenode.FailedRequests = 0 - peerID, err := m.storenodeCycle.activeStorenode.PeerID() + m.mailserverCycle.activeMailserver.FailedRequests = 0 + peerID, err := m.mailserverCycle.activeMailserver.PeerID() if err != nil { - m.logger.Error("could not decode the peer id of storenode", zap.Error(err)) + m.logger.Error("could not decode the peer id of mailserver", zap.Error(err)) return err } - m.logger.Info("storenode available", zap.String("storenodeID", m.storenodeCycle.activeStorenode.ID)) + m.logger.Info("mailserver available", zap.String("mailserverID", m.mailserverCycle.activeMailserver.ID)) m.EmitMailserverAvailable() - signal.SendMailserverAvailable(m.storenodeCycle.activeStorenode) + signal.SendMailserverAvailable(m.mailserverCycle.activeMailserver) m.transport.SetStorePeerID(peerID) @@ -382,15 +372,15 @@ func (m *Messenger) connectToMailserver(ms mailservers.Mailserver) error { // for that community if it has a mailserver setup otherwise it'll return the global mailserver func (m *Messenger) getActiveMailserver(communityID ...string) *mailservers.Mailserver { if len(communityID) == 0 || communityID[0] == "" { - return m.storenodeCycle.activeStorenode + return m.mailserverCycle.activeMailserver } - ms, err := m.communityStorenodes.GetStorenodeByCommunnityID(communityID[0]) + ms, err := m.communityStorenodes.GetStorenodeByCommunityID(communityID[0]) if err != nil { if !errors.Is(err, storenodes.ErrNotFound) { m.logger.Error("getting storenode for community, using global", zap.String("communityID", communityID[0]), zap.Error(err)) } // if we don't find a specific mailserver for the community, we just use the regular mailserverCycle's one - return m.storenodeCycle.activeStorenode + return m.mailserverCycle.activeMailserver } return &ms } @@ -407,43 +397,16 @@ func (m *Messenger) isMailserverAvailable(mailserverID string) bool { return m.mailserverStatus(mailserverID) == connected } -func mailserverAddressToID(uniqueID string, allStorenodes []mailservers.Mailserver) (string, error) { - for _, ms := range allStorenodes { - if uniqueID == ms.ID { - return ms.ID, nil - } - - } - - return "", nil -} - -type ConnectedPeer struct { - UniqueID string -} - -func (m *Messenger) mailserverPeersInfo() []ConnectedPeer { - var connectedPeers []ConnectedPeer - for _, connectedPeer := range m.server.PeersInfo() { - connectedPeers = append(connectedPeers, ConnectedPeer{ - // This is a bit fragile, but should work - UniqueID: strings.TrimSuffix(connectedPeer.Enode, "?discport=0"), - }) - } - - return connectedPeers -} - func (m *Messenger) penalizeMailserver(id string) { m.mailPeersMutex.Lock() defer m.mailPeersMutex.Unlock() - pInfo, ok := m.storenodeCycle.peers[id] + pInfo, ok := m.mailserverCycle.peers[id] if !ok { pInfo.status = disconnected } pInfo.canConnectAfter = time.Now().Add(graylistBackoff) - m.storenodeCycle.peers[id] = pInfo + m.mailserverCycle.peers[id] = pInfo } func (m *Messenger) asyncRequestAllHistoricMessages() { @@ -496,7 +459,7 @@ func (m *Messenger) getPinnedMailserver() (*mailservers.Mailserver, error) { return nil, nil } - fleetMailservers := mailservers.DefaultStorenodes() + fleetMailservers := mailservers.DefaultMailservers() for _, c := range fleetMailservers { if c.Fleet == fleet && c.ID == pinnedMailserver { @@ -521,35 +484,35 @@ func (m *Messenger) getPinnedMailserver() (*mailservers.Mailserver, error) { } func (m *Messenger) EmitMailserverAvailable() { - for _, s := range m.storenodeCycle.availabilitySubscriptions { + for _, s := range m.mailserverCycle.availabilitySubscriptions { s <- struct{}{} close(s) - l := len(m.storenodeCycle.availabilitySubscriptions) - m.storenodeCycle.availabilitySubscriptions = m.storenodeCycle.availabilitySubscriptions[:l-1] + l := len(m.mailserverCycle.availabilitySubscriptions) + m.mailserverCycle.availabilitySubscriptions = m.mailserverCycle.availabilitySubscriptions[:l-1] } } func (m *Messenger) SubscribeMailserverAvailable() chan struct{} { c := make(chan struct{}) - m.storenodeCycle.availabilitySubscriptions = append(m.storenodeCycle.availabilitySubscriptions, c) + m.mailserverCycle.availabilitySubscriptions = append(m.mailserverCycle.availabilitySubscriptions, c) return c } func (m *Messenger) disconnectStorenodeIfRequired() error { m.logger.Debug("wakuV2 storenode status verification") - if m.storenodeCycle.activeStorenode == nil { + if m.mailserverCycle.activeMailserver == nil { // No active storenode, find a new one m.cycleMailservers() return nil } // Check whether we want to disconnect the active storenode - if m.storenodeCycle.activeStorenode.FailedRequests >= mailserverMaxFailedRequests { - m.penalizeMailserver(m.storenodeCycle.activeStorenode.ID) + if m.mailserverCycle.activeMailserver.FailedRequests >= mailserverMaxFailedRequests { + m.penalizeMailserver(m.mailserverCycle.activeMailserver.ID) signal.SendMailserverNotWorking() - m.logger.Info("too many failed requests", zap.String("storenode", m.storenodeCycle.activeStorenode.ID)) - m.storenodeCycle.activeStorenode.FailedRequests = 0 + m.logger.Info("too many failed requests", zap.String("storenode", m.mailserverCycle.activeMailserver.ID)) + m.mailserverCycle.activeMailserver.FailedRequests = 0 return m.connectToNewMailserverAndWait() } @@ -557,7 +520,7 @@ func (m *Messenger) disconnectStorenodeIfRequired() error { } func (m *Messenger) waitForAvailableStoreNode(timeout time.Duration) bool { - // Add 1 second to timeout, because the storenode cycle has 1 second ticker, which doesn't tick on start. + // Add 1 second to timeout, because the mailserver cycle has 1 second ticker, which doesn't tick on start. // This can be improved after merging https://github.com/status-im/status-go/pull/4380. // NOTE: https://stackoverflow.com/questions/32705582/how-to-get-time-tick-to-tick-immediately timeout += time.Second diff --git a/protocol/messenger_mailserver_processMailserverBatch_test.go b/protocol/messenger_mailserver_processMailserverBatch_test.go index 389447cfb..71ea759a4 100644 --- a/protocol/messenger_mailserver_processMailserverBatch_test.go +++ b/protocol/messenger_mailserver_processMailserverBatch_test.go @@ -7,8 +7,10 @@ import ( "errors" "math/big" "testing" + "time" "github.com/google/uuid" + "github.com/libp2p/go-libp2p/core/peer" "github.com/stretchr/testify/require" "github.com/status-im/status-go/eth-node/types" @@ -37,21 +39,21 @@ func getInitialResponseKey(topics []types.TopicType) string { func (t *mockTransport) SendMessagesRequestForTopics( ctx context.Context, - peerID []byte, + peerID peer.ID, from, to uint32, - previousStoreCursor types.StoreRequestCursor, + prevCursor types.StoreRequestCursor, pubsubTopic string, contentTopics []types.TopicType, limit uint32, waitForResponse bool, processEnvelopes bool, -) (storeCursor types.StoreRequestCursor, envelopesCount int, err error) { +) (cursor types.StoreRequestCursor, envelopesCount int, err error) { var response queryResponse - if previousStoreCursor == nil { + if prevCursor == nil { initialResponse := getInitialResponseKey(contentTopics) response = t.queryResponses[initialResponse] } else { - response = t.queryResponses[hex.EncodeToString(previousStoreCursor)] + response = t.queryResponses[hex.EncodeToString(prevCursor)] } return response.cursor, 0, response.err } @@ -115,44 +117,51 @@ func (t *mockTransport) Populate(topics []types.TopicType, responses int, includ } func TestProcessMailserverBatchHappyPath(t *testing.T) { + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + defer cancel() + logger := tt.MustCreateTestLogger() - peerID := peer. - mailserverID := []byte{1, 2, 3, 4, 5} + mailserverID, err := peer.Decode("16Uiu2HAkw3x97MbbZSWHbdF5bob45vcZvPPK4s4Mjyv2mxyB9GS3") + require.NoError(t, err) topics := []types.TopicType{} for i := 0; i < 22; i++ { topics = append(topics, types.BytesToTopic([]byte{0, 0, 0, byte(i)})) } testTransport := newMockTransport() - err := testTransport.Populate(topics, 10, false) + err = testTransport.Populate(topics, 10, false) require.NoError(t, err) testBatch := MailserverBatch{ Topics: topics, } - err = processMailserverBatch(context.TODO(), testTransport, testBatch, mailserverID, logger, defaultStoreNodeRequestPageSize, nil, false) + err = processMailserverBatch(ctx, testTransport, testBatch, mailserverID, logger, defaultStoreNodeRequestPageSize, nil, false) require.NoError(t, err) } func TestProcessMailserverBatchFailure(t *testing.T) { + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + defer cancel() + logger := tt.MustCreateTestLogger() - mailserverID := []byte{1, 2, 3, 4, 5} + mailserverID, err := peer.Decode("16Uiu2HAkw3x97MbbZSWHbdF5bob45vcZvPPK4s4Mjyv2mxyB9GS3") + require.NoError(t, err) topics := []types.TopicType{} for i := 0; i < 5; i++ { topics = append(topics, types.BytesToTopic([]byte{0, 0, 0, byte(i)})) } testTransport := newMockTransport() - err := testTransport.Populate(topics, 4, true) + err = testTransport.Populate(topics, 4, true) require.NoError(t, err) testBatch := MailserverBatch{ Topics: topics, } - err = processMailserverBatch(context.TODO(), testTransport, testBatch, mailserverID, logger, defaultStoreNodeRequestPageSize, nil, false) + err = processMailserverBatch(ctx, testTransport, testBatch, mailserverID, logger, defaultStoreNodeRequestPageSize, nil, false) require.Error(t, err) } diff --git a/protocol/messenger_peers.go b/protocol/messenger_peers.go index 5e2724a07..b73e543a1 100644 --- a/protocol/messenger_peers.go +++ b/protocol/messenger_peers.go @@ -3,10 +3,11 @@ package protocol import ( "crypto/ecdsa" - "github.com/ethereum/go-ethereum/p2p/enode" "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/status-im/status-go/eth-node/types" ) diff --git a/protocol/messenger_storenode_comunity_test.go b/protocol/messenger_storenode_comunity_test.go index 1ba9ef69d..746951413 100644 --- a/protocol/messenger_storenode_comunity_test.go +++ b/protocol/messenger_storenode_comunity_test.go @@ -8,6 +8,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" + "github.com/status-im/status-go/protocol/storenodes" gethbridge "github.com/status-im/status-go/eth-node/bridge/geth" @@ -350,10 +351,10 @@ func (s *MessengerStoreNodeCommunitySuite) TestToggleUseMailservers() { // Enable use of mailservers err := s.owner.ToggleUseMailservers(true) s.Require().NoError(err) - s.Require().NotNil(s.owner.storenodeCycle.activeStorenode) + s.Require().NotNil(s.owner.mailserverCycle.activeMailserver) // Disable use of mailservers err = s.owner.ToggleUseMailservers(false) s.Require().NoError(err) - s.Require().Nil(s.owner.storenodeCycle.activeStorenode) + s.Require().Nil(s.owner.mailserverCycle.activeMailserver) } diff --git a/protocol/messenger_storenode_request_test.go b/protocol/messenger_storenode_request_test.go index fc8dde717..543e32fb3 100644 --- a/protocol/messenger_storenode_request_test.go +++ b/protocol/messenger_storenode_request_test.go @@ -1046,7 +1046,7 @@ func (s *MessengerStoreNodeRequestSuite) TestFetchRealCommunity() { } // Prepare things depending on the configuration - nodesList := mailserversDB.DefaultStorenodesByFleet(fleet) + nodesList := mailserversDB.DefaultMailserversByFleet(fleet) descriptionContentTopic := wakuV2common.BytesToTopic(transport.ToTopic(communityID)) shardContentTopic := wakuV2common.BytesToTopic(transport.ToTopic(transport.CommunityShardInfoTopic(communityID))) diff --git a/protocol/messenger_testing_utils.go b/protocol/messenger_testing_utils.go index e77967f72..906d04346 100644 --- a/protocol/messenger_testing_utils.go +++ b/protocol/messenger_testing_utils.go @@ -11,6 +11,7 @@ import ( "time" "github.com/libp2p/go-libp2p/core/peer" + "github.com/status-im/status-go/protocol/wakusync" "github.com/status-im/status-go/protocol/identity" diff --git a/protocol/storenodes/models.go b/protocol/storenodes/models.go index c233a6ae1..99f5bfc9a 100644 --- a/protocol/storenodes/models.go +++ b/protocol/storenodes/models.go @@ -2,6 +2,7 @@ package storenodes import ( "github.com/multiformats/go-multiaddr" + "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/protocol/protobuf" "github.com/status-im/status-go/services/mailservers" diff --git a/protocol/storenodes/storenodes.go b/protocol/storenodes/storenodes.go index db4c1bbde..66a6f4bc3 100644 --- a/protocol/storenodes/storenodes.go +++ b/protocol/storenodes/storenodes.go @@ -39,8 +39,8 @@ type storenodesData struct { storenodes []Storenode } -// GetStorenodeByCommunnityID returns the active storenode for a community -func (m *CommunityStorenodes) GetStorenodeByCommunnityID(communityID string) (mailservers.Mailserver, error) { +// GetStorenodeByCommunityID returns the active storenode for a community +func (m *CommunityStorenodes) GetStorenodeByCommunityID(communityID string) (mailservers.Mailserver, error) { m.storenodesByCommunityIDMutex.RLock() defer m.storenodesByCommunityIDMutex.RUnlock() diff --git a/protocol/storenodes/storenodes_test.go b/protocol/storenodes/storenodes_test.go index 0d7f1330c..21ee3629e 100644 --- a/protocol/storenodes/storenodes_test.go +++ b/protocol/storenodes/storenodes_test.go @@ -65,11 +65,11 @@ func TestUpdateStorenodesInDB(t *testing.T) { require.NoError(t, err) // check if storenodes are loaded - ms1, err := csn.GetStorenodeByCommunnityID(communityID1.String()) + ms1, err := csn.GetStorenodeByCommunityID(communityID1.String()) require.NoError(t, err) matchStoreNode(t, snodes1[0], ms1) - ms2, err := csn.GetStorenodeByCommunnityID(communityID2.String()) + ms2, err := csn.GetStorenodeByCommunityID(communityID2.String()) require.NoError(t, err) matchStoreNode(t, snodes2[0], ms2) } diff --git a/protocol/transport/transport.go b/protocol/transport/transport.go index 34e5d956d..2d873a175 100644 --- a/protocol/transport/transport.go +++ b/protocol/transport/transport.go @@ -511,14 +511,14 @@ func (t *Transport) SendMessagesRequestForTopics( ctx context.Context, peerID peer.ID, from, to uint32, - previousStoreCursor types.StoreRequestCursor, + prevCursor types.StoreRequestCursor, pubsubTopic string, contentTopics []types.TopicType, limit uint32, waitForResponse bool, processEnvelopes bool, -) (storeCursor types.StoreRequestCursor, envelopesCount int, err error) { - return t.createMessagesRequest(ctx, peerID, from, to, previousStoreCursor, pubsubTopic, contentTopics, limit, waitForResponse, processEnvelopes) +) (cursor types.StoreRequestCursor, envelopesCount int, err error) { + return t.createMessagesRequest(ctx, peerID, from, to, prevCursor, pubsubTopic, contentTopics, limit, waitForResponse, processEnvelopes) } func createMessagesRequest(from, to uint32, cursor []byte, storeCursor types.StoreRequestCursor, pubsubTopic string, topics []types.TopicType, limit uint32) types.MessagesRequest { @@ -631,6 +631,10 @@ func (t *Transport) ConnectionChanged(state connection.State) { t.waku.ConnectionChanged(state) } +func (t *Transport) PingPeer(ctx context.Context, peerID peer.ID) (time.Duration, error) { + return t.waku.PingPeer(ctx, peerID) +} + // Subscribe to a pubsub topic, passing an optional public key if the pubsub topic is protected func (t *Transport) SubscribeToPubsubTopic(topic string, optPublicKey *ecdsa.PublicKey) error { if t.waku.Version() == 2 { diff --git a/services/ext/api.go b/services/ext/api.go index 28309c98e..3554281af 100644 --- a/services/ext/api.go +++ b/services/ext/api.go @@ -1412,7 +1412,7 @@ func (api *PublicAPI) RequestAllHistoricMessagesWithRetries(forceFetchingBackup } func (api *PublicAPI) DisconnectActiveMailserver() { - api.service.messenger.DisconnectActiveStorenode() + api.service.messenger.DisconnectActiveMailserver() } // Echo is a method for testing purposes. diff --git a/services/mailservers/database.go b/services/mailservers/database.go index d66bc0adf..a52646a62 100644 --- a/services/mailservers/database.go +++ b/services/mailservers/database.go @@ -11,7 +11,9 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" + "github.com/waku-org/go-waku/waku/v2/protocol/enr" + "github.com/waku-org/go-waku/waku/v2/utils" "github.com/ethereum/go-ethereum/p2p/enode" @@ -55,6 +57,7 @@ func (m Mailserver) PeerInfo() (*peer.AddrInfo, error) { if err != nil { return nil, err } + addrInfo.Addrs = utils.EncapsulatePeerID(addrInfo.ID, addrInfo.Addrs...) maddrs = append(maddrs, addrInfo.Addrs...) } diff --git a/services/mailservers/fleet.go b/services/mailservers/fleet.go index 8a4156d27..446a7f15f 100644 --- a/services/mailservers/fleet.go +++ b/services/mailservers/fleet.go @@ -2,9 +2,9 @@ package mailservers import "github.com/status-im/status-go/params" -func DefaultStorenodesByFleet(fleet string) []Mailserver { +func DefaultMailserversByFleet(fleet string) []Mailserver { var items []Mailserver - for _, ms := range DefaultStorenodes() { + for _, ms := range DefaultMailservers() { if ms.Fleet == fleet { items = append(items, ms) } @@ -12,7 +12,7 @@ func DefaultStorenodesByFleet(fleet string) []Mailserver { return items } -func DefaultStorenodes() []Mailserver { +func DefaultMailservers() []Mailserver { return []Mailserver{ { @@ -53,13 +53,13 @@ func DefaultStorenodes() []Mailserver { }, { ID: "store-01.do-ams3.status.prod", - ENR: MustDecodeENR("enr:-QEeuEA08-NJJDuKh6V8739MPl2G7ykaC0EWyUg21KtjQ1UtKxuE2qNy5uES2_bobr7sC5C4sS_-GhDVYMpOrM2IFc8KAYJpZIJ2NIJpcIQiqsAnim11bHRpYWRkcnO4bgA0Ni9zdG9yZS0wMS5nYy11cy1jZW50cmFsMS1hLnN0YXR1cy5wcm9kLnN0YXR1cy5pbQZ2XwA2Ni9zdG9yZS0wMS5nYy11cy1jZW50cmFsMS1hLnN0YXR1cy5wcm9kLnN0YXR1cy5pbQYBu94DgnJzjQAQBQABACAAQACAAQCJc2VjcDI1NmsxoQN_aBxNsOBrceDLyC75vBFRuzv_tWfaHG50Jc9DQztwkIN0Y3CCdl-DdWRwgiMohXdha3UyAw"), + ENR: MustDecodeENR("enr:-QEMuEAs8JmmyUI3b9v_ADqYtELHUYAsAMS21lA2BMtrzF86tVmyy9cCrhmzfHGHx_g3nybn7jIRybzXTGNj3C2KzrriAYJpZIJ2NIJpcISf3_Jeim11bHRpYWRkcnO4XAArNiZzdG9yZS0wMS5kby1hbXMzLnN0YXR1cy5wcm9kLnN0YXR1cy5pbQZ2XwAtNiZzdG9yZS0wMS5kby1hbXMzLnN0YXR1cy5wcm9kLnN0YXR1cy5pbQYBu94DgnJzjQAQBQABACAAQACAAQCJc2VjcDI1NmsxoQLfoaQH3oSYW59yxEBfeAZbltmUnC4BzYkHqer2VQMTyoN0Y3CCdl-DdWRwgiMohXdha3UyAw"), Addr: MustDecodeMultiaddress("/dns4/store-01.do-ams3.status.prod.status.im/tcp/30303/p2p/16Uiu2HAmAUdrQ3uwzuE4Gy4D56hX6uLKEeerJAnhKEHZ3DxF1EfT"), Fleet: params.FleetStatusProd, }, { ID: "store-02.do-ams3.status.prod", - ENR: MustDecodeENR("enr:-QEeuECQiv4VvUk04UnU3wxKXgWvErYcGMgYU8aDuc8VvEt1km2GvcEBq-R9XT-loNL5PZjxGKzB1rDtCOQaFVYQtgPnAYJpZIJ2NIJpcIQiqpoCim11bHRpYWRkcnO4bgA0Ni9zdG9yZS0wMi5nYy11cy1jZW50cmFsMS1hLnN0YXR1cy5wcm9kLnN0YXR1cy5pbQZ2XwA2Ni9zdG9yZS0wMi5nYy11cy1jZW50cmFsMS1hLnN0YXR1cy5wcm9kLnN0YXR1cy5pbQYBu94DgnJzjQAQBQABACAAQACAAQCJc2VjcDI1NmsxoQNbEg1bkMJCBiD5Tje3Z_11R-kd9munZF0v4iiYZa1jgoN0Y3CCdl-DdWRwgiMohXdha3UyAw"), + ENR: MustDecodeENR("enr:-QEMuEDuTfD47Hz_NXDwf7LJMf0qhjp3CQhZ9Fy0Ulp4XehtEzewBzmJCoe77hjno3khH8kX2B9B1DgbJuc2n32fMZvOAYJpZIJ2NIJpcISf3_Kaim11bHRpYWRkcnO4XAArNiZzdG9yZS0wMi5kby1hbXMzLnN0YXR1cy5wcm9kLnN0YXR1cy5pbQZ2XwAtNiZzdG9yZS0wMi5kby1hbXMzLnN0YXR1cy5wcm9kLnN0YXR1cy5pbQYBu94DgnJzjQAQBQABACAAQACAAQCJc2VjcDI1NmsxoQLSM62HmqGpZ382YM4CyI-MCIlkxMP7ZbOwqwRPvk9wsIN0Y3CCdl-DdWRwgiMohXdha3UyAw"), Addr: MustDecodeMultiaddress("/dns4/store-02.do-ams3.status.prod.status.im/tcp/30303/p2p/16Uiu2HAm9aDJPkhGxc2SFcEACTFdZ91Q5TJjp76qZEhq9iF59x7R"), Fleet: params.FleetStatusProd, }, diff --git a/services/mailservers/tcp_ping.go b/services/mailservers/tcp_ping.go deleted file mode 100644 index e7acc5e81..000000000 --- a/services/mailservers/tcp_ping.go +++ /dev/null @@ -1,152 +0,0 @@ -package mailservers - -import ( - "context" - "fmt" - "net" - "time" - - multiaddr "github.com/multiformats/go-multiaddr" - - "github.com/ethereum/go-ethereum/p2p/enode" - "github.com/ethereum/go-ethereum/p2p/enr" - "github.com/status-im/status-go/rtt" -) - -type PingQuery struct { - Addresses []string `json:"addresses"` - TimeoutMs int `json:"timeoutMs"` -} - -type PingResult struct { - Address string `json:"address"` - RTTMs *int `json:"rttMs"` - Err *string `json:"error"` -} - -type parseFn func(string) (string, error) - -func (pr *PingResult) Update(rttMs int, err error) { - if err != nil { - errStr := err.Error() - pr.Err = &errStr - } - if rttMs >= 0 { - pr.RTTMs = &rttMs - } else { - pr.RTTMs = nil - } -} - -func EnodeToAddr(node *enode.Node) (string, error) { - var ip4 enr.IPv4 - err := node.Load(&ip4) - if err != nil { - return "", err - } - var tcp enr.TCP - err = node.Load(&tcp) - if err != nil { - return "", err - } - return fmt.Sprintf("%s:%d", net.IP(ip4).String(), tcp), nil -} - -func EnodeStringToAddr(enodeAddr string) (string, error) { - node, err := enode.ParseV4(enodeAddr) - if err != nil { - return "", err - } - return EnodeToAddr(node) -} - -func parse(addresses []string, fn parseFn) (map[string]*PingResult, []string) { - results := make(map[string]*PingResult, len(addresses)) - var toPing []string - - for i := range addresses { - addr, err := fn(addresses[i]) - if err != nil { - errStr := err.Error() - results[addresses[i]] = &PingResult{Address: addresses[i], Err: &errStr} - continue - } - results[addr] = &PingResult{Address: addresses[i]} - toPing = append(toPing, addr) - } - return results, toPing -} - -func mapValues(m map[string]*PingResult) []*PingResult { - rval := make([]*PingResult, 0, len(m)) - for _, value := range m { - rval = append(rval, value) - } - return rval -} - -func DoPing(ctx context.Context, addresses []string, timeoutMs int, p parseFn) ([]*PingResult, error) { - timeout := time.Duration(timeoutMs) * time.Millisecond - - resultsMap, toPing := parse(addresses, p) - - // run the checks concurrently - results, err := rtt.CheckHosts(toPing, timeout) - if err != nil { - return nil, err - } - - // set ping results - for i := range results { - r := results[i] - pr := resultsMap[r.Addr] - if pr == nil { - continue - } - pr.Update(r.RTTMs, r.Err) - } - - return mapValues(resultsMap), nil -} - -func (a *API) Ping(ctx context.Context, pq PingQuery) ([]*PingResult, error) { - return DoPing(ctx, pq.Addresses, pq.TimeoutMs, EnodeStringToAddr) -} - -func MultiAddressToAddress(multiAddr string) (string, error) { - - ma, err := multiaddr.NewMultiaddr(multiAddr) - if err != nil { - return "", err - } - - dns4, err := ma.ValueForProtocol(multiaddr.P_DNS4) - if err != nil && err != multiaddr.ErrProtocolNotFound { - return "", err - } - - ip4 := "" - if dns4 != "" { - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - ip4, err = net.DefaultResolver.LookupCNAME(ctx, dns4) - if err != nil { - return "", err - } - } else { - ip4, err = ma.ValueForProtocol(multiaddr.P_IP4) - if err != nil { - return "", err - } - } - - tcp, err := ma.ValueForProtocol(multiaddr.P_TCP) - if err != nil { - return "", err - } - return fmt.Sprintf("%s:%s", ip4, tcp), nil -} - -func (a *API) MultiAddressPing(ctx context.Context, pq PingQuery) ([]*PingResult, error) { - return DoPing(ctx, pq.Addresses, pq.TimeoutMs, MultiAddressToAddress) -} diff --git a/services/wakuext/api.go b/services/wakuext/api.go index 1617bec0e..a021fdddc 100644 --- a/services/wakuext/api.go +++ b/services/wakuext/api.go @@ -1,19 +1,9 @@ package wakuext import ( - "crypto/ecdsa" - "time" - "github.com/ethereum/go-ethereum/log" - gethbridge "github.com/status-im/status-go/eth-node/bridge/geth" "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/services/ext" - waku "github.com/status-im/status-go/waku/common" -) - -const ( - // defaultWorkTime is a work time reported in messages sent to MailServer nodes. - defaultWorkTime = 5 ) // PublicAPI extends waku public API. @@ -33,39 +23,3 @@ func NewPublicAPI(s *Service) *PublicAPI { log: log.New("package", "status-go/services/wakuext.PublicAPI"), } } - -// makeEnvelop makes an envelop for a historic messages request. -// Symmetric key is used to authenticate to MailServer. -// PK is the current node ID. -// DEPRECATED -func makeEnvelop( - payload []byte, - symKey []byte, - publicKey *ecdsa.PublicKey, - nodeID *ecdsa.PrivateKey, - pow float64, - now time.Time, -) (types.Envelope, error) { - params := waku.MessageParams{ - PoW: pow, - Payload: payload, - WorkTime: defaultWorkTime, - Src: nodeID, - } - // Either symKey or public key is required. - // This condition is verified in `message.Wrap()` method. - if len(symKey) > 0 { - params.KeySym = symKey - } else if publicKey != nil { - params.Dst = publicKey - } - message, err := waku.NewSentMessage(¶ms) - if err != nil { - return nil, err - } - envelope, err := message.Wrap(¶ms, now) - if err != nil { - return nil, err - } - return gethbridge.NewWakuEnvelope(envelope), nil -} diff --git a/services/wakuext/api_test.go b/services/wakuext/api_test.go index b9d55221e..33304a738 100644 --- a/services/wakuext/api_test.go +++ b/services/wakuext/api_test.go @@ -1,25 +1,18 @@ package wakuext import ( - "fmt" "io/ioutil" - "math" - "strconv" "testing" "go.uber.org/zap" "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/storage" - "github.com/ethereum/go-ethereum/node" - "github.com/ethereum/go-ethereum/p2p" "github.com/status-im/status-go/appdatabase" gethbridge "github.com/status-im/status-go/eth-node/bridge/geth" "github.com/status-im/status-go/eth-node/crypto" - "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/multiaccounts" "github.com/status-im/status-go/params" "github.com/status-im/status-go/services/ext" @@ -66,102 +59,3 @@ func TestInitProtocol(t *testing.T) { err = service.InitProtocol("Test", privateKey, appDB, walletDB, nil, multiAccounts, acc, nil, nil, nil, nil, nil, zap.NewNop()) require.NoError(t, err) } - -func TestShhExtSuite(t *testing.T) { - suite.Run(t, new(ShhExtSuite)) -} - -type ShhExtSuite struct { - suite.Suite - - dir string - nodes []*node.Node - wakus []types.Waku - services []*Service -} - -func (s *ShhExtSuite) createAndAddNode() { - idx := len(s.nodes) - - // create a node - cfg := &node.Config{ - Name: strconv.Itoa(idx), - P2P: p2p.Config{ - MaxPeers: math.MaxInt32, - NoDiscovery: true, - ListenAddr: ":0", - }, - NoUSB: true, - } - stack, err := node.New(cfg) - s.NoError(err) - w := waku.New(nil, nil) - stack.RegisterLifecycle(w) - stack.RegisterAPIs(w.APIs()) - stack.RegisterProtocols(w.Protocols()) - s.NoError(err) - - // set up protocol - config := params.NodeConfig{ - RootDataDir: s.dir, - ShhextConfig: params.ShhextConfig{ - InstallationID: "1", - PFSEnabled: true, - MailServerConfirmations: true, - ConnectionTarget: 10, - }, - } - db, err := leveldb.Open(storage.NewMemStorage(), nil) - s.Require().NoError(err) - nodeWrapper := ext.NewTestNodeWrapper(nil, gethbridge.NewGethWakuWrapper(w)) - service := New(config, nodeWrapper, nil, nil, db) - - appDB, cleanupDB, err := helpers.SetupTestSQLDB(appdatabase.DbInitializer{}, fmt.Sprintf("%d", idx)) - s.Require().NoError(err) - defer func() { s.Require().NoError(cleanupDB()) }() - - tmpfile, err := ioutil.TempFile("", "multi-accounts-tests-") - s.Require().NoError(err) - - multiAccounts, err := multiaccounts.InitializeDB(tmpfile.Name()) - s.Require().NoError(err) - - privateKey, err := crypto.GenerateKey() - s.NoError(err) - - acc := &multiaccounts.Account{KeyUID: "0xdeadbeef"} - - walletDB, err := helpers.SetupTestMemorySQLDB(&walletdatabase.DbInitializer{}) - s.Require().NoError(err) - - err = service.InitProtocol("Test", privateKey, appDB, walletDB, nil, multiAccounts, acc, nil, nil, nil, nil, nil, zap.NewNop()) - s.NoError(err) - - stack.RegisterLifecycle(service) - stack.RegisterAPIs(service.APIs()) - stack.RegisterProtocols(service.Protocols()) - - s.NoError(err) - - // start the node - err = stack.Start() - s.Require().NoError(err) - - // store references - s.nodes = append(s.nodes, stack) - s.wakus = append(s.wakus, gethbridge.NewGethWakuWrapper(w)) - s.services = append(s.services, service) -} - -func (s *ShhExtSuite) SetupTest() { - s.dir = s.T().TempDir() -} - -func (s *ShhExtSuite) TearDownTest() { - for _, n := range s.nodes { - s.NoError(n.Close()) - } - s.nodes = nil - s.wakus = nil - s.services = nil -} diff --git a/waku/waku_version_test.go b/waku/waku_version_test.go index 03bd0aa51..1b32040f4 100644 --- a/waku/waku_version_test.go +++ b/waku/waku_version_test.go @@ -278,20 +278,6 @@ func (s *WakuTestSuite) TestEventsWithoutConfirmation() { timer.Stop() } -func discardPipe() *p2p.MsgPipeRW { - rw1, rw2 := p2p.MsgPipe() - go func() { - for { - msg, err := rw1.ReadMsg() - if err != nil { - return - } - msg.Discard() // nolint: errcheck - } - }() - return rw2 -} - func (s *WakuTestSuite) TestWakuTimeDesyncEnvelopeIgnored() { c := &Config{ MaxMessageSize: common.DefaultMaxMessageSize, diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 6d5e66936..90f5abe44 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -36,6 +36,7 @@ import ( "github.com/jellydator/ttlcache/v3" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/libp2p/go-libp2p/p2p/protocol/ping" "github.com/multiformats/go-multiaddr" "go.uber.org/zap" @@ -1792,6 +1793,19 @@ func (w *Waku) PeerID() peer.ID { return w.node.Host().ID() } +func (w *Waku) PingPeer(ctx context.Context, peerID peer.ID) (time.Duration, error) { + pingResultCh := ping.Ping(ctx, w.node.Host(), peerID) + select { + case <-ctx.Done(): + return 0, ctx.Err() + case r := <-pingResultCh: + if r.Error != nil { + return 0, r.Error + } + return r.RTT, nil + } +} + func (w *Waku) Peerstore() peerstore.Peerstore { return w.node.Host().Peerstore() }