refactor_: storenode cycle and allow ENRs and multiaddresses
parent 302c798b3e
commit 05e3a35bf7

@@ -2,16 +2,13 @@
 package main
 
 import (
-    "errors"
     "flag"
-    "fmt"
     stdlog "log"
     "os"
     "path"
     "path/filepath"
     "time"
 
-    "golang.org/x/crypto/sha3"
     "golang.org/x/crypto/ssh/terminal"
 
     "github.com/ethereum/go-ethereum/log"
@@ -19,32 +16,22 @@ import (
     "github.com/ethereum/go-ethereum/p2p/enode"
 
     "github.com/status-im/status-go/api"
-    "github.com/status-im/status-go/eth-node/types"
     "github.com/status-im/status-go/logutils"
     "github.com/status-im/status-go/params"
-    "github.com/status-im/status-go/rpc"
     "github.com/status-im/status-go/t/helpers"
 )
 
-const (
-    mailboxPassword = "status-offline-inbox"
-)
-
 // All general log messages in this package should be routed through this logger.
 var logger = log.New("package", "status-go/cmd/node-canary")
 
 var (
     staticEnodeAddr = flag.String("staticnode", "", "checks if static node talks waku protocol (e.g. enode://abc123@1.2.3.4:30303)")
-    mailserverEnodeAddr = flag.String("mailserver", "", "queries mail server for historic messages (e.g. enode://123abc@4.3.2.1:30504)")
-    publicChannel = flag.String("channel", "status", "The public channel name to retrieve historic messages from (used with 'mailserver' flag)")
-    timeout = flag.Int("timeout", 10, "Timeout when connecting to node or fetching messages from mailserver, in seconds")
-    period = flag.Int("period", 24*60*60, "How far in the past to request messages from mailserver, in seconds")
     minPow = flag.Float64("waku.pow", params.WakuMinimumPoW, "PoW for messages to be added to queue, in float format")
     ttl = flag.Int("waku.ttl", params.WakuTTL, "Time to live for messages, in seconds")
     homePath = flag.String("home-dir", ".", "Home directory where state is stored")
     logLevel = flag.String("log", "INFO", `Log level, one of: "ERROR", "WARN", "INFO", "DEBUG", and "TRACE"`)
     logFile = flag.String("logfile", "", "Path to the log file")
     logWithoutColors = flag.Bool("log-without-color", false, "Disables log colors")
 )
 
 func main() {
@@ -180,107 +167,3 @@ func startClientNode() (*api.GethStatusBackend, error) {
     }
     return clientBackend, err
 }
-
-func joinPublicChat(w types.Waku, rpcClient *rpc.Client, name string) (string, types.TopicType, string, error) {
-    keyID, err := w.AddSymKeyFromPassword(name)
-    if err != nil {
-        return "", types.TopicType{}, "", err
-    }
-
-    h := sha3.NewLegacyKeccak256()
-    _, err = h.Write([]byte(name))
-    if err != nil {
-        return "", types.TopicType{}, "", err
-    }
-    fullTopic := h.Sum(nil)
-    topic := types.BytesToTopic(fullTopic)
-
-    wakuAPI := w.PublicWakuAPI()
-    filterID, err := wakuAPI.NewMessageFilter(types.Criteria{SymKeyID: keyID, Topics: []types.TopicType{topic}})
-
-    return keyID, topic, filterID, err
-}
-
-func waitForMailServerRequestSent(events chan types.EnvelopeEvent, requestID types.Hash, timeout time.Duration) error {
-    timeoutTimer := time.NewTimer(timeout)
-    for {
-        select {
-        case event := <-events:
-            if event.Hash == requestID && event.Event == types.EventMailServerRequestSent {
-                timeoutTimer.Stop()
-                return nil
-            }
-        case <-timeoutTimer.C:
-            return errors.New("timed out waiting for mailserver request sent")
-        }
-    }
-}
-
-func waitForMailServerResponse(events chan types.EnvelopeEvent, requestID types.Hash, timeout time.Duration) (*types.MailServerResponse, error) {
-    timeoutTimer := time.NewTimer(timeout)
-    for {
-        select {
-        case event := <-events:
-            if event.Hash == requestID {
-                resp, err := decodeMailServerResponse(event)
-                if resp != nil || err != nil {
-                    timeoutTimer.Stop()
-                    return resp, err
-                }
-            }
-        case <-timeoutTimer.C:
-            return nil, errors.New("timed out waiting for mailserver response")
-        }
-    }
-}
-
-func decodeMailServerResponse(event types.EnvelopeEvent) (*types.MailServerResponse, error) {
-    switch event.Event {
-    case types.EventMailServerRequestSent:
-        return nil, nil
-    case types.EventMailServerRequestCompleted:
-        resp, ok := event.Data.(*types.MailServerResponse)
-        if !ok {
-            return nil, errors.New("failed to convert event to a *MailServerResponse")
-        }
-
-        return resp, nil
-    case types.EventMailServerRequestExpired:
-        return nil, errors.New("no messages available from mailserver")
-    default:
-        return nil, fmt.Errorf("unexpected event type: %v", event.Event)
-    }
-}
-
-func waitForEnvelopeEvents(events chan types.EnvelopeEvent, hashes []string, event types.EventType) error {
-    check := make(map[string]struct{})
-    for _, hash := range hashes {
-        check[hash] = struct{}{}
-    }
-
-    timeout := time.NewTimer(time.Second * 5)
-    for {
-        select {
-        case e := <-events:
-            if e.Event == event {
-                delete(check, e.Hash.String())
-                if len(check) == 0 {
-                    timeout.Stop()
-                    return nil
-                }
-            }
-        case <-timeout.C:
-            return fmt.Errorf("timed out while waiting for event on envelopes. event: %s", event)
-        }
-    }
-}
-
-// helper for checking LastEnvelopeHash
-func isEmptyEnvelope(hash types.Hash) bool {
-    for _, b := range hash {
-        if b != 0 {
-            return false
-        }
-    }
-    return true
-}
@@ -318,3 +318,7 @@ func (w *GethWakuWrapper) SetStorePeerID(peerID peer.ID) {
 func (w *GethWakuWrapper) PeerID() peer.ID {
     panic("not implemented")
 }
+
+func (w *GethWakuWrapper) PingPeer(context.Context, peer.ID) (time.Duration, error) {
+    return 0, errors.New("not available in WakuV1")
+}
@@ -342,3 +342,7 @@ func (w *gethWakuV2Wrapper) SetStorePeerID(peerID peer.ID) {
 func (w *gethWakuV2Wrapper) PeerID() peer.ID {
     return w.waku.PeerID()
 }
+
+func (w *gethWakuV2Wrapper) PingPeer(ctx context.Context, peerID peer.ID) (time.Duration, error) {
+    return w.waku.PingPeer(ctx, peerID)
+}
@@ -199,4 +199,7 @@ type Waku interface {
 
     // PeerID returns node's PeerID
     PeerID() peer.ID
+
+    // PingPeer returns the reply time
+    PingPeer(ctx context.Context, peerID peer.ID) (time.Duration, error)
 }
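
Note: the PingPeer method added above is what the rest of this change builds on; the cycle measures a storenode's round-trip time over libp2p instead of tracking enode-level peer events. As a usage sketch only (not code from this commit; the helper name is hypothetical, and the 4-second deadline mirrors the one used later in getAvailableMailserversSortedByRTT):

    // measureRTT pings a storenode with a short deadline. A failed ping is not
    // fatal for the cycle; the node is simply dropped from the candidate pool.
    func measureRTT(ctx context.Context, w types.Waku, id peer.ID) (time.Duration, bool) {
        ctx, cancel := context.WithTimeout(ctx, 4*time.Second)
        defer cancel()

        rtt, err := w.PingPeer(ctx, id) // returns an error on WakuV1 wrappers
        if err != nil {
            return 0, false
        }
        return rtt, true
    }
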
@@ -25,7 +25,6 @@ import (
     gethcommon "github.com/ethereum/go-ethereum/common"
     "github.com/ethereum/go-ethereum/common/hexutil"
-    "github.com/ethereum/go-ethereum/event"
     "github.com/ethereum/go-ethereum/p2p"
 
     "github.com/status-im/status-go/account"
@@ -139,7 +138,7 @@ type Messenger struct {
     allInstallations      *installationMap
     modifiedInstallations *stringBoolMap
     installationID        string
-    storenodeCycle        storenodeCycle
+    mailserverCycle       mailserverCycle
     communityStorenodes   *storenodes.CommunityStorenodes
     database              *sql.DB
     multiAccounts         *multiaccounts.Database
@@ -203,7 +202,6 @@ type connStatus int
 
 const (
     disconnected connStatus = iota + 1
-    connecting
     connected
 )
 
@@ -211,15 +209,13 @@ type peerStatus struct {
     status                connStatus
     canConnectAfter       time.Time
     lastConnectionAttempt time.Time
-    storenode             mailserversDB.Mailserver
+    mailserver            mailserversDB.Mailserver
 }
 
-type storenodeCycle struct {
+type mailserverCycle struct {
     sync.RWMutex
-    allStorenodes             []mailserversDB.Mailserver
-    activeStorenode           *mailserversDB.Mailserver
+    allMailservers            []mailserversDB.Mailserver
+    activeMailserver          *mailserversDB.Mailserver
     peers                     map[string]peerStatus
-    events                    chan *p2p.PeerEvent
-    subscription              event.Subscription
     availabilitySubscriptions []chan struct{}
 }
@@ -601,7 +597,7 @@ func NewMessenger(
         peerStore:             peerStore,
         mvdsStatusChangeEvent: make(chan datasyncnode.PeerStatusChangeEvent, 5),
         verificationDatabase:  verification.NewPersistence(database),
-        storenodeCycle: storenodeCycle{
+        mailserverCycle: mailserverCycle{
             peers:                     make(map[string]peerStatus),
             availabilitySubscriptions: make([]chan struct{}, 0),
         },
@@ -860,13 +856,13 @@ func (m *Messenger) Start() (*MessengerResponse, error) {
     }
     response := &MessengerResponse{}
 
-    storenodes, err := m.allStorenodes()
+    mailservers, err := m.allMailservers()
     if err != nil {
         return nil, err
     }
 
-    response.Mailservers = storenodes
-    err = m.StartStorenodeCycle(storenodes)
+    response.Mailservers = mailservers
+    err = m.StartMailserverCycle(mailservers)
     if err != nil {
         return nil, err
     }
@@ -43,7 +43,7 @@ var ErrNoFiltersForChat = errors.New("no filter registered for given chat")
 
 func (m *Messenger) shouldSync() (bool, error) {
     // TODO (pablo) support community store node as well
-    if m.storenodeCycle.activeStorenode == nil || !m.Online() {
+    if m.mailserverCycle.activeMailserver == nil || !m.Online() {
         return false, nil
     }
 
@@ -91,18 +91,18 @@ func (m *Messenger) scheduleSyncChat(chat *Chat) (bool, error) {
 
 func (m *Messenger) connectToNewMailserverAndWait() error {
     // Handle pinned mailservers
-    m.logger.Info("disconnecting storenode")
+    m.logger.Info("disconnecting mailserver")
     pinnedMailserver, err := m.getPinnedMailserver()
     if err != nil {
-        m.logger.Error("could not obtain the pinned storenode", zap.Error(err))
+        m.logger.Error("could not obtain the pinned mailserver", zap.Error(err))
         return err
     }
     // If pinned mailserver is not nil, no need to disconnect and wait for it to be available
     if pinnedMailserver == nil {
-        m.disconnectActiveStorenode(graylistBackoff)
+        m.disconnectActiveMailserver(graylistBackoff)
     }
 
-    return m.findNewStorenode()
+    return m.findNewMailserver()
 }
 
 func (m *Messenger) performMailserverRequest(ms *mailservers.Mailserver, fn func(mailServer mailservers.Mailserver) (*MessengerResponse, error)) (*MessengerResponse, error) {
@@ -110,27 +110,27 @@ func (m *Messenger) performMailserverRequest(ms *mailservers.Mailserver, fn func
         return nil, errors.New("mailserver not available")
     }
 
-    m.storenodeCycle.RLock()
-    defer m.storenodeCycle.RUnlock()
+    m.mailserverCycle.RLock()
+    defer m.mailserverCycle.RUnlock()
     var tries uint = 0
     for tries < mailserverMaxTries {
         if !m.communityStorenodes.IsCommunityStoreNode(ms.ID) && !m.isMailserverAvailable(ms.ID) {
             return nil, errors.New("storenode not available")
         }
-        m.logger.Info("trying performing store requests", zap.Uint("try", tries), zap.String("storenodeID", ms.ID))
+        m.logger.Info("trying performing mailserver requests", zap.Uint("try", tries), zap.String("mailserverID", ms.ID))
 
         // Peform request
         response, err := fn(*ms) // pass by value because we don't want the fn to modify the mailserver
         if err == nil {
             // Reset failed requests
-            m.logger.Debug("storenode request performed successfully",
-                zap.String("storenodeID", ms.ID))
+            m.logger.Debug("mailserver request performed successfully",
+                zap.String("mailserverID", ms.ID))
             ms.FailedRequests = 0
             return response, nil
         }
 
-        m.logger.Error("failed to perform store request",
-            zap.String("storenodeID", ms.ID),
+        m.logger.Error("failed to perform mailserver request",
+            zap.String("mailserverID", ms.ID),
             zap.Uint("tries", tries),
             zap.Error(err),
         )
 
@@ -349,6 +349,10 @@ func (m *Messenger) RequestAllHistoricMessages(forceFetchingBackup, withRetries
         return nil, err
     }
 
+    if m.mailserversDatabase == nil {
+        return nil, nil
+    }
+
     if forceFetchingBackup || !backupFetched {
         m.logger.Info("fetching backup")
         err := m.syncBackup()
@@ -715,7 +719,7 @@ func (m *Messenger) calculateGapForChat(chat *Chat, from uint32) (*common.Messag
 type work struct {
     pubsubTopic   string
     contentTopics []types.TopicType
-    storeCursor   types.StoreRequestCursor
+    cursor        types.StoreRequestCursor
     limit         uint32
 }
 
@@ -730,7 +734,7 @@ type messageRequester interface {
         limit uint32,
         waitForResponse bool,
         processEnvelopes bool,
-    ) (storeCursor types.StoreRequestCursor, envelopesCount int, err error)
+    ) (cursor types.StoreRequestCursor, envelopesCount int, err error)
 }
 
 func processMailserverBatch(
@@ -833,8 +837,7 @@ loop:
             }()
 
             queryCtx, queryCancel := context.WithTimeout(ctx, mailserverRequestTimeout)
-            storeCursor, envelopesCount, err := messageRequester.SendMessagesRequestForTopics(queryCtx, storenodeID, batch.From, batch.To, w.storeCursor, w.pubsubTopic, w.contentTopics, w.limit, true, processEnvelopes)
+            cursor, envelopesCount, err := messageRequester.SendMessagesRequestForTopics(queryCtx, storenodeID, batch.From, batch.To, w.cursor, w.pubsubTopic, w.contentTopics, w.limit, true, processEnvelopes)
             queryCancel()
 
             if err != nil {
@@ -856,7 +859,7 @@ loop:
 
             // Check the cursor after calling `shouldProcessNextPage`.
             // The app might use process the fetched envelopes in the callback for own needs.
-            if len(cursor) == 0 && storeCursor == nil {
+            if cursor == nil {
                 return
             }
 
@@ -866,7 +869,7 @@ loop:
                 workCh <- work{
                     pubsubTopic:   w.pubsubTopic,
                     contentTopics: w.contentTopics,
-                    storeCursor:   storeCursor,
+                    cursor:        cursor,
                     limit:         nextPageLimit,
                 }
             }(w)
@@ -915,12 +918,12 @@ func (m *Messenger) processMailserverBatch(ms mailservers.Mailserver, batch Mail
         return nil
     }
 
-    storenodeID, err := ms.PeerID()
+    mailserverID, err := ms.PeerID()
     if err != nil {
         return err
     }
-    logger := m.logger.With(zap.String("storenode", ms.ID))
-    return processMailserverBatch(m.ctx, m.transport, batch, storenodeID, logger, defaultStoreNodeRequestPageSize, nil, false)
+    logger := m.logger.With(zap.String("mailserverID", ms.ID))
+    return processMailserverBatch(m.ctx, m.transport, batch, mailserverID, logger, defaultStoreNodeRequestPageSize, nil, false)
 }
 
 func (m *Messenger) processMailserverBatchWithOptions(ms mailservers.Mailserver, batch MailserverBatch, pageLimit uint32, shouldProcessNextPage func(int) (bool, uint32), processEnvelopes bool) error {
@@ -932,12 +935,12 @@ func (m *Messenger) processMailserverBatchWithOptions(ms mailservers.Mailserver,
         return nil
     }
 
-    storenodeID, err := ms.PeerID()
+    mailserverID, err := ms.PeerID()
     if err != nil {
         return err
     }
-    logger := m.logger.With(zap.String("storenodeID", ms.ID))
-    return processMailserverBatch(m.ctx, m.transport, batch, storenodeID, logger, pageLimit, shouldProcessNextPage, processEnvelopes)
+    logger := m.logger.With(zap.String("mailserverID", ms.ID))
+    return processMailserverBatch(m.ctx, m.transport, batch, mailserverID, logger, pageLimit, shouldProcessNextPage, processEnvelopes)
 }
 
 type MailserverBatch struct {
@@ -1089,15 +1092,15 @@ func (m *Messenger) LoadFilters(filters []*transport.Filter) ([]*transport.Filte
 }
 
 func (m *Messenger) ToggleUseMailservers(value bool) error {
-    m.storenodeCycle.Lock()
-    defer m.storenodeCycle.Unlock()
+    m.mailserverCycle.Lock()
+    defer m.mailserverCycle.Unlock()
 
     err := m.settings.SetUseMailservers(value)
     if err != nil {
         return err
     }
 
-    m.disconnectActiveStorenode(backoffByUserAction)
+    m.disconnectActiveMailserver(backoffByUserAction)
     if value {
         m.cycleMailservers()
         return nil
@@ -1111,7 +1114,7 @@ func (m *Messenger) SetPinnedMailservers(mailservers map[string]string) error {
         return err
     }
 
-    m.disconnectActiveStorenode(backoffByUserAction)
+    m.disconnectActiveMailserver(backoffByUserAction)
     m.cycleMailservers()
     return nil
 }
@@ -8,13 +8,14 @@ import (
     "net"
     "runtime"
     "sort"
-    "strings"
     "sync"
     "time"
 
     "github.com/pkg/errors"
     "go.uber.org/zap"
 
+    "github.com/waku-org/go-waku/waku/v2/utils"
+
     "github.com/status-im/status-go/params"
     "github.com/status-im/status-go/protocol/storenodes"
     "github.com/status-im/status-go/services/mailservers"
@@ -25,15 +26,11 @@ const defaultBackoff = 10 * time.Second
 const graylistBackoff = 3 * time.Minute
 const backoffByUserAction = 0
 const isAndroidEmulator = runtime.GOOS == "android" && runtime.GOARCH == "amd64"
-const findNearestStorenode = !isAndroidEmulator
+const findNearestMailServer = !isAndroidEmulator
 const overrideDNS = runtime.GOOS == "android" || runtime.GOOS == "ios"
 const bootstrapDNS = "8.8.8.8:53"
 
-func (m *Messenger) storenodesByFleet(fleet string) []mailservers.Mailserver {
-    return mailservers.DefaultStorenodesByFleet(fleet)
-}
-
-type byRTTMsAndCanConnectBefore []SortedStorenodes
+type byRTTMsAndCanConnectBefore []SortedMailserver
 
 func (s byRTTMsAndCanConnectBefore) Len() int {
     return len(s)
@@ -47,26 +44,32 @@ func (s byRTTMsAndCanConnectBefore) Less(i, j int) bool {
     // Slightly inaccurate as time sensitive sorting, but it does not matter so much
     now := time.Now()
     if s[i].CanConnectAfter.Before(now) && s[j].CanConnectAfter.Before(now) {
-        return s[i].RTTMs < s[j].RTTMs
+        return s[i].RTT < s[j].RTT
     }
     return s[i].CanConnectAfter.Before(s[j].CanConnectAfter)
 }
 
-func (m *Messenger) StartStorenodeCycle(storenodes []mailservers.Mailserver) error {
-    m.storenodeCycle.allStorenodes = storenodes
-
-    if len(storenodes) == 0 {
-        m.logger.Warn("not starting storenode cycle: empty storenode list")
+func (m *Messenger) StartMailserverCycle(mailservers []mailservers.Mailserver) error {
+    if m.transport.WakuVersion() != 2 {
+        m.logger.Warn("not starting mailserver cycle: requires wakuv2")
         return nil
     }
-    for _, storenode := range storenodes {
+
+    m.mailserverCycle.allMailservers = mailservers
+
+    if len(mailservers) == 0 {
+        m.logger.Warn("not starting mailserver cycle: empty mailservers list")
+        return nil
+    }
+
+    for _, storenode := range mailservers {
         peerInfo, err := storenode.PeerInfo()
         if err != nil {
             return err
         }
 
-        for _, addr := range peerInfo.Addrs {
+        for _, addr := range utils.EncapsulatePeerID(peerInfo.ID, peerInfo.Addrs...) {
            _, err := m.transport.AddStorePeer(addr)
            if err != nil {
                return err
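
Note: switching from peerInfo.Addrs to utils.EncapsulatePeerID(peerInfo.ID, peerInfo.Addrs...) is what makes ENR- and multiaddress-based storenode entries dialable: each bare transport address gains a trailing /p2p/<peer-id> component that carries the node's identity. A rough sketch of the equivalent operation using go-multiaddr directly (an illustration, not the go-waku source):

    // encapsulate appends /p2p/<id> to each address, e.g.
    // /ip4/1.2.3.4/tcp/30303 -> /ip4/1.2.3.4/tcp/30303/p2p/16Uiu2HAk...
    func encapsulate(id peer.ID, addrs []ma.Multiaddr) []ma.Multiaddr {
        p2pComponent, err := ma.NewMultiaddr("/p2p/" + id.String())
        if err != nil {
            return addrs
        }
        result := make([]ma.Multiaddr, 0, len(addrs))
        for _, addr := range addrs {
            result = append(result, addr.Encapsulate(p2pComponent))
        }
        return result
    }
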
@@ -75,59 +78,59 @@ func (m *Messenger) StartStorenodeCycle(storenodes []mailservers.Mailserver) err
     }
     go m.verifyStorenodeStatus()
 
-    m.logger.Debug("starting storenode cycle",
+    m.logger.Debug("starting mailserver cycle",
         zap.Uint("WakuVersion", m.transport.WakuVersion()),
-        zap.Any("storenode", storenodes),
+        zap.Any("mailservers", mailservers),
     )
 
     return nil
 }
 
-func (m *Messenger) DisconnectActiveStorenode() {
-    m.storenodeCycle.Lock()
-    defer m.storenodeCycle.Unlock()
-    m.disconnectActiveStorenode(graylistBackoff)
+func (m *Messenger) DisconnectActiveMailserver() {
+    m.mailserverCycle.Lock()
+    defer m.mailserverCycle.Unlock()
+    m.disconnectActiveMailserver(graylistBackoff)
 }
 
-func (m *Messenger) disconnecStorenode(backoffDuration time.Duration) error {
-    if m.storenodeCycle.activeStorenode == nil {
-        m.logger.Info("no active storenode")
+func (m *Messenger) disconnectMailserver(backoffDuration time.Duration) error {
+    if m.mailserverCycle.activeMailserver == nil {
+        m.logger.Info("no active mailserver")
         return nil
     }
-    m.logger.Info("disconnecting active storenode", zap.String("nodeID", m.storenodeCycle.activeStorenode.ID))
+    m.logger.Info("disconnecting active mailserver", zap.String("nodeID", m.mailserverCycle.activeMailserver.ID))
     m.mailPeersMutex.Lock()
-    pInfo, ok := m.storenodeCycle.peers[m.storenodeCycle.activeStorenode.ID]
+    pInfo, ok := m.mailserverCycle.peers[m.mailserverCycle.activeMailserver.ID]
     if ok {
         pInfo.status = disconnected
 
         pInfo.canConnectAfter = time.Now().Add(backoffDuration)
-        m.storenodeCycle.peers[m.storenodeCycle.activeStorenode.ID] = pInfo
+        m.mailserverCycle.peers[m.mailserverCycle.activeMailserver.ID] = pInfo
     } else {
-        m.storenodeCycle.peers[m.storenodeCycle.activeStorenode.ID] = peerStatus{
+        m.mailserverCycle.peers[m.mailserverCycle.activeMailserver.ID] = peerStatus{
             status:          disconnected,
-            storenode:       *m.storenodeCycle.activeStorenode,
+            mailserver:      *m.mailserverCycle.activeMailserver,
             canConnectAfter: time.Now().Add(backoffDuration),
         }
     }
     m.mailPeersMutex.Unlock()
 
-    m.storenodeCycle.activeStorenode = nil
+    m.mailserverCycle.activeMailserver = nil
     return nil
 }
 
-func (m *Messenger) disconnectActiveStorenode(backoffDuration time.Duration) {
-    err := m.disconnecStorenode(backoffDuration)
+func (m *Messenger) disconnectActiveMailserver(backoffDuration time.Duration) {
+    err := m.disconnectMailserver(backoffDuration)
     if err != nil {
-        m.logger.Error("failed to disconnect storenode", zap.Error(err))
+        m.logger.Error("failed to disconnect mailserver", zap.Error(err))
     }
     signal.SendMailserverChanged(nil)
 }
 
 func (m *Messenger) cycleMailservers() {
-    m.logger.Info("Automatically switching storenode")
+    m.logger.Info("Automatically switching mailserver")
 
-    if m.storenodeCycle.activeStorenode != nil {
-        m.disconnectActiveStorenode(graylistBackoff)
+    if m.mailserverCycle.activeMailserver != nil {
+        m.disconnectActiveMailserver(graylistBackoff)
     }
 
     useMailserver, err := m.settings.CanUseMailservers()
@@ -137,13 +140,13 @@ func (m *Messenger) cycleMailservers() {
     }
 
     if !useMailserver {
-        m.logger.Info("Skipping storenode search due to useMailserver being false")
+        m.logger.Info("Skipping mailserver search due to useMailserver being false")
         return
     }
 
-    err = m.findNewStorenode()
+    err = m.findNewMailserver()
     if err != nil {
-        m.logger.Error("Error getting new storenode", zap.Error(err))
+        m.logger.Error("Error getting new mailserver", zap.Error(err))
     }
 }
 
@@ -162,22 +165,22 @@ func (m *Messenger) getFleet() (string, error) {
     } else if m.config.clusterConfig.Fleet != "" {
         fleet = m.config.clusterConfig.Fleet
     } else {
-        fleet = params.FleetProd
+        fleet = params.FleetStatusProd
     }
     return fleet, nil
 }
 
-func (m *Messenger) allStorenodes() ([]mailservers.Mailserver, error) {
+func (m *Messenger) allMailservers() ([]mailservers.Mailserver, error) {
     // Get configured fleet
     fleet, err := m.getFleet()
     if err != nil {
         return nil, err
     }
 
-    // Get default storenode for given fleet
-    allStorenodes := m.storenodesByFleet(fleet)
+    // Get default mailservers for given fleet
+    allMailservers := mailservers.DefaultMailserversByFleet(fleet)
 
-    // Add custom configured storenode
+    // Add custom configured mailservers
     if m.mailserversDatabase != nil {
         customMailservers, err := m.mailserversDatabase.Mailservers()
         if err != nil {
@@ -186,22 +189,85 @@ func (m *Messenger) allStorenodes() ([]mailservers.Mailserver, error) {
 
         for _, c := range customMailservers {
             if c.Fleet == fleet {
-                allStorenodes = append(allStorenodes, c)
+                allMailservers = append(allMailservers, c)
             }
         }
     }
 
-    return allStorenodes, nil
+    return allMailservers, nil
 }
 
-type SortedStorenodes struct {
+type SortedMailserver struct {
     Mailserver      mailservers.Mailserver
-    RTTMs           int
+    RTT             time.Duration
     CanConnectAfter time.Time
 }
 
-func (m *Messenger) findNewStorenode() error {
+func (m *Messenger) getAvailableMailserversSortedByRTT(allMailservers []mailservers.Mailserver) []mailservers.Mailserver {
+    // TODO: this can be replaced by peer selector once code is moved to go-waku api
+    availableMailservers := make(map[string]time.Duration)
+    availableMailserversMutex := sync.Mutex{}
+    availableMailserversWg := sync.WaitGroup{}
+    for _, mailserver := range allMailservers {
+        availableMailserversWg.Add(1)
+        go func(mailserver mailservers.Mailserver) {
+            defer availableMailserversWg.Done()
+
+            peerID, err := mailserver.PeerID()
+            if err != nil {
+                return
+            }
+
+            ctx, cancel := context.WithTimeout(m.ctx, 4*time.Second)
+            defer cancel()
+
+            rtt, err := m.transport.PingPeer(ctx, peerID)
+            if err == nil { // pinging mailservers might fail, but we don't care
+                availableMailserversMutex.Lock()
+                availableMailservers[mailserver.ID] = rtt
+                availableMailserversMutex.Unlock()
+            }
+        }(mailserver)
+    }
+    availableMailserversWg.Wait()
+
+    if len(availableMailservers) == 0 {
+        m.logger.Warn("No mailservers available") // Do nothing...
+        return nil
+    }
+
+    mailserversByID := make(map[string]mailservers.Mailserver)
+    for idx := range allMailservers {
+        mailserversByID[allMailservers[idx].ID] = allMailservers[idx]
+    }
+    var sortedMailservers []SortedMailserver
+    for mailserverID, rtt := range availableMailservers {
+        ms := mailserversByID[mailserverID]
+        sortedMailserver := SortedMailserver{
+            Mailserver: ms,
+            RTT:        rtt,
+        }
+        m.mailPeersMutex.Lock()
+        pInfo, ok := m.mailserverCycle.peers[ms.ID]
+        m.mailPeersMutex.Unlock()
+        if ok {
+            if time.Now().Before(pInfo.canConnectAfter) {
+                continue // We can't connect to this node yet
+            }
+        }
+        sortedMailservers = append(sortedMailservers, sortedMailserver)
+    }
+    sort.Sort(byRTTMsAndCanConnectBefore(sortedMailservers))
+
+    result := make([]mailservers.Mailserver, len(sortedMailservers))
+    for i, s := range sortedMailservers {
+        result[i] = s.Mailserver
+    }
+
+    return result
+}
+
+func (m *Messenger) findNewMailserver() error {
     // we have to override DNS manually because of https://github.com/status-im/status-mobile/issues/19581
     if overrideDNS {
         var dialer net.Dialer
@@ -226,95 +292,22 @@ func (m *Messenger) findNewStorenode() error {
         return m.connectToMailserver(*pinnedMailserver)
     }
 
-    allStorenodes := m.storenodeCycle.allStorenodes
+    m.logger.Info("Finding a new mailserver...")
 
+    allMailservers := m.mailserverCycle.allMailservers
+
     // TODO: remove this check once sockets are stable on x86_64 emulators
-    if findNearestStorenode {
-        m.logger.Info("Finding a new storenode...")
-
-        if len(allStorenodes) == 0 {
-            m.logger.Warn("no storenodes available") // Do nothing...
-            return nil
-        }
-
-        pingResult, err := m.transport.PingPeer(m.ctx, allStorenodes, 500)
-        if err != nil {
-            // pinging storenodes might fail, but we don't care
-            m.logger.Warn("ping failed with", zap.Error(err))
-        }
-
-        var availableStorenodes []*mailservers.PingResult
-        for _, result := range pingResult {
-            if result.Err != nil {
-                m.logger.Info("connecting error", zap.String("err", *result.Err))
-                continue // The results with error are ignored
-            }
-            availableStorenodes = append(availableStorenodes, result)
-        }
-
-        if len(availableStorenodes) == 0 {
-            m.logger.Warn("No storenodes available") // Do nothing...
-            return nil
-        }
-
-        mailserversByID := make(map[string]mailservers.Mailserver)
-        for idx := range allStorenodes {
-            mailserversByID[allStorenodes[idx].ID] = allStorenodes[idx]
-        }
-        var sortedMailservers []SortedStorenodes
-        for _, ping := range availableStorenodes {
-            ms := mailserversByID[ping.ID]
-            sortedMailserver := SortedStorenodes{
-                Mailserver: ms,
-                RTTMs:      *ping.RTTMs,
-            }
-            m.mailPeersMutex.Lock()
-            pInfo, ok := m.storenodeCycle.peers[ms.ID]
-            m.mailPeersMutex.Unlock()
-            if ok {
-                if time.Now().Before(pInfo.canConnectAfter) {
-                    continue // We can't connect to this node yet
-                }
-            }
-
-            sortedMailservers = append(sortedMailservers, sortedMailserver)
-
-        }
-        sort.Sort(byRTTMsAndCanConnectBefore(sortedMailservers))
-
-        // Picks a random mailserver amongs the ones with the lowest latency
-        // The pool size is 1/4 of the mailservers were pinged successfully
-        pSize := poolSize(len(sortedMailservers) - 1)
-        if pSize <= 0 {
-            pSize = len(sortedMailservers)
-            if pSize <= 0 {
-                m.logger.Warn("No mailservers available") // Do nothing...
-                return nil
-            }
-        }
-
-        r, err := rand.Int(rand.Reader, big.NewInt(int64(pSize)))
-        if err != nil {
-            return err
-        }
-
-        msPing := sortedMailservers[r.Int64()]
-        ms := mailserversByID[msPing.Mailserver.ID]
-        m.logger.Info("connecting to mailserver", zap.String("address", ms.ID))
-        return m.connectToMailserver(ms)
+    if findNearestMailServer {
+        allMailservers = m.getAvailableMailserversSortedByRTT(allMailservers)
     }
 
-    mailserversByID := make(map[string]mailservers.Mailserver)
-    for idx := range allStorenodes {
-        mailserversByID[allStorenodes[idx].ID] = allStorenodes[idx]
-    }
-
-    pSize := poolSize(len(allStorenodes) - 1)
+    // Picks a random mailserver amongs the ones with the lowest latency
+    // The pool size is 1/4 of the mailservers were pinged successfully
+    pSize := poolSize(len(allMailservers) - 1)
     if pSize <= 0 {
-        pSize = len(allStorenodes)
+        pSize = len(allMailservers)
         if pSize <= 0 {
-            m.logger.Warn("No mailservers available") // Do nothing...
+            m.logger.Warn("No storenodes available") // Do nothing...
             return nil
         }
     }
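
Note: poolSize is not part of this diff; per the "1/4 of the mailservers" comment it restricts the random draw to roughly the lowest-latency quarter of the RTT-sorted candidates, trading a little latency for load spreading. A hypothetical reading of the selection step under that assumption:

    // pickIndex draws a crypto/rand index into the RTT-sorted slice,
    // limited to roughly the fastest quarter of the candidates.
    // The /4 divisor is an assumption taken from the in-code comment.
    func pickIndex(sortedLen int) (int64, error) {
        pSize := sortedLen / 4
        if pSize <= 0 {
            pSize = sortedLen
        }
        r, err := rand.Int(rand.Reader, big.NewInt(int64(pSize)))
        if err != nil {
            return 0, err
        }
        return r.Int64(), nil
    }
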
@@ -324,17 +317,14 @@ func (m *Messenger) findNewStorenode() error {
         return err
     }
 
-    msPing := allStorenodes[r.Int64()]
-    ms := mailserversByID[msPing.ID]
-    m.logger.Info("connecting to mailserver", zap.String("address", ms.ID))
-
+    ms := allMailservers[r.Int64()]
     return m.connectToMailserver(ms)
 }
 
 func (m *Messenger) mailserverStatus(mailserverID string) connStatus {
     m.mailPeersMutex.RLock()
     defer m.mailPeersMutex.RUnlock()
-    peer, ok := m.storenodeCycle.peers[mailserverID]
+    peer, ok := m.mailserverCycle.peers[mailserverID]
     if !ok {
         return disconnected
     }
@@ -343,32 +333,32 @@ func (m *Messenger) mailserverStatus(mailserverID string) connStatus {
 
 func (m *Messenger) connectToMailserver(ms mailservers.Mailserver) error {
 
-    m.logger.Info("connecting to mailserver", zap.Any("peer", ms.ID))
+    m.logger.Info("connecting to mailserver", zap.String("mailserverID", ms.ID))
 
-    m.storenodeCycle.activeStorenode = &ms
-    signal.SendMailserverChanged(m.storenodeCycle.activeStorenode)
+    m.mailserverCycle.activeMailserver = &ms
+    signal.SendMailserverChanged(m.mailserverCycle.activeMailserver)
 
-    activeMailserverStatus := m.mailserverStatus(ms.ID)
-    if activeMailserverStatus != connected {
+    mailserverStatus := m.mailserverStatus(ms.ID)
+    if mailserverStatus != connected {
         m.mailPeersMutex.Lock()
-        m.storenodeCycle.peers[ms.ID] = peerStatus{
+        m.mailserverCycle.peers[ms.ID] = peerStatus{
             status:                connected,
             lastConnectionAttempt: time.Now(),
             canConnectAfter:       time.Now().Add(defaultBackoff),
-            storenode:             ms,
+            mailserver:            ms,
         }
         m.mailPeersMutex.Unlock()
 
-        m.storenodeCycle.activeStorenode.FailedRequests = 0
-        peerID, err := m.storenodeCycle.activeStorenode.PeerID()
+        m.mailserverCycle.activeMailserver.FailedRequests = 0
+        peerID, err := m.mailserverCycle.activeMailserver.PeerID()
         if err != nil {
-            m.logger.Error("could not decode the peer id of storenode", zap.Error(err))
+            m.logger.Error("could not decode the peer id of mailserver", zap.Error(err))
             return err
         }
 
-        m.logger.Info("storenode available", zap.String("storenodeID", m.storenodeCycle.activeStorenode.ID))
+        m.logger.Info("mailserver available", zap.String("mailserverID", m.mailserverCycle.activeMailserver.ID))
         m.EmitMailserverAvailable()
-        signal.SendMailserverAvailable(m.storenodeCycle.activeStorenode)
+        signal.SendMailserverAvailable(m.mailserverCycle.activeMailserver)
 
         m.transport.SetStorePeerID(peerID)
 
@@ -382,15 +372,15 @@ func (m *Messenger) connectToMailserver(ms mailservers.Mailserver) error {
 // for that community if it has a mailserver setup otherwise it'll return the global mailserver
 func (m *Messenger) getActiveMailserver(communityID ...string) *mailservers.Mailserver {
     if len(communityID) == 0 || communityID[0] == "" {
-        return m.storenodeCycle.activeStorenode
+        return m.mailserverCycle.activeMailserver
     }
-    ms, err := m.communityStorenodes.GetStorenodeByCommunnityID(communityID[0])
+    ms, err := m.communityStorenodes.GetStorenodeByCommunityID(communityID[0])
     if err != nil {
         if !errors.Is(err, storenodes.ErrNotFound) {
             m.logger.Error("getting storenode for community, using global", zap.String("communityID", communityID[0]), zap.Error(err))
         }
         // if we don't find a specific mailserver for the community, we just use the regular mailserverCycle's one
-        return m.storenodeCycle.activeStorenode
+        return m.mailserverCycle.activeMailserver
     }
     return &ms
 }
@@ -407,43 +397,16 @@ func (m *Messenger) isMailserverAvailable(mailserverID string) bool {
     return m.mailserverStatus(mailserverID) == connected
 }
 
-func mailserverAddressToID(uniqueID string, allStorenodes []mailservers.Mailserver) (string, error) {
-    for _, ms := range allStorenodes {
-        if uniqueID == ms.ID {
-            return ms.ID, nil
-        }
-
-    }
-
-    return "", nil
-}
-
-type ConnectedPeer struct {
-    UniqueID string
-}
-
-func (m *Messenger) mailserverPeersInfo() []ConnectedPeer {
-    var connectedPeers []ConnectedPeer
-    for _, connectedPeer := range m.server.PeersInfo() {
-        connectedPeers = append(connectedPeers, ConnectedPeer{
-            // This is a bit fragile, but should work
-            UniqueID: strings.TrimSuffix(connectedPeer.Enode, "?discport=0"),
-        })
-    }
-
-    return connectedPeers
-}
-
 func (m *Messenger) penalizeMailserver(id string) {
     m.mailPeersMutex.Lock()
     defer m.mailPeersMutex.Unlock()
-    pInfo, ok := m.storenodeCycle.peers[id]
+    pInfo, ok := m.mailserverCycle.peers[id]
     if !ok {
         pInfo.status = disconnected
     }
 
     pInfo.canConnectAfter = time.Now().Add(graylistBackoff)
-    m.storenodeCycle.peers[id] = pInfo
+    m.mailserverCycle.peers[id] = pInfo
 }
 
 func (m *Messenger) asyncRequestAllHistoricMessages() {
@@ -496,7 +459,7 @@ func (m *Messenger) getPinnedMailserver() (*mailservers.Mailserver, error) {
         return nil, nil
     }
 
-    fleetMailservers := mailservers.DefaultStorenodes()
+    fleetMailservers := mailservers.DefaultMailservers()
 
     for _, c := range fleetMailservers {
         if c.Fleet == fleet && c.ID == pinnedMailserver {
@@ -521,35 +484,35 @@ func (m *Messenger) getPinnedMailserver() (*mailservers.Mailserver, error) {
 }
 
 func (m *Messenger) EmitMailserverAvailable() {
-    for _, s := range m.storenodeCycle.availabilitySubscriptions {
+    for _, s := range m.mailserverCycle.availabilitySubscriptions {
         s <- struct{}{}
         close(s)
-        l := len(m.storenodeCycle.availabilitySubscriptions)
-        m.storenodeCycle.availabilitySubscriptions = m.storenodeCycle.availabilitySubscriptions[:l-1]
+        l := len(m.mailserverCycle.availabilitySubscriptions)
+        m.mailserverCycle.availabilitySubscriptions = m.mailserverCycle.availabilitySubscriptions[:l-1]
     }
 }
 
 func (m *Messenger) SubscribeMailserverAvailable() chan struct{} {
     c := make(chan struct{})
-    m.storenodeCycle.availabilitySubscriptions = append(m.storenodeCycle.availabilitySubscriptions, c)
+    m.mailserverCycle.availabilitySubscriptions = append(m.mailserverCycle.availabilitySubscriptions, c)
     return c
 }
 
 func (m *Messenger) disconnectStorenodeIfRequired() error {
     m.logger.Debug("wakuV2 storenode status verification")
 
-    if m.storenodeCycle.activeStorenode == nil {
+    if m.mailserverCycle.activeMailserver == nil {
         // No active storenode, find a new one
         m.cycleMailservers()
         return nil
     }
 
     // Check whether we want to disconnect the active storenode
-    if m.storenodeCycle.activeStorenode.FailedRequests >= mailserverMaxFailedRequests {
-        m.penalizeMailserver(m.storenodeCycle.activeStorenode.ID)
+    if m.mailserverCycle.activeMailserver.FailedRequests >= mailserverMaxFailedRequests {
+        m.penalizeMailserver(m.mailserverCycle.activeMailserver.ID)
         signal.SendMailserverNotWorking()
-        m.logger.Info("too many failed requests", zap.String("storenode", m.storenodeCycle.activeStorenode.ID))
-        m.storenodeCycle.activeStorenode.FailedRequests = 0
+        m.logger.Info("too many failed requests", zap.String("storenode", m.mailserverCycle.activeMailserver.ID))
+        m.mailserverCycle.activeMailserver.FailedRequests = 0
         return m.connectToNewMailserverAndWait()
     }
 
@@ -557,7 +520,7 @@ func (m *Messenger) disconnectStorenodeIfRequired() error {
 }
 
 func (m *Messenger) waitForAvailableStoreNode(timeout time.Duration) bool {
-    // Add 1 second to timeout, because the storenode cycle has 1 second ticker, which doesn't tick on start.
+    // Add 1 second to timeout, because the mailserver cycle has 1 second ticker, which doesn't tick on start.
     // This can be improved after merging https://github.com/status-im/status-go/pull/4380.
     // NOTE: https://stackoverflow.com/questions/32705582/how-to-get-time-tick-to-tick-immediately
     timeout += time.Second
@@ -7,8 +7,10 @@ import (
     "errors"
     "math/big"
     "testing"
+    "time"
 
     "github.com/google/uuid"
+    "github.com/libp2p/go-libp2p/core/peer"
     "github.com/stretchr/testify/require"
 
     "github.com/status-im/status-go/eth-node/types"
@@ -37,21 +39,21 @@ func getInitialResponseKey(topics []types.TopicType) string {
 
 func (t *mockTransport) SendMessagesRequestForTopics(
     ctx context.Context,
-    peerID []byte,
+    peerID peer.ID,
     from, to uint32,
-    previousStoreCursor types.StoreRequestCursor,
+    prevCursor types.StoreRequestCursor,
     pubsubTopic string,
     contentTopics []types.TopicType,
     limit uint32,
     waitForResponse bool,
     processEnvelopes bool,
-) (storeCursor types.StoreRequestCursor, envelopesCount int, err error) {
+) (cursor types.StoreRequestCursor, envelopesCount int, err error) {
     var response queryResponse
-    if previousStoreCursor == nil {
+    if prevCursor == nil {
         initialResponse := getInitialResponseKey(contentTopics)
         response = t.queryResponses[initialResponse]
     } else {
-        response = t.queryResponses[hex.EncodeToString(previousStoreCursor)]
+        response = t.queryResponses[hex.EncodeToString(prevCursor)]
     }
     return response.cursor, 0, response.err
 }
@ -115,44 +117,51 @@ func (t *mockTransport) Populate(topics []types.TopicType, responses int, includ
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestProcessMailserverBatchHappyPath(t *testing.T) {
|
func TestProcessMailserverBatchHappyPath(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
logger := tt.MustCreateTestLogger()
|
logger := tt.MustCreateTestLogger()
|
||||||
|
|
||||||
peerID := peer.
|
mailserverID, err := peer.Decode("16Uiu2HAkw3x97MbbZSWHbdF5bob45vcZvPPK4s4Mjyv2mxyB9GS3")
|
||||||
mailserverID := []byte{1, 2, 3, 4, 5}
|
require.NoError(t, err)
|
||||||
topics := []types.TopicType{}
|
topics := []types.TopicType{}
|
||||||
for i := 0; i < 22; i++ {
|
for i := 0; i < 22; i++ {
|
||||||
topics = append(topics, types.BytesToTopic([]byte{0, 0, 0, byte(i)}))
|
topics = append(topics, types.BytesToTopic([]byte{0, 0, 0, byte(i)}))
|
||||||
}
|
}
|
||||||
|
|
||||||
testTransport := newMockTransport()
|
testTransport := newMockTransport()
|
||||||
err := testTransport.Populate(topics, 10, false)
|
err = testTransport.Populate(topics, 10, false)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
testBatch := MailserverBatch{
|
testBatch := MailserverBatch{
|
||||||
Topics: topics,
|
Topics: topics,
|
||||||
}
|
}
|
||||||
|
|
||||||
err = processMailserverBatch(context.TODO(), testTransport, testBatch, mailserverID, logger, defaultStoreNodeRequestPageSize, nil, false)
|
err = processMailserverBatch(ctx, testTransport, testBatch, mailserverID, logger, defaultStoreNodeRequestPageSize, nil, false)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestProcessMailserverBatchFailure(t *testing.T) {
|
func TestProcessMailserverBatchFailure(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
logger := tt.MustCreateTestLogger()
|
logger := tt.MustCreateTestLogger()
|
||||||
|
|
||||||
mailserverID := []byte{1, 2, 3, 4, 5}
|
mailserverID, err := peer.Decode("16Uiu2HAkw3x97MbbZSWHbdF5bob45vcZvPPK4s4Mjyv2mxyB9GS3")
|
||||||
|
require.NoError(t, err)
|
||||||
topics := []types.TopicType{}
|
topics := []types.TopicType{}
|
||||||
for i := 0; i < 5; i++ {
|
for i := 0; i < 5; i++ {
|
||||||
topics = append(topics, types.BytesToTopic([]byte{0, 0, 0, byte(i)}))
|
topics = append(topics, types.BytesToTopic([]byte{0, 0, 0, byte(i)}))
|
||||||
}
|
}
|
||||||
|
|
||||||
testTransport := newMockTransport()
|
testTransport := newMockTransport()
|
||||||
err := testTransport.Populate(topics, 4, true)
|
err = testTransport.Populate(topics, 4, true)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
testBatch := MailserverBatch{
|
testBatch := MailserverBatch{
|
||||||
Topics: topics,
|
Topics: topics,
|
||||||
}
|
}
|
||||||
|
|
||||||
err = processMailserverBatch(context.TODO(), testTransport, testBatch, mailserverID, logger, defaultStoreNodeRequestPageSize, nil, false)
|
err = processMailserverBatch(ctx, testTransport, testBatch, mailserverID, logger, defaultStoreNodeRequestPageSize, nil, false)
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
}
|
}
|
||||||
|
|
|
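Both tests now identify the storenode by a libp2p peer ID instead of a raw byte slice. A minimal, self-contained sketch of that conversion, reusing the fixture string from the tests above (illustrative only, not part of the commit):

```go
package main

import (
	"fmt"

	"github.com/libp2p/go-libp2p/core/peer"
)

func main() {
	// peer.Decode parses the string form of a libp2p peer ID and
	// fails on anything that is not a valid peer ID.
	id, err := peer.Decode("16Uiu2HAkw3x97MbbZSWHbdF5bob45vcZvPPK4s4Mjyv2mxyB9GS3")
	if err != nil {
		panic(err)
	}
	fmt.Println(id) // round-trips to the same string form
}
```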
@@ -3,10 +3,11 @@ package protocol
 import (
     "crypto/ecdsa"
 
-    "github.com/ethereum/go-ethereum/p2p/enode"
     "github.com/libp2p/go-libp2p/core/peer"
     "github.com/multiformats/go-multiaddr"
 
+    "github.com/ethereum/go-ethereum/p2p/enode"
+
     "github.com/status-im/status-go/eth-node/types"
 )
@@ -8,6 +8,7 @@ import (
 
     "github.com/libp2p/go-libp2p/core/peer"
     "github.com/multiformats/go-multiaddr"
 
     "github.com/status-im/status-go/protocol/storenodes"
+
     gethbridge "github.com/status-im/status-go/eth-node/bridge/geth"
@@ -350,10 +351,10 @@ func (s *MessengerStoreNodeCommunitySuite) TestToggleUseMailservers() {
     // Enable use of mailservers
     err := s.owner.ToggleUseMailservers(true)
     s.Require().NoError(err)
-    s.Require().NotNil(s.owner.storenodeCycle.activeStorenode)
+    s.Require().NotNil(s.owner.mailserverCycle.activeMailserver)
 
     // Disable use of mailservers
     err = s.owner.ToggleUseMailservers(false)
     s.Require().NoError(err)
-    s.Require().Nil(s.owner.storenodeCycle.activeStorenode)
+    s.Require().Nil(s.owner.mailserverCycle.activeMailserver)
 }
@@ -1046,7 +1046,7 @@ func (s *MessengerStoreNodeRequestSuite) TestFetchRealCommunity() {
     }
 
     // Prepare things depending on the configuration
-    nodesList := mailserversDB.DefaultStorenodesByFleet(fleet)
+    nodesList := mailserversDB.DefaultMailserversByFleet(fleet)
     descriptionContentTopic := wakuV2common.BytesToTopic(transport.ToTopic(communityID))
     shardContentTopic := wakuV2common.BytesToTopic(transport.ToTopic(transport.CommunityShardInfoTopic(communityID)))
@@ -11,6 +11,7 @@ import (
     "time"
 
     "github.com/libp2p/go-libp2p/core/peer"
+
     "github.com/status-im/status-go/protocol/wakusync"
 
     "github.com/status-im/status-go/protocol/identity"
@@ -2,6 +2,7 @@ package storenodes
 
 import (
     "github.com/multiformats/go-multiaddr"
+
     "github.com/status-im/status-go/eth-node/types"
     "github.com/status-im/status-go/protocol/protobuf"
     "github.com/status-im/status-go/services/mailservers"
@@ -39,8 +39,8 @@ type storenodesData struct {
     storenodes []Storenode
 }
 
-// GetStorenodeByCommunnityID returns the active storenode for a community
-func (m *CommunityStorenodes) GetStorenodeByCommunnityID(communityID string) (mailservers.Mailserver, error) {
+// GetStorenodeByCommunityID returns the active storenode for a community
+func (m *CommunityStorenodes) GetStorenodeByCommunityID(communityID string) (mailservers.Mailserver, error) {
     m.storenodesByCommunityIDMutex.RLock()
     defer m.storenodesByCommunityIDMutex.RUnlock()
@@ -65,11 +65,11 @@ func TestUpdateStorenodesInDB(t *testing.T) {
     require.NoError(t, err)
 
     // check if storenodes are loaded
-    ms1, err := csn.GetStorenodeByCommunnityID(communityID1.String())
+    ms1, err := csn.GetStorenodeByCommunityID(communityID1.String())
     require.NoError(t, err)
     matchStoreNode(t, snodes1[0], ms1)
 
-    ms2, err := csn.GetStorenodeByCommunnityID(communityID2.String())
+    ms2, err := csn.GetStorenodeByCommunityID(communityID2.String())
     require.NoError(t, err)
     matchStoreNode(t, snodes2[0], ms2)
 }
@@ -511,14 +511,14 @@ func (t *Transport) SendMessagesRequestForTopics(
     ctx context.Context,
     peerID peer.ID,
     from, to uint32,
-    previousStoreCursor types.StoreRequestCursor,
+    prevCursor types.StoreRequestCursor,
     pubsubTopic string,
     contentTopics []types.TopicType,
     limit uint32,
     waitForResponse bool,
     processEnvelopes bool,
-) (storeCursor types.StoreRequestCursor, envelopesCount int, err error) {
-    return t.createMessagesRequest(ctx, peerID, from, to, previousStoreCursor, pubsubTopic, contentTopics, limit, waitForResponse, processEnvelopes)
+) (cursor types.StoreRequestCursor, envelopesCount int, err error) {
+    return t.createMessagesRequest(ctx, peerID, from, to, prevCursor, pubsubTopic, contentTopics, limit, waitForResponse, processEnvelopes)
 }
 
 func createMessagesRequest(from, to uint32, cursor []byte, storeCursor types.StoreRequestCursor, pubsubTopic string, topics []types.TopicType, limit uint32) types.MessagesRequest {
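For context, callers page through a store query by feeding the returned cursor back into the next request. A hedged sketch of such a loop inside the transport package; the helper name `fetchAllPages` and the literal page size are assumptions, only the `SendMessagesRequestForTopics` signature comes from the diff above:

```go
// fetchAllPages is a hypothetical caller: it repeats the request, passing
// the last cursor back in, until the storenode stops returning one.
func (t *Transport) fetchAllPages(ctx context.Context, storenodeID peer.ID, from, to uint32, pubsubTopic string, contentTopics []types.TopicType) error {
	var cursor types.StoreRequestCursor
	for {
		var err error
		cursor, _, err = t.SendMessagesRequestForTopics(
			ctx, storenodeID, from, to, cursor,
			pubsubTopic, contentTopics,
			100,   // limit: page size, an arbitrary choice here
			true,  // waitForResponse
			false, // processEnvelopes
		)
		if err != nil {
			return err
		}
		if cursor == nil {
			return nil // no more pages
		}
	}
}
```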
@@ -631,6 +631,10 @@ func (t *Transport) ConnectionChanged(state connection.State) {
     t.waku.ConnectionChanged(state)
 }
 
+func (t *Transport) PingPeer(ctx context.Context, peerID peer.ID) (time.Duration, error) {
+    return t.waku.PingPeer(ctx, peerID)
+}
+
 // Subscribe to a pubsub topic, passing an optional public key if the pubsub topic is protected
 func (t *Transport) SubscribeToPubsubTopic(topic string, optPublicKey *ecdsa.PublicKey) error {
     if t.waku.Version() == 2 {
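The new `Transport.PingPeer` simply forwards to the waku layer. A hedged usage sketch; the helper name and the five-second timeout are assumptions, not code from the commit:

```go
// checkStorenodeLatency is a hypothetical caller: it bounds the ping with a
// timeout and returns the measured round-trip time.
func checkStorenodeLatency(t *transport.Transport, storenodeID peer.ID) (time.Duration, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	return t.PingPeer(ctx, storenodeID)
}
```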
@@ -1412,7 +1412,7 @@ func (api *PublicAPI) RequestAllHistoricMessagesWithRetries(forceFetchingBackup
 }
 
 func (api *PublicAPI) DisconnectActiveMailserver() {
-    api.service.messenger.DisconnectActiveStorenode()
+    api.service.messenger.DisconnectActiveMailserver()
 }
 
 // Echo is a method for testing purposes.
@@ -11,7 +11,9 @@ import (
 
     "github.com/libp2p/go-libp2p/core/peer"
     "github.com/multiformats/go-multiaddr"
 
     "github.com/waku-org/go-waku/waku/v2/protocol/enr"
+    "github.com/waku-org/go-waku/waku/v2/utils"
 
     "github.com/ethereum/go-ethereum/p2p/enode"
@@ -55,6 +57,7 @@ func (m Mailserver) PeerInfo() (*peer.AddrInfo, error) {
         if err != nil {
             return nil, err
         }
+        addrInfo.Addrs = utils.EncapsulatePeerID(addrInfo.ID, addrInfo.Addrs...)
         maddrs = append(maddrs, addrInfo.Addrs...)
     }
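The added call makes every address derived from an ENR carry its `/p2p/<peer-id>` suffix before being collected. A small, self-contained illustration, assuming go-waku's `utils.EncapsulatePeerID` appends that component to each address; the hostname and peer ID are taken from the store-01 entry elsewhere in this commit:

```go
package main

import (
	"fmt"

	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/multiformats/go-multiaddr"
	"github.com/waku-org/go-waku/waku/v2/utils"
)

func main() {
	id, err := peer.Decode("16Uiu2HAmAUdrQ3uwzuE4Gy4D56hX6uLKEeerJAnhKEHZ3DxF1EfT")
	if err != nil {
		panic(err)
	}
	// A bare transport address, without the /p2p component.
	addr := multiaddr.StringCast("/dns4/store-01.do-ams3.status.prod.status.im/tcp/30303")
	for _, a := range utils.EncapsulatePeerID(id, addr) {
		fmt.Println(a) // now ends in /p2p/16Uiu2HAmAUdrQ3uwzuE4Gy4D56hX6uLKEeerJAnhKEHZ3DxF1EfT
	}
}
```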
@@ -2,9 +2,9 @@ package mailservers
 
 import "github.com/status-im/status-go/params"
 
-func DefaultStorenodesByFleet(fleet string) []Mailserver {
+func DefaultMailserversByFleet(fleet string) []Mailserver {
     var items []Mailserver
-    for _, ms := range DefaultStorenodes() {
+    for _, ms := range DefaultMailservers() {
         if ms.Fleet == fleet {
             items = append(items, ms)
         }
@@ -12,7 +12,7 @@ func DefaultStorenodesByFleet(fleet string) []Mailserver {
     return items
 }
 
-func DefaultStorenodes() []Mailserver {
+func DefaultMailservers() []Mailserver {
 
     return []Mailserver{
         {
@@ -53,13 +53,13 @@ func DefaultStorenodes() []Mailserver {
         },
         {
             ID:    "store-01.do-ams3.status.prod",
-            ENR:   MustDecodeENR("enr:-QEeuEA08-NJJDuKh6V8739MPl2G7ykaC0EWyUg21KtjQ1UtKxuE2qNy5uES2_bobr7sC5C4sS_-GhDVYMpOrM2IFc8KAYJpZIJ2NIJpcIQiqsAnim11bHRpYWRkcnO4bgA0Ni9zdG9yZS0wMS5nYy11cy1jZW50cmFsMS1hLnN0YXR1cy5wcm9kLnN0YXR1cy5pbQZ2XwA2Ni9zdG9yZS0wMS5nYy11cy1jZW50cmFsMS1hLnN0YXR1cy5wcm9kLnN0YXR1cy5pbQYBu94DgnJzjQAQBQABACAAQACAAQCJc2VjcDI1NmsxoQN_aBxNsOBrceDLyC75vBFRuzv_tWfaHG50Jc9DQztwkIN0Y3CCdl-DdWRwgiMohXdha3UyAw"),
+            ENR:   MustDecodeENR("enr:-QEMuEAs8JmmyUI3b9v_ADqYtELHUYAsAMS21lA2BMtrzF86tVmyy9cCrhmzfHGHx_g3nybn7jIRybzXTGNj3C2KzrriAYJpZIJ2NIJpcISf3_Jeim11bHRpYWRkcnO4XAArNiZzdG9yZS0wMS5kby1hbXMzLnN0YXR1cy5wcm9kLnN0YXR1cy5pbQZ2XwAtNiZzdG9yZS0wMS5kby1hbXMzLnN0YXR1cy5wcm9kLnN0YXR1cy5pbQYBu94DgnJzjQAQBQABACAAQACAAQCJc2VjcDI1NmsxoQLfoaQH3oSYW59yxEBfeAZbltmUnC4BzYkHqer2VQMTyoN0Y3CCdl-DdWRwgiMohXdha3UyAw"),
             Addr:  MustDecodeMultiaddress("/dns4/store-01.do-ams3.status.prod.status.im/tcp/30303/p2p/16Uiu2HAmAUdrQ3uwzuE4Gy4D56hX6uLKEeerJAnhKEHZ3DxF1EfT"),
             Fleet: params.FleetStatusProd,
         },
         {
             ID:    "store-02.do-ams3.status.prod",
-            ENR:   MustDecodeENR("enr:-QEeuECQiv4VvUk04UnU3wxKXgWvErYcGMgYU8aDuc8VvEt1km2GvcEBq-R9XT-loNL5PZjxGKzB1rDtCOQaFVYQtgPnAYJpZIJ2NIJpcIQiqpoCim11bHRpYWRkcnO4bgA0Ni9zdG9yZS0wMi5nYy11cy1jZW50cmFsMS1hLnN0YXR1cy5wcm9kLnN0YXR1cy5pbQZ2XwA2Ni9zdG9yZS0wMi5nYy11cy1jZW50cmFsMS1hLnN0YXR1cy5wcm9kLnN0YXR1cy5pbQYBu94DgnJzjQAQBQABACAAQACAAQCJc2VjcDI1NmsxoQNbEg1bkMJCBiD5Tje3Z_11R-kd9munZF0v4iiYZa1jgoN0Y3CCdl-DdWRwgiMohXdha3UyAw"),
+            ENR:   MustDecodeENR("enr:-QEMuEDuTfD47Hz_NXDwf7LJMf0qhjp3CQhZ9Fy0Ulp4XehtEzewBzmJCoe77hjno3khH8kX2B9B1DgbJuc2n32fMZvOAYJpZIJ2NIJpcISf3_Kaim11bHRpYWRkcnO4XAArNiZzdG9yZS0wMi5kby1hbXMzLnN0YXR1cy5wcm9kLnN0YXR1cy5pbQZ2XwAtNiZzdG9yZS0wMi5kby1hbXMzLnN0YXR1cy5wcm9kLnN0YXR1cy5pbQYBu94DgnJzjQAQBQABACAAQACAAQCJc2VjcDI1NmsxoQLSM62HmqGpZ382YM4CyI-MCIlkxMP7ZbOwqwRPvk9wsIN0Y3CCdl-DdWRwgiMohXdha3UyAw"),
             Addr:  MustDecodeMultiaddress("/dns4/store-02.do-ams3.status.prod.status.im/tcp/30303/p2p/16Uiu2HAm9aDJPkhGxc2SFcEACTFdZ91Q5TJjp76qZEhq9iF59x7R"),
             Fleet: params.FleetStatusProd,
         },
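The corrected ENRs embed do-ams3 hostnames that now agree with the `ID` and `Addr` fields of each entry. The `MustDecode*` helpers are defined elsewhere in this package; a hedged sketch of what they are assumed to do, built only on go-ethereum's ENR parser and go-multiaddr:

```go
package mailservers

import (
	"github.com/ethereum/go-ethereum/p2p/enode"
	"github.com/multiformats/go-multiaddr"
)

// MustDecodeENR is assumed to parse an "enr:..." record string, panicking on
// malformed input; enode.Parse accepts the base64 ENR text form.
func MustDecodeENR(raw string) *enode.Node {
	node, err := enode.Parse(enode.ValidSchemes, raw)
	if err != nil {
		panic(err)
	}
	return node
}

// MustDecodeMultiaddress is assumed to do the same for a multiaddress string.
func MustDecodeMultiaddress(raw string) multiaddr.Multiaddr {
	ma, err := multiaddr.NewMultiaddr(raw)
	if err != nil {
		panic(err)
	}
	return ma
}
```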
@@ -1,152 +0,0 @@
-package mailservers
-
-import (
-    "context"
-    "fmt"
-    "net"
-    "time"
-
-    multiaddr "github.com/multiformats/go-multiaddr"
-
-    "github.com/ethereum/go-ethereum/p2p/enode"
-    "github.com/ethereum/go-ethereum/p2p/enr"
-    "github.com/status-im/status-go/rtt"
-)
-
-type PingQuery struct {
-    Addresses []string `json:"addresses"`
-    TimeoutMs int      `json:"timeoutMs"`
-}
-
-type PingResult struct {
-    Address string  `json:"address"`
-    RTTMs   *int    `json:"rttMs"`
-    Err     *string `json:"error"`
-}
-
-type parseFn func(string) (string, error)
-
-func (pr *PingResult) Update(rttMs int, err error) {
-    if err != nil {
-        errStr := err.Error()
-        pr.Err = &errStr
-    }
-    if rttMs >= 0 {
-        pr.RTTMs = &rttMs
-    } else {
-        pr.RTTMs = nil
-    }
-}
-
-func EnodeToAddr(node *enode.Node) (string, error) {
-    var ip4 enr.IPv4
-    err := node.Load(&ip4)
-    if err != nil {
-        return "", err
-    }
-    var tcp enr.TCP
-    err = node.Load(&tcp)
-    if err != nil {
-        return "", err
-    }
-    return fmt.Sprintf("%s:%d", net.IP(ip4).String(), tcp), nil
-}
-
-func EnodeStringToAddr(enodeAddr string) (string, error) {
-    node, err := enode.ParseV4(enodeAddr)
-    if err != nil {
-        return "", err
-    }
-    return EnodeToAddr(node)
-}
-
-func parse(addresses []string, fn parseFn) (map[string]*PingResult, []string) {
-    results := make(map[string]*PingResult, len(addresses))
-    var toPing []string
-
-    for i := range addresses {
-        addr, err := fn(addresses[i])
-        if err != nil {
-            errStr := err.Error()
-            results[addresses[i]] = &PingResult{Address: addresses[i], Err: &errStr}
-            continue
-        }
-        results[addr] = &PingResult{Address: addresses[i]}
-        toPing = append(toPing, addr)
-    }
-    return results, toPing
-}
-
-func mapValues(m map[string]*PingResult) []*PingResult {
-    rval := make([]*PingResult, 0, len(m))
-    for _, value := range m {
-        rval = append(rval, value)
-    }
-    return rval
-}
-
-func DoPing(ctx context.Context, addresses []string, timeoutMs int, p parseFn) ([]*PingResult, error) {
-    timeout := time.Duration(timeoutMs) * time.Millisecond
-
-    resultsMap, toPing := parse(addresses, p)
-
-    // run the checks concurrently
-    results, err := rtt.CheckHosts(toPing, timeout)
-    if err != nil {
-        return nil, err
-    }
-
-    // set ping results
-    for i := range results {
-        r := results[i]
-        pr := resultsMap[r.Addr]
-        if pr == nil {
-            continue
-        }
-        pr.Update(r.RTTMs, r.Err)
-    }
-
-    return mapValues(resultsMap), nil
-}
-
-func (a *API) Ping(ctx context.Context, pq PingQuery) ([]*PingResult, error) {
-    return DoPing(ctx, pq.Addresses, pq.TimeoutMs, EnodeStringToAddr)
-}
-
-func MultiAddressToAddress(multiAddr string) (string, error) {
-
-    ma, err := multiaddr.NewMultiaddr(multiAddr)
-    if err != nil {
-        return "", err
-    }
-
-    dns4, err := ma.ValueForProtocol(multiaddr.P_DNS4)
-    if err != nil && err != multiaddr.ErrProtocolNotFound {
-        return "", err
-    }
-
-    ip4 := ""
-    if dns4 != "" {
-        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
-        defer cancel()
-        ip4, err = net.DefaultResolver.LookupCNAME(ctx, dns4)
-        if err != nil {
-            return "", err
-        }
-    } else {
-        ip4, err = ma.ValueForProtocol(multiaddr.P_IP4)
-        if err != nil {
-            return "", err
-        }
-    }
-
-    tcp, err := ma.ValueForProtocol(multiaddr.P_TCP)
-    if err != nil {
-        return "", err
-    }
-    return fmt.Sprintf("%s:%s", ip4, tcp), nil
-}
-
-func (a *API) MultiAddressPing(ctx context.Context, pq PingQuery) ([]*PingResult, error) {
-    return DoPing(ctx, pq.Addresses, pq.TimeoutMs, MultiAddressToAddress)
-}
@@ -1,19 +1,9 @@
 package wakuext
 
 import (
-    "crypto/ecdsa"
-    "time"
-
     "github.com/ethereum/go-ethereum/log"
-    gethbridge "github.com/status-im/status-go/eth-node/bridge/geth"
     "github.com/status-im/status-go/eth-node/types"
     "github.com/status-im/status-go/services/ext"
-    waku "github.com/status-im/status-go/waku/common"
-)
-
-const (
-    // defaultWorkTime is a work time reported in messages sent to MailServer nodes.
-    defaultWorkTime = 5
 )
 
 // PublicAPI extends waku public API.
@@ -33,39 +23,3 @@ func NewPublicAPI(s *Service) *PublicAPI {
         log: log.New("package", "status-go/services/wakuext.PublicAPI"),
     }
 }
-
-// makeEnvelop makes an envelop for a historic messages request.
-// Symmetric key is used to authenticate to MailServer.
-// PK is the current node ID.
-// DEPRECATED
-func makeEnvelop(
-    payload []byte,
-    symKey []byte,
-    publicKey *ecdsa.PublicKey,
-    nodeID *ecdsa.PrivateKey,
-    pow float64,
-    now time.Time,
-) (types.Envelope, error) {
-    params := waku.MessageParams{
-        PoW:      pow,
-        Payload:  payload,
-        WorkTime: defaultWorkTime,
-        Src:      nodeID,
-    }
-    // Either symKey or public key is required.
-    // This condition is verified in `message.Wrap()` method.
-    if len(symKey) > 0 {
-        params.KeySym = symKey
-    } else if publicKey != nil {
-        params.Dst = publicKey
-    }
-    message, err := waku.NewSentMessage(&params)
-    if err != nil {
-        return nil, err
-    }
-    envelope, err := message.Wrap(&params, now)
-    if err != nil {
-        return nil, err
-    }
-    return gethbridge.NewWakuEnvelope(envelope), nil
-}
@@ -1,25 +1,18 @@
 package wakuext
 
 import (
-    "fmt"
     "io/ioutil"
-    "math"
-    "strconv"
     "testing"
 
     "go.uber.org/zap"
 
     "github.com/stretchr/testify/require"
-    "github.com/stretchr/testify/suite"
     "github.com/syndtr/goleveldb/leveldb"
     "github.com/syndtr/goleveldb/leveldb/storage"
 
-    "github.com/ethereum/go-ethereum/node"
-    "github.com/ethereum/go-ethereum/p2p"
     "github.com/status-im/status-go/appdatabase"
     gethbridge "github.com/status-im/status-go/eth-node/bridge/geth"
    "github.com/status-im/status-go/eth-node/crypto"
-    "github.com/status-im/status-go/eth-node/types"
     "github.com/status-im/status-go/multiaccounts"
     "github.com/status-im/status-go/params"
     "github.com/status-im/status-go/services/ext"
@@ -66,102 +59,3 @@ func TestInitProtocol(t *testing.T) {
     err = service.InitProtocol("Test", privateKey, appDB, walletDB, nil, multiAccounts, acc, nil, nil, nil, nil, nil, zap.NewNop())
     require.NoError(t, err)
 }
-
-func TestShhExtSuite(t *testing.T) {
-    suite.Run(t, new(ShhExtSuite))
-}
-
-type ShhExtSuite struct {
-    suite.Suite
-
-    dir      string
-    nodes    []*node.Node
-    wakus    []types.Waku
-    services []*Service
-}
-
-func (s *ShhExtSuite) createAndAddNode() {
-    idx := len(s.nodes)
-
-    // create a node
-    cfg := &node.Config{
-        Name: strconv.Itoa(idx),
-        P2P: p2p.Config{
-            MaxPeers:    math.MaxInt32,
-            NoDiscovery: true,
-            ListenAddr:  ":0",
-        },
-        NoUSB: true,
-    }
-    stack, err := node.New(cfg)
-    s.NoError(err)
-    w := waku.New(nil, nil)
-    stack.RegisterLifecycle(w)
-    stack.RegisterAPIs(w.APIs())
-    stack.RegisterProtocols(w.Protocols())
-    s.NoError(err)
-
-    // set up protocol
-    config := params.NodeConfig{
-        RootDataDir: s.dir,
-        ShhextConfig: params.ShhextConfig{
-            InstallationID:          "1",
-            PFSEnabled:              true,
-            MailServerConfirmations: true,
-            ConnectionTarget:        10,
-        },
-    }
-    db, err := leveldb.Open(storage.NewMemStorage(), nil)
-    s.Require().NoError(err)
-    nodeWrapper := ext.NewTestNodeWrapper(nil, gethbridge.NewGethWakuWrapper(w))
-    service := New(config, nodeWrapper, nil, nil, db)
-
-    appDB, cleanupDB, err := helpers.SetupTestSQLDB(appdatabase.DbInitializer{}, fmt.Sprintf("%d", idx))
-    s.Require().NoError(err)
-    defer func() { s.Require().NoError(cleanupDB()) }()
-
-    tmpfile, err := ioutil.TempFile("", "multi-accounts-tests-")
-    s.Require().NoError(err)
-
-    multiAccounts, err := multiaccounts.InitializeDB(tmpfile.Name())
-    s.Require().NoError(err)
-
-    privateKey, err := crypto.GenerateKey()
-    s.NoError(err)
-
-    acc := &multiaccounts.Account{KeyUID: "0xdeadbeef"}
-
-    walletDB, err := helpers.SetupTestMemorySQLDB(&walletdatabase.DbInitializer{})
-    s.Require().NoError(err)
-
-    err = service.InitProtocol("Test", privateKey, appDB, walletDB, nil, multiAccounts, acc, nil, nil, nil, nil, nil, zap.NewNop())
-    s.NoError(err)
-
-    stack.RegisterLifecycle(service)
-    stack.RegisterAPIs(service.APIs())
-    stack.RegisterProtocols(service.Protocols())
-
-    s.NoError(err)
-
-    // start the node
-    err = stack.Start()
-    s.Require().NoError(err)
-
-    // store references
-    s.nodes = append(s.nodes, stack)
-    s.wakus = append(s.wakus, gethbridge.NewGethWakuWrapper(w))
-    s.services = append(s.services, service)
-}
-
-func (s *ShhExtSuite) SetupTest() {
-    s.dir = s.T().TempDir()
-}
-
-func (s *ShhExtSuite) TearDownTest() {
-    for _, n := range s.nodes {
-        s.NoError(n.Close())
-    }
-    s.nodes = nil
-    s.wakus = nil
-    s.services = nil
-}
@@ -278,20 +278,6 @@ func (s *WakuTestSuite) TestEventsWithoutConfirmation() {
     timer.Stop()
 }
 
-func discardPipe() *p2p.MsgPipeRW {
-    rw1, rw2 := p2p.MsgPipe()
-    go func() {
-        for {
-            msg, err := rw1.ReadMsg()
-            if err != nil {
-                return
-            }
-            msg.Discard() // nolint: errcheck
-        }
-    }()
-    return rw2
-}
-
 func (s *WakuTestSuite) TestWakuTimeDesyncEnvelopeIgnored() {
     c := &Config{
         MaxMessageSize: common.DefaultMaxMessageSize,
@@ -36,6 +36,7 @@ import (
     "github.com/jellydator/ttlcache/v3"
     "github.com/libp2p/go-libp2p/core/peer"
     "github.com/libp2p/go-libp2p/core/peerstore"
+    "github.com/libp2p/go-libp2p/p2p/protocol/ping"
     "github.com/multiformats/go-multiaddr"
 
     "go.uber.org/zap"
@@ -1792,6 +1793,19 @@ func (w *Waku) PeerID() peer.ID {
     return w.node.Host().ID()
 }
 
+func (w *Waku) PingPeer(ctx context.Context, peerID peer.ID) (time.Duration, error) {
+    pingResultCh := ping.Ping(ctx, w.node.Host(), peerID)
+    select {
+    case <-ctx.Done():
+        return 0, ctx.Err()
+    case r := <-pingResultCh:
+        if r.Error != nil {
+            return 0, r.Error
+        }
+        return r.RTT, nil
+    }
+}
+
 func (w *Waku) Peerstore() peerstore.Peerstore {
     return w.node.Host().Peerstore()
 }
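`Waku.PingPeer` wraps libp2p's ping protocol, replacing the hand-rolled enode/multiaddress RTT code deleted from services/mailservers above. A minimal stand-alone use of the same primitive, assuming `h` is an existing libp2p host; the helper name and timeout are illustrative:

```go
import (
	"context"
	"time"

	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/p2p/protocol/ping"
)

// pingOnce measures a single round trip to a peer, bounded by a timeout.
func pingOnce(h host.Host, id peer.ID) (time.Duration, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	// ping.Ping keeps pinging until the context is cancelled; reading one
	// result from the channel yields a single measurement.
	res := <-ping.Ping(ctx, h, id)
	if res.Error != nil {
		return 0, res.Error
	}
	return res.RTT, nil
}
```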