feat(waku)_: disconnect all peers if ping to randomly choosen peers fail 2 times (#5526)
This commit is contained in:
parent
a3e834c26e
commit
8de8818516
2
go.mod
2
go.mod
|
@ -96,7 +96,7 @@ require (
|
||||||
github.com/schollz/peerdiscovery v1.7.0
|
github.com/schollz/peerdiscovery v1.7.0
|
||||||
github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7
|
github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7
|
||||||
github.com/urfave/cli/v2 v2.27.2
|
github.com/urfave/cli/v2 v2.27.2
|
||||||
github.com/waku-org/go-waku v0.8.1-0.20240715141727-dacff8a6ae5d
|
github.com/waku-org/go-waku v0.8.1-0.20240716173432-8f3332d1a08d
|
||||||
github.com/wk8/go-ordered-map/v2 v2.1.7
|
github.com/wk8/go-ordered-map/v2 v2.1.7
|
||||||
github.com/yeqown/go-qrcode/v2 v2.2.1
|
github.com/yeqown/go-qrcode/v2 v2.2.1
|
||||||
github.com/yeqown/go-qrcode/writer/standard v1.2.1
|
github.com/yeqown/go-qrcode/writer/standard v1.2.1
|
||||||
|
|
4
go.sum
4
go.sum
|
@ -2143,8 +2143,8 @@ github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5 h1:9u16E
|
||||||
github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5/go.mod h1:QEb+hEV9WL9wCiUAnpY29FZR6W3zK8qYlaml8R4q6gQ=
|
github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5/go.mod h1:QEb+hEV9WL9wCiUAnpY29FZR6W3zK8qYlaml8R4q6gQ=
|
||||||
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=
|
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=
|
||||||
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY=
|
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY=
|
||||||
github.com/waku-org/go-waku v0.8.1-0.20240715141727-dacff8a6ae5d h1:bXYwTtpvJSckgV1Jks2XpnZzvk1x1StJw9cL1odqOwY=
|
github.com/waku-org/go-waku v0.8.1-0.20240716173432-8f3332d1a08d h1:thwE3nxBaINnQllutuZdMdyLMUZLKwX7TRufs1IrlQc=
|
||||||
github.com/waku-org/go-waku v0.8.1-0.20240715141727-dacff8a6ae5d/go.mod h1:ugDTCvcP6oJ9mTtINeo4EIsnC3oQCU3RsctNKu4MsRw=
|
github.com/waku-org/go-waku v0.8.1-0.20240716173432-8f3332d1a08d/go.mod h1:ugDTCvcP6oJ9mTtINeo4EIsnC3oQCU3RsctNKu4MsRw=
|
||||||
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA=
|
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA=
|
||||||
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E=
|
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E=
|
||||||
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE=
|
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE=
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/libp2p/go-libp2p/core/host"
|
||||||
"github.com/libp2p/go-libp2p/core/network"
|
"github.com/libp2p/go-libp2p/core/network"
|
||||||
"github.com/libp2p/go-libp2p/core/peer"
|
"github.com/libp2p/go-libp2p/core/peer"
|
||||||
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
|
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
|
||||||
|
@ -24,6 +25,17 @@ const sleepDetectionIntervalFactor = 3
|
||||||
|
|
||||||
const maxPeersToPing = 10
|
const maxPeersToPing = 10
|
||||||
|
|
||||||
|
const maxAllowedSubsequentPingFailures = 2
|
||||||
|
|
||||||
|
func disconnectAllPeers(host host.Host, logger *zap.Logger) {
|
||||||
|
for _, p := range host.Network().Peers() {
|
||||||
|
err := host.Network().ClosePeer(p)
|
||||||
|
if err != nil {
|
||||||
|
logger.Debug("closing conn to peer", zap.Error(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// startKeepAlive creates a go routine that periodically pings connected peers.
|
// startKeepAlive creates a go routine that periodically pings connected peers.
|
||||||
// This is necessary because TCP connections are automatically closed due to inactivity,
|
// This is necessary because TCP connections are automatically closed due to inactivity,
|
||||||
// and doing a ping will avoid this (with a small bandwidth cost)
|
// and doing a ping will avoid this (with a small bandwidth cost)
|
||||||
|
@ -54,6 +66,7 @@ func (w *WakuNode) startKeepAlive(ctx context.Context, randomPeersPingDuration t
|
||||||
|
|
||||||
sleepDetectionInterval := int64(randomPeersPingDuration) * sleepDetectionIntervalFactor
|
sleepDetectionInterval := int64(randomPeersPingDuration) * sleepDetectionIntervalFactor
|
||||||
|
|
||||||
|
var iterationFailure int
|
||||||
for {
|
for {
|
||||||
peersToPing := []peer.ID{}
|
peersToPing := []peer.ID{}
|
||||||
|
|
||||||
|
@ -72,12 +85,12 @@ func (w *WakuNode) startKeepAlive(ctx context.Context, randomPeersPingDuration t
|
||||||
if difference > sleepDetectionInterval {
|
if difference > sleepDetectionInterval {
|
||||||
lastTimeExecuted = w.timesource.Now()
|
lastTimeExecuted = w.timesource.Now()
|
||||||
w.log.Warn("keep alive hasnt been executed recently. Killing all connections")
|
w.log.Warn("keep alive hasnt been executed recently. Killing all connections")
|
||||||
for _, p := range w.host.Network().Peers() {
|
disconnectAllPeers(w.host, w.log)
|
||||||
err := w.host.Network().ClosePeer(p)
|
continue
|
||||||
if err != nil {
|
} else if iterationFailure >= maxAllowedSubsequentPingFailures {
|
||||||
w.log.Debug("closing conn to peer", zap.Error(err))
|
iterationFailure = 0
|
||||||
}
|
w.log.Warn("Pinging random peers failed, node is likely disconnected. Killing all connections")
|
||||||
}
|
disconnectAllPeers(w.host, w.log)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -118,16 +131,31 @@ func (w *WakuNode) startKeepAlive(ctx context.Context, randomPeersPingDuration t
|
||||||
|
|
||||||
pingWg := sync.WaitGroup{}
|
pingWg := sync.WaitGroup{}
|
||||||
pingWg.Add(len(peersToPing))
|
pingWg.Add(len(peersToPing))
|
||||||
|
pingResultChan := make(chan bool, len(peersToPing))
|
||||||
for _, p := range peersToPing {
|
for _, p := range peersToPing {
|
||||||
go w.pingPeer(ctx, &pingWg, p)
|
go w.pingPeer(ctx, &pingWg, p, pingResultChan)
|
||||||
}
|
}
|
||||||
pingWg.Wait()
|
pingWg.Wait()
|
||||||
|
close(pingResultChan)
|
||||||
|
|
||||||
|
failureCounter := 0
|
||||||
|
for couldPing := range pingResultChan {
|
||||||
|
if !couldPing {
|
||||||
|
failureCounter++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(peersToPing) > 0 && failureCounter == len(peersToPing) {
|
||||||
|
iterationFailure++
|
||||||
|
} else {
|
||||||
|
iterationFailure = 0
|
||||||
|
}
|
||||||
|
|
||||||
lastTimeExecuted = w.timesource.Now()
|
lastTimeExecuted = w.timesource.Now()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WakuNode) pingPeer(ctx context.Context, wg *sync.WaitGroup, peerID peer.ID) {
|
func (w *WakuNode) pingPeer(ctx context.Context, wg *sync.WaitGroup, peerID peer.ID, resultChan chan bool) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
|
||||||
logger := w.log.With(logging.HostID("peer", peerID))
|
logger := w.log.With(logging.HostID("peer", peerID))
|
||||||
|
@ -135,17 +163,20 @@ func (w *WakuNode) pingPeer(ctx context.Context, wg *sync.WaitGroup, peerID peer
|
||||||
for i := 0; i < maxAllowedPingFailures; i++ {
|
for i := 0; i < maxAllowedPingFailures; i++ {
|
||||||
if w.host.Network().Connectedness(peerID) != network.Connected {
|
if w.host.Network().Connectedness(peerID) != network.Connected {
|
||||||
// Peer is no longer connected. No need to ping
|
// Peer is no longer connected. No need to ping
|
||||||
|
resultChan <- false
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Debug("pinging")
|
logger.Debug("pinging")
|
||||||
|
|
||||||
if w.tryPing(ctx, peerID, logger) {
|
if w.tryPing(ctx, peerID, logger) {
|
||||||
|
resultChan <- true
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if w.host.Network().Connectedness(peerID) != network.Connected {
|
if w.host.Network().Connectedness(peerID) != network.Connected {
|
||||||
|
resultChan <- false
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -153,6 +184,8 @@ func (w *WakuNode) pingPeer(ctx context.Context, wg *sync.WaitGroup, peerID peer
|
||||||
if err := w.host.Network().ClosePeer(peerID); err != nil {
|
if err := w.host.Network().ClosePeer(peerID); err != nil {
|
||||||
logger.Debug("closing conn to peer", zap.Error(err))
|
logger.Debug("closing conn to peer", zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
resultChan <- false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WakuNode) tryPing(ctx context.Context, peerID peer.ID, logger *zap.Logger) bool {
|
func (w *WakuNode) tryPing(ctx context.Context, peerID peer.ID, logger *zap.Logger) bool {
|
||||||
|
|
|
@ -1018,7 +1018,7 @@ github.com/waku-org/go-discover/discover/v5wire
|
||||||
github.com/waku-org/go-libp2p-rendezvous
|
github.com/waku-org/go-libp2p-rendezvous
|
||||||
github.com/waku-org/go-libp2p-rendezvous/db
|
github.com/waku-org/go-libp2p-rendezvous/db
|
||||||
github.com/waku-org/go-libp2p-rendezvous/pb
|
github.com/waku-org/go-libp2p-rendezvous/pb
|
||||||
# github.com/waku-org/go-waku v0.8.1-0.20240715141727-dacff8a6ae5d
|
# github.com/waku-org/go-waku v0.8.1-0.20240716173432-8f3332d1a08d
|
||||||
## explicit; go 1.21
|
## explicit; go 1.21
|
||||||
github.com/waku-org/go-waku/logging
|
github.com/waku-org/go-waku/logging
|
||||||
github.com/waku-org/go-waku/tests
|
github.com/waku-org/go-waku/tests
|
||||||
|
|
Loading…
Reference in New Issue