status-go/protocol/messenger_mailserver_cycle.go

712 lines
19 KiB
Go

package protocol
import (
"context"
"crypto/rand"
"fmt"
"math"
"math/big"
"sort"
"strings"
"time"
"github.com/pkg/errors"
"go.uber.org/zap"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/status-im/status-go/params"
"github.com/status-im/status-go/services/mailservers"
"github.com/status-im/status-go/signal"
)
// Backoff windows added to the current time to compute a peer's
// canConnectAfter timestamp.
const (
	// defaultBackoff is used after a connection attempt and on regular
	// connect/disconnect status transitions.
	defaultBackoff = 10 * time.Second
	// graylistBackoff is used when a mailserver is penalized or explicitly
	// disconnected.
	graylistBackoff = 3 * time.Minute
)
// mailserversByFleet returns the default mailservers whose Fleet field equals
// fleet. The result is nil when no default mailserver matches.
func (m *Messenger) mailserversByFleet(fleet string) []mailservers.Mailserver {
	var matching []mailservers.Mailserver
	for _, candidate := range mailservers.DefaultMailservers() {
		if candidate.Fleet != fleet {
			continue
		}
		matching = append(matching, candidate)
	}
	return matching
}
// byRTTMsAndCanConnectBefore implements sort.Interface for a slice of
// SortedMailserver. Entries whose CanConnectAfter lies in the past are ordered
// by ascending RTTMs; otherwise entries are ordered by CanConnectAfter.
type byRTTMsAndCanConnectBefore []SortedMailserver

// Len reports the number of entries.
func (s byRTTMsAndCanConnectBefore) Len() int { return len(s) }

// Swap exchanges the entries at positions i and j.
func (s byRTTMsAndCanConnectBefore) Swap(i, j int) {
	tmp := s[i]
	s[i] = s[j]
	s[j] = tmp
}

