refactor: ping a subset of connected peers (#1148)

richΛrd 2024-07-11 12:02:52 -04:00 committed by GitHub
parent 3b0c8e9207
commit 9412af28dd
7 changed files with 135 additions and 65 deletions

View File

@@ -134,7 +134,7 @@ func Execute(options NodeOptions) error {
         node.WithLogLevel(lvl),
         node.WithPrivateKey(prvKey),
         node.WithHostAddress(hostAddr),
-        node.WithKeepAlive(options.KeepAlive),
+        node.WithKeepAlive(10*time.Second, options.KeepAlive),
         node.WithMaxPeerConnections(options.MaxPeerConnections),
         node.WithPrometheusRegisterer(prometheus.DefaultRegisterer),
         node.WithPeerStoreCapacity(options.PeerStoreCapacity),
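
For context on the call-site change above, here is a minimal sketch of building a node with the reworked two-interval keep-alive option. It is illustrative only: the import path, the host-address setup, and the chosen durations are assumptions, not taken from this diff; only New, WithHostAddress, WithWakuRelay, WithKeepAlive and Start are calls that appear in the commit.

package main

import (
	"context"
	"log"
	"net"
	"time"

	// Assumed go-waku node package path.
	"github.com/waku-org/go-waku/waku/v2/node"
)

func main() {
	hostAddr, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
	if err != nil {
		log.Fatal(err)
	}

	wakuNode, err := node.New(
		node.WithHostAddress(hostAddr),
		node.WithWakuRelay(),
		// First interval: ping mesh peers plus a few random relay peers.
		// Second interval: ping every connected peer.
		node.WithKeepAlive(10*time.Second, time.Hour),
	)
	if err != nil {
		log.Fatal(err)
	}

	if err := wakuNode.Start(context.Background()); err != nil {
		log.Fatal(err)
	}
}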

View File

@@ -163,7 +163,7 @@ func NewNode(instance *WakuInstance, configJSON string) error {
     opts := []node.WakuNodeOption{
         node.WithPrivateKey(prvKey),
         node.WithHostAddress(hostAddr),
-        node.WithKeepAlive(time.Duration(*config.KeepAliveInterval) * time.Second),
+        node.WithKeepAlive(10*time.Second, time.Duration(*config.KeepAliveInterval)*time.Second),
     }
     if *config.EnableRelay {

View File

@@ -2,6 +2,8 @@ package node
 import (
     "context"
+    "errors"
+    "math/rand"
     "sync"
     "time"
@@ -10,6 +12,7 @@ import (
     "github.com/libp2p/go-libp2p/p2p/protocol/ping"
     "github.com/waku-org/go-waku/logging"
     "go.uber.org/zap"
+    "golang.org/x/exp/maps"
 )

 const maxAllowedPingFailures = 2
@@ -19,86 +22,155 @@ const maxAllowedPingFailures = 2
 // the peers if they don't reply back
 const sleepDetectionIntervalFactor = 3

+const maxPeersToPing = 10
+
 // startKeepAlive creates a go routine that periodically pings connected peers.
 // This is necessary because TCP connections are automatically closed due to inactivity,
 // and doing a ping will avoid this (with a small bandwidth cost)
-func (w *WakuNode) startKeepAlive(ctx context.Context, t time.Duration) {
+func (w *WakuNode) startKeepAlive(ctx context.Context, randomPeersPingDuration time.Duration, allPeersPingDuration time.Duration) {
     defer w.wg.Done()

-    w.log.Info("setting up ping protocol", zap.Duration("duration", t))
-    ticker := time.NewTicker(t)
-    defer ticker.Stop()
+    if !w.opts.enableRelay {
+        return
+    }
+
+    w.log.Info("setting up ping protocol", zap.Duration("randomPeersPingDuration", randomPeersPingDuration), zap.Duration("allPeersPingDuration", allPeersPingDuration))
+
+    randomPeersTickerC := make(<-chan time.Time)
+    if randomPeersPingDuration != 0 {
+        randomPeersTicker := time.NewTicker(randomPeersPingDuration)
+        defer randomPeersTicker.Stop()
+        randomPeersTickerC = randomPeersTicker.C
+    }
+
+    allPeersTickerC := make(<-chan time.Time)
+    if allPeersPingDuration != 0 {
+        allPeersTicker := time.NewTicker(allPeersPingDuration)
+        defer allPeersTicker.Stop()
+        allPeersTickerC = allPeersTicker.C
+    }

     lastTimeExecuted := w.timesource.Now()
-    sleepDetectionInterval := int64(t) * sleepDetectionIntervalFactor
+    sleepDetectionInterval := int64(randomPeersPingDuration) * sleepDetectionIntervalFactor

     for {
+        peersToPing := []peer.ID{}
+
         select {
-        case <-ticker.C:
+        case <-allPeersTickerC:
+            relayPeersSet := make(map[peer.ID]struct{})
+            for _, t := range w.Relay().Topics() {
+                for _, p := range w.Relay().PubSub().ListPeers(t) {
+                    relayPeersSet[p] = struct{}{}
+                }
+            }
+            peersToPing = maps.Keys(relayPeersSet)
+
+        case <-randomPeersTickerC:
             difference := w.timesource.Now().UnixNano() - lastTimeExecuted.UnixNano()
-            forceDisconnectOnPingFailure := false
             if difference > sleepDetectionInterval {
-                forceDisconnectOnPingFailure = true
                 lastTimeExecuted = w.timesource.Now()
-                w.log.Warn("keep alive hasnt been executed recently. Killing connections to peers if ping fails")
+                w.log.Warn("keep alive hasnt been executed recently. Killing all connections")
+                for _, p := range w.host.Network().Peers() {
+                    err := w.host.Network().ClosePeer(p)
+                    if err != nil {
+                        w.log.Debug("closing conn to peer", zap.Error(err))
+                    }
+                }
                 continue
             }
-            // Network's peers collection,
-            // contains only currently active peers
-            pingWg := sync.WaitGroup{}
-            peersToPing := w.host.Network().Peers()
-            pingWg.Add(len(peersToPing))
-            for _, p := range peersToPing {
-                if p != w.host.ID() {
-                    go w.pingPeer(ctx, &pingWg, p, forceDisconnectOnPingFailure)
-                }
-            }
-            pingWg.Wait()
+
+            // Prioritize mesh peers
+            meshPeersSet := make(map[peer.ID]struct{})
+            for _, t := range w.Relay().Topics() {
+                for _, p := range w.Relay().PubSub().MeshPeers(t) {
+                    meshPeersSet[p] = struct{}{}
+                }
+            }
+            peersToPing = append(peersToPing, maps.Keys(meshPeersSet)...)
+
+            // Ping also some random relay peers
+            if maxPeersToPing-len(peersToPing) > 0 {
+                relayPeersSet := make(map[peer.ID]struct{})
+                for _, t := range w.Relay().Topics() {
+                    for _, p := range w.Relay().PubSub().ListPeers(t) {
+                        if _, ok := meshPeersSet[p]; !ok {
+                            relayPeersSet[p] = struct{}{}
+                        }
+                    }
+                }
+
+                relayPeers := maps.Keys(relayPeersSet)
+                rand.Shuffle(len(relayPeers), func(i, j int) { relayPeers[i], relayPeers[j] = relayPeers[j], relayPeers[i] })
+
+                peerLen := maxPeersToPing - len(peersToPing)
+                if peerLen > len(relayPeers) {
+                    peerLen = len(relayPeers)
+                }
+                peersToPing = append(peersToPing, relayPeers[0:peerLen]...)
+            }
+
+            lastTimeExecuted = w.timesource.Now()

         case <-ctx.Done():
             w.log.Info("stopping ping protocol")
             return
         }
+
+        pingWg := sync.WaitGroup{}
+        pingWg.Add(len(peersToPing))
+        for _, p := range peersToPing {
+            go w.pingPeer(ctx, &pingWg, p)
+        }
+        pingWg.Wait()
+
+        lastTimeExecuted = w.timesource.Now()
     }
 }

-func (w *WakuNode) pingPeer(ctx context.Context, wg *sync.WaitGroup, peerID peer.ID, forceDisconnectOnFail bool) {
+func (w *WakuNode) pingPeer(ctx context.Context, wg *sync.WaitGroup, peerID peer.ID) {
     defer wg.Done()

+    logger := w.log.With(logging.HostID("peer", peerID))
+
+    for i := 0; i < maxAllowedPingFailures; i++ {
+        if w.host.Network().Connectedness(peerID) != network.Connected {
+            // Peer is no longer connected. No need to ping
+            return
+        }
+
+        logger.Debug("pinging")
+        if w.tryPing(ctx, peerID, logger) {
+            return
+        }
+    }
+
+    if w.host.Network().Connectedness(peerID) != network.Connected {
+        return
+    }
+
+    logger.Info("disconnecting dead peer")
+    if err := w.host.Network().ClosePeer(peerID); err != nil {
+        logger.Debug("closing conn to peer", zap.Error(err))
+    }
+}
+
+func (w *WakuNode) tryPing(ctx context.Context, peerID peer.ID, logger *zap.Logger) bool {
     ctx, cancel := context.WithTimeout(ctx, 7*time.Second)
     defer cancel()

-    logger := w.log.With(logging.HostID("peer", peerID))
-    logger.Debug("pinging")
     pr := ping.Ping(ctx, w.host, peerID)
     select {
     case res := <-pr:
         if res.Error != nil {
-            w.keepAliveMutex.Lock()
-            w.keepAliveFails[peerID]++
-            w.keepAliveMutex.Unlock()
             logger.Debug("could not ping", zap.Error(res.Error))
-        } else {
-            w.keepAliveMutex.Lock()
-            delete(w.keepAliveFails, peerID)
-            w.keepAliveMutex.Unlock()
+            return false
         }
     case <-ctx.Done():
-        w.keepAliveMutex.Lock()
-        w.keepAliveFails[peerID]++
-        w.keepAliveMutex.Unlock()
-        logger.Debug("could not ping (context done)", zap.Error(ctx.Err()))
+        if !errors.Is(ctx.Err(), context.Canceled) {
+            logger.Debug("could not ping (context)", zap.Error(ctx.Err()))
+        }
+        return false
     }
-
-    w.keepAliveMutex.Lock()
-    if (forceDisconnectOnFail || w.keepAliveFails[peerID] > maxAllowedPingFailures) && w.host.Network().Connectedness(peerID) == network.Connected {
-        logger.Info("disconnecting peer")
-        if err := w.host.Network().ClosePeer(peerID); err != nil {
-            logger.Debug("closing conn to peer", zap.Error(err))
-        }
-        w.keepAliveFails[peerID] = 0
-    }
-    w.keepAliveMutex.Unlock()
+    return true
 }
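
Read in isolation, the peer-selection logic added on the random-peers tick amounts to: always ping the gossipsub mesh peers, then fill the remaining budget of maxPeersToPing with randomly shuffled non-mesh relay peers. The standalone helper below sketches that idea; its name and signature are hypothetical and not part of this commit.

package node

import (
	"math/rand"

	"github.com/libp2p/go-libp2p/core/peer"
)

// selectPeersToPing is a hypothetical helper mirroring the selection above:
// every mesh peer is pinged, and the remaining budget (up to maxPeers) is
// filled with randomly chosen relay peers that are not already in the mesh.
func selectPeersToPing(meshPeers, relayPeers []peer.ID, maxPeers int) []peer.ID {
	inMesh := make(map[peer.ID]struct{}, len(meshPeers))
	peersToPing := make([]peer.ID, 0, len(meshPeers))
	for _, p := range meshPeers {
		if _, ok := inMesh[p]; ok {
			continue // deduplicate mesh peers seen on several topics
		}
		inMesh[p] = struct{}{}
		peersToPing = append(peersToPing, p)
	}

	budget := maxPeers - len(peersToPing)
	if budget <= 0 {
		return peersToPing
	}

	// Candidates: relay peers outside the mesh, shuffled so the subset is random.
	candidates := make([]peer.ID, 0, len(relayPeers))
	for _, p := range relayPeers {
		if _, ok := inMesh[p]; !ok {
			candidates = append(candidates, p)
		}
	}
	rand.Shuffle(len(candidates), func(i, j int) {
		candidates[i], candidates[j] = candidates[j], candidates[i]
	})

	if budget > len(candidates) {
		budget = len(candidates)
	}
	return append(peersToPing, candidates[:budget]...)
}

As in the commit, mesh peers are never dropped even if they alone exceed the budget; only the random top-up of non-mesh relay peers is capped.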

View File

@@ -9,7 +9,6 @@ import (
     "github.com/ethereum/go-ethereum/crypto"
     "github.com/libp2p/go-libp2p"
-    "github.com/libp2p/go-libp2p/core/peer"
     "github.com/libp2p/go-libp2p/core/peerstore"
     "github.com/multiformats/go-multiaddr"
     "github.com/stretchr/testify/require"
@@ -40,15 +39,13 @@ func TestKeepAlive(t *testing.T) {
     wg := &sync.WaitGroup{}

     w := &WakuNode{
         host: host1,
         wg:   wg,
         log:  utils.Logger(),
-        keepAliveMutex: sync.Mutex{},
-        keepAliveFails: make(map[peer.ID]int),
     }

     w.wg.Add(1)

-    w.pingPeer(ctx2, w.wg, peerID2, false)
+    w.pingPeer(ctx2, w.wg, peerID2)

     require.NoError(t, ctx.Err())
 }
@@ -70,7 +67,7 @@ func TestPeriodicKeepAlive(t *testing.T) {
         WithPrivateKey(prvKey),
         WithHostAddress(hostAddr),
         WithWakuRelay(),
-        WithKeepAlive(time.Second),
+        WithKeepAlive(time.Minute, time.Second),
     )
     require.NoError(t, err)

View File

@@ -116,9 +116,6 @@ type WakuNode struct {
     addressChangesSub event.Subscription
     enrChangeCh       chan struct{}

-    keepAliveMutex sync.Mutex
-    keepAliveFails map[peer.ID]int
-
     cancel context.CancelFunc
     wg     *sync.WaitGroup
@@ -193,7 +190,6 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
     w.opts = params
     w.log = params.logger.Named("node2")
     w.wg = &sync.WaitGroup{}
-    w.keepAliveFails = make(map[peer.ID]int)
     w.wakuFlag = enr.NewWakuEnrBitfield(w.opts.enableLightPush, w.opts.enableFilterFullNode, w.opts.enableStore, w.opts.enableRelay)
     w.circuitRelayNodes = make(chan peer.AddrInfo)
     w.metrics = newMetrics(params.prometheusReg)
@@ -382,9 +378,9 @@ func (w *WakuNode) Start(ctx context.Context) error {
         return err
     }

-    if w.opts.keepAliveInterval > time.Duration(0) {
+    if w.opts.keepAliveRandomPeersInterval > time.Duration(0) || w.opts.keepAliveAllPeersInterval > time.Duration(0) {
         w.wg.Add(1)
-        go w.startKeepAlive(ctx, w.opts.keepAliveInterval)
+        go w.startKeepAlive(ctx, w.opts.keepAliveRandomPeersInterval, w.opts.keepAliveAllPeersInterval)
     }

     w.metadata.SetHost(host)
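
The Start change above only launches the keep-alive goroutine when at least one interval is positive, and registers it on the node's WaitGroup so shutdown can wait for it. The self-contained sketch below illustrates that lifecycle pattern with hypothetical names; it is not code from the repository.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// runner is a hypothetical, stripped-down illustration of the pattern used in
// WakuNode.Start: conditionally launch a ticker-driven goroutine, track it on
// a WaitGroup, and stop it by cancelling the context.
type runner struct {
	wg       sync.WaitGroup
	cancel   context.CancelFunc
	interval time.Duration
}

func (r *runner) Start(ctx context.Context) {
	ctx, r.cancel = context.WithCancel(ctx)
	if r.interval > 0 {
		r.wg.Add(1)
		go r.keepAliveLoop(ctx)
	}
}

func (r *runner) keepAliveLoop(ctx context.Context) {
	defer r.wg.Done()
	ticker := time.NewTicker(r.interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			fmt.Println("tick: ping a subset of peers")
		case <-ctx.Done():
			return
		}
	}
}

func (r *runner) Stop() {
	r.cancel()
	r.wg.Wait() // block until the keep-alive goroutine has exited
}

func main() {
	r := &runner{interval: 50 * time.Millisecond}
	r.Start(context.Background())
	time.Sleep(200 * time.Millisecond)
	r.Stop()
}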

View File

@@ -114,7 +114,8 @@ type WakuNodeParameters struct {
     rlnTreePath                  string
     rlnMembershipContractAddress common.Address

-    keepAliveInterval time.Duration
+    keepAliveRandomPeersInterval time.Duration
+    keepAliveAllPeersInterval    time.Duration

     enableLightPush bool
@@ -476,10 +477,14 @@ func WithLightPush(lightpushOpts ...lightpush.Option) WakuNodeOption {
 }

 // WithKeepAlive is a WakuNodeOption used to set the interval of time when
-// each peer will be ping to keep the TCP connection alive
-func WithKeepAlive(t time.Duration) WakuNodeOption {
+// each peer will be ping to keep the TCP connection alive. Option accepts two
+// intervals, the `randomPeersInterval`, which will be used to ping full mesh
+// peers (if using relay) and random connected peers, and `allPeersInterval`
+// which is used to ping all connected peers
+func WithKeepAlive(randomPeersInterval time.Duration, allPeersInterval time.Duration) WakuNodeOption {
     return func(params *WakuNodeParameters) error {
-        params.keepAliveInterval = t
+        params.keepAliveRandomPeersInterval = randomPeersInterval
+        params.keepAliveAllPeersInterval = allPeersInterval
         return nil
     }
 }
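
Since the two intervals end up in WakuNodeParameters as shown above, an in-package unit test could pin down the option's new behaviour. The following is a hypothetical sketch, not a test that exists in this commit; it only relies on names visible in the diff.

package node

import (
	"testing"
	"time"
)

// TestWithKeepAliveSetsBothIntervals is a hypothetical test sketch verifying
// that the reworked option writes both intervals into WakuNodeParameters.
func TestWithKeepAliveSetsBothIntervals(t *testing.T) {
	params := &WakuNodeParameters{}

	if err := WithKeepAlive(10*time.Second, time.Hour)(params); err != nil {
		t.Fatal(err)
	}

	if params.keepAliveRandomPeersInterval != 10*time.Second {
		t.Fatalf("unexpected random-peers interval: %v", params.keepAliveRandomPeersInterval)
	}
	if params.keepAliveAllPeersInterval != time.Hour {
		t.Fatalf("unexpected all-peers interval: %v", params.keepAliveAllPeersInterval)
	}
}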

View File

@@ -58,7 +58,7 @@ func TestWakuOptions(t *testing.T) {
         WithWakuStore(),
         WithMessageProvider(&persistence.DBStore{}),
         WithLightPush(),
-        WithKeepAlive(time.Hour),
+        WithKeepAlive(time.Minute, time.Hour),
         WithTopicHealthStatusChannel(topicHealthStatusChan),
         WithWakuStoreFactory(storeFactory),
     }
@@ -107,7 +107,7 @@ func TestWakuRLNOptions(t *testing.T) {
         WithWakuStore(),
         WithMessageProvider(&persistence.DBStore{}),
         WithLightPush(),
-        WithKeepAlive(time.Hour),
+        WithKeepAlive(time.Minute, time.Hour),
         WithTopicHealthStatusChannel(topicHealthStatusChan),
         WithWakuStoreFactory(storeFactory),
         WithStaticRLNRelay(&index, handleSpam),
@@ -147,7 +147,7 @@ func TestWakuRLNOptions(t *testing.T) {
         WithWakuStore(),
         WithMessageProvider(&persistence.DBStore{}),
         WithLightPush(),
-        WithKeepAlive(time.Hour),
+        WithKeepAlive(time.Minute, time.Hour),
         WithTopicHealthStatusChannel(topicHealthStatusChan),
         WithWakuStoreFactory(storeFactory),
         WithDynamicRLNRelay(keystorePath, keystorePassword, rlnTreePath, common.HexToAddress(contractAddress), &index, handleSpam, ethClientAddress),