From 98784b752a34335d8d8d23a8b94ec27f0de66733 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Wed, 12 Jan 2022 16:02:01 +0000 Subject: [PATCH] feat: desktop mailserver cycle (#2481) --- api/geth_backend.go | 1 - cmd/statusd/main.go | 1 + eth-node/bridge/geth/waku.go | 4 + eth-node/bridge/geth/wakuv2.go | 4 + eth-node/types/waku.go | 40 ++ multiaccounts/accounts/database.go | 10 +- node/status_node_services.go | 4 +- params/config.go | 3 +- protocol/common/feature_flags.go | 3 + protocol/communities_messenger_test.go | 1 + protocol/messenger.go | 67 ++- protocol/messenger_config.go | 15 + protocol/messenger_mailserver_cycle.go | 666 +++++++++++++++++++++++++ protocol/messenger_response_test.go | 8 +- protocol/messenger_test.go | 1 + protocol/transport/transport.go | 4 + services/ext/api.go | 4 + services/ext/service.go | 47 +- services/mailservers/tcp_ping.go | 28 +- services/wakuext/api_test.go | 38 +- services/wakuext/service.go | 6 +- services/wakuv2ext/service.go | 6 +- signal/events_shhext.go | 23 +- wakuv2/waku.go | 60 ++- 24 files changed, 952 insertions(+), 92 deletions(-) create mode 100644 protocol/messenger_mailserver_cycle.go diff --git a/api/geth_backend.go b/api/geth_backend.go index c3b6c97b3..52b554168 100644 --- a/api/geth_backend.go +++ b/api/geth_backend.go @@ -1180,7 +1180,6 @@ func (b *GethStatusBackend) injectAccountsIntoServices() error { return ErrWakuIdentityInjectionFailure } st := b.statusNode.WakuV2ExtService() - if err := st.InitProtocol(b.statusNode.GethNode().Config().Name, identity, b.appDB, b.multiaccountsDB, acc, logutils.ZapLogger()); err != nil { return err } diff --git a/cmd/statusd/main.go b/cmd/statusd/main.go index 65b089a8d..5295f6323 100644 --- a/cmd/statusd/main.go +++ b/cmd/statusd/main.go @@ -216,6 +216,7 @@ func main() { identity, gethbridge.NewNodeBridge(backend.StatusNode().GethNode(), backend.StatusNode().WakuService(), backend.StatusNode().WakuV2Service()), installationID.String(), + nil, options..., ) if err != nil { diff --git a/eth-node/bridge/geth/waku.go b/eth-node/bridge/geth/waku.go index d6ffe4304..754f38152 100644 --- a/eth-node/bridge/geth/waku.go +++ b/eth-node/bridge/geth/waku.go @@ -79,6 +79,10 @@ func (w *gethWakuWrapper) DropPeer(peerID string) error { return errors.New("not available in WakuV1") } +func (w *gethWakuWrapper) SubscribeToConnStatusChanges() (*types.ConnStatusSubscription, error) { + return nil, errors.New("not available in WakuV1") +} + // Peers function only added for compatibility with waku V2 func (w *gethWakuWrapper) Peers() map[string][]string { p := make(map[string][]string) diff --git a/eth-node/bridge/geth/wakuv2.go b/eth-node/bridge/geth/wakuv2.go index 3405106ac..177be8a75 100644 --- a/eth-node/bridge/geth/wakuv2.go +++ b/eth-node/bridge/geth/wakuv2.go @@ -263,6 +263,10 @@ func (w *gethWakuV2Wrapper) MarkP2PMessageAsProcessed(hash common.Hash) { w.waku.MarkP2PMessageAsProcessed(hash) } +func (w *gethWakuV2Wrapper) SubscribeToConnStatusChanges() (*types.ConnStatusSubscription, error) { + return w.waku.SubscribeToConnStatusChanges(), nil +} + type wakuV2FilterWrapper struct { filter *wakucommon.Filter id string diff --git a/eth-node/types/waku.go b/eth-node/types/waku.go index ec1f9977e..bbd97abb0 100644 --- a/eth-node/types/waku.go +++ b/eth-node/types/waku.go @@ -2,11 +2,49 @@ package types import ( "crypto/ecdsa" + "sync" "time" + "github.com/pborman/uuid" + "github.com/ethereum/go-ethereum/common" ) +type ConnStatus struct { + IsOnline bool `json:"isOnline"` + HasHistory bool `json:"hasHistory"` + Peers map[string][]string `json:"peers"` +} + +type ConnStatusSubscription struct { + sync.RWMutex + + ID string + C chan ConnStatus + active bool +} + +func NewConnStatusSubscription() *ConnStatusSubscription { + return &ConnStatusSubscription{ + ID: uuid.NewRandom().String(), + C: make(chan ConnStatus, 100), + active: true, + } +} + +func (u *ConnStatusSubscription) Active() bool { + u.RLock() + defer u.RUnlock() + return u.active +} + +func (u *ConnStatusSubscription) Unsubscribe() { + u.Lock() + defer u.Unlock() + close(u.C) + u.active = false +} + // Whisper represents a dark communication interface through the Ethereum // network, using its very own P2P communication layer. type Waku interface { @@ -34,6 +72,8 @@ type Waku interface { DropPeer(peerID string) error + SubscribeToConnStatusChanges() (*ConnStatusSubscription, error) + // MinPow returns the PoW value required by this node. MinPow() float64 // BloomFilter returns the aggregated bloom filter for all the topics of interest. diff --git a/multiaccounts/accounts/database.go b/multiaccounts/accounts/database.go index 6c6082b22..7f1b56d97 100644 --- a/multiaccounts/accounts/database.go +++ b/multiaccounts/accounts/database.go @@ -459,7 +459,7 @@ func (db *Database) SaveSetting(setting string, value interface{}) error { } func (db *Database) GetNodeConfig(nodecfg interface{}) error { - return db.db.QueryRow("SELECT node_config FROM settings WHERE synthetic_id = 'id'").Scan(&sqlite.JSONBlob{nodecfg}) + return db.db.QueryRow("SELECT node_config FROM settings WHERE synthetic_id = 'id'").Scan(&sqlite.JSONBlob{Data: nodecfg}) } func (db *Database) GetSettings() (Settings, error) { @@ -656,6 +656,14 @@ func (db *Database) GetPublicKey() (rst string, err error) { return } +func (db *Database) GetFleet() (rst string, err error) { + err = db.db.QueryRow("SELECT COALESCE(fleet, '') FROM settings WHERE synthetic_id = 'id'").Scan(&rst) + if err == sql.ErrNoRows { + return rst, nil + } + return +} + func (db *Database) GetDappsAddress() (rst types.Address, err error) { err = db.db.QueryRow("SELECT dapps_address FROM settings WHERE synthetic_id = 'id'").Scan(&rst) if err == sql.ErrNoRows { diff --git a/node/status_node_services.go b/node/status_node_services.go index 787707a1c..0e596cb57 100644 --- a/node/status_node_services.go +++ b/node/status_node_services.go @@ -153,7 +153,7 @@ func (b *StatusNode) wakuExtService(config *params.NodeConfig) (*wakuext.Service } if b.wakuExtSrvc == nil { - b.wakuExtSrvc = wakuext.New(config.ShhextConfig, b.nodeBridge(), ext.EnvelopeSignalHandler{}, b.db) + b.wakuExtSrvc = wakuext.New(*config, b.nodeBridge(), ext.EnvelopeSignalHandler{}, b.db) } b.wakuExtSrvc.SetP2PServer(b.gethNode.Server()) @@ -165,7 +165,7 @@ func (b *StatusNode) wakuV2ExtService(config *params.NodeConfig) (*wakuv2ext.Ser return nil, errors.New("geth node not initialized") } if b.wakuV2ExtSrvc == nil { - b.wakuV2ExtSrvc = wakuv2ext.New(config.ShhextConfig, b.nodeBridge(), ext.EnvelopeSignalHandler{}, b.db) + b.wakuV2ExtSrvc = wakuv2ext.New(*config, b.nodeBridge(), ext.EnvelopeSignalHandler{}, b.db) } b.wakuV2ExtSrvc.SetP2PServer(b.gethNode.Server()) diff --git a/params/config.go b/params/config.go index d3954b588..9e6f0a6ca 100644 --- a/params/config.go +++ b/params/config.go @@ -591,7 +591,8 @@ type ShhextConfig struct { ConnectionTarget int // RequestsDelay used to ensure that no similar requests are sent within short periods of time. RequestsDelay time.Duration - + // EnableMailserverCycle is used to enable the mailserver cycle to switch between trusted servers to retrieve the message history + EnableMailserverCycle bool // MaxServerFailures defines maximum allowed expired requests before server will be swapped to another one. MaxServerFailures int diff --git a/protocol/common/feature_flags.go b/protocol/common/feature_flags.go index 18e81da26..7272e8cca 100644 --- a/protocol/common/feature_flags.go +++ b/protocol/common/feature_flags.go @@ -8,4 +8,7 @@ type FeatureFlags struct { // PushNotification indicates whether we should be enabling the push notification feature PushNotifications bool + + // MailserverCycle indicates whether we should enable or not the mailserver cycle + MailserverCycle bool } diff --git a/protocol/communities_messenger_test.go b/protocol/communities_messenger_test.go index 36d76a8a4..59329b937 100644 --- a/protocol/communities_messenger_test.go +++ b/protocol/communities_messenger_test.go @@ -75,6 +75,7 @@ func (s *MessengerCommunitiesSuite) newMessengerWithOptions(shh types.Waku, priv privateKey, &testNode{shh: shh}, uuid.New().String(), + nil, options..., ) s.Require().NoError(err) diff --git a/protocol/messenger.go b/protocol/messenger.go index 566084490..175d301f3 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -15,13 +15,18 @@ import ( "sync" "time" + "github.com/libp2p/go-libp2p-core/peer" "github.com/pkg/errors" "go.uber.org/zap" + "github.com/ethereum/go-ethereum/event" + "github.com/davecgh/go-spew/spew" "github.com/golang/protobuf/proto" gethcommon "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/enode" "github.com/status-im/status-go/appdatabase" "github.com/status-im/status-go/appmetrics" "github.com/status-im/status-go/connection" @@ -47,7 +52,8 @@ import ( "github.com/status-im/status-go/protocol/sqlite" "github.com/status-im/status-go/protocol/transport" v1protocol "github.com/status-im/status-go/protocol/v1" - "github.com/status-im/status-go/services/mailservers" + "github.com/status-im/status-go/services/ext/mailservers" + mailserversDB "github.com/status-im/status-go/services/mailservers" "github.com/status-im/status-go/telemetry" ) @@ -81,6 +87,8 @@ var messageCacheIntervalMs uint64 = 1000 * 60 * 60 * 48 // mailservers because they can also be managed by the user. type Messenger struct { node types.Node + server *p2p.Server + peerStore *mailservers.PeerStore config *config identity *ecdsa.PrivateKey persistence *sqlitePersistence @@ -105,11 +113,13 @@ type Messenger struct { modifiedInstallations *stringBoolMap installationID string mailserver []byte + mailserverCycle mailserverCycle database *sql.DB multiAccounts *multiaccounts.Database + mailservers *mailserversDB.Database settings *accounts.Database account *multiaccounts.Account - mailserversDatabase *mailservers.Database + mailserversDatabase *mailserversDB.Database quit chan struct{} requestedCommunities map[string]*transport.Filter connectionState connection.State @@ -119,6 +129,28 @@ type Messenger struct { mutex sync.Mutex } +type connStatus int + +const ( + disconnected connStatus = iota + 1 + connecting + connected +) + +type peerStatus struct { + status connStatus + canConnectAfter time.Time + lastConnectionAttempt time.Time +} +type mailserverCycle struct { + sync.RWMutex + activeMailserver *enode.Node // For usage with wakuV1 + activeStoreNode *peer.ID // For usage with wakuV2 + peers map[string]peerStatus + events chan *p2p.PeerEvent + subscription event.Subscription +} + type dbConfig struct { dbPath string dbKey string @@ -175,6 +207,7 @@ func NewMessenger( identity *ecdsa.PrivateKey, node types.Node, installationID string, + peerStore *mailservers.PeerStore, opts ...Option, ) (*Messenger, error) { var messenger *Messenger @@ -346,6 +379,7 @@ func NewMessenger( return nil, err } settings := accounts.NewDB(database) + mailservers := mailserversDB.NewDB(database) messenger = &Messenger{ config: &c, node: node, @@ -372,10 +406,15 @@ func NewMessenger( database: database, multiAccounts: c.multiAccount, settings: settings, - mailserversDatabase: c.mailserversDatabase, - account: c.account, - quit: make(chan struct{}), - requestedCommunities: make(map[string]*transport.Filter), + peerStore: peerStore, + mailservers: mailservers, + mailserverCycle: mailserverCycle{ + peers: make(map[string]peerStatus), + }, + mailserversDatabase: c.mailserversDatabase, + account: c.account, + quit: make(chan struct{}), + requestedCommunities: make(map[string]*transport.Filter), shutdownTasks: []func() error{ ensVerifier.Stop, pushNotificationClient.Stop, @@ -410,6 +449,10 @@ func NewMessenger( return messenger, nil } +func (m *Messenger) SetP2PServer(server *p2p.Server) { + m.server = server +} + func (m *Messenger) processSentMessages(ids []string) error { if m.connectionState.Offline { return errors.New("Can't mark message as sent while offline") @@ -580,6 +623,13 @@ func (m *Messenger) Start() (*MessengerResponse, error) { } + if m.config.featureFlags.MailserverCycle { + err := m.StartMailserverCycle() + if err != nil { + return nil, err + } + } + return response, nil } @@ -619,7 +669,12 @@ func (m *Messenger) handleConnectionChange(online bool) { if m.pushNotificationClient != nil { m.pushNotificationClient.Offline() } + + if m.config.featureFlags.MailserverCycle { + m.DisconnectActiveMailserver() // force mailserver cycle to run again + } } + m.ensVerifier.SetOnline(online) } diff --git a/protocol/messenger_config.go b/protocol/messenger_config.go index 6a563d730..8fd7d512c 100644 --- a/protocol/messenger_config.go +++ b/protocol/messenger_config.go @@ -54,6 +54,7 @@ type config struct { multiAccount *multiaccounts.Database mailserversDatabase *mailservers.Database account *multiaccounts.Account + clusterConfig params.ClusterConfig verifyTransactionClient EthClient verifyENSURL string @@ -231,3 +232,17 @@ func WithENSVerificationConfig(onENSVerified func(*MessengerResponse), url, addr return nil } } + +func WithClusterConfig(cc params.ClusterConfig) Option { + return func(c *config) error { + c.clusterConfig = cc + return nil + } +} + +func WithMailserverCycle() func(c *config) error { + return func(c *config) error { + c.featureFlags.MailserverCycle = true + return nil + } +} diff --git a/protocol/messenger_mailserver_cycle.go b/protocol/messenger_mailserver_cycle.go new file mode 100644 index 000000000..b88f22a8b --- /dev/null +++ b/protocol/messenger_mailserver_cycle.go @@ -0,0 +1,666 @@ +package protocol + +import ( + "context" + "crypto/rand" + "math" + "math/big" + "sort" + "strings" + "sync" + "time" + + "github.com/libp2p/go-libp2p-core/peer" + "github.com/multiformats/go-multiaddr" + "github.com/pkg/errors" + "go.uber.org/zap" + + "github.com/status-im/go-waku/waku/v2/dnsdisc" + + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/status-im/status-go/params" + "github.com/status-im/status-go/services/mailservers" + "github.com/status-im/status-go/signal" +) + +const defaultBackoff = 30 * time.Second + +type byRTTMs []*mailservers.PingResult + +func (s byRTTMs) Len() int { + return len(s) +} + +func (s byRTTMs) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} + +func (s byRTTMs) Less(i, j int) bool { + return *s[i].RTTMs < *s[j].RTTMs +} + +func (m *Messenger) StartMailserverCycle() error { + canUseMailservers, err := m.settings.CanUseMailservers() + if err != nil { + return err + } + if !canUseMailservers { + return errors.New("mailserver use is not allowed") + } + + m.logger.Debug("started mailserver cycle") + + m.mailserverCycle.events = make(chan *p2p.PeerEvent, 20) + m.mailserverCycle.subscription = m.server.SubscribeEvents(m.mailserverCycle.events) + + go m.checkMailserverConnection() + go m.updateWakuV1PeerStatus() + go m.updateWakuV2PeerStatus() + return nil +} + +func (m *Messenger) DisconnectActiveMailserver() { + m.mailserverCycle.Lock() + defer m.mailserverCycle.Unlock() + m.disconnectActiveMailserver() +} + +func (m *Messenger) disconnectV1Mailserver() { + // TODO: remove this function once WakuV1 is deprecated + if m.mailserverCycle.activeMailserver == nil { + return + } + m.logger.Info("Disconnecting active mailserver", zap.Any("nodeID", m.mailserverCycle.activeMailserver.ID())) + pInfo, ok := m.mailserverCycle.peers[m.mailserverCycle.activeMailserver.ID().String()] + if ok { + pInfo.status = disconnected + pInfo.canConnectAfter = time.Now().Add(defaultBackoff) + m.mailserverCycle.peers[m.mailserverCycle.activeMailserver.ID().String()] = pInfo + } else { + m.mailserverCycle.peers[m.mailserverCycle.activeMailserver.ID().String()] = peerStatus{ + status: disconnected, + canConnectAfter: time.Now().Add(defaultBackoff), + } + } + + m.server.RemovePeer(m.mailserverCycle.activeMailserver) + m.mailserverCycle.activeMailserver = nil +} + +func (m *Messenger) disconnectStoreNode() { + if m.mailserverCycle.activeStoreNode == nil { + return + } + m.logger.Info("Disconnecting active storeNode", zap.Any("nodeID", m.mailserverCycle.activeStoreNode.Pretty())) + pInfo, ok := m.mailserverCycle.peers[string(*m.mailserverCycle.activeStoreNode)] + if ok { + pInfo.status = disconnected + pInfo.canConnectAfter = time.Now().Add(defaultBackoff) + m.mailserverCycle.peers[string(*m.mailserverCycle.activeStoreNode)] = pInfo + } else { + m.mailserverCycle.peers[string(*m.mailserverCycle.activeStoreNode)] = peerStatus{ + status: disconnected, + canConnectAfter: time.Now().Add(defaultBackoff), + } + } + + err := m.transport.DropPeer(string(*m.mailserverCycle.activeStoreNode)) + if err != nil { + m.logger.Warn("Could not drop peer") + } + + m.mailserverCycle.activeStoreNode = nil +} + +func (m *Messenger) disconnectActiveMailserver() { + switch m.transport.WakuVersion() { + case 1: + m.disconnectV1Mailserver() + case 2: + m.disconnectStoreNode() + } + signal.SendMailserverChanged("") +} + +func (m *Messenger) cycleMailservers() { + m.mailserverCycle.Lock() + defer m.mailserverCycle.Unlock() + + m.logger.Info("Automatically switching mailserver") + + if m.mailserverCycle.activeMailserver != nil { + m.disconnectActiveMailserver() + } + + err := m.findNewMailserver() + if err != nil { + m.logger.Error("Error getting new mailserver", zap.Error(err)) + } +} + +func poolSize(fleetSize int) int { + return int(math.Ceil(float64(fleetSize) / 4)) +} + +func (m *Messenger) findNewMailserver() error { + switch m.transport.WakuVersion() { + case 1: + return m.findNewMailserverV1() + case 2: + return m.findStoreNode() + default: + return errors.New("waku version is not supported") + } +} + +func (m *Messenger) findStoreNode() error { + allMailservers := parseStoreNodeConfig(m.config.clusterConfig.StoreNodes) + + // TODO: append user mailservers once that functionality is available for waku2 + + var mailserverList []multiaddr.Multiaddr + now := time.Now() + for _, node := range allMailservers { + pID, err := getPeerID(node) + if err != nil { + continue + } + + pInfo, ok := m.mailserverCycle.peers[string(pID)] + if !ok || pInfo.canConnectAfter.Before(now) { + mailserverList = append(mailserverList, node) + } + } + + m.logger.Info("Finding a new store node...") + + var mailserverStr []string + for _, m := range mailserverList { + mailserverStr = append(mailserverStr, m.String()) + } + + pingResult, err := mailservers.DoPing(context.Background(), mailserverStr, 500, mailservers.MultiAddressToAddress) + if err != nil { + return err + } + + var availableMailservers []*mailservers.PingResult + for _, result := range pingResult { + if result.Err != nil { + continue // The results with error are ignored + } + availableMailservers = append(availableMailservers, result) + } + sort.Sort(byRTTMs(availableMailservers)) + + if len(availableMailservers) == 0 { + m.logger.Warn("No store nodes available") // Do nothing... + return nil + } + + // Picks a random mailserver amongs the ones with the lowest latency + // The pool size is 1/4 of the mailservers were pinged successfully + pSize := poolSize(len(availableMailservers) - 1) + if pSize <= 0 { + m.logger.Warn("No store nodes available") // Do nothing... + return nil + } + + r, err := rand.Int(rand.Reader, big.NewInt(int64(pSize))) + if err != nil { + return err + } + + return m.connectToStoreNode(parseMultiaddresses([]string{availableMailservers[r.Int64()].Address})[0]) +} + +func (m *Messenger) findNewMailserverV1() error { + // TODO: remove this function once WakuV1 is deprecated + + allMailservers := parseNodes(m.config.clusterConfig.TrustedMailServers) + + // Append user mailservers + var fleet string + dbFleet, err := m.settings.GetFleet() + if err != nil { + return err + } + if dbFleet != "" { + fleet = dbFleet + } else if m.config.clusterConfig.Fleet != "" { + fleet = m.config.clusterConfig.Fleet + } else { + fleet = params.FleetProd + } + + customMailservers, err := m.mailservers.Mailservers() + if err != nil { + return err + } + for _, c := range customMailservers { + if c.Fleet == fleet { + mNode, err := enode.ParseV4(c.Address) + if err != nil { + allMailservers = append(allMailservers, mNode) + } + } + } + + var mailserverList []*enode.Node + now := time.Now() + for _, node := range allMailservers { + pInfo, ok := m.mailserverCycle.peers[node.ID().String()] + if !ok || pInfo.canConnectAfter.Before(now) { + mailserverList = append(mailserverList, node) + } + } + + m.logger.Info("Finding a new mailserver...") + + var mailserverStr []string + for _, m := range mailserverList { + mailserverStr = append(mailserverStr, m.String()) + } + + pingResult, err := mailservers.DoPing(context.Background(), mailserverStr, 500, mailservers.EnodeStringToAddr) + if err != nil { + return err + } + + var availableMailservers []*mailservers.PingResult + for _, result := range pingResult { + if result.Err != nil { + continue // The results with error are ignored + } + availableMailservers = append(availableMailservers, result) + } + sort.Sort(byRTTMs(availableMailservers)) + + if len(availableMailservers) == 0 { + m.logger.Warn("No mailservers available") // Do nothing... + return nil + } + + // Picks a random mailserver amongs the ones with the lowest latency + // The pool size is 1/4 of the mailservers were pinged successfully + pSize := poolSize(len(availableMailservers) - 1) + r, err := rand.Int(rand.Reader, big.NewInt(int64(pSize))) + if err != nil { + return err + } + + return m.connectToMailserver(parseNodes([]string{availableMailservers[r.Int64()].Address})[0]) +} + +func (m *Messenger) activeMailserverStatus() (connStatus, error) { + var mailserverID string + switch m.transport.WakuVersion() { + case 1: + if m.mailserverCycle.activeMailserver == nil { + return disconnected, errors.New("Active mailserver is not set") + } + mailserverID = m.mailserverCycle.activeMailserver.ID().String() + case 2: + if m.mailserverCycle.activeStoreNode == nil { + return disconnected, errors.New("Active storenode is not set") + } + mailserverID = string(*m.mailserverCycle.activeStoreNode) + default: + return disconnected, errors.New("waku version is not supported") + } + + return m.mailserverCycle.peers[mailserverID].status, nil +} + +func (m *Messenger) connectToMailserver(node *enode.Node) error { + // TODO: remove this function once WakuV1 is deprecated + + if m.transport.WakuVersion() != 1 { + return nil // This can only be used with wakuV1 + } + + m.logger.Info("Connecting to mailserver", zap.Any("peer", node.ID())) + nodeConnected := false + + m.mailserverCycle.activeMailserver = node + signal.SendMailserverChanged(m.mailserverCycle.activeMailserver.String()) + + // Adding a peer and marking it as connected can't be executed sync in WakuV1, because + // There's a delay between requesting a peer being added, and a signal being + // received after the peer was added. So we first set the peer status as + // Connecting and once a peerConnected signal is received, we mark it as + // Connected + activeMailserverStatus, err := m.activeMailserverStatus() + if err != nil { + return err + } + + if activeMailserverStatus == connected { + nodeConnected = true + } else { + // Attempt to connect to mailserver by adding it as a peer + m.SetMailserver(node.ID().Bytes()) + m.server.AddPeer(node) + if err := m.peerStore.Update([]*enode.Node{node}); err != nil { + return err + } + + pInfo, ok := m.mailserverCycle.peers[node.ID().String()] + if ok { + pInfo.status = connecting + pInfo.lastConnectionAttempt = time.Now() + m.mailserverCycle.peers[node.ID().String()] = pInfo + } else { + m.mailserverCycle.peers[node.ID().String()] = peerStatus{ + status: connecting, + lastConnectionAttempt: time.Now(), + } + } + } + + if nodeConnected { + m.logger.Info("Mailserver available") + signal.SendMailserverAvailable(m.mailserverCycle.activeMailserver.String()) + } + + return nil +} + +func (m *Messenger) connectToStoreNode(node multiaddr.Multiaddr) error { + if m.transport.WakuVersion() != 2 { + return nil // This can only be used with wakuV2 + } + + m.logger.Info("Connecting to storenode", zap.Any("peer", node)) + + nodeConnected := false + + peerID, err := getPeerID(node) + if err != nil { + return err + } + + m.mailserverCycle.activeStoreNode = &peerID + signal.SendMailserverChanged(m.mailserverCycle.activeStoreNode.Pretty()) + + // Adding a peer and marking it as connected can't be executed sync in WakuV1, because + // There's a delay between requesting a peer being added, and a signal being + // received after the peer was added. So we first set the peer status as + // Connecting and once a peerConnected signal is received, we mark it as + // Connected + activeMailserverStatus, err := m.activeMailserverStatus() + if err != nil { + return err + } + + if activeMailserverStatus == connected { + nodeConnected = true + } else { + // Attempt to connect to mailserver by adding it as a peer + m.SetMailserver([]byte(peerID.Pretty())) + if err := m.transport.DialPeer(node.String()); err != nil { + return err + } + + pInfo, ok := m.mailserverCycle.peers[string(peerID)] + if ok { + pInfo.status = connected + pInfo.lastConnectionAttempt = time.Now() + } else { + m.mailserverCycle.peers[string(peerID)] = peerStatus{ + status: connected, + lastConnectionAttempt: time.Now(), + } + } + + nodeConnected = true + } + + if nodeConnected { + m.logger.Info("Storenode available") + signal.SendMailserverAvailable(m.mailserverCycle.activeStoreNode.Pretty()) + } + + return nil +} + +func (m *Messenger) isActiveMailserverAvailable() bool { + m.mailserverCycle.RLock() + defer m.mailserverCycle.RUnlock() + + mailserverStatus, err := m.activeMailserverStatus() + if err != nil { + return false + } + + return mailserverStatus == connected +} + +func (m *Messenger) updateWakuV2PeerStatus() { + if m.transport.WakuVersion() != 2 { + return // This can only be used with wakuV2 + } + + connSubscription, err := m.transport.SubscribeToConnStatusChanges() + if err != nil { + m.logger.Error("Could not subscribe to connection status changes", zap.Error(err)) + } + + for { + select { + case status := <-connSubscription.C: + m.mailserverCycle.Lock() + + for pID, pInfo := range m.mailserverCycle.peers { + if pInfo.status == disconnected { + continue + } + + // Removing disconnected + + found := false + for connectedPeer := range status.Peers { + peerID, err := peer.Decode(connectedPeer) + if err != nil { + continue + } + + if string(peerID) == pID { + found = true + break + } + } + if !found && pInfo.status == connected { + m.logger.Info("Peer disconnected", zap.String("peer", peer.ID(pID).Pretty())) + pInfo.status = disconnected + pInfo.canConnectAfter = time.Now().Add(defaultBackoff) + } + + m.mailserverCycle.peers[pID] = pInfo + } + + for connectedPeer := range status.Peers { + peerID, err := peer.Decode(connectedPeer) + if err != nil { + continue + } + + pInfo, ok := m.mailserverCycle.peers[string(peerID)] + if !ok || pInfo.status != connected { + m.logger.Info("Peer connected", zap.String("peer", connectedPeer)) + pInfo.status = connected + pInfo.canConnectAfter = time.Now().Add(defaultBackoff) + m.mailserverCycle.peers[string(peerID)] = pInfo + } + } + m.mailserverCycle.Unlock() + + case <-m.quit: + connSubscription.Unsubscribe() + return + } + } +} + +func (m *Messenger) updateWakuV1PeerStatus() { + // TODO: remove this function once WakuV1 is deprecated + + if m.transport.WakuVersion() != 1 { + return // This can only be used with wakuV1 + } + + for { + select { + case <-m.mailserverCycle.events: + connectedPeers := m.server.PeersInfo() + m.mailserverCycle.Lock() + + for pID, pInfo := range m.mailserverCycle.peers { + if pInfo.status == disconnected { + continue + } + + // Removing disconnected + + found := false + for _, connectedPeer := range connectedPeers { + if enode.HexID(connectedPeer.ID) == enode.HexID(pID) { + found = true + break + } + } + if !found && (pInfo.status == connected || (pInfo.status == connecting && pInfo.lastConnectionAttempt.Add(8*time.Second).Before(time.Now()))) { + m.logger.Info("Peer disconnected", zap.String("peer", enode.HexID(pID).String())) + pInfo.status = disconnected + pInfo.canConnectAfter = time.Now().Add(defaultBackoff) + } + + m.mailserverCycle.peers[pID] = pInfo + } + + for _, connectedPeer := range connectedPeers { + hexID := enode.HexID(connectedPeer.ID).String() + pInfo, ok := m.mailserverCycle.peers[hexID] + if !ok || pInfo.status != connected { + m.logger.Info("Peer connected", zap.String("peer", hexID)) + pInfo.status = connected + pInfo.canConnectAfter = time.Now().Add(defaultBackoff) + if m.mailserverCycle.activeMailserver != nil && hexID == m.mailserverCycle.activeMailserver.ID().String() { + m.logger.Info("Mailserver available") + signal.SendMailserverAvailable(m.mailserverCycle.activeMailserver.String()) + } + m.mailserverCycle.peers[hexID] = pInfo + } + } + m.mailserverCycle.Unlock() + case <-m.quit: + m.mailserverCycle.Lock() + defer m.mailserverCycle.Unlock() + close(m.mailserverCycle.events) + m.mailserverCycle.subscription.Unsubscribe() + return + } + } +} + +func (m *Messenger) checkMailserverConnection() { + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + for { + m.logger.Info("Verifying mailserver connection state...") + // m.settings.GetPinnedMailserver + //if pinnedMailserver != "" && self.activeMailserver != pinnedMailserver { + // connect to current mailserver from the settings + // self.mailservers = pinnedMailserver + // self.connect(pinnedMailserver) + //} else { + // or setup a random mailserver: + if !m.isActiveMailserverAvailable() { + m.cycleMailservers() + } + // } + + select { + case <-m.quit: + return + case <-ticker.C: + continue + } + } +} + +func parseNodes(enodes []string) []*enode.Node { + var nodes []*enode.Node + for _, item := range enodes { + parsedPeer, err := enode.ParseV4(item) + if err == nil { + nodes = append(nodes, parsedPeer) + } + } + return nodes +} + +func parseMultiaddresses(addresses []string) []multiaddr.Multiaddr { + var result []multiaddr.Multiaddr + for _, item := range addresses { + ma, err := multiaddr.NewMultiaddr(item) + if err == nil { + result = append(result, ma) + } + } + return result +} + +func parseStoreNodeConfig(addresses []string) []multiaddr.Multiaddr { + // TODO: once a scoring/reputation mechanism is added to waku, + // this function can be modified to retrieve the storenodes + // from waku peerstore. + // We don't do that now because we can't trust any random storenode + // So we use only those specified in the cluster config + var result []multiaddr.Multiaddr + var dnsDiscWg sync.WaitGroup + + maChan := make(chan multiaddr.Multiaddr, 1000) + + for _, addrString := range addresses { + if strings.HasPrefix(addrString, "enrtree://") { + // Use DNS Discovery + dnsDiscWg.Add(1) + go func(addr string) { + defer dnsDiscWg.Done() + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + multiaddresses, err := dnsdisc.RetrieveNodes(ctx, addr) + if err == nil { + for _, ma := range multiaddresses { + maChan <- ma + } + } + }(addrString) + + } else { + // It's a normal multiaddress + ma, err := multiaddr.NewMultiaddr(addrString) + if err == nil { + maChan <- ma + } + } + } + dnsDiscWg.Wait() + close(maChan) + for ma := range maChan { + result = append(result, ma) + } + + return result +} + +func getPeerID(addr multiaddr.Multiaddr) (peer.ID, error) { + idStr, err := addr.ValueForProtocol(multiaddr.P_P2P) + if err != nil { + return "", err + } + return peer.Decode(idStr) +} diff --git a/protocol/messenger_response_test.go b/protocol/messenger_response_test.go index a0c2c00c3..d702b02c3 100644 --- a/protocol/messenger_response_test.go +++ b/protocol/messenger_response_test.go @@ -55,22 +55,22 @@ func TestMessengerResponseMergeNotImplemented(t *testing.T) { response1 := &MessengerResponse{} response2 := &MessengerResponse{ - Contacts: []*Contact{&Contact{}}, + Contacts: []*Contact{{}}, } require.Error(t, response1.Merge(response2)) response2 = &MessengerResponse{ - Installations: []*multidevice.Installation{&multidevice.Installation{}}, + Installations: []*multidevice.Installation{{}}, } require.Error(t, response1.Merge(response2)) response2 = &MessengerResponse{ - EmojiReactions: []*EmojiReaction{&EmojiReaction{}}, + EmojiReactions: []*EmojiReaction{{}}, } require.Error(t, response1.Merge(response2)) response2 = &MessengerResponse{ - Invitations: []*GroupChatInvitation{&GroupChatInvitation{}}, + Invitations: []*GroupChatInvitation{{}}, } require.Error(t, response1.Merge(response2)) diff --git a/protocol/messenger_test.go b/protocol/messenger_test.go index a81e44f41..cd091f39d 100644 --- a/protocol/messenger_test.go +++ b/protocol/messenger_test.go @@ -145,6 +145,7 @@ func newMessengerWithKey(shh types.Waku, privateKey *ecdsa.PrivateKey, logger *z privateKey, &testNode{shh: shh}, uuid.New().String(), + nil, options..., ) if err != nil { diff --git a/protocol/transport/transport.go b/protocol/transport/transport.go index 699baa09a..7f668fed6 100644 --- a/protocol/transport/transport.go +++ b/protocol/transport/transport.go @@ -642,3 +642,7 @@ func (t *Transport) ProcessingP2PMessages() bool { func (t *Transport) MarkP2PMessageAsProcessed(hash common.Hash) { t.waku.MarkP2PMessageAsProcessed(hash) } + +func (t *Transport) SubscribeToConnStatusChanges() (*types.ConnStatusSubscription, error) { + return t.waku.SubscribeToConnStatusChanges() +} diff --git a/services/ext/api.go b/services/ext/api.go index 0cc3f9dce..55b07877e 100644 --- a/services/ext/api.go +++ b/services/ext/api.go @@ -909,6 +909,10 @@ func (api *PublicAPI) RequestAllHistoricMessages() (*protocol.MessengerResponse, return api.service.messenger.RequestAllHistoricMessages() } +func (api *PublicAPI) DisconnectActiveMailserver() { + api.service.messenger.DisconnectActiveMailserver() +} + // Echo is a method for testing purposes. func (api *PublicAPI) Echo(ctx context.Context, message string) (string, error) { return message, nil diff --git a/services/ext/service.go b/services/ext/service.go index bf4944bec..7a2aaafbc 100644 --- a/services/ext/service.go +++ b/services/ext/service.go @@ -58,7 +58,7 @@ type Service struct { cancelMessenger chan struct{} storage db.TransactionalStorage n types.Node - config params.ShhextConfig + config params.NodeConfig mailMonitor *MailRequestMonitor server *p2p.Server peerStore *mailservers.PeerStore @@ -71,7 +71,7 @@ type Service struct { var _ node.Lifecycle = (*Service)(nil) func New( - config params.ShhextConfig, + config params.NodeConfig, n types.Node, ldb *leveldb.DB, mailMonitor *MailRequestMonitor, @@ -103,7 +103,7 @@ func (s *Service) GetPeer(rawURL string) (*enode.Node, error) { } func (s *Service) InitProtocol(nodeName string, identity *ecdsa.PrivateKey, db *sql.DB, multiAccountDb *multiaccounts.Database, acc *multiaccounts.Account, logger *zap.Logger) error { - if !s.config.PFSEnabled { + if !s.config.ShhextConfig.PFSEnabled { return nil } @@ -118,15 +118,15 @@ func (s *Service) InitProtocol(nodeName string, identity *ecdsa.PrivateKey, db * s.identity = identity - dataDir := filepath.Clean(s.config.BackupDisabledDataDir) + dataDir := filepath.Clean(s.config.ShhextConfig.BackupDisabledDataDir) if err := os.MkdirAll(dataDir, os.ModePerm); err != nil { return err } envelopesMonitorConfig := &transport.EnvelopesMonitorConfig{ - MaxAttempts: s.config.MaxMessageDeliveryAttempts, - AwaitOnlyMailServerConfirmations: s.config.MailServerConfirmations, + MaxAttempts: s.config.ShhextConfig.MaxMessageDeliveryAttempts, + AwaitOnlyMailServerConfirmations: s.config.ShhextConfig.MailServerConfirmations, IsMailserver: func(peer types.EnodeID) bool { return s.peerStore.Exist(peer) }, @@ -146,13 +146,15 @@ func (s *Service) InitProtocol(nodeName string, identity *ecdsa.PrivateKey, db * nodeName, identity, s.n, - s.config.InstallationID, + s.config.ShhextConfig.InstallationID, + s.peerStore, options..., ) if err != nil { return err } s.messenger = messenger + s.messenger.SetP2PServer(s.server) return messenger.Init() } @@ -166,7 +168,7 @@ func (s *Service) StartMessenger() (*protocol.MessengerResponse, error) { go s.retrieveMessagesLoop(time.Second, s.cancelMessenger) go s.verifyTransactionLoop(30*time.Second, s.cancelMessenger) - if s.config.BandwidthStatsEnabled { + if s.config.ShhextConfig.BandwidthStatsEnabled { go s.retrieveStats(5*time.Second, s.cancelMessenger) } @@ -282,7 +284,7 @@ func (c *verifyTransactionClient) TransactionByHash(ctx context.Context, hash ty } func (s *Service) verifyTransactionLoop(tick time.Duration, cancel <-chan struct{}) { - if s.config.VerifyTransactionURL == "" { + if s.config.ShhextConfig.VerifyTransactionURL == "" { log.Warn("not starting transaction loop") return } @@ -392,7 +394,7 @@ func (s *Service) Stop() error { } func buildMessengerOptions( - config params.ShhextConfig, + config params.NodeConfig, identity *ecdsa.PrivateKey, db *sql.DB, multiAccounts *multiaccounts.Database, @@ -411,13 +413,18 @@ func buildMessengerOptions( protocol.WithAccount(account), protocol.WithEnvelopesMonitorConfig(envelopesMonitorConfig), protocol.WithSignalsHandler(messengerSignalsHandler), - protocol.WithENSVerificationConfig(publishMessengerResponse, config.VerifyENSURL, config.VerifyENSContractAddress), + protocol.WithENSVerificationConfig(publishMessengerResponse, config.ShhextConfig.VerifyENSURL, config.ShhextConfig.VerifyENSContractAddress), + protocol.WithClusterConfig(config.ClusterConfig), } - if config.DataSyncEnabled { + if config.ShhextConfig.DataSyncEnabled { options = append(options, protocol.WithDatasync()) } + if config.ShhextConfig.EnableMailserverCycle { + options = append(options, protocol.WithMailserverCycle()) + } + settings, err := accountsDB.GetSettings() if err != sql.ErrNoRows && err != nil { return nil, err @@ -425,7 +432,7 @@ func buildMessengerOptions( // Generate anon metrics client config if settings.AnonMetricsShouldSend { - keyBytes, err := hex.DecodeString(config.AnonMetricsSendID) + keyBytes, err := hex.DecodeString(config.ShhextConfig.AnonMetricsSendID) if err != nil { return nil, err } @@ -443,14 +450,14 @@ func buildMessengerOptions( } // Generate anon metrics server config - if config.AnonMetricsServerEnabled { - if len(config.AnonMetricsServerPostgresURI) == 0 { + if config.ShhextConfig.AnonMetricsServerEnabled { + if len(config.ShhextConfig.AnonMetricsServerPostgresURI) == 0 { return nil, errors.New("AnonMetricsServerPostgresURI must be set") } amsc := &anonmetrics.ServerConfig{ Enabled: true, - PostgresURI: config.AnonMetricsServerPostgresURI, + PostgresURI: config.ShhextConfig.AnonMetricsServerPostgresURI, } options = append(options, protocol.WithAnonMetricsServerConfig(amsc)) } @@ -468,17 +475,17 @@ func buildMessengerOptions( } options = append(options, protocol.WithPushNotificationClientConfig(&pushnotificationclient.Config{ - DefaultServers: config.DefaultPushNotificationsServers, + DefaultServers: config.ShhextConfig.DefaultPushNotificationsServers, BlockMentions: settings.PushNotificationsBlockMentions, SendEnabled: settings.SendPushNotifications, AllowFromContactsOnly: settings.PushNotificationsFromContactsOnly, RemoteNotificationsEnabled: settings.RemotePushNotificationsEnabled, })) - if config.VerifyTransactionURL != "" { + if config.ShhextConfig.VerifyTransactionURL != "" { client := &verifyTransactionClient{ - url: config.VerifyTransactionURL, - chainID: big.NewInt(config.VerifyTransactionChainID), + url: config.ShhextConfig.VerifyTransactionURL, + chainID: big.NewInt(config.ShhextConfig.VerifyTransactionChainID), } options = append(options, protocol.WithVerifyTransactionClient(client)) } diff --git a/services/mailservers/tcp_ping.go b/services/mailservers/tcp_ping.go index 560dc12a3..e7e773edc 100644 --- a/services/mailservers/tcp_ping.go +++ b/services/mailservers/tcp_ping.go @@ -38,13 +38,9 @@ func (pr *PingResult) Update(rttMs int, err error) { } } -func enodeToAddr(enodeAddr string) (string, error) { - node, err := enode.ParseV4(enodeAddr) - if err != nil { - return "", err - } +func EnodeToAddr(node *enode.Node) (string, error) { var ip4 enr.IPv4 - err = node.Load(&ip4) + err := node.Load(&ip4) if err != nil { return "", err } @@ -56,6 +52,14 @@ func enodeToAddr(enodeAddr string) (string, error) { return fmt.Sprintf("%s:%d", net.IP(ip4).String(), tcp), nil } +func EnodeStringToAddr(enodeAddr string) (string, error) { + node, err := enode.ParseV4(enodeAddr) + if err != nil { + return "", err + } + return EnodeToAddr(node) +} + func parse(addresses []string, fn parseFn) (map[string]*PingResult, []string) { results := make(map[string]*PingResult, len(addresses)) var toPing []string @@ -81,10 +85,10 @@ func mapValues(m map[string]*PingResult) []*PingResult { return rval } -func ping(ctx context.Context, pq PingQuery, p parseFn) ([]*PingResult, error) { - timeout := time.Duration(pq.TimeoutMs) * time.Millisecond +func DoPing(ctx context.Context, addresses []string, timeoutMs int, p parseFn) ([]*PingResult, error) { + timeout := time.Duration(timeoutMs) * time.Millisecond - resultsMap, toPing := parse(pq.Addresses, p) + resultsMap, toPing := parse(addresses, p) // run the checks concurrently results, err := rtt.CheckHosts(toPing, timeout) @@ -106,10 +110,10 @@ func ping(ctx context.Context, pq PingQuery, p parseFn) ([]*PingResult, error) { } func (a *API) Ping(ctx context.Context, pq PingQuery) ([]*PingResult, error) { - return ping(ctx, pq, enodeToAddr) + return DoPing(ctx, pq.Addresses, pq.TimeoutMs, EnodeStringToAddr) } -func multiAddressToAddress(multiAddr string) (string, error) { +func MultiAddressToAddress(multiAddr string) (string, error) { ma, err := multiaddr.NewMultiaddr(multiAddr) if err != nil { @@ -129,5 +133,5 @@ func multiAddressToAddress(multiAddr string) (string, error) { } func (a *API) MultiAddressPing(ctx context.Context, pq PingQuery) ([]*PingResult, error) { - return ping(ctx, pq, multiAddressToAddress) + return DoPing(ctx, pq.Addresses, pq.TimeoutMs, MultiAddressToAddress) } diff --git a/services/wakuext/api_test.go b/services/wakuext/api_test.go index 3f3f2d368..d310dd08f 100644 --- a/services/wakuext/api_test.go +++ b/services/wakuext/api_test.go @@ -53,10 +53,12 @@ func TestRequestMessagesErrors(t *testing.T) { defer func() { require.NoError(t, aNode.Close()) }() handler := ext.NewHandlerMock(1) - config := params.ShhextConfig{ - InstallationID: "1", - BackupDisabledDataDir: os.TempDir(), - PFSEnabled: true, + config := params.NodeConfig{ + ShhextConfig: params.ShhextConfig{ + InstallationID: "1", + BackupDisabledDataDir: os.TempDir(), + PFSEnabled: true, + }, } nodeWrapper := ext.NewTestNodeWrapper(nil, waku) service := New(config, nodeWrapper, handler, nil) @@ -102,12 +104,14 @@ func TestInitProtocol(t *testing.T) { directory, err := ioutil.TempDir("", "status-go-testing") require.NoError(t, err) - config := params.ShhextConfig{ - InstallationID: "2", - BackupDisabledDataDir: directory, - PFSEnabled: true, - MailServerConfirmations: true, - ConnectionTarget: 10, + config := params.NodeConfig{ + ShhextConfig: params.ShhextConfig{ + InstallationID: "2", + BackupDisabledDataDir: directory, + PFSEnabled: true, + MailServerConfirmations: true, + ConnectionTarget: 10, + }, } db, err := leveldb.Open(storage.NewMemStorage(), nil) require.NoError(t, err) @@ -171,12 +175,14 @@ func (s *ShhExtSuite) createAndAddNode() { s.NoError(err) // set up protocol - config := params.ShhextConfig{ - InstallationID: "1", - BackupDisabledDataDir: s.dir, - PFSEnabled: true, - MailServerConfirmations: true, - ConnectionTarget: 10, + config := params.NodeConfig{ + ShhextConfig: params.ShhextConfig{ + InstallationID: "1", + BackupDisabledDataDir: s.dir, + PFSEnabled: true, + MailServerConfirmations: true, + ConnectionTarget: 10, + }, } db, err := leveldb.Open(storage.NewMemStorage(), nil) s.Require().NoError(err) diff --git a/services/wakuext/service.go b/services/wakuext/service.go index 6358685f5..c3d86510d 100644 --- a/services/wakuext/service.go +++ b/services/wakuext/service.go @@ -15,14 +15,14 @@ type Service struct { w types.Waku } -func New(config params.ShhextConfig, n types.Node, handler ext.EnvelopeEventsHandler, ldb *leveldb.DB) *Service { +func New(config params.NodeConfig, n types.Node, handler ext.EnvelopeEventsHandler, ldb *leveldb.DB) *Service { w, err := n.GetWaku(nil) if err != nil { panic(err) } delay := ext.DefaultRequestsDelay - if config.RequestsDelay != 0 { - delay = config.RequestsDelay + if config.ShhextConfig.RequestsDelay != 0 { + delay = config.ShhextConfig.RequestsDelay } requestsRegistry := ext.NewRequestsRegistry(delay) mailMonitor := ext.NewMailRequestMonitor(w, handler, requestsRegistry) diff --git a/services/wakuv2ext/service.go b/services/wakuv2ext/service.go index e108a47d3..45a4311d8 100644 --- a/services/wakuv2ext/service.go +++ b/services/wakuv2ext/service.go @@ -15,14 +15,14 @@ type Service struct { w types.Waku } -func New(config params.ShhextConfig, n types.Node, handler ext.EnvelopeEventsHandler, ldb *leveldb.DB) *Service { +func New(config params.NodeConfig, n types.Node, handler ext.EnvelopeEventsHandler, ldb *leveldb.DB) *Service { w, err := n.GetWakuV2(nil) if err != nil { panic(err) } delay := ext.DefaultRequestsDelay - if config.RequestsDelay != 0 { - delay = config.RequestsDelay + if config.ShhextConfig.RequestsDelay != 0 { + delay = config.ShhextConfig.RequestsDelay } requestsRegistry := ext.NewRequestsRegistry(delay) mailMonitor := ext.NewMailRequestMonitor(w, handler, requestsRegistry) diff --git a/signal/events_shhext.go b/signal/events_shhext.go index 746bec0fc..fa14b0108 100644 --- a/signal/events_shhext.go +++ b/signal/events_shhext.go @@ -5,7 +5,6 @@ import ( "encoding/json" "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/status-im/status-go/eth-node/types" ) @@ -49,6 +48,12 @@ const ( // EventBackupPerformed is triggered when a backup has been performed EventBackupPerformed = "backup.performed" + + // EventMailserverAvailable is triggered when a mailserver becomes available + EventMailserverAvailable = "mailserver.available" + + // EventMailserverChanged is triggered when switching the active mailserver + EventMailserverChanged = "mailserver.changed" ) // EnvelopeSignal includes hash of the envelope. @@ -84,6 +89,10 @@ type BundleAddedSignal struct { InstallationID string `json:"installationID"` } +type MailserverSignal struct { + Address string `json:"address"` +} + type Filter struct { // ChatID is the identifier of the chat ChatID string `json:"chatId"` @@ -195,3 +204,15 @@ func SendBundleAdded(identity string, installationID string) { func SendNewMessages(obj json.Marshaler) { send(EventNewMessages, obj) } + +func SendMailserverAvailable(nodeAddress string) { + send(EventMailserverAvailable, MailserverSignal{ + Address: nodeAddress, + }) +} + +func SendMailserverChanged(nodeAddress string) { + send(EventMailserverChanged, MailserverSignal{ + Address: nodeAddress, + }) +} diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 4ccef7a0a..5ea0ff1af 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -87,12 +87,6 @@ type settings struct { SoftBlacklistedPeerIDs map[string]bool // SoftBlacklistedPeerIDs is a list of peer ids that we want to keep connected but silently drop any envelope from } -type ConnStatus struct { - IsOnline bool `json:"isOnline"` - HasHistory bool `json:"hasHistory"` - Peers map[string][]string `json:"peers"` -} - // Waku represents a dark communication interface through the Ethereum // network, using its very own P2P communication layer. type Waku struct { @@ -126,6 +120,9 @@ type Waku struct { storeMsgIDs map[gethcommon.Hash]bool // Map of the currently processing ids storeMsgIDsMu sync.RWMutex + connStatusSubscriptions map[string]*types.ConnStatusSubscription + connStatusMu sync.Mutex + timeSource func() time.Time // source of time for waku logger *zap.Logger @@ -142,19 +139,20 @@ func New(nodeKey string, cfg *Config, logger *zap.Logger, appdb *sql.DB) (*Waku, logger.Debug("starting wakuv2 with config", zap.Any("config", cfg)) waku := &Waku{ - privateKeys: make(map[string]*ecdsa.PrivateKey), - symKeys: make(map[string][]byte), - envelopes: make(map[gethcommon.Hash]*common.ReceivedMessage), - expirations: make(map[uint32]mapset.Set), - msgQueue: make(chan *common.ReceivedMessage, messageQueueLimit), - sendQueue: make(chan *pb.WakuMessage, 1000), - quit: make(chan struct{}), - dnsAddressCache: make(map[string][]multiaddr.Multiaddr), - dnsAddressCacheLock: &sync.RWMutex{}, - storeMsgIDs: make(map[gethcommon.Hash]bool), - storeMsgIDsMu: sync.RWMutex{}, - timeSource: time.Now, - logger: logger, + privateKeys: make(map[string]*ecdsa.PrivateKey), + symKeys: make(map[string][]byte), + envelopes: make(map[gethcommon.Hash]*common.ReceivedMessage), + expirations: make(map[uint32]mapset.Set), + msgQueue: make(chan *common.ReceivedMessage, messageQueueLimit), + sendQueue: make(chan *pb.WakuMessage, 1000), + connStatusSubscriptions: make(map[string]*types.ConnStatusSubscription), + quit: make(chan struct{}), + dnsAddressCache: make(map[string][]multiaddr.Multiaddr), + dnsAddressCacheLock: &sync.RWMutex{}, + storeMsgIDs: make(map[gethcommon.Hash]bool), + storeMsgIDsMu: sync.RWMutex{}, + timeSource: time.Now, + logger: logger, } waku.settings = settings{ @@ -271,7 +269,17 @@ func New(nodeKey string, cfg *Config, logger *zap.Logger, appdb *sql.DB) (*Waku, case <-waku.quit: return case c := <-connStatusChan: - signal.SendPeerStats(formatConnStatus(c)) + waku.connStatusMu.Lock() + latestConnStatus := formatConnStatus(c) + for k, subs := range waku.connStatusSubscriptions { + if subs.Active() { + subs.C <- latestConnStatus + } else { + delete(waku.connStatusSubscriptions, k) + } + } + waku.connStatusMu.Unlock() + signal.SendPeerStats(latestConnStatus) } } }() @@ -284,6 +292,14 @@ func New(nodeKey string, cfg *Config, logger *zap.Logger, appdb *sql.DB) (*Waku, return waku, nil } +func (w *Waku) SubscribeToConnStatusChanges() *types.ConnStatusSubscription { + w.connStatusMu.Lock() + defer w.connStatusMu.Unlock() + subscription := types.NewConnStatusSubscription() + w.connStatusSubscriptions[subscription.ID] = subscription + return subscription +} + type fnApplyToEachPeer func(ma multiaddr.Multiaddr, protocol libp2pproto.ID) func (w *Waku) addPeers(addresses []string, protocol libp2pproto.ID, apply fnApplyToEachPeer) { @@ -1151,8 +1167,8 @@ func FormatPeerStats(peers node.PeerStats) map[string][]string { return p } -func formatConnStatus(c node.ConnStatus) ConnStatus { - return ConnStatus{ +func formatConnStatus(c node.ConnStatus) types.ConnStatus { + return types.ConnStatus{ IsOnline: c.IsOnline, HasHistory: c.HasHistory, Peers: FormatPeerStats(c.Peers),