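// Package history implements the storenode cycle used for history queries:
// it pings candidate store nodes, keeps track of the one currently in use,
// and switches to another (applying backoffs) when requests keep failing.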
package history

import (
	"context"
	"crypto/rand"
	"errors"
	"fmt"
	"math"
	"math/big"
	"net"
	"net/http"
	"runtime"
	"sort"
	"sync"
	"time"

	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/waku-org/go-waku/waku/v2/api/common"
	"github.com/waku-org/go-waku/waku/v2/protocol/store"
	"go.uber.org/zap"
)

const defaultBackoff = 10 * time.Second
const graylistBackoff = 3 * time.Minute
const storenodeVerificationInterval = time.Second
const storenodeMaxFailedRequests uint = 2
const minStorenodesToChooseFrom = 3
const isAndroidEmulator = runtime.GOOS == "android" && runtime.GOARCH == "amd64"
const findNearestMailServer = !isAndroidEmulator
const overrideDNS = runtime.GOOS == "android" || runtime.GOOS == "ios"
const bootstrapDNS = "8.8.8.8:53"

type connStatus int

const (
	disconnected connStatus = iota + 1
	connected
)
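
// peerStatus tracks the last known connection state of a storenode and the
// earliest time at which another connection attempt may be made.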
type peerStatus struct {
	status                connStatus
	canConnectAfter       time.Time
	lastConnectionAttempt time.Time
}
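
// StorenodeConfigProvider supplies the storenode configuration used by the
// cycle: whether storenodes should be used at all, an optional pinned
// storenode, and the full list of candidate storenodes.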
type StorenodeConfigProvider interface {
	UseStorenodes() (bool, error)
	GetPinnedStorenode() (peer.AddrInfo, error)
	Storenodes() ([]peer.AddrInfo, error)
}
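
// StorenodeCycle keeps track of the currently active storenode, monitors its
// health, and switches to a different storenode when too many requests fail.
// Consumers can subscribe to the exported emitters to be notified of changes.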
type StorenodeCycle struct {
	sync.RWMutex

	logger *zap.Logger

	storenodeConfigProvider StorenodeConfigProvider
	pinger                  common.Pinger

	StorenodeAvailableOneshotEmitter *OneShotEmitter[struct{}]
	StorenodeChangedEmitter          *Emitter[peer.ID]
	StorenodeNotWorkingEmitter       *Emitter[struct{}]
	StorenodeAvailableEmitter        *Emitter[peer.ID]

	failedRequests map[peer.ID]uint

	peersMutex      sync.RWMutex
	activeStorenode peer.ID
	peers           map[peer.ID]peerStatus
}
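
// NewStorenodeCycle creates a StorenodeCycle that uses the given pinger to
// measure round-trip times to candidate storenodes. Call Start before use.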
func NewStorenodeCycle(logger *zap.Logger, pinger common.Pinger) *StorenodeCycle {
	return &StorenodeCycle{
		StorenodeAvailableOneshotEmitter: NewOneshotEmitter[struct{}](),
		StorenodeChangedEmitter:          NewEmitter[peer.ID](),
		StorenodeNotWorkingEmitter:       NewEmitter[struct{}](),
		StorenodeAvailableEmitter:        NewEmitter[peer.ID](),
		logger:                           logger.Named("storenode-cycle"),
		pinger:                           pinger, // store the pinger; it is needed later for RTT-based storenode selection
	}
}
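
// Start initializes the cycle's internal state and launches the background
// goroutine that periodically verifies the status of the active storenode.
// It returns immediately; the goroutine stops when ctx is cancelled.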
func (m *StorenodeCycle) Start(ctx context.Context) {
	m.logger.Debug("starting storenode cycle")
	m.failedRequests = make(map[peer.ID]uint)
	m.peers = make(map[peer.ID]peerStatus)

	go m.verifyStorenodeStatus(ctx)
}
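
// DisconnectActiveStorenode disconnects the currently active storenode (if
// any) and prevents reconnection to it until the given backoff has elapsed.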
func (m *StorenodeCycle) DisconnectActiveStorenode(backoff time.Duration) {
	m.Lock()
	defer m.Unlock()

	m.disconnectActiveStorenode(backoff)
}

func (m *StorenodeCycle) connectToNewStorenodeAndWait(ctx context.Context) error {
	// Handle pinned storenodes
	m.logger.Info("disconnecting storenode")
	pinnedStorenode, err := m.storenodeConfigProvider.GetPinnedStorenode()
	if err != nil {
		m.logger.Error("could not obtain the pinned storenode", zap.Error(err))
		return err
	}

	// Only disconnect and apply the graylist backoff when no storenode is pinned;
	// a pinned storenode will be selected directly by findNewStorenode
	if pinnedStorenode.ID == "" {
		m.disconnectActiveStorenode(graylistBackoff)
	}

	return m.findNewStorenode(ctx)
}

func (m *StorenodeCycle) disconnectStorenode(backoffDuration time.Duration) error {
	if m.activeStorenode == "" {
		m.logger.Info("no active storenode")
		return nil
	}

	m.logger.Info("disconnecting active storenode", zap.Stringer("peerID", m.activeStorenode))

	m.peersMutex.Lock()
	pInfo, ok := m.peers[m.activeStorenode]
	if ok {
		pInfo.status = disconnected
		pInfo.canConnectAfter = time.Now().Add(backoffDuration)
		m.peers[m.activeStorenode] = pInfo
	} else {
		m.peers[m.activeStorenode] = peerStatus{
			status:          disconnected,
			canConnectAfter: time.Now().Add(backoffDuration),
		}
	}
	m.peersMutex.Unlock()

	m.activeStorenode = ""

	return nil
}

func (m *StorenodeCycle) disconnectActiveStorenode(backoffDuration time.Duration) {
	err := m.disconnectStorenode(backoffDuration)
	if err != nil {
		m.logger.Error("failed to disconnect storenode", zap.Error(err))
	}

	m.StorenodeChangedEmitter.Emit("")
}
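
// Cycle disconnects the current storenode (graylisting it) and switches to a
// new one, honoring the UseStorenodes and pinned-storenode configuration.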
func (m *StorenodeCycle) Cycle(ctx context.Context) {
	if m.storenodeConfigProvider == nil {
		m.logger.Debug("storenodeConfigProvider not yet setup")
		return
	}

	m.logger.Info("Automatically switching storenode")

	if m.activeStorenode != "" {
		m.disconnectActiveStorenode(graylistBackoff)
	}

	useStorenode, err := m.storenodeConfigProvider.UseStorenodes()
	if err != nil {
		m.logger.Error("failed to get use storenodes", zap.Error(err))
		return
	}

	if !useStorenode {
		m.logger.Info("Skipping storenode search due to useStorenode being false")
		return
	}

	err = m.findNewStorenode(ctx)
	if err != nil {
		m.logger.Error("Error getting new storenode", zap.Error(err))
	}
}
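
// poolSize returns how many of the lowest-latency storenodes to draw from
// when picking one at random: a quarter of the fleet, rounded up.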
func poolSize(fleetSize int) int {
	return int(math.Ceil(float64(fleetSize) / 4))
}
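
// getAvailableStorenodesSortedByRTT pings all candidate storenodes in
// parallel (with a short timeout), discards those that do not reply or are
// still within their backoff window, and returns the rest sorted by round-trip time.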
func (m *StorenodeCycle) getAvailableStorenodesSortedByRTT(ctx context.Context, allStorenodes []peer.AddrInfo) []peer.AddrInfo {
	peerIDToInfo := make(map[peer.ID]peer.AddrInfo)
	for _, p := range allStorenodes {
		peerIDToInfo[p.ID] = p
	}

	availableStorenodes := make(map[peer.ID]time.Duration)
	availableStorenodesMutex := sync.Mutex{}
	availableStorenodesWg := sync.WaitGroup{}
	for _, storenode := range allStorenodes {
		availableStorenodesWg.Add(1)
		go func(peerInfo peer.AddrInfo) {
			defer availableStorenodesWg.Done()
			ctx, cancel := context.WithTimeout(ctx, 4*time.Second)
			defer cancel()

			rtt, err := m.pinger.PingPeer(ctx, peerInfo)
			if err == nil { // pinging storenodes might fail, but we don't care
				availableStorenodesMutex.Lock()
				availableStorenodes[peerInfo.ID] = rtt
				availableStorenodesMutex.Unlock()
			}
		}(storenode)
	}
	availableStorenodesWg.Wait()

	if len(availableStorenodes) == 0 {
		m.logger.Warn("No storenodes available") // Do nothing..
		return nil
	}

	var sortedStorenodes []SortedStorenode
	for storenodeID, rtt := range availableStorenodes {
		sortedStorenode := SortedStorenode{
			Storenode: peerIDToInfo[storenodeID],
			RTT:       rtt,
		}
		m.peersMutex.Lock()
		pInfo, ok := m.peers[storenodeID]
		m.peersMutex.Unlock()
		if ok && time.Now().Before(pInfo.canConnectAfter) {
			continue // We can't connect to this node yet
		}
		sortedStorenodes = append(sortedStorenodes, sortedStorenode)
	}
	sort.Sort(byRTTMsAndCanConnectBefore(sortedStorenodes))

	result := make([]peer.AddrInfo, len(sortedStorenodes))
	for i, s := range sortedStorenodes {
		result[i] = s.Storenode
	}

	return result
}
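
// findNewStorenode selects and activates a storenode: the pinned one if
// configured, otherwise a random pick from the lowest-latency candidates.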
func (m *StorenodeCycle) findNewStorenode(ctx context.Context) error {
	// we have to override DNS manually because of https://github.com/status-im/status-mobile/issues/19581
	if overrideDNS {
		var dialer net.Dialer
		net.DefaultResolver = &net.Resolver{
			PreferGo: false,
			Dial: func(context context.Context, _, _ string) (net.Conn, error) {
				conn, err := dialer.DialContext(context, "udp", bootstrapDNS)
				if err != nil {
					return nil, err
				}
				return conn, nil
			},
		}
	}

	pinnedStorenode, err := m.storenodeConfigProvider.GetPinnedStorenode()
	if err != nil {
		m.logger.Error("Could not obtain the pinned storenode", zap.Error(err))
		return err
	}

	if pinnedStorenode.ID != "" {
		return m.setActiveStorenode(pinnedStorenode.ID)
	}

	m.logger.Info("Finding a new storenode..")

	allStorenodes, err := m.storenodeConfigProvider.Storenodes()
	if err != nil {
		return err
	}

	// TODO: remove this check once sockets are stable on x86_64 emulators
	if findNearestMailServer {
		allStorenodes = m.getAvailableStorenodesSortedByRTT(ctx, allStorenodes)
	}

	// Pick a random storenode among the ones with the lowest latency.
	// The pool size is 1/4 of the storenodes that were pinged successfully.
	// If the pool size is less than `minStorenodesToChooseFrom`, it will
	// pick a storenode from all the available storenodes.
	pSize := poolSize(len(allStorenodes) - 1)
	if pSize <= minStorenodesToChooseFrom {
		pSize = len(allStorenodes)
		if pSize <= 0 {
			m.logger.Warn("No storenodes available") // Do nothing..
			return nil
		}
	}

	r, err := rand.Int(rand.Reader, big.NewInt(int64(pSize)))
	if err != nil {
		return err
	}

	ms := allStorenodes[r.Int64()]
	return m.setActiveStorenode(ms.ID)
}

func (m *StorenodeCycle) storenodeStatus(peerID peer.ID) connStatus {
	m.peersMutex.RLock()
	defer m.peersMutex.RUnlock()

	peer, ok := m.peers[peerID]
	if !ok {
		return disconnected
	}
	return peer.status
}

func (m *StorenodeCycle) setActiveStorenode(peerID peer.ID) error {
	m.activeStorenode = peerID

	m.StorenodeChangedEmitter.Emit(m.activeStorenode)

	storenodeStatus := m.storenodeStatus(peerID)
	if storenodeStatus != connected {
		m.peersMutex.Lock()
		m.peers[peerID] = peerStatus{
			status:                connected,
			lastConnectionAttempt: time.Now(),
			canConnectAfter:       time.Now().Add(defaultBackoff),
		}
		m.peersMutex.Unlock()

		m.failedRequests[peerID] = 0
		m.logger.Info("storenode available", zap.Stringer("peerID", m.activeStorenode))

		m.StorenodeAvailableOneshotEmitter.Emit(struct{}{}) // Maybe can be refactored away?
		m.StorenodeAvailableEmitter.Emit(m.activeStorenode)
	}
	return nil
}
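
// GetActiveStorenode returns the peer ID of the storenode currently in use,
// or an empty peer.ID if none is active.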
func (m *StorenodeCycle) GetActiveStorenode() peer.ID {
	m.RLock()
	defer m.RUnlock()

	return m.activeStorenode
}
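
// IsStorenodeAvailable reports whether the given storenode is currently
// marked as connected.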
func (m *StorenodeCycle) IsStorenodeAvailable(peerID peer.ID) bool {
	return m.storenodeStatus(peerID) == connected
}

func (m *StorenodeCycle) penalizeStorenode(id peer.ID) {
	m.peersMutex.Lock()
	defer m.peersMutex.Unlock()
	pInfo, ok := m.peers[id]
	if !ok {
		pInfo.status = disconnected
	}

	pInfo.canConnectAfter = time.Now().Add(graylistBackoff)
	m.peers[id] = pInfo
}

func (m *StorenodeCycle) verifyStorenodeStatus(ctx context.Context) {
	ticker := time.NewTicker(storenodeVerificationInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			err := m.disconnectStorenodeIfRequired(ctx)
			if err != nil {
				m.logger.Error("failed to handle storenode cycle event", zap.Error(err))
				continue
			}

		case <-ctx.Done():
			return
		}
	}
}

func (m *StorenodeCycle) disconnectStorenodeIfRequired(ctx context.Context) error {
	m.logger.Debug("wakuV2 storenode status verification")

	if m.activeStorenode == "" {
		// No active storenode, find a new one
		m.Cycle(ctx)
		return nil
	}

	// Check whether we want to disconnect the active storenode
	if m.failedRequests[m.activeStorenode] >= storenodeMaxFailedRequests {
		m.penalizeStorenode(m.activeStorenode)
		m.StorenodeNotWorkingEmitter.Emit(struct{}{})

		m.logger.Info("too many failed requests", zap.Stringer("storenode", m.activeStorenode))
		m.failedRequests[m.activeStorenode] = 0
		return m.connectToNewStorenodeAndWait(ctx)
	}

	return nil
}
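
// SetStorenodeConfigProvider sets the configuration source consulted by the
// cycle. Storenode selection is skipped until a provider has been set.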
func (m *StorenodeCycle) SetStorenodeConfigProvider(provider StorenodeConfigProvider) {
	m.storenodeConfigProvider = provider
}
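
// WaitForAvailableStoreNode blocks until the active storenode becomes
// available or ctx is done, and reports whether a storenode is available
// when it returns.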
func (m *StorenodeCycle) WaitForAvailableStoreNode(ctx context.Context) bool {
	// Note: Add 1 second to timeout, because the storenode cycle has a 1 second ticker, which doesn't tick on start.
	// This can be improved after merging https://github.com/status-im/status-go/pull/4380.
	// NOTE: https://stackoverflow.com/questions/32705582/how-to-get-time-tick-to-tick-immediately

	wg := sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		for !m.IsStorenodeAvailable(m.activeStorenode) {
			select {
			case <-m.StorenodeAvailableOneshotEmitter.Subscribe():
			case <-ctx.Done():
				if errors.Is(ctx.Err(), context.Canceled) {
					return
				}

				// Wait for an additional second, but handle cancellation
				select {
				case <-time.After(1 * time.Second):
				case <-ctx.Done(): // context was cancelled
				}

				return
			}
		}
	}()

	select {
	case <-waitForWaitGroup(&wg):
	case <-ctx.Done():
		// Wait for an additional second, but handle cancellation
		select {
		case <-time.After(1 * time.Second):
		case <-ctx.Done(): // context was cancelled
		}
	}

	return m.IsStorenodeAvailable(m.activeStorenode)
}

func waitForWaitGroup(wg *sync.WaitGroup) <-chan struct{} {
	ch := make(chan struct{})
	go func() {
		wg.Wait()
		close(ch)
	}()
	return ch
}

type storenodeTaskParameters struct {
	customPeerID peer.ID
}
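
// StorenodeTaskOption customizes PerformStorenodeTask, e.g. by targeting a
// specific storenode instead of the active one.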
type StorenodeTaskOption func(*storenodeTaskParameters)
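
// WithPeerID makes PerformStorenodeTask use the given storenode instead of
// the currently active one.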
func WithPeerID(peerID peer.ID) StorenodeTaskOption {
	return func(stp *storenodeTaskParameters) {
		stp.customPeerID = peerID
	}
}
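
// PerformStorenodeTask runs fn against the selected storenode, retrying up to
// storenodeMaxFailedRequests times. Failures are counted against the
// storenode; a rate-limit (HTTP 429) response disconnects it immediately.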
func (m *StorenodeCycle) PerformStorenodeTask(fn func() error, options ...StorenodeTaskOption) error {
	params := storenodeTaskParameters{}
	for _, opt := range options {
		opt(&params)
	}

	peerID := params.customPeerID
	if peerID == "" {
		peerID = m.GetActiveStorenode()
	}

	if peerID == "" {
		return errors.New("storenode not available")
	}

	m.RLock()
	defer m.RUnlock()

	var tries uint = 0
	for tries < storenodeMaxFailedRequests {
		if params.customPeerID == "" && m.storenodeStatus(peerID) != connected {
			return errors.New("storenode not available")
		}
		m.logger.Info("trying to perform history requests", zap.Uint("try", tries), zap.Stringer("peerID", peerID))

		// Perform the request
		err := fn()
		if err == nil {
			// Reset failed requests
			m.logger.Debug("history request performed successfully", zap.Stringer("peerID", peerID))
			m.failedRequests[peerID] = 0
			return nil
		}

		m.logger.Error("failed to perform history request",
			zap.Stringer("peerID", peerID),
			zap.Uint("tries", tries),
			zap.Error(err),
		)

		tries++

		if storeErr, ok := err.(*store.StoreError); ok {
			if storeErr.Code == http.StatusTooManyRequests {
				m.disconnectActiveStorenode(defaultBackoff)
				return fmt.Errorf("ratelimited at storenode %s: %w", peerID, err)
			}
		}

		// Increment failed requests
		m.failedRequests[peerID]++

		// Change storenode
		if m.failedRequests[peerID] >= storenodeMaxFailedRequests {
			return errors.New("too many failed requests")
		}
		// Wait a couple of seconds to avoid spamming the storenode
		time.Sleep(2 * time.Second)
	}
	return errors.New("failed to perform history request")
}