// Less reports whether entry i sorts before entry j.
func (s byRTTMsAndCanConnectBefore) Less(i, j int) bool {
	// The clock is sampled per comparison, so ordering is slightly
	// time-sensitive; this inaccuracy is acceptable here.
	now := time.Now()
	left, right := s[i], s[j]

	bothConnectable := left.CanConnectAfter.Before(now) && right.CanConnectAfter.Before(now)
	if !bothConnectable {
		return left.CanConnectAfter.Before(right.CanConnectAfter)
	}
	return left.RTTMs < right.RTTMs
}
// activeMailserverID returns the result of IDBytes for the active mailserver,
// or (nil, nil) when no mailserver is active.
func (m *Messenger) activeMailserverID() ([]byte, error) {
	if active := m.mailserverCycle.activeMailserver; active != nil {
		return active.IDBytes()
	}
	return nil, nil
}
// StartMailserverCycle stores mailservers as the candidate list and starts the
// background loop matching the transport's waku version.
//
// For waku v1 it subscribes to the server's peer events and launches
// updateWakuV1PeerStatus; if no server is set it logs a warning and returns
// nil without starting anything. For waku v2 it registers every candidate as a
// store peer and launches verifyStorenodeStatus. Any other version yields an
// error.
func (m *Messenger) StartMailserverCycle(mailservers []mailservers.Mailserver) error {
	m.mailserverCycle.allMailservers = mailservers

	switch wakuVersion := m.transport.WakuVersion(); wakuVersion {
	case 1:
		if m.server == nil {
			m.logger.Warn("not starting mailserver cycle")
			return nil
		}

		m.mailserverCycle.events = make(chan *p2p.PeerEvent, 20)
		m.mailserverCycle.subscription = m.server.SubscribeEvents(m.mailserverCycle.events)
		go m.updateWakuV1PeerStatus()

	case 2:
		for idx := range mailservers {
			if _, err := m.transport.AddStorePeer(mailservers[idx].Address); err != nil {
				return err
			}
		}
		go m.verifyStorenodeStatus()

	default:
		return fmt.Errorf("unsupported waku version: %d", wakuVersion)
	}

	m.logger.Debug("starting mailserver cycle",
		zap.Uint("WakuVersion", m.transport.WakuVersion()),
		zap.Any("mailservers", mailservers),
	)

	return nil
}
// DisconnectActiveMailserver calls disconnectActiveMailserver while holding
// the mailserverCycle lock.
func (m *Messenger) DisconnectActiveMailserver() {
m.mailserverCycle.Lock()
defer m.mailserverCycle.Unlock()
m.disconnectActiveMailserver()
}
// disconnectMailserver marks the active mailserver as disconnected, graylists
// it for graylistBackoff and clears it as the active mailserver. For version 1
// mailservers the peer is also removed from the server.
//
// It is a no-op when no mailserver is active. If the enode of a version 1
// mailserver cannot be parsed the error is returned and the active mailserver
// is left set.
func (m *Messenger) disconnectMailserver() error {
	active := m.mailserverCycle.activeMailserver
	if active == nil {
		m.logger.Info("no active mailserver")
		return nil
	}
	m.logger.Info("disconnecting active mailserver", zap.String("nodeID", active.ID))

	m.mailPeersMutex.Lock()
	entry, tracked := m.mailserverCycle.peers[active.ID]
	if !tracked {
		// First time this mailserver is seen: start from a fresh entry.
		entry = peerStatus{mailserver: *active}
	}
	entry.status = disconnected
	entry.canConnectAfter = time.Now().Add(graylistBackoff)
	m.mailserverCycle.peers[active.ID] = entry
	m.mailPeersMutex.Unlock()

	// WakuV2 does not keep an active storenode connection
	if active.Version == 1 {
		node, err := active.Enode()
		if err != nil {
			return err
		}
		m.server.RemovePeer(node)
	}

	m.mailserverCycle.activeMailserver = nil
	return nil
}
// disconnectActiveMailserver disconnects the active mailserver, logging (but
// not returning) any failure, and then emits a mailserver-changed signal with
// empty address and ID.
func (m *Messenger) disconnectActiveMailserver() {
	if err := m.disconnectMailserver(); err != nil {
		m.logger.Error("failed to disconnect mailserver", zap.Error(err))
	}
	signal.SendMailserverChanged("", "")
}
// cycleMailservers drops the active mailserver, if one is set, and tries to
// select a new one. A failure to find a new mailserver is only logged.
func (m *Messenger) cycleMailservers() {
	m.logger.Info("Automatically switching mailserver")

	if m.mailserverCycle.activeMailserver != nil {
		m.disconnectActiveMailserver()
	}

	if err := m.findNewMailserver(); err != nil {
		m.logger.Error("Error getting new mailserver", zap.Error(err))
	}
}
// poolSize returns a quarter of fleetSize, rounded up to the next integer.
func poolSize(fleetSize int) int {
	quarter := float64(fleetSize) / 4
	return int(math.Ceil(quarter))
}
// getFleet resolves the fleet name to use. The fleet stored in settings takes
// precedence, followed by the fleet from the cluster config, and finally
// params.FleetProd. An error is returned only when reading the settings fails.
func (m *Messenger) getFleet() (string, error) {
	stored, err := m.settings.GetFleet()
	if err != nil {
		return "", err
	}

	switch {
	case stored != "":
		return stored, nil
	case m.config.clusterConfig.Fleet != "":
		return m.config.clusterConfig.Fleet, nil
	default:
		return params.FleetProd, nil
	}
}
// allMailservers returns the default and custom mailservers of the configured
// fleet that match the transport's waku version. Custom mailservers are
// stamped with the transport's waku version before filtering.
func (m *Messenger) allMailservers() ([]mailservers.Mailserver, error) {
	fleet, err := m.getFleet()
	if err != nil {
		return nil, err
	}

	// Start with the defaults for the fleet, then add custom entries.
	candidates := m.mailserversByFleet(fleet)
	if m.mailserversDatabase != nil {
		stored, err := m.mailserversDatabase.Mailservers()
		if err != nil {
			return nil, err
		}
		for _, custom := range stored {
			if custom.Fleet != fleet {
				continue
			}
			custom.Version = m.transport.WakuVersion()
			candidates = append(candidates, custom)
		}
	}

	// Keep only the entries for the configured waku version.
	version := m.transport.WakuVersion()
	filtered := make([]mailservers.Mailserver, 0, len(candidates))
	for idx := range candidates {
		if candidates[idx].Version != version {
			continue
		}
		filtered = append(filtered, candidates[idx])
	}

	return filtered, nil
}
// SortedMailserver is the element type sorted by byRTTMsAndCanConnectBefore
// when findNewMailserver ranks mailservers that answered a ping.
type SortedMailserver struct {
// Address is the mailserver address taken from the ping result.
Address string
// RTTMs is the round-trip time taken from the ping result (presumably in
// milliseconds, going by the field name — confirm against mailservers.DoPing).
RTTMs int
// CanConnectAfter is compared against the current time when sorting.
// NOTE(review): findNewMailserver never sets this field, so it stays at the
// zero time for entries built there.
CanConnectAfter time.Time
}
// findNewMailserver selects a mailserver and connects to it.
//
// A pinned mailserver, when configured, is always used. Otherwise all known
// mailservers are pinged, the ones that failed or are still inside their
// backoff window are discarded, the rest are sorted, and one entry is picked
// at random from the head of the sorted list. When nothing is available the
// function logs a warning and returns nil.
func (m *Messenger) findNewMailserver() error {
	pinned, err := m.getPinnedMailserver()
	if err != nil {
		m.logger.Error("Could not obtain the pinned mailserver", zap.Error(err))
		return err
	}
	if pinned != nil {
		return m.connectToMailserver(*pinned)
	}

	candidates := m.mailserverCycle.allMailservers
	m.logger.Info("Finding a new mailserver...")

	var addresses []string
	for idx := range candidates {
		addresses = append(addresses, candidates[idx].Address)
	}

	if len(candidates) == 0 {
		m.logger.Warn("no mailservers available") // Do nothing...
		return nil
	}

	// The address format depends on the waku version of the candidates.
	var parseFn func(string) (string, error)
	switch candidates[0].Version {
	case 2:
		parseFn = mailservers.MultiAddressToAddress
	default:
		parseFn = mailservers.EnodeStringToAddr
	}

	pingResult, err := mailservers.DoPing(context.Background(), addresses, 500, parseFn)
	if err != nil {
		return err
	}

	var reachable []*mailservers.PingResult
	for _, res := range pingResult {
		if res.Err == nil {
			reachable = append(reachable, res)
			continue
		}
		// The results with error are ignored
		m.logger.Info("connecting error", zap.String("err", *res.Err))
	}

	if len(reachable) == 0 {
		m.logger.Warn("No mailservers available") // Do nothing...
		return nil
	}

	byAddress := make(map[string]mailservers.Mailserver)
	for idx := range candidates {
		byAddress[candidates[idx].Address] = candidates[idx]
	}

	var ranked []SortedMailserver
	for _, res := range reachable {
		entry := SortedMailserver{
			Address: res.Address,
			RTTMs:   *res.RTTMs,
		}
		known := byAddress[res.Address]

		m.mailPeersMutex.Lock()
		peer, tracked := m.mailserverCycle.peers[known.ID]
		m.mailPeersMutex.Unlock()

		if tracked && time.Now().Before(peer.canConnectAfter) {
			continue // We can't connect to this node yet
		}

		ranked = append(ranked, entry)
	}
	sort.Sort(byRTTMsAndCanConnectBefore(ranked))

	// Picks a random mailserver amongst the ones with the lowest latency.
	// The pool size is computed by poolSize from the number of ranked
	// mailservers minus one; if that is not positive, the whole list is used.
	pool := poolSize(len(ranked) - 1)
	if pool <= 0 {
		if len(ranked) == 0 {
			m.logger.Warn("No mailservers available") // Do nothing...
			return nil
		}
		pool = len(ranked)
	}

	pick, err := rand.Int(rand.Reader, big.NewInt(int64(pool)))
	if err != nil {
		return err
	}

	chosen := byAddress[ranked[pick.Int64()].Address]
	m.logger.Info("connecting to mailserver", zap.String("address", chosen.Address))
	return m.connectToMailserver(chosen)
}
// activeMailserverStatus returns the tracked connection status of the active
// mailserver. It returns disconnected and an error when no mailserver is
// active; an untracked active mailserver yields the zero connStatus.
func (m *Messenger) activeMailserverStatus() (connStatus, error) {
	active := m.mailserverCycle.activeMailserver
	if active == nil {
		return disconnected, errors.New("Active mailserver is not set")
	}
	id := active.ID

	m.mailPeersMutex.Lock()
	defer m.mailPeersMutex.Unlock()
	return m.mailserverCycle.peers[id].status, nil
}
// connectToMailserver makes ms the active mailserver, emits a
// mailserver-changed signal and, unless ms is already tracked as connected,
// starts connecting to it.
//
// Version 1 mailservers are added as a peer and recorded as connecting.
// Version 2 mailservers are recorded as connected immediately, announced as
// available, and a historic-messages request is started in the background.
// An error is returned when ms does not match the transport's waku version or
// when its enode cannot be parsed or stored.
func (m *Messenger) connectToMailserver(ms mailservers.Mailserver) error {
	m.logger.Info("connecting to mailserver", zap.Any("peer", ms.ID))
	m.mailserverCycle.activeMailserver = &ms
	signal.SendMailserverChanged(m.mailserverCycle.activeMailserver.Address, m.mailserverCycle.activeMailserver.ID)

	// Adding a peer and marking it as connected can't be executed sync in
	// WakuV1, because there's a delay between requesting a peer being added
	// and a signal being received after the peer was added. So the peer status
	// is first set to connecting and, once a peer-connected event is received,
	// it is marked as connected.
	currentStatus, err := m.activeMailserverStatus()
	if err != nil {
		return err
	}

	if ms.Version != m.transport.WakuVersion() {
		return errors.New("mailserver waku version doesn't match")
	}

	if currentStatus == connected {
		return nil
	}

	// WakuV2 does not require having the peer connected to query the peer.
	// For WakuV1, attempt to connect to the mailserver by adding it as a peer.
	if ms.Version == 1 {
		node, err := ms.Enode()
		if err != nil {
			return err
		}
		m.server.AddPeer(node)
		if err := m.peerStore.Update([]*enode.Node{node}); err != nil {
			return err
		}
	}

	var newStatus connStatus
	switch ms.Version {
	case 2:
		newStatus = connected
	default:
		newStatus = connecting
	}

	m.mailPeersMutex.Lock()
	m.mailserverCycle.peers[ms.ID] = peerStatus{
		status:                newStatus,
		lastConnectionAttempt: time.Now(),
		canConnectAfter:       time.Now().Add(defaultBackoff),
		mailserver:            ms,
	}
	m.mailPeersMutex.Unlock()

	if ms.Version != 2 {
		return nil
	}

	m.mailserverCycle.activeMailserver.FailedRequests = 0
	m.logger.Info("mailserver available", zap.String("address", m.mailserverCycle.activeMailserver.UniqueID()))
	m.EmitMailserverAvailable()
	signal.SendMailserverAvailable(m.mailserverCycle.activeMailserver.Address, m.mailserverCycle.activeMailserver.ID)

	// Query mailserver
	go func() {
		_, err := m.performMailserverRequest(func() (*MessengerResponse, error) { return m.RequestAllHistoricMessages(false) })
		if err != nil {
			m.logger.Error("could not perform mailserver request", zap.Error(err))
		}
	}()

	return nil
}
// getActiveMailserver returns the currently active mailserver, or nil when
// none is set. The returned pointer is the stored one, not a copy.
func (m *Messenger) getActiveMailserver() *mailservers.Mailserver {
return m.mailserverCycle.activeMailserver
}
// isActiveMailserverAvailable reports whether an active mailserver is set and
// its tracked status is connected.
func (m *Messenger) isActiveMailserverAvailable() bool {
	status, err := m.activeMailserverStatus()
	return err == nil && status == connected
}
// mailserverAddressToID returns the ID of the first mailserver in
// allMailservers whose UniqueID equals uniqueID, or an empty string when none
// matches. The returned error is always nil.
func (m *Messenger) mailserverAddressToID(uniqueID string, allMailservers []mailservers.Mailserver) (string, error) {
	for idx := range allMailservers {
		if allMailservers[idx].UniqueID() == uniqueID {
			return allMailservers[idx].ID, nil
		}
	}
	return "", nil
}
// ConnectedPeer identifies a peer the server is currently connected to.
type ConnectedPeer struct {
// UniqueID is matched against Mailserver.UniqueID() to map the peer to a
// known mailserver.
UniqueID string
}
// mailserverPeersInfo returns one ConnectedPeer per peer reported by the
// server, using the peer's enode string with a trailing "?discport=0" removed
// as the unique ID.
func (m *Messenger) mailserverPeersInfo() []ConnectedPeer {
	var peers []ConnectedPeer
	for _, info := range m.server.PeersInfo() {
		// This is a bit fragile, but should work
		uniqueID := strings.TrimSuffix(info.Enode, "?discport=0")
		peers = append(peers, ConnectedPeer{UniqueID: uniqueID})
	}
	return peers
}
// penalizeMailserver graylists the mailserver with the given id by pushing its
// canConnectAfter timestamp graylistBackoff into the future. A mailserver that
// was not tracked yet is recorded as disconnected.
func (m *Messenger) penalizeMailserver(id string) {
	m.mailPeersMutex.Lock()
	defer m.mailPeersMutex.Unlock()

	peers := m.mailserverCycle.peers
	entry, tracked := peers[id]
	if !tracked {
		entry.status = disconnected
	}
	entry.canConnectAfter = time.Now().Add(graylistBackoff)
	peers[id] = entry
}
// handleMailserverCycleEvent reconciles the tracked mailserver peer statuses
// with connectedPeers and decides whether the active mailserver must be
// replaced.
//
// It proceeds in three steps:
//  1. Tracked peers that are not in connectedPeers are marked as disconnected
//     (peers still connecting get an 8 second grace period).
//  2. If an active mailserver is set, known connected peers are marked as
//     connected and a historic-messages request is started for each peer that
//     just became connected.
//  3. The active mailserver is penalized and replaced after too many failed
//     requests, or penalized and disconnected if it did not connect within 20
//     seconds of the last attempt. When no mailserver is active, a new one is
//     selected.
//
// It returns an error if a peer's unique ID cannot be mapped to a mailserver
// ID, or the error returned by connectToNewMailserverAndWait.
func (m *Messenger) handleMailserverCycleEvent(connectedPeers []ConnectedPeer) error {
	m.logger.Debug("mailserver cycle event",
		zap.Any("connected", connectedPeers),
		zap.Any("peer-info", m.mailserverCycle.peers))

	m.mailPeersMutex.Lock()

	for pID, pInfo := range m.mailserverCycle.peers {
		if pInfo.status == disconnected {
			continue
		}

		// Removing disconnected
		found := false
		for _, connectedPeer := range connectedPeers {
			id, err := m.mailserverAddressToID(connectedPeer.UniqueID, m.mailserverCycle.allMailservers)
			if err != nil {
				// Release the lock before bailing out; returning while it is
				// held would block every later access to the peer map.
				m.mailPeersMutex.Unlock()
				m.logger.Error("failed to convert id to hex", zap.Error(err))
				return err
			}

			if pID == id {
				found = true
				break
			}
		}
		if !found && (pInfo.status == connected || (pInfo.status == connecting && pInfo.lastConnectionAttempt.Add(8*time.Second).Before(time.Now()))) {
			m.logger.Info("peer disconnected", zap.String("peer", pID))
			pInfo.status = disconnected
			pInfo.canConnectAfter = time.Now().Add(defaultBackoff)
		}

		m.mailserverCycle.peers[pID] = pInfo
	}
	m.mailPeersMutex.Unlock()

	// Only evaluate connected peers once a mailserver has been set
	// otherwise, we would attempt to retrieve history and end up with a mailserver
	// not available error
	if m.mailserverCycle.activeMailserver != nil {
		for _, connectedPeer := range connectedPeers {
			id, err := m.mailserverAddressToID(connectedPeer.UniqueID, m.mailserverCycle.allMailservers)
			if err != nil {
				m.logger.Error("failed to convert id to hex", zap.Error(err))
				return err
			}
			if id == "" {
				// Not one of the known mailservers.
				continue
			}

			m.mailPeersMutex.Lock()
			pInfo, ok := m.mailserverCycle.peers[id]
			if !ok || pInfo.status != connected {
				m.logger.Info("peer connected", zap.String("peer", connectedPeer.UniqueID))
				pInfo.status = connected
				if pInfo.canConnectAfter.Before(time.Now()) {
					pInfo.canConnectAfter = time.Now().Add(defaultBackoff)
				}
				m.mailserverCycle.peers[id] = pInfo
				m.mailPeersMutex.Unlock()

				if id == m.mailserverCycle.activeMailserver.ID {
					m.mailserverCycle.activeMailserver.FailedRequests = 0
					m.logger.Info("mailserver available", zap.String("address", connectedPeer.UniqueID))
					m.EmitMailserverAvailable()
					signal.SendMailserverAvailable(m.mailserverCycle.activeMailserver.Address, m.mailserverCycle.activeMailserver.ID)
				}

				// Query mailserver
				go func() {
					_, err := m.performMailserverRequest(func() (*MessengerResponse, error) { return m.RequestAllHistoricMessages(false) })
					if err != nil {
						m.logger.Error("could not perform mailserver request", zap.Error(err))
					}
				}()
			} else {
				m.mailPeersMutex.Unlock()
			}
		}
	}

	// Check whether we want to disconnect the mailserver
	if m.mailserverCycle.activeMailserver != nil {
		if m.mailserverCycle.activeMailserver.FailedRequests >= mailserverMaxFailedRequests {
			m.penalizeMailserver(m.mailserverCycle.activeMailserver.ID)
			signal.SendMailserverNotWorking()
			m.logger.Info("connecting too many failed requests")
			m.mailserverCycle.activeMailserver.FailedRequests = 0

			return m.connectToNewMailserverAndWait()
		}

		m.mailPeersMutex.Lock()
		pInfo, ok := m.mailserverCycle.peers[m.mailserverCycle.activeMailserver.ID]
		m.mailPeersMutex.Unlock()

		if ok {
			if pInfo.status != connected && pInfo.lastConnectionAttempt.Add(20*time.Second).Before(time.Now()) {
				m.logger.Info("penalizing mailserver & disconnecting connecting", zap.String("id", m.mailserverCycle.activeMailserver.ID))

				signal.SendMailserverNotWorking()
				m.penalizeMailserver(m.mailserverCycle.activeMailserver.ID)
				m.disconnectActiveMailserver()
			}
		}
	} else {
		m.cycleMailservers()
	}

	m.logger.Debug("updated-peers", zap.Any("peers", m.mailserverCycle.peers))

	return nil
}
// updateWakuV1PeerStatus is the background loop of the WakuV1 mailserver
// cycle. It re-evaluates the mailserver peer statuses once per second and on
// every peer event, and on each tick re-adds the active mailserver as a peer.
// The loop runs until m.quit is signalled, at which point the peer event
// subscription is released.
//
// Errors are logged and never stop the loop: an early exit would leave the
// subscription registered with nobody draining the events channel.
func (m *Messenger) updateWakuV1PeerStatus() {
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			err := m.handleMailserverCycleEvent(m.mailserverPeersInfo())
			if err != nil {
				m.logger.Error("failed to handle mailserver cycle event", zap.Error(err))
				continue
			}

			ms := m.getActiveMailserver()
			if ms != nil {
				node, err := ms.Enode()
				if err != nil {
					m.logger.Error("failed to parse enode", zap.Error(err))
					continue
				}
				m.server.AddPeer(node)
				if err := m.peerStore.Update([]*enode.Node{node}); err != nil {
					m.logger.Error("failed to update peers", zap.Error(err))
					continue
				}
			}

		case <-m.mailserverCycle.events:
			// Keep looping on failure, as the ticker branch does.
			if err := m.handleMailserverCycleEvent(m.mailserverPeersInfo()); err != nil {
				m.logger.Error("failed to handle mailserver cycle event", zap.Error(err))
			}

		case <-m.quit:
			// Unsubscribe first so the event source stops sending before the
			// channel is closed; a send on a closed channel panics.
			m.mailserverCycle.subscription.Unsubscribe()
			close(m.mailserverCycle.events)
			return
		}
	}
}
// verifyStorenodeStatus is the background loop of the WakuV2 storenode cycle.
// Once per second it calls disconnectStorenodeIfRequired, logging any error,
// until m.quit is signalled.
func (m *Messenger) verifyStorenodeStatus() {
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-m.quit:
			return
		case <-ticker.C:
			if err := m.disconnectStorenodeIfRequired(); err != nil {
				m.logger.Error("failed to handle mailserver cycle event", zap.Error(err))
			}
		}
	}
}
// getPinnedMailserver returns the mailserver pinned for the current fleet, or
// nil when no mailserver is pinned or the pinned ID is unknown. Default
// mailservers are searched before custom ones; a custom match is stamped with
// the transport's waku version.
func (m *Messenger) getPinnedMailserver() (*mailservers.Mailserver, error) {
	fleet, err := m.getFleet()
	if err != nil {
		return nil, err
	}

	pinnedByFleet, err := m.settings.GetPinnedMailservers()
	if err != nil {
		return nil, err
	}

	pinnedID, isPinned := pinnedByFleet[fleet]
	if !isPinned {
		return nil, nil
	}

	for _, candidate := range mailservers.DefaultMailservers() {
		if candidate.Fleet != fleet || candidate.ID != pinnedID {
			continue
		}
		match := candidate
		return &match, nil
	}

	if m.mailserversDatabase == nil {
		return nil, nil
	}

	stored, err := m.mailserversDatabase.Mailservers()
	if err != nil {
		return nil, err
	}
	for _, candidate := range stored {
		if candidate.Fleet != fleet || candidate.ID != pinnedID {
			continue
		}
		match := candidate
		match.Version = m.transport.WakuVersion()
		return &match, nil
	}

	return nil, nil
}
// EmitMailserverAvailable wakes every pending availability subscriber and
// clears the subscription list. Each subscription channel is signalled at most
// once and then closed.
func (m *Messenger) EmitMailserverAvailable() {
	for _, sub := range m.mailserverCycle.availabilitySubscriptions {
		// The channels are unbuffered, so a plain send would block forever if
		// the subscriber stopped listening (e.g. it timed out). Try the send
		// without blocking; closing the channel afterwards still wakes any
		// receiver that starts waiting later.
		select {
		case sub <- struct{}{}:
		default:
		}
		close(sub)
	}
	// Drop all subscriptions at once instead of shrinking the slice while
	// ranging over it.
	m.mailserverCycle.availabilitySubscriptions = nil
}
// SubscribeMailserverAvailable registers and returns a new unbuffered channel
// that EmitMailserverAvailable signals and closes.
func (m *Messenger) SubscribeMailserverAvailable() chan struct{} {
	sub := make(chan struct{})
	subs := append(m.mailserverCycle.availabilitySubscriptions, sub)
	m.mailserverCycle.availabilitySubscriptions = subs
	return sub
}
// disconnectStorenodeIfRequired checks the active WakuV2 storenode. When none
// is active a new one is selected. When the active storenode reached
// mailserverMaxFailedRequests failed requests it is penalized, its failure
// counter is reset, and the result of connectToNewMailserverAndWait is
// returned. Otherwise nothing happens.
func (m *Messenger) disconnectStorenodeIfRequired() error {
	m.logger.Debug("wakuV2 storenode status verification")

	active := m.mailserverCycle.activeMailserver
	if active == nil {
		// No active storenode, find a new one
		m.cycleMailservers()
		return nil
	}

	// Nothing to do while the active storenode is below the failure limit.
	if active.FailedRequests < mailserverMaxFailedRequests {
		return nil
	}

	m.penalizeMailserver(active.ID)
	signal.SendMailserverNotWorking()
	m.logger.Info("too many failed requests", zap.String("storenode", active.UniqueID()))
	active.FailedRequests = 0
	return m.connectToNewMailserverAndWait()
}