chore: disconnect on subsequent ping failures (#1164)

richΛrd · 2024-07-21 20:43:22 -04:00 · committed by GitHub
parent f3da812b33 · commit 75047cc9da
2 changed files with 44 additions and 9 deletions
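
The gist of the change: the keep-alive loop now tracks rounds in which every pinged peer failed, and after maxAllowedSubsequentPingFailures such rounds in a row it assumes the node itself lost connectivity and drops all connections so they can be re-dialed. Below is a condensed, runnable model of just that counter logic — the names mirror the diff, but the simulated rounds in main are invented for illustration, and in the real loop the threshold is checked at the top of the next iteration rather than immediately after counting:

package main

import "fmt"

const maxAllowedSubsequentPingFailures = 2 // same threshold the commit adds

// shouldDisconnect models the new logic: the failure streak grows only when
// every ping in a round failed, any success resets it, and reaching the
// threshold resets the counter and signals a full disconnect.
func shouldDisconnect(streak *int, results []bool) bool {
    failures := 0
    for _, ok := range results {
        if !ok {
            failures++
        }
    }
    if len(results) > 0 && failures == len(results) {
        *streak++
    } else {
        *streak = 0
    }
    if *streak >= maxAllowedSubsequentPingFailures {
        *streak = 0
        return true
    }
    return false
}

func main() {
    streak := 0
    rounds := [][]bool{
        {false, false}, // all fail -> streak 1
        {false},        // all fail -> streak 2, disconnect
        {true, false},  // one success -> streak resets
    }
    for i, round := range rounds {
        fmt.Printf("round %d: disconnect=%v\n", i+1, shouldDisconnect(&streak, round))
    }
}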

File 1 of 2: keep-alive implementation

@@ -7,6 +7,7 @@ import (
     "sync"
     "time"
 
+    "github.com/libp2p/go-libp2p/core/host"
     "github.com/libp2p/go-libp2p/core/network"
     "github.com/libp2p/go-libp2p/core/peer"
     "github.com/libp2p/go-libp2p/p2p/protocol/ping"
@@ -24,6 +25,17 @@ const sleepDetectionIntervalFactor = 3
 const maxPeersToPing = 10
 
+const maxAllowedSubsequentPingFailures = 2
+
+func disconnectAllPeers(host host.Host, logger *zap.Logger) {
+    for _, p := range host.Network().Peers() {
+        err := host.Network().ClosePeer(p)
+        if err != nil {
+            logger.Debug("closing conn to peer", zap.Error(err))
+        }
+    }
+}
+
 // startKeepAlive creates a go routine that periodically pings connected peers.
 // This is necessary because TCP connections are automatically closed due to inactivity,
 // and doing a ping will avoid this (with a small bandwidth cost)
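
disconnectAllPeers is a straight extraction of the teardown loop that previously lived inline in the sleep-detection branch (next hunk); errors from ClosePeer stay at debug level, as before. To exercise the helper in isolation, a throwaway harness like the following should work — the host construction and logger are our additions, not part of the commit:

package main

import (
    "fmt"

    "github.com/libp2p/go-libp2p"
    "github.com/libp2p/go-libp2p/core/host"
    "go.uber.org/zap"
)

// Copied from the hunk above: close every live connection, logging any
// teardown error at debug level only.
func disconnectAllPeers(host host.Host, logger *zap.Logger) {
    for _, p := range host.Network().Peers() {
        err := host.Network().ClosePeer(p)
        if err != nil {
            logger.Debug("closing conn to peer", zap.Error(err))
        }
    }
}

func main() {
    h, err := libp2p.New() // throwaway host with default options
    if err != nil {
        panic(err)
    }
    defer h.Close()

    logger, _ := zap.NewDevelopment()
    disconnectAllPeers(h, logger) // no peers dialed yet, so this is a no-op
    fmt.Println("peers after disconnect:", len(h.Network().Peers()))
}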
@@ -54,6 +66,7 @@ func (w *WakuNode) startKeepAlive(ctx context.Context, randomPeersPingDuration t
     sleepDetectionInterval := int64(randomPeersPingDuration) * sleepDetectionIntervalFactor
 
+    var iterationFailure int
     for {
         peersToPing := []peer.ID{}
@@ -72,12 +85,12 @@ func (w *WakuNode) startKeepAlive(ctx context.Context, randomPeersPingDuration t
             if difference > sleepDetectionInterval {
                 lastTimeExecuted = w.timesource.Now()
                 w.log.Warn("keep alive hasnt been executed recently. Killing all connections")
-                for _, p := range w.host.Network().Peers() {
-                    err := w.host.Network().ClosePeer(p)
-                    if err != nil {
-                        w.log.Debug("closing conn to peer", zap.Error(err))
-                    }
-                }
+                disconnectAllPeers(w.host, w.log)
+                continue
+            } else if iterationFailure >= maxAllowedSubsequentPingFailures {
+                iterationFailure = 0
+                w.log.Warn("Pinging random peers failed, node is likely disconnected. Killing all connections")
+                disconnectAllPeers(w.host, w.log)
                 continue
             }
@@ -118,16 +131,31 @@ func (w *WakuNode) startKeepAlive(ctx context.Context, randomPeersPingDuration t
         pingWg := sync.WaitGroup{}
         pingWg.Add(len(peersToPing))
+        pingResultChan := make(chan bool, len(peersToPing))
         for _, p := range peersToPing {
-            go w.pingPeer(ctx, &pingWg, p)
+            go w.pingPeer(ctx, &pingWg, p, pingResultChan)
         }
         pingWg.Wait()
+        close(pingResultChan)
+
+        failureCounter := 0
+        for couldPing := range pingResultChan {
+            if !couldPing {
+                failureCounter++
+            }
+        }
+
+        if len(peersToPing) > 0 && failureCounter == len(peersToPing) {
+            iterationFailure++
+        } else {
+            iterationFailure = 0
+        }
+
         lastTimeExecuted = w.timesource.Now()
     }
 }
 
-func (w *WakuNode) pingPeer(ctx context.Context, wg *sync.WaitGroup, peerID peer.ID) {
+func (w *WakuNode) pingPeer(ctx context.Context, wg *sync.WaitGroup, peerID peer.ID, resultChan chan bool) {
     defer wg.Done()
 
     logger := w.log.With(logging.HostID("peer", peerID))
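
The aggregation above is a standard fan-out/fan-in: pingResultChan is buffered to len(peersToPing), every pingPeer goroutine sends exactly one bool (see the following hunks), so no send can block, and closing the channel only after pingWg.Wait() is safe because all senders have finished by then; the close is also what lets the range loop terminate. A minimal, self-contained model of that shape (the peer names and the fake success rule are invented):

package main

import (
    "fmt"
    "sync"
)

func main() {
    peers := []string{"peer-a", "peer-b", "peer-c"}

    var wg sync.WaitGroup
    wg.Add(len(peers))
    // Buffered to the number of senders, so a send can never block even if
    // the receiver has not started draining yet.
    results := make(chan bool, len(peers))

    for _, p := range peers {
        go func(p string) {
            defer wg.Done()
            results <- p == "peer-b" // stand-in for "the ping succeeded"
        }(p)
    }

    wg.Wait()      // every goroutine has sent its single result
    close(results) // safe: no sender remains; lets range terminate

    failures := 0
    for ok := range results {
        if !ok {
            failures++
        }
    }
    fmt.Printf("%d/%d pings failed\n", failures, len(peers))
}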
@@ -135,17 +163,20 @@ func (w *WakuNode) pingPeer(ctx context.Context, wg *sync.WaitGroup, peerID peer
     for i := 0; i < maxAllowedPingFailures; i++ {
         if w.host.Network().Connectedness(peerID) != network.Connected {
             // Peer is no longer connected. No need to ping
+            resultChan <- false
             return
         }
 
         logger.Debug("pinging")
         if w.tryPing(ctx, peerID, logger) {
+            resultChan <- true
             return
         }
     }
 
     if w.host.Network().Connectedness(peerID) != network.Connected {
+        resultChan <- false
         return
     }
@@ -153,6 +184,8 @@ func (w *WakuNode) pingPeer(ctx context.Context, wg *sync.WaitGroup, peerID peer
     if err := w.host.Network().ClosePeer(peerID); err != nil {
         logger.Debug("closing conn to peer", zap.Error(err))
     }
+
+    resultChan <- false
 }
 
 func (w *WakuNode) tryPing(ctx context.Context, peerID peer.ID, logger *zap.Logger) bool {
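
Worth noting: after these hunks, every exit path of pingPeer performs exactly one send on resultChan — early return when the peer is already disconnected (false), successful ping (true), peer dropped during retries (false), and the final give-up path that closes the connection (false). The caller's buffer size and close-after-Wait both depend on that one-send-per-goroutine invariant. Here is a condensed control-flow skeleton, with the connectedness check and the ping swapped for injected stand-ins (our simplification, not the real signatures):

package main

import (
    "fmt"
    "sync"
)

// pingPeer mirrors the updated function's control flow; connected and tryPing
// stand in for the libp2p connectedness check and the ping attempt.
func pingPeer(wg *sync.WaitGroup, connected, tryPing func() bool, retries int, result chan<- bool) {
    defer wg.Done()
    for i := 0; i < retries; i++ {
        if !connected() {
            result <- false // path 1: peer already gone
            return
        }
        if tryPing() {
            result <- true // path 2: ping succeeded
            return
        }
    }
    if !connected() {
        result <- false // path 3: peer dropped while retrying
        return
    }
    // (the real function closes the connection to the peer here)
    result <- false // path 4: still connected but unresponsive
}

func main() {
    var wg sync.WaitGroup
    wg.Add(1)
    result := make(chan bool, 1) // one send per call, so capacity 1 suffices
    pingPeer(&wg, func() bool { return true }, func() bool { return false }, 2, result)
    wg.Wait()
    close(result)
    fmt.Println("could ping:", <-result)
}

The updated test below uses this same synchronous-call pattern with a size-1 buffered channel, which is why the single pingPeer call cannot block.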

File 2 of 2: keep-alive test

@@ -45,9 +45,11 @@ func TestKeepAlive(t *testing.T) {
     }
 
     w.wg.Add(1)
-    w.pingPeer(ctx2, w.wg, peerID2)
+    peerFailureSignalChan := make(chan bool, 1)
+    w.pingPeer(ctx2, w.wg, peerID2, peerFailureSignalChan)
     require.NoError(t, ctx.Err())
+    close(peerFailureSignalChan)
 }
 
 func TestPeriodicKeepAlive(t *testing.T) {