diff --git a/cmd/node-canary/main.go b/cmd/node-canary/main.go index 66d227898..70620c0ca 100644 --- a/cmd/node-canary/main.go +++ b/cmd/node-canary/main.go @@ -2,17 +2,13 @@ package main import ( - "context" - "errors" "flag" - "fmt" stdlog "log" "os" "path" "path/filepath" "time" - "golang.org/x/crypto/sha3" "golang.org/x/crypto/ssh/terminal" "github.com/ethereum/go-ethereum/log" @@ -20,40 +16,27 @@ import ( "github.com/ethereum/go-ethereum/p2p/enode" "github.com/status-im/status-go/api" - gethbridge "github.com/status-im/status-go/eth-node/bridge/geth" - "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/logutils" "github.com/status-im/status-go/params" - "github.com/status-im/status-go/rpc" - "github.com/status-im/status-go/services/ext" - "github.com/status-im/status-go/services/wakuext" "github.com/status-im/status-go/t/helpers" ) -const ( - mailboxPassword = "status-offline-inbox" -) - // All general log messages in this package should be routed through this logger. var logger = log.New("package", "status-go/cmd/node-canary") var ( - staticEnodeAddr = flag.String("staticnode", "", "checks if static node talks waku protocol (e.g. enode://abc123@1.2.3.4:30303)") - mailserverEnodeAddr = flag.String("mailserver", "", "queries mail server for historic messages (e.g. enode://123abc@4.3.2.1:30504)") - publicChannel = flag.String("channel", "status", "The public channel name to retrieve historic messages from (used with 'mailserver' flag)") - timeout = flag.Int("timeout", 10, "Timeout when connecting to node or fetching messages from mailserver, in seconds") - period = flag.Int("period", 24*60*60, "How far in the past to request messages from mailserver, in seconds") - minPow = flag.Float64("waku.pow", params.WakuMinimumPoW, "PoW for messages to be added to queue, in float format") - ttl = flag.Int("waku.ttl", params.WakuTTL, "Time to live for messages, in seconds") - homePath = flag.String("home-dir", ".", "Home directory where state is stored") - logLevel = flag.String("log", "INFO", `Log level, one of: "ERROR", "WARN", "INFO", "DEBUG", and "TRACE"`) - logFile = flag.String("logfile", "", "Path to the log file") - logWithoutColors = flag.Bool("log-without-color", false, "Disables log colors") + staticEnodeAddr = flag.String("staticnode", "", "checks if static node talks waku protocol (e.g. enode://abc123@1.2.3.4:30303)") + minPow = flag.Float64("waku.pow", params.WakuMinimumPoW, "PoW for messages to be added to queue, in float format") + ttl = flag.Int("waku.ttl", params.WakuTTL, "Time to live for messages, in seconds") + homePath = flag.String("home-dir", ".", "Home directory where state is stored") + logLevel = flag.String("log", "INFO", `Log level, one of: "ERROR", "WARN", "INFO", "DEBUG", and "TRACE"`) + logFile = flag.String("logfile", "", "Path to the log file") + logWithoutColors = flag.Bool("log-without-color", false, "Disables log colors") ) func main() { var err error - var staticParsedNode, mailserverParsedNode *enode.Node + var staticParsedNode *enode.Node if *staticEnodeAddr != "" { staticParsedNode, err = enode.ParseV4(*staticEnodeAddr) if err != nil { @@ -62,26 +45,12 @@ func main() { } } - if *mailserverEnodeAddr != "" { - mailserverParsedNode, err = enode.ParseV4(*mailserverEnodeAddr) - if err != nil { - logger.Crit("Invalid mailserver address specified", "mailserverEnodeAddr", *mailserverEnodeAddr, "error", err) - os.Exit(1) - } - } - if staticParsedNode != nil { verifyStaticNodeBehavior(staticParsedNode) logger.Info("Connected to static node correctly", "address", *staticEnodeAddr) os.Exit(0) } - if mailserverParsedNode != nil { - verifyMailserverBehavior(mailserverParsedNode) - logger.Info("Mailserver responded correctly", "address", *mailserverEnodeAddr) - os.Exit(0) - } - logger.Crit("No address specified") os.Exit(1) } @@ -99,119 +68,6 @@ func init() { } } -func verifyMailserverBehavior(mailserverNode *enode.Node) { - clientBackend, err := startClientNode() - if err != nil { - logger.Error("Node start failed", "error", err) - os.Exit(1) - } - defer func() { _ = clientBackend.StopNode() }() - - clientNode := clientBackend.StatusNode() - clientGethWakuService := clientNode.WakuService() - if clientGethWakuService == nil { - logger.Error("Could not retrieve waku service") - os.Exit(1) - } - clientWakuService := gethbridge.NewGethWakuWrapper(clientGethWakuService) - clientWakuExtService := clientNode.WakuExtService() - if clientWakuExtService == nil { - logger.Error("Could not retrieve wakuext service") - os.Exit(1) - } - - // add mailserver peer to client - clientErrCh := helpers.WaitForPeerAsync(clientNode.Server(), *mailserverEnodeAddr, p2p.PeerEventTypeAdd, time.Duration(*timeout)*time.Second) - - err = clientNode.AddPeer(*mailserverEnodeAddr) - if err != nil { - logger.Error("Failed to add mailserver peer to client", "error", err) - os.Exit(1) - } - - err = <-clientErrCh - if err != nil { - logger.Error("Error detected while waiting for mailserver peer to be added", "error", err) - os.Exit(1) - } - - // add mailserver sym key - mailServerKeyID, err := clientWakuService.AddSymKeyFromPassword(mailboxPassword) - if err != nil { - logger.Error("Error adding mailserver sym key to client peer", "error", err) - os.Exit(1) - } - - mailboxPeer := mailserverNode.ID().Bytes() - err = clientGethWakuService.AllowP2PMessagesFromPeer(mailboxPeer) - if err != nil { - logger.Error("Failed to allow P2P messages from mailserver peer", "error", err, mailserverNode.String()) - os.Exit(1) - } - - clientRPCClient := clientNode.RPCClient() - - _, topic, _, err := joinPublicChat(clientWakuService, clientRPCClient, *publicChannel) - if err != nil { - logger.Error("Failed to join public chat", "error", err) - os.Exit(1) - } - - // watch for envelopes to be available in filters in the client - envelopeAvailableWatcher := make(chan types.EnvelopeEvent, 1024) - sub := clientWakuService.SubscribeEnvelopeEvents(envelopeAvailableWatcher) - defer sub.Unsubscribe() - - // watch for mailserver responses in the client - mailServerResponseWatcher := make(chan types.EnvelopeEvent, 1024) - sub = clientWakuService.SubscribeEnvelopeEvents(mailServerResponseWatcher) - defer sub.Unsubscribe() - - // request messages from mailbox - wakuextAPI := wakuext.NewPublicAPI(clientWakuExtService) - requestIDBytes, err := wakuextAPI.RequestMessages(context.TODO(), - ext.MessagesRequest{ - MailServerPeer: mailserverNode.String(), - From: uint32(clientWakuService.GetCurrentTime().Add(-time.Duration(*period) * time.Second).Unix()), - Limit: 1, - Topic: topic, - SymKeyID: mailServerKeyID, - Timeout: time.Duration(*timeout), - }) - if err != nil { - logger.Error("Error requesting historic messages from mailserver", "error", err) - os.Exit(2) - } - requestID := types.BytesToHash(requestIDBytes) - - // wait for mailserver request sent event - err = waitForMailServerRequestSent(mailServerResponseWatcher, requestID, time.Duration(*timeout)*time.Second) - if err != nil { - logger.Error("Error waiting for mailserver request sent event", "error", err) - os.Exit(3) - } - - // wait for mailserver response - resp, err := waitForMailServerResponse(mailServerResponseWatcher, requestID, time.Duration(*timeout)*time.Second) - if err != nil { - logger.Error("Error waiting for mailserver response", "error", err) - os.Exit(3) - } - - // if last envelope is empty there are no messages to receive - if isEmptyEnvelope(resp.LastEnvelopeHash) { - logger.Warn("No messages available from mailserver") - return - } - - // wait for last envelope sent by the mailserver to be available for filters - err = waitForEnvelopeEvents(envelopeAvailableWatcher, []string{resp.LastEnvelopeHash.String()}, types.EventEnvelopeAvailable) - if err != nil { - logger.Error("Error waiting for envelopes to be available to client filter", "error", err) - os.Exit(4) - } -} - func verifyStaticNodeBehavior(staticNode *enode.Node) { clientBackend, err := startClientNode() if err != nil { @@ -311,107 +167,3 @@ func startClientNode() (*api.GethStatusBackend, error) { } return clientBackend, err } - -func joinPublicChat(w types.Waku, rpcClient *rpc.Client, name string) (string, types.TopicType, string, error) { - keyID, err := w.AddSymKeyFromPassword(name) - if err != nil { - return "", types.TopicType{}, "", err - } - - h := sha3.NewLegacyKeccak256() - _, err = h.Write([]byte(name)) - if err != nil { - return "", types.TopicType{}, "", err - } - fullTopic := h.Sum(nil) - topic := types.BytesToTopic(fullTopic) - - wakuAPI := w.PublicWakuAPI() - filterID, err := wakuAPI.NewMessageFilter(types.Criteria{SymKeyID: keyID, Topics: []types.TopicType{topic}}) - - return keyID, topic, filterID, err -} - -func waitForMailServerRequestSent(events chan types.EnvelopeEvent, requestID types.Hash, timeout time.Duration) error { - timeoutTimer := time.NewTimer(timeout) - for { - select { - case event := <-events: - if event.Hash == requestID && event.Event == types.EventMailServerRequestSent { - timeoutTimer.Stop() - return nil - } - case <-timeoutTimer.C: - return errors.New("timed out waiting for mailserver request sent") - } - } -} - -func waitForMailServerResponse(events chan types.EnvelopeEvent, requestID types.Hash, timeout time.Duration) (*types.MailServerResponse, error) { - timeoutTimer := time.NewTimer(timeout) - for { - select { - case event := <-events: - if event.Hash == requestID { - resp, err := decodeMailServerResponse(event) - if resp != nil || err != nil { - timeoutTimer.Stop() - return resp, err - } - } - case <-timeoutTimer.C: - return nil, errors.New("timed out waiting for mailserver response") - } - } -} - -func decodeMailServerResponse(event types.EnvelopeEvent) (*types.MailServerResponse, error) { - switch event.Event { - case types.EventMailServerRequestSent: - return nil, nil - case types.EventMailServerRequestCompleted: - resp, ok := event.Data.(*types.MailServerResponse) - if !ok { - return nil, errors.New("failed to convert event to a *MailServerResponse") - } - - return resp, nil - case types.EventMailServerRequestExpired: - return nil, errors.New("no messages available from mailserver") - default: - return nil, fmt.Errorf("unexpected event type: %v", event.Event) - } -} - -func waitForEnvelopeEvents(events chan types.EnvelopeEvent, hashes []string, event types.EventType) error { - check := make(map[string]struct{}) - for _, hash := range hashes { - check[hash] = struct{}{} - } - - timeout := time.NewTimer(time.Second * 5) - for { - select { - case e := <-events: - if e.Event == event { - delete(check, e.Hash.String()) - if len(check) == 0 { - timeout.Stop() - return nil - } - } - case <-timeout.C: - return fmt.Errorf("timed out while waiting for event on envelopes. event: %s", event) - } - } -} - -// helper for checking LastEnvelopeHash -func isEmptyEnvelope(hash types.Hash) bool { - for _, b := range hash { - if b != 0 { - return false - } - } - return true -} diff --git a/eth-node/bridge/geth/waku.go b/eth-node/bridge/geth/waku.go index 35422ea1e..83b872724 100644 --- a/eth-node/bridge/geth/waku.go +++ b/eth-node/bridge/geth/waku.go @@ -264,27 +264,6 @@ func (w *GethWakuWrapper) createFilterWrapper(id string, keyAsym *ecdsa.PrivateK }, id), nil } -func (w *GethWakuWrapper) SendMessagesRequest(peerID []byte, r types.MessagesRequest) error { - return w.waku.SendMessagesRequest(peerID, wakucommon.MessagesRequest{ - ID: r.ID, - From: r.From, - To: r.To, - Limit: r.Limit, - Cursor: r.Cursor, - Bloom: r.Bloom, - Topics: r.ContentTopics, - }) -} - -// RequestHistoricMessages sends a message with p2pRequestCode to a specific peer, -// which is known to implement MailServer interface, and is supposed to process this -// request and respond with a number of peer-to-peer messages (possibly expired), -// which are not supposed to be forwarded any further. -// The whisper protocol is agnostic of the format and contents of envelope. -func (w *GethWakuWrapper) RequestHistoricMessagesWithTimeout(peerID []byte, envelope types.Envelope, timeout time.Duration) error { - return w.waku.RequestHistoricMessagesWithTimeout(peerID, envelope.Unwrap().(*wakucommon.Envelope), timeout) -} - func (w *GethWakuWrapper) ProcessingP2PMessages() bool { return w.waku.ProcessingP2PMessages() } @@ -293,7 +272,7 @@ func (w *GethWakuWrapper) MarkP2PMessageAsProcessed(hash common.Hash) { w.waku.MarkP2PMessageAsProcessed(hash) } -func (w *GethWakuWrapper) RequestStoreMessages(ctx context.Context, peerID []byte, r types.MessagesRequest, processEnvelopes bool) (types.StoreRequestCursor, int, error) { +func (w *GethWakuWrapper) RequestStoreMessages(ctx context.Context, peerID peer.ID, r types.MessagesRequest, processEnvelopes bool) (types.StoreRequestCursor, int, error) { return nil, 0, errors.New("not implemented") } @@ -339,3 +318,7 @@ func (w *GethWakuWrapper) SetStorePeerID(peerID peer.ID) { func (w *GethWakuWrapper) PeerID() peer.ID { panic("not implemented") } + +func (w *GethWakuWrapper) PingPeer(context.Context, peer.ID) (time.Duration, error) { + return 0, errors.New("not available in WakuV1") +} diff --git a/eth-node/bridge/geth/wakuv2.go b/eth-node/bridge/geth/wakuv2.go index 6ff134a46..750790be4 100644 --- a/eth-node/bridge/geth/wakuv2.go +++ b/eth-node/bridge/geth/wakuv2.go @@ -3,7 +3,6 @@ package gethbridge import ( "context" "crypto/ecdsa" - "errors" "time" "github.com/libp2p/go-libp2p/core/peer" @@ -174,21 +173,8 @@ func (w *gethWakuV2Wrapper) createFilterWrapper(id string, keyAsym *ecdsa.Privat }, id), nil } -// DEPRECATED: Not used in waku V2 -func (w *gethWakuV2Wrapper) SendMessagesRequest(peerID []byte, r types.MessagesRequest) error { - return errors.New("DEPRECATED") -} - -func (w *gethWakuV2Wrapper) RequestStoreMessages(ctx context.Context, peerIDBytes []byte, r types.MessagesRequest, processEnvelopes bool) (types.StoreRequestCursor, int, error) { - var options []store.RequestOption - - var peerID peer.ID - err := peerID.Unmarshal(peerIDBytes) - if err != nil { - return nil, 0, err - } - - options = []store.RequestOption{ +func (w *gethWakuV2Wrapper) RequestStoreMessages(ctx context.Context, peerID peer.ID, r types.MessagesRequest, processEnvelopes bool) (types.StoreRequestCursor, int, error) { + options := []store.RequestOption{ store.WithPaging(false, uint64(r.Limit)), } @@ -220,11 +206,6 @@ func (w *gethWakuV2Wrapper) RequestStoreMessages(ctx context.Context, peerIDByte return nil, envelopesCount, nil } -// DEPRECATED: Not used in waku V2 -func (w *gethWakuV2Wrapper) RequestHistoricMessagesWithTimeout(peerID []byte, envelope types.Envelope, timeout time.Duration) error { - return errors.New("DEPRECATED") -} - func (w *gethWakuV2Wrapper) StartDiscV5() error { return w.waku.StartDiscV5() } @@ -361,3 +342,7 @@ func (w *gethWakuV2Wrapper) SetStorePeerID(peerID peer.ID) { func (w *gethWakuV2Wrapper) PeerID() peer.ID { return w.waku.PeerID() } + +func (w *gethWakuV2Wrapper) PingPeer(ctx context.Context, peerID peer.ID) (time.Duration, error) { + return w.waku.PingPeer(ctx, peerID) +} diff --git a/eth-node/types/waku.go b/eth-node/types/waku.go index 32fe8af98..1900505f3 100644 --- a/eth-node/types/waku.go +++ b/eth-node/types/waku.go @@ -176,19 +176,8 @@ type Waku interface { Unsubscribe(ctx context.Context, id string) error UnsubscribeMany(ids []string) error - // RequestHistoricMessages sends a message with p2pRequestCode to a specific peer, - // which is known to implement MailServer interface, and is supposed to process this - // request and respond with a number of peer-to-peer messages (possibly expired), - // which are not supposed to be forwarded any further. - // The whisper protocol is agnostic of the format and contents of envelope. - // A timeout of 0 never expires. - RequestHistoricMessagesWithTimeout(peerID []byte, envelope Envelope, timeout time.Duration) error - // SendMessagesRequest sends a MessagesRequest. This is an equivalent to RequestHistoricMessages - // in terms of the functionality. - SendMessagesRequest(peerID []byte, request MessagesRequest) error - // RequestStoreMessages uses the WAKU2-STORE protocol to request historic messages - RequestStoreMessages(ctx context.Context, peerID []byte, request MessagesRequest, processEnvelopes bool) (StoreRequestCursor, int, error) + RequestStoreMessages(ctx context.Context, peerID peer.ID, request MessagesRequest, processEnvelopes bool) (StoreRequestCursor, int, error) // ProcessingP2PMessages indicates whether there are in-flight p2p messages ProcessingP2PMessages() bool @@ -210,4 +199,7 @@ type Waku interface { // PeerID returns node's PeerID PeerID() peer.ID + + // PingPeer returns the reply time + PingPeer(ctx context.Context, peerID peer.ID) (time.Duration, error) } diff --git a/eth-node/types/whisper.go b/eth-node/types/whisper.go index 3cba905e6..b4d69651e 100644 --- a/eth-node/types/whisper.go +++ b/eth-node/types/whisper.go @@ -44,16 +44,6 @@ type Whisper interface { Unsubscribe(id string) error UnsubscribeMany(ids []string) error - // RequestHistoricMessages sends a message with p2pRequestCode to a specific peer, - // which is known to implement MailServer interface, and is supposed to process this - // request and respond with a number of peer-to-peer messages (possibly expired), - // which are not supposed to be forwarded any further. - // The whisper protocol is agnostic of the format and contents of envelope. - // A timeout of 0 never expires. - RequestHistoricMessagesWithTimeout(peerID []byte, envelope Envelope, timeout time.Duration) error - // SendMessagesRequest sends a MessagesRequest. This is an equivalent to RequestHistoricMessages - // in terms of the functionality. - SendMessagesRequest(peerID []byte, request MessagesRequest) error // SyncMessages can be sent between two Mail Servers and syncs envelopes between them. SyncMessages(peerID []byte, req SyncMailRequest) error } diff --git a/protocol/messenger.go b/protocol/messenger.go index 86fcec734..53a833513 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -25,7 +25,6 @@ import ( gethcommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/p2p" "github.com/status-im/status-go/account" @@ -203,7 +202,6 @@ type connStatus int const ( disconnected connStatus = iota + 1 - connecting connected ) @@ -218,8 +216,6 @@ type mailserverCycle struct { allMailservers []mailserversDB.Mailserver activeMailserver *mailserversDB.Mailserver peers map[string]peerStatus - events chan *p2p.PeerEvent - subscription event.Subscription availabilitySubscriptions []chan struct{} } diff --git a/protocol/messenger_config_test.go b/protocol/messenger_config_test.go index 2828322dd..5bdaf5d81 100644 --- a/protocol/messenger_config_test.go +++ b/protocol/messenger_config_test.go @@ -18,10 +18,10 @@ func WithTestStoreNode(s *suite.Suite, id string, address multiaddr.Multiaddr, f db := mailservers.NewDB(sqldb) err = db.Add(mailservers.Mailserver{ - ID: id, - Name: id, - Address: address.String(), - Fleet: fleet, + ID: id, + Name: id, + Addr: &address, + Fleet: fleet, }) s.Require().NoError(err) diff --git a/protocol/messenger_mailserver.go b/protocol/messenger_mailserver.go index 07cbc5d81..036317e20 100644 --- a/protocol/messenger_mailserver.go +++ b/protocol/messenger_mailserver.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "github.com/libp2p/go-libp2p/core/peer" "github.com/pkg/errors" "go.uber.org/zap" @@ -114,7 +115,7 @@ func (m *Messenger) performMailserverRequest(ms *mailservers.Mailserver, fn func var tries uint = 0 for tries < mailserverMaxTries { if !m.communityStorenodes.IsCommunityStoreNode(ms.ID) && !m.isMailserverAvailable(ms.ID) { - return nil, errors.New("mailserver not available") + return nil, errors.New("storenode not available") } m.logger.Info("trying performing mailserver requests", zap.Uint("try", tries), zap.String("mailserverID", ms.ID)) @@ -348,6 +349,10 @@ func (m *Messenger) RequestAllHistoricMessages(forceFetchingBackup, withRetries return nil, err } + if m.mailserversDatabase == nil { + return nil, nil + } + if forceFetchingBackup || !backupFetched { m.logger.Info("fetching backup") err := m.syncBackup() @@ -714,31 +719,29 @@ func (m *Messenger) calculateGapForChat(chat *Chat, from uint32) (*common.Messag type work struct { pubsubTopic string contentTopics []types.TopicType - cursor []byte - storeCursor types.StoreRequestCursor + cursor types.StoreRequestCursor limit uint32 } type messageRequester interface { SendMessagesRequestForTopics( ctx context.Context, - peerID []byte, + peerID peer.ID, from, to uint32, - previousCursor []byte, previousStoreCursor types.StoreRequestCursor, pubsubTopic string, contentTopics []types.TopicType, limit uint32, waitForResponse bool, processEnvelopes bool, - ) (cursor []byte, storeCursor types.StoreRequestCursor, envelopesCount int, err error) + ) (cursor types.StoreRequestCursor, envelopesCount int, err error) } func processMailserverBatch( ctx context.Context, messageRequester messageRequester, batch MailserverBatch, - mailserverID []byte, + storenodeID peer.ID, logger *zap.Logger, pageLimit uint32, shouldProcessNextPage func(int) (bool, uint32), @@ -834,8 +837,7 @@ loop: }() queryCtx, queryCancel := context.WithTimeout(ctx, mailserverRequestTimeout) - cursor, storeCursor, envelopesCount, err := messageRequester.SendMessagesRequestForTopics(queryCtx, mailserverID, batch.From, batch.To, w.cursor, w.storeCursor, w.pubsubTopic, w.contentTopics, w.limit, true, processEnvelopes) - + cursor, envelopesCount, err := messageRequester.SendMessagesRequestForTopics(queryCtx, storenodeID, batch.From, batch.To, w.cursor, w.pubsubTopic, w.contentTopics, w.limit, true, processEnvelopes) queryCancel() if err != nil { @@ -857,7 +859,7 @@ loop: // Check the cursor after calling `shouldProcessNextPage`. // The app might use process the fetched envelopes in the callback for own needs. - if len(cursor) == 0 && storeCursor == nil { + if cursor == nil { return } @@ -868,7 +870,6 @@ loop: pubsubTopic: w.pubsubTopic, contentTopics: w.contentTopics, cursor: cursor, - storeCursor: storeCursor, limit: nextPageLimit, } }(w) @@ -917,7 +918,7 @@ func (m *Messenger) processMailserverBatch(ms mailservers.Mailserver, batch Mail return nil } - mailserverID, err := ms.IDBytes() + mailserverID, err := ms.PeerID() if err != nil { return err } @@ -934,7 +935,7 @@ func (m *Messenger) processMailserverBatchWithOptions(ms mailservers.Mailserver, return nil } - mailserverID, err := ms.IDBytes() + mailserverID, err := ms.PeerID() if err != nil { return err } diff --git a/protocol/messenger_mailserver_cycle.go b/protocol/messenger_mailserver_cycle.go index 1b9bad27e..0ab853a71 100644 --- a/protocol/messenger_mailserver_cycle.go +++ b/protocol/messenger_mailserver_cycle.go @@ -3,22 +3,18 @@ package protocol import ( "context" "crypto/rand" - "fmt" "math" "math/big" "net" "runtime" "sort" - "strings" "sync" "time" - "github.com/multiformats/go-multiaddr" "github.com/pkg/errors" "go.uber.org/zap" - "github.com/ethereum/go-ethereum/p2p" - "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/waku-org/go-waku/waku/v2/utils" "github.com/status-im/status-go/params" "github.com/status-im/status-go/protocol/storenodes" @@ -34,10 +30,6 @@ const findNearestMailServer = !isAndroidEmulator const overrideDNS = runtime.GOOS == "android" || runtime.GOOS == "ios" const bootstrapDNS = "8.8.8.8:53" -func (m *Messenger) mailserversByFleet(fleet string) []mailservers.Mailserver { - return mailservers.DefaultMailserversByFleet(fleet) -} - type byRTTMsAndCanConnectBefore []SortedMailserver func (s byRTTMsAndCanConnectBefore) Len() int { @@ -52,49 +44,40 @@ func (s byRTTMsAndCanConnectBefore) Less(i, j int) bool { // Slightly inaccurate as time sensitive sorting, but it does not matter so much now := time.Now() if s[i].CanConnectAfter.Before(now) && s[j].CanConnectAfter.Before(now) { - return s[i].RTTMs < s[j].RTTMs + return s[i].RTT < s[j].RTT } return s[i].CanConnectAfter.Before(s[j].CanConnectAfter) } func (m *Messenger) StartMailserverCycle(mailservers []mailservers.Mailserver) error { + if m.transport.WakuVersion() != 2 { + m.logger.Warn("not starting mailserver cycle: requires wakuv2") + return nil + } + m.mailserverCycle.allMailservers = mailservers - version := m.transport.WakuVersion() - - switch version { - case 1: - if m.server == nil { - m.logger.Warn("not starting mailserver cycle: no p2p server is set") - return nil - } - - m.mailserverCycle.events = make(chan *p2p.PeerEvent, 20) - m.mailserverCycle.subscription = m.server.SubscribeEvents(m.mailserverCycle.events) - go m.updateWakuV1PeerStatus() - - case 2: - if len(mailservers) == 0 { - m.logger.Warn("not starting mailserver cycle: empty mailservers list") - return nil - } - for _, storenode := range mailservers { - maddr, err := multiaddr.NewMultiaddr(storenode.Address) - if err != nil { - return err - } - - _, err = m.AddStorePeer(maddr) - if err != nil { - return err - } - } - go m.verifyStorenodeStatus() - - default: - return fmt.Errorf("unsupported waku version: %d", version) + if len(mailservers) == 0 { + m.logger.Warn("not starting mailserver cycle: empty mailservers list") + return nil } + for _, storenode := range mailservers { + + peerInfo, err := storenode.PeerInfo() + if err != nil { + return err + } + + for _, addr := range utils.EncapsulatePeerID(peerInfo.ID, peerInfo.Addrs...) { + _, err := m.transport.AddStorePeer(addr) + if err != nil { + return err + } + } + } + go m.verifyStorenodeStatus() + m.logger.Debug("starting mailserver cycle", zap.Uint("WakuVersion", m.transport.WakuVersion()), zap.Any("mailservers", mailservers), @@ -131,16 +114,6 @@ func (m *Messenger) disconnectMailserver(backoffDuration time.Duration) error { } m.mailPeersMutex.Unlock() - // WakuV2 does not keep an active storenode connection - - if m.mailserverCycle.activeMailserver.Version == 1 { - node, err := m.mailserverCycle.activeMailserver.Enode() - if err != nil { - return err - } - m.server.RemovePeer(node) - } - m.mailserverCycle.activeMailserver = nil return nil } @@ -150,7 +123,7 @@ func (m *Messenger) disconnectActiveMailserver(backoffDuration time.Duration) { if err != nil { m.logger.Error("failed to disconnect mailserver", zap.Error(err)) } - signal.SendMailserverChanged("", "") + signal.SendMailserverChanged(nil) } func (m *Messenger) cycleMailservers() { @@ -192,7 +165,7 @@ func (m *Messenger) getFleet() (string, error) { } else if m.config.clusterConfig.Fleet != "" { fleet = m.config.clusterConfig.Fleet } else { - fleet = params.FleetProd + fleet = params.FleetStatusProd } return fleet, nil } @@ -205,7 +178,7 @@ func (m *Messenger) allMailservers() ([]mailservers.Mailserver, error) { } // Get default mailservers for given fleet - allMailservers := m.mailserversByFleet(fleet) + allMailservers := mailservers.DefaultMailserversByFleet(fleet) // Add custom configured mailservers if m.mailserversDatabase != nil { @@ -216,33 +189,85 @@ func (m *Messenger) allMailservers() ([]mailservers.Mailserver, error) { for _, c := range customMailservers { if c.Fleet == fleet { - c.Version = m.transport.WakuVersion() allMailservers = append(allMailservers, c) } } } - // Filter mailservers by configured waku version - wakuVersion := m.transport.WakuVersion() - matchingMailservers := make([]mailservers.Mailserver, 0, len(allMailservers)) - - for _, ms := range allMailservers { - if ms.Version == wakuVersion { - matchingMailservers = append(matchingMailservers, ms) - } - } - - return matchingMailservers, nil + return allMailservers, nil } type SortedMailserver struct { - Address string - RTTMs int + Mailserver mailservers.Mailserver + RTT time.Duration CanConnectAfter time.Time } -func (m *Messenger) findNewMailserver() error { +func (m *Messenger) getAvailableMailserversSortedByRTT(allMailservers []mailservers.Mailserver) []mailservers.Mailserver { + // TODO: this can be replaced by peer selector once code is moved to go-waku api + availableMailservers := make(map[string]time.Duration) + availableMailserversMutex := sync.Mutex{} + availableMailserversWg := sync.WaitGroup{} + for _, mailserver := range allMailservers { + availableMailserversWg.Add(1) + go func(mailserver mailservers.Mailserver) { + defer availableMailserversWg.Done() + peerID, err := mailserver.PeerID() + if err != nil { + return + } + + ctx, cancel := context.WithTimeout(m.ctx, 4*time.Second) + defer cancel() + + rtt, err := m.transport.PingPeer(ctx, peerID) + if err == nil { // pinging mailservers might fail, but we don't care + availableMailserversMutex.Lock() + availableMailservers[mailserver.ID] = rtt + availableMailserversMutex.Unlock() + } + }(mailserver) + } + availableMailserversWg.Wait() + + if len(availableMailservers) == 0 { + m.logger.Warn("No mailservers available") // Do nothing... + return nil + } + + mailserversByID := make(map[string]mailservers.Mailserver) + for idx := range allMailservers { + mailserversByID[allMailservers[idx].ID] = allMailservers[idx] + } + var sortedMailservers []SortedMailserver + for mailserverID, rtt := range availableMailservers { + ms := mailserversByID[mailserverID] + sortedMailserver := SortedMailserver{ + Mailserver: ms, + RTT: rtt, + } + m.mailPeersMutex.Lock() + pInfo, ok := m.mailserverCycle.peers[ms.ID] + m.mailPeersMutex.Unlock() + if ok { + if time.Now().Before(pInfo.canConnectAfter) { + continue // We can't connect to this node yet + } + } + sortedMailservers = append(sortedMailservers, sortedMailserver) + } + sort.Sort(byRTTMsAndCanConnectBefore(sortedMailservers)) + + result := make([]mailservers.Mailserver, len(sortedMailservers)) + for i, s := range sortedMailservers { + result[i] = s.Mailserver + } + + return result +} + +func (m *Messenger) findNewMailserver() error { // we have to override DNS manually because of https://github.com/status-im/status-mobile/issues/19581 if overrideDNS { var dialer net.Dialer @@ -267,108 +292,22 @@ func (m *Messenger) findNewMailserver() error { return m.connectToMailserver(*pinnedMailserver) } + m.logger.Info("Finding a new mailserver...") + allMailservers := m.mailserverCycle.allMailservers // TODO: remove this check once sockets are stable on x86_64 emulators if findNearestMailServer { - m.logger.Info("Finding a new mailserver...") - - var mailserverStr []string - for _, m := range allMailservers { - mailserverStr = append(mailserverStr, m.Address) - } - - if len(allMailservers) == 0 { - m.logger.Warn("no mailservers available") // Do nothing... - return nil - - } - - var parseFn func(string) (string, error) - if allMailservers[0].Version == 2 { - parseFn = mailservers.MultiAddressToAddress - } else { - parseFn = mailservers.EnodeStringToAddr - } - - pingResult, err := mailservers.DoPing(context.Background(), mailserverStr, 500, parseFn) - if err != nil { - // pinging mailservers might fail, but we don't care - m.logger.Warn("mailservers.DoPing failed with", zap.Error(err)) - } - - var availableMailservers []*mailservers.PingResult - for _, result := range pingResult { - if result.Err != nil { - m.logger.Info("connecting error", zap.String("err", *result.Err)) - continue // The results with error are ignored - } - availableMailservers = append(availableMailservers, result) - } - - if len(availableMailservers) == 0 { - m.logger.Warn("No mailservers available") // Do nothing... - return nil - } - - mailserversByAddress := make(map[string]mailservers.Mailserver) - for idx := range allMailservers { - mailserversByAddress[allMailservers[idx].Address] = allMailservers[idx] - } - var sortedMailservers []SortedMailserver - for _, ping := range availableMailservers { - address := ping.Address - ms := mailserversByAddress[address] - sortedMailserver := SortedMailserver{ - Address: address, - RTTMs: *ping.RTTMs, - } - m.mailPeersMutex.Lock() - pInfo, ok := m.mailserverCycle.peers[ms.ID] - m.mailPeersMutex.Unlock() - if ok { - if time.Now().Before(pInfo.canConnectAfter) { - continue // We can't connect to this node yet - } - } - - sortedMailservers = append(sortedMailservers, sortedMailserver) - - } - sort.Sort(byRTTMsAndCanConnectBefore(sortedMailservers)) - - // Picks a random mailserver amongs the ones with the lowest latency - // The pool size is 1/4 of the mailservers were pinged successfully - pSize := poolSize(len(sortedMailservers) - 1) - if pSize <= 0 { - pSize = len(sortedMailservers) - if pSize <= 0 { - m.logger.Warn("No mailservers available") // Do nothing... - return nil - } - } - - r, err := rand.Int(rand.Reader, big.NewInt(int64(pSize))) - if err != nil { - return err - } - - msPing := sortedMailservers[r.Int64()] - ms := mailserversByAddress[msPing.Address] - m.logger.Info("connecting to mailserver", zap.String("address", ms.Address)) - return m.connectToMailserver(ms) - } - - mailserversByAddress := make(map[string]mailservers.Mailserver) - for idx := range allMailservers { - mailserversByAddress[allMailservers[idx].Address] = allMailservers[idx] + allMailservers = m.getAvailableMailserversSortedByRTT(allMailservers) } + // Picks a random mailserver amongs the ones with the lowest latency + // The pool size is 1/4 of the mailservers were pinged successfully pSize := poolSize(len(allMailservers) - 1) if pSize <= 0 { pSize = len(allMailservers) if pSize <= 0 { - m.logger.Warn("No mailservers available") // Do nothing... + m.logger.Warn("No storenodes available") // Do nothing... return nil } } @@ -378,11 +317,8 @@ func (m *Messenger) findNewMailserver() error { return err } - msPing := allMailservers[r.Int64()] - ms := mailserversByAddress[msPing.Address] - m.logger.Info("connecting to mailserver", zap.String("address", ms.Address)) + ms := allMailservers[r.Int64()] return m.connectToMailserver(ms) - } func (m *Messenger) mailserverStatus(mailserverID string) connStatus { @@ -397,64 +333,37 @@ func (m *Messenger) mailserverStatus(mailserverID string) connStatus { func (m *Messenger) connectToMailserver(ms mailservers.Mailserver) error { - m.logger.Info("connecting to mailserver", zap.Any("peer", ms.ID)) + m.logger.Info("connecting to mailserver", zap.String("mailserverID", ms.ID)) m.mailserverCycle.activeMailserver = &ms - signal.SendMailserverChanged(m.mailserverCycle.activeMailserver.Address, m.mailserverCycle.activeMailserver.ID) - - // Adding a peer and marking it as connected can't be executed sync in WakuV1, because - // There's a delay between requesting a peer being added, and a signal being - // received after the peer was added. So we first set the peer status as - // Connecting and once a peerConnected signal is received, we mark it as - // Connected - activeMailserverStatus := m.mailserverStatus(ms.ID) - if ms.Version != m.transport.WakuVersion() { - return errors.New("mailserver waku version doesn't match") - } - - if activeMailserverStatus != connected { - // WakuV2 does not require having the peer connected to query the peer - - // Attempt to connect to mailserver by adding it as a peer - if ms.Version == 1 { - node, err := ms.Enode() - if err != nil { - return err - } - m.server.AddPeer(node) - if err := m.peerStore.Update([]*enode.Node{node}); err != nil { - return err - } - } - - connectionStatus := connecting - if ms.Version == 2 { - connectionStatus = connected - } + signal.SendMailserverChanged(m.mailserverCycle.activeMailserver) + mailserverStatus := m.mailserverStatus(ms.ID) + if mailserverStatus != connected { m.mailPeersMutex.Lock() m.mailserverCycle.peers[ms.ID] = peerStatus{ - status: connectionStatus, + status: connected, lastConnectionAttempt: time.Now(), canConnectAfter: time.Now().Add(defaultBackoff), mailserver: ms, } m.mailPeersMutex.Unlock() - if ms.Version == 2 { - m.mailserverCycle.activeMailserver.FailedRequests = 0 - m.logger.Info("mailserver available", zap.String("address", m.mailserverCycle.activeMailserver.UniqueID())) - m.EmitMailserverAvailable() - signal.SendMailserverAvailable(m.mailserverCycle.activeMailserver.Address, m.mailserverCycle.activeMailserver.ID) - peerID, err := m.mailserverCycle.activeMailserver.PeerID() - if err != nil { - m.logger.Error("could not decode the peer id of mailserver", zap.Error(err)) - } - m.transport.SetStorePeerID(peerID) - - // Query mailserver - m.asyncRequestAllHistoricMessages() + m.mailserverCycle.activeMailserver.FailedRequests = 0 + peerID, err := m.mailserverCycle.activeMailserver.PeerID() + if err != nil { + m.logger.Error("could not decode the peer id of mailserver", zap.Error(err)) + return err } + + m.logger.Info("mailserver available", zap.String("mailserverID", m.mailserverCycle.activeMailserver.ID)) + m.EmitMailserverAvailable() + signal.SendMailserverAvailable(m.mailserverCycle.activeMailserver) + + m.transport.SetStorePeerID(peerID) + + // Query mailserver + m.asyncRequestAllHistoricMessages() } return nil } @@ -465,7 +374,7 @@ func (m *Messenger) getActiveMailserver(communityID ...string) *mailservers.Mail if len(communityID) == 0 || communityID[0] == "" { return m.mailserverCycle.activeMailserver } - ms, err := m.communityStorenodes.GetStorenodeByCommunnityID(communityID[0]) + ms, err := m.communityStorenodes.GetStorenodeByCommunityID(communityID[0]) if err != nil { if !errors.Is(err, storenodes.ErrNotFound) { m.logger.Error("getting storenode for community, using global", zap.String("communityID", communityID[0]), zap.Error(err)) @@ -488,33 +397,6 @@ func (m *Messenger) isMailserverAvailable(mailserverID string) bool { return m.mailserverStatus(mailserverID) == connected } -func mailserverAddressToID(uniqueID string, allMailservers []mailservers.Mailserver) (string, error) { - for _, ms := range allMailservers { - if uniqueID == ms.UniqueID() { - return ms.ID, nil - } - - } - - return "", nil -} - -type ConnectedPeer struct { - UniqueID string -} - -func (m *Messenger) mailserverPeersInfo() []ConnectedPeer { - var connectedPeers []ConnectedPeer - for _, connectedPeer := range m.server.PeersInfo() { - connectedPeers = append(connectedPeers, ConnectedPeer{ - // This is a bit fragile, but should work - UniqueID: strings.TrimSuffix(connectedPeer.Enode, "?discport=0"), - }) - } - - return connectedPeers -} - func (m *Messenger) penalizeMailserver(id string) { m.mailPeersMutex.Lock() defer m.mailPeersMutex.Unlock() @@ -527,119 +409,6 @@ func (m *Messenger) penalizeMailserver(id string) { m.mailserverCycle.peers[id] = pInfo } -// handleMailserverCycleEvent runs every 1 second or when updating peers to keep the data of the active mailserver updated -func (m *Messenger) handleMailserverCycleEvent(connectedPeers []ConnectedPeer) error { - m.logger.Debug("mailserver cycle event", - zap.Any("connected", connectedPeers), - zap.Any("peer-info", m.mailserverCycle.peers)) - - m.mailPeersMutex.Lock() - - for pID, pInfo := range m.mailserverCycle.peers { - if pInfo.status == disconnected { - continue - } - - // Removing disconnected - - found := false - for _, connectedPeer := range connectedPeers { - id, err := mailserverAddressToID(connectedPeer.UniqueID, m.mailserverCycle.allMailservers) - if err != nil { - m.logger.Error("failed to convert id to hex", zap.Error(err)) - return err - } - - if pID == id { - found = true - break - } - } - if !found && (pInfo.status == connected || (pInfo.status == connecting && pInfo.lastConnectionAttempt.Add(8*time.Second).Before(time.Now()))) { - m.logger.Info("peer disconnected", zap.String("peer", pID)) - pInfo.status = disconnected - pInfo.canConnectAfter = time.Now().Add(defaultBackoff) - } - - m.mailserverCycle.peers[pID] = pInfo - } - m.mailPeersMutex.Unlock() - - // Only evaluate connected peers once a mailserver has been set - // otherwise, we would attempt to retrieve history and end up with a mailserver - // not available error - if m.mailserverCycle.activeMailserver != nil { - for _, connectedPeer := range connectedPeers { - id, err := mailserverAddressToID(connectedPeer.UniqueID, m.mailserverCycle.allMailservers) - if err != nil { - m.logger.Error("failed to convert id to hex", zap.Error(err)) - return err - } - if id == "" { - continue - } - - m.mailPeersMutex.Lock() - pInfo, ok := m.mailserverCycle.peers[id] - if !ok || pInfo.status != connected { - m.logger.Info("peer connected", zap.String("peer", connectedPeer.UniqueID)) - pInfo.status = connected - if pInfo.canConnectAfter.Before(time.Now()) { - pInfo.canConnectAfter = time.Now().Add(defaultBackoff) - } - m.mailserverCycle.peers[id] = pInfo - m.mailPeersMutex.Unlock() - - if id == m.mailserverCycle.activeMailserver.ID { - m.mailserverCycle.activeMailserver.FailedRequests = 0 - m.logger.Info("mailserver available", zap.String("address", connectedPeer.UniqueID)) - m.EmitMailserverAvailable() - signal.SendMailserverAvailable(m.mailserverCycle.activeMailserver.Address, m.mailserverCycle.activeMailserver.ID) - } - // Query mailserver - if m.config.codeControlFlags.AutoRequestHistoricMessages { - m.asyncRequestAllHistoricMessages() - } - } else { - m.mailPeersMutex.Unlock() - } - } - } - - // Check whether we want to disconnect the mailserver - if m.mailserverCycle.activeMailserver != nil { - if m.mailserverCycle.activeMailserver.FailedRequests >= mailserverMaxFailedRequests { - m.penalizeMailserver(m.mailserverCycle.activeMailserver.ID) - signal.SendMailserverNotWorking() - m.logger.Info("connecting too many failed requests") - m.mailserverCycle.activeMailserver.FailedRequests = 0 - - return m.connectToNewMailserverAndWait() - } - - m.mailPeersMutex.Lock() - pInfo, ok := m.mailserverCycle.peers[m.mailserverCycle.activeMailserver.ID] - m.mailPeersMutex.Unlock() - - if ok { - if pInfo.status != connected && pInfo.lastConnectionAttempt.Add(20*time.Second).Before(time.Now()) { - m.logger.Info("penalizing mailserver & disconnecting connecting", zap.String("id", m.mailserverCycle.activeMailserver.ID)) - - signal.SendMailserverNotWorking() - m.penalizeMailserver(m.mailserverCycle.activeMailserver.ID) - m.disconnectActiveMailserver(graylistBackoff) - } - } - - } else { - m.cycleMailservers() - } - - m.logger.Debug("updated-peers", zap.Any("peers", m.mailserverCycle.peers)) - - return nil -} - func (m *Messenger) asyncRequestAllHistoricMessages() { if !m.config.codeControlFlags.AutoRequestHistoricMessages { return @@ -655,47 +424,6 @@ func (m *Messenger) asyncRequestAllHistoricMessages() { }() } -func (m *Messenger) updateWakuV1PeerStatus() { - ticker := time.NewTicker(1 * time.Second) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - err := m.handleMailserverCycleEvent(m.mailserverPeersInfo()) - if err != nil { - m.logger.Error("failed to handle mailserver cycle event", zap.Error(err)) - continue - } - - ms := m.getActiveMailserver() - if ms != nil { - node, err := ms.Enode() - if err != nil { - m.logger.Error("failed to parse enode", zap.Error(err)) - continue - } - m.server.AddPeer(node) - if err := m.peerStore.Update([]*enode.Node{node}); err != nil { - m.logger.Error("failed to update peers", zap.Error(err)) - continue - } - } - - case <-m.mailserverCycle.events: - err := m.handleMailserverCycleEvent(m.mailserverPeersInfo()) - if err != nil { - m.logger.Error("failed to handle mailserver cycle event", zap.Error(err)) - return - } - case <-m.quit: - close(m.mailserverCycle.events) - m.mailserverCycle.subscription.Unsubscribe() - return - } - } -} - func (m *Messenger) verifyStorenodeStatus() { ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() @@ -747,7 +475,6 @@ func (m *Messenger) getPinnedMailserver() (*mailservers.Mailserver, error) { for _, c := range customMailservers { if c.Fleet == fleet && c.ID == pinnedMailserver { - c.Version = m.transport.WakuVersion() return &c, nil } } @@ -784,7 +511,7 @@ func (m *Messenger) disconnectStorenodeIfRequired() error { if m.mailserverCycle.activeMailserver.FailedRequests >= mailserverMaxFailedRequests { m.penalizeMailserver(m.mailserverCycle.activeMailserver.ID) signal.SendMailserverNotWorking() - m.logger.Info("too many failed requests", zap.String("storenode", m.mailserverCycle.activeMailserver.UniqueID())) + m.logger.Info("too many failed requests", zap.String("storenode", m.mailserverCycle.activeMailserver.ID)) m.mailserverCycle.activeMailserver.FailedRequests = 0 return m.connectToNewMailserverAndWait() } diff --git a/protocol/messenger_mailserver_processMailserverBatch_test.go b/protocol/messenger_mailserver_processMailserverBatch_test.go index c881d81b5..71ea759a4 100644 --- a/protocol/messenger_mailserver_processMailserverBatch_test.go +++ b/protocol/messenger_mailserver_processMailserverBatch_test.go @@ -7,8 +7,10 @@ import ( "errors" "math/big" "testing" + "time" "github.com/google/uuid" + "github.com/libp2p/go-libp2p/core/peer" "github.com/stretchr/testify/require" "github.com/status-im/status-go/eth-node/types" @@ -37,24 +39,23 @@ func getInitialResponseKey(topics []types.TopicType) string { func (t *mockTransport) SendMessagesRequestForTopics( ctx context.Context, - peerID []byte, + peerID peer.ID, from, to uint32, - previousCursor []byte, - previousStoreCursor types.StoreRequestCursor, + prevCursor types.StoreRequestCursor, pubsubTopic string, contentTopics []types.TopicType, limit uint32, waitForResponse bool, processEnvelopes bool, -) (cursor []byte, storeCursor types.StoreRequestCursor, envelopesCount int, err error) { +) (cursor types.StoreRequestCursor, envelopesCount int, err error) { var response queryResponse - if previousCursor == nil { + if prevCursor == nil { initialResponse := getInitialResponseKey(contentTopics) response = t.queryResponses[initialResponse] } else { - response = t.queryResponses[hex.EncodeToString(previousCursor)] + response = t.queryResponses[hex.EncodeToString(prevCursor)] } - return response.cursor, nil, 0, response.err + return response.cursor, 0, response.err } func (t *mockTransport) Populate(topics []types.TopicType, responses int, includeRandomError bool) error { @@ -116,43 +117,51 @@ func (t *mockTransport) Populate(topics []types.TopicType, responses int, includ } func TestProcessMailserverBatchHappyPath(t *testing.T) { + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + defer cancel() + logger := tt.MustCreateTestLogger() - mailserverID := []byte{1, 2, 3, 4, 5} + mailserverID, err := peer.Decode("16Uiu2HAkw3x97MbbZSWHbdF5bob45vcZvPPK4s4Mjyv2mxyB9GS3") + require.NoError(t, err) topics := []types.TopicType{} for i := 0; i < 22; i++ { topics = append(topics, types.BytesToTopic([]byte{0, 0, 0, byte(i)})) } testTransport := newMockTransport() - err := testTransport.Populate(topics, 10, false) + err = testTransport.Populate(topics, 10, false) require.NoError(t, err) testBatch := MailserverBatch{ Topics: topics, } - err = processMailserverBatch(context.TODO(), testTransport, testBatch, mailserverID, logger, defaultStoreNodeRequestPageSize, nil, false) + err = processMailserverBatch(ctx, testTransport, testBatch, mailserverID, logger, defaultStoreNodeRequestPageSize, nil, false) require.NoError(t, err) } func TestProcessMailserverBatchFailure(t *testing.T) { + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + defer cancel() + logger := tt.MustCreateTestLogger() - mailserverID := []byte{1, 2, 3, 4, 5} + mailserverID, err := peer.Decode("16Uiu2HAkw3x97MbbZSWHbdF5bob45vcZvPPK4s4Mjyv2mxyB9GS3") + require.NoError(t, err) topics := []types.TopicType{} for i := 0; i < 5; i++ { topics = append(topics, types.BytesToTopic([]byte{0, 0, 0, byte(i)})) } testTransport := newMockTransport() - err := testTransport.Populate(topics, 4, true) + err = testTransport.Populate(topics, 4, true) require.NoError(t, err) testBatch := MailserverBatch{ Topics: topics, } - err = processMailserverBatch(context.TODO(), testTransport, testBatch, mailserverID, logger, defaultStoreNodeRequestPageSize, nil, false) + err = processMailserverBatch(ctx, testTransport, testBatch, mailserverID, logger, defaultStoreNodeRequestPageSize, nil, false) require.Error(t, err) } diff --git a/protocol/messenger_storenode_comunity_test.go b/protocol/messenger_storenode_comunity_test.go index d2df04876..746951413 100644 --- a/protocol/messenger_storenode_comunity_test.go +++ b/protocol/messenger_storenode_comunity_test.go @@ -122,16 +122,12 @@ func (s *MessengerStoreNodeCommunitySuite) newMessenger(name string, storenodeAd err = sqlite.Migrate(mailserversSQLDb) // migrate default s.Require().NoError(err) - var sAddr string - if storenodeAddress != nil { - sAddr = (*storenodeAddress).String() - } mailserversDatabase := mailserversDB.NewDB(mailserversSQLDb) err = mailserversDatabase.Add(mailserversDB.Mailserver{ - ID: localMailserverID, - Name: localMailserverID, - Address: sAddr, - Fleet: localFleet, + ID: localMailserverID, + Name: localMailserverID, + Addr: storenodeAddress, + Fleet: localFleet, }) s.Require().NoError(err) diff --git a/protocol/storenodes/models.go b/protocol/storenodes/models.go index 66ce0f1c1..99f5bfc9a 100644 --- a/protocol/storenodes/models.go +++ b/protocol/storenodes/models.go @@ -65,11 +65,10 @@ func FromProtobuf(storenodes []*protobuf.Storenode, clock uint64) Storenodes { func toMailserver(m Storenode) mailservers.Mailserver { return mailservers.Mailserver{ - ID: m.StorenodeID, - Name: m.Name, - Custom: true, - Address: m.Address.String(), - Fleet: m.Fleet, - Version: m.Version, + ID: m.StorenodeID, + Name: m.Name, + Custom: true, + Addr: &m.Address, + Fleet: m.Fleet, } } diff --git a/protocol/storenodes/storenodes.go b/protocol/storenodes/storenodes.go index db4c1bbde..66a6f4bc3 100644 --- a/protocol/storenodes/storenodes.go +++ b/protocol/storenodes/storenodes.go @@ -39,8 +39,8 @@ type storenodesData struct { storenodes []Storenode } -// GetStorenodeByCommunnityID returns the active storenode for a community -func (m *CommunityStorenodes) GetStorenodeByCommunnityID(communityID string) (mailservers.Mailserver, error) { +// GetStorenodeByCommunityID returns the active storenode for a community +func (m *CommunityStorenodes) GetStorenodeByCommunityID(communityID string) (mailservers.Mailserver, error) { m.storenodesByCommunityIDMutex.RLock() defer m.storenodesByCommunityIDMutex.RUnlock() diff --git a/protocol/storenodes/storenodes_test.go b/protocol/storenodes/storenodes_test.go index 6709f3048..21ee3629e 100644 --- a/protocol/storenodes/storenodes_test.go +++ b/protocol/storenodes/storenodes_test.go @@ -65,11 +65,11 @@ func TestUpdateStorenodesInDB(t *testing.T) { require.NoError(t, err) // check if storenodes are loaded - ms1, err := csn.GetStorenodeByCommunnityID(communityID1.String()) + ms1, err := csn.GetStorenodeByCommunityID(communityID1.String()) require.NoError(t, err) matchStoreNode(t, snodes1[0], ms1) - ms2, err := csn.GetStorenodeByCommunnityID(communityID2.String()) + ms2, err := csn.GetStorenodeByCommunityID(communityID2.String()) require.NoError(t, err) matchStoreNode(t, snodes2[0], ms2) } @@ -77,7 +77,6 @@ func TestUpdateStorenodesInDB(t *testing.T) { func matchStoreNode(t *testing.T, sn Storenode, ms mailservers.Mailserver) { require.Equal(t, sn.StorenodeID, ms.ID) require.Equal(t, sn.Name, ms.Name) - require.Equal(t, sn.Address.String(), ms.Address) + require.Equal(t, sn.Address.String(), (*ms.Addr).String()) require.Equal(t, sn.Fleet, ms.Fleet) - require.Equal(t, sn.Version, ms.Version) } diff --git a/protocol/transport/transport.go b/protocol/transport/transport.go index 3fc43b387..2d873a175 100644 --- a/protocol/transport/transport.go +++ b/protocol/transport/transport.go @@ -1,12 +1,10 @@ package transport import ( - "bytes" "context" "crypto/ecdsa" "database/sql" "encoding/hex" - "fmt" "sync" "time" @@ -18,7 +16,6 @@ import ( "golang.org/x/exp/maps" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/p2p/enode" "github.com/status-im/status-go/connection" "github.com/status-im/status-go/eth-node/crypto" @@ -463,43 +460,9 @@ func (t *Transport) Peers() types.PeerStats { return t.waku.Peers() } -func (t *Transport) createMessagesRequestV1( +func (t *Transport) createMessagesRequest( ctx context.Context, - peerID []byte, - from, to uint32, - previousCursor []byte, - topics []types.TopicType, - waitForResponse bool, -) (cursor []byte, err error) { - r := createMessagesRequest(from, to, previousCursor, nil, "", topics, 1000) - - events := make(chan types.EnvelopeEvent, 10) - sub := t.waku.SubscribeEnvelopeEvents(events) - defer sub.Unsubscribe() - - err = t.waku.SendMessagesRequest(peerID, r) - if err != nil { - return - } - - if !waitForResponse { - return - } - - var resp *types.MailServerResponse - resp, err = t.waitForRequestCompleted(ctx, r.ID, events) - if err == nil && resp != nil && resp.Error != nil { - err = resp.Error - } else if err == nil && resp != nil { - cursor = resp.Cursor - } - - return -} - -func (t *Transport) createMessagesRequestV2( - ctx context.Context, - peerID []byte, + peerID peer.ID, from, to uint32, previousStoreCursor types.StoreRequestCursor, pubsubTopic string, @@ -546,25 +509,16 @@ func (t *Transport) createMessagesRequestV2( func (t *Transport) SendMessagesRequestForTopics( ctx context.Context, - peerID []byte, + peerID peer.ID, from, to uint32, - previousCursor []byte, - previousStoreCursor types.StoreRequestCursor, + prevCursor types.StoreRequestCursor, pubsubTopic string, contentTopics []types.TopicType, limit uint32, waitForResponse bool, processEnvelopes bool, -) (cursor []byte, storeCursor types.StoreRequestCursor, envelopesCount int, err error) { - switch t.waku.Version() { - case 2: - storeCursor, envelopesCount, err = t.createMessagesRequestV2(ctx, peerID, from, to, previousStoreCursor, pubsubTopic, contentTopics, limit, waitForResponse, processEnvelopes) - case 1: - cursor, err = t.createMessagesRequestV1(ctx, peerID, from, to, previousCursor, contentTopics, waitForResponse) - default: - err = fmt.Errorf("unsupported version %d", t.waku.Version()) - } - return +) (cursor types.StoreRequestCursor, envelopesCount int, err error) { + return t.createMessagesRequest(ctx, peerID, from, to, prevCursor, pubsubTopic, contentTopics, limit, waitForResponse, processEnvelopes) } func createMessagesRequest(from, to uint32, cursor []byte, storeCursor types.StoreRequestCursor, pubsubTopic string, topics []types.TopicType, limit uint32) types.MessagesRequest { @@ -587,26 +541,6 @@ func createMessagesRequest(from, to uint32, cursor []byte, storeCursor types.Sto } } -func (t *Transport) waitForRequestCompleted(ctx context.Context, requestID []byte, events chan types.EnvelopeEvent) (*types.MailServerResponse, error) { - for { - select { - case ev := <-events: - if !bytes.Equal(ev.Hash.Bytes(), requestID) { - continue - } - if ev.Event != types.EventMailServerRequestCompleted { - continue - } - data, ok := ev.Data.(*types.MailServerResponse) - if ok { - return data, nil - } - case <-ctx.Done(): - return nil, ctx.Err() - } - } -} - // ConfirmMessagesProcessed marks the messages as processed in the cache so // they won't be passed to the next layer anymore func (t *Transport) ConfirmMessagesProcessed(ids []string, timestamp uint64) error { @@ -697,6 +631,10 @@ func (t *Transport) ConnectionChanged(state connection.State) { t.waku.ConnectionChanged(state) } +func (t *Transport) PingPeer(ctx context.Context, peerID peer.ID) (time.Duration, error) { + return t.waku.PingPeer(ctx, peerID) +} + // Subscribe to a pubsub topic, passing an optional public key if the pubsub topic is protected func (t *Transport) SubscribeToPubsubTopic(topic string, optPublicKey *ecdsa.PublicKey) error { if t.waku.Version() == 2 { diff --git a/services/mailservers/api_test.go b/services/mailservers/api_test.go index 503eaf26c..c0d847b98 100644 --- a/services/mailservers/api_test.go +++ b/services/mailservers/api_test.go @@ -27,11 +27,11 @@ func TestAddGetDeleteMailserver(t *testing.T) { defer close() api := &API{db: db} testMailserver := Mailserver{ - ID: "mailserver001", - Name: "My Mailserver", - Address: "enode://...", - Custom: true, - Fleet: "prod", + ID: "mailserver001", + Name: "My Mailserver", + Addr: MustDecodeMultiaddress("/dns4/node-01.do-ams3.waku.test.status.im/tcp/30303/p2p/16Uiu2HAkykgaECHswi3YKJ5dMLbq2kPVCo89fcyTd38UcQD6ej5W"), + Custom: true, + Fleet: "prod", } testMailserverWithPassword := testMailserver testMailserverWithPassword.ID = "mailserver002" diff --git a/services/mailservers/database.go b/services/mailservers/database.go index 7be2ba1ab..a52646a62 100644 --- a/services/mailservers/database.go +++ b/services/mailservers/database.go @@ -12,6 +12,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" + "github.com/waku-org/go-waku/waku/v2/protocol/enr" "github.com/waku-org/go-waku/waku/v2/utils" "github.com/ethereum/go-ethereum/p2p/enode" @@ -19,57 +20,69 @@ import ( "github.com/status-im/status-go/protocol/transport" ) +func MustDecodeENR(enrStr string) *enode.Node { + node, err := enode.Parse(enode.ValidSchemes, enrStr) + if err != nil || node == nil { + panic("could not decode enr: " + enrStr) + } + return node +} + +func MustDecodeMultiaddress(multiaddrsStr string) *multiaddr.Multiaddr { + maddr, err := multiaddr.NewMultiaddr(multiaddrsStr) + if err != nil || maddr == nil { + panic("could not decode multiaddr: " + multiaddrsStr) + } + return &maddr +} + type Mailserver struct { - ID string `json:"id"` - Name string `json:"name"` - Custom bool `json:"custom"` - Address string `json:"address"` + ID string `json:"id"` + Name string `json:"name"` + Custom bool `json:"custom"` + ENR *enode.Node `json:"enr"` + Addr *multiaddr.Multiaddr `json:"addr"` + + // Deprecated: only used with WakuV1 Password string `json:"password,omitempty"` Fleet string `json:"fleet"` - Version uint `json:"version"` FailedRequests uint `json:"-"` } -func (m Mailserver) Enode() (*enode.Node, error) { - return enode.ParseV4(m.Address) -} +func (m Mailserver) PeerInfo() (*peer.AddrInfo, error) { + var maddrs []multiaddr.Multiaddr -func (m Mailserver) IDBytes() ([]byte, error) { - if m.Version == 2 { - p, err := m.PeerID() + if m.ENR != nil { + addrInfo, err := enr.EnodeToPeerInfo(m.ENR) if err != nil { return nil, err } - - return p.Marshal() + addrInfo.Addrs = utils.EncapsulatePeerID(addrInfo.ID, addrInfo.Addrs...) + maddrs = append(maddrs, addrInfo.Addrs...) } - node, err := enode.ParseV4(m.Address) + if m.Addr != nil { + maddrs = append(maddrs, *m.Addr) + } + + p, err := peer.AddrInfosFromP2pAddrs(maddrs...) if err != nil { return nil, err } - return node.ID().Bytes(), nil + + if len(p) != 1 { + return nil, errors.New("invalid mailserver setup") + } + + return &p[0], nil } func (m Mailserver) PeerID() (peer.ID, error) { - if m.Version != 2 { - return "", errors.New("not available") - } - - addr, err := multiaddr.NewMultiaddr(m.Address) + p, err := m.PeerInfo() if err != nil { return "", err } - - return utils.GetPeerID(addr) -} - -func (m Mailserver) UniqueID() string { - if m.Version == 2 { - p, _ := m.PeerID() - return p.String() - } - return m.Address + return p.ID, nil } func (m Mailserver) nullablePassword() (val sql.NullString) { @@ -133,6 +146,8 @@ func NewDB(db *sql.DB) *Database { } func (d *Database) Add(mailserver Mailserver) error { + // TODO: we are only storing the multiaddress. + // In a future PR we must allow storing multiple multiaddresses and ENR _, err := d.db.Exec(`INSERT OR REPLACE INTO mailservers( id, name, @@ -142,7 +157,7 @@ func (d *Database) Add(mailserver Mailserver) error { ) VALUES (?, ?, ?, ?, ?)`, mailserver.ID, mailserver.Name, - mailserver.Address, + (*mailserver.Addr).String(), mailserver.nullablePassword(), mailserver.Fleet, ) @@ -164,12 +179,13 @@ func toMailservers(rows *sql.Rows) ([]Mailserver, error) { for rows.Next() { var ( m Mailserver + addrStr string password sql.NullString ) if err := rows.Scan( &m.ID, &m.Name, - &m.Address, + &addrStr, &password, &m.Fleet, ); err != nil { @@ -179,6 +195,15 @@ func toMailservers(rows *sql.Rows) ([]Mailserver, error) { if password.Valid { m.Password = password.String } + + // TODO: we are only storing the multiaddress. + // In a future PR we must allow storing multiple multiaddresses and ENR + maddr, err := multiaddr.NewMultiaddr(addrStr) + if err != nil { + return nil, err + } + m.Addr = &maddr + result = append(result, m) } diff --git a/services/mailservers/fleet.go b/services/mailservers/fleet.go index e25e0894a..446a7f15f 100644 --- a/services/mailservers/fleet.go +++ b/services/mailservers/fleet.go @@ -15,167 +15,113 @@ func DefaultMailserversByFleet(fleet string) []Mailserver { func DefaultMailservers() []Mailserver { return []Mailserver{ - Mailserver{ - ID: "mail-01.ac-cn-hongkong-c.eth.prod", - Address: "enode://606ae04a71e5db868a722c77a21c8244ae38f1bd6e81687cc6cfe88a3063fa1c245692232f64f45bd5408fed5133eab8ed78049332b04f9c110eac7f71c1b429@47.75.247.214:443", - Fleet: params.FleetProd, - Version: 1, + { + ID: "node-01.ac-cn-hongkong-c.waku.sandbox", + ENR: MustDecodeENR("enr:-QEkuEBfEzJm_kigJ2HoSS_RBFJYhKHocGdkhhBr6jSUAWjLdFPp6Pj1l4yiTQp7TGHyu1kC6FyaU573VN8klLsEm-XuAYJpZIJ2NIJpcIQI2SVcim11bHRpYWRkcnO4bgA0Ni9ub2RlLTAxLmFjLWNuLWhvbmdrb25nLWMud2FrdS5zYW5kYm94LnN0YXR1cy5pbQZ2XwA2Ni9ub2RlLTAxLmFjLWNuLWhvbmdrb25nLWMud2FrdS5zYW5kYm94LnN0YXR1cy5pbQYfQN4DgnJzkwABCAAAAAEAAgADAAQABQAGAAeJc2VjcDI1NmsxoQOwsS69tgD7u1K50r5-qG5hweuTwa0W26aYPnvivpNlrYN0Y3CCdl-DdWRwgiMohXdha3UyDw"), + Addr: MustDecodeMultiaddress("/dns4/node-01.ac-cn-hongkong-c.waku.sandbox.status.im/tcp/30303/p2p/16Uiu2HAmSJvSJphxRdbnigUV5bjRRZFBhTtWFTSyiKaQByCjwmpV"), + Fleet: params.FleetWakuSandbox, }, - Mailserver{ - ID: "mail-01.do-ams3.eth.prod", - Address: "enode://c42f368a23fa98ee546fd247220759062323249ef657d26d357a777443aec04db1b29a3a22ef3e7c548e18493ddaf51a31b0aed6079bd6ebe5ae838fcfaf3a49@178.128.142.54:443", - Fleet: params.FleetProd, - Version: 1, + { + ID: "node-01.do-ams3.waku.sandbox", + ENR: MustDecodeENR("enr:-QESuEB4Dchgjn7gfAvwB00CxTA-nGiyk-aALI-H4dYSZD3rUk7bZHmP8d2U6xDiQ2vZffpo45Jp7zKNdnwDUx6g4o6XAYJpZIJ2NIJpcIRA4VDAim11bHRpYWRkcnO4XAArNiZub2RlLTAxLmRvLWFtczMud2FrdS5zYW5kYm94LnN0YXR1cy5pbQZ2XwAtNiZub2RlLTAxLmRvLWFtczMud2FrdS5zYW5kYm94LnN0YXR1cy5pbQYfQN4DgnJzkwABCAAAAAEAAgADAAQABQAGAAeJc2VjcDI1NmsxoQOvD3S3jUNICsrOILlmhENiWAMmMVlAl6-Q8wRB7hidY4N0Y3CCdl-DdWRwgiMohXdha3UyDw"), + Addr: MustDecodeMultiaddress("/dns4/node-01.do-ams3.waku.sandbox.status.im/tcp/30303/p2p/16Uiu2HAmQSMNExfUYUqfuXWkD5DaNZnMYnigRxFKbk3tcEFQeQeE"), + Fleet: params.FleetWakuSandbox, }, - Mailserver{ - ID: "mail-01.gc-us-central1-a.eth.prod", - Address: "enode://ee2b53b0ace9692167a410514bca3024695dbf0e1a68e1dff9716da620efb195f04a4b9e873fb9b74ac84de801106c465b8e2b6c4f0d93b8749d1578bfcaf03e@104.197.238.144:443", - Fleet: params.FleetProd, - Version: 1, + { + ID: "node-01.gc-us-central1-a.waku.sandbox", + ENR: MustDecodeENR("enr:-QEkuEBIkb8q8_mrorHndoXH9t5N6ZfD-jehQCrYeoJDPHqT0l0wyaONa2-piRQsi3oVKAzDShDVeoQhy0uwN1xbZfPZAYJpZIJ2NIJpcIQiQlleim11bHRpYWRkcnO4bgA0Ni9ub2RlLTAxLmdjLXVzLWNlbnRyYWwxLWEud2FrdS5zYW5kYm94LnN0YXR1cy5pbQZ2XwA2Ni9ub2RlLTAxLmdjLXVzLWNlbnRyYWwxLWEud2FrdS5zYW5kYm94LnN0YXR1cy5pbQYfQN4DgnJzkwABCAAAAAEAAgADAAQABQAGAAeJc2VjcDI1NmsxoQKnGt-GSgqPSf3IAPM7bFgTlpczpMZZLF3geeoNNsxzSoN0Y3CCdl-DdWRwgiMohXdha3UyDw"), + Addr: MustDecodeMultiaddress("/dns4/node-01.gc-us-central1-a.waku.sandbox.status.im/tcp/30303/p2p/16Uiu2HAm6fyqE1jB5MonzvoMdU8v76bWV8ZeNpncDamY1MQXfjdB"), + Fleet: params.FleetWakuSandbox, }, - Mailserver{ - ID: "mail-02.ac-cn-hongkong-c.eth.prod", - Address: "enode://2c8de3cbb27a3d30cbb5b3e003bc722b126f5aef82e2052aaef032ca94e0c7ad219e533ba88c70585ebd802de206693255335b100307645ab5170e88620d2a81@47.244.221.14:443", - Fleet: params.FleetProd, - Version: 1, + { + ID: "node-01.ac-cn-hongkong-c.waku.test", + ENR: MustDecodeENR("enr:-QEeuECvvBe6kIzHgMv_mD1YWQ3yfOfid2MO9a_A6ZZmS7E0FmAfntz2ZixAnPXvLWDJ81ARp4oV9UM4WXyc5D5USdEPAYJpZIJ2NIJpcIQI2ttrim11bHRpYWRkcnO4aAAxNixub2RlLTAxLmFjLWNuLWhvbmdrb25nLWMud2FrdS50ZXN0LnN0YXR1cy5pbQZ2XwAzNixub2RlLTAxLmFjLWNuLWhvbmdrb25nLWMud2FrdS50ZXN0LnN0YXR1cy5pbQYfQN4DgnJzkwABCAAAAAEAAgADAAQABQAGAAeJc2VjcDI1NmsxoQJIN4qwz3v4r2Q8Bv8zZD0eqBcKw6bdLvdkV7-JLjqIj4N0Y3CCdl-DdWRwgiMohXdha3UyDw"), + Addr: MustDecodeMultiaddress("/dns4/node-01.ac-cn-hongkong-c.waku.test.statusim.net/tcp/30303/p2p/16Uiu2HAkzHaTP5JsUwfR9NR8Rj9HC24puS6ocaU8wze4QrXr9iXp"), + Fleet: params.FleetWakuTest, }, - Mailserver{ - ID: "mail-02.do-ams3.eth.prod", - Address: "enode://7aa648d6e855950b2e3d3bf220c496e0cae4adfddef3e1e6062e6b177aec93bc6cdcf1282cb40d1656932ebfdd565729da440368d7c4da7dbd4d004b1ac02bf8@178.128.142.26:443", - Fleet: params.FleetProd, - Version: 1, + { + ID: "node-01.do-ams3.waku.test", + ENR: MustDecodeENR("enr:-QEMuEDbayK340kH24XzK5FPIYNzWNYuH01NASNIb1skZfe_6l4_JSsG-vZ0LgN4Cgzf455BaP5zrxMQADHL5OQpbW6OAYJpZIJ2NIJpcISygI2rim11bHRpYWRkcnO4VgAoNiNub2RlLTAxLmRvLWFtczMud2FrdS50ZXN0LnN0YXR1cy5pbQZ2XwAqNiNub2RlLTAxLmRvLWFtczMud2FrdS50ZXN0LnN0YXR1cy5pbQYfQN4DgnJzkwABCAAAAAEAAgADAAQABQAGAAeJc2VjcDI1NmsxoQJATXRSRSUyTw_QLB6H_U3oziVQgNRgrXpK7wp2AMyNxYN0Y3CCdl-DdWRwgiMohXdha3UyDw"), + Addr: MustDecodeMultiaddress("/dns4/node-01.do-ams3.waku.test.statusim.net/tcp/30303/p2p/16Uiu2HAkykgaECHswi3YKJ5dMLbq2kPVCo89fcyTd38UcQD6ej5W"), + Fleet: params.FleetWakuTest, }, - Mailserver{ - ID: "mail-02.gc-us-central1-a.eth.prod", - Address: "enode://30211cbd81c25f07b03a0196d56e6ce4604bb13db773ff1c0ea2253547fafd6c06eae6ad3533e2ba39d59564cfbdbb5e2ce7c137a5ebb85e99dcfc7a75f99f55@23.236.58.92:443", - Fleet: params.FleetProd, - Version: 1, + { + ID: "node-01.gc-us-central1-a.waku.test", + ENR: MustDecodeENR("enr:-QEeuEBO08GSjWDOV13HTf6L7iFoPQhv4S0-_Bd7Of3lFCBNBmpB9j6pGLedkX88KAXm6BFCS4ViQ_rLeDQuzj9Q6fs9AYJpZIJ2NIJpcIQiEAFDim11bHRpYWRkcnO4aAAxNixub2RlLTAxLmdjLXVzLWNlbnRyYWwxLWEud2FrdS50ZXN0LnN0YXR1cy5pbQZ2XwAzNixub2RlLTAxLmdjLXVzLWNlbnRyYWwxLWEud2FrdS50ZXN0LnN0YXR1cy5pbQYfQN4DgnJzkwABCAAAAAEAAgADAAQABQAGAAeJc2VjcDI1NmsxoQMIJwesBVgUiBCi8yiXGx7RWylBQkYm1U9dvEy-neLG2YN0Y3CCdl-DdWRwgiMohXdha3UyDw"), + Addr: MustDecodeMultiaddress("/dns4/node-01.gc-us-central1-a.waku.test.statusim.net/tcp/30303/p2p/16Uiu2HAmDCp8XJ9z1ev18zuv8NHekAsjNyezAvmMfFEJkiharitG"), + Fleet: params.FleetWakuTest, }, - Mailserver{ - ID: "mail-03.ac-cn-hongkong-c.eth.prod", - Address: "enode://e85f1d4209f2f99da801af18db8716e584a28ad0bdc47fbdcd8f26af74dbd97fc279144680553ec7cd9092afe683ddea1e0f9fc571ebcb4b1d857c03a088853d@47.244.129.82:443", - Fleet: params.FleetProd, - Version: 1, + { + ID: "store-01.do-ams3.status.prod", + ENR: MustDecodeENR("enr:-QEMuEAs8JmmyUI3b9v_ADqYtELHUYAsAMS21lA2BMtrzF86tVmyy9cCrhmzfHGHx_g3nybn7jIRybzXTGNj3C2KzrriAYJpZIJ2NIJpcISf3_Jeim11bHRpYWRkcnO4XAArNiZzdG9yZS0wMS5kby1hbXMzLnN0YXR1cy5wcm9kLnN0YXR1cy5pbQZ2XwAtNiZzdG9yZS0wMS5kby1hbXMzLnN0YXR1cy5wcm9kLnN0YXR1cy5pbQYBu94DgnJzjQAQBQABACAAQACAAQCJc2VjcDI1NmsxoQLfoaQH3oSYW59yxEBfeAZbltmUnC4BzYkHqer2VQMTyoN0Y3CCdl-DdWRwgiMohXdha3UyAw"), + Addr: MustDecodeMultiaddress("/dns4/store-01.do-ams3.status.prod.status.im/tcp/30303/p2p/16Uiu2HAmAUdrQ3uwzuE4Gy4D56hX6uLKEeerJAnhKEHZ3DxF1EfT"), + Fleet: params.FleetStatusProd, }, - Mailserver{ - ID: "mail-03.do-ams3.eth.prod", - Address: "enode://8a64b3c349a2e0ef4a32ea49609ed6eb3364be1110253c20adc17a3cebbc39a219e5d3e13b151c0eee5d8e0f9a8ba2cd026014e67b41a4ab7d1d5dd67ca27427@178.128.142.94:443", - Fleet: params.FleetProd, - Version: 1, + { + ID: "store-02.do-ams3.status.prod", + ENR: MustDecodeENR("enr:-QEMuEDuTfD47Hz_NXDwf7LJMf0qhjp3CQhZ9Fy0Ulp4XehtEzewBzmJCoe77hjno3khH8kX2B9B1DgbJuc2n32fMZvOAYJpZIJ2NIJpcISf3_Kaim11bHRpYWRkcnO4XAArNiZzdG9yZS0wMi5kby1hbXMzLnN0YXR1cy5wcm9kLnN0YXR1cy5pbQZ2XwAtNiZzdG9yZS0wMi5kby1hbXMzLnN0YXR1cy5wcm9kLnN0YXR1cy5pbQYBu94DgnJzjQAQBQABACAAQACAAQCJc2VjcDI1NmsxoQLSM62HmqGpZ382YM4CyI-MCIlkxMP7ZbOwqwRPvk9wsIN0Y3CCdl-DdWRwgiMohXdha3UyAw"), + Addr: MustDecodeMultiaddress("/dns4/store-02.do-ams3.status.prod.status.im/tcp/30303/p2p/16Uiu2HAm9aDJPkhGxc2SFcEACTFdZ91Q5TJjp76qZEhq9iF59x7R"), + Fleet: params.FleetStatusProd, }, - Mailserver{ - ID: "mail-03.gc-us-central1-a.eth.prod", - Address: "enode://44160e22e8b42bd32a06c1532165fa9e096eebedd7fa6d6e5f8bbef0440bc4a4591fe3651be68193a7ec029021cdb496cfe1d7f9f1dc69eb99226e6f39a7a5d4@35.225.221.245:443", - Fleet: params.FleetProd, - Version: 1, + { + ID: "store-01.gc-us-central1-a.status.prod", + ENR: MustDecodeENR("enr:-QEeuEA08-NJJDuKh6V8739MPl2G7ykaC0EWyUg21KtjQ1UtKxuE2qNy5uES2_bobr7sC5C4sS_-GhDVYMpOrM2IFc8KAYJpZIJ2NIJpcIQiqsAnim11bHRpYWRkcnO4bgA0Ni9zdG9yZS0wMS5nYy11cy1jZW50cmFsMS1hLnN0YXR1cy5wcm9kLnN0YXR1cy5pbQZ2XwA2Ni9zdG9yZS0wMS5nYy11cy1jZW50cmFsMS1hLnN0YXR1cy5wcm9kLnN0YXR1cy5pbQYBu94DgnJzjQAQBQABACAAQACAAQCJc2VjcDI1NmsxoQN_aBxNsOBrceDLyC75vBFRuzv_tWfaHG50Jc9DQztwkIN0Y3CCdl-DdWRwgiMohXdha3UyAw"), + Addr: MustDecodeMultiaddress("/dns4/store-01.gc-us-central1-a.status.prod.status.im/tcp/30303/p2p/16Uiu2HAmMELCo218hncCtTvC2Dwbej3rbyHQcR8erXNnKGei7WPZ"), + Fleet: params.FleetStatusProd, }, - Mailserver{ - ID: "node-01.ac-cn-hongkong-c.waku.sandbox", - Address: "/dns4/node-01.ac-cn-hongkong-c.waku.sandbox.status.im/tcp/30303/p2p/16Uiu2HAmSJvSJphxRdbnigUV5bjRRZFBhTtWFTSyiKaQByCjwmpV", - Fleet: params.FleetWakuSandbox, - Version: 2, + { + ID: "store-02.gc-us-central1-a.status.prod", + ENR: MustDecodeENR("enr:-QEeuECQiv4VvUk04UnU3wxKXgWvErYcGMgYU8aDuc8VvEt1km2GvcEBq-R9XT-loNL5PZjxGKzB1rDtCOQaFVYQtgPnAYJpZIJ2NIJpcIQiqpoCim11bHRpYWRkcnO4bgA0Ni9zdG9yZS0wMi5nYy11cy1jZW50cmFsMS1hLnN0YXR1cy5wcm9kLnN0YXR1cy5pbQZ2XwA2Ni9zdG9yZS0wMi5nYy11cy1jZW50cmFsMS1hLnN0YXR1cy5wcm9kLnN0YXR1cy5pbQYBu94DgnJzjQAQBQABACAAQACAAQCJc2VjcDI1NmsxoQNbEg1bkMJCBiD5Tje3Z_11R-kd9munZF0v4iiYZa1jgoN0Y3CCdl-DdWRwgiMohXdha3UyAw"), + Addr: MustDecodeMultiaddress("/dns4/store-02.gc-us-central1-a.status.prod.status.im/tcp/30303/p2p/16Uiu2HAmJnVR7ZzFaYvciPVafUXuYGLHPzSUigqAmeNw9nJUVGeM"), + Fleet: params.FleetStatusProd, }, - Mailserver{ - ID: "node-01.do-ams3.waku.sandbox", - Address: "/dns4/node-01.do-ams3.waku.sandbox.status.im/tcp/30303/p2p/16Uiu2HAmQSMNExfUYUqfuXWkD5DaNZnMYnigRxFKbk3tcEFQeQeE", - Fleet: params.FleetWakuSandbox, - Version: 2, + { + ID: "store-01.ac-cn-hongkong-c.status.prod", + ENR: MustDecodeENR("enr:-QEeuED6hfo5OQICpfwrjuG-qC8MMjw8bsLrF-xi8tY4nz3h7nl_KOXA2C1q7gXOzJ-bROP2ZzITdRlP0HN57jiBuim9AYJpZIJ2NIJpcIQI2kpJim11bHRpYWRkcnO4bgA0Ni9zdG9yZS0wMS5hYy1jbi1ob25na29uZy1jLnN0YXR1cy5wcm9kLnN0YXR1cy5pbQZ2XwA2Ni9zdG9yZS0wMS5hYy1jbi1ob25na29uZy1jLnN0YXR1cy5wcm9kLnN0YXR1cy5pbQYBu94DgnJzjQAQBQABACAAQACAAQCJc2VjcDI1NmsxoQJm10jdarzx9hcdhRKGfsAyS0Hc5pWj3yhyTvT5FIwKGIN0Y3CCdl-DdWRwgiMohXdha3UyAw"), + Addr: MustDecodeMultiaddress("/dns4/store-01.ac-cn-hongkong-c.status.prod.status.im/tcp/30303/p2p/16Uiu2HAm2M7xs7cLPc3jamawkEqbr7cUJX11uvY7LxQ6WFUdUKUT"), + Fleet: params.FleetStatusProd, }, - Mailserver{ - ID: "node-01.gc-us-central1-a.waku.sandbox", - Address: "/dns4/node-01.gc-us-central1-a.waku.sandbox.status.im/tcp/30303/p2p/16Uiu2HAm6fyqE1jB5MonzvoMdU8v76bWV8ZeNpncDamY1MQXfjdB", - Fleet: params.FleetWakuSandbox, - Version: 2, + { + ID: "store-02.ac-cn-hongkong-c.status.prod", + ENR: MustDecodeENR("enr:-QEeuEC0VBi0VMXNL4oQUfdAJL7RBXpWyB54TqUDt93Li3yuax4ohwMMIAmI6sg2jgH_HxgDRy5Ar-5CbMDW1EFxYFplAYJpZIJ2NIJpcIQI2nnoim11bHRpYWRkcnO4bgA0Ni9zdG9yZS0wMi5hYy1jbi1ob25na29uZy1jLnN0YXR1cy5wcm9kLnN0YXR1cy5pbQZ2XwA2Ni9zdG9yZS0wMi5hYy1jbi1ob25na29uZy1jLnN0YXR1cy5wcm9kLnN0YXR1cy5pbQYBu94DgnJzjQAQBQABACAAQACAAQCJc2VjcDI1NmsxoQLMncuu6pJ3DQRzYUqkB1PbaRxZXIGJi8waKbbBFbOSNIN0Y3CCdl-DdWRwgiMohXdha3UyAw"), + Addr: MustDecodeMultiaddress("/dns4/store-02.ac-cn-hongkong-c.status.prod.status.im/tcp/30303/p2p/16Uiu2HAm9CQhsuwPR54q27kNj9iaQVfyRzTGKrhFmr94oD8ujU6P"), + Fleet: params.FleetStatusProd, }, - Mailserver{ - ID: "node-01.ac-cn-hongkong-c.waku.test", - Address: "/dns4/node-01.ac-cn-hongkong-c.waku.test.statusim.net/tcp/30303/p2p/16Uiu2HAkzHaTP5JsUwfR9NR8Rj9HC24puS6ocaU8wze4QrXr9iXp", - Fleet: params.FleetWakuTest, - Version: 2, + { + ID: "store-01.do-ams3.status.staging.status.im", + ENR: MustDecodeENR("enr:-QESuECcvLR_0SfeYbcXqxmQrnQwtdhDd4DlqzpYAsmCiWOJAkRBhXFXBNS99tzi53QrECSw9UyOhazKb7memK8eMshbAYJpZIJ2NIJpcIQYkE53im11bHRpYWRkcnO4YgAuNilzdG9yZS0wMS5kby1hbXMzLnN0YXR1cy5zdGFnaW5nLnN0YXR1cy5pbQZ2XwAwNilzdG9yZS0wMS5kby1hbXMzLnN0YXR1cy5zdGFnaW5nLnN0YXR1cy5pbQYBu94DgnJzjQAQBQABACAAQACAAQCJc2VjcDI1NmsxoQJ-wlTnBcknPNUG72hag4NXSa6SeozscHKtYg1Ss3pldoN0Y3CCdl-DdWRwgiMohXdha3UyAw"), + Addr: MustDecodeMultiaddress("/dns4/store-01.do-ams3.status.staging.status.im/tcp/30303/p2p/16Uiu2HAm3xVDaz6SRJ6kErwC21zBJEZjavVXg7VSkoWzaV1aMA3F"), + Fleet: params.FleetStatusStaging, }, - Mailserver{ - ID: "node-01.do-ams3.waku.test", - Address: "/dns4/node-01.do-ams3.waku.test.statusim.net/tcp/30303/p2p/16Uiu2HAkykgaECHswi3YKJ5dMLbq2kPVCo89fcyTd38UcQD6ej5W", - Fleet: params.FleetWakuTest, - Version: 2, + { + ID: "store-02.do-ams3.status.staging.status.im", + ENR: MustDecodeENR("enr:-QESuEDD651gYmOSqKbT-wmVzMmgQBpEsoqm6JdLgX-xqPo6PGKasYBooHujyVVR9Q_G3XY1LlnOsSgcelvs4vfdumB8AYJpZIJ2NIJpcIQYkE54im11bHRpYWRkcnO4YgAuNilzdG9yZS0wMi5kby1hbXMzLnN0YXR1cy5zdGFnaW5nLnN0YXR1cy5pbQZ2XwAwNilzdG9yZS0wMi5kby1hbXMzLnN0YXR1cy5zdGFnaW5nLnN0YXR1cy5pbQYBu94DgnJzjQAQBQABACAAQACAAQCJc2VjcDI1NmsxoQL5dMmr5GzH0Fton8NGBlUW_rZG8-f3Ph0XhvMUMeVIM4N0Y3CCdl-DdWRwgiMohXdha3UyAw"), + Addr: MustDecodeMultiaddress("/dns4/store-02.do-ams3.status.staging.status.im/tcp/30303/p2p/16Uiu2HAmCDSnT8oNpMR9HH6uipD71KstYuDCAQGpek9XDAVmqdEr"), + Fleet: params.FleetStatusStaging, }, - Mailserver{ - ID: "node-01.gc-us-central1-a.waku.test", - Address: "/dns4/node-01.gc-us-central1-a.waku.test.statusim.net/tcp/30303/p2p/16Uiu2HAmDCp8XJ9z1ev18zuv8NHekAsjNyezAvmMfFEJkiharitG", - Fleet: params.FleetWakuTest, - Version: 2, + { + ID: "store-01.gc-us-central1-a.status.staging.status.im", + ENR: MustDecodeENR("enr:-QEkuEByZrFPBtvSWe0YjNrpupQzQg5nyJsQuiTVjLX8V_Du2lcFWg2GIMBWvLR7kCiwQtxgNCPH_lxXMxVbEkovBdQOAYJpZIJ2NIJpcIQj4OfRim11bHRpYWRkcnO4dAA3NjJzdG9yZS0wMS5nYy11cy1jZW50cmFsMS1hLnN0YXR1cy5zdGFnaW5nLnN0YXR1cy5pbQZ2XwA5NjJzdG9yZS0wMS5nYy11cy1jZW50cmFsMS1hLnN0YXR1cy5zdGFnaW5nLnN0YXR1cy5pbQYBu94DgnJzjQAQBQABACAAQACAAQCJc2VjcDI1NmsxoQLpEfMK4rQu4Vj5p2mH3YpiNCaiB8Q9JWuCa5sHA1BoJ4N0Y3CCdl-DdWRwgiMohXdha3UyAw"), + Addr: MustDecodeMultiaddress("/dns4/store-01.gc-us-central1-a.status.staging.status.im/tcp/30303/p2p/16Uiu2HAmB7Ur9HQqo3cWDPovRQjo57fxWWDaQx27WxSzDGhN4JKg"), + Fleet: params.FleetStatusStaging, }, - Mailserver{ - ID: "store-01.do-ams3.status.prod", - Address: "/dns4/store-01.do-ams3.status.prod.status.im/tcp/30303/p2p/16Uiu2HAmAUdrQ3uwzuE4Gy4D56hX6uLKEeerJAnhKEHZ3DxF1EfT", - Fleet: params.FleetStatusProd, - Version: 2, + { + ID: "store-02.gc-us-central1-a.status.staging.status.im", + ENR: MustDecodeENR("enr:-QEkuEAPht9zlTwD-vZWOlYXehHnrTpTMu0YaTaqHjYmyuhaM0bvLWLKjvH4df9TRDKI7dl9HM15LS3Qeqy9Vf83kfjlAYJpZIJ2NIJpcIQiSIy3im11bHRpYWRkcnO4dAA3NjJzdG9yZS0wMi5nYy11cy1jZW50cmFsMS1hLnN0YXR1cy5zdGFnaW5nLnN0YXR1cy5pbQZ2XwA5NjJzdG9yZS0wMi5nYy11cy1jZW50cmFsMS1hLnN0YXR1cy5zdGFnaW5nLnN0YXR1cy5pbQYBu94DgnJzjQAQBQABACAAQACAAQCJc2VjcDI1NmsxoQNg_xiKKSUfqa798Ay2GZzh1iRx58F7v5TQBfzFb9T0QYN0Y3CCdl-DdWRwgiMohXdha3UyAw"), + Addr: MustDecodeMultiaddress("/dns4/store-02.gc-us-central1-a.status.staging.status.im/tcp/30303/p2p/16Uiu2HAmKBd6crqQNZ6nKCSCpHCAwUPn3DUDmkcPSWUTyVXpxKsW"), + Fleet: params.FleetStatusStaging, }, - Mailserver{ - ID: "store-02.do-ams3.status.prod", - Address: "/dns4/store-02.do-ams3.status.prod.status.im/tcp/30303/p2p/16Uiu2HAm9aDJPkhGxc2SFcEACTFdZ91Q5TJjp76qZEhq9iF59x7R", - Fleet: params.FleetStatusProd, - Version: 2, + { + ID: "store-01.ac-cn-hongkong-c.status.staging.status.im", + ENR: MustDecodeENR("enr:-QEkuEDCHMeQ7rxmz7TPJy87bLeYobNhxZ90Fkycawu-WlSHQ1uaqrjxLL0btJpnv4gekPoqU6RjkQJSzsS4NxU6CWnPAYJpZIJ2NIJpcIQI2s6Gim11bHRpYWRkcnO4dAA3NjJzdG9yZS0wMS5hYy1jbi1ob25na29uZy1jLnN0YXR1cy5zdGFnaW5nLnN0YXR1cy5pbQZ2XwA5NjJzdG9yZS0wMS5hYy1jbi1ob25na29uZy1jLnN0YXR1cy5zdGFnaW5nLnN0YXR1cy5pbQYBu94DgnJzjQAQBQABACAAQACAAQCJc2VjcDI1NmsxoQOC7-rlGZ1POquzYNLxqu1_RddP7HXIGafRaEKM934p54N0Y3CCdl-DdWRwgiMohXdha3UyAw"), + Addr: MustDecodeMultiaddress("/dns4/store-01.ac-cn-hongkong-c.status.staging.status.im/tcp/30303/p2p/16Uiu2HAmMU7Y29oL6DmoJfBFv8J4JhYzYgazPL7nGKJFBV3qcj2E"), + Fleet: params.FleetStatusStaging, }, - Mailserver{ - ID: "store-01.gc-us-central1-a.status.prod", - Address: "/dns4/store-01.gc-us-central1-a.status.prod.status.im/tcp/30303/p2p/16Uiu2HAmMELCo218hncCtTvC2Dwbej3rbyHQcR8erXNnKGei7WPZ", - Fleet: params.FleetStatusProd, - Version: 2, - }, - Mailserver{ - ID: "store-02.gc-us-central1-a.status.prod", - Address: "/dns4/store-02.gc-us-central1-a.status.prod.status.im/tcp/30303/p2p/16Uiu2HAmJnVR7ZzFaYvciPVafUXuYGLHPzSUigqAmeNw9nJUVGeM", - Fleet: params.FleetStatusProd, - Version: 2, - }, - Mailserver{ - ID: "store-01.ac-cn-hongkong-c.status.prod", - Address: "/dns4/store-01.ac-cn-hongkong-c.status.prod.status.im/tcp/30303/p2p/16Uiu2HAm2M7xs7cLPc3jamawkEqbr7cUJX11uvY7LxQ6WFUdUKUT", - Fleet: params.FleetStatusProd, - Version: 2, - }, - Mailserver{ - ID: "store-02.ac-cn-hongkong-c.status.prod", - Address: "/dns4/store-02.ac-cn-hongkong-c.status.prod.status.im/tcp/30303/p2p/16Uiu2HAm9CQhsuwPR54q27kNj9iaQVfyRzTGKrhFmr94oD8ujU6P", - Fleet: params.FleetStatusProd, - Version: 2, - }, - Mailserver{ - ID: "store-01.do-ams3.status.staging.status.im", - Address: "/dns4/store-01.do-ams3.status.staging.status.im/tcp/30303/p2p/16Uiu2HAm3xVDaz6SRJ6kErwC21zBJEZjavVXg7VSkoWzaV1aMA3F", - Fleet: params.FleetStatusStaging, - Version: 2, - }, - Mailserver{ - ID: "store-02.do-ams3.status.staging.status.im", - Address: "/dns4/store-02.do-ams3.status.staging.status.im/tcp/30303/p2p/16Uiu2HAmCDSnT8oNpMR9HH6uipD71KstYuDCAQGpek9XDAVmqdEr", - Fleet: params.FleetStatusStaging, - Version: 2, - }, - Mailserver{ - ID: "store-01.gc-us-central1-a.status.staging.status.im", - Address: "/dns4/store-01.gc-us-central1-a.status.staging.status.im/tcp/30303/p2p/16Uiu2HAmB7Ur9HQqo3cWDPovRQjo57fxWWDaQx27WxSzDGhN4JKg", - Fleet: params.FleetStatusStaging, - Version: 2, - }, - Mailserver{ - ID: "store-02.gc-us-central1-a.status.staging.status.im", - Address: "/dns4/store-02.gc-us-central1-a.status.staging.status.im/tcp/30303/p2p/16Uiu2HAmKBd6crqQNZ6nKCSCpHCAwUPn3DUDmkcPSWUTyVXpxKsW", - Fleet: params.FleetStatusStaging, - Version: 2, - }, - Mailserver{ - ID: "store-01.ac-cn-hongkong-c.status.staging.status.im", - Address: "/dns4/store-01.ac-cn-hongkong-c.status.staging.status.im/tcp/30303/p2p/16Uiu2HAmMU7Y29oL6DmoJfBFv8J4JhYzYgazPL7nGKJFBV3qcj2E", - Fleet: params.FleetStatusStaging, - Version: 2, - }, - Mailserver{ - ID: "store-02.ac-cn-hongkong-c.status.staging.status.im", - Address: "/dns4/store-02.ac-cn-hongkong-c.status.staging.status.im/tcp/30303/p2p/16Uiu2HAmU7xtcwytXpGpeDrfyhJkiFvTkQbLB9upL5MXPLGceG9K", - Fleet: params.FleetStatusStaging, - Version: 2, + { + ID: "store-02.ac-cn-hongkong-c.status.staging.status.im", + ENR: MustDecodeENR("enr:-QEkuEAxgmSmx5RJ1odC-C_bXkDCE_VXTuB49ENTlI89p9uNLVKRqrwythgiAtjFxAokR4gvHvQMcX5Ts0N70Ut_kyPJAYJpZIJ2NIJpcIQvTLKkim11bHRpYWRkcnO4dAA3NjJzdG9yZS0wMi5hYy1jbi1ob25na29uZy1jLnN0YXR1cy5zdGFnaW5nLnN0YXR1cy5pbQZ2XwA5NjJzdG9yZS0wMi5hYy1jbi1ob25na29uZy1jLnN0YXR1cy5zdGFnaW5nLnN0YXR1cy5pbQYBu94DgnJzjQAQBQABACAAQACAAQCJc2VjcDI1NmsxoQPlyFXKktjIFNaZtTIFI_4ZfNyt3RKWxSPEyH_nb7-YFoN0Y3CCdl-DdWRwgiMohXdha3UyAw"), + Addr: MustDecodeMultiaddress("/dns4/store-02.ac-cn-hongkong-c.status.staging.status.im/tcp/30303/p2p/16Uiu2HAmU7xtcwytXpGpeDrfyhJkiFvTkQbLB9upL5MXPLGceG9K"), + Fleet: params.FleetStatusStaging, }, } } diff --git a/services/mailservers/tcp_ping.go b/services/mailservers/tcp_ping.go deleted file mode 100644 index e7acc5e81..000000000 --- a/services/mailservers/tcp_ping.go +++ /dev/null @@ -1,152 +0,0 @@ -package mailservers - -import ( - "context" - "fmt" - "net" - "time" - - multiaddr "github.com/multiformats/go-multiaddr" - - "github.com/ethereum/go-ethereum/p2p/enode" - "github.com/ethereum/go-ethereum/p2p/enr" - "github.com/status-im/status-go/rtt" -) - -type PingQuery struct { - Addresses []string `json:"addresses"` - TimeoutMs int `json:"timeoutMs"` -} - -type PingResult struct { - Address string `json:"address"` - RTTMs *int `json:"rttMs"` - Err *string `json:"error"` -} - -type parseFn func(string) (string, error) - -func (pr *PingResult) Update(rttMs int, err error) { - if err != nil { - errStr := err.Error() - pr.Err = &errStr - } - if rttMs >= 0 { - pr.RTTMs = &rttMs - } else { - pr.RTTMs = nil - } -} - -func EnodeToAddr(node *enode.Node) (string, error) { - var ip4 enr.IPv4 - err := node.Load(&ip4) - if err != nil { - return "", err - } - var tcp enr.TCP - err = node.Load(&tcp) - if err != nil { - return "", err - } - return fmt.Sprintf("%s:%d", net.IP(ip4).String(), tcp), nil -} - -func EnodeStringToAddr(enodeAddr string) (string, error) { - node, err := enode.ParseV4(enodeAddr) - if err != nil { - return "", err - } - return EnodeToAddr(node) -} - -func parse(addresses []string, fn parseFn) (map[string]*PingResult, []string) { - results := make(map[string]*PingResult, len(addresses)) - var toPing []string - - for i := range addresses { - addr, err := fn(addresses[i]) - if err != nil { - errStr := err.Error() - results[addresses[i]] = &PingResult{Address: addresses[i], Err: &errStr} - continue - } - results[addr] = &PingResult{Address: addresses[i]} - toPing = append(toPing, addr) - } - return results, toPing -} - -func mapValues(m map[string]*PingResult) []*PingResult { - rval := make([]*PingResult, 0, len(m)) - for _, value := range m { - rval = append(rval, value) - } - return rval -} - -func DoPing(ctx context.Context, addresses []string, timeoutMs int, p parseFn) ([]*PingResult, error) { - timeout := time.Duration(timeoutMs) * time.Millisecond - - resultsMap, toPing := parse(addresses, p) - - // run the checks concurrently - results, err := rtt.CheckHosts(toPing, timeout) - if err != nil { - return nil, err - } - - // set ping results - for i := range results { - r := results[i] - pr := resultsMap[r.Addr] - if pr == nil { - continue - } - pr.Update(r.RTTMs, r.Err) - } - - return mapValues(resultsMap), nil -} - -func (a *API) Ping(ctx context.Context, pq PingQuery) ([]*PingResult, error) { - return DoPing(ctx, pq.Addresses, pq.TimeoutMs, EnodeStringToAddr) -} - -func MultiAddressToAddress(multiAddr string) (string, error) { - - ma, err := multiaddr.NewMultiaddr(multiAddr) - if err != nil { - return "", err - } - - dns4, err := ma.ValueForProtocol(multiaddr.P_DNS4) - if err != nil && err != multiaddr.ErrProtocolNotFound { - return "", err - } - - ip4 := "" - if dns4 != "" { - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - ip4, err = net.DefaultResolver.LookupCNAME(ctx, dns4) - if err != nil { - return "", err - } - } else { - ip4, err = ma.ValueForProtocol(multiaddr.P_IP4) - if err != nil { - return "", err - } - } - - tcp, err := ma.ValueForProtocol(multiaddr.P_TCP) - if err != nil { - return "", err - } - return fmt.Sprintf("%s:%s", ip4, tcp), nil -} - -func (a *API) MultiAddressPing(ctx context.Context, pq PingQuery) ([]*PingResult, error) { - return DoPing(ctx, pq.Addresses, pq.TimeoutMs, MultiAddressToAddress) -} diff --git a/services/wakuext/api.go b/services/wakuext/api.go index 2a4656547..a021fdddc 100644 --- a/services/wakuext/api.go +++ b/services/wakuext/api.go @@ -1,21 +1,9 @@ package wakuext import ( - "context" - "crypto/ecdsa" - "fmt" - "time" - "github.com/ethereum/go-ethereum/log" - gethbridge "github.com/status-im/status-go/eth-node/bridge/geth" "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/services/ext" - waku "github.com/status-im/status-go/waku/common" -) - -const ( - // defaultWorkTime is a work time reported in messages sent to MailServer nodes. - defaultWorkTime = 5 ) // PublicAPI extends waku public API. @@ -35,94 +23,3 @@ func NewPublicAPI(s *Service) *PublicAPI { log: log.New("package", "status-go/services/wakuext.PublicAPI"), } } - -// makeEnvelop makes an envelop for a historic messages request. -// Symmetric key is used to authenticate to MailServer. -// PK is the current node ID. -// DEPRECATED -func makeEnvelop( - payload []byte, - symKey []byte, - publicKey *ecdsa.PublicKey, - nodeID *ecdsa.PrivateKey, - pow float64, - now time.Time, -) (types.Envelope, error) { - params := waku.MessageParams{ - PoW: pow, - Payload: payload, - WorkTime: defaultWorkTime, - Src: nodeID, - } - // Either symKey or public key is required. - // This condition is verified in `message.Wrap()` method. - if len(symKey) > 0 { - params.KeySym = symKey - } else if publicKey != nil { - params.Dst = publicKey - } - message, err := waku.NewSentMessage(¶ms) - if err != nil { - return nil, err - } - envelope, err := message.Wrap(¶ms, now) - if err != nil { - return nil, err - } - return gethbridge.NewWakuEnvelope(envelope), nil -} - -// RequestMessages sends a request for historic messages to a MailServer. -func (api *PublicAPI) RequestMessages(_ context.Context, r ext.MessagesRequest) (types.HexBytes, error) { - api.log.Info("RequestMessages", "request", r) - - now := api.service.w.GetCurrentTime() - r.SetDefaults(now) - - if r.From > r.To { - return nil, fmt.Errorf("Query range is invalid: from > to (%d > %d)", r.From, r.To) - } - - mailServerNode, err := api.service.GetPeer(r.MailServerPeer) - if err != nil { - return nil, fmt.Errorf("%v: %v", ext.ErrInvalidMailServerPeer, err) - } - - var ( - symKey []byte - publicKey *ecdsa.PublicKey - ) - - if r.SymKeyID != "" { - symKey, err = api.service.w.GetSymKey(r.SymKeyID) - if err != nil { - return nil, fmt.Errorf("%v: %v", ext.ErrInvalidSymKeyID, err) - } - } else { - publicKey = mailServerNode.Pubkey() - } - - payload, err := ext.MakeMessagesRequestPayload(r) - if err != nil { - return nil, err - } - - envelope, err := makeEnvelop( - payload, - symKey, - publicKey, - api.service.NodeID(), - api.service.w.MinPow(), - now, - ) - if err != nil { - return nil, err - } - hash := envelope.Hash() - - if err := api.service.w.RequestHistoricMessagesWithTimeout(mailServerNode.ID().Bytes(), envelope, r.Timeout*time.Second); err != nil { - return nil, err - } - - return hash[:], nil -} diff --git a/services/wakuext/api_test.go b/services/wakuext/api_test.go index 4f79a242c..08c77c7df 100644 --- a/services/wakuext/api_test.go +++ b/services/wakuext/api_test.go @@ -1,29 +1,19 @@ package wakuext import ( - "context" - "fmt" "io/ioutil" - "math" - "os" - "strconv" "testing" - "time" "go.uber.org/zap" "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/storage" "github.com/ethereum/go-ethereum/event" - "github.com/ethereum/go-ethereum/node" - "github.com/ethereum/go-ethereum/p2p" "github.com/status-im/status-go/appdatabase" gethbridge "github.com/status-im/status-go/eth-node/bridge/geth" "github.com/status-im/status-go/eth-node/crypto" - "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/multiaccounts" "github.com/status-im/status-go/params" "github.com/status-im/status-go/services/ext" @@ -32,76 +22,6 @@ import ( "github.com/status-im/status-go/walletdatabase" ) -func TestRequestMessagesErrors(t *testing.T) { - var err error - - waku := gethbridge.NewGethWakuWrapper(waku.New(nil, nil)) - aNode, err := node.New(&node.Config{ - P2P: p2p.Config{ - MaxPeers: math.MaxInt32, - NoDiscovery: true, - }, - NoUSB: true, - }) // in-memory node as no data dir - require.NoError(t, err) - w := gethbridge.GetGethWakuFrom(waku) - aNode.RegisterLifecycle(w) - aNode.RegisterAPIs(w.APIs()) - aNode.RegisterProtocols(w.Protocols()) - require.NoError(t, err) - - err = aNode.Start() - require.NoError(t, err) - defer func() { require.NoError(t, aNode.Close()) }() - - handler := ext.NewHandlerMock(1) - config := params.NodeConfig{ - RootDataDir: os.TempDir(), - ShhextConfig: params.ShhextConfig{ - InstallationID: "1", - PFSEnabled: true, - }, - } - nodeWrapper := ext.NewTestNodeWrapper(nil, waku) - service := New(config, nodeWrapper, nil, handler, nil) - api := NewPublicAPI(service) - - const mailServerPeer = "enode://b7e65e1bedc2499ee6cbd806945af5e7df0e59e4070c96821570bd581473eade24a489f5ec95d060c0db118c879403ab88d827d3766978f28708989d35474f87@[::]:51920" - - var hash []byte - - // invalid MailServer enode address - hash, err = api.RequestMessages(context.TODO(), ext.MessagesRequest{MailServerPeer: "invalid-address"}) - require.Nil(t, hash) - require.EqualError(t, err, "invalid mailServerPeer value: invalid URL scheme, want \"enode\"") - - // non-existent symmetric key - hash, err = api.RequestMessages(context.TODO(), ext.MessagesRequest{ - MailServerPeer: mailServerPeer, - SymKeyID: "invalid-sym-key-id", - }) - require.Nil(t, hash) - require.EqualError(t, err, "invalid symKeyID value: non-existent key ID") - - // with a symmetric key - symKeyID, symKeyErr := waku.AddSymKeyFromPassword("some-pass") - require.NoError(t, symKeyErr) - hash, err = api.RequestMessages(context.TODO(), ext.MessagesRequest{ - MailServerPeer: mailServerPeer, - SymKeyID: symKeyID, - }) - require.Nil(t, hash) - require.Contains(t, err.Error(), "could not find peer with ID") - - // from is greater than to - hash, err = api.RequestMessages(context.TODO(), ext.MessagesRequest{ - From: 10, - To: 5, - }) - require.Nil(t, hash) - require.Contains(t, err.Error(), "Query range is invalid: from > to (10 > 5)") -} - func TestInitProtocol(t *testing.T) { config := params.NodeConfig{ RootDataDir: t.TempDir(), @@ -142,157 +62,3 @@ func TestInitProtocol(t *testing.T) { err = service.InitProtocol("Test", privateKey, appDB, walletDB, nil, multiAccounts, acc, nil, nil, nil, nil, nil, zap.NewNop(), accountsFeed) require.NoError(t, err) } - -func TestShhExtSuite(t *testing.T) { - suite.Run(t, new(ShhExtSuite)) -} - -type ShhExtSuite struct { - suite.Suite - - dir string - nodes []*node.Node - wakus []types.Waku - services []*Service -} - -func (s *ShhExtSuite) createAndAddNode() { - idx := len(s.nodes) - - // create a node - cfg := &node.Config{ - Name: strconv.Itoa(idx), - P2P: p2p.Config{ - MaxPeers: math.MaxInt32, - NoDiscovery: true, - ListenAddr: ":0", - }, - NoUSB: true, - } - stack, err := node.New(cfg) - s.NoError(err) - w := waku.New(nil, nil) - stack.RegisterLifecycle(w) - stack.RegisterAPIs(w.APIs()) - stack.RegisterProtocols(w.Protocols()) - s.NoError(err) - - // set up protocol - config := params.NodeConfig{ - RootDataDir: s.dir, - ShhextConfig: params.ShhextConfig{ - InstallationID: "1", - PFSEnabled: true, - MailServerConfirmations: true, - ConnectionTarget: 10, - }, - } - db, err := leveldb.Open(storage.NewMemStorage(), nil) - s.Require().NoError(err) - nodeWrapper := ext.NewTestNodeWrapper(nil, gethbridge.NewGethWakuWrapper(w)) - service := New(config, nodeWrapper, nil, nil, db) - - appDB, cleanupDB, err := helpers.SetupTestSQLDB(appdatabase.DbInitializer{}, fmt.Sprintf("%d", idx)) - s.Require().NoError(err) - defer func() { s.Require().NoError(cleanupDB()) }() - - tmpfile, err := ioutil.TempFile("", "multi-accounts-tests-") - s.Require().NoError(err) - - multiAccounts, err := multiaccounts.InitializeDB(tmpfile.Name()) - s.Require().NoError(err) - - privateKey, err := crypto.GenerateKey() - s.NoError(err) - - acc := &multiaccounts.Account{KeyUID: "0xdeadbeef"} - - walletDB, err := helpers.SetupTestMemorySQLDB(&walletdatabase.DbInitializer{}) - s.Require().NoError(err) - - accountsFeed := &event.Feed{} - - err = service.InitProtocol("Test", privateKey, appDB, walletDB, nil, multiAccounts, acc, nil, nil, nil, nil, nil, zap.NewNop(), accountsFeed) - s.NoError(err) - - stack.RegisterLifecycle(service) - stack.RegisterAPIs(service.APIs()) - stack.RegisterProtocols(service.Protocols()) - - s.NoError(err) - - // start the node - err = stack.Start() - s.Require().NoError(err) - - // store references - s.nodes = append(s.nodes, stack) - s.wakus = append(s.wakus, gethbridge.NewGethWakuWrapper(w)) - s.services = append(s.services, service) -} - -func (s *ShhExtSuite) SetupTest() { - s.dir = s.T().TempDir() -} - -func (s *ShhExtSuite) TearDownTest() { - for _, n := range s.nodes { - s.NoError(n.Close()) - } - s.nodes = nil - s.wakus = nil - s.services = nil -} - -func (s *ShhExtSuite) TestRequestMessagesSuccess() { - // two nodes needed: client and mailserver - s.createAndAddNode() - s.createAndAddNode() - - waitErr := helpers.WaitForPeerAsync(s.nodes[0].Server(), s.nodes[1].Server().Self().URLv4(), p2p.PeerEventTypeAdd, time.Second) - s.nodes[0].Server().AddPeer(s.nodes[1].Server().Self()) - s.Require().NoError(<-waitErr) - - api := NewPublicAPI(s.services[0]) - - _, err := api.RequestMessages(context.Background(), ext.MessagesRequest{ - MailServerPeer: s.nodes[1].Server().Self().URLv4(), - Topics: []types.TopicType{{1}}, - }) - s.NoError(err) -} - -func (s *ShhExtSuite) TestMultipleRequestMessagesWithoutForce() { - // two nodes needed: client and mailserver - s.createAndAddNode() - s.createAndAddNode() - - waitErr := helpers.WaitForPeerAsync(s.nodes[0].Server(), s.nodes[1].Server().Self().URLv4(), p2p.PeerEventTypeAdd, time.Second) - s.nodes[0].Server().AddPeer(s.nodes[1].Server().Self()) - s.Require().NoError(<-waitErr) - - api := NewPublicAPI(s.services[0]) - - _, err := api.RequestMessages(context.Background(), ext.MessagesRequest{ - MailServerPeer: s.nodes[1].Server().Self().URLv4(), - Topics: []types.TopicType{{1}}, - }) - s.NoError(err) - _, err = api.RequestMessages(context.Background(), ext.MessagesRequest{ - MailServerPeer: s.nodes[1].Server().Self().URLv4(), - Topics: []types.TopicType{{2}}, - }) - s.NoError(err) -} - -func (s *ShhExtSuite) TestFailedRequestWithUnknownMailServerPeer() { - s.createAndAddNode() - - api := NewPublicAPI(s.services[0]) - - _, err := api.RequestMessages(context.Background(), ext.MessagesRequest{ - MailServerPeer: "enode://19872f94b1e776da3a13e25afa71b47dfa99e658afd6427ea8d6e03c22a99f13590205a8826443e95a37eee1d815fc433af7a8ca9a8d0df7943d1f55684045b7@0.0.0.0:30305", - Topics: []types.TopicType{{1}}, - }) - s.EqualError(err, "could not find peer with ID: 10841e6db5c02fc331bf36a8d2a9137a1696d9d3b6b1f872f780e02aa8ec5bba") -} diff --git a/signal/events_shhext.go b/signal/events_shhext.go index c093921e5..ad751a92c 100644 --- a/signal/events_shhext.go +++ b/signal/events_shhext.go @@ -5,9 +5,11 @@ import ( "encoding/json" "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/status-im/status-go/eth-node/types" + "github.com/status-im/status-go/services/mailservers" ) const ( @@ -102,8 +104,8 @@ type BundleAddedSignal struct { } type MailserverSignal struct { - Address string `json:"address"` - ID string `json:"id"` + Address *multiaddr.Multiaddr `json:"address"` + ID string `json:"id"` } type Filter struct { @@ -218,20 +220,23 @@ func SendNewMessages(obj json.Marshaler) { send(EventNewMessages, obj) } -func SendMailserverAvailable(nodeAddress, id string) { - send(EventMailserverAvailable, MailserverSignal{ - Address: nodeAddress, - ID: id, - }) +func sendMailserverSignal(ms *mailservers.Mailserver, event string) { + msSignal := MailserverSignal{} + if ms != nil { + msSignal.Address = ms.Addr + msSignal.ID = ms.ID + } + send(event, msSignal) } -func SendMailserverChanged(nodeAddress, id string) { - send(EventMailserverChanged, MailserverSignal{ - Address: nodeAddress, - ID: id, - }) +func SendMailserverAvailable(ms *mailservers.Mailserver) { + sendMailserverSignal(ms, EventMailserverAvailable) +} + +func SendMailserverChanged(ms *mailservers.Mailserver) { + sendMailserverSignal(ms, EventMailserverChanged) } func SendMailserverNotWorking() { - send(EventMailserverNotWorking, MailserverSignal{}) + sendMailserverSignal(nil, EventMailserverNotWorking) } diff --git a/waku/common/protocol.go b/waku/common/protocol.go index bec07bfb6..0bedb0267 100644 --- a/waku/common/protocol.go +++ b/waku/common/protocol.go @@ -32,7 +32,6 @@ type Peer interface { SetRWWriter(p2p.MsgReadWriter) RequestHistoricMessages(*Envelope) error - SendMessagesRequest(MessagesRequest) error SendHistoricMessageResponse([]byte) error SendP2PMessages([]*Envelope) error SendRawP2PDirect([]rlp.RawValue) error diff --git a/waku/v0/peer.go b/waku/v0/peer.go index 6ba6f2e11..8ae8d68ca 100644 --- a/waku/v0/peer.go +++ b/waku/v0/peer.go @@ -114,10 +114,6 @@ func (p *Peer) RequestHistoricMessages(envelope *common.Envelope) error { return p2p.Send(p.rw, p2pRequestCode, envelope) } -func (p *Peer) SendMessagesRequest(request common.MessagesRequest) error { - return p2p.Send(p.rw, p2pRequestCode, request) -} - func (p *Peer) SendHistoricMessageResponse(payload []byte) error { size, r, err := rlp.EncodeToReader(payload) if err != nil { diff --git a/waku/v1/peer.go b/waku/v1/peer.go index 068b36738..87fd7eeba 100644 --- a/waku/v1/peer.go +++ b/waku/v1/peer.go @@ -133,14 +133,6 @@ func (p *Peer) RequestHistoricMessages(envelope *common.Envelope) (err error) { return } -func (p *Peer) SendMessagesRequest(request common.MessagesRequest) (err error) { - err = p2p.Send(p.rw, p2pRequestCode, request) - if err != nil { - p.stats.AddUpload(request) - } - return -} - func (p *Peer) SendHistoricMessageResponse(payload []byte) (err error) { size, r, err := rlp.EncodeToReader(payload) if err != nil { diff --git a/waku/waku.go b/waku/waku.go index 5045efa23..218cfdf40 100644 --- a/waku/waku.go +++ b/waku/waku.go @@ -41,7 +41,6 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/p2p" - "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" @@ -641,73 +640,6 @@ func (w *Waku) AllowP2PMessagesFromPeer(peerID []byte) error { return nil } -// RequestHistoricMessages sends a message with p2pRequestCode to a specific peer, -// which is known to implement MailServer interface, and is supposed to process this -// request and respond with a number of peer-to-peer messages (possibly expired), -// which are not supposed to be forwarded any further. -// The waku protocol is agnostic of the format and contents of envelope. -func (w *Waku) RequestHistoricMessages(peerID []byte, envelope *common.Envelope) error { - return w.RequestHistoricMessagesWithTimeout(peerID, envelope, 0) -} - -// RequestHistoricMessagesWithTimeout acts as RequestHistoricMessages but requires to pass a timeout. -// It sends an event EventMailServerRequestExpired after the timeout. -func (w *Waku) RequestHistoricMessagesWithTimeout(peerID []byte, envelope *common.Envelope, timeout time.Duration) error { - p, err := w.getPeer(peerID) - if err != nil { - return err - } - p.SetPeerTrusted(true) - - w.envelopeFeed.Send(common.EnvelopeEvent{ - Peer: p.EnodeID(), - Topic: envelope.Topic, - Hash: envelope.Hash(), - Event: common.EventMailServerRequestSent, - }) - - err = p.RequestHistoricMessages(envelope) - if timeout != 0 { - go w.expireRequestHistoricMessages(p.EnodeID(), envelope.Hash(), timeout) - } - return err -} - -func (w *Waku) SendMessagesRequest(peerID []byte, request common.MessagesRequest) error { - if err := request.Validate(); err != nil { - return err - } - p, err := w.getPeer(peerID) - if err != nil { - return err - } - p.SetPeerTrusted(true) - if err := p.SendMessagesRequest(request); err != nil { - return err - } - w.envelopeFeed.Send(common.EnvelopeEvent{ - Peer: p.EnodeID(), - Hash: gethcommon.BytesToHash(request.ID), - Event: common.EventMailServerRequestSent, - }) - return nil -} - -func (w *Waku) expireRequestHistoricMessages(peer enode.ID, hash gethcommon.Hash, timeout time.Duration) { - timer := time.NewTimer(timeout) - defer timer.Stop() - select { - case <-w.quit: - return - case <-timer.C: - w.envelopeFeed.Send(common.EnvelopeEvent{ - Peer: peer, - Hash: hash, - Event: common.EventMailServerRequestExpired, - }) - } -} - func (w *Waku) SendHistoricMessageResponse(peerID []byte, payload []byte) error { peer, err := w.getPeer(peerID) if err != nil { diff --git a/waku/waku_version_test.go b/waku/waku_version_test.go index 8bef970df..1b32040f4 100644 --- a/waku/waku_version_test.go +++ b/waku/waku_version_test.go @@ -19,7 +19,6 @@ package waku import ( - "errors" mrand "math/rand" "testing" "time" @@ -36,7 +35,6 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" - "github.com/status-im/status-go/protocol/tt" ) func TestWakuV0(t *testing.T) { @@ -280,20 +278,6 @@ func (s *WakuTestSuite) TestEventsWithoutConfirmation() { timer.Stop() } -func discardPipe() *p2p.MsgPipeRW { - rw1, rw2 := p2p.MsgPipe() - go func() { - for { - msg, err := rw1.ReadMsg() - if err != nil { - return - } - msg.Discard() // nolint: errcheck - } - }() - return rw2 -} - func (s *WakuTestSuite) TestWakuTimeDesyncEnvelopeIgnored() { c := &Config{ MaxMessageSize: common.DefaultMaxMessageSize, @@ -339,31 +323,6 @@ func (s *WakuTestSuite) TestWakuTimeDesyncEnvelopeIgnored() { } } -func (s *WakuTestSuite) TestRequestSentEventWithExpiry() { - w := New(nil, nil) - p := p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{Name: "waku", Version: 1}}) - rw := discardPipe() - defer func() { handleError(s.T(), rw.Close()) }() - w.peers[s.newPeer(w, p, rw, nil, s.stats)] = struct{}{} - events := make(chan common.EnvelopeEvent, 1) - sub := w.SubscribeEnvelopeEvents(events) - defer sub.Unsubscribe() - e := &common.Envelope{Nonce: 1} - s.Require().NoError(w.RequestHistoricMessagesWithTimeout(p.ID().Bytes(), e, time.Millisecond)) - verifyEvent := func(etype common.EventType) { - select { - case <-time.After(time.Second): - s.Require().FailNow("error waiting for a event type %s", etype) - case ev := <-events: - s.Require().Equal(etype, ev.Event) - s.Require().Equal(p.ID(), ev.Peer) - s.Require().Equal(e.Hash(), ev.Hash) - } - } - verifyEvent(common.EventMailServerRequestSent) - verifyEvent(common.EventMailServerRequestExpired) -} - type MockMailserver struct { deliverMail func([]byte, *common.Envelope) } @@ -381,87 +340,6 @@ func (m *MockMailserver) DeliverMail(peerID []byte, e *common.Envelope) { } } -func (s *WakuTestSuite) TestDeprecatedDeliverMail() { - - w1 := New(nil, nil) - w2 := New(nil, nil) - - var deliverMailCalled bool - - w2.RegisterMailServer(&MockMailserver{ - deliverMail: func(peerID []byte, e *common.Envelope) { - deliverMailCalled = true - }, - }) - - rw1, rw2 := p2p.MsgPipe() - p1 := s.newPeer(w1, p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{Name: "waku", Version: 0}}), rw2, nil, s.stats) - - go func() { handleError(s.T(), w1.HandlePeer(p1, rw2)) }() - - timer := time.AfterFunc(5*time.Second, func() { - handleError(s.T(), rw1.Close()) - }) - peer2 := s.newPeer(w2, p2p.NewPeer(enode.ID{1}, "1", nil), rw1, nil, s.stats) - s.Require().NoError(peer2.Start()) - - go func() { handleError(s.T(), peer2.Run()) }() - - s.Require().NoError(w1.RequestHistoricMessages(p1.ID(), &common.Envelope{Data: []byte{1}})) - - err := tt.RetryWithBackOff(func() error { - if !deliverMailCalled { - return errors.New("DeliverMail not called") - } - return nil - }) - s.Require().NoError(err) - s.Require().NoError(rw1.Close()) - s.Require().NoError(rw2.Close()) - - timer.Stop() - -} - -func (s *WakuTestSuite) TestSendMessagesRequest() { - validMessagesRequest := common.MessagesRequest{ - ID: make([]byte, 32), - From: 0, - To: 10, - Bloom: []byte{0x01}, - } - - s.Run("InvalidID", func() { - w := New(nil, nil) - err := w.SendMessagesRequest([]byte{0x01, 0x02}, common.MessagesRequest{}) - s.Require().EqualError(err, "invalid 'ID', expected a 32-byte slice") - }) - - s.Run("WithoutPeer", func() { - w := New(nil, nil) - err := w.SendMessagesRequest([]byte{0x01, 0x02}, validMessagesRequest) - s.Require().EqualError(err, "could not find peer with ID: 0102") - }) - - s.Run("AllGood", func() { - p := p2p.NewPeer(enode.ID{0x01}, "peer01", nil) - rw1, rw2 := p2p.MsgPipe() - w := New(nil, nil) - w.peers[s.newPeer(w, p, rw1, nil, s.stats)] = struct{}{} - - go func() { - // Read out so that it's consumed - _, err := rw2.ReadMsg() - s.Require().NoError(err) - s.Require().NoError(rw1.Close()) - s.Require().NoError(rw2.Close()) - - }() - err := w.SendMessagesRequest(p.ID().Bytes(), validMessagesRequest) - s.Require().NoError(err) - }) -} - func (s *WakuTestSuite) TestRateLimiterIntegration() { conf := &Config{ MinimumAcceptedPoW: 0, diff --git a/wakuv2/waku.go b/wakuv2/waku.go index ed5849bd1..574381b5e 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -36,6 +36,7 @@ import ( "github.com/jellydator/ttlcache/v3" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/libp2p/go-libp2p/p2p/protocol/ping" "github.com/multiformats/go-multiaddr" "go.uber.org/zap" @@ -1797,6 +1798,19 @@ func (w *Waku) PeerID() peer.ID { return w.node.Host().ID() } +func (w *Waku) PingPeer(ctx context.Context, peerID peer.ID) (time.Duration, error) { + pingResultCh := ping.Ping(ctx, w.node.Host(), peerID) + select { + case <-ctx.Done(): + return 0, ctx.Err() + case r := <-pingResultCh: + if r.Error != nil { + return 0, r.Error + } + return r.RTT, nil + } +} + func (w *Waku) Peerstore() peerstore.Peerstore { return w.node.Host().Peerstore() }