mirror of
https://github.com/status-im/go-waku.git
synced 2025-01-12 06:44:25 +00:00
225 lines
6.3 KiB
Go
225 lines
6.3 KiB
Go
package node
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"math/rand"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/libp2p/go-libp2p/core/host"
|
|
"github.com/libp2p/go-libp2p/core/network"
|
|
"github.com/libp2p/go-libp2p/core/peer"
|
|
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
|
|
"github.com/waku-org/go-waku/logging"
|
|
"github.com/waku-org/go-waku/waku/v2/utils"
|
|
"go.uber.org/zap"
|
|
"golang.org/x/exp/maps"
|
|
)
|
|
|
|
// Tunables of the keep-alive loop (see startKeepAlive and pingPeer).
const (
	// maxAllowedPingFailures is the number of ping attempts pingPeer makes
	// against a single peer before treating that peer as dead.
	maxAllowedPingFailures = 2

	// sleepDetectionIntervalFactor scales randomPeersPingDuration into the sleep
	// detection interval. If the time elapsed since the keep-alive loop last
	// completed an iteration is greater than
	// sleepDetectionIntervalFactor * randomPeersPingDuration, the loop assumes it
	// has not been running (e.g. the machine was asleep) and disconnects all peers.
	sleepDetectionIntervalFactor = 3

	// maxPeersToPingPerProtocol is the target number of relay peers for a random
	// ping round: mesh peers are always included, and other relay peers are added
	// at random only while fewer than this many peers have been selected.
	maxPeersToPingPerProtocol = 10

	// maxAllowedSubsequentPingFailures is the number of consecutive ping rounds in
	// which every pinged peer failed after which the node is considered
	// disconnected and all connections are closed.
	maxAllowedSubsequentPingFailures = 2
)
|
|
|
|
func disconnectAllPeers(host host.Host, logger *zap.Logger) {
|
|
for _, p := range host.Network().Peers() {
|
|
err := host.Network().ClosePeer(p)
|
|
if err != nil {
|
|
logger.Debug("closing conn to peer", zap.Error(err))
|
|
}
|
|
}
|
|
}
|
|
|
|
// startKeepAlive creates a go routine that periodically pings connected peers.
|
|
// This is necessary because TCP connections are automatically closed due to inactivity,
|
|
// and doing a ping will avoid this (with a small bandwidth cost)
|
|
func (w *WakuNode) startKeepAlive(ctx context.Context, randomPeersPingDuration time.Duration, allPeersPingDuration time.Duration) {
|
|
defer utils.LogOnPanic()
|
|
defer w.wg.Done()
|
|
|
|
if !w.opts.enableRelay {
|
|
return
|
|
}
|
|
|
|
w.log.Info("setting up ping protocol", zap.Duration("randomPeersPingDuration", randomPeersPingDuration), zap.Duration("allPeersPingDuration", allPeersPingDuration))
|
|
|
|
randomPeersTickerC := make(<-chan time.Time)
|
|
if randomPeersPingDuration != 0 {
|
|
randomPeersTicker := time.NewTicker(randomPeersPingDuration)
|
|
defer randomPeersTicker.Stop()
|
|
randomPeersTickerC = randomPeersTicker.C
|
|
}
|
|
|
|
allPeersTickerC := make(<-chan time.Time)
|
|
if allPeersPingDuration != 0 {
|
|
allPeersTicker := time.NewTicker(allPeersPingDuration)
|
|
defer allPeersTicker.Stop()
|
|
allPeersTickerC = allPeersTicker.C
|
|
}
|
|
|
|
lastTimeExecuted := w.timesource.Now()
|
|
|
|
sleepDetectionInterval := int64(randomPeersPingDuration) * sleepDetectionIntervalFactor
|
|
|
|
var iterationFailure int
|
|
for {
|
|
peersToPing := []peer.ID{}
|
|
|
|
select {
|
|
case <-allPeersTickerC:
|
|
if w.opts.enableRelay {
|
|
relayPeersSet := make(map[peer.ID]struct{})
|
|
for _, t := range w.Relay().Topics() {
|
|
for _, p := range w.Relay().PubSub().ListPeers(t) {
|
|
relayPeersSet[p] = struct{}{}
|
|
}
|
|
}
|
|
peersToPing = append(peersToPing, maps.Keys(relayPeersSet)...)
|
|
}
|
|
|
|
case <-randomPeersTickerC:
|
|
difference := w.timesource.Now().UnixNano() - lastTimeExecuted.UnixNano()
|
|
if difference > sleepDetectionInterval {
|
|
lastTimeExecuted = w.timesource.Now()
|
|
w.log.Warn("keep alive hasnt been executed recently. Killing all connections")
|
|
disconnectAllPeers(w.host, w.log)
|
|
continue
|
|
} else if iterationFailure >= maxAllowedSubsequentPingFailures {
|
|
iterationFailure = 0
|
|
w.log.Warn("Pinging random peers failed, node is likely disconnected. Killing all connections")
|
|
disconnectAllPeers(w.host, w.log)
|
|
continue
|
|
}
|
|
|
|
if w.opts.enableRelay {
|
|
// Priorize mesh peers
|
|
meshPeersSet := make(map[peer.ID]struct{})
|
|
for _, t := range w.Relay().Topics() {
|
|
for _, p := range w.Relay().PubSub().MeshPeers(t) {
|
|
meshPeersSet[p] = struct{}{}
|
|
}
|
|
}
|
|
peersToPing = append(peersToPing, maps.Keys(meshPeersSet)...)
|
|
|
|
// Ping also some random relay peers
|
|
if maxPeersToPingPerProtocol-len(peersToPing) > 0 {
|
|
relayPeersSet := make(map[peer.ID]struct{})
|
|
for _, t := range w.Relay().Topics() {
|
|
for _, p := range w.Relay().PubSub().ListPeers(t) {
|
|
if _, ok := meshPeersSet[p]; !ok {
|
|
relayPeersSet[p] = struct{}{}
|
|
}
|
|
}
|
|
}
|
|
|
|
relayPeers := maps.Keys(relayPeersSet)
|
|
rand.Shuffle(len(relayPeers), func(i, j int) { relayPeers[i], relayPeers[j] = relayPeers[j], relayPeers[i] })
|
|
|
|
peerLen := maxPeersToPingPerProtocol - len(peersToPing)
|
|
if peerLen > len(relayPeers) {
|
|
peerLen = len(relayPeers)
|
|
}
|
|
peersToPing = append(peersToPing, relayPeers[0:peerLen]...)
|
|
}
|
|
}
|
|
|
|
if w.opts.enableFilterLightNode {
|
|
// We also ping all filter nodes
|
|
filterPeersSet := make(map[peer.ID]struct{})
|
|
for _, s := range w.FilterLightnode().Subscriptions() {
|
|
filterPeersSet[s.PeerID] = struct{}{}
|
|
}
|
|
peersToPing = append(peersToPing, maps.Keys(filterPeersSet)...)
|
|
}
|
|
case <-ctx.Done():
|
|
w.log.Info("stopping ping protocol")
|
|
return
|
|
}
|
|
|
|
pingWg := sync.WaitGroup{}
|
|
pingWg.Add(len(peersToPing))
|
|
pingResultChan := make(chan bool, len(peersToPing))
|
|
for _, p := range peersToPing {
|
|
go w.pingPeer(ctx, &pingWg, p, pingResultChan)
|
|
}
|
|
pingWg.Wait()
|
|
close(pingResultChan)
|
|
|
|
failureCounter := 0
|
|
for couldPing := range pingResultChan {
|
|
if !couldPing {
|
|
failureCounter++
|
|
}
|
|
}
|
|
|
|
if len(peersToPing) > 0 && failureCounter == len(peersToPing) {
|
|
iterationFailure++
|
|
} else {
|
|
iterationFailure = 0
|
|
}
|
|
|
|
lastTimeExecuted = w.timesource.Now()
|
|
}
|
|
}
|
|
|
|
func (w *WakuNode) pingPeer(ctx context.Context, wg *sync.WaitGroup, peerID peer.ID, resultChan chan bool) {
|
|
defer utils.LogOnPanic()
|
|
defer wg.Done()
|
|
|
|
logger := w.log.With(logging.HostID("peer", peerID))
|
|
|
|
for i := 0; i < maxAllowedPingFailures; i++ {
|
|
if w.host.Network().Connectedness(peerID) != network.Connected {
|
|
// Peer is no longer connected. No need to ping
|
|
resultChan <- false
|
|
return
|
|
}
|
|
|
|
logger.Debug("pinging")
|
|
|
|
if w.tryPing(ctx, peerID, logger) {
|
|
resultChan <- true
|
|
return
|
|
}
|
|
}
|
|
|
|
if w.host.Network().Connectedness(peerID) != network.Connected {
|
|
resultChan <- false
|
|
return
|
|
}
|
|
|
|
logger.Info("disconnecting dead peer")
|
|
if err := w.host.Network().ClosePeer(peerID); err != nil {
|
|
logger.Debug("closing conn to peer", zap.Error(err))
|
|
}
|
|
|
|
resultChan <- false
|
|
}
|
|
|
|
func (w *WakuNode) tryPing(ctx context.Context, peerID peer.ID, logger *zap.Logger) bool {
|
|
ctx, cancel := context.WithTimeout(ctx, 7*time.Second)
|
|
defer cancel()
|
|
|
|
pr := ping.Ping(ctx, w.host, peerID)
|
|
select {
|
|
case res := <-pr:
|
|
if res.Error != nil {
|
|
logger.Debug("could not ping", zap.Error(res.Error))
|
|
return false
|
|
}
|
|
case <-ctx.Done():
|
|
if !errors.Is(ctx.Err(), context.Canceled) {
|
|
logger.Debug("could not ping (context)", zap.Error(ctx.Err()))
|
|
}
|
|
return false
|
|
}
|
|
return true
|
|
}
|