feat: desktop mailserver cycle (#2481)

This commit is contained in:
Richard Ramos 2022-01-12 16:02:01 +00:00 committed by GitHub
parent f1569e4bde
commit 98784b752a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 952 additions and 92 deletions

View File

@ -1180,7 +1180,6 @@ func (b *GethStatusBackend) injectAccountsIntoServices() error {
return ErrWakuIdentityInjectionFailure
}
st := b.statusNode.WakuV2ExtService()
if err := st.InitProtocol(b.statusNode.GethNode().Config().Name, identity, b.appDB, b.multiaccountsDB, acc, logutils.ZapLogger()); err != nil {
return err
}

View File

@ -216,6 +216,7 @@ func main() {
identity,
gethbridge.NewNodeBridge(backend.StatusNode().GethNode(), backend.StatusNode().WakuService(), backend.StatusNode().WakuV2Service()),
installationID.String(),
nil,
options...,
)
if err != nil {

View File

@ -79,6 +79,10 @@ func (w *gethWakuWrapper) DropPeer(peerID string) error {
return errors.New("not available in WakuV1")
}
func (w *gethWakuWrapper) SubscribeToConnStatusChanges() (*types.ConnStatusSubscription, error) {
return nil, errors.New("not available in WakuV1")
}
// Peers function only added for compatibility with waku V2
func (w *gethWakuWrapper) Peers() map[string][]string {
p := make(map[string][]string)

View File

@ -263,6 +263,10 @@ func (w *gethWakuV2Wrapper) MarkP2PMessageAsProcessed(hash common.Hash) {
w.waku.MarkP2PMessageAsProcessed(hash)
}
func (w *gethWakuV2Wrapper) SubscribeToConnStatusChanges() (*types.ConnStatusSubscription, error) {
return w.waku.SubscribeToConnStatusChanges(), nil
}
type wakuV2FilterWrapper struct {
filter *wakucommon.Filter
id string

View File

@ -2,11 +2,49 @@ package types
import (
"crypto/ecdsa"
"sync"
"time"
"github.com/pborman/uuid"
"github.com/ethereum/go-ethereum/common"
)
type ConnStatus struct {
IsOnline bool `json:"isOnline"`
HasHistory bool `json:"hasHistory"`
Peers map[string][]string `json:"peers"`
}
type ConnStatusSubscription struct {
sync.RWMutex
ID string
C chan ConnStatus
active bool
}
func NewConnStatusSubscription() *ConnStatusSubscription {
return &ConnStatusSubscription{
ID: uuid.NewRandom().String(),
C: make(chan ConnStatus, 100),
active: true,
}
}
func (u *ConnStatusSubscription) Active() bool {
u.RLock()
defer u.RUnlock()
return u.active
}
func (u *ConnStatusSubscription) Unsubscribe() {
u.Lock()
defer u.Unlock()
close(u.C)
u.active = false
}
// Whisper represents a dark communication interface through the Ethereum
// network, using its very own P2P communication layer.
type Waku interface {
@ -34,6 +72,8 @@ type Waku interface {
DropPeer(peerID string) error
SubscribeToConnStatusChanges() (*ConnStatusSubscription, error)
// MinPow returns the PoW value required by this node.
MinPow() float64
// BloomFilter returns the aggregated bloom filter for all the topics of interest.

View File

@ -459,7 +459,7 @@ func (db *Database) SaveSetting(setting string, value interface{}) error {
}
func (db *Database) GetNodeConfig(nodecfg interface{}) error {
return db.db.QueryRow("SELECT node_config FROM settings WHERE synthetic_id = 'id'").Scan(&sqlite.JSONBlob{nodecfg})
return db.db.QueryRow("SELECT node_config FROM settings WHERE synthetic_id = 'id'").Scan(&sqlite.JSONBlob{Data: nodecfg})
}
func (db *Database) GetSettings() (Settings, error) {
@ -656,6 +656,14 @@ func (db *Database) GetPublicKey() (rst string, err error) {
return
}
func (db *Database) GetFleet() (rst string, err error) {
err = db.db.QueryRow("SELECT COALESCE(fleet, '') FROM settings WHERE synthetic_id = 'id'").Scan(&rst)
if err == sql.ErrNoRows {
return rst, nil
}
return
}
func (db *Database) GetDappsAddress() (rst types.Address, err error) {
err = db.db.QueryRow("SELECT dapps_address FROM settings WHERE synthetic_id = 'id'").Scan(&rst)
if err == sql.ErrNoRows {

View File

@ -153,7 +153,7 @@ func (b *StatusNode) wakuExtService(config *params.NodeConfig) (*wakuext.Service
}
if b.wakuExtSrvc == nil {
b.wakuExtSrvc = wakuext.New(config.ShhextConfig, b.nodeBridge(), ext.EnvelopeSignalHandler{}, b.db)
b.wakuExtSrvc = wakuext.New(*config, b.nodeBridge(), ext.EnvelopeSignalHandler{}, b.db)
}
b.wakuExtSrvc.SetP2PServer(b.gethNode.Server())
@ -165,7 +165,7 @@ func (b *StatusNode) wakuV2ExtService(config *params.NodeConfig) (*wakuv2ext.Ser
return nil, errors.New("geth node not initialized")
}
if b.wakuV2ExtSrvc == nil {
b.wakuV2ExtSrvc = wakuv2ext.New(config.ShhextConfig, b.nodeBridge(), ext.EnvelopeSignalHandler{}, b.db)
b.wakuV2ExtSrvc = wakuv2ext.New(*config, b.nodeBridge(), ext.EnvelopeSignalHandler{}, b.db)
}
b.wakuV2ExtSrvc.SetP2PServer(b.gethNode.Server())

View File

@ -591,7 +591,8 @@ type ShhextConfig struct {
ConnectionTarget int
// RequestsDelay used to ensure that no similar requests are sent within short periods of time.
RequestsDelay time.Duration
// EnableMailserverCycle is used to enable the mailserver cycle to switch between trusted servers to retrieve the message history
EnableMailserverCycle bool
// MaxServerFailures defines maximum allowed expired requests before server will be swapped to another one.
MaxServerFailures int

View File

@ -8,4 +8,7 @@ type FeatureFlags struct {
// PushNotification indicates whether we should be enabling the push notification feature
PushNotifications bool
// MailserverCycle indicates whether we should enable or not the mailserver cycle
MailserverCycle bool
}

View File

@ -75,6 +75,7 @@ func (s *MessengerCommunitiesSuite) newMessengerWithOptions(shh types.Waku, priv
privateKey,
&testNode{shh: shh},
uuid.New().String(),
nil,
options...,
)
s.Require().NoError(err)

View File

@ -15,13 +15,18 @@ import (
"sync"
"time"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/pkg/errors"
"go.uber.org/zap"
"github.com/ethereum/go-ethereum/event"
"github.com/davecgh/go-spew/spew"
"github.com/golang/protobuf/proto"
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/status-im/status-go/appdatabase"
"github.com/status-im/status-go/appmetrics"
"github.com/status-im/status-go/connection"
@ -47,7 +52,8 @@ import (
"github.com/status-im/status-go/protocol/sqlite"
"github.com/status-im/status-go/protocol/transport"
v1protocol "github.com/status-im/status-go/protocol/v1"
"github.com/status-im/status-go/services/mailservers"
"github.com/status-im/status-go/services/ext/mailservers"
mailserversDB "github.com/status-im/status-go/services/mailservers"
"github.com/status-im/status-go/telemetry"
)
@ -81,6 +87,8 @@ var messageCacheIntervalMs uint64 = 1000 * 60 * 60 * 48
// mailservers because they can also be managed by the user.
type Messenger struct {
node types.Node
server *p2p.Server
peerStore *mailservers.PeerStore
config *config
identity *ecdsa.PrivateKey
persistence *sqlitePersistence
@ -105,11 +113,13 @@ type Messenger struct {
modifiedInstallations *stringBoolMap
installationID string
mailserver []byte
mailserverCycle mailserverCycle
database *sql.DB
multiAccounts *multiaccounts.Database
mailservers *mailserversDB.Database
settings *accounts.Database
account *multiaccounts.Account
mailserversDatabase *mailservers.Database
mailserversDatabase *mailserversDB.Database
quit chan struct{}
requestedCommunities map[string]*transport.Filter
connectionState connection.State
@ -119,6 +129,28 @@ type Messenger struct {
mutex sync.Mutex
}
type connStatus int
const (
disconnected connStatus = iota + 1
connecting
connected
)
type peerStatus struct {
status connStatus
canConnectAfter time.Time
lastConnectionAttempt time.Time
}
type mailserverCycle struct {
sync.RWMutex
activeMailserver *enode.Node // For usage with wakuV1
activeStoreNode *peer.ID // For usage with wakuV2
peers map[string]peerStatus
events chan *p2p.PeerEvent
subscription event.Subscription
}
type dbConfig struct {
dbPath string
dbKey string
@ -175,6 +207,7 @@ func NewMessenger(
identity *ecdsa.PrivateKey,
node types.Node,
installationID string,
peerStore *mailservers.PeerStore,
opts ...Option,
) (*Messenger, error) {
var messenger *Messenger
@ -346,6 +379,7 @@ func NewMessenger(
return nil, err
}
settings := accounts.NewDB(database)
mailservers := mailserversDB.NewDB(database)
messenger = &Messenger{
config: &c,
node: node,
@ -372,6 +406,11 @@ func NewMessenger(
database: database,
multiAccounts: c.multiAccount,
settings: settings,
peerStore: peerStore,
mailservers: mailservers,
mailserverCycle: mailserverCycle{
peers: make(map[string]peerStatus),
},
mailserversDatabase: c.mailserversDatabase,
account: c.account,
quit: make(chan struct{}),
@ -410,6 +449,10 @@ func NewMessenger(
return messenger, nil
}
func (m *Messenger) SetP2PServer(server *p2p.Server) {
m.server = server
}
func (m *Messenger) processSentMessages(ids []string) error {
if m.connectionState.Offline {
return errors.New("Can't mark message as sent while offline")
@ -580,6 +623,13 @@ func (m *Messenger) Start() (*MessengerResponse, error) {
}
if m.config.featureFlags.MailserverCycle {
err := m.StartMailserverCycle()
if err != nil {
return nil, err
}
}
return response, nil
}
@ -619,7 +669,12 @@ func (m *Messenger) handleConnectionChange(online bool) {
if m.pushNotificationClient != nil {
m.pushNotificationClient.Offline()
}
if m.config.featureFlags.MailserverCycle {
m.DisconnectActiveMailserver() // force mailserver cycle to run again
}
}
m.ensVerifier.SetOnline(online)
}

View File

@ -54,6 +54,7 @@ type config struct {
multiAccount *multiaccounts.Database
mailserversDatabase *mailservers.Database
account *multiaccounts.Account
clusterConfig params.ClusterConfig
verifyTransactionClient EthClient
verifyENSURL string
@ -231,3 +232,17 @@ func WithENSVerificationConfig(onENSVerified func(*MessengerResponse), url, addr
return nil
}
}
func WithClusterConfig(cc params.ClusterConfig) Option {
return func(c *config) error {
c.clusterConfig = cc
return nil
}
}
func WithMailserverCycle() func(c *config) error {
return func(c *config) error {
c.featureFlags.MailserverCycle = true
return nil
}
}

View File

@ -0,0 +1,666 @@
package protocol
import (
"context"
"crypto/rand"
"math"
"math/big"
"sort"
"strings"
"sync"
"time"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/pkg/errors"
"go.uber.org/zap"
"github.com/status-im/go-waku/waku/v2/dnsdisc"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/status-im/status-go/params"
"github.com/status-im/status-go/services/mailservers"
"github.com/status-im/status-go/signal"
)
const defaultBackoff = 30 * time.Second
type byRTTMs []*mailservers.PingResult
func (s byRTTMs) Len() int {
return len(s)
}
func (s byRTTMs) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
func (s byRTTMs) Less(i, j int) bool {
return *s[i].RTTMs < *s[j].RTTMs
}
func (m *Messenger) StartMailserverCycle() error {
canUseMailservers, err := m.settings.CanUseMailservers()
if err != nil {
return err
}
if !canUseMailservers {
return errors.New("mailserver use is not allowed")
}
m.logger.Debug("started mailserver cycle")
m.mailserverCycle.events = make(chan *p2p.PeerEvent, 20)
m.mailserverCycle.subscription = m.server.SubscribeEvents(m.mailserverCycle.events)
go m.checkMailserverConnection()
go m.updateWakuV1PeerStatus()
go m.updateWakuV2PeerStatus()
return nil
}
func (m *Messenger) DisconnectActiveMailserver() {
m.mailserverCycle.Lock()
defer m.mailserverCycle.Unlock()
m.disconnectActiveMailserver()
}
func (m *Messenger) disconnectV1Mailserver() {
// TODO: remove this function once WakuV1 is deprecated
if m.mailserverCycle.activeMailserver == nil {
return
}
m.logger.Info("Disconnecting active mailserver", zap.Any("nodeID", m.mailserverCycle.activeMailserver.ID()))
pInfo, ok := m.mailserverCycle.peers[m.mailserverCycle.activeMailserver.ID().String()]
if ok {
pInfo.status = disconnected
pInfo.canConnectAfter = time.Now().Add(defaultBackoff)
m.mailserverCycle.peers[m.mailserverCycle.activeMailserver.ID().String()] = pInfo
} else {
m.mailserverCycle.peers[m.mailserverCycle.activeMailserver.ID().String()] = peerStatus{
status: disconnected,
canConnectAfter: time.Now().Add(defaultBackoff),
}
}
m.server.RemovePeer(m.mailserverCycle.activeMailserver)
m.mailserverCycle.activeMailserver = nil
}
func (m *Messenger) disconnectStoreNode() {
if m.mailserverCycle.activeStoreNode == nil {
return
}
m.logger.Info("Disconnecting active storeNode", zap.Any("nodeID", m.mailserverCycle.activeStoreNode.Pretty()))
pInfo, ok := m.mailserverCycle.peers[string(*m.mailserverCycle.activeStoreNode)]
if ok {
pInfo.status = disconnected
pInfo.canConnectAfter = time.Now().Add(defaultBackoff)
m.mailserverCycle.peers[string(*m.mailserverCycle.activeStoreNode)] = pInfo
} else {
m.mailserverCycle.peers[string(*m.mailserverCycle.activeStoreNode)] = peerStatus{
status: disconnected,
canConnectAfter: time.Now().Add(defaultBackoff),
}
}
err := m.transport.DropPeer(string(*m.mailserverCycle.activeStoreNode))
if err != nil {
m.logger.Warn("Could not drop peer")
}
m.mailserverCycle.activeStoreNode = nil
}
func (m *Messenger) disconnectActiveMailserver() {
switch m.transport.WakuVersion() {
case 1:
m.disconnectV1Mailserver()
case 2:
m.disconnectStoreNode()
}
signal.SendMailserverChanged("")
}
func (m *Messenger) cycleMailservers() {
m.mailserverCycle.Lock()
defer m.mailserverCycle.Unlock()
m.logger.Info("Automatically switching mailserver")
if m.mailserverCycle.activeMailserver != nil {
m.disconnectActiveMailserver()
}
err := m.findNewMailserver()
if err != nil {
m.logger.Error("Error getting new mailserver", zap.Error(err))
}
}
func poolSize(fleetSize int) int {
return int(math.Ceil(float64(fleetSize) / 4))
}
func (m *Messenger) findNewMailserver() error {
switch m.transport.WakuVersion() {
case 1:
return m.findNewMailserverV1()
case 2:
return m.findStoreNode()
default:
return errors.New("waku version is not supported")
}
}
func (m *Messenger) findStoreNode() error {
allMailservers := parseStoreNodeConfig(m.config.clusterConfig.StoreNodes)
// TODO: append user mailservers once that functionality is available for waku2
var mailserverList []multiaddr.Multiaddr
now := time.Now()
for _, node := range allMailservers {
pID, err := getPeerID(node)
if err != nil {
continue
}
pInfo, ok := m.mailserverCycle.peers[string(pID)]
if !ok || pInfo.canConnectAfter.Before(now) {
mailserverList = append(mailserverList, node)
}
}
m.logger.Info("Finding a new store node...")
var mailserverStr []string
for _, m := range mailserverList {
mailserverStr = append(mailserverStr, m.String())
}
pingResult, err := mailservers.DoPing(context.Background(), mailserverStr, 500, mailservers.MultiAddressToAddress)
if err != nil {
return err
}
var availableMailservers []*mailservers.PingResult
for _, result := range pingResult {
if result.Err != nil {
continue // The results with error are ignored
}
availableMailservers = append(availableMailservers, result)
}
sort.Sort(byRTTMs(availableMailservers))
if len(availableMailservers) == 0 {
m.logger.Warn("No store nodes available") // Do nothing...
return nil
}
// Picks a random mailserver amongs the ones with the lowest latency
// The pool size is 1/4 of the mailservers were pinged successfully
pSize := poolSize(len(availableMailservers) - 1)
if pSize <= 0 {
m.logger.Warn("No store nodes available") // Do nothing...
return nil
}
r, err := rand.Int(rand.Reader, big.NewInt(int64(pSize)))
if err != nil {
return err
}
return m.connectToStoreNode(parseMultiaddresses([]string{availableMailservers[r.Int64()].Address})[0])
}
func (m *Messenger) findNewMailserverV1() error {
// TODO: remove this function once WakuV1 is deprecated
allMailservers := parseNodes(m.config.clusterConfig.TrustedMailServers)
// Append user mailservers
var fleet string
dbFleet, err := m.settings.GetFleet()
if err != nil {
return err
}
if dbFleet != "" {
fleet = dbFleet
} else if m.config.clusterConfig.Fleet != "" {
fleet = m.config.clusterConfig.Fleet
} else {
fleet = params.FleetProd
}
customMailservers, err := m.mailservers.Mailservers()
if err != nil {
return err
}
for _, c := range customMailservers {
if c.Fleet == fleet {
mNode, err := enode.ParseV4(c.Address)
if err != nil {
allMailservers = append(allMailservers, mNode)
}
}
}
var mailserverList []*enode.Node
now := time.Now()
for _, node := range allMailservers {
pInfo, ok := m.mailserverCycle.peers[node.ID().String()]
if !ok || pInfo.canConnectAfter.Before(now) {
mailserverList = append(mailserverList, node)
}
}
m.logger.Info("Finding a new mailserver...")
var mailserverStr []string
for _, m := range mailserverList {
mailserverStr = append(mailserverStr, m.String())
}
pingResult, err := mailservers.DoPing(context.Background(), mailserverStr, 500, mailservers.EnodeStringToAddr)
if err != nil {
return err
}
var availableMailservers []*mailservers.PingResult
for _, result := range pingResult {
if result.Err != nil {
continue // The results with error are ignored
}
availableMailservers = append(availableMailservers, result)
}
sort.Sort(byRTTMs(availableMailservers))
if len(availableMailservers) == 0 {
m.logger.Warn("No mailservers available") // Do nothing...
return nil
}
// Picks a random mailserver amongs the ones with the lowest latency
// The pool size is 1/4 of the mailservers were pinged successfully
pSize := poolSize(len(availableMailservers) - 1)
r, err := rand.Int(rand.Reader, big.NewInt(int64(pSize)))
if err != nil {
return err
}
return m.connectToMailserver(parseNodes([]string{availableMailservers[r.Int64()].Address})[0])
}
func (m *Messenger) activeMailserverStatus() (connStatus, error) {
var mailserverID string
switch m.transport.WakuVersion() {
case 1:
if m.mailserverCycle.activeMailserver == nil {
return disconnected, errors.New("Active mailserver is not set")
}
mailserverID = m.mailserverCycle.activeMailserver.ID().String()
case 2:
if m.mailserverCycle.activeStoreNode == nil {
return disconnected, errors.New("Active storenode is not set")
}
mailserverID = string(*m.mailserverCycle.activeStoreNode)
default:
return disconnected, errors.New("waku version is not supported")
}
return m.mailserverCycle.peers[mailserverID].status, nil
}
func (m *Messenger) connectToMailserver(node *enode.Node) error {
// TODO: remove this function once WakuV1 is deprecated
if m.transport.WakuVersion() != 1 {
return nil // This can only be used with wakuV1
}
m.logger.Info("Connecting to mailserver", zap.Any("peer", node.ID()))
nodeConnected := false
m.mailserverCycle.activeMailserver = node
signal.SendMailserverChanged(m.mailserverCycle.activeMailserver.String())
// Adding a peer and marking it as connected can't be executed sync in WakuV1, because
// There's a delay between requesting a peer being added, and a signal being
// received after the peer was added. So we first set the peer status as
// Connecting and once a peerConnected signal is received, we mark it as
// Connected
activeMailserverStatus, err := m.activeMailserverStatus()
if err != nil {
return err
}
if activeMailserverStatus == connected {
nodeConnected = true
} else {
// Attempt to connect to mailserver by adding it as a peer
m.SetMailserver(node.ID().Bytes())
m.server.AddPeer(node)
if err := m.peerStore.Update([]*enode.Node{node}); err != nil {
return err
}
pInfo, ok := m.mailserverCycle.peers[node.ID().String()]
if ok {
pInfo.status = connecting
pInfo.lastConnectionAttempt = time.Now()
m.mailserverCycle.peers[node.ID().String()] = pInfo
} else {
m.mailserverCycle.peers[node.ID().String()] = peerStatus{
status: connecting,
lastConnectionAttempt: time.Now(),
}
}
}
if nodeConnected {
m.logger.Info("Mailserver available")
signal.SendMailserverAvailable(m.mailserverCycle.activeMailserver.String())
}
return nil
}
func (m *Messenger) connectToStoreNode(node multiaddr.Multiaddr) error {
if m.transport.WakuVersion() != 2 {
return nil // This can only be used with wakuV2
}
m.logger.Info("Connecting to storenode", zap.Any("peer", node))
nodeConnected := false
peerID, err := getPeerID(node)
if err != nil {
return err
}
m.mailserverCycle.activeStoreNode = &peerID
signal.SendMailserverChanged(m.mailserverCycle.activeStoreNode.Pretty())
// Adding a peer and marking it as connected can't be executed sync in WakuV1, because
// There's a delay between requesting a peer being added, and a signal being
// received after the peer was added. So we first set the peer status as
// Connecting and once a peerConnected signal is received, we mark it as
// Connected
activeMailserverStatus, err := m.activeMailserverStatus()
if err != nil {
return err
}
if activeMailserverStatus == connected {
nodeConnected = true
} else {
// Attempt to connect to mailserver by adding it as a peer
m.SetMailserver([]byte(peerID.Pretty()))
if err := m.transport.DialPeer(node.String()); err != nil {
return err
}
pInfo, ok := m.mailserverCycle.peers[string(peerID)]
if ok {
pInfo.status = connected
pInfo.lastConnectionAttempt = time.Now()
} else {
m.mailserverCycle.peers[string(peerID)] = peerStatus{
status: connected,
lastConnectionAttempt: time.Now(),
}
}
nodeConnected = true
}
if nodeConnected {
m.logger.Info("Storenode available")
signal.SendMailserverAvailable(m.mailserverCycle.activeStoreNode.Pretty())
}
return nil
}
func (m *Messenger) isActiveMailserverAvailable() bool {
m.mailserverCycle.RLock()
defer m.mailserverCycle.RUnlock()
mailserverStatus, err := m.activeMailserverStatus()
if err != nil {
return false
}
return mailserverStatus == connected
}
func (m *Messenger) updateWakuV2PeerStatus() {
if m.transport.WakuVersion() != 2 {
return // This can only be used with wakuV2
}
connSubscription, err := m.transport.SubscribeToConnStatusChanges()
if err != nil {
m.logger.Error("Could not subscribe to connection status changes", zap.Error(err))
}
for {
select {
case status := <-connSubscription.C:
m.mailserverCycle.Lock()
for pID, pInfo := range m.mailserverCycle.peers {
if pInfo.status == disconnected {
continue
}
// Removing disconnected
found := false
for connectedPeer := range status.Peers {
peerID, err := peer.Decode(connectedPeer)
if err != nil {
continue
}
if string(peerID) == pID {
found = true
break
}
}
if !found && pInfo.status == connected {
m.logger.Info("Peer disconnected", zap.String("peer", peer.ID(pID).Pretty()))
pInfo.status = disconnected
pInfo.canConnectAfter = time.Now().Add(defaultBackoff)
}
m.mailserverCycle.peers[pID] = pInfo
}
for connectedPeer := range status.Peers {
peerID, err := peer.Decode(connectedPeer)
if err != nil {
continue
}
pInfo, ok := m.mailserverCycle.peers[string(peerID)]
if !ok || pInfo.status != connected {
m.logger.Info("Peer connected", zap.String("peer", connectedPeer))
pInfo.status = connected
pInfo.canConnectAfter = time.Now().Add(defaultBackoff)
m.mailserverCycle.peers[string(peerID)] = pInfo
}
}
m.mailserverCycle.Unlock()
case <-m.quit:
connSubscription.Unsubscribe()
return
}
}
}
func (m *Messenger) updateWakuV1PeerStatus() {
// TODO: remove this function once WakuV1 is deprecated
if m.transport.WakuVersion() != 1 {
return // This can only be used with wakuV1
}
for {
select {
case <-m.mailserverCycle.events:
connectedPeers := m.server.PeersInfo()
m.mailserverCycle.Lock()
for pID, pInfo := range m.mailserverCycle.peers {
if pInfo.status == disconnected {
continue
}
// Removing disconnected
found := false
for _, connectedPeer := range connectedPeers {
if enode.HexID(connectedPeer.ID) == enode.HexID(pID) {
found = true
break
}
}
if !found && (pInfo.status == connected || (pInfo.status == connecting && pInfo.lastConnectionAttempt.Add(8*time.Second).Before(time.Now()))) {
m.logger.Info("Peer disconnected", zap.String("peer", enode.HexID(pID).String()))
pInfo.status = disconnected
pInfo.canConnectAfter = time.Now().Add(defaultBackoff)
}
m.mailserverCycle.peers[pID] = pInfo
}
for _, connectedPeer := range connectedPeers {
hexID := enode.HexID(connectedPeer.ID).String()
pInfo, ok := m.mailserverCycle.peers[hexID]
if !ok || pInfo.status != connected {
m.logger.Info("Peer connected", zap.String("peer", hexID))
pInfo.status = connected
pInfo.canConnectAfter = time.Now().Add(defaultBackoff)
if m.mailserverCycle.activeMailserver != nil && hexID == m.mailserverCycle.activeMailserver.ID().String() {
m.logger.Info("Mailserver available")
signal.SendMailserverAvailable(m.mailserverCycle.activeMailserver.String())
}
m.mailserverCycle.peers[hexID] = pInfo
}
}
m.mailserverCycle.Unlock()
case <-m.quit:
m.mailserverCycle.Lock()
defer m.mailserverCycle.Unlock()
close(m.mailserverCycle.events)
m.mailserverCycle.subscription.Unsubscribe()
return
}
}
}
func (m *Messenger) checkMailserverConnection() {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
m.logger.Info("Verifying mailserver connection state...")
// m.settings.GetPinnedMailserver
//if pinnedMailserver != "" && self.activeMailserver != pinnedMailserver {
// connect to current mailserver from the settings
// self.mailservers = pinnedMailserver
// self.connect(pinnedMailserver)
//} else {
// or setup a random mailserver:
if !m.isActiveMailserverAvailable() {
m.cycleMailservers()
}
// }
select {
case <-m.quit:
return
case <-ticker.C:
continue
}
}
}
func parseNodes(enodes []string) []*enode.Node {
var nodes []*enode.Node
for _, item := range enodes {
parsedPeer, err := enode.ParseV4(item)
if err == nil {
nodes = append(nodes, parsedPeer)
}
}
return nodes
}
func parseMultiaddresses(addresses []string) []multiaddr.Multiaddr {
var result []multiaddr.Multiaddr
for _, item := range addresses {
ma, err := multiaddr.NewMultiaddr(item)
if err == nil {
result = append(result, ma)
}
}
return result
}
func parseStoreNodeConfig(addresses []string) []multiaddr.Multiaddr {
// TODO: once a scoring/reputation mechanism is added to waku,
// this function can be modified to retrieve the storenodes
// from waku peerstore.
// We don't do that now because we can't trust any random storenode
// So we use only those specified in the cluster config
var result []multiaddr.Multiaddr
var dnsDiscWg sync.WaitGroup
maChan := make(chan multiaddr.Multiaddr, 1000)
for _, addrString := range addresses {
if strings.HasPrefix(addrString, "enrtree://") {
// Use DNS Discovery
dnsDiscWg.Add(1)
go func(addr string) {
defer dnsDiscWg.Done()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
multiaddresses, err := dnsdisc.RetrieveNodes(ctx, addr)
if err == nil {
for _, ma := range multiaddresses {
maChan <- ma
}
}
}(addrString)
} else {
// It's a normal multiaddress
ma, err := multiaddr.NewMultiaddr(addrString)
if err == nil {
maChan <- ma
}
}
}
dnsDiscWg.Wait()
close(maChan)
for ma := range maChan {
result = append(result, ma)
}
return result
}
func getPeerID(addr multiaddr.Multiaddr) (peer.ID, error) {
idStr, err := addr.ValueForProtocol(multiaddr.P_P2P)
if err != nil {
return "", err
}
return peer.Decode(idStr)
}

View File

@ -55,22 +55,22 @@ func TestMessengerResponseMergeNotImplemented(t *testing.T) {
response1 := &MessengerResponse{}
response2 := &MessengerResponse{
Contacts: []*Contact{&Contact{}},
Contacts: []*Contact{{}},
}
require.Error(t, response1.Merge(response2))
response2 = &MessengerResponse{
Installations: []*multidevice.Installation{&multidevice.Installation{}},
Installations: []*multidevice.Installation{{}},
}
require.Error(t, response1.Merge(response2))
response2 = &MessengerResponse{
EmojiReactions: []*EmojiReaction{&EmojiReaction{}},
EmojiReactions: []*EmojiReaction{{}},
}
require.Error(t, response1.Merge(response2))
response2 = &MessengerResponse{
Invitations: []*GroupChatInvitation{&GroupChatInvitation{}},
Invitations: []*GroupChatInvitation{{}},
}
require.Error(t, response1.Merge(response2))

View File

@ -145,6 +145,7 @@ func newMessengerWithKey(shh types.Waku, privateKey *ecdsa.PrivateKey, logger *z
privateKey,
&testNode{shh: shh},
uuid.New().String(),
nil,
options...,
)
if err != nil {

View File

@ -642,3 +642,7 @@ func (t *Transport) ProcessingP2PMessages() bool {
func (t *Transport) MarkP2PMessageAsProcessed(hash common.Hash) {
t.waku.MarkP2PMessageAsProcessed(hash)
}
func (t *Transport) SubscribeToConnStatusChanges() (*types.ConnStatusSubscription, error) {
return t.waku.SubscribeToConnStatusChanges()
}

View File

@ -909,6 +909,10 @@ func (api *PublicAPI) RequestAllHistoricMessages() (*protocol.MessengerResponse,
return api.service.messenger.RequestAllHistoricMessages()
}
func (api *PublicAPI) DisconnectActiveMailserver() {
api.service.messenger.DisconnectActiveMailserver()
}
// Echo is a method for testing purposes.
func (api *PublicAPI) Echo(ctx context.Context, message string) (string, error) {
return message, nil

View File

@ -58,7 +58,7 @@ type Service struct {
cancelMessenger chan struct{}
storage db.TransactionalStorage
n types.Node
config params.ShhextConfig
config params.NodeConfig
mailMonitor *MailRequestMonitor
server *p2p.Server
peerStore *mailservers.PeerStore
@ -71,7 +71,7 @@ type Service struct {
var _ node.Lifecycle = (*Service)(nil)
func New(
config params.ShhextConfig,
config params.NodeConfig,
n types.Node,
ldb *leveldb.DB,
mailMonitor *MailRequestMonitor,
@ -103,7 +103,7 @@ func (s *Service) GetPeer(rawURL string) (*enode.Node, error) {
}
func (s *Service) InitProtocol(nodeName string, identity *ecdsa.PrivateKey, db *sql.DB, multiAccountDb *multiaccounts.Database, acc *multiaccounts.Account, logger *zap.Logger) error {
if !s.config.PFSEnabled {
if !s.config.ShhextConfig.PFSEnabled {
return nil
}
@ -118,15 +118,15 @@ func (s *Service) InitProtocol(nodeName string, identity *ecdsa.PrivateKey, db *
s.identity = identity
dataDir := filepath.Clean(s.config.BackupDisabledDataDir)
dataDir := filepath.Clean(s.config.ShhextConfig.BackupDisabledDataDir)
if err := os.MkdirAll(dataDir, os.ModePerm); err != nil {
return err
}
envelopesMonitorConfig := &transport.EnvelopesMonitorConfig{
MaxAttempts: s.config.MaxMessageDeliveryAttempts,
AwaitOnlyMailServerConfirmations: s.config.MailServerConfirmations,
MaxAttempts: s.config.ShhextConfig.MaxMessageDeliveryAttempts,
AwaitOnlyMailServerConfirmations: s.config.ShhextConfig.MailServerConfirmations,
IsMailserver: func(peer types.EnodeID) bool {
return s.peerStore.Exist(peer)
},
@ -146,13 +146,15 @@ func (s *Service) InitProtocol(nodeName string, identity *ecdsa.PrivateKey, db *
nodeName,
identity,
s.n,
s.config.InstallationID,
s.config.ShhextConfig.InstallationID,
s.peerStore,
options...,
)
if err != nil {
return err
}
s.messenger = messenger
s.messenger.SetP2PServer(s.server)
return messenger.Init()
}
@ -166,7 +168,7 @@ func (s *Service) StartMessenger() (*protocol.MessengerResponse, error) {
go s.retrieveMessagesLoop(time.Second, s.cancelMessenger)
go s.verifyTransactionLoop(30*time.Second, s.cancelMessenger)
if s.config.BandwidthStatsEnabled {
if s.config.ShhextConfig.BandwidthStatsEnabled {
go s.retrieveStats(5*time.Second, s.cancelMessenger)
}
@ -282,7 +284,7 @@ func (c *verifyTransactionClient) TransactionByHash(ctx context.Context, hash ty
}
func (s *Service) verifyTransactionLoop(tick time.Duration, cancel <-chan struct{}) {
if s.config.VerifyTransactionURL == "" {
if s.config.ShhextConfig.VerifyTransactionURL == "" {
log.Warn("not starting transaction loop")
return
}
@ -392,7 +394,7 @@ func (s *Service) Stop() error {
}
func buildMessengerOptions(
config params.ShhextConfig,
config params.NodeConfig,
identity *ecdsa.PrivateKey,
db *sql.DB,
multiAccounts *multiaccounts.Database,
@ -411,13 +413,18 @@ func buildMessengerOptions(
protocol.WithAccount(account),
protocol.WithEnvelopesMonitorConfig(envelopesMonitorConfig),
protocol.WithSignalsHandler(messengerSignalsHandler),
protocol.WithENSVerificationConfig(publishMessengerResponse, config.VerifyENSURL, config.VerifyENSContractAddress),
protocol.WithENSVerificationConfig(publishMessengerResponse, config.ShhextConfig.VerifyENSURL, config.ShhextConfig.VerifyENSContractAddress),
protocol.WithClusterConfig(config.ClusterConfig),
}
if config.DataSyncEnabled {
if config.ShhextConfig.DataSyncEnabled {
options = append(options, protocol.WithDatasync())
}
if config.ShhextConfig.EnableMailserverCycle {
options = append(options, protocol.WithMailserverCycle())
}
settings, err := accountsDB.GetSettings()
if err != sql.ErrNoRows && err != nil {
return nil, err
@ -425,7 +432,7 @@ func buildMessengerOptions(
// Generate anon metrics client config
if settings.AnonMetricsShouldSend {
keyBytes, err := hex.DecodeString(config.AnonMetricsSendID)
keyBytes, err := hex.DecodeString(config.ShhextConfig.AnonMetricsSendID)
if err != nil {
return nil, err
}
@ -443,14 +450,14 @@ func buildMessengerOptions(
}
// Generate anon metrics server config
if config.AnonMetricsServerEnabled {
if len(config.AnonMetricsServerPostgresURI) == 0 {
if config.ShhextConfig.AnonMetricsServerEnabled {
if len(config.ShhextConfig.AnonMetricsServerPostgresURI) == 0 {
return nil, errors.New("AnonMetricsServerPostgresURI must be set")
}
amsc := &anonmetrics.ServerConfig{
Enabled: true,
PostgresURI: config.AnonMetricsServerPostgresURI,
PostgresURI: config.ShhextConfig.AnonMetricsServerPostgresURI,
}
options = append(options, protocol.WithAnonMetricsServerConfig(amsc))
}
@ -468,17 +475,17 @@ func buildMessengerOptions(
}
options = append(options, protocol.WithPushNotificationClientConfig(&pushnotificationclient.Config{
DefaultServers: config.DefaultPushNotificationsServers,
DefaultServers: config.ShhextConfig.DefaultPushNotificationsServers,
BlockMentions: settings.PushNotificationsBlockMentions,
SendEnabled: settings.SendPushNotifications,
AllowFromContactsOnly: settings.PushNotificationsFromContactsOnly,
RemoteNotificationsEnabled: settings.RemotePushNotificationsEnabled,
}))
if config.VerifyTransactionURL != "" {
if config.ShhextConfig.VerifyTransactionURL != "" {
client := &verifyTransactionClient{
url: config.VerifyTransactionURL,
chainID: big.NewInt(config.VerifyTransactionChainID),
url: config.ShhextConfig.VerifyTransactionURL,
chainID: big.NewInt(config.ShhextConfig.VerifyTransactionChainID),
}
options = append(options, protocol.WithVerifyTransactionClient(client))
}

View File

@ -38,13 +38,9 @@ func (pr *PingResult) Update(rttMs int, err error) {
}
}
func enodeToAddr(enodeAddr string) (string, error) {
node, err := enode.ParseV4(enodeAddr)
if err != nil {
return "", err
}
func EnodeToAddr(node *enode.Node) (string, error) {
var ip4 enr.IPv4
err = node.Load(&ip4)
err := node.Load(&ip4)
if err != nil {
return "", err
}
@ -56,6 +52,14 @@ func enodeToAddr(enodeAddr string) (string, error) {
return fmt.Sprintf("%s:%d", net.IP(ip4).String(), tcp), nil
}
func EnodeStringToAddr(enodeAddr string) (string, error) {
node, err := enode.ParseV4(enodeAddr)
if err != nil {
return "", err
}
return EnodeToAddr(node)
}
func parse(addresses []string, fn parseFn) (map[string]*PingResult, []string) {
results := make(map[string]*PingResult, len(addresses))
var toPing []string
@ -81,10 +85,10 @@ func mapValues(m map[string]*PingResult) []*PingResult {
return rval
}
func ping(ctx context.Context, pq PingQuery, p parseFn) ([]*PingResult, error) {
timeout := time.Duration(pq.TimeoutMs) * time.Millisecond
func DoPing(ctx context.Context, addresses []string, timeoutMs int, p parseFn) ([]*PingResult, error) {
timeout := time.Duration(timeoutMs) * time.Millisecond
resultsMap, toPing := parse(pq.Addresses, p)
resultsMap, toPing := parse(addresses, p)
// run the checks concurrently
results, err := rtt.CheckHosts(toPing, timeout)
@ -106,10 +110,10 @@ func ping(ctx context.Context, pq PingQuery, p parseFn) ([]*PingResult, error) {
}
func (a *API) Ping(ctx context.Context, pq PingQuery) ([]*PingResult, error) {
return ping(ctx, pq, enodeToAddr)
return DoPing(ctx, pq.Addresses, pq.TimeoutMs, EnodeStringToAddr)
}
func multiAddressToAddress(multiAddr string) (string, error) {
func MultiAddressToAddress(multiAddr string) (string, error) {
ma, err := multiaddr.NewMultiaddr(multiAddr)
if err != nil {
@ -129,5 +133,5 @@ func multiAddressToAddress(multiAddr string) (string, error) {
}
func (a *API) MultiAddressPing(ctx context.Context, pq PingQuery) ([]*PingResult, error) {
return ping(ctx, pq, multiAddressToAddress)
return DoPing(ctx, pq.Addresses, pq.TimeoutMs, MultiAddressToAddress)
}

View File

@ -53,10 +53,12 @@ func TestRequestMessagesErrors(t *testing.T) {
defer func() { require.NoError(t, aNode.Close()) }()
handler := ext.NewHandlerMock(1)
config := params.ShhextConfig{
config := params.NodeConfig{
ShhextConfig: params.ShhextConfig{
InstallationID: "1",
BackupDisabledDataDir: os.TempDir(),
PFSEnabled: true,
},
}
nodeWrapper := ext.NewTestNodeWrapper(nil, waku)
service := New(config, nodeWrapper, handler, nil)
@ -102,12 +104,14 @@ func TestInitProtocol(t *testing.T) {
directory, err := ioutil.TempDir("", "status-go-testing")
require.NoError(t, err)
config := params.ShhextConfig{
config := params.NodeConfig{
ShhextConfig: params.ShhextConfig{
InstallationID: "2",
BackupDisabledDataDir: directory,
PFSEnabled: true,
MailServerConfirmations: true,
ConnectionTarget: 10,
},
}
db, err := leveldb.Open(storage.NewMemStorage(), nil)
require.NoError(t, err)
@ -171,12 +175,14 @@ func (s *ShhExtSuite) createAndAddNode() {
s.NoError(err)
// set up protocol
config := params.ShhextConfig{
config := params.NodeConfig{
ShhextConfig: params.ShhextConfig{
InstallationID: "1",
BackupDisabledDataDir: s.dir,
PFSEnabled: true,
MailServerConfirmations: true,
ConnectionTarget: 10,
},
}
db, err := leveldb.Open(storage.NewMemStorage(), nil)
s.Require().NoError(err)

View File

@ -15,14 +15,14 @@ type Service struct {
w types.Waku
}
func New(config params.ShhextConfig, n types.Node, handler ext.EnvelopeEventsHandler, ldb *leveldb.DB) *Service {
func New(config params.NodeConfig, n types.Node, handler ext.EnvelopeEventsHandler, ldb *leveldb.DB) *Service {
w, err := n.GetWaku(nil)
if err != nil {
panic(err)
}
delay := ext.DefaultRequestsDelay
if config.RequestsDelay != 0 {
delay = config.RequestsDelay
if config.ShhextConfig.RequestsDelay != 0 {
delay = config.ShhextConfig.RequestsDelay
}
requestsRegistry := ext.NewRequestsRegistry(delay)
mailMonitor := ext.NewMailRequestMonitor(w, handler, requestsRegistry)

View File

@ -15,14 +15,14 @@ type Service struct {
w types.Waku
}
func New(config params.ShhextConfig, n types.Node, handler ext.EnvelopeEventsHandler, ldb *leveldb.DB) *Service {
func New(config params.NodeConfig, n types.Node, handler ext.EnvelopeEventsHandler, ldb *leveldb.DB) *Service {
w, err := n.GetWakuV2(nil)
if err != nil {
panic(err)
}
delay := ext.DefaultRequestsDelay
if config.RequestsDelay != 0 {
delay = config.RequestsDelay
if config.ShhextConfig.RequestsDelay != 0 {
delay = config.ShhextConfig.RequestsDelay
}
requestsRegistry := ext.NewRequestsRegistry(delay)
mailMonitor := ext.NewMailRequestMonitor(w, handler, requestsRegistry)

View File

@ -5,7 +5,6 @@ import (
"encoding/json"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/status-im/status-go/eth-node/types"
)
@ -49,6 +48,12 @@ const (
// EventBackupPerformed is triggered when a backup has been performed
EventBackupPerformed = "backup.performed"
// EventMailserverAvailable is triggered when a mailserver becomes available
EventMailserverAvailable = "mailserver.available"
// EventMailserverChanged is triggered when switching the active mailserver
EventMailserverChanged = "mailserver.changed"
)
// EnvelopeSignal includes hash of the envelope.
@ -84,6 +89,10 @@ type BundleAddedSignal struct {
InstallationID string `json:"installationID"`
}
type MailserverSignal struct {
Address string `json:"address"`
}
type Filter struct {
// ChatID is the identifier of the chat
ChatID string `json:"chatId"`
@ -195,3 +204,15 @@ func SendBundleAdded(identity string, installationID string) {
func SendNewMessages(obj json.Marshaler) {
send(EventNewMessages, obj)
}
func SendMailserverAvailable(nodeAddress string) {
send(EventMailserverAvailable, MailserverSignal{
Address: nodeAddress,
})
}
func SendMailserverChanged(nodeAddress string) {
send(EventMailserverChanged, MailserverSignal{
Address: nodeAddress,
})
}

View File

@ -87,12 +87,6 @@ type settings struct {
SoftBlacklistedPeerIDs map[string]bool // SoftBlacklistedPeerIDs is a list of peer ids that we want to keep connected but silently drop any envelope from
}
type ConnStatus struct {
IsOnline bool `json:"isOnline"`
HasHistory bool `json:"hasHistory"`
Peers map[string][]string `json:"peers"`
}
// Waku represents a dark communication interface through the Ethereum
// network, using its very own P2P communication layer.
type Waku struct {
@ -126,6 +120,9 @@ type Waku struct {
storeMsgIDs map[gethcommon.Hash]bool // Map of the currently processing ids
storeMsgIDsMu sync.RWMutex
connStatusSubscriptions map[string]*types.ConnStatusSubscription
connStatusMu sync.Mutex
timeSource func() time.Time // source of time for waku
logger *zap.Logger
@ -148,6 +145,7 @@ func New(nodeKey string, cfg *Config, logger *zap.Logger, appdb *sql.DB) (*Waku,
expirations: make(map[uint32]mapset.Set),
msgQueue: make(chan *common.ReceivedMessage, messageQueueLimit),
sendQueue: make(chan *pb.WakuMessage, 1000),
connStatusSubscriptions: make(map[string]*types.ConnStatusSubscription),
quit: make(chan struct{}),
dnsAddressCache: make(map[string][]multiaddr.Multiaddr),
dnsAddressCacheLock: &sync.RWMutex{},
@ -271,7 +269,17 @@ func New(nodeKey string, cfg *Config, logger *zap.Logger, appdb *sql.DB) (*Waku,
case <-waku.quit:
return
case c := <-connStatusChan:
signal.SendPeerStats(formatConnStatus(c))
waku.connStatusMu.Lock()
latestConnStatus := formatConnStatus(c)
for k, subs := range waku.connStatusSubscriptions {
if subs.Active() {
subs.C <- latestConnStatus
} else {
delete(waku.connStatusSubscriptions, k)
}
}
waku.connStatusMu.Unlock()
signal.SendPeerStats(latestConnStatus)
}
}
}()
@ -284,6 +292,14 @@ func New(nodeKey string, cfg *Config, logger *zap.Logger, appdb *sql.DB) (*Waku,
return waku, nil
}
func (w *Waku) SubscribeToConnStatusChanges() *types.ConnStatusSubscription {
w.connStatusMu.Lock()
defer w.connStatusMu.Unlock()
subscription := types.NewConnStatusSubscription()
w.connStatusSubscriptions[subscription.ID] = subscription
return subscription
}
type fnApplyToEachPeer func(ma multiaddr.Multiaddr, protocol libp2pproto.ID)
func (w *Waku) addPeers(addresses []string, protocol libp2pproto.ID, apply fnApplyToEachPeer) {
@ -1151,8 +1167,8 @@ func FormatPeerStats(peers node.PeerStats) map[string][]string {
return p
}
func formatConnStatus(c node.ConnStatus) ConnStatus {
return ConnStatus{
func formatConnStatus(c node.ConnStatus) types.ConnStatus {
return types.ConnStatus{
IsOnline: c.IsOnline,
HasHistory: c.HasHistory,
Peers: FormatPeerStats(c.Peers),