status-go/protocol/messenger_mailserver_cycle.go
Siddarth Kumar 1551c98f96
chore_: override DNS for iOS as well (#5093)
We need to apply the DNS override hack for iOS as well.

This PR -> https://github.com/status-im/status-mobile/pull/19798 in status-mobile fixes the DNS resolution issue for
iOS ARM Simulators and for Android x86 Emulators.
But we still need this change to ensure storenodes are found on real iOS devices and on Android arm Emulators.
2024-04-26 14:14:17 +02:00

821 lines
22 KiB
Go

package protocol
import (
"context"
"crypto/rand"
"fmt"
"math"
"math/big"
"net"
"runtime"
"sort"
"strings"
"sync"
"time"
"github.com/pkg/errors"
"go.uber.org/zap"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/status-im/status-go/params"
"github.com/status-im/status-go/protocol/storenodes"
"github.com/status-im/status-go/services/mailservers"
"github.com/status-im/status-go/signal"
)
const defaultBackoff = 10 * time.Second
const graylistBackoff = 3 * time.Minute
const isAndroidEmulator = runtime.GOOS == "android" && runtime.GOARCH == "amd64"
const findNearestMailServer = !isAndroidEmulator
const overrideDNS = runtime.GOOS == "android" || runtime.GOOS == "ios"
const bootstrapDNS = "8.8.8.8:53"
func (m *Messenger) mailserversByFleet(fleet string) []mailservers.Mailserver {
return mailservers.DefaultMailserversByFleet(fleet)
}
type byRTTMsAndCanConnectBefore []SortedMailserver
func (s byRTTMsAndCanConnectBefore) Len() int {
return len(s)
}
func (s byRTTMsAndCanConnectBefore) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
func (s byRTTMsAndCanConnectBefore) Less(i, j int) bool {
// Slightly inaccurate as time sensitive sorting, but it does not matter so much
now := time.Now()
if s[i].CanConnectAfter.Before(now) && s[j].CanConnectAfter.Before(now) {
return s[i].RTTMs < s[j].RTTMs
}
return s[i].CanConnectAfter.Before(s[j].CanConnectAfter)
}
func (m *Messenger) StartMailserverCycle(mailservers []mailservers.Mailserver) error {
m.mailserverCycle.allMailservers = mailservers
version := m.transport.WakuVersion()
switch version {
case 1:
if m.server == nil {
m.logger.Warn("not starting mailserver cycle: no p2p server is set")
return nil
}
m.mailserverCycle.events = make(chan *p2p.PeerEvent, 20)
m.mailserverCycle.subscription = m.server.SubscribeEvents(m.mailserverCycle.events)
go m.updateWakuV1PeerStatus()
case 2:
if len(mailservers) == 0 {
m.logger.Warn("not starting mailserver cycle: empty mailservers list")
return nil
}
for _, storenode := range mailservers {
_, err := m.transport.AddStorePeer(storenode.Address)
if err != nil {
return err
}
}
go m.verifyStorenodeStatus()
default:
return fmt.Errorf("unsupported waku version: %d", version)
}
m.logger.Debug("starting mailserver cycle",
zap.Uint("WakuVersion", m.transport.WakuVersion()),
zap.Any("mailservers", mailservers),
)
return nil
}
func (m *Messenger) DisconnectActiveMailserver() {
m.mailserverCycle.Lock()
defer m.mailserverCycle.Unlock()
m.disconnectActiveMailserver()
}
func (m *Messenger) disconnectMailserver() error {
if m.mailserverCycle.activeMailserver == nil {
m.logger.Info("no active mailserver")
return nil
}
m.logger.Info("disconnecting active mailserver", zap.String("nodeID", m.mailserverCycle.activeMailserver.ID))
m.mailPeersMutex.Lock()
pInfo, ok := m.mailserverCycle.peers[m.mailserverCycle.activeMailserver.ID]
if ok {
pInfo.status = disconnected
pInfo.canConnectAfter = time.Now().Add(graylistBackoff)
m.mailserverCycle.peers[m.mailserverCycle.activeMailserver.ID] = pInfo
} else {
m.mailserverCycle.peers[m.mailserverCycle.activeMailserver.ID] = peerStatus{
status: disconnected,
mailserver: *m.mailserverCycle.activeMailserver,
canConnectAfter: time.Now().Add(graylistBackoff),
}
}
m.mailPeersMutex.Unlock()
// WakuV2 does not keep an active storenode connection
if m.mailserverCycle.activeMailserver.Version == 1 {
node, err := m.mailserverCycle.activeMailserver.Enode()
if err != nil {
return err
}
m.server.RemovePeer(node)
}
m.mailserverCycle.activeMailserver = nil
return nil
}
func (m *Messenger) disconnectActiveMailserver() {
err := m.disconnectMailserver()
if err != nil {
m.logger.Error("failed to disconnect mailserver", zap.Error(err))
}
signal.SendMailserverChanged("", "")
}
func (m *Messenger) cycleMailservers() {
m.logger.Info("Automatically switching mailserver")
if m.mailserverCycle.activeMailserver != nil {
m.disconnectActiveMailserver()
}
err := m.findNewMailserver()
if err != nil {
m.logger.Error("Error getting new mailserver", zap.Error(err))
}
}
func poolSize(fleetSize int) int {
return int(math.Ceil(float64(fleetSize) / 4))
}
func (m *Messenger) getFleet() (string, error) {
var fleet string
dbFleet, err := m.settings.GetFleet()
if err != nil {
return "", err
}
if dbFleet != "" {
fleet = dbFleet
} else if m.config.clusterConfig.Fleet != "" {
fleet = m.config.clusterConfig.Fleet
} else {
fleet = params.FleetProd
}
return fleet, nil
}
func (m *Messenger) allMailservers() ([]mailservers.Mailserver, error) {
// Get configured fleet
fleet, err := m.getFleet()
if err != nil {
return nil, err
}
// Get default mailservers for given fleet
allMailservers := m.mailserversByFleet(fleet)
// Add custom configured mailservers
if m.mailserversDatabase != nil {
customMailservers, err := m.mailserversDatabase.Mailservers()
if err != nil {
return nil, err
}
for _, c := range customMailservers {
if c.Fleet == fleet {
c.Version = m.transport.WakuVersion()
allMailservers = append(allMailservers, c)
}
}
}
// Filter mailservers by configured waku version
wakuVersion := m.transport.WakuVersion()
matchingMailservers := make([]mailservers.Mailserver, 0, len(allMailservers))
for _, ms := range allMailservers {
if ms.Version == wakuVersion {
matchingMailservers = append(matchingMailservers, ms)
}
}
return matchingMailservers, nil
}
type SortedMailserver struct {
Address string
RTTMs int
CanConnectAfter time.Time
}
func (m *Messenger) findNewMailserver() error {
// we have to override DNS manually because of https://github.com/status-im/status-mobile/issues/19581
if overrideDNS {
var dialer net.Dialer
net.DefaultResolver = &net.Resolver{
PreferGo: false,
Dial: func(context context.Context, _, _ string) (net.Conn, error) {
conn, err := dialer.DialContext(context, "udp", bootstrapDNS)
if err != nil {
return nil, err
}
return conn, nil
},
}
}
pinnedMailserver, err := m.getPinnedMailserver()
if err != nil {
m.logger.Error("Could not obtain the pinned mailserver", zap.Error(err))
return err
}
if pinnedMailserver != nil {
return m.connectToMailserver(*pinnedMailserver)
}
allMailservers := m.mailserverCycle.allMailservers
// TODO: remove this check once sockets are stable on x86_64 emulators
if findNearestMailServer {
m.logger.Info("Finding a new mailserver...")
var mailserverStr []string
for _, m := range allMailservers {
mailserverStr = append(mailserverStr, m.Address)
}
if len(allMailservers) == 0 {
m.logger.Warn("no mailservers available") // Do nothing...
return nil
}
var parseFn func(string) (string, error)
if allMailservers[0].Version == 2 {
parseFn = mailservers.MultiAddressToAddress
} else {
parseFn = mailservers.EnodeStringToAddr
}
pingResult, err := mailservers.DoPing(context.Background(), mailserverStr, 500, parseFn)
if err != nil {
// pinging mailservers might fail, but we don't care
m.logger.Warn("mailservers.DoPing failed with", zap.Error(err))
}
var availableMailservers []*mailservers.PingResult
for _, result := range pingResult {
if result.Err != nil {
m.logger.Info("connecting error", zap.String("err", *result.Err))
continue // The results with error are ignored
}
availableMailservers = append(availableMailservers, result)
}
if len(availableMailservers) == 0 {
m.logger.Warn("No mailservers available") // Do nothing...
return nil
}
mailserversByAddress := make(map[string]mailservers.Mailserver)
for idx := range allMailservers {
mailserversByAddress[allMailservers[idx].Address] = allMailservers[idx]
}
var sortedMailservers []SortedMailserver
for _, ping := range availableMailservers {
address := ping.Address
ms := mailserversByAddress[address]
sortedMailserver := SortedMailserver{
Address: address,
RTTMs: *ping.RTTMs,
}
m.mailPeersMutex.Lock()
pInfo, ok := m.mailserverCycle.peers[ms.ID]
m.mailPeersMutex.Unlock()
if ok {
if time.Now().Before(pInfo.canConnectAfter) {
continue // We can't connect to this node yet
}
}
sortedMailservers = append(sortedMailservers, sortedMailserver)
}
sort.Sort(byRTTMsAndCanConnectBefore(sortedMailservers))
// Picks a random mailserver amongs the ones with the lowest latency
// The pool size is 1/4 of the mailservers were pinged successfully
pSize := poolSize(len(sortedMailservers) - 1)
if pSize <= 0 {
pSize = len(sortedMailservers)
if pSize <= 0 {
m.logger.Warn("No mailservers available") // Do nothing...
return nil
}
}
r, err := rand.Int(rand.Reader, big.NewInt(int64(pSize)))
if err != nil {
return err
}
msPing := sortedMailservers[r.Int64()]
ms := mailserversByAddress[msPing.Address]
m.logger.Info("connecting to mailserver", zap.String("address", ms.Address))
return m.connectToMailserver(ms)
}
mailserversByAddress := make(map[string]mailservers.Mailserver)
for idx := range allMailservers {
mailserversByAddress[allMailservers[idx].Address] = allMailservers[idx]
}
pSize := poolSize(len(allMailservers) - 1)
if pSize <= 0 {
pSize = len(allMailservers)
if pSize <= 0 {
m.logger.Warn("No mailservers available") // Do nothing...
return nil
}
}
r, err := rand.Int(rand.Reader, big.NewInt(int64(pSize)))
if err != nil {
return err
}
msPing := allMailservers[r.Int64()]
ms := mailserversByAddress[msPing.Address]
m.logger.Info("connecting to mailserver", zap.String("address", ms.Address))
return m.connectToMailserver(ms)
}
func (m *Messenger) mailserverStatus(mailserverID string) connStatus {
m.mailPeersMutex.RLock()
defer m.mailPeersMutex.RUnlock()
peer, ok := m.mailserverCycle.peers[mailserverID]
if !ok {
return disconnected
}
return peer.status
}
func (m *Messenger) connectToMailserver(ms mailservers.Mailserver) error {
m.logger.Info("connecting to mailserver", zap.Any("peer", ms.ID))
m.mailserverCycle.activeMailserver = &ms
signal.SendMailserverChanged(m.mailserverCycle.activeMailserver.Address, m.mailserverCycle.activeMailserver.ID)
// Adding a peer and marking it as connected can't be executed sync in WakuV1, because
// There's a delay between requesting a peer being added, and a signal being
// received after the peer was added. So we first set the peer status as
// Connecting and once a peerConnected signal is received, we mark it as
// Connected
activeMailserverStatus := m.mailserverStatus(ms.ID)
if ms.Version != m.transport.WakuVersion() {
return errors.New("mailserver waku version doesn't match")
}
if activeMailserverStatus != connected {
// WakuV2 does not require having the peer connected to query the peer
// Attempt to connect to mailserver by adding it as a peer
if ms.Version == 1 {
node, err := ms.Enode()
if err != nil {
return err
}
m.server.AddPeer(node)
if err := m.peerStore.Update([]*enode.Node{node}); err != nil {
return err
}
}
connectionStatus := connecting
if ms.Version == 2 {
connectionStatus = connected
}
m.mailPeersMutex.Lock()
m.mailserverCycle.peers[ms.ID] = peerStatus{
status: connectionStatus,
lastConnectionAttempt: time.Now(),
canConnectAfter: time.Now().Add(defaultBackoff),
mailserver: ms,
}
m.mailPeersMutex.Unlock()
if ms.Version == 2 {
m.mailserverCycle.activeMailserver.FailedRequests = 0
m.logger.Info("mailserver available", zap.String("address", m.mailserverCycle.activeMailserver.UniqueID()))
m.EmitMailserverAvailable()
signal.SendMailserverAvailable(m.mailserverCycle.activeMailserver.Address, m.mailserverCycle.activeMailserver.ID)
// Query mailserver
if m.config.codeControlFlags.AutoRequestHistoricMessages {
go func() {
_, err := m.performMailserverRequest(&ms, func(_ mailservers.Mailserver) (*MessengerResponse, error) {
return m.RequestAllHistoricMessages(false, false)
})
if err != nil {
m.logger.Error("could not perform mailserver request", zap.Error(err))
}
}()
}
}
}
return nil
}
// getActiveMailserver returns the active mailserver if a communityID is present then it'll return the mailserver
// for that community if it has a mailserver setup otherwise it'll return the global mailserver
func (m *Messenger) getActiveMailserver(communityID ...string) *mailservers.Mailserver {
if len(communityID) == 0 || communityID[0] == "" {
return m.mailserverCycle.activeMailserver
}
ms, err := m.communityStorenodes.GetStorenodeByCommunnityID(communityID[0])
if err != nil {
if !errors.Is(err, storenodes.ErrNotFound) {
m.logger.Error("getting storenode for community, using global", zap.String("communityID", communityID[0]), zap.Error(err))
}
// if we don't find a specific mailserver for the community, we just use the regular mailserverCycle's one
return m.mailserverCycle.activeMailserver
}
return &ms
}
func (m *Messenger) getActiveMailserverID(communityID ...string) string {
ms := m.getActiveMailserver(communityID...)
if ms == nil {
return ""
}
return ms.ID
}
func (m *Messenger) isMailserverAvailable(mailserverID string) bool {
return m.mailserverStatus(mailserverID) == connected
}
func mailserverAddressToID(uniqueID string, allMailservers []mailservers.Mailserver) (string, error) {
for _, ms := range allMailservers {
if uniqueID == ms.UniqueID() {
return ms.ID, nil
}
}
return "", nil
}
type ConnectedPeer struct {
UniqueID string
}
func (m *Messenger) mailserverPeersInfo() []ConnectedPeer {
var connectedPeers []ConnectedPeer
for _, connectedPeer := range m.server.PeersInfo() {
connectedPeers = append(connectedPeers, ConnectedPeer{
// This is a bit fragile, but should work
UniqueID: strings.TrimSuffix(connectedPeer.Enode, "?discport=0"),
})
}
return connectedPeers
}
func (m *Messenger) penalizeMailserver(id string) {
m.mailPeersMutex.Lock()
defer m.mailPeersMutex.Unlock()
pInfo, ok := m.mailserverCycle.peers[id]
if !ok {
pInfo.status = disconnected
}
pInfo.canConnectAfter = time.Now().Add(graylistBackoff)
m.mailserverCycle.peers[id] = pInfo
}
// handleMailserverCycleEvent runs every 1 second or when updating peers to keep the data of the active mailserver updated
func (m *Messenger) handleMailserverCycleEvent(connectedPeers []ConnectedPeer) error {
m.logger.Debug("mailserver cycle event",
zap.Any("connected", connectedPeers),
zap.Any("peer-info", m.mailserverCycle.peers))
m.mailPeersMutex.Lock()
for pID, pInfo := range m.mailserverCycle.peers {
if pInfo.status == disconnected {
continue
}
// Removing disconnected
found := false
for _, connectedPeer := range connectedPeers {
id, err := mailserverAddressToID(connectedPeer.UniqueID, m.mailserverCycle.allMailservers)
if err != nil {
m.logger.Error("failed to convert id to hex", zap.Error(err))
return err
}
if pID == id {
found = true
break
}
}
if !found && (pInfo.status == connected || (pInfo.status == connecting && pInfo.lastConnectionAttempt.Add(8*time.Second).Before(time.Now()))) {
m.logger.Info("peer disconnected", zap.String("peer", pID))
pInfo.status = disconnected
pInfo.canConnectAfter = time.Now().Add(defaultBackoff)
}
m.mailserverCycle.peers[pID] = pInfo
}
m.mailPeersMutex.Unlock()
// Only evaluate connected peers once a mailserver has been set
// otherwise, we would attempt to retrieve history and end up with a mailserver
// not available error
if m.mailserverCycle.activeMailserver != nil {
for _, connectedPeer := range connectedPeers {
id, err := mailserverAddressToID(connectedPeer.UniqueID, m.mailserverCycle.allMailservers)
if err != nil {
m.logger.Error("failed to convert id to hex", zap.Error(err))
return err
}
if id == "" {
continue
}
m.mailPeersMutex.Lock()
pInfo, ok := m.mailserverCycle.peers[id]
if !ok || pInfo.status != connected {
m.logger.Info("peer connected", zap.String("peer", connectedPeer.UniqueID))
pInfo.status = connected
if pInfo.canConnectAfter.Before(time.Now()) {
pInfo.canConnectAfter = time.Now().Add(defaultBackoff)
}
m.mailserverCycle.peers[id] = pInfo
m.mailPeersMutex.Unlock()
if id == m.mailserverCycle.activeMailserver.ID {
m.mailserverCycle.activeMailserver.FailedRequests = 0
m.logger.Info("mailserver available", zap.String("address", connectedPeer.UniqueID))
m.EmitMailserverAvailable()
signal.SendMailserverAvailable(m.mailserverCycle.activeMailserver.Address, m.mailserverCycle.activeMailserver.ID)
}
// Query mailserver
if m.config.codeControlFlags.AutoRequestHistoricMessages {
go func() {
_, err := m.RequestAllHistoricMessages(false, true)
if err != nil {
m.logger.Error("failed to request historic messages", zap.Error(err))
}
}()
}
} else {
m.mailPeersMutex.Unlock()
}
}
}
// Check whether we want to disconnect the mailserver
if m.mailserverCycle.activeMailserver != nil {
if m.mailserverCycle.activeMailserver.FailedRequests >= mailserverMaxFailedRequests {
m.penalizeMailserver(m.mailserverCycle.activeMailserver.ID)
signal.SendMailserverNotWorking()
m.logger.Info("connecting too many failed requests")
m.mailserverCycle.activeMailserver.FailedRequests = 0
return m.connectToNewMailserverAndWait()
}
m.mailPeersMutex.Lock()
pInfo, ok := m.mailserverCycle.peers[m.mailserverCycle.activeMailserver.ID]
m.mailPeersMutex.Unlock()
if ok {
if pInfo.status != connected && pInfo.lastConnectionAttempt.Add(20*time.Second).Before(time.Now()) {
m.logger.Info("penalizing mailserver & disconnecting connecting", zap.String("id", m.mailserverCycle.activeMailserver.ID))
signal.SendMailserverNotWorking()
m.penalizeMailserver(m.mailserverCycle.activeMailserver.ID)
m.disconnectActiveMailserver()
}
}
} else {
m.cycleMailservers()
}
m.logger.Debug("updated-peers", zap.Any("peers", m.mailserverCycle.peers))
return nil
}
func (m *Messenger) asyncRequestAllHistoricMessages() {
m.logger.Debug("asyncRequestAllHistoricMessages")
go func() {
_, err := m.RequestAllHistoricMessages(false, true)
if err != nil {
m.logger.Error("failed to request historic messages", zap.Error(err))
}
}()
}
func (m *Messenger) updateWakuV1PeerStatus() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
err := m.handleMailserverCycleEvent(m.mailserverPeersInfo())
if err != nil {
m.logger.Error("failed to handle mailserver cycle event", zap.Error(err))
continue
}
ms := m.getActiveMailserver()
if ms != nil {
node, err := ms.Enode()
if err != nil {
m.logger.Error("failed to parse enode", zap.Error(err))
continue
}
m.server.AddPeer(node)
if err := m.peerStore.Update([]*enode.Node{node}); err != nil {
m.logger.Error("failed to update peers", zap.Error(err))
continue
}
}
case <-m.mailserverCycle.events:
err := m.handleMailserverCycleEvent(m.mailserverPeersInfo())
if err != nil {
m.logger.Error("failed to handle mailserver cycle event", zap.Error(err))
return
}
case <-m.quit:
close(m.mailserverCycle.events)
m.mailserverCycle.subscription.Unsubscribe()
return
}
}
}
func (m *Messenger) verifyStorenodeStatus() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
err := m.disconnectStorenodeIfRequired()
if err != nil {
m.logger.Error("failed to handle mailserver cycle event", zap.Error(err))
continue
}
case <-m.quit:
return
}
}
}
func (m *Messenger) getPinnedMailserver() (*mailservers.Mailserver, error) {
fleet, err := m.getFleet()
if err != nil {
return nil, err
}
pinnedMailservers, err := m.settings.GetPinnedMailservers()
if err != nil {
return nil, err
}
pinnedMailserver, ok := pinnedMailservers[fleet]
if !ok {
return nil, nil
}
fleetMailservers := mailservers.DefaultMailservers()
for _, c := range fleetMailservers {
if c.Fleet == fleet && c.ID == pinnedMailserver {
return &c, nil
}
}
if m.mailserversDatabase != nil {
customMailservers, err := m.mailserversDatabase.Mailservers()
if err != nil {
return nil, err
}
for _, c := range customMailservers {
if c.Fleet == fleet && c.ID == pinnedMailserver {
c.Version = m.transport.WakuVersion()
return &c, nil
}
}
}
return nil, nil
}
func (m *Messenger) EmitMailserverAvailable() {
for _, s := range m.mailserverCycle.availabilitySubscriptions {
s <- struct{}{}
close(s)
l := len(m.mailserverCycle.availabilitySubscriptions)
m.mailserverCycle.availabilitySubscriptions = m.mailserverCycle.availabilitySubscriptions[:l-1]
}
}
func (m *Messenger) SubscribeMailserverAvailable() chan struct{} {
c := make(chan struct{})
m.mailserverCycle.availabilitySubscriptions = append(m.mailserverCycle.availabilitySubscriptions, c)
return c
}
func (m *Messenger) disconnectStorenodeIfRequired() error {
m.logger.Debug("wakuV2 storenode status verification")
if m.mailserverCycle.activeMailserver == nil {
// No active storenode, find a new one
m.cycleMailservers()
return nil
}
// Check whether we want to disconnect the active storenode
if m.mailserverCycle.activeMailserver.FailedRequests >= mailserverMaxFailedRequests {
m.penalizeMailserver(m.mailserverCycle.activeMailserver.ID)
signal.SendMailserverNotWorking()
m.logger.Info("too many failed requests", zap.String("storenode", m.mailserverCycle.activeMailserver.UniqueID()))
m.mailserverCycle.activeMailserver.FailedRequests = 0
return m.connectToNewMailserverAndWait()
}
return nil
}
func (m *Messenger) waitForAvailableStoreNode(timeout time.Duration) bool {
// Add 1 second to timeout, because the mailserver cycle has 1 second ticker, which doesn't tick on start.
// This can be improved after merging https://github.com/status-im/status-go/pull/4380.
// NOTE: https://stackoverflow.com/questions/32705582/how-to-get-time-tick-to-tick-immediately
timeout += time.Second
finish := make(chan struct{})
cancel := make(chan struct{})
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer func() {
wg.Done()
}()
for !m.isMailserverAvailable(m.getActiveMailserverID()) {
select {
case <-m.SubscribeMailserverAvailable():
case <-cancel:
return
}
}
}()
go func() {
defer func() {
close(finish)
}()
wg.Wait()
}()
select {
case <-finish:
case <-time.After(timeout):
close(cancel)
case <-m.ctx.Done():
close(cancel)
}
return m.isMailserverAvailable(m.getActiveMailserverID())
}